21 #ifndef __TBB_pipeline_H 22 #define __TBB_pipeline_H 29 #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT || __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT 30 #include <type_traits> 42 #define __TBB_PIPELINE_VERSION(x) ((unsigned char)(x-2)<<1) 53 namespace interface6 {
54 template<
typename T,
typename U>
class filter_t;
71 static const unsigned char filter_is_serial = 0x1;
76 static const unsigned char filter_is_out_of_order = 0x1<<4;
79 static const unsigned char filter_is_bound = 0x1<<5;
82 static const unsigned char filter_may_emit_null = 0x1<<6;
85 static const unsigned char exact_exception_propagation =
86 #if TBB_USE_CAPTURED_EXCEPTION 93 static const unsigned char version_mask = 0x7<<1;
97 parallel = current_version | filter_is_out_of_order,
99 serial_in_order = current_version | filter_is_serial,
101 serial_out_of_order = current_version | filter_is_serial | filter_is_out_of_order,
107 next_filter_in_pipeline(not_in_pipeline()),
108 my_input_buffer(NULL),
109 my_filter_mode(static_cast<unsigned char>((is_serial_ ?
serial : parallel) | exact_exception_propagation)),
110 prev_filter_in_pipeline(not_in_pipeline()),
116 next_filter_in_pipeline(not_in_pipeline()),
117 my_input_buffer(NULL),
118 my_filter_mode(static_cast<unsigned char>(filter_mode | exact_exception_propagation)),
119 prev_filter_in_pipeline(not_in_pipeline()),
130 return bool( my_filter_mode & filter_is_serial );
135 return (my_filter_mode & (filter_is_out_of_order|filter_is_serial))==filter_is_serial;
140 return ( my_filter_mode & filter_is_bound )==filter_is_bound;
145 return ( my_filter_mode & filter_may_emit_null ) == filter_may_emit_null;
150 virtual void* operator()(
void* item ) = 0;
156 #if __TBB_TASK_GROUP_CONTEXT 160 virtual void finalize(
void* ) {};
170 bool has_more_work();
176 friend class internal::stage_task;
177 friend class internal::pipeline_root_task;
231 result_type internal_process_item(
bool is_blocking);
251 #if __TBB_TASK_GROUP_CONTEXT 260 friend class internal::stage_task;
261 friend class internal::pipeline_root_task;
264 friend class internal::pipeline_cleaner;
289 void remove_filter(
filter& filter_ );
294 #if __TBB_TASK_GROUP_CONTEXT 295 void clear_filters();
304 namespace interface6 {
314 template<
typename T,
typename U,
typename Body>
friend class internal::concrete_filter;
316 void stop() { is_pipeline_stopped =
true; }
325 #if __TBB_CPP11_TYPE_PROPERTIES_PRESENT 327 #elif __TBB_TR1_TYPE_PROPERTIES_IN_STD_PRESENT 341 #endif // Obtaining type properties 355 pointer output_t = allocator().allocate(1);
356 return new (output_t) T(source);
358 static value_type &
token(pointer & t) {
return *t;}
362 allocator().destroy(token);
363 allocator().deallocate(token,1);
373 static pointer
create_token(
const value_type & source) {
return source; }
374 static value_type &
token(pointer & t) {
return t;}
386 } type_to_void_ptr_map;
392 static value_type &
token(pointer & t) {
return t;}
394 type_to_void_ptr_map mymap;
395 mymap.void_overlay = NULL;
396 mymap.actual_value = ref;
397 return mymap.void_overlay;
400 type_to_void_ptr_map mymap;
401 mymap.void_overlay = ref;
402 return mymap.actual_value;
407 template<
typename T,
typename U,
typename Body>
416 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
417 u_pointer output_u = u_helper::create_token(my_body(t_helper::token(temp_input)));
418 t_helper::destroy_token(temp_input);
419 return u_helper::cast_to_void_ptr(output_u);
423 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
424 t_helper::destroy_token(temp_input);
432 template<
typename U,
typename Body>
440 u_pointer output_u = u_helper::create_token(my_body(control));
442 u_helper::destroy_token(output_u);
446 return u_helper::cast_to_void_ptr(output_u);
456 template<
typename T,
typename Body>
463 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
464 my_body(t_helper::token(temp_input));
465 t_helper::destroy_token(temp_input);
469 t_pointer temp_input = t_helper::cast_from_void_ptr(input);
470 t_helper::destroy_token(temp_input);
477 template<
typename Body>
513 #ifdef __TBB_TEST_FILTER_NODE_COUNT 514 ++(__TBB_TEST_FILTER_NODE_COUNT);
519 virtual void add_to(
pipeline& ) = 0;
529 #ifdef __TBB_TEST_FILTER_NODE_COUNT 530 --(__TBB_TEST_FILTER_NODE_COUNT);
536 template<
typename T,
typename U,
typename Body>
572 template<
typename T,
typename U,
typename Body>
574 return new internal::filter_node_leaf<T,U,Body>(
mode, body);
577 template<
typename T,
typename V,
typename U>
579 __TBB_ASSERT(left.
root,
"cannot use default-constructed filter_t as left argument of '&'");
580 __TBB_ASSERT(right.
root,
"cannot use default-constructed filter_t as right argument of '&'");
581 return new internal::filter_node_join(*left.
root,*right.
root);
585 template<
typename T,
typename U>
592 friend class internal::pipeline_proxy;
593 template<
typename T_,
typename U_,
typename Body>
595 template<
typename T_,
typename V_,
typename U_>
601 if( root ) root->add_ref();
603 template<
typename Body>
605 root( new internal::filter_node_leaf<T,U,Body>(mode, body) ) {
612 filter_node* old = root;
614 if( root ) root->add_ref();
615 if( old ) old->remove_ref();
618 if( root ) root->remove_ref();
623 filter_node* old = root;
631 __TBB_ASSERT( filter_chain.
root,
"cannot apply parallel_pipeline to default-constructed filter_t" );
640 internal::pipeline_proxy pipe(filter_chain);
642 pipe->run(max_number_of_live_tokens
643 #
if __TBB_TASK_GROUP_CONTEXT
649 #if __TBB_TASK_GROUP_CONTEXT 654 #endif // __TBB_TASK_GROUP_CONTEXT The class that represents an object of the pipeline for parallel_pipeline().
void * operator()(void *input) __TBB_override
Operate on an item from the input stream, and return item for output stream.
pipeline * my_pipeline
Pointer to the pipeline.
internal::filter_node filter_node
static void * cast_to_void_ptr(pointer ref)
A processing pipeline that applies filters to items.
Node in parse tree representing result of make_filter.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
filter * filter_list
Pointer to first filter in the pipeline.
concrete_filter(filter::mode filter_mode, const Body &body)
filter_t< T, U > make_filter(tbb::filter::mode mode, const Body &body)
Create a filter to participate in parallel_pipeline.
void * operator()(void *) __TBB_override
static pointer cast_from_void_ptr(void *ref)
atomic< internal::Token > input_tokens
Number of idle tokens waiting for input stage.
concrete_filter(tbb::filter::mode filter_mode, const Body &body)
static void * cast_to_void_ptr(pointer ref)
void parallel_pipeline(size_t max_number_of_live_tokens, const filter_t< void, void > &filter_chain, tbb::task_group_context &context)
void operator=(const filter_t< T, U > &rhs)
filter_node_join(filter_node &x, filter_node &y)
void * operator()(void *) __TBB_override
Operate on an item from the input stream, and return item for output stream.
t_helper::pointer t_pointer
concrete_filter(tbb::filter::mode filter_mode, const Body &body)
void add_to(pipeline &p) __TBB_override
Add concrete_filter to pipeline.
filter * next_segment
Pointer to the next "segment" of filters, or NULL if not required.
filter_t(const filter_t< T, U > &rhs)
concrete_filter(tbb::filter::mode filter_mode, const Body &body)
const unsigned char my_filter_mode
Storage for filter mode and dynamically checked implementation version.
filter_node_leaf(tbb::filter::mode m, const Body &b)
filter_t(tbb::filter::mode mode, const Body &body)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t mode
token_helper< T, is_large_object< T >::value > t_helper
bool is_bound() const
True if filter is thread-bound.
filter_t< T, U > operator &(const filter_t< T, V > &left, const filter_t< V, U > &right)
void const char const char int ITT_FORMAT __itt_group_sync p
A buffer of input items for a filter.
static void destroy_token(pointer)
static pointer create_token(const value_type &source)
token_helper< T, is_large_object< T >::value > t_helper
#define __TBB_EXPORTED_METHOD
token_helper< U, is_large_object< U >::value > u_helper
internal::input_buffer * my_input_buffer
Buffer for incoming tokens, or NULL if not required.
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
const tbb::filter::mode mode
void finalize(void *input) __TBB_override
Destroys item if pipeline was cancelled.
tbb::pipeline * operator->()
t_helper::pointer t_pointer
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
static pointer create_token(const value_type &source)
static value_type & token(pointer &t)
static void destroy_token(pointer token)
Class representing a chain of type-safe pipeline filters.
#define __TBB_PIPELINE_VERSION(x)
u_helper::pointer u_pointer
static void destroy_token(pointer)
filter * prev_filter_in_pipeline
Pointer to previous filter in the pipeline.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
task * end_counter
task whose reference count is used to determine when all stages are done.
void finalize(void *input) __TBB_override
Destroys item if pipeline was cancelled.
virtual void add_to(pipeline &)=0
Add concrete_filter to pipeline.
input_filter control to signal end-of-input for parallel_pipeline
Used to form groups of tasks.
static pointer create_token(const value_type &source)
thread_bound_filter(mode filter_mode)
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
A stage in a pipeline served by a user thread.
void remove_ref()
Decrement reference count and delete if it becomes zero.
Abstract base class that represents a node in a parse tree underlying a filter_t. ...
void * operator()(void *input) __TBB_override
Operate on an item from the input stream, and return item for output stream.
static void * cast_to_void_ptr(pointer ref)
static pointer cast_from_void_ptr(void *ref)
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
token_helper< U, is_large_object< U >::value > u_helper
atomic< internal::Token > token_counter
Global counter of tokens.
static value_type & token(pointer &t)
Base class for user-defined tasks.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id ITT_FORMAT p const wchar_t int ITT_FORMAT __itt_group_mark d int
bool is_ordered() const
True if filter must receive stream in order.
bool end_of_input
False until fetch_input returns NULL.
void add_to(pipeline &p) __TBB_override
Add concrete_filter to pipeline.
void add_ref()
Increment reference count.
Base class for types that should not be copied or assigned.
static pointer cast_from_void_ptr(void *ref)
u_helper::pointer u_pointer
tbb::tbb_allocator< T > allocator
bool object_may_be_null()
True if an input filter can emit null.
#define __TBB_TASK_GROUP_CONTEXT
bool is_serial() const
True if filter is serial.
static value_type & token(pointer &t)
static const unsigned char filter_is_serial
The lowest bit 0 is for parallel vs. serial.
Node in parse tree representing join of two filters.
filter_t(filter_node *root_)
tbb::atomic< intptr_t > ref_count
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.