fix threading in RTSPSource and StreamingSource

Bug: 18532335

Change-Id: I9c34401a928dc0ddbd0923aa5f127dc628efbb92
diff --git a/media/libmediaplayerservice/nuplayer/StreamingSource.cpp b/media/libmediaplayerservice/nuplayer/StreamingSource.cpp
index e54f5b9..b3f224d 100644
--- a/media/libmediaplayerservice/nuplayer/StreamingSource.cpp
+++ b/media/libmediaplayerservice/nuplayer/StreamingSource.cpp
@@ -37,13 +37,26 @@
         const sp<IStreamSource> &source)
     : Source(notify),
       mSource(source),
-      mFinalResult(OK) {
+      mFinalResult(OK),
+      mBuffering(false) {
 }
 
 NuPlayer::StreamingSource::~StreamingSource() {
+    if (mLooper != NULL) {  // only set once prepareAsync() has created the looper
+        mLooper->unregisterHandler(id());  // undo the registerHandler(this) done in prepareAsync()
+        mLooper->stop();  // counterpart of the start() issued in prepareAsync()
+    }
 }
 
 void NuPlayer::StreamingSource::prepareAsync() {
+    if (mLooper == NULL) {
+        mLooper = new ALooper;
+        mLooper->setName("streaming");
+        mLooper->start();
+
+        mLooper->registerHandler(this);
+    }
+
     notifyVideoSizeChanged();
     notifyFlagsChanged(0);
     notifyPrepared();
@@ -62,13 +75,15 @@
     mTSParser = new ATSParser(parserFlags);
 
     mStreamListener->start();
+
+    postReadBuffer();
 }
 
 status_t NuPlayer::StreamingSource::feedMoreTSData() {
-    if (mFinalResult != OK) {
-        return mFinalResult;
-    }
+    return postReadBuffer();  // no inline read any more: posts kWhatReadBuffer (serviced by onReadBuffer()) or returns the recorded error
+}
 
+void NuPlayer::StreamingSource::onReadBuffer() {
     for (int32_t i = 0; i < 50; ++i) {
         char buffer[188];
         sp<AMessage> extra;
@@ -77,7 +92,7 @@
         if (n == 0) {
             ALOGI("input data EOS reached.");
             mTSParser->signalEOS(ERROR_END_OF_STREAM);
-            mFinalResult = ERROR_END_OF_STREAM;
+            setError(ERROR_END_OF_STREAM);
             break;
         } else if (n == INFO_DISCONTINUITY) {
             int32_t type = ATSParser::DISCONTINUITY_TIME;
@@ -88,7 +103,8 @@
                         IStreamListener::kKeyDiscontinuityMask, &mask)) {
                 if (mask == 0) {
                     ALOGE("Client specified an illegal discontinuity type.");
-                    return ERROR_UNSUPPORTED;
+                    setError(ERROR_UNSUPPORTED);
+                    break;
                 }
 
                 type = mask;
@@ -97,7 +113,6 @@
             mTSParser->signalDiscontinuity(
                     (ATSParser::DiscontinuityType)type, extra);
         } else if (n < 0) {
-            CHECK_EQ(n, -EWOULDBLOCK);
             break;
         } else {
             if (buffer[0] == 0x00) {
@@ -128,26 +143,80 @@
                     ALOGE("TS Parser returned error %d", err);
 
                     mTSParser->signalEOS(err);
-                    mFinalResult = err;
+                    setError(err);
                     break;
                 }
             }
         }
     }
+}
 
+status_t NuPlayer::StreamingSource::postReadBuffer() {  // requests one read pass; at most one request is outstanding at a time
+    {
+        Mutex::Autolock _l(mBufferingLock);  // mFinalResult and mBuffering are both accessed under this lock
+        if (mFinalResult != OK) {
+            return mFinalResult;  // an error was recorded via setError(): report it and post nothing
+        }
+        if (mBuffering) {
+            return OK;  // a read request is already posted or being handled
+        }
+        mBuffering = true;  // reset in onMessageReceived() after onReadBuffer() returns
+    }
+
+    (new AMessage(kWhatReadBuffer, id()))->post();  // posted after the lock scope above has been left
     return OK;
 }
 
-sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {
+bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() {
+    // We're going to buffer at least 2 secs worth of data on all tracks before
+    // starting playback (both at startup and after a seek).
+
+    static const int64_t kMinDurationUs = 2000000ll;  // 2 seconds expressed in microseconds
+
+    sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/);  // getSource() yields NULL while mTSParser is NULL
+    sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/);
+
+    status_t err;  // written by getBufferedDurationUs(&err) before each read below
+    int64_t durationUs;
+    if (audioTrack != NULL  // a track that does not exist is skipped
+            && (durationUs = audioTrack->getBufferedDurationUs(&err))
+                    < kMinDurationUs
+            && err == OK) {  // a short buffer only counts as insufficient when the track reports OK
+        ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
+              durationUs / 1E6);
+        return false;
+    }
+
+    if (videoTrack != NULL  // same test as for the audio track
+            && (durationUs = videoTrack->getBufferedDurationUs(&err))
+                    < kMinDurationUs
+            && err == OK) {
+        ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
+              durationUs / 1E6);
+        return false;
+    }
+
+    return true;  // each existing track has >= kMinDurationUs buffered or reported a non-OK status
+}
+
+void NuPlayer::StreamingSource::setError(status_t err) {  // stores err as the result that postReadBuffer() will report
+    Mutex::Autolock _l(mBufferingLock);  // same lock postReadBuffer() holds while reading mFinalResult
+    mFinalResult = err;  // overwrites any previously stored value
+}
+
+sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) {  // audio/video track lookup on mTSParser; NULL while no parser exists
     if (mTSParser == NULL) {
         return NULL;
     }
 
-    ATSParser::SourceType type =
-        audio ? ATSParser::AUDIO : ATSParser::VIDEO;
+    sp<MediaSource> source = mTSParser->getSource(  // strong ref held for the duration of the cast below
+            audio ? ATSParser::AUDIO : ATSParser::VIDEO);
 
-    sp<AnotherPacketSource> source =
-        static_cast<AnotherPacketSource *>(mTSParser->getSource(type).get());
+    return static_cast<AnotherPacketSource *>(source.get());  // NOTE(review): unchecked downcast; assumes ATSParser only returns AnotherPacketSource - confirm
+}
+
+sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {
+    sp<AnotherPacketSource> source = getSource(audio);
 
     if (source == NULL) {
         return NULL;
@@ -158,16 +227,16 @@
 
 status_t NuPlayer::StreamingSource::dequeueAccessUnit(
         bool audio, sp<ABuffer> *accessUnit) {
-    ATSParser::SourceType type =
-        audio ? ATSParser::AUDIO : ATSParser::VIDEO;
-
-    sp<AnotherPacketSource> source =
-        static_cast<AnotherPacketSource *>(mTSParser->getSource(type).get());
+    sp<AnotherPacketSource> source = getSource(audio);
 
     if (source == NULL) {
         return -EWOULDBLOCK;
     }
 
+    if (!haveSufficientDataOnAllTracks()) {
+        postReadBuffer();
+    }
+
     status_t finalResult;
     if (!source->hasBufferAvailable(&finalResult)) {
         return finalResult == OK ? -EWOULDBLOCK : finalResult;
@@ -190,5 +259,26 @@
     return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
 }
 
+void NuPlayer::StreamingSource::onMessageReceived(  // handler entry point; presumably invoked by the looper set up in prepareAsync() - confirm
+        const sp<AMessage> &msg) {
+    switch (msg->what()) {
+        case kWhatReadBuffer:  // the message type posted by postReadBuffer()
+        {
+            onReadBuffer();  // called without mBufferingLock held
+
+            {
+                Mutex::Autolock _l(mBufferingLock);
+                mBuffering = false;  // read pass finished: postReadBuffer() may post a new request
+            }
+            break;
+        }
+        default:
+        {
+            TRESPASS();  // kWhatReadBuffer is the only message type handled here
+        }
+    }
+}
+
+
 }  // namespace android