bufferpool2.0: Implement buffer invalidation

Change-Id: If7a4a38004f50b4d43a2fae4781f541fe322c249
diff --git a/media/bufferpool/2.0/AccessorImpl.cpp b/media/bufferpool/2.0/AccessorImpl.cpp
index 0ba6600..4cc8abc 100644
--- a/media/bufferpool/2.0/AccessorImpl.cpp
+++ b/media/bufferpool/2.0/AccessorImpl.cpp
@@ -21,6 +21,7 @@
 #include <time.h>
 #include <unistd.h>
 #include <utils/Log.h>
+#include <thread>
 #include "AccessorImpl.h"
 #include "Connection.h"
 
@@ -47,6 +48,7 @@
     const std::shared_ptr<BufferPoolAllocation> mAllocation;
     const size_t mAllocSize;
     const std::vector<uint8_t> mConfig;
+    bool mInvalidated;
 
     InternalBuffer(
             BufferId id,
@@ -54,11 +56,16 @@
             const size_t allocSize,
             const std::vector<uint8_t> &allocConfig)
             : mId(id), mOwnerCount(0), mTransactionCount(0),
-            mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {}
+            mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
+            mInvalidated(false) {}
 
     const native_handle_t *handle() {
         return mAllocation->handle();
     }
+
+    void invalidate() {
+        mInvalidated = true;
+    }
 };
 
 struct TransactionStatus {
@@ -138,21 +145,29 @@
 }
 
 ResultStatus Accessor::Impl::connect(
-        const sp<Accessor> &accessor, sp<Connection> *connection,
-        ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr) {
+        const sp<Accessor> &accessor, const sp<IObserver> &observer,
+        sp<Connection> *connection,
+        ConnectionId *pConnectionId,
+        uint32_t *pMsgId,
+        const StatusDescriptor** statusDescPtr,
+        const InvalidationDescriptor** invDescPtr) {
     sp<Connection> newConnection = new Connection();
     ResultStatus status = ResultStatus::CRITICAL_ERROR;
     {
         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
         if (newConnection) {
             ConnectionId id = (int64_t)sPid << 32 | sSeqId;
-            status = mBufferPool.mObserver.open(id, fmqDescPtr);
+            status = mBufferPool.mObserver.open(id, statusDescPtr);
             if (status == ResultStatus::OK) {
                 newConnection->initialize(accessor, id);
                 *connection = newConnection;
                 *pConnectionId = id;
+                *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
+                mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
+                mBufferPool.mInvalidation.onConnect(id, observer);
                 ++sSeqId;
             }
+
         }
         mBufferPool.processStatusMessages();
         mBufferPool.cleanUp();
@@ -165,6 +180,7 @@
     mBufferPool.processStatusMessages();
     mBufferPool.handleClose(connectionId);
     mBufferPool.mObserver.close(connectionId);
+    mBufferPool.mInvalidation.onClose(connectionId);
     // Since close# will be called after all works are finished, it is OK to
     // evict unused buffers.
     mBufferPool.cleanUp(true);
@@ -229,11 +245,30 @@
     mBufferPool.cleanUp(clearCache);
 }
 
+void Accessor::Impl::flush() {
+    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+    mBufferPool.processStatusMessages();
+    mBufferPool.flush(shared_from_this());
+}
+
+void Accessor::Impl::handleInvalidateAck() {
+    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+    mBufferPool.processStatusMessages();
+    mBufferPool.mInvalidation.onHandleAck();
+}
+
+bool Accessor::Impl::isValid() {
+    return mBufferPool.isValid();
+}
+
 Accessor::Impl::Impl::BufferPool::BufferPool()
     : mTimestampUs(getTimestampNow()),
       mLastCleanUpUs(mTimestampUs),
       mLastLogUs(mTimestampUs),
-      mSeq(0) {}
+      mSeq(0),
+      mStartSeq(0) {
+    mValid = mInvalidationChannel.isValid();
+}
 
 
 // Statistics helper
@@ -242,6 +277,8 @@
     return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
 }
 
