Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
tbb::pipeline Class Reference

A processing pipeline that applies filters to items. More...

#include <pipeline.h>

Collaboration diagram for tbb::pipeline:

Public Member Functions

__TBB_EXPORTED_METHOD pipeline ()
 Construct empty pipeline. More...
 
virtual __TBB_EXPORTED_METHOD ~pipeline ()
 
void __TBB_EXPORTED_METHOD add_filter (filter &filter_)
 Add filter to end of pipeline. More...
 
void __TBB_EXPORTED_METHOD run (size_t max_number_of_live_tokens)
 Run the pipeline to completion. More...
 
void __TBB_EXPORTED_METHOD run (size_t max_number_of_live_tokens, tbb::task_group_context &context)
 Run the pipeline to completion with user-supplied context. More...
 
void __TBB_EXPORTED_METHOD clear ()
 Remove all filters from the pipeline. More...
 

Private Member Functions

void remove_filter (filter &filter_)
 Remove filter from pipeline. More...
 
void __TBB_EXPORTED_METHOD inject_token (task &self)
 Not used, but retained to satisfy old export files. More...
 
void clear_filters ()
 Does clean up if pipeline is cancelled or exception occurred. More...
 

Private Attributes

filterfilter_list
 Pointer to first filter in the pipeline. More...
 
filterfilter_end
 Pointer to location where address of next filter to be added should be stored. More...
 
taskend_counter
 task who's reference count is used to determine when all stages are done. More...
 
atomic< internal::Tokeninput_tokens
 Number of idle tokens waiting for input stage. More...
 
atomic< internal::Tokentoken_counter
 Global counter of tokens. More...
 
bool end_of_input
 False until fetch_input returns NULL. More...
 
bool has_thread_bound_filters
 True if the pipeline contains a thread-bound filter; false otherwise. More...
 

Friends

class internal::stage_task
 
class internal::pipeline_root_task
 
class filter
 
class thread_bound_filter
 
class internal::pipeline_cleaner
 
class tbb::interface6::internal::pipeline_proxy
 

Detailed Description

A processing pipeline that applies filters to items.

Definition at line 236 of file pipeline.h.

Constructor & Destructor Documentation

◆ pipeline()

tbb::pipeline::pipeline ( )

Construct empty pipeline.

Definition at line 518 of file pipeline.cpp.

References input_tokens, and token_counter.

518  :
519  filter_list(NULL),
520  filter_end(NULL),
521  end_counter(NULL),
522  end_of_input(false),
524 {
525  token_counter = 0;
526  input_tokens = 0;
527 }
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
atomic< internal::Token > input_tokens
Number of idle tokens waiting for input stage.
Definition: pipeline.h:277
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
Definition: pipeline.h:286
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:271
task * end_counter
task who&#39;s reference count is used to determine when all stages are done.
Definition: pipeline.h:274
atomic< internal::Token > token_counter
Global counter of tokens.
Definition: pipeline.h:280
bool end_of_input
False until fetch_input returns NULL.
Definition: pipeline.h:283

◆ ~pipeline()

tbb::pipeline::~pipeline ( )
virtual

Though the current implementation declares the destructor virtual, do not rely on this detail. The virtualness is deprecated and may disappear in future versions of TBB.

Definition at line 529 of file pipeline.cpp.

References clear().

529  {
530  clear();
531 }
void __TBB_EXPORTED_METHOD clear()
Remove all filters from the pipeline.
Definition: pipeline.cpp:533
Here is the call graph for this function:

Member Function Documentation

◆ add_filter()

void tbb::pipeline::add_filter ( filter filter_)

Add filter to end of pipeline.

Definition at line 552 of file pipeline.cpp.

References __TBB_ASSERT, __TBB_PIPELINE_VERSION, end_counter, filter_end, filter_list, has_thread_bound_filters, tbb::filter::is_bound(), tbb::filter::is_ordered(), tbb::filter::is_serial(), tbb::filter::my_filter_mode, tbb::filter::my_input_buffer, tbb::filter::my_pipeline, tbb::filter::next_filter_in_pipeline, tbb::filter::not_in_pipeline(), tbb::filter::object_may_be_null(), tbb::filter::prev_filter_in_pipeline, and tbb::filter::version_mask.

