PollableDataQueues / competing while loops
I want to establish two-way communication between a client and a worker launched via parfeval, with each side running its own while loop. The client sends instruction updates to the worker, and the worker sends data packages and status messages back to the client.
The problem with this setup is the competing while loops: if one loop runs faster than the other, the data queue that the faster loop writes to gets excessively long. I would prefer to pull only the last item and flush the rest of the queue, but the best I can come up with at the moment is a while loop that sucks everything out of the queue. The extra polling adds overhead, especially if one of the loops is much slower than the other.
The example below demonstrates this: the client is slowed down by the plot operation. The code executes slower than it otherwise would because of the while-loop emptying the queue with its polling. It gets much worse if the pause command is uncommented. If you look at the number of messages in PolQ after execution you will still see a large number of leftover messages (~50 on my machine).
% Start parallel pool (reuse the current one if it exists)...
p = gcp('nocreate');
if isempty(p)
    p = parpool;
end
% Struct containing instructions (Kill / exit)
Message.Kill = false;
% Pollable queue for receiving data from worker
PolQ = parallel.pool.PollableDataQueue;
% Initialize worker
f = parfeval(@MyWorkerFunc, 0, Message, PolQ);
% Receive a queue, pollable by the worker, for sending messages to the worker:
SenQ = poll(PolQ, Inf);
% Initialize client main loop...
killCount = 1;
killThresh = 100;
while ~Message.Kill
    % Poll worker for data...
    if PolQ.QueueLength > 0
        % Empty the queue, keeping only the most recent item
        while PolQ.QueueLength > 0
            [data, OK] = poll(PolQ);
        end
        % Do something useful that takes time
        plot(data);
        drawnow;
        % Uncomment below to see an exaggerated effect...
        % pause(0.1);   % (illustrative duration)
    end
    if killCount > killThresh
        Message.Kill = true;
    end
    % Send "instructions" (not present in demo) to worker
    send(SenQ, Message);
    % Increment kill counter
    killCount = killCount + 1;
end

% Function executed on worker:
function MyWorkerFunc(Message, PolQ)
% Create instruction relay & send back to client:
SenQ = parallel.pool.PollableDataQueue;
send(PolQ, SenQ);
while ~Message.Kill
    % Make our "data"
    data = randn(1, 100);
    % Send to client
    send(PolQ, data);
    % Check for updates to Message (non-blocking):
    [tmp, OK] = poll(SenQ);
    if OK
        Message = tmp;
    end
end
end
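For reference, the "pull the last item and flush the rest" behaviour I am after can be wrapped in a small helper. This is only a minimal sketch: pollLatest is a hypothetical name of my own, not a Parallel Computing Toolbox function, and it still calls poll once per queued item, so it tidies the client loop rather than removing the polling overhead.

```matlab
function [latest, gotData] = pollLatest(q)
% pollLatest  Drain a PollableDataQueue and return only the newest item.
% Hypothetical helper (not part of Parallel Computing Toolbox).
%   latest  - most recently queued item, or [] if the queue was empty
%   gotData - true if at least one item was retrieved
latest = [];
gotData = false;
while true
    [item, ok] = poll(q);   % non-blocking; ok is false once the queue is empty
    if ~ok
        break
    end
    latest = item;          % overwrite, so older items are discarded
    gotData = true;
end
end
```

In the client loop this would replace the nested QueueLength checks with a single call such as [data, OK] = pollLatest(PolQ), followed by the plot only when OK is true.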
What is a more efficient way of doing this? Is there a way to access a specific queue entry and flush the queue after retrieving it? Alternatively, is there a more efficient way of signalling the worker that the client is ready to receive data, so that it isn't continuously broadcasting messages?
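To make the second idea concrete, here is a rough sketch of a request/reply handshake, where the worker keeps computing but only sends a package when the client asks for one. The names onDemandWorker, the 'data' and 'stop' request strings, the loop count of 20 and the pause duration are all assumptions made up for this illustration. I have not profiled it, so I cannot say whether it is actually faster.

```matlab
% Client: request one package at a time, only when ready to consume it.
PolQ = parallel.pool.PollableDataQueue;        % worker -> client
f = parfeval(@onDemandWorker, 0, PolQ);
SenQ = poll(PolQ, Inf);                        % first item is the worker's own queue

for k = 1:20
    send(SenQ, 'data');                        % ask for exactly one package
    data = poll(PolQ, Inf);                    % block until the reply arrives
    pause(0.05);                               % stand-in for slow client work (e.g. plotting)
end
send(SenQ, 'stop');                            % tell the worker to exit
wait(f);

% Worker: keeps producing data, but replies only to explicit requests.
function onDemandWorker(PolQ)
SenQ = parallel.pool.PollableDataQueue;        % client -> worker
send(PolQ, SenQ);
while true
    data = randn(1, 100);                      % newest "data"; older values are dropped
    [request, ok] = poll(SenQ);                % non-blocking check for a request
    if ~ok
        continue                               % nothing requested, keep working
    end
    if strcmp(request, 'stop')
        break
    end
    send(PolQ, data);                          % reply with the latest package only
end
end
```

With this arrangement neither queue should hold more than one pending item, at the cost of one extra round trip per package.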