transcoding: add watchdog to prevent transcoder hang

Add a watchdog to monitor transcoder progress. Make the transcoder
report a heartbeat regularly as long as there is new progress.
If the heartbeat stops, the watchdog will initiate a timeout to:

1) Abandon the old TranscoderWrapper. We try to shut it down nicely;
however, if it's really stuck, we'll have to leave it there.
2) Instantiate a new TranscoderWrapper with a new looper.
3) Report the watchdog timeout to the client.

Tests:
- New unit tests for MediaTranscoder and TranscodingSessionController,
and new cases in MediaTranscodingService's simulated test (for error
code reporting).
- Manually tested that long recordings work properly without timing out.

bug: 169453212
Change-Id: Iae89e49e8e12d6078dc49eef2960efd03e91c431
diff --git a/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp b/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp
index 389b941..0efe85d 100644
--- a/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp
+++ b/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp
@@ -83,12 +83,14 @@
     }
 }
 
-bool MediaSampleWriter::init(int fd, const std::weak_ptr<CallbackInterface>& callbacks) {
-    return init(DefaultMuxer::create(fd), callbacks);
+bool MediaSampleWriter::init(int fd, const std::weak_ptr<CallbackInterface>& callbacks,
+                             int64_t heartBeatIntervalUs) {
+    return init(DefaultMuxer::create(fd), callbacks, heartBeatIntervalUs);
 }
 
 bool MediaSampleWriter::init(const std::shared_ptr<MediaSampleWriterMuxerInterface>& muxer,
-                             const std::weak_ptr<CallbackInterface>& callbacks) {
+                             const std::weak_ptr<CallbackInterface>& callbacks,
+                             int64_t heartBeatIntervalUs) {
     if (callbacks.lock() == nullptr) {
         LOG(ERROR) << "Callback object cannot be null";
         return false;
@@ -106,6 +108,7 @@
     mState = INITIALIZED;
     mMuxer = muxer;
     mCallbacks = callbacks;
+    mHeartBeatIntervalUs = heartBeatIntervalUs;
     return true;
 }
 
@@ -219,6 +222,7 @@
 media_status_t MediaSampleWriter::runWriterLoop(bool* wasStopped) NO_THREAD_SAFETY_ANALYSIS {
     AMediaCodecBufferInfo bufferInfo;
     int32_t lastProgressUpdate = 0;
+    bool progressSinceLastReport = false;
     int trackEosCount = 0;
 
     // Set the "primary" track that will be used to determine progress to the track with longest
@@ -232,6 +236,27 @@
         }
     }
 
+    // Use a monotonic clock so that wall-clock adjustments cannot stall or burst heart-beats.
+    const std::chrono::microseconds updateInterval(mHeartBeatIntervalUs);
+    std::chrono::steady_clock::time_point nextUpdateTime =
+            std::chrono::steady_clock::now() + updateInterval;
+
+    // Sends a heart-beat if a sample was written since the last one, then schedules the next
+    // deadline. Deadlines advance in fixed steps to avoid drift, but are re-anchored to "now"
+    // after a long stall so that missed deadlines are not replayed back-to-back.
+    auto sendHeartBeatIfNeeded = [&](std::chrono::steady_clock::time_point now) {
+        if (progressSinceLastReport) {
+            if (auto callbacks = mCallbacks.lock()) {
+                callbacks->onHeartBeat(this);
+            }
+            progressSinceLastReport = false;
+        }
+        nextUpdateTime += updateInterval;
+        if (nextUpdateTime <= now) {
+            nextUpdateTime = now + updateInterval;
+        }
+    };
+
     while (true) {
         if (trackEosCount >= mTracks.size()) {
             break;
@@ -242,7 +267,18 @@
         {
             std::unique_lock lock(mMutex);
             while (mSampleQueue.empty() && mState == STARTED) {
-                mSampleSignal.wait(lock);
+                if (mHeartBeatIntervalUs <= 0) {
+                    mSampleSignal.wait(lock);
+                    continue;
+                }
+
+                if (mSampleSignal.wait_until(lock, nextUpdateTime) == std::cv_status::timeout) {
+                    // Don't hold the queue lock while calling out to the client. The loop
+                    // condition is re-evaluated once the lock has been re-acquired.
+                    lock.unlock();
+                    sendHeartBeatIfNeeded(std::chrono::steady_clock::now());
+                    lock.lock();
+                }
             }
 
             if (mState == STOPPED) {
@@ -306,6 +342,16 @@
                 }
                 lastProgressUpdate = progress;
             }
+            progressSinceLastReport = true;
+
+            // A steadily fed queue never enters the wait loop above, so check the deadline here
+            // as well; otherwise a busy writer would never report a heart-beat.
+            if (mHeartBeatIntervalUs > 0) {
+                const auto now = std::chrono::steady_clock::now();
+                if (now >= nextUpdateTime) {
+                    sendHeartBeatIfNeeded(now);
+                }
+            }
         }
     }
 
diff --git a/media/libmediatranscoding/transcoder/MediaTranscoder.cpp b/media/libmediatranscoding/transcoder/MediaTranscoder.cpp
index d58d88d..74ddce4 100644
--- a/media/libmediatranscoding/transcoder/MediaTranscoder.cpp
+++ b/media/libmediatranscoding/transcoder/MediaTranscoder.cpp
@@ -206,13 +206,18 @@
     mCallbacks->onProgressUpdate(this, progress);
 }
 
