主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

使用可轮询数据队列向工作单元发送消息

自 R2025a 起

此示例展示了如何在使用 parfeval 进行异步函数评估时,使用可轮询数据队列向工作单元发送数据或指令。

您可以使用 PollableDataQueue 对象在交互式并行池中的客户端和工作单元之间传输数据和消息。默认情况下,PollableDataQueue 对象只将数据发送到创建 PollableDataQueue 对象的客户端或工作单元。但是,从 R2025a 开始,您还可以创建一种 PollableDataQueue 对象类型,允许客户端或池中的任何工作单元轮询和接收数据。

此示例演示了如何使用 PollableDataQueue 对象准备工作单元,以接收您从客户端发送的数据或指令。您还可以使用 PollableDataQueue 对象在来平滑地停止在工作单元上运行的 parfeval 计算。您可以将此方法应用于任何在异步 parfeval 计算过程中需要向工作单元发送额外指令的应用程序。要查看使用 PollableDataQueue 对象控制硬件设备的示例,请参阅 并行控制硬件和采集数据

启动一个具有一个线程工作单元的并行池。

pool = parpool("Threads",1);
Starting parallel pool (parpool) using the 'Threads' profile ...
Connected to parallel pool with 1 workers.

要启用工作单元与客户端之间的通信,请创建一个 PollableDataQueue 对象,并将 Destination 参量设置为 "any"。这种类型的 PollableDataQueue 对象允许客户端和工作单元发送和接收消息。

为了简化客户端与工作单元之间的通信,创建两个 PollableDataQueue 对象。workerToClient 队列将消息从工作单元发送到客户端,而 clientToWorker 队列将消息从客户端发送到工作单元。

workerToClient = parallel.pool.PollableDataQueue(Destination="any");
clientToWorker = parallel.pool.PollableDataQueue(Destination="any");

定义在工作单元上运行的函数 processDataprocessData 函数等待来自客户端的数据,处理数据,并将状态更新发送回客户端。当收到 "stop" 消息时,该函数停止运行。

function out = processData(workerToClient,clientToWorker)
out = 0;
send(workerToClient,"Ready to receive data.");
while true
    % Wait for a message
    data = poll(clientToWorker,Inf);
    if strcmp(data,"stop")
        send(workerToClient,"Stopped processing data on worker.")
        return
    else
        response = sprintf("Data %d received.",data(1));
        send(workerToClient,response);
        out = out+data(2);
        pause(1);
    end
end
end

使用 parfeval 执行 processData 函数,并准备工作单元开始等待来自客户端的消息。parfeval 在工作单元上异步计算 processData 函数,不会阻止客户端。

future = parfeval(@processData,1,workerToClient,clientToWorker);

轮询 workerToClient 队列以接收来自工作单元的初始状态消息。

status = poll(workerToClient,inf)
status = 
"Ready to receive data."

在循环中,使用 clientToWorker 队列将数据发送到工作单元,并在发送下一个数据点之前轮询 workerToClient 队列以进行确认。

for idx = 1:5
    send(clientToWorker,[idx rand]);
    status = poll(workerToClient,inf)
end
status = 
"Data 1 received."
status = 
"Data 2 received."
status = 
"Data 3 received."
status = 
"Data 4 received."
status = 
"Data 5 received."

要停止 parfeval 计算并终止工作单元上的处理循环,请向 clientToWorker 队列发送 "stop" 消息。如果有多个工作单元从队列中接收数据,请使用 close 函数关闭队列,而不是发送多个 "stop" 信号。

send(clientToWorker,"stop");

对最终状态消息进行轮询,等待 parfeval 计算完成,然后使用 fetchOutputs 检索累积的结果。

status = poll(workerToClient,inf)
status = 
"Stopped processing data on worker."
wait(future)
out = fetchOutputs(future)
out = 
3.2311

另请参阅

函数

对象

主题