In this advanced DeepSpeed tutorial, we provide a hands-on walkthrough of cutting-edge optimization techniques for training large language models efficiently. By combining ZeRO optimization, mixed-precision training, gradient accumulation, and advanced DeepSpeed configurations, the tutorial demonstrates how to maximize GPU memory utilization, reduce training overhead, and enable scaling of transformer models in resource-constrained environments such as Colab. Alongside model creation and training, it also covers performance monitoring, inference optimization, checkpointing, and benchmarking of different ZeRO stages, giving practitioners both theoretical insight and practical code to accelerate model development. Check out the FULL CODES here.
import subprocess
import sys
import os
import json
import time
from pathlib import Path

def install_dependencies():
    """Install required packages for DeepSpeed in Colab"""
    print("Installing DeepSpeed and dependencies...")
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "torch", "torchvision", "torchaudio", "--index-url",
        "https://download.pytorch.org/whl/cu118"
    ])
    subprocess.check_call([sys.executable, "-m", "pip", "install", "deepspeed"])
    subprocess.check_call([
        sys.executable, "-m", "pip", "install",
        "transformers", "datasets", "accelerate", "wandb"
    ])
    print("Installation complete!")

install_dependencies()
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset
import deepspeed
from transformers import GPT2Config, GPT2LMHeadModel, GPT2Tokenizer
import numpy as np
from typing import Dict, Any
import argparse
We set up our Colab environment by installing PyTorch with CUDA support, DeepSpeed, and essential libraries such as Transformers, Datasets, Accelerate, and Weights & Biases. We make sure everything is ready so we can smoothly build and train models with DeepSpeed. Check out the FULL CODES here.
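Before building the model, a quick sanity check like the one below (not part of the original tutorial) can confirm that the installed versions and the GPU runtime are visible; the exact version strings will vary with your Colab image.

# Minimal environment check (illustrative; output depends on your runtime)
import torch
import deepspeed

print("PyTorch version:", torch.__version__)
print("DeepSpeed version:", deepspeed.__version__)
print("CUDA available:", torch.cuda.is_available())
if torch.cuda.is_available():
    print("GPU:", torch.cuda.get_device_name(0))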
class SyntheticTextDataset(Dataset):
    """Synthetic dataset for demonstration purposes"""
    def __init__(self, size: int = 1000, seq_length: int = 512, vocab_size: int = 50257):
        self.size = size
        self.seq_length = seq_length
        self.vocab_size = vocab_size
        self.data = torch.randint(0, vocab_size, (size, seq_length))
    def __len__(self):
        return self.size
    def __getitem__(self, idx):
        return {
            'input_ids': self.data[idx],
            'labels': self.data[idx].clone()
        }
We create a SyntheticTextDataset that generates random token sequences to mimic real text data. We use these sequences as both inputs and labels, which lets us quickly test DeepSpeed training without relying on a large external dataset. Check out the FULL CODES here.
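For a quick look at what the dataset yields, a small sketch like the following (not part of the original code) wraps it in a DataLoader and prints the batch shapes; the batch size of 4 is an arbitrary choice for illustration.

# Illustrative sketch: inspect one batch from the synthetic dataset
sample_dataset = SyntheticTextDataset(size=16, seq_length=128)
sample_loader = DataLoader(sample_dataset, batch_size=4, shuffle=True)
batch = next(iter(sample_loader))
print(batch['input_ids'].shape)   # torch.Size([4, 128])
print(batch['labels'].shape)      # torch.Size([4, 128])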
class AdvancedDeepSpeedTrainer:
    """Advanced DeepSpeed trainer with multiple optimization techniques"""
    def __init__(self, model_config: Dict[str, Any], ds_config: Dict[str, Any]):
        self.model_config = model_config
        self.ds_config = ds_config
        self.model = None
        self.engine = None
        self.tokenizer = None
    def create_model(self):
        """Create a GPT-2 style model for demonstration"""
        print("Creating model...")
        config = GPT2Config(
            vocab_size=self.model_config['vocab_size'],
            n_positions=self.model_config['seq_length'],
            n_embd=self.model_config['hidden_size'],
            n_layer=self.model_config['num_layers'],
            n_head=self.model_config['num_heads'],
            resid_pdrop=0.1,
            embd_pdrop=0.1,
            attn_pdrop=0.1,
        )
        self.model = GPT2LMHeadModel(config)
        self.tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
        self.tokenizer.pad_token = self.tokenizer.eos_token
        print(f"Model parameters: {sum(p.numel() for p in self.model.parameters()):,}")
        return self.model
    def create_deepspeed_config(self):
        """Create a comprehensive DeepSpeed configuration"""
        return {
            "train_batch_size": self.ds_config['train_batch_size'],
            "train_micro_batch_size_per_gpu": self.ds_config['micro_batch_size'],
            "gradient_accumulation_steps": self.ds_config['gradient_accumulation_steps'],
            "zero_optimization": {
                "stage": self.ds_config['zero_stage'],
                "allgather_partitions": True,
                "allgather_bucket_size": 5e8,
                "overlap_comm": True,
                "reduce_scatter": True,
                "reduce_bucket_size": 5e8,
                "contiguous_gradients": True,
                "cpu_offload": self.ds_config.get('cpu_offload', False)
            },
            "fp16": {
                "enabled": True,
                "loss_scale": 0,
                "loss_scale_window": 1000,
                "initial_scale_power": 16,
                "hysteresis": 2,
                "min_loss_scale": 1
            },
            "optimizer": {
                "type": "AdamW",
                "params": {
                    "lr": self.ds_config['learning_rate'],
                    "betas": [0.9, 0.999],
                    "eps": 1e-8,
                    "weight_decay": 0.01
                }
            },
            "scheduler": {
                "type": "WarmupLR",
                "params": {
                    "warmup_min_lr": 0,
                    "warmup_max_lr": self.ds_config['learning_rate'],
                    "warmup_num_steps": 100
                }
            },
            "gradient_clipping": 1.0,
            "wall_clock_breakdown": True,
            "memory_breakdown": True,
            "tensorboard": {
                "enabled": True,
                "output_path": "./logs/",
                "job_name": "deepspeed_advanced_tutorial"
            }
        }
    def initialize_deepspeed(self):
        """Initialize the DeepSpeed engine"""
        print("Initializing DeepSpeed...")
        parser = argparse.ArgumentParser()
        parser.add_argument('--local_rank', type=int, default=0)
        args = parser.parse_args([])
        self.engine, optimizer, _, lr_scheduler = deepspeed.initialize(
            args=args,
            model=self.model,
            config=self.create_deepspeed_config()
        )
        print(f"DeepSpeed engine initialized with ZeRO stage {self.ds_config['zero_stage']}")
        return self.engine
    def train_step(self, batch: Dict[str, torch.Tensor]) -> Dict[str, float]:
        """Perform a single training step with DeepSpeed optimizations"""
        input_ids = batch['input_ids'].to(self.engine.device)
        labels = batch['labels'].to(self.engine.device)
        outputs = self.engine(input_ids=input_ids, labels=labels)
        loss = outputs.loss
        self.engine.backward(loss)
        self.engine.step()
        return {
            'loss': loss.item(),
            'lr': self.engine.lr_scheduler.get_last_lr()[0] if self.engine.lr_scheduler else 0
        }
    def train(self, dataloader: DataLoader, num_epochs: int = 2):
        """Complete training loop with monitoring"""
        print(f"Starting training for {num_epochs} epochs...")
        self.engine.train()
        total_steps = 0
        for epoch in range(num_epochs):
            epoch_loss = 0.0
            epoch_steps = 0
            print(f"\nEpoch {epoch + 1}/{num_epochs}")
            for step, batch in enumerate(dataloader):
                start_time = time.time()
                metrics = self.train_step(batch)
                epoch_loss += metrics['loss']
                epoch_steps += 1
                total_steps += 1
                if step % 10 == 0:
                    step_time = time.time() - start_time
                    print(f"  Step {step:4d} | Loss: {metrics['loss']:.4f} | "
                          f"LR: {metrics['lr']:.2e} | Time: {step_time:.3f}s")
                if step % 20 == 0 and hasattr(self.engine, 'monitor'):
                    self.log_memory_stats()
                if step >= 50:
                    break
            avg_loss = epoch_loss / epoch_steps
            print(f"Epoch {epoch + 1} completed | Average Loss: {avg_loss:.4f}")
        print("Training completed!")
    def log_memory_stats(self):
        """Log GPU memory statistics"""
        if torch.cuda.is_available():
            allocated = torch.cuda.memory_allocated() / 1024**3
            reserved = torch.cuda.memory_reserved() / 1024**3
            print(f"  GPU Memory - Allocated: {allocated:.2f}GB | Reserved: {reserved:.2f}GB")
    def save_checkpoint(self, path: str):
        """Save a model checkpoint using DeepSpeed"""
        print(f"Saving checkpoint to {path}")
        self.engine.save_checkpoint(path)
    def demonstrate_inference(self, text: str = "The future of AI is"):
        """Demonstrate optimized inference with DeepSpeed"""
        print(f"\nRunning inference with prompt: '{text}'")
        inputs = self.tokenizer.encode(text, return_tensors="pt").to(self.engine.device)
        self.engine.eval()
        with torch.no_grad():
            outputs = self.engine.module.generate(
                inputs,
                max_length=inputs.shape[1] + 50,
                num_return_sequences=1,
                temperature=0.8,
                do_sample=True,
                pad_token_id=self.tokenizer.eos_token_id
            )
        generated_text = self.tokenizer.decode(outputs[0], skip_special_tokens=True)
        print(f"Generated text: {generated_text}")
        self.engine.train()
We build an end-to-end trainer that creates a GPT-2 model, defines a DeepSpeed config (ZeRO, FP16, AdamW, warmup scheduler, TensorBoard), and initializes the engine. We then run efficient training steps with logging and memory statistics, save checkpoints, and demonstrate inference to verify optimization and generation in one place. Check out the FULL CODES here.
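The trainer only shows saving; to resume from a saved checkpoint you would typically call the engine's load_checkpoint method, which returns the loaded path and any client state. The helper below is a minimal sketch under that assumption and is not part of the original code.

# Hypothetical helper (not in the original tutorial): resume from a DeepSpeed checkpoint.
# engine.load_checkpoint returns (load_path, client_state); load_path is None if nothing was found.
def load_checkpoint(trainer, path: str):
    load_path, client_state = trainer.engine.load_checkpoint(path)
    if load_path is None:
        print(f"No checkpoint found at {path}")
    else:
        print(f"Resumed from {load_path}")
    return client_state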
def run_advanced_tutorial():
    """Main function to run the advanced DeepSpeed tutorial"""
    print("Advanced DeepSpeed Tutorial Starting...")
    print("=" * 60)
    model_config = {
        'vocab_size': 50257,
        'seq_length': 512,
        'hidden_size': 768,
        'num_layers': 6,
        'num_heads': 12
    }
    ds_config = {
        'train_batch_size': 16,
        'micro_batch_size': 4,
        'gradient_accumulation_steps': 4,
        'zero_stage': 2,
        'learning_rate': 1e-4,
        'cpu_offload': False
    }
    print("Configuration:")
    # Rough parameter estimate: token embeddings plus ~12 * hidden^2 weights per transformer block
    approx_params = (model_config['vocab_size'] * model_config['hidden_size']
                     + model_config['num_layers'] * 12 * model_config['hidden_size'] ** 2)
    print(f"  Approximate model size: ~{approx_params / 1e6:.1f}M parameters")
    print(f"  ZeRO Stage: {ds_config['zero_stage']}")
    print(f"  Batch size: {ds_config['train_batch_size']}")
    trainer = AdvancedDeepSpeedTrainer(model_config, ds_config)
    model = trainer.create_model()
    engine = trainer.initialize_deepspeed()
    print("\nCreating synthetic dataset...")
    dataset = SyntheticTextDataset(
        size=200,
        seq_length=model_config['seq_length'],
        vocab_size=model_config['vocab_size']
    )
    dataloader = DataLoader(
        dataset,
        batch_size=ds_config['micro_batch_size'],
        shuffle=True
    )
    print("\nPre-training memory stats:")
    trainer.log_memory_stats()
    trainer.train(dataloader, num_epochs=2)
    print("\nPost-training memory stats:")
    trainer.log_memory_stats()
    trainer.demonstrate_inference("DeepSpeed enables efficient training of")
    checkpoint_path = "./deepspeed_checkpoint"
    trainer.save_checkpoint(checkpoint_path)
    demonstrate_zero_stages()
    demonstrate_memory_optimization()
    print("\nTutorial completed successfully!")
    print("Key DeepSpeed features demonstrated:")
    print("  - ZeRO optimization for memory efficiency")
    print("  - Mixed precision training (FP16)")
    print("  - Gradient accumulation")
    print("  - Learning rate scheduling")
    print("  - Checkpoint saving/loading")
    print("  - Memory monitoring")
def demonstrate_zero_stages():
    """Explain the different ZeRO optimization stages"""
    print("\nZeRO Optimization Stages Explained:")
    print("  Stage 0: Disabled (baseline)")
    print("  Stage 1: Optimizer state partitioning (~4x memory reduction)")
    print("  Stage 2: Gradient partitioning (~8x memory reduction)")
    print("  Stage 3: Parameter partitioning (~Nx memory reduction)")
    zero_configs = {
        1: {"stage": 1, "reduce_bucket_size": 5e8},
        2: {"stage": 2, "allgather_partitions": True, "reduce_scatter": True},
        3: {"stage": 3, "stage3_prefetch_bucket_size": 5e8, "stage3_param_persistence_threshold": 1e6}
    }
    estimated_memory_reduction = {1: "4", 2: "8", 3: "N"}
    for stage, config in zero_configs.items():
        print(f"  Stage {stage}: ~{estimated_memory_reduction[stage]}x memory reduction")
def demonstrate_memory_optimization():
    """Show memory optimization techniques"""
    print("\nMemory Optimization Techniques:")
    print("  Gradient Checkpointing: Trade compute for memory")
    print("  CPU Offloading: Move optimizer states to CPU")
    print("  Compression: Reduce communication overhead")
    print("  Mixed Precision: Use FP16 for faster training")
We orchestrate the full training run: set the configs, build the GPT-2 model and DeepSpeed engine, create a synthetic dataset, monitor GPU memory, train for two epochs, run inference, and save a checkpoint. We then explain the ZeRO stages and highlight memory-optimization tactics, such as gradient checkpointing and CPU offloading, to understand the trade-offs in practice, as sketched below. Check out the FULL CODES here.
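To make those two tactics concrete, the sketch below shows one way they could be wired in: gradient_checkpointing_enable() is the standard Hugging Face call for activation checkpointing, and the offload_optimizer block is the newer DeepSpeed form of the legacy cpu_offload flag used above. Treat it as an illustrative assumption rather than the tutorial's own configuration.

# Illustrative sketch: enable activation checkpointing on the HF model
# and offload optimizer states to CPU in a ZeRO-2 config.
def build_memory_optimized_setup(trainer):
    trainer.model.gradient_checkpointing_enable()            # trade compute for activation memory
    ds_config = trainer.create_deepspeed_config()
    ds_config["zero_optimization"].pop("cpu_offload", None)  # drop the legacy flag
    ds_config["zero_optimization"]["offload_optimizer"] = {  # newer offload form
        "device": "cpu",
        "pin_memory": True
    }
    return ds_config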
class DeepSpeedConfigGenerator:
    """Utility class to generate DeepSpeed configurations"""
    @staticmethod
    def generate_config(
        batch_size: int = 16,
        zero_stage: int = 2,
        use_cpu_offload: bool = False,
        learning_rate: float = 1e-4
    ) -> Dict[str, Any]:
        """Generate a complete DeepSpeed configuration"""
        config = {
            "train_batch_size": batch_size,
            "train_micro_batch_size_per_gpu": max(1, batch_size // 4),
            "gradient_accumulation_steps": max(1, batch_size // max(1, batch_size // 4)),
            "zero_optimization": {
                "stage": zero_stage,
                "allgather_partitions": True,
                "allgather_bucket_size": 5e8,
                "overlap_comm": True,
                "reduce_scatter": True,
                "reduce_bucket_size": 5e8,
                "contiguous_gradients": True
            },
            "fp16": {
                "enabled": True,
                "loss_scale": 0,
                "loss_scale_window": 1000,
                "initial_scale_power": 16,
                "hysteresis": 2,
                "min_loss_scale": 1
            },
            "optimizer": {
                "type": "AdamW",
                "params": {
                    "lr": learning_rate,
                    "betas": [0.9, 0.999],
                    "eps": 1e-8,
                    "weight_decay": 0.01
                }
            },
            "scheduler": {
                "type": "WarmupLR",
                "params": {
                    "warmup_min_lr": 0,
                    "warmup_max_lr": learning_rate,
                    "warmup_num_steps": 100
                }
            },
            "gradient_clipping": 1.0,
            "wall_clock_breakdown": True
        }
        if use_cpu_offload:
            config["zero_optimization"]["cpu_offload"] = True
            config["zero_optimization"]["pin_memory"] = True
        if zero_stage == 3:
            config["zero_optimization"].update({
                "stage3_prefetch_bucket_size": 5e8,
                "stage3_param_persistence_threshold": 1e6,
                "stage3_gather_16bit_weights_on_model_save": True
            })
        return config
def benchmark_zero_stages():
    """Benchmark the different ZeRO stages"""
    print("\nBenchmarking ZeRO Stages...")
    model_config = {
        'vocab_size': 50257,
        'seq_length': 256,
        'hidden_size': 512,
        'num_layers': 4,
        'num_heads': 8
    }
    results = {}
    for stage in [1, 2]:
        print(f"\nTesting ZeRO Stage {stage}...")
        ds_config = {
            'train_batch_size': 8,
            'micro_batch_size': 2,
            'gradient_accumulation_steps': 4,
            'zero_stage': stage,
            'learning_rate': 1e-4
        }
        try:
            trainer = AdvancedDeepSpeedTrainer(model_config, ds_config)
            model = trainer.create_model()
            engine = trainer.initialize_deepspeed()
            if torch.cuda.is_available():
                torch.cuda.reset_peak_memory_stats()
            dataset = SyntheticTextDataset(size=20, seq_length=model_config['seq_length'])
            dataloader = DataLoader(dataset, batch_size=ds_config['micro_batch_size'])
            start_time = time.time()
            for i, batch in enumerate(dataloader):
                if i >= 5:
                    break
                trainer.train_step(batch)
            end_time = time.time()
            peak_memory = torch.cuda.max_memory_allocated() / 1024**3 if torch.cuda.is_available() else 0.0
            results[stage] = {
                'peak_memory_gb': peak_memory,
                'time_per_step': (end_time - start_time) / 5
            }
            print(f"  Peak Memory: {peak_memory:.2f}GB")
            print(f"  Time per step: {results[stage]['time_per_step']:.3f}s")
            del trainer, model, engine
            torch.cuda.empty_cache()
        except Exception as e:
            print(f"  Error with stage {stage}: {str(e)}")
    if len(results) > 1:
        print("\nComparison:")
        stage_1_memory = results.get(1, {}).get('peak_memory_gb', 0)
        stage_2_memory = results.get(2, {}).get('peak_memory_gb', 0)
        if stage_1_memory > 0 and stage_2_memory > 0:
            memory_reduction = (stage_1_memory - stage_2_memory) / stage_1_memory * 100
            print(f"  Memory reduction from Stage 1 to 2: {memory_reduction:.1f}%")
def demonstrate_advanced_features():
    """Highlight additional advanced DeepSpeed features"""
    print("\nAdvanced DeepSpeed Features:")
    print("  Dynamic Loss Scaling: Automatically adjusts FP16 loss scaling")
    print("  Gradient Compression: Reduces communication overhead")
    print("  Pipeline Parallelism: Splits the model across devices")
    print("  Expert Parallelism: Efficient Mixture-of-Experts training")
    print("  Curriculum Learning: Progressive training strategies")
if __name__ == "__main__":
    print(f"CUDA Available: {torch.cuda.is_available()}")
    if torch.cuda.is_available():
        print(f"  GPU: {torch.cuda.get_device_name()}")
        print(f"  Memory: {torch.cuda.get_device_properties(0).total_memory / 1024**3:.1f}GB")
    try:
        run_advanced_tutorial()
        benchmark_zero_stages()
        demonstrate_advanced_features()
    except Exception as e:
        print(f"Error during tutorial: {str(e)}")
        print("Tips for troubleshooting:")
        print("  - Ensure you have a GPU runtime enabled in Colab")
        print("  - Try reducing the batch size or model size if you run into memory issues")
        print("  - Enable CPU offloading in ds_config if needed")
We generate reusable DeepSpeed configurations, benchmark ZeRO stages to compare memory and speed, and showcase advanced features such as dynamic loss scaling and pipeline/MoE parallelism. We also detect CUDA, run the full tutorial end-to-end, and provide clear troubleshooting tips, allowing us to iterate confidently in Colab.
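A generated configuration can also be written to disk and reused with the deepspeed launcher; the snippet below is a minimal sketch of that workflow, and the file name ds_config.json is just an example, not something the original tutorial creates.

# Minimal sketch: generate a ZeRO-3 config with CPU offload and save it as JSON
# so it can be passed to the launcher, e.g.:
#   deepspeed train.py --deepspeed --deepspeed_config ds_config.json
config = DeepSpeedConfigGenerator.generate_config(
    batch_size=16, zero_stage=3, use_cpu_offload=True, learning_rate=1e-4
)
with open("ds_config.json", "w") as f:
    json.dump(config, f, indent=2)
print("Wrote ds_config.json with ZeRO stage", config["zero_optimization"]["stage"])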
In conclusion, we gain a comprehensive understanding of how DeepSpeed improves training efficiency by striking a balance between performance and memory trade-offs. From leveraging ZeRO stages for memory reduction to applying FP16 mixed precision and CPU offloading, the tutorial showcases powerful strategies that make large-scale training accessible on modest hardware. By the end, learners will have trained and optimized a GPT-style model, benchmarked configurations, monitored GPU resources, and explored advanced features such as pipeline parallelism and gradient compression.
Check out the FULL CODES here.