Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
market.cpp
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #include "tbb/tbb_stddef.h"
22 #include "tbb/global_control.h" // global_control::active_value
23 
24 #include "market.h"
25 #include "tbb_main.h"
26 #include "governor.h"
27 #include "scheduler.h"
28 #include "itt_notify.h"
29 
30 namespace tbb {
31 namespace internal {
32 
34 #if __TBB_TASK_PRIORITY
35  arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
36  arena *&next = my_priority_levels[a.my_top_priority].next_arena;
37 #else /* !__TBB_TASK_PRIORITY */
38  arena_list_type &arenas = my_arenas;
39  arena *&next = my_next_arena;
40 #endif /* !__TBB_TASK_PRIORITY */
41  arenas.push_front( a );
42  if ( arenas.size() == 1 )
43  next = &*arenas.begin();
44 }
45 
47 #if __TBB_TASK_PRIORITY
48  arena_list_type &arenas = my_priority_levels[a.my_top_priority].arenas;
49  arena *&next = my_priority_levels[a.my_top_priority].next_arena;
50 #else /* !__TBB_TASK_PRIORITY */
51  arena_list_type &arenas = my_arenas;
52  arena *&next = my_next_arena;
53 #endif /* !__TBB_TASK_PRIORITY */
54  arena_list_type::iterator it = next;
55  __TBB_ASSERT( it != arenas.end(), NULL );
56  if ( next == &a ) {
57  if ( ++it == arenas.end() && arenas.size() > 1 )
58  it = arenas.begin();
59  next = &*it;
60  }
61  arenas.remove( a );
62 }
63 
64 //------------------------------------------------------------------------
65 // market
66 //------------------------------------------------------------------------
67 
68 market::market ( unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size )
69  : my_num_workers_hard_limit(workers_hard_limit)
70  , my_num_workers_soft_limit(workers_soft_limit)
72  , my_global_top_priority(normalized_normal_priority)
73  , my_global_bottom_priority(normalized_normal_priority)
74 #endif /* __TBB_TASK_PRIORITY */
75  , my_ref_count(1)
76  , my_stack_size(stack_size)
77  , my_workers_soft_limit_to_report(workers_soft_limit)
78 {
79 #if __TBB_TASK_PRIORITY
80  __TBB_ASSERT( my_global_reload_epoch == 0, NULL );
81  my_priority_levels[normalized_normal_priority].workers_available = my_num_workers_soft_limit;
82 #endif /* __TBB_TASK_PRIORITY */
83 
84  // Once created RML server will start initializing workers that will need
85  // global market instance to get worker stack size
87  __TBB_ASSERT( my_server, "Failed to create RML server" );
88 }
89 
90 static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit) {
91  if( int soft_limit = market::app_parallelism_limit() )
92  workers_soft_limit = soft_limit-1;
93  else // if user set no limits (yet), use market's parameter
94  workers_soft_limit = max( governor::default_num_threads() - 1, workers_soft_limit );
95  if( workers_soft_limit >= workers_hard_limit )
96  workers_soft_limit = workers_hard_limit-1;
97  return workers_soft_limit;
98 }
99 
100 market& market::global_market ( bool is_public, unsigned workers_requested, size_t stack_size ) {
101  global_market_mutex_type::scoped_lock lock( theMarketMutex );
102  market *m = theMarket;
103  if( m ) {
104  ++m->my_ref_count;
105  const unsigned old_public_count = is_public? m->my_public_ref_count++ : /*any non-zero value*/1;
106  lock.release();
107  if( old_public_count==0 )
109 
110  // do not warn if default number of workers is requested
111  if( workers_requested != governor::default_num_threads()-1 ) {
112  __TBB_ASSERT( skip_soft_limit_warning > workers_requested,
113  "skip_soft_limit_warning must be larger than any valid workers_requested" );
114  unsigned soft_limit_to_report = m->my_workers_soft_limit_to_report;
115  if( soft_limit_to_report < workers_requested ) {
116  runtime_warning( "The number of workers is currently limited to %u. "
117  "The request for %u workers is ignored. Further requests for more workers "
118  "will be silently ignored until the limit changes.\n",
119  soft_limit_to_report, workers_requested );
120  // The race is possible when multiple threads report warnings.
121  // We are OK with that, as there are just multiple warnings.
123  compare_and_swap(skip_soft_limit_warning, soft_limit_to_report);
124  }
125 
126  }
127  if( m->my_stack_size < stack_size )
128  runtime_warning( "Thread stack size has been already set to %u. "
129  "The request for larger stack (%u) cannot be satisfied.\n",
130  m->my_stack_size, stack_size );
131  }
132  else {
133  // TODO: A lot is done under theMarketMutex locked. Can anything be moved out?
134  if( stack_size == 0 )
136  // Expecting that 4P is suitable for most applications.
137  // Limit to 2P for large thread number.
138  // TODO: ask RML for max concurrency and possibly correct hard_limit
139  const unsigned factor = governor::default_num_threads()<=128? 4 : 2;
140  // The requested number of threads is intentionally not considered in
141  // computation of the hard limit, in order to separate responsibilities
142  // and avoid complicated interactions between global_control and task_scheduler_init.
143  // The market guarantees that at least 256 threads might be created.
144  const unsigned workers_hard_limit = max(max(factor*governor::default_num_threads(), 256u), app_parallelism_limit());
145  const unsigned workers_soft_limit = calc_workers_soft_limit(workers_requested, workers_hard_limit);
146  // Create the global market instance
147  size_t size = sizeof(market);
148 #if __TBB_TASK_GROUP_CONTEXT
149  __TBB_ASSERT( __TBB_offsetof(market, my_workers) + sizeof(generic_scheduler*) == sizeof(market),
150  "my_workers must be the last data field of the market class");
151  size += sizeof(generic_scheduler*) * (workers_hard_limit - 1);
152 #endif /* __TBB_TASK_GROUP_CONTEXT */
154  void* storage = NFS_Allocate(1, size, NULL);
155  memset( storage, 0, size );
156  // Initialize and publish global market
157  m = new (storage) market( workers_soft_limit, workers_hard_limit, stack_size );
158  if( is_public )
159  m->my_public_ref_count = 1;
160  theMarket = m;
161  // This check relies on the fact that for shared RML default_concurrency==max_concurrency
162  if ( !governor::UsePrivateRML && m->my_server->default_concurrency() < workers_soft_limit )
163  runtime_warning( "RML might limit the number of workers to %u while %u is requested.\n"
164  , m->my_server->default_concurrency(), workers_soft_limit );
165  }
166  return *m;
167 }
168 
170 #if __TBB_COUNT_TASK_NODES
171  if ( my_task_node_count )
172  runtime_warning( "Leaked %ld task objects\n", (long)my_task_node_count );
173 #endif /* __TBB_COUNT_TASK_NODES */
174  this->market::~market(); // qualified to suppress warning
175  NFS_Free( this );
177 }
178 
179 bool market::release ( bool is_public, bool blocking_terminate ) {
180  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
181  bool do_release = false;
182  {
183  global_market_mutex_type::scoped_lock lock( theMarketMutex );
184  if ( blocking_terminate ) {
185  __TBB_ASSERT( is_public, "Only an object with a public reference can request the blocking terminate" );
186  while ( my_public_ref_count == 1 && my_ref_count > 1 ) {
187  lock.release();
188  // To guarantee that request_close_connection() is called by the last master, we need to wait till all
189  // references are released. Re-read my_public_ref_count to limit waiting if new masters are created.
190  // Theoretically, new private references to the market can be added during waiting making it potentially
191  // endless.
192  // TODO: revise why the weak scheduler needs market's pointer and try to remove this wait.
193  // Note that the market should know about its schedulers for cancelation/exception/priority propagation,
194  // see e.g. task_group_context::cancel_group_execution()
196  __TBB_Yield();
197  lock.acquire( theMarketMutex );
198  }
199  }
200  if ( is_public ) {
201  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
204  }
205  if ( --my_ref_count == 0 ) {
207  do_release = true;
208  theMarket = NULL;
209  }
210  }
211  if( do_release ) {
212  __TBB_ASSERT( !__TBB_load_with_acquire(my_public_ref_count), "No public references remain if we remove the market." );
213  // inform RML that blocking termination is required
214  my_join_workers = blocking_terminate;
215  my_server->request_close_connection();
216  return blocking_terminate;
217  }
218  return false;
219 }
220 
221 void market::set_active_num_workers ( unsigned soft_limit ) {
222  int old_requested=0, requested=0;
223  bool need_mandatory = false;
224  market *m;
225 
226  {
227  global_market_mutex_type::scoped_lock lock( theMarketMutex );
228  if ( !theMarket )
229  return; // actual value will be used at market creation
230  m = theMarket;
231  ++m->my_ref_count;
232  }
233  // have my_ref_count for market, use it safely
234  {
236  __TBB_ASSERT(soft_limit <= m->my_num_workers_hard_limit, NULL);
237  m->my_num_workers_soft_limit = soft_limit;
238  // report only once after new soft limit value is set
239  m->my_workers_soft_limit_to_report = soft_limit;
240 
241 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
242  // updates soft_limit to zero must be postponed
243  // while mandatory parallelism is enabled
244  if( !(m->my_mandatory_num_requested && !soft_limit) )
245 #endif
246  {
247  const int demand =
248 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
249  m->my_mandatory_num_requested? 0 :
250 #endif
251  m->my_total_demand;
252  requested = min(demand, (int)soft_limit);
253  old_requested = m->my_num_workers_requested;
254  m->my_num_workers_requested = requested;
255 #if __TBB_TASK_PRIORITY
256  m->my_priority_levels[m->my_global_top_priority].workers_available = soft_limit;
257  m->update_allotment( m->my_global_top_priority );
258 #else
259  m->update_allotment();
260 #endif
261  }
262 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
263  if( !m->my_mandatory_num_requested && !soft_limit ) {
264  // enable mandatory concurrency, if enqueued tasks are found
265  // and zero soft_limit requested
266 #if __TBB_TASK_PRIORITY
267  for( int p = m->my_global_top_priority; p >= m->my_global_bottom_priority; --p ) {
268  priority_level_info &pl = m->my_priority_levels[p];
269  arena_list_type &arenas = pl.arenas;
270 #else
271  const int p = 0;
272  arena_list_type &arenas = m->my_arenas;
273 #endif /* __TBB_TASK_PRIORITY */
274  for( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it ) {
275  if( !it->my_task_stream.empty(p) ) {
276  // switch local_mandatory to global_mandatory unconditionally
277  if( m->mandatory_concurrency_enable_impl( &*it ) )
278  need_mandatory = true;
279  }
280  }
281 #if __TBB_TASK_PRIORITY
282  }
283 #endif /* __TBB_TASK_PRIORITY */
284  }
285 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
286  }
287  // adjust_job_count_estimate must be called outside of any locks
288  int delta = requested - old_requested;
289  if( need_mandatory ) ++delta;
290  if( delta!=0 )
291  m->my_server->adjust_job_count_estimate( delta );
292  // release internal market reference to match ++m->my_ref_count above
293  m->release( /*is_public=*/false, /*blocking_terminate=*/false );
294 }
295 
296 bool governor::does_client_join_workers (const tbb::internal::rml::tbb_client &client) {
297  return ((const market&)client).must_join_workers();
298 }
299 
300 arena* market::create_arena ( int num_slots, int num_reserved_slots, size_t stack_size ) {
301  __TBB_ASSERT( num_slots > 0, NULL );
302  __TBB_ASSERT( num_reserved_slots <= num_slots, NULL );
303  // Add public market reference for master thread/task_arena (that adds an internal reference in exchange).
304  market &m = global_market( /*is_public=*/true, num_slots-num_reserved_slots, stack_size );
305 
306  arena& a = arena::allocate_arena( m, num_slots, num_reserved_slots );
307  // Add newly created arena into the existing market's list.
310  return &a;
311 }
312 
315  __TBB_ASSERT( theMarket == this, "Global market instance was destroyed prematurely?" );
316  __TBB_ASSERT( !a.my_slots[0].my_scheduler, NULL );
320 }
321 
322 void market::try_destroy_arena ( arena* a, uintptr_t aba_epoch ) {
323  bool locked = true;
324  __TBB_ASSERT( a, NULL );
325  // we hold reference to the market, so it cannot be destroyed at any moment here
326  __TBB_ASSERT( this == theMarket, NULL );
327  __TBB_ASSERT( my_ref_count!=0, NULL );
330 #if __TBB_TASK_PRIORITY
331  // scan all priority levels, not only in [my_global_bottom_priority;my_global_top_priority]
332  // range, because arena to be destroyed can have no outstanding request for workers
333  for ( int p = num_priority_levels-1; p >= 0; --p ) {
334  priority_level_info &pl = my_priority_levels[p];
335  arena_list_type &my_arenas = pl.arenas;
336 #endif /* __TBB_TASK_PRIORITY */
337  arena_list_type::iterator it = my_arenas.begin();
338  for ( ; it != my_arenas.end(); ++it ) {
339  if ( a == &*it ) {
340  if ( it->my_aba_epoch == aba_epoch ) {
341  // Arena is alive
342  if ( !a->my_num_workers_requested && !a->my_references ) {
343  __TBB_ASSERT( !a->my_num_workers_allotted && (a->my_pool_state == arena::SNAPSHOT_EMPTY || !a->my_max_num_workers), "Inconsistent arena state" );
344  // Arena is abandoned. Destroy it.
345  detach_arena( *a );
347  locked = false;
348  a->free_arena();
349  }
350  }
351  if (locked)
353  return;
354  }
355  }
356 #if __TBB_TASK_PRIORITY
357  }
358 #endif /* __TBB_TASK_PRIORITY */
360 }
361 
364  if ( arenas.empty() )
365  return NULL;
366  arena_list_type::iterator it = hint;
367  __TBB_ASSERT( it != arenas.end(), NULL );
368  do {
369  arena& a = *it;
370  if ( ++it == arenas.end() )
371  it = arenas.begin();
373 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
374  && !a.recall_by_mandatory_request()
375 #endif
376  ) {
378  return &a;
379  }
380  } while ( it != hint );
381  return NULL;
382 }
383 
384 int market::update_allotment ( arena_list_type& arenas, int workers_demand, int max_workers ) {
385  __TBB_ASSERT( workers_demand, NULL );
386  max_workers = min(workers_demand, max_workers);
387  int carry = 0;
388  int assigned = 0;
389  arena_list_type::iterator it = arenas.begin();
390  for ( ; it != arenas.end(); ++it ) {
391  arena& a = *it;
392  if ( a.my_num_workers_requested <= 0 ) {
394  continue;
395  }
396  int tmp = a.my_num_workers_requested * max_workers + carry;
397  int allotted = tmp / workers_demand;
398  carry = tmp % workers_demand;
399  // a.my_num_workers_requested may temporarily exceed a.my_max_num_workers
400  allotted = min( allotted, (int)a.my_max_num_workers );
401 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
402  if ( !allotted && a.must_have_concurrency() )
403  allotted = 1;
404 #endif
405  a.my_num_workers_allotted = allotted;
406  assigned += allotted;
407  }
408 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
409  __TBB_ASSERT( assigned <= workers_demand, NULL ); // weaker assertion due to enforced allotment
410 #else
411  __TBB_ASSERT( assigned <= max_workers, NULL );
412 #endif
413  return assigned;
414 }
415 
418  if ( a ) {
419  for ( arena_list_type::iterator it = arenas.begin(); it != arenas.end(); ++it )
420  if ( a == &*it )
421  return true;
422  }
423  return false;
424 }
425 
426 #if __TBB_TASK_PRIORITY
427 inline void market::update_global_top_priority ( intptr_t newPriority ) {
428  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.market_prio_switches );
429  my_global_top_priority = newPriority;
430  my_priority_levels[newPriority].workers_available =
431 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
432  my_mandatory_num_requested && !my_num_workers_soft_limit ? 1 :
433 #endif
435  advance_global_reload_epoch();
436 }
437 
438 inline void market::reset_global_priority () {
439  my_global_bottom_priority = normalized_normal_priority;
440  update_global_top_priority(normalized_normal_priority);
441 }
442 
443 arena* market::arena_in_need ( arena* prev_arena ) {
444  if( as_atomic(my_total_demand) <= 0 )
445  return NULL;
448  int p = my_global_top_priority;
449  arena *a = NULL;
450 
451  // Checks if arena is alive or not
452  if ( is_arena_in_list( my_priority_levels[p].arenas, prev_arena ) ) {
453  a = arena_in_need( my_priority_levels[p].arenas, prev_arena );
454  }
455 
456  while ( !a && p >= my_global_bottom_priority ) {
457  priority_level_info &pl = my_priority_levels[p--];
458  a = arena_in_need( pl.arenas, pl.next_arena );
459  if ( a ) {
460  as_atomic(pl.next_arena) = a; // a subject for innocent data race under the reader lock
461  // TODO: rework global round robin policy to local or random to avoid this write
462  }
463  // TODO: When refactoring task priority code, take into consideration the
464  // __TBB_TRACK_PRIORITY_LEVEL_SATURATION sections from earlier versions of TBB
465  }
466  return a;
467 }
468 
469 void market::update_allotment ( intptr_t highest_affected_priority ) {
470  intptr_t i = highest_affected_priority;
471  int available = my_priority_levels[i].workers_available;
472  for ( ; i >= my_global_bottom_priority; --i ) {
473  priority_level_info &pl = my_priority_levels[i];
474  pl.workers_available = available;
475  if ( pl.workers_requested ) {
476  available -= update_allotment( pl.arenas, pl.workers_requested, available );
477  if ( available < 0 ) { // TODO: assertion?
478  available = 0;
479  break;
480  }
481  }
482  }
483  __TBB_ASSERT( i <= my_global_bottom_priority || !available, NULL );
484  for ( --i; i >= my_global_bottom_priority; --i ) {
485  priority_level_info &pl = my_priority_levels[i];
486  pl.workers_available = 0;
487  arena_list_type::iterator it = pl.arenas.begin();
488  for ( ; it != pl.arenas.end(); ++it ) {
489  __TBB_ASSERT( it->my_num_workers_requested >= 0 || !it->my_num_workers_allotted, NULL );
490 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
491  it->my_num_workers_allotted = it->must_have_concurrency() ? 1 : 0;
492 #else
493  it->my_num_workers_allotted = 0;
494 #endif
495  }
496  }
497 }
498 #endif /* __TBB_TASK_PRIORITY */
499 
500 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
501 bool market::mandatory_concurrency_enable_impl ( arena *a, bool *enabled ) {
502  if( a->my_concurrency_mode==arena_base::cm_enforced_global ) {
503  if( enabled )
504  *enabled = false;
505  return false;
506  }
507  if( enabled )
508  *enabled = true;
509  a->my_max_num_workers = 1;
510  a->my_concurrency_mode = arena_base::cm_enforced_global;
511 #if __TBB_TASK_PRIORITY
512  priority_level_info &pl = my_priority_levels[a->my_top_priority];
513  pl.workers_requested++;
514  if( my_global_top_priority < a->my_top_priority ) {
515  my_global_top_priority = a->my_top_priority;
516  advance_global_reload_epoch();
517  }
518 #endif
521  if( 1 == ++my_mandatory_num_requested ) {
522  my_total_demand++;
523  return true;
524  }
525  return false;
526 }
527 
528 bool market::mandatory_concurrency_enable ( arena *a ) {
529  bool add_thread;
530  bool enabled;
531  {
533  add_thread = mandatory_concurrency_enable_impl(a, &enabled);
534  }
535  if( add_thread )
536  my_server->adjust_job_count_estimate( 1 );
537  return enabled;
538 }
539 
540 void market::mandatory_concurrency_disable ( arena *a ) {
541  bool remove_thread = false;
542  int delta_adjust_demand = 0;
543 
544  {
546 
547  if( a->my_concurrency_mode!=arena_base::cm_enforced_global )
548  return;
549  __TBB_ASSERT( a->my_max_num_workers==1, NULL );
550  a->my_max_num_workers = 0;
551 #if __TBB_TASK_PRIORITY
552  if ( a->my_top_priority != normalized_normal_priority ) {
553  update_arena_top_priority( *a, normalized_normal_priority );
554  }
555  a->my_bottom_priority = normalized_normal_priority;
556 #endif
557 
558  int val = --my_mandatory_num_requested;
559  __TBB_ASSERT_EX( val >= 0, NULL );
560  if( val == 0 ) {
561  my_total_demand--;
562  remove_thread = true;
563  }
565  if (a->my_num_workers_requested > 0)
566  delta_adjust_demand = a->my_num_workers_requested;
567  else
569 
570 #if __TBB_TASK_PRIORITY
571  priority_level_info &pl = my_priority_levels[a->my_top_priority];
572  pl.workers_requested--;
573  intptr_t p = my_global_top_priority;
574  for (; !my_priority_levels[p].workers_requested && p>0; p--)
575  ;
576  if( !p )
577  reset_global_priority();
578  else if( p!= my_global_top_priority )
579  update_global_top_priority(p);
580 #endif
581  a->my_concurrency_mode = arena::cm_normal;
582  }
583  if( delta_adjust_demand )
584  adjust_demand( *a, -delta_adjust_demand );
585  if( remove_thread )
586  my_server->adjust_job_count_estimate( -1 );
587 }
588 #endif /* __TBB_ENQUEUE_ENFORCED_CONCURRENCY */
589 
590 void market::adjust_demand ( arena& a, int delta ) {
591  __TBB_ASSERT( theMarket, "market instance was destroyed prematurely?" );
592  if ( !delta )
593  return;
595  int prev_req = a.my_num_workers_requested;
596  a.my_num_workers_requested += delta;
597  if ( a.my_num_workers_requested <= 0 ) {
598 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
599  // must not recall worker from arena with mandatory parallelism
600  if ( a.my_market->my_mandatory_num_requested && a.my_concurrency_mode!=arena_base::cm_normal )
602  else
603 #endif
605  if ( prev_req <= 0 ) {
607  return;
608  }
609  delta = -prev_req;
610  }
611  else if ( prev_req < 0 ) {
612  delta = a.my_num_workers_requested;
613  }
614  my_total_demand += delta;
615 #if !__TBB_TASK_PRIORITY
617 #else /* !__TBB_TASK_PRIORITY */
618  intptr_t p = a.my_top_priority;
619  priority_level_info &pl = my_priority_levels[p];
620  pl.workers_requested += delta;
621  __TBB_ASSERT( pl.workers_requested >= 0, NULL );
622  if ( a.my_num_workers_requested <= 0 ) {
623  if ( a.my_top_priority != normalized_normal_priority ) {
624  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_resets );
625  update_arena_top_priority( a, normalized_normal_priority );
626  }
627  a.my_bottom_priority = normalized_normal_priority;
628  }
629  if ( p == my_global_top_priority ) {
630  if ( !pl.workers_requested ) {
631  while ( --p >= my_global_bottom_priority && !my_priority_levels[p].workers_requested )
632  continue;
633  if ( p < my_global_bottom_priority )
634  reset_global_priority();
635  else
636  update_global_top_priority(p);
637  }
638  update_allotment( my_global_top_priority );
639  }
640  else if ( p > my_global_top_priority ) {
641  __TBB_ASSERT( pl.workers_requested > 0, NULL );
642  // TODO: investigate if the following invariant is always valid
643  __TBB_ASSERT( a.my_num_workers_requested >= 0, NULL );
644  update_global_top_priority(p);
646 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
647  // must not recall worker from arena with mandatory parallelism
649  && a.my_market->my_mandatory_num_requested && a.my_concurrency_mode!=arena_base::cm_normal )
651 #endif
652  my_priority_levels[p - 1].workers_available = my_num_workers_soft_limit - a.my_num_workers_allotted;
653  update_allotment( p - 1 );
654  }
655  else if ( p == my_global_bottom_priority ) {
656  if ( !pl.workers_requested ) {
657  while ( ++p <= my_global_top_priority && !my_priority_levels[p].workers_requested )
658  continue;
659  if ( p > my_global_top_priority )
660  reset_global_priority();
661  else
662  my_global_bottom_priority = p;
663  }
664  else
665  update_allotment( p );
666  }
667  else if ( p < my_global_bottom_priority ) {
668  int prev_bottom = my_global_bottom_priority;
669  my_global_bottom_priority = p;
670  update_allotment( prev_bottom );
671  }
672  else {
673  __TBB_ASSERT( my_global_bottom_priority < p && p < my_global_top_priority, NULL );
674  update_allotment( p );
675  }
676  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested<=0, NULL );
678 #endif /* !__TBB_TASK_PRIORITY */
679  if ( delta > 0 ) {
680  // can't overflow soft_limit, but remember values request by arenas in
681  // my_total_demand to not prematurely release workers to RML
683  delta = my_num_workers_soft_limit - my_num_workers_requested;
684  } else {
685  // the number of workers should not be decreased below my_total_demand
688  }
689  my_num_workers_requested += delta;
691 
693  // Must be called outside of any locks
694  my_server->adjust_job_count_estimate( delta );
696 }
697 
698 void market::process( job& j ) {
699  generic_scheduler& s = static_cast<generic_scheduler&>(j);
700  // s.my_arena can be dead. Don't access it until arena_in_need is called
701  arena *a = s.my_arena;
702  __TBB_ASSERT( governor::is_set(&s), NULL );
703  enum {
704  query_interval = 1000,
705  first_interval = 1
706  };
707  for(int i = first_interval; ; i--) {
708  while ( (a = arena_in_need(a)) )
709  {
710  a->process(s);
711  a = NULL; // To avoid double checks in arena_in_need
712  i = first_interval;
713  }
714  // Workers leave market because there is no arena in need. It can happen earlier than
715  // adjust_job_count_estimate() decreases my_slack and RML can put this thread to sleep.
716  // It might result in a busy-loop checking for my_slack<0 and calling this method instantly.
717  // first_interval>0 and the yield refines this spinning.
718  if( i > 0 )
719  __TBB_Yield();
720  else
721 #if !__TBB_SLEEP_PERMISSION
722  break;
723 #else
724  { // i == 0
725 #if __TBB_TASK_PRIORITY
726  arena_list_type &al = my_priority_levels[my_global_top_priority].arenas;
727 #else /* __TBB_TASK_PRIORITY */
729 #endif /* __TBB_TASK_PRIORITY */
730  if( al.empty() ) // races if any are innocent TODO: replace by an RML query interface
731  break; // no arenas left, perhaps going to shut down
732  if( the_global_observer_list.ask_permission_to_leave() )
733  break; // go sleep
734  __TBB_Yield();
735  i = query_interval;
736  }
737 #endif// !__TBB_SLEEP_PERMISSION
738  }
739  GATHER_STATISTIC( ++s.my_counters.market_roundtrips );
740 }
741 
742 void market::cleanup( job& j ) {
743  __TBB_ASSERT( theMarket != this, NULL );
744  generic_scheduler& s = static_cast<generic_scheduler&>(j);
746  __TBB_ASSERT( !mine || mine->is_worker(), NULL );
747  if( mine!=&s ) {
749  generic_scheduler::cleanup_worker( &s, mine!=NULL );
751  } else {
753  }
754 }
755 
757  destroy();
758 }
759 
760 ::rml::job* market::create_one_job() {
761  unsigned index = ++my_first_unused_worker_idx;
762  __TBB_ASSERT( index > 0, NULL );
763  ITT_THREAD_SET_NAME(_T("TBB Worker Thread"));
764  // index serves as a hint decreasing conflicts between workers when they migrate between arenas
766 #if __TBB_TASK_GROUP_CONTEXT
767  __TBB_ASSERT( index <= my_num_workers_hard_limit, NULL );
768  __TBB_ASSERT( !my_workers[index - 1], NULL );
769  my_workers[index - 1] = s;
770 #endif /* __TBB_TASK_GROUP_CONTEXT */
771  return s;
772 }
773 
774 #if __TBB_TASK_PRIORITY
775 void market::update_arena_top_priority ( arena& a, intptr_t new_priority ) {
776  GATHER_STATISTIC( ++governor::local_scheduler_if_initialized()->my_counters.arena_prio_switches );
777  __TBB_ASSERT( a.my_top_priority != new_priority, NULL );
778  priority_level_info &prev_level = my_priority_levels[a.my_top_priority],
779  &new_level = my_priority_levels[new_priority];
781  a.my_top_priority = new_priority;
783  as_atomic( a.my_reload_epoch ).fetch_and_increment<tbb::release>(); // TODO: synch with global reload epoch in order to optimize usage of local reload epoch
784  prev_level.workers_requested -= a.my_num_workers_requested;
785  new_level.workers_requested += a.my_num_workers_requested;
786  __TBB_ASSERT( prev_level.workers_requested >= 0 && new_level.workers_requested >= 0, NULL );
787 }
788 
789 bool market::lower_arena_priority ( arena& a, intptr_t new_priority, uintptr_t old_reload_epoch ) {
790  // TODO: replace the lock with a try_lock loop which performs a double check of the epoch
792  if ( a.my_reload_epoch != old_reload_epoch ) {
794  return false;
795  }
796  __TBB_ASSERT( a.my_top_priority > new_priority, NULL );
797  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
798 
799  intptr_t p = a.my_top_priority;
800  update_arena_top_priority( a, new_priority );
801  if ( a.my_num_workers_requested > 0 ) {
802  if ( my_global_bottom_priority > new_priority ) {
803  my_global_bottom_priority = new_priority;
804  }
805  if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
806  // Global top level became empty
807  for ( --p; p>my_global_bottom_priority && !my_priority_levels[p].workers_requested; --p ) continue;
808  update_global_top_priority(p);
809  }
810  update_allotment( p );
811  }
812 
813  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
815  return true;
816 }
817 
818 bool market::update_arena_priority ( arena& a, intptr_t new_priority ) {
819  // TODO: do not acquire this global lock while checking arena's state.
821 
822  tbb::internal::assert_priority_valid(new_priority);
823  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority || a.my_num_workers_requested <= 0, NULL );
825  if ( a.my_top_priority == new_priority ) {
826  return false;
827  }
828  else if ( a.my_top_priority > new_priority ) {
829  if ( a.my_bottom_priority > new_priority )
830  a.my_bottom_priority = new_priority;
831  return false;
832  }
833  else if ( a.my_num_workers_requested <= 0 ) {
834  return false;
835  }
836 
837  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
838 
839  intptr_t p = a.my_top_priority;
840  intptr_t highest_affected_level = max(p, new_priority);
841  update_arena_top_priority( a, new_priority );
842 
843  if ( my_global_top_priority < new_priority ) {
844  update_global_top_priority(new_priority);
845  }
846  else if ( my_global_top_priority == new_priority ) {
847  advance_global_reload_epoch();
848  }
849  else {
850  __TBB_ASSERT( new_priority < my_global_top_priority, NULL );
851  __TBB_ASSERT( new_priority > my_global_bottom_priority, NULL );
852  if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
853  // Global top level became empty
854  __TBB_ASSERT( my_global_bottom_priority < p, NULL );
855  for ( --p; !my_priority_levels[p].workers_requested; --p ) continue;
856  __TBB_ASSERT( p >= new_priority, NULL );
857  update_global_top_priority(p);
858  highest_affected_level = p;
859  }
860  }
861  if ( p == my_global_bottom_priority ) {
862  // Arena priority was increased from the global bottom level.
863  __TBB_ASSERT( p < new_priority, NULL );
864  __TBB_ASSERT( new_priority <= my_global_top_priority, NULL );
865  while ( my_global_bottom_priority < my_global_top_priority
866  && !my_priority_levels[my_global_bottom_priority].workers_requested )
867  ++my_global_bottom_priority;
868  __TBB_ASSERT( my_global_bottom_priority <= new_priority, NULL );
869 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY
870  const bool enforced_concurrency = my_mandatory_num_requested && a.must_have_concurrency();
871 #else
872  const bool enforced_concurrency = false;
873 #endif
874  __TBB_ASSERT_EX( enforced_concurrency || my_priority_levels[my_global_bottom_priority].workers_requested > 0, NULL );
875  }
876  update_allotment( highest_affected_level );
877 
878  __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
880  return true;
881 }
882 #endif /* __TBB_TASK_PRIORITY */
883 
884 } // namespace internal
885 } // namespace tbb
unsigned num_workers_active()
The number of workers active in the arena.
Definition: arena.h:237
void __TBB_EXPORTED_FUNC runtime_warning(const char *format,...)
Report a runtime warning.
void *__TBB_EXPORTED_FUNC NFS_Allocate(size_t n_element, size_t element_size, void *hint)
Allocate memory on cache/sector line boundary.
void free_arena()
Completes arena shutdown, destructs and deallocates it.
Definition: arena.cpp:253
arena * my_arena
The arena that I own (if master) or am servicing at the moment (if worker)
Definition: scheduler.h:78
unsigned my_ref_count
Reference count controlling market object lifetime.
Definition: market.h:150
T __TBB_load_with_acquire(const volatile T &location)
Definition: tbb_machine.h:716
Work stealing task scheduler.
Definition: scheduler.h:124
arena * arena_in_need(arena *)
Returns next arena that needs more workers, or NULL.
Definition: market.h:222
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
void detach_arena(arena &)
Removes the arena from the market&#39;s list.
Definition: market.cpp:314
static const unsigned skip_soft_limit_warning
The value indicating that the soft limit warning is unnecessary.
Definition: market.h:162
static const pool_state_t SNAPSHOT_EMPTY
No tasks to steal since last snapshot was taken.
Definition: arena.h:221
bool is_worker() const
True if running on a worker thread, false otherwise.
Definition: scheduler.h:595
void remove_arena_from_list(arena &a)
Definition: market.cpp:46
arena_list_type my_arenas
List of registered arenas.
Definition: market.h:139
market(unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size)
Constructor.
Definition: market.cpp:68
unsigned my_num_workers_hard_limit
Maximal number of workers allowed for use by the underlying resource manager.
Definition: market.h:78
unsigned my_num_workers_allotted
The number of workers that have been marked out by the resource manager to service the arena...
Definition: arena.h:55
uintptr_t my_arenas_aba_epoch
ABA prevention marker to assign to newly created arenas.
Definition: market.h:147
int my_num_workers_requested
Number of workers currently requested from RML.
Definition: market.h:85
#define __TBB_TASK_PRIORITY
Definition: tbb_config.h:581
static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit)
Definition: market.cpp:90
T min(const T &val1, const T &val2)
Utility template function returning lesser of the two values.
Definition: tbb_misc.h:106
uintptr_t my_aba_epoch
ABA prevention marker.
Definition: arena.h:138
void insert_arena_into_list(arena &a)
Definition: market.cpp:33
bool my_join_workers
Shutdown mode.
Definition: market.h:159
#define __TBB_Yield()
Definition: ibm_aix51.h:48
void update_allotment()
Recalculates the number of workers assigned to each arena in the list.
Definition: market.h:215
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
Definition: tbb_stddef.h:270
atomic< unsigned > my_references
Reference counter for the arena.
Definition: arena.h:61
void process(generic_scheduler &)
Registers the worker with the arena and enters TBB scheduler dispatch loop.
Definition: arena.cpp:106
void cleanup(job &j) __TBB_override
Definition: market.cpp:742
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert ...
Definition: tbb_stddef.h:171
static size_t active_value(parameter p)
void const char const char int ITT_FORMAT __itt_group_sync p
#define ITT_THREAD_SET_NAME(name)
Definition: itt_notify.h:121
The scoped locking pattern.
Definition: spin_rw_mutex.h:90
tbb::atomic< uintptr_t > my_pool_state
Current task pool state and estimate of available tasks amount.
Definition: arena.h:103
static void add_ref()
Add reference to resources. If first reference added, acquire the resources.
Definition: tbb_main.cpp:120
unsigned my_max_num_workers
The number of workers requested by the master thread owning the arena.
Definition: arena.h:93
T max(const T &val1, const T &val2)
Utility template function returning greater of the two values.
Definition: tbb_misc.h:115
generic_scheduler * my_scheduler
Scheduler of the thread attached to the slot.
arena_slot my_slots[1]
Definition: arena.h:300
static market * theMarket
Currently active global market.
Definition: market.h:62
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
rml::tbb_server * my_server
Pointer to the RML server object that services this TBB instance.
Definition: market.h:74
static const intptr_t num_priority_levels
static unsigned app_parallelism_limit()
Reports active parallelism level according to user&#39;s settings.
Definition: tbb_main.cpp:492
The graph class.
void adjust_demand(arena &, int delta)
Request that arena&#39;s need in workers should be adjusted.
Definition: market.cpp:590
#define GATHER_STATISTIC(x)
void lock()
Acquire writer lock.
#define _T(string_literal)
Standard Windows style macro to markup the string literals.
Definition: itt_notify.h:66
void unlock()
Release lock.
void try_destroy_arena(arena *, uintptr_t aba_epoch)
Removes the arena from the market&#39;s list.
Definition: market.cpp:322
void assert_market_valid() const
Definition: market.h:228
void destroy()
Destroys and deallocates market object created by market::create()
Definition: market.cpp:169
job * create_one_job() __TBB_override
Definition: market.cpp:760
static const unsigned ref_worker
Definition: arena.h:231
static void assume_scheduler(generic_scheduler *s)
Temporarily set TLS slot to the given scheduler.
Definition: governor.cpp:120
static rml::tbb_server * create_rml_server(rml::tbb_client &)
Definition: governor.cpp:96
static generic_scheduler * local_scheduler_if_initialized()
Definition: governor.h:136
void const char const char int ITT_FORMAT __itt_group_sync s
static bool is_set(generic_scheduler *s)
Used to check validity of the local scheduler TLS contents.
Definition: governor.cpp:124
market * my_market
The market that owns this arena.
Definition: arena.h:135
arena * my_next_arena
The first arena to be checked when idle worker seeks for an arena to enter.
Definition: market.h:143
static arena * create_arena(int num_slots, int num_reserved_slots, size_t stack_size)
Creates an arena object.
Definition: market.cpp:300
unsigned my_public_ref_count
Count of master threads attached.
Definition: market.h:153
static unsigned default_num_threads()
Definition: governor.h:85
void process(job &j) __TBB_override
Definition: market.cpp:698
static void cleanup_worker(void *arg, bool worker)
Perform necessary cleanup when a worker thread finishes.
Definition: scheduler.cpp:1294
void __TBB_EXPORTED_FUNC NFS_Free(void *)
Free memory allocated by NFS_Allocate.
bool is_arena_in_list(arena_list_type &arenas, arena *a)
Definition: market.cpp:417
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t size
atomic< unsigned > my_first_unused_worker_idx
First unused index of worker.
Definition: market.h:90
static generic_scheduler * create_worker(market &m, size_t index)
Initialize a scheduler for a worker thread.
Definition: scheduler.cpp:1239
int my_num_workers_requested
The number of workers that are currently requested from the resource manager.
Definition: arena.h:96
arenas_list_mutex_type my_arenas_list_mutex
Definition: market.h:71
void acknowledge_close_connection() __TBB_override
Definition: market.cpp:756
static global_market_mutex_type theMarketMutex
Mutex guarding creation/destruction of theMarket, insertions/deletions in my_arenas, and cancellation propagation.
Definition: market.h:67
static void remove_ref()
Remove reference to resources. If last reference removed, release the resources.
Definition: tbb_main.cpp:125
static bool does_client_join_workers(const tbb::internal::rml::tbb_client &client)
Definition: market.cpp:296
size_t my_stack_size
Stack size of worker threads.
Definition: market.h:156
Release.
Definition: atomic.h:49
atomic< T > & as_atomic(T &t)
Definition: atomic.h:547
bool release(bool is_public, bool blocking_terminate)
Decrements market&#39;s refcount and destroys it in the end.
Definition: market.cpp:179
unsigned my_num_workers_soft_limit
Current application-imposed limit on the number of workers (see set_active_num_workers()) ...
Definition: market.h:82
int my_total_demand
Number of workers that were requested by all arenas.
Definition: market.h:93
static void set_active_num_workers(unsigned w)
Set number of active workers.
Definition: market.cpp:221
static arena & allocate_arena(market &, unsigned num_slots, unsigned num_reserved_slots)
Allocate an instance of arena.
Definition: arena.cpp:242
unsigned my_workers_soft_limit_to_report
Either workers soft limit to be reported via runtime_warning() or skip_soft_limit_warning.
Definition: market.h:165
static bool UsePrivateRML
Definition: governor.h:65
static market & global_market(bool is_public, unsigned max_num_workers=0, size_t stack_size=0)
Factory method creating new market object.
Definition: market.cpp:100

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.