Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_concurrent_queue_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB__concurrent_queue_impl_H
22 #define __TBB__concurrent_queue_impl_H
23 
24 #ifndef __TBB_concurrent_queue_H
25 #error Do not #include this internal file directly; use public TBB headers instead.
26 #endif
27 
28 #include "../tbb_stddef.h"
29 #include "../tbb_machine.h"
30 #include "../atomic.h"
31 #include "../spin_mutex.h"
32 #include "../cache_aligned_allocator.h"
33 #include "../tbb_exception.h"
34 #include "../tbb_profiling.h"
35 #include <new>
36 #include __TBB_STD_SWAP_HEADER
37 #include <iterator>
38 
39 namespace tbb {
40 
41 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
42 
43 // forward declaration
44 namespace strict_ppl {
45 template<typename T, typename A> class concurrent_queue;
46 }
47 
48 template<typename T, typename A> class concurrent_bounded_queue;
49 
50 #endif
51 
53 namespace strict_ppl {
54 
56 namespace internal {
57 
58 using namespace tbb::internal;
59 
60 typedef size_t ticket;
61 
62 template<typename T> class micro_queue ;
63 template<typename T> class micro_queue_pop_finalizer ;
64 template<typename T> class concurrent_queue_base_v3;
65 template<typename T> struct concurrent_queue_rep;
66 
68 
72  template<typename T> friend class micro_queue;
73  template<typename T> friend class concurrent_queue_base_v3;
74 
75 protected:
77  static const size_t phi = 3;
78 
79 public:
80  // must be power of 2
81  static const size_t n_queue = 8;
82 
84  struct page {
86  uintptr_t mask;
87  };
88 
90  char pad1[NFS_MaxLineSize-sizeof(atomic<ticket>)];
92  char pad2[NFS_MaxLineSize-sizeof(atomic<ticket>)];
93 
96 
98  size_t item_size;
99 
102 
103  char pad3[NFS_MaxLineSize-sizeof(size_t)-sizeof(size_t)-sizeof(atomic<size_t>)];
104 } ;
105 
107  return uintptr_t(p)>1;
108 }
109 
111 
115 {
116  template<typename T> friend class micro_queue ;
117  template<typename T> friend class micro_queue_pop_finalizer ;
118 protected:
120 private:
121  virtual concurrent_queue_rep_base::page* allocate_page() = 0;
122  virtual void deallocate_page( concurrent_queue_rep_base::page* p ) = 0;
123 } ;
124 
125 #if _MSC_VER && !defined(__INTEL_COMPILER)
126 // unary minus operator applied to unsigned type, result still unsigned
127 #pragma warning( push )
128 #pragma warning( disable: 4146 )
129 #endif
130 
132 
134 template<typename T>
135 class micro_queue : no_copy {
136 public:
137  typedef void (*item_constructor_t)(T* location, const void* src);
138 private:
140 
144  public:
145  destroyer( T& value ) : my_value(value) {}
146  ~destroyer() {my_value.~T();}
147  };
148 
// Construct an item in slot `dindex` of page `dst` from the opaque source `src`.
// The actual construction is delegated to the caller-supplied `construct_item`
// callback, which receives the slot address (via get_ref) and `src`.
149  void copy_item( page& dst, size_t dindex, const void* src, item_constructor_t construct_item ) {
150  construct_item( &get_ref(dst, dindex), src );
151  }
152 
// Page-to-page overload: construct slot `dindex` of `dst` from the item held in
// slot `sindex` of `src`, using the caller-supplied `construct_item` callback.
153  void copy_item( page& dst, size_t dindex, const page& src, size_t sindex,
154  item_constructor_t construct_item )
155  {
// get_ref takes a non-const page&, hence the const_cast; the source item is
// only handed to construct_item as a const void*.
156  T& src_item = get_ref( const_cast<page&>(src), sindex );
157  construct_item( &get_ref(dst, dindex), static_cast<const void*>(&src_item) );
158  }
159 
// Assign the item in slot `index` of page `src` to the T that `dst` points at,
// then destroy the source item.
// `dst` is assigned to (not constructed into), so it presumably must already
// refer to a live T - confirm against callers.
160  void assign_and_destroy_item( void* dst, page& src, size_t index ) {
161  T& from = get_ref(src,index);
// `d` runs ~T() on `from` when this scope exits, including when the assignment
// below throws.
162  destroyer d(from);
163  *static_cast<T*>(dst) = tbb::internal::move( from );
164  }
165 
166  void spin_wait_until_my_turn( atomic<ticket>& counter, ticket k, concurrent_queue_rep_base& rb ) const ;
167 
168 public:
169  friend class micro_queue_pop_finalizer<T>;
170 
// A page header followed by item storage. get_ref() addresses the items of a
// page as an array that starts at `last`.
171  struct padded_page: page {
// Declared only; no definition is visible in this listing.
173  padded_page();
// Declared only; no definition is visible in this listing.
175  void operator=( const padded_page& );
// First item slot; get_ref() indexes past it, so it has to stay the final member.
177  T last;
178  };
179 
// Return a reference to item slot `index` of page `p`.
// `p` is treated as a padded_page and the slots are addressed as an array
// beginning at its `last` member. `index` is not range-checked here.
180  static T& get_ref( page& p, size_t index ) {
181  return (&static_cast<padded_page*>(static_cast<void*>(&p))->last)[index];
182  }
183 
186 
189 
191 
192  void push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
193  item_constructor_t construct_item ) ;
194 
195  bool pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) ;
196 
197  micro_queue& assign( const micro_queue& src, concurrent_queue_base_v3<T>& base,
198  item_constructor_t construct_item ) ;
199 
200  page* make_copy( concurrent_queue_base_v3<T>& base, const page* src_page, size_t begin_in_page,
201  size_t end_in_page, ticket& g_index, item_constructor_t construct_item ) ;
202 
203  void invalidate_page_and_rethrow( ticket k ) ;
204 };
205 
206 template<typename T>
208  for( atomic_backoff b(true);;b.pause() ) {
209  ticket c = counter;
210  if( c==k ) return;
211  else if( c&1 ) {
212  ++rb.n_invalid_entries;
214  }
215  }
216 }
217 
// Enqueue one item into this micro-queue under ticket `k`.
//   item           - opaque source object handed to construct_item.
//   k              - ticket; its low bits are masked off below.
//   base           - owning queue; its my_rep supplies items_per_page and n_invalid_entries.
//   construct_item - callback that builds a T in the destination slot from `item`.
// A failure while allocating a page or constructing the item is counted in
// n_invalid_entries and rethrown.
218 template<typename T>
219 void micro_queue<T>::push( const void* item, ticket k, concurrent_queue_base_v3<T>& base,
220  item_constructor_t construct_item )
221 {
// n_queue is a power of two, so this rounds k down to a multiple of n_queue.
222  k &= -concurrent_queue_rep_base::n_queue;
223  page* p = NULL;
224  size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page);
// index==0 means this ticket lands in the first slot of a page, so a fresh page
// is allocated here, before waiting for this ticket's turn.
225  if( !index ) {
226  __TBB_TRY {
// NOTE(review): `pa` is declared on listing line 227, which is absent from this listing.
228  p = pa.allocate_page();
229  } __TBB_CATCH (...) {
230  ++base.my_rep->n_invalid_entries;
231  invalidate_page_and_rethrow( k );
232  }
233  p->mask = 0;
234  p->next = NULL;
235  }
236 
// Wait until tail_counter reaches this ticket.
237  if( tail_counter != k ) spin_wait_until_my_turn( tail_counter, k, *base.my_rep );
238  call_itt_notify(acquired, &tail_counter);
239 
240  if( p ) {
// Link the new page at the tail (or make it the head if there is no valid
// tail page), under page_mutex.
241  spin_mutex::scoped_lock lock( page_mutex );
242  page* q = tail_page;
243  if( is_valid_page(q) )
244  q->next = p;
245  else
246  head_page = p;
247  tail_page = p;
248  } else {
// No new page was needed: the item goes into the current tail page.
249  p = tail_page;
250  }
251 
252  __TBB_TRY {
253  copy_item( *p, index, item, construct_item );
254  // If no exception was thrown, mark item as present.
255  itt_hide_store_word(p->mask, p->mask | uintptr_t(1)<<index);
256  call_itt_notify(releasing, &tail_counter);
257  tail_counter += concurrent_queue_rep_base::n_queue;
258  } __TBB_CATCH (...) {
// Construction failed: the mask bit stays clear and the slot is counted as
// invalid. tail_counter is still advanced, exactly as on the success path.
259  ++base.my_rep->n_invalid_entries;
260  call_itt_notify(releasing, &tail_counter);
261  tail_counter += concurrent_queue_rep_base::n_queue;
262  __TBB_RETHROW();
263  }
264 }
265 
// Dequeue the item for ticket `k` from this micro-queue into `*dst`.
// Returns true if the slot held a valid item (which is assigned to `*dst` and
// destroyed in the page); returns false if the slot's mask bit was clear, in
// which case n_invalid_entries is decremented and `*dst` is left untouched.
266 template<typename T>
267 bool micro_queue<T>::pop( void* dst, ticket k, concurrent_queue_base_v3<T>& base ) {
// n_queue is a power of two, so this rounds k down to a multiple of n_queue.
268  k &= -concurrent_queue_rep_base::n_queue;
// Wait for earlier pops on this micro-queue: head_counter must reach k.
269  if( head_counter!=k ) spin_wait_until_eq( head_counter, k );
270  call_itt_notify(acquired, &head_counter);
// Wait for tail_counter to move past k, i.e. for the matching push to finish.
271  if( tail_counter==k ) spin_wait_while_eq( tail_counter, k );
272  call_itt_notify(acquired, &tail_counter);
273  page *p = head_page;
274  __TBB_ASSERT( p, NULL );
275  size_t index = modulo_power_of_two( k/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
276  bool success = false;
277  {
// The finalizer is handed the next head ticket and, when this is the last slot
// of the page, the page itself. Its destructor runs at the end of this inner
// scope (also if the assignment throws); it appears to publish the new
// head_counter and release the page - see micro_queue_pop_finalizer.
278  micro_queue_pop_finalizer<T> finalizer( *this, base, k+concurrent_queue_rep_base::n_queue, index==base.my_rep->items_per_page-1 ? p : NULL );
279  if( p->mask & uintptr_t(1)<<index ) {
280  success = true;
281  assign_and_destroy_item( dst, *p, index );
282  } else {
// Slot was never marked present: consume the invalid entry.
283  --base.my_rep->n_invalid_entries;
284  }
285  }
286  return success;
287 }
288 
289 template<typename T>
291  item_constructor_t construct_item )
292 {
293  head_counter = src.head_counter;
294  tail_counter = src.tail_counter;
295 
296  const page* srcp = src.head_page;
297  if( is_valid_page(srcp) ) {
298  ticket g_index = head_counter;
299  __TBB_TRY {
300  size_t n_items = (tail_counter-head_counter)/concurrent_queue_rep_base::n_queue;
301  size_t index = modulo_power_of_two( head_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
302  size_t end_in_first_page = (index+n_items<base.my_rep->items_per_page)?(index+n_items):base.my_rep->items_per_page;
303 
304  head_page = make_copy( base, srcp, index, end_in_first_page, g_index, construct_item );
305  page* cur_page = head_page;
306 
307  if( srcp != src.tail_page ) {
308  for( srcp = srcp->next; srcp!=src.tail_page; srcp=srcp->next ) {
309  cur_page->next = make_copy( base, srcp, 0, base.my_rep->items_per_page, g_index, construct_item );
310  cur_page = cur_page->next;
311  }
312 
313  __TBB_ASSERT( srcp==src.tail_page, NULL );
314  size_t last_index = modulo_power_of_two( tail_counter/concurrent_queue_rep_base::n_queue, base.my_rep->items_per_page );
315  if( last_index==0 ) last_index = base.my_rep->items_per_page;
316 
317  cur_page->next = make_copy( base, srcp, 0, last_index, g_index, construct_item );
318  cur_page = cur_page->next;
319  }
320  tail_page = cur_page;
321  } __TBB_CATCH (...) {
322  invalidate_page_and_rethrow( g_index );
323  }
324  } else {
325  head_page = tail_page = NULL;
326  }
327  return *this;
328 }
329 
330 template<typename T>
332  // Append an invalid page at address 1 so that no more pushes are allowed.
333  page* invalid_page = (page*)uintptr_t(1);
334  {
335  spin_mutex::scoped_lock lock( page_mutex );
336  itt_store_word_with_release(tail_counter, k+concurrent_queue_rep_base::n_queue+1);
337  page* q = tail_page;
338  if( is_valid_page(q) )
339  q->next = invalid_page;
340  else
341  head_page = invalid_page;
342  tail_page = invalid_page;
343  }
344  __TBB_RETHROW();
345 }
346 
347 template<typename T>
349  const concurrent_queue_rep_base::page* src_page, size_t begin_in_page, size_t end_in_page,
350  ticket& g_index, item_constructor_t construct_item )
351 {
353  page* new_page = pa.allocate_page();
354  new_page->next = NULL;
355  new_page->mask = src_page->mask;
356  for( ; begin_in_page!=end_in_page; ++begin_in_page, ++g_index )
357  if( new_page->mask & uintptr_t(1)<<begin_in_page )
358  copy_item( *new_page, begin_in_page, *src_page, begin_in_page, construct_item );
359  return new_page;
360 }
361 
362 template<typename T>
365  ticket my_ticket;
367  page* my_page;
369 public:
371  my_ticket(k), my_queue(queue), my_page(p), allocator(b)
372  {}
374 };
375 
376 template<typename T>
378  page* p = my_page;
379  if( is_valid_page(p) ) {
380  spin_mutex::scoped_lock lock( my_queue.page_mutex );
381  page* q = p->next;
382  my_queue.head_page = q;
383  if( !is_valid_page(q) ) {
384  my_queue.tail_page = NULL;
385  }
386  }
387  itt_store_word_with_release(my_queue.head_counter, my_ticket);
388  if( is_valid_page(p) ) {
389  allocator.deallocate_page( p );
390  }
391 }
392 
393 #if _MSC_VER && !defined(__INTEL_COMPILER)
394 #pragma warning( pop )
395 #endif // warning 4146 is back
396 
397 template<typename T> class concurrent_queue_iterator_rep ;
398 template<typename T> class concurrent_queue_iterator_base_v3;
399 
401 
404 template<typename T>
406  micro_queue<T> array[n_queue];
407 
// Map ticket `k` to an index into `array`: (k * phi) mod n_queue.
409  static size_t index( ticket k ) {
410  return k*phi%n_queue;
411  }
412 
// Return the micro-queue that serves ticket `k`, i.e. array[index(k)].
413  micro_queue<T>& choose( ticket k ) {
414  // The formula here approximates LRU in a cache-oblivious way.
415  return array[index(k)];
416  }
417 };
418 
420 
424 template<typename T>
426 private:
429 
430  friend struct concurrent_queue_rep<T>;
431  friend class micro_queue<T>;
434 
435 protected:
437 
438 private:
441 
// Allocate raw storage for one page by delegating to allocate_block().
// The returned memory is not initialized here.
442  virtual page *allocate_page() __TBB_override {
443  concurrent_queue_rep<T>& r = *my_rep;
// padded_page already contains one T (`last`), so only items_per_page-1 extra
// slots are added to reach room for items_per_page items.
444  size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
445  return reinterpret_cast<page*>(allocate_block ( n ));
446  }
447 
449  concurrent_queue_rep<T>& r = *my_rep;
450  size_t n = sizeof(padded_page) + (r.items_per_page-1)*sizeof(T);
451  deallocate_block( reinterpret_cast<void*>(p), n );
452  }
453 
455  virtual void *allocate_block( size_t n ) = 0;
456 
458  virtual void deallocate_block( void *p, size_t n ) = 0;
459 
460 protected:
462 
464 #if TBB_USE_ASSERT
465  size_t nq = my_rep->n_queue;
466  for( size_t i=0; i<nq; i++ )
467  __TBB_ASSERT( my_rep->array[i].tail_page==NULL, "pages were not freed properly" );
468 #endif /* TBB_USE_ASSERT */
469  cache_aligned_allocator<concurrent_queue_rep<T> >().deallocate(my_rep,1);
470  }
471 
// Enqueue an item at the tail of the queue.
//   src            - opaque source object passed through to the micro-queue.
//   construct_item - callback used to build the stored T from `src`.
473  void internal_push( const void* src, item_constructor_t construct_item ) {
474  concurrent_queue_rep<T>& r = *my_rep;
// Take the next tail ticket (post-increment of the shared tail_counter).
475  ticket k = r.tail_counter++;
// The ticket selects which micro-queue receives the item.
476  r.choose(k).push( src, k, *this, construct_item );
477  }
478 
480 
481  bool internal_try_pop( void* dst ) ;
482 
484  size_t internal_size() const ;
485 
487  bool internal_empty() const ;
488 
490  /* note that the name may be misleading, but it remains so due to a historical accident. */
491  void internal_finish_clear() ;
492 
496  }
497 
499  void assign( const concurrent_queue_base_v3& src, item_constructor_t construct_item ) ;
500 
501 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Exchange the internal representation pointers of this queue and `src`.
// NOTE(review): listing line 503 is absent from this listing.
502  void internal_swap( concurrent_queue_base_v3& src ) {
504  std::swap( my_rep, src.my_rep );
505  }
506 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
507 };
508 
509 template<typename T>
511  const size_t item_size = sizeof(T);
512  my_rep = cache_aligned_allocator<concurrent_queue_rep<T> >().allocate(1);
513  __TBB_ASSERT( (size_t)my_rep % NFS_GetLineSize()==0, "alignment error" );
514  __TBB_ASSERT( (size_t)&my_rep->head_counter % NFS_GetLineSize()==0, "alignment error" );
515  __TBB_ASSERT( (size_t)&my_rep->tail_counter % NFS_GetLineSize()==0, "alignment error" );
516  __TBB_ASSERT( (size_t)&my_rep->array % NFS_GetLineSize()==0, "alignment error" );
517  memset(static_cast<void*>(my_rep),0,sizeof(concurrent_queue_rep<T>));
518  my_rep->item_size = item_size;
519  my_rep->items_per_page = item_size<= 8 ? 32 :
520  item_size<= 16 ? 16 :
521  item_size<= 32 ? 8 :
522  item_size<= 64 ? 4 :
523  item_size<=128 ? 2 :
524  1;
525 }
526 
527 template<typename T>
529  concurrent_queue_rep<T>& r = *my_rep;
530  ticket k;
531  do {
532  k = r.head_counter;
533  for(;;) {
534  if( (ptrdiff_t)(r.tail_counter-k)<=0 ) {
535  // Queue is empty
536  return false;
537  }
538  // Queue had item with ticket k when we looked. Attempt to get that item.
539  ticket tk=k;
540 #if defined(_MSC_VER) && defined(_Wp64)
541  #pragma warning (push)
542  #pragma warning (disable: 4267)
543 #endif
544  k = r.head_counter.compare_and_swap( tk+1, tk );
545 #if defined(_MSC_VER) && defined(_Wp64)
546  #pragma warning (pop)
547 #endif
548  if( k==tk )
549  break;
550  // Another thread snatched the item, retry.
551  }
552  } while( !r.choose( k ).pop( dst, k, *this ) );
553  return true;
554 }
555 
556 template<typename T>
558  concurrent_queue_rep<T>& r = *my_rep;
559  __TBB_ASSERT( sizeof(ptrdiff_t)<=sizeof(size_t), NULL );
560  ticket hc = r.head_counter;
561  size_t nie = r.n_invalid_entries;
562  ticket tc = r.tail_counter;
563  __TBB_ASSERT( hc!=tc || !nie, NULL );
564  ptrdiff_t sz = tc-hc-nie;
565  return sz<0 ? 0 : size_t(sz);
566 }
567 
568 template<typename T>
570  concurrent_queue_rep<T>& r = *my_rep;
571  ticket tc = r.tail_counter;
572  ticket hc = r.head_counter;
573  // if tc!=r.tail_counter, the queue was not empty at some point between the two reads.
574  return tc==r.tail_counter && tc==hc+r.n_invalid_entries ;
575 }
576 
577 template<typename T>
579  concurrent_queue_rep<T>& r = *my_rep;
580  size_t nq = r.n_queue;
581  for( size_t i=0; i<nq; ++i ) {
582  page* tp = r.array[i].tail_page;
583  if( is_valid_page(tp) ) {
584  __TBB_ASSERT( r.array[i].head_page==tp, "at most one page should remain" );
585  deallocate_page( tp );
586  r.array[i].tail_page = NULL;
587  } else
588  __TBB_ASSERT( !is_valid_page(r.array[i].head_page), "head page pointer corrupt?" );
589  }
590 }
591 
592 template<typename T>
594  item_constructor_t construct_item )
595 {
596  concurrent_queue_rep<T>& r = *my_rep;
597  r.items_per_page = src.my_rep->items_per_page;
598 
599  // copy concurrent_queue_rep data
600  r.head_counter = src.my_rep->head_counter;
601  r.tail_counter = src.my_rep->tail_counter;
602  r.n_invalid_entries = src.my_rep->n_invalid_entries;
603 
604  // copy or move micro_queues
605  for( size_t i = 0; i < r.n_queue; ++i )
606  r.array[i].assign( src.my_rep->array[i], *this, construct_item);
607 
608  __TBB_ASSERT( r.head_counter==src.my_rep->head_counter && r.tail_counter==src.my_rep->tail_counter,
609  "the source concurrent queue should not be concurrently modified." );
610 }
611 
612 template<typename Container, typename Value> class concurrent_queue_iterator;
613 
614 template<typename T>
617 public:
618  ticket head_counter;
622  head_counter(queue.my_rep->head_counter),
623  my_queue(queue)
624  {
625  for( size_t k=0; k<concurrent_queue_rep<T>::n_queue; ++k )
626  array[k] = queue.my_rep->array[k].head_page;
627  }
628 
630  bool get_item( T*& item, size_t k ) ;
631 };
632 
// Point `item` at the element for ticket `k`.
// Returns true if `k` is at the end of the queue (item is set to NULL) or if
// the element's slot is marked present in its page mask; returns false when the
// slot is not marked present (item still points at the slot).
633 template<typename T>
634 bool concurrent_queue_iterator_rep<T>::get_item( T*& item, size_t k ) {
635  if( k==my_queue.my_rep->tail_counter ) {
636  item = NULL;
637  return true;
638  } else {
// NOTE(review): `p` is declared on listing line 639, which is absent from this listing.
640  __TBB_ASSERT(p,NULL);
// Slot position of ticket k inside its page.
641  size_t i = modulo_power_of_two( k/concurrent_queue_rep<T>::n_queue, my_queue.my_rep->items_per_page );
642  item = &micro_queue<T>::get_ref(*p,i);
643  return (p->mask & uintptr_t(1)<<i)!=0;
644  }
645 }
646 
648 
649 template<typename Value>
652 
654 
655  template<typename C, typename T, typename U>
657 
658  template<typename C, typename T, typename U>
660 protected:
662  Value* my_item;
663 
// Default constructor: no iterator representation and no current item.
665  concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {
// NOTE(review): the statement guarded by this conditional (listing line 667) is
// absent from this listing.
666 #if __TBB_GCC_OPTIMIZER_ORDERING_BROKEN
668 #endif
669  }
670 
673  : no_assign(), my_rep(NULL), my_item(NULL) {
674  assign(i);
675  }
676 
679 
681  void assign( const concurrent_queue_iterator_base_v3<Value>& other ) ;
682 
684  void advance() ;
685 
689  my_rep = NULL;
690  }
691 };
692 
693 template<typename Value>
696  new( my_rep ) concurrent_queue_iterator_rep<Value>(queue);
697  size_t k = my_rep->head_counter;
698  if( !my_rep->get_item(my_item, k) ) advance();
699 }
700 
701 template<typename Value>
703  if( my_rep!=other.my_rep ) {
704  if( my_rep ) {
706  my_rep = NULL;
707  }
708  if( other.my_rep ) {
710  new( my_rep ) concurrent_queue_iterator_rep<Value>( *other.my_rep );
711  }
712  }
713  my_item = other.my_item;
714 }
715 
716 template<typename Value>
718  __TBB_ASSERT( my_item, "attempt to increment iterator past end of queue" );
719  size_t k = my_rep->head_counter;
720  const concurrent_queue_base_v3<Value>& queue = my_rep->my_queue;
721 #if TBB_USE_ASSERT
722  Value* tmp;
723  my_rep->get_item(tmp,k);
724  __TBB_ASSERT( my_item==tmp, NULL );
725 #endif /* TBB_USE_ASSERT */
727  if( i==queue.my_rep->items_per_page-1 ) {
729  root = root->next;
730  }
731  // advance k
732  my_rep->head_counter = ++k;
733  if( !my_rep->get_item(my_item, k) ) advance();
734 }
735 
737 
// Type trait: tbb_remove_cv<T>::type is T with any top-level const and/or
// volatile qualifier removed. The primary template passes T through unchanged;
// the three partial specializations strip const, volatile, and const volatile.
738 template<typename T> struct tbb_remove_cv {typedef T type;};
739 template<typename T> struct tbb_remove_cv<const T> {typedef T type;};
740 template<typename T> struct tbb_remove_cv<volatile T> {typedef T type;};
741 template<typename T> struct tbb_remove_cv<const volatile T> {typedef T type;};
742 
744 
746 template<typename Container, typename Value>
747 class concurrent_queue_iterator: public concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>,
748  public std::iterator<std::forward_iterator_tag,Value> {
749 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
750  template<typename T, class A>
751  friend class ::tbb::strict_ppl::concurrent_queue;
752 #else
753 public:
754 #endif
757  concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(queue)
758  {
759  }
760 
761 public:
763 
767  concurrent_queue_iterator_base_v3<typename tbb_remove_cv<Value>::type>(other)
768  {}
769 
772  this->assign(other);
773  return *this;
774  }
775 
// Reference to the current item (the base class's my_item viewed as Value).
// Does not check my_item for NULL.
777  Value& operator*() const {
778  return *static_cast<Value*>(this->my_item);
779  }
780 
// Pointer to the current item; equivalent to &operator*().
781  Value* operator->() const {return &operator*();}
782 
785  this->advance();
786  return *this;
787  }
788 
// Post-increment: advance the iterator and return a pointer to the item it
// referred to before advancing. Note the return type is Value*, not an
// iterator copy.
790  Value* operator++(int) {
791  Value* result = &operator*();
792  operator++();
793  return result;
794  }
795 }; // concurrent_queue_iterator
796 
797 
798 template<typename C, typename T, typename U>
800  return i.my_item==j.my_item;
801 }
802 
803 template<typename C, typename T, typename U>
805  return i.my_item!=j.my_item;
806 }
807 
808 } // namespace internal
809 
811 
812 } // namespace strict_ppl
813 
815 namespace internal {
816 
820 template<typename Container, typename Value> class concurrent_queue_iterator;
821 
823 
826 private:
829 
830  friend class concurrent_queue_rep;
831  friend struct micro_queue;
835 protected:
837  struct page {
839  uintptr_t mask;
840  };
841 
843  ptrdiff_t my_capacity;
844 
847 
849  size_t item_size;
850 
851  enum copy_specifics { copy, move };
852 
853 #if __TBB_PROTECTED_NESTED_CLASS_BROKEN
854 public:
855 #endif
// A page header followed by a member of the item type T.
856  template<typename T>
857  struct padded_page: page {
// Declared only; no definition is visible in this listing.
859  padded_page();
// Declared only; no definition is visible in this listing.
861  void operator=( const padded_page& );
// Item storage; declared as the final member.
863  T last;
864  };
865 
866 private:
867  virtual void copy_item( page& dst, size_t index, const void* src ) = 0;
868  virtual void assign_and_destroy_item( void* dst, page& src, size_t index ) = 0;
869 protected:
872 
874  void __TBB_EXPORTED_METHOD internal_push( const void* src );
875 
877  void __TBB_EXPORTED_METHOD internal_pop( void* dst );
878 
880  void __TBB_EXPORTED_METHOD internal_abort();
881 
883  bool __TBB_EXPORTED_METHOD internal_push_if_not_full( const void* src );
884 
886 
887  bool __TBB_EXPORTED_METHOD internal_pop_if_present( void* dst );
888 
890  ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const;
891 
893  bool __TBB_EXPORTED_METHOD internal_empty() const;
894 
896  void __TBB_EXPORTED_METHOD internal_set_capacity( ptrdiff_t capacity, size_t element_size );
897 
899  virtual page *allocate_page() = 0;
900 
902  virtual void deallocate_page( page *p ) = 0;
903 
905  /* note that the name may be misleading, but it remains so due to a historical accident. */
906  void __TBB_EXPORTED_METHOD internal_finish_clear() ;
907 
909  void __TBB_EXPORTED_METHOD internal_throw_exception() const;
910 
912  void __TBB_EXPORTED_METHOD assign( const concurrent_queue_base_v3& src ) ;
913 
914 #if __TBB_CPP11_RVALUE_REF_PRESENT
// Exchange the state of this queue with `src`: capacity, items-per-page,
// item size and the internal representation pointer.
// NOTE(review): listing line 916 is absent from this listing.
915  void internal_swap( concurrent_queue_base_v3& src ) {
917  std::swap( my_capacity, src.my_capacity );
918  std::swap( items_per_page, src.items_per_page );
919  std::swap( item_size, src.item_size );
920  std::swap( my_rep, src.my_rep );
921  }
922 #endif /* __TBB_CPP11_RVALUE_REF_PRESENT */
923 
925  void internal_insert_item( const void* src, copy_specifics op_type );
926 
928  bool internal_insert_if_not_full( const void* src, copy_specifics op_type );
929 
931  void internal_assign( const concurrent_queue_base_v3& src, copy_specifics op_type );
932 private:
933  virtual void copy_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
934 };
935 
937 
940 protected:
// Construct by forwarding the item size to the concurrent_queue_base_v3 constructor.
941  concurrent_queue_base_v8( size_t item_sz ) : concurrent_queue_base_v3( item_sz ) {}
942 
944  void __TBB_EXPORTED_METHOD move_content( concurrent_queue_base_v8& src ) ;
945 
947  bool __TBB_EXPORTED_METHOD internal_push_move_if_not_full( const void* src );
948 
950  void __TBB_EXPORTED_METHOD internal_push_move( const void* src );
951 private:
952  friend struct micro_queue;
953  virtual void move_page_item( page& dst, size_t dindex, const page& src, size_t sindex ) = 0;
954  virtual void move_item( page& dst, size_t index, const void* src ) = 0;
955 };
956 
958 
961 
963 
964  template<typename C, typename T, typename U>
966 
967  template<typename C, typename T, typename U>
969 
970  void initialize( const concurrent_queue_base_v3& queue, size_t offset_of_data );
971 protected:
973  void* my_item;
974 
// Default constructor: no iterator representation and no current item.
976  concurrent_queue_iterator_base_v3() : my_rep(NULL), my_item(NULL) {}
977 
// Copy constructor: start from the empty state, then let assign() take over
// the state of `i`.
979  concurrent_queue_iterator_base_v3( const concurrent_queue_iterator_base_v3& i ) : my_rep(NULL), my_item(NULL) {
980  assign(i);
981  }
982 
984 
986 
988  __TBB_EXPORTED_METHOD concurrent_queue_iterator_base_v3( const concurrent_queue_base_v3& queue, size_t offset_of_data );
989 
991  void __TBB_EXPORTED_METHOD assign( const concurrent_queue_iterator_base_v3& i );
992 
994  void __TBB_EXPORTED_METHOD advance();
995 
997  __TBB_EXPORTED_METHOD ~concurrent_queue_iterator_base_v3();
998 };
999 
1001 
1003 
1005 template<typename Container, typename Value>
1006 class concurrent_queue_iterator: public concurrent_queue_iterator_base,
1007  public std::iterator<std::forward_iterator_tag,Value> {
1008 
1009 #if !__TBB_TEMPLATE_FRIENDS_BROKEN
1010  template<typename T, class A>
1011  friend class ::tbb::concurrent_bounded_queue;
1012 #else
1013 public:
1014 #endif
1015 
1019  {
1020  }
1021 
1022 public:
1024 
1029  {}
1030 
1033  assign(other);
1034  return *this;
1035  }
1036 
// Reference to the current item (my_item, stored as void*, viewed as Value).
// Does not check my_item for NULL.
1038  Value& operator*() const {
1039  return *static_cast<Value*>(my_item);
1040  }
1041 
// Pointer to the current item; equivalent to &operator*().
1042  Value* operator->() const {return &operator*();}
1043 
1046  advance();
1047  return *this;
1048  }
1049 
// Post-increment: advance the iterator and return a pointer to the item it
// referred to before advancing. Note the return type is Value*, not an
// iterator copy.
1051  Value* operator++(int) {
1052  Value* result = &operator*();
1053  operator++();
1054  return result;
1055  }
1056 }; // concurrent_queue_iterator
1057 
1058 
1059 template<typename C, typename T, typename U>
1061  return i.my_item==j.my_item;
1062 }
1063 
1064 template<typename C, typename T, typename U>
1066  return i.my_item!=j.my_item;
1067 }
1068 
1069 } // namespace internal;
1070 
1072 
1073 } // namespace tbb
1074 
1075 #endif /* __TBB__concurrent_queue_impl_H */
Class that implements exponential backoff.
Definition: tbb_machine.h:352
concurrent_queue_rep< T > * my_rep
Internal representation.
bool operator==(const cache_aligned_allocator< T > &, const cache_aligned_allocator< U > &)
#define __TBB_override
Definition: tbb_stddef.h:244
static size_t index(ticket k)
Map ticket to an array index.
Constness-independent portion of concurrent_queue_iterator.
void __TBB_EXPORTED_METHOD internal_finish_clear()
free any remaining pages
Meets requirements of a forward iterator for STL.
void assign_and_destroy_item(void *dst, page &src, size_t index)
size_t __TBB_EXPORTED_FUNC NFS_GetLineSize()
Cache/sector line size.
argument_integer_type modulo_power_of_two(argument_integer_type arg, divisor_integer_type divisor)
A function to compute arg modulo divisor where divisor is a power of 2.
Definition: tbb_stddef.h:365
ptrdiff_t __TBB_EXPORTED_METHOD internal_size() const
Get size of queue.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
Type-independent portion of concurrent_queue_iterator.
void itt_store_word_with_release(tbb::atomic< T > &dst, U src)
Base class for types that should not be assigned.
Definition: tbb_stddef.h:324
micro_queue< T >::item_constructor_t item_constructor_t
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_iterator_base_v3 &i)
Assignment.
Class used to ensure exception-safety of method "pop".
concurrent_queue_iterator_rep< Value > * my_rep
Represents concurrent_queue over which we are iterating.
Meets "allocator" requirements of ISO C++ Standard, Section 20.1.5.
parts of concurrent_queue_rep that do not have references to micro_queue
virtual void deallocate_page(concurrent_queue_rep_base::page *p) __TBB_override
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long ITT_FORMAT lu const __itt_domain __itt_id __itt_string_handle __itt_metadata_type type
Represents acquisition of a mutex.
Definition: spin_mutex.h:54
void swap(atomic< T > &lhs, atomic< T > &rhs)
Definition: atomic.h:539
#define __TBB_TRY
Definition: tbb_stddef.h:287
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
Definition: tbb_stddef.h:270
Value & operator*() const
Reference to current item.
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Abstract class to define interface for page allocation/deallocation.
void const char const char int ITT_FORMAT __itt_group_sync p
Value & operator*() const
Reference to current item.
Internal representation of a ConcurrentQueue.
bool pop(void *dst, ticket k, concurrent_queue_base &base)
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
A queue using simple locking.
auto last(Container &c) -> decltype(begin(c))
concurrent_queue_iterator & operator++()
Advance to next item in queue.
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
Meets requirements of a forward iterator for STL.
#define __TBB_CATCH(e)
Definition: tbb_stddef.h:288
concurrent_queue_iterator & operator=(const concurrent_queue_iterator &other)
Iterator assignment.
concurrent_queue_rep_base::page page
#define __TBB_EXPORTED_METHOD
Definition: tbb_stddef.h:102
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
void __TBB_EXPORTED_METHOD assign(const concurrent_queue_base_v3 &src)
copy internal representation
concurrent_queue_iterator & operator++()
Advance to next item in queue.
#define __TBB_compiler_fence()
Definition: icc_generic.h:55
void __TBB_EXPORTED_METHOD advance()
Advance iterator one step towards tail of queue.
value_type compare_and_swap(value_type value, value_type comparand)
Definition: atomic.h:289
void push(const void *item, ticket k, concurrent_queue_base &base, concurrent_queue_base::copy_specifics op_type)
bool get_item(void *&item, size_t k)
Set item to point to kth element. Return true if at end of queue or item is marked valid; false otherwise.
The graph class.
concurrent_queue_rep * my_rep
Internal representation.
void spin_wait_while_eq(const volatile T &location, U value)
Spin WHILE the value of the variable is equal to a given value.
Definition: tbb_machine.h:398
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain ITT_FORMAT p const __itt_domain __itt_string_handle unsigned long long value
void call_itt_notify(notify_type, void *)
concurrent_queue_iterator(const concurrent_queue_base_v3 &queue)
Construct iterator pointing to head of queue.
void copy_item(page &dst, size_t dindex, const page &src, size_t sindex, item_constructor_t construct_item)
concurrent_queue_iterator_base_v3(const concurrent_queue_iterator_base_v3 &i)
Copy constructor.
bool __TBB_EXPORTED_METHOD internal_empty() const
Check if the queue is empty.
static T & get_ref(page &p, size_t index)
micro_queue & assign(const micro_queue &src, concurrent_queue_base &base, concurrent_queue_base::copy_specifics op_type)
void move(tbb_thread &t1, tbb_thread &t2)
Definition: tbb_thread.h:309
void internal_push(const void *src, item_constructor_t construct_item)
Enqueue item at tail of queue.
bool operator!=(const cache_aligned_allocator< T > &, const cache_aligned_allocator< U > &)
void spin_wait_until_eq(const volatile T &location, const U value)
Spin UNTIL the value of the variable is equal to a given value.
Definition: tbb_machine.h:406
const size_t NFS_MaxLineSize
Compile-time constant that is upper bound on cache line/sector size.
Definition: tbb_stddef.h:220
void itt_hide_store_word(T &dst, T src)
__TBB_EXPORTED_METHOD concurrent_queue_base_v3(size_t item_size)
ptrdiff_t my_capacity
Capacity of the queue.
concurrent_queue_iterator_rep * my_rep
concurrent_queue over which we are iterating.
virtual concurrent_queue_rep_base::page * allocate_page()=0
concurrent_queue_iterator_rep(const concurrent_queue_base_v3< T > &queue)
representation of concurrent_queue_base
micro_queue_pop_finalizer(micro_queue< T > &queue, concurrent_queue_base_v3< T > &b, ticket k, page *p)
page * make_copy(concurrent_queue_base &base, const page *src_page, size_t begin_in_page, size_t end_in_page, ticket &g_index, concurrent_queue_base::copy_specifics op_type)
bool is_valid_page(const concurrent_queue_rep_base::page *p)
A lock that occupies a single byte.
Definition: spin_mutex.h:40
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:335
concurrent_queue_iterator(const concurrent_queue_iterator< Container, typename Container::value_type > &other)
Identifiers declared inside namespace internal should never be used directly by client code.
Definition: atomic.h:55
static size_t index(ticket k)
Map ticket to an array index.
concurrent_queue_iterator & operator=(const concurrent_queue_iterator &other)
Iterator assignment.
#define __TBB_RETHROW()
Definition: tbb_stddef.h:290
void pause()
Pause for a while.
Definition: tbb_machine.h:367
atomic< size_t > n_invalid_entries
number of invalid entries in the queue
void copy_item(page &dst, size_t dindex, const void *src, item_constructor_t construct_item)
concurrent_queue_iterator_base_v3 concurrent_queue_iterator_base

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.