transcoding: add uid state based scheduling policy

bug: 145233472
bug: 154734285

test: unit tests

Change-Id: I9e8038252c2be834eb4e2fb2945396572d37b036
diff --git a/media/libmediatranscoding/Android.bp b/media/libmediatranscoding/Android.bp
index 2753db4..30c2501 100644
--- a/media/libmediatranscoding/Android.bp
+++ b/media/libmediatranscoding/Android.bp
@@ -39,6 +39,7 @@
     srcs: [
         "TranscodingClientManager.cpp",
         "TranscodingJobScheduler.cpp",
+        "TranscodingUidPolicy.cpp",
     ],
 
     shared_libs: [
@@ -47,6 +48,7 @@
         "liblog",
         "libutils",
         "libmediatranscoder",
+        "libbinder",
     ],
 
     export_include_dirs: ["include"],
diff --git a/media/libmediatranscoding/TranscodingClientManager.cpp b/media/libmediatranscoding/TranscodingClientManager.cpp
index 6bc8613..fd87cb4 100644
--- a/media/libmediatranscoding/TranscodingClientManager.cpp
+++ b/media/libmediatranscoding/TranscodingClientManager.cpp
@@ -43,7 +43,8 @@
      * object doesn't get created again, otherwise the binder object pointer
      * may not be unique.
      */
-    SpAIBinder mClientCallback;
+    SpAIBinder mClientBinder;
+    std::shared_ptr<ITranscodingClientCallback> mClientCallback;
     /* A unique id assigned to the client by the service. This number is used
      * by the service for indexing. Here we use the binder object's pointer
      * (casted to int64t_t) as the client id.
@@ -78,8 +79,9 @@
         const std::shared_ptr<ITranscodingClientCallback>& callback, pid_t pid, uid_t uid,
         const std::string& clientName, const std::string& opPackageName,
         TranscodingClientManager* owner)
-      : mClientCallback((callback != nullptr) ? callback->asBinder() : nullptr),
-        mClientId((int64_t)mClientCallback.get()),
+      : mClientBinder((callback != nullptr) ? callback->asBinder() : nullptr),
+        mClientCallback(callback),
+        mClientId((int64_t)mClientBinder.get()),
         mClientPid(pid),
         mClientUid(uid),
         mClientName(clientName),
@@ -98,9 +100,8 @@
 
     int32_t jobId = mNextJobId.fetch_add(1);
 
-    *_aidl_return =
-            mOwner->mJobScheduler->submit(mClientId, jobId, mClientUid, in_request,
-                                          ITranscodingClientCallback::fromBinder(mClientCallback));
+    *_aidl_return = mOwner->mJobScheduler->submit(mClientId, jobId, mClientUid, in_request,
+                                                  mClientCallback);
 
     if (*_aidl_return) {
         out_job->jobId = jobId;
@@ -205,7 +206,7 @@
           (long long)client->mClientId, client->mClientPid, client->mClientUid,
           client->mClientName.c_str(), client->mClientOpPackageName.c_str());
 
-    AIBinder_linkToDeath(client->mClientCallback.get(), mDeathRecipient.get(),
+    AIBinder_linkToDeath(client->mClientBinder.get(), mDeathRecipient.get(),
                          reinterpret_cast<void*>(client.get()));
 
     // Adds the new client to the map.
@@ -227,7 +228,7 @@
         return INVALID_OPERATION;
     }
 
-    SpAIBinder callback = it->second->mClientCallback;
+    SpAIBinder callback = it->second->mClientBinder;
 
     // Check if the client still live. If alive, unlink the death.
     if (callback.get() != nullptr) {
diff --git a/media/libmediatranscoding/TranscodingJobScheduler.cpp b/media/libmediatranscoding/TranscodingJobScheduler.cpp
index 6e235c6..83d7a82 100644
--- a/media/libmediatranscoding/TranscodingJobScheduler.cpp
+++ b/media/libmediatranscoding/TranscodingJobScheduler.cpp
@@ -14,7 +14,7 @@
  * limitations under the License.
  */
 
-// #define LOG_NDEBUG 0
+//#define LOG_NDEBUG 0
 #define LOG_TAG "TranscodingJobScheduler"
 
 #define VALIDATE_STATE 1
@@ -106,6 +106,10 @@
     if (uid != OFFLINE_UID && jobQueue.empty()) {
         mUidSortedList.remove(uid);
         mJobQueues.erase(uid);
+        mUidPolicy->unregisterMonitorUid(uid);
+
+        std::unordered_set<uid_t> topUids = mUidPolicy->getTopUids();
+        moveUidsToTop_l(topUids, false /*preserveTopUid*/);
     }
 
     // Clear current job.
@@ -117,6 +121,59 @@
     mJobMap.erase(jobKey);
 }
 
