Transcoder: Add MediaSampleQueue and unit tests.

MediaSampleQueue asynchronously connects producers and consumers of compressed media samples.
MediaSampleQueue will be used to hold output samples from each TrackTranscoder.
MediaSampleQueue will be extended to optionally limit the number of (or cost of) samples it can
hold before blocking the producer.

Test: MediaSampleQueueTests
Bug: 152091443
Change-Id: I8e78ea3c3848934078126220adaa91fb033b2088
diff --git a/media/libmediatranscoding/transcoder/Android.bp b/media/libmediatranscoding/transcoder/Android.bp
index 440a454..e352245 100644
--- a/media/libmediatranscoding/transcoder/Android.bp
+++ b/media/libmediatranscoding/transcoder/Android.bp
@@ -18,6 +18,7 @@
     name: "libmediatranscoder",
 
     srcs: [
+        "MediaSampleQueue.cpp",
         "MediaSampleReaderNDK.cpp",
     ],
 
diff --git a/media/libmediatranscoding/transcoder/MediaSampleQueue.cpp b/media/libmediatranscoding/transcoder/MediaSampleQueue.cpp
new file mode 100644
index 0000000..691ee1c
--- /dev/null
+++ b/media/libmediatranscoding/transcoder/MediaSampleQueue.cpp
@@ -0,0 +1,58 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// #define LOG_NDEBUG 0
+#define LOG_TAG "MediaSampleQueue"
+
+#include <android-base/logging.h>
+#include <media/MediaSampleQueue.h>
+
+namespace android {
+
+bool MediaSampleQueue::enqueue(const std::shared_ptr<MediaSample>& sample) {
+    // An aborted queue refuses new samples; the caller learns that from the return value.
+    std::scoped_lock<std::mutex> guard(mMutex);
+    if (mAborted) return true;
+    mSampleQueue.push(sample);
+    mCondition.notify_one();  // Wake a consumer that may be blocked in dequeue().
+    return false;
+}
+
+// Unfortunately std::unique_lock is incompatible with -Wthread-safety
+bool MediaSampleQueue::dequeue(std::shared_ptr<MediaSample>* sample) NO_THREAD_SAFETY_ANALYSIS {
+    std::unique_lock<std::mutex> lock(mMutex);
+    while (mSampleQueue.empty() && !mAborted) {
+        mCondition.wait(lock);  // Loop guards against spurious wakeups.
+    }
+    if (mAborted) {
+        // An aborted dequeue always yields null, even if *sample still held an earlier sample.
+        if (sample != nullptr) sample->reset();
+        return true;
+    }
+    if (sample != nullptr) *sample = std::move(mSampleQueue.front());
+    mSampleQueue.pop();
+    return false;
+}
+
+void MediaSampleQueue::abort() {
+    // Declared before the lock: the drained samples are released only after mMutex is unlocked.
+    std::queue<std::shared_ptr<MediaSample>> drained;
+    std::scoped_lock<std::mutex> lock(mMutex);
+    std::swap(mSampleQueue, drained);
+    mAborted = true;
+    mCondition.notify_all();
+}
+}  // namespace android
diff --git a/media/libmediatranscoding/transcoder/include/media/MediaSample.h b/media/libmediatranscoding/transcoder/include/media/MediaSample.h
index 2b9be9f..e206a3e 100644
--- a/media/libmediatranscoding/transcoder/include/media/MediaSample.h
+++ b/media/libmediatranscoding/transcoder/include/media/MediaSample.h
@@ -18,6 +18,8 @@
 #define ANDROID_MEDIA_SAMPLE_H
 
 #include <cstdint>
