Support streaming data across binder boundaries.

Change-Id: Ifbac61406dcb81343765f99ccba08bd90f9274cc
diff --git a/media/libmedia/Android.mk b/media/libmedia/Android.mk
index 2e5cbe3..731c09d 100644
--- a/media/libmedia/Android.mk
+++ b/media/libmedia/Android.mk
@@ -15,6 +15,7 @@
     IMediaRecorderClient.cpp \
     IMediaPlayer.cpp \
     IMediaRecorder.cpp \
+    IStreamSource.cpp \
     Metadata.cpp \
     mediarecorder.cpp \
     IMediaMetadataRetriever.cpp \
diff --git a/media/libmedia/IMediaPlayerService.cpp b/media/libmedia/IMediaPlayerService.cpp
index 4abfa75..77199e1 100644
--- a/media/libmedia/IMediaPlayerService.cpp
+++ b/media/libmedia/IMediaPlayerService.cpp
@@ -23,6 +23,7 @@
 #include <media/IMediaPlayerService.h>
 #include <media/IMediaRecorder.h>
 #include <media/IOMX.h>
+#include <media/IStreamSource.h>
 
 #include <utils/Errors.h>  // for status_t
 
@@ -31,6 +32,7 @@
 enum {
     CREATE_URL = IBinder::FIRST_CALL_TRANSACTION,
     CREATE_FD,
+    CREATE_STREAM,
     DECODE_URL,
     DECODE_FD,
     CREATE_MEDIA_RECORDER,
@@ -107,6 +109,21 @@
         return interface_cast<IMediaPlayer>(reply.readStrongBinder());;
     }
 
+    // Proxy for CREATE_STREAM: marshals pid, client, source and audio session
+    // id (in that order) and returns the player interface read from the reply.
+    virtual sp<IMediaPlayer> create(
+            pid_t pid, const sp<IMediaPlayerClient> &client,
+            const sp<IStreamSource> &source, int audioSessionId) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IMediaPlayerService::getInterfaceDescriptor());
+        data.writeInt32(static_cast<int32_t>(pid));
+        data.writeStrongBinder(client->asBinder());
+        data.writeStrongBinder(source->asBinder());
+        data.writeInt32(static_cast<int32_t>(audioSessionId));
+        remote()->transact(CREATE_STREAM, data, &reply);
+        return interface_cast<IMediaPlayer>(reply.readStrongBinder());
+    }
+
     virtual sp<IMemory> decode(const char* url, uint32_t *pSampleRate, int* pNumChannels, int* pFormat)
     {
         Parcel data, reply;
@@ -184,6 +201,27 @@
             reply->writeStrongBinder(player->asBinder());
             return NO_ERROR;
         } break;
