tunnel NuPlayer source and decoder input

Bug: 18342383

Change-Id: Ieff1cd3bad2b39d46f127ddd5d5139b919992461
diff --git a/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.cpp b/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.cpp
index d2721ed..3b4c0a7 100644
--- a/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.cpp
+++ b/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.cpp
@@ -28,76 +28,51 @@
 #include <media/stagefright/foundation/ABuffer.h>
 #include <media/stagefright/foundation/ADebug.h>
 #include <media/stagefright/foundation/AMessage.h>
-#include <media/stagefright/MediaDefs.h>
 #include <media/stagefright/MediaErrors.h>
 
+#include "ATSParser.h"
+
 namespace android {
 
+// TODO optimize buffer size for power consumption
+// The offload read buffer size is 32 KB but 24 KB uses less power.
+static const size_t kAggregateBufferSizeBytes = 24 * 1024;
 static const size_t kMaxCachedBytes = 200000;
-// The buffers will contain a bit less than kAggregateBufferSizeBytes.
-// So we can start off with just enough buffers to keep the cache full.
-static const size_t kMaxPendingBuffers = 1 + (kMaxCachedBytes / NuPlayer::kAggregateBufferSizeBytes);
 
 NuPlayer::DecoderPassThrough::DecoderPassThrough(
         const sp<AMessage> &notify,
         const sp<Source> &source,
         const sp<Renderer> &renderer)
-    : Decoder(notify, source),
-      mNotify(notify),
+    : mNotify(notify),
       mSource(source),
       mRenderer(renderer),
       mSkipRenderingUntilMediaTimeUs(-1ll),
       mBufferGeneration(0),
       mReachedEOS(true),
-      mPendingBuffersToFill(0),
+      mPendingAudioErr(OK),
       mPendingBuffersToDrain(0),
       mCachedBytes(0),
       mComponentName("pass through decoder") {
     ALOGW_IF(renderer == NULL, "expect a non-NULL renderer");
-    mDecoderLooper = new ALooper;
-    mDecoderLooper->setName("NuPlayerDecoderPassThrough");
-    mDecoderLooper->start(false, false, ANDROID_PRIORITY_AUDIO);
 }
 
 NuPlayer::DecoderPassThrough::~DecoderPassThrough() {
 }
 
-void NuPlayer::DecoderPassThrough::configure(const sp<AMessage> &format) {
-    sp<AMessage> msg = new AMessage(kWhatConfigure, id());
-    msg->setMessage("format", format);
-    msg->post();
-}
-
-void NuPlayer::DecoderPassThrough::init() {
-    mDecoderLooper->registerHandler(this);
-}
-
-void NuPlayer::DecoderPassThrough::signalFlush() {
-    (new AMessage(kWhatFlush, id()))->post();
-}
-
-void NuPlayer::DecoderPassThrough::signalResume() {
-    (new AMessage(kWhatResume, id()))->post();
-}
-
-void NuPlayer::DecoderPassThrough::initiateShutdown() {
-    (new AMessage(kWhatShutdown, id()))->post();
-}
-
-bool NuPlayer::DecoderPassThrough::supportsSeamlessFormatChange(
-        const sp<AMessage> & /* targetFormat */) const {
-    return true;
+void NuPlayer::DecoderPassThrough::getStats(
+        int64_t *numFramesTotal, int64_t *numFramesDropped) const {
+    // Both counters are always reported as zero by this decoder.
+    *numFramesTotal = *numFramesDropped = 0;
+}
 
 void NuPlayer::DecoderPassThrough::onConfigure(const sp<AMessage> &format) {
     ALOGV("[%s] onConfigure", mComponentName.c_str());
     mCachedBytes = 0;
-    mPendingBuffersToFill = 0;
     mPendingBuffersToDrain = 0;
     mReachedEOS = false;
     ++mBufferGeneration;
 
-    requestMaxBuffers();
+    onRequestInputBuffers();
 
     uint32_t flags;
     int64_t durationUs;
@@ -112,47 +87,213 @@
             format, true /* offloadOnly */, false /* hasVideo */, flags);
 }
 
+void NuPlayer::DecoderPassThrough::onSetRenderer(
+        const sp<Renderer> &renderer) {
+    // The renderer can't be changed during offloading, so a request naming a
+    // different one is only logged and never applied.
+    ALOGW_IF(mRenderer != renderer, "ignoring request to change renderer");
+}
+
+void NuPlayer::DecoderPassThrough::onGetInputBuffers(
+        Vector<sp<ABuffer> > * /* dstBuffers */) {  // the output vector is never filled
+    ALOGE("onGetInputBuffers() called unexpectedly");  // a call here is only logged
+}
+
 bool NuPlayer::DecoderPassThrough::isStaleReply(const sp<AMessage> &msg) {
     int32_t generation;
     CHECK(msg->findInt32("generation", &generation));
     return generation != mBufferGeneration;
 }
 
-bool NuPlayer::DecoderPassThrough::requestABuffer() {
-    if (mCachedBytes >= kMaxCachedBytes) {
-        ALOGV("[%s] mCachedBytes = %zu",
-                mComponentName.c_str(), mCachedBytes);
-        return false;
-    }
-    if (mReachedEOS) {
-        ALOGV("[%s] reached EOS", mComponentName.c_str());
-        return false;
-    }
+bool NuPlayer::DecoderPassThrough::isCacheFullOrEOS() const {  // stop test for fetching input
+    ALOGV("[%s] mCachedBytes = %zu, mReachedEOS = %d",
+            mComponentName.c_str(), mCachedBytes, mReachedEOS);
 
-    sp<AMessage> reply = new AMessage(kWhatInputBufferFilled, id());
-    reply->setInt32("generation", mBufferGeneration);
-
-    sp<AMessage> notify = mNotify->dup();
-    notify->setInt32("what", kWhatFillThisBuffer);
-    notify->setMessage("reply", reply);
-    notify->post();
-    mPendingBuffersToFill++;
-    ALOGV("requestABuffer: #ToFill = %zu, #ToDrain = %zu", mPendingBuffersToFill,
-            mPendingBuffersToDrain);
-
-    return true;
+    return mCachedBytes >= kMaxCachedBytes || mReachedEOS;  // cache full, or stream ended
 }
 
-void android::NuPlayer::DecoderPassThrough::onInputBufferFilled(
+void NuPlayer::DecoderPassThrough::doRequestBuffers() {
+    // Keep pulling input until the cache is full, EOS is hit, or a fetch fails.
+    status_t fetchStatus = OK;
+    while (fetchStatus == OK && !isCacheFullOrEOS()) {
+        sp<AMessage> reply = new AMessage();
+        fetchStatus = fetchInputData(reply);
+        if (fetchStatus == OK) {
+            onInputBufferFetched(reply);
+        }
+    }
+    // Reschedule only on -EWOULDBLOCK, and only if feedMoreTSData() succeeds.
+    if (fetchStatus != -EWOULDBLOCK) {
+        return;
+    }
+    if (mSource->feedMoreTSData() == OK) {
+        scheduleRequestBuffers();
+    }
+}
+
+// Returns the next audio access unit, preferring one that an earlier call set aside.
+status_t NuPlayer::DecoderPassThrough::dequeueAccessUnit(sp<ABuffer> *accessUnit) {
+    status_t err;
+    // Was an access unit set aside earlier (discontinuity/EOS, or no room to aggregate it)?
+    if (mPendingAudioAccessUnit != NULL) {
+        *accessUnit = mPendingAudioAccessUnit;
+        mPendingAudioAccessUnit.clear();
+        err = mPendingAudioErr;
+        ALOGV("dequeueAccessUnit() use mPendingAudioAccessUnit");
+    } else {
+        err = mSource->dequeueAccessUnit(true /* audio */, accessUnit);
+    }
+
+    if (err == INFO_DISCONTINUITY || err == ERROR_END_OF_STREAM) {
+        if (mAggregateBuffer != NULL) {
+            // Aggregated data must go out first: save this event, report OK with no unit.
+            mPendingAudioErr = err;
+            mPendingAudioAccessUnit = *accessUnit;
+            (*accessUnit).clear();
+            ALOGD("return aggregated buffer and save err(=%d) for later", err);
+            err = OK;
+        }
+    }
+
+    return err;
+}
+
+// Packs small access units into one kAggregateBufferSizeBytes buffer. Returns a buffer
+// that is ready to send, or NULL while data is still being accumulated.
+sp<ABuffer> NuPlayer::DecoderPassThrough::aggregateBuffer(
+        const sp<ABuffer> &accessUnit) {
+    sp<ABuffer> aggregate;
+    if (accessUnit == NULL) {
+        // accessUnit is saved to mPendingAudioAccessUnit
+        // return current mAggregateBuffer
+        aggregate = mAggregateBuffer;
+        mAggregateBuffer.clear();
+        return aggregate;
+    }
+
+    size_t smallSize = accessUnit->size();
+    if ((mAggregateBuffer == NULL)
+            // Don't bother if only room for a few small buffers.
+            && (smallSize < (kAggregateBufferSizeBytes / 3))) {
+        // Create a larger buffer for combining smaller buffers from the extractor.
+        mAggregateBuffer = new ABuffer(kAggregateBufferSizeBytes);
+        mAggregateBuffer->setRange(0, 0); // start empty
+    }
+
+    if (mAggregateBuffer != NULL) {
+        int64_t timeUs = 0;
+        int64_t dummy = 0;
+        bool smallTimestampValid = accessUnit->meta()->findInt64("timeUs", &timeUs);
+        bool bigTimestampValid = mAggregateBuffer->meta()->findInt64("timeUs", &dummy);
+        // Will the smaller buffer fit?
+        size_t bigSize = mAggregateBuffer->size();
+        size_t roomLeft = mAggregateBuffer->capacity() - bigSize;
+        // Should we save this small buffer for the next big buffer?
+        // If the first small buffer did not have a timestamp then save
+        // any buffer that does have a timestamp until the next big buffer.
+        if ((smallSize > roomLeft)
+            || (!bigTimestampValid && (bigSize > 0) && smallTimestampValid)) {
+            mPendingAudioErr = OK;
+            mPendingAudioAccessUnit = accessUnit;
+            aggregate = mAggregateBuffer;
+            mAggregateBuffer.clear();
+        } else {
+            // Grab time from first small buffer if available.
+            if ((bigSize == 0) && smallTimestampValid) {
+                mAggregateBuffer->meta()->setInt64("timeUs", timeUs);
+            }
+            // Append small buffer to the bigger buffer.
+            memcpy(mAggregateBuffer->base() + bigSize, accessUnit->data(), smallSize);
+            bigSize += smallSize;
+            mAggregateBuffer->setRange(0, bigSize);
+            ALOGV("aggregateBuffer() smallSize = %zu, bigSize = %zu, capacity = %zu",
+                    smallSize, bigSize, mAggregateBuffer->capacity());
+        }
+    } else {
+        // decided not to aggregate
+        aggregate = accessUnit;
+    }
+
+    return aggregate;
+}
+
+// Fills |reply| with the next input for onInputBufferFetched(): "buffer" for data, or
+// "err" for a stream event or error. Returns -EWOULDBLOCK if there is nothing to deliver
+// yet, otherwise OK.
+status_t NuPlayer::DecoderPassThrough::fetchInputData(sp<AMessage> &reply) {
+    sp<ABuffer> accessUnit;
+    do {
+        status_t err = dequeueAccessUnit(&accessUnit);
+        if (err == -EWOULDBLOCK) {
+            return err;
+        } else if (err != OK) {
+            if (err == INFO_DISCONTINUITY) {
+                int32_t type;
+                CHECK(accessUnit->meta()->findInt32("discontinuity", &type));
+
+                bool formatChange =
+                        (type & ATSParser::DISCONTINUITY_AUDIO_FORMAT) != 0;
+                bool timeChange =
+                        (type & ATSParser::DISCONTINUITY_TIME) != 0;
+
+                ALOGI("audio discontinuity (formatChange=%d, time=%d)",
+                        formatChange, timeChange);
+
+                if (formatChange || timeChange) {
+                    sp<AMessage> msg = mNotify->dup();
+                    msg->setInt32("what", kWhatInputDiscontinuity);
+                    // will perform seamless format change,
+                    // only notify NuPlayer to scan sources
+                    msg->setInt32("formatChange", false);
+                    msg->post();
+                }
+
+                if (timeChange) {
+                    onFlush(false /* notifyComplete */);
+                    err = OK;
+                } else if (formatChange) {
+                    // do seamless format change
+                    err = OK;
+                } else {
+                    // This stream is unaffected by the discontinuity
+                    return -EWOULDBLOCK;
+                }
+            }
+            // Deliver the status: OK after a handled discontinuity, else the error.
+            reply->setInt32("err", err);
+            return OK;
+        }
+        // A NULL result means the unit was absorbed into the aggregate; fetch another.
+        accessUnit = aggregateBuffer(accessUnit);
+    } while (accessUnit == NULL);
+
+#if 0
+    int64_t mediaTimeUs;
+    CHECK(accessUnit->meta()->findInt64("timeUs", &mediaTimeUs));
+    ALOGV("feeding audio input buffer at media time %.2f secs",
+         mediaTimeUs / 1E6);
+#endif
+
+    reply->setBuffer("buffer", accessUnit);
+
+    return OK;
+}
+
+void NuPlayer::DecoderPassThrough::onInputBufferFetched(
         const sp<AMessage> &msg) {
-    --mPendingBuffersToFill;
     if (mReachedEOS) {
         return;
     }
 
     sp<ABuffer> buffer;
-    msg->findBuffer("buffer", &buffer);
+    bool hasBuffer = msg->findBuffer("buffer", &buffer);
     if (buffer == NULL) {
+        int32_t streamErr = ERROR_END_OF_STREAM;
+        CHECK(msg->findInt32("err", &streamErr) || !hasBuffer);
+        if (streamErr == OK) {
+            return;
+        }
+
         mReachedEOS = true;
         if (mRenderer != NULL) {
             mRenderer->queueEOS(true /* audio */, ERROR_END_OF_STREAM);
@@ -201,50 +342,52 @@
     mRenderer->queueBuffer(true /* audio */, buffer, reply);
 
     ++mPendingBuffersToDrain;
-    ALOGV("onInputBufferFilled: #ToFill = %zu, #ToDrain = %zu, cachedBytes = %zu",
-            mPendingBuffersToFill, mPendingBuffersToDrain, mCachedBytes);
+    ALOGV("onInputBufferFilled: #ToDrain = %zu, cachedBytes = %zu",
+            mPendingBuffersToDrain, mCachedBytes);
 }
 
 void NuPlayer::DecoderPassThrough::onBufferConsumed(int32_t size) {
     --mPendingBuffersToDrain;
     mCachedBytes -= size;
-    ALOGV("onBufferConsumed: #ToFill = %zu, #ToDrain = %zu, cachedBytes = %zu",
-           mPendingBuffersToFill, mPendingBuffersToDrain, mCachedBytes);
-    requestABuffer();
+    ALOGV("onBufferConsumed: #ToDrain = %zu, cachedBytes = %zu",
+            mPendingBuffersToDrain, mCachedBytes);
+    onRequestInputBuffers();
 }
 
-void NuPlayer::DecoderPassThrough::onFlush() {
+void NuPlayer::DecoderPassThrough::onResume() {
+    onRequestInputBuffers();  // the same request onConfigure() and onBufferConsumed() issue
+}
+
+void NuPlayer::DecoderPassThrough::onFlush(bool notifyComplete) {
     ++mBufferGeneration;
     mSkipRenderingUntilMediaTimeUs = -1;
 
     if (mRenderer != NULL) {
-        mRenderer->flush(true /* audio */);
+        mRenderer->flush(true /* audio */, notifyComplete);
+        mRenderer->signalTimeDiscontinuity();
     }
 
-    sp<AMessage> notify = mNotify->dup();
-    notify->setInt32("what", kWhatFlushCompleted);
-    notify->post();
-    mPendingBuffersToFill = 0;
+    if (notifyComplete) {  // false for the internal flush on a time discontinuity
+        sp<AMessage> notify = mNotify->dup();
+        notify->setInt32("what", kWhatFlushCompleted);
+        notify->post();
+    }
+    mAggregateBuffer = mPendingAudioAccessUnit = NULL;  // drop input staged before the flush
     mPendingBuffersToDrain = 0;
     mCachedBytes = 0;
     mReachedEOS = false;
 }
 
-void NuPlayer::DecoderPassThrough::requestMaxBuffers() {
-    for (size_t i = 0; i < kMaxPendingBuffers; i++) {
-        if (!requestABuffer()) {
-            break;
-        }
-    }
-}
-
-void NuPlayer::DecoderPassThrough::onShutdown() {
+void NuPlayer::DecoderPassThrough::onShutdown(bool notifyComplete) {
     ++mBufferGeneration;
     mSkipRenderingUntilMediaTimeUs = -1;
 
-    sp<AMessage> notify = mNotify->dup();
-    notify->setInt32("what", kWhatShutdownCompleted);
-    notify->post();
+    if (notifyComplete) {  // kWhatShutdownCompleted is posted only on request
+        sp<AMessage> notify = mNotify->dup();
+        notify->setInt32("what", kWhatShutdownCompleted);
+        notify->post();
+    }
+    // Marking EOS makes isCacheFullOrEOS() true, so no further input is fetched.
     mReachedEOS = true;
 }
 
@@ -253,31 +396,6 @@
             msg->debugString().c_str());
 
     switch (msg->what()) {
-        case kWhatConfigure:
-        {
-            sp<AMessage> format;
-            CHECK(msg->findMessage("format", &format));
-            onConfigure(format);
-            break;
-        }
-
-        case kWhatRequestABuffer:
-        {
-            if (!isStaleReply(msg)) {
-                requestABuffer();
-            }
-
-            break;
-        }
-
-        case kWhatInputBufferFilled:
-        {
-            if (!isStaleReply(msg)) {
-                onInputBufferFilled(msg);
-            }
-            break;
-        }
-
         case kWhatBufferConsumed:
         {
             if (!isStaleReply(msg)) {
@@ -288,26 +406,8 @@
             break;
         }
 
-        case kWhatFlush:
-        {
-            onFlush();
-            break;
-        }
-
-        case kWhatResume:
-        {
-            requestMaxBuffers();
-            break;
-        }
-
-        case kWhatShutdown:
-        {
-            onShutdown();
-            break;
-        }
-
         default:
-            TRESPASS();
+            DecoderBase::onMessageReceived(msg);
             break;
     }
 }