Pytorch plugin usage example #11
Changes from 3 commits
Dockerfile
@@ -0,0 +1,18 @@
FROM pytorch/pytorch:1.5-cuda10.1-cudnn7-runtime

RUN pip install awscli

RUN pip install tensorboardX==2.0.0 flytekit[pytorch]==0.8.1

WORKDIR /app

COPY ./workflows ./workflows
COPY ./flyte.config ./

ARG IMAGE_TAG
ENV FLYTE_INTERNAL_IMAGE "$IMAGE_TAG"
ENV LC_ALL C.UTF-8
ENV LANG C.UTF-8
ENV PYTHONPATH /app

ENTRYPOINT []
Makefile
@@ -0,0 +1,2 @@
docker_build:
	IMAGE_NAME=flytesnacks-pytorch scripts/docker_build.sh
README.rst
@@ -0,0 +1,59 @@
PyTorch plugin usage example
============================

This demo is built on top of this `example`_.

#############
Prerequisites
#############

Before running this, make sure that:

- the pytorch plugin is enabled in flytepropeller's config
- the `Kubeflow pytorch operator`_ is installed in your k8s cluster (you can use `base`_ and configure it in your deploy)
- [if using GPU] the `GPU device plugin`_ is deployed as well

#####
Usage
#####

Build the container and push it to the registry:

.. code-block:: bash

   cd pytorch
   make docker_build

Create a flyte project:

.. code-block:: bash

   curl -v -X POST http://127.0.0.1:30081/api/v1/projects -d '{"project": {"id": "pytorch-mnist", "name": "pytorch-mnist"} }'

Register the workflow:

.. code-block:: bash

   docker run --network host flytesnacks-pytorch:<TAG> pyflyte -p pytorch-mnist -d development -c flyte.config register workflows

Navigate to https://127.0.0.1:30081/console/projects/pytorch-mnist/workflows?domain=development and launch the workflow.

#####
Notes
#####

To disable GPU usage for the MNIST task, comment out ``per_replica_gpu_limit="1"`` in the ``pytorch_task`` decorator:

.. code-block:: python

   @pytorch_task(
       workers_count=2,
       per_replica_cpu_request="500m",
       per_replica_memory_request="4Gi",
       per_replica_memory_limit="8Gi",
       #per_replica_gpu_limit="1",
   )

.. _`example`: https://github.com/kubeflow/pytorch-operator/blob/b7fef224fef1ef0117f6e74961b557270fcf4b04/examples/mnist/mnist.py
.. _`Kubeflow pytorch operator`: https://github.com/kubeflow/pytorch-operator
.. _`base`: https://github.com/lyft/flyte/blob/master/kustomize/base/operators/kfoperators/pytorch/kustomization.yaml
.. _`GPU device plugin`: https://kubernetes.io/docs/tasks/manage-gpus/scheduling-gpus/#deploying-nvidia-gpu-device-plugin
flyte.config
@@ -0,0 +1,16 @@
[sdk]
workflow_packages=workflows

[platform]
url=127.0.0.1:30081
insecure=True

[auth]
assumable_iam_role=arn:aws:iam::123:role/test-role

[aws]
s3_shard_formatter=s3://my-s3-bucket/{}
s3_shard_string_length=2
endpoint=http://minio:9000
access_key_id=minio
secret_access_key=miniostorage
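Not part of the PR: a quick sketch of reading this INI-style config with Python's standard library, handy for sanity-checking values before registering workflows. The section and key names mirror the config above; the embedded string stands in for the file on disk.

```python
# Sketch (assumed usage, not from the PR): inspect flyte.config values
# with configparser. Here the config is inlined instead of read from disk.
import configparser

CONFIG_TEXT = """
[platform]
url=127.0.0.1:30081
insecure=True
"""

parser = configparser.ConfigParser()
parser.read_string(CONFIG_TEXT)

# The [platform] section tells the SDK where flyteadmin is reachable.
print(parser.get("platform", "url"))              # 127.0.0.1:30081
print(parser.getboolean("platform", "insecure"))  # True
```

Replacing `read_string` with `parser.read("flyte.config")` would load the real file.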
scripts/docker_build.sh
@@ -0,0 +1,60 @@
#!/usr/bin/env bash

# WARNING: THIS FILE IS MANAGED IN THE 'BOILERPLATE' REPO AND COPIED TO OTHER REPOSITORIES.
# ONLY EDIT THIS FILE FROM WITHIN THE 'LYFT/BOILERPLATE' REPOSITORY:
#
# TO OPT OUT OF UPDATES, SEE https://github.com/lyft/boilerplate/blob/master/Readme.rst

set -e

echo ""
echo "------------------------------------"
echo "           DOCKER BUILD"
echo "------------------------------------"
echo ""

if [ -n "$REGISTRY" ]; then
  # Do not push if there are unstaged git changes
  CHANGED=$(git status --porcelain)
  if [ -n "$CHANGED" ]; then
    echo "Please commit git changes before pushing to a registry"
    exit 1
  fi
fi

GIT_SHA=$(git rev-parse HEAD)

IMAGE_TAG_WITH_SHA="${IMAGE_NAME}:${GIT_SHA}"

RELEASE_SEMVER=$(git describe --tags --exact-match "$GIT_SHA" 2>/dev/null) || true
if [ -n "$RELEASE_SEMVER" ]; then
  IMAGE_TAG_WITH_SEMVER="${IMAGE_NAME}:${RELEASE_SEMVER}${IMAGE_TAG_SUFFIX}"
fi

# build the image
docker build -t "$IMAGE_TAG_WITH_SHA" --build-arg IMAGE_TAG="${IMAGE_TAG_WITH_SHA}" .
echo "${IMAGE_TAG_WITH_SHA} built locally."

# if REGISTRY is specified, push the images to the remote registry
if [ -n "$REGISTRY" ]; then

  if [ -n "${DOCKER_REGISTRY_PASSWORD}" ]; then
    docker login --username="$DOCKER_REGISTRY_USERNAME" --password="$DOCKER_REGISTRY_PASSWORD"
  fi

  docker tag "$IMAGE_TAG_WITH_SHA" "${REGISTRY}/${IMAGE_TAG_WITH_SHA}"

  docker push "${REGISTRY}/${IMAGE_TAG_WITH_SHA}"
  echo "${REGISTRY}/${IMAGE_TAG_WITH_SHA} pushed to remote."

  # If the current commit has a semver tag, also push the images with the semver tag
  if [ -n "$RELEASE_SEMVER" ]; then

    docker tag "$IMAGE_TAG_WITH_SHA" "${REGISTRY}/${IMAGE_TAG_WITH_SEMVER}"

    docker push "${REGISTRY}/${IMAGE_TAG_WITH_SEMVER}"
    echo "${REGISTRY}/${IMAGE_TAG_WITH_SEMVER} pushed to remote."

  fi
fi
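The script's tagging scheme can be summarized as: every build is tagged with the git SHA, and a second semver tag is added only when the commit itself carries a release tag. A minimal Python sketch of that logic (function and variable names here are illustrative, not from the script):

```python
# Sketch of the tag-selection logic in scripts/docker_build.sh:
# SHA tag always, semver tag only for tagged release commits.
def image_tags(image_name, git_sha, release_semver=None, tag_suffix=""):
    tags = ["{}:{}".format(image_name, git_sha)]  # IMAGE_TAG_WITH_SHA
    if release_semver:
        # IMAGE_TAG_WITH_SEMVER, including the optional IMAGE_TAG_SUFFIX
        tags.append("{}:{}{}".format(image_name, release_semver, tag_suffix))
    return tags

print(image_tags("flytesnacks-pytorch", "abc123"))
# ['flytesnacks-pytorch:abc123']
print(image_tags("flytesnacks-pytorch", "abc123", release_semver="v0.1.0"))
# ['flytesnacks-pytorch:abc123', 'flytesnacks-pytorch:v0.1.0']
```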
MNIST workflow (under ./workflows)
@@ -0,0 +1,181 @@
#https://github.com/kubeflow/pytorch-operator/blob/b7fef224fef1ef0117f6e74961b557270fcf4b04/examples/mnist/mnist.py
from __future__ import absolute_import
from __future__ import print_function

import argparse
import os

from tensorboardX import SummaryWriter
from torchvision import datasets, transforms
import torch
import torch.distributed as dist
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

from flytekit.sdk.tasks import inputs, outputs
from flytekit.sdk.types import Types
from flytekit.sdk.workflow import workflow_class, Input, Output

from flytekit.sdk.tasks import pytorch_task

WORLD_SIZE = int(os.environ.get('WORLD_SIZE', 1))


class Net(nn.Module):
    def __init__(self):
        super(Net, self).__init__()
        self.conv1 = nn.Conv2d(1, 20, 5, 1)
        self.conv2 = nn.Conv2d(20, 50, 5, 1)
        self.fc1 = nn.Linear(4*4*50, 500)
        self.fc2 = nn.Linear(500, 10)

    def forward(self, x):
        x = F.relu(self.conv1(x))
        x = F.max_pool2d(x, 2, 2)
        x = F.relu(self.conv2(x))
        x = F.max_pool2d(x, 2, 2)
        x = x.view(-1, 4*4*50)
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.log_softmax(x, dim=1)
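An aside on the `4*4*50` in `fc1` and the `view` call: it follows from pure shape arithmetic on a 28x28 MNIST input through the two conv/pool stages above. A small sketch (not part of the PR) that verifies it without torch:

```python
# Shape arithmetic for Net on a 28x28 MNIST image, using the standard
# conv/pool output-size formula: (size + 2*padding - kernel)//stride + 1.
def conv2d_out(size, kernel, stride=1, padding=0):
    return (size + 2 * padding - kernel) // stride + 1

size = 28                      # MNIST input is 28x28
size = conv2d_out(size, 5)     # conv1: 5x5 kernel, stride 1 -> 24
size = conv2d_out(size, 2, 2)  # max_pool2d(2, 2)            -> 12
size = conv2d_out(size, 5)     # conv2: 5x5 kernel, stride 1 -> 8
size = conv2d_out(size, 2, 2)  # max_pool2d(2, 2)            -> 4

# conv2 emits 50 channels, so the flattened feature count is:
print(size * size * 50)  # 800 == 4*4*50, the in_features of fc1
```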
def train(model, device, train_loader, optimizer, epoch, writer, log_interval):
    model.train()
    for batch_idx, (data, target) in enumerate(train_loader):
        data, target = data.to(device), target.to(device)
        optimizer.zero_grad()
        output = model(data)
        loss = F.nll_loss(output, target)
        loss.backward()
        optimizer.step()
        if batch_idx % log_interval == 0:
            print('Train Epoch: {} [{}/{} ({:.0f}%)]\tloss={:.4f}'.format(
                epoch, batch_idx * len(data), len(train_loader.dataset),
                100. * batch_idx / len(train_loader), loss.item()))
            niter = epoch * len(train_loader) + batch_idx
            writer.add_scalar('loss', loss.item(), niter)


def test(model, device, test_loader, writer, epoch):
    model.eval()
    test_loss = 0
    correct = 0
    with torch.no_grad():
        for data, target in test_loader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            test_loss += F.nll_loss(output, target, reduction='sum').item()  # sum up batch loss
            pred = output.max(1, keepdim=True)[1]  # get the index of the max log-probability
            correct += pred.eq(target.view_as(pred)).sum().item()

    test_loss /= len(test_loader.dataset)
    print('\naccuracy={:.4f}\n'.format(float(correct) / len(test_loader.dataset)))
    accuracy = float(correct) / len(test_loader.dataset)
    writer.add_scalar('accuracy', accuracy, epoch)
    return (epoch, accuracy)


def epoch_step(model, device, train_loader, test_loader, optimizer, epoch, writer, log_interval):
    train(model, device, train_loader, optimizer, epoch, writer, log_interval)
    return test(model, device, test_loader, writer, epoch)


def should_distribute():
    return dist.is_available() and WORLD_SIZE > 1


def is_distributed():
    return dist.is_available() and dist.is_initialized()
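The two helpers above gate `dist.init_process_group` on the `WORLD_SIZE` environment variable, which the Kubeflow pytorch operator injects into each replica (along with the rendezvous variables). A torch-free sketch of that decision (the function name and the assumption about injected variables are illustrative, not from the PR):

```python
# Sketch: distribution is only worthwhile when more than one replica
# exists, i.e. when the operator sets WORLD_SIZE > 1.
def should_distribute_env(environ):
    return int(environ.get('WORLD_SIZE', 1)) > 1

print(should_distribute_env({}))                   # False: single process
print(should_distribute_env({'WORLD_SIZE': '2'}))  # True: two replicas
```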
@inputs(
    no_cuda=Types.Boolean,
    batch_size=Types.Integer,
    test_batch_size=Types.Integer,
    epochs=Types.Integer,
    learning_rate=Types.Float,
    sgd_momentum=Types.Float,
    seed=Types.Integer,
    log_interval=Types.Integer,
    save_model=Types.Boolean,
    dir=Types.String)
@outputs(out=Types.String)
[Review thread on the @outputs declaration]

Reviewer: I think this needs some improvements, like changing the output to be the model. @wild-endeavor can you help?

Reviewer: Sorry, I don't understand. Currently it seems that the output …

Author: First of all, MNIST in the pytorch world is sort of 'hello world' (https://github.com/pytorch/examples/tree/master/mnist). One can store the model on their own by supplying … I considered that accuracy is good enough to demonstrate that the job was done.

Reviewer: @igorvalko it is possible to output a model as a blob object in Flyte. Here is an example: https://github.com/lyft/flytesnacks/blob/master/python/multi_step_linear/diabetes_xgboost.py#L104 The benefit is you can then simply point your notebook to flyteadmin and load the model. And let's rename out to accuracy?

Author: done
@pytorch_task(
    workers_count=2,
    per_replica_cpu_request="500m",
    per_replica_memory_request="4Gi",
    per_replica_memory_limit="8Gi",
    per_replica_gpu_limit="1",
)
def mnist_pytorch_job(workflow_params, no_cuda, batch_size, test_batch_size, epochs, learning_rate, sgd_momentum, seed, log_interval, save_model, dir, out):
    backend_type = dist.Backend.GLOO

[Review thread on the distributed boilerplate]

Reviewer: Eventually I want this boilerplate code to be in the pytorch wrapper. Can you add a "TODO: simplify by abstracting the boilerplate"?

Author: Should we leave TODOs in example/boilerplate code? Maybe it's better to create an issue to remember this idea?

Reviewer: An issue works too, either way.

Reviewer: An issue is great, and the TODO can point to the issue :)
    writer = SummaryWriter(dir)

    torch.manual_seed(seed)

    use_cuda = not no_cuda
    device = torch.device('cuda' if use_cuda and torch.cuda.is_available() else 'cpu')

    print('Using device: {}, world size: {}'.format(device, WORLD_SIZE))

    if should_distribute():
        print('Using distributed PyTorch with {} backend'.format(backend_type))
        dist.init_process_group(backend=backend_type)

    kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
    train_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data', train=True, download=True,
                       transform=transforms.Compose([
                           transforms.ToTensor(),
                           transforms.Normalize((0.1307,), (0.3081,))
                       ])),
        batch_size=batch_size, shuffle=True, **kwargs)
    test_loader = torch.utils.data.DataLoader(
        datasets.MNIST('../data', train=False, transform=transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize((0.1307,), (0.3081,))
        ])),
        batch_size=test_batch_size, shuffle=False, **kwargs)

    model = Net().to(device)

    if is_distributed():
        Distributor = nn.parallel.DistributedDataParallel if use_cuda and torch.cuda.is_available() \
            else nn.parallel.DistributedDataParallelCPU
        model = Distributor(model)

    optimizer = optim.SGD(model.parameters(), lr=learning_rate, momentum=sgd_momentum)

    accuracies = [epoch_step(model, device, train_loader, test_loader, optimizer, epoch, writer, log_interval) for epoch in range(1, epochs + 1)]

    if save_model:
        torch.save(model.state_dict(), "mnist_cnn.pt")

    out.set(str(accuracies))


@workflow_class
class MNISTTest(object):
    no_cuda = Input(Types.Boolean, default=False, help="disables CUDA training")
    batch_size = Input(Types.Integer, default=64, help='input batch size for training (default: 64)')
    test_batch_size = Input(Types.Integer, default=1000, help='input batch size for testing (default: 1000)')
    epochs = Input(Types.Integer, default=1, help='number of epochs to train (default: 10)')
    learning_rate = Input(Types.Float, default=0.01, help='learning rate (default: 0.01)')
    sgd_momentum = Input(Types.Float, default=0.5, help='SGD momentum (default: 0.5)')
    seed = Input(Types.Integer, default=1, help='random seed (default: 1)')
    log_interval = Input(Types.Integer, default=10, help='how many batches to wait before logging training status')
    save_model = Input(Types.Boolean, default=False, help='For Saving the current Model')
    dir = Input(Types.String, default='logs', help='directory where summary logs are stored')

    mnist_result = mnist_pytorch_job(
        no_cuda=no_cuda,
        batch_size=batch_size,
        test_batch_size=test_batch_size,
        epochs=epochs,
        learning_rate=learning_rate,
        sgd_momentum=sgd_momentum,
        seed=seed,
        log_interval=log_interval,
        save_model=save_model,
        dir=dir
    )

    accuracies = Output(mnist_result.outputs.out, sdk_type=Types.String)
[Review thread on the accuracies output]

Reviewer: Shouldn't this be a list of floats?

Author: https://github.com/lyft/flytesnacks/pull/11/files#diff-daa70d7f0f13b99d1e904cc0cb7fd7a7R152 I'm stringifying it here to be sure that the UI will just have to print the string I've provided.

Reviewer: We don't need to do that; since it is just an array of floats, the UI can show it if we mark it as an array of floats. Do you want to change that?
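Since the task sets `out` to `str(accuracies)`, where `accuracies` is a list of `(epoch, accuracy)` tuples, a caller that gets the stringified output back can recover the structure with the standard library. A sketch (not part of the PR; the sample value is invented, not a real run's output):

```python
# Sketch: recover the (epoch, accuracy) list from the stringified output.
import ast

raw = "[(1, 0.9146), (2, 0.9458)]"  # example value of the 'out' string
accuracies = ast.literal_eval(raw)  # safely parses Python literals only

for epoch, acc in accuracies:
    print("epoch {}: accuracy={:.4f}".format(epoch, acc))

best = max(accuracies, key=lambda pair: pair[1])
print(best)  # (2, 0.9458)
```

Declaring the output as a list of floats, as the reviewer suggests, would make this round-trip unnecessary.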
[Review thread on TensorBoard]

Reviewer: How does TensorBoard work in this case?

Author: https://github.com/kubeflow/pytorch-operator/blob/master/examples/mnist/mnist.py#L6

Reviewer: Hmm, we will need to run tensorboard somewhere, I guess?