Can I limit number of items in parallel.pool.PollableDataQueue?
I have a problem where I need to read and decode data from multiple packetised data streams. I currently have a system object that reads individual data streams (Reader) and another system object (MultiReader) that calls multiple readers and returns synchronised data.
The decoding is quite computationally expensive, so I'm trying to parallelise it. I have a solution that looks like it might work: I'm basically running the individual readers in parallel. A set of workers from a parallel pool is launched with parfeval, and each worker uses the existing readers to return decoded data packets asynchronously through a parallel.pool.PollableDataQueue. The client receives the data from the workers, synchronises the streams and returns combined/synchronised data.
At times the workers can read the data faster than the client can consume it, so there is a risk of exhausting available memory. However, there doesn't seem to be any easy way to limit the amount of data in the queues.
If I was using Python I would just set the maxsize parameter of the Queue object; adding items would then block when the number of items was above maxsize. Matlab's PollableDataQueue does not seem to have a similar capability.
As a workaround I tried the following in the worker:
while queue.QueueLength >= maxsize
    pause(1)   % wait for the client to drain the queue
end
send(queue, message)
But this doesn't seem to work. I assume the QueueLength attribute only "works" when called from the client. This behaviour appears to be undocumented. Am I missing something? I say "does not seem to work" because I'm finding it really difficult to debug code running on workers. Apart from looking at the Error and Diary attributes of the tasks in the client, are there any tricks?
Returning to the original problem, the only other idea I had was to create an additional queue for control messages from the client to worker. These queues would need to be created in the workers and passed back to the client! However, this approach really does not fit into the structure of the code at the client end, which is pretty complicated as it is. Is there not a better way to do this?
Any ideas gratefully received.
Answers (1)
Thomas Falch
2025-5-15,13:58
Starting with R2025a you can use the new "any-destination" PollableDataQueue to solve several of your problems. Unlike the old DataQueue/PollableDataQueue, the any-destination PollableDataQueue can be created on any client/worker, and any client/worker with a copy can send or receive. You can create an any-destination PollableDataQueue like this:
queue = parallel.pool.PollableDataQueue(Destination="any");
The documentation is here: PollableDataQueue - Send and poll data between client and workers - MATLAB
With the any-destination queue you can more easily limit the number of elements on the queue. QueueLength is essentially the number of elements that can be polled off the queue on this client/worker. For the old queue, only one client/worker can receive, and the value is therefore always 0 everywhere else. For the new any-destination queue, any client/worker can be a destination, and QueueLength is therefore valid everywhere. You can therefore check it before adding elements, to limit the number of elements on the queue and avoid overwhelming the receiver.
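As a rough illustration of the QueueLength check described above, here is a minimal sketch, assuming R2025a or later with Parallel Computing Toolbox. The names produce, maxItems and numItems are hypothetical, the numeric payload stands in for a decoded packet, and the pause durations are arbitrary.

```matlab
% Sketch only: bounded producer using an any-destination queue (R2025a+).
maxItems = 10;    % hypothetical soft limit on queued items
numItems = 50;    % hypothetical number of packets to produce

queue = parallel.pool.PollableDataQueue(Destination="any");
f = parfeval(@produce, 0, queue, numItems, maxItems);

received = 0;
while received < numItems
    [item, ok] = poll(queue, 5);       % wait up to 5 s for the next item
    if ok
        received = received + 1;       % synchronise/consume item here
        pause(0.05);                   % simulate a slow consumer
    elseif ~isempty(f.Error)
        error("Producer failed: %s", f.Error.message);
    end
end
wait(f);

function produce(queue, numItems, maxItems)
% Runs on a worker: hold back while the queue is at or above the limit.
for k = 1:numItems
    while queue.QueueLength >= maxItems
        pause(0.1);
    end
    send(queue, k);                    % k stands in for a decoded packet
end
end
```

The check and the send are separate steps, so with several producers sharing one queue the limit is only approximate; with one producer per queue it should behave as a simple upper bound.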
Alternatively, you can create a "control queue" which you can use to send messages to the workers to tell them to start/stop/pause etc. Previously you had to create the queue on the worker for this to work. With the any-destination queue, you can create it on the client, send it to a worker, and use it to send from the client to the worker (or from the worker back to the client).
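The control-queue idea might look something like the following sketch, again assuming R2025a or later. The function name producer, the "stop" message and the timings are illustrative assumptions, not a fixed protocol.

```matlab
% Sketch only: client-created control queue used to stop a worker (R2025a+).
dataQueue    = parallel.pool.PollableDataQueue(Destination="any");
controlQueue = parallel.pool.PollableDataQueue(Destination="any");

f = parfeval(@producer, 0, dataQueue, controlQueue);

% Consume a few items on the client, then ask the worker to stop.
numReceived = 0;
while numReceived < 5
    [item, ok] = poll(dataQueue, 10);  % wait up to 10 s per item
    if ok
        numReceived = numReceived + 1;
    elseif ~isempty(f.Error)
        error("Producer failed: %s", f.Error.message);
    end
end
send(controlQueue, "stop");            % hypothetical control message
wait(f);

function producer(dataQueue, controlQueue)
% Runs on a worker: check for a control message before each send.
k = 0;
while true
    [msg, gotMsg] = poll(controlQueue);   % returns immediately
    if gotMsg && msg == "stop"
        break
    end
    k = k + 1;
    send(dataQueue, k);                   % k stands in for a decoded packet
    pause(0.1);
end
end
```

The same pattern could carry "pause" and "resume" messages, which would let the client throttle the workers without relying on QueueLength at all.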
Finally, the any-destination PollableDataQueue makes it easier to send from one worker to a different worker. (This could also be done with the old queue, but it was very cumbersome, since you still needed to create the queue at its final destination and then get it to the sending worker.) Since you can send from worker to worker, you can have one worker producing the data and multiple workers processing the data to keep up with the producer. You can see an example of how this is done here: Perform Data Acquisition and Processing on Pool Workers - MATLAB & Simulink
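A very small sketch of the one-producer, several-consumers arrangement, separate from the linked example and assuming R2025a or later, could look like this. The names produce and consume, the "done" sentinel and the 30-second timeout are assumptions made for illustration, and the sketch assumes items are polled in the order they were sent.

```matlab
% Sketch only: one producer worker feeding several consumer workers (R2025a+).
numConsumers = 2;
numItems = 20;
workQueue = parallel.pool.PollableDataQueue(Destination="any");

% Start the producer first so it is never stuck behind waiting consumers.
producerFuture = parfeval(@produce, 0, workQueue, numItems, numConsumers);

consumers(1:numConsumers) = parallel.FevalFuture;
for c = 1:numConsumers
    consumers(c) = parfeval(@consume, 1, workQueue);
end

wait(producerFuture);
counts = fetchOutputs(consumers);   % items handled by each consumer
total = sum(counts);

function produce(workQueue, numItems, numConsumers)
% Runs on a worker: send the data, then one sentinel per consumer.
for k = 1:numItems
    send(workQueue, k);             % k stands in for a raw packet
end
for c = 1:numConsumers
    send(workQueue, "done");        % hypothetical end-of-data marker
end
end

function count = consume(workQueue)
% Runs on a worker: process items until a sentinel or a timeout.
count = 0;
while true
    [item, ok] = poll(workQueue, 30);
    if ~ok || isstring(item)
        break
    end
    count = count + 1;              % decode/process item here
end
end
```

How the items are split between the consumers depends on timing, so only the total is predictable.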