Logo Search packages:      
Sourcecode: faust version File versions  Download package

scheduler.h

#include <stdlib.h>
#include <assert.h>
#include <pthread.h>
#include <stdio.h>
#include <errno.h>
#include <string.h>
#include <semaphore.h>
#include <sys/types.h>
#include <sys/stat.h>
#include <errno.h>
#include <fcntl.h>
#include <unistd.h>
#include <math.h>

using namespace std;

// Globals

#define THREAD_SIZE 64
#define QUEUE_SIZE 4096

#define WORK_STEALING_INDEX 0
#define LAST_TASK_INDEX 1


#ifdef __ICC
#define INLINE __forceinline
#else
#define INLINE inline
#endif


// On Intel set FZ (Flush to Zero) and DAZ (Denormals Are Zero)
// flags to avoid costly denormals
#ifdef __SSE__
#include <xmmintrin.h>
#ifdef __SSE2__
#define AVOIDDENORMALS _mm_setcsr(_mm_getcsr() | 0x8040)
#else
#define AVOIDDENORMALS _mm_setcsr(_mm_getcsr() | 0x8000)
#endif
#else
#define AVOIDDENORMALS 
#endif

#ifdef __linux__

// handle 32/64 bits int size issues
#ifdef __x86_64__
#define UInt32    unsigned int
#define UInt64    unsigned long int
#else
#define UInt32    unsigned int
#define UInt64    unsigned long long int
#endif

#endif

#ifdef __APPLE__
#include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacTypes.h>
#endif

class TaskQueue;
struct DSPThreadPool;

extern TaskQueue* gTaskQueueList[THREAD_SIZE];
extern DSPThreadPool* gThreadPool;
extern int gClientCount;
extern UInt64 gMaxStealing;
    
void Yield();

/**
 * Returns the number of clock cycles elapsed since the last reset
 * of the processor
 */
static INLINE UInt64 DSP_rdtsc(void)
{
      union {
            UInt32 i32[2];
            UInt64 i64;
      } count;
      
      __asm__ __volatile__("rdtsc" : "=a" (count.i32[0]), "=d" (count.i32[1]));
     return count.i64;
}

#if defined(__i386__) || defined(__x86_64__)

#define LOCK "lock ; "

static INLINE void NOP(void)
{
      __asm__ __volatile__("nop \n\t");
}

static INLINE char CAS1(volatile void* addr, volatile int value, int newvalue)
{
    register char ret;
    __asm__ __volatile__ (
                                      "# CAS \n\t"
                                      LOCK "cmpxchg %2, (%1) \n\t"
                                      "sete %0               \n\t"
                                      : "=a" (ret)
                                      : "c" (addr), "d" (newvalue), "a" (value)
                          : "memory"
                                      );
    return ret;
}

static INLINE int atomic_xadd(volatile int* atomic, int val) 
{ 
    register int result;
    __asm__ __volatile__ ("# atomic_xadd \n\t"
                          LOCK "xaddl %0,%1 \n\t"
                          : "=r" (result), "=m" (*atomic) 
                          : "0" (val), "m" (*atomic));
    return result;
} 

#endif


/*
static INLINE int INC_ATOMIC(volatile int* val)
{
    int actual;
    do {
        actual = *val;
    } while (!CAS1(val, actual, actual + 1));
    return actual;
}

static INLINE int DEC_ATOMIC(volatile int* val)
{
    int actual;
    do {
        actual = *val;
    } while (!CAS1(val, actual, actual - 1));
    return actual;
}
*/

static INLINE int INC_ATOMIC(volatile int* val)
{
    return atomic_xadd(val, 1);
}
 
static INLINE int DEC_ATOMIC(volatile int* val)
{
    return atomic_xadd(val, -1);
}
 
// To be used in lock-free queue
struct AtomicCounter
{
    union {
        struct {
            short fHead;      
            short fTail;      
        }
        scounter;
        int fValue;
    }info;
    
      INLINE AtomicCounter()
      {
        info.fValue = 0;
    }
     