+/**
+ * Moves the set of uids to the front of mUidSortedList (which is used to pick
+ * the next job to run).
+ *
+ * This is called when 1) we received a onTopUidsChanged() callbcak from UidPolicy,
+ * or 2) we removed the job queue for a uid because it becomes empty.
+ *
+ * In case of 1), if there are multiple uids in the set, and the current front
+ * uid in mUidSortedList is still in the set, we try to keep that uid at front
+ * so that current job run is not interrupted. (This is not a concern for case 2)
+ * because the queue for a uid was just removed entirely.)
+ */
+void TranscodingJobScheduler::moveUidsToTop_l(const std::unordered_set<uid_t>& uids,
+                                              bool preserveTopUid) {
+    // If uid set is empty, nothing to do. Do not change the queue status.
+    if (uids.empty()) {
+        return;
+    }
+
+    // Save the current top uid.
+    uid_t curTopUid = *mUidSortedList.begin();
+    bool pushCurTopToFront = false;
+    int32_t numUidsMoved = 0;
+
+    // Go through the sorted uid list once, and move the ones in top set to front.
+    for (auto it = mUidSortedList.begin(); it != mUidSortedList.end();) {
+        uid_t uid = *it;
+
+        if (uid != OFFLINE_UID && uids.count(uid) > 0) {
+            it = mUidSortedList.erase(it);
+
+            // If this is the top we're preserving, don't push it here, push
+            // it after the for-loop.
+            if (uid == curTopUid && preserveTopUid) {
+                pushCurTopToFront = true;
+            } else {
+                mUidSortedList.push_front(uid);
+            }
+
+            // If we found all uids in the set, break out.
+            if (++numUidsMoved == uids.size()) {
+                break;
+            }
+        } else {
+            ++it;
+        }
+    }
+
+    if (pushCurTopToFront) {
+        mUidSortedList.push_front(curTopUid);
+    }
+}
+
 bool TranscodingJobScheduler::submit(ClientIdType clientId, int32_t jobId, uid_t uid,
                                      const TranscodingRequestParcel& request,
                                      const std::weak_ptr<ITranscodingClientCallback>& callback) {
@@ -150,6 +207,7 @@
     // and add a new queue if needed.
     if (uid != OFFLINE_UID) {
         if (mJobQueues.count(uid) == 0) {
+            mUidPolicy->registerMonitorUid(uid);
             if (mUidPolicy->isUidOnTop(uid)) {
                 mUidSortedList.push_front(uid);
             } else {
@@ -222,7 +280,7 @@
     std::scoped_lock lock{mLock};
 
     if (mJobMap.count(jobKey) == 0) {
-        ALOGW("ignoring abort for non-existent job");
+        ALOGW("ignoring finish for non-existent job");
         return;
     }
 
@@ -230,7 +288,7 @@
     // to client if the job is paused. Transcoder could have posted finish when
     // we're pausing it, and the finish arrived after we changed current job.
     if (mJobMap[jobKey].state == Job::NOT_STARTED) {
-        ALOGW("ignoring abort for job that was never started");
+        ALOGW("ignoring finish for job that was never started");
         return;
     }
 
@@ -258,7 +316,7 @@
     std::scoped_lock lock{mLock};
 
     if (mJobMap.count(jobKey) == 0) {
-        ALOGW("ignoring abort for non-existent job");
+        ALOGW("ignoring error for non-existent job");
         return;
     }
 
@@ -266,7 +324,7 @@
     // to client if the job is paused. Transcoder could have posted finish when
     // we're pausing it, and the finish arrived after we changed current job.
     if (mJobMap[jobKey].state == Job::NOT_STARTED) {
-        ALOGW("ignoring abort for job that was never started");
+        ALOGW("ignoring error for job that was never started");
         return;
     }
 
@@ -286,6 +344,34 @@
     validateState_l();
 }
 
+void TranscodingJobScheduler::onProgressUpdate(int64_t clientId, int32_t jobId, int32_t progress) {
+    JobKeyType jobKey = std::make_pair(clientId, jobId);
+
+    ALOGV("%s: job %s, progress %d", __FUNCTION__, jobToString(jobKey).c_str(), progress);
+
+    std::scoped_lock lock{mLock};
+
+    if (mJobMap.count(jobKey) == 0) {
+        ALOGW("ignoring progress for non-existent job");
+        return;
+    }
+
+    // Only ignore if job was never started. In particular, propagate the status
+    // to client if the job is paused. Transcoder could have posted finish when
+    // we're pausing it, and the finish arrived after we changed current job.
+    if (mJobMap[jobKey].state == Job::NOT_STARTED) {
+        ALOGW("ignoring progress for job that was never started");
+        return;
+    }
+
+    {
+        auto clientCallback = mJobMap[jobKey].callback.lock();
+        if (clientCallback != nullptr) {
+            clientCallback->onProgressUpdate(jobId, progress);
+        }
+    }
+}
+
 void TranscodingJobScheduler::onResourceLost() {
     ALOGV("%s", __FUNCTION__);
 
@@ -302,28 +388,25 @@
     validateState_l();
 }
 
-void TranscodingJobScheduler::onTopUidChanged(uid_t uid) {
-    ALOGV("%s: uid %d", __FUNCTION__, uid);
+void TranscodingJobScheduler::onTopUidsChanged(const std::unordered_set<uid_t>& uids) {
+    if (uids.empty()) {
+        ALOGW("%s: ignoring empty uids", __FUNCTION__);
+        return;
+    }
+
+    std::string uidStr;
+    for (auto it = uids.begin(); it != uids.end(); it++) {
+        if (!uidStr.empty()) {
+            uidStr += ", ";
+        }
+        uidStr += std::to_string(*it);
+    }
+
+    ALOGD("%s: topUids: size %zu, uids: %s", __FUNCTION__, uids.size(), uidStr.c_str());
 
     std::scoped_lock lock{mLock};
 
-    if (uid == OFFLINE_UID) {
-        ALOGW("%s: ignoring invalid uid %d", __FUNCTION__, uid);
-        return;
-    }
-    // If this uid doesn't have any jobs, we don't care about it.
-    if (mJobQueues.count(uid) == 0) {
-        ALOGW("%s: ignoring uid %d without any jobs", __FUNCTION__, uid);
-        return;
-    }
-    // If this uid is already top, don't do anything.
-    if (uid == *mUidSortedList.begin()) {
-        ALOGW("%s: uid %d is already top", __FUNCTION__, uid);
-        return;
-    }
-
-    mUidSortedList.remove(uid);
-    mUidSortedList.push_front(uid);
+    moveUidsToTop_l(uids, true /*preserveTopUid*/);
 
     updateCurrentJob_l();
 
diff --git a/media/libmediatranscoding/TranscodingUidPolicy.cpp b/media/libmediatranscoding/TranscodingUidPolicy.cpp
new file mode 100644
index 0000000..9c8d3fe
--- /dev/null
+++ b/media/libmediatranscoding/TranscodingUidPolicy.cpp
@@ -0,0 +1,247 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// #define LOG_NDEBUG 0
+#define LOG_TAG "TranscodingUidPolicy"
+
+#include <binder/ActivityManager.h>
+#include <cutils/misc.h>  // FIRST_APPLICATION_UID
+#include <inttypes.h>
+#include <media/TranscodingUidPolicy.h>
+#include <utils/Log.h>
+
+#include <utility>
+
+namespace android {
+
+constexpr static uid_t OFFLINE_UID = -1;
+constexpr static const char* kTranscodingTag = "transcoding";
+
+struct TranscodingUidPolicy::UidObserver : public BnUidObserver,
+                                           public virtual IBinder::DeathRecipient {
+    explicit UidObserver(TranscodingUidPolicy* owner) : mOwner(owner) {}
+
+    // IUidObserver
+    void onUidGone(uid_t uid, bool disabled) override;
+    void onUidActive(uid_t uid) override;
+    void onUidIdle(uid_t uid, bool disabled) override;
+    void onUidStateChanged(uid_t uid, int32_t procState, int64_t procStateSeq,
+                           int32_t capability) override;
+
+    // IBinder::DeathRecipient implementation
+    void binderDied(const wp<IBinder>& who) override;
+
+    TranscodingUidPolicy* mOwner;
+};
+
+void TranscodingUidPolicy::UidObserver::onUidGone(uid_t uid __unused, bool disabled __unused) {}
+
+void TranscodingUidPolicy::UidObserver::onUidActive(uid_t uid __unused) {}
+
+void TranscodingUidPolicy::UidObserver::onUidIdle(uid_t uid __unused, bool disabled __unused) {}
+
+void TranscodingUidPolicy::UidObserver::onUidStateChanged(uid_t uid, int32_t procState,
+                                                          int64_t procStateSeq __unused,
+                                                          int32_t capability __unused) {
+    mOwner->onUidStateChanged(uid, procState);
+}
+
+void TranscodingUidPolicy::UidObserver::binderDied(const wp<IBinder>& /*who*/) {
+    ALOGW("TranscodingUidPolicy: ActivityManager has died");
+    // TODO(chz): this is a rare event (since if the AMS is dead, the system is
+    // probably dead as well). But we should try to reconnect.
+    mOwner->setUidObserverRegistered(false);
+}
+
+////////////////////////////////////////////////////////////////////////////
+
+TranscodingUidPolicy::TranscodingUidPolicy()
+      : mAm(std::make_shared<ActivityManager>()),
+        mUidObserver(new UidObserver(this)),
+        mRegistered(false),
+        mTopUidState(ActivityManager::PROCESS_STATE_UNKNOWN) {
+    registerSelf();
+}
+
+TranscodingUidPolicy::~TranscodingUidPolicy() {
+    unregisterSelf();
+}
+
+void TranscodingUidPolicy::registerSelf() {
+    status_t res = mAm->linkToDeath(mUidObserver.get());
+    mAm->registerUidObserver(
+            mUidObserver.get(),
+            ActivityManager::UID_OBSERVER_GONE | ActivityManager::UID_OBSERVER_IDLE |
+                    ActivityManager::UID_OBSERVER_ACTIVE | ActivityManager::UID_OBSERVER_PROCSTATE,
+            ActivityManager::PROCESS_STATE_UNKNOWN, String16(kTranscodingTag));
+
+    if (res == OK) {
+        Mutex::Autolock _l(mUidLock);
+
+        mRegistered = true;
+        ALOGI("TranscodingUidPolicy: Registered with ActivityManager");
+    } else {
+        mAm->unregisterUidObserver(mUidObserver.get());
+    }
+}
+
+void TranscodingUidPolicy::unregisterSelf() {
+    mAm->unregisterUidObserver(mUidObserver.get());
+    mAm->unlinkToDeath(mUidObserver.get());
+
+    Mutex::Autolock _l(mUidLock);
+
+    mRegistered = false;
+
+    ALOGI("TranscodingUidPolicy: Unregistered with ActivityManager");
+}
+
+void TranscodingUidPolicy::setUidObserverRegistered(bool registered) {
+    Mutex::Autolock _l(mUidLock);
+
+    mRegistered = registered;
+}
+
+void TranscodingUidPolicy::setCallback(const std::shared_ptr<UidPolicyCallbackInterface>& cb) {
+    mUidPolicyCallback = cb;
+}
+
+void TranscodingUidPolicy::registerMonitorUid(uid_t uid) {
+    Mutex::Autolock _l(mUidLock);
+    if (uid == OFFLINE_UID) {
+        ALOGW("Ignoring the offline uid");
+        return;
+    }
+    if (mUidStateMap.find(uid) != mUidStateMap.end()) {
+        ALOGE("%s: Trying to register uid: %d which is already monitored!", __FUNCTION__, uid);
+        return;
+    }
+
+    int32_t state = ActivityManager::PROCESS_STATE_UNKNOWN;
+    if (mRegistered && mAm->isUidActiveOrForeground(uid, String16(kTranscodingTag))) {
+        state = mAm->getUidProcessState(uid, String16(kTranscodingTag));
+    }
+
+    ALOGV("%s: inserting new uid: %u, procState %d", __FUNCTION__, uid, state);
+
+    mUidStateMap.emplace(std::pair<uid_t, int32_t>(uid, state));
+    mStateUidMap[state].insert(uid);
+
+    updateTopUid_l();
+}
+
+void TranscodingUidPolicy::unregisterMonitorUid(uid_t uid) {
+    Mutex::Autolock _l(mUidLock);
+
+    auto it = mUidStateMap.find(uid);
+    if (it == mUidStateMap.end()) {
+        ALOGE("%s: Trying to unregister uid: %d which is not monitored!", __FUNCTION__, uid);
+        return;
+    }
+
+    auto stateIt = mStateUidMap.find(it->second);
+    if (stateIt != mStateUidMap.end()) {
+        stateIt->second.erase(uid);
+        if (stateIt->second.empty()) {
+            mStateUidMap.erase(stateIt);
+        }
+    }
+    mUidStateMap.erase(it);
+
+    updateTopUid_l();
+}
+
+bool TranscodingUidPolicy::isUidOnTop(uid_t uid) {
+    Mutex::Autolock _l(mUidLock);
+
+    return mTopUidState != ActivityManager::PROCESS_STATE_UNKNOWN &&
+           mTopUidState == getProcState_l(uid);
+}
+
+std::unordered_set<uid_t> TranscodingUidPolicy::getTopUids() const {
+    Mutex::Autolock _l(mUidLock);
+
+    if (mTopUidState == ActivityManager::PROCESS_STATE_UNKNOWN) {
+        return std::unordered_set<uid_t>();
+    }
+
+    return mStateUidMap.at(mTopUidState);
+}
+
+void TranscodingUidPolicy::onUidStateChanged(uid_t uid, int32_t procState) {
+    ALOGV("onUidStateChanged: %u, procState %d", uid, procState);
+
+    bool topUidSetChanged = false;
+    std::unordered_set<uid_t> topUids;
+    {
+        Mutex::Autolock _l(mUidLock);
+        auto it = mUidStateMap.find(uid);
+        if (it != mUidStateMap.end() && it->second != procState) {
+            // Top set changed if 1) the uid is in the current top uid set, or 2) the
+            // new procState is at least the same priority as the current top uid state.
+            bool isUidCurrentTop = mTopUidState != ActivityManager::PROCESS_STATE_UNKNOWN &&
+                                   mStateUidMap[mTopUidState].count(uid) > 0;
+            bool isNewStateHigherThanTop = procState != ActivityManager::PROCESS_STATE_UNKNOWN &&
+                                           (procState <= mTopUidState ||
+                                            mTopUidState == ActivityManager::PROCESS_STATE_UNKNOWN);
+            topUidSetChanged = (isUidCurrentTop || isNewStateHigherThanTop);
+
+            // Move uid to the new procState.
+            mStateUidMap[it->second].erase(uid);
+            mStateUidMap[procState].insert(uid);
+            it->second = procState;
+
+            if (topUidSetChanged) {
+                updateTopUid_l();
+
+                // Make a copy of the uid set for callback.
+                topUids = mStateUidMap[mTopUidState];
+            }
+        }
+    }
+
+    ALOGV("topUidSetChanged: %d", topUidSetChanged);
+
+    if (topUidSetChanged) {
+        auto callback = mUidPolicyCallback.lock();
+        if (callback != nullptr) {
+            callback->onTopUidsChanged(topUids);
+        }
+    }
+}
+
+void TranscodingUidPolicy::updateTopUid_l() {
+    // Update top uid state.
+    mTopUidState = ActivityManager::PROCESS_STATE_UNKNOWN;
+    for (auto stateIt = mStateUidMap.begin(); stateIt != mStateUidMap.end(); stateIt++) {
+        if (stateIt->first != ActivityManager::PROCESS_STATE_UNKNOWN && !stateIt->second.empty()) {
+            mTopUidState = stateIt->first;
+            break;
+        }
+    }
+
+    ALOGV("%s: top uid state is %d", __FUNCTION__, mTopUidState);
+}
+
+int32_t TranscodingUidPolicy::getProcState_l(uid_t uid) {
+    auto it = mUidStateMap.find(uid);
+    if (it != mUidStateMap.end()) {
+        return it->second;
+    }
+    return ActivityManager::PROCESS_STATE_UNKNOWN;
+}
+
+}  // namespace android
diff --git a/media/libmediatranscoding/include/media/TranscoderInterface.h b/media/libmediatranscoding/include/media/TranscoderInterface.h
index d74135a..a2afa00 100644
--- a/media/libmediatranscoding/include/media/TranscoderInterface.h
+++ b/media/libmediatranscoding/include/media/TranscoderInterface.h
@@ -43,6 +43,7 @@
     // TODO(chz): determine what parameters are needed here.
     virtual void onFinish(int64_t clientId, int32_t jobId) = 0;
     virtual void onError(int64_t clientId, int32_t jobId, TranscodingErrorCode err) = 0;
+    virtual void onProgressUpdate(int64_t clientId, int32_t jobId, int32_t progress) = 0;
 
     // Called when transcoding becomes temporarily inaccessible due to loss of resource.
     // If there is any job currently running, it will be paused. When resource contention
diff --git a/media/libmediatranscoding/include/media/TranscodingJobScheduler.h b/media/libmediatranscoding/include/media/TranscodingJobScheduler.h
index 77f6404..0838977 100644
--- a/media/libmediatranscoding/include/media/TranscodingJobScheduler.h
+++ b/media/libmediatranscoding/include/media/TranscodingJobScheduler.h
@@ -49,11 +49,12 @@
     // TranscoderCallbackInterface
     void onFinish(ClientIdType clientId, int32_t jobId) override;
     void onError(int64_t clientId, int32_t jobId, TranscodingErrorCode err) override;
+    void onProgressUpdate(int64_t clientId, int32_t jobId, int32_t progress) override;
     void onResourceLost() override;
     // ~TranscoderCallbackInterface
 
     // UidPolicyCallbackInterface
-    void onTopUidChanged(uid_t uid) override;
+    void onTopUidsChanged(const std::unordered_set<uid_t>& uids) override;
     void onResourceAvailable() override;
     // ~UidPolicyCallbackInterface
 
@@ -103,6 +104,7 @@
     Job* getTopJob_l();
     void updateCurrentJob_l();
     void removeJob_l(const JobKeyType& jobKey);
+    void moveUidsToTop_l(const std::unordered_set<uid_t>& uids, bool preserveTopUid);
 
     // Internal state verifier (debug only)
     void validateState_l();
diff --git a/media/libmediatranscoding/include/media/TranscodingUidPolicy.h b/media/libmediatranscoding/include/media/TranscodingUidPolicy.h
new file mode 100644
index 0000000..27dadd2
--- /dev/null
+++ b/media/libmediatranscoding/include/media/TranscodingUidPolicy.h
@@ -0,0 +1,71 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ANDROID_MEDIA_TRANSCODING_UID_POLICY_H
+#define ANDROID_MEDIA_TRANSCODING_UID_POLICY_H
+
+#include <aidl/android/media/ITranscodingClient.h>
+#include <aidl/android/media/ITranscodingClientCallback.h>
+#include <media/UidPolicyInterface.h>
+#include <sys/types.h>
+#include <utils/Condition.h>
+#include <utils/RefBase.h>
+#include <utils/String8.h>
+#include <utils/Vector.h>
+
+#include <map>
+#include <mutex>
+#include <unordered_map>
+#include <unordered_set>
+
+namespace android {
+
+class ActivityManager;
+// Observer for UID lifecycle and provide information about the uid's app
+// priority used by the job scheduler.
+class TranscodingUidPolicy : public UidPolicyInterface {
+public:
+    explicit TranscodingUidPolicy();
+    ~TranscodingUidPolicy();
+
+    // UidPolicyInterface
+    bool isUidOnTop(uid_t uid) override;
+    void registerMonitorUid(uid_t uid) override;
+    void unregisterMonitorUid(uid_t uid) override;
+    std::unordered_set<uid_t> getTopUids() const override;
+    void setCallback(const std::shared_ptr<UidPolicyCallbackInterface>& cb) override;
+
+private:
+    void onUidStateChanged(uid_t uid, int32_t procState);
+    void setUidObserverRegistered(bool registerd);
+    void registerSelf();
+    void unregisterSelf();
+    int32_t getProcState_l(uid_t uid) NO_THREAD_SAFETY_ANALYSIS;
+    void updateTopUid_l() NO_THREAD_SAFETY_ANALYSIS;
+
+    struct UidObserver;
+    mutable Mutex mUidLock;
+    std::shared_ptr<ActivityManager> mAm;
+    sp<UidObserver> mUidObserver;
+    bool mRegistered GUARDED_BY(mUidLock);
+    int32_t mTopUidState GUARDED_BY(mUidLock);
+    std::unordered_map<uid_t, int32_t> mUidStateMap GUARDED_BY(mUidLock);
+    std::map<int32_t, std::unordered_set<uid_t>> mStateUidMap GUARDED_BY(mUidLock);
+    std::weak_ptr<UidPolicyCallbackInterface> mUidPolicyCallback;
+};  // class TranscodingUidPolicy
+
+}  // namespace android
+#endif  // ANDROID_MEDIA_TRANSCODING_SERVICE_H
diff --git a/media/libmediatranscoding/include/media/UidPolicyInterface.h b/media/libmediatranscoding/include/media/UidPolicyInterface.h
index e9a9da4..dc28027 100644
--- a/media/libmediatranscoding/include/media/UidPolicyInterface.h
+++ b/media/libmediatranscoding/include/media/UidPolicyInterface.h
@@ -17,29 +17,37 @@
 #ifndef ANDROID_MEDIA_UID_POLICY_INTERFACE_H
 #define ANDROID_MEDIA_UID_POLICY_INTERFACE_H
 
+#include <unordered_set>
+
 namespace android {
 
+class UidPolicyCallbackInterface;
+
 // Interface for the scheduler to query a uid's info.
 class UidPolicyInterface {
 public:
-    // Determines if a uid is currently running as top.
-    // TODO(chz): this should probably be replaced by a query that determines
-    // which uid has the highest priority among a given set of uids.
+    // Instruct the uid policy to start monitoring a uid.
+    virtual void registerMonitorUid(uid_t uid) = 0;
+    // Instruct the uid policy to stop monitoring a uid.
+    virtual void unregisterMonitorUid(uid_t uid) = 0;
+    // Whether a uid is among the set of uids that's currently top priority.
     virtual bool isUidOnTop(uid_t uid) = 0;
+    // Retrieves the set of uids that's currently top priority.
+    virtual std::unordered_set<uid_t> getTopUids() const = 0;
+    // Set the associated callback interface to send the events when uid states change.
+    virtual void setCallback(const std::shared_ptr<UidPolicyCallbackInterface>& cb) = 0;
 
 protected:
     virtual ~UidPolicyInterface() = default;
 };
 
-// Interface for notifying the scheduler of a change in a uid's state or
+// Interface for notifying the scheduler of a change in uid states or
 // transcoding resource availability.
 class UidPolicyCallbackInterface {
 public:
-    // Called when a uid is brought to top.
-    // TODO(chz): this should probably be replace by a callback when the uid
-    // that was previously identified being the highest priority as in
-    // UidPolicyInterface::isUidOnTop() has changed in priority.
-    virtual void onTopUidChanged(uid_t uid) = 0;
+    // Called when the set of uids that's top priority among the uids of interest
+    // has changed. The receiver of this callback should adjust accordingly.
+    virtual void onTopUidsChanged(const std::unordered_set<uid_t>& uids) = 0;
 
     // Called when resources become available for transcoding use. The scheduler
     // may use this as a signal to attempt restart transcoding activity that
diff --git a/media/libmediatranscoding/tests/Android.bp b/media/libmediatranscoding/tests/Android.bp
index 1017d29..b54022a 100644
--- a/media/libmediatranscoding/tests/Android.bp
+++ b/media/libmediatranscoding/tests/Android.bp
@@ -37,6 +37,9 @@
     srcs: ["TranscodingClientManager_tests.cpp"],
 }
 
+//
+// TranscodingJobScheduler unit test
+//
 cc_test {
     name: "TranscodingJobScheduler_tests",
     defaults: ["libmediatranscoding_test_defaults"],
diff --git a/media/libmediatranscoding/tests/TranscodingJobScheduler_tests.cpp b/media/libmediatranscoding/tests/TranscodingJobScheduler_tests.cpp
index 6bc9e20..95edf1d 100644
--- a/media/libmediatranscoding/tests/TranscodingJobScheduler_tests.cpp
+++ b/media/libmediatranscoding/tests/TranscodingJobScheduler_tests.cpp
@@ -31,6 +31,8 @@
 #include <media/TranscodingJobScheduler.h>
 #include <utils/Log.h>
 
+#include <unordered_set>
+
 namespace android {
 
 using Status = ::ndk::ScopedAStatus;
@@ -47,10 +49,39 @@
 #define JOB(n) (kClientJobId + (n))
 #define UID(n) (kClientUid + (n))
 
-class TestCallback : public TranscoderInterface, public UidPolicyInterface {
+class TestUidPolicy : public UidPolicyInterface {
 public:
-    TestCallback() : mTopUid(kInvalidUid), mLastError(TranscodingErrorCode::kUnknown) {}
-    virtual ~TestCallback() {}
+    TestUidPolicy() = default;
+    virtual ~TestUidPolicy() = default;
+
+    // UidPolicyInterface
+    void registerMonitorUid(uid_t /*uid*/) override {}
+    void unregisterMonitorUid(uid_t /*uid*/) override {}
+    bool isUidOnTop(uid_t uid) override { return mTopUids.count(uid) > 0; }
+    std::unordered_set<uid_t> getTopUids() const override { return mTopUids; }
+    void setCallback(const std::shared_ptr<UidPolicyCallbackInterface>& cb) override {
+        mUidPolicyCallback = cb;
+    }
+    void setTop(uid_t uid) {
+        std::unordered_set<uid_t> uids = {uid};
+        setTop(uids);
+    }
+    void setTop(const std::unordered_set<uid_t>& uids) {
+        mTopUids = uids;
+        auto uidPolicyCb = mUidPolicyCallback.lock();
+        if (uidPolicyCb != nullptr) {
+            uidPolicyCb->onTopUidsChanged(mTopUids);
+        }
+    }
+
+    std::unordered_set<uid_t> mTopUids;
+    std::weak_ptr<UidPolicyCallbackInterface> mUidPolicyCallback;
+};
+
+class TestTranscoder : public TranscoderInterface {
+public:
+    TestTranscoder() : mLastError(TranscodingErrorCode::kUnknown) {}
+    virtual ~TestTranscoder() {}
 
     // TranscoderInterface
     void start(int64_t clientId, int32_t jobId) override {
@@ -63,9 +94,6 @@
         mEventQueue.push_back(Resume(clientId, jobId));
     }
 
-    // UidPolicyInterface
-    bool isUidOnTop(uid_t uid) override { return uid == mTopUid; }
-
     void onFinished(int64_t clientId, int32_t jobId) {
         mEventQueue.push_back(Finished(clientId, jobId));
     }
@@ -75,8 +103,6 @@
         mEventQueue.push_back(Failed(clientId, jobId));
     }
 
-    void setTop(uid_t uid) { mTopUid = uid; }
-
     TranscodingErrorCode getLastError() {
         TranscodingErrorCode result = mLastError;
         mLastError = TranscodingErrorCode::kUnknown;
@@ -115,16 +141,16 @@
 private:
     Event mPoppedEvent;
     std::list<Event> mEventQueue;
-    uid_t mTopUid;
     TranscodingErrorCode mLastError;
 };
 
-bool operator==(const TestCallback::Event& lhs, const TestCallback::Event& rhs) {
+bool operator==(const TestTranscoder::Event& lhs, const TestTranscoder::Event& rhs) {
     return lhs.type == rhs.type && lhs.clientId == rhs.clientId && lhs.jobId == rhs.jobId;
 }
 
 struct TestClientCallback : public BnTranscodingClientCallback {
-    TestClientCallback(TestCallback* owner, int64_t clientId) : mOwner(owner), mClientId(clientId) {
+    TestClientCallback(TestTranscoder* owner, int64_t clientId)
+          : mOwner(owner), mClientId(clientId) {
         ALOGD("TestClient Created");
     }
 
@@ -152,7 +178,7 @@
     virtual ~TestClientCallback() { ALOGI("TestClient destroyed"); };
 
 private:
-    TestCallback* mOwner;
+    TestTranscoder* mOwner;
     int64_t mClientId;
     TestClientCallback(const TestClientCallback&) = delete;
     TestClientCallback& operator=(const TestClientCallback&) = delete;
@@ -164,27 +190,30 @@
 
     void SetUp() override {
         ALOGI("TranscodingJobSchedulerTest set up");
-        mCallback.reset(new TestCallback());
-        mScheduler.reset(new TranscodingJobScheduler(mCallback, mCallback));
+        mTranscoder.reset(new TestTranscoder());
+        mUidPolicy.reset(new TestUidPolicy());
+        mScheduler.reset(new TranscodingJobScheduler(mTranscoder, mUidPolicy));
+        mUidPolicy->setCallback(mScheduler);
 
         // Set priority only, ignore other fields for now.
         mOfflineRequest.priority = TranscodingJobPriority::kUnspecified;
         mRealtimeRequest.priority = TranscodingJobPriority::kHigh;
         mClientCallback0 =
-                ::ndk::SharedRefBase::make<TestClientCallback>(mCallback.get(), CLIENT(0));
+                ::ndk::SharedRefBase::make<TestClientCallback>(mTranscoder.get(), CLIENT(0));
         mClientCallback1 =
-                ::ndk::SharedRefBase::make<TestClientCallback>(mCallback.get(), CLIENT(1));
+                ::ndk::SharedRefBase::make<TestClientCallback>(mTranscoder.get(), CLIENT(1));
         mClientCallback2 =
-                ::ndk::SharedRefBase::make<TestClientCallback>(mCallback.get(), CLIENT(2));
+                ::ndk::SharedRefBase::make<TestClientCallback>(mTranscoder.get(), CLIENT(2));
         mClientCallback3 =
-                ::ndk::SharedRefBase::make<TestClientCallback>(mCallback.get(), CLIENT(3));
+                ::ndk::SharedRefBase::make<TestClientCallback>(mTranscoder.get(), CLIENT(3));
     }
 
     void TearDown() override { ALOGI("TranscodingJobSchedulerTest tear down"); }
 
     ~TranscodingJobSchedulerTest() { ALOGD("TranscodingJobSchedulerTest destroyed"); }
 
-    std::shared_ptr<TestCallback> mCallback;
+    std::shared_ptr<TestTranscoder> mTranscoder;
+    std::shared_ptr<TestUidPolicy> mUidPolicy;
     std::shared_ptr<TranscodingJobScheduler> mScheduler;
     TranscodingRequestParcel mOfflineRequest;
     TranscodingRequestParcel mRealtimeRequest;
@@ -198,44 +227,42 @@
     ALOGD("TestSubmitJob");
 
     // Start with UID(1) on top.
-    mCallback->setTop(UID(1));
+    mUidPolicy->setTop(UID(1));
 
     // Submit offline job to CLIENT(0) in UID(0).
     // Should start immediately (because this is the only job).
     mScheduler->submit(CLIENT(0), JOB(0), UID(0), mOfflineRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), 0));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), 0));
 
     // Submit real-time job to CLIENT(0).
     // Should pause offline job and start new job,  even if UID(0) is not on top.
     mScheduler->submit(CLIENT(0), JOB(1), UID(0), mRealtimeRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Pause(CLIENT(0), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), JOB(1)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(1)));
 
     // Submit real-time job to CLIENT(0), should be queued after the previous job.
     mScheduler->submit(CLIENT(0), JOB(2), UID(0), mRealtimeRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Submit real-time job to CLIENT(1) in same uid, should be queued after the previous job.
     mScheduler->submit(CLIENT(1), JOB(0), UID(0), mRealtimeRequest, mClientCallback1);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Submit real-time job to CLIENT(2) in UID(1).
-    // Should pause previous job and start new job, because UID(1) is top.
-    mCallback->setTop(UID(1));
+    // Should pause previous job and start new job, because UID(1) is (has been) top.
     mScheduler->submit(CLIENT(2), JOB(0), UID(1), mRealtimeRequest, mClientCallback2);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Pause(CLIENT(0), JOB(1)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(2), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), JOB(1)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(2), JOB(0)));
 
     // Submit offline job, shouldn't generate any event.
     mScheduler->submit(CLIENT(2), JOB(1), UID(1), mOfflineRequest, mClientCallback2);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
-    mCallback->setTop(UID(0));
-    // Submit real-time job to CLIENT(1) in UID(0).
+    // Bring UID(0) to top.
+    mUidPolicy->setTop(UID(0));
     // Should pause current job, and resume last job in UID(0).
-    mScheduler->submit(CLIENT(1), JOB(1), UID(0), mRealtimeRequest, mClientCallback1);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Pause(CLIENT(2), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Resume(CLIENT(0), JOB(1)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(2), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(0), JOB(1)));
 }
 
 TEST_F(TranscodingJobSchedulerTest, TestCancelJob) {
@@ -243,15 +270,15 @@
 
     // Submit real-time job JOB(0), should start immediately.
     mScheduler->submit(CLIENT(0), JOB(0), UID(0), mRealtimeRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(0)));
 
     // Submit real-time job JOB(1), should not start.
     mScheduler->submit(CLIENT(0), JOB(1), UID(0), mRealtimeRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Submit offline job JOB(2), should not start.
     mScheduler->submit(CLIENT(0), JOB(2), UID(0), mOfflineRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Cancel queued real-time job.
     // Cancel real-time job JOB(1), should be cancelled.
@@ -263,224 +290,283 @@
 
     // Submit offline job JOB(3), shouldn't cause any event.
     mScheduler->submit(CLIENT(0), JOB(3), UID(0), mOfflineRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Cancel running real-time job JOB(0).
     // - Should be paused first then cancelled.
     // - Should also start offline job JOB(2) because real-time queue is empty.
     EXPECT_TRUE(mScheduler->cancel(CLIENT(0), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Pause(CLIENT(0), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), JOB(3)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(3)));
 }
 
 TEST_F(TranscodingJobSchedulerTest, TestFinishJob) {
     ALOGD("TestFinishJob");
 
-    // Fail without any jobs submitted, should be ignored.
+    // Start with unspecified top UID.
+    // Finish without any jobs submitted, should be ignored.
     mScheduler->onFinish(CLIENT(0), JOB(0));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Submit offline job JOB(0), should start immediately.
     mScheduler->submit(CLIENT(0), JOB(0), UID(0), mOfflineRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(0)));
 
     // Submit real-time job JOB(1), should pause offline job and start immediately.
     mScheduler->submit(CLIENT(0), JOB(1), UID(0), mRealtimeRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Pause(CLIENT(0), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), JOB(1)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(1)));
 
     // Submit real-time job JOB(2), should not start.
     mScheduler->submit(CLIENT(0), JOB(2), UID(0), mRealtimeRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
-    // Fail when the job never started, should be ignored.
+    // Finish when the job never started, should be ignored.
     mScheduler->onFinish(CLIENT(0), JOB(2));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // UID(1) moves to top.
-    mCallback->setTop(UID(1));
+    mUidPolicy->setTop(UID(1));
     // Submit real-time job to CLIENT(1) in UID(1), should pause previous job and start new job.
     mScheduler->submit(CLIENT(1), JOB(0), UID(1), mRealtimeRequest, mClientCallback1);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Pause(CLIENT(0), JOB(1)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(1), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), JOB(1)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(1), JOB(0)));
 
-    // Simulate Fail that arrived late, after pause issued by scheduler.
+    // Simulate Finish that arrived late, after pause issued by scheduler.
     // Should still be propagated to client, but shouldn't trigger any new start.
     mScheduler->onFinish(CLIENT(0), JOB(1));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Finished(CLIENT(0), JOB(1)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Finished(CLIENT(0), JOB(1)));
 
-    // Fail running real-time job, should start next real-time job in queue.
+    // Finish running real-time job, should start next real-time job in queue.
     mScheduler->onFinish(CLIENT(1), JOB(0));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Finished(CLIENT(1), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), JOB(2)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Finished(CLIENT(1), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(2)));
 
-    // Fail running real-time job, should resume next job (offline job) in queue.
+    // Finish running real-time job, should resume next job (offline job) in queue.
     mScheduler->onFinish(CLIENT(0), JOB(2));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Finished(CLIENT(0), JOB(2)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Resume(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Finished(CLIENT(0), JOB(2)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(0), JOB(0)));
 
-    // Fail running offline job.
+    // Finish running offline job.
     mScheduler->onFinish(CLIENT(0), JOB(0));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Finished(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Finished(CLIENT(0), JOB(0)));
 
-    // Duplicate fail for last job, should be ignored.
+    // Duplicate finish for last job, should be ignored.
     mScheduler->onFinish(CLIENT(0), JOB(0));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 }
 
 TEST_F(TranscodingJobSchedulerTest, TestFailJob) {
     ALOGD("TestFailJob");
 
+    // Start with unspecified top UID.
     // Fail without any jobs submitted, should be ignored.
     mScheduler->onError(CLIENT(0), JOB(0), TranscodingErrorCode::kUnknown);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Submit offline job JOB(0), should start immediately.
     mScheduler->submit(CLIENT(0), JOB(0), UID(0), mOfflineRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(0)));
 
     // Submit real-time job JOB(1), should pause offline job and start immediately.
     mScheduler->submit(CLIENT(0), JOB(1), UID(0), mRealtimeRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Pause(CLIENT(0), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), JOB(1)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(1)));
 
     // Submit real-time job JOB(2), should not start.
     mScheduler->submit(CLIENT(0), JOB(2), UID(0), mRealtimeRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Fail when the job never started, should be ignored.
     mScheduler->onError(CLIENT(0), JOB(2), TranscodingErrorCode::kUnknown);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // UID(1) moves to top.
-    mCallback->setTop(UID(1));
+    mUidPolicy->setTop(UID(1));
     // Submit real-time job to CLIENT(1) in UID(1), should pause previous job and start new job.
     mScheduler->submit(CLIENT(1), JOB(0), UID(1), mRealtimeRequest, mClientCallback1);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Pause(CLIENT(0), JOB(1)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(1), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), JOB(1)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(1), JOB(0)));
 
     // Simulate Fail that arrived late, after pause issued by scheduler.
     // Should still be propagated to client, but shouldn't trigger any new start.
     mScheduler->onError(CLIENT(0), JOB(1), TranscodingErrorCode::kUnknown);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Failed(CLIENT(0), JOB(1)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Failed(CLIENT(0), JOB(1)));
 
     // Fail running real-time job, should start next real-time job in queue.
     mScheduler->onError(CLIENT(1), JOB(0), TranscodingErrorCode::kUnknown);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Failed(CLIENT(1), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), JOB(2)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Failed(CLIENT(1), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(2)));
 
     // Fail running real-time job, should resume next job (offline job) in queue.
     mScheduler->onError(CLIENT(0), JOB(2), TranscodingErrorCode::kUnknown);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Failed(CLIENT(0), JOB(2)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Resume(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Failed(CLIENT(0), JOB(2)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(0), JOB(0)));
 
     // Fail running offline job, and test error code propagation.
     mScheduler->onError(CLIENT(0), JOB(0), TranscodingErrorCode::kInvalidBitstream);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Failed(CLIENT(0), JOB(0)));
