In this tutorial, we explore Online Process Reward Learning (OPRL) and demonstrate how to learn dense, step-level reward signals from trajectory preferences to solve sparse-reward reinforcement learning tasks. We walk through every component, from the maze environment and reward-model network to preference generation, training loops, and evaluation, while observing how the agent gradually improves its behaviour through online preference-driven shaping. By running this end-to-end implementation, we gain a practical understanding of how OPRL enables better credit assignment, faster learning, and more stable policy optimization in challenging environments where the agent would otherwise struggle to discover meaningful rewards. Check out the FULL CODE NOTEBOOK.
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.optim import Adam
import matplotlib.pyplot as plt
from collections import deque
import random

torch.manual_seed(42)
np.random.seed(42)
random.seed(42)

class MazeEnv:
    """Sparse-reward grid maze: the agent only receives +10 on reaching the goal."""
    def __init__(self, size=8):
        self.size = size
        self.start = (0, 0)
        self.goal = (size - 1, size - 1)
        self.obstacles = set([(i, size // 2) for i in range(1, size - 2)])
        self.reset()

    def reset(self):
        self.pos = self.start
        self.steps = 0
        return self._get_state()

    def _get_state(self):
        # One-hot encoding of the agent's position over the flattened grid
        state = np.zeros(self.size * self.size)
        state[self.pos[0] * self.size + self.pos[1]] = 1
        return state

    def step(self, action):
        moves = [(-1, 0), (0, 1), (1, 0), (0, -1)]  # up, right, down, left
        new_pos = (self.pos[0] + moves[action][0],
                   self.pos[1] + moves[action][1])
        if (0 <= new_pos[0] < self.size and
                0 <= new_pos[1] < self.size and
                new_pos not in self.obstacles):
            self.pos = new_pos
        self.steps += 1
        done = self.pos == self.goal or self.steps >= 60
        reward = 10.0 if self.pos == self.goal else 0.0
        return self._get_state(), reward, done

    def render(self):
        grid = [['.' for _ in range(self.size)] for _ in range(self.size)]
        for obs in self.obstacles:
            grid[obs[0]][obs[1]] = '█'
        grid[self.goal[0]][self.goal[1]] = 'G'
        grid[self.pos[0]][self.pos[1]] = 'A'
        return '\n'.join([''.join(row) for row in grid])

class ProcessRewardModel(nn.Module):
    """Scores individual states; the trajectory reward is the sum of per-step scores."""
    def __init__(self, state_dim, hidden=128):
        super().__init__()
        self.net = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.LayerNorm(hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.LayerNorm(hidden),
            nn.ReLU(),
            nn.Linear(hidden, 1),
            nn.Tanh()
        )

    def forward(self, states):
        return self.net(states)

    def trajectory_reward(self, states):
        return self.forward(states).sum()

class PolicyNetwork(nn.Module):
    """Shared backbone with separate actor (logits) and critic (value) heads."""
    def __init__(self, state_dim, action_dim, hidden=128):
        super().__init__()
        self.backbone = nn.Sequential(
            nn.Linear(state_dim, hidden),
            nn.ReLU(),
            nn.Linear(hidden, hidden),
            nn.ReLU()
        )
        self.actor = nn.Linear(hidden, action_dim)
        self.critic = nn.Linear(hidden, 1)

    def forward(self, state):
        features = self.backbone(state)
        return self.actor(features), self.critic(features)
We set up the entire foundation of our OPRL system by importing libraries, defining the maze environment, and building the reward and policy networks. We establish how states are represented, how obstacles block movement, and how the sparse reward structure works. We also design the core neural models that will later learn process rewards and drive the policy's decisions.
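As a quick sanity check (not part of the original notebook, just a minimal sketch assuming the classes above), we can instantiate the environment and both networks and confirm the state encoding and output shapes:

# Illustrative sanity check of the components defined above
env = MazeEnv(size=8)
state = env.reset()
print(state.shape)                   # (64,) one-hot position vector
print(env.render())                  # 'A' at the start, 'G' at the goal, '█' obstacles

policy = PolicyNetwork(state_dim=64, action_dim=4)
reward_model = ProcessRewardModel(state_dim=64)
state_t = torch.FloatTensor(state).unsqueeze(0)
logits, value = policy(state_t)
print(logits.shape, value.shape)     # torch.Size([1, 4]) torch.Size([1, 1])
print(reward_model(state_t).item())  # per-step process reward, bounded in [-1, 1] by the Tanh head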
class OPRLAgent:
    def __init__(self, state_dim, action_dim, lr=3e-4):
        self.policy = PolicyNetwork(state_dim, action_dim)
        self.reward_model = ProcessRewardModel(state_dim)
        self.policy_opt = Adam(self.policy.parameters(), lr=lr)
        self.reward_opt = Adam(self.reward_model.parameters(), lr=lr)
        self.trajectories = deque(maxlen=200)
        self.preferences = deque(maxlen=500)
        self.action_dim = action_dim

    def select_action(self, state, epsilon=0.1):
        # ε-greedy exploration on top of the stochastic policy
        if random.random() < epsilon:
            return random.randint(0, self.action_dim - 1)
        state_t = torch.FloatTensor(state).unsqueeze(0)
        with torch.no_grad():
            logits, _ = self.policy(state_t)
        probs = F.softmax(logits, dim=-1)
        return torch.multinomial(probs, 1).item()

    def collect_trajectory(self, env, epsilon=0.1):
        states, actions, rewards = [], [], []
        state = env.reset()
        done = False
        while not done:
            action = self.select_action(state, epsilon)
            next_state, reward, done = env.step(action)
            states.append(state)
            actions.append(action)
            rewards.append(reward)
            state = next_state
        traj = {
            'states': torch.FloatTensor(np.array(states)),
            'actions': torch.LongTensor(actions),
            'rewards': torch.FloatTensor(rewards),
            'return': float(sum(rewards))
        }
        self.trajectories.append(traj)
        return traj
We begin constructing the OPRL agent by implementing action selection and trajectory collection. We use an ε-greedy strategy to ensure exploration and gather sequences of states, actions, and returns. As we run the agent through the maze, we store entire trajectories that will later serve as preference data for shaping the reward model.
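A minimal usage sketch (assuming the classes above are already defined) that collects a single exploratory trajectory and inspects its contents:

# Collect one trajectory with heavy exploration and examine the stored tensors
agent = OPRLAgent(state_dim=64, action_dim=4)
traj = agent.collect_trajectory(MazeEnv(size=8), epsilon=0.5)  # mostly random behaviour
print(traj['states'].shape)     # [T, 64] — one row per visited state
print(traj['actions'].shape)    # [T]
print(traj['return'])           # 0.0 unless the goal was reached (then 10.0)
print(len(agent.trajectories))  # the trajectory is kept in the replay buffer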
    # --- OPRLAgent methods (continued) ---
    def generate_preference(self):
        if len(self.trajectories) < 2:
            return
        t1, t2 = random.sample(list(self.trajectories), 2)
        # Label which trajectory is preferred based on its environment return
        label = 1.0 if t1['return'] > t2['return'] else 0.0
        self.preferences.append({'t1': t1, 't2': t2, 'label': label})

    def train_reward_model(self, n_updates=5):
        if len(self.preferences) < 32:
            return 0.0
        total_loss = 0.0
        for _ in range(n_updates):
            batch = random.sample(list(self.preferences), 32)
            loss = 0.0
            for item in batch:
                r1 = self.reward_model.trajectory_reward(item['t1']['states'])
                r2 = self.reward_model.trajectory_reward(item['t2']['states'])
                # Bradley–Terry: P(t1 preferred over t2) = sigmoid(r1 - r2)
                logit = r1 - r2
                pred_prob = torch.sigmoid(logit)
                label = item['label']
                loss += -(label * torch.log(pred_prob + 1e-8) +
                          (1 - label) * torch.log(1 - pred_prob + 1e-8))
            loss = loss / len(batch)
            self.reward_opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.reward_model.parameters(), 1.0)
            self.reward_opt.step()
            total_loss += loss.item()
        return total_loss / n_updates
We generate preference pairs from collected trajectories and train the process reward model using the Bradley–Terry formulation. We compare trajectory-level scores, compute preference probabilities, and update the reward model to reflect which behaviours appear better. This allows us to learn dense, differentiable, step-level rewards that guide the agent even when the environment itself is sparse.
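To make the Bradley–Terry objective concrete, here is a small standalone sketch with made-up trajectory scores (not values from an actual run) showing the preference probability and the cross-entropy loss it induces:

# Toy Bradley–Terry example with hypothetical trajectory scores
r1 = torch.tensor(2.5)   # summed process reward of the preferred trajectory
r2 = torch.tensor(1.0)   # summed process reward of the other trajectory
p_prefer_1 = torch.sigmoid(r1 - r2)            # P(t1 preferred) ≈ 0.818
label = 1.0                                    # trajectory 1 is labelled better
loss = -(label * torch.log(p_prefer_1 + 1e-8)
         + (1 - label) * torch.log(1 - p_prefer_1 + 1e-8))
print(p_prefer_1.item(), loss.item())          # ≈ 0.8176, ≈ 0.2014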
    def train_policy(self, n_updates=3, gamma=0.98):
        if len(self.trajectories) < 5:
            return 0.0
        total_loss = 0.0
        for _ in range(n_updates):
            traj = random.choice(list(self.trajectories))
            with torch.no_grad():
                process_rewards = self.reward_model(traj['states']).squeeze()
            # Shape the sparse environment reward with the learned process reward
            shaped_rewards = traj['rewards'] + 0.1 * process_rewards
            returns = []
            G = 0
            for r in reversed(shaped_rewards.tolist()):
                G = r + gamma * G
                returns.insert(0, G)
            returns = torch.FloatTensor(returns)
            returns = (returns - returns.mean()) / (returns.std() + 1e-8)
            logits, values = self.policy(traj['states'])
            log_probs = F.log_softmax(logits, dim=-1)
            action_log_probs = log_probs.gather(1, traj['actions'].unsqueeze(1))
            advantages = returns - values.squeeze().detach()
            policy_loss = -(action_log_probs.squeeze() * advantages).mean()
            value_loss = F.mse_loss(values.squeeze(), returns)
            entropy = -(F.softmax(logits, dim=-1) * log_probs).sum(-1).mean()
            loss = policy_loss + 0.5 * value_loss - 0.01 * entropy
            self.policy_opt.zero_grad()
            loss.backward()
            torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 1.0)
            self.policy_opt.step()
            total_loss += loss.item()
        return total_loss / n_updates

def train_oprl(episodes=500, render_interval=100):
    env = MazeEnv(size=8)
    agent = OPRLAgent(state_dim=64, action_dim=4, lr=3e-4)
    returns, reward_losses, policy_losses = [], [], []
    success_rate = []
    for ep in range(episodes):
        epsilon = max(0.05, 0.5 - ep / 1000)  # decaying exploration
        traj = agent.collect_trajectory(env, epsilon)
        returns.append(traj['return'])
        if ep % 2 == 0 and ep > 10:
            agent.generate_preference()
        if ep > 20 and ep % 2 == 0:
            rew_loss = agent.train_reward_model(n_updates=3)
            reward_losses.append(rew_loss)
        if ep > 10:
            pol_loss = agent.train_policy(n_updates=2)
            policy_losses.append(pol_loss)
        success = 1 if traj['return'] > 5 else 0
        success_rate.append(success)
        if ep % render_interval == 0 and ep > 0:
            test_env = MazeEnv(size=8)
            agent.collect_trajectory(test_env, epsilon=0)
            print(test_env.render())
    return returns, reward_losses, policy_losses, success_rate
We train the policy using shaped rewards produced by the learned process reward model. We compute returns, advantages, value estimates, and entropy bonuses, enabling the agent to improve its strategy over time. We then build a full training loop in which exploration decays, preferences accumulate, and both the reward model and the policy are updated continuously.
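As an illustration of the reward shaping and discounted-return computation inside train_policy (using made-up numbers rather than values from an actual run):

# Hypothetical 4-step episode: sparse env reward plus scaled process rewards
env_rewards = torch.tensor([0.0, 0.0, 0.0, 10.0])      # goal reached on the last step
process_rewards = torch.tensor([0.2, 0.4, 0.6, 0.9])   # learned step-level scores
shaped = env_rewards + 0.1 * process_rewards            # [0.02, 0.04, 0.06, 10.09]

gamma = 0.98
returns, G = [], 0.0
for r in reversed(shaped.tolist()):
    G = r + gamma * G        # discounted return accumulated backwards
    returns.insert(0, G)
print([round(g, 3) for g in returns])  # ≈ [9.613, 9.789, 9.948, 10.09]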
print("Coaching OPRL Agent on Sparse Reward Maze...n")
returns, rew_losses, pol_losses, success = train_oprl(episodes=500, render_interval=250)
fig, axes = plt.subplots(2, 2, figsize=(14, 10))
axes[0,0].plot(returns, alpha=0.3)
axes[0,0].plot(np.convolve(returns, np.ones(20)/20, mode="legitimate"), linewidth=2)
axes[0,0].set_xlabel('Episode')
axes[0,0].set_ylabel('Return')
axes[0,0].set_title('Agent Efficiency')
axes[0,0].grid(alpha=0.3)
success_smooth = np.convolve(success, np.ones(20)/20, mode="legitimate")
axes[0,1].plot(success_smooth, linewidth=2, coloration="inexperienced")
axes[0,1].set_xlabel('Episode')
axes[0,1].set_ylabel('Success Fee')
axes[0,1].set_title('Aim Success Fee')
axes[0,1].grid(alpha=0.3)
axes[1,0].plot(rew_losses, linewidth=2, coloration="orange")
axes[1,0].set_xlabel('Replace Step')
axes[1,0].set_ylabel('Loss')
axes[1,0].set_title('Reward Mannequin Loss')
axes[1,0].grid(alpha=0.3)
axes[1,1].plot(pol_losses, linewidth=2, coloration="purple")
axes[1,1].set_xlabel('Replace Step')
axes[1,1].set_ylabel('Loss')
axes[1,1].set_title('Coverage Loss')
axes[1,1].grid(alpha=0.3)
plt.tight_layout()
plt.present()
print("OPRL Coaching Full!")
print("Course of rewards, choice studying, reward shaping, and on-line updates demonstrated.")
We visualize the learning dynamics by plotting returns, success rates, reward-model loss, and policy loss. We track how the agent's performance evolves as OPRL shapes the reward landscape. By the end of the visualization, we clearly see the influence of process rewards on solving a challenging, sparse-reward maze.
In conclusion, we see how OPRL transforms sparse terminal outcomes into rich online feedback that continuously guides the agent's behaviour. We watch the process reward model learn preferences, shape the return signal, and accelerate the policy's ability to reach the goal. With larger mazes, varying shaping strengths, or even real human preference feedback, we appreciate how OPRL provides a flexible and powerful framework for credit assignment in complex decision-making tasks. We finish with a clear, hands-on understanding of how OPRL operates and how we can extend it to more advanced agentic RL settings.
Check out the FULL CODE NOTEBOOK and Paper.
