transcoding: drop transcoding request for bad actors
Drop requests for a uid if it submits more than X back-to-back requests
that lasted longer than Y seconds.
bug: 177631807
test: new and existing unit tests; CTS MediaTranscodeManagerTest;
Manual testing Yelp app (with shortened thresholds)
Change-Id: I88287be62190ee5bc815dedb2a62c54ed3c41824
diff --git a/media/libmediatranscoding/TranscodingSessionController.cpp b/media/libmediatranscoding/TranscodingSessionController.cpp
index 54357bd..aeabe0f 100644
--- a/media/libmediatranscoding/TranscodingSessionController.cpp
+++ b/media/libmediatranscoding/TranscodingSessionController.cpp
@@ -87,15 +87,12 @@
// Whether watchdog is aborted and the monitoring thread should exit.
bool mAbort GUARDED_BY(mLock);
// When watchdog is active, the next timeout time point.
- std::chrono::system_clock::time_point mNextTimeoutTime GUARDED_BY(mLock);
+ std::chrono::steady_clock::time_point mNextTimeoutTime GUARDED_BY(mLock);
// When watchdog is active, the session being watched.
SessionKeyType mSessionToWatch GUARDED_BY(mLock);
std::thread mThread;
};
-static constexpr int64_t kWatchdogTimeoutUs = 3000000LL;
-static constexpr int64_t kTranscoderHeartBeatIntervalUs = 1000000LL;
-
TranscodingSessionController::Watchdog::Watchdog(TranscodingSessionController* owner,
int64_t timeoutUs)
: mOwner(owner),
@@ -159,7 +156,7 @@
// updateTimer_l() is only called with lock held.
void TranscodingSessionController::Watchdog::updateTimer_l() NO_THREAD_SAFETY_ANALYSIS {
std::chrono::microseconds timeout(mTimeoutUs);
- mNextTimeoutTime = std::chrono::system_clock::now() + timeout;
+ mNextTimeoutTime = std::chrono::steady_clock::now() + timeout;
}
// Unfortunately std::unique_lock is incompatible with -Wthread-safety.
@@ -188,12 +185,84 @@
}
}
///////////////////////////////////////////////////////////////////////////////
+struct TranscodingSessionController::Pacer {
+ Pacer(const ControllerConfig& config)
+ : mBurstThresholdMs(config.pacerBurstThresholdMs),
+ mBurstCountQuota(config.pacerBurstCountQuota),
+ mBurstTimeQuotaSec(config.pacerBurstTimeQuotaSeconds) {}
+
+ ~Pacer() = default;
+
+ void onSessionCompleted(uid_t uid, std::chrono::microseconds runningTime);
+ bool onSessionStarted(uid_t uid);
+
+private:
+ // Threshold of time between finish/start below which a back-to-back start is counted.
+ int32_t mBurstThresholdMs;
+ // Maximum allowed back-to-back start count.
+ int32_t mBurstCountQuota;
+ // Maximum allowed back-to-back running time.
+ int32_t mBurstTimeQuotaSec;
+
+ struct UidHistoryEntry {
+ std::chrono::steady_clock::time_point lastCompletedTime;
+ int32_t burstCount = 0;
+ std::chrono::steady_clock::duration burstDuration{0};
+ };
+ std::map<uid_t, UidHistoryEntry> mUidHistoryMap;
+};
+
+void TranscodingSessionController::Pacer::onSessionCompleted(
+ uid_t uid, std::chrono::microseconds runningTime) {
+ if (mUidHistoryMap.find(uid) == mUidHistoryMap.end()) {
+ mUidHistoryMap.emplace(uid, UidHistoryEntry{});
+ }
+ mUidHistoryMap[uid].lastCompletedTime = std::chrono::steady_clock::now();
+ mUidHistoryMap[uid].burstCount++;
+ mUidHistoryMap[uid].burstDuration += runningTime;
+}
+
+bool TranscodingSessionController::Pacer::onSessionStarted(uid_t uid) {
+ // If uid doesn't exist, this uid has no completed sessions. Skip.
+ if (mUidHistoryMap.find(uid) == mUidHistoryMap.end()) {
+ return true;
+ }
+
+ // TODO: if Thermal throttling or resoure lost happened to occurr between this start
+ // and the previous completion, we should deduct the paused time from the elapsed time.
+ // (Individual session's pause time, on the other hand, doesn't need to be deducted
+ // because it doesn't affect the gap between last completion and the start.
+ auto timeSinceLastComplete =
+ std::chrono::steady_clock::now() - mUidHistoryMap[uid].lastCompletedTime;
+ if (mUidHistoryMap[uid].burstCount >= mBurstCountQuota &&
+ mUidHistoryMap[uid].burstDuration >= std::chrono::seconds(mBurstTimeQuotaSec)) {
+ ALOGW("Pacer: uid %d: over quota, burst count %d, time %lldms", uid,
+ mUidHistoryMap[uid].burstCount, (long long)mUidHistoryMap[uid].burstDuration.count());
+ return false;
+ }
+
+ // If not over quota, allow the session, and reset as long as this is not too close
+ // to previous completion.
+ if (timeSinceLastComplete > std::chrono::milliseconds(mBurstThresholdMs)) {
+ ALOGV("Pacer: uid %d: reset quota", uid);
+ mUidHistoryMap[uid].burstCount = 0;
+ mUidHistoryMap[uid].burstDuration = std::chrono::milliseconds(0);
+ } else {
+ ALOGV("Pacer: uid %d: burst count %d, time %lldms", uid, mUidHistoryMap[uid].burstCount,
+ (long long)mUidHistoryMap[uid].burstDuration.count());
+ }
+
+ return true;
+}
+
+///////////////////////////////////////////////////////////////////////////////
TranscodingSessionController::TranscodingSessionController(
const TranscoderFactoryType& transcoderFactory,
const std::shared_ptr<UidPolicyInterface>& uidPolicy,
const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy,
- const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy)
+ const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy,
+ const ControllerConfig* config)
: mTranscoderFactory(transcoderFactory),
mUidPolicy(uidPolicy),
mResourcePolicy(resourcePolicy),
@@ -206,6 +275,13 @@
mSessionQueues.emplace(OFFLINE_UID, SessionQueueType());
mUidPackageNames[OFFLINE_UID] = "(offline)";
mThermalThrottling = thermalPolicy->getThrottlingStatus();
+ if (config != nullptr) {
+ mConfig = *config;
+ }
+ mPacer.reset(new Pacer(mConfig));
+ ALOGD("@@@ watchdog %lld, burst count %d, burst time %d, burst threshold %d",
+ (long long)mConfig.watchdogTimeoutUs, mConfig.pacerBurstCountQuota,
+ mConfig.pacerBurstTimeQuotaSeconds, mConfig.pacerBurstThresholdMs);
}
TranscodingSessionController::~TranscodingSessionController() {}
@@ -280,10 +356,21 @@
write(fd, result.string(), result.size());
}
+/*
+ * Returns nullptr if there is no session, or we're paused globally (due to resource lost,
+ * thermal throttling, etc.). Otherwise, return the session that should be run next.
+ */
TranscodingSessionController::Session* TranscodingSessionController::getTopSession_l() {
if (mSessionMap.empty()) {
return nullptr;
}
+
+ // Return nullptr if we're paused globally due to resource lost or thermal throttling.
+ if (((mResourcePolicy != nullptr && mResourceLost) ||
+ (mThermalPolicy != nullptr && mThermalThrottling))) {
+ return nullptr;
+ }
+
uid_t topUid = *mUidSortedList.begin();
SessionKeyType topSessionKey = *mSessionQueues[topUid].begin();
return &mSessionMap[topSessionKey];
@@ -313,9 +400,10 @@
if (state == newState) {
return;
}
- auto nowTime = std::chrono::system_clock::now();
+ auto nowTime = std::chrono::steady_clock::now();
if (state != INVALID) {
- std::chrono::microseconds elapsedTime = (nowTime - stateEnterTime);
+ std::chrono::microseconds elapsedTime =
+ std::chrono::duration_cast<std::chrono::microseconds>(nowTime - stateEnterTime);
switch (state) {
case PAUSED:
pausedTime = pausedTime + elapsedTime;
@@ -338,49 +426,60 @@
}
void TranscodingSessionController::updateCurrentSession_l() {
- Session* topSession = getTopSession_l();
Session* curSession = mCurrentSession;
- ALOGV("updateCurrentSession: topSession is %s, curSession is %s",
- topSession == nullptr ? "null" : sessionToString(topSession->key).c_str(),
- curSession == nullptr ? "null" : sessionToString(curSession->key).c_str());
+ Session* topSession = getTopSession_l();
- if (topSession == nullptr) {
- mCurrentSession = nullptr;
- return;
+ // Delayed init of transcoder and watchdog.
+ if (mTranscoder == nullptr) {
+ mTranscoder = mTranscoderFactory(shared_from_this());
+ mWatchdog = std::make_shared<Watchdog>(this, mConfig.watchdogTimeoutUs);
}
- bool shouldBeRunning = !((mResourcePolicy != nullptr && mResourceLost) ||
- (mThermalPolicy != nullptr && mThermalThrottling));
- // If we found a topSession that should be run, and it's not already running,
- // take some actions to ensure it's running.
- if (topSession != curSession ||
- (shouldBeRunning ^ (topSession->getState() == Session::RUNNING))) {
- if (mTranscoder == nullptr) {
- mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
- mWatchdog = std::make_shared<Watchdog>(this, kWatchdogTimeoutUs);
- }
+ // If we found a different top session, or the top session's running state is not
+ // correct. Take some actions to ensure it's correct.
+ while ((topSession = getTopSession_l()) != curSession ||
+ (topSession != nullptr && !topSession->isRunning())) {
+ ALOGV("updateCurrentSession_l: topSession is %s, curSession is %s",
+ topSession == nullptr ? "null" : sessionToString(topSession->key).c_str(),
+ curSession == nullptr ? "null" : sessionToString(curSession->key).c_str());
- // If current session is running, pause it first. Note this is true for either
- // cases: 1) If top session is changing, or 2) if top session is not changing but
- // the topSession's state is changing.
+ // If current session is running, pause it first. Note this is needed for either
+ // cases: 1) Top session is changing to another session, or 2) Top session is
+ // changing to null (which means we should be globally paused).
if (curSession != nullptr && curSession->getState() == Session::RUNNING) {
mTranscoder->pause(curSession->key.first, curSession->key.second);
setSessionState_l(curSession, Session::PAUSED);
}
- // If we are not experiencing resource loss nor thermal throttling, we can start
- // or resume the topSession now.
- if (shouldBeRunning) {
- if (topSession->getState() == Session::NOT_STARTED) {
- mTranscoder->start(topSession->key.first, topSession->key.second,
- topSession->request, topSession->callingUid,
- topSession->callback.lock());
- } else if (topSession->getState() == Session::PAUSED) {
- mTranscoder->resume(topSession->key.first, topSession->key.second,
- topSession->request, topSession->callingUid,
- topSession->callback.lock());
+
+ if (topSession == nullptr) {
+ // Nothing more to run (either no session or globally paused).
+ break;
+ }
+
+ // Otherwise, ensure topSession is running.
+ if (topSession->getState() == Session::NOT_STARTED) {
+ if (!mPacer->onSessionStarted(topSession->clientUid)) {
+ // Unfortunately this uid is out of quota for new sessions.
+ // Drop this sesion and try another one.
+ {
+ auto clientCallback = mSessionMap[topSession->key].callback.lock();
+ if (clientCallback != nullptr) {
+ clientCallback->onTranscodingFailed(
+ topSession->key.second, TranscodingErrorCode::kDroppedByService);
+ }
+ }
+ removeSession_l(topSession->key, Session::DROPPED_BY_PACER);
+ continue;
}
+ mTranscoder->start(topSession->key.first, topSession->key.second, topSession->request,
+ topSession->callingUid, topSession->callback.lock());
+ setSessionState_l(topSession, Session::RUNNING);
+ } else if (topSession->getState() == Session::PAUSED) {
+ mTranscoder->resume(topSession->key.first, topSession->key.second, topSession->request,
+ topSession->callingUid, topSession->callback.lock());
setSessionState_l(topSession, Session::RUNNING);
}
+ break;
}
mCurrentSession = topSession;
}
@@ -421,6 +520,12 @@
}
setSessionState_l(&mSessionMap[sessionKey], finalState);
+
+ if (finalState == Session::FINISHED || finalState == Session::ERROR) {
+ mPacer->onSessionCompleted(mSessionMap[sessionKey].clientUid,
+ mSessionMap[sessionKey].runningTime);
+ }
+
mSessionHistory.push_back(mSessionMap[sessionKey]);
if (mSessionHistory.size() > kSessionHistoryMax) {
mSessionHistory.erase(mSessionHistory.begin());
@@ -514,8 +619,6 @@
mSessionMap[sessionKey].key = sessionKey;
mSessionMap[sessionKey].clientUid = clientUid;
mSessionMap[sessionKey].callingUid = callingUid;
- mSessionMap[sessionKey].lastProgress = 0;
- mSessionMap[sessionKey].pauseCount = 0;
mSessionMap[sessionKey].request = request;
mSessionMap[sessionKey].callback = callback;
setSessionState_l(&mSessionMap[sessionKey], Session::NOT_STARTED);
@@ -690,7 +793,7 @@
mTranscoder->stop(clientId, sessionId, true /*abandon*/);
// Clear the last ref count before we create new transcoder.
mTranscoder = nullptr;
- mTranscoder = mTranscoderFactory(shared_from_this(), kTranscoderHeartBeatIntervalUs);
+ mTranscoder = mTranscoderFactory(shared_from_this());
}
{
diff --git a/media/libmediatranscoding/aidl/android/media/TranscodingErrorCode.aidl b/media/libmediatranscoding/aidl/android/media/TranscodingErrorCode.aidl
index 23072ff..5349fe1 100644
--- a/media/libmediatranscoding/aidl/android/media/TranscodingErrorCode.aidl
+++ b/media/libmediatranscoding/aidl/android/media/TranscodingErrorCode.aidl
@@ -23,13 +23,19 @@
*/
@Backing(type = "int")
enum TranscodingErrorCode {
+ // Errors exposed to client side.
kNoError = 0,
- kUnknown = 1,
- kMalformed = 2,
- kUnsupported = 3,
- kInvalidParameter = 4,
- kInvalidOperation = 5,
- kErrorIO = 6,
- kInsufficientResources = 7,
- kWatchdogTimeout = 8,
+ kDroppedByService = 1,
+ kServiceUnavailable = 2,
+
+ // Other private errors.
+ kPrivateErrorFirst = 1000,
+ kUnknown = kPrivateErrorFirst + 0,
+ kMalformed = kPrivateErrorFirst + 1,
+ kUnsupported = kPrivateErrorFirst + 2,
+ kInvalidParameter = kPrivateErrorFirst + 3,
+ kInvalidOperation = kPrivateErrorFirst + 4,
+ kErrorIO = kPrivateErrorFirst + 5,
+ kInsufficientResources = kPrivateErrorFirst + 6,
+ kWatchdogTimeout = kPrivateErrorFirst + 7,
}
\ No newline at end of file
diff --git a/media/libmediatranscoding/include/media/TranscodingSessionController.h b/media/libmediatranscoding/include/media/TranscodingSessionController.h
index cfa2f13..b2d6f0a 100644
--- a/media/libmediatranscoding/include/media/TranscodingSessionController.h
+++ b/media/libmediatranscoding/include/media/TranscodingSessionController.h
@@ -93,7 +93,18 @@
using SessionKeyType = std::pair<ClientIdType, SessionIdType>;
using SessionQueueType = std::list<SessionKeyType>;
using TranscoderFactoryType = std::function<std::shared_ptr<TranscoderInterface>(
- const std::shared_ptr<TranscoderCallbackInterface>&, int64_t)>;
+ const std::shared_ptr<TranscoderCallbackInterface>&)>;
+
+ struct ControllerConfig {
+ // Watchdog timeout.
+ int64_t watchdogTimeoutUs = 3000000LL;
+ // Threshold of time between finish/start below which a back-to-back start is counted.
+ int32_t pacerBurstThresholdMs = 1000;
+ // Maximum allowed back-to-back start count.
+ int32_t pacerBurstCountQuota = 10;
+ // Maximum allowed back-to-back running time.
+ int32_t pacerBurstTimeQuotaSeconds = 180; // 3-min
+ };
struct Session {
enum State {
@@ -106,16 +117,17 @@
FINISHED,
CANCELED,
ERROR,
+ DROPPED_BY_PACER,
};
SessionKeyType key;
uid_t clientUid;
uid_t callingUid;
- int32_t lastProgress;
- int32_t pauseCount;
- std::chrono::time_point<std::chrono::system_clock> stateEnterTime;
- std::chrono::microseconds waitingTime;
- std::chrono::microseconds runningTime;
- std::chrono::microseconds pausedTime;
+ int32_t lastProgress = 0;
+ int32_t pauseCount = 0;
+ std::chrono::time_point<std::chrono::steady_clock> stateEnterTime;
+ std::chrono::microseconds waitingTime{0};
+ std::chrono::microseconds runningTime{0};
+ std::chrono::microseconds pausedTime{0};
TranscodingRequest request;
std::weak_ptr<ITranscodingClientCallback> callback;
@@ -123,12 +135,16 @@
// Must use setState to change state.
void setState(Session::State state);
State getState() const { return state; }
+ bool isRunning() { return state == RUNNING; }
private:
State state = INVALID;
};
struct Watchdog;
+ struct Pacer;
+
+ ControllerConfig mConfig;
// TODO(chz): call transcoder without global lock.
// Use mLock for all entrypoints for now.
@@ -156,12 +172,14 @@
bool mThermalThrottling;
std::list<Session> mSessionHistory;
std::shared_ptr<Watchdog> mWatchdog;
+ std::shared_ptr<Pacer> mPacer;
// Only allow MediaTranscodingService and unit tests to instantiate.
TranscodingSessionController(const TranscoderFactoryType& transcoderFactory,
const std::shared_ptr<UidPolicyInterface>& uidPolicy,
const std::shared_ptr<ResourcePolicyInterface>& resourcePolicy,
- const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy);
+ const std::shared_ptr<ThermalPolicyInterface>& thermalPolicy,
+ const ControllerConfig* config = nullptr);
void dumpSession_l(const Session& session, String8& result, bool closedSession = false);
Session* getTopSession_l();
diff --git a/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp b/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp
index 3cd9112..560d1fe 100644
--- a/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp
+++ b/media/libmediatranscoding/tests/TranscodingSessionController_tests.cpp
@@ -118,7 +118,7 @@
class TestTranscoder : public TranscoderInterface {
public:
- TestTranscoder() : mLastError(TranscodingErrorCode::kUnknown), mGeneration(0) {}
+ TestTranscoder() : mGeneration(0) {}
virtual ~TestTranscoder() {}
// TranscoderInterface
@@ -152,19 +152,6 @@
mGeneration++;
}
- TranscodingErrorCode getLastError() {
- std::scoped_lock lock{mLock};
- // Clear last error.
- TranscodingErrorCode result = mLastError;
- mLastError = TranscodingErrorCode::kNoError;
- return result;
- }
-
- int32_t getGeneration() {
- std::scoped_lock lock{mLock};
- return mGeneration;
- }
-
struct Event {
enum { NoEvent, Start, Pause, Resume, Stop, Finished, Failed, Abandon } type;
ClientIdType clientId;
@@ -195,7 +182,7 @@
// Error is sticky, non-error event will not erase it, only getLastError()
// clears last error.
if (err != TranscodingErrorCode::kNoError) {
- mLastError = err;
+ mLastErrorQueue.push_back(err);
}
mCondition.notify_one();
}
@@ -218,12 +205,27 @@
return mPoppedEvent;
}
+ TranscodingErrorCode getLastError() {
+ std::scoped_lock lock{mLock};
+ if (mLastErrorQueue.empty()) {
+ return TranscodingErrorCode::kNoError;
+ }
+ TranscodingErrorCode err = mLastErrorQueue.front();
+ mLastErrorQueue.pop_front();
+ return err;
+ }
+
+ int32_t getGeneration() {
+ std::scoped_lock lock{mLock};
+ return mGeneration;
+ }
+
private:
std::mutex mLock;
std::condition_variable mCondition;
Event mPoppedEvent;
std::list<Event> mEventQueue;
- TranscodingErrorCode mLastError;
+ std::list<TranscodingErrorCode> mLastErrorQueue;
int32_t mGeneration;
};
@@ -291,16 +293,21 @@
mUidPolicy.reset(new TestUidPolicy());
mResourcePolicy.reset(new TestResourcePolicy());
mThermalPolicy.reset(new TestThermalPolicy());
+ // Overrid default burst params with shorter values for testing.
+ TranscodingSessionController::ControllerConfig config = {
+ .pacerBurstThresholdMs = 500,
+ .pacerBurstCountQuota = 10,
+ .pacerBurstTimeQuotaSeconds = 3,
+ };
mController.reset(new TranscodingSessionController(
- [this](const std::shared_ptr<TranscoderCallbackInterface>& /*cb*/,
- int64_t /*heartBeatIntervalUs*/) {
+ [this](const std::shared_ptr<TranscoderCallbackInterface>& /*cb*/) {
// Here we require that the SessionController clears out all its refcounts of
// the transcoder object when it calls create.
EXPECT_EQ(mTranscoder.use_count(), 1);
mTranscoder->onCreated();
return mTranscoder;
},
- mUidPolicy, mResourcePolicy, mThermalPolicy));
+ mUidPolicy, mResourcePolicy, mThermalPolicy, &config));
mUidPolicy->setCallback(mController);
// Set priority only, ignore other fields for now.
@@ -328,6 +335,40 @@
EXPECT_EQ(mTranscoder.use_count(), 2);
}
+ void testPacerHelper(int numSubmits, int sessionDurationMs, int expectedSuccess,
+ bool pauseLastSuccessSession = false) {
+ for (int i = 0; i < numSubmits; i++) {
+ mController->submit(CLIENT(0), SESSION(i), UID(0), UID(0),
+ mRealtimeRequest, mClientCallback0);
+ }
+ for (int i = 0; i < expectedSuccess; i++) {
+ EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(i)));
+ if ((i == expectedSuccess - 1) && pauseLastSuccessSession) {
+ // Insert a pause of 3 sec to the last success running session
+ mController->onThrottlingStarted();
+ EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), SESSION(i)));
+ sleep(3);
+ mController->onThrottlingStopped();
+ EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(0), SESSION(i)));
+ }
+ usleep(sessionDurationMs * 1000);
+ // Test half of Finish and half of Error, both should be counted as burst runs.
+ if (i & 1) {
+ mController->onFinish(CLIENT(0), SESSION(i));
+ EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Finished(CLIENT(0), SESSION(i)));
+ } else {
+ mController->onError(CLIENT(0), SESSION(i), TranscodingErrorCode::kUnknown);
+ EXPECT_EQ(mTranscoder->popEvent(100000),
+ TestTranscoder::Failed(CLIENT(0), SESSION(i)));
+ EXPECT_EQ(mTranscoder->getLastError(), TranscodingErrorCode::kUnknown);
+ }
+ }
+ for (int i = expectedSuccess; i < numSubmits; i++) {
+ EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Failed(CLIENT(0), SESSION(i)));
+ EXPECT_EQ(mTranscoder->getLastError(), TranscodingErrorCode::kDroppedByService);
+ }
+ }
+
std::shared_ptr<TestTranscoder> mTranscoder;
std::shared_ptr<TestUidPolicy> mUidPolicy;
std::shared_ptr<TestResourcePolicy> mResourcePolicy;
@@ -523,15 +564,18 @@
// Should still be propagated to client, but shouldn't trigger any new start.
mController->onError(CLIENT(0), SESSION(1), TranscodingErrorCode::kUnknown);
EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Failed(CLIENT(0), SESSION(1)));
+ EXPECT_EQ(mTranscoder->getLastError(), TranscodingErrorCode::kUnknown);
// Fail running real-time session, should start next real-time session in queue.
mController->onError(CLIENT(1), SESSION(0), TranscodingErrorCode::kUnknown);
EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Failed(CLIENT(1), SESSION(0)));
+ EXPECT_EQ(mTranscoder->getLastError(), TranscodingErrorCode::kUnknown);
EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(2)));
// Fail running real-time session, should resume next session (offline session) in queue.
mController->onError(CLIENT(0), SESSION(2), TranscodingErrorCode::kUnknown);
EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Failed(CLIENT(0), SESSION(2)));
+ EXPECT_EQ(mTranscoder->getLastError(), TranscodingErrorCode::kUnknown);
EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(0), SESSION(0)));
// Fail running offline session, and test error code propagation.
@@ -854,7 +898,7 @@
EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(2), SESSION(0)));
}
-TEST_F(TranscodingSessionControllerTest, TestTranscoderWatchdogTimeout) {
+TEST_F(TranscodingSessionControllerTest, TestTranscoderWatchdogNoHeartbeat) {
ALOGD("TestTranscoderWatchdogTimeout");
// Submit session to CLIENT(0) in UID(0).
@@ -862,18 +906,24 @@
mController->submit(CLIENT(0), SESSION(0), UID(0), UID(0), mRealtimeRequest, mClientCallback0);
EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(0)));
- int32_t expectedGen = 2;
// Test 1: If not sending keep-alive at all, timeout after 3 seconds.
- expectTimeout(CLIENT(0), SESSION(0), expectedGen++);
+ expectTimeout(CLIENT(0), SESSION(0), 2);
+}
+TEST_F(TranscodingSessionControllerTest, TestTranscoderWatchdogHeartbeat) {
// Test 2: No timeout as long as keep-alive coming; timeout after keep-alive stops.
mController->submit(CLIENT(0), SESSION(1), UID(0), UID(0), mRealtimeRequest, mClientCallback0);
EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), SESSION(1)));
+
for (int i = 0; i < 5; i++) {
EXPECT_EQ(mTranscoder->popEvent(1000000), TestTranscoder::NoEvent);
mController->onHeartBeat(CLIENT(0), SESSION(1));
}
- expectTimeout(CLIENT(0), SESSION(1), expectedGen++);
+ expectTimeout(CLIENT(0), SESSION(1), 2);
+}
+
+TEST_F(TranscodingSessionControllerTest, TestTranscoderWatchdogDuringPause) {
+ int expectedGen = 2;
// Test 3a: No timeout for paused session even if no keep-alive is sent.
mController->submit(CLIENT(0), SESSION(2), UID(0), UID(0), mOfflineRequest, mClientCallback0);
@@ -902,4 +952,25 @@
expectTimeout(CLIENT(0), SESSION(3), expectedGen++);
}
+TEST_F(TranscodingSessionControllerTest, TestTranscoderPacerOverCountOnly) {
+ ALOGD("TestTranscoderPacerOverCountOnly");
+ testPacerHelper(12 /*numSubmits*/, 100 /*sessionDurationMs*/, 12 /*expectedSuccess*/);
+}
+
+TEST_F(TranscodingSessionControllerTest, TestTranscoderPacerOverTimeOnly) {
+ ALOGD("TestTranscoderPacerOverTimeOnly");
+ testPacerHelper(5 /*numSubmits*/, 1000 /*sessionDurationMs*/, 5 /*expectedSuccess*/);
+}
+
+TEST_F(TranscodingSessionControllerTest, TestTranscoderPacerOverQuota) {
+ ALOGD("TestTranscoderPacerOverQuota");
+ testPacerHelper(12 /*numSubmits*/, 400 /*sessionDurationMs*/, 10 /*expectedSuccess*/);
+}
+
+TEST_F(TranscodingSessionControllerTest, TestTranscoderPacerWithPause) {
+ ALOGD("TestTranscoderPacerDuringPause");
+ testPacerHelper(12 /*numSubmits*/, 400 /*sessionDurationMs*/, 10 /*expectedSuccess*/,
+ true /*pauseLastSuccessSession*/);
+}
+
} // namespace android
diff --git a/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp b/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp
index 0efe85d..88c1c42 100644
--- a/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp
+++ b/media/libmediatranscoding/transcoder/MediaSampleWriter.cpp
@@ -237,8 +237,8 @@
}
std::chrono::microseconds updateInterval(mHeartBeatIntervalUs);
- std::chrono::system_clock::time_point nextUpdateTime =
- std::chrono::system_clock::now() + updateInterval;
+ std::chrono::steady_clock::time_point nextUpdateTime =
+ std::chrono::steady_clock::now() + updateInterval;
while (true) {
if (trackEosCount >= mTracks.size()) {
diff --git a/services/mediatranscoding/MediaTranscodingService.cpp b/services/mediatranscoding/MediaTranscodingService.cpp
index 4433f33..8b64134 100644
--- a/services/mediatranscoding/MediaTranscodingService.cpp
+++ b/services/mediatranscoding/MediaTranscodingService.cpp
@@ -42,22 +42,49 @@
errorCode, \
String8::format("%s:%d: " errorString, __FUNCTION__, __LINE__, ##__VA_ARGS__))
-MediaTranscodingService::MediaTranscodingService(bool simulated)
+static constexpr int64_t kTranscoderHeartBeatIntervalUs = 1000000LL;
+
+MediaTranscodingService::MediaTranscodingService()
: mUidPolicy(new TranscodingUidPolicy()),
mResourcePolicy(new TranscodingResourcePolicy()),
mThermalPolicy(new TranscodingThermalPolicy()),
mLogger(new TranscodingLogger()) {
ALOGV("MediaTranscodingService is created");
- mSessionController.reset(new TranscodingSessionController(
- [simulated, logger = mLogger](
- const std::shared_ptr<TranscoderCallbackInterface>& cb,
- int64_t heartBeatUs) -> std::shared_ptr<TranscoderInterface> {
- if (simulated) {
- return std::make_shared<SimulatedTranscoder>(cb, heartBeatUs);
- }
- return std::make_shared<TranscoderWrapper>(cb, logger, heartBeatUs);
- },
- mUidPolicy, mResourcePolicy, mThermalPolicy));
+ bool simulated = property_get_bool("debug.transcoding.simulated_transcoder", false);
+ if (simulated) {
+ // Overrid default config params with shorter values for testing.
+ TranscodingSessionController::ControllerConfig config = {
+ .pacerBurstThresholdMs = 500,
+ .pacerBurstCountQuota = 10,
+ .pacerBurstTimeQuotaSeconds = 3,
+ };
+ mSessionController.reset(new TranscodingSessionController(
+ [](const std::shared_ptr<TranscoderCallbackInterface>& cb)
+ -> std::shared_ptr<TranscoderInterface> {
+ return std::make_shared<SimulatedTranscoder>(cb);
+ },
+ mUidPolicy, mResourcePolicy, mThermalPolicy, &config));
+ } else {
+ int32_t overrideBurstCountQuota =
+ property_get_int32("persist.transcoding.burst_count_quota", -1);
+ int32_t pacerBurstTimeQuotaSeconds =
+ property_get_int32("persist.transcoding.burst_time_quota_seconds", -1);
+ // Override default config params with properties if present.
+ TranscodingSessionController::ControllerConfig config;
+ if (overrideBurstCountQuota > 0) {
+ config.pacerBurstCountQuota = overrideBurstCountQuota;
+ }
+ if (pacerBurstTimeQuotaSeconds > 0) {
+ config.pacerBurstTimeQuotaSeconds = pacerBurstTimeQuotaSeconds;
+ }
+ mSessionController.reset(new TranscodingSessionController(
+ [logger = mLogger](const std::shared_ptr<TranscoderCallbackInterface>& cb)
+ -> std::shared_ptr<TranscoderInterface> {
+ return std::make_shared<TranscoderWrapper>(cb, logger,
+ kTranscoderHeartBeatIntervalUs);
+ },
+ mUidPolicy, mResourcePolicy, mThermalPolicy, &config));
+ }
mClientManager.reset(new TranscodingClientManager(mSessionController));
mUidPolicy->setCallback(mSessionController);
mResourcePolicy->setCallback(mSessionController);
@@ -103,8 +130,7 @@
//static
void MediaTranscodingService::instantiate() {
std::shared_ptr<MediaTranscodingService> service =
- ::ndk::SharedRefBase::make<MediaTranscodingService>(
- property_get_bool("debug.transcoding.simulated_transcoder", false));
+ ::ndk::SharedRefBase::make<MediaTranscodingService>();
binder_status_t status =
AServiceManager_addService(service->asBinder().get(), getServiceName());
if (status != STATUS_OK) {
diff --git a/services/mediatranscoding/MediaTranscodingService.h b/services/mediatranscoding/MediaTranscodingService.h
index 9384641..12be131 100644
--- a/services/mediatranscoding/MediaTranscodingService.h
+++ b/services/mediatranscoding/MediaTranscodingService.h
@@ -40,7 +40,7 @@
static constexpr int32_t kInvalidSessionId = -1;
static constexpr int32_t kInvalidClientId = -1;
- MediaTranscodingService(bool simulated);
+ MediaTranscodingService();
virtual ~MediaTranscodingService();
static void instantiate();
diff --git a/services/mediatranscoding/SimulatedTranscoder.cpp b/services/mediatranscoding/SimulatedTranscoder.cpp
index 0151b3d..e80dbc5 100644
--- a/services/mediatranscoding/SimulatedTranscoder.cpp
+++ b/services/mediatranscoding/SimulatedTranscoder.cpp
@@ -47,8 +47,7 @@
return "(unknown)";
}
-SimulatedTranscoder::SimulatedTranscoder(const std::shared_ptr<TranscoderCallbackInterface>& cb,
- int64_t heartBeatUs __unused)
+SimulatedTranscoder::SimulatedTranscoder(const std::shared_ptr<TranscoderCallbackInterface>& cb)
: mCallback(cb), mLooperReady(false) {
ALOGV("SimulatedTranscoder CTOR: %p", this);
}
@@ -132,7 +131,7 @@
void SimulatedTranscoder::threadLoop() {
bool running = false;
- std::chrono::system_clock::time_point lastRunningTime;
+ std::chrono::steady_clock::time_point lastRunningTime;
Event lastRunningEvent;
std::unique_lock<std::mutex> lock(mLock);
@@ -164,8 +163,9 @@
// Advance last running time and remaining time. This is needed to guard
// against bad events (which will be ignored) or spurious wakeups, in that
// case we don't want to wait for the same time again.
- auto now = std::chrono::system_clock::now();
- mRemainingTimeMap[key] -= (now - lastRunningTime);
+ auto now = std::chrono::steady_clock::now();
+ mRemainingTimeMap[key] -= std::chrono::duration_cast<std::chrono::microseconds>(
+ now - lastRunningTime);
lastRunningTime = now;
}
}
@@ -184,7 +184,7 @@
SessionKeyType key = std::make_pair(event.clientId, event.sessionId);
if (!running && (event.type == Event::Start || event.type == Event::Resume)) {
running = true;
- lastRunningTime = std::chrono::system_clock::now();
+ lastRunningTime = std::chrono::steady_clock::now();
lastRunningEvent = event;
ALOGV("%s: session {%lld, %d}: remaining time: %lld", __FUNCTION__,
(long long)event.clientId, event.sessionId,
@@ -195,7 +195,8 @@
if (event.type == Event::Stop) {
mRemainingTimeMap.erase(key);
} else {
- mRemainingTimeMap[key] -= (std::chrono::system_clock::now() - lastRunningTime);
+ mRemainingTimeMap[key] -= std::chrono::duration_cast<std::chrono::microseconds>(
+ std::chrono::steady_clock::now() - lastRunningTime);
}
} else {
ALOGW("%s: discarding bad event: session {%lld, %d}: %s", __FUNCTION__,
diff --git a/services/mediatranscoding/SimulatedTranscoder.h b/services/mediatranscoding/SimulatedTranscoder.h
index 6990576..58e2e30 100644
--- a/services/mediatranscoding/SimulatedTranscoder.h
+++ b/services/mediatranscoding/SimulatedTranscoder.h
@@ -49,8 +49,7 @@
static constexpr int64_t kSessionDurationUs = 1000000;
- SimulatedTranscoder(const std::shared_ptr<TranscoderCallbackInterface>& cb,
- int64_t heartBeatUs);
+ SimulatedTranscoder(const std::shared_ptr<TranscoderCallbackInterface>& cb);
~SimulatedTranscoder();
// TranscoderInterface
diff --git a/services/mediatranscoding/tests/MediaTranscodingServiceTestHelper.h b/services/mediatranscoding/tests/MediaTranscodingServiceTestHelper.h
index 5256a3f..3f7d8d6 100644
--- a/services/mediatranscoding/tests/MediaTranscodingServiceTestHelper.h
+++ b/services/mediatranscoding/tests/MediaTranscodingServiceTestHelper.h
@@ -208,7 +208,9 @@
std::unique_lock lock(mLock);
mEventQueue.push_back(event);
- mLastErr = err;
+ if (err != TranscodingErrorCode::kNoError) {
+ mLastErrQueue.push_back(err);
+ }
mCondition.notify_one();
}
@@ -226,7 +228,12 @@
TranscodingErrorCode getLastError() {
std::unique_lock lock(mLock);
- return mLastErr;
+ if (mLastErrQueue.empty()) {
+ return TranscodingErrorCode::kNoError;
+ }
+ TranscodingErrorCode err = mLastErrQueue.front();
+ mLastErrQueue.pop_front();
+ return err;
}
private:
@@ -234,7 +241,7 @@
std::condition_variable mCondition;
Event mPoppedEvent;
std::list<Event> mEventQueue;
- TranscodingErrorCode mLastErr;
+ std::list<TranscodingErrorCode> mLastErrQueue;
int mUpdateCount = 0;
int mLastProgress = -1;
};
diff --git a/services/mediatranscoding/tests/mediatranscodingservice_simulated_tests.cpp b/services/mediatranscoding/tests/mediatranscodingservice_simulated_tests.cpp
index b8a6f76..c8994ac 100644
--- a/services/mediatranscoding/tests/mediatranscodingservice_simulated_tests.cpp
+++ b/services/mediatranscoding/tests/mediatranscodingservice_simulated_tests.cpp
@@ -54,6 +54,10 @@
constexpr int64_t kPaddingUs = 1000000;
constexpr int64_t kSessionWithPaddingUs = SimulatedTranscoder::kSessionDurationUs + kPaddingUs;
constexpr int64_t kWatchdogTimeoutUs = 3000000;
+// Pacer settings used for simulated tests. Listed here for reference.
+constexpr int32_t kSimulatedPacerBurstThresholdMs = 500;
+//constexpr int32_t kSimulatedPacerBurstCountQuota = 10;
+//constexpr int32_t kSimulatedPacerBurstTimeQuotaSec = 3;
constexpr const char* kClientOpPackageName = "TestClientPackage";
@@ -64,6 +68,25 @@
virtual ~MediaTranscodingServiceSimulatedTest() {
ALOGI("MediaTranscodingServiceResourceTest destroyed");
}
+
+ void testPacerHelper(int numSubmits, int sessionDurationMs, int expectedSuccess) {
+ // Idle to clear out burst history.
+ usleep(kSimulatedPacerBurstThresholdMs * 2 * 1000);
+ for (int i = 0; i < numSubmits; i++) {
+ EXPECT_TRUE(mClient3->submit(i, "test_source_file_0", "test_destination_file_0",
+ TranscodingSessionPriority::kNormal, -1 /*bitrateBps*/,
+ -1 /*overridePid*/, -1 /*overrideUid*/,
+ sessionDurationMs));
+ }
+ for (int i = 0; i < expectedSuccess; i++) {
+ EXPECT_EQ(mClient3->pop(kPaddingUs), EventTracker::Start(CLIENT(3), i));
+ EXPECT_EQ(mClient3->pop(kSessionWithPaddingUs), EventTracker::Finished(CLIENT(3), i));
+ }
+ for (int i = expectedSuccess; i < numSubmits; i++) {
+ EXPECT_EQ(mClient3->pop(kPaddingUs), EventTracker::Failed(CLIENT(3), i));
+ EXPECT_EQ(mClient3->getLastError(), TranscodingErrorCode::kDroppedByService);
+ }
+ }
};
TEST_F(MediaTranscodingServiceSimulatedTest, TestRegisterNullClient) {
@@ -414,5 +437,36 @@
ALOGD("TestTranscodingWatchdog finished.");
}
+TEST_F(MediaTranscodingServiceSimulatedTest, TestTranscodingPacerOverCountQuotaOnly) {
+ ALOGD("TestTranscodingPacerOverCountQuotaOnly starting...");
+
+ registerMultipleClients();
+ testPacerHelper(12 /*numSubmits*/, 100 /*sessionDurationMs*/, 12 /*expectedSuccess*/);
+ unregisterMultipleClients();
+
+ ALOGD("TestTranscodingPacerOverCountQuotaOnly finished.");
+}
+
+TEST_F(MediaTranscodingServiceSimulatedTest, TestTranscodingPacerOverTimeQuotaOnly) {
+ ALOGD("TestTranscodingPacerOverTimeQuotaOnly starting...");
+
+ registerMultipleClients();
+ testPacerHelper(5 /*numSubmits*/, 1000 /*sessionDurationMs*/, 5 /*expectedSuccess*/);
+ unregisterMultipleClients();
+
+ ALOGD("TestTranscodingPacerOverTimeQuotaOnly finished.");
+}
+
+TEST_F(MediaTranscodingServiceSimulatedTest, TestTranscodingPacerOverQuota) {
+ ALOGD("TestTranscodingPacerOverQuota starting...");
+
+ registerMultipleClients();
+ testPacerHelper(12 /*numSubmits*/, 400 /*sessionDurationMs*/, 10 /*expectedSuccess*/);
+ unregisterMultipleClients();
+
+ // Idle to clear out burst history. Since we expect it to actually fail, wait for cooldown.
+ ALOGD("TestTranscodingPacerOverQuota finished.");
+}
+
} // namespace media
} // namespace android