Changeset 3f7e1f24 in mainline for uspace/srv/sysman/job.c
- Timestamp:
- 2019-08-03T08:28:26Z (5 years ago)
- Children:
- 095d03c
- Parents:
- d7c5fc0
- git-author:
- Michal Koutný <xm.koutny+hos@…> (2015-04-22 17:54:08)
- git-committer:
- Matthieu Riolo <matthieu.riolo@…> (2019-08-03 08:28:26)
- File:
-
- 1 edited
Legend:
- Unmodified
- Added
- Removed
-
uspace/srv/sysman/job.c
rd7c5fc0 r3f7e1f24 27 27 */ 28 28 29 #include <adt/ list.h>29 #include <adt/fifo.h> 30 30 #include <assert.h> 31 31 #include <errno.h> 32 #include <fibril.h> 33 #include <fibril_synch.h> 34 #include <stdio.h> 35 #include <stdlib.h> 36 32 33 #include "dep.h" 37 34 #include "job.h" 38 35 #include "log.h" 39 #include " unit.h"36 #include "sysman.h" 40 37 41 38 static list_t job_queue; 42 static fibril_mutex_t job_queue_mtx; 43 static fibril_condvar_t job_queue_cv; 44 45 static void job_destroy(job_t **); 46 47 48 static int job_run_start(job_t *job) 49 { 50 sysman_log(LVL_DEBUG, "%s(%p)", __func__, job); 51 unit_t *unit = job->unit; 52 53 int rc = unit_start(unit); 39 40 /* 41 * Static functions 42 */ 43 44 static int job_add_blocked_job(job_t *job, job_t *blocked_job) 45 { 46 int rc = dyn_array_append(&job->blocked_jobs, job_ptr_t, blocked_job); 54 47 if (rc != EOK) { 55 return rc; 56 } 57 58 fibril_mutex_lock(&unit->state_mtx); 59 while (unit->state != STATE_STARTED) { 60 fibril_condvar_wait(&unit->state_cv, &unit->state_mtx); 61 } 62 fibril_mutex_unlock(&unit->state_mtx); 63 64 // TODO react to failed state 48 return ENOMEM; 49 } 50 51 job_add_ref(blocked_job); 52 blocked_job->blocking_jobs += 1; 53 65 54 return EOK; 66 55 } 67 56 68 static int job_runner(void *arg) 69 { 70 job_t *job = (job_t *)arg; 71 72 int retval = EOK; 73 74 /* Wait for previous jobs */ 75 list_foreach(job->blocking_jobs, link, job_link_t, jl) { 76 retval = job_wait(jl->job); 77 if (retval != EOK) { 78 break; 79 } 80 } 81 82 if (retval != EOK) { 83 goto finish; 84 } 85 86 /* Run the job itself */ 87 fibril_mutex_lock(&job->state_mtx); 88 job->state = JOB_RUNNING; 89 fibril_condvar_broadcast(&job->state_cv); 90 fibril_mutex_unlock(&job->state_mtx); 91 92 switch (job->type) { 93 case JOB_START: 94 retval = job_run_start(job); 95 break; 96 default: 97 assert(false); 98 } 99 100 101 finish: 102 fibril_mutex_lock(&job->state_mtx); 103 job->state = JOB_FINISHED; 104 job->retval = retval; 105 fibril_condvar_broadcast(&job->state_cv); 106 fibril_mutex_unlock(&job->state_mtx); 107 108 job_del_ref(&job); 109 110 return EOK; 111 } 112 113 static int job_dispatcher(void *arg) 114 { 115 fibril_mutex_lock(&job_queue_mtx); 116 while (1) { 117 while (list_empty(&job_queue)) { 118 fibril_condvar_wait(&job_queue_cv, &job_queue_mtx); 119 } 120 121 link_t *link = list_first(&job_queue); 122 assert(link); 123 list_remove(link); 124 125 /* 126 * Note that possible use of fibril pool must hold invariant 127 * that job is started asynchronously. In the case there exists 128 * circular dependency between jobs, it may result in a deadlock. 129 */ 130 job_t *job = list_get_instance(link, job_t, link); 131 fid_t runner_fibril = fibril_create(job_runner, job); 132 fibril_add_ready(runner_fibril); 133 } 134 135 fibril_mutex_unlock(&job_queue_mtx); 136 return EOK; 137 } 57 static void job_init(job_t *job, unit_t *u, unit_state_t target_state) 58 { 59 assert(job); 60 assert(u); 61 62 link_initialize(&job->job_queue); 63 64 /* Start with one reference for the creator */ 65 atomic_set(&job->refcnt, 1); 66 67 job->target_state = target_state; 68 job->unit = u; 69 70 dyn_array_initialize(&job->blocked_jobs, job_ptr_t, 0); 71 job->blocking_jobs = 0; 72 job->blocking_job_failed = false; 73 74 job->state = JOB_UNQUEUED; 75 job->retval = JOB_UNDEFINED_; 76 } 77 78 static bool job_eval_retval(job_t *job) 79 { 80 unit_t *u = job->unit; 81 if (u->state == job->target_state) { 82 job->retval = JOB_OK; 83 return true; 84 } else if (u->state == STATE_FAILED) { 85 job->retval = JOB_FAILED; 86 return true; 87 } else { 88 return false; 89 } 90 } 91 92 static bool job_is_runnable(job_t *job) 93 { 94 return job->state == JOB_QUEUED && job->blocking_jobs == 0; 95 } 96 97 static void job_check(void *object, void *data) 98 { 99 unit_t *u = object; 100 job_t *job = data; 101 102 if (job_eval_retval(job)) { 103 job_finish(job); 104 } else { 105 // TODO place for timeout 106 // TODO add reference to job? 107 sysman_object_observer(u, &job_check, job); 108 } 109 } 110 111 112 static void job_unblock(job_t *blocked_job, job_t *blocking_job) 113 { 114 if (blocking_job->retval == JOB_FAILED) { 115 blocked_job->blocking_job_failed = true; 116 } 117 blocked_job->blocking_jobs -= 1; 118 } 119 120 static void job_destroy(job_t **job_ptr) 121 { 122 job_t *job = *job_ptr; 123 if (job == NULL) { 124 return; 125 } 126 127 assert(!link_used(&job->job_queue)); 128 dyn_array_destroy(&job->blocked_jobs); 129 // TODO I should decrease referece of blocked jobs 130 131 free(job); 132 *job_ptr = NULL; 133 } 134 135 /* 136 * Non-static functions 137 */ 138 138 139 139 void job_queue_init() 140 140 { 141 141 list_initialize(&job_queue); 142 fibril_mutex_initialize(&job_queue_mtx); 143 fibril_condvar_initialize(&job_queue_cv); 144 145 fid_t dispatcher_fibril = fibril_create(job_dispatcher, NULL); 146 fibril_add_ready(dispatcher_fibril); 147 } 148 149 int job_queue_jobs(list_t *jobs) 150 { 151 fibril_mutex_lock(&job_queue_mtx); 152 142 } 143 144 int job_queue_add_jobs(dyn_array_t *jobs) 145 { 153 146 /* Check consistency with queue. */ 154 list_foreach(*jobs, link, job_t, new_job) {155 list_foreach(job_queue, link, job_t, queued_job) {147 dyn_array_foreach(*jobs, job_ptr_t, new_job_it) { 148 list_foreach(job_queue, job_queue, job_t, queued_job) { 156 149 /* 157 150 * Currently we have strict strategy not permitting 158 151 * multiple jobs for one unit in the queue. 159 152 */ 160 if (new_job->unit == queued_job->unit) { 153 if ((*new_job_it)->unit == queued_job->unit) { 154 sysman_log(LVL_ERROR, 155 "Cannot queue multiple jobs foor unit '%s'", 156 unit_name((*new_job_it)->unit)); 161 157 return EEXIST; 162 158 } … … 165 161 166 162 /* Enqueue jobs */ 167 list_foreach_safe(*jobs, cur_link, next_link) { 168 list_remove(cur_link); 169 list_append(cur_link, &job_queue); 170 } 171 172 /* Only job dispatcher waits, it's correct to notify one only. */ 173 fibril_condvar_signal(&job_queue_cv); 174 fibril_mutex_unlock(&job_queue_mtx); 163 dyn_array_foreach(*jobs, job_ptr_t, job_it) { 164 (*job_it)->state = JOB_QUEUED; 165 list_append(&(*job_it)->job_queue, &job_queue); 166 // TODO explain this reference 167 job_add_ref(*job_it); 168 } 175 169 176 170 return EOK; 177 171 } 178 172 179 /** Blocking wait for job finishing. 180 * 181 * Multiple fibrils may wait for the same job. 182 * 183 * @return Return code of the job 184 */ 185 int job_wait(job_t *job) 186 { 187 fibril_mutex_lock(&job->state_mtx); 188 while (job->state != JOB_FINISHED) { 189 fibril_condvar_wait(&job->state_cv, &job->state_mtx); 190 } 191 192 int rc = job->retval; 193 fibril_mutex_unlock(&job->state_mtx); 194 173 /** Pop next runnable job 174 * 175 * @return runnable job or NULL when there's none 176 */ 177 job_t *job_queue_pop_runnable(void) 178 { 179 job_t *result = NULL; 180 link_t *first_link = list_first(&job_queue); 181 bool first_iteration = true; 182 183 list_foreach_safe(job_queue, cur_link, next_link) { 184 result = list_get_instance(cur_link, job_t, job_queue); 185 if (job_is_runnable(result)) { 186 break; 187 } else if (!first_iteration && cur_link == first_link) { 188 result = NULL; 189 break; 190 } else { 191 /* 192 * We make no assuptions about ordering of jobs in the 193 * queue, so just move the job to the end of the queue. 194 * If there are exist topologic ordering, eventually 195 * jobs will be reordered. Furthermore when if there 196 * exists any runnable job, it's always found. 197 */ 198 list_remove(cur_link); 199 list_append(cur_link, &job_queue); 200 } 201 first_iteration = false; 202 } 203 204 if (result) { 205 // TODO update refcount 206 list_remove(&result->job_queue); 207 result->state = JOB_DEQUEUED; 208 } 209 210 return result; 211 } 212 213 int job_create_closure(job_t *main_job, dyn_array_t *job_closure) 214 { 215 // TODO replace hard-coded FIFO size with resizable FIFO 216 FIFO_INITIALIZE_DYNAMIC(jobs_fifo, job_ptr_t, 10); 217 void *fifo_data = fifo_create(jobs_fifo); 218 int rc; 219 if (fifo_data == NULL) { 220 rc = ENOMEM; 221 goto finish; 222 } 223 224 /* 225 * Traverse dependency graph in BFS fashion and create jobs for every 226 * necessary unit. 227 */ 228 fifo_push(jobs_fifo, main_job); 229 job_t *job; 230 while ((job = fifo_pop(jobs_fifo)) != NULL) { 231 /* 232 * Do not increase reference count of job, as we're passing it 233 * to the closure. 234 */ 235 dyn_array_append(job_closure, job_ptr_t, job); 236 237 /* Traverse dependencies edges */ 238 unit_t *u = job->unit; 239 list_foreach(u->dependencies, dependencies, unit_dependency_t, dep) { 240 // TODO prepare for reverse edge direction and 241 // non-identity state mapping 242 job_t *new_job = 243 job_create(dep->dependency, job->target_state); 244 if (new_job == NULL) { 245 rc = ENOMEM; 246 goto finish; 247 } 248 job_add_blocked_job(new_job, job); 249 fifo_push(jobs_fifo, new_job); 250 } 251 } 252 rc = EOK; 253 254 finish: 255 free(fifo_data); 256 /* 257 * Newly created jobs are already passed to the closure, thus not 258 * deleting them here. 259 */ 195 260 return rc; 261 } 262 263 job_t *job_create(unit_t *u, unit_state_t target_state) 264 { 265 job_t *job = malloc(sizeof(job_t)); 266 if (job != NULL) { 267 job_init(job, u, target_state); 268 } 269 270 return job; 196 271 } 197 272 … … 214 289 } 215 290 216 static void job_init(job_t *job, job_type_t type) 217 { 218 assert(job); 219 220 link_initialize(&job->link); 221 list_initialize(&job->blocking_jobs); 222 223 /* Start with one reference for the creator */ 224 atomic_set(&job->refcnt, 1); 225 226 job->type = type; 227 job->unit = NULL; 228 229 job->state = JOB_WAITING; 230 fibril_mutex_initialize(&job->state_mtx); 231 fibril_condvar_initialize(&job->state_cv); 232 } 233 234 job_t *job_create(job_type_t type) 235 { 236 job_t *job = malloc(sizeof(job_t)); 237 if (job != NULL) { 238 job_init(job, type); 239 } 240 241 return job; 242 } 243 244 int job_add_blocking_job(job_t *job, job_t *blocking_job) 245 { 246 job_link_t *job_link = malloc(sizeof(job_link_t)); 247 if (job_link == NULL) { 248 return ENOMEM; 249 } 250 251 link_initialize(&job_link->link); 252 list_append(&job_link->link, &job->blocking_jobs); 253 254 job_link->job = blocking_job; 255 job_add_ref(blocking_job); 256 257 return EOK; 258 } 259 260 static void job_destroy(job_t **job_ptr) 261 { 262 job_t *job = *job_ptr; 263 if (job == NULL) { 264 return; 265 } 266 267 list_foreach_safe(job->blocking_jobs, cur_link, next_link) { 268 job_link_t *jl = list_get_instance(cur_link, job_link_t, link); 269 list_remove(cur_link); 270 job_del_ref(&jl->job); 271 free(jl); 272 } 273 free(job); 274 275 *job_ptr = NULL; 276 } 291 void job_run(job_t *job) 292 { 293 assert(job->state != JOB_RUNNING); 294 assert(job->state != JOB_FINISHED); 295 296 unit_t *u = job->unit; 297 sysman_log(LVL_DEBUG, "%s, %s -> %i", 298 __func__, unit_name(u), job->target_state); 299 300 /* Propagate failure */ 301 if (job->blocking_job_failed) { 302 goto fail; 303 } 304 305 int rc; 306 switch (job->target_state) { 307 case STATE_STARTED: 308 rc = unit_start(u); 309 break; 310 default: 311 // TODO implement other states 312 assert(false); 313 } 314 if (rc != EOK) { 315 goto fail; 316 } 317 318 job_check(job->unit, job); 319 return; 320 321 fail: 322 job->retval = JOB_FAILED; 323 job_finish(job); 324 } 325 326 /** Unblocks blocked jobs and notify observers 327 * 328 * @param[in] job job with defined return value 329 */ 330 void job_finish(job_t *job) 331 { 332 assert(job->state != JOB_FINISHED); 333 assert(job->retval != JOB_UNDEFINED_); 334 335 sysman_log(LVL_DEBUG2, "%s(%s) -> %i", 336 __func__, unit_name(job->unit), job->retval); 337 338 job->state = JOB_FINISHED; 339 340 /* Job finished */ 341 dyn_array_foreach(job->blocked_jobs, job_ptr_t, job_it) { 342 job_unblock(*job_it, job); 343 } 344 // TODO remove reference from blocked jobs 345 346 // TODO should reference be added (for the created event) 347 sysman_raise_event(&sysman_event_job_changed, job); 348 } 349
Note:
See TracChangeset
for help on using the changeset viewer.