transcoding: use actual callback in SimulatedTranscoder

bug: 154734285
test: unit testing

Change-Id: I39f17e95e9cc9ea73a60828b00a3c79ccf11dbf2
diff --git a/media/libmediatranscoding/TranscodingJobScheduler.cpp b/media/libmediatranscoding/TranscodingJobScheduler.cpp
index e6dc7ad..d5ae2a3 100644
--- a/media/libmediatranscoding/TranscodingJobScheduler.cpp
+++ b/media/libmediatranscoding/TranscodingJobScheduler.cpp
@@ -291,15 +291,16 @@
     return true;
 }
 
-void TranscodingJobScheduler::onFinish(ClientIdType clientId, JobIdType jobId) {
+void TranscodingJobScheduler::notifyClient(ClientIdType clientId, JobIdType jobId,
+                                           const char* reason,
+                                           std::function<void(const JobKeyType&)> func) {
     JobKeyType jobKey = std::make_pair(clientId, jobId);
 
-    ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str());
-
     std::scoped_lock lock{mLock};
 
     if (mJobMap.count(jobKey) == 0) {
-        ALOGW("ignoring finish for non-existent job");
+        ALOGW("%s: ignoring %s for job %s that doesn't exist", __FUNCTION__, reason,
+              jobToString(jobKey).c_str());
         return;
     }
 
@@ -307,90 +308,89 @@
     // to client if the job is paused. Transcoder could have posted finish when
     // we're pausing it, and the finish arrived after we changed current job.
     if (mJobMap[jobKey].state == Job::NOT_STARTED) {
-        ALOGW("ignoring finish for job that was never started");
+        ALOGW("%s: ignoring %s for job %s that was never started", __FUNCTION__, reason,
+              jobToString(jobKey).c_str());
         return;
     }
 
-    {
-        auto clientCallback = mJobMap[jobKey].callback.lock();
-        if (clientCallback != nullptr) {
-            clientCallback->onTranscodingFinished(jobId, TranscodingResultParcel({jobId, 0}));
+    ALOGV("%s: job %s %s", __FUNCTION__, jobToString(jobKey).c_str(), reason);
+    func(jobKey);
+}
+
+void TranscodingJobScheduler::onStarted(ClientIdType clientId, JobIdType jobId) {
+    notifyClient(clientId, jobId, "started", [=](const JobKeyType& jobKey) {
+        auto callback = mJobMap[jobKey].callback.lock();
+        if (callback != nullptr) {
+            callback->onTranscodingStarted(jobId);
         }
-    }
+    });
+}
 
-    // Remove the job.
-    removeJob_l(jobKey);
+void TranscodingJobScheduler::onPaused(ClientIdType clientId, JobIdType jobId) {
+    notifyClient(clientId, jobId, "paused", [=](const JobKeyType& jobKey) {
+        auto callback = mJobMap[jobKey].callback.lock();
+        if (callback != nullptr) {
+            callback->onTranscodingPaused(jobId);
+        }
+    });
+}
 
-    // Start next job.
-    updateCurrentJob_l();
+void TranscodingJobScheduler::onResumed(ClientIdType clientId, JobIdType jobId) {
+    notifyClient(clientId, jobId, "resumed", [=](const JobKeyType& jobKey) {
+        auto callback = mJobMap[jobKey].callback.lock();
+        if (callback != nullptr) {
+            callback->onTranscodingResumed(jobId);
+        }
+    });
+}
 
-    validateState_l();
+void TranscodingJobScheduler::onFinish(ClientIdType clientId, JobIdType jobId) {
+    notifyClient(clientId, jobId, "finish", [=](const JobKeyType& jobKey) {
+        {
+            auto clientCallback = mJobMap[jobKey].callback.lock();
+            if (clientCallback != nullptr) {
+                clientCallback->onTranscodingFinished(jobId, TranscodingResultParcel({jobId, -1 /*actualBitrateBps*/}));
+            }
+        }
+
+        // Remove the job.
+        removeJob_l(jobKey);
+
+        // Start next job.
+        updateCurrentJob_l();
+
+        validateState_l();
+    });
 }
 
 void TranscodingJobScheduler::onError(ClientIdType clientId, JobIdType jobId,
                                       TranscodingErrorCode err) {
-    JobKeyType jobKey = std::make_pair(clientId, jobId);
-
-    ALOGV("%s: job %s, err %d", __FUNCTION__, jobToString(jobKey).c_str(), (int32_t)err);
-
-    std::scoped_lock lock{mLock};
-
-    if (mJobMap.count(jobKey) == 0) {
-        ALOGW("ignoring error for non-existent job");
-        return;
-    }
-
-    // Only ignore if job was never started. In particular, propagate the status
-    // to client if the job is paused. Transcoder could have posted finish when
-    // we're pausing it, and the finish arrived after we changed current job.
-    if (mJobMap[jobKey].state == Job::NOT_STARTED) {
-        ALOGW("ignoring error for job that was never started");
-        return;
-    }
-
-    {
-        auto clientCallback = mJobMap[jobKey].callback.lock();
-        if (clientCallback != nullptr) {
-            clientCallback->onTranscodingFailed(jobId, err);
+    notifyClient(clientId, jobId, "error", [=](const JobKeyType& jobKey) {
+        {
+            auto clientCallback = mJobMap[jobKey].callback.lock();
+            if (clientCallback != nullptr) {
+                clientCallback->onTranscodingFailed(jobId, err);
+            }
         }
-    }
 
-    // Remove the job.
-    removeJob_l(jobKey);
+        // Remove the job.
+        removeJob_l(jobKey);
 
-    // Start next job.
-    updateCurrentJob_l();
+        // Start next job.
+        updateCurrentJob_l();
 
-    validateState_l();
+        validateState_l();
+    });
 }
 
 void TranscodingJobScheduler::onProgressUpdate(ClientIdType clientId, JobIdType jobId,
                                                int32_t progress) {
-    JobKeyType jobKey = std::make_pair(clientId, jobId);
-
-    ALOGV("%s: job %s, progress %d", __FUNCTION__, jobToString(jobKey).c_str(), progress);
-
-    std::scoped_lock lock{mLock};
-
-    if (mJobMap.count(jobKey) == 0) {
-        ALOGW("ignoring progress for non-existent job");
-        return;
-    }
-
-    // Only ignore if job was never started. In particular, propagate the status
-    // to client if the job is paused. Transcoder could have posted finish when
-    // we're pausing it, and the finish arrived after we changed current job.
-    if (mJobMap[jobKey].state == Job::NOT_STARTED) {
-        ALOGW("ignoring progress for job that was never started");
-        return;
-    }
-
-    {
-        auto clientCallback = mJobMap[jobKey].callback.lock();
-        if (clientCallback != nullptr) {
-            clientCallback->onProgressUpdate(jobId, progress);
+    notifyClient(clientId, jobId, "progress", [=](const JobKeyType& jobKey) {
+        auto callback = mJobMap[jobKey].callback.lock();
+        if (callback != nullptr) {
+            callback->onProgressUpdate(jobId, progress);
         }
-    }
+    });
 }
 
 void TranscodingJobScheduler::onResourceLost() {
diff --git a/media/libmediatranscoding/include/media/TranscoderInterface.h b/media/libmediatranscoding/include/media/TranscoderInterface.h
index ef51f65..97b7126 100644
--- a/media/libmediatranscoding/include/media/TranscoderInterface.h
+++ b/media/libmediatranscoding/include/media/TranscoderInterface.h
@@ -48,6 +48,9 @@
 class TranscoderCallbackInterface {
 public:
     // TODO(chz): determine what parameters are needed here.
+    virtual void onStarted(ClientIdType clientId, JobIdType jobId) = 0;
+    virtual void onPaused(ClientIdType clientId, JobIdType jobId) = 0;
+    virtual void onResumed(ClientIdType clientId, JobIdType jobId) = 0;
     virtual void onFinish(ClientIdType clientId, JobIdType jobId) = 0;
     virtual void onError(ClientIdType clientId, JobIdType jobId, TranscodingErrorCode err) = 0;
     virtual void onProgressUpdate(ClientIdType clientId, JobIdType jobId, int32_t progress) = 0;
diff --git a/media/libmediatranscoding/include/media/TranscodingJobScheduler.h b/media/libmediatranscoding/include/media/TranscodingJobScheduler.h
index 63001c3..5ccadad 100644
--- a/media/libmediatranscoding/include/media/TranscodingJobScheduler.h
+++ b/media/libmediatranscoding/include/media/TranscodingJobScheduler.h
@@ -47,6 +47,9 @@
     // ~SchedulerClientInterface
 
     // TranscoderCallbackInterface
+    void onStarted(ClientIdType clientId, JobIdType jobId) override;
+    void onPaused(ClientIdType clientId, JobIdType jobId) override;
+    void onResumed(ClientIdType clientId, JobIdType jobId) override;
     void onFinish(ClientIdType clientId, JobIdType jobId) override;
     void onError(ClientIdType clientId, JobIdType jobId, TranscodingErrorCode err) override;
     void onProgressUpdate(ClientIdType clientId, JobIdType jobId, int32_t progress) override;
@@ -105,7 +108,8 @@
     void updateCurrentJob_l();
     void removeJob_l(const JobKeyType& jobKey);
     void moveUidsToTop_l(const std::unordered_set<uid_t>& uids, bool preserveTopUid);
-
+    void notifyClient(ClientIdType clientId, JobIdType jobId, const char* reason,
+                      std::function<void(const JobKeyType&)> func);
     // Internal state verifier (debug only)
     void validateState_l();
 
diff --git a/services/mediatranscoding/Android.bp b/services/mediatranscoding/Android.bp
index 79e9fbc..498ce09 100644
--- a/services/mediatranscoding/Android.bp
+++ b/services/mediatranscoding/Android.bp
@@ -14,7 +14,7 @@
         "libmediatranscoding",
         "libutils",
     ],
-    
+
     export_shared_lib_headers: [
         "libmediatranscoding",
     ],
diff --git a/services/mediatranscoding/SimulatedTranscoder.cpp b/services/mediatranscoding/SimulatedTranscoder.cpp
index 0a77fbe..380d757 100644
--- a/services/mediatranscoding/SimulatedTranscoder.cpp
+++ b/services/mediatranscoding/SimulatedTranscoder.cpp
@@ -54,27 +54,43 @@
     }
     ALOGV("%s: job {%d}: processingTime: %lld", __FUNCTION__, jobId,
           (long long)mJobProcessingTimeMs);
-    queueEvent(Event::Start, clientId, jobId);
+    queueEvent(Event::Start, clientId, jobId, [=] {
+        auto callback = mCallback.lock();
+        if (callback != nullptr) {
+            callback->onStarted(clientId, jobId);
+        }
+    });
 }
 
 void SimulatedTranscoder::pause(ClientIdType clientId, JobIdType jobId) {
-    queueEvent(Event::Pause, clientId, jobId);
+    queueEvent(Event::Pause, clientId, jobId, [=] {
+        auto callback = mCallback.lock();
+        if (callback != nullptr) {
+            callback->onPaused(clientId, jobId);
+        }
+    });
 }
 
 void SimulatedTranscoder::resume(ClientIdType clientId, JobIdType jobId) {
-    queueEvent(Event::Resume, clientId, jobId);
+    queueEvent(Event::Resume, clientId, jobId, [=] {
+        auto callback = mCallback.lock();
+        if (callback != nullptr) {
+            callback->onResumed(clientId, jobId);
+        }
+    });
 }
 
 void SimulatedTranscoder::stop(ClientIdType clientId, JobIdType jobId) {
-    queueEvent(Event::Stop, clientId, jobId);
+    queueEvent(Event::Stop, clientId, jobId, nullptr);
 }
 
-void SimulatedTranscoder::queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId) {
+void SimulatedTranscoder::queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId,
+                                     std::function<void()> runnable) {
     ALOGV("%s: job {%lld, %d}: %s", __FUNCTION__, (long long)clientId, jobId, toString(type));
 
     auto lock = std::scoped_lock(mLock);
 
-    mQueue.push_back({type, clientId, jobId});
+    mQueue.push_back({type, clientId, jobId, runnable});
     mCondition.notify_one();
 }
 
@@ -139,10 +155,9 @@
                 continue;
             }
 