-    EXPECT_EQ(mCallback->getLastError(), TranscodingErrorCode::kInvalidBitstream);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Failed(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->getLastError(), TranscodingErrorCode::kInvalidBitstream);
 
     // Duplicate fail for last job, should be ignored.
     mScheduler->onError(CLIENT(0), JOB(0), TranscodingErrorCode::kUnknown);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 }
 
 TEST_F(TranscodingJobSchedulerTest, TestTopUidChanged) {
     ALOGD("TestTopUidChanged");
 
+    // Start with unspecified top UID.
     // Submit real-time job to CLIENT(0), job should start immediately.
     mScheduler->submit(CLIENT(0), JOB(0), UID(0), mRealtimeRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(0)));
 
     // Submit offline job to CLIENT(0), should not start.
     mScheduler->submit(CLIENT(1), JOB(0), UID(0), mOfflineRequest, mClientCallback1);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Move UID(1) to top.
-    mCallback->setTop(UID(1));
+    mUidPolicy->setTop(UID(1));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
+
     // Submit real-time job to CLIENT(2) in different uid UID(1).
     // Should pause previous job and start new job.
     mScheduler->submit(CLIENT(2), JOB(0), UID(1), mRealtimeRequest, mClientCallback2);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Pause(CLIENT(0), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(2), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(2), JOB(0)));
 
     // Bring UID(0) back to top.
