| Jerry Zhang | 487be61 | 2016-10-24 12:10:41 -0700 | [diff] [blame^] | 1 | /* | 
 | 2 |  * Copyright (C) 2016 The Android Open Source Project | 
 | 3 |  * | 
 | 4 |  * Licensed under the Apache License, Version 2.0 (the "License"); | 
 | 5 |  * you may not use this file except in compliance with the License. | 
 | 6 |  * You may obtain a copy of the License at | 
 | 7 |  * | 
 | 8 |  *      http://www.apache.org/licenses/LICENSE-2.0 | 
 | 9 |  * | 
 | 10 |  * Unless required by applicable law or agreed to in writing, software | 
 | 11 |  * distributed under the License is distributed on an "AS IS" BASIS, | 
 | 12 |  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | 
 | 13 |  * See the License for the specific language governing permissions and | 
 | 14 |  * limitations under the License. | 
 | 15 |  */ | 
 | 16 |  | 
 | 17 | #include <android-base/logging.h> | 
 | 18 | #include <condition_variable> | 
 | 19 | #include <memory> | 
 | 20 | #include <mutex> | 
 | 21 | #include <queue> | 
 | 22 |  | 
 | 23 | #include "AsyncIO.h" | 
 | 24 |  | 
 | 25 | namespace { | 
 | 26 |  | 
 | 27 | void read_func(struct aiocb *aiocbp) { | 
 | 28 |     aiocbp->ret = TEMP_FAILURE_RETRY(pread(aiocbp->aio_fildes, | 
 | 29 |                 aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset)); | 
 | 30 |     if (aiocbp->ret == -1) aiocbp->error = errno; | 
 | 31 | } | 
 | 32 |  | 
 | 33 | void write_func(struct aiocb *aiocbp) { | 
 | 34 |     aiocbp->ret = TEMP_FAILURE_RETRY(pwrite(aiocbp->aio_fildes, | 
 | 35 |                 aiocbp->aio_buf, aiocbp->aio_nbytes, aiocbp->aio_offset)); | 
 | 36 |     if (aiocbp->ret == -1) aiocbp->error = errno; | 
 | 37 | } | 
 | 38 |  | 
 | 39 | void splice_read_func(struct aiocb *aiocbp) { | 
 | 40 |     aiocbp->ret = TEMP_FAILURE_RETRY(splice(aiocbp->aio_fildes, | 
 | 41 |                 (off64_t*) &aiocbp->aio_offset, aiocbp->aio_sink, | 
 | 42 |                 NULL, aiocbp->aio_nbytes, 0)); | 
 | 43 |     if (aiocbp->ret == -1) aiocbp->error = errno; | 
 | 44 | } | 
 | 45 |  | 
 | 46 | void splice_write_func(struct aiocb *aiocbp) { | 
 | 47 |     aiocbp->ret = TEMP_FAILURE_RETRY(splice(aiocbp->aio_fildes, NULL, | 
 | 48 |                 aiocbp->aio_sink, (off64_t*) &aiocbp->aio_offset, | 
 | 49 |                 aiocbp->aio_nbytes, 0)); | 
 | 50 |     if (aiocbp->ret == -1) aiocbp->error = errno; | 
 | 51 | } | 
 | 52 |  | 
 | 53 | std::queue<std::unique_ptr<struct aiocb>> queue; | 
 | 54 | std::mutex queue_lock; | 
 | 55 | std::condition_variable queue_cond; | 
 | 56 | std::condition_variable write_cond; | 
 | 57 | int done = 1; | 
 | 58 | void splice_write_pool_func(int) { | 
 | 59 |     while(1) { | 
 | 60 |         std::unique_lock<std::mutex> lk(queue_lock); | 
 | 61 |         queue_cond.wait(lk, []{return !queue.empty() || done;}); | 
 | 62 |         if (queue.empty() && done) { | 
 | 63 |             return; | 
 | 64 |         } | 
 | 65 |         std::unique_ptr<struct aiocb> aiocbp = std::move(queue.front()); | 
 | 66 |         queue.pop(); | 
 | 67 |         lk.unlock(); | 
 | 68 |         write_cond.notify_one(); | 
 | 69 |         splice_write_func(aiocbp.get()); | 
 | 70 |         close(aiocbp->aio_fildes); | 
 | 71 |     } | 
 | 72 | } | 
 | 73 |  | 
 | 74 | void write_pool_func(int) { | 
 | 75 |     while(1) { | 
 | 76 |         std::unique_lock<std::mutex> lk(queue_lock); | 
 | 77 |         queue_cond.wait(lk, []{return !queue.empty() || done;}); | 
 | 78 |         if (queue.empty() && done) { | 
 | 79 |             return; | 
 | 80 |         } | 
 | 81 |         std::unique_ptr<struct aiocb> aiocbp = std::move(queue.front()); | 
 | 82 |         queue.pop(); | 
 | 83 |         lk.unlock(); | 
 | 84 |         write_cond.notify_one(); | 
 | 85 |         aiocbp->ret = TEMP_FAILURE_RETRY(pwrite(aiocbp->aio_fildes, | 
 | 86 |                     aiocbp->aio_pool_buf.get(), aiocbp->aio_nbytes, aiocbp->aio_offset)); | 
 | 87 |         if (aiocbp->ret == -1) aiocbp->error = errno; | 
 | 88 |     } | 
 | 89 | } | 
 | 90 |  | 
 | 91 | constexpr int NUM_THREADS = 1; | 
 | 92 | constexpr int MAX_QUEUE_SIZE = 10; | 
 | 93 | std::thread pool[NUM_THREADS]; | 
 | 94 |  | 
 | 95 | } // end anonymous namespace | 
 | 96 |  | 
 | 97 | void aio_pool_init(void(f)(int)) { | 
 | 98 |     CHECK(done == 1); | 
 | 99 |     done = 0; | 
 | 100 |     for (int i = 0; i < NUM_THREADS; i++) { | 
 | 101 |         pool[i] = std::thread(f, i); | 
 | 102 |     } | 
 | 103 | } | 
 | 104 |  | 
 | 105 | void aio_pool_splice_init() { | 
 | 106 |     aio_pool_init(splice_write_pool_func); | 
 | 107 | } | 
 | 108 |  | 
 | 109 | void aio_pool_write_init() { | 
 | 110 |     aio_pool_init(write_pool_func); | 
 | 111 | } | 
 | 112 |  | 
 | 113 | void aio_pool_end() { | 
 | 114 |     done = 1; | 
 | 115 |     for (int i = 0; i < NUM_THREADS; i++) { | 
 | 116 |         std::unique_lock<std::mutex> lk(queue_lock); | 
 | 117 |         lk.unlock(); | 
 | 118 |         queue_cond.notify_one(); | 
 | 119 |     } | 
 | 120 |  | 
 | 121 |     for (int i = 0; i < NUM_THREADS; i++) { | 
 | 122 |         pool[i].join(); | 
 | 123 |     } | 
 | 124 | } | 
 | 125 |  | 
 | 126 | // used for both writes and splices depending on which init was used before. | 
 | 127 | int aio_pool_write(struct aiocb *aiocbp) { | 
 | 128 |     std::unique_lock<std::mutex> lk(queue_lock); | 
 | 129 |     write_cond.wait(lk, []{return queue.size() < MAX_QUEUE_SIZE;}); | 
 | 130 |     queue.push(std::unique_ptr<struct aiocb>(aiocbp)); | 
 | 131 |     lk.unlock(); | 
 | 132 |     queue_cond.notify_one(); | 
 | 133 |     return 0; | 
 | 134 | } | 
 | 135 |  | 
 | 136 | int aio_read(struct aiocb *aiocbp) { | 
 | 137 |     aiocbp->thread = std::thread(read_func, aiocbp); | 
 | 138 |     return 0; | 
 | 139 | } | 
 | 140 |  | 
 | 141 | int aio_write(struct aiocb *aiocbp) { | 
 | 142 |     aiocbp->thread = std::thread(write_func, aiocbp); | 
 | 143 |     return 0; | 
 | 144 | } | 
 | 145 |  | 
 | 146 | int aio_splice_read(struct aiocb *aiocbp) { | 
 | 147 |     aiocbp->thread = std::thread(splice_read_func, aiocbp); | 
 | 148 |     return 0; | 
 | 149 | } | 
 | 150 |  | 
 | 151 | int aio_splice_write(struct aiocb *aiocbp) { | 
 | 152 |     aiocbp->thread = std::thread(splice_write_func, aiocbp); | 
 | 153 |     return 0; | 
 | 154 | } | 
 | 155 |  | 
 | 156 | int aio_error(const struct aiocb *aiocbp) { | 
 | 157 |     return aiocbp->error; | 
 | 158 | } | 
 | 159 |  | 
 | 160 | ssize_t aio_return(struct aiocb *aiocbp) { | 
 | 161 |     return aiocbp->ret; | 
 | 162 | } | 
 | 163 |  | 
 | 164 | int aio_suspend(struct aiocb *aiocbp[], int n, | 
 | 165 |         const struct timespec *) { | 
 | 166 |     for (int i = 0; i < n; i++) { | 
 | 167 |         aiocbp[i]->thread.join(); | 
 | 168 |     } | 
 | 169 |     return 0; | 
 | 170 | } | 
 | 171 |  | 
 | 172 | int aio_cancel(int, struct aiocb *) { | 
 | 173 |     // Not implemented | 
 | 174 |     return -1; | 
 | 175 | } | 
 | 176 |  |