Improvements to our MPEG2 Transport Stream parser

Verifies the continuity counter
Parses and associates PCR time with streams
Allows for a absolute time anchor to be signalled via discontinuity.

Change-Id: I4bc88c78382c9cc6380f28df584cc6c254e0a8f9
diff --git a/media/libmediaplayerservice/nuplayer/StreamingSource.cpp b/media/libmediaplayerservice/nuplayer/StreamingSource.cpp
index b696aa4..a1fd2ed 100644
--- a/media/libmediaplayerservice/nuplayer/StreamingSource.cpp
+++ b/media/libmediaplayerservice/nuplayer/StreamingSource.cpp
@@ -42,7 +42,15 @@
 
 void NuPlayer::StreamingSource::start() {
     mStreamListener = new NuPlayerStreamListener(mSource, 0);
-    mTSParser = new ATSParser(ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE);
+
+    uint32_t sourceFlags = mSource->flags();
+
+    uint32_t parserFlags = ATSParser::TS_TIMESTAMPS_ARE_ABSOLUTE;
+    if (sourceFlags & IStreamSource::kFlagAlignedVideoData) {
+        parserFlags |= ATSParser::ALIGNED_VIDEO_DATA;
+    }
+
+    mTSParser = new ATSParser(parserFlags);
 
     mStreamListener->start();
 }
@@ -138,7 +146,17 @@
         return finalResult == OK ? -EWOULDBLOCK : finalResult;
     }
 
-    return source->dequeueAccessUnit(accessUnit);
+    status_t err = source->dequeueAccessUnit(accessUnit);
+
+#if !defined(LOG_NDEBUG) || LOG_NDEBUG == 0
+    if (err == OK) {
+        int64_t timeUs;
+        CHECK((*accessUnit)->meta()->findInt64("timeUs", &timeUs));
+        ALOGV("dequeueAccessUnit timeUs=%lld us", timeUs);
+    }
+#endif
+
+    return err;
 }
 
 }  // namespace android
diff --git a/media/libstagefright/mpeg2ts/ATSParser.cpp b/media/libstagefright/mpeg2ts/ATSParser.cpp
index 5f3e300..d988356 100644
--- a/media/libstagefright/mpeg2ts/ATSParser.cpp
+++ b/media/libstagefright/mpeg2ts/ATSParser.cpp
@@ -51,7 +51,8 @@
             unsigned pid, ABitReader *br, status_t *err);
 
     bool parsePID(
-            unsigned pid, unsigned payload_unit_start_indicator,
+            unsigned pid, unsigned continuity_counter,
+            unsigned payload_unit_start_indicator,
             ABitReader *br, status_t *err);
 
     void signalDiscontinuity(
@@ -77,6 +78,10 @@
         return mProgramMapPID;
     }
 
+    uint32_t parserFlags() const {
+        return mParser->mFlags;
+    }
+
 private:
     ATSParser *mParser;
     unsigned mProgramNumber;
@@ -91,13 +96,17 @@
 };
 
 struct ATSParser::Stream : public RefBase {
-    Stream(Program *program, unsigned elementaryPID, unsigned streamType);
+    Stream(Program *program,
+           unsigned elementaryPID,
+           unsigned streamType,
+           unsigned PCR_PID);
 
     unsigned type() const { return mStreamType; }
     unsigned pid() const { return mElementaryPID; }
     void setPID(unsigned pid) { mElementaryPID = pid; }
 
     status_t parse(
+            unsigned continuity_counter,
             unsigned payload_unit_start_indicator,
             ABitReader *br);
 
@@ -115,6 +124,8 @@
     Program *mProgram;
     unsigned mElementaryPID;
     unsigned mStreamType;
+    unsigned mPCR_PID;
+    int32_t mExpectedContinuityCounter;
 
     sp<ABuffer> mBuffer;
     sp<AnotherPacketSource> mSource;
@@ -184,7 +195,8 @@
 }
 
 bool ATSParser::Program::parsePID(
-        unsigned pid, unsigned payload_unit_start_indicator,
+        unsigned pid, unsigned continuity_counter,
+        unsigned payload_unit_start_indicator,
         ABitReader *br, status_t *err) {
     *err = OK;
 
@@ -194,7 +206,7 @@
     }
 
     *err = mStreams.editValueAt(index)->parse(
-            payload_unit_start_indicator, br);
+            continuity_counter, payload_unit_start_indicator, br);
 
     return true;
 }
