is it possible to perform a nested Batch?

9 次查看(过去 30 天)
In this link, it is shown how parfor can be used inside a batch call (see figure bellow). Instead of using parfor, can I call multiple batch (e.g., three batch using a for loop) inside a main batch. I have tried to implement that, but the nested batch does not work and I cannot figure out the reason.
Any hint will be very much appreciatted.
UPDATE (08.10.2024): ChatGPT says it is possible, but it seems not to be possible: Here is a simple case that shows it i not possible. You can observe that is not executed:
Script:
% Clear any existing jobs
clc; clear all;
% Submit the outer batch job using the modified cluster
outerJob = batch(@outerFunction, 0, 'CaptureDiary', true);
% Wait for the outer job to complete
wait(outerJob);
% Display the diary from the outer job
disp('Diary from outer job:');
diary(outerJob);
% Clean up the outer job
delete(outerJob);
outerFunction:
function output = outerFunction()
% OUTERFUNCTION - This function submits an inner batch job.
fprintf('Outer function started.\n');
% Get the cluster object (workers can use 'localMJS' profile)
myCluster = parcluster(); % Uses the cluster profile of the worker
% Define the input for the inner function
innerInput = 5;
disp(['InnerInput before',num2str(innerInput)])
% Submit the inner batch job
innerJob = batch(myCluster, @innerFunction, 1, {innerInput}, 'CaptureDiary', true);
disp(['InnerInput after ',num2str(innerInput)])
% Wait for the inner job to complete
wait(innerJob);
% Fetch outputs from the inner job
outputs = fetchOutputs(innerJob);
innerOutput = outputs{1};
% Display the diary from the inner job
fprintf('Diary from inner job:\n');
diary(innerJob); % Displays the diary content
fprintf('Outer function received output from inner job: %d\n', innerOutput);
% Clean up the inner job
delete(innerJob);
fprintf('Outer function completed.\n');
% Return the output
output = innerOutput;
end
inner function :
function output = innerFunction(input)
% INNERFUNCTION - This function performs a simple computation.
fprintf('Inner function is running.\n');
output = input^2; % Square the input
fprintf('Inner function computed output: %d\n', output);
end
Can someone give more insights on the problem?
  1 个评论
Sam Marshalik
Sam Marshalik 2024-10-7
Can you expand a bit on why you are looking to do nest batch submissions? Are you ultimately looking to submit a batch job that runs a nested function multiple times with different inputs?
It seems that parfor or parfeval could be a sufficient solution. You mentioned performance: in your tests, are you saying that if you run your nested function in a parfor loop it runs slower compared to batch offloading it?

请先登录,再进行评论。

回答(4 个)

Akshat Dalal
Akshat Dalal 2024-10-7
Hi Rub,
You can have a look at the following documentation regarding nested parallelism. It outlines all different scenarios and optimizations: https://www.mathworks.com/help/parallel-computing/nested-parfor-loops-and-for-loops.html
  1 个评论
Rub Ron
Rub Ron 2024-10-7
Hi, maybe I was not clear, I dont want to use parfor, because for my application it slower as using batch

请先登录,再进行评论。


Abhas
Abhas 2024-10-7
Upon going through the documentations, I found that running nested "batch" calls in MATLAB can be tricky because each "batch" call runs in its own independent MATLAB session. This means that a "batch" job cannot directly start another "batch" job.
However, you can work around this limitation by structuring your code to manage the execution flow appropriately using the following steps:
  1. Main Batch Job: Start by creating a main "batch" job that will manage the execution of other tasks.
  2. Independent Tasks: Instead of nesting "batch" calls, you can structure the tasks so that they can be run independently. You can use files or a shared data store to pass information between these tasks if needed.
