transcoding: add watchdog to prevent transcoder hang
Add a watchdog to monitor transcoder progress. Make transcoder
report heart beat regularly as long as there is new progress.
If heartbeat stops, watchdog will initiate a timeout to
1) Abandon old TranscoderWrapper. We try to shut it down nicely,
however, if it's really stuck, we'll have to leave it there.
2) Instantiate a new TranscoderWrapper with new looper.
3) Report Watchdog timeout to client.
Tests:
- New unit tests to MediaTranscoder, TranscodingSessionController
and MediaTranscodingService's simulated test (for error code reporting).
- Manually tested that long recording works properly without timeout.
bug: 169453212
Change-Id: Iae89e49e8e12d6078dc49eef2960efd03e91c431
diff --git a/media/libmediatranscoding/TranscodingSessionController.cpp b/media/libmediatranscoding/TranscodingSessionController.cpp
index 09ad3cd..d12af21 100644
--- a/media/libmediatranscoding/TranscodingSessionController.cpp
+++ b/media/libmediatranscoding/TranscodingSessionController.cpp
@@ -24,6 +24,7 @@
#include <media/TranscodingUidPolicy.h>
#include <utils/Log.h>
+#include <thread>
#include <utility>
namespace android {
@@ -60,12 +61,140 @@
return "(unknown)";
}
+///////////////////////////////////////////////////////////////////////////////
+struct TranscodingSessionController::Watchdog {
+ Watchdog(TranscodingSessionController* owner, int64_t timeoutUs);
+ ~Watchdog();
+
+ // Starts monitoring the session.
+ void start(const SessionKeyType& key);
+ // Stops monitoring the session.
+ void stop();
+ // Signals that the session is still alive. Must be sent at least every mTimeoutUs.
+ // (Timeout will happen if no ping in mTimeoutUs since the last ping.)
+ void keepAlive();
+
+private:
+ void threadLoop();
+ void updateTimer_l();
+
+ TranscodingSessionController* mOwner;
+ const int64_t mTimeoutUs;
+ mutable std::mutex mLock;
+ std::condition_variable mCondition GUARDED_BY(mLock);
+ // Whether watchdog is monitoring a session for timeout.
+ bool mActive GUARDED_BY(mLock);
+ // Whether watchdog is aborted and the monitoring thread should exit.
+ bool mAbort GUARDED_BY(mLock);
+ // When watchdog is active, the next timeout time point.
+ std::chrono::system_clock::time_point mNextTimeoutTime GUARDED_BY(mLock);
+ // When watchdog is active, the session being watched.
+ SessionKeyType mSessionToWatch GUARDED_BY(mLock);
+ std::thread mThread;
+};
+
+static constexpr int64_t kWatchdogTimeoutUs = 3000000LL;
+static constexpr int64_t kTranscoderHeartBeatIntervalUs = 1000000LL;
+
+TranscodingSessionController::Watchdog::Watchdog(TranscodingSessionController* owner,
+ int64_t timeoutUs)
+ : mOwner(owner),
+ mTimeoutUs(timeoutUs),
+ mActive(false),
+ mAbort(false),
+ mThread(&Watchdog::threadLoop, this) {
+ ALOGV("Watchdog CTOR: %p", this);
+}
+
+TranscodingSessionController::Watchdog::~Watchdog() {
+ ALOGV("Watchdog DTOR: %p", this);
+
+ {
+ // Exit the looper thread.
+ std::scoped_lock lock{mLock};
+
+ mAbort = true;
+ mCondition.notify_one();
+ }
+
+ mThread.join();
+ ALOGV("Watchdog DTOR: %p, done.", this);
+}
+
+void TranscodingSessionController::Watchdog::start(const SessionKeyType& key) {
+ std::scoped_lock lock{mLock};
+
+ if (!mActive) {
+ ALOGI("Watchdog start: %s", sessionToString(key).c_str());
+
+ mActive = true;
+ mSessionToWatch = key;
+ updateTimer_l();
+ mCondition.notify_one();
+ }
+}
+
+void TranscodingSessionController::Watchdog::stop() {
+ std::scoped_lock lock{mLock};
+
+ if (mActive) {
+ ALOGI("Watchdog stop: %s", sessionToString(mSessionToWatch).c_str());
+
+ mActive = false;
+ mCondition.notify_one();
+ }
+}
+
+void TranscodingSessionController::Watchdog::keepAlive() {
+ std::scoped_lock lock{mLock};
+
+ if (mActive) {
+ ALOGI("Watchdog keepAlive: %s", sessionToString(mSessionToWatch).c_str());
+
+ updateTimer_l();
+ mCondition.notify_one();
+ }
+}
+
+// updateTimer_l() is only called with lock held.
+void TranscodingSessionController::Watchdog::updateTimer_l() NO_THREAD_SAFETY_ANALYSIS {
+ std::chrono::microseconds timeout(mTimeoutUs);
+ mNextTimeoutTime = std::chrono::system_clock::now() + timeout;
+}
+
+// Unfortunately std::unique_lock is incompatible with -Wthread-safety.
+void TranscodingSessionController::Watchdog::threadLoop() NO_THREAD_SAFETY_ANALYSIS {
+ std::unique_lock<std::mutex> lock{mLock};
+
+ while (!mAbort) {
+ if (!mActive) {
+ mCondition.wait(lock);
+ continue;
+ }
+ // Watchdog active, wait till next timeout time.
+ if (mCondition.wait_until(lock, mNextTimeoutTime) == std::cv_status::timeout) {
+ // If timeout happens, report timeout and deactivate watchdog.
+ mActive = false;
+ // Make a copy of session key, as once we unlock, it could be unprotected.
+ SessionKeyType sessionKey = mSessionToWatch;
+
+ ALOGE("Watchdog timeout: %s", sessionToString(sessionKey).c_str());
+
+ lock.unlock();
+ mOwner->onError(sessionKey.first, sessionKey.second,
+ TranscodingErrorCode::kWatchdogTimeout);
+ lock.lock();
+ }
+ }
+}
+///////////////////////////////////////////////////////////////////////////////
+
TranscodingSessionController::TranscodingSessionController(
- const std::shared_ptr<TranscoderInterface>& transcoder,
+ const TranscoderFactoryType& transcoderFactory,
const std::shared_ptr<UidPolicyInterface>& uidPolicy,
const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy,
const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy)
- : mTranscoder(transcoder),
+ : mTranscoderFactory(transcoderFactory),
mUidPolicy(uidPolicy),
mResourcePolicy(resourcePolicy),
mThermalPolicy(thermalPolicy),
@@ -160,6 +289,26 @@
return &mSessionMap[topSessionKey];
}
+void TranscodingSessionController::setSessionState_l(Session* session, Session::State state) {
+ bool wasRunning = (session->getState() == Session::RUNNING);
+ session->setState(state);
+ bool isRunning = (session->getState() == Session::RUNNING);
+
+ if (wasRunning == isRunning) {
+ return;
+ }
+
+ // Currently we only have 1 running session, and we always put the previous
+ // session in non-running state before we run the new session, so it's okay
+ // to start/stop the watchdog here. If this assumption changes, we need to
+ // track the number of running sessions and start/stop watchdog based on that.
+ if (isRunning) {
+ mWatchdog->start(session->key);
+ } else {
+ mWatchdog->stop();
+ }
+}
+
void TranscodingSessionController::Session::setState(Session::State newState) {
if (state == newState) {
return;
@@ -206,12 +355,17 @@
// take some actions to ensure it's running.
if (topSession != curSession ||
(shouldBeRunning ^ (topSession->getState() == Session::RUNNING))) {
+ if (mTranscoder == nullptr) {
+ mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
+ mWatchdog = std::make_shared<Watchdog>(this, kWatchdogTimeoutUs);
+ }
+
// If current session is running, pause it first. Note this is true for either
// cases: 1) If top session is changing, or 2) if top session is not changing but
// the topSession's state is changing.
if (curSession != nullptr && curSession->getState() == Session::RUNNING) {
mTranscoder->pause(curSession->key.first, curSession->key.second);
- curSession->setState(Session::PAUSED);
+ setSessionState_l(curSession, Session::PAUSED);
}
// If we are not experiencing resource loss nor thermal throttling, we can start
// or resume the topSession now.
@@ -223,7 +377,7 @@
mTranscoder->resume(topSession->key.first, topSession->key.second,
topSession->request, topSession->callback.lock());
}
- topSession->setState(Session::RUNNING);
+ setSessionState_l(topSession, Session::RUNNING);
}
}
mCurrentSession = topSession;
@@ -264,7 +418,7 @@
mCurrentSession = nullptr;
}
- mSessionMap[sessionKey].setState(finalState);
+ setSessionState_l(&mSessionMap[sessionKey], finalState);
mSessionHistory.push_back(mSessionMap[sessionKey]);
if (mSessionHistory.size() > kSessionHistoryMax) {
mSessionHistory.erase(mSessionHistory.begin());
@@ -361,7 +515,7 @@
mSessionMap[sessionKey].pauseCount = 0;
mSessionMap[sessionKey].request = request;
mSessionMap[sessionKey].callback = callback;
- mSessionMap[sessionKey].setState(Session::NOT_STARTED);
+ setSessionState_l(&mSessionMap[sessionKey], Session::NOT_STARTED);
// If it's an offline session, the queue was already added in constructor.
// If it's a real-time sessions, check if a queue is already present for the uid,
@@ -527,6 +681,15 @@
void TranscodingSessionController::onError(ClientIdType clientId, SessionIdType sessionId,
TranscodingErrorCode err) {
notifyClient(clientId, sessionId, "error", [=](const SessionKeyType& sessionKey) {
+ if (err == TranscodingErrorCode::kWatchdogTimeout) {
+ // Abandon the transcoder, as its handler thread might be stuck in some call to
+ // MediaTranscoder altogether, and may not be able to handle any new tasks.
+ mTranscoder->stop(clientId, sessionId, true /*abandon*/);
+ // Clear the last ref count before we create new transcoder.
+ mTranscoder = nullptr;
+ mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
+ }
+
{
auto clientCallback = mSessionMap[sessionKey].callback.lock();
if (clientCallback != nullptr) {
@@ -555,6 +718,11 @@
});
}
+void TranscodingSessionController::onHeartBeat(ClientIdType clientId, SessionIdType sessionId) {
+ notifyClient(clientId, sessionId, "heart-beat",
+ [=](const SessionKeyType& /*sessionKey*/) { mWatchdog->keepAlive(); });
+}
+
void TranscodingSessionController::onResourceLost(ClientIdType clientId, SessionIdType sessionId) {
ALOGI("%s", __FUNCTION__);
@@ -572,7 +740,7 @@
// If we receive a resource loss event, the transcoder already paused the transcoding,
// so we don't need to call onPaused() to pause it. However, we still need to notify
// the client and update the session state here.
- resourceLostSession->setState(Session::PAUSED);
+ setSessionState_l(resourceLostSession, Session::PAUSED);
// Notify the client as a paused event.
auto clientCallback = resourceLostSession->callback.lock();
if (clientCallback != nullptr) {