transcoding: add watchdog to prevent transcoder hang

Add a watchdog to monitor transcoder progress. Make transcoder
report heart beat regularly as long as there is new progress.
If heartbeat stops, watchdog will initiate a timeout to

1) Abandon old TranscoderWrapper. We try to shut it down nicely,
however, if it's really stuck, we'll have to leave it there.
2) Instantiate a new TranscoderWrapper with new looper.
3) Report Watchdog timeout to client.

Tests:
- New unit tests to MediaTranscoder, TranscodingSessionController
and MediaTranscodingService's simulated test (for error code reporting).
- Manually tested that long recording works properly without timeout.

bug: 169453212
Change-Id: Iae89e49e8e12d6078dc49eef2960efd03e91c431
diff --git a/media/libmediatranscoding/TranscoderWrapper.cpp b/media/libmediatranscoding/TranscoderWrapper.cpp
index 8410850..4bd4105 100644
--- a/media/libmediatranscoding/TranscoderWrapper.cpp
+++ b/media/libmediatranscoding/TranscoderWrapper.cpp
@@ -113,6 +113,12 @@
     case Event::Progress:
         typeStr = "Progress";
         break;
+    case Event::HeartBeat:
+        typeStr = "HeartBeat";
+        break;
+    case Event::Abandon:
+        typeStr = "Abandon";
+        break;
     default:
         return "(unknown)";
     }
@@ -154,6 +160,13 @@
         }
     }
 
+    virtual void onHeartBeat(const MediaTranscoder* transcoder __unused) override {
+        auto owner = mOwner.lock();
+        if (owner != nullptr) {
+            owner->onHeartBeat(mClientId, mSessionId);
+        }
+    }
+
     virtual void onCodecResourceLost(const MediaTranscoder* transcoder __unused,
                                      const std::shared_ptr<ndk::ScopedAParcel>& pausedState
                                              __unused) override {
@@ -166,12 +179,18 @@
     SessionIdType mSessionId;
 };
 
-TranscoderWrapper::TranscoderWrapper() : mCurrentClientId(0), mCurrentSessionId(-1) {
-    std::thread(&TranscoderWrapper::threadLoop, this).detach();
+TranscoderWrapper::TranscoderWrapper(const std::shared_ptr<TranscoderCallbackInterface>& cb,
+                                     int64_t heartBeatIntervalUs)
+      : mCallback(cb),
+        mHeartBeatIntervalUs(heartBeatIntervalUs),
+        mCurrentClientId(0),
+        mCurrentSessionId(-1),
+        mLooperReady(false) {
+    ALOGV("TranscoderWrapper CTOR: %p", this);
 }
 
-void TranscoderWrapper::setCallback(const std::shared_ptr<TranscoderCallbackInterface>& cb) {
-    mCallback = cb;
+TranscoderWrapper::~TranscoderWrapper() {
+    ALOGV("TranscoderWrapper DTOR: %p", this);
 }
 
 static bool isResourceError(media_status_t err) {
@@ -250,7 +269,7 @@
     });
 }
 
-void TranscoderWrapper::stop(ClientIdType clientId, SessionIdType sessionId) {
+void TranscoderWrapper::stop(ClientIdType clientId, SessionIdType sessionId, bool abandon) {
     queueEvent(Event::Stop, clientId, sessionId, [=] {
         if (mTranscoder != nullptr && clientId == mCurrentClientId &&
             sessionId == mCurrentSessionId) {
@@ -268,6 +287,10 @@
         }
         // No callback needed for stop.
     });
+
+    if (abandon) {
+        queueEvent(Event::Abandon, 0, 0, nullptr);
+    }
 }
 
 void TranscoderWrapper::onFinish(ClientIdType clientId, SessionIdType sessionId) {
@@ -311,6 +334,15 @@
             progress);
 }
 
+void TranscoderWrapper::onHeartBeat(ClientIdType clientId, SessionIdType sessionId) {
+    queueEvent(Event::HeartBeat, clientId, sessionId, [=] {
+        auto callback = mCallback.lock();
+        if (callback != nullptr) {
+            callback->onHeartBeat(clientId, sessionId);
+        }
+    });
+}
+
 media_status_t TranscoderWrapper::setupTranscoder(
         ClientIdType clientId, SessionIdType sessionId, const TranscodingRequestParcel& request,
         const std::shared_ptr<ITranscodingClientCallback>& clientCb,
@@ -353,8 +385,8 @@
     mCurrentClientId = clientId;
     mCurrentSessionId = sessionId;
     mTranscoderCb = std::make_shared<CallbackImpl>(shared_from_this(), clientId, sessionId);
-    mTranscoder = MediaTranscoder::create(mTranscoderCb, request.clientPid, request.clientUid,
-                                          pausedState);
+    mTranscoder = MediaTranscoder::create(mTranscoderCb, mHeartBeatIntervalUs, request.clientPid,
+                                          request.clientUid, pausedState);
     if (mTranscoder == nullptr) {
         ALOGE("failed to create transcoder");
         return AMEDIA_ERROR_UNKNOWN;
@@ -486,6 +518,15 @@
                                    const std::function<void()> runnable, int32_t arg) {
     std::scoped_lock lock{mLock};
 
+    if (!mLooperReady) {
+        // A shared_ptr to ourselves is given to the thread's stack, so that the TranscoderWrapper
+        // object doesn't go away until the thread exits. When a watchdog timeout happens, this
+        // allows the session controller to release its reference to the TranscoderWrapper object
+        // without blocking on the thread exits.
+        std::thread([owner = shared_from_this()]() { owner->threadLoop(); }).detach();
+        mLooperReady = true;
+    }
+
     mQueue.push_back({type, clientId, sessionId, runnable, arg});
     mCondition.notify_one();
 }
@@ -505,6 +546,10 @@
 
         ALOGD("%s: %s", __FUNCTION__, toString(event).c_str());
 
+        if (event.type == Event::Abandon) {
+            break;
+        }
+
         lock.unlock();
         event.runnable();
         lock.lock();
diff --git a/media/libmediatranscoding/TranscodingSessionController.cpp b/media/libmediatranscoding/TranscodingSessionController.cpp
index 09ad3cd..d12af21 100644
--- a/media/libmediatranscoding/TranscodingSessionController.cpp
+++ b/media/libmediatranscoding/TranscodingSessionController.cpp
@@ -24,6 +24,7 @@
 #include <media/TranscodingUidPolicy.h>
 #include <utils/Log.h>
 
+#include <thread>
 #include <utility>
 
 namespace android {
@@ -60,12 +61,140 @@
     return "(unknown)";
 }
 
+///////////////////////////////////////////////////////////////////////////////
+struct TranscodingSessionController::Watchdog {
+    Watchdog(TranscodingSessionController* owner, int64_t timeoutUs);
+    ~Watchdog();
+
+    // Starts monitoring the session.
+    void start(const SessionKeyType& key);
+    // Stops monitoring the session.
+    void stop();
+    // Signals that the session is still alive. Must be sent at least every mTimeoutUs.
+    // (Timeout will happen if no ping in mTimeoutUs since the last ping.)
+    void keepAlive();
+
+private:
+    void threadLoop();
+    void updateTimer_l();
+
+    TranscodingSessionController* mOwner;
+    const int64_t mTimeoutUs;
+    mutable std::mutex mLock;
+    std::condition_variable mCondition GUARDED_BY(mLock);
+    // Whether watchdog is monitoring a session for timeout.
+    bool mActive GUARDED_BY(mLock);
+    // Whether watchdog is aborted and the monitoring thread should exit.
+    bool mAbort GUARDED_BY(mLock);
+    // When watchdog is active, the next timeout time point.
+    std::chrono::system_clock::time_point mNextTimeoutTime GUARDED_BY(mLock);
+    // When watchdog is active, the session being watched.
+    SessionKeyType mSessionToWatch GUARDED_BY(mLock);
+    std::thread mThread;
+};
+
+static constexpr int64_t kWatchdogTimeoutUs = 3000000LL;
+static constexpr int64_t kTranscoderHeartBeatIntervalUs = 1000000LL;
+
+TranscodingSessionController::Watchdog::Watchdog(TranscodingSessionController* owner,
+                                                 int64_t timeoutUs)
+      : mOwner(owner),
+        mTimeoutUs(timeoutUs),
+        mActive(false),
+        mAbort(false),
+        mThread(&Watchdog::threadLoop, this) {
+    ALOGV("Watchdog CTOR: %p", this);
+}
+
+TranscodingSessionController::Watchdog::~Watchdog() {
+    ALOGV("Watchdog DTOR: %p", this);
+
+    {
+        // Exit the looper thread.
+        std::scoped_lock lock{mLock};
+
+        mAbort = true;
+        mCondition.notify_one();
+    }
+
+    mThread.join();
+    ALOGV("Watchdog DTOR: %p, done.", this);
+}
+
+void TranscodingSessionController::Watchdog::start(const SessionKeyType& key) {
+    std::scoped_lock lock{mLock};
+
+    if (!mActive) {
+        ALOGI("Watchdog start: %s", sessionToString(key).c_str());
+
+        mActive = true;
+        mSessionToWatch = key;
+        updateTimer_l();
+        mCondition.notify_one();
+    }
+}
+
+void TranscodingSessionController::Watchdog::stop() {
+    std::scoped_lock lock{mLock};
+
+    if (mActive) {
+        ALOGI("Watchdog stop: %s", sessionToString(mSessionToWatch).c_str());
+
+        mActive = false;
+        mCondition.notify_one();
+    }
+}
+
+void TranscodingSessionController::Watchdog::keepAlive() {
+    std::scoped_lock lock{mLock};
+
+    if (mActive) {
+        ALOGI("Watchdog keepAlive: %s", sessionToString(mSessionToWatch).c_str());
+
+        updateTimer_l();
+        mCondition.notify_one();
+    }
+}
+
+// updateTimer_l() is only called with lock held.
+void TranscodingSessionController::Watchdog::updateTimer_l() NO_THREAD_SAFETY_ANALYSIS {
+    std::chrono::microseconds timeout(mTimeoutUs);
+    mNextTimeoutTime = std::chrono::system_clock::now() + timeout;
+}
+
+// Unfortunately std::unique_lock is incompatible with -Wthread-safety.
+void TranscodingSessionController::Watchdog::threadLoop() NO_THREAD_SAFETY_ANALYSIS {
+    std::unique_lock<std::mutex> lock{mLock};
+
+    while (!mAbort) {
+        if (!mActive) {
+            mCondition.wait(lock);
+            continue;
+        }
+        // Watchdog active, wait till next timeout time.
+        if (mCondition.wait_until(lock, mNextTimeoutTime) == std::cv_status::timeout) {
+            // If timeout happens, report timeout and deactivate watchdog.
+            mActive = false;
+            // Make a copy of session key, as once we unlock, it could be unprotected.
+            SessionKeyType sessionKey = mSessionToWatch;
+
+            ALOGE("Watchdog timeout: %s", sessionToString(sessionKey).c_str());
+
+            lock.unlock();
+            mOwner->onError(sessionKey.first, sessionKey.second,
+                            TranscodingErrorCode::kWatchdogTimeout);
+            lock.lock();
+        }
+    }
+}
+///////////////////////////////////////////////////////////////////////////////
+
 TranscodingSessionController::TranscodingSessionController(
-        const std::shared_ptr<TranscoderInterface>& transcoder,
+        const TranscoderFactoryType& transcoderFactory,
         const std::shared_ptr<UidPolicyInterface>& uidPolicy,
         const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy,
         const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy)
-      : mTranscoder(transcoder),
+      : mTranscoderFactory(transcoderFactory),
         mUidPolicy(uidPolicy),
         mResourcePolicy(resourcePolicy),
         mThermalPolicy(thermalPolicy),
@@ -160,6 +289,26 @@
     return &mSessionMap[topSessionKey];
 }
 
