transcoding: drop transcoding requests from bad actors
Drop new requests from a uid once it has completed X back-to-back
sessions (each started shortly after the previous one finished) whose
combined running time is at least Y seconds.
bug: 177631807
test: new and existing unit tests; CTS MediaTranscodeManagerTest;
Manual testing Yelp app (with shortened thresholds)
Change-Id: I88287be62190ee5bc815dedb2a62c54ed3c41824
diff --git a/media/libmediatranscoding/TranscodingSessionController.cpp b/media/libmediatranscoding/TranscodingSessionController.cpp
index 54357bd..aeabe0f 100644
--- a/media/libmediatranscoding/TranscodingSessionController.cpp
+++ b/media/libmediatranscoding/TranscodingSessionController.cpp
@@ -87,15 +87,12 @@
// Whether watchdog is aborted and the monitoring thread should exit.
bool mAbort GUARDED_BY(mLock);
// When watchdog is active, the next timeout time point.
- std::chrono::system_clock::time_point mNextTimeoutTime GUARDED_BY(mLock);
+ std::chrono::steady_clock::time_point mNextTimeoutTime GUARDED_BY(mLock);
// When watchdog is active, the session being watched.
SessionKeyType mSessionToWatch GUARDED_BY(mLock);
std::thread mThread;
};
-static constexpr int64_t kWatchdogTimeoutUs = 3000000LL;
-static constexpr int64_t kTranscoderHeartBeatIntervalUs = 1000000LL;
-
TranscodingSessionController::Watchdog::Watchdog(TranscodingSessionController* owner,
int64_t timeoutUs)
: mOwner(owner),
@@ -159,7 +156,7 @@
// updateTimer_l() is only called with lock held.
void TranscodingSessionController::Watchdog::updateTimer_l() NO_THREAD_SAFETY_ANALYSIS {
std::chrono::microseconds timeout(mTimeoutUs);
- mNextTimeoutTime = std::chrono::system_clock::now() + timeout;
+ mNextTimeoutTime = std::chrono::steady_clock::now() + timeout;  // steady_clock is monotonic, so wall-clock changes cannot move this deadline.
}
// Unfortunately std::unique_lock is incompatible with -Wthread-safety.
@@ -188,12 +185,84 @@
}
}
///////////////////////////////////////////////////////////////////////////////
+// Per-uid limiter for starting new sessions. It tracks "bursts" of back-to-back
+// sessions per uid, and tells the controller to drop a uid's new sessions once that
+// uid's burst has reached both the count quota and the running-time quota.
+// NOTE(review): not internally synchronized; the callers visible in this change are
+// *_l() controller methods, presumably invoked with the controller lock held.
+struct TranscodingSessionController::Pacer {
+ explicit Pacer(const ControllerConfig& config)
+ : mBurstThresholdMs(config.pacerBurstThresholdMs),
+ mBurstCountQuota(config.pacerBurstCountQuota),
+ mBurstTimeQuotaSec(config.pacerBurstTimeQuotaSeconds) {}
+
+ // Adds a session of |uid| that ran for |runningTime| to the uid's current burst
+ // and stamps the completion time.
+ void onSessionCompleted(uid_t uid, std::chrono::microseconds runningTime);
+ // Returns false if |uid| is over quota and its new session should be dropped;
+ // otherwise returns true.
+ bool onSessionStarted(uid_t uid);
+
+private:
+ // Gap (milliseconds) between a uid's last completion and its next start at or
+ // below which the start continues the current burst instead of resetting it.
+ int32_t mBurstThresholdMs;
+ // Burst session count at which (together with mBurstTimeQuotaSec) the uid is over quota.
+ int32_t mBurstCountQuota;
+ // Burst running time (seconds) at which (together with mBurstCountQuota) the uid
+ // is over quota.
+ int32_t mBurstTimeQuotaSec;
+
+ // Burst bookkeeping for one uid.
+ struct UidHistoryEntry {
+ // When the uid's most recent session completed.
+ std::chrono::steady_clock::time_point lastCompletedTime;
+ // Number of completed sessions in the current burst.
+ int32_t burstCount = 0;
+ // Total running time of the completed sessions in the current burst.
+ std::chrono::steady_clock::duration burstDuration{0};
+ };
+ std::map<uid_t, UidHistoryEntry> mUidHistoryMap;
+};
+
+// Records that a session of |uid| completed after running for |runningTime|:
+// stamps the completion time and adds the session to the uid's current burst.
+void TranscodingSessionController::Pacer::onSessionCompleted(
+ uid_t uid, std::chrono::microseconds runningTime) {
+ // operator[] inserts a value-initialized entry the first time a uid is seen, so a
+ // single lookup handles both new and already-known uids.
+ UidHistoryEntry& entry = mUidHistoryMap[uid];
+ entry.lastCompletedTime = std::chrono::steady_clock::now();
+ entry.burstCount++;
+ entry.burstDuration += runningTime;
+}
+
+// Decides whether |uid| may start a new session. Returns false (the caller drops the
+// session) when the uid's current burst has reached both mBurstCountQuota sessions and
+// mBurstTimeQuotaSec seconds of running time. Otherwise returns true, and starts a fresh
+// burst if more than mBurstThresholdMs has elapsed since the uid's last completion.
+bool TranscodingSessionController::Pacer::onSessionStarted(uid_t uid) {
+ auto it = mUidHistoryMap.find(uid);
+ // If uid doesn't exist, this uid has no completed sessions. Skip.
+ if (it == mUidHistoryMap.end()) {
+ return true;
+ }
+ UidHistoryEntry& entry = it->second;
+
+ // TODO: if thermal throttling or resource loss happened to occur between this start
+ // and the previous completion, we should deduct the paused time from the elapsed time.
+ // (Individual sessions' pause time, on the other hand, doesn't need to be deducted
+ // because it doesn't affect the gap between the last completion and the start.)
+ auto timeSinceLastComplete = std::chrono::steady_clock::now() - entry.lastCompletedTime;
+ if (entry.burstCount >= mBurstCountQuota &&
+ entry.burstDuration >= std::chrono::seconds(mBurstTimeQuotaSec)) {
+ // NOTE(review): the burst is never reset on this path, and the caller only reports
+ // FINISHED/ERROR sessions back via onSessionCompleted(), so an over-quota uid appears
+ // to stay over quota no matter how long it waits. Confirm this is intended.
+ // burstDuration is in steady_clock ticks (implementation-defined period), so convert
+ // explicitly to print the milliseconds the message claims.
+ ALOGW("Pacer: uid %d: over quota, burst count %d, time %lldms", uid, entry.burstCount,
+ static_cast<long long>(
+ std::chrono::duration_cast<std::chrono::milliseconds>(entry.burstDuration)
+ .count()));
+ return false;
+ }
+
+ // If not over quota, allow the session, and reset as long as this is not too close
+ // to previous completion.
+ if (timeSinceLastComplete > std::chrono::milliseconds(mBurstThresholdMs)) {
+ ALOGV("Pacer: uid %d: reset quota", uid);
+ entry.burstCount = 0;
+ entry.burstDuration = std::chrono::milliseconds(0);
+ } else {
+ ALOGV("Pacer: uid %d: burst count %d, time %lldms", uid, entry.burstCount,
+ static_cast<long long>(
+ std::chrono::duration_cast<std::chrono::milliseconds>(entry.burstDuration)
+ .count()));
+ }
+
+ return true;
+}
+
+///////////////////////////////////////////////////////////////////////////////
TranscodingSessionController::TranscodingSessionController(
const TranscoderFactoryType& transcoderFactory,
const std::shared_ptr<UidPolicyInterface>& uidPolicy,
const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy,
- const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy)
+ const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy,
+ const ControllerConfig* config)
: mTranscoderFactory(transcoderFactory),
mUidPolicy(uidPolicy),
mResourcePolicy(resourcePolicy),
@@ -206,6 +275,13 @@
mSessionQueues.emplace(OFFLINE_UID, SessionQueueType());
mUidPackageNames[OFFLINE_UID] = "(offline)";
mThermalThrottling = thermalPolicy->getThrottlingStatus();
+ if (config != nullptr) {
+ mConfig = *config;
+ }
+ mPacer.reset(new Pacer(mConfig));
+ ALOGD("@@@ watchdog %lld, burst count %d, burst time %d, burst threshold %d",
+ (long long)mConfig.watchdogTimeoutUs, mConfig.pacerBurstCountQuota,
+ mConfig.pacerBurstTimeQuotaSeconds, mConfig.pacerBurstThresholdMs);
}
TranscodingSessionController::~TranscodingSessionController() {}
@@ -280,10 +356,21 @@
write(fd, result.string(), result.size());
}
+/*
+ * Returns nullptr if there is no session, or if we're paused globally (due to resource loss,
+ * thermal throttling, etc.). Otherwise, returns the session that should be run next.
+ */
TranscodingSessionController::Session* TranscodingSessionController::getTopSession_l() {
if (mSessionMap.empty()) {
return nullptr;
}
+
+ // Return nullptr if we're paused globally due to resource lost or thermal throttling.
+ if (((mResourcePolicy != nullptr && mResourceLost) ||
+ (mThermalPolicy != nullptr && mThermalThrottling))) {
+ return nullptr;
+ }
+
uid_t topUid = *mUidSortedList.begin();
SessionKeyType topSessionKey = *mSessionQueues[topUid].begin();
return &mSessionMap[topSessionKey];
@@ -313,9 +400,10 @@
if (state == newState) {
return;
}
- auto nowTime = std::chrono::system_clock::now();
+ auto nowTime = std::chrono::steady_clock::now();
if (state != INVALID) {
- std::chrono::microseconds elapsedTime = (nowTime - stateEnterTime);
+ std::chrono::microseconds elapsedTime =
+ std::chrono::duration_cast<std::chrono::microseconds>(nowTime - stateEnterTime);
switch (state) {
case PAUSED:
pausedTime = pausedTime + elapsedTime;
@@ -338,49 +426,60 @@
}
+// Reconciles the transcoder with getTopSession_l(). It pauses the current session if it is
+// no longer the top session (or the controller is globally paused). It drops NOT_STARTED
+// sessions whose uid is over the pacer quota, then starts or resumes the remaining top
+// session. On return, mCurrentSession is the session left running, or nullptr.
void TranscodingSessionController::updateCurrentSession_l() {
- Session* topSession = getTopSession_l();
Session* curSession = mCurrentSession;
- ALOGV("updateCurrentSession: topSession is %s, curSession is %s",
- topSession == nullptr ? "null" : sessionToString(topSession->key).c_str(),
- curSession == nullptr ? "null" : sessionToString(curSession->key).c_str());
+ // Assigned by the loop condition below, which always evaluates getTopSession_l() first.
+ Session* topSession = nullptr;
- if (topSession == nullptr) {
- mCurrentSession = nullptr;
- return;
+ // Delayed init of transcoder and watchdog.
+ if (mTranscoder == nullptr) {
+ mTranscoder = mTranscoderFactory(shared_from_this());
+ mWatchdog = std::make_shared<Watchdog>(this, mConfig.watchdogTimeoutUs);
}
- bool shouldBeRunning = !((mResourcePolicy != nullptr && mResourceLost) ||
- (mThermalPolicy != nullptr && mThermalThrottling));
- // If we found a topSession that should be run, and it's not already running,
- // take some actions to ensure it's running.
- if (topSession != curSession ||
- (shouldBeRunning ^ (topSession->getState() == Session::RUNNING))) {
- if (mTranscoder == nullptr) {
- mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
- mWatchdog = std::make_shared<Watchdog>(this, kWatchdogTimeoutUs);
- }
+ // While the top session differs from the current one, or is not running, take action
+ // to fix it. Each pass either starts/resumes the top session and exits, or drops one
+ // over-quota session and re-evaluates.
+ while ((topSession = getTopSession_l()) != curSession ||
+ (topSession != nullptr && !topSession->isRunning())) {
+ ALOGV("updateCurrentSession_l: topSession is %s, curSession is %s",
+ topSession == nullptr ? "null" : sessionToString(topSession->key).c_str(),
+ curSession == nullptr ? "null" : sessionToString(curSession->key).c_str());
- // If current session is running, pause it first. Note this is true for either
- // cases: 1) If top session is changing, or 2) if top session is not changing but
- // the topSession's state is changing.
+ // If current session is running, pause it first. This is needed in either case:
+ // 1) the top session is changing to another session, or 2) the top session is
+ // changing to null (which means we should be globally paused).
if (curSession != nullptr && curSession->getState() == Session::RUNNING) {
mTranscoder->pause(curSession->key.first, curSession->key.second);
setSessionState_l(curSession, Session::PAUSED);
}
- // If we are not experiencing resource loss nor thermal throttling, we can start
- // or resume the topSession now.
- if (shouldBeRunning) {
- if (topSession->getState() == Session::NOT_STARTED) {
- mTranscoder->start(topSession->key.first, topSession->key.second,
- topSession->request, topSession->callingUid,
- topSession->callback.lock());
- } else if (topSession->getState() == Session::PAUSED) {
- mTranscoder->resume(topSession->key.first, topSession->key.second,
- topSession->request, topSession->callingUid,
- topSession->callback.lock());
+
+ if (topSession == nullptr) {
+ // Nothing more to run (either no session or globally paused).
+ break;
+ }
+
+ // Otherwise, ensure topSession is running.
+ if (topSession->getState() == Session::NOT_STARTED) {
+ if (!mPacer->onSessionStarted(topSession->clientUid)) {
+ // Unfortunately this uid is out of quota for new sessions.
+ // Drop this session and try another one.
+ // Copy the key: removeSession_l() presumably erases the mSessionMap entry that
+ // topSession points into, so don't hand it a reference into that entry.
+ const SessionKeyType droppedKey = topSession->key;
+ {
+ // topSession already points into mSessionMap; no second lookup is needed.
+ auto clientCallback = topSession->callback.lock();
+ if (clientCallback != nullptr) {
+ clientCallback->onTranscodingFailed(
+ droppedKey.second, TranscodingErrorCode::kDroppedByService);
+ }
+ }
+ removeSession_l(droppedKey, Session::DROPPED_BY_PACER);
+ continue;
}
+ mTranscoder->start(topSession->key.first, topSession->key.second, topSession->request,
+ topSession->callingUid, topSession->callback.lock());
+ setSessionState_l(topSession, Session::RUNNING);
+ } else if (topSession->getState() == Session::PAUSED) {
+ mTranscoder->resume(topSession->key.first, topSession->key.second, topSession->request,
+ topSession->callingUid, topSession->callback.lock());
setSessionState_l(topSession, Session::RUNNING);
}
+ break;
}
mCurrentSession = topSession;
}
@@ -421,6 +520,12 @@
}
setSessionState_l(&mSessionMap[sessionKey], finalState);
+
+ if (finalState == Session::FINISHED || finalState == Session::ERROR) {
+ mPacer->onSessionCompleted(mSessionMap[sessionKey].clientUid,
+ mSessionMap[sessionKey].runningTime);
+ }
+
mSessionHistory.push_back(mSessionMap[sessionKey]);
if (mSessionHistory.size() > kSessionHistoryMax) {
mSessionHistory.erase(mSessionHistory.begin());
@@ -514,8 +619,6 @@
mSessionMap[sessionKey].key = sessionKey;
mSessionMap[sessionKey].clientUid = clientUid;
mSessionMap[sessionKey].callingUid = callingUid;
- mSessionMap[sessionKey].lastProgress = 0;
- mSessionMap[sessionKey].pauseCount = 0;
mSessionMap[sessionKey].request = request;
mSessionMap[sessionKey].callback = callback;
setSessionState_l(&mSessionMap[sessionKey], Session::NOT_STARTED);
@@ -690,7 +793,7 @@
mTranscoder->stop(clientId, sessionId, true /*abandon*/);
// Clear the last ref count before we create new transcoder.
mTranscoder = nullptr;
- mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
+ mTranscoder = mTranscoderFactory(shared_from_this());
}
{