diff --git a/pytorch/Dockerfile b/pytorch/Dockerfile
new file mode 100644
index 000000000..db3dd7fa0
--- /dev/null
+++ b/pytorch/Dockerfile
@@ -0,0 +1,18 @@
+FROM pytorch/pytorch:1.5-cuda10.1-cudnn7-runtime
+
+RUN pip install awscli
+
+RUN pip install tensorboardX==2.0.0 flytekit[pytorch]==0.8.1
+
+WORKDIR /app
+
+COPY ./workflows ./workflows
+COPY ./flyte.config ./
+
+ARG IMAGE_TAG
+ENV FLYTE_INTERNAL_IMAGE "$IMAGE_TAG"
+ENV LC_ALL C.UTF-8
+ENV LANG C.UTF-8
+ENV PYTHONPATH /app
+
+ENTRYPOINT []
\ No newline at end of file
diff --git a/pytorch/Makefile b/pytorch/Makefile
new file mode 100644
index 000000000..419e08cca
--- /dev/null
+++ b/pytorch/Makefile
@@ -0,0 +1,2 @@
+docker_build:
+	IMAGE_NAME=flytesnacks-pytorch scripts/docker_build.sh
\ No newline at end of file
diff --git a/pytorch/README.rst b/pytorch/README.rst
new file mode 100644
index 000000000..98c68941c
--- /dev/null
+++ b/pytorch/README.rst
@@ -0,0 +1,59 @@
+PyTorch plugin usage example
+============================
+
+This demo is built on top of this `example`_.
+
+#############
+Prerequisites
+#############
+
+Before running this, make sure that:
+ - the pytorch plugin is enabled in flytepropeller's config (a minimal sketch is shown below)
+ - the `Kubeflow pytorch operator`_ is installed in your k8s cluster (you can use `base`_ and configure it in your deployment)
+ - [if using GPU] the `GPU device plugin`_ is deployed as well
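+
+The exact flytepropeller configuration layout depends on your deployment; as a rough, illustrative sketch, the plugin is typically enabled under ``tasks.task-plugins.enabled-plugins``:
+
+.. code-block::
+
+    tasks:
+      task-plugins:
+        enabled-plugins:
+          - container
+          - sidecar
+          - pytorch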
+
+#####
+Usage
+#####
+
+Build the container and push it to the registry
+
+.. code-block::
+
+    cd pytorch
+    make docker_build
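+
+On its own, ``make docker_build`` only builds the image locally; the build script pushes only when ``REGISTRY`` is set and the git tree is clean. Something along these lines should work, with ``docker.io/myorg`` standing in for your registry:
+
+.. code-block::
+
+    REGISTRY=docker.io/myorg make docker_build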
+
+Create the Flyte project
+
+.. code-block::
+
+    curl -v -X POST http://127.0.0.1:30081/api/v1/projects -d '{"project": {"id": "pytorch-mnist", "name": "pytorch-mnist"} }'
+
+Register the workflow (the image tag is the git SHA used by the build script)
+
+.. code-block::
+
+    docker run --network host flytesnacks-pytorch:<git-sha> pyflyte -p pytorch-mnist -d development -c flyte.config register workflows
+
+Navigate to http://127.0.0.1:30081/console/projects/pytorch-mnist/workflows?domain=development and launch the workflow.
+
+#####
+Notes
+#####
+
+To disable GPU usage for the MNIST task, comment out ``per_replica_gpu_limit="1"`` in the ``pytorch_task`` decorator
+
+.. code-block::
+
+    @pytorch_task(
+        workers_count=2,
+        per_replica_cpu_request="500m",
+        per_replica_memory_request="4Gi",
+        per_replica_memory_limit="8Gi",
+        #per_replica_gpu_limit="1",
+    )
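+
+The task saves the trained weights to ``mnist_cnn.pt`` and exposes that file as the ``model_state`` output blob. A rough sketch of loading it back locally (assuming the blob has been downloaded to ``mnist_cnn.pt``; when the model was wrapped in ``DistributedDataParallel`` the state-dict keys carry a ``module.`` prefix):
+
+.. code-block::
+
+    import torch
+    from workflows.mnist import Net
+
+    state = torch.load("mnist_cnn.pt", map_location="cpu")
+    # strip the "module." prefix added by DistributedDataParallel, if present
+    state = {k.replace("module.", "", 1): v for k, v in state.items()}
+
+    model = Net()
+    model.load_state_dict(state)
+    model.eval()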
+
+.. _`example`: https://github.com/kubeflow/pytorch-operator/blob/b7fef224fef1ef0117f6e74961b557270fcf4b04/examples/mnist/mnist.py
+.. _`Kubeflow pytorch operator`: https://github.com/kubeflow/pytorch-operator
+.. _`base`: https://github.com/lyft/flyte/blob/master/kustomize/base/operators/kfoperators/pytorch/kustomization.yaml
+.. _`GPU device plugin`: https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/#deploying-nvidia-gpu-device-plugin
\ No newline at end of file
diff --git a/pytorch/flyte.config b/pytorch/flyte.config
new file mode 100644
index 000000000..214ab3a6f
--- /dev/null
+++ b/pytorch/flyte.config
@@ -0,0 +1,16 @@
+[sdk]
+workflow_packages=workflows
+
+[platform]
+url=127.0.0.1:30081
+insecure=True
+
+[auth]
+assumable_iam_role=arn:aws:iam::123:role/test-role
+
+[aws]
+s3_shard_formatter=s3://my-s3-bucket/{}
+s3_shard_string_length=2
+endpoint=http://minio:9000
+access_key_id=minio
+secret_access_key=miniostorage
\ No newline at end of file
diff --git a/pytorch/scripts/docker_build.sh b/pytorch/scripts/docker_build.sh
new file mode 100755
index 000000000..cea73500e
--- /dev/null
+++ b/pytorch/scripts/docker_build.sh
@@ -0,0 +1,60 @@
+#!/usr/bin/env bash
+
+# WARNING: THIS FILE IS MANAGED IN THE 'BOILERPLATE' REPO AND COPIED TO OTHER REPOSITORIES.
+# ONLY EDIT THIS FILE FROM WITHIN THE 'LYFT/BOILERPLATE' REPOSITORY:
+#
+# TO OPT OUT OF UPDATES, SEE https://github.com/lyft/boilerplate/blob/master/Readme.rst
+
+set -e
+
+echo ""
+echo "------------------------------------"
+echo "           DOCKER BUILD"
+echo "------------------------------------"
+echo ""
+
+if [ -n "$REGISTRY" ]; then
+  # Do not push if there are unstaged git changes
+  CHANGED=$(git status --porcelain)
+  if [ -n "$CHANGED" ]; then
+    echo "Please commit git changes before pushing to a registry"
+    exit 1
+  fi
+fi
+
+
+GIT_SHA=$(git rev-parse HEAD)
+
+IMAGE_TAG_WITH_SHA="${IMAGE_NAME}:${GIT_SHA}"
+
+RELEASE_SEMVER=$(git describe --tags --exact-match "$GIT_SHA" 2>/dev/null) || true
+if [ -n "$RELEASE_SEMVER" ]; then
+  IMAGE_TAG_WITH_SEMVER="${IMAGE_NAME}:${RELEASE_SEMVER}${IMAGE_TAG_SUFFIX}"
+fi
+
+# build the image
+docker build -t "$IMAGE_TAG_WITH_SHA" --build-arg IMAGE_TAG="${IMAGE_TAG_WITH_SHA}" .
+echo "${IMAGE_TAG_WITH_SHA} built locally."
+
+# if REGISTRY is specified, push the images to the remote registry
+if [ -n "$REGISTRY" ]; then
+
+  if [ -n "${DOCKER_REGISTRY_PASSWORD}" ]; then
+    docker login --username="$DOCKER_REGISTRY_USERNAME" --password="$DOCKER_REGISTRY_PASSWORD"
+  fi
+
+  docker tag "$IMAGE_TAG_WITH_SHA" "${REGISTRY}/${IMAGE_TAG_WITH_SHA}"
+
+  docker push "${REGISTRY}/${IMAGE_TAG_WITH_SHA}"
+  echo "${REGISTRY}/${IMAGE_TAG_WITH_SHA} pushed to remote."
+
+  # If the current commit has a semver tag, also push the images with the semver tag
+  if [ -n "$RELEASE_SEMVER" ]; then
+
+    docker tag "$IMAGE_TAG_WITH_SHA" "${REGISTRY}/${IMAGE_TAG_WITH_SEMVER}"
+
+    docker push "${REGISTRY}/${IMAGE_TAG_WITH_SEMVER}"
+    echo "${REGISTRY}/${IMAGE_TAG_WITH_SEMVER} pushed to remote."
+
+  fi
+fi
diff --git a/pytorch/workflows/mnist.py b/pytorch/workflows/mnist.py
new file mode 100644
index 000000000..c2ba6e061
--- /dev/null
+++ b/pytorch/workflows/mnist.py
@@ -0,0 +1,180 @@
+# https://github.com/kubeflow/pytorch-operator/blob/b7fef224fef1ef0117f6e74961b557270fcf4b04/examples/mnist/mnist.py
+from __future__ import absolute_import
+from __future__ import print_function
+
+import argparse
+import os
+
+from tensorboardX import SummaryWriter
+from torchvision import datasets, transforms
+import torch
+import torch.distributed as dist
+import torch.nn as nn
+import torch.nn.functional as F
+import torch.optim as optim
+
+from flytekit.sdk.tasks import inputs, outputs
+from flytekit.sdk.types import Types
+from flytekit.sdk.workflow import workflow_class, Input, Output
+
+from flytekit.sdk.tasks import pytorch_task
+
+WORLD_SIZE = int(os.environ.get('WORLD_SIZE', 1))
+
+
+class Net(nn.Module):
+    def __init__(self):
+        super(Net, self).__init__()
+        self.conv1 = nn.Conv2d(1, 20, 5, 1)
+        self.conv2 = nn.Conv2d(20, 50, 5, 1)
+        self.fc1 = nn.Linear(4*4*50, 500)
+        self.fc2 = nn.Linear(500, 10)
+
+    def forward(self, x):
+        x = F.relu(self.conv1(x))
+        x = F.max_pool2d(x, 2, 2)
+        x = F.relu(self.conv2(x))
+        x = F.max_pool2d(x, 2, 2)
+        x = x.view(-1, 4*4*50)
+        x = F.relu(self.fc1(x))
+        x = self.fc2(x)
+        return F.log_softmax(x, dim=1)
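+
+# Shape walk-through for a 28x28 MNIST input (this is where the 4*4*50 flatten size comes from):
+#   conv1 (5x5, no padding): 1x28x28  -> 20x24x24, then 2x2 max pool -> 20x12x12
+#   conv2 (5x5, no padding): 20x12x12 -> 50x8x8,   then 2x2 max pool -> 50x4x4
+#   flatten: 50*4*4 = 800 features -> fc1 (800 -> 500) -> fc2 (500 -> 10 classes)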
+
+def train(model, device, train_loader, optimizer, epoch, writer, log_interval):
+    model.train()
+    for batch_idx, (data, target) in enumerate(train_loader):
+        data, target = data.to(device), target.to(device)
+        optimizer.zero_grad()
+        output = model(data)
+        loss = F.nll_loss(output, target)
+        loss.backward()
+        optimizer.step()
+        if batch_idx % log_interval == 0:
+            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}'.format(
+                epoch, batch_idx * len(data), len(train_loader.dataset),
+                100. * batch_idx / len(train_loader), loss.item()))
+            niter = epoch * len(train_loader) + batch_idx
+            writer.add_scalar('loss', loss.item(), niter)
+
+def test(model, device, test_loader, writer, epoch):
+    model.eval()
+    test_loss = 0
+    correct = 0
+    with torch.no_grad():
+        for data, target in test_loader:
+            data, target = data.to(device), target.to(device)
+            output = model(data)
+            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
+            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
+            correct += pred.eq(target.view_as(pred)).sum().item()
+
+    test_loss /= len(test_loader.dataset)
+    print('\naccuracy={:.4f}\n'.format(float(correct) / len(test_loader.dataset)))
+    accuracy = float(correct) / len(test_loader.dataset)
+    writer.add_scalar('accuracy', accuracy, epoch)
+    return accuracy
+
+def epoch_step(model, device, train_loader, test_loader, optimizer, epoch, writer, log_interval):
+    train(model, device, train_loader, optimizer, epoch, writer, log_interval)
+    return test(model, device, test_loader, writer, epoch)
+
+def should_distribute():
+    return dist.is_available() and WORLD_SIZE > 1
+
+
+def is_distributed():
+    return dist.is_available() and dist.is_initialized()
+
+@inputs(
+    no_cuda=Types.Boolean,
+    batch_size=Types.Integer,
+    test_batch_size=Types.Integer,
+    epochs=Types.Integer,
+    learning_rate=Types.Float,
+    sgd_momentum=Types.Float,
+    seed=Types.Integer,
+    log_interval=Types.Integer,
+    dir=Types.String)
+@outputs(epoch_accuracies=[Types.Float], model_state=Types.Blob)
+@pytorch_task(
+    workers_count=2,
+    per_replica_cpu_request="500m",
+    per_replica_memory_request="4Gi",
+    per_replica_memory_limit="8Gi",
+    per_replica_gpu_limit="1",
+)
+def mnist_pytorch_job(workflow_params, no_cuda, batch_size, test_batch_size, epochs, learning_rate, sgd_momentum, seed, log_interval, dir, epoch_accuracies, model_state):
+    backend_type = dist.Backend.GLOO
+
+    writer = SummaryWriter(dir)
+
+    torch.manual_seed(seed)
+
+    use_cuda = not no_cuda
+    device = torch.device('cuda' if use_cuda and torch.cuda.is_available() else 'cpu')
+
+    print('Using device: {}, world size: {}'.format(device, WORLD_SIZE))
+
+    if should_distribute():
+        print('Using distributed PyTorch with {} backend'.format(backend_type))
+        dist.init_process_group(backend=backend_type)
+
+    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
+    train_loader = torch.utils.data.DataLoader(
+        datasets.MNIST('../data', train=True, download=True,
+                       transform=transforms.Compose([
+                           transforms.ToTensor(),
+                           transforms.Normalize((0.1307,), (0.3081,))
+                       ])),
+        batch_size=batch_size, shuffle=True, **kwargs)
+    test_loader = torch.utils.data.DataLoader(
+        datasets.MNIST('../data', train=False, transform=transforms.Compose([
+            transforms.ToTensor(),
+            transforms.Normalize((0.1307,), (0.3081,))
+        ])),
+        batch_size=test_batch_size, shuffle=False, **kwargs)
+
+    model = Net().to(device)
+
+    if is_distributed():
+        Distributor = nn.parallel.DistributedDataParallel if use_cuda and torch.cuda.is_available() \
+            else nn.parallel.DistributedDataParallelCPU
+        model = Distributor(model)
+
+    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=sgd_momentum)
+
+    accuracies = [epoch_step(model, device, train_loader, test_loader, optimizer, epoch, writer, log_interval) for epoch in range(1, epochs + 1)]
+
+    model_file = "mnist_cnn.pt"
+    torch.save(model.state_dict(), model_file)
+
+    model_state.set(model_file)
+    epoch_accuracies.set(accuracies)
+
+
+@workflow_class
+class MNISTTest(object):
+    no_cuda = Input(Types.Boolean, default=False, help="disables CUDA training")
+    batch_size = Input(Types.Integer, default=64, help='input batch size for training (default: 64)')
+    test_batch_size = Input(Types.Integer, default=1000, help='input batch size for testing (default: 1000)')
+    epochs = Input(Types.Integer, default=1, help='number of epochs to train (default: 1)')
+    learning_rate = Input(Types.Float, default=0.01, help='learning rate (default: 0.01)')
+    sgd_momentum = Input(Types.Float, default=0.5, help='SGD momentum (default: 0.5)')
+    seed = Input(Types.Integer, default=1, help='random seed (default: 1)')
+    log_interval = Input(Types.Integer, default=10, help='how many batches to wait before logging training status')
+    dir = Input(Types.String, default='logs', help='directory where summary logs are stored')
+
+    mnist_result = mnist_pytorch_job(
+        no_cuda=no_cuda,
+        batch_size=batch_size,
+        test_batch_size=test_batch_size,
+        epochs=epochs,
+        learning_rate=learning_rate,
+        sgd_momentum=sgd_momentum,
+        seed=seed,
+        log_interval=log_interval,
+        dir=dir
+    )
+
+    accuracies = Output(mnist_result.outputs.epoch_accuracies, sdk_type=[Types.Float])
+    model = Output(mnist_result.outputs.model_state, sdk_type=Types.Blob)
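+
+
+if __name__ == "__main__":
+    # Minimal local smoke test of the model and training loop, independent of
+    # Flyte and of the pytorch operator. Illustrative only: it uses random
+    # tensors shaped like MNIST batches instead of the real dataset.
+    import tempfile
+    from torch.utils.data import DataLoader, TensorDataset
+
+    fake_images = torch.randn(64, 1, 28, 28)
+    fake_labels = torch.randint(0, 10, (64,))
+    loader = DataLoader(TensorDataset(fake_images, fake_labels), batch_size=16)
+
+    device = torch.device('cpu')
+    model = Net().to(device)
+    optimizer = optim.SGD(model.parameters(), lr=0.01, momentum=0.5)
+    writer = SummaryWriter(tempfile.mkdtemp())
+    train(model, device, loader, optimizer, epoch=1, writer=writer, log_interval=1)
+    writer.close()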