      INLINE  AtomicCounter& operator=(AtomicCounter& obj)
    {
        info.fValue = obj.info.fValue;
        return *this;
    }
    
      INLINE  AtomicCounter& operator=(volatile AtomicCounter& obj)
      {
        info.fValue = obj.info.fValue;
        return *this;
    }
    
};

int get_max_cpu()
{
    return sysconf(_SC_NPROCESSORS_ONLN);
}

static int GetPID()
{
#ifdef WIN32
    return  _getpid();
#else
    return getpid();
#endif
}

#define Value(e) (e).info.fValue

#define Head(e) (e).info.scounter.fHead
#define IncHead(e) (e).info.scounter.fHead++
#define DecHead(e) (e).info.scounter.fHead--

#define Tail(e) (e).info.scounter.fTail
#define IncTail(e) (e).info.scounter.fTail++
#define DecTail(e) (e).info.scounter.fTail--

#define MASTER_THREAD 0

#define MAX_STEAL_DUR 50                    // in usec
#define DEFAULT_CLOCKSPERSEC 2500000000     // in cycles (2,5 Ghz)

class TaskQueue 
{
    private:
    
        int fTaskList[QUEUE_SIZE];
        volatile AtomicCounter fCounter;
        UInt64 fStealingStart;
     
    public:
  
        INLINE TaskQueue(int cur_thread)
        {
            for (int i = 0; i < QUEUE_SIZE; i++) {
                fTaskList[i] = -1;
            }
            gTaskQueueList[cur_thread] = this;  
            fStealingStart = 0;
        }
         
        INLINE void PushHead(int item)
        {
            fTaskList[Head(fCounter)] = item;
            IncHead(fCounter);
        }
        
        INLINE int PopHead()
        {
            AtomicCounter old_val;
            AtomicCounter new_val;
            
            do {
                old_val = fCounter;
                new_val = old_val;
                if (Head(old_val) == Tail(old_val)) {
                    return WORK_STEALING_INDEX;
                } else {
                    DecHead(new_val);
                }
            } while (!CAS1(&fCounter, Value(old_val), Value(new_val)));
            
            return fTaskList[Head(old_val) - 1];
        }
        
        INLINE int PopTail()
        {
            AtomicCounter old_val;
            AtomicCounter new_val;
            
            do {
                old_val = fCounter;
                new_val = old_val;
                if (Head(old_val) == Tail(old_val)) {
                   return WORK_STEALING_INDEX;
                } else {
                    IncTail(new_val);
                }
            } while (!CAS1(&fCounter, Value(old_val), Value(new_val)));
            
            return fTaskList[Tail(old_val)];
        }

            INLINE void MeasureStealingDur()
            {
            // Takes first timetamp
            if (fStealingStart == 0) {
                fStealingStart = DSP_rdtsc();
            } else if ((DSP_rdtsc() - fStealingStart) > gMaxStealing) {
                Yield();
            }
            }

            INLINE void ResetStealingDur()
            {
            fStealingStart = 0;
            }
        
        static INLINE int GetNextTask(int thread, int num_threads)
        {
            int tasknum;
            for (int i = 0; i < num_threads; i++) {
                if ((i != thread) && gTaskQueueList[i] && (tasknum = gTaskQueueList[i]->PopTail()) != WORK_STEALING_INDEX) {
                #ifdef __linux__
                              //if (thread != MASTER_THREAD)
                                    gTaskQueueList[thread]->ResetStealingDur();
                #endif
                    return tasknum;    // Task is found
                }
            }
            NOP();
          #ifdef __linux__
                  //if (thread != MASTER_THREAD)
                        gTaskQueueList[thread]->MeasureStealingDur();
        #endif
            return WORK_STEALING_INDEX;    // Otherwise will try "workstealing" again next cycle...
        }
        