+void TranscodingSessionController::setSessionState_l(Session* session, Session::State state) {
+    bool wasRunning = (session->getState() == Session::RUNNING);
+    session->setState(state);
+    bool isRunning = (session->getState() == Session::RUNNING);
+
+    if (wasRunning == isRunning) {
+        return;
+    }
+
+    // Currently we only have 1 running session, and we always put the previous
+    // session in non-running state before we run the new session, so it's okay
+    // to start/stop the watchdog here. If this assumption changes, we need to
+    // track the number of running sessions and start/stop watchdog based on that.
+    if (isRunning) {
+        mWatchdog->start(session->key);
+    } else {
+        mWatchdog->stop();
+    }
+}
+
 void TranscodingSessionController::Session::setState(Session::State newState) {
     if (state == newState) {
         return;
@@ -206,12 +355,17 @@
     // take some actions to ensure it's running.
     if (topSession != curSession ||
         (shouldBeRunning ^ (topSession->getState() == Session::RUNNING))) {
+        if (mTranscoder == nullptr) {
+            mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
+            mWatchdog = std::make_shared<Watchdog>(this, kWatchdogTimeoutUs);
+        }
+
         // If current session is running, pause it first. Note this is true for either
         // cases: 1) If top session is changing, or 2) if top session is not changing but
         // the topSession's state is changing.
         if (curSession != nullptr && curSession->getState() == Session::RUNNING) {
             mTranscoder->pause(curSession->key.first, curSession->key.second);
-            curSession->setState(Session::PAUSED);
+            setSessionState_l(curSession, Session::PAUSED);
         }
         // If we are not experiencing resource loss nor thermal throttling, we can start
         // or resume the topSession now.
@@ -223,7 +377,7 @@
                 mTranscoder->resume(topSession->key.first, topSession->key.second,
                                     topSession->request, topSession->callback.lock());
             }
-            topSession->setState(Session::RUNNING);
+            setSessionState_l(topSession, Session::RUNNING);
         }
     }
     mCurrentSession = topSession;
@@ -264,7 +418,7 @@
         mCurrentSession = nullptr;
     }
 
