fix threading in RTSPSource and StreamingSource

Bug: 18532335

Change-Id: I9c34401a928dc0ddbd0923aa5f127dc628efbb92
diff --git a/media/libmediaplayerservice/nuplayer/RTSPSource.cpp b/media/libmediaplayerservice/nuplayer/RTSPSource.cpp
index 52ae9ee..0282a9f 100644
--- a/media/libmediaplayerservice/nuplayer/RTSPSource.cpp
+++ b/media/libmediaplayerservice/nuplayer/RTSPSource.cpp
@@ -50,7 +50,7 @@
       mState(DISCONNECTED),
       mFinalResult(OK),
       mDisconnectReplyID(0),
-      mBuffering(true),
+      mBuffering(false),
       mSeekGeneration(0),
       mEOSTimeoutAudio(0),
       mEOSTimeoutVideo(0) {
@@ -106,9 +106,7 @@
         mHandler->connect();
     }
 
-    sp<AMessage> notifyStart = dupNotify();
-    notifyStart->setInt32("what", kWhatBufferingStart);
-    notifyStart->post();
+    startBufferingIfNecessary();
 }
 
 void NuPlayer::RTSPSource::start() {
@@ -144,6 +142,7 @@
 }
 
 status_t NuPlayer::RTSPSource::feedMoreTSData() {
+    Mutex::Autolock _l(mBufferingLock);
     return mFinalResult;
 }
 
@@ -195,16 +194,8 @@
 
 status_t NuPlayer::RTSPSource::dequeueAccessUnit(
         bool audio, sp<ABuffer> *accessUnit) {
-    if (mBuffering) {
-        if (!haveSufficientDataOnAllTracks()) {
-            return -EWOULDBLOCK;
-        }
-
-        mBuffering = false;
-
-        sp<AMessage> notify = dupNotify();
-        notify->setInt32("what", kWhatBufferingEnd);
-        notify->post();
+    if (!stopBufferingIfNecessary()) {
+        return -EWOULDBLOCK;
     }
 
     sp<AnotherPacketSource> source = getSource(audio);
@@ -246,11 +237,7 @@
             if (!(otherSource != NULL && otherSource->isFinished(mediaDurationUs))) {
                 // We should not enter buffering mode
                 // if any of the sources already have detected EOS.
-                mBuffering = true;
-
-                sp<AMessage> notify = dupNotify();
-                notify->setInt32("what", kWhatBufferingStart);
-                notify->post();
+                startBufferingIfNecessary();
             }
 
             return -EWOULDBLOCK;
@@ -630,7 +617,7 @@
         }
 
         mState = DISCONNECTED;
-        mFinalResult = err;
+        setError(err);
 
         if (mDisconnectReplyID != 0) {
             finishDisconnectIfPossible();
@@ -657,7 +644,7 @@
     }
 
     mState = DISCONNECTED;
-    mFinalResult = err;
+    setError(err);
 
     if (mDisconnectReplyID != 0) {
         finishDisconnectIfPossible();
@@ -678,4 +665,40 @@
     mDisconnectReplyID = 0;
 }
 
+void NuPlayer::RTSPSource::setError(status_t err) {
+    Mutex::Autolock _l(mBufferingLock);
+    mFinalResult = err;
+}
+
+void NuPlayer::RTSPSource::startBufferingIfNecessary() {
+    Mutex::Autolock _l(mBufferingLock);
+
+    if (!mBuffering) {
+        mBuffering = true;
+
+        sp<AMessage> notify = dupNotify();
+        notify->setInt32("what", kWhatBufferingStart);
+        notify->post();
+    }
+}
+
+bool NuPlayer::RTSPSource::stopBufferingIfNecessary() {
+    Mutex::Autolock _l(mBufferingLock);
+
+    if (mBuffering) {
+        if (!haveSufficientDataOnAllTracks()) {
+            return false;
+        }
+
+        mBuffering = false;
+
+        sp<AMessage> notify = dupNotify();
+        notify->setInt32("what", kWhatBufferingEnd);
+        notify->post();
+    }
+
+    return true;
+}
+
+
 }  // namespace android
diff --git a/media/libmediaplayerservice/nuplayer/RTSPSource.h b/media/libmediaplayerservice/nuplayer/RTSPSource.h
index f1cae53..ac3299a 100644
--- a/media/libmediaplayerservice/nuplayer/RTSPSource.h
+++ b/media/libmediaplayerservice/nuplayer/RTSPSource.h
@@ -97,6 +97,7 @@
     State mState;
     status_t mFinalResult;
     uint32_t mDisconnectReplyID;
