CCodec: refactor pipeline logic

Bug: 123632127
Test: bug repro steps
Test: atest CtsMediaTestCases -- --module-arg CtsMediaTestCases:size:small
Test: atest CtsMediaTestCases -- --module-arg CtsMediaTestCases:include-annotation:android.media.cts.MediaHeavyPresubmitTests
Change-Id: I289f51709dbd675991cd8949cd343c5bf5c6ef5c
diff --git a/media/codec2/components/aac/C2SoftAacDec.cpp b/media/codec2/components/aac/C2SoftAacDec.cpp
index c7c8442..04dda8f 100644
--- a/media/codec2/components/aac/C2SoftAacDec.cpp
+++ b/media/codec2/components/aac/C2SoftAacDec.cpp
@@ -52,33 +52,26 @@
 
 namespace android {
 
-class C2SoftAacDec::IntfImpl : public C2InterfaceHelper {
+constexpr char COMPONENT_NAME[] = "c2.android.aac.decoder";
+
+class C2SoftAacDec::IntfImpl : public SimpleInterface<void>::BaseParams {
 public:
     explicit IntfImpl(const std::shared_ptr<C2ReflectorHelper> &helper)
-        : C2InterfaceHelper(helper) {
-
-        setDerivedInstance(this);
+        : SimpleInterface<void>::BaseParams(
+                helper,
+                COMPONENT_NAME,
+                C2Component::KIND_DECODER,
+                C2Component::DOMAIN_AUDIO,
+                MEDIA_MIMETYPE_AUDIO_AAC) {
+        noPrivateBuffers();
+        noInputReferences();
+        noOutputReferences();
+        noInputLatency();
+        noTimeStretch();
 
         addParameter(
-                DefineParam(mInputFormat, C2_NAME_INPUT_STREAM_FORMAT_SETTING)
-                .withConstValue(new C2StreamFormatConfig::input(0u, C2FormatCompressed))
-                .build());
-
-        addParameter(
-                DefineParam(mOutputFormat, C2_NAME_OUTPUT_STREAM_FORMAT_SETTING)
-                .withConstValue(new C2StreamFormatConfig::output(0u, C2FormatAudio))
-                .build());
-
-        addParameter(
-                DefineParam(mInputMediaType, C2_NAME_INPUT_PORT_MIME_SETTING)
-                .withConstValue(AllocSharedString<C2PortMimeConfig::input>(
-                        MEDIA_MIMETYPE_AUDIO_AAC))
-                .build());
-
-        addParameter(
-                DefineParam(mOutputMediaType, C2_NAME_OUTPUT_PORT_MIME_SETTING)
-                .withConstValue(AllocSharedString<C2PortMimeConfig::output>(
-                        MEDIA_MIMETYPE_AUDIO_RAW))
+                DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
+                .withConstValue(new C2PortActualDelayTuning::output(2u))
                 .build());
 
         addParameter(
@@ -231,8 +224,6 @@
     // TODO Add : C2StreamAacSbrModeTuning
 };
 
-constexpr char COMPONENT_NAME[] = "c2.android.aac.decoder";
-
 C2SoftAacDec::C2SoftAacDec(
         const char *name,
         c2_node_id_t id,
diff --git a/media/codec2/components/avc/C2SoftAvcDec.cpp b/media/codec2/components/avc/C2SoftAvcDec.cpp
index 3e62744..86cd3d8 100644
--- a/media/codec2/components/avc/C2SoftAvcDec.cpp
+++ b/media/codec2/components/avc/C2SoftAvcDec.cpp
@@ -51,6 +51,12 @@
         noInputLatency();
         noTimeStretch();
 
+        // TODO: Proper support for reorder depth.
+        addParameter(
+                DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
+                .withConstValue(new C2PortActualDelayTuning::output(8u))
+                .build());
+
         // TODO: output latency and reordering
 
         addParameter(
@@ -877,6 +883,8 @@
     } else if (!hasPicture) {
         fillEmptyWork(work);
     }
+
+    work->input.buffers.clear();
 }
 
 c2_status_t C2SoftAvcDec::drainInternal(
diff --git a/media/codec2/components/hevc/C2SoftHevcDec.cpp b/media/codec2/components/hevc/C2SoftHevcDec.cpp
index 99892ce..f0d7d88 100644
--- a/media/codec2/components/hevc/C2SoftHevcDec.cpp
+++ b/media/codec2/components/hevc/C2SoftHevcDec.cpp
@@ -51,7 +51,11 @@
         noInputLatency();
         noTimeStretch();
 
-        // TODO: output latency and reordering
+        // TODO: Proper support for reorder depth.
+        addParameter(
+                DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
+                .withConstValue(new C2PortActualDelayTuning::output(8u))
+                .build());
 
         addParameter(
                 DefineParam(mAttrib, C2_PARAMKEY_COMPONENT_ATTRIBUTES)
diff --git a/media/codec2/components/mpeg4_h263/C2SoftMpeg4Dec.cpp b/media/codec2/components/mpeg4_h263/C2SoftMpeg4Dec.cpp
index 901f5ed..0b89cff 100644
--- a/media/codec2/components/mpeg4_h263/C2SoftMpeg4Dec.cpp
+++ b/media/codec2/components/mpeg4_h263/C2SoftMpeg4Dec.cpp
@@ -60,7 +60,11 @@
         noInputLatency();
         noTimeStretch();
 
-        // TODO: output latency and reordering
+        // TODO: Proper support for reorder depth.
+        addParameter(
+                DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
+                .withConstValue(new C2PortActualDelayTuning::output(1u))
+                .build());
 
         addParameter(
                 DefineParam(mAttrib, C2_PARAMKEY_COMPONENT_ATTRIBUTES)
diff --git a/media/codec2/hidl/1.0/vts/functional/common/media_c2_hidl_test_common.h b/media/codec2/hidl/1.0/vts/functional/common/media_c2_hidl_test_common.h
index d1557cb..fca2902 100644
--- a/media/codec2/hidl/1.0/vts/functional/common/media_c2_hidl_test_common.h
+++ b/media/codec2/hidl/1.0/vts/functional/common/media_c2_hidl_test_common.h
@@ -55,12 +55,10 @@
         : callBack(fn) {}
     virtual void onWorkDone(
         const std::weak_ptr<android::Codec2Client::Component>& comp,
-        std::list<std::unique_ptr<C2Work>>& workItems,
-        size_t numDiscardedInputBuffers) override {
+        std::list<std::unique_ptr<C2Work>>& workItems) override {
         /* TODO */
         ALOGD("onWorkDone called");
         (void)comp;
-        (void)numDiscardedInputBuffers;
         if (callBack) callBack(workItems);
     }
 
@@ -89,9 +87,10 @@
     }
 
     virtual void onInputBufferDone(
-        const std::shared_ptr<C2Buffer>& buffer) override {
+        uint64_t frameIndex, size_t arrayIndex) override {
         /* TODO */
-        (void)buffer;
+        (void)frameIndex;
+        (void)arrayIndex;
     }
 
     virtual void onFrameRendered(
diff --git a/media/codec2/hidl/client/client.cpp b/media/codec2/hidl/client/client.cpp
index 5b52fcd..e02c2ca 100644
--- a/media/codec2/hidl/client/client.cpp
+++ b/media/codec2/hidl/client/client.cpp
@@ -344,17 +344,13 @@
             return Void();
         }
         // release input buffers potentially held by the component from queue
-        size_t numDiscardedInputBuffers = 0;
         std::shared_ptr<Codec2Client::Component> strongComponent =
                 component.lock();
         if (strongComponent) {
-            numDiscardedInputBuffers =
-                    strongComponent->handleOnWorkDone(workItems);
+            strongComponent->handleOnWorkDone(workItems);
         }
         if (std::shared_ptr<Codec2Client::Listener> listener = base.lock()) {
-            listener->onWorkDone(component,
-                                 workItems,
-                                 numDiscardedInputBuffers);
+            listener->onWorkDone(component, workItems);
         } else {
             LOG(DEBUG) << "onWorkDone -- listener died.";
         }
@@ -418,26 +414,15 @@
             LOG(DEBUG) << "onInputBuffersReleased -- listener died.";
             return Void();
         }
-        std::shared_ptr<Codec2Client::Component> strongComponent =
-                component.lock();
-        if (!strongComponent) {
-            LOG(DEBUG) << "onInputBuffersReleased -- component died.";
-            return Void();
-        }
         for (const InputBuffer& inputBuffer : inputBuffers) {
-            std::shared_ptr<C2Buffer> buffer =
-                    strongComponent->freeInputBuffer(
-                        inputBuffer.frameIndex,
-                        inputBuffer.arrayIndex);
             LOG(VERBOSE) << "onInputBuffersReleased --"
                             " received death notification of"
                             " input buffer:"
                             " frameIndex = " << inputBuffer.frameIndex
                          << ", bufferIndex = " << inputBuffer.arrayIndex
                          << ".";
-            if (buffer) {
-                listener->onInputBufferDone(buffer);
-            }
+            listener->onInputBufferDone(
+                    inputBuffer.frameIndex, inputBuffer.arrayIndex);
         }
         return Void();
     }
@@ -918,43 +903,8 @@
     return static_cast<c2_status_t>(static_cast<Status>(transResult));
 }
 
-size_t Codec2Client::Component::handleOnWorkDone(
+void Codec2Client::Component::handleOnWorkDone(
         const std::list<std::unique_ptr<C2Work>> &workItems) {
-    // Input buffers' lifetime management
-    std::vector<uint64_t> inputDone;
-    for (const std::unique_ptr<C2Work> &work : workItems) {
-        if (work) {
-            if (work->worklets.empty()
-                    || !work->worklets.back()
-                    || (work->worklets.back()->output.flags &
-                        C2FrameData::FLAG_INCOMPLETE) == 0) {
-                // input is complete
-                inputDone.emplace_back(work->input.ordinal.frameIndex.peeku());
-            }
-        }
-    }
-
-    size_t numDiscardedInputBuffers = 0;
-    {
-        std::lock_guard<std::mutex> lock(mInputBuffersMutex);
-        for (uint64_t inputIndex : inputDone) {
-            auto it = mInputBuffers.find(inputIndex);
-            if (it == mInputBuffers.end()) {
-                LOG(VERBOSE) << "onWorkDone -- returned consumed/unknown "
-                                "input frame: index = "
-                             << inputIndex << ".";
-            } else {
-                LOG(VERBOSE) << "onWorkDone -- processed input frame: "
-                             << inputIndex
-                             << " (containing " << it->second.size()
-                                 << " buffers).";
-                mInputBuffers.erase(it);
-                mInputBufferCount.erase(inputIndex);
-                ++numDiscardedInputBuffers;
-            }
-        }
-    }
-
     // Output bufferqueue-based blocks' lifetime management
     mOutputBufferQueueMutex.lock();
     sp<IGraphicBufferProducer> igbp = mOutputIgbp;
@@ -965,72 +915,10 @@
     if (igbp) {
         holdBufferQueueBlocks(workItems, igbp, bqId, generation);
     }
-    return numDiscardedInputBuffers;
-}
-
-std::shared_ptr<C2Buffer> Codec2Client::Component::freeInputBuffer(
-        uint64_t frameIndex,
-        size_t bufferIndex) {
-    std::shared_ptr<C2Buffer> buffer;
-    std::lock_guard<std::mutex> lock(mInputBuffersMutex);
-    auto it = mInputBuffers.find(frameIndex);
-    if (it == mInputBuffers.end()) {
-        LOG(INFO) << "freeInputBuffer -- Unrecognized input frame index "
-                  << frameIndex << ".";
-        return nullptr;
-    }
-    if (bufferIndex >= it->second.size()) {
-        LOG(INFO) << "freeInputBuffer -- Input buffer number " << bufferIndex
-                  << " is not valid in input with frame index " << frameIndex
-                  << ".";
-        return nullptr;
-    }
-    buffer = it->second[bufferIndex];
-    if (!buffer) {
-        LOG(INFO) << "freeInputBuffer -- Input buffer number " << bufferIndex
-                  << " in input with frame index " << frameIndex
-                  << " has already been freed.";
-        return nullptr;
-    }
-    it->second[bufferIndex] = nullptr;
-    if (--mInputBufferCount[frameIndex] == 0) {
-        mInputBuffers.erase(it);
-        mInputBufferCount.erase(frameIndex);
-    }
-    return buffer;
 }
 
 c2_status_t Codec2Client::Component::queue(
         std::list<std::unique_ptr<C2Work>>* const items) {
-    // remember input buffers queued to hold reference to them
-    {
-        std::lock_guard<std::mutex> lock(mInputBuffersMutex);
-        for (const std::unique_ptr<C2Work> &work : *items) {
-            if (!work) {
-                continue;
-            }
-            if (work->input.buffers.size() == 0) {
-                continue;
-            }
-
-            uint64_t inputIndex = work->input.ordinal.frameIndex.peeku();
-            auto res = mInputBuffers.emplace(inputIndex, work->input.buffers);
-            if (!res.second) {
-                // TODO: append? - for now we are replacing
-                res.first->second = work->input.buffers;
-                LOG(INFO) << "queue -- duplicate input frame index: "
-                          << inputIndex
-                          << ". Discarding the old input frame...";
-            }
-            mInputBufferCount[inputIndex] = work->input.buffers.size();
-            LOG(VERBOSE) << "queue -- queuing input frame: "
-                         << "index = " << inputIndex
-                         << ", number of buffers = "
-                             << work->input.buffers.size()
-                         << ".";
-        }
-    }
-
     WorkBundle workBundle;
     if (!objcpy(&workBundle, *items, &mBufferPoolSender)) {
         LOG(ERROR) << "queue -- bad input.";
@@ -1088,24 +976,6 @@
         }
     }
 
-    // Input buffers' lifetime management
-    for (uint64_t flushedIndex : flushedIndices) {
-        std::lock_guard<std::mutex> lock(mInputBuffersMutex);
-        auto it = mInputBuffers.find(flushedIndex);
-        if (it == mInputBuffers.end()) {
-            LOG(VERBOSE) << "flush -- returned consumed/unknown input frame: "
-                            "index = " << flushedIndex << ".";
-        } else {
-            LOG(VERBOSE) << "flush -- returned unprocessed input frame: "
-                            "index = " << flushedIndex
-                         << ", number of buffers = "
-                             << mInputBufferCount[flushedIndex]
-                         << ".";
-            mInputBuffers.erase(it);
-            mInputBufferCount.erase(flushedIndex);
-        }
-    }
-
     // Output bufferqueue-based blocks' lifetime management
     mOutputBufferQueueMutex.lock();
     sp<IGraphicBufferProducer> igbp = mOutputIgbp;
@@ -1160,10 +1030,6 @@
     if (status != C2_OK) {
         LOG(DEBUG) << "stop -- call failed: " << status << ".";
     }
-    mInputBuffersMutex.lock();
-    mInputBuffers.clear();
-    mInputBufferCount.clear();
-    mInputBuffersMutex.unlock();
     return status;
 }
 
@@ -1178,10 +1044,6 @@
     if (status != C2_OK) {
         LOG(DEBUG) << "reset -- call failed: " << status << ".";
     }
-    mInputBuffersMutex.lock();
-    mInputBuffers.clear();
-    mInputBufferCount.clear();
-    mInputBuffersMutex.unlock();
     return status;
 }
 
@@ -1196,10 +1058,6 @@
     if (status != C2_OK) {
         LOG(DEBUG) << "release -- call failed: " << status << ".";
     }
-    mInputBuffersMutex.lock();
-    mInputBuffers.clear();
-    mInputBufferCount.clear();
-    mInputBuffersMutex.unlock();
     return status;
 }
 
diff --git a/media/codec2/hidl/client/include/codec2/hidl/client.h b/media/codec2/hidl/client/include/codec2/hidl/client.h
index f320ef3..5b3afca 100644
--- a/media/codec2/hidl/client/include/codec2/hidl/client.h
+++ b/media/codec2/hidl/client/include/codec2/hidl/client.h
@@ -252,16 +252,9 @@
 struct Codec2Client::Listener {
 
     // This is called when the component produces some output.
-    //
-    // numDiscardedInputBuffers is the number of input buffers contained in
-    // workItems that have just become unused. Note that workItems may contain
-    // more input buffers than numDiscardedInputBuffers because buffers that
-    // have been previously reported by onInputBufferDone() are not counted
-    // towards numDiscardedInputBuffers, but may still show up in workItems.
     virtual void onWorkDone(
             const std::weak_ptr<Component>& comp,
-            std::list<std::unique_ptr<C2Work>>& workItems,
-            size_t numDiscardedInputBuffers) = 0;
+            std::list<std::unique_ptr<C2Work>>& workItems) = 0;
 
     // This is called when the component goes into a tripped state.
     virtual void onTripped(
@@ -283,7 +276,7 @@
     // Input buffers that have been returned by onWorkDone() or flush() will not
     // trigger a call to this function.
     virtual void onInputBufferDone(
-            const std::shared_ptr<C2Buffer>& buffer) = 0;
+            uint64_t frameIndex, size_t arrayIndex) = 0;
 
     // This is called when the component becomes aware of a frame being
     // rendered.
@@ -385,24 +378,6 @@
 protected:
     sp<Base> mBase;
 
-    // Mutex for mInputBuffers and mInputBufferCount.
-    mutable std::mutex mInputBuffersMutex;
-
-    // Map: frameIndex -> vector of bufferIndices
-    //
-    // mInputBuffers[frameIndex][bufferIndex] may be null if the buffer in that
-    // slot has been freed.
-    mutable std::map<uint64_t, std::vector<std::shared_ptr<C2Buffer>>>
-            mInputBuffers;
-
-    // Map: frameIndex -> number of bufferIndices that have not been freed
-    //
-    // mInputBufferCount[frameIndex] keeps track of the number of non-null
-    // elements in mInputBuffers[frameIndex]. When mInputBufferCount[frameIndex]
-    // decreases to 0, frameIndex can be removed from both mInputBuffers and
-    // mInputBufferCount.
-    mutable std::map<uint64_t, size_t> mInputBufferCount;
-
     ::android::hardware::media::c2::V1_0::utils::DefaultBufferPoolSender
             mBufferPoolSender;
 
@@ -419,10 +394,7 @@
     friend struct Codec2Client;
 
     struct HidlListener;
-    // Return the number of input buffers that should be discarded.
-    size_t handleOnWorkDone(const std::list<std::unique_ptr<C2Work>> &workItems);
-    // Remove an input buffer from mInputBuffers and return it.
-    std::shared_ptr<C2Buffer> freeInputBuffer(uint64_t frameIndex, size_t bufferIndex);
+    void handleOnWorkDone(const std::list<std::unique_ptr<C2Work>> &workItems);
 
 };
 
diff --git a/media/codec2/sfplugin/Android.bp b/media/codec2/sfplugin/Android.bp
index 2870d39..a212651 100644
--- a/media/codec2/sfplugin/Android.bp
+++ b/media/codec2/sfplugin/Android.bp
@@ -8,6 +8,7 @@
         "CCodecConfig.cpp",
         "Codec2Buffer.cpp",
         "Codec2InfoBuilder.cpp",
+        "PipelineWatcher.cpp",
         "ReflectedParamUpdater.cpp",
         "SkipCutBuffer.cpp",
     ],
