# Introducing RL environments
Course: Introduction to Reinforcement Learning (INF8250AE)

TA: Ali Saheb Pasand


## 0. Installing Dependencies

In [None]:
!pip install gymnasium stable-baselines3 torch torchvision
!pip install tensorboard imageio imageio-ffmpeg
!pip install huggingface-hub huggingface-sb3
!pip install gymnasium[atari] autorom[accept-rom-license]
!pip install gymnasium[mujoco]
!pip install pettingzoo supersuit

## 1. Classic Control Environments

In [None]:
# https://gymnasium.farama.org/environments/classic_control/
# https://gymnasium.farama.org/environments/box2d/
# https://gymnasium.farama.org/environments/mujoco/

import gymnasium as gym

env = gym.make('CartPole-v1', render_mode='rgb_array')
print(f"Observation space: {env.observation_space}")
print(f"Action space: {env.action_space}")
obs, info = env.reset()
action = env.action_space.sample()
obs, reward, terminated, truncated, info = env.step(action)
env.close()

## 2. Training with Stable-Baselines3

In [None]:
from stable_baselines3 import PPO
from stable_baselines3.common.evaluation import evaluate_policy

env = gym.make('CartPole-v1')
model = PPO('MlpPolicy', env, verbose=1, tensorboard_log='./ppo_cartpole_tb')
model.learn(total_timesteps=100000)
model.save('ppo_cartpole')

mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=10)
print(f"Mean reward: {mean_reward:.2f} +/- {std_reward:.2f}")

## 3. Recording Agent Performance (Before & After Training)

In [None]:
import imageio

def record_video(env_name, model=None, filename='video.mp4', n_episodes=1, fps=30):
    env = gym.make(env_name, render_mode='rgb_array')
    frames = []
    for ep in range(n_episodes):
        obs, info = env.reset()
        done = False
        while not done:
            frames.append(env.render())
            if model is None:
                action = env.action_space.sample()
            else:
                action, _ = model.predict(obs, deterministic=True)
            obs, reward, terminated, truncated, info = env.step(action)
            done = terminated or truncated
    env.close()
    imageio.mimsave(filename, frames, fps=fps)
    print(f'Video saved to {filename}')

# Random agent before training
record_video('CartPole-v1', model=None, filename='cartpole_before.mp4')
# Trained agent after training
model = PPO.load('ppo_cartpole')
record_video('CartPole-v1', model=model, filename='cartpole_after.mp4')

## 4. Atari Environments

In [None]:
# https://ale.farama.org/environments/
from stable_baselines3.common.env_util import make_atari_env
from stable_baselines3.common.vec_env import VecFrameStack
import ale_py #Important
# Create Atari environment
env = make_atari_env('PongNoFrameskip-v4', n_envs=4, seed=0)
env = VecFrameStack(env, n_stack=4)
print(f"Observation shape: {env.observation_space.shape}")

In [None]:
import numpy as np
def evaluate_random_agent(env_name, n_episodes=5):
    env = gym.make(env_name, render_mode='rgb_array')
    rewards = []
    for ep in range(n_episodes):
        obs, info = env.reset()
        done = False
        ep_reward = 0
        while not done:
            action = env.action_space.sample()
            obs, reward, terminated, truncated, info = env.step(action)
            ep_reward += reward
            done = terminated or truncated
        rewards.append(ep_reward)
    env.close()
    print(f"Random agent mean reward: {np.mean(rewards):.2f} +/- {np.std(rewards):.2f}")
    return rewards

random_rewards = evaluate_random_agent('PongNoFrameskip-v4', n_episodes=3)
record_video('PongNoFrameskip-v4', model=None, filename='pong_random.mp4')

In [None]:
!pip install shimmy
from stable_baselines3 import DQN
from huggingface_sb3 import load_from_hub


## https://huggingface.co/models?search=sb3


checkpoint = load_from_hub(
    repo_id='sb3/ppo-PongNoFrameskip-v4',
    filename='ppo-PongNoFrameskip-v4.zip'
)
model = PPO.load(checkpoint)

# # Download checkpoint from HuggingFace
# checkpoint = load_from_hub(
#     repo_id='sb3/dqn-PongNoFrameskip-v4',
#     filename='dqn-PongNoFrameskip-v4.zip'
# )

# # Load the model with custom_objects to fix incompatibility
# model = DQN.load(
#     checkpoint,
#     custom_objects={
#         "learning_rate": 1e-4,
#         "lr_schedule": lambda x: 1e-4,
#         "exploration_schedule": None,
#         "buffer_size": 100_000,
#         "optimize_memory_usage": False  # Fix ReplayBuffer error
#     }
# )

