CCodec: refactor pipeline logic

Bug: 123632127
Test: bug repro steps
Test: atest CtsMediaTestCases -- --module-arg CtsMediaTestCases:size:small
Test: atest CtsMediaTestCases -- --module-arg CtsMediaTestCases:include-annotation:android.media.cts.MediaHeavyPresubmitTests
Change-Id: I289f51709dbd675991cd8949cd343c5bf5c6ef5c
diff --git a/media/codec2/sfplugin/Android.bp b/media/codec2/sfplugin/Android.bp
index 2870d39..a212651 100644
--- a/media/codec2/sfplugin/Android.bp
+++ b/media/codec2/sfplugin/Android.bp
@@ -8,6 +8,7 @@
         "CCodecConfig.cpp",
         "Codec2Buffer.cpp",
         "Codec2InfoBuilder.cpp",
+        "PipelineWatcher.cpp",
         "ReflectedParamUpdater.cpp",
         "SkipCutBuffer.cpp",
     ],
diff --git a/media/codec2/sfplugin/CCodec.cpp b/media/codec2/sfplugin/CCodec.cpp
index 10263de..ed1f85b 100644
--- a/media/codec2/sfplugin/CCodec.cpp
+++ b/media/codec2/sfplugin/CCodec.cpp
@@ -448,14 +448,13 @@
 
     virtual void onWorkDone(
             const std::weak_ptr<Codec2Client::Component>& component,
-            std::list<std::unique_ptr<C2Work>>& workItems,
-            size_t numDiscardedInputBuffers) override {
+            std::list<std::unique_ptr<C2Work>>& workItems) override {
         (void)component;
         sp<CCodec> codec(mCodec.promote());
         if (!codec) {
             return;
         }
-        codec->onWorkDone(workItems, numDiscardedInputBuffers);
+        codec->onWorkDone(workItems);  // reached only when promote() yielded a live CCodec
     }
 
     virtual void onTripped(
@@ -504,10 +503,10 @@
     }
 
     virtual void onInputBufferDone(
-            const std::shared_ptr<C2Buffer>& buffer) override {
+            uint64_t frameIndex, size_t arrayIndex) override {  // (work index, buffer slot)
         sp<CCodec> codec(mCodec.promote());
         if (codec) {
-            codec->onInputBufferDone(buffer);
+            codec->onInputBufferDone(frameIndex, arrayIndex);
         }
     }
 
@@ -531,10 +530,6 @@
                 {RenderedFrameInfo(mediaTimeUs, renderTimeNs)});
     }
 
-    void onWorkQueued(bool eos) override {
-        mCodec->onWorkQueued(eos);
-    }
-
     void onOutputBuffersChanged() override {
         mCodec->mCallback->onOutputBuffersChanged();
     }
@@ -546,8 +541,7 @@
 // CCodec
 
 CCodec::CCodec()
-    : mChannel(new CCodecBufferChannel(std::make_shared<CCodecCallbackImpl>(this))),
-      mQueuedWorkCount(0) {
+    : mChannel(new CCodecBufferChannel(std::make_shared<CCodecCallbackImpl>(this))) {
 }
 
 CCodec::~CCodec() {
@@ -1343,7 +1337,6 @@
     }
 
     mChannel->flush(flushedWork);
-    subQueuedWorkCount(flushedWork.size());
 
     {
         Mutexed<State>::Locked state(mState);
@@ -1465,28 +1458,16 @@
     config->setParameters(comp, params, C2_MAY_BLOCK);
 }
 
-void CCodec::onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems,
-                        size_t numDiscardedInputBuffers) {
+void CCodec::onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems) {
     if (!workItems.empty()) {
-        {
-            Mutexed<std::list<size_t>>::Locked numDiscardedInputBuffersQueue(
-                    mNumDiscardedInputBuffersQueue);
-            numDiscardedInputBuffersQueue->insert(
-                    numDiscardedInputBuffersQueue->end(),
-                    workItems.size() - 1, 0);
-            numDiscardedInputBuffersQueue->emplace_back(
-                    numDiscardedInputBuffers);
-        }
-        {
-            Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
-            queue->splice(queue->end(), workItems);
-        }
+        Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
+        queue->splice(queue->end(), workItems);  // moves every item; workItems is left empty
     }
     (new AMessage(kWhatWorkDone, this))->post();
 }
 
