blob: cc1b3bd80294b2c765a41389988e3ad810df4b62 [file] [log] [blame]
/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16
Ray Essick3688d0a2019-10-09 15:31:58 -070017#define LOG_TAG "BufferPoolAccessor2.0"
Sungtak Leebbe37b62018-08-29 15:15:48 -070018//#define LOG_NDEBUG 0
19
20#include <sys/types.h>
Ray Essick3688d0a2019-10-09 15:31:58 -070021#include <stdint.h>
Sungtak Leebbe37b62018-08-29 15:15:48 -070022#include <time.h>
23#include <unistd.h>
24#include <utils/Log.h>
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070025#include <thread>
Sungtak Leebbe37b62018-08-29 15:15:48 -070026#include "AccessorImpl.h"
27#include "Connection.h"
28
29namespace android {
30namespace hardware {
31namespace media {
32namespace bufferpool {
33namespace V2_0 {
34namespace implementation {
35
36namespace {
37 static constexpr int64_t kCleanUpDurationUs = 500000; // TODO tune 0.5 sec
38 static constexpr int64_t kLogDurationUs = 5000000; // 5 secs
39
40 static constexpr size_t kMinAllocBytesForEviction = 1024*1024*15;
Sungtak Lee30dbaa22019-10-03 14:38:01 -070041 static constexpr size_t kMinBufferCountForEviction = 25;
Sungtak Lee7651a272020-04-27 00:16:50 -070042
43 static constexpr nsecs_t kEvictGranularityNs = 1000000000; // 1 sec
44 static constexpr nsecs_t kEvictDurationNs = 5000000000; // 5 secs
Sungtak Leebbe37b62018-08-29 15:15:48 -070045}
46
47// Buffer structure in bufferpool process
48struct InternalBuffer {
49 BufferId mId;
50 size_t mOwnerCount;
51 size_t mTransactionCount;
52 const std::shared_ptr<BufferPoolAllocation> mAllocation;
53 const size_t mAllocSize;
54 const std::vector<uint8_t> mConfig;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070055 bool mInvalidated;
Sungtak Leebbe37b62018-08-29 15:15:48 -070056
57 InternalBuffer(
58 BufferId id,
59 const std::shared_ptr<BufferPoolAllocation> &alloc,
60 const size_t allocSize,
61 const std::vector<uint8_t> &allocConfig)
62 : mId(id), mOwnerCount(0), mTransactionCount(0),
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070063 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
64 mInvalidated(false) {}
Sungtak Leebbe37b62018-08-29 15:15:48 -070065
66 const native_handle_t *handle() {
67 return mAllocation->handle();
68 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070069
70 void invalidate() {
71 mInvalidated = true;
72 }
Sungtak Leebbe37b62018-08-29 15:15:48 -070073};
74
75struct TransactionStatus {
76 TransactionId mId;
77 BufferId mBufferId;
78 ConnectionId mSender;
79 ConnectionId mReceiver;
80 BufferStatus mStatus;
81 int64_t mTimestampUs;
82 bool mSenderValidated;
83
84 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
85 mId = message.transactionId;
86 mBufferId = message.bufferId;
87 mStatus = message.newStatus;
88 mTimestampUs = timestampUs;
89 if (mStatus == BufferStatus::TRANSFER_TO) {
90 mSender = message.connectionId;
91 mReceiver = message.targetConnectionId;
92 mSenderValidated = true;
93 } else {
94 mSender = -1LL;
95 mReceiver = message.connectionId;
96 mSenderValidated = false;
97 }
98 }
99};
100
// Helper template methods for handling map of set.

// Adds |value| to the set stored under |key|, creating the set on demand.
// Returns true when the value was newly added, false when it was already
// present.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    return (*mapOfSet)[key].insert(value).second;
}
115
// Removes |value| from the set stored under |key| and drops the key once its
// set is empty. Returns true when the value was present and removed.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    const bool removed = entry->second.erase(value) > 0;
    if (entry->second.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
130
// Returns true when |value| is a member of the set stored under |key|.
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    return entry != mapOfSet->end() && entry->second.count(value) > 0;
}
140
Sungtak Leebbe37b62018-08-29 15:15:48 -0700141uint32_t Accessor::Impl::sSeqId = time(nullptr);
142
143Accessor::Impl::Impl(
144 const std::shared_ptr<BufferPoolAllocator> &allocator)
Sungtak Lee7651a272020-04-27 00:16:50 -0700145 : mAllocator(allocator), mScheduleEvictTs(0) {}
Sungtak Leebbe37b62018-08-29 15:15:48 -0700146
147Accessor::Impl::~Impl() {
148}
149
150ResultStatus Accessor::Impl::connect(
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700151 const sp<Accessor> &accessor, const sp<IObserver> &observer,
152 sp<Connection> *connection,
153 ConnectionId *pConnectionId,
154 uint32_t *pMsgId,
155 const StatusDescriptor** statusDescPtr,
156 const InvalidationDescriptor** invDescPtr) {
Sungtak Leebbe37b62018-08-29 15:15:48 -0700157 sp<Connection> newConnection = new Connection();
158 ResultStatus status = ResultStatus::CRITICAL_ERROR;
159 {
160 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
161 if (newConnection) {
Ray Essick3688d0a2019-10-09 15:31:58 -0700162 int32_t pid = getpid();
163 ConnectionId id = (int64_t)pid << 32 | sSeqId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700164 status = mBufferPool.mObserver.open(id, statusDescPtr);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700165 if (status == ResultStatus::OK) {
166 newConnection->initialize(accessor, id);
167 *connection = newConnection;
168 *pConnectionId = id;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700169 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700170 mBufferPool.mConnectionIds.insert(id);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700171 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
172 mBufferPool.mInvalidation.onConnect(id, observer);
Ray Essick3688d0a2019-10-09 15:31:58 -0700173 if (sSeqId == UINT32_MAX) {
174 sSeqId = 0;
175 } else {
176 ++sSeqId;
177 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700178 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700179
Sungtak Leebbe37b62018-08-29 15:15:48 -0700180 }
181 mBufferPool.processStatusMessages();
182 mBufferPool.cleanUp();
Sungtak Lee7651a272020-04-27 00:16:50 -0700183 scheduleEvictIfNeeded();
Sungtak Leebbe37b62018-08-29 15:15:48 -0700184 }
185 return status;
186}
187
188ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
189 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
Sungtak Leed3128382018-11-07 17:30:37 -0800190 ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700191 mBufferPool.processStatusMessages();
192 mBufferPool.handleClose(connectionId);
193 mBufferPool.mObserver.close(connectionId);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700194 mBufferPool.mInvalidation.onClose(connectionId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700195 // Since close# will be called after all works are finished, it is OK to
196 // evict unused buffers.
197 mBufferPool.cleanUp(true);
Sungtak Lee7651a272020-04-27 00:16:50 -0700198 scheduleEvictIfNeeded();
Sungtak Leebbe37b62018-08-29 15:15:48 -0700199 return ResultStatus::OK;
200}
201
202ResultStatus Accessor::Impl::allocate(
203 ConnectionId connectionId, const std::vector<uint8_t>& params,
204 BufferId *bufferId, const native_handle_t** handle) {
205 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
206 mBufferPool.processStatusMessages();
207 ResultStatus status = ResultStatus::OK;
208 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
209 lock.unlock();
210 std::shared_ptr<BufferPoolAllocation> alloc;
211 size_t allocSize;
212 status = mAllocator->allocate(params, &alloc, &allocSize);
213 lock.lock();
214 if (status == ResultStatus::OK) {
215 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
216 }
217 ALOGV("create a buffer %d : %u %p",
218 status == ResultStatus::OK, *bufferId, *handle);
219 }
220 if (status == ResultStatus::OK) {
221 // TODO: handle ownBuffer failure
222 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
223 }
224 mBufferPool.cleanUp();
Sungtak Lee7651a272020-04-27 00:16:50 -0700225 scheduleEvictIfNeeded();
Sungtak Leebbe37b62018-08-29 15:15:48 -0700226 return status;
227}
228
229ResultStatus Accessor::Impl::fetch(
230 ConnectionId connectionId, TransactionId transactionId,
231 BufferId bufferId, const native_handle_t** handle) {
232 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
233 mBufferPool.processStatusMessages();
234 auto found = mBufferPool.mTransactions.find(transactionId);
235 if (found != mBufferPool.mTransactions.end() &&
236 contains(&mBufferPool.mPendingTransactions,
237 connectionId, transactionId)) {
238 if (found->second->mSenderValidated &&
239 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
240 found->second->mBufferId == bufferId) {
241 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
242 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
243 if (bufferIt != mBufferPool.mBuffers.end()) {
244 mBufferPool.mStats.onBufferFetched();
245 *handle = bufferIt->second->handle();
246 return ResultStatus::OK;
247 }
248 }
249 }
250 mBufferPool.cleanUp();
Sungtak Lee7651a272020-04-27 00:16:50 -0700251 scheduleEvictIfNeeded();
Sungtak Leebbe37b62018-08-29 15:15:48 -0700252 return ResultStatus::CRITICAL_ERROR;
253}
254
255void Accessor::Impl::cleanUp(bool clearCache) {
256 // transaction timeout, buffer cacheing TTL handling
257 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
258 mBufferPool.processStatusMessages();
259 mBufferPool.cleanUp(clearCache);
260}
261
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700262void Accessor::Impl::flush() {
263 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
264 mBufferPool.processStatusMessages();
265 mBufferPool.flush(shared_from_this());
266}
267
268void Accessor::Impl::handleInvalidateAck() {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800269 std::map<ConnectionId, const sp<IObserver>> observers;
270 uint32_t invalidationId;
271 {
272 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
273 mBufferPool.processStatusMessages();
274 mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
275 }
276 // Do not hold lock for send invalidations
Sungtak Lee603467b2019-05-16 16:48:00 -0700277 size_t deadClients = 0;
Sungtak Leebe6a1182018-12-17 19:00:40 -0800278 for (auto it = observers.begin(); it != observers.end(); ++it) {
279 const sp<IObserver> observer = it->second;
280 if (observer) {
281 Return<void> transResult = observer->onMessage(it->first, invalidationId);
Sungtak Lee603467b2019-05-16 16:48:00 -0700282 if (!transResult.isOk()) {
283 ++deadClients;
284 }
Sungtak Leebe6a1182018-12-17 19:00:40 -0800285 }
286 }
Sungtak Lee603467b2019-05-16 16:48:00 -0700287 if (deadClients > 0) {
288 ALOGD("During invalidation found %zu dead clients", deadClients);
289 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700290}
291
292bool Accessor::Impl::isValid() {
293 return mBufferPool.isValid();
294}
295
Sungtak Leebbe37b62018-08-29 15:15:48 -0700296Accessor::Impl::Impl::BufferPool::BufferPool()
297 : mTimestampUs(getTimestampNow()),
298 mLastCleanUpUs(mTimestampUs),
299 mLastLogUs(mTimestampUs),
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700300 mSeq(0),
301 mStartSeq(0) {
302 mValid = mInvalidationChannel.isValid();
303}
Sungtak Leebbe37b62018-08-29 15:15:48 -0700304
305
// Statistics helper: |base| as a percentage of |total|, rounded to the nearest
// integer. Returns 0 when |total| is zero.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    return int(0.5 + 100. * static_cast<S>(base) / total);
}
311
Sungtak Leed3128382018-11-07 17:30:37 -0800312std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700313
Sungtak Leebbe37b62018-08-29 15:15:48 -0700314Accessor::Impl::Impl::BufferPool::~BufferPool() {
315 std::lock_guard<std::mutex> lock(mMutex);
Sungtak Leed3318082018-09-07 15:52:43 -0700316 ALOGD("Destruction - bufferpool2 %p "
Sungtak Leebbe37b62018-08-29 15:15:48 -0700317 "cached: %zu/%zuM, %zu/%d%% in use; "
318 "allocs: %zu, %d%% recycled; "
Nicholas Reiterba056462019-07-30 16:10:18 -0700319 "transfers: %zu, %d%% unfetched",
Sungtak Leebbe37b62018-08-29 15:15:48 -0700320 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
321 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
322 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
323 mStats.mTotalTransfers,
324 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
325}
326
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700327void Accessor::Impl::BufferPool::Invalidation::onConnect(
328 ConnectionId conId, const sp<IObserver>& observer) {
329 mAcks[conId] = mInvalidationId; // starts from current invalidationId
330 mObservers.insert(std::make_pair(conId, observer));
331}
332
333void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
334 mAcks.erase(conId);
335 mObservers.erase(conId);
336}
337
338void Accessor::Impl::BufferPool::Invalidation::onAck(
339 ConnectionId conId,
340 uint32_t msgId) {
341 auto it = mAcks.find(conId);
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800342 if (it == mAcks.end()) {
343 ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
344 return;
345 }
346 if (isMessageLater(msgId, it->second)) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700347 mAcks[conId] = msgId;
348 }
349}
350
351void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
352 BufferId bufferId,
353 BufferInvalidationChannel &channel) {
354 for (auto it = mPendings.begin(); it != mPendings.end();) {
Sungtak Leed3128382018-11-07 17:30:37 -0800355 if (it->isInvalidated(bufferId)) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700356 uint32_t msgId = 0;
357 if (it->mNeedsAck) {
358 msgId = ++mInvalidationId;
359 if (msgId == 0) {
360 // wrap happens
361 msgId = ++mInvalidationId;
362 }
363 }
364 channel.postInvalidation(msgId, it->mFrom, it->mTo);
Sungtak Leed3128382018-11-07 17:30:37 -0800365 it = mPendings.erase(it);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700366 continue;
367 }
368 ++it;
369 }
370}
371
372void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
373 bool needsAck,
374 uint32_t from,
375 uint32_t to,
376 size_t left,
377 BufferInvalidationChannel &channel,
378 const std::shared_ptr<Accessor::Impl> &impl) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700379 uint32_t msgId = 0;
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800380 if (needsAck) {
381 msgId = ++mInvalidationId;
382 if (msgId == 0) {
383 // wrap happens
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700384 msgId = ++mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700385 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800386 }
Sungtak Leed3318082018-09-07 15:52:43 -0700387 ALOGV("bufferpool2 invalidation requested and queued");
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800388 if (left == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700389 channel.postInvalidation(msgId, from, to);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700390 } else {
391 // TODO: sending hint message?
Sungtak Leed3318082018-09-07 15:52:43 -0700392 ALOGV("bufferpoo2 invalidation requested and pending");
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700393 Pending pending(needsAck, from, to, left, impl);
394 mPendings.push_back(pending);
395 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800396 sInvalidator->addAccessor(mId, impl);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700397}
398
Sungtak Leebe6a1182018-12-17 19:00:40 -0800399void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
400 std::map<ConnectionId, const sp<IObserver>> *observers,
401 uint32_t *invalidationId) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700402 if (mInvalidationId != 0) {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800403 *invalidationId = mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700404 std::set<int> deads;
405 for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
406 if (it->second != mInvalidationId) {
Sungtak Leed3128382018-11-07 17:30:37 -0800407 const sp<IObserver> observer = mObservers[it->first];
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700408 if (observer) {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800409 observers->emplace(it->first, observer);
410 ALOGV("connection %lld will call observer (%u: %u)",
Sungtak Leed3128382018-11-07 17:30:37 -0800411 (long long)it->first, it->second, mInvalidationId);
Sungtak Leebe6a1182018-12-17 19:00:40 -0800412 // N.B: onMessage will be called later. ignore possibility of
413 // onMessage# oneway call being lost.
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800414 it->second = mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700415 } else {
Sungtak Leed3318082018-09-07 15:52:43 -0700416 ALOGV("bufferpool2 observer died %lld", (long long)it->first);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700417 deads.insert(it->first);
418 }
419 }
420 }
421 if (deads.size() > 0) {
422 for (auto it = deads.begin(); it != deads.end(); ++it) {
423 onClose(*it);
424 }
425 }
426 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800427 if (mPendings.size() == 0) {
428 // All invalidation Ids are synced and no more pending invalidations.
429 sInvalidator->delAccessor(mId);
430 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700431}
432
Sungtak Leebbe37b62018-08-29 15:15:48 -0700433bool Accessor::Impl::BufferPool::handleOwnBuffer(
434 ConnectionId connectionId, BufferId bufferId) {
435
436 bool added = insert(&mUsingBuffers, connectionId, bufferId);
437 if (added) {
438 auto iter = mBuffers.find(bufferId);
439 iter->second->mOwnerCount++;
440 }
441 insert(&mUsingConnections, bufferId, connectionId);
442 return added;
443}
444
445bool Accessor::Impl::BufferPool::handleReleaseBuffer(
446 ConnectionId connectionId, BufferId bufferId) {
447 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
448 if (deleted) {
449 auto iter = mBuffers.find(bufferId);
450 iter->second->mOwnerCount--;
451 if (iter->second->mOwnerCount == 0 &&
452 iter->second->mTransactionCount == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700453 if (!iter->second->mInvalidated) {
454 mStats.onBufferUnused(iter->second->mAllocSize);
455 mFreeBuffers.insert(bufferId);
456 } else {
457 mStats.onBufferUnused(iter->second->mAllocSize);
458 mStats.onBufferEvicted(iter->second->mAllocSize);
459 mBuffers.erase(iter);
460 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
461 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700462 }
463 }
464 erase(&mUsingConnections, bufferId, connectionId);
465 ALOGV("release buffer %u : %d", bufferId, deleted);
466 return deleted;
467}
468
469bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
470 auto completed = mCompletedTransactions.find(
471 message.transactionId);
472 if (completed != mCompletedTransactions.end()) {
473 // already completed
474 mCompletedTransactions.erase(completed);
475 return true;
476 }
477 // the buffer should exist and be owned.
478 auto bufferIter = mBuffers.find(message.bufferId);
479 if (bufferIter == mBuffers.end() ||
480 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
481 return false;
482 }
483 auto found = mTransactions.find(message.transactionId);
484 if (found != mTransactions.end()) {
485 // transfer_from was received earlier.
486 found->second->mSender = message.connectionId;
487 found->second->mSenderValidated = true;
488 return true;
489 }
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700490 if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
491 // N.B: it could be fake or receive connection already closed.
492 ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
493 this, (long long)message.targetConnectionId);
494 return false;
495 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700496 mStats.onBufferSent();
497 mTransactions.insert(std::make_pair(
498 message.transactionId,
499 std::make_unique<TransactionStatus>(message, mTimestampUs)));
500 insert(&mPendingTransactions, message.targetConnectionId,
501 message.transactionId);
502 bufferIter->second->mTransactionCount++;
503 return true;
504}
505
506bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
507 auto found = mTransactions.find(message.transactionId);
508 if (found == mTransactions.end()) {
509 // TODO: is it feasible to check ownership here?
510 mStats.onBufferSent();
511 mTransactions.insert(std::make_pair(
512 message.transactionId,
513 std::make_unique<TransactionStatus>(message, mTimestampUs)));
514 insert(&mPendingTransactions, message.connectionId,
515 message.transactionId);
516 auto bufferIter = mBuffers.find(message.bufferId);
517 bufferIter->second->mTransactionCount++;
518 } else {
519 if (message.connectionId == found->second->mReceiver) {
520 found->second->mStatus = BufferStatus::TRANSFER_FROM;
521 }
522 }
523 return true;
524}
525
526bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
527 auto found = mTransactions.find(message.transactionId);
528 if (found != mTransactions.end()) {
529 bool deleted = erase(&mPendingTransactions, message.connectionId,
530 message.transactionId);
531 if (deleted) {
532 if (!found->second->mSenderValidated) {
533 mCompletedTransactions.insert(message.transactionId);
534 }
535 auto bufferIter = mBuffers.find(message.bufferId);
536 if (message.newStatus == BufferStatus::TRANSFER_OK) {
537 handleOwnBuffer(message.connectionId, message.bufferId);
538 }
539 bufferIter->second->mTransactionCount--;
540 if (bufferIter->second->mOwnerCount == 0
541 && bufferIter->second->mTransactionCount == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700542 if (!bufferIter->second->mInvalidated) {
543 mStats.onBufferUnused(bufferIter->second->mAllocSize);
544 mFreeBuffers.insert(message.bufferId);
545 } else {
546 mStats.onBufferUnused(bufferIter->second->mAllocSize);
547 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
548 mBuffers.erase(bufferIter);
549 mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
550 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700551 }
552 mTransactions.erase(found);
553 }
554 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
555 message.bufferId, deleted);
556 return deleted;
557 }
558 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
559 message.bufferId);
560 return false;
561}
562
563void Accessor::Impl::BufferPool::processStatusMessages() {
564 std::vector<BufferStatusMessage> messages;
565 mObserver.getBufferStatusChanges(messages);
566 mTimestampUs = getTimestampNow();
567 for (BufferStatusMessage& message: messages) {
568 bool ret = false;
569 switch (message.newStatus) {
570 case BufferStatus::NOT_USED:
571 ret = handleReleaseBuffer(
572 message.connectionId, message.bufferId);
573 break;
574 case BufferStatus::USED:
575 // not happening
576 break;
577 case BufferStatus::TRANSFER_TO:
578 ret = handleTransferTo(message);
579 break;
580 case BufferStatus::TRANSFER_FROM:
581 ret = handleTransferFrom(message);
582 break;
583 case BufferStatus::TRANSFER_TIMEOUT:
584 // TODO
585 break;
586 case BufferStatus::TRANSFER_LOST:
587 // TODO
588 break;
589 case BufferStatus::TRANSFER_FETCH:
590 // not happening
591 break;
592 case BufferStatus::TRANSFER_OK:
593 case BufferStatus::TRANSFER_ERROR:
594 ret = handleTransferResult(message);
595 break;
Sungtak Leed491f1f2018-10-05 15:56:56 -0700596 case BufferStatus::INVALIDATION_ACK:
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700597 mInvalidation.onAck(message.connectionId, message.bufferId);
Sungtak Leed3128382018-11-07 17:30:37 -0800598 ret = true;
Sungtak Leed491f1f2018-10-05 15:56:56 -0700599 break;
Sungtak Leebbe37b62018-08-29 15:15:48 -0700600 }
601 if (ret == false) {
602 ALOGW("buffer status message processing failure - message : %d connection : %lld",
603 message.newStatus, (long long)message.connectionId);
604 }
605 }
606 messages.clear();
607}
608
609bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
610 // Cleaning buffers
611 auto buffers = mUsingBuffers.find(connectionId);
612 if (buffers != mUsingBuffers.end()) {
613 for (const BufferId& bufferId : buffers->second) {
614 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
615 if (deleted) {
616 auto bufferIter = mBuffers.find(bufferId);
617 bufferIter->second->mOwnerCount--;
618 if (bufferIter->second->mOwnerCount == 0 &&
619 bufferIter->second->mTransactionCount == 0) {
620 // TODO: handle freebuffer insert fail
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700621 if (!bufferIter->second->mInvalidated) {
622 mStats.onBufferUnused(bufferIter->second->mAllocSize);
623 mFreeBuffers.insert(bufferId);
624 } else {
625 mStats.onBufferUnused(bufferIter->second->mAllocSize);
626 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
627 mBuffers.erase(bufferIter);
628 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
629 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700630 }
631 }
632 }
633 mUsingBuffers.erase(buffers);
634 }
635
636 // Cleaning transactions
637 auto pending = mPendingTransactions.find(connectionId);
638 if (pending != mPendingTransactions.end()) {
639 for (const TransactionId& transactionId : pending->second) {
640 auto iter = mTransactions.find(transactionId);
641 if (iter != mTransactions.end()) {
642 if (!iter->second->mSenderValidated) {
643 mCompletedTransactions.insert(transactionId);
644 }
645 BufferId bufferId = iter->second->mBufferId;
646 auto bufferIter = mBuffers.find(bufferId);
647 bufferIter->second->mTransactionCount--;
648 if (bufferIter->second->mOwnerCount == 0 &&
649 bufferIter->second->mTransactionCount == 0) {
650 // TODO: handle freebuffer insert fail
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700651 if (!bufferIter->second->mInvalidated) {
652 mStats.onBufferUnused(bufferIter->second->mAllocSize);
653 mFreeBuffers.insert(bufferId);
654 } else {
655 mStats.onBufferUnused(bufferIter->second->mAllocSize);
656 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
657 mBuffers.erase(bufferIter);
658 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
659 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700660 }
661 mTransactions.erase(iter);
662 }
663 }
664 }
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700665 mConnectionIds.erase(connectionId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700666 return true;
667}
668
669bool Accessor::Impl::BufferPool::getFreeBuffer(
670 const std::shared_ptr<BufferPoolAllocator> &allocator,
671 const std::vector<uint8_t> &params, BufferId *pId,
672 const native_handle_t** handle) {
673 auto bufferIt = mFreeBuffers.begin();
674 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
675 BufferId bufferId = *bufferIt;
676 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
677 break;
678 }
679 }
680 if (bufferIt != mFreeBuffers.end()) {
681 BufferId id = *bufferIt;
682 mFreeBuffers.erase(bufferIt);
683 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
684 *handle = mBuffers[id]->handle();
685 *pId = id;
686 ALOGV("recycle a buffer %u %p", id, *handle);
687 return true;
688 }
689 return false;
690}
691
692ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
693 const std::shared_ptr<BufferPoolAllocation> &alloc,
694 const size_t allocSize,
695 const std::vector<uint8_t> &params,
696 BufferId *pId,
697 const native_handle_t** handle) {
698
699 BufferId bufferId = mSeq++;
700 if (mSeq == Connection::SYNC_BUFFERID) {
701 mSeq = 0;
702 }
703 std::unique_ptr<InternalBuffer> buffer =
704 std::make_unique<InternalBuffer>(
705 bufferId, alloc, allocSize, params);
706 if (buffer) {
707 auto res = mBuffers.insert(std::make_pair(
708 bufferId, std::move(buffer)));
709 if (res.second) {
710 mStats.onBufferAllocated(allocSize);
711 *handle = alloc->handle();
712 *pId = bufferId;
713 return ResultStatus::OK;
714 }
715 }
716 return ResultStatus::NO_MEMORY;
717}
718
719void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
720 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
721 mLastCleanUpUs = mTimestampUs;
722 if (mTimestampUs > mLastLogUs + kLogDurationUs) {
723 mLastLogUs = mTimestampUs;
Sungtak Leed3318082018-09-07 15:52:43 -0700724 ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
Sungtak Leebbe37b62018-08-29 15:15:48 -0700725 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
726 "%zu/%zu (fetch/transfer)",
727 this, mStats.mBuffersCached, mStats.mSizeCached,
728 mStats.mBuffersInUse, mStats.mSizeInUse,
729 mStats.mTotalRecycles, mStats.mTotalAllocations,
730 mStats.mTotalFetches, mStats.mTotalTransfers);
731 }
732 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
Sungtak Lee30dbaa22019-10-03 14:38:01 -0700733 if (!clearCache && (mStats.mSizeCached < kMinAllocBytesForEviction
734 || mBuffers.size() < kMinBufferCountForEviction)) {
Sungtak Leebbe37b62018-08-29 15:15:48 -0700735 break;
736 }
737 auto it = mBuffers.find(*freeIt);
738 if (it != mBuffers.end() &&
739 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
740 mStats.onBufferEvicted(it->second->mAllocSize);
741 mBuffers.erase(it);
742 freeIt = mFreeBuffers.erase(freeIt);
743 } else {
744 ++freeIt;
Sungtak Leed3318082018-09-07 15:52:43 -0700745 ALOGW("bufferpool2 inconsistent!");
Sungtak Leebbe37b62018-08-29 15:15:48 -0700746 }
747 }
748 }
749}
750
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700751void Accessor::Impl::BufferPool::invalidate(
752 bool needsAck, BufferId from, BufferId to,
753 const std::shared_ptr<Accessor::Impl> &impl) {
754 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
755 if (isBufferInRange(from, to, *freeIt)) {
756 auto it = mBuffers.find(*freeIt);
757 if (it != mBuffers.end() &&
758 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
759 mStats.onBufferEvicted(it->second->mAllocSize);
760 mBuffers.erase(it);
761 freeIt = mFreeBuffers.erase(freeIt);
762 continue;
763 } else {
Sungtak Leed3318082018-09-07 15:52:43 -0700764 ALOGW("bufferpool2 inconsistent!");
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700765 }
766 }
767 ++freeIt;
768 }
769
770 size_t left = 0;
771 for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
772 if (isBufferInRange(from, to, it->first)) {
773 it->second->invalidate();
774 ++left;
775 }
776 }
777 mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
778}
779
780void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
781 BufferId from = mStartSeq;
782 BufferId to = mSeq;
783 mStartSeq = mSeq;
784 // TODO: needsAck params
Sungtak Leed3128382018-11-07 17:30:37 -0800785 ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700786 if (from != to) {
787 invalidate(true, from, to, impl);
788 }
789}
790
791void Accessor::Impl::invalidatorThread(
792 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
793 std::mutex &mutex,
794 std::condition_variable &cv,
795 bool &ready) {
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700796 constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
797 constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
798 constexpr useconds_t MAX_SLEEP_US = 10000;
799 uint32_t numSpin = 0;
800 useconds_t sleepUs = 1;
801
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700802 while(true) {
803 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
804 {
805 std::unique_lock<std::mutex> lock(mutex);
806 if (!ready) {
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700807 numSpin = 0;
808 sleepUs = 1;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700809 cv.wait(lock);
810 }
811 copied.insert(accessors.begin(), accessors.end());
812 }
813 std::list<ConnectionId> erased;
814 for (auto it = copied.begin(); it != copied.end(); ++it) {
815 const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
816 if (!impl) {
817 erased.push_back(it->first);
818 } else {
819 impl->handleInvalidateAck();
820 }
821 }
822 {
823 std::unique_lock<std::mutex> lock(mutex);
824 for (auto it = erased.begin(); it != erased.end(); ++it) {
825 accessors.erase(*it);
826 }
827 if (accessors.size() == 0) {
828 ready = false;
829 } else {
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700830 // TODO Use an efficient way to wait over FMQ.
831 // N.B. Since there is not a efficient way to wait over FMQ,
832 // polling over the FMQ is the current way to prevent draining
833 // CPU.
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700834 lock.unlock();
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700835 ++numSpin;
836 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
837 sleepUs < MAX_SLEEP_US) {
838 sleepUs *= 10;
839 }
840 if (numSpin % NUM_SPIN_TO_LOG == 0) {
841 ALOGW("invalidator thread spinning");
842 }
843 ::usleep(sleepUs);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700844 }
845 }
846 }
847}
848
849Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
850 std::thread invalidator(
851 invalidatorThread,
852 std::ref(mAccessors),
853 std::ref(mMutex),
854 std::ref(mCv),
855 std::ref(mReady));
856 invalidator.detach();
857}
858
859void Accessor::Impl::AccessorInvalidator::addAccessor(
860 uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
861 bool notify = false;
862 std::unique_lock<std::mutex> lock(mMutex);
863 if (mAccessors.find(accessorId) == mAccessors.end()) {
864 if (!mReady) {
865 mReady = true;
866 notify = true;
867 }
868 mAccessors.insert(std::make_pair(accessorId, impl));
Sungtak Leed3128382018-11-07 17:30:37 -0800869 ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700870 }
871 lock.unlock();
872 if (notify) {
873 mCv.notify_one();
874 }
875}
876
877void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
878 std::lock_guard<std::mutex> lock(mMutex);
879 mAccessors.erase(accessorId);
Sungtak Leed3128382018-11-07 17:30:37 -0800880 ALOGV("buffer invalidation deleted bp:%u", accessorId);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700881 if (mAccessors.size() == 0) {
882 mReady = false;
883 }
884}
885
Sungtak Leed3128382018-11-07 17:30:37 -0800886std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
887
888void Accessor::Impl::createInvalidator() {
889 if (!sInvalidator) {
890 sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
891 }
892}
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700893
Sungtak Lee7651a272020-04-27 00:16:50 -0700894void Accessor::Impl::evictorThread(
895 std::map<const std::weak_ptr<Accessor::Impl>, nsecs_t, std::owner_less<>> &accessors,
896 std::mutex &mutex,
897 std::condition_variable &cv) {
898 std::list<const std::weak_ptr<Accessor::Impl>> evictList;
899 while (true) {
900 int expired = 0;
901 int evicted = 0;
902 {
903 nsecs_t now = systemTime();
904 std::unique_lock<std::mutex> lock(mutex);
905 if (accessors.size() == 0) {
906 cv.wait(lock);
907 }
908 auto it = accessors.begin();
909 while (it != accessors.end()) {
910 if (now > (it->second + kEvictDurationNs)) {
911 ++expired;
912 evictList.push_back(it->first);
913 it = accessors.erase(it);
914 } else {
915 ++it;
916 }
917 }
918 }
919 // evict idle accessors;
920 for (auto it = evictList.begin(); it != evictList.end(); ++it) {
921 const std::shared_ptr<Accessor::Impl> accessor = it->lock();
922 if (accessor) {
923 accessor->cleanUp(true);
924 ++evicted;
925 }
926 }
927 if (expired > 0) {
928 ALOGD("evictor expired: %d, evicted: %d", expired, evicted);
929 }
930 evictList.clear();
931 ::usleep(kEvictGranularityNs / 1000);
932 }
933}
934
935Accessor::Impl::AccessorEvictor::AccessorEvictor() {
936 std::thread evictor(
937 evictorThread,
938 std::ref(mAccessors),
939 std::ref(mMutex),
940 std::ref(mCv));
941 evictor.detach();
942}
943
944void Accessor::Impl::AccessorEvictor::addAccessor(
945 const std::weak_ptr<Accessor::Impl> &impl, nsecs_t ts) {
946 std::lock_guard<std::mutex> lock(mMutex);
947 bool notify = mAccessors.empty();
948 auto it = mAccessors.find(impl);
949 if (it == mAccessors.end()) {
950 mAccessors.emplace(impl, ts);
951 } else {
952 it->second = ts;
953 }
954 if (notify) {
955 mCv.notify_one();
956 }
957}
958
959std::unique_ptr<Accessor::Impl::AccessorEvictor> Accessor::Impl::sEvictor;
960
961void Accessor::Impl::createEvictor() {
962 if (!sEvictor) {
963 sEvictor = std::make_unique<Accessor::Impl::AccessorEvictor>();
964 }
965}
966
967void Accessor::Impl::scheduleEvictIfNeeded() {
968 nsecs_t now = systemTime();
969
970 if (now > (mScheduleEvictTs + kEvictGranularityNs)) {
971 mScheduleEvictTs = now;
972 sEvictor->addAccessor(shared_from_this(), now);
973 }
974}
975
Sungtak Leebbe37b62018-08-29 15:15:48 -0700976} // namespace implementation
977} // namespace V2_0
978} // namespace bufferpool
979} // namespace media
980} // namespace hardware
981} // namespace android