OpenShot Audio Library | OpenShotAudio  0.6.0
juce_ConnectedChildProcess.cpp
1 /*
2  ==============================================================================
3 
4  This file is part of the JUCE library.
5  Copyright (c) 2022 - Raw Material Software Limited
6 
7  JUCE is an open source library subject to commercial or open-source
8  licensing.
9 
10  The code included in this file is provided under the terms of the ISC license
11  http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12  To use, copy, modify, and/or distribute this software for any purpose with or
13  without fee is hereby granted provided that the above copyright notice and
14  this permission notice appear in all copies.
15 
16  JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17  EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18  DISCLAIMED.
19 
20  ==============================================================================
21 */
22 
23 namespace juce
24 {
25 
// Header word that both the coordinator-side and the worker-side Connection
// hand to their InterprocessConnection base class.
enum { magicCoordWorkerConnectionHeader = 0x712baf04 };

// Reserved control payloads. Each is sent as exactly specialMessageSize bytes
// (no terminator), and isMessageType() recognises them by comparing that many
// bytes. The pointers themselves are const so that nothing can re-seat them.
static const char* const startMessage = "__ipc_st";  // coordinator -> worker, after a successful launch
static const char* const killMessage  = "__ipc_k_";  // coordinator -> worker, when shutting the worker down
static const char* const pingMessage  = "__ipc_p_";  // sent by the ping thread on either side

// specialMessageSize: byte length of each control payload above.
// defaultTimeoutMs:   timeout used when a caller supplies a value <= 0.
enum { specialMessageSize = 8, defaultTimeoutMs = 8000 };
32 
33 static bool isMessageType (const MemoryBlock& mb, const char* messageType) noexcept
34 {
35  return mb.matches (messageType, (size_t) specialMessageSize);
36 }
37 
38 static String getCommandLinePrefix (const String& commandLineUniqueID)
39 {
40  return "--" + commandLineUniqueID + ":";
41 }
42 
43 //==============================================================================
44 // This thread sends and receives ping messages every second, so that it
45 // can find out if the other process has stopped running.
46 struct ChildProcessPingThread : public Thread,
47  private AsyncUpdater
48 {
49  ChildProcessPingThread (int timeout) : Thread ("IPC ping"), timeoutMs (timeout)
50  {
51  pingReceived();
52  }
53 
54  void startPinging() { startThread (Priority::low); }
55 
56  void pingReceived() noexcept { countdown = timeoutMs / 1000 + 1; }
57  void triggerConnectionLostMessage() { triggerAsyncUpdate(); }
58 
59  virtual bool sendPingMessage (const MemoryBlock&) = 0;
60  virtual void pingFailed() = 0;
61 
62  int timeoutMs;
63 
65 
66 private:
67  Atomic<int> countdown;
68 
69  void handleAsyncUpdate() override { pingFailed(); }
70 
71  void run() override
72  {
73  while (! threadShouldExit())
74  {
75  if (--countdown <= 0 || ! sendPingMessage ({ pingMessage, specialMessageSize }))
76  {
77  triggerConnectionLostMessage();
78  break;
79  }
80 
81  wait (1000);
82  }
83  }
84 
85  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ChildProcessPingThread)
86 };
87 
88 //==============================================================================
89 struct ChildProcessCoordinator::Connection final : public InterprocessConnection,
90  private ChildProcessPingThread
91 {
92  Connection (ChildProcessCoordinator& m, const String& pipeName, int timeout)
93  : InterprocessConnection (false, magicCoordWorkerConnectionHeader),
94  ChildProcessPingThread (timeout),
95  owner (m)
96  {
97  createPipe (pipeName, timeoutMs);
98  }
99 
100  ~Connection() override
101  {
102  cancelPendingUpdate();
103  stopThread (10000);
104  }
105 
106  using ChildProcessPingThread::startPinging;
107 
108 private:
109  void connectionMade() override {}
110  void connectionLost() override { owner.handleConnectionLost(); }
111 
112  bool sendPingMessage (const MemoryBlock& m) override { return owner.sendMessageToWorker (m); }
113  void pingFailed() override { connectionLost(); }
114 
115  void messageReceived (const MemoryBlock& m) override
116  {
117  pingReceived();
118 
119  if (m.getSize() != specialMessageSize || ! isMessageType (m, pingMessage))
120  owner.handleMessageFromWorker (m);
121  }
122 
124 
125  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Connection)
126 };
127 
128 //==============================================================================
130 
132 {
134 }
135 
137 
139 {
140  JUCE_BEGIN_IGNORE_WARNINGS_GCC_LIKE ("-Wdeprecated-declarations")
141  JUCE_BEGIN_IGNORE_WARNINGS_MSVC (4996)
142  handleMessageFromSlave (mb);
143  JUCE_END_IGNORE_WARNINGS_GCC_LIKE
144  JUCE_END_IGNORE_WARNINGS_MSVC
145 }
146 
148 {
149  if (connection != nullptr)
150  return connection->sendMessage (mb);
151 
152  jassertfalse; // this can only be used when the connection is active!
153  return false;
154 }
155 
156 bool ChildProcessCoordinator::launchWorkerProcess (const File& executable, const String& commandLineUniqueID,
157  int timeoutMs, int streamFlags)
158 {
160 
161  auto pipeName = "p" + String::toHexString (Random().nextInt64());
162 
163  StringArray args;
164  args.add (executable.getFullPathName());
165  args.add (getCommandLinePrefix (commandLineUniqueID) + pipeName);
166 
167  childProcess = [&]() -> std::shared_ptr<ChildProcess>
168  {
169  if ((SystemStats::getOperatingSystemType() & SystemStats::Linux) != 0)
170  return ChildProcessManager::getInstance()->createAndStartManagedChildProcess (args, streamFlags);
171 
172  auto p = std::make_shared<ChildProcess>();
173 
174  if (p->start (args, streamFlags))
175  return p;
176 
177  return nullptr;
178  }();
179 
180  if (childProcess != nullptr)
181  {
182  connection.reset (new Connection (*this, pipeName, timeoutMs <= 0 ? defaultTimeoutMs : timeoutMs));
183 
184  if (connection->isConnected())
185  {
186  connection->startPinging();
187  sendMessageToWorker ({ startMessage, specialMessageSize });
188  return true;
189  }
190 
191  connection.reset();
192  }
193 
194  return false;
195 }
196 
198 {
199  if (connection != nullptr)
200  {
201  sendMessageToWorker ({ killMessage, specialMessageSize });
202  connection->disconnect();
203  connection.reset();
204  }
205 
206  childProcess.reset();
207 }
208 
209 //==============================================================================
210 struct ChildProcessWorker::Connection final : public InterprocessConnection,
211  private ChildProcessPingThread
212 {
213  Connection (ChildProcessWorker& p, const String& pipeName, int timeout)
214  : InterprocessConnection (false, magicCoordWorkerConnectionHeader),
215  ChildProcessPingThread (timeout),
216  owner (p)
217  {
218  connectToPipe (pipeName, timeoutMs);
219  }
220 
221  ~Connection() override
222  {
223  cancelPendingUpdate();
224  stopThread (10000);
225  disconnect();
226  }
227 
228  using ChildProcessPingThread::startPinging;
229 
230 private:
231  ChildProcessWorker& owner;
232 
233  void connectionMade() override {}
234  void connectionLost() override { owner.handleConnectionLost(); }
235 
236  bool sendPingMessage (const MemoryBlock& m) override { return owner.sendMessageToCoordinator (m); }
237  void pingFailed() override { connectionLost(); }
238 
239  void messageReceived (const MemoryBlock& m) override
240  {
241  pingReceived();
242 
243  if (isMessageType (m, pingMessage))
244  return;
245 
246  if (isMessageType (m, killMessage))
247  return triggerConnectionLostMessage();
248 
249  if (isMessageType (m, startMessage))
250  return owner.handleConnectionMade();
251 
252  owner.handleMessageFromCoordinator (m);
253  }
254 
255  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (Connection)
256 };
257 
258 //==============================================================================
261 
264 
266 {
267  JUCE_BEGIN_IGNORE_WARNINGS_GCC_LIKE ("-Wdeprecated-declarations")
268  JUCE_BEGIN_IGNORE_WARNINGS_MSVC (4996)
269  handleMessageFromMaster (mb);
270  JUCE_END_IGNORE_WARNINGS_GCC_LIKE
271  JUCE_END_IGNORE_WARNINGS_MSVC
272 }
273 
275 {
276  if (connection != nullptr)
277  return connection->sendMessage (mb);
278 
279  jassertfalse; // this can only be used when the connection is active!
280  return false;
281 }
282 
284  const String& commandLineUniqueID,
285  int timeoutMs)
286 {
287  auto prefix = getCommandLinePrefix (commandLineUniqueID);
288 
289  if (commandLine.trim().startsWith (prefix))
290  {
291  auto pipeName = commandLine.fromFirstOccurrenceOf (prefix, false, false)
292  .upToFirstOccurrenceOf (" ", false, false).trim();
293 
294  if (pipeName.isNotEmpty())
295  {
296  connection.reset (new Connection (*this, pipeName, timeoutMs <= 0 ? defaultTimeoutMs : timeoutMs));
297 
298  if (connection->isConnected())
299  connection->startPinging();
300  else
301  connection.reset();
302  }
303  }
304 
305  return connection != nullptr;
306 }
307 
308 } // namespace juce
void cancelPendingUpdate() noexcept
bool sendMessageToWorker(const MemoryBlock &)
virtual void handleMessageFromWorker(const MemoryBlock &)
bool launchWorkerProcess(const File &executableToLaunch, const String &commandLineUniqueID, int timeoutMs=0, int streamFlags=ChildProcess::wantStdOut|ChildProcess::wantStdErr)
bool sendMessageToCoordinator(const MemoryBlock &)
virtual void handleMessageFromCoordinator(const MemoryBlock &mb)
bool initialiseFromCommandLine(const String &commandLine, const String &commandLineUniqueID, int timeoutMs=0)
const String & getFullPathName() const noexcept
Definition: juce_File.h:153
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
bool createPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist=false)
void add(String stringToAdd)
String upToFirstOccurrenceOf(StringRef substringToEndWith, bool includeSubStringInResult, bool ignoreCase) const
String trim() const
bool startsWith(StringRef text) const noexcept
static String toHexString(IntegerType number)
Definition: juce_String.h:1097
String fromFirstOccurrenceOf(StringRef substringToStartFrom, bool includeSubStringInResult, bool ignoreCase) const
static OperatingSystemType getOperatingSystemType()
bool wait(double timeOutMilliseconds) const
Thread(const String &threadName, size_t threadStackSize=osDefaultStackSize)
Definition: juce_Thread.cpp:26
bool startThread()
bool threadShouldExit() const