-void CCodec::onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer) {
-    mChannel->onInputBufferDone(buffer);
+void CCodec::onInputBufferDone(uint64_t frameIndex, size_t arrayIndex) {
+    mChannel->onInputBufferDone(frameIndex, arrayIndex);  // pure pass-through to the channel
 }
 
 void CCodec::onMessageReceived(const sp<AMessage> &msg) {
@@ -1512,7 +1493,6 @@
         case kWhatStart: {
             // C2Component::start() should return within 500ms.
             setDeadline(now, 550ms, "start");
-            mQueuedWorkCount = 0;
             start();
             break;
         }
@@ -1520,10 +1500,6 @@
             // C2Component::stop() should return within 500ms.
             setDeadline(now, 550ms, "stop");
             stop();
-
-            mQueuedWorkCount = 0;
-            Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
-            deadline->set(TimePoint::max(), "none");
             break;
         }
         case kWhatFlush: {
@@ -1549,7 +1525,6 @@
         }
         case kWhatWorkDone: {
             std::unique_ptr<C2Work> work;
-            size_t numDiscardedInputBuffers;
             bool shouldPost = false;
             {
                 Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
@@ -1560,24 +1535,10 @@
                 queue->pop_front();
                 shouldPost = !queue->empty();
             }
-            {
-                Mutexed<std::list<size_t>>::Locked numDiscardedInputBuffersQueue(
-                        mNumDiscardedInputBuffersQueue);
-                if (numDiscardedInputBuffersQueue->empty()) {
-                    numDiscardedInputBuffers = 0;
-                } else {
-                    numDiscardedInputBuffers = numDiscardedInputBuffersQueue->front();
-                    numDiscardedInputBuffersQueue->pop_front();
-                }
-            }
             if (shouldPost) {
                 (new AMessage(kWhatWorkDone, this))->post();
             }
 
-            if (work->worklets.empty()
-                    || !(work->worklets.front()->output.flags & C2FrameData::FLAG_INCOMPLETE)) {
-                subQueuedWorkCount(1);
-            }
             // handle configuration changes in work done
             Mutexed<Config>::Locked config(mConfig);
             bool changed = false;
@@ -1641,8 +1602,7 @@
             }
             mChannel->onWorkDone(
                     std::move(work), changed ? config->mOutputFormat : nullptr,
-                    initData.hasChanged() ? initData.update().get() : nullptr,
-                    numDiscardedInputBuffers);
+                    initData.hasChanged() ? initData.update().get() : nullptr);
             break;
         }
         case kWhatWatch: {
@@ -1669,17 +1629,26 @@
 void CCodec::initiateReleaseIfStuck() {
     std::string name;
     bool pendingDeadline = false;
-    for (Mutexed<NamedTimePoint> *deadlinePtr : { &mDeadline, &mQueueDeadline, &mEosDeadline }) {
-        Mutexed<NamedTimePoint>::Locked deadline(*deadlinePtr);
+    {
+        Mutexed<NamedTimePoint>::Locked deadline(mDeadline);
         if (deadline->get() < std::chrono::steady_clock::now()) {
             name = deadline->getName();
-            break;
         }
         if (deadline->get() != TimePoint::max()) {
             pendingDeadline = true;
         }
     }
     if (name.empty()) {
+        constexpr std::chrono::steady_clock::duration kWorkDurationThreshold = 3s;
+        std::chrono::steady_clock::duration elapsed = mChannel->elapsed();
+        if (elapsed >= kWorkDurationThreshold) {
+            name = "queue";
+        }
+        if (elapsed > 0s) {
+            pendingDeadline = true;
+        }
+    }
+    if (name.empty()) {
         // We're not stuck.
         if (pendingDeadline) {
             // If we are not stuck yet but still has deadline coming up,
@@ -1694,33 +1663,6 @@
     mCallback->onError(UNKNOWN_ERROR, ACTION_CODE_FATAL);
 }
 
-void CCodec::onWorkQueued(bool eos) {
-    ALOGV("queued work count +1 from %d", mQueuedWorkCount.load());
-    int32_t count = ++mQueuedWorkCount;
-    if (eos) {
-        CCodecWatchdog::getInstance()->watch(this);
-        Mutexed<NamedTimePoint>::Locked deadline(mEosDeadline);
-        deadline->set(std::chrono::steady_clock::now() + 3s, "eos");
-    }
-    // TODO: query and use input/pipeline/output delay combined
-    if (count >= 4) {
-        CCodecWatchdog::getInstance()->watch(this);
-        Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
-        deadline->set(std::chrono::steady_clock::now() + 3s, "queue");
-    }
-}
-
-void CCodec::subQueuedWorkCount(uint32_t count) {
-    ALOGV("queued work count -%u from %d", count, mQueuedWorkCount.load());
-    int32_t currentCount = (mQueuedWorkCount -= count);
-    if (currentCount == 0) {
-        Mutexed<NamedTimePoint>::Locked deadline(mEosDeadline);
-        deadline->set(TimePoint::max(), "none");
-    }
-    Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
-    deadline->set(TimePoint::max(), "none");
-}
-
 }  // namespace android
 
 extern "C" android::CodecBase *CreateCodec() {
diff --git a/media/codec2/sfplugin/CCodec.h b/media/codec2/sfplugin/CCodec.h
index 78b009e..bb8bd19 100644
--- a/media/codec2/sfplugin/CCodec.h
+++ b/media/codec2/sfplugin/CCodec.h
@@ -66,9 +66,8 @@
     virtual void signalRequestIDRFrame() override;
 
     void initiateReleaseIfStuck();
-    void onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems,
-                    size_t numDiscardedInputBuffers);
-    void onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer);
+    void onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems);
+    void onInputBufferDone(uint64_t frameIndex, size_t arrayIndex);
 
 protected:
     virtual ~CCodec();
@@ -76,7 +75,7 @@
     virtual void onMessageReceived(const sp<AMessage> &msg) override;
 
 private:
-    typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
+    typedef std::chrono::steady_clock::time_point TimePoint;
 
     status_t tryAndReportOnError(std::function<status_t()> job);
 
@@ -100,9 +99,6 @@
             const std::chrono::milliseconds &timeout,
             const char *name);
 
