blob: 526090943cb95e2903d0bc57b477ae639a41baa1 [file] [log] [blame]
Sungtak Leebbe37b62018-08-29 15:15:48 -07001/*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "BufferPoolAccessor"
18//#define LOG_NDEBUG 0
19
20#include <sys/types.h>
21#include <time.h>
22#include <unistd.h>
23#include <utils/Log.h>
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070024#include <thread>
Sungtak Leebbe37b62018-08-29 15:15:48 -070025#include "AccessorImpl.h"
26#include "Connection.h"
27
28namespace android {
29namespace hardware {
30namespace media {
31namespace bufferpool {
32namespace V2_0 {
33namespace implementation {
34
namespace {
    // Minimum time between two clean-up passes of the buffer cache
    // (0.5 sec). TODO: tune.
    constexpr int64_t kCleanUpDurationUs = 500 * 1000;
    // Minimum time between two statistics log lines (5 secs).
    constexpr int64_t kLogDurationUs = 5 * 1000 * 1000;

    // A periodic clean-up stops evicting free buffers while the cached size
    // is below kMinAllocBytesForEviction and the number of tracked buffers
    // is below kMinBufferCountForEviction.
    constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
    constexpr size_t kMinBufferCountForEviction = 40;
}
42
43// Buffer structure in bufferpool process
44struct InternalBuffer {
45 BufferId mId;
46 size_t mOwnerCount;
47 size_t mTransactionCount;
48 const std::shared_ptr<BufferPoolAllocation> mAllocation;
49 const size_t mAllocSize;
50 const std::vector<uint8_t> mConfig;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070051 bool mInvalidated;
Sungtak Leebbe37b62018-08-29 15:15:48 -070052
53 InternalBuffer(
54 BufferId id,
55 const std::shared_ptr<BufferPoolAllocation> &alloc,
56 const size_t allocSize,
57 const std::vector<uint8_t> &allocConfig)
58 : mId(id), mOwnerCount(0), mTransactionCount(0),
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070059 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
60 mInvalidated(false) {}
Sungtak Leebbe37b62018-08-29 15:15:48 -070061
62 const native_handle_t *handle() {
63 return mAllocation->handle();
64 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070065
66 void invalidate() {
67 mInvalidated = true;
68 }
Sungtak Leebbe37b62018-08-29 15:15:48 -070069};
70
71struct TransactionStatus {
72 TransactionId mId;
73 BufferId mBufferId;
74 ConnectionId mSender;
75 ConnectionId mReceiver;
76 BufferStatus mStatus;
77 int64_t mTimestampUs;
78 bool mSenderValidated;
79
80 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
81 mId = message.transactionId;
82 mBufferId = message.bufferId;
83 mStatus = message.newStatus;
84 mTimestampUs = timestampUs;
85 if (mStatus == BufferStatus::TRANSFER_TO) {
86 mSender = message.connectionId;
87 mReceiver = message.targetConnectionId;
88 mSenderValidated = true;
89 } else {
90 mSender = -1LL;
91 mReceiver = message.connectionId;
92 mSenderValidated = false;
93 }
94 }
95};
96
// Helper template methods for handling map of set.

// Adds |value| to the set stored under |key|, creating the set on demand.
// Returns true when the value was not present before.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    // operator[] materializes an empty set for an unknown key; set::insert
    // reports through .second whether the element was actually added.
    return (*mapOfSet)[key].insert(value).second;
}
111
// Removes |value| from the set stored under |key|. A set that is empty
// afterwards is dropped from the map. Returns true when the value was
// actually removed.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    std::set<U> &values = entry->second;
    const bool removed = values.erase(value) != 0;
    if (values.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
126
// Returns true when |value| is a member of the set stored under |key|.
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    return entry != mapOfSet->end() && entry->second.count(value) != 0;
}
136
137int32_t Accessor::Impl::sPid = getpid();
138uint32_t Accessor::Impl::sSeqId = time(nullptr);
139
140Accessor::Impl::Impl(
141 const std::shared_ptr<BufferPoolAllocator> &allocator)
142 : mAllocator(allocator) {}
143
144Accessor::Impl::~Impl() {
145}
146
147ResultStatus Accessor::Impl::connect(
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700148 const sp<Accessor> &accessor, const sp<IObserver> &observer,
149 sp<Connection> *connection,
150 ConnectionId *pConnectionId,
151 uint32_t *pMsgId,
152 const StatusDescriptor** statusDescPtr,
153 const InvalidationDescriptor** invDescPtr) {
Sungtak Leebbe37b62018-08-29 15:15:48 -0700154 sp<Connection> newConnection = new Connection();
155 ResultStatus status = ResultStatus::CRITICAL_ERROR;
156 {
157 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
158 if (newConnection) {
159 ConnectionId id = (int64_t)sPid << 32 | sSeqId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700160 status = mBufferPool.mObserver.open(id, statusDescPtr);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700161 if (status == ResultStatus::OK) {
162 newConnection->initialize(accessor, id);
163 *connection = newConnection;
164 *pConnectionId = id;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700165 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
166 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
167 mBufferPool.mInvalidation.onConnect(id, observer);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700168 ++sSeqId;
169 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700170
Sungtak Leebbe37b62018-08-29 15:15:48 -0700171 }
172 mBufferPool.processStatusMessages();
173 mBufferPool.cleanUp();
174 }
175 return status;
176}
177
178ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
179 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
Sungtak Leed3128382018-11-07 17:30:37 -0800180 ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700181 mBufferPool.processStatusMessages();
182 mBufferPool.handleClose(connectionId);
183 mBufferPool.mObserver.close(connectionId);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700184 mBufferPool.mInvalidation.onClose(connectionId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700185 // Since close# will be called after all works are finished, it is OK to
186 // evict unused buffers.
187 mBufferPool.cleanUp(true);
188 return ResultStatus::OK;
189}
190
191ResultStatus Accessor::Impl::allocate(
192 ConnectionId connectionId, const std::vector<uint8_t>& params,
193 BufferId *bufferId, const native_handle_t** handle) {
194 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
195 mBufferPool.processStatusMessages();
196 ResultStatus status = ResultStatus::OK;
197 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
198 lock.unlock();
199 std::shared_ptr<BufferPoolAllocation> alloc;
200 size_t allocSize;
201 status = mAllocator->allocate(params, &alloc, &allocSize);
202 lock.lock();
203 if (status == ResultStatus::OK) {
204 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
205 }
206 ALOGV("create a buffer %d : %u %p",
207 status == ResultStatus::OK, *bufferId, *handle);
208 }
209 if (status == ResultStatus::OK) {
210 // TODO: handle ownBuffer failure
211 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
212 }
213 mBufferPool.cleanUp();
214 return status;
215}
216
217ResultStatus Accessor::Impl::fetch(
218 ConnectionId connectionId, TransactionId transactionId,
219 BufferId bufferId, const native_handle_t** handle) {
220 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
221 mBufferPool.processStatusMessages();
222 auto found = mBufferPool.mTransactions.find(transactionId);
223 if (found != mBufferPool.mTransactions.end() &&
224 contains(&mBufferPool.mPendingTransactions,
225 connectionId, transactionId)) {
226 if (found->second->mSenderValidated &&
227 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
228 found->second->mBufferId == bufferId) {
229 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
230 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
231 if (bufferIt != mBufferPool.mBuffers.end()) {
232 mBufferPool.mStats.onBufferFetched();
233 *handle = bufferIt->second->handle();
234 return ResultStatus::OK;
235 }
236 }
237 }
238 mBufferPool.cleanUp();
239 return ResultStatus::CRITICAL_ERROR;
240}
241
242void Accessor::Impl::cleanUp(bool clearCache) {
243 // transaction timeout, buffer cacheing TTL handling
244 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
245 mBufferPool.processStatusMessages();
246 mBufferPool.cleanUp(clearCache);
247}
248
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700249void Accessor::Impl::flush() {
250 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
251 mBufferPool.processStatusMessages();
252 mBufferPool.flush(shared_from_this());
253}
254
255void Accessor::Impl::handleInvalidateAck() {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800256 std::map<ConnectionId, const sp<IObserver>> observers;
257 uint32_t invalidationId;
258 {
259 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
260 mBufferPool.processStatusMessages();
261 mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
262 }
263 // Do not hold lock for send invalidations
264 for (auto it = observers.begin(); it != observers.end(); ++it) {
265 const sp<IObserver> observer = it->second;
266 if (observer) {
267 Return<void> transResult = observer->onMessage(it->first, invalidationId);
268 (void) transResult;
269 }
270 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700271}
272
273bool Accessor::Impl::isValid() {
274 return mBufferPool.isValid();
275}
276
Sungtak Leebbe37b62018-08-29 15:15:48 -0700277Accessor::Impl::Impl::BufferPool::BufferPool()
278 : mTimestampUs(getTimestampNow()),
279 mLastCleanUpUs(mTimestampUs),
280 mLastLogUs(mTimestampUs),
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700281 mSeq(0),
282 mStartSeq(0) {
283 mValid = mInvalidationChannel.isValid();
284}
Sungtak Leebbe37b62018-08-29 15:15:48 -0700285
286
// Statistics helper.
//
// Returns |base| as a percentage of |total|, rounded to the nearest integer
// for non-negative ratios, or 0 when |total| is zero. Both operands are
// widened to double before dividing, so |base| is never narrowed to the type
// of |total| (e.g. a fractional or larger-than-S base keeps its value).
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    const double ratio = static_cast<double>(base) / static_cast<double>(total);
    return static_cast<int>(0.5 + 100. * ratio);
}
292
Sungtak Leed3128382018-11-07 17:30:37 -0800293std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700294
Sungtak Leebbe37b62018-08-29 15:15:48 -0700295Accessor::Impl::Impl::BufferPool::~BufferPool() {
296 std::lock_guard<std::mutex> lock(mMutex);
Sungtak Leed3318082018-09-07 15:52:43 -0700297 ALOGD("Destruction - bufferpool2 %p "
Sungtak Leebbe37b62018-08-29 15:15:48 -0700298 "cached: %zu/%zuM, %zu/%d%% in use; "
299 "allocs: %zu, %d%% recycled; "
300 "transfers: %zu, %d%% unfetced",
301 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
302 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
303 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
304 mStats.mTotalTransfers,
305 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
306}
307
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700308void Accessor::Impl::BufferPool::Invalidation::onConnect(
309 ConnectionId conId, const sp<IObserver>& observer) {
310 mAcks[conId] = mInvalidationId; // starts from current invalidationId
311 mObservers.insert(std::make_pair(conId, observer));
312}
313
314void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
315 mAcks.erase(conId);
316 mObservers.erase(conId);
317}
318
319void Accessor::Impl::BufferPool::Invalidation::onAck(
320 ConnectionId conId,
321 uint32_t msgId) {
322 auto it = mAcks.find(conId);
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800323 if (it == mAcks.end()) {
324 ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
325 return;
326 }
327 if (isMessageLater(msgId, it->second)) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700328 mAcks[conId] = msgId;
329 }
330}
331
332void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
333 BufferId bufferId,
334 BufferInvalidationChannel &channel) {
335 for (auto it = mPendings.begin(); it != mPendings.end();) {
Sungtak Leed3128382018-11-07 17:30:37 -0800336 if (it->isInvalidated(bufferId)) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700337 uint32_t msgId = 0;
338 if (it->mNeedsAck) {
339 msgId = ++mInvalidationId;
340 if (msgId == 0) {
341 // wrap happens
342 msgId = ++mInvalidationId;
343 }
344 }
345 channel.postInvalidation(msgId, it->mFrom, it->mTo);
Sungtak Leed3128382018-11-07 17:30:37 -0800346 it = mPendings.erase(it);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700347 continue;
348 }
349 ++it;
350 }
351}
352
353void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
354 bool needsAck,
355 uint32_t from,
356 uint32_t to,
357 size_t left,
358 BufferInvalidationChannel &channel,
359 const std::shared_ptr<Accessor::Impl> &impl) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700360 uint32_t msgId = 0;
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800361 if (needsAck) {
362 msgId = ++mInvalidationId;
363 if (msgId == 0) {
364 // wrap happens
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700365 msgId = ++mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700366 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800367 }
Sungtak Leed3318082018-09-07 15:52:43 -0700368 ALOGV("bufferpool2 invalidation requested and queued");
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800369 if (left == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700370 channel.postInvalidation(msgId, from, to);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700371 } else {
372 // TODO: sending hint message?
Sungtak Leed3318082018-09-07 15:52:43 -0700373 ALOGV("bufferpoo2 invalidation requested and pending");
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700374 Pending pending(needsAck, from, to, left, impl);
375 mPendings.push_back(pending);
376 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800377 sInvalidator->addAccessor(mId, impl);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700378}
379
Sungtak Leebe6a1182018-12-17 19:00:40 -0800380void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
381 std::map<ConnectionId, const sp<IObserver>> *observers,
382 uint32_t *invalidationId) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700383 if (mInvalidationId != 0) {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800384 *invalidationId = mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700385 std::set<int> deads;
386 for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
387 if (it->second != mInvalidationId) {
Sungtak Leed3128382018-11-07 17:30:37 -0800388 const sp<IObserver> observer = mObservers[it->first];
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700389 if (observer) {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800390 observers->emplace(it->first, observer);
391 ALOGV("connection %lld will call observer (%u: %u)",
Sungtak Leed3128382018-11-07 17:30:37 -0800392 (long long)it->first, it->second, mInvalidationId);
Sungtak Leebe6a1182018-12-17 19:00:40 -0800393 // N.B: onMessage will be called later. ignore possibility of
394 // onMessage# oneway call being lost.
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800395 it->second = mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700396 } else {
Sungtak Leed3318082018-09-07 15:52:43 -0700397 ALOGV("bufferpool2 observer died %lld", (long long)it->first);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700398 deads.insert(it->first);
399 }
400 }
401 }
402 if (deads.size() > 0) {
403 for (auto it = deads.begin(); it != deads.end(); ++it) {
404 onClose(*it);
405 }
406 }
407 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800408 if (mPendings.size() == 0) {
409 // All invalidation Ids are synced and no more pending invalidations.
410 sInvalidator->delAccessor(mId);
411 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700412}
413
Sungtak Leebbe37b62018-08-29 15:15:48 -0700414bool Accessor::Impl::BufferPool::handleOwnBuffer(
415 ConnectionId connectionId, BufferId bufferId) {
416
417 bool added = insert(&mUsingBuffers, connectionId, bufferId);
418 if (added) {
419 auto iter = mBuffers.find(bufferId);
420 iter->second->mOwnerCount++;
421 }
422 insert(&mUsingConnections, bufferId, connectionId);
423 return added;
424}
425
426bool Accessor::Impl::BufferPool::handleReleaseBuffer(
427 ConnectionId connectionId, BufferId bufferId) {
428 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
429 if (deleted) {
430 auto iter = mBuffers.find(bufferId);
431 iter->second->mOwnerCount--;
432 if (iter->second->mOwnerCount == 0 &&
433 iter->second->mTransactionCount == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700434 if (!iter->second->mInvalidated) {
435 mStats.onBufferUnused(iter->second->mAllocSize);
436 mFreeBuffers.insert(bufferId);
437 } else {
438 mStats.onBufferUnused(iter->second->mAllocSize);
439 mStats.onBufferEvicted(iter->second->mAllocSize);
440 mBuffers.erase(iter);
441 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
442 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700443 }
444 }
445 erase(&mUsingConnections, bufferId, connectionId);
446 ALOGV("release buffer %u : %d", bufferId, deleted);
447 return deleted;
448}
449
450bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
451 auto completed = mCompletedTransactions.find(
452 message.transactionId);
453 if (completed != mCompletedTransactions.end()) {
454 // already completed
455 mCompletedTransactions.erase(completed);
456 return true;
457 }
458 // the buffer should exist and be owned.
459 auto bufferIter = mBuffers.find(message.bufferId);
460 if (bufferIter == mBuffers.end() ||
461 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
462 return false;
463 }
464 auto found = mTransactions.find(message.transactionId);
465 if (found != mTransactions.end()) {
466 // transfer_from was received earlier.
467 found->second->mSender = message.connectionId;
468 found->second->mSenderValidated = true;
469 return true;
470 }
471 // TODO: verify there is target connection Id
472 mStats.onBufferSent();
473 mTransactions.insert(std::make_pair(
474 message.transactionId,
475 std::make_unique<TransactionStatus>(message, mTimestampUs)));
476 insert(&mPendingTransactions, message.targetConnectionId,
477 message.transactionId);
478 bufferIter->second->mTransactionCount++;
479 return true;
480}
481
482bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
483 auto found = mTransactions.find(message.transactionId);
484 if (found == mTransactions.end()) {
485 // TODO: is it feasible to check ownership here?
486 mStats.onBufferSent();
487 mTransactions.insert(std::make_pair(
488 message.transactionId,
489 std::make_unique<TransactionStatus>(message, mTimestampUs)));
490 insert(&mPendingTransactions, message.connectionId,
491 message.transactionId);
492 auto bufferIter = mBuffers.find(message.bufferId);
493 bufferIter->second->mTransactionCount++;
494 } else {
495 if (message.connectionId == found->second->mReceiver) {
496 found->second->mStatus = BufferStatus::TRANSFER_FROM;
497 }
498 }
499 return true;
500}
501
502bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
503 auto found = mTransactions.find(message.transactionId);
504 if (found != mTransactions.end()) {
505 bool deleted = erase(&mPendingTransactions, message.connectionId,
506 message.transactionId);
507 if (deleted) {
508 if (!found->second->mSenderValidated) {
509 mCompletedTransactions.insert(message.transactionId);
510 }
511 auto bufferIter = mBuffers.find(message.bufferId);
512 if (message.newStatus == BufferStatus::TRANSFER_OK) {
513 handleOwnBuffer(message.connectionId, message.bufferId);
514 }
515 bufferIter->second->mTransactionCount--;
516 if (bufferIter->second->mOwnerCount == 0
517 && bufferIter->second->mTransactionCount == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700518 if (!bufferIter->second->mInvalidated) {
519 mStats.onBufferUnused(bufferIter->second->mAllocSize);
520 mFreeBuffers.insert(message.bufferId);
521 } else {
522 mStats.onBufferUnused(bufferIter->second->mAllocSize);
523 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
524 mBuffers.erase(bufferIter);
525 mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
526 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700527 }
528 mTransactions.erase(found);
529 }
530 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
531 message.bufferId, deleted);
532 return deleted;
533 }
534 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
535 message.bufferId);
536 return false;
537}
538
539void Accessor::Impl::BufferPool::processStatusMessages() {
540 std::vector<BufferStatusMessage> messages;
541 mObserver.getBufferStatusChanges(messages);
542 mTimestampUs = getTimestampNow();
543 for (BufferStatusMessage& message: messages) {
544 bool ret = false;
545 switch (message.newStatus) {
546 case BufferStatus::NOT_USED:
547 ret = handleReleaseBuffer(
548 message.connectionId, message.bufferId);
549 break;
550 case BufferStatus::USED:
551 // not happening
552 break;
553 case BufferStatus::TRANSFER_TO:
554 ret = handleTransferTo(message);
555 break;
556 case BufferStatus::TRANSFER_FROM:
557 ret = handleTransferFrom(message);
558 break;
559 case BufferStatus::TRANSFER_TIMEOUT:
560 // TODO
561 break;
562 case BufferStatus::TRANSFER_LOST:
563 // TODO
564 break;
565 case BufferStatus::TRANSFER_FETCH:
566 // not happening
567 break;
568 case BufferStatus::TRANSFER_OK:
569 case BufferStatus::TRANSFER_ERROR:
570 ret = handleTransferResult(message);
571 break;
Sungtak Leed491f1f2018-10-05 15:56:56 -0700572 case BufferStatus::INVALIDATION_ACK:
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700573 mInvalidation.onAck(message.connectionId, message.bufferId);
Sungtak Leed3128382018-11-07 17:30:37 -0800574 ret = true;
Sungtak Leed491f1f2018-10-05 15:56:56 -0700575 break;
Sungtak Leebbe37b62018-08-29 15:15:48 -0700576 }
577 if (ret == false) {
578 ALOGW("buffer status message processing failure - message : %d connection : %lld",
579 message.newStatus, (long long)message.connectionId);
580 }
581 }
582 messages.clear();
583}
584
585bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
586 // Cleaning buffers
587 auto buffers = mUsingBuffers.find(connectionId);
588 if (buffers != mUsingBuffers.end()) {
589 for (const BufferId& bufferId : buffers->second) {
590 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
591 if (deleted) {
592 auto bufferIter = mBuffers.find(bufferId);
593 bufferIter->second->mOwnerCount--;
594 if (bufferIter->second->mOwnerCount == 0 &&
595 bufferIter->second->mTransactionCount == 0) {
596 // TODO: handle freebuffer insert fail
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700597 if (!bufferIter->second->mInvalidated) {
598 mStats.onBufferUnused(bufferIter->second->mAllocSize);
599 mFreeBuffers.insert(bufferId);
600 } else {
601 mStats.onBufferUnused(bufferIter->second->mAllocSize);
602 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
603 mBuffers.erase(bufferIter);
604 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
605 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700606 }
607 }
608 }
609 mUsingBuffers.erase(buffers);
610 }
611
612 // Cleaning transactions
613 auto pending = mPendingTransactions.find(connectionId);
614 if (pending != mPendingTransactions.end()) {
615 for (const TransactionId& transactionId : pending->second) {
616 auto iter = mTransactions.find(transactionId);
617 if (iter != mTransactions.end()) {
618 if (!iter->second->mSenderValidated) {
619 mCompletedTransactions.insert(transactionId);
620 }
621 BufferId bufferId = iter->second->mBufferId;
622 auto bufferIter = mBuffers.find(bufferId);
623 bufferIter->second->mTransactionCount--;
624 if (bufferIter->second->mOwnerCount == 0 &&
625 bufferIter->second->mTransactionCount == 0) {
626 // TODO: handle freebuffer insert fail
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700627 if (!bufferIter->second->mInvalidated) {
628 mStats.onBufferUnused(bufferIter->second->mAllocSize);
629 mFreeBuffers.insert(bufferId);
630 } else {
631 mStats.onBufferUnused(bufferIter->second->mAllocSize);
632 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
633 mBuffers.erase(bufferIter);
634 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
635 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700636 }
637 mTransactions.erase(iter);
638 }
639 }
640 }
641 return true;
642}
643
644bool Accessor::Impl::BufferPool::getFreeBuffer(
645 const std::shared_ptr<BufferPoolAllocator> &allocator,
646 const std::vector<uint8_t> &params, BufferId *pId,
647 const native_handle_t** handle) {
648 auto bufferIt = mFreeBuffers.begin();
649 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
650 BufferId bufferId = *bufferIt;
651 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
652 break;
653 }
654 }
655 if (bufferIt != mFreeBuffers.end()) {
656 BufferId id = *bufferIt;
657 mFreeBuffers.erase(bufferIt);
658 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
659 *handle = mBuffers[id]->handle();
660 *pId = id;
661 ALOGV("recycle a buffer %u %p", id, *handle);
662 return true;
663 }
664 return false;
665}
666
667ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
668 const std::shared_ptr<BufferPoolAllocation> &alloc,
669 const size_t allocSize,
670 const std::vector<uint8_t> &params,
671 BufferId *pId,
672 const native_handle_t** handle) {
673
674 BufferId bufferId = mSeq++;
675 if (mSeq == Connection::SYNC_BUFFERID) {
676 mSeq = 0;
677 }
678 std::unique_ptr<InternalBuffer> buffer =
679 std::make_unique<InternalBuffer>(
680 bufferId, alloc, allocSize, params);
681 if (buffer) {
682 auto res = mBuffers.insert(std::make_pair(
683 bufferId, std::move(buffer)));
684 if (res.second) {
685 mStats.onBufferAllocated(allocSize);
686 *handle = alloc->handle();
687 *pId = bufferId;
688 return ResultStatus::OK;
689 }
690 }
691 return ResultStatus::NO_MEMORY;
692}
693
694void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
695 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
696 mLastCleanUpUs = mTimestampUs;
697 if (mTimestampUs > mLastLogUs + kLogDurationUs) {
698 mLastLogUs = mTimestampUs;
Sungtak Leed3318082018-09-07 15:52:43 -0700699 ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
Sungtak Leebbe37b62018-08-29 15:15:48 -0700700 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
701 "%zu/%zu (fetch/transfer)",
702 this, mStats.mBuffersCached, mStats.mSizeCached,
703 mStats.mBuffersInUse, mStats.mSizeInUse,
704 mStats.mTotalRecycles, mStats.mTotalAllocations,
705 mStats.mTotalFetches, mStats.mTotalTransfers);
706 }
707 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
708 if (!clearCache && mStats.mSizeCached < kMinAllocBytesForEviction
709 && mBuffers.size() < kMinBufferCountForEviction) {
710 break;
711 }
712 auto it = mBuffers.find(*freeIt);
713 if (it != mBuffers.end() &&
714 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
715 mStats.onBufferEvicted(it->second->mAllocSize);
716 mBuffers.erase(it);
717 freeIt = mFreeBuffers.erase(freeIt);
718 } else {
719 ++freeIt;
Sungtak Leed3318082018-09-07 15:52:43 -0700720 ALOGW("bufferpool2 inconsistent!");
Sungtak Leebbe37b62018-08-29 15:15:48 -0700721 }
722 }
723 }
724}
725
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700726void Accessor::Impl::BufferPool::invalidate(
727 bool needsAck, BufferId from, BufferId to,
728 const std::shared_ptr<Accessor::Impl> &impl) {
729 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
730 if (isBufferInRange(from, to, *freeIt)) {
731 auto it = mBuffers.find(*freeIt);
732 if (it != mBuffers.end() &&
733 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
734 mStats.onBufferEvicted(it->second->mAllocSize);
735 mBuffers.erase(it);
736 freeIt = mFreeBuffers.erase(freeIt);
737 continue;
738 } else {
Sungtak Leed3318082018-09-07 15:52:43 -0700739 ALOGW("bufferpool2 inconsistent!");
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700740 }
741 }
742 ++freeIt;
743 }
744
745 size_t left = 0;
746 for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
747 if (isBufferInRange(from, to, it->first)) {
748 it->second->invalidate();
749 ++left;
750 }
751 }
752 mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
753}
754
755void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
756 BufferId from = mStartSeq;
757 BufferId to = mSeq;
758 mStartSeq = mSeq;
759 // TODO: needsAck params
Sungtak Leed3128382018-11-07 17:30:37 -0800760 ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700761 if (from != to) {
762 invalidate(true, from, to, impl);
763 }
764}
765
766void Accessor::Impl::invalidatorThread(
767 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
768 std::mutex &mutex,
769 std::condition_variable &cv,
770 bool &ready) {
771 while(true) {
772 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
773 {
774 std::unique_lock<std::mutex> lock(mutex);
775 if (!ready) {
776 cv.wait(lock);
777 }
778 copied.insert(accessors.begin(), accessors.end());
779 }
780 std::list<ConnectionId> erased;
781 for (auto it = copied.begin(); it != copied.end(); ++it) {
782 const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
783 if (!impl) {
784 erased.push_back(it->first);
785 } else {
786 impl->handleInvalidateAck();
787 }
788 }
789 {
790 std::unique_lock<std::mutex> lock(mutex);
791 for (auto it = erased.begin(); it != erased.end(); ++it) {
792 accessors.erase(*it);
793 }
794 if (accessors.size() == 0) {
795 ready = false;
796 } else {
797 // prevent draining cpu.
798 lock.unlock();
799 std::this_thread::yield();
800 }
801 }
802 }
803}
804
805Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
806 std::thread invalidator(
807 invalidatorThread,
808 std::ref(mAccessors),
809 std::ref(mMutex),
810 std::ref(mCv),
811 std::ref(mReady));
812 invalidator.detach();
813}
814
815void Accessor::Impl::AccessorInvalidator::addAccessor(
816 uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
817 bool notify = false;
818 std::unique_lock<std::mutex> lock(mMutex);
819 if (mAccessors.find(accessorId) == mAccessors.end()) {
820 if (!mReady) {
821 mReady = true;
822 notify = true;
823 }
824 mAccessors.insert(std::make_pair(accessorId, impl));
Sungtak Leed3128382018-11-07 17:30:37 -0800825 ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700826 }
827 lock.unlock();
828 if (notify) {
829 mCv.notify_one();
830 }
831}
832
833void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
834 std::lock_guard<std::mutex> lock(mMutex);
835 mAccessors.erase(accessorId);
Sungtak Leed3128382018-11-07 17:30:37 -0800836 ALOGV("buffer invalidation deleted bp:%u", accessorId);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700837 if (mAccessors.size() == 0) {
838 mReady = false;
839 }
840}
841
Sungtak Leed3128382018-11-07 17:30:37 -0800842std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
843
844void Accessor::Impl::createInvalidator() {
845 if (!sInvalidator) {
846 sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
847 }
848}
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700849
Sungtak Leebbe37b62018-08-29 15:15:48 -0700850} // namespace implementation
851} // namespace V2_0
852} // namespace bufferpool
853} // namespace media
854} // namespace hardware
855} // namespace android