aaudio: lock transport methods

The start/pause/stop/flush/close and other binder methods
need to be thread safe. They do not need to run
in parallel, so each one now takes the stream's mutex (mLock).

Where virtual methods are needed, the locked method calls
a corresponding _l submethod, e.g. stop() calls stop_l().

The close logic was also simplified because the "pending"
technique is not needed now that we have the locks.
It was only needed because a close could have occurred
while another method was still in progress.

This CL was merged with changes in RVC-DEV.

Bug: 153358911
Test: adb logcat *:F
Test: in another window:  test_binder_attack
Test: There should be no fatal error in the logcat.
Test: atest CtsNativeMediaAAudioTestCases
Change-Id: I5920cf78af4501856756c5c2fc8e77758232508a
diff --git a/services/oboeservice/AAudioService.cpp b/services/oboeservice/AAudioService.cpp
index ecbcb7e..82b12d6 100644
--- a/services/oboeservice/AAudioService.cpp
+++ b/services/oboeservice/AAudioService.cpp
@@ -23,7 +23,6 @@
 #include <sstream>
 
 #include <aaudio/AAudio.h>
-#include <mediautils/SchedulingPolicyService.h>
 #include <mediautils/ServiceUtilities.h>
 #include <utils/String16.h>
 
@@ -162,28 +161,6 @@
     }
 }
 
