Skip to content

Commit 79baed9

Browse files
committed
implement std::execution::task_scheduler per P3927
1 parent 24ba625 commit 79baed9

19 files changed

Lines changed: 1428 additions & 336 deletions

include/exec/__detail/__system_context_replaceability_api.hpp

Lines changed: 25 additions & 106 deletions
Original file line numberDiff line numberDiff line change
@@ -18,128 +18,47 @@
1818
#define STDEXEC_SYSTEM_CONTEXT_REPLACEABILITY_API_H
1919

2020
#include "../../stdexec/__detail/__execution_fwd.hpp"
21+
#include "../../stdexec/__detail/__system_context_replaceability_api.hpp"
2122

2223
#include <cstddef>
23-
#include <cstdint>
24-
#include <exception>
2524
#include <memory>
26-
#include <optional>
27-
#include <span>
28-
29-
struct __uuid {
30-
std::uint64_t __parts1;
31-
std::uint64_t __parts2;
32-
33-
friend auto operator==(__uuid, __uuid) noexcept -> bool = default;
34-
};
3525

3626
namespace exec::system_context_replaceability {
27+
using STDEXEC::system_context_replaceability::__parallel_scheduler_backend_factory;
3728

38-
/// Helper for the `__queryable_interface` concept.
39-
template <__uuid X>
40-
using __check_constexpr_uuid = void;
41-
42-
/// Concept for a queryable interface. Ensures that the interface has a `__interface_identifier` member.
43-
template <typename _T>
44-
concept __queryable_interface = requires() {
45-
typename __check_constexpr_uuid<_T::__interface_identifier>;
46-
};
47-
48-
/// The details for making `_T` a runtime property.
49-
template <typename _T>
50-
struct __runtime_property_helper {
51-
/// Is `_T` a property?
52-
static constexpr bool __is_property = false;
53-
/// The unique identifier for the property.
54-
static constexpr __uuid __property_identifier{0, 0};
55-
};
56-
57-
/// `inplace_stope_token` is a runtime property.
58-
template <>
59-
struct __runtime_property_helper<STDEXEC::inplace_stop_token> {
60-
static constexpr bool __is_property = true;
61-
static constexpr __uuid __property_identifier{0x8779c09d8aa249df, 0x867db0e653202604};
62-
};
63-
64-
/// Concept for a runtime property.
65-
template <typename _T>
66-
concept __runtime_property = __runtime_property_helper<_T>::__is_property;
67-
68-
struct parallel_scheduler_backend;
29+
/// Interface for the parallel scheduler backend.
30+
using parallel_scheduler_backend [[deprecated(
31+
"Use STDEXEC::system_context_replaceability::parallel_scheduler_backend instead.")]] =
32+
STDEXEC::system_context_replaceability::parallel_scheduler_backend;
6933

7034
/// Get the backend for the parallel scheduler.
7135
/// Users might replace this function.
72-
auto query_parallel_scheduler_backend() -> std::shared_ptr<parallel_scheduler_backend>;
73-
74-
/// The type of a factory that can create `parallel_scheduler_backend` instances.
75-
/// Out of spec.
76-
using __parallel_scheduler_backend_factory = std::shared_ptr<parallel_scheduler_backend> (*)();
36+
[[deprecated(
37+
"Use STDEXEC::system_context_replaceability::query_parallel_scheduler_backend instead.")]]
38+
inline auto query_parallel_scheduler_backend() -> std::shared_ptr<parallel_scheduler_backend> {
39+
return STDEXEC::system_context_replaceability::query_parallel_scheduler_backend();
40+
}
7741

7842
/// Set a factory for the parallel scheduler backend.
7943
/// Can be used to replace the parallel scheduler at runtime.
8044
/// Out of spec.
81-
auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory __new_factory)
82-
-> __parallel_scheduler_backend_factory;
45+
[[deprecated(
46+
"Use STDEXEC::system_context_replaceability::set_parallel_scheduler_backend instead.")]]
47+
inline auto set_parallel_scheduler_backend(__parallel_scheduler_backend_factory __new_factory)
48+
-> __parallel_scheduler_backend_factory {
49+
return STDEXEC::system_context_replaceability::set_parallel_scheduler_backend(__new_factory);
50+
}
8351

