主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

接收有关工作单元的通信

自 R2023b 起

此示例展示了如何在工作单元上设置数据队列以接收数据。

您可以使用 PollableDataQueue 在客户端和工作单元之间传输数据或消息。默认情况下,PollableDataQueue 对象只将数据发送到创建它的客户端或工作单元。要使工作单元能够使用此默认类型接收来自客户端的消息,您需要在工作单元上创建 PollableDataQueue 对象,然后将其发送给客户端。从 R2025a 开始,您可以创建另一种类型的 PollableDataQueue,允许客户端或池中的任何工作单元轮询它以接收数据。有关此类 PollableDataQueue 的示例,请参阅 使用可轮询数据队列向工作单元发送消息

此示例生成有关工作单元的仪器数据并将数据发送回客户端。为了启动和停止信号生成,客户端可以使用数据队列向工作单元发送消息。这种方法提供了一种更平滑的方式来停止工作单元上的 parfeval 计算。

启动一个具有三个工作单元的并行池。

pool = parpool("Processes",3);
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 3 workers.

设置队列以在客户端接收数据

准备并初始化图以可视化来自工作单元的仪器数据。createPlots 函数在示例的末尾定义。

[fig,p] = createPlots;

创建一个 DataQueue,并使用 afterEach 来指定每次队列接收数据时执行的函数。receiveDataOnClient 函数绘制从工作单元收到的数据,并在示例的末尾定义。

clientQueue = parallel.pool.DataQueue;
afterEach(clientQueue,@(data) receiveDataOnClient(p,data));

设置队列以接收有关工作单元的通信

在客户端上创建一个助手 PollableDataQueue

helperClientQueue = parallel.pool.PollableDataQueue;

使用 parfeval 在并行池中的三个工作单元上建立数据队列。connectToWorker 辅助函数为每个工作单元分配一个唯一的 ID,在每个工作单元上创建一个 PollableDataQueue,并使用 helperClientQueue 队列将数据队列发送到客户端。然后,工作单元等待客户端的指令来开始数据生成。

wkrF(1:3) = parallel.FevalFuture;
for ID = 1:3
    wkrF(ID) = parfeval(@connectToWorker,0,clientQueue,helperClientQueue,ID);
end

在客户端,接收标有标签的工作单元队列。您现在可以使用这些队列向每个工作单元发送数据。

allWkrQueues = struct('ID',{},'Queue',{});
for i = 1:3
    wkrQueue = poll(helperClientQueue,inf);
    allWkrQueues(wkrQueue.ID) = wkrQueue;
end

开始和停止数据生成

接下来,指示工作单元开始生成数据。

for ID = 1:3
    send(allWkrQueues(ID).Queue,"Start generating data");
end

该图显示了每个工作单元生成并发送给客户端的仪器数据。

fig.Visible="on";

生成周期为 10 秒的数据。

pause(10)

要停止在工作单元 2 上收集数据,请使用在工作单元 2 上创建的队列向工作单元发送消息。您可以观察到仪器 2 的线停止在约 0.9 秒处。

send(allWkrQueues(2).Queue,"stop");

轮询 helperClientQueue 队列以接收来自工作单元 2 的确认。

[status, ~] = poll(helperClientQueue,inf);
disp(status)
Data generation stopped on worker 2

等待其他工作单元完成计算。

wait(wkrF);

辅助函数

connectToWorker 函数在工作单元上创建 PollableDataQueue,将其发送给客户端,然后轮询 wkrQueue 队列等待来自客户端的指令。

当工作单元收到来自客户端的消息时,该函数会在工作单元上生成一个虚拟信号,仿真来自仪器的连续数据。在每个时间步骤中,工作单元使用 clientQueue 队列向客户端发送一个信号点,然后轮询 wkrQueue 队列以检查队列是否有数据。如果有数据需要接收,则工作单元停止生成数据,并向客户端发送消息确认已停止生成数据。

function connectToWorker(clientQueue,helperClientQueue,ID)
% Assign an ID to this worker.
wkrQueue.ID = ID;
% Create a PollableDataQueue on this specific worker.
wkrQueue.Queue = parallel.pool.PollableDataQueue;
% Send the queue to the client.
send(helperClientQueue,wkrQueue);

% Wait for instructions from client.
[~, OK] = poll(wkrQueue.Queue,inf);
if OK
    t = 0:0.01:4;
    step = 1;
    while step < numel(t)
        % Generate dummy instrument data.
        data_point = sin(ID*2*pi*t(step));
        % Send data to client using a data queue.
        send(clientQueue,{ID,t(step),data_point});
        % Check if worker queue has data to receive and use a timeout.
        [~, OK] = poll(wkrQueue.Queue,0.1);
        if OK
            send(helperClientQueue,sprintf("Data generation stopped on worker %d",ID));
            return
        else
            step = step + 1;
        end
    end
else
    return
end
end

定义一个函数来准备和初始化图以可视化来自工作单元的数据。为每个工作单元指定不同的生产线属性。

function [fig,p] = createPlots
fig = figure(Name="Signal from Instruments",Visible="off");
t = tiledlayout(fig,3,1);
lineColor = ["k","b","g"];
p = gobjects(1,3);
for i=1:3
    nexttile(t);
    xlabel("Time (s)");
    ylabel("Amplitude");
    title(sprintf("Instrument %d",i))
    p(i) = animatedline(NaN,NaN,Color=lineColor(i));
end
end

定义一个函数,当工作单元向客户端发送数据时更新图。

function receiveDataOnClient(p,data)
addpoints(p(data{1,1}),data{1,2},data{1,3})
drawnow limitrate;
end

另请参阅

| |

主题