主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

对池工作单元执行数据采集和处理

自 R2025a 起

此示例展示了如何在交互式并行池上实现并行数据采集和处理管道。管道使用 PollableDataQueue 对象来促进并行池中工作单元之间的数据传输。

从 R2025a 开始,您可以创建 PollableDataQueue 对象,客户端或任何工作单元都可以轮询这些对象以获取数据。使用此类型的 PollableDataQueue 对象在管道工作流或多个工作单元需要访问单个 PollableDataQueue 对象的应用程序中传输数据或消息。

此示例演示了如何并行化图像采集和处理管道。您可以将此方法应用于任何管道工作流以加快其执行速度。

并行数据处理管道

在此示例中,您捕获了运动中的仿真摆的流图像。随后,您对每个帧进行实时图像处理和分析,以确定摆锤的中心位置。在此设置中,并行池中的一个工作单元从仿真中获取图像数据,并将数据传递给 PollableDataQueue 对象。另外两个工作单元轮询该队列以接收和处理数据,然后将其发送到另一个 PollableDataQueue 对象。第四个工作单元轮询第二个队列以接收和分析数据,最后将其发送给一个带有回调函数的 DataQueue 对象,该函数将结果显示在客户端。

Flowchart illustrating parallel data acquisition and processing pipeline stages. The pipeline includes four stages: Stage 1 consists of one worker that captures the images of the pendulum in motion, then sends data to a pollable data queue. Stage 2 consists of two workers that poll pollable data queue 1 for an image, process the image, then send the processed image to pollable data queues 2. Stage 3 consists of one worker that analyses the image to determine the center of the pendulum, then sends the results to a data queue for display on the client.

设置并行环境和数据队列

启动一个带有四个工作单元的并行池。

pool = parpool("Processes",4);
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 4 workers.

准备并初始化绘图,以可视化来自工作单元的数据。prepareDisplay 函数在本示例的末尾定义。

[fig,p,himage] = prepareDisplay;

要启用工作单元之间的数据传输,请创建 PollableDataQueue 对象,并将 Destination 参量设置为 "any"。当您将 Destination 参量设置为 "any" 时,客户端或任何工作单元都可以轮询结果队列以获取数据。

创建两个 PollableDataQueue 对象,将 Destination 设置为 "any",用于管道的数据采集和处理阶段。

acquisitionToProcessingPdq = parallel.pool.PollableDataQueue(destination="any");
processingToAnalysisPdq = parallel.pool.PollableDataQueue(destination="any");

创建一个额外的 PollableDataQueue 对象,以便客户端向执行数据采集的工作单元发送消息。

stopSignalPdq = parallel.pool.PollableDataQueue(destination="any");

要在客户端上可视化处理后的图像和结果数据,请创建一个 DataQueue 对象,即 displayResultsDq。当工作单元向 displayResultsDq 对象发送数据时,使用 afterEach 函数运行 displayOnClient 函数。displayOnClient 函数在本示例的末尾定义。

displayResultsDq = parallel.pool.DataQueue; 
afterEach(displayResultsDq,@(results) displayOnClient(p,himage,results));

定义并行管道的函数

为了管理数据流并顺利停止管道中的计算,为每个阶段定义不同的函数:数据采集、数据处理和数据分析。每个函数都使用特定的队列来促进工作单元之间的数据传输和通信。

数据采集阶段

acquireData 函数中,一个工作单元捕获数据并将其发送到 acquisitionToProcessingPdq 队列。工作单元不断获取数据,直到从 stopSignalPdq 队列收到停止信号,然后关闭 acquisitionToProcessingPdq 队列。工作单元使用 generateFrames 函数以指定的速率仿真数据采集。generateFrames 函数作为支持文件附加到此示例。

function acquireData(stopSignalPdq,acquisitionToProcessingPdq)
while isempty(poll(stopSignalPdq))
    rawData = generateFrames(10);
    send(acquisitionToProcessingPdq,rawData);
end
close(acquisitionToProcessingPdq);
clear generateFrames
send(stopSignalPdq,"Data acquisition stopped")

end

数据处理阶段

processData 函数中,一个工作单元轮询 acquisitionToProcessingPdq 队列以获取新数据,处理这些数据,并将结果发送到 processingToAnalysisPdq 队列。循环一直持续到运行数据采集阶段的工作单元关闭 acquisitionToProcessingPdq 队列为止。当 acquisitionToProcessingPdq 队列关闭且队列中无可用数据时,poll 将状态指示符 OK 设置为 false。然后,工作单元停止等待数据,并关闭 processingToAnalysisPdq 队列。processFrames 函数作为支持文件附加到此示例。