-    void onWorkQueued(bool eos);
-    void subQueuedWorkCount(uint32_t count);
-
     enum {
         kWhatAllocate,
         kWhatConfigure,
@@ -167,13 +163,9 @@
     struct ClientListener;
 
     Mutexed<NamedTimePoint> mDeadline;
-    std::atomic_int32_t mQueuedWorkCount;
-    Mutexed<NamedTimePoint> mQueueDeadline;
-    Mutexed<NamedTimePoint> mEosDeadline;
     typedef CCodecConfig Config;
     Mutexed<Config> mConfig;
     Mutexed<std::list<std::unique_ptr<C2Work>>> mWorkDoneQueue;
-    Mutexed<std::list<size_t>> mNumDiscardedInputBuffersQueue;
 
     friend class CCodecCallbackImpl;
 
diff --git a/media/codec2/sfplugin/CCodecBufferChannel.cpp b/media/codec2/sfplugin/CCodecBufferChannel.cpp
index 8e6a3f8..587f83c 100644
--- a/media/codec2/sfplugin/CCodecBufferChannel.cpp
+++ b/media/codec2/sfplugin/CCodecBufferChannel.cpp
@@ -152,6 +152,11 @@
      */
     virtual std::unique_ptr<InputBuffers> toArrayMode(size_t size) = 0;
 
+    /**
+     * Return number of buffers the client owns.
+     */
+    virtual size_t numClientBuffers() const = 0;
+
 protected:
     // Pool to obtain blocks for input buffers.
     std::shared_ptr<C2BlockPool> mPool;
@@ -508,6 +513,14 @@
         mBuffers.clear();
     }
 
+    size_t numClientBuffers() const {  // number of entries with a non-null clientBuffer
+        return std::count_if(
+                mBuffers.begin(), mBuffers.end(),
+                [](const Entry &entry) {
+                    return (entry.clientBuffer != nullptr);
+                });
+    }
+
 private:
     friend class BuffersArrayImpl;
 
@@ -693,6 +706,14 @@
         }
     }
 
+    size_t numClientBuffers() const {  // number of entries whose ownedByClient flag is set
+        return std::count_if(
+                mBuffers.begin(), mBuffers.end(),
+                [](const Entry &entry) {
+                    return entry.ownedByClient;
+                });
+    }
+
 private:
     std::string mImplName; ///< name for debugging
     const char *mName; ///< C-string version of name
@@ -756,6 +777,10 @@
         mImpl.flush();
     }
 
+    size_t numClientBuffers() const final {
+        return mImpl.numClientBuffers();  // the count is computed by mImpl
+    }
+
 private:
     BuffersArrayImpl mImpl;
 };
@@ -823,6 +848,10 @@
         return std::move(array);
     }
 
+    size_t numClientBuffers() const final {
+        return mImpl.numClientBuffers();  // the count is computed by mImpl
+    }
+
     virtual sp<Codec2Buffer> alloc(size_t size) {
         C2MemoryUsage usage = { C2MemoryUsage::CPU_READ, C2MemoryUsage::CPU_WRITE };
         std::shared_ptr<C2LinearBlock> block;
@@ -967,6 +996,10 @@
         return std::move(array);
     }
 
+    size_t numClientBuffers() const final {
+        return mImpl.numClientBuffers();  // the count is computed by mImpl
+    }
+
 private:
     FlexBuffersImpl mImpl;
     std::shared_ptr<C2AllocatorStore> mStore;
@@ -1030,6 +1063,10 @@
         return std::move(array);
     }
 
+    size_t numClientBuffers() const final {
+        return mImpl.numClientBuffers();  // the count is computed by mImpl
+    }
+
 private:
     FlexBuffersImpl mImpl;
     std::shared_ptr<LocalBufferPool> mLocalBufferPool;
@@ -1065,6 +1102,10 @@
     void getArray(Vector<sp<MediaCodecBuffer>> *array) const final {
         array->clear();
     }
+
+    size_t numClientBuffers() const final {
+        return 0u;  // this implementation always reports zero client-owned buffers
+    }
 };
 
 class OutputBuffersArray : public CCodecBufferChannel::OutputBuffers {
@@ -1422,90 +1463,6 @@
     count->value = -1;
 }
 
