对池工作单元执行数据采集和处理
此示例展示了如何在交互式并行池上实现并行数据采集和处理管道。管道使用 PollableDataQueue
对象来促进并行池中工作单元之间的数据传输。
从 R2025a 开始,您可以创建 PollableDataQueue
对象,客户端或任何工作单元都可以轮询这些对象以获取数据。使用此类型的 PollableDataQueue
对象在管道工作流或多个工作单元需要访问单个 PollableDataQueue
对象的应用程序中传输数据或消息。
此示例演示了如何并行化图像采集和处理管道。您可以将此方法应用于任何管道工作流以加快其执行速度。
并行数据处理管道
在此示例中,您捕获了运动中的仿真摆的流图像。随后,您对每个帧进行实时图像处理和分析,以确定摆锤的中心位置。在此设置中,并行池中的一个工作单元从仿真中获取图像数据,并将数据传递给 PollableDataQueue
对象。另外两个工作单元轮询该队列以接收和处理数据,然后将其发送到另一个 PollableDataQueue
对象。第四个工作单元轮询第二个队列以接收和分析数据,最后将其发送给一个带有回调函数的 DataQueue
对象,该函数将结果显示在客户端。
设置并行环境和数据队列
启动一个带有四个工作单元的并行池。
pool = parpool("Processes",4);
Starting parallel pool (parpool) using the 'Processes' profile ... Connected to parallel pool with 4 workers.
准备并初始化绘图,以可视化来自工作单元的数据。prepareDisplay
函数在本示例的末尾定义。
[fig,p,himage] = prepareDisplay;
要启用工作单元之间的数据传输,请创建 PollableDataQueue
对象,并将 Destination
参量设置为 "any"
。当您将 Destination
参量设置为 "any"
时,客户端或任何工作单元都可以轮询结果队列以获取数据。
创建两个 PollableDataQueue
对象,将 Destination
设置为 "any"
,用于管道的数据采集和处理阶段。
acquisitionToProcessingPdq = parallel.pool.PollableDataQueue(destination="any"); processingToAnalysisPdq = parallel.pool.PollableDataQueue(destination="any");
创建一个额外的 PollableDataQueue
对象,以便客户端向执行数据采集的工作单元发送消息。
stopSignalPdq = parallel.pool.PollableDataQueue(destination="any");
要在客户端上可视化处理后的图像和结果数据,请创建一个 DataQueue
对象,即 displayResultsDq
。当工作单元向 displayResultsDq
对象发送数据时,使用 afterEach
函数运行 displayOnClient
函数。displayOnClient
函数在本示例的末尾定义。
displayResultsDq = parallel.pool.DataQueue; afterEach(displayResultsDq,@(results) displayOnClient(p,himage,results));
定义并行管道的函数
为了管理数据流并顺利停止管道中的计算,为每个阶段定义不同的函数:数据采集、数据处理和数据分析。每个函数都使用特定的队列来促进工作单元之间的数据传输和通信。
数据采集阶段
在 acquireData
函数中,一个工作单元捕获数据并将其发送到 acquisitionToProcessingPdq
队列。工作单元不断获取数据,直到从 stopSignalPdq
队列收到停止信号,然后关闭 acquisitionToProcessingPdq
队列。工作单元使用 generateFrames
函数以指定的速率仿真数据采集。generateFrames
函数作为支持文件附加到此示例。
function acquireData(stopSignalPdq,acquisitionToProcessingPdq) while isempty(poll(stopSignalPdq)) rawData = generateFrames(10); send(acquisitionToProcessingPdq,rawData); end close(acquisitionToProcessingPdq); clear generateFrames send(stopSignalPdq,"Data acquisition stopped") end
数据处理阶段
在 processData
函数中,一个工作单元轮询 acquisitionToProcessingPdq
队列以获取新数据,处理这些数据,并将结果发送到 processingToAnalysisPdq
队列。循环一直持续到运行数据采集阶段的工作单元关闭 acquisitionToProcessingPdq
队列为止。当 acquisitionToProcessingPdq
队列关闭且队列中无可用数据时,poll
将状态指示符 OK
设置为 false
。然后,工作单元停止等待数据,并关闭 processingToAnalysisPdq
队列。processFrames
函数作为支持文件附加到此示例。
function processData(acquisitionToProcessingPdq,processingToAnalysisPdq) OK = true; while OK [rawFrame,OK] = poll(acquisitionToProcessingPdq,Inf); if OK processedFrames = processFrames(rawFrame); send(processingToAnalysisPdq,processedFrames); end end close(processingToAnalysisPdq); end
最终数据分析阶段
在 analyzeData
函数中,一个工作单元轮询 processingToAnalysisPdq
队列以获取已处理的数据,对其进行分析,并将结果发送到 displayResultsDq
队列。循环一直持续,直到运行前一个阶段的工作单元关闭 processingToAnalysisPdq
队列,并且工作单元清空队列为止。当 processingToAnalysisPdq
队列关闭且队列中无可用数据时,poll
将状态指示符 OK
设置为 false
。然后,工作单元停止等待数据。findPendulumCenters
函数作为支持文件附加到此示例。
function allCentroids = analyzeData(processingToAnalysisPdq,displayResultsDq) idx = 0; OK = true; while OK [processedFrames,OK] = poll(processingToAnalysisPdq,Inf); if OK idx = idx+1; results = findPendulumCenters(processedFrames); allCentroids(idx,:) = results.centroids; send(displayResultsDq,results); end end end
启动和停止数据采集与分析
要在每个工作单元上执行不同的函数,请使用 parfeval
函数。parfeval
允许您在不阻止 MATLAB® 的情况下异步运行任务。
指示工作单元开始获取数据。
captureF = parfeval(@acquireData,0,stopSignalPdq,acquisitionToProcessingPdq);
指示两个工作单元执行数据处理函数。
processFOne = parfeval(@processData,0,acquisitionToProcessingPdq,processingToAnalysisPdq); processFTwo = parfeval(@processData,0,acquisitionToProcessingPdq,processingToAnalysisPdq);
指示最终工作单元执行数据分析并将结果发送给客户端。
analyzeF = parfeval(@analyzeData,1,processingToAnalysisPdq,displayResultsDq);
此图显示了输入帧、处理帧以及工作单元的分析结果。
fig.Visible="on";
parfeval
函数不会阻止 MATLAB,因此您可以在计算过程中继续工作。工作单元并行处理管道的不同阶段,并在结果可用后立即将中间结果发送给客户端。
在固定时间内收集并分析数据。
pause(10);
向 stopSignalPdq
队列发送消息。运行数据采集的工作单元会定期轮询此队列以获取消息。当工作单元收到消息时,工作单元停止数据采集并关闭 acquisitionToProcessingPdq
队列。
send(stopSignalPdq,"stop");
等待管道中的最后一个工作单元完成其 parfeval
计算。
wait(analyzeF);
确认数据采集工作单元已成功停止采集数据。
status = poll(stopSignalPdq)
status = "Data acquisition stopped"
使用 fetchOutputs
函数从 analyzeF
未来对象中检索结果。通过将一个圆与摆的中心点相连来计算摆长,并绘制结果。有关详细信息,请参阅附在此示例中的支持文件中的 calculateAndPlotLength
函数。
allCentroids = fetchOutputs(analyzeF); calculateAndPlotLength(allCentroids);
支持函数
displayOnClient
displayOnClient
函数更新一个绘制检测到的摆质心并更新图像帧的图形。它将新的质心坐标添加到绘图中,并使用最新的输入帧、感兴趣区域 (ROI) 和处理后的帧刷新摆锤图像。
function displayOnClient(p,himage,results) centroids = results.centroids; p.XData = [p.XData centroids(1)]; p.YData = [p.YData centroids(2)]; himage(1).CData = results.inputFrame; himage(2).CData = results.roi; himage(3).CData = results.processedFrame; drawnow limitrate nocallbacks; end
prepareDisplay
prepareDisplay
函数用于创建一个带有平铺布局的图形窗口,用于显示与摆动跟踪相关的图像和图表。该函数用于创建一个用于跟踪摆锤中心的图表,并初始化图像占位符以显示图像处理的各个阶段。
function [fig,p,himage] = prepareDisplay fig = figure(Name="Images from Camera",Visible="off"); tiledlayout(fig,3,3) nexttile(4,[2 3]) p = plot(NaN,NaN,"m."); axis ij; axis equal; xlabel("x"); ylabel("y"); title("Pendulum Centers"); himage = gobjects(1,3); titleStr = ["Pendulum Simulation","Cropped Region","Segmented Pendulum"]; for n = 1:3 nexttile(n) himage(n) = imshow(rand(480,640)); title(titleStr(n)) end end
另请参阅
函数
对象
主题
- 使用可轮询数据队列在工作单元之间传输数据
- 使用可轮询数据队列向工作单元发送消息
- 并行控制硬件和采集数据
- Image Processing Toolbox 快速入门 (Image Processing Toolbox)