NBAIO: re-implement NBAIO Pipe and MonoPipe using fifo

Also removed const from NBAIO_Sink::availableToWrite() because
at least one implementation (MonoPipe) can no longer be const.

Test: normal mixer, tee sink, and remote submix still work
Change-Id: I8461177efdf53bba8295b147e97835b018804903
diff --git a/media/libnbaio/Android.bp b/media/libnbaio/Android.bp
index 615b541..fd7af4f 100644
--- a/media/libnbaio/Android.bp
+++ b/media/libnbaio/Android.bp
@@ -2,16 +2,15 @@
     name: "libnbaio",
     srcs: [
         "AudioBufferProviderSource.cpp",
-        "AudioStreamOutSink.cpp",
         "AudioStreamInSource.cpp",
-        "NBAIO.cpp",
+        "AudioStreamOutSink.cpp",
         "MonoPipe.cpp",
         "MonoPipeReader.cpp",
+        "NBAIO.cpp",
+        "NBLog.cpp",
         "Pipe.cpp",
         "PipeReader.cpp",
         "SourceAudioBufferProvider.cpp",
-
-        "NBLog.cpp",
     ],
 
     // libsndfile license is incompatible; uncomment to use for local debug only
@@ -33,4 +32,6 @@
         "-Werror",
         "-Wall",
     ],
+
+    include_dirs: ["system/media/audio_utils/include"],
 }
diff --git a/media/libnbaio/MonoPipe.cpp b/media/libnbaio/MonoPipe.cpp
index 8d1cb0f..3c5df1a 100644
--- a/media/libnbaio/MonoPipe.cpp
+++ b/media/libnbaio/MonoPipe.cpp
@@ -19,7 +19,6 @@
 #define LOG_TAG "MonoPipe"
 //#define LOG_NDEBUG 0
 
-#include <cutils/atomic.h>
 #include <cutils/compiler.h>
 #include <utils/Log.h>
 #include <utils/Trace.h>
@@ -32,11 +31,11 @@
 
 MonoPipe::MonoPipe(size_t reqFrames, const NBAIO_Format& format, bool writeCanBlock) :
         NBAIO_Sink(format),
-        mReqFrames(reqFrames),
+        // TODO fifo now supports non-power-of-2 buffer sizes, so could remove the roundup
         mMaxFrames(roundup(reqFrames)),
         mBuffer(malloc(mMaxFrames * Format_frameSize(format))),
-        mFront(0),
-        mRear(0),
+        mFifo(mMaxFrames, Format_frameSize(format), mBuffer, true /*throttlesWriter*/),
+        mFifoWriter(mFifo),
         mWriteTsValid(false),
         // mWriteTs
         mSetpoint((reqFrames * 11) / 16),
@@ -53,14 +52,14 @@
     free(mBuffer);
 }
 
-ssize_t MonoPipe::availableToWrite() const
+ssize_t MonoPipe::availableToWrite()
 {
     if (CC_UNLIKELY(!mNegotiated)) {
         return NEGOTIATE;
     }
-    // uses mMaxFrames not mReqFrames, so allows "over-filling" the pipe beyond requested limit
-    ssize_t ret = mMaxFrames - (mRear - android_atomic_acquire_load(&mFront));
-    ALOG_ASSERT((0 <= ret) && (ret <= mMaxFrames));
+    // uses mMaxFrames not reqFrames, so allows "over-filling" the pipe beyond requested limit
+    ssize_t ret = mFifoWriter.available();
+    ALOG_ASSERT(ret <= mMaxFrames);
     return ret;
 }
 