@@ -241,7 +253,10 @@
     MY_LOGV("  section_number = %u", br->getBits(8));
     MY_LOGV("  last_section_number = %u", br->getBits(8));
     MY_LOGV("  reserved = %u", br->getBits(3));
-    MY_LOGV("  PCR_PID = 0x%04x", br->getBits(13));
+
+    unsigned PCR_PID = br->getBits(13);
+    ALOGV("  PCR_PID = 0x%04x", PCR_PID);
+
     MY_LOGV("  reserved = %u", br->getBits(4));
 
     unsigned program_info_length = br->getBits(12);
@@ -382,7 +397,9 @@
         ssize_t index = mStreams.indexOfKey(info.mPID);
 
         if (index < 0) {
-            sp<Stream> stream = new Stream(this, info.mPID, info.mType);
+            sp<Stream> stream = new Stream(
+                    this, info.mPID, info.mType, PCR_PID);
+
             mStreams.add(info.mPID, stream);
         }
     }
@@ -419,21 +436,35 @@
         }
     }
 
-    return (PTS * 100) / 9;
+    int64_t timeUs = (PTS * 100) / 9;
+
+    if (mParser->mAbsoluteTimeAnchorUs >= 0ll) {
+        timeUs += mParser->mAbsoluteTimeAnchorUs;
+    }
+
+    return timeUs;
 }
 
 ////////////////////////////////////////////////////////////////////////////////
 
 ATSParser::Stream::Stream(
-        Program *program, unsigned elementaryPID, unsigned streamType)
+        Program *program,
+        unsigned elementaryPID,
+        unsigned streamType,
+        unsigned PCR_PID)
     : mProgram(program),
       mElementaryPID(elementaryPID),
       mStreamType(streamType),
