34 #if __TBB_TASK_PRIORITY 36 arena *&next = my_priority_levels[a.my_top_priority].next_arena;
42 if ( arenas.
size() == 1 )
43 next = &*arenas.
begin();
47 #if __TBB_TASK_PRIORITY 49 arena *&next = my_priority_levels[a.my_top_priority].next_arena;
54 arena_list_type::iterator it = next;
57 if ( ++it == arenas.
end() && arenas.
size() > 1 )
68 market::market (
unsigned workers_soft_limit,
unsigned workers_hard_limit,
size_t stack_size )
72 , my_global_top_priority(normalized_normal_priority)
73 , my_global_bottom_priority(normalized_normal_priority)
79 #if __TBB_TASK_PRIORITY 92 workers_soft_limit = soft_limit-1;
95 if( workers_soft_limit >= workers_hard_limit )
96 workers_soft_limit = workers_hard_limit-1;
97 return workers_soft_limit;
107 if( old_public_count==0 )
113 "skip_soft_limit_warning must be larger than any valid workers_requested" );
115 if( soft_limit_to_report < workers_requested ) {
117 "The request for %u workers is ignored. Further requests for more workers " 118 "will be silently ignored until the limit changes.\n",
119 soft_limit_to_report, workers_requested );
129 "The request for larger stack (%u) cannot be satisfied.\n",
134 if( stack_size == 0 )
148 #if __TBB_TASK_GROUP_CONTEXT 150 "my_workers must be the last data field of the market class");
155 memset( storage, 0, size );
157 m =
new (storage)
market( workers_soft_limit, workers_hard_limit, stack_size );
163 runtime_warning(
"RML might limit the number of workers to %u while %u is requested.\n" 164 , m->
my_server->default_concurrency(), workers_soft_limit );
170 #if __TBB_COUNT_TASK_NODES 171 if ( my_task_node_count )
172 runtime_warning(
"Leaked %ld task objects\n", (
long)my_task_node_count );
174 this->market::~market();
181 bool do_release =
false;
184 if ( blocking_terminate ) {
185 __TBB_ASSERT( is_public,
"Only an object with a public reference can request the blocking terminate" );
216 return blocking_terminate;
222 int old_requested=0, requested=0;
223 bool need_mandatory =
false;
241 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 244 if( !(m->my_mandatory_num_requested && !soft_limit) )
248 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 249 m->my_mandatory_num_requested? 0 :
252 requested =
min(demand, (
int)soft_limit);
255 #if __TBB_TASK_PRIORITY 256 m->my_priority_levels[m->my_global_top_priority].workers_available = soft_limit;
262 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 263 if( !m->my_mandatory_num_requested && !soft_limit ) {
266 #if __TBB_TASK_PRIORITY 267 for(
int p = m->my_global_top_priority;
p >= m->my_global_bottom_priority; --
p ) {
268 priority_level_info &pl = m->my_priority_levels[
p];
274 for( arena_list_type::iterator it = arenas.
begin(); it != arenas.
end(); ++it ) {
275 if( !it->my_task_stream.empty(p) ) {
277 if( m->mandatory_concurrency_enable_impl( &*it ) )
278 need_mandatory =
true;
281 #if __TBB_TASK_PRIORITY 288 int delta = requested - old_requested;
289 if( need_mandatory ) ++delta;
291 m->
my_server->adjust_job_count_estimate( delta );
297 return ((
const market&)client).must_join_workers();
330 #if __TBB_TASK_PRIORITY 334 priority_level_info &pl = my_priority_levels[
p];
337 arena_list_type::iterator it = my_arenas.
begin();
338 for ( ; it != my_arenas.
end(); ++it ) {
340 if ( it->my_aba_epoch == aba_epoch ) {
356 #if __TBB_TASK_PRIORITY 364 if ( arenas.
empty() )
366 arena_list_type::iterator it = hint;
370 if ( ++it == arenas.
end() )
373 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 374 && !a.recall_by_mandatory_request()
380 }
while ( it != hint );
386 max_workers =
min(workers_demand, max_workers);
389 arena_list_type::iterator it = arenas.
begin();
390 for ( ; it != arenas.
end(); ++it ) {
397 int allotted = tmp / workers_demand;
398 carry = tmp % workers_demand;
401 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 402 if ( !allotted && a.must_have_concurrency() )
406 assigned += allotted;
408 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 419 for ( arena_list_type::iterator it = arenas.
begin(); it != arenas.
end(); ++it )
426 #if __TBB_TASK_PRIORITY 427 inline void market::update_global_top_priority ( intptr_t newPriority ) {
429 my_global_top_priority = newPriority;
430 my_priority_levels[newPriority].workers_available =
431 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 435 advance_global_reload_epoch();
438 inline void market::reset_global_priority () {
439 my_global_bottom_priority = normalized_normal_priority;
440 update_global_top_priority(normalized_normal_priority);
448 int p = my_global_top_priority;
453 a =
arena_in_need( my_priority_levels[p].arenas, prev_arena );
456 while ( !a && p >= my_global_bottom_priority ) {
457 priority_level_info &pl = my_priority_levels[p--];
470 intptr_t i = highest_affected_priority;
471 int available = my_priority_levels[i].workers_available;
472 for ( ; i >= my_global_bottom_priority; --i ) {
473 priority_level_info &pl = my_priority_levels[i];
474 pl.workers_available = available;
475 if ( pl.workers_requested ) {
476 available -=
update_allotment( pl.arenas, pl.workers_requested, available );
477 if ( available < 0 ) {
483 __TBB_ASSERT( i <= my_global_bottom_priority || !available, NULL );
484 for ( --i; i >= my_global_bottom_priority; --i ) {
485 priority_level_info &pl = my_priority_levels[i];
486 pl.workers_available = 0;
487 arena_list_type::iterator it = pl.arenas.begin();
488 for ( ; it != pl.arenas.end(); ++it ) {
489 __TBB_ASSERT( it->my_num_workers_requested >= 0 || !it->my_num_workers_allotted, NULL );
490 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 491 it->my_num_workers_allotted = it->must_have_concurrency() ? 1 : 0;
493 it->my_num_workers_allotted = 0;
500 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 501 bool market::mandatory_concurrency_enable_impl (
arena *a,
bool *enabled ) {
502 if( a->my_concurrency_mode==arena_base::cm_enforced_global ) {
510 a->my_concurrency_mode = arena_base::cm_enforced_global;
511 #if __TBB_TASK_PRIORITY 512 priority_level_info &pl = my_priority_levels[a->my_top_priority];
513 pl.workers_requested++;
514 if( my_global_top_priority < a->my_top_priority ) {
515 my_global_top_priority = a->my_top_priority;
516 advance_global_reload_epoch();
521 if( 1 == ++my_mandatory_num_requested ) {
528 bool market::mandatory_concurrency_enable (
arena *a ) {
533 add_thread = mandatory_concurrency_enable_impl(a, &enabled);
536 my_server->adjust_job_count_estimate( 1 );
540 void market::mandatory_concurrency_disable (
arena *a ) {
541 bool remove_thread =
false;
542 int delta_adjust_demand = 0;
547 if( a->my_concurrency_mode!=arena_base::cm_enforced_global )
551 #if __TBB_TASK_PRIORITY 552 if ( a->my_top_priority != normalized_normal_priority ) {
553 update_arena_top_priority( *a, normalized_normal_priority );
555 a->my_bottom_priority = normalized_normal_priority;
558 int val = --my_mandatory_num_requested;
562 remove_thread =
true;
570 #if __TBB_TASK_PRIORITY 571 priority_level_info &pl = my_priority_levels[a->my_top_priority];
572 pl.workers_requested--;
573 intptr_t
p = my_global_top_priority;
574 for (; !my_priority_levels[
p].workers_requested && p>0; p--)
577 reset_global_priority();
578 else if( p!= my_global_top_priority )
579 update_global_top_priority(p);
581 a->my_concurrency_mode = arena::cm_normal;
583 if( delta_adjust_demand )
586 my_server->adjust_job_count_estimate( -1 );
598 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 600 if ( a.
my_market->my_mandatory_num_requested && a.my_concurrency_mode!=arena_base::cm_normal )
605 if ( prev_req <= 0 ) {
611 else if ( prev_req < 0 ) {
615 #if !__TBB_TASK_PRIORITY 618 intptr_t
p = a.my_top_priority;
619 priority_level_info &pl = my_priority_levels[
p];
620 pl.workers_requested += delta;
623 if ( a.my_top_priority != normalized_normal_priority ) {
625 update_arena_top_priority( a, normalized_normal_priority );
627 a.my_bottom_priority = normalized_normal_priority;
629 if ( p == my_global_top_priority ) {
630 if ( !pl.workers_requested ) {
631 while ( --p >= my_global_bottom_priority && !my_priority_levels[p].workers_requested )
633 if ( p < my_global_bottom_priority )
634 reset_global_priority();
636 update_global_top_priority(p);
640 else if ( p > my_global_top_priority ) {
644 update_global_top_priority(p);
646 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 649 && a.
my_market->my_mandatory_num_requested && a.my_concurrency_mode!=arena_base::cm_normal )
655 else if ( p == my_global_bottom_priority ) {
656 if ( !pl.workers_requested ) {
657 while ( ++p <= my_global_top_priority && !my_priority_levels[p].workers_requested )
659 if ( p > my_global_top_priority )
660 reset_global_priority();
662 my_global_bottom_priority =
p;
667 else if ( p < my_global_bottom_priority ) {
668 int prev_bottom = my_global_bottom_priority;
669 my_global_bottom_priority =
p;
673 __TBB_ASSERT( my_global_bottom_priority < p && p < my_global_top_priority, NULL );
694 my_server->adjust_job_count_estimate( delta );
704 query_interval = 1000,
707 for(
int i = first_interval; ; i--) {
721 #if !__TBB_SLEEP_PERMISSION 725 #if __TBB_TASK_PRIORITY 726 arena_list_type &al = my_priority_levels[my_global_top_priority].arenas;
732 if( the_global_observer_list.ask_permission_to_leave() )
737 #endif// !__TBB_SLEEP_PERMISSION 766 #if __TBB_TASK_GROUP_CONTEXT 769 my_workers[index - 1] =
s;
774 #if __TBB_TASK_PRIORITY 775 void market::update_arena_top_priority (
arena& a, intptr_t new_priority ) {
777 __TBB_ASSERT( a.my_top_priority != new_priority, NULL );
778 priority_level_info &prev_level = my_priority_levels[a.my_top_priority],
779 &new_level = my_priority_levels[new_priority];
781 a.my_top_priority = new_priority;
786 __TBB_ASSERT( prev_level.workers_requested >= 0 && new_level.workers_requested >= 0, NULL );
789 bool market::lower_arena_priority (
arena& a, intptr_t new_priority, uintptr_t old_reload_epoch ) {
792 if ( a.my_reload_epoch != old_reload_epoch ) {
797 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
799 intptr_t
p = a.my_top_priority;
800 update_arena_top_priority( a, new_priority );
802 if ( my_global_bottom_priority > new_priority ) {
803 my_global_bottom_priority = new_priority;
805 if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
807 for ( --p; p>my_global_bottom_priority && !my_priority_levels[
p].workers_requested; --
p )
continue;
808 update_global_top_priority(p);
813 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
818 bool market::update_arena_priority (
arena& a, intptr_t new_priority ) {
822 tbb::internal::assert_priority_valid(new_priority);
825 if ( a.my_top_priority == new_priority ) {
828 else if ( a.my_top_priority > new_priority ) {
829 if ( a.my_bottom_priority > new_priority )
830 a.my_bottom_priority = new_priority;
837 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
839 intptr_t
p = a.my_top_priority;
840 intptr_t highest_affected_level =
max(p, new_priority);
841 update_arena_top_priority( a, new_priority );
843 if ( my_global_top_priority < new_priority ) {
844 update_global_top_priority(new_priority);
846 else if ( my_global_top_priority == new_priority ) {
847 advance_global_reload_epoch();
850 __TBB_ASSERT( new_priority < my_global_top_priority, NULL );
851 __TBB_ASSERT( new_priority > my_global_bottom_priority, NULL );
852 if ( p == my_global_top_priority && !my_priority_levels[p].workers_requested ) {
855 for ( --p; !my_priority_levels[
p].workers_requested; --
p )
continue;
857 update_global_top_priority(p);
858 highest_affected_level =
p;
861 if ( p == my_global_bottom_priority ) {
864 __TBB_ASSERT( new_priority <= my_global_top_priority, NULL );
865 while ( my_global_bottom_priority < my_global_top_priority
866 && !my_priority_levels[my_global_bottom_priority].workers_requested )
867 ++my_global_bottom_priority;
868 __TBB_ASSERT( my_global_bottom_priority <= new_priority, NULL );
869 #if __TBB_ENQUEUE_ENFORCED_CONCURRENCY 870 const bool enforced_concurrency = my_mandatory_num_requested && a.must_have_concurrency();
872 const bool enforced_concurrency =
false;
874 __TBB_ASSERT_EX( enforced_concurrency || my_priority_levels[my_global_bottom_priority].workers_requested > 0, NULL );
878 __TBB_ASSERT( my_global_top_priority >= a.my_top_priority, NULL );
unsigned num_workers_active()
The number of workers active in the arena.
void __TBB_EXPORTED_FUNC runtime_warning(const char *format,...)
Report a runtime warning.
void *__TBB_EXPORTED_FUNC NFS_Allocate(size_t n_element, size_t element_size, void *hint)
Allocate memory on cache/sector line boundary.
void free_arena()
Completes arena shutdown, destructs and deallocates it.
arena * my_arena
The arena that I own (if master) or am servicing at the moment (if worker)
unsigned my_ref_count
Reference count controlling market object lifetime.
T __TBB_load_with_acquire(const volatile T &location)
Work stealing task scheduler.
arena * arena_in_need(arena *)
Returns next arena that needs more workers, or NULL.
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
void detach_arena(arena &)
Removes the arena from the market's list.
static const unsigned skip_soft_limit_warning
The value indicating that the soft limit warning is unnecessary.
static const pool_state_t SNAPSHOT_EMPTY
No tasks to steal since last snapshot was taken.
bool is_worker() const
True if running on a worker thread, false otherwise.
void remove_arena_from_list(arena &a)
arena_list_type my_arenas
List of registered arenas.
market(unsigned workers_soft_limit, unsigned workers_hard_limit, size_t stack_size)
Constructor.
unsigned my_num_workers_hard_limit
Maximal number of workers allowed for use by the underlying resource manager.
unsigned my_num_workers_allotted
The number of workers that have been marked out by the resource manager to service the arena...
uintptr_t my_arenas_aba_epoch
ABA prevention marker to assign to newly created arenas.
int my_num_workers_requested
Number of workers currently requested from RML.
#define __TBB_TASK_PRIORITY
static unsigned calc_workers_soft_limit(unsigned workers_soft_limit, unsigned workers_hard_limit)
T min(const T &val1, const T &val2)
Utility template function returning lesser of the two values.
uintptr_t my_aba_epoch
ABA prevention marker.
void insert_arena_into_list(arena &a)
bool my_join_workers
Shutdown mode.
void update_allotment()
Recalculates the number of workers assigned to each arena in the list.
#define __TBB_offsetof(class_name, member_name)
Extended variant of the standard offsetof macro.
atomic< unsigned > my_references
Reference counter for the arena.
void process(generic_scheduler &)
Registers the worker with the arena and enters TBB scheduler dispatch loop.
void cleanup(job &j) __TBB_override
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert ...
static size_t active_value(parameter p)
void const char const char int ITT_FORMAT __itt_group_sync p
#define ITT_THREAD_SET_NAME(name)
The scoped locking pattern.
tbb::atomic< uintptr_t > my_pool_state
Current task pool state and estimate of available tasks amount.
static void add_ref()
Add reference to resources. If first reference added, acquire the resources.
unsigned my_max_num_workers
The number of workers requested by the master thread owning the arena.
T max(const T &val1, const T &val2)
Utility template function returning greater of the two values.
generic_scheduler * my_scheduler
Scheduler of the thread attached to the slot.
static market * theMarket
Currently active global market.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void * lock
rml::tbb_server * my_server
Pointer to the RML server object that services this TBB instance.
static const intptr_t num_priority_levels
static unsigned app_parallelism_limit()
Reports active parallelism level according to user's settings.
void adjust_demand(arena &, int delta)
Request that arena's need in workers should be adjusted.
#define GATHER_STATISTIC(x)
void lock()
Acquire writer lock.
#define _T(string_literal)
Standard Windows style macro to markup the string literals.
void unlock()
Release lock.
void try_destroy_arena(arena *, uintptr_t aba_epoch)
Removes the arena from the market's list.
void assert_market_valid() const
void destroy()
Destroys and deallocates market object created by market::create()
job * create_one_job() __TBB_override
static const unsigned ref_worker
static void assume_scheduler(generic_scheduler *s)
Temporarily set TLS slot to the given scheduler.
static rml::tbb_server * create_rml_server(rml::tbb_client &)
static generic_scheduler * local_scheduler_if_initialized()
void const char const char int ITT_FORMAT __itt_group_sync s
static bool is_set(generic_scheduler *s)
Used to check validity of the local scheduler TLS contents.
market * my_market
The market that owns this arena.
arena * my_next_arena
The first arena to be checked when idle worker seeks for an arena to enter.
static arena * create_arena(int num_slots, int num_reserved_slots, size_t stack_size)
Creates an arena object.
unsigned my_public_ref_count
Count of master threads attached.
static unsigned default_num_threads()
void process(job &j) __TBB_override
static void cleanup_worker(void *arg, bool worker)
Perform necessary cleanup when a worker thread finishes.
void __TBB_EXPORTED_FUNC NFS_Free(void *)
Free memory allocated by NFS_Allocate.
bool is_arena_in_list(arena_list_type &arenas, arena *a)
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t size
atomic< unsigned > my_first_unused_worker_idx
First unused index of worker.
static generic_scheduler * create_worker(market &m, size_t index)
Initialize a scheduler for a worker thread.
int my_num_workers_requested
The number of workers that are currently requested from the resource manager.
arenas_list_mutex_type my_arenas_list_mutex
void acknowledge_close_connection() __TBB_override
static global_market_mutex_type theMarketMutex
Mutex guarding creation/destruction of theMarket, insertions/deletions in my_arenas, and cancellation propagation.
static void remove_ref()
Remove reference to resources. If last reference removed, release the resources.
static bool does_client_join_workers(const tbb::internal::rml::tbb_client &client)
size_t my_stack_size
Stack size of worker threads.
atomic< T > & as_atomic(T &t)
bool release(bool is_public, bool blocking_terminate)
Decrements market's refcount and destroys it in the end.
unsigned my_num_workers_soft_limit
Current application-imposed limit on the number of workers (see set_active_num_workers()) ...
int my_total_demand
Number of workers that were requested by all arenas.
static void set_active_num_workers(unsigned w)
Set number of active workers.
static arena & allocate_arena(market &, unsigned num_slots, unsigned num_reserved_slots)
Allocate an instance of arena.
unsigned my_workers_soft_limit_to_report
Either workers soft limit to be reported via runtime_warning() or skip_soft_limit_warning.
static bool UsePrivateRML
static market & global_market(bool is_public, unsigned max_num_workers=0, size_t stack_size=0)
Factory method creating new market object.