transcoding: add watchdog to prevent transcoder hang

Add a watchdog to monitor transcoder progress. Make transcoder
report heart beat regularly as long as there is new progress.
If heartbeat stops, watchdog will initiate a timeout to

1) Abandon old TranscoderWrapper. We try to shut it down nicely,
however, if it's really stuck, we'll have to leave it there.
2) Instantiate a new TranscoderWrapper with new looper.
3) Report Watchdog timeout to client.

Tests:
- New unit tests to MediaTranscoder, TranscodingSessionController
and MediaTranscodingService's simulated test (for error code reporting).
- Manually tested that long recording works properly without timeout.

bug: 169453212
Change-Id: Iae89e49e8e12d6078dc49eef2960efd03e91c431
diff --git a/services/mediatranscoding/MediaTranscodingService.cpp b/services/mediatranscoding/MediaTranscodingService.cpp
index 5c8cc1a..cca36fb 100644
--- a/services/mediatranscoding/MediaTranscodingService.cpp
+++ b/services/mediatranscoding/MediaTranscodingService.cpp
@@ -41,16 +41,21 @@
             errorCode,                                \
             String8::format("%s:%d: " errorString, __FUNCTION__, __LINE__, ##__VA_ARGS__))
 
-MediaTranscodingService::MediaTranscodingService(
-        const std::shared_ptr<TranscoderInterface>& transcoder)
+MediaTranscodingService::MediaTranscodingService(bool simulated)
       : mUidPolicy(new TranscodingUidPolicy()),
         mResourcePolicy(new TranscodingResourcePolicy()),
-        mThermalPolicy(new TranscodingThermalPolicy()),
-        mSessionController(new TranscodingSessionController(transcoder, mUidPolicy, mResourcePolicy,
-                                                            mThermalPolicy)),
-        mClientManager(new TranscodingClientManager(mSessionController)) {
+        mThermalPolicy(new TranscodingThermalPolicy()) {
     ALOGV("MediaTranscodingService is created");
-    transcoder->setCallback(mSessionController);
+    mSessionController.reset(new TranscodingSessionController(
+            [simulated](const std::shared_ptr<TranscoderCallbackInterface>& cb,
+                        int64_t heartBeatUs) -> std::shared_ptr<TranscoderInterface> {
+                if (simulated) {
+                    return std::make_shared<SimulatedTranscoder>(cb, heartBeatUs);
+                }
+                return std::make_shared<TranscoderWrapper>(cb, heartBeatUs);
+            },
+            mUidPolicy, mResourcePolicy, mThermalPolicy));
+    mClientManager.reset(new TranscodingClientManager(mSessionController));
     mUidPolicy->setCallback(mSessionController);
     mResourcePolicy->setCallback(mSessionController);
     mThermalPolicy->setCallback(mSessionController);
@@ -94,15 +99,9 @@
 
 //static
 void MediaTranscodingService::instantiate() {
-    std::shared_ptr<TranscoderInterface> transcoder;
-    if (property_get_bool("debug.transcoding.simulated_transcoder", false)) {
-        transcoder = std::make_shared<SimulatedTranscoder>();
-    } else {
-        transcoder = std::make_shared<TranscoderWrapper>();
-    }
-
     std::shared_ptr<MediaTranscodingService> service =
-            ::ndk::SharedRefBase::make<MediaTranscodingService>(transcoder);
+            ::ndk::SharedRefBase::make<MediaTranscodingService>(
+                    property_get_bool("debug.transcoding.simulated_transcoder", false));
     binder_status_t status =
             AServiceManager_addService(service->asBinder().get(), getServiceName());
     if (status != STATUS_OK) {
diff --git a/services/mediatranscoding/MediaTranscodingService.h b/services/mediatranscoding/MediaTranscodingService.h
index a22acf2..d024c54 100644
--- a/services/mediatranscoding/MediaTranscodingService.h
+++ b/services/mediatranscoding/MediaTranscodingService.h
@@ -30,7 +30,6 @@
 using ::aidl::android::media::TranscodingSessionParcel;
 class TranscodingClientManager;
 class TranscodingSessionController;
-class TranscoderInterface;
 class UidPolicyInterface;
 class ResourcePolicyInterface;
 class ThermalPolicyInterface;
@@ -40,7 +39,7 @@
     static constexpr int32_t kInvalidSessionId = -1;
     static constexpr int32_t kInvalidClientId = -1;
 
-    MediaTranscodingService(const std::shared_ptr<TranscoderInterface>& transcoder);
+    MediaTranscodingService(bool simulated);
     virtual ~MediaTranscodingService();
 
     static void instantiate();
diff --git a/services/mediatranscoding/SimulatedTranscoder.cpp b/services/mediatranscoding/SimulatedTranscoder.cpp
index 03ee886..1de1f7b 100644
--- a/services/mediatranscoding/SimulatedTranscoder.cpp
+++ b/services/mediatranscoding/SimulatedTranscoder.cpp
@@ -33,18 +33,28 @@
         return "Pause";
     case Event::Resume:
         return "Resume";
+    case Event::Stop:
+        return "Stop";
+    case Event::Finished:
+        return "Finished";
+    case Event::Failed:
+        return "Failed";
+    case Event::Abandon:
+        return "Abandon";
     default:
         break;
     }
     return "(unknown)";
 }
 
-SimulatedTranscoder::SimulatedTranscoder() {
-    std::thread(&SimulatedTranscoder::threadLoop, this).detach();
+SimulatedTranscoder::SimulatedTranscoder(const std::shared_ptr<TranscoderCallbackInterface>& cb,
+                                         int64_t heartBeatUs __unused)
+      : mCallback(cb), mLooperReady(false) {
+    ALOGV("SimulatedTranscoder CTOR: %p", this);
 }
 
-void SimulatedTranscoder::setCallback(const std::shared_ptr<TranscoderCallbackInterface>& cb) {
-    mCallback = cb;
+SimulatedTranscoder::~SimulatedTranscoder() {
+    ALOGV("SimulatedTranscoder DTOR: %p", this);
 }
 
 void SimulatedTranscoder::start(
@@ -83,8 +93,12 @@
     });
 }
 
-void SimulatedTranscoder::stop(ClientIdType clientId, SessionIdType sessionId) {
+void SimulatedTranscoder::stop(ClientIdType clientId, SessionIdType sessionId, bool abandon) {
     queueEvent(Event::Stop, clientId, sessionId, nullptr);
+
+    if (abandon) {
+        queueEvent(Event::Abandon, 0, 0, nullptr);
+    }
 }
 
 void SimulatedTranscoder::queueEvent(Event::Type type, ClientIdType clientId,
@@ -94,6 +108,15 @@
 
     auto lock = std::scoped_lock(mLock);
 
+    if (!mLooperReady) {
+        // A shared_ptr to ourselves is given to the thread's stack, so that SimulatedTranscoder
+        // object doesn't go away until the thread exits. When a watchdog timeout happens, this
+        // allows the session controller to release its reference to the TranscoderWrapper object
+        // without blocking on the thread exits.
+        std::thread([owner = shared_from_this()]() { owner->threadLoop(); }).detach();
+        mLooperReady = true;
+    }
+
     mQueue.push_back({type, clientId, sessionId, runnable});
     mCondition.notify_one();
 }
@@ -136,34 +159,36 @@
         }
 
         // Handle the events, adjust state and send updates to client accordingly.
-        while (!mQueue.empty()) {
-            Event event = *mQueue.begin();
-            mQueue.pop_front();
+        Event event = *mQueue.begin();
+        mQueue.pop_front();
 
-            ALOGV("%s: session {%lld, %d}: %s", __FUNCTION__, (long long)event.clientId,
-                  event.sessionId, toString(event.type));
+        ALOGV("%s: session {%lld, %d}: %s", __FUNCTION__, (long long)event.clientId,
+              event.sessionId, toString(event.type));
 
-            if (!running && (event.type == Event::Start || event.type == Event::Resume)) {
-                running = true;
-                lastRunningTime = std::chrono::system_clock::now();
-                lastRunningEvent = event;
-                if (event.type == Event::Start) {
-                    remainingUs = std::chrono::milliseconds(mSessionProcessingTimeMs);
-                }
-            } else if (running && (event.type == Event::Pause || event.type == Event::Stop)) {
-                running = false;
-                remainingUs -= (std::chrono::system_clock::now() - lastRunningTime);
-            } else {
-                ALOGW("%s: discarding bad event: session {%lld, %d}: %s", __FUNCTION__,
-                      (long long)event.clientId, event.sessionId, toString(event.type));
-                continue;
+        if (event.type == Event::Abandon) {
+            break;
+        }
+
+        if (!running && (event.type == Event::Start || event.type == Event::Resume)) {
+            running = true;
+            lastRunningTime = std::chrono::system_clock::now();
+            lastRunningEvent = event;
+            if (event.type == Event::Start) {
+                remainingUs = std::chrono::milliseconds(mSessionProcessingTimeMs);
             }
+        } else if (running && (event.type == Event::Pause || event.type == Event::Stop)) {
+            running = false;
+            remainingUs -= (std::chrono::system_clock::now() - lastRunningTime);
+        } else {
+            ALOGW("%s: discarding bad event: session {%lld, %d}: %s", __FUNCTION__,
+                  (long long)event.clientId, event.sessionId, toString(event.type));
+            continue;
+        }
 
-            if (event.runnable != nullptr) {
-                lock.unlock();
-                event.runnable();
-                lock.lock();
-            }
+        if (event.runnable != nullptr) {
+            lock.unlock();
+            event.runnable();
+            lock.lock();
         }
     }
 }
diff --git a/services/mediatranscoding/SimulatedTranscoder.h b/services/mediatranscoding/SimulatedTranscoder.h
index ba2bba0..6b51b4e 100644
--- a/services/mediatranscoding/SimulatedTranscoder.h
+++ b/services/mediatranscoding/SimulatedTranscoder.h
@@ -36,10 +36,11 @@
  * Session lifecycle events are reported via progress updates with special progress
  * numbers (equal to the Event's type).
  */
-class SimulatedTranscoder : public TranscoderInterface {
+class SimulatedTranscoder : public TranscoderInterface,
+                            public std::enable_shared_from_this<SimulatedTranscoder> {
 public:
     struct Event {
-        enum Type { NoEvent, Start, Pause, Resume, Stop, Finished, Failed } type;
+        enum Type { NoEvent, Start, Pause, Resume, Stop, Finished, Failed, Abandon } type;
         ClientIdType clientId;
         SessionIdType sessionId;
         std::function<void()> runnable;
@@ -47,10 +48,11 @@
 
     static constexpr int64_t kSessionDurationUs = 1000000;
 
-    SimulatedTranscoder();
+    SimulatedTranscoder(const std::shared_ptr<TranscoderCallbackInterface>& cb,
+                        int64_t heartBeatUs);
+    ~SimulatedTranscoder();
 
     // TranscoderInterface
-    void setCallback(const std::shared_ptr<TranscoderCallbackInterface>& cb) override;
     void start(ClientIdType clientId, SessionIdType sessionId,
                const TranscodingRequestParcel& request,
                const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
@@ -58,7 +60,7 @@
     void resume(ClientIdType clientId, SessionIdType sessionId,
                 const TranscodingRequestParcel& request,
                 const std::shared_ptr<ITranscodingClientCallback>& clientCallback) override;
-    void stop(ClientIdType clientId, SessionIdType sessionId) override;
+    void stop(ClientIdType clientId, SessionIdType sessionId, bool abandon = false) override;
     // ~TranscoderInterface
 
 private:
@@ -66,6 +68,7 @@
     std::mutex mLock;
     std::condition_variable mCondition;
     std::list<Event> mQueue GUARDED_BY(mLock);
+    bool mLooperReady;
 
     // Minimum time spent on transcode the video. This is used just for testing.
     int64_t mSessionProcessingTimeMs = kSessionDurationUs / 1000;