Main Content

本页采用了机器翻译。点击此处可查看最新英文版本。

Spark 集群上使用 Tall 数组

此示例显示如何修改创建 tall 表的 MATLAB® 示例以在 Spark™ 集群或启用 Spark 的 Hadoop® 集群上运行。您可以使用此 tall 表来创建 tall 数组并计算统计属性。您可以在本地开发代码,然后扩大规模,以利用 Parallel Computing Toolbox™ 和 MATLAB Parallel Server™ 提供的功能,而无需重写算法。另请参阅 使用 Tall 数组和数据存储的大数据工作流配置 Spark 集群 (MATLAB Parallel Server)配置 Hadoop 集群 (MATLAB Parallel Server)

使用集群配置文件连接到 Spark 集群

自 R2024a 起

从 Spark 集群配置文件中创建并使用 parallel.cluster.Spark 对象,并使用 mapreducer 将 Spark 集群设置为执行环境。

sparkCluster = parcluster("SparkProfile")
mr = mapreducer(sparkCluster)

要了解如何为您的 Spark 集群创建配置文件,请参阅 客户端配置 (MATLAB Parallel Server)

手动连接到 Spark 集群和 Spark 已启用 Hadoop 集群

您还可以在没有集群配置文件文件的情况下连接到您的 Spark 集群。首先,您必须根据特定的 Spark 集群配置设置适当的环境变量和集群属性。请咨询您的系统管理员以获取这些属性的值以及向集群提交作业所需的其他属性的值。

Spark 集群手动创建集群对象

创建一个集群对象,以从 MATLAB 客户端连接到 Spark 集群。

通过指定机器上的 Spark 安装位置来创建集群对象。使用 mapreducer 函数将 Spark 集群设置为执行环境。

cluster = parallel.cluster.Spark(SparkInstallFolder="/path/to/spark/install");

% Optionally, if you want to control the exact number of workers:
cluster.SparkProperties('spark.executor.instances') = '16';

mapreducer(cluster);

为已启用 SparkHadoop 集群手动创建集群对象

创建一个集群对象,以从 MATLAB 客户端连接到启用了 Spark 的 Hadoop 集群。

使用环境变量指定您机器上的 Hadoop 集群安装位置和 Spark 安装位置。创建集群对象,并将 Spark 启用的 Hadoop 集群设置为执行环境。

setenv('HADOOP_HOME', '/path/to/hadoop/install')
setenv('SPARK_HOME', '/path/to/spark/install');
cluster = parallel.cluster.Hadoop;

% Optionally, if you want to control the exact number of workers:
cluster.SparkProperties('spark.executor.instances') = '16';

mapreducer(cluster);

注意

在设置步骤中,使用 mapreducer 来设置集群执行环境。在下一步中,您将创建一个 tall 数组。如果在创建 tall 数组后修改或删除集群执行环境,则 tall 数组无效,必须重新创建。

注意

如果您想要串行开发并且不使用本地工作进程,请输入以下命令。

mapreducer(0);

创建和使用 tall 表

您现在可以在 Spark 集群上而不是在本地机器上执行并行 MATLAB 代码。

这些说明展示了如何在启用了 Spark 的 Hadoop 集群上创建和使用 tall 表,尽管此过程可用于任何 Spark 集群。

创建指向航空航班数据表格文件的数据存储。通过将 'NA' 值视为缺失数据来清理数据,以便 datastore 函数用 NaN 值替换它们。

ds = datastore('airlinesmall.csv');
varnames = {'ArrDelay', 'DepDelay'};
ds.SelectedVariableNames = varnames;
ds.TreatAsMissing = "NA";

从数据存储中创建一个 tall 表 tt。MATLAB 自动启动 Spark 作业以对 tall 表运行后续计算。

tt = tall(ds)
Starting a Spark job on the Hadoop cluster. 
This may take a few minutes while cluster resources are allocated ...

Connected to the Spark job.

tt =

  M×2 tall table

    ArrDelay    DepDelay
    ________    ________

        8          12   
        8           1   
       21          20   
       13          12   
        4          -1   
       59          63   
        3          -2   
       11          -1   
       :           :
       :           :

显示内容表明行数 M 尚不清楚。M 是一个占位符,直到计算完成。

从 tall 表中提取到达延迟 ArrDelay。此操作将创建一个新的 tall 数组变量以供后续计算使用。

a = tt.ArrDelay;

您可以对 tall 数组指定一系列操作,这些操作直到调用 gather 函数才会执行。这样做可以让您批处理可能需要很长时间的命令。作为示例,计算到达延迟的平均值和标准差。使用这些值来构建平均值 1 个标准差以内的延迟的上限和下限阈值。

m = mean(a,'omitnan');
s = std(a,'omitnan');
one_sigma_bounds = [m-s m m+s];

使用 gather 计算 one_sigma_bounds,并将答案记入记忆。

sig1 = gather(one_sigma_bounds)
Evaluating tall expression using the Spark Cluster:
- Pass 1 of 1: Completed in 0.95 sec
Evaluation completed in 1.3 sec

sig1 =

  -23.4572    7.1201   37.6975

如果您想同时评估几件事,您可以为 gather 指定多个输入和输出。这样做比在每个 tall 数组上单独调用 gather 要快。例如,计算最小和最大到达延迟。

[max_delay, min_delay] = gather(max(a),min(a))
max_delay =

        1014

min_delay =

   -64

注意

如果 MATLAB 在集群工作进程上启动,这些示例第一次需要更多时间才能完成。

在 Spark 集群上使用 tall 数组时,集群中的计算资源将在 mapreducer 执行环境的整个生命周期内保留。要清除这些资源,您必须删除 mapreducer:

delete(gcmr);
或者,您可以更改为不同的执行环境,例如:
mapreducer(0);

另请参阅

| | | | | |

相关示例

详细信息