diff --git a/internal/datafs/mergefs.go b/internal/datafs/mergefs.go
index b056aff75..dd74eb679 100644
--- a/internal/datafs/mergefs.go
+++ b/internal/datafs/mergefs.go
@@ -122,8 +122,8 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
 	// unescaped '+' characters to make it simpler to provide types like
 	// "application/array+json"
 	overrideType := typeOverrideParam()
-	mimeType := u.Query().Get(overrideType)
-	mimeType = strings.ReplaceAll(mimeType, " ", "+")
+	mimeTypeHint := u.Query().Get(overrideType)
+	mimeTypeHint = strings.ReplaceAll(mimeTypeHint, " ", "+")
 
 	// now that we have the hint, remove it from the URL - we can't have it
 	// leaking into the filesystem layer
@@ -151,23 +151,6 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
 
 		fsys = fsimpl.WithHTTPClientFS(f.httpClient, fsys)
 
-		// find the content type
-		fi, err := fs.Stat(fsys, base)
-		if err != nil {
-			return nil, &fs.PathError{
-				Op: "open", Path: name,
-				Err: fmt.Errorf("stat merge part %q: %w", part, err),
-			}
-		}
-
-		if fi.ModTime().After(modTime) {
-			modTime = fi.ModTime()
-		}
-
-		if mimeType == "" {
-			mimeType = fsimpl.ContentType(fi)
-		}
-
 		f, err := fsys.Open(base)
 		if err != nil {
 			return nil, &fs.PathError{
@@ -176,7 +159,7 @@ func (f *mergeFS) Open(name string) (fs.File, error) {
 			}
 		}
 
-		subFiles[i] = subFile{f, mimeType}
+		subFiles[i] = subFile{f, mimeTypeHint}
 	}
 
 	return &mergeFile{
@@ -226,18 +209,16 @@ func (f *mergeFile) Read(p []byte) (int, error) {
 	if f.merged == nil {
 		f.readMux.Lock()
 		defer f.readMux.Unlock()
+
 		// read from all and merge
-		data := make([]map[string]interface{}, len(f.subFiles))
+		data := make([]map[string]any, len(f.subFiles))
 		for i, sf := range f.subFiles {
-			b, err := io.ReadAll(sf)
-			if err != nil && !errors.Is(err, io.EOF) {
-				return 0, fmt.Errorf("readAll: %w", err)
-			}
-
-			data[i], err = parseMap(sf.contentType, string(b))
+			d, err := f.readSubFile(sf)
 			if err != nil {
-				return 0, fmt.Errorf("parsing map with content type %s: %w", sf.contentType, err)
+				return 0, fmt.Errorf("readSubFile: %w", err)
 			}
+
+			data[i] = d
 		}
 
 		md, err := mergeData(data)
@@ -253,6 +234,36 @@ func (f *mergeFile) Read(p []byte) (int, error) {
 	return f.merged.Read(p)
 }
 
+func (f *mergeFile) readSubFile(sf subFile) (map[string]any, error) {
+	// stat for content type and modTime
+	fi, err := sf.Stat()
+	if err != nil {
+		return nil, fmt.Errorf("stat merge part %q: %w", f.name, err)
+	}
+
+	// the merged file's modTime is the most recent of all the sub-files
+	if fi.ModTime().After(f.modTime) {
+		f.modTime = fi.ModTime()
+	}
+
+	// if we haven't been given a content type hint, guess the normal way
+	if sf.contentType == "" {
+		sf.contentType = fsimpl.ContentType(fi)
+	}
+
+	b, err := io.ReadAll(sf)
+	if err != nil && !errors.Is(err, io.EOF) {
+		return nil, fmt.Errorf("readAll: %w", err)
+	}
+
+	sfData, err := parseMap(sf.contentType, string(b))
+	if err != nil {
+		return nil, fmt.Errorf("parsing map with content type %s: %w", sf.contentType, err)
+	}
+
+	return sfData, nil
+}
+
 func mergeData(data []map[string]interface{}) ([]byte, error) {
 	dst := data[0]
 	data = data[1:]
@@ -269,17 +280,19 @@ func mergeData(data []map[string]interface{}) ([]byte, error) {
 	return []byte(s), nil
 }
 
-func parseMap(mimeType, data string) (map[string]interface{}, error) {
+func parseMap(mimeType, data string) (map[string]any, error) {
 	datum, err := parsers.ParseData(mimeType, data)
 	if err != nil {
 		return nil, fmt.Errorf("parseData: %w", err)
 	}
-	var m map[string]interface{}
+
+	var m map[string]any
 	switch datum := datum.(type) {
-	case map[string]interface{}:
+	case map[string]any:
 		m = datum
 	default:
 		return nil, fmt.Errorf("unexpected data type '%T' for datasource (type %s); merge: can only merge maps", datum, mimeType)
 	}
+
 	return m, nil
 }
diff --git a/internal/datafs/mergefs_test.go b/internal/datafs/mergefs_test.go
index 17ffa716e..b332722bc 100644
--- a/internal/datafs/mergefs_test.go
+++ b/internal/datafs/mergefs_test.go
@@ -2,6 +2,7 @@ package datafs
 
 import (
 	"context"
+	"fmt"
 	"io"
 	"io/fs"
 	"mime"
@@ -31,19 +32,7 @@ func setupMergeFsys(ctx context.Context, t *testing.T) fs.FS {
 	yamlContent := "hello: earth\ngoodnight: moon\n"
 	arrayContent := `["hello", "world"]`
 
-	wd, _ := os.Getwd()
-
-	// MapFS doesn't support windows path separators, so we use / exclusively
-	// in this test
-	vol := filepath.VolumeName(wd)
-	if vol != "" && wd != vol {
-		wd = wd[len(vol)+1:]
-	} else if wd[0] == '/' {
-		wd = wd[1:]
-	}
-	wd = filepath.ToSlash(wd)
-
-	t.Logf("wd: %s", wd)
+	wd := wdForTest(t)
 
 	fsys := WrapWdFS(fstest.MapFS{
 		"tmp": {Mode: fs.ModeDir | 0o777},
@@ -90,84 +79,22 @@
 	return fsys
 }
 
-// func TestReadMerge(t *testing.T) {
-// 	ctx := context.Background()
-
-// 	jsonContent := `{"hello": "world"}`
-// 	yamlContent := "hello: earth\ngoodnight: moon\n"
-// 	arrayContent := `["hello", "world"]`
-
-// 	mergedContent := "goodnight: moon\nhello: world\n"
-
-// 	fsys := fstest.MapFS{}
-// 	fsys["tmp"] = &fstest.MapFile{Mode: fs.ModeDir | 0777}
-// 	fsys["tmp/jsonfile.json"] = &fstest.MapFile{Data: []byte(jsonContent)}
-// 	fsys["tmp/array.json"] = &fstest.MapFile{Data: []byte(arrayContent)}
-// 	fsys["tmp/yamlfile.yaml"] = &fstest.MapFile{Data: []byte(yamlContent)}
-// 	fsys["tmp/textfile.txt"] = &fstest.MapFile{Data: []byte(`plain text...`)}
-
-// 	// workding dir with volume name trimmed
-// 	wd, _ := os.Getwd()
-// 	vol := filepath.VolumeName(wd)
-// 	wd = wd[len(vol)+1:]
-
-// 	fsys[path.Join(wd, "jsonfile.json")] = &fstest.MapFile{Data: []byte(jsonContent)}
-// 	fsys[path.Join(wd, "array.json")] = &fstest.MapFile{Data: []byte(arrayContent)}
-// 	fsys[path.Join(wd, "yamlfile.yaml")] = &fstest.MapFile{Data: []byte(yamlContent)}
-// 	fsys[path.Join(wd, "textfile.txt")] = &fstest.MapFile{Data: []byte(`plain text...`)}
-
-// 	fsmux := fsimpl.NewMux()
-// 	fsmux.Add(fsimpl.WrappedFSProvider(&fsys, "file"))
-// 	ctx = datafs.ContextWithFSProvider(ctx, fsmux)
-
-// 	source := &Source{Alias: "foo", URL: mustParseURL("merge:file:///tmp/jsonfile.json|file:///tmp/yamlfile.yaml")}
-// 	d := &Data{
-// 		Sources: map[string]*Source{
-// 			"foo": source,
-// 			"bar": {Alias: "bar", URL: mustParseURL("file:///tmp/jsonfile.json")},
-// 			"baz": {Alias: "baz", URL: mustParseURL("file:///tmp/yamlfile.yaml")},
-// 			"text": {Alias: "text", URL: mustParseURL("file:///tmp/textfile.txt")},
-// 			"badscheme": {Alias: "badscheme", URL: mustParseURL("bad:///scheme.json")},
-// 			"badtype": {Alias: "badtype", URL: mustParseURL("file:///tmp/textfile.txt?type=foo/bar")},
-// 			"array": {Alias: "array", URL: mustParseURL("file:///tmp/array.json?type=" + url.QueryEscape(jsonArrayMimetype))},
-// 		},
-// 		Ctx: ctx,
-// 	}
-
-// 	actual, err := d.readMerge(ctx, source)
-// 	require.NoError(t, err)
-// 	assert.Equal(t, mergedContent, string(actual))
-
-// 	source.URL = mustParseURL("merge:bar|baz")
-// 	actual, err = d.readMerge(ctx, source)
-// 	require.NoError(t, err)
-// 	assert.Equal(t, mergedContent, string(actual))
-
-// 	source.URL = mustParseURL("merge:./jsonfile.json|baz")
-// 	actual, err = d.readMerge(ctx, source)
-// 	require.NoError(t, err)
-// 	assert.Equal(t, mergedContent, string(actual))
-
-// 	source.URL = mustParseURL("merge:file:///tmp/jsonfile.json")
-// 	_, err = d.readMerge(ctx, source)
-// 	require.Error(t, err)
-
-// 	source.URL = mustParseURL("merge:bogusalias|file:///tmp/jsonfile.json")
-// 	_, err = d.readMerge(ctx, source)
-// 	require.Error(t, err)
-
-// 	source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badscheme")
-// 	_, err = d.readMerge(ctx, source)
-// 	require.Error(t, err)
-
-// 	source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|badtype")
-// 	_, err = d.readMerge(ctx, source)
-// 	require.Error(t, err)
-
-// 	source.URL = mustParseURL("merge:file:///tmp/jsonfile.json|array")
-// 	_, err = d.readMerge(ctx, source)
-// 	require.Error(t, err)
-// }
+func wdForTest(t *testing.T) string {
+	t.Helper()
+
+	wd, _ := os.Getwd()
+
+	// MapFS doesn't support windows path separators, so we use / exclusively
+	vol := filepath.VolumeName(wd)
+	if vol != "" && wd != vol {
+		wd = wd[len(vol)+1:]
+	} else if wd[0] == '/' {
+		wd = wd[1:]
+	}
+	wd = filepath.ToSlash(wd)
+
+	return wd
+}
 
 func TestMergeData(t *testing.T) {
 	def := map[string]interface{}{
@@ -228,7 +155,6 @@
 }
 
 func TestMergeFS_Open(t *testing.T) {
-	// u, _ := url.Parse("merge:")
 	fsys := setupMergeFsys(context.Background(), t)
 	assert.IsType(t, &mergeFS{}, fsys)
 
@@ -354,3 +280,55 @@
 		})
 	}
 }
+
+func TestMergeFS_ReadsSubFilesOnce(t *testing.T) {
+	mergedContent := "goodnight: moon\nhello: world\n"
+
+	wd := wdForTest(t)
+
+	fsys := WrapWdFS(
+		openOnce(&fstest.MapFS{
+			path.Join(wd, "tmp/jsonfile.json"): {Data: []byte(`{"hello": "world"}`)},
+			path.Join(wd, "tmp/yamlfile.yaml"): {Data: []byte("hello: earth\ngoodnight: moon\n")},
+		}))
+
+	mux := fsimpl.NewMux()
+	mux.Add(MergeFS)
+	mux.Add(WrappedFSProvider(fsys, "file", ""))
+
+	ctx := ContextWithFSProvider(context.Background(), mux)
+
+	reg := NewRegistry()
+	reg.Register("jsonfile", config.DataSource{URL: mustParseURL("tmp/jsonfile.json")})
+	reg.Register("yamlfile", config.DataSource{URL: mustParseURL("tmp/yamlfile.yaml")})
+
+	fsys, err := NewMergeFS(mustParseURL("merge:///"))
+	require.NoError(t, err)
+
+	fsys = WithDataSourceRegistryFS(reg, fsys)
+	fsys = fsimpl.WithContextFS(ctx, fsys)
+
+	b, err := fs.ReadFile(fsys, "jsonfile|yamlfile")
+	require.NoError(t, err)
+	assert.Equal(t, mergedContent, string(b))
+}
+
+type openOnceFS struct {
+	fs     *fstest.MapFS
+	opened map[string]struct{}
+}
+
+// a filesystem that only allows opening or stating a file once
+func openOnce(fsys *fstest.MapFS) fs.FS {
+	return &openOnceFS{fs: fsys, opened: map[string]struct{}{}}
+}
+
+func (f *openOnceFS) Open(name string) (fs.File, error) {
+	if _, ok := f.opened[name]; ok {
+		return nil, fmt.Errorf("open: %q already opened", name)
+	}
+
+	f.opened[name] = struct{}{}
+
+	return f.fs.Open(name)
+}
diff --git a/internal/tests/integration/datasources_merge_test.go b/internal/tests/integration/datasources_merge_test.go
index 4317ef80f..96d9c6219 100644
--- a/internal/tests/integration/datasources_merge_test.go
+++ b/internal/tests/integration/datasources_merge_test.go
@@ -35,62 +35,86 @@ func setupDatasourcesMergeTest(t *testing.T) (*fs.Dir, *httptest.Server) {
 func TestDatasources_Merge(t *testing.T) {
 	tmpDir, srv := setupDatasourcesMergeTest(t)
 
-	o, e, err := cmd(t,
-		"-d", "user="+tmpDir.Join("config.json"),
-		"-d", "default="+tmpDir.Join("default.yml"),
-		"-d", "config=merge:user|default",
-		"-i", `{{ ds "config" | toJSON }}`,
-	).run()
-	assertSuccess(t, o, e, err, `{"foo":{"bar":"baz"},"isDefault":false,"isOverride":true,"other":true}`)
-
-	o, e, err = cmd(t,
-		"-d", "default="+tmpDir.Join("default.yml"),
-		"-d", "config=merge:user|default",
-		"-i", `{{ defineDatasource "user" `+"`"+tmpDir.Join("config.json")+"`"+` }}{{ ds "config" | toJSON }}`,
-	).run()
-	assertSuccess(t, o, e, err, `{"foo":{"bar":"baz"},"isDefault":false,"isOverride":true,"other":true}`)
-
-	o, e, err = cmd(t,
-		"-d", "default="+tmpDir.Join("default.yml"),
-		"-d", "config=merge:"+srv.URL+"/foo.json|default",
-		"-i", `{{ ds "config" | toJSON }}`,
-	).run()
-	assertSuccess(t, o, e, err, `{"foo":"bar","isDefault":true,"isOverride":false,"other":true}`)
-
-	o, e, err = cmd(t,
-		"-c", "merged=merge:"+srv.URL+"/2.env|"+srv.URL+"/1.env",
-		"-i", `FOO is {{ .merged.FOO }}`,
-	).run()
-	assertSuccess(t, o, e, err, `FOO is 3`)
-
-	o, e, err = cmd(t,
-		"-c", "default="+tmpDir.Join("default.yml"),
-		"-i", `{{ defineDatasource "merged" "merge:`+srv.URL+`/foo.json|default" -}}
+	t.Run("from two aliased datasources", func(t *testing.T) {
+		o, e, err := cmd(t,
+			"-d", "user="+tmpDir.Join("config.json"),
+			"-d", "default="+tmpDir.Join("default.yml"),
+			"-d", "config=merge:user|default",
+			"-i", `{{ ds "config" | toJSON }}`,
+		).run()
+		assertSuccess(t, o, e, err, `{"foo":{"bar":"baz"},"isDefault":false,"isOverride":true,"other":true}`)
+	})
+
+	t.Run("with dynamic datasource", func(t *testing.T) {
+		o, e, err := cmd(t,
+			"-d", "default="+tmpDir.Join("default.yml"),
+			"-d", "config=merge:user|default",
+			"-i", `{{ defineDatasource "user" `+"`"+tmpDir.Join("config.json")+"`"+` }}{{ ds "config" | toJSON }}`,
+		).run()
+		assertSuccess(t, o, e, err, `{"foo":{"bar":"baz"},"isDefault":false,"isOverride":true,"other":true}`)
+	})
+
+	t.Run("with inline datasource", func(t *testing.T) {
+		o, e, err := cmd(t,
+			"-d", "default="+tmpDir.Join("default.yml"),
+			"-d", "config=merge:"+srv.URL+"/foo.json|default",
+			"-i", `{{ ds "config" | toJSON }}`,
+		).run()
+		assertSuccess(t, o, e, err, `{"foo":"bar","isDefault":true,"isOverride":false,"other":true}`)
+	})
+
+	t.Run("with two inline env datasources", func(t *testing.T) {
+		o, e, err := cmd(t,
+			"-c", "merged=merge:"+srv.URL+"/2.env|"+srv.URL+"/1.env",
+			"-i", `FOO is {{ .merged.FOO }}`,
+		).run()
+		assertSuccess(t, o, e, err, `FOO is 3`)
+	})
+
+	t.Run("inline merge with inline datasource", func(t *testing.T) {
+		o, e, err := cmd(t,
+			"-c", "default="+tmpDir.Join("default.yml"),
+			"-i", `{{ defineDatasource "merged" "merge:`+srv.URL+`/foo.json|default" -}}
 {{ ds "merged" | toJSON }}`,
-	).run()
-	assertSuccess(t, o, e, err, `{"foo":"bar","isDefault":true,"isOverride":false,"other":true}`)
-
-	o, e, err = cmd(t,
-		"-d", "default="+tmpDir.Join("default.yml"),
-		"-d", "wrongtype="+srv.URL+"/wrongtype.txt?type=application/json",
-		"-d", "config=merge:wrongtype|default",
-		"-i", `{{ ds "config" | toJSON }}`,
-	).run()
-	assertSuccess(t, o, e, err, `{"foo":"bar","isDefault":true,"isOverride":false,"other":true}`)
-
-	o, e, err = cmd(t,
-		"-d", "default="+tmpDir.Join("default.yml"),
-		"-d", "wrongtype="+srv.URL+"/wrongtype.txt?_=application/json",
-		"-d", "config=merge:wrongtype|default",
-		"-i", `{{ ds "config" | toJSON }}`,
-	).withEnv("GOMPLATE_TYPE_PARAM", "_").run()
-	assertSuccess(t, o, e, err, `{"foo":"bar","isDefault":true,"isOverride":false,"other":true}`)
-
-	o, e, err = cmd(t,
-		"-c", "default="+tmpDir.Join("default.yml"),
-		"-c", "params="+srv.URL+"/params?foo=bar&type=http&_type=application/json",
-		"-c", "merged=merge:params|default",
"merged=merge:params|default", - "-i", `{{ .merged | toJSON }}`, - ).withEnv("GOMPLATE_TYPE_PARAM", "_type").run() - assertSuccess(t, o, e, err, `{"foo":["bar"],"isDefault":true,"isOverride":false,"other":true,"type":["http"]}`) + ).run() + assertSuccess(t, o, e, err, `{"foo":"bar","isDefault":true,"isOverride":false,"other":true}`) + }) + + t.Run("with overridden type", func(t *testing.T) { + o, e, err := cmd(t, + "-d", "default="+tmpDir.Join("default.yml"), + "-d", "wrongtype="+srv.URL+"/wrongtype.txt?type=application/json", + "-d", "config=merge:wrongtype|default", + "-i", `{{ ds "config" | toJSON }}`, + ).run() + assertSuccess(t, o, e, err, `{"foo":"bar","isDefault":true,"isOverride":false,"other":true}`) + }) + + t.Run("type overridden by env var", func(t *testing.T) { + o, e, err := cmd(t, + "-d", "default="+tmpDir.Join("default.yml"), + "-d", "wrongtype="+srv.URL+"/wrongtype.txt?_=application/json", + "-d", "config=merge:wrongtype|default", + "-i", `{{ ds "config" | toJSON }}`, + ).withEnv("GOMPLATE_TYPE_PARAM", "_").run() + assertSuccess(t, o, e, err, `{"foo":"bar","isDefault":true,"isOverride":false,"other":true}`) + + o, e, err = cmd(t, + "-c", "default="+tmpDir.Join("default.yml"), + "-c", "params="+srv.URL+"/params?foo=bar&type=http&_type=application/json", + "-c", "merged=merge:params|default", + "-i", `{{ .merged | toJSON }}`, + ).withEnv("GOMPLATE_TYPE_PARAM", "_type").run() + assertSuccess(t, o, e, err, `{"foo":["bar"],"isDefault":true,"isOverride":false,"other":true,"type":["http"]}`) + }) + + t.Run("from stdin", func(t *testing.T) { + o, e, err := cmd(t, + "-d", "stdindata=stdin:///in.json", + "-d", "filedata="+srv.URL+"/foo.json", + "-d", "merged=merge:stdindata|filedata", + "-i", `{{ ds "merged" | toJSON }}`, + ).withStdin(`{"baz": "qux"}`).run() + assertSuccess(t, o, e, err, `{"baz":"qux","foo":"bar"}`) + }) }