Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
pipeline.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #include "tbb/pipeline.h"
22 #include "tbb/spin_mutex.h"
24 #include "itt_notify.h"
25 #include "semaphore.h"
26 #include "tls.h" // for parallel filters that do not use NULL as end_of_input
27 
28 
29 namespace tbb {
30 
31 namespace internal {
32 
// Record used to store task information in an input buffer.
// NOTE(review): reset() assigns my_token and my_token_ready, but their
// declarations are not visible in this listing (numbering jumps from 35 to 41).
34 struct task_info {
 // Item pointer carried by the task; reset() sets it to NULL.
35  void* my_object;
 // True if my_object is valid.
41  bool is_valid;
 // Set to initial state (no object, no token).
43  void reset() {
44  my_object = NULL;
45  my_token = 0;
46  my_token_ready = false;
47  is_valid = false;
48  }
49 };
51 
54  friend class tbb::filter;
57  friend class tbb::pipeline;
58 
59  typedef Token size_type;
60 
63 
66 
68 
69  size_type array_size;
70 
72 
74 
77 
79 
80  void grow( size_type minimum_size );
81 
83 
84  static const size_type initial_buffer_size = 4;
85 
88 
90  bool is_ordered;
91 
93  bool is_bound;
94 
97  end_of_input_tls_t end_of_input_tls;
98  bool end_of_input_tls_allocated; // no way to test pthread creation of TLS
99 
// Allocates my_sem as a new internal::semaphore built from initial_tokens.
// Asserts that no semaphore has been created yet.
100  void create_sema(size_t initial_tokens) { __TBB_ASSERT(!my_sem,NULL); my_sem = new internal::semaphore(initial_tokens); }
// Deletes my_sem after asserting it exists. The pointer itself is not reset to NULL here.
101  void free_sema() { __TBB_ASSERT(my_sem,NULL); delete my_sem; }
// Forwards to semaphore::P() (wait/acquire) after asserting the semaphore exists.
102  void sema_P() { __TBB_ASSERT(my_sem,NULL); my_sem->P(); }
// Forwards to semaphore::V() (post/release) after asserting the semaphore exists.
103  void sema_V() { __TBB_ASSERT(my_sem,NULL); my_sem->V(); }
104 
105 public:
// Construct empty buffer.
// is_ordered_ and is_bound_ are stored in the is_ordered and is_bound members.
107  input_buffer( bool is_ordered_, bool is_bound_ ) :
108  array(NULL), my_sem(NULL), array_size(0),
109  low_token(0), high_token(0),
110  is_ordered(is_ordered_), is_bound(is_bound_),
111  end_of_input_tls_allocated(false) {
 // array starts as NULL with size 0, so this first grow() performs the initial allocation.
112  grow(initial_buffer_size);
113  __TBB_ASSERT( array, NULL );
 // Only a thread-bound buffer gets a semaphore; it is constructed from the value 0.
114  if(is_bound) create_sema(0);
115  }
116 
119  __TBB_ASSERT( array, NULL );
120  cache_aligned_allocator<task_info>().deallocate(array,array_size);
121  poison_pointer( array );
122  if(my_sem) {
123  free_sema();
124  }
125  if(end_of_input_tls_allocated) {
126  destroy_my_tls();
127  }
128  }
129 
131 
// Put a token into the buffer.
// info_ is marked valid, then under array_mutex a token number is chosen:
// an ordered buffer uses info_.my_token (assigned from high_token the first
// time, when my_token_ready is false); an unordered buffer always takes a
// fresh high_token value.
// Returns true when info_ was copied into array, which happens if the token
// is not low_token, or the buffer is bound, or force_put is set.
// Returns false when nothing was stored.
139  bool put_token( task_info& info_, bool force_put = false ) {
140  {
141  info_.is_valid = true;
142  spin_mutex::scoped_lock lock( array_mutex );
143  Token token;
 // Sampled before storing: true when the slot at low_token holds no valid item.
144  bool was_empty = !array[low_token&(array_size-1)].is_valid;
145  if( is_ordered ) {
146  if( !info_.my_token_ready ) {
147  info_.my_token = high_token++;
148  info_.my_token_ready = true;
149  }
150  token = info_.my_token;
151  } else
152  token = high_token++;
153  __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL );
154  if( token!=low_token || is_bound || force_put ) {
155  // Trying to put token that is beyond low_token.
156  // Need to wait until low_token catches up before dispatching.
 // Enlarge the array when the distance from low_token does not fit in it.
157  if( token-low_token>=array_size )
158  grow( token-low_token+1 );
159  ITT_NOTIFY( sync_releasing, this );
 // Slots are addressed by token masked with array_size-1.
160  array[token&(array_size-1)] = info_;
 // For a bound buffer whose low_token slot was empty on entry, post the semaphore.
161  if(was_empty && is_bound) {
162  sema_V();
163  }
164  return true;
165  }
166  }
167  return false;
168  }
169 
171 
172  // Using template to avoid explicit dependency on stage_task
173  // this is only called for serial filters, and is the reason for the
174  // advance parameter in return_item (we're incrementing low_token here.)
175  // Non-TBF serial stages don't advance the token at the start because the presence
176  // of the current token in the buffer keeps another stage from being spawned.
// Note that processing of a token is finished.
// token   - the token whose processing has completed.
// spawner - object providing spawn_stage_task(const task_info&); it is
//           invoked after the lock is released if a valid item was found.
177  template<typename StageTask>
178  void note_done( Token token, StageTask& spawner ) {
 // wakee stays invalid (is_valid == false after reset) unless a queued item is picked up below.
179  task_info wakee;
180  wakee.reset();
181  {
182  spin_mutex::scoped_lock lock( array_mutex );
 // An ordered buffer advances only when the finished token is the current low_token.
183  if( !is_ordered || token==low_token ) {
184  // Wake the next task
 // low_token is incremented first; the slot examined belongs to the new low_token.
185  task_info& item = array[++low_token & (array_size-1)];
186  ITT_NOTIFY( sync_acquired, this );
187  wakee = item;
188  item.is_valid = false;
189  }
190  }
191  if( wakee.is_valid )
192  spawner.spawn_stage_task(wakee);
193  }
194 
195 #if __TBB_TASK_GROUP_CONTEXT
// Passes the my_object of every valid slot to my_filter->finalize and marks
// the slot invalid. Visits array_size slots, starting at low_token.
// This method does not acquire array_mutex.
196  void clear( filter* my_filter ) {
198  long t=low_token;
199  for( size_type i=0; i<array_size; ++i, ++t ){
200  task_info& temp = array[t&(array_size-1)];
201  if (temp.is_valid ) {
202  my_filter->finalize(temp.my_object);
203  temp.is_valid = false;
204  }
205  }
206  }
207 #endif
208 
210  // advance == true for parallel filters. If the filter is serial, leave the
211  // item in the buffer to keep another stage from being spawned.
// Return an item, invalidate the queued item, but only advance if advance is set.
// info    - receives a copy of the slot at low_token when that slot is valid.
// advance - when true, low_token is incremented after the item is taken.
// Returns true if an item was copied into info, false if the slot was not valid.
212  bool return_item(task_info& info, bool advance) {
213  spin_mutex::scoped_lock lock( array_mutex );
214  task_info& item = array[low_token&(array_size-1)];
215  ITT_NOTIFY( sync_acquired, this );
216  if( item.is_valid ) {
217  info = item;
218  item.is_valid = false;
219  if (advance) low_token++;
220  return true;
221  }
222  return false;
223  }
224 
// True if the slot for the current low_token is valid; the check is made under array_mutex.
226  bool has_item() { spin_mutex::scoped_lock lock(array_mutex); return array[low_token&(array_size -1)].is_valid; }
227 
228  // end_of_input signal for parallel_pipeline, parallel input filters with 0 tokens allowed.
// Creates the end_of_input TLS slot. A nonzero status is passed to handle_perror.
// The allocated flag is set afterwards so the destructor knows to destroy the slot.
229  void create_my_tls() { int status = end_of_input_tls.create(); if(status) handle_perror(status, "TLS not allocated for filter"); end_of_input_tls_allocated = true; }
// Destroys the end_of_input TLS slot. A nonzero status is passed to handle_perror.
230  void destroy_my_tls() { int status = end_of_input_tls.destroy(); if(status) handle_perror(status, "Failed to destroy filter TLS"); }
// True when the value read from the end_of_input TLS slot is nonzero.
231  bool my_tls_end_of_input() { return end_of_input_tls.get() != 0; }
// Stores 1 in the end_of_input TLS slot, which my_tls_end_of_input() then reports as true.
232  void set_my_tls_end_of_input() { end_of_input_tls.set(1); }
233 };
234 
// Resize array so that it holds at least minimum_size slots.
// The new size is twice the current size (or initial_buffer_size when the
// buffer is still empty), doubled again until it reaches minimum_size.
// Callers visible in this listing are the constructor and put_token; the
// latter calls it while holding array_mutex.
235 void input_buffer::grow( size_type minimum_size ) {
236  size_type old_size = array_size;
237  size_type new_size = old_size ? 2*old_size : initial_buffer_size;
238  while( new_size<minimum_size )
239  new_size*=2;
240  task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
241  task_info* old_array = array;
 // Every slot of the new storage starts out invalid.
242  for( size_type i=0; i<new_size; ++i )
243  new_array[i].is_valid = false;
 // Copy old_size consecutive tokens starting at low_token, re-masking each
 // token with the new size so it lands in its new slot.
244  long t=low_token;
245  for( size_type i=0; i<old_size; ++i, ++t )
246  new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
247  array = new_array;
248  array_size = new_size;
 // On the very first call there is no previous storage to release.
249  if( old_array )
250  cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
251 }
252 
// Task that carries one item (the task_info base) through pipeline filters.
// NOTE(review): the member declarations (my_pipeline, my_filter, my_at_start)
// and some statements are not visible in this listing; the numbering has gaps.
253 class stage_task: public task, public task_info {
254 private:
255  friend class tbb::pipeline;
260 
261 public:
263 
 // Initializer list of the constructor for the first stage in a pipeline
 // (its header line is not visible here): the task starts at the first
 // filter in filter_list with my_at_start set.
265  my_pipeline(pipeline),
266  my_filter(pipeline.filter_list),
267  my_at_start(true)
268  {
270  }
 // Construct stage_task for a subsequent stage in a pipeline.
 // info is copied into the task_info base; my_at_start is false.
272  stage_task( pipeline& pipeline, filter* filter_, const task_info& info ) :
273  task_info(info),
274  my_pipeline(pipeline),
275  my_filter(filter_),
276  my_at_start(false)
277  {}
 // Roughly equivalent to the constructor of input stage task.
 // NOTE(review): listing line 280 is absent, so part of this body may be missing.
279  void reset() {
281  my_filter = my_pipeline.filter_list;
282  my_at_start = true;
283  }
 // The virtual task execution method.
285  task* execute() __TBB_override;
286 #if __TBB_TASK_GROUP_CONTEXT
 // If the task still holds an object and a filter whose version is at least 4,
 // hands the object to that filter finalize method and clears the pointer.
 // The assertion expects this to happen only for a cancelled task.
287  ~stage_task()
288  {
289  if (my_filter && my_object && (my_filter->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4)) {
290  __TBB_ASSERT(is_cancelled(), "Trying to finalize the task that wasn't cancelled");
291  my_filter->finalize(my_object);
292  my_object = NULL;
293  }
294  }
295 #endif // __TBB_TASK_GROUP_CONTEXT
 // Allocates a new stage_task as an additional child of this task parent,
 // for the same pipeline and filter but carrying info, and spawns it.
296  void spawn_stage_task(const task_info& info)
298  {
299  stage_task* clone = new (allocate_additional_child_of(*parent()))
300  stage_task( my_pipeline, my_filter, info );
301  spawn(*clone);
302  }
303 };
304 
306  __TBB_ASSERT( !my_at_start || !my_object, NULL );
307  __TBB_ASSERT( !my_filter->is_bound(), NULL );
308  if( my_at_start ) {
309  if( my_filter->is_serial() ) {
310  my_object = (*my_filter)(my_object);
311  if( my_object || ( my_filter->object_may_be_null() && !my_pipeline.end_of_input) )
312  {
313  if( my_filter->is_ordered() ) {
314  my_token = my_pipeline.token_counter++; // ideally, with relaxed semantics
315  my_token_ready = true;
316  } else if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
317  if( my_pipeline.has_thread_bound_filters )
318  my_pipeline.token_counter++; // ideally, with relaxed semantics
319  }
320  if( !my_filter->next_filter_in_pipeline ) { // we're only filter in pipeline
321  reset();
322  goto process_another_stage;
323  } else {
324  ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
325  if( --my_pipeline.input_tokens>0 )
326  spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
327  }
328  } else {
329  my_pipeline.end_of_input = true;
330  return NULL;
331  }
332  } else /*not is_serial*/ {
333  if( my_pipeline.end_of_input )
334  return NULL;
335  if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
336  if( my_pipeline.has_thread_bound_filters )
337  my_pipeline.token_counter++;
338  }
339  ITT_NOTIFY( sync_releasing, &my_pipeline.input_tokens );
340  if( --my_pipeline.input_tokens>0 )
341  spawn( *new( allocate_additional_child_of(*parent()) ) stage_task( my_pipeline ) );
342  my_object = (*my_filter)(my_object);
343  if( !my_object && (!my_filter->object_may_be_null() || my_filter->my_input_buffer->my_tls_end_of_input()) )
344  {
345  my_pipeline.end_of_input = true;
346  if( (my_filter->my_filter_mode & my_filter->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
347  if( my_pipeline.has_thread_bound_filters )
348  my_pipeline.token_counter--; // fix token_counter
349  }
350  return NULL;
351  }
352  }
353  my_at_start = false;
354  } else {
355  my_object = (*my_filter)(my_object);
356  if( my_filter->is_serial() )
357  my_filter->my_input_buffer->note_done(my_token, *this);
358  }
359  my_filter = my_filter->next_filter_in_pipeline;
360  if( my_filter ) {
361  // There is another filter to execute.
362  if( my_filter->is_serial() ) {
363  // The next filter must execute tokens in order
364  if( my_filter->my_input_buffer->put_token(*this) ){
365  // Can't proceed with the same item
366  if( my_filter->is_bound() ) {
367  // Find the next non-thread-bound filter
368  do {
369  my_filter = my_filter->next_filter_in_pipeline;
370  } while( my_filter && my_filter->is_bound() );
371  // Check if there is an item ready to process
372  if( my_filter && my_filter->my_input_buffer->return_item(*this, !my_filter->is_serial()))
373  goto process_another_stage;
374  }
375  my_filter = NULL; // To prevent deleting my_object twice if exception occurs
376  return NULL;
377  }
378  }
379  } else {
380  // Reached end of the pipe.
381  size_t ntokens_avail = ++my_pipeline.input_tokens;
382  if(my_pipeline.filter_list->is_bound() ) {
383  if(ntokens_avail == 1) {
384  my_pipeline.filter_list->my_input_buffer->sema_V();
385  }
386  return NULL;
387  }
388  if( ntokens_avail>1 // Only recycle if there is one available token
389  || my_pipeline.end_of_input ) {
390  return NULL; // No need to recycle for new input
391  }
392  ITT_NOTIFY( sync_acquired, &my_pipeline.input_tokens );
393  // Recycle as an input stage task.
394  reset();
395  }
396 process_another_stage:
397  /* A semi-hackish way to reexecute the same task object immediately without spawning.
398  recycle_as_continuation marks the task for future execution,
399  and then 'this' pointer is returned to bypass spawning. */
400  recycle_as_continuation();
401  return this;
402 }
403 
// Task that creates stage tasks for a pipeline until input has ended.
// NOTE(review): the member declarations (my_pipeline, do_segment_scanning)
// and the header of execute() are not visible in this listing; the
// statements from listing line 409 to 457 are the body of execute().
404 class pipeline_root_task: public task {
407 
 // Input has not ended, the first filter is not thread-bound and input
 // tokens remain: recycle this task as a continuation waiting on one
 // child, a new input stage_task, which is returned to run next.
409  if( !my_pipeline.end_of_input )
410  if( !my_pipeline.filter_list->is_bound() )
411  if( my_pipeline.input_tokens > 0 ) {
412  recycle_as_continuation();
413  set_ref_count(1);
414  return new( allocate_child() ) stage_task( my_pipeline );
415  }
416  if( do_segment_scanning ) {
 // Walk the chain of segment heads linked through next_segment by the constructor.
417  filter* current_filter = my_pipeline.filter_list->next_segment;
418  /* first non-thread-bound filter that follows thread-bound one
419  and may have valid items to process */
420  filter* first_suitable_filter = current_filter;
421  while( current_filter ) {
422  __TBB_ASSERT( !current_filter->is_bound(), "filter is thread-bound?" );
423  __TBB_ASSERT( current_filter->prev_filter_in_pipeline->is_bound(), "previous filter is not thread-bound?" );
424  if( !my_pipeline.end_of_input || current_filter->has_more_work())
425  {
426  task_info info;
427  info.reset();
 // An item is waiting in this segment head buffer: run it as a child stage_task.
428  if( current_filter->my_input_buffer->return_item(info, !current_filter->is_serial()) ) {
429  set_ref_count(1);
430  recycle_as_continuation();
431  return new( allocate_child() ) stage_task( my_pipeline, current_filter, info);
432  }
433  current_filter = current_filter->next_segment;
 // Reached the end of the segment chain without finding an item.
434  if( !current_filter ) {
 // Input still open: re-execute this task from the top.
435  if( !my_pipeline.end_of_input ) {
436  recycle_as_continuation();
437  return this;
438  }
 // Input ended: yield, then rescan from the first segment that may still have work.
439  current_filter = first_suitable_filter;
440  __TBB_Yield();
441  }
442  } else {
443  /* The preceding pipeline segment is empty.
444  Fast-forward to the next post-TBF segment. */
445  first_suitable_filter = first_suitable_filter->next_segment;
446  current_filter = first_suitable_filter;
447  }
448  } /* while( current_filter ) */
449  return NULL;
450  } else {
 // No segment scanning: keep re-executing this task until input has ended.
451  if( !my_pipeline.end_of_input ) {
452  recycle_as_continuation();
453  return this;
454  }
455  return NULL;
456  }
457  }
458 public:
 // Binds the task to pipeline. When the first filter has version 5 or
 // higher, links through next_segment every non-bound filter that directly
 // follows a bound filter, and enables do_segment_scanning if one is found.
