Re-implement HIDL stream read and write using FMQ

Result: no hwbinder calls during a read / write session.

Test: make, perform Loopback RTT, check traces
Bug: 30222631

Change-Id: I4a8792525ec374111302cfd5c0a2e41f9f4cc418
diff --git a/media/libaudiohal/Android.mk b/media/libaudiohal/Android.mk
index e82766f..893c626 100644
--- a/media/libaudiohal/Android.mk
+++ b/media/libaudiohal/Android.mk
@@ -3,11 +3,13 @@
 include $(CLEAR_VARS)
 
 LOCAL_SHARED_LIBRARIES := \
+    libbase \
     libcutils \
+    libeffects \
+    libfmq \
     libhardware \
     liblog \
-    libutils \
-    libeffects
+    libutils
 
 ifeq ($(ENABLE_TREBLE), true)
 
diff --git a/media/libaudiohal/StreamHalHidl.cpp b/media/libaudiohal/StreamHalHidl.cpp
index 2c6e564..7f8d663 100644
--- a/media/libaudiohal/StreamHalHidl.cpp
+++ b/media/libaudiohal/StreamHalHidl.cpp
@@ -14,6 +14,8 @@
  * limitations under the License.
  */
 
+#include <time.h>
+
 #define LOG_TAG "StreamHalHidl"
 //#define LOG_NDEBUG 0
 
@@ -28,18 +30,23 @@
 using ::android::hardware::audio::common::V2_0::AudioFormat;
 using ::android::hardware::audio::V2_0::AudioDrain;
 using ::android::hardware::audio::V2_0::IStreamOutCallback;
-using ::android::hardware::audio::V2_0::ParameterValue;
-using ::android::hardware::audio::V2_0::Result;
-using ::android::hardware::audio::V2_0::TimeSpec;
+using ::android::hardware::audio::V2_0::MessageQueueFlagBits;
 using ::android::hardware::audio::V2_0::MmapBufferInfo;
 using ::android::hardware::audio::V2_0::MmapPosition;
