Changeset 3f7e1f24 in mainline


Ignore:
Timestamp:
2019-08-03T08:28:26Z (5 years ago)
Author:
Matthieu Riolo <matthieu.riolo@…>
Children:
095d03c
Parents:
d7c5fc0
git-author:
Michal Koutný <xm.koutny+hos@…> (2015-04-22 17:54:08)
git-committer:
Matthieu Riolo <matthieu.riolo@…> (2019-08-03 08:28:26)
Message:

sysman: Refactored job manipulation (event loop + one main fibril)

Location:
uspace
Files:
12 edited

Legend:

Unmodified
Added
Removed
  • uspace/Makefile

    rd7c5fc0 r3f7e1f24  
    118118        srv/net/udp \
    119119        srv/ns \
     120        srv/taskmon \
    120121        srv/sysman \
    121         srv/taskmon \
    122122        srv/vfs \
    123123        srv/bd/sata_bd \
  • uspace/srv/sysman/configuration.c

    rd7c5fc0 r3f7e1f24  
    3939
    4040static hash_table_t units;
    41 static fibril_rwlock_t units_rwl;
    4241
    4342/* Hash table functions */
     
    8180{
    8281        hash_table_create(&units, 0, 0, &units_ht_ops);
    83         fibril_rwlock_initialize(&units_rwl);
    8482}
    8583
     
    8987        assert(unit->state == STATE_EMBRYO);
    9088        assert(unit->name != NULL);
    91         assert(fibril_rwlock_is_write_locked(&units_rwl));
    9289        sysman_log(LVL_DEBUG2, "%s('%s')", __func__, unit_name(unit));
    9390
     
    10097
    10198void configuration_start_update(void) {
    102         assert(!fibril_rwlock_is_write_locked(&units_rwl));
    103         sysman_log(LVL_DEBUG2, "%s", __func__);
    104         fibril_rwlock_write_lock(&units_rwl);
     99        sysman_log(LVL_DEBUG2, "%s", __func__);
    105100}
    106101
     
    124119void configuration_commit(void)
    125120{
    126         assert(fibril_rwlock_is_write_locked(&units_rwl));
    127121        sysman_log(LVL_DEBUG2, "%s", __func__);
    128122
     
    132126         */
    133127        hash_table_apply(&units, &configuration_commit_unit, NULL);
    134         fibril_rwlock_write_unlock(&units_rwl);
    135128}
    136129
     
    157150void configuration_rollback(void)
    158151{
    159         assert(fibril_rwlock_is_write_locked(&units_rwl));
    160152        sysman_log(LVL_DEBUG2, "%s", __func__);
    161153
    162154        hash_table_apply(&units, &configuration_rollback_unit, NULL);
    163         fibril_rwlock_write_unlock(&units_rwl);
    164155}
    165156
     
    199190int configuration_resolve_dependecies(void)
    200191{
    201         assert(fibril_rwlock_is_write_locked(&units_rwl));
    202192        sysman_log(LVL_DEBUG2, "%s", __func__);
    203193
  • uspace/srv/sysman/job.c

    rd7c5fc0 r3f7e1f24  
    2727 */
    2828
    29 #include <adt/list.h>
     29#include <adt/fifo.h>
    3030#include <assert.h>
    3131#include <errno.h>
    32 #include <fibril.h>
    33 #include <fibril_synch.h>
    34 #include <stdio.h>
    35 #include <stdlib.h>
    36 
     32
     33#include "dep.h"
    3734#include "job.h"
    3835#include "log.h"
    39 #include "unit.h"
     36#include "sysman.h"
    4037
    4138static list_t job_queue;
    42 static fibril_mutex_t job_queue_mtx;
    43 static fibril_condvar_t job_queue_cv;
    44 
    45 static void job_destroy(job_t **);
    46 
    47 
    48 static int job_run_start(job_t *job)
    49 {
    50         sysman_log(LVL_DEBUG, "%s(%p)", __func__, job);
    51         unit_t *unit = job->unit;
    52 
    53         int rc = unit_start(unit);
     39
     40/*
     41 * Static functions
     42 */
     43
     44static int job_add_blocked_job(job_t *job, job_t *blocked_job)
     45{
     46        int rc = dyn_array_append(&job->blocked_jobs, job_ptr_t, blocked_job);
    5447        if (rc != EOK) {
    55                 return rc;
    56         }
    57 
    58         fibril_mutex_lock(&unit->state_mtx);
    59         while (unit->state != STATE_STARTED) {
    60                 fibril_condvar_wait(&unit->state_cv, &unit->state_mtx);
    61         }
    62         fibril_mutex_unlock(&unit->state_mtx);
    63 
    64         // TODO react to failed state
     48                return ENOMEM;
     49        }
     50
     51        job_add_ref(blocked_job);
     52        blocked_job->blocking_jobs += 1;
     53
    6554        return EOK;
    6655}
    6756
    68 static int job_runner(void *arg)
    69 {
    70         job_t *job = (job_t *)arg;
    71 
    72         int retval = EOK;
    73 
    74         /* Wait for previous jobs */
    75         list_foreach(job->blocking_jobs, link, job_link_t, jl) {
    76                 retval = job_wait(jl->job);
    77                 if (retval != EOK) {
    78                         break;
    79                 }
    80         }
    81 
    82         if (retval != EOK) {
    83                 goto finish;
    84         }
    85 
    86         /* Run the job itself */
    87         fibril_mutex_lock(&job->state_mtx);
    88         job->state = JOB_RUNNING;
    89         fibril_condvar_broadcast(&job->state_cv);
    90         fibril_mutex_unlock(&job->state_mtx);
    91 
    92         switch (job->type) {
    93         case JOB_START:
    94                 retval = job_run_start(job);
    95                 break;
    96         default:
    97                 assert(false);
    98         }
    99 
    100 
    101 finish:
    102         fibril_mutex_lock(&job->state_mtx);
    103         job->state = JOB_FINISHED;
    104         job->retval = retval;
    105         fibril_condvar_broadcast(&job->state_cv);
    106         fibril_mutex_unlock(&job->state_mtx);
    107 
    108         job_del_ref(&job);
    109 
    110         return EOK;
    111 }
    112 
    113 static int job_dispatcher(void *arg)
    114 {
    115         fibril_mutex_lock(&job_queue_mtx);
    116         while (1) {
    117                 while (list_empty(&job_queue)) {
    118                         fibril_condvar_wait(&job_queue_cv, &job_queue_mtx);
    119                 }
    120                
    121                 link_t *link = list_first(&job_queue);
    122                 assert(link);
    123                 list_remove(link);
    124 
    125                 /*
    126                  * Note that possible use of fibril pool must hold invariant
    127                  * that job is started asynchronously. In the case there exists
    128                  * circular dependency between jobs, it may result in a deadlock.
    129                  */
    130                 job_t *job = list_get_instance(link, job_t, link);
    131                 fid_t runner_fibril = fibril_create(job_runner, job);
    132                 fibril_add_ready(runner_fibril);
    133         }
    134 
    135         fibril_mutex_unlock(&job_queue_mtx);
    136         return EOK;
    137 }
     57static void job_init(job_t *job, unit_t *u, unit_state_t target_state)
     58{
     59        assert(job);
     60        assert(u);
     61
     62        link_initialize(&job->job_queue);
     63
     64        /* Start with one reference for the creator */
     65        atomic_set(&job->refcnt, 1);
     66
     67        job->target_state = target_state;
     68        job->unit = u;
     69
     70        dyn_array_initialize(&job->blocked_jobs, job_ptr_t, 0);
     71        job->blocking_jobs = 0;
     72        job->blocking_job_failed = false;
     73
     74        job->state = JOB_UNQUEUED;
     75        job->retval = JOB_UNDEFINED_;
     76}
     77
     78static bool job_eval_retval(job_t *job)
     79{
     80        unit_t *u = job->unit;
     81        if (u->state == job->target_state) {
     82                job->retval = JOB_OK;
     83                return true;
     84        } else if (u->state == STATE_FAILED) {
     85                job->retval = JOB_FAILED;
     86                return true;
     87        } else {
     88                return false;
     89        }
     90}
     91
     92static bool job_is_runnable(job_t *job)
     93{
     94        return job->state == JOB_QUEUED && job->blocking_jobs == 0;
     95}
     96
     97static void job_check(void *object, void *data)
     98{
     99        unit_t *u = object;
     100        job_t *job = data;
     101
     102        if (job_eval_retval(job)) {
     103                job_finish(job);
     104        } else {
     105                // TODO place for timeout
     106                // TODO add reference to job?
     107                sysman_object_observer(u, &job_check, job);
     108        }
     109}
     110
     111
     112static void job_unblock(job_t *blocked_job, job_t *blocking_job)
     113{
     114        if (blocking_job->retval == JOB_FAILED) {
     115                blocked_job->blocking_job_failed = true;
     116        }
     117        blocked_job->blocking_jobs -= 1;
     118}
     119
     120static void job_destroy(job_t **job_ptr)
     121{
     122        job_t *job = *job_ptr;
     123        if (job == NULL) {
     124                return;
     125        }
     126
     127        assert(!link_used(&job->job_queue));
     128        dyn_array_destroy(&job->blocked_jobs);
     129        // TODO I should decrease referece of blocked jobs
     130
     131        free(job);
     132        *job_ptr = NULL;
     133}
     134
     135/*
     136 * Non-static functions
     137 */
    138138
    139139void job_queue_init()
    140140{
    141141        list_initialize(&job_queue);
    142         fibril_mutex_initialize(&job_queue_mtx);
    143         fibril_condvar_initialize(&job_queue_cv);
    144 
    145         fid_t dispatcher_fibril = fibril_create(job_dispatcher, NULL);
    146         fibril_add_ready(dispatcher_fibril);
    147 }
    148 
    149 int job_queue_jobs(list_t *jobs)
    150 {
    151         fibril_mutex_lock(&job_queue_mtx);
    152 
     142}
     143
     144int job_queue_add_jobs(dyn_array_t *jobs)
     145{
    153146        /* Check consistency with queue. */
    154         list_foreach(*jobs, link, job_t, new_job) {
    155                 list_foreach(job_queue, link, job_t, queued_job) {
     147        dyn_array_foreach(*jobs, job_ptr_t, new_job_it) {
     148                list_foreach(job_queue, job_queue, job_t, queued_job) {
    156149                        /*
    157150                         * Currently we have strict strategy not permitting
    158151                         * multiple jobs for one unit in the queue.
    159152                         */
    160                         if (new_job->unit == queued_job->unit) {
     153                        if ((*new_job_it)->unit == queued_job->unit) {
     154                                sysman_log(LVL_ERROR,
     155                                    "Cannot queue multiple jobs foor unit '%s'",
     156                                    unit_name((*new_job_it)->unit));
    161157                                return EEXIST;
    162158                        }
     
    165161
    166162        /* Enqueue jobs */
    167         list_foreach_safe(*jobs, cur_link, next_link) {
    168                 list_remove(cur_link);
    169                 list_append(cur_link, &job_queue);
    170         }
    171 
    172         /* Only job dispatcher waits, it's correct to notify one only. */
    173         fibril_condvar_signal(&job_queue_cv);
    174         fibril_mutex_unlock(&job_queue_mtx);
     163        dyn_array_foreach(*jobs, job_ptr_t, job_it) {
     164                (*job_it)->state = JOB_QUEUED;
     165                list_append(&(*job_it)->job_queue, &job_queue);
     166                // TODO explain this reference
     167                job_add_ref(*job_it);
     168        }
    175169
    176170        return EOK;
    177171}
    178172
    179 /** Blocking wait for job finishing.
    180  *
    181  * Multiple fibrils may wait for the same job.
    182  *
    183  * @return    Return code of the job
    184  */
    185 int job_wait(job_t *job)
    186 {
    187         fibril_mutex_lock(&job->state_mtx);
    188         while (job->state != JOB_FINISHED) {
    189                 fibril_condvar_wait(&job->state_cv, &job->state_mtx);
    190         }
    191 
    192         int rc = job->retval;
    193         fibril_mutex_unlock(&job->state_mtx);
    194 
     173/** Pop next runnable job
     174 *
     175 * @return runnable job or NULL when there's none
     176 */
     177job_t *job_queue_pop_runnable(void)
     178{
     179        job_t *result = NULL;
     180        link_t *first_link = list_first(&job_queue);
     181        bool first_iteration = true;
     182
     183        list_foreach_safe(job_queue, cur_link, next_link) {
     184                result = list_get_instance(cur_link, job_t, job_queue);
     185                if (job_is_runnable(result)) {
     186                        break;
     187                } else if (!first_iteration && cur_link == first_link) {
     188                        result = NULL;
     189                        break;
     190                } else {
     191                        /*
     192                         * We make no assuptions about ordering of jobs in the
     193                         * queue, so just move the job to the end of the queue.
     194                         * If there are exist topologic ordering, eventually
     195                         * jobs will be reordered. Furthermore when if there
     196                         * exists any runnable job, it's always found.
     197                         */
     198                        list_remove(cur_link);
     199                        list_append(cur_link, &job_queue);
     200                }
     201                first_iteration = false;
     202        }
     203
     204        if (result) {
     205                // TODO update refcount
     206                list_remove(&result->job_queue);
     207                result->state = JOB_DEQUEUED;
     208        }
     209
     210        return result;
     211}
     212
     213int job_create_closure(job_t *main_job, dyn_array_t *job_closure)
     214{
     215        // TODO replace hard-coded FIFO size with resizable FIFO
     216        FIFO_INITIALIZE_DYNAMIC(jobs_fifo, job_ptr_t, 10);
     217        void *fifo_data = fifo_create(jobs_fifo);
     218        int rc;
     219        if (fifo_data == NULL) {
     220                rc = ENOMEM;
     221                goto finish;
     222        }
     223
     224        /*
     225         * Traverse dependency graph in BFS fashion and create jobs for every
     226         * necessary unit.
     227         */
     228        fifo_push(jobs_fifo, main_job);
     229        job_t *job;
     230        while ((job = fifo_pop(jobs_fifo)) != NULL) {
     231                /*
     232                 * Do not increase reference count of job, as we're passing it
     233                 * to the closure.
     234                 */
     235                dyn_array_append(job_closure, job_ptr_t, job);
     236
     237                /* Traverse dependencies edges */
     238                unit_t *u = job->unit;
     239                list_foreach(u->dependencies, dependencies, unit_dependency_t, dep) {
     240                        // TODO prepare for reverse edge direction and
     241                        //      non-identity state mapping
     242                        job_t *new_job =
     243                            job_create(dep->dependency, job->target_state);
     244                        if (new_job == NULL) {
     245                                rc = ENOMEM;
     246                                goto finish;
     247                        }
     248                        job_add_blocked_job(new_job, job);
     249                        fifo_push(jobs_fifo, new_job);
     250                }
     251        }
     252        rc = EOK;
     253
     254finish:
     255        free(fifo_data);
     256        /*
     257         * Newly created jobs are already passed to the closure, thus not
     258         * deleting them here.
     259         */
    195260        return rc;
     261}
     262
     263job_t *job_create(unit_t *u, unit_state_t target_state)
     264{
     265        job_t *job = malloc(sizeof(job_t));
     266        if (job != NULL) {
     267                job_init(job, u, target_state);
     268        }
     269
     270        return job;
    196271}
    197272
     
    214289}
    215290
    216 static void job_init(job_t *job, job_type_t type)
    217 {
    218         assert(job);
    219 
    220         link_initialize(&job->link);
    221         list_initialize(&job->blocking_jobs);
    222 
    223         /* Start with one reference for the creator */
    224         atomic_set(&job->refcnt, 1);
    225 
    226         job->type = type;
    227         job->unit = NULL;
    228 
    229         job->state = JOB_WAITING;
    230         fibril_mutex_initialize(&job->state_mtx);
    231         fibril_condvar_initialize(&job->state_cv);
    232 }
    233 
    234 job_t *job_create(job_type_t type)
    235 {
    236         job_t *job = malloc(sizeof(job_t));
    237         if (job != NULL) {
    238                 job_init(job, type);
    239         }
    240 
    241         return job;
    242 }
    243 
    244 int job_add_blocking_job(job_t *job, job_t *blocking_job)
    245 {
    246         job_link_t *job_link = malloc(sizeof(job_link_t));
    247         if (job_link == NULL) {
    248                 return ENOMEM;
    249         }
    250 
    251         link_initialize(&job_link->link);
    252         list_append(&job_link->link, &job->blocking_jobs);
    253 
    254         job_link->job = blocking_job;
    255         job_add_ref(blocking_job);
    256 
    257         return EOK;
    258 }
    259 
    260 static void job_destroy(job_t **job_ptr)
    261 {
    262         job_t *job = *job_ptr;
    263         if (job == NULL) {
    264                 return;
    265         }
    266 
    267         list_foreach_safe(job->blocking_jobs, cur_link, next_link) {
    268                 job_link_t *jl = list_get_instance(cur_link, job_link_t, link);
    269                 list_remove(cur_link);
    270                 job_del_ref(&jl->job);
    271                 free(jl);
    272         }
    273         free(job);
    274 
    275         *job_ptr = NULL;
    276 }
     291void job_run(job_t *job)
     292{
     293        assert(job->state != JOB_RUNNING);
     294        assert(job->state != JOB_FINISHED);
     295
     296        unit_t *u = job->unit;
     297        sysman_log(LVL_DEBUG, "%s, %s -> %i",
     298            __func__, unit_name(u), job->target_state);
     299
     300        /* Propagate failure */
     301        if (job->blocking_job_failed) {
     302                goto fail;
     303        }
     304
     305        int rc;
     306        switch (job->target_state) {
     307        case STATE_STARTED:
     308                rc = unit_start(u);
     309                break;
     310        default:
     311                // TODO implement other states
     312                assert(false);
     313        }
     314        if (rc != EOK) {
     315                goto fail;
     316        }
     317
     318        job_check(job->unit, job);
     319        return;
     320
     321fail:
     322        job->retval = JOB_FAILED;
     323        job_finish(job);
     324}
     325
     326/** Unblocks blocked jobs and notify observers
     327 *
     328 * @param[in]  job  job with defined return value
     329 */
     330void job_finish(job_t *job)
     331{
     332        assert(job->state != JOB_FINISHED);
     333        assert(job->retval != JOB_UNDEFINED_);
     334
     335        sysman_log(LVL_DEBUG2, "%s(%s) -> %i",
     336            __func__, unit_name(job->unit), job->retval);
     337
     338        job->state = JOB_FINISHED;
     339
     340        /* Job finished */
     341        dyn_array_foreach(job->blocked_jobs, job_ptr_t, job_it) {
     342                job_unblock(*job_it, job);
     343        }
     344        // TODO remove reference from blocked jobs
     345
     346        // TODO should reference be added (for the created event)
     347        sysman_raise_event(&sysman_event_job_changed, job);
     348}
     349
  • uspace/srv/sysman/job.h

    rd7c5fc0 r3f7e1f24  
    3030#define SYSMAN_JOB_H
    3131
     32#include <adt/dyn_array.h>
    3233#include <adt/list.h>
    3334#include <atomic.h>
     35#include <stdbool.h>
    3436
    3537#include "unit.h"
    3638
    37 struct job;
    38 typedef struct job job_t;
    39 
     39/** Run state of job */
    4040typedef enum {
    41         JOB_START
    42 } job_type_t;
    43 
    44 typedef enum {
    45         JOB_WAITING,
     41        JOB_UNQUEUED, /**< Job not in queue yet */
     42        JOB_QUEUED,
     43        JOB_DEQUEUED, /**< Job not in queue already */
    4644        JOB_RUNNING,
    4745        JOB_FINISHED
    4846} job_state_t;
    4947
     48/** Return value of job */
     49typedef enum {
     50        JOB_OK,
     51        JOB_FAILED,
     52        JOB_UNDEFINED_ = -1
     53} job_retval_t;
     54
    5055typedef struct {
    51         link_t link;
    52         job_t *job;
    53 } job_link_t;
    54 
    55 /** Job represents pending or running operation on unit */
    56 struct job {
    57         /** Link to queue job is in */
    58         link_t link;
    59 
    60         /** List of jobs (job_link_t ) that are blocking the job. */
    61         list_t blocking_jobs;
    62 
    63         /** Reference counter for the job structure. */
     56        link_t job_queue;
    6457        atomic_t refcnt;
    6558
    66         job_type_t type;
     59        unit_state_t target_state;
    6760        unit_t *unit;
    6861
     62        /** Jobs that this job is preventing from running */
     63        dyn_array_t blocked_jobs;
     64        /** No. of jobs that must finish before this job */
     65        size_t blocking_jobs;
     66        /** Any of blocking jobs failed */
     67        bool blocking_job_failed;
     68
     69        /** See job_state_t */
    6970        job_state_t state;
    70         fibril_mutex_t state_mtx;
    71         fibril_condvar_t state_cv;
     71        /** See job_retval_t */
     72        job_retval_t retval;
     73} job_t;
    7274
    73         /** Return value of the job, defined only when state == JOB_FINISHED */
    74         int retval;
    75 };
     75typedef job_t *job_ptr_t;
    7676
    7777extern void job_queue_init(void);
    78 extern int job_queue_jobs(list_t *);
     78extern int job_queue_add_jobs(dyn_array_t *);
     79extern job_t *job_queue_pop_runnable(void);
    7980
    80 extern int job_wait(job_t *);
     81extern int job_create_closure(job_t *, dyn_array_t *);
     82extern job_t *job_create(unit_t *, unit_state_t);
    8183
    8284extern void job_add_ref(job_t *);
    8385extern void job_del_ref(job_t **);
    8486
    85 extern job_t *job_create(job_type_t type);
    86 extern int job_add_blocking_job(job_t *, job_t *);
    8787
     88extern void job_run(job_t *);
     89extern void job_finish(job_t *);
    8890#endif
  • uspace/srv/sysman/main.c

    rd7c5fc0 r3f7e1f24  
    3737#include "dep.h"
    3838#include "job.h"
     39#include "log.h"
    3940#include "sysman.h"
    4041#include "unit.h"
     
    4748}
    4849
    49 static int sysman_entry_point(void *arg) {
    50         /*
    51          * Build hard coded configuration.
    52          *
    53          * Strings are allocated on heap, so that they can be free'd by an
    54          * owning unit.
    55          */
     50/** Build hard coded configuration */
     51static job_t *create_entry_configuration(void) {
    5652        int result = EOK;
    5753        unit_t *mnt_initrd = NULL;
     
    107103        configuration_commit();
    108104
    109         result = sysman_unit_start(tgt_default);
    110 
    111         return result;
     105        job_t *first_job = job_create(tgt_default, STATE_STARTED);
     106        if (first_job == NULL) {
     107                goto fail;
     108        }
     109        return first_job;
    112110
    113111fail:
     112        // TODO cannot destroy units after they're added to configuration
    114113        unit_destroy(&tgt_default);
    115114        unit_destroy(&cfg_init);
    116115        unit_destroy(&mnt_initrd);
    117         return result;
     116        return NULL;
     117}
     118
     119static void first_job_handler(void *object, void *unused)
     120{
     121        job_t *job = object;
     122        sysman_log(LVL_DEBUG, "First job retval: %i.", job->retval);
     123        job_del_ref(&job);
    118124}
    119125
     
    122128        printf(NAME ": HelenOS system daemon\n");
    123129
     130        /*
     131         * Initialize global structures
     132         */
    124133        configuration_init();
     134        sysman_events_init();
    125135        job_queue_init();
    126136
    127137        /*
    128          * Create and start initial configuration asynchronously
    129          * so that we can start server's fibril that may be used
    130          * when executing the start.
     138         * Create initial configuration while we are in a single fibril, keep
     139         * the job and run it when event loop is running.
    131140         */
    132         fid_t entry_fibril = fibril_create(sysman_entry_point, NULL);
    133         fibril_add_ready(entry_fibril);
     141        job_t *first_job = create_entry_configuration();
    134142
    135         /* Prepare and start sysman server */
     143        /*
     144         * Event loop runs in separate fibril, all consequent access to global
     145         * structure is made from this fibril only.
     146         */
     147        fid_t event_loop_fibril = fibril_create(sysman_events_loop, NULL);
     148        fibril_add_ready(event_loop_fibril);
     149
     150        /* Queue first job for processing */
     151        sysman_object_observer(first_job, &first_job_handler, NULL);
     152        sysman_raise_event(&sysman_event_job_process, first_job);
     153
     154        /* Start sysman server */
    136155        async_set_client_connection(sysman_connection);
    137156
     
    139158        async_manager();
    140159
     160        /* not reached */
    141161        return 0;
    142162}
  • uspace/srv/sysman/sysman.c

    rd7c5fc0 r3f7e1f24  
    2727 */
    2828
     29#include <adt/hash_table.h>
    2930#include <adt/list.h>
    3031#include <errno.h>
    31 
    32 #include "dep.h"
    33 #include "job.h"
     32#include <fibril_synch.h>
     33#include <stdlib.h>
     34
     35#include "log.h"
    3436#include "sysman.h"
    3537
    36 /** Create jobs for cluser of given unit.
    37  *
    38  * @note Using recursion, limits "depth" of dependency graph.
    39  */
    40 static int sysman_create_closure_jobs(unit_t *unit, job_t **entry_job_ptr,
    41     list_t *accumulator, job_type_t type)
    42 {
    43         int rc = EOK;
    44         job_t *job = job_create(type);
    45         if (job == NULL) {
     38
     39/* Do not expose this generally named type */
     40typedef struct {
     41        link_t event_queue;
     42
     43        event_handler_t handler;
     44        void *data;
     45} event_t;
     46
     47typedef struct {
     48        link_t callbacks;
     49
     50        callback_handler_t handler;
     51        void *data;
     52} obj_callback_t;
     53
     54typedef struct {
     55        ht_link_t ht_link;
     56
     57        void *object;
     58        list_t callbacks;
     59} observed_object_t;
     60
     61static LIST_INITIALIZE(event_queue);
     62static fibril_mutex_t event_queue_mtx;
     63static fibril_condvar_t event_queue_cv;
     64
     65static hash_table_t observed_objects;
     66static fibril_mutex_t observed_objects_mtx;
     67static fibril_condvar_t observed_objects_cv;
     68
     69/* Hash table functions */
     70static size_t observed_objects_ht_hash(const ht_link_t *item)
     71{
     72        observed_object_t *callbacks =
     73            hash_table_get_inst(item, observed_object_t, ht_link);
     74
     75        return (size_t) callbacks->object;
     76}
     77
     78static size_t observed_objects_ht_key_hash(void *key)
     79{
     80        void *object = *(void **) key;
     81        return (size_t) object;
     82}
     83
     84static bool observed_objects_ht_key_equal(void *key, const ht_link_t *item)
     85{
     86        void *object = *(void **)key;
     87        return (
     88            hash_table_get_inst(item, observed_object_t, ht_link)->object ==
     89            object);
     90}
     91
     92static hash_table_ops_t observed_objects_ht_ops = {
     93        .hash            = &observed_objects_ht_hash,
     94        .key_hash        = &observed_objects_ht_key_hash,
     95        .equal           = NULL,
     96        .key_equal       = &observed_objects_ht_key_equal,
     97        .remove_callback = NULL
     98};
     99
     100static void notify_observers(void *object)
     101{
     102        ht_link_t *item = hash_table_find(&observed_objects, &object);
     103        if (item == NULL) {
     104                return;
     105        }
     106        observed_object_t *observed_object =
     107            hash_table_get_inst(item, observed_object_t, ht_link);
     108
     109        list_foreach_safe(observed_object->callbacks, cur_link, next_link) {
     110                obj_callback_t *callback =
     111                    list_get_instance(cur_link, obj_callback_t, callbacks);
     112                callback->handler(object, callback->data);
     113                list_remove(cur_link);
     114                free(callback);
     115        }
     116}
     117
     118/*
     119 * Non-static functions
     120 */
     121void sysman_events_init(void)
     122{
     123        fibril_mutex_initialize(&event_queue_mtx);
     124        fibril_condvar_initialize(&event_queue_cv);
     125
     126        bool table =
     127            hash_table_create(&observed_objects, 0, 0, &observed_objects_ht_ops);
     128        if (!table) {
     129                sysman_log(LVL_FATAL, "%s: Failed initialization", __func__);
     130                abort();
     131        }
     132        fibril_mutex_initialize(&observed_objects_mtx);
     133        fibril_condvar_initialize(&observed_objects_cv);
     134}
     135
     136int sysman_events_loop(void *unused)
     137{
     138        while (1) {
     139                /* Pop event */
     140                fibril_mutex_lock(&event_queue_mtx);
     141                while (list_empty(&event_queue)) {
     142                        fibril_condvar_wait(&event_queue_cv, &event_queue_mtx);
     143                }
     144
     145                link_t *li_event = list_first(&event_queue);
     146                list_remove(li_event);
     147                event_t *event =
     148                    list_get_instance(li_event, event_t, event_queue);
     149                fibril_mutex_unlock(&event_queue_mtx);
     150
     151                /* Process event */
     152                event->handler(event->data);
     153                free(event);
     154        }
     155}
     156
     157void sysman_raise_event(event_handler_t handler, void *data)
     158{
     159        event_t *event = malloc(sizeof(event_t));
     160        if (event == NULL) {
     161                sysman_log(LVL_FATAL, "%s: cannot allocate event", __func__);
     162                // TODO think about aborting system critical task
     163                abort();
     164        }
     165        link_initialize(&event->event_queue);
     166        event->handler = handler;
     167        event->data = data;
     168
     169        fibril_mutex_lock(&event_queue_mtx);
     170        list_append(&event->event_queue, &event_queue);
     171        /* There's only single event loop, broadcast is unnecessary */
     172        fibril_condvar_signal(&event_queue_cv);
     173        fibril_mutex_unlock(&event_queue_mtx);
     174}
     175
     176/** Register single-use object observer callback
     177 *
     178 * TODO no one handles return value, it's quite fatal to lack memory for
     179 *      callbacks...  @return EOK on success
     180 * @return ENOMEM
     181 */
     182int sysman_object_observer(void *object, callback_handler_t handler, void *data)
     183{
     184        int rc;
     185        observed_object_t *observed_object = NULL;
     186        observed_object_t *new_observed_object = NULL;
     187        ht_link_t *ht_link = hash_table_find(&observed_objects, &object);
     188
     189        if (ht_link == NULL) {
     190                observed_object = malloc(sizeof(observed_object_t));
     191                if (observed_object == NULL) {
     192                        rc = ENOMEM;
     193                        goto fail;
     194                }
     195                new_observed_object = observed_object;
     196
     197                observed_object->object = object;
     198                list_initialize(&observed_object->callbacks);
     199                hash_table_insert(&observed_objects, &observed_object->ht_link);
     200        } else {
     201                observed_object =
     202                    hash_table_get_inst(ht_link, observed_object_t, ht_link);
     203        }
     204
     205        obj_callback_t *obj_callback = malloc(sizeof(obj_callback_t));
     206        if (obj_callback == NULL) {
    46207                rc = ENOMEM;
    47208                goto fail;
    48209        }
    49210
    50         job->unit = unit;
    51 
    52         list_foreach(unit->dependencies, dependencies, unit_dependency_t, dep) {
    53                 job_t *blocking_job = NULL;
    54                 rc = sysman_create_closure_jobs(dep->dependency, &blocking_job,
    55                     accumulator, type);
    56                 if (rc != EOK) {
    57                         goto fail;
    58                 }
    59                
    60                 rc = job_add_blocking_job(job, blocking_job);
    61                 if (rc != EOK) {
    62                         goto fail;
    63                 }
    64         }
    65 
    66         /* Job is passed to the accumulator, i.e. no add_ref. */
    67         list_append(&job->link, accumulator);
    68 
    69         if (entry_job_ptr != NULL) {
    70                 *entry_job_ptr = job;
    71         }
     211        obj_callback->handler = handler;
     212        obj_callback->data = data;
     213        list_append(&obj_callback->callbacks, &observed_object->callbacks);
    72214        return EOK;
    73215
    74216fail:
    75         job_del_ref(&job);
     217        free(new_observed_object);
    76218        return rc;
    77219}
    78220
    79 int sysman_unit_start(unit_t *unit)
    80 {
    81         list_t new_jobs;
    82         list_initialize(&new_jobs);
    83 
    84         job_t *job = NULL;
    85         // TODO shouldn't be here read-lock on configuration?
    86         int rc = sysman_create_closure_jobs(unit, &job, &new_jobs, JOB_START);
     221/*
     222 * Event handlers
     223 */
     224
     225// NOTE must run in main event loop fibril
     226void sysman_event_job_process(void *arg)
     227{
     228        job_t *job = arg;
     229        dyn_array_t job_closure;
     230        dyn_array_initialize(&job_closure, job_ptr_t, 0);
     231
     232        int rc = job_create_closure(job, &job_closure);
    87233        if (rc != EOK) {
    88                 return rc;
    89         }
    90 
    91         // TODO handle errors when adding job accumulator
    92         job_queue_jobs(&new_jobs);
    93 
    94         return job_wait(job);
    95 }
     234                sysman_log(LVL_ERROR, "Cannot create closure for job %p (%i)",
     235                    job, rc);
     236                goto fail;
     237        }
     238
     239        rc = job_queue_add_jobs(&job_closure);
     240        if (rc != EOK) {
     241                // TODO job_queue_add_jobs should log message
     242                goto fail;
     243        }
     244
     245        // TODO explain why calling asynchronously
     246        sysman_raise_event(&sysman_event_job_queue_run, NULL);
     247        return;
     248
     249fail:
     250        job->retval = JOB_FAILED;
     251        job_finish(job);
     252        // TODO clarify refcount to the main job
     253        dyn_array_foreach(job_closure, job_ptr_t, closure_job) {
     254                job_del_ref(&(*closure_job));
     255        }
     256        dyn_array_destroy(&job_closure);
     257}
     258
     259
     260void sysman_event_job_queue_run(void *unused)
     261{
     262        job_t *job;
     263        while ((job = job_queue_pop_runnable())) {
     264                job_run(job);
     265        }
     266}
     267
     268void sysman_event_job_changed(void *object)
     269{
     270        notify_observers(object);
     271}
  • uspace/srv/sysman/sysman.h

    rd7c5fc0 r3f7e1f24  
    3030#define SYSMAN_SYSMAN_H
    3131
     32#include "job.h"
    3233#include "unit.h"
    3334
    34 extern int sysman_unit_start(unit_t *);
     35typedef void (*event_handler_t)(void *);
     36typedef void (*callback_handler_t)(void *object, void *data);
     37
     38extern void sysman_events_init(void);
     39
     40extern int sysman_events_loop(void *);
     41
     42extern void sysman_raise_event(event_handler_t, void *);
     43
     44extern int sysman_object_observer(void *, callback_handler_t, void *);
     45
     46
     47extern void sysman_event_job_process(void *);
     48extern void sysman_event_job_queue_run(void *);
     49extern void sysman_event_job_changed(void *);
    3550
    3651#endif
  • uspace/srv/sysman/unit.c

    rd7c5fc0 r3f7e1f24  
    6868
    6969        unit->state = STATE_EMBRYO;
    70         fibril_mutex_initialize(&unit->state_mtx);
    71         fibril_condvar_initialize(&unit->state_cv);
    7270
    7371        list_initialize(&unit->dependants);
     
    105103}
    106104
    107 void unit_set_state(unit_t *unit, unit_state_t state)
    108 {
    109         fibril_mutex_lock(&unit->state_mtx);
    110         unit->state = state;
    111         fibril_condvar_broadcast(&unit->state_cv);
    112         fibril_mutex_unlock(&unit->state_mtx);
    113 }
    114105
    115106/** Issue request to restarter to start a unit
    116107 *
    117  * Return from this function only means start request was issued.
    118  * If you need to wait for real start of the unit, use waiting on state_cv.
     108 * Ideally this function is non-blocking synchronous, however, some units
     109 * cannot be started synchronously and thus return from this function generally
     110 * means that start was requested.
     111 *
     112 * Check state of the unit for actual result, start method can end in states:
     113 *   - STATE_STARTED, (succesful synchronous start)
     114 *   - STATE_STARTING, (succesful asynchronous start request)
     115 *   - STATE_FAILED.  (error occured)
    119116 */
    120117int unit_start(unit_t *unit)
  • uspace/srv/sysman/unit.h

    rd7c5fc0 r3f7e1f24  
    6262
    6363        unit_state_t state;
    64         fibril_mutex_t state_mtx;
    65         fibril_condvar_t state_cv;
    6664
    6765        list_t dependencies;
  • uspace/srv/sysman/units/unit_cfg.c

    rd7c5fc0 r3f7e1f24  
    176176                }
    177177
    178                 assert(unit->state = STATE_EMBRYO);
     178                assert(unit->state == STATE_EMBRYO);
    179179                configuration_add_unit(unit);
    180180        }
     
    232232        assert(u_cfg);
    233233
    234         /*
    235          * Skip starting state and hold state lock during whole configuration
    236          * load.
    237          */
    238         fibril_mutex_lock(&unit->state_mtx);
    239234        int rc = cfg_load_configuration(u_cfg->path);
    240235       
     
    244239                unit->state = STATE_FAILED;
    245240        }
    246         fibril_condvar_broadcast(&unit->state_cv);
    247         fibril_mutex_unlock(&unit->state_mtx);
    248241
    249242        return rc;
  • uspace/srv/sysman/units/unit_mnt.c

    rd7c5fc0 r3f7e1f24  
    8787static int unit_mnt_start(unit_t *unit)
    8888{
     89        // TODO replace with non-blocking
     90        const bool blocking = true;
    8991        unit_mnt_t *u_mnt = CAST_MNT(unit);
    9092        assert(u_mnt);
    9193
    92         fibril_mutex_lock(&unit->state_mtx);
    9394       
    9495        // TODO think about unit's lifecycle (is STOPPED only acceptable?)
    9596        assert(unit->state == STATE_STOPPED);
    96         unit->state = STATE_STARTING;
    97        
    98         fibril_condvar_broadcast(&unit->state_cv);
    99         fibril_mutex_unlock(&unit->state_mtx);
    10097
    10198
    10299        // TODO use other mount parameters
    103100        int rc = mount(u_mnt->type, u_mnt->mountpoint, u_mnt->device, "",
    104             IPC_FLAG_BLOCKING, 0);
     101            blocking ? IPC_FLAG_BLOCKING : 0, 0);
    105102
    106         if (rc == EOK) {
    107                 sysman_log(LVL_NOTE, "Mount ('%s') mounted", unit_name(unit));
    108                 unit_set_state(unit, STATE_STARTED);
     103        if (blocking) {
     104                if (rc == EOK) {
     105                        sysman_log(LVL_DEBUG, "Mount ('%s') mounted", unit_name(unit));
     106                        unit->state = STATE_STARTED;
     107                } else {
     108                        sysman_log(LVL_ERROR, "Mount ('%s') failed (%i)",
     109                            unit_name(unit), rc);
     110                        unit->state = STATE_FAILED;
     111                }
    109112        } else {
    110                 sysman_log(LVL_ERROR, "Mount ('%s') failed (%i)",
    111                     unit_name(unit), rc);
    112                 unit_set_state(unit, STATE_FAILED);
     113                if (rc == EOK) {
     114                        sysman_log(LVL_DEBUG, "Mount ('%s') requested", unit_name(unit));
     115                        unit->state = STATE_STARTING;
     116                } else {
     117                        sysman_log(LVL_ERROR, "Mount ('%s') request failed (%i)",
     118                            unit_name(unit), rc);
     119                        unit->state = STATE_FAILED;
     120                }
    113121        }
    114122
  • uspace/srv/sysman/units/unit_tgt.c

    rd7c5fc0 r3f7e1f24  
    5959        assert(u_tgt);
    6060
     61        unit->state = STATE_STARTED;
    6162        return EOK;
    6263}
Note: See TracChangeset for help on using the changeset viewer.