-    mSessionMap[sessionKey].setState(finalState);
+    setSessionState_l(&mSessionMap[sessionKey], finalState);
     mSessionHistory.push_back(mSessionMap[sessionKey]);
     if (mSessionHistory.size() > kSessionHistoryMax) {
         mSessionHistory.erase(mSessionHistory.begin());
@@ -361,7 +515,7 @@
     mSessionMap[sessionKey].pauseCount = 0;
     mSessionMap[sessionKey].request = request;
     mSessionMap[sessionKey].callback = callback;
-    mSessionMap[sessionKey].setState(Session::NOT_STARTED);
+    setSessionState_l(&mSessionMap[sessionKey], Session::NOT_STARTED);
 
     // If it's an offline session, the queue was already added in constructor.
     // If it's a real-time sessions, check if a queue is already present for the uid,
@@ -527,6 +681,15 @@
 void TranscodingSessionController::onError(ClientIdType clientId, SessionIdType sessionId,
                                            TranscodingErrorCode err) {
     notifyClient(clientId, sessionId, "error", [=](const SessionKeyType& sessionKey) {
+        if (err == TranscodingErrorCode::kWatchdogTimeout) {
+            // Abandon the transcoder, as its handler thread might be stuck in some call to
+            // MediaTranscoder altogether, and may not be able to handle any new tasks.
+            mTranscoder->stop(clientId, sessionId, true /*abandon*/);
+            // Clear the last ref count before we create new transcoder.
+            mTranscoder = nullptr;
+            mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
+        }
+
         {
             auto clientCallback = mSessionMap[sessionKey].callback.lock();
             if (clientCallback != nullptr) {
@@ -555,6 +718,11 @@
     });
 }
 
+void TranscodingSessionController::onHeartBeat(ClientIdType clientId, SessionIdType sessionId) {
+    notifyClient(clientId, sessionId, "heart-beat",
+                 [=](const SessionKeyType& /*sessionKey*/) { mWatchdog->keepAlive(); });
+}
+
 void TranscodingSessionController::onResourceLost(ClientIdType clientId, SessionIdType sessionId) {
     ALOGI("%s", __FUNCTION__);
 
@@ -572,7 +740,7 @@
         // If we receive a resource loss event, the transcoder already paused the transcoding,
         // so we don't need to call onPaused() to pause it. However, we still need to notify
         // the client and update the session state here.
-        resourceLostSession->setState(Session::PAUSED);
+        setSessionState_l(resourceLostSession, Session::PAUSED);
         // Notify the client as a paused event.
         auto clientCallback = resourceLostSession->callback.lock();
         if (clientCallback != nullptr) {
diff --git a/media/libmediatranscoding/aidl/android/media/TranscodingErrorCode.aidl b/media/libmediatranscoding/aidl/android/media/TranscodingErrorCode.aidl
index b044d41..23072ff 100644
--- a/media/libmediatranscoding/aidl/android/media/TranscodingErrorCode.aidl
+++ b/media/libmediatranscoding/aidl/android/media/TranscodingErrorCode.aidl
@@ -31,4 +31,5 @@
     kInvalidOperation = 5,
     kErrorIO = 6,
     kInsufficientResources = 7,
+    kWatchdogTimeout = 8,
 }
\ No newline at end of file
diff --git a/media/libmediatranscoding/include/media/TranscoderInterface.h b/media/libmediatranscoding/include/media/TranscoderInterface.h
index 6268aa5..5f27d82 100644
--- a/media/libmediatranscoding/include/media/TranscoderInterface.h
+++ b/media/libmediatranscoding/include/media/TranscoderInterface.h
@@ -32,7 +32,6 @@
 // Interface for the controller to call the transcoder to take actions.
 class TranscoderInterface {
 public:
-    virtual void setCallback(const std::shared_ptr<TranscoderCallbackInterface>& cb) = 0;
     virtual void start(ClientIdType clientId, SessionIdType sessionId,
                        const TranscodingRequestParcel& request,
                        const std::shared_ptr<ITranscodingClientCallback>& clientCallback) = 0;
@@ -40,7 +39,9 @@
     virtual void resume(ClientIdType clientId, SessionIdType sessionId,
                         const TranscodingRequestParcel& request,
                         const std::shared_ptr<ITranscodingClientCallback>& clientCallback) = 0;
-    virtual void stop(ClientIdType clientId, SessionIdType sessionId) = 0;
+    // Stop the specified session. If abandon is true, the transcoder wrapper will be discarded
+    // after the session stops.
+    virtual void stop(ClientIdType clientId, SessionIdType sessionId, bool abandon = false) = 0;
 
 protected:
     virtual ~TranscoderInterface() = default;
@@ -59,6 +60,7 @@
                          TranscodingErrorCode err) = 0;
     virtual void onProgressUpdate(ClientIdType clientId, SessionIdType sessionId,
                                   int32_t progress) = 0;
+    virtual void onHeartBeat(ClientIdType clientId, SessionIdType sessionId) = 0;
 
     // Called when transcoding becomes temporarily inaccessible due to loss of resource.
     // If there is any session currently running, it will be paused. When resource contention
diff --git a/media/libmediatranscoding/include/media/TranscoderWrapper.h b/media/libmediatranscoding/include/media/TranscoderWrapper.h
index 02beede..7935bbe 100644
--- a/media/libmediatranscoding/include/media/TranscoderWrapper.h
+++ b/media/libmediatranscoding/include/media/TranscoderWrapper.h
@@ -36,22 +36,36 @@
 class TranscoderWrapper : public TranscoderInterface,
                           public std::enable_shared_from_this<TranscoderWrapper> {
 public:
-    TranscoderWrapper();
+    TranscoderWrapper(const std::shared_ptr<TranscoderCallbackInterface>& cb,
+                      int64_t heartBeatIntervalUs);
+    ~TranscoderWrapper();
 
-    virtual void setCallback(const std::shared_ptr<TranscoderCallbackInterface>& cb) override;
-    virtual void start(ClientIdType clientId, SessionIdType sessionId,
-                       const TranscodingRequestParcel& request,
-                       const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
-    virtual void pause(ClientIdType clientId, SessionIdType sessionId) override;
-    virtual void resume(ClientIdType clientId, SessionIdType sessionId,
-                        const TranscodingRequestParcel& request,
-                        const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
-    virtual void stop(ClientIdType clientId, SessionIdType sessionId) override;
+    // TranscoderInterface
+    void start(ClientIdType clientId, SessionIdType sessionId,
+               const TranscodingRequestParcel& request,
+               const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
+    void pause(ClientIdType clientId, SessionIdType sessionId) override;
+    void resume(ClientIdType clientId, SessionIdType sessionId,
+                const TranscodingRequestParcel& request,
+                const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
+    void stop(ClientIdType clientId, SessionIdType sessionId, bool abandon = false) override;
+    // ~TranscoderInterface
 
 private:
     class CallbackImpl;
     struct Event {
-        enum Type { NoEvent, Start, Pause, Resume, Stop, Finish, Error, Progress } type;
+        enum Type {
+            NoEvent,
+            Start,
+            Pause,
+            Resume,
+            Stop,
+            Finish,
+            Error,
+            Progress,
+            HeartBeat,
+            Abandon
+        } type;
         ClientIdType clientId;
         SessionIdType sessionId;
         std::function<void()> runnable;
@@ -62,17 +76,21 @@
     std::shared_ptr<CallbackImpl> mTranscoderCb;
     std::shared_ptr<MediaTranscoder> mTranscoder;
     std::weak_ptr<TranscoderCallbackInterface> mCallback;
+    int64_t mHeartBeatIntervalUs;
     std::mutex mLock;
     std::condition_variable mCondition;
     std::list<Event> mQueue;  // GUARDED_BY(mLock);
     std::map<SessionKeyType, std::shared_ptr<ndk::ScopedAParcel>> mPausedStateMap;
     ClientIdType mCurrentClientId;
     SessionIdType mCurrentSessionId;
+    // Whether the looper has been created.
+    bool mLooperReady;
 
     static std::string toString(const Event& event);
     void onFinish(ClientIdType clientId, SessionIdType sessionId);
     void onError(ClientIdType clientId, SessionIdType sessionId, media_status_t status);
     void onProgress(ClientIdType clientId, SessionIdType sessionId, int32_t progress);
+    void onHeartBeat(ClientIdType clientId, SessionIdType sessionId);
 
     media_status_t handleStart(ClientIdType clientId, SessionIdType sessionId,
                                const TranscodingRequestParcel& request,
diff --git a/media/libmediatranscoding/include/media/TranscodingSessionController.h b/media/libmediatranscoding/include/media/TranscodingSessionController.h
index 4fcc423..34e9506 100644
--- a/media/libmediatranscoding/include/media/TranscodingSessionController.h
+++ b/media/libmediatranscoding/include/media/TranscodingSessionController.h
@@ -28,6 +28,7 @@
 #include <utils/Vector.h>
 
 #include <chrono>
+#include <functional>
 #include <list>
 #include <map>
 #include <mutex>
@@ -36,11 +37,13 @@
 using ::aidl::android::media::TranscodingResultParcel;
 using ::aidl::android::media::TranscodingSessionPriority;
 
-class TranscodingSessionController : public UidPolicyCallbackInterface,
-                                     public ControllerClientInterface,
-                                     public TranscoderCallbackInterface,
-                                     public ResourcePolicyCallbackInterface,
-                                     public ThermalPolicyCallbackInterface {
+class TranscodingSessionController
+      : public UidPolicyCallbackInterface,
+        public ControllerClientInterface,
+        public TranscoderCallbackInterface,
+        public ResourcePolicyCallbackInterface,
+        public ThermalPolicyCallbackInterface,
+        public std::enable_shared_from_this<TranscodingSessionController> {
 public:
     virtual ~TranscodingSessionController();
 
@@ -61,6 +64,7 @@
     void onError(ClientIdType clientId, SessionIdType sessionId, TranscodingErrorCode err) override;
     void onProgressUpdate(ClientIdType clientId, SessionIdType sessionId,
                           int32_t progress) override;
+    void onHeartBeat(ClientIdType clientId, SessionIdType sessionId) override;
     void onResourceLost(ClientIdType clientId, SessionIdType sessionId) override;
     // ~TranscoderCallbackInterface
 
@@ -88,6 +92,8 @@
 
     using SessionKeyType = std::pair<ClientIdType, SessionIdType>;
     using SessionQueueType = std::list<SessionKeyType>;
+    using TranscoderFactoryType = std::function<std::shared_ptr<TranscoderInterface>(
+            const std::shared_ptr<TranscoderCallbackInterface>&, int64_t)>;
 
     struct Session {
         enum State {
@@ -121,6 +127,8 @@
         State state = INVALID;
     };
 
+    struct Watchdog;
+
     // TODO(chz): call transcoder without global lock.
     // Use mLock for all entrypoints for now.
     mutable std::mutex mLock;
@@ -136,6 +144,7 @@
     std::list<uid_t>::iterator mOfflineUidIterator;
     std::map<uid_t, std::string> mUidPackageNames;
 
+    TranscoderFactoryType mTranscoderFactory;
     std::shared_ptr<TranscoderInterface> mTranscoder;
     std::shared_ptr<UidPolicyInterface> mUidPolicy;
     std::shared_ptr<ResourcePolicyInterface> mResourcePolicy;
@@ -145,9 +154,10 @@
     bool mResourceLost;
     bool mThermalThrottling;
     std::list<Session> mSessionHistory;
+    std::shared_ptr<Watchdog> mWatchdog;
 
     // Only allow MediaTranscodingService and unit tests to instantiate.
-    TranscodingSessionController(const std::shared_ptr<TranscoderInterface>& transcoder,
+    TranscodingSessionController(const TranscoderFactoryType& transcoderFactory,
                                  const std::shared_ptr<UidPolicyInterface>& uidPolicy,
                                  const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy,
                                  const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy);
@@ -157,6 +167,7 @@
     void updateCurrentSession_l();
     void removeSession_l(const SessionKeyType& sessionKey, Session::State finalState);
     void moveUidsToTop_l(const std::unordered_set<uid_t>& uids, bool preserveTopUid);
+    void setSessionState_l(Session* session, Session::State state);
     void notifyClient(ClientIdType clientId, SessionIdType sessionId, const char* reason,
                       std::function<void(const SessionKeyType&)> func);
     // Internal state verifier (debug only)
diff --git a/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp b/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp
index 9a1c272..2e9daee 100644
--- a/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp
+++ b/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp
@@ -118,46 +118,55 @@
 
 class TestTranscoder : public TranscoderInterface {
 public:
-    TestTranscoder() : mLastError(TranscodingErrorCode::kUnknown) {}
+    TestTranscoder() : mLastError(TranscodingErrorCode::kUnknown), mGeneration(0) {}
     virtual ~TestTranscoder() {}
 
     // TranscoderInterface
-    void setCallback(const std::shared_ptr<TranscoderCallbackInterface>& /*cb*/) override {}
-
     void start(ClientIdType clientId, SessionIdType sessionId,
                const TranscodingRequestParcel& /*request*/,
                const std::shared_ptr<ITranscodingClientCallback>& /*clientCallback*/) override {
-        mEventQueue.push_back(Start(clientId, sessionId));
+        append(Start(clientId, sessionId));
     }
     void pause(ClientIdType clientId, SessionIdType sessionId) override {
-        mEventQueue.push_back(Pause(clientId, sessionId));
+        append(Pause(clientId, sessionId));
     }
     void resume(ClientIdType clientId, SessionIdType sessionId,
                 const TranscodingRequestParcel& /*request*/,
                 const std::shared_ptr<ITranscodingClientCallback>& /*clientCallback*/) override {
-        mEventQueue.push_back(Resume(clientId, sessionId));
+        append(Resume(clientId, sessionId));
     }
-    void stop(ClientIdType clientId, SessionIdType sessionId) override {
-        mEventQueue.push_back(Stop(clientId, sessionId));
+    void stop(ClientIdType clientId, SessionIdType sessionId, bool abandon) override {
+        append(abandon ? Abandon(clientId, sessionId) : Stop(clientId, sessionId));
     }
 
     void onFinished(ClientIdType clientId, SessionIdType sessionId) {
-        mEventQueue.push_back(Finished(clientId, sessionId));
+        append(Finished(clientId, sessionId));
     }
 
     void onFailed(ClientIdType clientId, SessionIdType sessionId, TranscodingErrorCode err) {
-        mLastError = err;
-        mEventQueue.push_back(Failed(clientId, sessionId));
+        append(Failed(clientId, sessionId), err);
+    }
+
+    void onCreated() {
+        std::scoped_lock lock{mLock};
+        mGeneration++;
     }
 
     TranscodingErrorCode getLastError() {
+        std::scoped_lock lock{mLock};
+        // Clear last error.
         TranscodingErrorCode result = mLastError;
-        mLastError = TranscodingErrorCode::kUnknown;
+        mLastError = TranscodingErrorCode::kNoError;
         return result;
     }
 
+    int32_t getGeneration() {
+        std::scoped_lock lock{mLock};
+        return mGeneration;
+    }
+
     struct Event {
-        enum { NoEvent, Start, Pause, Resume, Stop, Finished, Failed } type;
+        enum { NoEvent, Start, Pause, Resume, Stop, Finished, Failed, Abandon } type;
         ClientIdType clientId;
         SessionIdType sessionId;
     };
@@ -175,21 +184,47 @@
     DECLARE_EVENT(Stop);
     DECLARE_EVENT(Finished);
     DECLARE_EVENT(Failed);
+    DECLARE_EVENT(Abandon);
 
-    const Event& popEvent() {
+    // Push 1 event to back.
+    void append(const Event& event,
+                const TranscodingErrorCode err = TranscodingErrorCode::kNoError) {
+        std::unique_lock lock(mLock);
+
+        mEventQueue.push_back(event);
+        // Error is sticky, non-error event will not erase it, only getLastError()
+        // clears last error.
+        if (err != TranscodingErrorCode::kNoError) {
+            mLastError = err;
+        }
+        mCondition.notify_one();
+    }
+
+    // Pop 1 event from front, wait for up to timeoutUs if empty.
+    const Event& popEvent(int64_t timeoutUs = 0) {
+        std::unique_lock lock(mLock);
+
+        if (mEventQueue.empty() && timeoutUs > 0) {
+            mCondition.wait_for(lock, std::chrono::microseconds(timeoutUs));
+        }
+
         if (mEventQueue.empty()) {
             mPoppedEvent = NoEvent;
         } else {
             mPoppedEvent = *mEventQueue.begin();
             mEventQueue.pop_front();
         }
+
         return mPoppedEvent;
     }
 
 private:
+    std::mutex mLock;
+    std::condition_variable mCondition;
     Event mPoppedEvent;
     std::list<Event> mEventQueue;
     TranscodingErrorCode mLastError;
+    int32_t mGeneration;
 };
 
 bool operator==(const TestTranscoder::Event& lhs, const TestTranscoder::Event& rhs) {
@@ -248,6 +283,7 @@
 class TranscodingSessionControllerTest : public ::testing::Test {
 public:
     TranscodingSessionControllerTest() { ALOGI("TranscodingSessionControllerTest created"); }
+    ~TranscodingSessionControllerTest() { ALOGD("TranscodingSessionControllerTest destroyed"); }
 
     void SetUp() override {
         ALOGI("TranscodingSessionControllerTest set up");
@@ -255,8 +291,16 @@
         mUidPolicy.reset(new TestUidPolicy());
         mResourcePolicy.reset(new TestResourcePolicy());
         mThermalPolicy.reset(new TestThermalPolicy());
-        mController.reset(new TranscodingSessionController(mTranscoder, mUidPolicy, mResourcePolicy,
-                                                           mThermalPolicy));
+        mController.reset(new TranscodingSessionController(
+                [this](const std::shared_ptr<TranscoderCallbackInterface>& /*cb*/,
+                       int64_t /*heartBeatIntervalUs*/) {
+                    // Here we require that the SessionController clears out all its refcounts of
+                    // the transcoder object when it calls create.
+                    EXPECT_EQ(mTranscoder.use_count(), 1);
+                    mTranscoder->onCreated();
+                    return mTranscoder;
+                },
+                mUidPolicy, mResourcePolicy, mThermalPolicy));
         mUidPolicy->setCallback(mController);
 
         // Set priority only, ignore other fields for now.
@@ -274,7 +318,15 @@
 
     void TearDown() override { ALOGI("TranscodingSessionControllerTest tear down"); }
 
-    ~TranscodingSessionControllerTest() { ALOGD("TranscodingSessionControllerTest destroyed"); }
+    void expectTimeout(int64_t clientId, int32_t sessionId, int32_t generation) {
+        EXPECT_EQ(mTranscoder->popEvent(2900000), TestTranscoder::NoEvent);
+        EXPECT_EQ(mTranscoder->popEvent(200000), TestTranscoder::Abandon(clientId, sessionId));
+        EXPECT_EQ(mTranscoder->popEvent(100000), TestTranscoder::Failed(clientId, sessionId));
+        EXPECT_EQ(mTranscoder->getLastError(), TranscodingErrorCode::kWatchdogTimeout);
+        // Should have created new transcoder.
+        EXPECT_EQ(mTranscoder->getGeneration(), generation);
+        EXPECT_EQ(mTranscoder.use_count(), 2);
+    }
 
     std::shared_ptr<TestTranscoder> mTranscoder;
     std::shared_ptr<TestUidPolicy> mUidPolicy;
@@ -802,4 +854,52 @@
     EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(2), SESSION(0)));
 }
 
+TEST_F(TranscodingSessionControllerTest, TestTranscoderWatchdogTimeout) {
+    ALOGD("TestTranscoderWatchdogTimeout");
+
+    // Submit session to CLIENT(0) in UID(0).
+    // Should start immediately (because this is the only session).
+    mController->submit(CLIENT(0), SESSION(0), UID(0), mRealtimeRequest, mClientCallback0);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(0)));
+
+    int32_t expectedGen = 2;
+    // Test 1: If not sending keep-alive at all, timeout after 3 seconds.
+    expectTimeout(CLIENT(0), SESSION(0), expectedGen++);
+
+    // Test 2: No timeout as long as keep-alive coming; timeout after keep-alive stops.
+    mController->submit(CLIENT(0), SESSION(1), UID(0), mRealtimeRequest, mClientCallback0);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(1)));
+    for (int i = 0; i < 5; i++) {
+        EXPECT_EQ(mTranscoder->popEvent(1000000), TestTranscoder::NoEvent);
+        mController->onHeartBeat(CLIENT(0), SESSION(1));
+    }
+    expectTimeout(CLIENT(0), SESSION(1), expectedGen++);
+
+    // Test 3a: No timeout for paused session even if no keep-alive is sent.
+    mController->submit(CLIENT(0), SESSION(2), UID(0), mOfflineRequest, mClientCallback0);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(2)));
+    // Trigger a pause by sending a resource lost.
+    mController->onResourceLost(CLIENT(0), SESSION(2));
+    EXPECT_EQ(mTranscoder->popEvent(3100000), TestTranscoder::NoEvent);
+    mController->onResourceAvailable();
+    EXPECT_EQ(mTranscoder->popEvent(100000), TestTranscoder::Resume(CLIENT(0), SESSION(2)));
+    expectTimeout(CLIENT(0), SESSION(2), expectedGen++);
+
+    // Test 3b: No timeout for paused session even if no keep-alive is sent.
+    mController->submit(CLIENT(0), SESSION(3), UID(0), mOfflineRequest, mClientCallback0);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(3)));
+    // Let the session run almost to timeout, to test timeout reset after pause.
+    EXPECT_EQ(mTranscoder->popEvent(2900000), TestTranscoder::NoEvent);
+    // Trigger a pause by submitting a higher-priority request.
+    mController->submit(CLIENT(0), SESSION(4), UID(0), mRealtimeRequest, mClientCallback0);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), SESSION(3)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(4)));
+    // Finish the higher-priority session, lower-priority session should resume,
+    // and the timeout should reset to full value.
+    mController->onFinish(CLIENT(0), SESSION(4));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Finished(CLIENT(0), SESSION(4)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(0), SESSION(3)));
+    expectTimeout(CLIENT(0), SESSION(3), expectedGen++);
+}
+
 }  // namespace android
diff --git a/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp b/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp
index 389b941..0efe85d 100644
--- a/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp
+++ b/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp
@@ -83,12 +83,14 @@
     }
 }
 
-bool MediaSampleWriter::init(int fd, const std::weak_ptr<CallbackInterface>& callbacks) {
-    return init(DefaultMuxer::create(fd), callbacks);
+bool MediaSampleWriter::init(int fd, const std::weak_ptr<CallbackInterface>& callbacks,
+                             int64_t heartBeatIntervalUs) {
+    return init(DefaultMuxer::create(fd), callbacks, heartBeatIntervalUs);
 }
 
 bool MediaSampleWriter::init(const std::shared_ptr<MediaSampleWriterMuxerInterface>& muxer,
-                             const std::weak_ptr<CallbackInterface>& callbacks) {
+                             const std::weak_ptr<CallbackInterface>& callbacks,
+                             int64_t heartBeatIntervalUs) {
     if (callbacks.lock() == nullptr) {
         LOG(ERROR) << "Callback object cannot be null";
         return false;
@@ -106,6 +108,7 @@
     mState = INITIALIZED;
     mMuxer = muxer;
     mCallbacks = callbacks;
+    mHeartBeatIntervalUs = heartBeatIntervalUs;
     return true;
 }
 
@@ -219,6 +222,7 @@
 media_status_t MediaSampleWriter::runWriterLoop(bool* wasStopped) NO_THREAD_SAFETY_ANALYSIS {
     AMediaCodecBufferInfo bufferInfo;
     int32_t lastProgressUpdate = 0;
+    bool progressSinceLastReport = false;
     int trackEosCount = 0;
 
     // Set the "primary" track that will be used to determine progress to the track with longest
@@ -232,6 +236,10 @@
         }
     }
 
+    std::chrono::microseconds updateInterval(mHeartBeatIntervalUs);
+    std::chrono::system_clock::time_point nextUpdateTime =
+            std::chrono::system_clock::now() + updateInterval;
+
     while (true) {
         if (trackEosCount >= mTracks.size()) {
             break;
@@ -242,7 +250,21 @@
         {
             std::unique_lock lock(mMutex);
             while (mSampleQueue.empty() && mState == STARTED) {
-                mSampleSignal.wait(lock);
+                if (mHeartBeatIntervalUs <= 0) {
+                    mSampleSignal.wait(lock);
+                    continue;
+                }
+
+                if (mSampleSignal.wait_until(lock, nextUpdateTime) == std::cv_status::timeout) {
+                    // Send heart-beat if there is any progress since last update time.
+                    if (progressSinceLastReport) {
+                        if (auto callbacks = mCallbacks.lock()) {
+                            callbacks->onHeartBeat(this);
+                        }
+                        progressSinceLastReport = false;
+                    }
+                    nextUpdateTime += updateInterval;
+                }
             }
 
             if (mState == STOPPED) {
@@ -306,6 +328,7 @@
                 }
                 lastProgressUpdate = progress;
             }
+            progressSinceLastReport = true;
         }
     }
 
diff --git a/media/libmediatranscoding/transcoder/MediaTranscoder.cpp b/media/libmediatranscoding/transcoder/MediaTranscoder.cpp
index d58d88d..74ddce4 100644
--- a/media/libmediatranscoding/transcoder/MediaTranscoder.cpp
+++ b/media/libmediatranscoding/transcoder/MediaTranscoder.cpp
@@ -206,13 +206,18 @@
     mCallbacks->onProgressUpdate(this, progress);
 }
 
-MediaTranscoder::MediaTranscoder(const std::shared_ptr<CallbackInterface>& callbacks, pid_t pid,
-                                 uid_t uid)
-      : mCallbacks(callbacks), mPid(pid), mUid(uid) {}
+void MediaTranscoder::onHeartBeat(const MediaSampleWriter* writer __unused) {
+    // Signal heart-beat to the client.
+    mCallbacks->onHeartBeat(this);
+}
+
+MediaTranscoder::MediaTranscoder(const std::shared_ptr<CallbackInterface>& callbacks,
+                                 int64_t heartBeatIntervalUs, pid_t pid, uid_t uid)
+      : mCallbacks(callbacks), mHeartBeatIntervalUs(heartBeatIntervalUs), mPid(pid), mUid(uid) {}
 
 std::shared_ptr<MediaTranscoder> MediaTranscoder::create(
-        const std::shared_ptr<CallbackInterface>& callbacks, pid_t pid, uid_t uid,
-        const std::shared_ptr<ndk::ScopedAParcel>& pausedState) {
+        const std::shared_ptr<CallbackInterface>& callbacks, int64_t heartBeatIntervalUs, pid_t pid,
+        uid_t uid, const std::shared_ptr<ndk::ScopedAParcel>& pausedState) {
     if (pausedState != nullptr) {
         LOG(INFO) << "Initializing from paused state.";
     }
@@ -221,7 +226,8 @@
         return nullptr;
     }
 
-    return std::shared_ptr<MediaTranscoder>(new MediaTranscoder(callbacks, pid, uid));
+    return std::shared_ptr<MediaTranscoder>(
+            new MediaTranscoder(callbacks, heartBeatIntervalUs, pid, uid));
 }
 
 media_status_t MediaTranscoder::configureSource(int fd) {
@@ -348,7 +354,7 @@
     }
 
     mSampleWriter = MediaSampleWriter::Create();
-    const bool initOk = mSampleWriter->init(fd, shared_from_this());
+    const bool initOk = mSampleWriter->init(fd, shared_from_this(), mHeartBeatIntervalUs);
 
     if (!initOk) {
         LOG(ERROR) << "Unable to initialize sample writer with destination fd: " << fd;
diff --git a/media/libmediatranscoding/transcoder/benchmark/MediaTranscoderBenchmark.cpp b/media/libmediatranscoding/transcoder/benchmark/MediaTranscoderBenchmark.cpp
index 712f8fc..ac3b2c0 100644
--- a/media/libmediatranscoding/transcoder/benchmark/MediaTranscoderBenchmark.cpp
+++ b/media/libmediatranscoding/transcoder/benchmark/MediaTranscoderBenchmark.cpp
@@ -60,6 +60,8 @@
     virtual void onProgressUpdate(const MediaTranscoder* transcoder __unused,
                                   int32_t progress __unused) override {}
 
+    virtual void onHeartBeat(const MediaTranscoder* transcoder __unused) override {}
+
     virtual void onCodecResourceLost(const MediaTranscoder* transcoder __unused,
                                      const std::shared_ptr<ndk::ScopedAParcel>& pausedState
                                              __unused) override {}
diff --git a/media/libmediatranscoding/transcoder/include/media/MediaSampleWriter.h b/media/libmediatranscoding/transcoder/include/media/MediaSampleWriter.h
index 080f2b7..23a234b 100644
--- a/media/libmediatranscoding/transcoder/include/media/MediaSampleWriter.h
+++ b/media/libmediatranscoding/transcoder/include/media/MediaSampleWriter.h
@@ -90,6 +90,9 @@
         /** Sample writer progress update in percent. */
         virtual void onProgressUpdate(const MediaSampleWriter* writer, int32_t progress) = 0;
 
+        /** Sample writer heart-beat signal. */
+        virtual void onHeartBeat(const MediaSampleWriter* writer) = 0;
+
         virtual ~CallbackInterface() = default;
     };
 
@@ -101,18 +104,25 @@
      * @param fd An open file descriptor to write to. The caller is responsible for closing this
      *        file descriptor and it is safe to do so once this method returns.
      * @param callbacks Client callback object that gets called by the sample writer.
+     * @param heartBeatIntervalUs Interval (in microsecond) at which the sample writer should send a
+     *        heart-beat to onProgressUpdate() to indicate it's making progress. Value <=0 indicates
+     *        that the heartbeat is not required.
      * @return True if the writer was successfully initialized.
      */
-    bool init(int fd, const std::weak_ptr<CallbackInterface>& callbacks /* nonnull */);
+    bool init(int fd, const std::weak_ptr<CallbackInterface>& callbacks /* nonnull */,
+              int64_t heartBeatIntervalUs = -1);
 
     /**
      * Initializes the sample writer with a custom muxer interface implementation.
      * @param muxer The custom muxer interface implementation.
      * @param @param callbacks Client callback object that gets called by the sample writer.
+     * @param heartBeatIntervalUs Interval (in microsecond) at which the sample writer should send a
+     *        heart-beat to onProgressUpdate() to indicate it's making progress.
      * @return True if the writer was successfully initialized.
      */
     bool init(const std::shared_ptr<MediaSampleWriterMuxerInterface>& muxer /* nonnull */,
-              const std::weak_ptr<CallbackInterface>& callbacks /* nonnull */);
+              const std::weak_ptr<CallbackInterface>& callbacks /* nonnull */,
+              int64_t heartBeatIntervalUs = -1);
 
     /**
      * Adds a new track to the sample writer. Tracks must be added after the sample writer has been
@@ -185,6 +195,7 @@
 
     std::weak_ptr<CallbackInterface> mCallbacks;
     std::shared_ptr<MediaSampleWriterMuxerInterface> mMuxer;
+    int64_t mHeartBeatIntervalUs;
 
     std::mutex mMutex;  // Protects sample queue and state.
     std::condition_variable mSampleSignal;
diff --git a/media/libmediatranscoding/transcoder/include/media/MediaTranscoder.h b/media/libmediatranscoding/transcoder/include/media/MediaTranscoder.h
index 4e11ef5..8776dc9 100644
--- a/media/libmediatranscoding/transcoder/include/media/MediaTranscoder.h
+++ b/media/libmediatranscoding/transcoder/include/media/MediaTranscoder.h
@@ -50,6 +50,9 @@
         /** Transcoder progress update reported in percent from 0 to 100. */
         virtual void onProgressUpdate(const MediaTranscoder* transcoder, int32_t progress) = 0;
 