diff --git a/media/codec2/sfplugin/CCodec.cpp b/media/codec2/sfplugin/CCodec.cpp
index 10263de..ed1f85b 100644
--- a/media/codec2/sfplugin/CCodec.cpp
+++ b/media/codec2/sfplugin/CCodec.cpp
@@ -448,14 +448,13 @@
 
     virtual void onWorkDone(
             const std::weak_ptr<Codec2Client::Component>& component,
-            std::list<std::unique_ptr<C2Work>>& workItems,
-            size_t numDiscardedInputBuffers) override {
+            std::list<std::unique_ptr<C2Work>>& workItems) override {
         (void)component;
         sp<CCodec> codec(mCodec.promote());
         if (!codec) {
             return;
         }
-        codec->onWorkDone(workItems, numDiscardedInputBuffers);
+        codec->onWorkDone(workItems);
     }
 
     virtual void onTripped(
@@ -504,10 +503,10 @@
     }
 
     virtual void onInputBufferDone(
-            const std::shared_ptr<C2Buffer>& buffer) override {
+            uint64_t frameIndex, size_t arrayIndex) override {
         sp<CCodec> codec(mCodec.promote());
         if (codec) {
-            codec->onInputBufferDone(buffer);
+            codec->onInputBufferDone(frameIndex, arrayIndex);
         }
     }
 