print("Model loaded successfully!")


In [None]:
import gymnasium as gym
import imageio
from stable_baselines3.common.vec_env import VecFrameStack
from stable_baselines3.common.env_util import make_atari_env

def record_atari_video(env_name, model=None, filename='video.mp4', n_episodes=1, fps=30):
    # Create wrapped Atari environment
    env = make_atari_env(env_name, n_envs=1, seed=0)
    env = VecFrameStack(env, n_stack=4)

    frames = []

    for ep in range(n_episodes):
        obs = env.reset()
        done = False
        while not done:
            # Take action
            if model is None:
                action = [env.action_space.sample()]  # VecEnv expects list
            else:

                action, _ = model.predict(obs, deterministic=True)

            obs, reward, terminated, info = env.step(action)
            done = terminated[0] or info[0]['TimeLimit.truncated']  # VecEnv returns list/array
            # Render frame (unwrap to original env for RGB)
            frame = env.render()
            # input(frame.shape)
            frames.append(frame)  # Take first environment frame

    env.close()

    # Save video
    imageio.mimsave(filename, frames, fps=fps)
    print(f'Video saved to {filename}')


mean_reward, std_reward = evaluate_policy(model, env, n_eval_episodes=5)
print(f"HF model reward: {mean_reward:.2f} +/- {std_reward:.2f}")

# Random agent
record_atari_video('PongNoFrameskip-v4', model=None, filename='pong_random.mp4', n_episodes=1)

# Trained agent
record_atari_video('PongNoFrameskip-v4', model=model, filename='pong_trained.mp4', n_episodes=1)


## 5. Pybullet Environments

In [None]:
#
!pip install gymnasium pybullet stable-baselines3 imageio imageio-ffmpeg

import pybullet as p
import gymnasium as gym
import imageio
import numpy as np
from stable_baselines3 import SAC
import pybullet_data

# Create a custom PyBullet environment wrapper for Gymnasium
class AntBulletEnv(gym.Env):
    def __init__(self, render_mode=None):
        super().__init__()

        # Connect to PyBullet
        if render_mode == "human":
            self.client = p.connect(p.GUI)
        else:
            self.client = p.connect(p.DIRECT)

        p.setAdditionalSearchPath(pybullet_data.getDataPath())
        p.setGravity(0, 0, -9.8)

        # Load environment - using 'ant.urdf' which exists in pybullet_data
        self.plane = p.loadURDF("plane.urdf")
        self.robot = p.loadURDF("quadruped/quadruped.urdf", [0, 0, 0.5])

        # Get number of joints
        self.num_joints = p.getNumJoints(self.robot)

        # Define action and observation spaces
        self.action_space = gym.spaces.Box(low=-1, high=1, shape=(self.num_joints,), dtype=np.float32)
        obs_dim = 3 + 3 + self.num_joints * 2  # position + velocity + joint states
        self.observation_space = gym.spaces.Box(low=-np.inf, high=np.inf, shape=(obs_dim,), dtype=np.float32)

        self.max_steps = 1000
        self.current_step = 0

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        p.resetBasePositionAndOrientation(self.robot, [0, 0, 0.5], [0, 0, 0, 1])

        # Reset joint positions
        for i in range(self.num_joints):
            p.resetJointState(self.robot, i, 0)

        self.current_step = 0
        obs = self._get_obs()
        return obs, {}

    def step(self, action):
        # Apply actions to joints
        for i in range(min(len(action), self.num_joints)):
            p.setJointMotorControl2(
                self.robot, i, p.TORQUE_CONTROL, force=action[i] * 50
            )

        p.stepSimulation()
        self.current_step += 1

        obs = self._get_obs()
        reward = self._compute_reward()
        terminated = self._is_terminated()
        truncated = self.current_step >= self.max_steps

        return obs, reward, terminated, truncated, {}

    def _get_obs(self):
        pos, orn = p.getBasePositionAndOrientation(self.robot)
        vel, ang_vel = p.getBaseVelocity(self.robot)

        joint_states = []
        for i in range(self.num_joints):
            js = p.getJointState(self.robot, i)
            joint_states.extend([js[0], js[1]])  # position, velocity

        obs = np.array(list(pos) + list(vel) + joint_states, dtype=np.float32)
        return obs

    def _compute_reward(self):
        vel, _ = p.getBaseVelocity(self.robot)
        pos, _ = p.getBasePositionAndOrientation(self.robot)

        # Reward forward velocity, penalize falling
        forward_reward = vel[0]
        height_penalty = -10 if pos[2] < 0.2 else 0

        return forward_reward + height_penalty

    def _is_terminated(self):
        pos, _ = p.getBasePositionAndOrientation(self.robot)
        return pos[2] < 0.15  # Fallen over

    def close(self):
        p.disconnect(self.client)

