主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

使用可轮询数据队列在工作单元之间传输数据

自 R2025a 起

此示例展示了如何在使用 parfeval 进行异步函数评估时,使用可轮询数据队列在工作单元之间传输数据。

您可以使用 PollableDataQueue 对象在交互式并行池中的客户端和工作单元之间传输数据和消息。默认情况下,PollableDataQueue 对象只将数据发送到创建 PollableDataQueue 对象的客户端或工作单元。但是,从 R2025a 开始,您还可以创建一种 PollableDataQueue 对象类型,允许客户端或池中的任何工作单元轮询和接收数据。

此示例演示了如何使用 PollableDataQueue 对象设置工作单元,以便它们相互发送和接收数据。您还可以使用 PollableDataQueue 对象在来平滑地停止在工作单元上运行的 parfeval 计算。您可以将此方法应用于任何在异步 parfeval 计算过程中需要工作单元之间进行通信的应用程序。要查看一个示例,展示如何在数据采集和处理工作流中使用此方法,请参阅 对池工作单元执行数据采集和处理

启动一个至少有三个线程工作单元的并行池。

pool = parpool("Threads");
Starting parallel pool (parpool) using the 'Threads' profile ...
Connected to parallel pool with 6 workers.

要启用工作单元之间的通信,请创建一个 PollableDataQueue 对象,并将 Destination 参量设置为 "any"。这种类型的 PollableDataQueue 对象允许任何工作单元发送和接收消息。

queue = parallel.pool.PollableDataQueue(Destination="any");

为第一个工作单元定义一个函数。firstWorkerFcn 函数处理输入数据的每个元素,将其翻倍,然后通过队列将结果发送给下一个工作单元。处理完所有数据后,该函数发送一个 "stop" 信号。如果有多个工作单元从队列中接收数据,请使用 close 函数关闭队列,而不是发送多个 "stop" 信号。

function firstWorkerFcn(workerQueue,inData)
for idx = 1:numel(inData)
    initialResult = inData(idx)*2;
    send(workerQueue,initialResult);
end
send(workerQueue,"stop");
end

为第二个工作单元定义一个函数。secondWorkerFcn 函数不断轮询队列中的数据,处理每个接收到的值,并存储结果。该函数在接收到 "stop" 信号后停止处理。

function finalResults = secondWorkerFcn(workerQueue)
count = 0;
while true
    data = poll(workerQueue,Inf);
    if strcmp(data,"stop")
        break;
    end
    count = count+1;
    finalResults(count,:) = [data data+1];
end
end

创建输入数据,并使用 parfeval 以异步方式执行工作单元函数。第一个工作单元处理输入数据,第二个工作单元接收并进一步处理结果。

inData = 1:5;
futures(1) = parfeval(@firstWorkerFcn,0,queue,inData);
futures(2) = parfeval(@secondWorkerFcn,1,queue);

使用 wait 等待两个工作单元完成任务,然后使用 fetchOutputs 从第二个工作单元中检索最终结果。

wait(futures);
finalResults = fetchOutputs(futures(2));

可视化数据在工作单元中的流动情况。

plotWorkerOutput(inData,finalResults);

辅助函数

定义一个函数来可视化数据在工作单元中的移动。plotWorkerOutput 函数绘制输入数据和两个工作单元的结果,显示每个步骤的转换。

function plotWorkerOutput(inData,finalResults)
c = ["r","g","b","c","m"];
figure;
hold on;
for idx = 1:numel(inData)
    plot([1, 2, 3],[inData(idx),finalResults(idx,1),finalResults(idx,2)],"-o"+c(idx),MarkerFaceColor=c(idx),DisplayName="Idx "+num2str(idx));
end
hold off;
ylabel("Output");
xticklabels(["","Client","Worker 1","Worker 2"]);
title("Data Movement Through Workers");
legend
grid on;
xticks(0:4);
xlim([0 4]);
end

另请参阅

函数

对象

主题