Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
tbb::internal::input_buffer Class Reference

A buffer of input items for a filter.

Public Member Functions

 input_buffer (bool is_ordered_, bool is_bound_)
 Construct empty buffer.
 
 ~input_buffer ()
 Destroy the buffer.
 
bool put_token (task_info &info_, bool force_put=false)
 Put a token into the buffer.
 
template<typename StageTask >
void note_done (Token token, StageTask &spawner)
 Note that processing of a token is finished.
 
bool return_item (task_info &info, bool advance)
 Return an item and invalidate the queued item, but advance low_token only if advance is true.
 
bool has_item ()
 Returns true if the item at the current low_token is valid.
 
void create_my_tls ()
 
void destroy_my_tls ()
 
bool my_tls_end_of_input ()
 
void set_my_tls_end_of_input ()
 

Private Types

typedef Token size_type
 
typedef basic_tls< intptr_t > end_of_input_tls_t
 For parallel filters that accept NULLs, thread-local flag for reaching end_of_input.
 

Private Member Functions

void grow (size_type minimum_size)
 Resize "array".
 
void create_sema (size_t initial_tokens)
 
void free_sema ()
 
void sema_P ()
 
void sema_V ()
 
- Private Member Functions inherited from tbb::internal::no_copy
 no_copy ()
 Allow default construction.
 

Private Attributes

task_info * array
 Array of deferred tasks that cannot yet start executing.
 
semaphore * my_sem
 For a thread-bound filter, the semaphore used for waiting; NULL otherwise.
 
size_type array_size
 Size of array.
 
Token low_token
 Lowest token that can start executing.
 
spin_mutex array_mutex
 Serializes updates.
 
Token high_token
 Used for the out-of-order buffer, and for assigning my_token if is_ordered is set and my_token is not already assigned.
 
bool is_ordered
 True for ordered filter, false otherwise.
 
bool is_bound
 True for thread-bound filter, false otherwise.
 
end_of_input_tls_t end_of_input_tls
 
bool end_of_input_tls_allocated
 

Static Private Attributes

static const size_type initial_buffer_size = 4
 Initial size for "array".
 

Friends

class tbb::internal::pipeline_root_task
 
class tbb::filter
 
class tbb::thread_bound_filter
 
class tbb::internal::stage_task
 
class tbb::pipeline
 

Detailed Description

A buffer of input items for a filter.

Each item is a task_info, inserted into a position in the buffer corresponding to a Token.

Definition at line 52 of file pipeline.cpp.
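
The description above does not spell out how a Token maps to a position, but the listings on this page index the array with token & (array_size-1). The following is a minimal sketch of that mapping, assuming the buffer size is a power of 2 as documented for array_size. The helper name slot_for is hypothetical and not part of TBB.

```cpp
#include <cassert>
#include <cstddef>

typedef unsigned long Token;  // matches the Token typedef quoted on this page

// Hypothetical helper: map a token to its slot in a ring whose size is a
// power of 2. For such sizes, masking with (size - 1) equals token % size.
inline std::size_t slot_for(Token token, std::size_t array_size) {
    // The mask trick is only valid for non-zero powers of 2.
    assert(array_size != 0 && (array_size & (array_size - 1)) == 0);
    return static_cast<std::size_t>(token & (array_size - 1));
}
```

With a buffer of 4 slots, tokens 1 and 5 share slot 1, which is why the buffer must grow before a token that is array_size or more ahead of low_token can be stored.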

Member Typedef Documentation

◆ end_of_input_tls_t

For parallel filters that accept NULLs, thread-local flag for reaching end_of_input.

Definition at line 96 of file pipeline.cpp.

◆ size_type

Definition at line 59 of file pipeline.cpp.

Constructor & Destructor Documentation

◆ input_buffer()

tbb::internal::input_buffer::input_buffer ( bool  is_ordered_,
bool  is_bound_ 
)
inline

Construct empty buffer.

Definition at line 107 of file pipeline.cpp.

References __TBB_ASSERT.

107  :
108  array(NULL), my_sem(NULL), array_size(0),
109  low_token(0), high_token(0),
110  is_ordered(is_ordered_), is_bound(is_bound_),
111  end_of_input_tls_allocated(false) {
112  grow(initial_buffer_size);
113  __TBB_ASSERT( array, NULL );
114  if(is_bound) create_sema(0);
115  }
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169

◆ ~input_buffer()

tbb::internal::input_buffer::~input_buffer ( )
inline

Destroy the buffer.

Definition at line 118 of file pipeline.cpp.

References __TBB_ASSERT, and tbb::internal::poison_pointer().

118  {
119  __TBB_ASSERT( array, NULL );
120  cache_aligned_allocator<task_info>().deallocate(array,array_size);
121  poison_pointer( array );
122  if(my_sem) {
123  free_sema();
124  }
125  if(end_of_input_tls_allocated) {
126  destroy_my_tls();
127  }
128  }
void poison_pointer(T *__TBB_atomic &)
Definition: tbb_stddef.h:309

Member Function Documentation

◆ create_my_tls()

void tbb::internal::input_buffer::create_my_tls ( )
inline

Definition at line 229 of file pipeline.cpp.

References tbb::internal::basic_tls< T >::create(), and tbb::internal::handle_perror().

229 { int status = end_of_input_tls.create(); if(status) handle_perror(status, "TLS not allocated for filter"); end_of_input_tls_allocated = true; }
void __TBB_EXPORTED_FUNC handle_perror(int error_code, const char *aux_info)
Throws std::runtime_error with what() returning error_code description prefixed with aux_info...
Definition: tbb_misc.cpp:78

◆ create_sema()

void tbb::internal::input_buffer::create_sema ( size_t  initial_tokens)
inlineprivate

Definition at line 100 of file pipeline.cpp.

References __TBB_ASSERT.

100 { __TBB_ASSERT(!my_sem,NULL); my_sem = new internal::semaphore(initial_tokens); }

◆ destroy_my_tls()

void tbb::internal::input_buffer::destroy_my_tls ( )
inline

Definition at line 230 of file pipeline.cpp.

References tbb::internal::basic_tls< T >::destroy(), and tbb::internal::handle_perror().

230 { int status = end_of_input_tls.destroy(); if(status) handle_perror(status, "Failed to destroy filter TLS"); }

◆ free_sema()

void tbb::internal::input_buffer::free_sema ( )
inlineprivate

Definition at line 101 of file pipeline.cpp.

References __TBB_ASSERT.

101 { __TBB_ASSERT(my_sem,NULL); delete my_sem; }

◆ grow()

void tbb::internal::input_buffer::grow ( size_type  minimum_size)
private

Resize "array".

The caller is responsible for acquiring a lock on "array_mutex".

Definition at line 235 of file pipeline.cpp.

References tbb::internal::task_info::is_valid, and new_size.

235  {
236  size_type old_size = array_size;
237  size_type new_size = old_size ? 2*old_size : initial_buffer_size;
238  while( new_size<minimum_size )
239  new_size*=2;
240  task_info* new_array = cache_aligned_allocator<task_info>().allocate(new_size);
241  task_info* old_array = array;
242  for( size_type i=0; i<new_size; ++i )
243  new_array[i].is_valid = false;
244  long t=low_token;
245  for( size_type i=0; i<old_size; ++i, ++t )
246  new_array[t&(new_size-1)] = old_array[t&(old_size-1)];
247  array = new_array;
248  array_size = new_size;
249  if( old_array )
250  cache_aligned_allocator<task_info>().deallocate(old_array,old_size);
251 }
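
To make the resizing step easier to follow, here is a minimal sketch of the same doubling-and-remapping idea. It is an illustration under stated assumptions, not the TBB implementation: it uses std::vector instead of cache_aligned_allocator, and Slot and grow_ring are hypothetical names.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct Slot {              // hypothetical stand-in for task_info
    bool is_valid = false;
    int  payload  = 0;
};

// Return a ring of at least minimum_size slots (always a power of 2) in
// which every entry of old_ring sits at the slot its token selects under
// the new mask. Tokens are assumed to start at low_token.
inline std::vector<Slot> grow_ring(const std::vector<Slot>& old_ring,
                                   unsigned long low_token,
                                   std::size_t minimum_size) {
    const std::size_t initial_size = 4;          // mirrors initial_buffer_size
    const std::size_t old_size = old_ring.size();
    assert((old_size & (old_size - 1)) == 0);    // 0 or a power of 2

    std::size_t new_size = old_size ? 2 * old_size : initial_size;
    while (new_size < minimum_size) new_size *= 2;

    std::vector<Slot> new_ring(new_size);        // every slot starts invalid
    unsigned long t = low_token;
    for (std::size_t i = 0; i < old_size; ++i, ++t)
        new_ring[t & (new_size - 1)] = old_ring[t & (old_size - 1)];
    return new_ring;
}
```

Copying by token rather than by index matters because an entry's slot depends on the mask: token 6 lives in slot 2 of a 4-slot ring but in slot 6 of an 8-slot ring.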

◆ has_item()

bool tbb::internal::input_buffer::has_item ( )
inline

Returns true if the item at the current low_token is valid.

Definition at line 226 of file pipeline.cpp.

References tbb::internal::task_info::is_valid, and lock.

226  { spin_mutex::scoped_lock lock(array_mutex); return array[low_token&(array_size-1)].is_valid; }
friend class scoped_lock
Definition: spin_mutex.h:180

◆ my_tls_end_of_input()

bool tbb::internal::input_buffer::my_tls_end_of_input ( )
inline

Definition at line 231 of file pipeline.cpp.

References tbb::internal::basic_tls< T >::get().

231 { return end_of_input_tls.get() != 0; }

◆ note_done()

template<typename StageTask >
void tbb::internal::input_buffer::note_done ( Token  token,
StageTask &  spawner 
)
inline

Note that processing of a token is finished.

Fires up processing of the next token, if processing was deferred.

Definition at line 178 of file pipeline.cpp.

References tbb::pipeline::clear(), tbb::filter::finalize(), tbb::internal::task_info::is_valid, ITT_NOTIFY, lock, tbb::internal::task_info::my_object, and tbb::internal::task_info::reset().

178  {
179  task_info wakee;
180  wakee.reset();
181  {
182  spin_mutex::scoped_lock lock( array_mutex );
183  if( !is_ordered || token==low_token ) {
184  // Wake the next task
185  task_info& item = array[++low_token & (array_size-1)];
186  ITT_NOTIFY( sync_acquired, this );
187  wakee = item;
188  item.is_valid = false;
189  }
190  }
191  if( wakee.is_valid )
192  spawner.spawn_stage_task(wakee);
193  }
#define ITT_NOTIFY(name, obj)
Definition: itt_notify.h:120
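
The pattern in note_done() is to pick the next deferred item while holding the lock, then start it only after the lock is released. A minimal sketch of that pattern follows, assuming std::mutex in place of spin_mutex and a caller-supplied callable in place of spawn_stage_task(). The names Item and OrderedRing are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <vector>

struct Item {                      // hypothetical stand-in for task_info
    bool is_valid = false;
    int  payload  = 0;
};

struct OrderedRing {
    std::vector<Item> slots = std::vector<Item>(4);  // size is a power of 2
    unsigned long     low_token = 0;
    bool              is_ordered = true;
    std::mutex        mutex;                         // plays the role of array_mutex

    // Record that `token` is finished. If the item for the next token was
    // deferred, pass a copy of it to `spawn` after the lock is released.
    template <typename Spawn>
    void note_done(unsigned long token, Spawn spawn) {
        Item wakee;                                  // invalid by default
        {
            std::lock_guard<std::mutex> lock(mutex);
            if (!is_ordered || token == low_token) {
                Item& next = slots[++low_token & (slots.size() - 1)];
                wakee = next;
                next.is_valid = false;               // the slot is free again
            }
        }
        if (wakee.is_valid) spawn(wakee);            // runs outside the lock
    }
};
```

Copying the item out and calling the spawner outside the critical section keeps the lock hold time short, which suits a spin lock.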

◆ put_token()

bool tbb::internal::input_buffer::put_token ( task_info info_,
bool  force_put = false 
)
inline

Put a token into the buffer.

Returns true if the task information was placed into the buffer; otherwise returns false, informing the caller to create and spawn a task. If the input buffer is owned by a thread-bound filter and the item at low_token was not valid, issues a V() on the semaphore. If the input_buffer is owned by a successor to a thread-bound filter, the force_put parameter should be true to ensure the token is inserted in the buffer.

Definition at line 139 of file pipeline.cpp.

References __TBB_ASSERT, tbb::internal::task_info::is_valid, ITT_NOTIFY, lock, tbb::internal::task_info::my_token, tbb::internal::task_info::my_token_ready, and sync_releasing.

139  {
140  {
141  info_.is_valid = true;
142  spin_mutex::scoped_lock lock( array_mutex );
143  Token token;
144  bool was_empty = !array[low_token&(array_size-1)].is_valid;
145  if( is_ordered ) {
146  if( !info_.my_token_ready ) {
147  info_.my_token = high_token++;
148  info_.my_token_ready = true;
149  }
150  token = info_.my_token;
151  } else
152  token = high_token++;
153  __TBB_ASSERT( (tokendiff_t)(token-low_token)>=0, NULL );
154  if( token!=low_token || is_bound || force_put ) {
155  // Trying to put token that is beyond low_token.
156  // Need to wait until low_token catches up before dispatching.
157  if( token-low_token>=array_size )
158  grow( token-low_token+1 );
159  ITT_NOTIFY( sync_releasing, this );
160  array[token&(array_size-1)] = info_;
161  if(was_empty && is_bound) {
162  sema_V();
163  }
164  return true;
165  }
166  }
167  return false;
168  }
unsigned long Token
Definition: pipeline.h:44
Token my_token
Invalid unless a task went through an ordered stage.
Definition: pipeline.cpp:37
long tokendiff_t
Definition: pipeline.h:45
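
The return value of put_token() is easiest to see in a small model. The sketch below is a single-threaded simplification that covers only the ordered, non-bound case: it has no locking and no semaphore, and the names Item, OrderedBuffer, put, parked, and capacity are hypothetical.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct Item {                        // hypothetical stand-in for task_info
    bool          is_valid = false;
    bool          token_ready = false;
    unsigned long token = 0;
};

// Single-threaded model; the real class guards this logic with array_mutex.
class OrderedBuffer {
public:
    OrderedBuffer() : slots_(4), low_(0), high_(0) {}

    // Returns true if `item` was parked in the buffer, false if the caller
    // should process it right away.
    bool put(Item& item, bool force_put = false) {
        item.is_valid = true;
        if (!item.token_ready) {             // no token yet: assign the next one
            item.token = high_++;
            item.token_ready = true;
        }
        const unsigned long ahead = item.token - low_;
        // The signed difference stays meaningful even if the counters wrap.
        assert(static_cast<long>(ahead) >= 0);
        if (item.token == low_ && !force_put) return false;
        if (ahead >= slots_.size()) grow(ahead + 1);
        slots_[item.token & (slots_.size() - 1)] = item;
        return true;
    }

    bool parked(unsigned long token) const {
        return slots_[token & (slots_.size() - 1)].is_valid;
    }
    std::size_t capacity() const { return slots_.size(); }

private:
    void grow(std::size_t minimum_size) {
        std::size_t new_size = 2 * slots_.size();
        while (new_size < minimum_size) new_size *= 2;
        std::vector<Item> bigger(new_size);
        unsigned long t = low_;
        for (std::size_t i = 0; i < slots_.size(); ++i, ++t)
            bigger[t & (new_size - 1)] = slots_[t & (slots_.size() - 1)];
        slots_.swap(bigger);
    }

    std::vector<Item> slots_;
    unsigned long     low_, high_;
};
```

In this model the first item receives token 0, which equals low_token, so put() returns false and the caller runs it directly. Later items are parked until the lower tokens finish, and passing force_put as true parks even the item at low_token.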

◆ return_item()

bool tbb::internal::input_buffer::return_item ( task_info info,
bool  advance 
)
inline

Returns an item and invalidates the queued item, but advances low_token only if advance is true.

Definition at line 212 of file pipeline.cpp.

References tbb::internal::task_info::is_valid, ITT_NOTIFY, and lock.

212  {
213  spin_mutex::scoped_lock lock( array_mutex );
214  task_info& item = array[low_token&(array_size-1)];
215  ITT_NOTIFY( sync_acquired, this );
216  if( item.is_valid ) {
217  info = item;
218  item.is_valid = false;
219  if (advance) low_token++;
220  return true;
221  }
222  return false;
223  }
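
The effect of the advance flag can be illustrated with a small single-threaded sketch. It assumes no locking and uses the hypothetical names Item and Ring. It only shows that the head slot is consumed in both modes, while low_token moves only when advance is true.

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

struct Item {                      // hypothetical stand-in for task_info
    bool is_valid = false;
    int  payload  = 0;
};

struct Ring {
    std::vector<Item> slots = std::vector<Item>(4);  // size is a power of 2
    unsigned long     low_token = 0;

    // Copy the item at low_token into `out` and invalidate the slot.
    // low_token moves forward only when `advance` is true.
    bool return_item(Item& out, bool advance) {
        Item& head = slots[low_token & (slots.size() - 1)];
        if (!head.is_valid) return false;            // nothing queued at the head
        out = head;
        head.is_valid = false;
        if (advance) ++low_token;
        return true;
    }
};
```

After a call with advance set to false, a second call returns false until something is stored at the same low_token again, because the slot was invalidated but the head did not move.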

◆ sema_P()

void tbb::internal::input_buffer::sema_P ( )
inlineprivate

Definition at line 102 of file pipeline.cpp.

References __TBB_ASSERT, and tbb::internal::semaphore::P().

102 { __TBB_ASSERT(my_sem,NULL); my_sem->P(); }
void P()
wait/acquire
Definition: semaphore.h:109

◆ sema_V()

void tbb::internal::input_buffer::sema_V ( )
inlineprivate

Definition at line 103 of file pipeline.cpp.

References __TBB_ASSERT, and tbb::internal::semaphore::V().

103 { __TBB_ASSERT(my_sem,NULL); my_sem->V(); }
void V()
post/release
Definition: semaphore.h:114

◆ set_my_tls_end_of_input()

void tbb::internal::input_buffer::set_my_tls_end_of_input ( )
inline

Definition at line 232 of file pipeline.cpp.

References tbb::internal::basic_tls< T >::set().

232 { end_of_input_tls.set(1); }
void set(T value)
Definition: tls.h:60

Friends And Related Function Documentation

◆ tbb::filter

friend class tbb::filter
friend

Definition at line 54 of file pipeline.cpp.

◆ tbb::internal::pipeline_root_task

friend class tbb::internal::pipeline_root_task
friend

Definition at line 53 of file pipeline.cpp.

◆ tbb::internal::stage_task

friend class tbb::internal::stage_task
friend

Definition at line 56 of file pipeline.cpp.

◆ tbb::pipeline

friend class tbb::pipeline
friend

Definition at line 57 of file pipeline.cpp.

◆ tbb::thread_bound_filter

friend class tbb::thread_bound_filter
friend

Definition at line 55 of file pipeline.cpp.

Member Data Documentation

◆ array

task_info* tbb::internal::input_buffer::array
private

Array of deferred tasks that cannot yet start executing.

Definition at line 62 of file pipeline.cpp.

◆ array_mutex

spin_mutex tbb::internal::input_buffer::array_mutex
private

Serializes updates.

Definition at line 76 of file pipeline.cpp.

◆ array_size

size_type tbb::internal::input_buffer::array_size
private

Size of array.

Always 0 or a power of 2

Definition at line 69 of file pipeline.cpp.

◆ end_of_input_tls

end_of_input_tls_t tbb::internal::input_buffer::end_of_input_tls
private

Definition at line 97 of file pipeline.cpp.

◆ end_of_input_tls_allocated

bool tbb::internal::input_buffer::end_of_input_tls_allocated
private

Definition at line 98 of file pipeline.cpp.

◆ high_token

Token tbb::internal::input_buffer::high_token
private

Used for the out-of-order buffer, and for assigning my_token if is_ordered is set and my_token is not already assigned.

Definition at line 87 of file pipeline.cpp.

◆ initial_buffer_size

const size_type tbb::internal::input_buffer::initial_buffer_size = 4
staticprivate

Initial size for "array".

Must be a power of 2

Definition at line 84 of file pipeline.cpp.

◆ is_bound

bool tbb::internal::input_buffer::is_bound
private

True for thread-bound filter, false otherwise.

Definition at line 93 of file pipeline.cpp.

◆ is_ordered

bool tbb::internal::input_buffer::is_ordered
private

True for ordered filter, false otherwise.

Definition at line 90 of file pipeline.cpp.

◆ low_token

Token tbb::internal::input_buffer::low_token
private

Lowest token that can start executing.

All prior Tokens have already been seen.

Definition at line 73 of file pipeline.cpp.

◆ my_sem

semaphore* tbb::internal::input_buffer::my_sem
private

For a thread-bound filter, the semaphore used for waiting; NULL otherwise.

Definition at line 65 of file pipeline.cpp.


The documentation for this class was generated from the following file:

pipeline.cpp

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.