Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Formalize typedvalues implementation #218

Merged
merged 22 commits into from
Oct 3, 2018
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
6a05eb2
protobuf implementation
erwinvaneyk Sep 23, 2018
4a765c3
Fixed multiple cache instantiations
erwinvaneyk Sep 26, 2018
1430ebd
Added mediatype utils (wraps pkg/mime)
erwinvaneyk Sep 26, 2018
681285d
TypedValues v2: moved to protobuf-backed implementation
erwinvaneyk Sep 26, 2018
086005f
Bump Golang builder version 1.10.0 -> 1.11.0
erwinvaneyk Sep 26, 2018
54e1dca
Fied NPE in MediaType.Copy
erwinvaneyk Sep 27, 2018
768b178
Moved typedvalues/types.go to typedvalues/valuetypes.go
erwinvaneyk Sep 27, 2018
bbc3a58
Ensured http.Headers exist when written to in httpconv
erwinvaneyk Sep 27, 2018
6f6296f
Added flow as a value type
erwinvaneyk Sep 28, 2018
27480d5
Add static code check to verify we are not mixing gogo and golang pro…
erwinvaneyk Sep 28, 2018
343c8df
Moved FlowType definitions to init function
erwinvaneyk Sep 28, 2018
c9e0f3b
Replaced the custom identifier structs with objectmetadata in apiserver
erwinvaneyk Sep 28, 2018
8a87893
Added explicit dependency on mergo
erwinvaneyk Oct 1, 2018
f5bd3b0
Fixed HTTP runtime to use custom httpconv
erwinvaneyk Oct 1, 2018
e842964
Added mapping to deprecated default input in httpconv
erwinvaneyk Oct 1, 2018
4e76547
Removed sleeps from e2e tests
erwinvaneyk Oct 1, 2018
bc9388c
Added a fallback to default field in invocation
erwinvaneyk Oct 1, 2018
7f39e60
Updated while internal function to default to no wait instead of 100ms
erwinvaneyk Oct 1, 2018
a9fc617
Moved envproxy to seperate package in apiserver
erwinvaneyk Oct 1, 2018
31017db
Fixed the expression scope inheritance
erwinvaneyk Oct 1, 2018
289111d
Fixed travis CI setup
erwinvaneyk Oct 2, 2018
8577a2a
Removed left over debug code
erwinvaneyk Oct 3, 2018
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 3 additions & 3 deletions .travis.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ sudo: required
dist: trusty
language: go
go:
- 1.9
- "1.10"

python:
- 3.6
Expand All @@ -22,8 +22,7 @@ env:
- PATH=/tmp/fission-workflow-ci/bin:${PATH}
- BIN_DIR=/tmp/fission-workflow-ci/bin
- FISSION_VERSION=0.10.0
- HELM_VERSION=2.8.2
- KUBECTL_VERSION=1.9.6
- HELM_VERSION=2.11.0

services:
- docker
Expand All @@ -45,6 +44,7 @@ before_script:
# Static code analysis
- hack/verify-gofmt.sh
- hack/verify-govet.sh
- hack/verify-misc.sh
- helm lint charts/fission-workflows
# Build
- glide install -v
Expand Down
2 changes: 1 addition & 1 deletion build/Dockerfile
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
# To run (from repo root): docker build -t fission -f ./build/Dockerfile .
ARG GOLANG_VERSION=1.10.0
ARG GOLANG_VERSION=1.11.0
FROM golang:$GOLANG_VERSION AS builder
ARG NOBUILD

Expand Down
4 changes: 2 additions & 2 deletions build/build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -75,15 +75,15 @@ echo "------------------------------"
# Build client
CGO_ENABLED=0 GOOS=${goos} GOARCH=${goarch} go build \
-gcflags=-trimpath=${GOPATH} -asmflags=-trimpath=${GOPATH}\
-ldflags '-X "${versionPath}.BuildDate=${date}"'\
-ldflags '-X "${versionPath}.buildDate=${date}"'\
-o ${output_cli}\
github.com/fission/fission-workflows/cmd/fission-workflows/
echo "$(pwd)/${output_cli}"

