主要内容

本页采用了机器翻译。点击此处可查看最新英文版本。

close

关闭可轮询数据队列

自 R2025a 起

    说明

    close(pollablequeue) 关闭由 pollablequeue 指定的 PollableDataQueue 对象。您现在无法再使用 send 函数向队列发送数据。

    示例

    示例

    全部折叠

    定义一个函数 sendMessages,将消息发送到队列。它发送五条消息,在发送每条消息之间暂停一秒钟,以仿真工作。

    function sendMessages(queue)
        for i = 1:5
            send(queue,sprintf("Message %d from worker",i));
            pause(1);
        end
    end

    创建一个可轮询的数据队列,并使用 parfeval 在工作单元上执行 sendMessages 函数。暂停片刻,让工作单元发送一些消息。

    queue = parallel.pool.PollableDataQueue;
    f = parfeval(@sendMessages,0,queue);
    pause(2);

    要阻止工作单元向队列发送更多数据,请在队列上调用 close

    close(queue);

    等待未来对象 f 完成。使用未来对象 fError 属性来验证是否由于尝试向已关闭的队列发送消息而发生错误。

    wait(f);
    f.Error.message
    ans = 
    'Failed to send data because the DataQueue has been closed.'
    

    此示例展示了如何使用 close 函数向多个队列接收器发出不再发送数据的信号,从而避免发送多个“停止”消息。

    定义一个函数 dataGenFcn,通过生成随机数据并将结果通过 PollableDataQueue 对象发送给下一个工作单元来仿真数据采集。在为预定义的迭代次数生成数据后,该函数关闭 workerQueue,以向处理工作单元发出不再发送数据的信号。

    function dataGenFcn(workerQueue,numIter)
    for idx = 1:numIter
        data = randn(5e2);
        send(workerQueue,data);
        pause(0.1)
    end
    close(workerQueue);
    end

    定义函数 dataProcessFcn 以连续轮询队列中的数据,处理每个接收到的值,并存储结果。循环一直持续,直到处理工作单元清空已关闭的队列为止。当关闭队列为空时,轮询返回一个空数组,并将 OK 设置为 false,从而中断 while 循环。

    function allResults = dataProcessFcn(workerQueue)
    allResults = [];
    while true
        [data,OK] = poll(workerQueue,Inf);
        if ~OK
            break
        end
        result = max(real(eig(data)));
        allResults = [allResults;result];
        pause(0.2)
    end
    end

    启动一个具有 4 个工作单元的并行池。

    pool = parpool(4);
    Starting parallel pool (parpool) using the 'Processes' profile ...
    Connected to parallel pool with 4 workers.
    

    要启用工作单元之间的通信,请创建一个 PollableDataQueue 对象,并将 Destination 参量设置为 "any"。这种类型的 PollableDataQueue 对象允许任何工作单元发送和接收消息。

    queue = parallel.pool.PollableDataQueue(Destination="any");

    定义迭代次数,并使用 parfeval 以异步方式执行工作单元函数。第一个工作单元生成数据,池中的其他工作单元接收并处理数据。

    numIter = 100;
    dataGenFuture = parfeval(@dataGenFcn,0,queue,numIter);
    for f = 1:3
        dataProcessFutures(f) = parfeval(@dataProcessFcn,1,queue);
    end

    等待数据处理工作单元完成任务,然后使用 fetchOutputs 从处理工作单元中检索所有结果。显示结果的直方图。

    wait(dataProcessFutures);
    allResults = fetchOutputs(dataProcessFutures);
    histogram(allResults)

    输入参数

    全部折叠

    可轮询数据队列,指定为 PollableDataQueue 对象。

    关闭 PollableDataQueue 对象后,您将无法再向队列发送数据。任何尝试将数据发送到队列都会导致错误。您可以继续从队列中轮询数据。您无法重新打开已关闭的队列。

    示例: p = parallel.pool.PollableDataQueue;

    版本历史记录

    在 R2025a 中推出