Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

PyTorch 2.4.1

GPU Device

Cuda Version

A30

A6000

L40s

H100

11.8

works

works

works

works

12.1

works

works

works

works

12.4

works

works

works

works

Example

The following example is based heavily on the code within the ddp-tutorial-series GitHub repo.

You will need to update the slurm submission script appropriately.

Expand
titlemulti_gpu.py
Code Block
import torch
import torch.multiprocessing as mp
import torch.nn.functional as F
from torch.utils.datadistributed import Dataset, DataLoader
import torch.multiprocessinginit_process_group, destroy_process_group
from torch.nn.parallel import DistributedDataParallel as mpDDP
from torch.utils.data.distributed import DistributedSamplerDataset, DataLoader
from torch.nn.parallel import DistributedDataParallel as DDP
from torchutils.data.distributed import init_process_group, destroy_process_groupDistributedSampler

import argparse
import os


class MyTrainDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        return self.data[index]

    
def ddp_setup(rank, world_size):
    """
    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    torch.cuda.set_device(rank)
    init_process_group(backend="nccl", rank=rank, world_size=world_size)


class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        save_every: int,
    ) -> None:
        self.gpu_id = gpu_id
        self.model = model.to(gpu_id)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.model = DDP(model, device_ids=[gpu_id])

    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = F.cross_entropy(output, targets)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        b_sz = len(next(iter(self.train_data))[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        self.train_data.sampler.set_epoch(epoch)
        for source, targets in self.train_data:
            source = source.to(self.gpu_id)
            targets = targets.to(self.gpu_id)
            self._run_batch(source, targets)

    def _save_checkpoint(self, epoch):
        ckp = self.model.module.state_dict()
        PATH = "checkpoint.pt"
        torch.save(ckp, PATH)
        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

    def train(self, max_epochs: int):
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            if self.gpu_id == 0 and epoch % self.save_every == 0:
                self._save_checkpoint(epoch)


def load_train_objs():
    train_set = MyTrainDataset(2048)  # load your dataset
    model = torch.nn.Linear(20, 1)  # load your model
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    return train_set, model, optimizer


def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )


def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    ddp_setup(rank, world_size)
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, rank, save_every)
    trainer.train(total_epochs)
    destroy_process_group()


if __name__ == "__main__":
    device_count = torch.cuda.device_count()

    device_str = ("PyTorch Version: " + str(torch.__version__) + "\n" +
                  "Torch Distributed: " + str(torch.distributed.is_available()) + "\n" +
                  "Cuda Available: " + str(torch.cuda.is_available()) + "\n" +
                  "Cuda Version: " + str(torch.version.cuda) + "\n" +
                  "ArchList: " + "\n" + str(torch.cuda.get_arch_list()) + "\n" +
                  "NCCL: Version: " + str(torch.cuda.nccl.version()) + "\n" +
                  "Device Count: " + str(device_count))
    print(device_str)
    
    for device_id in range(0, device_count):
        device = torch.device("cuda:" + str(device_id))
        major, minor = torch.cuda.get_device_capability(device)
        gpu_str = ("Device ID: " + str(device_id) +
                   " Device Name: " + str(torch.cuda.get_device_name(device_id)) + "\n" +
                   "  CUDA compute capability: " + str(major) + "." + str(minor) + "\n" +
                   "  Properties: " + str(torch.cuda.get_device_properties(device_id)) + "\n" +
                   "  NCCL: Available: " + str(torch.cuda.nccl.is_available(torch.rand(1, device=device))))
        print(gpu_str)
        
    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()

    world_size = device_count
    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)
Expand
titleSlurm submission script
Code Block
#!/bin/bash

#SBATCH --job-name=pyt-multi-gpu
#SBATCH --account=<project-name>
#SBATCH --time=5:00
#SBATCH --nodes=1
#SBATCH --cpus-per-task=2
#SBATCH --mail-type=ALL
#SBATCH --mail-user=<email-address>
#SBATCH --output=pyt_multi_gpu_%A.out
#SBATCH --partition=<gpu-partition>
#SBATCH --gres=gpu:4<num-of-gpus>

export OMP_NUM_THREADS=1
# Uncomment for NCCL related logging.
# export NCCL_DEBUG=INFO

echo "SLURM_JOB_ID:" $SLURM_JOB_ID
echo "SLURM_JOB_NODELIST:" $SLURM_JOB_NODELIST
echo "SLURM_GPUS:" $SLURM_GPUS
echo "SLURM_GPUS_ON_NODE:" $SLURM_GPUS_ON_NODE
echo "SLURM_JOB_GPUS:" $SLURM_JOB_GPUS
echo "CUDA_VISIBLE_DEVICES:" $CUDA_VISIBLE_DEVICES

# Environment Variable sedused within the code.
# Sets a random port to potentially allow different jobs to 
# run on the same GPU device.
export MASTER_PORT=$(expr 10000 + $(echo -n $SLURM_JOBID | tail -c 4))
echo "MASTER_PORT="$MASTER_PORT

# List GPU devices allocated.
nvidia-smi -L

module purge
module load miniconda3/<version>
conda activate <path-to-conda-environmnet>

# Monitor GPU usage
# Run this process in the background.
nvidia-smi \
--query-gpu=timestamp,count,gpu_name,gpu_uuid,utilization.gpu,utilization.memory,memory.total,memory.reserved,memory.used,memory.free,temperature.gpu,temperature.memory \
--format=csv -l 1 \
>  &
echo "Writing nvidia-smi to: gpu_usage.csv"

python multi_gpu.py 50 10

echo "Done."

Code/Script Comments:

  • Under the hood, the code uses the MASTER_PORT environment variable. This is set to a random number within the submission script to allow multiple jobs. If there all use the same value, and run on the same compute node, this can cause errors.

  • Recording the various torch and cuda versions and capabilities confirms that you have the intended environment and helps with triaging issues.

  • Within the submission script, the nvidia-smi tool is run as a background process to monitor the utilization of the gpu and its memory - this uses only a few of the available options.

    • This uses the -l 1 option to loop the command every second.

    • It also demonstrates an alternative to running a long interactive desktop via ondemand where users typically just watch using this command.

  • Within the submission script, use the export NCCL_DEBUG=INFO environment variable to log NCCL related functionality - useful if you’re seeing related issues.

  • Explicitly setting OMP_NUM_THREADS=1 removes related warnings.

  • For details and understanding of what the actual python code is actually doing, read the the link above.