Main Content

matlab.io.datastore.Partitionable 类

命名空间: matlab.io.datastore

为数据存储添加并行支持

描述

matlab.io.datastore.Partitionable 是一个抽象的 mixin 类,可为您的自定义数据存储添加并行支持,以便用于 Parallel Computing Toolbox™ 和 MATLAB® Parallel Server™

要使用此 mixin 类,除了从 matlab.io.Datastore 基类继承之外,还必须从 matlab.io.datastore.Partitionable 类继承。键入以下语法作为类定义文件的第一行:

classdef MyDatastore < matlab.io.Datastore & ...
                       matlab.io.datastore.Partitionable
    ...
end

要为自定义数据存储添加并行处理支持,您还必须:

有关创建支持并行处理的自定义数据存储的详细信息和步骤,请参阅Develop Custom Datastore

方法

maxpartitions 可能的最大分区数
numpartitions默认分区数
partition 划分数据存储

属性

Sealedfalse

有关类属性的信息,请参阅类属性

示例

全部折叠

构建支持并行处理的数据存储,并使用它将您的自定义或专有数据导入 MATLAB®。然后,在并行池中处理数据。

创建一个 .m 类定义文件,其中包含实现自定义数据存储的代码。您必须将此文件保存在工作文件夹或 MATLAB® 路径上的文件夹中。.m 文件的名称必须与对象构造函数的名称相同。例如,如果您希望构造函数的名称为 MyDatastorePar,则 .m 文件的名称必须为 MyDatastorePar.m.m 类定义文件必须包含以下步骤:

  • 步骤 1:从数据存储类继承。

  • 步骤 2:定义构造函数和必需的方法。

  • 步骤 3:定义您的自定义文件读取函数。

除这些步骤之外,还需要定义处理和分析数据所需的所有其他属性或方法。

%% STEP 1: INHERIT FROM DATASTORE CLASSES
classdef MyDatastorePar < matlab.io.Datastore & ...
        matlab.io.datastore.Partitionable
   
    properties(Access = private)
        CurrentFileIndex double
        FileSet matlab.io.datastore.DsFileSet
    end
    
    % Property to support saving, loading, and processing of
    % datastore on different file system machines or clusters.
    % In addition, define the methods get.AlternateFileSystemRoots()
    % and set.AlternateFileSystemRoots() in the methods section. 
    properties(Dependent)
        AlternateFileSystemRoots
    end
    
%% STEP 2: DEFINE THE CONSTRUCTOR AND THE REQUIRED METHODS
    methods
        % Define your datastore constructor
        function myds = MyDatastorePar(location,altRoots)
            myds.FileSet = matlab.io.datastore.DsFileSet(location,...
                'FileExtensions','.bin', ...
                'FileSplitSize',8*1024);
            myds.CurrentFileIndex = 1;
             
            if nargin == 2
                 myds.AlternateFileSystemRoots = altRoots;
            end
            
            reset(myds);
        end
        
        % Define the hasdata method
        function tf = hasdata(myds)
            % Return true if more data is available
            tf = hasfile(myds.FileSet);
        end
        
        % Define the read method
        function [data,info] = read(myds)
            % Read data and information about the extracted data
            % See also: MyFileReader()
            if ~hasdata(myds)
                msgII = ['Use the reset method to reset the datastore ',... 
                         'to the start of the data.']; 
                msgIII = ['Before calling the read method, ',...
                          'check if data is available to read ',...
                          'by using the hasdata method.'];
                error('No more data to read.\n%s\n%s',msgII,msgIII);
            end
            
            fileInfoTbl = nextfile(myds.FileSet);
            data = MyFileReader(fileInfoTbl);
            info.Size = size(data);
            info.FileName = fileInfoTbl.FileName;
            info.Offset = fileInfoTbl.Offset;
            
            % Update CurrentFileIndex for tracking progress
            if fileInfoTbl.Offset + fileInfoTbl.SplitSize >= ...
                    fileInfoTbl.FileSize
                myds.CurrentFileIndex = myds.CurrentFileIndex + 1 ;
            end
        end
        
        % Define the reset method
        function reset(myds)
            % Reset to the start of the data
            reset(myds.FileSet);
            myds.CurrentFileIndex = 1;
        end

        % Define the partition method
        function subds = partition(myds,n,ii)
            subds = copy(myds);
            subds.FileSet = partition(myds.FileSet,n,ii);
            reset(subds);
        end
        
        % Getter for AlternateFileSystemRoots property
        function altRoots = get.AlternateFileSystemRoots(myds)
            altRoots = myds.FileSet.AlternateFileSystemRoots;
        end

        % Setter for AlternateFileSystemRoots property
        function set.AlternateFileSystemRoots(myds,altRoots)
            try
              % The DsFileSet object manages AlternateFileSystemRoots
              % for your datastore
              myds.FileSet.AlternateFileSystemRoots = altRoots;

              % Reset the datastore
              reset(myds);  
            catch ME
              throw(ME);
            end
        end
      
    end
    
    methods (Hidden = true)          
        % Define the progress method
        function frac = progress(myds)
            % Determine percentage of data read from datastore
            if hasdata(myds) 
               frac = (myds.CurrentFileIndex-1)/...
                             myds.FileSet.NumFiles; 
            else 
               frac = 1;  
            end 
        end
    end
    
    methods(Access = protected)
        % If you use the  FileSet property in the datastore,
        % then you must define the copyElement method. The
        % copyElement method allows methods such as readall
        % and preview to remain stateless 
        function dscopy = copyElement(ds)
            dscopy = copyElement@matlab.mixin.Copyable(ds);
            dscopy.FileSet = copy(ds.FileSet);
        end
        
        % Define the maxpartitions method
        function n = maxpartitions(myds)
            n = maxpartitions(myds.FileSet);
        end
    end