        INLINE void InitTaskList(int task_list_size, int* task_list, int thread_num, int cur_thread, int& tasknum)
        {
            int task_slice = task_list_size / thread_num;
            int task_slice_rest = task_list_size % thread_num;

            if (task_slice == 0) {
                // Each thread directly executes one task
                tasknum = task_list[cur_thread];
                // Thread 0 takes remaining ready tasks 
                if (cur_thread == 0) { 
                    for (int index = 0; index < task_slice_rest - thread_num; index++) {
                        PushHead(task_list[task_slice_rest + index]);
                    }
                }
            } else {
                // Each thread takes a part of ready tasks
                int index;
                for (index = 0; index < task_slice - 1; index++) {
                    PushHead(task_list[cur_thread * task_slice + index]);
                }
                // Each thread directly executes one task 
                tasknum = task_list[cur_thread * task_slice + index];
                // Thread 0 takes remaining ready tasks 
                if (cur_thread == 0) {
                    for (index = 0; index < task_slice_rest; index++) {
                        PushHead(task_list[thread_num * task_slice + index]);
                    }
                }
            }
        }
        
        static INLINE void Init()
        {
            for (int i = 0; i < THREAD_SIZE; i++) {
                gTaskQueueList[i] = 0;
            }
        }
     
};

struct TaskGraph 
{
    volatile int gTaskList[QUEUE_SIZE];
    
    TaskGraph()
    {
        for (int i = 0; i < QUEUE_SIZE; i++) {
            gTaskList[i] = 0;
        } 
    }

    INLINE void InitTask(int task, int val)
    {
        gTaskList[task] = val;
    }
    
    void Display()
    {
        for (int i = 0; i < QUEUE_SIZE; i++) {
            printf("Task = %d activation = %d\n", i, gTaskList[i]);
        } 
    }
      
    INLINE void ActivateOutputTask(TaskQueue& queue, int task, int& tasknum)
    {
        if (DEC_ATOMIC(&gTaskList[task]) == 1) {
            if (tasknum == WORK_STEALING_INDEX) {
                tasknum = task;
            } else {
                queue.PushHead(task);
            }
        }    
    }
      
    INLINE void ActivateOutputTask(TaskQueue& queue, int task)
    {
        if (DEC_ATOMIC(&gTaskList[task]) == 1) {
            queue.PushHead(task);
        }
    }
    
    INLINE void ActivateOneOutputTask(TaskQueue& queue, int task, int& tasknum)
    {
        if (DEC_ATOMIC(&gTaskList[task]) == 1) {
            tasknum = task;
        } else {
            tasknum = queue.PopHead(); 
        }
    }
    
    INLINE void GetReadyTask(TaskQueue& queue, int& tasknum)
    {
        if (tasknum == WORK_STEALING_INDEX) {
            tasknum = queue.PopHead();
        }
    }
 
};


#define THREAD_POOL_SIZE 16
#define JACK_SCHED_POLICY SCHED_FIFO

/* use 512KB stack per thread - the default is way too high to be feasible
 * with mlockall() on many systems */
#define THREAD_STACK 524288


#ifdef __APPLE__

#include <CoreServices/../Frameworks/CarbonCore.framework/Headers/MacTypes.h>
#include <mach/thread_policy.h>
#include <mach/thread_act.h>

#define THREAD_SET_PRIORITY         0
#define THREAD_SCHEDULED_PRIORITY   1

static UInt32 GetThreadPriority(pthread_t thread, int inWhichPriority);

// returns the thread's priority as it was last set by the API
static UInt32 GetThreadSetPriority(pthread_t thread)
{
    return GetThreadPriority(thread, THREAD_SET_PRIORITY);
}

// returns the thread's priority as it was last scheduled by the Kernel
static UInt32 GetThreadScheduledPriority(pthread_t thread)
{
    return GetThreadPriority(thread, THREAD_SCHEDULED_PRIORITY);
}

