codec2: offload fence waiting for input surfaces into another thread
Bug: 131800183
Test: manual
Change-Id: I0d05764eea683210ace5a7d1d9108544cdbaee51
diff --git a/media/codec2/sfplugin/C2OMXNode.cpp b/media/codec2/sfplugin/C2OMXNode.cpp
index 3a93c2a..d27fe32 100644
--- a/media/codec2/sfplugin/C2OMXNode.cpp
+++ b/media/codec2/sfplugin/C2OMXNode.cpp
@@ -36,6 +36,7 @@
#include <media/stagefright/MediaErrors.h>
#include <ui/Fence.h>
#include <ui/GraphicBuffer.h>
+#include <utils/Thread.h>
#include "C2OMXNode.h"
@@ -50,16 +51,126 @@
} // namespace
+// Worker thread that waits on input fences and submits work to the component,
+// so that C2OMXNode::emptyBuffer() no longer blocks on the fence itself.
+// Pending work is kept in one queue per component and submitted in batches.
+class C2OMXNode::QueueThread : public Thread {
+public:
+    QueueThread() : Thread(false) {}
+    ~QueueThread() override = default;
+
+    // Appends |work| to the queue that belongs to |comp| (the queue is created
+    // on first use and stamped with the current time) and wakes up threadLoop().
+    //   fenceFd  fence that is waited on before the work is submitted.
+    //   fd0/fd1  fds kept alongside the work; threadLoop() release()s them
+    //            (gives up ownership without closing) after submission.
+    void queue(
+            const std::shared_ptr<Codec2Client::Component> &comp,
+            int fenceFd,
+            std::unique_ptr<C2Work> &&work,
+            android::base::unique_fd &&fd0,
+            android::base::unique_fd &&fd1) {
+        Mutexed<Jobs>::Locked jobs(mJobs);
+        auto it = jobs->queues.try_emplace(comp, comp, systemTime()).first;
+        it->second.workList.emplace_back(
+                std::move(work), fenceFd, std::move(fd0), std::move(fd1));
+        jobs->cond.broadcast();
+    }
+
+protected:
+    // One pass of the worker. Scans every per-component queue; a queue that is
+    // non-empty and whose timestamp is at least kIntervalNs old is drained, all
+    // of its fences are waited on, and the batch is handed to the component.
+    // If nothing was submitted, waits up to kWaitNs for queue() to signal and
+    // scans once more. Always returns true, i.e. asks Thread to call it again.
+    bool threadLoop() override {
+        constexpr nsecs_t kIntervalNs = nsecs_t(10) * 1000 * 1000;  // 10ms
+        constexpr nsecs_t kWaitNs = kIntervalNs * 2;
+        for (int i = 0; i < 2; ++i) {
+            Mutexed<Jobs>::Locked jobs(mJobs);
+            nsecs_t nowNs = systemTime();
+            bool queued = false;
+            // The iterator is advanced explicitly in the body: erase() already
+            // yields the next element, so an unconditional ++it in the loop
+            // header would skip an entry or step past end().
+            for (auto it = jobs->queues.begin(); it != jobs->queues.end(); ) {
+                Queue &queue = it->second;
+                // NOTE(review): lastQueuedTimestampNs is only set when the
+                // queue is created; nothing in this class refreshes it after a
+                // batch is submitted. Confirm whether the interval is meant to
+                // apply between batches as well.
+                if (queue.workList.empty()
+                        || nowNs - queue.lastQueuedTimestampNs < kIntervalNs) {
+                    ++it;
+                    continue;
+                }
+                std::shared_ptr<Codec2Client::Component> comp = queue.component.lock();
+                if (!comp) {
+                    // The component is gone; drop its queue and continue with
+                    // the element erase() returned.
+                    it = jobs->queues.erase(it);
+                    continue;
+                }
+                // Move everything out of the queue while the lock is held.
+                std::list<std::unique_ptr<C2Work>> items;
+                std::vector<int> fenceFds;
+                std::vector<android::base::unique_fd> uniqueFds;
+                while (!queue.workList.empty()) {
+                    items.push_back(std::move(queue.workList.front().work));
+                    fenceFds.push_back(queue.workList.front().fenceFd);
+                    uniqueFds.push_back(std::move(queue.workList.front().fd0));
+                    uniqueFds.push_back(std::move(queue.workList.front().fd1));
+                    queue.workList.pop_front();
+                }
+
+                // Drop the lock while blocking on the fences and calling into
+                // the component, so that queue() callers are not stalled.
+                jobs.unlock();
+                for (int fenceFd : fenceFds) {
+                    sp<Fence> fence(new Fence(fenceFd));
+                    fence->waitForever(LOG_TAG);
+                }
+                // NOTE(review): the status returned by comp->queue() is ignored.
+                comp->queue(&items);
+                // Give up ownership without closing the fds.
+                for (android::base::unique_fd &ufd : uniqueFds) {
+                    (void)ufd.release();
+                }
+                jobs.lock();
+
+                queued = true;
+                // |it| is still valid here: only this thread erases from the
+                // map, and std::map insertions do not invalidate iterators.
+                ++it;
+            }
+            if (queued) {
+                return true;
+            }
+            if (i == 0) {
+                jobs.waitForConditionRelative(jobs->cond, kWaitNs);
+            }
+        }
+        return true;
+    }
+
+private:
+    // A piece of work together with the fence to wait on before submitting it
+    // and the fds that travel with it.
+    struct WorkFence {
+        WorkFence(std::unique_ptr<C2Work> &&w, int fd) : work(std::move(w)), fenceFd(fd) {}
+
+        WorkFence(
+                std::unique_ptr<C2Work> &&w,
+                int fd,
+                android::base::unique_fd &&uniqueFd0,
+                android::base::unique_fd &&uniqueFd1)
+            : work(std::move(w)),
+              fenceFd(fd),
+              fd0(std::move(uniqueFd0)),
+              fd1(std::move(uniqueFd1)) {}
+
+        std::unique_ptr<C2Work> work;
+        int fenceFd;
+        android::base::unique_fd fd0;
+        android::base::unique_fd fd1;
+    };
+    // Pending work for a single component. Non-copyable.
+    struct Queue {
+        Queue(const std::shared_ptr<Codec2Client::Component> &comp, nsecs_t timestamp)
+            : component(comp), lastQueuedTimestampNs(timestamp) {}
+        Queue(const Queue &) = delete;
+        Queue &operator =(const Queue &) = delete;
+
+        std::weak_ptr<Codec2Client::Component> component;
+        std::list<WorkFence> workList;
+        nsecs_t lastQueuedTimestampNs;
+    };
+    // State shared between queue() callers and threadLoop(), guarded by mJobs.
+    struct Jobs {
+        // Keyed by weak_ptr with owner-based ordering, so the map does not
+        // keep the components alive.
+        std::map<std::weak_ptr<Codec2Client::Component>,
+                 Queue,
+                 std::owner_less<std::weak_ptr<Codec2Client::Component>>> queues;
+        // Broadcast by queue() whenever new work is appended.
+        Condition cond;
+    };
+    Mutexed<Jobs> mJobs;
+};
+
C2OMXNode::C2OMXNode(const std::shared_ptr<Codec2Client::Component> &comp)
: mComp(comp), mFrameIndex(0), mWidth(0), mHeight(0), mUsage(0),
- mAdjustTimestampGapUs(0), mFirstInputFrame(true) {
+ mAdjustTimestampGapUs(0), mFirstInputFrame(true),
+ mQueueThread(new QueueThread) {  // worker that waits on fences and queues work to the component
android_fdsan_set_error_level(ANDROID_FDSAN_ERROR_LEVEL_WARN_ALWAYS);
+ mQueueThread->run("C2OMXNode", PRIORITY_AUDIO);  // start the worker right away; run()'s status is not checked
}
status_t C2OMXNode::freeNode() {
mComp.reset();
android_fdsan_set_error_level(ANDROID_FDSAN_ERROR_LEVEL_WARN_ONCE);
- return OK;
+ return mQueueThread->requestExitAndWait();  // stop the queue thread and wait for it; its status is now the result
}
status_t C2OMXNode::sendCommand(OMX_COMMANDTYPE cmd, OMX_S32 param) {
@@ -216,11 +327,6 @@
status_t C2OMXNode::emptyBuffer(
buffer_id buffer, const OMXBuffer &omxBuf,
OMX_U32 flags, OMX_TICKS timestamp, int fenceFd) {
- // TODO: better fence handling
- if (fenceFd >= 0) {
- sp<Fence> fence = new Fence(fenceFd);
- fence->waitForever(LOG_TAG);
- }
std::shared_ptr<Codec2Client::Component> comp = mComp.lock();
if (!comp) {
return NO_INIT;
@@ -299,22 +405,8 @@
}
work->worklets.clear();
work->worklets.emplace_back(new C2Worklet);
- std::list<std::unique_ptr<C2Work>> items;
- uint64_t index = work->input.ordinal.frameIndex.peeku();
- items.push_back(std::move(work));
-
- c2_status_t err = comp->queue(&items);
- if (err != C2_OK) {
- (void)fd0.release();
- (void)fd1.release();
- return UNKNOWN_ERROR;
- }
-
- mBufferIdsInUse.lock()->emplace(index, buffer);
-
- // release ownership of the fds
- (void)fd0.release();
- (void)fd1.release();
+ mBufferIdsInUse.lock()->emplace(work->input.ordinal.frameIndex.peeku(), buffer);
+ mQueueThread->queue(comp, fenceFd, std::move(work), std::move(fd0), std::move(fd1));
return OK;
}
diff --git a/media/codec2/sfplugin/C2OMXNode.h b/media/codec2/sfplugin/C2OMXNode.h
index 3ca6c0a..1717c96 100644
--- a/media/codec2/sfplugin/C2OMXNode.h
+++ b/media/codec2/sfplugin/C2OMXNode.h
@@ -113,6 +113,9 @@
c2_cntr64_t mPrevCodecTimestamp; // adjusted (codec) timestamp for previous frame
Mutexed<std::map<uint64_t, buffer_id>> mBufferIdsInUse;
+
+ class QueueThread;
+ sp<QueueThread> mQueueThread;
};
} // namespace android