Skip to content
Snippets Groups Projects
Commit 912f6813 authored by Fahad Khalid's avatar Fahad Khalid
Browse files

Removed the pytorch sample, as it has been moved to the tf2_pytorch branch.

parent 66fcb392
No related branches found
No related tags found
1 merge request!4Updated to use Tensorflow2
# Notes
The source code sample was taken from the Horovod examples repository
[here](https://github.com/horovod/horovod/tree/master/examples/pytorch)
(last checked: April 27, 2021). The sample has been slightly modified. Our
changes are limited to,
* The data loading mechanism.
* Removal of `filelock` to eliminate dependence on a package that is not
available on the supercomputers.
* A few additional comments pertaining to our custom data loading mechanism.
**Note:** All newly added statements follow a comment beginning with `[HPCNS]`.
All statements that demonstrate the use of Horovod follow a comment beginning
with `[Horovod]` (as added by Horovod developers).
The following sample is included:
1. `mnist.py`: A simple training program for an MNIST classifier that
uses Horovod for data distribution.
#!/usr/bin/env bash
# Slurm job configuration
#SBATCH --nodes=1
#SBATCH --ntasks=4
#SBATCH --ntasks-per-node=4
#SBATCH --output=output_%j.out
#SBATCH --error=error_%j.er
#SBATCH --time=00:10:00
#SBATCH --job-name=TUTORIAL
#SBATCH --gres=gpu:4
#SBATCH --partition=dc-gpu-devel
# Load the required modules
module load GCC/9.3.0
module load OpenMPI/4.1.0rc1
module load PyTorch/1.7.0-Python-3.8.5
module load torchvision/0.8.1-Python-3.8.5
module load Horovod/0.20.3-Python-3.8.5
# Make all GPUs visible per node
export CUDA_VISIBLE_DEVICES=0,1,2,3
# Run the program
srun python -u mnist.py
#!/usr/bin/env bash
# Slurm job configuration
#SBATCH --nodes=2
#SBATCH --ntasks=2
#SBATCH --ntasks-per-node=1
#SBATCH --output=output_%j.out
#SBATCH --error=error_%j.er
#SBATCH --time=00:10:00
#SBATCH --job-name=TUTORIAL
#SBATCH --gres=gpu:1
#SBATCH --partition=develgpus
# Load the required modules
module load GCC/9.3.0
module load OpenMPI/4.1.0rc1
module load PyTorch/1.7.0-Python-3.8.5
module load torchvision/0.8.1-Python-3.8.5
module load Horovod/0.20.3-Python-3.8.5
# Make all GPUs visible per node
export CUDA_VISIBLE_DEVICES=0
# Run the program
srun python -u mnist.py
#!/usr/bin/env bash
# Slurm job configuration
#SBATCH --nodes=1
#SBATCH --ntasks=4
#SBATCH --ntasks-per-node=4
#SBATCH --output=output_%j.out
#SBATCH --error=error_%j.er
#SBATCH --time=00:10:00
#SBATCH --job-name=TUTORIAL
#SBATCH --gres=gpu:4
#SBATCH --partition=develbooster
# Load the required modules
module load GCC/9.3.0
module load OpenMPI/4.1.0rc1
module load PyTorch/1.7.0-Python-3.8.5
module load torchvision/0.8.1-Python-3.8.5
module load Horovod/0.20.3-Python-3.8.5
# Make all GPUs visible per node
export CUDA_VISIBLE_DEVICES=0,1,2,3
# Run the program
srun python -u mnist.py
#!/usr/bin/env bash
# Slurm job configuration
#SBATCH --nodes=1
#SBATCH --ntasks=4
#SBATCH --ntasks-per-node=4
#SBATCH --output=output_%j.out
#SBATCH --error=error_%j.er
#SBATCH --time=00:10:00
#SBATCH --job-name=TUTORIAL
#SBATCH --gres=gpu:4
#SBATCH --partition=develgpus
# Load the required modules
module load GCC/9.3.0
module load OpenMPI/4.1.0rc1
module load PyTorch/1.7.0-Python-3.8.5
module load torchvision/0.8.1-Python-3.8.5
module load Horovod/0.20.3-Python-3.8.5
# Make all GPUs visible per node
export CUDA_VISIBLE_DEVICES=0,1,2,3
# Run the program
srun python -u mnist.py
import os
import sys
import shutil
import argparse
import torch.multiprocessing as mp
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import torch.utils.data.distributed
import horovod.torch as hvd
# [HPCNS] Import the DataValidator, which can then be used to
# validate and load the path to the already downloaded dataset.
sys.path.insert(0, '../utils')
from data_utils import DataValidator
# Training settings
parser = argparse.ArgumentParser(description='PyTorch MNIST Example')
parser.add_argument('--batch-size', type=int, default=64, metavar='N',
help='input batch size for training (default: 64)')
parser.add_argument('--test-batch-size', type=int, default=1000, metavar='N',
help='input batch size for testing (default: 1000)')
parser.add_argument('--epochs', type=int, default=10, metavar='N',
help='number of epochs to train (default: 10)')
parser.add_argument('--lr', type=float, default=0.01, metavar='LR',
help='learning rate (default: 0.01)')
parser.add_argument('--momentum', type=float, default=0.5, metavar='M',
help='SGD momentum (default: 0.5)')
parser.add_argument('--no-cuda', action='store_true', default=False,
help='disables CUDA training')
parser.add_argument('--seed', type=int, default=42, metavar='S',
help='random seed (default: 42)')
parser.add_argument('--log-interval', type=int, default=10, metavar='N',
help='how many batches to wait before logging training status')
parser.add_argument('--fp16-allreduce', action='store_true', default=False,
help='use fp16 compression during allreduce')
parser.add_argument('--use-adasum', action='store_true', default=False,
help='use adasum algorithm to do reduction')
parser.add_argument('--gradient-predivide-factor', type=float, default=1.0,
help='apply gradient predivide factor in optimizer (default: 1.0)')
parser.add_argument('--data-dir',
help='location of the training dataset in the local filesystem (will be downloaded if needed)')
class Net(nn.Module):
def __init__(self):
super(Net, self).__init__()
self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
self.conv2_drop = nn.Dropout2d()
self.fc1 = nn.Linear(320, 50)
self.fc2 = nn.Linear(50, 10)
def forward(self, x):
x = F.relu(F.max_pool2d(self.conv1(x), 2))
x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
x = x.view(-1, 320)
x = F.relu(self.fc1(x))
x = F.dropout(x, training=self.training)
x = self.fc2(x)
return F.log_softmax(x)
def train(epoch):
model.train()
# Horovod: set epoch to sampler for shuffling.
train_sampler.set_epoch(epoch)
for batch_idx, (data, target) in enumerate(train_loader):
if args.cuda:
data, target = data.cuda(), target.cuda()
optimizer.zero_grad()
output = model(data)
loss = F.nll_loss(output, target)
loss.backward()
optimizer.step()
if batch_idx % args.log_interval == 0:
# Horovod: use train_sampler to determine the number of examples in
# this worker's partition.
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx * len(data), len(train_sampler),
100. * batch_idx / len(train_loader), loss.item()))
def metric_average(val, name):
tensor = torch.tensor(val)
avg_tensor = hvd.allreduce(tensor, name=name)
return avg_tensor.item()
def test():
model.eval()
test_loss = 0.
test_accuracy = 0.
for data, target in test_loader:
if args.cuda:
data, target = data.cuda(), target.cuda()
output = model(data)
# sum up batch loss
test_loss += F.nll_loss(output, target, size_average=False).item()
# get the index of the max log-probability
pred = output.data.max(1, keepdim=True)[1]
test_accuracy += pred.eq(target.data.view_as(pred)).cpu().float().sum()
# Horovod: use test_sampler to determine the number of examples in
# this worker's partition.
test_loss /= len(test_sampler)
test_accuracy /= len(test_sampler)
# Horovod: average metric values across workers.
test_loss = metric_average(test_loss, 'avg_loss')
test_accuracy = metric_average(test_accuracy, 'avg_accuracy')
# Horovod: print output only on first rank.
if hvd.rank() == 0:
print('\nTest set: Average loss: {:.4f}, Accuracy: {:.2f}%\n'.format(
test_loss, 100. * test_accuracy))
if __name__ == '__main__':
args = parser.parse_args()
args.cuda = not args.no_cuda and torch.cuda.is_available()
# Horovod: initialize library.
hvd.init()
torch.manual_seed(args.seed)
if args.cuda:
# Horovod: pin GPU to local rank.
torch.cuda.set_device(hvd.local_rank())
torch.cuda.manual_seed(args.seed)
# Horovod: limit # of CPU threads to be used per worker.
torch.set_num_threads(1)
kwargs = {'num_workers': 1, 'pin_memory': True} if args.cuda else {}
# When supported, use 'forkserver' to spawn dataloader workers instead of 'fork' to prevent
# issues with Infiniband implementations that are not fork-safe
if (kwargs.get('num_workers', 0) > 0 and hasattr(mp, '_supports_context') and
mp._supports_context and 'forkserver' in mp.get_all_start_methods()):
kwargs['multiprocessing_context'] = 'forkserver'
# data_dir = args.data_dir or './data'
# [HPCNS] Name of the dataset file
data_file = 'mnist/pytorch/data'
# [HPCNS] Path to the directory containing the dataset file
data_dir = DataValidator.validated_data_dir(data_file)
# [HPCNS] Fully qualified dataset file name
dataset_file = os.path.join(data_dir, data_file)
# [HPCNS] Dataset filename for this rank
dataset_root_for_rank = f'MNIST-data-{hvd.rank()}'
dataset_for_rank = f'{dataset_root_for_rank}/MNIST'
# [HPCNS] If the path already exists, remove it
if os.path.exists(dataset_for_rank):
shutil.rmtree(dataset_for_rank)
# [HPCNS] Make a copy of the dataset for this rank
shutil.copytree(dataset_file, dataset_for_rank)
train_dataset = \
datasets.MNIST(dataset_root_for_rank, train=True, download=False,
transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
# Horovod: use DistributedSampler to partition the training data.
train_sampler = torch.utils.data.distributed.DistributedSampler(
train_dataset, num_replicas=hvd.size(), rank=hvd.rank())
train_loader = torch.utils.data.DataLoader(
train_dataset, batch_size=args.batch_size, sampler=train_sampler, **kwargs)
test_dataset = \
datasets.MNIST(dataset_root_for_rank, train=False, transform=transforms.Compose([
transforms.ToTensor(),
transforms.Normalize((0.1307,), (0.3081,))
]))
# Horovod: use DistributedSampler to partition the test data.
test_sampler = torch.utils.data.distributed.DistributedSampler(
test_dataset, num_replicas=hvd.size(), rank=hvd.rank())
test_loader = torch.utils.data.DataLoader(test_dataset, batch_size=args.test_batch_size,
sampler=test_sampler, **kwargs)
model = Net()
# By default, Adasum doesn't need scaling up learning rate.
lr_scaler = hvd.size() if not args.use_adasum else 1
if args.cuda:
# Move model to GPU.
model.cuda()
# If using GPU Adasum allreduce, scale learning rate by local_size.
if args.use_adasum and hvd.nccl_built():
lr_scaler = hvd.local_size()
# Horovod: scale learning rate by lr_scaler.
optimizer = optim.SGD(model.parameters(), lr=args.lr * lr_scaler,
momentum=args.momentum)
# Horovod: broadcast parameters & optimizer state.
hvd.broadcast_parameters(model.state_dict(), root_rank=0)
hvd.broadcast_optimizer_state(optimizer, root_rank=0)
# Horovod: (optional) compression algorithm.
compression = hvd.Compression.fp16 if args.fp16_allreduce else hvd.Compression.none
# Horovod: wrap optimizer with DistributedOptimizer.
optimizer = hvd.DistributedOptimizer(optimizer,
named_parameters=model.named_parameters(),
compression=compression,
op=hvd.Adasum if args.use_adasum else hvd.Average,
gradient_predivide_factor=args.gradient_predivide_factor)
for epoch in range(1, args.epochs + 1):
train(epoch)
test()
# [HPCNS] Remove the copied dataset
shutil.rmtree(dataset_root_for_rank)
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment