
DDPG

robot_nav.models.DDPG.DDPG

Actor

Bases: Module

Actor network for the DDPG algorithm.

This network maps input states to actions using a fully connected feedforward architecture. It uses Leaky ReLU activations in the hidden layers and a tanh activation at the output to ensure the output actions are in the range [-1, 1].

Architecture
  • Linear(state_dim → 400) + LeakyReLU
  • Linear(400 → 300) + LeakyReLU
  • Linear(300 → action_dim) + Tanh
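For a quick sanity check of the shapes, here is a minimal usage sketch. It assumes the robot_nav package is importable; the dimensions (state_dim=25, action_dim=2) and the batch size are illustrative values, not fixed by the class.

import torch

from robot_nav.models.DDPG.DDPG import Actor

# Example dimensions only; any positive integers work.
state_dim, action_dim = 25, 2
actor = Actor(state_dim, action_dim)

# A batch of 8 random states produces actions squashed into [-1, 1] by the tanh output.
states = torch.randn(8, state_dim)
actions = actor(states)
print(actions.shape)  # torch.Size([8, 2])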

Parameters:

  state_dim (int): Dimension of the input state. Required.
  action_dim (int): Dimension of the output action space. Required.
Source code in robot_nav/models/DDPG/DDPG.py, lines 13-54
class Actor(nn.Module):
    """
    Actor network for the DDPG algorithm.

    This network maps input states to actions using a fully connected feedforward architecture.
    It uses Leaky ReLU activations in the hidden layers and a tanh activation at the output
    to ensure the output actions are in the range [-1, 1].

    Architecture:
        - Linear(state_dim → 400) + LeakyReLU
        - Linear(400 → 300) + LeakyReLU
        - Linear(300 → action_dim) + Tanh

    Args:
        state_dim (int): Dimension of the input state.
        action_dim (int): Dimension of the output action space.
    """

    def __init__(self, state_dim, action_dim):
        super(Actor, self).__init__()

        self.layer_1 = nn.Linear(state_dim, 400)
        torch.nn.init.kaiming_uniform_(self.layer_1.weight, nonlinearity="leaky_relu")
        self.layer_2 = nn.Linear(400, 300)
        torch.nn.init.kaiming_uniform_(self.layer_2.weight, nonlinearity="leaky_relu")
        self.layer_3 = nn.Linear(300, action_dim)
        self.tanh = nn.Tanh()

    def forward(self, s):
        """
        Forward pass of the actor network.

        Args:
            s (torch.Tensor): Input state tensor of shape (batch_size, state_dim).

        Returns:
            torch.Tensor: Output action tensor of shape (batch_size, action_dim), scaled to [-1, 1].
        """
        s = F.leaky_relu(self.layer_1(s))
        s = F.leaky_relu(self.layer_2(s))
        a = self.tanh(self.layer_3(s))
        return a

forward(s)

Forward pass of the actor network.

Parameters:

  s (Tensor): Input state tensor of shape (batch_size, state_dim). Required.

Returns:

  torch.Tensor: Output action tensor of shape (batch_size, action_dim), scaled to [-1, 1].

Source code in robot_nav/models/DDPG/DDPG.py, lines 41-54
def forward(self, s):
    """
    Forward pass of the actor network.

    Args:
        s (torch.Tensor): Input state tensor of shape (batch_size, state_dim).

    Returns:
        torch.Tensor: Output action tensor of shape (batch_size, action_dim), scaled to [-1, 1].
    """
    s = F.leaky_relu(self.layer_1(s))
    s = F.leaky_relu(self.layer_2(s))
    a = self.tanh(self.layer_3(s))
    return a

Critic

Bases: Module

Critic network for the DDPG algorithm.

This network evaluates the Q-value of a given state-action pair. It separately processes state and action inputs through linear layers, combines them, and passes the result through another linear layer to predict a scalar Q-value.

Architecture
  • Linear(state_dim → 400) + LeakyReLU
  • Linear(400 → 300) [state branch]
  • Linear(action_dim → 300) [action branch]
  • Combine both branches, apply LeakyReLU
  • Linear(300 → 1) for Q-value output
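A matching sketch for evaluating a batch of state-action pairs; as above, the dimensions are illustrative assumptions and the package is assumed to be importable.

import torch

from robot_nav.models.DDPG.DDPG import Critic

state_dim, action_dim = 25, 2  # example dimensions only
critic = Critic(state_dim, action_dim)

states = torch.randn(8, state_dim)
actions = torch.rand(8, action_dim) * 2 - 1  # actions assumed to lie in [-1, 1]
q_values = critic(states, actions)
print(q_values.shape)  # torch.Size([8, 1]) - one scalar Q-value per pair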

Parameters:

  state_dim (int): Dimension of the input state. Required.
  action_dim (int): Dimension of the input action. Required.
Source code in robot_nav/models/DDPG/DDPG.py, lines 57-108
class Critic(nn.Module):
    """
    Critic network for the DDPG algorithm.

    This network evaluates the Q-value of a given state-action pair. It separately processes
    state and action inputs through linear layers, combines them, and passes the result through
    another linear layer to predict a scalar Q-value.

    Architecture:
        - Linear(state_dim → 400) + LeakyReLU
        - Linear(400 → 300) [state branch]
        - Linear(action_dim → 300) [action branch]
        - Combine both branches, apply LeakyReLU
        - Linear(300 → 1) for Q-value output

    Args:
        state_dim (int): Dimension of the input state.
        action_dim (int): Dimension of the input action.
    """

    def __init__(self, state_dim, action_dim):
        super(Critic, self).__init__()

        self.layer_1 = nn.Linear(state_dim, 400)
        torch.nn.init.kaiming_uniform_(self.layer_1.weight, nonlinearity="leaky_relu")
        self.layer_2_s = nn.Linear(400, 300)
        torch.nn.init.kaiming_uniform_(self.layer_2_s.weight, nonlinearity="leaky_relu")
        self.layer_2_a = nn.Linear(action_dim, 300)
        torch.nn.init.kaiming_uniform_(self.layer_2_a.weight, nonlinearity="leaky_relu")
        self.layer_3 = nn.Linear(300, 1)
        torch.nn.init.kaiming_uniform_(self.layer_3.weight, nonlinearity="leaky_relu")

    def forward(self, s, a):
        """
        Forward pass of the critic network.

        Args:
            s (torch.Tensor): State tensor of shape (batch_size, state_dim).
            a (torch.Tensor): Action tensor of shape (batch_size, action_dim).

        Returns:
            torch.Tensor: Q-value tensor of shape (batch_size, 1).
        """
        s1 = F.leaky_relu(self.layer_1(s))
        # Note: the two calls below discard their outputs; the branch outputs are
        # instead recomputed from the layer weights in the lines that follow.
        self.layer_2_s(s1)
        self.layer_2_a(a)
        # Combine the state and action branches using the raw layer weights
        # (.data detaches them from autograd) plus the action-branch bias.
        s11 = torch.mm(s1, self.layer_2_s.weight.data.t())
        s12 = torch.mm(a, self.layer_2_a.weight.data.t())
        s1 = F.leaky_relu(s11 + s12 + self.layer_2_a.bias.data)
        q = self.layer_3(s1)

        return q

forward(s, a)

Forward pass of the critic network.

Parameters:

  s (Tensor): State tensor of shape (batch_size, state_dim). Required.
  a (Tensor): Action tensor of shape (batch_size, action_dim). Required.

Returns:

  torch.Tensor: Q-value tensor of shape (batch_size, 1).

Source code in robot_nav/models/DDPG/DDPG.py, lines 89-108
def forward(self, s, a):
    """
    Forward pass of the critic network.

    Args:
        s (torch.Tensor): State tensor of shape (batch_size, state_dim).
        a (torch.Tensor): Action tensor of shape (batch_size, action_dim).

    Returns:
        torch.Tensor: Q-value tensor of shape (batch_size, 1).
    """
    s1 = F.leaky_relu(self.layer_1(s))
    # Note: the two calls below discard their outputs; the branch outputs are
    # instead recomputed from the layer weights in the lines that follow.
    self.layer_2_s(s1)
    self.layer_2_a(a)
    # Combine the state and action branches using the raw layer weights
    # (.data detaches them from autograd) plus the action-branch bias.
    s11 = torch.mm(s1, self.layer_2_s.weight.data.t())
    s12 = torch.mm(a, self.layer_2_a.weight.data.t())
    s1 = F.leaky_relu(s11 + s12 + self.layer_2_a.bias.data)
    q = self.layer_3(s1)

    return q

DDPG

Bases: object

Deep Deterministic Policy Gradient (DDPG) agent implementation.

This class encapsulates the actor-critic learning framework using DDPG, which is suitable for continuous action spaces. It supports training, action selection, model saving/loading, and state preparation for a reinforcement learning agent, specifically designed for robot navigation.

Parameters:

  state_dim (int): Dimension of the input state. Required.
  action_dim (int): Dimension of the action space. Required.
  max_action (float): Maximum action value allowed. Required.
  device (torch.device): Computation device (CPU or GPU). Required.
  lr (float): Learning rate for the optimizers. Default: 1e-4.
  save_every (int): Frequency of saving the model, in training iterations; 0 means no saving. Default: 0.
  load_model (bool): Whether to load a model from disk. Default: False.
  save_directory (Path): Directory to save the model checkpoints. Default: Path("robot_nav/models/DDPG/checkpoint").
  model_name (str): Name used for saving and TensorBoard logging. Default: "DDPG".
  load_directory (Path): Directory to load model checkpoints from. Default: Path("robot_nav/models/DDPG/checkpoint").
  use_max_bound (bool): Whether to enforce a learned upper bound on the Q-value. Default: False.
  bound_weight (float): Weight of the upper-bound loss penalty. Default: 0.25.
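The sketch below constructs an agent using the arguments documented above. Only the first four arguments are required; the dimensions, device choice, and model name used here are illustrative assumptions, not prescribed values. Note that constructing the agent opens a TensorBoard SummaryWriter.

import torch

from robot_nav.models.DDPG.DDPG import DDPG

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Example values: 25-dimensional state, 2-dimensional action, actions bounded by 1.0.
model = DDPG(
    state_dim=25,
    action_dim=2,
    max_action=1.0,
    device=device,
    save_every=100,          # checkpoint every 100 calls to train()
    model_name="DDPG_example",
)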
Source code in robot_nav/models/DDPG/DDPG.py, lines 112-429
class DDPG(object):
    """
    Deep Deterministic Policy Gradient (DDPG) agent implementation.

    This class encapsulates the actor-critic learning framework using DDPG, which is suitable
    for continuous action spaces. It supports training, action selection, model saving/loading,
    and state preparation for a reinforcement learning agent, specifically designed for robot navigation.

    Args:
        state_dim (int): Dimension of the input state.
        action_dim (int): Dimension of the action space.
        max_action (float): Maximum action value allowed.
        device (torch.device): Computation device (CPU or GPU).
        lr (float): Learning rate for the optimizers. Default is 1e-4.
        save_every (int): Frequency of saving the model in training iterations. 0 means no saving. Default is 0.
        load_model (bool): Flag indicating whether to load a model from disk. Default is False.
        save_directory (Path): Directory to save the model checkpoints. Default is "robot_nav/models/DDPG/checkpoint".
        model_name (str): Name used for saving and TensorBoard logging. Default is "DDPG".
        load_directory (Path): Directory to load model checkpoints from. Default is "robot_nav/models/DDPG/checkpoint".
        use_max_bound (bool): Whether to enforce a learned upper bound on the Q-value. Default is False.
        bound_weight (float): Weight of the upper bound loss penalty. Default is 0.25.
    """

    def __init__(
        self,
        state_dim,
        action_dim,
        max_action,
        device,
        lr=1e-4,
        save_every=0,
        load_model=False,
        save_directory=Path("robot_nav/models/DDPG/checkpoint"),
        model_name="DDPG",
        load_directory=Path("robot_nav/models/DDPG/checkpoint"),
        use_max_bound=False,
        bound_weight=0.25,
    ):
        # Initialize the Actor network
        self.device = device
        self.actor = Actor(state_dim, action_dim).to(self.device)
        self.actor_target = Actor(state_dim, action_dim).to(self.device)
        self.actor_target.load_state_dict(self.actor.state_dict())
        self.actor_optimizer = torch.optim.Adam(params=self.actor.parameters(), lr=lr)

        # Initialize the Critic networks
        self.critic = Critic(state_dim, action_dim).to(self.device)
        self.critic_target = Critic(state_dim, action_dim).to(self.device)
        self.critic_target.load_state_dict(self.critic.state_dict())
        self.critic_optimizer = torch.optim.Adam(params=self.critic.parameters(), lr=lr)

        self.action_dim = action_dim
        self.max_action = max_action
        self.state_dim = state_dim
        self.writer = SummaryWriter(comment=model_name)
        self.iter_count = 0
        if load_model:
            self.load(filename=model_name, directory=load_directory)
        self.save_every = save_every
        self.model_name = model_name
        self.save_directory = save_directory
        self.use_max_bound = use_max_bound
        self.bound_weight = bound_weight

    def get_action(self, obs, add_noise):
        """
        Selects an action based on the observation.

        Args:
            obs (np.array): The current state observation.
            add_noise (bool): Whether to add exploration noise to the action.

        Returns:
            np.array: Action selected by the actor network.
        """
        if add_noise:
            return (
                self.act(obs) + np.random.normal(0, 0.2, size=self.action_dim)
            ).clip(-self.max_action, self.max_action)
        else:
            return self.act(obs)

    def act(self, state):
        """
        Computes the action for a given state using the actor network.

        Args:
            state (np.array): Environment state.

        Returns:
            np.array: Action values as output by the actor network.
        """
        state = torch.Tensor(state).to(self.device)
        return self.actor(state).cpu().data.numpy().flatten()

    # training cycle
    def train(
        self,
        replay_buffer,
        iterations,
        batch_size,
        discount=0.99,
        tau=0.005,
        policy_noise=0.2,
        noise_clip=0.5,
        policy_freq=2,
        max_lin_vel=0.5,
        max_ang_vel=1,
        goal_reward=100,
        distance_norm=10,
        time_step=0.3,
    ):
        """
        Trains the actor and critic networks using a replay buffer and soft target updates.

        Args:
            replay_buffer (object): Replay buffer object with a sample_batch method.
            iterations (int): Number of training iterations.
            batch_size (int): Size of each training batch.
            discount (float): Discount factor for future rewards.
            tau (float): Soft update factor for target networks.
            policy_noise (float): Standard deviation of noise added to target policy.
            noise_clip (float): Maximum value to clip target policy noise.
            policy_freq (int): Frequency of actor and target updates.
            max_lin_vel (float): Maximum linear velocity, used in Q-bound calculation.
            max_ang_vel (float): Maximum angular velocity, used in Q-bound calculation.
            goal_reward (float): Reward given upon reaching goal.
            distance_norm (float): Distance normalization factor.
            time_step (float): Time step used in max bound calculation.
        """
        av_Q = 0
        max_Q = -inf
        av_loss = 0
        for it in range(iterations):
            # sample a batch from the replay buffer
            (
                batch_states,
                batch_actions,
                batch_rewards,
                batch_dones,
                batch_next_states,
            ) = replay_buffer.sample_batch(batch_size)
            state = torch.Tensor(batch_states).to(self.device)
            next_state = torch.Tensor(batch_next_states).to(self.device)
            action = torch.Tensor(batch_actions).to(self.device)
            reward = torch.Tensor(batch_rewards).to(self.device)
            done = torch.Tensor(batch_dones).to(self.device)

            # Obtain the estimated action from the next state by using the actor-target
            next_action = self.actor_target(next_state)

            # Add noise to the action
            noise = (
                torch.Tensor(batch_actions)
                .data.normal_(0, policy_noise)
                .to(self.device)
            )
            noise = noise.clamp(-noise_clip, noise_clip)
            next_action = (next_action + noise).clamp(-self.max_action, self.max_action)

            # Calculate the Q values from the critic-target network for the next state-action pair
            target_Q = self.critic_target(next_state, next_action)

            av_Q += torch.mean(target_Q)
            max_Q = max(max_Q, torch.max(target_Q))
            # Calculate the final Q value from the target network parameters by using Bellman equation
            target_Q = reward + ((1 - done) * discount * target_Q).detach()

            # Get the Q values of the basis networks with the current parameters
            current_Q = self.critic(state, action)

            # Calculate the loss between the current Q value and the target Q value
            loss = F.mse_loss(current_Q, target_Q)

            if self.use_max_bound:
                max_bound = get_max_bound(
                    next_state,
                    discount,
                    max_ang_vel,
                    max_lin_vel,
                    time_step,
                    distance_norm,
                    goal_reward,
                    reward,
                    done,
                    self.device,
                )
                max_excess_Q = F.relu(current_Q - max_bound)
                max_bound_loss = (max_excess_Q**2).mean()
                # Add loss for Q values exceeding maximum possible upper bound
                loss += self.bound_weight * max_bound_loss

            # Perform the gradient descent
            self.critic_optimizer.zero_grad()
            loss.backward()
            self.critic_optimizer.step()

            if it % policy_freq == 0:
                # Maximize the actor output value by performing gradient descent on negative Q values
                # (essentially perform gradient ascent)
                actor_grad = self.critic(state, self.actor(state))
                actor_grad = -actor_grad.mean()
                self.actor_optimizer.zero_grad()
                actor_grad.backward()
                self.actor_optimizer.step()

                # Use soft update to update the actor-target network parameters by
                # infusing small amount of current parameters
                for param, target_param in zip(
                    self.actor.parameters(), self.actor_target.parameters()
                ):
                    target_param.data.copy_(
                        tau * param.data + (1 - tau) * target_param.data
                    )
                # Use soft update to update the critic-target network parameters by infusing
                # small amount of current parameters
                for param, target_param in zip(
                    self.critic.parameters(), self.critic_target.parameters()
                ):
                    target_param.data.copy_(
                        tau * param.data + (1 - tau) * target_param.data
                    )

            av_loss += loss
        self.iter_count += 1
        # Write new values for tensorboard
        self.writer.add_scalar("train/loss", av_loss / iterations, self.iter_count)
        self.writer.add_scalar("train/avg_Q", av_Q / iterations, self.iter_count)
        self.writer.add_scalar("train/max_Q", max_Q, self.iter_count)
        if self.save_every > 0 and self.iter_count % self.save_every == 0:
            self.save(filename=self.model_name, directory=self.save_directory)

    def save(self, filename, directory):
        """
        Saves the model parameters to disk.

        Args:
            filename (str): Base filename for saving the model components.
            directory (str or Path): Directory where the model files will be saved.
        """
        Path(directory).mkdir(parents=True, exist_ok=True)
        torch.save(self.actor.state_dict(), "%s/%s_actor.pth" % (directory, filename))
        torch.save(
            self.actor_target.state_dict(),
            "%s/%s_actor_target.pth" % (directory, filename),
        )
        torch.save(self.critic.state_dict(), "%s/%s_critic.pth" % (directory, filename))
        torch.save(
            self.critic_target.state_dict(),
            "%s/%s_critic_target.pth" % (directory, filename),
        )

    def load(self, filename, directory):
        """
        Loads model parameters from disk.

        Args:
            filename (str): Base filename used for loading model components.
            directory (str or Path): Directory to load the model files from.
        """
        self.actor.load_state_dict(
            torch.load("%s/%s_actor.pth" % (directory, filename))
        )
        self.actor_target.load_state_dict(
            torch.load("%s/%s_actor_target.pth" % (directory, filename))
        )
        self.critic.load_state_dict(
            torch.load("%s/%s_critic.pth" % (directory, filename))
        )
        self.critic_target.load_state_dict(
            torch.load("%s/%s_critic_target.pth" % (directory, filename))
        )
        print(f"Loaded weights from: {directory}")

    def prepare_state(self, latest_scan, distance, cos, sin, collision, goal, action):
        """
        Processes raw sensor input and additional information into a normalized state representation.

        Args:
            latest_scan (list or np.array): Raw LIDAR or laser scan data.
            distance (float): Distance to the goal.
            cos (float): Cosine of the angle to the goal.
            sin (float): Sine of the angle to the goal.
            collision (bool): Whether a collision has occurred.
            goal (bool): Whether the goal has been reached.
            action (list or np.array): The action taken in the previous step.

        Returns:
            tuple: (state vector, terminal flag)
        """
        latest_scan = np.array(latest_scan)

        inf_mask = np.isinf(latest_scan)
        latest_scan[inf_mask] = 7.0

        max_bins = self.state_dim - 5
        bin_size = int(np.ceil(len(latest_scan) / max_bins))

        # Initialize the list to store the minimum values of each bin
        min_values = []

        # Loop through the data and create bins
        for i in range(0, len(latest_scan), bin_size):
            # Get the current bin
            bin = latest_scan[i : i + min(bin_size, len(latest_scan) - i)]
            # Find the minimum value in the current bin and append it to the min_values list
            min_values.append(min(bin) / 7)

        # Normalize to [0, 1] range
        distance /= 10
        lin_vel = action[0] * 2
        ang_vel = (action[1] + 1) / 2
        state = min_values + [distance, cos, sin] + [lin_vel, ang_vel]

        assert len(state) == self.state_dim
        terminal = 1 if collision or goal else 0

        return state, terminal

act(state)

Computes the action for a given state using the actor network.

Parameters:

  state (np.array): Environment state. Required.

Returns:

  np.array: Action values as output by the actor network.

Source code in robot_nav/models/DDPG/DDPG.py, lines 194-205
def act(self, state):
    """
    Computes the action for a given state using the actor network.

    Args:
        state (np.array): Environment state.

    Returns:
        np.array: Action values as output by the actor network.
    """
    state = torch.Tensor(state).to(self.device)
    return self.actor(state).cpu().data.numpy().flatten()

get_action(obs, add_noise)

Selects an action based on the observation.

Parameters:

  obs (np.array): The current state observation. Required.
  add_noise (bool): Whether to add exploration noise to the action. Required.

Returns:

  np.array: Action selected by the actor network.

Source code in robot_nav/models/DDPG/DDPG.py, lines 176-192
def get_action(self, obs, add_noise):
    """
    Selects an action based on the observation.

    Args:
        obs (np.array): The current state observation.
        add_noise (bool): Whether to add exploration noise to the action.

    Returns:
        np.array: Action selected by the actor network.
    """
    if add_noise:
        return (
            self.act(obs) + np.random.normal(0, 0.2, size=self.action_dim)
        ).clip(-self.max_action, self.max_action)
    else:
        return self.act(obs)
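A short usage sketch for action selection. It assumes a model constructed as in the earlier example (state_dim=25); the zero observation is a placeholder only. With add_noise=True the action is perturbed by Gaussian noise (std 0.2) and clipped to [-max_action, max_action]; with add_noise=False the deterministic actor output is returned.

import numpy as np

obs = np.zeros(25, dtype=np.float32)  # placeholder observation of length state_dim

noisy_action = model.get_action(obs, add_noise=True)    # exploration during training
greedy_action = model.get_action(obs, add_noise=False)  # deterministic action for evaluation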

load(filename, directory)

Loads model parameters from disk.

Parameters:

  filename (str): Base filename used for loading model components. Required.
  directory (str or Path): Directory to load the model files from. Required.
Source code in robot_nav/models/DDPG/DDPG.py, lines 364-384
def load(self, filename, directory):
    """
    Loads model parameters from disk.

    Args:
        filename (str): Base filename used for loading model components.
        directory (str or Path): Directory to load the model files from.
    """
    self.actor.load_state_dict(
        torch.load("%s/%s_actor.pth" % (directory, filename))
    )
    self.actor_target.load_state_dict(
        torch.load("%s/%s_actor_target.pth" % (directory, filename))
    )
    self.critic.load_state_dict(
        torch.load("%s/%s_critic.pth" % (directory, filename))
    )
    self.critic_target.load_state_dict(
        torch.load("%s/%s_critic_target.pth" % (directory, filename))
    )
    print(f"Loaded weights from: {directory}")

prepare_state(latest_scan, distance, cos, sin, collision, goal, action)

Processes raw sensor input and additional information into a normalized state representation.

Parameters:

  latest_scan (list or np.array): Raw LIDAR or laser scan data. Required.
  distance (float): Distance to the goal. Required.
  cos (float): Cosine of the angle to the goal. Required.
  sin (float): Sine of the angle to the goal. Required.
  collision (bool): Whether a collision has occurred. Required.
  goal (bool): Whether the goal has been reached. Required.
  action (list or np.array): The action taken in the previous step. Required.

Returns:

  tuple: (state vector, terminal flag).

Source code in robot_nav/models/DDPG/DDPG.py, lines 386-429
def prepare_state(self, latest_scan, distance, cos, sin, collision, goal, action):
    """
    Processes raw sensor input and additional information into a normalized state representation.

    Args:
        latest_scan (list or np.array): Raw LIDAR or laser scan data.
        distance (float): Distance to the goal.
        cos (float): Cosine of the angle to the goal.
        sin (float): Sine of the angle to the goal.
        collision (bool): Whether a collision has occurred.
        goal (bool): Whether the goal has been reached.
        action (list or np.array): The action taken in the previous step.

    Returns:
        tuple: (state vector, terminal flag)
    """
    latest_scan = np.array(latest_scan)

    inf_mask = np.isinf(latest_scan)
    latest_scan[inf_mask] = 7.0

    max_bins = self.state_dim - 5
    bin_size = int(np.ceil(len(latest_scan) / max_bins))

    # Initialize the list to store the minimum values of each bin
    min_values = []

    # Loop through the data and create bins
    for i in range(0, len(latest_scan), bin_size):
        # Get the current bin
        bin = latest_scan[i : i + min(bin_size, len(latest_scan) - i)]
        # Find the minimum value in the current bin and append it to the min_values list
        min_values.append(min(bin) / 7)

    # Normalize to [0, 1] range
    distance /= 10
    lin_vel = action[0] * 2
    ang_vel = (action[1] + 1) / 2
    state = min_values + [distance, cos, sin] + [lin_vel, ang_vel]

    assert len(state) == self.state_dim
    terminal = 1 if collision or goal else 0

    return state, terminal
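To illustrate the expected inputs, the sketch below builds a state from a synthetic 180-beam scan. The scan length, goal geometry, and previous action are made-up example values, and the agent is assumed to have state_dim=25, so the 20 scan bins plus the 5 appended features satisfy the length assertion.

import numpy as np

# Synthetic 180-beam scan with a few infinite readings (capped to 7.0 internally).
scan = np.random.uniform(0.5, 7.0, size=180)
scan[::40] = np.inf

state, terminal = model.prepare_state(
    latest_scan=scan,
    distance=4.2,        # meters to the goal (divided by 10 inside)
    cos=0.8,
    sin=0.6,
    collision=False,
    goal=False,
    action=[0.3, -0.1],  # previous (linear, angular) command
)
print(len(state), terminal)  # 25 0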

save(filename, directory)

Saves the model parameters to disk.

Parameters:

  filename (str): Base filename for saving the model components. Required.
  directory (str or Path): Directory where the model files will be saved. Required.
Source code in robot_nav/models/DDPG/DDPG.py, lines 344-362
def save(self, filename, directory):
    """
    Saves the model parameters to disk.

    Args:
        filename (str): Base filename for saving the model components.
        directory (str or Path): Directory where the model files will be saved.
    """
    Path(directory).mkdir(parents=True, exist_ok=True)
    torch.save(self.actor.state_dict(), "%s/%s_actor.pth" % (directory, filename))
    torch.save(
        self.actor_target.state_dict(),
        "%s/%s_actor_target.pth" % (directory, filename),
    )
    torch.save(self.critic.state_dict(), "%s/%s_critic.pth" % (directory, filename))
    torch.save(
        self.critic_target.state_dict(),
        "%s/%s_critic_target.pth" % (directory, filename),
    )
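A save/load round trip, assuming model is an existing DDPG instance; the filename and directory are example values (the directory shown is the class default).

from pathlib import Path

ckpt_dir = Path("robot_nav/models/DDPG/checkpoint")

# Writes <dir>/<filename>_actor.pth, _actor_target.pth, _critic.pth, _critic_target.pth
model.save(filename="DDPG_example", directory=ckpt_dir)

# Restores those four state dicts into the current networks
model.load(filename="DDPG_example", directory=ckpt_dir)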

train(replay_buffer, iterations, batch_size, discount=0.99, tau=0.005, policy_noise=0.2, noise_clip=0.5, policy_freq=2, max_lin_vel=0.5, max_ang_vel=1, goal_reward=100, distance_norm=10, time_step=0.3)

Trains the actor and critic networks using a replay buffer and soft target updates.

Parameters:

  replay_buffer (object): Replay buffer object with a sample_batch method. Required.
  iterations (int): Number of training iterations. Required.
  batch_size (int): Size of each training batch. Required.
  discount (float): Discount factor for future rewards. Default: 0.99.
  tau (float): Soft update factor for target networks. Default: 0.005.
  policy_noise (float): Standard deviation of noise added to the target policy. Default: 0.2.
  noise_clip (float): Maximum value to clip target policy noise. Default: 0.5.
  policy_freq (int): Frequency of actor and target updates. Default: 2.
  max_lin_vel (float): Maximum linear velocity, used in Q-bound calculation. Default: 0.5.
  max_ang_vel (float): Maximum angular velocity, used in Q-bound calculation. Default: 1.
  goal_reward (float): Reward given upon reaching the goal. Default: 100.
  distance_norm (float): Distance normalization factor. Default: 10.
  time_step (float): Time step used in the max-bound calculation. Default: 0.3.
Source code in robot_nav/models/DDPG/DDPG.py, lines 208-342
def train(
    self,
    replay_buffer,
    iterations,
    batch_size,
    discount=0.99,
    tau=0.005,
    policy_noise=0.2,
    noise_clip=0.5,
    policy_freq=2,
    max_lin_vel=0.5,
    max_ang_vel=1,
    goal_reward=100,
    distance_norm=10,
    time_step=0.3,
):
    """
    Trains the actor and critic networks using a replay buffer and soft target updates.

    Args:
        replay_buffer (object): Replay buffer object with a sample_batch method.
        iterations (int): Number of training iterations.
        batch_size (int): Size of each training batch.
        discount (float): Discount factor for future rewards.
        tau (float): Soft update factor for target networks.
        policy_noise (float): Standard deviation of noise added to target policy.
        noise_clip (float): Maximum value to clip target policy noise.
        policy_freq (int): Frequency of actor and target updates.
        max_lin_vel (float): Maximum linear velocity, used in Q-bound calculation.
        max_ang_vel (float): Maximum angular velocity, used in Q-bound calculation.
        goal_reward (float): Reward given upon reaching goal.
        distance_norm (float): Distance normalization factor.
        time_step (float): Time step used in max bound calculation.
    """
    av_Q = 0
    max_Q = -inf
    av_loss = 0
    for it in range(iterations):
        # sample a batch from the replay buffer
        (
            batch_states,
            batch_actions,
            batch_rewards,
            batch_dones,
            batch_next_states,
        ) = replay_buffer.sample_batch(batch_size)
        state = torch.Tensor(batch_states).to(self.device)
        next_state = torch.Tensor(batch_next_states).to(self.device)
        action = torch.Tensor(batch_actions).to(self.device)
        reward = torch.Tensor(batch_rewards).to(self.device)
        done = torch.Tensor(batch_dones).to(self.device)

        # Obtain the estimated action from the next state by using the actor-target
        next_action = self.actor_target(next_state)

        # Add noise to the action
        noise = (
            torch.Tensor(batch_actions)
            .data.normal_(0, policy_noise)
            .to(self.device)
        )
        noise = noise.clamp(-noise_clip, noise_clip)
        next_action = (next_action + noise).clamp(-self.max_action, self.max_action)

        # Calculate the Q values from the critic-target network for the next state-action pair
        target_Q = self.critic_target(next_state, next_action)

        av_Q += torch.mean(target_Q)
        max_Q = max(max_Q, torch.max(target_Q))
        # Calculate the final Q value from the target network parameters by using Bellman equation
        target_Q = reward + ((1 - done) * discount * target_Q).detach()

        # Get the Q values of the basis networks with the current parameters
        current_Q = self.critic(state, action)

        # Calculate the loss between the current Q value and the target Q value
        loss = F.mse_loss(current_Q, target_Q)

        if self.use_max_bound:
            max_bound = get_max_bound(
                next_state,
                discount,
                max_ang_vel,
                max_lin_vel,
                time_step,
                distance_norm,
                goal_reward,
                reward,
                done,
                self.device,
            )
            max_excess_Q = F.relu(current_Q - max_bound)
            max_bound_loss = (max_excess_Q**2).mean()
            # Add loss for Q values exceeding maximum possible upper bound
            loss += self.bound_weight * max_bound_loss

        # Perform the gradient descent
        self.critic_optimizer.zero_grad()
        loss.backward()
        self.critic_optimizer.step()

        if it % policy_freq == 0:
            # Maximize the actor output value by performing gradient descent on negative Q values
            # (essentially perform gradient ascent)
            actor_grad = self.critic(state, self.actor(state))
            actor_grad = -actor_grad.mean()
            self.actor_optimizer.zero_grad()
            actor_grad.backward()
            self.actor_optimizer.step()

            # Use soft update to update the actor-target network parameters by
            # infusing small amount of current parameters
            for param, target_param in zip(
                self.actor.parameters(), self.actor_target.parameters()
            ):
                target_param.data.copy_(
                    tau * param.data + (1 - tau) * target_param.data
                )
            # Use soft update to update the critic-target network parameters by infusing
            # small amount of current parameters
            for param, target_param in zip(
                self.critic.parameters(), self.critic_target.parameters()
            ):
                target_param.data.copy_(
                    tau * param.data + (1 - tau) * target_param.data
                )

        av_loss += loss
    self.iter_count += 1
    # Write new values for tensorboard
    self.writer.add_scalar("train/loss", av_loss / iterations, self.iter_count)
    self.writer.add_scalar("train/avg_Q", av_Q / iterations, self.iter_count)
    self.writer.add_scalar("train/max_Q", max_Q, self.iter_count)
    if self.save_every > 0 and self.iter_count % self.save_every == 0:
        self.save(filename=self.model_name, directory=self.save_directory)
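Finally, a hedged sketch of a training call. The buffer below is a toy stand-in written only for this example; the method merely requires a sample_batch(batch_size) call returning (states, actions, rewards, dones, next_states) arrays, and model is assumed to be the agent constructed earlier (state_dim=25, action_dim=2).

import numpy as np

class RandomReplayBuffer:
    """Toy buffer for illustration: returns random transitions with the expected shapes."""

    def __init__(self, state_dim, action_dim):
        self.state_dim = state_dim
        self.action_dim = action_dim

    def sample_batch(self, batch_size):
        states = np.random.randn(batch_size, self.state_dim)
        actions = np.random.uniform(-1, 1, (batch_size, self.action_dim))
        rewards = np.random.randn(batch_size, 1)
        dones = np.zeros((batch_size, 1))
        next_states = np.random.randn(batch_size, self.state_dim)
        return states, actions, rewards, dones, next_states

buffer = RandomReplayBuffer(state_dim=25, action_dim=2)

# One call runs 10 gradient steps on the critic (with 5 delayed actor/target updates)
# and logs averaged loss and Q statistics to TensorBoard.
model.train(replay_buffer=buffer, iterations=10, batch_size=64)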