Changeset 25a9fec in mainline


Ignore:
Timestamp:
2019-08-07T09:54:10Z (5 years ago)
Author:
Matthieu Riolo <matthieu.riolo@…>
Children:
92a7cfb1
Parents:
18377301
git-author:
Michal Koutný <xm.koutny+hos@…> (2015-11-03 21:48:45)
git-committer:
Matthieu Riolo <matthieu.riolo@…> (2019-08-07 09:54:10)
Message:

sysman: Refactor job.c into job_queue.c and job_closure.c

Location:
uspace/srv/sysman
Files:
4 added
7 edited

Legend:

Unmodified
Added
Removed
  • uspace/srv/sysman/Makefile

    r18377301 r25a9fec  
    4242        edge.c \
    4343        job.c \
     44        job_closure.c \
     45        job_queue.c \
    4446        log.c \
    4547        repo.c \
  • uspace/srv/sysman/job.c

    r18377301 r25a9fec  
    3838#include "sysman.h"
    3939
    40 static list_t job_queue;
    4140
    4241/*
     
    4443 */
    4544
    46 static int job_add_blocked_job(job_t *blocking_job, job_t *blocked_job)
    47 {
    48         assert(blocking_job->blocked_jobs.size ==
    49             blocking_job->blocked_jobs_count);
    50 
    51         int rc = dyn_array_append(&blocking_job->blocked_jobs, job_t *,
    52             blocked_job);
    53         if (rc != EOK) {
    54                 return ENOMEM;
    55         }
    56         job_add_ref(blocked_job);
    57 
    58         blocking_job->blocked_jobs_count += 1;
    59         blocked_job->blocking_jobs += 1;
    60 
    61         return EOK;
    62 }
    6345
    6446/** Remove blocking_job from blocked job structure
     
    148130}
    149131
    150 static bool job_is_runnable(job_t *job)
    151 {
    152         assert(job->state == JOB_PENDING);
    153         return job->blocking_jobs == 0;
    154 }
    155 
    156 /** Pop next runnable job
    157  *
    158  * @return runnable job or NULL when there's none
    159  */
    160 static job_t *job_queue_pop_runnable(void)
    161 {
    162         job_t *result = NULL;
    163 
    164         /* Select first runnable job */
    165         list_foreach(job_queue, job_queue, job_t, candidate) {
    166                 if (job_is_runnable(candidate)) {
    167                         result = candidate;
    168                         break;
    169                 }
    170         }
    171         if (result) {
    172                 /* Remove job from queue and pass reference to caller */
    173                 list_remove(&result->job_queue);
    174         }
    175 
    176         return result;
    177 }
    178 
    179 /** Add multiple references to job
    180  *
    181  * Non-atomicity doesn't mind as long as individual increments are atomic.
    182  *
    183  * @note Function is not exported as other modules shouldn't need it.
    184  */
    185 static inline void job_add_refs(job_t *job, size_t refs)
    186 {
    187         for (size_t i = 0; i < refs; ++i) {
    188                 job_add_ref(job);
    189         }
    190 }
    191 
    192 /** Delete multiple references to job
    193  *
    194  * Behavior of concurrent runs with job_add_refs aren't specified.
    195  */
    196 static inline void job_del_refs(job_t **job_ptr, size_t refs)
    197 {
    198         for (size_t i = 0; i < refs; ++i) {
    199                 job_del_ref(job_ptr);
    200         }
    201 }
    202 
    203 /** Merge two jobs together
    204  *
    205  * @param[in/out]  trunk  job that
    206  * @param[in]      other  job that will be cleared out
    207  *
    208  * @return EOK on success
    209  * @return error code on fail
    210  */
    211 static int job_pre_merge(job_t *trunk, job_t *other)
    212 {
    213         assert(trunk->unit == other->unit);
    214         assert(trunk->target_state == other->target_state);
    215         assert(trunk->blocked_jobs.size == trunk->blocked_jobs_count);
    216         assert(other->merged_into == NULL);
    217 
    218         int rc = dyn_array_concat(&trunk->blocked_jobs, &other->blocked_jobs);
    219         if (rc != EOK) {
    220                 return rc;
    221         }
    222         dyn_array_clear(&other->blocked_jobs);
    223 
    224         // TODO allocate observed object
    225 
    226         other->merged_into = trunk;
    227 
    228         return EOK;
    229 }
    230 
    231 static void job_finish_merge(job_t *trunk, job_t *other)
    232 {
    233         assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
    234         //TODO aggregate merged blocked_jobs
    235         trunk->blocked_jobs_count = other->blocked_jobs.size;
    236 
    237         /*
    238          * Note, the sysman_move_observers cannot fail here sice all necessary
    239          * allocation is done in job_pre_merge.
    240          */
    241         size_t observers_refs = sysman_observers_count(other);
    242         int rc = sysman_move_observers(other, trunk);
    243         assert(rc == EOK);
    244 
    245         /* When we move observers, don't forget to pass their references too. */
    246         job_add_refs(trunk, observers_refs);
    247         job_del_refs(&other, observers_refs);
    248 }
    249 
    250 static void job_undo_merge(job_t *trunk)
    251 {
    252         assert(trunk->blocked_jobs.size >= trunk->blocked_jobs_count);
    253         dyn_array_clear_range(&trunk->blocked_jobs,
    254             trunk->blocked_jobs_count, trunk->blocked_jobs.size);
    255 }
    256 
    257132/*
    258133 * Non-static functions
    259134 */
    260 
    261 void job_queue_init()
    262 {
    263         list_initialize(&job_queue);
    264 }
    265 
    266 /** Consistenly add jobs to the queue
    267  *
    268  * @param[in/out]  closure    jobs closure, on success it's emptied, otherwise
    269  *                            you should take care of remaining jobs
    270  *
    271  * @return EOK on success
    272  * @return EBUSY when any job in closure is conflicting
    273  */
    274 int job_queue_add_closure(dyn_array_t *closure)
    275 {
    276         bool has_error = false;
    277         int rc = EOK;
    278 
    279         /* Check consistency with existing jobs. */
    280         dyn_array_foreach(*closure, job_t *, job_it) {
    281                 job_t *job = *job_it;
    282                 job_t *other_job = job->unit->job;
    283 
    284                 if (other_job == NULL) {
    285                         continue;
    286                 }
    287 
    288                 if (other_job->target_state != job->target_state) {
    289                         switch (other_job->state) {
    290                         case JOB_RUNNING:
    291                                 sysman_log(LVL_ERROR,
    292                                     "Unit '%s' has already different job running.",
    293                                     unit_name(job->unit));
    294                                 has_error = true;
    295                                 continue;
    296                         case JOB_PENDING:
    297                                 /*
    298                                  * Currently we have strict strategy not
    299                                  * permitting multiple jobs for one unit in the
    300                                  * queue at a time.
    301                                  */
    302                                 sysman_log(LVL_ERROR,
    303                                     "Cannot queue multiple jobs for unit '%s'.",
    304                                     unit_name(job->unit));
    305                                 has_error = true;
    306                                 continue;
    307                         default:
    308                                 assert(false);
    309                         }
    310                 } else {
    311                         // TODO think about other options to merging
    312                         //      (replacing, cancelling)
    313                         rc = job_pre_merge(other_job, job);
    314                         if (rc != EOK) {
    315                                 break;
    316                         }
    317                 }
    318         }
    319 
    320         /* Aggregate merged jobs, or rollback any changes in existing jobs */
    321         bool finish_merge = (rc == EOK) && !has_error;
    322         dyn_array_foreach(*closure, job_t *, job_it) {
    323                 if ((*job_it)->merged_into == NULL) {
    324                         continue;
    325                 }
    326                 if (finish_merge) {
    327                         job_finish_merge((*job_it)->merged_into, *job_it);
    328                 } else {
    329                         job_undo_merge((*job_it)->merged_into);
    330                 }
    331         }
    332         if (has_error) {
    333                 return EBUSY;
    334         } else if (rc != EOK) {
    335                 return rc;
    336         }
    337 
    338         /* Unmerged jobs are enqueued, merged are disposed */
    339         dyn_array_foreach(*closure, job_t *, job_it) {
    340                 job_t *job = (*job_it);
    341                 if (job->merged_into != NULL) {
    342                         job_del_ref(&job);
    343                         continue;
    344                 }
    345 
    346 
    347                 unit_t *u = job->unit;
    348                 assert(u->job == NULL);
    349                 /* Pass reference from the closure to the unit */
    350                 u->job = job;
    351 
    352                 /* Enqueue job (new reference) */
    353                 job->state = JOB_PENDING;
    354                 job_add_ref(job);
    355                 list_append(&job->job_queue, &job_queue);
    356         }
    357 
    358         /* We've stolen references from the closure, so erase it */
    359         dyn_array_clear(closure);
    360 
    361         return EOK;
    362 }
    363 
    364 /** Process all jobs that aren't transitively blocked
    365  *
    366  * Job can be blocked either by another job or by an incoming event, that will
    367  * be queued after this job_queue_process call.
    368  *
    369  * TODO Write down rules from where this function can be called, to avoid stack
    370  *      overflow.
    371  */
    372 void job_queue_process(void)
    373 {
    374         job_t *job;
    375         while ((job = job_queue_pop_runnable())) {
    376                 job_run(job);
    377                 job_del_ref(&job);
    378         }
    379 }
    380 
    381 int job_create_closure(job_t *main_job, dyn_array_t *job_closure)
    382 {
    383         sysman_log(LVL_DEBUG2, "%s(%s)", __func__, unit_name(main_job->unit));
    384         int rc;
    385         list_t units_fifo;
    386         list_initialize(&units_fifo);
    387 
    388         /* Check invariant */
    389         list_foreach(units, units, unit_t, u) {
    390                 assert(u->bfs_job == NULL);
    391         }
    392                
    393         unit_t *unit = main_job->unit;
    394         job_add_ref(main_job);
    395         unit->bfs_job = main_job;
    396         list_append(&unit->bfs_link, &units_fifo);
    397        
    398         while (!list_empty(&units_fifo)) {
    399                 unit = list_get_instance(list_first(&units_fifo), unit_t,
    400                     bfs_link);
    401                 list_remove(&unit->bfs_link);
    402                 job_t *job = unit->bfs_job;
    403                 assert(job != NULL);
    404 
    405                 job_add_ref(job);
    406                 dyn_array_append(job_closure, job_t *, job);
    407 
    408                 /*
    409                  * Traverse dependencies edges
    410                  * According to dependency type and edge direction create
    411                  * appropriate jobs (currently "After" only).
    412                  */
    413                 list_foreach(unit->edges_out, edges_out, unit_edge_t, e) {
    414                         unit_t *u = e->output;
    415                         job_t *blocking_job;
    416 
    417                         if (u->bfs_job == NULL) {
    418                                 blocking_job = job_create(u, job->target_state);
    419                                 if (blocking_job == NULL) {
    420                                         rc = ENOMEM;
    421                                         goto finish;
    422                                 }
    423                                 /* Pass reference to unit */
    424                                 u->bfs_job = blocking_job;
    425                                 list_append(&u->bfs_link, &units_fifo);
    426                         } else {
    427                                 blocking_job = u->bfs_job;
    428                         }
    429 
    430                         job_add_blocked_job(blocking_job, job);
    431                 }
    432         }
    433         sysman_log(LVL_DEBUG2, "%s(%s):", __func__, unit_name(main_job->unit));
    434         dyn_array_foreach(*job_closure, job_t *, job_it) {
    435                 sysman_log(LVL_DEBUG2, "%s\t%s, refs: %u", __func__,
    436                     unit_name((*job_it)->unit), atomic_get(&(*job_it)->refcnt));
    437         }
    438         rc = EOK;
    439 
    440 finish:
    441         /* Unreference any jobs in interrupted BFS queue */
    442         list_foreach_safe(units_fifo, cur_link, next_link) {
    443                 unit_t *u = list_get_instance(cur_link, unit_t, bfs_link);
    444                 job_del_ref(&u->bfs_job);
    445                 list_remove(cur_link);
    446         }
    447        
    448         /* Clean after ourselves (BFS tag jobs) */
    449         dyn_array_foreach(*job_closure, job_t *, job_it) {
    450                 assert(*job_it == (*job_it)->unit->bfs_job);
    451                 job_del_ref(&(*job_it)->unit->bfs_job);
    452                 (*job_it)->unit->bfs_job = NULL;
    453         }
    454 
    455         return rc;
    456 }
    457135
    458136/** Create job assigned to the unit
  • uspace/srv/sysman/job.h

    r18377301 r25a9fec  
    8181};
    8282
    83 extern void job_queue_init(void);
    84 extern int job_queue_add_closure(dyn_array_t *);
    85 extern void job_queue_process(void);
    86 
    87 extern int job_create_closure(job_t *, dyn_array_t *);
    8883extern job_t *job_create(unit_t *, unit_state_t);
    8984
  • uspace/srv/sysman/main.c

    r18377301 r25a9fec  
    4141#include "connection_ctl.h"
    4242#include "edge.h"
    43 #include "job.h"
     43#include "job_queue.h"
    4444#include "log.h"
    4545#include "sysman.h"
  • uspace/srv/sysman/sysman.c

    r18377301 r25a9fec  
    3333#include <stdlib.h>
    3434
     35#include "job_closure.h"
     36#include "job_queue.h"
    3537#include "log.h"
    3638#include "sysman.h"
  • uspace/srv/sysman/test/job_closure.c

    r18377301 r25a9fec  
    3131#include <stdio.h>
    3232
    33 #include "../job.h"
     33#include "../job_closure.h"
    3434
    3535#include "mock_unit.h"
  • uspace/srv/sysman/test/job_queue.c

    r18377301 r25a9fec  
    3333
    3434#include "mock_unit.h"
    35 #include "../job.h"
     35#include "../job_queue.h"
    3636#include "../sysman.h"
    3737
Note: See TracChangeset for help on using the changeset viewer.