Skip to content

Commit 4cca273

Browse files
committed
add merge algorithm for sequence-senders
each input sequence may be on a different scheduler. The merged items will invoke `set_next` on the receiver from all of the contexts. Depending on the schedulers in play, the calls to `set_next` may overlap in parallel.
1 parent 082a9e4 commit 4cca273

File tree

3 files changed

+467
-0
lines changed

3 files changed

+467
-0
lines changed

include/exec/sequence/merge.hpp

Lines changed: 219 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,219 @@
1+
/*
2+
* Copyright (c) 2023 Maikel Nadolski
3+
* Copyright (c) 2023 NVIDIA Corporation
4+
*
5+
* Licensed under the Apache License Version 2.0 with LLVM Exceptions
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* https://llvm.org/LICENSE.txt
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
#pragma once
18+
19+
#include "../../stdexec/concepts.hpp"
20+
#include "../../stdexec/execution.hpp"
21+
#include "../sequence_senders.hpp"
22+
23+
#include "../__detail/__basic_sequence.hpp"
24+
#include "./transform_each.hpp"
25+
#include "./ignore_all_values.hpp"
26+
#include "stdexec/__detail/__execution_fwd.hpp"
27+
#include "stdexec/__detail/__meta.hpp"
28+
#include "stdexec/__detail/__senders_core.hpp"
29+
#include "stdexec/__detail/__transform_completion_signatures.hpp"
30+
31+
namespace exec {
32+
namespace __merge {
33+
using namespace stdexec;
34+
35+
template <class _Receiver>
36+
struct __operation_base {
37+
_Receiver __receiver_;
38+
};
39+
40+
template <class _ReceiverId>
41+
struct __result_receiver {
42+
using _Receiver = stdexec::__t<_ReceiverId>;
43+
44+
struct __t {
45+
using receiver_concept = stdexec::receiver_t;
46+
using __id = __result_receiver;
47+
48+
__operation_base<_Receiver>* __op_;
49+
50+
void set_value() noexcept {
51+
stdexec::set_value(static_cast<_Receiver&&>(__op_->__receiver_));
52+
}
53+
54+
template <class _Error>
55+
void set_error(_Error&& __error) noexcept {
56+
stdexec::set_error(
57+
static_cast<_Receiver&&>(__op_->__receiver_), static_cast<_Error&&>(__error));
58+
}
59+
60+
void set_stopped() noexcept
61+
{
62+
stdexec::set_stopped(static_cast<_Receiver&&>(__op_->__receiver_));
63+
}
64+
65+
auto get_env() const noexcept -> env_of_t<_Receiver> {
66+
return stdexec::get_env(__op_->__receiver_);
67+
}
68+
};
69+
};
70+
71+
template <class _ReceiverId>
72+
struct __merge_each_fn {
73+
using _Receiver = stdexec::__t<_ReceiverId>;
74+
75+
template <sender _Item>
76+
auto operator()(_Item&& __item, __operation_base<_Receiver>* __op) const noexcept(
77+
__nothrow_callable<set_next_t, _Receiver&, _Item>)
78+
-> next_sender_of_t<_Receiver, _Item> {
79+
return exec::set_next(
80+
__op->__receiver_, static_cast<_Item&&>(__item));
81+
}
82+
};
83+
84+
struct __combine {
85+
template<class _ReceiverId>
86+
using merge_each_fn_t = __binder_back<__merge_each_fn<_ReceiverId>, __operation_base<__t<_ReceiverId>>*>;
87+
88+
template<class _Sequence, class _ReceiverId>
89+
using transform_sender_t = __call_result_t<exec::transform_each_t, _Sequence, merge_each_fn_t<_ReceiverId>>;
90+
template<class _Sequence, class _ReceiverId>
91+
using ignored_sender_t = __call_result_t<exec::ignore_all_values_t, transform_sender_t<_Sequence, _ReceiverId>>;
92+
93+
template<class _ReceiverId, class... _Sequences>
94+
using result_sender_t = __call_result_t<when_all_t,
95+
ignored_sender_t<_Sequences, _ReceiverId>...>;
96+
};
97+
98+
template <class _ReceiverId, class... _Sequences>
99+
struct __operation {
100+
using _Receiver = stdexec::__t<_ReceiverId>;
101+
102+
using merge_each_fn_t = typename __combine::merge_each_fn_t<_ReceiverId>;
103+
104+
template<class _ReceiverIdDependent>
105+
using result_sender_t = typename __combine::result_sender_t<_ReceiverIdDependent, _Sequences...>;
106+
107+
struct __t : __operation_base<_Receiver> {
108+
using __id = __operation;
109+
110+
connect_result_t<result_sender_t<_ReceiverId>, stdexec::__t<__result_receiver<_ReceiverId>>> __op_result_;
111+
112+
__t(_Receiver __rcvr, _Sequences... __sequences)
113+
: __operation_base<
114+
_Receiver
115+
>{static_cast<_Receiver&&>(__rcvr)}
116+
, __op_result_{stdexec::connect(
117+
stdexec::when_all(
118+
exec::ignore_all_values(
119+
exec::transform_each(static_cast<_Sequences&&>(__sequences), merge_each_fn_t{{this}, {}, {}}))...),
120+
stdexec::__t<__result_receiver<_ReceiverId>>{this})} {
121+
}
122+
123+
void start() & noexcept {
124+
stdexec::start(__op_result_);
125+
}
126+
};
127+
};
128+
129+
template <class _Receiver>
130+
struct __subscribe_fn {
131+
_Receiver& __rcvr_;
132+
133+
template <class... _Sequences>
134+
auto operator()(__ignore, _Sequences... __sequences) noexcept(
135+
(__nothrow_decay_copyable<_Sequences> && ...)
136+
&& __nothrow_move_constructible<_Receiver>)
137+
-> __t<__operation<__id<_Receiver>, _Sequences...>> {
138+
return {
139+
static_cast<_Receiver&&>(__rcvr_),
140+
static_cast<_Sequences&&>(__sequences)...};
141+
}
142+
};
143+
144+
struct merge_t {
145+
template <class... _Sequences>
146+
auto operator()(_Sequences&&... __sequences) const
147+
noexcept((__nothrow_decay_copyable<_Sequences> && ...))
148+
-> __well_formed_sequence_sender auto {
149+
auto __domain = __common_domain_t<_Sequences...>();
150+
return transform_sender(
151+
__domain, make_sequence_expr<merge_t>(
152+
static_cast<_Sequences&&>(__sequences)...));
153+
}
154+
155+
template <class... _Args>
156+
using __all_nothrow_decay_copyable = __mbool<(__nothrow_decay_copyable<_Args> && ...)>;
157+
158+
template <class _Error>
159+
using __set_error_t = completion_signatures<set_error_t(__decay_t<_Error>)>;
160+
161+
struct _INVALID_ARGUMENTS_TO_MERGE_ { };
162+
163+
template <class _Self, class _Env>
164+
using __error_t = __mexception<
165+
_INVALID_ARGUMENTS_TO_MERGE_,
166+
__children_of<_Self, __q<_WITH_SEQUENCES_>>,
167+
_WITH_ENVIRONMENT_<_Env>
168+
>;
169+
170+
template <class... _Env>
171+
struct __completions_t {
172+
173+
template <class... _Sequences>
174+
using __f = __meval<
175+
__concat_completion_signatures,
176+
completion_signatures<set_stopped_t()>,
177+
__sequence_completion_signatures_of_t<_Sequences, _Env...>...
178+
>;
179+
};
180+
181+
template <class _Self, class... _Env>
182+
using __completions = __children_of<_Self, __completions_t<_Env...>>;
183+
184+
template <sender_expr_for<merge_t> _Self, class... _Env>
185+
static auto get_completion_signatures(_Self&&, _Env&&...) noexcept {
186+
return __minvoke<__mtry_catch<__q<__completions>, __q<__error_t>>, _Self, _Env...>();
187+
}
188+
189+
template <class... _Env>
190+
struct __items_t {
191+
192+
template <class... _Sequences>
193+
using __f = stdexec::__mapply<
194+
stdexec::__munique<stdexec::__q<exec::item_types>>,
195+
stdexec::__minvoke<
196+
stdexec::__mconcat<stdexec::__qq<exec::item_types>>,
197+
__item_types_of_t<_Sequences, _Env...>...>>;
198+
};
199+
200+
template <class _Self, class... _Env>
201+
using __items = __children_of<_Self, __items_t<_Env...>>;
202+
203+
template <sender_expr_for<merge_t> _Self, class... _Env>
204+
static auto get_item_types(_Self&&, _Env&&...) noexcept {
205+
return __minvoke<__mtry_catch<__q<__items>, __q<__error_t>>, _Self, _Env...>();
206+
}
207+
208+
template <sender_expr_for<merge_t> _Self, receiver _Receiver>
209+
static auto subscribe(_Self&& __self, _Receiver __rcvr)
210+
noexcept(__nothrow_callable<__sexpr_apply_t, _Self, __subscribe_fn<_Receiver>>)
211+
-> __sexpr_apply_result_t<_Self, __subscribe_fn<_Receiver>> {
212+
return __sexpr_apply(static_cast<_Self&&>(__self), __subscribe_fn<_Receiver>{__rcvr});
213+
}
214+
};
215+
} // namespace __merge
216+
217+
using __merge::merge_t;
218+
inline constexpr merge_t merge{};
219+
} // namespace exec

test/exec/CMakeLists.txt

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ set(exec_test_sources
5050
sequence/test_ignore_all_values.cpp
5151
sequence/test_iterate.cpp
5252
sequence/test_transform_each.cpp
53+
sequence/test_merge.cpp
5354
$<$<BOOL:${STDEXEC_ENABLE_TBB}>:../execpools/test_tbb_thread_pool.cpp>
5455
$<$<BOOL:${STDEXEC_ENABLE_TASKFLOW}>:../execpools/test_taskflow_thread_pool.cpp>
5556
$<$<BOOL:${STDEXEC_ENABLE_ASIO}>:../execpools/test_asio_thread_pool.cpp>

0 commit comments

Comments
 (0)