+using ::android::hardware::audio::V2_0::ParameterValue;
+using ::android::hardware::audio::V2_0::Result;
+using ::android::hardware::audio::V2_0::ThreadPriority;
+using ::android::hardware::audio::V2_0::TimeSpec;
+using ::android::hardware::MQDescriptorSync;
 using ::android::hardware::Return;
 using ::android::hardware::Void;
 
 namespace android {
 
 StreamHalHidl::StreamHalHidl(IStream *stream)
-        : ConversionHelperHidl("Stream"), mStream(stream) {
+        : ConversionHelperHidl("Stream"),
+          mHalThreadPriority(static_cast<int>(ThreadPriority::NORMAL)),
+          mStream(stream) {
 }
 
 StreamHalHidl::~StreamHalHidl() {
@@ -176,6 +183,11 @@
     return processReturn("getMmapPosition", ret, retval);
 }
 
+status_t StreamHalHidl::setHalThreadPriority(int priority) {
+    mHalThreadPriority = priority;  // only recorded here; passed to the HAL by prepareForWriting / prepareForReading
+    return OK;
+}
+
 namespace {
 
 /* Notes on callback ownership.
@@ -229,14 +241,21 @@
 }  // namespace
 
 StreamOutHalHidl::StreamOutHalHidl(const sp<IStreamOut>& stream)
-        : StreamHalHidl(stream.get()), mStream(stream) {
+        : StreamHalHidl(stream.get()), mStream(stream), mEfGroup(nullptr),
+          mGetPresentationPositionNotSupported(false), mPPosFromWriteObtained(0) {
 }
 
 StreamOutHalHidl::~StreamOutHalHidl() {
-    if (mCallback.unsafe_get() && mStream != 0) {
-        processReturn("clearCallback", mStream->clearCallback());
+    if (mStream != 0) {
+        if (mCallback.unsafe_get()) {
+            processReturn("clearCallback", mStream->clearCallback());
+        }
+        processReturn("close", mStream->close());
     }
     mCallback.clear();
+    if (mEfGroup) {
+        EventFlag::deleteEventFlag(&mEfGroup);
+    }
 }
 
 status_t StreamOutHalHidl::getFrameSize(size_t *size) {
@@ -256,16 +275,89 @@
 
 status_t StreamOutHalHidl::write(const void *buffer, size_t bytes, size_t *written) {
     if (mStream == 0) return NO_INIT;
-    hidl_vec<uint8_t> hidlData;
-    hidlData.setToExternal(static_cast<uint8_t*>(const_cast<void*>(buffer)), bytes);
+    *written = 0;  // default result for every early-return and error path below
+
+    if (bytes == 0 && !mDataMQ) {
+        // Can't determine the size for the MQ buffer. Wait for a non-empty write request.
+        ALOGW_IF(mCallback.unsafe_get(), "First call to async write with 0 bytes");
+        return OK;
+    }
+
+    status_t status;
+    if (!mDataMQ && (status = prepareForWriting(bytes)) != OK) {  // queues are created lazily
+        return status;
+    }
+
+    const size_t availBytes = mDataMQ->availableToWrite();
+    if (bytes > availBytes) { bytes = availBytes; }  // clamp the request to the free queue space
+    if (!mDataMQ->write(static_cast<const uint8_t*>(buffer), bytes)) {
+        ALOGW("data message queue write failed");  // logged only; the wake / wait below still runs
+    }
+    mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
+
+    // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
+    uint32_t efState = 0;
+retry:
+    status_t ret = mEfGroup->wait(
+            static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL), &efState, NS_PER_SEC);
+    if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)) {
+        WriteStatus writeStatus = { Result::NOT_INITIALIZED, 0, 0, { 0, 0 } };
+        mStatusMQ->read(&writeStatus);  // outcome of the write as delivered through the status queue
+        if (writeStatus.retval == Result::OK) {
+            status = OK;
+            *written = writeStatus.written;
+            mPPosFromWriteFrames = writeStatus.frames;  // cached for getPresentationPosition()
+            mPPosFromWriteTS.tv_sec = writeStatus.timeStamp.tvSec;
+            mPPosFromWriteTS.tv_nsec = writeStatus.timeStamp.tvNSec;
+            struct timespec timeNow;
+            clock_gettime(CLOCK_MONOTONIC, &timeNow);
+            mPPosFromWriteObtained = timeNow.tv_sec * 1000000 + timeNow.tv_nsec / 1000;  // usec; NOTE(review): may overflow if time_t is 32-bit - confirm
+        } else {
+            status = processReturn("write", writeStatus.retval);
+        }
+        return status;
+    }
+    if (ret == -EAGAIN) {
+        // This normally retries no more than once.
+        goto retry;
+    }
+    return ret;  // wait() ended without NOT_FULL being set; its status is passed to the caller
+}
+
+status_t StreamOutHalHidl::prepareForWriting(size_t bufferSize) {
+    std::unique_ptr<DataMQ> tempDataMQ;  // held locally; moved into the members only after validation
+    std::unique_ptr<StatusMQ> tempStatusMQ;
     Result retval;
-    Return<void> ret = mStream->write(
-            hidlData,
-            [&](Result r, uint64_t w) {
+    Return<void> ret = mStream->prepareForWriting(
+            1, bufferSize, ThreadPriority(mHalThreadPriority),  // NOTE(review): meaning of the leading 1 is defined by the HAL interface - confirm
+            [&](Result r,
+                    const MQDescriptorSync<uint8_t>& dataMQ,
+                    const MQDescriptorSync<WriteStatus>& statusMQ) {
                 retval = r;
-                *written = w;
+                if (retval == Result::OK) {
+                    tempDataMQ.reset(new DataMQ(dataMQ));
+                    tempStatusMQ.reset(new StatusMQ(statusMQ));
+                    if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {  // event flag is built on the data queue's flag word
+                        EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
+                    }
+                }
             });
-    return processReturn("write", ret, retval);
+    if (!ret.getStatus().isOk() || retval != Result::OK) {  // transport failure or HAL-reported error
+        return processReturn("prepareForWriting", ret, retval);
+    }
+    if (!tempDataMQ || !tempDataMQ->isValid() || !tempStatusMQ || !tempStatusMQ->isValid()
+        || !mEfGroup) {
+        ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for writing");
+        ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for writing is invalid");
+        ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for writing");
+        ALOGE_IF(tempStatusMQ && !tempStatusMQ->isValid(),
+                "Status message queue for writing is invalid");
+        ALOGE_IF(!mEfGroup, "Event flag creation for writing failed");
+        return NO_INIT;  // NOTE(review): mEfGroup may already be set here and is left as-is - confirm a retry cannot leak it
+    }
+    mDataMQ = std::move(tempDataMQ);  // publish the queues only once everything checked out
+    mStatusMQ = std::move(tempStatusMQ);
+    return OK;
 }
 
 status_t StreamOutHalHidl::getRenderPosition(uint32_t *dspFrames) {
@@ -342,6 +434,18 @@
 
 status_t StreamOutHalHidl::getPresentationPosition(uint64_t *frames, struct timespec *timestamp) {
     if (mStream == 0) return NO_INIT;
+    if (mGetPresentationPositionNotSupported) return INVALID_OPERATION;
+    struct timespec timeNow;
+    clock_gettime(CLOCK_MONOTONIC, &timeNow);
+    uint64_t timeStampNow = timeNow.tv_sec * 1000000 + timeNow.tv_nsec / 1000;
+    if (timeStampNow - mPPosFromWriteObtained <= 1000) {
+        // No more than 1 ms passed since the last write, use cached result to avoid binder calls.
+        *frames = mPPosFromWriteFrames;
+        timestamp->tv_sec = mPPosFromWriteTS.tv_sec;
+        timestamp->tv_nsec = mPPosFromWriteTS.tv_nsec;
+        return OK;
+    }
+
     Result retval;
     Return<void> ret = mStream->getPresentationPosition(
             [&](Result r, uint64_t hidlFrames, const TimeSpec& hidlTimeStamp) {
@@ -352,6 +456,9 @@
                     timestamp->tv_nsec = hidlTimeStamp.tvNSec;
                 }
             });
+    if (ret.getStatus().isOk() && retval == Result::NOT_SUPPORTED) {
+        mGetPresentationPositionNotSupported = true;
+    }
     return processReturn("getPresentationPosition", ret, retval);
 }
 
@@ -378,10 +485,16 @@
 
 
 StreamInHalHidl::StreamInHalHidl(const sp<IStreamIn>& stream)
-        : StreamHalHidl(stream.get()), mStream(stream) {
+        : StreamHalHidl(stream.get()), mStream(stream), mEfGroup(nullptr) {
 }
 
 StreamInHalHidl::~StreamInHalHidl() {
+    if (mStream != 0) {
+        processReturn("close", mStream->close());
+    }
+    if (mEfGroup) {
+        EventFlag::deleteEventFlag(&mEfGroup);
+    }
 }
 
 status_t StreamInHalHidl::getFrameSize(size_t *size) {
@@ -396,17 +509,83 @@
 
 status_t StreamInHalHidl::read(void *buffer, size_t bytes, size_t *read) {
     if (mStream == 0) return NO_INIT;
+    *read = 0;  // default result for every early-return and error path below
+
+    if (bytes == 0 && !mDataMQ) {
+        // Can't determine the size for the MQ buffer. Wait for a non-empty read request.
+        return OK;
+    }
+
+    status_t status = OK;  // must start as OK: the success path below returns it without assigning it
+    if (!mDataMQ) {
+        if ((status = prepareForReading(bytes)) != OK) return status;
+        // Trigger the first read.
+        mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
+    }
+
+    // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
+    uint32_t efState = 0;
+retry:
+    status_t ret = mEfGroup->wait(
+            static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY), &efState, NS_PER_SEC);
+    if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)) {
+        ReadStatus readStatus = { Result::NOT_INITIALIZED, 0 };
+        const size_t availToRead = mDataMQ->availableToRead();
+        if (bytes > availToRead) { bytes = availToRead; }  // never read more than the queue currently holds
+        mDataMQ->read(static_cast<uint8_t*>(buffer), bytes);
+        mStatusMQ->read(&readStatus);  // outcome of the read as delivered through the status queue
+        mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));  // same signal as the initial trigger above
+        if (readStatus.retval == Result::OK) {
+            ALOGW_IF(availToRead != readStatus.read,
+                    "HAL read report inconsistent: mq = %d, status = %d",
+                    (int32_t)availToRead, (int32_t)readStatus.read);
+            *read = readStatus.read;
+        } else {
+            status = processReturn("read", readStatus.retval);
+        }
+        return status;
+    }
+    if (ret == -EAGAIN) {
+        // This normally retries no more than once.
+        goto retry;
+    }
+    return ret;  // wait() ended without NOT_EMPTY being set; its status is passed to the caller
+}
+
+status_t StreamInHalHidl::prepareForReading(size_t bufferSize) {
+    std::unique_ptr<DataMQ> tempDataMQ;  // held locally; moved into the members only after validation
+    std::unique_ptr<StatusMQ> tempStatusMQ;
     Result retval;
-    Return<void> ret = mStream->read(
-            bytes,
-            [&](Result r, const hidl_vec<uint8_t>& hidlData) {
+    Return<void> ret = mStream->prepareForReading(
+            1, bufferSize, ThreadPriority(mHalThreadPriority),  // NOTE(review): meaning of the leading 1 is defined by the HAL interface - confirm
+            [&](Result r,
+                    const MQDescriptorSync<uint8_t>& dataMQ,
+                    const MQDescriptorSync<ReadStatus>& statusMQ) {
                 retval = r;
-                *read = std::min(hidlData.size(), bytes);
                 if (retval == Result::OK) {
-                    memcpy(buffer, &hidlData[0], *read);
+                    tempDataMQ.reset(new DataMQ(dataMQ));
+                    tempStatusMQ.reset(new StatusMQ(statusMQ));
+                    if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {  // event flag is built on the data queue's flag word
+                        EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
+                    }
                 }
             });
-    return processReturn("read", ret, retval);
+    if (!ret.getStatus().isOk() || retval != Result::OK) {  // transport failure or HAL-reported error
+        return processReturn("prepareForReading", ret, retval);
+    }
+    if (!tempDataMQ || !tempDataMQ->isValid() || !tempStatusMQ || !tempStatusMQ->isValid()
+        || !mEfGroup) {
+        ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for reading");
+        ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for reading is invalid");
+        ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for reading");
+        ALOGE_IF(tempStatusMQ && !tempStatusMQ->isValid(),
+                "Status message queue for reading is invalid");
+        ALOGE_IF(!mEfGroup, "Event flag creation for reading failed");
+        return NO_INIT;  // NOTE(review): mEfGroup may already be set here and is left as-is - confirm a retry cannot leak it
+    }
+    mDataMQ = std::move(tempDataMQ);  // publish the queues only once everything checked out
+    mStatusMQ = std::move(tempStatusMQ);
+    return OK;
 }
 
 status_t StreamInHalHidl::getInputFramesLost(uint32_t *framesLost) {
diff --git a/media/libaudiohal/StreamHalHidl.h b/media/libaudiohal/StreamHalHidl.h
index 377acb4..0093957 100644
--- a/media/libaudiohal/StreamHalHidl.h
+++ b/media/libaudiohal/StreamHalHidl.h
@@ -20,6 +20,8 @@
 #include <android/hardware/audio/2.0/IStream.h>
 #include <android/hardware/audio/2.0/IStreamIn.h>
 #include <android/hardware/audio/2.0/IStreamOut.h>
+#include <fmq/EventFlag.h>
+#include <fmq/MessageQueue.h>
 #include <media/audiohal/StreamHalInterface.h>
 
 #include "ConversionHelperHidl.h"
@@ -27,7 +29,11 @@
 using ::android::hardware::audio::V2_0::IStream;
 using ::android::hardware::audio::V2_0::IStreamIn;
 using ::android::hardware::audio::V2_0::IStreamOut;
+using ::android::hardware::EventFlag;
+using ::android::hardware::MessageQueue;
 using ::android::hardware::Return;
+using ReadStatus = ::android::hardware::audio::V2_0::IStreamIn::ReadStatus;
+using WriteStatus = ::android::hardware::audio::V2_0::IStreamOut::WriteStatus;
 
 namespace android {
 
@@ -80,6 +86,10 @@
     // Get current read/write position in the mmap buffer
     virtual status_t getMmapPosition(struct audio_mmap_position *position);
 
+    // Set the priority of the thread that interacts with the HAL
+    // (must match the priority of the audioflinger's thread that calls 'read' / 'write')
+    virtual status_t setHalThreadPriority(int priority);
+
   protected:
     // Subclasses can not be constructed directly by clients.
     explicit StreamHalHidl(IStream *stream);
@@ -87,6 +97,8 @@
     // The destructor automatically closes the stream.
     virtual ~StreamHalHidl();
 
+    int mHalThreadPriority;
+
   private:
     IStream *mStream;
 };
@@ -143,14 +155,25 @@
 
   private:
     friend class DeviceHalHidl;
+    typedef MessageQueue<uint8_t, hardware::kSynchronizedReadWrite> DataMQ;
+    typedef MessageQueue<WriteStatus, hardware::kSynchronizedReadWrite> StatusMQ;
 
     wp<StreamOutHalInterfaceCallback> mCallback;
     sp<IStreamOut> mStream;
+    std::unique_ptr<DataMQ> mDataMQ;
+    std::unique_ptr<StatusMQ> mStatusMQ;
+    EventFlag* mEfGroup;
+    bool mGetPresentationPositionNotSupported;
+    uint64_t mPPosFromWriteObtained;
+    uint64_t mPPosFromWriteFrames;
+    struct timespec mPPosFromWriteTS;
 
     // Can not be constructed directly by clients.
     StreamOutHalHidl(const sp<IStreamOut>& stream);
 
     virtual ~StreamOutHalHidl();
+
+    status_t prepareForWriting(size_t bufferSize);
 };
 
 class StreamInHalHidl : public StreamInHalInterface, public StreamHalHidl {
@@ -173,13 +196,20 @@
 
   private:
     friend class DeviceHalHidl;
+    typedef MessageQueue<uint8_t, hardware::kSynchronizedReadWrite> DataMQ;
+    typedef MessageQueue<ReadStatus, hardware::kSynchronizedReadWrite> StatusMQ;
 
     sp<IStreamIn> mStream;
+    std::unique_ptr<DataMQ> mDataMQ;
+    std::unique_ptr<StatusMQ> mStatusMQ;
+    EventFlag* mEfGroup;
 
     // Can not be constructed directly by clients.
     StreamInHalHidl(const sp<IStreamIn>& stream);
 
     virtual ~StreamInHalHidl();
+
+    status_t prepareForReading(size_t bufferSize);
 };
 
 } // namespace android
diff --git a/media/libaudiohal/StreamHalLocal.cpp b/media/libaudiohal/StreamHalLocal.cpp
index 61c8898..b25e518 100644
--- a/media/libaudiohal/StreamHalLocal.cpp
+++ b/media/libaudiohal/StreamHalLocal.cpp
@@ -96,6 +96,12 @@
     return mStream->dump(mStream, fd);
 }
 
+status_t StreamHalLocal::setHalThreadPriority(int) {
+    // Don't need to do anything as local hal is executed by audioflinger directly
+    // on the same thread.
+    return OK;
+}
+
 StreamOutHalLocal::StreamOutHalLocal(audio_stream_out_t *stream, sp<DeviceHalLocal> device)
         : StreamHalLocal(&stream->common, device), mStream(stream) {
 }
diff --git a/media/libaudiohal/StreamHalLocal.h b/media/libaudiohal/StreamHalLocal.h
index fbb000a..8c96c1f 100644
--- a/media/libaudiohal/StreamHalLocal.h
+++ b/media/libaudiohal/StreamHalLocal.h
@@ -70,6 +70,10 @@
     // Get current read/write position in the mmap buffer
     virtual status_t getMmapPosition(struct audio_mmap_position *position) = 0;
 
+    // Set the priority of the thread that interacts with the HAL
+    // (must match the priority of the audioflinger's thread that calls 'read' / 'write')
+    virtual status_t setHalThreadPriority(int priority);
+
   protected:
     // Subclasses can not be constructed directly by clients.
     StreamHalLocal(audio_stream_t *stream, sp<DeviceHalLocal> device);