
SAC

robot_nav.models.SAC.SAC

SAC

Bases: object

Soft Actor-Critic (SAC) implementation.

This class implements the SAC algorithm using a Gaussian policy actor and double Q-learning critic. It supports automatic entropy tuning, model saving/loading, and logging via TensorBoard.

Parameters:

state_dim (int): Dimension of the observation/state space. Required.
action_dim (int): Dimension of the action space. Required.
device (str): PyTorch device (e.g., 'cpu' or 'cuda'). Required.
max_action (float): Maximum magnitude of actions. Required.
discount (float): Discount factor for rewards. Default: 0.99.
init_temperature (float): Initial entropy temperature. Default: 0.1.
alpha_lr (float): Learning rate for entropy temperature alpha. Default: 0.0001.
alpha_betas (tuple): Adam optimizer betas for alpha. Default: (0.9, 0.999).
actor_lr (float): Learning rate for actor network. Default: 0.0001.
actor_betas (tuple): Adam optimizer betas for actor. Default: (0.9, 0.999).
actor_update_frequency (int): Frequency of actor updates. Default: 1.
critic_lr (float): Learning rate for critic network. Default: 0.0001.
critic_betas (tuple): Adam optimizer betas for critic. Default: (0.9, 0.999).
critic_tau (float): Soft update parameter for critic target. Default: 0.005.
critic_target_update_frequency (int): Frequency of critic target updates. Default: 2.
learnable_temperature (bool): Whether alpha is learnable. Default: True.
save_every (int): Save model every N training steps. Set 0 to disable. Default: 0.
load_model (bool): Whether to load model from disk at init. Default: False.
log_dist_and_hist (bool): Log distribution and histogram if True. Default: False.
save_directory (Path): Directory to save models. Default: Path('robot_nav/models/SAC/checkpoint').
model_name (str): Name for model checkpoints. Default: 'SAC'.
load_directory (Path): Directory to load model checkpoints from. Default: Path('robot_nav/models/SAC/checkpoint').
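A minimal usage sketch (not part of the source): it assumes the import path shown above, illustrative dimensions, and a replay buffer exposing sample_batch(batch_size) that returns numpy arrays of states, actions, rewards, dones, and next states, which is the interface update() relies on below.

import numpy as np
from robot_nav.models.SAC.SAC import SAC

model = SAC(state_dim=25, action_dim=2, device="cpu", max_action=1.0)

# Build a state from (hypothetical) sensor readings and the previous action.
latest_scan = np.full(180, 7.0)          # 180-beam scan, all returns at 7 m
state, terminal = model.prepare_state(
    latest_scan, distance=4.0, cos=1.0, sin=0.0,
    collision=False, goal=False, action=[0.0, 0.0],
)
action = model.get_action(np.array(state), add_noise=True)  # exploration noise added

# After collecting transitions in a replay buffer with sample_batch():
# model.train(replay_buffer, iterations=80, batch_size=64)
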
Source code in robot_nav/models/SAC/SAC.py
class SAC(object):
    """
    Soft Actor-Critic (SAC) implementation.

    This class implements the SAC algorithm using a Gaussian policy actor and double Q-learning critic.
    It supports automatic entropy tuning, model saving/loading, and logging via TensorBoard.

    Args:
        state_dim (int): Dimension of the observation/state space.
        action_dim (int): Dimension of the action space.
        device (str): PyTorch device (e.g., 'cpu' or 'cuda').
        max_action (float): Maximum magnitude of actions.
        discount (float): Discount factor for rewards.
        init_temperature (float): Initial entropy temperature.
        alpha_lr (float): Learning rate for entropy temperature alpha.
        alpha_betas (tuple): Adam optimizer betas for alpha.
        actor_lr (float): Learning rate for actor network.
        actor_betas (tuple): Adam optimizer betas for actor.
        actor_update_frequency (int): Frequency of actor updates.
        critic_lr (float): Learning rate for critic network.
        critic_betas (tuple): Adam optimizer betas for critic.
        critic_tau (float): Soft update parameter for critic target.
        critic_target_update_frequency (int): Frequency of critic target updates.
        learnable_temperature (bool): Whether alpha is learnable.
        save_every (int): Save model every N training steps. Set 0 to disable.
        load_model (bool): Whether to load model from disk at init.
        log_dist_and_hist (bool): Log distribution and histogram if True.
        save_directory (Path): Directory to save models.
        model_name (str): Name for model checkpoints.
        load_directory (Path): Directory to load model checkpoints from.
    """

    def __init__(
        self,
        state_dim,
        action_dim,
        device,
        max_action,
        discount=0.99,
        init_temperature=0.1,
        alpha_lr=1e-4,
        alpha_betas=(0.9, 0.999),
        actor_lr=1e-4,
        actor_betas=(0.9, 0.999),
        actor_update_frequency=1,
        critic_lr=1e-4,
        critic_betas=(0.9, 0.999),
        critic_tau=0.005,
        critic_target_update_frequency=2,
        learnable_temperature=True,
        save_every=0,
        load_model=False,
        log_dist_and_hist=False,
        save_directory=Path("robot_nav/models/SAC/checkpoint"),
        model_name="SAC",
        load_directory=Path("robot_nav/models/SAC/checkpoint"),
    ):
        super().__init__()

        self.state_dim = state_dim
        self.action_dim = action_dim
        self.action_range = (-max_action, max_action)
        self.device = torch.device(device)
        self.discount = discount
        self.critic_tau = critic_tau
        self.actor_update_frequency = actor_update_frequency
        self.critic_target_update_frequency = critic_target_update_frequency
        self.learnable_temperature = learnable_temperature
        self.save_every = save_every
        self.model_name = model_name
        self.save_directory = save_directory
        self.log_dist_and_hist = log_dist_and_hist

        self.train_metrics_dict = {
            "train_critic/loss_av": [],
            "train_actor/loss_av": [],
            "train_actor/target_entropy_av": [],
            "train_actor/entropy_av": [],
            "train_alpha/loss_av": [],
            "train_alpha/value_av": [],
            "train/batch_reward_av": [],
        }

        self.critic = critic_model(
            obs_dim=self.state_dim,
            action_dim=action_dim,
            hidden_dim=400,
            hidden_depth=2,
        ).to(self.device)
        self.critic_target = critic_model(
            obs_dim=self.state_dim,
            action_dim=action_dim,
            hidden_dim=400,
            hidden_depth=2,
        ).to(self.device)
        self.critic_target.load_state_dict(self.critic.state_dict())

        self.actor = actor_model(
            obs_dim=self.state_dim,
            action_dim=action_dim,
            hidden_dim=400,
            hidden_depth=2,
            log_std_bounds=[-5, 2],
        ).to(self.device)

        if load_model:
            self.load(filename=model_name, directory=load_directory)

        self.log_alpha = torch.tensor(np.log(init_temperature)).to(self.device)
        self.log_alpha.requires_grad = True
        # set target entropy to -|A|
        self.target_entropy = -action_dim

        # optimizers
        self.actor_optimizer = torch.optim.Adam(
            self.actor.parameters(), lr=actor_lr, betas=actor_betas
        )

        self.critic_optimizer = torch.optim.Adam(
            self.critic.parameters(), lr=critic_lr, betas=critic_betas
        )

        self.log_alpha_optimizer = torch.optim.Adam(
            [self.log_alpha], lr=alpha_lr, betas=alpha_betas
        )

        self.critic_target.train()

        self.actor.train(True)
        self.critic.train(True)
        self.step = 0
        self.writer = SummaryWriter(comment=model_name)

    def save(self, filename, directory):
        """
        Save the actor, critic, and target critic models to the specified directory.

        Args:
            filename (str): Base name of the saved files.
            directory (Path): Directory where models are saved.
        """
        Path(directory).mkdir(parents=True, exist_ok=True)
        torch.save(self.actor.state_dict(), "%s/%s_actor.pth" % (directory, filename))
        torch.save(self.critic.state_dict(), "%s/%s_critic.pth" % (directory, filename))
        torch.save(
            self.critic_target.state_dict(),
            "%s/%s_critic_target.pth" % (directory, filename),
        )

    def load(self, filename, directory):
        """
        Load the actor, critic, and target critic models from the specified directory.

        Args:
            filename (str): Base name of the saved files.
            directory (Path): Directory where models are loaded from.
        """
        self.actor.load_state_dict(
            torch.load("%s/%s_actor.pth" % (directory, filename))
        )
        self.critic.load_state_dict(
            torch.load("%s/%s_critic.pth" % (directory, filename))
        )
        self.critic_target.load_state_dict(
            torch.load("%s/%s_critic_target.pth" % (directory, filename))
        )
        print(f"Loaded weights from: {directory}")

    def train(self, replay_buffer, iterations, batch_size):
        """
        Run multiple training updates using data from the replay buffer.

        Args:
            replay_buffer: Buffer from which to sample training data.
            iterations (int): Number of training iterations to run.
            batch_size (int): Batch size for each update.
        """
        for _ in range(iterations):
            self.update(
                replay_buffer=replay_buffer, step=self.step, batch_size=batch_size
            )

        for key, value in self.train_metrics_dict.items():
            if len(value):
                self.writer.add_scalar(key, mean(value), self.step)
            self.train_metrics_dict[key] = []
        self.step += 1

        if self.save_every > 0 and self.step % self.save_every == 0:
            self.save(filename=self.model_name, directory=self.save_directory)

    @property
    def alpha(self):
        """
        Returns:
            torch.Tensor: Current value of the entropy temperature alpha.
        """
        return self.log_alpha.exp()

    def get_action(self, obs, add_noise):
        """
        Select an action given an observation.

        Args:
            obs (np.ndarray): Input observation.
            add_noise (bool): Whether to add exploration noise.

        Returns:
            np.ndarray: Action vector.
        """
        if add_noise:
            return (
                self.act(obs) + np.random.normal(0, 0.2, size=self.action_dim)
            ).clip(self.action_range[0], self.action_range[1])
        else:
            return self.act(obs)

    def act(self, obs, sample=False):
        """
        Generate an action from the actor network.

        Args:
            obs (np.ndarray): Input observation.
            sample (bool): If True, sample from the policy; otherwise use the mean.

        Returns:
            np.ndarray: Action vector.
        """
        obs = torch.FloatTensor(obs).to(self.device)
        obs = obs.unsqueeze(0)
        dist = self.actor(obs)
        action = dist.sample() if sample else dist.mean
        action = action.clamp(*self.action_range)
        assert action.ndim == 2 and action.shape[0] == 1
        return utils.to_np(action[0])

    def update_critic(self, obs, action, reward, next_obs, done, step):
        """
        Update the critic network based on a batch of transitions.

        Args:
            obs (torch.Tensor): Batch of current observations.
            action (torch.Tensor): Batch of actions taken.
            reward (torch.Tensor): Batch of received rewards.
            next_obs (torch.Tensor): Batch of next observations.
            done (torch.Tensor): Batch of done flags.
            step (int): Current training step (for logging).
        """
        dist = self.actor(next_obs)
        next_action = dist.rsample()
        log_prob = dist.log_prob(next_action).sum(-1, keepdim=True)
        target_Q1, target_Q2 = self.critic_target(next_obs, next_action)
        target_V = torch.min(target_Q1, target_Q2) - self.alpha.detach() * log_prob
        target_Q = reward + ((1 - done) * self.discount * target_V)
        target_Q = target_Q.detach()

        # get current Q estimates
        current_Q1, current_Q2 = self.critic(obs, action)
        critic_loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(
            current_Q2, target_Q
        )
        self.train_metrics_dict["train_critic/loss_av"].append(critic_loss.item())
        self.writer.add_scalar("train_critic/loss", critic_loss, step)

        # Optimize the critic
        self.critic_optimizer.zero_grad()
        critic_loss.backward()
        self.critic_optimizer.step()
        if self.log_dist_and_hist:
            self.critic.log(self.writer, step)

    def update_actor_and_alpha(self, obs, step):
        """
        Update the actor and optionally the entropy temperature.

        Args:
            obs (torch.Tensor): Batch of observations.
            step (int): Current training step (for logging).
        """
        dist = self.actor(obs)
        action = dist.rsample()
        log_prob = dist.log_prob(action).sum(-1, keepdim=True)
        actor_Q1, actor_Q2 = self.critic(obs, action)

        actor_Q = torch.min(actor_Q1, actor_Q2)
        actor_loss = (self.alpha.detach() * log_prob - actor_Q).mean()
        self.train_metrics_dict["train_actor/loss_av"].append(actor_loss.item())
        self.train_metrics_dict["train_actor/target_entropy_av"].append(
            self.target_entropy
        )
        self.train_metrics_dict["train_actor/entropy_av"].append(
            -log_prob.mean().item()
        )
        self.writer.add_scalar("train_actor/loss", actor_loss, step)
        self.writer.add_scalar("train_actor/target_entropy", self.target_entropy, step)
        self.writer.add_scalar("train_actor/entropy", -log_prob.mean(), step)

        # optimize the actor
        self.actor_optimizer.zero_grad()
        actor_loss.backward()
        self.actor_optimizer.step()
        if self.log_dist_and_hist:
            self.actor.log(self.writer, step)

        if self.learnable_temperature:
            self.log_alpha_optimizer.zero_grad()
            alpha_loss = (
                self.alpha * (-log_prob - self.target_entropy).detach()
            ).mean()
            self.train_metrics_dict["train_alpha/loss_av"].append(alpha_loss.item())
            self.train_metrics_dict["train_alpha/value_av"].append(self.alpha.item())
            self.writer.add_scalar("train_alpha/loss", alpha_loss, step)
            self.writer.add_scalar("train_alpha/value", self.alpha, step)
            alpha_loss.backward()
            self.log_alpha_optimizer.step()

    def update(self, replay_buffer, step, batch_size):
        """
        Perform a full update step (critic, actor, alpha, target critic).

        Args:
            replay_buffer: Buffer to sample from.
            step (int): Current training step.
            batch_size (int): Size of sample batch.
        """
        (
            batch_states,
            batch_actions,
            batch_rewards,
            batch_dones,
            batch_next_states,
        ) = replay_buffer.sample_batch(batch_size)

        state = torch.Tensor(batch_states).to(self.device)
        next_state = torch.Tensor(batch_next_states).to(self.device)
        action = torch.Tensor(batch_actions).to(self.device)
        reward = torch.Tensor(batch_rewards).to(self.device)
        done = torch.Tensor(batch_dones).to(self.device)
        self.train_metrics_dict["train/batch_reward_av"].append(
            batch_rewards.mean().item()
        )
        self.writer.add_scalar("train/batch_reward", batch_rewards.mean(), step)

        self.update_critic(state, action, reward, next_state, done, step)

        if step % self.actor_update_frequency == 0:
            self.update_actor_and_alpha(state, step)

        if step % self.critic_target_update_frequency == 0:
            utils.soft_update_params(self.critic, self.critic_target, self.critic_tau)

    def prepare_state(self, latest_scan, distance, cos, sin, collision, goal, action):
        """
        Convert raw sensor input into a normalized state vector.

        Args:
            latest_scan (list or np.ndarray): Laser scan distances.
            distance (float): Distance to goal.
            cos (float): Cosine of heading angle to goal.
            sin (float): Sine of heading angle to goal.
            collision (bool): Whether the robot has collided.
            goal (bool): Whether the goal has been reached.
            action (list): Last action taken [linear_vel, angular_vel].

        Returns:
            tuple: (state vector as list, terminal flag as int)
        """
        latest_scan = np.array(latest_scan)

        inf_mask = np.isinf(latest_scan)
        latest_scan[inf_mask] = 7.0

        max_bins = self.state_dim - 5
        bin_size = int(np.ceil(len(latest_scan) / max_bins))

        # Initialize the list to store the minimum values of each bin
        min_values = []

        # Loop through the data and create bins
        for i in range(0, len(latest_scan), bin_size):
            # Get the current bin
            bin = latest_scan[i : i + min(bin_size, len(latest_scan) - i)]
            # Find the minimum value in the current bin and append it to the min_values list
            min_values.append(min(bin) / 7)

        # Normalize to [0, 1] range
        distance /= 10
        lin_vel = action[0] * 2
        ang_vel = (action[1] + 1) / 2
        state = min_values + [distance, cos, sin] + [lin_vel, ang_vel]

        assert len(state) == self.state_dim
        terminal = 1 if collision or goal else 0

        return state, terminal

alpha property

Returns:

torch.Tensor: Current value of the entropy temperature alpha.

act(obs, sample=False)

Generate an action from the actor network.

Parameters:

obs (np.ndarray): Input observation. Required.
sample (bool): If True, sample from the policy; otherwise use the mean. Default: False.

Returns:

np.ndarray: Action vector.

Source code in robot_nav/models/SAC/SAC.py
def act(self, obs, sample=False):
    """
    Generate an action from the actor network.

    Args:
        obs (np.ndarray): Input observation.
        sample (bool): If True, sample from the policy; otherwise use the mean.

    Returns:
        np.ndarray: Action vector.
    """
    obs = torch.FloatTensor(obs).to(self.device)
    obs = obs.unsqueeze(0)
    dist = self.actor(obs)
    action = dist.sample() if sample else dist.mean
    action = action.clamp(*self.action_range)
    assert action.ndim == 2 and action.shape[0] == 1
    return utils.to_np(action[0])

get_action(obs, add_noise)

Select an action given an observation.

Parameters:

obs (np.ndarray): Input observation. Required.
add_noise (bool): Whether to add exploration noise. Required.

Returns:

np.ndarray: Action vector.

Source code in robot_nav/models/SAC/SAC.py
def get_action(self, obs, add_noise):
    """
    Select an action given an observation.

    Args:
        obs (np.ndarray): Input observation.
        add_noise (bool): Whether to add exploration noise.

    Returns:
        np.ndarray: Action vector.
    """
    if add_noise:
        return (
            self.act(obs) + np.random.normal(0, 0.2, size=self.action_dim)
        ).clip(self.action_range[0], self.action_range[1])
    else:
        return self.act(obs)

load(filename, directory)

Load the actor, critic, and target critic models from the specified directory.

Parameters:

filename (str): Base name of the saved files. Required.
directory (Path): Directory where models are loaded from. Required.
Source code in robot_nav/models/SAC/SAC.py
def load(self, filename, directory):
    """
    Load the actor, critic, and target critic models from the specified directory.

    Args:
        filename (str): Base name of the saved files.
        directory (Path): Directory where models are loaded from.
    """
    self.actor.load_state_dict(
        torch.load("%s/%s_actor.pth" % (directory, filename))
    )
    self.critic.load_state_dict(
        torch.load("%s/%s_critic.pth" % (directory, filename))
    )
    self.critic_target.load_state_dict(
        torch.load("%s/%s_critic_target.pth" % (directory, filename))
    )
    print(f"Loaded weights from: {directory}")

prepare_state(latest_scan, distance, cos, sin, collision, goal, action)

Convert raw sensor input into a normalized state vector.

Parameters:

latest_scan (list or np.ndarray): Laser scan distances. Required.
distance (float): Distance to goal. Required.
cos (float): Cosine of heading angle to goal. Required.
sin (float): Sine of heading angle to goal. Required.
collision (bool): Whether the robot has collided. Required.
goal (bool): Whether the goal has been reached. Required.
action (list): Last action taken [linear_vel, angular_vel]. Required.

Returns:

tuple: (state vector as list, terminal flag as int)
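The scan is compressed by min-pooling it into state_dim - 5 bins before the five scalar features are appended. A standalone sketch of that binning, with illustrative sizes (a 180-beam scan and state_dim = 25):

import numpy as np

state_dim = 25
latest_scan = np.random.uniform(0.5, 7.0, size=180)    # hypothetical scan

max_bins = state_dim - 5                                # 20 bins reserved for the scan
bin_size = int(np.ceil(len(latest_scan) / max_bins))    # ceil(180 / 20) = 9
min_values = [latest_scan[i : i + bin_size].min() / 7   # per-bin minimum, scaled by the 7 m cap
              for i in range(0, len(latest_scan), bin_size)]
assert len(min_values) == max_bins                      # 20 scan features + 5 scalars = state_dim
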

Source code in robot_nav/models/SAC/SAC.py
def prepare_state(self, latest_scan, distance, cos, sin, collision, goal, action):
    """
    Convert raw sensor input into a normalized state vector.

    Args:
        latest_scan (list or np.ndarray): Laser scan distances.
        distance (float): Distance to goal.
        cos (float): Cosine of heading angle to goal.
        sin (float): Sine of heading angle to goal.
        collision (bool): Whether the robot has collided.
        goal (bool): Whether the goal has been reached.
        action (list): Last action taken [linear_vel, angular_vel].

    Returns:
        tuple: (state vector as list, terminal flag as int)
    """
    latest_scan = np.array(latest_scan)

    inf_mask = np.isinf(latest_scan)
    latest_scan[inf_mask] = 7.0

    max_bins = self.state_dim - 5
    bin_size = int(np.ceil(len(latest_scan) / max_bins))

    # Initialize the list to store the minimum values of each bin
    min_values = []

    # Loop through the data and create bins
    for i in range(0, len(latest_scan), bin_size):
        # Get the current bin
        bin = latest_scan[i : i + min(bin_size, len(latest_scan) - i)]
        # Find the minimum value in the current bin and append it to the min_values list
        min_values.append(min(bin) / 7)

    # Normalize to [0, 1] range
    distance /= 10
    lin_vel = action[0] * 2
    ang_vel = (action[1] + 1) / 2
    state = min_values + [distance, cos, sin] + [lin_vel, ang_vel]

    assert len(state) == self.state_dim
    terminal = 1 if collision or goal else 0

    return state, terminal

save(filename, directory)

Save the actor, critic, and target critic models to the specified directory.

Parameters:

filename (str): Base name of the saved files. Required.
directory (Path): Directory where models are saved. Required.
Source code in robot_nav/models/SAC/SAC.py
def save(self, filename, directory):
    """
    Save the actor, critic, and target critic models to the specified directory.

    Args:
        filename (str): Base name of the saved files.
        directory (Path): Directory where models are saved.
    """
    Path(directory).mkdir(parents=True, exist_ok=True)
    torch.save(self.actor.state_dict(), "%s/%s_actor.pth" % (directory, filename))
    torch.save(self.critic.state_dict(), "%s/%s_critic.pth" % (directory, filename))
    torch.save(
        self.critic_target.state_dict(),
        "%s/%s_critic_target.pth" % (directory, filename),
    )

train(replay_buffer, iterations, batch_size)

Run multiple training updates using data from the replay buffer.

Parameters:

replay_buffer: Buffer from which to sample training data. Required.
iterations (int): Number of training iterations to run. Required.
batch_size (int): Batch size for each update. Required.
Source code in robot_nav/models/SAC/SAC.py
def train(self, replay_buffer, iterations, batch_size):
    """
    Run multiple training updates using data from the replay buffer.

    Args:
        replay_buffer: Buffer from which to sample training data.
        iterations (int): Number of training iterations to run.
        batch_size (int): Batch size for each update.
    """
    for _ in range(iterations):
        self.update(
            replay_buffer=replay_buffer, step=self.step, batch_size=batch_size
        )

    for key, value in self.train_metrics_dict.items():
        if len(value):
            self.writer.add_scalar(key, mean(value), self.step)
        self.train_metrics_dict[key] = []
    self.step += 1

    if self.save_every > 0 and self.step % self.save_every == 0:
        self.save(filename=self.model_name, directory=self.save_directory)

update(replay_buffer, step, batch_size)

Perform a full update step (critic, actor, alpha, target critic).

Parameters:

replay_buffer: Buffer to sample from. Required.
step (int): Current training step. Required.
batch_size (int): Size of sample batch. Required.
Source code in robot_nav/models/SAC/SAC.py
def update(self, replay_buffer, step, batch_size):
    """
    Perform a full update step (critic, actor, alpha, target critic).

    Args:
        replay_buffer: Buffer to sample from.
        step (int): Current training step.
        batch_size (int): Size of sample batch.
    """
    (
        batch_states,
        batch_actions,
        batch_rewards,
        batch_dones,
        batch_next_states,
    ) = replay_buffer.sample_batch(batch_size)

    state = torch.Tensor(batch_states).to(self.device)
    next_state = torch.Tensor(batch_next_states).to(self.device)
    action = torch.Tensor(batch_actions).to(self.device)
    reward = torch.Tensor(batch_rewards).to(self.device)
    done = torch.Tensor(batch_dones).to(self.device)
    self.train_metrics_dict["train/batch_reward_av"].append(
        batch_rewards.mean().item()
    )
    self.writer.add_scalar("train/batch_reward", batch_rewards.mean(), step)

    self.update_critic(state, action, reward, next_state, done, step)

    if step % self.actor_update_frequency == 0:
        self.update_actor_and_alpha(state, step)

    if step % self.critic_target_update_frequency == 0:
        utils.soft_update_params(self.critic, self.critic_target, self.critic_tau)

update_actor_and_alpha(obs, step)

Update the actor and optionally the entropy temperature.

Parameters:

obs (torch.Tensor): Batch of observations. Required.
step (int): Current training step (for logging). Required.
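For reference, the objectives minimized below are (alpha is detached in the actor loss, the log-probability term is detached in the temperature loss, and the target entropy is set to -action_dim in __init__):

$$
\mathcal{L}_{\text{actor}} = \mathbb{E}_{a \sim \pi}\big[\alpha \log \pi(a \mid s) - \min\big(Q_1(s,a),\, Q_2(s,a)\big)\big]
$$

$$
\mathcal{L}_{\alpha} = \mathbb{E}_{a \sim \pi}\big[\alpha\,\big(-\log \pi(a \mid s) - \bar{\mathcal{H}}\big)\big], \qquad \bar{\mathcal{H}} = -\,|\mathcal{A}|
$$
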
Source code in robot_nav/models/SAC/SAC.py
def update_actor_and_alpha(self, obs, step):
    """
    Update the actor and optionally the entropy temperature.

    Args:
        obs (torch.Tensor): Batch of observations.
        step (int): Current training step (for logging).
    """
    dist = self.actor(obs)
    action = dist.rsample()
    log_prob = dist.log_prob(action).sum(-1, keepdim=True)
    actor_Q1, actor_Q2 = self.critic(obs, action)

    actor_Q = torch.min(actor_Q1, actor_Q2)
    actor_loss = (self.alpha.detach() * log_prob - actor_Q).mean()
    self.train_metrics_dict["train_actor/loss_av"].append(actor_loss.item())
    self.train_metrics_dict["train_actor/target_entropy_av"].append(
        self.target_entropy
    )
    self.train_metrics_dict["train_actor/entropy_av"].append(
        -log_prob.mean().item()
    )
    self.writer.add_scalar("train_actor/loss", actor_loss, step)
    self.writer.add_scalar("train_actor/target_entropy", self.target_entropy, step)
    self.writer.add_scalar("train_actor/entropy", -log_prob.mean(), step)

    # optimize the actor
    self.actor_optimizer.zero_grad()
    actor_loss.backward()
    self.actor_optimizer.step()
    if self.log_dist_and_hist:
        self.actor.log(self.writer, step)

    if self.learnable_temperature:
        self.log_alpha_optimizer.zero_grad()
        alpha_loss = (
            self.alpha * (-log_prob - self.target_entropy).detach()
        ).mean()
        self.train_metrics_dict["train_alpha/loss_av"].append(alpha_loss.item())
        self.train_metrics_dict["train_alpha/value_av"].append(self.alpha.item())
        self.writer.add_scalar("train_alpha/loss", alpha_loss, step)
        self.writer.add_scalar("train_alpha/value", self.alpha, step)
        alpha_loss.backward()
        self.log_alpha_optimizer.step()

update_critic(obs, action, reward, next_obs, done, step)

Update the critic network based on a batch of transitions.

Parameters:

obs (torch.Tensor): Batch of current observations. Required.
action (torch.Tensor): Batch of actions taken. Required.
reward (torch.Tensor): Batch of received rewards. Required.
next_obs (torch.Tensor): Batch of next observations. Required.
done (torch.Tensor): Batch of done flags. Required.
step (int): Current training step (for logging). Required.
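In symbols, the target computed below is the entropy-regularized Bellman backup, with the next action sampled from the current policy and alpha detached:

$$
V(s') = \min\big(Q^{\text{tgt}}_1(s', a'),\, Q^{\text{tgt}}_2(s', a')\big) - \alpha \log \pi(a' \mid s'), \qquad a' \sim \pi(\cdot \mid s')
$$

$$
y = r + \gamma\,(1 - d)\,V(s'), \qquad \mathcal{L}_{\text{critic}} = \mathrm{MSE}\big(Q_1(s,a), y\big) + \mathrm{MSE}\big(Q_2(s,a), y\big)
$$
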
Source code in robot_nav/models/SAC/SAC.py
def update_critic(self, obs, action, reward, next_obs, done, step):
    """
    Update the critic network based on a batch of transitions.

    Args:
        obs (torch.Tensor): Batch of current observations.
        action (torch.Tensor): Batch of actions taken.
        reward (torch.Tensor): Batch of received rewards.
        next_obs (torch.Tensor): Batch of next observations.
        done (torch.Tensor): Batch of done flags.
        step (int): Current training step (for logging).
    """
    dist = self.actor(next_obs)
    next_action = dist.rsample()
    log_prob = dist.log_prob(next_action).sum(-1, keepdim=True)
    target_Q1, target_Q2 = self.critic_target(next_obs, next_action)
    target_V = torch.min(target_Q1, target_Q2) - self.alpha.detach() * log_prob
    target_Q = reward + ((1 - done) * self.discount * target_V)
    target_Q = target_Q.detach()

    # get current Q estimates
    current_Q1, current_Q2 = self.critic(obs, action)
    critic_loss = F.mse_loss(current_Q1, target_Q) + F.mse_loss(
        current_Q2, target_Q
    )
    self.train_metrics_dict["train_critic/loss_av"].append(critic_loss.item())
    self.writer.add_scalar("train_critic/loss", critic_loss, step)

    # Optimize the critic
    self.critic_optimizer.zero_grad()
    critic_loss.backward()
    self.critic_optimizer.step()
    if self.log_dist_and_hist:
        self.critic.log(self.writer, step)

robot_nav.models.SAC.SAC_actor

DiagGaussianActor

Bases: Module

Diagonal Gaussian policy network with tanh squashing.

This network outputs a squashed Gaussian distribution given an observation, suitable for continuous control tasks.

Parameters:

obs_dim (int): Dimension of the observation space. Required.
action_dim (int): Dimension of the action space. Required.
hidden_dim (int): Number of units in hidden layers. Required.
hidden_depth (int): Number of hidden layers. Required.
log_std_bounds (list): Min and max bounds for log standard deviation. Required.
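The forward pass squashes the raw log-std head output u into the configured bounds (SAC above passes log_std_bounds = [-5, 2]) before exponentiating:

$$
\log\sigma = \log\sigma_{\min} + \tfrac{1}{2}\big(\log\sigma_{\max} - \log\sigma_{\min}\big)\big(\tanh(u) + 1\big), \qquad \sigma = e^{\log\sigma}
$$
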
Source code in robot_nav/models/SAC/SAC_actor.py
class DiagGaussianActor(nn.Module):
    """
    Diagonal Gaussian policy network with tanh squashing.

    This network outputs a squashed Gaussian distribution given an observation,
    suitable for continuous control tasks.

    Args:
        obs_dim (int): Dimension of the observation space.
        action_dim (int): Dimension of the action space.
        hidden_dim (int): Number of units in hidden layers.
        hidden_depth (int): Number of hidden layers.
        log_std_bounds (list): Min and max bounds for log standard deviation.
    """

    def __init__(self, obs_dim, action_dim, hidden_dim, hidden_depth, log_std_bounds):
        """
        Initialize the actor network.
        """
        super().__init__()

        self.log_std_bounds = log_std_bounds
        self.trunk = utils.mlp(obs_dim, hidden_dim, 2 * action_dim, hidden_depth)

        self.outputs = dict()
        self.apply(utils.weight_init)

    def forward(self, obs):
        """
        Forward pass through the network.

        Args:
            obs (Tensor): Observation input.

        Returns:
            SquashedNormal: Action distribution with mean and std tracked in `self.outputs`.
        """
        mu, log_std = self.trunk(obs).chunk(2, dim=-1)

        # constrain log_std inside [log_std_min, log_std_max]
        log_std = torch.tanh(log_std)
        log_std_min, log_std_max = self.log_std_bounds
        log_std = log_std_min + 0.5 * (log_std_max - log_std_min) * (log_std + 1)

        std = log_std.exp()

        self.outputs["mu"] = mu
        self.outputs["std"] = std

        dist = SquashedNormal(mu, std)
        return dist

    def log(self, writer, step):
        """
        Log network outputs (mu and std histograms) to TensorBoard.

        Args:
            writer (SummaryWriter): TensorBoard writer instance.
            step (int): Current global training step.
        """
        for k, v in self.outputs.items():
            writer.add_histogram(f"train_actor/{k}_hist", v, step)

__init__(obs_dim, action_dim, hidden_dim, hidden_depth, log_std_bounds)

Initialize the actor network.

Source code in robot_nav/models/SAC/SAC_actor.py
def __init__(self, obs_dim, action_dim, hidden_dim, hidden_depth, log_std_bounds):
    """
    Initialize the actor network.
    """
    super().__init__()

    self.log_std_bounds = log_std_bounds
    self.trunk = utils.mlp(obs_dim, hidden_dim, 2 * action_dim, hidden_depth)

    self.outputs = dict()
    self.apply(utils.weight_init)

forward(obs)

Forward pass through the network.

Parameters:

obs (Tensor): Observation input. Required.

Returns:

SquashedNormal: Action distribution with mean and std tracked in self.outputs.

Source code in robot_nav/models/SAC/SAC_actor.py
def forward(self, obs):
    """
    Forward pass through the network.

    Args:
        obs (Tensor): Observation input.

    Returns:
        SquashedNormal: Action distribution with mean and std tracked in `self.outputs`.
    """
    mu, log_std = self.trunk(obs).chunk(2, dim=-1)

    # constrain log_std inside [log_std_min, log_std_max]
    log_std = torch.tanh(log_std)
    log_std_min, log_std_max = self.log_std_bounds
    log_std = log_std_min + 0.5 * (log_std_max - log_std_min) * (log_std + 1)

    std = log_std.exp()

    self.outputs["mu"] = mu
    self.outputs["std"] = std

    dist = SquashedNormal(mu, std)
    return dist

log(writer, step)

Log network outputs (mu and std histograms) to TensorBoard.

Parameters:

writer (SummaryWriter): TensorBoard writer instance. Required.
step (int): Current global training step. Required.
Source code in robot_nav/models/SAC/SAC_actor.py
def log(self, writer, step):
    """
    Log network outputs (mu and std histograms) to TensorBoard.

    Args:
        writer (SummaryWriter): TensorBoard writer instance.
        step (int): Current global training step.
    """
    for k, v in self.outputs.items():
        writer.add_histogram(f"train_actor/{k}_hist", v, step)

SquashedNormal

Bases: TransformedDistribution

A squashed (tanh-transformed) diagonal Gaussian distribution.

This is used for stochastic policies where actions must be within bounded intervals.

Source code in robot_nav/models/SAC/SAC_actor.py
class SquashedNormal(pyd.transformed_distribution.TransformedDistribution):
    """
    A squashed (tanh-transformed) diagonal Gaussian distribution.

    This is used for stochastic policies where actions must be within bounded intervals.
    """

    def __init__(self, loc, scale):
        """
        Initialize the squashed normal distribution.

        Args:
            loc (Tensor): Mean of the Gaussian.
            scale (Tensor): Standard deviation of the Gaussian.
        """
        self.loc = loc
        self.scale = scale

        self.base_dist = pyd.Normal(loc, scale)
        transforms = [TanhTransform()]
        super().__init__(self.base_dist, transforms)

    @property
    def mean(self):
        """
        Compute the mean of the transformed distribution.

        Returns:
            Tensor: Mean of the squashed distribution.
        """
        mu = self.loc
        for tr in self.transforms:
            mu = tr(mu)
        return mu

mean property

Compute the mean of the transformed distribution.

Returns:

Tensor: Mean of the squashed distribution.

__init__(loc, scale)

Initialize the squashed normal distribution.

Parameters:

loc (Tensor): Mean of the Gaussian. Required.
scale (Tensor): Standard deviation of the Gaussian. Required.
Source code in robot_nav/models/SAC/SAC_actor.py
def __init__(self, loc, scale):
    """
    Initialize the squashed normal distribution.

    Args:
        loc (Tensor): Mean of the Gaussian.
        scale (Tensor): Standard deviation of the Gaussian.
    """
    self.loc = loc
    self.scale = scale

    self.base_dist = pyd.Normal(loc, scale)
    transforms = [TanhTransform()]
    super().__init__(self.base_dist, transforms)

TanhTransform

Bases: Transform

A bijective transformation that applies the hyperbolic tangent function.

This is used to squash the output of a normal distribution to be within [-1, 1], making it suitable for bounded continuous action spaces.

Attributes:

domain: The input domain (real numbers).
codomain: The output codomain (interval between -1 and 1).
bijective: Whether the transform is bijective (True).
sign: The sign of the Jacobian determinant (positive).
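log_abs_det_jacobian below relies on the numerically stable identity for the tanh Jacobian (softplus form instead of evaluating log(1 - tanh^2 x) directly):

$$
\log\left|\frac{d}{dx}\tanh(x)\right| = \log\big(1 - \tanh^2(x)\big) = 2\big(\log 2 - x - \operatorname{softplus}(-2x)\big)
$$
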

Source code in robot_nav/models/SAC/SAC_actor.py
class TanhTransform(pyd.transforms.Transform):
    """
    A bijective transformation that applies the hyperbolic tangent function.

    This is used to squash the output of a normal distribution to be within [-1, 1],
    making it suitable for bounded continuous action spaces.

    Attributes:
        domain: The input domain (real numbers).
        codomain: The output codomain (interval between -1 and 1).
        bijective: Whether the transform is bijective (True).
        sign: The sign of the Jacobian determinant (positive).
    """

    domain = pyd.constraints.real
    codomain = pyd.constraints.interval(-1.0, 1.0)
    bijective = True
    sign = +1

    def __init__(self, cache_size=1):
        """
        Initialize the TanhTransform.

        Args:
            cache_size (int): Size of the cache for storing intermediate values.
        """
        super().__init__(cache_size=cache_size)

    @staticmethod
    def atanh(x):
        """
        Inverse hyperbolic tangent function.

        Args:
            x (Tensor): Input tensor.

        Returns:
            Tensor: atanh(x)
        """
        return 0.5 * (x.log1p() - (-x).log1p())

    def __eq__(self, other):
        """
        Equality check for the transform.

        Returns:
            bool: True if the other object is also a TanhTransform.
        """
        return isinstance(other, TanhTransform)

    def _call(self, x):
        """
        Forward transformation.

        Args:
            x (Tensor): Input tensor.

        Returns:
            Tensor: tanh(x)
        """
        return x.tanh()

    def _inverse(self, y):
        """
        Inverse transformation.

        Args:
            y (Tensor): Input tensor in [-1, 1].

        Returns:
            Tensor: atanh(y)
        """
        # We do not clamp to the boundary here as it may degrade the performance of certain algorithms.
        # one should use `cache_size=1` instead
        return self.atanh(y)

    def log_abs_det_jacobian(self, x, y):
        """
        Log absolute determinant of the Jacobian of the transformation.

        Args:
            x (Tensor): Input tensor.
            y (Tensor): Output tensor.

        Returns:
            Tensor: log|det(Jacobian)|
        """
        # We use a formula that is more numerically stable, see details in the following link
        # https://github.com/tensorflow/probability/commit/ef6bb176e0ebd1cf6e25c6b5cecdd2428c22963f#diff-e120f70e92e6741bca649f04fcd907b7
        return 2.0 * (math.log(2.0) - x - F.softplus(-2.0 * x))

__eq__(other)

Equality check for the transform.

Returns:

bool: True if the other object is also a TanhTransform.

Source code in robot_nav/models/SAC/SAC_actor.py
def __eq__(self, other):
    """
    Equality check for the transform.

    Returns:
        bool: True if the other object is also a TanhTransform.
    """
    return isinstance(other, TanhTransform)

__init__(cache_size=1)

Initialize the TanhTransform.

Parameters:

cache_size (int): Size of the cache for storing intermediate values. Default: 1.
Source code in robot_nav/models/SAC/SAC_actor.py
def __init__(self, cache_size=1):
    """
    Initialize the TanhTransform.

    Args:
        cache_size (int): Size of the cache for storing intermediate values.
    """
    super().__init__(cache_size=cache_size)

atanh(x) staticmethod

Inverse hyperbolic tangent function.

Parameters:

x (Tensor): Input tensor. Required.

Returns:

Tensor: atanh(x)

Source code in robot_nav/models/SAC/SAC_actor.py
@staticmethod
def atanh(x):
    """
    Inverse hyperbolic tangent function.

    Args:
        x (Tensor): Input tensor.

    Returns:
        Tensor: atanh(x)
    """
    return 0.5 * (x.log1p() - (-x).log1p())

log_abs_det_jacobian(x, y)

Log absolute determinant of the Jacobian of the transformation.

Parameters:

x (Tensor): Input tensor. Required.
y (Tensor): Output tensor. Required.

Returns:

Tensor: log|det(Jacobian)|

Source code in robot_nav/models/SAC/SAC_actor.py
def log_abs_det_jacobian(self, x, y):
    """
    Log absolute determinant of the Jacobian of the transformation.

    Args:
        x (Tensor): Input tensor.
        y (Tensor): Output tensor.

    Returns:
        Tensor: log|det(Jacobian)|
    """
    # We use a formula that is more numerically stable, see details in the following link
    # https://github.com/tensorflow/probability/commit/ef6bb176e0ebd1cf6e25c6b5cecdd2428c22963f#diff-e120f70e92e6741bca649f04fcd907b7
    return 2.0 * (math.log(2.0) - x - F.softplus(-2.0 * x))

robot_nav.models.SAC.SAC_critic

DoubleQCritic

Bases: Module

Double Q-learning critic network.

Implements two independent Q-functions (Q1 and Q2) to mitigate overestimation bias in value estimates, as introduced in the Twin Delayed Deep Deterministic Policy Gradient (TD3) and Soft Actor-Critic (SAC) algorithms.

Parameters:

obs_dim (int): Dimension of the observation space. Required.
action_dim (int): Dimension of the action space. Required.
hidden_dim (int): Number of units in each hidden layer. Required.
hidden_depth (int): Number of hidden layers. Required.
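A short usage sketch (illustrative sizes, assuming the module path shown in the heading above): the forward pass returns two independent estimates, and SAC takes their minimum when forming targets.

import torch
from robot_nav.models.SAC.SAC_critic import DoubleQCritic

critic = DoubleQCritic(obs_dim=25, action_dim=2, hidden_dim=400, hidden_depth=2)
obs = torch.randn(8, 25)
action = torch.randn(8, 2)
q1, q2 = critic(obs, action)   # two independent estimates, each of shape (8, 1)
q = torch.min(q1, q2)          # clipped double-Q value used for targets
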
Source code in robot_nav/models/SAC/SAC_critic.py
class DoubleQCritic(nn.Module):
    """
    Double Q-learning critic network.

    Implements two independent Q-functions (Q1 and Q2) to mitigate overestimation bias in value estimates,
    as introduced in the Twin Delayed Deep Deterministic Policy Gradient (TD3) and Soft Actor-Critic (SAC) algorithms.

    Args:
        obs_dim (int): Dimension of the observation space.
        action_dim (int): Dimension of the action space.
        hidden_dim (int): Number of units in each hidden layer.
        hidden_depth (int): Number of hidden layers.
    """

    def __init__(self, obs_dim, action_dim, hidden_dim, hidden_depth):
        """
        Initialize the Double Q-critic network with two MLPs.

        Q1 and Q2 share the same architecture but have separate weights.
        """
        super().__init__()

        self.Q1 = utils.mlp(obs_dim + action_dim, hidden_dim, 1, hidden_depth)
        self.Q2 = utils.mlp(obs_dim + action_dim, hidden_dim, 1, hidden_depth)

        self.outputs = dict()
        self.apply(utils.weight_init)

    def forward(self, obs, action):
        """
        Compute Q-values for the given observation-action pairs.

        Args:
            obs (Tensor): Observations of shape (batch_size, obs_dim).
            action (Tensor): Actions of shape (batch_size, action_dim).

        Returns:
            Tuple[Tensor, Tensor]: Q1 and Q2 values, each of shape (batch_size, 1).
        """
        assert obs.size(0) == action.size(0)

        obs_action = torch.cat([obs, action], dim=-1)
        q1 = self.Q1(obs_action)
        q2 = self.Q2(obs_action)

        self.outputs["q1"] = q1
        self.outputs["q2"] = q2

        return q1, q2

    def log(self, writer, step):
        """
        Log histograms of Q-value distributions to TensorBoard.

        Args:
            writer (SummaryWriter): TensorBoard writer instance.
            step (int): Current training step (global).
        """
        for k, v in self.outputs.items():
            writer.add_histogram(f"train_critic/{k}_hist", v, step)

__init__(obs_dim, action_dim, hidden_dim, hidden_depth)

Initialize the Double Q-critic network with two MLPs.

Q1 and Q2 share the same architecture but have separate weights.

Source code in robot_nav/models/SAC/SAC_critic.py
def __init__(self, obs_dim, action_dim, hidden_dim, hidden_depth):
    """
    Initialize the Double Q-critic network with two MLPs.

    Q1 and Q2 share the same architecture but have separate weights.
    """
    super().__init__()

    self.Q1 = utils.mlp(obs_dim + action_dim, hidden_dim, 1, hidden_depth)
    self.Q2 = utils.mlp(obs_dim + action_dim, hidden_dim, 1, hidden_depth)

    self.outputs = dict()
    self.apply(utils.weight_init)

forward(obs, action)

Compute Q-values for the given observation-action pairs.

Parameters:

obs (Tensor): Observations of shape (batch_size, obs_dim). Required.
action (Tensor): Actions of shape (batch_size, action_dim). Required.

Returns:

Tuple[Tensor, Tensor]: Q1 and Q2 values, each of shape (batch_size, 1).

Source code in robot_nav/models/SAC/SAC_critic.py
def forward(self, obs, action):
    """
    Compute Q-values for the given observation-action pairs.

    Args:
        obs (Tensor): Observations of shape (batch_size, obs_dim).
        action (Tensor): Actions of shape (batch_size, action_dim).

    Returns:
        Tuple[Tensor, Tensor]: Q1 and Q2 values, each of shape (batch_size, 1).
    """
    assert obs.size(0) == action.size(0)

    obs_action = torch.cat([obs, action], dim=-1)
    q1 = self.Q1(obs_action)
    q2 = self.Q2(obs_action)

    self.outputs["q1"] = q1
    self.outputs["q2"] = q2

    return q1, q2

log(writer, step)

Log histograms of Q-value distributions to TensorBoard.

Parameters:

writer (SummaryWriter): TensorBoard writer instance. Required.
step (int): Current training step (global). Required.
Source code in robot_nav/models/SAC/SAC_critic.py
def log(self, writer, step):
    """
    Log histograms of Q-value distributions to TensorBoard.

    Args:
        writer (SummaryWriter): TensorBoard writer instance.
        step (int): Current training step (global).
    """
    for k, v in self.outputs.items():
        writer.add_histogram(f"train_critic/{k}_hist", v, step)

robot_nav.models.SAC.SAC_utils

MLP

Bases: Module

Multi-layer perceptron (MLP) with configurable depth and optional output activation.

Parameters:

input_dim (int): Number of input features. Required.
hidden_dim (int): Number of hidden units in each hidden layer. Required.
output_dim (int): Number of output features. Required.
hidden_depth (int): Number of hidden layers. Required.
output_mod (nn.Module, optional): Optional output activation module (e.g., Tanh, Sigmoid). Default: None.
Source code in robot_nav/models/SAC/SAC_utils.py
class MLP(nn.Module):
    """
    Multi-layer perceptron (MLP) with configurable depth and optional output activation.

    Args:
        input_dim (int): Number of input features.
        hidden_dim (int): Number of hidden units in each hidden layer.
        output_dim (int): Number of output features.
        hidden_depth (int): Number of hidden layers.
        output_mod (nn.Module, optional): Optional output activation module (e.g., Tanh, Sigmoid).
    """

    def __init__(
        self, input_dim, hidden_dim, output_dim, hidden_depth, output_mod=None
    ):
        super().__init__()
        self.trunk = mlp(input_dim, hidden_dim, output_dim, hidden_depth, output_mod)
        self.apply(weight_init)

    def forward(self, x):
        """
        Forward pass through the MLP.

        Args:
            x (Tensor): Input tensor of shape (batch_size, input_dim).

        Returns:
            Tensor: Output tensor of shape (batch_size, output_dim).
        """
        return self.trunk(x)

forward(x)

Forward pass through the MLP.

Parameters:

x (Tensor): Input tensor of shape (batch_size, input_dim). Required.

Returns:

Tensor: Output tensor of shape (batch_size, output_dim).

Source code in robot_nav/models/SAC/SAC_utils.py
def forward(self, x):
    """
    Forward pass through the MLP.

    Args:
        x (Tensor): Input tensor of shape (batch_size, input_dim).

    Returns:
        Tensor: Output tensor of shape (batch_size, output_dim).
    """
    return self.trunk(x)

make_dir(*path_parts)

Create a directory if it does not exist.

Parameters:

*path_parts (str): Components of the path to be joined into the directory.

Returns:

str: The full path of the created or existing directory.

Source code in robot_nav/models/SAC/SAC_utils.py
def make_dir(*path_parts):
    """
    Create a directory if it does not exist.

    Args:
        *path_parts (str): Components of the path to be joined into the directory.

    Returns:
        str: The full path of the created or existing directory.
    """
    dir_path = os.path.join(*path_parts)
    try:
        os.mkdir(dir_path)
    except OSError:
        pass
    return dir_path

mlp(input_dim, hidden_dim, output_dim, hidden_depth, output_mod=None)

Create an MLP as a nn.Sequential module.

Parameters:

input_dim (int): Input feature dimension. Required.
hidden_dim (int): Hidden layer size. Required.
output_dim (int): Output feature dimension. Required.
hidden_depth (int): Number of hidden layers. Required.
output_mod (nn.Module, optional): Output activation module. Default: None.

Returns:

nn.Sequential: The constructed MLP.
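For orientation, the layout produced for hidden_depth=2 is equivalent to the sketch below (dimensions are illustrative, matching the 400-unit trunks used by SAC above):

import torch.nn as nn

trunk = nn.Sequential(
    nn.Linear(25, 400), nn.ReLU(inplace=True),
    nn.Linear(400, 400), nn.ReLU(inplace=True),
    nn.Linear(400, 1),
)
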

Source code in robot_nav/models/SAC/SAC_utils.py
def mlp(input_dim, hidden_dim, output_dim, hidden_depth, output_mod=None):
    """
    Create an MLP as a `nn.Sequential` module.

    Args:
        input_dim (int): Input feature dimension.
        hidden_dim (int): Hidden layer size.
        output_dim (int): Output feature dimension.
        hidden_depth (int): Number of hidden layers.
        output_mod (nn.Module, optional): Output activation module.

    Returns:
        nn.Sequential: The constructed MLP.
    """
    if hidden_depth == 0:
        mods = [nn.Linear(input_dim, output_dim)]
    else:
        mods = [nn.Linear(input_dim, hidden_dim), nn.ReLU(inplace=True)]
        for i in range(hidden_depth - 1):
            mods += [nn.Linear(hidden_dim, hidden_dim), nn.ReLU(inplace=True)]
        mods.append(nn.Linear(hidden_dim, output_dim))
    if output_mod is not None:
        mods.append(output_mod)
    trunk = nn.Sequential(*mods)
    return trunk

set_seed_everywhere(seed)

Set random seed for reproducibility across NumPy, random, and PyTorch.

Parameters:

seed (int): Random seed. Required.
Source code in robot_nav/models/SAC/SAC_utils.py
def set_seed_everywhere(seed):
    """
    Set random seed for reproducibility across NumPy, random, and PyTorch.

    Args:
        seed (int): Random seed.
    """
    torch.manual_seed(seed)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)

soft_update_params(net, target_net, tau)

Perform a soft update of the parameters of the target network.

Parameters:

net (nn.Module): Source network whose parameters are used for updating. Required.
target_net (nn.Module): Target network to be updated. Required.
tau (float): Interpolation parameter (0 < tau < 1) for soft updates. A value closer to 1 means faster updates. Required.
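This is the standard Polyak averaging step, applied parameter-wise:

$$
\theta_{\text{target}} \leftarrow \tau\,\theta + (1 - \tau)\,\theta_{\text{target}}
$$
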
Source code in robot_nav/models/SAC/SAC_utils.py
def soft_update_params(net, target_net, tau):
    """
    Perform a soft update of the parameters of the target network.

    Args:
        net (nn.Module): Source network whose parameters are used for updating.
        target_net (nn.Module): Target network to be updated.
        tau (float): Interpolation parameter (0 < tau < 1) for soft updates.
                     A value closer to 1 means faster updates.
    """
    for param, target_param in zip(net.parameters(), target_net.parameters()):
        target_param.data.copy_(tau * param.data + (1 - tau) * target_param.data)

weight_init(m)

Custom weight initialization for layers.

Applies orthogonal initialization to Linear layers and zero initialization to biases.

Parameters:

m (nn.Module): Layer to initialize. Required.
Source code in robot_nav/models/SAC/SAC_utils.py
def weight_init(m):
    """
    Custom weight initialization for layers.

    Applies orthogonal initialization to Linear layers and zero initialization to biases.

    Args:
        m (nn.Module): Layer to initialize.
    """
    if isinstance(m, nn.Linear):
        nn.init.orthogonal_(m.weight.data)
        if hasattr(m.bias, "data"):
            m.bias.data.fill_(0.0)