-            auto callback = mCallback.lock();
-            if (callback != nullptr) {
+            if (event.runnable != nullptr) {
                 lock.unlock();
-                callback->onProgressUpdate(event.clientId, event.jobId, event.type);
+                event.runnable();
                 lock.lock();
             }
         }
diff --git a/services/mediatranscoding/SimulatedTranscoder.h b/services/mediatranscoding/SimulatedTranscoder.h
index 9054b4c..0eb74c8 100644
--- a/services/mediatranscoding/SimulatedTranscoder.h
+++ b/services/mediatranscoding/SimulatedTranscoder.h
@@ -42,6 +42,7 @@
         enum Type { NoEvent, Start, Pause, Resume, Stop, Finished, Failed } type;
         ClientIdType clientId;
         JobIdType jobId;
+        std::function<void()> runnable;
     };
 
     static constexpr int64_t kJobDurationUs = 1000000;
@@ -67,7 +68,8 @@
     int64_t mJobProcessingTimeMs = kJobDurationUs / 1000;
 
     static const char* toString(Event::Type type);
-    void queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId);
+    void queueEvent(Event::Type type, ClientIdType clientId, JobIdType jobId,
+                    std::function<void()> runnable);
     void threadLoop();
 };
 
diff --git a/services/mediatranscoding/tests/mediatranscodingservice_tests.cpp b/services/mediatranscoding/tests/mediatranscodingservice_tests.cpp
index dfe105d..3bc7b15 100644
--- a/services/mediatranscoding/tests/mediatranscodingservice_tests.cpp
+++ b/services/mediatranscoding/tests/mediatranscodingservice_tests.cpp
@@ -216,11 +216,20 @@
         return Status::ok();
     }
 