static int SetThreadToPriority(pthread_t thread, UInt32 inPriority, Boolean inIsFixed, UInt64 period, UInt64 computation, UInt64 constraint)
{
    if (inPriority == 96) {
        // REAL-TIME / TIME-CONSTRAINT THREAD
        thread_time_constraint_policy_data_t theTCPolicy;
        theTCPolicy.period = period;
        theTCPolicy.computation = computation;
        theTCPolicy.constraint = constraint;
        theTCPolicy.preemptible = true;
        kern_return_t res = thread_policy_set(pthread_mach_thread_np(thread), THREAD_TIME_CONSTRAINT_POLICY, (thread_policy_t)&theTCPolicy, THREAD_TIME_CONSTRAINT_POLICY_COUNT);
        return (res == KERN_SUCCESS) ? 0 : -1;
    } else {
        // OTHER THREADS
        thread_extended_policy_data_t theFixedPolicy;
        thread_precedence_policy_data_t thePrecedencePolicy;
        SInt32 relativePriority;
        
        // [1] SET FIXED / NOT FIXED
        theFixedPolicy.timeshare = !inIsFixed;
        thread_policy_set(pthread_mach_thread_np(thread), THREAD_EXTENDED_POLICY, (thread_policy_t)&theFixedPolicy, THREAD_EXTENDED_POLICY_COUNT);
        
        // [2] SET PRECEDENCE
        // N.B.: We expect that if thread A created thread B, and the program wishes to change
        // the priority of thread B, then the call to change the priority of thread B must be
        // made by thread A.
        // This assumption allows us to use pthread_self() to correctly calculate the priority
        // of the feeder thread (since precedency policy's importance is relative to the
        // spawning thread's priority.)
        relativePriority = inPriority - GetThreadSetPriority(pthread_self());
        
        thePrecedencePolicy.importance = relativePriority;
        kern_return_t res = thread_policy_set(pthread_mach_thread_np(thread), THREAD_PRECEDENCE_POLICY, (thread_policy_t)&thePrecedencePolicy, THREAD_PRECEDENCE_POLICY_COUNT);
        return (res == KERN_SUCCESS) ? 0 : -1;
    }
}

static UInt32 GetThreadPriority(pthread_t thread, int inWhichPriority)
{
    thread_basic_info_data_t threadInfo;
    policy_info_data_t thePolicyInfo;
    unsigned int count;
    
    // get basic info
    count = THREAD_BASIC_INFO_COUNT;
    thread_info(pthread_mach_thread_np(thread), THREAD_BASIC_INFO, (thread_info_t)&threadInfo, &count);
    
    switch (threadInfo.policy) {
        case POLICY_TIMESHARE:
            count = POLICY_TIMESHARE_INFO_COUNT;
            thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_TIMESHARE_INFO, (thread_info_t)&(thePolicyInfo.ts), &count);
            if (inWhichPriority == THREAD_SCHEDULED_PRIORITY) {
                return thePolicyInfo.ts.cur_priority;
            } else {
                return thePolicyInfo.ts.base_priority;
            }
            break;
            
        case POLICY_FIFO:
            count = POLICY_FIFO_INFO_COUNT;
            thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_FIFO_INFO, (thread_info_t)&(thePolicyInfo.fifo), &count);
            if ((thePolicyInfo.fifo.depressed) && (inWhichPriority == THREAD_SCHEDULED_PRIORITY)) {
                return thePolicyInfo.fifo.depress_priority;
            }
            return thePolicyInfo.fifo.base_priority;
            break;
            
        case POLICY_RR:
            count = POLICY_RR_INFO_COUNT;
            thread_info(pthread_mach_thread_np(thread), THREAD_SCHED_RR_INFO, (thread_info_t)&(thePolicyInfo.rr), &count);
            if ((thePolicyInfo.rr.depressed) && (inWhichPriority == THREAD_SCHEDULED_PRIORITY)) {
                return thePolicyInfo.rr.depress_priority;
            }
            return thePolicyInfo.rr.base_priority;
            break;
    }
    
    return 0;
}