552  {
553 #if TBB_USE_ASSERT
554  if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) )
555  __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
556  __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
557  __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
558 #endif
559  if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
560  filter_.my_pipeline = this;
561  filter_.prev_filter_in_pipeline = filter_end;
562  if ( filter_list == NULL)
563  filter_list = &filter_;
564  else
566  filter_.next_filter_in_pipeline = NULL;
567  filter_end = &filter_;
568  }
569  else
570  {
571  if( !filter_end )
572  filter_end = reinterpret_cast<filter*>(&filter_list);
573 
574  *reinterpret_cast<filter**>(filter_end) = &filter_;
575  filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
576  *reinterpret_cast<filter**>(filter_end) = NULL;
577  }
578  if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
579  if( filter_.is_serial() ) {
580  if( filter_.is_bound() )
582  filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
583  }
584  else {
585  if(filter_.prev_filter_in_pipeline) {
586  if(filter_.prev_filter_in_pipeline->is_bound()) {
587  // successors to bound filters must have an input_buffer
588  filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
589  }
590  }
591  else { // input filter
592  if(filter_.object_may_be_null() ) {
593  //TODO: buffer only needed to hold TLS; could improve
594  filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
595  filter_.my_input_buffer->create_my_tls();
596  }
597  }
598  }
599  } else {
600  if( filter_.is_serial() ) {
601  filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
602  }
603  }
604 
605 }
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
Definition: pipeline.h:286
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:271
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:42
task * end_counter
task who&#39;s reference count is used to determine when all stages are done.
Definition: pipeline.h:274
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:68
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:160
friend class filter
Definition: pipeline.h:262
static const unsigned char version_mask
Definition: pipeline.h:93
Here is the call graph for this function:

◆ clear()

void tbb::pipeline::clear ( )

Remove all filters from the pipeline.

Definition at line 533 of file pipeline.cpp.

References __TBB_PIPELINE_VERSION, filter_end, filter_list, tbb::filter::my_input_buffer, tbb::filter::next_filter_in_pipeline, tbb::filter::not_in_pipeline(), and tbb::filter::version_mask.

Referenced by tbb::internal::input_buffer::note_done(), and ~pipeline().

533  {
534  filter* next;
535  for( filter* f = filter_list; f; f=next ) {
536  if( internal::input_buffer* b = f->my_input_buffer ) {
537  delete b;
538  f->my_input_buffer = NULL;
539  }
540  next=f->next_filter_in_pipeline;
541  f->next_filter_in_pipeline = filter::not_in_pipeline();
542  if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
543  f->prev_filter_in_pipeline = filter::not_in_pipeline();
544  f->my_pipeline = NULL;
545  }
546  if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
547  f->next_segment = NULL;
548  }
549  filter_list = filter_end = NULL;
550 }
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:271
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:42
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:68
friend class filter
Definition: pipeline.h:262
static const unsigned char version_mask
Definition: pipeline.h:93
Here is the call graph for this function:
Here is the caller graph for this function:

◆ clear_filters()

void tbb::pipeline::clear_filters ( )
private

Does clean up if pipeline is cancelled or exception occurred.

Referenced by inject_token(), and tbb::internal::pipeline_cleaner::~pipeline_cleaner().

Here is the caller graph for this function:

◆ inject_token()

void tbb::pipeline::inject_token ( task self)
private

Not used, but retained to satisfy old export files.

Definition at line 504 of file pipeline.cpp.

References __TBB_ASSERT, __TBB_PIPELINE_VERSION, clear_filters(), tbb::filter::next_filter_in_pipeline, and tbb::filter::version_mask.

504  {
505  __TBB_ASSERT(false,"illegal call to inject_token");
506 }
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
Here is the call graph for this function:

◆ remove_filter()

void tbb::pipeline::remove_filter ( filter filter_)
private

Remove filter from pipeline.

Definition at line 607 of file pipeline.cpp.

References __TBB_ASSERT, __TBB_PIPELINE_VERSION, end_counter, filter_end, filter_list, tbb::filter::my_filter_mode, tbb::filter::my_input_buffer, tbb::filter::my_pipeline, tbb::filter::next_filter_in_pipeline, tbb::filter::next_segment, tbb::filter::not_in_pipeline(), tbb::filter::prev_filter_in_pipeline, and tbb::filter::version_mask.

