Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
concurrent_monitor.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB_concurrent_monitor_H
22 #define __TBB_concurrent_monitor_H
23 
24 #include "tbb/tbb_stddef.h"
25 #include "tbb/atomic.h"
26 #include "tbb/spin_mutex.h"
27 #include "tbb/tbb_exception.h"
28 #include "tbb/aligned_space.h"
29 
30 #include "semaphore.h"
31 
32 namespace tbb {
33 namespace internal {
34 
36 
38 public:
39  struct node_t {
42  explicit node_t() : next((node_t*)(uintptr_t)0xcdcdcdcd), prev((node_t*)(uintptr_t)0xcdcdcdcd) {}
43  };
44 
45  // ctor
47  // dtor
49 
50  inline size_t size() const {return count;}
51  inline bool empty() const {return size()==0;}
52  inline node_t* front() const {return head.next;}
53  inline node_t* last() const {return head.prev;}
54  inline node_t* begin() const {return front();}
55  inline const node_t* end() const {return &head;}
56 
58  inline void add( node_t* n ) {
60  n->prev = head.prev;
61  n->next = &head;
62  head.prev->next = n;
63  head.prev = n;
64  }
65 
67  inline void remove( node_t& n ) {
68  __TBB_ASSERT( count > 0, "attempt to remove an item from an empty list" );
70  n.prev->next = n.next;
71  n.next->prev = n.prev;
72  }
73 
76  if( const size_t l_count = __TBB_load_relaxed(count) ) {
77  __TBB_store_relaxed(lst.count, l_count);
78  lst.head.next = head.next;
79  lst.head.prev = head.prev;
80  head.next->prev = &lst.head;
81  head.prev->next = &lst.head;
82  clear();
83  }
84  }
85 
87 private:
90 };
91 
94 
96 
98 public:
100  class thread_context : waitset_node_t, no_copy {
101  friend class concurrent_monitor;
102  public:
103  thread_context() : skipped_wakeup(false), aborted(false), ready(false), context(0) {
104  epoch = 0;
105  in_waitset = false;
106  }
108  if (ready) {
109  if( skipped_wakeup ) semaphore().P();
110  semaphore().~binary_semaphore();
111  }
112  }
113  binary_semaphore& semaphore() { return *sema.begin(); }
114  private:
116  // Inlining of the method is undesirable, due to extra instructions for
117  // exception support added at caller side.
118  __TBB_NOINLINE( void init() );
120  __TBB_atomic unsigned epoch;
123  bool aborted;
124  bool ready;
125  uintptr_t context;
126  };
127 
130 
132  ~concurrent_monitor() ;
133 
135  void prepare_wait( thread_context& thr, uintptr_t ctx = 0 );
136 
138 
139  inline bool commit_wait( thread_context& thr ) {
140  const bool do_it = thr.epoch == __TBB_load_relaxed(epoch);
141  // this check is just an optimization
142  if( do_it ) {
143  __TBB_ASSERT( thr.ready, "use of commit_wait() without prior prepare_wait()");
144  thr.semaphore().P();
145  __TBB_ASSERT( !thr.in_waitset, "still in the queue?" );
146  if( thr.aborted )
148  } else {
149  cancel_wait( thr );
150  }
151  return do_it;
152  }
154  void cancel_wait( thread_context& thr );
155 
157  template<typename WaitUntil, typename Context>
158  void wait( WaitUntil until, Context on );
159 
161  void notify_one() {atomic_fence(); notify_one_relaxed();}
162 
164  void notify_one_relaxed();
165 
167  void notify_all() {atomic_fence(); notify_all_relaxed();}
168 
170  void notify_all_relaxed();
171 
173  template<typename P> void notify( const P& predicate ) {atomic_fence(); notify_relaxed( predicate );}
174 
176  template<typename P> void notify_relaxed( const P& predicate );
177 
179  void abort_all() {atomic_fence(); abort_all_relaxed(); }
180 
182  void abort_all_relaxed();
183 
184 private:
186  waitset_t waitset_ec;
187  __TBB_atomic unsigned epoch;
188  thread_context* to_thread_context( waitset_node_t* n ) { return static_cast<thread_context*>(n); }
189 };
190 
191 template<typename WaitUntil, typename Context>
192 void concurrent_monitor::wait( WaitUntil until, Context on )
193 {
194  bool slept = false;
195  thread_context thr_ctx;
196  prepare_wait( thr_ctx, on() );
197  while( !until() ) {
198  if( (slept = commit_wait( thr_ctx ) )==true )
199  if( until() ) break;
200  slept = false;
201  prepare_wait( thr_ctx, on() );
202  }
203  if( !slept )
204  cancel_wait( thr_ctx );
205 }
206 
207 template<typename P>
208 void concurrent_monitor::notify_relaxed( const P& predicate ) {
209  if( waitset_ec.empty() )
210  return;
211  waitset_t temp;
212  waitset_node_t* nxt;
213  const waitset_node_t* end = waitset_ec.end();
214  {
215  tbb::spin_mutex::scoped_lock l( mutex_ec );
216  __TBB_store_relaxed(epoch, __TBB_load_relaxed(epoch) + 1);
217  for( waitset_node_t* n=waitset_ec.last(); n!=end; n=nxt ) {
218  nxt = n->prev;
219  thread_context* thr = to_thread_context( n );
220  if( predicate( thr->context ) ) {
221  waitset_ec.remove( *n );
222  thr->in_waitset = false;
223  temp.add( n );
224  }
225  }
226  }
227 
228  end = temp.end();
229  for( waitset_node_t* n=temp.front(); n!=end; n=nxt ) {
230  nxt = n->next;
231  to_thread_context(n)->semaphore().V();
232  }
233 #if TBB_USE_ASSERT
234  temp.clear();
235 #endif
236 }
237 
238 } // namespace internal
239 } // namespace tbb
240 
241 #endif /* __TBB_concurrent_monitor_H */
tbb::aligned_space< binary_semaphore > sema
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
void notify(const P &predicate)
Notify waiting threads of the event that satisfies the given predicate.
void add(node_t *n)
add to the back of the list
binary_semaphore for concurrent monitor
Definition: semaphore.h:226
Circular doubly-linked list with sentinel.
Block of space aligned sufficiently to construct an array T with N elements.
Definition: aligned_space.h:33
circular_doubly_linked_list_with_sentinel::node_t waitset_node_t
Represents acquisition of a mutex.
Definition: spin_mutex.h:54
void P()
wait/acquire
Definition: semaphore.h:239
Edsger Dijkstra&#39;s counting semaphore.
Definition: semaphore.h:98
bool commit_wait(thread_context &thr)
Commit wait if event count has not changed; otherwise, cancel wait.
void atomic_fence()
Sequentially consistent full memory fence.
Definition: tbb_machine.h:346
void throw_exception(exception_id eid)
Versionless convenience wrapper for throw_exception_v4()
void __TBB_store_relaxed(volatile T &location, V value)
Definition: tbb_machine.h:746
thread_context * to_thread_context(waitset_node_t *n)
void abort_all()
Abort any sleeping threads at the time of the call.
The graph class.
void wait(WaitUntil until, Context on)
Wait for a condition to be satisfied with waiting-on context.
void notify_one()
Notify one thread about the event.
void notify_relaxed(const P &predicate)
Notify waiting threads of the event that satisfies the given predicate; Relaxed version.
void flush_to(circular_doubly_linked_list_with_sentinel &lst)
move all elements to &#39;lst&#39; and initialize the &#39;this&#39; list
#define __TBB_NOINLINE(decl)
Definition: tbb_stddef.h:110
T __TBB_load_relaxed(const volatile T &location)
Definition: tbb_machine.h:742
#define __TBB_atomic
Definition: tbb_stddef.h:241
void notify_all()
Notify all waiting threads of the event.
circular_doubly_linked_list_with_sentinel waitset_t
A lock that occupies a single byte.
Definition: spin_mutex.h:40
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:335
void P()
wait/acquire
Definition: semaphore.h:109

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.