transcoding: handle multiple uids in service

Bug: 171398942
Test: unit tests added; CTS; manual test with app

Change-Id: I07f1399c8f62953084e06fc87a6bb4da27c51e68
diff --git a/media/libmediatranscoding/TranscodingSessionController.cpp b/media/libmediatranscoding/TranscodingSessionController.cpp
index aeabe0f..2518e70 100644
--- a/media/libmediatranscoding/TranscodingSessionController.cpp
+++ b/media/libmediatranscoding/TranscodingSessionController.cpp
@@ -193,8 +193,8 @@
 
     ~Pacer() = default;
 
-    void onSessionCompleted(uid_t uid, std::chrono::microseconds runningTime);
     bool onSessionStarted(uid_t uid);
+    void onSessionCompleted(uid_t uid, std::chrono::microseconds runningTime);
 
 private:
     // Threshold of time between finish/start below which a back-to-back start is counted.
@@ -205,26 +205,20 @@
     int32_t mBurstTimeQuotaSec;
 
     struct UidHistoryEntry {
-        std::chrono::steady_clock::time_point lastCompletedTime;
+        bool sessionActive = false;
         int32_t burstCount = 0;
         std::chrono::steady_clock::duration burstDuration{0};
+        std::chrono::steady_clock::time_point lastCompletedTime;
     };
     std::map<uid_t, UidHistoryEntry> mUidHistoryMap;
 };
 
-void TranscodingSessionController::Pacer::onSessionCompleted(
-        uid_t uid, std::chrono::microseconds runningTime) {
+bool TranscodingSessionController::Pacer::onSessionStarted(uid_t uid) {
+    // If uid doesn't exist, only insert the entry and mark session active. Skip quota checking.
     if (mUidHistoryMap.find(uid) == mUidHistoryMap.end()) {
         mUidHistoryMap.emplace(uid, UidHistoryEntry{});
-    }
-    mUidHistoryMap[uid].lastCompletedTime = std::chrono::steady_clock::now();
-    mUidHistoryMap[uid].burstCount++;
-    mUidHistoryMap[uid].burstDuration += runningTime;
-}
-
-bool TranscodingSessionController::Pacer::onSessionStarted(uid_t uid) {
-    // If uid doesn't exist, this uid has no completed sessions. Skip.
-    if (mUidHistoryMap.find(uid) == mUidHistoryMap.end()) {
+        mUidHistoryMap[uid].sessionActive = true;
+        ALOGV("Pacer::onSessionStarted: uid %d: new", uid);
         return true;
     }
 
@@ -236,25 +230,43 @@
             std::chrono::steady_clock::now() - mUidHistoryMap[uid].lastCompletedTime;
     if (mUidHistoryMap[uid].burstCount >= mBurstCountQuota &&
         mUidHistoryMap[uid].burstDuration >= std::chrono::seconds(mBurstTimeQuotaSec)) {
-        ALOGW("Pacer: uid %d: over quota, burst count %d, time %lldms", uid,
-              mUidHistoryMap[uid].burstCount, (long long)mUidHistoryMap[uid].burstDuration.count());
+        ALOGW("Pacer::onSessionStarted: uid %d: over quota, burst count %d, time %lldms", uid,
+              mUidHistoryMap[uid].burstCount,
+              (long long)mUidHistoryMap[uid].burstDuration.count() / 1000000);
         return false;
     }
 
     // If not over quota, allow the session, and reset as long as this is not too close
     // to previous completion.
     if (timeSinceLastComplete > std::chrono::milliseconds(mBurstThresholdMs)) {
-        ALOGV("Pacer: uid %d: reset quota", uid);
+        ALOGV("Pacer::onSessionStarted: uid %d: reset quota", uid);
         mUidHistoryMap[uid].burstCount = 0;
         mUidHistoryMap[uid].burstDuration = std::chrono::milliseconds(0);
     } else {
-        ALOGV("Pacer: uid %d: burst count %d, time %lldms", uid, mUidHistoryMap[uid].burstCount,
-              (long long)mUidHistoryMap[uid].burstDuration.count());
+        ALOGV("Pacer::onSessionStarted: uid %d: burst count %d, time %lldms", uid,
+              mUidHistoryMap[uid].burstCount,
+              (long long)mUidHistoryMap[uid].burstDuration.count() / 1000000);
     }
 
+    mUidHistoryMap[uid].sessionActive = true;
     return true;
 }
 
+void TranscodingSessionController::Pacer::onSessionCompleted(
+        uid_t uid, std::chrono::microseconds runningTime) {
+    // Skip the quota update if this uid missed the start. (This can happen if the uid is
+    // added via addClientUid() after the session has started.)
+    if (mUidHistoryMap.find(uid) == mUidHistoryMap.end() || !mUidHistoryMap[uid].sessionActive) {
+        ALOGV("Pacer::onSessionCompleted: uid %d: not started", uid);
+        return;
+    }
+    ALOGV("Pacer::onSessionCompleted: uid %d: runningTime %lld", uid, runningTime.count() / 1000);
+    mUidHistoryMap[uid].sessionActive = false;
+    mUidHistoryMap[uid].burstCount++;
+    mUidHistoryMap[uid].burstDuration += runningTime;
+    mUidHistoryMap[uid].lastCompletedTime = std::chrono::steady_clock::now();
+}
+
 ///////////////////////////////////////////////////////////////////////////////
 
 TranscodingSessionController::TranscodingSessionController(
@@ -372,6 +384,14 @@
     }
 
     uid_t topUid = *mUidSortedList.begin();
+    // If the current session is running, and it's in the topUid's queue, let it continue
+    // to run even if it's not the earliest in that uid's queue.
+    // For example, uid(B) is added to a session while it is pending in uid(A)'s queue; B is
+    // then brought to the front, which starts the session, and the user switches back to A.
+    if (mCurrentSession != nullptr && mCurrentSession->getState() == Session::RUNNING &&
+        mCurrentSession->allClientUids.count(topUid) > 0) {
+        return mCurrentSession;
+    }
     SessionKeyType topSessionKey = *mSessionQueues[topUid].begin();
     return &mSessionMap[topSessionKey];
 }
