Use Tall Arrays on a Spark Cluster
This example shows how to modify a MATLAB® example of creating a tall table to run on a Spark™ cluster or a Spark enabled Hadoop® cluster. You can use this tall table to create tall arrays and calculate statistical properties. You can develop code locally and then scale up to take advantage of the capabilities offered by Parallel Computing Toolbox™ and MATLAB Parallel Server™, without having to rewrite your algorithm. See also Big Data Workflow Using Tall Arrays and Datastores, Configure for Spark Clusters (MATLAB Parallel Server), and Configure a Hadoop Cluster (MATLAB Parallel Server).
Connect to Spark Cluster Using Cluster Profile
Since R2024a
Create a parallel.cluster.Spark object from a Spark cluster profile, and use the mapreducer function to set the Spark cluster as the execution environment.
sparkCluster = parcluster("SparkProfile")
mr = mapreducer(sparkCluster)
To learn how to create a profile for your Spark cluster, see Client Configuration (MATLAB Parallel Server).
Manually Connect to Spark Cluster and Spark Enabled Hadoop Cluster
You can also connect to your Spark cluster without a cluster profile. First, you must set environment variables and cluster properties as appropriate for your specific Spark cluster configuration. Ask your system administrator for the values of these and any other properties necessary for submitting jobs to your cluster.
Manually Create Cluster Object for Spark Cluster
Create a cluster object to connect to a Spark cluster from a MATLAB client.
Create the cluster object by specifying the Spark installation location on your machine. Use the mapreducer function to set the Spark cluster as the execution environment.
cluster = parallel.cluster.Spark(SparkInstallFolder="/path/to/spark/install");
% Optionally, if you want to control the exact number of workers:
cluster.SparkProperties('spark.executor.instances') = '16';
mapreducer(cluster);
Manually Create Cluster Object for Spark Enabled Hadoop Cluster
Create a cluster object to connect to a Spark enabled Hadoop cluster from a MATLAB client.
Use environment variables to specify the Hadoop cluster installation location and the Spark installation location on your machine. Create the cluster object and set the Spark enabled Hadoop cluster as the execution environment.
setenv('HADOOP_HOME', '/path/to/hadoop/install');
setenv('SPARK_HOME', '/path/to/spark/install');
cluster = parallel.cluster.Hadoop;
% Optionally, if you want to control the exact number of workers:
cluster.SparkProperties('spark.executor.instances') = '16';
mapreducer(cluster);
Note
In the setup step, you use mapreducer to set the cluster execution environment. In the next step, you create a tall array. If you modify or delete the cluster execution environment after creating a tall array, then the tall array is invalid and you must recreate it.
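As a sketch of the recovery step (using hypothetical ds and tt variable names, matching the example later in this page), recreate the tall array after any change to the execution environment:

```
mapreducer(cluster);  % changing the execution environment ...
tt = tall(ds);        % ... invalidates existing tall arrays, so recreate them
```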
Note
If you want to develop in serial and not use local workers, enter the following command.
mapreducer(0);
Create and Use Tall Tables
You can now execute parallel MATLAB code on your Spark cluster instead of on your local machine.
These instructions show how to create and use tall tables on a Spark enabled Hadoop cluster, although this procedure can be used for any Spark cluster.
Create a datastore that points to a tabular file of airline flight data. Clean the data by treating 'NA' values as missing data so that the datastore function replaces them with NaN values.
ds = datastore('airlinesmall.csv');
varnames = {'ArrDelay', 'DepDelay'};
ds.SelectedVariableNames = varnames;
ds.TreatAsMissing = 'NA';
Create a tall table tt from the datastore. MATLAB automatically starts a Spark job to run subsequent calculations on the tall table.
tt = tall(ds)
Starting a Spark job on the Hadoop cluster. This may take a few minutes
while cluster resources are allocated ...
Connected to the Spark job.

tt =

  M×2 tall table

    ArrDelay    DepDelay
    ________    ________

        8          12
        8           1
       21          20
       13          12
        4          -1
       59          63
        3          -2
       11          -1
        :           :
        :           :
The display indicates that the number of rows, M, is not yet known. M is a placeholder until the calculation completes.
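One way to resolve the placeholder is to gather a size calculation on the tall table (a minimal sketch; this triggers a pass over the data on the cluster, so you may want to batch it with other deferred operations):

```
% Evaluate the deferred size calculation to resolve M
numRows = gather(size(tt, 1));
```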
Extract the arrival delay ArrDelay from the tall table. This action creates a new tall array variable to use in subsequent calculations.
a = tt.ArrDelay;
You can specify a series of operations on your tall array, which are not executed until you call the gather function. Doing so allows you to batch up commands that might take a long time. As an example, calculate the mean and standard deviation of the arrival delay. Use these values to construct the upper and lower thresholds for delays that are within 1 standard deviation of the mean.
m = mean(a,'omitnan');
s = std(a,'omitnan');
one_sigma_bounds = [m-s m m+s];
Use gather to calculate one_sigma_bounds, and bring the answer into memory.
sig1 = gather(one_sigma_bounds)
Evaluating tall expression using the Spark Cluster:
- Pass 1 of 1: Completed in 0.95 sec
Evaluation completed in 1.3 sec

sig1 =

  -23.4572    7.1201   37.6975
You can specify multiple inputs and outputs to gather if you want to evaluate several things at once. Doing so is faster than calling gather separately on each tall array. For example, calculate the minimum and maximum arrival delay.
[max_delay, min_delay] = gather(max(a),min(a))
max_delay =

        1014

min_delay =

   -64
Note
These examples take longer to complete the first time you run them if MATLAB is still starting on the cluster workers.
When you use tall arrays on a Spark cluster, compute resources from the cluster are reserved for the lifetime of the mapreducer execution environment. To release these resources, you must delete the mapreducer:
delete(gcmr);
mapreducer(0);
See Also
gather
| tall
| datastore
| table
| mapreducer
| parallel.cluster.Hadoop
| parallel.cluster.Spark
Related Examples
- Big Data Workflow Using Tall Arrays and Datastores
- Use Tall Arrays on a Parallel Pool
- Configure for Spark Clusters (MATLAB Parallel Server)
- Configure a Hadoop Cluster (MATLAB Parallel Server)
- Tall Arrays for Out-of-Memory Data