Skip to content

Commit

Permalink
Support Sep (%2F) workaround in Schema Registry Processors (#2589)
Browse files Browse the repository at this point in the history
* Looks like working, but need cleanup

Signed-off-by: Gilad Leifman <[email protected]>

* Added tests, moved file

Signed-off-by: Gilad Leifman <[email protected]>

* Fix lints

* Rebase changes and update CHANGELOG

* Rebase changes and update CHANGELOG

* Remove unused test config file

---------

Signed-off-by: Gilad Leifman <[email protected]>
Co-authored-by: Ashley Jeffs <[email protected]>
  • Loading branch information
Mizaro and Jeffail authored Sep 11, 2024
1 parent d926fab commit 0704c50
Show file tree
Hide file tree
Showing 5 changed files with 151 additions and 5 deletions.
6 changes: 5 additions & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,12 +3,16 @@ Changelog

All notable changes to this project will be documented in this file.

## 4.36.0 - TBD
## 4.36.0 - 2024-09-11

### Added

- Fields `replication_factor` and `replication_factor_override` added to the `kafka_migrator` input and output. (@mihaitodor)

### Fixed

- The `schema_registry_encode` and `schema_registry_decode` processors no longer unescape path separators in the schema name. (@Mizaro)

## 4.35.1 - 2024-09-06

### Added
Expand Down
128 changes: 128 additions & 0 deletions internal/impl/confluent/client_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,128 @@
// Copyright 2024 Redpanda Data, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package confluent

import (
"context"
"encoding/json"
"errors"
"fmt"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/redpanda-data/benthos/v4/public/service"

"github.com/redpanda-data/connect/v4/internal/impl/confluent/sr"
)

func TestSchemaRegistryClient_GetSchemaBySubjectAndVersion(t *testing.T) {
ctx := context.Background()
fooFirst, err := json.Marshal(struct {
Schema string `json:"schema"`
ID int `json:"id"`
}{
Schema: testSchema,
ID: 3,
})
require.NoError(t, err)

version := 4

type args struct {
subject string
version *int
}
tests := []struct {
name string
schemaRegistryServerURL string
args args
wantResPayload sr.SchemaInfo
wantErr assert.ErrorAssertionFunc
}{
{
name: "sanity",
schemaRegistryServerURL: "/subjects/foo/versions/latest",
args: args{
subject: "foo",
version: nil,
},
wantResPayload: sr.SchemaInfo{
ID: 3,
Schema: testSchema,
},
wantErr: assert.NoError,
},
{
name: "contains sep (%2F)",
schemaRegistryServerURL: "/subjects/main%2Fcommon/versions/latest",
args: args{
subject: "main/common",
version: nil,
},
wantResPayload: sr.SchemaInfo{
ID: 3,
Schema: testSchema,
},
wantErr: assert.NoError,
},
{
name: "sanity with version",
schemaRegistryServerURL: "/subjects/foo/versions/4",
args: args{
subject: "foo",
version: &version,
},
wantResPayload: sr.SchemaInfo{
ID: 3,
Schema: testSchema,
},
wantErr: assert.NoError,
},
{
name: "contains sep (%2F) with version",
schemaRegistryServerURL: "/subjects/main%2Fcommon/versions/4",
args: args{
subject: "main/common",
version: &version,
},
wantResPayload: sr.SchemaInfo{
ID: 3,
Schema: testSchema,
},
wantErr: assert.NoError,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) {
if path == tt.schemaRegistryServerURL {
return fooFirst, nil
}
return nil, errors.New("nope")
})

c, err := sr.NewClient(urlStr, noopReqSign, nil, service.MockResources())
require.NoError(t, err)

gotResPayload, err := c.GetSchemaBySubjectAndVersion(ctx, tt.args.subject, tt.args.version)
if !tt.wantErr(t, err, fmt.Sprintf("GetSchemaBySubjectAndVersion(%v, %v, %v)", ctx, tt.args.subject, tt.args.version)) {
return
}
assert.Equalf(t, tt.wantResPayload, gotResPayload, "GetSchemaBySubjectAndVersion(%v, %v, %v)", ctx, tt.args.subject, tt.args.version)
})
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,7 @@ func runSchemaRegistryServer(t testing.TB, fn func(path string) ([]byte, error))
reqMut.Lock()
defer reqMut.Unlock()

b, err := fn(r.URL.Path)
b, err := fn(r.URL.EscapedPath())
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -134,13 +134,13 @@ func TestSchemaRegistryEncodeAvro(t *testing.T) {
require.NoError(t, err)

urlStr := runSchemaRegistryServer(t, func(path string) ([]byte, error) {
if path == "/subjects/foo/versions/latest" {
if path == "/subjects/foo%2Fbar/versions/latest" {
return fooFirst, nil
}
return nil, errors.New("nope")
})

subj, err := service.NewInterpolatedString("foo")
subj, err := service.NewInterpolatedString("foo/bar")
require.NoError(t, err)

encoder, err := newSchemaRegistryEncoder(urlStr, noopReqSign, nil, subj, false, time.Minute*10, time.Minute, service.MockResources())
Expand Down
16 changes: 15 additions & 1 deletion internal/impl/confluent/sr/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,15 @@ import (
"io/fs"
"net/http"
"net/url"
"regexp"

"github.com/redpanda-data/benthos/v4/public/service"
)

var (
escapedSepRegexp = regexp.MustCompile("(?i)%2F")
)

// Client is used to make requests to a schema registry.
type Client struct {
SchemaRegistryBaseURL *url.URL
Expand Down Expand Up @@ -194,8 +199,17 @@ func (c *Client) doRequest(ctx context.Context, verb, reqPath string) (resCode i
return
}

reqURLString := reqURL.String()
if match := escapedSepRegexp.MatchString(reqPath); match {
// Supporting '%2f' in the request url bypassing
// Workaround for Golang issue https://github.com/golang/go/issues/3659
if reqURLString, err = url.PathUnescape(reqURLString); err != nil {
return
}
}

var req *http.Request
if req, err = http.NewRequestWithContext(ctx, verb, reqURL.String(), http.NoBody); err != nil {
if req, err = http.NewRequestWithContext(ctx, verb, reqURLString, http.NoBody); err != nil {
return
}
req.Header.Add("Accept", "application/vnd.schemaregistry.v1+json")
Expand Down

0 comments on commit 0704c50

Please sign in to comment.