Interactively Import and Process Data in Parallel
This example shows how to import and process data simultaneously in an interactive parallel pool. To streamline your workflow, you can overlap parfeval and parfor computations on the same parallel pool.
In this example, you import audio signals in the background with parfeval and simultaneously perform some basic signal processing with parfor. The data import and processing approach described in this example is generally applicable and does not rely on the specific details of the import and process functions.
This image provides a summary of the order of computations.
Set Up
To simulate importing audio from a database, the example generates audio data using the acquireAudio function, which is defined in a supporting file. The acquireAudio function also partitions the audio into frames to facilitate parallelized audio processing. Before you start, specify the number of audio files to import, as well as the duration and sample rate of the audio. Partition the audio into 30 frames.
numAudio = 3;
audioDuration = 300;
sampleRate = 44100;
numFrames = 30;
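The acquireAudio supporting file is not reproduced in this example. To experiment without it, a minimal stand-in might look like the following sketch. The name acquireAudioSketch, the 440 Hz test tone, and the noise level are assumptions made purely for illustration. The only property taken from the example is the output shape: a 1-by-numFrames cell array of frames, which matches the audio{1,frame} indexing used in the processing loop.

```matlab
% Usage: three seconds of audio split into 30 frames (small values for a quick check).
audio = acquireAudioSketch(3,30,44100);
disp(size(audio))          % size of the cell array of frames
disp(size(audio{1,1}))     % size of one frame

function audio = acquireAudioSketch(audioDuration,numFrames,sampleRate)
% Hypothetical stand-in for acquireAudio: synthesize a noisy tone and
% partition it into equal-length frames.
numSamples = round(audioDuration*sampleRate);
t = (0:numSamples-1).'/sampleRate;
signal = sin(2*pi*440*t) + 0.1*randn(numSamples,1);

% Drop trailing samples so that the signal divides evenly into frames.
frameLength = floor(numSamples/numFrames);
signal = signal(1:frameLength*numFrames);

% Split the column vector into numFrames column vectors and store them
% in a row cell array.
audio = mat2cell(signal,repmat(frameLength,1,numFrames),1).';
end
```

To use the sketch with the rest of the example, save the function in its own file and rename it to acquireAudio.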
Start a parallel pool with six process workers.
pool = parpool("Processes",6);
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 6 workers.
The example stores the processed data in the parallel pool ValueStore object. To provide a visual representation of the computation progress, use the waitbar function to create a simple user interface. Set up an update function called handleValueStoreEntry to run each time an entry is added to the ValueStore object. The handleValueStoreEntry function, defined at the end of the example, uses persistent variables to store information about the parfor iterations. To initialize the persistent variables, call the handleValueStoreEntry function once with three input arguments: the number of audio files, the number of frames, and the wait bar handle.
analysisWaitBar = waitbar(0,"Waiting for data...",Name="Analyzing Audio Data");
store = pool.ValueStore;
store.KeyUpdatedFcn = @(store,key) handleValueStoreEntry(store,key);
handleValueStoreEntry(numAudio,numFrames,analysisWaitBar);
Wait bar counter reset to 0.
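The initialize-then-count behavior of handleValueStoreEntry relies on persistent variables and a check on nargin. The following stripped-down sketch isolates that pattern without a wait bar or a parallel pool. The function name progressCounter and its arguments are hypothetical and are not part of the example.

```matlab
% Usage: initialize for four expected updates, then record three of them.
progressCounter(4);
for k = 1:3
    fraction = progressCounter();
end
fprintf("Progress after three updates: %.2f\n",fraction)

function fraction = progressCounter(total)
% Hypothetical helper. Call with one input to reset the count and with
% no inputs to record one update. Call the one-input form first.
persistent count nTotal
if nargin == 1
    % Initialization call: store the expected total and reset the count.
    count = 0;
    nTotal = total;
else
    % Update call: the persistent values survive between calls.
    count = count + 1;
end
fraction = count/nTotal;
end
```

Because persistent variables keep their values between calls, the callback does not need access to workspace variables such as the wait bar handle. The cost is that the initialization call must happen before the first update.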
Acquire and Process Data
Acquire and automatically process the audio data iteratively. To import the data, schedule the acquireAudio function to run asynchronously with parfeval. To process the data in parallel, use the processAudio function in a parfor-loop. The processAudio function is defined in a supporting file to this example.
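Like acquireAudio, the processAudio supporting file is not shown. As a rough sketch of what "basic signal processing" could mean here, the hypothetical function below removes the DC offset, smooths the signal with a short moving average, and normalizes the peak amplitude. The name processAudioSketch and every processing step in it are assumptions, not the contents of the actual supporting file.

```matlab
% Usage: process one second of a noisy tone with a constant offset.
sampleRate = 44100;
t = (0:sampleRate-1).'/sampleRate;
noisy = 0.5 + sin(2*pi*440*t) + 0.1*randn(sampleRate,1);
clean = processAudioSketch(noisy,sampleRate);
disp(max(abs(clean)))      % peak amplitude after normalization

function outputSignal = processAudioSketch(inputSignal,sampleRate)
% Hypothetical stand-in for processAudio.

% Remove the DC offset.
outputSignal = inputSignal - mean(inputSignal);

% Smooth with a moving average spanning about 1 ms of samples.
windowLength = max(1,round(1e-3*sampleRate));
outputSignal = movmean(outputSignal,windowLength);

% Normalize the peak amplitude to 1, guarding against an all-zero frame.
peak = max(abs(outputSignal));
if peak > 0
    outputSignal = outputSignal/peak;
end
end
```

The function takes one frame and returns an array of the same size, so each parfor iteration can store its result under its own key without depending on other frames.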
To minimize the waiting time for workers to receive data, stagger the computations. Submit a parfeval computation to acquire the first audio before you start the for-loop.
importFuture = parfeval(@acquireAudio,1,audioDuration,numFrames,sampleRate);
for idx = 1:numAudio
    % Retrieve audio data from the parfeval computation.
    audio = fetchOutputs(importFuture);

    % Schedule the next parfeval computation to run in the background.
    if idx < numAudio
        importFuture = parfeval(@acquireAudio,1,audioDuration,numFrames,sampleRate);
    end

    % Process the audio data.
    parfor frame = 1:numFrames
        store = getCurrentValueStore;
        key = strcat("Audio_",num2str(idx),"_Frame_",num2str(frame));

        % Perform some signal processing.
        inputSignal = audio{1,frame};
        store(key) = processAudio(inputSignal,sampleRate);
    end
end
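The staggering pattern is independent of audio data. As a sketch of the same control flow with built-in functions only, the code below uses rand as a placeholder for the import step and a column sum as a placeholder for the processing step. It assumes that a parallel pool is already open or that gcp is allowed to start one. The variable names and sizes are arbitrary.

```matlab
% Assumption: gcp returns an open pool (or starts one if automatic pool
% creation is enabled in your parallel preferences).
pool = gcp;

numBatches = 3;
batchSize = 8;
results = zeros(numBatches,batchSize);

% Request the first batch before entering the loop.
nextBatch = parfeval(pool,@rand,1,100,batchSize);
for b = 1:numBatches
    % Block until the current batch is available.
    data = fetchOutputs(nextBatch);

    % Request the following batch so that it is generated in the
    % background while the parfor-loop below processes the current one.
    if b < numBatches
        nextBatch = parfeval(pool,@rand,1,100,batchSize);
    end

    % Process the columns of the current batch in parallel.
    parfor c = 1:batchSize
        results(b,c) = sum(data(:,c));
    end
end
disp(size(results))
```

Each pass of the for-loop waits only for data that was requested one iteration earlier, so the import of batch b+1 overlaps with the processing of batch b.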
Retrieve Processed Data
You can retrieve the processed audio from the ValueStore object for further computations. For example, retrieve and plot the first frame of each audio using the plotFrames function defined at the end of the example.
plotData = [store("Audio_1_Frame_1"),store("Audio_2_Frame_1"), ...
    store("Audio_3_Frame_1")];
t = (0:length(plotData)-1)/sampleRate;
plotFrames(t,plotData);
Clean Up
Delete the wait bar and pool after use.
delete(analysisWaitBar);
delete(pool);
Parallel pool using the 'Processes' profile is shutting down.
Helper Functions
Define a function that updates the wait bar each time an entry is added to the pool ValueStore object. The function uses persistent variables to perform the counting.
function handleValueStoreEntry(numAudio,numFrames,analysisWaitBar)
persistent count currentAudio nAudio nFrames bar

if nargin == 3
    % Initialize counting variables.
    count = 0;
    currentAudio = 1;
    nAudio = numAudio;
    nFrames = numFrames;
    bar = analysisWaitBar;
    fprintf("Wait bar counter reset to 0.")
else
    count = count + 1;
    progress = count/nFrames;
    waitbar(progress,bar, ...
        sprintf("Processing audio %d of %d",currentAudio,nAudio))
    if currentAudio==nAudio && progress==1
        waitbar(progress,bar,"Audio import and processing complete")
    end
    if count == nFrames
        currentAudio = currentAudio + 1;
        count = 0;
    end
end
end
Define a function to plot the frames for each audio file.
function plotFrames(t,plotData)
fig = figure(Name="First Frame of Each Audio");
tl = tiledlayout(fig,3,1);
for i=1:3
    nexttile(tl);
    plot(t,plotData(:,i));
    title(sprintf("Audio %d",i))
end
title(tl,"First Frame of Each Audio")
xlabel(tl,"Time (s)");
ylabel(tl,"Amplitude");
end