# Create and train
print("Creating environment and training agent...")
env = AntBulletEnv()
model = SAC("MlpPolicy", env, verbose=1, learning_rate=3e-4)
model.learn(total_timesteps=50000)
model.save("sac_quadruped")

print("\nGenerating video...")
# --- Headless video recording ---
frames = []
obs, _ = env.reset()
done = False
steps = 0
max_steps = 500

while not done and steps < max_steps:
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, truncated, info = env.step(action)
    done = terminated or truncated
    steps += 1

    # Capture frame every few steps to reduce video size
    if steps % 2 == 0:
        width, height = 640, 480
        view_matrix = p.computeViewMatrixFromYawPitchRoll(
            cameraTargetPosition=[0, 0, 0.3],
            distance=2.5,
            yaw=50,
            pitch=-35,
            roll=0,
            upAxisIndex=2
        )
        proj_matrix = p.computeProjectionMatrixFOV(
            fov=60, aspect=width/height, nearVal=0.1, farVal=100
        )

        img = p.getCameraImage(
            width=width,
            height=height,
            viewMatrix=view_matrix,
            projectionMatrix=proj_matrix,
            renderer=p.ER_TINY_RENDERER
        )
        rgb_array = np.array(img[2], dtype=np.uint8).reshape(height, width, 4)[:, :, :3]
        frames.append(rgb_array)

env.close()

# Save video
if frames:
    video_file = "quadruped_headless.mp4"
    imageio.mimsave(video_file, frames, fps=30)
    print(f"Video saved: {video_file}")
else:
    print("No frames captured!")

## 6. Multi-agent Environments (PettingZoo)

In [None]:
# https://pettingzoo.farama.org/index.html
# https://pettingzoo.farama.org/environments/mpe/simple_spread/

# --- 1. Installation and Setup ---

# Install dependencies
# !pip install stable-baselines3[extra] gymnasium -qqq
# !pip install pettingzoo supersuit moviepy -qqq
# !apt-get install -y ffmpeg xvfb -qqq

import os
import io
import base64
import numpy as np
import gymnasium as gym
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import VecVideoRecorder, DummyVecEnv
from stable_baselines3.common.vec_env import VecNormalize
import supersuit as ss
from IPython import display as ipythondisplay
from pathlib import Path

# Import Simple Spread (MPE) - works without issues!
from pettingzoo.mpe import simple_spread_v3

print("✓ All imports successful!")

# --- Helper function for Colab Video Display ---
def show_videos(video_path='', prefix=''):
    """Plays recorded videos in the Colab output."""
    html = []
    for mp4 in Path(video_path).glob(f"{prefix}*.mp4"):
        video_b64 = base64.b64encode(mp4.read_bytes())
        html.append(f"""<video width="600" controls>
                        <source src="data:video/mp4;base64,{video_b64.decode('ascii')}" type="video/mp4" />
                    </video>""")
    ipythondisplay.display(ipythondisplay.HTML(data="<br>".join(html)))

# --- 2. Environment Setup (Multi-Agent with SB3 Wrapper) ---

def env_creator(render_mode='rgb_array'):
    """Create and wrap the Simple Spread environment"""
    # 1. Create the PettingZoo parallel environment
    env = simple_spread_v3.parallel_env(
        N=3,  # 3 agents
        local_ratio=0.5,
        max_cycles=25,
        continuous_actions=True,
        render_mode=render_mode
    )

    # 2. Apply preprocessing wrappers (skip normalize_obs - causes issues with infinite bounds)
    env = ss.clip_actions_v0(env)
    env = ss.dtype_v0(env, 'float32')

    # 3. Convert to single-agent format for SB3
    env = ss.pettingzoo_env_to_vec_env_v1(env)
    env = ss.concat_vec_envs_v1(env, 1, num_cpus=1, base_class='stable_baselines3')

    return env

# Create video folder
video_folder = "videos/multi_agent_ppo"
os.makedirs(video_folder, exist_ok=True)

