blob: 0d591d78a9de67232ed15760e31c57185833444b [file] [log] [blame]
Sungtak Leebbe37b62018-08-29 15:15:48 -07001/*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17#define LOG_TAG "BufferPoolAccessor"
18//#define LOG_NDEBUG 0
19
20#include <sys/types.h>
21#include <time.h>
22#include <unistd.h>
23#include <utils/Log.h>
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070024#include <thread>
Sungtak Leebbe37b62018-08-29 15:15:48 -070025#include "AccessorImpl.h"
26#include "Connection.h"
27
28namespace android {
29namespace hardware {
30namespace media {
31namespace bufferpool {
32namespace V2_0 {
33namespace implementation {
34
35namespace {
36 static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
37 static constexpr int64_t kLogDurationUs = 5000000; // 5 secs
38
39 static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
Sungtak Lee30dbaa22019-10-03 14:38:01 -070040 static constexpr size_t kMinBufferCountForEviction = 25;
Sungtak Leebbe37b62018-08-29 15:15:48 -070041}
42
43// Buffer structure in bufferpool process
44struct InternalBuffer {
45 BufferId mId;
46 size_t mOwnerCount;
47 size_t mTransactionCount;
48 const std::shared_ptr<BufferPoolAllocation> mAllocation;
49 const size_t mAllocSize;
50 const std::vector<uint8_t> mConfig;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070051 bool mInvalidated;
Sungtak Leebbe37b62018-08-29 15:15:48 -070052
53 InternalBuffer(
54 BufferId id,
55 const std::shared_ptr<BufferPoolAllocation> &alloc,
56 const size_t allocSize,
57 const std::vector<uint8_t> &allocConfig)
58 : mId(id), mOwnerCount(0), mTransactionCount(0),
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070059 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
60 mInvalidated(false) {}
Sungtak Leebbe37b62018-08-29 15:15:48 -070061
62 const native_handle_t *handle() {
63 return mAllocation->handle();
64 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070065
66 void invalidate() {
67 mInvalidated = true;
68 }
Sungtak Leebbe37b62018-08-29 15:15:48 -070069};
70
71struct TransactionStatus {
72 TransactionId mId;
73 BufferId mBufferId;
74 ConnectionId mSender;
75 ConnectionId mReceiver;
76 BufferStatus mStatus;
77 int64_t mTimestampUs;
78 bool mSenderValidated;
79
80 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
81 mId = message.transactionId;
82 mBufferId = message.bufferId;
83 mStatus = message.newStatus;
84 mTimestampUs = timestampUs;
85 if (mStatus == BufferStatus::TRANSFER_TO) {
86 mSender = message.connectionId;
87 mReceiver = message.targetConnectionId;
88 mSenderValidated = true;
89 } else {
90 mSender = -1LL;
91 mReceiver = message.connectionId;
92 mSenderValidated = false;
93 }
94 }
95};
96
97// Helper template methods for handling map of set.
98template<class T, class U>
99bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
100 auto iter = mapOfSet->find(key);
101 if (iter == mapOfSet->end()) {
102 std::set<U> valueSet{value};
103 mapOfSet->insert(std::make_pair(key, valueSet));
104 return true;
105 } else if (iter->second.find(value) == iter->second.end()) {
106 iter->second.insert(value);
107 return true;
108 }
109 return false;
110}
111
112template<class T, class U>
113bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
114 bool ret = false;
115 auto iter = mapOfSet->find(key);
116 if (iter != mapOfSet->end()) {
117 if (iter->second.erase(value) > 0) {
118 ret = true;
119 }
120 if (iter->second.size() == 0) {
121 mapOfSet->erase(iter);
122 }
123 }
124 return ret;
125}
126
127template<class T, class U>
128bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
129 auto iter = mapOfSet->find(key);
130 if (iter != mapOfSet->end()) {
131 auto setIter = iter->second.find(value);
132 return setIter != iter->second.end();
133 }
134 return false;
135}
136
137int32_t Accessor::Impl::sPid = getpid();
138uint32_t Accessor::Impl::sSeqId = time(nullptr);
139
140Accessor::Impl::Impl(
141 const std::shared_ptr<BufferPoolAllocator> &allocator)
142 : mAllocator(allocator) {}
143
144Accessor::Impl::~Impl() {
145}
146
147ResultStatus Accessor::Impl::connect(
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700148 const sp<Accessor> &accessor, const sp<IObserver> &observer,
149 sp<Connection> *connection,
150 ConnectionId *pConnectionId,
151 uint32_t *pMsgId,
152 const StatusDescriptor** statusDescPtr,
153 const InvalidationDescriptor** invDescPtr) {
Sungtak Leebbe37b62018-08-29 15:15:48 -0700154 sp<Connection> newConnection = new Connection();
155 ResultStatus status = ResultStatus::CRITICAL_ERROR;
156 {
157 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
158 if (newConnection) {
159 ConnectionId id = (int64_t)sPid << 32 | sSeqId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700160 status = mBufferPool.mObserver.open(id, statusDescPtr);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700161 if (status == ResultStatus::OK) {
162 newConnection->initialize(accessor, id);
163 *connection = newConnection;
164 *pConnectionId = id;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700165 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700166 mBufferPool.mConnectionIds.insert(id);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700167 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
168 mBufferPool.mInvalidation.onConnect(id, observer);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700169 ++sSeqId;
170 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700171
Sungtak Leebbe37b62018-08-29 15:15:48 -0700172 }
173 mBufferPool.processStatusMessages();
174 mBufferPool.cleanUp();
175 }
176 return status;
177}
178
179ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
180 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
Sungtak Leed3128382018-11-07 17:30:37 -0800181 ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700182 mBufferPool.processStatusMessages();
183 mBufferPool.handleClose(connectionId);
184 mBufferPool.mObserver.close(connectionId);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700185 mBufferPool.mInvalidation.onClose(connectionId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700186 // Since close# will be called after all works are finished, it is OK to
187 // evict unused buffers.
188 mBufferPool.cleanUp(true);
189 return ResultStatus::OK;
190}
191
192ResultStatus Accessor::Impl::allocate(
193 ConnectionId connectionId, const std::vector<uint8_t>& params,
194 BufferId *bufferId, const native_handle_t** handle) {
195 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
196 mBufferPool.processStatusMessages();
197 ResultStatus status = ResultStatus::OK;
198 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
199 lock.unlock();
200 std::shared_ptr<BufferPoolAllocation> alloc;
201 size_t allocSize;
202 status = mAllocator->allocate(params, &alloc, &allocSize);
203 lock.lock();
204 if (status == ResultStatus::OK) {
205 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
206 }
207 ALOGV("create a buffer %d : %u %p",
208 status == ResultStatus::OK, *bufferId, *handle);
209 }
210 if (status == ResultStatus::OK) {
211 // TODO: handle ownBuffer failure
212 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
213 }
214 mBufferPool.cleanUp();
215 return status;
216}
217
218ResultStatus Accessor::Impl::fetch(
219 ConnectionId connectionId, TransactionId transactionId,
220 BufferId bufferId, const native_handle_t** handle) {
221 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
222 mBufferPool.processStatusMessages();
223 auto found = mBufferPool.mTransactions.find(transactionId);
224 if (found != mBufferPool.mTransactions.end() &&
225 contains(&mBufferPool.mPendingTransactions,
226 connectionId, transactionId)) {
227 if (found->second->mSenderValidated &&
228 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
229 found->second->mBufferId == bufferId) {
230 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
231 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
232 if (bufferIt != mBufferPool.mBuffers.end()) {
233 mBufferPool.mStats.onBufferFetched();
234 *handle = bufferIt->second->handle();
235 return ResultStatus::OK;
236 }
237 }
238 }
239 mBufferPool.cleanUp();
240 return ResultStatus::CRITICAL_ERROR;
241}
242
243void Accessor::Impl::cleanUp(bool clearCache) {
244 // transaction timeout, buffer cacheing TTL handling
245 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
246 mBufferPool.processStatusMessages();
247 mBufferPool.cleanUp(clearCache);
248}
249
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700250void Accessor::Impl::flush() {
251 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
252 mBufferPool.processStatusMessages();
253 mBufferPool.flush(shared_from_this());
254}
255
256void Accessor::Impl::handleInvalidateAck() {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800257 std::map<ConnectionId, const sp<IObserver>> observers;
258 uint32_t invalidationId;
259 {
260 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
261 mBufferPool.processStatusMessages();
262 mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
263 }
264 // Do not hold lock for send invalidations
Sungtak Lee603467b2019-05-16 16:48:00 -0700265 size_t deadClients = 0;
Sungtak Leebe6a1182018-12-17 19:00:40 -0800266 for (auto it = observers.begin(); it != observers.end(); ++it) {
267 const sp<IObserver> observer = it->second;
268 if (observer) {
269 Return<void> transResult = observer->onMessage(it->first, invalidationId);
Sungtak Lee603467b2019-05-16 16:48:00 -0700270 if (!transResult.isOk()) {
271 ++deadClients;
272 }
Sungtak Leebe6a1182018-12-17 19:00:40 -0800273 }
274 }
Sungtak Lee603467b2019-05-16 16:48:00 -0700275 if (deadClients > 0) {
276 ALOGD("During invalidation found %zu dead clients", deadClients);
277 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700278}
279
280bool Accessor::Impl::isValid() {
281 return mBufferPool.isValid();
282}
283
Sungtak Leebbe37b62018-08-29 15:15:48 -0700284Accessor::Impl::Impl::BufferPool::BufferPool()
285 : mTimestampUs(getTimestampNow()),
286 mLastCleanUpUs(mTimestampUs),
287 mLastLogUs(mTimestampUs),
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700288 mSeq(0),
289 mStartSeq(0) {
290 mValid = mInvalidationChannel.isValid();
291}
Sungtak Leebbe37b62018-08-29 15:15:48 -0700292
293
294// Statistics helper
295template<typename T, typename S>
296int percentage(T base, S total) {
297 return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
298}
299
Sungtak Leed3128382018-11-07 17:30:37 -0800300std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700301
Sungtak Leebbe37b62018-08-29 15:15:48 -0700302Accessor::Impl::Impl::BufferPool::~BufferPool() {
303 std::lock_guard<std::mutex> lock(mMutex);
Sungtak Leed3318082018-09-07 15:52:43 -0700304 ALOGD("Destruction - bufferpool2 %p "
Sungtak Leebbe37b62018-08-29 15:15:48 -0700305 "cached: %zu/%zuM, %zu/%d%% in use; "
306 "allocs: %zu, %d%% recycled; "
307 "transfers: %zu, %d%% unfetced",
308 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
309 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
310 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
311 mStats.mTotalTransfers,
312 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
313}
314
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700315void Accessor::Impl::BufferPool::Invalidation::onConnect(
316 ConnectionId conId, const sp<IObserver>& observer) {
317 mAcks[conId] = mInvalidationId; // starts from current invalidationId
318 mObservers.insert(std::make_pair(conId, observer));
319}
320
321void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
322 mAcks.erase(conId);
323 mObservers.erase(conId);
324}
325
326void Accessor::Impl::BufferPool::Invalidation::onAck(
327 ConnectionId conId,
328 uint32_t msgId) {
329 auto it = mAcks.find(conId);
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800330 if (it == mAcks.end()) {
331 ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
332 return;
333 }
334 if (isMessageLater(msgId, it->second)) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700335 mAcks[conId] = msgId;
336 }
337}
338
339void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
340 BufferId bufferId,
341 BufferInvalidationChannel &channel) {
342 for (auto it = mPendings.begin(); it != mPendings.end();) {
Sungtak Leed3128382018-11-07 17:30:37 -0800343 if (it->isInvalidated(bufferId)) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700344 uint32_t msgId = 0;
345 if (it->mNeedsAck) {
346 msgId = ++mInvalidationId;
347 if (msgId == 0) {
348 // wrap happens
349 msgId = ++mInvalidationId;
350 }
351 }
352 channel.postInvalidation(msgId, it->mFrom, it->mTo);
Sungtak Leed3128382018-11-07 17:30:37 -0800353 it = mPendings.erase(it);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700354 continue;
355 }
356 ++it;
357 }
358}
359
360void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
361 bool needsAck,
362 uint32_t from,
363 uint32_t to,
364 size_t left,
365 BufferInvalidationChannel &channel,
366 const std::shared_ptr<Accessor::Impl> &impl) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700367 uint32_t msgId = 0;
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800368 if (needsAck) {
369 msgId = ++mInvalidationId;
370 if (msgId == 0) {
371 // wrap happens
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700372 msgId = ++mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700373 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800374 }
Sungtak Leed3318082018-09-07 15:52:43 -0700375 ALOGV("bufferpool2 invalidation requested and queued");
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800376 if (left == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700377 channel.postInvalidation(msgId, from, to);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700378 } else {
379 // TODO: sending hint message?
Sungtak Leed3318082018-09-07 15:52:43 -0700380 ALOGV("bufferpoo2 invalidation requested and pending");
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700381 Pending pending(needsAck, from, to, left, impl);
382 mPendings.push_back(pending);
383 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800384 sInvalidator->addAccessor(mId, impl);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700385}
386
Sungtak Leebe6a1182018-12-17 19:00:40 -0800387void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
388 std::map<ConnectionId, const sp<IObserver>> *observers,
389 uint32_t *invalidationId) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700390 if (mInvalidationId != 0) {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800391 *invalidationId = mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700392 std::set<int> deads;
393 for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
394 if (it->second != mInvalidationId) {
Sungtak Leed3128382018-11-07 17:30:37 -0800395 const sp<IObserver> observer = mObservers[it->first];
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700396 if (observer) {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800397 observers->emplace(it->first, observer);
398 ALOGV("connection %lld will call observer (%u: %u)",
Sungtak Leed3128382018-11-07 17:30:37 -0800399 (long long)it->first, it->second, mInvalidationId);
Sungtak Leebe6a1182018-12-17 19:00:40 -0800400 // N.B: onMessage will be called later. ignore possibility of
401 // onMessage# oneway call being lost.
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800402 it->second = mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700403 } else {
Sungtak Leed3318082018-09-07 15:52:43 -0700404 ALOGV("bufferpool2 observer died %lld", (long long)it->first);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700405 deads.insert(it->first);
406 }
407 }
408 }
409 if (deads.size() > 0) {
410 for (auto it = deads.begin(); it != deads.end(); ++it) {
411 onClose(*it);
412 }
413 }
414 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800415 if (mPendings.size() == 0) {
416 // All invalidation Ids are synced and no more pending invalidations.
417 sInvalidator->delAccessor(mId);
418 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700419}
420
Sungtak Leebbe37b62018-08-29 15:15:48 -0700421bool Accessor::Impl::BufferPool::handleOwnBuffer(
422 ConnectionId connectionId, BufferId bufferId) {
423
424 bool added = insert(&mUsingBuffers, connectionId, bufferId);
425 if (added) {
426 auto iter = mBuffers.find(bufferId);
427 iter->second->mOwnerCount++;
428 }
429 insert(&mUsingConnections, bufferId, connectionId);
430 return added;
431}
432
433bool Accessor::Impl::BufferPool::handleReleaseBuffer(
434 ConnectionId connectionId, BufferId bufferId) {
435 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
436 if (deleted) {
437 auto iter = mBuffers.find(bufferId);
438 iter->second->mOwnerCount--;
439 if (iter->second->mOwnerCount == 0 &&
440 iter->second->mTransactionCount == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700441 if (!iter->second->mInvalidated) {
442 mStats.onBufferUnused(iter->second->mAllocSize);
443 mFreeBuffers.insert(bufferId);
444 } else {
445 mStats.onBufferUnused(iter->second->mAllocSize);
446 mStats.onBufferEvicted(iter->second->mAllocSize);
447 mBuffers.erase(iter);
448 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
449 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700450 }
451 }
452 erase(&mUsingConnections, bufferId, connectionId);
453 ALOGV("release buffer %u : %d", bufferId, deleted);
454 return deleted;
455}
456
457bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
458 auto completed = mCompletedTransactions.find(
459 message.transactionId);
460 if (completed != mCompletedTransactions.end()) {
461 // already completed
462 mCompletedTransactions.erase(completed);
463 return true;
464 }
465 // the buffer should exist and be owned.
466 auto bufferIter = mBuffers.find(message.bufferId);
467 if (bufferIter == mBuffers.end() ||
468 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
469 return false;
470 }
471 auto found = mTransactions.find(message.transactionId);
472 if (found != mTransactions.end()) {
473 // transfer_from was received earlier.
474 found->second->mSender = message.connectionId;
475 found->second->mSenderValidated = true;
476 return true;
477 }
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700478 if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
479 // N.B: it could be fake or receive connection already closed.
480 ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
481 this, (long long)message.targetConnectionId);
482 return false;
483 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700484 mStats.onBufferSent();
485 mTransactions.insert(std::make_pair(
486 message.transactionId,
487 std::make_unique<TransactionStatus>(message, mTimestampUs)));
488 insert(&mPendingTransactions, message.targetConnectionId,
489 message.transactionId);
490 bufferIter->second->mTransactionCount++;
491 return true;
492}
493
494bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
495 auto found = mTransactions.find(message.transactionId);
496 if (found == mTransactions.end()) {
497 // TODO: is it feasible to check ownership here?
498 mStats.onBufferSent();
499 mTransactions.insert(std::make_pair(
500 message.transactionId,
501 std::make_unique<TransactionStatus>(message, mTimestampUs)));
502 insert(&mPendingTransactions, message.connectionId,
503 message.transactionId);
504 auto bufferIter = mBuffers.find(message.bufferId);
505 bufferIter->second->mTransactionCount++;
506 } else {
507 if (message.connectionId == found->second->mReceiver) {
508 found->second->mStatus = BufferStatus::TRANSFER_FROM;
509 }
510 }
511 return true;
512}
513
514bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
515 auto found = mTransactions.find(message.transactionId);
516 if (found != mTransactions.end()) {
517 bool deleted = erase(&mPendingTransactions, message.connectionId,
518 message.transactionId);
519 if (deleted) {
520 if (!found->second->mSenderValidated) {
521 mCompletedTransactions.insert(message.transactionId);
522 }
523 auto bufferIter = mBuffers.find(message.bufferId);
524 if (message.newStatus == BufferStatus::TRANSFER_OK) {
525 handleOwnBuffer(message.connectionId, message.bufferId);
526 }
527 bufferIter->second->mTransactionCount--;
528 if (bufferIter->second->mOwnerCount == 0
529 && bufferIter->second->mTransactionCount == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700530 if (!bufferIter->second->mInvalidated) {
531 mStats.onBufferUnused(bufferIter->second->mAllocSize);
532 mFreeBuffers.insert(message.bufferId);
533 } else {
534 mStats.onBufferUnused(bufferIter->second->mAllocSize);
535 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
536 mBuffers.erase(bufferIter);
537 mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
538 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700539 }
540 mTransactions.erase(found);
541 }
542 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
543 message.bufferId, deleted);
544 return deleted;
545 }
546 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
547 message.bufferId);
548 return false;
549}
550
551void Accessor::Impl::BufferPool::processStatusMessages() {
552 std::vector<BufferStatusMessage> messages;
553 mObserver.getBufferStatusChanges(messages);
554 mTimestampUs = getTimestampNow();
555 for (BufferStatusMessage& message: messages) {
556 bool ret = false;
557 switch (message.newStatus) {
558 case BufferStatus::NOT_USED:
559 ret = handleReleaseBuffer(
560 message.connectionId, message.bufferId);
561 break;
562 case BufferStatus::USED:
563 // not happening
564 break;
565 case BufferStatus::TRANSFER_TO:
566 ret = handleTransferTo(message);
567 break;
568 case BufferStatus::TRANSFER_FROM:
569 ret = handleTransferFrom(message);
570 break;
571 case BufferStatus::TRANSFER_TIMEOUT:
572 // TODO
573 break;
574 case BufferStatus::TRANSFER_LOST:
575 // TODO
576 break;
577 case BufferStatus::TRANSFER_FETCH:
578 // not happening
579 break;
580 case BufferStatus::TRANSFER_OK:
581 case BufferStatus::TRANSFER_ERROR:
582 ret = handleTransferResult(message);
583 break;
Sungtak Leed491f1f2018-10-05 15:56:56 -0700584 case BufferStatus::INVALIDATION_ACK:
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700585 mInvalidation.onAck(message.connectionId, message.bufferId);
Sungtak Leed3128382018-11-07 17:30:37 -0800586 ret = true;
Sungtak Leed491f1f2018-10-05 15:56:56 -0700587 break;
Sungtak Leebbe37b62018-08-29 15:15:48 -0700588 }
589 if (ret == false) {
590 ALOGW("buffer status message processing failure - message : %d connection : %lld",
591 message.newStatus, (long long)message.connectionId);
592 }
593 }
594 messages.clear();
595}
596
597bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
598 // Cleaning buffers
599 auto buffers = mUsingBuffers.find(connectionId);
600 if (buffers != mUsingBuffers.end()) {
601 for (const BufferId& bufferId : buffers->second) {
602 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
603 if (deleted) {
604 auto bufferIter = mBuffers.find(bufferId);
605 bufferIter->second->mOwnerCount--;
606 if (bufferIter->second->mOwnerCount == 0 &&
607 bufferIter->second->mTransactionCount == 0) {
608 // TODO: handle freebuffer insert fail
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700609 if (!bufferIter->second->mInvalidated) {
610 mStats.onBufferUnused(bufferIter->second->mAllocSize);
611 mFreeBuffers.insert(bufferId);
612 } else {
613 mStats.onBufferUnused(bufferIter->second->mAllocSize);
614 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
615 mBuffers.erase(bufferIter);
616 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
617 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700618 }
619 }
620 }
621 mUsingBuffers.erase(buffers);
622 }
623
624 // Cleaning transactions
625 auto pending = mPendingTransactions.find(connectionId);
626 if (pending != mPendingTransactions.end()) {
627 for (const TransactionId& transactionId : pending->second) {
628 auto iter = mTransactions.find(transactionId);
629 if (iter != mTransactions.end()) {
630 if (!iter->second->mSenderValidated) {
631 mCompletedTransactions.insert(transactionId);
632 }
633 BufferId bufferId = iter->second->mBufferId;
634 auto bufferIter = mBuffers.find(bufferId);
635 bufferIter->second->mTransactionCount--;
636 if (bufferIter->second->mOwnerCount == 0 &&
637 bufferIter->second->mTransactionCount == 0) {
638 // TODO: handle freebuffer insert fail
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700639 if (!bufferIter->second->mInvalidated) {
640 mStats.onBufferUnused(bufferIter->second->mAllocSize);
641 mFreeBuffers.insert(bufferId);
642 } else {
643 mStats.onBufferUnused(bufferIter->second->mAllocSize);
644 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
645 mBuffers.erase(bufferIter);
646 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
647 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700648 }
649 mTransactions.erase(iter);
650 }
651 }
652 }
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700653 mConnectionIds.erase(connectionId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700654 return true;
655}
656
657bool Accessor::Impl::BufferPool::getFreeBuffer(
658 const std::shared_ptr<BufferPoolAllocator> &allocator,
659 const std::vector<uint8_t> &params, BufferId *pId,
660 const native_handle_t** handle) {
661 auto bufferIt = mFreeBuffers.begin();
662 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
663 BufferId bufferId = *bufferIt;
664 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
665 break;
666 }
667 }
668 if (bufferIt != mFreeBuffers.end()) {
669 BufferId id = *bufferIt;
670 mFreeBuffers.erase(bufferIt);
671 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
672 *handle = mBuffers[id]->handle();
673 *pId = id;
674 ALOGV("recycle a buffer %u %p", id, *handle);
675 return true;
676 }
677 return false;
678}
679
680ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
681 const std::shared_ptr<BufferPoolAllocation> &alloc,
682 const size_t allocSize,
683 const std::vector<uint8_t> &params,
684 BufferId *pId,
685 const native_handle_t** handle) {
686
687 BufferId bufferId = mSeq++;
688 if (mSeq == Connection::SYNC_BUFFERID) {
689 mSeq = 0;
690 }
691 std::unique_ptr<InternalBuffer> buffer =
692 std::make_unique<InternalBuffer>(
693 bufferId, alloc, allocSize, params);
694 if (buffer) {
695 auto res = mBuffers.insert(std::make_pair(
696 bufferId, std::move(buffer)));
697 if (res.second) {
698 mStats.onBufferAllocated(allocSize);
699 *handle = alloc->handle();
700 *pId = bufferId;
701 return ResultStatus::OK;
702 }
703 }
704 return ResultStatus::NO_MEMORY;
705}
706
707void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
708 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
709 mLastCleanUpUs = mTimestampUs;
710 if (mTimestampUs > mLastLogUs + kLogDurationUs) {
711 mLastLogUs = mTimestampUs;
Sungtak Leed3318082018-09-07 15:52:43 -0700712 ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
Sungtak Leebbe37b62018-08-29 15:15:48 -0700713 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
714 "%zu/%zu (fetch/transfer)",
715 this, mStats.mBuffersCached, mStats.mSizeCached,
716 mStats.mBuffersInUse, mStats.mSizeInUse,
717 mStats.mTotalRecycles, mStats.mTotalAllocations,
718 mStats.mTotalFetches, mStats.mTotalTransfers);
719 }
720 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
Sungtak Lee30dbaa22019-10-03 14:38:01 -0700721 if (!clearCache && (mStats.mSizeCached < kMinAllocBytesForEviction
722 || mBuffers.size() < kMinBufferCountForEviction)) {
Sungtak Leebbe37b62018-08-29 15:15:48 -0700723 break;
724 }
725 auto it = mBuffers.find(*freeIt);
726 if (it != mBuffers.end() &&
727 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
728 mStats.onBufferEvicted(it->second->mAllocSize);
729 mBuffers.erase(it);
730 freeIt = mFreeBuffers.erase(freeIt);
731 } else {
732 ++freeIt;
Sungtak Leed3318082018-09-07 15:52:43 -0700733 ALOGW("bufferpool2 inconsistent!");
Sungtak Leebbe37b62018-08-29 15:15:48 -0700734 }
735 }
736 }
737}
738
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700739void Accessor::Impl::BufferPool::invalidate(
740 bool needsAck, BufferId from, BufferId to,
741 const std::shared_ptr<Accessor::Impl> &impl) {
742 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
743 if (isBufferInRange(from, to, *freeIt)) {
744 auto it = mBuffers.find(*freeIt);
745 if (it != mBuffers.end() &&
746 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
747 mStats.onBufferEvicted(it->second->mAllocSize);
748 mBuffers.erase(it);
749 freeIt = mFreeBuffers.erase(freeIt);
750 continue;
751 } else {
Sungtak Leed3318082018-09-07 15:52:43 -0700752 ALOGW("bufferpool2 inconsistent!");
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700753 }
754 }
755 ++freeIt;
756 }
757
758 size_t left = 0;
759 for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
760 if (isBufferInRange(from, to, it->first)) {
761 it->second->invalidate();
762 ++left;
763 }
764 }
765 mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
766}
767
768void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
769 BufferId from = mStartSeq;
770 BufferId to = mSeq;
771 mStartSeq = mSeq;
772 // TODO: needsAck params
Sungtak Leed3128382018-11-07 17:30:37 -0800773 ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700774 if (from != to) {
775 invalidate(true, from, to, impl);
776 }
777}
778
779void Accessor::Impl::invalidatorThread(
780 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
781 std::mutex &mutex,
782 std::condition_variable &cv,
783 bool &ready) {
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700784 constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
785 constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
786 constexpr useconds_t MAX_SLEEP_US = 10000;
787 uint32_t numSpin = 0;
788 useconds_t sleepUs = 1;
789
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700790 while(true) {
791 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
792 {
793 std::unique_lock<std::mutex> lock(mutex);
794 if (!ready) {
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700795 numSpin = 0;
796 sleepUs = 1;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700797 cv.wait(lock);
798 }
799 copied.insert(accessors.begin(), accessors.end());
800 }
801 std::list<ConnectionId> erased;
802 for (auto it = copied.begin(); it != copied.end(); ++it) {
803 const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
804 if (!impl) {
805 erased.push_back(it->first);
806 } else {
807 impl->handleInvalidateAck();
808 }
809 }
810 {
811 std::unique_lock<std::mutex> lock(mutex);
812 for (auto it = erased.begin(); it != erased.end(); ++it) {
813 accessors.erase(*it);
814 }
815 if (accessors.size() == 0) {
816 ready = false;
817 } else {
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700818 // TODO Use an efficient way to wait over FMQ.
819 // N.B. Since there is not a efficient way to wait over FMQ,
820 // polling over the FMQ is the current way to prevent draining
821 // CPU.
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700822 lock.unlock();
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700823 ++numSpin;
824 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
825 sleepUs < MAX_SLEEP_US) {
826 sleepUs *= 10;
827 }
828 if (numSpin % NUM_SPIN_TO_LOG == 0) {
829 ALOGW("invalidator thread spinning");
830 }
831 ::usleep(sleepUs);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700832 }
833 }
834 }
835}
836
837Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
838 std::thread invalidator(
839 invalidatorThread,
840 std::ref(mAccessors),
841 std::ref(mMutex),
842 std::ref(mCv),
843 std::ref(mReady));
844 invalidator.detach();
845}
846
847void Accessor::Impl::AccessorInvalidator::addAccessor(
848 uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
849 bool notify = false;
850 std::unique_lock<std::mutex> lock(mMutex);
851 if (mAccessors.find(accessorId) == mAccessors.end()) {
852 if (!mReady) {
853 mReady = true;
854 notify = true;
855 }
856 mAccessors.insert(std::make_pair(accessorId, impl));
Sungtak Leed3128382018-11-07 17:30:37 -0800857 ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700858 }
859 lock.unlock();
860 if (notify) {
861 mCv.notify_one();
862 }
863}
864
865void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
866 std::lock_guard<std::mutex> lock(mMutex);
867 mAccessors.erase(accessorId);
Sungtak Leed3128382018-11-07 17:30:37 -0800868 ALOGV("buffer invalidation deleted bp:%u", accessorId);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700869 if (mAccessors.size() == 0) {
870 mReady = false;
871 }
872}
873
Sungtak Leed3128382018-11-07 17:30:37 -0800874std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
875
876void Accessor::Impl::createInvalidator() {
877 if (!sInvalidator) {
878 sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
879 }
880}
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700881
Sungtak Leebbe37b62018-08-29 15:15:48 -0700882} // namespace implementation
883} // namespace V2_0
884} // namespace bufferpool
885} // namespace media
886} // namespace hardware
887} // namespace android