blob: 194765637c11a6672cb20dfaa5004695c7b17f52 [file] [log] [blame]
Sungtak Leebbe37b62018-08-29 15:15:48 -07001/*
2 * Copyright (C) 2018 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Ray Essick3688d0a2019-10-09 15:31:58 -070017#define LOG_TAG "BufferPoolAccessor2.0"
Sungtak Leebbe37b62018-08-29 15:15:48 -070018//#define LOG_NDEBUG 0
19
20#include <sys/types.h>
Ray Essick3688d0a2019-10-09 15:31:58 -070021#include <stdint.h>
Sungtak Leebbe37b62018-08-29 15:15:48 -070022#include <time.h>
23#include <unistd.h>
24#include <utils/Log.h>
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070025#include <thread>
Sungtak Leebbe37b62018-08-29 15:15:48 -070026#include "AccessorImpl.h"
27#include "Connection.h"
28
29namespace android {
30namespace hardware {
31namespace media {
32namespace bufferpool {
33namespace V2_0 {
34namespace implementation {
35
namespace {
    // Minimum time between two clean-up passes of BufferPool::cleanUp()
    // unless the caller forces one. TODO tune 0.5 sec
    constexpr int64_t kCleanUpDurationUs = 500 * 1000;
    // Minimum time between two statistics log lines emitted by cleanUp().
    constexpr int64_t kLogDurationUs = 5 * 1000 * 1000; // 5 secs

    // cleanUp() stops evicting free buffers (when not clearing the whole
    // cache) once the cached size or the buffer count is below these values.
    constexpr size_t kMinAllocBytesForEviction = 15 * 1024 * 1024;
    constexpr size_t kMinBufferCountForEviction = 25;
}
43
44// Buffer structure in bufferpool process
45struct InternalBuffer {
46 BufferId mId;
47 size_t mOwnerCount;
48 size_t mTransactionCount;
49 const std::shared_ptr<BufferPoolAllocation> mAllocation;
50 const size_t mAllocSize;
51 const std::vector<uint8_t> mConfig;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070052 bool mInvalidated;
Sungtak Leebbe37b62018-08-29 15:15:48 -070053
54 InternalBuffer(
55 BufferId id,
56 const std::shared_ptr<BufferPoolAllocation> &alloc,
57 const size_t allocSize,
58 const std::vector<uint8_t> &allocConfig)
59 : mId(id), mOwnerCount(0), mTransactionCount(0),
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070060 mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
61 mInvalidated(false) {}
Sungtak Leebbe37b62018-08-29 15:15:48 -070062
63 const native_handle_t *handle() {
64 return mAllocation->handle();
65 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070066
67 void invalidate() {
68 mInvalidated = true;
69 }
Sungtak Leebbe37b62018-08-29 15:15:48 -070070};
71
72struct TransactionStatus {
73 TransactionId mId;
74 BufferId mBufferId;
75 ConnectionId mSender;
76 ConnectionId mReceiver;
77 BufferStatus mStatus;
78 int64_t mTimestampUs;
79 bool mSenderValidated;
80
81 TransactionStatus(const BufferStatusMessage &message, int64_t timestampUs) {
82 mId = message.transactionId;
83 mBufferId = message.bufferId;
84 mStatus = message.newStatus;
85 mTimestampUs = timestampUs;
86 if (mStatus == BufferStatus::TRANSFER_TO) {
87 mSender = message.connectionId;
88 mReceiver = message.targetConnectionId;
89 mSenderValidated = true;
90 } else {
91 mSender = -1LL;
92 mReceiver = message.connectionId;
93 mSenderValidated = false;
94 }
95 }
96};
97
// Helper template methods for handling map of set.

// Adds |value| to the set stored under |key|, creating the set on first use.
// Returns true when |value| was newly added, false when it was already there.
template<class T, class U>
bool insert(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    // operator[] default-constructs the empty set when |key| is new, and
    // set::insert reports whether the element was actually inserted, so one
    // lookup per container is enough.
    return (*mapOfSet)[key].insert(value).second;
}
112
// Removes |value| from the set stored under |key|. The map entry itself is
// dropped once its set becomes empty. Returns true when |value| was removed.
template<class T, class U>
bool erase(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    if (entry == mapOfSet->end()) {
        return false;
    }
    const bool removed = entry->second.erase(value) != 0;
    if (entry->second.empty()) {
        mapOfSet->erase(entry);
    }
    return removed;
}
127
// Returns true when the set stored under |key| exists and holds |value|.
template<class T, class U>
bool contains(std::map<T, std::set<U>> *mapOfSet, T key, U value) {
    const auto entry = mapOfSet->find(key);
    return entry != mapOfSet->end() && entry->second.count(value) != 0;
}
137
Sungtak Leebbe37b62018-08-29 15:15:48 -0700138uint32_t Accessor::Impl::sSeqId = time(nullptr);
139
140Accessor::Impl::Impl(
141 const std::shared_ptr<BufferPoolAllocator> &allocator)
142 : mAllocator(allocator) {}
143
144Accessor::Impl::~Impl() {
145}
146
147ResultStatus Accessor::Impl::connect(
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700148 const sp<Accessor> &accessor, const sp<IObserver> &observer,
149 sp<Connection> *connection,
150 ConnectionId *pConnectionId,
151 uint32_t *pMsgId,
152 const StatusDescriptor** statusDescPtr,
153 const InvalidationDescriptor** invDescPtr) {
Sungtak Leebbe37b62018-08-29 15:15:48 -0700154 sp<Connection> newConnection = new Connection();
155 ResultStatus status = ResultStatus::CRITICAL_ERROR;
156 {
157 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
158 if (newConnection) {
Ray Essick3688d0a2019-10-09 15:31:58 -0700159 int32_t pid = getpid();
160 ConnectionId id = (int64_t)pid << 32 | sSeqId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700161 status = mBufferPool.mObserver.open(id, statusDescPtr);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700162 if (status == ResultStatus::OK) {
163 newConnection->initialize(accessor, id);
164 *connection = newConnection;
165 *pConnectionId = id;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700166 *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700167 mBufferPool.mConnectionIds.insert(id);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700168 mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
169 mBufferPool.mInvalidation.onConnect(id, observer);
Ray Essick3688d0a2019-10-09 15:31:58 -0700170 if (sSeqId == UINT32_MAX) {
171 sSeqId = 0;
172 } else {
173 ++sSeqId;
174 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700175 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700176
Sungtak Leebbe37b62018-08-29 15:15:48 -0700177 }
178 mBufferPool.processStatusMessages();
179 mBufferPool.cleanUp();
180 }
181 return status;
182}
183
184ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
185 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
Sungtak Leed3128382018-11-07 17:30:37 -0800186 ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700187 mBufferPool.processStatusMessages();
188 mBufferPool.handleClose(connectionId);
189 mBufferPool.mObserver.close(connectionId);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700190 mBufferPool.mInvalidation.onClose(connectionId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700191 // Since close# will be called after all works are finished, it is OK to
192 // evict unused buffers.
193 mBufferPool.cleanUp(true);
194 return ResultStatus::OK;
195}
196
197ResultStatus Accessor::Impl::allocate(
198 ConnectionId connectionId, const std::vector<uint8_t>& params,
199 BufferId *bufferId, const native_handle_t** handle) {
200 std::unique_lock<std::mutex> lock(mBufferPool.mMutex);
201 mBufferPool.processStatusMessages();
202 ResultStatus status = ResultStatus::OK;
203 if (!mBufferPool.getFreeBuffer(mAllocator, params, bufferId, handle)) {
204 lock.unlock();
205 std::shared_ptr<BufferPoolAllocation> alloc;
206 size_t allocSize;
207 status = mAllocator->allocate(params, &alloc, &allocSize);
208 lock.lock();
209 if (status == ResultStatus::OK) {
210 status = mBufferPool.addNewBuffer(alloc, allocSize, params, bufferId, handle);
211 }
212 ALOGV("create a buffer %d : %u %p",
213 status == ResultStatus::OK, *bufferId, *handle);
214 }
215 if (status == ResultStatus::OK) {
216 // TODO: handle ownBuffer failure
217 mBufferPool.handleOwnBuffer(connectionId, *bufferId);
218 }
219 mBufferPool.cleanUp();
220 return status;
221}
222
223ResultStatus Accessor::Impl::fetch(
224 ConnectionId connectionId, TransactionId transactionId,
225 BufferId bufferId, const native_handle_t** handle) {
226 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
227 mBufferPool.processStatusMessages();
228 auto found = mBufferPool.mTransactions.find(transactionId);
229 if (found != mBufferPool.mTransactions.end() &&
230 contains(&mBufferPool.mPendingTransactions,
231 connectionId, transactionId)) {
232 if (found->second->mSenderValidated &&
233 found->second->mStatus == BufferStatus::TRANSFER_FROM &&
234 found->second->mBufferId == bufferId) {
235 found->second->mStatus = BufferStatus::TRANSFER_FETCH;
236 auto bufferIt = mBufferPool.mBuffers.find(bufferId);
237 if (bufferIt != mBufferPool.mBuffers.end()) {
238 mBufferPool.mStats.onBufferFetched();
239 *handle = bufferIt->second->handle();
240 return ResultStatus::OK;
241 }
242 }
243 }
244 mBufferPool.cleanUp();
245 return ResultStatus::CRITICAL_ERROR;
246}
247
248void Accessor::Impl::cleanUp(bool clearCache) {
249 // transaction timeout, buffer cacheing TTL handling
250 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
251 mBufferPool.processStatusMessages();
252 mBufferPool.cleanUp(clearCache);
253}
254
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700255void Accessor::Impl::flush() {
256 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
257 mBufferPool.processStatusMessages();
258 mBufferPool.flush(shared_from_this());
259}
260
261void Accessor::Impl::handleInvalidateAck() {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800262 std::map<ConnectionId, const sp<IObserver>> observers;
263 uint32_t invalidationId;
264 {
265 std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
266 mBufferPool.processStatusMessages();
267 mBufferPool.mInvalidation.onHandleAck(&observers, &invalidationId);
268 }
269 // Do not hold lock for send invalidations
Sungtak Lee603467b2019-05-16 16:48:00 -0700270 size_t deadClients = 0;
Sungtak Leebe6a1182018-12-17 19:00:40 -0800271 for (auto it = observers.begin(); it != observers.end(); ++it) {
272 const sp<IObserver> observer = it->second;
273 if (observer) {
274 Return<void> transResult = observer->onMessage(it->first, invalidationId);
Sungtak Lee603467b2019-05-16 16:48:00 -0700275 if (!transResult.isOk()) {
276 ++deadClients;
277 }
Sungtak Leebe6a1182018-12-17 19:00:40 -0800278 }
279 }
Sungtak Lee603467b2019-05-16 16:48:00 -0700280 if (deadClients > 0) {
281 ALOGD("During invalidation found %zu dead clients", deadClients);
282 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700283}
284
285bool Accessor::Impl::isValid() {
286 return mBufferPool.isValid();
287}
288
Sungtak Leebbe37b62018-08-29 15:15:48 -0700289Accessor::Impl::Impl::BufferPool::BufferPool()
290 : mTimestampUs(getTimestampNow()),
291 mLastCleanUpUs(mTimestampUs),
292 mLastLogUs(mTimestampUs),
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700293 mSeq(0),
294 mStartSeq(0) {
295 mValid = mInvalidationChannel.isValid();
296}
Sungtak Leebbe37b62018-08-29 15:15:48 -0700297
298
// Statistics helper
// Returns |base| as a percentage of |total|, rounded to the nearest integer
// by adding 0.5 before truncation. A zero |total| yields 0.
template<typename T, typename S>
int percentage(T base, S total) {
    if (!total) {
        return 0;
    }
    const double ratio = 100. * static_cast<S>(base) / total;
    return int(0.5 + ratio);
}
304
Sungtak Leed3128382018-11-07 17:30:37 -0800305std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700306
Sungtak Leebbe37b62018-08-29 15:15:48 -0700307Accessor::Impl::Impl::BufferPool::~BufferPool() {
308 std::lock_guard<std::mutex> lock(mMutex);
Sungtak Leed3318082018-09-07 15:52:43 -0700309 ALOGD("Destruction - bufferpool2 %p "
Sungtak Leebbe37b62018-08-29 15:15:48 -0700310 "cached: %zu/%zuM, %zu/%d%% in use; "
311 "allocs: %zu, %d%% recycled; "
Nicholas Reiterba056462019-07-30 16:10:18 -0700312 "transfers: %zu, %d%% unfetched",
Sungtak Leebbe37b62018-08-29 15:15:48 -0700313 this, mStats.mBuffersCached, mStats.mSizeCached >> 20,
314 mStats.mBuffersInUse, percentage(mStats.mBuffersInUse, mStats.mBuffersCached),
315 mStats.mTotalAllocations, percentage(mStats.mTotalRecycles, mStats.mTotalAllocations),
316 mStats.mTotalTransfers,
317 percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
318}
319
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700320void Accessor::Impl::BufferPool::Invalidation::onConnect(
321 ConnectionId conId, const sp<IObserver>& observer) {
322 mAcks[conId] = mInvalidationId; // starts from current invalidationId
323 mObservers.insert(std::make_pair(conId, observer));
324}
325
326void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
327 mAcks.erase(conId);
328 mObservers.erase(conId);
329}
330
331void Accessor::Impl::BufferPool::Invalidation::onAck(
332 ConnectionId conId,
333 uint32_t msgId) {
334 auto it = mAcks.find(conId);
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800335 if (it == mAcks.end()) {
336 ALOGW("ACK from inconsistent connection! %lld", (long long)conId);
337 return;
338 }
339 if (isMessageLater(msgId, it->second)) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700340 mAcks[conId] = msgId;
341 }
342}
343
344void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
345 BufferId bufferId,
346 BufferInvalidationChannel &channel) {
347 for (auto it = mPendings.begin(); it != mPendings.end();) {
Sungtak Leed3128382018-11-07 17:30:37 -0800348 if (it->isInvalidated(bufferId)) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700349 uint32_t msgId = 0;
350 if (it->mNeedsAck) {
351 msgId = ++mInvalidationId;
352 if (msgId == 0) {
353 // wrap happens
354 msgId = ++mInvalidationId;
355 }
356 }
357 channel.postInvalidation(msgId, it->mFrom, it->mTo);
Sungtak Leed3128382018-11-07 17:30:37 -0800358 it = mPendings.erase(it);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700359 continue;
360 }
361 ++it;
362 }
363}
364
365void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
366 bool needsAck,
367 uint32_t from,
368 uint32_t to,
369 size_t left,
370 BufferInvalidationChannel &channel,
371 const std::shared_ptr<Accessor::Impl> &impl) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700372 uint32_t msgId = 0;
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800373 if (needsAck) {
374 msgId = ++mInvalidationId;
375 if (msgId == 0) {
376 // wrap happens
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700377 msgId = ++mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700378 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800379 }
Sungtak Leed3318082018-09-07 15:52:43 -0700380 ALOGV("bufferpool2 invalidation requested and queued");
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800381 if (left == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700382 channel.postInvalidation(msgId, from, to);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700383 } else {
384 // TODO: sending hint message?
Sungtak Leed3318082018-09-07 15:52:43 -0700385 ALOGV("bufferpoo2 invalidation requested and pending");
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700386 Pending pending(needsAck, from, to, left, impl);
387 mPendings.push_back(pending);
388 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800389 sInvalidator->addAccessor(mId, impl);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700390}
391
Sungtak Leebe6a1182018-12-17 19:00:40 -0800392void Accessor::Impl::BufferPool::Invalidation::onHandleAck(
393 std::map<ConnectionId, const sp<IObserver>> *observers,
394 uint32_t *invalidationId) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700395 if (mInvalidationId != 0) {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800396 *invalidationId = mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700397 std::set<int> deads;
398 for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
399 if (it->second != mInvalidationId) {
Sungtak Leed3128382018-11-07 17:30:37 -0800400 const sp<IObserver> observer = mObservers[it->first];
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700401 if (observer) {
Sungtak Leebe6a1182018-12-17 19:00:40 -0800402 observers->emplace(it->first, observer);
403 ALOGV("connection %lld will call observer (%u: %u)",
Sungtak Leed3128382018-11-07 17:30:37 -0800404 (long long)it->first, it->second, mInvalidationId);
Sungtak Leebe6a1182018-12-17 19:00:40 -0800405 // N.B: onMessage will be called later. ignore possibility of
406 // onMessage# oneway call being lost.
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800407 it->second = mInvalidationId;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700408 } else {
Sungtak Leed3318082018-09-07 15:52:43 -0700409 ALOGV("bufferpool2 observer died %lld", (long long)it->first);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700410 deads.insert(it->first);
411 }
412 }
413 }
414 if (deads.size() > 0) {
415 for (auto it = deads.begin(); it != deads.end(); ++it) {
416 onClose(*it);
417 }
418 }
419 }
Sungtak Leeb355f7c2018-11-19 14:49:10 -0800420 if (mPendings.size() == 0) {
421 // All invalidation Ids are synced and no more pending invalidations.
422 sInvalidator->delAccessor(mId);
423 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700424}
425
Sungtak Leebbe37b62018-08-29 15:15:48 -0700426bool Accessor::Impl::BufferPool::handleOwnBuffer(
427 ConnectionId connectionId, BufferId bufferId) {
428
429 bool added = insert(&mUsingBuffers, connectionId, bufferId);
430 if (added) {
431 auto iter = mBuffers.find(bufferId);
432 iter->second->mOwnerCount++;
433 }
434 insert(&mUsingConnections, bufferId, connectionId);
435 return added;
436}
437
438bool Accessor::Impl::BufferPool::handleReleaseBuffer(
439 ConnectionId connectionId, BufferId bufferId) {
440 bool deleted = erase(&mUsingBuffers, connectionId, bufferId);
441 if (deleted) {
442 auto iter = mBuffers.find(bufferId);
443 iter->second->mOwnerCount--;
444 if (iter->second->mOwnerCount == 0 &&
445 iter->second->mTransactionCount == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700446 if (!iter->second->mInvalidated) {
447 mStats.onBufferUnused(iter->second->mAllocSize);
448 mFreeBuffers.insert(bufferId);
449 } else {
450 mStats.onBufferUnused(iter->second->mAllocSize);
451 mStats.onBufferEvicted(iter->second->mAllocSize);
452 mBuffers.erase(iter);
453 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
454 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700455 }
456 }
457 erase(&mUsingConnections, bufferId, connectionId);
458 ALOGV("release buffer %u : %d", bufferId, deleted);
459 return deleted;
460}
461
462bool Accessor::Impl::BufferPool::handleTransferTo(const BufferStatusMessage &message) {
463 auto completed = mCompletedTransactions.find(
464 message.transactionId);
465 if (completed != mCompletedTransactions.end()) {
466 // already completed
467 mCompletedTransactions.erase(completed);
468 return true;
469 }
470 // the buffer should exist and be owned.
471 auto bufferIter = mBuffers.find(message.bufferId);
472 if (bufferIter == mBuffers.end() ||
473 !contains(&mUsingBuffers, message.connectionId, message.bufferId)) {
474 return false;
475 }
476 auto found = mTransactions.find(message.transactionId);
477 if (found != mTransactions.end()) {
478 // transfer_from was received earlier.
479 found->second->mSender = message.connectionId;
480 found->second->mSenderValidated = true;
481 return true;
482 }
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700483 if (mConnectionIds.find(message.targetConnectionId) == mConnectionIds.end()) {
484 // N.B: it could be fake or receive connection already closed.
485 ALOGD("bufferpool2 %p receiver connection %lld is no longer valid",
486 this, (long long)message.targetConnectionId);
487 return false;
488 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700489 mStats.onBufferSent();
490 mTransactions.insert(std::make_pair(
491 message.transactionId,
492 std::make_unique<TransactionStatus>(message, mTimestampUs)));
493 insert(&mPendingTransactions, message.targetConnectionId,
494 message.transactionId);
495 bufferIter->second->mTransactionCount++;
496 return true;
497}
498
499bool Accessor::Impl::BufferPool::handleTransferFrom(const BufferStatusMessage &message) {
500 auto found = mTransactions.find(message.transactionId);
501 if (found == mTransactions.end()) {
502 // TODO: is it feasible to check ownership here?
503 mStats.onBufferSent();
504 mTransactions.insert(std::make_pair(
505 message.transactionId,
506 std::make_unique<TransactionStatus>(message, mTimestampUs)));
507 insert(&mPendingTransactions, message.connectionId,
508 message.transactionId);
509 auto bufferIter = mBuffers.find(message.bufferId);
510 bufferIter->second->mTransactionCount++;
511 } else {
512 if (message.connectionId == found->second->mReceiver) {
513 found->second->mStatus = BufferStatus::TRANSFER_FROM;
514 }
515 }
516 return true;
517}
518
519bool Accessor::Impl::BufferPool::handleTransferResult(const BufferStatusMessage &message) {
520 auto found = mTransactions.find(message.transactionId);
521 if (found != mTransactions.end()) {
522 bool deleted = erase(&mPendingTransactions, message.connectionId,
523 message.transactionId);
524 if (deleted) {
525 if (!found->second->mSenderValidated) {
526 mCompletedTransactions.insert(message.transactionId);
527 }
528 auto bufferIter = mBuffers.find(message.bufferId);
529 if (message.newStatus == BufferStatus::TRANSFER_OK) {
530 handleOwnBuffer(message.connectionId, message.bufferId);
531 }
532 bufferIter->second->mTransactionCount--;
533 if (bufferIter->second->mOwnerCount == 0
534 && bufferIter->second->mTransactionCount == 0) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700535 if (!bufferIter->second->mInvalidated) {
536 mStats.onBufferUnused(bufferIter->second->mAllocSize);
537 mFreeBuffers.insert(message.bufferId);
538 } else {
539 mStats.onBufferUnused(bufferIter->second->mAllocSize);
540 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
541 mBuffers.erase(bufferIter);
542 mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
543 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700544 }
545 mTransactions.erase(found);
546 }
547 ALOGV("transfer finished %llu %u - %d", (unsigned long long)message.transactionId,
548 message.bufferId, deleted);
549 return deleted;
550 }
551 ALOGV("transfer not found %llu %u", (unsigned long long)message.transactionId,
552 message.bufferId);
553 return false;
554}
555
556void Accessor::Impl::BufferPool::processStatusMessages() {
557 std::vector<BufferStatusMessage> messages;
558 mObserver.getBufferStatusChanges(messages);
559 mTimestampUs = getTimestampNow();
560 for (BufferStatusMessage& message: messages) {
561 bool ret = false;
562 switch (message.newStatus) {
563 case BufferStatus::NOT_USED:
564 ret = handleReleaseBuffer(
565 message.connectionId, message.bufferId);
566 break;
567 case BufferStatus::USED:
568 // not happening
569 break;
570 case BufferStatus::TRANSFER_TO:
571 ret = handleTransferTo(message);
572 break;
573 case BufferStatus::TRANSFER_FROM:
574 ret = handleTransferFrom(message);
575 break;
576 case BufferStatus::TRANSFER_TIMEOUT:
577 // TODO
578 break;
579 case BufferStatus::TRANSFER_LOST:
580 // TODO
581 break;
582 case BufferStatus::TRANSFER_FETCH:
583 // not happening
584 break;
585 case BufferStatus::TRANSFER_OK:
586 case BufferStatus::TRANSFER_ERROR:
587 ret = handleTransferResult(message);
588 break;
Sungtak Leed491f1f2018-10-05 15:56:56 -0700589 case BufferStatus::INVALIDATION_ACK:
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700590 mInvalidation.onAck(message.connectionId, message.bufferId);
Sungtak Leed3128382018-11-07 17:30:37 -0800591 ret = true;
Sungtak Leed491f1f2018-10-05 15:56:56 -0700592 break;
Sungtak Leebbe37b62018-08-29 15:15:48 -0700593 }
594 if (ret == false) {
595 ALOGW("buffer status message processing failure - message : %d connection : %lld",
596 message.newStatus, (long long)message.connectionId);
597 }
598 }
599 messages.clear();
600}
601
602bool Accessor::Impl::BufferPool::handleClose(ConnectionId connectionId) {
603 // Cleaning buffers
604 auto buffers = mUsingBuffers.find(connectionId);
605 if (buffers != mUsingBuffers.end()) {
606 for (const BufferId& bufferId : buffers->second) {
607 bool deleted = erase(&mUsingConnections, bufferId, connectionId);
608 if (deleted) {
609 auto bufferIter = mBuffers.find(bufferId);
610 bufferIter->second->mOwnerCount--;
611 if (bufferIter->second->mOwnerCount == 0 &&
612 bufferIter->second->mTransactionCount == 0) {
613 // TODO: handle freebuffer insert fail
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700614 if (!bufferIter->second->mInvalidated) {
615 mStats.onBufferUnused(bufferIter->second->mAllocSize);
616 mFreeBuffers.insert(bufferId);
617 } else {
618 mStats.onBufferUnused(bufferIter->second->mAllocSize);
619 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
620 mBuffers.erase(bufferIter);
621 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
622 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700623 }
624 }
625 }
626 mUsingBuffers.erase(buffers);
627 }
628
629 // Cleaning transactions
630 auto pending = mPendingTransactions.find(connectionId);
631 if (pending != mPendingTransactions.end()) {
632 for (const TransactionId& transactionId : pending->second) {
633 auto iter = mTransactions.find(transactionId);
634 if (iter != mTransactions.end()) {
635 if (!iter->second->mSenderValidated) {
636 mCompletedTransactions.insert(transactionId);
637 }
638 BufferId bufferId = iter->second->mBufferId;
639 auto bufferIter = mBuffers.find(bufferId);
640 bufferIter->second->mTransactionCount--;
641 if (bufferIter->second->mOwnerCount == 0 &&
642 bufferIter->second->mTransactionCount == 0) {
643 // TODO: handle freebuffer insert fail
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700644 if (!bufferIter->second->mInvalidated) {
645 mStats.onBufferUnused(bufferIter->second->mAllocSize);
646 mFreeBuffers.insert(bufferId);
647 } else {
648 mStats.onBufferUnused(bufferIter->second->mAllocSize);
649 mStats.onBufferEvicted(bufferIter->second->mAllocSize);
650 mBuffers.erase(bufferIter);
651 mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
652 }
Sungtak Leebbe37b62018-08-29 15:15:48 -0700653 }
654 mTransactions.erase(iter);
655 }
656 }
657 }
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700658 mConnectionIds.erase(connectionId);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700659 return true;
660}
661
662bool Accessor::Impl::BufferPool::getFreeBuffer(
663 const std::shared_ptr<BufferPoolAllocator> &allocator,
664 const std::vector<uint8_t> &params, BufferId *pId,
665 const native_handle_t** handle) {
666 auto bufferIt = mFreeBuffers.begin();
667 for (;bufferIt != mFreeBuffers.end(); ++bufferIt) {
668 BufferId bufferId = *bufferIt;
669 if (allocator->compatible(params, mBuffers[bufferId]->mConfig)) {
670 break;
671 }
672 }
673 if (bufferIt != mFreeBuffers.end()) {
674 BufferId id = *bufferIt;
675 mFreeBuffers.erase(bufferIt);
676 mStats.onBufferRecycled(mBuffers[id]->mAllocSize);
677 *handle = mBuffers[id]->handle();
678 *pId = id;
679 ALOGV("recycle a buffer %u %p", id, *handle);
680 return true;
681 }
682 return false;
683}
684
685ResultStatus Accessor::Impl::BufferPool::addNewBuffer(
686 const std::shared_ptr<BufferPoolAllocation> &alloc,
687 const size_t allocSize,
688 const std::vector<uint8_t> &params,
689 BufferId *pId,
690 const native_handle_t** handle) {
691
692 BufferId bufferId = mSeq++;
693 if (mSeq == Connection::SYNC_BUFFERID) {
694 mSeq = 0;
695 }
696 std::unique_ptr<InternalBuffer> buffer =
697 std::make_unique<InternalBuffer>(
698 bufferId, alloc, allocSize, params);
699 if (buffer) {
700 auto res = mBuffers.insert(std::make_pair(
701 bufferId, std::move(buffer)));
702 if (res.second) {
703 mStats.onBufferAllocated(allocSize);
704 *handle = alloc->handle();
705 *pId = bufferId;
706 return ResultStatus::OK;
707 }
708 }
709 return ResultStatus::NO_MEMORY;
710}
711
712void Accessor::Impl::BufferPool::cleanUp(bool clearCache) {
713 if (clearCache || mTimestampUs > mLastCleanUpUs + kCleanUpDurationUs) {
714 mLastCleanUpUs = mTimestampUs;
715 if (mTimestampUs > mLastLogUs + kLogDurationUs) {
716 mLastLogUs = mTimestampUs;
Sungtak Leed3318082018-09-07 15:52:43 -0700717 ALOGD("bufferpool2 %p : %zu(%zu size) total buffers - "
Sungtak Leebbe37b62018-08-29 15:15:48 -0700718 "%zu(%zu size) used buffers - %zu/%zu (recycle/alloc) - "
719 "%zu/%zu (fetch/transfer)",
720 this, mStats.mBuffersCached, mStats.mSizeCached,
721 mStats.mBuffersInUse, mStats.mSizeInUse,
722 mStats.mTotalRecycles, mStats.mTotalAllocations,
723 mStats.mTotalFetches, mStats.mTotalTransfers);
724 }
725 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
Sungtak Lee30dbaa22019-10-03 14:38:01 -0700726 if (!clearCache && (mStats.mSizeCached < kMinAllocBytesForEviction
727 || mBuffers.size() < kMinBufferCountForEviction)) {
Sungtak Leebbe37b62018-08-29 15:15:48 -0700728 break;
729 }
730 auto it = mBuffers.find(*freeIt);
731 if (it != mBuffers.end() &&
732 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
733 mStats.onBufferEvicted(it->second->mAllocSize);
734 mBuffers.erase(it);
735 freeIt = mFreeBuffers.erase(freeIt);
736 } else {
737 ++freeIt;
Sungtak Leed3318082018-09-07 15:52:43 -0700738 ALOGW("bufferpool2 inconsistent!");
Sungtak Leebbe37b62018-08-29 15:15:48 -0700739 }
740 }
741 }
742}
743
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700744void Accessor::Impl::BufferPool::invalidate(
745 bool needsAck, BufferId from, BufferId to,
746 const std::shared_ptr<Accessor::Impl> &impl) {
747 for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
748 if (isBufferInRange(from, to, *freeIt)) {
749 auto it = mBuffers.find(*freeIt);
750 if (it != mBuffers.end() &&
751 it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
752 mStats.onBufferEvicted(it->second->mAllocSize);
753 mBuffers.erase(it);
754 freeIt = mFreeBuffers.erase(freeIt);
755 continue;
756 } else {
Sungtak Leed3318082018-09-07 15:52:43 -0700757 ALOGW("bufferpool2 inconsistent!");
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700758 }
759 }
760 ++freeIt;
761 }
762
763 size_t left = 0;
764 for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
765 if (isBufferInRange(from, to, it->first)) {
766 it->second->invalidate();
767 ++left;
768 }
769 }
770 mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
771}
772
773void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
774 BufferId from = mStartSeq;
775 BufferId to = mSeq;
776 mStartSeq = mSeq;
777 // TODO: needsAck params
Sungtak Leed3128382018-11-07 17:30:37 -0800778 ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700779 if (from != to) {
780 invalidate(true, from, to, impl);
781 }
782}
783
784void Accessor::Impl::invalidatorThread(
785 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
786 std::mutex &mutex,
787 std::condition_variable &cv,
788 bool &ready) {
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700789 constexpr uint32_t NUM_SPIN_TO_INCREASE_SLEEP = 1024;
790 constexpr uint32_t NUM_SPIN_TO_LOG = 1024*8;
791 constexpr useconds_t MAX_SLEEP_US = 10000;
792 uint32_t numSpin = 0;
793 useconds_t sleepUs = 1;
794
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700795 while(true) {
796 std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
797 {
798 std::unique_lock<std::mutex> lock(mutex);
799 if (!ready) {
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700800 numSpin = 0;
801 sleepUs = 1;
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700802 cv.wait(lock);
803 }
804 copied.insert(accessors.begin(), accessors.end());
805 }
806 std::list<ConnectionId> erased;
807 for (auto it = copied.begin(); it != copied.end(); ++it) {
808 const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
809 if (!impl) {
810 erased.push_back(it->first);
811 } else {
812 impl->handleInvalidateAck();
813 }
814 }
815 {
816 std::unique_lock<std::mutex> lock(mutex);
817 for (auto it = erased.begin(); it != erased.end(); ++it) {
818 accessors.erase(*it);
819 }
820 if (accessors.size() == 0) {
821 ready = false;
822 } else {
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700823 // TODO Use an efficient way to wait over FMQ.
824 // N.B. Since there is not a efficient way to wait over FMQ,
825 // polling over the FMQ is the current way to prevent draining
826 // CPU.
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700827 lock.unlock();
Sungtak Leeccc32cb2019-08-20 18:07:54 -0700828 ++numSpin;
829 if (numSpin % NUM_SPIN_TO_INCREASE_SLEEP == 0 &&
830 sleepUs < MAX_SLEEP_US) {
831 sleepUs *= 10;
832 }
833 if (numSpin % NUM_SPIN_TO_LOG == 0) {
834 ALOGW("invalidator thread spinning");
835 }
836 ::usleep(sleepUs);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700837 }
838 }
839 }
840}
841
842Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
843 std::thread invalidator(
844 invalidatorThread,
845 std::ref(mAccessors),
846 std::ref(mMutex),
847 std::ref(mCv),
848 std::ref(mReady));
849 invalidator.detach();
850}
851
852void Accessor::Impl::AccessorInvalidator::addAccessor(
853 uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
854 bool notify = false;
855 std::unique_lock<std::mutex> lock(mMutex);
856 if (mAccessors.find(accessorId) == mAccessors.end()) {
857 if (!mReady) {
858 mReady = true;
859 notify = true;
860 }
861 mAccessors.insert(std::make_pair(accessorId, impl));
Sungtak Leed3128382018-11-07 17:30:37 -0800862 ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700863 }
864 lock.unlock();
865 if (notify) {
866 mCv.notify_one();
867 }
868}
869
870void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
871 std::lock_guard<std::mutex> lock(mMutex);
872 mAccessors.erase(accessorId);
Sungtak Leed3128382018-11-07 17:30:37 -0800873 ALOGV("buffer invalidation deleted bp:%u", accessorId);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700874 if (mAccessors.size() == 0) {
875 mReady = false;
876 }
877}
878
Sungtak Leed3128382018-11-07 17:30:37 -0800879std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
880
881void Accessor::Impl::createInvalidator() {
882 if (!sInvalidator) {
883 sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
884 }
885}
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700886
Sungtak Leebbe37b62018-08-29 15:15:48 -0700887} // namespace implementation
888} // namespace V2_0
889} // namespace bufferpool
890} // namespace media
891} // namespace hardware
892} // namespace android