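mediametrics: implement AStatsManager_PullAtomCallback

Register a statsd pull-atom callback for MEDIA_DRM_ACTIVITY_INFO and queue
"mediadrm" items per key until statsd pulls them.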

Bug: 141714243
Test: mediametrics_tests
Change-Id: Ie1fad29f40626bb6d363ba04c1669ba7753d25f0
diff --git a/services/mediametrics/Android.bp b/services/mediametrics/Android.bp
index f7d1f6a..443d339 100644
--- a/services/mediametrics/Android.bp
+++ b/services/mediametrics/Android.bp
@@ -165,10 +165,18 @@
         "libmediautils",
         "libmemunreachable",
         "libprotobuf-cpp-lite",
+        "libstagefright_foundation",
         "libstatslog",
+        "libstatspull",
+        "libstatssocket",
         "libutils",
     ],
 
+    export_shared_lib_headers: [
+        "libstatspull",
+        "libstatssocket",
+    ],
+
     static_libs: [
         "libplatformprotos",
     ],
diff --git a/services/mediametrics/MediaMetricsService.cpp b/services/mediametrics/MediaMetricsService.cpp
index 1a0f6a4..bfc722e 100644
--- a/services/mediametrics/MediaMetricsService.cpp
+++ b/services/mediametrics/MediaMetricsService.cpp
@@ -19,6 +19,7 @@
 #include <utils/Log.h>
 
 #include "MediaMetricsService.h"
+#include "iface_statsd.h"
 
 #include <pwd.h> //getpwuid
 
@@ -30,6 +31,9 @@
 #include <mediautils/MemoryLeakTrackUtil.h>
 #include <memunreachable/memunreachable.h>
 #include <private/android_filesystem_config.h> // UID
+#include <statslog.h>
+
+#include <set>
 
 namespace android {
 
@@ -200,7 +204,6 @@
 
     (void)mAudioAnalytics.submit(sitem, isTrusted);
 
-    extern bool dump2Statsd(const std::shared_ptr<const mediametrics::Item>& item);
     (void)dump2Statsd(sitem);  // failure should be logged in function.
     saveItem(sitem);
     return NO_ERROR;
@@ -440,6 +443,10 @@
     std::lock_guard _l(mLock);
     // we assume the items are roughly in time order.
     mItems.emplace_back(item);
+    if (isPullable(item->getKey())) {
+        registerStatsdCallbacksIfNeeded();
+        mPullableItems[item->getKey()].emplace_back(item);
+    }
     ++mItemsFinalized;
     if (expirations(item)
             && (!mExpireFuture.valid()
@@ -486,4 +493,57 @@
     return false;
 }
 
+void MediaMetricsService::registerStatsdCallbacksIfNeeded()
+{
+    if (mStatsdRegistered.test_and_set()) {
+        return;
+    }
+    auto tag = android::util::MEDIA_DRM_ACTIVITY_INFO;
+    auto cb = MediaMetricsService::pullAtomCallback;
+    AStatsManager_setPullAtomCallback(tag, /* metadata */ nullptr, cb, this);
+}
+
+/* static */
+bool MediaMetricsService::isPullable(const std::string &key)
+{
+    static const std::set<std::string> pullableKeys{
+        "mediadrm",
+    };
+    return pullableKeys.count(key);
+}
+
+/* static */
+std::string MediaMetricsService::atomTagToKey(int32_t atomTag)
+{
+    switch (atomTag) {
+    case android::util::MEDIA_DRM_ACTIVITY_INFO:
+        return "mediadrm";
+    }
+    return {};
+}
+
+/* static: statsd trampoline; |cookie| is the service given at registration */
+AStatsManager_PullAtomCallbackReturn MediaMetricsService::pullAtomCallback(
+        int32_t atomTag, AStatsEventList* data, void* cookie)
+{
+    auto* const svc = static_cast<MediaMetricsService*>(cookie);
+    return svc->pullItems(atomTag, data);
+}
+
+AStatsManager_PullAtomCallbackReturn MediaMetricsService::pullItems(
+        int32_t atomTag, AStatsEventList* data)
+{
+    const std::string key(atomTagToKey(atomTag));
+    if (key.empty()) {
+        return AStatsManager_PULL_SKIP;
+    }
+    std::lock_guard _l(mLock);
+    for (auto &item : mPullableItems[key]) {
+        if (const auto sitem = item.lock()) {
+            dump2Statsd(sitem, data);
+        }
+    }
+    mPullableItems[key].clear();
+    return AStatsManager_PULL_SUCCESS;
+}
 } // namespace android
diff --git a/services/mediametrics/MediaMetricsService.h b/services/mediametrics/MediaMetricsService.h
index bcae397..8bc8019 100644
--- a/services/mediametrics/MediaMetricsService.h
+++ b/services/mediametrics/MediaMetricsService.h
@@ -26,6 +26,7 @@
 #include <android-base/thread_annotations.h>
 #include <android/media/BnMediaMetricsService.h>
 #include <mediautils/ServiceUtilities.h>
+#include <stats_pull_atom_callback.h>
 #include <utils/String8.h>
 
 #include "AudioAnalytics.h"
@@ -102,6 +103,15 @@
     void dumpQueue(String8 &result, int64_t sinceNs, const char* prefix) REQUIRES(mLock);
     void dumpHeaders(String8 &result, int64_t sinceNs, const char* prefix) REQUIRES(mLock);
 
+    // support statsd pulled atoms
+    static bool isPullable(const std::string &key);
+    static std::string atomTagToKey(int32_t atomTag);
+    static AStatsManager_PullAtomCallbackReturn pullAtomCallback(
+            int32_t atomTag, AStatsEventList* data, void* cookie);
+    AStatsManager_PullAtomCallbackReturn pullItems(int32_t atomTag, AStatsEventList* data);
+    void registerStatsdCallbacksIfNeeded();
+    std::atomic_flag mStatsdRegistered = ATOMIC_FLAG_INIT;
+
     // The following variables accessed without mLock
 
     // limit how many records we'll retain
@@ -130,6 +140,12 @@
     // TODO: Make separate class, use segmented queue, write lock only end.
     // Note: Another analytics module might have ownership of an item longer than the log.
     std::deque<std::shared_ptr<const mediametrics::Item>> mItems GUARDED_BY(mLock);
+
+    // Queues per item key, pending to be pulled by statsd.
+    // Use weak_ptr such that a pullable item can still expire.
+    using ItemKey = std::string;
+    using WeakItemQueue = std::deque<std::weak_ptr<const mediametrics::Item>>;
+    std::unordered_map<ItemKey, WeakItemQueue> mPullableItems GUARDED_BY(mLock);
 };
 
 } // namespace android
diff --git a/services/mediametrics/fuzzer/Android.bp b/services/mediametrics/fuzzer/Android.bp
index d75ded2..b03e518 100644
--- a/services/mediametrics/fuzzer/Android.bp
+++ b/services/mediametrics/fuzzer/Android.bp
@@ -50,7 +50,10 @@
         "libmemunreachable",
         "libprotobuf-cpp-lite",
         "libstagefright",
+        "libstagefright_foundation",
         "libstatslog",
+        "libstatspull",
+        "libstatssocket",
         "libutils",
         "mediametricsservice-aidl-cpp",
     ],
diff --git a/services/mediametrics/iface_statsd.cpp b/services/mediametrics/iface_statsd.cpp
index 0ce2f4a..b7c5296 100644
--- a/services/mediametrics/iface_statsd.cpp
+++ b/services/mediametrics/iface_statsd.cpp
@@ -27,7 +27,10 @@
 #include <pthread.h>
 #include <unistd.h>
 
+#include <map>
 #include <memory>
+#include <string>
+#include <vector>
 #include <string.h>
 #include <pwd.h>
 
@@ -47,30 +50,13 @@
 
 bool enabled_statsd = true;
 
-struct statsd_hooks {
-    const char *key;
-    bool (*handler)(const mediametrics::Item *);
-};
+using statsd_pusher = bool (*)(const mediametrics::Item *);
+using statsd_puller = bool (*)(const mediametrics::Item *, AStatsEventList *);
 
-// keep this sorted, so we can do binary searches
-static constexpr struct statsd_hooks statsd_handlers[] =
-{
-    { "audiopolicy", statsd_audiopolicy },
-    { "audiorecord", statsd_audiorecord },
-    { "audiothread", statsd_audiothread },
-    { "audiotrack", statsd_audiotrack },
-    { "codec", statsd_codec},
-    { "drmmanager", statsd_drmmanager },
-    { "extractor", statsd_extractor },
-    { "mediadrm", statsd_mediadrm },
-    { "mediaparser", statsd_mediaparser },
-    { "nuplayer", statsd_nuplayer },
-    { "nuplayer2", statsd_nuplayer },
-    { "recorder", statsd_recorder },
-};
-
-// give me a record, i'll look at the type and upload appropriately
-bool dump2Statsd(const std::shared_ptr<const mediametrics::Item>& item) {
+namespace {
+template<typename Handler, typename... Args>
+bool dump2StatsdInternal(const std::map<std::string, Handler>& handlers,
+        const std::shared_ptr<const mediametrics::Item>& item, Args... args) {
     if (item == nullptr) return false;
 
     // get the key
@@ -81,12 +67,39 @@
         return false;
     }
 
-    for (const auto &statsd_handler : statsd_handlers) {
-        if (key == statsd_handler.key) {
-            return statsd_handler.handler(item.get());
-        }
+    if (handlers.count(key)) {
+        return (handlers.at(key))(item.get(), args...);
     }
     return false;
 }
+} // namespace
+
+// give me a record, I'll look at the type and upload appropriately
+bool dump2Statsd(const std::shared_ptr<const mediametrics::Item>& item) {
+    static const std::map<std::string, statsd_pusher> statsd_pushers =
+    {
+        { "audiopolicy", statsd_audiopolicy },
+        { "audiorecord", statsd_audiorecord },
+        { "audiothread", statsd_audiothread },
+        { "audiotrack", statsd_audiotrack },
+        { "codec", statsd_codec},
+        { "drmmanager", statsd_drmmanager },
+        { "extractor", statsd_extractor },
+        { "mediadrm", statsd_mediadrm },
+        { "mediaparser", statsd_mediaparser },
+        { "nuplayer", statsd_nuplayer },
+        { "nuplayer2", statsd_nuplayer },
+        { "recorder", statsd_recorder },
+    };
+    return dump2StatsdInternal(statsd_pushers, item);
+}
+
+bool dump2Statsd(const std::shared_ptr<const mediametrics::Item>& item, AStatsEventList* out) {
+    static const std::map<std::string, statsd_puller> statsd_pullers =
+    {
+        { "mediadrm", statsd_mediadrm_puller },
+    };
+    return dump2StatsdInternal(statsd_pullers, item, out);
+}
 
 } // namespace android
diff --git a/services/mediametrics/iface_statsd.h b/services/mediametrics/iface_statsd.h
index 9b49556..1b6c79a 100644
--- a/services/mediametrics/iface_statsd.h
+++ b/services/mediametrics/iface_statsd.h
@@ -14,7 +14,13 @@
  * limitations under the License.
  */
 
+#include <memory>
+#include <stats_event.h>
+
 namespace android {
+namespace mediametrics {
+class Item;
+}
 
 extern bool enabled_statsd;
 
@@ -30,7 +36,12 @@
 extern bool statsd_recorder(const mediametrics::Item *);
 
 extern bool statsd_mediadrm(const mediametrics::Item *);
-extern bool statsd_widevineCDM(const mediametrics::Item *);
 extern bool statsd_drmmanager(const mediametrics::Item *);
 
+// component specific pullers
+extern bool statsd_mediadrm_puller(const mediametrics::Item *, AStatsEventList *);
+
+bool dump2Statsd(const std::shared_ptr<const mediametrics::Item>& item);
+bool dump2Statsd(const std::shared_ptr<const mediametrics::Item>& item, AStatsEventList* out);
+
 } // namespace android
diff --git a/services/mediametrics/statsd_drm.cpp b/services/mediametrics/statsd_drm.cpp
index ffc9707..071c549 100644
--- a/services/mediametrics/statsd_drm.cpp
+++ b/services/mediametrics/statsd_drm.cpp
@@ -17,6 +17,7 @@
 //#define LOG_NDEBUG 0
 #define LOG_TAG "statsd_drm"
 #include <utils/Log.h>
+#include <media/stagefright/foundation/base64.h>
 
 #include <stdint.h>
 #include <inttypes.h>
@@ -37,6 +38,7 @@
 
 #include <array>
 #include <string>
+#include <vector>
 
 namespace android {
 
@@ -118,4 +120,65 @@
     return true;
 }
 
+namespace {
+std::vector<uint8_t> base64DecodeNoPad(std::string& str) {
+    if (str.empty()) {
+        return {};
+    }
+
+    switch (str.length() % 4) {
+    case 3: str += "="; break;
+    case 2: str += "=="; break;
+    case 1: str += "==="; break;
+    case 0: /* unchanged */ break;
+    }
+
+    std::vector<uint8_t> buf(str.length() / 4 * 3, 0);
+    size_t size = buf.size();
+    if (decodeBase64(buf.data(), &size, str.c_str()) && size <= buf.size()) {
+        buf.erase(buf.begin() + size, buf.end());
+        return buf;
+    }
+    return {};
+}
+} // namespace
+
+// |out| and its contents are memory-managed by statsd.
+bool statsd_mediadrm_puller(const mediametrics::Item* item, AStatsEventList* out)
+{
+    if (item == nullptr) {
+        return false;
+    }
+
+    if (!enabled_statsd) {
+        ALOGV("NOT pulling: mediadrm activity");
+        return true;
+    }
+
+    std::string serialized_metrics;
+    (void) item->getString("serialized_metrics", &serialized_metrics);
+    const auto framework_raw(base64DecodeNoPad(serialized_metrics));
+
+    std::string plugin_metrics;
+    (void) item->getString("plugin_metrics", &plugin_metrics);
+    const auto plugin_raw(base64DecodeNoPad(plugin_metrics));
+
+    std::string vendor;
+    (void) item->getString("vendor", &vendor);
+    std::string description;
+    (void) item->getString("description", &description);
+
+    // Memory for |event| is internally managed by statsd.
+    AStatsEvent* event = AStatsEventList_addStatsEvent(out);
+    AStatsEvent_setAtomId(event, android::util::MEDIA_DRM_ACTIVITY_INFO);
+    AStatsEvent_writeString(event, item->getPkgName().c_str());
+    AStatsEvent_writeInt64(event, item->getPkgVersionCode());
+    AStatsEvent_writeString(event, vendor.c_str());
+    AStatsEvent_writeString(event, description.c_str());
+    AStatsEvent_writeByteArray(event, framework_raw.data(), framework_raw.size());
+    AStatsEvent_writeByteArray(event, plugin_raw.data(), plugin_raw.size());
+    AStatsEvent_build(event);
+    return true;
+}
+
 } // namespace android