607  {
608  __TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
609  __TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
610  __TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
611  if (&filter_ == filter_list)
613  else {
614  __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
615  filter_.prev_filter_in_pipeline->next_filter_in_pipeline = filter_.next_filter_in_pipeline;
616  }
617  if (&filter_ == filter_end)
619  else {
620  __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
621  filter_.next_filter_in_pipeline->prev_filter_in_pipeline = filter_.prev_filter_in_pipeline;
622  }
623  if( internal::input_buffer* b = filter_.my_input_buffer ) {
624  delete b;
625  filter_.my_input_buffer = NULL;
626  }
627  filter_.next_filter_in_pipeline = filter_.prev_filter_in_pipeline = filter::not_in_pipeline();
628  if ( (filter_.my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
629  filter_.next_segment = NULL;
630  filter_.my_pipeline = NULL;
631 }
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:271
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:42
filter * prev_filter_in_pipeline
Pointer to previous filter in the pipeline.
Definition: pipeline.h:185
task * end_counter
task who&#39;s reference count is used to determine when all stages are done.
Definition: pipeline.h:274
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:68
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:160
static const unsigned char version_mask
Definition: pipeline.h:93
Here is the call graph for this function:

◆ run() [1/2]

void tbb::pipeline::run ( size_t  max_number_of_live_tokens)

Run the pipeline to completion.

Definition at line 633 of file pipeline.cpp.

References __TBB_ASSERT, tbb::task::allocate_root(), tbb::task_group_context::bound, tbb::task_group_context::default_traits, end_counter, end_of_input, tbb::filter::exact_exception_propagation, filter_list, has_thread_bound_filters, input_tokens, internal::pipeline_cleaner, internal::pipeline_root_task, tbb::filter::is_bound(), tbb::filter::my_filter_mode, tbb::filter::my_input_buffer, tbb::filter::next_filter_in_pipeline, and tbb::task::spawn_root_and_wait().

637  {
638  __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
639  __TBB_ASSERT( !end_counter, "pipeline already running?" );
640  if( filter_list ) {
641  internal::pipeline_cleaner my_pipeline_cleaner(*this);
642  end_of_input = false;
643  input_tokens = internal::Token(max_number_of_live_tokens);
645  // release input filter if thread-bound
646  if(filter_list->is_bound()) {
647  filter_list->my_input_buffer->sema_V();
648  }
649  }
650 #if __TBB_TASK_GROUP_CONTEXT
651  end_counter = new( task::allocate_root(context) ) internal::pipeline_root_task( *this );
652 #else
653  end_counter = new( task::allocate_root() ) internal::pipeline_root_task( *this );
654 #endif
655  // Start execution of tasks
657 
660  if(f->is_bound()) {
661  f->my_input_buffer->sema_V(); // wake to end
662  }
663  }
664  }
665  }
666 }
unsigned long Token
Definition: pipeline.h:44
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
atomic< internal::Token > input_tokens
Number of idle tokens waiting for input stage.
Definition: pipeline.h:277
bool is_bound() const
True if filter is thread-bound.
Definition: pipeline.h:139
friend class internal::pipeline_cleaner
Definition: pipeline.h:264
internal::input_buffer * my_input_buffer
Buffer for incoming tokens, or NULL if not required.
Definition: pipeline.h:174
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
Definition: pipeline.h:286
task * end_counter
task who&#39;s reference count is used to determine when all stages are done.
Definition: pipeline.h:274
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:636
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:160
static void spawn_root_and_wait(task &root)
Spawn task allocated by allocate_root, wait for it to complete, and deallocate it.
Definition: task.h:781
friend class filter
Definition: pipeline.h:262
bool end_of_input
False until fetch_input returns NULL.
Definition: pipeline.h:283
Here is the call graph for this function:

◆ run() [2/2]

void __TBB_EXPORTED_METHOD tbb::pipeline::run ( size_t  max_number_of_live_tokens,
tbb::task_group_context context 
)

Run the pipeline to completion with user-supplied context.

Friends And Related Function Documentation

◆ filter

friend class filter
friend

Definition at line 262 of file pipeline.h.

◆ internal::pipeline_cleaner

friend class internal::pipeline_cleaner
friend

Definition at line 264 of file pipeline.h.

Referenced by run().

◆ internal::pipeline_root_task

friend class internal::pipeline_root_task
friend

Definition at line 261 of file pipeline.h.

Referenced by run().

◆ internal::stage_task

friend class internal::stage_task
friend

Definition at line 260 of file pipeline.h.

◆ tbb::interface6::internal::pipeline_proxy

Definition at line 265 of file pipeline.h.

◆ thread_bound_filter

friend class thread_bound_filter
friend

Definition at line 263 of file pipeline.h.

Member Data Documentation

◆ end_counter

task* tbb::pipeline::end_counter
private

task who's reference count is used to determine when all stages are done.

Definition at line 274 of file pipeline.h.

Referenced by add_filter(), remove_filter(), run(), and tbb::internal::pipeline_cleaner::~pipeline_cleaner().

◆ end_of_input

bool tbb::pipeline::end_of_input
private

False until fetch_input returns NULL.

Definition at line 283 of file pipeline.h.

Referenced by tbb::internal::pipeline_root_task::execute(), and run().

◆ filter_end

filter* tbb::pipeline::filter_end
private

Pointer to location where address of next filter to be added should be stored.

Definition at line 271 of file pipeline.h.

Referenced by add_filter(), clear(), and remove_filter().

◆ filter_list

◆ has_thread_bound_filters

bool tbb::pipeline::has_thread_bound_filters
private

True if the pipeline contains a thread-bound filter; false otherwise.

Definition at line 286 of file pipeline.h.

Referenced by add_filter(), and run().

◆ input_tokens

atomic<internal::Token> tbb::pipeline::input_tokens
private

Number of idle tokens waiting for input stage.

Definition at line 277 of file pipeline.h.

Referenced by tbb::internal::pipeline_root_task::execute(), pipeline(), and run().

◆ token_counter

atomic<internal::Token> tbb::pipeline::token_counter
private

Global counter of tokens.

Definition at line 280 of file pipeline.h.

Referenced by pipeline().


The documentation for this class was generated from the following files:

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.