multi_node.py
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.distributed import init_process_group, destroy_process_group

import argparse
import os
import socket


class MyTrainDataset(Dataset):
    def __init__(self, size):
        self.size = size
        self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]

    def __len__(self):
        return self.size

    def __getitem__(self, index):
        return self.data[index]


def ddp_setup():
    # torchrun sets LOCAL_RANK, RANK and WORLD_SIZE (plus MASTER_ADDR/MASTER_PORT)
    # for every worker process it launches.
    torch.cuda.set_device(int(os.environ["LOCAL_RANK"]))
    init_process_group(backend="nccl")


class Trainer:
    def __init__(
        self,
        model: torch.nn.Module,
        train_data: DataLoader,
        optimizer: torch.optim.Optimizer,
        save_every: int,
        snapshot_path: str,
    ) -> None:
        self.local_rank = int(os.environ["LOCAL_RANK"])
        self.global_rank = int(os.environ["RANK"])
        self.model = model.to(self.local_rank)
        self.train_data = train_data
        self.optimizer = optimizer
        self.save_every = save_every
        self.epochs_run = 0
        self.snapshot_path = snapshot_path
        if os.path.exists(snapshot_path):
            print("Loading snapshot")
            self._load_snapshot(snapshot_path)

        self.model = DDP(self.model, device_ids=[self.local_rank])

    def _load_snapshot(self, snapshot_path):
        loc = f"cuda:{self.local_rank}"
        snapshot = torch.load(snapshot_path, map_location=loc)
        self.model.load_state_dict(snapshot["MODEL_STATE"])
        self.epochs_run = snapshot["EPOCHS_RUN"]
        print(f"Resuming training from snapshot at Epoch {self.epochs_run}")

    def _run_batch(self, source, targets):
        self.optimizer.zero_grad()
        output = self.model(source)
        loss = F.cross_entropy(output, targets)
        loss.backward()
        self.optimizer.step()

    def _run_epoch(self, epoch):
        b_sz = len(next(iter(self.train_data))[0])
        print(f"[{socket.gethostname()}] [GPU{self.global_rank}] Epoch {epoch} | Batchsize: {b_sz}"
              f" | Steps: {len(self.train_data)}")
        self.train_data.sampler.set_epoch(epoch)
        for source, targets in self.train_data:
            source = source.to(self.local_rank)
            targets = targets.to(self.local_rank)
            self._run_batch(source, targets)

    def _save_snapshot(self, epoch):
        snapshot = {
            "MODEL_STATE": self.model.module.state_dict(),
            "EPOCHS_RUN": epoch,
        }
        torch.save(snapshot, self.snapshot_path)
        print(f"Epoch {epoch} | Training snapshot saved at {self.snapshot_path}")

    def train(self, max_epochs: int):
        for epoch in range(self.epochs_run, max_epochs):
            self._run_epoch(epoch)
            if self.local_rank == 0 and epoch % self.save_every == 0:
                self._save_snapshot(epoch)


def load_train_objs():
    train_set = MyTrainDataset(2048)  # load your dataset
    model = torch.nn.Linear(20, 1)  # load your model
    optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
    return train_set, model, optimizer


def prepare_dataloader(dataset: Dataset, batch_size: int):
    return DataLoader(
        dataset,
        batch_size=batch_size,
        pin_memory=True,
        shuffle=False,
        sampler=DistributedSampler(dataset)
    )


def main(save_every: int, total_epochs: int, batch_size: int, snapshot_path: str = "snapshot.pt"):
    ddp_setup()
    dataset, model, optimizer = load_train_objs()
    train_data = prepare_dataloader(dataset, batch_size)
    trainer = Trainer(model, train_data, optimizer, save_every, snapshot_path)
    trainer.train(total_epochs)
    destroy_process_group()


if __name__ == "__main__":
    device_count = torch.cuda.device_count()
    hostname = socket.gethostname()
    device_str = ("[" + hostname + "] PyTorch Version: " + str(torch.__version__) + "\n" +
                  "Torch Distributed: " + str(torch.distributed.is_available()) + "\n" +
                  "Cuda Available: " + str(torch.cuda.is_available()) + "\n" +
                  "Cuda Version: " + str(torch.version.cuda) + "\n" +
                  "ArchList: " + "\n" + str(torch.cuda.get_arch_list()) + "\n" +
                  "NCCL: Version: " + str(torch.cuda.nccl.version()) + "\n" +
                  "Device Count: " + str(device_count))
    print(device_str)

    for device_id in range(0, device_count):
        device = torch.device("cuda:" + str(device_id))
        major, minor = torch.cuda.get_device_capability(device)
        gpu_str = ("[" + hostname + "] Device ID: " + str(device_id) +
                   " Device Name: " + str(torch.cuda.get_device_name(device_id)) + "\n" +
                   "  CUDA compute capability: " + str(major) + "." + str(minor) + "\n" +
                   "  Properties: " + str(torch.cuda.get_device_properties(device_id)) + "\n" +
                   "  NCCL: Available: " + str(torch.cuda.nccl.is_available(torch.rand(1, device=device))))
        print(gpu_str)

    parser = argparse.ArgumentParser(description='simple distributed training job')
    parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
    parser.add_argument('save_every', type=int, help='How often to save a snapshot')
    parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
    args = parser.parse_args()

    main(args.save_every, args.total_epochs, args.batch_size)

Slurm submission script
#!/bin/bash

#SBATCH --job-name=pyt-multi-node
#SBATCH --account=<project-name>
#SBATCH --time=10:00
#SBATCH --nodes=<num-of-nodes>
#SBATCH --ntasks-per-node=1
#SBATCH --gpus-per-task=<num-of-gpus-per-node>
#SBATCH --cpus-per-task=<num-of-gpus + 1>
#SBATCH --mail-type=ALL
#SBATCH --mail-user=<email-address>
#SBATCH --output=pyt_multi_node_%A.out
#SBATCH --partition=<gpu-partition>

export LOGLEVEL=INFO
export OMP_NUM_THREADS=1

# Uncomment for NCCL-related logging.
# export NCCL_DEBUG=INFO

echo "SLURM_JOB_ID:" $SLURM_JOB_ID
echo "SLURM_JOB_NUM_NODES:" $SLURM_JOB_NUM_NODES
echo "SLURM_JOB_NODELIST:" $SLURM_JOB_NODELIST
echo "- - - - - - - - - - - -"
echo "SLURM_GPUS:" $SLURM_GPUS
echo "SLURM_GPUS_PER_NODE:" $SLURM_GPUS_PER_NODE
echo "SLURM_GPUS_ON_NODE:" $SLURM_GPUS_ON_NODE
echo "SLURM_JOB_GPUS:" $SLURM_JOB_GPUS
echo "CUDA_VISIBLE_DEVICES:" $CUDA_VISIBLE_DEVICES
echo "- - - - - - - - - - - -"

# Environment variable used within the training code.
# Sets a pseudo-random port to potentially allow different jobs to
# run on the same compute node without clashing on the same port.
export MASTER_PORT=$(expr 10000 + $(echo -n $SLURM_JOBID | tail -c 4))
echo "MASTER_PORT="$MASTER_PORT

# Monitor GPU usage across each node.
# Specifying --overlap allows steps to share all resources (CPUs, memory, and GRES) with all other steps.
# A step using this option will overlap all other steps, even those that did not specify --overlap.
# By default steps do not share resources with other parallel steps. This option applies to step allocations.
srun --overlap smi_monitor.sh $SLURM_JOB_ID &

module purge
module load miniconda3/<version>
conda activate <path-to-conda-environment>

# Choose the first node from the nodelist to act as the head node.
nodes=$( scontrol show hostnames $SLURM_JOB_NODELIST )
nodes_array=($nodes)
head_node=${nodes_array[0]}
head_node_ip=$(srun --nodes=1 --ntasks=1 -w "$head_node" hostname --ip-address)
echo "Node IP: "$head_node_ip

srun torchrun \
--nnodes "$SLURM_JOB_NUM_NODES" \
--nproc_per_node "$SLURM_GPUS_ON_NODE" \
--rdzv_id $RANDOM \
--rdzv_backend c10d \
--rdzv_endpoint $head_node_ip:$MASTER_PORT \
multi_node.py 50 10

echo "Done."

smi_monitor.sh
#!/bin/bash
JOBID=$1
TASK_HOSTNAME=$(hostname)

FILENAME=$JOBID"_"$TASK_HOSTNAME
echo "Writing nvidia-smi to: "$FILENAME
echo $TASK_HOSTNAME >> $FILENAME
nvidia-smi >> $FILENAME
nvidia-smi -L >> $FILENAME

nvidia-smi \
--query-gpu=timestamp,count,gpu_name,gpu_uuid,utilization.gpu,utilization.memory,memory.total,memory.reserved,memory.used,memory.free,temperature.gpu,temperature.memory \
--format=csv -l 1 \
 >> $FILENAME

Code/Script Comments:

  • The code uses torchrun, which works with Slurm to set up and run training across multiple nodes.

    • So that you don’t have to hard-code values, Slurm-related environment variables are used within the torchrun command (see the first sketch after this list):

      • the --nnodes option is set to "$SLURM_JOB_NUM_NODES".

      • the --nproc_per_node option is set to "$SLURM_GPUS_ON_NODE".

    • Notice that each node runs only one task, but this task has multiple GPUs.

  • The MASTER_PORT environment variable is assigned a pseudo-random value within the submission script to allow multiple jobs to potentially run on the same compute node (see the worked example after this list).

  • Recording the various torch and CUDA versions and capabilities confirms that you have the intended environment and helps with triaging issues.

  • Within the submission script, srun is used to start background tasks, one for each node, that use nvidia-smi to monitor the utilization of each GPU and its memory - this uses only a few of the available query options.

    • The srun command uses the --overlap option to allow these sub-tasks to share the resources allocated to the job, and thus to detect the allocated GPUs. Without this option, the srun call blocks the remaining steps of the submission script.

    • The -l 1 option loops the nvidia-smi query every second.

    • It also demonstrates an alternative to running a long interactive desktop via OnDemand, where users typically just watch the output of this command (see the tail example after this list).

    • These background tasks will create an individual file for each node used.

  • Within the submission script, export the NCCL_DEBUG=INFO environment variable to log NCCL-related functionality - useful if you’re seeing related issues.

  • Explicitly setting OMP_NUM_THREADS=1 removes related warnings.

  • For details of what the Python code is actually doing, read the link above.
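
As a rough illustration of the torchrun/Slurm interaction described above, the sketch below assumes a hypothetical job requesting 2 nodes with 4 GPUs per node; the actual values come from your #SBATCH settings.

# Hypothetical example: 2 nodes, 4 GPUs per node.
# Slurm provides:  SLURM_JOB_NUM_NODES=2   SLURM_GPUS_ON_NODE=4
# torchrun therefore starts 4 worker processes on each node (8 in total) and sets
# RANK (0-7), LOCAL_RANK (0-3) and WORLD_SIZE (8) for every worker, which
# multi_node.py reads from os.environ in ddp_setup() and the Trainer class.
srun torchrun \
--nnodes "$SLURM_JOB_NUM_NODES" \
--nproc_per_node "$SLURM_GPUS_ON_NODE" \
--rdzv_id $RANDOM \
--rdzv_backend c10d \
--rdzv_endpoint $head_node_ip:$MASTER_PORT \
multi_node.py 50 10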
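
The MASTER_PORT derivation can also be checked by hand; the job ID below is made up purely for illustration.

# Hypothetical job ID, used only to show how the port is derived.
SLURM_JOBID=4789123
echo -n $SLURM_JOBID | tail -c 4                    # prints 9123 (the last four characters)
expr 10000 + $(echo -n $SLURM_JOBID | tail -c 4)    # prints 19123
# So MASTER_PORT=19123: always above 10000, and different for jobs whose IDs
# differ in their last four digits.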
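
To watch the monitoring output while a job is running, you can tail the per-node file written by smi_monitor.sh; the file name below follows the FILENAME pattern in that script, with placeholders for your own job ID and node hostname.

# Example only - replace <jobid> and <hostname> with your job ID and a compute node's hostname.
tail -f <jobid>_<hostname>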