@@ -427,7 +447,7 @@
 
 void TranscodingSessionController::updateCurrentSession_l() {
     Session* curSession = mCurrentSession;
-    Session* topSession = getTopSession_l();
+    Session* topSession = nullptr;
 
     // Delayed init of transcoder and watchdog.
     if (mTranscoder == nullptr) {
@@ -458,9 +478,18 @@
 
         // Otherwise, ensure topSession is running.
         if (topSession->getState() == Session::NOT_STARTED) {
-            if (!mPacer->onSessionStarted(topSession->clientUid)) {
-                // Unfortunately this uid is out of quota for new sessions.
-                // Drop this sesion and try another one.
+            // Check if at least one client has quota to start the session.
+            bool keepForClient = false;
+            for (uid_t uid : topSession->allClientUids) {
+                if (mPacer->onSessionStarted(uid)) {
+                    keepForClient = true;
+                    // DO NOT break here, because book-keeping still needs to happen
+                    // for the other uids.
+                }
+            }
+            if (!keepForClient) {
+                // Unfortunately all uids requesting this session are out of quota.
+                // Drop this session and try the next one.
                 {
                     auto clientCallback = mSessionMap[topSession->key].callback.lock();
                     if (clientCallback != nullptr) {
@@ -484,8 +513,34 @@
     mCurrentSession = topSession;
 }
 
+void TranscodingSessionController::addUidToSession_l(uid_t clientUid,
+                                                     const SessionKeyType& sessionKey) {
+    // If it's an offline session, the queue was already added in the constructor.
+    // If it's a real-time session, check if a queue is already present for the uid,
+    // and add a new queue if needed.
+    if (clientUid != OFFLINE_UID) {
+        if (mSessionQueues.count(clientUid) == 0) {
+            mUidPolicy->registerMonitorUid(clientUid);
+            if (mUidPolicy->isUidOnTop(clientUid)) {
+                mUidSortedList.push_front(clientUid);
+            } else {
+                // Shouldn't be submitting real-time requests from non-top app,
+                // put it in front of the offline queue.
+                mUidSortedList.insert(mOfflineUidIterator, clientUid);
+            }
+        } else if (clientUid != *mUidSortedList.begin()) {
+            if (mUidPolicy->isUidOnTop(clientUid)) {
+                mUidSortedList.remove(clientUid);
+                mUidSortedList.push_front(clientUid);
+            }
+        }
+    }
+    // Append this session to the uid's queue.
+    mSessionQueues[clientUid].push_back(sessionKey);
+}
+
 void TranscodingSessionController::removeSession_l(const SessionKeyType& sessionKey,
-                                                   Session::State finalState) {
+                                                   Session::State finalState, bool keepForOffline) {
     ALOGV("%s: session %s", __FUNCTION__, sessionToString(sessionKey).c_str());
 
     if (mSessionMap.count(sessionKey) == 0) {
@@ -494,26 +549,40 @@
     }
 
     // Remove session from uid's queue.
-    const uid_t uid = mSessionMap[sessionKey].clientUid;
-    SessionQueueType& sessionQueue = mSessionQueues[uid];
-    auto it = std::find(sessionQueue.begin(), sessionQueue.end(), sessionKey);
-    if (it == sessionQueue.end()) {
-        ALOGE("couldn't find session %s in queue for uid %d", sessionToString(sessionKey).c_str(),
-              uid);
-        return;
+    bool uidQueueRemoved = false;
+    for (uid_t uid : mSessionMap[sessionKey].allClientUids) {
+        if (keepForOffline && uid == OFFLINE_UID) {
+            continue;
+        }
+        SessionQueueType& sessionQueue = mSessionQueues[uid];
+        auto it = std::find(sessionQueue.begin(), sessionQueue.end(), sessionKey);
+        if (it == sessionQueue.end()) {
+            ALOGW("couldn't find session %s in queue for uid %d",
+                  sessionToString(sessionKey).c_str(), uid);
+            continue;
+        }
+        sessionQueue.erase(it);
+
+        // If this is the last session in a real-time queue, remove this uid's queue.
+        if (uid != OFFLINE_UID && sessionQueue.empty()) {
+            mUidSortedList.remove(uid);
+            mSessionQueues.erase(uid);
+            mUidPolicy->unregisterMonitorUid(uid);
+
+            uidQueueRemoved = true;
+        }
     }
-    sessionQueue.erase(it);
 
-    // If this is the last session in a real-time queue, remove this uid's queue.
-    if (uid != OFFLINE_UID && sessionQueue.empty()) {
-        mUidSortedList.remove(uid);
-        mSessionQueues.erase(uid);
-        mUidPolicy->unregisterMonitorUid(uid);
-
+    if (uidQueueRemoved) {
         std::unordered_set<uid_t> topUids = mUidPolicy->getTopUids();
         moveUidsToTop_l(topUids, false /*preserveTopUid*/);
     }
 
+    if (keepForOffline) {
+        mSessionMap[sessionKey].allClientUids = {OFFLINE_UID};
+        return;
+    }
+
     // Clear current session.
     if (mCurrentSession == &mSessionMap[sessionKey]) {
         mCurrentSession = nullptr;
@@ -522,8 +591,9 @@
     setSessionState_l(&mSessionMap[sessionKey], finalState);
 
     if (finalState == Session::FINISHED || finalState == Session::ERROR) {
-        mPacer->onSessionCompleted(mSessionMap[sessionKey].clientUid,
-                                   mSessionMap[sessionKey].runningTime);
+        for (uid_t uid : mSessionMap[sessionKey].allClientUids) {
+            mPacer->onSessionCompleted(uid, mSessionMap[sessionKey].runningTime);
+        }
     }
 
     mSessionHistory.push_back(mSessionMap[sessionKey]);
@@ -617,34 +687,13 @@
 
     // Add session to session map.
     mSessionMap[sessionKey].key = sessionKey;
-    mSessionMap[sessionKey].clientUid = clientUid;
     mSessionMap[sessionKey].callingUid = callingUid;
+    mSessionMap[sessionKey].allClientUids.insert(clientUid);
     mSessionMap[sessionKey].request = request;
     mSessionMap[sessionKey].callback = callback;
     setSessionState_l(&mSessionMap[sessionKey], Session::NOT_STARTED);
 
-    // If it's an offline session, the queue was already added in constructor.
-    // If it's a real-time sessions, check if a queue is already present for the uid,
-    // and add a new queue if needed.
-    if (clientUid != OFFLINE_UID) {
-        if (mSessionQueues.count(clientUid) == 0) {
-            mUidPolicy->registerMonitorUid(clientUid);
-            if (mUidPolicy->isUidOnTop(clientUid)) {
-                mUidSortedList.push_front(clientUid);
-            } else {
-                // Shouldn't be submitting real-time requests from non-top app,
-                // put it in front of the offline queue.
-                mUidSortedList.insert(mOfflineUidIterator, clientUid);
-            }
-        } else if (clientUid != *mUidSortedList.begin()) {
-            if (mUidPolicy->isUidOnTop(clientUid)) {
-                mUidSortedList.remove(clientUid);
-                mUidSortedList.push_front(clientUid);
-            }
-        }
-    }
-    // Append this session to the uid's queue.
-    mSessionQueues[clientUid].push_back(sessionKey);
+    addUidToSession_l(clientUid, sessionKey);
 
     updateCurrentSession_l();
 
@@ -657,14 +706,20 @@
 
     ALOGV("%s: session %s", __FUNCTION__, sessionToString(sessionKey).c_str());
 
-    std::list<SessionKeyType> sessionsToRemove;
+    std::list<SessionKeyType> sessionsToRemove, sessionsForOffline;
 
     std::scoped_lock lock{mLock};
 
     if (sessionId < 0) {
         for (auto it = mSessionMap.begin(); it != mSessionMap.end(); ++it) {
-            if (it->first.first == clientId && it->second.clientUid != OFFLINE_UID) {
-                sessionsToRemove.push_back(it->first);
+            if (it->first.first == clientId) {
+                // If the session also has an offline request, keep it for the
+                // offline client only; otherwise remove the session.
+                if (it->second.allClientUids.count(OFFLINE_UID) > 0) {
+                    sessionsForOffline.push_back(it->first);
+                } else {
+                    sessionsToRemove.push_back(it->first);
+                }
             }
         }
     } else {
@@ -688,6 +743,10 @@
         removeSession_l(*it, Session::CANCELED);
     }
 
+    for (auto it = sessionsForOffline.begin(); it != sessionsForOffline.end(); ++it) {
+        removeSession_l(*it, Session::CANCELED, true /*keepForOffline*/);
+    }
+
     // Start next session.
     updateCurrentSession_l();
 
@@ -695,6 +754,51 @@
     return true;
 }
 
+bool TranscodingSessionController::addClientUid(ClientIdType clientId, SessionIdType sessionId,
+                                                uid_t clientUid) {
+    SessionKeyType sessionKey = std::make_pair(clientId, sessionId);
+
+    std::scoped_lock lock{mLock};
+
+    if (mSessionMap.count(sessionKey) == 0) {
+        ALOGE("session %s doesn't exist", sessionToString(sessionKey).c_str());
+        return false;
+    }
+
+    if (mSessionMap[sessionKey].allClientUids.count(clientUid) > 0) {
+        ALOGE("session %s already has uid %d", sessionToString(sessionKey).c_str(), clientUid);
+        return false;
+    }
+
+    mSessionMap[sessionKey].allClientUids.insert(clientUid);
+    addUidToSession_l(clientUid, sessionKey);
+
+    updateCurrentSession_l();
+
+    validateState_l();
+    return true;
+}
+
+bool TranscodingSessionController::getClientUids(ClientIdType clientId, SessionIdType sessionId,
+                                                 std::vector<int32_t>* out_clientUids) {
+    SessionKeyType sessionKey = std::make_pair(clientId, sessionId);
+
+    std::scoped_lock lock{mLock};
+
+    if (mSessionMap.count(sessionKey) == 0) {
+        ALOGE("session %s doesn't exist", sessionToString(sessionKey).c_str());
+        return false;
+    }
+
+    out_clientUids->clear();
+    for (uid_t uid : mSessionMap[sessionKey].allClientUids) {
+        if (uid != OFFLINE_UID) {
+            out_clientUids->push_back(uid);
+        }
+    }
+    return true;
+}
+
 bool TranscodingSessionController::getSession(ClientIdType clientId, SessionIdType sessionId,
                                               TranscodingRequestParcel* request) {
     SessionKeyType sessionKey = std::make_pair(clientId, sessionId);
@@ -938,7 +1042,8 @@
     LOG_ALWAYS_FATAL_IF(*mOfflineUidIterator != OFFLINE_UID,
                         "mOfflineUidIterator not pointing to offline uid");
     LOG_ALWAYS_FATAL_IF(mUidSortedList.size() != mSessionQueues.size(),
-                        "mUidList and mSessionQueues size mismatch");
+                        "mUidSortedList and mSessionQueues size mismatch, %zu vs %zu",
+                        mUidSortedList.size(), mSessionQueues.size());
 
     int32_t totalSessions = 0;
     for (auto uid : mUidSortedList) {
@@ -952,8 +1057,14 @@
 
         totalSessions += mSessionQueues[uid].size();
     }
-    LOG_ALWAYS_FATAL_IF(mSessionMap.size() != totalSessions,
-                        "mSessions size doesn't match total sessions counted from uid queues");
+    int32_t totalSessionsAlternative = 0;
+    for (auto const& s : mSessionMap) {
+        totalSessionsAlternative += s.second.allClientUids.size();
+    }
+    LOG_ALWAYS_FATAL_IF(totalSessions != totalSessionsAlternative,
+                        "session count (including dup) from mSessionQueues doesn't match that from "
+                        "mSessionMap, %d vs %d",
+                        totalSessions, totalSessionsAlternative);
 #endif  // VALIDATE_STATE
 }