21 #ifndef __TBB__flow_graph_join_impl_H 22 #define __TBB__flow_graph_join_impl_H 24 #ifndef __TBB_flow_graph_H 25 #error Do not #include this internal file directly; use public TBB headers instead. 43 template<
typename KeyType>
47 virtual task * increment_key_count(current_key_type
const & ,
bool ) = 0;
54 template<
typename TupleType,
typename PortType >
56 tbb::flow::get<N-1>( my_input ).set_join_node_pointer(port);
59 template<
typename TupleType >
61 tbb::flow::get<N-1>( my_input ).consume();
65 template<
typename TupleType >
67 tbb::flow::get<N-1>( my_input ).
release();
70 template <
typename TupleType>
73 release_my_reservation(my_input);
76 template<
typename InputTuple,
typename OutputTuple >
77 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
78 if ( !tbb::flow::get<N-1>( my_input ).reserve( tbb::flow::get<N-1>( out ) ) )
return false;
80 release_my_reservation( my_input );
86 template<
typename InputTuple,
typename OutputTuple>
87 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
88 bool res = tbb::flow::get<N-1>(my_input).get_item(tbb::flow::get<N-1>(out) );
92 template<
typename InputTuple,
typename OutputTuple>
93 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
94 return get_my_item(my_input, out);
97 template<
typename InputTuple>
100 tbb::flow::get<N-1>(my_input).reset_port();
103 template<
typename InputTuple>
105 reset_my_port(my_input);
108 template<
typename InputTuple,
typename KeyFuncTuple>
110 tbb::flow::get<N-1>(my_input).set_my_key_func(tbb::flow::get<N-1>(my_key_funcs));
111 tbb::flow::get<N-1>(my_key_funcs) = NULL;
115 template<
typename KeyFuncTuple>
117 if(tbb::flow::get<N-1>(other_inputs).get_my_key_func()) {
118 tbb::flow::get<N-1>(my_inputs).set_my_key_func(tbb::flow::get<N-1>(other_inputs).get_my_key_func()->clone());
123 template<
typename InputTuple>
126 tbb::flow::get<N-1>(my_input).reset_receiver(f);
129 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 130 template<
typename InputTuple>
131 static inline void extract_inputs(InputTuple &my_input) {
133 tbb::flow::get<N-1>(my_input).extract_receiver();
141 template<
typename TupleType,
typename PortType >
143 tbb::flow::get<0>( my_input ).set_join_node_pointer(port);
146 template<
typename TupleType >
148 tbb::flow::get<0>( my_input ).consume();
151 template<
typename TupleType >
153 tbb::flow::get<0>( my_input ).
release();
156 template<
typename TupleType>
158 release_my_reservation(my_input);
161 template<
typename InputTuple,
typename OutputTuple >
162 static inline bool reserve( InputTuple &my_input, OutputTuple &out) {
163 return tbb::flow::get<0>( my_input ).reserve( tbb::flow::get<0>( out ) );
166 template<
typename InputTuple,
typename OutputTuple>
167 static inline bool get_my_item( InputTuple &my_input, OutputTuple &out) {
168 return tbb::flow::get<0>(my_input).get_item(tbb::flow::get<0>(out));
171 template<
typename InputTuple,
typename OutputTuple>
172 static inline bool get_items(InputTuple &my_input, OutputTuple &out) {
173 return get_my_item(my_input, out);
176 template<
typename InputTuple>
178 tbb::flow::get<0>(my_input).reset_port();
181 template<
typename InputTuple>
183 reset_my_port(my_input);
186 template<
typename InputTuple,
typename KeyFuncTuple>
188 tbb::flow::get<0>(my_input).set_my_key_func(tbb::flow::get<0>(my_key_funcs));
189 tbb::flow::get<0>(my_key_funcs) = NULL;
192 template<
typename KeyFuncTuple>
194 if(tbb::flow::get<0>(other_inputs).get_my_key_func()) {
195 tbb::flow::get<0>(my_inputs).set_my_key_func(tbb::flow::get<0>(other_inputs).get_my_key_func()->clone());
198 template<
typename InputTuple>
200 tbb::flow::get<0>(my_input).reset_receiver(f);
203 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 204 template<
typename InputTuple>
205 static inline void extract_inputs(InputTuple &my_input) {
206 tbb::flow::get<0>(my_input).extract_receiver();
212 template<
typename T >
217 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 218 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
219 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
224 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 225 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
237 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 239 predecessor_list_type *plist;
243 type(char(t)), my_arg(const_cast<T*>(&e)) {}
245 my_pred(const_cast<predecessor_type *>(&s)) {}
249 typedef internal::aggregating_functor<class_type, reserving_port_operation>
handler_type;
255 bool no_predecessors;
258 op_list = op_list->next;
259 switch(current->
type) {
261 no_predecessors = my_predecessors.empty();
262 my_predecessors.add(*(current->
my_pred));
263 if ( no_predecessors ) {
264 (
void) my_join->decrement_port_count(
true);
269 my_predecessors.remove(*(current->
my_pred));
270 if(my_predecessors.empty()) my_join->increment_port_count();
277 else if ( my_predecessors.try_reserve( *(current->
my_arg) ) ) {
281 if ( my_predecessors.empty() ) {
282 my_join->increment_port_count();
289 my_predecessors.try_release( );
294 my_predecessors.try_consume( );
297 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 299 my_predecessors.internal_add_built_predecessor(*(current->
my_pred));
303 my_predecessors.internal_delete_built_predecessor(*(current->
my_pred));
307 current->cnt_val = my_predecessors.predecessor_count();
311 my_predecessors.copy_predecessors(*(current->plist));
320 template<
typename R,
typename B >
friend class run_and_put_task;
328 return my_join->graph_ref;
336 my_predecessors.set_owner(
this );
337 my_aggregator.initialize_handler(handler_type(
this));
344 my_predecessors.set_owner(
this );
345 my_aggregator.initialize_handler(handler_type(
this));
355 my_aggregator.execute(&op_data);
362 my_aggregator.execute(&op_data);
369 my_aggregator.execute(&op_data);
376 my_aggregator.execute(&op_data);
382 my_aggregator.execute(&op_data);
385 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 386 built_predecessors_type &built_predecessors()
__TBB_override {
return my_predecessors.built_predecessors(); }
387 void internal_add_built_predecessor(predecessor_type &src)
__TBB_override {
389 my_aggregator.execute(&op_data);
392 void internal_delete_built_predecessor(predecessor_type &src)
__TBB_override {
394 my_aggregator.execute(&op_data);
399 my_aggregator.execute(&op_data);
400 return op_data.cnt_val;
406 my_aggregator.execute(&op_data);
409 void extract_receiver() {
410 my_predecessors.built_predecessors().receiver_extract(*
this);
418 my_predecessors.reset();
420 __TBB_ASSERT(!(f&rf_clear_edges) || my_predecessors.empty(),
"port edges not removed");
436 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 437 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
438 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
443 enum op_type { get__item, res_port, try__put_task
444 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 445 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
455 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 456 predecessor_type *pred;
458 predecessor_list_type *plist;
463 type(char(t)), my_val(e)
468 type(char(t)), my_arg(const_cast<T*>(p))
477 typedef internal::aggregating_functor<class_type, queueing_port_operation>
handler_type;
486 op_list = op_list->next;
487 switch(current->
type) {
488 case try__put_task: {
490 was_empty = this->buffer_empty();
491 this->push_back(current->
my_val);
492 if (was_empty) rtask = my_join->decrement_port_count(
false);
500 if(!this->buffer_empty()) {
501 *(current->
my_arg) = this->front();
509 __TBB_ASSERT(this->my_item_valid(this->my_head),
"No item to reset");
510 this->destroy_front();
511 if(this->my_item_valid(this->my_head)) {
512 (
void)my_join->decrement_port_count(
true);
516 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 518 my_built_predecessors.add_edge(*(current->pred));
522 my_built_predecessors.delete_edge(*(current->pred));
526 current->cnt_val = my_built_predecessors.edge_count();
530 my_built_predecessors.copy_edges(*(current->plist));
540 template<
typename R,
typename B >
friend class run_and_put_task;
545 my_aggregator.execute(&op_data);
552 return my_join->graph_ref;
560 my_aggregator.initialize_handler(handler_type(
this));
566 my_aggregator.initialize_handler(handler_type(
this));
576 my_aggregator.execute(&op_data);
584 my_aggregator.execute(&op_data);
588 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 589 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
591 void internal_add_built_predecessor(predecessor_type &
p)
__TBB_override {
594 my_aggregator.execute(&op_data);
597 void internal_delete_built_predecessor(predecessor_type &p)
__TBB_override {
600 my_aggregator.execute(&op_data);
605 my_aggregator.execute(&op_data);
606 return op_data.cnt_val;
612 my_aggregator.execute(&op_data);
615 void extract_receiver() {
617 my_built_predecessors.receiver_extract(*
this);
624 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 626 my_built_predecessors.clear();
632 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 633 edge_container<predecessor_type> my_built_predecessors;
647 template<
typename K >
655 template<
class TraitsType >
657 public receiver<typename TraitsType::T>,
658 public hash_buffer< typename TraitsType::K, typename TraitsType::T, typename TraitsType::TtoK,
659 typename TraitsType::KHash > {
670 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 671 typedef typename receiver<input_type>::built_predecessors_type built_predecessors_type;
672 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
678 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 679 , add_blt_pred, del_blt_pred, blt_pred_cnt, blt_pred_cpy
689 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 690 predecessor_type *pred;
692 predecessor_list_type *plist;
696 type(char(t)), my_val(e) {}
699 type(char(t)), my_arg(const_cast<input_type*>(p)) {}
704 typedef internal::aggregating_functor<class_type, key_matching_port_operation>
handler_type;
712 op_list = op_list->next;
713 switch(current->
type) {
715 bool was_inserted = this->insert_with_key(current->
my_val);
722 if(!this->find_with_key(my_join->current_key, *(current->
my_arg))) {
723 __TBB_ASSERT(
false,
"Failed to find item corresponding to current_key.");
729 this->delete_with_key(my_join->current_key);
732 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 734 my_built_predecessors.add_edge(*(current->pred));
738 my_built_predecessors.delete_edge(*(current->pred));
742 current->cnt_val = my_built_predecessors.edge_count();
746 my_built_predecessors.copy_edges(*(current->plist));
755 template<
typename R,
typename B >
friend class run_and_put_task;
761 my_aggregator.execute(&op_data);
763 rtask = my_join->increment_key_count((*(this->get_key_func()))(v),
false);
771 return my_join->graph_ref;
778 my_aggregator.initialize_handler(handler_type(
this));
784 my_aggregator.initialize_handler(handler_type(
this));
800 my_aggregator.execute(&op_data);
804 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 805 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
807 void internal_add_built_predecessor(predecessor_type &
p)
__TBB_override {
810 my_aggregator.execute(&op_data);
813 void internal_delete_built_predecessor(predecessor_type &p)
__TBB_override {
816 my_aggregator.execute(&op_data);
821 my_aggregator.execute(&op_data);
822 return op_data.cnt_val;
828 my_aggregator.execute(&op_data);
836 my_aggregator.execute(&op_data);
840 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 841 void extract_receiver() {
842 buffer_type::reset();
843 my_built_predecessors.receiver_extract(*
this);
848 buffer_type::reset();
849 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 851 my_built_predecessors.clear();
859 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 860 edge_container<predecessor_type> my_built_predecessors;
864 using namespace graph_policy_namespace;
866 template<
typename JP,
typename InputTuple,
typename OutputTuple>
870 template<
typename JP,
typename InputTuple,
typename OutputTuple>
873 template<
typename InputTuple,
typename OutputTuple>
882 ports_with_no_inputs = N;
887 ports_with_no_inputs = N;
891 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
894 ++ports_with_no_inputs;
899 if(ports_with_no_inputs.fetch_and_decrement() == 1) {
901 task *rtask =
new ( task::allocate_additional_child_of( *(this->
graph_ref.root_task()) ) )
903 if(!handle_task)
return rtask;
916 ports_with_no_inputs = N;
920 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 923 ports_with_no_inputs = N;
931 return !ports_with_no_inputs;
935 if(ports_with_no_inputs)
return false;
951 template<
typename InputTuple,
typename OutputTuple>
960 ports_with_no_items = N;
965 ports_with_no_items = N;
970 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
973 ports_with_no_items = N;
979 if(ports_with_no_items.fetch_and_decrement() == 1) {
981 task *rtask =
new ( task::allocate_additional_child_of( *(this->
graph_ref.root_task()) ) )
983 if(!handle_task)
return rtask;
1001 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1010 return !ports_with_no_items;
1014 if(ports_with_no_items)
return false;
1032 template<
typename InputTuple,
typename OutputTuple,
typename K,
typename KHash>
1036 typename tbb::internal::strip<K>::type&,
1037 count_element<typename tbb::internal::strip<K>::type>,
1038 internal::type_to_key_function_body<
1039 count_element<typename tbb::internal::strip<K>::type>,
1040 typename tbb::internal::strip<K>::type& >,
1069 enum op_type { res_count, inc_count, may_succeed, try_make };
1073 class key_matching_FE_operation :
public aggregated_operation<key_matching_FE_operation> {
1082 my_output(NULL), bypass_t(NULL), enqueue_task(q_task) {}
1084 enqueue_task(true) {}
1089 typedef internal::aggregating_functor<class_type, key_matching_FE_operation>
handler_type;
1090 friend class internal::aggregating_functor<class_type, key_matching_FE_operation>;
1100 this->current_key = t;
1101 this->delete_with_key(this->current_key);
1103 this->push_back(l_out);
1105 rtask =
new ( task::allocate_additional_child_of( *(this->
graph_ref.root_task()) ) )
1117 __TBB_ASSERT(
false,
"should have had something to push");
1123 key_matching_FE_operation *current;
1126 op_list = op_list->next;
1127 switch(current->type) {
1130 this->destroy_front();
1135 count_element_type *
p = 0;
1136 unref_key_type &t = current->my_val;
1137 bool do_enqueue = current->enqueue_task;
1138 if(!(this->find_ref_with_key(t,p))) {
1139 count_element_type ev;
1142 this->insert_with_key(ev);
1143 if(!(this->find_ref_with_key(t,p))) {
1144 __TBB_ASSERT(
false,
"should find key after inserting it");
1148 task *rtask = fill_output_buffer(t,
true, do_enqueue);
1149 __TBB_ASSERT(!rtask || !do_enqueue,
"task should not be returned");
1150 current->bypass_t = rtask;
1159 if(this->buffer_empty()) {
1163 *(current->my_output) = this->front();
1173 template<
typename FunctionTuple>
1174 join_node_FE(graph &g, FunctionTuple &TtoK_funcs) : forwarding_base_type(g), my_node(NULL) {
1177 my_aggregator.initialize_handler(handler_type(
this));
1178 TtoK_function_body_type *cfb =
new TtoK_function_body_leaf_type(key_to_count_func());
1179 this->set_key_func(cfb);
1183 output_buffer_type() {
1187 my_aggregator.initialize_handler(handler_type(
this));
1188 TtoK_function_body_type *cfb =
new TtoK_function_body_leaf_type(key_to_count_func());
1189 this->set_key_func(cfb);
1193 void set_my_node(base_node_type *new_my_node) { my_node = new_my_node; }
1196 key_matching_FE_operation op_data(res_count);
1197 my_aggregator.execute(&op_data);
1204 key_matching_FE_operation op_data(t, handle_task, inc_count);
1205 my_aggregator.execute(&op_data);
1206 return op_data.bypass_t;
1221 key_to_count_buffer_type::reset();
1222 output_buffer_type::reset();
1225 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1229 key_to_count_buffer_type::reset();
1230 output_buffer_type::reset();
1237 key_matching_FE_operation op_data(may_succeed);
1238 my_aggregator.execute(&op_data);
1245 key_matching_FE_operation op_data(&out,try_make);
1246 my_aggregator.execute(&op_data);
1263 template<
typename JP,
typename InputTuple,
typename OutputTuple>
1265 public sender<OutputTuple> {
1267 using graph_node::my_graph;
1273 using input_ports_type::tuple_build_may_succeed;
1274 using input_ports_type::try_to_make_tuple;
1275 using input_ports_type::tuple_accepted;
1276 using input_ports_type::tuple_rejected;
1277 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1278 typedef typename sender<output_type>::built_successors_type built_successors_type;
1279 typedef typename sender<output_type>::successor_list_type successor_list_type;
1285 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1286 , add_blt_succ, del_blt_succ, blt_succ_cnt, blt_succ_cpy
1298 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1300 successor_list_type *slist;
1305 my_arg(const_cast<output_type*>(&e)), bypass_t(NULL) {}
1307 my_succ(const_cast<successor_type *>(&s)), bypass_t(NULL) {}
1311 typedef internal::aggregating_functor<class_type, join_node_base_operation>
handler_type;
1320 op_list = op_list->next;
1321 switch(current->
type) {
1323 my_successors.register_successor(*(current->
my_succ));
1325 task *rtask =
new ( task::allocate_additional_child_of(*(my_graph.root_task())) )
1329 forwarder_busy =
true;
1335 my_successors.remove_successor(*(current->
my_succ));
1339 if(tuple_build_may_succeed()) {
1340 if(try_to_make_tuple(*(current->
my_arg))) {
1348 case do_fwrd_bypass: {
1349 bool build_succeeded;
1350 task *last_task = NULL;
1352 if(tuple_build_may_succeed()) {
1354 build_succeeded = try_to_make_tuple(out);
1355 if(build_succeeded) {
1356 task *new_task = my_successors.try_put_task(out);
1363 build_succeeded =
false;
1366 }
while(build_succeeded);
1370 forwarder_busy =
false;
1373 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1375 my_successors.internal_add_built_successor(*(current->
my_succ));
1379 my_successors.internal_delete_built_successor(*(current->
my_succ));
1383 current->cnt_val = my_successors.successor_count();
1387 my_successors.copy_successors(*(current->slist));
1396 join_node_base(graph &g) : graph_node(g), input_ports_type(g), forwarder_busy(false) {
1397 my_successors.set_owner(
this);
1398 input_ports_type::set_my_node(
this);
1399 my_aggregator.initialize_handler(handler_type(
this));
1403 graph_node(other.graph_node::my_graph), input_ports_type(other),
1404 sender<OutputTuple>(), forwarder_busy(false), my_successors() {
1405 my_successors.set_owner(
this);
1406 input_ports_type::set_my_node(
this);
1407 my_aggregator.initialize_handler(handler_type(
this));
1410 template<
typename FunctionTuple>
1411 join_node_base(graph &g, FunctionTuple f) : graph_node(g), input_ports_type(g, f), forwarder_busy(false) {
1412 my_successors.set_owner(
this);
1413 input_ports_type::set_my_node(
this);
1414 my_aggregator.initialize_handler(handler_type(
this));
1419 my_aggregator.execute(&op_data);
1425 my_aggregator.execute(&op_data);
1431 my_aggregator.execute(&op_data);
1435 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1436 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
1438 void internal_add_built_successor( successor_type &r)
__TBB_override {
1440 my_aggregator.execute(&op_data);
1443 void internal_delete_built_successor( successor_type &r)
__TBB_override {
1445 my_aggregator.execute(&op_data);
1450 my_aggregator.execute(&op_data);
1451 return op_data.cnt_val;
1457 my_aggregator.execute(&op_data);
1461 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1463 input_ports_type::extract();
1464 my_successors.built_successors().sender_extract(*
this);
1471 input_ports_type::reset(f);
1481 my_aggregator.execute(&op_data);
1488 template<
int N,
template<
class>
class PT,
typename OutputTuple,
typename JP>
1493 template<
int N,
typename OutputTuple,
typename K,
typename KHash>
1508 template<
int N,
template<
class>
class PT,
typename OutputTuple,
typename JP>
1520 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1521 template <
typename K,
typename T>
1522 struct key_from_message_body {
1523 K operator()(
const T& t)
const {
1525 return key_from_message<K>(t);
1529 template <
typename K,
typename T>
1530 struct key_from_message_body<K&,T> {
1531 const K& operator()(
const T& t)
const {
1533 return key_from_message<const K&>(t);
1540 template<
typename OutputTuple,
typename K,
typename KHash>
1542 join_base<2,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1554 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1556 func_initializer_type(
1562 template<
typename Body0,
typename Body1>
1564 func_initializer_type(
1573 template<
typename OutputTuple,
typename K,
typename KHash>
1575 join_base<3,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1589 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1591 func_initializer_type(
1598 template<
typename Body0,
typename Body1,
typename Body2>
1600 func_initializer_type(
1610 template<
typename OutputTuple,
typename K,
typename KHash>
1612 join_base<4,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1628 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1630 func_initializer_type(
1638 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3>
1640 func_initializer_type(
1651 template<
typename OutputTuple,
typename K,
typename KHash>
1653 join_base<5,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1671 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1673 func_initializer_type(
1682 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4>
1683 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4) : base_type(g,
1684 func_initializer_type(
1696 #if __TBB_VARIADIC_MAX >= 6 1697 template<
typename OutputTuple,
typename K,
typename KHash>
1698 class unfolded_join_node<6,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1699 join_base<6,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1719 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1721 func_initializer_type(
1731 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
typename Body5>
1732 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4, Body5 body5)
1733 : base_type(g, func_initializer_type(
1747 #if __TBB_VARIADIC_MAX >= 7 1748 template<
typename OutputTuple,
typename K,
typename KHash>
1749 class unfolded_join_node<7,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1750 join_base<7,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1770 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p >
func_initializer_type;
1772 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1774 func_initializer_type(
1785 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1786 typename Body5,
typename Body6>
1787 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1788 Body5 body5, Body6 body6) : base_type(g, func_initializer_type(
1799 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1803 #if __TBB_VARIADIC_MAX >= 8 1804 template<
typename OutputTuple,
typename K,
typename KHash>
1805 class unfolded_join_node<8,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1806 join_base<8,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1828 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p >
func_initializer_type;
1830 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1832 func_initializer_type(
1844 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1845 typename Body5,
typename Body6,
typename Body7>
1846 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1847 Body5 body5, Body6 body6, Body7 body7) : base_type(g, func_initializer_type(
1859 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1863 #if __TBB_VARIADIC_MAX >= 9 1864 template<
typename OutputTuple,
typename K,
typename KHash>
1865 class unfolded_join_node<9,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1866 join_base<9,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1890 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p >
func_initializer_type;
1892 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1894 func_initializer_type(
1907 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1908 typename Body5,
typename Body6,
typename Body7,
typename Body8>
1909 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1910 Body5 body5, Body6 body6, Body7 body7, Body8 body8) : base_type(g, func_initializer_type(
1923 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1927 #if __TBB_VARIADIC_MAX >= 10 1928 template<
typename OutputTuple,
typename K,
typename KHash>
1929 class unfolded_join_node<10,key_matching_port,OutputTuple,key_matching<K,KHash> > :
public 1930 join_base<10,key_matching_port,OutputTuple,key_matching<K,KHash> >
::type {
1956 typedef typename tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p, f5_p, f6_p, f7_p, f8_p, f9_p >
func_initializer_type;
1958 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 1960 func_initializer_type(
1974 template<
typename Body0,
typename Body1,
typename Body2,
typename Body3,
typename Body4,
1975 typename Body5,
typename Body6,
typename Body7,
typename Body8,
typename Body9>
1976 unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4,
1977 Body5 body5, Body6 body6, Body7 body7, Body8 body8, Body9 body9) : base_type(g, func_initializer_type(
1991 unfolded_join_node(
const unfolded_join_node &other) : base_type(other) {}
1996 template<
size_t N,
typename JNT>
1998 return tbb::flow::get<N>(jn.input_ports());
2002 #endif // __TBB__flow_graph_join_impl_H type_to_key_func_type * get_my_key_func()
bool try_to_make_tuple(output_type &out)
TraitsType::KHash hash_compare_type
static void release_my_reservation(TupleType &my_input)
join_node_base(const join_node_base &other)
queueing_port()
Constructor.
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
static bool get_items(InputTuple &my_input, OutputTuple &out)
internal::aggregating_functor< class_type, key_matching_port_operation > handler_type
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
matching_forwarding_base< key_type > * my_join
static tbb::task *const SUCCESSFULLY_ENQUEUED
receiver< input_type >::predecessor_type predecessor_type
join_node_base_operation(const successor_type &s, op_type t)
static bool reserve(InputTuple &my_input, OutputTuple &out)
key_matching_port_operation(const input_type &e, op_type t)
key_matching_port_operation(const input_type *p, op_type t)
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3, Body4 body4)
tbb::internal::strip< key_type >::type unref_key_type
tbb::flow::tuple_element< 0, OutputTuple >::type T0
field of type K being used for matching.
void consume()
Complete use of the port.
bool tuple_build_may_succeed()
bool tuple_build_may_succeed()
A cache of successors that are put in a round-robin fashion.
task * decrement_port_count(bool handle_task) __TBB_override
aggregator< handler_type, key_matching_FE_operation > my_aggregator
join_node_base< queueing, InputTuple, OutputTuple > base_node_type
aggregator< handler_type, key_matching_port_operation > my_aggregator
internal::aggregating_functor< class_type, key_matching_FE_operation > handler_type
key_to_count_functor< unref_key_type > key_to_count_func
unfolded_join_node(const unfolded_join_node &other)
#define __TBB_STATIC_ASSERT(condition, msg)
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
tbb::flow::tuple_element< 1, OutputTuple >::type T1
queueing_port_operation(op_type t)
key_matching_FE_operation(output_type *p, op_type t)
void reset(reset_flags f)
Base class for types that should not be assigned.
item_buffer< output_type > output_buffer_type
void suppress_unused_warning(const T1 &)
Utility template function to prevent "unused" warnings by various compilers.
void reset_receiver(reset_flags f) __TBB_override
internal::type_to_key_function_body< count_element_type, unref_key_type & > TtoK_function_body_type
internal::type_to_key_function_body< T0, K > * f0_p
tbb::flow::tuple_element< 4, OutputTuple >::type T4
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
static void set_join_node_pointer(TupleType &my_input, PortType *port)
internal::join_node_base< key_traits_type, typename wrap_key_tuple_elements< N, key_matching_port, key_traits_type, OutputTuple >::type, OutputTuple > type
task * increment_key_count(unref_key_type const &t, bool handle_task) __TBB_override
internal::aggregating_functor< class_type, reserving_port_operation > handler_type
static void reset_ports(InputTuple &my_input)
tbb::internal::strip< KeyType >::type current_key_type
virtual ~forwarding_base()
static void reset_my_port(InputTuple &my_input)
tbb::internal::strip< key_type >::type noref_key_type
join_node_base< reserving, InputTuple, OutputTuple > base_node_type
reserving_port_operation(const predecessor_type &s, op_type t)
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2, Body3 body3)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
K key_from_message(const T &t)
task * decrement_port_count(bool) __TBB_override
graph & graph_reference() __TBB_override
receiver< input_type >::predecessor_type predecessor_type
void set_join_node_pointer(forwarding_base *join)
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
internal::aggregating_functor< class_type, queueing_port_operation > handler_type
key_matching_FE_operation(const unref_key_type &e, bool q_task, op_type t)
static void reset_inputs(InputTuple &my_input, reset_flags f)
reserving_port< T > class_type
tbb::flow::tuple_element< 1, OutputTuple >::type T1
graph & graph_reference() __TBB_override
unfolded_join_node(const unfolded_join_node &other)
join_node_base< JP, InputTuple, OutputTuple > class_type
bool try_to_make_tuple(output_type &out)
unfolded_join_node(graph &g, Body0 body0, Body1 body1)
tbb::flow::tuple< f0_p, f1_p > func_initializer_type
void set_join_node_pointer(forwarding_base *join)
record parent for tallying available items
internal::aggregating_functor< class_type, join_node_base_operation > handler_type
void const char const char int ITT_FORMAT __itt_group_sync p
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
void increment_port_count() __TBB_override
internal::type_to_key_function_body< T4, K > * f4_p
internal::type_to_key_function_body< T3, K > * f3_p
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
void handle_operations(join_node_base_operation *op_list)
bool register_predecessor(predecessor_type &src) __TBB_override
Add a predecessor.
matching_forwarding_base(graph &g)
unfolded_join_node(graph &g, Body0 body0, Body1 body1, Body2 body2)
tbb::flow::tuple_element< 0, OutputTuple >::type T0
input_type & input_ports()
internal::type_to_key_function_body< T1, K > * f1_p
reserving_port()
Constructor.
void set_my_node(base_node_type *new_my_node)
void set_join_node_pointer(forwarding_base *join)
join_node_FE< JP, InputTuple, OutputTuple > input_ports_type
current_key_type current_key
queueing_port(const queueing_port &)
copy constructor
bool get_item(input_type &v)
static void reset_inputs(InputTuple &my_input, reset_flags f)
void increment_port_count() __TBB_override
hash_buffer< key_type, input_type, type_to_key_func_type, hash_compare_type > buffer_type
static void copy_key_functors(KeyFuncTuple &my_inputs, KeyFuncTuple &other_inputs)
void set_my_node(base_node_type *new_my_node)
void handle_operations(key_matching_FE_operation *op_list)
join_node_FE(const join_node_FE &other)
tbb::flow::tuple_element< 2, OutputTuple >::type T2
tbb::flow::tuple_element< 1, OutputTuple >::type T1
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void reset_receiver(reset_flags f) __TBB_override
task * decrement_port_count(bool handle_task) __TBB_override
bool reserve(T &v)
Reserve an item from the port.
internal::type_to_key_function_body< T0, K > * f0_p
TraitsType::TtoK type_to_key_func_type
internal::type_to_key_function_body< T2, K > * f2_p
broadcast_cache< output_type, null_rw_mutex > my_successors
reserving_port(const reserving_port &)
join_node_base< JP, input_ports_type, output_type > base_type
tbb::flow::tuple_element< 0, OutputTuple >::type T0
internal::type_to_key_function_body< T1, K > * f1_p
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
void handle_operations(queueing_port_operation *op_list)
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
static void release_reservations(TupleType &my_input)
key_matching_port_operation(op_type t)
queueing_port_operation(const T &e, op_type t)
forwarding_base * my_join
void reset(reset_flags f)
internal::type_to_key_function_body< T0, K > * f0_p
unfolded_join_node(const unfolded_join_node &other)
internal::type_to_key_function_body_leaf< count_element_type, unref_key_type &, key_to_count_func > TtoK_function_body_leaf_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
internal::join_node_base< JP, typename wrap_tuple_elements< N, PT, OutputTuple >::type, OutputTuple > type
key_matching_port(const key_matching_port &)
void release()
Release the port.
tbb::flow::tuple_element< 2, OutputTuple >::type T2
void increment_port_count() __TBB_override
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
A cache of successors that are broadcast to.
key_matching_FE_operation(op_type t)
join_node_base_operation(op_type t)
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type
task * fill_output_buffer(unref_key_type &t, bool should_enqueue, bool handle_task)
wrap_key_tuple_elements< 2, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
graph & graph_reference() __TBB_override
unfolded_join_node(const unfolded_join_node &other)
virtual task * decrement_port_count(bool handle_task)=0
queueing_port< T > class_type
join_node_base_operation(const output_type &e, op_type t)
void set_my_key_func(type_to_key_func_type *f)
void set_my_node(base_node_type *new_my_node)
aggregator< handler_type, queueing_port_operation > my_aggregator
static void set_join_node_pointer(TupleType &my_input, PortType *port)
internal::type_to_key_function_body< T3, K > * f3_p
bool try_get(output_type &v) __TBB_override
Request an item from the sender.
void handle_operations(reserving_port_operation *op_list)
void const char const char int ITT_FORMAT __itt_group_sync s
internal::type_to_key_function_body< T1, K > * f1_p
internal::type_to_key_function_body< T0, K > * f0_p
join_node_FE(const join_node_FE &other)
atomic< size_t > ports_with_no_items
const K & operator()(const table_item_type &v)
join_node_FE< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > class_type
tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p, f4_p > func_initializer_type
tbb::flow::tuple_element< 3, OutputTuple >::type T3
static void release_my_reservation(TupleType &my_input)
task * try_put_task(const T &v) __TBB_override
atomic< size_t > ports_with_no_inputs
void handle_operations(key_matching_port_operation *op_list)
predecessor_type * my_pred
join_node_base(graph &g, FunctionTuple f)
static bool get_my_item(InputTuple &my_input, OutputTuple &out)
count_element< unref_key_type > count_element_type
input_type & input_ports()
tbb::flow::tuple_element< 3, OutputTuple >::type T3
unfolded_join_node(graph &g)
wrap_key_tuple_elements< 4, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
aggregator< handler_type, join_node_base_operation > my_aggregator
key_matching_port< traits > class_type
static bool reserve(InputTuple &my_input, OutputTuple &out)
static void release_reservations(TupleType &my_input)
tbb::flow::tuple< f0_p, f1_p, f2_p > func_initializer_type
tbb::flow::tuple_element< 0, OutputTuple >::type T0
static void consume_reservations(TupleType &my_input)
aggregator< handler_type, reserving_port_operation > my_aggregator
matching_forwarding_base< key_type > forwarding_base_type
queueing_port_operation(const T *p, op_type t)
join_node_FE(graph &g, FunctionTuple &TtoK_funcs)
static void reset_my_port(InputTuple &my_input)
void reset_receiver(reset_flags f) __TBB_override
reserving_port_operation(op_type t)
hash_buffer< unref_key_type &, count_element_type, TtoK_function_body_type, key_hash_compare > key_to_count_buffer_type
internal::type_to_key_function_body< T2, K > * f2_p
join_node_FE(const join_node_FE &other)
wrap_key_tuple_elements< 3, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
reservable_predecessor_cache< T, null_mutex > my_predecessors
bool is_graph_active(graph &g)
bool tuple_build_may_succeed()
forwarding_base(graph &g)
reserving_port_operation(const T &e, op_type t)
tbb::flow::tuple_element< 2, OutputTuple >::type T2
receiver< input_type >::predecessor_type predecessor_type
sender< output_type >::successor_type successor_type
internal::type_to_key_function_body< T2, K > * f2_p
static void consume_reservations(TupleType &my_input)
count_element< K > table_item_type
bool try_to_make_tuple(output_type &out)
static bool get_items(InputTuple &my_input, OutputTuple &out)
virtual void increment_port_count()=0
task * try_put_task(const T &) __TBB_override
join_node_FE : implements input port policy
void __TBB_store_with_release(volatile T &location, V value)
key_matching< K, KHash > key_traits_type
unfolded_join_node(const unfolded_join_node &other)
forwarding_base * my_join
A task that calls a node's forward_task function.
join_node_base< key_matching< key_type, key_hash_compare >, InputTuple, OutputTuple > base_node_type
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type ...
internal::type_to_key_function_body< T1, K > * f1_p
input_type & input_ports()
tbb::flow::tuple< f0_p, f1_p, f2_p, f3_p > func_initializer_type
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
static void reset_ports(InputTuple &my_input)
wrap_key_tuple_elements< 5, key_matching_port, key_matching< K, KHash >, OutputTuple >::type input_ports_type
tbb::flow::tuple_element< 1, OutputTuple >::type T1
static void set_key_functors(InputTuple &my_input, KeyFuncTuple &my_key_funcs)
void reset_node(reset_flags f) __TBB_override
void reset(reset_flags f)
bool remove_predecessor(predecessor_type &src) __TBB_override
Remove a predecessor.
join_node_base< key_matching< K, KHash >, input_ports_type, output_type > base_type