@@ -71,38 +70,33 @@
     }
     size_t totalFramesWritten = 0;
     while (count > 0) {
-        // can't return a negative value, as we already checked for !mNegotiated
-        size_t avail = availableToWrite();
-        size_t written = avail;
-        if (CC_LIKELY(written > count)) {
-            written = count;
-        }
-        size_t rear = mRear & (mMaxFrames - 1);
-        size_t part1 = mMaxFrames - rear;
-        if (part1 > written) {
-            part1 = written;
-        }
-        if (CC_LIKELY(part1 > 0)) {
-            memcpy((char *) mBuffer + (rear * mFrameSize), buffer, part1 * mFrameSize);
-            if (CC_UNLIKELY(rear + part1 == mMaxFrames)) {
-                size_t part2 = written - part1;
-                if (CC_LIKELY(part2 > 0)) {
-                    memcpy(mBuffer, (char *) buffer + (part1 * mFrameSize), part2 * mFrameSize);
-                }
+        ssize_t actual = mFifoWriter.write(buffer, count);
+        ALOG_ASSERT(actual <= count);
+        if (actual < 0) {
+            if (totalFramesWritten == 0) {
+                return actual;
             }
-            android_atomic_release_store(written + mRear, &mRear);
-            totalFramesWritten += written;
+            break;
         }
+        size_t written = (size_t) actual;
+        totalFramesWritten += written;
         if (!mWriteCanBlock || mIsShutdown) {
             break;
         }
         count -= written;
         buffer = (char *) buffer + (written * mFrameSize);
+        // TODO Replace this whole section by audio_utils_fifo's setpoint feature.
         // Simulate blocking I/O by sleeping at different rates, depending on a throttle.
         // The throttle tries to keep the mean pipe depth near the setpoint, with a slight jitter.
         uint32_t ns;
         if (written > 0) {
-            size_t filled = (mMaxFrames - avail) + written;
+            ssize_t avail = mFifoWriter.available();
+            ALOG_ASSERT(avail <= mMaxFrames);
+            if (avail < 0) {
+                // don't return avail as status, because totalFramesWritten > 0
+                break;
+            }
+            size_t filled = mMaxFrames - (size_t) avail;
             // FIXME cache these values to avoid re-computation
             if (filled <= mSetpoint / 2) {
                 // pipe is (nearly) empty, fill quickly
diff --git a/media/libnbaio/MonoPipeReader.cpp b/media/libnbaio/MonoPipeReader.cpp
index 01dc524..a9b4d18 100644
--- a/media/libnbaio/MonoPipeReader.cpp
+++ b/media/libnbaio/MonoPipeReader.cpp
@@ -25,7 +25,7 @@
 
 MonoPipeReader::MonoPipeReader(MonoPipe* pipe) :
         NBAIO_Source(pipe->mFormat),
-        mPipe(pipe)
+        mPipe(pipe), mFifoReader(mPipe->mFifo, true /*throttlesWriter*/)
 {
 }
 
@@ -38,38 +38,21 @@
     if (CC_UNLIKELY(!mNegotiated)) {
         return NEGOTIATE;
     }
-    ssize_t ret = android_atomic_acquire_load(&mPipe->mRear) - mPipe->mFront;
-    ALOG_ASSERT((0 <= ret) && ((size_t) ret <= mPipe->mMaxFrames));
+    ssize_t ret = mFifoReader.available();
+    ALOG_ASSERT(ret <= mPipe->mMaxFrames);
     return ret;
 }
 
 ssize_t MonoPipeReader::read(void *buffer, size_t count)
 {
     // count == 0 is unlikely and not worth checking for explicitly; will be handled automatically
-    ssize_t red = availableToRead();
-    if (CC_UNLIKELY(red <= 0)) {
-        return red;
+    ssize_t actual = mFifoReader.read(buffer, count);
+    ALOG_ASSERT(actual <= count);
+    if (CC_UNLIKELY(actual <= 0)) {
+        return actual;
     }
-    if (CC_LIKELY((size_t) red > count)) {
-        red = count;
-    }
-    size_t front = mPipe->mFront & (mPipe->mMaxFrames - 1);
-    size_t part1 = mPipe->mMaxFrames - front;
-    if (part1 > (size_t) red) {
-        part1 = red;
-    }
-    if (CC_LIKELY(part1 > 0)) {
-        memcpy(buffer, (char *) mPipe->mBuffer + (front * mFrameSize), part1 * mFrameSize);
-        if (CC_UNLIKELY(front + part1 == mPipe->mMaxFrames)) {
-            size_t part2 = red - part1;
-            if (CC_LIKELY(part2 > 0)) {
-                memcpy((char *) buffer + (part1 * mFrameSize), mPipe->mBuffer, part2 * mFrameSize);
-            }
-        }
-        android_atomic_release_store(red + mPipe->mFront, &mPipe->mFront);
-        mFramesRead += red;
-    }
-    return red;
+    mFramesRead += (size_t) actual;
+    return actual;
 }
 
 void MonoPipeReader::onTimestamp(const ExtendedTimestamp &timestamp)
diff --git a/media/libnbaio/NBLog.cpp b/media/libnbaio/NBLog.cpp
index 4d14904..c728e3e 100644
--- a/media/libnbaio/NBLog.cpp
+++ b/media/libnbaio/NBLog.cpp
@@ -67,6 +67,7 @@
 /*static*/
 size_t NBLog::Timeline::sharedSize(size_t size)
 {
+    // TODO fifo now supports non-power-of-2 buffer sizes, so could remove the roundup
     return sizeof(Shared) + roundup(size);
 }
 
diff --git a/media/libnbaio/Pipe.cpp b/media/libnbaio/Pipe.cpp
index 13f211d..39df3f4 100644
--- a/media/libnbaio/Pipe.cpp
+++ b/media/libnbaio/Pipe.cpp
@@ -27,9 +27,11 @@
 
 Pipe::Pipe(size_t maxFrames, const NBAIO_Format& format, void *buffer) :
         NBAIO_Sink(format),
+        // TODO fifo now supports non-power-of-2 buffer sizes, so could remove the roundup
         mMaxFrames(roundup(maxFrames)),
         mBuffer(buffer == NULL ? malloc(mMaxFrames * Format_frameSize(format)) : buffer),
-        mRear(0),
+        mFifo(mMaxFrames, Format_frameSize(format), mBuffer, false /*throttlesWriter*/),
+        mFifoWriter(mFifo),
         mReaders(0),
         mFreeBufferInDestructor(buffer == NULL)
 {
@@ -49,25 +51,13 @@
     if (CC_UNLIKELY(!mNegotiated)) {
         return NEGOTIATE;
     }
-    // write() is not multi-thread safe w.r.t. itself, so no mutex or atomic op needed to read mRear
-    size_t rear = mRear & (mMaxFrames - 1);
-    size_t written = mMaxFrames - rear;
-    if (CC_LIKELY(written > count)) {
-        written = count;
+    ssize_t actual = mFifoWriter.write(buffer, count);
+    ALOG_ASSERT(actual <= count);
+    if (actual <= 0) {
+        return actual;
     }
-    memcpy((char *) mBuffer + (rear * mFrameSize), buffer, written * mFrameSize);
-    if (CC_UNLIKELY(rear + written == mMaxFrames)) {
-        if (CC_UNLIKELY((count -= written) > rear)) {
-            count = rear;
-        }
-        if (CC_LIKELY(count > 0)) {
-            memcpy(mBuffer, (char *) buffer + (written * mFrameSize), count * mFrameSize);
-            written += count;
-        }
-    }
-    android_atomic_release_store(written + mRear, &mRear);
-    mFramesWritten += written;
-    return written;
+    mFramesWritten += (size_t) actual;
+    return actual;
 }
 
 }   // namespace android
diff --git a/media/libnbaio/PipeReader.cpp b/media/libnbaio/PipeReader.cpp
index fdea68e..bd468a6 100644
--- a/media/libnbaio/PipeReader.cpp
+++ b/media/libnbaio/PipeReader.cpp
@@ -25,9 +25,7 @@
 
 PipeReader::PipeReader(Pipe& pipe) :
         NBAIO_Source(pipe.mFormat),
-        mPipe(pipe),
-        // any data already in the pipe is not visible to this PipeReader
-        mFront(android_atomic_acquire_load(&pipe.mRear)),
+        mPipe(pipe), mFifoReader(mPipe.mFifo, false /*throttlesWriter*/),
         mFramesOverrun(0),
         mOverruns(0)
 {
@@ -45,71 +43,54 @@
     ALOG_ASSERT(readers > 0);
 }
 
-__attribute__((no_sanitize("integer")))
 ssize_t PipeReader::availableToRead()
 {
     if (CC_UNLIKELY(!mNegotiated)) {
         return NEGOTIATE;
     }
-    int32_t rear = android_atomic_acquire_load(&mPipe.mRear);
-    // read() is not multi-thread safe w.r.t. itself, so no mutex or atomic op needed to read mFront
-    size_t avail = rear - mFront;
-    if (CC_UNLIKELY(avail > mPipe.mMaxFrames)) {
-        // Discard all data in pipe to avoid another overrun immediately
-        mFront = rear;
-        mFramesOverrun += avail;
+    size_t lost;
+    ssize_t avail = mFifoReader.available(&lost);
+    if (avail == -EOVERFLOW || lost > 0) {
+        mFramesOverrun += lost;
         ++mOverruns;
-        return OVERRUN;
+        avail = OVERRUN;
     }
     return avail;
 }
 
-__attribute__((no_sanitize("integer")))
 ssize_t PipeReader::read(void *buffer, size_t count)
 {
-    ssize_t avail = availableToRead();
-    if (CC_UNLIKELY(avail <= 0)) {
-        return avail;
+    size_t lost;
+    ssize_t actual = mFifoReader.read(buffer, count, NULL /*timeout*/, &lost);
+    ALOG_ASSERT(actual <= count);
+    if (actual == -EOVERFLOW || lost > 0) {
+        mFramesOverrun += lost;
+        ++mOverruns;
+        actual = OVERRUN;
     }
-    // An overrun can occur from here on and be silently ignored,
-    // but it will be caught at next read()
-    if (CC_LIKELY(count > (size_t) avail)) {
-        count = avail;
+    if (actual <= 0) {
+        return actual;
     }
-    size_t front = mFront & (mPipe.mMaxFrames - 1);
-    size_t red = mPipe.mMaxFrames - front;
-    if (CC_LIKELY(red > count)) {
-        red = count;
-    }
-    // In particular, an overrun during the memcpy will result in reading corrupt data
-    memcpy(buffer, (char *) mPipe.mBuffer + (front * mFrameSize), red * mFrameSize);
-    // We could re-read the rear pointer here to detect the corruption, but why bother?
-    if (CC_UNLIKELY(front + red == mPipe.mMaxFrames)) {
-        if (CC_UNLIKELY((count -= red) > front)) {
-            count = front;
-        }
-        if (CC_LIKELY(count > 0)) {
-            memcpy((char *) buffer + (red * mFrameSize), mPipe.mBuffer, count * mFrameSize);
-            red += count;
-        }
-    }
-    mFront += red;
-    mFramesRead += red;
-    return red;
+    mFramesRead += (size_t) actual;
+    return actual;
 }
 
-__attribute__((no_sanitize("integer")))
 ssize_t PipeReader::flush()
 {
     if (CC_UNLIKELY(!mNegotiated)) {
         return NEGOTIATE;
     }
-    const int32_t rear = android_atomic_acquire_load(&mPipe.mRear);
-    const size_t flushed = rear - mFront;
-    // We don't check if flushed > mPipe.mMaxFrames (an overrun occurred) as the
-    // distinction is unimportant; all data is dropped.
-    mFront = rear;
-    mFramesRead += flushed;  // we consider flushed frames as read.
+    size_t lost;
+    ssize_t flushed = mFifoReader.flush(&lost);
+    if (flushed == -EOVERFLOW || lost > 0) {
+        mFramesOverrun += lost;
+        ++mOverruns;
+        flushed = OVERRUN;
+    }
+    if (flushed <= 0) {
+        return flushed;
+    }
+    mFramesRead += (size_t) flushed;  // we consider flushed frames as read, but not lost frames
     return flushed;
 }