PPO

robot_nav.models.PPO.PPO

ActorCritic

Bases: Module

Actor-Critic neural network model for PPO.

Attributes:

    actor (Sequential): Policy network (actor) to output action mean.
    critic (Sequential): Value network (critic) to predict state values.
    action_var (Tensor): Diagonal covariance matrix for action distribution.
    device (str): Device used for computation ('cpu' or 'cuda').
    max_action (float): Clipping range for action values.

Source code in robot_nav/models/PPO/PPO.py (lines 61-182)

class ActorCritic(nn.Module):
    """
    Actor-Critic neural network model for PPO.

    Attributes:
        actor (nn.Sequential): Policy network (actor) to output action mean.
        critic (nn.Sequential): Value network (critic) to predict state values.
        action_var (Tensor): Diagonal covariance matrix for action distribution.
        device (str): Device used for computation ('cpu' or 'cuda').
        max_action (float): Clipping range for action values.
    """

    def __init__(self, state_dim, action_dim, action_std_init, max_action, device):
        """
        Initialize the Actor and Critic networks.

        Args:
            state_dim (int): Dimension of the input state.
            action_dim (int): Dimension of the action space.
            action_std_init (float): Initial standard deviation of the action distribution.
            max_action (float): Maximum value allowed for an action (clipping range).
            device (str): Device to run the model on.
        """
        super(ActorCritic, self).__init__()

        self.device = device
        self.max_action = max_action

        self.action_dim = action_dim
        self.action_var = torch.full(
            (action_dim,), action_std_init * action_std_init
        ).to(self.device)
        # actor
        self.actor = nn.Sequential(
            nn.Linear(state_dim, 400),
            nn.Tanh(),
            nn.Linear(400, 300),
            nn.Tanh(),
            nn.Linear(300, action_dim),
            nn.Tanh(),
        )
        # critic
        self.critic = nn.Sequential(
            nn.Linear(state_dim, 400),
            nn.Tanh(),
            nn.Linear(400, 300),
            nn.Tanh(),
            nn.Linear(300, 1),
        )

    def set_action_std(self, new_action_std):
        """
        Set a new standard deviation for the action distribution.

        Args:
            new_action_std (float): New standard deviation.
        """
        self.action_var = torch.full(
            (self.action_dim,), new_action_std * new_action_std
        ).to(self.device)

    def forward(self):
        """
        Forward method is not implemented, as it's unused directly.

        Raises:
            NotImplementedError: Always raised when called.
        """
        raise NotImplementedError

    def act(self, state, sample):
        """
        Compute an action, its log probability, and the state value.

        Args:
            state (Tensor): Input state tensor.
            sample (bool): Whether to sample from the action distribution or use mean.

        Returns:
            Tuple[Tensor, Tensor, Tensor]: Sampled (or mean) action, log probability, and state value.
        """
        action_mean = self.actor(state)
        cov_mat = torch.diag(self.action_var).unsqueeze(dim=0)
        dist = MultivariateNormal(action_mean, cov_mat)

        if sample:
            action = torch.clip(
                dist.sample(), min=-self.max_action, max=self.max_action
            )
        else:
            action = dist.mean
        action_logprob = dist.log_prob(action)
        state_val = self.critic(state)

        return action.detach(), action_logprob.detach(), state_val.detach()

    def evaluate(self, state, action):
        """
        Evaluate action log probabilities, entropy, and state values for given states and actions.

        Args:
            state (Tensor): Batch of states.
            action (Tensor): Batch of actions.

        Returns:
            Tuple[Tensor, Tensor, Tensor]: Action log probabilities, state values, and distribution entropy.
        """
        action_mean = self.actor(state)

        action_var = self.action_var.expand_as(action_mean)
        cov_mat = torch.diag_embed(action_var).to(self.device)
        dist = MultivariateNormal(action_mean, cov_mat)

        # For Single Action Environments.
        if self.action_dim == 1:
            action = action.reshape(-1, self.action_dim)

        action_logprobs = dist.log_prob(action)
        dist_entropy = dist.entropy()
        state_values = self.critic(state)

        return action_logprobs, state_values, dist_entropy
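
A minimal usage sketch, assuming the class is importable from the robot_nav.models.PPO.PPO module shown above; the dimensions are illustrative only:

import torch
from robot_nav.models.PPO.PPO import ActorCritic  # import path assumed from this page's module

# Illustrative dimensions: 25-dimensional state, 2-dimensional action (e.g. linear/angular velocity)
model = ActorCritic(state_dim=25, action_dim=2, action_std_init=0.6, max_action=1.0, device="cpu")

state = torch.rand(1, 25)                        # one observation
action, logprob, value = model.act(state, sample=True)
print(action.shape, logprob.shape, value.shape)  # torch.Size([1, 2]) torch.Size([1]) torch.Size([1, 1])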

__init__(state_dim, action_dim, action_std_init, max_action, device)

Initialize the Actor and Critic networks.

Parameters:

    state_dim (int): Dimension of the input state. Required.
    action_dim (int): Dimension of the action space. Required.
    action_std_init (float): Initial standard deviation of the action distribution. Required.
    max_action (float): Maximum value allowed for an action (clipping range). Required.
    device (str): Device to run the model on. Required.

Source code in robot_nav/models/PPO/PPO.py (lines 73-109)

def __init__(self, state_dim, action_dim, action_std_init, max_action, device):
    """
    Initialize the Actor and Critic networks.

    Args:
        state_dim (int): Dimension of the input state.
        action_dim (int): Dimension of the action space.
        action_std_init (float): Initial standard deviation of the action distribution.
        max_action (float): Maximum value allowed for an action (clipping range).
        device (str): Device to run the model on.
    """
    super(ActorCritic, self).__init__()

    self.device = device
    self.max_action = max_action

    self.action_dim = action_dim
    self.action_var = torch.full(
        (action_dim,), action_std_init * action_std_init
    ).to(self.device)
    # actor
    self.actor = nn.Sequential(
        nn.Linear(state_dim, 400),
        nn.Tanh(),
        nn.Linear(400, 300),
        nn.Tanh(),
        nn.Linear(300, action_dim),
        nn.Tanh(),
    )
    # critic
    self.critic = nn.Sequential(
        nn.Linear(state_dim, 400),
        nn.Tanh(),
        nn.Linear(400, 300),
        nn.Tanh(),
        nn.Linear(300, 1),
    )

act(state, sample)

Compute an action, its log probability, and the state value.

Parameters:

    state (Tensor): Input state tensor. Required.
    sample (bool): Whether to sample from the action distribution or use mean. Required.

Returns:

    Tuple[Tensor, Tensor, Tensor]: Sampled (or mean) action, log probability, and state value.

Source code in robot_nav/models/PPO/PPO.py (lines 131-155)

def act(self, state, sample):
    """
    Compute an action, its log probability, and the state value.

    Args:
        state (Tensor): Input state tensor.
        sample (bool): Whether to sample from the action distribution or use mean.

    Returns:
        Tuple[Tensor, Tensor, Tensor]: Sampled (or mean) action, log probability, and state value.
    """
    action_mean = self.actor(state)
    cov_mat = torch.diag(self.action_var).unsqueeze(dim=0)
    dist = MultivariateNormal(action_mean, cov_mat)

    if sample:
        action = torch.clip(
            dist.sample(), min=-self.max_action, max=self.max_action
        )
    else:
        action = dist.mean
    action_logprob = dist.log_prob(action)
    state_val = self.critic(state)

    return action.detach(), action_logprob.detach(), state_val.detach()
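
The sampling mechanics can be reproduced with plain torch. This sketch (illustrative numbers, not taken from the repository) mirrors what act() does with the diagonal covariance built from action_var:

import torch
from torch.distributions import MultivariateNormal

action_mean = torch.tensor([[0.2, -0.5]])            # stand-in for the actor output on one state
action_var = torch.full((2,), 0.6 * 0.6)             # variance = action_std_init ** 2
cov_mat = torch.diag(action_var).unsqueeze(dim=0)    # (1, 2, 2) diagonal covariance matrix
dist = MultivariateNormal(action_mean, cov_mat)

sampled = torch.clip(dist.sample(), min=-1.0, max=1.0)  # exploration path (sample=True)
greedy = dist.mean                                      # deterministic path (sample=False)
print(dist.log_prob(sampled), dist.log_prob(greedy))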

evaluate(state, action)

Evaluate action log probabilities, entropy, and state values for given states and actions.

Parameters:

    state (Tensor): Batch of states. Required.
    action (Tensor): Batch of actions. Required.

Returns:

    Tuple[Tensor, Tensor, Tensor]: Action log probabilities, state values, and distribution entropy.

Source code in robot_nav/models/PPO/PPO.py (lines 157-182)

def evaluate(self, state, action):
    """
    Evaluate action log probabilities, entropy, and state values for given states and actions.

    Args:
        state (Tensor): Batch of states.
        action (Tensor): Batch of actions.

    Returns:
        Tuple[Tensor, Tensor, Tensor]: Action log probabilities, state values, and distribution entropy.
    """
    action_mean = self.actor(state)

    action_var = self.action_var.expand_as(action_mean)
    cov_mat = torch.diag_embed(action_var).to(self.device)
    dist = MultivariateNormal(action_mean, cov_mat)

    # For Single Action Environments.
    if self.action_dim == 1:
        action = action.reshape(-1, self.action_dim)

    action_logprobs = dist.log_prob(action)
    dist_entropy = dist.entropy()
    state_values = self.critic(state)

    return action_logprobs, state_values, dist_entropy
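
A shape sketch for a batch of transitions (illustrative dimensions; assumes the import path from this page):

import torch
from robot_nav.models.PPO.PPO import ActorCritic  # assumed import path

model = ActorCritic(state_dim=25, action_dim=2, action_std_init=0.6, max_action=1.0, device="cpu")
states = torch.rand(8, 25)     # batch of 8 states
actions = torch.rand(8, 2)     # batch of 8 previously taken actions
logprobs, state_values, dist_entropy = model.evaluate(states, actions)
print(logprobs.shape, state_values.shape, dist_entropy.shape)
# torch.Size([8]) torch.Size([8, 1]) torch.Size([8]) -- train() squeezes state_values to (8,)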

forward()

The forward method is not implemented, as the actor and critic are used directly through act() and evaluate().

Raises:

    NotImplementedError: Always raised when called.

Source code in robot_nav/models/PPO/PPO.py (lines 122-129)

def forward(self):
    """
    Forward method is not implemented, as it's unused directly.

    Raises:
        NotImplementedError: Always raised when called.
    """
    raise NotImplementedError

set_action_std(new_action_std)

Set a new standard deviation for the action distribution.

Parameters:

    new_action_std (float): New standard deviation. Required.

Source code in robot_nav/models/PPO/PPO.py (lines 111-120)

def set_action_std(self, new_action_std):
    """
    Set a new standard deviation for the action distribution.

    Args:
        new_action_std (float): New standard deviation.
    """
    self.action_var = torch.full(
        (self.action_dim,), new_action_std * new_action_std
    ).to(self.device)

PPO

Proximal Policy Optimization (PPO) implementation for continuous control tasks.

Attributes:

    max_action (float): Maximum action value.
    action_std (float): Standard deviation of the action distribution.
    action_std_decay_rate (float): Rate at which to decay action standard deviation.
    min_action_std (float): Minimum allowed action standard deviation.
    state_dim (int): Dimension of the state space.
    gamma (float): Discount factor for future rewards.
    eps_clip (float): Clipping range for policy updates.
    device (str): Device for model computation ('cpu' or 'cuda').
    save_every (int): Interval (in iterations) for saving model checkpoints.
    model_name (str): Name used when saving/loading model.
    save_directory (Path): Directory to save model checkpoints.
    iter_count (int): Number of training iterations completed.
    buffer (RolloutBuffer): Buffer to store trajectories.
    policy (ActorCritic): Current actor-critic network.
    optimizer (Optimizer): Optimizer for actor and critic.
    policy_old (ActorCritic): Old actor-critic network for computing PPO updates.
    MseLoss (Module): Mean squared error loss function.
    writer (SummaryWriter): TensorBoard summary writer.

Source code in robot_nav/models/PPO/PPO.py (lines 185-501)

class PPO:
    """
    Proximal Policy Optimization (PPO) implementation for continuous control tasks.

    Attributes:
        max_action (float): Maximum action value.
        action_std (float): Standard deviation of the action distribution.
        action_std_decay_rate (float): Rate at which to decay action standard deviation.
        min_action_std (float): Minimum allowed action standard deviation.
        state_dim (int): Dimension of the state space.
        gamma (float): Discount factor for future rewards.
        eps_clip (float): Clipping range for policy updates.
        device (str): Device for model computation ('cpu' or 'cuda').
        save_every (int): Interval (in iterations) for saving model checkpoints.
        model_name (str): Name used when saving/loading model.
        save_directory (Path): Directory to save model checkpoints.
        iter_count (int): Number of training iterations completed.
        buffer (RolloutBuffer): Buffer to store trajectories.
        policy (ActorCritic): Current actor-critic network.
        optimizer (torch.optim.Optimizer): Optimizer for actor and critic.
        policy_old (ActorCritic): Old actor-critic network for computing PPO updates.
        MseLoss (nn.Module): Mean squared error loss function.
        writer (SummaryWriter): TensorBoard summary writer.
    """

    def __init__(
        self,
        state_dim,
        action_dim,
        max_action,
        lr_actor=0.0003,
        lr_critic=0.001,
        gamma=0.99,
        eps_clip=0.2,
        action_std_init=0.6,
        action_std_decay_rate=0.015,
        min_action_std=0.1,
        device="cpu",
        save_every=10,
        load_model=False,
        save_directory=Path("robot_nav/models/PPO/checkpoint"),
        model_name="PPO",
        load_directory=Path("robot_nav/models/PPO/checkpoint"),
    ):
        self.max_action = max_action
        self.action_std = action_std_init
        self.action_std_decay_rate = action_std_decay_rate
        self.min_action_std = min_action_std
        self.state_dim = state_dim
        self.gamma = gamma
        self.eps_clip = eps_clip
        self.device = device
        self.save_every = save_every
        self.model_name = model_name
        self.save_directory = save_directory
        self.iter_count = 0

        self.buffer = RolloutBuffer()

        self.policy = ActorCritic(
            state_dim, action_dim, action_std_init, self.max_action, self.device
        ).to(device)
        self.optimizer = torch.optim.Adam(
            [
                {"params": self.policy.actor.parameters(), "lr": lr_actor},
                {"params": self.policy.critic.parameters(), "lr": lr_critic},
            ]
        )

        self.policy_old = ActorCritic(
            state_dim, action_dim, action_std_init, self.max_action, self.device
        ).to(device)
        self.policy_old.load_state_dict(self.policy.state_dict())
        if load_model:
            self.load(filename=model_name, directory=load_directory)

        self.MseLoss = nn.MSELoss()
        self.writer = SummaryWriter(comment=model_name)

    def set_action_std(self, new_action_std):
        """
        Set a new standard deviation for the action distribution.

        Args:
            new_action_std (float): New standard deviation value.
        """
        self.action_std = new_action_std
        self.policy.set_action_std(new_action_std)
        self.policy_old.set_action_std(new_action_std)

    def decay_action_std(self, action_std_decay_rate, min_action_std):
        """
        Decay the action standard deviation by a fixed rate, down to a minimum threshold.

        Args:
            action_std_decay_rate (float): Amount to reduce standard deviation by.
            min_action_std (float): Minimum value for standard deviation.
        """
        print(
            "--------------------------------------------------------------------------------------------"
        )
        self.action_std = self.action_std - action_std_decay_rate
        self.action_std = round(self.action_std, 4)
        if self.action_std <= min_action_std:
            self.action_std = min_action_std
            print(
                "setting actor output action_std to min_action_std : ", self.action_std
            )
        else:
            print("setting actor output action_std to : ", self.action_std)
        self.set_action_std(self.action_std)
        print(
            "--------------------------------------------------------------------------------------------"
        )

    def get_action(self, state, add_noise):
        """
        Sample an action using the current policy (optionally with noise), and store in buffer if noise is added.

        Args:
            state (array_like): Input state for the policy.
            add_noise (bool): Whether to sample from the distribution (True) or use the deterministic mean (False).

        Returns:
            np.ndarray: Sampled action.
        """

        with torch.no_grad():
            state = torch.FloatTensor(state).to(self.device)
            action, action_logprob, state_val = self.policy_old.act(state, add_noise)

        if add_noise:
            # self.buffer.states.append(state)
            self.buffer.actions.append(action)
            self.buffer.logprobs.append(action_logprob)
            self.buffer.state_values.append(state_val)

        return action.detach().cpu().numpy().flatten()

    def train(self, replay_buffer, iterations, batch_size):
        """
        Train the policy and value function using PPO loss based on the stored rollout buffer.

        Args:
            replay_buffer: Placeholder for compatibility (not used).
            iterations (int): Number of epochs to optimize the policy per update.
            batch_size (int): Batch size (not used; training uses the whole buffer).
        """
        # Monte Carlo estimate of returns
        rewards = []
        discounted_reward = 0
        for reward, is_terminal in zip(
            reversed(self.buffer.rewards), reversed(self.buffer.is_terminals)
        ):
            if is_terminal:
                discounted_reward = 0
            discounted_reward = reward + (self.gamma * discounted_reward)
            rewards.insert(0, discounted_reward)

        # Normalizing the rewards
        rewards = torch.tensor(rewards, dtype=torch.float32).to(self.device)
        rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-7)

        # convert list to tensor
        assert len(self.buffer.actions) == len(self.buffer.states)

        states = [torch.tensor(st, dtype=torch.float32) for st in self.buffer.states]
        old_states = torch.squeeze(torch.stack(states, dim=0)).detach().to(self.device)
        old_actions = (
            torch.squeeze(torch.stack(self.buffer.actions, dim=0))
            .detach()
            .to(self.device)
        )
        old_logprobs = (
            torch.squeeze(torch.stack(self.buffer.logprobs, dim=0))
            .detach()
            .to(self.device)
        )
        old_state_values = (
            torch.squeeze(torch.stack(self.buffer.state_values, dim=0))
            .detach()
            .to(self.device)
        )

        # calculate advantages
        advantages = rewards.detach() - old_state_values.detach()

        av_state_values = 0
        max_state_value = -inf
        av_loss = 0
        # Optimize policy for K epochs
        for _ in range(iterations):
            # Evaluating old actions and values
            logprobs, state_values, dist_entropy = self.policy.evaluate(
                old_states, old_actions
            )

            # match state_values tensor dimensions with rewards tensor
            state_values = torch.squeeze(state_values)
            av_state_values += torch.mean(state_values)
            max_state_value = max(max_state_value, max(state_values))
            # Finding the ratio (pi_theta / pi_theta__old)
            ratios = torch.exp(logprobs - old_logprobs.detach())

            # Finding Surrogate Loss
            surr1 = ratios * advantages
            surr2 = (
                torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
            )

            # final loss of clipped objective PPO
            loss = (
                -torch.min(surr1, surr2)
                + 0.5 * self.MseLoss(state_values, rewards)
                - 0.01 * dist_entropy
            )

            # take gradient step
            self.optimizer.zero_grad()
            loss.mean().backward()
            self.optimizer.step()
            av_loss += loss.mean()

        # Copy new weights into old policy
        self.policy_old.load_state_dict(self.policy.state_dict())
        # clear buffer
        self.buffer.clear()
        self.decay_action_std(self.action_std_decay_rate, self.min_action_std)
        self.iter_count += 1
        # Write new values for tensorboard
        self.writer.add_scalar("train/loss", av_loss / iterations, self.iter_count)
        self.writer.add_scalar(
            "train/avg_value", av_state_values / iterations, self.iter_count
        )
        self.writer.add_scalar("train/max_value", max_state_value, self.iter_count)
        if self.save_every > 0 and self.iter_count % self.save_every == 0:
            self.save(filename=self.model_name, directory=self.save_directory)

    def prepare_state(self, latest_scan, distance, cos, sin, collision, goal, action):
        """
        Convert raw sensor and navigation data into a normalized state vector for the policy.

        Args:
            latest_scan (list[float]): LIDAR scan data.
            distance (float): Distance to the goal.
            cos (float): Cosine of angle to the goal.
            sin (float): Sine of angle to the goal.
            collision (bool): Whether the robot has collided.
            goal (bool): Whether the robot has reached the goal.
            action (tuple[float, float]): Last action taken (linear and angular velocities).

        Returns:
            tuple[list[float], int]: Processed state vector and terminal flag (1 if terminal, else 0).
        """
        latest_scan = np.array(latest_scan)

        inf_mask = np.isinf(latest_scan)
        latest_scan[inf_mask] = 7.0

        max_bins = self.state_dim - 5
        bin_size = int(np.ceil(len(latest_scan) / max_bins))

        # Initialize the list to store the minimum values of each bin
        min_values = []

        # Loop through the data and create bins
        for i in range(0, len(latest_scan), bin_size):
            # Get the current bin
            bin = latest_scan[i : i + min(bin_size, len(latest_scan) - i)]
            # Find the minimum value in the current bin and append it to the min_values list
            min_values.append(min(bin) / 7)

        # Normalize to [0, 1] range
        distance /= 10
        lin_vel = action[0] * 2
        ang_vel = (action[1] + 1) / 2
        state = min_values + [distance, cos, sin] + [lin_vel, ang_vel]

        assert len(state) == self.state_dim
        terminal = 1 if collision or goal else 0

        return state, terminal

    def save(self, filename, directory):
        """
        Save the current policy model to the specified directory.

        Args:
            filename (str): Base name of the model file.
            directory (Path): Directory to save the model to.
        """
        Path(directory).mkdir(parents=True, exist_ok=True)
        torch.save(
            self.policy_old.state_dict(), "%s/%s_policy.pth" % (directory, filename)
        )

    def load(self, filename, directory):
        """
        Load the policy model from a saved checkpoint.

        Args:
            filename (str): Base name of the model file.
            directory (Path): Directory to load the model from.
        """
        self.policy_old.load_state_dict(
            torch.load(
                "%s/%s_policy.pth" % (directory, filename),
                map_location=lambda storage, loc: storage,
            )
        )
        self.policy.load_state_dict(
            torch.load(
                "%s/%s_policy.pth" % (directory, filename),
                map_location=lambda storage, loc: storage,
            )
        )
        print(f"Loaded weights from: {directory}")
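
A minimal construction sketch (illustrative arguments; assumes the import path from this page and write access for the log directory created by SummaryWriter):

from robot_nav.models.PPO.PPO import PPO  # assumed import path

agent = PPO(
    state_dim=25,        # 20 LIDAR bins + distance, cos, sin, lin_vel, ang_vel
    action_dim=2,        # linear and angular velocity
    max_action=1.0,
    device="cpu",
    save_every=0,        # disable periodic checkpointing for this sketch
)
print(agent.action_std, agent.policy is not agent.policy_old)  # 0.6 True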

decay_action_std(action_std_decay_rate, min_action_std)

Decay the action standard deviation by a fixed rate, down to a minimum threshold.

Parameters:

    action_std_decay_rate (float): Amount to reduce standard deviation by. Required.
    min_action_std (float): Minimum value for standard deviation. Required.

Source code in robot_nav/models/PPO/PPO.py (lines 275-298)

def decay_action_std(self, action_std_decay_rate, min_action_std):
    """
    Decay the action standard deviation by a fixed rate, down to a minimum threshold.

    Args:
        action_std_decay_rate (float): Amount to reduce standard deviation by.
        min_action_std (float): Minimum value for standard deviation.
    """
    print(
        "--------------------------------------------------------------------------------------------"
    )
    self.action_std = self.action_std - action_std_decay_rate
    self.action_std = round(self.action_std, 4)
    if self.action_std <= min_action_std:
        self.action_std = min_action_std
        print(
            "setting actor output action_std to min_action_std : ", self.action_std
        )
    else:
        print("setting actor output action_std to : ", self.action_std)
    self.set_action_std(self.action_std)
    print(
        "--------------------------------------------------------------------------------------------"
    )
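
With the default settings (action_std_init=0.6, action_std_decay_rate=0.015, min_action_std=0.1), the standard deviation reaches its floor after roughly 34 updates, since decay_action_std() is called once at the end of every train() call. A standalone sketch of the same arithmetic:

std, decay, floor = 0.6, 0.015, 0.1   # defaults from PPO.__init__
updates = 0
while std > floor:
    std = round(std - decay, 4)       # same subtraction and rounding as decay_action_std()
    std = max(std, floor)             # clamp to the minimum, as the if-branch does
    updates += 1
print(updates, std)                   # 34 0.1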

get_action(state, add_noise)

Sample an action using the current policy (optionally with noise), and store in buffer if noise is added.

Parameters:

    state (array_like): Input state for the policy. Required.
    add_noise (bool): Whether to sample from the distribution (True) or use the deterministic mean (False). Required.

Returns:

    np.ndarray: Sampled action.

Source code in robot_nav/models/PPO/PPO.py (lines 300-322)

def get_action(self, state, add_noise):
    """
    Sample an action using the current policy (optionally with noise), and store in buffer if noise is added.

    Args:
        state (array_like): Input state for the policy.
        add_noise (bool): Whether to sample from the distribution (True) or use the deterministic mean (False).

    Returns:
        np.ndarray: Sampled action.
    """

    with torch.no_grad():
        state = torch.FloatTensor(state).to(self.device)
        action, action_logprob, state_val = self.policy_old.act(state, add_noise)

    if add_noise:
        # self.buffer.states.append(state)
        self.buffer.actions.append(action)
        self.buffer.logprobs.append(action_logprob)
        self.buffer.state_values.append(state_val)

    return action.detach().cpu().numpy().flatten()
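
A usage sketch (illustrative dimensions). Note that get_action() stores only the action, log probability, and value estimate; the state, reward, and terminal flag are added separately through buffer.add():

import numpy as np
from robot_nav.models.PPO.PPO import PPO  # assumed import path

agent = PPO(state_dim=25, action_dim=2, max_action=1.0, device="cpu")
state = np.random.rand(25).tolist()                          # placeholder state vector
action = agent.get_action(state, add_noise=True)
print(action.shape)                                          # (2,)
print(len(agent.buffer.actions), len(agent.buffer.states))   # 1 0 -- states are filled by buffer.add()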

load(filename, directory)

Load the policy model from a saved checkpoint.

Parameters:

    filename (str): Base name of the model file. Required.
    directory (Path): Directory to load the model from. Required.

Source code in robot_nav/models/PPO/PPO.py (lines 481-501)

def load(self, filename, directory):
    """
    Load the policy model from a saved checkpoint.

    Args:
        filename (str): Base name of the model file.
        directory (Path): Directory to load the model from.
    """
    self.policy_old.load_state_dict(
        torch.load(
            "%s/%s_policy.pth" % (directory, filename),
            map_location=lambda storage, loc: storage,
        )
    )
    self.policy.load_state_dict(
        torch.load(
            "%s/%s_policy.pth" % (directory, filename),
            map_location=lambda storage, loc: storage,
        )
    )
    print(f"Loaded weights from: {directory}")
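
A checkpoint round-trip sketch (illustrative paths): save() writes <directory>/<filename>_policy.pth and load() reads that same file into both policy and policy_old.

from pathlib import Path
from robot_nav.models.PPO.PPO import PPO  # assumed import path

agent = PPO(state_dim=25, action_dim=2, max_action=1.0, device="cpu")
checkpoint_dir = Path("/tmp/ppo_checkpoint")          # illustrative directory
agent.save(filename="PPO", directory=checkpoint_dir)  # writes /tmp/ppo_checkpoint/PPO_policy.pth
agent.load(filename="PPO", directory=checkpoint_dir)  # restores policy and policy_old from that file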

prepare_state(latest_scan, distance, cos, sin, collision, goal, action)

Convert raw sensor and navigation data into a normalized state vector for the policy.

Parameters:

    latest_scan (list[float]): LIDAR scan data. Required.
    distance (float): Distance to the goal. Required.
    cos (float): Cosine of angle to the goal. Required.
    sin (float): Sine of angle to the goal. Required.
    collision (bool): Whether the robot has collided. Required.
    goal (bool): Whether the robot has reached the goal. Required.
    action (tuple[float, float]): Last action taken (linear and angular velocities). Required.

Returns:

    tuple[list[float], int]: Processed state vector and terminal flag (1 if terminal, else 0).

Source code in robot_nav/models/PPO/PPO.py (lines 423-466)

def prepare_state(self, latest_scan, distance, cos, sin, collision, goal, action):
    """
    Convert raw sensor and navigation data into a normalized state vector for the policy.

    Args:
        latest_scan (list[float]): LIDAR scan data.
        distance (float): Distance to the goal.
        cos (float): Cosine of angle to the goal.
        sin (float): Sine of angle to the goal.
        collision (bool): Whether the robot has collided.
        goal (bool): Whether the robot has reached the goal.
        action (tuple[float, float]): Last action taken (linear and angular velocities).

    Returns:
        tuple[list[float], int]: Processed state vector and terminal flag (1 if terminal, else 0).
    """
    latest_scan = np.array(latest_scan)

    inf_mask = np.isinf(latest_scan)
    latest_scan[inf_mask] = 7.0

    max_bins = self.state_dim - 5
    bin_size = int(np.ceil(len(latest_scan) / max_bins))

    # Initialize the list to store the minimum values of each bin
    min_values = []

    # Loop through the data and create bins
    for i in range(0, len(latest_scan), bin_size):
        # Get the current bin
        bin = latest_scan[i : i + min(bin_size, len(latest_scan) - i)]
        # Find the minimum value in the current bin and append it to the min_values list
        min_values.append(min(bin) / 7)

    # Normalize to [0, 1] range
    distance /= 10
    lin_vel = action[0] * 2
    ang_vel = (action[1] + 1) / 2
    state = min_values + [distance, cos, sin] + [lin_vel, ang_vel]

    assert len(state) == self.state_dim
    terminal = 1 if collision or goal else 0

    return state, terminal
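
The LIDAR preprocessing can be traced with plain numpy. With a 180-beam scan and state_dim=25 (both illustrative), the scan is reduced to state_dim - 5 = 20 minimum-value bins, which are then combined with distance, heading, and the last action:

import numpy as np

latest_scan = np.random.uniform(0.2, 7.0, size=180)   # hypothetical 180-beam scan
state_dim = 25                                        # illustrative state dimension
max_bins = state_dim - 5                              # 20 LIDAR bins
bin_size = int(np.ceil(len(latest_scan) / max_bins))  # 9 beams per bin
min_values = [latest_scan[i:i + bin_size].min() / 7 for i in range(0, len(latest_scan), bin_size)]
print(len(min_values))  # 20, leaving room for distance, cos, sin, lin_vel, ang_vel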

save(filename, directory)

Save the current policy model to the specified directory.

Parameters:

    filename (str): Base name of the model file. Required.
    directory (Path): Directory to save the model to. Required.

Source code in robot_nav/models/PPO/PPO.py (lines 468-479)

def save(self, filename, directory):
    """
    Save the current policy model to the specified directory.

    Args:
        filename (str): Base name of the model file.
        directory (Path): Directory to save the model to.
    """
    Path(directory).mkdir(parents=True, exist_ok=True)
    torch.save(
        self.policy_old.state_dict(), "%s/%s_policy.pth" % (directory, filename)
    )

set_action_std(new_action_std)

Set a new standard deviation for the action distribution.

Parameters:

    new_action_std (float): New standard deviation value. Required.

Source code in robot_nav/models/PPO/PPO.py (lines 264-273)

def set_action_std(self, new_action_std):
    """
    Set a new standard deviation for the action distribution.

    Args:
        new_action_std (float): New standard deviation value.
    """
    self.action_std = new_action_std
    self.policy.set_action_std(new_action_std)
    self.policy_old.set_action_std(new_action_std)

train(replay_buffer, iterations, batch_size)

Train the policy and value function using PPO loss based on the stored rollout buffer.

Parameters:

    replay_buffer: Placeholder for compatibility (not used). Required.
    iterations (int): Number of epochs to optimize the policy per update. Required.
    batch_size (int): Batch size (not used; training uses the whole buffer). Required.

Source code in robot_nav/models/PPO/PPO.py (lines 324-421)

def train(self, replay_buffer, iterations, batch_size):
    """
    Train the policy and value function using PPO loss based on the stored rollout buffer.

    Args:
        replay_buffer: Placeholder for compatibility (not used).
        iterations (int): Number of epochs to optimize the policy per update.
        batch_size (int): Batch size (not used; training uses the whole buffer).
    """
    # Monte Carlo estimate of returns
    rewards = []
    discounted_reward = 0
    for reward, is_terminal in zip(
        reversed(self.buffer.rewards), reversed(self.buffer.is_terminals)
    ):
        if is_terminal:
            discounted_reward = 0
        discounted_reward = reward + (self.gamma * discounted_reward)
        rewards.insert(0, discounted_reward)

    # Normalizing the rewards
    rewards = torch.tensor(rewards, dtype=torch.float32).to(self.device)
    rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-7)

    # convert list to tensor
    assert len(self.buffer.actions) == len(self.buffer.states)

    states = [torch.tensor(st, dtype=torch.float32) for st in self.buffer.states]
    old_states = torch.squeeze(torch.stack(states, dim=0)).detach().to(self.device)
    old_actions = (
        torch.squeeze(torch.stack(self.buffer.actions, dim=0))
        .detach()
        .to(self.device)
    )
    old_logprobs = (
        torch.squeeze(torch.stack(self.buffer.logprobs, dim=0))
        .detach()
        .to(self.device)
    )
    old_state_values = (
        torch.squeeze(torch.stack(self.buffer.state_values, dim=0))
        .detach()
        .to(self.device)
    )

    # calculate advantages
    advantages = rewards.detach() - old_state_values.detach()

    av_state_values = 0
    max_state_value = -inf
    av_loss = 0
    # Optimize policy for K epochs
    for _ in range(iterations):
        # Evaluating old actions and values
        logprobs, state_values, dist_entropy = self.policy.evaluate(
            old_states, old_actions
        )

        # match state_values tensor dimensions with rewards tensor
        state_values = torch.squeeze(state_values)
        av_state_values += torch.mean(state_values)
        max_state_value = max(max_state_value, max(state_values))
        # Finding the ratio (pi_theta / pi_theta__old)
        ratios = torch.exp(logprobs - old_logprobs.detach())

        # Finding Surrogate Loss
        surr1 = ratios * advantages
        surr2 = (
            torch.clamp(ratios, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
        )

        # final loss of clipped objective PPO
        loss = (
            -torch.min(surr1, surr2)
            + 0.5 * self.MseLoss(state_values, rewards)
            - 0.01 * dist_entropy
        )

        # take gradient step
        self.optimizer.zero_grad()
        loss.mean().backward()
        self.optimizer.step()
        av_loss += loss.mean()

    # Copy new weights into old policy
    self.policy_old.load_state_dict(self.policy.state_dict())
    # clear buffer
    self.buffer.clear()
    self.decay_action_std(self.action_std_decay_rate, self.min_action_std)
    self.iter_count += 1
    # Write new values for tensorboard
    self.writer.add_scalar("train/loss", av_loss / iterations, self.iter_count)
    self.writer.add_scalar(
        "train/avg_value", av_state_values / iterations, self.iter_count
    )
    self.writer.add_scalar("train/max_value", max_state_value, self.iter_count)
    if self.save_every > 0 and self.iter_count % self.save_every == 0:
        self.save(filename=self.model_name, directory=self.save_directory)
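
A self-contained rollout-and-update sketch using random placeholder observations instead of a simulator (all numbers illustrative); it shows the expected interplay of prepare_state(), get_action(), buffer.add(), and train():

import numpy as np
from robot_nav.models.PPO.PPO import PPO  # assumed import path

agent = PPO(state_dim=25, action_dim=2, max_action=1.0, device="cpu", save_every=0)

last_action = (0.0, 0.0)
for step in range(64):                                   # one short rollout of fake transitions
    scan = np.random.uniform(0.2, 7.0, size=180)         # placeholder LIDAR scan
    state, terminal = agent.prepare_state(scan, distance=3.0, cos=1.0, sin=0.0,
                                          collision=False, goal=False, action=last_action)
    action = agent.get_action(np.array(state), add_noise=True)   # also stores logprob and value
    agent.buffer.add(state, action, reward=-0.1, terminal=terminal, next_state=None)
    last_action = (float(action[0]), float(action[1]))

agent.train(replay_buffer=None, iterations=8, batch_size=0)      # replay_buffer and batch_size are unused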

RolloutBuffer

Buffer to store rollout data (transitions) for PPO training.

Attributes:

    actions (list): Actions taken by the agent.
    states (list): States observed by the agent.
    logprobs (list): Log probabilities of the actions.
    rewards (list): Rewards received from the environment.
    state_values (list): Value estimates for the states.
    is_terminals (list): Flags indicating episode termination.

Source code in robot_nav/models/PPO/PPO.py (lines 10-58)

class RolloutBuffer:
    """
    Buffer to store rollout data (transitions) for PPO training.

    Attributes:
        actions (list): Actions taken by the agent.
        states (list): States observed by the agent.
        logprobs (list): Log probabilities of the actions.
        rewards (list): Rewards received from the environment.
        state_values (list): Value estimates for the states.
        is_terminals (list): Flags indicating episode termination.
    """

    def __init__(self):
        """
        Initialize empty lists to store buffer elements.
        """
        self.actions = []
        self.states = []
        self.logprobs = []
        self.rewards = []
        self.state_values = []
        self.is_terminals = []

    def clear(self):
        """
        Clear all stored data from the buffer.
        """
        del self.actions[:]
        del self.states[:]
        del self.logprobs[:]
        del self.rewards[:]
        del self.state_values[:]
        del self.is_terminals[:]

    def add(self, state, action, reward, terminal, next_state):
        """
        Add a transition to the buffer. (Partial implementation.)

        Args:
            state: The current observed state.
            action: The action taken.
            reward: The reward received after taking the action.
            terminal (bool): Whether the episode terminated.
            next_state: The resulting state after taking the action.
        """
        self.states.append(state)
        self.rewards.append(reward)
        self.is_terminals.append(terminal)
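
A minimal sketch of the buffer contract (assumed import path). PPO.get_action() appends actions, logprobs, and state_values, while add() appends only the state, reward, and terminal flag, so both call sites must be used for a consistent rollout:

from robot_nav.models.PPO.PPO import RolloutBuffer  # assumed import path

buffer = RolloutBuffer()
buffer.add(state=[0.1] * 25, action=None, reward=1.0, terminal=False, next_state=None)
print(len(buffer.states), len(buffer.rewards), len(buffer.actions))  # 1 1 0
buffer.clear()
print(len(buffer.states))  # 0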

__init__()

Initialize empty lists to store buffer elements.

Source code in robot_nav/models/PPO/PPO.py (lines 23-32)

def __init__(self):
    """
    Initialize empty lists to store buffer elements.
    """
    self.actions = []
    self.states = []
    self.logprobs = []
    self.rewards = []
    self.state_values = []
    self.is_terminals = []

add(state, action, reward, terminal, next_state)

Add a transition to the buffer. (Partial implementation.)

Parameters:

    state: The current observed state. Required.
    action: The action taken. Required.
    reward: The reward received after taking the action. Required.
    terminal (bool): Whether the episode terminated. Required.
    next_state: The resulting state after taking the action. Required.

Source code in robot_nav/models/PPO/PPO.py (lines 45-58)

def add(self, state, action, reward, terminal, next_state):
    """
    Add a transition to the buffer. (Partial implementation.)

    Args:
        state: The current observed state.
        action: The action taken.
        reward: The reward received after taking the action.
        terminal (bool): Whether the episode terminated.
        next_state: The resulting state after taking the action.
    """
    self.states.append(state)
    self.rewards.append(reward)
    self.is_terminals.append(terminal)

clear()

Clear all stored data from the buffer.

Source code in robot_nav/models/PPO/PPO.py (lines 34-43)

def clear(self):
    """
    Clear all stored data from the buffer.
    """
    del self.actions[:]
    del self.states[:]
    del self.logprobs[:]
    del self.rewards[:]
    del self.state_values[:]
    del self.is_terminals[:]