主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

parallel.pool.DataQueue

在客户端和工作单元之间发送和监听数据

    说明

    DataQueue 对象可以在执行计算时并行自动处理并行池中工作单元和客户端之间发送的数据或消息。例如,您可以将中间值发送到客户端并自动计算计算的进度。

    要将数据从并行池工作单元发送回客户端,首先在客户端创建一个 DataQueue 对象。将此 DataQueue 传递到 parfor 循环或其他并行语言构造,例如 spmd。从工作单元调用 send 将数据发送回客户端。在客户端,指定一个函数来使用 afterEach 自动处理接收到的数据。

    • 如果需要,您可以从创建 DataQueue 的工作单元或客户端调用 sendDataQueue 仅将数据发送给创建 DataQueue 的客户端或工作单元。

    • 您可以在工作单元上创建队列并将其发送回客户端以实现反向通信。

      在 R2023b 之前的版本中: 您不能将队列从一个工作单元发送到另一个工作单元。要在工作单元之间传输数据,请改用 spmdspmdSendspmdReceive

    • 与所有其他句柄对象不同,DataQueuePollableDataQueue 实例在发送给工作单元时确实保持连接。

    创建对象

    描述

    q = parallel.pool.DataQueue 创建一个对象,您可以使用它在客户端和工作单元之间发送或监听消息(或数据)。在您想要接收数据的工作单元或客户端上创建 DataQueue

    示例

    属性

    全部展开

    此属性是只读的。

    等待从队列中删除的数据项数,指定为零或正整数。在创建 0 实例的工作单元或客户端上,该值为 DataQueue 或正整数。如果客户端创建了 DataQueue 实例,则所有工作单元上的值都是 0。如果一个工作单元创建了 DataQueue,则客户端和所有其他工作单元上的值为 0

    对象函数

    afterEach定义一个函数,当数据队列接收到新数据时调用
    send使用数据队列在客户端和工作单元之间发送数据

    示例

    全部折叠

    构造一个 DataQueue,并调用 afterEach

    q = parallel.pool.DataQueue;
    afterEach(q, @disp);
    
    启动 parfor 循环,并发送消息。待处理的消息被传递给 afterEach 函数,在此示例中为 @disp

    parfor i = 1:3
        send(q, i); 
    end;
         1
    
         2
    
         3

    有关使用 DataQueue 监听数据的更多详细信息,请参阅 afterEach

    当您向 DataQueue 对象发送消息时,该消息会在队列中等待,直到被侦听器处理。每条消息都会将 1 添加到队列长度。在此示例中,您使用 QueueLength 属性来查找 DataQueue 对象的长度。

    当客户端或工作单元创建 DataQueue 对象时,发送到队列的任何消息都会保存在该客户端或工作单元的内存中。如果客户端创建 DataQueue 对象,则所有工作单元上的 QueueLength 属性都是 0。在这个示例中,您在客户端创建一个 DataQueue 对象,并从一个工作单元发送数据。

    首先,创建一个具有一个工作单元的并行池。

    parpool(1);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 1 workers.
    

    然后,创建一个 DataQueue

    q = parallel.pool.DataQueue
    q = 
      DataQueue with properties:
    
        QueueLength: 0
    
    

    新创建的 DataQueue 有一个空队列。您可以使用 parfor 来查找工作单元上的 q.QueueLength。查找客户端上的队列长度,以及工作单元上的队列长度。

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    由于队列为空,对于客户端和工作单元来说,QueueLength 都是 0。接下来,从工作单元向队列发送一条消息。然后,使用 QueueLength 属性查找队列的长度。

    % Send a message first
    parfor i = 1
        send(q, 'A message');
    end
    
    % Find the length
    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 1
    
    parfor i = 1
        fprintf('On the worker: %i\n', q.QueueLength)
    end
    On the worker: 0
    

    QueueLength 属性在客户端上是 1,在工作单元上是 0。创建一个监听器来通过立即显示数据来处理队列。

    el = afterEach(q, @disp);

    等到队列为空,然后删除监听器。

    while q.QueueLength > 0
        pause(0.1);
    end
    delete(el);

    使用 QueueLength 属性来查找队列的长度。

    fprintf('On the client: %i\n', q.QueueLength)
    On the client: 0
    

    QueueLength0,因为队列处理已经完成。

    在此示例中,您使用 DataQueue 来更新等待栏,并使其具有 parfor 循环的进度。

    当您创建 parfor 循环时,您会将每次迭代卸载到并行池中的工作单元上。仅当 parfor 循环完成时,才会从工作单元返回信息。您可以使用 DataQueue 在每次迭代结束时更新等待栏。

    当您使用 parfor 循环的进度更新等待栏时,客户端必须记录有关剩余迭代次数的信息。

    提示

    如果您正在创建新的并行代码并想要监控代码的进度,请考虑使用 parfeval 工作流。有关详细信息,请参阅使用 afterEach 和 afterAll 异步更新用户界面

    本示例末尾定义的辅助函数 parforWaitbar 更新了等待栏。该函数使用 persistent 来存储有关剩余迭代次数的信息。

    使用 waitbar 创建等待栏,w

    w = waitbar(0,'Please wait ...');

    创建 DataQueue, D。然后在消息发送到 afterEach 后使用 parforWaitbar 运行 DataQueue

    % Create DataQueue and listener
    D = parallel.pool.DataQueue;
    afterEach(D,@parforWaitbar);

    设置 parfor 循环、N 的迭代次数。使用等待条 w 和迭代次数 N 来初始化函数 parforWaitbar

    parfor 循环的每次迭代结束时,客户端都会运行 parforWaitbar 并逐步更新等待栏。

    N = 100;
    parforWaitbar(w,N)
    

    函数 parforWaitbar 使用持久变量来存储在客户端上完成的迭代次数。无需向工作单元提供任何信息。

    运行 parfor 循环并进行 N 迭代。对于这个示例,使用 pauserand 来仿真一些工作。每次迭代后,使用 sendDataQueue 发送消息。当消息发送到 DataQueue 时,等待栏会更新。由于不需要工作单元提供任何信息,因此发送一条空消息以避免不必要的数据传输。

    parfor 循环完成后,使用 delete 关闭等待栏。

    parfor i = 1:N
        pause(rand)
        send(D,[]);
    end
    
    delete(w);
    

    定义辅助函数 parforWaitbar。当您使用两个输入参量运行 parforWaitbar 时,该函数会初始化三个持久变量(counthN)。当您使用一个输入参量运行 parforWaitbar 时,等待栏会更新。

    function parforWaitbar(waitbarHandle,iterations)
        persistent count h N
        
        if nargin == 2
            % Initialize
            
            count = 0;
            h = waitbarHandle;
            N = iterations;
        else
            % Update the waitbar
            
            % Check whether the handle is a reference to a deleted object
            if isvalid(h)
                count = count + 1;
                waitbar(count / N,h);
            end
        end
    end

    Status bar indicating roughly one third completion.

    此示例显示如何使用 parfeval 执行并行参数扫描,并在使用 DataQueue 对象进行计算时发回结果。

    parfeval 不会阻止 MATLAB,因此您可以在进行计算时继续工作。

    该示例对 Lorenz 常微分方程系统执行参数扫描,参数为 σρ,并展示了该系统的混沌性质。

    ddtx=σ(y-z)ddty=x(ρ-z)-yddtz=xy-βx

    设置并行环境

    使用 parpool 函数创建一个并行线程工作单元池。

    parpool("Threads");
    Starting parallel pool (parpool) using the 'Threads' profile ...
    Connected to parallel pool with 6 workers.
    

    创建参数网格

    定义您想要在参数扫描中探索的参数范围。

    gridSize = 40;
    sigma = linspace(5,45,gridSize);
    rho = linspace(50,100,gridSize);
    beta = 8/3;

    使用 meshgrid 函数创建二维参数网格。

    [rho,sigma] = meshgrid(rho,sigma);

    执行并行参数扫描

    定义参数后,即可执行并行参数扫描。

    为了直观地显示参数扫描的中期结果,请创建一个表面图。请注意,使用 Z 初始化表面的 NaN 组件会创建一个空图。

    figure;
    surface = surf(rho,sigma,NaN(size(sigma)));
    xlabel('\rho','Interpreter','Tex')
    ylabel('\sigma','Interpreter','Tex')

    要从工作单元发送临时数据,请创建一个 DataQueue 对象。设置一个函数,每次工作单元使用 afterEach 函数发送数据时更新表面图。updatePlot 函数是示例末尾定义的支持函数。

    Q = parallel.pool.DataQueue;
    afterEach(Q,@(data) updatePlot(surface,data));

    当您分配工作量时,parfeval 的工作效率会更高。为了分配工作负载,将要探索的参数分组到各个分区中。对于此示例,使用冒号运算符 (step) 分成大小为 : 的均匀分区。结果数组 partitions 包含分区的边界。请注意,必须添加最后一个分区的终点。

    step = 100;
    partitions = [1:step:numel(sigma),numel(sigma)+1]
    partitions = 1×17
    
               1         101         201         301         401         501         601         701         801         901        1001        1101        1201        1301        1401        1501        1601
    
    

    为了获得最佳性能,请尝试分成以下分区:

    • 足够大,以至于计算时间与调度分区的开销相比很大。

    • 足够小,以便有足够的分区来让所有工作单元忙碌。

    为了表示并行工作单元上的函数执行并保存其结果,请使用 future 对象。

    f(1:numel(partitions)-1) = parallel.FevalFuture;

    使用 parfeval 函数将计算卸载到并行工作单元。parameterSweep 是在此脚本末尾定义的辅助函数,用于在要探索的参数分区上解决 Lorenz 系统。它有一个输出参量,因此必须将 1 指定为 parfeval 中的输出数量。

    for ii = 1:numel(partitions)-1
        f(ii) = parfeval(@parameterSweep,1,partitions(ii),partitions(ii+1),sigma,rho,beta,Q);
    end

    parfeval 不会阻止 MATLAB,因此您可以在进行计算时继续工作。工作单元并行计算,并在中间结果可用时立即通过 DataQueue 发送。

    如果您想要阻止 MATLAB 直到 parfeval 完成,请在 future 对象上使用 wait 函数。当后续代码依赖于 wait 的完成时,使用 parfeval 函数很有用。

    wait(f);

    parfeval 完成计算后,wait 也完成,您就可以执行更多代码。例如,绘制 Lorenz 系统解决方案的选择。使用 fetchOutputs 函数检索存储在 future 对象中的结果。

    results = fetchOutputs(f);
    idxs = randperm(numel(results),4);
    figure
    for n = 1:numel(idxs)
        nexttile
        a = results{idxs(n)};
        plot3(a(:,1),a(:,2),a(:,3))
        grid on
        xlabel("x")
        ylabel("y")
        zlabel("z")
        title("Lorenz System Solution", ...
            "\rho = "+ num2str(rho(idxs(n)),'%5.2f') + " \sigma = "+ num2str(sigma(idxs(n)),'%5.2f'),Interpreter="tex")
    end

    如果您的参数扫描需要更多的计算资源并且您可以访问集群,那么您可以扩大您的 parfeval 计算。有关详细信息,请参阅从桌面扩展到集群

    定义辅助函数

    定义一个辅助函数,用于在要探索的参数分区上求解洛仑兹方程组。使用 send 对象上的 DataQueue 函数将中间结果发送到 MATLAB 客户端。

    function results = parameterSweep(first,last,sigma,rho,beta,Q)
        results = cell(last-first,1);
        for ii = first:last-1
            lorenzSystem = @(t,a) [sigma(ii)*(a(2) - a(1)); a(1)*(rho(ii) - a(3)) - a(2); a(1)*a(2) - beta*a(3)];
            [t,a] = ode45(lorenzSystem,[0 100],[1 1 1]);
            send(Q,[ii,a(end,3)]);
            results{ii-first+1} = a;
        end
    end

    定义另一个辅助函数,当新数据到达时更新曲面图。

    function updatePlot(surface,data)
        surface.ZData(data(1)) = data(2);
        drawnow('limitrate');
    end

    提示

    • 您只能自动处理使用 DataQueue 对象发送的数据或消息。要在客户端或工作单元收到数据后手动检索数据,请使用 parallel.pool.PollableDataQueue 对象发送数据。

    扩展功能

    全部展开

    版本历史记录

    在 R2017a 中推出