21 #ifndef __TBB_flow_graph_H 22 #define __TBB_flow_graph_H 38 #if __TBB_PREVIEW_ASYNC_MSG 43 #if __TBB_PREVIEW_STREAMING_NODE 46 #include <unordered_map> 47 #include <type_traits> 48 #endif // __TBB_PREVIEW_STREAMING_NODE 50 #if TBB_DEPRECATED_FLOW_ENQUEUE 51 #define FLOW_SPAWN(a) tbb::task::enqueue((a)) 53 #define FLOW_SPAWN(a) tbb::task::spawn((a)) 57 #if __TBB_CPP11_TUPLE_PRESENT 62 using std::tuple_size;
63 using std::tuple_element;
68 #include "compat/tuple" 90 namespace interface10 {
113 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 117 class edge_container {
120 typedef std::list<C *, tbb::tbb_allocator<C *> > edge_list_type;
122 void add_edge(C &
s) {
123 built_edges.push_back(&s);
126 void delete_edge(C &s) {
127 for (
typename edge_list_type::iterator i = built_edges.begin(); i != built_edges.end(); ++i) {
135 void copy_edges(edge_list_type &v) {
139 size_t edge_count() {
140 return (
size_t)(built_edges.size());
149 template<
typename S >
void sender_extract(
S &s);
150 template<
typename R >
void receiver_extract(R &r);
153 edge_list_type built_edges;
168 namespace interface10 {
173 if (right == NULL)
return left;
175 if (left == NULL)
return right;
186 #if __TBB_PREVIEW_ASYNC_MSG 194 template<
typename T,
typename =
void >
197 typedef T filtered_type;
199 static const bool is_async_type =
false;
202 return static_cast<const void*
>(&t);
206 return static_cast<void*
>(&t);
209 static const T& from_void_ptr(
const void*
p) {
210 return *
static_cast<const T*
>(
p);
213 static T& from_void_ptr(
void*
p) {
214 return *
static_cast<T*
>(
p);
217 static task* try_put_task_wrapper_impl(
receiver<T>*
const this_recv,
const void *
p,
bool is_async) {
235 template<
typename T >
237 typedef T async_type;
238 typedef typename T::async_msg_data_type filtered_type;
240 static const bool is_async_type =
true;
247 static void* to_void_ptr(T& t) {
252 static const T& from_void_ptr(
const void* p) {
256 static T& from_void_ptr(
void* p) {
261 static task* try_put_task_wrapper_impl(
receiver<T>*
const this_recv,
const void *p,
bool is_async) {
292 virtual bool register_successor( successor_type &r ) = 0;
295 virtual bool remove_successor( successor_type &r ) = 0;
303 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 304 typedef internal::edge_container<successor_type> built_successors_type;
306 typedef built_successors_type::edge_list_type successor_list_type;
307 virtual built_successors_type &built_successors() = 0;
308 virtual void internal_add_built_successor( successor_type & ) = 0;
309 virtual void internal_delete_built_successor( successor_type & ) = 0;
310 virtual void copy_successors( successor_list_type &) = 0;
311 virtual size_t successor_count() = 0;
315 template<
typename X >
321 template<
typename X >
326 virtual bool try_get_wrapper(
void* p,
bool is_async ) = 0;
327 virtual bool try_reserve_wrapper(
void* p,
bool is_async ) = 0;
330 class untyped_receiver {
338 #if __TBB_PREVIEW_OPENCL_NODE 339 template<
typename,
typename >
friend class proxy_dependency_receiver;
350 bool try_put(
const X& t) {
352 if (!res)
return false;
//! Default implementation: the (unnamed) predecessor argument is ignored and false is returned.
365 virtual bool remove_predecessor( predecessor_type & ) {
return false; }
367 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 368 typedef internal::edge_container<predecessor_type> built_predecessors_type;
369 typedef built_predecessors_type::edge_list_type predecessor_list_type;
370 virtual built_predecessors_type &built_predecessors() = 0;
371 virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
372 virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
373 virtual void copy_predecessors( predecessor_list_type & ) = 0;
374 virtual size_t predecessor_count() = 0;
382 virtual task* try_put_task_wrapper(
const void* p,
bool is_async ) = 0;
384 virtual graph& graph_reference() = 0;
//! Default answer for this base class: always returns false.
// NOTE(review): presumably overridden by continue_receiver to return true - confirm.
391 virtual bool is_continue_receiver() {
return false; }
397 template<
typename T >
401 typedef T output_type;
418 __TBB_ASSERT(
false,
"async_msg interface does not support 'pull' protocol in try_get()");
428 __TBB_ASSERT(
false,
"async_msg interface does not support 'pull' protocol in try_reserve()");
434 template<
typename T >
446 return internal::untyped_receiver::try_put(t);
450 return internal::untyped_receiver::try_put(t);
459 virtual task *try_put_task(
const T& t) = 0;
463 #else // __TBB_PREVIEW_ASYNC_MSG 466 template<
typename T >
470 typedef T output_type;
480 virtual bool register_successor( successor_type &r ) = 0;
483 virtual bool remove_successor( successor_type &r ) = 0;
//! Default try_get: the (unnamed) output argument is left untouched and false is returned.
486 virtual bool try_get( T & ) {
return false; }
//! Default try_reserve: the (unnamed) output argument is left untouched and false is returned.
489 virtual bool try_reserve( T & ) {
return false; }
//! Default try_release: does nothing and returns false.
492 virtual bool try_release( ) {
return false; }
//! Default try_consume: does nothing and returns false.
495 virtual bool try_consume( ) {
return false; }
497 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 498 typedef typename internal::edge_container<successor_type> built_successors_type;
500 typedef typename built_successors_type::edge_list_type successor_list_type;
501 virtual built_successors_type &built_successors() = 0;
502 virtual void internal_add_built_successor( successor_type & ) = 0;
503 virtual void internal_delete_built_successor( successor_type & ) = 0;
504 virtual void copy_successors( successor_list_type &) = 0;
505 virtual size_t successor_count() = 0;
510 template<
typename T >
514 typedef T input_type;
523 bool try_put(
const T& t ) {
525 if (!res)
return false;
536 virtual graph& graph_reference() = 0;
//! Default implementation: the (unnamed) predecessor argument is ignored and false is returned.
541 virtual bool register_predecessor( predecessor_type & ) {
return false; }
//! Default implementation: the (unnamed) predecessor argument is ignored and false is returned.
544 virtual bool remove_predecessor( predecessor_type & ) {
return false; }
546 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 547 typedef typename internal::edge_container<predecessor_type> built_predecessors_type;
548 typedef typename built_predecessors_type::edge_list_type predecessor_list_type;
549 virtual built_predecessors_type &built_predecessors() = 0;
550 virtual void internal_add_built_predecessor( predecessor_type & ) = 0;
551 virtual void internal_delete_built_predecessor( predecessor_type & ) = 0;
552 virtual void copy_predecessors( predecessor_list_type & ) = 0;
553 virtual size_t predecessor_count() = 0;
//! Default answer for this base class: always returns false.
// NOTE(review): presumably overridden by continue_receiver to return true - confirm.
562 virtual bool is_continue_receiver() {
return false; }
564 #if __TBB_PREVIEW_OPENCL_NODE 565 template<
typename,
typename >
friend class proxy_dependency_receiver;
569 #endif // __TBB_PREVIEW_ASYNC_MSG 585 my_predecessor_count = my_initial_predecessor_count = number_of_predecessors;
586 my_current_count = 0;
593 my_current_count = 0;
600 ++my_predecessor_count;
610 --my_predecessor_count;
614 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 615 typedef internal::edge_container<predecessor_type> built_predecessors_type;
616 typedef built_predecessors_type::edge_list_type predecessor_list_type;
//! Accessor (marked __TBB_override): returns a reference to the my_built_predecessors member.
617 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
619 void internal_add_built_predecessor( predecessor_type &s)
__TBB_override {
621 my_built_predecessors.add_edge( s );
626 my_built_predecessors.delete_edge(s);
631 my_built_predecessors.copy_edges(v);
636 return my_built_predecessors.edge_count();
649 if ( ++my_current_count < my_predecessor_count )
652 my_current_count = 0;
654 task * res = execute();
658 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 661 built_predecessors_type my_built_predecessors;
665 int my_current_count;
666 int my_initial_predecessor_count;
673 my_current_count = 0;
675 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 676 my_built_predecessors.clear();
678 my_predecessor_count = my_initial_predecessor_count;
685 virtual task * execute() = 0;
693 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 694 template <
typename K,
typename T>
711 namespace interface10 {
716 #if __TBB_PREVIEW_ASYNC_MSG 721 template <
typename C,
typename N>
724 if (begin) current_node = my_graph->my_nodes;
728 template <
typename C,
typename N>
731 return *operator->();
734 template <
typename C,
typename N>
739 template <
typename C,
typename N>
741 if (current_node) current_node = current_node->next;
745 inline graph::graph() : my_nodes(NULL), my_nodes_last(NULL), my_task_arena(NULL) {
847 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 848 inline void graph::set_name(
const char *
name) {
864 template <
typename Output >
876 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 882 template<
typename Body >
884 :
graph_node(g), my_active(is_active), init_my_active(is_active),
887 my_reserved(false), my_has_cached_item(false)
889 my_successors.set_owner(
this);
897 my_active(src.init_my_active),
898 init_my_active(src.init_my_active), my_body( src.my_init_body->clone() ), my_init_body(src.my_init_body->clone() ),
899 my_reserved(false), my_has_cached_item(false)
901 my_successors.set_owner(
this);
909 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 918 my_successors.register_successor(r);
927 my_successors.remove_successor(r);
//! The accessor that follows the `#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION` fragment on this line
//! (marked __TBB_override) forwards to my_successors.built_successors() and returns its result.
931 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 933 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
935 void internal_add_built_successor( successor_type &r)
__TBB_override {
937 my_successors.internal_add_built_successor(r);
940 void internal_delete_built_successor( successor_type &r)
__TBB_override {
942 my_successors.internal_delete_built_successor(r);
947 return my_successors.successor_count();
952 my_successors.copy_successors(v);
962 if ( my_has_cached_item ) {
964 my_has_cached_item =
false;
980 if ( my_has_cached_item ) {
993 __TBB_ASSERT( my_reserved && my_has_cached_item,
"releasing non-existent reservation" );
995 if(!my_successors.empty())
1003 __TBB_ASSERT( my_reserved && my_has_cached_item,
"consuming non-existent reservation" );
1004 my_reserved =
false;
1005 my_has_cached_item =
false;
1006 if ( !my_successors.empty() ) {
1016 if (!my_successors.empty())
1020 template<
typename Body>
1026 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1028 my_successors.built_successors().sender_extract(*
this);
1029 my_active = init_my_active;
1030 my_reserved =
false;
1031 if(my_has_cached_item) my_has_cached_item =
false;
1039 my_active = init_my_active;
1041 if(my_has_cached_item) {
1042 my_has_cached_item =
false;
1068 if ( my_reserved ) {
1071 if ( !my_has_cached_item ) {
1073 bool r = (*my_body)(my_cached_item);
1076 my_has_cached_item =
true;
1079 if ( my_has_cached_item ) {
1093 return (
new ( task::allocate_additional_child_of( *(this->
my_graph.
root_task()) ) )
1108 if ( !try_reserve_apply_body(v) )
1121 template <
typename Input,
typename Output = continue_msg,
typename Policy = queueing,
typename Allocator=cache_aligned_allocator<Input> >
1131 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1132 typedef typename input_impl_type::predecessor_list_type predecessor_list_type;
1133 typedef typename fOutput_type::successor_list_type successor_list_type;
1135 using input_impl_type::my_predecessors;
1141 template<
typename Body >
1153 input_impl_type(src),
1159 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1165 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1167 my_predecessors.built_predecessors().receiver_extract(*
this);
1168 successors().built_successors().sender_extract(*
this);
1176 using input_impl_type::try_put_task;
1181 input_impl_type::reset_function_input(f);
1183 if(f & rf_clear_edges) {
1184 successors().clear();
1185 my_predecessors.clear();
1187 __TBB_ASSERT(!(f & rf_clear_edges) || successors().
empty(),
"function_node successors not empty");
1188 __TBB_ASSERT(this->my_predecessors.empty(),
"function_node predecessors not empty");
1195 template <
typename Input,
typename Output,
typename Policy = queueing,
typename Allocator=cache_aligned_allocator<Input> >
1201 typename internal::wrap_tuple_elements<
1202 tbb::flow::tuple_size<Output>::value,
1203 internal::multifunction_output,
1219 using input_impl_type::my_predecessors;
1221 template<
typename Body>
1223 graph &g,
size_t concurrency,
1226 tbb::internal::fgt_multioutput_node_with_body<N>(
1227 tbb::internal::FLOW_MULTIFUNCTION_NODE,
1229 this->output_ports(), this->my_body
1235 tbb::internal::fgt_multioutput_node_with_body<N>( tbb::internal::FLOW_MULTIFUNCTION_NODE,
1237 this->output_ports(), this->my_body );
1240 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1246 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1248 my_predecessors.built_predecessors().receiver_extract(*
this);
1249 base_type::extract();
1259 template<
typename TupleType,
typename Allocator=cache_aligned_allocator<TupleType> >
1266 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1268 typedef typename base_type::predecessor_list_type predecessor_list_type;
1270 typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
1281 tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->
my_graph,
1286 tbb::internal::fgt_multioutput_node<N>(tbb::internal::FLOW_SPLIT_NODE, &this->
my_graph,
1290 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1305 if (f & rf_clear_edges)
//! The override that follows the `#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION` fragment on this line
//! has an empty body: the (unnamed) predecessor argument is ignored and no state is changed.
1314 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1319 void internal_add_built_predecessor(predecessor_type&)
__TBB_override {}
//! Empty override body: the (unnamed) predecessor argument is ignored and no state is changed.
1322 void internal_delete_built_predecessor(predecessor_type&)
__TBB_override {}
//! Accessor (marked __TBB_override): returns a reference to the my_predessors member
//! (the member name is spelled this way at its declaration).
1328 built_predecessors_type &built_predecessors()
__TBB_override {
return my_predessors; }
1331 built_predecessors_type my_predessors;
1339 template <
typename Output,
typename Policy =
internal::Policy<
void> >
1351 template <
typename Body >
1362 template <
typename Body >
1364 graph &g,
int number_of_predecessors,
1376 internal::function_output<Output>() {
1382 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1388 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1390 input_impl_type::my_built_predecessors.receiver_extract(*
this);
1391 successors().built_successors().sender_extract(*
this);
1399 using input_impl_type::try_put_task;
1403 input_impl_type::reset_receiver(f);
1404 if(f & rf_clear_edges)successors().clear();
1405 __TBB_ASSERT(!(f & rf_clear_edges) || successors().
empty(),
"continue_node not reset");
1410 template <
typename T>
1417 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1423 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1424 internal::edge_container<predecessor_type> my_built_predecessors;
//! The accessor at the end of this line (after the fused `#if` fragments, marked __TBB_override)
//! forwards to my_successors.built_successors() and returns its result.
1444 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1462 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1465 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
1467 void internal_add_built_successor(successor_type &r)
__TBB_override {
1468 my_successors.internal_add_built_successor(r);
1471 void internal_delete_built_successor(successor_type &r)
__TBB_override {
1472 my_successors.internal_delete_built_successor(r);
1476 return my_successors.successor_count();
1480 my_successors.copy_successors(v);
//! Accessor (marked __TBB_override): returns a reference to the my_built_predecessors member.
1485 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
1487 void internal_add_built_predecessor( predecessor_type &p)
__TBB_override {
1489 my_built_predecessors.add_edge(p);
1492 void internal_delete_built_predecessor( predecessor_type &p)
__TBB_override {
1494 my_built_predecessors.delete_edge(p);
1499 return my_built_predecessors.edge_count();
1502 void copy_predecessors(predecessor_list_type &v)
__TBB_override {
1504 my_built_predecessors.copy_edges(v);
1508 my_built_predecessors.receiver_extract(*
this);
1509 my_successors.built_successors().sender_extract(*
this);
1531 if (f&rf_clear_edges) {
1532 my_successors.
clear();
1533 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1534 my_built_predecessors.clear();
1537 __TBB_ASSERT(!(f & rf_clear_edges) || my_successors.
empty(),
"Error resetting broadcast_node");
1542 template <
typename T,
typename A=cache_aligned_allocator<T> >
1550 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1555 typedef size_t size_type;
1558 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1559 internal::edge_container<predecessor_type> my_built_predecessors;
1564 enum op_type {reg_succ, rem_succ, req_item, res_item, rel_res, con_res, put_item, try_fwd_task
1565 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1566 , add_blt_succ, del_blt_succ,
1567 add_blt_pred, del_blt_pred,
1568 blt_succ_cnt, blt_pred_cnt,
1569 blt_succ_cpy, blt_pred_cpy
1574 class buffer_operation :
public internal::aggregated_operation< buffer_operation > {
1577 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1582 predecessor_type *
p;
1584 successor_list_type *svec;
1585 predecessor_list_type *pvec;
1594 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
1595 , ltask(NULL), elem(const_cast<T*>(&e))
1597 , elem(const_cast<T*>(&e)) , ltask(NULL)
1604 typedef internal::aggregating_functor<class_type, buffer_operation>
handler_type;
1609 handle_operations_impl(op_list,
this);
1612 template<
typename derived_type>
1613 void handle_operations_impl(buffer_operation *op_list, derived_type* derived) {
1614 __TBB_ASSERT(static_cast<class_type*>(derived) ==
this,
"'this' is not a base class for derived");
1616 buffer_operation *tmp = NULL;
1617 bool try_forwarding =
false;
1620 op_list = op_list->next;
1621 switch (tmp->type) {
1622 case reg_succ: internal_reg_succ(tmp); try_forwarding =
true;
break;
1623 case rem_succ: internal_rem_succ(tmp);
break;
1624 case req_item: internal_pop(tmp);
break;
1625 case res_item: internal_reserve(tmp);
break;
1626 case rel_res: internal_release(tmp); try_forwarding =
true;
break;
1627 case con_res: internal_consume(tmp); try_forwarding =
true;
break;
1628 case put_item: try_forwarding = internal_push(tmp);
break;
1629 case try_fwd_task: internal_forward_task(tmp);
break;
1630 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1632 case add_blt_succ: internal_add_built_succ(tmp);
break;
1633 case del_blt_succ: internal_del_built_succ(tmp);
break;
1634 case add_blt_pred: internal_add_built_pred(tmp);
break;
1635 case del_blt_pred: internal_del_built_pred(tmp);
break;
1636 case blt_succ_cnt: internal_succ_cnt(tmp);
break;
1637 case blt_pred_cnt: internal_pred_cnt(tmp);
break;
1638 case blt_succ_cpy: internal_copy_succs(tmp);
break;
1639 case blt_pred_cpy: internal_copy_preds(tmp);
break;
1646 if (try_forwarding && !forwarder_busy) {
1648 forwarder_busy =
true;
1663 return op_data.ltask;
1666 inline bool enqueue_forwarding_task(buffer_operation &op_data) {
1667 task *ft = grab_forwarding_task(op_data);
1678 task *last_task = NULL;
1681 op_data.ltask = NULL;
1682 my_aggregator.execute(&op_data);
1693 virtual void internal_reg_succ(buffer_operation *op) {
//! The accessor that follows the `#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION` fragment on this line
//! (marked __TBB_override) forwards to my_successors.built_successors() and returns its result.
1704 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1707 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
1709 virtual void internal_add_built_succ(buffer_operation *op) {
1710 my_successors.internal_add_built_successor(*(op->r));
1714 virtual void internal_del_built_succ(buffer_operation *op) {
1715 my_successors.internal_delete_built_successor(*(op->r));
//! Accessor (marked __TBB_override): returns a reference to the my_built_predecessors member.
// NOTE(review): unlike the sibling count/copy operations visible in this file, this accessor
// does not go through my_aggregator - confirm callers do not rely on serialization here.
1721 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
1723 virtual void internal_add_built_pred(buffer_operation *op) {
1724 my_built_predecessors.add_edge(*(op->p));
1728 virtual void internal_del_built_pred(buffer_operation *op) {
1729 my_built_predecessors.delete_edge(*(op->p));
1733 virtual void internal_succ_cnt(buffer_operation *op) {
1734 op->cnt_val = my_successors.successor_count();
1738 virtual void internal_pred_cnt(buffer_operation *op) {
1739 op->cnt_val = my_built_predecessors.edge_count();
1743 virtual void internal_copy_succs(buffer_operation *op) {
1744 my_successors.copy_successors(*(op->svec));
1748 virtual void internal_copy_preds(buffer_operation *op) {
1749 my_built_predecessors.copy_edges(*(op->pvec));
1759 return this->my_item_valid(this->my_tail - 1);
1768 this->destroy_back();
1775 internal_forward_task_impl(op,
this);
1778 template<
typename derived_type>
1780 __TBB_ASSERT(static_cast<class_type*>(derived) ==
this,
"'this' is not a base class for derived");
1782 if (this->my_reserved || !derived->is_item_valid()) {
1784 this->forwarder_busy =
false;
1788 task * last_task = NULL;
1789 size_type counter = my_successors.
size();
1790 for (; counter > 0 && derived->is_item_valid(); --counter)
1791 derived->try_put_and_add_task(last_task);
1793 op->ltask = last_task;
1794 if (last_task && !counter) {
1799 forwarder_busy =
false;
1804 this->push_back(*(op->elem));
1809 virtual void internal_pop(buffer_operation *op) {
1810 if(this->pop_back(*(op->elem))) {
1819 if(this->reserve_front(*(op->elem))) {
1828 this->consume_front();
1833 this->release_front();
1840 forwarder_busy(false) {
1842 my_aggregator.initialize_handler(handler_type(
this));
1849 internal::reservable_item_buffer<T>(),
receiver<T>(),
sender<T>() {
1850 forwarder_busy =
false;
1852 my_aggregator.initialize_handler(handler_type(
this));
1857 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 1870 buffer_operation op_data(reg_succ);
1872 my_aggregator.execute(&op_data);
1873 (
void)enqueue_forwarding_task(op_data);
1877 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 1879 buffer_operation op_data(add_blt_succ);
1881 my_aggregator.execute(&op_data);
1885 buffer_operation op_data(del_blt_succ);
1887 my_aggregator.execute(&op_data);
1891 buffer_operation op_data(add_blt_pred);
1893 my_aggregator.execute(&op_data);
1896 void internal_delete_built_predecessor( predecessor_type &p)
__TBB_override {
1897 buffer_operation op_data(del_blt_pred);
1899 my_aggregator.execute(&op_data);
1903 buffer_operation op_data(blt_pred_cnt);
1904 my_aggregator.execute(&op_data);
1905 return op_data.cnt_val;
1909 buffer_operation op_data(blt_succ_cnt);
1910 my_aggregator.execute(&op_data);
1911 return op_data.cnt_val;
1914 void copy_predecessors( predecessor_list_type &v )
__TBB_override {
1915 buffer_operation op_data(blt_pred_cpy);
1917 my_aggregator.execute(&op_data);
1921 buffer_operation op_data(blt_succ_cpy);
1923 my_aggregator.execute(&op_data);
1933 buffer_operation op_data(rem_succ);
1935 my_aggregator.execute(&op_data);
1939 (
void)enqueue_forwarding_task(op_data);
1947 buffer_operation op_data(req_item);
1949 my_aggregator.execute(&op_data);
1950 (
void)enqueue_forwarding_task(op_data);
1960 my_aggregator.execute(&op_data);
1961 (
void)enqueue_forwarding_task(op_data);
1969 my_aggregator.execute(&op_data);
1970 (
void)enqueue_forwarding_task(op_data);
1977 buffer_operation op_data(con_res);
1978 my_aggregator.execute(&op_data);
1979 (
void)enqueue_forwarding_task(op_data);
1991 my_aggregator.execute(&op_data);
1992 task *ft = grab_forwarding_task(op_data);
2014 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 2017 my_built_predecessors.receiver_extract(*
this);
2018 my_successors.built_successors().sender_extract(*
this);
2026 if (f&rf_clear_edges) {
2027 my_successors.
clear();
2028 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 2029 my_built_predecessors.clear();
2032 forwarder_busy =
false;
2037 template <
typename T,
typename A=cache_aligned_allocator<T> >
2049 return this->my_item_valid(this->my_head);
2056 graph& graph_ref = this->graph_reference();
2058 this->destroy_front();
2064 this->internal_forward_task_impl(op,
this);
2068 if ( this->my_reserved || !this->my_item_valid(this->my_head)){
2072 this->pop_front(*(op->elem));
2077 if (this->my_reserved || !this->my_item_valid(this->my_head)) {
2081 this->reserve_front(*(op->elem));
2086 this->consume_front();
2110 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2118 base_type::reset_node(f);
2123 template<
typename T,
typename A=cache_aligned_allocator<T> >
2135 template<
typename Sequencer >
2137 my_sequencer(new internal::function_body_leaf< T, size_t, Sequencer>(s) ) {
2145 my_sequencer( src.my_sequencer->clone() ) {
2154 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2166 size_type tag = (*my_sequencer)(*(op->elem));
2167 #if !TBB_DEPRECATED_SEQUENCER_DUPLICATES 2168 if (tag < this->my_head) {
2175 size_t new_tail = (tag+1 > this->my_tail) ? tag+1 : this->my_tail;
2177 if (this->
size(new_tail) > this->capacity()) {
2178 this->grow_my_array(this->
size(new_tail));
2180 this->my_tail = new_tail;
2189 template<
typename T,
typename Compare = std::less<T>,
typename A=cache_aligned_allocator<T> >
2213 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2223 base_type::reset_node(f);
2232 this->internal_forward_task_impl(op,
this);
2236 this->handle_operations_impl(op_list,
this);
2240 prio_push(*(op->elem));
2247 if ( this->my_reserved ==
true || this->my_tail == 0 ) {
2252 *(op->elem) = prio();
2260 if (this->my_reserved ==
true || this->my_tail == 0) {
2264 this->my_reserved =
true;
2265 *(op->elem) = prio();
2266 reserved_item = *(op->elem);
2273 this->my_reserved =
false;
2274 reserved_item = input_type();
2279 prio_push(reserved_item);
2280 this->my_reserved =
false;
2281 reserved_item = input_type();
2288 if (mark < this->my_tail) heapify();
2289 __TBB_ASSERT(mark == this->my_tail,
"mark unequal after heapify");
2293 return this->my_tail > 0;
2300 graph& graph_ref = this->graph_reference();
2314 __TBB_ASSERT(mark <= this->my_tail,
"mark outside bounds before test");
2315 return mark < this->my_tail && compare(this->get_my_item(0), this->get_my_item(this->my_tail - 1));
2320 if ( this->my_tail >= this->my_array_size )
2321 this->grow_my_array( this->my_tail + 1 );
2322 (
void) this->place_item(this->my_tail, src);
2324 __TBB_ASSERT(mark < this->my_tail,
"mark outside bounds after push");
2331 if (prio_use_tail()) {
2334 this->destroy_item(this->my_tail-1);
2336 __TBB_ASSERT(mark <= this->my_tail,
"mark outside bounds after pop");
2339 this->destroy_item(0);
2340 if(this->my_tail > 1) {
2342 __TBB_ASSERT(this->my_item_valid(this->my_tail - 1), NULL);
2343 this->move_item(0,this->my_tail - 1);
2346 if(mark > this->my_tail) --mark;
2347 if (this->my_tail > 1)
2349 __TBB_ASSERT(mark <= this->my_tail,
"mark outside bounds after pop");
2353 return this->get_my_item(prio_use_tail() ? this->my_tail-1 : 0);
2358 if(this->my_tail == 0) {
2362 if (!mark) mark = 1;
2363 for (; mark<this->my_tail; ++mark) {
2364 size_type cur_pos = mark;
2365 input_type to_place;
2366 this->fetch_item(mark,to_place);
2368 size_type
parent = (cur_pos-1)>>1;
2369 if (!compare(this->get_my_item(parent), to_place))
2371 this->move_item(cur_pos, parent);
2374 (
void) this->place_item(cur_pos, to_place);
2380 size_type cur_pos=0, child=1;
2381 while (child < mark) {
2382 size_type target = child;
2384 compare(this->get_my_item(child),
2385 this->get_my_item(child+1)))
2388 if (compare(this->get_my_item(target),
2389 this->get_my_item(cur_pos)))
2392 this->swap_items(cur_pos, target);
2394 child = (cur_pos<<1)+1;
2403 template<
typename T >
2410 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 2433 return ( my_count + my_tries < my_threshold && !my_predecessors.
empty() && !my_successors.
empty() );
2440 bool reserved =
false;
2443 if ( check_conditions() )
2460 if ( check_conditions() ) {
2478 if ( check_conditions() ) {
2498 if(my_count) --my_count;
2500 return forward_task();
2509 graph_node(g), my_threshold(threshold), my_count(0), my_tries(0),
2510 init_decrement_predecessors(num_decrement_predecessors),
2511 decrement(num_decrement_predecessors)
2524 my_threshold(src.my_threshold), my_count(0), my_tries(0),
2525 init_decrement_predecessors(src.init_decrement_predecessors),
2526 decrement(src.init_decrement_predecessors)
2536 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2545 bool was_empty = my_successors.
empty();
2548 if ( was_empty && !my_predecessors.
empty() && my_count + my_tries < my_threshold ) {
//! The accessor that follows the `#if TBB_DEPRECATED_FLOW_NODE_EXTRACTION` fragment on this line
//! (marked __TBB_override) forwards to my_successors.built_successors() and returns its result.
2566 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 2567 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
//! Accessor (marked __TBB_override): forwards to my_predecessors.built_predecessors()
//! and returns its result by reference.
2568 built_predecessors_type &built_predecessors()
__TBB_override {
return my_predecessors.built_predecessors(); }
2570 void internal_add_built_successor(successor_type &src)
__TBB_override {
2571 my_successors.internal_add_built_successor(src);
2574 void internal_delete_built_successor(successor_type &src)
__TBB_override {
2575 my_successors.internal_delete_built_successor(src);
//! Returns whatever my_successors.successor_count() reports (marked __TBB_override).
2578 size_t successor_count()
__TBB_override {
return my_successors.successor_count(); }
2581 my_successors.copy_successors(v);
2584 void internal_add_built_predecessor(predecessor_type &src)
__TBB_override {
2585 my_predecessors.internal_add_built_predecessor(src);
2588 void internal_delete_built_predecessor(predecessor_type &src)
__TBB_override {
2589 my_predecessors.internal_delete_built_predecessor(src);
//! Returns whatever my_predecessors.predecessor_count() reports (marked __TBB_override).
2592 size_t predecessor_count()
__TBB_override {
return my_predecessors.predecessor_count(); }
2594 void copy_predecessors(predecessor_list_type &v)
__TBB_override {
2595 my_predecessors.copy_predecessors(v);
2600 my_successors.built_successors().sender_extract(*
this);
2601 my_predecessors.built_predecessors().receiver_extract(*
this);
2602 decrement.built_predecessors().receiver_extract(decrement);
2609 my_predecessors.
add( src );
2620 my_predecessors.
remove( src );
2633 if ( my_count + my_tries >= my_threshold )
2645 rtask =
new ( task::allocate_additional_child_of( *(this->
my_graph.
root_task()) ) )
2667 if(f & rf_clear_edges) {
2668 my_predecessors.
clear();
2669 my_successors.
clear();
2673 my_predecessors.
reset( );
2687 template<
typename OutputTuple,
typename JP=queueing>
class join_node;
2689 template<
typename OutputTuple>
2698 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->
my_graph,
2702 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_RESERVING, &this->
my_graph,
2706 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2714 template<
typename OutputTuple>
2723 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->
my_graph,
2727 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_QUEUEING, &this->
my_graph,
2731 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2741 template<
typename OutputTuple,
typename K,
typename KHash>
2743 key_matching_port, OutputTuple, key_matching<K,KHash> > {
2751 #if __TBB_PREVIEW_MESSAGE_BASED_KEY_MATCHING 2755 template<
typename __TBB_B0,
typename __TBB_B1>
2757 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->
my_graph,
2760 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2>
2761 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2) : unfolded_type(g, b0, b1, b2) {
2762 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->
my_graph,
2765 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3>
2766 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3) : unfolded_type(g, b0, b1, b2, b3) {
2767 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->
my_graph,
2770 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4>
2771 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4) :
2772 unfolded_type(g, b0, b1, b2, b3, b4) {
2773 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->
my_graph,
2776 #if __TBB_VARIADIC_MAX >= 6 2777 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
2779 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5) :
2780 unfolded_type(g, b0, b1, b2, b3, b4, b5) {
2781 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->
my_graph,
2785 #if __TBB_VARIADIC_MAX >= 7 2786 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
2787 typename __TBB_B5,
typename __TBB_B6>
2788 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6) :
2789 unfolded_type(g, b0, b1, b2, b3, b4, b5, b6) {
2790 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->
my_graph,
2794 #if __TBB_VARIADIC_MAX >= 8 2795 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
2796 typename __TBB_B5,
typename __TBB_B6,
typename __TBB_B7>
2797 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2798 __TBB_B7 b7) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7) {
2799 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->
my_graph,
2803 #if __TBB_VARIADIC_MAX >= 9 2804 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
2805 typename __TBB_B5,
typename __TBB_B6,
typename __TBB_B7,
typename __TBB_B8>
2806 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2807 __TBB_B7 b7, __TBB_B8 b8) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8) {
2808 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->
my_graph,
2812 #if __TBB_VARIADIC_MAX >= 10 2813 template<
typename __TBB_B0,
typename __TBB_B1,
typename __TBB_B2,
typename __TBB_B3,
typename __TBB_B4,
2814 typename __TBB_B5,
typename __TBB_B6,
typename __TBB_B7,
typename __TBB_B8,
typename __TBB_B9>
2815 join_node(
graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6,
2816 __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9) : unfolded_type(g, b0, b1, b2, b3, b4, b5, b6, b7, b8, b9) {
2817 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->
my_graph,
2822 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_JOIN_NODE_TAG_MATCHING, &this->
my_graph,
2826 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2843 template<
typename T0>
2846 static const int N = 1;
2852 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2857 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2861 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2868 template<
typename T0,
typename T1>
2871 static const int N = 2;
2877 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2882 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2886 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2893 template<
typename T0,
typename T1,
typename T2>
2896 static const int N = 3;
2902 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2907 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2911 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2918 template<
typename T0,
typename T1,
typename T2,
typename T3>
2921 static const int N = 4;
2927 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2932 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2936 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2943 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4>
2946 static const int N = 5;
2952 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2957 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2961 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2968 #if __TBB_VARIADIC_MAX >= 6 2969 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5>
2970 class indexer_node<T0, T1, T2, T3, T4, T5> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5> > {
2972 static const int N = 6;
2978 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2983 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
2987 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 2993 #endif //variadic max 6 2995 #if __TBB_VARIADIC_MAX >= 7 2996 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
2998 class indexer_node<T0, T1, T2, T3, T4, T5, T6> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6> > {
3000 static const int N = 7;
3006 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
3011 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
3015 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3021 #endif //variadic max 7 3023 #if __TBB_VARIADIC_MAX >= 8 3024 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
3025 typename T6,
typename T7>
3026 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7> > {
3028 static const int N = 8;
3034 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
3039 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
3043 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3049 #endif //variadic max 8 3051 #if __TBB_VARIADIC_MAX >= 9 3052 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
3053 typename T6,
typename T7,
typename T8>
3054 class indexer_node<T0, T1, T2, T3, T4, T5, T6, T7, T8> :
public internal::unfolded_indexer_node<tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8> > {
3056 static const int N = 9;
3062 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
3067 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
3071 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3077 #endif //variadic max 9 3079 #if __TBB_VARIADIC_MAX >= 10 3080 template<
typename T0,
typename T1,
typename T2,
typename T3,
typename T4,
typename T5,
3081 typename T6,
typename T7,
typename T8,
typename T9>
3084 static const int N = 10;
3086 typedef tuple<T0, T1, T2, T3, T4, T5, T6, T7, T8, T9>
InputTuple;
3087 typedef typename internal::tagged_msg<size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9> output_type;
3090 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
3095 tbb::internal::fgt_multiinput_node<N>( tbb::internal::FLOW_INDEXER_NODE, &this->
my_graph,
3099 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3105 #endif //variadic max 10 3107 #if __TBB_PREVIEW_ASYNC_MSG 3110 template<
typename T >
3113 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3114 s.internal_add_built_predecessor(p);
3115 p.internal_add_built_successor(s);
3122 template<
typename T >
3127 #if __TBB_PREVIEW_ASYNC_MSG 3128 template<
typename TS,
typename TR,
3135 template<
typename T >
3140 template<
typename T >
3145 #endif // __TBB_PREVIEW_ASYNC_MSG 3147 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 3149 template<
typename T,
typename V,
3150 typename =
typename T::output_ports_type,
typename =
typename V::input_ports_type >
3152 make_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3156 template<
typename T,
typename R,
3157 typename =
typename T::output_ports_type >
3159 make_edge(get<0>(output.output_ports()), input);
3163 template<
typename S,
typename V,
3164 typename =
typename V::input_ports_type >
3166 make_edge(output, get<0>(input.input_ports()));
3170 #if __TBB_PREVIEW_ASYNC_MSG 3173 template<
typename T >
3177 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3179 p.internal_delete_built_successor(s);
3180 s.internal_delete_built_predecessor(p);
3186 template<
typename T >
3191 #if __TBB_PREVIEW_ASYNC_MSG 3192 template<
typename TS,
typename TR,
3199 template<
typename T >
3204 template<
typename T >
3208 #endif // __TBB_PREVIEW_ASYNC_MSG 3210 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 3212 template<
typename T,
typename V,
3213 typename =
typename T::output_ports_type,
typename =
typename V::input_ports_type >
3215 remove_edge(get<0>(output.output_ports()), get<0>(input.input_ports()));
3219 template<
typename T,
typename R,
3220 typename =
typename T::output_ports_type >
3222 remove_edge(get<0>(output.output_ports()), input);
3225 template<
typename S,
typename V,
3226 typename =
typename V::input_ports_type >
3232 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3233 template<
typename C >
3234 template<
typename S >
3235 void internal::edge_container<C>::sender_extract(
S &s ) {
3236 edge_list_type e = built_edges;
3237 for (
typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3242 template<
typename C >
3243 template<
typename R >
3244 void internal::edge_container<C>::receiver_extract( R &r ) {
3245 edge_list_type e = built_edges;
3246 for (
typename edge_list_type::iterator i = e.begin(); i != e.end(); ++i ) {
3253 template<
typename Body,
typename Node >
3255 return n.template copy_function_object<Body>();
3258 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 3263 template<
typename... InputTypes,
typename... OutputTypes>
3274 static const size_t NUM_INPUTS =
sizeof...(InputTypes);
3275 static const size_t NUM_OUTPUTS =
sizeof...(OutputTypes);
3281 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3292 template<
typename T1,
typename T2>
3296 my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T1>(input_ports_tuple));
3297 my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T2>(output_ports_tuple));
3303 template<
typename... NodeTypes >
3306 template<
typename... NodeTypes >
3309 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3316 __TBB_ASSERT(my_input_ports,
"input ports not set, call set_external_ports to set input ports");
3317 return *my_input_ports;
3321 __TBB_ASSERT(my_output_ports,
"output ports not set, call set_external_ports to set output ports");
3322 return *my_output_ports;
3325 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3327 __TBB_ASSERT(
false,
"Current composite_node implementation does not support extract");
3333 template<
typename... InputTypes>
3340 static const size_t NUM_INPUTS =
sizeof...(InputTypes);
3346 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3357 template<
typename T>
3361 my_input_ports = tbb::internal::make_unique<input_ports_type>(std::forward<T>(input_ports_tuple));
3366 template<
typename... NodeTypes >
3369 template<
typename... NodeTypes >
3372 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3379 __TBB_ASSERT(my_input_ports,
"input ports not set, call set_external_ports to set input ports");
3380 return *my_input_ports;
3383 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3385 __TBB_ASSERT(
false,
"Current composite_node implementation does not support extract");
3392 template<
typename... OutputTypes>
3399 static const size_t NUM_OUTPUTS =
sizeof...(OutputTypes);
3405 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3416 template<
typename T>
3420 my_output_ports = tbb::internal::make_unique<output_ports_type>(std::forward<T>(output_ports_tuple));
3425 template<
typename... NodeTypes >
3428 template<
typename... NodeTypes >
3431 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3438 __TBB_ASSERT(my_output_ports,
"output ports not set, call set_external_ports to set output ports");
3439 return *my_output_ports;
3442 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3444 __TBB_ASSERT(
false,
"Current composite_node implementation does not support extract");
3450 #endif // __TBB_FLOW_GRAPH_CPP11_FEATURES 3452 namespace internal {
3454 template<
typename Gateway>
3461 my_gateway = gateway;
3468 template<
typename Input,
typename Ports,
typename Gateway,
typename Body>
3475 : base_type(gateway), my_body(body) { }
3478 my_body(v, *this->my_gateway);
3490 template <
typename Input,
typename Output,
3514 try_put_functor(output_port_type &p,
const Output &v) : port(&p), value(&v), result(false) { }
3516 result = port->
try_put(*value);
3525 my_node->my_graph.reserve_wait();
3529 my_node->my_graph.release_wait();
3535 return my_node->try_put_impl(i);
3556 template<
typename Body>
3558 graph &g,
size_t concurrency,
3562 internal::async_body<Input, typename base_type::output_ports_type, gateway_type, Body>
3564 tbb::internal::fgt_multioutput_node_with_body<1>(
3565 tbb::internal::FLOW_ASYNC_NODE,
3567 this->output_ports(), this->my_body
3572 static_cast<async_body_base_type*
>(this->my_body->get_body_ptr())->set_gateway(&my_gateway);
3573 static_cast<async_body_base_type*
>(this->my_init_body->get_body_ptr())->set_gateway(&my_gateway);
3575 tbb::internal::fgt_multioutput_node_with_body<1>( tbb::internal::FLOW_ASYNC_NODE,
3577 this->output_ports(), this->my_body );
3584 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3594 return internal::output_port<0>(*this).register_successor(r);
3599 return internal::output_port<0>(*this).remove_successor(r);
3602 template<
typename Body>
3606 mfn_body_type &body_ref = *this->my_body;
3608 return ab.get_body();
3611 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3612 typedef typename internal::edge_container<successor_type> built_successors_type;
3614 typedef typename built_successors_type::edge_list_type successor_list_type;
3616 return internal::output_port<0>(*this).built_successors();
3619 void internal_add_built_successor( successor_type &r )
__TBB_override {
3620 internal::output_port<0>(*this).internal_add_built_successor(r);
3623 void internal_delete_built_successor( successor_type &r )
__TBB_override {
3624 internal::output_port<0>(*this).internal_delete_built_successor(r);
3628 internal::output_port<0>(*this).copy_successors(l);
3632 return internal::output_port<0>(*this).successor_count();
3639 base_type::reset_node(f);
3643 #if __TBB_PREVIEW_STREAMING_NODE 3645 #endif // __TBB_PREVIEW_STREAMING_NODE 3650 namespace interface10a {
3652 using namespace interface10;
3653 namespace internal = interface10::internal;
3655 template<
typename T >
3662 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3686 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3696 bool ret = s.try_put( my_buffer );
3722 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3723 built_predecessors_type &built_predecessors()
__TBB_override {
return my_built_predecessors; }
3724 built_successors_type &built_successors()
__TBB_override {
return my_successors.built_successors(); }
3726 void internal_add_built_successor( successor_type &s)
__TBB_override {
3728 my_successors.internal_add_built_successor(s);
3731 void internal_delete_built_successor( successor_type &s)
__TBB_override {
3733 my_successors.internal_delete_built_successor(s);
3738 return my_successors.successor_count();
3743 my_successors.copy_successors(v);
3746 void internal_add_built_predecessor( predecessor_type &p)
__TBB_override {
3748 my_built_predecessors.add_edge(p);
3751 void internal_delete_built_predecessor( predecessor_type &p)
__TBB_override {
3753 my_built_predecessors.delete_edge(p);
3758 return my_built_predecessors.edge_count();
3761 void copy_predecessors( predecessor_list_type &v )
__TBB_override {
3763 my_built_predecessors.copy_edges(v);
3767 my_buffer_is_valid =
false;
3768 built_successors().sender_extract(*
this);
3769 built_predecessors().receiver_extract(*
this);
3776 if ( my_buffer_is_valid ) {
3796 return my_buffer_is_valid;
3801 my_buffer_is_valid =
false;
3811 return try_put_task_impl(v);
3816 my_buffer_is_valid =
true;
3830 o(owner), s(succ) {};
3834 o.register_successor(s);
3839 predecessor_type&
o;
3845 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 3846 internal::edge_container<predecessor_type> my_built_predecessors;
3853 my_buffer_is_valid =
false;
3854 if (f&rf_clear_edges) {
3855 my_successors.
clear();
3860 template<
typename T >
3883 #if TBB_PREVIEW_FLOW_GRAPH_TRACE 3895 return this->my_buffer_is_valid ? NULL : this->try_put_task_impl(v);
3927 using namespace interface10::internal::graph_policy_namespace;
3934 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 3938 #if __TBB_PREVIEW_ASYNC_MSG 3941 #if __TBB_PREVIEW_STREAMING_NODE 3944 #endif // __TBB_PREVIEW_STREAMING_NODE 3945 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES 3954 #undef __TBB_PFG_RESET_ARG 3957 #endif // __TBB_flow_graph_H join_node(const join_node &other)
graph & graph_reference() __TBB_override
The leaf for source_body.
void reserve_wait() __TBB_override
Used to register that an external entity may still interact with the graph.
task * try_put_task(const T &t) __TBB_override
build a task to run the successor if possible. Default is old behavior.
void reset_node(reset_flags) __TBB_override
unfolded_join_node : passes input_ports_type to join_node_base. We build the input port type ...
sender< output_type >::successor_type successor_type
join_node(const join_node &other)
An empty class used for messages that mean "I'm done".
buffer_node< T, A >::size_type size_type
task * decrement_counter()
virtual ~untyped_receiver()
Destructor.
function_body that takes an Input and a set of output ports
Forwards messages in sequence order.
A cache of predecessors that supports requests and reservations.
buffer_node< T, A > base_type
static void alias_port(void *, PortsTuple &)
static void fgt_async_commit(void *, void *)
std::unique_ptr< output_ports_type > my_output_ports
static tbb::task *const SUCCESSFULLY_ENQUEUED
virtual bool register_predecessor(predecessor_type &)
Add a predecessor to the node.
internal::unfolded_indexer_node< InputTuple > unfolded_type
task * try_put_task(const T &v) __TBB_override
Put item to successor; return task to run the successor if possible.
~sequencer_node()
Destructor.
receiver_gateway_impl(async_node *node)
void internal_pop(queue_operation *op) __TBB_override
internal::unfolded_indexer_node< InputTuple > unfolded_type
indexer_node(const indexer_node &other)
tuple< T0, T1 > InputTuple
static void fgt_async_reserve(void *, void *)
internal::tagged_msg< size_t, T0, T1, T2, T3, T4 > output_type
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8)
A cache of successors that are broadcast to.
input_ports_type & input_ports()
limiter_node(const limiter_node &src)
Copy constructor.
tuple< T0, T1, T2, T3, T4, T5, T6, T7 > InputTuple
virtual bool try_release()
Releases the reserved item.
write_once_node(const write_once_node &src)
Copy constructor: call base class copy constructor.
void add_task_to_graph_reset_list(graph &g, tbb::task *tp)
internal::unfolded_indexer_node< InputTuple > unfolded_type
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7 > output_type
internal::broadcast_cache< output_type > my_successors
fOutput_type::successor_type successor_type
tbb::flow::tuple< sender< OutputTypes > &... > output_ports_type
bool remove_successor(successor_type &s) __TBB_override
Removes a successor from this node.
bool try_get(output_type &v) __TBB_override
Request an item from the node.
void increment_ref_count()
Atomically increment reference count.
static void * to_void_ptr(T &t)
static void fgt_make_edge(void *, void *)
void release_wait() __TBB_override
Inform a graph that a previous call to reserve_wait is no longer in effect.
~graph()
Destroys the graph.
receiver_gateway< output_type > gateway_type
virtual void internal_reserve(buffer_operation *op)
continue_node(const continue_node &src)
Copy constructor.
A task that calls a node's apply_body_bypass function with no input.
bool try_reserve(output_type &v)
Implements a function node that supports Input -> Output.
internal::broadcast_cache< input_type > my_successors
#define __TBB_STATIC_ASSERT(condition, msg)
overwrite_node(const overwrite_node &src)
Copy constructor; doesn't take anything from src; default won't work.
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6)
void fgt_multiinput_multioutput_node_desc(const NodeType *, const char *)
internal::function_input< input_type, output_type, Policy, Allocator > input_impl_type
internal::unfolded_join_node< N, key_matching_port, OutputTuple, key_matching< K, KHash > > unfolded_type
indexer_node(const indexer_node &other)
bool internal_push(prio_operation *op) __TBB_override
priority_queue_node class_type
fOutput_type::successor_type successor_type
sender< output_type >::successor_type successor_type
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Implements methods for both executable and function nodes that puts Output to its successors...
try_put_functor(output_port_type &p, const Output &v)
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8 > output_type
static const void * to_void_ptr(const T &t)
virtual void internal_consume(buffer_operation *op)
buffer_node(graph &g)
Constructor.
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5 > output_type
unfolded_type::input_ports_type input_ports_type
internal::source_body< output_type > * my_body
internal::source_body< output_type > * my_init_body
void reset_node(reset_flags f) __TBB_override
bool remove_successor(successor_type &r) __TBB_override
Removes r as a successor.
task * try_put_task(const T &t) __TBB_override
Puts an item to this receiver.
void reset_node(reset_flags f) __TBB_override
concurrency
An enumeration that provides the two most common concurrency levels: unlimited and serial...
Detects whether two given types are the same.
static const void * to_void_ptr(const T &t)
interface10::internal::Policy< queueing, lightweight > queueing_lightweight
void remove_successor(successor_type &r)
const_iterator cend() const
end const iterator
Base class for types that should not be assigned.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
receiver< input_type >::predecessor_type predecessor_type
receiver< input_type >::predecessor_type predecessor_type
The predecessor type for this node.
static task * emit_this(graph &g, const T &t, P &p)
tbb::task_group_context * my_context
bool internal_push(sequencer_operation *op) __TBB_override
overwrite_node< T > base_type
tbb::spin_mutex nodelist_mutex
void set_external_ports(T1 &&input_ports_tuple, T2 &&output_ports_tuple)
int init_decrement_predecessors
bool register_successor(successor_type &r) __TBB_override
Adds a successor.
write_once_node(graph &g)
Constructor.
buffer_node< T, A >::buffer_operation sequencer_operation
bool try_consume() __TBB_override
Consumes the reserved item.
tbb::flow::tuple< sender< OutputTypes > &... > output_ports_type
indexer_node(const indexer_node &other)
Forwards messages in arbitrary order.
task that does nothing. Useful for synchronization.
virtual bool try_get(T &)
Request an item from the sender.
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
indexer_node(const indexer_node &other)
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
Breaks an infinite loop between the node reservation and register_successor call. ...
receiver< TupleType > base_type
function_node(const function_node &src)
Copy constructor.
static void fgt_begin_body(void *)
graph_node * my_nodes_last
bool try_get(input_type &v) __TBB_override
Request an item from the sender.
split_node: accepts a tuple as input, forwards each element of the tuple to its
register_predecessor_task(predecessor_type &owner, successor_type &succ)
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
static void fgt_async_try_put_end(void *, void *)
internal::aggregator< handler_type, buffer_operation > my_aggregator
void make_edge(sender< T > &p, receiver< T > &s)
Makes an edge between a single predecessor and a single successor.
static void fgt_composite(void *, void *)
Base class for receivers of completion messages.
virtual void reset_node(reset_flags f=rf_reset_protocol)=0
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5)
An executable node that acts as a source, i.e. it has no predecessors.
int decrement_ref_count()
Atomically decrement reference count and returns its new value.
unfolded_type::input_ports_type input_ports_type
internal::unfolded_indexer_node< InputTuple > unfolded_type
void reset_node(reset_flags f) __TBB_override
internal::tagged_msg< size_t, T0, T1 > output_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
sender< output_type >::successor_type successor_type
internal::tagged_msg< size_t, T0 > output_type
tuple< T0, T1, T2, T3, T4, T5 > InputTuple
void set_owner(owner_type *owner)
multifunction_node(const multifunction_node &other)
K key_from_message(const T &t)
graph & graph_reference() __TBB_override
The base of all graph nodes.
Body copy_function_object()
tbb::flow::tuple_element< N, typename JNT::input_ports_type >::type & input_port(JNT &jn)
templated function to refer to input ports of the join node
graph & graph_reference() __TBB_override
std::unique_ptr< output_ports_type > my_output_ports
buffer_node(const buffer_node &src)
Copy constructor.
internal::broadcast_cache< T > my_successors
Represents acquisition of a mutex.
Pure virtual template class that defines a receiver of messages of type T.
buffer_operation(const T &e, op_type t)
async_node(const async_node &other)
wrap_tuple_elements< N, PT, OutputTuple >::type input_ports_type
item_buffer with reservable front-end. NOTE: if reserving, do not
void internal_reserve(queue_operation *op) __TBB_override
void internal_consume(queue_operation *op) __TBB_override
virtual bool try_consume()
Consumes the reserved item.
task * try_put_task_impl(const input_type &v)
output_ports_type my_output_ports
async_body(const Body &body, gateway_type *gateway)
priority_queue_node(const priority_queue_node &src)
Copy constructor.
void internal_remove_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
join_node(const join_node &other)
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3)
void remove_node(graph_node *n)
static void fgt_async_try_put_begin(void *, void *)
async_body_base< Gateway > base_type
internal::continue_input< Output, Policy > input_impl_type
Forwards messages in FIFO order.
receiver< input_type >::predecessor_type predecessor_type
task * grab_forwarding_task(buffer_operation &op_data)
static void alias_port(void *, PortsTuple &)
static void fgt_graph_desc(void *, const char *)
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8 > InputTuple
graph_iterator< graph, graph_node > iterator
Forwards messages of type T to all successors.
void internal_pop(prio_operation *op) __TBB_override
graph & graph_reference() __TBB_override
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
task * try_put_task(const input_type &v) __TBB_override
Put item to successor; return task to run the successor if possible.
void set_ref_count(int count)
Set reference count.
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
void add_nodes_impl(CompositeType *, bool)
graph_iterator< const graph, const graph_node > const_iterator
static const node_priority_t no_priority
unfolded_type::input_ports_type input_ports_type
void __TBB_EXPORTED_METHOD reset()
Forcefully reinitializes the context after the task tree it was associated with is completed...
unsigned int node_priority_t
void register_node(graph_node *n)
internal::async_body_base< gateway_type > async_body_base_type
bool try_get(X &t)
Request an item from the sender.
void const char const char int ITT_FORMAT __itt_group_sync p
buffer_node< T, A > base_type
static void fgt_release_wait(void *)
input_ports_type & input_ports()
void add_visible_nodes(const NodeTypes &... n)
virtual bool register_successor(successor_type &r)=0
Add a new successor to this node.
internal::unfolded_indexer_node< InputTuple > unfolded_type
sender< output_type >::successor_type successor_type
The type of successors of this node.
bool register_successor(successor_type &s) __TBB_override
Add a new successor to this node.
iterator end()
end iterator
async_body_base(gateway_type *gateway)
Output output_type
The type of the output message, which is complete.
receiver< input_type > receiver_type
static void clear_this(P &p)
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
indexer_node(const indexer_node &other)
pointer operator->() const
Dereference.
bool try_put_impl(const Output &i)
Implements gateway_type::try_put for an external activity to submit a message to FG.
void internal_forward_task_impl(buffer_operation *op, derived_type *derived)
base_type::buffer_operation queue_operation
priority_queue_node(graph &g)
Constructor.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark S
virtual bool remove_predecessor(predecessor_type &)
Remove a predecessor from the node.
internal::function_input_queue< input_type, Allocator > input_queue_type
void release_wait() __TBB_override
Deregisters an external entity that may have interacted with the graph.
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > output_type
internal::port_ref_impl< N1, N2 > port_ref()
const_iterator cbegin() const
start const iterator
Forward declaration section.
output_ports_type & output_ports()
sender< output_type >::successor_type successor_type
~source_node()
The destructor.
tbb::task * root_task()
Returns the root task of the graph.
field of type K being used for matching.
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4)
receiver< input_type >::predecessor_type predecessor_type
sender< output_type >::successor_type successor_type
Forwards messages in priority order.
internal::unfolded_indexer_node< InputTuple > unfolded_type
void add_nodes(const NodeTypes &... n)
sender< output_type >::successor_type successor_type
bool try_consume() __TBB_override
Consumes a reserved item.
virtual ~untyped_sender()
virtual task * forward_task()
This is executed by an enqueued task, the "forwarder".
virtual void handle_operations(buffer_operation *op_list)
void activate_graph(graph &g)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
source_node(const source_node &src)
Copy constructor.
Body copy_body(Node &n)
Returns a copy of the body from a function or continue node.
void reset(reset_flags f=rf_reset_protocol)
bool try_release() __TBB_override
Release a reserved item.
input_impl_type::predecessor_type predecessor_type
internal::function_input_queue< input_type, Allocator > input_queue_type
internal::broadcast_cache< input_type, null_rw_mutex > my_successors
queue_node(const queue_node &src)
Copy constructor.
internal::async_helpers< T >::filtered_type filtered_type
void remove_edge(sender< T > &p, receiver< T > &s)
Removes an edge between a single predecessor and a single successor.
void internal_make_edge(internal::untyped_sender &p, internal::untyped_receiver &s)
task * try_put_task(const T &t) __TBB_override
receive an item, return a task *if possible
buffer_node< T, A >::item_type item_type
indexer_node(const indexer_node &other)
void reset_receiver(reset_flags) __TBB_override
put receiver back in initial state
void reset_node(reset_flags f) __TBB_override
GraphNodeType & reference
void deactivate_graph(graph &g)
static void fgt_graph(void *)
bool try_put(const Output &i) __TBB_override
Implements gateway_type::try_put for an external activity to submit a message to FG.
virtual graph & graph_reference()=0
T input_type
The input type of this receiver.
bool register_predecessor(predecessor_type &) __TBB_override
Increments the trigger threshold.
internal::tagged_msg< size_t, T0, T1, T2, T3 > output_type
void reset_node(reset_flags f) __TBB_override
void internal_reserve(prio_operation *op) __TBB_override
input_impl_type::predecessor_type predecessor_type
void register_successor(successor_type &r)
output_ports_type & output_ports()
static void fgt_multiinput_multioutput_node(string_index, void *, void *)
internal::multifunction_input< Input, typename base_type::output_ports_type, Policy, Allocator > mfn_input_type
static void fgt_remove_edge(void *, void *)
std::unique_ptr< input_ports_type > my_input_ports
virtual task * try_put_task_wrapper(const void *p, bool is_async) __TBB_override
internal::tagged_msg< size_t, T0, T1, T2 > output_type
internal::broadcast_cache< output_type > & successors() __TBB_override
task * try_put_task(const X &t)
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
bool try_get(T &v) __TBB_override
Request an item from the buffer_node.
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > base_type
bool try_release() __TBB_override
Releases the reserved item.
buffer_operation(op_type t)
graph & graph_reference() __TBB_override
reference operator*() const
Dereference.
continue_node(graph &g,)
Constructor for executable node with continue_msg -> Output.
multifunction_node< Input, tuple< Output >, Policy, Allocator > base_type
void try_put_and_add_task(task *&last_task)
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7)
virtual void internal_rem_succ(buffer_operation *op)
Remove successor.
bool try_put(const typename internal::async_helpers< T >::filtered_type &t)
Put an item to the receiver.
sender< output_type >::successor_type successor_type
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2, __TBB_B3 b3, __TBB_B4 b4, __TBB_B5 b5, __TBB_B6 b6, __TBB_B7 b7, __TBB_B8 b8, __TBB_B9 b9)
void set_owner(successor_type *owner)
tbb::flow::tuple< receiver< InputTypes > &... > input_ports_type
internal::multifunction_input< input_type, output_ports_type, Policy, Allocator > input_impl_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
A cache of predecessors that only supports try_get.
internal::reservable_predecessor_cache< T, spin_mutex > my_predecessors
internal::unfolded_indexer_node< InputTuple > unfolded_type
void set_gateway(gateway_type *gateway)
buffer_node< T, A > class_type
task * try_put_task(const X &t)
receiver< input_type >::predecessor_type predecessor_type
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
sequencer_node(graph &g, const Sequencer &s)
Constructor.
An abstract cache of successors.
broadcast_node(const broadcast_node &src)
indexer_node(const indexer_node &other)
indexer_node(const indexer_node &other)
bool try_reserve(output_type &v) __TBB_override
Reserves an item.
gateway_type * my_gateway
internal::wrap_tuple_elements< N, internal::multifunction_output, Output >::type output_ports_type
internal::aggregating_functor< class_type, buffer_operation > handler_type
internal::unfolded_indexer_node< InputTuple > unfolded_type
internal::multifunction_output< Output > output_port_type
static void fgt_node_with_body(string_index, void *, void *, void *)
void add(untyped_sender &n)
Used to form groups of tasks.
internal::unfolded_join_node< N, queueing_port, OutputTuple, queueing > unfolded_type
virtual bool try_reserve(T &)
Reserves an item in the sender.
output_type my_cached_item
sequencer_node(const sequencer_node &src)
Copy constructor.
bool try_reserve_apply_body(output_type &v)
continue_node(graph &g, int number_of_predecessors,)
Constructor for executable node with continue_msg -> Output.
internal::unfolded_join_node< N, reserving_port, OutputTuple, reserving > unfolded_type
void reset_node(reset_flags f) __TBB_override
void reset_node(reset_flags) __TBB_override
bool try_put(const output_type &i)
void add_visible_nodes(const NodeTypes &... n)
tuple< T0, T1, T2, T3, T4 > InputTuple
void set_external_ports(T &&output_ports_tuple)
void try_put_and_add_task(task *&last_task)
#define __TBB_FLOW_GRAPH_PRIORITY_ARG0(priority)
task * try_put_task(const T &t) __TBB_override
void reset_node(reset_flags f) __TBB_override
static void fgt_end_body(void *)
Implements methods for an executable node that takes continue_msg as input.
internal::decrementer< limiter_node< T > > decrement
The internal receiver< continue_msg > that decrements the count.
sender< output_type >::successor_type successor_type
void reset_receiver(reset_flags) __TBB_override
Puts the receiver back in its initial state.
bool try_reserve(T &v) __TBB_override
Reserves an item.
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
void const char const char int ITT_FORMAT __itt_group_sync s
virtual bool remove_successor(successor_type &r)=0
Removes a successor from this node.
Leaf for multifunction. OutputSet can be a std::tuple or a vector.
receiver_type::predecessor_type predecessor_type
internal::function_output< output_type > fOutput_type
void prepare_task_arena(bool reinit=false)
Implements methods for a function node that takes a type Input as input.
void internal_forward_task(prio_operation *op) __TBB_override
Tries to forward valid items to successors.
void wait_for_all()
Waits until the graph is idle and the number of decrement_wait_count calls equals the number of increment_wait_count calls...
queue_node(graph &g)
Constructor.
tbb::task_arena * my_task_arena
void prio_push(const T &src)
tbb::flow::tuple< receiver< InputTypes > &... > input_ports_type
void internal_release(prio_operation *op) __TBB_override
task * try_put_task(const TupleType &t) __TBB_override
Put item to successor; return task to run the successor if possible.
bool remove_predecessor(predecessor_type &) __TBB_override
Decrements the trigger threshold.
int my_initial_predecessor_count
task_list_type my_reset_task_list
void execute_in_graph_arena(graph &g, F &f)
Executes custom functor inside graph arena.
async_storage_ptr my_storage
void add_visible_nodes(const NodeTypes &... n)
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1, __TBB_B2 b2)
task * try_put_task(const X &t)
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
void add_nodes(const NodeTypes &... n)
indexer_node(const indexer_node &other)
continue_receiver(const continue_receiver &src)
Copy constructor.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t size
task * apply_body_bypass()
Applies the body. Returning SUCCESSFULLY_ENQUEUED is okay; forward_task_bypass will handle it...
buffer_node< T, A >::buffer_operation prio_operation
virtual bool try_reserve_wrapper(void *p, bool is_async) __TBB_override
Base class for user-defined tasks.
limiter_node(graph &g, size_t threshold, int num_decrement_predecessors=0)
Constructor.
virtual bool try_get_wrapper(void *p, bool is_async) __TBB_override
receiver< input_type >::predecessor_type predecessor_type
void reset_node(reset_flags f) __TBB_override
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
static void fgt_reserve_wait(void *)
Implements a function node that supports Input -> (set of outputs).
iterator begin()
Start iterator.
internal::unfolded_indexer_node< InputTuple > unfolded_type
void reset_receiver(reset_flags f) __TBB_override
Puts the receiver back in its initial state.
A task that calls a node's forward_task function.
tuple< T0, T1, T2, T3 > InputTuple
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
internal::round_robin_cache< T, null_rw_mutex > my_successors
internal::wrap_tuple_elements< N, internal::multifunction_output, TupleType >::type output_ports_type
void reset_node(reset_flags f) __TBB_override
function_node(graph &g, size_t concurrency,)
Constructor.
static void fgt_multioutput_node_desc(const NodeType *, const char *)
receiver< input_type >::predecessor_type predecessor_type
void reserve_wait() __TBB_override
Inform a graph that messages may come from outside, to prevent premature graph completion.
void reset_node(reset_flags f) __TBB_override
resets the source_node to its initial state
virtual void internal_release(buffer_operation *op)
multifunction_node(graph &g, size_t concurrency,)
static void fgt_node(string_index, void *, void *)
sender< output_type >::successor_type successor_type
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
void internal_forward_task(queue_operation *op) __TBB_override
Tries to forward valid items to successors.
output_ports_type & output_ports()
untyped_receiver successor_type
The successor type for this node.
internal::broadcast_cache< output_type > & successors() __TBB_override
A lock that occupies a single byte.
virtual void internal_forward_task(buffer_operation *op)
Tries to forward valid items to successors.
void handle_operations(prio_operation *op_list) __TBB_override
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
tbb::internal::uint64_t tag_value
bool try_release() __TBB_override
Release a reserved item.
void reset_receiver(reset_flags) __TBB_override
Puts the receiver back in its initial state.
bool is_graph_active(graph &g)
void spawn_put()
Spawns a task that applies the body.
void add_nodes(const NodeTypes &... n)
void reset_receiver(reset_flags) __TBB_override
Puts the receiver back in its initial state.
bool register_successor(successor_type &r) __TBB_override
Replace the current successor with this new successor.
bool try_reserve(X &t)
Reserves an item in the sender.
internal::tagged_msg< size_t, T0, T1, T2, T3, T4, T5, T6 > output_type
const V & cast_to(T const &t)
void internal_consume(prio_operation *op) __TBB_override
void remove(untyped_sender &n)
A cache of successors that are put in a round-robin fashion.
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
graph()
Constructs a graph with isolated task_group_context.
Implements methods for a function node that takes a type Input as input and sends a type Output to its successors.
void reset_node(reset_flags f) __TBB_override
internal::async_helpers< T >::filtered_type filtered_type
base_type::size_type size_type
Implements an executable node that supports continue_msg -> Output.
tuple< T0, T1, T2 > InputTuple
bool register_successor(successor_type &r) __TBB_override
Add a new successor to this node.
void __TBB_store_with_release(volatile T &location, V value)
async_node(graph &g, size_t concurrency,)
std::unique_ptr< input_ports_type > my_input_ports
Body copy_function_object()
Base class for tasks generated by graph nodes.
void try_put_and_add_task(task *&last_task)
bool try_reserve(T &v) __TBB_override
Reserves an item.
internal::function_output< output_type > fOutput_type
continue_msg input_type
The input type.
virtual bool internal_push(buffer_operation *op)
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
join_node(graph &g, __TBB_B0 b0, __TBB_B1 b1)
receiver< input_type >::predecessor_type predecessor_type
buffer_node< T, A >::size_type size_type
virtual void finalize() const
void set_external_ports(T &&input_ports_tuple)
Enables one or the other code branch.
task * try_put_task(const input_type &) __TBB_override
Put item to successor; return task to run the successor if possible.
tuple< T0, T1, T2, T3, T4, T5, T6, T7, T8, T9 > InputTuple
internal::unfolded_indexer_node< InputTuple > unfolded_type
void reset_node(reset_flags) __TBB_override
indexer_node(const indexer_node &other)
void operator()(const Input &v, Ports &)
virtual source_body * clone()=0
virtual task * try_put_task(const T &t)=0
Put item to successor; return task to run the successor if possible.
base_type::output_ports_type output_ports_type
tuple< T0, T1, T2, T3, T4, T5, T6 > InputTuple
static void fgt_node_desc(const NodeType *, const char *)
Forwards messages only if the threshold has not been reached.
internal::function_body< T, size_t > * my_sequencer
receiver< input_type >::predecessor_type predecessor_type