Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(fs): fix mergefs bug where files were opened too many times #2287

Merged
merged 1 commit into from
Dec 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 44 additions & 31 deletions internal/datafs/mergefs.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,8 +122,8 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
// unescaped '+' characters to make it simpler to provide types like
// "application/array+json"
overrideType := typeOverrideParam()
mimeType := u.Query().Get(overrideType)
mimeType = strings.ReplaceAll(mimeType, " ", "+")
mimeTypeHint := u.Query().Get(overrideType)
mimeTypeHint = strings.ReplaceAll(mimeTypeHint, " ", "+")

// now that we have the hint, remove it from the URL - we can't have it
// leaking into the filesystem layer
Expand Down Expand Up @@ -151,23 +151,6 @@ func (f *mergeFS) Open(name string) (fs.File, error) {

fsys = fsimpl.WithHTTPClientFS(f.httpClient, fsys)

// find the content type
fi, err := fs.Stat(fsys, base)
if err != nil {
return nil, &fs.PathError{
Op: "open", Path: name,
Err: fmt.Errorf("stat merge part %q: %w", part, err),
}
}

if fi.ModTime().After(modTime) {
modTime = fi.ModTime()
}

if mimeType == "" {
mimeType = fsimpl.ContentType(fi)
}

f, err := fsys.Open(base)
if err != nil {
return nil, &fs.PathError{
Expand All @@ -176,7 +159,7 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
}
}

subFiles[i] = subFile{f, mimeType}
subFiles[i] = subFile{f, mimeTypeHint}
}