print("\n[1/4] Creating environment...")
print("Environment: Simple Spread (3 cooperative agents)")
print("Task: Agents must coordinate to cover 3 landmarks")

# Create the training environment
train_env = env_creator(render_mode='rgb_array')

# Normalize the environment
train_env = VecNormalize(train_env, norm_obs=True, norm_reward=True, clip_obs=10.)

print("✓ Environment setup complete.")
print(f"  Observation space: {train_env.observation_space.shape}")
print(f"  Action space: {train_env.action_space.shape}")

# --- 3. Initialize and Train the PPO Model ---
print("\n[2/4] Initializing PPO model...")
TIMESTEPS = 20000 # 200k steps for Simple Spread

model = PPO(
    "MlpPolicy",  # MLP for vector observations
    train_env,
    verbose=1,
    n_steps=2048,
    batch_size=256,
    n_epochs=10,
    gamma=0.99,
    learning_rate=5e-4,
    clip_range=0.2,
    ent_coef=0.01,
    vf_coef=0.5,
    max_grad_norm=0.5,
    tensorboard_log="./ppo_spread_tensorboard/"
)

print(f"\n[3/4] Starting training for {TIMESTEPS:,} timesteps...")
print("This will take approximately 5-10 minutes...")
model.learn(total_timesteps=TIMESTEPS)
print("✓ Training complete!")

# Save the model
model.save("ppo_simple_spread_trained")
print("✓ Model saved as 'ppo_simple_spread_trained.zip'")

# Save normalization statistics
train_env.save("vec_normalize_spread.pkl")

# --- 4. Evaluate and Record Video ---
print("\n[4/4] Recording evaluation videos...")

# Function to record videos manually (since VecVideoRecorder has issues)
def record_manual_video(model, filename, deterministic=True, num_steps=200):
    """Manually record video by collecting frames"""
    # Create environment for recording
    env = simple_spread_v3.parallel_env(
        N=3,
        local_ratio=0.5,
        max_cycles=25,
        continuous_actions=True,
        render_mode='rgb_array'
    )

    frames = []
    total_reward = 0

    obs, info = env.reset(seed=42)

    for step in range(num_steps):
        # Render frame
        frame = env.render()
        if frame is not None:
            frames.append(frame)

        # Get actions for all agents using the trained model
        actions = {}
        for agent in env.agents:
            if agent in obs:
                # Flatten observation for the model
                obs_flat = obs[agent].flatten().astype(np.float32)

                if model is not None:
                    # Normalize using saved stats
                    obs_normalized = (obs_flat - train_env.obs_rms.mean) / np.sqrt(train_env.obs_rms.var + 1e-8)
                    obs_normalized = np.clip(obs_normalized, -10, 10)
                    action, _ = model.predict(obs_normalized.reshape(1, -1), deterministic=deterministic)
                    actions[agent] = action[0]
                else:
                    # Random action
                    actions[agent] = env.action_space(agent).sample()

        # Step environment
        obs, rewards, terminations, truncations, infos = env.step(actions)
        total_reward += sum(rewards.values())

        # Check if done
        if all(terminations.values()) or all(truncations.values()):
            obs, info = env.reset()

    env.close()

    # Save video
    if frames:
        import imageio
        imageio.mimsave(os.path.join(video_folder, filename), frames, fps=10)
        print(f"✓ Video saved: {filename}")

    return total_reward

# Record trained agent
print("\nRecording trained agent...")
trained_reward = record_manual_video(
    model,
    "spread-ppo-trained.mp4",
    deterministic=True,
    num_steps=200
)
print(f"✓ Trained agent total reward: {trained_reward:.2f}")

# Record random agent
print("\nRecording random agent...")
random_reward = record_manual_video(
    None,  # None = random actions
    "spread-random.mp4",
    deterministic=False,
    num_steps=200
)
print(f"✓ Random agent total reward: {random_reward:.2f}")
train_env.close()

# --- 5. Display Videos in Colab ---
print("\n" + "="*60)
print("TRAINING COMPLETE!")
print("="*60)
print(f"\nTask: 3 agents must coordinate to cover 3 landmarks")
print(f"      while minimizing collisions")
print(f"\nPerformance Comparison:")
print(f"  Trained Agent: {trained_reward:>8.2f} (coordinated coverage)")
print(f"  Random Agent:  {random_reward:>8.2f} (no coordination)")
print(f"  Improvement:   {trained_reward - random_reward:>8.2f}")
print("\nFiles created:")
print(f"  - ppo_simple_spread_trained.zip (model)")
print(f"  - vec_normalize_spread.pkl (normalization stats)")
print(f"  - {video_folder}/spread-ppo-trained.mp4")
print(f"  - {video_folder}/spread-random.mp4")
print("\n" + "="*60)
print("\nDisplaying videos below...")
print("="*60)
show_videos(video_folder, prefix="spread")