-    Status onTranscodingStarted(int32_t /*in_jobId*/) override { return Status::ok(); }
+    Status onTranscodingStarted(int32_t in_jobId) override {
+        append(Start(mClientId, in_jobId));
+        return Status::ok();
+    }
 
-    Status onTranscodingPaused(int32_t /*in_jobId*/) override { return Status::ok(); }
+    Status onTranscodingPaused(int32_t in_jobId) override {
+        append(Pause(mClientId, in_jobId));
+        return Status::ok();
+    }
 
-    Status onTranscodingResumed(int32_t /*in_jobId*/) override { return Status::ok(); }
+    Status onTranscodingResumed(int32_t in_jobId) override {
+        append(Resume(mClientId, in_jobId));
+        return Status::ok();
+    }
 
     Status onTranscodingFinished(
             int32_t in_jobId,
@@ -241,23 +250,7 @@
         return Status::ok();
     }
 
-    Status onProgressUpdate(int32_t in_jobId, int32_t in_progress) override {
-        // The progress numbers from the SimulatedTranscoder represents the
-        // event's type in the transcoder.
-        switch (in_progress) {
-        case SimulatedTranscoder::Event::Start:
-            append(EventTracker::Start(mClientId, in_jobId));
-            break;
-        case SimulatedTranscoder::Event::Pause:
-            append(EventTracker::Pause(mClientId, in_jobId));
-            break;
-        case SimulatedTranscoder::Event::Resume:
-            append(EventTracker::Resume(mClientId, in_jobId));
-            break;
-        default:
-            ALOGE("unrecognized progress number %d, ignored by test", in_progress);
-            break;
-        }
+    Status onProgressUpdate(int32_t /*in_jobId*/, int32_t /*in_progress*/) override {
         return Status::ok();
     }
 
@@ -393,8 +386,9 @@
         }
 
         return status.isOk() && (result == shouldSucceed) &&