-    mCallback->setTop(UID(0));
-    mScheduler->onTopUidChanged(UID(0));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Pause(CLIENT(2), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Resume(CLIENT(0), JOB(0)));
+    mUidPolicy->setTop(UID(0));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(2), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(0), JOB(0)));
 
     // Bring invalid uid to top.
-    mScheduler->onTopUidChanged(kInvalidUid);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    mUidPolicy->setTop(kInvalidUid);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Finish job, next real-time job should resume.
     mScheduler->onFinish(CLIENT(0), JOB(0));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Finished(CLIENT(0), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Resume(CLIENT(2), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Finished(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(2), JOB(0)));
 
     // Finish job, offline job should start.
     mScheduler->onFinish(CLIENT(2), JOB(0));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Finished(CLIENT(2), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(1), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Finished(CLIENT(2), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(1), JOB(0)));
+}
+
+TEST_F(TranscodingJobSchedulerTest, TestTopUidSetChanged) {
+    ALOGD("TestTopUidChanged_MultipleUids");
+
+    // Start with unspecified top UID.
+    // Submit real-time job to CLIENT(0), job should start immediately.
+    mScheduler->submit(CLIENT(0), JOB(0), UID(0), mRealtimeRequest, mClientCallback0);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(0)));
+
+    // Submit offline job to CLIENT(0), should not start.
+    mScheduler->submit(CLIENT(1), JOB(0), UID(0), mOfflineRequest, mClientCallback1);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
+
+    // Set UID(0), UID(1) to top set.
+    // UID(0) should continue to run.
+    mUidPolicy->setTop({UID(0), UID(1)});
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
+
+    // Submit real-time job to CLIENT(2) in different uid UID(1).
+    // UID(0) should pause and UID(1) should start.
+    mScheduler->submit(CLIENT(2), JOB(0), UID(1), mRealtimeRequest, mClientCallback2);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(2), JOB(0)));
+
+    // Remove UID(0) from top set, and only leave UID(1) in the set.
+    // UID(1) should continue to run.
+    mUidPolicy->setTop(UID(1));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
+
+    // Set UID(0), UID(2) to top set.
+    // UID(1) should continue to run.
+    mUidPolicy->setTop({UID(1), UID(2)});
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
+
+    // Bring UID(0) back to top.
+    mUidPolicy->setTop(UID(0));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(2), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(0), JOB(0)));
+
+    // Bring invalid uid to top.
+    mUidPolicy->setTop(kInvalidUid);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
+
+    // Finish job, next real-time job from UID(1) should resume, even if UID(1) no longer top.
+    mScheduler->onFinish(CLIENT(0), JOB(0));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Finished(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(2), JOB(0)));
+
+    // Finish job, offline job should start.
+    mScheduler->onFinish(CLIENT(2), JOB(0));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Finished(CLIENT(2), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(1), JOB(0)));
 }
 
 TEST_F(TranscodingJobSchedulerTest, TestResourceLost) {
     ALOGD("TestResourceLost");
 
+    // Start with unspecified top UID.
     // Submit real-time job to CLIENT(0), job should start immediately.
     mScheduler->submit(CLIENT(0), JOB(0), UID(0), mRealtimeRequest, mClientCallback0);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(0), JOB(0)));
 
     // Submit offline job to CLIENT(0), should not start.
     mScheduler->submit(CLIENT(1), JOB(0), UID(0), mOfflineRequest, mClientCallback1);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Move UID(1) to top.
