使用可轮询数据队列向工作单元发送消息
此示例展示了如何在使用 parfeval
进行异步函数评估时,使用可轮询数据队列向工作单元发送数据或指令。
您可以使用 PollableDataQueue
对象在交互式并行池中的客户端和工作单元之间传输数据和消息。默认情况下,PollableDataQueue
对象只将数据发送到创建 PollableDataQueue
对象的客户端或工作单元。但是,从 R2025a 开始,您还可以创建一种 PollableDataQueue
对象类型,允许客户端或池中的任何工作单元轮询和接收数据。
此示例演示了如何使用 PollableDataQueue
对象准备工作单元,以接收您从客户端发送的数据或指令。您还可以使用 PollableDataQueue
对象在来平滑地停止在工作单元上运行的 parfeval
计算。您可以将此方法应用于任何在异步 parfeval
计算过程中需要向工作单元发送额外指令的应用程序。要查看使用 PollableDataQueue
对象控制硬件设备的示例,请参阅 并行控制硬件和采集数据。
启动一个具有一个线程工作单元的并行池。
pool = parpool("Threads",1);
Starting parallel pool (parpool) using the 'Threads' profile ... Connected to parallel pool with 1 workers.
要启用工作单元与客户端之间的通信,请创建一个 PollableDataQueue
对象,并将 Destination
参量设置为 "any"
。这种类型的 PollableDataQueue
对象允许客户端和工作单元发送和接收消息。
为了简化客户端与工作单元之间的通信,创建两个 PollableDataQueue
对象。workerToClient
队列将消息从工作单元发送到客户端,而 clientToWorker
队列将消息从客户端发送到工作单元。
workerToClient = parallel.pool.PollableDataQueue(Destination="any"); clientToWorker = parallel.pool.PollableDataQueue(Destination="any");
定义在工作单元上运行的函数 processData
。processData
函数等待来自客户端的数据,处理数据,并将状态更新发送回客户端。当收到 "stop"
消息时,该函数停止运行。
function out = processData(workerToClient,clientToWorker) out = 0; send(workerToClient,"Ready to receive data."); while true % Wait for a message data = poll(clientToWorker,Inf); if strcmp(data,"stop") send(workerToClient,"Stopped processing data on worker.") return else response = sprintf("Data %d received.",data(1)); send(workerToClient,response); out = out+data(2); pause(1); end end end
使用 parfeval
执行 processData
函数,并准备工作单元开始等待来自客户端的消息。parfeval
在工作单元上异步计算 processData
函数,不会阻止客户端。
future = parfeval(@processData,1,workerToClient,clientToWorker);
轮询 workerToClient
队列以接收来自工作单元的初始状态消息。
status = poll(workerToClient,inf)
status = "Ready to receive data."
在循环中,使用 clientToWorker
队列将数据发送到工作单元,并在发送下一个数据点之前轮询 workerToClient
队列以进行确认。
for idx = 1:5 send(clientToWorker,[idx rand]); status = poll(workerToClient,inf) end
status = "Data 1 received."
status = "Data 2 received."
status = "Data 3 received."
status = "Data 4 received."
status = "Data 5 received."
要停止 parfeval
计算并终止工作单元上的处理循环,请向 clientToWorker
队列发送 "stop"
消息。如果有多个工作单元从队列中接收数据,请使用 close
函数关闭队列,而不是发送多个 "stop"
信号。
send(clientToWorker,"stop");
对最终状态消息进行轮询,等待 parfeval
计算完成,然后使用 fetchOutputs
检索累积的结果。
status = poll(workerToClient,inf)
status = "Stopped processing data on worker."
wait(future) out = fetchOutputs(future)
out = 3.2311