使用 MATLAB API for Spark 将应用程序部署到 Spark
支持的平台:仅限 Linux®。
此示例向您说明如何使用 MATLAB® API for Spark™ 将独立应用程序部署到 Spark。您可以使用两个受支持的集群管理器之一在 Spark 上部署您的应用程序:本地和 Hadoop® YARN。此示例向您说明如何使用两个集群管理器部署应用程序。有关集群管理器的讨论,请参阅Spark 支持的集群管理器。
目标:计算给定数据集中唯一航空公司的数量。
| 数据集: | airlinesmall.csv |
| 描述: | 1987 年至 2008 年航空公司出发和到达信息。 |
| 位置: | 要下载 setupExample("matlab/AddKeysValuesExample", pwd)airlinesmall.csv 文件一同自动下载的 AddKeysValuesExample.mlx 实时脚本文件。 |
辅助函数
使用以下代码创建一个名为 carrierToCount.m 的 MATLAB 文件:
function results = carrierToCount(input) tbl = input{1}; intermKeys = tbl.UniqueCarrier; [intermKeys, ~, idx] = unique(intermKeys); intermValues = num2cell(accumarray(idx, ones(size(idx)))); results = cellfun( @(x,y) {x,y} , ... intermKeys, intermValues, ... 'UniformOutput',false);
注意
如果您使用的是 Spark 1.6 或更高版本,则需要将 Java® 中的 MATLAB 堆大小增加到至少 512MB。有关如何在 Java 中增加 MATLAB 堆大小的信息,请参阅Java 堆内存设置。
本地
本地集群管理器代表一个伪 Spark 启用集群,在单台计算机上以非分布式模式工作。它可以配置为使用一个工作进程线程,或者在多核计算机上配置多个工作进程线程。在应用中,它用单词 local 表示。在启用 Spark Hadoop 集群上进行全面部署之前,本地集群管理器可方便地调试您的应用程序。
前提条件
通过创建一个 MATLAB 搜索路径可见的新工作文件夹来开始此示例。
创建上面提到的辅助函数
carrierToCount.m。
过程
指定 Spark 属性。
使用
containers.Map对象指定 Spark 属性。sparkProp = containers.Map(... {'spark.executor.cores',... 'spark.matlab.worker.debug'},... {'1',... 'true'});
Spark 属性指示正在部署的应用程序的 Spark 执行环境。每个应用程序都必须配置特定的 Spark 属性才能部署。
有关 Spark 属性的详细信息,请展开
SparkConf类的 输入参量 部分中'SparkProperties'名称-值对的prop值。创建一个
SparkConf对象。使用类
matlab.compiler.mlspark.SparkConf创建SparkConf对象。SparkConf对象存储部署到 Spark 的应用程序的配置参数。应用程序的配置参数通过 SparkContext 传递到 Spark 集群。conf = matlab.compiler.mlspark.SparkConf(... 'AppName', 'mySparkAppDepLocal', ... 'Master', 'local[1]', ... 'SparkProperties', sparkProp );
有关 SparkConf 的详细信息,请参阅
matlab.compiler.mlspark.SparkConf。创建一个
SparkContext对象。使用类
matlab.compiler.mlspark.SparkContext并以SparkConf对象作为输入来创建SparkContext对象。sc = matlab.compiler.mlspark.SparkContext(conf);
SparkContext对象通过初始化与 Spark 集群的连接充当 Spark 的入口点。它接受SparkConf对象作为输入参量,并使用该对象中指定的参数来设置与 Spark 执行环境建立连接所需的内部服务。有关 SparkContext 的详细信息,请参阅
matlab.compiler.mlspark.SparkContext。从数据创建一个
RDD对象。使用 MATLAB 函数
datastore创建指向文件airlinesmall.csvdatastore对象。然后使用 SparkContext 方法datastoreToRDD将datastore对象转换为 SparkRDD对象。% Create a MATLAB datastore (LOCAL) ds = datastore('airlinesmall.csv',... 'TreatAsMissing','NA', ... 'SelectedVariableNames','UniqueCarrier'); % Convert MATLAB datastore to Spark RDD rdd = sc.datastoreToRDD(ds);
一般来说,可以使用
SparkContext类的以下方法创建输入 RDD:parallelize、datastoreToRDD和textFile。对
RDD对象执行运算。使用 Spark RDD 方法(例如
flatMap将函数应用于RDD对象的所有元素并展平结果。先前创建的函数carrierToCount是将要应用于 RDD 元素的函数。函数carrierToCount的函数句柄作为输入参量传递给flatMap方法。maprdd = rdd.flatMap(@carrierToCount); redrdd = maprdd.reduceByKey( @(acc,value) acc+value ); countdata = redrdd.collect(); % Count and display carrier occurrences count = 0; for i=1:numel(countdata) count = count + countdata{i}{2}; fprintf('\nCarrier Name: %s, Count: %d', countdata{i}{1}, countdata{i}{2}); end fprintf('\n Total count : %d\n', count); % Delete Spark Context delete(sc)
一般来说,您将向 Spark RDD 方法(称为转换和操作)提供 MATLAB 函数句柄或匿名函数作为输入参量。这些函数句柄和匿名函数在已部署应用程序的工作进程上执行。
有关受支持的 RDD 转换和操作的列表,请参阅变换和操作在
RDD类的方法部分。有关转换和操作的详细信息,请参阅Apache Spark 基础知识。
创建一个独立应用程序。
使用带有
-m标志的mcc命令来创建独立应用程序。-m标志创建一个可以从命令行运行的标准可执行文件。-a标志包含来自文件夹airlinesmall.csv的依赖数据集<matlabroot>/toolbox/matlab/demos。只要位于同一个工作文件夹中,mcc命令就会自动选取依赖文件carrierToCount.m。>> mcc -m deployToSparkMlApiLocal.m -a <matlabroot>/toolbox/matlab/demos/airlinesmall.csv
mcc命令创建一个 shell 脚本run_deployToSparkMlApiLocal.sh来运行可执行文件deployToSparkMlApiLocal。有关详细信息,请参阅
mcc。使用以下命令从 Linux shell 运行独立应用程序:
$ ./run_deployToSparkMlApiLocal.sh /share/MATLAB/MATLAB_Runtime/R2025a
/share/MATLAB/MATLAB_Runtime/R2025a是一个指示 MATLAB Runtime 位置的参量。在执行上述命令之前,请确保
javaclasspath.txt文件与 shell 脚本和可执行文件位于同一文件夹中。如果您的应用程序找不到文件
javaclasspath.txt,它将无法执行。如果包含 Hadoop 配置文件文件夹位置的可选行被取消注释,您的应用程序也可能无法执行。要在
local集群管理器上执行您的应用程序,必须注释掉此行。仅当您计划使用yarn-client作为集群管理器在启用 Spark Hadoop 集群上运行应用程序时,才应取消注释此行。您将会看到以下输出:
Carrier Name: 9E, Count: 521 Carrier Name: AA, Count: 14930 Carrier Name: AQ, Count: 154 Carrier Name: AS, Count: 2910 Carrier Name: B6, Count: 806 Carrier Name: CO, Count: 8138 ... ... ... Carrier Name: US, Count: 13997 Carrier Name: WN, Count: 15931 Carrier Name: XE, Count: 2357 Carrier Name: YV, Count: 849 Total count : 123523
代码:
Hadoop YARN
Yarn-client 集群管理器代表启用 Spark 的 Hadoop 集群。Hadoop 2.0 中引入了 YARN 集群管理器。它通常安装在与 HDFS™ 相同的节点上。因此,在 YARN 上运行 Spark 可以让 Spark 轻松访问 HDFS 数据。在应用程序中,它使用单词 yarn-client 来表示。
由于使用 yarn-client 作为集群管理器部署应用程序的步骤与使用上面显示的本地集群管理器类似,因此对这些步骤的讨论很少。有关每个步骤的详细讨论,请查看本地上述情况。
注意
您可以按照相同的说明将使用 MATLAB API for Spark 创建的 Spark 应用程序部署到 CLOUDERA® CDH。要查看 MATLAB Answers™ 上的示例,请点击此处。
要使用 CLOUDERA CDH 加密区域,请将 JAR 文件 commons-codec-1.9.jar 添加到 MATLAB Runtime的静态类路径。文件位置:$HADOOP_PREFIX/lib/commons-codec-1.9.jar,其中 $HADOOP_PREFIX 是 Hadoop 的安装位置。
前提条件
通过创建一个 MATLAB 搜索路径可见的新工作文件夹来开始此示例。
将 MATLAB Runtime 安装在 Hadoop 集群中每个工作进程节点均可访问的文件夹中。此示例使用
/share/MATLAB/MATLAB_Runtime/R2025a作为 MATLAB Runtime 文件夹的位置。如果您没有 MATLAB Runtime,您可以从以下网站下载:https:
https://www.mathworks.com/products/compiler/mcr。将
airlinesmall.csv复制到 Hadoop 分布式文件系统 (HDFS) 文件夹/user/<username>/datasets。这里,<username>指的是您在 HDFS 中的用户名。$ ./hadoop fs -copyFromLocal airlinesmall.csv hdfs://host:54310/user/<username>/datasets
过程
设置环境变量
HADOOP_PREFIX以指向您的 Hadoop 安装文件夹。这些属性对于向 Hadoop 集群提交作业是必需的。setenv('HADOOP_PREFIX','/share/hadoop/hadoop-2.6.0')
使用 MATLAB
datastore函数指向 HDFS 上的数据时,必须设置HADOOP_PREFIX环境变量。设置此环境变量与 Spark 无关。有关详细信息,请参阅Spark 和 Hadoop 之间的关系。指定 Spark 属性。
使用
containers.Map对象指定 Spark 属性。sparkProperties = containers.Map( ... {'spark.executor.cores',... 'spark.executor.memory',... 'spark.yarn.executor.memoryOverhead',... 'spark.dynamicAllocation.enabled',... 'spark.shuffle.service.enabled',... 'spark.eventLog.enabled',... 'spark.eventLog.dir'}, ... {'1',... '2g',... '1024',... 'true',... 'true',... 'true',... 'hdfs://hadoop01glnxa64:54310/user/<username>/sparkdeploy'});
有关 Spark 属性的详细信息,请展开
SparkConf类的 输入参量 部分中'SparkProperties'名称-值对的prop值。创建一个
SparkConf对象。使用类
matlab.compiler.mlspark.SparkConf创建SparkConf对象。conf = matlab.compiler.mlspark.SparkConf( ... 'AppName','myApp', ... 'Master','yarn-client', ... 'SparkProperties',sparkProperties);
有关 SparkConf 的详细信息,请参阅
matlab.compiler.mlspark.SparkConf。创建一个
SparkContext对象。使用类
matlab.compiler.mlspark.SparkContext并以SparkConf对象作为输入来创建SparkContext对象。sc = matlab.compiler.mlspark.SparkContext(conf);
有关 SparkContext 的详细信息,请参阅
matlab.compiler.mlspark.SparkContext。从数据创建一个
RDD对象。使用 MATLAB 函数
datastore创建指向 HDFS 中的文件airlinesmall.csvdatastore对象。然后使用 SparkContext 方法datastoreToRDD将datastore对象转换为 SparkRDD对象。% Create a MATLAB datastore (HADOOP) ds = datastore(... 'hdfs:///user/<username>/datasets/airlinesmall.csv',... 'TreatAsMissing','NA',... 'SelectedVariableNames','UniqueCarrier'); % Convert MATLAB datastore to Spark RDD rdd = sc.datastoreToRDD(ds);
一般来说,可以使用
SparkContext类的以下方法创建输入 RDD:parallelize、datastoreToRDD和textFile。对
RDD对象执行运算。使用 Spark RDD 方法(例如
flatMap将函数应用于RDD对象的所有元素并展平结果。先前创建的函数carrierToCount是将要应用于 RDD 元素的函数。函数carrierToCount的函数句柄作为输入参量传递给flatMap方法。maprdd = rdd.flatMap(@carrierToCount); redrdd = maprdd.reduceByKey( @(acc,value) acc+value ); countdata = redrdd.collect(); % Count and display carrier occurrences count = 0; for i=1:numel(countdata) count = count + countdata{i}{2}; fprintf('\nCarrier Code: %s, Count: %d', countdata{i}{1}, countdata{i}{2}); end fprintf('\n Total count : %d\n', count); % Save results to MAT file save('countdata.mat','countdata'); % Delete Spark Context delete(sc);
有关受支持的 RDD 转换和操作的列表,请参阅变换和操作在
RDD类的方法部分。有关转换和操作的详细信息,请参阅Apache Spark 基础知识。
创建一个独立应用程序。
使用带有
-m标志的mcc命令来创建独立应用程序。-m标志创建一个可以从命令行运行的独立应用程序。您不需要附加数据集airlinesmall.csv因为它位于 HDFS 上。只要位于同一个工作文件夹中,mcc命令就会自动选取依赖文件carrierToCount.m。>> mcc -m deployToSparkMlApiHadoop.m
mcc命令创建一个 shell 脚本run_deployToSparkMlApiHadoop.sh来运行可执行文件deployToSparkMlApiHadoop。有关详细信息,请参阅
mcc。使用以下命令从 Linux shell 运行独立应用程序:
$ ./run_deployToSparkMlApiHadoop.sh /share/MATLAB/MATLAB_Runtime/R2025a
/share/MATLAB/MATLAB_Runtime/R2025a是一个指示 MATLAB Runtime 位置的参量。在执行上述命令之前,请确保
javaclasspath.txt文件与 shell 脚本和可执行文件位于同一文件夹中。如果您的应用程序找不到文件
javaclasspath.txt,它将无法执行。如果包含 Hadoop 配置文件文件夹位置的可选行被注释,您的应用程序也可能无法执行。要在 yarn-client 集群管理器上执行您的应用程序,必须取消注释此行。仅当您计划使用本地集群管理器运行应用程序时,才应注释此行。
您将会看到以下输出:
Carrier Name: 9E, Count: 521 Carrier Name: AA, Count: 14930 Carrier Name: AQ, Count: 154 Carrier Name: AS, Count: 2910 Carrier Name: B6, Count: 806 Carrier Name: CO, Count: 8138 ... ... ... Carrier Name: US, Count: 13997 Carrier Name: WN, Count: 15931 Carrier Name: XE, Count: 2357 Carrier Name: YV, Count: 849 Total count : 123523
注意
如果正在部署的应用程序是 MATLAB 函数而不是 MATLAB 脚本,请使用以下执行语法:
例如:$ ./run_<applicationName>.sh \ <MATLAB_Runtime_Location> \ [Spark arguments] \ [Application arguments]
$ ./run_deployToSparkMlApiHadoop.sh.sh \ /usr/local/MATLAB/MATLAB_Runtime/R2025a \ yarn-client \ hdfs://host:54310/user/<username>/datasets/airlinesmall.csv \ hdfs://host:54310/user/<username>/result
代码:
