Skip to content

Commit

Permalink
Merge pull request #218 from fission/typedvalues-v2
Browse files Browse the repository at this point in the history
Formalize typedvalues implementation
  • Loading branch information
erwinvaneyk authored Oct 3, 2018
2 parents 484871b + 8577a2a commit 90cf2ae
Show file tree
Hide file tree
Showing 110 changed files with 3,690 additions and 2,643 deletions.
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ sudo: required
dist: trusty
language: go
go:
- 1.9
- "1.10"

python:
- 3.6
Expand All @@ -22,8 +22,7 @@ env:
- PATH=/tmp/fission-workflow-ci/bin:${PATH}
- BIN_DIR=/tmp/fission-workflow-ci/bin
- FISSION_VERSION=0.10.0
- HELM_VERSION=2.8.2
- KUBECTL_VERSION=1.9.6
- HELM_VERSION=2.11.0

services:
- docker
Expand All @@ -45,6 +44,7 @@ before_script:
# Static code analysis
- hack/verify-gofmt.sh
- hack/verify-govet.sh
- hack/verify-misc.sh
- helm lint charts/fission-workflows
# Build
- glide install -v
Expand Down
2 changes: 1 addition & 1 deletion build/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# To run (from repo root): docker build -t fission -f ./build/Dockerfile .
ARG GOLANG_VERSION=1.10.0
ARG GOLANG_VERSION=1.11.0
FROM golang:$GOLANG_VERSION AS builder
ARG NOBUILD

Expand Down
4 changes: 2 additions & 2 deletions build/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ echo "------------------------------"
# Build client
CGO_ENABLED=0 GOOS=${goos} GOARCH=${goarch} go build \
-gcflags=-trimpath=${GOPATH} -asmflags=-trimpath=${GOPATH}\
-ldflags '-X "${versionPath}.BuildDate=${date}"'\
-ldflags '-X "${versionPath}.buildDate=${date}"'\
-o ${output_cli}\
github.com/fission/fission-workflows/cmd/fission-workflows/
echo "$(pwd)/${output_cli}"

# Build bundle
CGO_ENABLED=0 GOOS=${goos} GOARCH=${goarch} go build\
-gcflags=-trimpath=${GOPATH} -asmflags=-trimpath=${GOPATH}\
-ldflags '-X "${versionPath}.BuildDate=${date}"'\
-ldflags '-X "${versionPath}.buildDate=${date}"'\
-o ${output_bundle}\
github.com/fission/fission-workflows/cmd/fission-workflows-bundle/
echo "$(pwd)/${output_bundle}"
2 changes: 2 additions & 0 deletions build/docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ NOBUILD=${3:-false}
bundleImage=${IMAGE_REPO}/fission-workflows-bundle
pushd ${BUILD_ROOT}/..
if $NOBUILD ; then
echo "Using pre-build binaries..."
if [ ! -f ./fission-workflows-bundle ]; then
echo "Executable './fission-workflows-bundle' not found!"
exit 1;
Expand All @@ -24,6 +25,7 @@ if $NOBUILD ; then
exit 1;
fi
fi

echo "Building bundle..."
docker build --tag="${bundleImage}:${IMAGE_TAG}" -f ${BUILD_ROOT}/Dockerfile \
--no-cache \
Expand Down
5 changes: 5 additions & 0 deletions charts/fission-workflows/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ spec:
"--debug",
{{- end }}
]
imagePullPolicy: {{ .Values.pullPolicy }}
env:
- name: ES_NATS_URL
value: "nats://{{ .Values.nats.authToken }}@{{ .Values.nats.location }}.{{ .Values.fnenv.fission.ns }}:{{ .Values.nats.port }}"
Expand All @@ -123,4 +124,8 @@ spec:
builder:
image: "{{ .Values.buildEnvImage }}:{{.Values.tag}}"
command: "defaultBuild"
container:
imagePullPolicy: {{ .Values.pullPolicy }}


allowedFunctionsPerContainer: infinite
42 changes: 16 additions & 26 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/api/store"
"github.com/fission/fission-workflows/pkg/apiserver"
fissionproxy "github.com/fission/fission-workflows/pkg/apiserver/fission"
"github.com/fission/fission-workflows/pkg/controller"
"github.com/fission/fission-workflows/pkg/controller/expr"
wfictr "github.com/fission/fission-workflows/pkg/controller/invocation"
Expand Down Expand Up @@ -187,8 +188,8 @@ func Run(ctx context.Context, opts *Options) error {
}

// Caches
wfiCache := getInvocationStore(app, esPub, eventStore)
wfCache := getWorkflowStore(app, esPub, eventStore)
invocationStore := getInvocationStore(app, esPub, eventStore)
workflowStore := getWorkflowStore(app, esPub, eventStore)