+        /** Transcoder heart-beat signal. */
+        virtual void onHeartBeat(const MediaTranscoder* transcoder) = 0;
+
         /**
          * Transcoder lost codec resources and paused operations. The client can resume transcoding
          * again when resources are available by either:
@@ -70,7 +73,7 @@
      * possible to change any configurations on a paused transcoder.
      */
     static std::shared_ptr<MediaTranscoder> create(
-            const std::shared_ptr<CallbackInterface>& callbacks,
+            const std::shared_ptr<CallbackInterface>& callbacks, int64_t heartBeatIntervalUs = -1,
             pid_t pid = AMEDIACODEC_CALLING_PID, uid_t uid = AMEDIACODEC_CALLING_UID,
             const std::shared_ptr<ndk::ScopedAParcel>& pausedState = nullptr);
 
@@ -120,7 +123,8 @@
     virtual ~MediaTranscoder() = default;
 
 private:
-    MediaTranscoder(const std::shared_ptr<CallbackInterface>& callbacks, pid_t pid, uid_t uid);
+    MediaTranscoder(const std::shared_ptr<CallbackInterface>& callbacks,
+                    int64_t heartBeatIntervalUs, pid_t pid, uid_t uid);
 
     // MediaTrackTranscoderCallback
     virtual void onTrackFormatAvailable(const MediaTrackTranscoder* transcoder) override;