-MediaTranscoder::MediaTranscoder(const std::shared_ptr<CallbackInterface>& callbacks, pid_t pid,
-                                 uid_t uid)
-      : mCallbacks(callbacks), mPid(pid), mUid(uid) {}
+void MediaTranscoder::onHeartBeat(const MediaSampleWriter* writer __unused) {
+    // Forward the sample writer's heart-beat to the client callbacks.
+    mCallbacks->onHeartBeat(this);
+}
+
+MediaTranscoder::MediaTranscoder(const std::shared_ptr<CallbackInterface>& callbacks,
+                                 int64_t heartBeatIntervalUs, pid_t pid, uid_t uid)
+      : mCallbacks(callbacks), mHeartBeatIntervalUs(heartBeatIntervalUs), mPid(pid), mUid(uid) {}
 
 std::shared_ptr<MediaTranscoder> MediaTranscoder::create(
-        const std::shared_ptr<CallbackInterface>& callbacks, pid_t pid, uid_t uid,
-        const std::shared_ptr<ndk::ScopedAParcel>& pausedState) {
+        const std::shared_ptr<CallbackInterface>& callbacks, int64_t heartBeatIntervalUs, pid_t pid,
+        uid_t uid, const std::shared_ptr<ndk::ScopedAParcel>& pausedState) {
     if (pausedState != nullptr) {
         LOG(INFO) << "Initializing from paused state.";
     }
@@ -221,7 +226,8 @@
         return nullptr;
     }
 
-    return std::shared_ptr<MediaTranscoder>(new MediaTranscoder(callbacks, pid, uid));
+    return std::shared_ptr<MediaTranscoder>(
+            new MediaTranscoder(callbacks, heartBeatIntervalUs, pid, uid));
 }
 
 media_status_t MediaTranscoder::configureSource(int fd) {
@@ -348,7 +354,7 @@
     }
 
     mSampleWriter = MediaSampleWriter::Create();
