/*
 * Copyright (C) 2018 The Android Open Source Project
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *      http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */

#define LOG_TAG "NBLog"
//#define LOG_NDEBUG 0

#include <memory>
#include <queue>
#include <stddef.h>
#include <stdint.h>
#include <vector>

#include <audio_utils/fifo.h>
#include <json/json.h>
#include <media/nblog/Merger.h>
#include <media/nblog/PerformanceAnalysis.h>
#include <media/nblog/ReportPerformance.h>
#include <media/nblog/Reader.h>
#include <media/nblog/Timeline.h>
#include <utils/Condition.h>
#include <utils/Log.h>
#include <utils/Mutex.h>
#include <utils/RefBase.h>
#include <utils/String8.h>
#include <utils/Timers.h>
#include <utils/Thread.h>

namespace android {
namespace NBLog {

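// The Merger wraps the shared region passed in as an audio_utils_fifo plus a writer for it;
// Merger::merge() (below) uses that writer to copy entries from the registered readers into
// this single local FIFO. A NULL shared pointer leaves mFifo and mFifoWriter NULL.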
Merger::Merger(const void *shared, size_t size):
    mShared((Shared *) shared),
    mFifo(mShared != NULL ?
        new audio_utils_fifo(size, sizeof(uint8_t),
            mShared->mBuffer, mShared->mRear, NULL /*throttlesFront*/) : NULL),
    mFifoWriter(mFifo != NULL ? new audio_utils_fifo_writer(*mFifo) : NULL)
{
}

void Merger::addReader(const sp<Reader> &reader)
{
    // FIXME This is called by a binder thread in MediaLogService::registerWriter,
    // but access to the shared variable mReaders is not yet protected by a lock.
    mReaders.push_back(reader);
}

// Items placed in the priority queue during the merge.
// Each one is composed of a timestamp and the index of the snapshot the timestamp came from.
struct MergeItem
{
    int64_t ts;
    int index;
    MergeItem(int64_t ts, int index): ts(ts), index(index) {}
};

bool operator>(const struct MergeItem &i1, const struct MergeItem &i2)
{
    return i1.ts > i2.ts || (i1.ts == i2.ts && i1.index > i2.index);
}

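// Note: with std::greater<MergeItem> as the comparator, the priority_queue used in merge()
// behaves as a min-heap: top() always holds the entry with the smallest timestamp, and equal
// timestamps are ordered by the lower reader index, keeping the merge order deterministic.
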
// Merge registered readers, sorted by timestamp, and write the data to a single FIFO in local memory.
void Merger::merge()
{
    // Merging is not necessary at the moment; this early return disables it and
    // bypasses compiler warnings about member variables not being used.
    if (true) return;
    const int nLogs = mReaders.size();
    std::vector<std::unique_ptr<Snapshot>> snapshots(nLogs);
    std::vector<EntryIterator> offsets;
    offsets.reserve(nLogs);
    for (int i = 0; i < nLogs; ++i) {
        snapshots[i] = mReaders[i]->getSnapshot();
        offsets.push_back(snapshots[i]->begin());
    }
    // Seed the priority queue with the first entry of each snapshot.
    // TODO a custom heap implementation could allow updating the top element in place,
    // improving performance for bursty buffers.
    std::priority_queue<MergeItem, std::vector<MergeItem>, std::greater<MergeItem>> timestamps;
    for (int i = 0; i < nLogs; ++i) {
        if (offsets[i] != snapshots[i]->end()) {
            std::unique_ptr<AbstractEntry> abstractEntry = AbstractEntry::buildEntry(offsets[i]);
            if (abstractEntry == nullptr) {
                continue;
            }
            timestamps.emplace(abstractEntry->timestamp(), i);
        }
    }

    while (!timestamps.empty()) {
        int index = timestamps.top().index;     // find the minimum timestamp
        // copy it to the log, advancing the offset
        offsets[index] = AbstractEntry::buildEntry(offsets[index])->
            copyWithAuthor(mFifoWriter, index);
        // update the data structures
        timestamps.pop();
        if (offsets[index] != snapshots[index]->end()) {
            int64_t ts = AbstractEntry::buildEntry(offsets[index])->timestamp();
            timestamps.emplace(ts, index);
        }
    }
}

const std::vector<sp<Reader>>& Merger::getReaders() const
{
    //AutoMutex _l(mLock);
    return mReaders;
}

// ---------------------------------------------------------------------------

MergeReader::MergeReader(const void *shared, size_t size, Merger &merger)
    : Reader(shared, size, "MergeReader"), mReaders(merger.getReaders())
{
}

// Takes the raw contents of the local merger FIFO, processes the log entries, and
// writes the data to a map of PerformanceAnalysis objects keyed by thread ID.
void MergeReader::processSnapshot(Snapshot &snapshot, int author)
{
    ReportPerformance::PerformanceData& data = mThreadPerformanceData[author];
    // We don't use "auto it" because it reduces readability in this case.
    for (EntryIterator it = snapshot.begin(); it != snapshot.end(); ++it) {
        switch (it->type) {
        case EVENT_HISTOGRAM_ENTRY_TS: {
            const HistTsEntry payload = it.payload<HistTsEntry>();
            // TODO: the hashes for histogram ts and audio state need to match
            // and correspond to the audio production source file location.
            mThreadPerformanceAnalysis[author][0 /*hash*/].logTsEntry(payload.ts);
        } break;
        case EVENT_AUDIO_STATE: {
            mThreadPerformanceAnalysis[author][0 /*hash*/].handleStateChange();
        } break;
        case EVENT_THREAD_INFO: {
            const thread_info_t info = it.payload<thread_info_t>();
            // TODO make PerformanceData hold a field of type thread_info_t.
            // Currently, thread_info_t is defined in NBLog.h, which includes
            // PerformanceAnalysis.h. PerformanceData is defined in PerformanceAnalysis.h,
            // where including NBLog.h would result in circular includes. The organization
            // of files will need to change to avoid this problem.
            data.threadInfo = info;
        } break;
        case EVENT_LATENCY: {
            const double latencyMs = it.payload<double>();
            data.latencyHist.add(latencyMs);
        } break;
        case EVENT_WORK_TIME: {
            const int64_t monotonicNs = it.payload<int64_t>();
            const double monotonicMs = monotonicNs * 1e-6;
            data.workHist.add(monotonicMs);
            data.active += monotonicNs;
        } break;
        case EVENT_WARMUP_TIME: {
            const double timeMs = it.payload<double>();
            data.warmupHist.add(timeMs);
        } break;
        case EVENT_UNDERRUN:
            data.underruns++;
            break;
        case EVENT_OVERRUN:
            data.overruns++;
            break;
        case EVENT_RESERVED:
        case EVENT_UPPER_BOUND:
            ALOGW("warning: unexpected event %d", it->type);
        default:
            break;
        }
    }
}

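// Pulls one snapshot from each registered reader and runs processSnapshot() on it, then
// checks whether the accumulated per-thread data is due to be pushed to MediaMetrics.
// The snapshot phase and the processing phase are kept separate (see the TODOs below) so
// that a future lock only needs to cover the snapshot phase.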
void MergeReader::getAndProcessSnapshot()
{
    // get a snapshot of each reader and process them
    // TODO insert lock here
    const size_t nLogs = mReaders.size();
    std::vector<std::unique_ptr<Snapshot>> snapshots(nLogs);
    for (size_t i = 0; i < nLogs; i++) {
        snapshots[i] = mReaders[i]->getSnapshot();
    }
    // TODO unlock lock here
    for (size_t i = 0; i < nLogs; i++) {
        if (snapshots[i] != nullptr) {
            processSnapshot(*(snapshots[i]), i);
        }
    }
    checkPushToMediaMetrics();
}

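// Pushes a thread's PerformanceData to MediaMetrics once at least kPeriodicMediaMetricsPush
// has elapsed since that data's start time, then resets the entry so the next push covers
// new data. The per-thread entries themselves persist; only their contents are reset.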
void MergeReader::checkPushToMediaMetrics()
{
    const nsecs_t now = systemTime();
    for (auto& item : mThreadPerformanceData) {
        ReportPerformance::PerformanceData& data = item.second;
        if (now - data.start >= kPeriodicMediaMetricsPush) {
            (void)ReportPerformance::sendToMediaMetrics(data);
            data.reset();   // data is persistent per thread
        }
    }
}

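// Dumps the merged performance data to fd: first the PerformanceAnalysis dump from
// ReportPerformance::dump(), then a JSON array with one object per thread. Each object holds
// whatever fields ReportPerformance::dumpToJson() produces plus a "threadNum" field added here;
// the exact field set depends on dumpToJson() and is not listed in this file.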
void MergeReader::dump(int fd, int indent)
{
    // TODO: add a mutex around the media.log dump
    ReportPerformance::dump(fd, indent, mThreadPerformanceAnalysis);
    Json::Value root(Json::arrayValue);
    for (const auto& item : mThreadPerformanceData) {
        const ReportPerformance::PerformanceData& data = item.second;
        std::unique_ptr<Json::Value> threadData = ReportPerformance::dumpToJson(data);
        if (threadData == nullptr) {
            continue;
        }
        (*threadData)["threadNum"] = item.first;
        root.append(*threadData);
    }
    Json::StyledWriter writer;
    std::string rootStr = writer.write(root);
    write(fd, rootStr.c_str(), rootStr.size());
}

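// Prefixes a dumped entry with "<reader name>: " when the entry carries a valid author index,
// so the merged output shows which writer produced it. Entries without an author (-1) are
// left untouched.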
void MergeReader::handleAuthor(const AbstractEntry &entry, String8 *body)
{
    int author = entry.author();
    if (author == -1) {
        return;
    }
    // FIXME Needs a lock
    const char* name = mReaders[author]->name().c_str();
    body->appendFormat("%s: ", name);
}

// ---------------------------------------------------------------------------

MergeThread::MergeThread(Merger &merger, MergeReader &mergeReader)
    : mMerger(merger),
      mMergeReader(mergeReader),
      mTimeoutUs(0)
{
}

MergeThread::~MergeThread()
{
    // Set the exit flag and a zero timeout to force threadLoop to exit, then wait for the
    // thread to join.
    requestExit();
    setTimeoutUs(0);
    join();
}

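// Each signal of mCond runs one loop iteration: merge while the wakeup budget (mTimeoutUs)
// is still positive, counting it down by kThreadSleepPeriodUs per iteration so merging stops
// some time after the last wakeup(). The destructor signals with a zero timeout so the blocked
// wait() returns and the thread can exit once requestExit() has been called.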
bool MergeThread::threadLoop()
{
    bool doMerge;
    {
        AutoMutex _l(mMutex);
        // If mTimeoutUs is negative, wait on the condition variable until it's positive.
        // If it's positive, merge. The minimum period between wakings of the condition variable
        // is handled in AudioFlinger::MediaLogNotifier::threadLoop().
        mCond.wait(mMutex);
        doMerge = mTimeoutUs > 0;
        mTimeoutUs -= kThreadSleepPeriodUs;
    }
    if (doMerge) {
        // Merge data from all the readers
        mMerger.merge();
        // Process the data collected by mMerger and write it to PerformanceAnalysis.
        // FIXME: decide whether to call getAndProcessSnapshot every time
        // or whether to have a separate thread that calls it at a lower frequency.
        mMergeReader.getAndProcessSnapshot();
    }
    return true;
}

void MergeThread::wakeup()
{
    setTimeoutUs(kThreadWakeupPeriodUs);
}

void MergeThread::setTimeoutUs(int time)
{
    AutoMutex _l(mMutex);
    mTimeoutUs = time;
    mCond.signal();
}

}   // namespace NBLog
}   // namespace android