@@ -134,6 +138,7 @@
     virtual void onFinished(const MediaSampleWriter* writer, media_status_t status) override;
     virtual void onStopped(const MediaSampleWriter* writer) override;
     virtual void onProgressUpdate(const MediaSampleWriter* writer, int32_t progress) override;
+    virtual void onHeartBeat(const MediaSampleWriter* writer) override;
     // ~MediaSampleWriter::CallbackInterface
 
     void onThreadFinished(const void* thread, media_status_t threadStatus, bool threadStopped);
@@ -147,6 +152,7 @@
     std::vector<std::shared_ptr<MediaTrackTranscoder>> mTrackTranscoders;
     std::mutex mTracksAddedMutex;
     std::unordered_set<const MediaTrackTranscoder*> mTracksAdded GUARDED_BY(mTracksAddedMutex);
+    int64_t mHeartBeatIntervalUs;
     pid_t mPid;
     uid_t mUid;
 
diff --git a/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp b/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp
index 0a41b00..8b3905c 100644
--- a/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp
+++ b/media/libmediatranscoding/transcoder/tests/MediaSampleWriterTests.cpp
@@ -210,6 +210,8 @@
         mLastProgress = progress;
         mProgressUpdateCount++;
     }
+
+    virtual void onHeartBeat(const MediaSampleWriter* writer __unused) override {}
     // ~MediaSampleWriter::CallbackInterface
 
     void waitForWritingFinished() {
diff --git a/media/libmediatranscoding/transcoder/tests/MediaTranscoderTests.cpp b/media/libmediatranscoding/transcoder/tests/MediaTranscoderTests.cpp
index 54d8b89..4e33ec3 100644
--- a/media/libmediatranscoding/transcoder/tests/MediaTranscoderTests.cpp
+++ b/media/libmediatranscoding/transcoder/tests/MediaTranscoderTests.cpp
@@ -81,6 +81,11 @@
         }
     }
 