-    const bool initOk = mSampleWriter->init(fd, shared_from_this());
+    const bool initOk = mSampleWriter->init(fd, shared_from_this(), mHeartBeatIntervalUs);
 
     if (!initOk) {
         LOG(ERROR) << "Unable to initialize sample writer with destination fd: " << fd;
diff --git a/media/libmediatranscoding/transcoder/benchmark/MediaTranscoderBenchmark.cpp b/media/libmediatranscoding/transcoder/benchmark/MediaTranscoderBenchmark.cpp
index 712f8fc..ac3b2c0 100644
--- a/media/libmediatranscoding/transcoder/benchmark/MediaTranscoderBenchmark.cpp
+++ b/media/libmediatranscoding/transcoder/benchmark/MediaTranscoderBenchmark.cpp
@@ -60,6 +60,8 @@
     virtual void onProgressUpdate(const MediaTranscoder* transcoder __unused,
                                   int32_t progress __unused) override {}
 
+    virtual void onHeartBeat(const MediaTranscoder* transcoder __unused) override {}
+
     virtual void onCodecResourceLost(const MediaTranscoder* transcoder __unused,
                                      const std::shared_ptr<ndk::ScopedAParcel>& pausedState
                                              __unused) override {}
diff --git a/media/libmediatranscoding/transcoder/include/media/MediaSampleWriter.h b/media/libmediatranscoding/transcoder/include/media/MediaSampleWriter.h
index 080f2b7..23a234b 100644
--- a/media/libmediatranscoding/transcoder/include/media/MediaSampleWriter.h
+++ b/media/libmediatranscoding/transcoder/include/media/MediaSampleWriter.h
@@ -90,6 +90,9 @@
         /** Sample writer progress update in percent. */
         virtual void onProgressUpdate(const MediaSampleWriter* writer, int32_t progress) = 0;
 
+        /** Sample writer heart-beat signal, sent periodically while the writer makes progress. */
+        virtual void onHeartBeat(const MediaSampleWriter* writer) = 0;
+
         virtual ~CallbackInterface() = default;
     };
 
@@ -101,18 +104,26 @@
      * @param fd An open file descriptor to write to. The caller is responsible for closing this
      *        file descriptor and it is safe to do so once this method returns.
      * @param callbacks Client callback object that gets called by the sample writer.
+     * @param heartBeatIntervalUs Interval (in microseconds) at which the sample writer calls
+     *        onHeartBeat() to indicate that it is making progress. Values <= 0 disable the
+     *        heart-beat.
      * @return True if the writer was successfully initialized.
      */
-    bool init(int fd, const std::weak_ptr<CallbackInterface>& callbacks /* nonnull */);
+    bool init(int fd, const std::weak_ptr<CallbackInterface>& callbacks /* nonnull */,
+              int64_t heartBeatIntervalUs = -1);
 
     /**
      * Initializes the sample writer with a custom muxer interface implementation.
      * @param muxer The custom muxer interface implementation.
-     * @param @param callbacks Client callback object that gets called by the sample writer.
+     * @param callbacks Client callback object that gets called by the sample writer.
+     * @param heartBeatIntervalUs Interval (in microseconds) at which the sample writer calls
+     *        onHeartBeat() to indicate that it is making progress. Values <= 0 disable the
+     *        heart-beat.
      * @return True if the writer was successfully initialized.
      */
     bool init(const std::shared_ptr<MediaSampleWriterMuxerInterface>& muxer /* nonnull */,
-              const std::weak_ptr<CallbackInterface>& callbacks /* nonnull */);
+              const std::weak_ptr<CallbackInterface>& callbacks /* nonnull */,
+              int64_t heartBeatIntervalUs = -1);
 
     /**
      * Adds a new track to the sample writer. Tracks must be added after the sample writer has been
@@ -185,6 +196,7 @@
 
     std::weak_ptr<CallbackInterface> mCallbacks;
     std::shared_ptr<MediaSampleWriterMuxerInterface> mMuxer;
+    int64_t mHeartBeatIntervalUs = -1;  // <= 0 disables heart-beats.
 
     std::mutex mMutex;  // Protects sample queue and state.
     std::condition_variable mSampleSignal;
diff --git a/media/libmediatranscoding/transcoder/include/media/MediaTranscoder.h b/media/libmediatranscoding/transcoder/include/media/MediaTranscoder.h
index 4e11ef5..8776dc9 100644
--- a/media/libmediatranscoding/transcoder/include/media/MediaTranscoder.h
+++ b/media/libmediatranscoding/transcoder/include/media/MediaTranscoder.h
@@ -50,6 +50,9 @@
         /** Transcoder progress update reported in percent from 0 to 100. */
         virtual void onProgressUpdate(const MediaTranscoder* transcoder, int32_t progress) = 0;
 