@@ -531,10 +530,6 @@
                 {RenderedFrameInfo(mediaTimeUs, renderTimeNs)});
     }
 
-    void onWorkQueued(bool eos) override {
-        mCodec->onWorkQueued(eos);
-    }
-
     void onOutputBuffersChanged() override {
         mCodec->mCallback->onOutputBuffersChanged();
     }
@@ -546,8 +541,7 @@
 // CCodec
 
 CCodec::CCodec()
-    : mChannel(new CCodecBufferChannel(std::make_shared<CCodecCallbackImpl>(this))),
-      mQueuedWorkCount(0) {
+    : mChannel(new CCodecBufferChannel(std::make_shared<CCodecCallbackImpl>(this))) {
 }
 
 CCodec::~CCodec() {
@@ -1343,7 +1337,6 @@
     }
 
     mChannel->flush(flushedWork);
-    subQueuedWorkCount(flushedWork.size());
 
     {
         Mutexed<State>::Locked state(mState);
@@ -1465,28 +1458,16 @@
     config->setParameters(comp, params, C2_MAY_BLOCK);
 }
 
-void CCodec::onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems,
-                        size_t numDiscardedInputBuffers) {
+void CCodec::onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems) {
     if (!workItems.empty()) {
-        {
-            Mutexed<std::list<size_t>>::Locked numDiscardedInputBuffersQueue(
-                    mNumDiscardedInputBuffersQueue);
-            numDiscardedInputBuffersQueue->insert(
-                    numDiscardedInputBuffersQueue->end(),
-                    workItems.size() - 1, 0);
-            numDiscardedInputBuffersQueue->emplace_back(
-                    numDiscardedInputBuffers);
-        }
-        {
-            Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
-            queue->splice(queue->end(), workItems);
-        }
+        Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
+        queue->splice(queue->end(), workItems);
     }
     (new AMessage(kWhatWorkDone, this))->post();
 }
 