end

%% STEP 3: IMPLEMENT YOUR CUSTOM FILE READING FUNCTION
function data = MyFileReader(fileInfoTbl)
% create a reader object using FileName
reader = matlab.io.datastore.DsFileReader(fileInfoTbl.FileName);

% seek to the offset
seek(reader,fileInfoTbl.Offset,'Origin','start-of-file');

% read fileInfoTbl.SplitSize amount of data
data = read(reader,fileInfoTbl.SplitSize);

end

您的自定义数据存储现在已准备就绪。使用您的自定义数据存储来读取并处理并行池中的数据。

使用自定义数据存储预览您的专有数据并将数据读取到 MATLAB 中进行并行处理。

此示例使用简单的数据集来说明使用自定义数据存储的工作流。该数据集是由 15 个二进制 (.bin) 文件构成的集合,其中每个文件包含一列(1 个变量)和 10000 行无符号整数(10000 条记录)。

dir('*.bin')
binary_data01.bin  binary_data02.bin  binary_data03.bin  binary_data04.bin  binary_data05.bin  binary_data06.bin  binary_data07.bin  binary_data08.bin  binary_data09.bin  binary_data10.bin  binary_data11.bin  binary_data12.bin  binary_data13.bin  binary_data14.bin  binary_data15.bin  

使用 MyDatastorePar 函数创建一个数据存储对象。有关 MyDatastorePar 的实现详细信息,请参阅示例构建支持并行处理的数据存储

folder = fullfile('*.bin'); 
ds = MyDatastorePar(folder); 

预览数据存储中的数据。

preview(ds)
ans = 8x1 uint8 column vector

   113
   180
   251
    91
    29
    66
   254
   214

确定数据存储的分区数。如果您拥有 Parallel Computing Toolbox (PCT),则可以使用 n = numpartitions(ds,myPool),其中 myPoolgcpparpool

n = numpartitions(ds); 

将数据存储划分为 n 个部分,分配给并行池中的 n 个工作进程。

parfor ii = 1:n
    subds = partition(ds,n,ii);
      while hasdata(subds)
        data = read(subds);
        % do something
      end
end

要通过涉及不同平台云或集群计算机的并行和分布式计算来处理您的数据存储,必须预先定义 'AlternateFileSystemRoots' 参数。例如,在本地计算机上创建一个数据存储并分析一小部分数据。然后,使用 Parallel Computing Toolbox 和 MATLAB Parallel Server 将分析范围扩大到整个数据集。

使用 MyDatastorePar 创建数据存储并为 'AlternateFileSystemRoots' 属性赋值。有关 MyDatastorePar 的实现细节,请参阅示例Build Datastore with Parallel Processing Support

要设置 'AlternateFileSystemRoots' 属性的值,请确定您的数据在不同平台上的根路径。根路径因计算机或文件系统而不同。例如,如果您使用下面的根路径访问数据:

  • Windows® 计算机上的 "Z:\DataSet"

  • MATLAB Parallel Server Linux® 集群上的 "/nfs-bldg001/DataSet"

请使用 AlternateFileSystemRoots 属性关联这些根路径。

altRoots = ["Z:\DataSet","/nfs-bldg001/DataSet"];
ds = MyDatastorePar('Z:\DataSet',altRoots);

在本地计算机上分析一小部分数据。例如,获取数据的分区子集,并通过删除任何缺失的条目来清理数据。然后,检查变量的绘图。

tt = tall(partition(ds,100,1)); 
summary(tt); 
% analyze your data                        
tt = rmmissing(tt);               
plot(tt.MyVar1,tt.MyVar2)     

使用 MATLAB Parallel Server 集群(Linux 集群)将分析范围扩大到整个数据集。例如,使用集群配置文件启动工作进程池,然后使用并行和分布式计算功能对整个数据集进行分析。

parpool('MyMjsProfile') 
tt = tall(ds);          
summary(tt);
% analyze your data
tt = rmmissing(tt);               
plot(tt.MyVar1,tt.MyVar2)

提示

  • 对于自定义数据存储实现,最好不要实现 numpartitions 方法。

版本历史记录

在 R2017b 中推出