+    virtual void onHeartBeat(const MediaTranscoder* transcoder __unused) override {
+        std::unique_lock<std::mutex> lock(mMutex);
+        mHeartBeatCount++;
+    }
+
     virtual void onCodecResourceLost(const MediaTranscoder* transcoder __unused,
                                      const std::shared_ptr<ndk::ScopedAParcel>& pausedState
                                              __unused) override {}
@@ -100,6 +105,7 @@
     }
     media_status_t mStatus = AMEDIA_OK;
     bool mFinished = false;
+    int32_t mHeartBeatCount = 0;
 
 private:
     std::mutex mMutex;
@@ -143,6 +149,7 @@
 
     typedef enum {
         kRunToCompletion,
+        kCheckHeartBeat,
         kCancelAfterProgress,
         kCancelAfterStart,
         kPauseAfterProgress,
@@ -152,8 +159,9 @@
     using FormatConfigurationCallback = std::function<AMediaFormat*(AMediaFormat*)>;
     media_status_t transcodeHelper(const char* srcPath, const char* destPath,
                                    FormatConfigurationCallback formatCallback,
-                                   TranscodeExecutionControl executionControl = kRunToCompletion) {
-        auto transcoder = MediaTranscoder::create(mCallbacks);
+                                   TranscodeExecutionControl executionControl = kRunToCompletion,
+                                   int64_t heartBeatIntervalUs = -1) {
+        auto transcoder = MediaTranscoder::create(mCallbacks, heartBeatIntervalUs);
         EXPECT_NE(transcoder, nullptr);
 
         const int srcFd = open(srcPath, O_RDONLY);
@@ -200,6 +208,18 @@
             case kPauseAfterStart:
                 transcoder->pause(&pausedState);
                 break;
+            case kCheckHeartBeat: {
+                mCallbacks->waitForProgressMade();
+                auto startTime = std::chrono::system_clock::now();
+                mCallbacks->waitForTranscodingFinished();
+                auto finishTime = std::chrono::system_clock::now();
+                int32_t expectedCount =
+                        (finishTime - startTime) / std::chrono::microseconds(heartBeatIntervalUs);
+                // Here we relax the expected count by 1, in case the last heart-beat just
+                // missed the window, other than that the count should be exact.
+                EXPECT_GE(mCallbacks->mHeartBeatCount, expectedCount - 1);
+                break;
+            }
             case kRunToCompletion:
             default:
                 mCallbacks->waitForTranscodingFinished();
@@ -430,6 +450,18 @@
     }
 }
 