-    mCallback->setTop(UID(1));
+    mUidPolicy->setTop(UID(1));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Submit real-time job to CLIENT(2) in different uid UID(1).
     // Should pause previous job and start new job.
     mScheduler->submit(CLIENT(2), JOB(0), UID(1), mRealtimeRequest, mClientCallback2);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Pause(CLIENT(0), JOB(0)));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(2), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Pause(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(2), JOB(0)));
 
     // Test 1: No queue change during resource loss.
     // Signal resource lost.
     mScheduler->onResourceLost();
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Signal resource available, CLIENT(2) should resume.
     mScheduler->onResourceAvailable();
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Resume(CLIENT(2), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(2), JOB(0)));
 
     // Test 2: Change of queue order during resource loss.
     // Signal resource lost.
     mScheduler->onResourceLost();
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Move UID(0) back to top, should have no resume due to no resource.
-    mScheduler->onTopUidChanged(UID(0));
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    mUidPolicy->setTop(UID(0));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Signal resource available, CLIENT(0) should resume.
     mScheduler->onResourceAvailable();
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Resume(CLIENT(0), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Resume(CLIENT(0), JOB(0)));
 
     // Test 3: Adding new queue during resource loss.
     // Signal resource lost.
     mScheduler->onResourceLost();
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Move UID(2) to top.
-    mCallback->setTop(UID(2));
+    mUidPolicy->setTop(UID(2));
 
     // Submit real-time job to CLIENT(3) in UID(2), job shouldn't start due to no resource.
     mScheduler->submit(CLIENT(3), JOB(0), UID(2), mRealtimeRequest, mClientCallback3);
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::NoEvent);
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::NoEvent);
 
     // Signal resource available, CLIENT(3)'s job should start.
     mScheduler->onResourceAvailable();
