21 #ifndef __TBB__flow_graph_node_impl_H 22 #define __TBB__flow_graph_node_impl_H 24 #ifndef __TBB_flow_graph_H 25 #error Do not #include this internal file directly; use public TBB headers instead. 37 template<
typename T,
typename A >
64 template<
typename Input,
typename Policy,
typename A,
typename ImplType >
67 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 68 , add_blt_pred, del_blt_pred,
69 blt_pred_cnt, blt_pred_cpy
83 "queueing and rejecting policies can't be specified simultaneously");
85 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 86 typedef typename predecessor_cache_type::built_predecessors_type built_predecessors_type;
87 typedef typename receiver<input_type>::predecessor_list_type predecessor_list_type;
93 ) : my_graph_ref(g), my_max_concurrency(max_concurrency)
96 , forwarder_busy(false)
98 my_predecessors.set_owner(
this);
104 : receiver<Input>(),
tbb::internal::no_assign()
105 , my_graph_ref(src.my_graph_ref), my_max_concurrency(src.my_max_concurrency)
107 , my_queue(src.my_queue ? new input_queue_type() : NULL), forwarder_busy(false)
109 my_predecessors.set_owner(
this);
118 if ( my_queue )
delete my_queue;
127 operation_type op_data(reg_pred);
129 my_aggregator.execute(&op_data);
135 operation_type op_data(rem_pred);
137 my_aggregator.execute(&op_data);
141 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 142 void internal_add_built_predecessor( predecessor_type &src)
__TBB_override {
144 operation_type op_data(add_blt_pred);
146 my_aggregator.execute(&op_data);
150 void internal_delete_built_predecessor( predecessor_type &src)
__TBB_override {
151 operation_type op_data(del_blt_pred);
153 my_aggregator.execute(&op_data);
157 operation_type op_data(blt_pred_cnt);
158 my_aggregator.execute(&op_data);
159 return op_data.cnt_val;
163 operation_type op_data(blt_pred_cpy);
165 my_aggregator.execute(&op_data);
169 return my_predecessors.built_predecessors();
181 forwarder_busy =
false;
188 input_queue_type *my_queue;
194 my_predecessors.
reset();
195 __TBB_ASSERT(!(f & rf_clear_edges) || my_predecessors.
empty(),
"function_input_base reset failed");
203 operation_type op_data(i, app_body_bypass);
204 my_aggregator.execute(&op_data);
205 return op_data.bypass_t;
219 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 221 predecessor_list_type *predv;
226 type(char(t)), elem(const_cast<input_type*>(&e)) {}
231 typedef internal::aggregating_functor<class_type, operation_type>
handler_type;
232 friend class internal::aggregating_functor<class_type, operation_type>;
236 task* new_task = NULL;
238 if(!my_queue->empty()) {
240 new_task = create_body_task(my_queue->front());
249 new_task = create_body_task(i);
258 op_list = op_list->next;
261 my_predecessors.
add(*(tmp->r));
263 if (!forwarder_busy) {
264 forwarder_busy =
true;
265 spawn_forward_task();
269 my_predecessors.
remove(*(tmp->r));
272 case app_body_bypass: {
273 tmp->bypass_t = NULL;
276 if(my_concurrency<my_max_concurrency)
277 tmp->bypass_t = perform_queued_requests();
282 case tryput_bypass: internal_try_put_task(tmp);
break;
283 case try_fwd: internal_forward(tmp);
break;
284 case occupy_concurrency:
285 if (my_concurrency < my_max_concurrency) {
292 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 294 my_predecessors.internal_add_built_predecessor(*(tmp->r));
299 my_predecessors.internal_delete_built_predecessor(*(tmp->r));
303 tmp->cnt_val = my_predecessors.predecessor_count();
307 my_predecessors.copy_predecessors( *(tmp->predv) );
318 if (my_concurrency < my_max_concurrency) {
320 task * new_task = create_body_task(*(op->elem));
321 op->bypass_t = new_task;
323 }
else if ( my_queue && my_queue->push(*(op->elem)) ) {
335 if (my_concurrency < my_max_concurrency || !my_max_concurrency)
336 op->bypass_t = perform_queued_requests();
340 forwarder_busy =
false;
346 operation_type op_data(t, tryput_bypass);
347 my_aggregator.
execute(&op_data);
349 return op_data.bypass_t;
355 if( my_max_concurrency == 0 ) {
356 return apply_body_bypass(t);
358 operation_type check_op(t, occupy_concurrency);
359 my_aggregator.
execute(&check_op);
361 return apply_body_bypass(t);
363 return internal_try_put_bypass(t);
368 if( my_max_concurrency == 0 ) {
369 return create_body_task(t);
371 return internal_try_put_bypass(t);
378 return static_cast<ImplType *
>(
this)->apply_body_impl_bypass(i);
384 new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
392 operation_type op_data(try_fwd);
395 op_data.status =
WAIT;
396 my_aggregator.
execute(&op_data);
398 task* ttask = op_data.bypass_t;
408 new( task::allocate_additional_child_of(*(my_graph_ref.root_task())) )
415 task* tp = create_forward_task();
424 template<
typename Input,
typename Output,
typename Policy,
typename A>
435 template<
typename Body>
441 , my_init_body( new internal::
function_body_leaf< input_type, output_type, Body>(body) ) {
447 my_body( src.my_init_body->clone() ),
448 my_init_body(src.my_init_body->clone() ) {
456 template<
typename Body >
458 function_body_type &body_ref = *this->my_body;
466 output_type v = (*my_body)(i);
473 output_type v = apply_body_impl(i);
474 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER 475 task* successor_task = successors().try_put_task(v);
477 task* postponed_task = NULL;
478 if( base_type::my_max_concurrency != 0 ) {
479 postponed_task = base_type::try_get_postponed_task(i);
482 #if TBB_DEPRECATED_MESSAGE_FLOW_ORDER 483 graph& g = base_type::my_graph_ref;
486 if( postponed_task ) {
491 task* successor_task = successors().try_put_task(v);
492 #if _MSC_VER && !__INTEL_COMPILER 493 #pragma warning (push) 494 #pragma warning (disable: 4127) 497 #if _MSC_VER && !__INTEL_COMPILER 498 #pragma warning (pop) 500 if(!successor_task) {
506 return successor_task;
513 base_type::reset_function_input_base(f);
515 function_body_type *tmp = my_init_body->clone();
531 (
void)tbb::flow::get<N-1>(p).successors().clear();
535 if(tbb::flow::get<N-1>(p).successors().
empty())
543 (
void)tbb::flow::get<0>(p).successors().clear();
546 return tbb::flow::get<0>(
p).successors().empty();
550 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 552 template<
int N>
struct extract_element {
553 template<
typename P>
static void extract_this(P &
p) {
554 (
void)tbb::flow::get<N-1>(p).successors().built_successors().sender_extract(tbb::flow::get<N-1>(p));
555 extract_element<N-1>::extract_this(p);
559 template<>
struct extract_element<1> {
560 template<
typename P>
static void extract_this(P &
p) {
561 (
void)tbb::flow::get<0>(p).successors().built_successors().sender_extract(tbb::flow::get<0>(p));
568 template<
typename Input,
typename OutputPortSet,
typename Policy,
typename A>
580 template<
typename Body>
591 my_body( src.my_init_body->clone() ),
592 my_init_body(src.my_init_body->clone() ) {
600 template<
typename Body >
602 multifunction_body_type &body_ref = *this->my_body;
611 (*my_body)(i, my_output_ports);
614 if(base_type::my_max_concurrency != 0) {
615 ttask = base_type::try_get_postponed_task(i);
623 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 625 extract_element<N>::extract_this(my_output_ports);
630 base_type::reset_function_input_base(f);
633 multifunction_body_type *tmp = my_init_body->clone();
647 template<
size_t N,
typename MOP>
649 return tbb::flow::get<N>(op.output_ports());
661 template<
typename T,
typename P>
664 task* last_task = tbb::flow::get<N-1>(
p).try_put_task(tbb::flow::get<N-1>(t));
672 template<
typename T,
typename P>
674 task* last_task = tbb::flow::get<0>(
p).try_put_task(tbb::flow::get<0>(t));
681 template<
typename Output,
typename Policy>
693 template<
typename Body >
698 , my_init_body( new internal::
function_body_leaf< input_type, output_type, Body>(body) )
701 template<
typename Body >
707 , my_init_body( new internal::
function_body_leaf< input_type, output_type, Body>(body) )
711 my_graph_ref(src.my_graph_ref),
712 my_body( src.my_init_body->clone() ),
713 my_init_body( src.my_init_body->clone() ) {}
720 template<
typename Body >
722 function_body_type &body_ref = *my_body;
727 continue_receiver::reset_receiver(f);
729 function_body_type *tmp = my_init_body->clone();
750 output_type v = (*my_body)( continue_msg() );
752 return successors().try_put_task( v );
759 #if _MSC_VER && !__INTEL_COMPILER 760 #pragma warning (push) 761 #pragma warning (disable: 4127) 764 #if _MSC_VER && !__INTEL_COMPILER 765 #pragma warning (pop) 767 return apply_body_bypass( continue_msg() );
770 return new ( task::allocate_additional_child_of( *(my_graph_ref.root_task()) ) )
782 template<
typename Output >
790 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 791 typedef typename sender<output_type>::built_successors_type built_successors_type;
792 typedef typename sender<output_type>::successor_list_type successor_list_type;
797 my_successors.set_owner(
this);
802 successors().register_successor( r );
808 successors().remove_successor( r );
812 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION 813 built_successors_type &built_successors()
__TBB_override {
return successors().built_successors(); }
816 void internal_add_built_successor( successor_type &r)
__TBB_override {
817 successors().internal_add_built_successor( r );
820 void internal_delete_built_successor( successor_type &r)
__TBB_override {
821 successors().internal_delete_built_successor( r );
825 return successors().successor_count();
829 successors().copy_successors(v);
842 return my_successors.try_put_task(i);
851 template<
typename Output >
856 using base_type::my_successors;
862 task *res = try_put_task(i);
863 if(!res)
return false;
873 return my_successors.try_put_task(i);
881 #if __TBB_FLOW_GRAPH_CPP11_FEATURES 882 template<
typename CompositeType>
885 template<
typename CompositeType,
typename NodeType1,
typename... NodeTypes >
886 void add_nodes_impl(CompositeType *c_node,
bool visible,
const NodeType1& n1,
const NodeTypes&... n) {
887 void *
addr =
const_cast<NodeType1 *
>(&n1);
896 #endif // __TBB__flow_graph_node_impl_H bool pop_front(item_type &v)
static tbb::task *const SUCCESSFULLY_ENQUEUED
task * try_put_task(const input_type &t) __TBB_override
Put item to successor; return task to run the successor if possible.
static bool this_empty(P &p)
A functor that takes an Input and generates an Output.
function_body_type * my_init_body
multifunction_input(const multifunction_input &src)
Copy constructor.
const size_t my_max_concurrency
predecessor_cache< input_type, null_mutex > my_predecessors
void internal_forward(operation_type *op)
Creates tasks for postponed messages if available and if concurrency allows.
operation_type(const input_type &e, op_type t)
multifunction_output(const multifunction_output &)
bool buffer_empty() const
function_input_base< Input, Policy, A, ImplType > class_type
bool get_item(output_type &v)
void spawn_forward_task()
Spawns a task that calls forward()
function_input< Input, Output, Policy, A > my_class
multifunction_body_type * my_init_body
#define __TBB_STATIC_ASSERT(condition, msg)
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
function_body_type * my_body
function_input_queue< input_type, A > input_queue_type
continue_input< output_type, Policy > class_type
void internal_try_put_task(operation_type *op)
Put to the node, but return the task instead of enqueueing it.
function_body that takes an Input and a set of output ports
static void clear_this(P &p)
task * create_forward_task()
task * execute() __TBB_override
Base class for types that should not be assigned.
void reset_function_input(reset_flags f)
output_type apply_body_impl(const input_type &i)
bool register_predecessor(predecessor_type &src) __TBB_override
Adds src to the list of cached predecessors.
task * forward_task()
This is executed by an enqueued task, the "forwarder".
Body copy_function_object()
void handle_operations(operation_type *op_list)
static void fgt_begin_body(void *)
function_body_type * my_body
bool remove_predecessor(predecessor_type &src) __TBB_override
Removes src from the list of cached predecessors.
Implements methods for an executable node that takes continue_msg as input.
virtual ~function_input_base()
Destructor.
continue_input(graph &g, int number_of_predecessors,)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
Implements methods for a function node that takes a type Input as input.
function_input_base(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(size_t max_concurrency, node_priority_t priority))
Constructor for function_input_base.
receiver< input_type >::predecessor_type predecessor_type
Implements methods for a function node that takes a type Input as input and sends.
void reset_function_input_base(reset_flags f)
predecessor_cache< input_type, null_mutex > predecessor_cache_type
output_ports_type my_output_ports
internal::aggregating_functor< class_type, operation_type > handler_type
Body copy_function_object()
A task that calls a node's apply_body_bypass function, passing in an input of type Input...
Output output_type
The output type of this receiver.
unsigned int node_priority_t
void const char const char int ITT_FORMAT __itt_group_sync p
function_output(const function_output &)
task * internal_try_put_bypass(const input_type &t)
task * apply_body_impl_bypass(const input_type &i)
static void fgt_alias_port(void *, void *, bool)
static tbb::task * combine_tasks(graph &g, tbb::task *left, tbb::task *right)
bool remove_successor(successor_type &r) __TBB_override
Removes a successor from this node.
static void clear_this(P &p)
sender< output_type >::successor_type successor_type
tbb::flow::tuple_element< N, typename MOP::output_ports_type >::type & output_port(MOP &op)
task * try_get_postponed_task(const input_type &i)
void check_task_and_spawn(graph &g, task *t)
function_output< output_type > base_type
continue_input(const continue_input &src)
int max_concurrency()
Returns the maximal number of threads that can work inside the arena.
static task * emit_this(graph &g, const T &t, P &p)
multifunction_input< Input, OutputPortSet, Policy, A > my_class
broadcast_cache_type & successors()
continue_input(graph &g, __TBB_FLOW_GRAPH_PRIORITY_ARG1(Body &body, node_priority_t priority))
function_input_base(const function_input_base &src)
Copy constructor.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task * task
void reset(reset_flags f)
function_input_base< Input, Policy, A, my_class > base_type
#define __TBB_FLOW_GRAPH_PRIORITY_EXPR(expr)
broadcast_cache_type my_successors
broadcast_cache< output_type > broadcast_cache_type
task * try_put_task_impl(const input_type &t, tbb::internal::true_type)
#define __TBB_FLOW_GRAPH_PRIORITY_ARG1(arg1, priority)
task * perform_queued_requests()
output_ports_type & output_ports()
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
aggregated_operation base class
Input input_type
The input type of this receiver.
function_input(const function_input &src)
Copy constructor.
task * apply_body_bypass(input_type)
Applies the body to the provided input.
static task * emit_this(graph &g, const T &t, P &p)
function_body< input_type, output_type > function_body_type
void add_nodes_impl(CompositeType *, bool)
bool push_back(item_type &v)
multifunction_body< input_type, output_ports_type > multifunction_body_type
task * try_put_task(const output_type &i)
static void fgt_end_body(void *)
task * apply_body_bypass(const input_type &i)
Applies the body to the provided input.
const item_type & front() const
Input and scheduling for a function node that takes a type Input as input.
the leaf for function_body
function_input_base< Input, Policy, A, my_class > base_type
Base class for user-defined tasks.
void execute(operation_type *op)
Body copy_function_object()
function_body_type * my_init_body
task * apply_body_impl_bypass(const input_type &i)
graph & graph_reference() __TBB_override
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
function_input_queue< input_type, A > input_queue_type
multifunction_body_type * my_body
continue_msg input_type
The input type of this receiver.
leaf for multifunction. OutputSet can be a std::tuple or a vector.
task * try_put_task(const output_type &i)
operation_type(op_type t)
bool register_successor(successor_type &r) __TBB_override
Adds a new successor to this node.
function_body< input_type, output_type > function_body_type
bool is_graph_active(graph &g)
static bool this_empty(P &p)
A::template rebind< input_queue_type >::other queue_allocator_type
function_input(graph &g, size_t max_concurrency,)
OutputPortSet output_ports_type
graph & graph_reference() __TBB_override
void reset_receiver(reset_flags f) __TBB_override
void __TBB_store_with_release(volatile T &location, V value)
bool try_put(const output_type &i)
Implements methods for both executable and function nodes that puts Output to its successors...
A task that calls a node's forward_task function.
void reset_receiver(reset_flags f) __TBB_override
task * try_put_task_impl(const input_type &t, tbb::internal::false_type)
function_input_queue< input_type, A > input_queue_type
multifunction_input(graph &g, size_t max_concurrency,)
task * create_body_task(const input_type &input)
allocates a task to apply a body
aggregator< handler_type, operation_type > my_aggregator