Here's a simple MATLAB code to demonstrate the same:
% Main script to start the batch job
mainJob = batch(@mainFunction, 0, {}, 'Pool', 0);
function mainFunction()
% Define the number of tasks
numTasks = 3;
% Preallocate for job handles
jobHandles = cell(1, numTasks);
for i = 1:numTasks
% Call a function that starts a new batch job
jobHandles{i} = batch(@taskFunction, 0, {}, 'Pool', 0);
end
% Wait for all jobs to finish
for i = 1:numTasks
wait(jobHandles{i});
end
% Optionally, retrieve results or perform cleanup
for i = 1:numTasks
destroy(jobHandles{i});
end
end
% Define the task function
function taskFunction()
% Your task-specific code here
disp('Running a task...');
pause(5); % Simulate some work
end
You may refer to the following MathWorks documentation links to know more about the same:
  1. https://www.mathworks.com/help/parallel-computing/index.html
  2. https://www.mathworks.com/help/parallel-computing/run-code-on-parallel-pools.html
  2 个评论
Rub Ron
Rub Ron 2024-10-7
I have that implementation, but it seems not to work
Abhas
Abhas 2024-10-7
You can confirm that it works by observing the status of the "mainJob" object in my code. It should transition from "queued" to "finished," and you can also check the duration it takes to complete.

请先登录,再进行评论。


Walter Roberson
Walter Roberson 2024-10-7
Once you have reached the limit on the number of simultaneous batch jobs, individual batch jobs are not going to suspend themselves in such a way that new batch jobs can start.
for i = 1:length(parm)
top_jobs{i} = batch(@my_function, 0, {parm(i)});
end
Those additional batch jobs are going to be queued, but not started until an existing batch job ends.
  4 个评论
Sam Marshalik
Sam Marshalik 2024-10-8
@Matt J: There is some nested parallelization within parpool. A parallel worker can initiate a Threaded parpool. You can for example request multiple workers and each of those workers can initiate a Threaded parpool.
Here is an example:
>> parpool("Processes", 1)
Starting parallel pool (parpool) using the 'Processes' profile ...
Connected to parallel pool with 1 workers.
ans =
ProcessPool with properties:
Connected: true
NumWorkers: 1
Busy: false
Cluster: Processes (Local Cluster)
AttachedFiles: {}
AutoAddClientPath: true
FileStore: [1x1 parallel.FileStore]
ValueStore: [1x1 parallel.ValueStore]
IdleTimeout: 30 minutes (30 minutes remaining)
SpmdEnabled: true
>> spmd, parpool("Threads", 1), end
Worker 1:
Starting parallel pool (parpool) using the 'Threads' profile ...
Connected to parallel pool with 1 workers.
ans =
ThreadPool with properties:
NumWorkers: 1
Busy: false
FileStore: [1x1 parallel.FileStore]
ValueStore: [1x1 parallel.ValueStore]
@Rub Ron: What kind of paralle language are you using in your nested batch function that you want to parallelize? Is it parfor, parfeval, etc. or are you using multi-threaded functions?
Rub Ron
Rub Ron 2024-10-8
@Sam Marshalik. Hi. I have posted a simple example case (please see my updated post), without a foor loop. When I run the example case, the function innerFunction (which in the nested batch) is not executed.

请先登录,再进行评论。


Stuart Moulder
Stuart Moulder 2024-10-9
The following code uses parfeval continuations to allow task to spawn further tasks. This can either be run directly on the user MATLAB with a parallel pool as:
pool = parpool('myProfile', numWorkers);
results = spawningTasks(numTotalTasks);
or as a batch job:
cluster = parcluster('myProfile');
job = cluster.batch(@spawningTasks, 1, {numTotalTasks}, 'Pool', numWorkers);
The code works as follows:
  1. outerFunctionCreateInputs either generates a result directly and posts it to the resultsQueue, or generates a variable number of input arguments for child tasks.
  2. The afterEach for the outerFunctionCreateInputs parfeval means that on completion the user MATLAB checks if any child task inputs where created, and if so submits a parfeval to execute innerFunction on each input.
  3. The afterAll on the parfevals in step 2 means that once all child tasks have completed, a parfeval is submitted which runs outerFunctionConsumeOutputs to combine all child task results and posts it to the resultsQueue.
  4. The user MATLAB simple polls the resultsQueue for the expected number of results.
