CCodec: refactor pipeline logic
Bug: 123632127
Test: bug repro steps
Test: atest CtsMediaTestCases -- --module-arg CtsMediaTestCases:size:small
Test: atest CtsMediaTestCases -- --module-arg CtsMediaTestCases:include-annotation:android.media.cts.MediaHeavyPresubmitTests
Change-Id: I289f51709dbd675991cd8949cd343c5bf5c6ef5c
diff --git a/media/codec2/components/aac/C2SoftAacDec.cpp b/media/codec2/components/aac/C2SoftAacDec.cpp
index c7c8442..04dda8f 100644
--- a/media/codec2/components/aac/C2SoftAacDec.cpp
+++ b/media/codec2/components/aac/C2SoftAacDec.cpp
@@ -52,33 +52,26 @@
namespace android {
-class C2SoftAacDec::IntfImpl : public C2InterfaceHelper {
+constexpr char COMPONENT_NAME[] = "c2.android.aac.decoder";
+
+class C2SoftAacDec::IntfImpl : public SimpleInterface<void>::BaseParams {
public:
explicit IntfImpl(const std::shared_ptr<C2ReflectorHelper> &helper)
- : C2InterfaceHelper(helper) {
-
- setDerivedInstance(this);
+ : SimpleInterface<void>::BaseParams(
+ helper,
+ COMPONENT_NAME,
+ C2Component::KIND_DECODER,
+ C2Component::DOMAIN_AUDIO,
+ MEDIA_MIMETYPE_AUDIO_AAC) {
+ noPrivateBuffers();
+ noInputReferences();
+ noOutputReferences();
+ noInputLatency();
+ noTimeStretch();
addParameter(
- DefineParam(mInputFormat, C2_NAME_INPUT_STREAM_FORMAT_SETTING)
- .withConstValue(new C2StreamFormatConfig::input(0u, C2FormatCompressed))
- .build());
-
- addParameter(
- DefineParam(mOutputFormat, C2_NAME_OUTPUT_STREAM_FORMAT_SETTING)
- .withConstValue(new C2StreamFormatConfig::output(0u, C2FormatAudio))
- .build());
-
- addParameter(
- DefineParam(mInputMediaType, C2_NAME_INPUT_PORT_MIME_SETTING)
- .withConstValue(AllocSharedString<C2PortMimeConfig::input>(
- MEDIA_MIMETYPE_AUDIO_AAC))
- .build());
-
- addParameter(
- DefineParam(mOutputMediaType, C2_NAME_OUTPUT_PORT_MIME_SETTING)
- .withConstValue(AllocSharedString<C2PortMimeConfig::output>(
- MEDIA_MIMETYPE_AUDIO_RAW))
+ DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
+ .withConstValue(new C2PortActualDelayTuning::output(2u))
.build());
addParameter(
@@ -231,8 +224,6 @@
// TODO Add : C2StreamAacSbrModeTuning
};
-constexpr char COMPONENT_NAME[] = "c2.android.aac.decoder";
-
C2SoftAacDec::C2SoftAacDec(
const char *name,
c2_node_id_t id,
diff --git a/media/codec2/components/avc/C2SoftAvcDec.cpp b/media/codec2/components/avc/C2SoftAvcDec.cpp
index 3e62744..86cd3d8 100644
--- a/media/codec2/components/avc/C2SoftAvcDec.cpp
+++ b/media/codec2/components/avc/C2SoftAvcDec.cpp
@@ -51,6 +51,12 @@
noInputLatency();
noTimeStretch();
+ // TODO: Proper support for reorder depth.
+ addParameter(
+ DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
+ .withConstValue(new C2PortActualDelayTuning::output(8u))
+ .build());
+
// TODO: output latency and reordering
addParameter(
@@ -877,6 +883,8 @@
} else if (!hasPicture) {
fillEmptyWork(work);
}
+
+ work->input.buffers.clear();
}
c2_status_t C2SoftAvcDec::drainInternal(
diff --git a/media/codec2/components/hevc/C2SoftHevcDec.cpp b/media/codec2/components/hevc/C2SoftHevcDec.cpp
index 99892ce..f0d7d88 100644
--- a/media/codec2/components/hevc/C2SoftHevcDec.cpp
+++ b/media/codec2/components/hevc/C2SoftHevcDec.cpp
@@ -51,7 +51,11 @@
noInputLatency();
noTimeStretch();
- // TODO: output latency and reordering
+ // TODO: Proper support for reorder depth.
+ addParameter(
+ DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
+ .withConstValue(new C2PortActualDelayTuning::output(8u))
+ .build());
addParameter(
DefineParam(mAttrib, C2_PARAMKEY_COMPONENT_ATTRIBUTES)
diff --git a/media/codec2/components/mpeg4_h263/C2SoftMpeg4Dec.cpp b/media/codec2/components/mpeg4_h263/C2SoftMpeg4Dec.cpp
index 901f5ed..0b89cff 100644
--- a/media/codec2/components/mpeg4_h263/C2SoftMpeg4Dec.cpp
+++ b/media/codec2/components/mpeg4_h263/C2SoftMpeg4Dec.cpp
@@ -60,7 +60,11 @@
noInputLatency();
noTimeStretch();
- // TODO: output latency and reordering
+ // TODO: Proper support for reorder depth.
+ addParameter(
+ DefineParam(mActualOutputDelay, C2_PARAMKEY_OUTPUT_DELAY)
+ .withConstValue(new C2PortActualDelayTuning::output(1u))
+ .build());
addParameter(
DefineParam(mAttrib, C2_PARAMKEY_COMPONENT_ATTRIBUTES)
diff --git a/media/codec2/hidl/1.0/vts/functional/common/media_c2_hidl_test_common.h b/media/codec2/hidl/1.0/vts/functional/common/media_c2_hidl_test_common.h
index d1557cb..fca2902 100644
--- a/media/codec2/hidl/1.0/vts/functional/common/media_c2_hidl_test_common.h
+++ b/media/codec2/hidl/1.0/vts/functional/common/media_c2_hidl_test_common.h
@@ -55,12 +55,10 @@
: callBack(fn) {}
virtual void onWorkDone(
const std::weak_ptr<android::Codec2Client::Component>& comp,
- std::list<std::unique_ptr<C2Work>>& workItems,
- size_t numDiscardedInputBuffers) override {
+ std::list<std::unique_ptr<C2Work>>& workItems) override {
/* TODO */
ALOGD("onWorkDone called");
(void)comp;
- (void)numDiscardedInputBuffers;
if (callBack) callBack(workItems);
}
@@ -89,9 +87,10 @@
}
virtual void onInputBufferDone(
- const std::shared_ptr<C2Buffer>& buffer) override {
+ uint64_t frameIndex, size_t arrayIndex) override {
/* TODO */
- (void)buffer;
+ (void)frameIndex;
+ (void)arrayIndex;
}
virtual void onFrameRendered(
diff --git a/media/codec2/hidl/client/client.cpp b/media/codec2/hidl/client/client.cpp
index 5b52fcd..e02c2ca 100644
--- a/media/codec2/hidl/client/client.cpp
+++ b/media/codec2/hidl/client/client.cpp
@@ -344,17 +344,13 @@
return Void();
}
// release input buffers potentially held by the component from queue
- size_t numDiscardedInputBuffers = 0;
std::shared_ptr<Codec2Client::Component> strongComponent =
component.lock();
if (strongComponent) {
- numDiscardedInputBuffers =
- strongComponent->handleOnWorkDone(workItems);
+ strongComponent->handleOnWorkDone(workItems);
}
if (std::shared_ptr<Codec2Client::Listener> listener = base.lock()) {
- listener->onWorkDone(component,
- workItems,
- numDiscardedInputBuffers);
+ listener->onWorkDone(component, workItems);
} else {
LOG(DEBUG) << "onWorkDone -- listener died.";
}
@@ -418,26 +414,15 @@
LOG(DEBUG) << "onInputBuffersReleased -- listener died.";
return Void();
}
- std::shared_ptr<Codec2Client::Component> strongComponent =
- component.lock();
- if (!strongComponent) {
- LOG(DEBUG) << "onInputBuffersReleased -- component died.";
- return Void();
- }
for (const InputBuffer& inputBuffer : inputBuffers) {
- std::shared_ptr<C2Buffer> buffer =
- strongComponent->freeInputBuffer(
- inputBuffer.frameIndex,
- inputBuffer.arrayIndex);
LOG(VERBOSE) << "onInputBuffersReleased --"
" received death notification of"
" input buffer:"
" frameIndex = " << inputBuffer.frameIndex
<< ", bufferIndex = " << inputBuffer.arrayIndex
<< ".";
- if (buffer) {
- listener->onInputBufferDone(buffer);
- }
+ listener->onInputBufferDone(
+ inputBuffer.frameIndex, inputBuffer.arrayIndex);
}
return Void();
}
@@ -918,43 +903,8 @@
return static_cast<c2_status_t>(static_cast<Status>(transResult));
}
-size_t Codec2Client::Component::handleOnWorkDone(
+void Codec2Client::Component::handleOnWorkDone(
const std::list<std::unique_ptr<C2Work>> &workItems) {
- // Input buffers' lifetime management
- std::vector<uint64_t> inputDone;
- for (const std::unique_ptr<C2Work> &work : workItems) {
- if (work) {
- if (work->worklets.empty()
- || !work->worklets.back()
- || (work->worklets.back()->output.flags &
- C2FrameData::FLAG_INCOMPLETE) == 0) {
- // input is complete
- inputDone.emplace_back(work->input.ordinal.frameIndex.peeku());
- }
- }
- }
-
- size_t numDiscardedInputBuffers = 0;
- {
- std::lock_guard<std::mutex> lock(mInputBuffersMutex);
- for (uint64_t inputIndex : inputDone) {
- auto it = mInputBuffers.find(inputIndex);
- if (it == mInputBuffers.end()) {
- LOG(VERBOSE) << "onWorkDone -- returned consumed/unknown "
- "input frame: index = "
- << inputIndex << ".";
- } else {
- LOG(VERBOSE) << "onWorkDone -- processed input frame: "
- << inputIndex
- << " (containing " << it->second.size()
- << " buffers).";
- mInputBuffers.erase(it);
- mInputBufferCount.erase(inputIndex);
- ++numDiscardedInputBuffers;
- }
- }
- }
-
// Output bufferqueue-based blocks' lifetime management
mOutputBufferQueueMutex.lock();
sp<IGraphicBufferProducer> igbp = mOutputIgbp;
@@ -965,72 +915,10 @@
if (igbp) {
holdBufferQueueBlocks(workItems, igbp, bqId, generation);
}
- return numDiscardedInputBuffers;
-}
-
-std::shared_ptr<C2Buffer> Codec2Client::Component::freeInputBuffer(
- uint64_t frameIndex,
- size_t bufferIndex) {
- std::shared_ptr<C2Buffer> buffer;
- std::lock_guard<std::mutex> lock(mInputBuffersMutex);
- auto it = mInputBuffers.find(frameIndex);
- if (it == mInputBuffers.end()) {
- LOG(INFO) << "freeInputBuffer -- Unrecognized input frame index "
- << frameIndex << ".";
- return nullptr;
- }
- if (bufferIndex >= it->second.size()) {
- LOG(INFO) << "freeInputBuffer -- Input buffer number " << bufferIndex
- << " is not valid in input with frame index " << frameIndex
- << ".";
- return nullptr;
- }
- buffer = it->second[bufferIndex];
- if (!buffer) {
- LOG(INFO) << "freeInputBuffer -- Input buffer number " << bufferIndex
- << " in input with frame index " << frameIndex
- << " has already been freed.";
- return nullptr;
- }
- it->second[bufferIndex] = nullptr;
- if (--mInputBufferCount[frameIndex] == 0) {
- mInputBuffers.erase(it);
- mInputBufferCount.erase(frameIndex);
- }
- return buffer;
}
c2_status_t Codec2Client::Component::queue(
std::list<std::unique_ptr<C2Work>>* const items) {
- // remember input buffers queued to hold reference to them
- {
- std::lock_guard<std::mutex> lock(mInputBuffersMutex);
- for (const std::unique_ptr<C2Work> &work : *items) {
- if (!work) {
- continue;
- }
- if (work->input.buffers.size() == 0) {
- continue;
- }
-
- uint64_t inputIndex = work->input.ordinal.frameIndex.peeku();
- auto res = mInputBuffers.emplace(inputIndex, work->input.buffers);
- if (!res.second) {
- // TODO: append? - for now we are replacing
- res.first->second = work->input.buffers;
- LOG(INFO) << "queue -- duplicate input frame index: "
- << inputIndex
- << ". Discarding the old input frame...";
- }
- mInputBufferCount[inputIndex] = work->input.buffers.size();
- LOG(VERBOSE) << "queue -- queuing input frame: "
- << "index = " << inputIndex
- << ", number of buffers = "
- << work->input.buffers.size()
- << ".";
- }
- }
-
WorkBundle workBundle;
if (!objcpy(&workBundle, *items, &mBufferPoolSender)) {
LOG(ERROR) << "queue -- bad input.";
@@ -1088,24 +976,6 @@
}
}
- // Input buffers' lifetime management
- for (uint64_t flushedIndex : flushedIndices) {
- std::lock_guard<std::mutex> lock(mInputBuffersMutex);
- auto it = mInputBuffers.find(flushedIndex);
- if (it == mInputBuffers.end()) {
- LOG(VERBOSE) << "flush -- returned consumed/unknown input frame: "
- "index = " << flushedIndex << ".";
- } else {
- LOG(VERBOSE) << "flush -- returned unprocessed input frame: "
- "index = " << flushedIndex
- << ", number of buffers = "
- << mInputBufferCount[flushedIndex]
- << ".";
- mInputBuffers.erase(it);
- mInputBufferCount.erase(flushedIndex);
- }
- }
-
// Output bufferqueue-based blocks' lifetime management
mOutputBufferQueueMutex.lock();
sp<IGraphicBufferProducer> igbp = mOutputIgbp;
@@ -1160,10 +1030,6 @@
if (status != C2_OK) {
LOG(DEBUG) << "stop -- call failed: " << status << ".";
}
- mInputBuffersMutex.lock();
- mInputBuffers.clear();
- mInputBufferCount.clear();
- mInputBuffersMutex.unlock();
return status;
}
@@ -1178,10 +1044,6 @@
if (status != C2_OK) {
LOG(DEBUG) << "reset -- call failed: " << status << ".";
}
- mInputBuffersMutex.lock();
- mInputBuffers.clear();
- mInputBufferCount.clear();
- mInputBuffersMutex.unlock();
return status;
}
@@ -1196,10 +1058,6 @@
if (status != C2_OK) {
LOG(DEBUG) << "release -- call failed: " << status << ".";
}
- mInputBuffersMutex.lock();
- mInputBuffers.clear();
- mInputBufferCount.clear();
- mInputBuffersMutex.unlock();
return status;
}
diff --git a/media/codec2/hidl/client/include/codec2/hidl/client.h b/media/codec2/hidl/client/include/codec2/hidl/client.h
index f320ef3..5b3afca 100644
--- a/media/codec2/hidl/client/include/codec2/hidl/client.h
+++ b/media/codec2/hidl/client/include/codec2/hidl/client.h
@@ -252,16 +252,9 @@
struct Codec2Client::Listener {
// This is called when the component produces some output.
- //
- // numDiscardedInputBuffers is the number of input buffers contained in
- // workItems that have just become unused. Note that workItems may contain
- // more input buffers than numDiscardedInputBuffers because buffers that
- // have been previously reported by onInputBufferDone() are not counted
- // towards numDiscardedInputBuffers, but may still show up in workItems.
virtual void onWorkDone(
const std::weak_ptr<Component>& comp,
- std::list<std::unique_ptr<C2Work>>& workItems,
- size_t numDiscardedInputBuffers) = 0;
+ std::list<std::unique_ptr<C2Work>>& workItems) = 0;
// This is called when the component goes into a tripped state.
virtual void onTripped(
@@ -283,7 +276,7 @@
// Input buffers that have been returned by onWorkDone() or flush() will not
// trigger a call to this function.
virtual void onInputBufferDone(
- const std::shared_ptr<C2Buffer>& buffer) = 0;
+ uint64_t frameIndex, size_t arrayIndex) = 0;
// This is called when the component becomes aware of a frame being
// rendered.
@@ -385,24 +378,6 @@
protected:
sp<Base> mBase;
- // Mutex for mInputBuffers and mInputBufferCount.
- mutable std::mutex mInputBuffersMutex;
-
- // Map: frameIndex -> vector of bufferIndices
- //
- // mInputBuffers[frameIndex][bufferIndex] may be null if the buffer in that
- // slot has been freed.
- mutable std::map<uint64_t, std::vector<std::shared_ptr<C2Buffer>>>
- mInputBuffers;
-
- // Map: frameIndex -> number of bufferIndices that have not been freed
- //
- // mInputBufferCount[frameIndex] keeps track of the number of non-null
- // elements in mInputBuffers[frameIndex]. When mInputBufferCount[frameIndex]
- // decreases to 0, frameIndex can be removed from both mInputBuffers and
- // mInputBufferCount.
- mutable std::map<uint64_t, size_t> mInputBufferCount;
-
::android::hardware::media::c2::V1_0::utils::DefaultBufferPoolSender
mBufferPoolSender;
@@ -419,10 +394,7 @@
friend struct Codec2Client;
struct HidlListener;
- // Return the number of input buffers that should be discarded.
- size_t handleOnWorkDone(const std::list<std::unique_ptr<C2Work>> &workItems);
- // Remove an input buffer from mInputBuffers and return it.
- std::shared_ptr<C2Buffer> freeInputBuffer(uint64_t frameIndex, size_t bufferIndex);
+ void handleOnWorkDone(const std::list<std::unique_ptr<C2Work>> &workItems);
};
diff --git a/media/codec2/sfplugin/Android.bp b/media/codec2/sfplugin/Android.bp
index 2870d39..a212651 100644
--- a/media/codec2/sfplugin/Android.bp
+++ b/media/codec2/sfplugin/Android.bp
@@ -8,6 +8,7 @@
"CCodecConfig.cpp",
"Codec2Buffer.cpp",
"Codec2InfoBuilder.cpp",
+ "PipelineWatcher.cpp",
"ReflectedParamUpdater.cpp",
"SkipCutBuffer.cpp",
],
diff --git a/media/codec2/sfplugin/CCodec.cpp b/media/codec2/sfplugin/CCodec.cpp
index 10263de..ed1f85b 100644
--- a/media/codec2/sfplugin/CCodec.cpp
+++ b/media/codec2/sfplugin/CCodec.cpp
@@ -448,14 +448,13 @@
virtual void onWorkDone(
const std::weak_ptr<Codec2Client::Component>& component,
- std::list<std::unique_ptr<C2Work>>& workItems,
- size_t numDiscardedInputBuffers) override {
+ std::list<std::unique_ptr<C2Work>>& workItems) override {
(void)component;
sp<CCodec> codec(mCodec.promote());
if (!codec) {
return;
}
- codec->onWorkDone(workItems, numDiscardedInputBuffers);
+ codec->onWorkDone(workItems);
}
virtual void onTripped(
@@ -504,10 +503,10 @@
}
virtual void onInputBufferDone(
- const std::shared_ptr<C2Buffer>& buffer) override {
+ uint64_t frameIndex, size_t arrayIndex) override {
sp<CCodec> codec(mCodec.promote());
if (codec) {
- codec->onInputBufferDone(buffer);
+ codec->onInputBufferDone(frameIndex, arrayIndex);
}
}
@@ -531,10 +530,6 @@
{RenderedFrameInfo(mediaTimeUs, renderTimeNs)});
}
- void onWorkQueued(bool eos) override {
- mCodec->onWorkQueued(eos);
- }
-
void onOutputBuffersChanged() override {
mCodec->mCallback->onOutputBuffersChanged();
}
@@ -546,8 +541,7 @@
// CCodec
CCodec::CCodec()
- : mChannel(new CCodecBufferChannel(std::make_shared<CCodecCallbackImpl>(this))),
- mQueuedWorkCount(0) {
+ : mChannel(new CCodecBufferChannel(std::make_shared<CCodecCallbackImpl>(this))) {
}
CCodec::~CCodec() {
@@ -1343,7 +1337,6 @@
}
mChannel->flush(flushedWork);
- subQueuedWorkCount(flushedWork.size());
{
Mutexed<State>::Locked state(mState);
@@ -1465,28 +1458,16 @@
config->setParameters(comp, params, C2_MAY_BLOCK);
}
-void CCodec::onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems,
- size_t numDiscardedInputBuffers) {
+void CCodec::onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems) {
if (!workItems.empty()) {
- {
- Mutexed<std::list<size_t>>::Locked numDiscardedInputBuffersQueue(
- mNumDiscardedInputBuffersQueue);
- numDiscardedInputBuffersQueue->insert(
- numDiscardedInputBuffersQueue->end(),
- workItems.size() - 1, 0);
- numDiscardedInputBuffersQueue->emplace_back(
- numDiscardedInputBuffers);
- }
- {
- Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
- queue->splice(queue->end(), workItems);
- }
+ Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
+ queue->splice(queue->end(), workItems);
}
(new AMessage(kWhatWorkDone, this))->post();
}
-void CCodec::onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer) {
- mChannel->onInputBufferDone(buffer);
+void CCodec::onInputBufferDone(uint64_t frameIndex, size_t arrayIndex) {
+ mChannel->onInputBufferDone(frameIndex, arrayIndex);
}
void CCodec::onMessageReceived(const sp<AMessage> &msg) {
@@ -1512,7 +1493,6 @@
case kWhatStart: {
// C2Component::start() should return within 500ms.
setDeadline(now, 550ms, "start");
- mQueuedWorkCount = 0;
start();
break;
}
@@ -1520,10 +1500,6 @@
// C2Component::stop() should return within 500ms.
setDeadline(now, 550ms, "stop");
stop();
-
- mQueuedWorkCount = 0;
- Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
- deadline->set(TimePoint::max(), "none");
break;
}
case kWhatFlush: {
@@ -1549,7 +1525,6 @@
}
case kWhatWorkDone: {
std::unique_ptr<C2Work> work;
- size_t numDiscardedInputBuffers;
bool shouldPost = false;
{
Mutexed<std::list<std::unique_ptr<C2Work>>>::Locked queue(mWorkDoneQueue);
@@ -1560,24 +1535,10 @@
queue->pop_front();
shouldPost = !queue->empty();
}
- {
- Mutexed<std::list<size_t>>::Locked numDiscardedInputBuffersQueue(
- mNumDiscardedInputBuffersQueue);
- if (numDiscardedInputBuffersQueue->empty()) {
- numDiscardedInputBuffers = 0;
- } else {
- numDiscardedInputBuffers = numDiscardedInputBuffersQueue->front();
- numDiscardedInputBuffersQueue->pop_front();
- }
- }
if (shouldPost) {
(new AMessage(kWhatWorkDone, this))->post();
}
- if (work->worklets.empty()
- || !(work->worklets.front()->output.flags & C2FrameData::FLAG_INCOMPLETE)) {
- subQueuedWorkCount(1);
- }
// handle configuration changes in work done
Mutexed<Config>::Locked config(mConfig);
bool changed = false;
@@ -1641,8 +1602,7 @@
}
mChannel->onWorkDone(
std::move(work), changed ? config->mOutputFormat : nullptr,
- initData.hasChanged() ? initData.update().get() : nullptr,
- numDiscardedInputBuffers);
+ initData.hasChanged() ? initData.update().get() : nullptr);
break;
}
case kWhatWatch: {
@@ -1669,17 +1629,26 @@
void CCodec::initiateReleaseIfStuck() {
std::string name;
bool pendingDeadline = false;
- for (Mutexed<NamedTimePoint> *deadlinePtr : { &mDeadline, &mQueueDeadline, &mEosDeadline }) {
- Mutexed<NamedTimePoint>::Locked deadline(*deadlinePtr);
+ {
+ Mutexed<NamedTimePoint>::Locked deadline(mDeadline);
if (deadline->get() < std::chrono::steady_clock::now()) {
name = deadline->getName();
- break;
}
if (deadline->get() != TimePoint::max()) {
pendingDeadline = true;
}
}
if (name.empty()) {
+ constexpr std::chrono::steady_clock::duration kWorkDurationThreshold = 3s;
+ std::chrono::steady_clock::duration elapsed = mChannel->elapsed();
+ if (elapsed >= kWorkDurationThreshold) {
+ name = "queue";
+ }
+ if (elapsed > 0s) {
+ pendingDeadline = true;
+ }
+ }
+ if (name.empty()) {
// We're not stuck.
if (pendingDeadline) {
// If we are not stuck yet but still has deadline coming up,
@@ -1694,33 +1663,6 @@
mCallback->onError(UNKNOWN_ERROR, ACTION_CODE_FATAL);
}
-void CCodec::onWorkQueued(bool eos) {
- ALOGV("queued work count +1 from %d", mQueuedWorkCount.load());
- int32_t count = ++mQueuedWorkCount;
- if (eos) {
- CCodecWatchdog::getInstance()->watch(this);
- Mutexed<NamedTimePoint>::Locked deadline(mEosDeadline);
- deadline->set(std::chrono::steady_clock::now() + 3s, "eos");
- }
- // TODO: query and use input/pipeline/output delay combined
- if (count >= 4) {
- CCodecWatchdog::getInstance()->watch(this);
- Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
- deadline->set(std::chrono::steady_clock::now() + 3s, "queue");
- }
-}
-
-void CCodec::subQueuedWorkCount(uint32_t count) {
- ALOGV("queued work count -%u from %d", count, mQueuedWorkCount.load());
- int32_t currentCount = (mQueuedWorkCount -= count);
- if (currentCount == 0) {
- Mutexed<NamedTimePoint>::Locked deadline(mEosDeadline);
- deadline->set(TimePoint::max(), "none");
- }
- Mutexed<NamedTimePoint>::Locked deadline(mQueueDeadline);
- deadline->set(TimePoint::max(), "none");
-}
-
} // namespace android
extern "C" android::CodecBase *CreateCodec() {
diff --git a/media/codec2/sfplugin/CCodec.h b/media/codec2/sfplugin/CCodec.h
index 78b009e..bb8bd19 100644
--- a/media/codec2/sfplugin/CCodec.h
+++ b/media/codec2/sfplugin/CCodec.h
@@ -66,9 +66,8 @@
virtual void signalRequestIDRFrame() override;
void initiateReleaseIfStuck();
- void onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems,
- size_t numDiscardedInputBuffers);
- void onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer);
+ void onWorkDone(std::list<std::unique_ptr<C2Work>> &workItems);
+ void onInputBufferDone(uint64_t frameIndex, size_t arrayIndex);
protected:
virtual ~CCodec();
@@ -76,7 +75,7 @@
virtual void onMessageReceived(const sp<AMessage> &msg) override;
private:
- typedef std::chrono::time_point<std::chrono::steady_clock> TimePoint;
+ typedef std::chrono::steady_clock::time_point TimePoint;
status_t tryAndReportOnError(std::function<status_t()> job);
@@ -100,9 +99,6 @@
const std::chrono::milliseconds &timeout,
const char *name);
- void onWorkQueued(bool eos);
- void subQueuedWorkCount(uint32_t count);
-
enum {
kWhatAllocate,
kWhatConfigure,
@@ -167,13 +163,9 @@
struct ClientListener;
Mutexed<NamedTimePoint> mDeadline;
- std::atomic_int32_t mQueuedWorkCount;
- Mutexed<NamedTimePoint> mQueueDeadline;
- Mutexed<NamedTimePoint> mEosDeadline;
typedef CCodecConfig Config;
Mutexed<Config> mConfig;
Mutexed<std::list<std::unique_ptr<C2Work>>> mWorkDoneQueue;
- Mutexed<std::list<size_t>> mNumDiscardedInputBuffersQueue;
friend class CCodecCallbackImpl;
diff --git a/media/codec2/sfplugin/CCodecBufferChannel.cpp b/media/codec2/sfplugin/CCodecBufferChannel.cpp
index 8e6a3f8..587f83c 100644
--- a/media/codec2/sfplugin/CCodecBufferChannel.cpp
+++ b/media/codec2/sfplugin/CCodecBufferChannel.cpp
@@ -152,6 +152,11 @@
*/
virtual std::unique_ptr<InputBuffers> toArrayMode(size_t size) = 0;
+ /**
+ * Return number of buffers the client owns.
+ */
+ virtual size_t numClientBuffers() const = 0;
+
protected:
// Pool to obtain blocks for input buffers.
std::shared_ptr<C2BlockPool> mPool;
@@ -508,6 +513,14 @@
mBuffers.clear();
}
+ size_t numClientBuffers() const {
+ return std::count_if(
+ mBuffers.begin(), mBuffers.end(),
+ [](const Entry &entry) {
+ return (entry.clientBuffer != nullptr);
+ });
+ }
+
private:
friend class BuffersArrayImpl;
@@ -693,6 +706,14 @@
}
}
+ size_t numClientBuffers() const {
+ return std::count_if(
+ mBuffers.begin(), mBuffers.end(),
+ [](const Entry &entry) {
+ return entry.ownedByClient;
+ });
+ }
+
private:
std::string mImplName; ///< name for debugging
const char *mName; ///< C-string version of name
@@ -756,6 +777,10 @@
mImpl.flush();
}
+ size_t numClientBuffers() const final {
+ return mImpl.numClientBuffers();
+ }
+
private:
BuffersArrayImpl mImpl;
};
@@ -823,6 +848,10 @@
return std::move(array);
}
+ size_t numClientBuffers() const final {
+ return mImpl.numClientBuffers();
+ }
+
virtual sp<Codec2Buffer> alloc(size_t size) {
C2MemoryUsage usage = { C2MemoryUsage::CPU_READ, C2MemoryUsage::CPU_WRITE };
std::shared_ptr<C2LinearBlock> block;
@@ -967,6 +996,10 @@
return std::move(array);
}
+ size_t numClientBuffers() const final {
+ return mImpl.numClientBuffers();
+ }
+
private:
FlexBuffersImpl mImpl;
std::shared_ptr<C2AllocatorStore> mStore;
@@ -1030,6 +1063,10 @@
return std::move(array);
}
+ size_t numClientBuffers() const final {
+ return mImpl.numClientBuffers();
+ }
+
private:
FlexBuffersImpl mImpl;
std::shared_ptr<LocalBufferPool> mLocalBufferPool;
@@ -1065,6 +1102,10 @@
void getArray(Vector<sp<MediaCodecBuffer>> *array) const final {
array->clear();
}
+
+ size_t numClientBuffers() const final {
+ return 0u;
+ }
};
class OutputBuffersArray : public CCodecBufferChannel::OutputBuffers {
@@ -1422,90 +1463,6 @@
count->value = -1;
}
-// CCodecBufferChannel::PipelineCapacity
-
-CCodecBufferChannel::PipelineCapacity::PipelineCapacity()
- : input(0), component(0),
- mName("<UNKNOWN COMPONENT>") {
-}
-
-void CCodecBufferChannel::PipelineCapacity::initialize(
- int newInput,
- int newComponent,
- const char* newName,
- const char* callerTag) {
- input.store(newInput, std::memory_order_relaxed);
- component.store(newComponent, std::memory_order_relaxed);
- mName = newName;
- ALOGV("[%s] %s -- PipelineCapacity::initialize(): "
- "pipeline availability initialized ==> "
- "input = %d, component = %d",
- mName, callerTag ? callerTag : "*",
- newInput, newComponent);
-}
-
-bool CCodecBufferChannel::PipelineCapacity::allocate(const char* callerTag) {
- int prevInput = input.fetch_sub(1, std::memory_order_relaxed);
- int prevComponent = component.fetch_sub(1, std::memory_order_relaxed);
- if (prevInput > 0 && prevComponent > 0) {
- ALOGV("[%s] %s -- PipelineCapacity::allocate() returns true: "
- "pipeline availability -1 all ==> "
- "input = %d, component = %d",
- mName, callerTag ? callerTag : "*",
- prevInput - 1,
- prevComponent - 1);
- return true;
- }
- input.fetch_add(1, std::memory_order_relaxed);
- component.fetch_add(1, std::memory_order_relaxed);
- ALOGV("[%s] %s -- PipelineCapacity::allocate() returns false: "
- "pipeline availability unchanged ==> "
- "input = %d, component = %d",
- mName, callerTag ? callerTag : "*",
- prevInput,
- prevComponent);
- return false;
-}
-
-void CCodecBufferChannel::PipelineCapacity::free(const char* callerTag) {
- int prevInput = input.fetch_add(1, std::memory_order_relaxed);
- int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
- ALOGV("[%s] %s -- PipelineCapacity::free(): "
- "pipeline availability +1 all ==> "
- "input = %d, component = %d",
- mName, callerTag ? callerTag : "*",
- prevInput + 1,
- prevComponent + 1);
-}
-
-int CCodecBufferChannel::PipelineCapacity::freeInputSlots(
- size_t numDiscardedInputBuffers,
- const char* callerTag) {
- int prevInput = input.fetch_add(numDiscardedInputBuffers,
- std::memory_order_relaxed);
- ALOGV("[%s] %s -- PipelineCapacity::freeInputSlots(%zu): "
- "pipeline availability +%zu input ==> "
- "input = %d, component = %d",
- mName, callerTag ? callerTag : "*",
- numDiscardedInputBuffers,
- numDiscardedInputBuffers,
- prevInput + static_cast<int>(numDiscardedInputBuffers),
- component.load(std::memory_order_relaxed));
- return prevInput + static_cast<int>(numDiscardedInputBuffers);
-}
-
-int CCodecBufferChannel::PipelineCapacity::freeComponentSlot(
- const char* callerTag) {
- int prevComponent = component.fetch_add(1, std::memory_order_relaxed);
- ALOGV("[%s] %s -- PipelineCapacity::freeComponentSlot(): "
- "pipeline availability +1 component ==> "
- "input = %d, component = %d",
- mName, callerTag ? callerTag : "*",
- input.load(std::memory_order_relaxed),
- prevComponent + 1);
- return prevComponent + 1;
-}
-
// CCodecBufferChannel::ReorderStash
CCodecBufferChannel::ReorderStash::ReorderStash() {
@@ -1595,7 +1552,6 @@
mFrameIndex(0u),
mFirstValidFrameIndex(0u),
mMetaMode(MODE_NONE),
- mAvailablePipelineCapacity(),
mInputMetEos(false) {
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
buffers->reset(new DummyInputBuffers(""));
@@ -1658,6 +1614,9 @@
work->input.ordinal.customOrdinal = timeUs;
work->input.buffers.clear();
+ uint64_t queuedFrameIndex = work->input.ordinal.frameIndex.peeku();
+ std::vector<std::shared_ptr<C2Buffer>> queuedBuffers;
+
if (buffer->size() > 0u) {
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
std::shared_ptr<C2Buffer> c2buffer;
@@ -1665,11 +1624,9 @@
return -ENOENT;
}
work->input.buffers.push_back(c2buffer);
- } else {
- mAvailablePipelineCapacity.freeInputSlots(1, "queueInputBufferInternal");
- if (eos) {
- flags |= C2FrameData::FLAG_END_OF_STREAM;
- }
+ queuedBuffers.push_back(c2buffer);
+ } else if (eos) {
+ flags |= C2FrameData::FLAG_END_OF_STREAM;
}
work->input.flags = (C2FrameData::flags_t)flags;
// TODO: fill info's
@@ -1680,10 +1637,16 @@
std::list<std::unique_ptr<C2Work>> items;
items.push_back(std::move(work));
+ mPipelineWatcher.lock()->onWorkQueued(
+ queuedFrameIndex,
+ std::move(queuedBuffers),
+ PipelineWatcher::Clock::now());
c2_status_t err = mComponent->queue(&items);
+ if (err != C2_OK) {
+ mPipelineWatcher.lock()->onWorkDone(queuedFrameIndex);
+ }
if (err == C2_OK && eos && buffer->size() > 0u) {
- mCCodecCallback->onWorkQueued(false);
work.reset(new C2Work);
work->input.ordinal.timestamp = timeUs;
work->input.ordinal.frameIndex = mFrameIndex++;
@@ -1693,13 +1656,22 @@
work->input.flags = C2FrameData::FLAG_END_OF_STREAM;
work->worklets.emplace_back(new C2Worklet);
+ queuedFrameIndex = work->input.ordinal.frameIndex.peeku();
+ queuedBuffers.clear();
+
items.clear();
items.push_back(std::move(work));
+
+ mPipelineWatcher.lock()->onWorkQueued(
+ queuedFrameIndex,
+ std::move(queuedBuffers),
+ PipelineWatcher::Clock::now());
err = mComponent->queue(&items);
+ if (err != C2_OK) {
+ mPipelineWatcher.lock()->onWorkDone(queuedFrameIndex);
+ }
}
if (err == C2_OK) {
- mCCodecCallback->onWorkQueued(eos);
-
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
bool released = (*buffers)->releaseBuffer(buffer, nullptr, true);
ALOGV("[%s] queueInputBuffer: buffer %sreleased", mName, released ? "" : "not ");
@@ -1846,14 +1818,16 @@
void CCodecBufferChannel::feedInputBufferIfAvailableInternal() {
while (!mInputMetEos &&
!mReorderStash.lock()->hasPending() &&
- mAvailablePipelineCapacity.allocate("feedInputBufferIfAvailable")) {
+ !mPipelineWatcher.lock()->pipelineFull()) {
sp<MediaCodecBuffer> inBuffer;
size_t index;
{
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
+ if ((*buffers)->numClientBuffers() >= mNumInputSlots) {
+ return;
+ }
if (!(*buffers)->requestNewBuffer(&index, &inBuffer)) {
ALOGV("[%s] no new buffer available", mName);
- mAvailablePipelineCapacity.free("feedInputBufferIfAvailable");
break;
}
}
@@ -2032,15 +2006,12 @@
{
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr, true)) {
- buffers.unlock();
released = true;
- mAvailablePipelineCapacity.freeInputSlots(1, "discardBuffer");
}
}
{
Mutexed<std::unique_ptr<OutputBuffers>>::Locked buffers(mOutputBuffers);
if (*buffers && (*buffers)->releaseBuffer(buffer, nullptr)) {
- buffers.unlock();
released = true;
}
}
@@ -2408,10 +2379,14 @@
// about buffers from the previous generation do not interfere with the
// newly initialized pipeline capacity.
- mAvailablePipelineCapacity.initialize(
- mNumInputSlots,
- mNumInputSlots + mNumOutputSlots,
- mName);
+ {
+ Mutexed<PipelineWatcher>::Locked watcher(mPipelineWatcher);
+ watcher->inputDelay(inputDelay ? inputDelay.value : 0)
+ .pipelineDelay(pipelineDelay ? pipelineDelay.value : 0)
+ .outputDelay(outputDelay ? outputDelay.value : 0)
+ .smoothnessFactor(kSmoothnessFactor);
+ watcher->flush();
+ }
mInputMetEos = false;
mSync.start();
@@ -2472,21 +2447,16 @@
buffer->meta()->setInt64("timeUs", 0);
post = false;
}
- if (mAvailablePipelineCapacity.allocate("requestInitialInputBuffers")) {
- if (post) {
- mCallback->onInputBufferAvailable(index, buffer);
- } else {
- toBeQueued.emplace_back(buffer);
- }
+ if (post) {
+ mCallback->onInputBufferAvailable(index, buffer);
} else {
- ALOGD("[%s] pipeline is full while requesting %zu-th input buffer",
- mName, i);
+ toBeQueued.emplace_back(buffer);
}
}
}
for (const sp<MediaCodecBuffer> &buffer : toBeQueued) {
if (queueInputBufferInternal(buffer) != OK) {
- mAvailablePipelineCapacity.freeComponentSlot("requestInitialInputBuffers");
+ ALOGV("[%s] Error while queueing initial buffers", mName);
}
}
return OK;
@@ -2532,28 +2502,25 @@
(*buffers)->flush(flushedWork);
}
mReorderStash.lock()->flush();
+ mPipelineWatcher.lock()->flush();
}
void CCodecBufferChannel::onWorkDone(
std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
- const C2StreamInitDataInfo::output *initData,
- size_t numDiscardedInputBuffers) {
+ const C2StreamInitDataInfo::output *initData) {
if (handleWork(std::move(work), outputFormat, initData)) {
- mAvailablePipelineCapacity.freeInputSlots(numDiscardedInputBuffers,
- "onWorkDone");
feedInputBufferIfAvailable();
}
}
void CCodecBufferChannel::onInputBufferDone(
- const std::shared_ptr<C2Buffer>& buffer) {
+ uint64_t frameIndex, size_t arrayIndex) {
+ std::shared_ptr<C2Buffer> buffer =
+ mPipelineWatcher.lock()->onInputBufferReleased(frameIndex, arrayIndex);
bool newInputSlotAvailable;
{
Mutexed<std::unique_ptr<InputBuffers>>::Locked buffers(mInputBuffers);
newInputSlotAvailable = (*buffers)->expireComponentBuffer(buffer);
- if (newInputSlotAvailable) {
- mAvailablePipelineCapacity.freeInputSlots(1, "onInputBufferDone");
- }
}
if (newInputSlotAvailable) {
feedInputBufferIfAvailable();
@@ -2573,7 +2540,7 @@
if (work->worklets.size() != 1u
|| !work->worklets.front()
|| !(work->worklets.front()->output.flags & C2FrameData::FLAG_INCOMPLETE)) {
- mAvailablePipelineCapacity.freeComponentSlot("handleWork");
+ mPipelineWatcher.lock()->onWorkDone(work->input.ordinal.frameIndex.peeku());
}
if (work->result == C2_NOT_FOUND) {
@@ -2832,6 +2799,10 @@
return OK;
}
+PipelineWatcher::Clock::duration CCodecBufferChannel::elapsed() {
+ return mPipelineWatcher.lock()->elapsed(PipelineWatcher::Clock::now());
+}
+
void CCodecBufferChannel::setMetaMode(MetaMode mode) {
mMetaMode = mode;
}
diff --git a/media/codec2/sfplugin/CCodecBufferChannel.h b/media/codec2/sfplugin/CCodecBufferChannel.h
index ebc1491..9dccab8 100644
--- a/media/codec2/sfplugin/CCodecBufferChannel.h
+++ b/media/codec2/sfplugin/CCodecBufferChannel.h
@@ -34,6 +34,7 @@
#include <media/ICrypto.h>
#include "InputSurfaceWrapper.h"
+#include "PipelineWatcher.h"
namespace android {
@@ -44,7 +45,6 @@
virtual ~CCodecCallback() = default;
virtual void onError(status_t err, enum ActionCode actionCode) = 0;
virtual void onOutputFramesRendered(int64_t mediaTimeUs, nsecs_t renderTimeNs) = 0;
- virtual void onWorkQueued(bool eos) = 0;
virtual void onOutputBuffersChanged() = 0;
};
@@ -128,22 +128,21 @@
* @param workItems finished work item.
* @param outputFormat new output format if it has changed, otherwise nullptr
* @param initData new init data (CSD) if it has changed, otherwise nullptr
- * @param numDiscardedInputBuffers the number of input buffers that are
- * returned for the first time (not previously returned by
- * onInputBufferDone()).
*/
void onWorkDone(
std::unique_ptr<C2Work> work, const sp<AMessage> &outputFormat,
- const C2StreamInitDataInfo::output *initData,
- size_t numDiscardedInputBuffers);
+ const C2StreamInitDataInfo::output *initData);
/**
* Make an input buffer available for the client as it is no longer needed
* by the codec.
*
- * @param buffer The buffer that becomes unused.
+ * @param frameIndex The index of input work
+ * @param arrayIndex The index of buffer in the input work buffers.
*/
- void onInputBufferDone(const std::shared_ptr<C2Buffer>& buffer);
+ void onInputBufferDone(uint64_t frameIndex, size_t arrayIndex);
+
+ PipelineWatcher::Clock::duration elapsed();
enum MetaMode {
MODE_NONE,
@@ -266,79 +265,7 @@
MetaMode mMetaMode;
- // PipelineCapacity is used in the input buffer gating logic.
- //
- // There are three criteria that need to be met before
- // onInputBufferAvailable() is called:
- // 1. The number of input buffers that have been received by
- // CCodecBufferChannel but not returned via onWorkDone() or
- // onInputBufferDone() does not exceed a certain limit. (Let us call this
- // number the "input" capacity.)
- // 2. The number of work items that have been received by
- // CCodecBufferChannel whose outputs have not been returned from the
- // component (by calling onWorkDone()) does not exceed a certain limit.
- // (Let us call this the "component" capacity.)
- //
- // These three criteria guarantee that a new input buffer that arrives from
- // the invocation of onInputBufferAvailable() will not
- // 1. overload CCodecBufferChannel's input buffers;
- // 2. overload the component; or
- //
- struct PipelineCapacity {
- // The number of available input capacity.
- std::atomic_int input;
- // The number of available component capacity.
- std::atomic_int component;
-
- PipelineCapacity();
- // Set the values of #input and #component.
- void initialize(int newInput, int newComponent,
- const char* newName = "<UNKNOWN COMPONENT>",
- const char* callerTag = nullptr);
-
- // Return true and decrease #input and #component by one if
- // they are all greater than zero; return false otherwise.
- //
- // callerTag is used for logging only.
- //
- // allocate() is called by CCodecBufferChannel to check whether it can
- // receive another input buffer. If the return value is true,
- // onInputBufferAvailable() and onOutputBufferAvailable() can be called
- // afterwards.
- bool allocate(const char* callerTag = nullptr);
-
- // Increase #input and #component by one.
- //
- // callerTag is used for logging only.
- //
- // free() is called by CCodecBufferChannel after allocate() returns true
- // but onInputBufferAvailable() cannot be called for any reasons. It
- // essentially undoes an allocate() call.
- void free(const char* callerTag = nullptr);
-
- // Increase #input by @p numDiscardedInputBuffers.
- //
- // callerTag is used for logging only.
- //
- // freeInputSlots() is called by CCodecBufferChannel when onWorkDone()
- // or onInputBufferDone() is called. @p numDiscardedInputBuffers is
- // provided in onWorkDone(), and is 1 in onInputBufferDone().
- int freeInputSlots(size_t numDiscardedInputBuffers,
- const char* callerTag = nullptr);
-
- // Increase #component by one and return the updated value.
- //
- // callerTag is used for logging only.
- //
- // freeComponentSlot() is called by CCodecBufferChannel when
- // onWorkDone() is called.
- int freeComponentSlot(const char* callerTag = nullptr);
-
- private:
- // Component name. Used for logging.
- const char* mName;
- };
- PipelineCapacity mAvailablePipelineCapacity;
+ Mutexed<PipelineWatcher> mPipelineWatcher;
class ReorderStash {
public:
diff --git a/media/codec2/sfplugin/PipelineWatcher.cpp b/media/codec2/sfplugin/PipelineWatcher.cpp
new file mode 100644
index 0000000..fe0a2c8
--- /dev/null
+++ b/media/codec2/sfplugin/PipelineWatcher.cpp
@@ -0,0 +1,145 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#define LOG_NDEBUG 0
+#define LOG_TAG "PipelineWatcher"
+
+#include <numeric>
+
+#include <log/log.h>
+
+#include "PipelineWatcher.h"
+
+namespace android {
+
+PipelineWatcher &PipelineWatcher::inputDelay(uint32_t value) {
+ mInputDelay = value;
+ return *this;
+}
+
+PipelineWatcher &PipelineWatcher::pipelineDelay(uint32_t value) {
+ mPipelineDelay = value;
+ return *this;
+}
+
+PipelineWatcher &PipelineWatcher::outputDelay(uint32_t value) {
+ mOutputDelay = value;
+ return *this;
+}
+
+PipelineWatcher &PipelineWatcher::smoothnessFactor(uint32_t value) {
+ mSmoothnessFactor = value;
+ return *this;
+}
+
+void PipelineWatcher::onWorkQueued(
+ uint64_t frameIndex,
+ std::vector<std::shared_ptr<C2Buffer>> &&buffers,
+ const Clock::time_point &queuedAt) {
+ ALOGV("onWorkQueued(frameIndex=%llu, buffers(size=%zu), queuedAt=%lld)",
+ (unsigned long long)frameIndex,
+ buffers.size(),
+ (long long)queuedAt.time_since_epoch().count());
+ auto it = mFramesInPipeline.find(frameIndex);
+ if (it != mFramesInPipeline.end()) {
+ ALOGD("onWorkQueued: Duplicate frame index (%llu); previous entry removed",
+ (unsigned long long)frameIndex);
+ (void)mFramesInPipeline.erase(it);
+ }
+ (void)mFramesInPipeline.try_emplace(frameIndex, std::move(buffers), queuedAt);
+}
+
+std::shared_ptr<C2Buffer> PipelineWatcher::onInputBufferReleased(
+ uint64_t frameIndex, size_t arrayIndex) {
+ ALOGV("onInputBufferReleased(frameIndex=%llu, arrayIndex=%zu)",
+ (unsigned long long)frameIndex, arrayIndex);
+ auto it = mFramesInPipeline.find(frameIndex);
+ if (it == mFramesInPipeline.end()) {
+ ALOGD("onInputBufferReleased: frameIndex not found (%llu); ignored",
+ (unsigned long long)frameIndex);
+ return nullptr;
+ }
+ if (it->second.buffers.size() <= arrayIndex) {
+ ALOGD("onInputBufferReleased: buffers at %llu: size %zu, requested index: %zu",
+ (unsigned long long)frameIndex, it->second.buffers.size(), arrayIndex);
+ return nullptr;
+ }
+ std::shared_ptr<C2Buffer> buffer(std::move(it->second.buffers[arrayIndex]));
+ ALOGD_IF(!buffer, "onInputBufferReleased: buffer already released (%llu:%zu)",
+ (unsigned long long)frameIndex, arrayIndex);
+ return buffer;
+}
+
+void PipelineWatcher::onWorkDone(uint64_t frameIndex) {
+ ALOGV("onWorkDone(frameIndex=%llu)", (unsigned long long)frameIndex);
+ auto it = mFramesInPipeline.find(frameIndex);
+ if (it == mFramesInPipeline.end()) {
+ ALOGD("onWorkDone: frameIndex not found (%llu); ignored",
+ (unsigned long long)frameIndex);
+ return;
+ }
+ (void)mFramesInPipeline.erase(it);
+}
+
+void PipelineWatcher::flush() {
+ mFramesInPipeline.clear();
+}
+
+bool PipelineWatcher::pipelineFull() const {
+ if (mFramesInPipeline.size() >=
+ mInputDelay + mPipelineDelay + mOutputDelay + mSmoothnessFactor) {
+ ALOGV("pipelineFull: too many frames in pipeline (%zu)", mFramesInPipeline.size());
+ return true;
+ }
+ size_t sizeWithInputReleased = std::count_if(
+ mFramesInPipeline.begin(),
+ mFramesInPipeline.end(),
+ [](const decltype(mFramesInPipeline)::value_type &value) {
+ for (const std::shared_ptr<C2Buffer> &buffer : value.second.buffers) {
+ if (buffer) {
+ return false;
+ }
+ }
+ return true;
+ });
+ if (sizeWithInputReleased >=
+ mPipelineDelay + mOutputDelay + mSmoothnessFactor) {
+ ALOGV("pipelineFull: too many frames in pipeline, with input released (%zu)",
+ sizeWithInputReleased);
+ return true;
+ }
+ ALOGV("pipeline has room (total: %zu, input released: %zu)",
+ mFramesInPipeline.size(), sizeWithInputReleased);
+ return false;
+}
+
+PipelineWatcher::Clock::duration PipelineWatcher::elapsed(
+ const PipelineWatcher::Clock::time_point &now) const {
+ return std::accumulate(
+ mFramesInPipeline.begin(),
+ mFramesInPipeline.end(),
+ Clock::duration::zero(),
+ [&now](const Clock::duration ¤t,
+ const decltype(mFramesInPipeline)::value_type &value) {
+ Clock::duration elapsed = now - value.second.queuedAt;
+ ALOGV("elapsed: frameIndex = %llu elapsed = %lldms",
+ (unsigned long long)value.first,
+ std::chrono::duration_cast<std::chrono::milliseconds>(elapsed).count());
+ return current > elapsed ? current : elapsed;
+ });
+}
+
+} // namespace android
diff --git a/media/codec2/sfplugin/PipelineWatcher.h b/media/codec2/sfplugin/PipelineWatcher.h
new file mode 100644
index 0000000..ce82298
--- /dev/null
+++ b/media/codec2/sfplugin/PipelineWatcher.h
@@ -0,0 +1,78 @@
+/*
+ * Copyright 2019 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef PIPELINE_WATCHER_H_
+#define PIPELINE_WATCHER_H_
+
+#include <chrono>
+#include <map>
+#include <memory>
+
+#include <C2Work.h>
+
+namespace android {
+
+/**
+ * PipelineWatcher watches the status of the work.
+ */
+class PipelineWatcher {
+public:
+ typedef std::chrono::steady_clock Clock;
+
+ PipelineWatcher()
+ : mInputDelay(0),
+ mPipelineDelay(0),
+ mOutputDelay(0),
+ mSmoothnessFactor(0) {}
+ ~PipelineWatcher() = default;
+
+ PipelineWatcher &inputDelay(uint32_t value);
+ PipelineWatcher &pipelineDelay(uint32_t value);
+ PipelineWatcher &outputDelay(uint32_t value);
+ PipelineWatcher &smoothnessFactor(uint32_t value);
+
+ void onWorkQueued(
+ uint64_t frameIndex,
+ std::vector<std::shared_ptr<C2Buffer>> &&buffers,
+ const Clock::time_point &queuedAt);
+ std::shared_ptr<C2Buffer> onInputBufferReleased(
+ uint64_t frameIndex, size_t arrayIndex);
+ void onWorkDone(uint64_t frameIndex);
+ void flush();
+
+ bool pipelineFull() const;
+ Clock::duration elapsed(const Clock::time_point &now) const;
+
+private:
+ uint32_t mInputDelay;
+ uint32_t mPipelineDelay;
+ uint32_t mOutputDelay;
+ uint32_t mSmoothnessFactor;
+
+ struct Frame {
+ Frame(std::vector<std::shared_ptr<C2Buffer>> &&b,
+ const Clock::time_point &q)
+ : buffers(b),
+ queuedAt(q) {}
+ std::vector<std::shared_ptr<C2Buffer>> buffers;
+ const Clock::time_point queuedAt;
+ };
+ std::map<uint64_t, Frame> mFramesInPipeline;
+};
+
+} // namespace android
+
+#endif // PIPELINE_WATCHER_H_