fix(fs): fix mergefs bug where files were opened too many times
Signed-off-by: Dave Henderson <[email protected]>
hairyhenderson committed Dec 16, 2024
1 parent 40c38b7 commit 1c402b4
Showing 3 changed files with 195 additions and 180 deletions.
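
The gist of the fix: mergeFS.Open used to call fs.Stat(fsys, base) for content-type and modTime detection and then call fsys.Open(base), so every merge part was hit at least twice; that work now happens in readSubFile against the already-open file. The sketch below is a minimal, self-contained illustration of why stat-then-open double-opens on a plain fs.FS (fs.Stat falls back to Open when the filesystem doesn't implement fs.StatFS). The countingFS wrapper is hypothetical and not part of this commit.

package main

import (
	"fmt"
	"io/fs"
	"testing/fstest"
)

// countingFS counts calls to Open. It deliberately does not implement
// fs.StatFS, so fs.Stat must fall back to Open + Stat + Close.
type countingFS struct {
	fs.FS
	opens int
}

func (c *countingFS) Open(name string) (fs.File, error) {
	c.opens++
	return c.FS.Open(name)
}

func main() {
	base := fstest.MapFS{"a.json": {Data: []byte(`{"hello": "world"}`)}}

	// old pattern: stat the path first, then open it - two Opens
	before := &countingFS{FS: base}
	_, _ = fs.Stat(before, "a.json") // falls back to Open
	f, _ := before.Open("a.json")
	f.Close()
	fmt.Println("stat-then-open opens:", before.opens) // 2

	// new pattern: open once, stat the already-open file
	after := &countingFS{FS: base}
	f, _ = after.Open("a.json")
	_, _ = f.Stat()
	f.Close()
	fmt.Println("open-then-stat opens:", after.opens) // 1
}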
75 changes: 44 additions & 31 deletions internal/datafs/mergefs.go
@@ -122,8 +122,8 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
// unescaped '+' characters to make it simpler to provide types like
// "application/array+json"
overrideType := typeOverrideParam()
mimeType := u.Query().Get(overrideType)
mimeType = strings.ReplaceAll(mimeType, " ", "+")
mimeTypeHint := u.Query().Get(overrideType)
mimeTypeHint = strings.ReplaceAll(mimeTypeHint, " ", "+")

// now that we have the hint, remove it from the URL - we can't have it
// leaking into the filesystem layer
@@ -151,23 +151,6 @@ func (f *mergeFS) Open(name string) (fs.File, error) {

fsys = fsimpl.WithHTTPClientFS(f.httpClient, fsys)

// find the content type
fi, err := fs.Stat(fsys, base)
if err != nil {
return nil, &fs.PathError{
Op: "open", Path: name,
Err: fmt.Errorf("stat merge part %q: %w", part, err),
}
}

if fi.ModTime().After(modTime) {
modTime = fi.ModTime()
}

if mimeType == "" {
mimeType = fsimpl.ContentType(fi)
}

f, err := fsys.Open(base)
if err != nil {
return nil, &fs.PathError{
@@ -176,7 +159,7 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
}
}

subFiles[i] = subFile{f, mimeType}
subFiles[i] = subFile{f, mimeTypeHint}
}

return &mergeFile{
@@ -226,18 +209,16 @@ func (f *mergeFile) Read(p []byte) (int, error) {
if f.merged == nil {
f.readMux.Lock()
defer f.readMux.Unlock()

// read from all and merge
data := make([]map[string]interface{}, len(f.subFiles))
data := make([]map[string]any, len(f.subFiles))
for i, sf := range f.subFiles {
b, err := io.ReadAll(sf)
if err != nil && !errors.Is(err, io.EOF) {
return 0, fmt.Errorf("readAll: %w", err)
}

data[i], err = parseMap(sf.contentType, string(b))
d, err := f.readSubFile(sf)
if err != nil {
return 0, fmt.Errorf("parsing map with content type %s: %w", sf.contentType, err)
return 0, fmt.Errorf("readSubFile: %w", err)
}

data[i] = d
}

md, err := mergeData(data)
@@ -253,6 +234,36 @@ func (f *mergeFile) Read(p []byte) (int, error) {
return f.merged.Read(p)
}

func (f *mergeFile) readSubFile(sf subFile) (map[string]any, error) {
// stat for content type and modTime
fi, err := sf.Stat()
if err != nil {
return nil, fmt.Errorf("stat merge part %q: %w", f.name, err)
}

// the merged file's modTime is the most recent of all the sub-files
if fi.ModTime().After(f.modTime) {
f.modTime = fi.ModTime()
}

// if we haven't been given a content type hint, guess the normal way
if sf.contentType == "" {
sf.contentType = fsimpl.ContentType(fi)
}

b, err := io.ReadAll(sf)
if err != nil && !errors.Is(err, io.EOF) {
return nil, fmt.Errorf("readAll: %w", err)
}

sfData, err := parseMap(sf.contentType, string(b))
if err != nil {
return nil, fmt.Errorf("parsing map with content type %s: %w", sf.contentType, err)
}

return sfData, nil
}

func mergeData(data []map[string]interface{}) ([]byte, error) {
dst := data[0]
data = data[1:]
@@ -269,17 +280,19 @@ func mergeData(data []map[string]interface{}) ([]byte, error) {
return []byte(s), nil
}

func parseMap(mimeType, data string) (map[string]interface{}, error) {
func parseMap(mimeType, data string) (map[string]any, error) {
datum, err := parsers.ParseData(mimeType, data)
if err != nil {
return nil, fmt.Errorf("parseData: %w", err)
}
var m map[string]interface{}

var m map[string]any
switch datum := datum.(type) {
case map[string]interface{}:
case map[string]any:
m = datum
default:
return nil, fmt.Errorf("unexpected data type '%T' for datasource (type %s); merge: can only merge maps", datum, mimeType)
}

return m, nil
}
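
For context on the mimeTypeHint rename at the top of Open: the type-override hint is read from the part URL's query string, where an unescaped '+' decodes to a space, so the value is normalized back before use. A standalone sketch of that decoding behaviour, assuming the default override parameter name "type" (the real name comes from typeOverrideParam()):

package main

import (
	"fmt"
	"net/url"
	"strings"
)

func main() {
	// '+' in a query string decodes to a space, so a hint like
	// "application/array+json" comes back as "application/array json"
	u, _ := url.Parse("file:///tmp/data.json?type=application/array+json")

	hint := u.Query().Get("type")
	fmt.Printf("raw hint:        %q\n", hint) // "application/array json"

	// restore the '+' so the hint is a usable media type again
	hint = strings.ReplaceAll(hint, " ", "+")
	fmt.Printf("normalized hint: %q\n", hint) // "application/array+json"
}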
162 changes: 70 additions & 92 deletions internal/datafs/mergefs_test.go
@@ -2,6 +2,7 @@ package datafs

import (
"context"
"fmt"
"io"
"io/fs"
"mime"
@@ -31,19 +32,7 @@ func setupMergeFsys(ctx context.Context, t *testing.T) fs.FS {
yamlContent := "hello: earth\ngoodnight: moon\n"
arrayContent := `["hello", "world"]`

wd, _ := os.Getwd()

// MapFS doesn't support windows path separators, so we use / exclusively
// in this test
vol := filepath.VolumeName(wd)
if vol != "" && wd != vol {
wd = wd[len(vol)+1:]
} else if wd[0] == '/' {
wd = wd[1:]
}
wd = filepath.ToSlash(wd)

t.Logf("wd: %s", wd)
wd := wdForTest(t)

fsys := WrapWdFS(fstest.MapFS{
"tmp": {Mode: fs.ModeDir | 0o777},
@@ -90,84 +79,22 @@ func setupMergeFsys(ctx context.Context, t *testing.T) fs.FS {
return fsys
}

// func TestReadMerge(t *testing.T) {
// ctx := context.Background()

// jsonContent := `{"hello": "world"}`
// yamlContent := "hello: earth\ngoodnight: moon\n"
// arrayContent := `["hello", "world"]`

// mergedContent := "goodnight: moon\nhello: world\n"

// fsys := fstest.MapFS{}
// fsys["tmp"] = &fstest.MapFile{Mode: fs.ModeDir | 0777}
// fsys["tmp/jsonfile.json"] = &fstest.MapFile{Data: []byte(jsonContent)}
// fsys["tmp/array.json"] = &fstest.MapFile{Data: []byte(arrayContent)}
// fsys["tmp/yamlfile.yaml"] = &fstest.MapFile{Data: []byte(yamlContent)}
// fsys["tmp/textfile.txt"] = &fstest.MapFile{Data: []byte(`plain text...`)}

// // workding dir with volume name trimmed
// wd, _ := os.Getwd()
// vol := filepath.VolumeName(wd)
// wd = wd[len(vol)+1:]

// fsys[path.Join(wd, "jsonfile.json")] = &fstest.MapFile{Data: []byte(jsonContent)}
// fsys[path.Join(wd, "array.json")] = &fstest.MapFile{Data: []byte(arrayContent)}
// fsys[path.Join(wd, "yamlfile.yaml")] = &fstest.MapFile{Data: []byte(yamlContent)}
// fsys[path.Join(wd, "textfile.txt")] = &fstest.MapFile{Data: []byte(`plain text...`)}

// fsmux := fsimpl.NewMux()
// fsmux.Add(fsimpl.WrappedFSProvider(&fsys, "file"))
// ctx = datafs.ContextWithFSProvider(ctx, fsmux)

// source := &Source{Alias: "foo", URL: mustParseURL("merge:file:///tmp/jsonfile.json|file:///tmp/yamlfile.yaml")}
// d := &Data{
// Sources: map[string]*Source{
// "foo": source,
// "bar": {Alias: "bar", URL: mustParseURL("file:///tmp/jsonfile.json")},
// "baz": {Alias: "baz", URL: mustParseURL("file:///tmp/yamlfile.yaml")},
// "text": {Alias: "text", URL: mustParseURL("file:///tmp/textfile.txt")},
// "badscheme": {Alias: "badscheme", URL: mustParseURL("bad:///scheme.json")},
// "badtype": {Alias: "badtype", URL: mustParseURL("file:///tmp/textfile.txt?type=foo/bar")},
// "array": {Alias: "array", URL: mustParseURL("file:///tmp/array.json?type=" + url.QueryEscape(jsonArrayMimetype))},
// },
// Ctx: ctx,
// }

// actual, err := d.readMerge(ctx, source)
// require.NoError(t, err)
// assert.Equal(t, mergedContent, string(actual))

// source.URL = mustParseURL("merge:bar|baz")
// actual, err = d.readMerge(ctx, source)
// require.NoError(t, err)
// assert.Equal(t, mergedContent, string(actual))

// source.URL = mustParseURL("merge:./jsonfile.json|baz")
// actual, err = d.readMerge(ctx, source)
// require.NoError(t, err)
// assert.Equal(t, mergedContent, string(actual))

// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json")
// _, err = d.readMerge(ctx, source)
// require.Error(t, err)

// source.URL = mustParseURL("merge:bogusalias|file:///tmp/jsonfile.json")
// _, err = d.readMerge(ctx, source)
// require.Error(t, err)

// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badscheme")
// _, err = d.readMerge(ctx, source)
// require.Error(t, err)

// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badtype")
// _, err = d.readMerge(ctx, source)
// require.Error(t, err)

// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|array")
// _, err = d.readMerge(ctx, source)
// require.Error(t, err)
// }
func wdForTest(t *testing.T) string {
t.Helper()

wd, _ := os.Getwd()

// MapFS doesn't support windows path separators, so we use / exclusively
vol := filepath.VolumeName(wd)
if vol != "" && wd != vol {
wd = wd[len(vol)+1:]
} else if wd[0] == '/' {
wd = wd[1:]
}
wd = filepath.ToSlash(wd)

return wd
}

func TestMergeData(t *testing.T) {
def := map[string]interface{}{
@@ -228,7 +155,6 @@ func TestMergeData(t *testing.T) {
}

func TestMergeFS_Open(t *testing.T) {
// u, _ := url.Parse("merge:")
fsys := setupMergeFsys(context.Background(), t)
assert.IsType(t, &mergeFS{}, fsys)

@@ -354,3 +280,55 @@ func TestMergeFS_ReadFile(t *testing.T) {
})
}
}

func TestMergeFS_ReadsSubFilesOnce(t *testing.T) {
mergedContent := "goodnight: moon\nhello: world\n"

wd := wdForTest(t)

fsys := WrapWdFS(
openOnce(&fstest.MapFS{
path.Join(wd, "tmp/jsonfile.json"): {Data: []byte(`{"hello": "world"}`)},
path.Join(wd, "tmp/yamlfile.yaml"): {Data: []byte("hello: earth\ngoodnight: moon\n")},
}))

mux := fsimpl.NewMux()
mux.Add(MergeFS)
mux.Add(WrappedFSProvider(fsys, "file", ""))

ctx := ContextWithFSProvider(context.Background(), mux)

reg := NewRegistry()
reg.Register("jsonfile", config.DataSource{URL: mustParseURL("tmp/jsonfile.json")})
reg.Register("yamlfile", config.DataSource{URL: mustParseURL("tmp/yamlfile.yaml")})

fsys, err := NewMergeFS(mustParseURL("merge:///"))
require.NoError(t, err)

fsys = WithDataSourceRegistryFS(reg, fsys)
fsys = fsimpl.WithContextFS(ctx, fsys)

b, err := fs.ReadFile(fsys, "jsonfile|yamlfile")
require.NoError(t, err)
assert.Equal(t, mergedContent, string(b))
}

type openOnceFS struct {
fs *fstest.MapFS
opened map[string]struct{}
}

// a filesystem that only allows opening or stating a file once
func openOnce(fsys *fstest.MapFS) fs.FS {
return &openOnceFS{fs: fsys, opened: map[string]struct{}{}}
}

func (f *openOnceFS) Open(name string) (fs.File, error) {
if _, ok := f.opened[name]; ok {
return nil, fmt.Errorf("open: %q already opened", name)
}

f.opened[name] = struct{}{}

return f.fs.Open(name)
}
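
One note on why the openOnce fixture is enough to catch the regression (an inference from stdlib behaviour, not something stated in the commit): openOnceFS doesn't implement fs.StatFS, so fs.Stat against it falls back to Open + Stat + Close, which means the pre-fix stat-then-open sequence would already trip the "already opened" error. A hypothetical companion test, assuming the same package and imports as mergefs_test.go:

func TestOpenOnceFS_StatFallsBackToOpen(t *testing.T) {
	fsys := openOnce(&fstest.MapFS{"x.json": {Data: []byte(`{}`)}})

	// the first open succeeds and consumes the single allowed access
	f, err := fsys.Open("x.json")
	require.NoError(t, err)
	defer f.Close()

	// openOnceFS has no Stat method, so fs.Stat falls back to Open,
	// which now fails with the "already opened" error
	_, err = fs.Stat(fsys, "x.json")
	require.Error(t, err)
}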