使用可轮询数据队列在工作单元之间传输数据
此示例展示了如何在使用 parfeval
进行异步函数评估时,使用可轮询数据队列在工作单元之间传输数据。
您可以使用 PollableDataQueue
对象在交互式并行池中的客户端和工作单元之间传输数据和消息。默认情况下,PollableDataQueue
对象只将数据发送到创建 PollableDataQueue
对象的客户端或工作单元。但是,从 R2025a 开始,您还可以创建一种 PollableDataQueue
对象类型,允许客户端或池中的任何工作单元轮询和接收数据。
此示例演示了如何使用 PollableDataQueue
对象设置工作单元,以便它们相互发送和接收数据。您还可以使用 PollableDataQueue
对象在来平滑地停止在工作单元上运行的 parfeval
计算。您可以将此方法应用于任何在异步 parfeval
计算过程中需要工作单元之间进行通信的应用程序。要查看一个示例,展示如何在数据采集和处理工作流中使用此方法,请参阅 对池工作单元执行数据采集和处理。
启动一个至少有三个线程工作单元的并行池。
pool = parpool("Threads");
Starting parallel pool (parpool) using the 'Threads' profile ... Connected to parallel pool with 6 workers.
要启用工作单元之间的通信,请创建一个 PollableDataQueue
对象,并将 Destination
参量设置为 "any"
。这种类型的 PollableDataQueue
对象允许任何工作单元发送和接收消息。
queue = parallel.pool.PollableDataQueue(Destination="any");
为第一个工作单元定义一个函数。firstWorkerFcn
函数处理输入数据的每个元素,将其翻倍,然后通过队列将结果发送给下一个工作单元。处理完所有数据后,该函数发送一个 "stop"
信号。如果有多个工作单元从队列中接收数据,请使用 close
函数关闭队列,而不是发送多个 "stop"
信号。
function firstWorkerFcn(workerQueue,inData) for idx = 1:numel(inData) initialResult = inData(idx)*2; send(workerQueue,initialResult); end send(workerQueue,"stop"); end
为第二个工作单元定义一个函数。secondWorkerFcn
函数不断轮询队列中的数据,处理每个接收到的值,并存储结果。该函数在接收到 "stop"
信号后停止处理。
function finalResults = secondWorkerFcn(workerQueue) count = 0; while true data = poll(workerQueue,Inf); if strcmp(data,"stop") break; end count = count+1; finalResults(count,:) = [data data+1]; end end
创建输入数据,并使用 parfeval
以异步方式执行工作单元函数。第一个工作单元处理输入数据,第二个工作单元接收并进一步处理结果。
inData = 1:5; futures(1) = parfeval(@firstWorkerFcn,0,queue,inData); futures(2) = parfeval(@secondWorkerFcn,1,queue);
使用 wait
等待两个工作单元完成任务,然后使用 fetchOutputs
从第二个工作单元中检索最终结果。
wait(futures); finalResults = fetchOutputs(futures(2));
可视化数据在工作单元中的流动情况。
plotWorkerOutput(inData,finalResults);
辅助函数
定义一个函数来可视化数据在工作单元中的移动。plotWorkerOutput
函数绘制输入数据和两个工作单元的结果,显示每个步骤的转换。
function plotWorkerOutput(inData,finalResults) c = ["r","g","b","c","m"]; figure; hold on; for idx = 1:numel(inData) plot([1, 2, 3],[inData(idx),finalResults(idx,1),finalResults(idx,2)],"-o"+c(idx),MarkerFaceColor=c(idx),DisplayName="Idx "+num2str(idx)); end hold off; ylabel("Output"); xticklabels(["","Client","Worker 1","Worker 2"]); title("Data Movement Through Workers"); legend grid on; xticks(0:4); xlim([0 4]); end