-void CCodec::onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer) {
-    mChannel->onInputBufferDone(buffer);
+void CCodec::onInputBufferDone(uint64_t frameIndex, size_t arrayIndex) {
+    mChannel->onInputBufferDone(frameIndex, arrayIndex);
 }
 
 void CCodec::onMessageReceived(const sp<AMessage> &msg) {
@@ -1512,7 +1493,6 @@
         case kWhatStart: {
             // C2Component::start() should return within 500ms.
             setDeadline(now, 550ms, "start");
-            mQueuedWorkCount = 0;
             start();
             break;
         }
@@ -1520,10 +1500,6 @@
             // C2Component::stop() should return within 500ms.
             setDeadline(now, 550ms, "stop");
             stop();
-
-            mQueuedWorkCount = 0;
-            Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
-            deadline->set(TimePoint::max(), "none");
             break;
         }
         case kWhatFlush: {
@@ -1549,7 +1525,6 @@
         }
         case kWhatWorkDone: {
             std::unique_ptr<C2Work> work;
-            size_t numDiscardedInputBuffers;
             bool shouldPost = false;
             {
                 Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
@@ -1560,24 +1535,10 @@
                 queue->pop_front();
                 shouldPost = !queue->empty();
             }
-            {
-                Mutexed<std::list<size_t>>::Locked numDiscardedInputBuffersQueue(
-                        mNumDiscardedInputBuffersQueue);
-                if (numDiscardedInputBuffersQueue->empty()) {
-                    numDiscardedInputBuffers = 0;
-                } else {
-                    numDiscardedInputBuffers = numDiscardedInputBuffersQueue->front();
-                    numDiscardedInputBuffersQueue->pop_front();
-                }
-            }
             if (shouldPost) {
                 (new AMessage(kWhatWorkDone, this))->post();
             }
 
-            if (work->worklets.empty()
-                    || !(work->worklets.front()->output.flags & C2FrameData::FLAG_INCOMPLETE)) {
-                subQueuedWorkCount(1);
-            }
             // handle configuration changes in work done
             Mutexed<Config>::Locked config(mConfig);
             bool changed = false;
@@ -1641,8 +1602,7 @@
             }
             mChannel->onWorkDone(
                     std::move(work), changed ? config->mOutputFormat : nullptr,
-                    initData.hasChanged() ? initData.update().get() : nullptr,
-                    numDiscardedInputBuffers);
+                    initData.hasChanged() ? initData.update().get() : nullptr);
             break;
         }
         case kWhatWatch: {
@@ -1669,17 +1629,26 @@
 void CCodec::initiateReleaseIfStuck() {
     std::string name;
     bool pendingDeadline = false;
-    for (Mutexed<NamedTimePoint> *deadlinePtr : { &mDeadline, &mQueueDeadline, &mEosDeadline }) {
-        Mutexed<NamedTimePoint>::Locked deadline(*deadlinePtr);
+    {
+        Mutexed<NamedTimePoint>::Locked deadline(mDeadline);
         if (deadline->get() < std::chrono::steady_clock::now()) {
             name = deadline->getName();
-            break;
         }
         if (deadline->get() != TimePoint::max()) {
             pendingDeadline = true;
         }
     }
     if (name.empty()) {
+        constexpr std::chrono::steady_clock::duration kWorkDurationThreshold = 3s;
+        std::chrono::steady_clock::duration elapsed = mChannel->elapsed();
+        if (elapsed >= kWorkDurationThreshold) {
+            name = "queue";
+        }
+        if (elapsed > 0s) {
+            pendingDeadline = true;
+        }
+    }
+    if (name.empty()) {
         // We're not stuck.
         if (pendingDeadline) {
             // If we are not stuck yet but still has deadline coming up,
@@ -1694,33 +1663,6 @@
     mCallback->onError(UNKNOWN_ERROR, ACTION_CODE_FATAL);
 }
 
-void CCodec::onWorkQueued(bool eos) {
-    ALOGV("queued work count +1 from %d", mQueuedWorkCount.load());
-    int32_t count = ++mQueuedWorkCount;
-    if (eos) {
-        CCodecWatchdog::getInstance()->watch(this);
-        Mutexed<NamedTimePoint>::Locked deadline(mEosDeadline);
-        deadline->set(std::chrono::steady_clock::now() + 3s, "eos");
-    }
-    // TODO: query and use input/pipeline/output delay combined
-    if (count >= 4) {
-        CCodecWatchdog::getInstance()->watch(this);
-        Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
-        deadline->set(std::chrono::steady_clock::now() + 3s, "queue");
-    }
-}
-
-void CCodec::subQueuedWorkCount(uint32_t count) {
-    ALOGV("queued work count -%u from %d", count, mQueuedWorkCount.load());
-    int32_t currentCount = (mQueuedWorkCount -= count);
-    if (currentCount == 0) {
-        Mutexed<NamedTimePoint>::Locked deadline(mEosDeadline);
-        deadline->set(TimePoint::max(), "none");
-    }
-    Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
-    deadline->set(TimePoint::max(), "none");
-}
-
 }  // namespace android
 
 extern "C" android::CodecBase *CreateCodec() {
diff --git a/media/codec2/sfplugin/CCodec.h b/media/codec2/sfplugin/CCodec.h
index 78b009e..bb8bd19 100644
--- a/media/codec2/sfplugin/CCodec.h
+++ b/media/codec2/sfplugin/CCodec.h
@@ -66,9 +66,8 @@
     virtual void signalRequestIDRFrame() override;
 
     void initiateReleaseIfStuck();
-    void onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems,
-                    size_t numDiscardedInputBuffers);
-    void onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer);
+    void onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems);
+    void onInputBufferDone(uint64_t frameIndex, size_t arrayIndex);
 
 protected:
     virtual ~CCodec();
@@ -76,7 +75,7 @@
     virtual void onMessageReceived(const sp<AMessage> &msg) override;
 
 private:
-    typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
+    typedef std::chrono::steady_clock::time_point TimePoint;
 
     status_t tryAndReportOnError(std::function<status_t()> job);
 
@@ -100,9 +99,6 @@
             const std::chrono::milliseconds &timeout,
             const char *name);
 
