将 tall 数组部署到启用 Spark 的 Hadoop 集群
支持的平台:仅限 Linux®。
此示例说明如何将包含 tall 数组的 MATLAB® 应用程序部署到启用 Spark™ Hadoop® 集群。
目标:从给定的数据集计算航空公司的平均到达延误和最大到达延误。
| 数据集: | airlinesmall.csv |
| 描述: | 1987 年至 2008 年航空公司出发和到达信息。 |
| 位置: | 要下载 setupExample("matlab/AddKeysValuesExample", pwd)airlinesmall.csv 文件一同自动下载的 AddKeysValuesExample.mlx 实时脚本文件。 |
注意
您可以按照相同的说明将包含 tall 数组的 Spark 应用程序部署到 CLOUDERA® CDH。要查看 MATLAB Answers™ 上的示例,请点击此处。
要使用 CLOUDERA CDH 加密区域,请将 JAR 文件 commons-codec-1.9.jar 添加到 MATLAB Runtime的静态类路径。文件位置:$HADOOP_PREFIX/lib/commons-codec-1.9.jar,其中 $HADOOP_PREFIX 是 Hadoop 的安装位置。
注意
如果您使用的是 Spark 1.6 或更高版本,则需要将 Java® 中的 MATLAB 堆大小增加到至少 512MB。有关如何在 Java 中增加 MATLAB 堆大小的信息,请参阅Java 堆内存设置。
前提条件
通过创建一个 MATLAB 搜索路径可见的新工作文件夹来开始此示例。
将 MATLAB Runtime 安装在 Hadoop 集群中每个工作进程节点均可访问的文件夹中。此示例使用
/usr/local/MATLAB/MATLAB_Runtime/作为 MATLAB Runtime 文件夹的位置。<ver>如果您没有 MATLAB Runtime,您可以从以下网站下载:https:
https://www.mathworks.com/products/compiler/mcr。注意
替换对 MATLAB Runtime 版本的所有引用
在此示例中,MATLAB Runtime 版本号与您的 MATLAB 版本相对应。有关与 MATLAB 版本对应的 MATLAB Runtime 版本号的信息,请参阅此列表。<ver>将文件
airlinesmall.csv复制到 Hadoop 分布式文件系统 (HDFS™) 文件夹/user/<username>/datasets。这里<username>指的是您在 HDFS 中的用户名。$ ./hadoop fs -copyFromLocal airlinesmall.csv hdfs://host:54310/user/<username>/datasets
过程
设置环境变量
HADOOP_PREFIX以指向您的 Hadoop 安装文件夹。这些属性对于向 Hadoop 集群提交作业是必需的。setenv('HADOOP_PREFIX','/usr/lib/hadoop')
使用 MATLAB
datastore函数指向 HDFS 上的数据时,必须设置HADOOP_PREFIX环境变量。设置此环境变量与 Spark 无关。有关详细信息,请参阅Spark 和 Hadoop 之间的关系。如果您计划使用本地计算机上的数据集而不是 HDFS 上的数据集,那么您可以跳过此步骤。
注意
此示例使用
/usr/lib/hadoop作为安装 Hadoop 目录。您的 Hadoop 安装目录可能不同。指定 Spark 属性。
使用
containers.Map对象指定 Spark 属性。sparkProperties = containers.Map( ... {'spark.executor.cores', ... 'spark.executor.memory', ... 'spark.yarn.executor.memoryOverhead', ... 'spark.dynamicAllocation.enabled', ... 'spark.shuffle.service.enabled', ... 'spark.eventLog.enabled', ... 'spark.eventLog.dir'}, ... {'1', ... '2g', ... '1024', ... 'true', ... 'true', ... 'true', ... 'hdfs://host:54310/user/<username>/log'});
有关 Spark 属性的详细信息,请展开
SparkConf类的 输入参量 部分中'SparkProperties'名称-值对的prop值。SparkConf类是 Spark 的 MATLAB API 的一部分,它提供了将 MATLAB 应用程序部署到 Spark 另一种方法。有关详细信息,请参阅使用 MATLAB API for Spark 部署应用程序。使用 Spark 参数配置包含 tall 数组的 MATLAB 应用程序。
使用
matlab.mapreduce.DeploySparkMapReducer类来配置您的 MATLAB 应用程序,其中包含以 Spark 参数作为键-值对组的 tall 数组。conf = matlab.mapreduce.DeploySparkMapReducer( ... 'AppName','myTallApp', ... 'Master','yarn-client', ... 'SparkProperties',sparkProperties);
有关详细信息,请参阅
matlab.mapreduce.DeploySparkMapReducer。定义 Spark 执行环境。
使用
mapreducer函数定义 Spark 执行环境。mapreducer(conf)
有关详细信息,请参阅
mapreducer。包括包含 tall 数组的 MATLAB 应用程序代码。
使用 MATLAB 函数
datastore创建指向 HDFS 中的文件airlinesmall.csvdatastore对象。将datastore对象作为输入参量传递给tall函数。这将创建一个 tall 数组。您可以对高大数组执行操作来计算平均到达延迟和最大到达延迟。% Create a |datastore| for a collection of tabular text files representing airline data. % Select the variables of interest, specify a categorical data type for the % |Origin| and |Dest| variables. % ds = datastore('airlinesmall.csv') % if using a dataset on your local machine ds = datastore('hdfs:///<username>/datasets/airlinesmall.csv'); ds.TreatAsMissing = 'NA'; ds.SelectedVariableNames = {'Year','Month','ArrDelay','DepDelay','Origin','Dest'}; ds.SelectedFormats(5:6) = {'%C','%C'}; % Create Tall Array % Tall arrays are like normal MATLAB arrays, except that they can have any % number of rows. When a |tall| array is backed by a |datastore|, the underlying class of % the tall array is based on the type of datastore. tt = tall(ds); % Remove Rows with Missing Data or NaN Values idx = any(ismissing(tt),2); tt(idx,:) = []; % Compute Mean Delay meanArrivalDelay = mean(tt.DepDelay,'omitnan'); biggestDelays = topkrows(tt,10,'ArrDelay'); % Gather Results % The |gather| function forces evaluation of all queued operations and % brings the resulting output back into memory. [meanArrivalDelay,biggestDelays] = gather(meanArrivalDelay,biggestDelays) % Delete mapreducer object delete(conf);
创建 Spark 应用程序。
使用带有
-vCW选项的mcc命令来创建使用 Spark 3.x 的应用程序。>> mcc -vCW 'Spark:myTallApp,3' deployTallArrayToSpark.m创建以下文件。
文件 描述 run_myTallApp.sh运行应用程序的 Shell 脚本。该脚本调用 spark-submit在集群上启动应用程序。myTallApp.jar应用程序 JAR。应用程序 JAR 包含打包的 MATLAB 代码和其他依赖项。 readme.txt自述文件包含有关如何运行该应用程序的详细信息。 requiredMCRProducts.txtmccExcludedFiles.log有关详细信息,请参阅
mcc。使用以下命令从 Linux shell 运行该应用程序:
$ ./run_myTallApp.sh /usr/local/MATLAB/MATLAB_Runtime/v##
/usr/local/MATLAB/MATLAB_Runtime/是一个指示 MATLAB Runtime 位置的参量。<ver>您将会看到以下输出:
meanArrivalDelay = 7.1201 biggestDelays = 10x5 table Year Month ArrDelay Origin Dest ____ _____ ________ ______ ____ 1995 11 1014 HNL LAX 2007 4 914 JFK DTW 2001 4 887 MCO DTW 2008 7 845 CMH ORD 1988 3 772 ORD LEX 2008 4 710 EWR RDU 1998 10 679 MCI DFW 2006 6 603 ABQ PHX 2008 6 586 PIT LGA 2007 4 568 RNO SLC
或者,如果您想分析或查看应用程序在 MATLAB 中生成的结果,则需要使用 tall 数组的 write 函数将结果写入 HDFS 上的文件。然后您可以使用 datastore 函数读取该文件。
要将结果写入 HDFS 上的文件,请在 MATLAB 应用程序中的 delete(conf) 语句之前添加以下代码行,然后打包应用程序:
write('hdfs:///user/<username>/results', tall(biggestDelays));将 <username> 替换为您的用户名。
对于 tall 数组,您只能使用 write 函数将一个变量保存到文件中。因此,如果您想保存多个变量,则需要写入多个文件。
要在针对启用 Spark 集群执行应用程序后在 MATLAB 中查看结果,请使用 datastore 函数,如下所示:
>> ds = datastore('hdfs:///user/<username>/results')
>> readall(ds)如果您无法使用 datastore 函数查看结果,则可能需要使用函数 setenv 设置环境变量 HADOOP_PREFIX。
注意
如果正在部署的 tall 数组应用程序是 MATLAB 函数而不是 MATLAB 脚本,请使用以下执行语法:
$ ./run_<applicationName>.sh \ <MATLAB_Runtime_Location> \ [Spark arguments] \ [Application arguments]
$ ./run_myTallApp.sh \ /usr/local/MATLAB/MATLAB_Runtime/v92 \ yarn-client \ hdfs://host:54310/user/<username>/datasets/airlinesmall.csv \ hdfs://host:54310/user/<username>/result
代码:
