主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

将 tall 数组部署到启用 SparkHadoop 集群

支持的平台:仅限 Linux®

此示例说明如何将包含 tall 数组的 MATLAB® 应用程序部署到启用 Spark™ Hadoop® 集群。

目标:从给定的数据集计算航空公司的平均到达延误和最大到达延误。

数据集:airlinesmall.csv
描述:

1987 年至 2008 年航空公司出发和到达信息。

位置:

要下载 airlinesmall.csv 文件,在 MATLAB 命令提示符下输入:

setupExample("matlab/AddKeysValuesExample", pwd)
请忽略与 airlinesmall.csv 文件一同自动下载的 AddKeysValuesExample.mlx 实时脚本文件。

注意

您可以按照相同的说明将包含 tall 数组的 Spark 应用程序部署到 CLOUDERA® CDH。要查看 MATLAB Answers™ 上的示例,请点击此处

要使用 CLOUDERA CDH 加密区域,请将 JAR 文件 commons-codec-1.9.jar 添加到 MATLAB Runtime的静态类路径。文件位置:$HADOOP_PREFIX/lib/commons-codec-1.9.jar,其中 $HADOOP_PREFIX 是 Hadoop 的安装位置。

注意

如果您使用的是 Spark 1.6 或更高版本,则需要将 Java® 中的 MATLAB 堆大小增加到至少 512MB。有关如何在 Java 中增加 MATLAB 堆大小的信息,请参阅Java 堆内存设置

前提条件

  1. 通过创建一个 MATLAB 搜索路径可见的新工作文件夹来开始此示例。

  2. MATLAB Runtime 安装在 Hadoop 集群中每个工作进程节点均可访问的文件夹中。此示例使用 /usr/local/MATLAB/MATLAB_Runtime/<ver> 作为 MATLAB Runtime 文件夹的位置。

    如果您没有 MATLAB Runtime,您可以从以下网站下载:https: https://www.mathworks.com/products/compiler/mcr

    注意

    替换对 MATLAB Runtime 版本的所有引用 <ver> 在此示例中,MATLAB Runtime 版本号与您的 MATLAB 版本相对应。有关与 MATLAB 版本对应的 MATLAB Runtime 版本号的信息,请参阅此列表

  3. 将文件 airlinesmall.csv 复制到 Hadoop 分布式文件系统 (HDFS™) 文件夹 /user/<username>/datasets。这里 <username> 指的是您在 HDFS 中的用户名。

    $ ./hadoop fs -copyFromLocal airlinesmall.csv hdfs://host:54310/user/<username>/datasets