-    void onWorkQueued(bool eos);
-    void subQueuedWorkCount(uint32_t count);
-
     enum {
         kWhatAllocate,
         kWhatConfigure,
@@ -167,13 +163,9 @@
     struct ClientListener;
 
     Mutexed<NamedTimePoint> mDeadline;
-    std::atomic_int32_t mQueuedWorkCount;
-    Mutexed<NamedTimePoint> mQueueDeadline;
-    Mutexed<NamedTimePoint> mEosDeadline;
     typedef CCodecConfig Config;
     Mutexed<Config> mConfig;
     Mutexed<std::list<std::unique_ptr<C2Work>>> mWorkDoneQueue;
-    Mutexed<std::list<size_t>> mNumDiscardedInputBuffersQueue;
 
     friend class CCodecCallbackImpl;
 
diff --git a/media/codec2/sfplugin/CCodecBufferChannel.cpp b/media/codec2/sfplugin/CCodecBufferChannel.cpp
index 8e6a3f8..587f83c 100644
--- a/media/codec2/sfplugin/CCodecBufferChannel.cpp
+++ b/media/codec2/sfplugin/CCodecBufferChannel.cpp
@@ -152,6 +152,11 @@
      */
     virtual std::unique_ptr<InputBuffers> toArrayMode(size_t size) = 0;
 
+    /**
+     * Return number of buffers the client owns.
+     */
+    virtual size_t numClientBuffers() const = 0;
+
 protected:
     // Pool to obtain blocks for input buffers.
     std::shared_ptr<C2BlockPool> mPool;
@@ -508,6 +513,14 @@
         mBuffers.clear();
     }
 
+    size_t numClientBuffers() const {
+        return std::count_if(
+                mBuffers.begin(), mBuffers.end(),
+                [](const Entry &entry) {
+                    return (entry.clientBuffer != nullptr);
+                });
+    }
+
 private:
     friend class BuffersArrayImpl;
 
@@ -693,6 +706,14 @@
         }
     }
 
+    size_t numClientBuffers() const {
+        return std::count_if(
+                mBuffers.begin(), mBuffers.end(),
+                [](const Entry &entry) {
+                    return entry.ownedByClient;
+                });
+    }
+
 private:
     std::string mImplName; ///< name for debugging
     const char *mName; ///< C-string version of name
@@ -756,6 +777,10 @@
         mImpl.flush();
     }
 
+    size_t numClientBuffers() const final {
+        return mImpl.numClientBuffers();
+    }
+
 private:
     BuffersArrayImpl mImpl;
 };
@@ -823,6 +848,10 @@
         return std::move(array);
     }
 
+    size_t numClientBuffers() const final {
+        return mImpl.numClientBuffers();
+    }
+
     virtual sp<Codec2Buffer> alloc(size_t size) {
         C2MemoryUsage usage = { C2MemoryUsage::CPU_READ, C2MemoryUsage::CPU_WRITE };
         std::shared_ptr<C2LinearBlock> block;
@@ -967,6 +996,10 @@
         return std::move(array);
     }
 
+    size_t numClientBuffers() const final {
+        return mImpl.numClientBuffers();
+    }
+
 private:
     FlexBuffersImpl mImpl;
     std::shared_ptr<C2AllocatorStore> mStore;
@@ -1030,6 +1063,10 @@
         return std::move(array);
     }
 
+    size_t numClientBuffers() const final {
+        return mImpl.numClientBuffers();
+    }
+
 private:
     FlexBuffersImpl mImpl;
     std::shared_ptr<LocalBufferPool> mLocalBufferPool;
@@ -1065,6 +1102,10 @@
     void getArray(Vector<sp<MediaCodecBuffer>> *array) const final {
         array->clear();
     }
+
+    size_t numClientBuffers() const final {
+        return 0u;
+    }
 };
 
 class OutputBuffersArray : public CCodecBufferChannel::OutputBuffers {
@@ -1422,90 +1463,6 @@
     count->value = -1;
 }
 