# Build bundle
CGO_ENABLED=0 GOOS=${goos} GOARCH=${goarch} go build\
-gcflags=-trimpath=${GOPATH} -asmflags=-trimpath=${GOPATH}\
-ldflags '-X "${versionPath}.BuildDate=${date}"'\
-ldflags '-X "${versionPath}.buildDate=${date}"'\
-o ${output_bundle}\
github.com/fission/fission-workflows/cmd/fission-workflows-bundle/
echo "$(pwd)/${output_bundle}"
2 changes: 2 additions & 0 deletions build/docker.sh
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ NOBUILD=${3:-false}
bundleImage=${IMAGE_REPO}/fission-workflows-bundle
pushd ${BUILD_ROOT}/..
if $NOBUILD ; then
echo "Using pre-build binaries..."
if [ ! -f ./fission-workflows-bundle ]; then
echo "Executable './fission-workflows-bundle' not found!"
exit 1;
Expand All @@ -24,6 +25,7 @@ if $NOBUILD ; then
exit 1;
fi
fi

echo "Building bundle..."
docker build --tag="${bundleImage}:${IMAGE_TAG}" -f ${BUILD_ROOT}/Dockerfile \
--no-cache \
Expand Down
5 changes: 5 additions & 0 deletions charts/fission-workflows/templates/deployment.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,7 @@ spec:
"--debug",
{{- end }}
]
imagePullPolicy: {{ .Values.pullPolicy }}
env:
- name: ES_NATS_URL
value: "nats://{{ .Values.nats.authToken }}@{{ .Values.nats.location }}.{{ .Values.fnenv.fission.ns }}:{{ .Values.nats.port }}"
Expand All @@ -123,4 +124,8 @@ spec:
builder:
image: "{{ .Values.buildEnvImage }}:{{.Values.tag}}"
command: "defaultBuild"
container:
imagePullPolicy: {{ .Values.pullPolicy }}


allowedFunctionsPerContainer: infinite
42 changes: 16 additions & 26 deletions cmd/fission-workflows-bundle/bundle/bundle.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/fission/fission-workflows/pkg/api/aggregates"
"github.com/fission/fission-workflows/pkg/api/store"
"github.com/fission/fission-workflows/pkg/apiserver"
fissionproxy "github.com/fission/fission-workflows/pkg/apiserver/fission"
"github.com/fission/fission-workflows/pkg/controller"
"github.com/fission/fission-workflows/pkg/controller/expr"
wfictr "github.com/fission/fission-workflows/pkg/controller/invocation"
Expand Down Expand Up @@ -187,8 +188,8 @@ func Run(ctx context.Context, opts *Options) error {
}

// Caches
wfiCache := getInvocationStore(app, esPub, eventStore)
wfCache := getWorkflowStore(app, esPub, eventStore)
invocationStore := getInvocationStore(app, esPub, eventStore)
workflowStore := getWorkflowStore(app, esPub, eventStore)