+#include <functional>
+#include <memory>
 
 namespace android {
 
@@ -71,9 +73,37 @@
  */
 struct MediaSample {
     /**
-     * Byte buffer containing the sample's compressed data.
-     * The memory backing this buffer is not managed by the MediaSample object so a separate
-     * mechanism to release a buffer is needed between a producer and a consumer.
+     * Callback to notify that a media sample is about to be released, giving the creator a chance
+     * to reclaim the data buffer backing the sample. Once this callback returns, the media sample
+     * instance *will* be released so it cannot be used outside of the callback. To enable the
+     * callback, create the media sample with {@link #createWithReleaseCallback}.
+     * @param sample The sample to be released.
+     */
+    using OnSampleReleasedCallback = std::function<void(MediaSample* sample)>;
+
+    /**
+     * Creates a new media sample instance with a registered release callback. The release callback
+     * will get called right before the media sample is released giving the creator a chance to
+     * reclaim the buffer.
+     * @param buffer Byte buffer containing the sample's compressed data.
+     * @param dataOffset Offset, in bytes, to the sample's compressed data inside the buffer.
+     * @param bufferId Buffer identifier that can be used to identify the buffer on release.
+     * @param releaseCallback The sample release callback.
+     * @return A new media sample instance.
+     */
+    static std::shared_ptr<MediaSample> createWithReleaseCallback(
+            uint8_t* buffer, size_t dataOffset, uint32_t bufferId,
+            OnSampleReleasedCallback releaseCallback) {
+        // releaseSample is the deleter: it runs the callback (if any) and then frees the sample.
+        MediaSample* sample = new MediaSample(buffer, dataOffset, bufferId, releaseCallback);
+        return std::shared_ptr<MediaSample>(sample, &MediaSample::releaseSample);
+    }
+
+    /**
+     * Byte buffer containing the sample's compressed data. The media sample instance does not take
+     * ownership of the buffer and will not automatically release the memory, but the caller can
+     * register a release callback by creating the media sample with
+     * {@link #createWithReleaseCallback}.
      */
     const uint8_t* buffer = nullptr;
 
@@ -88,6 +118,28 @@
 
     /** Media sample information. */
     MediaSampleInfo info;
+
+private:
+    // Private: createWithReleaseCallback builds the samples, which is how releaseSample gets
+    // installed as the shared_ptr deleter.
+    MediaSample(uint8_t* buffer, size_t dataOffset, uint32_t bufferId,
+                OnSampleReleasedCallback releaseCallback)
+          : buffer(buffer), dataOffset(dataOffset), bufferId(bufferId),
+            mReleaseCallback(releaseCallback) {}
+
+    // shared_ptr deleter: gives the creator a last look at the sample, then frees it.
+    static void releaseSample(MediaSample* sample) {
+        const OnSampleReleasedCallback& onReleased = sample->mReleaseCallback;
+        if (onReleased) onReleased(sample);
+        delete sample;
+    }
+
+    // Do not allow copying to prevent dangling pointers in the copied object after the original is
+    // released.
+    MediaSample(const MediaSample&) = delete;
+    MediaSample& operator=(const MediaSample&) = delete;
+
+    const OnSampleReleasedCallback mReleaseCallback = nullptr;
 };
 
 }  // namespace android
diff --git a/media/libmediatranscoding/transcoder/include/media/MediaSampleQueue.h b/media/libmediatranscoding/transcoder/include/media/MediaSampleQueue.h
new file mode 100644
index 0000000..dc22423
--- /dev/null
+++ b/media/libmediatranscoding/transcoder/include/media/MediaSampleQueue.h
@@ -0,0 +1,66 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#ifndef ANDROID_MEDIA_SAMPLE_QUEUE_H
+#define ANDROID_MEDIA_SAMPLE_QUEUE_H
+
+#include <media/MediaSample.h>
+#include <utils/Mutex.h>
+
+#include <memory>
+#include <mutex>
+#include <queue>
+
+namespace android {
+
+/**
+ * MediaSampleQueue asynchronously connects a producer and a consumer of media samples.
+ * Media samples flow through the queue in FIFO order. If the queue is empty the consumer will be
+ * blocked until a new media sample is added or until the producer aborts the queue operation.
+ */
+class MediaSampleQueue {
+public:
+    /**
+     * Enqueues a media sample at the end of the queue and notifies potentially waiting consumers.
+     * If the queue has previously been aborted this method does nothing.
+     * @param sample The media sample to enqueue.
+     * @return True if the queue has been aborted.
+     */
+    bool enqueue(const std::shared_ptr<MediaSample>& sample);
+
+    /**
+     * Removes the next media sample from the queue and passes it out through the sample argument,
+     * blocking while the queue is empty. Once the queue is aborted no sample is passed out.
+     * @param[out] sample The next media sample in the queue.
+     * @return True if the queue has been aborted.
+     */
+    bool dequeue(std::shared_ptr<MediaSample>* sample /* nonnull */);
+
+    /**
+     * Aborts the queue operation. This clears the queue and notifies waiting consumers. After an
+     * abort it is not possible to enqueue more samples, and dequeue no longer returns samples.
+     */
+    void abort();
+
+private:
+    std::queue<std::shared_ptr<MediaSample>> mSampleQueue GUARDED_BY(mMutex);
+    std::mutex mMutex;
+    std::condition_variable mCondition;  // NOTE(review): no direct <condition_variable> include
+    bool mAborted GUARDED_BY(mMutex) = false;
+};
+
+}  // namespace android
+#endif  // ANDROID_MEDIA_SAMPLE_QUEUE_H
diff --git a/media/libmediatranscoding/transcoder/include/media/MediaSampleReaderNDK.h b/media/libmediatranscoding/transcoder/include/media/MediaSampleReaderNDK.h
index 668be9b..2dc9029 100644
--- a/media/libmediatranscoding/transcoder/include/media/MediaSampleReaderNDK.h
+++ b/media/libmediatranscoding/transcoder/include/media/MediaSampleReaderNDK.h
@@ -20,6 +20,7 @@
 #include <media/MediaSampleReader.h>
 #include <media/NdkMediaExtractor.h>
 