print("\n✓ Watch the videos above to see the difference!")
print("  - Random agents move without coordination")
print("  - Trained agents efficiently cover all landmarks")

## 7. Custom Environment


In [None]:
# Smart Building Energy Management with PPO
!pip install stable-baselines3 gymnasium

import gymnasium as gym
from gymnasium import spaces
import numpy as np
from stable_baselines3 import PPO

# -----------------------------
# 1. Define custom environment
# -----------------------------
class SmartBuildingEnv(gym.Env):
    """Simulated smart building temperature control environment"""
    metadata = {'render_modes': ['human'], 'render_fps': 1}

    def __init__(self, render_mode=None):
        super().__init__()
        self.render_mode = render_mode

        # Observation: [indoor_temp, outdoor_temp, HVAC_power]
        self.observation_space = spaces.Box(
            low=np.array([0.0, -10.0, 0.0]),  # min indoor, min outdoor, min HVAC
            high=np.array([40.0, 40.0, 5.0]), # max indoor, max outdoor, max HVAC
            dtype=np.float32
        )

        # Action: change HVAC power (-1 to +1)
        self.action_space = spaces.Box(low=-1.0, high=1.0, shape=(1,), dtype=np.float32)

        # Environment state
        self.indoor_temp = 22.0
        self.outdoor_temp = np.random.uniform(5.0, 35.0)
        self.hvac_power = 0.0
        self.step_count = 0

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.indoor_temp = 22.0
        self.outdoor_temp = np.random.uniform(5.0, 35.0)
        self.hvac_power = 0.0
        self.step_count = 0
        return np.array([self.indoor_temp, self.outdoor_temp, self.hvac_power], dtype=np.float32), {}

    def step(self, action):
        action = np.clip(action[0], -1.0, 1.0)
        # Update HVAC power
        self.hvac_power = np.clip(self.hvac_power + action, 0.0, 5.0)

        # Simple temperature dynamics
        # Indoor temp moves toward outdoor temp plus HVAC effect
        self.indoor_temp += 0.1 * (self.outdoor_temp - self.indoor_temp) + 0.5 * (self.hvac_power - 2.5)

        # Reward: keep indoor temp 22-25°C and minimize HVAC power
        comfort_penalty = -abs(self.indoor_temp - 23.5)  # optimal comfort ~23.5°C
        energy_penalty = -0.1 * self.hvac_power        # penalize energy usage
        reward = comfort_penalty + energy_penalty

        self.step_count += 1
        done = self.step_count >= 200  # episode length
        return np.array([self.indoor_temp, self.outdoor_temp, self.hvac_power], dtype=np.float32), reward, done, False, {}

    def render(self):
        print(f"Step: {self.step_count}, Indoor Temp: {self.indoor_temp:.2f}, HVAC Power: {self.hvac_power:.2f}")

# -----------------------------
# 2. Create environment
# -----------------------------
env = SmartBuildingEnv()

# -----------------------------
# 3. Train PPO agent
# -----------------------------
model = PPO("MlpPolicy", env, verbose=1)
model.learn(total_timesteps=50000)

# -----------------------------
# 4. Test trained agent
# -----------------------------
obs, _ = env.reset()
for _ in range(50):
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, done, _, _ = env.step(action)
    env.render()
    if done:
        break


