Skip to content

Commit

Permalink
adding go plugin as a new method in Computed (#87)
Browse files Browse the repository at this point in the history
* update

* newpatch

* Abstract loader.

* Use computed for all

* Update with minor fix.

* Update code , and test all

* Support empty filter.

* Delete comment

* Update so, will complie it before test.

* add missing dir.

* Add go version info and sync stdout

* Upgrade CI go version

* Skip plugin test in CI

Co-authored-by: YongHao Hu <[email protected]>
  • Loading branch information
atlas-comstock and hiohiograb authored May 6, 2022
1 parent 829bfed commit edc2d8e
Show file tree
Hide file tree
Showing 38 changed files with 590 additions and 294 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ jobs:
- name: Install Go
uses: actions/setup-go@v1
with:
go-version: 1.16
go-version: 1.17
id: go

- name: Check out code into the Go module directory
Expand Down
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -13,3 +13,5 @@ testdata-*
.envrc
logs/
*.venv
talaria
*.so
7 changes: 5 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ require (
github.com/fraugster/parquet-go v0.3.0
github.com/golang/snappy v0.0.3
github.com/gopherjs/gopherjs v0.0.0-20200209183636-89e6cbcd0b6d // indirect
github.com/gorilla/mux v1.7.4
github.com/grab/async v0.0.5
github.com/hako/durafmt v0.0.0-20191009132224-3f39dc1ed9f4
github.com/hashicorp/go-immutable-radix v1.2.0 // indirect
Expand Down Expand Up @@ -61,10 +60,14 @@ require (

require github.com/Azure/go-autorest/autorest/adal v0.9.11

require (
github.com/Azure/go-autorest/autorest v0.11.17
github.com/gorilla/mux v1.8.0
)

require (
github.com/Azure/azure-pipeline-go v0.2.3 // indirect
github.com/Azure/go-autorest v14.2.0+incompatible // indirect
github.com/Azure/go-autorest/autorest v0.11.17 // indirect
github.com/Azure/go-autorest/autorest/azure/cli v0.4.2 // indirect
github.com/Azure/go-autorest/autorest/date v0.3.0 // indirect
github.com/Azure/go-autorest/logger v0.2.0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -226,8 +226,8 @@ github.com/googleapis/gax-go/v2 v2.0.5/go.mod h1:DWXyrwAJ9X0FpwwEdw+IPEYBICEFu5m
github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gopherjs/gopherjs v0.0.0-20200209183636-89e6cbcd0b6d h1:vr95xIx8Eg3vCzZPxY3rCwTfkjqNDt/FgVqTOk0WByk=
github.com/gopherjs/gopherjs v0.0.0-20200209183636-89e6cbcd0b6d/go.mod h1:wJfORRmW1u3UXTncJ5qlYoELFm8eSnnEO6hX4iZ3EWY=
github.com/gorilla/mux v1.7.4 h1:VuZ8uybHlWmqV03+zRzdwKL4tUnIp1MAQtp1mIFE1bc=
github.com/gorilla/mux v1.7.4/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/gorilla/mux v1.8.0 h1:i40aqfkR1h2SlN9hojwV5ZA91wcXFOvkdNIeFDP5koI=
github.com/gorilla/mux v1.8.0/go.mod h1:DVbg23sWSpFRCP0SfiEN6jmj59UnW/n46BH5rLB71So=
github.com/grab/async v0.0.5 h1:QESBFAKiq5ocTtIcRh8gdha5Xgvu0yStGUefZsOWLPc=
github.com/grab/async v0.0.5/go.mod h1:8zY9m1tryEmU2px8GLmWrHt7QXSQOhyurytRQ3LrzjQ=
github.com/grpc-ecosystem/grpc-gateway v1.16.0/go.mod h1:BDjrQk3hbvj6Nolgz8mAMFbcEtjT1g+wF4CSlocrBnw=
Expand Down
108 changes: 27 additions & 81 deletions internal/column/computed.go → internal/column/computed/computed.go
Original file line number Diff line number Diff line change
@@ -1,117 +1,63 @@
// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file

package column
package computed

import (
"context"
"crypto/rand"
"encoding/binary"
"encoding/hex"
"fmt"
"sync/atomic"
"time"

"github.com/kelindar/loader"
"github.com/kelindar/lua"
"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/scripting"
"github.com/kelindar/talaria/internal/monitor"
script "github.com/kelindar/talaria/internal/scripting"
mlog "github.com/kelindar/talaria/internal/scripting/log"
mnet "github.com/kelindar/talaria/internal/scripting/net"
mstats "github.com/kelindar/talaria/internal/scripting/stats"
)

// Loader is the default loader to use for loading computed columns.
var Loader = loader.New()

// Default empty script
const emptyScript = `function main(row)
return null
end`
const (
LuaLoaderTyp = "lua"
PluginLoaderTyp = "plugin"
)

// Computed represents a computed column
type Computed interface {
Name() string
Name() string // return column 's name
Type() typeof.Type
Value(map[string]interface{}) (interface{}, error)
}

// NewComputed creates a new script from a string
func NewComputed(name string, typ typeof.Type, uriOrCode string, loader *script.Loader) (Computed, error) {
func NewComputed(columnName, functionName string, outpuTyp typeof.Type, uriOrCode string, monitor monitor.Monitor) (Computed, error) {
switch uriOrCode {
case "make://identifier":
return newIdentifier(name), nil
return newIdentifier(columnName), nil
case "make://timestamp":
return newTimestamp(name), nil
return newTimestamp(columnName), nil
}

s, err := loader.Load(name, uriOrCode)
if err != nil {
return nil, err
}
pluginLoader := script.NewPluginLoader(functionName)
luaLoader := script.NewLuaLoader([]lua.Module{
mlog.New(monitor),
mstats.New(monitor),
mnet.New(monitor),
}, outpuTyp)
l := script.NewHandlerLoader(pluginLoader, luaLoader)

return &scripted{
code: s,
typ: typ,
}, nil
}

// ------------------------------------------------------------------------------------------------------------

// scripted represents a computed column computed through a lua script
type scripted struct {
code *lua.Script // The script associated with the column
typ typeof.Type // The type of the column
}

// Name returns the name of the column
func (c *scripted) Name() string {
return c.code.Name()
}

// Type returns the type of the column
func (c *scripted) Type() typeof.Type {
return c.typ
}

// Value computes the column value for the row
func (c *scripted) Value(row map[string]interface{}) (interface{}, error) {
ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond)
defer cancel()

// Run the script
out, err := c.code.Run(ctx, row)
h, err := l.LoadHandler(uriOrCode)
if err != nil {
return nil, err
}

// If there's no new row generated, return nil
if out.Type() == lua.TypeNil {
return nil, nil
}

switch c.typ {
case typeof.Bool:
if v, ok := out.(lua.Bool); ok {
return bool(v), nil
}
case typeof.Int32:
if v, ok := out.(lua.Number); ok {
return int32(v), nil
}
case typeof.Int64, typeof.Timestamp:
if v, ok := out.(lua.Number); ok {
return int64(v), nil
}
case typeof.Float64:
if v, ok := out.(lua.Number); ok {
return float64(v), nil
}
case typeof.String, typeof.JSON:
if v, ok := out.(lua.String); ok {
return string(v), nil
}
}

// Type mismatch
return nil, fmt.Errorf("script expects %s type but got %T", c.typ.String(), out)
return &loadComputed{
name: columnName,
loader: h,
typ: outpuTyp,
}, nil
}

// ------------------------------------------------------------------------------------------------------------
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,12 @@
// Copyright 2019-2020 Grabtaxi Holdings PTE LTE (GRAB), All rights reserved.
// Use of this source code is governed by an MIT-style license that can be found in the LICENSE file

package column
package computed

import (
"testing"

"github.com/kelindar/talaria/internal/encoding/typeof"
"github.com/kelindar/talaria/internal/scripting"
"github.com/stretchr/testify/assert"
)

Expand All @@ -24,7 +23,7 @@ func Test_Computed(t *testing.T) {
}

func Test_Identifier(t *testing.T) {
c, err := NewComputed("id", typeof.String, "make://identifier", nil)
c, err := NewComputed("id", "", typeof.String, "make://identifier", nil)
assert.NoError(t, err)
out, err := c.Value(map[string]interface{}{
"a": 1,
Expand All @@ -39,7 +38,7 @@ func Test_Identifier(t *testing.T) {
}

func Test_Timestamp(t *testing.T) {
c, err := NewComputed("ts", typeof.String, "make://timestamp", nil)
c, err := NewComputed("ts", "", typeof.String, "make://timestamp", nil)
assert.NoError(t, err)
out, err := c.Value(map[string]interface{}{
"a": 1,
Expand All @@ -54,8 +53,8 @@ func Test_Timestamp(t *testing.T) {
}

func Test_Download(t *testing.T) {
l := script.NewLoader(nil)
c, err := NewComputed("data", typeof.JSON, "https://raw.githubusercontent.com/kelindar/lua/master/fixtures/json.lua", l)
c, err := NewComputed("data", "main", typeof.JSON, "https://raw.githubusercontent.com/kelindar/lua/master/fixtures/json.lua", nil)
assert.NoError(t, err)
out, err := c.Value(map[string]interface{}{
"a": 1,
"b": "hello",
Expand All @@ -67,13 +66,13 @@ func Test_Download(t *testing.T) {
}

func newDataColumn(t *testing.T) Computed {
l := script.NewLoader(nil)
c, err := NewComputed("data", typeof.JSON, `
c, err := NewComputed("data", "main", typeof.JSON, `
local json = require("json")
function main(row)
function main(row)
return json.encode(row)
end`, l)
end`, nil)
assert.NotNil(t, c)
assert.NoError(t, err)
return c
}
24 changes: 24 additions & 0 deletions internal/column/computed/lua.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
package computed

import (
"github.com/kelindar/talaria/internal/encoding/typeof"
script "github.com/kelindar/talaria/internal/scripting"
)

type loadComputed struct {
name string // Name of the column
loader script.Handler
typ typeof.Type
}

func (l *loadComputed) Name() string {
return l.name
}

func (l *loadComputed) Type() typeof.Type {
return l.typ
}

func (l *loadComputed) Value(row map[string]interface{}) (interface{}, error) {
return l.loader.Value(row)
}
7 changes: 4 additions & 3 deletions internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -114,9 +114,10 @@ type StatsD struct {

// Computed represents a computed column
type Computed struct {
Name string `json:"name"`
Type typeof.Type `json:"type"`
Func string `json:"func"`
Name string `json:"name"`
Type typeof.Type `json:"type"`
Func string `json:"func"`
FuncName string `json:"funcname"`
}

// Compaction represents a configuration for compaction sinks
Expand Down
7 changes: 6 additions & 1 deletion internal/config/env/configurer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package env

import (
"errors"
"io/ioutil"
"os"
"reflect"
"strconv"
Expand Down Expand Up @@ -34,7 +35,11 @@ func New(key string) *Configurer {
// Configure fetches the values of the env variable for file name and sets that in the config
func (e *Configurer) Configure(c *config.Config) error {
if v, ok := os.LookupEnv(e.key); ok {
return yaml.Unmarshal([]byte(v), c)
yamlFile, err := ioutil.ReadFile(v)
if err != nil {
return yaml.Unmarshal([]byte(v), c)
}
return yaml.Unmarshal([]byte(yamlFile), c)
}

populate(c, e.key)
Expand Down
5 changes: 3 additions & 2 deletions internal/config/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package config

import (
"context"
"fmt"
"log"
"sync/atomic"
"time"
Expand All @@ -27,7 +28,7 @@ func newStore(li time.Duration, co []Configurer) *store {

c, err := s.value()
if err != nil {
panic("unable to load config")
panic(fmt.Sprintf("unable to load config, err is %v", err))
}
s.config.Store(c)
return s
Expand Down Expand Up @@ -56,7 +57,7 @@ func (cs *store) value() (*Config, error) {
// Iterate through all the loaders to fill this config object
for _, p := range cs.configurers {
if err := p.Configure(c); err != nil {
log.Printf("%s : error in loadig config %s", p, err)
log.Printf("%+v : error in loadig config %s", p, err)
return nil, err
}
}
Expand Down
9 changes: 4 additions & 5 deletions internal/encoding/block/from_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,8 @@ import (
"encoding/json"
"testing"

"github.com/kelindar/talaria/internal/column"
"github.com/kelindar/talaria/internal/column/computed"
"github.com/kelindar/talaria/internal/encoding/typeof"
script "github.com/kelindar/talaria/internal/scripting"
talaria "github.com/kelindar/talaria/proto"
"github.com/stretchr/testify/assert"
)
Expand Down Expand Up @@ -108,11 +107,11 @@ func TestBlock_FromBatch(t *testing.T) {
assert.Contains(t, string(row["data"].(json.RawMessage)), "event3")
}

func newDataColumn() (column.Computed, error) {
return column.NewComputed("data", typeof.JSON, `
func newDataColumn() (computed.Computed, error) {
return computed.NewComputed("data", "main", typeof.JSON, `
local json = require("json")
function main(input)
return json.encode(input)
end`, script.NewLoader(nil))
end`, nil)
}
3 changes: 3 additions & 0 deletions internal/encoding/block/from_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ func FromCSVBy(input []byte, partitionBy string, filter *typeof.Schema, apply ap

// Read the header first
r, err := rdr.Read()
if err != nil {
return nil, err
}
header := r

// Find the partition index
Expand Down
4 changes: 2 additions & 2 deletions internal/encoding/block/transform.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,12 +4,12 @@
package block

import (
"github.com/kelindar/talaria/internal/column"
"github.com/kelindar/talaria/internal/column/computed"
"github.com/kelindar/talaria/internal/encoding/typeof"
)

// Transform runs the computed Values and overwrites/appends them to the set.
func Transform(filter *typeof.Schema, computed ...column.Computed) applyFunc {
func Transform(filter *typeof.Schema, computed ...computed.Computed) applyFunc {
return func(r Row) (Row, error) {
// Create a new output row and copy the column values from the input
schema := make(typeof.Schema, len(r.Schema))
Expand Down
Loading

0 comments on commit edc2d8e

Please sign in to comment.