-// If a close request is pending then close the stream
-bool AAudioService::releaseStream(const sp<AAudioServiceStreamBase> &serviceStream) {
-    bool closed = false;
-    // decrementAndRemoveStreamByHandle() uses a lock so that if there are two simultaneous closes
-    // then only one will get the pointer and do the close.
-    sp<AAudioServiceStreamBase> foundStream = mStreamTracker.decrementAndRemoveStreamByHandle(
-            serviceStream->getHandle());
-    if (foundStream.get() != nullptr) {
-        foundStream->close();
-        pid_t pid = foundStream->getOwnerProcessId();
-        AAudioClientTracker::getInstance().unregisterClientStream(pid, foundStream);
-        closed = true;
-    }
-    return closed;
-}
-
-aaudio_result_t AAudioService::checkForPendingClose(
-        const sp<AAudioServiceStreamBase> &serviceStream,
-        aaudio_result_t defaultResult) {
-    return releaseStream(serviceStream) ? AAUDIO_ERROR_INVALID_STATE : defaultResult;
-}
-
 aaudio_result_t AAudioService::closeStream(aaudio_handle_t streamHandle) {
     // Check permission and ownership first.
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
@@ -195,17 +172,20 @@
 }
 
 aaudio_result_t AAudioService::closeStream(sp<AAudioServiceStreamBase> serviceStream) {
+    // This is protected by a lock in AAudioClientTracker.
+    // It is safe to unregister the same stream twice.
     pid_t pid = serviceStream->getOwnerProcessId();
     AAudioClientTracker::getInstance().unregisterClientStream(pid, serviceStream);
+    // This is protected by a lock in mStreamTracker.
+    // It is safe to remove the same stream twice.
+    mStreamTracker.removeStreamByHandle(serviceStream->getHandle());
 
-    serviceStream->markCloseNeeded();
-    (void) releaseStream(serviceStream);
-    return AAUDIO_OK;
+    return serviceStream->close();
 }
 
 sp<AAudioServiceStreamBase> AAudioService::convertHandleToServiceStream(
         aaudio_handle_t streamHandle) {
-    sp<AAudioServiceStreamBase> serviceStream = mStreamTracker.getStreamByHandleAndIncrement(
+    sp<AAudioServiceStreamBase> serviceStream = mStreamTracker.getStreamByHandle(
             streamHandle);
     if (serviceStream.get() != nullptr) {
         // Only allow owner or the aaudio service to access the stream.
@@ -218,8 +198,6 @@
         if (!allowed) {
             ALOGE("AAudioService: calling uid %d cannot access stream 0x%08X owned by %d",
                   callingUserId, streamHandle, ownerUserId);
-            // We incremented the reference count so we must check if it needs to be closed.
-            checkForPendingClose(serviceStream, AAUDIO_OK);
             serviceStream.clear();
         }
     }
@@ -234,96 +212,66 @@
         ALOGE("getStreamDescription(), illegal stream handle = 0x%0x", streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-
-    aaudio_result_t result = serviceStream->getDescription(parcelable);
-    // parcelable.dump();
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->getDescription(parcelable);
 }
 
 aaudio_result_t AAudioService::startStream(aaudio_handle_t streamHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("startStream(), illegal stream handle = 0x%0x", streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-
-    aaudio_result_t result = serviceStream->start();
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->start();
 }
 
 aaudio_result_t AAudioService::pauseStream(aaudio_handle_t streamHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("pauseStream(), illegal stream handle = 0x%0x", streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    aaudio_result_t result = serviceStream->pause();
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->pause();
 }
 
 aaudio_result_t AAudioService::stopStream(aaudio_handle_t streamHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("stopStream(), illegal stream handle = 0x%0x", streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    aaudio_result_t result = serviceStream->stop();
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->stop();
 }
 
 aaudio_result_t AAudioService::flushStream(aaudio_handle_t streamHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("flushStream(), illegal stream handle = 0x%0x", streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    aaudio_result_t result = serviceStream->flush();
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->flush();
 }
 
 aaudio_result_t AAudioService::registerAudioThread(aaudio_handle_t streamHandle,
                                                    pid_t clientThreadId,
-                                                   int64_t periodNanoseconds) {
-    aaudio_result_t result = AAUDIO_OK;
+                                                   int64_t /* periodNanoseconds */) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("registerAudioThread(), illegal stream handle = 0x%0x", streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    if (serviceStream->getRegisteredThread() != AAudioServiceStreamBase::ILLEGAL_THREAD_ID) {
-        ALOGE("AAudioService::registerAudioThread(), thread already registered");
-        result = AAUDIO_ERROR_INVALID_STATE;
-    } else {
-        const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review
-        int32_t priority = isCallerInService()
-            ? kRealTimeAudioPriorityService : kRealTimeAudioPriorityClient;
-        serviceStream->setRegisteredThread(clientThreadId);
-        int err = android::requestPriority(ownerPid, clientThreadId,
-                                           priority, true /* isForApp */);
-        if (err != 0) {
-            ALOGE("AAudioService::registerAudioThread(%d) failed, errno = %d, priority = %d",
-                  clientThreadId, errno, priority);
-            result = AAUDIO_ERROR_INTERNAL;
-        }
-    }
-    return checkForPendingClose(serviceStream, result);
+    int32_t priority = isCallerInService()
+        ? kRealTimeAudioPriorityService : kRealTimeAudioPriorityClient;
+    return serviceStream->registerAudioThread(clientThreadId, priority);
 }
 
 aaudio_result_t AAudioService::unregisterAudioThread(aaudio_handle_t streamHandle,
                                                      pid_t clientThreadId) {
-    aaudio_result_t result = AAUDIO_OK;
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("%s(), illegal stream handle = 0x%0x", __func__, streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    if (serviceStream->getRegisteredThread() != clientThreadId) {
-        ALOGE("%s(), wrong thread", __func__);
-        result = AAUDIO_ERROR_ILLEGAL_ARGUMENT;
-    } else {
-        serviceStream->setRegisteredThread(0);
-    }
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->unregisterAudioThread(clientThreadId);
 }
 
 aaudio_result_t AAudioService::startClient(aaudio_handle_t streamHandle,
@@ -332,22 +280,20 @@
                                            audio_port_handle_t *clientHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("%s(), illegal stream handle = 0x%0x", __func__, streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    aaudio_result_t result = serviceStream->startClient(client, attr, clientHandle);
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->startClient(client, attr, clientHandle);
 }
 
 aaudio_result_t AAudioService::stopClient(aaudio_handle_t streamHandle,
                                           audio_port_handle_t portHandle) {
     sp<AAudioServiceStreamBase> serviceStream = convertHandleToServiceStream(streamHandle);
     if (serviceStream.get() == nullptr) {
-        ALOGE("%s(), illegal stream handle = 0x%0x", __func__, streamHandle);
+        ALOGW("%s(), invalid streamHandle = 0x%0x", __func__, streamHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
-    aaudio_result_t result = serviceStream->stopClient(portHandle);
-    return checkForPendingClose(serviceStream, result);
+    return serviceStream->stopClient(portHandle);
 }
 
 // This is only called internally when AudioFlinger wants to tear down a stream.
@@ -355,12 +301,13 @@
 aaudio_result_t AAudioService::disconnectStreamByPortHandle(audio_port_handle_t portHandle) {
     ALOGD("%s(%d) called", __func__, portHandle);
     sp<AAudioServiceStreamBase> serviceStream =
-            mStreamTracker.findStreamByPortHandleAndIncrement(portHandle);
+            mStreamTracker.findStreamByPortHandle(portHandle);
     if (serviceStream.get() == nullptr) {
         ALOGE("%s(), could not find stream with portHandle = %d", __func__, portHandle);
         return AAUDIO_ERROR_INVALID_HANDLE;
     }
+    // This is protected by a lock and will just return if already stopped.
     aaudio_result_t result = serviceStream->stop();
     serviceStream->disconnect();
-    return checkForPendingClose(serviceStream, result);
+    return result;
 }
diff --git a/services/oboeservice/AAudioService.h b/services/oboeservice/AAudioService.h
index 6a2ac1f..caf48a5 100644
--- a/services/oboeservice/AAudioService.h
+++ b/services/oboeservice/AAudioService.h
@@ -114,11 +114,6 @@
     sp<aaudio::AAudioServiceStreamBase> convertHandleToServiceStream(
             aaudio::aaudio_handle_t streamHandle);
 
-    bool releaseStream(const sp<aaudio::AAudioServiceStreamBase> &serviceStream);
-
-    aaudio_result_t checkForPendingClose(const sp<aaudio::AAudioServiceStreamBase> &serviceStream,
-                                         aaudio_result_t defaultResult);
-
     android::AudioClient            mAudioClient;
 
     aaudio::AAudioStreamTracker     mStreamTracker;
diff --git a/services/oboeservice/AAudioServiceEndpoint.cpp b/services/oboeservice/AAudioServiceEndpoint.cpp
index b09cbf4..15cbd82 100644
--- a/services/oboeservice/AAudioServiceEndpoint.cpp
+++ b/services/oboeservice/AAudioServiceEndpoint.cpp
@@ -90,14 +90,16 @@
 std::vector<android::sp<AAudioServiceStreamBase>>
         AAudioServiceEndpoint::disconnectRegisteredStreams() {
     std::vector<android::sp<AAudioServiceStreamBase>> streamsDisconnected;
-    std::lock_guard<std::mutex> lock(mLockStreams);
+    {
+        std::lock_guard<std::mutex> lock(mLockStreams);
+        mRegisteredStreams.swap(streamsDisconnected);
+    }
     mConnected.store(false);
-    for (const auto &stream : mRegisteredStreams) {
+    for (const auto &stream : streamsDisconnected) {
         ALOGD("%s() - stop and disconnect port %d", __func__, stream->getPortHandle());
         stream->stop();
         stream->disconnect();
     }
-    mRegisteredStreams.swap(streamsDisconnected);
     return streamsDisconnected;
 }
 
diff --git a/services/oboeservice/AAudioServiceEndpointShared.cpp b/services/oboeservice/AAudioServiceEndpointShared.cpp
index 21253c8..dc21886 100644
--- a/services/oboeservice/AAudioServiceEndpointShared.cpp
+++ b/services/oboeservice/AAudioServiceEndpointShared.cpp
@@ -168,13 +168,11 @@
 
 aaudio_result_t AAudioServiceEndpointShared::stopStream(sp<AAudioServiceStreamBase> sharedStream,
                                                         audio_port_handle_t clientHandle) {
-    // Don't lock here because the disconnectRegisteredStreams also uses the lock.
-
     // Ignore result.
     (void) getStreamInternal()->stopClient(clientHandle);
 
     if (--mRunningStreamCount == 0) { // atomic
-        stopSharingThread();
+        stopSharingThread(); // the sharing thread locks mLockStreams
         getStreamInternal()->requestStop();
     }
     return AAUDIO_OK;
diff --git a/services/oboeservice/AAudioServiceStreamBase.cpp b/services/oboeservice/AAudioServiceStreamBase.cpp
index dba9fb9..e750e81 100644
--- a/services/oboeservice/AAudioServiceStreamBase.cpp
+++ b/services/oboeservice/AAudioServiceStreamBase.cpp
@@ -24,6 +24,7 @@
 
 #include <media/MediaMetricsItem.h>
 #include <media/TypeConverter.h>
+#include <mediautils/SchedulingPolicyService.h>
 
 #include "binding/IAAudioService.h"
 #include "binding/AAudioServiceMessage.h"
@@ -169,11 +170,16 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::close() {
+    std::lock_guard<std::mutex> lock(mLock);
+    return close_l();
+}
+
+aaudio_result_t AAudioServiceStreamBase::close_l() {
     if (getState() == AAUDIO_STREAM_STATE_CLOSED) {
         return AAUDIO_OK;
     }
 
-    stop();
+    stop_l();
 
     aaudio_result_t result = AAUDIO_OK;
     sp<AAudioServiceEndpoint> endpoint = mServiceEndpointWeak.promote();
@@ -185,7 +191,7 @@
         endpointManager.closeEndpoint(endpoint);
 
         // AAudioService::closeStream() prevents two threads from closing at the same time.
-        mServiceEndpoint.clear(); // endpoint will hold the pointer until this method returns.
+        mServiceEndpoint.clear(); // endpoint will hold the pointer after this method returns.
     }
 
     {
@@ -219,9 +225,18 @@
  * An AAUDIO_SERVICE_EVENT_STARTED will be sent to the client when complete.
  */
 aaudio_result_t AAudioServiceStreamBase::start() {
+    std::lock_guard<std::mutex> lock(mLock);
+
     const int64_t beginNs = AudioClock::getNanoseconds();
     aaudio_result_t result = AAUDIO_OK;
 
+    if (auto state = getState();
+        state == AAUDIO_STREAM_STATE_CLOSED || state == AAUDIO_STREAM_STATE_DISCONNECTED) {
+        ALOGW("%s() already CLOSED, returns INVALID_STATE, handle = %d",
+                __func__, getHandle());
+        return AAUDIO_ERROR_INVALID_STATE;
+    }
+
     mediametrics::Defer defer([&] {
         mediametrics::LogItem(mMetricsId)
             .set(AMEDIAMETRICS_PROP_EVENT, AMEDIAMETRICS_PROP_EVENT_VALUE_START)
@@ -231,7 +246,7 @@
             .record(); });
 
     if (isRunning()) {
-        return AAUDIO_OK;
+        return result;
     }
 
     setFlowing(false);
@@ -254,16 +269,21 @@
     return result;
 
 error:
-    disconnect();
+    disconnect_l();
     return result;
 }
 
 aaudio_result_t AAudioServiceStreamBase::pause() {
-    const int64_t beginNs = AudioClock::getNanoseconds();
+    std::lock_guard<std::mutex> lock(mLock);
+    return pause_l();
+}
+
+aaudio_result_t AAudioServiceStreamBase::pause_l() {
     aaudio_result_t result = AAUDIO_OK;
     if (!isRunning()) {
         return result;
     }
+    const int64_t beginNs = AudioClock::getNanoseconds();
 
     mediametrics::Defer defer([&] {
         mediametrics::LogItem(mMetricsId)
@@ -279,7 +299,7 @@
 
     result = stopTimestampThread();
     if (result != AAUDIO_OK) {
-        disconnect();
+        disconnect_l();
         return result;
     }
 
@@ -292,7 +312,7 @@
     result = endpoint->stopStream(this, mClientHandle);
     if (result != AAUDIO_OK) {
         ALOGE("%s() mServiceEndpoint returned %d, %s", __func__, result, getTypeText());
-        disconnect(); // TODO should we return or pause Base first?
+        disconnect_l(); // TODO should we return or pause Base first?
     }
 
     sendServiceEvent(AAUDIO_SERVICE_EVENT_PAUSED);
@@ -301,11 +321,16 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::stop() {
-    const int64_t beginNs = AudioClock::getNanoseconds();
+    std::lock_guard<std::mutex> lock(mLock);
+    return stop_l();
+}
+
+aaudio_result_t AAudioServiceStreamBase::stop_l() {
     aaudio_result_t result = AAUDIO_OK;
     if (!isRunning()) {
         return result;
     }
+    const int64_t beginNs = AudioClock::getNanoseconds();
 
     mediametrics::Defer defer([&] {
         mediametrics::LogItem(mMetricsId)
@@ -322,7 +347,7 @@
     sendCurrentTimestamp(); // warning - this calls a virtual function
     result = stopTimestampThread();
     if (result != AAUDIO_OK) {
-        disconnect();
+        disconnect_l();
         return result;
     }
 
@@ -336,7 +361,7 @@
     result = endpoint->stopStream(this, mClientHandle);
     if (result != AAUDIO_OK) {
         ALOGE("%s() stopStream returned %d, %s", __func__, result, getTypeText());
-        disconnect();
+        disconnect_l();
         // TODO what to do with result here?
     }
 
@@ -355,11 +380,12 @@
 }
 
 aaudio_result_t AAudioServiceStreamBase::flush() {
-    const int64_t beginNs = AudioClock::getNanoseconds();
+    std::lock_guard<std::mutex> lock(mLock);
     aaudio_result_t result = AAudio_isFlushAllowed(getState());
     if (result != AAUDIO_OK) {
         return result;
     }
+    const int64_t beginNs = AudioClock::getNanoseconds();
 
     mediametrics::Defer defer([&] {
         mediametrics::LogItem(mMetricsId)
@@ -404,16 +430,66 @@
 }
 
 void AAudioServiceStreamBase::disconnect() {
-    if (getState() != AAUDIO_STREAM_STATE_DISCONNECTED) {
+    std::lock_guard<std::mutex> lock(mLock);
+    disconnect_l();
+}
+
+void AAudioServiceStreamBase::disconnect_l() {
+    if (getState() != AAUDIO_STREAM_STATE_DISCONNECTED
+        && getState() != AAUDIO_STREAM_STATE_CLOSED) {
+
         mediametrics::LogItem(mMetricsId)
             .set(AMEDIAMETRICS_PROP_EVENT, AMEDIAMETRICS_PROP_EVENT_VALUE_DISCONNECT)
             .set(AMEDIAMETRICS_PROP_STATE, AudioGlobal_convertStreamStateToText(getState()))
             .record();
+
         sendServiceEvent(AAUDIO_SERVICE_EVENT_DISCONNECTED);
         setState(AAUDIO_STREAM_STATE_DISCONNECTED);
     }
 }
 
+aaudio_result_t AAudioServiceStreamBase::registerAudioThread(pid_t clientThreadId,
+        int priority) {
+    std::lock_guard<std::mutex> lock(mLock);
+    aaudio_result_t result = AAUDIO_OK;
+    if (getRegisteredThread() != AAudioServiceStreamBase::ILLEGAL_THREAD_ID) {
+        ALOGE("AAudioService::registerAudioThread(), thread already registered");
+        result = AAUDIO_ERROR_INVALID_STATE;
+    } else {
+        const pid_t ownerPid = IPCThreadState::self()->getCallingPid(); // TODO review
+        setRegisteredThread(clientThreadId);
+        int err = android::requestPriority(ownerPid, clientThreadId,
+                                           priority, true /* isForApp */);
+        if (err != 0) {
+            ALOGE("AAudioService::registerAudioThread(%d) failed, errno = %d, priority = %d",
+                  clientThreadId, errno, priority);
+            result = AAUDIO_ERROR_INTERNAL;
+        }
+    }
+    return result;
+}
+
+aaudio_result_t AAudioServiceStreamBase::unregisterAudioThread(pid_t clientThreadId) {
+    std::lock_guard<std::mutex> lock(mLock);
+    aaudio_result_t result = AAUDIO_OK;
+    if (getRegisteredThread() != clientThreadId) {
+        ALOGE("%s(), wrong thread", __func__);
+        result = AAUDIO_ERROR_ILLEGAL_ARGUMENT;
+    } else {
+        setRegisteredThread(0);
+    }
+    return result;
+}
+
+void AAudioServiceStreamBase::setState(aaudio_stream_state_t state) {
+    // CLOSED is a final state.
+    if (mState != AAUDIO_STREAM_STATE_CLOSED) {
+        mState = state;
+    } else {
+        ALOGW_IF(mState != state, "%s(%d) when already CLOSED", __func__, state);
+    }
+}
+
 aaudio_result_t AAudioServiceStreamBase::sendServiceEvent(aaudio_service_event_t event,
                                                           double  dataDouble) {
     AAudioServiceMessage command;
@@ -511,6 +587,7 @@
  * used to communicate with the underlying HAL or Service.
  */
 aaudio_result_t AAudioServiceStreamBase::getDescription(AudioEndpointParcelable &parcelable) {
+    std::lock_guard<std::mutex> lock(mLock);
     {
         std::lock_guard<std::mutex> lock(mUpMessageQueueLock);
         if (mUpMessageQueue == nullptr) {
diff --git a/services/oboeservice/AAudioServiceStreamBase.h b/services/oboeservice/AAudioServiceStreamBase.h
index 79dd738..94cc980 100644
--- a/services/oboeservice/AAudioServiceStreamBase.h
+++ b/services/oboeservice/AAudioServiceStreamBase.h
@@ -77,7 +77,7 @@
     // because we had to wait until we generated the handle.
     void logOpen(aaudio_handle_t streamHandle);
 
-    virtual aaudio_result_t close();
+    aaudio_result_t close();
 
     /**
      * Start the flow of audio data.
@@ -85,7 +85,7 @@
      * This is not guaranteed to be synchronous but it currently is.
      * An AAUDIO_SERVICE_EVENT_STARTED will be sent to the client when complete.
      */
-    virtual aaudio_result_t start();
+    aaudio_result_t start();
 
     /**
      * Stop the flow of data so that start() can resume without loss of data.
@@ -93,7 +93,7 @@
      * This is not guaranteed to be synchronous but it currently is.
      * An AAUDIO_SERVICE_EVENT_PAUSED will be sent to the client when complete.
     */
-    virtual aaudio_result_t pause();
+    aaudio_result_t pause();
 
     /**
      * Stop the flow of data after the currently queued data has finished playing.
@@ -102,17 +102,14 @@
      * An AAUDIO_SERVICE_EVENT_STOPPED will be sent to the client when complete.
      *
      */
-    virtual aaudio_result_t stop();
-
-    aaudio_result_t stopTimestampThread();
+    aaudio_result_t stop();
 
     /**
      * Discard any data held by the underlying HAL or Service.
      *
      * An AAUDIO_SERVICE_EVENT_FLUSHED will be sent to the client when complete.
      */
-    virtual aaudio_result_t flush();
-
+    aaudio_result_t flush();
 
     virtual aaudio_result_t startClient(const android::AudioClient& client,
                                         const audio_attributes_t *attr __unused,
@@ -126,29 +123,19 @@
         return AAUDIO_ERROR_UNAVAILABLE;
     }
 
+    aaudio_result_t registerAudioThread(pid_t clientThreadId, int priority);
+
+    aaudio_result_t unregisterAudioThread(pid_t clientThreadId);
+
     bool isRunning() const {
         return mState == AAUDIO_STREAM_STATE_STARTED;
     }
 
-    // -------------------------------------------------------------------
-
-    /**
-     * Send a message to the client with an int64_t data value.
-     */
-    aaudio_result_t sendServiceEvent(aaudio_service_event_t event,
-                                     int64_t dataLong = 0);
-    /**
-     * Send a message to the client with an double data value.
-     */
-    aaudio_result_t sendServiceEvent(aaudio_service_event_t event,
-                                     double  dataDouble);
-
     /**
      * Fill in a parcelable description of stream.
      */
     aaudio_result_t getDescription(AudioEndpointParcelable &parcelable);
 
-
     void setRegisteredThread(pid_t pid) {
         mRegisteredClientThread = pid;
     }
@@ -262,9 +249,13 @@
     aaudio_result_t open(const aaudio::AAudioStreamRequest &request,
                          aaudio_sharing_mode_t sharingMode);
 
-    void setState(aaudio_stream_state_t state) {
-        mState = state;
-    }
+    // These must be called under mLock
+    virtual aaudio_result_t close_l();
+    virtual aaudio_result_t pause_l();
+    virtual aaudio_result_t stop_l();
+    void disconnect_l();
+
+    void setState(aaudio_stream_state_t state);
 
     /**
      * Device specific startup.
@@ -319,6 +310,19 @@
 
 private:
 
+    aaudio_result_t stopTimestampThread();
+
+    /**
+     * Send a message to the client with an int64_t data value.
+     */
+    aaudio_result_t sendServiceEvent(aaudio_service_event_t event,
+                                     int64_t dataLong = 0);
+    /**
+     * Send a message to the client with a double data value.
+     */
+    aaudio_result_t sendServiceEvent(aaudio_service_event_t event,
+                                     double dataDouble);
+
     /**
      * @return true if the queue is getting full.
      */
@@ -336,6 +340,10 @@
     // This indicate that a running stream should not be processed because of an error,
     // for example a full message queue. Note that this atomic is unrelated to mCloseNeeded.
     std::atomic<bool>       mSuspended{false};
+
+    // Locking order is important.
+    // Always acquire mLock before acquiring AAudioServiceEndpoint::mLockStreams
+    std::mutex              mLock; // Prevent start/stop/close etcetera from colliding
 };
 
 } /* namespace aaudio */
diff --git a/services/oboeservice/AAudioServiceStreamMMAP.cpp b/services/oboeservice/AAudioServiceStreamMMAP.cpp
index 639a0a8..54d7d06 100644
--- a/services/oboeservice/AAudioServiceStreamMMAP.cpp
+++ b/services/oboeservice/AAudioServiceStreamMMAP.cpp
@@ -92,11 +92,11 @@
 }
 
 // Stop the flow of data such that start() can resume with loss of data.
-aaudio_result_t AAudioServiceStreamMMAP::pause() {
+aaudio_result_t AAudioServiceStreamMMAP::pause_l() {
     if (!isRunning()) {
         return AAUDIO_OK;
     }
-    aaudio_result_t result = AAudioServiceStreamBase::pause();
+    aaudio_result_t result = AAudioServiceStreamBase::pause_l();
     // TODO put before base::pause()?
     if (!mInService) {
         (void) stopClient(mClientHandle);
@@ -104,11 +104,11 @@
     return result;
 }
 
-aaudio_result_t AAudioServiceStreamMMAP::stop() {
+aaudio_result_t AAudioServiceStreamMMAP::stop_l() {
     if (!isRunning()) {
         return AAUDIO_OK;
     }
-    aaudio_result_t result = AAudioServiceStreamBase::stop();
+    aaudio_result_t result = AAudioServiceStreamBase::stop_l();
     // TODO put before base::stop()?
     if (!mInService) {
         (void) stopClient(mClientHandle);
diff --git a/services/oboeservice/AAudioServiceStreamMMAP.h b/services/oboeservice/AAudioServiceStreamMMAP.h
index 9105469..5902613 100644
--- a/services/oboeservice/AAudioServiceStreamMMAP.h
+++ b/services/oboeservice/AAudioServiceStreamMMAP.h
@@ -52,16 +52,6 @@
 
     aaudio_result_t open(const aaudio::AAudioStreamRequest &request) override;
 
-    /**
-     * Stop the flow of data so that start() can resume without loss of data.
-     *
-     * This is not guaranteed to be synchronous but it currently is.
-     * An AAUDIO_SERVICE_EVENT_PAUSED will be sent to the client when complete.
-    */
-    aaudio_result_t pause() override;
-
-    aaudio_result_t stop() override;
-
     aaudio_result_t startClient(const android::AudioClient& client,
                                 const audio_attributes_t *attr,
                                 audio_port_handle_t *clientHandle) override;
@@ -72,6 +62,16 @@
 
 protected:
 
+    /**
+     * Stop the flow of data so that start() can resume without loss of data.
+     *
+     * This is not guaranteed to be synchronous but it currently is.
+     * An AAUDIO_SERVICE_EVENT_PAUSED will be sent to the client when complete.
+    */
+    aaudio_result_t pause_l() override;
+
+    aaudio_result_t stop_l() override;
+
     aaudio_result_t getAudioDataDescription(AudioEndpointParcelable &parcelable) override;
 
     aaudio_result_t getFreeRunningPosition(int64_t *positionFrames, int64_t *timeNanos) override;
diff --git a/services/oboeservice/AAudioServiceStreamShared.cpp b/services/oboeservice/AAudioServiceStreamShared.cpp
index 2ca847a..01b1c2e 100644
--- a/services/oboeservice/AAudioServiceStreamShared.cpp
+++ b/services/oboeservice/AAudioServiceStreamShared.cpp
@@ -203,9 +203,8 @@
     return result;
 }
 
-
-aaudio_result_t AAudioServiceStreamShared::close()  {
-    aaudio_result_t result = AAudioServiceStreamBase::close();
+aaudio_result_t AAudioServiceStreamShared::close_l()  {
+    aaudio_result_t result = AAudioServiceStreamBase::close_l();
 
     {
         std::lock_guard<std::mutex> lock(mAudioDataQueueLock);
diff --git a/services/oboeservice/AAudioServiceStreamShared.h b/services/oboeservice/AAudioServiceStreamShared.h
index 61769b5..abcb782 100644
--- a/services/oboeservice/AAudioServiceStreamShared.h
+++ b/services/oboeservice/AAudioServiceStreamShared.h
@@ -52,7 +52,7 @@
 
     aaudio_result_t open(const aaudio::AAudioStreamRequest &request) override;
 
-    aaudio_result_t close() override;
+    aaudio_result_t close_l() override;
 
     /**
      * This must be locked when calling getAudioDataFifoBuffer_l() and while
diff --git a/services/oboeservice/AAudioStreamTracker.cpp b/services/oboeservice/AAudioStreamTracker.cpp
index 3328159..8e66b94 100644
--- a/services/oboeservice/AAudioStreamTracker.cpp
+++ b/services/oboeservice/AAudioStreamTracker.cpp
@@ -30,32 +30,20 @@
 using namespace android;
 using namespace aaudio;
 
-sp<AAudioServiceStreamBase> AAudioStreamTracker::decrementAndRemoveStreamByHandle(
+int32_t AAudioStreamTracker::removeStreamByHandle(
         aaudio_handle_t streamHandle) {
     std::lock_guard<std::mutex> lock(mHandleLock);
-    sp<AAudioServiceStreamBase> serviceStream;
-    auto it = mStreamsByHandle.find(streamHandle);
-    if (it != mStreamsByHandle.end()) {
-        sp<AAudioServiceStreamBase> tempStream = it->second;
-        // Does the caller need to close the stream?
-        // The reference count should never be negative.
-        // But it is safer to check for <= 0 than == 0.
-        if ((tempStream->decrementServiceReferenceCount_l() <= 0) && tempStream->isCloseNeeded()) {
-            serviceStream = tempStream; // Only return stream if ready to be closed.
-            mStreamsByHandle.erase(it);
-        }
-    }
-    return serviceStream;
+    auto count = mStreamsByHandle.erase(streamHandle);
+    return static_cast<int32_t>(count);
 }
 
-sp<AAudioServiceStreamBase> AAudioStreamTracker::getStreamByHandleAndIncrement(
+sp<AAudioServiceStreamBase> AAudioStreamTracker::getStreamByHandle(
         aaudio_handle_t streamHandle) {
     std::lock_guard<std::mutex> lock(mHandleLock);
     sp<AAudioServiceStreamBase> serviceStream;
     auto it = mStreamsByHandle.find(streamHandle);
     if (it != mStreamsByHandle.end()) {
         serviceStream = it->second;
-        serviceStream->incrementServiceReferenceCount_l();
     }
     return serviceStream;
 }
@@ -63,7 +51,7 @@
 // The port handle is only available when the stream is started.
 // So we have to iterate over all the streams.
 // Luckily this rarely happens.
-sp<AAudioServiceStreamBase> AAudioStreamTracker::findStreamByPortHandleAndIncrement(
+sp<AAudioServiceStreamBase> AAudioStreamTracker::findStreamByPortHandle(
         audio_port_handle_t portHandle) {
     std::lock_guard<std::mutex> lock(mHandleLock);
     sp<AAudioServiceStreamBase> serviceStream;
@@ -72,7 +60,6 @@
         auto candidate = it->second;
         if (candidate->getPortHandle() == portHandle) {
             serviceStream = candidate;
-            serviceStream->incrementServiceReferenceCount_l();
             break;
         }
         it++;
diff --git a/services/oboeservice/AAudioStreamTracker.h b/services/oboeservice/AAudioStreamTracker.h
index 57ec426..d1301a2 100644
--- a/services/oboeservice/AAudioStreamTracker.h
+++ b/services/oboeservice/AAudioStreamTracker.h
@@ -32,25 +32,20 @@
 
 public:
     /**
-     * Find the stream associated with the handle.
-     * Decrement its reference counter. If zero and the stream needs
-     * to be closed then remove the stream and return a pointer to the stream.
-     * Otherwise return null if it does not need to be closed.
+     * Remove any streams with the matching handle.
      *
      * @param streamHandle
-     * @return strong pointer to the stream if it needs to be closed, or nullptr
+     * @return number of streams removed
      */
-    android::sp<AAudioServiceStreamBase> decrementAndRemoveStreamByHandle(
-            aaudio_handle_t streamHandle);
+    int32_t removeStreamByHandle(aaudio_handle_t streamHandle);
 
     /**
      * Look up a stream based on the handle.
-     * Increment its service reference count if found.
      *
      * @param streamHandle
      * @return strong pointer to the stream if found, or nullptr
      */
-    android::sp<aaudio::AAudioServiceStreamBase> getStreamByHandleAndIncrement(
+    android::sp<aaudio::AAudioServiceStreamBase> getStreamByHandle(
             aaudio_handle_t streamHandle);
 
     /**
@@ -60,7 +55,7 @@
      * @param portHandle
      * @return strong pointer to the stream if found, or nullptr
      */
-    android::sp<aaudio::AAudioServiceStreamBase> findStreamByPortHandleAndIncrement(
+    android::sp<aaudio::AAudioServiceStreamBase> findStreamByPortHandle(
             audio_port_handle_t portHandle);
 
     /**