//
// Function Runtimes
Expand All @@ -199,7 +200,7 @@ func Run(ctx context.Context, opts *Options) error {

if opts.InternalRuntime || opts.Fission != nil {
log.Infof("Using Task Runtime: Workflow")
reflectiveRuntime := workflows.NewRuntime(invocationAPI, wfiCache())
reflectiveRuntime := workflows.NewRuntime(invocationAPI, invocationStore)
runtimes[workflows.Name] = reflectiveRuntime
} else {
log.Info("No function runtimes specified.")
Expand Down Expand Up @@ -228,12 +229,12 @@ func Run(ctx context.Context, opts *Options) error {
var ctrls []controller.Controller
if opts.WorkflowController {
log.Info("Using controller: workflow")
ctrls = append(ctrls, setupWorkflowController(wfCache(), es, resolvers))
ctrls = append(ctrls, setupWorkflowController(workflowStore, es, resolvers))
}

if opts.InvocationController {
log.Info("Using controller: invocation")
ctrls = append(ctrls, setupInvocationController(wfiCache(), wfCache(), es, runtimes, resolvers))
ctrls = append(ctrls, setupInvocationController(invocationStore, workflowStore, es, runtimes, resolvers))
}

ctrl := controller.NewMetaController(ctrls...)
Expand All @@ -255,7 +256,7 @@ func Run(ctx context.Context, opts *Options) error {
//
if opts.Fission != nil {
proxyMux := http.NewServeMux()
runFissionEnvironmentProxy(proxyMux, es, wfiCache(), wfCache(), resolvers)
runFissionEnvironmentProxy(proxyMux, es, invocationStore, workflowStore, resolvers)
fissionProxySrv := &http.Server{Addr: fissionProxyAddress}
fissionProxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux)

Expand Down Expand Up @@ -286,11 +287,11 @@ func Run(ctx context.Context, opts *Options) error {
}

if opts.WorkflowAPI {
serveWorkflowAPI(grpcServer, es, resolvers, wfCache())
serveWorkflowAPI(grpcServer, es, resolvers, workflowStore)
}

if opts.InvocationAPI {
serveInvocationAPI(grpcServer, es, wfiCache())
serveInvocationAPI(grpcServer, es, invocationStore)
}

if opts.AdminAPI || opts.WorkflowAPI || opts.InvocationAPI {
Expand Down Expand Up @@ -363,25 +364,14 @@ func Run(ctx context.Context, opts *Options) error {
return nil
}

func getWorkflowStore(app *App, eventPub pubsub.Publisher, backend fes.Backend) func() *store.Workflows {
var workflows *store.Workflows
return func() *store.Workflows {
if workflows == nil {
workflows = store.NewWorkflowsStore(setupWorkflowCache(app, eventPub, backend))
}
return workflows
}
func getWorkflowStore(app *App, eventPub pubsub.Publisher, backend fes.Backend) *store.Workflows {
c := setupWorkflowCache(app, eventPub, backend)
return store.NewWorkflowsStore(c)
}

func getInvocationStore(app *App, eventPub pubsub.Publisher, backend fes.Backend) func() *store.Invocations {
var invocations *store.Invocations
return func() *store.Invocations {
if invocations == nil {
invocations = store.NewInvocationStore(setupWorkflowInvocationCache(app, eventPub, backend))
}
return invocations

}
func getInvocationStore(app *App, eventPub pubsub.Publisher, backend fes.Backend) *store.Invocations {
c := setupWorkflowInvocationCache(app, eventPub, backend)
return store.NewInvocationStore(c)
}

func setupInternalFunctionRuntime() *native.FunctionEnv {
Expand Down Expand Up @@ -523,7 +513,7 @@ func runFissionEnvironmentProxy(proxyMux *http.ServeMux, es fes.Backend, invocat
wfServer := apiserver.NewWorkflow(workflowAPI, workflows)
wfiAPI := api.NewInvocationAPI(es)
wfiServer := apiserver.NewInvocation(wfiAPI, invocations)
fissionProxyServer := fission.NewFissionProxyServer(wfiServer, wfServer)
fissionProxyServer := fissionproxy.NewEnvironmentProxyServer(wfiServer, wfServer)
fissionProxyServer.RegisterServer(proxyMux)
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/fission-workflows/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/fission/fission-workflows/pkg/apiserver/httpclient"
"github.com/fission/fission-workflows/pkg/parse/yaml"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/golang/protobuf/ptypes"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -104,7 +105,7 @@ var cmdInvocation = cli.Command{
wfID := ctx.Args().Get(0)
spec := &types.WorkflowInvocationSpec{
WorkflowId: wfID,
Inputs: map[string]*types.TypedValue{},
Inputs: map[string]*typedvalues.TypedValue{},
}
if ctx.Bool("sync") {
resp, err := client.Invocation.InvokeSync(ctx, spec)
Expand Down
2 changes: 1 addition & 1 deletion compiling.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ environment meets all prerequisite requirements, and checkout the repo from gith
# Install dependencies
glide install -v

# Build the artifacts: wfcli, fission-workflows-bundle
# Build the artifacts: client (fission-workflows) and server (fission-workflows-bundle)
build/build-linux.sh

# Build the docker images (the NOBUILD parameter indicates that Docker should use the artifacts (wfci,
Expand Down
31 changes: 14 additions & 17 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ import:
- go/otgrpc
- package: github.com/hashicorp/golang-lru
version: v0.5.0
testImport:
- package: github.com/stretchr/testify
version: 1.1.4
subpackages:
- assert
version: v1.2.2
- package: github.com/imdario/mergo
version: v0.3.6
testImport:
- package: gopkg.in/ory-am/dockertest.v3
version: v3.3.1
- package: github.com/stretchr/testify
version: v1.2.2
24 changes: 24 additions & 0 deletions hack/verify-misc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env bash

# Miscellaneous static code checks

# check <msg> <cmd...> - runs the command and adds a header with the status (OK or FAIL) to the output
check() {
msg=$1
shift
cmd=$@
printf "[check] ${msg}..."
output=""
if output=$(bash -c "${cmd}") ; then
printf "OK\n"
else
printf "FAIL\n"
if [ ! -z "${output}" ] ; then
echo ${output}
fi
return 1
fi
}

# Check if we don't accidentally use the gogo protobuf implementation, instead of the golang protobuf implementation.
check "no use of gogo-protobuf" ! grep -R 'github.com/gogo/protobuf' pkg/ cmd/
5 changes: 3 additions & 2 deletions pkg/api/aggregates/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ func (wf *Workflow) ApplyEvent(event *fes.Event) error {

switch m := eventData.(type) {
case *events.WorkflowCreated:
// Setup object
spec := m.GetSpec()
wf.BaseEntity = fes.NewBaseEntity(wf, *event.Aggregate)
wf.Workflow = &types.Workflow{
Metadata: &types.ObjectMetadata{
Id: wf.Aggregate().Id,
Name: spec.GetName(),
CreatedAt: event.GetTimestamp(),
},
Spec: m.GetSpec(),
Spec: spec,
Status: &types.WorkflowStatus{
Status: types.WorkflowStatus_PENDING,
},
Expand Down
13 changes: 7 additions & 6 deletions pkg/api/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/fission/fission-workflows/pkg/types/typedvalues/controlflow"
"github.com/fission/fission-workflows/pkg/types/validate"
"github.com/golang/protobuf/proto"
)
Expand All @@ -23,15 +24,15 @@ func NewDynamicApi(wfAPI *Workflow, wfiAPI *Invocation) *Dynamic {

// AddDynamicFlow inserts the flow as a 'dynamic task' into the workflow invocation with id invocationID as the child
// of the parent task.
func (ap *Dynamic) AddDynamicFlow(invocationID string, parentTaskID string, flow typedvalues.Flow) error {
func (ap *Dynamic) AddDynamicFlow(invocationID string, parentTaskID string, flow controlflow.Flow) error {
if err := validate.Flow(flow); err != nil {
return err
}
switch flow.Type() {
case typedvalues.Workflow:
return ap.addDynamicWorkflow(invocationID, parentTaskID, flow.Workflow(), &types.TaskSpec{})
case typedvalues.Task:
return ap.addDynamicTask(invocationID, parentTaskID, flow.Task())
case controlflow.FlowTypeWorkflow:
return ap.addDynamicWorkflow(invocationID, parentTaskID, flow.GetWorkflow(), &types.TaskSpec{})
case controlflow.FlowTypeTask:
return ap.addDynamicTask(invocationID, parentTaskID, flow.GetTask())
default:
panic("validated flow was still empty")
}
Expand Down Expand Up @@ -72,7 +73,7 @@ func (ap *Dynamic) addDynamicWorkflow(invocationID string, parentTaskID string,
// Generate Proxy Task
proxyTaskSpec := proto.Clone(stubTask).(*types.TaskSpec)
proxyTaskSpec.FunctionRef = wfRef.Format()
proxyTaskSpec.Input(types.InputParent, typedvalues.ParseString(invocationID))
proxyTaskSpec.Input(types.InputParent, typedvalues.MustWrap(invocationID))
proxyTaskID := parentTaskID + "_child"
proxyTask := types.NewTask(proxyTaskID, proxyTaskSpec.FunctionRef)
proxyTask.Spec = proxyTaskSpec
Expand Down
Loading

0 comments on commit 90cf2ae

Please sign in to comment.