audiohal: Make sure audio data transfer related commands go via FMQ
When outputting audio, the framework issues several HAL calls
from the same thread that writes into data FMQ. These calls
also need to be served on the same thread that writes audio data
to HAL. The same thing happens when audio input is commenced.
Add a command FMQ for passing different commands to the HAL thread.
This way, depending on the calling thread, the same call may go
either via hwbinder or via the command queue.
This dramatically reduces jitter in RTT measurements (although
doesn't improve the latency).
Bug: 30222631
Test: scripted RTT app
Change-Id: Ic212e33d4e2b1cf404973fe9d60762523eef9f40
diff --git a/media/libaudiohal/StreamHalHidl.cpp b/media/libaudiohal/StreamHalHidl.cpp
index 9ee8fa8..36ea1be 100644
--- a/media/libaudiohal/StreamHalHidl.cpp
+++ b/media/libaudiohal/StreamHalHidl.cpp
@@ -14,8 +14,6 @@
* limitations under the License.
*/
-#include <time.h>
-
#define LOG_TAG "StreamHalHidl"
//#define LOG_NDEBUG 0
@@ -40,6 +38,7 @@
using ::android::hardware::MQDescriptorSync;
using ::android::hardware::Return;
using ::android::hardware::Void;
+using ReadCommand = ::android::hardware::audio::V2_0::IStreamIn::ReadCommand;
namespace android {
@@ -241,8 +240,7 @@
} // namespace
StreamOutHalHidl::StreamOutHalHidl(const sp<IStreamOut>& stream)
- : StreamHalHidl(stream.get()), mStream(stream), mEfGroup(nullptr),
- mGetPresentationPositionNotSupported(false), mPPosFromWrite{ 0, OK, 0, { 0, 0 } } {
+ : StreamHalHidl(stream.get()), mStream(stream), mWriterClient(0), mEfGroup(nullptr) {
}
StreamOutHalHidl::~StreamOutHalHidl() {
@@ -265,7 +263,15 @@
status_t StreamOutHalHidl::getLatency(uint32_t *latency) {
if (mStream == 0) return NO_INIT;
- return processReturn("getLatency", mStream->getLatency(), latency);
+ if (mWriterClient == gettid() && mCommandMQ) {
+ return callWriterThread(
+ WriteCommand::GET_LATENCY, "getLatency", nullptr, 0,
+ [&](const WriteStatus& writeStatus) {
+ *latency = writeStatus.reply.latencyMs;
+ });
+ } else {
+ return processReturn("getLatency", mStream->getLatency(), latency);
+ }
}
status_t StreamOutHalHidl::setVolume(float left, float right) {
@@ -288,10 +294,30 @@
return status;
}
- const size_t availBytes = mDataMQ->availableToWrite();
- if (bytes > availBytes) { bytes = availBytes; }
- if (!mDataMQ->write(static_cast<const uint8_t*>(buffer), bytes)) {
- ALOGW("data message queue write failed");
+ return callWriterThread(
+ WriteCommand::WRITE, "write", static_cast<const uint8_t*>(buffer), bytes,
+ [&] (const WriteStatus& writeStatus) {
+ *written = writeStatus.reply.written;
+ });
+}
+
+status_t StreamOutHalHidl::callWriterThread(
+ WriteCommand cmd, const char* cmdName,
+ const uint8_t* data, size_t dataSize, StreamOutHalHidl::WriterCallback callback) {
+ if (!mCommandMQ->write(&cmd)) {
+ ALOGE("command message queue write failed for \"%s\"", cmdName);
+ return -EAGAIN;
+ }
+ if (data != nullptr) {
+ size_t availableToWrite = mDataMQ->availableToWrite();
+ if (dataSize > availableToWrite) {
+ ALOGW("truncating write data from %d to %d due to insufficient data queue space",
+ (int32_t)dataSize, (int32_t)availableToWrite);
+ dataSize = availableToWrite;
+ }
+ if (!mDataMQ->write(data, dataSize)) {
+ ALOGE("data message queue write failed for \"%s\"", cmdName);
+ }
}
mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY));
@@ -301,24 +327,18 @@
status_t ret = mEfGroup->wait(
static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL), &efState, NS_PER_SEC);
if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL)) {
- WriteStatus writeStatus =
- { Result::NOT_INITIALIZED, 0, Result::NOT_INITIALIZED, 0, { 0, 0 } };
- mStatusMQ->read(&writeStatus);
- if (writeStatus.writeRetval == Result::OK) {
- status = OK;
- *written = writeStatus.written;
- mPPosFromWrite.status = processReturn(
- "get_presentation_position", writeStatus.presentationPositionRetval);
- if (mPPosFromWrite.status == OK) {
- mPPosFromWrite.frames = writeStatus.frames;
- mPPosFromWrite.ts.tv_sec = writeStatus.timeStamp.tvSec;
- mPPosFromWrite.ts.tv_nsec = writeStatus.timeStamp.tvNSec;
- }
- mPPosFromWrite.obtained = getCurrentTimeMs();
- } else {
- status = processReturn("write", writeStatus.writeRetval);
+ WriteStatus writeStatus;
+ writeStatus.retval = Result::NOT_INITIALIZED;
+ if (!mStatusMQ->read(&writeStatus)) {
+ ALOGE("status message read failed for \"%s\"", cmdName);
}
- return status;
+ if (writeStatus.retval == Result::OK) {
+ ret = OK;
+ callback(writeStatus);
+ } else {
+ ret = processReturn(cmdName, writeStatus.retval);
+ }
+ return ret;
}
if (ret == -EAGAIN) {
// This normally retries no more than once.
@@ -327,23 +347,20 @@
return ret;
}
-uint64_t StreamOutHalHidl::getCurrentTimeMs() {
- struct timespec timeNow;
- clock_gettime(CLOCK_MONOTONIC, &timeNow);
- return timeNow.tv_sec * 1000000 + timeNow.tv_nsec / 1000;
-}
-
status_t StreamOutHalHidl::prepareForWriting(size_t bufferSize) {
+ std::unique_ptr<CommandMQ> tempCommandMQ;
std::unique_ptr<DataMQ> tempDataMQ;
std::unique_ptr<StatusMQ> tempStatusMQ;
Result retval;
Return<void> ret = mStream->prepareForWriting(
1, bufferSize, ThreadPriority(mHalThreadPriority),
[&](Result r,
+ const CommandMQ::Descriptor& commandMQ,
const DataMQ::Descriptor& dataMQ,
const StatusMQ::Descriptor& statusMQ) {
retval = r;
if (retval == Result::OK) {
+ tempCommandMQ.reset(new CommandMQ(commandMQ));
tempDataMQ.reset(new DataMQ(dataMQ));
tempStatusMQ.reset(new StatusMQ(statusMQ));
if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
@@ -354,8 +371,13 @@
if (!ret.isOk() || retval != Result::OK) {
return processReturn("prepareForWriting", ret, retval);
}
- if (!tempDataMQ || !tempDataMQ->isValid() || !tempStatusMQ || !tempStatusMQ->isValid()
- || !mEfGroup) {
+ if (!tempCommandMQ || !tempCommandMQ->isValid() ||
+ !tempDataMQ || !tempDataMQ->isValid() ||
+ !tempStatusMQ || !tempStatusMQ->isValid() ||
+ !mEfGroup) {
+ ALOGE_IF(!tempCommandMQ, "Failed to obtain command message queue for writing");
+ ALOGE_IF(tempCommandMQ && !tempCommandMQ->isValid(),
+ "Command message queue for writing is invalid");
ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for writing");
ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for writing is invalid");
ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for writing");
@@ -364,8 +386,10 @@
ALOGE_IF(!mEfGroup, "Event flag creation for writing failed");
return NO_INIT;
}
+ mCommandMQ = std::move(tempCommandMQ);
mDataMQ = std::move(tempDataMQ);
mStatusMQ = std::move(tempStatusMQ);
+ mWriterClient = gettid();
return OK;
}
@@ -443,31 +467,27 @@
status_t StreamOutHalHidl::getPresentationPosition(uint64_t *frames, struct timespec *timestamp) {
if (mStream == 0) return NO_INIT;
- if (mGetPresentationPositionNotSupported) return INVALID_OPERATION;
- if (getCurrentTimeMs() - mPPosFromWrite.obtained <= 1000) {
- // No more than 1 ms passed since the last write, use cached result to avoid binder calls.
- if (mPPosFromWrite.status == OK) {
- *frames = mPPosFromWrite.frames;
- timestamp->tv_sec = mPPosFromWrite.ts.tv_sec;
- timestamp->tv_nsec = mPPosFromWrite.ts.tv_nsec;
- }
- return mPPosFromWrite.status;
+ if (mWriterClient == gettid() && mCommandMQ) {
+ return callWriterThread(
+ WriteCommand::GET_PRESENTATION_POSITION, "getPresentationPosition", nullptr, 0,
+ [&](const WriteStatus& writeStatus) {
+ *frames = writeStatus.reply.presentationPosition.frames;
+ timestamp->tv_sec = writeStatus.reply.presentationPosition.timeStamp.tvSec;
+ timestamp->tv_nsec = writeStatus.reply.presentationPosition.timeStamp.tvNSec;
+ });
+ } else {
+ Result retval;
+ Return<void> ret = mStream->getPresentationPosition(
+ [&](Result r, uint64_t hidlFrames, const TimeSpec& hidlTimeStamp) {
+ retval = r;
+ if (retval == Result::OK) {
+ *frames = hidlFrames;
+ timestamp->tv_sec = hidlTimeStamp.tvSec;
+ timestamp->tv_nsec = hidlTimeStamp.tvNSec;
+ }
+ });
+ return processReturn("getPresentationPosition", ret, retval);
}
-
- Result retval;
- Return<void> ret = mStream->getPresentationPosition(
- [&](Result r, uint64_t hidlFrames, const TimeSpec& hidlTimeStamp) {
- retval = r;
- if (retval == Result::OK) {
- *frames = hidlFrames;
- timestamp->tv_sec = hidlTimeStamp.tvSec;
- timestamp->tv_nsec = hidlTimeStamp.tvNSec;
- }
- });
- if (ret.isOk() && retval == Result::NOT_SUPPORTED) {
- mGetPresentationPositionNotSupported = true;
- }
- return processReturn("getPresentationPosition", ret, retval);
}
void StreamOutHalHidl::onWriteReady() {
@@ -493,7 +513,7 @@
StreamInHalHidl::StreamInHalHidl(const sp<IStreamIn>& stream)
- : StreamHalHidl(stream.get()), mStream(stream), mEfGroup(nullptr) {
+ : StreamHalHidl(stream.get()), mStream(stream), mReaderClient(0), mEfGroup(nullptr) {
}
StreamInHalHidl::~StreamInHalHidl() {
@@ -525,33 +545,53 @@
}
status_t status;
- if (!mDataMQ) {
- if ((status = prepareForReading(bytes)) != OK) return status;
- // Trigger the first read.
- mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
+ if (!mDataMQ && (status = prepareForReading(bytes)) != OK) {
+ return status;
}
+ ReadParameters params;
+ params.command = ReadCommand::READ;
+ params.params.read = bytes;
+ return callReaderThread(params, "read",
+ [&](const ReadStatus& readStatus) {
+ const size_t availToRead = mDataMQ->availableToRead();
+ if (!mDataMQ->read(static_cast<uint8_t*>(buffer), std::min(bytes, availToRead))) {
+ ALOGE("data message queue read failed for \"read\"");
+ }
+ ALOGW_IF(availToRead != readStatus.reply.read,
+ "HAL read report inconsistent: mq = %d, status = %d",
+ (int32_t)availToRead, (int32_t)readStatus.reply.read);
+ *read = readStatus.reply.read;
+ });
+}
+
+status_t StreamInHalHidl::callReaderThread(
+ const ReadParameters& params, const char* cmdName,
+ StreamInHalHidl::ReaderCallback callback) {
+ if (!mCommandMQ->write(¶ms)) {
+ ALOGW("command message queue write failed");
+ return -EAGAIN;
+ }
+ mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
+
// TODO: Remove manual event flag handling once blocking MQ is implemented. b/33815422
uint32_t efState = 0;
retry:
status_t ret = mEfGroup->wait(
static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY), &efState, NS_PER_SEC);
if (efState & static_cast<uint32_t>(MessageQueueFlagBits::NOT_EMPTY)) {
- ReadStatus readStatus = { Result::NOT_INITIALIZED, 0 };
- const size_t availToRead = mDataMQ->availableToRead();
- if (bytes > availToRead) { bytes = availToRead; }
- mDataMQ->read(static_cast<uint8_t*>(buffer), bytes);
- mStatusMQ->read(&readStatus);
- mEfGroup->wake(static_cast<uint32_t>(MessageQueueFlagBits::NOT_FULL));
- if (readStatus.retval == Result::OK) {
- ALOGW_IF(availToRead != readStatus.read,
- "HAL read report inconsistent: mq = %d, status = %d",
- (int32_t)availToRead, (int32_t)readStatus.read);
- *read = readStatus.read;
- } else {
- status = processReturn("read", readStatus.retval);
+ ReadStatus readStatus;
+ readStatus.retval = Result::NOT_INITIALIZED;
+ if (!mStatusMQ->read(&readStatus)) {
+ ALOGE("status message read failed for \"%s\"", cmdName);
}
- return status;
+ if (readStatus.retval == Result::OK) {
+ ret = OK;
+ callback(readStatus);
+ } else {
+ ret = processReturn(cmdName, readStatus.retval);
+ }
+ return ret;
}
if (ret == -EAGAIN) {
// This normally retries no more than once.
@@ -561,16 +601,19 @@
}
status_t StreamInHalHidl::prepareForReading(size_t bufferSize) {
+ std::unique_ptr<CommandMQ> tempCommandMQ;
std::unique_ptr<DataMQ> tempDataMQ;
std::unique_ptr<StatusMQ> tempStatusMQ;
Result retval;
Return<void> ret = mStream->prepareForReading(
1, bufferSize, ThreadPriority(mHalThreadPriority),
[&](Result r,
+ const CommandMQ::Descriptor& commandMQ,
const DataMQ::Descriptor& dataMQ,
const StatusMQ::Descriptor& statusMQ) {
retval = r;
if (retval == Result::OK) {
+ tempCommandMQ.reset(new CommandMQ(commandMQ));
tempDataMQ.reset(new DataMQ(dataMQ));
tempStatusMQ.reset(new StatusMQ(statusMQ));
if (tempDataMQ->isValid() && tempDataMQ->getEventFlagWord()) {
@@ -581,8 +624,13 @@
if (!ret.isOk() || retval != Result::OK) {
return processReturn("prepareForReading", ret, retval);
}
- if (!tempDataMQ || !tempDataMQ->isValid() || !tempStatusMQ || !tempStatusMQ->isValid()
- || !mEfGroup) {
+ if (!tempCommandMQ || !tempCommandMQ->isValid() ||
+ !tempDataMQ || !tempDataMQ->isValid() ||
+ !tempStatusMQ || !tempStatusMQ->isValid() ||
+ !mEfGroup) {
+ ALOGE_IF(!tempCommandMQ, "Failed to obtain command message queue for writing");
+ ALOGE_IF(tempCommandMQ && !tempCommandMQ->isValid(),
+ "Command message queue for writing is invalid");
ALOGE_IF(!tempDataMQ, "Failed to obtain data message queue for reading");
ALOGE_IF(tempDataMQ && !tempDataMQ->isValid(), "Data message queue for reading is invalid");
ALOGE_IF(!tempStatusMQ, "Failed to obtain status message queue for reading");
@@ -591,8 +639,10 @@
ALOGE_IF(!mEfGroup, "Event flag creation for reading failed");
return NO_INIT;
}
+ mCommandMQ = std::move(tempCommandMQ);
mDataMQ = std::move(tempDataMQ);
mStatusMQ = std::move(tempStatusMQ);
+ mReaderClient = gettid();
return OK;
}
@@ -603,16 +653,26 @@
status_t StreamInHalHidl::getCapturePosition(int64_t *frames, int64_t *time) {
if (mStream == 0) return NO_INIT;
- Result retval;
- Return<void> ret = mStream->getCapturePosition(
- [&](Result r, uint64_t hidlFrames, uint64_t hidlTime) {
- retval = r;
- if (retval == Result::OK) {
- *frames = hidlFrames;
- *time = hidlTime;
- }
- });
- return processReturn("getCapturePosition", ret, retval);
+ if (mReaderClient == gettid() && mCommandMQ) {
+ ReadParameters params;
+ params.command = ReadCommand::GET_CAPTURE_POSITION;
+ return callReaderThread(params, "getCapturePosition",
+ [&](const ReadStatus& readStatus) {
+ *frames = readStatus.reply.capturePosition.frames;
+ *time = readStatus.reply.capturePosition.time;
+ });
+ } else {
+ Result retval;
+ Return<void> ret = mStream->getCapturePosition(
+ [&](Result r, uint64_t hidlFrames, uint64_t hidlTime) {
+ retval = r;
+ if (retval == Result::OK) {
+ *frames = hidlFrames;
+ *time = hidlTime;
+ }
+ });
+ return processReturn("getCapturePosition", ret, retval);
+ }
}
} // namespace android