Skip to content

Commit

Permalink
Merge pull request #2949 from redpanda-data/mihaitodor-add-schema-id-…
Browse files Browse the repository at this point in the history
…translation

Add schema id translation
  • Loading branch information
mihaitodor authored Nov 19, 2024
2 parents 5f28795 + 6bbdc41 commit 7dd7f1b
Show file tree
Hide file tree
Showing 27 changed files with 1,177 additions and 713 deletions.
4 changes: 4 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,14 @@ All notable changes to this project will be documented in this file.
### Added

- New `pg_stream` input supporting change data capture (CDC) from PostgreSQL (@le-vlad)
- Field `metadata_max_age` added to the `redpanda_migrator_offsets` output. (@mihaitodor)

### Changed

- `snowflake_streaming` with `schema_evolution.enabled` set to true can now autocreate tables.
- Fields `translate_schema_ids` and `schema_registry_output_resource` added to the `redpanda_migrator` output. (@mihaitodor)
- Fields `backfill_dependencies` and `input_resource` added to the `schema_registry` output. (@mihaitodor)
- The `schema_registry` input and output and the `schema_registry_encode` and `schema_registry_decode` processors now use the `github.com/twmb/franz-go/pkg/sr` SchemaRegistry client. (@mihaitodor)

## 4.39.0 - 2024-11-07

