Versions Compared

Key

  • This line was added.
  • This line was removed.
  • Formatting was changed.

...

Expand
titlemulti_gpu.py
Code Block
import torch
import torch.multiprocessing as mp
import torch.nn.functional as F
from torch.distributed import init_process_group, destroy_process_group
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler

import argparse
import os


class MyTrainDataset(Dataset):
    """Toy in-memory dataset of random ``(features, target)`` tensor pairs.

    Each sample is a tuple of a 20-element tensor and a 1-element tensor,
    both drawn with ``torch.rand`` when the dataset is constructed.
    """

    def __init__(self, size):
        """Generate ``size`` random samples up front and keep them in memory."""
        self.size = size
        samples = []
        for _ in range(size):
            features = torch.rand(20)
            target = torch.rand(1)
            samples.append((features, target))
        self.data = samples

    def __len__(self):
        """Return the number of samples requested at construction."""
        return self.size

    def __getitem__(self, index):
        """Return the ``(features, target)`` pair stored at ``index``."""
        return self.data[index]

    
def ddp_setup(rank, world_size):
    """Join the NCCL process group as one worker and bind it to its GPU.

    Args:
        rank: Unique identifier of each process
        world_size: Total number of processes
    """
    os.environ["MASTER_ADDR"] = "localhost"
    # The rendezvous also reads MASTER_PORT. Keep whatever the launcher
    # exported (the submission script picks a per-job value) and only fall
    # back to a fixed port when nothing was set, so the script still starts
    # when run by hand.
    os.environ.setdefault("MASTER_PORT", "12355")
    torch.cuda.set_device(rank)
    init_process_group(backend="nccl", rank=rank, world_size=world_size)


class Trainer:
    """Drives the training loop for one process and its GPU."""

    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        gpu_id: int,
        save_every: int,
    ) -> None:
        """Keep the training objects and wrap the model in DDP.

        Args:
            model: Network to train; moved to ``gpu_id`` and wrapped in DDP.
            train_data: Loader yielding ``(source, targets)`` batches.
            optimizer: Optimizer stepped once per batch.
            gpu_id: Device this process trains on.
            save_every: Checkpoint interval, in epochs.
        """
        self.gpu_id = gpu_id
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        # Place the model on this process's device before handing it to DDP.
        self.model = model.to(gpu_id)
        self.model = DDP(model, device_ids=[gpu_id])

    def _run_batch(self, source, targets):
        """Run one optimisation step: forward, loss, backward, update."""
        self.optimizer.zero_grad()
        loss = F.cross_entropy(self.model(source), targets)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        """Log the epoch header, then feed every batch through ``_run_batch``."""
        # Pull one batch purely to report its size in the log line.
        first_batch = next(iter(self.train_data))
        b_sz = len(first_batch[0])
        print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
        # Tell the sampler which epoch is starting before iterating.
        self.train_data.sampler.set_epoch(epoch)
        for batch in self.train_data:
            source, targets = batch
            self._run_batch(source.to(self.gpu_id), targets.to(self.gpu_id))

    def _save_checkpoint(self, epoch):
        """Write the unwrapped model's state dict to ``checkpoint.pt``."""
        PATH = "checkpoint.pt"
        # ``.module`` is the model that was wrapped in DDP.
        torch.save(self.model.module.state_dict(), PATH)
        print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")

    def train(self, max_epochs: int):
        """Train for ``max_epochs`` epochs, checkpointing every ``save_every``."""
        for epoch in range(max_epochs):
            self._run_epoch(epoch)
            # Only the process whose gpu_id is 0 writes checkpoints.
            if self.gpu_id != 0:
                continue
            if epoch % self.save_every == 0:
                self._save_checkpoint(epoch)


def load_train_objs():
    """Create the dataset, model and optimizer used for training.

    Returns:
        A ``(train_set, model, optimizer)`` tuple.
    """
    dataset = MyTrainDataset(2048)  # swap in your own dataset
    net = torch.nn.Linear(20, 1)  # swap in your own model
    sgd = torch.optim.SGD(net.parameters(), lr=1e-3)
    return dataset, net, sgd


def prepare_dataloader(dataset: Dataset, batch_size: int):
    """Wrap ``dataset`` in a DataLoader driven by a DistributedSampler.

    Args:
        dataset: Dataset to draw samples from.
        batch_size: Number of samples per batch.

    Returns:
        A DataLoader with pinned memory; ``shuffle`` stays off because the
        sampler decides which samples are drawn.
    """
    distributed_sampler = DistributedSampler(dataset)
    loader_options = dict(
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=distributed_sampler,
    )
    return DataLoader(dataset, **loader_options)


def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
    """Run one worker process: set up DDP, train, then tear the group down.

    Args:
        rank: Index of this process; also passed to the Trainer as its GPU id.
        world_size: Total number of processes taking part.
        save_every: Checkpoint interval in epochs, passed to the Trainer.
        total_epochs: Number of epochs to train for.
        batch_size: Batch size handed to the data loader.
    """
    ddp_setup(rank, world_size)
    try:
        dataset, model, optimizer = load_train_objs()
        train_data = prepare_dataloader(dataset, batch_size)
        trainer = Trainer(model, train_data, optimizer, rank, save_every)
        trainer.train(total_epochs)
    finally:
        # Release the process group even when training raises, so a failed
        # worker does not leave it initialised.
        destroy_process_group()