-// CCodecBufferChannel::PipelineCapacity
-
-CCodecBufferChannel::PipelineCapacity::PipelineCapacity()
-      : input(0), component(0),
-        mName("<UNKNOWN COMPONENT>") {
-}
-
-void CCodecBufferChannel::PipelineCapacity::initialize(
-        int newInput,
-        int newComponent,
-        const char* newName,
-        const char* callerTag) {
-    input.store(newInput, std::memory_order_relaxed);
-    component.store(newComponent, std::memory_order_relaxed);
-    mName = newName;
-    ALOGV("[%s] %s -- PipelineCapacity::initialize(): "
-          "pipeline availability initialized ==> "
-          "input = %d, component = %d",
-            mName, callerTag ? callerTag : "*",
-            newInput, newComponent);
-}
-
-bool CCodecBufferChannel::PipelineCapacity::allocate(const char* callerTag) {
-    int prevInput = input.fetch_sub(1, std::memory_order_relaxed);
-    int prevComponent = component.fetch_sub(1, std::memory_order_relaxed);
-    if (prevInput > 0 && prevComponent > 0) {
-        ALOGV("[%s] %s -- PipelineCapacity::allocate() returns true: "
-              "pipeline availability -1 all ==> "
-              "input = %d, component = %d",
-                mName, callerTag ? callerTag : "*",
-                prevInput - 1,
-                prevComponent - 1);
-        return true;
-    }
-    input.fetch_add(1, std::memory_order_relaxed);
-    component.fetch_add(1, std::memory_order_relaxed);
-    ALOGV("[%s] %s -- PipelineCapacity::allocate() returns false: "
-          "pipeline availability unchanged ==> "
-          "input = %d, component = %d",
-            mName, callerTag ? callerTag : "*",
-            prevInput,
-            prevComponent);
-    return false;
-}
-
-void CCodecBufferChannel::PipelineCapacity::free(const char* callerTag) {
-    int prevInput = input.fetch_add(1, std::memory_order_relaxed);
-    int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
-    ALOGV("[%s] %s -- PipelineCapacity::free(): "
-          "pipeline availability +1 all ==> "
-          "input = %d, component = %d",
-            mName, callerTag ? callerTag : "*",
-            prevInput + 1,
-            prevComponent + 1);
-}
-
-int CCodecBufferChannel::PipelineCapacity::freeInputSlots(
-        size_t numDiscardedInputBuffers,
-        const char* callerTag) {
-    int prevInput = input.fetch_add(numDiscardedInputBuffers,
-                                    std::memory_order_relaxed);
-    ALOGV("[%s] %s -- PipelineCapacity::freeInputSlots(%zu): "
-          "pipeline availability +%zu input ==> "
-          "input = %d, component = %d",
-            mName, callerTag ? callerTag : "*",
-            numDiscardedInputBuffers,
-            numDiscardedInputBuffers,
-            prevInput + static_cast<int>(numDiscardedInputBuffers),
-            component.load(std::memory_order_relaxed));
-    return prevInput + static_cast<int>(numDiscardedInputBuffers);
-}
-
-int CCodecBufferChannel::PipelineCapacity::freeComponentSlot(
-        const char* callerTag) {
-    int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
-    ALOGV("[%s] %s -- PipelineCapacity::freeComponentSlot(): "
-          "pipeline availability +1 component ==> "
-          "input = %d, component = %d",
-            mName, callerTag ? callerTag : "*",
-            input.load(std::memory_order_relaxed),
-            prevComponent + 1);
-    return prevComponent + 1;
-}
-
 // CCodecBufferChannel::ReorderStash
 
 CCodecBufferChannel::ReorderStash::ReorderStash() {
@@ -1595,7 +1552,6 @@
       mFrameIndex(0u),
       mFirstValidFrameIndex(0u),
       mMetaMode(MODE_NONE),
-      mAvailablePipelineCapacity(),
       mInputMetEos(false) {
     Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
     buffers->reset(new DummyInputBuffers(""));
@@ -1658,6 +1614,9 @@
     work->input.ordinal.customOrdinal = timeUs;
     work->input.buffers.clear();
 
+    uint64_t queuedFrameIndex = work->input.ordinal.frameIndex.peeku();
+    std::vector<std::shared_ptr<C2Buffer>> queuedBuffers;
+
     if (buffer->size() > 0u) {
         Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
         std::shared_ptr<C2Buffer> c2buffer;
@@ -1665,11 +1624,9 @@
             return -ENOENT;
         }
         work->input.buffers.push_back(c2buffer);
-    } else {
-        mAvailablePipelineCapacity.freeInputSlots(1, "queueInputBufferInternal");
-        if (eos) {
-            flags |= C2FrameData::FLAG_END_OF_STREAM;
-        }
+        queuedBuffers.push_back(c2buffer);
+    } else if (eos) {
+        flags |= C2FrameData::FLAG_END_OF_STREAM;
     }
     work->input.flags = (C2FrameData::flags_t)flags;
     // TODO: fill info's
@@ -1680,10 +1637,16 @@
 
     std::list<std::unique_ptr<C2Work>> items;
     items.push_back(std::move(work));
+    mPipelineWatcher.lock()->onWorkQueued(
+            queuedFrameIndex,
+            std::move(queuedBuffers),
+            PipelineWatcher::Clock::now());
     c2_status_t err = mComponent->queue(&items);
+    if (err != C2_OK) {
+        mPipelineWatcher.lock()->onWorkDone(queuedFrameIndex);
+    }
 
     if (err == C2_OK && eos && buffer->size() > 0u) {
-        mCCodecCallback->onWorkQueued(false);
         work.reset(new C2Work);
         work->input.ordinal.timestamp = timeUs;
         work->input.ordinal.frameIndex = mFrameIndex++;
@@ -1693,13 +1656,22 @@
         work->input.flags = C2FrameData::FLAG_END_OF_STREAM;
         work->worklets.emplace_back(new C2Worklet);
 
+        queuedFrameIndex = work->input.ordinal.frameIndex.peeku();
+        queuedBuffers.clear();
+
         items.clear();
         items.push_back(std::move(work));
+
+        mPipelineWatcher.lock()->onWorkQueued(
+                queuedFrameIndex,
+                std::move(queuedBuffers),
+                PipelineWatcher::Clock::now());
         err = mComponent->queue(&items);
+        if (err != C2_OK) {
+            mPipelineWatcher.lock()->onWorkDone(queuedFrameIndex);
+        }
     }
     if (err == C2_OK) {
-        mCCodecCallback->onWorkQueued(eos);
-
         Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
         bool released = (*buffers)->releaseBuffer(buffer, nullptr, true);
         ALOGV("[%s] queueInputBuffer: buffer %sreleased", mName, released ? "" : "not ");
@@ -1846,14 +1818,16 @@
 void CCodecBufferChannel::feedInputBufferIfAvailableInternal() {
     while (!mInputMetEos &&
            !mReorderStash.lock()->hasPending() &&
-           mAvailablePipelineCapacity.allocate("feedInputBufferIfAvailable")) {
+           !mPipelineWatcher.lock()->pipelineFull()) {
         sp<MediaCodecBuffer> inBuffer;
         size_t index;
         {
             Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
+            if ((*buffers)->numClientBuffers() >= mNumInputSlots) {
+                return;
+            }
             if (!(*buffers)->requestNewBuffer(&index, &inBuffer)) {
                 ALOGV("[%s] no new buffer available", mName);
-                mAvailablePipelineCapacity.free("feedInputBufferIfAvailable");
                 break;
             }
         }
@@ -2032,15 +2006,12 @@
     {
         Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
         if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr, true)) {
-            buffers.unlock();
             released = true;
-            mAvailablePipelineCapacity.freeInputSlots(1, "discardBuffer");
         }
     }
     {
         Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
         if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr)) {
-            buffers.unlock();
             released = true;
         }
     }
