Changeset 627dc41 in mainline


Ignore:
Timestamp:
2019-06-28T15:23:03Z (5 years ago)
Author:
Jaroslav Jindrak <dzejrou@…>
Branches:
lfn, master, serial, ticket/834-toolchain-update, topic/msim-upgrade, topic/simplify-dev-export
Children:
a95e75e
Parents:
1995ac3
Message:

cpp: add deferred/async shared state for asynchronous function execution and implement async

File:
1 edited

Legend:

Unmodified
Added
Removed
  • uspace/lib/cpp/include/__bits/thread/future.hpp

    r1995ac3 r627dc41  
    3030#define LIBCPP_BITS_THREAD_FUTURE
    3131
     32#include <__bits/functional/function.hpp>
     33#include <__bits/functional/invoke.hpp>
    3234#include <__bits/refcount_obj.hpp>
    3335#include <__bits/thread/threading.hpp>
     
    3537#include <memory>
    3638#include <system_error>
     39#include <thread>
     40#include <tuple>
    3741#include <type_traits>
    3842#include <utility>
     
    104108        {
    105109            public:
    106                 const bool is_deferred_function;
    107 
    108                 shared_state(bool is_deferred = false)
    109                     : is_deferred_function{is_deferred}, mutex_{},
    110                       condvar_{}, value_{}, value_set_{false},
     110                shared_state()
     111                    : mutex_{}, condvar_{}, value_{}, value_set_{false},
    111112                      exception_{}, has_exception_{false}
    112113                {
     
    117118                void destroy() override
    118119                {
    119                     if (this->refs() < 1)
    120                     {
    121                         // TODO: what to destroy? just this?
    122                     }
     120                    /**
     121                     * Note: No need to act in this case, async shared
     122                     *       state is the object that needs to sometimes
     123                     *       invoke its payload.
     124                     */
    123125                }
    124126
    125127                void set_value(const R& val, bool set)
    126128                {
     129                    /**
     130                     * Note: This is the 'mark ready' move described
     131                     *       in 30.6.4 (6).
     132                     */
     133
    127134                    aux::threading::mutex::lock(mutex_);
    128135                    value_ = val;
     
    133140                }
    134141
    135                 void set_value(R&& val, bool set)
     142                void set_value(R&& val, bool set = true)
    136143                {
    137144                    aux::threading::mutex::lock(mutex_);
     
    143150                }
    144151
    145                 void set_set(bool set = true) noexcept
     152                void mark_set(bool set = true) noexcept
    146153                {
    147154                    value_set_ = set;
     
    184191                 *       functions.
    185192                 */
    186                 void wait()
     193                virtual void wait()
    187194                {
    188195                    aux::threading::mutex::lock(mutex_);
     
    218225                }
    219226
    220                 ~shared_state()
    221                 {
    222                     // TODO: just destroy?
    223                 }
     227                ~shared_state() override = default;
    224228
    225229            private:
     
    232236                exception_ptr exception_;
    233237                bool has_exception_;
     238        };
     239
     240        /**
     241         * We could make one state for both async and
     242         * deferred policies, but then we would be wasting
     243         * memory and the only benefit would be the ability
     244         * for additional implementation defined policies done
     245         * directly in that state (as opposed to making new
     246         * states for them).
     247         *
     248         * But since we have no plan (nor need) to make those,
     249         * this approach seems to be the best one.
     250         * TODO: Override wait_for and wait_until in both!
     251         */
     252
     253        template<class R, class F, class... Args>
     254        class async_shared_state: public shared_state<R>
     255        {
     256            public:
     257                async_shared_state(F&& f, Args&&... args)
     258                    : shared_state<R>{}, thread_{}
     259                {
     260                    thread_ = thread{
     261                        [=](){
     262                            try
     263                            {
     264                                this->set_value(invoke(f, args...));
     265                            }
     266                            catch(...) // TODO: Any exception.
     267                            {
     268                                // TODO: Store it.
     269                            }
     270                        }
     271                    };
     272                }
     273
     274                void destroy() override
     275                {
     276                    if (!this->is_set())
     277                        thread_.join();
     278                }
     279
     280                void wait() override
     281                {
     282                    if (!this->is_set())
     283                        thread_.join();
     284                }
     285
     286                ~async_shared_state() override
     287                {
     288                    destroy();
     289                }
     290
     291            private:
     292                thread thread_;
     293        };
     294
     295        template<class R, class F, class... Args>
     296        class deferred_shared_state: public shared_state<R>
     297        {
     298            public:
     299                template<class G>
     300                deferred_shared_state(G&& f, Args&&... args)
     301                    : shared_state<R>{}, func_{forward<F>(f)},
     302                      args_{forward<Args>(args)...}
     303                { /* DUMMY BODY */ }
     304
     305                void destroy() override
     306                {
     307                    if (!this->is_set())
     308                        invoke_(make_index_sequence<sizeof...(Args)>{});
     309                }
     310
     311                void wait() override
     312                {
     313                    // TODO: Should these be synchronized for async?
     314                    if (!this->is_set())
     315                        invoke_(make_index_sequence<sizeof...(Args)>{});
     316                }
     317
     318                ~deferred_shared_state() override
     319                {
     320                    destroy();
     321                }
     322
     323            private:
     324                function<R(decay_t<Args>...)> func_;
     325                tuple<decay_t<Args>...> args_;
     326
     327                template<size_t... Is>
     328                void invoke_(index_sequence<Is...>)
     329                {
     330                    try
     331                    {
     332                        this->set_value(invoke(move(func_), get<Is>(move(args_))...));
     333                    }
     334                    catch(...)
     335                    {
     336                        // TODO: Store it.
     337                    }
     338                }
    234339        };
    235340    }
     
    359464            {
    360465                /**
     466                 * Note: This is the 'abandon' move described in
     467                 *       30.6.4 (7).
    361468                 * 1) If state is not ready:
    362469                 *   a) Store exception of type future_error with
     
    447554                wait();
    448555
    449                 auto state = state_;
    450                 state_ = nullptr;
    451                 if (state->has_exception())
    452                     state->throw_stored_exception();
    453 
    454                 return std::move(state->get());
     556                if (state_->has_exception())
     557                    state_->throw_stored_exception();
     558                auto res = std::move(state_->get());
     559
     560                release_state_();
     561
     562                return res;
    455563            }
    456564
     
    500608            void release_state_()
    501609            {
     610                if (!state_)
     611                    return;
     612
    502613                /**
     614                 * Note: This is the 'release' move described in
     615                 *       30.6.4 (5).
    503616                 * Last reference to state -> destroy state.
    504617                 * Decrement refcount of state otherwise.
     
    508621                 *  3) This was the last reference to the shared state.
    509622                 */
     623                if (state_->decrement())
     624                {
     625                    /**
     626                     * The destroy call handles the special case
     627                     * when 1) - 3) hold.
     628                     */
     629                    state_->destroy();
     630                    delete state_;
     631                    state_ = nullptr;
     632                }
    510633            }
    511634
     
    525648    };
    526649
    527     // TODO: Make sure the move constructor of shared_future
     650    // TODO: Make sure the move-future constructor of shared_future
    528651    //       invalidates the state (i.e. sets to nullptr).
    529652    template<class R>
     
    603726    { /* DUMMY BODY */ };
    604727
    605     template<class F, class... Args>
    606     future<result_of_t<decay_t<F>(decay_t<Args>...)>>
    607     async(F&& f, Args&&... args)
    608     {
    609         // TODO: implement
    610         __unimplemented();
     728    namespace aux
     729    {
     730        /**
     731         * Note: The reason we keep the actual function
     732         *       within the aux namespace is that were the non-policy
     733         *       version of the function call the other one in the std
     734         *       namespace, we'd get resolution conflicts. This way
     735         *       aux::async is properly called even if std::async is
     736         *       called either with or without a launch policy.
     737         */
     738        template<class F, class... Args>
     739        future<result_of_t<decay_t<F>(decay_t<Args>...)>>
     740        async(launch policy, F&& f, Args&&... args)
     741        {
     742            using result_t = result_of_t<decay_t<F>(decay_t<Args>...)>;
     743
     744            bool async = (static_cast<int>(policy) &
     745                          static_cast<int>(launch::async)) != 0;
     746            bool deferred = (static_cast<int>(policy) &
     747                             static_cast<int>(launch::deferred)) != 0;
     748
     749            /**
     750             * Note: The case when async | deferred is set in policy
     751             *       is implementation defined, feel free to change.
     752             */
     753            if (async && deferred)
     754            {
     755                return future<result_t>{
     756                    new aux::deferred_shared_state<
     757                        result_t, F, Args...
     758                    >{forward<F>(f), forward<Args>(args)...}
     759                };
     760            }
     761            else if (async)
     762            {
     763                return future<result_t>{
     764                    new aux::async_shared_state<
     765                        result_t, F, Args...
     766                    >{forward<F>(f), forward<Args>(args)...}
     767                };
     768            }
     769            else if (deferred)
     770            {
     771               /**
     772                * Duplicated on purpose because of the default.
     773                * Do not remove!
     774                */
     775                return future<result_t>{
     776                    new aux::deferred_shared_state<
     777                        result_t, F, Args...
     778                    >{forward<F>(f), forward<Args>(args)...}
     779                };
     780            }
     781
     782            /**
     783             * This is undefined behaviour, let's be nice though ;)
     784             */
     785            return future<result_t>{
     786                new aux::deferred_shared_state<
     787                    result_t, F, Args...
     788                >{forward<F>(f), forward<Args>(args)...}
     789            };
     790        }
    611791    }
    612792
    613     template<class F, class... Args>
    614     future<result_of_t<decay_t<F>(decay_t<Args>...)>>
    615     async(launch, F&& f, Args&&... args)
    616     {
    617         // TODO: implement
    618         __unimplemented();
     793    template<class F>
     794    decltype(auto) async(F&& f)
     795    {
     796        launch policy = static_cast<launch>(
     797            static_cast<int>(launch::async) |
     798            static_cast<int>(launch::deferred)
     799        );
     800
     801        return aux::async(policy, forward<F>(f));
     802    }
     803
     804    /**
     805     * The async(launch, F, Args...) and async(F, Args...)
     806     * overloards must not collide, so we check the first template
     807     * argument and handle the special case of just a functor
     808     * above.
     809     */
     810    template<class F, class Arg, class... Args>
     811    decltype(auto) async(F&& f, Arg&& arg, Args&&... args)
     812    {
     813        if constexpr (is_same_v<decay_t<F>, launch>)
     814            return aux::async(f, forward<Arg>(arg), forward<Args>(args)...);
     815        else
     816        {
     817            launch policy = static_cast<launch>(
     818                static_cast<int>(launch::async) |
     819                static_cast<int>(launch::deferred)
     820            );
     821
     822            return aux::async(policy, forward<F>(f), forward<Arg>(arg), forward<Args>(args)...);
     823        }
    619824    }
    620825}
Note: See TracChangeset for help on using the changeset viewer.