if __name__ == "__main__":
    # Print the torch/CUDA/NCCL environment first: it confirms the intended
    # environment is active and helps when triaging issues.
    device_count = torch.cuda.device_count()

    device_str = "\n".join([
        f"PyTorch Version: {torch.__version__}",
        f"Torch Distributed: {torch.distributed.is_available()}",
        f"Cuda Available: {torch.cuda.is_available()}",
        f"Cuda Version: {torch.version.cuda}",
        "ArchList: ",
        f"{torch.cuda.get_arch_list()}",
        f"NCCL: Version: {torch.cuda.nccl.version()}",
        f"Device Count: {device_count}",
    ])
    print(device_str)

    # One summary per visible GPU.
    for device_id in range(device_count):
        device = torch.device(f"cuda:{device_id}")
        major, minor = torch.cuda.get_device_capability(device)
        gpu_str = "\n".join([
            f"Device ID: {device_id} Device Name: {torch.cuda.get_device_name(device_id)}",
            f"  CUDA compute capability: {major}.{minor}",
            f"  Properties: {torch.cuda.get_device_properties(device_id)}",
            f"  NCCL: Available: {torch.cuda.nccl.is_available(torch.rand(1, device=device))}",
        ])
        print(gpu_str)

    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()

    # One worker process per visible GPU.
    world_size = device_count
    if world_size < 1:
        # With no devices there is nothing to spawn workers onto; stop with a
        # clear message instead of finishing without having trained anything.
        raise SystemExit("No CUDA devices are visible; request at least one GPU before running.")
    mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size)
Expand
titleSlurm submission script
Code Block
#!/bin/bash
# Slurm batch script: runs multi_gpu.py on the GPUs of a single node while
# logging GPU utilisation to gpu_usage.csv in the background.

#SBATCH --job-name=pyt-multi-gpu
#SBATCH --account=<project-name>
#SBATCH --time=5:00
#SBATCH --nodes=1
#SBATCH --cpus-per-task=2
#SBATCH --mail-type=ALL
#SBATCH --mail-user=<email-address>
#SBATCH --output=pyt_multi_gpu_%A.out
#SBATCH --partition=<gpu-partition>
#SBATCH --gres=gpu:<num-of-gpus>

export OMP_NUM_THREADS=1
# Uncomment for NCCL related logging.
# export NCCL_DEBUG=INFO

echo "SLURM_JOB_ID:" $SLURM_JOB_ID
echo "SLURM_JOB_NODELIST:" $SLURM_JOB_NODELIST
echo "SLURM_GPUS:" $SLURM_GPUS
echo "SLURM_GPUS_ON_NODE:" $SLURM_GPUS_ON_NODE
echo "SLURM_JOB_GPUS:" $SLURM_JOB_GPUS
echo "CUDA_VISIBLE_DEVICES:" $CUDA_VISIBLE_DEVICES

# Environment Variable used within the code.
# Derives the port from the last four digits of the job ID so that
# different jobs sharing a compute node do not all use the same port.
export MASTER_PORT=$(expr 10000 + $(echo -n $SLURM_JOB_ID | tail -c 4))
echo "MASTER_PORT="$MASTER_PORT

# List GPU devices allocated.
nvidia-smi -L

module purge
module load miniconda3/<version>
conda activate <path-to-conda-environment>

# Monitor GPU usage
# Run this process in the background, sampling once per second (-l 1).
nvidia-smi \
--query-gpu=timestamp,count,gpu_name,gpu_uuid,utilization.gpu,utilization.memory,memory.total,memory.reserved,memory.used,memory.free,temperature.gpu,temperature.memory \
--format=csv -l 1 \
> gpu_usage.csv &
# Remember the monitor's PID so it can be stopped once training finishes.
NVIDIA_SMI_PID=$!
echo "Writing nvidia-smi to: gpu_usage.csv"

python multi_gpu.py 50 10

# Stop the background monitor now that training has ended.
kill "$NVIDIA_SMI_PID"

echo "Done."

Code/Script Comments:

  • Under the hood, the code uses the MASTER_PORT environment variable. The submission script sets this to a value derived from the job ID so that multiple jobs can run at the same time. If they all use the same value and run on the same compute node, this can cause errors.

  • Recording the various torch and cuda versions and capabilities confirms that you have the intended environment and helps with triaging issues.

  • Within the submission script, the nvidia-smi tool is run as a background process to monitor the utilization of the gpu and its memory - this uses only a few of the available options.

    • This uses the -l 1 option to loop the command every second.

    • It also demonstrates an alternative to running a long interactive desktop session via OnDemand, where users typically just watch the output of this command.

  • Within the submission script, use the export NCCL_DEBUG=INFO environment variable to log NCCL related functionality - useful if you’re seeing related issues.

  • Explicitly setting OMP_NUM_THREADS=1 removes related warnings.

  • For details and an understanding of what the Python code is actually doing, read the link above.