blob: a5366f62fcbde420f141f9a7033d509b20983687 [file] [log] [blame]
Sungtak Leed79b6da2018-11-12 17:52:17 -08001/*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "BufferPoolAccessor"
18//#define LOG_NDEBUG 0
19
20#include <sys/types.h>
21#include <time.h>
22#include <unistd.h>
23#include <utils/Log.h>
24#include "AccessorImpl.h"
25#include "Connection.h"
26
27namespace android {
28namespace hardware {
29namespace media {
30namespace bufferpool {
31namespace V1_0 {
32namespace implementation {
33
34namespace {
35 static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
36 static constexpr int64_t kLogDurationUs = 5000000; // 5 secs
37
38 static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
39 static constexpr size_t kMinBufferCountForEviction = 40;
40}
41
42// Buffer structure in bufferpool process
43struct InternalBuffer {
44 BufferId mId;
45 size_t mOwnerCount;
46 size_t mTransactionCount;
47 const std::shared_ptr<BufferPoolAllocation> mAllocation;
48 const size_t mAllocSize;
49 const std::vector<uint8_t> mConfig;
50
51 InternalBuffer(
52 BufferId id,
53 const std::shared_ptr<BufferPoolAllocation> &alloc,
54 const size_t allocSize,
55 const std::vector<uint8_t> &allocConfig)
56 : mId(id), mOwnerCount(0), mTransactionCount(0),
57 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {}
58
59 const native_handle_t *handle() {
60 return mAllocation->handle();
61 }
62};
63
64struct TransactionStatus {
65 TransactionId mId;
66 BufferId mBufferId;
67 ConnectionId mSender;
68 ConnectionId mReceiver;
69 BufferStatus mStatus;
70 int64_t mTimestampUs;
71 bool mSenderValidated;
72
73 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
74 mId = message.transactionId;
75 mBufferId = message.bufferId;
76 mStatus = message.newStatus;
77 mTimestampUs = timestampUs;
78 if (mStatus == BufferStatus::TRANSFER_TO) {
79 mSender = message.connectionId;
80 mReceiver = message.targetConnectionId;
81 mSenderValidated = true;
82 } else {
83 mSender = -1LL;
84 mReceiver = message.connectionId;
85 mSenderValidated = false;
86 }
87 }
88};
89
90// Helper template methods for handling map of set.
91template<class T, class U>
92bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
93 auto iter = mapOfSet->find(key);
94 if (iter == mapOfSet->end()) {
95 std::set<U> valueSet{value};
96 mapOfSet->insert(std::make_pair(key, valueSet));
97 return true;
98 } else if (iter->second.find(value) == iter->second.end()) {
99 iter->second.insert(value);
100 return true;
101 }
102 return false;
103}
104
105template<class T, class U>
106bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
107 bool ret = false;
108 auto iter = mapOfSet->find(key);
109 if (iter != mapOfSet->end()) {
110 if (iter->second.erase(value) > 0) {
111 ret = true;
112 }
113 if (iter->second.size() == 0) {
114 mapOfSet->erase(iter);
115 }
116 }
117 return ret;
118}
119
120template<class T, class U>
121bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
122 auto iter = mapOfSet->find(key);
123 if (iter != mapOfSet->end()) {
124 auto setIter = iter->second.find(value);
125 return setIter != iter->second.end();
126 }
127 return false;
128}
129
130int32_t Accessor::Impl::sPid = getpid();
131uint32_t Accessor::Impl::sSeqId = time(nullptr);
132
133Accessor::Impl::Impl(
134 const std::shared_ptr<BufferPoolAllocator> &allocator)
135 : mAllocator(allocator) {}
136
137Accessor::Impl::~Impl() {
138}
139
140ResultStatus Accessor::Impl::connect(
141 const sp<Accessor> &accessor, sp<Connection> *connection,
142 ConnectionId *pConnectionId, const QueueDescriptor** fmqDescPtr) {
143 sp<Connection> newConnection = new Connection();
144 ResultStatus status = ResultStatus::CRITICAL_ERROR;
145 {
146 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
147 if (newConnection) {
148 ConnectionId id = (int64_t)sPid << 32 | sSeqId;
149 status = mBufferPool.mObserver.open(id, fmqDescPtr);
150 if (status == ResultStatus::OK) {
151 newConnection->initialize(accessor, id);
152 *connection = newConnection;
153 *pConnectionId = id;
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700154 mBufferPool.mConnectionIds.insert(id);
Sungtak Leed79b6da2018-11-12 17:52:17 -0800155 ++sSeqId;
156 }
157 }
158 mBufferPool.processStatusMessages();
159 mBufferPool.cleanUp();
160 }
161 return status;
162}
163
164ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
165 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
166 mBufferPool.processStatusMessages();
167 mBufferPool.handleClose(connectionId);
168 mBufferPool.mObserver.close(connectionId);
169 // Since close# will be called after all works are finished, it is OK to
170 // evict unused buffers.
171 mBufferPool.cleanUp(true);
172 return ResultStatus::OK;
173}
174
175ResultStatus Accessor::Impl::allocate(
176 ConnectionId connectionId, const std::vector<uint8_t>& params,
177 BufferId *bufferId, const native_handle_t** handle) {
178 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
179 mBufferPool.processStatusMessages();
180 ResultStatus status = ResultStatus::OK;
181 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
182 lock.unlock();
183 std::shared_ptr<BufferPoolAllocation> alloc;
184 size_t allocSize;
185 status = mAllocator->allocate(params, &alloc, &allocSize);
186 lock.lock();
187 if (status == ResultStatus::OK) {
188 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
189 }
190 ALOGV("create a buffer %d : %u %p",
191 status == ResultStatus::OK, *bufferId, *handle);
192 }
193 if (status == ResultStatus::OK) {
194 // TODO: handle ownBuffer failure
195 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
196 }
197 mBufferPool.cleanUp();
198 return status;
199}
200
201ResultStatus Accessor::Impl::fetch(
202 ConnectionId connectionId, TransactionId transactionId,
203 BufferId bufferId, const native_handle_t** handle) {
204 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
205 mBufferPool.processStatusMessages();
206 auto found = mBufferPool.mTransactions.find(transactionId);
207 if (found != mBufferPool.mTransactions.end() &&
208 contains(&mBufferPool.mPendingTransactions,
209 connectionId, transactionId)) {
210 if (found->second->mSenderValidated &&
211 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
212 found->second->mBufferId == bufferId) {
213 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
214 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
215 if (bufferIt != mBufferPool.mBuffers.end()) {
216 mBufferPool.mStats.onBufferFetched();
217 *handle = bufferIt->second->handle();
218 return ResultStatus::OK;
219 }
220 }
221 }
222 mBufferPool.cleanUp();
223 return ResultStatus::CRITICAL_ERROR;
224}
225
226void Accessor::Impl::cleanUp(bool clearCache) {
227 // transaction timeout, buffer cacheing TTL handling
228 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
229 mBufferPool.processStatusMessages();
230 mBufferPool.cleanUp(clearCache);
231}
232
233Accessor::Impl::Impl::BufferPool::BufferPool()
234 : mTimestampUs(getTimestampNow()),
235 mLastCleanUpUs(mTimestampUs),
236 mLastLogUs(mTimestampUs),
237 mSeq(0) {}
238
239
240// Statistics helper
241template<typename T, typename S>
242int percentage(T base, S total) {
243 return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
244}
245
246Accessor::Impl::Impl::BufferPool::~BufferPool() {
247 std::lock_guard<std::mutex> lock(mMutex);
248 ALOGD("Destruction - bufferpool %p "
249 "cached: %zu/%zuM, %zu/%d%% in use; "
250 "allocs: %zu, %d%% recycled; "
251 "transfers: %zu, %d%% unfetced",
252 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
253 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
254 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
255 mStats.mTotalTransfers,
256 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
257}
258
259bool Accessor::Impl::BufferPool::handleOwnBuffer(
260 ConnectionId connectionId, BufferId bufferId) {
261
262 bool added = insert(&mUsingBuffers, connectionId, bufferId);
263 if (added) {
264 auto iter = mBuffers.find(bufferId);
265 iter->second->mOwnerCount++;
266 }
267 insert(&mUsingConnections, bufferId, connectionId);
268 return added;
269}
270
271bool Accessor::Impl::BufferPool::handleReleaseBuffer(
272 ConnectionId connectionId, BufferId bufferId) {
273 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
274 if (deleted) {
275 auto iter = mBuffers.find(bufferId);
276 iter->second->mOwnerCount--;
277 if (iter->second->mOwnerCount == 0 &&
278 iter->second->mTransactionCount == 0) {
279 mStats.onBufferUnused(iter->second->mAllocSize);
280 mFreeBuffers.insert(bufferId);
281 }
282 }
283 erase(&mUsingConnections, bufferId, connectionId);
284 ALOGV("release buffer %u : %d", bufferId, deleted);
285 return deleted;
286}
287
288bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
289 auto completed = mCompletedTransactions.find(
290 message.transactionId);
291 if (completed != mCompletedTransactions.end()) {
292 // already completed
293 mCompletedTransactions.erase(completed);
294 return true;
295 }
296 // the buffer should exist and be owned.
297 auto bufferIter = mBuffers.find(message.bufferId);
298 if (bufferIter == mBuffers.end() ||
299 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
300 return false;
301 }
302 auto found = mTransactions.find(message.transactionId);
303 if (found != mTransactions.end()) {
304 // transfer_from was received earlier.
305 found->second->mSender = message.connectionId;
306 found->second->mSenderValidated = true;
307 return true;
308 }
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700309 if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
310 // N.B: it could be fake or receive connection already closed.
311 ALOGD("bufferpool %p receiver connection %lld is no longer valid",
312 this, (long long)message.targetConnectionId);
313 return false;
314 }
Sungtak Leed79b6da2018-11-12 17:52:17 -0800315 mStats.onBufferSent();
316 mTransactions.insert(std::make_pair(
317 message.transactionId,
318 std::make_unique<TransactionStatus>(message, mTimestampUs)));
319 insert(&mPendingTransactions, message.targetConnectionId,
320 message.transactionId);
321 bufferIter->second->mTransactionCount++;
322 return true;
323}
324
325bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
326 auto found = mTransactions.find(message.transactionId);
327 if (found == mTransactions.end()) {
328 // TODO: is it feasible to check ownership here?
329 mStats.onBufferSent();
330 mTransactions.insert(std::make_pair(
331 message.transactionId,
332 std::make_unique<TransactionStatus>(message, mTimestampUs)));
333 insert(&mPendingTransactions, message.connectionId,
334 message.transactionId);
335 auto bufferIter = mBuffers.find(message.bufferId);
336 bufferIter->second->mTransactionCount++;
337 } else {
338 if (message.connectionId == found->second->mReceiver) {
339 found->second->mStatus = BufferStatus::TRANSFER_FROM;
340 }
341 }
342 return true;
343}
344
345bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
346 auto found = mTransactions.find(message.transactionId);
347 if (found != mTransactions.end()) {
348 bool deleted = erase(&mPendingTransactions, message.connectionId,
349 message.transactionId);
350 if (deleted) {
351 if (!found->second->mSenderValidated) {
352 mCompletedTransactions.insert(message.transactionId);
353 }
354 auto bufferIter = mBuffers.find(message.bufferId);
355 if (message.newStatus == BufferStatus::TRANSFER_OK) {
356 handleOwnBuffer(message.connectionId, message.bufferId);
357 }
358 bufferIter->second->mTransactionCount--;
359 if (bufferIter->second->mOwnerCount == 0
360 && bufferIter->second->mTransactionCount == 0) {
361 mStats.onBufferUnused(bufferIter->second->mAllocSize);
362 mFreeBuffers.insert(message.bufferId);
363 }
364 mTransactions.erase(found);
365 }
366 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
367 message.bufferId, deleted);
368 return deleted;
369 }
370 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
371 message.bufferId);
372 return false;
373}
374
375void Accessor::Impl::BufferPool::processStatusMessages() {
376 std::vector<BufferStatusMessage> messages;
377 mObserver.getBufferStatusChanges(messages);
378 mTimestampUs = getTimestampNow();
379 for (BufferStatusMessage& message: messages) {
380 bool ret = false;
381 switch (message.newStatus) {
382 case BufferStatus::NOT_USED:
383 ret = handleReleaseBuffer(
384 message.connectionId, message.bufferId);
385 break;
386 case BufferStatus::USED:
387 // not happening
388 break;
389 case BufferStatus::TRANSFER_TO:
390 ret = handleTransferTo(message);
391 break;
392 case BufferStatus::TRANSFER_FROM:
393 ret = handleTransferFrom(message);
394 break;
395 case BufferStatus::TRANSFER_TIMEOUT:
396 // TODO
397 break;
398 case BufferStatus::TRANSFER_LOST:
399 // TODO
400 break;
401 case BufferStatus::TRANSFER_FETCH:
402 // not happening
403 break;
404 case BufferStatus::TRANSFER_OK:
405 case BufferStatus::TRANSFER_ERROR:
406 ret = handleTransferResult(message);
407 break;
408 }
409 if (ret == false) {
410 ALOGW("buffer status message processing failure - message : %d connection : %lld",
411 message.newStatus, (long long)message.connectionId);
412 }
413 }
414 messages.clear();
415}
416
417bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
418 // Cleaning buffers
419 auto buffers = mUsingBuffers.find(connectionId);
420 if (buffers != mUsingBuffers.end()) {
421 for (const BufferId& bufferId : buffers->second) {
422 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
423 if (deleted) {
424 auto bufferIter = mBuffers.find(bufferId);
425 bufferIter->second->mOwnerCount--;
426 if (bufferIter->second->mOwnerCount == 0 &&
427 bufferIter->second->mTransactionCount == 0) {
428 // TODO: handle freebuffer insert fail
429 mStats.onBufferUnused(bufferIter->second->mAllocSize);
430 mFreeBuffers.insert(bufferId);
431 }
432 }
433 }
434 mUsingBuffers.erase(buffers);
435 }
436
437 // Cleaning transactions
438 auto pending = mPendingTransactions.find(connectionId);
439 if (pending != mPendingTransactions.end()) {
440 for (const TransactionId& transactionId : pending->second) {
441 auto iter = mTransactions.find(transactionId);
442 if (iter != mTransactions.end()) {
443 if (!iter->second->mSenderValidated) {
444 mCompletedTransactions.insert(transactionId);
445 }
446 BufferId bufferId = iter->second->mBufferId;
447 auto bufferIter = mBuffers.find(bufferId);
448 bufferIter->second->mTransactionCount--;
449 if (bufferIter->second->mOwnerCount == 0 &&
450 bufferIter->second->mTransactionCount == 0) {
451 // TODO: handle freebuffer insert fail
452 mStats.onBufferUnused(bufferIter->second->mAllocSize);
453 mFreeBuffers.insert(bufferId);
454 }
455 mTransactions.erase(iter);
456 }
457 }
458 }
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700459 mConnectionIds.erase(connectionId);
Sungtak Leed79b6da2018-11-12 17:52:17 -0800460 return true;
461}
462
463bool Accessor::Impl::BufferPool::getFreeBuffer(
464 const std::shared_ptr<BufferPoolAllocator> &allocator,
465 const std::vector<uint8_t> &params, BufferId *pId,
466 const native_handle_t** handle) {
467 auto bufferIt = mFreeBuffers.begin();
468 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
469 BufferId bufferId = *bufferIt;
470 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
471 break;
472 }
473 }
474 if (bufferIt != mFreeBuffers.end()) {
475 BufferId id = *bufferIt;
476 mFreeBuffers.erase(bufferIt);
477 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
478 *handle = mBuffers[id]->handle();
479 *pId = id;
480 ALOGV("recycle a buffer %u %p", id, *handle);
481 return true;
482 }
483 return false;
484}
485
486ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
487 const std::shared_ptr<BufferPoolAllocation> &alloc,
488 const size_t allocSize,
489 const std::vector<uint8_t> &params,
490 BufferId *pId,
491 const native_handle_t** handle) {
492
493 BufferId bufferId = mSeq++;
494 if (mSeq == Connection::SYNC_BUFFERID) {
495 mSeq = 0;
496 }
497 std::unique_ptr<InternalBuffer> buffer =
498 std::make_unique<InternalBuffer>(
499 bufferId, alloc, allocSize, params);
500 if (buffer) {
501 auto res = mBuffers.insert(std::make_pair(
502 bufferId, std::move(buffer)));
503 if (res.second) {
504 mStats.onBufferAllocated(allocSize);
505 *handle = alloc->handle();
506 *pId = bufferId;
507 return ResultStatus::OK;
508 }
509 }
510 return ResultStatus::NO_MEMORY;
511}
512
513void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
514 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
515 mLastCleanUpUs = mTimestampUs;
516 if (mTimestampUs > mLastLogUs + kLogDurationUs) {
517 mLastLogUs = mTimestampUs;
518 ALOGD("bufferpool %p : %zu(%zu size) total buffers - "
519 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
520 "%zu/%zu (fetch/transfer)",
521 this, mStats.mBuffersCached, mStats.mSizeCached,
522 mStats.mBuffersInUse, mStats.mSizeInUse,
523 mStats.mTotalRecycles, mStats.mTotalAllocations,
524 mStats.mTotalFetches, mStats.mTotalTransfers);
525 }
526 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
527 if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction
528 && mBuffers.size() < kMinBufferCountForEviction) {
529 break;
530 }
531 auto it = mBuffers.find(*freeIt);
532 if (it != mBuffers.end() &&
533 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
534 mStats.onBufferEvicted(it->second->mAllocSize);
535 mBuffers.erase(it);
536 freeIt = mFreeBuffers.erase(freeIt);
537 } else {
538 ++freeIt;
539 ALOGW("bufferpool inconsistent!");
540 }
541 }
542 }
543}
544
545} // namespace implementation
546} // namespace V1_0
547} // namespace bufferpool
548} // namespace media
549} // namespace hardware
550} // namespace android