Changeset 3f7e1f24 in mainline for uspace/srv/sysman/sysman.c


Ignore:
Timestamp:
2019-08-03T08:28:26Z (5 years ago)
Author:
Matthieu Riolo <matthieu.riolo@…>
Children:
095d03c
Parents:
d7c5fc0
git-author:
Michal Koutný <xm.koutny+hos@…> (2015-04-22 17:54:08)
git-committer:
Matthieu Riolo <matthieu.riolo@…> (2019-08-03 08:28:26)
Message:

sysman: Refactored job manipulation (event loop + one main fibril)

File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/srv/sysman/sysman.c

    rd7c5fc0 r3f7e1f24  
    2727 */
    2828
     29#include <adt/hash_table.h>
    2930#include <adt/list.h>
    3031#include <errno.h>
    31 
    32 #include "dep.h"
    33 #include "job.h"
     32#include <fibril_synch.h>
     33#include <stdlib.h>
     34
     35#include "log.h"
    3436#include "sysman.h"
    3537
    36 /** Create jobs for cluser of given unit.
    37  *
    38  * @note Using recursion, limits "depth" of dependency graph.
    39  */
    40 static int sysman_create_closure_jobs(unit_t *unit, job_t **entry_job_ptr,
    41     list_t *accumulator, job_type_t type)
    42 {
    43         int rc = EOK;
    44         job_t *job = job_create(type);
    45         if (job == NULL) {
     38
     39/* Do not expose this generally named type */
     40typedef struct {
     41        link_t event_queue;
     42
     43        event_handler_t handler;
     44        void *data;
     45} event_t;
     46
     47typedef struct {
     48        link_t callbacks;
     49
     50        callback_handler_t handler;
     51        void *data;
     52} obj_callback_t;
     53
     54typedef struct {
     55        ht_link_t ht_link;
     56
     57        void *object;
     58        list_t callbacks;
     59} observed_object_t;
     60
     61static LIST_INITIALIZE(event_queue);
     62static fibril_mutex_t event_queue_mtx;
     63static fibril_condvar_t event_queue_cv;
     64
     65static hash_table_t observed_objects;
     66static fibril_mutex_t observed_objects_mtx;
     67static fibril_condvar_t observed_objects_cv;
     68
     69/* Hash table functions */
     70static size_t observed_objects_ht_hash(const ht_link_t *item)
     71{
     72        observed_object_t *callbacks =
     73            hash_table_get_inst(item, observed_object_t, ht_link);
     74
     75        return (size_t) callbacks->object;
     76}
     77
     78static size_t observed_objects_ht_key_hash(void *key)
     79{
     80        void *object = *(void **) key;
     81        return (size_t) object;
     82}
     83
     84static bool observed_objects_ht_key_equal(void *key, const ht_link_t *item)
     85{
     86        void *object = *(void **)key;
     87        return (
     88            hash_table_get_inst(item, observed_object_t, ht_link)->object ==
     89            object);
     90}
     91
     92static hash_table_ops_t observed_objects_ht_ops = {
     93        .hash            = &observed_objects_ht_hash,
     94        .key_hash        = &observed_objects_ht_key_hash,
     95        .equal           = NULL,
     96        .key_equal       = &observed_objects_ht_key_equal,
     97        .remove_callback = NULL
     98};
     99
     100static void notify_observers(void *object)
     101{
     102        ht_link_t *item = hash_table_find(&observed_objects, &object);
     103        if (item == NULL) {
     104                return;
     105        }
     106        observed_object_t *observed_object =
     107            hash_table_get_inst(item, observed_object_t, ht_link);
     108
     109        list_foreach_safe(observed_object->callbacks, cur_link, next_link) {
     110                obj_callback_t *callback =
     111                    list_get_instance(cur_link, obj_callback_t, callbacks);
     112                callback->handler(object, callback->data);
     113                list_remove(cur_link);
     114                free(callback);
     115        }
     116}
     117
     118/*
     119 * Non-static functions
     120 */
     121void sysman_events_init(void)
     122{
     123        fibril_mutex_initialize(&event_queue_mtx);
     124        fibril_condvar_initialize(&event_queue_cv);
     125
     126        bool table =
     127            hash_table_create(&observed_objects, 0, 0, &observed_objects_ht_ops);
     128        if (!table) {
     129                sysman_log(LVL_FATAL, "%s: Failed initialization", __func__);
     130                abort();
     131        }
     132        fibril_mutex_initialize(&observed_objects_mtx);
     133        fibril_condvar_initialize(&observed_objects_cv);
     134}
     135
     136int sysman_events_loop(void *unused)
     137{
     138        while (1) {
     139                /* Pop event */
     140                fibril_mutex_lock(&event_queue_mtx);
     141                while (list_empty(&event_queue)) {
     142                        fibril_condvar_wait(&event_queue_cv, &event_queue_mtx);
     143                }
     144
     145                link_t *li_event = list_first(&event_queue);
     146                list_remove(li_event);
     147                event_t *event =
     148                    list_get_instance(li_event, event_t, event_queue);
     149                fibril_mutex_unlock(&event_queue_mtx);
     150
     151                /* Process event */
     152                event->handler(event->data);
     153                free(event);
     154        }
     155}
     156
     157void sysman_raise_event(event_handler_t handler, void *data)
     158{
     159        event_t *event = malloc(sizeof(event_t));
     160        if (event == NULL) {
     161                sysman_log(LVL_FATAL, "%s: cannot allocate event", __func__);
     162                // TODO think about aborting system critical task
     163                abort();
     164        }
     165        link_initialize(&event->event_queue);
     166        event->handler = handler;
     167        event->data = data;
     168
     169        fibril_mutex_lock(&event_queue_mtx);
     170        list_append(&event->event_queue, &event_queue);
     171        /* There's only single event loop, broadcast is unnecessary */
     172        fibril_condvar_signal(&event_queue_cv);
     173        fibril_mutex_unlock(&event_queue_mtx);
     174}
     175
     176/** Register single-use object observer callback
     177 *
     178 * TODO no one handles return value, it's quite fatal to lack memory for
     179 *      callbacks...  @return EOK on success
     180 * @return ENOMEM
     181 */
     182int sysman_object_observer(void *object, callback_handler_t handler, void *data)
     183{
     184        int rc;
     185        observed_object_t *observed_object = NULL;
     186        observed_object_t *new_observed_object = NULL;
     187        ht_link_t *ht_link = hash_table_find(&observed_objects, &object);
     188
     189        if (ht_link == NULL) {
     190                observed_object = malloc(sizeof(observed_object_t));
     191                if (observed_object == NULL) {
     192                        rc = ENOMEM;
     193                        goto fail;
     194                }
     195                new_observed_object = observed_object;
     196
     197                observed_object->object = object;
     198                list_initialize(&observed_object->callbacks);
     199                hash_table_insert(&observed_objects, &observed_object->ht_link);
     200        } else {
     201                observed_object =
     202                    hash_table_get_inst(ht_link, observed_object_t, ht_link);
     203        }
     204
     205        obj_callback_t *obj_callback = malloc(sizeof(obj_callback_t));
     206        if (obj_callback == NULL) {
    46207                rc = ENOMEM;
    47208                goto fail;
    48209        }
    49210
    50         job->unit = unit;
    51 
    52         list_foreach(unit->dependencies, dependencies, unit_dependency_t, dep) {
    53                 job_t *blocking_job = NULL;
    54                 rc = sysman_create_closure_jobs(dep->dependency, &blocking_job,
    55                     accumulator, type);
    56                 if (rc != EOK) {
    57                         goto fail;
    58                 }
    59                
    60                 rc = job_add_blocking_job(job, blocking_job);
    61                 if (rc != EOK) {
    62                         goto fail;
    63                 }
    64         }
    65 
    66         /* Job is passed to the accumulator, i.e. no add_ref. */
    67         list_append(&job->link, accumulator);
    68 
    69         if (entry_job_ptr != NULL) {
    70                 *entry_job_ptr = job;
    71         }
     211        obj_callback->handler = handler;
     212        obj_callback->data = data;
     213        list_append(&obj_callback->callbacks, &observed_object->callbacks);
    72214        return EOK;
    73215
    74216fail:
    75         job_del_ref(&job);
     217        free(new_observed_object);
    76218        return rc;
    77219}
    78220
    79 int sysman_unit_start(unit_t *unit)
    80 {
    81         list_t new_jobs;
    82         list_initialize(&new_jobs);
    83 
    84         job_t *job = NULL;
    85         // TODO shouldn't be here read-lock on configuration?
    86         int rc = sysman_create_closure_jobs(unit, &job, &new_jobs, JOB_START);
     221/*
     222 * Event handlers
     223 */
     224
     225// NOTE must run in main event loop fibril
     226void sysman_event_job_process(void *arg)
     227{
     228        job_t *job = arg;
     229        dyn_array_t job_closure;
     230        dyn_array_initialize(&job_closure, job_ptr_t, 0);
     231
     232        int rc = job_create_closure(job, &job_closure);
    87233        if (rc != EOK) {
    88                 return rc;
    89         }
    90 
    91         // TODO handle errors when adding job accumulator
    92         job_queue_jobs(&new_jobs);
    93 
    94         return job_wait(job);
    95 }
     234                sysman_log(LVL_ERROR, "Cannot create closure for job %p (%i)",
     235                    job, rc);
     236                goto fail;
     237        }
     238
     239        rc = job_queue_add_jobs(&job_closure);
     240        if (rc != EOK) {
     241                // TODO job_queue_add_jobs should log message
     242                goto fail;
     243        }
     244
     245        // TODO explain why calling asynchronously
     246        sysman_raise_event(&sysman_event_job_queue_run, NULL);
     247        return;
     248
     249fail:
     250        job->retval = JOB_FAILED;
     251        job_finish(job);
     252        // TODO clarify refcount to the main job
     253        dyn_array_foreach(job_closure, job_ptr_t, closure_job) {
     254                job_del_ref(&(*closure_job));
     255        }
     256        dyn_array_destroy(&job_closure);
     257}
     258
     259
     260void sysman_event_job_queue_run(void *unused)
     261{
     262        job_t *job;
     263        while ((job = job_queue_pop_runnable())) {
     264                job_run(job);
     265        }
     266}
     267
     268void sysman_event_job_changed(void *object)
     269{
     270        notify_observers(object);
     271}
Note: See TracChangeset for help on using the changeset viewer.