@@ -2408,10 +2379,14 @@
     // about buffers from the previous generation do not interfere with the
     // newly initialized pipeline capacity.
 
-    mAvailablePipelineCapacity.initialize(
-            mNumInputSlots,
-            mNumInputSlots + mNumOutputSlots,
-            mName);
+    {
+        Mutexed<PipelineWatcher>::Locked watcher(mPipelineWatcher);
+        watcher->inputDelay(inputDelay ? inputDelay.value : 0)
+                .pipelineDelay(pipelineDelay ? pipelineDelay.value : 0)
+                .outputDelay(outputDelay ? outputDelay.value : 0)
+                .smoothnessFactor(kSmoothnessFactor);
+        watcher->flush();
+    }
 
     mInputMetEos = false;
     mSync.start();
@@ -2472,21 +2447,16 @@
                 buffer->meta()->setInt64("timeUs", 0);
                 post = false;
             }
-            if (mAvailablePipelineCapacity.allocate("requestInitialInputBuffers")) {
-                if (post) {
-                    mCallback->onInputBufferAvailable(index, buffer);
-                } else {
-                    toBeQueued.emplace_back(buffer);
-                }
+            if (post) {
+                mCallback->onInputBufferAvailable(index, buffer);
             } else {
-                ALOGD("[%s] pipeline is full while requesting %zu-th input buffer",
-                        mName, i);
+                toBeQueued.emplace_back(buffer);
             }
         }
     }
     for (const sp<MediaCodecBuffer> &buffer : toBeQueued) {
         if (queueInputBufferInternal(buffer) != OK) {
-            mAvailablePipelineCapacity.freeComponentSlot("requestInitialInputBuffers");
+            ALOGV("[%s] Error while queueing initial buffers", mName);
         }
     }
     return OK;
@@ -2532,28 +2502,25 @@
         (*buffers)->flush(flushedWork);
     }
     mReorderStash.lock()->flush();
