
PyTorch Lightning Distributed Training

Distributed Strategies

Lightning supports multiple distributed strategies with a single parameter change.

1. DDP (DistributedDataParallel)

Default strategy for multi-GPU:

# Automatic DDP on all available GPUs
trainer = L.Trainer(accelerator='gpu', devices=4, strategy='ddp')

# Or auto-detect
trainer = L.Trainer(accelerator='gpu', devices='auto')

How DDP works:

  • Replicates model on each GPU
  • Each GPU processes different batch
  • Gradients all-reduced across GPUs
  • Model weights synchronized
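
The gradient all-reduce step above can be sketched with raw `torch.distributed`. This is a minimal single-process illustration (world_size=1 on the CPU `gloo` backend, chosen only so it runs anywhere); a real DDP job launches one such process per GPU and the sum spans all of them:

```python
import os
import torch
import torch.distributed as dist

# Minimal demo of the all-reduce primitive DDP uses to average
# gradients. world_size=1 with the CPU "gloo" backend is only for
# illustration; a real run has one process per GPU.
os.environ.setdefault("MASTER_ADDR", "127.0.0.1")
os.environ.setdefault("MASTER_PORT", "29500")
dist.init_process_group("gloo", rank=0, world_size=1)

grad = torch.tensor([1.0, 2.0, 3.0])          # this rank's local gradient
dist.all_reduce(grad, op=dist.ReduceOp.SUM)   # sum across all ranks
grad /= dist.get_world_size()                 # average, as DDP does internally

print(grad.tolist())  # with world_size=1 the gradient is unchanged
dist.destroy_process_group()
```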

Launch:

# Lightning handles spawning processes automatically
python train.py

DDP Configuration:

from lightning.pytorch.strategies import DDPStrategy

strategy = DDPStrategy(
    find_unused_parameters=False,  # Set True if model has unused params
    gradient_as_bucket_view=True,  # Memory optimization
    static_graph=False,  # Set True if graph doesn't change
)

trainer = L.Trainer(strategy=strategy)

2. FSDP (Fully Sharded Data Parallel)

For large models (7B+ parameters):

from lightning.pytorch.strategies import FSDPStrategy

strategy = FSDPStrategy(
    sharding_strategy="FULL_SHARD",  # ZeRO-3 equivalent
    activation_checkpointing=None,   # Or specify layer types
    cpu_offload=False,               # CPU offload for memory
)

trainer = L.Trainer(
    accelerator='gpu',
    devices=8,
    strategy=strategy,
    precision='bf16'  # Recommended with FSDP
)

trainer.fit(model, train_loader)

FSDP Sharding Strategies:

# FULL_SHARD (most memory efficient, equivalent to ZeRO-3)
strategy = FSDPStrategy(sharding_strategy="FULL_SHARD")

# SHARD_GRAD_OP (less memory efficient, equivalent to ZeRO-2)
strategy = FSDPStrategy(sharding_strategy="SHARD_GRAD_OP")

# NO_SHARD (no sharding, like DDP)
strategy = FSDPStrategy(sharding_strategy="NO_SHARD")

Auto-wrap policy (wrap transformer blocks):

from torch.distributed.fsdp.wrap import transformer_auto_wrap_policy
from transformers.models.gpt2.modeling_gpt2 import GPT2Block
import functools

auto_wrap_policy = functools.partial(
    transformer_auto_wrap_policy,
    transformer_layer_cls={GPT2Block}
)

strategy = FSDPStrategy(
    auto_wrap_policy=auto_wrap_policy,
    activation_checkpointing_policy={GPT2Block}  # Checkpoint these blocks
)

3. DeepSpeed

For massive models (70B+ parameters):

from lightning.pytorch.strategies import DeepSpeedStrategy

# DeepSpeed ZeRO-3 with CPU offload
strategy = DeepSpeedStrategy(
    stage=3,                       # ZeRO-3
    offload_optimizer=True,        # CPU offload optimizer
    offload_parameters=True,       # CPU offload parameters
    cpu_checkpointing=True,        # Checkpoint to CPU
)

trainer = L.Trainer(
    accelerator='gpu',
    devices=8,
    strategy=strategy,
    precision='bf16'
)

trainer.fit(model, train_loader)

DeepSpeed configuration file:

{
  "train_batch_size": "auto",
  "train_micro_batch_size_per_gpu": "auto",
  "gradient_accumulation_steps": "auto",
  "zero_optimization": {
    "stage": 3,
    "offload_optimizer": {
      "device": "cpu",
      "pin_memory": true
    },
    "offload_param": {
      "device": "cpu",
      "pin_memory": true
    },
    "overlap_comm": true,
    "contiguous_gradients": true,
    "reduce_bucket_size": 5e8,
    "stage3_prefetch_bucket_size": 5e8,
    "stage3_param_persistence_threshold": 1e6
  },
  "bf16": {
    "enabled": true
  }
}

Use config file:

strategy = DeepSpeedStrategy(config='deepspeed_config.json')
trainer = L.Trainer(strategy=strategy)
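
To see why ZeRO-3 makes 70B-scale models feasible, it helps to estimate per-GPU model-state memory. The sketch below uses the standard mixed-precision Adam accounting (2 bytes bf16 params + 2 bytes bf16 grads + 12 bytes fp32 master weights and Adam moments, all sharded evenly); it is a rough estimate that ignores activations and communication buffers:

```python
def zero3_model_state_gb(num_params: float, num_gpus: int) -> float:
    """Rough per-GPU model-state memory under ZeRO-3 (no offload).

    Assumes bf16 params (2 B) and grads (2 B) plus fp32 Adam states
    (4 B master copy + 4 B momentum + 4 B variance = 12 B), all
    sharded evenly across GPUs. Activations are not included.
    """
    bytes_per_param = 2 + 2 + 12  # params + grads + optimizer states
    return num_params * bytes_per_param / num_gpus / 1e9

# A 7B-parameter model on 8 GPUs: 7e9 * 16 / 8 bytes = 14 GB per GPU
print(round(zero3_model_state_gb(7e9, 8), 1))  # 14.0
```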

4. DDP Spawn

Windows-compatible DDP:

# Use when DDP doesn't work (e.g., Windows, Jupyter)
trainer = L.Trainer(
    accelerator='gpu',
    devices=2,
    strategy='ddp_spawn'  # Spawns new processes
)

Note: Slower than DDP due to process spawning overhead

Multi-Node Training

Setup Multi-Node Cluster

Node 0 (master):

export MASTER_ADDR=192.168.1.100
export MASTER_PORT=12355
export WORLD_SIZE=16  # 2 nodes × 8 GPUs
export NODE_RANK=0

python train.py

Node 1 (worker):

export MASTER_ADDR=192.168.1.100
export MASTER_PORT=12355
export WORLD_SIZE=16
export NODE_RANK=1

python train.py

Training script:

trainer = L.Trainer(
    accelerator='gpu',
    devices=8,              # GPUs per node
    num_nodes=2,            # Total nodes
    strategy='ddp'
)

trainer.fit(model, train_loader)
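
The environment variables above determine each process's global rank: ranks are assigned node by node, so NODE_RANK and the local GPU index fix a process's position in the world. A tiny (hypothetical) helper makes the arithmetic explicit:

```python
def global_rank(node_rank: int, local_rank: int, gpus_per_node: int) -> int:
    """Global rank of a process: ranks 0..gpus_per_node-1 live on node 0,
    the next gpus_per_node ranks on node 1, and so on."""
    return node_rank * gpus_per_node + local_rank

# 2 nodes x 8 GPUs (WORLD_SIZE=16): GPU 3 on node 1 is global rank 11
print(global_rank(node_rank=1, local_rank=3, gpus_per_node=8))  # 11
```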

SLURM Integration

SLURM job script:

#!/bin/bash
#SBATCH --nodes=4
#SBATCH --ntasks-per-node=8
#SBATCH --gres=gpu:8
#SBATCH --time=24:00:00

# Lightning auto-detects SLURM environment
srun python train.py

Training script (no changes needed):

# Lightning automatically reads SLURM environment variables
trainer = L.Trainer(
    accelerator='gpu',
    devices=8,
    num_nodes=4,  # From SBATCH --nodes
    strategy='ddp'
)

Kubernetes (KubeFlow)

Training script:

# Lightning auto-detects the Kubeflow environment variables
trainer = L.Trainer(
    accelerator='gpu',
    devices='auto',   # GPUs per pod; note WORLD_SIZE is the *total* process count, not devices per node
    strategy='ddp'
)

Mixed Precision Training

BF16 (A100/H100)

trainer = L.Trainer(
    precision='bf16',  # Or 'bf16-mixed'
    accelerator='gpu'
)

Advantages:

  • No gradient scaler needed
  • Same dynamic range as FP32
  • Up to 2× speedup, roughly 50% memory reduction
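
Under the hood, `bf16-mixed` runs matmul-heavy ops in bfloat16 inside an autocast region while master weights stay in fp32. The sketch below shows this with plain PyTorch; CPU autocast is used only so it runs without a GPU:

```python
import torch

# Mixed bf16 in a nutshell: weights remain float32, but ops inside the
# autocast region compute in bfloat16. CPU autocast is used here purely
# so the sketch runs without a GPU.
layer = torch.nn.Linear(8, 4)   # weights stay float32
x = torch.randn(2, 8)

with torch.autocast(device_type="cpu", dtype=torch.bfloat16):
    out = layer(x)

print(layer.weight.dtype, out.dtype)  # torch.float32 torch.bfloat16
```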

FP16 (V100, older GPUs)

trainer = L.Trainer(
    precision='16-mixed',  # Or just '16'
    accelerator='gpu'
)

Automatic gradient scaling handled by Lightning

FP8 (H100)

# Requires transformer_engine
# pip install transformer-engine[pytorch]

trainer = L.Trainer(
    precision='transformer-engine',
    accelerator='gpu'
)

Benefits: up to 2× faster than BF16 on H100 for supported layers

Gradient Accumulation

Simulate larger batch size:

trainer = L.Trainer(
    accumulate_grad_batches=4,  # Accumulate 4 batches
    precision='bf16'
)

# Effective batch = batch_size × accumulate_grad_batches × num_gpus
# Example: 32 × 4 × 8 = 1024

Dynamic accumulation:

# Accumulate more early in training
trainer = L.Trainer(
    accumulate_grad_batches={
        0: 8,   # Epochs 0-4: accumulate 8
        5: 4,   # Epochs 5-9: accumulate 4
        10: 2   # Epochs 10+: accumulate 2
    }
)
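
Why accumulation simulates a larger batch: summing the gradients of 4 micro-batch losses, each scaled by 1/4, reproduces the gradient of the mean loss over the full batch. A plain-PyTorch sketch of this equivalence (what Lightning does for you when `accumulate_grad_batches=4`):

```python
import torch

# Two identical linear models: one sees the full batch, the other
# accumulates gradients over 4 micro-batches with the loss scaled by 1/4.
torch.manual_seed(0)
model_a = torch.nn.Linear(4, 1)
model_b = torch.nn.Linear(4, 1)
model_b.load_state_dict(model_a.state_dict())  # identical weights

data = torch.randn(16, 4)
target = torch.randn(16, 1)

# Full batch in one backward pass
torch.nn.functional.mse_loss(model_a(data), target).backward()

# Same batch as 4 micro-batches, gradients accumulated
for chunk, t in zip(data.chunk(4), target.chunk(4)):
    (torch.nn.functional.mse_loss(model_b(chunk), t) / 4).backward()

print(torch.allclose(model_a.weight.grad, model_b.weight.grad, atol=1e-6))  # True
```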

Checkpointing in Distributed

Save Checkpoint

from lightning.pytorch.callbacks import ModelCheckpoint

# Only rank 0 saves by default
checkpoint = ModelCheckpoint(
    dirpath='checkpoints/',
    filename='model-{epoch:02d}',
    save_top_k=3
)

trainer = L.Trainer(callbacks=[checkpoint], strategy='ddp')
trainer.fit(model, train_loader)

Manual save:

class MyModel(L.LightningModule):
    def training_step(self, batch, batch_idx):
        # Training...
        loss = ...

        # Save every 1000 steps (only rank 0)
        if batch_idx % 1000 == 0 and self.trainer.is_global_zero:
            self.trainer.save_checkpoint(f'checkpoint_step_{batch_idx}.ckpt')

        return loss

Load Checkpoint

# Resume training
trainer = L.Trainer(strategy='ddp')
trainer.fit(model, train_loader, ckpt_path='checkpoints/last.ckpt')

# Load for inference
model = MyModel.load_from_checkpoint('checkpoints/best.ckpt')
model.eval()

Strategy Comparison

| Strategy | Memory Efficiency | Speed | Use Case |
|---|---|---|---|
| DDP | Low | Fast | Small models (<7B), single node |
| FSDP | High | Medium | Large models (7-70B) |
| DeepSpeed ZeRO-2 | Medium | Fast | Medium models (1-13B) |
| DeepSpeed ZeRO-3 | Very High | Slower | Massive models (70B+) |
| DDP Spawn | Low | Slow | Windows, debugging |

Best Practices

1. Choose Right Strategy

# Model size guide
if model_params < 1e9:  # <1B
    strategy = 'ddp'
elif model_params < 7e9:  # 1-7B
    strategy = 'ddp'  # or DeepSpeedStrategy(stage=2)
elif model_params < 70e9:  # 7-70B
    strategy = FSDPStrategy(sharding_strategy="FULL_SHARD")
else:  # 70B+
    strategy = DeepSpeedStrategy(stage=3, offload_optimizer=True)

trainer = L.Trainer(strategy=strategy)

2. Avoid Sync Issues

class MyModel(L.LightningModule):
    def training_step(self, batch, batch_idx):
        # WRONG: This runs on all GPUs independently
        if batch_idx % 100 == 0:
            self.log_something()  # Logged 8 times on 8 GPUs!

        # CORRECT: Use is_global_zero
        if batch_idx % 100 == 0 and self.trainer.is_global_zero:
            self.log_something()  # Logged once

        loss = ...
        return loss

3. Efficient Data Loading

from torch.utils.data import DataLoader, DistributedSampler

# Lightning handles DistributedSampler automatically
train_loader = DataLoader(
    dataset,
    batch_size=32,
    num_workers=4,  # 4 workers per GPU
    pin_memory=True,
    persistent_workers=True
)

# Lightning automatically wraps with DistributedSampler in DDP
trainer.fit(model, train_loader)
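
What the automatic `DistributedSampler` wrapping does is partition the dataset so each rank sees a disjoint slice. The sketch below constructs the samplers explicitly (passing `num_replicas`/`rank` by hand so it runs without an initialized process group):

```python
import torch
from torch.utils.data import DistributedSampler, TensorDataset

# Each rank receives a disjoint, interleaved slice of the dataset.
# num_replicas/rank are passed explicitly so this runs without an
# initialized process group; in DDP they come from the environment.
dataset = TensorDataset(torch.arange(10))

rank0 = list(DistributedSampler(dataset, num_replicas=2, rank=0, shuffle=False))
rank1 = list(DistributedSampler(dataset, num_replicas=2, rank=1, shuffle=False))

print(rank0)  # [0, 2, 4, 6, 8]
print(rank1)  # [1, 3, 5, 7, 9]
```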

4. Reduce Communication Overhead

from lightning.pytorch.strategies import DDPStrategy

strategy = DDPStrategy(
    gradient_as_bucket_view=True,  # Reduce memory copies
    static_graph=True,  # If model graph doesn't change (faster)
)

trainer = L.Trainer(strategy=strategy)

Common Issues

Issue: NCCL Timeout

Symptom: Training hangs with NCCL timeout error

Solution 1: Increase the collective timeout

from datetime import timedelta
from lightning.pytorch.strategies import DDPStrategy

# Default process-group timeout is 30 minutes; raise it to tolerate
# long data-loading or checkpointing stalls
strategy = DDPStrategy(timeout=timedelta(hours=1))
trainer = L.Trainer(strategy=strategy)

Solution 2: Check network

# Test inter-node communication
nvidia-smi nvlink -s

# Verify all nodes can ping each other
ping <node-2-ip>

Issue: OOM with FSDP

Solution: Enable CPU offload

strategy = FSDPStrategy(
    sharding_strategy="FULL_SHARD",
    cpu_offload=True  # Offload to CPU
)

Issue: Different Results with DDP

Cause: Different random seeds per GPU

Solution: Seed all processes identically

import lightning as L

# Call once at the top of the training script, before building the
# model and Trainer; workers=True also seeds DataLoader workers
L.seed_everything(42, workers=True)

Issue: DeepSpeed Config Errors

Solution: Use Lightning's auto config

strategy = DeepSpeedStrategy(
    stage=3,
    # Don't specify config file, Lightning generates automatically
)

Resources