OpenShot Audio Library | OpenShotAudio  0.6.0
juce_ThreadPool.cpp
1 /*
2  ==============================================================================
3 
4  This file is part of the JUCE library.
5  Copyright (c) 2022 - Raw Material Software Limited
6 
7  JUCE is an open source library subject to commercial or open-source
8  licensing.
9 
10  The code included in this file is provided under the terms of the ISC license
11  http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12  To use, copy, modify, and/or distribute this software for any purpose with or
13  without fee is hereby granted provided that the above copyright notice and
14  this permission notice appear in all copies.
15 
16  JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17  EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18  DISCLAIMED.
19 
20  ==============================================================================
21 */
22 
23 namespace juce
24 {
25 
26 struct ThreadPool::ThreadPoolThread final : public Thread
27 {
28  ThreadPoolThread (ThreadPool& p, const Options& options)
29  : Thread { options.threadName, options.threadStackSizeBytes },
30  pool { p }
31  {
32  }
33 
34  void run() override
35  {
36  while (! threadShouldExit())
37  {
38  if (! pool.runNextJob (*this))
39  wait (500);
40  }
41  }
42 
43  std::atomic<ThreadPoolJob*> currentJob { nullptr };
44 
45  ThreadPool& pool;
46 
47  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ThreadPoolThread)
48 };
49 
50 //==============================================================================
51 ThreadPoolJob::ThreadPoolJob (const String& name) : jobName (name)
52 {
53 }
54 
56 {
57  // you mustn't delete a job while it's still in a pool! Use ThreadPool::removeJob()
58  // to remove it first!
59  jassert (pool == nullptr || ! pool->contains (this));
60 }
61 
63 {
64  return jobName;
65 }
66 
67 void ThreadPoolJob::setJobName (const String& newName)
68 {
69  jobName = newName;
70 }
71 
73 {
74  shouldStop = true;
75  listeners.call ([] (Thread::Listener& l) { l.exitSignalSent(); });
76 }
77 
79 {
80  listeners.add (listener);
81 }
82 
84 {
85  listeners.remove (listener);
86 }
87 
89 {
90  if (auto* t = dynamic_cast<ThreadPool::ThreadPoolThread*> (Thread::getCurrentThread()))
91  return t->currentJob.load();
92 
93  return nullptr;
94 }
95 
96 //==============================================================================
98 {
99  // not much point having a pool without any threads!
100  jassert (options.numberOfThreads > 0);
101 
102  for (int i = jmax (1, options.numberOfThreads); --i >= 0;)
103  threads.add (new ThreadPoolThread (*this, options));
104 
105  for (auto* t : threads)
106  t->startThread (options.desiredThreadPriority);
107 }
108 
109 ThreadPool::ThreadPool (int numberOfThreads,
110  size_t threadStackSizeBytes,
111  Thread::Priority desiredThreadPriority)
112  : ThreadPool { Options{}.withNumberOfThreads (numberOfThreads)
113  .withThreadStackSizeBytes (threadStackSizeBytes)
114  .withDesiredThreadPriority (desiredThreadPriority) }
115 {
116 }
117 
119 {
120  removeAllJobs (true, 5000);
121  stopThreads();
122 }
123 
124 void ThreadPool::stopThreads()
125 {
126  for (auto* t : threads)
127  t->signalThreadShouldExit();
128 
129  for (auto* t : threads)
130  t->stopThread (500);
131 }
132 
133 void ThreadPool::addJob (ThreadPoolJob* job, bool deleteJobWhenFinished)
134 {
135  jassert (job != nullptr);
136  jassert (job->pool == nullptr);
137 
138  if (job->pool == nullptr)
139  {
140  job->pool = this;
141  job->shouldStop = false;
142  job->isActive = false;
143  job->shouldBeDeleted = deleteJobWhenFinished;
144 
145  {
146  const ScopedLock sl (lock);
147  jobs.add (job);
148  }
149 
150  for (auto* t : threads)
151  t->notify();
152  }
153 }
154 
155 void ThreadPool::addJob (std::function<ThreadPoolJob::JobStatus()> jobToRun)
156 {
157  struct LambdaJobWrapper final : public ThreadPoolJob
158  {
159  LambdaJobWrapper (std::function<ThreadPoolJob::JobStatus()> j) : ThreadPoolJob ("lambda"), job (j) {}
160  JobStatus runJob() override { return job(); }
161 
162  std::function<ThreadPoolJob::JobStatus()> job;
163  };
164 
165  addJob (new LambdaJobWrapper (jobToRun), true);
166 }
167 
168 void ThreadPool::addJob (std::function<void()> jobToRun)
169 {
170  struct LambdaJobWrapper final : public ThreadPoolJob
171  {
172  LambdaJobWrapper (std::function<void()> j) : ThreadPoolJob ("lambda"), job (std::move (j)) {}
173  JobStatus runJob() override { job(); return ThreadPoolJob::jobHasFinished; }
174 
175  std::function<void()> job;
176  };
177 
178  addJob (new LambdaJobWrapper (std::move (jobToRun)), true);
179 }
180 
181 int ThreadPool::getNumJobs() const noexcept
182 {
183  const ScopedLock sl (lock);
184  return jobs.size();
185 }
186 
187 int ThreadPool::getNumThreads() const noexcept
188 {
189  return threads.size();
190 }
191 
192 ThreadPoolJob* ThreadPool::getJob (int index) const noexcept
193 {
194  const ScopedLock sl (lock);
195  return jobs [index];
196 }
197 
198 bool ThreadPool::contains (const ThreadPoolJob* job) const noexcept
199 {
200  const ScopedLock sl (lock);
201  return jobs.contains (const_cast<ThreadPoolJob*> (job));
202 }
203 
204 bool ThreadPool::isJobRunning (const ThreadPoolJob* job) const noexcept
205 {
206  const ScopedLock sl (lock);
207  return jobs.contains (const_cast<ThreadPoolJob*> (job)) && job->isActive;
208 }
209 
210 void ThreadPool::moveJobToFront (const ThreadPoolJob* job) noexcept
211 {
212  const ScopedLock sl (lock);
213 
214  auto index = jobs.indexOf (const_cast<ThreadPoolJob*> (job));
215 
216  if (index > 0 && ! job->isActive)
217  jobs.move (index, 0);
218 }
219 
220 bool ThreadPool::waitForJobToFinish (const ThreadPoolJob* job, int timeOutMs) const
221 {
222  if (job != nullptr)
223  {
224  auto start = Time::getMillisecondCounter();
225 
226  while (contains (job))
227  {
228  if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
229  return false;
230 
231  jobFinishedSignal.wait (2);
232  }
233  }
234 
235  return true;
236 }
237 
238 bool ThreadPool::removeJob (ThreadPoolJob* job, bool interruptIfRunning, int timeOutMs)
239 {
240  bool dontWait = true;
241  OwnedArray<ThreadPoolJob> deletionList;
242 
243  if (job != nullptr)
244  {
245  const ScopedLock sl (lock);
246 
247  if (jobs.contains (job))
248  {
249  if (job->isActive)
250  {
251  if (interruptIfRunning)
252  job->signalJobShouldExit();
253 
254  dontWait = false;
255  }
256  else
257  {
258  jobs.removeFirstMatchingValue (job);
259  addToDeleteList (deletionList, job);
260  }
261  }
262  }
263 
264  return dontWait || waitForJobToFinish (job, timeOutMs);
265 }
266 
267 bool ThreadPool::removeAllJobs (bool interruptRunningJobs, int timeOutMs,
268  ThreadPool::JobSelector* selectedJobsToRemove)
269 {
270  Array<ThreadPoolJob*> jobsToWaitFor;
271 
272  {
273  OwnedArray<ThreadPoolJob> deletionList;
274 
275  {
276  const ScopedLock sl (lock);
277 
278  for (int i = jobs.size(); --i >= 0;)
279  {
280  auto* job = jobs.getUnchecked (i);
281 
282  if (selectedJobsToRemove == nullptr || selectedJobsToRemove->isJobSuitable (job))
283  {
284  if (job->isActive)
285  {
286  jobsToWaitFor.add (job);
287 
288  if (interruptRunningJobs)
289  job->signalJobShouldExit();
290  }
291  else
292  {
293  jobs.remove (i);
294  addToDeleteList (deletionList, job);
295  }
296  }
297  }
298  }
299  }
300 
301  auto start = Time::getMillisecondCounter();
302 
303  for (;;)
304  {
305  for (int i = jobsToWaitFor.size(); --i >= 0;)
306  {
307  auto* job = jobsToWaitFor.getUnchecked (i);
308 
309  if (! isJobRunning (job))
310  jobsToWaitFor.remove (i);
311  }
312 
313  if (jobsToWaitFor.size() == 0)
314  break;
315 
316  if (timeOutMs >= 0 && Time::getMillisecondCounter() >= start + (uint32) timeOutMs)
317  return false;
318 
319  jobFinishedSignal.wait (20);
320  }
321 
322  return true;
323 }
324 
325 StringArray ThreadPool::getNamesOfAllJobs (bool onlyReturnActiveJobs) const
326 {
327  StringArray s;
328  const ScopedLock sl (lock);
329 
330  for (auto* job : jobs)
331  if (job->isActive || ! onlyReturnActiveJobs)
332  s.add (job->getJobName());
333 
334  return s;
335 }
336 
337 ThreadPoolJob* ThreadPool::pickNextJobToRun()
338 {
339  OwnedArray<ThreadPoolJob> deletionList;
340 
341  {
342  const ScopedLock sl (lock);
343 
344  for (int i = 0; i < jobs.size(); ++i)
345  {
346  if (auto* job = jobs[i])
347  {
348  if (! job->isActive)
349  {
350  if (job->shouldStop)
351  {
352  jobs.remove (i);
353  addToDeleteList (deletionList, job);
354  --i;
355  continue;
356  }
357 
358  job->isActive = true;
359  return job;
360  }
361  }
362  }
363  }
364 
365  return nullptr;
366 }
367 
368 bool ThreadPool::runNextJob (ThreadPoolThread& thread)
369 {
370  if (auto* job = pickNextJobToRun())
371  {
372  auto result = ThreadPoolJob::jobHasFinished;
373  thread.currentJob = job;
374 
375  try
376  {
377  result = job->runJob();
378  }
379  catch (...)
380  {
381  jassertfalse; // Your runJob() method mustn't throw any exceptions!
382  }
383 
384  thread.currentJob = nullptr;
385 
386  OwnedArray<ThreadPoolJob> deletionList;
387 
388  {
389  const ScopedLock sl (lock);
390 
391  if (jobs.contains (job))
392  {
393  job->isActive = false;
394 
395  if (result != ThreadPoolJob::jobNeedsRunningAgain || job->shouldStop)
396  {
397  jobs.removeFirstMatchingValue (job);
398  addToDeleteList (deletionList, job);
399 
400  jobFinishedSignal.signal();
401  }
402  else
403  {
404  // move the job to the end of the queue if it wants another go
405  jobs.move (jobs.indexOf (job), -1);
406  }
407  }
408  }
409 
410  return true;
411  }
412 
413  return false;
414 }
415 
416 void ThreadPool::addToDeleteList (OwnedArray<ThreadPoolJob>& deletionList, ThreadPoolJob* job) const
417 {
418  job->shouldStop = true;
419  job->pool = nullptr;
420 
421  if (job->shouldBeDeleted)
422  deletionList.add (job);
423 }
424 
425 } // namespace juce
ElementType getUnchecked(int index) const
Definition: juce_Array.h:252
int size() const noexcept
Definition: juce_Array.h:215
void remove(int indexToRemove)
Definition: juce_Array.h:742
void add(const ElementType &newElement)
Definition: juce_Array.h:418
ObjectClass * add(ObjectClass *newObject)
void add(String stringToAdd)
void setJobName(const String &newName)
String getJobName() const
void addListener(Thread::Listener *)
static ThreadPoolJob * getCurrentThreadPoolJob()
void removeListener(Thread::Listener *)
ThreadPoolJob(const String &name)
virtual bool isJobSuitable(ThreadPoolJob *job)=0
void moveJobToFront(const ThreadPoolJob *jobToMove) noexcept
void addJob(ThreadPoolJob *job, bool deleteJobWhenFinished)
int getNumThreads() const noexcept
ThreadPoolJob * getJob(int index) const noexcept
int getNumJobs() const noexcept
bool removeAllJobs(bool interruptRunningJobs, int timeOutMilliseconds, JobSelector *selectedJobsToRemove=nullptr)
StringArray getNamesOfAllJobs(bool onlyReturnActiveJobs) const
bool isJobRunning(const ThreadPoolJob *job) const noexcept
bool removeJob(ThreadPoolJob *job, bool interruptIfRunning, int timeOutMilliseconds)
bool contains(const ThreadPoolJob *job) const noexcept
bool waitForJobToFinish(const ThreadPoolJob *job, int timeOutMilliseconds) const
virtual void exitSignalSent()=0
static Thread *JUCE_CALLTYPE getCurrentThread()
bool wait(double timeOutMilliseconds) const
Thread(const String &threadName, size_t threadStackSize=osDefaultStackSize)
Definition: juce_Thread.cpp:26
bool threadShouldExit() const
static uint32 getMillisecondCounter() noexcept
Definition: juce_Time.cpp:241
bool wait(double timeOutMilliseconds=-1.0) const