bufferpool2.0: Implement buffer invalidation

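Add a buffer invalidation FMQ from the buffer pool to its clients,
along with a flush() path through ClientManager, Connection and
Accessor. flush() invalidates all buffers issued so far; buffers that
are still in use are invalidated once they become unused. Clients ack
invalidation messages over the buffer status FMQ, and a new Observer
(IObserver) callback re-notifies clients with outstanding acks.
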
Change-Id: If7a4a38004f50b4d43a2fae4781f541fe322c249
diff --git a/media/bufferpool/2.0/Accessor.cpp b/media/bufferpool/2.0/Accessor.cpp
index 3eaea7c..f264501 100644
--- a/media/bufferpool/2.0/Accessor.cpp
+++ b/media/bufferpool/2.0/Accessor.cpp
@@ -117,19 +117,18 @@
 Return<void> Accessor::connect(
         const sp<::android::hardware::media::bufferpool::V2_0::IObserver>& observer,
         connect_cb _hidl_cb) {
-    (void)observer;
     sp<Connection> connection;
     ConnectionId connectionId;
+    uint32_t msgId;
     const StatusDescriptor* fmqDesc;
+    const InvalidationDescriptor* invDesc;
 
-    ResultStatus status = connect(&connection, &connectionId, &fmqDesc, false);
+    ResultStatus status = connect(
+            observer, false, &connection, &connectionId, &msgId, &fmqDesc, &invDesc);
     if (status == ResultStatus::OK) {
-        _hidl_cb(status, connection, connectionId, *fmqDesc,
-                 android::hardware::MQDescriptorUnsync<BufferInvalidationMessage>(
-                         std::vector<android::hardware::GrantorDescriptor>(),
-                         nullptr /* nhandle */, 0 /* size */));
+        _hidl_cb(status, connection, connectionId, msgId, *fmqDesc, *invDesc);
     } else {
-        _hidl_cb(status, nullptr, -1LL,
+        _hidl_cb(status, nullptr, -1LL, 0,
                  android::hardware::MQDescriptorSync<BufferStatusMessage>(
                          std::vector<android::hardware::GrantorDescriptor>(),
                          nullptr /* nhandle */, 0 /* size */),
@@ -147,7 +146,15 @@
 }
 
 bool Accessor::isValid() {
-    return (bool)mImpl;
+    return (bool)mImpl && mImpl->isValid();
+}
+
+ResultStatus Accessor::flush() {
+    if (mImpl) {
+        mImpl->flush();
+        return ResultStatus::OK;
+    }
+    return ResultStatus::CRITICAL_ERROR;
 }
 
 ResultStatus Accessor::allocate(
@@ -170,10 +177,15 @@
 }
 
 ResultStatus Accessor::connect(
+        const sp<IObserver> &observer, bool local,
         sp<Connection> *connection, ConnectionId *pConnectionId,
-        const StatusDescriptor** fmqDescPtr, bool local) {
+        uint32_t *pMsgId,
+        const StatusDescriptor** statusDescPtr,
+        const InvalidationDescriptor** invDescPtr) {
     if (mImpl) {
-        ResultStatus status = mImpl->connect(this, connection, pConnectionId, fmqDescPtr);
+        ResultStatus status = mImpl->connect(
+                this, observer, connection, pConnectionId, pMsgId,
+                statusDescPtr, invDescPtr);
         if (!local && status == ResultStatus::OK) {
             sp<Accessor> accessor(this);
             sConnectionDeathRecipient->add(*pConnectionId, accessor);
diff --git a/media/bufferpool/2.0/Accessor.h b/media/bufferpool/2.0/Accessor.h
index a718da1..4b5b17a 100644
--- a/media/bufferpool/2.0/Accessor.h
+++ b/media/bufferpool/2.0/Accessor.h
@@ -95,6 +95,9 @@
     /** Returns whether the accessor is valid. */
     bool isValid();
 
+    /** Invalidates all buffers which are owned by the buffer pool. */
+    ResultStatus flush();
+
     /** Allocates a buffer from a buffer pool.
      *
      * @param connectionId  the connection id of the client.
@@ -135,20 +138,28 @@
      * created connection in order to communicate with the buffer pool. An
      * FMQ for buffer status message is also created for the client.
      *
-     * @param connection    created connection
-     * @param pConnectionId the id of the created connection
-     * @param fmqDescPtr    FMQ descriptor for shared buffer status message
-     *                      queue between a buffer pool and the client.
+     * @param observer      client observer for buffer invalidation
      * @param local         true when a connection request comes from local process,
      *                      false otherwise.
+     * @param connection    created connection
+     * @param pConnectionId the id of the created connection
+     * @param pMsgId        the id of the most recent buffer invalidation message
+     * @param statusDescPtr FMQ descriptor for shared buffer status message
+     *                      queue between a buffer pool and the client.
+     * @param invDescPtr    FMQ descriptor for buffer invalidation message
+     *                      queue from a buffer pool to the client.
      *
      * @return OK when a connection is successfully made.
      *         NO_MEMORY when there is no memory.
      *         CRITICAL_ERROR otherwise.
      */
     ResultStatus connect(
+            const sp<IObserver>& observer,
+            bool local,
             sp<Connection> *connection, ConnectionId *pConnectionId,
-            const StatusDescriptor** fmqDescPtr, bool local);
+            uint32_t *pMsgId,
+            const StatusDescriptor** statusDescPtr,
+            const InvalidationDescriptor** invDescPtr);
 
     /**
      * Closes the specified connection to the client.
@@ -176,7 +187,7 @@
 
 private:
     class Impl;
-    std::unique_ptr<Impl> mImpl;
+    std::shared_ptr<Impl> mImpl;
 };
 
 }  // namespace implementation
diff --git a/media/bufferpool/2.0/AccessorImpl.cpp b/media/bufferpool/2.0/AccessorImpl.cpp
index 0ba6600..4cc8abc 100644
--- a/media/bufferpool/2.0/AccessorImpl.cpp
+++ b/media/bufferpool/2.0/AccessorImpl.cpp
@@ -21,6 +21,7 @@
 #include <time.h>
 #include <unistd.h>
 #include <utils/Log.h>
+#include <thread>
 #include "AccessorImpl.h"
 #include "Connection.h"
 
@@ -47,6 +48,7 @@
     const std::shared_ptr<BufferPoolAllocation> mAllocation;
     const size_t mAllocSize;
     const std::vector<uint8_t> mConfig;
+    bool mInvalidated;
 
     InternalBuffer(
             BufferId id,
@@ -54,11 +56,16 @@
             const size_t allocSize,
             const std::vector<uint8_t> &allocConfig)
             : mId(id), mOwnerCount(0), mTransactionCount(0),
-            mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig) {}
+            mAllocation(alloc), mAllocSize(allocSize), mConfig(allocConfig),
+            mInvalidated(false) {}
 
     const native_handle_t *handle() {
         return mAllocation->handle();
     }
+
+    void invalidate() {
+        mInvalidated = true;
+    }
 };
 
 struct TransactionStatus {
@@ -138,21 +145,29 @@
 }
 
 ResultStatus Accessor::Impl::connect(
-        const sp<Accessor> &accessor, sp<Connection> *connection,
-        ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr) {
+        const sp<Accessor> &accessor, const sp<IObserver> &observer,
+        sp<Connection> *connection,
+        ConnectionId *pConnectionId,
+        uint32_t *pMsgId,
+        const StatusDescriptor** statusDescPtr,
+        const InvalidationDescriptor** invDescPtr) {
     sp<Connection> newConnection = new Connection();
     ResultStatus status = ResultStatus::CRITICAL_ERROR;
     {
         std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
         if (newConnection) {
             ConnectionId id = (int64_t)sPid << 32 | sSeqId;
-            status = mBufferPool.mObserver.open(id, fmqDescPtr);
+            status = mBufferPool.mObserver.open(id, statusDescPtr);
             if (status == ResultStatus::OK) {
                 newConnection->initialize(accessor, id);
                 *connection = newConnection;
                 *pConnectionId = id;
+                *pMsgId = mBufferPool.mInvalidation.mInvalidationId;
+                mBufferPool.mInvalidationChannel.getDesc(invDescPtr);
+                mBufferPool.mInvalidation.onConnect(id, observer);
                 ++sSeqId;
             }
+
         }
         mBufferPool.processStatusMessages();
         mBufferPool.cleanUp();
@@ -165,6 +180,7 @@
     mBufferPool.processStatusMessages();
     mBufferPool.handleClose(connectionId);
     mBufferPool.mObserver.close(connectionId);
+    mBufferPool.mInvalidation.onClose(connectionId);
     // Since close# will be called after all works are finished, it is OK to
     // evict unused buffers.
     mBufferPool.cleanUp(true);
@@ -229,11 +245,30 @@
     mBufferPool.cleanUp(clearCache);
 }
 
+void Accessor::Impl::flush() {
+    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+    mBufferPool.processStatusMessages();
+    mBufferPool.flush(shared_from_this());
+}
+
+void Accessor::Impl::handleInvalidateAck() {
+    std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+    mBufferPool.processStatusMessages();
+    mBufferPool.mInvalidation.onHandleAck();
+}
+
+bool Accessor::Impl::isValid() {
+    return mBufferPool.isValid();
+}
+
 Accessor::Impl::Impl::BufferPool::BufferPool()
     : mTimestampUs(getTimestampNow()),
       mLastCleanUpUs(mTimestampUs),
       mLastLogUs(mTimestampUs),
-      mSeq(0) {}
+      mSeq(0),
+      mStartSeq(0) {
+    mValid = mInvalidationChannel.isValid();
+}
 
 
 // Statistics helper
@@ -242,6 +277,8 @@
     return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
 }
 
+std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sSeqId(0);
+
 Accessor::Impl::Impl::BufferPool::~BufferPool() {
     std::lock_guard<std::mutex> lock(mMutex);
     ALOGD("Destruction - bufferpool %p "
@@ -255,6 +292,96 @@
           percentage(mStats.mTotalTransfers - mStats.mTotalFetches, mStats.mTotalTransfers));
 }
 
+void Accessor::Impl::BufferPool::Invalidation::onConnect(
+        ConnectionId conId, const sp<IObserver>& observer) {
+    mAcks[conId] = mInvalidationId; // starts from current invalidationId
+    mObservers.insert(std::make_pair(conId, observer));
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onClose(ConnectionId conId) {
+    mAcks.erase(conId);
+    mObservers.erase(conId);
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onAck(
+        ConnectionId conId,
+        uint32_t msgId) {
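+    // Record the latest invalidation message id acked by this connection;
+    // the recorded id only moves forward.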
+    auto it = mAcks.find(conId);
+    if (it == mAcks.end() || isMessageLater(msgId, it->second)) {
+        mAcks[conId] = msgId;
+    }
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onBufferInvalidated(
+        BufferId bufferId,
+        BufferInvalidationChannel &channel) {
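+    // An invalidated buffer has just become unused and been evicted; once a
+    // pending request has no remaining owned buffers, post the invalidation
+    // message and start polling for client acks.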
+    for (auto it = mPendings.begin(); it != mPendings.end();) {
+        if (it->invalidate(bufferId)) {
+            uint32_t msgId = 0;
+            if (it->mNeedsAck) {
+                msgId = ++mInvalidationId;
+                if (msgId == 0) {
+                    // wrap happens
+                    msgId = ++mInvalidationId;
+                }
+            }
+            channel.postInvalidation(msgId, it->mFrom, it->mTo);
+            sInvalidator.addAccessor(mId, it->mImpl);
+            it = mPendings.erase(it);
+            continue;
+        }
+        ++it;
+    }
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onInvalidationRequest(
+        bool needsAck,
+        uint32_t from,
+        uint32_t to,
+        size_t left,
+        BufferInvalidationChannel &channel,
+        const std::shared_ptr<Accessor::Impl> &impl) {
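+    // No buffers in the range are still owned by clients; post the
+    // invalidation immediately. Otherwise keep the request pending until the
+    // remaining buffers are returned.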
+    if (left == 0) {
+        uint32_t msgId = 0;
+        if (needsAck) {
+            msgId = ++mInvalidationId;
+            if (msgId == 0) {
+                // wrap happens
+                msgId = ++mInvalidationId;
+            }
+        }
+        channel.postInvalidation(msgId, from, to);
+        sInvalidator.addAccessor(mId, impl);
+    } else {
+        // TODO: sending hint message?
+        Pending pending(needsAck, from, to, left, impl);
+        mPendings.push_back(pending);
+    }
+}
+
+void Accessor::Impl::BufferPool::Invalidation::onHandleAck() {
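+    // Re-notify observers of connections whose latest ack still lags behind
+    // the current invalidation id, and drop observers whose clients are gone.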
+    if (mInvalidationId != 0) {
+        std::set<ConnectionId> deads;
+        for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
+            if (it->second != mInvalidationId) {
+                const sp<IObserver> observer = mObservers[it->first].promote();
+                if (observer) {
+                    observer->onMessage(it->first, mInvalidationId);
+                } else {
+                    deads.insert(it->first);
+                }
+            }
+        }
+        if (deads.size() > 0) {
+            for (auto it = deads.begin(); it != deads.end(); ++it) {
+                onClose(*it);
+            }
+        }
+    }
+    // All invalidation Ids are synced.
+    sInvalidator.delAccessor(mId);
+}
+
 bool Accessor::Impl::BufferPool::handleOwnBuffer(
         ConnectionId connectionId, BufferId bufferId) {
 
@@ -275,8 +402,15 @@
         iter->second->mOwnerCount--;
         if (iter->second->mOwnerCount == 0 &&
                 iter->second->mTransactionCount == 0) {
-            mStats.onBufferUnused(iter->second->mAllocSize);
-            mFreeBuffers.insert(bufferId);
+            if (!iter->second->mInvalidated) {
+                mStats.onBufferUnused(iter->second->mAllocSize);
+                mFreeBuffers.insert(bufferId);
+            } else {
+                mStats.onBufferUnused(iter->second->mAllocSize);
+                mStats.onBufferEvicted(iter->second->mAllocSize);
+                mBuffers.erase(iter);
+                mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
+            }
         }
     }
     erase(&mUsingConnections, bufferId, connectionId);
@@ -352,8 +486,15 @@
             bufferIter->second->mTransactionCount--;
             if (bufferIter->second->mOwnerCount == 0
                 && bufferIter->second->mTransactionCount == 0) {
-                mStats.onBufferUnused(bufferIter->second->mAllocSize);
-                mFreeBuffers.insert(message.bufferId);
+                if (!bufferIter->second->mInvalidated) {
+                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                    mFreeBuffers.insert(message.bufferId);
+                } else {
+                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                    mStats.onBufferEvicted(bufferIter->second->mAllocSize);
+                    mBuffers.erase(bufferIter);
+                    mInvalidation.onBufferInvalidated(message.bufferId, mInvalidationChannel);
+                }
             }
             mTransactions.erase(found);
         }
@@ -400,7 +541,7 @@
                 ret = handleTransferResult(message);
                 break;
             case BufferStatus::INVALIDATION_ACK:
-                // TODO
+                mInvalidation.onAck(message.connectionId, message.bufferId);
                 break;
         }
         if (ret == false) {
@@ -423,8 +564,15 @@
                 if (bufferIter->second->mOwnerCount == 0 &&
                         bufferIter->second->mTransactionCount == 0) {
                     // TODO: handle freebuffer insert fail
-                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
-                    mFreeBuffers.insert(bufferId);
+                    if (!bufferIter->second->mInvalidated) {
+                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                        mFreeBuffers.insert(bufferId);
+                    } else {
+                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                        mStats.onBufferEvicted(bufferIter->second->mAllocSize);
+                        mBuffers.erase(bufferIter);
+                        mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
+                    }
                 }
             }
         }
@@ -446,8 +594,15 @@
                 if (bufferIter->second->mOwnerCount == 0 &&
                     bufferIter->second->mTransactionCount == 0) {
                     // TODO: handle freebuffer insert fail
-                    mStats.onBufferUnused(bufferIter->second->mAllocSize);
-                    mFreeBuffers.insert(bufferId);
+                    if (!bufferIter->second->mInvalidated) {
+                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                        mFreeBuffers.insert(bufferId);
+                    } else {
+                        mStats.onBufferUnused(bufferIter->second->mAllocSize);
+                        mStats.onBufferEvicted(bufferIter->second->mAllocSize);
+                        mBuffers.erase(bufferIter);
+                        mInvalidation.onBufferInvalidated(bufferId, mInvalidationChannel);
+                    }
                 }
                 mTransactions.erase(iter);
             }
@@ -538,6 +693,121 @@
     }
 }
 
+void Accessor::Impl::BufferPool::invalidate(
+        bool needsAck, BufferId from, BufferId to,
+        const std::shared_ptr<Accessor::Impl> &impl) {
+    for (auto freeIt = mFreeBuffers.begin(); freeIt != mFreeBuffers.end();) {
+        if (isBufferInRange(from, to, *freeIt)) {
+            auto it = mBuffers.find(*freeIt);
+            if (it != mBuffers.end() &&
+                it->second->mOwnerCount == 0 && it->second->mTransactionCount == 0) {
+                mStats.onBufferEvicted(it->second->mAllocSize);
+                mBuffers.erase(it);
+                freeIt = mFreeBuffers.erase(freeIt);
+                continue;
+            } else {
+                ALOGW("bufferpool inconsistent!");
+            }
+        }
+        ++freeIt;
+    }
+
+    size_t left = 0;
+    for (auto it = mBuffers.begin(); it != mBuffers.end(); ++it) {
+        if (isBufferInRange(from, to, it->first)) {
+            it->second->invalidate();
+            ++left;
+        }
+    }
+    mInvalidation.onInvalidationRequest(needsAck, from, to, left, mInvalidationChannel, impl);
+}
+
+void Accessor::Impl::BufferPool::flush(const std::shared_ptr<Accessor::Impl> &impl) {
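+    // Invalidate every buffer id issued since the last flush, i.e. the range
+    // [mStartSeq, mSeq); ids allocated afterwards are not affected.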
+    BufferId from = mStartSeq;
+    BufferId to = mSeq;
+    mStartSeq = mSeq;
+    // TODO: needsAck params
+    if (from != to) {
+        invalidate(true, from, to, impl);
+    }
+}
+
+void Accessor::Impl::invalidatorThread(
+            std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
+            std::mutex &mutex,
+            std::condition_variable &cv,
+            bool &ready) {
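+    // Single detached worker shared by all accessors: it repeatedly snapshots
+    // the registered accessors and polls them for invalidation acks until
+    // every accessor is drained or destroyed.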
+    while(true) {
+        std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> copied;
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            if (!ready) {
+                cv.wait(lock);
+            }
+            copied.insert(accessors.begin(), accessors.end());
+        }
+        std::list<ConnectionId> erased;
+        for (auto it = copied.begin(); it != copied.end(); ++it) {
+            const std::shared_ptr<Accessor::Impl> impl = it->second.lock();
+            if (!impl) {
+                erased.push_back(it->first);
+            } else {
+                impl->handleInvalidateAck();
+            }
+        }
+        {
+            std::unique_lock<std::mutex> lock(mutex);
+            for (auto it = erased.begin(); it != erased.end(); ++it) {
+                accessors.erase(*it);
+            }
+            if (accessors.size() == 0) {
+                ready = false;
+            } else {
+                // prevent draining cpu.
+                lock.unlock();
+                std::this_thread::yield();
+            }
+        }
+    }
+}
+
+Accessor::Impl::AccessorInvalidator::AccessorInvalidator() : mReady(false) {
+    std::thread invalidator(
+            invalidatorThread,
+            std::ref(mAccessors),
+            std::ref(mMutex),
+            std::ref(mCv),
+            std::ref(mReady));
+    invalidator.detach();
+}
+
+void Accessor::Impl::AccessorInvalidator::addAccessor(
+        uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl) {
+    bool notify = false;
+    std::unique_lock<std::mutex> lock(mMutex);
+    if (mAccessors.find(accessorId) == mAccessors.end()) {
+        if (!mReady) {
+            mReady = true;
+            notify = true;
+        }
+        mAccessors.insert(std::make_pair(accessorId, impl));
+    }
+    lock.unlock();
+    if (notify) {
+        mCv.notify_one();
+    }
+}
+
+void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
+    std::lock_guard<std::mutex> lock(mMutex);
+    mAccessors.erase(accessorId);
+    if (mAccessors.size() == 0) {
+        mReady = false;
+    }
+}
+
+Accessor::Impl::AccessorInvalidator Accessor::Impl::sInvalidator;
+
 }  // namespace implementation
 }  // namespace V2_0
 }  // namespace bufferpool
diff --git a/media/bufferpool/2.0/AccessorImpl.h b/media/bufferpool/2.0/AccessorImpl.h
index 1d33880..6b03494 100644
--- a/media/bufferpool/2.0/AccessorImpl.h
+++ b/media/bufferpool/2.0/AccessorImpl.h
@@ -19,6 +19,7 @@
 
 #include <map>
 #include <set>
+#include <condition_variable>
 #include "Accessor.h"
 
 namespace android {
@@ -33,15 +34,20 @@
 
 /**
  * An implementation of a buffer pool accessor(or a buffer pool implementation.) */
-class Accessor::Impl {
+class Accessor::Impl
+    : public std::enable_shared_from_this<Accessor::Impl> {
 public:
     Impl(const std::shared_ptr<BufferPoolAllocator> &allocator);
 
     ~Impl();
 
     ResultStatus connect(
-            const sp<Accessor> &accessor, sp<Connection> *connection,
-            ConnectionId *pConnectionId, const StatusDescriptor** fmqDescPtr);
+            const sp<Accessor> &accessor, const sp<IObserver> &observer,
+            sp<Connection> *connection,
+            ConnectionId *pConnectionId,
+            uint32_t *pMsgId,
+            const StatusDescriptor** statusDescPtr,
+            const InvalidationDescriptor** invDescPtr);
 
     ResultStatus close(ConnectionId connectionId);
 
@@ -55,8 +61,14 @@
                        BufferId bufferId,
                        const native_handle_t** handle);
 
+    void flush();
+
     void cleanUp(bool clearCache);
 
+    bool isValid();
+
+    void handleInvalidateAck();
+
 private:
     // ConnectionId = pid : (timestamp_created + seqId)
     // in order to guarantee uniqueness for each connection
@@ -78,7 +90,10 @@
         int64_t mLastCleanUpUs;
         int64_t mLastLogUs;
         BufferId mSeq;
+        BufferId mStartSeq;
+        bool mValid;
         BufferStatusObserver mObserver;
+        BufferInvalidationChannel mInvalidationChannel;
 
         std::map<ConnectionId, std::set<BufferId>> mUsingBuffers;
         std::map<BufferId, std::set<ConnectionId>> mUsingConnections;
@@ -95,6 +110,54 @@
         std::map<BufferId, std::unique_ptr<InternalBuffer>> mBuffers;
         std::set<BufferId> mFreeBuffers;
 
+        struct Invalidation {
+            static std::atomic<std::uint32_t> sSeqId;
+
+            struct Pending {
+                bool mNeedsAck;
+                uint32_t mFrom;
+                uint32_t mTo;
+                size_t mLeft;
+                const std::weak_ptr<Accessor::Impl> mImpl;
+                Pending(bool needsAck, uint32_t from, uint32_t to, size_t left,
+                        const std::shared_ptr<Accessor::Impl> &impl)
+                        : mNeedsAck(needsAck),
+                          mFrom(from),
+                          mTo(to),
+                          mLeft(left),
+                          mImpl(impl)
+                {}
+
+                bool invalidate(uint32_t bufferId) {
+                    return isBufferInRange(mFrom, mTo, bufferId) && --mLeft == 0;
+                }
+            };
+
+            std::list<Pending> mPendings;
+            std::map<ConnectionId, uint32_t> mAcks;
+            std::map<ConnectionId, const wp<IObserver>> mObservers;
+            uint32_t mInvalidationId;
+            uint32_t mId;
+
+            Invalidation() : mInvalidationId(0), mId(sSeqId.fetch_add(1)) {}
+
+            void onConnect(ConnectionId conId, const sp<IObserver> &observer);
+
+            void onClose(ConnectionId conId);
+
+            void onAck(ConnectionId conId, uint32_t msgId);
+
+            void onBufferInvalidated(
+                    BufferId bufferId,
+                    BufferInvalidationChannel &channel);
+
+            void onInvalidationRequest(
+                    bool needsAck, uint32_t from, uint32_t to, size_t left,
+                    BufferInvalidationChannel &channel,
+                    const std::shared_ptr<Accessor::Impl> &impl);
+
+            void onHandleAck();
+        } mInvalidation;
         /// Buffer pool statistics which tracks allocation and transfer statistics.
         struct Stats {
             /// Total size of allocations which are used or available to use.
@@ -164,6 +227,13 @@
             }
         } mStats;
 
+        bool isValid() {
+            return mValid;
+        }
+
+        void invalidate(bool needsAck, BufferId from, BufferId to,
+                        const std::shared_ptr<Accessor::Impl> &impl);
+
     public:
         /** Creates a buffer pool. */
         BufferPool();
@@ -286,8 +356,33 @@
          */
         void cleanUp(bool clearCache = false);
 
+        /**
+         * Processes pending buffer status messages and invalidates all
+         * currently free buffers. Active buffers are invalidated once they
+         * become inactive.
+         */
+        void flush(const std::shared_ptr<Accessor::Impl> &impl);
+
         friend class Accessor::Impl;
     } mBufferPool;
+
+    struct AccessorInvalidator {
+        std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> mAccessors;
+        std::mutex mMutex;
+        std::condition_variable mCv;
+        bool mReady;
+
+        AccessorInvalidator();
+        void addAccessor(uint32_t accessorId, const std::weak_ptr<Accessor::Impl> &impl);
+        void delAccessor(uint32_t accessorId);
+    };
+
+    static AccessorInvalidator sInvalidator;
+
+    static void invalidatorThread(
+        std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
+        std::mutex &mutex,
+        std::condition_variable &cv,
+        bool &ready);
 };
 
 }  // namespace implementation
diff --git a/media/bufferpool/2.0/Android.bp b/media/bufferpool/2.0/Android.bp
index 413125a..cd4e06e 100644
--- a/media/bufferpool/2.0/Android.bp
+++ b/media/bufferpool/2.0/Android.bp
@@ -8,6 +8,7 @@
         "BufferStatus.cpp",
         "ClientManager.cpp",
         "Connection.cpp",
+        "Observer.cpp",
     ],
     export_include_dirs: [
         "include",
diff --git a/media/bufferpool/2.0/BufferPoolClient.cpp b/media/bufferpool/2.0/BufferPoolClient.cpp
index 0f763f7..c80beff 100644
--- a/media/bufferpool/2.0/BufferPoolClient.cpp
+++ b/media/bufferpool/2.0/BufferPoolClient.cpp
@@ -36,9 +36,9 @@
 class BufferPoolClient::Impl
         : public std::enable_shared_from_this<BufferPoolClient::Impl> {
 public:
-    explicit Impl(const sp<Accessor> &accessor);
+    explicit Impl(const sp<Accessor> &accessor, const sp<IObserver> &observer);
 
-    explicit Impl(const sp<IAccessor> &accessor);
+    explicit Impl(const sp<IAccessor> &accessor, const sp<IObserver> &observer);
 
     bool isValid() {
         return mValid;
@@ -58,6 +58,10 @@
 
     bool isActive(int64_t *lastTransactionUs, bool clearCache);
 
+    void receiveInvalidation(uint32_t msgID);
+
+    ResultStatus flush();
+
     ResultStatus allocate(const std::vector<uint8_t> &params,
                           native_handle_t **handle,
                           std::shared_ptr<BufferPoolData> *buffer);
@@ -83,10 +87,14 @@
 
     void trySyncFromRemote();
 
-    bool syncReleased();
+    bool syncReleased(uint32_t msgId = 0);
 
     void evictCaches(bool clearCache = false);
 
+    void invalidateBuffer(BufferId id);
+
+    void invalidateRange(BufferId from, BufferId to);
+
     ResultStatus allocateBufferHandle(
             const std::vector<uint8_t>& params, BufferId *bufferId,
             native_handle_t **handle);
@@ -106,6 +114,7 @@
     uint32_t mSeqId;
     ConnectionId mConnectionId;
     int64_t mLastEvictCacheUs;
+    std::unique_ptr<BufferInvalidationListener> mInvalidationListener;
 
     // CachedBuffers
     struct BufferCache {
@@ -130,12 +139,16 @@
     } mCache;
 
     // FMQ - release notifier
-    struct {
+    struct ReleaseCache {
         std::mutex mLock;
         // TODO: use only one list?(using one list may dealy sending messages?)
         std::list<BufferId> mReleasingIds;
         std::list<BufferId> mReleasedIds;
+        uint32_t mInvalidateId; // TODO: invalidation ACK to bufferpool
+        bool mInvalidateAck;
         std::unique_ptr<BufferStatusChannel> mStatusChannel;
+
+        ReleaseCache() : mInvalidateId(0), mInvalidateAck(true) {}
     } mReleasing;
 
     // This lock is held during synchronization from remote side.
@@ -162,7 +175,6 @@
 
 struct BufferPoolClient::Impl::ClientBuffer {
 private:
-    bool mInvalidated; // TODO: implement
     int64_t mExpireUs;
     bool mHasCache;
     ConnectionId mConnectionId;
@@ -177,9 +189,8 @@
 public:
     ClientBuffer(
             ConnectionId connectionId, BufferId id, native_handle_t *handle)
-            : mInvalidated(false), mHasCache(false),
-              mConnectionId(connectionId), mId(id), mHandle(handle) {
-        (void)mInvalidated;
+            : mHasCache(false), mConnectionId(connectionId),
+              mId(id), mHandle(handle) {
         mExpireUs = getTimestampNow() + kCacheTtlUs;
     }
 
@@ -190,6 +201,10 @@
         }
     }
 
+    BufferId id() const {
+        return mId;
+    }
+
     bool expire() const {
         int64_t now = getTimestampNow();
         return now >= mExpireUs;
@@ -244,41 +259,53 @@
     }
 };
 
-BufferPoolClient::Impl::Impl(const sp<Accessor> &accessor)
+BufferPoolClient::Impl::Impl(const sp<Accessor> &accessor, const sp<IObserver> &observer)
     : mLocal(true), mValid(false), mAccessor(accessor), mSeqId(0),
       mLastEvictCacheUs(getTimestampNow()) {
-    const StatusDescriptor *fmqDesc;
+    const StatusDescriptor *statusDesc;
+    const InvalidationDescriptor *invDesc;
     ResultStatus status = accessor->connect(
-            &mLocalConnection, &mConnectionId, &fmqDesc, true);
+            observer, true,
+            &mLocalConnection, &mConnectionId, &mReleasing.mInvalidateId,
+            &statusDesc, &invDesc);
     if (status == ResultStatus::OK) {
         mReleasing.mStatusChannel =
-                std::make_unique<BufferStatusChannel>(*fmqDesc);
+                std::make_unique<BufferStatusChannel>(*statusDesc);
+        mInvalidationListener =
+                std::make_unique<BufferInvalidationListener>(*invDesc);
         mValid = mReleasing.mStatusChannel &&
-                mReleasing.mStatusChannel->isValid();
+                mReleasing.mStatusChannel->isValid() &&
+                mInvalidationListener &&
+                mInvalidationListener->isValid();
     }
 }
 
-BufferPoolClient::Impl::Impl(const sp<IAccessor> &accessor)
+BufferPoolClient::Impl::Impl(const sp<IAccessor> &accessor, const sp<IObserver> &observer)
     : mLocal(false), mValid(false), mAccessor(accessor), mSeqId(0),
       mLastEvictCacheUs(getTimestampNow()) {
     bool valid = false;
-    sp<IObserver> observer; // TODO
     sp<IConnection>& outConnection = mRemoteConnection;
     ConnectionId& id = mConnectionId;
+    uint32_t& outMsgId = mReleasing.mInvalidateId;
     std::unique_ptr<BufferStatusChannel>& outChannel =
             mReleasing.mStatusChannel;
+    std::unique_ptr<BufferInvalidationListener>& outObserver =
+            mInvalidationListener;
     Return<void> transResult = accessor->connect(
             observer,
-            [&valid, &outConnection, &id, &outChannel]
+            [&valid, &outConnection, &id, &outMsgId, &outChannel, &outObserver]
             (ResultStatus status, sp<IConnection> connection,
-             ConnectionId connectionId, const StatusDescriptor& desc,
+             ConnectionId connectionId, uint32_t msgId,
+             const StatusDescriptor& statusDesc,
              const InvalidationDescriptor& invDesc) {
-                (void) invDesc;
                 if (status == ResultStatus::OK) {
                     outConnection = connection;
                     id = connectionId;
-                    outChannel = std::make_unique<BufferStatusChannel>(desc);
-                    if (outChannel && outChannel->isValid()) {
+                    outMsgId = msgId;
+                    outChannel = std::make_unique<BufferStatusChannel>(statusDesc);
+                    outObserver = std::make_unique<BufferInvalidationListener>(invDesc);
+                    if (outChannel && outChannel->isValid() &&
+                        outObserver && outObserver->isValid()) {
                         valid = true;
                     }
                 }
@@ -302,6 +329,24 @@
     return active;
 }
 
+void BufferPoolClient::Impl::receiveInvalidation(uint32_t messageId) {
+    std::lock_guard<std::mutex> lock(mCache.mLock);
+    syncReleased(messageId);
+    // TODO: evict cache required?
+}
+
+ResultStatus BufferPoolClient::Impl::flush() {
+    if (!mLocal || !mLocalConnection || !mValid) {
+        return ResultStatus::CRITICAL_ERROR;
+    }
+    {
+        std::unique_lock<std::mutex> lock(mCache.mLock);
+        syncReleased();
+        evictCaches();
+        return mLocalConnection->flush();
+    }
+}
+
 ResultStatus BufferPoolClient::Impl::allocate(
         const std::vector<uint8_t> &params,
         native_handle_t **pHandle,
@@ -455,6 +500,11 @@
 bool BufferPoolClient::Impl::postSend(
         BufferId bufferId, ConnectionId receiver,
         TransactionId *transactionId, int64_t *timestampUs) {
+    {
+        // TODO: don't need to call syncReleased every time
+        std::lock_guard<std::mutex> lock(mCache.mLock);
+        syncReleased();
+    }
     bool ret = false;
     bool needsSync = false;
     {
@@ -538,34 +588,74 @@
 }
 
 // should have mCache.mLock
-bool BufferPoolClient::Impl::syncReleased() {
-    std::lock_guard<std::mutex> lock(mReleasing.mLock);
-    if (mReleasing.mReleasingIds.size() > 0) {
-        mReleasing.mStatusChannel->postBufferRelease(
-                mConnectionId, mReleasing.mReleasingIds,
-                mReleasing.mReleasedIds);
-    }
-    if (mReleasing.mReleasedIds.size() > 0) {
-        for (BufferId& id: mReleasing.mReleasedIds) {
-            ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id);
-            auto found = mCache.mBuffers.find(id);
-            if (found != mCache.mBuffers.end()) {
-                if (found->second->onCacheRelease()) {
-                    mCache.decActive_l();
+bool BufferPoolClient::Impl::syncReleased(uint32_t messageId) {
+    bool cleared = false;
+    {
+        std::lock_guard<std::mutex> lock(mReleasing.mLock);
+        if (mReleasing.mReleasingIds.size() > 0) {
+            mReleasing.mStatusChannel->postBufferRelease(
+                    mConnectionId, mReleasing.mReleasingIds,
+                    mReleasing.mReleasedIds);
+        }
+        if (mReleasing.mReleasedIds.size() > 0) {
+            for (BufferId& id: mReleasing.mReleasedIds) {
+                ALOGV("client release buffer %lld - %u", (long long)mConnectionId, id);
+                auto found = mCache.mBuffers.find(id);
+                if (found != mCache.mBuffers.end()) {
+                    if (found->second->onCacheRelease()) {
+                        mCache.decActive_l();
+                    } else {
+                        // should not happen!
+                        ALOGW("client %lld cache release status inconsitent!",
+                            (long long)mConnectionId);
+                    }
                 } else {
                     // should not happen!
-                    ALOGW("client %lld cache release status inconsitent!",
-                          (long long)mConnectionId);
+                    ALOGW("client %lld cache status inconsitent!", (long long)mConnectionId);
                 }
+            }
+            mReleasing.mReleasedIds.clear();
+            cleared = true;
+        }
+    }
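+    // Drain pending invalidation messages from the buffer pool and evict the
+    // matching cache entries; a message with fromBufferId == toBufferId
+    // invalidates a single buffer, otherwise the range [from, to).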
+    std::vector<BufferInvalidationMessage> invalidations;
+    mInvalidationListener->getInvalidations(invalidations);
+    uint32_t lastMsgId = 0;
+    if (invalidations.size() > 0) {
+        for (auto it = invalidations.begin(); it != invalidations.end(); ++it) {
+            if (it->messageId != 0) {
+                lastMsgId = it->messageId;
+            }
+            if (it->fromBufferId == it->toBufferId) {
+                // TODO: handle fromBufferId = UINT32_MAX
+                invalidateBuffer(it->fromBufferId);
             } else {
-                // should not happen!
-                ALOGW("client %lld cache status inconsitent!", (long long)mConnectionId);
+                invalidateRange(it->fromBufferId, it->toBufferId);
             }
         }
-        mReleasing.mReleasedIds.clear();
-        return true;
     }
-    return false;
+    {
+        std::lock_guard<std::mutex> lock(mReleasing.mLock);
+        if (lastMsgId != 0) {
+            if (isMessageLater(lastMsgId, mReleasing.mInvalidateId)) {
+                mReleasing.mInvalidateId = lastMsgId;
+                mReleasing.mInvalidateAck = false;
+            }
+        } else if (messageId != 0) {
+            // messages are drained.
+            if (isMessageLater(messageId, mReleasing.mInvalidateId)) {
+                mReleasing.mInvalidateId = messageId;
+                mReleasing.mInvalidateAck = true;
+            }
+        }
+        if (!mReleasing.mInvalidateAck) {
+            // post ACK
+            mReleasing.mStatusChannel->postBufferInvalidateAck(
+                    mConnectionId,
+                    mReleasing.mInvalidateId, &mReleasing.mInvalidateAck);
+        }
+    }
+    return cleared;
 }
 
 // should have mCache.mLock
@@ -587,6 +677,49 @@
     }
 }
 
+// should have mCache.mLock
+void BufferPoolClient::Impl::invalidateBuffer(BufferId id) {
+    for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end(); ++it) {
+        if (id == it->second->id()) {
+            if (!it->second->hasCache()) {
+                mCache.mBuffers.erase(it);
+                ALOGV("cache invalidated %lld : buffer %u",
+                      (long long)mConnectionId, id);
+            } else {
+                ALOGW("Inconsitent invalidation %lld : activer buffer!! %u",
+                      (long long)mConnectionId, (unsigned int)id);
+            }
+            break;
+        }
+    }
+}
+
+// should have mCache.mLock
+void BufferPoolClient::Impl::invalidateRange(BufferId from, BufferId to) {
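+    // Evict cached entries whose ids fall in [from, to), handling id
+    // wrap-around; entries that still hold an active cached handle are skipped.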
+    size_t invalidated = 0;
+    for (auto it = mCache.mBuffers.begin(); it != mCache.mBuffers.end();) {
+        if (!it->second->hasCache()) {
+            BufferId bid = it->second->id();
+            if (from < to) {
+                if (from <= bid && bid < to) {
+                    ++invalidated;
+                    it = mCache.mBuffers.erase(it);
+                    continue;
+                }
+            } else {
+                if (from <= bid || bid < to) {
+                    ++invalidated;
+                    it = mCache.mBuffers.erase(it);
+                    continue;
+                }
+            }
+        }
+        ++it;
+    }
+    ALOGV("cache invalidated %lld : # of invalidated %zu",
+          (long long)mConnectionId, invalidated);
+}
+
 ResultStatus BufferPoolClient::Impl::allocateBufferHandle(
         const std::vector<uint8_t>& params, BufferId *bufferId,
         native_handle_t** handle) {
@@ -629,12 +762,14 @@
 }
 
 
-BufferPoolClient::BufferPoolClient(const sp<Accessor> &accessor) {
-    mImpl = std::make_shared<Impl>(accessor);
+BufferPoolClient::BufferPoolClient(const sp<Accessor> &accessor,
+                                   const sp<IObserver> &observer) {
+    mImpl = std::make_shared<Impl>(accessor, observer);
 }
 
-BufferPoolClient::BufferPoolClient(const sp<IAccessor> &accessor) {
-    mImpl = std::make_shared<Impl>(accessor);
+BufferPoolClient::BufferPoolClient(const sp<IAccessor> &accessor,
+                                   const sp<IObserver> &observer) {
+    mImpl = std::make_shared<Impl>(accessor, observer);
 }
 
 BufferPoolClient::~BufferPoolClient() {
@@ -672,6 +807,19 @@
     return ResultStatus::CRITICAL_ERROR;
 }
 
+void BufferPoolClient::receiveInvalidation(uint32_t msgId) {
+    if (isValid()) {
+        mImpl->receiveInvalidation(msgId);
+    }
+}
+
+ResultStatus BufferPoolClient::flush() {
+    if (isValid()) {
+        return mImpl->flush();
+    }
+    return ResultStatus::CRITICAL_ERROR;
+}
+
 ResultStatus BufferPoolClient::allocate(
         const std::vector<uint8_t> &params,
         native_handle_t **handle,
diff --git a/media/bufferpool/2.0/BufferPoolClient.h b/media/bufferpool/2.0/BufferPoolClient.h
index 1889ea3..e8d9ae6 100644
--- a/media/bufferpool/2.0/BufferPoolClient.h
+++ b/media/bufferpool/2.0/BufferPoolClient.h
@@ -49,7 +49,8 @@
      * Creates a buffer pool client from a local buffer pool
      * (via ClientManager#create).
      */
-    explicit BufferPoolClient(const sp<Accessor> &accessor);
+    explicit BufferPoolClient(const sp<Accessor> &accessor,
+                              const sp<IObserver> &observer);
 
     /**
      * Creates a buffer pool client from a remote buffer pool
@@ -57,7 +58,8 @@
      * Note: A buffer pool client created with remote buffer pool cannot
      * allocate a buffer.
      */
-    explicit BufferPoolClient(const sp<IAccessor> &accessor);
+    explicit BufferPoolClient(const sp<IAccessor> &accessor,
+                              const sp<IObserver> &observer);
 
     /** Destructs a buffer pool client. */
     ~BufferPoolClient();
@@ -73,6 +75,10 @@
 
     ResultStatus getAccessor(sp<IAccessor> *accessor);
 
+    void receiveInvalidation(uint32_t msgId);
+
+    ResultStatus flush();
+
     ResultStatus allocate(const std::vector<uint8_t> &params,
                           native_handle_t **handle,
                           std::shared_ptr<BufferPoolData> *buffer);
@@ -92,6 +98,7 @@
     std::shared_ptr<Impl> mImpl;
 
     friend struct ClientManager;
+    friend struct Observer;
 };
 
 }  // namespace implementation
diff --git a/media/bufferpool/2.0/BufferStatus.cpp b/media/bufferpool/2.0/BufferStatus.cpp
index 0d3f5a3..6937260 100644
--- a/media/bufferpool/2.0/BufferStatus.cpp
+++ b/media/bufferpool/2.0/BufferStatus.cpp
@@ -17,6 +17,7 @@
 #define LOG_TAG "BufferPoolStatus"
 //#define LOG_NDEBUG 0
 
+#include <thread>
 #include <time.h>
 #include "BufferStatus.h"
 
@@ -37,6 +38,18 @@
     return stamp;
 }
 
+bool isMessageLater(uint32_t curMsgId, uint32_t prevMsgId) {
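+    // Message ids may wrap around; curMsgId is considered later when it is
+    // ahead of prevMsgId by less than half of the unsigned 32-bit range.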
+    return curMsgId != prevMsgId && curMsgId - prevMsgId < prevMsgId - curMsgId;
+}
+
+bool isBufferInRange(BufferId from, BufferId to, BufferId bufferId) {
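+    // Buffer ids are issued sequentially and may wrap around; the range is
+    // interpreted as [from, to) with wrap-around handled below.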
+    if (from < to) {
+        return from <= bufferId && bufferId < to;
+    } else { // wrap happens
+        return from <= bufferId || bufferId < to;
+    }
+}
+
 static constexpr int kNumElementsInQueue = 1024*16;
 static constexpr int kMinElementsToSyncInQueue = 128;
 
@@ -139,6 +152,29 @@
     }
 }
 
+void BufferStatusChannel::postBufferInvalidateAck(
+        ConnectionId connectionId,
+        uint32_t invalidateId,
+        bool *invalidated) {
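+    // The invalidation ack reuses the buffer status FMQ; the bufferId field
+    // carries the invalidation message id being acknowledged.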
+    if (mValid && !*invalidated) {
+        size_t avail = mBufferStatusQueue->availableToWrite();
+        if (avail > 0) {
+            BufferStatusMessage message;
+            message.newStatus = BufferStatus::INVALIDATION_ACK;
+            message.bufferId = invalidateId;
+            message.connectionId = connectionId;
+            if (!mBufferStatusQueue->write(&message, 1)) {
+                // Since the available number of writes was already confirmed,
+                // this should not happen.
+                // TODO: error handling?
+                ALOGW("FMQ message cannot be sent from %lld", (long long)connectionId);
+                return;
+            }
+            *invalidated = true;
+        }
+    }
+}
+
 bool BufferStatusChannel::postBufferStatusMessage(
         TransactionId transactionId, BufferId bufferId,
         BufferStatus status, ConnectionId connectionId, ConnectionId targetId,
@@ -182,6 +218,83 @@
     return false;
 }
 
+BufferInvalidationListener::BufferInvalidationListener(
+        const InvalidationDescriptor &fmqDesc) {
+    std::unique_ptr<BufferInvalidationQueue> queue =
+            std::make_unique<BufferInvalidationQueue>(fmqDesc);
+    if (!queue || queue->isValid() == false) {
+        mValid = false;
+        return;
+    }
+    mValid = true;
+    mBufferInvalidationQueue = std::move(queue);
+    // drain previous messages
+    size_t avail = std::min(
+            mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue);
+    std::vector<BufferInvalidationMessage> temp(avail);
+    if (avail > 0) {
+        mBufferInvalidationQueue->read(temp.data(), avail);
+    }
+}
+
+void BufferInvalidationListener::getInvalidations(
+        std::vector<BufferInvalidationMessage> &messages) {
+    // Try twice in case of overflow.
+    // TODO: handling overflow though it may not happen.
+    for (int i = 0; i < 2; ++i) {
+        size_t avail = std::min(
+                mBufferInvalidationQueue->availableToRead(), (size_t) kNumElementsInQueue);
+        if (avail > 0) {
+            std::vector<BufferInvalidationMessage> temp(avail);
+            if (mBufferInvalidationQueue->read(temp.data(), avail)) {
+                messages.reserve(messages.size() + avail);
+                for (auto it = temp.begin(); it != temp.end(); ++it) {
+                    messages.push_back(*it);
+                }
+                break;
+            }
+        } else {
+            return;
+        }
+    }
+}
+
+bool BufferInvalidationListener::isValid() {
+    return mValid;
+}
+
+BufferInvalidationChannel::BufferInvalidationChannel()
+    : mValid(true),
+      mBufferInvalidationQueue(
+              std::make_unique<BufferInvalidationQueue>(kNumElementsInQueue, true)) {
+    if (!mBufferInvalidationQueue || mBufferInvalidationQueue->isValid() == false) {
+        mValid = false;
+    }
+}
+
+bool BufferInvalidationChannel::isValid() {
+    return mValid;
+}
+
+void BufferInvalidationChannel::getDesc(const InvalidationDescriptor **fmqDescPtr) {
+    if (mValid) {
+        *fmqDescPtr = mBufferInvalidationQueue->getDesc();
+    } else {
+        *fmqDescPtr = nullptr;
+    }
+}
+
+void BufferInvalidationChannel::postInvalidation(
+        uint32_t msgId, BufferId fromId, BufferId toId) {
+    BufferInvalidationMessage message;
+
+    message.messageId = msgId;
+    message.fromBufferId = fromId;
+    message.toBufferId = toId;
+    // TODO: handle failure (it does not happen normally.)
+    mBufferInvalidationQueue->write(&message);
+}
+
 }  // namespace implementation
 }  // namespace V2_0
 }  // namespace bufferpool
diff --git a/media/bufferpool/2.0/BufferStatus.h b/media/bufferpool/2.0/BufferStatus.h
index 777a320..fa65838 100644
--- a/media/bufferpool/2.0/BufferStatus.h
+++ b/media/bufferpool/2.0/BufferStatus.h
@@ -37,9 +37,13 @@
 /** Returns monotonic timestamp in Us since fixed point in time. */
 int64_t getTimestampNow();
 
+bool isMessageLater(uint32_t curMsgId, uint32_t prevMsgId);
+
+bool isBufferInRange(BufferId from, BufferId to, BufferId bufferId);
+
 /**
- * A collection of FMQ for a buffer pool. buffer ownership/status change
- * messages are sent via the FMQs from the clients.
+ * A collection of buffer status message FMQs for a buffer pool. Buffer
+ * ownership/status change messages are sent from the clients via these FMQs.
  */
 class BufferStatusObserver {
 private:
@@ -47,7 +51,8 @@
             mBufferStatusQueues;
 
 public:
-    /** Creates an FMQ for the specified connection(client).
+    /** Creates a buffer status message FMQ for the specified
+     * connection(client).
      *
      * @param connectionId  connection Id of the specified client.
      * @param fmqDescPtr    double ptr of created FMQ's descriptor.
@@ -58,7 +63,8 @@
      */
     ResultStatus open(ConnectionId id, const StatusDescriptor** fmqDescPtr);
 
-    /** Closes an FMQ for the specified connection(client).
+    /** Closes a buffer status message FMQ for the specified
+     * connection(client).
      *
      * @param connectionId  connection Id of the specified client.
      *
@@ -75,8 +81,8 @@
 };
 
 /**
- * An FMQ for a buffer pool client. Buffer ownership/status change messages
- * are sent via the fmq to the buffer pool.
+ * A buffer status message FMQ for a buffer pool client. Buffer ownership/status
+ * change messages are sent to the buffer pool via the FMQ.
  */
 class BufferStatusChannel {
 private:
@@ -85,7 +91,8 @@
 
 public:
     /**
-     * Connects to an FMQ from a descriptor of the created FMQ.
+     * Connects to a buffer status message FMQ from a descriptor of
+     * the created FMQ.
      *
      * @param fmqDesc   Descriptor of the created FMQ.
      */
@@ -131,6 +138,86 @@
             ConnectionId connectionId,
             ConnectionId targetId,
             std::list<BufferId> &pending, std::list<BufferId> &posted);
+
+    /**
+     * Posts a buffer invalidation ack message to the buffer pool.
+     *
+     * @param connectionId  connection Id of the client.
+     * @param invalidateId  id of the invalidation message being acked.
+     *                      if the invalidation id is zero, the ack will not be
+     *                      posted.
+     * @param invalidated   set to {@code true} only when the invalidation ack
+     *                      is posted.
+     */
+    void postBufferInvalidateAck(
+            ConnectionId connectionId,
+            uint32_t invalidateId,
+            bool *invalidated);
+};
+
+/**
+ * A buffer invalidation FMQ for a buffer pool client. Buffer invalidation
+ * messages are received from the buffer pool via the FMQ and are handled as
+ * soon as possible.
+ */
+class BufferInvalidationListener {
+private:
+    bool mValid;
+    std::unique_ptr<BufferInvalidationQueue> mBufferInvalidationQueue;
+
+public:
+    /**
+     * Connects to a buffer invalidation FMQ from a descriptor of the created FMQ.
+     *
+     * @param fmqDesc   Descriptor of the created FMQ.
+     */
+    BufferInvalidationListener(const InvalidationDescriptor &fmqDesc);
+
+    /** Retrieves all pending buffer invalidation messages from the buffer pool.
+     *
+     * @param messages  retrieved pending messages.
+     */
+    void getInvalidations(std::vector<BufferInvalidationMessage> &messages);
+
+    /** Returns whether the FMQ is connected successfully. */
+    bool isValid();
+};
+
+/**
+ * A buffer invalidation FMQ for a buffer pool. A buffer pool will send buffer
+ * invalidation messages to the clients via the FMQ. The FMQ is shared among
+ * buffer pool clients.
+ */
+class BufferInvalidationChannel {
+private:
+    bool mValid;
+    std::unique_ptr<BufferInvalidationQueue> mBufferInvalidationQueue;
+
+public:
+    /**
+     * Creates a buffer invalidation FMQ for a buffer pool.
+     */
+    BufferInvalidationChannel();
+
+    /** Returns whether the FMQ is connected successfully. */
+    bool isValid();
+
+    /**
+     * Retrieves the descriptor of a buffer invalidation FMQ. The descriptor may
+     * be passed to the client for buffer invalidation handling.
+     *
+     * @param fmqDescPtr    double ptr of created FMQ's descriptor.
+     */
+    void getDesc(const InvalidationDescriptor **fmqDescPtr);
+
+    /** Posts a buffer invalidation for invalidated buffers.
+     *
+     * @param msgId     Invalidation message id which is used when clients send
+     *                  acks back via BufferStatusMessage
+     * @param fromId    The start bufferId of the invalidated buffers (inclusive)
+     * @param toId      The end bufferId of the invalidated buffers (exclusive)
+     */
+    void postInvalidation(uint32_t msgId, BufferId fromId, BufferId toId);
 };
 
 }  // namespace implementation
diff --git a/media/bufferpool/2.0/ClientManager.cpp b/media/bufferpool/2.0/ClientManager.cpp
index eeaf093..f8531d8 100644
--- a/media/bufferpool/2.0/ClientManager.cpp
+++ b/media/bufferpool/2.0/ClientManager.cpp
@@ -23,6 +23,7 @@
 #include <unistd.h>
 #include <utils/Log.h>
 #include "BufferPoolClient.h"
+#include "Observer.h"
 
 namespace android {
 namespace hardware {
@@ -106,6 +107,8 @@
 
     ResultStatus close(ConnectionId connectionId);
 
+    ResultStatus flush(ConnectionId connectionId);
+
     ResultStatus allocate(ConnectionId connectionId,
                           const std::vector<uint8_t> &params,
                           native_handle_t **handle,
@@ -153,10 +156,13 @@
                 mClients;
     } mActive;
 
+    sp<Observer> mObserver;
+
     ClientManagerCookieHolder mRemoteClientCookies;
 };
 
-ClientManager::Impl::Impl() {}
+ClientManager::Impl::Impl()
+    : mObserver(new Observer()) {}
 
 ResultStatus ClientManager::Impl::registerSender(
         const sp<IAccessor> &accessor, ConnectionId *pConnectionId) {
@@ -185,7 +191,7 @@
             lock.unlock();
             ResultStatus result = ResultStatus::OK;
             const std::shared_ptr<BufferPoolClient> client =
-                    std::make_shared<BufferPoolClient>(accessor);
+                    std::make_shared<BufferPoolClient>(accessor, mObserver);
             lock.lock();
             if (!client) {
                 result = ResultStatus::NO_MEMORY;
@@ -197,6 +203,7 @@
                 const std::weak_ptr<BufferPoolClient> wclient = client;
                 mCache.mClients.push_back(std::make_pair(accessor, wclient));
                 ConnectionId conId = client->getConnectionId();
+                mObserver->addClient(conId, wclient);
                 {
                     std::lock_guard<std::mutex> lock(mActive.mMutex);
                     mActive.mClients.insert(std::make_pair(conId, client));
@@ -266,8 +273,9 @@
     if (!accessor || !accessor->isValid()) {
         return ResultStatus::CRITICAL_ERROR;
     }
+    // TODO: observer is local. use direct call instead of hidl call.
     std::shared_ptr<BufferPoolClient> client =
-            std::make_shared<BufferPoolClient>(accessor);
+            std::make_shared<BufferPoolClient>(accessor, mObserver);
     if (!client || !client->isValid()) {
         return ResultStatus::CRITICAL_ERROR;
     }
@@ -280,6 +288,7 @@
         const std::weak_ptr<BufferPoolClient> wclient = client;
         mCache.mClients.push_back(std::make_pair(accessor, wclient));
         ConnectionId conId = client->getConnectionId();
+        mObserver->addClient(conId, wclient);
         {
             std::lock_guard<std::mutex> lock(mActive.mMutex);
             mActive.mClients.insert(std::make_pair(conId, client));
@@ -291,12 +300,13 @@
 }
 
 ResultStatus ClientManager::Impl::close(ConnectionId connectionId) {
-    std::lock_guard<std::mutex> lock1(mCache.mMutex);
-    std::lock_guard<std::mutex> lock2(mActive.mMutex);
+    std::unique_lock<std::mutex> lock1(mCache.mMutex);
+    std::unique_lock<std::mutex> lock2(mActive.mMutex);
     auto it = mActive.mClients.find(connectionId);
     if (it != mActive.mClients.end()) {
         sp<IAccessor> accessor;
         it->second->getAccessor(&accessor);
+        std::shared_ptr<BufferPoolClient> closing = it->second;
         mActive.mClients.erase(connectionId);
         for (auto cit = mCache.mClients.begin(); cit != mCache.mClients.end();) {
             // clean up dead client caches
@@ -307,11 +317,27 @@
                 cit++;
             }
         }
+        lock2.unlock();
+        lock1.unlock();
+        closing->flush();
         return ResultStatus::OK;
     }
     return ResultStatus::NOT_FOUND;
 }
 
+ResultStatus ClientManager::Impl::flush(ConnectionId connectionId) {
+    std::shared_ptr<BufferPoolClient> client;
+    {
+        std::lock_guard<std::mutex> lock(mActive.mMutex);
+        auto it = mActive.mClients.find(connectionId);
+        if (it == mActive.mClients.end()) {
+            return ResultStatus::NOT_FOUND;
+        }
+        client = it->second;
+    }
+    return client->flush();
+}
+
 ResultStatus ClientManager::Impl::allocate(
         ConnectionId connectionId, const std::vector<uint8_t> &params,
         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
@@ -461,6 +487,13 @@
     return ResultStatus::CRITICAL_ERROR;
 }
 
+ResultStatus ClientManager::flush(ConnectionId connectionId) {
+    if (mImpl) {
+        return mImpl->flush(connectionId);
+    }
+    return ResultStatus::CRITICAL_ERROR;
+}
+
 ResultStatus ClientManager::allocate(
         ConnectionId connectionId, const std::vector<uint8_t> &params,
         native_handle_t **handle, std::shared_ptr<BufferPoolData> *buffer) {
diff --git a/media/bufferpool/2.0/Connection.cpp b/media/bufferpool/2.0/Connection.cpp
index cd837a1..6bd0e79 100644
--- a/media/bufferpool/2.0/Connection.cpp
+++ b/media/bufferpool/2.0/Connection.cpp
@@ -60,6 +60,13 @@
     }
 }
 
+ResultStatus Connection::flush() {
+    if (mInitialized && mAccessor) {
+        return mAccessor->flush();
+    }
+    return ResultStatus::CRITICAL_ERROR;
+}
+
 ResultStatus Connection::allocate(
         const std::vector<uint8_t> &params, BufferId *bufferId,
         const native_handle_t **handle) {
diff --git a/media/bufferpool/2.0/Connection.h b/media/bufferpool/2.0/Connection.h
index e2b47f1..8507749 100644
--- a/media/bufferpool/2.0/Connection.h
+++ b/media/bufferpool/2.0/Connection.h
@@ -44,6 +44,11 @@
     Return<void> fetch(uint64_t transactionId, uint32_t bufferId, fetch_cb _hidl_cb) override;
 
     /**
+     * Invalidates all buffers which are active and/or are ready to be recycled.
+     */
+    ResultStatus flush();
+
+    /**
      * Allocates a buffer using the specified parameters. Recycles a buffer if
      * it is possible. The returned buffer can be transferred to other remote
      * clients(Connection).
diff --git a/media/bufferpool/2.0/Observer.cpp b/media/bufferpool/2.0/Observer.cpp
new file mode 100644
index 0000000..5b23160
--- /dev/null
+++ b/media/bufferpool/2.0/Observer.cpp
@@ -0,0 +1,73 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include "Observer.h"
+
+namespace android {
+namespace hardware {
+namespace media {
+namespace bufferpool {
+namespace V2_0 {
+namespace implementation {
+
+Observer::Observer() {
+}
+
+Observer::~Observer() {
+}
+
+// Methods from ::android::hardware::media::bufferpool::V2_0::IObserver follow.
+Return<void> Observer::onMessage(int64_t connectionId, uint32_t msgId) {
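+    // Called by a buffer pool accessor while invalidation acks are outstanding;
+    // forward the latest invalidation message id to the corresponding client.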
+    std::unique_lock<std::mutex> lock(mLock);
+    auto it = mClients.find(connectionId);
+    if (it != mClients.end()) {
+        const std::shared_ptr<BufferPoolClient> client = it->second.lock();
+        if (!client) {
+            mClients.erase(it);
+        } else {
+            lock.unlock();
+            client->receiveInvalidation(msgId);
+        }
+    }
+    return Void();
+}
+
+void Observer::addClient(ConnectionId connectionId,
+                         const std::weak_ptr<BufferPoolClient> &wclient) {
+    std::lock_guard<std::mutex> lock(mLock);
+    for (auto it = mClients.begin(); it != mClients.end();) {
+        if (!it->second.lock() || it->first == connectionId) {
+            it = mClients.erase(it);
+        } else {
+            ++it;
+        }
+    }
+    mClients.insert(std::make_pair(connectionId, wclient));
+}
+
+void Observer::delClient(ConnectionId connectionId) {
+    std::lock_guard<std::mutex> lock(mLock);
+    mClients.erase(connectionId);
+}
+
+
+}  // namespace implementation
+}  // namespace V2_0
+}  // namespace bufferpool
+}  // namespace media
+}  // namespace hardware
+}  // namespace android
diff --git a/media/bufferpool/2.0/Observer.h b/media/bufferpool/2.0/Observer.h
new file mode 100644
index 0000000..42bd7c1
--- /dev/null
+++ b/media/bufferpool/2.0/Observer.h
@@ -0,0 +1,67 @@
+/*
+ * Copyright (C) 2018 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_OBSERVER_H
+#define ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_OBSERVER_H
+
+#include <android/hardware/media/bufferpool/2.0/IObserver.h>
+#include <hidl/MQDescriptor.h>
+#include <hidl/Status.h>
+#include "BufferPoolClient.h"
+
+namespace android {
+namespace hardware {
+namespace media {
+namespace bufferpool {
+namespace V2_0 {
+namespace implementation {
+
+using ::android::hardware::hidl_array;
+using ::android::hardware::hidl_memory;
+using ::android::hardware::hidl_string;
+using ::android::hardware::hidl_vec;
+using ::android::hardware::Return;
+using ::android::hardware::Void;
+using ::android::sp;
+
+struct Observer : public IObserver {
+    // Methods from ::android::hardware::media::bufferpool::V2_0::IObserver follow.
+    Return<void> onMessage(int64_t connectionId, uint32_t msgId) override;
+
+    ~Observer();
+
+    void addClient(ConnectionId connectionId,
+                   const std::weak_ptr<BufferPoolClient> &wclient);
+
+    void delClient(ConnectionId connectionId);
+
+private:
+    Observer();
+
+    friend struct ClientManager;
+
+    std::mutex mLock;
+    std::map<ConnectionId, const std::weak_ptr<BufferPoolClient>> mClients;
+};
+
+}  // namespace implementation
+}  // namespace V2_0
+}  // namespace bufferpool
+}  // namespace media
+}  // namespace hardware
+}  // namespace android
+
+#endif  // ANDROID_HARDWARE_MEDIA_BUFFERPOOL_V2_0_OBSERVER_H
diff --git a/media/bufferpool/2.0/include/bufferpool/ClientManager.h b/media/bufferpool/2.0/include/bufferpool/ClientManager.h
index cfc3bc3..953c304 100644
--- a/media/bufferpool/2.0/include/bufferpool/ClientManager.h
+++ b/media/bufferpool/2.0/include/bufferpool/ClientManager.h
@@ -92,6 +92,18 @@
     ResultStatus close(ConnectionId connectionId);
 
     /**
+     * Evicts cached allocations. If the connection is local, the buffer pool
+     * also releases its previously allocated free buffers and does not recycle
+     * the currently active allocations once they are returned.
+     *
+     * @param connectionId The id of the connection.
+     *
+     * @return OK when the connection is reset.
+     *         NOT_FOUND when the specified connection was not found.
+     *         CRITICAL_ERROR otherwise.
+     */
+    ResultStatus flush(ConnectionId connectionId);
+
+    /**
      * Allocates a buffer from the specified connection.
      *
      * @param connectionId  The id of the connection.