static int GetParams(pthread_t thread, UInt64* period, UInt64* computation, UInt64* constraint)
{
    thread_time_constraint_policy_data_t theTCPolicy;
    mach_msg_type_number_t count = THREAD_TIME_CONSTRAINT_POLICY_COUNT;
    boolean_t get_default = false;
    
    kern_return_t res = thread_policy_get(pthread_mach_thread_np(thread),
                                          THREAD_TIME_CONSTRAINT_POLICY,
                                          (thread_policy_t)&theTCPolicy,
                                          &count,
                                          &get_default);
    if (res == KERN_SUCCESS) {
        *period = theTCPolicy.period;
        *computation = theTCPolicy.computation;
        *constraint = theTCPolicy.constraint;
        return 0;
    } else {
        return -1;
    }
}

static UInt64 period = 0;
static UInt64 computation = 0;
static UInt64 constraint = 0;

INLINE void GetRealTime()
{
    if (period == 0) {
        GetParams(pthread_self(), &period, &computation, &constraint);
    }
}

INLINE void SetRealTime()
{
    SetThreadToPriority(pthread_self(), 96, true, period, computation, constraint);
}

void CancelThread(pthread_t fThread)
{
    mach_port_t machThread = pthread_mach_thread_np(fThread);
    thread_terminate(machThread);
}

INLINE void Yield()
{
    //sched_yield();
}

#endif

#ifdef __linux__

static int faust_sched_policy = -1;
static struct sched_param faust_rt_param; 

INLINE void GetRealTime()
{
    if (faust_sched_policy == -1) {
        memset(&faust_rt_param, 0, sizeof(faust_rt_param));
      pthread_getschedparam(pthread_self(), &faust_sched_policy, &faust_rt_param);
    }
}

INLINE void SetRealTime()
{
      faust_rt_param.sched_priority--;
    pthread_setschedparam(pthread_self(), faust_sched_policy, &faust_rt_param);
}

void CancelThread(pthread_t fThread)
{
    pthread_cancel(fThread);
    pthread_join(fThread, NULL);
}

INLINE void Yield()
{
    pthread_yield();
}


#endif

#define KDSPMESURE 50

static INLINE int Range(int min, int max, int val)
{
    if (val < min) {
        return min;
    } else if (val > max) {
        return max;
    } else {
        return val;
    }
}

struct Runnable {
    
    UInt64 fTiming[KDSPMESURE];
    UInt64 fStart;
    UInt64 fStop;
    int fCounter;
    float fOldMean;
    int fOldfDynamicNumThreads;
    bool fDynAdapt;
    
    virtual void computeThread(int cur_thread) = 0;
    
    Runnable():fCounter(0), fOldMean(1000000000.f), fOldfDynamicNumThreads(1)
    {
      memset(fTiming, 0, sizeof(long long int ) * KDSPMESURE);
        fDynAdapt = getenv("OMP_DYN_THREAD") ? strtol(getenv("OMP_DYN_THREAD"), NULL, 10) : false;
    }
    
    INLINE float ComputeMean()
    {
        float mean = 0;
        for (int i = 0; i < KDSPMESURE; i++) {
            mean += float(fTiming[i]);
        }
        mean /= float(KDSPMESURE);
        return mean;
    }
    
    INLINE void StartMeasure()
    {
        if (!fDynAdapt)
            return;
        
        fStart = DSP_rdtsc();
    }
     
    INLINE void StopMeasure(int staticthreadnum, int& dynthreadnum)
    {
        if (!fDynAdapt)
            return;
        
        fStop = DSP_rdtsc();
        fCounter = (fCounter + 1) % KDSPMESURE;
        if (fCounter == 0) {
            float mean = ComputeMean();
            if (fabs(mean - fOldMean) > 5000) {
                if (mean > fOldMean) { // Worse...
                    //printf("Worse %f %f\n", mean, fOldMean);
                    if (fOldfDynamicNumThreads > dynthreadnum) {
                        fOldfDynamicNumThreads = dynthreadnum;
                        dynthreadnum += 1;
                    } else {
                        fOldfDynamicNumThreads = dynthreadnum;
                        dynthreadnum -= 1;
                    }
                 } else { // Better...
                    //printf("Better %f %f\n", mean, fOldMean);
                    if (fOldfDynamicNumThreads > dynthreadnum) {
                        fOldfDynamicNumThreads = dynthreadnum;
                        dynthreadnum -= 1;
                    } else {
                        fOldfDynamicNumThreads = dynthreadnum;
                        dynthreadnum += 1;
                    }
                }
                fOldMean = mean;
                dynthreadnum = Range(1, staticthreadnum, dynthreadnum);
                //printf("dynthreadnum %d\n", dynthreadnum);
            }
        }
        fTiming[fCounter] = fStop - fStart; 
    }
};

