OpenShot Audio Library | OpenShotAudio  0.6.0
juce_InterprocessConnection.cpp
1 /*
2  ==============================================================================
3 
4  This file is part of the JUCE library.
5  Copyright (c) 2022 - Raw Material Software Limited
6 
7  JUCE is an open source library subject to commercial or open-source
8  licensing.
9 
10  The code included in this file is provided under the terms of the ISC license
11  http://www.isc.org/downloads/software-support-policy/isc-license. Permission
12  To use, copy, modify, and/or distribute this software for any purpose with or
13  without fee is hereby granted provided that the above copyright notice and
14  this permission notice appear in all copies.
15 
16  JUCE IS PROVIDED "AS IS" WITHOUT ANY WARRANTY, AND ALL WARRANTIES, WHETHER
17  EXPRESSED OR IMPLIED, INCLUDING MERCHANTABILITY AND FITNESS FOR PURPOSE, ARE
18  DISCLAIMED.
19 
20  ==============================================================================
21 */
22 
23 namespace juce
24 {
25 
26 struct InterprocessConnection::ConnectionThread final : public Thread
27 {
28  ConnectionThread (InterprocessConnection& c) : Thread ("JUCE IPC"), owner (c) {}
29  void run() override { owner.runThread(); }
30 
32  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionThread)
33 };
34 
35 class SafeActionImpl
36 {
37 public:
38  explicit SafeActionImpl (InterprocessConnection& p)
39  : ref (p) {}
40 
41  template <typename Fn>
42  void ifSafe (Fn&& fn)
43  {
44  const ScopedLock lock (mutex);
45 
46  if (safe)
47  fn (ref);
48  }
49 
50  void setSafe (bool s)
51  {
52  const ScopedLock lock (mutex);
53  safe = s;
54  }
55 
56  bool isSafe()
57  {
58  const ScopedLock lock (mutex);
59  return safe;
60  }
61 
62 private:
63  CriticalSection mutex;
64  InterprocessConnection& ref;
65  bool safe = false;
66 };
67 
68 class InterprocessConnection::SafeAction final : public SafeActionImpl
69 {
70  using SafeActionImpl::SafeActionImpl;
71 };
72 
73 //==============================================================================
74 InterprocessConnection::InterprocessConnection (bool callbacksOnMessageThread, uint32 magicMessageHeaderNumber)
75  : useMessageThread (callbacksOnMessageThread),
76  magicMessageHeader (magicMessageHeaderNumber),
77  safeAction (std::make_shared<SafeAction> (*this))
78 {
79  thread.reset (new ConnectionThread (*this));
80 }
81 
83 {
84  // You *must* call `disconnect` in the destructor of your derived class to ensure
85  // that any pending messages are not delivered. If the messages were delivered after
86  // destroying the derived class, we'd end up calling the pure virtual implementations
87  // of `messageReceived`, `connectionMade` and `connectionLost` which is definitely
88  // not a good idea!
89  jassert (! safeAction->isSafe());
90 
91  callbackConnectionState = false;
92  disconnect (4000, Notify::no);
93  thread.reset();
94 }
95 
96 //==============================================================================
98  int portNumber, int timeOutMillisecs)
99 {
100  disconnect();
101 
102  auto s = std::make_unique<StreamingSocket>();
103 
104  if (s->connect (hostName, portNumber, timeOutMillisecs))
105  {
106  const ScopedWriteLock sl (pipeAndSocketLock);
107  initialiseWithSocket (std::move (s));
108  return true;
109  }
110 
111  return false;
112 }
113 
114 bool InterprocessConnection::connectToPipe (const String& pipeName, int timeoutMs)
115 {
116  disconnect();
117 
118  auto newPipe = std::make_unique<NamedPipe>();
119 
120  if (newPipe->openExisting (pipeName))
121  {
122  const ScopedWriteLock sl (pipeAndSocketLock);
123  pipeReceiveMessageTimeout = timeoutMs;
124  initialiseWithPipe (std::move (newPipe));
125  return true;
126  }
127 
128  return false;
129 }
130 
131 bool InterprocessConnection::createPipe (const String& pipeName, int timeoutMs, bool mustNotExist)
132 {
133  disconnect();
134 
135  auto newPipe = std::make_unique<NamedPipe>();
136 
137  if (newPipe->createNewPipe (pipeName, mustNotExist))
138  {
139  const ScopedWriteLock sl (pipeAndSocketLock);
140  pipeReceiveMessageTimeout = timeoutMs;
141  initialiseWithPipe (std::move (newPipe));
142  return true;
143  }
144 
145  return false;
146 }
147 
148 void InterprocessConnection::disconnect (int timeoutMs, Notify notify)
149 {
150  thread->signalThreadShouldExit();
151 
152  {
153  const ScopedReadLock sl (pipeAndSocketLock);
154  if (socket != nullptr) socket->close();
155  if (pipe != nullptr) pipe->close();
156  }
157 
158  thread->stopThread (timeoutMs);
159  deletePipeAndSocket();
160 
161  if (notify == Notify::yes)
162  connectionLostInt();
163 
164  callbackConnectionState = false;
165  safeAction->setSafe (false);
166 }
167 
168 void InterprocessConnection::deletePipeAndSocket()
169 {
170  const ScopedWriteLock sl (pipeAndSocketLock);
171  socket.reset();
172  pipe.reset();
173 }
174 
176 {
177  const ScopedReadLock sl (pipeAndSocketLock);
178 
179  return ((socket != nullptr && socket->isConnected())
180  || (pipe != nullptr && pipe->isOpen()))
181  && threadIsRunning;
182 }
183 
185 {
186  {
187  const ScopedReadLock sl (pipeAndSocketLock);
188 
189  if (pipe == nullptr && socket == nullptr)
190  return {};
191 
192  if (socket != nullptr && ! socket->isLocal())
193  return socket->getHostName();
194  }
195 
196  return IPAddress::local().toString();
197 }
198 
199 //==============================================================================
201 {
202  uint32 messageHeader[2] = { ByteOrder::swapIfBigEndian (magicMessageHeader),
203  ByteOrder::swapIfBigEndian ((uint32) message.getSize()) };
204 
205  MemoryBlock messageData (sizeof (messageHeader) + message.getSize());
206  messageData.copyFrom (messageHeader, 0, sizeof (messageHeader));
207  messageData.copyFrom (message.getData(), sizeof (messageHeader), message.getSize());
208 
209  return writeData (messageData.getData(), (int) messageData.getSize()) == (int) messageData.getSize();
210 }
211 
212 int InterprocessConnection::writeData (void* data, int dataSize)
213 {
214  const ScopedReadLock sl (pipeAndSocketLock);
215 
216  if (socket != nullptr)
217  return socket->write (data, dataSize);
218 
219  if (pipe != nullptr)
220  return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
221 
222  return 0;
223 }
224 
225 //==============================================================================
226 void InterprocessConnection::initialise()
227 {
228  safeAction->setSafe (true);
229  threadIsRunning = true;
230  connectionMadeInt();
231  thread->startThread();
232 }
233 
234 void InterprocessConnection::initialiseWithSocket (std::unique_ptr<StreamingSocket> newSocket)
235 {
236  jassert (socket == nullptr && pipe == nullptr);
237  socket = std::move (newSocket);
238  initialise();
239 }
240 
241 void InterprocessConnection::initialiseWithPipe (std::unique_ptr<NamedPipe> newPipe)
242 {
243  jassert (socket == nullptr && pipe == nullptr);
244  pipe = std::move (newPipe);
245  initialise();
246 }
247 
248 //==============================================================================
249 struct ConnectionStateMessage final : public MessageManager::MessageBase
250 {
251  ConnectionStateMessage (std::shared_ptr<SafeActionImpl> ipc, bool connected) noexcept
252  : safeAction (ipc), connectionMade (connected)
253  {}
254 
255  void messageCallback() override
256  {
257  safeAction->ifSafe ([this] (InterprocessConnection& owner)
258  {
259  if (connectionMade)
260  owner.connectionMade();
261  else
262  owner.connectionLost();
263  });
264  }
265 
266  std::shared_ptr<SafeActionImpl> safeAction;
267  bool connectionMade;
268 
269  JUCE_DECLARE_NON_COPYABLE_WITH_LEAK_DETECTOR (ConnectionStateMessage)
270 };
271 
272 void InterprocessConnection::connectionMadeInt()
273 {
274  if (! callbackConnectionState)
275  {
276  callbackConnectionState = true;
277 
278  if (useMessageThread)
279  (new ConnectionStateMessage (safeAction, true))->post();
280  else
281  connectionMade();
282  }
283 }
284 
285 void InterprocessConnection::connectionLostInt()
286 {
287  if (callbackConnectionState)
288  {
289  callbackConnectionState = false;
290 
291  if (useMessageThread)
292  (new ConnectionStateMessage (safeAction, false))->post();
293  else
294  connectionLost();
295  }
296 }
297 
298 struct DataDeliveryMessage final : public Message
299 {
300  DataDeliveryMessage (std::shared_ptr<SafeActionImpl> ipc, const MemoryBlock& d)
301  : safeAction (ipc), data (d)
302  {}
303 
304  void messageCallback() override
305  {
306  safeAction->ifSafe ([this] (InterprocessConnection& owner)
307  {
308  owner.messageReceived (data);
309  });
310  }
311 
312  std::shared_ptr<SafeActionImpl> safeAction;
313  MemoryBlock data;
314 };
315 
316 void InterprocessConnection::deliverDataInt (const MemoryBlock& data)
317 {
318  jassert (callbackConnectionState);
319 
320  if (useMessageThread)
321  (new DataDeliveryMessage (safeAction, data))->post();
322  else
323  messageReceived (data);
324 }
325 
326 //==============================================================================
327 int InterprocessConnection::readData (void* data, int num)
328 {
329  const ScopedReadLock sl (pipeAndSocketLock);
330 
331  if (socket != nullptr)
332  return socket->read (data, num, true);
333 
334  if (pipe != nullptr)
335  return pipe->read (data, num, pipeReceiveMessageTimeout);
336 
337  jassertfalse;
338  return -1;
339 }
340 
341 bool InterprocessConnection::readNextMessage()
342 {
343  uint32 messageHeader[2];
344  auto bytes = readData (messageHeader, sizeof (messageHeader));
345 
346  if (bytes == (int) sizeof (messageHeader)
347  && ByteOrder::swapIfBigEndian (messageHeader[0]) == magicMessageHeader)
348  {
349  auto bytesInMessage = (int) ByteOrder::swapIfBigEndian (messageHeader[1]);
350 
351  if (bytesInMessage > 0)
352  {
353  MemoryBlock messageData ((size_t) bytesInMessage, true);
354  int bytesRead = 0;
355 
356  while (bytesInMessage > 0)
357  {
358  if (thread->threadShouldExit())
359  return false;
360 
361  auto numThisTime = jmin (bytesInMessage, 65536);
362  auto bytesIn = readData (addBytesToPointer (messageData.getData(), bytesRead), numThisTime);
363 
364  if (bytesIn <= 0)
365  break;
366 
367  bytesRead += bytesIn;
368  bytesInMessage -= bytesIn;
369  }
370 
371  if (bytesRead >= 0)
372  deliverDataInt (messageData);
373  }
374 
375  return true;
376  }
377 
378  if (bytes < 0)
379  {
380  if (socket != nullptr)
381  deletePipeAndSocket();
382 
383  connectionLostInt();
384  }
385 
386  return false;
387 }
388 
389 void InterprocessConnection::runThread()
390 {
391  while (! thread->threadShouldExit())
392  {
393  if (socket != nullptr)
394  {
395  auto ready = socket->waitUntilReady (true, 100);
396 
397  if (ready < 0)
398  {
399  deletePipeAndSocket();
400  connectionLostInt();
401  break;
402  }
403 
404  if (ready == 0)
405  {
406  thread->wait (1);
407  continue;
408  }
409  }
410  else if (pipe != nullptr)
411  {
412  if (! pipe->isOpen())
413  {
414  deletePipeAndSocket();
415  connectionLostInt();
416  break;
417  }
418  }
419  else
420  {
421  break;
422  }
423 
424  if (thread->threadShouldExit() || ! readNextMessage())
425  break;
426  }
427 
428  threadIsRunning = false;
429 }
430 
431 } // namespace juce
static Type swapIfBigEndian(Type value) noexcept
virtual void connectionMade()=0
virtual void messageReceived(const MemoryBlock &message)=0
void disconnect(int timeoutMs=-1, Notify notify=Notify::yes)
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
bool createPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist=false)
bool connectToPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs)
virtual void connectionLost()=0
bool sendMessage(const MemoryBlock &message)
bool connectToSocket(const String &hostName, int portNumber, int timeOutMillisecs)
void * getData() noexcept
void copyFrom(const void *srcData, int destinationOffset, size_t numBytes) noexcept
size_t getSize() const noexcept
Thread(const String &threadName, size_t threadStackSize=osDefaultStackSize)
Definition: juce_Thread.cpp:26