blob: f8531d812c9c5c39e276a426f24806436f35e756 [file] [log] [blame]
/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16#define LOG_TAG "BufferPoolManager"
17//#define LOG_NDEBUG 0
18
19#include <bufferpool/ClientManager.h>
20#include <hidl/HidlTransportSupport.h>
21#include <sys/types.h>
22#include <time.h>
23#include <unistd.h>
24#include <utils/Log.h>
25#include "BufferPoolClient.h"
Sungtak Leec7f9e2c2018-09-14 16:23:40 -070026#include "Observer.h"
Sungtak Leebbe37b62018-08-29 15:15:48 -070027
28namespace android {
29namespace hardware {
30namespace media {
31namespace bufferpool {
32namespace V2_0 {
33namespace implementation {
34
// Deadline budget used by registerSender() while another thread is
// connecting: 0.5 sec.
static constexpr int64_t kRegisterTimeoutUs = 500 * 1000;
// Minimum interval between two periodic cleanUp() passes. TODO: tune (1 sec).
static constexpr int64_t kCleanUpDurationUs = 1000 * 1000;
// Idle time after which cleanUp() drops an inactive client. TODO: tune (5 secs).
static constexpr int64_t kClientTimeoutUs = 5 * 1000 * 1000;
38
39/**
40 * The holder of the cookie of remote IClientManager.
41 * The cookie is process locally unique for each IClientManager.
42 * (The cookie is used to notify death of clients to bufferpool process.)
43 */
44class ClientManagerCookieHolder {
45public:
46 /**
47 * Creates a cookie holder for remote IClientManager(s).
48 */
49 ClientManagerCookieHolder();
50
51 /**
52 * Gets a cookie for a remote IClientManager.
53 *
54 * @param manager the specified remote IClientManager.
55 * @param added true when the specified remote IClientManager is added
56 * newly, false otherwise.
57 *
58 * @return the process locally unique cookie for the specified IClientManager.
59 */
60 uint64_t getCookie(const sp<IClientManager> &manager, bool *added);
61
62private:
63 uint64_t mSeqId;
64 std::mutex mLock;
65 std::list<std::pair<const wp<IClientManager>, uint64_t>> mManagers;
66};
67
68ClientManagerCookieHolder::ClientManagerCookieHolder() : mSeqId(0){}
69
70uint64_t ClientManagerCookieHolder::getCookie(
71 const sp<IClientManager> &manager,
72 bool *added) {
73 std::lock_guard<std::mutex> lock(mLock);
74 for (auto it = mManagers.begin(); it != mManagers.end();) {
75 const sp<IClientManager> key = it->first.promote();
76 if (key) {
77 if (interfacesEqual(key, manager)) {
78 *added = false;
79 return it->second;
80 }
81 ++it;
82 } else {
83 it = mManagers.erase(it);
84 }
85 }
86 uint64_t id = mSeqId++;
87 *added = true;
88 mManagers.push_back(std::make_pair(manager, id));
89 return id;
90}
91
92class ClientManager::Impl {
93public:
94 Impl();
95
96 // BnRegisterSender
97 ResultStatus registerSender(const sp<IAccessor> &accessor,
98 ConnectionId *pConnectionId);
99
100 // BpRegisterSender
101 ResultStatus registerSender(const sp<IClientManager> &receiver,
102 ConnectionId senderId,
103 ConnectionId *receiverId);
104
105 ResultStatus create(const std::shared_ptr<BufferPoolAllocator> &allocator,
106 ConnectionId *pConnectionId);
107
108 ResultStatus close(ConnectionId connectionId);
109
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700110 ResultStatus flush(ConnectionId connectionId);
111
Sungtak Leebbe37b62018-08-29 15:15:48 -0700112 ResultStatus allocate(ConnectionId connectionId,
113 const std::vector<uint8_t> &params,
114 native_handle_t **handle,
115 std::shared_ptr<BufferPoolData> *buffer);
116
117 ResultStatus receive(ConnectionId connectionId,
118 TransactionId transactionId,
119 BufferId bufferId,
120 int64_t timestampUs,
121 native_handle_t **handle,
122 std::shared_ptr<BufferPoolData> *buffer);
123
124 ResultStatus postSend(ConnectionId receiverId,
125 const std::shared_ptr<BufferPoolData> &buffer,
126 TransactionId *transactionId,
127 int64_t *timestampUs);
128
129 ResultStatus getAccessor(ConnectionId connectionId,
130 sp<IAccessor> *accessor);
131
132 void cleanUp(bool clearCache = false);
133
134private:
135 // In order to prevent deadlock between multiple locks,
136 // always lock ClientCache.lock before locking ActiveClients.lock.
137 struct ClientCache {
138 // This lock is held for brief duration.
139 // Blocking operation is not performed while holding the lock.
140 std::mutex mMutex;
141 std::list<std::pair<const wp<IAccessor>, const std::weak_ptr<BufferPoolClient>>>
142 mClients;
143 std::condition_variable mConnectCv;
144 bool mConnecting;
145 int64_t mLastCleanUpUs;
146
147 ClientCache() : mConnecting(false), mLastCleanUpUs(getTimestampNow()) {}
148 } mCache;
149
150 // Active clients which can be retrieved via ConnectionId
151 struct ActiveClients {
152 // This lock is held for brief duration.
153 // Blocking operation is not performed holding the lock.
154 std::mutex mMutex;
155 std::map<ConnectionId, const std::shared_ptr<BufferPoolClient>>
156 mClients;
157 } mActive;
158
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700159 sp<Observer> mObserver;
160
Sungtak Leebbe37b62018-08-29 15:15:48 -0700161 ClientManagerCookieHolder mRemoteClientCookies;
162};
163
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700164ClientManager::Impl::Impl()
165 : mObserver(new Observer()) {}
Sungtak Leebbe37b62018-08-29 15:15:48 -0700166
167ResultStatus ClientManager::Impl::registerSender(
168 const sp<IAccessor> &accessor, ConnectionId *pConnectionId) {
169 cleanUp();
170 int64_t timeoutUs = getTimestampNow() + kRegisterTimeoutUs;
171 do {
172 std::unique_lock<std::mutex> lock(mCache.mMutex);
173 for (auto it = mCache.mClients.begin(); it != mCache.mClients.end(); ++it) {
174 sp<IAccessor> sAccessor = it->first.promote();
175 if (sAccessor && interfacesEqual(sAccessor, accessor)) {
176 const std::shared_ptr<BufferPoolClient> client = it->second.lock();
177 if (client) {
178 std::lock_guard<std::mutex> lock(mActive.mMutex);
179 *pConnectionId = client->getConnectionId();
180 if (mActive.mClients.find(*pConnectionId) != mActive.mClients.end()) {
181 ALOGV("register existing connection %lld", (long long)*pConnectionId);
182 return ResultStatus::ALREADY_EXISTS;
183 }
184 }
185 mCache.mClients.erase(it);
186 break;
187 }
188 }
189 if (!mCache.mConnecting) {
190 mCache.mConnecting = true;
191 lock.unlock();
192 ResultStatus result = ResultStatus::OK;
193 const std::shared_ptr<BufferPoolClient> client =
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700194 std::make_shared<BufferPoolClient>(accessor, mObserver);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700195 lock.lock();
196 if (!client) {
197 result = ResultStatus::NO_MEMORY;
198 } else if (!client->isValid()) {
199 result = ResultStatus::CRITICAL_ERROR;
200 }
201 if (result == ResultStatus::OK) {
202 // TODO: handle insert fail. (malloc fail)
203 const std::weak_ptr<BufferPoolClient> wclient = client;
204 mCache.mClients.push_back(std::make_pair(accessor, wclient));
205 ConnectionId conId = client->getConnectionId();
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700206 mObserver->addClient(conId, wclient);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700207 {
208 std::lock_guard<std::mutex> lock(mActive.mMutex);
209 mActive.mClients.insert(std::make_pair(conId, client));
210 }
211 *pConnectionId = conId;
212 ALOGV("register new connection %lld", (long long)*pConnectionId);
213 }
214 mCache.mConnecting = false;
215 lock.unlock();
216 mCache.mConnectCv.notify_all();
217 return result;
218 }
219 mCache.mConnectCv.wait_for(
220 lock, std::chrono::microseconds(kRegisterTimeoutUs));
221 } while (getTimestampNow() < timeoutUs);
222 // TODO: return timeout error
223 return ResultStatus::CRITICAL_ERROR;
224}
225
226ResultStatus ClientManager::Impl::registerSender(
227 const sp<IClientManager> &receiver,
228 ConnectionId senderId,
229 ConnectionId *receiverId) {
230 sp<IAccessor> accessor;
231 bool local = false;
232 {
233 std::lock_guard<std::mutex> lock(mActive.mMutex);
234 auto it = mActive.mClients.find(senderId);
235 if (it == mActive.mClients.end()) {
236 return ResultStatus::NOT_FOUND;
237 }
238 it->second->getAccessor(&accessor);
239 local = it->second->isLocal();
240 }
241 ResultStatus rs = ResultStatus::CRITICAL_ERROR;
242 if (accessor) {
243 Return<void> transResult = receiver->registerSender(
244 accessor,
245 [&rs, receiverId](
246 ResultStatus status,
247 int64_t connectionId) {
248 rs = status;
249 *receiverId = connectionId;
250 });
251 if (!transResult.isOk()) {
252 return ResultStatus::CRITICAL_ERROR;
253 } else if (local && rs == ResultStatus::OK) {
254 sp<ConnectionDeathRecipient> recipient = Accessor::getConnectionDeathRecipient();
255 if (recipient) {
256 ALOGV("client death recipient registered %lld", (long long)*receiverId);
257 bool added;
258 uint64_t cookie = mRemoteClientCookies.getCookie(receiver, &added);
259 recipient->addCookieToConnection(cookie, *receiverId);
260 if (added) {
261 Return<bool> transResult = receiver->linkToDeath(recipient, cookie);
262 }
263 }
264 }
265 }
266 return rs;
267}
268
269ResultStatus ClientManager::Impl::create(
270 const std::shared_ptr<BufferPoolAllocator> &allocator,
271 ConnectionId *pConnectionId) {
272 const sp<Accessor> accessor = new Accessor(allocator);
273 if (!accessor || !accessor->isValid()) {
274 return ResultStatus::CRITICAL_ERROR;
275 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700276 // TODO: observer is local. use direct call instead of hidl call.
Sungtak Leebbe37b62018-08-29 15:15:48 -0700277 std::shared_ptr<BufferPoolClient> client =
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700278 std::make_shared<BufferPoolClient>(accessor, mObserver);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700279 if (!client || !client->isValid()) {
280 return ResultStatus::CRITICAL_ERROR;
281 }
282 // Since a new bufferpool is created, evict memories which are used by
283 // existing bufferpools and clients.
284 cleanUp(true);
285 {
286 // TODO: handle insert fail. (malloc fail)
287 std::lock_guard<std::mutex> lock(mCache.mMutex);
288 const std::weak_ptr<BufferPoolClient> wclient = client;
289 mCache.mClients.push_back(std::make_pair(accessor, wclient));
290 ConnectionId conId = client->getConnectionId();
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700291 mObserver->addClient(conId, wclient);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700292 {
293 std::lock_guard<std::mutex> lock(mActive.mMutex);
294 mActive.mClients.insert(std::make_pair(conId, client));
295 }
296 *pConnectionId = conId;
297 ALOGV("create new connection %lld", (long long)*pConnectionId);
298 }
299 return ResultStatus::OK;
300}
301
302ResultStatus ClientManager::Impl::close(ConnectionId connectionId) {
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700303 std::unique_lock<std::mutex> lock1(mCache.mMutex);
304 std::unique_lock<std::mutex> lock2(mActive.mMutex);
Sungtak Leebbe37b62018-08-29 15:15:48 -0700305 auto it = mActive.mClients.find(connectionId);
306 if (it != mActive.mClients.end()) {
307 sp<IAccessor> accessor;
308 it->second->getAccessor(&accessor);
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700309 std::shared_ptr<BufferPoolClient> closing = it->second;
Sungtak Leebbe37b62018-08-29 15:15:48 -0700310 mActive.mClients.erase(connectionId);
311 for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
312 // clean up dead client caches
313 sp<IAccessor> cAccessor = cit->first.promote();
314 if (!cAccessor || (accessor && interfacesEqual(cAccessor, accessor))) {
315 cit = mCache.mClients.erase(cit);
316 } else {
317 cit++;
318 }
319 }
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700320 lock2.unlock();
321 lock1.unlock();
322 closing->flush();
Sungtak Leebbe37b62018-08-29 15:15:48 -0700323 return ResultStatus::OK;
324 }
325 return ResultStatus::NOT_FOUND;
326}
327
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700328ResultStatus ClientManager::Impl::flush(ConnectionId connectionId) {
329 std::shared_ptr<BufferPoolClient> client;
330 {
331 std::lock_guard<std::mutex> lock(mActive.mMutex);
332 auto it = mActive.mClients.find(connectionId);
333 if (it == mActive.mClients.end()) {
334 return ResultStatus::NOT_FOUND;
335 }
336 client = it->second;
337 }
338 return client->flush();
339}
340
Sungtak Leebbe37b62018-08-29 15:15:48 -0700341ResultStatus ClientManager::Impl::allocate(
342 ConnectionId connectionId, const std::vector<uint8_t> &params,
343 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
344 std::shared_ptr<BufferPoolClient> client;
345 {
346 std::lock_guard<std::mutex> lock(mActive.mMutex);
347 auto it = mActive.mClients.find(connectionId);
348 if (it == mActive.mClients.end()) {
349 return ResultStatus::NOT_FOUND;
350 }
351 client = it->second;
352 }
353 return client->allocate(params, handle, buffer);
354}
355
356ResultStatus ClientManager::Impl::receive(
357 ConnectionId connectionId, TransactionId transactionId,
358 BufferId bufferId, int64_t timestampUs,
359 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
360 std::shared_ptr<BufferPoolClient> client;
361 {
362 std::lock_guard<std::mutex> lock(mActive.mMutex);
363 auto it = mActive.mClients.find(connectionId);
364 if (it == mActive.mClients.end()) {
365 return ResultStatus::NOT_FOUND;
366 }
367 client = it->second;
368 }
369 return client->receive(transactionId, bufferId, timestampUs, handle, buffer);
370}
371
372ResultStatus ClientManager::Impl::postSend(
373 ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
374 TransactionId *transactionId, int64_t *timestampUs) {
375 ConnectionId connectionId = buffer->mConnectionId;
376 std::shared_ptr<BufferPoolClient> client;
377 {
378 std::lock_guard<std::mutex> lock(mActive.mMutex);
379 auto it = mActive.mClients.find(connectionId);
380 if (it == mActive.mClients.end()) {
381 return ResultStatus::NOT_FOUND;
382 }
383 client = it->second;
384 }
385 return client->postSend(receiverId, buffer, transactionId, timestampUs);
386}
387
388ResultStatus ClientManager::Impl::getAccessor(
389 ConnectionId connectionId, sp<IAccessor> *accessor) {
390 std::shared_ptr<BufferPoolClient> client;
391 {
392 std::lock_guard<std::mutex> lock(mActive.mMutex);
393 auto it = mActive.mClients.find(connectionId);
394 if (it == mActive.mClients.end()) {
395 return ResultStatus::NOT_FOUND;
396 }
397 client = it->second;
398 }
399 return client->getAccessor(accessor);
400}
401
402void ClientManager::Impl::cleanUp(bool clearCache) {
403 int64_t now = getTimestampNow();
404 int64_t lastTransactionUs;
405 std::lock_guard<std::mutex> lock1(mCache.mMutex);
406 if (clearCache || mCache.mLastCleanUpUs + kCleanUpDurationUs < now) {
407 std::lock_guard<std::mutex> lock2(mActive.mMutex);
408 int cleaned = 0;
409 for (auto it = mActive.mClients.begin(); it != mActive.mClients.end();) {
410 if (!it->second->isActive(&lastTransactionUs, clearCache)) {
411 if (lastTransactionUs + kClientTimeoutUs < now) {
412 sp<IAccessor> accessor;
413 it->second->getAccessor(&accessor);
414 it = mActive.mClients.erase(it);
415 ++cleaned;
416 continue;
417 }
418 }
419 ++it;
420 }
421 for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
422 // clean up dead client caches
423 sp<IAccessor> cAccessor = cit->first.promote();
424 if (!cAccessor) {
425 cit = mCache.mClients.erase(cit);
426 } else {
427 ++cit;
428 }
429 }
430 ALOGV("# of cleaned connections: %d", cleaned);
431 mCache.mLastCleanUpUs = now;
432 }
433}
434
435// Methods from ::android::hardware::media::bufferpool::V2_0::IClientManager follow.
436Return<void> ClientManager::registerSender(const sp<::android::hardware::media::bufferpool::V2_0::IAccessor>& bufferPool, registerSender_cb _hidl_cb) {
437 if (mImpl) {
438 ConnectionId connectionId = -1;
439 ResultStatus status = mImpl->registerSender(bufferPool, &connectionId);
440 _hidl_cb(status, connectionId);
441 } else {
442 _hidl_cb(ResultStatus::CRITICAL_ERROR, -1);
443 }
444 return Void();
445}
446
447// Methods for local use.
448sp<ClientManager> ClientManager::sInstance;
449std::mutex ClientManager::sInstanceLock;
450
451sp<ClientManager> ClientManager::getInstance() {
452 std::lock_guard<std::mutex> lock(sInstanceLock);
453 if (!sInstance) {
454 sInstance = new ClientManager();
455 }
456 return sInstance;
457}
458
459ClientManager::ClientManager() : mImpl(new Impl()) {}
460
461ClientManager::~ClientManager() {
462}
463
464ResultStatus ClientManager::create(
465 const std::shared_ptr<BufferPoolAllocator> &allocator,
466 ConnectionId *pConnectionId) {
467 if (mImpl) {
468 return mImpl->create(allocator, pConnectionId);
469 }
470 return ResultStatus::CRITICAL_ERROR;
471}
472
473ResultStatus ClientManager::registerSender(
474 const sp<IClientManager> &receiver,
475 ConnectionId senderId,
476 ConnectionId *receiverId) {
477 if (mImpl) {
478 return mImpl->registerSender(receiver, senderId, receiverId);
479 }
480 return ResultStatus::CRITICAL_ERROR;
481}
482
483ResultStatus ClientManager::close(ConnectionId connectionId) {
484 if (mImpl) {
485 return mImpl->close(connectionId);
486 }
487 return ResultStatus::CRITICAL_ERROR;
488}
489
Sungtak Leec7f9e2c2018-09-14 16:23:40 -0700490ResultStatus ClientManager::flush(ConnectionId connectionId) {
491 if (mImpl) {
492 return mImpl->flush(connectionId);
493 }
494 return ResultStatus::CRITICAL_ERROR;
495}
496
Sungtak Leebbe37b62018-08-29 15:15:48 -0700497ResultStatus ClientManager::allocate(
498 ConnectionId connectionId, const std::vector<uint8_t> &params,
499 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
500 if (mImpl) {
501 return mImpl->allocate(connectionId, params, handle, buffer);
502 }
503 return ResultStatus::CRITICAL_ERROR;
504}
505
506ResultStatus ClientManager::receive(
507 ConnectionId connectionId, TransactionId transactionId,
508 BufferId bufferId, int64_t timestampUs,
509 native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
510 if (mImpl) {
511 return mImpl->receive(connectionId, transactionId, bufferId,
512 timestampUs, handle, buffer);
513 }
514 return ResultStatus::CRITICAL_ERROR;
515}
516
517ResultStatus ClientManager::postSend(
518 ConnectionId receiverId, const std::shared_ptr<BufferPoolData> &buffer,
519 TransactionId *transactionId, int64_t* timestampUs) {
520 if (mImpl && buffer) {
521 return mImpl->postSend(receiverId, buffer, transactionId, timestampUs);
522 }
523 return ResultStatus::CRITICAL_ERROR;
524}
525
526void ClientManager::cleanUp() {
527 if (mImpl) {
528 mImpl->cleanUp(true);
529 }
530}
531
532} // namespace implementation
533} // namespace V2_0
534} // namespace bufferpool
535} // namespace media
536} // namespace hardware
537} // namespace android