fetchNext
Retrieve next available unread outputs from a reinforcement learning environment simulations running on workers
Since R2022a
Description
[
blocks the command prompt and waits for an unread element of idx
,out
] = fetchNext(F
)F
(which
corresponds to a simulation scheduled on a worker) to reach a finished state. It then
returns the index idx
of the simulation that finished and the
corresponding output out
, which is a scalar structure consistent with
the output of runEpisode
.
Examples
Retrieve Next Unread Output from Environment Simulation
This example shows how to use fetchNext
to retrieve the next available unread result from reinforcement learning environment simulations running on workers.
Load a predefined environment and a suitable agent. For this example use both the environment and agent described in Train MBPO Agent to Balance Continuous Cart-Pole System.
env = rlPredefinedEnv("CartPole-Continuous"); load("MATLABCartpoleMBPO.mat","agent");
Start a parallel pool and set up the environment so that it simulates on workers.
pp = parpool(2);
Starting parallel pool (parpool) using the 'Processes' profile ... Connected to parallel pool with 2 workers.
setup(env,UseParallel=true);
To record the completion time of each simulation, start a timer.
tic
Initialize two vectors to store the average reward values and the completion time for each simulation, respectively.
avr = zeros(6,1); sct = zeros(6,1);
Schedule six simulations to run on the available workers. At the beginning of the simulation, the reset function of the environment sets the initial angle of the pole randomly around zero (the upward position), thereby ensuring that each simulation is unique.
for i=1:6 ftr(i) = runEpisode(env,agent,CleanupPostSim=false); end
Use fetchNext
in a loop to retrieve results.
while ~all([ftr.Read] == true) % Wait until an output is available then retrieve it [i,out] = fetchNext(ftr); % Store the simulation completion time sct(i) = toc; % Store the average reward value avr(i) = mean([out.AgentData.Experiences.Reward]); end
Plot average reward and timing for each simulation.
figure subplot(2,1,1); plot(sct) title('Simulation completion times (seconds)'); subplot(2,1,2) plot(avr) title('Average reward value'); xlabel('Simulation number');
As expected, simulations run in parallel (and therefore terminate at about the same time) in groups of two.
Clear the array of Future
objects, the environment, and delete the parallel pool (this is the reverse order in which they were created).
clear ftr clear env delete(pp)
Defer Outputs of Simulations Running on Workers
This example shows how to use Future
objects and their methods fetchNext
, fetchOutput
, cancel
, and wait
to defer output retrieval for environment simulations running on workers, monitor the status of ongoing simulations, fetch outputs of completed simulations, cancel ongoing simulations, or wait for ongoing simulations to complete.
Load a predefined environment and a suitable agent. For this example use both the environment and agent described in Train AC Agent to Balance Discrete Cart-Pole System.
env = rlPredefinedEnv("CartPole-Discrete"); load("MATLABCartpoleAC.mat","agent")
Start a parallel pool and set up the environment so that it simulates on workers.
pp = parpool(2);
Starting parallel pool (parpool) using the 'Processes' profile ... Connected to parallel pool with 2 workers.
setup(env,UseParallel=true);
To display the simulation completion times, start a timer.
tic
Schedule six simulation to run on the available workers. At the beginning of the simulation, the reset function of the cart-pole environment sets the initial angle of the pole to a random position in the neighborhood of zero (the upward position). This randomization ensures that each simulation is different.
for i=1:6 ftr(i) = runEpisode(env,agent,CleanupPostSim=false); end
Each element of the Future
array ftr
represents a scheduled simulation.
ftr
ftr=1×6 object
1×6 Future array with properties:
Read
State
Diary
ID
Display the state of each simulation.
ftr.State
ans = 'running'
ans = 'running'
ans = 'queued'
ans = 'queued'
ans = 'queued'
ans = 'queued'
Two simulations are ongoing while the others are queued.
Use fetchNext
with a timeout of 0.1
seconds to retrieve results for simulations that complete within that time (if any).
[idx,out] = fetchNext(ftr,0.1)
idx = [] out = []
Both the outputs are empty, which means that none of the four simulation has completed yet.
Display how many output results have been already retrieved.
ftr.Read
ans = logical
0
ans = logical
0
ans = logical
0
ans = logical
0
ans = logical
0
ans = logical
0
Use fetchNext
without any timeout to wait until an unretrieved simulation output becomes available and then return the results.
[idx,out] = fetchNext(ftr)
idx = 2
out = struct with fields:
SimulationInfo: [1×1 struct]
AgentData: [1×1 struct]
Display the state of the simulations.
ftr.State
ans = 'finished'
ans = 'finished'
ans = 'running'
ans = 'running'
ans = 'queued'
ans = 'queued'
As expected, the first two simulations, which were running in parallel on the two workers, are finished, while the next two, which were previously queued, are now running, and the final two are still queued.
Display the time taken for the first two simulations to complete.
toc
Elapsed time is 10.451231 seconds.
Note that once the results from a simulation has been already retrieved, any attempt to use fetchNext
to retrieve it again, such as in fetchNext(ftr(2))
, will result in an error. To retrieve the results from a Future
object that has already been read, you can use fetchOuptuts
, such as in fetchOutputs(ftr(2))
.
Retrieve the next available result, and display the time elapsed since the simulations started.
[idx,out] = fetchNext(ftr)
idx = 1
out = struct with fields:
SimulationInfo: [1×1 struct]
AgentData: [1×1 struct]
toc
Elapsed time is 11.945070 seconds.
As expected, fetchNext
promptly returns the results from the second simulation, since it was already available.
Display how many output results have been already retrieved.
ftr.Read
ans = logical
1
ans = logical
1
ans = logical
0
ans = logical
0
ans = logical
0
ans = logical
0
Cancel the last simulation.
cancel(ftr(6))
Wait for the fourth simulation to complete. The wait
function blocks the command prompt until the fourth simulation is completed.
wait(ftr(4))
Display the elapsed time since the simulations started.
toc
Elapsed time is 12.414076 seconds.
Display the state of the simulations.
ftr.State
ans = 'finished'
ans = 'finished'
ans = 'finished'
ans = 'finished'
ans = 'running'
ans = 'finished'
The status of the last element of the array, for which the simulation has been canceled, is classified as 'finished'
.
Since any attempt to retrieve results from a simulation that has been canceled will result in an error, remove the canceled object from the array.
ftr(6)=[]
ftr=1×5 object
1×5 Future array with properties:
Read
State
Diary
ID
Use fetchOutputs
to wait until all remaining simulations are completed and then retrieve all outputs.
outs = fetchOutputs(ftr)
outs=5×1 struct array with fields:
SimulationInfo
AgentData
Display the elapsed time.
toc
Elapsed time is 16.265069 seconds.
Plot the action and observations from the fifth simulation.
figure subplot(2,1,1); plot(outs(5).AgentData.Time(2:end), ... cell2mat([outs(5).AgentData.Experiences.Action])) title('Simulation #5: action'); xlabel('time'); subplot(2,1,2) plot(outs(5).AgentData.Time(2:end), ... cell2mat([outs(5).AgentData.Experiences.Observation])) title('Simulation #5: observations') xlabel('time');
Clear the array of Future
objects, the environment, and delete the parallel pool (this is the reverse order in which they were created).
clear ftr clear env delete(pp)
Input Arguments
F
— Future simulation outputs
Future
object | array of Future
objects
Future simulation outputs, specified as a Future
objects or as an
array of Future
objects. To create an element of
F
, set the UseParallel
property of a
reinforcement learning environment to true
, and then use runEpisode
to
simulate an agent or a policy within this environment. Assign the element of
F
to the output of runEpisode
.
Note
fetchNext
can only retrieve results from elements of
F
that have not been read before. Any attempt to retrieve
results from an element that has its Read
property set to
true
will result in an error.
timeout
— Maximum number of seconds to wait for a result
positive scalar
Maximum number of seconds to wait for a result to become available, specified as a positive scalar.
Example: 5
Output Arguments
idx
— Index
positive integer scalar
Index of the finished simulation that has its output out
returned by fetchNext
, returned as a positive integer
scalar.
out
— Simulation output
structure
Output returned from the finished simulation indicated by idx
,
returned as a structure with the fields AgentData
and
SimulationInfo
, as described in the outputs section of runEpisode
.
Version History
Introduced in R2022a
See Also
Objects
Future
|Future
|Simulink.Simulation.Future
(Simulink)
Functions
fetchOutputs
|cancel
|wait
|runEpisode
|setup
|cleanup
|reset
MATLAB Command
You clicked a link that corresponds to this MATLAB command:
Run the command by entering it in the MATLAB Command Window. Web browsers do not support MATLAB commands.
Select a Web Site
Choose a web site to get translated content where available and see local events and offers. Based on your location, we recommend that you select: .
You can also select a web site from the following list
How to Get Best Site Performance
Select the China site (in Chinese or English) for best site performance. Other MathWorks country sites are not optimized for visits from your location.
Americas
- América Latina (Español)
- Canada (English)
- United States (English)
Europe
- Belgium (English)
- Denmark (English)
- Deutschland (Deutsch)
- España (Español)
- Finland (English)
- France (Français)
- Ireland (English)
- Italia (Italiano)
- Luxembourg (English)
- Netherlands (English)
- Norway (English)
- Österreich (Deutsch)
- Portugal (English)
- Sweden (English)
- Switzerland
- United Kingdom (English)
Asia Pacific
- Australia (English)
- India (English)
- New Zealand (English)
- 中国
- 日本Japanese (日本語)
- 한국Korean (한국어)