In [None]:
# ====== Imports ======
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import matplotlib.pyplot as plt
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import DummyVecEnv, VecTransposeImage
from stable_baselines3.common.env_checker import check_env
import cv2
# ====== Custom Warehouse Environment ======
class WarehouseEnv(gym.Env):
    """
    Simple warehouse environment:
    - Grid world where an agent picks up items and delivers to goal
    - Observation is an RGB image of the grid
    """
    metadata = {'render_modes': ['rgb_array'], 'render_fps': 1}

    def __init__(self, grid_size=10, n_items=3, render_mode='rgb_array'):
        super().__init__()
        self.grid_size = grid_size
        self.n_items = n_items
        self.render_mode = render_mode

        # Action space: 0=up, 1=down, 2=left, 3=right
        self.action_space = spaces.Discrete(4)

        # Observation: RGB image of the grid
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(84, 84, 3), dtype=np.uint8)

        self.agent_pos = None
        self.items_pos = None
        self.goal_pos = None
        self.number_of_steps = 0.0

    def reset(self, seed=None, options=None):
        super().reset(seed=seed)
        self.number_of_steps = 0.0
        self.agent_pos = np.array([0, 0])
        # Random item positions
        self.items_pos = [np.random.randint(0, self.grid_size, size=2) for _ in range(self.n_items)]
        # Goal position
        self.goal_pos = np.array([self.grid_size - 1, self.grid_size - 1])
        obs = self._get_obs()
        return obs, {}

    def step(self, action):

      # Move agent
      if action == 0 and self.agent_pos[0] > 0:
          self.agent_pos[0] -= 1
      elif action == 1 and self.agent_pos[0] < self.grid_size - 1:
          self.agent_pos[0] += 1
      elif action == 2 and self.agent_pos[1] > 0:
          self.agent_pos[1] -= 1
      elif action == 3 and self.agent_pos[1] < self.grid_size - 1:
          self.agent_pos[1] += 1

      reward = 0
      # Pick up items
      remaining_items = []
      for item in self.items_pos:
          if np.array_equal(self.agent_pos, item):
              reward += 1  # Collected item
          else:
              remaining_items.append(item)
      self.items_pos = remaining_items  # Keep only items not collected

      # Deliver to goal
      if len(self.items_pos) == 0 and np.array_equal(self.agent_pos, self.goal_pos):
          reward += 10
          terminated = True
      else:
          terminated = False

      obs = self._get_obs()
      if self.number_of_steps >= 1000:
          terminated = True
      self.number_of_steps += 1.0
      truncated = False
      info = {}
      return obs, reward, terminated, truncated, info

    def _get_obs(self):
      obs = np.zeros((self.grid_size, self.grid_size, 3), dtype=np.uint8)
      obs[self.agent_pos[0], self.agent_pos[1]] = [255, 0, 0]  # Agent red
      for item in self.items_pos:
          obs[item[0], item[1]] = [0, 255, 0]  # Item green
      obs[self.goal_pos[0], self.goal_pos[1]] = [0, 0, 255]  # Goal blue
      # Resize to 84x84 for CNN
      obs = cv2.resize(obs, (84, 84), interpolation=cv2.INTER_NEAREST)
      return obs

    def render(self):
        return self._get_obs()

# ====== Check the environment ======
from stable_baselines3.common.monitor import Monitor
env = WarehouseEnv(grid_size=10, n_items=3)
env = Monitor(env)  # Wrap the environment

check_env(env)

# ====== Show Initial State ======
initial_obs = env.reset()[0]
plt.imshow(initial_obs)
plt.title("Initial Warehouse State")
plt.axis('off')
plt.show()

# ====== Wrap Environment for SB3 CNN ======
# Convert to vectorized env and transpose image for CNN
vec_env = DummyVecEnv([lambda: env])
vec_env = VecTransposeImage(vec_env)  # shape (C,H,W) for CNN

# ====== Train PPO Agent ======
model = PPO('CnnPolicy', vec_env, verbose=1, tensorboard_log="./ppo_warehouse_tensorboard/")
model.learn(total_timesteps=5000)




In [None]:
import imageio
import matplotlib.pyplot as plt

# ====== Test Trained Agent ======
obs = vec_env.reset()
done = False
frames = []
counter = 0.0
while True:
    action, _ = model.predict(obs, deterministic=True)
    obs, reward, terminated, info = vec_env.step(action)
    counter += 1.0
    # print(counter)
    # Render environment frame
    frame = vec_env.render()  # get first env's frame
    # print(terminated, info[0].get('TimeLimit.truncated', False))
    done = terminated or info[0].get('TimeLimit.truncated', False)
    if done:
      break
    else:
      frames.append(frame)

# ====== Show last frame ======
plt.imshow(frames[-1])
plt.title("Final Warehouse State After PPO Agent")
plt.axis('off')
plt.show()

# ====== Save video ======
video_filename = "warehouse_ppo_agent.mp4"
imageio.mimsave(video_filename, frames, fps=10)  # adjust fps as needed
print(f"Video saved to {video_filename}")


## Other resources
dm_control:
https://github.com/google-deepmind/dm_control

Robotic arms:
https://github.com/qgallouedec/panda-gym

Minigrid:
https://minigrid.farama.org/

Listed many more environments:

https://github.com/clvrai/awesome-rl-envs

https://github.com/tshrjn/env-zoo