利用并行作业和任务进行扩展
此示例展示了如何使用并行作业和任务将计算扩展到大型集群上的数百个工作单元。
您可以通过将 parfor
循环转换为独立作业的多个任务,将现有的 parfor
工作流扩展到并行池限制之外。此示例将 运行不使用并行池的 parfor 循环 示例中的 parfor
工作流转换为作业和任务工作流。
此示例重新创建了 Jammer 等人 [2] 对 ARGESIM 基准 CP2 蒙特卡罗研究 [1] 的更新。在蒙特卡罗研究中,您可以使用作业和任务仿真具有不同、随机采样的阻尼系数的弹簧质量系统。
创建集群对象和作业
创建集群对象并显示集群中可用的工作单元数量。HPCProfile
是 MATLAB® 作业调度器集群的配置文件。将 HPCProfile
配置文件替换为您自己的集群配置文件。
cluster = parcluster("HPCProfile"); maxNumWorkers = cluster.NumWorkers; fprintf("Number of workers available: %d",maxNumWorkers)
Number of workers available: 496
使用集群对象创建一个独立的作业。
job = createJob(cluster);
定义仿真参数
period = [0 2];
h = 0.001; % time step
t_interval = period(1):h:period(2);
设置迭代次数。
nReps = 10000000;
初始化随机数生成器并创建一个从 800 到 1200 之间的均匀分布中采样的阻尼系数数组。
rng(0); a = 800; b = 1200; d = (b-a).*rand(nReps,1) + a;
修改作业和任务工作流
要将 parfor
工作流更改为作业和任务工作流,请将 parfor
循环的主体转换为一个函数,该函数接受阻尼系数向量并返回质量弹簧运动的总和。
function y_sum = taskFcn(d)
定义工作单元的仿真参数。为了减少数据传输开销,请直接在工作单元上指定时间间隔和任何其他常量参量,而不是将它们作为输入参量传输给工作工作单元。
period = [0 2]; h = 0.001; y0 = [0 0.1]; t_interval = period(1):h:period(2);
初始化归约操作的结果变量。
y_sum = zeros(numel(t_interval),1);
为了减少调度开销,将迭代划分为每个任务的组,而不是为每个迭代调度一个任务。使用 for
循环遍历此任务的阻尼系数集。使用归约变量来计算每个时间点的运动总和。
for n = 1:length(d) f = @(t,y) massSpringODE(t,y,d(n)); [~,yOut] = ode45(f,t_interval,y0); y_sum = y_sum + yOut(:,1); end
当所有结果的总和很大时,或者客户端必须在作业运行中处理中间结果时,您可以使用作业的 ValueStore
。否则,如果您的结果数据很小,您可以使用任务的 OutputArgument
属性将结果发送回客户端。
end
为任务准备输入数据
为了帮助减少为一个作业安排多个任务时的开销,请将迭代按任务划分组。尝试将迭代划分为以下组:
足够大,以至于计算时间与调度分区的开销相比很大。
规模足够小,有足够的任务让所有工作单元忙碌。
减少最后几组任务的规模,以让尽可能多的工作单元忙碌起来。
附加到此示例的 partitionIterations
辅助函数使用迭代次数和所需的最大工作单元数量将迭代划分为适当大小的组,并返回一个元胞数组,其中每个元胞对应一组迭代索引。partitionIterations
函数将较大的组分配给初始任务,并逐步将较小的组分配给后续任务,以实现均衡的工作量分配。
taskGroups = partitionIterations(nReps,maxNumWorkers);
为每个任务指定迭代索引后,使用 cellfun
函数将每个任务组对应的阻尼系数提取到元胞数组中。
dampingCoeffs = cellfun(@(ind) {d(ind)},taskGroups,UniformOutput=false);
创建任务并提交作业
使用单个调用为该作业创建多个任务。每个任务使用来自 taskFcn
元胞数组的相应输入参量集执行 dampingCoeffs
函数。指示工作单元为每个任务返回一个输出参量。
tasks = createTask(job,@taskFcn,1,dampingCoeffs);
提交作业在集群上运行。
submit(job);
如果想要阻止 MATLAB 客户端直到作业完成,请在 wait
对象上使用 job
函数。当后续代码依赖于作业的完成时,wait
函数很有用。
wait(job);
访问结果
作业完成后,您可以使用 fetchOutputs
函数检索所有任务的结果。
results = fetchOutputs(job);
fetchOutputs
函数返回一个元胞数组,其中每个元素都是一个任务的输出。将元胞转换为数值数组并计算每行的总和和平均值。
y_sum = sum(cell2mat(results'),2); meanY = y_sum./nReps;
绘制系统随时间变化的平均响应。
plot(t_interval,meanY) title("ODE Solution of Mass-Spring System") xlabel("Time") ylabel("Motion") grid on
显示作业持续时间。
jobDuration = job.RunningDuration
jobDuration = duration
00:07:32
比较计算加速
将 parfor
工作流转换为作业和任务工作流的计算加速与在并行池和直接在集群上运行 parfor
循环的计算加速进行比较。
使用本示例附带的 timeExecution
辅助函数来测量 parfor
工作流在客户端、在具有 496 个工作单元的并行池上以及直接在具有 496 个可用工作单元的集群上的执行时间。将作业持续时间转换为秒数。
[serialTime,hpcPoolTime,hpcClusterTime] = timeExecution("HPCProfile",maxNumWorkers);
jobsAndTaskTime = double(seconds(jobDuration));
elapsedTimes = [serialTime hpcPoolTime hpcClusterTime jobsAndTaskTime];
计算计算加速并创建条形图比较每个工作流的加速。该图显示,使用作业和任务工作流与在并行池上运行 parfor
循环具有相似的加速,并且比直接在集群上运行 parfor
循环具有更大的加速。
speedUp = elapsedTimes(1)./elapsedTimes; x = ["parfor Client","parfor Pool","parfor Cluster","Jobs and Tasks"]; bar(x,speedUp); xlabel("Workflow") ylabel("Computational Speedup")
辅助函数
该辅助函数代表求解器使用的质量弹簧系统的 ODE。您可以将描述弹簧质量系统 (eq1) 的微分方程重写为可以使用 ode45
求解器进行求解的一阶 ODE 系统 (eq2)。
(eq1)
(eq2)
function dy = massSpringODE(t,y0,d) k = 9000; % spring stiffness (N/m) m = 450; % mass (kg) dy = zeros(2,1); dy(1) = y0(2); dy(2) = -(d*y0(2)+k*y0(1))/m; end
参考资料
[1] Breitenecker, Felix, Gerhard Höfinger, Thorsten Pawletta, Sven Pawletta, and Rene Fink."ARGESIM Benchmark on Parallel and Distributed Simulation."Simulation News Europe SNE 17, no. 1 (2007):53-56.
[2] Jammer, David, Peter Junglas, and Sven Pawletta.“Solving ARGESIM Benchmark CP2 ’Parallel and Distributed Simulation’ with Open MPI/GSL and Matlab PCT - Monte Carlo and PDE Case Studies.”SNE Simulation Notes Europe 32, no. 4 (December 2022):211–20. https://doi.org/10.11128/sne.32.bncp2.10625.