
template<typename SchedulerTraits>
void tbb::internal::CustomScheduler< SchedulerTraits >::wait_for_all ( task &  parent,
task *  child 
) [inline, private]

Scheduler loop that dispatches tasks.

If child is non-NULL, it is dispatched first. Then, until "parent" has a reference count of 1, other tasks are dispatched or stolen.

Definition at line 1423 of file task.cpp.

References tbb::task::allocated, tbb::internal::GenericScheduler::arena_slot, tbb::internal::GenericScheduler::deepest, tbb::internal::GenericScheduler::destroy_task(), tbb::internal::GenericScheduler::dummy_slot, tbb::internal::GenericScheduler::dummy_task, tbb::task::execute(), tbb::task::executing, tbb::internal::FastRandom::get(), tbb::internal::GenericScheduler::get_task(), tbb::internal::GenericScheduler::innermost_running_task, tbb::internal::GenericScheduler::is_worker(), tbb::internal::GenericScheduler::leave_arena(), tbb::task::parent(), tbb::task::prefix(), tbb::internal::GenericScheduler::random, tbb::task::ready, tbb::task::recycle, tbb::task::reexecute, tbb::task::ref_count(), tbb::task::state(), and tbb::internal::GenericScheduler::steal_task().
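For context, the loop below relies on a reference-count contract maintained by callers: a parent sets its reference count to the number of children plus one for the wait itself, each completing child decrements it (the task::executing case below), and wait_for_all() returns once the count drops back to 1. A minimal caller-side sketch of that contract using the public task API of this TBB generation (class names RootTask and LeafTask are illustrative, not part of the library):

#include "tbb/task.h"
#include "tbb/task_scheduler_init.h"

class LeafTask: public tbb::task {
public:
    tbb::task* execute() {
        // ... user work ...
        return NULL;                        // no bypass task
    }
};

class RootTask: public tbb::task {
public:
    tbb::task* execute() {
        set_ref_count( 2+1 );               // two children + one for the wait
        spawn( *new( allocate_child() ) LeafTask );
        spawn( *new( allocate_child() ) LeafTask );
        wait_for_all();                     // blocks in the dispatch loop until ref_count==1
        return NULL;
    }
};

int main() {
    tbb::task_scheduler_init init;          // scheduler must be initialized in this TBB generation
    tbb::task& root = *new( tbb::task::allocate_root() ) RootTask;
    tbb::task::spawn_root_and_wait( root );
    return 0;
}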

                                                                               {
    TBB_TRACE(("%p.wait_for_all(parent=%p,child=%p) enter\n", this, &parent, child));
#if TBB_DO_ASSERT
    __TBB_ASSERT( assert_okay(), NULL );
    if( child ) {
        __TBB_ASSERT( child->prefix().owner==this, NULL );
        __TBB_ASSERT( parent.ref_count()>=2, "ref_count must be at least 2" );
    } else {
        __TBB_ASSERT( parent.ref_count()>=1, "ref_count must be at least 1" );
    }
    __TBB_ASSERT( assert_okay(), NULL );
#endif /* TBB_DO_ASSERT */
    task* t = child;
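    // d is the minimum task depth this dispatch loop may execute or steal; see get_task(d) and steal_task(*victim,d) below.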
    depth_type d;
    if( innermost_running_task==dummy_task ) {
        // We are in the innermost task dispatch loop of a master thread.
        __TBB_ASSERT( !is_worker(), NULL );
        // Force this loop to operate at zero depth.
        d = 0;
    } else {
        d = parent.prefix().depth+1;
    }
    task* old_innermost_running_task = innermost_running_task;
    // Outer loop steals tasks when necessary.
    for(;;) {
        // Middle loop evaluates tasks that are pulled off "array".
        do {
            // Inner loop evaluates tasks that are handed directly to us by other tasks.
            while(t) {
                task_prefix& pref = t->prefix();
                __TBB_ASSERT( pref.owner==this, NULL );
                __TBB_ASSERT( pref.depth>=d, NULL );
                __TBB_ASSERT( 1L<<t->state() & (1L<<task::allocated|1L<<task::ready|1L<<task::reexecute), NULL );
                pref.state = task::executing;
                innermost_running_task = t;
                __TBB_ASSERT(assert_okay(),NULL);
                TBB_TRACE(("%p.wait_for_all: %p.execute\n",this,t));
                GATHER_STATISTIC( ++execute_count );
                task* t_next = t->execute();
                if( t_next ) {
                    __TBB_ASSERT( t_next->state()==task::allocated,
                                "if task::execute() returns task, it must be marked as allocated" );
                    // The store here has a subtle secondary effect - it fetches *t_next into cache.
                    t_next->prefix().owner = this;
                }
                __TBB_ASSERT(assert_okay(),NULL);
                switch( task::state_type(t->prefix().state) ) {
                    case task::executing:
                        // this block was copied below to case task::recycle
                        // when making changes, check it too
                        if( task* s = t->parent() ) {
                            if( SchedulerTraits::itt_possible )
                                ITT_NOTIFY(sync_releasing, &s->prefix().ref_count);
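                            // If atomics are slow and the count is already 1, clear it without an atomic RMW;
                            // otherwise decrement atomically (release) and test whether this released the last reference.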
                            if( SchedulerTraits::has_slow_atomic && s->prefix().ref_count==1 ? (s->prefix().ref_count=0, true) : __TBB_FetchAndDecrementWrelease((volatile void *)&(s->prefix().ref_count))==1 ) {
                                if( SchedulerTraits::itt_possible )
                                    ITT_NOTIFY(sync_acquired, &s->prefix().ref_count);
#if TBB_DO_ASSERT
                                s->prefix().debug_state &= ~ds_ref_count_active;
#endif /* TBB_DO_ASSERT */
                                s->prefix().owner = this;
                                depth_type s_depth = s->prefix().depth;
                                if( !t_next && s_depth>=deepest && s_depth>=d ) {
                                    // Eliminate spawn/get_task pair.
                                    // The elimination is valid because the spawn would set deepest==s_depth,
                                    // and the subsequent call to get_task(d) would grab task s and
                                    // restore deepest to its former value.
                                    t_next = s;
                                } else {
                                    CustomScheduler<SchedulerTraits>::spawn(*s, s->prefix().next );
                                    __TBB_ASSERT(assert_okay(),NULL);
                                }
                            }
                        }
                        destroy_task( *t );
                        break;

                    case task::recycle: { // state set by recycle_as_safe_continuation()
                        t->prefix().state = task::allocated;
                        // for safe continuation, need atomically decrement ref_count;
                        // the block was copied from above case task::executing, and changed.
                        task*& s = t;     // s is an alias to t, in order to make less changes
                        if( SchedulerTraits::itt_possible )
                            ITT_NOTIFY(sync_releasing, &s->prefix().ref_count);
                        if( SchedulerTraits::has_slow_atomic && s->prefix().ref_count==1 ? (s->prefix().ref_count=0, true) : __TBB_FetchAndDecrementWrelease((volatile void *)&(s->prefix().ref_count))==1 ) {
                            if( SchedulerTraits::itt_possible )
                                ITT_NOTIFY(sync_acquired, &s->prefix().ref_count);
#if TBB_DO_ASSERT
                            s->prefix().debug_state &= ~ds_ref_count_active;
                            __TBB_ASSERT( s->prefix().owner==this, "ownership corrupt?" );
                            __TBB_ASSERT( s->prefix().depth>=d, NULL );
#endif /* TBB_DO_ASSERT */
                            if( !t_next ) {
                                t_next = s;
                            } else {
                                CustomScheduler<SchedulerTraits>::spawn(*s, s->prefix().next );
                                __TBB_ASSERT(assert_okay(),NULL);
                            }
                        }
                        } break;

                    case task::reexecute: // set by recycle_to_reexecute()
                        __TBB_ASSERT( t_next, "reexecution requires that method 'execute' return a task" );
                        TBB_TRACE(("%p.wait_for_all: put task %p back into array",this,t));
                        t->prefix().state = task::allocated;
                        CustomScheduler<SchedulerTraits>::spawn( *t, t->prefix().next );
                        __TBB_ASSERT(assert_okay(),NULL);
                        break;
#if TBB_DO_ASSERT
                    case task::allocated:
                        break;
                    case task::ready:
                        __TBB_ASSERT( false, "task is in READY state upon return from method execute()" );
                        break;
                    default:
                        __TBB_ASSERT( false, "illegal state" );
#else
                    default: // just to shut up some compilation warnings
                        break;
#endif /* TBB_DO_ASSERT */
                }
                __TBB_ASSERT( !t_next||t_next->prefix().depth>=d, NULL );
                t = t_next;
            }
            __TBB_ASSERT(assert_okay(),NULL);


            t = get_task( d );
#if TBB_DO_ASSERT
            __TBB_ASSERT(assert_okay(),NULL);
            if(t) {
                AssertOkay(*t);
                __TBB_ASSERT( t->prefix().owner==this, "thread got task that it does not own" );
            }
#endif /* TBB_DO_ASSERT */
        } while( t );
        __TBB_ASSERT( arena->prefix().number_of_workers>0||parent.prefix().ref_count==1,
                    "deadlock detected" );
        // The state "failure_count==-1" is used only when itt_possible is true,
        // and denotes that a sync_prepare has not yet been issued.
        for( int failure_count = -static_cast<int>(SchedulerTraits::itt_possible);; ++failure_count) {
            if( parent.prefix().ref_count==1 ) {
                if( SchedulerTraits::itt_possible ) {
                    if( failure_count!=-1 ) {
                        ITT_NOTIFY(sync_prepare, &parent.prefix().ref_count);
                        // Notify Intel(R) Thread Profiler that thread has stopped spinning.
                        ITT_NOTIFY(sync_acquired, this);
                    }
                    ITT_NOTIFY(sync_acquired, &parent.prefix().ref_count);
                }
                goto done;
            }
            // Try to steal a task from a random victim.
            size_t n = arena->prefix().limit;
            if( n>1 ) {
                size_t k = random.get() % (n-1);
                ArenaSlot* victim = &arena->slot[k];
                if( victim>=arena_slot )
                    ++victim;               // Adjusts random distribution to exclude self
                t = steal_task( *victim, d );
                if( t ) {
                    __TBB_ASSERT( t->prefix().depth>=d, NULL );
                    if( SchedulerTraits::itt_possible ) {
                        if( failure_count!=-1 ) {
                            ITT_NOTIFY(sync_prepare, victim);
                            // Notify Intel(R) Thread Profiler that thread has stopped spinning.
                            ITT_NOTIFY(sync_acquired, this);
                            ITT_NOTIFY(sync_acquired, victim);
                        }
                    }
                    __TBB_ASSERT(t,NULL);
                    break;
                }
            }
            if( SchedulerTraits::itt_possible && failure_count==-1 ) {
                // The first attempt to steal work failed, so notify Intel(R) Thread Profiler that
                // the thread has started spinning.  Ideally, we would do this notification
                // *before* the first failed attempt to steal, but at that point we do not
                // know that the steal will fail.
                ITT_NOTIFY(sync_prepare, this);
                failure_count = 0;
            }
            // Pause, even if we are going to yield, because the yield might return immediately.
            __TBB_Pause(PauseTime);
            int yield_threshold = int(n);
            if( failure_count>=yield_threshold ) {
                if( failure_count>=2*yield_threshold ) {
                    __TBB_Yield();
#if IMPROVED_GATING
                    // Note: if d!=0 or !is_worker(), it is not safe to wait for a non-empty pool,
                    // because of the polling of parent.prefix().ref_count.
                    if( d==0 && is_worker() ) 
                        wait_while_pool_is_empty();
#else
                    arena->prefix().gate.wait();
#endif /* IMPROVED_GATING */
                    failure_count = 0;
                } else if( failure_count==yield_threshold ) {
                    // We have paused n times since last yield.
                    // Odds are that there is no other work to do.
                    __TBB_Yield();
                }
            }
        }
        __TBB_ASSERT(t,NULL);
        GATHER_STATISTIC( ++steal_count );
        t->prefix().owner = this;
    }
done:
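    // The wait completed because ref_count dropped to 1 (only the waiter's own reference remained); reset it to 0.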
    parent.prefix().ref_count = 0;
#if TBB_DO_ASSERT
    parent.prefix().debug_state &= ~ds_ref_count_active;
#endif /* TBB_DO_ASSERT */
    innermost_running_task = old_innermost_running_task;
    if( deepest<0 && innermost_running_task==dummy_task && arena_slot!=&dummy_slot ) {
        leave_arena(/*compress=*/true);
    }
    __TBB_ASSERT( assert_okay(), NULL );
    TBB_TRACE(("%p.wait_for_all(parent=%p): return\n",this,&parent));
}
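A note on the victim selection in the steal loop above: drawing k uniformly from [0, n-2] and incrementing it when it lands on or past the caller's own slot yields a uniform choice over the other n-1 slots without retrying. A standalone sketch of the same adjustment, with illustrative names (pick_victim, self, and std::rand stand in for the scheduler's internals and tbb::internal::FastRandom):

#include <cassert>
#include <cstddef>
#include <cstdlib>

// Pick a slot index uniformly from {0,...,n-1} excluding 'self'.
// Mirrors the "if( victim>=arena_slot ) ++victim;" adjustment in the steal loop.
std::size_t pick_victim( std::size_t self, std::size_t n ) {
    assert( n>1 && self<n );
    std::size_t k = std::rand() % (n-1);   // uniform over n-1 values
    if( k>=self )
        ++k;                               // skip over the caller's own slot
    return k;
}

Each of the other n-1 slots is produced by exactly one value of k, so the distribution stays uniform; redrawing whenever k equals self would give the same result but needs an unbounded retry loop.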

