主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

使用 MATLAB API for Spark 将应用程序部署到 Spark

支持的平台:仅限 Linux®

此示例向您说明如何使用 MATLAB® API for Spark™ 将独立应用程序部署到 Spark。您可以使用两个受支持的集群管理器之一在 Spark 上部署您的应用程序:本地和 Hadoop® YARN。此示例向您说明如何使用两个集群管理器部署应用程序。有关集群管理器的讨论,请参阅Spark 支持的集群管理器

目标:计算给定数据集中唯一航空公司的数量。

数据集:airlinesmall.csv
描述:

1987 年至 2008 年航空公司出发和到达信息。

位置:

要下载 airlinesmall.csv 文件,在 MATLAB 命令提示符下输入:

setupExample("matlab/AddKeysValuesExample", pwd)
请忽略与 airlinesmall.csv 文件一同自动下载的 AddKeysValuesExample.mlx 实时脚本文件。

辅助函数

使用以下代码创建一个名为 carrierToCount.m 的 MATLAB 文件:

function results = carrierToCount(input)
    tbl = input{1};
    intermKeys = tbl.UniqueCarrier;
    [intermKeys, ~, idx] = unique(intermKeys);
    intermValues = num2cell(accumarray(idx, ones(size(idx))));
    results = cellfun( @(x,y) {x,y} , ...
        intermKeys, intermValues, ...
        'UniformOutput',false);
该辅助函数作为函数句柄传递给示例中的某个方法。

注意

如果您使用的是 Spark 1.6 或更高版本,则需要将 Java® 中的 MATLAB 堆大小增加到至少 512MB。有关如何在 Java 中增加 MATLAB 堆大小的信息,请参阅Java 堆内存设置

本地

本地集群管理器代表一个伪 Spark 启用集群,在单台计算机上以非分布式模式工作。它可以配置为使用一个工作进程线程,或者在多核计算机上配置多个工作进程线程。在应用中,它用单词 local 表示。在启用 Spark Hadoop 集群上进行全面部署之前,本地集群管理器可方便地调试您的应用程序。

前提条件

  1. 通过创建一个 MATLAB 搜索路径可见的新工作文件夹来开始此示例。

  2. 创建上面提到的辅助函数 carrierToCount.m

