分区并行池以优化资源使用
池分区是现有并行池的段,您可对其进行配置并用于特定的并行工作流。您可以将池分区配置为针对特定的工作单元、集群主机或 GPU 设备,而无需重新启动原始池。使用 partition 函数,您可以将分配特定工作单元到不同任务的池进行分区,或创建多个池以同时运行不同的并行工作流。您可以对本地计算机或集群池进行分区。
原始池及其分区共享相同的底层工作单元集合,这意味着使用一个池可能会延迟另一个池的工作执行。池分区继承了原始池的一些属性,对这些属性的更改将应用于所有分区:
AttachedFilesFileStoreValueStoreIdleTimeout
池分区在管理和优化资源使用方面提供了灵活性。本表中的信息可帮助您决定采用哪种池分区策略。
| 目标 | 分区策略 | 更多信息 |
|---|---|---|
| 为深度学习等 GPU 密集型应用程序分配独立的 GPU 资源。 | 使用 GPU 资源进行分区 | 按 GPU 资源对并行池进行分区 |
| 同时运行 GPU 和 CPU 处理 | 使用 GPU 资源进行分区 | 按 GPU 资源对并行池进行分区 |
| 在每个主机上执行一次设置代码 | 使用唯一的主机分区 | 按唯一主机对池进行分区 |
| 最大化内存使用效率以支持数据密集型计算 | 使用唯一的主机分区 | 按唯一主机对池进行分区 |
| 将主机的所有 CPU 用于多线程应用程序 | 使用唯一的主机分区 | 按唯一主机对池进行分区 |
| 将数据保存在工作单元中,以减少大数据分析中的数据传输开销 | 具有一个工作单元的分区池 | 针对特定工作单元中的工作单元 |
| 为特定的工作单元分配专用资源 | 具有一个工作单元的分区池 | 针对特定工作单元中的工作单元 |
| 在函数调用之间保持工作单元的状态 | 具有一个工作单元的分区池 | 针对特定工作单元中的工作单元 |
| 同时运行多个工作流,无需资源竞争。 | 将并行池分区为多个池 | 为不同工作流分区多个池 |
| 调整池大小以适应工作流 | 将并行池分区为多个池 | 为不同工作流分区多个池 |
以下各节将介绍一些高级工作流,这些工作流充分展示了池分区的全部潜力。虽然并行池分区在特定场景下可能有用,但在典型场景中可能不需要使用它们。
按 GPU 资源对并行池进行分区
如果当前的并行池可以访问 GPU 资源,并且您要在后台或具有多个 GPU 的并行池上运行任务,则无需再删除当前池并启动新池。相反,您可以创建一个池分区,每个可用 GPU 具有一个工作单元。此配置允许您在专用 GPU 池上运行 GPU 密集型应用程序。此外,您还可以返回另一个带有剩余 CPU 工作单元的分区,从而提供对 GPU 和 CPU 资源的访问。这使您能够同时在 GPU 和 CPU 池分区上执行任务,从而优化本地计算机或远程集群池上的资源使用。
分隔 GPU 资源
此示例展示了如何对现有的交互式并行池进行分区,从中划分一个专用 GPU 并行池。对于深度学习等对 GPU 要求较高的应用程序,创建一个仅限 GPU 的资源池可以专门为这些应用程序分配资源。
在此示例中,gpuCluster 配置文件请求一个带有四个 GPU 的并行池。使用 gpuCluster 配置文件启动一个由 16 个工作单元组成的并行池。
pool = parpool("gpuCluster",16);Starting parallel pool (parpool) using the 'gpuCluster' profile ... Connected to parallel pool with 16 workers.
使用 partition 函数创建一个池分区,为每个可用 GPU 配置一个工作单元,以获得最佳性能。
gpuPool = partition(pool,"MaxNumWorkersPerGPU",1);
现在,您可以将 gpuPool 池分区用于需要多 GPU 的工作流。
例如,如果您有 Deep Learning Toolbox™,则可以使用 parfor 循环在 GPU 池分区上并行训练多个深度学习网络。此代码片段展示了如何通过在 parfor 循环中训练多个网络来并行扫描迷您批处理大小的参数。要在 gpuPool 池分区上运行 parfor 循环,请将 gpuPool 池对象指定为 parfor 函数的第二个参量。要查看使用 parfor 训练多个网络的示例,请参阅 使用 parfor 训练多个深度学习网络 (Deep Learning Toolbox)。
parfor(idx = 1:numMiniBatchSizes,gpuPool) miniBatchSize = miniBatchSizes(idx); % Define the training options. options = trainingOptions("sgdm", ... MiniBatchSize=miniBatchSize, ... % Set the mini-batch size. ExecutionEnvironment=gpu,... % Train on the GPU Verbose=false); % trainedNetworks{idx} = trainnet(...,options); end
同时使用 GPU 和 CPU 资源
此示例演示了如何根据 GPU 和 CPU 任务对池进行分区,并在两个池上同时运行代码。
在此示例中,您的本地计算机上安装了 GPU。您可以为 GPU 工作单元分区出一个池,为 CPU 工作单元分区出另一个池。
在本地计算机上创建一个进程工作单元池。
pool = parpool("Processes",6);Starting parallel pool (parpool) using the 'Processes' profile ... Connected to parallel pool with 6 workers.
使用 partition 函数为可用的 GPU 创建一个具有一个工作单元的池分区。同时返回一个包含无法访问 GPU 的剩余工作单元的池分区。
[gpuPool,cpuPool] = partition(pool,"MaxNumWorkersPerGPU",1);定义一个函数,该函数通过迭代物流映射方程来模拟人口的增长。
function [r,x] = populationGrowth(N,numIterations) r = gpuArray.linspace(0,4,N); x = rand(1,N,"gpuArray"); for n=1:numIterations x = r.*x.*(1-x); end end
提交一个 parfeval 计算,在 gpuPool 池分区上运行 populationGrowth 函数。parfeval 不阻止 MATLAB,这意味着您可以继续执行命令。
f = parfeval(gpuPool,@populationGrowth,2,200000,1000);
在 parfeval 计算在 GPU 上后台运行时,在 cpuPool 池分区中的剩余工作单元上运行一个 parfor 循环。将 cpuPool 对象指定为 parfor 函数的第二个参量。
M = 100; N = 1e6; data = rand(M,N); parfor(idx = 1:M,cpuPool) out(idx) = sum(data(idx,:))./N; end
当 parfeval 计算完成后,使用 fetchOutputs 函数收集其结果。fetchOutputs 函数等待未来事件完成。
[x,r] = fetchOutputs(f);
按唯一主机对池进行分区
主机是指运行 MATLAB® 工作单元的计算机。在本地池中,这是您的计算机。在集群池中,它指的是集群中的计算机。集群池可以在多个主机上运行工作单元。按唯一主机对池进行分区非常适合需要主机特定配置和充分利用内存或 CPU 的任务。
在工作单元主机上执行设置代码
当工作流要求为运行工作单元的每个主机运行设置代码时,按主机对池进行分区可让您只对每个主机运行一次设置代码。此示例显示了如何为池中的每个主机运行自定义设置函数。
使用远程集群配置文件 myCluster 启动一个由 64 个工作单元组成的并行池。
myClusterPool = parpool("myCluster",64);Starting parallel pool (parpool) using the 'myCluster' profile ... Connected to parallel pool with 64 workers.
从 myClusterPool 创建一个池分区,每个主机一个工作单元。
hostPool = partition(myClusterPool,"MaxNumWorkersPerHost",1);使用 parfevalOnAll 函数在 hostPool 池中的所有工作单元上异步运行 downloadData 辅助函数。等待未来完成,并使用 fetchOutputs 函数检查工作单元中的错误。downloadData 函数作为支持文件附加到此示例。
fSetup = parfevalOnAll(hostPool,@downloadData,1); fetchOutputs(fSetup)
ans = 2×1 string
"Data download complete on host: phvjeq9c-01.company.com"
"Data download complete on host: phvjeq9c-02.company.com"
为数据密集型计算优化内存使用
此示例展示了如何在现有交互式并行池中最大限度地利用可用内存来进行数据密集型计算。
数据密集型计算的性能在很大程度上取决于矩阵的大小。在集群池上执行数据密集型计算时,如果矩阵占用了每个工作单元可用的系统内存的 50% 或以上,计算效率就会降低。如果矩阵大小超过此阈值,您可能会遇到性能下降,因为操作系统开始将内存交换到磁盘。当数据并行计算所需的内存超过池中工作单元可用的内存时,请考虑使用 partition 函数减少池中每个主机的工作单元数量。虽然这会导致工作单元较少的池分区,但每个工作单元都可以访问更多的系统内存来进行数据密集型计算。
使用远程集群配置文件 myCluster 启动一个由 16 个工作单元组成的并行池。在此示例中,假设每个工作单元可访问 8 GB 的系统内存。您可以使用此池运行一般的并行计算。
poolSize = 16;
pool = parpool("myCluster",poolSize);Starting parallel pool (parpool) using the 'myCluster' profile ... Connected to parallel pool with 16 workers.
要执行一些数据密集型工作而无需创建新池,可以按照以下步骤增加每个工作单元可用的内存量。
确定池中的唯一主机数量和每个主机上运行的工作单元数量,并将结果汇总到表中。表中显示并行池有两个唯一主机,每个主机有八个工作单元。
allWkrs = pool.Workers;
hostnames = {allWkrs.Host}';
[hostnames,ia,ic] = unique(hostnames);
numWorkers = accumarray(ic,1);
workersPerHost = table(hostnames,numWorkers) workersPerHost=2×2 table
hostnames numWorkers
_________________________ __________
"phvjeq9c-01.company.com" 8
"phvjeq9c-02.company.com" 8
使用表格中的信息来规划如何对池进行分区。例如,要将每个工作单元可用的内存增加到 16 GB,您需要每个主机提供四个工作单元,而不是八个。使用 partition 函数创建一个每个主机有四个工作单元的池分区。显示新创建的 highMemPool 分区的大小。
highMemPool = partition(pool,"MaxNumWorkersPerHost",4);
highMemPoolSize = highMemPool.NumWorkershighMemPoolSize = 8
计算池分区中工作单元可用的内存。现在,highMemPool 分区中的每个工作单元都可以访问 16 GB 的内存。
memoryPerWorker = 8; % In GB
totalMemory = memoryPerWorker*poolSize;
newMemoryPerWorker = totalMemory/highMemPoolSizenewMemoryPerWorker = 16
定义每个工作单元可运行的最大数组大小,该大小对应于池分区中每个工作单元可用的内存的 45%。
maxMemUsagePerWorker = 0.45*newMemoryPerWorker*1024^3; % In bytes. maxMatSize = round(sqrt(maxMemUsagePerWorker*highMemPoolSize/memoryPerWorker)); fprintf("The `highMemPool` pool can support" + ... " a matrix size of up to %d-by-%d\n",maxMatSize,maxMatSize)
The `highMemPool` pool can support a matrix size of up to 87926-by-87926
现在,您可以在 highMemPool 分区的的工作单元上运行最大为 87926×87926 的数组计算。
例如,要在池分区上使用分布式数组运行数据密集型计算,请将 highMemPool 池对象指定给 spmd 函数。
spmd(highMemPool) A = rand(maxMatSize,"codistributed"); b = rand(maxMatSize,1,"codistributed"); x = A\b; end
在工作单元上运行多线程代码
此示例展示了如何对现有交互式并行池进行分区和设置工作单元,以进行多线程计算。
某些 MATLAB 函数在多核计算机上默认使用多线程,从而提高了计算效率。当有多个线程可用时,使用这些函数的计算比在单个线程上执行得更好。并行池工作单元默认使用单个计算线程,因为它们通常与单个内核相关联。如果代码中的 MATLAB 函数从隐式多线程中获益,则可以将池按每个主机分区出的工作单元较少的池,并增加工作单元上的计算线程数,以利用内置的并行性。虽然此过程会导致工作单元数量减少,但每个工作单元都可以执行多线程计算,从而提高计算效率。
使用远程集群配置文件 myCluster 启动一个由 16 个工作单元组成的并行池。
poolSize = 16;
pool = parpool("myCluster",poolSize);Starting parallel pool (parpool) using the 'myCluster' profile ... Connected to parallel pool with 16 workers.
如果需要此池执行多线程计算,则可以按照以下步骤增加每个工作单元上的计算线程的最大数量。
确定池中唯一主机的数量以及每个主机上运行的工作单元的数量,并将结果汇总到表中。使用表格中的信息来规划如何对池进行分区。
allWkrs = pool.Workers;
hostnames = {allWkrs.Host}';
[uniqueHosts,~,hostIndices] = unique(hostnames);
numWorkers = accumarray(hostIndices,1);
hostWorkers = table(uniqueHosts,numWorkers) hostWorkers=4×2 table
uniqueHosts numWorkers
____________________________ __________
{'phvj9c-00-cm.company.com'} 4
{'phvj9c-01-cm.company.com'} 4
{'phvj9c-02-cm.company.com'} 4
{'phvj9c-03-cm.company.com'} 4
新池分区中所有工作单元的线程数不得超过原始池中的最大工作单元数,否则可能会降低性能。
表中显示,对于此并行池,所有主机的工作单元数量相同。您可以通过将每台主机上的工作单元数除以每个工作单元所需的线程数来应用一个简单的逻辑,从而找到每台主机上实现每个工作单元所需线程数所需的最小工作单元数。
确定分区池中每个工作单元的最小工作单元数,以使每个工作单元有四个线程。
threadsPerWorker = 4; workersPerHost = ceil(min(hostWorkers.numWorkers/threadsPerWorker))
workersPerHost = 1
根据每个主机的计算工作单元数对池进行分区。
multiThreadsPool = partition(pool,"MaxNumWorkersPerHost",workersPerHost);如果工作单元在主机之间分布不均匀,则必须使用 partition 函数的 Workers 参量来实现额外的逻辑,从每个主机中选择特定数量的工作单元。有关处理主机之间分布不均的更强大的分区逻辑的示例,请参阅此示例所附的 unevenHostPartitioning 支持文件。
要设置多线程计算的工作单元,请增加池分区中所有工作单元的最大计算线程数。使用 parfevalOnAll 函数,以指定每个工作单元的线程数在池分区中的所有工作单元中执行 maxNumCompThreads 函数。保留工作单元中之前计算线程的最大数量副本。
setNumCompThreads = parfevalOnAll(multiThreadsPool, ...
@maxNumCompThreads,1,threadsPerWorker);
lastThreads = fetchOutputs(setNumCompThreads);现在,您可以在 multiThreadsPool 分区的工作单元上运行多线程计算。例如,要在池分区上运行 parfor 计算,将 multiThreadsPool 池对象作为第二个输入参量传递给 parfor。
N = randn(5000); numIterations = 10; parfor (idx = 1:numIterations,multiThreadsPool) out = N*N; end
当您完成多线程计算后,请重置池分区中所有工作单元的最大计算线程数。使用 parfevalOnAll 函数,以池分区中所有工作单元的先前最大计算线程数执行 maxNumCompThreads 函数。等待未来完成,并使用 fetchOutputs 函数检查工作单元中的错误。
setNumCompThreads = parfevalOnAll(multiThreadsPool,@maxNumCompThreads,0,lastThreads(1)); fetchOutputs(setNumCompThreads)
针对特定工作单元中的工作单元
当您要在同一工作单元上重复执行命令时,可以使用一个工作单元对池进行分区。这种方法对于要求工作单元在计算之间保持状态或使用唯一资源的任务非常有用。与使用长时间运行的 parfeval 计算来阻止工作单元不同,将单个工作单元池进行分区有助于您跟踪特定的工作单元,并在需要时将该工作单元用于其他并行工作。
保留工作单元数据
此示例展示了如何从现有的交互式并行池中分区出一个工作单元,以在工作单元的内存中保留和访问大型数据集。使用此方法可最大限度地减少客户端与池工作单元之间的数据传输。
在此示例中,您在 10 个集群工作单元的交互式池上创建了一个大型体素图像卷。每个工作单元在内存中存储 500×500×500 的矩阵。如果您无法访问集群工作单元,或者计算机没有足够的内存来创建和存储 3 GB 的卷,请在运行此示例之前减少 imSize 变量的值。
imSize = [500 500 500];
使用 myCluster 远程集群配置文件启动一个由 10 个工作单元组成的并行池。
pool = parpool("myCluster",10);Starting parallel pool (parpool) using the 'myCluster' profile ... Connected to parallel pool with 10 workers.
在 spmd 语句中,使用并行池的工作单元创建气泡的仿真体素图像。显示图像的一部分。
spmd V = rand(imSize,"single"); BW = false(size(V)); BW(V < 0.000001) = true; V = bwdist(BW); V(V <= 20) = 1; V(V > 20) = 0; end volshow(V{1})