return &mergeFile{
Expand Down Expand Up @@ -226,18 +209,16 @@ func (f *mergeFile) Read(p []byte) (int, error) {
if f.merged == nil {
f.readMux.Lock()
defer f.readMux.Unlock()

// read from all and merge
data := make([]map[string]interface{}, len(f.subFiles))
data := make([]map[string]any, len(f.subFiles))
for i, sf := range f.subFiles {
b, err := io.ReadAll(sf)
if err != nil && !errors.Is(err, io.EOF) {
return 0, fmt.Errorf("readAll: %w", err)
}

data[i], err = parseMap(sf.contentType, string(b))
d, err := f.readSubFile(sf)
if err != nil {
return 0, fmt.Errorf("parsing map with content type %s: %w", sf.contentType, err)
return 0, fmt.Errorf("readSubFile: %w", err)
}

data[i] = d
}

md, err := mergeData(data)
Expand All @@ -253,6 +234,36 @@ func (f *mergeFile) Read(p []byte) (int, error) {
return f.merged.Read(p)
}

// readSubFile stats, reads and parses a single merge part, returning its
// contents as a map.
//
// As a side effect it advances f.modTime when the sub-file is newer, so that
// the merged file's modTime ends up as the most recent of all parts. The
// sub-file's content-type hint is used when present; otherwise the type is
// derived from the stat result via fsimpl.ContentType.
func (f *mergeFile) readSubFile(sf subFile) (map[string]any, error) {
	info, err := sf.Stat()
	if err != nil {
		return nil, fmt.Errorf("stat merge part %q: %w", f.name, err)
	}

	// keep the newest modification time seen so far
	if mt := info.ModTime(); mt.After(f.modTime) {
		f.modTime = mt
	}

	// a hint from the URL wins; without one, guess the normal way
	contentType := sf.contentType
	if contentType == "" {
		contentType = fsimpl.ContentType(info)
	}

	raw, err := io.ReadAll(sf)
	if err != nil && !errors.Is(err, io.EOF) {
		return nil, fmt.Errorf("readAll: %w", err)
	}

	parsed, err := parseMap(contentType, string(raw))
	if err != nil {
		return nil, fmt.Errorf("parsing map with content type %s: %w", contentType, err)
	}

	return parsed, nil
}

func mergeData(data []map[string]interface{}) ([]byte, error) {
dst := data[0]
data = data[1:]
Expand All @@ -269,17 +280,19 @@ func mergeData(data []map[string]interface{}) ([]byte, error) {
return []byte(s), nil
}

func parseMap(mimeType, data string) (map[string]interface{}, error) {
func parseMap(mimeType, data string) (map[string]any, error) {
datum, err := parsers.ParseData(mimeType, data)
if err != nil {
return nil, fmt.Errorf("parseData: %w", err)
}
var m map[string]interface{}

var m map[string]any
switch datum := datum.(type) {
case map[string]interface{}:
case map[string]any:
m = datum
default:
return nil, fmt.Errorf("unexpected data type '%T' for datasource (type %s); merge: can only merge maps", datum, mimeType)
}

return m, nil
}
162 changes: 70 additions & 92 deletions internal/datafs/mergefs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package datafs

import (
"context"
"fmt"
"io"
"io/fs"
"mime"
Expand Down Expand Up @@ -31,19 +32,7 @@ func setupMergeFsys(ctx context.Context, t *testing.T) fs.FS {
yamlContent := "hello: earth\ngoodnight: moon\n"
arrayContent := `["hello", "world"]`

wd, _ := os.Getwd()

// MapFS doesn't support windows path separators, so we use / exclusively
// in this test
vol := filepath.VolumeName(wd)
if vol != "" && wd != vol {
wd = wd[len(vol)+1:]
} else if wd[0] == '/' {
wd = wd[1:]
}
wd = filepath.ToSlash(wd)

t.Logf("wd: %s", wd)
wd := wdForTest(t)

fsys := WrapWdFS(fstest.MapFS{
"tmp": {Mode: fs.ModeDir | 0o777},
Expand Down Expand Up @@ -90,84 +79,22 @@ func setupMergeFsys(ctx context.Context, t *testing.T) fs.FS {
return fsys
}

// func TestReadMerge(t *testing.T) {
// ctx := context.Background()

// jsonContent := `{"hello": "world"}`
// yamlContent := "hello: earth\ngoodnight: moon\n"
// arrayContent := `["hello", "world"]`

// mergedContent := "goodnight: moon\nhello: world\n"

// fsys := fstest.MapFS{}
// fsys["tmp"] = &fstest.MapFile{Mode: fs.ModeDir | 0777}
// fsys["tmp/jsonfile.json"] = &fstest.MapFile{Data: []byte(jsonContent)}
// fsys["tmp/array.json"] = &fstest.MapFile{Data: []byte(arrayContent)}
// fsys["tmp/yamlfile.yaml"] = &fstest.MapFile{Data: []byte(yamlContent)}
// fsys["tmp/textfile.txt"] = &fstest.MapFile{Data: []byte(`plain text...`)}

// // working dir with volume name trimmed
// wd, _ := os.Getwd()
// vol := filepath.VolumeName(wd)
// wd = wd[len(vol)+1:]

// fsys[path.Join(wd, "jsonfile.json")] = &fstest.MapFile{Data: []byte(jsonContent)}
// fsys[path.Join(wd, "array.json")] = &fstest.MapFile{Data: []byte(arrayContent)}
// fsys[path.Join(wd, "yamlfile.yaml")] = &fstest.MapFile{Data: []byte(yamlContent)}
// fsys[path.Join(wd, "textfile.txt")] = &fstest.MapFile{Data: []byte(`plain text...`)}

// fsmux := fsimpl.NewMux()
// fsmux.Add(fsimpl.WrappedFSProvider(&fsys, "file"))
// ctx = datafs.ContextWithFSProvider(ctx, fsmux)

// source := &Source{Alias: "foo", URL: mustParseURL("merge:file:///tmp/jsonfile.json|file:///tmp/yamlfile.yaml")}
// d := &Data{
// Sources: map[string]*Source{
// "foo": source,
// "bar": {Alias: "bar", URL: mustParseURL("file:///tmp/jsonfile.json")},
// "baz": {Alias: "baz", URL: mustParseURL("file:///tmp/yamlfile.yaml")},
// "text": {Alias: "text", URL: mustParseURL("file:///tmp/textfile.txt")},
// "badscheme": {Alias: "badscheme", URL: mustParseURL("bad:///scheme.json")},
// "badtype": {Alias: "badtype", URL: mustParseURL("file:///tmp/textfile.txt?type=foo/bar")},
// "array": {Alias: "array", URL: mustParseURL("file:///tmp/array.json?type=" + url.QueryEscape(jsonArrayMimetype))},
// },
// Ctx: ctx,
// }

// actual, err := d.readMerge(ctx, source)
// require.NoError(t, err)
// assert.Equal(t, mergedContent, string(actual))

// source.URL = mustParseURL("merge:bar|baz")
// actual, err = d.readMerge(ctx, source)
// require.NoError(t, err)
// assert.Equal(t, mergedContent, string(actual))

// source.URL = mustParseURL("merge:./jsonfile.json|baz")
// actual, err = d.readMerge(ctx, source)
// require.NoError(t, err)
// assert.Equal(t, mergedContent, string(actual))

// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json")
// _, err = d.readMerge(ctx, source)
// require.Error(t, err)

// source.URL = mustParseURL("merge:bogusalias|file:///tmp/jsonfile.json")
// _, err = d.readMerge(ctx, source)
// require.Error(t, err)

// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badscheme")
// _, err = d.readMerge(ctx, source)
// require.Error(t, err)

// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badtype")
// _, err = d.readMerge(ctx, source)
// require.Error(t, err)

// source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|array")
// _, err = d.readMerge(ctx, source)
// require.Error(t, err)
// }
// wdForTest returns the current working directory with any volume name and
// the leading path separator removed, using forward slashes only, so it can
// be used as a key prefix in an fstest.MapFS.
//
// The test is failed immediately if the working directory cannot be
// determined, rather than continuing with an empty path.
func wdForTest(t *testing.T) string {
	t.Helper()

	wd, err := os.Getwd()
	if err != nil {
		t.Fatalf("getwd: %v", err)
	}

	// MapFS doesn't support windows path separators, so we use / exclusively
	vol := filepath.VolumeName(wd)
	switch {
	case vol != "" && wd != vol:
		// drop the volume and the separator that follows it
		wd = wd[len(vol)+1:]
	case wd != "" && wd[0] == '/':
		// guard against an empty path before indexing
		wd = wd[1:]
	}

	return filepath.ToSlash(wd)
}

func TestMergeData(t *testing.T) {
def := map[string]interface{}{
Expand Down Expand Up @@ -228,7 +155,6 @@ func TestMergeData(t *testing.T) {
}

func TestMergeFS_Open(t *testing.T) {
// u, _ := url.Parse("merge:")
fsys := setupMergeFsys(context.Background(), t)
assert.IsType(t, &mergeFS{}, fsys)

Expand Down Expand Up @@ -354,3 +280,55 @@ func TestMergeFS_ReadFile(t *testing.T) {
})
}
}

// TestMergeFS_ReadsSubFilesOnce verifies that reading a merged datasource
// succeeds when the "file" scheme is backed by a filesystem that rejects any
// repeated Open of the same name, i.e. that each merge part is opened at
// most once.
func TestMergeFS_ReadsSubFilesOnce(t *testing.T) {
	const (
		jsonContent = `{"hello": "world"}`
		yamlContent = "hello: earth\ngoodnight: moon\n"
		want        = "goodnight: moon\nhello: world\n"
	)

	wd := wdForTest(t)

	// the backing files live under the working directory
	files := fstest.MapFS{
		path.Join(wd, "tmp/jsonfile.json"): {Data: []byte(jsonContent)},
		path.Join(wd, "tmp/yamlfile.yaml"): {Data: []byte(yamlContent)},
	}
	fileFS := WrapWdFS(openOnce(&files))

	mux := fsimpl.NewMux()
	mux.Add(MergeFS)
	mux.Add(WrappedFSProvider(fileFS, "file", ""))

	ctx := ContextWithFSProvider(context.Background(), mux)

	// register both parts under the aliases used in the merge path below
	reg := NewRegistry()
	for _, ds := range []struct{ alias, loc string }{
		{"jsonfile", "tmp/jsonfile.json"},
		{"yamlfile", "tmp/yamlfile.yaml"},
	} {
		reg.Register(ds.alias, config.DataSource{URL: mustParseURL(ds.loc)})
	}

	mergeFsys, err := NewMergeFS(mustParseURL("merge:///"))
	require.NoError(t, err)

	readFS := fsimpl.WithContextFS(ctx, WithDataSourceRegistryFS(reg, mergeFsys))

	got, err := fs.ReadFile(readFS, "jsonfile|yamlfile")
	require.NoError(t, err)
	assert.Equal(t, want, string(got))
}

// openOnceFS is a filesystem that only allows opening or stat-ing a file
// once. It implements only Open, so helpers such as fs.Stat and fs.ReadFile
// reach the wrapped filesystem through Open and are counted as well.
//
// It is not safe for concurrent use.
type openOnceFS struct {
	fs *fstest.MapFS
	// opened records every name that has been passed to Open
	opened map[string]struct{}
}

// openOnce wraps fsys so that each name can be opened at most once.
func openOnce(fsys *fstest.MapFS) fs.FS {
	return &openOnceFS{fs: fsys, opened: map[string]struct{}{}}
}

// Open opens the named file from the wrapped filesystem the first time it is
// requested. Any later request for the same name fails with a *fs.PathError,
// as the fs.FS contract asks of Open implementations. A name counts as
// opened even if the wrapped filesystem fails to open it.
func (f *openOnceFS) Open(name string) (fs.File, error) {
	if _, ok := f.opened[name]; ok {
		return nil, &fs.PathError{
			Op: "open", Path: name,
			Err: fmt.Errorf("already opened"),
		}
	}

	f.opened[name] = struct{}{}

	return f.fs.Open(name)
}
Loading
Loading