PollableDataQueues / competing while loops

8 次查看(过去 30 天)
I want to establish two-way communications between a client and worker called via parfeval within parallel while loops. The client sends instruction updates to the worker and the worker needs to send data packages and status messages to the client.
The problem with this setup is the competing while loops: if one of the while-loops is faster, the data queue being written to by the faster while-loop gets excessively long. I would prefer if the last item could be pulled and the queue flushed, but the best I can come up with at the moment is to implement a while loop that sucks everything out of the queue. The extra polling adds overhead, especially if one of the queues is much slower than the other.
The example below demonstrates this: the client is slowed down by the plot operation. The code executes slower than it otherwise would because of the while-loop emptying the queue with its polling. It gets much worse if the pause command is uncommented. If you look at the number of messages in PolQ after execution you will still see a large number of leftover messages (~50 on my machine).
% Start parallel pool...
if isempty(gcp('nocreate'))
p = parpool;
else
p = gcp;
end
% Struct containing instructions (Kill / exit)
Message.Kill = false;
% Pollable queue for receiving data from worker
PolQ = parallel.pool.PollableDataQueue;
% Initialize worker
f = parfeval(@MyWorkerFunc, 0, Message, PolQ);
% Receive a queue, pollable by the worker, for sending messages to the worker:
SenQ = poll(PolQ, Inf);
% Initialize client main loop...
killCount = 1;
killThresh = 100;
while true
% Poll worker for data...
if PolQ.QueueLength > 0
while PolQ.QueueLength > 0
[data, OK] = poll(PolQ);
end
% Do something useful that takes time
plot(data);
drawnow;
% Uncomment below to see an exaggerated effect...
% pause(0.05);
end
% Kill?
if killCount > killThresh
Message.Kill = true;
send(SenQ, Message);
break
end
% Send "instructions" (not present in demo) to worker
send(SenQ, Message);
% Increment kill counter
killCount = killCount + 1;
disp(killCount);
end
disp(PolQ)
% Function executed on worker:
function MyWorkerFunc(Message, PolQ)
% Create instruction relay & send back to client:
SenQ = parallel.pool.PollableDataQueue;
send(PolQ, SenQ);
while ~Message.Kill
% Make our "data"
data = randn(1, 100);
% Send to client
send(PolQ, data);
% Check for updates to Message:
tmp = poll(SenQ);
if ~isempty(tmp)
Message = tmp;
end
end
end
What is a more efficient way of doing this? Is there a way to access a specific queue entry and flush the queue after retreiving? Alternatively, is there a more efficient way of signalling the worker that the client is ready to receive data, so that it isn't continuously broadcasting messages?
  2 个评论
Mario Malic
Mario Malic 2023-6-5
I don't know a lot about parallel computing, but maybe this could be more appropriate parallel.pool.DataQueue + afterEach
However, having pause within the block while still having data to process is opposite of what you want to do, that pause should be taken out of the if statement block.
% if PolQ.QueueLength > 0
% code
% end
Also, persistent variables could help, this is the amount of time it takes to construct a PollableDataQueue, and you do that every time you use send function if I understood this code correctly.
tic;
SenQ = parallel.pool.PollableDataQueue;
toc
Elapsed time is 0.105898 seconds.
persistent SenQ
if isempty(SenQ)
SenQ = parallel.pool.PollableDataQueue;
end
Tim
Tim 2023-6-5
编辑:Tim 2023-6-5
Thank you for the repy Mario. You are absolutely right about the "pause" line, but it was only there to simulate some unavoidable process which may take non-trivial time, (e.g. replace "pause" with some necessary but moderately slow operation, the pause just shows how it disproportionally slows the loop down).
Regarding SenQ - speaking as a newbie to parallel processing, I don't think it needs to be made persistent because the function MyFunc is only called once, and SenQ is initialized in that function; SenQ is never getting re-created, so the overhead associated with creating that queue is only incurred once.
The solution I think I'm going to settle on, though I am still somewhat unhappy with it, is to find a way to guarantee that the sending on both loops is performed less frequently than the polling on both loops; this way the polling never gets stacked too high; this could be done with a counter. I still don't like this solution; one of the things I don't understand, for example, is why the length of a pollable data queue from the sending side is 0 even if the message hasn't been picked up. If that value actually reflected the fact that the queue hadn't been read yet that would be more useful (i.e. if the sender could check if the queue had been read yet, it could wait to push something onto the queue until the message on there had been read).

请先登录,再进行评论。

回答(0 个)

类别

Help CenterFile Exchange 中查找有关 Asynchronous Parallel Programming 的更多信息

产品


版本

R2020a

Community Treasure Hunt

Find the treasures in MATLAB Central and discover how the community can help you!

Start Hunting!

Translated by