Changeset 3f7e1f24 in mainline for uspace/srv/sysman/job.c


Ignore:
Timestamp:
2019-08-03T08:28:26Z (5 years ago)
Author:
Matthieu Riolo <matthieu.riolo@…>
Children:
095d03c
Parents:
d7c5fc0
git-author:
Michal Koutný <xm.koutny+hos@…> (2015-04-22 17:54:08)
git-committer:
Matthieu Riolo <matthieu.riolo@…> (2019-08-03 08:28:26)
Message:

sysman: Refactored job manipulation (event loop + one main fibril)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/srv/sysman/job.c

    rd7c5fc0 r3f7e1f24  
    2727 */
    2828
    29 #include <adt/list.h>
     29#include <adt/fifo.h>
    3030#include <assert.h>
    3131#include <errno.h>
    32 #include <fibril.h>
    33 #include <fibril_synch.h>
    34 #include <stdio.h>
    35 #include <stdlib.h>
    36 
     32
     33#include "dep.h"
    3734#include "job.h"
    3835#include "log.h"
    39 #include "unit.h"
     36#include "sysman.h"
    4037
    4138static list_t job_queue;
    42 static fibril_mutex_t job_queue_mtx;
    43 static fibril_condvar_t job_queue_cv;
    44 
    45 static void job_destroy(job_t **);
    46 
    47 
    48 static int job_run_start(job_t *job)
    49 {
    50         sysman_log(LVL_DEBUG, "%s(%p)", __func__, job);
    51         unit_t *unit = job->unit;
    52 
    53         int rc = unit_start(unit);
     39
     40/*
     41 * Static functions
     42 */
     43
     44static int job_add_blocked_job(job_t *job, job_t *blocked_job)
     45{
     46        int rc = dyn_array_append(&job->blocked_jobs, job_ptr_t, blocked_job);
    5447        if (rc != EOK) {
    55                 return rc;
    56         }
    57 
    58         fibril_mutex_lock(&unit->state_mtx);
    59         while (unit->state != STATE_STARTED) {
    60                 fibril_condvar_wait(&unit->state_cv, &unit->state_mtx);
    61         }
    62         fibril_mutex_unlock(&unit->state_mtx);
    63 
    64         // TODO react to failed state
     48                return ENOMEM;
     49        }
     50
     51        job_add_ref(blocked_job);
     52        blocked_job->blocking_jobs += 1;
     53
    6554        return EOK;
    6655}
    6756
    68 static int job_runner(void *arg)
    69 {
    70         job_t *job = (job_t *)arg;
    71 
    72         int retval = EOK;
    73 
    74         /* Wait for previous jobs */
    75         list_foreach(job->blocking_jobs, link, job_link_t, jl) {
    76                 retval = job_wait(jl->job);
    77                 if (retval != EOK) {
    78                         break;
    79                 }
    80         }
    81 
    82         if (retval != EOK) {
    83                 goto finish;
    84         }
    85 
    86         /* Run the job itself */
    87         fibril_mutex_lock(&job->state_mtx);
    88         job->state = JOB_RUNNING;
    89         fibril_condvar_broadcast(&job->state_cv);
    90         fibril_mutex_unlock(&job->state_mtx);
    91 
    92         switch (job->type) {
    93         case JOB_START:
    94                 retval = job_run_start(job);
    95                 break;
    96         default:
    97                 assert(false);
    98         }
    99 
    100 
    101 finish:
    102         fibril_mutex_lock(&job->state_mtx);
    103         job->state = JOB_FINISHED;
    104         job->retval = retval;
    105         fibril_condvar_broadcast(&job->state_cv);
    106         fibril_mutex_unlock(&job->state_mtx);
    107 
    108         job_del_ref(&job);
    109 
    110         return EOK;
    111 }
    112 
    113 static int job_dispatcher(void *arg)
    114 {
    115         fibril_mutex_lock(&job_queue_mtx);
    116         while (1) {
    117                 while (list_empty(&job_queue)) {
    118                         fibril_condvar_wait(&job_queue_cv, &job_queue_mtx);
    119                 }
    120                
    121                 link_t *link = list_first(&job_queue);
    122                 assert(link);
    123                 list_remove(link);
    124 
    125                 /*
    126                  * Note that possible use of fibril pool must hold invariant
    127                  * that job is started asynchronously. In the case there exists
    128                  * circular dependency between jobs, it may result in a deadlock.
    129                  */
    130                 job_t *job = list_get_instance(link, job_t, link);
    131                 fid_t runner_fibril = fibril_create(job_runner, job);
    132                 fibril_add_ready(runner_fibril);
    133         }
    134 
    135         fibril_mutex_unlock(&job_queue_mtx);
    136         return EOK;
    137 }
     57static void job_init(job_t *job, unit_t *u, unit_state_t target_state)
     58{
     59        assert(job);
     60        assert(u);
     61
     62        link_initialize(&job->job_queue);
     63
     64        /* Start with one reference for the creator */
     65        atomic_set(&job->refcnt, 1);
     66
     67        job->target_state = target_state;
     68        job->unit = u;
     69
     70        dyn_array_initialize(&job->blocked_jobs, job_ptr_t, 0);
     71        job->blocking_jobs = 0;
     72        job->blocking_job_failed = false;
     73
     74        job->state = JOB_UNQUEUED;
     75        job->retval = JOB_UNDEFINED_;
     76}
     77
     78static bool job_eval_retval(job_t *job)
     79{
     80        unit_t *u = job->unit;
     81        if (u->state == job->target_state) {
     82                job->retval = JOB_OK;
     83                return true;
     84        } else if (u->state == STATE_FAILED) {
     85                job->retval = JOB_FAILED;
     86                return true;
     87        } else {
     88                return false;
     89        }
     90}
     91
     92static bool job_is_runnable(job_t *job)
     93{
     94        return job->state == JOB_QUEUED && job->blocking_jobs == 0;
     95}
     96
     97static void job_check(void *object, void *data)
     98{
     99        unit_t *u = object;
     100        job_t *job = data;
     101
     102        if (job_eval_retval(job)) {
     103                job_finish(job);
     104        } else {
     105                // TODO place for timeout
     106                // TODO add reference to job?
     107                sysman_object_observer(u, &job_check, job);
     108        }
     109}
     110
     111
     112static void job_unblock(job_t *blocked_job, job_t *blocking_job)
     113{
     114        if (blocking_job->retval == JOB_FAILED) {
     115                blocked_job->blocking_job_failed = true;
     116        }
     117        blocked_job->blocking_jobs -= 1;
     118}
     119
     120static void job_destroy(job_t **job_ptr)
     121{
     122        job_t *job = *job_ptr;
     123        if (job == NULL) {
     124                return;
     125        }
     126
     127        assert(!link_used(&job->job_queue));
     128        dyn_array_destroy(&job->blocked_jobs);
     129        // TODO I should decrease referece of blocked jobs
     130
     131        free(job);
     132        *job_ptr = NULL;
     133}
     134
     135/*
     136 * Non-static functions
     137 */
    138138
    139139void job_queue_init()
    140140{
    141141        list_initialize(&job_queue);
    142         fibril_mutex_initialize(&job_queue_mtx);
    143         fibril_condvar_initialize(&job_queue_cv);
    144 
    145         fid_t dispatcher_fibril = fibril_create(job_dispatcher, NULL);
    146         fibril_add_ready(dispatcher_fibril);
    147 }
    148 
    149 int job_queue_jobs(list_t *jobs)
    150 {
    151         fibril_mutex_lock(&job_queue_mtx);
    152 
     142}
     143
     144int job_queue_add_jobs(dyn_array_t *jobs)
     145{
    153146        /* Check consistency with queue. */
    154         list_foreach(*jobs, link, job_t, new_job) {
    155                 list_foreach(job_queue, link, job_t, queued_job) {
     147        dyn_array_foreach(*jobs, job_ptr_t, new_job_it) {
     148                list_foreach(job_queue, job_queue, job_t, queued_job) {
    156149                        /*
    157150                         * Currently we have strict strategy not permitting
    158151                         * multiple jobs for one unit in the queue.
    159152                         */
    160                         if (new_job->unit == queued_job->unit) {
     153                        if ((*new_job_it)->unit == queued_job->unit) {
     154                                sysman_log(LVL_ERROR,
     155                                    "Cannot queue multiple jobs foor unit '%s'",
     156                                    unit_name((*new_job_it)->unit));
    161157                                return EEXIST;
    162158                        }
     
    165161
    166162        /* Enqueue jobs */
    167         list_foreach_safe(*jobs, cur_link, next_link) {
    168                 list_remove(cur_link);
    169                 list_append(cur_link, &job_queue);
    170         }
    171 
    172         /* Only job dispatcher waits, it's correct to notify one only. */
    173         fibril_condvar_signal(&job_queue_cv);
    174         fibril_mutex_unlock(&job_queue_mtx);
     163        dyn_array_foreach(*jobs, job_ptr_t, job_it) {
     164                (*job_it)->state = JOB_QUEUED;
     165                list_append(&(*job_it)->job_queue, &job_queue);
     166                // TODO explain this reference
     167                job_add_ref(*job_it);
     168        }
    175169
    176170        return EOK;
    177171}
    178172
    179 /** Blocking wait for job finishing.
    180  *
    181  * Multiple fibrils may wait for the same job.
    182  *
    183  * @return    Return code of the job
    184  */
    185 int job_wait(job_t *job)
    186 {
    187         fibril_mutex_lock(&job->state_mtx);
    188         while (job->state != JOB_FINISHED) {
    189                 fibril_condvar_wait(&job->state_cv, &job->state_mtx);
    190         }
    191 
    192         int rc = job->retval;
    193         fibril_mutex_unlock(&job->state_mtx);
    194 
     173/** Pop next runnable job
     174 *
     175 * @return runnable job or NULL when there's none
     176 */
     177job_t *job_queue_pop_runnable(void)
     178{
     179        job_t *result = NULL;
     180        link_t *first_link = list_first(&job_queue);
     181        bool first_iteration = true;
     182
     183        list_foreach_safe(job_queue, cur_link, next_link) {
     184                result = list_get_instance(cur_link, job_t, job_queue);
     185                if (job_is_runnable(result)) {
     186                        break;
     187                } else if (!first_iteration && cur_link == first_link) {
     188                        result = NULL;
     189                        break;
     190                } else {
     191                        /*
     192                         * We make no assuptions about ordering of jobs in the
     193                         * queue, so just move the job to the end of the queue.
     194                         * If there are exist topologic ordering, eventually
     195                         * jobs will be reordered. Furthermore when if there
     196                         * exists any runnable job, it's always found.
     197                         */
     198                        list_remove(cur_link);
     199                        list_append(cur_link, &job_queue);
     200                }
     201                first_iteration = false;
     202        }
     203
     204        if (result) {
     205                // TODO update refcount
     206                list_remove(&result->job_queue);
     207                result->state = JOB_DEQUEUED;
     208        }
     209
     210        return result;
     211}
     212
     213int job_create_closure(job_t *main_job, dyn_array_t *job_closure)
     214{
     215        // TODO replace hard-coded FIFO size with resizable FIFO
     216        FIFO_INITIALIZE_DYNAMIC(jobs_fifo, job_ptr_t, 10);
     217        void *fifo_data = fifo_create(jobs_fifo);
     218        int rc;
     219        if (fifo_data == NULL) {
     220                rc = ENOMEM;
     221                goto finish;
     222        }
     223
     224        /*
     225         * Traverse dependency graph in BFS fashion and create jobs for every
     226         * necessary unit.
     227         */
     228        fifo_push(jobs_fifo, main_job);
     229        job_t *job;
     230        while ((job = fifo_pop(jobs_fifo)) != NULL) {
     231                /*
     232                 * Do not increase reference count of job, as we're passing it
     233                 * to the closure.
     234                 */
     235                dyn_array_append(job_closure, job_ptr_t, job);
     236
     237                /* Traverse dependencies edges */
     238                unit_t *u = job->unit;
     239                list_foreach(u->dependencies, dependencies, unit_dependency_t, dep) {
     240                        // TODO prepare for reverse edge direction and
     241                        //      non-identity state mapping
     242                        job_t *new_job =
     243                            job_create(dep->dependency, job->target_state);
     244                        if (new_job == NULL) {
     245                                rc = ENOMEM;
     246                                goto finish;
     247                        }
     248                        job_add_blocked_job(new_job, job);
     249                        fifo_push(jobs_fifo, new_job);
     250                }
     251        }
     252        rc = EOK;
     253
     254finish:
     255        free(fifo_data);
     256        /*
     257         * Newly created jobs are already passed to the closure, thus not
     258         * deleting them here.
     259         */
    195260        return rc;
     261}
     262
     263job_t *job_create(unit_t *u, unit_state_t target_state)
     264{
     265        job_t *job = malloc(sizeof(job_t));
     266        if (job != NULL) {
     267                job_init(job, u, target_state);
     268        }
     269
     270        return job;
    196271}
    197272
     
    214289}
    215290
    216 static void job_init(job_t *job, job_type_t type)
    217 {
    218         assert(job);
    219 
    220         link_initialize(&job->link);
    221         list_initialize(&job->blocking_jobs);
    222 
    223         /* Start with one reference for the creator */
    224         atomic_set(&job->refcnt, 1);
    225 
    226         job->type = type;
    227         job->unit = NULL;
    228 
    229         job->state = JOB_WAITING;
    230         fibril_mutex_initialize(&job->state_mtx);
    231         fibril_condvar_initialize(&job->state_cv);
    232 }
    233 
    234 job_t *job_create(job_type_t type)
    235 {
    236         job_t *job = malloc(sizeof(job_t));
    237         if (job != NULL) {
    238                 job_init(job, type);
    239         }
    240 
    241         return job;
    242 }
    243 
    244 int job_add_blocking_job(job_t *job, job_t *blocking_job)
    245 {
    246         job_link_t *job_link = malloc(sizeof(job_link_t));
    247         if (job_link == NULL) {
    248                 return ENOMEM;
    249         }
    250 
    251         link_initialize(&job_link->link);
    252         list_append(&job_link->link, &job->blocking_jobs);
    253 
    254         job_link->job = blocking_job;
    255         job_add_ref(blocking_job);
    256 
    257         return EOK;
    258 }
    259 
    260 static void job_destroy(job_t **job_ptr)
    261 {
    262         job_t *job = *job_ptr;
    263         if (job == NULL) {
    264                 return;
    265         }
    266 
    267         list_foreach_safe(job->blocking_jobs, cur_link, next_link) {
    268                 job_link_t *jl = list_get_instance(cur_link, job_link_t, link);
    269                 list_remove(cur_link);
    270                 job_del_ref(&jl->job);
    271                 free(jl);
    272         }
    273         free(job);
    274 
    275         *job_ptr = NULL;
    276 }
     291void job_run(job_t *job)
     292{
     293        assert(job->state != JOB_RUNNING);
     294        assert(job->state != JOB_FINISHED);
     295
     296        unit_t *u = job->unit;
     297        sysman_log(LVL_DEBUG, "%s, %s -> %i",
     298            __func__, unit_name(u), job->target_state);
     299
     300        /* Propagate failure */
     301        if (job->blocking_job_failed) {
     302                goto fail;
     303        }
     304
     305        int rc;
     306        switch (job->target_state) {
     307        case STATE_STARTED:
     308                rc = unit_start(u);
     309                break;
     310        default:
     311                // TODO implement other states
     312                assert(false);
     313        }
     314        if (rc != EOK) {
     315                goto fail;
     316        }
     317
     318        job_check(job->unit, job);
     319        return;
     320
     321fail:
     322        job->retval = JOB_FAILED;
     323        job_finish(job);
     324}
     325
     326/** Unblocks blocked jobs and notify observers
     327 *
     328 * @param[in]  job  job with defined return value
     329 */
     330void job_finish(job_t *job)
     331{
     332        assert(job->state != JOB_FINISHED);
     333        assert(job->retval != JOB_UNDEFINED_);
     334
     335        sysman_log(LVL_DEBUG2, "%s(%s) -> %i",
     336            __func__, unit_name(job->unit), job->retval);
     337
     338        job->state = JOB_FINISHED;
     339
     340        /* Job finished */
     341        dyn_array_foreach(job->blocked_jobs, job_ptr_t, job_it) {
     342                job_unblock(*job_it, job);
     343        }
     344        // TODO remove reference from blocked jobs
     345
     346        // TODO should reference be added (for the created event)
     347        sysman_raise_event(&sysman_event_job_changed, job);
     348}
     349
Note: See TracChangeset for help on using the changeset viewer.