为受支持的调度器编写通信作业
调度器和条件
您可以使用任何类型的调度器运行通信作业。通信作业还包括 parpool、spmd 和 parfor。这些说明展示了如何编写通信作业以在本地进程集群、MATLAB® 作业调度器集群或任何与 MATLAB Parallel Server™ 接口交互的第三方调度器集群上运行。对于所有不同的调度器类型来说,基本作业编程顺序都是相同的。
编写任务函数代码
这个示例展示了编程通信作业的基本原理。在这个示例中,spmdIndex 值为 1 的工作单元创建了一个幻方矩阵,该幻方矩阵由行数和列数组成,其数量等于运行该作业的工作单元数量 (spmdSize)。在这种情况下,四个工作单元使用 4×4 幻方矩阵运行通信作业。第一个工作单元使用 spmdBroadcast 函数将矩阵广播给所有其他工作单元,每个工作单元计算矩阵一列的总和。所有这些列和与 spmdPlus 函数相结合,计算出原始幻方矩阵元素的总和。
本示例的函数如下所示。
function total_sum = colsum if spmdIndex == 1 % Send magic square to other workers A = spmdBroadcast(1,magic(spmdSize)); else % Receive broadcast on other workers A = spmdBroadcast(1); end % Calculate sum of column identified by spdmIndex for this worker column_sum = sum(A(:,spmdIndex)); % Calculate total sum by combining column sum from all workers total_sum = spmdPlus(column_sum);
将该函数保存为名为 colsum.m 的文件,保存在 MATLAB 客户端的路径下。该软件使用作业的 AttachedFiles 属性将文件发送给每个工作单元。
虽然此示例中有一个工作单元创建了幻方并将其广播给其他工作单元,但是还有其他方法可以将数据传送给工作单元。每个工作单元都可以为自己创建矩阵。或者,每个工作单元可以从磁盘上的文件读取其部分数据,可以将数据作为参量传递给任务函数,或者可以将数据发送到作业的 AttachedFiles 属性中包含的文件中。选择的解决方案取决于您的网络配置和数据性质。
客户端中的代码
与独立作业一样,您可以选择一个配置文件,并使用 parcluster 函数在您的 MATLAB 客户端中创建一个集群对象。根据您使用的调度器,配置文件会略有不同,但使用配置文件定义尽可能多的属性可最大限度地减少调度器类型之间的编码差异。
您可以使用以下代码创建并配置集群对象:
c = parcluster('MyProfile')其中 'MyProfile' 是您正在使用的调度器类型的集群配置文件的名称。各种集群选项所需的任何差异均在配置文件中控制。对于每种类型的调度器,您可以有一个或多个单独的配置文件。有关完整详细信息,请参阅 发现集群并使用集群配置文件。根据系统管理员的指示创建或修改配置文件。
当您的集群对象被定义时,您可以使用 createCommunicatingJob 函数创建作业对象。创建作业时,必须将作业 Type 属性设置为 'SPMD'。
cjob = createCommunicatingJob(c,'Type','SPMD');
函数文件 colsum.m(在 编写任务函数代码 中创建)位于 MATLAB 客户端路径上,但必须向工作单元提供。其中一种方法是使用作业的 AttachedFiles 属性,可以在您使用的配置文件中进行设置,或者在命令行窗口中输入以下代码:
cjob.AttachedFiles = {'colsum.m'}您还可以设置作业的其他属性,例如,设置要使用的工作单元数量。再次强调,配置文件可能在您的特定情况下很有用,特别是当您的大多数作业需要许多相同的属性设置时。要在四个工作单元上运行此示例,您可以在配置文件中建立它,或者通过以下客户端代码建立它:
cjob.NumWorkersRange = 4
您可以使用常用的 createTask 函数创建该作业的一项任务。在这个示例中,任务只从每个工作单元返回一个参量,并且没有输入参量到 colsum 函数。
t = createTask(cjob, @colsum, 1, {})使用 submit 运行该作业。
submit(cjob)
让 MATLAB 客户端等待作业完成后再收集结果。结果由每个工作单元的一个值组成。任务中的 spmdPlus 函数在工作单元之间共享数据,以便每个工作单元都有相同的结果。
wait(cjob)
results = fetchOutputs(cjob)
results =
[136]
[136]
[136]
[136]