-               (!shouldSucceed || (job.jobId == jobId && 
-                job.request.sourceFilePath == sourceFilePath && job.request.destinationFilePath == destinationFilePath));
+               (!shouldSucceed ||
+                (job.jobId == jobId && job.request.sourceFilePath == sourceFilePath &&
+                 job.request.destinationFilePath == destinationFilePath));
     }
 
     std::shared_ptr<IMediaTranscodingService> mService;
@@ -592,8 +586,10 @@
     registerMultipleClients();
 
     // Submit some offline jobs first.
-    EXPECT_TRUE(submit(mClient1, 0, "test_source_file_0", "test_destination_file_0", TranscodingJobPriority::kUnspecified));
-    EXPECT_TRUE(submit(mClient1, 1, "test_source_file_1", "test_destination_file_1", TranscodingJobPriority::kUnspecified));
+    EXPECT_TRUE(submit(mClient1, 0, "test_source_file_0", "test_destination_file_0",
+                       TranscodingJobPriority::kUnspecified));
+    EXPECT_TRUE(submit(mClient1, 1, "test_source_file_1", "test_destination_file_1",
+                       TranscodingJobPriority::kUnspecified));
 
     // Job 0 should start immediately.
     EXPECT_EQ(mClientCallback1->pop(kPaddingUs), EventTracker::Start(CLIENT(1), 0));