-// CCodecBufferChannel::PipelineCapacity
-
-CCodecBufferChannel::PipelineCapacity::PipelineCapacity()
-      : input(0), component(0),
-        mName("<UNKNOWN COMPONENT>") {
-}
-
-void CCodecBufferChannel::PipelineCapacity::initialize(
-        int newInput,
-        int newComponent,
-        const char* newName,
-        const char* callerTag) {
-    input.store(newInput, std::memory_order_relaxed);
-    component.store(newComponent, std::memory_order_relaxed);
-    mName = newName;
-    ALOGV("[%s] %s -- PipelineCapacity::initialize(): "
-          "pipeline availability initialized ==> "
-          "input = %d, component = %d",
-            mName, callerTag ? callerTag : "*",
-            newInput, newComponent);
-}
-
-bool CCodecBufferChannel::PipelineCapacity::allocate(const char* callerTag) {
-    int prevInput = input.fetch_sub(1, std::memory_order_relaxed);
-    int prevComponent = component.fetch_sub(1, std::memory_order_relaxed);
-    if (prevInput > 0 && prevComponent > 0) {
-        ALOGV("[%s] %s -- PipelineCapacity::allocate() returns true: "
-              "pipeline availability -1 all ==> "
-              "input = %d, component = %d",
-                mName, callerTag ? callerTag : "*",
-                prevInput - 1,
-                prevComponent - 1);
-        return true;
-    }
-    input.fetch_add(1, std::memory_order_relaxed);
-    component.fetch_add(1, std::memory_order_relaxed);
-    ALOGV("[%s] %s -- PipelineCapacity::allocate() returns false: "
-          "pipeline availability unchanged ==> "
-          "input = %d, component = %d",
-            mName, callerTag ? callerTag : "*",
-            prevInput,
-            prevComponent);
-    return false;
-}
-
-void CCodecBufferChannel::PipelineCapacity::free(const char* callerTag) {
-    int prevInput = input.fetch_add(1, std::memory_order_relaxed);
-    int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
-    ALOGV("[%s] %s -- PipelineCapacity::free(): "
-          "pipeline availability +1 all ==> "
-          "input = %d, component = %d",
-            mName, callerTag ? callerTag : "*",
-            prevInput + 1,
-            prevComponent + 1);
-}
-
-int CCodecBufferChannel::PipelineCapacity::freeInputSlots(
-        size_t numDiscardedInputBuffers,
-        const char* callerTag) {
-    int prevInput = input.fetch_add(numDiscardedInputBuffers,
-                                    std::memory_order_relaxed);
-    ALOGV("[%s] %s -- PipelineCapacity::freeInputSlots(%zu): "
-          "pipeline availability +%zu input ==> "
-          "input = %d, component = %d",
-            mName, callerTag ? callerTag : "*",
-            numDiscardedInputBuffers,
-            numDiscardedInputBuffers,
-            prevInput + static_cast<int>(numDiscardedInputBuffers),
-            component.load(std::memory_order_relaxed));
-    return prevInput + static_cast<int>(numDiscardedInputBuffers);
-}
-
-int CCodecBufferChannel::PipelineCapacity::freeComponentSlot(
-        const char* callerTag) {
-    int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
-    ALOGV("[%s] %s -- PipelineCapacity::freeComponentSlot(): "
-          "pipeline availability +1 component ==> "
-          "input = %d, component = %d",
-            mName, callerTag ? callerTag : "*",
-            input.load(std::memory_order_relaxed),
-            prevComponent + 1);
-    return prevComponent + 1;
-}
-
 // CCodecBufferChannel::ReorderStash
 
 CCodecBufferChannel::ReorderStash::ReorderStash() {
@@ -1595,7 +1552,6 @@
       mFrameIndex(0u),
       mFirstValidFrameIndex(0u),
       mMetaMode(MODE_NONE),
-      mAvailablePipelineCapacity(),
       mInputMetEos(false) {
     Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
     buffers->reset(new DummyInputBuffers(""));
@@ -1658,6 +1614,9 @@
     work->input.ordinal.customOrdinal = timeUs;
     work->input.buffers.clear();
 
+    uint64_t queuedFrameIndex = work->input.ordinal.frameIndex.peeku();
+    std::vector<std::shared_ptr<C2Buffer>> queuedBuffers;
+
     if (buffer->size() > 0u) {
         Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
         std::shared_ptr<C2Buffer> c2buffer;
@@ -1665,11 +1624,9 @@
             return -ENOENT;
         }
         work->input.buffers.push_back(c2buffer);
-    } else {
-        mAvailablePipelineCapacity.freeInputSlots(1, "queueInputBufferInternal");
-        if (eos) {
-            flags |= C2FrameData::FLAG_END_OF_STREAM;
-        }
+        queuedBuffers.push_back(c2buffer);
+    } else if (eos) {
+        flags |= C2FrameData::FLAG_END_OF_STREAM;
     }
     work->input.flags = (C2FrameData::flags_t)flags;
     // TODO: fill info's
@@ -1680,10 +1637,16 @@
 
     std::list<std::unique_ptr<C2Work>> items;
     items.push_back(std::move(work));
+    mPipelineWatcher.lock()->onWorkQueued(
+            queuedFrameIndex,
+            std::move(queuedBuffers),
+            PipelineWatcher::Clock::now());
     c2_status_t err = mComponent->queue(&items);
+    if (err != C2_OK) {
+        mPipelineWatcher.lock()->onWorkDone(queuedFrameIndex);
+    }
 
     if (err == C2_OK && eos && buffer->size() > 0u) {
-        mCCodecCallback->onWorkQueued(false);
         work.reset(new C2Work);
         work->input.ordinal.timestamp = timeUs;
         work->input.ordinal.frameIndex = mFrameIndex++;
@@ -1693,13 +1656,22 @@
         work->input.flags = C2FrameData::FLAG_END_OF_STREAM;
         work->worklets.emplace_back(new C2Worklet);
 
+        queuedFrameIndex = work->input.ordinal.frameIndex.peeku();
+        queuedBuffers.clear();
+
         items.clear();
         items.push_back(std::move(work));
+
+        mPipelineWatcher.lock()->onWorkQueued(
+                queuedFrameIndex,
+                std::move(queuedBuffers),
+                PipelineWatcher::Clock::now());
         err = mComponent->queue(&items);
+        if (err != C2_OK) {
+            mPipelineWatcher.lock()->onWorkDone(queuedFrameIndex);
+        }
     }
     if (err == C2_OK) {
-        mCCodecCallback->onWorkQueued(eos);
-
         Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
         bool released = (*buffers)->releaseBuffer(buffer, nullptr, true);
         ALOGV("[%s] queueInputBuffer: buffer %sreleased", mName, released ? "" : "not ");
@@ -1846,14 +1818,16 @@
 void CCodecBufferChannel::feedInputBufferIfAvailableInternal() {
     while (!mInputMetEos &&
            !mReorderStash.lock()->hasPending() &&
-           mAvailablePipelineCapacity.allocate("feedInputBufferIfAvailable")) {
+           !mPipelineWatcher.lock()->pipelineFull()) {
         sp<MediaCodecBuffer> inBuffer;
         size_t index;
         {
             Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
+            if ((*buffers)->numClientBuffers() >= mNumInputSlots) {
+                return;
+            }
             if (!(*buffers)->requestNewBuffer(&index, &inBuffer)) {
                 ALOGV("[%s] no new buffer available", mName);
-                mAvailablePipelineCapacity.free("feedInputBufferIfAvailable");
                 break;
             }
         }
@@ -2032,15 +2006,12 @@
     {
         Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
         if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr, true)) {
-            buffers.unlock();
             released = true;
-            mAvailablePipelineCapacity.freeInputSlots(1, "discardBuffer");
         }
     }
     {
         Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
         if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr)) {
-            buffers.unlock();
             released = true;
         }
     }
@@ -2408,10 +2379,14 @@
     // about buffers from the previous generation do not interfere with the
     // newly initialized pipeline capacity.
 
-    mAvailablePipelineCapacity.initialize(
-            mNumInputSlots,
-            mNumInputSlots + mNumOutputSlots,
-            mName);
+    {
+        Mutexed<PipelineWatcher>::Locked watcher(mPipelineWatcher);
+        watcher->inputDelay(inputDelay ? inputDelay.value : 0)
+                .pipelineDelay(pipelineDelay ? pipelineDelay.value : 0)
+                .outputDelay(outputDelay ? outputDelay.value : 0)
+                .smoothnessFactor(kSmoothnessFactor);
+        watcher->flush();
+    }
 
     mInputMetEos = false;
     mSync.start();
@@ -2472,21 +2447,16 @@
                 buffer->meta()->setInt64("timeUs", 0);
                 post = false;
             }
-            if (mAvailablePipelineCapacity.allocate("requestInitialInputBuffers")) {
-                if (post) {
-                    mCallback->onInputBufferAvailable(index, buffer);
-                } else {
-                    toBeQueued.emplace_back(buffer);
-                }
+            if (post) {
+                mCallback->onInputBufferAvailable(index, buffer);
             } else {
-                ALOGD("[%s] pipeline is full while requesting %zu-th input buffer",
-                        mName, i);
+                toBeQueued.emplace_back(buffer);
             }
         }
     }
     for (const sp<MediaCodecBuffer> &buffer : toBeQueued) {
         if (queueInputBufferInternal(buffer) != OK) {
-            mAvailablePipelineCapacity.freeComponentSlot("requestInitialInputBuffers");
+            ALOGV("[%s] Error while queueing initial buffers", mName);
         }
     }
     return OK;
@@ -2532,28 +2502,25 @@
         (*buffers)->flush(flushedWork);
     }
     mReorderStash.lock()->flush();
