Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
tbb::interface5::concurrent_priority_queue< T, Compare, A > Class Template Reference

Concurrent priority queue. More...

#include <concurrent_priority_queue.h>

Inheritance diagram for tbb::interface5::concurrent_priority_queue< T, Compare, A >:
Collaboration diagram for tbb::interface5::concurrent_priority_queue< T, Compare, A >:

Classes

class  cpq_operation
 
class  my_functor_t
 

Public Types

typedef T value_type
 Element type in the queue. More...
 
typedef T & reference
 Reference type. More...
 
typedef const T & const_reference
 Const reference type. More...
 
typedef size_t size_type
 Integral type for representing size of the queue. More...
 
typedef ptrdiff_t difference_type
 Difference type for iterator. More...
 
typedef A allocator_type
 Allocator type. More...
 

Public Member Functions

 concurrent_priority_queue (const allocator_type &a=allocator_type())
 Constructs a new concurrent_priority_queue with default capacity. More...
 
 concurrent_priority_queue (size_type init_capacity, const allocator_type &a=allocator_type())
 Constructs a new concurrent_priority_queue with init_capacity capacity. More...
 
template<typename InputIterator >
 concurrent_priority_queue (InputIterator begin, InputIterator end, const allocator_type &a=allocator_type())
 [begin,end) constructor More...
 
 concurrent_priority_queue (std::initializer_list< T > init_list, const allocator_type &a=allocator_type())
 Constructor from std::initializer_list. More...
 
 concurrent_priority_queue (const concurrent_priority_queue &src)
 Copy constructor. More...
 
 concurrent_priority_queue (const concurrent_priority_queue &src, const allocator_type &a)
 Copy constructor with specific allocator. More...
 
concurrent_priority_queue & operator= (const concurrent_priority_queue &src)
 Assignment operator. More...
 
 concurrent_priority_queue (concurrent_priority_queue &&src)
 Move constructor. More...
 
 concurrent_priority_queue (concurrent_priority_queue &&src, const allocator_type &a)
 Move constructor with specific allocator. More...
 
concurrent_priority_queue & operator= (concurrent_priority_queue &&src)
 Move assignment operator. More...
 
template<typename InputIterator >
void assign (InputIterator begin, InputIterator end)
 Assign the queue from [begin,end) range, not thread-safe. More...
 
void assign (std::initializer_list< T > il)
 Assign the queue from std::initializer_list, not thread-safe. More...
 
concurrent_priority_queue & operator= (std::initializer_list< T > il)
 Assign from std::initializer_list, not thread-safe. More...
 
bool empty () const
 Returns true if empty, false otherwise. More...
 
size_type size () const
 Returns the current number of elements contained in the queue. More...
 
void push (const_reference elem)
 Pushes elem onto the queue, increasing capacity of queue if necessary. More...
 
void push (value_type &&elem)
 Pushes elem onto the queue, increasing capacity of queue if necessary. More...
 
template<typename... Args>
void emplace (Args &&... args)
 Constructs a new element using args as the arguments for its construction and pushes it onto the queue. More...
 
bool try_pop (reference elem)
 Gets a reference to and removes highest priority element. More...
 
void clear ()
 Clear the queue; not thread-safe. More...
 
void swap (concurrent_priority_queue &q)
 Swap this queue with another; not thread-safe. More...
 
allocator_type get_allocator () const
 Return allocator object. More...
 

Private Types

enum  operation_type { INVALID_OP, PUSH_OP, POP_OP, PUSH_RVALUE_OP }
 
enum  operation_status { WAIT =0, SUCCEEDED, FAILED }
 
typedef tbb::internal::aggregator< my_functor_t, cpq_operation > aggregator_t
 
typedef std::vector< value_type, allocator_type > vector_t
 Storage for the heap of elements in queue, plus unheapified elements. More...
 

Private Member Functions

void handle_operations (cpq_operation *op_list)
 
void heapify ()
 Merge unsorted elements into heap. More...
 
void reheap ()
 Re-heapify after an extraction. More...
 
void push_back_helper (const T &t, tbb::internal::true_type)
 
void push_back_helper (const T &, tbb::internal::false_type)
 

Private Attributes

aggregator_t my_aggregator
 
char padding1 [NFS_MaxLineSize - sizeof(aggregator_t)]
 Padding added to avoid false sharing. More...
 
size_type mark
 The point at which unsorted elements begin. More...
 
__TBB_atomic size_type my_size
 
Compare compare
 
char padding2 [NFS_MaxLineSize -(2 *sizeof(size_type)) - sizeof(Compare)]
 Padding added to avoid false sharing. More...
 
vector_t data
 

Detailed Description

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
class tbb::interface5::concurrent_priority_queue< T, Compare, A >

Concurrent priority queue.

Definition at line 67 of file concurrent_priority_queue.h.

Member Typedef Documentation

◆ aggregator_t

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
typedef tbb::internal::aggregator< my_functor_t, cpq_operation > tbb::interface5::concurrent_priority_queue< T, Compare, A >::aggregator_t
private

Definition at line 323 of file concurrent_priority_queue.h.

◆ allocator_type

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
typedef A tbb::interface5::concurrent_priority_queue< T, Compare, A >::allocator_type

Allocator type.

Definition at line 85 of file concurrent_priority_queue.h.

◆ const_reference

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
typedef const T& tbb::interface5::concurrent_priority_queue< T, Compare, A >::const_reference

Const reference type.

Definition at line 76 of file concurrent_priority_queue.h.

◆ difference_type

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
typedef ptrdiff_t tbb::interface5::concurrent_priority_queue< T, Compare, A >::difference_type

Difference type for iterator.

Definition at line 82 of file concurrent_priority_queue.h.

◆ reference

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
typedef T& tbb::interface5::concurrent_priority_queue< T, Compare, A >::reference

Reference type.

Definition at line 73 of file concurrent_priority_queue.h.

◆ size_type

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
typedef size_t tbb::interface5::concurrent_priority_queue< T, Compare, A >::size_type

Integral type for representing size of the queue.

Definition at line 79 of file concurrent_priority_queue.h.

◆ value_type

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
typedef T tbb::interface5::concurrent_priority_queue< T, Compare, A >::value_type

Element type in the queue.

Definition at line 70 of file concurrent_priority_queue.h.

◆ vector_t

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
typedef std::vector<value_type, allocator_type> tbb::interface5::concurrent_priority_queue< T, Compare, A >::vector_t
private

Storage for the heap of elements in queue, plus unheapified elements.

data has the following structure:

     binary unheapified
      heap   elements
    ____|_______|____
    |       |       |
    v       v       v
    [_|...|_|_|...|_| |...| ]
     0       ^       ^       ^
             |       |       |__capacity
             |       |__my_size
             |__mark

Thus, data stores the binary heap starting at position 0 through mark-1 (it may be empty). Then there are 0 or more elements that have not yet been inserted into the heap, in positions mark through my_size-1.

Definition at line 351 of file concurrent_priority_queue.h.

Member Enumeration Documentation

◆ operation_status

◆ operation_type

Constructor & Destructor Documentation

◆ concurrent_priority_queue() [1/8]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
tbb::interface5::concurrent_priority_queue< T, Compare, A >::concurrent_priority_queue ( const allocator_type & a = allocator_type())
inline explicit

Constructs a new concurrent_priority_queue with default capacity.

Definition at line 88 of file concurrent_priority_queue.h.

◆ concurrent_priority_queue() [2/8]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
tbb::interface5::concurrent_priority_queue< T, Compare, A >::concurrent_priority_queue ( size_type  init_capacity,
const allocator_type & a = allocator_type() 
)
inline explicit

Constructs a new concurrent_priority_queue with init_capacity capacity.

Definition at line 94 of file concurrent_priority_queue.h.

94  :
95  mark(0), my_size(0), data(a)
96  {
97  data.reserve(init_capacity);
98  my_aggregator.initialize_handler(my_functor_t(this));
99  }
size_type mark
The point at which unsorted elements begin.

◆ concurrent_priority_queue() [3/8]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
template<typename InputIterator >
tbb::interface5::concurrent_priority_queue< T, Compare, A >::concurrent_priority_queue ( InputIterator  begin,
InputIterator  end,
const allocator_type & a = allocator_type() 
)
inline

[begin,end) constructor

Definition at line 103 of file concurrent_priority_queue.h.

103  :
104  mark(0), data(begin, end, a)
105  {
106  my_aggregator.initialize_handler(my_functor_t(this));
107  heapify();
108  my_size = data.size();
109  }
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
void heapify()
Merge unsorted elements into heap.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
size_type mark
The point at which unsorted elements begin.

◆ concurrent_priority_queue() [4/8]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
tbb::interface5::concurrent_priority_queue< T, Compare, A >::concurrent_priority_queue ( std::initializer_list< T >  init_list,
const allocator_type & a = allocator_type() 
)
inline

Constructor from std::initializer_list.

Definition at line 113 of file concurrent_priority_queue.h.

113  :
114  mark(0),data(init_list.begin(), init_list.end(), a)
115  {
116  my_aggregator.initialize_handler(my_functor_t(this));
117  heapify();
118  my_size = data.size();
119  }
void heapify()
Merge unsorted elements into heap.
size_type mark
The point at which unsorted elements begin.

◆ concurrent_priority_queue() [5/8]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
tbb::interface5::concurrent_priority_queue< T, Compare, A >::concurrent_priority_queue ( const concurrent_priority_queue< T, Compare, A > &  src)
inline

Copy constructor.

This operation is unsafe if there are pending concurrent operations on the src queue.

Definition at line 124 of file concurrent_priority_queue.h.

124  : mark(src.mark),
125  my_size(src.my_size), data(src.data.begin(), src.data.end(), src.data.get_allocator())
126  {
127  my_aggregator.initialize_handler(my_functor_t(this));
128  heapify();
129  }
void heapify()
Merge unsorted elements into heap.
size_type mark
The point at which unsorted elements begin.

◆ concurrent_priority_queue() [6/8]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
tbb::interface5::concurrent_priority_queue< T, Compare, A >::concurrent_priority_queue ( const concurrent_priority_queue< T, Compare, A > &  src,
const allocator_type & a 
)
inline

Copy constructor with specific allocator.

This operation is unsafe if there are pending concurrent operations on the src queue.

Definition at line 133 of file concurrent_priority_queue.h.

133  : mark(src.mark),
134  my_size(src.my_size), data(src.data.begin(), src.data.end(), a)
135  {
136  my_aggregator.initialize_handler(my_functor_t(this));
137  heapify();
138  }
void heapify()
Merge unsorted elements into heap.
size_type mark
The point at which unsorted elements begin.

◆ concurrent_priority_queue() [7/8]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
tbb::interface5::concurrent_priority_queue< T, Compare, A >::concurrent_priority_queue ( concurrent_priority_queue< T, Compare, A > &&  src)
inline

Move constructor.

This operation is unsafe if there are pending concurrent operations on the src queue.

Definition at line 154 of file concurrent_priority_queue.h.

154  : mark(src.mark),
155  my_size(src.my_size), data(std::move(src.data))
156  {
157  my_aggregator.initialize_handler(my_functor_t(this));
158  }
size_type mark
The point at which unsorted elements begin.
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:309

◆ concurrent_priority_queue() [8/8]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
tbb::interface5::concurrent_priority_queue< T, Compare, A >::concurrent_priority_queue ( concurrent_priority_queue< T, Compare, A > &&  src,
const allocator_type & a 
)
inline

Move constructor with specific allocator.

This operation is unsafe if there are pending concurrent operations on the src queue.

__TBB_ALLOCATOR_TRAITS_PRESENT

Definition at line 162 of file concurrent_priority_queue.h.

162  : mark(src.mark),
163  my_size(src.my_size),
164 #if __TBB_ALLOCATOR_TRAITS_PRESENT
165  data(std::move(src.data), a)
166 #else
167  // Some early version of C++11 STL vector does not have a constructor of vector(vector&& , allocator).
168  // It seems that the reason is absence of support of allocator_traits (stateful allocators).
169  data(a)
170 #endif //__TBB_ALLOCATOR_TRAITS_PRESENT
171  {
172  my_aggregator.initialize_handler(my_functor_t(this));
173 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
174  if (a != src.data.get_allocator()){
175  data.reserve(src.data.size());
176  data.assign(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()));
177  }else{
178  data = std::move(src.data);
179  }
180 #endif
181  }
size_type mark
The point at which unsorted elements begin.
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:309

Member Function Documentation

◆ assign() [1/2]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
template<typename InputIterator >
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::assign ( InputIterator  begin,
InputIterator  end 
)
inline

Assign the queue from [begin,end) range, not thread-safe.

Definition at line 204 of file concurrent_priority_queue.h.

204  {
205  vector_t(begin, end, data.get_allocator()).swap(data);
206  mark = 0;
207  my_size = data.size();
208  heapify();
209  }
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
std::vector< value_type, allocator_type > vector_t
Storage for the heap of elements in queue, plus unheapified elements.
void heapify()
Merge unsorted elements into heap.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
size_type mark
The point at which unsorted elements begin.
void swap(concurrent_priority_queue &q)
Swap this queue with another; not thread-safe.

◆ assign() [2/2]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::assign ( std::initializer_list< T >  il)
inline

Assign the queue from std::initializer_list, not thread-safe.

Definition at line 213 of file concurrent_priority_queue.h.

213 { this->assign(il.begin(), il.end()); }
void assign(InputIterator begin, InputIterator end)
Assign the queue from [begin,end) range, not thread-safe.

◆ clear()

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::clear ( )
inline

Clear the queue; not thread-safe.

This operation is unsafe if there are pending concurrent operations on the queue. Resets size, effectively emptying queue; does not free space. May not clear elements added in pending operations.

Definition at line 279 of file concurrent_priority_queue.h.

279  {
280  data.clear();
281  mark = 0;
282  my_size = 0;
283  }
size_type mark
The point at which unsorted elements begin.

◆ emplace()

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
template<typename... Args>
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::emplace ( Args &&...  args)
inline

Constructs a new element using args as the arguments for its construction and pushes it onto the queue.

This operation can be safely used concurrently with other push, try_pop or emplace operations.

Definition at line 258 of file concurrent_priority_queue.h.

258  {
259  push(value_type(std::forward<Args>(args)...));
260  }
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.

◆ empty()

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
bool tbb::interface5::concurrent_priority_queue< T, Compare, A >::empty ( ) const
inline

Returns true if empty, false otherwise.

Returned value may not reflect results of pending operations. This operation reads shared data and will trigger a race condition.

Definition at line 225 of file concurrent_priority_queue.h.

225 { return size()==0; }
size_type size() const
Returns the current number of elements contained in the queue.

◆ get_allocator()

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
allocator_type tbb::interface5::concurrent_priority_queue< T, Compare, A >::get_allocator ( ) const
inline

Return allocator object.

Definition at line 295 of file concurrent_priority_queue.h.

295 { return data.get_allocator(); }

◆ handle_operations()

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::handle_operations ( cpq_operation * op_list)
inline private

Definition at line 354 of file concurrent_priority_queue.h.

Referenced by tbb::interface5::concurrent_priority_queue< T, Compare, A >::my_functor_t::operator()().

354  {
355  cpq_operation *tmp, *pop_list=NULL;
356 
357  __TBB_ASSERT(mark == data.size(), NULL);
358 
359  // First pass processes all constant (amortized; reallocation may happen) time pushes and pops.
360  while (op_list) {
361  // ITT note: &(op_list->status) tag is used to cover accesses to op_list
362  // node. This thread is going to handle the operation, and so will acquire it
363  // and perform the associated operation w/o triggering a race condition; the
364  // thread that created the operation is waiting on the status field, so when
365  // this thread is done with the operation, it will perform a
366  // store_with_release to give control back to the waiting thread in
367  // aggregator::insert_operation.
368  call_itt_notify(acquired, &(op_list->status));
369  __TBB_ASSERT(op_list->type != INVALID_OP, NULL);
370  tmp = op_list;
371  op_list = itt_hide_load_word(op_list->next);
372  if (tmp->type == POP_OP) {
373  if (mark < data.size() &&
374  compare(data[0], data[data.size()-1])) {
375  // there are newly pushed elems and the last one
376  // is higher than top
377  *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
378  __TBB_store_with_release(my_size, my_size-1);
379  itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
380  data.pop_back();
381  __TBB_ASSERT(mark<=data.size(), NULL);
382  }
383  else { // no convenient item to pop; postpone
384  itt_hide_store_word(tmp->next, pop_list);
385  pop_list = tmp;
386  }
387  } else { // PUSH_OP or PUSH_RVALUE_OP
388  __TBB_ASSERT(tmp->type == PUSH_OP || tmp->type == PUSH_RVALUE_OP, "Unknown operation" );
389  __TBB_TRY{
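390  if (tmp->type == PUSH_OP) {
391  push_back_helper(*(tmp->elem), typename internal::use_element_copy_constructor<value_type>::type());
392  } else {
393  data.push_back(tbb::internal::move(*(tmp->elem)));
394  }
395  __TBB_store_with_release(my_size, my_size + 1);
396  itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));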
397  } __TBB_CATCH(...) {
398  itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
399  }
400  }
401  }
402 
403  // second pass processes pop operations
404  while (pop_list) {
405  tmp = pop_list;
406  pop_list = itt_hide_load_word(pop_list->next);
407  __TBB_ASSERT(tmp->type == POP_OP, NULL);
408  if (data.empty()) {
409  itt_store_word_with_release(tmp->status, uintptr_t(FAILED));
410  }
411  else {
412  __TBB_ASSERT(mark<=data.size(), NULL);
413  if (mark < data.size() &&
414  compare(data[0], data[data.size()-1])) {
415  // there are newly pushed elems and the last one is
416  // higher than top
417  *(tmp->elem) = tbb::internal::move(data[data.size()-1]);
418  __TBB_store_with_release(my_size, my_size-1);
419  itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
420  data.pop_back();
421  }
422  else { // extract top and push last element down heap
423  *(tmp->elem) = tbb::internal::move(data[0]);
424  __TBB_store_with_release(my_size, my_size-1);
425  itt_store_word_with_release(tmp->status, uintptr_t(SUCCEEDED));
426  reheap();
427  }
428  }
429  }
430 
431  // heapify any leftover pushed elements before doing the next
432  // batch of operations
433  if (mark<data.size()) heapify();
434  __TBB_ASSERT(mark == data.size(), NULL);
435  }
T itt_hide_load_word(const T &src)
void heapify()
Merge unsorted elements into heap.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
void reheap()
Re-heapify after an extraction.
#define __TBB_TRY
Definition: tbb_stddef.h:287
size_type mark
The point at which unsorted elements begin.
#define __TBB_CATCH(e)
Definition: tbb_stddef.h:288
void push_back_helper(const T &t, tbb::internal::true_type)
void call_itt_notify(notify_type, void *)
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:309
void itt_hide_store_word(T &dst, T src)
void __TBB_store_with_release(volatile T &location, V value)
Definition: tbb_machine.h:720
Here is the caller graph for this function:

◆ heapify()

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::heapify ( )
inline private

Merge unsorted elements into heap.

Definition at line 438 of file concurrent_priority_queue.h.

438  {
439  if (!mark && data.size()>0) mark = 1;
440  for (; mark<data.size(); ++mark) {
441  // for each unheapified element under size
442  size_type cur_pos = mark;
443  value_type to_place = tbb::internal::move(data[mark]);
444  do { // push to_place up the heap
445  size_type parent = (cur_pos-1)>>1;
446  if (!compare(data[parent], to_place)) break;
447  data[cur_pos] = tbb::internal::move(data[parent]);
448  cur_pos = parent;
449  } while( cur_pos );
450  data[cur_pos] = tbb::internal::move(to_place);
451  }
452  }
size_t size_type
Integral type for representing size of the queue.
size_type mark
The point at which unsorted elements begin.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id parent
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:309

◆ operator=() [1/3]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
concurrent_priority_queue& tbb::interface5::concurrent_priority_queue< T, Compare, A >::operator= ( const concurrent_priority_queue< T, Compare, A > &  src)
inline

Assignment operator.

This operation is unsafe if there are pending concurrent operations on the src queue.

Definition at line 142 of file concurrent_priority_queue.h.

142  {
143  if (this != &src) {
144  vector_t(src.data.begin(), src.data.end(), src.data.get_allocator()).swap(data);
145  mark = src.mark;
146  my_size = src.my_size;
147  }
148  return *this;
149  }
std::vector< value_type, allocator_type > vector_t
Storage for the heap of elements in queue, plus unheapified elements.
size_type mark
The point at which unsorted elements begin.
void swap(concurrent_priority_queue &q)
Swap this queue with another; not thread-safe.

◆ operator=() [2/3]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
concurrent_priority_queue& tbb::interface5::concurrent_priority_queue< T, Compare, A >::operator= ( concurrent_priority_queue< T, Compare, A > &&  src)
inline

Move assignment operator.

This operation is unsafe if there are pending concurrent operations on the src queue.

__TBB_ALLOCATOR_TRAITS_PRESENT

Definition at line 185 of file concurrent_priority_queue.h.

185  {
186  if (this != &src) {
187  mark = src.mark;
188  my_size = src.my_size;
189 #if !__TBB_ALLOCATOR_TRAITS_PRESENT
190  if (data.get_allocator() != src.data.get_allocator()){
191  vector_t(std::make_move_iterator(src.data.begin()), std::make_move_iterator(src.data.end()), data.get_allocator()).swap(data);
192  }else
193 #endif
194  {
195  data = std::move(src.data);
196  }
197  }
198  return *this;
199  }
std::vector< value_type, allocator_type > vector_t
Storage for the heap of elements in queue, plus unheapified elements.
size_type mark
The point at which unsorted elements begin.
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:309
void swap(concurrent_priority_queue &q)
Swap this queue with another; not thread-safe.

◆ operator=() [3/3]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
concurrent_priority_queue& tbb::interface5::concurrent_priority_queue< T, Compare, A >::operator= ( std::initializer_list< T >  il)
inline

Assign from std::initializer_list, not thread-safe.

Definition at line 216 of file concurrent_priority_queue.h.

216  {
217  this->assign(il.begin(), il.end());
218  return *this;
219  }
void assign(InputIterator begin, InputIterator end)
Assign the queue from [begin,end) range, not thread-safe.

◆ push() [1/2]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::push ( const_reference  elem)
inline

Pushes elem onto the queue, increasing capacity of queue if necessary.

This operation can be safely used concurrently with other push, try_pop or emplace operations.

Definition at line 234 of file concurrent_priority_queue.h.

Referenced by tbb::flow::interface10::internal::spawn_in_graph_arena().

234  {
235 #if __TBB_CPP11_IS_COPY_CONSTRUCTIBLE_PRESENT
236  __TBB_STATIC_ASSERT( std::is_copy_constructible<value_type>::value, "The type is not copy constructible. Copying push operation is impossible." );
237 #endif
238  cpq_operation op_data(elem, PUSH_OP);
239  my_aggregator.execute(&op_data);
240  if (op_data.status == FAILED) // exception thrown
241  throw_exception(eid_bad_alloc);
242  }
#define __TBB_STATIC_ASSERT(condition, msg)
Definition: tbb_stddef.h:536
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
Here is the caller graph for this function:

◆ push() [2/2]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::push ( value_type &&  elem)
inline

Pushes elem onto the queue, increasing capacity of queue if necessary.

This operation can be safely used concurrently with other push, try_pop or emplace operations.

Definition at line 247 of file concurrent_priority_queue.h.

247  {
248  cpq_operation op_data(elem, PUSH_RVALUE_OP);
249  my_aggregator.execute(&op_data);
250  if (op_data.status == FAILED) // exception thrown
251  throw_exception(eid_bad_alloc);
252  }
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()

◆ push_back_helper() [1/2]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::push_back_helper ( const T &  t,
tbb::internal::true_type   
)
inline private

Definition at line 475 of file concurrent_priority_queue.h.

475  {
476  data.push_back(t);
477  }

◆ push_back_helper() [2/2]

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::push_back_helper ( const T &  ,
tbb::internal::false_type   
)
inline private

Definition at line 479 of file concurrent_priority_queue.h.

479  {
480  __TBB_ASSERT( false, "The type is not copy constructible. Copying push operation is impossible." );
481  }
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169

◆ reheap()

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::reheap ( )
inline private

Re-heapify after an extraction.

Re-heapify by pushing last element down the heap from the root.

Definition at line 456 of file concurrent_priority_queue.h.

456  {
457  size_type cur_pos=0, child=1;
458 
459  while (child < mark) {
460  size_type target = child;
461  if (child+1 < mark && compare(data[child], data[child+1]))
462  ++target;
463  // target now has the higher priority child
464  if (compare(data[target], data[data.size()-1])) break;
465  data[cur_pos] = tbb::internal::move(data[target]);
466  cur_pos = target;
467  child = (cur_pos<<1)+1;
468  }
469  if (cur_pos != data.size()-1)
470  data[cur_pos] = tbb::internal::move(data[data.size()-1]);
471  data.pop_back();
472  if (mark > data.size()) mark = data.size();
473  }
size_t size_type
Integral type for representing size of the queue.
size_type mark
The point at which unsorted elements begin.
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:309

◆ size()

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
size_type tbb::interface5::concurrent_priority_queue< T, Compare, A >::size ( ) const
inline

Returns the current number of elements contained in the queue.

Returned value may not reflect results of pending operations. This operation reads shared data and will trigger a race condition.

Definition at line 230 of file concurrent_priority_queue.h.

230 { return __TBB_load_with_acquire(my_size); }
T __TBB_load_with_acquire(const volatile T &location)
Definition: tbb_machine.h:716

◆ swap()

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
void tbb::interface5::concurrent_priority_queue< T, Compare, A >::swap ( concurrent_priority_queue< T, Compare, A > &  q)
inline

Swap this queue with another; not thread-safe.

This operation is unsafe if there are pending concurrent operations on the queue.

Definition at line 287 of file concurrent_priority_queue.h.

287  {
288  using std::swap;
289  data.swap(q.data);
290  swap(mark, q.mark);
291  swap(my_size, q.my_size);
292  }
void swap(atomic< T > &lhs, atomic< T > &rhs)
Definition: atomic.h:539
size_type mark
The point at which unsorted elements begin.
void swap(concurrent_priority_queue &q)
Swap this queue with another; not thread-safe.

◆ try_pop()

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
bool tbb::interface5::concurrent_priority_queue< T, Compare, A >::try_pop ( reference  elem)
inline

Removes the highest priority element from the queue and assigns it to elem.

If a highest priority element was found, sets elem and returns true, otherwise returns false. This operation can be safely used concurrently with other push, try_pop or emplace operations.

Definition at line 268 of file concurrent_priority_queue.h.

Member Data Documentation

◆ compare

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
Compare tbb::interface5::concurrent_priority_queue< T, Compare, A >::compare
private

Definition at line 330 of file concurrent_priority_queue.h.

◆ data

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
vector_t tbb::interface5::concurrent_priority_queue< T, Compare, A >::data
private

◆ mark

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
size_type tbb::interface5::concurrent_priority_queue< T, Compare, A >::mark
private

◆ my_aggregator

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
aggregator_t tbb::interface5::concurrent_priority_queue< T, Compare, A >::my_aggregator
private

Definition at line 324 of file concurrent_priority_queue.h.

◆ my_size

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
__TBB_atomic size_type tbb::interface5::concurrent_priority_queue< T, Compare, A >::my_size
private

◆ padding1

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
char tbb::interface5::concurrent_priority_queue< T, Compare, A >::padding1[NFS_MaxLineSize - sizeof(aggregator_t)]
private

Padding added to avoid false sharing.

Definition at line 326 of file concurrent_priority_queue.h.

◆ padding2

template<typename T, typename Compare = std::less<T>, typename A = cache_aligned_allocator<T>>
char tbb::interface5::concurrent_priority_queue< T, Compare, A >::padding2[NFS_MaxLineSize -(2 *sizeof(size_type)) - sizeof(Compare)]
private

Padding added to avoid false sharing.

Definition at line 332 of file concurrent_priority_queue.h.


The documentation for this class was generated from the following file:

  * concurrent_priority_queue.h

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.