bufferpool2.0: Implement buffer invalidation
Change-Id: If7a4a38004f50b4d43a2fae4781f541fe322c249
diff --git a/media/bufferpool/2.0/Accessor.cpp b/media/bufferpool/2.0/Accessor.cpp
index 3eaea7c..f264501 100644
--- a/media/bufferpool/2.0/Accessor.cpp
+++ b/media/bufferpool/2.0/Accessor.cpp
@@ -117,19 +117,18 @@
Return<void> Accessor::connect(
const sp<::android::hardware::media::bufferpool::V2_0::IObserver>& observer,
connect_cb _hidl_cb) {
- (void)observer;
sp<Connection> connection;
ConnectionId connectionId;
+ uint32_t msgId;
const StatusDescriptor* fmqDesc;
+ const InvalidationDescriptor* invDesc;
- ResultStatus status = connect(&connection, &connectionId, &fmqDesc, false);
+ ResultStatus status = connect(
+ observer, false, &connection, &connectionId, &msgId, &fmqDesc, &invDesc);
if (status == ResultStatus::OK) {
- _hidl_cb(status, connection, connectionId, *fmqDesc,
- android::hardware::MQDescriptorUnsync<BufferInvalidationMessage>(
- std::vector<android::hardware::GrantorDescriptor>(),
- nullptr /* nhandle */, 0 /* size */));
+ _hidl_cb(status, connection, connectionId, msgId, *fmqDesc, *invDesc);
} else {
- _hidl_cb(status, nullptr, -1LL,
+ _hidl_cb(status, nullptr, -1LL, 0,
android::hardware::MQDescriptorSync<BufferStatusMessage>(
std::vector<android::hardware::GrantorDescriptor>(),
nullptr /* nhandle */, 0 /* size */),
@@ -147,7 +146,15 @@
}
bool Accessor::isValid() {
- return (bool)mImpl;
+ return (bool)mImpl && mImpl->isValid(); // needs an impl that also reports itself valid
+}
+
+ResultStatus Accessor::flush() { // Delegates to Impl::flush() when an implementation exists.
+ if (mImpl) {
+ mImpl->flush();
+ return ResultStatus::OK;
+ }
+ return ResultStatus::CRITICAL_ERROR; // mImpl is null
}
ResultStatus Accessor::allocate(
@@ -170,10 +177,15 @@
}
ResultStatus Accessor::connect(
+ const sp<IObserver> &observer, bool local,
sp<Connection> *connection, ConnectionId *pConnectionId,
- const StatusDescriptor** fmqDescPtr, bool local) {
+ uint32_t *pMsgId,
+ const StatusDescriptor** statusDescPtr,
+ const InvalidationDescriptor** invDescPtr) {
if (mImpl) {
- ResultStatus status = mImpl->connect(this, connection, pConnectionId, fmqDescPtr);
+ ResultStatus status = mImpl->connect(
+ this, observer, connection, pConnectionId, pMsgId,
+ statusDescPtr, invDescPtr);
if (!local && status == ResultStatus::OK) {
sp<Accessor> accessor(this);
sConnectionDeathRecipient->add(*pConnectionId, accessor);
diff --git a/media/bufferpool/2.0/Accessor.h b/media/bufferpool/2.0/Accessor.h
index a718da1..4b5b17a 100644
--- a/media/bufferpool/2.0/Accessor.h
+++ b/media/bufferpool/2.0/Accessor.h
@@ -95,6 +95,9 @@
/** Returns whether the accessor is valid. */
bool isValid();
+ /** Invalidates all buffers which are owned by bufferpool */
+ ResultStatus flush();
+
/** Allocates a buffer from a buffer pool.
*
* @param connectionId the connection id of the client.
@@ -135,20 +138,28 @@
* created connection in order to communicate with the buffer pool. An
* FMQ for buffer status message is also created for the client.
*
- * @param connection created connection
- * @param pConnectionId the id of the created connection
- * @param fmqDescPtr FMQ descriptor for shared buffer status message
- * queue between a buffer pool and the client.
+ * @param observer client observer for buffer invalidation
* @param local true when a connection request comes from local process,
* false otherwise.
+ * @param connection created connection
+ * @param pConnectionId the id of the created connection
+ * @param pMsgId the id of the recent buffer pool message
+ * @param statusDescPtr FMQ descriptor for shared buffer status message
+ * queue between a buffer pool and the client.
+ * @param invDescPtr FMQ descriptor for buffer invalidation message
+ * queue from a buffer pool to the client.
*
* @return OK when a connection is successfully made.
* NO_MEMORY when there is no memory.
* CRITICAL_ERROR otherwise.
*/
ResultStatus connect(
+ const sp<IObserver>& observer,
+ bool local,
sp<Connection> *connection, ConnectionId *pConnectionId,
- const StatusDescriptor** fmqDescPtr, bool local);
+ uint32_t *pMsgId,
+ const StatusDescriptor** statusDescPtr,
+ const InvalidationDescriptor** invDescPtr);
/**
* Closes the specified connection to the client.
@@ -176,7 +187,7 @@
private:
class Impl;
- std::unique_ptr<Impl> mImpl;
+ std::shared_ptr<Impl> mImpl;
};
} // namespace implementation
diff --git a/media/bufferpool/2.0/AccessorImpl.cpp b/media/bufferpool/2.0/AccessorImpl.cpp
index 0ba6600..4cc8abc 100644
--- a/media/bufferpool/2.0/AccessorImpl.cpp
+++ b/media/bufferpool/2.0/AccessorImpl.cpp
@@ -21,6 +21,7 @@
#include <time.h>
#include <unistd.h>
#include <utils/Log.h>
+#include <thread>
#include "AccessorImpl.h"
#include "Connection.h"
@@ -47,6 +48,7 @@
const std::shared_ptr<BufferPoolAllocation> mAllocation;
const size_t mAllocSize;
const std::vector<uint8_t> mConfig;
+ bool mInvalidated;
InternalBuffer(
BufferId id,
@@ -54,11 +56,16 @@
const size_t allocSize,
const std::vector<uint8_t> &allocConfig)
: mId(id), mOwnerCount(0), mTransactionCount(0),
- mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {}
+ mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
+ mInvalidated(false) {}
const native_handle_t *handle() {
return mAllocation->handle();
}
+
+ void invalidate() { // Sets mInvalidated; the release paths test this flag before re-listing the buffer as free.
+ mInvalidated = true;
+ }
};
struct TransactionStatus {
@@ -138,21 +145,29 @@
}
ResultStatus Accessor::Impl::connect(
- const sp<Accessor> &accessor, sp<Connection> *connection,
- ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr) {
+ const sp<Accessor> &accessor, const sp<IObserver> &observer,
+ sp<Connection> *connection,
+ ConnectionId *pConnectionId,
+ uint32_t *pMsgId,
+ const StatusDescriptor** statusDescPtr,
+ const InvalidationDescriptor** invDescPtr) {
sp<Connection> newConnection = new Connection();
ResultStatus status = ResultStatus::CRITICAL_ERROR;
{
std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
if (newConnection) {
ConnectionId id = (int64_t)sPid << 32 | sSeqId;
- status = mBufferPool.mObserver.open(id, fmqDescPtr);
+ status = mBufferPool.mObserver.open(id, statusDescPtr);
if (status == ResultStatus::OK) {
newConnection->initialize(accessor, id);
*connection = newConnection;
*pConnectionId = id;
+ *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
+ mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
+ mBufferPool.mInvalidation.onConnect(id, observer);
++sSeqId;
}
+
}
mBufferPool.processStatusMessages();
mBufferPool.cleanUp();
@@ -165,6 +180,7 @@
mBufferPool.processStatusMessages();
mBufferPool.handleClose(connectionId);
mBufferPool.mObserver.close(connectionId);
+ mBufferPool.mInvalidation.onClose(connectionId);
// Since close# will be called after all works are finished, it is OK to
// evict unused buffers.
mBufferPool.cleanUp(true);
@@ -229,11 +245,30 @@
mBufferPool.cleanUp(clearCache);
}
+void Accessor::Impl::flush() { // Flushes the buffer pool while holding the pool mutex.
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ mBufferPool.processStatusMessages(); // handle queued status messages before flushing
+ mBufferPool.flush(shared_from_this()); // downstream code keeps this only as a weak_ptr
+}
+
+void Accessor::Impl::handleInvalidateAck() { // Called by invalidatorThread for each accessor that is still alive.
+ std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+ mBufferPool.processStatusMessages(); // handle queued status messages before checking acks
+ mBufferPool.mInvalidation.onHandleAck();
+}
+
+bool Accessor::Impl::isValid() { // Reports the buffer pool's validity flag.
+ return mBufferPool.isValid();
+}
+
Accessor::Impl::Impl::BufferPool::BufferPool()
: mTimestampUs(getTimestampNow()),
mLastCleanUpUs(mTimestampUs),
mLastLogUs(mTimestampUs),
- mSeq(0) {}
+ mSeq(0),
+ mStartSeq(0) { // first buffer id of the range the next flush() invalidates
+ mValid = mInvalidationChannel.isValid(); // pool validity follows the invalidation channel
+}
// Statistics helper
@@ -242,6 +277,8 @@
return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
}
+std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sSeqId(0);
+
Accessor::Impl::Impl::BufferPool::~BufferPool() {
std::lock_guard<std::mutex> lock(mMutex);
ALOGD("Destruction - bufferpool %p "
@@ -255,6 +292,96 @@
percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
}
+void Accessor::Impl::BufferPool::Invalidation::onConnect(
+ ConnectionId conId, const sp<IObserver>& observer) { // Registers ack state and observer for a new connection.
+ mAcks[conId] = mInvalidationId; // starts from current invalidationId
+ mObservers.insert(std::make_pair(conId, observer)); // stored as wp<IObserver>, so the observer is not kept alive
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) { // Forgets the connection's ack state and observer.
+ mAcks.erase(conId);
+ mObservers.erase(conId);
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onAck(
+ ConnectionId conId,
+ uint32_t msgId) { // Records msgId as the newest invalidation acknowledged by conId.
+ auto it = mAcks.find(conId);
+ if (it == mAcks.end() || isMessageLater(msgId, it->second)) { // an older id never overwrites a newer one
+ mAcks[conId] = msgId;
+ }
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
+ BufferId bufferId,
+ BufferInvalidationChannel &channel) { // Completes pending requests whose last in-range buffer was just released.
+ for (auto it = mPendings.begin(); it != mPendings.end();) {
+ if (it->invalidate(bufferId)) {
+ uint32_t msgId = 0; // stays 0 when the request does not need an ack
+ if (it->mNeedsAck) {
+ msgId = ++mInvalidationId;
+ if (msgId == 0) {
+ // wrap happens
+ msgId = ++mInvalidationId;
+ }
+ }
+ channel.postInvalidation(msgId, it->mFrom, it->mTo);
+ sInvalidator.addAccessor(mId, it->mImpl);
+ it = mPendings.erase(it); // erase last: every line above still reads *it
+ continue;
+ }
+ ++it;
+ }
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
+ bool needsAck,
+ uint32_t from,
+ uint32_t to,
+ size_t left,
+ BufferInvalidationChannel &channel,
+ const std::shared_ptr<Accessor::Impl> &impl) { // Posts the invalidation immediately, or queues it while buffers remain.
+ if (left == 0) { // caller counted no in-range buffer still present in the pool
+ uint32_t msgId = 0; // stays 0 when no ack is requested
+ if (needsAck) {
+ msgId = ++mInvalidationId;
+ if (msgId == 0) {
+ // wrap happens
+ msgId = ++mInvalidationId;
+ }
+ }
+ channel.postInvalidation(msgId, from, to);
+ sInvalidator.addAccessor(mId, impl); // registers this accessor with the invalidator thread
+ } else {
+ // TODO: sending hint message?
+ Pending pending(needsAck, from, to, left, impl); // finished later by onBufferInvalidated()
+ mPendings.push_back(pending);
+ }
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onHandleAck() { // Notifies observers of connections that lag behind mInvalidationId.
+ if (mInvalidationId != 0) {
+ std::set<ConnectionId> deads; // full-width ids: an int would drop the upper (pid) bits of a ConnectionId
+ for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
+ if (it->second != mInvalidationId) {
+ const sp<IObserver> observer = mObservers[it->first].promote();
+ if (observer) {
+ observer->onMessage(it->first, mInvalidationId);
+ } else {
+ deads.insert(it->first); // observer is gone; clean up after the loop
+ }
+ }
+ }
+ if (deads.size() > 0) {
+ for (auto it = deads.begin(); it != deads.end(); ++it) {
+ onClose(*it);
+ }
+ }
+ }
+ // Lagging observers were notified above; remove this accessor from the invalidator.
+ sInvalidator.delAccessor(mId);
+}
+
bool Accessor::Impl::BufferPool::handleOwnBuffer(
ConnectionId connectionId, BufferId bufferId) {
@@ -275,8 +402,15 @@
iter->second->mOwnerCount--;
if (iter->second->mOwnerCount == 0 &&
iter->second->mTransactionCount == 0) {
- mStats.onBufferUnused(iter->second->mAllocSize);
- mFreeBuffers.insert(bufferId);
+ if (!iter->second->mInvalidated) {
+ mStats.onBufferUnused(iter->second->mAllocSize);
+ mFreeBuffers.insert(bufferId);
+ } else {
+ mStats.onBufferUnused(iter->second->mAllocSize);
+ mStats.onBufferEvicted(iter->second->mAllocSize);
+ mBuffers.erase(iter);
+ mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
+ }
}
}
erase(&mUsingConnections, bufferId, connectionId);
@@ -352,8 +486,15 @@
bufferIter->second->mTransactionCount--;
if (bufferIter->second->mOwnerCount == 0
&& bufferIter->second->mTransactionCount == 0) {
- mStats.onBufferUnused(bufferIter->second->mAllocSize);
- mFreeBuffers.insert(message.bufferId);
+ if (!bufferIter->second->mInvalidated) {
+ mStats.onBufferUnused(bufferIter->second->mAllocSize);
+ mFreeBuffers.insert(message.bufferId);
+ } else {
+ mStats.onBufferUnused(bufferIter->second->mAllocSize);
+ mStats.onBufferEvicted(bufferIter->second->mAllocSize);
+ mBuffers.erase(bufferIter);
+ mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
+ }
}
mTransactions.erase(found);
}
@@ -400,7 +541,7 @@
ret = handleTransferResult(message);
break;
case BufferStatus::INVALIDATION_ACK:
- // TODO
+ mInvalidation.onAck(message.connectionId, message.bufferId);
break;
}
if (ret == false) {
@@ -423,8 +564,15 @@
if (bufferIter->second->mOwnerCount == 0 &&
bufferIter->second->mTransactionCount == 0) {
// TODO: handle freebuffer insert fail
- mStats.onBufferUnused(bufferIter->second->mAllocSize);
- mFreeBuffers.insert(bufferId);
+ if (!bufferIter->second->mInvalidated) {
+ mStats.onBufferUnused(bufferIter->second->mAllocSize);
+ mFreeBuffers.insert(bufferId);
+ } else {
+ mStats.onBufferUnused(bufferIter->second->mAllocSize);
+ mStats.onBufferEvicted(bufferIter->second->mAllocSize);
+ mBuffers.erase(bufferIter);
+ mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
+ }
}
}
}
@@ -446,8 +594,15 @@
if (bufferIter->second->mOwnerCount == 0 &&
bufferIter->second->mTransactionCount == 0) {
// TODO: handle freebuffer insert fail
- mStats.onBufferUnused(bufferIter->second->mAllocSize);
- mFreeBuffers.insert(bufferId);
+ if (!bufferIter->second->mInvalidated) {
+ mStats.onBufferUnused(bufferIter->second->mAllocSize);
+ mFreeBuffers.insert(bufferId);
+ } else {
+ mStats.onBufferUnused(bufferIter->second->mAllocSize);
+ mStats.onBufferEvicted(bufferIter->second->mAllocSize);
+ mBuffers.erase(bufferIter);
+ mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
+ }
}
mTransactions.erase(iter);
}
@@ -538,6 +693,121 @@
}
}
+void Accessor::Impl::BufferPool::invalidate(
+ bool needsAck, BufferId from, BufferId to,
+ const std::shared_ptr<Accessor::Impl> &impl) { // Invalidates every buffer for which isBufferInRange(from, to, id) holds.
+ for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) { // pass 1: evict in-range free buffers now
+ if (isBufferInRange(from, to, *freeIt)) {
+ auto it = mBuffers.find(*freeIt);
+ if (it != mBuffers.end() &&
+ it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
+ mStats.onBufferEvicted(it->second->mAllocSize);
+ mBuffers.erase(it);
+ freeIt = mFreeBuffers.erase(freeIt);
+ continue;
+ } else {
+ ALOGW("bufferpool inconsistent!");
+ }
+ }
+ ++freeIt;
+ }
+
+ size_t left = 0; // in-range buffers that are still in mBuffers after pass 1
+ for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) { // pass 2: flag the rest
+ if (isBufferInRange(from, to, it->first)) {
+ it->second->invalidate(); // release paths evict flagged buffers instead of re-listing them as free
+ ++left;
+ }
+ }
+ mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
+}
+
+void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) { // Invalidates ids from mStartSeq up to mSeq.
+ BufferId from = mStartSeq;
+ BufferId to = mSeq;
+ mStartSeq = mSeq; // the next flush starts where this one ended
+ // TODO: needsAck params
+ if (from != to) { // skipped when mSeq has not advanced since the previous flush
+ invalidate(true, from, to, impl);
+ }
+}
+
+void Accessor::Impl::invalidatorThread(
+ std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
+ std::mutex &mutex,
+ std::condition_variable &cv,
+ bool &ready) { // Thread body: repeatedly runs handleInvalidateAck() on every registered accessor.
+ while(true) {
+ std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ while (!ready) { // loop, not if: a condition variable may wake spuriously
+ cv.wait(lock);
+ }
+ copied.insert(accessors.begin(), accessors.end()); // snapshot so the work below runs without the mutex
+ }
+ std::list<uint32_t> erased; // accessor ids (keys of `accessors`) whose impl has expired
+ for (auto it = copied.begin(); it != copied.end(); ++it) {
+ const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
+ if (!impl) {
+ erased.push_back(it->first);
+ } else {
+ impl->handleInvalidateAck();
+ }
+ }
+ {
+ std::unique_lock<std::mutex> lock(mutex);
+ for (auto it = erased.begin(); it != erased.end(); ++it) {
+ accessors.erase(*it);
+ }
+ if (accessors.size() == 0) {
+ ready = false; // nothing left to poll; block on the condition variable next round
+ } else {
+ // prevent draining cpu.
+ lock.unlock();
+ std::this_thread::yield();
+ }
+ }
+ }
+}
+
+Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) { // Spawns the invalidator thread.
+ std::thread invalidator(
+ invalidatorThread,
+ std::ref(mAccessors),
+ std::ref(mMutex),
+ std::ref(mCv),
+ std::ref(mReady));
+ invalidator.detach(); // never joined; the thread keeps references to this object's members
+}
+
+void Accessor::Impl::AccessorInvalidator::addAccessor(
+ uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) { // Registers an accessor for the invalidator thread.
+ bool notify = false;
+ std::unique_lock<std::mutex> lock(mMutex);
+ if (mAccessors.find(accessorId) == mAccessors.end()) { // an id that is already registered is left untouched
+ if (!mReady) {
+ mReady = true;
+ notify = true; // the thread may be blocked waiting for mReady
+ }
+ mAccessors.insert(std::make_pair(accessorId, impl));
+ }
+ lock.unlock(); // notify after releasing the mutex
+ if (notify) {
+ mCv.notify_one();
+ }
+}
+
+void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) { // Unregisters an accessor.
+ std::lock_guard<std::mutex> lock(mMutex);
+ mAccessors.erase(accessorId);
+ if (mAccessors.size() == 0) {
+ mReady = false; // with no accessors left the thread waits on mCv again
+ }
+}
+
+Accessor::Impl::AccessorInvalidator Accessor::Impl::sInvalidator;
+
} // namespace implementation
} // namespace V2_0
} // namespace bufferpool
diff --git a/media/bufferpool/2.0/AccessorImpl.h b/media/bufferpool/2.0/AccessorImpl.h
index 1d33880..6b03494 100644
--- a/media/bufferpool/2.0/AccessorImpl.h
+++ b/media/bufferpool/2.0/AccessorImpl.h
@@ -19,6 +19,7 @@
#include <map>
#include <set>
+#include <condition_variable>
#include "Accessor.h"
namespace android {
@@ -33,15 +34,20 @@
/**
* An implementation of a buffer pool accessor(or a buffer pool implementation.) */
-class Accessor::Impl {
+class Accessor::Impl
+ : public std::enable_shared_from_this<Accessor::Impl> {
public:
Impl(const std::shared_ptr<BufferPoolAllocator> &allocator);
~Impl();
ResultStatus connect(
- const sp<Accessor> &accessor, sp<Connection> *connection,
- ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr);
+ const sp<Accessor> &accessor, const sp<IObserver> &observer,
+ sp<Connection> *connection,
+ ConnectionId *pConnectionId,
+ uint32_t *pMsgId,
+ const StatusDescriptor** statusDescPtr,
+ const InvalidationDescriptor** invDescPtr);
ResultStatus close(ConnectionId connectionId);
@@ -55,8 +61,14 @@
BufferId bufferId,
const native_handle_t** handle);
+ void flush();
+
void cleanUp(bool clearCache);
+ bool isValid();
+
+ void handleInvalidateAck();
+
private:
// ConnectionId = pid : (timestamp_created + seqId)
// in order to guarantee uniqueness for each connection
@@ -78,7 +90,10 @@
int64_t mLastCleanUpUs;
int64_t mLastLogUs;
BufferId mSeq;
+ BufferId mStartSeq;
+ bool mValid;
BufferStatusObserver mObserver;
+ BufferInvalidationChannel mInvalidationChannel;
std::map<ConnectionId, std::set<BufferId>> mUsingBuffers;
std::map<BufferId, std::set<ConnectionId>> mUsingConnections;
@@ -95,6 +110,54 @@
std::map<BufferId, std::unique_ptr<InternalBuffer>> mBuffers;
std::set<BufferId> mFreeBuffers;
+ struct Invalidation { // Tracks invalidation requests, per-connection acks and observers for one pool.
+ static std::atomic<std::uint32_t> sSeqId; // source of the per-instance mId values
+
+ struct Pending { // An invalidation request waiting for in-range buffers to be released.
+ bool mNeedsAck;
+ uint32_t mFrom;
+ uint32_t mTo;
+ size_t mLeft; // in-range buffers not yet released
+ const std::weak_ptr<Accessor::Impl> mImpl;
+ Pending(bool needsAck, uint32_t from, uint32_t to, size_t left,
+ const std::shared_ptr<Accessor::Impl> &impl)
+ : mNeedsAck(needsAck),
+ mFrom(from),
+ mTo(to),
+ mLeft(left),
+ mImpl(impl)
+ {}
+
+ bool invalidate(uint32_t bufferId) { // true once the last in-range buffer has been counted off
+ return isBufferInRange(mFrom, mTo, bufferId) && --mLeft == 0;
+ }
+ };
+
+ std::list<Pending> mPendings;
+ std::map<ConnectionId, uint32_t> mAcks; // per connection: newest invalidation id recorded for it
+ std::map<ConnectionId, const wp<IObserver>> mObservers;
+ uint32_t mInvalidationId; // last id assigned to an ack-requiring invalidation; 0 initially
+ uint32_t mId; // key under which this pool's accessor is registered in sInvalidator
+
+ Invalidation() : mInvalidationId(0), mId(sSeqId.fetch_add(1)) {}
+
+ void onConnect(ConnectionId conId, const sp<IObserver> &observer);
+
+ void onClose(ConnectionId conId);
+
+ void onAck(ConnectionId conId, uint32_t msgId);
+
+ void onBufferInvalidated(
+ BufferId bufferId,
+ BufferInvalidationChannel &channel);
+
+ void onInvalidationRequest(
+ bool needsAck, uint32_t from, uint32_t to, size_t left,
+ BufferInvalidationChannel &channel,
+ const std::shared_ptr<Accessor::Impl> &impl);
+
+ void onHandleAck();
+ } mInvalidation;
/// Buffer pool statistics which tracks allocation and transfer statistics.
struct Stats {
/// Total size of allocations which are used or available to use.
@@ -164,6 +227,13 @@
}
} mStats;
+ bool isValid() {
+ return mValid;
+ }
+
+ void invalidate(bool needsAck, BufferId from, BufferId to,
+ const std::shared_ptr<Accessor::Impl> &impl);
+
public:
/** Creates a buffer pool. */
BufferPool();
@@ -286,8 +356,33 @@
*/
void cleanUp(bool clearCache = false);
+ /**
+ * Processes pending buffer status messages and invalidate all current
+ * free buffers. Active buffers are invalidated after being inactive.
+ */
+ void flush(const std::shared_ptr<Accessor::Impl> &impl);
+
friend class Accessor::Impl;
} mBufferPool;
+
+ struct AccessorInvalidator { // State shared between accessors and the invalidator thread.
+ std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> mAccessors; // keyed by accessor id
+ std::mutex mMutex; // guards mAccessors and mReady
+ std::condition_variable mCv; // signalled when mReady turns true
+ bool mReady; // true while there is an accessor for the thread to poll
+
+ AccessorInvalidator();
+ void addAccessor(uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl);
+ void delAccessor(uint32_t accessorId);
+ };
+
+ static AccessorInvalidator sInvalidator;
+
+ static void invalidatorThread(
+ std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
+ std::mutex &mutex,
+ std::condition_variable &cv,
+ bool &ready);
};
} // namespace implementation
diff --git a/media/bufferpool/2.0/Android.bp b/media/bufferpool/2.0/Android.bp
index 413125a..cd4e06e 100644
--- a/media/bufferpool/2.0/Android.bp
+++ b/media/bufferpool/2.0/Android.bp
@@ -8,6 +8,7 @@
"BufferStatus.cpp",
"ClientManager.cpp",
"Connection.cpp",
+ "Observer.cpp",
],
export_include_dirs: [
"include",
diff --git a/media/bufferpool/2.0/BufferPoolClient.cpp b/media/bufferpool/2.0/BufferPoolClient.cpp
index 0f763f7..c80beff 100644
--- a/media/bufferpool/2.0/BufferPoolClient.cpp
+++ b/media/bufferpool/2.0/BufferPoolClient.cpp
@@ -36,9 +36,9 @@
class BufferPoolClient::Impl
: public std::enable_shared_from_this<BufferPoolClient::Impl> {
public:
- explicit Impl(const sp<Accessor> &accessor);
+ explicit Impl(const sp<Accessor> &accessor, const sp<IObserver> &observer);
- explicit Impl(const sp<IAccessor> &accessor);
+ explicit Impl(const sp<IAccessor> &accessor, const sp<IObserver> &observer);
bool isValid() {
return mValid;
@@ -58,6 +58,10 @@
bool isActive(int64_t *lastTransactionUs, bool clearCache);
+ void receiveInvalidation(uint32_t msgID);
+
+ ResultStatus flush();
+
ResultStatus allocate(const std::vector<uint8_t> &params,
native_handle_t **handle,
std::shared_ptr<BufferPoolData> *buffer);
@@ -83,10 +87,14 @@
void trySyncFromRemote();
- bool syncReleased();
+ bool syncReleased(uint32_t msgId = 0);
void evictCaches(bool clearCache = false);
+ void invalidateBuffer(BufferId id);
+
+ void invalidateRange(BufferId from, BufferId to);
+
ResultStatus allocateBufferHandle(
const std::vector<uint8_t>& params, BufferId *bufferId,
native_handle_t **handle);
@@ -106,6 +114,7 @@
uint32_t mSeqId;
ConnectionId mConnectionId;
int64_t mLastEvictCacheUs;
+ std::unique_ptr<BufferInvalidationListener> mInvalidationListener;
// CachedBuffers
struct BufferCache {
@@ -130,12 +139,16 @@
} mCache;
// FMQ - release notifier
- struct {
+ struct ReleaseCache {
std::mutex mLock;
// TODO: use only one list?(using one list may dealy sending messages?)
std::list<BufferId> mReleasingIds;
std::list<BufferId> mReleasedIds;
+ uint32_t mInvalidateId; // TODO: invalidation ACK to bufferpool
+ bool mInvalidateAck;
std::unique_ptr<BufferStatusChannel> mStatusChannel;
+
+ ReleaseCache() : mInvalidateId(0), mInvalidateAck(true) {}
} mReleasing;
// This lock is held during synchronization from remote side.
@@ -162,7 +175,6 @@
struct BufferPoolClient::Impl::ClientBuffer {
private:
- bool mInvalidated; // TODO: implement
int64_t mExpireUs;
bool mHasCache;
ConnectionId mConnectionId;
@@ -177,9 +189,8 @@
public:
ClientBuffer(
ConnectionId connectionId, BufferId id, native_handle_t *handle)
- : mInvalidated(false), mHasCache(false),
- mConnectionId(connectionId), mId(id), mHandle(handle) {
- (void)mInvalidated;
+ : mHasCache(false), mConnectionId(connectionId),
+ mId(id), mHandle(handle) {
mExpireUs = getTimestampNow() + kCacheTtlUs;
}
@@ -190,6 +201,10 @@
}
}
+ BufferId id() const {
+ return mId;
+ }
+
bool expire() const {
int64_t now = getTimestampNow();
return now >= mExpireUs;
@@ -244,41 +259,53 @@
}
};
-BufferPoolClient::Impl::Impl(const sp<Accessor> &accessor)
+BufferPoolClient::Impl::Impl(const sp<Accessor> &accessor, const sp<IObserver> &observer)
: mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0),
mLastEvictCacheUs(getTimestampNow()) {
- const StatusDescriptor *fmqDesc;
+ const StatusDescriptor *statusDesc;
+ const InvalidationDescriptor *invDesc;
ResultStatus status = accessor->connect(
- &mLocalConnection, &mConnectionId, &fmqDesc, true);
+ observer, true, // true: the request comes from the local process
+ &mLocalConnection, &mConnectionId, &mReleasing.mInvalidateId, // seeds the client's invalidation id
+ &statusDesc, &invDesc);
if (status == ResultStatus::OK) {
mReleasing.mStatusChannel =
- std::make_unique<BufferStatusChannel>(*fmqDesc);
+ std::make_unique<BufferStatusChannel>(*statusDesc);
+ mInvalidationListener =
+ std::make_unique<BufferInvalidationListener>(*invDesc);
mValid = mReleasing.mStatusChannel &&
- mReleasing.mStatusChannel->isValid();
+ mReleasing.mStatusChannel->isValid() &&
+ mInvalidationListener && // the client is valid only if both queues are usable
+ mInvalidationListener->isValid();
}
}
-BufferPoolClient::Impl::Impl(const sp<IAccessor> &accessor)
+BufferPoolClient::Impl::Impl(const sp<IAccessor> &accessor, const sp<IObserver> &observer)
: mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0),
mLastEvictCacheUs(getTimestampNow()) {
bool valid = false;
- sp<IObserver> observer; // TODO
sp<IConnection>& outConnection = mRemoteConnection;
ConnectionId& id = mConnectionId;
+ uint32_t& outMsgId = mReleasing.mInvalidateId;
std::unique_ptr<BufferStatusChannel>& outChannel =
mReleasing.mStatusChannel;
+ std::unique_ptr<BufferInvalidationListener>& outObserver =
+ mInvalidationListener;
Return<void> transResult = accessor->connect(
observer,
- [&valid, &outConnection, &id, &outChannel]
+ [&valid, &outConnection, &id, &outMsgId, &outChannel, &outObserver]
(ResultStatus status, sp<IConnection> connection,
- ConnectionId connectionId, const StatusDescriptor& desc,
+ ConnectionId connectionId, uint32_t msgId,
+ const StatusDescriptor& statusDesc,
const InvalidationDescriptor& invDesc) {
- (void) invDesc;
if (status == ResultStatus::OK) {
outConnection = connection;
id = connectionId;
- outChannel = std::make_unique<BufferStatusChannel>(desc);
- if (outChannel && outChannel->isValid()) {
+ outMsgId = msgId;
+ outChannel = std::make_unique<BufferStatusChannel>(statusDesc);
+ outObserver = std::make_unique<BufferInvalidationListener>(invDesc);
+ if (outChannel && outChannel->isValid() &&
+ outObserver && outObserver->isValid()) {
valid = true;
}
}
@@ -302,6 +329,24 @@
return active;
}
+void BufferPoolClient::Impl::receiveInvalidation(uint32_t messageId) { // Processes invalidations up to messageId.
+ std::lock_guard<std::mutex> lock(mCache.mLock); // syncReleased() must be called with mCache.mLock held
+ syncReleased(messageId);
+ // TODO: evict cache required?
+}
+
+ResultStatus BufferPoolClient::Impl::flush() { // Syncs and evicts the client cache, then flushes via the local connection.
+ if (!mLocal || !mLocalConnection || !mValid) { // only a valid client with a local connection can flush
+ return ResultStatus::CRITICAL_ERROR;
+ }
+ {
+ std::unique_lock<std::mutex> lock(mCache.mLock); // required by syncReleased() and evictCaches()
+ syncReleased();
+ evictCaches();
+ return mLocalConnection->flush();
+ }
+}
+
ResultStatus BufferPoolClient::Impl::allocate(
const std::vector<uint8_t> &params,
native_handle_t **pHandle,
@@ -455,6 +500,11 @@
bool BufferPoolClient::Impl::postSend(
BufferId bufferId, ConnectionId receiver,
TransactionId *transactionId, int64_t *timestampUs) {
+ {
+ // TODO: don't need to call syncReleased every time
+ std::lock_guard<std::mutex> lock(mCache.mLock);
+ syncReleased();
+ }
bool ret = false;
bool needsSync = false;
{
@@ -538,34 +588,74 @@
}
// should have mCache.mLock
-bool BufferPoolClient::Impl::syncReleased() {
- std::lock_guard<std::mutex> lock(mReleasing.mLock);
- if (mReleasing.mReleasingIds.size() > 0) {
- mReleasing.mStatusChannel->postBufferRelease(
- mConnectionId, mReleasing.mReleasingIds,
- mReleasing.mReleasedIds);
- }
- if (mReleasing.mReleasedIds.size() > 0) {
- for (BufferId& id: mReleasing.mReleasedIds) {
- ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id);
- auto found = mCache.mBuffers.find(id);
- if (found != mCache.mBuffers.end()) {
- if (found->second->onCacheRelease()) {
- mCache.decActive_l();
+bool BufferPoolClient::Impl::syncReleased(uint32_t messageId) { // Returns true when released ids were processed and cleared.
+ bool cleared = false;
+ {
+ std::lock_guard<std::mutex> lock(mReleasing.mLock); // step 1: post releases and settle the cache
+ if (mReleasing.mReleasingIds.size() > 0) {
+ mReleasing.mStatusChannel->postBufferRelease(
+ mConnectionId, mReleasing.mReleasingIds,
+ mReleasing.mReleasedIds);
+ }
+ if (mReleasing.mReleasedIds.size() > 0) {
+ for (BufferId& id: mReleasing.mReleasedIds) {
+ ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id);
+ auto found = mCache.mBuffers.find(id);
+ if (found != mCache.mBuffers.end()) {
+ if (found->second->onCacheRelease()) {
+ mCache.decActive_l();
+ } else {
+ // should not happen!
+ ALOGW("client %lld cache release status inconsitent!",
+ (long long)mConnectionId);
+ }
} else {
// should not happen!
- ALOGW("client %lld cache release status inconsitent!",
- (long long)mConnectionId);
+ ALOGW("client %lld cache status inconsitent!", (long long)mConnectionId);
}
+ }
+ mReleasing.mReleasedIds.clear();
+ cleared = true;
+ }
+ }
+ std::vector<BufferInvalidationMessage> invalidations; // step 2: apply queued invalidation messages
+ mInvalidationListener->getInvalidations(invalidations);
+ uint32_t lastMsgId = 0; // last non-zero message id seen in this batch
+ if (invalidations.size() > 0) {
+ for (auto it = invalidations.begin(); it != invalidations.end(); ++it) {
+ if (it->messageId != 0) {
+ lastMsgId = it->messageId;
+ }
+ if (it->fromBufferId == it->toBufferId) {
+ // TODO: handle fromBufferId = UINT32_MAX
+ invalidateBuffer(it->fromBufferId);
} else {
- // should not happen!
- ALOGW("client %lld cache status inconsitent!", (long long)mConnectionId);
+ invalidateRange(it->fromBufferId, it->toBufferId);
}
}
- mReleasing.mReleasedIds.clear();
- return true;
}
- return false;
+ {
+ std::lock_guard<std::mutex> lock(mReleasing.mLock); // step 3: update the ack state
+ if (lastMsgId != 0) {
+ if (isMessageLater(lastMsgId, mReleasing.mInvalidateId)) {
+ mReleasing.mInvalidateId = lastMsgId;
+ mReleasing.mInvalidateAck = false;
+ }
+ } else if (messageId != 0) {
+ // messages are drained.
+ if (isMessageLater(messageId, mReleasing.mInvalidateId)) {
+ mReleasing.mInvalidateId = messageId; // lastMsgId is 0 in this branch; adopt the caller's id
+ mReleasing.mInvalidateAck = true;
+ }
+ }
+ if (!mReleasing.mInvalidateAck) {
+ // post ACK
+ mReleasing.mStatusChannel->postBufferInvalidateAck(
+ mConnectionId,
+ mReleasing.mInvalidateId, &mReleasing.mInvalidateAck);
+ }
+ }
+ return cleared;
}
// should have mCache.mLock
@@ -587,6 +677,49 @@
}
}
+// should have mCache.mLock
+void BufferPoolClient::Impl::invalidateBuffer(BufferId id) { // Drops the cache entry whose buffer id equals id.
+ for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end(); ++it) {
+ if (id == it->second->id()) {
+ if (!it->second->hasCache()) { // NOTE(review): hasCache() presumably means the entry is still in use - confirm
+ mCache.mBuffers.erase(it); // safe: the loop is left via the break below
+ ALOGV("cache invalidated %lld : buffer %u",
+ (long long)mConnectionId, id);
+ } else {
+ ALOGW("Inconsitent invalidation %lld : activer buffer!! %u",
+ (long long)mConnectionId, (unsigned int)id);
+ }
+ break; // at most one entry is handled
+ }
+ }
+}
+
+// should have mCache.mLock
+void BufferPoolClient::Impl::invalidateRange(BufferId from, BufferId to) { // Drops cache entries whose id lies in [from, to).
+ size_t invalidated = 0; // only used for the log line at the end
+ for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
+ if (!it->second->hasCache()) { // entries for which hasCache() is true are kept
+ BufferId bid = it->second->id();
+ if (from < to) { // plain half-open range
+ if (from <= bid && bid < to) {
+ ++invalidated;
+ it = mCache.mBuffers.erase(it);
+ continue;
+ }
+ } else { // range wraps around the end of the id space
+ if (from <= bid || bid < to) {
+ ++invalidated;
+ it = mCache.mBuffers.erase(it);
+ continue;
+ }
+ }
+ }
+ ++it;
+ }
+ ALOGV("cache invalidated %lld : # of invalidated %zu",
+ (long long)mConnectionId, invalidated);
+}
+
ResultStatus BufferPoolClient::Impl::allocateBufferHandle(
const std::vector<uint8_t>& params, BufferId *bufferId,
native_handle_t** handle) {
@@ -629,12 +762,14 @@
}
-BufferPoolClient::BufferPoolClient(const sp<Accessor> &accessor) {
- mImpl = std::make_shared<Impl>(accessor);
+BufferPoolClient::BufferPoolClient(const sp<Accessor> &accessor,
+ const sp<IObserver> &observer) { // local-accessor variant; observer is handed to the Impl
+ mImpl = std::make_shared<Impl>(accessor, observer);
}
-BufferPoolClient::BufferPoolClient(const sp<IAccessor> &accessor) {
- mImpl = std::make_shared<Impl>(accessor);
+BufferPoolClient::BufferPoolClient(const sp<IAccessor> &accessor,
+ const sp<IObserver> &observer) { // IAccessor variant; observer is handed to the Impl
+ mImpl = std::make_shared<Impl>(accessor, observer);
}
BufferPoolClient::~BufferPoolClient() {
@@ -672,6 +807,19 @@
return ResultStatus::CRITICAL_ERROR;
}
+void BufferPoolClient::receiveInvalidation(uint32_t msgId) { // Forwards to the Impl; silently ignored when the client is invalid.
+ if (isValid()) {
+ mImpl->receiveInvalidation(msgId);
+ }
+}
+
+ResultStatus BufferPoolClient::flush() { // Forwards to Impl::flush() when the client is valid.
+ if (isValid()) {
+ return mImpl->flush();
+ }
+ return ResultStatus::CRITICAL_ERROR; // invalid client
+}
+
ResultStatus BufferPoolClient::allocate(
const std::vector<uint8_t> ¶ms,
native_handle_t **handle,
diff --git a/media/bufferpool/2.0/BufferPoolClient.h b/media/bufferpool/2.0/BufferPoolClient.h
index 1889ea3..e8d9ae6 100644
--- a/media/bufferpool/2.0/BufferPoolClient.h
+++ b/media/bufferpool/2.0/BufferPoolClient.h
@@ -49,7 +49,8 @@
* Creates a buffer pool client from a local buffer pool
* (via ClientManager#create).
*/
- explicit BufferPoolClient(const sp<Accessor> &accessor);
+ explicit BufferPoolClient(const sp<Accessor> &accessor,
+ const sp<IObserver> &observer);
/**
* Creates a buffer pool client from a remote buffer pool
@@ -57,7 +58,8 @@
* Note: A buffer pool client created with remote buffer pool cannot
* allocate a buffer.
*/
- explicit BufferPoolClient(const sp<IAccessor> &accessor);
+ explicit BufferPoolClient(const sp<IAccessor> &accessor,
+ const sp<IObserver> &observer);
/** Destructs a buffer pool client. */
~BufferPoolClient();
@@ -73,6 +75,10 @@
ResultStatus getAccessor(sp<IAccessor> *accessor);
+ void receiveInvalidation(uint32_t msgId);
+
+ ResultStatus flush();
+
ResultStatus allocate(const std::vector<uint8_t> ¶ms,
native_handle_t **handle,
std::shared_ptr<BufferPoolData> *buffer);
@@ -92,6 +98,7 @@
std::shared_ptr<Impl> mImpl;
friend struct ClientManager;
+ friend struct Observer;
};
} // namespace implementation
diff --git a/media/bufferpool/2.0/BufferStatus.cpp b/media/bufferpool/2.0/BufferStatus.cpp
index 0d3f5a3..6937260 100644
--- a/media/bufferpool/2.0/BufferStatus.cpp
+++ b/media/bufferpool/2.0/BufferStatus.cpp
@@ -17,6 +17,7 @@
#define LOG_TAG "BufferPoolStatus"
//#define LOG_NDEBUG 0
+#include <thread>
#include <time.h>
#include "BufferStatus.h"
@@ -37,6 +38,18 @@
return stamp;
}
+bool isMessageLater(uint32_t curMsgId, uint32_t prevMsgId) { // wraparound-aware ordering of ids
+ return curMsgId != prevMsgId && curMsgId - prevMsgId < prevMsgId - curMsgId; // 0 < (cur - prev) mod 2^32 < 2^31
+}
+
+bool isBufferInRange(BufferId from, BufferId to, BufferId bufferId) { // from is inclusive, to is exclusive
+ if (from < to) {
+ return from <= bufferId && bufferId < to;
+ } else { // wrap happens (from >= to): ids at or above from, or below to, are in range
+ return from <= bufferId || bufferId < to;
+ }
+}
+
static constexpr int kNumElementsInQueue = 1024*16;
static constexpr int kMinElementsToSyncInQueue = 128;
@@ -139,6 +152,29 @@
}
}
+void BufferStatusChannel::postBufferInvalidateAck(
+ ConnectionId connectionId,
+ uint32_t invalidateId,
+ bool *invalidated) {
+ if (mValid && !*invalidated) { // no-op when the channel is invalid or the ack was already posted
+ size_t avail = mBufferStatusQueue->availableToWrite();
+ if (avail > 0) { // when the queue is full, *invalidated stays false
+ BufferStatusMessage message;
+ message.newStatus = BufferStatus::INVALIDATION_ACK;
+ message.bufferId = invalidateId; // the bufferId field carries the invalidation id
+ message.connectionId = connectionId;
+ if (!mBufferStatusQueue->write(&message, 1)) {
+ // Since the available # of writes is already confirmed,
+ // this should not happen.
+ // TODO: error handling?
+ ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
+ return; // *invalidated is left unchanged on write failure
+ }
+ *invalidated = true;
+ }
+ }
+}
+
bool BufferStatusChannel::postBufferStatusMessage(
TransactionId transactionId, BufferId bufferId,
BufferStatus status, ConnectionId connectionId, ConnectionId targetId,
@@ -182,6 +218,83 @@
return false;
}
+BufferInvalidationListener::BufferInvalidationListener(
+ const InvalidationDescriptor &fmqDesc) {
+ std::unique_ptr<BufferInvalidationQueue> queue =
+ std::make_unique<BufferInvalidationQueue>(fmqDesc);
+ if (!queue || queue->isValid() == false) {
+ mValid = false;
+ return; // mBufferInvalidationQueue is not assigned on this path
+ }
+ mValid = true;
+ mBufferInvalidationQueue = std::move(queue);
+ // drain previous messages: whatever is readable now is read into temp and discarded
+ size_t avail = std::min(
+ mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue);
+ std::vector<BufferInvalidationMessage> temp(avail);
+ if (avail > 0) {
+ mBufferInvalidationQueue->read(temp.data(), avail); // result of read() is not checked
+ }
+}
+
+void BufferInvalidationListener::getInvalidations(
+ std::vector<BufferInvalidationMessage> &messages) {
+ // Appends the readable messages to |messages|; tries twice in case of overflow.
+ // TODO: handling overflow though it may not happen.
+ for (int i = 0; i < 2; ++i) { // NOTE(review): no mValid check; confirm callers check isValid()
+ size_t avail = std::min(
+ mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue);
+ if (avail > 0) {
+ std::vector<BufferInvalidationMessage> temp(avail);
+ if (mBufferInvalidationQueue->read(temp.data(), avail)) {
+ messages.reserve(messages.size() + avail);
+ for (auto it = temp.begin(); it != temp.end(); ++it) {
+ messages.push_back(*it);
+ }
+ break; // read succeeded; no retry needed
+ }
+ } else {
+ return; // nothing to read
+ }
+ }
+}
+
+bool BufferInvalidationListener::isValid() { // reports the flag set by the constructor
+ return mValid;
+}
+
+BufferInvalidationChannel::BufferInvalidationChannel()
+ : mValid(true),
+ mBufferInvalidationQueue(
+ std::make_unique<BufferInvalidationQueue>(kNumElementsInQueue, true)) { // queue sized to kNumElementsInQueue
+ if (!mBufferInvalidationQueue || mBufferInvalidationQueue->isValid() == false) {
+ mValid = false; // the created queue is missing or reports itself invalid
+ }
+}
+
+bool BufferInvalidationChannel::isValid() { // reports the flag set by the constructor
+ return mValid;
+}
+
+void BufferInvalidationChannel::getDesc(const InvalidationDescriptor **fmqDescPtr) {
+ if (mValid) {
+ *fmqDescPtr = mBufferInvalidationQueue->getDesc(); // pointer obtained from the queue itself
+ } else {
+ *fmqDescPtr = nullptr; // no descriptor when the channel is invalid
+ }
+}
+
+void BufferInvalidationChannel::postInvalidation(
+ uint32_t msgId, BufferId fromId, BufferId toId) {
+ BufferInvalidationMessage message;
+
+ message.messageId = msgId;
+ message.fromBufferId = fromId;
+ message.toBufferId = toId;
+ // TODO: handle failure (it does not happen normally); the result of write() is ignored.
+ mBufferInvalidationQueue->write(&message); // NOTE(review): mValid is not checked here
+}
+
} // namespace implementation
} // namespace V2_0
} // namespace bufferpool
diff --git a/media/bufferpool/2.0/BufferStatus.h b/media/bufferpool/2.0/BufferStatus.h
index 777a320..fa65838 100644
--- a/media/bufferpool/2.0/BufferStatus.h
+++ b/media/bufferpool/2.0/BufferStatus.h
@@ -37,9 +37,13 @@
/** Returns monotonic timestamp in Us since fixed point in time. */
int64_t getTimestampNow();
+bool isMessageLater(uint32_t curMsgId, uint32_t prevMsgId);
+
+bool isBufferInRange(BufferId from, BufferId to, BufferId bufferId);
+
/**
- * A collection of FMQ for a buffer pool. buffer ownership/status change
- * messages are sent via the FMQs from the clients.
+ * A collection of buffer status message FMQ for a buffer pool. buffer
+ * ownership/status change messages are sent via the FMQs from the clients.
*/
class BufferStatusObserver {
private:
@@ -47,7 +51,8 @@
mBufferStatusQueues;
public:
- /** Creates an FMQ for the specified connection(client).
+ /** Creates a buffer status message FMQ for the specified
+ * connection(client).
*
* @param connectionId connection Id of the specified client.
* @param fmqDescPtr double ptr of created FMQ's descriptor.
@@ -58,7 +63,8 @@
*/
ResultStatus open(ConnectionId id, const StatusDescriptor** fmqDescPtr);
- /** Closes an FMQ for the specified connection(client).
+ /** Closes a buffer status message FMQ for the specified
+ * connection(client).
*
* @param connectionId connection Id of the specified client.
*
@@ -75,8 +81,8 @@
};
/**
- * An FMQ for a buffer pool client. Buffer ownership/status change messages
- * are sent via the fmq to the buffer pool.
+ * A buffer status message FMQ for a buffer pool client. Buffer ownership/status
+ * change messages are sent via the fmq to the buffer pool.
*/
class BufferStatusChannel {
private:
@@ -85,7 +91,8 @@
public:
/**
- * Connects to an FMQ from a descriptor of the created FMQ.
+ * Connects to a buffer status message FMQ from a descriptor of
+ * the created FMQ.
*
* @param fmqDesc Descriptor of the created FMQ.
*/
@@ -131,6 +138,86 @@
ConnectionId connectionId,
ConnectionId targetId,
std::list<BufferId> &pending, std::list<BufferId> &posted);
+
+ /**
+ * Posts a buffer invalidation ack message to the buffer pool.
+ *
+ * @param connectionId connection Id of the client.
+ * @param invalidateId invalidation ack to the buffer pool.
+ * if invalidation id is zero, the ack will not be
+ * posted.
+ * @param invalidated sets {@code true} only when the invalidation ack is
+ * posted.
+ */
+ void postBufferInvalidateAck(
+ ConnectionId connectionId,
+ uint32_t invalidateId,
+ bool *invalidated);
+};
+
+/**
+ * A buffer invalidation FMQ for a buffer pool client. Buffer invalidation
+ * messages are received via the fmq from the buffer pool. Buffer invalidation
+ * messages are handled as soon as possible.
+ */
+class BufferInvalidationListener {
+private:
+ bool mValid; // result of connecting to the FMQ in the constructor
+ std::unique_ptr<BufferInvalidationQueue> mBufferInvalidationQueue;
+
+public:
+ /**
+ * Connects to a buffer invalidation FMQ from a descriptor of the created FMQ.
+ *
+ * @param fmqDesc Descriptor of the created FMQ.
+ */
+ BufferInvalidationListener(const InvalidationDescriptor &fmqDesc);
+
+ /** Retrieves all pending buffer invalidation messages from the buffer pool.
+ *
+ * @param messages retrieved pending messages.
+ */
+ void getInvalidations(std::vector<BufferInvalidationMessage> &messages);
+
+ /** Returns whether the FMQ is connected successfully. */
+ bool isValid();
+};
+
+/**
+ * A buffer invalidation FMQ for a buffer pool. A buffer pool will send buffer
+ * invalidation messages to the clients via the FMQ. The FMQ is shared among
+ * buffer pool clients.
+ */
+class BufferInvalidationChannel {
+private:
+ bool mValid; // whether the FMQ created in the constructor is valid
+ std::unique_ptr<BufferInvalidationQueue> mBufferInvalidationQueue;
+
+public:
+ /**
+ * Creates a buffer invalidation FMQ for a buffer pool.
+ */
+ BufferInvalidationChannel();
+
+ /** Returns whether the FMQ was created successfully. */
+ bool isValid();
+
+ /**
+ * Retrieves the descriptor of a buffer invalidation FMQ. The descriptor may
+ * be passed to the client for buffer invalidation handling.
+ *
+ * @param fmqDescPtr double ptr of created FMQ's descriptor.
+ */
+ void getDesc(const InvalidationDescriptor **fmqDescPtr);
+
+ /** Posts a buffer invalidation for invalidated buffers.
+ *
+ * @param msgId Invalidation message id which is used when clients send
+ * acks back via BufferStatusMessage
+ * @param fromId The start bufferId of the invalidated buffers(inclusive)
+ * @param toId The end bufferId of the invalidated buffers(exclusive, see isBufferInRange)
+ */
+ void postInvalidation(uint32_t msgId, BufferId fromId, BufferId toId);
};
} // namespace implementation
diff --git a/media/bufferpool/2.0/ClientManager.cpp b/media/bufferpool/2.0/ClientManager.cpp
index eeaf093..f8531d8 100644
--- a/media/bufferpool/2.0/ClientManager.cpp
+++ b/media/bufferpool/2.0/ClientManager.cpp
@@ -23,6 +23,7 @@
#include <unistd.h>
#include <utils/Log.h>
#include "BufferPoolClient.h"
+#include "Observer.h"
namespace android {
namespace hardware {
@@ -106,6 +107,8 @@
ResultStatus close(ConnectionId connectionId);
+ ResultStatus flush(ConnectionId connectionId);
+
ResultStatus allocate(ConnectionId connectionId,
const std::vector<uint8_t> ¶ms,
native_handle_t **handle,
@@ -153,10 +156,13 @@
mClients;
} mActive;
+ sp<Observer> mObserver;
+
ClientManagerCookieHolder mRemoteClientCookies;
};
-ClientManager::Impl::Impl() {}
+ClientManager::Impl::Impl()
+ : mObserver(new Observer()) {}
ResultStatus ClientManager::Impl::registerSender(
const sp<IAccessor> &accessor, ConnectionId *pConnectionId) {
@@ -185,7 +191,7 @@
lock.unlock();
ResultStatus result = ResultStatus::OK;
const std::shared_ptr<BufferPoolClient> client =
- std::make_shared<BufferPoolClient>(accessor);
+ std::make_shared<BufferPoolClient>(accessor, mObserver);
lock.lock();
if (!client) {
result = ResultStatus::NO_MEMORY;
@@ -197,6 +203,7 @@
const std::weak_ptr<BufferPoolClient> wclient = client;
mCache.mClients.push_back(std::make_pair(accessor, wclient));
ConnectionId conId = client->getConnectionId();
+ mObserver->addClient(conId, wclient);
{
std::lock_guard<std::mutex> lock(mActive.mMutex);
mActive.mClients.insert(std::make_pair(conId, client));
@@ -266,8 +273,9 @@
if (!accessor || !accessor->isValid()) {
return ResultStatus::CRITICAL_ERROR;
}
+ // TODO: observer is local. use direct call instead of hidl call.
std::shared_ptr<BufferPoolClient> client =
- std::make_shared<BufferPoolClient>(accessor);
+ std::make_shared<BufferPoolClient>(accessor, mObserver);
if (!client || !client->isValid()) {
return ResultStatus::CRITICAL_ERROR;
}
@@ -280,6 +288,7 @@
const std::weak_ptr<BufferPoolClient> wclient = client;
mCache.mClients.push_back(std::make_pair(accessor, wclient));
ConnectionId conId = client->getConnectionId();
+ mObserver->addClient(conId, wclient);
{
std::lock_guard<std::mutex> lock(mActive.mMutex);
mActive.mClients.insert(std::make_pair(conId, client));
@@ -291,12 +300,13 @@
}
ResultStatus ClientManager::Impl::close(ConnectionId connectionId) {
- std::lock_guard<std::mutex> lock1(mCache.mMutex);
- std::lock_guard<std::mutex> lock2(mActive.mMutex);
+ std::unique_lock<std::mutex> lock1(mCache.mMutex);
+ std::unique_lock<std::mutex> lock2(mActive.mMutex);
auto it = mActive.mClients.find(connectionId);
if (it != mActive.mClients.end()) {
sp<IAccessor> accessor;
it->second->getAccessor(&accessor);
+ std::shared_ptr<BufferPoolClient> closing = it->second;
mActive.mClients.erase(connectionId);
for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
// clean up dead client caches
@@ -307,11 +317,27 @@
cit++;
}
}
+ lock2.unlock();
+ lock1.unlock();
+ closing->flush();
return ResultStatus::OK;
}
return ResultStatus::NOT_FOUND;
}
+ResultStatus ClientManager::Impl::flush(ConnectionId connectionId) {
+ std::shared_ptr<BufferPoolClient> client;
+ { // mActive.mMutex is held only for the lookup
+ std::lock_guard<std::mutex> lock(mActive.mMutex);
+ auto it = mActive.mClients.find(connectionId);
+ if (it == mActive.mClients.end()) {
+ return ResultStatus::NOT_FOUND; // no active client with this connection id
+ }
+ client = it->second; // keep a reference so the client outlives the lock scope
+ }
+ return client->flush(); // called after the lock has been released
+}
+
ResultStatus ClientManager::Impl::allocate(
ConnectionId connectionId, const std::vector<uint8_t> ¶ms,
native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
@@ -461,6 +487,13 @@
return ResultStatus::CRITICAL_ERROR;
}
+ResultStatus ClientManager::flush(ConnectionId connectionId) { // thin wrapper over Impl::flush()
+ if (mImpl) {
+ return mImpl->flush(connectionId);
+ }
+ return ResultStatus::CRITICAL_ERROR; // no implementation object
+}
+
ResultStatus ClientManager::allocate(
ConnectionId connectionId, const std::vector<uint8_t> ¶ms,
native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
diff --git a/media/bufferpool/2.0/Connection.cpp b/media/bufferpool/2.0/Connection.cpp
index cd837a1..6bd0e79 100644
--- a/media/bufferpool/2.0/Connection.cpp
+++ b/media/bufferpool/2.0/Connection.cpp
@@ -60,6 +60,13 @@
}
}
+ResultStatus Connection::flush() { // forwards to the accessor's flush()
+ if (mInitialized && mAccessor) {
+ return mAccessor->flush();
+ }
+ return ResultStatus::CRITICAL_ERROR; // not initialized or no accessor
+}
+
ResultStatus Connection::allocate(
const std::vector<uint8_t> ¶ms, BufferId *bufferId,
const native_handle_t **handle) {
diff --git a/media/bufferpool/2.0/Connection.h b/media/bufferpool/2.0/Connection.h
index e2b47f1..8507749 100644
--- a/media/bufferpool/2.0/Connection.h
+++ b/media/bufferpool/2.0/Connection.h
@@ -44,6 +44,11 @@
Return<void> fetch(uint64_t transactionId, uint32_t bufferId, fetch_cb _hidl_cb) override;
/**
+ * Invalidates all buffers which are active and/or are ready to be recycled.
+ */
+ ResultStatus flush();
+
+ /**
* Allocates a buffer using the specified parameters. Recycles a buffer if
* it is possible. The returned buffer can be transferred to other remote
* clients(Connection).
diff --git a/media/bufferpool/2.0/Observer.cpp b/media/bufferpool/2.0/Observer.cpp
new file mode 100644
index 0000000..5b23160
--- /dev/null
+++ b/media/bufferpool/2.0/Observer.cpp
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Observer.h"
+
+namespace android {
+namespace hardware {
+namespace media {
+namespace bufferpool {
+namespace V2_0 {
+namespace implementation {
+
+Observer::Observer() { // nothing to set up; members are default-constructed
+}
+
+Observer::~Observer() { // no explicit cleanup
+}
+
+// Methods from ::android::hardware::media::bufferpool::V2_0::IObserver follow.
+Return<void> Observer::onMessage(int64_t connectionId, uint32_t msgId) {
+ std::unique_lock<std::mutex> lock(mLock);
+ auto it = mClients.find(connectionId);
+ if (it != mClients.end()) { // messages for unknown connection ids are ignored
+ const std::shared_ptr<BufferPoolClient> client = it->second.lock();
+ if (!client) {
+ mClients.erase(it); // the client is gone; drop the stale entry
+ } else {
+ lock.unlock(); // release mLock before calling into the client
+ client->receiveInvalidation(msgId);
+ }
+ }
+ return Void();
+}
+
+void Observer::addClient(ConnectionId connectionId,
+ const std::weak_ptr<BufferPoolClient> &wclient) {
+ std::lock_guard<std::mutex> lock(mLock);
+ for (auto it = mClients.begin(); it != mClients.end();) {
+ if (!it->second.lock() || it->first == connectionId) { // expired client, or an old entry for this id
+ it = mClients.erase(it);
+ } else {
+ ++it;
+ }
+ }
+ mClients.insert(std::make_pair(connectionId, wclient)); // any previous entry for this id was erased above
+
+}
+
+void Observer::delClient(ConnectionId connectionId) { // removes the entry for connectionId, if any
+ std::lock_guard<std::mutex> lock(mLock);
+ mClients.erase(connectionId);
+}
+
+
+} // namespace implementation
+} // namespace V2_0
+} // namespace bufferpool
+} // namespace media
+} // namespace hardware
+} // namespace android
diff --git a/media/bufferpool/2.0/Observer.h b/media/bufferpool/2.0/Observer.h
new file mode 100644
index 0000000..42bd7c1
--- /dev/null
+++ b/media/bufferpool/2.0/Observer.h
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_OBSERVER_H
+#define ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_OBSERVER_H
+
+#include <android/hardware/media/bufferpool/2.0/IObserver.h>
+#include <hidl/MQDescriptor.h>
+#include <hidl/Status.h>
+#include "BufferPoolClient.h"
+
+namespace android {
+namespace hardware {
+namespace media {
+namespace bufferpool {
+namespace V2_0 {
+namespace implementation {
+
+using ::android::hardware::hidl_array;
+using ::android::hardware::hidl_memory;
+using ::android::hardware::hidl_string;
+using ::android::hardware::hidl_vec;
+using ::android::hardware::Return;
+using ::android::hardware::Void;
+using ::android::sp;
+
+struct Observer : public IObserver {
+ // Methods from ::android::hardware::media::bufferpool::V2_0::IObserver follow.
+ Return<void> onMessage(int64_t connectionId, uint32_t msgId) override;
+
+ ~Observer();
+
+ void addClient(ConnectionId connectionId,
+ const std::weak_ptr<BufferPoolClient> &wclient); // registers a weakly-held client under connectionId
+
+ void delClient(ConnectionId connectionId); // removes the client registered under connectionId
+
+private:
+ Observer(); // private constructor; ClientManager is declared a friend below
+
+ friend struct ClientManager;
+
+ std::mutex mLock; // locked by onMessage/addClient/delClient around mClients access
+ std::map<ConnectionId, const std::weak_ptr<BufferPoolClient>> mClients; // clients by connection id, held weakly
+};
+
+} // namespace implementation
+} // namespace V2_0
+} // namespace bufferpool
+} // namespace media
+} // namespace hardware
+} // namespace android
+
+#endif // ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_OBSERVER_H
diff --git a/media/bufferpool/2.0/include/bufferpool/ClientManager.h b/media/bufferpool/2.0/include/bufferpool/ClientManager.h
index cfc3bc3..953c304 100644
--- a/media/bufferpool/2.0/include/bufferpool/ClientManager.h
+++ b/media/bufferpool/2.0/include/bufferpool/ClientManager.h
@@ -92,6 +92,18 @@
ResultStatus close(ConnectionId connectionId);
/**
+ * Evicts cached allocations. If it's local connection, release the
+ * previous allocations and do not recycle current active allocations.
+ *
+ * @param connectionId The id of the connection.
+ *
+ * @return OK when the connection is reset.
+ * NOT_FOUND when the specified connection was not found.
+ * CRITICAL_ERROR otherwise.
+ */
+ ResultStatus flush(ConnectionId connectionId);
+
+ /**
* Allocates a buffer from the specified connection.
*
* @param connectionId The id of the connection.