blob: cf9d3c8051dc706974ab25767c593fc6cc64f667 [file] [log] [blame]
Chong Zhang6d58e4b2020-03-31 09:41:10 -07001/*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
Chong Zhangacb33502020-04-20 11:04:48 -070017//#define LOG_NDEBUG 0
Chong Zhang6d58e4b2020-03-31 09:41:10 -070018#define LOG_TAG "TranscodingJobScheduler"
19
20#define VALIDATE_STATE 1
21
22#include <inttypes.h>
23#include <media/TranscodingJobScheduler.h>
24#include <utils/Log.h>
25
26#include <utility>
27
28namespace android {
29
Chong Zhang7ae4e2f2020-04-17 15:24:34 -070030constexpr static uid_t OFFLINE_UID = -1;
Chong Zhang6d58e4b2020-03-31 09:41:10 -070031
32//static
33String8 TranscodingJobScheduler::jobToString(const JobKeyType& jobKey) {
34 return String8::format("{client:%lld, job:%d}", (long long)jobKey.first, jobKey.second);
35}
36
37TranscodingJobScheduler::TranscodingJobScheduler(
38 const std::shared_ptr<TranscoderInterface>& transcoder,
Chong Zhang7ae4e2f2020-04-17 15:24:34 -070039 const std::shared_ptr<UidPolicyInterface>& uidPolicy)
40 : mTranscoder(transcoder), mUidPolicy(uidPolicy), mCurrentJob(nullptr), mResourceLost(false) {
Chong Zhang6d58e4b2020-03-31 09:41:10 -070041 // Only push empty offline queue initially. Realtime queues are added when requests come in.
Chong Zhang7ae4e2f2020-04-17 15:24:34 -070042 mUidSortedList.push_back(OFFLINE_UID);
43 mOfflineUidIterator = mUidSortedList.begin();
44 mJobQueues.emplace(OFFLINE_UID, JobQueueType());
Chong Zhang6d58e4b2020-03-31 09:41:10 -070045}
46
47TranscodingJobScheduler::~TranscodingJobScheduler() {}
48
49TranscodingJobScheduler::Job* TranscodingJobScheduler::getTopJob_l() {
50 if (mJobMap.empty()) {
51 return nullptr;
52 }
Chong Zhang7ae4e2f2020-04-17 15:24:34 -070053 uid_t topUid = *mUidSortedList.begin();
54 JobKeyType topJobKey = *mJobQueues[topUid].begin();
Chong Zhang6d58e4b2020-03-31 09:41:10 -070055 return &mJobMap[topJobKey];
56}
57
58void TranscodingJobScheduler::updateCurrentJob_l() {
59 Job* topJob = getTopJob_l();
60 Job* curJob = mCurrentJob;
61 ALOGV("updateCurrentJob: topJob is %s, curJob is %s",
62 topJob == nullptr ? "null" : jobToString(topJob->key).c_str(),
63 curJob == nullptr ? "null" : jobToString(curJob->key).c_str());
64
65 // If we found a topJob that should be run, and it's not already running,
66 // take some actions to ensure it's running.
67 if (topJob != nullptr && (topJob != curJob || topJob->state != Job::RUNNING)) {
68 // If another job is currently running, pause it first.
69 if (curJob != nullptr && curJob->state == Job::RUNNING) {
70 mTranscoder->pause(curJob->key.first, curJob->key.second);
71 curJob->state = Job::PAUSED;
72 }
73 // If we are not experiencing resource loss, we can start or resume
74 // the topJob now.
75 if (!mResourceLost) {
76 if (topJob->state == Job::NOT_STARTED) {
77 mTranscoder->start(topJob->key.first, topJob->key.second);
78 } else if (topJob->state == Job::PAUSED) {
79 mTranscoder->resume(topJob->key.first, topJob->key.second);
80 }
81 topJob->state = Job::RUNNING;
82 }
83 }
84 mCurrentJob = topJob;
85}
86
87void TranscodingJobScheduler::removeJob_l(const JobKeyType& jobKey) {
88 ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str());
89
90 if (mJobMap.count(jobKey) == 0) {
91 ALOGE("job %s doesn't exist", jobToString(jobKey).c_str());
92 return;
93 }
94
Chong Zhang7ae4e2f2020-04-17 15:24:34 -070095 // Remove job from uid's queue.
96 const uid_t uid = mJobMap[jobKey].uid;
97 JobQueueType& jobQueue = mJobQueues[uid];
Chong Zhang6d58e4b2020-03-31 09:41:10 -070098 auto it = std::find(jobQueue.begin(), jobQueue.end(), jobKey);
99 if (it == jobQueue.end()) {
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700100 ALOGE("couldn't find job %s in queue for uid %d", jobToString(jobKey).c_str(), uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700101 return;
102 }
103 jobQueue.erase(it);
104
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700105 // If this is the last job in a real-time queue, remove this uid's queue.
106 if (uid != OFFLINE_UID && jobQueue.empty()) {
107 mUidSortedList.remove(uid);
108 mJobQueues.erase(uid);
Chong Zhangacb33502020-04-20 11:04:48 -0700109 mUidPolicy->unregisterMonitorUid(uid);
110
111 std::unordered_set<uid_t> topUids = mUidPolicy->getTopUids();
112 moveUidsToTop_l(topUids, false /*preserveTopUid*/);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700113 }
114
115 // Clear current job.
116 if (mCurrentJob == &mJobMap[jobKey]) {
117 mCurrentJob = nullptr;
118 }
119
120 // Remove job from job map.
121 mJobMap.erase(jobKey);
122}
123
Chong Zhangacb33502020-04-20 11:04:48 -0700124/**
125 * Moves the set of uids to the front of mUidSortedList (which is used to pick
126 * the next job to run).
127 *
128 * This is called when 1) we received a onTopUidsChanged() callbcak from UidPolicy,
129 * or 2) we removed the job queue for a uid because it becomes empty.
130 *
131 * In case of 1), if there are multiple uids in the set, and the current front
132 * uid in mUidSortedList is still in the set, we try to keep that uid at front
133 * so that current job run is not interrupted. (This is not a concern for case 2)
134 * because the queue for a uid was just removed entirely.)
135 */
136void TranscodingJobScheduler::moveUidsToTop_l(const std::unordered_set<uid_t>& uids,
137 bool preserveTopUid) {
138 // If uid set is empty, nothing to do. Do not change the queue status.
139 if (uids.empty()) {
140 return;
141 }
142
143 // Save the current top uid.
144 uid_t curTopUid = *mUidSortedList.begin();
145 bool pushCurTopToFront = false;
146 int32_t numUidsMoved = 0;
147
148 // Go through the sorted uid list once, and move the ones in top set to front.
149 for (auto it = mUidSortedList.begin(); it != mUidSortedList.end();) {
150 uid_t uid = *it;
151
152 if (uid != OFFLINE_UID && uids.count(uid) > 0) {
153 it = mUidSortedList.erase(it);
154
155 // If this is the top we're preserving, don't push it here, push
156 // it after the for-loop.
157 if (uid == curTopUid && preserveTopUid) {
158 pushCurTopToFront = true;
159 } else {
160 mUidSortedList.push_front(uid);
161 }
162
163 // If we found all uids in the set, break out.
164 if (++numUidsMoved == uids.size()) {
165 break;
166 }
167 } else {
168 ++it;
169 }
170 }
171
172 if (pushCurTopToFront) {
173 mUidSortedList.push_front(curTopUid);
174 }
175}
176
Chong Zhang3fa408f2020-04-30 11:04:28 -0700177bool TranscodingJobScheduler::submit(ClientIdType clientId, JobIdType jobId, uid_t uid,
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700178 const TranscodingRequestParcel& request,
179 const std::weak_ptr<ITranscodingClientCallback>& callback) {
180 JobKeyType jobKey = std::make_pair(clientId, jobId);
181
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700182 ALOGV("%s: job %s, uid %d, prioirty %d", __FUNCTION__, jobToString(jobKey).c_str(), uid,
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700183 (int32_t)request.priority);
184
185 std::scoped_lock lock{mLock};
186
187 if (mJobMap.count(jobKey) > 0) {
188 ALOGE("job %s already exists", jobToString(jobKey).c_str());
189 return false;
190 }
191
192 // TODO(chz): only support offline vs real-time for now. All kUnspecified jobs
193 // go to offline queue.
194 if (request.priority == TranscodingJobPriority::kUnspecified) {
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700195 uid = OFFLINE_UID;
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700196 }
197
198 // Add job to job map.
199 mJobMap[jobKey].key = jobKey;
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700200 mJobMap[jobKey].uid = uid;
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700201 mJobMap[jobKey].state = Job::NOT_STARTED;
202 mJobMap[jobKey].request = request;
203 mJobMap[jobKey].callback = callback;
204
205 // If it's an offline job, the queue was already added in constructor.
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700206 // If it's a real-time jobs, check if a queue is already present for the uid,
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700207 // and add a new queue if needed.
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700208 if (uid != OFFLINE_UID) {
209 if (mJobQueues.count(uid) == 0) {
Chong Zhangacb33502020-04-20 11:04:48 -0700210 mUidPolicy->registerMonitorUid(uid);
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700211 if (mUidPolicy->isUidOnTop(uid)) {
212 mUidSortedList.push_front(uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700213 } else {
214 // Shouldn't be submitting real-time requests from non-top app,
215 // put it in front of the offline queue.
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700216 mUidSortedList.insert(mOfflineUidIterator, uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700217 }
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700218 } else if (uid != *mUidSortedList.begin()) {
219 if (mUidPolicy->isUidOnTop(uid)) {
220 mUidSortedList.remove(uid);
221 mUidSortedList.push_front(uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700222 }
223 }
224 }
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700225 // Append this job to the uid's queue.
226 mJobQueues[uid].push_back(jobKey);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700227
228 updateCurrentJob_l();
229
230 validateState_l();
231 return true;
232}
233
Chong Zhang3fa408f2020-04-30 11:04:28 -0700234bool TranscodingJobScheduler::cancel(ClientIdType clientId, JobIdType jobId) {
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700235 JobKeyType jobKey = std::make_pair(clientId, jobId);
236
237 ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str());
238
239 std::scoped_lock lock{mLock};
240
241 if (mJobMap.count(jobKey) == 0) {
242 ALOGE("job %s doesn't exist", jobToString(jobKey).c_str());
243 return false;
244 }
245 // If the job is running, pause it first.
246 if (mJobMap[jobKey].state == Job::RUNNING) {
247 mTranscoder->pause(clientId, jobId);
248 }
249
250 // Remove the job.
251 removeJob_l(jobKey);
252
253 // Start next job.
254 updateCurrentJob_l();
255
256 validateState_l();
257 return true;
258}
259
Chong Zhang3fa408f2020-04-30 11:04:28 -0700260bool TranscodingJobScheduler::getJob(ClientIdType clientId, JobIdType jobId,
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700261 TranscodingRequestParcel* request) {
262 JobKeyType jobKey = std::make_pair(clientId, jobId);
263
264 std::scoped_lock lock{mLock};
265
266 if (mJobMap.count(jobKey) == 0) {
267 ALOGE("job %s doesn't exist", jobToString(jobKey).c_str());
268 return false;
269 }
270
271 *(TranscodingRequest*)request = mJobMap[jobKey].request;
272 return true;
273}
274
Chong Zhang3fa408f2020-04-30 11:04:28 -0700275void TranscodingJobScheduler::onFinish(ClientIdType clientId, JobIdType jobId) {
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700276 JobKeyType jobKey = std::make_pair(clientId, jobId);
277
278 ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str());
279
280 std::scoped_lock lock{mLock};
281
282 if (mJobMap.count(jobKey) == 0) {
Chong Zhangacb33502020-04-20 11:04:48 -0700283 ALOGW("ignoring finish for non-existent job");
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700284 return;
285 }
286
287 // Only ignore if job was never started. In particular, propagate the status
288 // to client if the job is paused. Transcoder could have posted finish when
289 // we're pausing it, and the finish arrived after we changed current job.
290 if (mJobMap[jobKey].state == Job::NOT_STARTED) {
Chong Zhangacb33502020-04-20 11:04:48 -0700291 ALOGW("ignoring finish for job that was never started");
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700292 return;
293 }
294
295 {
296 auto clientCallback = mJobMap[jobKey].callback.lock();
297 if (clientCallback != nullptr) {
298 clientCallback->onTranscodingFinished(jobId, TranscodingResultParcel({jobId, 0}));
299 }
300 }
301
302 // Remove the job.
303 removeJob_l(jobKey);
304
305 // Start next job.
306 updateCurrentJob_l();
307
308 validateState_l();
309}
310
Chong Zhang3fa408f2020-04-30 11:04:28 -0700311void TranscodingJobScheduler::onError(ClientIdType clientId, JobIdType jobId,
312 TranscodingErrorCode err) {
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700313 JobKeyType jobKey = std::make_pair(clientId, jobId);
314
315 ALOGV("%s: job %s, err %d", __FUNCTION__, jobToString(jobKey).c_str(), (int32_t)err);
316
317 std::scoped_lock lock{mLock};
318
319 if (mJobMap.count(jobKey) == 0) {
Chong Zhangacb33502020-04-20 11:04:48 -0700320 ALOGW("ignoring error for non-existent job");
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700321 return;
322 }
323
324 // Only ignore if job was never started. In particular, propagate the status
325 // to client if the job is paused. Transcoder could have posted finish when
326 // we're pausing it, and the finish arrived after we changed current job.
327 if (mJobMap[jobKey].state == Job::NOT_STARTED) {
Chong Zhangacb33502020-04-20 11:04:48 -0700328 ALOGW("ignoring error for job that was never started");
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700329 return;
330 }
331
332 {
333 auto clientCallback = mJobMap[jobKey].callback.lock();
334 if (clientCallback != nullptr) {
335 clientCallback->onTranscodingFailed(jobId, err);
336 }
337 }
338
339 // Remove the job.
340 removeJob_l(jobKey);
341
342 // Start next job.
343 updateCurrentJob_l();
344
345 validateState_l();
346}
347
Chong Zhang3fa408f2020-04-30 11:04:28 -0700348void TranscodingJobScheduler::onProgressUpdate(ClientIdType clientId, JobIdType jobId,
349 int32_t progress) {
Chong Zhangacb33502020-04-20 11:04:48 -0700350 JobKeyType jobKey = std::make_pair(clientId, jobId);
351
352 ALOGV("%s: job %s, progress %d", __FUNCTION__, jobToString(jobKey).c_str(), progress);
353
354 std::scoped_lock lock{mLock};
355
356 if (mJobMap.count(jobKey) == 0) {
357 ALOGW("ignoring progress for non-existent job");
358 return;
359 }
360
361 // Only ignore if job was never started. In particular, propagate the status
362 // to client if the job is paused. Transcoder could have posted finish when
363 // we're pausing it, and the finish arrived after we changed current job.
364 if (mJobMap[jobKey].state == Job::NOT_STARTED) {
365 ALOGW("ignoring progress for job that was never started");
366 return;
367 }
368
369 {
370 auto clientCallback = mJobMap[jobKey].callback.lock();
371 if (clientCallback != nullptr) {
372 clientCallback->onProgressUpdate(jobId, progress);
373 }
374 }
375}
376
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700377void TranscodingJobScheduler::onResourceLost() {
378 ALOGV("%s", __FUNCTION__);
379
380 std::scoped_lock lock{mLock};
381
382 // If we receive a resource loss event, the TranscoderLibrary already paused
383 // the transcoding, so we don't need to call onPaused to notify it to pause.
384 // Only need to update the job state here.
385 if (mCurrentJob != nullptr && mCurrentJob->state == Job::RUNNING) {
386 mCurrentJob->state = Job::PAUSED;
387 }
388 mResourceLost = true;
389
390 validateState_l();
391}
392
Chong Zhangacb33502020-04-20 11:04:48 -0700393void TranscodingJobScheduler::onTopUidsChanged(const std::unordered_set<uid_t>& uids) {
394 if (uids.empty()) {
395 ALOGW("%s: ignoring empty uids", __FUNCTION__);
396 return;
397 }
398
399 std::string uidStr;
400 for (auto it = uids.begin(); it != uids.end(); it++) {
401 if (!uidStr.empty()) {
402 uidStr += ", ";
403 }
404 uidStr += std::to_string(*it);
405 }
406
407 ALOGD("%s: topUids: size %zu, uids: %s", __FUNCTION__, uids.size(), uidStr.c_str());
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700408
409 std::scoped_lock lock{mLock};
410
Chong Zhangacb33502020-04-20 11:04:48 -0700411 moveUidsToTop_l(uids, true /*preserveTopUid*/);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700412
413 updateCurrentJob_l();
414
415 validateState_l();
416}
417
418void TranscodingJobScheduler::onResourceAvailable() {
419 ALOGV("%s", __FUNCTION__);
420
421 std::scoped_lock lock{mLock};
422
423 mResourceLost = false;
424 updateCurrentJob_l();
425
426 validateState_l();
427}
428
429void TranscodingJobScheduler::validateState_l() {
430#ifdef VALIDATE_STATE
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700431 LOG_ALWAYS_FATAL_IF(mJobQueues.count(OFFLINE_UID) != 1,
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700432 "mJobQueues offline queue number is not 1");
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700433 LOG_ALWAYS_FATAL_IF(*mOfflineUidIterator != OFFLINE_UID,
434 "mOfflineUidIterator not pointing to offline uid");
435 LOG_ALWAYS_FATAL_IF(mUidSortedList.size() != mJobQueues.size(),
436 "mUidList and mJobQueues size mismatch");
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700437
438 int32_t totalJobs = 0;
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700439 for (auto uidIt = mUidSortedList.begin(); uidIt != mUidSortedList.end(); uidIt++) {
440 LOG_ALWAYS_FATAL_IF(mJobQueues.count(*uidIt) != 1, "mJobQueues count for uid %d is not 1",
441 *uidIt);
442 for (auto jobIt = mJobQueues[*uidIt].begin(); jobIt != mJobQueues[*uidIt].end(); jobIt++) {
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700443 LOG_ALWAYS_FATAL_IF(mJobMap.count(*jobIt) != 1, "mJobs count for job %s is not 1",
444 jobToString(*jobIt).c_str());
445 }
446
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700447 totalJobs += mJobQueues[*uidIt].size();
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700448 }
449 LOG_ALWAYS_FATAL_IF(mJobMap.size() != totalJobs,
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700450 "mJobs size doesn't match total jobs counted from uid queues");
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700451#endif // VALIDATE_STATE
452}
453
454} // namespace android