接收有关工作进程的通信
此示例显示如何在工作进程上设置数据队列来接收数据。
您可以使用数据队列在客户端和工作进程之间传输数据或消息。
此示例生成有关工作进程的仪器数据并将数据发送回客户端。为了启动和停止信号生成,客户端可以使用数据队列向工作进程发送消息。这种方法提供了一种更平滑的方式来停止工作进程上的 parfeval
计算。
启动一个具有三个工作进程的并行池。
pool = parpool("Processes",3);
Starting parallel pool (parpool) using the 'Processes' profile ... Connected to parallel pool with 3 workers.
设置队列以在客户端接收数据
准备并初始化图表以可视化来自工作进程的仪器数据。createPlots
函数在示例的末尾定义。
[fig,p] = createPlots;
创建一个 DataQueue
,并使用 afterEach
来指定每次队列接收数据时执行的函数。receiveDataOnClient
函数绘制从工作进程收到的数据,并在示例的末尾定义。
clientQueue = parallel.pool.DataQueue; afterEach(clientQueue,@(data) receiveDataOnClient(p,data));
设置队列以接收有关工作人员的通信
在客户端上创建一个助手 PollableDataQueue
。
helperClientQueue = parallel.pool.PollableDataQueue;
使用 parfeval
在并行池中的三个工作进程上建立数据队列。connectToWorker
辅助函数为每个工作进程分配一个唯一的 ID,在每个工作进程上创建一个 PollableDataQueue
,并使用 helperClientQueue
队列将数据队列发送到客户端。然后,工作进程等待客户端的指令来开始数据生成。
wkrF(1:3) = parallel.FevalFuture; for ID = 1:3 wkrF(ID) = parfeval(@connectToWorker,0,clientQueue,helperClientQueue,ID); end
在客户端,接收标记的工作进程队列。您现在可以使用这些队列向每个工作进程发送数据。
allWkrQueues = struct('ID',{},'Queue',{}); for i = 1:3 wkrQueue = poll(helperClientQueue,inf); allWkrQueues(wkrQueue.ID) = wkrQueue; end
开始和停止数据生成
接下来,指示工作进程开始生成数据。
for ID = 1:3 send(allWkrQueues(ID).Queue,"Start generating data"); end
该图显示了每个工作进程生成并发送给客户端的仪器数据。
fig.Visible="on";
生成周期为 10 秒的数据。
pause(10)
要停止在工作进程 2 上收集数据,请使用在工作进程 2 上创建的队列向工作进程发送消息。您可以观察到仪器 2 的线停止在约 0.9 秒处。
send(allWkrQueues(2).Queue,"stop");
轮询 helperClientQueue
队列以接收来自工作进程 2 的确认。
[status, ~] = poll(helperClientQueue,inf); disp(status)
Data generation stopped on worker 2
等待其他工作进程完成计算。
wait(wkrF);
辅助函数
connectToWorker
函数在工作进程上创建 PollableDataQueue
,将其发送给客户端,然后轮询 wkrQueue
队列等待来自客户端的指令。
当工作进程收到来自客户端的消息时,该函数会在工作进程上生成一个虚拟信号,仿真来自仪器的连续数据。在每个时间步骤中,工作进程使用 clientQueue
队列向客户端发送一个信号点,然后轮询 wkrQueue
队列以检查队列是否有数据。如果有数据需要接收,则工作进程停止生成数据,并向客户端发送消息确认已停止生成数据。
function connectToWorker(clientQueue,helperClientQueue,ID) % Assign an ID to this worker. wkrQueue.ID = ID; % Create a PollableDataQueue on this specific worker. wkrQueue.Queue = parallel.pool.PollableDataQueue; % Send the queue to the client. send(helperClientQueue,wkrQueue); % Wait for instructions from client. [~, OK] = poll(wkrQueue.Queue,inf); if OK t = 0:0.01:4; step = 1; while step < numel(t) % Generate dummy instrument data. data_point = sin(ID*2*pi*t(step)); % Send data to client using a data queue. send(clientQueue,{ID,t(step),data_point}); % Check if worker queue has data to receive and use a timeout. [~, OK] = poll(wkrQueue.Queue,0.1); if OK send(helperClientQueue,sprintf("Data generation stopped on worker %d",ID)); return else step = step + 1; end end else return end end
定义一个函数来准备和初始化图表以可视化来自工作进程的数据。为每个工作进程指定不同的生产线属性。
function [fig,p] = createPlots fig = figure(Name="Signal from Instruments",Visible="off"); t = tiledlayout(fig,3,1); lineColor = ["k","b","g"]; p = gobjects(1,3); for i=1:3 nexttile(t); xlabel("Time (s)"); ylabel("Amplitude"); title(sprintf("Instrument %d",i)) p(i) = animatedline(NaN,NaN,Color=lineColor(i)); end end
定义一个函数,当工作进程向客户端发送数据时更新图表。
function receiveDataOnClient(p,data) addpoints(p(data{1,1}),data{1,2},data{1,3}) drawnow limitrate; end
另请参阅
parallel.pool.PollableDataQueue
| parallel.pool.DataQueue
| afterEach