Implement log merging.

Still missing:
 * Run the merge periodically in a separate thread

Bug: 35329293
Test: manual
Change-Id: Ie8802fb7972e20d8dec493376ea253bb782f3a46
diff --git a/media/libnbaio/NBLog.cpp b/media/libnbaio/NBLog.cpp
index 03df94e..49caeb8 100644
--- a/media/libnbaio/NBLog.cpp
+++ b/media/libnbaio/NBLog.cpp
@@ -29,6 +29,8 @@
 #include <utils/Log.h>
 #include <utils/String8.h>
 
+#include <queue>
+
 namespace android {
 
 int NBLog::Entry::readAt(size_t offset) const
@@ -48,6 +50,75 @@
 
 // ---------------------------------------------------------------------------
 
+NBLog::FormatEntry::FormatEntry(const uint8_t *entry) : mEntry(entry) { // keeps the pointer only
+    ALOGW_IF(entry[0] != EVENT_START_FMT, // warns only; the object is constructed regardless
+             "Created format entry with invalid event type %d",
+             entry[0]);
+}
+
+const char *NBLog::FormatEntry::formatString() const {
+    return (const char*) mEntry + 2; // payload follows the type byte and the length byte
+}
+
+size_t NBLog::FormatEntry::formatStringLength() const {
+    return mEntry[1]; // length byte of the start entry, i.e. the size of formatString()
+}
+
+const uint8_t *NBLog::FormatEntry::args() const {
+    const uint8_t *ptr = mEntry + mEntry[1] + NBLog::Entry::kOverhead; // entry after START_FMT
+    if (ptr[0] != EVENT_TIMESTAMP) { // anything else here is taken to be the author entry
+        ptr += ptr[1] + NBLog::Entry::kOverhead;
+    }
+    return ptr + ptr[1] + NBLog::Entry::kOverhead; // step over the timestamp entry
+}
+
+timespec NBLog::FormatEntry::timestamp() const {
+    const uint8_t *ptr = mEntry + mEntry[1] + NBLog::Entry::kOverhead; // entry after START_FMT
+    // skip the author entry if present, so that ptr lands on the timestamp entry
+    ptr += (ptr[0] != EVENT_TIMESTAMP) ? ptr[1] + NBLog::Entry::kOverhead : 0;
+    struct timespec ts; // entries are byte-packed: copy the payload, do not cast the pointer
+    memcpy(&ts, &ptr[2], sizeof(ts));
+    return ts;
+}
+
+pid_t NBLog::FormatEntry::author() const {
+    const uint8_t *authorEntry = mEntry + mEntry[1] + NBLog::Entry::kOverhead;
+    pid_t author = -1; // stays -1 when the entry has no author
+    if (authorEntry[0] == EVENT_AUTHOR) {
+        memcpy(&author, &authorEntry[2], sizeof(author)); // byte-packed: copy, do not cast
+    }
+    return author;
+}
+
+size_t NBLog::FormatEntry::copyTo(std::unique_ptr<audio_utils_fifo_writer> &dst, int author) const {
+    // copy fmt start entry; entryOffset counts the bytes consumed from this (source) entry
+    size_t entryOffset = copyEntry(dst, mEntry);
+    // insert author entry: type byte, length byte, payload, trailing length byte
+    size_t authorEntrySize = NBLog::Entry::kOverhead + sizeof(author);
+    uint8_t authorEntry[authorEntrySize];
+    authorEntry[0] = EVENT_AUTHOR;
+    authorEntry[1] = authorEntry[authorEntrySize - 1] = sizeof(author);
+    memcpy(&authorEntry[2], &author, sizeof(author)); // byte buffer: copy, do not cast to int*
+    dst->write(authorEntry, authorEntrySize);
+    // copy rest of entries, up to and including the end-of-format entry
+    Event lastEvent = EVENT_TIMESTAMP;
+    while (lastEvent != EVENT_END_FMT) {
+        lastEvent = (Event) mEntry[entryOffset];
+        entryOffset += copyEntry(dst, mEntry + entryOffset);
+    }
+    return entryOffset; // source bytes consumed; the inserted author entry is not counted
+}
+
+
+size_t NBLog::FormatEntry::copyEntry(std::unique_ptr<audio_utils_fifo_writer> &dst,
+                                     const uint8_t *src) const {
+    size_t length = src[1] + NBLog::Entry::kOverhead; // payload length plus framing bytes
+    dst->write(src, length); // the result of write() is not checked
+    return length; // size of the source entry, so the caller can step to the next one
+}
+
+// ---------------------------------------------------------------------------
+
 #if 0   // FIXME see note in NBLog.h
 NBLog::Timeline::Timeline(size_t size, void *shared)
     : mSize(roundup(size)), mOwn(shared == NULL),
@@ -463,43 +534,44 @@
     delete mFifo;
 }
 