过程

  1. 指定 Spark 属性。

    使用 containers.Map 对象指定 Spark 属性。

    sparkProp = containers.Map(...
        {'spark.executor.cores',...
        'spark.matlab.worker.debug'},...
        {'1',...
        'true'});

    Spark 属性指示正在部署的应用程序的 Spark 执行环境。每个应用程序都必须配置特定的 Spark 属性才能部署。

    有关 Spark 属性的详细信息,请展开 SparkConf 类的 输入参量 部分中 'SparkProperties' 名称-值对的 prop 值。

  2. 创建一个 SparkConf 对象。

    使用类 matlab.compiler.mlspark.SparkConf 创建 SparkConf 对象。SparkConf 对象存储部署到 Spark 的应用程序的配置参数。应用程序的配置参数通过 SparkContext 传递到 Spark 集群。

    conf = matlab.compiler.mlspark.SparkConf(...
        'AppName', 'mySparkAppDepLocal', ...
        'Master', 'local[1]', ...
        'SparkProperties', sparkProp );

    有关 SparkConf 的详细信息,请参阅matlab.compiler.mlspark.SparkConf

  3. 创建一个 SparkContext 对象。

    使用类 matlab.compiler.mlspark.SparkContext 并以 SparkConf 对象作为输入来创建 SparkContext 对象。

    sc = matlab.compiler.mlspark.SparkContext(conf);

    SparkContext 对象通过初始化与 Spark 集群的连接充当 Spark 的入口点。它接受 SparkConf 对象作为输入参量,并使用该对象中指定的参数来设置与 Spark 执行环境建立连接所需的内部服务。

    有关 SparkContext 的详细信息,请参阅matlab.compiler.mlspark.SparkContext

  4. 从数据创建一个 RDD 对象。

    使用 MATLAB 函数 datastore 创建指向文件 airlinesmall.csvdatastore 对象。然后使用 SparkContext 方法 datastoreToRDDdatastore 对象转换为 Spark RDD 对象。

    % Create a MATLAB datastore (LOCAL)
    ds = datastore('airlinesmall.csv',...
        'TreatAsMissing','NA', ...
        'SelectedVariableNames','UniqueCarrier');
    % Convert MATLAB datastore to Spark RDD
    rdd = sc.datastoreToRDD(ds);

    一般来说,可以使用 SparkContext 类的以下方法创建输入 RDD:parallelizedatastoreToRDDtextFile

  5. RDD 对象执行运算。

    使用 Spark RDD 方法(例如 flatMap 将函数应用于 RDD 对象的所有元素并展平结果。先前创建的函数 carrierToCount 是将要应用于 RDD 元素的函数。函数 carrierToCount 的函数句柄作为输入参量传递给 flatMap 方法。

    maprdd = rdd.flatMap(@carrierToCount);
    redrdd = maprdd.reduceByKey( @(acc,value) acc+value );
    countdata = redrdd.collect();
    
    % Count and display carrier occurrences
    count = 0;
    for i=1:numel(countdata)
        count = count + countdata{i}{2};
        fprintf('\nCarrier Name: %s, Count: %d', countdata{i}{1}, countdata{i}{2});
    end
    fprintf('\n Total count : %d\n', count);
    
    % Delete Spark Context
    delete(sc)

    一般来说,您将向 Spark RDD 方法(称为转换操作)提供 MATLAB 函数句柄或匿名函数作为输入参量。这些函数句柄和匿名函数在已部署应用程序的工作进程上执行。

    有关受支持的 RDD 转换和操作的列表,请参阅变换操作RDD 类的方法部分。

    有关转换和操作的详细信息,请参阅Apache Spark 基础知识

  6. 创建一个独立应用程序。

    使用带有 -m 标志的 mcc 命令来创建独立应用程序。-m 标志创建一个可以从命令行运行的标准可执行文件。-a 标志包含来自文件夹 airlinesmall.csv 的依赖数据集 <matlabroot>/toolbox/matlab/demos。只要位于同一个工作文件夹中,mcc 命令就会自动选取依赖文件 carrierToCount.m

    >> mcc -m deployToSparkMlApiLocal.m -a <matlabroot>/toolbox/matlab/demos/airlinesmall.csv

    mcc 命令创建一个 shell 脚本 run_deployToSparkMlApiLocal.sh 来运行可执行文件 deployToSparkMlApiLocal

    有关详细信息,请参阅 mcc

  7. 使用以下命令从 Linux shell 运行独立应用程序:

    $ ./run_deployToSparkMlApiLocal.sh /share/MATLAB/MATLAB_Runtime/R2025a

    /share/MATLAB/MATLAB_Runtime/R2025a 是一个指示 MATLAB Runtime 位置的参量。

    在执行上述命令之前,请确保 javaclasspath.txt 文件与 shell 脚本和可执行文件位于同一文件夹中。

    如果您的应用程序找不到文件 javaclasspath.txt,它将无法执行。

    如果包含 Hadoop 配置文件文件夹位置的可选行被取消注释,您的应用程序也可能无法执行。要在 local 集群管理器上执行您的应用程序,必须注释掉此行。仅当您计划使用 yarn-client 作为集群管理器在启用 Spark Hadoop 集群上运行应用程序时,才应取消注释此行。

  8. 您将会看到以下输出:

    Carrier Name: 9E, Count: 521
    Carrier Name: AA, Count: 14930
    Carrier Name: AQ, Count: 154
    Carrier Name: AS, Count: 2910
    Carrier Name: B6, Count: 806
    Carrier Name: CO, Count: 8138
    ...
    ...
    ...
    Carrier Name: US, Count: 13997
    Carrier Name: WN, Count: 15931
    Carrier Name: XE, Count: 2357
    Carrier Name: YV, Count: 849
     Total count : 123523

代码:

 deployToSparkMlApiLocal.m

Hadoop YARN

Yarn-client 集群管理器代表启用 Spark 的 Hadoop 集群。Hadoop 2.0 中引入了 YARN 集群管理器。它通常安装在与 HDFS™ 相同的节点上。因此,在 YARN 上运行 Spark 可以让 Spark 轻松访问 HDFS 数据。在应用程序中,它使用单词 yarn-client 来表示。

由于使用 yarn-client 作为集群管理器部署应用程序的步骤与使用上面显示的本地集群管理器类似,因此对这些步骤的讨论很少。有关每个步骤的详细讨论,请查看本地上述情况。

注意

您可以按照相同的说明将使用 MATLAB API for Spark 创建的 Spark 应用程序部署到 CLOUDERA® CDH。要查看 MATLAB Answers™ 上的示例,请点击此处

要使用 CLOUDERA CDH 加密区域,请将 JAR 文件 commons-codec-1.9.jar 添加到 MATLAB Runtime的静态类路径。文件位置:$HADOOP_PREFIX/lib/commons-codec-1.9.jar,其中 $HADOOP_PREFIX 是 Hadoop 的安装位置。

前提条件

  1. 通过创建一个 MATLAB 搜索路径可见的新工作文件夹来开始此示例。

  2. MATLAB Runtime 安装在 Hadoop 集群中每个工作进程节点均可访问的文件夹中。此示例使用 /share/MATLAB/MATLAB_Runtime/R2025a 作为 MATLAB Runtime 文件夹的位置。

    如果您没有 MATLAB Runtime,您可以从以下网站下载:https: https://www.mathworks.com/products/compiler/mcr

  3. airlinesmall.csv 复制到 Hadoop 分布式文件系统 (HDFS) 文件夹 /user/<username>/datasets。这里,<username> 指的是您在 HDFS 中的用户名。

    $ ./hadoop fs -copyFromLocal airlinesmall.csv hdfs://host:54310/user/<username>/datasets

过程

  1. 设置环境变量 HADOOP_PREFIX 以指向您的 Hadoop 安装文件夹。这些属性对于向 Hadoop 集群提交作业是必需的。

    setenv('HADOOP_PREFIX','/share/hadoop/hadoop-2.6.0')

    使用 MATLABdatastore 函数指向 HDFS 上的数据时,必须设置 HADOOP_PREFIX 环境变量。设置此环境变量与 Spark 无关。有关详细信息,请参阅Spark 和 Hadoop 之间的关系

  2. 指定 Spark 属性。

    使用 containers.Map 对象指定 Spark 属性。

    sparkProperties = containers.Map( ...
        {'spark.executor.cores',...
        'spark.executor.memory',...
        'spark.yarn.executor.memoryOverhead',...
        'spark.dynamicAllocation.enabled',...
        'spark.shuffle.service.enabled',...
        'spark.eventLog.enabled',...
        'spark.eventLog.dir'}, ...
        {'1',...
        '2g',...
        '1024',...
        'true',...
        'true',...
        'true',...
        'hdfs://hadoop01glnxa64:54310/user/<username>/sparkdeploy'});

    有关 Spark 属性的详细信息,请展开 SparkConf 类的 输入参量 部分中 'SparkProperties' 名称-值对的 prop 值。

  3. 创建一个 SparkConf 对象。

    使用类 matlab.compiler.mlspark.SparkConf 创建 SparkConf 对象。

    conf = matlab.compiler.mlspark.SparkConf( ...
        'AppName','myApp', ...
        'Master','yarn-client', ...
        'SparkProperties',sparkProperties);

    有关 SparkConf 的详细信息,请参阅matlab.compiler.mlspark.SparkConf

  4. 创建一个 SparkContext 对象。

    使用类 matlab.compiler.mlspark.SparkContext 并以 SparkConf 对象作为输入来创建 SparkContext 对象。

    sc = matlab.compiler.mlspark.SparkContext(conf);

    有关 SparkContext 的详细信息,请参阅matlab.compiler.mlspark.SparkContext

  5. 从数据创建一个 RDD 对象。

    使用 MATLAB 函数 datastore 创建指向 HDFS 中的文件 airlinesmall.csvdatastore 对象。然后使用 SparkContext 方法 datastoreToRDDdatastore 对象转换为 Spark RDD 对象。

    % Create a MATLAB datastore (HADOOP)
    ds = datastore(...
        'hdfs:///user/<username>/datasets/airlinesmall.csv',...
        'TreatAsMissing','NA',...
        'SelectedVariableNames','UniqueCarrier');
    
    % Convert MATLAB datastore to Spark RDD
    rdd = sc.datastoreToRDD(ds);

    一般来说,可以使用 SparkContext 类的以下方法创建输入 RDD:parallelizedatastoreToRDDtextFile

  6. RDD 对象执行运算。

    使用 Spark RDD 方法(例如 flatMap 将函数应用于 RDD 对象的所有元素并展平结果。先前创建的函数 carrierToCount 是将要应用于 RDD 元素的函数。函数 carrierToCount 的函数句柄作为输入参量传递给 flatMap 方法。

    maprdd = rdd.flatMap(@carrierToCount);
    redrdd = maprdd.reduceByKey( @(acc,value) acc+value );
    countdata = redrdd.collect();
    
    % Count and display carrier occurrences
    count = 0;
    for i=1:numel(countdata)
        count = count + countdata{i}{2};
        fprintf('\nCarrier Code: %s, Count: %d', countdata{i}{1}, countdata{i}{2});
    end
    fprintf('\n Total count : %d\n', count);
    
    % Save results to MAT file
    save('countdata.mat','countdata');
    
    % Delete Spark Context
    delete(sc);

    有关受支持的 RDD 转换和操作的列表,请参阅变换操作RDD 类的方法部分。

    有关转换和操作的详细信息,请参阅Apache Spark 基础知识

  7. 创建一个独立应用程序。

    使用带有 -m 标志的 mcc 命令来创建独立应用程序。-m 标志创建一个可以从命令行运行的独立应用程序。您不需要附加数据集 airlinesmall.csv 因为它位于 HDFS 上。只要位于同一个工作文件夹中,mcc 命令就会自动选取依赖文件 carrierToCount.m

    >> mcc -m deployToSparkMlApiHadoop.m

    mcc 命令创建一个 shell 脚本 run_deployToSparkMlApiHadoop.sh 来运行可执行文件 deployToSparkMlApiHadoop

    有关详细信息,请参阅 mcc

  8. 使用以下命令从 Linux shell 运行独立应用程序:

    $ ./run_deployToSparkMlApiHadoop.sh /share/MATLAB/MATLAB_Runtime/R2025a

    /share/MATLAB/MATLAB_Runtime/R2025a 是一个指示 MATLAB Runtime 位置的参量。

    在执行上述命令之前,请确保 javaclasspath.txt 文件与 shell 脚本和可执行文件位于同一文件夹中。

    如果您的应用程序找不到文件 javaclasspath.txt,它将无法执行。

    如果包含 Hadoop 配置文件文件夹位置的可选行被注释,您的应用程序也可能无法执行。要在 yarn-client 集群管理器上执行您的应用程序,必须取消注释此行。仅当您计划使用本地集群管理器运行应用程序时,才应注释此行。

  9. 您将会看到以下输出:

    Carrier Name: 9E, Count: 521
    Carrier Name: AA, Count: 14930
    Carrier Name: AQ, Count: 154
    Carrier Name: AS, Count: 2910
    Carrier Name: B6, Count: 806
    Carrier Name: CO, Count: 8138
    ...
    ...
    ...
    Carrier Name: US, Count: 13997
    Carrier Name: WN, Count: 15931
    Carrier Name: XE, Count: 2357
    Carrier Name: YV, Count: 849
     Total count : 123523

    注意

    如果正在部署的应用程序是 MATLAB 函数而不是 MATLAB 脚本,请使用以下执行语法:

    $ ./run_<applicationName>.sh \
      <MATLAB_Runtime_Location> \
      [Spark arguments] \
      [Application arguments]
    例如:
    $ ./run_deployToSparkMlApiHadoop.sh.sh \
       /usr/local/MATLAB/MATLAB_Runtime/R2025a \
       yarn-client \
       hdfs://host:54310/user/<username>/datasets/airlinesmall.csv \
       hdfs://host:54310/user/<username>/result

代码:

 deployToSparkMlApiHadoop.m