function processData(acquisitionToProcessingPdq,processingToAnalysisPdq)
OK = true;
while OK
    [rawFrame,OK] = poll(acquisitionToProcessingPdq,Inf);
    if OK
    processedFrames = processFrames(rawFrame);
    send(processingToAnalysisPdq,processedFrames);
    end
end
close(processingToAnalysisPdq);
end

最终数据分析阶段

analyzeData 函数中,一个工作单元轮询 processingToAnalysisPdq 队列以获取已处理的数据,对其进行分析,并将结果发送到 displayResultsDq 队列。循环一直持续,直到运行前一个阶段的工作单元关闭 processingToAnalysisPdq 队列,并且工作单元清空队列为止。当 processingToAnalysisPdq 队列关闭且队列中无可用数据时,poll 将状态指示符 OK 设置为 false。然后,工作单元停止等待数据。findPendulumCenters 函数作为支持文件附加到此示例。

function allCentroids = analyzeData(processingToAnalysisPdq,displayResultsDq)
idx = 0;
OK = true;
while OK
    [processedFrames,OK] = poll(processingToAnalysisPdq,Inf);
    if OK
        idx = idx+1;
        results = findPendulumCenters(processedFrames);
        allCentroids(idx,:) = results.centroids;
        send(displayResultsDq,results);
    end
end
end

启动和停止数据采集与分析

要在每个工作单元上执行不同的函数,请使用 parfeval 函数。parfeval 允许您在不阻止 MATLAB® 的情况下异步运行任务。

指示工作单元开始获取数据。

captureF = parfeval(@acquireData,0,stopSignalPdq,acquisitionToProcessingPdq);

指示两个工作单元执行数据处理函数。

processFOne = parfeval(@processData,0,acquisitionToProcessingPdq,processingToAnalysisPdq);
processFTwo = parfeval(@processData,0,acquisitionToProcessingPdq,processingToAnalysisPdq);

指示最终工作单元执行数据分析并将结果发送给客户端。

analyzeF = parfeval(@analyzeData,1,processingToAnalysisPdq,displayResultsDq);

此图显示了输入帧、处理帧以及工作单元的分析结果。

fig.Visible="on";

parfeval 函数不会阻止 MATLAB,因此您可以在计算过程中继续工作。工作单元并行处理管道的不同阶段,并在结果可用后立即将中间结果发送给客户端。

在固定时间内收集并分析数据。

pause(10);

stopSignalPdq 队列发送消息。运行数据采集的工作单元会定期轮询此队列以获取消息。当工作单元收到消息时,工作单元停止数据采集并关闭 acquisitionToProcessingPdq 队列。

send(stopSignalPdq,"stop");

等待管道中的最后一个工作单元完成其 parfeval 计算。

wait(analyzeF);

确认数据采集工作单元已成功停止采集数据。

status = poll(stopSignalPdq)
status = 
"Data acquisition stopped"

使用 fetchOutputs 函数从 analyzeF 未来对象中检索结果。通过将一个圆与摆的中心点相连来计算摆长,并绘制结果。有关详细信息,请参阅附在此示例中的支持文件中的 calculateAndPlotLength 函数。

allCentroids = fetchOutputs(analyzeF);
calculateAndPlotLength(allCentroids);

支持函数

displayOnClient

displayOnClient 函数更新一个绘制检测到的摆质心并更新图像帧的图形。它将新的质心坐标添加到绘图中,并使用最新的输入帧、感兴趣区域 (ROI) 和处理后的帧刷新摆锤图像。

function displayOnClient(p,himage,results)
centroids = results.centroids;
p.XData = [p.XData centroids(1)];
p.YData = [p.YData centroids(2)];

himage(1).CData = results.inputFrame;
himage(2).CData = results.roi;
himage(3).CData = results.processedFrame;
drawnow limitrate nocallbacks;
end

prepareDisplay

prepareDisplay 函数用于创建一个带有平铺布局的图形窗口,用于显示与摆动跟踪相关的图像和图表。该函数用于创建一个用于跟踪摆锤中心的图表,并初始化图像占位符以显示图像处理的各个阶段。

function [fig,p,himage] = prepareDisplay
fig = figure(Name="Images from Camera",Visible="off");
tiledlayout(fig,3,3)
nexttile(4,[2 3])
p = plot(NaN,NaN,"m.");
axis ij;
axis equal;
xlabel("x");
ylabel("y");
title("Pendulum Centers");

himage = gobjects(1,3);
titleStr = ["Pendulum Simulation","Cropped Region","Segmented Pendulum"];
for n = 1:3
nexttile(n)
himage(n) = imshow(rand(480,640));
title(titleStr(n))
end
end

另请参阅

函数

对象

主题