Fix bufferpool

Fix bufferpool carsh sporadically on invalidation. use android::sp
instead of android::wp for caching IObserver hidl interface.

Bug: 112203066
Change-Id: I69aafa11cc617f9ae36cce88283a3294fcab9bbd
diff --git a/media/bufferpool/2.0/Accessor.cpp b/media/bufferpool/2.0/Accessor.cpp
index f264501..57b4609 100644
--- a/media/bufferpool/2.0/Accessor.cpp
+++ b/media/bufferpool/2.0/Accessor.cpp
@@ -113,6 +113,10 @@
     return sConnectionDeathRecipient;
 }
 
+void Accessor::createInvalidator() {
+    Accessor::Impl::createInvalidator();
+}
+
 // Methods from ::android::hardware::media::bufferpool::V2_0::IAccessor follow.
 Return<void> Accessor::connect(
         const sp<::android::hardware::media::bufferpool::V2_0::IObserver>& observer,
diff --git a/media/bufferpool/2.0/Accessor.h b/media/bufferpool/2.0/Accessor.h
index 4b5b17a..8d02519 100644
--- a/media/bufferpool/2.0/Accessor.h
+++ b/media/bufferpool/2.0/Accessor.h
@@ -185,6 +185,8 @@
      */
     static sp<ConnectionDeathRecipient> getConnectionDeathRecipient();
 
+    static void createInvalidator();
+
 private:
     class Impl;
     std::shared_ptr<Impl> mImpl;
diff --git a/media/bufferpool/2.0/AccessorImpl.cpp b/media/bufferpool/2.0/AccessorImpl.cpp
index 4cc8abc..84fcca2 100644
--- a/media/bufferpool/2.0/AccessorImpl.cpp
+++ b/media/bufferpool/2.0/AccessorImpl.cpp
@@ -177,6 +177,7 @@
 
 ResultStatus Accessor::Impl::close(ConnectionId connectionId) {
     std::lock_guard<std::mutex> lock(mBufferPool.mMutex);
+    ALOGV("connection close %lld: %u", (long long)connectionId, mBufferPool.mInvalidation.mId);
     mBufferPool.processStatusMessages();
     mBufferPool.handleClose(connectionId);
     mBufferPool.mObserver.close(connectionId);
@@ -277,7 +278,7 @@
     return int(total ? 0.5 + 100. * static_cast<S>(base) / total : 0);
 }
 
-std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sSeqId(0);
+std::atomic<std::uint32_t> Accessor::Impl::BufferPool::Invalidation::sInvSeqId(0);
 
 Accessor::Impl::Impl::BufferPool::~BufferPool() {
     std::lock_guard<std::mutex> lock(mMutex);
@@ -316,8 +317,7 @@
         BufferId bufferId,
         BufferInvalidationChannel &channel) {
     for (auto it = mPendings.begin(); it != mPendings.end();) {
-        if (it->invalidate(bufferId)) {
-            it = mPendings.erase(it);
+        if (it->isInvalidated(bufferId)) {
             uint32_t msgId = 0;
             if (it->mNeedsAck) {
                 msgId = ++mInvalidationId;
@@ -327,7 +327,8 @@
                 }
             }
             channel.postInvalidation(msgId, it->mFrom, it->mTo);
-            sInvalidator.addAccessor(mId, it->mImpl);
+            sInvalidator->addAccessor(mId, it->mImpl);
+            it = mPendings.erase(it);
             continue;
         }
         ++it;
@@ -350,10 +351,12 @@
                 msgId = ++mInvalidationId;
             }
         }
+        ALOGV("bufferpool invalidation requested and queued");
         channel.postInvalidation(msgId, from, to);
-        sInvalidator.addAccessor(mId, impl);
+        sInvalidator->addAccessor(mId, impl);
     } else {
         // TODO: sending hint message?
+        ALOGV("bufferpool invalidation requested and pending");
         Pending pending(needsAck, from, to, left, impl);
         mPendings.push_back(pending);
     }
@@ -364,10 +367,14 @@
         std::set<int> deads;
         for (auto it = mAcks.begin(); it != mAcks.end(); ++it) {
             if (it->second != mInvalidationId) {
-                const sp<IObserver> observer = mObservers[it->first].promote();
+                const sp<IObserver> observer = mObservers[it->first];
                 if (observer) {
-                    observer->onMessage(it->first, mInvalidationId);
+                    ALOGV("connection %lld call observer (%u: %u)",
+                          (long long)it->first, it->second, mInvalidationId);
+                    Return<void> transResult = observer->onMessage(it->first, mInvalidationId);
+                    (void) transResult;
                 } else {
+                    ALOGV("bufferpool observer died %lld", (long long)it->first);
                     deads.insert(it->first);
                 }
             }
@@ -379,7 +386,7 @@
         }
     }
     // All invalidation Ids are synced.
-    sInvalidator.delAccessor(mId);
+    sInvalidator->delAccessor(mId);
 }
 
 bool Accessor::Impl::BufferPool::handleOwnBuffer(
@@ -542,6 +549,7 @@
                 break;
             case BufferStatus::INVALIDATION_ACK:
                 mInvalidation.onAck(message.connectionId, message.bufferId);
+                ret = true;
                 break;
         }
         if (ret == false) {
@@ -727,6 +735,7 @@
     BufferId to = mSeq;
     mStartSeq = mSeq;
     // TODO: needsAck params 
+    ALOGV("buffer invalidation request bp:%u %u %u", mInvalidation.mId, from, to);
     if (from != to) {
         invalidate(true, from, to, impl);
     }
@@ -791,6 +800,7 @@
             notify = true;
         }
         mAccessors.insert(std::make_pair(accessorId, impl));