+        case CREATE_STREAM:
+        {
+            CHECK_INTERFACE(IMediaPlayerService, data, reply);
+
+            // Fields are read in the order the proxy's create() wrote them.
+            pid_t pid = static_cast<pid_t>(data.readInt32());
+
+            sp<IMediaPlayerClient> client =
+                interface_cast<IMediaPlayerClient>(data.readStrongBinder());
+
+            sp<IStreamSource> source =
+                interface_cast<IStreamSource>(data.readStrongBinder());
+
+            int audioSessionId = static_cast<int>(data.readInt32());
+
+            sp<IMediaPlayer> player =
+                create(pid, client, source, audioSessionId);
+
+            reply->writeStrongBinder(player->asBinder());
+            return OK;
+        }
         case DECODE_URL: {
             CHECK_INTERFACE(IMediaPlayerService, data, reply);
             const char* url = data.readCString();
diff --git a/media/libmedia/IStreamSource.cpp b/media/libmedia/IStreamSource.cpp
new file mode 100644
index 0000000..89f2b44
--- /dev/null
+++ b/media/libmedia/IStreamSource.cpp
@@ -0,0 +1,168 @@
+/*
+ * Copyright (C) 2010 The Android Open Source Project
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+//#define LOG_NDEBUG 0
+#define LOG_TAG "IStreamSource"
+#include <utils/Log.h>
+
+#include <media/IStreamSource.h>
+
+#include <binder/IMemory.h>
+#include <binder/Parcel.h>
+
+namespace android {
+
+enum {  // binder transaction codes shared by both interfaces in this file
+    // IStreamSource
+    SET_LISTENER = IBinder::FIRST_CALL_TRANSACTION,
+    SET_BUFFERS,
+    ON_BUFFER_AVAILABLE,
+
+    // IStreamListener (numbering continues from the codes above)
+    QUEUE_BUFFER,
+    QUEUE_COMMAND,
+};
+
+struct BpStreamSource : public BpInterface<IStreamSource> {  // binder proxy
+    BpStreamSource(const sp<IBinder> &impl)
+        : BpInterface<IStreamSource>(impl) {
+    }
+    // Sends the listener's binder in a SET_LISTENER transaction.
+    virtual void setListener(const sp<IStreamListener> &listener) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IStreamSource::getInterfaceDescriptor());
+        data.writeStrongBinder(listener->asBinder());
+        remote()->transact(SET_LISTENER, data, &reply);
+    }
+    // Sends the buffer count as int32, then each buffer's binder in order.
+    virtual void setBuffers(const Vector<sp<IMemory> > &buffers) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IStreamSource::getInterfaceDescriptor());
+        data.writeInt32(static_cast<int32_t>(buffers.size()));
+        for (size_t i = 0; i < buffers.size(); ++i) {
+            data.writeStrongBinder(buffers.itemAt(i)->asBinder());
+        }
+        remote()->transact(SET_BUFFERS, data, &reply);
+    }
+    // Sends |index| as int32; unlike the calls above this uses FLAG_ONEWAY.
+    virtual void onBufferAvailable(size_t index) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IStreamSource::getInterfaceDescriptor());
+        data.writeInt32(static_cast<int32_t>(index));
+        remote()->transact(
+                ON_BUFFER_AVAILABLE, data, &reply, IBinder::FLAG_ONEWAY);
+    }
+};
+
+IMPLEMENT_META_INTERFACE(StreamSource, "android.hardware.IStreamSource");
+
+// Unmarshals IStreamSource transactions and dispatches them to the local
+// implementation; unknown codes are passed on to BBinder::onTransact().
+status_t BnStreamSource::onTransact(
+        uint32_t code, const Parcel &data, Parcel *reply, uint32_t flags) {
+    switch (code) {
+        case SET_LISTENER:
+        {
+            CHECK_INTERFACE(IStreamSource, data, reply);
+            setListener(
+                    interface_cast<IStreamListener>(data.readStrongBinder()));
+            break;
+        }
+        case SET_BUFFERS:
+        {
+            CHECK_INTERFACE(IStreamSource, data, reply);
+            // The count is peer-supplied: reject negatives (which would wrap
+            // to a huge size_t) and counts exceeding the bytes left to read.
+            const int32_t n = data.readInt32();
+            if (n < 0 || static_cast<size_t>(n) > data.dataAvail()) {
+                return BAD_VALUE;
+            }
+            Vector<sp<IMemory> > buffers;
+            for (int32_t i = 0; i < n; ++i) {
+                buffers.push(interface_cast<IMemory>(data.readStrongBinder()));
+            }
+            setBuffers(buffers);
+            break;
+        }
+        case ON_BUFFER_AVAILABLE:
+        {
+            CHECK_INTERFACE(IStreamSource, data, reply);
+            onBufferAvailable(static_cast<size_t>(data.readInt32()));
+            break;
+        }
+        default:
+            return BBinder::onTransact(code, data, reply, flags);
+    }
+    return OK;
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
+struct BpStreamListener : public BpInterface<IStreamListener> {  // proxy
+    BpStreamListener(const sp<IBinder> &impl)
+        : BpInterface<IStreamListener>(impl) {
+    }
+    // Sends |index| and |size| as two int32 values, using FLAG_ONEWAY.
+    virtual void queueBuffer(size_t index, size_t size) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IStreamListener::getInterfaceDescriptor());
+        data.writeInt32(static_cast<int32_t>(index));
+        data.writeInt32(static_cast<int32_t>(size));
+
+        remote()->transact(QUEUE_BUFFER, data, &reply, IBinder::FLAG_ONEWAY);
+    }
+    // Sends |cmd| as a single int32, using FLAG_ONEWAY.
+    virtual void queueCommand(Command cmd) {
+        Parcel data, reply;
+        data.writeInterfaceToken(IStreamListener::getInterfaceDescriptor());
+        data.writeInt32(static_cast<int32_t>(cmd));
+
+        remote()->transact(QUEUE_COMMAND, data, &reply, IBinder::FLAG_ONEWAY);
+    }
+};
+
+IMPLEMENT_META_INTERFACE(StreamListener, "android.hardware.IStreamListener");
+
+// Unmarshals IStreamListener transactions and dispatches them to the local
+// implementation; unknown codes are passed on to BBinder::onTransact().
+status_t BnStreamListener::onTransact(
+        uint32_t code, const Parcel &data, Parcel *reply, uint32_t flags) {
+    switch (code) {
+        case QUEUE_BUFFER:
+        {
+            CHECK_INTERFACE(IStreamListener, data, reply);
+            size_t index = static_cast<size_t>(data.readInt32());
+            size_t size = static_cast<size_t>(data.readInt32());
+            // NOTE(review): no range check here; negative int32s wrap large.
+            queueBuffer(index, size);
+            break;
+        }
+
+        case QUEUE_COMMAND:
+        {
+            CHECK_INTERFACE(IStreamListener, data, reply);
+            Command cmd = static_cast<Command>(data.readInt32());
+
+            queueCommand(cmd);
+            break;
+        }
+        default:
+            return BBinder::onTransact(code, data, reply, flags);
+    }
+    return OK;
+}
+
+}  // namespace android
diff --git a/media/libmediaplayerservice/MediaPlayerService.cpp b/media/libmediaplayerservice/MediaPlayerService.cpp
index 00e510b..63d09d6 100644
--- a/media/libmediaplayerservice/MediaPlayerService.cpp
+++ b/media/libmediaplayerservice/MediaPlayerService.cpp
@@ -278,6 +278,26 @@
     return c;
 }
 
+// Creates a Client fed by |source|. The client is registered in mClients
+// only if setDataSource() succeeds; otherwise a NULL player is returned.
+sp<IMediaPlayer> MediaPlayerService::create(
+        pid_t pid, const sp<IMediaPlayerClient> &client,
+        const sp<IStreamSource> &source, int audioSessionId) {
+    int32_t connId = android_atomic_inc(&mNextConnId);
+    sp<Client> c = new Client(this, pid, connId, client, audioSessionId);
+    LOGV("Create new client(%d) from pid %d, audioSessionId=%d",
+         connId, pid, audioSessionId);
+
+    if (OK == c->setDataSource(source)) {
+        wp<Client> weak = c;
+        Mutex::Autolock autoLock(mLock);
+        mClients.add(weak);
+    } else {
+        c.clear();
+    }
+    return c;
+}
+
 sp<IOMX> MediaPlayerService::getOMX() {
     Mutex::Autolock autoLock(mLock);
 
@@ -864,6 +884,30 @@
     return mStatus;
 }
 
+// Binds this client to a Stagefright player that pulls its data from |source|.
+// Returns NO_INIT if no player could be created, else the player's status.
+status_t MediaPlayerService::Client::setDataSource(
+        const sp<IStreamSource> &source) {
+    // Streams are always handled by the Stagefright player.
+    sp<MediaPlayerBase> player = createPlayer(STAGEFRIGHT_PLAYER);
+    if (player == NULL) {
+        return NO_INIT;
+    }
+
+    // Players without hardware output are given an AudioOutput sink.
+    if (!player->hardwareOutput()) {
+        mAudioOutput = new AudioOutput(mAudioSessionId);
+        MediaPlayerInterface *iface =
+            static_cast<MediaPlayerInterface*>(player.get());
+        iface->setAudioSink(mAudioOutput);
+    }
+    mStatus = player->setDataSource(source);
+    if (mStatus == OK) {
+        mPlayer = player;  // only keep the player once it accepted the source
+    }
+    return mStatus;
+}
+
 status_t MediaPlayerService::Client::setVideoSurface(const sp<Surface>& surface)
 {
     LOGV("[%d] setVideoSurface(%p)", mConnId, surface.get());
diff --git a/media/libmediaplayerservice/MediaPlayerService.h b/media/libmediaplayerservice/MediaPlayerService.h
index 184324c..62f8ed6 100644
--- a/media/libmediaplayerservice/MediaPlayerService.h
+++ b/media/libmediaplayerservice/MediaPlayerService.h
@@ -191,6 +191,11 @@
             const KeyedVector<String8, String8> *headers, int audioSessionId);
 
     virtual sp<IMediaPlayer>    create(pid_t pid, const sp<IMediaPlayerClient>& client, int fd, int64_t offset, int64_t length, int audioSessionId);
+
+    virtual sp<IMediaPlayer>    create(
+            pid_t pid, const sp<IMediaPlayerClient> &client,
+            const sp<IStreamSource> &source, int audioSessionId);
+
     virtual sp<IMemory>         decode(const char* url, uint32_t *pSampleRate, int* pNumChannels, int* pFormat);
     virtual sp<IMemory>         decode(int fd, int64_t offset, int64_t length, uint32_t *pSampleRate, int* pNumChannels, int* pFormat);
     virtual sp<IOMX>            getOMX();
@@ -234,6 +239,9 @@
                         const KeyedVector<String8, String8> *headers);
 
                 status_t        setDataSource(int fd, int64_t offset, int64_t length);
+
+                status_t        setDataSource(const sp<IStreamSource> &source);
+
         static  void            notify(void* cookie, int msg, int ext1, int ext2);
 
                 pid_t           pid() const { return mPid; }
diff --git a/media/libmediaplayerservice/StagefrightPlayer.cpp b/media/libmediaplayerservice/StagefrightPlayer.cpp
index 58ef99b..da564dc 100644
--- a/media/libmediaplayerservice/StagefrightPlayer.cpp
+++ b/media/libmediaplayerservice/StagefrightPlayer.cpp
@@ -44,6 +44,10 @@
     return mPlayer->setDataSource(dup(fd), offset, length);
 }
 
+status_t StagefrightPlayer::setDataSource(const sp<IStreamSource> &source) {
+    return mPlayer->setDataSource(source);  // forwarded unchanged to mPlayer
+}
+
 status_t StagefrightPlayer::setVideoSurface(const sp<Surface> &surface) {
     LOGV("setVideoSurface");
 
diff --git a/media/libmediaplayerservice/StagefrightPlayer.h b/media/libmediaplayerservice/StagefrightPlayer.h
index c4a2588..fc72bfb 100644
--- a/media/libmediaplayerservice/StagefrightPlayer.h
+++ b/media/libmediaplayerservice/StagefrightPlayer.h
@@ -35,6 +35,9 @@
             const char *url, const KeyedVector<String8, String8> *headers);
 
     virtual status_t setDataSource(int fd, int64_t offset, int64_t length);
+
+    virtual status_t setDataSource(const sp<IStreamSource> &source);
+
     virtual status_t setVideoSurface(const sp<Surface> &surface);
     virtual status_t prepare();
     virtual status_t prepareAsync();
diff --git a/media/libstagefright/AwesomePlayer.cpp b/media/libstagefright/AwesomePlayer.cpp
index ec58919..a804866 100644
--- a/media/libstagefright/AwesomePlayer.cpp
+++ b/media/libstagefright/AwesomePlayer.cpp
@@ -34,13 +34,16 @@
 #include "UDPPusher.h"
 
 #include <binder/IPCThreadState.h>
+#include <binder/MemoryDealer.h>
+#include <media/IStreamSource.h>
+#include <media/stagefright/foundation/hexdump.h>
+#include <media/stagefright/foundation/ADebug.h>
 #include <media/stagefright/AudioPlayer.h>
 #include <media/stagefright/DataSource.h>
 #include <media/stagefright/FileSource.h>
 #include <media/stagefright/MediaBuffer.h>
 #include <media/stagefright/MediaDefs.h>
 #include <media/stagefright/MediaExtractor.h>
-#include <media/stagefright/MediaDebug.h>
 #include <media/stagefright/MediaSource.h>
 #include <media/stagefright/MetaData.h>
 #include <media/stagefright/OMXCodec.h>
@@ -155,6 +158,201 @@
             const AwesomeNativeWindowRenderer &);
 };
 
+////////////////////////////////////////////////////////////////////////////////
+
+struct QueueDataSource;
+
+struct QueueListener : public BnStreamListener {
+    QueueListener(QueueDataSource *owner)
+        : mOwner(owner) {
+    }
+    // Detaches from the owner; callbacks arriving afterwards are dropped.
+    void clearOwner();
+    // Stream callbacks, forwarded to the owner while one is attached.
+    virtual void queueBuffer(size_t index, size_t size);
+    virtual void queueCommand(Command cmd);
+
+private:
+    Mutex mLock;  // held around every access to mOwner
+
+    QueueDataSource *mOwner;  // raw back-pointer, NULL once detached
+
+    DISALLOW_EVIL_CONSTRUCTORS(QueueListener);
+};
+
+// DataSource fed by an IStreamSource through a fixed pool of kNumBuffers
+// buffers; readAt() only supports reads at the current position.
+struct QueueDataSource : public DataSource {
+    QueueDataSource(const sp<IStreamSource> &source);
+    virtual status_t initCheck() const;
+    virtual ssize_t readAt(off64_t offset, void *data, size_t size);
+
+    // Called by QueueListener to deliver filled buffers and commands.
+    virtual void queueBuffer(size_t index, size_t size);
+    virtual void queueCommand(IStreamListener::Command cmd);
+
+protected:
+    virtual ~QueueDataSource();
+
+private:
+    enum {
+        kNumBuffers = 16
+    };
+
+    struct BufferInfo {
+        size_t mIndex;   // index into mBuffers
+        size_t mOffset;  // offset of the first unread byte
+        size_t mSize;    // unread bytes remaining
+    };
+
+    Mutex mLock;
+    Condition mCondition;  // signalled when a buffer or EOS arrives
+
+    sp<IStreamSource> mSource;
+    sp<QueueListener> mListener;
+    sp<MemoryDealer> mDealer;
+    Vector<sp<IMemory> > mBuffers;  // handed to mSource via setBuffers()
+    List<BufferInfo> mFilledBuffers;  // FIFO consumed by readAt()
+
+    off64_t mPosition;  // total bytes returned by readAt() so far
+    bool mEOS;  // set by the EOS command
+
+    DISALLOW_EVIL_CONSTRUCTORS(QueueDataSource);
+};
+
+// Registers a listener with |source|, shares kNumBuffers buffers of
+// kBufferSize bytes with it and then offers every buffer for filling.
+QueueDataSource::QueueDataSource(const sp<IStreamSource> &source)
+    : mSource(source),
+      mPosition(0),
+      mEOS(false) {
+    static const size_t kBufferSize = 8192;
+
+    mListener = new QueueListener(this);
+    mSource->setListener(mListener);
+
+    mDealer = new MemoryDealer(kNumBuffers * kBufferSize);
+    for (size_t n = 0; n < kNumBuffers; ++n) {
+        sp<IMemory> mem = mDealer->allocate(kBufferSize);
+        CHECK(mem != NULL);
+        mBuffers.push(mem);
+    }
+    mSource->setBuffers(mBuffers);
+    for (size_t slot = 0; slot < kNumBuffers; ++slot) {
+        mSource->onBufferAvailable(slot);
+    }
+}
+
+QueueDataSource::~QueueDataSource() {
+    {   // Wait under mLock until every buffer came back or EOS was signalled.
+        Mutex::Autolock autoLock(mLock);
+        while (mFilledBuffers.size() < kNumBuffers && !mEOS) {
+            mCondition.wait(mLock);
+        }
+    }   // Drop mLock first: listener callbacks take its lock, then ours.
+    mListener->clearOwner();
+}
+
+status_t QueueDataSource::initCheck() const {
+    return OK;  // unconditional: nothing in this class records a setup error
+}
+
+// Sequential read: returns -EPIPE unless |offset| is the current position.
+// Blocks until |size| bytes were copied or EOS; at EOS returns the bytes
+// copied so far, or ERROR_END_OF_STREAM if there were none.
+ssize_t QueueDataSource::readAt(off64_t offset, void *data, size_t size) {
+    // mPosition is updated under mLock below, so it is also checked under it.
+    Mutex::Autolock autoLock(mLock);
+
+    if (offset != mPosition) {
+        return -EPIPE;
+    }
+
+    size_t sizeDone = 0;
+    while (sizeDone < size) {
+        while (mFilledBuffers.empty() && !mEOS) {
+            mCondition.wait(mLock);
+        }
+        if (mFilledBuffers.empty()) {
+            // EOS with nothing left queued.
+            if (sizeDone > 0) {
+                mPosition += sizeDone;
+                return sizeDone;
+            }
+            return ERROR_END_OF_STREAM;
+        }
+
+        BufferInfo &info = *mFilledBuffers.begin();
+        size_t copy = size - sizeDone;
+        if (copy > info.mSize) {
+            copy = info.mSize;
+        }
+
+        memcpy((uint8_t *)data + sizeDone,
+               (const uint8_t *)mBuffers.itemAt(info.mIndex)->pointer()
+                    + info.mOffset,
+               copy);
+        info.mSize -= copy;
+        info.mOffset += copy;
+        sizeDone += copy;
+
+        if (info.mSize == 0) {
+            // Fully drained: tell the source the buffer is available again.
+            mSource->onBufferAvailable(info.mIndex);
+            mFilledBuffers.erase(mFilledBuffers.begin());
+        }
+    }
+    mPosition += sizeDone;
+    return sizeDone;
+}
+
+// Called via the listener when the source has filled buffer |index| with
+// |size| bytes; appends it to the filled queue and wakes a waiting thread.
+void QueueDataSource::queueBuffer(size_t index, size_t size) {
+    Mutex::Autolock autoLock(mLock);
+
+    // Both values arrive through the listener; abort on out-of-range ones.
+    CHECK_LT(index, mBuffers.size());
+    CHECK_LE(size, mBuffers.itemAt(index)->size());
+
+    // Aggregate order matches the struct: mIndex, mOffset, mSize.
+    BufferInfo info = { index, 0, size };
+    mFilledBuffers.push_back(info);
+    mCondition.signal();
+}
+
+void QueueDataSource::queueCommand(IStreamListener::Command cmd) {
+    Mutex::Autolock autoLock(mLock);
+    if (cmd != IStreamListener::EOS) {
+        return;  // EOS is the only command acted upon here
+    }
+    mEOS = true;
+    mCondition.signal();  // wake a thread blocked on mCondition
+}
+
+void QueueListener::clearOwner() {  // detach: later callbacks are dropped
+    Mutex::Autolock autoLock(mLock);
+    mOwner = NULL;
+}
+
+// Forwards a filled buffer to the owner unless it has been detached.
+void QueueListener::queueBuffer(size_t index, size_t size) {
+    Mutex::Autolock autoLock(mLock);
+    if (mOwner != NULL) {
+        mOwner->queueBuffer(index, size);
+    }
+}
+
+// Forwards a stream command to the owner unless it has been detached.
+void QueueListener::queueCommand(Command cmd) {
+    Mutex::Autolock autoLock(mLock);
+    if (mOwner != NULL) {
+        mOwner->queueCommand(cmd);
+    }
+}
+
+////////////////////////////////////////////////////////////////////////////////
+
 AwesomePlayer::AwesomePlayer()
     : mQueueStarted(false),
       mTimeSource(NULL),
@@ -164,7 +362,7 @@
       mExtractorFlags(0),
       mVideoBuffer(NULL),
       mDecryptHandle(NULL) {
-    CHECK_EQ(mClient.connect(), OK);
+    CHECK_EQ(mClient.connect(), (status_t)OK);
 
     DataSource::RegisterDefaultSniffers();
 
@@ -264,6 +462,26 @@
     return setDataSource_l(dataSource);
 }
 
+// Plays from |source| by exposing it as a QueueDataSource (see #else below).
+status_t AwesomePlayer::setDataSource(const sp<IStreamSource> &source) {
+    Mutex::Autolock autoLock(mLock);
+    reset_l();
+
+    sp<DataSource> dataSource = new QueueDataSource(source);
+    // The #if 0 branch (forcing an MPEG2-TS extractor) is compiled out.
+#if 0
+    sp<MediaExtractor> extractor =
+        MediaExtractor::Create(dataSource, MEDIA_MIMETYPE_CONTAINER_MPEG2TS);
+
+    return setDataSource_l(extractor);
+#else
+    sp<NuCachedSource2> cached = new NuCachedSource2(dataSource);
+    dataSource = cached;
+
+    return setDataSource_l(dataSource);
+#endif
+}
+
 status_t AwesomePlayer::setDataSource_l(
         const sp<DataSource> &dataSource) {
     sp<MediaExtractor> extractor = MediaExtractor::Create(dataSource);
@@ -619,7 +837,8 @@
         IPCThreadState::self()->flushCommands();
     }
 
-    CHECK_EQ(OK, initVideoDecoder(OMXCodec::kIgnoreCodecSpecificData));
+    CHECK_EQ((status_t)OK,
+             initVideoDecoder(OMXCodec::kIgnoreCodecSpecificData));
 }
 
 void AwesomePlayer::onStreamDone() {
@@ -1171,7 +1390,7 @@
             options.clearSeekTo();
 
             if (err != OK) {
-                CHECK_EQ(mVideoBuffer, NULL);
+                CHECK(mVideoBuffer == NULL);
 
                 if (err == INFO_FORMAT_CHANGED) {
                     LOGV("VideoSource signalled format change.");
diff --git a/media/libstagefright/include/AwesomePlayer.h b/media/libstagefright/include/AwesomePlayer.h
index e33f467..46f4a35 100644
--- a/media/libstagefright/include/AwesomePlayer.h
+++ b/media/libstagefright/include/AwesomePlayer.h
@@ -67,6 +67,8 @@
 
     status_t setDataSource(int fd, int64_t offset, int64_t length);
 
+    status_t setDataSource(const sp<IStreamSource> &source);
+
     void reset();
 
     status_t prepare();