struct DSPThread;

struct DSPThreadPool {
    
    DSPThread* fThreadPool[THREAD_POOL_SIZE];
    int fThreadCount; 
    volatile int fCurThreadCount;
      
    DSPThreadPool();
    ~DSPThreadPool();
    
    void StartAll(int num, bool realtime);
    void StopAll();
    void SignalAll(int num, Runnable* runnable);
    
    void SignalOne();
    bool IsFinished();
    
    static DSPThreadPool* Init();
    static void Destroy();
    
};

struct DSPThread {

    pthread_t fThread;
    DSPThreadPool* fThreadPool;
    Runnable* fRunnable;
    sem_t* fSemaphore;
    char fName[128];
    bool fRealTime;
    int fNum;
    
    DSPThread(int num, DSPThreadPool* pool)
    {
        fNum = num;
        fThreadPool = pool;
        fRunnable = NULL;
        fRealTime = false;
        
        sprintf(fName, "faust_sem_%d_%p", GetPID(), this);
        
        if ((fSemaphore = sem_open(fName, O_CREAT, 0777, 0)) == (sem_t*)SEM_FAILED) {
            printf("Allocate: can't check in named semaphore name = %s err = %s", fName, strerror(errno));
        }
    }

    virtual ~DSPThread()
    {
        sem_unlink(fName);
        sem_close(fSemaphore);
    }
    
    void Run()
    {
        while (sem_wait(fSemaphore) != 0) {}
        fRunnable->computeThread(fNum + 1);
        fThreadPool->SignalOne();
    }
    
    static void* ThreadHandler(void* arg)
    {
        DSPThread* thread = static_cast<DSPThread*>(arg);
        
        AVOIDDENORMALS;
        
        // One "dummy" cycle to setup thread
        if (thread->fRealTime) {
            thread->Run();
            SetRealTime();
        }
                  
        while (true) {
            thread->Run();
        }
        
        return NULL;
    }
    
    int Start(bool realtime)
    {
        pthread_attr_t attributes;
        struct sched_param rt_param;
        pthread_attr_init(&attributes);
        
        int priority = 60; // TODO
        int res;
        
        if (realtime) {
            fRealTime = true;
        }else {
            fRealTime = getenv("OMP_REALTIME") ? strtol(getenv("OMP_REALTIME"), NULL, 10) : true;
        }
                               
        if ((res = pthread_attr_setdetachstate(&attributes, PTHREAD_CREATE_JOINABLE))) {
            printf("Cannot request joinable thread creation for real-time thread res = %d err = %s\n", res, strerror(errno));
            return -1;
        }

        if ((res = pthread_attr_setscope(&attributes, PTHREAD_SCOPE_SYSTEM))) {
            printf("Cannot set scheduling scope for real-time thread res = %d err = %s\n", res, strerror(errno));
            return -1;
        }

        if (realtime) {
            
            if ((res = pthread_attr_setinheritsched(&attributes, PTHREAD_EXPLICIT_SCHED))) {
                printf("Cannot request explicit scheduling for RT thread res = %d err = %s\n", res, strerror(errno));
                return -1;
            }
        
            if ((res = pthread_attr_setschedpolicy(&attributes, JACK_SCHED_POLICY))) {
                printf("Cannot set RR scheduling class for RT thread res = %d err = %s\n", res, strerror(errno));
                return -1;
            }
            
            memset(&rt_param, 0, sizeof(rt_param));
            rt_param.sched_priority = priority;

            if ((res = pthread_attr_setschedparam(&attributes, &rt_param))) {
                printf("Cannot set scheduling priority for RT thread res = %d err = %s\n", res, strerror(errno));
                return -1;
            }

        } else {
            
            if ((res = pthread_attr_setinheritsched(&attributes, PTHREAD_INHERIT_SCHED))) {
                printf("Cannot request explicit scheduling for RT thread res = %d err = %s\n", res, strerror(errno));
                return -1;
            }
        }
     
        if ((res = pthread_attr_setstacksize(&attributes, THREAD_STACK))) {
            printf("Cannot set thread stack size res = %d err = %s\n", res, strerror(errno));
            return -1;
        }
        
        if ((res = pthread_create(&fThread, &attributes, ThreadHandler, this))) {
            printf("Cannot create thread res = %d err = %s\n", res, strerror(errno));
            return -1;
        }

        pthread_attr_destroy(&attributes);
        return 0;
    }
    