+TEST_F(MediaTranscoderTests, TestHeartBeat) {
+    const char* srcPath = "/data/local/tmp/TranscodingTestAssets/longtest_15s.mp4";
+    const char* destPath = "/data/local/tmp/MediaTranscoder_HeartBeat.MP4";
+
+    // Use a shorter value of 500ms than the default 1000ms to get more heart beat for testing.
+    const int64_t heartBeatIntervalUs = 500000LL;
+    EXPECT_EQ(transcodeHelper(srcPath, destPath, getAVCVideoFormat, kCheckHeartBeat,
+                              heartBeatIntervalUs),
+              AMEDIA_OK);
+    EXPECT_TRUE(mCallbacks->mFinished);
+}
+
 }  // namespace android
 
 int main(int argc, char** argv) {
diff --git a/media/libmediatranscoding/transcoder/tests/fuzzer/media_transcoder_fuzzer.cpp b/media/libmediatranscoding/transcoder/tests/fuzzer/media_transcoder_fuzzer.cpp
index 48d3406..ec36c0f 100644
--- a/media/libmediatranscoding/transcoder/tests/fuzzer/media_transcoder_fuzzer.cpp
+++ b/media/libmediatranscoding/transcoder/tests/fuzzer/media_transcoder_fuzzer.cpp
@@ -88,6 +88,8 @@
         }
     }
 
+    virtual void onHeartBeat(const MediaTranscoder* transcoder UNUSED_PARAM) override {}
+
     virtual void onCodecResourceLost(const MediaTranscoder* transcoder UNUSED_PARAM,
                                      const shared_ptr<ndk::ScopedAParcel>& pausedState
                                              UNUSED_PARAM) override {}