-    EXPECT_EQ(mCallback->popEvent(), TestCallback::Start(CLIENT(3), JOB(0)));
+    EXPECT_EQ(mTranscoder->popEvent(), TestTranscoder::Start(CLIENT(3), JOB(0)));
 }
 
 }  // namespace android
diff --git a/services/mediatranscoding/MediaTranscodingService.cpp b/services/mediatranscoding/MediaTranscodingService.cpp
index a13bec0..6c10e3e 100644
--- a/services/mediatranscoding/MediaTranscodingService.cpp
+++ b/services/mediatranscoding/MediaTranscodingService.cpp
@@ -22,6 +22,7 @@
 #include <android/binder_process.h>
 #include <media/TranscodingClientManager.h>
 #include <media/TranscodingJobScheduler.h>
+#include <media/TranscodingUidPolicy.h>
 #include <private/android_filesystem_config.h>
 #include <utils/Log.h>
 #include <utils/Vector.h>
@@ -48,10 +49,9 @@
     }
 }
 
-// DummyTranscoder and DummyUidPolicy are currently used to instantiate
-// MediaTranscodingService on service side for testing, so that we could
-// actually test the IPC calls of MediaTranscodingService to expose some
-// issues that's observable only over IPC.
+// DummyTranscoder is currently used to instantiate MediaTranscodingService on
+// service side for testing, so that we could actually test the IPC calls of
+// MediaTranscodingService to expose some issues that's observable only over IPC.
 class DummyTranscoder : public TranscoderInterface {
     void start(int64_t clientId, int32_t jobId) override {
         (void)clientId;
@@ -67,16 +67,9 @@
     }
 };
 
-class DummyUidPolicy : public UidPolicyInterface {
-    bool isUidOnTop(uid_t uid) override {
-        (void)uid;
-        return true;
-    }
-};
-
 MediaTranscodingService::MediaTranscodingService()
       : MediaTranscodingService(std::make_shared<DummyTranscoder>(),
-                                std::make_shared<DummyUidPolicy>()) {}
+                                std::make_shared<TranscodingUidPolicy>()) {}
 
 MediaTranscodingService::MediaTranscodingService(
         const std::shared_ptr<TranscoderInterface>& transcoder,
@@ -84,6 +77,7 @@
       : mJobScheduler(new TranscodingJobScheduler(transcoder, uidPolicy)),
         mClientManager(new TranscodingClientManager(mJobScheduler)) {
     ALOGV("MediaTranscodingService is created");
+    uidPolicy->setCallback(mJobScheduler);
 }
 
 MediaTranscodingService::~MediaTranscodingService() {