+        ALOGV("buffer invalidation added bp:%u %d", accessorId, notify);
     }
     lock.unlock();
     if (notify) {
@@ -801,12 +811,19 @@
 void Accessor::Impl::AccessorInvalidator::delAccessor(uint32_t accessorId) {
     std::lock_guard<std::mutex> lock(mMutex);
     mAccessors.erase(accessorId);
+    ALOGV("buffer invalidation deleted bp:%u", accessorId);
     if (mAccessors.size() == 0) {
         mReady = false;
     }
 }
 
-Accessor::Impl::AccessorInvalidator Accessor::Impl::sInvalidator;
+std::unique_ptr<Accessor::Impl::AccessorInvalidator> Accessor::Impl::sInvalidator;
+
+void Accessor::Impl::createInvalidator() {
+    if (!sInvalidator) {
+        sInvalidator = std::make_unique<Accessor::Impl::AccessorInvalidator>();
+    }
+}
 
 }  // namespace implementation
 }  // namespace V2_0
diff --git a/media/bufferpool/2.0/AccessorImpl.h b/media/bufferpool/2.0/AccessorImpl.h
index 6b03494..b3faa96 100644
--- a/media/bufferpool/2.0/AccessorImpl.h
+++ b/media/bufferpool/2.0/AccessorImpl.h
@@ -34,7 +34,7 @@
 
 /**
  * An implementation of a buffer pool accessor(or a buffer pool implementation.) */
-class Accessor::Impl 
+class Accessor::Impl
     : public std::enable_shared_from_this<Accessor::Impl> {
 public:
     Impl(const std::shared_ptr<BufferPoolAllocator> &allocator);
@@ -69,6 +69,8 @@
 
     void handleInvalidateAck();
 
+    static void createInvalidator();
+
 private:
     // ConnectionId = pid : (timestamp_created + seqId)
     // in order to guarantee uniqueness for each connection
@@ -111,7 +113,7 @@
         std::set<BufferId> mFreeBuffers;
 
         struct Invalidation {
-            static std::atomic<std::uint32_t> sSeqId;
+            static std::atomic<std::uint32_t> sInvSeqId;
 
             struct Pending {
                 bool mNeedsAck;
@@ -128,18 +130,18 @@
                           mImpl(impl)
                 {}
 
-                bool invalidate(uint32_t bufferId) {
+                bool isInvalidated(uint32_t bufferId) {
                     return isBufferInRange(mFrom, mTo, bufferId) && --mLeft == 0;
                 }
             };
 
             std::list<Pending> mPendings;
             std::map<ConnectionId, uint32_t> mAcks;
-            std::map<ConnectionId, const wp<IObserver>> mObservers;
+            std::map<ConnectionId, const sp<IObserver>> mObservers;
             uint32_t mInvalidationId;
             uint32_t mId;
 
-            Invalidation() : mInvalidationId(0), mId(sSeqId.fetch_add(1)) {}
+            Invalidation() : mInvalidationId(0), mId(sInvSeqId.fetch_add(1)) {}
 
             void onConnect(ConnectionId conId, const sp<IObserver> &observer);
 
@@ -234,6 +236,8 @@
         void invalidate(bool needsAck, BufferId from, BufferId to,
                         const std::shared_ptr<Accessor::Impl> &impl);
 
+        static void createInvalidator();
+
     public:
         /** Creates a buffer pool. */
         BufferPool();
@@ -376,7 +380,7 @@
         void delAccessor(uint32_t accessorId);
     };
 
-    static AccessorInvalidator sInvalidator;
+    static std::unique_ptr<AccessorInvalidator> sInvalidator;
 
     static void invalidatorThread(
         std::map<uint32_t, const std::weak_ptr<Accessor::Impl>> &accessors,
diff --git a/media/bufferpool/2.0/BufferPoolClient.cpp b/media/bufferpool/2.0/BufferPoolClient.cpp
index c80beff..5564a13 100644
--- a/media/bufferpool/2.0/BufferPoolClient.cpp
+++ b/media/bufferpool/2.0/BufferPoolClient.cpp
@@ -644,7 +644,7 @@
         } else if (messageId != 0) {
             // messages are drained.
             if (isMessageLater(messageId, mReleasing.mInvalidateId)) {
-                mReleasing.mInvalidateId = lastMsgId;
+                mReleasing.mInvalidateId = messageId;
                 mReleasing.mInvalidateAck = true;
             }
         }
@@ -653,6 +653,9 @@
             mReleasing.mStatusChannel->postBufferInvalidateAck(
                     mConnectionId,
                     mReleasing.mInvalidateId, &mReleasing.mInvalidateAck);
+            ALOGV("client %lld invalidateion ack (%d) %u",
+                (long long)mConnectionId,
+                mReleasing.mInvalidateAck, mReleasing.mInvalidateId);
         }
     }
     return cleared;
@@ -808,6 +811,7 @@
 }
 
 void BufferPoolClient::receiveInvalidation(uint32_t msgId) {
+    ALOGV("bufferpool client recv inv %u", msgId);
     if (isValid()) {
         mImpl->receiveInvalidation(msgId);
     }
diff --git a/media/bufferpool/2.0/ClientManager.cpp b/media/bufferpool/2.0/ClientManager.cpp
index f8531d8..c31d313 100644
--- a/media/bufferpool/2.0/ClientManager.cpp
+++ b/media/bufferpool/2.0/ClientManager.cpp
@@ -24,6 +24,7 @@
 #include <utils/Log.h>
 #include "BufferPoolClient.h"
 #include "Observer.h"
+#include "Accessor.h"
 
 namespace android {
 namespace hardware {
@@ -453,6 +454,7 @@
     if (!sInstance) {
         sInstance = new ClientManager();
     }
+    Accessor::createInvalidator();
     return sInstance;
 }