blob: 6e235c6a2d107368b952261b874e3ff185afa9e9 [file] [log] [blame]
Chong Zhang6d58e4b2020-03-31 09:41:10 -07001/*
2 * Copyright (C) 2020 The Android Open Source Project
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16
17// #define LOG_NDEBUG 0
18#define LOG_TAG "TranscodingJobScheduler"
19
20#define VALIDATE_STATE 1
21
22#include <inttypes.h>
23#include <media/TranscodingJobScheduler.h>
24#include <utils/Log.h>
25
26#include <utility>
27
28namespace android {
29
// Sentinel uid that keys the offline job queue. It is the all-ones value of
// uid_t, produced by converting -1.
static constexpr uid_t OFFLINE_UID = static_cast<uid_t>(-1);
Chong Zhang6d58e4b2020-03-31 09:41:10 -070031
32//static
33String8 TranscodingJobScheduler::jobToString(const JobKeyType& jobKey) {
34 return String8::format("{client:%lld, job:%d}", (long long)jobKey.first, jobKey.second);
35}
36
37TranscodingJobScheduler::TranscodingJobScheduler(
38 const std::shared_ptr<TranscoderInterface>& transcoder,
Chong Zhang7ae4e2f2020-04-17 15:24:34 -070039 const std::shared_ptr<UidPolicyInterface>& uidPolicy)
40 : mTranscoder(transcoder), mUidPolicy(uidPolicy), mCurrentJob(nullptr), mResourceLost(false) {
Chong Zhang6d58e4b2020-03-31 09:41:10 -070041 // Only push empty offline queue initially. Realtime queues are added when requests come in.
Chong Zhang7ae4e2f2020-04-17 15:24:34 -070042 mUidSortedList.push_back(OFFLINE_UID);
43 mOfflineUidIterator = mUidSortedList.begin();
44 mJobQueues.emplace(OFFLINE_UID, JobQueueType());
Chong Zhang6d58e4b2020-03-31 09:41:10 -070045}
46
47TranscodingJobScheduler::~TranscodingJobScheduler() {}
48
49TranscodingJobScheduler::Job* TranscodingJobScheduler::getTopJob_l() {
50 if (mJobMap.empty()) {
51 return nullptr;
52 }
Chong Zhang7ae4e2f2020-04-17 15:24:34 -070053 uid_t topUid = *mUidSortedList.begin();
54 JobKeyType topJobKey = *mJobQueues[topUid].begin();
Chong Zhang6d58e4b2020-03-31 09:41:10 -070055 return &mJobMap[topJobKey];
56}
57
58void TranscodingJobScheduler::updateCurrentJob_l() {
59 Job* topJob = getTopJob_l();
60 Job* curJob = mCurrentJob;
61 ALOGV("updateCurrentJob: topJob is %s, curJob is %s",
62 topJob == nullptr ? "null" : jobToString(topJob->key).c_str(),
63 curJob == nullptr ? "null" : jobToString(curJob->key).c_str());
64
65 // If we found a topJob that should be run, and it's not already running,
66 // take some actions to ensure it's running.
67 if (topJob != nullptr && (topJob != curJob || topJob->state != Job::RUNNING)) {
68 // If another job is currently running, pause it first.
69 if (curJob != nullptr && curJob->state == Job::RUNNING) {
70 mTranscoder->pause(curJob->key.first, curJob->key.second);
71 curJob->state = Job::PAUSED;
72 }
73 // If we are not experiencing resource loss, we can start or resume
74 // the topJob now.
75 if (!mResourceLost) {
76 if (topJob->state == Job::NOT_STARTED) {
77 mTranscoder->start(topJob->key.first, topJob->key.second);
78 } else if (topJob->state == Job::PAUSED) {
79 mTranscoder->resume(topJob->key.first, topJob->key.second);
80 }
81 topJob->state = Job::RUNNING;
82 }
83 }
84 mCurrentJob = topJob;
85}
86
87void TranscodingJobScheduler::removeJob_l(const JobKeyType& jobKey) {
88 ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str());
89
90 if (mJobMap.count(jobKey) == 0) {
91 ALOGE("job %s doesn't exist", jobToString(jobKey).c_str());
92 return;
93 }
94
Chong Zhang7ae4e2f2020-04-17 15:24:34 -070095 // Remove job from uid's queue.
96 const uid_t uid = mJobMap[jobKey].uid;
97 JobQueueType& jobQueue = mJobQueues[uid];
Chong Zhang6d58e4b2020-03-31 09:41:10 -070098 auto it = std::find(jobQueue.begin(), jobQueue.end(), jobKey);
99 if (it == jobQueue.end()) {
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700100 ALOGE("couldn't find job %s in queue for uid %d", jobToString(jobKey).c_str(), uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700101 return;
102 }
103 jobQueue.erase(it);
104
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700105 // If this is the last job in a real-time queue, remove this uid's queue.
106 if (uid != OFFLINE_UID && jobQueue.empty()) {
107 mUidSortedList.remove(uid);
108 mJobQueues.erase(uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700109 }
110
111 // Clear current job.
112 if (mCurrentJob == &mJobMap[jobKey]) {
113 mCurrentJob = nullptr;
114 }
115
116 // Remove job from job map.
117 mJobMap.erase(jobKey);
118}
119
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700120bool TranscodingJobScheduler::submit(ClientIdType clientId, int32_t jobId, uid_t uid,
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700121 const TranscodingRequestParcel& request,
122 const std::weak_ptr<ITranscodingClientCallback>& callback) {
123 JobKeyType jobKey = std::make_pair(clientId, jobId);
124
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700125 ALOGV("%s: job %s, uid %d, prioirty %d", __FUNCTION__, jobToString(jobKey).c_str(), uid,
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700126 (int32_t)request.priority);
127
128 std::scoped_lock lock{mLock};
129
130 if (mJobMap.count(jobKey) > 0) {
131 ALOGE("job %s already exists", jobToString(jobKey).c_str());
132 return false;
133 }
134
135 // TODO(chz): only support offline vs real-time for now. All kUnspecified jobs
136 // go to offline queue.
137 if (request.priority == TranscodingJobPriority::kUnspecified) {
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700138 uid = OFFLINE_UID;
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700139 }
140
141 // Add job to job map.
142 mJobMap[jobKey].key = jobKey;
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700143 mJobMap[jobKey].uid = uid;
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700144 mJobMap[jobKey].state = Job::NOT_STARTED;
145 mJobMap[jobKey].request = request;
146 mJobMap[jobKey].callback = callback;
147
148 // If it's an offline job, the queue was already added in constructor.
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700149 // If it's a real-time jobs, check if a queue is already present for the uid,
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700150 // and add a new queue if needed.
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700151 if (uid != OFFLINE_UID) {
152 if (mJobQueues.count(uid) == 0) {
153 if (mUidPolicy->isUidOnTop(uid)) {
154 mUidSortedList.push_front(uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700155 } else {
156 // Shouldn't be submitting real-time requests from non-top app,
157 // put it in front of the offline queue.
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700158 mUidSortedList.insert(mOfflineUidIterator, uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700159 }
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700160 } else if (uid != *mUidSortedList.begin()) {
161 if (mUidPolicy->isUidOnTop(uid)) {
162 mUidSortedList.remove(uid);
163 mUidSortedList.push_front(uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700164 }
165 }
166 }
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700167 // Append this job to the uid's queue.
168 mJobQueues[uid].push_back(jobKey);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700169
170 updateCurrentJob_l();
171
172 validateState_l();
173 return true;
174}
175
176bool TranscodingJobScheduler::cancel(ClientIdType clientId, int32_t jobId) {
177 JobKeyType jobKey = std::make_pair(clientId, jobId);
178
179 ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str());
180
181 std::scoped_lock lock{mLock};
182
183 if (mJobMap.count(jobKey) == 0) {
184 ALOGE("job %s doesn't exist", jobToString(jobKey).c_str());
185 return false;
186 }
187 // If the job is running, pause it first.
188 if (mJobMap[jobKey].state == Job::RUNNING) {
189 mTranscoder->pause(clientId, jobId);
190 }
191
192 // Remove the job.
193 removeJob_l(jobKey);
194
195 // Start next job.
196 updateCurrentJob_l();
197
198 validateState_l();
199 return true;
200}
201
202bool TranscodingJobScheduler::getJob(ClientIdType clientId, int32_t jobId,
203 TranscodingRequestParcel* request) {
204 JobKeyType jobKey = std::make_pair(clientId, jobId);
205
206 std::scoped_lock lock{mLock};
207
208 if (mJobMap.count(jobKey) == 0) {
209 ALOGE("job %s doesn't exist", jobToString(jobKey).c_str());
210 return false;
211 }
212
213 *(TranscodingRequest*)request = mJobMap[jobKey].request;
214 return true;
215}
216
217void TranscodingJobScheduler::onFinish(ClientIdType clientId, int32_t jobId) {
218 JobKeyType jobKey = std::make_pair(clientId, jobId);
219
220 ALOGV("%s: job %s", __FUNCTION__, jobToString(jobKey).c_str());
221
222 std::scoped_lock lock{mLock};
223
224 if (mJobMap.count(jobKey) == 0) {
225 ALOGW("ignoring abort for non-existent job");
226 return;
227 }
228
229 // Only ignore if job was never started. In particular, propagate the status
230 // to client if the job is paused. Transcoder could have posted finish when
231 // we're pausing it, and the finish arrived after we changed current job.
232 if (mJobMap[jobKey].state == Job::NOT_STARTED) {
233 ALOGW("ignoring abort for job that was never started");
234 return;
235 }
236
237 {
238 auto clientCallback = mJobMap[jobKey].callback.lock();
239 if (clientCallback != nullptr) {
240 clientCallback->onTranscodingFinished(jobId, TranscodingResultParcel({jobId, 0}));
241 }
242 }
243
244 // Remove the job.
245 removeJob_l(jobKey);
246
247 // Start next job.
248 updateCurrentJob_l();
249
250 validateState_l();
251}
252
253void TranscodingJobScheduler::onError(int64_t clientId, int32_t jobId, TranscodingErrorCode err) {
254 JobKeyType jobKey = std::make_pair(clientId, jobId);
255
256 ALOGV("%s: job %s, err %d", __FUNCTION__, jobToString(jobKey).c_str(), (int32_t)err);
257
258 std::scoped_lock lock{mLock};
259
260 if (mJobMap.count(jobKey) == 0) {
261 ALOGW("ignoring abort for non-existent job");
262 return;
263 }
264
265 // Only ignore if job was never started. In particular, propagate the status
266 // to client if the job is paused. Transcoder could have posted finish when
267 // we're pausing it, and the finish arrived after we changed current job.
268 if (mJobMap[jobKey].state == Job::NOT_STARTED) {
269 ALOGW("ignoring abort for job that was never started");
270 return;
271 }
272
273 {
274 auto clientCallback = mJobMap[jobKey].callback.lock();
275 if (clientCallback != nullptr) {
276 clientCallback->onTranscodingFailed(jobId, err);
277 }
278 }
279
280 // Remove the job.
281 removeJob_l(jobKey);
282
283 // Start next job.
284 updateCurrentJob_l();
285
286 validateState_l();
287}
288
289void TranscodingJobScheduler::onResourceLost() {
290 ALOGV("%s", __FUNCTION__);
291
292 std::scoped_lock lock{mLock};
293
294 // If we receive a resource loss event, the TranscoderLibrary already paused
295 // the transcoding, so we don't need to call onPaused to notify it to pause.
296 // Only need to update the job state here.
297 if (mCurrentJob != nullptr && mCurrentJob->state == Job::RUNNING) {
298 mCurrentJob->state = Job::PAUSED;
299 }
300 mResourceLost = true;
301
302 validateState_l();
303}
304
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700305void TranscodingJobScheduler::onTopUidChanged(uid_t uid) {
306 ALOGV("%s: uid %d", __FUNCTION__, uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700307
308 std::scoped_lock lock{mLock};
309
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700310 if (uid == OFFLINE_UID) {
311 ALOGW("%s: ignoring invalid uid %d", __FUNCTION__, uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700312 return;
313 }
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700314 // If this uid doesn't have any jobs, we don't care about it.
315 if (mJobQueues.count(uid) == 0) {
316 ALOGW("%s: ignoring uid %d without any jobs", __FUNCTION__, uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700317 return;
318 }
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700319 // If this uid is already top, don't do anything.
320 if (uid == *mUidSortedList.begin()) {
321 ALOGW("%s: uid %d is already top", __FUNCTION__, uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700322 return;
323 }
324
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700325 mUidSortedList.remove(uid);
326 mUidSortedList.push_front(uid);
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700327
328 updateCurrentJob_l();
329
330 validateState_l();
331}
332
333void TranscodingJobScheduler::onResourceAvailable() {
334 ALOGV("%s", __FUNCTION__);
335
336 std::scoped_lock lock{mLock};
337
338 mResourceLost = false;
339 updateCurrentJob_l();
340
341 validateState_l();
342}
343
344void TranscodingJobScheduler::validateState_l() {
345#ifdef VALIDATE_STATE
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700346 LOG_ALWAYS_FATAL_IF(mJobQueues.count(OFFLINE_UID) != 1,
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700347 "mJobQueues offline queue number is not 1");
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700348 LOG_ALWAYS_FATAL_IF(*mOfflineUidIterator != OFFLINE_UID,
349 "mOfflineUidIterator not pointing to offline uid");
350 LOG_ALWAYS_FATAL_IF(mUidSortedList.size() != mJobQueues.size(),
351 "mUidList and mJobQueues size mismatch");
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700352
353 int32_t totalJobs = 0;
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700354 for (auto uidIt = mUidSortedList.begin(); uidIt != mUidSortedList.end(); uidIt++) {
355 LOG_ALWAYS_FATAL_IF(mJobQueues.count(*uidIt) != 1, "mJobQueues count for uid %d is not 1",
356 *uidIt);
357 for (auto jobIt = mJobQueues[*uidIt].begin(); jobIt != mJobQueues[*uidIt].end(); jobIt++) {
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700358 LOG_ALWAYS_FATAL_IF(mJobMap.count(*jobIt) != 1, "mJobs count for job %s is not 1",
359 jobToString(*jobIt).c_str());
360 }
361
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700362 totalJobs += mJobQueues[*uidIt].size();
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700363 }
364 LOG_ALWAYS_FATAL_IF(mJobMap.size() != totalJobs,
Chong Zhang7ae4e2f2020-04-17 15:24:34 -0700365 "mJobs size doesn't match total jobs counted from uid queues");
Chong Zhang6d58e4b2020-03-31 09:41:10 -0700366#endif // VALIDATE_STATE
367}
368
369} // namespace android