+    mPipelineWatcher.lock()->flush();
 }
 
 void CCodecBufferChannel::onWorkDone(
         std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
-        const C2StreamInitDataInfo::output *initData,
-        size_t numDiscardedInputBuffers) {
+        const C2StreamInitDataInfo::output *initData) {  // input is re-fed only if handleWork() is true
     if (handleWork(std::move(work), outputFormat, initData)) {
-        mAvailablePipelineCapacity.freeInputSlots(numDiscardedInputBuffers,
-                                                  "onWorkDone");
         feedInputBufferIfAvailable();
     }
 }
 
 void CCodecBufferChannel::onInputBufferDone(
-        const std::shared_ptr<C2Buffer>& buffer) {
+        uint64_t frameIndex, size_t arrayIndex) {
+    std::shared_ptr<C2Buffer> buffer =
+            mPipelineWatcher.lock()->onInputBufferReleased(frameIndex, arrayIndex);
     bool newInputSlotAvailable;
     {
         Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
         newInputSlotAvailable = (*buffers)->expireComponentBuffer(buffer);
-        if (newInputSlotAvailable) {
-            mAvailablePipelineCapacity.freeInputSlots(1, "onInputBufferDone");
-        }
     }
     if (newInputSlotAvailable) {
         feedInputBufferIfAvailable();
@@ -2573,7 +2540,7 @@
     if (work->worklets.size() != 1u
             || !work->worklets.front()
             || !(work->worklets.front()->output.flags & C2FrameData::FLAG_INCOMPLETE)) {
-        mAvailablePipelineCapacity.freeComponentSlot("handleWork");
+        mPipelineWatcher.lock()->onWorkDone(work->input.ordinal.frameIndex.peeku());
     }
 
     if (work->result == C2_NOT_FOUND) {
@@ -2832,6 +2799,10 @@
     return OK;
 }
 
+PipelineWatcher::Clock::duration CCodecBufferChannel::elapsed() {  // watcher's elapsed() as of now
+    return mPipelineWatcher.lock()->elapsed(PipelineWatcher::Clock::now());
+}
+
 void CCodecBufferChannel::setMetaMode(MetaMode mode) {
     mMetaMode = mode;
 }
diff --git a/media/codec2/sfplugin/CCodecBufferChannel.h b/media/codec2/sfplugin/CCodecBufferChannel.h
index ebc1491..9dccab8 100644
--- a/media/codec2/sfplugin/CCodecBufferChannel.h
+++ b/media/codec2/sfplugin/CCodecBufferChannel.h
@@ -34,6 +34,7 @@
 #include <media/ICrypto.h>
 
 #include "InputSurfaceWrapper.h"
+#include "PipelineWatcher.h"
 
 namespace android {
 
@@ -44,7 +45,6 @@
     virtual ~CCodecCallback() = default;
     virtual void onError(status_t err, enum ActionCode actionCode) = 0;
     virtual void onOutputFramesRendered(int64_t mediaTimeUs, nsecs_t renderTimeNs) = 0;
-    virtual void onWorkQueued(bool eos) = 0;
     virtual void onOutputBuffersChanged() = 0;
 };
 
@@ -128,22 +128,21 @@
      * @param workItems   finished work item.
      * @param outputFormat new output format if it has changed, otherwise nullptr
      * @param initData    new init data (CSD) if it has changed, otherwise nullptr
-     * @param numDiscardedInputBuffers the number of input buffers that are
-     *                    returned for the first time (not previously returned by
-     *                    onInputBufferDone()).
      */
     void onWorkDone(
             std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
-            const C2StreamInitDataInfo::output *initData,
-            size_t numDiscardedInputBuffers);
+            const C2StreamInitDataInfo::output *initData);
 
     /**
      * Make an input buffer available for the client as it is no longer needed
      * by the codec.
      *
-     * @param buffer The buffer that becomes unused.
+     * @param frameIndex The index of input work
+     * @param arrayIndex The index of buffer in the input work buffers.
      */
-    void onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer);
+    void onInputBufferDone(uint64_t frameIndex, size_t arrayIndex);
+
+    PipelineWatcher::Clock::duration elapsed();
 
     enum MetaMode {
         MODE_NONE,
@@ -266,79 +265,7 @@
 
     MetaMode mMetaMode;
 
-    // PipelineCapacity is used in the input buffer gating logic.
-    //
-    // There are three criteria that need to be met before
-    // onInputBufferAvailable() is called:
-    // 1. The number of input buffers that have been received by
-    //    CCodecBufferChannel but not returned via onWorkDone() or
-    //    onInputBufferDone() does not exceed a certain limit. (Let us call this
-    //    number the "input" capacity.)
-    // 2. The number of work items that have been received by
-    //    CCodecBufferChannel whose outputs have not been returned from the
-    //    component (by calling onWorkDone()) does not exceed a certain limit.
-    //    (Let us call this the "component" capacity.)
-    //
-    // These three criteria guarantee that a new input buffer that arrives from
-    // the invocation of onInputBufferAvailable() will not
-    // 1. overload CCodecBufferChannel's input buffers;
-    // 2. overload the component; or
-    //
-    struct PipelineCapacity {
-        // The number of available input capacity.
-        std::atomic_int input;
-        // The number of available component capacity.
-        std::atomic_int component;
-
-        PipelineCapacity();
-        // Set the values of #input and #component.
-        void initialize(int newInput, int newComponent,
-                        const char* newName = "<UNKNOWN COMPONENT>",
-                        const char* callerTag = nullptr);
-
-        // Return true and decrease #input and #component by one if
-        // they are all greater than zero; return false otherwise.
-        //
-        // callerTag is used for logging only.
-        //
-        // allocate() is called by CCodecBufferChannel to check whether it can
-        // receive another input buffer. If the return value is true,
-        // onInputBufferAvailable() and onOutputBufferAvailable() can be called
-        // afterwards.
-        bool allocate(const char* callerTag = nullptr);
-
-        // Increase #input and #component by one.
-        //
-        // callerTag is used for logging only.
-        //
-        // free() is called by CCodecBufferChannel after allocate() returns true
-        // but onInputBufferAvailable() cannot be called for any reasons. It
-        // essentially undoes an allocate() call.
-        void free(const char* callerTag = nullptr);
-
-        // Increase #input by @p numDiscardedInputBuffers.
-        //
-        // callerTag is used for logging only.
-        //
-        // freeInputSlots() is called by CCodecBufferChannel when onWorkDone()
-        // or onInputBufferDone() is called. @p numDiscardedInputBuffers is
-        // provided in onWorkDone(), and is 1 in onInputBufferDone().
-        int freeInputSlots(size_t numDiscardedInputBuffers,
-                           const char* callerTag = nullptr);
-
-        // Increase #component by one and return the updated value.
-        //
-        // callerTag is used for logging only.
-        //
-        // freeComponentSlot() is called by CCodecBufferChannel when
-        // onWorkDone() is called.
-        int freeComponentSlot(const char* callerTag = nullptr);
-
-    private:
-        // Component name. Used for logging.
-        const char* mName;
-    };
-    PipelineCapacity mAvailablePipelineCapacity;
+    Mutexed<PipelineWatcher> mPipelineWatcher;
 
     class ReorderStash {
     public:
diff --git a/media/codec2/sfplugin/PipelineWatcher.cpp b/media/codec2/sfplugin/PipelineWatcher.cpp
new file mode 100644
index 0000000..fe0a2c8
--- /dev/null
+++ b/media/codec2/sfplugin/PipelineWatcher.cpp
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#define LOG_NDEBUG 0
+#define LOG_TAG "PipelineWatcher"
+
+#include <numeric>
+
+#include <log/log.h>
+
+#include "PipelineWatcher.h"
+
+namespace android {
+
+PipelineWatcher &PipelineWatcher::inputDelay(uint32_t value) {
+    this->mInputDelay = value;  // only part of pipelineFull()'s limit on all tracked frames
+    return *this;               // returned by reference so that setter calls can be chained
+}
+
+PipelineWatcher &PipelineWatcher::pipelineDelay(uint32_t value) {
+    this->mPipelineDelay = value;  // part of both limits checked by pipelineFull()
+    return *this;                  // returned by reference so that setter calls can be chained
+}
+
+PipelineWatcher &PipelineWatcher::outputDelay(uint32_t value) {
+    this->mOutputDelay = value;  // part of both limits checked by pipelineFull()
+    return *this;                // returned by reference so that setter calls can be chained
+}
+
+PipelineWatcher &PipelineWatcher::smoothnessFactor(uint32_t value) {
+    this->mSmoothnessFactor = value;  // part of both limits checked by pipelineFull()
+    return *this;                     // returned by reference so that setter calls can be chained
+}
+
+// Starts tracking a queued work item: keeps its input buffers and queue time under
+// frameIndex, replacing whatever entry was already stored for that index.
+void PipelineWatcher::onWorkQueued(
+        uint64_t frameIndex,
+        std::vector<std::shared_ptr<C2Buffer>> &&buffers,
+        const Clock::time_point &queuedAt) {
+    const unsigned long long index = frameIndex;
+    ALOGV("onWorkQueued(frameIndex=%llu, buffers(size=%zu), queuedAt=%lld)",
+          index, buffers.size(), (long long)queuedAt.time_since_epoch().count());
+    const auto stale = mFramesInPipeline.find(frameIndex);
+    if (stale != mFramesInPipeline.end()) {
+        ALOGD("onWorkQueued: Duplicate frame index (%llu); previous entry removed", index);
+        (void)mFramesInPipeline.erase(stale);
+    }
+    (void)mFramesInPipeline.try_emplace(frameIndex, std::move(buffers), queuedAt);
+}
+
+// Returns the buffer tracked at (frameIndex, arrayIndex) and nulls that slot; nullptr if absent.
+std::shared_ptr<C2Buffer> PipelineWatcher::onInputBufferReleased(
+        uint64_t frameIndex, size_t arrayIndex) {
+    const unsigned long long index = frameIndex;
+    ALOGV("onInputBufferReleased(frameIndex=%llu, arrayIndex=%zu)", index, arrayIndex);
+    const auto found = mFramesInPipeline.find(frameIndex);
+    if (found == mFramesInPipeline.end()) {
+        ALOGD("onInputBufferReleased: frameIndex not found (%llu); ignored", index);
+        return nullptr;
+    }
+    if (arrayIndex >= found->second.buffers.size()) {
+        ALOGD("onInputBufferReleased: buffers at %llu: size %zu, requested index: %zu",
+              index, found->second.buffers.size(), arrayIndex);
+        return nullptr;
+    }
+    std::shared_ptr<C2Buffer> released(std::move(found->second.buffers[arrayIndex]));
+    ALOGD_IF(!released, "onInputBufferReleased: buffer already released (%llu:%zu)",
+             index, arrayIndex);
+    return released;
+}
+
+// Removes frameIndex from the tracked frames. An index that is not being tracked
+// is only reported in the log.
+void PipelineWatcher::onWorkDone(uint64_t frameIndex) {
+    const unsigned long long index = frameIndex;
+    ALOGV("onWorkDone(frameIndex=%llu)", index);
+    // erase() by key reports how many entries it removed; zero means unknown frame.
+    if (mFramesInPipeline.erase(frameIndex) == 0) {
+        ALOGD("onWorkDone: frameIndex not found (%llu); ignored", index);
+    }
+}
+
+void PipelineWatcher::flush() {
+    this->mFramesInPipeline.clear();  // drops every tracked frame and its buffer references
+}
+
+// Returns true when either limit is reached: the number of tracked frames is at
+// least input + pipeline + output delay + smoothness factor, or the frames whose
+// inputs were all released number at least that sum without the input delay.
+bool PipelineWatcher::pipelineFull() const {
+    const size_t total = mFramesInPipeline.size();
+    if (total >= mInputDelay + mPipelineDelay + mOutputDelay + mSmoothnessFactor) {
+        ALOGV("pipelineFull: too many frames in pipeline (%zu)", total);
+        return true;
+    }
+    // A frame counts as "input released" when none of its buffer slots is non-null.
+    size_t sizeWithInputReleased = 0;
+    for (const auto &entry : mFramesInPipeline) {
+        bool holdsInput = false;
+        for (const std::shared_ptr<C2Buffer> &slot : entry.second.buffers) {
+            holdsInput = holdsInput || static_cast<bool>(slot);
+        }
+        sizeWithInputReleased += holdsInput ? 0 : 1;
+    }
+    if (sizeWithInputReleased >= mPipelineDelay + mOutputDelay + mSmoothnessFactor) {
+        ALOGV("pipelineFull: too many frames in pipeline, with input released (%zu)",
+              sizeWithInputReleased);
+        return true;
+    }
+    ALOGV("pipeline has room (total: %zu, input released: %zu)",
+          total, sizeWithInputReleased);
+    return false;
+}
+
+// Returns the largest (now - queuedAt) among the tracked frames, never less than
+// zero; zero is also the result when no frame is tracked.
+PipelineWatcher::Clock::duration PipelineWatcher::elapsed(
+        const PipelineWatcher::Clock::time_point &now) const {
+    Clock::duration longest = Clock::duration::zero();
+    for (const auto &entry : mFramesInPipeline) {
+        const Clock::duration age = now - entry.second.queuedAt;
+        // duration::rep is implementation-defined, so cast explicitly to match %lld.
+        ALOGV("elapsed: frameIndex = %llu elapsed = %lldms",
+              (unsigned long long)entry.first,
+              (long long)std::chrono::duration_cast<std::chrono::milliseconds>(age).count());
+        longest = longest > age ? longest : age;
+    }
+    return longest;
+}
+
+}  // namespace android
diff --git a/media/codec2/sfplugin/PipelineWatcher.h b/media/codec2/sfplugin/PipelineWatcher.h
new file mode 100644
index 0000000..ce82298
--- /dev/null
+++ b/media/codec2/sfplugin/PipelineWatcher.h
@@ -0,0 +1,80 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PIPELINE_WATCHER_H_
+#define PIPELINE_WATCHER_H_
+
+#include <chrono>
+#include <map>
+#include <memory>
+#include <utility>
+#include <vector>
+
+#include <C2Work.h>
+
+namespace android {
+
+/**
+ * PipelineWatcher tracks queued work per frame index and reports whether the pipeline is full.
+ */
+class PipelineWatcher {
+public:
+    using Clock = std::chrono::steady_clock;
+
+    PipelineWatcher() = default;
+    ~PipelineWatcher() = default;
+
+    // Chainable setters for the limits checked by pipelineFull(); every limit starts at 0.
+    PipelineWatcher &inputDelay(uint32_t value);
+    PipelineWatcher &pipelineDelay(uint32_t value);
+    PipelineWatcher &outputDelay(uint32_t value);
+    PipelineWatcher &smoothnessFactor(uint32_t value);
+
+    // Starts tracking frameIndex; the contents of |buffers| are moved into the watcher.
+    void onWorkQueued(
+            uint64_t frameIndex,
+            std::vector<std::shared_ptr<C2Buffer>> &&buffers,
+            const Clock::time_point &queuedAt);
+    // Returns the tracked buffer at (frameIndex, arrayIndex), or nullptr if there is none.
+    std::shared_ptr<C2Buffer> onInputBufferReleased(
+            uint64_t frameIndex, size_t arrayIndex);
+    void onWorkDone(uint64_t frameIndex);  // stops tracking frameIndex
+    void flush();                          // stops tracking every frame
+
+    bool pipelineFull() const;  // true once the tracked frames reach the configured limits
+    Clock::duration elapsed(const Clock::time_point &now) const;  // longest time since queueing
+
+private:
+    uint32_t mInputDelay = 0;
+    uint32_t mPipelineDelay = 0;
+    uint32_t mOutputDelay = 0;
+    uint32_t mSmoothnessFactor = 0;
+
+    struct Frame {
+        // |b| is a named rvalue reference: without std::move() the vector would be copied.
+        Frame(std::vector<std::shared_ptr<C2Buffer>> &&b,
+              const Clock::time_point &q)
+            : buffers(std::move(b)),
+              queuedAt(q) {}
+        std::vector<std::shared_ptr<C2Buffer>> buffers;
+        const Clock::time_point queuedAt;
+    };
+    std::map<uint64_t, Frame> mFramesInPipeline;
+};
+
+}  // namespace android
+
+#endif  // PIPELINE_WATCHER_H_