+    Mutex mBufferingLock;
     bool mBuffering;
 
     sp<ALooper> mLooper;
@@ -126,6 +127,9 @@
     bool haveSufficientDataOnAllTracks();
 
     void setEOSTimeout(bool audio, int64_t timeout);
+    void setError(status_t err);
+    void startBufferingIfNecessary();
+    bool stopBufferingIfNecessary();
 
     DISALLOW_EVIL_CONSTRUCTORS(RTSPSource);
 };
diff --git a/media/libmediaplayerservice/nuplayer/StreamingSource.cpp b/media/libmediaplayerservice/nuplayer/StreamingSource.cpp
index e54f5b9..b3f224d 100644
--- a/media/libmediaplayerservice/nuplayer/StreamingSource.cpp
+++ b/media/libmediaplayerservice/nuplayer/StreamingSource.cpp
@@ -37,13 +37,26 @@
         const sp<IStreamSource> &source)
     : Source(notify),
       mSource(source),
-      mFinalResult(OK) {
+      mFinalResult(OK),
+      mBuffering(false) {
 }
 
 NuPlayer::StreamingSource::~StreamingSource() {
+    if (mLooper != NULL) {
+        mLooper->unregisterHandler(id());
+        mLooper->stop();
+    }
 }
 
 void NuPlayer::StreamingSource::prepareAsync() {
+    if (mLooper == NULL) {
+        mLooper = new ALooper;
+        mLooper->setName("streaming");
+        mLooper->start();
+
+        mLooper->registerHandler(this);
+    }
+
     notifyVideoSizeChanged();
     notifyFlagsChanged(0);
     notifyPrepared();
@@ -62,13 +75,15 @@
     mTSParser = new ATSParser(parserFlags);
 
     mStreamListener->start();
+
+    postReadBuffer();
 }
 
 status_t NuPlayer::StreamingSource::feedMoreTSData() {
-    if (mFinalResult != OK) {
-        return mFinalResult;
-    }
+    return postReadBuffer();
+}
 
+void NuPlayer::StreamingSource::onReadBuffer() {
     for (int32_t i = 0; i < 50; ++i) {
         char buffer[188];
         sp<AMessage> extra;
@@ -77,7 +92,7 @@
         if (n == 0) {
             ALOGI("input data EOS reached.");
             mTSParser->signalEOS(ERROR_END_OF_STREAM);
-            mFinalResult = ERROR_END_OF_STREAM;
+            setError(ERROR_END_OF_STREAM);
             break;
         } else if (n == INFO_DISCONTINUITY) {
             int32_t type = ATSParser::DISCONTINUITY_TIME;
@@ -88,7 +103,8 @@
                         IStreamListener::kKeyDiscontinuityMask, &mask)) {
                 if (mask == 0) {
                     ALOGE("Client specified an illegal discontinuity type.");
-                    return ERROR_UNSUPPORTED;
+                    setError(ERROR_UNSUPPORTED);
+                    break;
                 }
 
                 type = mask;
@@ -97,7 +113,6 @@
             mTSParser->signalDiscontinuity(
                     (ATSParser::DiscontinuityType)type, extra);
         } else if (n < 0) {
-            CHECK_EQ(n, -EWOULDBLOCK);
             break;
         } else {
             if (buffer[0] == 0x00) {
@@ -128,26 +143,80 @@
                     ALOGE("TS Parser returned error %d", err);
 
                     mTSParser->signalEOS(err);
-                    mFinalResult = err;
+                    setError(err);
                     break;
                 }
             }
         }
     }
+}
 
+status_t NuPlayer::StreamingSource::postReadBuffer() {
+    {
+        Mutex::Autolock _l(mBufferingLock);
+        if (mFinalResult != OK) {
+            return mFinalResult;
+        }
+        if (mBuffering) {
+            return OK;
+        }
+        mBuffering = true;
+    }
+
+    (new AMessage(kWhatReadBuffer, id()))->post();
     return OK;
 }
 
-sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {
+bool NuPlayer::StreamingSource::haveSufficientDataOnAllTracks() {
+    // We're going to buffer at least 2 secs worth data on all tracks before
+    // starting playback (both at startup and after a seek).
+
+    static const int64_t kMinDurationUs = 2000000ll;
+
+    sp<AnotherPacketSource> audioTrack = getSource(true /*audio*/);
+    sp<AnotherPacketSource> videoTrack = getSource(false /*audio*/);
+
+    status_t err;
+    int64_t durationUs;
+    if (audioTrack != NULL
+            && (durationUs = audioTrack->getBufferedDurationUs(&err))
+                    < kMinDurationUs
+            && err == OK) {
+        ALOGV("audio track doesn't have enough data yet. (%.2f secs buffered)",
+              durationUs / 1E6);
+        return false;
+    }
+
+    if (videoTrack != NULL
+            && (durationUs = videoTrack->getBufferedDurationUs(&err))
+                    < kMinDurationUs
+            && err == OK) {
+        ALOGV("video track doesn't have enough data yet. (%.2f secs buffered)",
+              durationUs / 1E6);
+        return false;
+    }
+
+    return true;
+}
+
+void NuPlayer::StreamingSource::setError(status_t err) {
+    Mutex::Autolock _l(mBufferingLock);
+    mFinalResult = err;
+}
+
+sp<AnotherPacketSource> NuPlayer::StreamingSource::getSource(bool audio) {
     if (mTSParser == NULL) {
         return NULL;
     }
 
-    ATSParser::SourceType type =
-        audio ? ATSParser::AUDIO : ATSParser::VIDEO;
+    sp<MediaSource> source = mTSParser->getSource(
+            audio ? ATSParser::AUDIO : ATSParser::VIDEO);
 
-    sp<AnotherPacketSource> source =
-        static_cast<AnotherPacketSource *>(mTSParser->getSource(type).get());
+    return static_cast<AnotherPacketSource *>(source.get());
+}
+
+sp<MetaData> NuPlayer::StreamingSource::getFormatMeta(bool audio) {
+    sp<AnotherPacketSource> source = getSource(audio);
 
     if (source == NULL) {
         return NULL;
@@ -158,16 +227,16 @@
 
 status_t NuPlayer::StreamingSource::dequeueAccessUnit(
         bool audio, sp<ABuffer> *accessUnit) {
-    ATSParser::SourceType type =
-        audio ? ATSParser::AUDIO : ATSParser::VIDEO;
-
-    sp<AnotherPacketSource> source =
-        static_cast<AnotherPacketSource *>(mTSParser->getSource(type).get());
+    sp<AnotherPacketSource> source = getSource(audio);
 
     if (source == NULL) {
         return -EWOULDBLOCK;
     }
 
+    if (!haveSufficientDataOnAllTracks()) {
+        postReadBuffer();
+    }
+
     status_t finalResult;
     if (!source->hasBufferAvailable(&finalResult)) {
         return finalResult == OK ? -EWOULDBLOCK : finalResult;
@@ -190,5 +259,26 @@
     return mSource->flags() & IStreamSource::kFlagIsRealTimeData;
 }
 
+void NuPlayer::StreamingSource::onMessageReceived(
+        const sp<AMessage> &msg) {
+    switch (msg->what()) {
+        case kWhatReadBuffer:
+        {
+            onReadBuffer();
+
+            {
+                Mutex::Autolock _l(mBufferingLock);
+                mBuffering = false;
+            }
+            break;
+        }
+        default:
+        {
+            TRESPASS();
+        }
+    }
+}
+
+
 }  // namespace android
 
diff --git a/media/libmediaplayerservice/nuplayer/StreamingSource.h b/media/libmediaplayerservice/nuplayer/StreamingSource.h
index 412b6c4..1f95f3c 100644
--- a/media/libmediaplayerservice/nuplayer/StreamingSource.h
+++ b/media/libmediaplayerservice/nuplayer/StreamingSource.h
@@ -25,6 +25,7 @@
 
 struct ABuffer;
 struct ATSParser;
+struct AnotherPacketSource;
 
 struct NuPlayer::StreamingSource : public NuPlayer::Source {
     StreamingSource(
@@ -43,14 +44,29 @@
 protected:
     virtual ~StreamingSource();
 
+    virtual void onMessageReceived(const sp<AMessage> &msg);
+
     virtual sp<MetaData> getFormatMeta(bool audio);
 
 private:
+    enum {
+        kWhatReadBuffer,
+    };
     sp<IStreamSource> mSource;
     status_t mFinalResult;
     sp<NuPlayerStreamListener> mStreamListener;
     sp<ATSParser> mTSParser;
 
+    bool mBuffering;
+    Mutex mBufferingLock;
+    sp<ALooper> mLooper;
+
+    void setError(status_t err);
+    sp<AnotherPacketSource> getSource(bool audio);
+    bool haveSufficientDataOnAllTracks();
+    status_t postReadBuffer();
+    void onReadBuffer();
+
     DISALLOW_EVIL_CONSTRUCTORS(StreamingSource);
 };