将结果体素图像连接到 spmdIndex 为 1 的工作单元上。
spmd V = spmdCat(V,3,1); end
现在,对池进行分区,将其划分到存储体素图像的工作单元,具体来说,是 spmdIndex 为 1 的工作单元上。
在 spmd 语句中使用 getCurrentWorker 函数来识别池中的工作单元。getCurrentWorker 函数返回一个 Composite,其中包含池中每个工作单元的 parallel.pool.Worker 对象。
spmd wkrs = getCurrentWorker; end
使用 parallel.pool.Worker 对象为 spmdIndex 为 1 的工作单元对并行池进行分区。
dataWkrPool = partition(pool,"Workers",wkrs{1});要在 parfor 循环或 parfeval 计算中访问体素图像,请使用体素图像的 Composite 对象 V 创建一个 Constant 对象。
V = parallel.pool.Constant(V);
现在,您可以在 parfor 循环或 parfeval 计算中访问工作单元中的数据。
例如,使用 parfeval 对 dataWkrPool 单工作单元池分区执行一些图像处理,使用 Image Processing Toolbox™ 函数。定义一个函数来计算体素图像中的气泡数量和每个气泡的体积。将 dataWkrPool 池分区和体素图像 Constant 对象指定给 parfeval 函数。检索结果。
function [numBubbles,bubbleVolumes] = myBubbleFunction(V) labeledV = bwlabeln(V.Value); numBubbles = max(labeledV(:)); % Calculate the volume of each bubble stats = regionprops3(labeledV,"Volume"); bubbleVolumes = stats.Volume'; end f = parfeval(dataWkrPool,@myBubbleFunction,2,V); [numBubbles,bubbleVolumes] = fetchOutputs(f)
numBubbles = 645
bubbleVolumes = 1×645
26709 12319 17329 24559 33401 33401 33401 33401 33401 16665 33401 33401 93972 17329 33401 33401 33401 33401 33401 33038 33401 65942 33401 33401 33401 33401 33401 63062 64938 33401 25660 24559 66584 49502 33401 33401 33401 33401 33401 33401 33401 33401 33401 33401 33401 22237 33401 33401 33401 66596
将工作单元分配给特定任务
此示例展示了如何从现有并行池中分区出一个工作单元,并使用该工作单元保持相同的数据库连接,从数据库导入图像数据以进行处理。
当工作流需要独特资源(如数据库连接或硬件连接)时,将这些资源分配给特定的工作单元可确保您在需要时轻松、反复地访问这些资源。要在并行池中使用文件句柄、数据库和硬件连接等句柄类型资源,建议使用 parallel.pool.Constant 对象为池中的每个工作单元创建句柄类型资源。对于数据库连接,这也意味着数据库必须承担与池中不同工作单元保持多个连接的开销。
如果无法为池中的每个工作单元创建数据库连接,则可以使用此方法在特定的工作单元上维护数据库连接,以便反复访问。
在本地计算机上启动一个线程工作单元池。基于线程的池经过优化,可减少数据传输、加快调度速度并减少内存使用,因此可提高涉及工作单元之间大量数据传输的应用程序的性能。
pool = parpool("Threads");Starting parallel pool (parpool) using the 'Threads' profile ... Connected to parallel pool with 6 workers.
使用池对象的 Workers 属性来获取池中工作单元的 parallel.Worker 对象。对并行池进行分区,以隔离数组 allWkrs 中的第一个工作单元。
allWkrs = pool.Workers;
dBWkrPool = partition(pool,"Workers",allWkrs(1));使用 connectToDatabase 辅助函数为数据库连接创建一个 Constant 对象。如果您拥有 Database Toolbox™,您可以使用 database (Database Toolbox) 函数连接到数据库。connectToDatabase 辅助函数已作为支持文件附加到此示例中。
cDbase = parallel.pool.Constant(@() connectToDatabase("Database"));定义一个函数,用于创建并维护与数据库的连接。要运行此示例,getDbaseData 函数仿真创建与数据库的连接,并将连接存储在 Constant 对象的 Value 属性中。该函数还仿真每次在工作单元上运行时使用相同的数据库连接导入数据。如果您拥有 Database Toolbox,您可以使用 fetch (Database Toolbox) 函数从数据库中导入数据。fetchQuery 辅助函数已作为支持文件附加到此示例中。
function imgs = getDbaseData(C,query) conn = C.Value; imgs = fetchQuery(conn,query); end
指定数据库查询。为图像处理滤波器创建一个 Constant 对象。
imageNames = ["outdoors" "indoors" "daytime" "nightime"]; sqlqueries = "SELECT * FROM " + imageNames; cFilter = parallel.pool.Constant(randn(16,1));
要导入数据,请将 getDbaseData 函数与 parfeval 异步运行,并在 dBWkrPool 池分区上运行。通过将 dBWkrPool 分区指定给 parfeval 函数,相同的工作单元始终运行 getDbaseData 函数,并使用来自 Constant 对象的相同数据库连接来导入数据。
fImport = parfeval(dBWkrPool,@getDbaseData,1,cDbase,sqlqueries(1));
在循环中,使用 parfeval 导入数据,并在 parfor 循环中处理导入的数据。为了最大限度地减少工作单元等待接收数据的时间,请错开数据导入和处理计算。
for j = 1:length(sqlqueries) % Fetch data asynchronously data = fetchOutputs(fImport); % Wait for the data to be fetched % Schedule the next parfeval computation to run in the background. if j < length(imageNames) fImport = parfeval(dBWkrPool,@getDbaseData,1,cDbase,sqlqueries(j+1)); end parfor k=1:size(data,2) % Zero-pad filter to the length of data, and transform [rows,~] = size(data); filterF = fft(cFilter.Value,rows); % Transform each column of the input data imgFft = fft2(data{k}); % Multiply each column by filter and compute inverse transform out{k} = ifft2(filterF.*imgFft) end processedImages.(imageNames(j)) = out; fprintf("'%s' images processed\n",imageNames(j)) end
'outdoors' images processed 'indoors' images processed 'daytime' images processed 'nightime' images processed
维护工作单元的状态
此示例展示了如何使用池分区来确保工作单元在多个函数调用之间保持其状态。您可以将并行池划分为多个单工作单元池分区。此设置允许您将计算定向到同一工作单元,从而使用持久变量在函数调用之间继续计算。
此示例仿真了一个简单的金融交易系统,其中每个工作单元处理市场数据、更新其状态,并根据其当前状态和各个风险因素做出交易决策。
首先访问当前的并行池。如果没有可用池,gcp 函数将使用默认配置创建一个池。使用当前池对象的 Workers 属性来获取池中工作单元的 parallel.Worker 对象。
pool = gcp; poolWkrs = pool.Workers;
将池分三部分,将数组 poolWkrs 中的前三个工作单元隔离到单独的池分区中。将池分区存储在池对象数组中。
traderPools(1) = partition(pool,"Workers",poolWkrs(1)); traderPools(2) = partition(pool,"Workers",poolWkrs(2)); traderPools(3) = partition(pool,"Workers",poolWkrs(3));
定义一个用于交易系统的函数。tradingWorker 函数在函数调用之间维护一个持久状态,处理新的市场数据,并根据当前状态使用平均值来做出交易决策。决策过程纳入了针对工作单元的风险因素,以反映不同工作单元之间的差异。
function decision = tradingWorker(marketData,riskFactor) % Initialize or update the state persistent state if isempty(state) state = struct("history",[],"position","none"); end % Process new market data state.history = [state.history;marketData]; % Calculate short-term and long-term averages shortTermAvg = mean(marketData); longTermAvg = mean(state.history); % Trading logic with a worker-specific risk factor randomFactor = 1 + riskFactor*randn; % Worker-specific randomness if strcmp(state.position,"none") && (shortTermAvg > longTermAvg*randomFactor) state.position = "buy"; elseif strcmp(state.position,"buy") && (shortTermAvg < longTermAvg*randomFactor) state.position = "sell"; elseif strcmp(state.position,"sell") && (shortTermAvg >= longTermAvg*randomFactor) state.position = "none"; end decision = state.position; pause(1) % Simulate processing time end
创建一个用户界面表来显示来自工作单元的决策数据。
fig = uifigure(Position=[619 525 443 308]); decisionTable = table(Size=[0 4], ... VariableTypes=["double","string","string","string"], ... VariableNames=["Time","Trader 1","Trader 2","Trader 3"]); uit = uitable(fig,Data=decisionTable,Position=[20 11 350 266]);
仿真实时市场数据,并使用分区池进行处理。为每个工作单元分配一个引入交易决策变异性的唯一风险因素。为了确保每个池工作单元在每次运行 tradingWorker 函数并更新其状态时都使用相同的风险因素,请为每个风险因素使用相同的池分区。获取每个时间步的决策,并更新表格。
riskFactors = [0.1 0.2 0.3]; for t = 1:10 % % Simulate real-time market data marketData = rand(100,1)*100; % Random price data % Create futures to process data in parallel futures(1) = parfeval(traderPools(1),@tradingWorker,1,marketData,riskFactors(1)); futures(2) = parfeval(traderPools(2),@tradingWorker,1,marketData,riskFactors(2)); futures(3) = parfeval(traderPools(3),@tradingWorker,1,marketData,riskFactors(3)); decisions = fetchOutputs(futures); decisionTable(end+1,:) = {t,decisions(1),decisions(2),decisions(3)}; uit.Data = decisionTable; drawnow limitrate nocallbacks end

