Skip to content

Commit

Permalink
Allow FS to be swapped by service users (#1618)
Browse files Browse the repository at this point in the history
* Added the ability to use a different FS for Streams

* Replaced FS struct with an FS interface instead

* Fix linting

* Brought back service.FS struct, used constructor and wrapper to pass in custom FS implementation
  • Loading branch information
codegangsta authored Dec 15, 2022
1 parent 66a29c1 commit 3a7c024
Show file tree
Hide file tree
Showing 5 changed files with 135 additions and 0 deletions.
8 changes: 8 additions & 0 deletions internal/manager/type.go
Original file line number Diff line number Diff line change
Expand Up @@ -158,6 +158,14 @@ func OptSetStreamsMode(b bool) OptFunc {
}
}

// OptSetFS determines which ifs.FS implementation to use for its filesystem.
// This can be used to override the default os based filesystem implementation.
func OptSetFS(fs ifs.FS) OptFunc {
return func(t *Type) {
t.fs = fs
}
}

// New returns an instance of manager.Type, which can be shared amongst
// components and logical threads of a Benthos service.
func New(conf ResourceConfig, opts ...OptFunc) (*Type, error) {
Expand Down
10 changes: 10 additions & 0 deletions public/service/environment.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"github.com/benthosdev/benthos/v4/internal/component/tracer"
"github.com/benthosdev/benthos/v4/internal/config"
"github.com/benthosdev/benthos/v4/internal/docs"
"github.com/benthosdev/benthos/v4/internal/filepath/ifs"
"github.com/benthosdev/benthos/v4/public/bloblang"
)

Expand All @@ -32,11 +33,13 @@ import (
type Environment struct {
internal *bundle.Environment
bloblangEnv *bloblang.Environment
fs ifs.FS
}

var globalEnvironment = &Environment{
internal: bundle.GlobalEnvironment,
bloblangEnv: bloblang.GlobalEnvironment(),
fs: ifs.OS(),
}

// GlobalEnvironment returns a reference to the global environment, adding
Expand All @@ -58,6 +61,7 @@ func (e *Environment) Clone() *Environment {
return &Environment{
internal: e.internal.Clone(),
bloblangEnv: e.bloblangEnv.WithoutFunctions().WithoutMethods(),
fs: e.fs,
}
}

Expand All @@ -67,6 +71,12 @@ func (e *Environment) UseBloblangEnvironment(bEnv *bloblang.Environment) {
e.bloblangEnv = bEnv
}

// UseFS configures the service environment to use an implementation of ifs.FS
// as its filesystem.
func (e *Environment) UseFS(fs *FS) {
e.fs = fs
}

// NewStreamBuilder creates a new StreamBuilder upon the defined environment,
// only components known to this environment will be available to the stream
// builder.
Expand Down
73 changes: 73 additions & 0 deletions public/service/environment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,18 @@ package service_test
import (
"context"
"errors"
"fmt"
"io/fs"
"os"
"path/filepath"
"testing"
"testing/fstest"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/benthosdev/benthos/v4/internal/filepath/ifs"
"github.com/benthosdev/benthos/v4/public/bloblang"
"github.com/benthosdev/benthos/v4/public/service"
)
Expand Down Expand Up @@ -147,3 +153,70 @@ logger:
require.NoError(t, strm.StopWithin(time.Second))
assert.Equal(t, []string{"meow"}, received)
}

type testFS struct {
ifs.FS
override fstest.MapFS
}

func (fs testFS) Open(name string) (fs.File, error) {
if f, err := fs.override.Open(name); err == nil {
return f, nil
}

return fs.FS.Open(name)
}

func (fs testFS) OpenFile(name string, flag int, perm fs.FileMode) (fs.File, error) {
if f, err := fs.override.Open(name); err == nil {
return f, nil
}

return fs.FS.OpenFile(name, flag, perm)
}

func (fs testFS) Stat(name string) (fs.FileInfo, error) {
if f, err := fs.override.Stat(name); err == nil {
return f, nil
}

return fs.FS.Stat(name)
}

func TestEnvironmentUseFS(t *testing.T) {
tmpDir := t.TempDir()
outFilePath := filepath.Join(tmpDir, "out.txt")

env := service.NewEnvironment()
env.UseFS(service.NewFS(testFS{ifs.OS(), fstest.MapFS{
"hello.txt": {
Data: []byte("hello\nworld"),
},
}}))

b := env.NewStreamBuilder()

require.NoError(t, b.SetYAML(fmt.Sprintf(`
input:
file:
paths: [hello.txt]
output:
label: foo
file:
codec: lines
path: %v
`, outFilePath)))

strm, err := b.Build()
require.NoError(t, err)

require.NoError(t, strm.Run(context.Background()))

outBytes, err := os.ReadFile(outFilePath)
require.NoError(t, err)

assert.Equal(t, `hello
world
`, string(outBytes))
}
43 changes: 43 additions & 0 deletions public/service/resources.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,12 +84,55 @@ func (r *Resources) OtelTracer() trace.TracerProvider {
return r.mgr.Tracer()
}

// wrapperFS provides extra methods support around a bare fs.FS that does
// fully implement ifs.FS, this allows us to keep some clean interfaces while
// also ensuring backward compatibility.
type wrapperFS struct {
fs fs.FS
fallback ifs.FS
}

// Open opens the named file for reading.
func (f *wrapperFS) Open(name string) (fs.File, error) {
return f.fs.Open(name)
}

// OpenFile is the generalized open call.
func (f *wrapperFS) OpenFile(name string, flag int, perm fs.FileMode) (fs.File, error) {
return f.fallback.OpenFile(name, flag, perm)
}

// Stat returns a FileInfo describing the named file.
func (f *wrapperFS) Stat(name string) (fs.FileInfo, error) {
return f.fallback.Stat(name)
}

// Remove removes the named file or (empty) directory.
func (f *wrapperFS) Remove(name string) error {
return f.fallback.Remove(name)
}

// MkdirAll creates a directory named path, along with any necessary parents,
// and returns nil, or else returns an error.
func (f *wrapperFS) MkdirAll(path string, perm fs.FileMode) error {
return f.fallback.MkdirAll(path, perm)
}

// FS implements a superset of fs.FS and includes goodies that benthos
// components specifically need.
type FS struct {
i ifs.FS
}

// NewFS provides a new instance of a filesystem. The fs.FS passed in can
// optionally implement methods from benthos ifs.FS
func NewFS(filesystem fs.FS) *FS {
if fsimpl, ok := filesystem.(ifs.FS); ok {
return &FS{fsimpl}
}
return &FS{&wrapperFS{filesystem, ifs.OS()}}
}

// Open opens the named file for reading.
func (f *FS) Open(name string) (fs.File, error) {
return f.i.Open(name)
Expand Down
1 change: 1 addition & 0 deletions public/service/stream_builder.go
Original file line number Diff line number Diff line change
Expand Up @@ -836,6 +836,7 @@ func (s *StreamBuilder) buildWithEnv(env *bundle.Environment) (*Stream, error) {
manager.OptSetTracer(tracer),
manager.OptSetEnvironment(env),
manager.OptSetBloblangEnvironment(s.env.getBloblangParserEnv()),
manager.OptSetFS(s.env.fs),
)
if err != nil {
return nil, err
Expand Down

0 comments on commit 3a7c024

Please sign in to comment.