The advantage of this code is that it will execute efficiently for a parallel pool of any size.
Rather than trying to get workers to submit more work, we using continuations to trigger the user MATLAB to submit more work. To do this we have had to split a single long running task into two tasks, one to create child task inputs, and one to consume child task outputs. This has the advantage that we do not force a worker to sit idle waiting for child tasks to complete. If the number of resources was to small, such a wait could cause the code to deadlock.
function results = spawningTasks(numTotalTasks)
% Use a PollableDataQueue to receive the result since it may be created by
% one of two possible parfevals.
resultsQueue = parallel.pool.PollableDataQueue;
for idx = 1:numTotalTasks
f = parfeval(@outerFunctionCreateInputs, 1, resultsQueue);
afterEach(f, @(f) spawnChildTasks(resultsQueue, f), 0, "PassFuture", true);
end
for idx = 1:numTotalTasks
results{idx} = resultsQueue.poll(Inf);
disp(results{idx});
end
end
function childInputs = outerFunctionCreateInputs(resultsQueue)
% This function runs on a worker to create inputs for child tasks, if any.
fprintf('Outer function 1 started.\n');
% Imagine this spawns between 0 and 3 subtasks
numChildTasks = randi(4) - 1;
if numChildTasks == 0
% No sub tasks required. Create result directly here.
resultsQueue.send(struct('Result', 0, 'NumChildTasks', 0));
end
% Generate inputs for child tasks
childInputs = arrayfun(@(~) randi(10), 1:numChildTasks, 'UniformOutput', false);
fprintf('Outer function 1 finished.\n');
end
function childOutput = innerFunction(childInput)
% This function runs on a worker and is a child task.
fprintf('Inner function is running.\n');
childOutput = childInput^2; % Square the input
fprintf('Inner function computed output: %d\n', childOutput);
end
function outerFunctionConsumeOutputs(resultsQueue, childOutputs)
% This function runs on a worker to combine outputs from child tasks.
fprintf('Outer function 2 started.\n');
resultsQueue.send(struct('Result', sum(childOutputs), 'NumChildTasks', numel(childOutputs)));
end
function spawnChildTasks(resultsQueue, f)
% This function runs on the user's MATLAB to spawn child tasks if required
try
childInputs = fetchOutputs(f);
if isempty(childInputs)
% No sub tasks required.
return;
end
% Submit a task for each child input
for idx = 1:numel(childInputs)
fChild(idx) = parfeval(@innerFunction, 1, childInputs{idx});
end
% On completing of all child tasks, submit the reduction operation to
% combine their results
afterAll(fChild, @(fChild) spawnConsumeTask(resultsQueue, fChild), 0, "PassFuture", true);
catch E
% Notify of any error
resultsQueue.send(E);
end
end
function spawnConsumeTask(resultsQueue, fChild)
% This function runs on the user MATLAB to spawn a task which combines the
% child results
try
childOutputs = fetchOutputs(fChild);
parfeval(@outerFunctionConsumeOutputs, 0, resultsQueue, childOutputs);
catch E
% Notify of any error
resultsQueue.send(E);
end
end
  1 个评论
Rub Ron
Rub Ron 2024-10-9
Hi, could you your solution test? What output should it provide? I have tested and it is taking so long to perform such a simple calculation. Also, no offense, but some part of your answer seems like a AI generated, and sometimes AI provides solutions that does not work.

请先登录,再进行评论。

类别

Help CenterFile Exchange 中查找有关 Parallel Computing Fundamentals 的更多信息

Community Treasure Hunt

Find the treasures in MATLAB Central and discover how the community can help you!

Start Hunting!

Translated by