-void NBLog::Reader::dump(int fd, size_t indent)
+std::unique_ptr<NBLog::Reader::Snapshot> NBLog::Reader::getSnapshot()
 {
     if (mFifoReader == NULL) {
-        return;
+        return std::unique_ptr<NBLog::Reader::Snapshot>(new Snapshot());
     }
     // make a copy to avoid race condition with writer
     size_t capacity = mFifo->capacity();
 
-    // TODO Stack-based allocation of large objects may fail.
-    //      Currently the log buffers are a page or two, which should be safe.
-    //      But if the log buffers ever get a lot larger,
-    //      then change this to allocate from heap when necessary.
-    static size_t kReasonableStackObjectSize = 32768;
-    ALOGW_IF(capacity > kReasonableStackObjectSize, "Stack-based allocation of object may fail");
-    uint8_t copy[capacity];
+    std::unique_ptr<Snapshot> snapshot(new Snapshot(capacity));
 
-    size_t lost;
-    ssize_t actual = mFifoReader->read(copy, capacity, NULL /*timeout*/, &lost);
+    ssize_t actual = mFifoReader->read((void*) snapshot->mData, capacity, NULL /*timeout*/,
+                                       &(snapshot->mLost));
     ALOG_ASSERT(actual <= capacity);
-    size_t avail = actual > 0 ? (size_t) actual : 0;
-    size_t i = avail;
+    snapshot->mAvail = actual > 0 ? (size_t) actual : 0;
+    return snapshot;
+}
+
+void NBLog::Reader::dump(int fd, size_t indent, NBLog::Reader::Snapshot &snapshot)
+{
+    size_t i = snapshot.available();
+    const uint8_t *snapData = snapshot.data();
     Event event;
     size_t length;
     struct timespec ts;
     time_t maxSec = -1;
     while (i >= Entry::kOverhead) {
-        length = copy[i - 1];
-        if (length + Entry::kOverhead > i || copy[i - length - 2] != length) {
+        length = snapData[i - 1];
+        if (length + Entry::kOverhead > i || snapData[i - length - 2] != length) {
+
             break;
         }
-        event = (Event) copy[i - length - Entry::kOverhead];
+        event = (Event) snapData[i - length - Entry::kOverhead];
         if (event == EVENT_TIMESTAMP) {
             if (length != sizeof(struct timespec)) {
                 // corrupt
                 break;
             }
-            memcpy(&ts, &copy[i - length - 1], sizeof(struct timespec));
+            memcpy(&ts, &snapData[i - length - 1], sizeof(struct timespec));
             if (ts.tv_sec > maxSec) {
                 maxSec = ts.tv_sec;
             }
@@ -509,7 +581,7 @@
     mFd = fd;
     mIndent = indent;
     String8 timestamp, body;
-    lost += i;
+    size_t lost = snapshot.lost() + i;
     if (lost > 0) {
         body.appendFormat("warning: lost %zu bytes worth of events", lost);
         // TODO timestamp empty here, only other choice to wait for the first timestamp event in the
@@ -525,10 +597,10 @@
         timestamp.appendFormat("[%*s]", (int) width + 4, "");
     }
     bool deferredTimestamp = false;
-    while (i < avail) {
-        event = (Event) copy[i];
-        length = copy[i + 1];
-        const void *data = &copy[i + 2];
+    while (i < snapshot.available()) {
+        event = (Event) snapData[i];
+        length = snapData[i + 1];
+        const void *data = &snapData[i + 2];
         size_t advance = length + Entry::kOverhead;
         switch (event) {
         case EVENT_STRING:
@@ -544,11 +616,11 @@
             size_t j = i;
             for (;;) {
                 j += sizeof(struct timespec) + 3 /*Entry::kOverhead?*/;
-                if (j >= avail || (Event) copy[j] != EVENT_TIMESTAMP) {
+                if (j >= snapshot.available() || (Event) snapData[j] != EVENT_TIMESTAMP) {
                     break;
                 }
                 struct timespec tsNext;
-                memcpy(&tsNext, &copy[j + 2], sizeof(struct timespec));
+                memcpy(&tsNext, &snapData[j + 2], sizeof(struct timespec));
                 if (tsNext.tv_sec != ts.tv_sec) {
                     break;
                 }
@@ -594,8 +666,7 @@
             appendPID(&body, data, length);
             break;
         case EVENT_START_FMT:
-            advance += handleFormat((const char*) &copy[i + 2], length,
-                                    &copy[i + Entry::kOverhead + length], &timestamp, &body);
+            advance += handleFormat(FormatEntry(snapData + i), &timestamp, &body);
             break;
         case EVENT_END_FMT:
             body.appendFormat("warning: got to end format event");
@@ -617,6 +688,13 @@
     }
 }
 
+void NBLog::Reader::dump(int fd, size_t indent)
+{
+    // take a fresh snapshot (empty, never null, when there is no fifo reader) and dump it
+    std::unique_ptr<Snapshot> snap = getSnapshot();
+    dump(fd, indent, *snap);
+}
+
 void NBLog::Reader::dumpLine(const String8 &timestamp, String8 &body)
 {
     if (mFd >= 0) {
@@ -656,17 +734,23 @@
     body->appendFormat("<PID: %d, name: %.*s>", id, (int) (length - sizeof(pid_t)), name);
 }
 
-int NBLog::handleFormat(const char *fmt, size_t fmt_length, const uint8_t *data,
-                        String8 *timestamp, String8 *body) {
-    if (data[0] != EVENT_TIMESTAMP) {
-        ALOGW("NBLog Reader Expected timestamp event %d, got %d", EVENT_TIMESTAMP, data[0]);
-    }
-    struct timespec ts;
-    memcpy(&ts, &data[2], sizeof(ts));
+int NBLog::Reader::handleFormat(const FormatEntry &fmtEntry, String8 *timestamp, String8 *body) {
+    // log timestamp
+    struct timespec ts = fmtEntry.timestamp();
     timestamp->clear();
     timestamp->appendFormat("[%d.%03d]", (int) ts.tv_sec,
                     (int) (ts.tv_nsec / 1000000));
-    size_t data_offset = Entry::kOverhead + sizeof ts;
+    size_t fullLength = NBLog::Entry::kOverhead + sizeof(ts);
+
+    // log author (if present)
+    fullLength += handleAuthor(fmtEntry, body);
+
+    // log string
+    const uint8_t *args = fmtEntry.args();
+    size_t args_offset = 0;
+
+    const char* fmt = fmtEntry.formatString();
+    size_t fmt_length = fmtEntry.formatStringLength();
 
     for (size_t fmt_offset = 0; fmt_offset < fmt_length; ++fmt_offset) {
         if (fmt[fmt_offset] != '%') {
@@ -681,20 +765,20 @@
             continue;
         }
 
-        NBLog::Event event = (NBLog::Event) data[data_offset];
-        size_t length = data[data_offset + 1];
+        NBLog::Event event = (NBLog::Event) args[args_offset];
+        size_t length = args[args_offset + 1];
 
         // TODO check length for event type is correct
-        if(length != data[data_offset + length + 2]) {
+        if(length != args[args_offset + length + 2]) {
             ALOGW("NBLog Reader received different lengths %zu and %d for event %d", length,
-                  data[data_offset + length + 2], event);
+                  args[args_offset + length + 2], event);
             body->append("<invalid entry>");
             ++fmt_offset;
             continue;
         }
 
         // TODO: implement more complex formatting such as %.3f
-        void * datum = (void*) &data[data_offset + 2]; // pointer to the current event data
+        void * datum = (void*) &args[args_offset + 2]; // pointer to the current event args
         switch(fmt[fmt_offset])
         {
         case 's': // string
@@ -731,10 +815,98 @@
             ALOGW("NBLog Reader encountered unknown character %c", fmt[fmt_offset]);
         }
 
-        data_offset += length + Entry::kOverhead;
+        args_offset += length + Entry::kOverhead;
 
     }
-    return data_offset + Entry::kOverhead; // data offset + size of END_FMT event
+    fullLength += args_offset + Entry::kOverhead;
+    return fullLength;
+}
+
+// ---------------------------------------------------------------------------
+
+NBLog::Merger::Merger(const void *shared, size_t size):
+      mBuffer(NULL),
+      mShared((Shared *) shared), // note: this cast drops the const qualifier of shared
+      mFifo(mShared != NULL ?
+        new audio_utils_fifo(size, sizeof(uint8_t),
+            mShared->mBuffer, mShared->mRear, NULL /*throttlesFront*/) : NULL),
+      mFifoWriter(mFifo != NULL ? new audio_utils_fifo_writer(*mFifo) : NULL)
+      {} // with shared == NULL neither the fifo nor the writer is created
+
+void NBLog::Merger::addReader(const NBLog::NamedReader &reader) {
+    mNamedReaders.push_back(reader); // stores a copy; its index is the author id merge() writes
+}
+
+// items placed in priority queue during merge
+// composed by a timestamp and the index of the snapshot where the timestamp came from
+struct MergeItem
+{
+    struct timespec ts; // timestamp read from the entry that merge() will copy next
+    int index;          // position of the originating snapshot in merge()'s snapshot vector
+    MergeItem(struct timespec ts, int index): ts(ts), index(index) {}
+};
+
+// operators needed for priority queue in merge
+bool operator>(const struct timespec &t1, const struct timespec &t2) { // true if t1 is later
+    return t1.tv_sec > t2.tv_sec || (t1.tv_sec == t2.tv_sec && t1.tv_nsec > t2.tv_nsec);
+}
+
+bool operator>(const struct MergeItem &i1, const struct MergeItem &i2) {
+    return i1.ts > i2.ts || // later timestamp compares greater; equal timestamps tie on index
+        (i1.ts.tv_sec == i2.ts.tv_sec && i1.ts.tv_nsec == i2.ts.tv_nsec && i1.index > i2.index);
+}
+
+// Merge registered readers, sorted by timestamp
+void NBLog::Merger::merge() {
+    // without a writer (no shared memory given at construction) there is nowhere to merge to
+    if (!mFifoWriter) { return; }
+    int nLogs = mNamedReaders.size();
+    std::vector<std::unique_ptr<NBLog::Reader::Snapshot>> snapshots(nLogs);
+    for (int i = 0; i < nLogs; ++i) {
+        snapshots[i] = mNamedReaders[i].reader()->getSnapshot();
+    }
+    // offsets[i] is the number of bytes of snapshot i that have already been merged
+    std::vector<size_t> offsets(nLogs, 0);
+    // TODO custom heap implementation could allow to update top, improving performance
+    // for bursty buffers
+    std::priority_queue<MergeItem, std::vector<MergeItem>, std::greater<MergeItem>> timestamps;
+    for (int i = 0; i < nLogs; ++i) {
+        if (snapshots[i]->available() > 0) {
+            // NOTE(review): assumes every snapshot starts on a format entry boundary
+            timestamps.emplace(FormatEntry(snapshots[i]->data()).timestamp(), i);
+        }
+    }
+
+    while (!timestamps.empty()) {
+        // std::greater makes this a min-heap: the top is the earliest pending timestamp
+        int index = timestamps.top().index;
+        timestamps.pop();
+        // copy it to the log, tagged with the index of the reader it came from
+        size_t length = FormatEntry(snapshots[index]->data() + offsets[index]).copyTo(
+            mFifoWriter, index);
+        // update data structures
+        offsets[index] += length;
+        ALOGW_IF(offsets[index] > snapshots[index]->available(), "Overflown snapshot capacity");
+        // '<' rather than '!=': never parse past the end if an entry overran the snapshot
+        if (offsets[index] < snapshots[index]->available()) {
+            timespec ts = FormatEntry(snapshots[index]->data() + offsets[index]).timestamp();
+            timestamps.emplace(ts, index);
+        }
+    }
+}
+
+const std::vector<NBLog::NamedReader> *NBLog::Merger::getNamedReaders() const {
+    return &mNamedReaders; // non-owning pointer to a member: valid only while this Merger lives
+}
+
+NBLog::MergeReader::MergeReader(const void *shared, size_t size, Merger &merger)
+    : Reader(shared, size), mNamedReaders(merger.getNamedReaders()) {} // merger must outlive this
+
+size_t NBLog::MergeReader::handleAuthor(const NBLog::FormatEntry &fmtEntry, String8 *body) {
+    const int author = fmtEntry.author(); // -1 when the entry carries no author entry
+    const bool known = author >= 0 && (size_t) author < mNamedReaders->size(); // bounds check
+    if (known) { body->appendFormat("%s: ", (*mNamedReaders)[author].name()); }
+    return author < 0 ? 0 : NBLog::Entry::kOverhead + sizeof(author); // bytes of author entry
+}
 
 }   // namespace android