+std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sSeqId(0);
+
 Accessor::Impl::Impl::BufferPool::~BufferPool() {
     std::lock_guard<std::mutex> lock(mMutex);
     ALOGD("Destruction - bufferpool %p "
@@ -255,6 +292,96 @@
           percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
 }
 
+void Accessor::Impl::BufferPool::Invalidation::onConnect(
+        ConnectionId conId, const sp<IObserver>& observer) {
+    mAcks[conId] = mInvalidationId; // starts from current invalidationId
+    mObservers.insert(std::make_pair(conId, observer));
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
+    mAcks.erase(conId);
+    mObservers.erase(conId);
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onAck(
+        ConnectionId conId,
+        uint32_t msgId) {
+    auto it = mAcks.find(conId);
+    if (it == mAcks.end() || isMessageLater(msgId, it->second)) {
+        mAcks[conId] = msgId;
+    }
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
+        BufferId bufferId,
+        BufferInvalidationChannel &channel) {
+    for (auto it = mPendings.begin(); it != mPendings.end();) {
+        if (it->invalidate(bufferId)) {
+            it = mPendings.erase(it);
+            uint32_t msgId = 0;
+            if (it->mNeedsAck) {
+                msgId = ++mInvalidationId;
+                if (msgId == 0) {
+                    // wrap happens
+                    msgId = ++mInvalidationId;
+                }
+            }
+            channel.postInvalidation(msgId, it->mFrom, it->mTo);
+            sInvalidator.addAccessor(mId, it->mImpl);
+            continue;
+        }
+        ++it;
+    }
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
+        bool needsAck,
+        uint32_t from,
+        uint32_t to,
+        size_t left,
+        BufferInvalidationChannel &channel,
+        const std::shared_ptr<Accessor::Impl> &impl) {
+    if (left == 0) {
+        uint32_t msgId = 0;
+        if (needsAck) {
+            msgId = ++mInvalidationId;
+            if (msgId == 0) {
+                // wrap happens
+                msgId = ++mInvalidationId;
+            }
+        }
+        channel.postInvalidation(msgId, from, to);
+        sInvalidator.addAccessor(mId, impl);
+    } else {
+        // TODO: sending hint message?
+        Pending pending(needsAck, from, to, left, impl);
+        mPendings.push_back(pending);
+    }
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onHandleAck() {
+    if (mInvalidationId != 0) {
+        std::set<int> deads;
+        for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
+            if (it->second != mInvalidationId) {
+                const sp<IObserver> observer = mObservers[it->first].promote();
+                if (observer) {
+                    observer->onMessage(it->first, mInvalidationId);
+                } else {
+                    deads.insert(it->first);
+                }
+            }
+        }
+        if (deads.size() > 0) {
+            for (auto it = deads.begin(); it != deads.end(); ++it) {
+                onClose(*it);
+            }
+        }
+    }
+    // All invalidation Ids are synced.
+    sInvalidator.delAccessor(mId);
+}
+
 bool Accessor::Impl::BufferPool::handleOwnBuffer(
         ConnectionId connectionId, BufferId bufferId) {
 
@@ -275,8 +402,15 @@
         iter->second->mOwnerCount--;
         if (iter->second->mOwnerCount == 0 &&
                 iter->second->mTransactionCount == 0) {
-            mStats.onBufferUnused(iter->second->mAllocSize);
-            mFreeBuffers.insert(bufferId);
+            if (!iter->second->mInvalidated) {
+                mStats.onBufferUnused(iter->second->mAllocSize);
+                mFreeBuffers.insert(bufferId);
+            } else {
+                mStats.onBufferUnused(iter->second->mAllocSize);
+                mStats.onBufferEvicted(iter->second->mAllocSize);
+                mBuffers.erase(iter);
+                mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
+            }
         }
     }
     erase(&mUsingConnections, bufferId, connectionId);
@@ -352,8 +486,15 @@
             bufferIter->second->mTransactionCount--;
             if (bufferIter->second->mOwnerCount == 0
                 && bufferIter->second->mTransactionCount == 0) {
-                mStats.onBufferUnused(bufferIter->second->mAllocSize);
-                mFreeBuffers.insert(message.bufferId);
+                if (!bufferIter->second->mInvalidated) {
+                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                    mFreeBuffers.insert(message.bufferId);
+                } else {
+                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                    mStats.onBufferEvicted(bufferIter->second->mAllocSize);
+                    mBuffers.erase(bufferIter);
+                    mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
+                }
             }
             mTransactions.erase(found);
         }
@@ -400,7 +541,7 @@
                 ret = handleTransferResult(message);
                 break;
             case BufferStatus::INVALIDATION_ACK:
-                // TODO
+                mInvalidation.onAck(message.connectionId, message.bufferId);
                 break;
         }
         if (ret == false) {
@@ -423,8 +564,15 @@
                 if (bufferIter->second->mOwnerCount == 0 &&
                         bufferIter->second->mTransactionCount == 0) {
                     // TODO: handle freebuffer insert fail
-                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
-                    mFreeBuffers.insert(bufferId);
+                    if (!bufferIter->second->mInvalidated) {
+                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                        mFreeBuffers.insert(bufferId);
+                    } else {
+                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                        mStats.onBufferEvicted(bufferIter->second->mAllocSize);
+                        mBuffers.erase(bufferIter);
+                        mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
+                    }
                 }
             }
         }
