transcoding: add watchdog to prevent transcoder hang

Add a watchdog to monitor transcoder progress. Make transcoder
report heart beat regularly as long as there is new progress.
If heartbeat stops, watchdog will initiate a timeout to

1) Abandon old TranscoderWrapper. We try to shut it down nicely,
however, if it's really stuck, we'll have to leave it there.
2) Instantiate a new TranscoderWrapper with new looper.
3) Report Watchdog timeout to client.

Tests:
- New unit tests to MediaTranscoder, TranscodingSessionController
and MediaTranscodingService's simulated test (for error code reporting).
- Manually tested that long recording works properly without timeout.

bug: 169453212
Change-Id: Iae89e49e8e12d6078dc49eef2960efd03e91c431
diff --git a/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp b/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp
index 9a1c272..2e9daee 100644
--- a/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp
+++ b/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp
@@ -118,46 +118,55 @@
 
 class TestTranscoder : public TranscoderInterface {
 public:
-    TestTranscoder() : mLastError(TranscodingErrorCode::kUnknown) {}
+    TestTranscoder() : mLastError(TranscodingErrorCode::kUnknown), mGeneration(0) {}
     virtual ~TestTranscoder() {}
 
     // TranscoderInterface
-    void setCallback(const std::shared_ptr<TranscoderCallbackInterface>& /*cb*/) override {}
-
     void start(ClientIdType clientId, SessionIdType sessionId,
                const TranscodingRequestParcel& /*request*/,
                const std::shared_ptr<ITranscodingClientCallback>& /*clientCallback*/) override {
-        mEventQueue.push_back(Start(clientId, sessionId));
+        append(Start(clientId, sessionId));
     }
     void pause(ClientIdType clientId, SessionIdType sessionId) override {
-        mEventQueue.push_back(Pause(clientId, sessionId));
+        append(Pause(clientId, sessionId));
     }
     void resume(ClientIdType clientId, SessionIdType sessionId,
                 const TranscodingRequestParcel& /*request*/,
                 const std::shared_ptr<ITranscodingClientCallback>& /*clientCallback*/) override {
-        mEventQueue.push_back(Resume(clientId, sessionId));
+        append(Resume(clientId, sessionId));
     }
-    void stop(ClientIdType clientId, SessionIdType sessionId) override {
-        mEventQueue.push_back(Stop(clientId, sessionId));
+    void stop(ClientIdType clientId, SessionIdType sessionId, bool abandon) override {
+        append(abandon ? Abandon(clientId, sessionId) : Stop(clientId, sessionId));
     }
 
     void onFinished(ClientIdType clientId, SessionIdType sessionId) {
-        mEventQueue.push_back(Finished(clientId, sessionId));
+        append(Finished(clientId, sessionId));
     }
 
     void onFailed(ClientIdType clientId, SessionIdType sessionId, TranscodingErrorCode err) {
-        mLastError = err;
-        mEventQueue.push_back(Failed(clientId, sessionId));
+        append(Failed(clientId, sessionId), err);
+    }
+
+    void onCreated() {
+        std::scoped_lock lock{mLock};
+        mGeneration++;
     }
 
     TranscodingErrorCode getLastError() {
+        std::scoped_lock lock{mLock};
+        // Clear last error.
         TranscodingErrorCode result = mLastError;
-        mLastError = TranscodingErrorCode::kUnknown;
+        mLastError = TranscodingErrorCode::kNoError;
         return result;
     }
 
+    int32_t getGeneration() {
+        std::scoped_lock lock{mLock};
+        return mGeneration;
+    }
+
     struct Event {
-        enum { NoEvent, Start, Pause, Resume, Stop, Finished, Failed } type;
+        enum { NoEvent, Start, Pause, Resume, Stop, Finished, Failed, Abandon } type;
         ClientIdType clientId;
         SessionIdType sessionId;
     };
@@ -175,21 +184,47 @@
     DECLARE_EVENT(Stop);
     DECLARE_EVENT(Finished);
     DECLARE_EVENT(Failed);
+    DECLARE_EVENT(Abandon);
 
-    const Event& popEvent() {
+    // Push 1 event to back.
+    void append(const Event& event,
+                const TranscodingErrorCode err = TranscodingErrorCode::kNoError) {
+        std::unique_lock lock(mLock);
+
+        mEventQueue.push_back(event);
+        // Error is sticky, non-error event will not erase it, only getLastError()
+        // clears last error.
+        if (err != TranscodingErrorCode::kNoError) {
+            mLastError = err;
+        }
+        mCondition.notify_one();
+    }
+
+    // Pop 1 event from front, wait for up to timeoutUs if empty.
+    const Event& popEvent(int64_t timeoutUs = 0) {
+        std::unique_lock lock(mLock);
+
+        if (mEventQueue.empty() && timeoutUs > 0) {
+            mCondition.wait_for(lock, std::chrono::microseconds(timeoutUs));
+        }
+
         if (mEventQueue.empty()) {
             mPoppedEvent = NoEvent;
         } else {
             mPoppedEvent = *mEventQueue.begin();
             mEventQueue.pop_front();
         }
+
         return mPoppedEvent;
     }
 
 private:
+    std::mutex mLock;
+    std::condition_variable mCondition;
     Event mPoppedEvent;
     std::list<Event> mEventQueue;
     TranscodingErrorCode mLastError;
+    int32_t mGeneration;
 };
 
 bool operator==(const TestTranscoder::Event& lhs, const TestTranscoder::Event& rhs) {
@@ -248,6 +283,7 @@
 class TranscodingSessionControllerTest : public ::testing::Test {
 public:
     TranscodingSessionControllerTest() { ALOGI("TranscodingSessionControllerTest created"); }
+    ~TranscodingSessionControllerTest() { ALOGD("TranscodingSessionControllerTest destroyed"); }
 
     void SetUp() override {
         ALOGI("TranscodingSessionControllerTest set up");
@@ -255,8 +291,16 @@
         mUidPolicy.reset(new TestUidPolicy());
         mResourcePolicy.reset(new TestResourcePolicy());
         mThermalPolicy.reset(new TestThermalPolicy());
-        mController.reset(new TranscodingSessionController(mTranscoder, mUidPolicy, mResourcePolicy,
-                                                           mThermalPolicy));
+        mController.reset(new TranscodingSessionController(
+                [this](const std::shared_ptr<TranscoderCallbackInterface>& /*cb*/,
+                       int64_t /*heartBeatIntervalUs*/) {
+                    // Here we require that the SessionController clears out all its refcounts of
+                    // the transcoder object when it calls create.
+                    EXPECT_EQ(mTranscoder.use_count(), 1);
+                    mTranscoder->onCreated();
+                    return mTranscoder;
+                },
+                mUidPolicy, mResourcePolicy, mThermalPolicy));
         mUidPolicy->setCallback(mController);
 
         // Set priority only, ignore other fields for now.
@@ -274,7 +318,15 @@
 
     void TearDown() override { ALOGI("TranscodingSessionControllerTest tear down"); }
 
-    ~TranscodingSessionControllerTest() { ALOGD("TranscodingSessionControllerTest destroyed"); }
+    void expectTimeout(int64_t clientId, int32_t sessionId, int32_t generation) {
+        EXPECT_EQ(mTranscoder->popEvent(2900000), TestTranscoder::NoEvent);
+        EXPECT_EQ(mTranscoder->popEvent(200000), TestTranscoder::Abandon(clientId, sessionId));
+        EXPECT_EQ(mTranscoder->popEvent(100000), TestTranscoder::Failed(clientId, sessionId));
+        EXPECT_EQ(mTranscoder->getLastError(), TranscodingErrorCode::kWatchdogTimeout);
+        // Should have created new transcoder.
+        EXPECT_EQ(mTranscoder->getGeneration(), generation);
+        EXPECT_EQ(mTranscoder.use_count(), 2);
+    }
 
     std::shared_ptr<TestTranscoder> mTranscoder;
     std::shared_ptr<TestUidPolicy> mUidPolicy;
@@ -802,4 +854,52 @@
     EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(2), SESSION(0)));
 }
 
+TEST_F(TranscodingSessionControllerTest, TestTranscoderWatchdogTimeout) {
+    ALOGD("TestTranscoderWatchdogTimeout");
+
+    // Submit session to CLIENT(0) in UID(0).
+    // Should start immediately (because this is the only session).
+    mController->submit(CLIENT(0), SESSION(0), UID(0), mRealtimeRequest, mClientCallback0);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(0)));
+
+    int32_t expectedGen = 2;
+    // Test 1: If not sending keep-alive at all, timeout after 3 seconds.
+    expectTimeout(CLIENT(0), SESSION(0), expectedGen++);
+
+    // Test 2: No timeout as long as keep-alive coming; timeout after keep-alive stops.
+    mController->submit(CLIENT(0), SESSION(1), UID(0), mRealtimeRequest, mClientCallback0);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(1)));
+    for (int i = 0; i < 5; i++) {
+        EXPECT_EQ(mTranscoder->popEvent(1000000), TestTranscoder::NoEvent);
+        mController->onHeartBeat(CLIENT(0), SESSION(1));
+    }
+    expectTimeout(CLIENT(0), SESSION(1), expectedGen++);
+
+    // Test 3a: No timeout for paused session even if no keep-alive is sent.
+    mController->submit(CLIENT(0), SESSION(2), UID(0), mOfflineRequest, mClientCallback0);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(2)));
+    // Trigger a pause by sending a resource lost.
+    mController->onResourceLost(CLIENT(0), SESSION(2));
+    EXPECT_EQ(mTranscoder->popEvent(3100000), TestTranscoder::NoEvent);
+    mController->onResourceAvailable();
+    EXPECT_EQ(mTranscoder->popEvent(100000), TestTranscoder::Resume(CLIENT(0), SESSION(2)));
+    expectTimeout(CLIENT(0), SESSION(2), expectedGen++);
+
+    // Test 3b: No timeout for paused session even if no keep-alive is sent.
+    mController->submit(CLIENT(0), SESSION(3), UID(0), mOfflineRequest, mClientCallback0);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(3)));
+    // Let the session run almost to timeout, to test timeout reset after pause.
+    EXPECT_EQ(mTranscoder->popEvent(2900000), TestTranscoder::NoEvent);
+    // Trigger a pause by submitting a higher-priority request.
+    mController->submit(CLIENT(0), SESSION(4), UID(0), mRealtimeRequest, mClientCallback0);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), SESSION(3)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(4)));
+    // Finish the higher-priority session, lower-priority session should resume,
+    // and the timeout should reset to full value.
+    mController->onFinish(CLIENT(0), SESSION(4));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Finished(CLIENT(0), SESSION(4)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(0), SESSION(3)));
+    expectTimeout(CLIENT(0), SESSION(3), expectedGen++);
+}
+
 }  // namespace android