主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

parallel.pool.PollableDataQueue

在客户端和工作单元之间发送和轮询数据

    说明

    PollableDataQueue 对象允许在计算期间,在并行池中的客户端和工作单元之间同步发送和轮询数据或消息。例如,从工作单元中,您可以将中间值发送到客户端或另一个工作单元,并在另一个计算中使用这些值。您还可以:

    • 从客户端或任何工作单元发送数据。

    • 创建一个 PollableDataQueue 对象,只允许创建队列的客户端或工作单元接收数据。

    • 创建一个 PollableDataQueue 对象,允许客户端或池中的任何工作单元接收数据。 (自 R2025a 起)

    与所有其他句柄对象不同,PollableDataQueueDataQueue 对象在传输时仍保持连接。

    创建对象

    描述

    q = parallel.pool.PollableDataQueue 创建一个 PollableDataQueue 对象,您可以使用该对象在客户端和工作单元之间发送和轮询数据。生成的 PollableDataQueue 对象只能由创建它的客户端或工作单元轮询。在您想要接收数据的工作单元或客户端上创建 PollableDataQueue

    示例

    q = parallel.pool.PollableDataQueue(Destination=destination) 设置 PollableDataQueue 对象的目标行为。 (自 R2025a 起)

    如果希望客户端或任何工作单元能够轮询 PollableDataQueue 对象以接收数据,请设置 Destination="any"

    示例

    输入参量

    全部展开

    自 R2025a 起

    队列的目标行为,指定为以下值之一:

    • "creator" - 只允许创建队列的客户端或工作单元轮询队列并接收数据。发送到队列的任何数据都会立即发送到创建该队列的客户端或工作单元。

    • "any" - 允许客户端或并行池中的任何工作单元轮询队列以接收数据。数据在队列中等待,并被发送到轮询队列的客户端或工作单元,该客户端或工作单元成为特定数据的接收方。

    属性

    全部展开

    自 R2025a 起

    使用 close 对象函数关闭队列后,此属性为只读属性。

    队列关闭状态,以以下值之一表示:

    • false - 队列未关闭,您可以向队列发送数据。

    • true - 队列已关闭,您无法向队列发送数据。任何尝试将数据发送到队列都会导致错误。您可以继续从队列中轮询数据。您无法重新打开已关闭的队列。

    数据类型: logical

    此 属性 为只读。

    工作单元或客户端可能轮询以接收的、当前保存在队列中的数据项的数量,以零或正整数表示。

    使用 Destination 名称-值参量设置的队列的目标行为决定了 QueueLength 属性的值:

    • 如果创建 PollableDataQueue 对象时未设置 Destination 参量,或者将 Destination 设置为 "creator",则在创建 PollableDataQueue 对象的工作单元或客户端上,QueueLength0 或一个正整数。

      • 如果客户端创建了 PollableDataQueue 对象,则所有工作单元上的值均为 0

      • 如果一个工作单元创建了 PollableDataQueue,则客户端和所有其他工作单元上的值为 0

    • 如果将 Destination 设置为 "any",则客户端和所有工作单元上的值为 0 或一个正整数。

    在 R2025a 之前的版本中: 在创建 PollableDataQueue 对象的工作单元或客户端上,QueueLength 属性的值为 0 或一个正整数。如果客户端创建了 PollableDataQueue 对象,则所有工作单元上的值均为 0。如果一个工作单元创建了 PollableDataQueue,则客户端和所有其他工作单元上的值为 0

    对象函数

    close关闭可轮询数据队列
    poll 从可轮询数据队列中检索已发送的数据
    send使用数据队列在客户端和工作单元之间发送数据

    示例

    全部折叠

    创建一个 PollableDataQueue 对象。

    p = parallel.pool.PollableDataQueue;
    

    运行一个 parfor 循环,并发送一条消息,例如值为 1 的数据。

    parfor idx = 1
        send(p,idx); 
    end
    

    轮询结果。

    poll(p)
    1
    

    有关使用 PollableDataQueue 对象进行数据轮询的详细信息,请参阅 poll

    自 R2025a 起

    使用 PollableDataQueue 对象并将 Destination 设置为 "any",从客户端向并行池中的多个工作单元发送消息。

    启动一个由四个线程工作单元组成的池。

    numWorkers = 4;
    pool = parpool("Threads",numWorkers);
    Starting parallel pool (parpool) using the 'Threads' profile ...
    Connected to parallel pool with 4 workers.
    

    创建两个 PollableDataQueue 对象、一个名为 workerPdq 的队列(通过将 Destination 设置为 "any") 来创建,用于向工作单元发送消息)和一个名为 clientPdq 的队列,用于从工作单元接收返回的消息。

    workerPdq = parallel.pool.PollableDataQueue(Destination="any");
    clientPdq = parallel.pool.PollableDataQueue;

    使用 parfevalOnAll 在所有工作单元上执行 analyzeMessage 辅助函数。将 workerPdqclientPdq 队列作为参量传递给函数。

    parfevalOnAll(@analyzeMessage,0,workerPdq,clientPdq);

    通过 workerPdq 队列向每个工作单元发送一条个性化消息。

    for idx = 1:numWorkers
        send(workerPdq,compose("Hello, Worker %d!",idx));
    end

    轮询 clientPdq 队列以接收来自工作单元的消息。使用 inf 无限期等待每条消息。

    for idx = 1:numWorkers
        poll(clientPdq,inf)
    end
    ans = 
    "Worker 1 received message!"
    
    ans = 
    "Worker 2 received message!"
    
    ans = 
    "Worker 3 received message!"
    
    ans = 
    "Worker 4 received message!"
    

    定义每个工作单元执行的辅助函数 analyzeMessage。该函数从 inQueue 队列中轮询消息,并提取工作单元编号。然后,该函数将确认消息发送回 outQueue 队列。

    function analyzeMessage(inQueue,outQueue)
        message = poll(inQueue,2);
        workerNum = sscanf(message,"Hello, Worker %u");
        send(outQueue,compose("Worker %d received message!",workerNum));
        pause(2)
    end

    当您向 PollableDataQueue 对象发送消息时,该消息会在队列中等待。每条消息都会将 1 添加到队列长度。当您使用 poll 时,将从队列中收集一条消息。在此示例中,您使用 QueueLength 属性查找 PollableDataQueue 对象的长度,并观察 Destination 参量对其的影响。

    首先,创建一个具有一个工作单元的并行池。

    parpool(1);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 1 workers.
    

    创建一个 PollableDataQueue 对象。默认情况下,parallel.pool.PollableDataQueue 函数创建一个 PollableDataQueue 对象,并将 Destination 设置为 "creator"。此类 PollableDataQueue 对象只允许创建队列的客户端或工作单元轮询该对象以获取数据。

    queue = parallel.pool.PollableDataQueue
    queue = 
     PollableDataQueue with properties: 
    
              QueueLength: 0
                 IsClosed: false
    
    

    初始时,队列为空。检查客户端和工作单元的队列长度。客户端和工作单元的 QueueLength 属性值均为 0

    fprintf("Queue length on the client: %i\n",queue.QueueLength)
    Queue length on the client: 0
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queue.QueueLength)
    end
    Queue length on the worker: 0
    

    接下来,从工作单元向队列发送一条消息。然后,使用 QueueLength 属性查找队列的长度。将 Destination 设置为 "creator" 后,客户端(创建队列的客户端)上的 QueueLength 属性值为 1,工作单元上的属性值为 0

    parfor idx = 1
        send(queue,"A message");
    end
    fprintf("Queue length on the client: %i\n",queue.QueueLength)
    Queue length on the client: 1
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queue.QueueLength)
    end
    Queue length on the worker: 0
    

    使用 poll 从队列中检索消息。

    msg = poll(queue)
    msg = 
    "A message"
    

    请再次检查队列的长度。由于您删除了消息,QueueLength 属性的值现在为 0

    fprintf("Queue length on the client: %i\n",queue.QueueLength)
    Queue length on the client: 0
    

    创建一个 PollableDataQueue 对象,将 Destination 设置为 "any"。此命令创建一个 PollableDataQueue 对象,客户端或池中的任何工作单元都可以轮询该对象以接收数据。

    queueAny = parallel.pool.PollableDataQueue(Destination="any")
    queueAny = 
     PollableDataQueue with properties: 
    
              QueueLength: 0
                 IsClosed: false
    
    

    向此队列发送消息。

    parfor idx = 1
        send(queueAny,"Another message");
    end

    检查队列长度。将 Destination 设置为 "any" 后,客户端和工作单元都显示 QueueLength 属性的值为 1,表明客户端或工作单元可以轮询队列以接收数据。

    fprintf("Queue length on the client: %i\n", queueAny.QueueLength);
    Queue length on the client: 1
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queueAny.QueueLength);
    end
    Queue length on the worker: 1
    

    最后,从队列中检索消息并检查队列长度。QueueLength 属性的值为 0,因为队列处理已完成。

    msg = poll(queueAny)
    msg = 
    "Another message"
    
    fprintf("Queue length o the client: %i\n",queueAny.QueueLength);
    Queue length o the client: 0
    
    parfor idx = 1
        fprintf("Queue length on the worker: %i\n",queueAny.QueueLength);
    end
    Queue length on the worker: 0
    

    提示

    • 您只能手动检索使用 PollableDataQueue 对象发送的数据或消息。要在客户端收到数据后自动处理数据,请使用 parallel.pool.DataQueue 对象发送数据。

    • 在 R2025a 之前的版本中: 要将数据从并行池工作单元发送回客户端,首先在客户端创建一个 PollableDataQueue 对象。在 parfor 循环或其他并行语言构造(如 parfeval)中传递此 PollableDataQueue 对象。从工作单元调用 send 将数据发送回客户端。在客户端,使用 poll 来检索从工作单元发送的消息或数据的结果。

    • 在 R2025a 之前的版本中: 要从客户端向工作单元发送数据,请在工作单元上创建队列,然后将其发送回客户端。有关此工作流的示例,请参阅 接收有关工作单元的通信

    • 在 R2023b 之前的版本中: 您无法将数据从一个工作单元发送到另一个工作单元。要在工作单元之间传输数据,请改用 spmdspmdSendspmdReceive

    扩展功能

    全部展开

    版本历史记录

    在 R2017a 中推出

    全部展开