8452
/// Interface for completing a sender operation. Backend will call frontend though this interface
8553
/// for completing the `schedule` and `schedule_bulk` operations.
86-
struct receiver {
87-
virtual ~receiver() = default;
88-
89-
protected:
90-
virtual auto __query_env(__uuid, void*) noexcept -> bool = 0;
91-
92-
public:
93-
/// Called when the system scheduler completes successfully.
94-
virtual void set_value() noexcept = 0;
95-
/// Called when the system scheduler completes with an error.
96-
virtual void set_error(std::exception_ptr) noexcept = 0;
97-
/// Called when the system scheduler was stopped.
98-
virtual void set_stopped() noexcept = 0;
99-
100-
/// Query the receiver for a property of type `_P`.
101-
template <typename _P>
102-
auto try_query() noexcept -> std::optional<std::decay_t<_P>> {
103-
if constexpr (__runtime_property<_P>) {
104-
std::decay_t<_P> __p;
105-
bool __success =
106-
__query_env(__runtime_property_helper<std::decay_t<_P>>::__property_identifier, &__p);
107-
return __success ? std::make_optional(std::move(__p)) : std::nullopt;
108-
} else {
109-
return std::nullopt;
110-
}
111-
}
112-
};
113-
114-
/// Receiver for bulk sheduling operations.
115-
struct bulk_item_receiver : receiver {
116-
/// Called for each item of a bulk operation, possible on different threads.
117-
virtual void execute(std::uint32_t, std::uint32_t) noexcept = 0;
118-
};
119-
120-
/// Interface for the parallel scheduler backend.
121-
struct parallel_scheduler_backend {
122-
static constexpr __uuid __interface_identifier{0x5ee9202498c4bd4f, 0xa1df2508ffcd9d7e};
123-
124-
virtual ~parallel_scheduler_backend() = default;
125-
126-
/// Schedule work on parallel scheduler, calling `__r` when done and using `__s` for preallocated
127-
/// memory.
128-
virtual void schedule(std::span<std::byte> __s, receiver& __r) noexcept = 0;
129-
/// Schedule bulk work of size `__n` on parallel scheduler, calling `__r` for different
130-
/// subranges of [0, __n), and using `__s` for preallocated memory.
131-
virtual void schedule_bulk_chunked(
132-
std::uint32_t __n,
133-
std::span<std::byte> __s,
134-
bulk_item_receiver& __r) noexcept = 0;
135-
/// Schedule bulk work of size `__n` on parallel scheduler, calling `__r` for each item, and
136-
/// using `__s` for preallocated memory.
137-
virtual void schedule_bulk_unchunked(
138-
std::uint32_t __n,
139-
std::span<std::byte> __s,
140-
bulk_item_receiver& __r) noexcept = 0;
141-
};
142-
54+
using receiver
55+
[[deprecated("Use STDEXEC::system_context_replaceability::receiver_proxy instead.")]] =
56+
STDEXEC::system_context_replaceability::receiver_proxy;
57+
58+
/// Receiver for bulk scheduling operations.
59+
using bulk_item_receiver [[deprecated(
60+
"Use STDEXEC::system_context_replaceability::bulk_item_receiver_proxy instead.")]] =
61+
STDEXEC::system_context_replaceability::bulk_item_receiver_proxy;
14362
} // namespace exec::system_context_replaceability
14463

14564
#endif

include/exec/system_context.hpp

Lines changed: 63 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,12 @@
1515
*/
1616
#pragma once
1717

18-
#include <utility>
19-
2018
#include "../stdexec/execution.hpp"
2119
#include "__detail/__system_context_replaceability_api.hpp"
2220