+#include <memory>
 #include <mutex>
 #include <vector>
 
diff --git a/media/libmediatranscoding/transcoder/tests/Android.bp b/media/libmediatranscoding/transcoder/tests/Android.bp
index 99b3245..a9937d7 100644
--- a/media/libmediatranscoding/transcoder/tests/Android.bp
+++ b/media/libmediatranscoding/transcoder/tests/Android.bp
@@ -37,3 +37,10 @@
     defaults: ["testdefaults"],
     srcs: ["MediaSampleReaderNDKTests.cpp"],
 }
+
+// MediaSampleQueue unit test
+cc_test {
+    name: "MediaSampleQueueTests",
+    defaults: ["testdefaults"],
+    srcs: ["MediaSampleQueueTests.cpp"],
+}
diff --git a/media/libmediatranscoding/transcoder/tests/MediaSampleQueueTests.cpp b/media/libmediatranscoding/transcoder/tests/MediaSampleQueueTests.cpp
new file mode 100644
index 0000000..2046ca0
--- /dev/null
+++ b/media/libmediatranscoding/transcoder/tests/MediaSampleQueueTests.cpp
@@ -0,0 +1,229 @@
+/*
+ * Copyright (C) 2020 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Unit Test for MediaSampleQueue
+
+// #define LOG_NDEBUG 0
+#define LOG_TAG "MediaSampleQueueTests"
+
+#include <android-base/logging.h>
+#include <gtest/gtest.h>
+#include <media/MediaSampleQueue.h>
+
+#include <thread>
+
+namespace android {
+
+/** Duration to use when delaying threads to order operations. */
+static constexpr int64_t kThreadDelayDurationMs = 100;
+
+/** Fixture shared by every test below; it only logs its own creation and destruction. */
+struct MediaSampleQueueTests : public ::testing::Test {
+    MediaSampleQueueTests() { LOG(DEBUG) << "MediaSampleQueueTests created"; }
+    ~MediaSampleQueueTests() { LOG(DEBUG) << "MediaSampleQueueTests destroyed"; }
+};
+
+/** Builds a sample that carries only the buffer id: null buffer, zero offset, no callback. */
+static std::shared_ptr<MediaSample> newSample(uint32_t id) {
+    return MediaSample::createWithReleaseCallback(nullptr, 0, id, nullptr);
+}
+
+TEST_F(MediaSampleQueueTests, TestSequentialDequeueOrder) {
+    LOG(DEBUG) << "TestSequentialDequeueOrder Starts";
+
+    static constexpr uint32_t kNumSamples = 4;
+    MediaSampleQueue sampleQueue;
+
+    // Fill the queue first; nothing is consumed until every sample has been enqueued.
+    for (uint32_t id = 0; id < kNumSamples; ++id) {
+        EXPECT_FALSE(sampleQueue.enqueue(newSample(id)));
+    }
+
+    // Samples must come back out in the order they went in.
+    for (uint32_t id = 0; id < kNumSamples; ++id) {
+        std::shared_ptr<MediaSample> sample;
+        bool aborted = sampleQueue.dequeue(&sample);
+        ASSERT_NE(sample, nullptr);  // Fatal: the next line dereferences the sample.
+        EXPECT_EQ(sample->bufferId, id);
+        EXPECT_FALSE(aborted);
+    }
+}
+
+TEST_F(MediaSampleQueueTests, TestInterleavedDequeueOrder) {
+    LOG(DEBUG) << "TestInterleavedDequeueOrder Starts";
+
+    static constexpr uint32_t kNumSamples = 4;
+    MediaSampleQueue sampleQueue;
+
+    // Alternate enqueue and dequeue so the queue never holds more than one sample.
+    for (uint32_t id = 0; id < kNumSamples; ++id) {
+        EXPECT_FALSE(sampleQueue.enqueue(newSample(id)));
+
+        std::shared_ptr<MediaSample> sample;
+        bool aborted = sampleQueue.dequeue(&sample);
+        ASSERT_NE(sample, nullptr);  // Fatal: the next line dereferences the sample.
+        EXPECT_EQ(sample->bufferId, id);
+        EXPECT_FALSE(aborted);
+    }
+}
+
+TEST_F(MediaSampleQueueTests, TestBlockingDequeue) {
+    LOG(DEBUG) << "TestBlockingDequeue Starts";
+
+    MediaSampleQueue sampleQueue;
+
+    std::thread enqueueThread([&sampleQueue] {
+        // Note: This implementation is a bit racy. Any amount of sleep will not guarantee that the
+        // main thread will be blocked on the sample queue by the time this thread calls enqueue.
+        // But we can say with high confidence that it will and the test will not fail regardless.
+        std::this_thread::sleep_for(std::chrono::milliseconds(kThreadDelayDurationMs));
+        sampleQueue.enqueue(newSample(1));
+    });
+
+    std::shared_ptr<MediaSample> sample;
+    bool aborted = sampleQueue.dequeue(&sample);
+    // Join before asserting: a fatal assertion must not return with the thread still joinable.
+    enqueueThread.join();
+    ASSERT_NE(sample, nullptr);
+    EXPECT_EQ(sample->bufferId, 1u);
+    EXPECT_FALSE(aborted);
+}
+
+TEST_F(MediaSampleQueueTests, TestDequeueBufferRelease) {
+    LOG(DEBUG) << "TestDequeueBufferRelease Starts";
+
+    static constexpr uint32_t kNumSamples = 4;
+    std::vector<bool> bufferReleased(kNumSamples, false);
+
+    MediaSample::OnSampleReleasedCallback callback = [&bufferReleased](MediaSample* sample) {
+        bufferReleased[sample->bufferId] = true;
+    };
+
+    MediaSampleQueue sampleQueue;
+    for (uint32_t id = 0; id < kNumSamples; ++id) {
+        EXPECT_FALSE(sampleQueue.enqueue(
+                MediaSample::createWithReleaseCallback(nullptr, 0, id, callback)));
+    }
+
+    // Merely sitting in the queue must not release anything.
+    for (uint32_t id = 0; id < kNumSamples; ++id) {
+        EXPECT_FALSE(bufferReleased[id]);
+    }
+
+    for (uint32_t id = 0; id < kNumSamples; ++id) {
+        {
+            std::shared_ptr<MediaSample> sample;
+            bool aborted = sampleQueue.dequeue(&sample);
+            ASSERT_NE(sample, nullptr);  // Fatal: the next line dereferences the sample.
+            EXPECT_EQ(sample->bufferId, id);
+            EXPECT_FALSE(bufferReleased[id]);
+            EXPECT_FALSE(aborted);
+        }
+        // The last reference died with the scope above, so ids up to |id| are now released.
+        for (uint32_t other = 0; other < kNumSamples; ++other) {
+            EXPECT_EQ(bufferReleased[other], other <= id);
+        }
+    }
+}
+
+TEST_F(MediaSampleQueueTests, TestAbortBufferRelease) {
+    LOG(DEBUG) << "TestAbortBufferRelease Starts";
+
+    static constexpr int kNumSamples = 4;
+    std::vector<bool> bufferReleased(kNumSamples, false);
+
+    MediaSample::OnSampleReleasedCallback markReleased = [&bufferReleased](MediaSample* sample) {
+        bufferReleased[sample->bufferId] = true;
+    };
+
+    MediaSampleQueue sampleQueue;
+    for (int id = 0; id < kNumSamples; ++id) {
+        auto sample = MediaSample::createWithReleaseCallback(nullptr, 0, id, markReleased);
+        EXPECT_FALSE(sampleQueue.enqueue(sample));
+    }
+
+    // Nothing may be released while the samples are still queued.
+    for (bool released : bufferReleased) {
+        EXPECT_FALSE(released);
+    }
+
+    sampleQueue.abort();
+    // Aborting drops the queued samples, which must fire every release callback.
+    for (bool released : bufferReleased) {
+        EXPECT_TRUE(released);
+    }
+}
+
+TEST_F(MediaSampleQueueTests, TestNonEmptyAbort) {
+    LOG(DEBUG) << "TestNonEmptyAbort Starts";
+
+    MediaSampleQueue sampleQueue;
+    EXPECT_FALSE(sampleQueue.enqueue(newSample(1)));
+
+    // Abort while one sample is still waiting in the queue.
+    sampleQueue.abort();
+
+    // The pending sample must not be handed out once the queue is aborted.
+    std::shared_ptr<MediaSample> sample;
+    EXPECT_TRUE(sampleQueue.dequeue(&sample));
+    EXPECT_EQ(sample, nullptr);
+
+    // Further enqueue attempts report the aborted state as well.
+    EXPECT_TRUE(sampleQueue.enqueue(sample));
+}
+
+TEST_F(MediaSampleQueueTests, TestEmptyAbort) {
+    LOG(DEBUG) << "TestEmptyAbort Starts";
+
+    MediaSampleQueue sampleQueue;
+    sampleQueue.abort();
+
+    // Dequeue on an aborted, empty queue must return right away instead of blocking.
+    std::shared_ptr<MediaSample> sample;
+    EXPECT_TRUE(sampleQueue.dequeue(&sample));
+    EXPECT_EQ(sample, nullptr);
+
+    // Enqueue reports the aborted state too.
+    EXPECT_TRUE(sampleQueue.enqueue(sample));
+}
+
+TEST_F(MediaSampleQueueTests, TestBlockingAbort) {
+    LOG(DEBUG) << "TestBlockingAbort Starts";
+
+    MediaSampleQueue sampleQueue;
+
+    // The delay only makes it very likely, not certain, that the main thread is already blocked
+    // in dequeue when abort is called. If abort happens to win the race, dequeue returns at once
+    // and the expectations below hold all the same.
+    std::thread abortingThread([&sampleQueue] {
+        std::this_thread::sleep_for(std::chrono::milliseconds(kThreadDelayDurationMs));
+        sampleQueue.abort();
+    });
+
+    std::shared_ptr<MediaSample> sample;
+    const bool aborted = sampleQueue.dequeue(&sample);
+    abortingThread.join();
+
+    EXPECT_TRUE(aborted);
+    EXPECT_EQ(sample, nullptr);
+}
+
+}  // namespace android
+
+int main(int argc, char** argv) {
+    ::testing::InitGoogleTest(&argc, argv);  // Hands the command line to gtest first.
+    return RUN_ALL_TESTS();  // The gtest result becomes the process exit code.
+}
diff --git a/media/libmediatranscoding/transcoder/tests/build_and_run_all_unit_tests.sh b/media/libmediatranscoding/transcoder/tests/build_and_run_all_unit_tests.sh
index 2d9dccf..dbee604 100755
--- a/media/libmediatranscoding/transcoder/tests/build_and_run_all_unit_tests.sh
+++ b/media/libmediatranscoding/transcoder/tests/build_and_run_all_unit_tests.sh
@@ -24,3 +24,6 @@
 
 echo "testing MediaSampleReaderNDK"
 adb shell /data/nativetest64/MediaSampleReaderNDKTests/MediaSampleReaderNDKTests
+
+echo "testing MediaSampleQueue"
+adb shell /data/nativetest64/MediaSampleQueueTests/MediaSampleQueueTests