Re-implement HIDL stream read and write using FMQ
Result: no hwbinder calls due read / write session.
Test: make, perform Loopback RTT, check traces
Bug: 30222631
Change-Id: I4a8792525ec374111302cfd5c0a2e41f9f4cc418
diff --git a/media/libaudiohal/StreamHalHidl.cpp b/media/libaudiohal/StreamHalHidl.cpp
index 2c6e564..7f8d663 100644
--- a/media/libaudiohal/StreamHalHidl.cpp
+++ b/media/libaudiohal/StreamHalHidl.cpp
@@ -14,6 +14,8 @@
* limitations under the License.
*/
+#include <time.h>
+
#define LOG_TAG "StreamHalHidl"
//#define LOG_NDEBUG 0
@@ -28,18 +30,23 @@
using ::android::hardware::audio::common::V2_0::AudioFormat;
using ::android::hardware::audio::V2_0::AudioDrain;
using ::android::hardware::audio::V2_0::IStreamOutCallback;
-using ::android::hardware::audio::V2_0::ParameterValue;
-using ::android::hardware::audio::V2_0::Result;
-using ::android::hardware::audio::V2_0::TimeSpec;
+using ::android::hardware::audio::V2_0::MessageQueueFlagBits;
using ::android::hardware::audio::V2_0::MmapBufferInfo;
using ::android::hardware::audio::V2_0::MmapPosition;
+using ::android::hardware::audio::V2_0::ParameterValue;
+using ::android::hardware::audio::V2_0::Result;
+using ::android::hardware::audio::V2_0::ThreadPriority;
+using ::android::hardware::audio::V2_0::TimeSpec;
+using ::android::hardware::MQDescriptorSync;
using ::android::hardware::Return;
using ::android::hardware::Void;
namespace android {
StreamHalHidl::StreamHalHidl(IStream *stream)
- : ConversionHelperHidl("Stream"), mStream(stream) {
+ : ConversionHelperHidl("Stream"),
+ mHalThreadPriority(static_cast<int>(ThreadPriority::NORMAL)),
+ mStream(stream) {
}
StreamHalHidl::~StreamHalHidl() {
@@ -176,6 +183,11 @@
return processReturn("getMmapPosition", ret, retval);
}
+status_t StreamHalHidl::setHalThreadPriority(int priority) {
+ mHalThreadPriority = priority;
+ return OK;
+}
+
namespace {
/* Notes on callback ownership.
@@ -229,14 +241,21 @@
} // namespace
StreamOutHalHidl::StreamOutHalHidl(const sp<IStreamOut>& stream)
- : StreamHalHidl(stream.get()), mStream(stream) {
+ : StreamHalHidl(stream.get()), mStream(stream), mEfGroup(nullptr),
+ mGetPresentationPositionNotSupported(false), mPPosFromWriteObtained(0) {
}
StreamOutHalHidl::~StreamOutHalHidl() {
- if (mCallback.unsafe_get() && mStream != 0) {
- processReturn("clearCallback", mStream->clearCallback());
+ if (mStream != 0) {
+ if (mCallback.unsafe_get()) {
+ processReturn("clearCallback", mStream->clearCallback());
+ }
+ processReturn("close", mStream->close());
}
mCallback.clear();
+ if (mEfGroup) {
+ EventFlag::deleteEventFlag(&mEfGroup);
+ }
}
status_t StreamOutHalHidl::getFrameSize(size_t *size) {
@@ -256,16 +275,89 @@
status_t StreamOutHalHidl::write(const void *buffer, size_t bytes, size_t *written) {
if (mStream == 0) return NO_INIT;
- hidl_vec<uint8_t> hidlData;
- hidlData.setToExternal(static_cast<uint8_t*>(const_cast<void*>(buffer)), bytes);
+ *written = 0;
+
+ if (bytes == 0 && !mDataMQ) {
+ // Can't determine the size for the MQ buffer. Wait for a non-empty write request.
+ ALOGW_IF(mCallback.unsafe_get(), "First call to async write with 0 bytes");
+ return OK;
+ }
+
+ status_t status;
+ if (!mDataMQ && (status = prepareForWriting(bytes)) != OK) {
+ return status;
+ }
+
+ const size_t availBytes = mDataMQ->availableToWrite();
+ if (bytes > availBytes) { bytes = availBytes; }
+ if (!mDataMQ->write(static_cast<const uint8_t*>(buffer), bytes)) {
+ ALOGW("data message queue write failed");
+ }
+ mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
+
+ // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
+ uint32_t efState = 0;
+retry:
+ status_t ret = mEfGroup->wait(
+ static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL), &efState, NS_PER_SEC);
+ if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)) {
+ WriteStatus writeStatus = { Result::NOT_INITIALIZED, 0, 0, { 0, 0 } };
+ mStatusMQ->read(&writeStatus);
+ if (writeStatus.retval == Result::OK) {
+ status = OK;
+ *written = writeStatus.written;
+ mPPosFromWriteFrames = writeStatus.frames;
+ mPPosFromWriteTS.tv_sec = writeStatus.timeStamp.tvSec;
+ mPPosFromWriteTS.tv_nsec = writeStatus.timeStamp.tvNSec;
+ struct timespec timeNow;
+ clock_gettime(CLOCK_MONOTONIC, &timeNow);
+ mPPosFromWriteObtained = timeNow.tv_sec * 1000000 + timeNow.tv_nsec / 1000;
+ } else {
+ status = processReturn("write", writeStatus.retval);
+ }
+ return status;
+ }
+ if (ret == -EAGAIN) {
+ // This normally retries no more than once.
+ goto retry;
+ }
+ return ret;
+}
+
+status_t StreamOutHalHidl::prepareForWriting(size_t bufferSize) {
+ std::unique_ptr<DataMQ> tempDataMQ;
+ std::unique_ptr<StatusMQ> tempStatusMQ;
Result retval;
- Return<void> ret = mStream->write(
- hidlData,
- [&](Result r, uint64_t w) {
+ Return<void> ret = mStream->prepareForWriting(
+ 1, bufferSize, ThreadPriority(mHalThreadPriority),
+ [&](Result r,
+ const MQDescriptorSync<uint8_t>& dataMQ,
+ const MQDescriptorSync<WriteStatus>& statusMQ) {
retval = r;
- *written = w;
+ if (retval == Result::OK) {
+ tempDataMQ.reset(new DataMQ(dataMQ));
+ tempStatusMQ.reset(new StatusMQ(statusMQ));
+ if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
+ EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
+ }
+ }
});
- return processReturn("write", ret, retval);
+ if (!ret.getStatus().isOk() || retval != Result::OK) {
+ return processReturn("prepareForWriting", ret, retval);
+ }
+ if (!tempDataMQ || !tempDataMQ->isValid() || !tempStatusMQ || !tempStatusMQ->isValid()
+ || !mEfGroup) {
+ ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for writing");
+ ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for writing is invalid");
+ ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for writing");
+ ALOGE_IF(tempStatusMQ && !tempStatusMQ->isValid(),
+ "Status message queue for writing is invalid");
+ ALOGE_IF(!mEfGroup, "Event flag creation for writing failed");
+ return NO_INIT;
+ }
+ mDataMQ = std::move(tempDataMQ);
+ mStatusMQ = std::move(tempStatusMQ);
+ return OK;
}
status_t StreamOutHalHidl::getRenderPosition(uint32_t *dspFrames) {
@@ -342,6 +434,18 @@
status_t StreamOutHalHidl::getPresentationPosition(uint64_t *frames, struct timespec *timestamp) {
if (mStream == 0) return NO_INIT;
+ if (mGetPresentationPositionNotSupported) return INVALID_OPERATION;
+ struct timespec timeNow;
+ clock_gettime(CLOCK_MONOTONIC, &timeNow);
+ uint64_t timeStampNow = timeNow.tv_sec * 1000000 + timeNow.tv_nsec / 1000;
+ if (timeStampNow - mPPosFromWriteObtained <= 1000) {
+ // No more than 1 ms passed since the last write, use cached result to avoid binder calls.
+ *frames = mPPosFromWriteFrames;
+ timestamp->tv_sec = mPPosFromWriteTS.tv_sec;
+ timestamp->tv_nsec = mPPosFromWriteTS.tv_nsec;
+ return OK;
+ }
+
Result retval;
Return<void> ret = mStream->getPresentationPosition(
[&](Result r, uint64_t hidlFrames, const TimeSpec& hidlTimeStamp) {
@@ -352,6 +456,9 @@
timestamp->tv_nsec = hidlTimeStamp.tvNSec;
}
});
+ if (ret.getStatus().isOk() && retval == Result::NOT_SUPPORTED) {
+ mGetPresentationPositionNotSupported = true;
+ }
return processReturn("getPresentationPosition", ret, retval);
}
@@ -378,10 +485,16 @@
StreamInHalHidl::StreamInHalHidl(const sp<IStreamIn>& stream)
- : StreamHalHidl(stream.get()), mStream(stream) {
+ : StreamHalHidl(stream.get()), mStream(stream), mEfGroup(nullptr) {
}
StreamInHalHidl::~StreamInHalHidl() {
+ if (mStream != 0) {
+ processReturn("close", mStream->close());
+ }
+ if (mEfGroup) {
+ EventFlag::deleteEventFlag(&mEfGroup);
+ }
}
status_t StreamInHalHidl::getFrameSize(size_t *size) {
@@ -396,17 +509,83 @@
status_t StreamInHalHidl::read(void *buffer, size_t bytes, size_t *read) {
if (mStream == 0) return NO_INIT;
+ *read = 0;
+
+ if (bytes == 0 && !mDataMQ) {
+ // Can't determine the size for the MQ buffer. Wait for a non-empty read request.
+ return OK;
+ }
+
+ status_t status;
+ if (!mDataMQ) {
+ if ((status = prepareForReading(bytes)) != OK) return status;
+ // Trigger the first read.
+ mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
+ }
+
+ // TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
+ uint32_t efState = 0;
+retry:
+ status_t ret = mEfGroup->wait(
+ static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY), &efState, NS_PER_SEC);
+ if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)) {
+ ReadStatus readStatus = { Result::NOT_INITIALIZED, 0 };
+ const size_t availToRead = mDataMQ->availableToRead();
+ if (bytes > availToRead) { bytes = availToRead; }
+ mDataMQ->read(static_cast<uint8_t*>(buffer), bytes);
+ mStatusMQ->read(&readStatus);
+ mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
+ if (readStatus.retval == Result::OK) {
+ ALOGW_IF(availToRead != readStatus.read,
+ "HAL read report inconsistent: mq = %d, status = %d",
+ (int32_t)availToRead, (int32_t)readStatus.read);
+ *read = readStatus.read;
+ } else {
+ status = processReturn("read", readStatus.retval);
+ }
+ return status;
+ }
+ if (ret == -EAGAIN) {
+ // This normally retries no more than once.
+ goto retry;
+ }
+ return ret;
+}
+
+status_t StreamInHalHidl::prepareForReading(size_t bufferSize) {
+ std::unique_ptr<DataMQ> tempDataMQ;
+ std::unique_ptr<StatusMQ> tempStatusMQ;
Result retval;
- Return<void> ret = mStream->read(
- bytes,
- [&](Result r, const hidl_vec<uint8_t>& hidlData) {
+ Return<void> ret = mStream->prepareForReading(
+ 1, bufferSize, ThreadPriority(mHalThreadPriority),
+ [&](Result r,
+ const MQDescriptorSync<uint8_t>& dataMQ,
+ const MQDescriptorSync<ReadStatus>& statusMQ) {
retval = r;
- *read = std::min(hidlData.size(), bytes);
if (retval == Result::OK) {
- memcpy(buffer, &hidlData[0], *read);
+ tempDataMQ.reset(new DataMQ(dataMQ));
+ tempStatusMQ.reset(new StatusMQ(statusMQ));
+ if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
+ EventFlag::createEventFlag(tempDataMQ->getEventFlagWord(), &mEfGroup);
+ }
}
});
- return processReturn("read", ret, retval);
+ if (!ret.getStatus().isOk() || retval != Result::OK) {
+ return processReturn("prepareForReading", ret, retval);
+ }
+ if (!tempDataMQ || !tempDataMQ->isValid() || !tempStatusMQ || !tempStatusMQ->isValid()
+ || !mEfGroup) {
+ ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for reading");
+ ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for reading is invalid");
+ ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for reading");
+ ALOGE_IF(tempStatusMQ && !tempStatusMQ->isValid(),
+ "Status message queue for reading is invalid");
+ ALOGE_IF(!mEfGroup, "Event flag creation for reading failed");
+ return NO_INIT;
+ }
+ mDataMQ = std::move(tempDataMQ);
+ mStatusMQ = std::move(tempStatusMQ);
+ return OK;
}
status_t StreamInHalHidl::getInputFramesLost(uint32_t *framesLost) {