import torch
import torch.multiprocessing as mp
import torch.nn.functional as F
from torch.distributed import init_process_group, destroy_process_group
from torch.nn.parallel import DistributedDataParallel as DDP
from torch.utils.data import Dataset, DataLoader
from torch.utils.data.distributed import DistributedSampler
import argparse
import os
class MyTrainDataset(Dataset):
def __init__(self, size):
self.size = size
self.data = [(torch.rand(20), torch.rand(1)) for _ in range(size)]
def __len__(self):
return self.size
def __getitem__(self, index):
return self.data[index]
def ddp_setup(rank, world_size):
"""
Args:
rank: Unique identifier of each process
world_size: Total number of processes
"""
os.environ["MASTER_ADDR"] = "localhost"
torch.cuda.set_device(rank)
init_process_group(backend="nccl", rank=rank, world_size=world_size)
class Trainer:
def __init__(
self,
model: torch.nn.Module,
train_data: DataLoader,
optimizer: torch.optim.Optimizer,
gpu_id: int,
save_every: int,
) -> None:
self.gpu_id = gpu_id
self.model = model.to(gpu_id)
self.train_data = train_data
self.optimizer = optimizer
self.save_every = save_every
self.model = DDP(model, device_ids=[gpu_id])
def _run_batch(self, source, targets):
self.optimizer.zero_grad()
output = self.model(source)
loss = F.cross_entropy(output, targets)
loss.backward()
self.optimizer.step()
def _run_epoch(self, epoch):
b_sz = len(next(iter(self.train_data))[0])
print(f"[GPU{self.gpu_id}] Epoch {epoch} | Batchsize: {b_sz} | Steps: {len(self.train_data)}")
self.train_data.sampler.set_epoch(epoch)
for source, targets in self.train_data:
source = source.to(self.gpu_id)
targets = targets.to(self.gpu_id)
self._run_batch(source, targets)
def _save_checkpoint(self, epoch):
ckp = self.model.module.state_dict()
PATH = "checkpoint.pt"
torch.save(ckp, PATH)
print(f"Epoch {epoch} | Training checkpoint saved at {PATH}")
def train(self, max_epochs: int):
for epoch in range(max_epochs):
self._run_epoch(epoch)
if self.gpu_id == 0 and epoch % self.save_every == 0:
self._save_checkpoint(epoch)
def load_train_objs():
train_set = MyTrainDataset(2048) # load your dataset
model = torch.nn.Linear(20, 1) # load your model
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
return train_set, model, optimizer
def prepare_dataloader(dataset: Dataset, batch_size: int):
return DataLoader(
dataset,
batch_size=batch_size,
pin_memory=True,
shuffle=False,
sampler=DistributedSampler(dataset)
)
def main(rank: int, world_size: int, save_every: int, total_epochs: int, batch_size: int):
ddp_setup(rank, world_size)
dataset, model, optimizer = load_train_objs()
train_data = prepare_dataloader(dataset, batch_size)
trainer = Trainer(model, train_data, optimizer, rank, save_every)
trainer.train(total_epochs)
destroy_process_group()
if __name__ == "__main__":
device_count = torch.cuda.device_count()
device_str = ("PyTorch Version: " + str(torch.__version__) + "\n" +
"Torch Distributed: " + str(torch.distributed.is_available()) + "\n" +
"Cuda Available: " + str(torch.cuda.is_available()) + "\n" +
"Cuda Version: " + str(torch.version.cuda) + "\n" +
"ArchList: " + "\n" + str(torch.cuda.get_arch_list()) + "\n" +
"NCCL: Version: " + str(torch.cuda.nccl.version()) + "\n" +
"Device Count: " + str(device_count))
print(device_str)
for device_id in range(0, device_count):
device = torch.device("cuda:" + str(device_id))
major, minor = torch.cuda.get_device_capability(device)
gpu_str = ("Device ID: " + str(device_id) +
" Device Name: " + str(torch.cuda.get_device_name(device_id)) + "\n" +
" CUDA compute capability: " + str(major) + "." + str(minor) + "\n" +
" Properties: " + str(torch.cuda.get_device_properties(device_id)) + "\n" +
" NCCL: Available: " + str(torch.cuda.nccl.is_available(torch.rand(1, device=device))))
print(gpu_str)
parser = argparse.ArgumentParser(description='simple distributed training job')
parser.add_argument('total_epochs', type=int, help='Total epochs to train the model')
parser.add_argument('save_every', type=int, help='How often to save a snapshot')
parser.add_argument('--batch_size', default=32, type=int, help='Input batch size on each device (default: 32)')
args = parser.parse_args()
world_size = device_count
mp.spawn(main, args=(world_size, args.save_every, args.total_epochs, args.batch_size), nprocs=world_size) |