+    mPipelineWatcher.lock()->flush();
 }
 
 void CCodecBufferChannel::onWorkDone(
         std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
-        const C2StreamInitDataInfo::output *initData,
-        size_t numDiscardedInputBuffers) {
+        const C2StreamInitDataInfo::output *initData) {
     if (handleWork(std::move(work), outputFormat, initData)) {
-        mAvailablePipelineCapacity.freeInputSlots(numDiscardedInputBuffers,
-                                                  "onWorkDone");
         feedInputBufferIfAvailable();
     }
 }
 
 void CCodecBufferChannel::onInputBufferDone(
-        const std::shared_ptr<C2Buffer>& buffer) {
+        uint64_t frameIndex, size_t arrayIndex) {
+    std::shared_ptr<C2Buffer> buffer =
+            mPipelineWatcher.lock()->onInputBufferReleased(frameIndex, arrayIndex);
     bool newInputSlotAvailable;
     {
         Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
         newInputSlotAvailable = (*buffers)->expireComponentBuffer(buffer);
-        if (newInputSlotAvailable) {
-            mAvailablePipelineCapacity.freeInputSlots(1, "onInputBufferDone");
-        }
     }
     if (newInputSlotAvailable) {
         feedInputBufferIfAvailable();
@@ -2573,7 +2540,7 @@
     if (work->worklets.size() != 1u
             || !work->worklets.front()
             || !(work->worklets.front()->output.flags & C2FrameData::FLAG_INCOMPLETE)) {
-        mAvailablePipelineCapacity.freeComponentSlot("handleWork");
+        mPipelineWatcher.lock()->onWorkDone(work->input.ordinal.frameIndex.peeku());
     }
 
     if (work->result == C2_NOT_FOUND) {
@@ -2832,6 +2799,10 @@
     return OK;
 }
 
+PipelineWatcher::Clock::duration CCodecBufferChannel::elapsed() {
+    return mPipelineWatcher.lock()->elapsed(PipelineWatcher::Clock::now());
+}
+
 void CCodecBufferChannel::setMetaMode(MetaMode mode) {
     mMetaMode = mode;
 }
diff --git a/media/codec2/sfplugin/CCodecBufferChannel.h b/media/codec2/sfplugin/CCodecBufferChannel.h
index ebc1491..9dccab8 100644
--- a/media/codec2/sfplugin/CCodecBufferChannel.h
+++ b/media/codec2/sfplugin/CCodecBufferChannel.h
@@ -34,6 +34,7 @@
 #include <media/ICrypto.h>
 
 #include "InputSurfaceWrapper.h"
+#include "PipelineWatcher.h"
 
 namespace android {
 
@@ -44,7 +45,6 @@
     virtual ~CCodecCallback() = default;
     virtual void onError(status_t err, enum ActionCode actionCode) = 0;
     virtual void onOutputFramesRendered(int64_t mediaTimeUs, nsecs_t renderTimeNs) = 0;
-    virtual void onWorkQueued(bool eos) = 0;
     virtual void onOutputBuffersChanged() = 0;
 };
 
@@ -128,22 +128,21 @@
      * @param workItems   finished work item.
      * @param outputFormat new output format if it has changed, otherwise nullptr
      * @param initData    new init data (CSD) if it has changed, otherwise nullptr
-     * @param numDiscardedInputBuffers the number of input buffers that are
-     *                    returned for the first time (not previously returned by
-     *                    onInputBufferDone()).
      */
     void onWorkDone(
             std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
-            const C2StreamInitDataInfo::output *initData,
-            size_t numDiscardedInputBuffers);
+            const C2StreamInitDataInfo::output *initData);
 
     /**
      * Make an input buffer available for the client as it is no longer needed
      * by the codec.
      *
-     * @param buffer The buffer that becomes unused.
+     * @param frameIndex The index of input work
+     * @param arrayIndex The index of buffer in the input work buffers.
      */
-    void onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer);
+    void onInputBufferDone(uint64_t frameIndex, size_t arrayIndex);
+
+    PipelineWatcher::Clock::duration elapsed();
 
     enum MetaMode {
         MODE_NONE,
@@ -266,79 +265,7 @@
 
     MetaMode mMetaMode;
 
-    // PipelineCapacity is used in the input buffer gating logic.
-    //
-    // There are three criteria that need to be met before
-    // onInputBufferAvailable() is called:
-    // 1. The number of input buffers that have been received by
-    //    CCodecBufferChannel but not returned via onWorkDone() or
-    //    onInputBufferDone() does not exceed a certain limit. (Let us call this
-    //    number the "input" capacity.)
-    // 2. The number of work items that have been received by
-    //    CCodecBufferChannel whose outputs have not been returned from the
-    //    component (by calling onWorkDone()) does not exceed a certain limit.
-    //    (Let us call this the "component" capacity.)
-    //
-    // These three criteria guarantee that a new input buffer that arrives from
-    // the invocation of onInputBufferAvailable() will not
-    // 1. overload CCodecBufferChannel's input buffers;
-    // 2. overload the component; or
-    //
-    struct PipelineCapacity {
-        // The number of available input capacity.
-        std::atomic_int input;
-        // The number of available component capacity.
-        std::atomic_int component;
-
-        PipelineCapacity();
-        // Set the values of #input and #component.
-        void initialize(int newInput, int newComponent,
-                        const char* newName = "<UNKNOWN COMPONENT>",
-                        const char* callerTag = nullptr);
-
-        // Return true and decrease #input and #component by one if
-        // they are all greater than zero; return false otherwise.
-        //
-        // callerTag is used for logging only.
-        //
-        // allocate() is called by CCodecBufferChannel to check whether it can
-        // receive another input buffer. If the return value is true,
-        // onInputBufferAvailable() and onOutputBufferAvailable() can be called
-        // afterwards.
-        bool allocate(const char* callerTag = nullptr);
-
-        // Increase #input and #component by one.
-        //
-        // callerTag is used for logging only.
-        //
-        // free() is called by CCodecBufferChannel after allocate() returns true
-        // but onInputBufferAvailable() cannot be called for any reasons. It
-        // essentially undoes an allocate() call.
-        void free(const char* callerTag = nullptr);
-
-        // Increase #input by @p numDiscardedInputBuffers.
-        //
-        // callerTag is used for logging only.
-        //
-        // freeInputSlots() is called by CCodecBufferChannel when onWorkDone()
-        // or onInputBufferDone() is called. @p numDiscardedInputBuffers is
-        // provided in onWorkDone(), and is 1 in onInputBufferDone().
-        int freeInputSlots(size_t numDiscardedInputBuffers,
-                           const char* callerTag = nullptr);
-
-        // Increase #component by one and return the updated value.
-        //
-        // callerTag is used for logging only.
-        //
-        // freeComponentSlot() is called by CCodecBufferChannel when
-        // onWorkDone() is called.
-        int freeComponentSlot(const char* callerTag = nullptr);
-
-    private:
-        // Component name. Used for logging.
-        const char* mName;
-    };
-    PipelineCapacity mAvailablePipelineCapacity;
+    Mutexed<PipelineWatcher> mPipelineWatcher;
 
     class ReorderStash {
     public:
diff --git a/media/codec2/sfplugin/PipelineWatcher.cpp b/media/codec2/sfplugin/PipelineWatcher.cpp
new file mode 100644
index 0000000..fe0a2c8
--- /dev/null
+++ b/media/codec2/sfplugin/PipelineWatcher.cpp
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#define LOG_NDEBUG 0
+#define LOG_TAG "PipelineWatcher"
+
+#include <numeric>
+
+#include <log/log.h>
+
+#include "PipelineWatcher.h"
+
+namespace android {
+
+PipelineWatcher &PipelineWatcher::inputDelay(uint32_t value) {
+    mInputDelay = value;
+    return *this;
+}
+
+PipelineWatcher &PipelineWatcher::pipelineDelay(uint32_t value) {
+    mPipelineDelay = value;
+    return *this;
+}
+
+PipelineWatcher &PipelineWatcher::outputDelay(uint32_t value) {
+    mOutputDelay = value;
+    return *this;
+}
+
+PipelineWatcher &PipelineWatcher::smoothnessFactor(uint32_t value) {
+    mSmoothnessFactor = value;
+    return *this;
+}
+
+void PipelineWatcher::onWorkQueued(
+        uint64_t frameIndex,
+        std::vector<std::shared_ptr<C2Buffer>> &&buffers,
+        const Clock::time_point &queuedAt) {
+    ALOGV("onWorkQueued(frameIndex=%llu, buffers(size=%zu), queuedAt=%lld)",
+          (unsigned long long)frameIndex,
+          buffers.size(),
+          (long long)queuedAt.time_since_epoch().count());
+    auto it = mFramesInPipeline.find(frameIndex);
+    if (it != mFramesInPipeline.end()) {
+        ALOGD("onWorkQueued: Duplicate frame index (%llu); previous entry removed",
+              (unsigned long long)frameIndex);
+        (void)mFramesInPipeline.erase(it);
+    }
+    (void)mFramesInPipeline.try_emplace(frameIndex, std::move(buffers), queuedAt);
+}
+
+std::shared_ptr<C2Buffer> PipelineWatcher::onInputBufferReleased(
+        uint64_t frameIndex, size_t arrayIndex) {
+    ALOGV("onInputBufferReleased(frameIndex=%llu, arrayIndex=%zu)",
+          (unsigned long long)frameIndex, arrayIndex);
+    auto it = mFramesInPipeline.find(frameIndex);
+    if (it == mFramesInPipeline.end()) {
+        ALOGD("onInputBufferReleased: frameIndex not found (%llu); ignored",
+              (unsigned long long)frameIndex);
+        return nullptr;
+    }
+    if (it->second.buffers.size() <= arrayIndex) {
+        ALOGD("onInputBufferReleased: buffers at %llu: size %zu, requested index: %zu",
+              (unsigned long long)frameIndex, it->second.buffers.size(), arrayIndex);
+        return nullptr;
+    }
+    std::shared_ptr<C2Buffer> buffer(std::move(it->second.buffers[arrayIndex]));
+    ALOGD_IF(!buffer, "onInputBufferReleased: buffer already released (%llu:%zu)",
+             (unsigned long long)frameIndex, arrayIndex);
+    return buffer;
+}
+
+void PipelineWatcher::onWorkDone(uint64_t frameIndex) {
+    ALOGV("onWorkDone(frameIndex=%llu)", (unsigned long long)frameIndex);
+    auto it = mFramesInPipeline.find(frameIndex);
+    if (it == mFramesInPipeline.end()) {
+        ALOGD("onWorkDone: frameIndex not found (%llu); ignored",
+              (unsigned long long)frameIndex);
+        return;
+    }
+    (void)mFramesInPipeline.erase(it);
+}
+
+void PipelineWatcher::flush() {
+    mFramesInPipeline.clear();
+}
+
+bool PipelineWatcher::pipelineFull() const {
+    if (mFramesInPipeline.size() >=
+            mInputDelay + mPipelineDelay + mOutputDelay + mSmoothnessFactor) {
+        ALOGV("pipelineFull: too many frames in pipeline (%zu)", mFramesInPipeline.size());
+        return true;
+    }
+    size_t sizeWithInputReleased = std::count_if(
+            mFramesInPipeline.begin(),
+            mFramesInPipeline.end(),
+            [](const decltype(mFramesInPipeline)::value_type &value) {
+                for (const std::shared_ptr<C2Buffer> &buffer : value.second.buffers) {
+                    if (buffer) {
+                        return false;
+                    }
+                }
+                return true;
+            });
+    if (sizeWithInputReleased >=
+            mPipelineDelay + mOutputDelay + mSmoothnessFactor) {
+        ALOGV("pipelineFull: too many frames in pipeline, with input released (%zu)",
+              sizeWithInputReleased);
+        return true;
+    }
+    ALOGV("pipeline has room (total: %zu, input released: %zu)",
+          mFramesInPipeline.size(), sizeWithInputReleased);
+    return false;
+}
+
+PipelineWatcher::Clock::duration PipelineWatcher::elapsed(
+        const PipelineWatcher::Clock::time_point &now) const {
+    return std::accumulate(
+            mFramesInPipeline.begin(),
+            mFramesInPipeline.end(),
+            Clock::duration::zero(),
+            [&now](const Clock::duration &current,
+                   const decltype(mFramesInPipeline)::value_type &value) {
+                Clock::duration elapsed = now - value.second.queuedAt;
+                ALOGV("elapsed: frameIndex = %llu elapsed = %lldms",
+                      (unsigned long long)value.first,
+                      std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
+                return current > elapsed ? current : elapsed;
+            });
+}
+
+}  // namespace android
diff --git a/media/codec2/sfplugin/PipelineWatcher.h b/media/codec2/sfplugin/PipelineWatcher.h
new file mode 100644
index 0000000..ce82298
--- /dev/null
+++ b/media/codec2/sfplugin/PipelineWatcher.h
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PIPELINE_WATCHER_H_
+#define PIPELINE_WATCHER_H_
+
+#include <chrono>
+#include <map>
+#include <memory>
+
+#include <C2Work.h>
+
+namespace android {
+
+/**
+ * PipelineWatcher watches the status of the work.
+ */
+class PipelineWatcher {
+public:
+    typedef std::chrono::steady_clock Clock;
+
+    PipelineWatcher()
+        : mInputDelay(0),
+          mPipelineDelay(0),
+          mOutputDelay(0),
+          mSmoothnessFactor(0) {}
+    ~PipelineWatcher() = default;
+
+    PipelineWatcher &inputDelay(uint32_t value);
+    PipelineWatcher &pipelineDelay(uint32_t value);
+    PipelineWatcher &outputDelay(uint32_t value);
+    PipelineWatcher &smoothnessFactor(uint32_t value);
+
+    void onWorkQueued(
+            uint64_t frameIndex,
+            std::vector<std::shared_ptr<C2Buffer>> &&buffers,
+            const Clock::time_point &queuedAt);
+    std::shared_ptr<C2Buffer> onInputBufferReleased(
+            uint64_t frameIndex, size_t arrayIndex);
+    void onWorkDone(uint64_t frameIndex);
+    void flush();
+
+    bool pipelineFull() const;
+    Clock::duration elapsed(const Clock::time_point &now) const;
+
+private:
+    uint32_t mInputDelay;
+    uint32_t mPipelineDelay;
+    uint32_t mOutputDelay;
+    uint32_t mSmoothnessFactor;
+
+    struct Frame {
+        Frame(std::vector<std::shared_ptr<C2Buffer>> &&b,
+              const Clock::time_point &q)
+            : buffers(b),
+              queuedAt(q) {}
+        std::vector<std::shared_ptr<C2Buffer>> buffers;
+        const Clock::time_point queuedAt;
+    };
+    std::map<uint64_t, Frame> mFramesInPipeline;
+};
+
+}  // namespace android
+
+#endif  // PIPELINE_WATCHER_H_