Main Content

本页采用了机器翻译。点击此处可查看英文原文。

接收有关工作进程的通信

自 R2023b 起

此示例显示如何在工作进程上设置数据队列来接收数据。

您可以使用数据队列在客户端和工作进程之间传输数据或消息。

此示例生成有关工作进程的仪器数据并将数据发送回客户端。为了启动和停止信号生成,客户端可以使用数据队列向工作进程发送消息。这种方法提供了一种更平滑的方式来停止工作进程上的 parfeval 计算。

启动一个具有三个工作进程的并行池。

pool = parpool("Processes",3);
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 3 workers.

设置队列以在客户端接收数据

准备并初始化图表以可视化来自工作进程的仪器数据。createPlots 函数在示例的末尾定义。

[fig,p] = createPlots;

创建一个 DataQueue,并使用 afterEach 来指定每次队列接收数据时执行的函数。receiveDataOnClient 函数绘制从工作进程收到的数据,并在示例的末尾定义。

clientQueue = parallel.pool.DataQueue;
afterEach(clientQueue,@(data) receiveDataOnClient(p,data));

设置队列以接收有关工作人员的通信

在客户端上创建一个助手 PollableDataQueue

helperClientQueue = parallel.pool.PollableDataQueue;

使用 parfeval 在并行池中的三个工作进程上建立数据队列。connectToWorker 辅助函数为每个工作进程分配一个唯一的 ID,在每个工作进程上创建一个 PollableDataQueue,并使用 helperClientQueue 队列将数据队列发送到客户端。然后,工作进程等待客户端的指令来开始数据生成。

wkrF(1:3) = parallel.FevalFuture;
for ID = 1:3
    wkrF(ID) = parfeval(@connectToWorker,0,clientQueue,helperClientQueue,ID);
end

在客户端,接收标记的工作进程队列。您现在可以使用这些队列向每个工作进程发送数据。

allWkrQueues = struct('ID',{},'Queue',{});
for i = 1:3
    wkrQueue = poll(helperClientQueue,inf);
    allWkrQueues(wkrQueue.ID) = wkrQueue;
end

开始和停止数据生成

接下来,指示工作进程开始生成数据。

for ID = 1:3
    send(allWkrQueues(ID).Queue,"Start generating data");
end

该图显示了每个工作进程生成并发送给客户端的仪器数据。

fig.Visible="on";

生成周期为 10 秒的数据。

pause(10)

要停止在工作进程 2 上收集数据,请使用在工作进程 2 上创建的队列向工作进程发送消息。您可以观察到仪器 2 的线停止在约 0.9 秒处。

send(allWkrQueues(2).Queue,"stop");

轮询 helperClientQueue 队列以接收来自工作进程 2 的确认。

[status, ~] = poll(helperClientQueue,inf);
disp(status)
Data generation stopped on worker 2

等待其他工作进程完成计算。

wait(wkrF);

辅助函数

connectToWorker 函数在工作进程上创建 PollableDataQueue,将其发送给客户端,然后轮询 wkrQueue 队列等待来自客户端的指令。

当工作进程收到来自客户端的消息时,该函数会在工作进程上生成一个虚拟信号,仿真来自仪器的连续数据。在每个时间步骤中,工作进程使用 clientQueue 队列向客户端发送一个信号点,然后轮询 wkrQueue 队列以检查队列是否有数据。如果有数据需要接收,则工作进程停止生成数据,并向客户端发送消息确认已停止生成数据。

function connectToWorker(clientQueue,helperClientQueue,ID)
% Assign an ID to this worker.
wkrQueue.ID = ID;
% Create a PollableDataQueue on this specific worker.
wkrQueue.Queue = parallel.pool.PollableDataQueue;
% Send the queue to the client.
send(helperClientQueue,wkrQueue);

% Wait for instructions from client.
[~, OK] = poll(wkrQueue.Queue,inf);
if OK
    t = 0:0.01:4;
    step = 1;
    while step < numel(t)
        % Generate dummy instrument data.
        data_point = sin(ID*2*pi*t(step));
        % Send data to client using a data queue.
        send(clientQueue,{ID,t(step),data_point});
        % Check if worker queue has data to receive and use a timeout.
        [~, OK] = poll(wkrQueue.Queue,0.1);
        if OK
            send(helperClientQueue,sprintf("Data generation stopped on worker %d",ID));
            return
        else
            step = step + 1;
        end
    end
else
    return
end
end

定义一个函数来准备和初始化图表以可视化来自工作进程的数据。为每个工作进程指定不同的生产线属性。

function [fig,p] = createPlots
fig = figure(Name="Signal from Instruments",Visible="off");
t = tiledlayout(fig,3,1);
lineColor = ["k","b","g"];
p = gobjects(1,3);
for i=1:3
    nexttile(t);
    xlabel("Time (s)");
    ylabel("Amplitude");
    title(sprintf("Instrument %d",i))
    p(i) = animatedline(NaN,NaN,Color=lineColor(i));
end
end

定义一个函数,当工作进程向客户端发送数据时更新图表。

function receiveDataOnClient(p,data)
addpoints(p(data{1,1}),data{1,2},data{1,3})
drawnow limitrate;
end

另请参阅

| |

相关主题