清除工作单元函数中的持久状态。当您从客户端清除具有持久变量的函数时,MATLAB® 也会清除所有工作单元上的该函数。
clear tradingWorker为不同工作流分区多个池
在某些需要管理多个或并发工作流的应用程序中,创建多个池分区可以帮助您平衡不同工作流的资源使用。这种方法对于彼此独立运行的并行工作流特别有用。
同时运行多个工作流
如果您有多个具有不同资源要求的工作流需要同时运行,将池划分为多个池分区可使每个工作流独立运行,互不干扰。有关使用多个池分区管理数据处理管道中的资源的示例,请参阅 用于并行并发工作流中高效资源管理的分区池。
根据工作流调整并行池大小
此示例展示了在不同工作流之间切换时,如何使用池分区来高效管理资源,而无需重启池。
在此示例中,您运行了一个 parfor 循环,该循环利用了池中所有可用的工作单元。完成此过程后,您需要运行一个内存密集型任务,该任务只能使用可用核心的一半,以避免内存问题。池分区允许您在不重新启动现有池的情况下,无缝管理此过渡。
使用默认集群配置文件启动一个包含 10 个工作单元的并行池。使用一个 parfor 循环来生成时长为 30 秒的音频样本,采样率为 44.000 Hz。音频信号的频率每 3 秒钟变化一次,变化为随机值。generateAudio 函数作为支持文件附加到此示例。
pool = parpool(10);
Starting parallel pool (parpool) using the 'Processes' profile ... Connected to parallel pool with 10 workers.
fs = 44100; frequencyDuration = 3; audioDuration = 30; numAudio = 50; audioSamples = cell(1,numAudio); parfor idx = 1:numAudio audioSamples{idx} = generateAudio(audioDuration,frequencyDuration,fs); end
接下来,提取背景音频样本的短时傅里叶变换 (STFT)。STFT 计算涉及执行多个 FFT 计算,这可能会占用工作单元的大量内存。要减少同时运行的 parfeval 计算的数量,请创建一个工作单元数量较少的池分区来运行 parfeval 计算。
poolWkrs = pool.Workers;
smallPool = partition(pool,"Workers",poolWkrs(1:2:end));通过将 smallPool 池对象指定给 parfeval 函数,将 extractFeatures parfeval 计算提交到池分区。
futures(1:numAudio) = parallel.FevalFuture; for a = 1:numAudio futures(a) = parfeval(smallPool,@extractFeatures,1,audioSamples{a},fs); end
您可以继续在客户端或未位于 smallPool 分区的工作单元上运行计算。对于此示例,当 parfeval 计算完成后,使用 fetchOutputs 函数收集其结果。fetchOutputs 函数等待所有异步任务完成。
sftfData = fetchOutputs(futures);
局部函数
extractFeatures 函数处理采样频率为 fs 的输入音频信号 audio,并返回一个包含原始音频和混响音频的短时傅里叶变换 (STFT) 特征的结构体。addReverb 和 getSTFT 辅助函数已作为支持文件附加到此示例中。
function output = extractFeatures(audio,fs) reverbAud = addReverb(audio,fs); [audT,audF,audStftX] = getSTFT(audio,fs); [reverbT,reverbF,reverbStftX] = getSTFT(reverbAud,fs); output = struct("audT",audT,"audF",audF,"audStftX",audStftX, ... "reverbT",reverbT,"reverbF",reverbF,"reverbStftX",reverbStftX); end