459  pipeline_root_task( pipeline& pipeline ): my_pipeline(pipeline), do_segment_scanning(false)
460  {
461  __TBB_ASSERT( my_pipeline.filter_list, NULL );
462  filter* first = my_pipeline.filter_list;
463  if( (first->my_filter_mode & first->version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
464  // Scanning the pipeline for segments
 // The chain starts at the first filter, so filter_list->next_segment is the first segment head.
465  filter* head_of_previous_segment = first;
466  for( filter* subfilter=first->next_filter_in_pipeline;
467  subfilter!=NULL;
468  subfilter=subfilter->next_filter_in_pipeline )
469  {
470  if( subfilter->prev_filter_in_pipeline->is_bound() && !subfilter->is_bound() ) {
471  do_segment_scanning = true;
472  head_of_previous_segment->next_segment = subfilter;
473  head_of_previous_segment = subfilter;
474  }
475  }
476  }
477  }
478 };
479 
480 #if _MSC_VER && !defined(__INTEL_COMPILER)
481  // Workaround for overzealous compiler warnings
482  // Suppress compiler warning about constant conditional expression
483  #pragma warning (disable: 4127)
484 #endif
485 
486 // The class destroys end_counter and clears all input buffers if pipeline was cancelled.
// Scope guard declared as a local variable in pipeline::run.
// NOTE(review): the my_pipeline member declaration and the constructor and
// destructor header lines are not visible in this listing.
487 class pipeline_cleaner: internal::no_copy {
489 public:
 // Constructor initializer list: remembers the pipeline to clean up.
491  my_pipeline(_pipeline)
492  {}
 // Remaining destructor statements: clear the filter buffers if the
 // end_counter task reports cancellation, then reset end_counter to NULL.
494 #if __TBB_TASK_GROUP_CONTEXT
495  if (my_pipeline.end_counter->is_cancelled()) // Pipeline was cancelled
496  my_pipeline.clear_filters();
497 #endif
498  my_pipeline.end_counter = NULL;
499  }
500 };
501 
502 } // namespace internal
503 
505  __TBB_ASSERT(false,"illegal call to inject_token");
506 }
507 
508 #if __TBB_TASK_GROUP_CONTEXT
510  for( filter* f = filter_list; f; f = f->next_filter_in_pipeline ) {
511  if ((f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(4))
512  if( internal::input_buffer* b = f->my_input_buffer )
513  b->clear(f);
514  }
515 }
516 #endif
517 
519  filter_list(NULL),
520  filter_end(NULL),
521  end_counter(NULL),
522  end_of_input(false),
523  has_thread_bound_filters(false)
524 {
525  token_counter = 0;
526  input_tokens = 0;
527 }
528 
530  clear();
531 }
532 
534  filter* next;
535  for( filter* f = filter_list; f; f=next ) {
536  if( internal::input_buffer* b = f->my_input_buffer ) {
537  delete b;
538  f->my_input_buffer = NULL;
539  }
540  next=f->next_filter_in_pipeline;
542  if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
543  f->prev_filter_in_pipeline = filter::not_in_pipeline();
544  f->my_pipeline = NULL;
545  }
546  if ( (f->my_filter_mode & filter::version_mask) >= __TBB_PIPELINE_VERSION(5) )
547  f->next_segment = NULL;
548  }
549  filter_list = filter_end = NULL;
550 }
551 
// Add filter to end of pipeline.
// Also creates the filter input_buffer when the filter kind requires one.
// NOTE(review): listing lines 554, 559, 561, 565 and 581 are absent, so the
// conditions and statements at those positions are not visible here and the
// brace structure below looks unbalanced as a result.
552 void pipeline::add_filter( filter& filter_ ) {
553 #if TBB_USE_ASSERT
555  __TBB_ASSERT( filter_.prev_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
556  __TBB_ASSERT( filter_.next_filter_in_pipeline==filter::not_in_pipeline(), "filter already part of pipeline?" );
557  __TBB_ASSERT( !end_counter, "invocation of add_filter on running pipeline" );
558 #endif
560  filter_.my_pipeline = this;
 // First linking variant: filter_end points directly at the last filter.
562  if ( filter_list == NULL)
563  filter_list = &filter_;
564  else
566  filter_.next_filter_in_pipeline = NULL;
567  filter_end = &filter_;
568  }
569  else
570  {
 // Second linking variant: filter_end holds the address of the pointer slot
 // where the next filter is to be stored, reinterpreted as a filter pointer.
571  if( !filter_end )
572  filter_end = reinterpret_cast<filter*>(&filter_list);
573 
574  *reinterpret_cast<filter**>(filter_end) = &filter_;
575  filter_end = reinterpret_cast<filter*>(&filter_.next_filter_in_pipeline);
576  *reinterpret_cast<filter**>(filter_end) = NULL;
577  }
 // Buffer creation for filters of version 5 or higher.
578  if( (filter_.my_filter_mode & filter_.version_mask) >= __TBB_PIPELINE_VERSION(5) ) {
579  if( filter_.is_serial() ) {
 // NOTE(review): listing line 581 is absent, so the statement controlled
 // by the is_bound() test is not visible; confirm whether the buffer
 // creation on line 582 is conditional.
580  if( filter_.is_bound() )
582  filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), filter_.is_bound() );
583  }
584  else {
585  if(filter_.prev_filter_in_pipeline) {
586  if(filter_.prev_filter_in_pipeline->is_bound()) {
587  // successors to bound filters must have an input_buffer
588  filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
589  }
590  }
591  else { // input filter
 // A parallel input filter that may emit NULL needs the buffer for its end_of_input TLS flag.
592  if(filter_.object_may_be_null() ) {
593  //TODO: buffer only needed to hold TLS; could improve
594  filter_.my_input_buffer = new internal::input_buffer( /*is_ordered*/false, false );
595  filter_.my_input_buffer->create_my_tls();
596  }
597  }
598  }
599  } else {
 // Older filter versions: only serial filters get a buffer, never a bound one.
600  if( filter_.is_serial() ) {
601  filter_.my_input_buffer = new internal::input_buffer( filter_.is_ordered(), false );
602  }
603  }
604 
605 }
606 
// Remove filter from pipeline.
// Asserts that the filter is linked and that the pipeline is not running,
// deletes the filter input buffer, and clears its next_segment and
// my_pipeline links.
// NOTE(review): listing lines 612, 615, 618, 621, 627 and 628 are absent;
// the statements that unlink the filter from the list are not visible here.
607 void pipeline::remove_filter( filter& filter_ ) {
608  __TBB_ASSERT( filter_.prev_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
609  __TBB_ASSERT( filter_.next_filter_in_pipeline!=filter::not_in_pipeline(), "filter not part of pipeline" );
610  __TBB_ASSERT( !end_counter, "invocation of remove_filter on running pipeline" );
 // Case split on whether the filter is the head of the list.
611  if (&filter_ == filter_list)
613  else {
614  __TBB_ASSERT( filter_.prev_filter_in_pipeline, "filter list broken?" );
616  }
 // Case split on whether the filter is the tail of the list.
617  if (&filter_ == filter_end)
619  else {
620  __TBB_ASSERT( filter_.next_filter_in_pipeline, "filter list broken?" );
622  }
623  if( internal::input_buffer* b = filter_.my_input_buffer ) {
624  delete b;
625  filter_.my_input_buffer = NULL;
626  }
629  filter_.next_segment = NULL;
630  filter_.my_pipeline = NULL;
631 }
632 
// Run the pipeline to completion with at most max_number_of_live_tokens
// tokens in flight. Does nothing when the pipeline has no filters.
// NOTE(review): listing lines 634, 644, 651, 653, 656, 658 and 659 are
// absent; the creation and spawning of the root task and the loop header
// over filters are not visible here.
633 void pipeline::run( size_t max_number_of_live_tokens
635  , tbb::task_group_context& context
636 #endif
637  ) {
638  __TBB_ASSERT( max_number_of_live_tokens>0, "pipeline::run must have at least one token" );
639  __TBB_ASSERT( !end_counter, "pipeline already running?" );
640  if( filter_list ) {
 // The cleaner destructor resets end_counter when this scope is left.
641  internal::pipeline_cleaner my_pipeline_cleaner(*this);
642  end_of_input = false;
643  input_tokens = internal::Token(max_number_of_live_tokens);
645  // release input filter if thread-bound
646  if(filter_list->is_bound()) {
647  filter_list->my_input_buffer->sema_V();
648  }
649  }
650 #if __TBB_TASK_GROUP_CONTEXT
652 #else
654 #endif
655  // Start execution of tasks
657 
 // Each thread-bound filter gets its semaphore posted so a waiting thread can observe the end.
660  if(f->is_bound()) {
661  f->my_input_buffer->sema_V(); // wake to end
662  }
663  }
664  }
665  }
666 }
667 
668 #if __TBB_TASK_GROUP_CONTEXT
// Overload without a caller-supplied context: builds a bound
// task_group_context from ctx_traits and forwards to the two-argument run().
// Does nothing when the pipeline has no filters.
// NOTE(review): listing lines 673 to 675, where ctx_traits is presumably
// defined, are absent from this listing.
669 void pipeline::run( size_t max_number_of_live_tokens ) {
670  if( filter_list ) {
671  // Construct task group context with the exception propagation mode expected
672  // by the pipeline caller.
676  task_group_context context(task_group_context::bound, ctx_traits);
677  run(max_number_of_live_tokens, context);
678  }
679 }
680 #endif // __TBB_TASK_GROUP_CONTEXT
681 
683  __TBB_ASSERT(my_pipeline, NULL);
684  __TBB_ASSERT(my_input_buffer, "has_more_work() called for filter with no input buffer");
685  return (internal::tokendiff_t)(my_pipeline->token_counter - my_input_buffer->low_token) != 0;
686 }
687 
689  if ( (my_filter_mode & version_mask) >= __TBB_PIPELINE_VERSION(3) ) {
690  if ( next_filter_in_pipeline != filter::not_in_pipeline() )
691  my_pipeline->remove_filter(*this);
692  else
693  __TBB_ASSERT( prev_filter_in_pipeline == filter::not_in_pipeline(), "probably filter list is broken" );
694  } else {
695  __TBB_ASSERT( next_filter_in_pipeline==filter::not_in_pipeline(), "cannot destroy filter that is part of pipeline" );
696  }
697 }
698 
699 void
701  __TBB_ASSERT(my_input_buffer, NULL);
702  __TBB_ASSERT(object_may_be_null(), NULL);
703  if(is_serial()) {
704  my_pipeline->end_of_input = true;
705  }
706  else {
707  __TBB_ASSERT(my_input_buffer->end_of_input_tls_allocated, NULL);
708  my_input_buffer->set_my_tls_end_of_input();
709  }
710 }
711 
713  return internal_process_item(true);
714 }
715 
717  return internal_process_item(false);
718 }
719 
721  __TBB_ASSERT(my_pipeline != NULL,"It's not supposed that process_item is called for a filter that is not in a pipeline.");
722  internal::task_info info;
723  info.reset();
724 
725  if( my_pipeline->end_of_input && !has_more_work() )
726  return end_of_stream;
727 
728  if( !prev_filter_in_pipeline ) {
729  if( my_pipeline->end_of_input )
730  return end_of_stream;
731  while( my_pipeline->input_tokens == 0 ) {
732  if( !is_blocking )
733  return item_not_available;
734  my_input_buffer->sema_P();
735  }
736  info.my_object = (*this)(info.my_object);
737  if( info.my_object ) {
738  __TBB_ASSERT(my_pipeline->input_tokens > 0, "Token failed in thread-bound filter");
739  my_pipeline->input_tokens--;
740  if( is_ordered() ) {
741  info.my_token = my_pipeline->token_counter;
742  info.my_token_ready = true;
743  }
744  my_pipeline->token_counter++; // ideally, with relaxed semantics
745  } else {
746  my_pipeline->end_of_input = true;
747  return end_of_stream;
748  }
749  } else { /* this is not an input filter */
750  while( !my_input_buffer->has_item() ) {
751  if( !is_blocking ) {
752  return item_not_available;
753  }
754  my_input_buffer->sema_P();
755  if( my_pipeline->end_of_input && !has_more_work() ) {
756  return end_of_stream;
757  }
758  }
759  if( !my_input_buffer->return_item(info, /*advance*/true) ) {
760  __TBB_ASSERT(false,"return_item failed");
761  }
762  info.my_object = (*this)(info.my_object);
763  }
764  if( next_filter_in_pipeline ) {
765  if ( !next_filter_in_pipeline->my_input_buffer->put_token(info,/*force_put=*/true) ) {
766  __TBB_ASSERT(false, "Couldn't put token after thread-bound buffer");
767  }
768  } else {
769  size_t ntokens_avail = ++(my_pipeline->input_tokens);
770  if( my_pipeline->filter_list->is_bound() ) {
771  if( ntokens_avail == 1 ) {
772  my_pipeline->filter_list->my_input_buffer->sema_V();
773  }
774  }
775  }
776 
777  return success;
778 }
779 
780 } // tbb
781 
task * execute() __TBB_override
The virtual task execution method.
Definition: pipeline.cpp:305
void V()
post/release
Definition: semaphore.h:114
unsigned long Token
Definition: pipeline.h:44
pipeline_root_task(pipeline &pipeline)
Definition: pipeline.cpp:459
bool is_valid
True if my_object is valid.
Definition: pipeline.cpp:41
#define __TBB_override
Definition: tbb_stddef.h:244
void __TBB_EXPORTED_METHOD inject_token(task &self)
Not used, but retained to satisfy old export files.
Definition: pipeline.cpp:504
pipeline * my_pipeline
Pointer to the pipeline.
Definition: pipeline.h:188
static const unsigned char exact_exception_propagation
7th bit defines exception propagation mode expected by the application.
Definition: pipeline.h:85
result_type __TBB_EXPORTED_METHOD process_item()
Wait until a data item becomes available, and invoke operator() on that item.
Definition: pipeline.cpp:712
void reset()
Roughly equivalent to the constructor of input stage task.
Definition: pipeline.cpp:279
pipeline_cleaner(pipeline &_pipeline)
Definition: pipeline.cpp:490
A processing pipeline that applies filters to items.
Definition: pipeline.h:236
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
filter * filter_list
Pointer to first filter in the pipeline.
Definition: pipeline.h:268
virtual __TBB_EXPORTED_METHOD ~filter()
Destroy filter.
Definition: pipeline.cpp:688
end_of_input_tls_t end_of_input_tls
Definition: pipeline.cpp:97
auto first(Container &c) -> decltype(begin(c))
atomic< internal::Token > input_tokens
Number of idle tokens waiting for input stage.
Definition: pipeline.h:277
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t new_size
bool my_at_start
True if this task has not yet read the input.
Definition: pipeline.cpp:259
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
filter * next_segment
Pointer to the next "segment" of filters, or NULL if not required.
Definition: pipeline.h:192
const unsigned char my_filter_mode
Storage for filter mode and dynamically checked implementation version.
Definition: pipeline.h:182
bool has_item()
true if the current low_token is valid.
Definition: pipeline.cpp:226
task_info * array
Array of deferred tasks that cannot yet start executing.
Definition: pipeline.cpp:62
Represents acquisition of a mutex.
Definition: spin_mutex.h:54
void grow(size_type minimum_size)
Resize "array".
Definition: pipeline.cpp:235
void note_done(Token token, StageTask &spawner)
Note that processing of a token is finished.
Definition: pipeline.cpp:178
#define __TBB_Yield()
Definition: ibm_aix51.h:48
bool is_bound() const
True if filter is thread-bound.
Definition: pipeline.h:139
Edsger Dijkstra's counting semaphore.
Definition: semaphore.h:98
friend class internal::pipeline_cleaner
Definition: pipeline.h:264
bool is_cancelled() const
Returns true if the context has received cancellation request.
Definition: task.h:913
bool is_bound
True for thread-bound filter, false otherwise.
Definition: pipeline.cpp:93
A buffer of input items for a filter.
Definition: pipeline.cpp:52
void set(T value)
Definition: tls.h:60
__TBB_EXPORTED_METHOD pipeline()
Construct empty pipeline.
Definition: pipeline.cpp:518
Token low_token
Lowest token that can start executing.
Definition: pipeline.cpp:73
void __TBB_EXPORTED_METHOD add_filter(filter &filter_)
Add filter to end of pipeline.
Definition: pipeline.cpp:552
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
internal::input_buffer * my_input_buffer
Buffer for incoming tokens, or NULL if not required.
Definition: pipeline.h:174
bool put_token(task_info &info_, bool force_put=false)
Put a token into the buffer.
Definition: pipeline.cpp:139
bool has_thread_bound_filters
True if the pipeline contains a thread-bound filter; false otherwise.
Definition: pipeline.h:286
stage_task(pipeline &pipeline, filter *filter_, const task_info &info)
Construct stage_task for a subsequent stage in a pipeline.
Definition: pipeline.cpp:272
basic_tls< intptr_t > end_of_input_tls_t
for parallel filters that accept NULLs, thread-local flag for reaching end_of_input ...
Definition: pipeline.cpp:96
Token my_token
Invalid unless a task went through an ordered stage.
Definition: pipeline.cpp:37
result_type __TBB_EXPORTED_METHOD try_process_item()
If a data item is available, invoke operator() on that item.
Definition: pipeline.cpp:716
Token high_token
Used for out of order buffer, and for assigning my_token if is_ordered and my_token not already assig...
Definition: pipeline.cpp:87
void create_sema(size_t initial_tokens)
Definition: pipeline.cpp:100
filter * filter_end
Pointer to location where address of next filter to be added should be stored.
Definition: pipeline.h:271
bool return_item(task_info &info, bool advance)
return an item, invalidate the queued item, but only advance if advance
Definition: pipeline.cpp:212
void remove_filter(filter &filter_)
Remove filter from pipeline.
Definition: pipeline.cpp:607
The graph class.
void clear_filters()
Does clean up if pipeline is cancelled or exception occurred.
#define __TBB_PIPELINE_VERSION(x)
Definition: pipeline.h:42
filter * prev_filter_in_pipeline
Pointer to previous filter in the pipeline.
Definition: pipeline.h:185
task * end_counter
Task whose reference count is used to determine when all stages are done.
Definition: pipeline.h:274
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p sync_releasing
bool is_ordered
True for ordered filter, false otherwise.
Definition: pipeline.cpp:90
void reset()
Set to initial state (no object, no token)
Definition: pipeline.cpp:43
bool my_token_ready
False until my_token is set.
Definition: pipeline.cpp:39
void poison_pointer(T *__TBB_atomic &)
Definition: tbb_stddef.h:309
semaphore * my_sem
for thread-bound filter, semaphore for waiting, NULL otherwise.
Definition: pipeline.cpp:65
Used to form groups of tasks.
Definition: task.h:335
size_type array_size
Size of array.
Definition: pipeline.cpp:69
static filter * not_in_pipeline()
Value used to mark "not in pipeline".
Definition: pipeline.h:68
A stage in a pipeline served by a user thread.
Definition: pipeline.h:197
virtual __TBB_EXPORTED_METHOD ~pipeline()
Definition: pipeline.cpp:529
#define ITT_NOTIFY(name, obj)
Definition: itt_notify.h:120
static internal::allocate_root_proxy allocate_root()
Returns proxy for overloaded new that allocates a root task.
Definition: task.h:636
~input_buffer()
Destroy the buffer.
Definition: pipeline.cpp:118
filter * next_filter_in_pipeline
Pointer to next filter in the pipeline.
Definition: pipeline.h:160
void __TBB_EXPORTED_METHOD run(size_t max_number_of_live_tokens)
Run the pipeline to completion.
Definition: pipeline.cpp:633
static void spawn_root_and_wait(task &root)
Spawn task allocated by allocate_root, wait for it to complete, and deallocate it.
Definition: task.h:781
spin_mutex array_mutex
Serializes updates.
Definition: pipeline.cpp:76
atomic< internal::Token > token_counter
Global counter of tokens.
Definition: pipeline.h:280
Base class for user-defined tasks.
Definition: task.h:592
virtual void finalize(void *)
Destroys item if pipeline was cancelled.
Definition: pipeline.h:160
input_buffer(bool is_ordered_, bool is_bound_)
Construct empty buffer.
Definition: pipeline.cpp:107
void __TBB_EXPORTED_METHOD set_end_of_input()
Definition: pipeline.cpp:700
void __TBB_EXPORTED_METHOD clear()
Remove all filters from the pipeline.
Definition: pipeline.cpp:533
static const unsigned char version_mask
Definition: pipeline.h:93
bool is_ordered() const
True if filter must receive stream in order.
Definition: pipeline.h:134
friend class internal::pipeline_root_task
Definition: pipeline.h:261
bool end_of_input
False until fetch_input returns NULL.
Definition: pipeline.h:283
result_type internal_process_item(bool is_blocking)
Internal routine for item processing.
Definition: pipeline.cpp:720
task * execute() __TBB_override
Should be overridden by derived classes.
Definition: pipeline.cpp:408
void __TBB_EXPORTED_FUNC handle_perror(int error_code, const char *aux_info)
Throws std::runtime_error with what() returning error_code description prefixed with aux_info...
Definition: tbb_misc.cpp:78
A lock that occupies a single byte.
Definition: spin_mutex.h:40
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:335
long tokendiff_t
Definition: pipeline.h:45
bool has_more_work()
has the filter not yet processed all the tokens it will ever see?
Definition: pipeline.cpp:682
bool object_may_be_null()
true if an input filter can emit null
Definition: pipeline.h:144
stage_task(pipeline &pipeline)
Construct stage_task for first stage in a pipeline.
Definition: pipeline.cpp:264
#define __TBB_TASK_GROUP_CONTEXT
Definition: tbb_config.h:547
bool is_serial() const
True if filter is serial.
Definition: pipeline.h:129
This structure is used to store task information in an input buffer.
Definition: pipeline.cpp:34
void P()
wait/acquire
Definition: semaphore.h:109
A stage in a pipeline.
Definition: pipeline.h:65

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.