29 :
Thread (
"Pool", stackSize), pool (p)
36 if (! pool.runNextJob (*
this))
40 std::atomic<ThreadPoolJob*> currentJob {
nullptr };
55 jassert (pool ==
nullptr || ! pool->
contains (
this));
76 listeners.add (listener);
81 listeners.remove (listener);
87 return t->currentJob.load();
95 jassert (numThreads > 0);
97 createThreads (numThreads, threadStackSize);
107 removeAllJobs (
true, 5000);
111 void ThreadPool::createThreads (
int numThreads,
size_t threadStackSize)
113 for (
int i = jmax (1, numThreads); --i >= 0;)
116 for (
auto* t : threads)
120 void ThreadPool::stopThreads()
122 for (
auto* t : threads)
123 t->signalThreadShouldExit();
125 for (
auto* t : threads)
131 jassert (job !=
nullptr);
132 jassert (job->pool ==
nullptr);
134 if (job->pool ==
nullptr)
137 job->shouldStop =
false;
138 job->isActive =
false;
139 job->shouldBeDeleted = deleteJobWhenFinished;
146 for (
auto* t : threads)
158 std::function<ThreadPoolJob::JobStatus()> job;
161 addJob (
new LambdaJobWrapper (jobToRun),
true);
168 LambdaJobWrapper (std::function<
void()> j) :
ThreadPoolJob (
"lambda"), job (j) {}
171 std::function<void()> job;
174 addJob (
new LambdaJobWrapper (jobToRun),
true);
185 return threads.size();
197 return jobs.contains (const_cast<ThreadPoolJob*> (job));
203 return jobs.contains (const_cast<ThreadPoolJob*> (job)) && job->isActive;
210 auto index = jobs.indexOf (const_cast<ThreadPoolJob*> (job));
212 if (index > 0 && ! job->isActive)
213 jobs.move (index, 0);
222 while (contains (job))
227 jobFinishedSignal.wait (2);
236 bool dontWait =
true;
243 if (jobs.contains (job))
247 if (interruptIfRunning)
254 jobs.removeFirstMatchingValue (job);
255 addToDeleteList (deletionList, job);
260 return dontWait || waitForJobToFinish (job, timeOutMs);
274 for (
int i = jobs.size(); --i >= 0;)
276 auto* job = jobs.getUnchecked(i);
278 if (selectedJobsToRemove ==
nullptr || selectedJobsToRemove->
isJobSuitable (job))
282 jobsToWaitFor.
add (job);
284 if (interruptRunningJobs)
285 job->signalJobShouldExit();
290 addToDeleteList (deletionList, job);
301 for (
int i = jobsToWaitFor.
size(); --i >= 0;)
305 if (! isJobRunning (job))
309 if (jobsToWaitFor.
size() == 0)
315 jobFinishedSignal.wait (20);
326 for (
auto* job : jobs)
327 if (job->isActive || ! onlyReturnActiveJobs)
328 s.
add (job->getJobName());
337 for (
auto* t : threads)
338 if (! t->setPriority (newPriority))
351 for (
int i = 0; i < jobs.size(); ++i)
353 if (
auto* job = jobs[i])
360 addToDeleteList (deletionList, job);
365 job->isActive =
true;
377 if (
auto* job = pickNextJobToRun())
380 thread.currentJob = job;
384 result = job->runJob();
391 thread.currentJob =
nullptr;
398 if (jobs.contains (job))
400 job->isActive =
false;
404 jobs.removeFirstMatchingValue (job);
405 addToDeleteList (deletionList, job);
407 jobFinishedSignal.signal();
412 jobs.move (jobs.indexOf (job), -1);
425 job->shouldStop =
true;
428 if (job->shouldBeDeleted)
429 deletionList.
add (job);
int getNumJobs() const noexcept
Returns the number of jobs currently running or queued.
bool waitForJobToFinish(const ThreadPoolJob *job, int timeOutMilliseconds) const
Waits until a job has finished running and has been removed from the pool.
void removeListener(Thread::Listener *)
Removes a listener added with addListener.
int getNumThreads() const noexcept
Returns the number of threads assigned to this thread pool.
Thread(const String &threadName, size_t threadStackSize=0)
Creates a thread.
void add(const ElementType &newElement)
Appends a new element at the end of the array.
A callback class used when you need to select which ThreadPoolJob objects are suitable for some kind ...
virtual bool isJobSuitable(ThreadPoolJob *job)=0
Should return true if the specified thread matches your criteria for whatever operation that this obj...
A special array for holding a list of strings.
virtual ~ThreadPoolJob()
Destructor.
ElementType getUnchecked(int index) const
Returns one of the elements in the array, without checking the index passed in.
virtual JobStatus runJob()=0
Performs the actual work that this job needs to do.
void run() override
Must be implemented to perform the thread's actual code.
static ThreadPoolJob * getCurrentThreadPoolJob()
If the calling thread is being invoked inside a runJob() method, this will return the ThreadPoolJob t...
bool isJobRunning(const ThreadPoolJob *job) const noexcept
Returns true if the given job is currently being run by a thread.
bool removeAllJobs(bool interruptRunningJobs, int timeOutMilliseconds, JobSelector *selectedJobsToRemove=nullptr)
Tries to remove all jobs from the pool.
JobStatus
These are the values that can be returned by the runJob() method.
Used to receive callbacks for thread exit calls.
bool threadShouldExit() const
Checks whether the thread has been told to stop running.
ThreadPoolJob(const String &name)
Creates a thread pool job object.
void addListener(Thread::Listener *)
Add a listener to this thread job which will receive a callback when signalJobShouldExit was called o...
Holds a resizable array of primitive or copy-by-value objects.
int size() const noexcept
Returns the current number of elements in the array.
virtual void exitSignalSent()=0
Called if Thread::signalThreadShouldExit was called.
indicates that the job has finished and can be removed from the pool.
void moveJobToFront(const ThreadPoolJob *jobToMove) noexcept
If the given job is in the queue, this will move it to the front so that it is the next one to be exe...
ObjectClass * add(ObjectClass *newObject)
Appends a new object to the end of the array.
StringArray getNamesOfAllJobs(bool onlyReturnActiveJobs) const
Returns a list of the names of all the jobs currently running or queued.
bool setThreadPriorities(int newPriority)
Changes the priority of all the threads.
An array designed for holding objects.
String getJobName() const
Returns the name of this job.
A task that is executed by a ThreadPool object.
void signalJobShouldExit()
Calling this will cause the shouldExit() method to return true, and the job should (if it's been impl...
static Thread *JUCE_CALLTYPE getCurrentThread()
Finds the thread object that is currently running.
ThreadPool()
Creates a thread pool with one thread per CPU core.
void setJobName(const String &newName)
Changes the job's name.
void addJob(ThreadPoolJob *job, bool deleteJobWhenFinished)
Adds a job to the queue.
A set of threads that will run a list of jobs.
Automatically locks and unlocks a mutex object.
ThreadPoolJob * getJob(int index) const noexcept
Returns one of the jobs in the queue.
void remove(int indexToRemove)
Removes an element from the array.
bool wait(int timeOutMilliseconds) const
Suspends the execution of this thread until either the specified timeout period has elapsed...
indicates that the job would like to be called again when a thread is free.
bool removeJob(ThreadPoolJob *job, bool interruptIfRunning, int timeOutMilliseconds)
Tries to remove a job from the pool.
static int getNumCpus() noexcept
Returns the number of logical CPU cores.
void add(String stringToAdd)
Appends a string at the end of the array.
bool contains(const ThreadPoolJob *job) const noexcept
Returns true if the given job is currently queued or running.
static uint32 getMillisecondCounter() noexcept
Returns the number of millisecs since a fixed event (usually system startup).