+      mPCR_PID(PCR_PID),
+      mExpectedContinuityCounter(-1),
       mPayloadStarted(false),
       mQueue(NULL) {
     switch (mStreamType) {
         case STREAMTYPE_H264:
-            mQueue = new ElementaryStreamQueue(ElementaryStreamQueue::H264);
+            mQueue = new ElementaryStreamQueue(
+                    ElementaryStreamQueue::H264,
+                    (mProgram->parserFlags() & ALIGNED_VIDEO_DATA)
+                        ? ElementaryStreamQueue::kFlag_AlignedData : 0);
             break;
         case STREAMTYPE_MPEG2_AUDIO_ADTS:
             mQueue = new ElementaryStreamQueue(ElementaryStreamQueue::AAC);
@@ -473,11 +504,25 @@
 }
 
 status_t ATSParser::Stream::parse(
+        unsigned continuity_counter,
         unsigned payload_unit_start_indicator, ABitReader *br) {
     if (mQueue == NULL) {
         return OK;
     }
 
+    if (mExpectedContinuityCounter >= 0
+            && (unsigned)mExpectedContinuityCounter != continuity_counter) {
+        ALOGI("discontinuity on stream pid 0x%04x", mElementaryPID);
+
+        mPayloadStarted = false;
+        mBuffer->setRange(0, 0);
+        mExpectedContinuityCounter = -1;
+
+        return OK;
+    }
+
+    mExpectedContinuityCounter = (continuity_counter + 1) & 0x0f;
+
     if (payload_unit_start_indicator) {
         if (mPayloadStarted) {
             // Otherwise we run the danger of receiving the trailing bytes
@@ -664,8 +709,7 @@
             PTS |= br->getBits(15);
             CHECK_EQ(br->getBits(1), 1u);
 
-            ALOGV("PTS = %llu", PTS);
-            // ALOGI("PTS = %.2f secs", PTS / 90000.0f);
+            ALOGV("PTS = 0x%016llx (%.2f)", PTS, PTS / 90000.0);
 
             optional_bytes_remaining -= 5;
 
@@ -847,7 +891,10 @@
 ////////////////////////////////////////////////////////////////////////////////
 
 ATSParser::ATSParser(uint32_t flags)
-    : mFlags(flags) {
+    : mFlags(flags),
+      mAbsoluteTimeAnchorUs(-1ll),
+      mNumTSPacketsParsed(0),
+      mNumPCRs(0) {
     mPSISections.add(0 /* PID */, new PSISection);
 }
 
@@ -863,6 +910,15 @@
 
 void ATSParser::signalDiscontinuity(
         DiscontinuityType type, const sp<AMessage> &extra) {
+    if (type == DISCONTINUITY_ABSOLUTE_TIME) {
+        int64_t timeUs;
+        CHECK(extra->findInt64("timeUs", &timeUs));
+
+        CHECK(mPrograms.empty());
+        mAbsoluteTimeAnchorUs = timeUs;
+        return;
+    }
+
     for (size_t i = 0; i < mPrograms.size(); ++i) {
         mPrograms.editItemAt(i)->signalDiscontinuity(type, extra);
     }
@@ -942,6 +998,7 @@
 
 status_t ATSParser::parsePID(
         ABitReader *br, unsigned PID,
+        unsigned continuity_counter,
         unsigned payload_unit_start_indicator) {
     ssize_t sectionIndex = mPSISections.indexOfKey(PID);
 
@@ -1002,7 +1059,8 @@
     for (size_t i = 0; i < mPrograms.size(); ++i) {
         status_t err;
         if (mPrograms.editItemAt(i)->parsePID(
-                    PID, payload_unit_start_indicator, br, &err)) {
+                    PID, continuity_counter, payload_unit_start_indicator,
+                    br, &err)) {
             if (err != OK) {
                 return err;
             }
@@ -1019,10 +1077,55 @@
     return OK;
 }
 
-void ATSParser::parseAdaptationField(ABitReader *br) {
+void ATSParser::parseAdaptationField(ABitReader *br, unsigned PID) {
     unsigned adaptation_field_length = br->getBits(8);
+
     if (adaptation_field_length > 0) {
-        br->skipBits(adaptation_field_length * 8);  // XXX
+        unsigned discontinuity_indicator = br->getBits(1);
+
+        if (discontinuity_indicator) {
+            ALOGV("PID 0x%04x: discontinuity_indicator = 1 (!!!)", PID);
+        }
+
+        br->skipBits(2);
+        unsigned PCR_flag = br->getBits(1);
+
+        size_t numBitsRead = 4;
+
+        if (PCR_flag) {
+            br->skipBits(4);
+            uint64_t PCR_base = br->getBits(32);
+            PCR_base = (PCR_base << 1) | br->getBits(1);
+
+            br->skipBits(6);
+            unsigned PCR_ext = br->getBits(9);
+
+            // The number of bytes from the start of the current
+            // MPEG2 transport stream packet up and including
+            // the final byte of this PCR_ext field.
+            size_t byteOffsetFromStartOfTSPacket =
+                (188 - br->numBitsLeft() / 8);
+
+            uint64_t PCR = PCR_base * 300 + PCR_ext;
+
+            ALOGV("PID 0x%04x: PCR = 0x%016llx (%.2f)",
+                  PID, PCR, PCR / 27E6);
+
+            // The number of bytes received by this parser up to and
+            // including the final byte of this PCR_ext field.
+            size_t byteOffsetFromStart =
+                mNumTSPacketsParsed * 188 + byteOffsetFromStartOfTSPacket;
+
+            for (size_t i = 0; i < mPrograms.size(); ++i) {
+                updatePCR(PID, PCR, byteOffsetFromStart);
+            }
+
+            numBitsRead += 52;
+        }
+
+        CHECK_GE(adaptation_field_length * 8, numBitsRead);
+
+        br->skipBits(adaptation_field_length * 8 - numBitsRead);
     }
 }
 
@@ -1048,19 +1151,24 @@
     ALOGV("adaptation_field_control = %u", adaptation_field_control);
 
     unsigned continuity_counter = br->getBits(4);
-    ALOGV("continuity_counter = %u", continuity_counter);
+    ALOGV("PID = 0x%04x, continuity_counter = %u", PID, continuity_counter);
 
     // ALOGI("PID = 0x%04x, continuity_counter = %u", PID, continuity_counter);
 
     if (adaptation_field_control == 2 || adaptation_field_control == 3) {
-        parseAdaptationField(br);
+        parseAdaptationField(br, PID);
     }
 
+    status_t err = OK;
+
     if (adaptation_field_control == 1 || adaptation_field_control == 3) {
-        return parsePID(br, PID, payload_unit_start_indicator);
+        err = parsePID(
+                br, PID, continuity_counter, payload_unit_start_indicator);
     }
 
-    return OK;
+    ++mNumTSPacketsParsed;
+
+    return err;
 }
 
 sp<MediaSource> ATSParser::getSource(SourceType type) {
@@ -1091,6 +1199,31 @@
     return mPrograms.editItemAt(0)->PTSTimeDeltaEstablished();
 }
 
+void ATSParser::updatePCR(
+        unsigned PID, uint64_t PCR, size_t byteOffsetFromStart) {
+    ALOGV("PCR 0x%016llx @ %d", PCR, byteOffsetFromStart);
+
+    if (mNumPCRs == 2) {
+        mPCR[0] = mPCR[1];
+        mPCRBytes[0] = mPCRBytes[1];
+        mSystemTimeUs[0] = mSystemTimeUs[1];
+        mNumPCRs = 1;
+    }
+
+    mPCR[mNumPCRs] = PCR;
+    mPCRBytes[mNumPCRs] = byteOffsetFromStart;
+    mSystemTimeUs[mNumPCRs] = ALooper::GetNowUs();
+
+    ++mNumPCRs;
+
+    if (mNumPCRs == 2) {
+        double transportRate =
+            (mPCRBytes[1] - mPCRBytes[0]) * 27E6 / (mPCR[1] - mPCR[0]);
+
+        ALOGV("transportRate = %.2f bytes/sec", transportRate);
+    }
+}
+
 ////////////////////////////////////////////////////////////////////////////////
 
 ATSParser::PSISection::PSISection() {
diff --git a/media/libstagefright/mpeg2ts/ATSParser.h b/media/libstagefright/mpeg2ts/ATSParser.h
index 9ef2939..5ccbab7 100644
--- a/media/libstagefright/mpeg2ts/ATSParser.h
+++ b/media/libstagefright/mpeg2ts/ATSParser.h
@@ -38,6 +38,7 @@
         DISCONTINUITY_TIME              = 1,
         DISCONTINUITY_AUDIO_FORMAT      = 2,
         DISCONTINUITY_VIDEO_FORMAT      = 4,
+        DISCONTINUITY_ABSOLUTE_TIME     = 8,
 
         DISCONTINUITY_SEEK              = DISCONTINUITY_TIME,
 
@@ -54,7 +55,9 @@
         // If this flag is _not_ specified, the first PTS encountered in a
         // program of this stream will be assumed to correspond to media time 0
         // instead.
-        TS_TIMESTAMPS_ARE_ABSOLUTE = 1
+        TS_TIMESTAMPS_ARE_ABSOLUTE = 1,
+        // Video PES packets contain exactly one (aligned) access unit.
+        ALIGNED_VIDEO_DATA         = 2,
     };
 
     ATSParser(uint32_t flags = 0);
@@ -100,17 +103,29 @@
     // Keyed by PID
     KeyedVector<unsigned, sp<PSISection> > mPSISections;
 
+    int64_t mAbsoluteTimeAnchorUs;
+
+    size_t mNumTSPacketsParsed;
+
     void parseProgramAssociationTable(ABitReader *br);
     void parseProgramMap(ABitReader *br);
     void parsePES(ABitReader *br);
 
     status_t parsePID(
         ABitReader *br, unsigned PID,
+        unsigned continuity_counter,
         unsigned payload_unit_start_indicator);
 
-    void parseAdaptationField(ABitReader *br);
+    void parseAdaptationField(ABitReader *br, unsigned PID);
     status_t parseTS(ABitReader *br);
 
+    void updatePCR(unsigned PID, uint64_t PCR, size_t byteOffsetFromStart);
+
+    uint64_t mPCR[2];
+    size_t mPCRBytes[2];
+    int64_t mSystemTimeUs[2];
+    size_t mNumPCRs;
+
     DISALLOW_EVIL_CONSTRUCTORS(ATSParser);
 };