主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

在并行中对数据存储进行分区

对数据存储进行并行分区,将数据存储的一部分放在并行池中的每个工作单元上,这在许多情况下可以带来好处:

  • 仅对整个数据存储的一部分执行某些操作,或者同时对几个定义的部分执行某些操作。

  • 在数据存储中搜索特定值,所有工作单元同时在自己的分区上操作。

  • 对所有分区中的工作单元执行减少计算。

并行从数据存储读取数据

此示例显示如何使用 partition函数并行从数据存储中读取数据。它使用 MATLAB® 中提供的小型航空公司数据存储,并从其 'ArrDelay' 列中找到非 NaN 值的平均值。

串行执行

计算平均值的一个简单方法是将所有非 NaN 值的总和除以非 NaN 值的数量。sumAndCountArrivalDelay 辅助函数中的代码首先以非并行方式为数据存储执行此操作。

首先,删除任何现有的并行池。

delete(gcp('nocreate'));

airlinesmall_subset.xlsx 中的工作表集合中创建数据存储并选择要导入的 ArrDelay 变量。

使用函数 sumAndCountArrivalDelay 来计算平均值,无需任何并行执行。使用 tictoc 函数来计时执行,在这里以及后面的并行情况下。

ds = spreadsheetDatastore(repmat({'airlinesmall_subset.xlsx'},20,1));
ds.SelectedVariableNames = 'ArrDelay';
reset(ds);
tic
  [total,count] = sumAndCountArrivalDelay(ds)
total = 3098060
count = 394940
sumtime = toc
sumtime = 36.6618
mean = total/count
mean = 7.8444

并行执行

partition 函数允许您将数据存储分区为更小的部分,每个部分都表示为一个数据存储本身。这些较小的数据存储彼此完全独立地工作,以便您可以在并行语言功能(例如 parfor 循环和 spmd 代码块)中使用它们。

使用自动分区

您可以使用 numpartitions函数指定分区数,该数基于数据存储本身和并行池大小。这并不一定等于池中的工作单元数量。将循环迭代次数设置为分区数 (N)。

以下代码在本地集群上启动并行池,然后在工作单元之间对数据存储进行分区,以便对循环进行迭代。此代码调用辅助函数 parforSumAndCountArrivalDelay,其中包括一个 parfor 循环,用于在并行循环迭代中汇总计数和总和。

p = parpool('Processes',4);
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 4 workers.
reset(ds);
tic
  [total,count] = parforSumAndCountArrivalDelay(ds)
total = 3098060
count = 394940
parfortime = toc
parfortime = 11.6383
mean = total/count
mean = 7.8444

指定分区数

您可以明确设置该值,而不是让软件计算分区数,以便可以适当地对数据进行分区以适合您的算法。例如,要并行化 spmd 代码块内的数据,您可以将工作单元 (spmdSize) 的数量指定为要使用的分区数量。spmdSumAndCountArrivalDelay 辅助函数使用 spmd 代码块执行并行读取,并明确将分区数设置为等于工作单元数。

reset(ds);
tic
[total,count] = spmdSumAndCountArrivalDelay(ds)
total = 3098060
count = 394940
spmdtime = toc
spmdtime = 11.7520
mean = total/count
mean = 7.8444

完成计算后,您可以删除当前并行池。

delete(p);

辅助函数

创建一个辅助函数,以非并行的方式汇总计数和总和。

function [total,count] = sumAndCountArrivalDelay(ds)
    total = 0;
    count = 0;
    while hasdata(ds)
        data = read(ds);
        total = total + sum(data.ArrDelay,1,'OmitNaN');
        count = count + sum(~isnan(data.ArrDelay));
    end
end

创建一个辅助函数,使用 parfor 并行计算计数和并行。

function [total, count] = parforSumAndCountArrivalDelay(ds)
    N = numpartitions(ds,gcp);
    total = 0;
    count = 0;    
    parfor ii = 1:N
        % Get partition ii of the datastore.
        subds = partition(ds,N,ii);
    
        [localTotal,localCount] = sumAndCountArrivalDelay(subds);
        total = total + localTotal;
        count = count + localCount;
    end
end

创建一个辅助函数,使用 spmd 并行计算计数和并行。

function [total,count] = spmdSumAndCountArrivalDelay(ds)
    spmd
        subds = partition(ds,spmdSize,spmdIndex);
        [total,count] = sumAndCountArrivalDelay(subds);    
    end
    total = sum([total{:}]);
    count = sum([count{:}]);
end

另请参阅

|

主题