mediaplayer: optimize buffer queue management

Various changes for power consumption including:
    Restrict the number of messages in flight.
    Buffer more frames in the GenericSource so reads occur in a burst.

Bug: 15094301
Change-Id: I783481fd91f3fdd445b95e88ab82178f649f1a38
Signed-off-by: Phil Burk <philburk@google.com>
diff --git a/media/libmediaplayerservice/nuplayer/GenericSource.cpp b/media/libmediaplayerservice/nuplayer/GenericSource.cpp
index ec1a9a0..2421b32 100644
--- a/media/libmediaplayerservice/nuplayer/GenericSource.cpp
+++ b/media/libmediaplayerservice/nuplayer/GenericSource.cpp
@@ -1176,12 +1176,14 @@
 void NuPlayer::GenericSource::readBuffer(
         media_track_type trackType, int64_t seekTimeUs, int64_t *actualTimeUs, bool formatChange) {
     Track *track;
+    size_t maxBuffers = 1;
     switch (trackType) {
         case MEDIA_TRACK_TYPE_VIDEO:
             track = &mVideoTrack;
             break;
         case MEDIA_TRACK_TYPE_AUDIO:
             track = &mAudioTrack;
+            maxBuffers = 64;
             break;
         case MEDIA_TRACK_TYPE_SUBTITLE:
             track = &mSubtitleTrack;
@@ -1214,7 +1216,7 @@
         options.setNonBlocking();
     }
 
-    for (;;) {
+    for (size_t numBuffers = 0; numBuffers < maxBuffers; ) {
         MediaBuffer *mbuf;
         status_t err = track->mSource->read(&mbuf, &options);
 
@@ -1245,7 +1247,7 @@
 
             sp<ABuffer> buffer = mediaBufferToABuffer(mbuf, trackType, actualTimeUs);
             track->mPackets->queueAccessUnit(buffer);
-            break;
+            ++numBuffers;
         } else if (err == WOULD_BLOCK) {
             break;
         } else if (err == INFO_FORMAT_CHANGED) {
diff --git a/media/libmediaplayerservice/nuplayer/NuPlayer.cpp b/media/libmediaplayerservice/nuplayer/NuPlayer.cpp
index df3e992..9020a8d 100644
--- a/media/libmediaplayerservice/nuplayer/NuPlayer.cpp
+++ b/media/libmediaplayerservice/nuplayer/NuPlayer.cpp
@@ -50,6 +50,10 @@
 
 namespace android {
 
+// TODO optimize buffer size for power consumption
+// The offload read buffer size is 32 KB but 24 KB uses less power.
+const size_t NuPlayer::kAggregateBufferSizeBytes = 24 * 1024;
+
 struct NuPlayer::Action : public RefBase {
     Action() {}
 
@@ -730,7 +734,7 @@
 
                 if (err == -EWOULDBLOCK) {
                     if (mSource->feedMoreTSData() == OK) {
-                        msg->post(10000ll);
+                        msg->post(10 * 1000ll);
                     }
                 }
             } else if (what == Decoder::kWhatEOS) {
@@ -995,6 +999,7 @@
     ALOGV("both audio and video are flushed now.");
 
     mPendingAudioAccessUnit.clear();
+    mAggregateBuffer.clear();
 
     if (mTimeDiscontinuityPending) {
         mRenderer->signalTimeDiscontinuity();
@@ -1256,14 +1261,8 @@
     // Aggregate smaller buffers into a larger buffer.
     // The goal is to reduce power consumption.
     // Unfortunately this does not work with the software AAC decoder.
-    // TODO optimize buffer size for power consumption
-    // The offload read buffer size is 32 KB but 24 KB uses less power.
-    const int kAudioBigBufferSizeBytes = 24 * 1024;
-    bool doBufferAggregation = (audio && mOffloadAudio);
-    sp<ABuffer> biggerBuffer;
+    bool doBufferAggregation = (audio && mOffloadAudio);;
     bool needMoreData = false;
-    int numSmallBuffers = 0;
-    bool gotTime = false;
 
     bool dropAccessUnit;
     do {
@@ -1279,14 +1278,10 @@
         }
 
         if (err == -EWOULDBLOCK) {
-            if (biggerBuffer == NULL) {
-                return err;
-            } else {
-                break; // Reply with data that we already have.
-            }
+            return err;
         } else if (err != OK) {
             if (err == INFO_DISCONTINUITY) {
-                if (biggerBuffer != NULL) {
+                if (mAggregateBuffer != NULL) {
                     // We already have some data so save this for later.
                     mPendingAudioErr = err;
                     mPendingAudioAccessUnit = accessUnit;
@@ -1401,46 +1396,45 @@
 
         size_t smallSize = accessUnit->size();
         needMoreData = false;
-        if (doBufferAggregation && (biggerBuffer == NULL)
+        if (doBufferAggregation && (mAggregateBuffer == NULL)
                 // Don't bother if only room for a few small buffers.
-                && (smallSize < (kAudioBigBufferSizeBytes / 3))) {
+                && (smallSize < (kAggregateBufferSizeBytes / 3))) {
             // Create a larger buffer for combining smaller buffers from the extractor.
-            biggerBuffer = new ABuffer(kAudioBigBufferSizeBytes);
-            biggerBuffer->setRange(0, 0); // start empty
+            mAggregateBuffer = new ABuffer(kAggregateBufferSizeBytes);
+            mAggregateBuffer->setRange(0, 0); // start empty
         }
 
-        if (biggerBuffer != NULL) {
+        if (mAggregateBuffer != NULL) {
             int64_t timeUs;
+            int64_t dummy;
             bool smallTimestampValid = accessUnit->meta()->findInt64("timeUs", &timeUs);
+            bool bigTimestampValid = mAggregateBuffer->meta()->findInt64("timeUs", &dummy);
             // Will the smaller buffer fit?
-            size_t bigSize = biggerBuffer->size();
-            size_t roomLeft = biggerBuffer->capacity() - bigSize;
+            size_t bigSize = mAggregateBuffer->size();
+            size_t roomLeft = mAggregateBuffer->capacity() - bigSize;
             // Should we save this small buffer for the next big buffer?
             // If the first small buffer did not have a timestamp then save
             // any buffer that does have a timestamp until the next big buffer.
             if ((smallSize > roomLeft)
-                || (!gotTime && (numSmallBuffers > 0) && smallTimestampValid)) {
+                || (!bigTimestampValid && (bigSize > 0) && smallTimestampValid)) {
                 mPendingAudioErr = err;
                 mPendingAudioAccessUnit = accessUnit;
                 accessUnit.clear();
             } else {
+                // Grab time from first small buffer if available.
+                if ((bigSize == 0) && smallTimestampValid) {
+                    mAggregateBuffer->meta()->setInt64("timeUs", timeUs);
+                }
                 // Append small buffer to the bigger buffer.
-                memcpy(biggerBuffer->base() + bigSize, accessUnit->data(), smallSize);
+                memcpy(mAggregateBuffer->base() + bigSize, accessUnit->data(), smallSize);
                 bigSize += smallSize;
-                biggerBuffer->setRange(0, bigSize);
+                mAggregateBuffer->setRange(0, bigSize);
 
-                // Keep looping until we run out of room in the biggerBuffer.
+                // Keep looping until we run out of room in the mAggregateBuffer.
                 needMoreData = true;
 
-                // Grab time from first small buffer if available.
-                if ((numSmallBuffers == 0) && smallTimestampValid) {
-                    biggerBuffer->meta()->setInt64("timeUs", timeUs);
-                    gotTime = true;
-                }
-
-                ALOGV("feedDecoderInputData() #%d, smallSize = %zu, bigSize = %zu, capacity = %zu",
-                        numSmallBuffers, smallSize, bigSize, biggerBuffer->capacity());
-                numSmallBuffers++;
+                ALOGV("feedDecoderInputData() smallSize = %zu, bigSize = %zu, capacity = %zu",
+                        smallSize, bigSize, mAggregateBuffer->capacity());
             }
         }
     } while (dropAccessUnit || needMoreData);
@@ -1459,9 +1453,11 @@
         mCCDecoder->decode(accessUnit);
     }
 
-    if (biggerBuffer != NULL) {
-        ALOGV("feedDecoderInputData() reply with aggregated buffer, %d", numSmallBuffers);
-        reply->setBuffer("buffer", biggerBuffer);
+    if (mAggregateBuffer != NULL) {
+        ALOGV("feedDecoderInputData() reply with aggregated buffer, %zu",
+                mAggregateBuffer->size());
+        reply->setBuffer("buffer", mAggregateBuffer);
+        mAggregateBuffer.clear();
     } else {
         reply->setBuffer("buffer", accessUnit);
     }
diff --git a/media/libmediaplayerservice/nuplayer/NuPlayer.h b/media/libmediaplayerservice/nuplayer/NuPlayer.h
index 89ae11c..2e951bd 100644
--- a/media/libmediaplayerservice/nuplayer/NuPlayer.h
+++ b/media/libmediaplayerservice/nuplayer/NuPlayer.h
@@ -67,6 +67,8 @@
     status_t getSelectedTrack(int32_t type, Parcel* reply) const;
     status_t selectTrack(size_t trackIndex, bool select);
 
+    static const size_t kAggregateBufferSizeBytes;
+
 protected:
     virtual ~NuPlayer();
 
@@ -158,8 +160,11 @@
     // notion of time has changed.
     bool mTimeDiscontinuityPending;
 
+    // Used by feedDecoderInputData to aggregate small buffers into
+    // one large buffer.
     sp<ABuffer> mPendingAudioAccessUnit;
     status_t    mPendingAudioErr;
+    sp<ABuffer> mAggregateBuffer;
 
     FlushStatus mFlushingAudio;
     FlushStatus mFlushingVideo;
diff --git a/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.cpp b/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.cpp
index ab7906a..f7aacdd 100644
--- a/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.cpp
+++ b/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.cpp
@@ -30,8 +30,10 @@
 
 namespace android {
 
-static const int kMaxPendingBuffers = 10;
-static const int kMaxCachedBytes = 200000;
+static const size_t kMaxCachedBytes = 200000;
+// The buffers will contain a bit less than kAggregateBufferSizeBytes.
+// So we can start off with just enough buffers to keep the cache full.
+static const size_t kMaxPendingBuffers = 1 + (kMaxCachedBytes / NuPlayer::kAggregateBufferSizeBytes);
 
 NuPlayer::DecoderPassThrough::DecoderPassThrough(
         const sp<AMessage> &notify)
@@ -39,7 +41,8 @@
       mNotify(notify),
       mBufferGeneration(0),
       mReachedEOS(true),
-      mPendingBuffers(0),
+      mPendingBuffersToFill(0),
+      mPendingBuffersToDrain(0),
       mCachedBytes(0),
       mComponentName("pass through decoder") {
     mDecoderLooper = new ALooper;
@@ -79,12 +82,13 @@
 
 void NuPlayer::DecoderPassThrough::onConfigure(const sp<AMessage> &format) {
     ALOGV("[%s] onConfigure", mComponentName.c_str());
-    mPendingBuffers = 0;
     mCachedBytes = 0;
+    mPendingBuffersToFill = 0;
+    mPendingBuffersToDrain = 0;
     mReachedEOS = false;
     ++mBufferGeneration;
 
-    requestABuffer();
+    requestMaxBuffers();
 
     sp<AMessage> notify = mNotify->dup();
     notify->setInt32("what", kWhatOutputFormatChanged);
@@ -98,12 +102,15 @@
     return generation != mBufferGeneration;
 }
 
-void NuPlayer::DecoderPassThrough::requestABuffer() {
-    if (mCachedBytes >= kMaxCachedBytes || mReachedEOS) {
-        ALOGV("[%s] mReachedEOS=%d, max pending buffers(%d:%d)",
-                mComponentName.c_str(), (mReachedEOS ? 1 : 0),
-                mPendingBuffers, kMaxPendingBuffers);
-        return;
+bool NuPlayer::DecoderPassThrough::requestABuffer() {
+    if (mCachedBytes >= kMaxCachedBytes) {
+        ALOGV("[%s] mCachedBytes = %zu",
+                mComponentName.c_str(), mCachedBytes);
+        return false;
+    }
+    if (mReachedEOS) {
+        ALOGV("[%s] reached EOS", mComponentName.c_str());
+        return false;
     }
 
     sp<AMessage> reply = new AMessage(kWhatInputBufferFilled, id());
@@ -113,19 +120,16 @@
     notify->setInt32("what", kWhatFillThisBuffer);
     notify->setMessage("reply", reply);
     notify->post();
-    mPendingBuffers++;
+    mPendingBuffersToFill++;
+    ALOGV("requestABuffer: #ToFill = %zu, #ToDrain = %zu", mPendingBuffersToFill,
+            mPendingBuffersToDrain);
 
-    // pending buffers will already result in requestABuffer
-    if (mPendingBuffers < kMaxPendingBuffers) {
-        sp<AMessage> message = new AMessage(kWhatRequestABuffer, id());
-        message->setInt32("generation", mBufferGeneration);
-        message->post();
-    }
-    return;
+    return true;
 }
 
 void android::NuPlayer::DecoderPassThrough::onInputBufferFilled(
         const sp<AMessage> &msg) {
+    --mPendingBuffersToFill;
     if (mReachedEOS) {
         return;
     }
@@ -153,11 +157,16 @@
     notify->setBuffer("buffer", buffer);
     notify->setMessage("reply", reply);
     notify->post();
+    ++mPendingBuffersToDrain;
+    ALOGV("onInputBufferFilled: #ToFill = %zu, #ToDrain = %zu, cachedBytes = %zu",
+            mPendingBuffersToFill, mPendingBuffersToDrain, mCachedBytes);
 }
 
 void NuPlayer::DecoderPassThrough::onBufferConsumed(int32_t size) {
-    mPendingBuffers--;
+    --mPendingBuffersToDrain;
     mCachedBytes -= size;
+    ALOGV("onBufferConsumed: #ToFill = %zu, #ToDrain = %zu, cachedBytes = %zu",
+           mPendingBuffersToFill, mPendingBuffersToDrain, mCachedBytes);
     requestABuffer();
 }
 
@@ -167,11 +176,20 @@
     sp<AMessage> notify = mNotify->dup();
     notify->setInt32("what", kWhatFlushCompleted);
     notify->post();
-    mPendingBuffers = 0;
+    mPendingBuffersToFill = 0;
+    mPendingBuffersToDrain = 0;
     mCachedBytes = 0;
     mReachedEOS = false;
 }
 
+void NuPlayer::DecoderPassThrough::requestMaxBuffers() {
+    for (size_t i = 0; i < kMaxPendingBuffers; i++) {
+        if (!requestABuffer()) {
+            break;
+        }
+    }
+}
+
 void NuPlayer::DecoderPassThrough::onShutdown() {
     ++mBufferGeneration;
 
@@ -229,7 +247,7 @@
 
         case kWhatResume:
         {
-            requestABuffer();
+            requestMaxBuffers();
             break;
         }
 
diff --git a/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.h b/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.h
index 8590856..fb20257 100644
--- a/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.h
+++ b/media/libmediaplayerservice/nuplayer/NuPlayerDecoderPassThrough.h
@@ -55,19 +55,26 @@
     sp<AMessage> mNotify;
     sp<ALooper> mDecoderLooper;
 
-    void requestABuffer();
+    /** Returns true if a buffer was requested.
+     * Returns false if at EOS or cache already full.
+     */
+    bool requestABuffer();
     bool isStaleReply(const sp<AMessage> &msg);
 
     void onConfigure(const sp<AMessage> &format);
     void onFlush();
     void onInputBufferFilled(const sp<AMessage> &msg);
     void onBufferConsumed(int32_t size);
+    void requestMaxBuffers();
     void onShutdown();
 
     int32_t mBufferGeneration;
-    bool mReachedEOS;
-    int32_t mPendingBuffers;
-    int32_t mCachedBytes;
+    bool    mReachedEOS;
+    // TODO mPendingBuffersToFill and mPendingBuffersToDrain are only for
+    // debugging. They can be removed when the power investigation is done.
+    size_t  mPendingBuffersToFill;
+    size_t  mPendingBuffersToDrain;
+    size_t  mCachedBytes;
     AString mComponentName;
 
     DISALLOW_EVIL_CONSTRUCTORS(DecoderPassThrough);