    void Signal(bool stop, Runnable* runnable)
    {
        fRunnable = runnable;
        sem_post(fSemaphore);
    }
    
    void Stop()
    {
        CancelThread(fThread);
    }

};

DSPThreadPool::DSPThreadPool()
{
    for (int i = 0; i < THREAD_POOL_SIZE; i++) {
        fThreadPool[i] = NULL;
    }
    fThreadCount = 0;
    fCurThreadCount = 0;
}

DSPThreadPool::~DSPThreadPool()
{
    StopAll();
    
    for (int i = 0; i < fThreadCount; i++) {
        delete(fThreadPool[i]);
        fThreadPool[i] = NULL;
    }
    
    fThreadCount = 0;
 }

void DSPThreadPool::StartAll(int num, bool realtime)
{
    if (fThreadCount == 0) {  // Protection for multiple call...  (like LADSPA plug-ins in Ardour)
        for (int i = 0; i < num; i++) {
            fThreadPool[i] = new DSPThread(i, this);
            fThreadPool[i]->Start(realtime);
            fThreadCount++;
        }
    }
}

void DSPThreadPool::StopAll()
{
    for (int i = 0; i < fThreadCount; i++) {
        fThreadPool[i]->Stop();
    }
}

void DSPThreadPool::SignalAll(int num, Runnable* runnable)
{
    fCurThreadCount = num;
        
    for (int i = 0; i < num; i++) {  // Important : use local num here...
        fThreadPool[i]->Signal(false, runnable);
    }
}

void DSPThreadPool::SignalOne()
{
    DEC_ATOMIC(&fCurThreadCount);
}

bool DSPThreadPool::IsFinished()
{
    return (fCurThreadCount == 0);
}

DSPThreadPool* DSPThreadPool::Init()
{
    if (gClientCount++ == 0 && !gThreadPool) {
        gThreadPool = new DSPThreadPool();
    }
    return gThreadPool;
}

void DSPThreadPool::Destroy()
{
    if (--gClientCount == 0 && gThreadPool) {
        delete gThreadPool;
        gThreadPool = NULL;
    }
}

#ifndef PLUG_IN

// Globals
TaskQueue* gTaskQueueList[THREAD_SIZE] = {0};

DSPThreadPool* gThreadPool = 0;
int gClientCount = 0;

int clock_per_microsec = (getenv("CLOCKSPERSEC") 
                ? strtoll(getenv("CLOCKSPERSEC"), NULL, 10) 
                : DEFAULT_CLOCKSPERSEC) / 1000000;
                
UInt64  gMaxStealing = getenv("OMP_STEALING_DUR") 
                ? strtoll(getenv("OMP_STEALING_DUR"), NULL, 10) * clock_per_microsec 
                : MAX_STEAL_DUR * clock_per_microsec;

#endif


Generated by  Doxygen 1.6.0   Back to index