Intel(R) Threading Building Blocks Doxygen Documentation  version 4.2.3
_flow_graph_impl.h
Go to the documentation of this file.
1 /*
2  Copyright (c) 2005-2019 Intel Corporation
3 
4  Licensed under the Apache License, Version 2.0 (the "License");
5  you may not use this file except in compliance with the License.
6  You may obtain a copy of the License at
7 
8  http://www.apache.org/licenses/LICENSE-2.0
9 
10  Unless required by applicable law or agreed to in writing, software
11  distributed under the License is distributed on an "AS IS" BASIS,
12  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13  See the License for the specific language governing permissions and
14  limitations under the License.
15 
16 
17 
18 
19 */
20 
21 #ifndef __TBB_flow_graph_impl_H
22 #define __TBB_flow_graph_impl_H
23 
24 #include "../tbb_stddef.h"
25 #include "../task.h"
26 #include "../task_arena.h"
27 #include "../flow_graph_abstractions.h"
28 
29 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
30 #include "../concurrent_priority_queue.h"
31 #endif
32 
33 #include <list>
34 
35 #if TBB_DEPRECATED_FLOW_ENQUEUE
36 #define FLOW_SPAWN(a) tbb::task::enqueue((a))
37 #else
38 #define FLOW_SPAWN(a) tbb::task::spawn((a))
39 #endif
40 
41 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
42 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr ) expr
43 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority ) , priority
44 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1, priority
45 #else
46 #define __TBB_FLOW_GRAPH_PRIORITY_EXPR( expr )
47 #define __TBB_FLOW_GRAPH_PRIORITY_ARG0( priority )
48 #define __TBB_FLOW_GRAPH_PRIORITY_ARG1( arg1, priority ) arg1
49 #endif // __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
50 
51 namespace tbb {
52 namespace flow {
53 
54 namespace internal {
55 static tbb::task * const SUCCESSFULLY_ENQUEUED = (task *)-1;
56 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
57 typedef unsigned int node_priority_t;
58 static const node_priority_t no_priority = node_priority_t(0);
59 #endif
60 }
61 
62 namespace interface10 {
63 
65 
66 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
70 struct graph_task : public task {
71  graph_task( node_priority_t node_priority = no_priority ) : priority( node_priority ) {}
73 };
74 #else
75 typedef task graph_task;
76 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
77 
78 
79 class graph;
80 class graph_node;
81 
82 template <typename GraphContainerType, typename GraphNodeType>
84  friend class graph;
85  friend class graph_node;
86 public:
87  typedef size_t size_type;
88  typedef GraphNodeType value_type;
89  typedef GraphNodeType* pointer;
90  typedef GraphNodeType& reference;
91  typedef const GraphNodeType& const_reference;
92  typedef std::forward_iterator_tag iterator_category;
93 
95  graph_iterator() : my_graph(NULL), current_node(NULL) {}
96 
99  my_graph(other.my_graph), current_node(other.current_node)
100  {}
101 
104  if (this != &other) {
105  my_graph = other.my_graph;
106  current_node = other.current_node;
107  }
108  return *this;
109  }
110 
112  reference operator*() const;
113 
115  pointer operator->() const;
116 
118  bool operator==(const graph_iterator& other) const {
119  return ((my_graph == other.my_graph) && (current_node == other.current_node));
120  }
121 
123  bool operator!=(const graph_iterator& other) const { return !(operator==(other)); }
124 
127  internal_forward();
128  return *this;
129  }
130 
133  graph_iterator result = *this;
134  operator++();
135  return result;
136  }
137 
138 private:
139  // the graph over which we are iterating
140  GraphContainerType *my_graph;
141  // pointer into my_graph's my_nodes list
142  pointer current_node;
143 
145  graph_iterator(GraphContainerType *g, bool begin);
146  void internal_forward();
147 }; // class graph_iterator
148 
149 // flags to modify the behavior of the graph reset(). Can be combined.
152  rf_reset_bodies = 1 << 0, // delete the current node body, reset to a copy of the initial node body.
153  rf_clear_edges = 1 << 1 // delete edges
154 };
155 
156 namespace internal {
157 
158 void activate_graph(graph& g);
159 void deactivate_graph(graph& g);
160 bool is_graph_active(graph& g);
161 void spawn_in_graph_arena(graph& g, tbb::task& arena_task);
163 template<typename F> void execute_in_graph_arena(graph& g, F& f);
164 
165 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
167  bool operator()(const graph_task* left, const graph_task* right) {
168  return left->priority < right->priority;
169  }
170 };
171 
173 
174 class priority_task_selector : public task {
175 public:
176  priority_task_selector(graph_task_priority_queue_t& priority_queue)
177  : my_priority_queue(priority_queue) {}
179  graph_task* t = NULL;
180  bool result = my_priority_queue.try_pop(t);
181  __TBB_ASSERT_EX( result, "Number of critical tasks for scheduler and tasks"
182  " in graph's priority queue mismatched" );
184  "Incorrect task submitted to graph priority queue" );
186  "Tasks from graph's priority queue must have priority" );
187  task* t_next = t->execute();
188  task::destroy(*t);
189  return t_next;
190  }
191 private:
192  graph_task_priority_queue_t& my_priority_queue;
193 };
194 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
195 
196 }
197 
199 
201  friend class graph_node;
202 
203  template< typename Body >
204  class run_task : public graph_task {
205  public:
206  run_task(Body& body
208  , node_priority_t node_priority = no_priority
209  ) : graph_task(node_priority),
210 #else
211  ) :
212 #endif
213  my_body(body) { }
215  my_body();
216  return NULL;
217  }
218  private:
219  Body my_body;
220  };
221 
222  template< typename Receiver, typename Body >
223  class run_and_put_task : public graph_task {
224  public:
225  run_and_put_task(Receiver &r, Body& body) : my_receiver(r), my_body(body) {}
227  tbb::task *res = my_receiver.try_put_task(my_body());
228  if (res == SUCCESSFULLY_ENQUEUED) res = NULL;
229  return res;
230  }
231  private:
232  Receiver &my_receiver;
233  Body my_body;
234  };
235  typedef std::list<tbb::task *> task_list_type;
236 
237  class wait_functor {
239  public:
240  wait_functor(tbb::task* t) : graph_root_task(t) {}
241  void operator()() const { graph_root_task->wait_for_all(); }
242  };
243 
247  public:
248  spawn_functor(tbb::task& t) : spawn_task(t) {}
249  void operator()() const {
250  FLOW_SPAWN(spawn_task);
251  }
252  };
253 
254  void prepare_task_arena(bool reinit = false) {
255  if (reinit) {
256  __TBB_ASSERT(my_task_arena, "task arena is NULL");
257  my_task_arena->terminate();
258  my_task_arena->initialize(tbb::task_arena::attach());
259  }
260  else {
261  __TBB_ASSERT(my_task_arena == NULL, "task arena is not NULL");
262  my_task_arena = new tbb::task_arena(tbb::task_arena::attach());
263  }
264  if (!my_task_arena->is_active()) // failed to attach
265  my_task_arena->initialize(); // create a new, default-initialized arena
266  __TBB_ASSERT(my_task_arena->is_active(), "task arena is not active");
267  }
268 
269 public:
271  graph();
272 
274  explicit graph(tbb::task_group_context& use_this_context);
275 
277 
278  ~graph();
279 
280 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
281  void set_name(const char *name);
282 #endif
283 
285  reserve_wait();
286  }
287 
289  release_wait();
290  }
291 
293 
295  void reserve_wait() __TBB_override;
296 
298 
300  void release_wait() __TBB_override;
301 
303 
305  template< typename Receiver, typename Body >
306  void run(Receiver &r, Body body) {
307  if (internal::is_graph_active(*this)) {
308  task* rtask = new (task::allocate_additional_child_of(*root_task()))
310  my_task_arena->execute(spawn_functor(*rtask));
311  }
312  }
313 
315 
317  template< typename Body >
318  void run(Body body) {
319  if (internal::is_graph_active(*this)) {
320  task* rtask = new (task::allocate_additional_child_of(*root_task())) run_task< Body >(body);
321  my_task_arena->execute(spawn_functor(*rtask));
322  }
323  }
324 
326 
327  void wait_for_all() {
328  cancelled = false;
329  caught_exception = false;
330  if (my_root_task) {
331 #if TBB_USE_EXCEPTIONS
332  try {
333 #endif
334  my_task_arena->execute(wait_functor(my_root_task));
335  cancelled = my_context->is_group_execution_cancelled();
336 #if TBB_USE_EXCEPTIONS
337  }
338  catch (...) {
339  my_root_task->set_ref_count(1);
340  my_context->reset();
341  caught_exception = true;
342  cancelled = true;
343  throw;
344  }
345 #endif
346  // TODO: the "if" condition below is just a work-around to support the concurrent wait
347  // mode. The cancellation and exception mechanisms are still broken in this mode.
348  // Consider using task group not to re-implement the same functionality.
349  if (!(my_context->traits() & tbb::task_group_context::concurrent_wait)) {
350  my_context->reset(); // consistent with behavior in catch()
351  my_root_task->set_ref_count(1);
352  }
353  }
354  }
355 
358  return my_root_task;
359  }
360 
361  // ITERATORS
362  template<typename C, typename N>
363  friend class graph_iterator;
364 
365  // Graph iterator typedefs
368 
369  // Graph iterator constructors
371  iterator begin();
373  iterator end();
375  const_iterator begin() const;
377  const_iterator end() const;
379  const_iterator cbegin() const;
381  const_iterator cend() const;
382 
384  bool is_cancelled() { return cancelled; }
385  bool exception_thrown() { return caught_exception; }
386 
387  // thread-unsafe state reset.
388  void reset(reset_flags f = rf_reset_protocol);
389 
390 private:
394  bool cancelled;
397  task_list_type my_reset_task_list;
398 
400 
402  void register_node(graph_node *n);
403  void remove_node(graph_node *n);
404 
406 
407 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
409 #endif
410 
411  friend void internal::activate_graph(graph& g);
412  friend void internal::deactivate_graph(graph& g);
413  friend bool internal::is_graph_active(graph& g);
414  friend void internal::spawn_in_graph_arena(graph& g, tbb::task& arena_task);
416  template<typename F> friend void internal::execute_in_graph_arena(graph& g, F& f);
417 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
418  template<typename Input, typename Output, typename Policy, typename Allocator>
419  friend class async_node;
420 #endif
421 
423 
424 }; // class graph
425 
428  friend class graph;
429  template<typename C, typename N>
430  friend class graph_iterator;
431 protected:
433  graph_node *next, *prev;
434 public:
435  explicit graph_node(graph& g);
436 
437  virtual ~graph_node();
438 
439 #if TBB_PREVIEW_FLOW_GRAPH_TRACE
440  virtual void set_name(const char *name) = 0;
441 #endif
442 
443 #if TBB_DEPRECATED_FLOW_NODE_EXTRACTION
444  virtual void extract() = 0;
445 #endif
446 
447 protected:
448  // performs the reset on an individual node.
449  virtual void reset_node(reset_flags f = rf_reset_protocol) = 0;
450 }; // class graph_node
451 
452 namespace internal {
453 
454 inline void activate_graph(graph& g) {
455  g.my_is_active = true;
456 }
457 
458 inline void deactivate_graph(graph& g) {
459  g.my_is_active = false;
460 }
461 
462 inline bool is_graph_active(graph& g) {
463  return g.my_is_active;
464 }
465 
467 template<typename F>
468 inline void execute_in_graph_arena(graph& g, F& f) {
469  if (is_graph_active(g)) {
471  g.my_task_arena->execute(f);
472  }
473 }
474 
476 inline void spawn_in_graph_arena(graph& g, tbb::task& arena_task) {
477  task* task_to_spawn = &arena_task;
478 #if __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
479  // TODO: change flow graph's interfaces to work with graph_task type instead of tbb::task.
480  graph_task* t = static_cast<graph_task*>(&arena_task);
481  if( t->priority != no_priority ) {
486  task_to_spawn = new( t->allocate_continuation() ) priority_task_selector(g.my_priority_queue);
487  tbb::internal::make_critical( *task_to_spawn );
488  g.my_priority_queue.push(t);
489  }
490 #endif /* __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES */
491  graph::spawn_functor s_fn(*task_to_spawn);
492  execute_in_graph_arena(g, s_fn);
493 }
494 
496  g.my_reset_task_list.push_back(tp);
497 }
498 
499 } // namespace internal
500 
501 } // namespace interface10
502 } // namespace flow
503 } // namespace tbb
504 
505 #endif // __TBB_flow_graph_impl_H
bool operator!=(const graph_iterator &other) const
Inequality.
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp __itt_timestamp end
bool operator==(const cache_aligned_allocator< T > &, const cache_aligned_allocator< U > &)
static tbb::task *const SUCCESSFULLY_ENQUEUED
#define __TBB_override
Definition: tbb_stddef.h:244
void run(Body body)
Spawns a task that runs a function object.
void add_task_to_graph_reset_list(graph &g, tbb::task *tp)
internal::return_type_or_void< F >::type execute(F &f)
Definition: task_arena.h:342
bool operator()(const graph_task *left, const graph_task *right)
#define __TBB_ASSERT(predicate, comment)
No-op version of __TBB_ASSERT.
Definition: tbb_stddef.h:169
#define __TBB_PREVIEW_FLOW_GRAPH_PRIORITIES
Definition: tbb_config.h:825
Base class for types that should not be assigned.
Definition: tbb_stddef.h:324
void const char const char int ITT_FORMAT __itt_group_sync x void const char ITT_FORMAT __itt_group_sync s void ITT_FORMAT __itt_group_sync p void ITT_FORMAT p void ITT_FORMAT p no args __itt_suppress_mode_t unsigned int void size_t ITT_FORMAT d void ITT_FORMAT p void ITT_FORMAT p __itt_model_site __itt_model_site_instance ITT_FORMAT p __itt_model_task __itt_model_task_instance ITT_FORMAT p void ITT_FORMAT p void ITT_FORMAT p void size_t ITT_FORMAT d void ITT_FORMAT p const wchar_t ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s const char ITT_FORMAT s no args void ITT_FORMAT p size_t ITT_FORMAT d no args const wchar_t const wchar_t ITT_FORMAT s __itt_heap_function void size_t int ITT_FORMAT d __itt_heap_function void ITT_FORMAT p __itt_heap_function void void size_t int ITT_FORMAT d no args no args unsigned int ITT_FORMAT u const __itt_domain __itt_id ITT_FORMAT lu const __itt_domain __itt_id __itt_id __itt_string_handle ITT_FORMAT p const __itt_domain __itt_id ITT_FORMAT p const __itt_domain __itt_id __itt_timestamp begin
std::forward_iterator_tag iterator_category
tbb::task_group_context * my_context
void run(Receiver &r, Body body)
Spawns a task that runs a body and puts its output to a specific receiver.
internal::allocate_continuation_proxy & allocate_continuation()
Returns proxy for overloaded new that allocates a continuation task of *this.
Definition: task.h:649
graph_iterator(const graph_iterator &other)
Copy constructor.
The base of all graph nodes.
#define FLOW_SPAWN(a)
#define __TBB_ASSERT_EX(predicate, comment)
"Extended" version is useful to suppress warnings if a variable is only used with an assert ...
Definition: tbb_stddef.h:171
Pure virtual template classes that define interfaces for async communication.
graph_iterator< graph, graph_node > iterator
graph_iterator< const graph, const graph_node > const_iterator
static const node_priority_t no_priority
unsigned int node_priority_t
run_task(Body &body, node_priority_t node_priority=no_priority)
tbb::task * root_task()
Returns the root task of the graph.
graph_iterator operator++(int)
Post-increment.
void push(const_reference elem)
Pushes elem onto the queue, increasing capacity of queue if necessary.
graph_iterator & operator=(const graph_iterator &other)
Assignment.
virtual task * execute()=0
Should be overridden by derived classes.
The graph class.
bool operator==(const graph_iterator &other) const
Equality.
graph_iterator & operator++()
Pre-increment.
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
std::list< tbb::task * > task_list_type
Implements async node.
Definition: flow_graph.h:3493
Used to form groups of tasks.
Definition: task.h:335
internal::graph_task_priority_queue_t my_priority_queue
void prepare_task_arena(bool reinit=false)
void wait_for_all()
Wait until graph is idle and decrement_wait_count calls equals increment_wait_count calls...
task * execute() __TBB_override
Should be overridden by derived classes.
void execute_in_graph_arena(graph &g, F &f)
Executes custom functor inside graph arena.
tbb::task * execute() __TBB_override
Should be overridden by derived classes.
Base class for user-defined tasks.
Definition: task.h:592
void spawn_in_graph_arena(graph &g, tbb::task &arena_task)
Spawns a task inside graph arena.
void wait_for_all()
Wait for reference count to become one, and set reference count to zero.
Definition: task.h:792
void make_critical(task &t)
Definition: task.h:952
bool is_cancelled()
return status of graph execution
A lock that occupies a single byte.
Definition: spin_mutex.h:40
Base class for types that should not be copied or assigned.
Definition: tbb_stddef.h:335
void const char const char int ITT_FORMAT __itt_group_sync x void const char * name
graph_task(node_priority_t node_priority=no_priority)
Base class for tasks generated by graph nodes.
tbb::concurrent_priority_queue< graph_task *, graph_task_comparator > graph_task_priority_queue_t
priority_task_selector(graph_task_priority_queue_t &priority_queue)
Tag class used to indicate the "attaching" constructor.
Definition: task_arena.h:235

Copyright © 2005-2019 Intel Corporation. All Rights Reserved.

Intel, Pentium, Intel Xeon, Itanium, Intel XScale and VTune are registered trademarks or trademarks of Intel Corporation or its subsidiaries in the United States and other countries.

* Other names and brands may be claimed as the property of others.