Send and Recieve data between client and workers

15 次查看(过去 30 天)
Hello everybody,
I am trying to update text fields of a GUI using multiple workers with parfevalOnAll and parallel.pool.DataQueue. I cannot use labRecieve or labSend because spmd wouldn't work for my application. To recieve the calculated data from the workers and update the text field I am using this code on the client:
parpool(4)
pool = gcp();
fig = uifigure();
grid = uigridlayout(fig, [2, 2]);
btn = uibutton(grid,'Text','SendtoWorker');
textField = uitextarea(uipanel(grid));
Q1 = parallel.pool.DataQueue();
listener1 = afterEach(Q1, @(data) UpdateTextField(textField, data));
fut1 = parfevalOnAll(@worker1, 0, Q1);
btn.ButtonPushedFcn = @(~,~) SendDataToworker(); %Send data to worker on Q2 when button is pushed
function UpdateTextField(textField, data)
str = num2str(data);
textField.Value = str;
end
function SendDataToworker() %Send data1 to worker on Q2
Q2 = parallel.pool.DataQueue();
data1 = 0;
send(Q2,data1)
end
On the workers I have defined the dataQueue Q2 and a listener2 to recieve the variables from client as such:
function worker1()
global acquiredData;
acquiredData = 1; % initial value for the data
Q2 = parallel.pool.DataQueue(); % Queue to recieve data from client
listener2 = afterEach(Q2, @(data) calculateData(data));
function calculateData(data)
acquiredData = data;
send(Q1,acquiredData + 1) %sending calculated value back to client
end
end
Although the Help Section mentions that parallel.pool.DataQueue should work in reverse direction I am not seeing the results on the GUI. I also don't get any errors during execution. Thank you for your help.
  5 个评论
Yusuf Salikoglu
Yusuf Salikoglu 2021-4-9
It does need to use queues because although the code here looks quite simplified I will actually be controlling multiple laboratory power supplies and querying their voltage, current, power values and updating them on the gui whilst sending setParameter() functions from client to worker to be able to control the voltage, kinda like a feedback loop if you will. So the worker actually has to constantly be listening to the queue from client.
Mohammad Sami
Mohammad Sami 2021-4-10
Ok after some experimentation this seems to work. If the queue are created in the main thread they can only be used to receive data from the worker but cannot be used to send data. Hence we need to create queues in the main thread and the worker and exchange them to have full bi directional comms.
I have also replaced the nested function to a while loop with an exit condition. You can change the exit condition to suit your purpose. Or perhaps you can send an exit command on q2 e.t.c.
For the main thread
q1 = parallel.pool.DataQueue; % this is to retrieve the data
afterEach(q1,@disp);
% this is used to retrive the pollable data q from the worker
q11 = parallel.pool.PollableDataQueue;
p = gcp;
parfeval(@worker1,0,q1,q11);
q2 = poll(q11,10); % retrieve the pollable data queue from the worker
for i = 1:10
send(q2,i); % use the retrieved q to send data / comms to worker
end
For the worker
function worker1(q1,q11)
nodatacounter = 0;
% create the q and send it back to main thread so that we can use it to rcv
% data from the main thread
q2 = parallel.pool.PollableDataQueue;
send(q11,q2);
while nodatacounter < 10 % kill the worker if no comms for 10 iterations
[data,datarcvd] = poll(q2,10); % 10 second timeout
if datarcvd
send(q1,data);
nodatacounter = 0;
else
nodatacounter = nodatacounter + 1;
end
end
end

请先登录,再进行评论。

采纳的回答

Mohammad Sami
Mohammad Sami 2021-4-11
Moved to answer.
Ok after some experimentation this seems to work. If the queue are created in the main thread they can only be used to receive data from the worker but cannot be used to send data. Hence we need to create queues in the main thread and the worker and exchange them to have full bi directional comms.
I have also replaced the nested function to a while loop with an exit condition. You can change the exit condition to suit your purpose. Or perhaps you can send an exit command on q2 e.t.c.
For the main thread
q1 = parallel.pool.DataQueue; % this is to retrieve the data
afterEach(q1,@disp);
% this is used to retrive the pollable data q from the worker
q11 = parallel.pool.PollableDataQueue;
p = gcp;
parfeval(@worker1,0,q1,q11);
q2 = poll(q11,10); % retrieve the pollable data queue from the worker
for i = 1:10
send(q2,i); % use the retrieved q to send data / comms to worker
end
For the worker
function worker1(q1,q11)
nodatacounter = 0;
% create the q and send it back to main thread so that we can use it to rcv
% data from the main thread
q2 = parallel.pool.PollableDataQueue;
send(q11,q2);
while nodatacounter < 10 % kill the worker if no comms for 10 iterations
[data,datarcvd] = poll(q2,10); % 10 second timeout
if datarcvd
send(q1,data);
nodatacounter = 0;
else
nodatacounter = nodatacounter + 1;
end
end
end

更多回答(0 个)

产品


版本

R2019b

Community Treasure Hunt

Find the treasures in MATLAB Central and discover how the community can help you!

Start Hunting!

Translated by