以交互方式并行导入和处理数据
此示例显示如何在交互式并行池中同时导入和处理数据。为了简化您的工作流,您可以在同一个并行池上重叠parfeval
和parfor
计算。
在此示例中,您使用 parfeval
在后台导入音频信号,并同时使用 parfor
执行一些基本的信号处理。本示例中描述的数据导入和处理方法具有普遍适用性,并且不依赖于导入和处理函数的具体细节。
该图提供了计算顺序的摘要。
设置
为了仿真从数据库导入音频,该示例使用支持文件中定义的 acquireAudio
函数生成音频数据。acquireAudio
函数还将音频划分成几帧,以便于并行处理音频。开始之前,请指定要导入的音频文件的数量,以及音频的持续时间和采样率。将音频划分 30 帧。
numAudio = 3; audioDuration = 300; sampleRate = 44100; numFrames = 30;
启动一个有六个进程工作单元的并行池。
pool = parpool("Processes",6);
Starting parallel pool (parpool) using the 'Processes' profile ... Connected to parallel pool with 6 workers.
该示例将处理后的数据存储在并行池 ValueStore
对象中。为了提供计算进度的直观表示,请使用 waitbar
函数创建一个简单的用户界面。设置一个名为 handleValueStoreEntry
的更新函数,每次向 ValueStore
对象添加条目时运行该函数。示例末尾定义的 handleValueStoreEntry
函数使用 persistent
来存储有关 parfor
迭代的信息。要初始化持久变量,请运行 handleValueStoreEntry
函数。
analysisWaitBar = waitbar(0,"Waiting for data...",Name="Analyzing Audio Data");
store = pool.ValueStore; store.KeyUpdatedFcn = @(store,key) handleValueStoreEntry(store,key); handleValueStoreEntry(numAudio,numFrames,analysisWaitBar);
Wait bar counter reset to 0.
获取和处理数据
迭代获取并自动处理音频数据。要导入数据,请安排 acquireAudio
函数与 parfeval
异步运行。要并行处理数据,请在 parfor
循环中使用 processAudio
函数。processAudio
函数在此示例的支持文件中定义。
为了最大限度地减少工作单元接收数据的等待时间,请错开计算时间。在开始 parfeval
循环之前,提交 for
计算以获取第一个音频。
importFuture = parfeval(@acquireAudio,1,audioDuration,numFrames,sampleRate); for idx = 1:numAudio % Retrieve audio data from the parfeval computation. audio = fetchOutputs(importFuture); % Schedule the next parfeval computation to run in the background. if idx < numAudio importFuture = parfeval(@acquireAudio,1,audioDuration,numFrames,sampleRate); end % Process the audio data. parfor frame = 1:numFrames store = getCurrentValueStore; key = strcat("Audio_",num2str(idx),"_Frame_",num2str(frame)); % Perform some signal processing. inputSignal = audio{1,frame}; store(key) = processAudio(inputSignal,sampleRate); end end
检索已处理的数据
您可以从 ValueStore
对象中检索已处理过的音频以进行进一步的计算。例如,使用示例末尾定义的 plotFrame
函数检索并绘制每个音频的第一帧。
plotData = [store("Audio_1_Frame_1"),store("Audio_2_Frame_1"), ... store("Audio_3_Frame_1")]; t = (0:length(plotData)-1)/sampleRate; plotFrames(t,plotData);
清理
使用后删除等待栏和等待池。
delete(analysisWaitBar); delete(pool);
Parallel pool using the 'Processes' profile is shutting down.
辅助函数
当将条目添加到池 ValueStore
时,使用持久变量执行计数来更新等待栏。
function handleValueStoreEntry(numAudio,numFrames,analysisWaitBar) persistent count currentAudio nAudio nFrames bar if nargin == 3 % Initialize counting variables. count = 0; currentAudio = 1; nAudio = numAudio; nFrames = numFrames; bar = analysisWaitBar; fprintf("Wait bar counter reset to 0.") else count = count + 1; progress = count/nFrames; waitbar(progress,bar, ... sprintf("Processing audio %d of %d",currentAudio,nAudio)) if currentAudio==nAudio && progress==1 waitbar(progress,bar,"Audio import and processing complete") end if count == nFrames currentAudio = currentAudio + 1; count = 0; end end end
定义一个函数来绘制每个音频文件的帧。
function plotFrames(t,plotData) fig = figure(Name="First Frame of Each Audio"); tl = tiledlayout(fig,3,1); for i=1:3 nexttile(tl); plot(t,plotData(:,i)); title(sprintf("Audio %d",i)) end title(tl,"First Frame of Each Audio") xlabel(tl,"Time (s)"); ylabel(tl,"Amplitude"); end