Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
concurrent_queue.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB_concurrent_queue_H
22 #define __TBB_concurrent_queue_H
23 
26 
27 namespace tbb {
28 
29 namespace strict_ppl {
30 
32 
35 template<typename T, typename A = cache_aligned_allocator<T> >
36 class concurrent_queue: public internal::concurrent_queue_base_v3<T> {
37  template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
38 
41  page_allocator_type my_allocator;
42 
44  virtual void *allocate_block( size_t n ) __TBB_override {
45  void *b = reinterpret_cast<void*>(my_allocator.allocate( n ));
46  if( !b )
48  return b;
49  }
50 
52  virtual void deallocate_block( void *b, size_t n ) __TBB_override {
53  my_allocator.deallocate( reinterpret_cast<char*>(b), n );
54  }
55 
56  static void copy_construct_item(T* location, const void* src){
57  new (location) T(*static_cast<const T*>(src));
58  }
59 
60 #if __TBB_CPP11_RVALUE_REF_PRESENT
61  static void move_construct_item(T* location, const void* src) {
62  new (location) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
63  }
64 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
65 public:
67  typedef T value_type;
68 
70  typedef T& reference;
71 
73  typedef const T& const_reference;
74 
76  typedef size_t size_type;
77 
79  typedef ptrdiff_t difference_type;
80 
82  typedef A allocator_type;
83 
85  explicit concurrent_queue(const allocator_type& a = allocator_type()) :
86  my_allocator( a )
87  {
88  }
89 
91  template<typename InputIterator>
92  concurrent_queue( InputIterator begin, InputIterator end, const allocator_type& a = allocator_type()) :
93  my_allocator( a )
94  {
95  for( ; begin != end; ++begin )
96  this->push(*begin);
97  }
98 
100  concurrent_queue( const concurrent_queue& src, const allocator_type& a = allocator_type()) :
101  internal::concurrent_queue_base_v3<T>(), my_allocator( a )
102  {
103  this->assign( src, copy_construct_item );
104  }
105 
106 #if __TBB_CPP11_RVALUE_REF_PRESENT
109  internal::concurrent_queue_base_v3<T>(), my_allocator( std::move(src.my_allocator) )
110  {
111  this->internal_swap( src );
112  }
113 
114  concurrent_queue( concurrent_queue&& src, const allocator_type& a ) :
115  internal::concurrent_queue_base_v3<T>(), my_allocator( a )
116  {
117  // checking that memory allocated by one instance of allocator can be deallocated
118  // with another
119  if( my_allocator == src.my_allocator) {
120  this->internal_swap( src );
121  } else {
122  // allocators are different => performing per-element move
123  this->assign( src, move_construct_item );
124  src.clear();
125  }
126  }
127 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
128 
131 
133  void push( const T& source ) {
134  this->internal_push( &source, copy_construct_item );
135  }
136 
137 #if __TBB_CPP11_RVALUE_REF_PRESENT
138  void push( T&& source ) {
139  this->internal_push( &source, move_construct_item );
140  }
141 
142 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
143  template<typename... Arguments>
144  void emplace( Arguments&&... args ) {
145  push( T(std::forward<Arguments>( args )...) );
146  }
147 #endif //__TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
148 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
149 
151 
153  bool try_pop( T& result ) {
154  return this->internal_try_pop( &result );
155  }
156 
158  size_type unsafe_size() const {return this->internal_size();}
159 
161  bool empty() const {return this->internal_empty();}
162 
164  void clear() ;
165 
167  allocator_type get_allocator() const { return this->my_allocator; }
168 
169  typedef internal::concurrent_queue_iterator<concurrent_queue,T> iterator;
170  typedef internal::concurrent_queue_iterator<concurrent_queue,const T> const_iterator;
171 
172  //------------------------------------------------------------------------
173  // The iterators are intended only for debugging. They are slow and not thread safe.
174  //------------------------------------------------------------------------
175  iterator unsafe_begin() {return iterator(*this);}
176  iterator unsafe_end() {return iterator();}
177  const_iterator unsafe_begin() const {return const_iterator(*this);}
178  const_iterator unsafe_end() const {return const_iterator();}
179 } ;
180 
181 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
182 // Deduction guide for the constructor from two iterators
183 template<typename InputIterator,
184  typename T = typename std::iterator_traits<InputIterator>::value_type,
185  typename A = cache_aligned_allocator<T>
186 > concurrent_queue(InputIterator, InputIterator, const A& = A())
188 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
189 
190 template<typename T, class A>
192  clear();
193  this->internal_finish_clear();
194 }
195 
196 template<typename T, class A>
198  T value;
199  while( !empty() ) try_pop(value);
200 }
201 
202 } // namespace strict_ppl
203 
205 
210 template<typename T, class A = cache_aligned_allocator<T> >
211 class concurrent_bounded_queue: public internal::concurrent_queue_base_v8 {
212  template<typename Container, typename Value> friend class internal::concurrent_queue_iterator;
214 
216  page_allocator_type my_allocator;
217 
220 
222  class destroyer: internal::no_copy {
224  public:
225  destroyer( T& value ) : my_value(value) {}
226  ~destroyer() {my_value.~T();}
227  };
228 
229  T& get_ref( page& p, size_t index ) {
230  __TBB_ASSERT( index<items_per_page, NULL );
231  return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
232  }
233 
234  virtual void copy_item( page& dst, size_t index, const void* src ) __TBB_override {
235  new( &get_ref(dst,index) ) T(*static_cast<const T*>(src));
236  }
237 
238 #if __TBB_CPP11_RVALUE_REF_PRESENT
239  virtual void move_item( page& dst, size_t index, const void* src ) __TBB_override {
240  new( &get_ref(dst,index) ) T( std::move(*static_cast<T*>(const_cast<void*>(src))) );
241  }
242 #else
243  virtual void move_item( page&, size_t, const void* ) __TBB_override {
244  __TBB_ASSERT( false, "Unreachable code" );
245  }
246 #endif
247 
248  virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
249  new( &get_ref(dst,dindex) ) T( get_ref( const_cast<page&>(src), sindex ) );
250  }
251 
252 #if __TBB_CPP11_RVALUE_REF_PRESENT
253  virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) __TBB_override {
254  new( &get_ref(dst,dindex) ) T( std::move(get_ref( const_cast<page&>(src), sindex )) );
255  }
256 #else
257  virtual void move_page_item( page&, size_t, const page&, size_t ) __TBB_override {
258  __TBB_ASSERT( false, "Unreachable code" );
259  }
260 #endif
261 
262  virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) __TBB_override {
263  T& from = get_ref(src,index);
264  destroyer d(from);
265  *static_cast<T*>(dst) = tbb::internal::move( from );
266  }
267 
269  size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
270  page *p = reinterpret_cast<page*>(my_allocator.allocate( n ));
271  if( !p )
273  return p;
274  }
275 
277  size_t n = sizeof(padded_page) + (items_per_page-1)*sizeof(T);
278  my_allocator.deallocate( reinterpret_cast<char*>(p), n );
279  }
280 
281 public:
283  typedef T value_type;
284 
286  typedef A allocator_type;
287 
289  typedef T& reference;
290 
292  typedef const T& const_reference;
293 
295 
297  typedef std::ptrdiff_t size_type;
298 
300  typedef std::ptrdiff_t difference_type;
301 
303  explicit concurrent_bounded_queue(const allocator_type& a = allocator_type()) :
304  concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
305  {
306  }
307 
309  concurrent_bounded_queue( const concurrent_bounded_queue& src, const allocator_type& a = allocator_type())
310  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
311  {
312  assign( src );
313  }
314 
315 #if __TBB_CPP11_RVALUE_REF_PRESENT
319  {
320  internal_swap( src );
321  }
322 
323  concurrent_bounded_queue( concurrent_bounded_queue&& src, const allocator_type& a )
324  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
325  {
326  // checking that memory allocated by one instance of allocator can be deallocated
327  // with another
328  if( my_allocator == src.my_allocator) {
329  this->internal_swap( src );
330  } else {
331  // allocators are different => performing per-element move
332  this->move_content( src );
333  src.clear();
334  }
335  }
336 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
337 
339  template<typename InputIterator>
340  concurrent_bounded_queue( InputIterator begin, InputIterator end,
341  const allocator_type& a = allocator_type())
342  : concurrent_queue_base_v8( sizeof(T) ), my_allocator( a )
343  {
344  for( ; begin != end; ++begin )
345  internal_push_if_not_full(&*begin);
346  }
347 
350 
352  void push( const T& source ) {
353  internal_push( &source );
354  }
355 
356 #if __TBB_CPP11_RVALUE_REF_PRESENT
357  void push( T&& source ) {
359  internal_push_move( &source );
360  }
361 
362 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
363  template<typename... Arguments>
364  void emplace( Arguments&&... args ) {
365  push( T(std::forward<Arguments>( args )...) );
366  }
367 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
368 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
369 
371 
372  void pop( T& destination ) {
373  internal_pop( &destination );
374  }
375 
376 #if TBB_USE_EXCEPTIONS
377  void abort() {
379  internal_abort();
380  }
381 #endif
382 
384 
386  bool try_push( const T& source ) {
387  return internal_push_if_not_full( &source );
388  }
389 
390 #if __TBB_CPP11_RVALUE_REF_PRESENT
391 
394  bool try_push( T&& source ) {
395  return internal_push_move_if_not_full( &source );
396  }
397 #if __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT
398  template<typename... Arguments>
399  bool try_emplace( Arguments&&... args ) {
400  return try_push( T(std::forward<Arguments>( args )...) );
401  }
402 #endif /* __TBB_CPP11_VARIADIC_TEMPLATES_PRESENT */
403 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
404 
406 
408  bool try_pop( T& destination ) {
409  return internal_pop_if_present( &destination );
410  }
411 
413 
416  size_type size() const {return internal_size();}
417 
419  bool empty() const {return internal_empty();}
420 
422  size_type capacity() const {
423  return my_capacity;
424  }
425 
427 
429  void set_capacity( size_type new_capacity ) {
430  internal_set_capacity( new_capacity, sizeof(T) );
431  }
432 
434  allocator_type get_allocator() const { return this->my_allocator; }
435 
437  void clear() ;
438 
439  typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,T> iterator;
440  typedef internal::concurrent_queue_iterator<concurrent_bounded_queue,const T> const_iterator;
441 
442  //------------------------------------------------------------------------
443  // The iterators are intended only for debugging. They are slow and not thread safe.
444  //------------------------------------------------------------------------
445  iterator unsafe_begin() {return iterator(*this);}
446  iterator unsafe_end() {return iterator();}
447  const_iterator unsafe_begin() const {return const_iterator(*this);}
448  const_iterator unsafe_end() const {return const_iterator();}
449 
450 };
451 
452 #if __TBB_CPP17_DEDUCTION_GUIDES_PRESENT
453 // guide for concurrent_bounded_queue(InputIterator, InputIterator, ...)
454 template<typename InputIterator,
455  typename T = typename std::iterator_traits<InputIterator>::value_type,
456  typename A = cache_aligned_allocator<T>
457 > concurrent_bounded_queue(InputIterator, InputIterator, const A& = A())
459 #endif /* __TBB_CPP17_DEDUCTION_GUIDES_PRESENT */
460 
461 template<typename T, class A>
463  clear();
465 }
466 
467 template<typename T, class A>
469  T value;
470  while( try_pop(value) ) /*noop*/;
471 }
472 
474 
475 } // namespace tbb
476 
477 #endif /* __TBB_concurrent_queue_H */
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
#define __TBB_override
Definition: tbb_stddef.h:244
static void move_construct_item(T *location, const void *src)
const T & const_reference
Const reference type.
size_t size_type
Integral type for representing size of the queue.
bool try_pop(T &result)
Attempt to dequeue an item from head of queue.
concurrent_bounded_queue(concurrent_bounded_queue &&src, const allocator_type &a)
const_iterator unsafe_end() const
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
bool try_pop(T &destination)
Attempt to dequeue an item from head of queue.
void assign(const concurrent_queue_base_v3 &src, item_constructor_t construct_item)
copy or move internal representation
ptrdiff_t difference_type
Difference type for iterator.
concurrent_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
void clear()
clear the queue. not thread-safe.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
const_iterator unsafe_begin() const
Class used to ensure exception-safety of method "pop".
size_t internal_size() const
Get size of queue; result may be invalid if queue is modified concurrently.
void clear()
Clear the queue. not thread-safe.
tbb::internal::allocator_rebind< A, char >::type page_allocator_type
Allocator type.
void pop(T &destination)
Dequeue item from head of queue.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
virtual page * allocate_page() __TBB_override
custom allocator
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
concurrent_queue(concurrent_queue &&src, const allocator_type &a)
concurrent_bounded_queue(const concurrent_bounded_queue &src, const allocator_type &a=allocator_type())
Copy constructor.
internal::concurrent_queue_iterator< concurrent_queue, T > iterator
static void copy_construct_item(T *location, const void *src)
T & get_ref(page &p, size_t index)
T value_type
Element type in the queue.
concurrent_bounded_queue(InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
[begin,end) constructor
A high-performance thread-safe blocking concurrent bounded queue.
virtual void deallocate_page(page *p) __TBB_override
custom de-allocator
internal::concurrent_queue_iterator< concurrent_bounded_queue, T > iterator
void const char const char int ITT_FORMAT __itt_group_sync p
virtual void copy_item(page &dst, size_t index, const void *src) __TBB_override
virtual void copy_page_item(page &dst, size_t dindex, const page &src, size_t sindex) __TBB_override
concurrent_queue(const allocator_type &a=allocator_type())
Construct empty queue.
const_iterator unsafe_end() const
concurrent_queue_base_v3::copy_specifics copy_specifics
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
internal::concurrent_queue_iterator< concurrent_bounded_queue, const T > const_iterator
bool internal_empty() const
check if the queue is empty; thread safe
auto last(Container &c) -> decltype(begin(c))
friend class internal::concurrent_queue_iterator
void set_capacity(size_type new_capacity)
Set the capacity.
internal::concurrent_queue_iterator< concurrent_queue, const T > const_iterator
size_type capacity() const
Maximum number of allowed elements.
virtual void move_page_item(page &dst, size_t dindex, const page &src, size_t sindex) __TBB_override
size_type size() const
Return number of pushes minus number of pops.
The graph class.
size_type unsafe_size() const
Return the number of items in the queue; thread unsafe.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
~concurrent_bounded_queue()
Destroy queue.
bool try_emplace(Arguments &&... args)
tbb::internal::allocator_rebind< A, char >::type page_allocator_type
void push(const T &source)
Enqueue an item at tail of queue.
allocator_traits< Alloc >::template rebind_alloc< T >::other type
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:309
const_iterator unsafe_begin() const
void internal_push(const void *src, item_constructor_t construct_item)
Enqueue item at tail of queue.
bool try_push(T &&source)
Move an item at tail of queue if queue is not already full.
virtual void deallocate_block(void *b, size_t n) __TBB_override
Deallocates block created by allocate_block.
void internal_swap(concurrent_queue_base_v3 &src)
swap internal representation
bool empty() const
Equivalent to size()==0.
bool empty() const
Equivalent to size()<=0.
const T & const_reference
Const reference type.
concurrent_queue(const concurrent_queue &src, const allocator_type &a=allocator_type())
Copy constructor.
bool try_push(const T &source)
Enqueue an item at tail of queue if queue is not already full.
allocator_type get_allocator() const
Return allocator object.
concurrent_queue_base_v3::padded_page< T > padded_page
void emplace(Arguments &&... args)
concurrent_bounded_queue(const allocator_type &a=allocator_type())
Construct empty queue.
bool internal_try_pop(void *dst)
Attempt to dequeue item from queue.
virtual void * allocate_block(size_t n) __TBB_override
Allocates a block of size n (bytes)
virtual void assign_and_destroy_item(void *dst, page &src, size_t index) __TBB_override
virtual void move_item(page &dst, size_t index, const void *src) __TBB_override
A high-performance thread-safe non-blocking concurrent queue.
T value_type
Element type in the queue.
std::ptrdiff_t difference_type
Difference type for iterator.
void emplace(Arguments &&... args)
page_allocator_type my_allocator
Allocator type.
void push(const T &source)
Enqueue an item at tail of queue.
std::ptrdiff_t size_type
Integral type for representing size of the queue.
allocator_type get_allocator() const
return allocator object

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.