Expand Down
20 changes: 20 additions & 0 deletions docs/modules/components/pages/outputs/redpanda_migrator.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,8 @@ output:
input_resource: redpanda_migrator_input
replication_factor_override: true
replication_factor: 3
translate_schema_ids: true
schema_registry_output_resource: schema_registry_output
partitioner: "" # No default (optional)
idempotent_write: true
compression: "" # No default (optional)
Expand Down Expand Up @@ -770,6 +772,24 @@ Replication factor for created topics. This is only used when `replication_facto
*Default*: `3`
=== `translate_schema_ids`
Translate schema IDs.
*Type*: `bool`
*Default*: `true`
=== `schema_registry_output_resource`
The label of the schema_registry output to use for fetching schema IDs.
*Type*: `string`
*Default*: `"schema_registry_output"`
=== `partitioner`
Override the default murmur2 hashing partitioner.
Expand Down
144 changes: 77 additions & 67 deletions docs/modules/components/pages/outputs/redpanda_migrator_offsets.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -55,12 +55,7 @@ output:
label: ""
redpanda_migrator_offsets:
seed_brokers: [] # No default (required)
kafka_key: ${! @kafka_key }
client_id: benthos
max_in_flight: 1
timeout: 10s
max_message_bytes: 1MB
broker_write_max_bytes: 100MB
tls:
enabled: false
skip_cert_verify: false
Expand All @@ -69,6 +64,12 @@ output:
root_cas_file: ""
client_certs: []
sasl: [] # No default (optional)
metadata_max_age: 5m
kafka_key: ${! @kafka_key }
max_in_flight: 1
timeout: 10s
max_message_bytes: 1MB
broker_write_max_bytes: 100MB
max_retries: 0
backoff:
initial_interval: 1s
Expand Down Expand Up @@ -105,16 +106,6 @@ seed_brokers:
- foo:9092,bar:9092
```
=== `kafka_key`
Kafka key.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
*Default*: `"${! @kafka_key }"`
=== `client_id`
An identifier for the client connection.
Expand All @@ -124,58 +115,6 @@ An identifier for the client connection.
*Default*: `"benthos"`
=== `max_in_flight`
The maximum number of batches to be sending in parallel at any given time.
*Type*: `int`
*Default*: `1`
=== `timeout`
The maximum period of time to wait for message sends before abandoning the request and retrying
*Type*: `string`
*Default*: `"10s"`
=== `max_message_bytes`
The maximum space in bytes than an individual message may take, messages larger than this value will be rejected. This field corresponds to Kafka's `max.message.bytes`.
*Type*: `string`
*Default*: `"1MB"`
```yml
# Examples
max_message_bytes: 100MB
max_message_bytes: 50mib
```
=== `broker_write_max_bytes`
The upper bound for the number of bytes written to a broker connection in a single write. This field corresponds to Kafka's `socket.request.max.bytes`.
*Type*: `string`
*Default*: `"100MB"`
```yml
# Examples
broker_write_max_bytes: 128MB
broker_write_max_bytes: 50mib
```
=== `tls`
Custom TLS settings can be used to override system defaults.
Expand Down Expand Up @@ -522,6 +461,77 @@ An external ID to provide when assuming a role.
*Default*: `""`
=== `metadata_max_age`
The maximum age of metadata before it is refreshed.
*Type*: `string`
*Default*: `"5m"`
=== `kafka_key`
Kafka key.
This field supports xref:configuration:interpolation.adoc#bloblang-queries[interpolation functions].
*Type*: `string`
*Default*: `"${! @kafka_key }"`
=== `max_in_flight`
The maximum number of batches to be sending in parallel at any given time.
*Type*: `int`
*Default*: `1`
=== `timeout`
The maximum period of time to wait for message sends before abandoning the request and retrying
*Type*: `string`
*Default*: `"10s"`
=== `max_message_bytes`
The maximum space in bytes than an individual message may take, messages larger than this value will be rejected. This field corresponds to Kafka's `max.message.bytes`.
*Type*: `string`
*Default*: `"1MB"`
```yml
# Examples
max_message_bytes: 100MB
max_message_bytes: 50mib
```
=== `broker_write_max_bytes`
The upper bound for the number of bytes written to a broker connection in a single write. This field corresponds to Kafka's `socket.request.max.bytes`.
*Type*: `string`
*Default*: `"100MB"`
```yml
# Examples
broker_write_max_bytes: 128MB
broker_write_max_bytes: 50mib
```
=== `max_retries`
The maximum number of retries before giving up on the request. If set to zero there is no discrete limit.
Expand Down
20 changes: 20 additions & 0 deletions docs/modules/components/pages/outputs/schema_registry.adoc
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,8 @@ output:
schema_registry:
url: "" # No default (required)
subject: "" # No default (required)
backfill_dependencies: true
input_resource: schema_registry_input
tls:
enabled: false
skip_cert_verify: false
Expand Down Expand Up @@ -141,6 +143,24 @@ This field supports xref:configuration:interpolation.adoc#bloblang-queries[inter
*Type*: `string`
=== `backfill_dependencies`
Backfill schema references and previous versions.
*Type*: `bool`
*Default*: `true`
=== `input_resource`
The label of the schema_registry input from which to read source schemas.
*Type*: `string`
*Default*: `"schema_registry_input"`
=== `tls`
Custom TLS settings can be used to override system defaults.
Expand Down
1 change: 1 addition & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ require (
github.com/twmb/franz-go v1.17.1
github.com/twmb/franz-go/pkg/kadm v1.13.0
github.com/twmb/franz-go/pkg/kmsg v1.8.0
github.com/twmb/franz-go/pkg/sr v1.2.0
github.com/vmihailenco/msgpack/v5 v5.4.1
github.com/xdg-go/scram v1.1.2
github.com/xeipuuv/gojsonschema v1.2.0
Expand Down
2 changes: 2 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1178,6 +1178,8 @@ github.com/twmb/franz-go/pkg/kadm v1.13.0 h1:bJq4C2ZikUE2jh/wl9MtMTQ/kpmnBgVFh8X
github.com/twmb/franz-go/pkg/kadm v1.13.0/go.mod h1:VMvpfjz/szpH9WB+vGM+rteTzVv0djyHFimci9qm2C0=
github.com/twmb/franz-go/pkg/kmsg v1.8.0 h1:lAQB9Z3aMrIP9qF9288XcFf/ccaSxEitNA1CDTEIeTA=
github.com/twmb/franz-go/pkg/kmsg v1.8.0/go.mod h1:HzYEb8G3uu5XevZbtU0dVbkphaKTHk0X68N5ka4q6mU=
github.com/twmb/franz-go/pkg/sr v1.2.0 h1:zYr0Ly7KLFfeCGaSr8teN6LvAVeYVrZoUsyyPHTYB+M=
github.com/twmb/franz-go/pkg/sr v1.2.0/go.mod h1:gpd2Xl5/prkj3gyugcL+rVzagjaxFqMgvKMYcUlrpDw=
github.com/uptrace/bun v1.1.12 h1:sOjDVHxNTuM6dNGaba0wUuz7KvDE1BmNu9Gqs2gJSXQ=
github.com/uptrace/bun v1.1.12/go.mod h1:NPG6JGULBeQ9IU6yHp7YGELRa5Agmd7ATZdz4tGZ6z0=
github.com/uptrace/bun/dialect/pgdialect v1.1.12 h1:m/CM1UfOkoBTglGO5CUTKnIKKOApOYxkcP2qn0F9tJk=
Expand Down
4 changes: 2 additions & 2 deletions internal/impl/cohere/json_schema_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,12 +60,12 @@ func (p *dynamicSchemaProvider) GetJSONSchema(ctx context.Context) (jsonSchema,
if time.Now().Before(p.nextRefreshTime) {
return p.cached, nil
}
info, err := p.client.GetSchemaBySubjectAndVersion(ctx, p.subject, nil)
info, err := p.client.GetSchemaBySubjectAndVersion(ctx, p.subject, nil, false)
if err != nil {
return nil, fmt.Errorf("unable to load latest schema for subject %q: %w", p.subject, err)
}
var schema jsonSchema
if err := json.Unmarshal([]byte(info.Schema), &schema); err != nil {
if err := json.Unmarshal([]byte(info.Schema.Schema), &schema); err != nil {
return nil, fmt.Errorf("unable to parse json schema from schema with ID=%d", info.ID)
}
p.cached = schema
Expand Down
22 changes: 11 additions & 11 deletions internal/impl/confluent/client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
franz_sr "github.com/twmb/franz-go/pkg/sr"

"github.com/redpanda-data/benthos/v4/public/service"

Expand Down Expand Up @@ -50,7 +51,7 @@ func TestSchemaRegistryClient_GetSchemaBySubjectAndVersion(t *testing.T) {
name string
schemaRegistryServerURL string
args args
wantResPayload sr.SchemaInfo
wantResPayload franz_sr.SubjectSchema
wantErr assert.ErrorAssertionFunc
}{
{
Expand All @@ -60,9 +61,9 @@ func TestSchemaRegistryClient_GetSchemaBySubjectAndVersion(t *testing.T) {
subject: "foo",
version: nil,
},
wantResPayload: sr.SchemaInfo{
wantResPayload: franz_sr.SubjectSchema{
ID: 3,
Schema: testSchema,
Schema: franz_sr.Schema{Schema: testSchema},
},
wantErr: assert.NoError,
},
Expand All @@ -73,9 +74,9 @@ func TestSchemaRegistryClient_GetSchemaBySubjectAndVersion(t *testing.T) {
subject: "main/common",
version: nil,
},
wantResPayload: sr.SchemaInfo{
wantResPayload: franz_sr.SubjectSchema{
ID: 3,
Schema: testSchema,
Schema: franz_sr.Schema{Schema: testSchema},
},
wantErr: assert.NoError,
},
Expand All @@ -86,9 +87,9 @@ func TestSchemaRegistryClient_GetSchemaBySubjectAndVersion(t *testing.T) {
subject: "foo",
version: &version,
},
wantResPayload: sr.SchemaInfo{
wantResPayload: franz_sr.SubjectSchema{
ID: 3,
Schema: testSchema,
Schema: franz_sr.Schema{Schema: testSchema},
},
wantErr: assert.NoError,
},
Expand All @@ -99,9 +100,9 @@ func TestSchemaRegistryClient_GetSchemaBySubjectAndVersion(t *testing.T) {
subject: "main/common",
version: &version,
},
wantResPayload: sr.SchemaInfo{
wantResPayload: franz_sr.SubjectSchema{
ID: 3,
Schema: testSchema,
Schema: franz_sr.Schema{Schema: testSchema},
},
wantErr: assert.NoError,
},
Expand All @@ -114,11 +115,10 @@ func TestSchemaRegistryClient_GetSchemaBySubjectAndVersion(t *testing.T) {
}
return nil, errors.New("nope")
})

c, err := sr.NewClient(urlStr, noopReqSign, nil, service.MockResources())
require.NoError(t, err)

gotResPayload, err := c.GetSchemaBySubjectAndVersion(ctx, tt.args.subject, tt.args.version)
gotResPayload, err := c.GetSchemaBySubjectAndVersion(ctx, tt.args.subject, tt.args.version, false)
if !tt.wantErr(t, err, fmt.Sprintf("GetSchemaBySubjectAndVersion(%v, %v, %v)", ctx, tt.args.subject, tt.args.version)) {
return
}
Expand Down
Loading

0 comments on commit 7dd7f1b

Please sign in to comment.