21+
#include <optional>
22+
#include <utility>
23+
2324
#ifndef STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE
2425
# define STDEXEC_SYSTEM_CONTEXT_SCHEDULE_OP_SIZE 72
2526
#endif
@@ -43,48 +44,61 @@ namespace exec {
4344
namespace detail {
4445
/// Allows a frontend receiver of type `_Rcvr` to be passed to the backend.
4546
template <class _Rcvr>
46-
struct __receiver_adapter : system_context_replaceability::receiver {
47+
struct __receiver_adapter : STDEXEC::system_context_replaceability::receiver_proxy {
4748
explicit __receiver_adapter(_Rcvr&& __rcvr)
48-
: __rcvr_{std::forward<_Rcvr>(__rcvr)} {
49-
}
50-
51-
auto __query_env(__uuid __id, void* __dest) noexcept -> bool override {
52-
using system_context_replaceability::__runtime_property_helper;
53-
using __StopToken = decltype(STDEXEC::get_stop_token(STDEXEC::get_env(__rcvr_)));
54-
if constexpr (std::is_same_v<STDEXEC::inplace_stop_token, __StopToken>) {
55-
if (__id == __runtime_property_helper<STDEXEC::inplace_stop_token>::__property_identifier) {
56-
*static_cast<STDEXEC::inplace_stop_token*>(__dest) = STDEXEC::get_stop_token(
57-
STDEXEC::get_env(__rcvr_));
58-
return true;
59-
}
60-
}
61-
return false;
49+
: __rcvr_{static_cast<_Rcvr&&>(__rcvr)} {
6250
}
6351

6452
void set_value() noexcept override {
6553
STDEXEC::set_value(std::forward<_Rcvr>(__rcvr_));
6654
}
6755

68-
void set_error(std::exception_ptr __ex) noexcept override {
56+
void set_error(std::exception_ptr&& __ex) noexcept override {
6957
STDEXEC::set_error(std::forward<_Rcvr>(__rcvr_), std::move(__ex));
7058
}
7159

7260
void set_stopped() noexcept override {
7361
STDEXEC::set_stopped(std::forward<_Rcvr>(__rcvr_));
7462
}
7563

64+
protected:
65+
void __query_env(
66+
STDEXEC::__type_index __query_type,
67+
STDEXEC::__type_index __value_type,
68+
void* __dest) const noexcept override {
69+
if (__query_type == STDEXEC::__mtypeid<STDEXEC::get_stop_token_t>) {
70+
__query(STDEXEC::get_stop_token, __value_type, __dest);
71+
}
72+
}
73+
74+
private:
75+
void __query(STDEXEC::get_stop_token_t, STDEXEC::__type_index __value_type, void* __dest)
76+
const noexcept {
77+
using __stop_token_t = STDEXEC::stop_token_of_t<STDEXEC::env_of_t<_Rcvr>>;
78+
if constexpr (std::is_same_v<STDEXEC::inplace_stop_token, __stop_token_t>) {
79+
if (__value_type == STDEXEC::__mtypeid<STDEXEC::inplace_stop_token>) {
80+
using __dest_t = std::optional<STDEXEC::inplace_stop_token>;
81+
*static_cast<__dest_t*>(__dest) = STDEXEC::get_stop_token(STDEXEC::get_env(__rcvr_));
82+
}
83+
}
84+
}
85+
86+
public:
7687
STDEXEC_ATTRIBUTE(no_unique_address)
7788
_Rcvr __rcvr_;
7889
};
7990

8091
/// The type large enough to store the data produced by a sender.
92+
/// BUGBUG: this seems wrong. i think this should be a variant of tuples of possible
93+
/// results.
8194
template <class _Sender>
8295
using __sender_data_t = decltype(STDEXEC::sync_wait(std::declval<_Sender>()).value());
8396

8497
} // namespace detail
8598

8699
class parallel_scheduler;
87100
class __parallel_sender;
101+
88102
template <bool, STDEXEC::sender _S, std::integral _Size, class _Fn, bool>
89103
class __parallel_bulk_sender;
90104

@@ -106,7 +120,7 @@ namespace exec {
106120

107121
namespace detail {
108122
using __backend_ptr =
109-
std::shared_ptr<system_context_replaceability::parallel_scheduler_backend>;
123+
std::shared_ptr<STDEXEC::system_context_replaceability::parallel_scheduler_backend>;
110124

111125
template <class T>
112126
auto __make_parallel_scheduler_from(T, __backend_ptr) noexcept;
@@ -199,7 +213,7 @@ namespace exec {
199213
auto& __scheduler_impl = __preallocated_.__as<__backend_ptr>();
200214
auto __impl = std::move(__scheduler_impl);
201215
std::destroy_at(&__scheduler_impl);
202-
__impl->schedule(__preallocated_.__as_storage(), __rcvr_);
216+
__impl->schedule(__rcvr_, __preallocated_.__as_storage());
203217
}
204218

205219
/// Object that receives completion from the work described by the sender.
@@ -312,7 +326,8 @@ namespace exec {
312326
/// This represents the base class that abstracts the storage of the values sent by the previous sender.
313327
/// Derived class will properly implement the receiver methods.
314328
template <class _Previous>
315-
struct __forward_args_receiver : system_context_replaceability::bulk_item_receiver {
329+
struct __forward_args_receiver
330+
: STDEXEC::system_context_replaceability::bulk_item_receiver_proxy {
316331
using __storage_t = detail::__sender_data_t<_Previous>;
317332

318333
/// Storage for the arguments received from the previous sender.
@@ -329,24 +344,11 @@ namespace exec {
329344
/// Stores `__as` in the base class storage, with the right types.
330345
explicit __typed_forward_args_receiver(_As&&... __as) {
331346
static_assert(sizeof(std::tuple<_As...>) <= sizeof(__base_t::__arguments_data_));
347+
// BUGBUG: this seems wrong. we are not ever destroying this tuple.
332348
new (__base_t::__arguments_data_)
333349
std::tuple<STDEXEC::__decay_t<_As>...>{std::move(__as)...};
334350
}
335351

336-
auto __query_env(__uuid __id, void* __dest) noexcept -> bool override {
337-
auto __state = reinterpret_cast<_BulkState*>(this);
338-
using system_context_replaceability::__runtime_property_helper;
339-
using __StopToken = decltype(STDEXEC::get_stop_token(STDEXEC::get_env(__state->__rcvr_)));
340-
if constexpr (std::is_same_v<STDEXEC::inplace_stop_token, __StopToken>) {
341-
if (__id == __runtime_property_helper<STDEXEC::inplace_stop_token>::__property_identifier) {
342-
*static_cast<STDEXEC::inplace_stop_token*>(__dest) = STDEXEC::get_stop_token(
343-
STDEXEC::get_env(__state->__rcvr_));
344-
return true;
345-
}
346-
}
347-
return false;
348-
}
349-
350352
/// Calls `set_value()` on the final receiver of the bulk operation, using the values from the previous sender.
351353
void set_value() noexcept override {
352354
auto __state = reinterpret_cast<_BulkState*>(this);
@@ -396,6 +398,30 @@ namespace exec {
396398
*reinterpret_cast<std::tuple<_As...>*>(__base_t::__arguments_data_));
397399
}
398400
}
401+
402+
protected:
403+
void __query_env(
404+
STDEXEC::__type_index __query_type,
405+
STDEXEC::__type_index __value_type,
406+
void* __dest) const noexcept override {
407+
if (__query_type == STDEXEC::__mtypeid<STDEXEC::get_stop_token_t>) {
408+
__query(STDEXEC::get_stop_token, __value_type, __dest);
409+
}
410+
}
411+
412+
private:
413+
void __query(STDEXEC::get_stop_token_t, STDEXEC::__type_index __value_type, void* __dest)
414+
const noexcept {
415+
auto __state = reinterpret_cast<const _BulkState*>(this);
416+
using __stop_token_t = STDEXEC::stop_token_of_t<STDEXEC::env_of_t<__rcvr_t>>;
417+
if constexpr (std::is_same_v<STDEXEC::inplace_stop_token, __stop_token_t>) {
418+
using __dest_t = std::optional<STDEXEC::inplace_stop_token>;
419+
if (__value_type == STDEXEC::__mtypeid<STDEXEC::inplace_stop_token>) {
420+
*static_cast<__dest_t*>(__dest) = STDEXEC::get_stop_token(
421+
STDEXEC::get_env(__state->__rcvr_));
422+
}
423+
}
424+
}
399425
};
400426

401427
/// The state needed to execute the bulk sender created from system context, minus the preallocates space.
@@ -645,7 +671,7 @@ namespace exec {
645671
};
646672

647673
inline auto get_parallel_scheduler() -> parallel_scheduler {
648-
auto __impl = system_context_replaceability::query_parallel_scheduler_backend();
674+
auto __impl = STDEXEC::system_context_replaceability::query_parallel_scheduler_backend();
649675
if (!__impl) {
650676
STDEXEC_THROW(std::runtime_error{"No system context implementation found"});
651677
}
@@ -728,5 +754,5 @@ namespace exec {
728754

729755
#if defined(STDEXEC_SYSTEM_CONTEXT_HEADER_ONLY)
730756
# define STDEXEC_SYSTEM_CONTEXT_INLINE inline
731-
# include "__detail/__system_context_default_impl_entry.hpp"
757+
# include "../stdexec/__detail/__system_context_default_impl_entry.hpp"
732758
#endif

0 commit comments

Comments
 (0)