+        /** Transcoder heart-beat signal, sent periodically while the transcoder makes progress. */
+        virtual void onHeartBeat(const MediaTranscoder* transcoder) = 0;
+
         /**
          * Transcoder lost codec resources and paused operations. The client can resume transcoding
          * again when resources are available by either:
@@ -70,7 +73,11 @@
      * possible to change any configurations on a paused transcoder.
+     *
+     * @param heartBeatIntervalUs Interval (in microseconds) at which the transcoder calls
+     *        CallbackInterface::onHeartBeat() while it is making progress. Values <= 0 disable
+     *        the heart-beat. It precedes pid/uid, so positional callers must pass it explicitly.
      */
     static std::shared_ptr<MediaTranscoder> create(
-            const std::shared_ptr<CallbackInterface>& callbacks,
+            const std::shared_ptr<CallbackInterface>& callbacks, int64_t heartBeatIntervalUs = -1,
             pid_t pid = AMEDIACODEC_CALLING_PID, uid_t uid = AMEDIACODEC_CALLING_UID,
             const std::shared_ptr<ndk::ScopedAParcel>& pausedState = nullptr);
 
@@ -120,7 +127,8 @@
     virtual ~MediaTranscoder() = default;
 
 private:
-    MediaTranscoder(const std::shared_ptr<CallbackInterface>& callbacks, pid_t pid, uid_t uid);
+    MediaTranscoder(const std::shared_ptr<CallbackInterface>& callbacks,
+                    int64_t heartBeatIntervalUs, pid_t pid, uid_t uid);
 
     // MediaTrackTranscoderCallback
     virtual void onTrackFormatAvailable(const MediaTrackTranscoder* transcoder) override;
@@ -134,6 +142,7 @@
     virtual void onFinished(const MediaSampleWriter* writer, media_status_t status) override;
     virtual void onStopped(const MediaSampleWriter* writer) override;
     virtual void onProgressUpdate(const MediaSampleWriter* writer, int32_t progress) override;
+    virtual void onHeartBeat(const MediaSampleWriter* writer) override;
     // ~MediaSampleWriter::CallbackInterface
 
     void onThreadFinished(const void* thread, media_status_t threadStatus, bool threadStopped);
@@ -147,6 +156,7 @@
     std::vector<std::shared_ptr<MediaTrackTranscoder>> mTrackTranscoders;
     std::mutex mTracksAddedMutex;
     std::unordered_set<const MediaTrackTranscoder*> mTracksAdded GUARDED_BY(mTracksAddedMutex);
+    int64_t mHeartBeatIntervalUs;
     pid_t mPid;
     uid_t mUid;
 
diff --git a/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp b/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp
index 0a41b00..8b3905c 100644
--- a/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp
+++ b/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp
@@ -210,6 +210,8 @@
         mLastProgress = progress;
         mProgressUpdateCount++;
     }
+
+    virtual void onHeartBeat(const MediaSampleWriter* writer __unused) override {}
     // ~MediaSampleWriter::CallbackInterface
 
     void waitForWritingFinished() {
diff --git a/media/libmediatranscoding/transcoder/tests/MediaTranscoderTests.cpp b/media/libmediatranscoding/transcoder/tests/MediaTranscoderTests.cpp
index 54d8b89..4e33ec3 100644
--- a/media/libmediatranscoding/transcoder/tests/MediaTranscoderTests.cpp
+++ b/media/libmediatranscoding/transcoder/tests/MediaTranscoderTests.cpp
@@ -81,6 +81,11 @@
         }
     }
 
+    virtual void onHeartBeat(const MediaTranscoder* transcoder __unused) override {
+        std::unique_lock<std::mutex> lock(mMutex);
+        mHeartBeatCount++;
+    }
+
     virtual void onCodecResourceLost(const MediaTranscoder* transcoder __unused,
                                      const std::shared_ptr<ndk::ScopedAParcel>& pausedState
                                              __unused) override {}