//
// Function Runtimes
Expand All @@ -199,7 +200,7 @@ func Run(ctx context.Context, opts *Options) error {

if opts.InternalRuntime || opts.Fission != nil {
log.Infof("Using Task Runtime: Workflow")
reflectiveRuntime := workflows.NewRuntime(invocationAPI, wfiCache())
reflectiveRuntime := workflows.NewRuntime(invocationAPI, invocationStore)
runtimes[workflows.Name] = reflectiveRuntime
} else {
log.Info("No function runtimes specified.")
Expand Down Expand Up @@ -228,12 +229,12 @@ func Run(ctx context.Context, opts *Options) error {
var ctrls []controller.Controller
if opts.WorkflowController {
log.Info("Using controller: workflow")
ctrls = append(ctrls, setupWorkflowController(wfCache(), es, resolvers))
ctrls = append(ctrls, setupWorkflowController(workflowStore, es, resolvers))
}

if opts.InvocationController {
log.Info("Using controller: invocation")
ctrls = append(ctrls, setupInvocationController(wfiCache(), wfCache(), es, runtimes, resolvers))
ctrls = append(ctrls, setupInvocationController(invocationStore, workflowStore, es, runtimes, resolvers))
}

ctrl := controller.NewMetaController(ctrls...)
Expand All @@ -255,7 +256,7 @@ func Run(ctx context.Context, opts *Options) error {
//
if opts.Fission != nil {
proxyMux := http.NewServeMux()
runFissionEnvironmentProxy(proxyMux, es, wfiCache(), wfCache(), resolvers)
runFissionEnvironmentProxy(proxyMux, es, invocationStore, workflowStore, resolvers)
fissionProxySrv := &http.Server{Addr: fissionProxyAddress}
fissionProxySrv.Handler = handlers.LoggingHandler(os.Stdout, proxyMux)

Expand Down Expand Up @@ -286,11 +287,11 @@ func Run(ctx context.Context, opts *Options) error {
}

if opts.WorkflowAPI {
serveWorkflowAPI(grpcServer, es, resolvers, wfCache())
serveWorkflowAPI(grpcServer, es, resolvers, workflowStore)
}

if opts.InvocationAPI {
serveInvocationAPI(grpcServer, es, wfiCache())
serveInvocationAPI(grpcServer, es, invocationStore)
}

if opts.AdminAPI || opts.WorkflowAPI || opts.InvocationAPI {
Expand Down Expand Up @@ -363,25 +364,14 @@ func Run(ctx context.Context, opts *Options) error {
return nil
}

func getWorkflowStore(app *App, eventPub pubsub.Publisher, backend fes.Backend) func() *store.Workflows {
var workflows *store.Workflows
return func() *store.Workflows {
if workflows == nil {
workflows = store.NewWorkflowsStore(setupWorkflowCache(app, eventPub, backend))
}
return workflows
}
func getWorkflowStore(app *App, eventPub pubsub.Publisher, backend fes.Backend) *store.Workflows {
c := setupWorkflowCache(app, eventPub, backend)
return store.NewWorkflowsStore(c)
}

func getInvocationStore(app *App, eventPub pubsub.Publisher, backend fes.Backend) func() *store.Invocations {
var invocations *store.Invocations
return func() *store.Invocations {
if invocations == nil {
invocations = store.NewInvocationStore(setupWorkflowInvocationCache(app, eventPub, backend))
}
return invocations

}
func getInvocationStore(app *App, eventPub pubsub.Publisher, backend fes.Backend) *store.Invocations {
c := setupWorkflowInvocationCache(app, eventPub, backend)
return store.NewInvocationStore(c)
}

func setupInternalFunctionRuntime() *native.FunctionEnv {
Expand Down Expand Up @@ -523,7 +513,7 @@ func runFissionEnvironmentProxy(proxyMux *http.ServeMux, es fes.Backend, invocat
wfServer := apiserver.NewWorkflow(workflowAPI, workflows)
wfiAPI := api.NewInvocationAPI(es)
wfiServer := apiserver.NewInvocation(wfiAPI, invocations)
fissionProxyServer := fission.NewFissionProxyServer(wfiServer, wfServer)
fissionProxyServer := fissionproxy.NewEnvironmentProxyServer(wfiServer, wfServer)
fissionProxyServer.RegisterServer(proxyMux)
}

Expand Down
3 changes: 2 additions & 1 deletion cmd/fission-workflows/invocation.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ import (
"github.com/fission/fission-workflows/pkg/apiserver/httpclient"
"github.com/fission/fission-workflows/pkg/parse/yaml"
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/golang/protobuf/ptypes"
"github.com/urfave/cli"
)
Expand Down Expand Up @@ -104,7 +105,7 @@ var cmdInvocation = cli.Command{
wfID := ctx.Args().Get(0)
spec := &types.WorkflowInvocationSpec{
WorkflowId: wfID,
Inputs: map[string]*types.TypedValue{},
Inputs: map[string]*typedvalues.TypedValue{},
}
if ctx.Bool("sync") {
resp, err := client.Invocation.InvokeSync(ctx, spec)
Expand Down
2 changes: 1 addition & 1 deletion compiling.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ environment meets all prerequisite requirements, and checkout the repo from gith
# Install dependencies
glide install -v

# Build the artifacts: wfcli, fission-workflows-bundle
# Build the artifacts: client (fission-workflows) and server (fission-workflows-bundle)
build/build-linux.sh

# Build the docker images (the NOBUILD parameter indicates that Docker should use the artifacts (wfci,
Expand Down
31 changes: 14 additions & 17 deletions glide.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 6 additions & 4 deletions glide.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,12 @@ import:
- go/otgrpc
- package: github.com/hashicorp/golang-lru
version: v0.5.0
testImport:
- package: github.com/stretchr/testify
version: 1.1.4
subpackages:
- assert
version: v1.2.2
- package: github.com/imdario/mergo
version: v0.3.6
testImport:
- package: gopkg.in/ory-am/dockertest.v3
version: v3.3.1
- package: github.com/stretchr/testify
version: v1.2.2
24 changes: 24 additions & 0 deletions hack/verify-misc.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
#!/usr/bin/env bash

# Miscellaneous static code checks

# check <msg> <cmd...> - runs the command and adds a header with the status (OK or FAIL) to the output
check() {
msg=$1
shift
cmd=$@
printf "[check] ${msg}..."
output=""
if output=$(bash -c "${cmd}") ; then
printf "OK\n"
else
printf "FAIL\n"
if [ ! -z "${output}" ] ; then
echo ${output}
fi
return 1
fi
}

# Check if we don't accidentally use the gogo protobuf implementation, instead of the golang protobuf implementation.
check "no use of gogo-protobuf" ! grep -R 'github.com/gogo/protobuf' pkg/ cmd/
5 changes: 3 additions & 2 deletions pkg/api/aggregates/workflow.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,14 +49,15 @@ func (wf *Workflow) ApplyEvent(event *fes.Event) error {

switch m := eventData.(type) {
case *events.WorkflowCreated:
// Setup object
spec := m.GetSpec()
wf.BaseEntity = fes.NewBaseEntity(wf, *event.Aggregate)
wf.Workflow = &types.Workflow{
Metadata: &types.ObjectMetadata{
Id: wf.Aggregate().Id,
Name: spec.GetName(),
CreatedAt: event.GetTimestamp(),
},
Spec: m.GetSpec(),
Spec: spec,
Status: &types.WorkflowStatus{
Status: types.WorkflowStatus_PENDING,
},
Expand Down
13 changes: 7 additions & 6 deletions pkg/api/dynamic.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package api
import (
"github.com/fission/fission-workflows/pkg/types"
"github.com/fission/fission-workflows/pkg/types/typedvalues"
"github.com/fission/fission-workflows/pkg/types/typedvalues/controlflow"
"github.com/fission/fission-workflows/pkg/types/validate"
"github.com/golang/protobuf/proto"
)
Expand All @@ -23,15 +24,15 @@ func NewDynamicApi(wfAPI *Workflow, wfiAPI *Invocation) *Dynamic {

// AddDynamicFlow inserts the flow as a 'dynamic task' into the workflow invocation with id invocationID as the child
// of the parent task.
func (ap *Dynamic) AddDynamicFlow(invocationID string, parentTaskID string, flow typedvalues.Flow) error {
func (ap *Dynamic) AddDynamicFlow(invocationID string, parentTaskID string, flow controlflow.Flow) error {
if err := validate.Flow(flow); err != nil {
return err
}
switch flow.Type() {
case typedvalues.Workflow:
return ap.addDynamicWorkflow(invocationID, parentTaskID, flow.Workflow(), &types.TaskSpec{})
case typedvalues.Task:
return ap.addDynamicTask(invocationID, parentTaskID, flow.Task())
case controlflow.FlowTypeWorkflow:
return ap.addDynamicWorkflow(invocationID, parentTaskID, flow.GetWorkflow(), &types.TaskSpec{})
case controlflow.FlowTypeTask:
return ap.addDynamicTask(invocationID, parentTaskID, flow.GetTask())
default:
panic("validated flow was still empty")
}
Expand Down Expand Up @@ -72,7 +73,7 @@ func (ap *Dynamic) addDynamicWorkflow(invocationID string, parentTaskID string,
// Generate Proxy Task
proxyTaskSpec := proto.Clone(stubTask).(*types.TaskSpec)
proxyTaskSpec.FunctionRef = wfRef.Format()
proxyTaskSpec.Input(types.InputParent, typedvalues.ParseString(invocationID))
proxyTaskSpec.Input(types.InputParent, typedvalues.MustWrap(invocationID))
proxyTaskID := parentTaskID + "_child"
proxyTask := types.NewTask(proxyTaskID, proxyTaskSpec.FunctionRef)
proxyTask.Spec = proxyTaskSpec
Expand Down
Loading