过程

  1. 设置环境变量 HADOOP_PREFIX 以指向您的 Hadoop 安装文件夹。这些属性对于向 Hadoop 集群提交作业是必需的。

    setenv('HADOOP_PREFIX','/usr/lib/hadoop')

    使用 MATLABdatastore 函数指向 HDFS 上的数据时,必须设置 HADOOP_PREFIX 环境变量。设置此环境变量与 Spark 无关。有关详细信息,请参阅Spark 和 Hadoop 之间的关系

    如果您计划使用本地计算机上的数据集而不是 HDFS 上的数据集,那么您可以跳过此步骤。

    注意

    此示例使用 /usr/lib/hadoop 作为安装 Hadoop 目录。您的 Hadoop 安装目录可能不同。

  2. 指定 Spark 属性。

    使用 containers.Map 对象指定 Spark 属性。

    sparkProperties = containers.Map( ...
     {'spark.executor.cores', ...
     'spark.executor.memory', ...
     'spark.yarn.executor.memoryOverhead', ...
     'spark.dynamicAllocation.enabled', ...
     'spark.shuffle.service.enabled', ...
     'spark.eventLog.enabled', ...
     'spark.eventLog.dir'}, ...
     {'1', ...
      '2g', ...
      '1024', ...
      'true', ...
      'true', ...
      'true', ...
      'hdfs://host:54310/user/<username>/log'});

    有关 Spark 属性的详细信息,请展开 SparkConf 类的 输入参量 部分中 'SparkProperties' 名称-值对的 prop 值。SparkConf 类是 Spark 的 MATLAB API 的一部分,它提供了将 MATLAB 应用程序部署到 Spark 另一种方法。有关详细信息,请参阅使用 MATLAB API for Spark 部署应用程序

  3. 使用 Spark 参数配置包含 tall 数组的 MATLAB 应用程序。

    使用 matlab.mapreduce.DeploySparkMapReducer 类来配置您的 MATLAB 应用程序,其中包含以 Spark 参数作为键-值对组的 tall 数组。

    conf = matlab.mapreduce.DeploySparkMapReducer( ...
          'AppName','myTallApp', ...
          'Master','yarn-client', ...
          'SparkProperties',sparkProperties);

    有关详细信息,请参阅 matlab.mapreduce.DeploySparkMapReducer

  4. 定义 Spark 执行环境。

    使用 mapreducer 函数定义 Spark 执行环境。

    mapreducer(conf)

    有关详细信息,请参阅 mapreducer

  5. 包括包含 tall 数组的 MATLAB 应用程序代码。

    使用 MATLAB 函数 datastore 创建指向 HDFS 中的文件 airlinesmall.csvdatastore 对象。将 datastore 对象作为输入参量传递给 tall 函数。这将创建一个 tall 数组。您可以对高大数组执行操作来计算平均到达延迟和最大到达延迟。

    % Create a |datastore| for a collection of tabular text files representing airline data. 
    % Select the variables of interest, specify a categorical data type for the 
    % |Origin| and |Dest| variables.
    % ds = datastore('airlinesmall.csv') % if using a dataset on your local machine
    ds = datastore('hdfs:///<username>/datasets/airlinesmall.csv');
    ds.TreatAsMissing = 'NA';
    ds.SelectedVariableNames = {'Year','Month','ArrDelay','DepDelay','Origin','Dest'};
    ds.SelectedFormats(5:6) = {'%C','%C'};
    
    % Create Tall Array
    % Tall arrays are like normal MATLAB arrays, except that they can have any
    % number of rows. When a |tall| array is backed by a |datastore|, the underlying class of
    % the tall array is based on the type of datastore.
    tt = tall(ds);
    
    % Remove Rows with Missing Data or NaN Values
    idx = any(ismissing(tt),2); 
    tt(idx,:) = [];
    
    % Compute Mean Delay
    meanArrivalDelay = mean(tt.DepDelay,'omitnan');
    biggestDelays = topkrows(tt,10,'ArrDelay');
    
    % Gather Results
    % The |gather| function forces evaluation of all queued operations and
    % brings the resulting output back into memory. 
    [meanArrivalDelay,biggestDelays] = gather(meanArrivalDelay,biggestDelays)
    
    % Delete mapreducer object
    delete(conf);
  6. 创建 Spark 应用程序。

    使用带有 -vCW 选项的 mcc 命令来创建使用 Spark 3.x 的应用程序。

    >> mcc -vCW 'Spark:myTallApp,3' deployTallArrayToSpark.m

    创建以下文件。

    文件描述
    run_myTallApp.sh运行应用程序的 Shell 脚本。该脚本调用 spark-submit 在集群上启动应用程序。
    myTallApp.jar应用程序 JAR。应用程序 JAR 包含打包的 MATLAB 代码和其他依赖项。
    readme.txt自述文件包含有关如何运行该应用程序的详细信息。
    requiredMCRProducts.txt 
    mccExcludedFiles.log 

    有关详细信息,请参阅 mcc

  7. 使用以下命令从 Linux shell 运行该应用程序:

    $ ./run_myTallApp.sh /usr/local/MATLAB/MATLAB_Runtime/v##

    /usr/local/MATLAB/MATLAB_Runtime/<ver> 是一个指示 MATLAB Runtime 位置的参量。

  8. 您将会看到以下输出:

    meanArrivalDelay =
        7.1201
    
    biggestDelays =
    
      10x5 table
    
        Year    Month    ArrDelay    Origin    Dest
        ____    _____    ________    ______    ____
    
        1995    11       1014        HNL       LAX 
        2007     4        914        JFK       DTW 
        2001     4        887        MCO       DTW 
        2008     7        845        CMH       ORD 
        1988     3        772        ORD       LEX 
        2008     4        710        EWR       RDU 
        1998    10        679        MCI       DFW 
        2006     6        603        ABQ       PHX 
        2008     6        586        PIT       LGA 
        2007     4        568        RNO       SLC
    

或者,如果您想分析或查看应用程序在 MATLAB 中生成的结果,则需要使用 tall 数组的 write 函数将结果写入 HDFS 上的文件。然后您可以使用 datastore 函数读取该文件。

要将结果写入 HDFS 上的文件,请在 MATLAB 应用程序中的 delete(conf) 语句之前添加以下代码行,然后打包应用程序:

write('hdfs:///user/<username>/results', tall(biggestDelays));

<username> 替换为您的用户名。

对于 tall 数组,您只能使用 write 函数将一个变量保存到文件中。因此,如果您想保存多个变量,则需要写入多个文件。

要在针对启用 Spark 集群执行应用程序后在 MATLAB 中查看结果,请使用 datastore 函数,如下所示:

>> ds = datastore('hdfs:///user/<username>/results')
>> readall(ds)

如果您无法使用 datastore 函数查看结果,则可能需要使用函数 setenv 设置环境变量 HADOOP_PREFIX

注意

如果正在部署的 tall 数组应用程序是 MATLAB 函数而不是 MATLAB 脚本,请使用以下执行语法:

$ ./run_<applicationName>.sh \
  <MATLAB_Runtime_Location> \
  [Spark arguments] \
  [Application arguments]
例如:
$ ./run_myTallApp.sh \
   /usr/local/MATLAB/MATLAB_Runtime/v92 \
   yarn-client \
   hdfs://host:54310/user/<username>/datasets/airlinesmall.csv \
   hdfs://host:54310/user/<username>/result

代码:

 deployTallArrayToSpark.m