@@ -100,6 +105,7 @@
     }
     media_status_t mStatus = AMEDIA_OK;
     bool mFinished = false;
+    int32_t mHeartBeatCount = 0;
 
 private:
     std::mutex mMutex;
@@ -143,6 +149,7 @@
 
     typedef enum {
         kRunToCompletion,
+        kCheckHeartBeat,
         kCancelAfterProgress,
         kCancelAfterStart,
         kPauseAfterProgress,
@@ -152,8 +159,9 @@
     using FormatConfigurationCallback = std::function<AMediaFormat*(AMediaFormat*)>;
     media_status_t transcodeHelper(const char* srcPath, const char* destPath,
                                    FormatConfigurationCallback formatCallback,
-                                   TranscodeExecutionControl executionControl = kRunToCompletion) {
-        auto transcoder = MediaTranscoder::create(mCallbacks);
+                                   TranscodeExecutionControl executionControl = kRunToCompletion,
+                                   int64_t heartBeatIntervalUs = -1) {
+        auto transcoder = MediaTranscoder::create(mCallbacks, heartBeatIntervalUs);
         EXPECT_NE(transcoder, nullptr);
 
         const int srcFd = open(srcPath, O_RDONLY);
@@ -200,6 +208,18 @@
             case kPauseAfterStart:
                 transcoder->pause(&pausedState);
                 break;
+            case kCheckHeartBeat: {
+                mCallbacks->waitForProgressMade();
+                auto startTime = std::chrono::steady_clock::now();
+                mCallbacks->waitForTranscodingFinished();
+                auto finishTime = std::chrono::steady_clock::now();
+                int32_t expectedCount =
+                        (finishTime - startTime) / std::chrono::microseconds(heartBeatIntervalUs);
+                // Expect about one heart-beat per elapsed interval; allow one fewer in case the
+                // last heart-beat just missed the measurement window.
+                EXPECT_GE(mCallbacks->mHeartBeatCount, expectedCount - 1);
+                break;
+            }
             case kRunToCompletion:
             default:
                 mCallbacks->waitForTranscodingFinished();
@@ -430,6 +450,18 @@
     }
 }
 
+TEST_F(MediaTranscoderTests, TestHeartBeat) {
+    const char* srcPath = "/data/local/tmp/TranscodingTestAssets/longtest_15s.mp4";
+    const char* destPath = "/data/local/tmp/MediaTranscoder_HeartBeat.MP4";
+
+    // Use a shorter value of 500ms than the default 1000ms to get more heart beat for testing.
+    const int64_t heartBeatIntervalUs = 500000LL;
+    EXPECT_EQ(transcodeHelper(srcPath, destPath, getAVCVideoFormat, kCheckHeartBeat,
+                              heartBeatIntervalUs),
+              AMEDIA_OK);
+    EXPECT_TRUE(mCallbacks->mFinished);
+}
+
 }  // namespace android
 
 int main(int argc, char** argv) {
diff --git a/media/libmediatranscoding/transcoder/tests/fuzzer/media_transcoder_fuzzer.cpp b/media/libmediatranscoding/transcoder/tests/fuzzer/media_transcoder_fuzzer.cpp
index 48d3406..ec36c0f 100644
--- a/media/libmediatranscoding/transcoder/tests/fuzzer/media_transcoder_fuzzer.cpp
+++ b/media/libmediatranscoding/transcoder/tests/fuzzer/media_transcoder_fuzzer.cpp
@@ -88,6 +88,8 @@
         }
     }
 
+    virtual void onHeartBeat(const MediaTranscoder* transcoder UNUSED_PARAM) override {}
+
     virtual void onCodecResourceLost(const MediaTranscoder* transcoder UNUSED_PARAM,
                                      const shared_ptr<ndk::ScopedAParcel>& pausedState
                                              UNUSED_PARAM) override {}