diff --git a/services/mediatranscoding/MediaTranscodingService.cpp b/services/mediatranscoding/MediaTranscodingService.cpp
index 5c8cc1a..cca36fb 100644
--- a/services/mediatranscoding/MediaTranscodingService.cpp
+++ b/services/mediatranscoding/MediaTranscodingService.cpp
@@ -41,16 +41,21 @@
             errorCode,                                \
             String8::format("%s:%d: " errorString, __FUNCTION__, __LINE__, ##__VA_ARGS__))
 
-MediaTranscodingService::MediaTranscodingService(
-        const std::shared_ptr<TranscoderInterface>& transcoder)
+MediaTranscodingService::MediaTranscodingService(bool simulated)
       : mUidPolicy(new TranscodingUidPolicy()),
         mResourcePolicy(new TranscodingResourcePolicy()),
-        mThermalPolicy(new TranscodingThermalPolicy()),
-        mSessionController(new TranscodingSessionController(transcoder, mUidPolicy, mResourcePolicy,
-                                                            mThermalPolicy)),
-        mClientManager(new TranscodingClientManager(mSessionController)) {
+        mThermalPolicy(new TranscodingThermalPolicy()) {
     ALOGV("MediaTranscodingService is created");
-    transcoder->setCallback(mSessionController);
+    mSessionController.reset(new TranscodingSessionController(
+            [simulated](const std::shared_ptr<TranscoderCallbackInterface>& cb,
+                        int64_t heartBeatUs) -> std::shared_ptr<TranscoderInterface> {
+                if (simulated) {
+                    return std::make_shared<SimulatedTranscoder>(cb, heartBeatUs);
+                }
+                return std::make_shared<TranscoderWrapper>(cb, heartBeatUs);
+            },
+            mUidPolicy, mResourcePolicy, mThermalPolicy));
+    mClientManager.reset(new TranscodingClientManager(mSessionController));
     mUidPolicy->setCallback(mSessionController);
     mResourcePolicy->setCallback(mSessionController);
     mThermalPolicy->setCallback(mSessionController);
@@ -94,15 +99,9 @@
 
 //static
 void MediaTranscodingService::instantiate() {
-    std::shared_ptr<TranscoderInterface> transcoder;
-    if (property_get_bool("debug.transcoding.simulated_transcoder", false)) {
-        transcoder = std::make_shared<SimulatedTranscoder>();
-    } else {
-        transcoder = std::make_shared<TranscoderWrapper>();
-    }
-
     std::shared_ptr<MediaTranscodingService> service =
-            ::ndk::SharedRefBase::make<MediaTranscodingService>(transcoder);
+            ::ndk::SharedRefBase::make<MediaTranscodingService>(
+                    property_get_bool("debug.transcoding.simulated_transcoder", false));
     binder_status_t status =
             AServiceManager_addService(service->asBinder().get(), getServiceName());
     if (status != STATUS_OK) {
diff --git a/services/mediatranscoding/MediaTranscodingService.h b/services/mediatranscoding/MediaTranscodingService.h
index a22acf2..d024c54 100644
--- a/services/mediatranscoding/MediaTranscodingService.h
+++ b/services/mediatranscoding/MediaTranscodingService.h
@@ -30,7 +30,6 @@
 using ::aidl::android::media::TranscodingSessionParcel;
 class TranscodingClientManager;
 class TranscodingSessionController;
-class TranscoderInterface;
 class UidPolicyInterface;
 class ResourcePolicyInterface;
 class ThermalPolicyInterface;
@@ -40,7 +39,7 @@
     static constexpr int32_t kInvalidSessionId = -1;
     static constexpr int32_t kInvalidClientId = -1;
 
-    MediaTranscodingService(const std::shared_ptr<TranscoderInterface>& transcoder);
+    MediaTranscodingService(bool simulated);
     virtual ~MediaTranscodingService();
 
     static void instantiate();
diff --git a/services/mediatranscoding/SimulatedTranscoder.cpp b/services/mediatranscoding/SimulatedTranscoder.cpp
index 03ee886..1de1f7b 100644
--- a/services/mediatranscoding/SimulatedTranscoder.cpp
+++ b/services/mediatranscoding/SimulatedTranscoder.cpp
@@ -33,18 +33,28 @@
         return "Pause";
     case Event::Resume:
         return "Resume";
+    case Event::Stop:
+        return "Stop";
+    case Event::Finished:
+        return "Finished";
+    case Event::Failed:
+        return "Failed";
+    case Event::Abandon:
+        return "Abandon";
     default:
         break;
     }
     return "(unknown)";
 }
 
-SimulatedTranscoder::SimulatedTranscoder() {
-    std::thread(&SimulatedTranscoder::threadLoop, this).detach();
+SimulatedTranscoder::SimulatedTranscoder(const std::shared_ptr<TranscoderCallbackInterface>& cb,
+                                         int64_t heartBeatUs __unused)
+      : mCallback(cb), mLooperReady(false) {
+    ALOGV("SimulatedTranscoder CTOR: %p", this);
 }
 
-void SimulatedTranscoder::setCallback(const std::shared_ptr<TranscoderCallbackInterface>& cb) {
-    mCallback = cb;
+SimulatedTranscoder::~SimulatedTranscoder() {
+    ALOGV("SimulatedTranscoder DTOR: %p", this);
 }
 
 void SimulatedTranscoder::start(
@@ -83,8 +93,12 @@
     });
 }
 
-void SimulatedTranscoder::stop(ClientIdType clientId, SessionIdType sessionId) {
+void SimulatedTranscoder::stop(ClientIdType clientId, SessionIdType sessionId, bool abandon) {
     queueEvent(Event::Stop, clientId, sessionId, nullptr);
+
+    if (abandon) {
+        queueEvent(Event::Abandon, 0, 0, nullptr);
+    }
 }
 
 void SimulatedTranscoder::queueEvent(Event::Type type, ClientIdType clientId,
@@ -94,6 +108,15 @@
 
     auto lock = std::scoped_lock(mLock);
 
+    if (!mLooperReady) {
+        // A shared_ptr to ourselves is given to the thread's stack, so that SimulatedTranscoder
+        // object doesn't go away until the thread exits. When a watchdog timeout happens, this
+        // allows the session controller to release its reference to the TranscoderWrapper object
+        // without blocking on the thread exits.
+        std::thread([owner = shared_from_this()]() { owner->threadLoop(); }).detach();
+        mLooperReady = true;
+    }
+
     mQueue.push_back({type, clientId, sessionId, runnable});
     mCondition.notify_one();
 }
@@ -136,34 +159,36 @@
         }
 
         // Handle the events, adjust state and send updates to client accordingly.
-        while (!mQueue.empty()) {
-            Event event = *mQueue.begin();
-            mQueue.pop_front();
+        Event event = *mQueue.begin();
+        mQueue.pop_front();
 
-            ALOGV("%s: session {%lld, %d}: %s", __FUNCTION__, (long long)event.clientId,
-                  event.sessionId, toString(event.type));
+        ALOGV("%s: session {%lld, %d}: %s", __FUNCTION__, (long long)event.clientId,
+              event.sessionId, toString(event.type));
 
-            if (!running && (event.type == Event::Start || event.type == Event::Resume)) {
-                running = true;
-                lastRunningTime = std::chrono::system_clock::now();
-                lastRunningEvent = event;
-                if (event.type == Event::Start) {
-                    remainingUs = std::chrono::milliseconds(mSessionProcessingTimeMs);
-                }
-            } else if (running && (event.type == Event::Pause || event.type == Event::Stop)) {
-                running = false;
-                remainingUs -= (std::chrono::system_clock::now() - lastRunningTime);
-            } else {
-                ALOGW("%s: discarding bad event: session {%lld, %d}: %s", __FUNCTION__,
-                      (long long)event.clientId, event.sessionId, toString(event.type));
-                continue;
+        if (event.type == Event::Abandon) {
+            break;
+        }
+
+        if (!running && (event.type == Event::Start || event.type == Event::Resume)) {
+            running = true;
+            lastRunningTime = std::chrono::system_clock::now();
+            lastRunningEvent = event;
+            if (event.type == Event::Start) {
+                remainingUs = std::chrono::milliseconds(mSessionProcessingTimeMs);
             }
+        } else if (running && (event.type == Event::Pause || event.type == Event::Stop)) {
+            running = false;
+            remainingUs -= (std::chrono::system_clock::now() - lastRunningTime);
+        } else {
+            ALOGW("%s: discarding bad event: session {%lld, %d}: %s", __FUNCTION__,
+                  (long long)event.clientId, event.sessionId, toString(event.type));
+            continue;
+        }
 
-            if (event.runnable != nullptr) {
-                lock.unlock();
-                event.runnable();
-                lock.lock();
-            }
+        if (event.runnable != nullptr) {
+            lock.unlock();
+            event.runnable();
+            lock.lock();
         }
     }
 }
diff --git a/services/mediatranscoding/SimulatedTranscoder.h b/services/mediatranscoding/SimulatedTranscoder.h
index ba2bba0..6b51b4e 100644
--- a/services/mediatranscoding/SimulatedTranscoder.h
+++ b/services/mediatranscoding/SimulatedTranscoder.h
@@ -36,10 +36,11 @@
  * Session lifecycle events are reported via progress updates with special progress
  * numbers (equal to the Event's type).
  */
-class SimulatedTranscoder : public TranscoderInterface {
+class SimulatedTranscoder : public TranscoderInterface,
+                            public std::enable_shared_from_this<SimulatedTranscoder> {
 public:
     struct Event {
-        enum Type { NoEvent, Start, Pause, Resume, Stop, Finished, Failed } type;
+        enum Type { NoEvent, Start, Pause, Resume, Stop, Finished, Failed, Abandon } type;
         ClientIdType clientId;
         SessionIdType sessionId;
         std::function<void()> runnable;
@@ -47,10 +48,11 @@
 
     static constexpr int64_t kSessionDurationUs = 1000000;
 
-    SimulatedTranscoder();
+    SimulatedTranscoder(const std::shared_ptr<TranscoderCallbackInterface>& cb,
+                        int64_t heartBeatUs);
+    ~SimulatedTranscoder();
 
     // TranscoderInterface
-    void setCallback(const std::shared_ptr<TranscoderCallbackInterface>& cb) override;
     void start(ClientIdType clientId, SessionIdType sessionId,
                const TranscodingRequestParcel& request,
                const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
@@ -58,7 +60,7 @@
     void resume(ClientIdType clientId, SessionIdType sessionId,
                 const TranscodingRequestParcel& request,
                 const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
-    void stop(ClientIdType clientId, SessionIdType sessionId) override;
+    void stop(ClientIdType clientId, SessionIdType sessionId, bool abandon = false) override;
     // ~TranscoderInterface
 
 private:
@@ -66,6 +68,7 @@
     std::mutex mLock;
     std::condition_variable mCondition;
     std::list<Event> mQueue GUARDED_BY(mLock);
+    bool mLooperReady;
 
     // Minimum time spent on transcode the video. This is used just for testing.
     int64_t mSessionProcessingTimeMs = kSessionDurationUs / 1000;