29 void run()
override { owner.runThread(); }
37 : useMessageThread (callbacksOnMessageThread),
38 magicMessageHeader (magicMessageHeaderNumber)
45 callbackConnectionState =
false;
47 masterReference.clear();
53 int portNumber,
int timeOutMillisecs)
60 if (socket->connect (hostName, portNumber, timeOutMillisecs))
62 threadIsRunning =
true;
64 thread->startThread();
76 std::unique_ptr<NamedPipe> newPipe (
new NamedPipe());
78 if (newPipe->openExisting (pipeName))
81 pipeReceiveMessageTimeout = timeoutMs;
82 initialiseWithPipe (newPipe.release());
93 std::unique_ptr<NamedPipe> newPipe (
new NamedPipe());
95 if (newPipe->createNewPipe (pipeName, mustNotExist))
98 pipeReceiveMessageTimeout = timeoutMs;
99 initialiseWithPipe (newPipe.release());
108 thread->signalThreadShouldExit();
112 if (socket !=
nullptr) socket->close();
113 if (pipe !=
nullptr) pipe->close();
116 thread->stopThread (4000);
117 deletePipeAndSocket();
121 void InterprocessConnection::deletePipeAndSocket()
132 return ((socket !=
nullptr && socket->isConnected())
133 || (pipe !=
nullptr && pipe->isOpen()))
142 if (pipe ==
nullptr && socket ==
nullptr)
145 if (socket !=
nullptr && ! socket->isLocal())
146 return socket->getHostName();
159 messageData.
copyFrom (messageHeader, 0,
sizeof (messageHeader));
160 messageData.copyFrom (message.
getData(),
sizeof (messageHeader), message.
getSize());
162 return writeData (messageData.getData(), (int) messageData.getSize()) == (
int) messageData.getSize();
165 int InterprocessConnection::writeData (
void* data,
int dataSize)
169 if (socket !=
nullptr)
170 return socket->write (data, dataSize);
173 return pipe->write (data, dataSize, pipeReceiveMessageTimeout);
179 void InterprocessConnection::initialiseWithSocket (
StreamingSocket* newSocket)
181 jassert (socket ==
nullptr && pipe ==
nullptr);
182 socket.reset (newSocket);
184 threadIsRunning =
true;
186 thread->startThread();
189 void InterprocessConnection::initialiseWithPipe (
NamedPipe* newPipe)
191 jassert (socket ==
nullptr && pipe ==
nullptr);
192 pipe.reset (newPipe);
194 threadIsRunning =
true;
196 thread->startThread();
206 void messageCallback()
override 208 if (
auto* ipc = owner.get())
223 void InterprocessConnection::connectionMadeInt()
225 if (! callbackConnectionState)
227 callbackConnectionState =
true;
229 if (useMessageThread)
236 void InterprocessConnection::connectionLostInt()
238 if (callbackConnectionState)
240 callbackConnectionState =
false;
242 if (useMessageThread)
252 : owner (ipc), data (d)
255 void messageCallback()
override 257 if (
auto* ipc = owner.get())
265 void InterprocessConnection::deliverDataInt (
const MemoryBlock& data)
267 jassert (callbackConnectionState);
269 if (useMessageThread)
276 int InterprocessConnection::readData (
void* data,
int num)
278 if (socket !=
nullptr)
279 return socket->read (data, num,
true);
282 return pipe->read (data, num, pipeReceiveMessageTimeout);
288 bool InterprocessConnection::readNextMessage()
290 uint32 messageHeader[2];
291 auto bytes = readData (messageHeader,
sizeof (messageHeader));
293 if (bytes == (
int)
sizeof (messageHeader)
298 if (bytesInMessage > 0)
300 MemoryBlock messageData ((
size_t) bytesInMessage,
true);
303 while (bytesInMessage > 0)
305 if (thread->threadShouldExit())
308 auto numThisTime = jmin (bytesInMessage, 65536);
309 auto bytesIn = readData (addBytesToPointer (messageData.
getData(), bytesRead), numThisTime);
314 bytesRead += bytesIn;
315 bytesInMessage -= bytesIn;
319 deliverDataInt (messageData);
327 if (socket !=
nullptr)
328 deletePipeAndSocket();
336 void InterprocessConnection::runThread()
338 while (! thread->threadShouldExit())
340 if (socket !=
nullptr)
342 auto ready = socket->waitUntilReady (
true, 100);
346 deletePipeAndSocket();
357 else if (pipe !=
nullptr)
359 if (! pipe->isOpen())
361 deletePipeAndSocket();
371 if (thread->threadShouldExit() || ! readNextMessage())
375 threadIsRunning =
false;
String toString() const
Returns a dot- or colon-separated string in the form "1.2.3.4" (IPv4) or "1:2:3:4:5:6:7:8" (IPv6)...
size_t getSize() const noexcept
Returns the block's current allocated size, in bytes.
static Type swapIfBigEndian(Type value) noexcept
Swaps the byte order of a signed or unsigned integer if the CPU is big-endian.
bool connectToSocket(const String &hostName, int portNumber, int timeOutMillisecs)
Tries to connect this object to a socket.
Thread(const String &threadName, size_t threadStackSize=0)
Creates a thread.
virtual ~InterprocessConnection()
Destructor.
void copyFrom(const void *srcData, int destinationOffset, size_t numBytes) noexcept
Copies data into this MemoryBlock from a memory address.
virtual void messageReceived(const MemoryBlock &message)=0
Called when a message arrives.
A cross-process pipe that can have data written to and read from it.
A wrapper for a streaming (TCP) socket.
void * getData() noexcept
Returns a void pointer to the data.
InterprocessConnection(bool callbacksOnMessageThread=true, uint32 magicMessageHeaderNumber=0xf2b49e2c)
Creates a connection.
bool createPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs, bool mustNotExist=false)
Tries to create a new pipe for other processes to connect to.
virtual void connectionLost()=0
Called when the connection is broken.
bool connectToPipe(const String &pipeName, int pipeReceiveMessageTimeoutMs)
Tries to connect the object to an existing named pipe.
bool sendMessage(const MemoryBlock &message)
Tries to send a message to the other end of this connection.
virtual void connectionMade()=0
Called when the connection is first connected.
bool isConnected() const
True if a socket or pipe is currently active.
This class acts as a pointer which will automatically become null if the object to which it points is...
void disconnect()
Disconnects and closes any currently-open sockets or pipes.
static IPAddress local(bool IPv6=false) noexcept
Returns an IPv4 or IPv6 address meaning "localhost", equivalent to 127.0.0.1 (IPv4) or ::1 (IPv6) ...
String getConnectedHostName() const
Returns the name of the machine at the other end of this connection.
A class to hold a resizable block of raw data.
Automatically locks and unlocks a mutex object.
The base class for objects that can be sent to a MessageListener.
Manages a simple two-way messaging connection to another process, using either a socket or a named pi...
void run() override
Must be implemented to perform the thread's actual code.
Internal class used as the base class for all message objects.