blob: 4cc8abc623f6db096188d9871fd107bad69bce6e [file] [log] [blame]
Sungtak Leebbe37b62018-08-29 15:15:48 -07001/*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "BufferPoolAccessor"
18//#define LOG_NDEBUG 0
19
20#include <sys/types.h>
21#include <time.h>
22#include <unistd.h>
23#include <utils/Log.h>
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070024#include <thread>
Sungtak Leebbe37b62018-08-29 15:15:48 -070025#include "AccessorImpl.h"
26#include "Connection.h"
27
28namespace android {
29namespace hardware {
30namespace media {
31namespace bufferpool {
32namespace V2_0 {
33namespace implementation {
34
namespace {
    // Minimum time between two periodic clean-up passes (0.5 sec).
    // TODO tune
    constexpr int64_t kCleanUpDurationUs = 500 * 1000;
    // Minimum time between two statistics log lines emitted by clean-up (5 secs).
    constexpr int64_t kLogDurationUs = 5 * 1000 * 1000;

    // A non-forced clean-up stops evicting free buffers once the cached size
    // is below kMinAllocBytesForEviction AND the number of buffers is below
    // kMinBufferCountForEviction.
    constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
    constexpr size_t kMinBufferCountForEviction = 40;
}
42
43// Buffer structure in bufferpool process
44struct InternalBuffer {
45 BufferId mId;
46 size_t mOwnerCount;
47 size_t mTransactionCount;
48 const std::shared_ptr<BufferPoolAllocation> mAllocation;
49 const size_t mAllocSize;
50 const std::vector<uint8_t> mConfig;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070051 bool mInvalidated;
Sungtak Leebbe37b62018-08-29 15:15:48 -070052
53 InternalBuffer(
54 BufferId id,
55 const std::shared_ptr<BufferPoolAllocation> &alloc,
56 const size_t allocSize,
57 const std::vector<uint8_t> &allocConfig)
58 : mId(id), mOwnerCount(0), mTransactionCount(0),
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070059 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
60 mInvalidated(false) {}
Sungtak Leebbe37b62018-08-29 15:15:48 -070061
62 const native_handle_t *handle() {
63 return mAllocation->handle();
64 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070065
66 void invalidate() {
67 mInvalidated = true;
68 }
Sungtak Leebbe37b62018-08-29 15:15:48 -070069};
70
71struct TransactionStatus {
72 TransactionId mId;
73 BufferId mBufferId;
74 ConnectionId mSender;
75 ConnectionId mReceiver;
76 BufferStatus mStatus;
77 int64_t mTimestampUs;
78 bool mSenderValidated;
79
80 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
81 mId = message.transactionId;
82 mBufferId = message.bufferId;
83 mStatus = message.newStatus;
84 mTimestampUs = timestampUs;
85 if (mStatus == BufferStatus::TRANSFER_TO) {
86 mSender = message.connectionId;
87 mReceiver = message.targetConnectionId;
88 mSenderValidated = true;
89 } else {
90 mSender = -1LL;
91 mReceiver = message.connectionId;
92 mSenderValidated = false;
93 }
94 }
95};
96
// Helper template methods for handling map of set.

// Adds |value| to the set stored under |key|, creating the set when the key
// is not present yet.
// Returns true when the value was newly added, false when it already existed.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    std::set<U> &values = (*mapOfSet)[key];
    return values.insert(value).second;
}
111
// Removes |value| from the set stored under |key|. When the set ends up
// empty, the key itself is removed from the map as well.
// Returns true only when the value was present and has been removed.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    const bool removed = entry->second.erase(value) > 0;
    if (entry->second.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
126
// Returns true when |key| exists in the map and its set holds |value|.
// The map is not modified.
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    return entry != mapOfSet->end() &&
            entry->second.find(value) != entry->second.end();
}
136
137int32_t Accessor::Impl::sPid = getpid();
138uint32_t Accessor::Impl::sSeqId = time(nullptr);
139
140Accessor::Impl::Impl(
141 const std::shared_ptr<BufferPoolAllocator> &allocator)
142 : mAllocator(allocator) {}
143
144Accessor::Impl::~Impl() {
145}
146
147ResultStatus Accessor::Impl::connect(
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700148 const sp<Accessor> &accessor, const sp<IObserver> &observer,
149 sp<Connection> *connection,
150 ConnectionId *pConnectionId,
151 uint32_t *pMsgId,
152 const StatusDescriptor** statusDescPtr,
153 const InvalidationDescriptor** invDescPtr) {
Sungtak Leebbe37b62018-08-29 15:15:48 -0700154 sp<Connection> newConnection = new Connection();
155 ResultStatus status = ResultStatus::CRITICAL_ERROR;
156 {
157 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
158 if (newConnection) {
159 ConnectionId id = (int64_t)sPid << 32 | sSeqId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700160 status = mBufferPool.mObserver.open(id, statusDescPtr);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700161 if (status == ResultStatus::OK) {
162 newConnection->initialize(accessor, id);
163 *connection = newConnection;
164 *pConnectionId = id;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700165 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
166 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
167 mBufferPool.mInvalidation.onConnect(id, observer);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700168 ++sSeqId;
169 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700170
Sungtak Leebbe37b62018-08-29 15:15:48 -0700171 }
172 mBufferPool.processStatusMessages();
173 mBufferPool.cleanUp();
174 }
175 return status;
176}
177
178ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
179 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
180 mBufferPool.processStatusMessages();
181 mBufferPool.handleClose(connectionId);
182 mBufferPool.mObserver.close(connectionId);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700183 mBufferPool.mInvalidation.onClose(connectionId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700184 // Since close# will be called after all works are finished, it is OK to
185 // evict unused buffers.
186 mBufferPool.cleanUp(true);
187 return ResultStatus::OK;
188}
189
190ResultStatus Accessor::Impl::allocate(
191 ConnectionId connectionId, const std::vector<uint8_t>& params,
192 BufferId *bufferId, const native_handle_t** handle) {
193 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
194 mBufferPool.processStatusMessages();
195 ResultStatus status = ResultStatus::OK;
196 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
197 lock.unlock();
198 std::shared_ptr<BufferPoolAllocation> alloc;
199 size_t allocSize;
200 status = mAllocator->allocate(params, &alloc, &allocSize);
201 lock.lock();
202 if (status == ResultStatus::OK) {
203 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
204 }
205 ALOGV("create a buffer %d : %u %p",
206 status == ResultStatus::OK, *bufferId, *handle);
207 }
208 if (status == ResultStatus::OK) {
209 // TODO: handle ownBuffer failure
210 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
211 }
212 mBufferPool.cleanUp();
213 return status;
214}
215
216ResultStatus Accessor::Impl::fetch(
217 ConnectionId connectionId, TransactionId transactionId,
218 BufferId bufferId, const native_handle_t** handle) {
219 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
220 mBufferPool.processStatusMessages();
221 auto found = mBufferPool.mTransactions.find(transactionId);
222 if (found != mBufferPool.mTransactions.end() &&
223 contains(&mBufferPool.mPendingTransactions,
224 connectionId, transactionId)) {
225 if (found->second->mSenderValidated &&
226 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
227 found->second->mBufferId == bufferId) {
228 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
229 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
230 if (bufferIt != mBufferPool.mBuffers.end()) {
231 mBufferPool.mStats.onBufferFetched();
232 *handle = bufferIt->second->handle();
233 return ResultStatus::OK;
234 }
235 }
236 }
237 mBufferPool.cleanUp();
238 return ResultStatus::CRITICAL_ERROR;
239}
240
241void Accessor::Impl::cleanUp(bool clearCache) {
242 // transaction timeout, buffer cacheing TTL handling
243 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
244 mBufferPool.processStatusMessages();
245 mBufferPool.cleanUp(clearCache);
246}
247
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700248void Accessor::Impl::flush() {
249 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
250 mBufferPool.processStatusMessages();
251 mBufferPool.flush(shared_from_this());
252}
253
254void Accessor::Impl::handleInvalidateAck() {
255 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
256 mBufferPool.processStatusMessages();
257 mBufferPool.mInvalidation.onHandleAck();
258}
259
260bool Accessor::Impl::isValid() {
261 return mBufferPool.isValid();
262}
263
Sungtak Leebbe37b62018-08-29 15:15:48 -0700264Accessor::Impl::Impl::BufferPool::BufferPool()
265 : mTimestampUs(getTimestampNow()),
266 mLastCleanUpUs(mTimestampUs),
267 mLastLogUs(mTimestampUs),
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700268 mSeq(0),
269 mStartSeq(0) {
270 mValid = mInvalidationChannel.isValid();
271}
Sungtak Leebbe37b62018-08-29 15:15:48 -0700272
273
// Statistics helper.
// Returns |base| as a percentage of |total|, rounded to the nearest integer
// by adding 0.5 and truncating. |base| is converted to the type of |total|
// first. Returns 0 when |total| is zero.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    const auto ratio = 100. * static_cast<S>(base) / total;
    return int(0.5 + ratio);
}
279
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700280std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sSeqId(0);
281
Sungtak Leebbe37b62018-08-29 15:15:48 -0700282Accessor::Impl::Impl::BufferPool::~BufferPool() {
283 std::lock_guard<std::mutex> lock(mMutex);
284 ALOGD("Destruction - bufferpool %p "
285 "cached: %zu/%zuM, %zu/%d%% in use; "
286 "allocs: %zu, %d%% recycled; "
287 "transfers: %zu, %d%% unfetced",
288 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
289 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
290 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
291 mStats.mTotalTransfers,
292 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
293}
294
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700295void Accessor::Impl::BufferPool::Invalidation::onConnect(
296 ConnectionId conId, const sp<IObserver>& observer) {
297 mAcks[conId] = mInvalidationId; // starts from current invalidationId
298 mObservers.insert(std::make_pair(conId, observer));
299}
300
301void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
302 mAcks.erase(conId);
303 mObservers.erase(conId);
304}
305
306void Accessor::Impl::BufferPool::Invalidation::onAck(
307 ConnectionId conId,
308 uint32_t msgId) {
309 auto it = mAcks.find(conId);
310 if (it == mAcks.end() || isMessageLater(msgId, it->second)) {
311 mAcks[conId] = msgId;
312 }
313}
314
315void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
316 BufferId bufferId,
317 BufferInvalidationChannel &channel) {
318 for (auto it = mPendings.begin(); it != mPendings.end();) {
319 if (it->invalidate(bufferId)) {
320 it = mPendings.erase(it);
321 uint32_t msgId = 0;
322 if (it->mNeedsAck) {
323 msgId = ++mInvalidationId;
324 if (msgId == 0) {
325 // wrap happens
326 msgId = ++mInvalidationId;
327 }
328 }
329 channel.postInvalidation(msgId, it->mFrom, it->mTo);
330 sInvalidator.addAccessor(mId, it->mImpl);
331 continue;
332 }
333 ++it;
334 }
335}
336
337void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
338 bool needsAck,
339 uint32_t from,
340 uint32_t to,
341 size_t left,
342 BufferInvalidationChannel &channel,
343 const std::shared_ptr<Accessor::Impl> &impl) {
344 if (left == 0) {
345 uint32_t msgId = 0;
346 if (needsAck) {
347 msgId = ++mInvalidationId;
348 if (msgId == 0) {
349 // wrap happens
350 msgId = ++mInvalidationId;
351 }
352 }
353 channel.postInvalidation(msgId, from, to);
354 sInvalidator.addAccessor(mId, impl);
355 } else {
356 // TODO: sending hint message?
357 Pending pending(needsAck, from, to, left, impl);
358 mPendings.push_back(pending);
359 }
360}
361
362void Accessor::Impl::BufferPool::Invalidation::onHandleAck() {
363 if (mInvalidationId != 0) {
364 std::set<int> deads;
365 for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
366 if (it->second != mInvalidationId) {
367 const sp<IObserver> observer = mObservers[it->first].promote();
368 if (observer) {
369 observer->onMessage(it->first, mInvalidationId);
370 } else {
371 deads.insert(it->first);
372 }
373 }
374 }
375 if (deads.size() > 0) {
376 for (auto it = deads.begin(); it != deads.end(); ++it) {
377 onClose(*it);
378 }
379 }
380 }
381 // All invalidation Ids are synced.
382 sInvalidator.delAccessor(mId);
383}
384
Sungtak Leebbe37b62018-08-29 15:15:48 -0700385bool Accessor::Impl::BufferPool::handleOwnBuffer(
386 ConnectionId connectionId, BufferId bufferId) {
387
388 bool added = insert(&mUsingBuffers, connectionId, bufferId);
389 if (added) {
390 auto iter = mBuffers.find(bufferId);
391 iter->second->mOwnerCount++;
392 }
393 insert(&mUsingConnections, bufferId, connectionId);
394 return added;
395}
396
397bool Accessor::Impl::BufferPool::handleReleaseBuffer(
398 ConnectionId connectionId, BufferId bufferId) {
399 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
400 if (deleted) {
401 auto iter = mBuffers.find(bufferId);
402 iter->second->mOwnerCount--;
403 if (iter->second->mOwnerCount == 0 &&
404 iter->second->mTransactionCount == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700405 if (!iter->second->mInvalidated) {
406 mStats.onBufferUnused(iter->second->mAllocSize);
407 mFreeBuffers.insert(bufferId);
408 } else {
409 mStats.onBufferUnused(iter->second->mAllocSize);
410 mStats.onBufferEvicted(iter->second->mAllocSize);
411 mBuffers.erase(iter);
412 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
413 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700414 }
415 }
416 erase(&mUsingConnections, bufferId, connectionId);
417 ALOGV("release buffer %u : %d", bufferId, deleted);
418 return deleted;
419}
420
421bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
422 auto completed = mCompletedTransactions.find(
423 message.transactionId);
424 if (completed != mCompletedTransactions.end()) {
425 // already completed
426 mCompletedTransactions.erase(completed);
427 return true;
428 }
429 // the buffer should exist and be owned.
430 auto bufferIter = mBuffers.find(message.bufferId);
431 if (bufferIter == mBuffers.end() ||
432 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
433 return false;
434 }
435 auto found = mTransactions.find(message.transactionId);
436 if (found != mTransactions.end()) {
437 // transfer_from was received earlier.
438 found->second->mSender = message.connectionId;
439 found->second->mSenderValidated = true;
440 return true;
441 }
442 // TODO: verify there is target connection Id
443 mStats.onBufferSent();
444 mTransactions.insert(std::make_pair(
445 message.transactionId,
446 std::make_unique<TransactionStatus>(message, mTimestampUs)));
447 insert(&mPendingTransactions, message.targetConnectionId,
448 message.transactionId);
449 bufferIter->second->mTransactionCount++;
450 return true;
451}
452
453bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
454 auto found = mTransactions.find(message.transactionId);
455 if (found == mTransactions.end()) {
456 // TODO: is it feasible to check ownership here?
457 mStats.onBufferSent();
458 mTransactions.insert(std::make_pair(
459 message.transactionId,
460 std::make_unique<TransactionStatus>(message, mTimestampUs)));
461 insert(&mPendingTransactions, message.connectionId,
462 message.transactionId);
463 auto bufferIter = mBuffers.find(message.bufferId);
464 bufferIter->second->mTransactionCount++;
465 } else {
466 if (message.connectionId == found->second->mReceiver) {
467 found->second->mStatus = BufferStatus::TRANSFER_FROM;
468 }
469 }
470 return true;
471}
472
473bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
474 auto found = mTransactions.find(message.transactionId);
475 if (found != mTransactions.end()) {
476 bool deleted = erase(&mPendingTransactions, message.connectionId,
477 message.transactionId);
478 if (deleted) {
479 if (!found->second->mSenderValidated) {
480 mCompletedTransactions.insert(message.transactionId);
481 }
482 auto bufferIter = mBuffers.find(message.bufferId);
483 if (message.newStatus == BufferStatus::TRANSFER_OK) {
484 handleOwnBuffer(message.connectionId, message.bufferId);
485 }
486 bufferIter->second->mTransactionCount--;
487 if (bufferIter->second->mOwnerCount == 0
488 && bufferIter->second->mTransactionCount == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700489 if (!bufferIter->second->mInvalidated) {
490 mStats.onBufferUnused(bufferIter->second->mAllocSize);
491 mFreeBuffers.insert(message.bufferId);
492 } else {
493 mStats.onBufferUnused(bufferIter->second->mAllocSize);
494 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
495 mBuffers.erase(bufferIter);
496 mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
497 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700498 }
499 mTransactions.erase(found);
500 }
501 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
502 message.bufferId, deleted);
503 return deleted;
504 }
505 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
506 message.bufferId);
507 return false;
508}
509
510void Accessor::Impl::BufferPool::processStatusMessages() {
511 std::vector<BufferStatusMessage> messages;
512 mObserver.getBufferStatusChanges(messages);
513 mTimestampUs = getTimestampNow();
514 for (BufferStatusMessage& message: messages) {
515 bool ret = false;
516 switch (message.newStatus) {
517 case BufferStatus::NOT_USED:
518 ret = handleReleaseBuffer(
519 message.connectionId, message.bufferId);
520 break;
521 case BufferStatus::USED:
522 // not happening
523 break;
524 case BufferStatus::TRANSFER_TO:
525 ret = handleTransferTo(message);
526 break;
527 case BufferStatus::TRANSFER_FROM:
528 ret = handleTransferFrom(message);
529 break;
530 case BufferStatus::TRANSFER_TIMEOUT:
531 // TODO
532 break;
533 case BufferStatus::TRANSFER_LOST:
534 // TODO
535 break;
536 case BufferStatus::TRANSFER_FETCH:
537 // not happening
538 break;
539 case BufferStatus::TRANSFER_OK:
540 case BufferStatus::TRANSFER_ERROR:
541 ret = handleTransferResult(message);
542 break;
Sungtak Leed491f1f2018-10-05 15:56:56 -0700543 case BufferStatus::INVALIDATION_ACK:
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700544 mInvalidation.onAck(message.connectionId, message.bufferId);
Sungtak Leed491f1f2018-10-05 15:56:56 -0700545 break;
Sungtak Leebbe37b62018-08-29 15:15:48 -0700546 }
547 if (ret == false) {
548 ALOGW("buffer status message processing failure - message : %d connection : %lld",
549 message.newStatus, (long long)message.connectionId);
550 }
551 }
552 messages.clear();
553}
554
555bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
556 // Cleaning buffers
557 auto buffers = mUsingBuffers.find(connectionId);
558 if (buffers != mUsingBuffers.end()) {
559 for (const BufferId& bufferId : buffers->second) {
560 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
561 if (deleted) {
562 auto bufferIter = mBuffers.find(bufferId);
563 bufferIter->second->mOwnerCount--;
564 if (bufferIter->second->mOwnerCount == 0 &&
565 bufferIter->second->mTransactionCount == 0) {
566 // TODO: handle freebuffer insert fail
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700567 if (!bufferIter->second->mInvalidated) {
568 mStats.onBufferUnused(bufferIter->second->mAllocSize);
569 mFreeBuffers.insert(bufferId);
570 } else {
571 mStats.onBufferUnused(bufferIter->second->mAllocSize);
572 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
573 mBuffers.erase(bufferIter);
574 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
575 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700576 }
577 }
578 }
579 mUsingBuffers.erase(buffers);
580 }
581
582 // Cleaning transactions
583 auto pending = mPendingTransactions.find(connectionId);
584 if (pending != mPendingTransactions.end()) {
585 for (const TransactionId& transactionId : pending->second) {
586 auto iter = mTransactions.find(transactionId);
587 if (iter != mTransactions.end()) {
588 if (!iter->second->mSenderValidated) {
589 mCompletedTransactions.insert(transactionId);
590 }
591 BufferId bufferId = iter->second->mBufferId;
592 auto bufferIter = mBuffers.find(bufferId);
593 bufferIter->second->mTransactionCount--;
594 if (bufferIter->second->mOwnerCount == 0 &&
595 bufferIter->second->mTransactionCount == 0) {
596 // TODO: handle freebuffer insert fail
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700597 if (!bufferIter->second->mInvalidated) {
598 mStats.onBufferUnused(bufferIter->second->mAllocSize);
599 mFreeBuffers.insert(bufferId);
600 } else {
601 mStats.onBufferUnused(bufferIter->second->mAllocSize);
602 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
603 mBuffers.erase(bufferIter);
604 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
605 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700606 }
607 mTransactions.erase(iter);
608 }
609 }
610 }
611 return true;
612}
613
614bool Accessor::Impl::BufferPool::getFreeBuffer(
615 const std::shared_ptr<BufferPoolAllocator> &allocator,
616 const std::vector<uint8_t> &params, BufferId *pId,
617 const native_handle_t** handle) {
618 auto bufferIt = mFreeBuffers.begin();
619 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
620 BufferId bufferId = *bufferIt;
621 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
622 break;
623 }
624 }
625 if (bufferIt != mFreeBuffers.end()) {
626 BufferId id = *bufferIt;
627 mFreeBuffers.erase(bufferIt);
628 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
629 *handle = mBuffers[id]->handle();
630 *pId = id;
631 ALOGV("recycle a buffer %u %p", id, *handle);
632 return true;
633 }
634 return false;
635}
636
637ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
638 const std::shared_ptr<BufferPoolAllocation> &alloc,
639 const size_t allocSize,
640 const std::vector<uint8_t> &params,
641 BufferId *pId,
642 const native_handle_t** handle) {
643
644 BufferId bufferId = mSeq++;
645 if (mSeq == Connection::SYNC_BUFFERID) {
646 mSeq = 0;
647 }
648 std::unique_ptr<InternalBuffer> buffer =
649 std::make_unique<InternalBuffer>(
650 bufferId, alloc, allocSize, params);
651 if (buffer) {
652 auto res = mBuffers.insert(std::make_pair(
653 bufferId, std::move(buffer)));
654 if (res.second) {
655 mStats.onBufferAllocated(allocSize);
656 *handle = alloc->handle();
657 *pId = bufferId;
658 return ResultStatus::OK;
659 }
660 }
661 return ResultStatus::NO_MEMORY;
662}
663
664void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
665 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
666 mLastCleanUpUs = mTimestampUs;
667 if (mTimestampUs > mLastLogUs + kLogDurationUs) {
668 mLastLogUs = mTimestampUs;
669 ALOGD("bufferpool %p : %zu(%zu size) total buffers - "
670 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
671 "%zu/%zu (fetch/transfer)",
672 this, mStats.mBuffersCached, mStats.mSizeCached,
673 mStats.mBuffersInUse, mStats.mSizeInUse,
674 mStats.mTotalRecycles, mStats.mTotalAllocations,
675 mStats.mTotalFetches, mStats.mTotalTransfers);
676 }
677 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
678 if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction
679 && mBuffers.size() < kMinBufferCountForEviction) {
680 break;
681 }
682 auto it = mBuffers.find(*freeIt);
683 if (it != mBuffers.end() &&
684 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
685 mStats.onBufferEvicted(it->second->mAllocSize);
686 mBuffers.erase(it);
687 freeIt = mFreeBuffers.erase(freeIt);
688 } else {
689 ++freeIt;
690 ALOGW("bufferpool inconsistent!");
691 }
692 }
693 }
694}
695
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700696void Accessor::Impl::BufferPool::invalidate(
697 bool needsAck, BufferId from, BufferId to,
698 const std::shared_ptr<Accessor::Impl> &impl) {
699 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
700 if (isBufferInRange(from, to, *freeIt)) {
701 auto it = mBuffers.find(*freeIt);
702 if (it != mBuffers.end() &&
703 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
704 mStats.onBufferEvicted(it->second->mAllocSize);
705 mBuffers.erase(it);
706 freeIt = mFreeBuffers.erase(freeIt);
707 continue;
708 } else {
709 ALOGW("bufferpool inconsistent!");
710 }
711 }
712 ++freeIt;
713 }
714
715 size_t left = 0;
716 for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
717 if (isBufferInRange(from, to, it->first)) {
718 it->second->invalidate();
719 ++left;
720 }
721 }
722 mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
723}
724
725void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
726 BufferId from = mStartSeq;
727 BufferId to = mSeq;
728 mStartSeq = mSeq;
729 // TODO: needsAck params
730 if (from != to) {
731 invalidate(true, from, to, impl);
732 }
733}
734
735void Accessor::Impl::invalidatorThread(
736 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
737 std::mutex &mutex,
738 std::condition_variable &cv,
739 bool &ready) {
740 while(true) {
741 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
742 {
743 std::unique_lock<std::mutex> lock(mutex);
744 if (!ready) {
745 cv.wait(lock);
746 }
747 copied.insert(accessors.begin(), accessors.end());
748 }
749 std::list<ConnectionId> erased;
750 for (auto it = copied.begin(); it != copied.end(); ++it) {
751 const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
752 if (!impl) {
753 erased.push_back(it->first);
754 } else {
755 impl->handleInvalidateAck();
756 }
757 }
758 {
759 std::unique_lock<std::mutex> lock(mutex);
760 for (auto it = erased.begin(); it != erased.end(); ++it) {
761 accessors.erase(*it);
762 }
763 if (accessors.size() == 0) {
764 ready = false;
765 } else {
766 // prevent draining cpu.
767 lock.unlock();
768 std::this_thread::yield();
769 }
770 }
771 }
772}
773
774Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
775 std::thread invalidator(
776 invalidatorThread,
777 std::ref(mAccessors),
778 std::ref(mMutex),
779 std::ref(mCv),
780 std::ref(mReady));
781 invalidator.detach();
782}
783
784void Accessor::Impl::AccessorInvalidator::addAccessor(
785 uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
786 bool notify = false;
787 std::unique_lock<std::mutex> lock(mMutex);
788 if (mAccessors.find(accessorId) == mAccessors.end()) {
789 if (!mReady) {
790 mReady = true;
791 notify = true;
792 }
793 mAccessors.insert(std::make_pair(accessorId, impl));
794 }
795 lock.unlock();
796 if (notify) {
797 mCv.notify_one();
798 }
799}
800
801void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
802 std::lock_guard<std::mutex> lock(mMutex);
803 mAccessors.erase(accessorId);
804 if (mAccessors.size() == 0) {
805 mReady = false;
806 }
807}
808
809Accessor::Impl::AccessorInvalidator Accessor::Impl::sInvalidator;
810
Sungtak Leebbe37b62018-08-29 15:15:48 -0700811} // namespace implementation
812} // namespace V2_0
813} // namespace bufferpool
814} // namespace media
815} // namespace hardware
816} // namespace android