@@ -446,8 +594,15 @@
                 if (bufferIter->second->mOwnerCount == 0 &&
                     bufferIter->second->mTransactionCount == 0) {
                     // TODO: handle freebuffer insert fail
-                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
-                    mFreeBuffers.insert(bufferId);
+                    if (!bufferIter->second->mInvalidated) {
+                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                        mFreeBuffers.insert(bufferId);
+                    } else {
+                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                        mStats.onBufferEvicted(bufferIter->second->mAllocSize);
+                        mBuffers.erase(bufferIter);
+                        mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
+                    }
                 }
                 mTransactions.erase(iter);
             }
@@ -538,6 +693,121 @@
     }
 }
 
+void Accessor::Impl::BufferPool::invalidate(
+        bool needsAck, BufferId from, BufferId to,
+        const std::shared_ptr<Accessor::Impl> &impl) {
+    for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
+        if (isBufferInRange(from, to, *freeIt)) {
+            auto it = mBuffers.find(*freeIt);
+            if (it != mBuffers.end() &&
+                it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
+                mStats.onBufferEvicted(it->second->mAllocSize);
+                mBuffers.erase(it);
+                freeIt = mFreeBuffers.erase(freeIt);
+                continue;
+            } else {
+                ALOGW("bufferpool inconsistent!");
+            }
+        }
+        ++freeIt;
+    }
+
+    size_t left = 0;
+    for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
+        if (isBufferInRange(from, to, it->first)) {
+            it->second->invalidate();
+            ++left;
+        }
+    }
+    mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
+}
+
+void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
+    BufferId from = mStartSeq;
+    BufferId to = mSeq;
+    mStartSeq = mSeq;
+    // TODO: needsAck params 
+    if (from != to) {
+        invalidate(true, from, to, impl);
+    }
+}
+
+void Accessor::Impl::invalidatorThread(
+            std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
+            std::mutex &mutex,
+            std::condition_variable &cv,
+            bool &ready) {
+    while(true) {
+        std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            if (!ready) {
+                cv.wait(lock);
+            }
+            copied.insert(accessors.begin(), accessors.end());
+        }
+        std::list<ConnectionId> erased;
+        for (auto it = copied.begin(); it != copied.end(); ++it) {
+            const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
+            if (!impl) {
+                erased.push_back(it->first);
+            } else {
+                impl->handleInvalidateAck();
+            }
+        }
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            for (auto it = erased.begin(); it != erased.end(); ++it) {
+                accessors.erase(*it);
+            }
+            if (accessors.size() == 0) {
+                ready = false;
+            } else {
+                // prevent draining cpu.
+                lock.unlock();
+                std::this_thread::yield();
+            }
+        }
+    }
+}
+
+Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
+    std::thread invalidator(
+            invalidatorThread,
+            std::ref(mAccessors),
+            std::ref(mMutex),
+            std::ref(mCv),
+            std::ref(mReady));
+    invalidator.detach();
+}
+
+void Accessor::Impl::AccessorInvalidator::addAccessor(
+        uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
+    bool notify = false;
+    std::unique_lock<std::mutex> lock(mMutex);
+    if (mAccessors.find(accessorId) == mAccessors.end()) {
+        if (!mReady) {
+            mReady = true;
+            notify = true;
+        }
+        mAccessors.insert(std::make_pair(accessorId, impl));
+    }
+    lock.unlock();
+    if (notify) {
+        mCv.notify_one();
+    }
+}
+
+void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
+    std::lock_guard<std::mutex> lock(mMutex);
+    mAccessors.erase(accessorId);
+    if (mAccessors.size() == 0) {
+        mReady = false;
+    }
+}
+
+Accessor::Impl::AccessorInvalidator Accessor::Impl::sInvalidator;
+
 }  // namespace implementation
 }  // namespace V2_0
 }  // namespace bufferpool