主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

以交互方式并行导入和处理数据

自 R2023b 起

此示例显示如何在交互式并行池中同时导入和处理数据。为了简化您的工作流,您可以在同一个并行池上重叠parfevalparfor计算。

在此示例中,您使用 parfeval 在后台导入音频信号,并同时使用 parfor 执行一些基本的信号处理。本示例中描述的数据导入和处理方法具有普遍适用性,并且不依赖于导入和处理函数的具体细节。

该图提供了计算顺序的摘要。

设置

为了仿真从数据库导入音频,该示例使用支持文件中定义的 acquireAudio 函数生成音频数据。acquireAudio 函数还将音频划分成几帧,以便于并行处理音频。开始之前,请指定要导入的音频文件的数量,以及音频的持续时间和采样率。将音频划分 30 帧。

numAudio = 3;
audioDuration = 300;
sampleRate = 44100;
numFrames = 30;

启动一个有六个进程工作单元的并行池。

pool = parpool("Processes",6);
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 6 workers.

该示例将处理后的数据存储在并行池 ValueStore 对象中。为了提供计算进度的直观表示,请使用 waitbar 函数创建一个简单的用户界面。设置一个名为 handleValueStoreEntry 的更新函数,每次向 ValueStore 对象添加条目时运行该函数。示例末尾定义的 handleValueStoreEntry 函数使用 persistent 来存储有关 parfor 迭代的信息。要初始化持久变量,请运行 handleValueStoreEntry 函数。

analysisWaitBar = waitbar(0,"Waiting for data...",Name="Analyzing Audio Data");

store = pool.ValueStore;
store.KeyUpdatedFcn = @(store,key) handleValueStoreEntry(store,key);

handleValueStoreEntry(numAudio,numFrames,analysisWaitBar);
Wait bar counter reset to 0.

获取和处理数据

迭代获取并自动处理音频数据。要导入数据,请安排 acquireAudio 函数与 parfeval 异步运行。要并行处理数据,请在 parfor 循环中使用 processAudio 函数。processAudio 函数在此示例的支持文件中定义。

为了最大限度地减少工作单元接收数据的等待时间,请错开计算时间。在开始 parfeval 循环之前,提交 for 计算以获取第一个音频。

importFuture = parfeval(@acquireAudio,1,audioDuration,numFrames,sampleRate);
for idx = 1:numAudio
    % Retrieve audio data from the parfeval computation.
    audio = fetchOutputs(importFuture);
    % Schedule the next parfeval computation to run in the background.
    if idx < numAudio
        importFuture = parfeval(@acquireAudio,1,audioDuration,numFrames,sampleRate);
    end
    % Process the audio data.
    parfor frame = 1:numFrames
        store = getCurrentValueStore;
        key = strcat("Audio_",num2str(idx),"_Frame_",num2str(frame));
        % Perform some signal processing.
        inputSignal = audio{1,frame};
        store(key) = processAudio(inputSignal,sampleRate);
    end
end

检索已处理的数据

您可以从 ValueStore 对象中检索已处理过的音频以进行进一步的计算。例如,使用示例末尾定义的 plotFrame 函数检索并绘制每个音频的第一帧。

plotData = [store("Audio_1_Frame_1"),store("Audio_2_Frame_1"), ...
    store("Audio_3_Frame_1")];
t = (0:length(plotData)-1)/sampleRate;
plotFrames(t,plotData);

清理

使用后删除等待栏和等待池。

delete(analysisWaitBar);
delete(pool);
Parallel pool using the 'Processes' profile is shutting down.

辅助函数

当将条目添加到池 ValueStore 时,使用持久变量执行计数来更新等待栏。

function handleValueStoreEntry(numAudio,numFrames,analysisWaitBar)
persistent count currentAudio nAudio nFrames bar
if nargin == 3
    % Initialize counting variables.
    count = 0;
    currentAudio = 1;
    nAudio = numAudio;
    nFrames = numFrames;
    bar = analysisWaitBar;
    fprintf("Wait bar counter reset to 0.")
else
    count = count + 1;
    progress = count/nFrames;
    waitbar(progress,bar, ...
        sprintf("Processing audio %d of %d",currentAudio,nAudio))

    if currentAudio==nAudio && progress==1
        waitbar(progress,bar,"Audio import and processing complete")
    end

    if  count == nFrames
        currentAudio = currentAudio + 1;
        count = 0;
    end
end
end

定义一个函数来绘制每个音频文件的帧。

function plotFrames(t,plotData)
fig = figure(Name="First Frame of Each Audio");
tl = tiledlayout(fig,3,1);
for i=1:3
    nexttile(tl);
    plot(t,plotData(:,i));
    title(sprintf("Audio %d",i))
end
title(tl,"First Frame of Each Audio")
xlabel(tl,"Time (s)");
ylabel(tl,"Amplitude");
end

另请参阅

|

主题