25 #define __TBB_concurrent_queue_H 32 #if defined(_MSC_VER) && defined(_Wp64) 34 #pragma warning (disable: 4267) 37 #define RECORD_EVENTS 0 64 void push(
const void* item, ticket k, concurrent_queue_base& base,
67 void abort_push( ticket k, concurrent_queue_base& base );
69 bool pop(
void* dst, ticket k, concurrent_queue_base& base );
74 page*
make_copy ( concurrent_queue_base& base,
const page* src_page,
size_t begin_in_page,
86 concurrent_queue_base &
base;
89 my_ticket(k), my_queue(queue), my_page(p), base(b)
110 bool operator() ( uintptr_t
p )
const {
return (ticket)p<=t;}
122 static const size_t phi = 3;
126 static const size_t n_queue = 8;
130 return k*phi%n_queue;
145 return array[index(k)];
151 static const ptrdiff_t infinite_capacity = ptrdiff_t(~
size_t(0)/2);
154 #if _MSC_VER && !defined(__INTEL_COMPILER) 156 #pragma warning( push ) 157 #pragma warning( disable: 4146 ) 184 if( tail_counter!=k )
188 else if( tail&0x1 ) {
221 p->
mask |= uintptr_t(1)<<index;
241 bool success =
false;
244 if( p->
mask & uintptr_t(1)<<index ) {
277 cur_page = cur_page->
next;
285 cur_page->
next =
make_copy( base, srcp, 0, last_index, g_index, op_type );
286 cur_page = cur_page->
next;
304 new_page->
next = NULL;
306 for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
307 if( new_page->
mask & uintptr_t(1)<<begin_in_page ) {
309 base.
copy_page_item( *new_page, begin_in_page, *src_page, begin_in_page );
322 static_invalid_page = &dummy;
334 #if _MSC_VER && !defined(__INTEL_COMPILER) 335 #pragma warning( pop ) 336 #endif // warning 4146 is back 342 items_per_page = item_sz<= 8 ? 32 :
348 my_capacity = size_t(-1)/(item_sz>1 ? item_sz : 2);
357 this->item_size = item_sz;
361 size_t nq = my_rep->n_queue;
362 for(
size_t i=0; i<nq; i++ )
363 __TBB_ASSERT( my_rep->array[i].tail_page==NULL,
"pages were not freed properly" );
368 internal_insert_item( src, copy );
372 internal_insert_item( src,
move );
379 ptrdiff_t e = my_capacity;
381 bool sync_prepare_done =
false;
385 if( !sync_prepare_done ) {
386 ITT_NOTIFY( sync_prepare, &sync_prepare_done );
387 sync_prepare_done =
true;
393 while( (ptrdiff_t)(k-r.
head_counter)>=const_cast<volatile ptrdiff_t&>(e = my_capacity) ) {
406 if (slept ==
true)
break;
412 ITT_NOTIFY( sync_acquired, &sync_prepare_done );
422 bool sync_prepare_done =
false;
430 if( !sync_prepare_done ) {
432 sync_prepare_done =
true;
451 if (slept ==
true)
break;
458 }
while( !r.
choose(k).
pop(dst,k,*
this) );
488 }
while( !r.
choose( k ).
pop( dst, k, *
this ) );
496 return internal_insert_if_not_full( src, copy );
500 return internal_insert_if_not_full( src,
move );
524 __TBB_ASSERT(
sizeof(ptrdiff_t)<=
sizeof(
size_t), NULL );
525 return ptrdiff_t(my_rep->tail_counter-my_rep->head_counter-my_rep->n_invalid_entries);
529 ticket tc = my_rep->tail_counter;
530 ticket hc = my_rep->head_counter;
532 return ( tc==my_rep->tail_counter && ptrdiff_t(tc-hc-my_rep->n_invalid_entries)<=0 );
540 size_t nq = my_rep->n_queue;
541 for(
size_t i=0; i<nq; ++i ) {
542 page* tp = my_rep->array[i].tail_page;
543 __TBB_ASSERT( my_rep->array[i].head_page==tp,
"at most one page should remain" );
545 if( tp!=static_invalid_page ) deallocate_page( tp );
546 my_rep->array[i].tail_page = NULL;
566 for(
size_t i = 0; i<my_rep->n_queue; ++i )
567 my_rep->array[i].assign( src.
my_rep->
array[i], *
this, op_type );
570 "the source concurrent queue should not be concurrently modified." );
574 internal_assign( src, copy );
578 internal_assign( src,
move );
591 head_counter(queue.my_rep->head_counter),
593 offset_of_last(offset_of_last_)
608 item =
static_cast<unsigned char*
>(
static_cast<void*
>(
p)) + offset_of_last + my_queue.
item_size*i;
609 return (p->
mask & uintptr_t(1)<<i)!=0;
622 if( !my_rep->get_item(my_item, k) ) advance();
630 initialize(queue,offset_of_last);
634 if( my_rep!=other.
my_rep ) {
648 __TBB_ASSERT( my_item,
"attempt to increment iterator past end of queue" );
649 size_t k = my_rep->head_counter;
653 my_rep->get_item(tmp,k);
662 my_rep->head_counter = ++k;
663 if( !my_rep->get_item(my_item, k) ) advance();
Class that implements exponential backoff.
Primary template for atomic.
const size_t offset_of_last
void __TBB_EXPORTED_METHOD internal_finish_clear()
free any remaining pages
virtual void deallocate_page(page *p)=0
custom de-allocator
concurrent_queue_base_v3 concurrent_queue_base
void make_invalid(ticket k)
static const ptrdiff_t infinite_capacity
Value for effective_capacity that denotes unbounded queue.
size_t __TBB_EXPORTED_FUNC NFS_GetLineSize()
Cache/sector line size.
virtual void copy_page_item(page &dst, size_t dindex, const page &src, size_t sindex)=0
const concurrent_queue_base & my_queue
argument_integer_type modulo_power_of_two(argument_integer_type arg, divisor_integer_type divisor)
A function to compute arg modulo divisor where divisor is a power of 2.
ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const
Get size of queue.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
void notify(const P &predicate)
Notify waiting threads of the event that satisfies the given predicate.
atomic< ticket > tail_counter
atomic< size_t > n_invalid_entries
void __TBB_EXPORTED_METHOD internal_push(const void *src)
Enqueue item at tail of queue using copy operation.
Type-independent portion of concurrent_queue_iterator.
micro_queue_pop_finalizer(micro_queue &queue, concurrent_queue_base &b, ticket k, page *p)
Base class for types that should not be assigned.
Exception for user-initiated abort.
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_iterator_base_v3 &i)
Assignment.
concurrent_monitor slots_avail
void __TBB_EXPORTED_METHOD internal_throw_exception() const
throw an exception
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
concurrent_queue_base::page page
void cancel_wait(thread_context &thr)
Cancel the wait. Removes the thread from the wait queue if not removed yet.
void __TBB_EXPORTED_METHOD internal_pop(void *dst)
Dequeue item from head of queue.
Represents acquisition of a mutex.
virtual page * allocate_page()=0
custom allocator
bool commit_wait(thread_context &thr)
Commit wait if event count has not changed; otherwise, cancel wait.
static void * static_invalid_page
bool internal_insert_if_not_full(const void *src, copy_specifics op_type)
Attempts to enqueue at tail of queue using specified operation (copy or move)
void prepare_wait(thread_context &thr, uintptr_t ctx=0)
prepare wait by inserting 'thr' into the wait queue
void __TBB_EXPORTED_METHOD move_content(concurrent_queue_base_v8 &src)
move items
atomic< ticket > head_counter
virtual __TBB_EXPORTED_METHOD ~concurrent_queue_base_v3()
void const char const char int ITT_FORMAT __itt_group_sync p
Internal representation of a ConcurrentQueue.
bool pop(void *dst, ticket k, concurrent_queue_base &base)
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
A queue using simple locking.
concurrent_queue_iterator_rep(const concurrent_queue_base &queue, size_t offset_of_last_)
void abort_all()
Abort any sleeping threads at the time of the call.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_base_v3 &src)
copy internal representation
void internal_assign(const concurrent_queue_base_v3 &src, copy_specifics op_type)
Assigns one queue to another using specified operation (copy or move)
atomic< ticket > tail_counter
bool is_aligned(T *pointer, uintptr_t alignment)
A function to check if passed in pointer is aligned on a specific border.
void __TBB_EXPORTED_METHOD advance()
Advance iterator one step towards tail of queue.
value_type compare_and_swap(value_type value, value_type comparand)
void push(const void *item, ticket k, concurrent_queue_base &base, concurrent_queue_base::copy_specifics op_type)
micro_queue & choose(ticket k)
bool get_item(void *&item, size_t k)
Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
concurrent_queue_rep * my_rep
Internal representation.
atomic< unsigned > abort_counter
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
void internal_insert_item(const void *src, copy_specifics op_type)
Enqueues item at tail of queue using specified operation (copy or move)
__TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3()
Destructor.
void initialize(const concurrent_queue_base_v3 &queue, size_t offset_of_data)
void abort_push(ticket k, concurrent_queue_base &base)
atomic< page * > head_page
virtual void copy_item(page &dst, size_t index, const void *src)=0
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type size_t void ITT_FORMAT p const __itt_domain __itt_id __itt_string_handle const wchar_t size_t ITT_FORMAT lu const __itt_domain __itt_id __itt_relation __itt_id tail
static const size_t n_queue
Must be power of 2.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p sync_releasing
void __TBB_EXPORTED_METHOD internal_push_move(const void *src)
Enqueue item at tail of queue using move operation.
void __TBB_EXPORTED_METHOD internal_abort()
Abort all pending queue operations.
size_t items_per_page
Always a power of 2.
bool __TBB_EXPORTED_METHOD internal_push_if_not_full(const void *src)
Attempt to enqueue item onto queue using copy operation.
bool __TBB_EXPORTED_METHOD internal_empty() const
Check if the queue is empty.
micro_queue & assign(const micro_queue &src, concurrent_queue_base &base, concurrent_queue_base::copy_specifics op_type)
void move(tbb_thread &t1, tbb_thread &t2)
#define ITT_NOTIFY(name, obj)
void * my_item
Pointer to current item.
atomic< page * > tail_page
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
atomic< ticket > head_counter
__TBB_EXPORTED_METHOD concurrent_queue_base_v3(size_t item_size)
ptrdiff_t my_capacity
Capacity of the queue.
bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full(const void *src)
Attempt to enqueue item onto queue using move operation.
bool __TBB_EXPORTED_METHOD internal_pop_if_present(void *dst)
Attempt to dequeue item from queue.
size_t item_size
Size of an item.
concurrent_queue_iterator_rep * my_rep
concurrent_queue over which we are iterating.
concurrent_monitor items_avail
page * make_copy(concurrent_queue_base &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, concurrent_queue_base::copy_specifics op_type)
A lock that occupies a single byte.
Base class for types that should not be copied or assigned.
~micro_queue_pop_finalizer()
virtual void assign_and_destroy_item(void *dst, page &src, size_t index)=0
static size_t index(ticket k)
Map ticket to an array index.
concurrent_queue_base::page page
concurrent_queue_base & base
void __TBB_EXPORTED_METHOD internal_set_capacity(ptrdiff_t capacity, size_t element_size)
Set the queue capacity.
void pause()
Pause for a while.
micro_queue array[n_queue]
concurrent_queue_iterator_base_v3()
Default constructor.