PIP-144: Making SchemaRegistry implementation configurable #14102
base: master
conf/standalone.conf
Outdated
@@ -180,6 +180,8 @@ maxTopicsPerNamespace=0 | |||
# 'is_allow_auto_update_schema' of namespace policy. | |||
isAllowAutoUpdateSchemaEnabled=true | |||
|
|||
schemaRegistryClass= |
It's better to have a description. By default, Pulsar uses the internal schema registry, which is based on BookKeeper.
@codelipenghui i have added it now
@@ -42,19 +44,29 @@ | |||
return checkers; | |||
} | |||
|
|||
static SchemaRegistryService create(SchemaStorage schemaStorage, Set<String> schemaRegistryCompatibilityCheckers) { | |||
static SchemaRegistryService create(String schemaRegistryName, SchemaStorage schemaStorage, |
static SchemaRegistryService create(String schemaRegistryName, SchemaStorage schemaStorage, | |
static SchemaRegistryService create(String schemaRegistryClassName, SchemaStorage schemaStorage, |
this is updated
return SchemaRegistryServiceWithSchemaDataValidator | ||
.of(new SchemaRegistryServiceImpl(schemaStorage, checkers)); |
Correct the code format there.
return SchemaRegistryServiceWithSchemaDataValidator | ||
.of(new SchemaRegistryServiceImpl(schemaStorage, checkers)); | ||
} else { | ||
return (SchemaRegistryService) Class.forName(schemaRegistryName) |
Better to have a unit test to make sure the configuration works well.
the default case is already tested in the existing test cases. i'll add some more test cases to explicitly test this part.
have updated this logic. we are using a default value in the config and continuing to use the schema validator wrapper class.
the reasoning was that any custom schema registry implementation would need to comply with existing usages of SchemaRegistryService interface, including pulsar-admin apis and the client connection flows. the SchemaRegistryServiceWithSchemaDataValidator wrapper class ensures that any input schemas are validated via avro/json/protobuf-native parser before the compatibility checks and schema storage interactions are applied.
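The wrapper arrangement described above can be pictured as a decorator that validates first and delegates second. The following is a minimal sketch under stated assumptions, not the code in this PR: `Registry`, `RecordingRegistry`, and `ValidatingRegistry` are hypothetical stand-ins, and the blank-string check is only a placeholder for real Avro/JSON/Protobuf parsing.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the registry interface; not Pulsar's SchemaRegistryService.
interface Registry {
    void putSchema(String schemaId, String schemaDefinition);
}

// Hypothetical custom registry that only records the ids it is asked to store.
final class RecordingRegistry implements Registry {
    final List<String> stored = new ArrayList<>();

    @Override
    public void putSchema(String schemaId, String schemaDefinition) {
        stored.add(schemaId);
    }
}

// Decorator: validates the input first, then delegates to the wrapped registry.
final class ValidatingRegistry implements Registry {
    private final Registry delegate;

    ValidatingRegistry(Registry delegate) {
        this.delegate = delegate;
    }

    @Override
    public void putSchema(String schemaId, String schemaDefinition) {
        // Placeholder check; a real validator would parse the schema definition here.
        if (schemaDefinition == null || schemaDefinition.trim().isEmpty()) {
            throw new IllegalArgumentException("Invalid schema for " + schemaId);
        }
        delegate.putSchema(schemaId, schemaDefinition);
    }
}
```

With this shape, a custom registry never sees input that failed validation, whichever implementation is configured.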
comments from yesterday's review by @merlimat and ankur:
|
@aparajita89 do we also need to update the
@Huanli-Meng this is an optional config. if no value is provided in the broker.conf file then the existing flow would go through. imo, we can add this config to the documentation but there isn't a need to update the config files unless someone really requires it.
@Anonymitaet could you let me know what changes i need to make for the documentation?
@aparajita89 how about adding the parameter and its description here https://pulsar.apache.org/docs/en/next/reference-configuration/#standalone?
@Anonymitaet thanks, i have added it in reference-configuration.md
@aparajita89 only one minor comment. PTAL
@@ -235,6 +235,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di | |||
|athenzDomainNames| Supported Athenz provider domain names(comma separated) for authentication || | |||
|exposePreciseBacklogInPrometheus| Enable expose the precise backlog stats, set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. |false| | |||
|schemaRegistryStorageClassName|The schema storage implementation used by this broker.|org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory| | |||
|schemaRegistryClassName|Override the schema registry used by pulsar with a custom implementation. If this config is not provided, the default schema registry will be used.|org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl| |
Could we change the description of the parameter as below ?
Specify the schema registry to be used in Pulsar. If it is not set, the default schema registry is used.
sure, i have updated it
Approved from a technical writer's point of view. Let's wait for technical approval.
} catch (Exception e) { | ||
LOG.warn("Unable to create schema registry storage, defaulting to empty storage", e); | ||
} | ||
} | ||
return new DefaultSchemaRegistryService(); | ||
} | ||
|
||
void initialize(ServiceConfiguration configuration); |
Why add this interface? It doesn't seem necessary
this would be needed in custom implementations of the schema registry that require additional configs specific to the implementation. for example, we require a custom schema registry which makes HTTP calls to another service. configs specific to the schema registry, like the service endpoint, need to be passed via ServiceConfiguration and set in the schema registry using this initialize method. this is the same convention followed for other configs, such as the authorization provider.
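To illustrate the point about implementation-specific configs, here is a minimal sketch of an `initialize` hook reading a service endpoint. Everything in it is an assumption made for illustration: `BrokerConfigSketch` stands in for ServiceConfiguration, `HttpBackedRegistry` is a hypothetical custom registry, and the `customSchemaRegistryEndpoint` key is invented.

```java
import java.util.Properties;

// Hypothetical stand-in for ServiceConfiguration: it only carries free-form properties.
final class BrokerConfigSketch {
    private final Properties properties = new Properties();

    Properties getProperties() {
        return properties;
    }
}

// Hypothetical stand-in for the part of the interface discussed in this comment.
interface InitializableRegistry {
    void initialize(BrokerConfigSketch configuration);
}

// Hypothetical registry that would forward schema operations to an HTTP service.
final class HttpBackedRegistry implements InitializableRegistry {
    private String endpoint;

    @Override
    public void initialize(BrokerConfigSketch configuration) {
        // "customSchemaRegistryEndpoint" is an invented key, used only for illustration.
        endpoint = configuration.getProperties().getProperty("customSchemaRegistryEndpoint");
        if (endpoint == null) {
            throw new IllegalStateException("customSchemaRegistryEndpoint is not set");
        }
    }

    String endpoint() {
        return endpoint;
    }
}
```

Because the registry is created reflectively through a no-argument constructor, a separate `initialize` call is one way to hand it settings that the constructor cannot receive.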
@@ -235,6 +235,7 @@ Pulsar brokers are responsible for handling incoming messages from producers, di | |||
|athenzDomainNames| Supported Athenz provider domain names(comma separated) for authentication || | |||
|exposePreciseBacklogInPrometheus| Enable expose the precise backlog stats, set false to use published counter and consumed counter to calculate, this would be more efficient but may be inaccurate. |false| | |||
|schemaRegistryStorageClassName|The schema storage implementation used by this broker.|org.apache.pulsar.broker.service.schema.BookkeeperSchemaStorageFactory| | |||
|schemaRegistryClassName|Specify the schema registry to be used in Pulsar. If it is not set, the default schema registry is used.|org.apache.pulsar.broker.service.schema.SchemaRegistryServiceImpl| |
doc changes should be in recent pulsar versions as well right?
moved this to the latest doc as the change will be in newer versions
020ac8a
Hello @aparajita89
Thanks for your initiative.
Can you please start a PIP and a discussion on the dev mailing list for this feature?
sure, will do that
@eolivelli the PIP has been created, please take a look: #14395
@eolivelli i have added the integration test
@@ -706,8 +707,9 @@ public void start() throws PulsarServerException { | |||
this.startNamespaceService(); | |||
|
|||
schemaStorage = createAndStartSchemaStorage(); | |||
schemaRegistryService = SchemaRegistryService.create( | |||
schemaStorage, config.getSchemaRegistryCompatibilityCheckers()); | |||
ensureSchemaRegistryName(schemaStorage); |
createAndStartSchemaStorage will create schemaStorage. if schemaRegistryStorageClassName is not set, it will throw an NPE, so this method doesn't seem useful, or we need to add a default schemaRegistryStorageClassName
@congbobo184 the default schema registry name is being set in ServiceConfiguration so NPE would not happen here
I mean, why do we need to check schemaStorage == null in ensureSchemaRegistryName?
this was part of the existing logic. if schema storage is null then the SchemaRegistryServiceDisabled needs to be used. otherwise, SchemaRegistryServiceImpl needs to be used by default.
below is the expected behavior for each of the combinations for configs set in broker's conf file.
- if schemaRegistryClassName == null and schemaRegistryStorageClassName == null then set schemaStorage to null and schemaRegistry to SchemaRegistryServiceDisabled
- if schemaRegistryClassName != null and schemaRegistryStorageClassName == null then set schemaStorage to null and schemaRegistry to SchemaRegistryServiceDisabled
- if schemaRegistryClassName == null and schemaRegistryStorageClassName != null then set schemaStorage to the configured object and schemaRegistry to SchemaRegistryServiceImpl
- if schemaRegistryClassName != null and schemaRegistryStorageClassName != null then set schemaStorage to the configured object and schemaRegistry to the configured object
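The four combinations listed above reduce to two checks. This is a minimal sketch of that decision table only: the outcome names come from the comment, while `RegistrySelectionSketch` and its `select` helper are hypothetical and do not appear in the PR.

```java
// Outcome names are taken from the discussion; the selection helper is hypothetical.
final class RegistrySelectionSketch {
    static final String DISABLED = "SchemaRegistryServiceDisabled";
    static final String DEFAULT_IMPL = "SchemaRegistryServiceImpl";

    // Returns the registry that the four listed cases would select.
    static String select(String schemaRegistryClassName, String schemaRegistryStorageClassName) {
        if (schemaRegistryStorageClassName == null) {
            // Cases 1 and 2: no storage configured, so the registry is disabled.
            return DISABLED;
        }
        if (schemaRegistryClassName == null) {
            // Case 3: storage is configured and the registry falls back to the default.
            return DEFAULT_IMPL;
        }
        // Case 4: both are configured, so the configured registry class is used.
        return schemaRegistryClassName;
    }
}
```

Written this way, the storage setting alone decides whether the registry is disabled, and the registry class name only matters once storage is present.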
in the older code, this logic was implemented in SchemaRegistryService.create(...)
SchemaStorage schemaStorage = (SchemaStorage) createMethod.invoke(factoryInstance, this); |
in this code, createAndStartSchemaStorage will never return a null schemaStorage. so why should we call ensureSchemaRegistryName and set a default?
you are right, schemaStorage can never be null. i had only kept this null check to retain the existing logic. i have removed it now.
@eolivelli Please help review this PR again.
ping @eolivelli Please help review this PR.
result.complete(new SchemaAndMetadata(schemaId, schemaData, | ||
storedSchemaCompletableFuture.get().get(version).get().version)); | ||
} catch (InterruptedException e) { | ||
e.printStackTrace(); |
please use logger
result.complete(new SchemaAndMetadata(schemaId, schemaData, | ||
storedSchemaCompletableFuture.get().get(versionInt).get().version)); | ||
} catch (InterruptedException e) { | ||
e.printStackTrace(); |
please use logger
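One way to act on the request to replace `e.printStackTrace()` with a logger is sketched below. It is an assumption-laden illustration, not the change made in the PR: `FutureLoggingSketch` and `awaitOrNull` are hypothetical, and `java.util.logging` is used only to keep the example dependency-free (the test class in this PR is annotated with `@Slf4j`, so the real code would presumably log through that).

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.logging.Level;
import java.util.logging.Logger;

final class FutureLoggingSketch {
    // java.util.logging keeps the sketch dependency-free; real code would use the project's logger.
    private static final Logger LOG = Logger.getLogger(FutureLoggingSketch.class.getName());

    // Returns the future's value, or null after logging the failure.
    static String awaitOrNull(CompletableFuture<String> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers can still observe the interruption.
            Thread.currentThread().interrupt();
            LOG.log(Level.WARNING, "Interrupted while waiting for the schema", e);
        } catch (ExecutionException e) {
            // Log the underlying cause rather than the wrapper exception.
            LOG.log(Level.WARNING, "Failed to read the schema", e.getCause());
        }
        return null;
    }
}
```

Logging the exception keeps the stack trace in the broker's configured log output, and re-setting the interrupt flag avoids silently swallowing the interruption.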
@@ -24,6 +24,11 @@ | |||
public class Constants { | |||
|
|||
public static final String GLOBAL_CLUSTER = "global"; | |||
public static final String SCHEMA_REGISTRY_CLASS_NAME; |
nit: please declare the value here: "org.apache.pulsar.broker.service.schema.SchemaRegistryServiceDisabled"
@Slf4j | ||
public class SchemaRegistryTest extends TopicMessagingBase { | ||
protected String methodName; | ||
private String SCHEMA_REGISTRY="org.apache.pulsar.broker.service.schema.MockSchemaRegistry"; |
nit: static ? also please add a white space around the '=' char
The pr had no activity for 30 days, mark with Stale label.
Fixes #14101
Motivation
SchemaRegistryService is hardcoded to use SchemaRegistryServiceImpl in org.apache.pulsar.broker.service.schema.SchemaRegistryService#create
Modifications
broker.conf has a new config called schemaRegistryClassName which will have the name of the class to be used. the class is instantiated in SchemaRegistryService interface's static create method via reflection.
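The reflective instantiation mentioned here could look roughly like the sketch below. It is a simplified illustration under stated assumptions: `PluggableRegistry`, `DefaultPluggableRegistry`, and `RegistryFactorySketch` are hypothetical stand-ins, and the sketch assumes the configured class has a no-argument constructor.

```java
// Hypothetical stand-in for the SchemaRegistryService interface.
interface PluggableRegistry {
    String name();
}

// Hypothetical default implementation with a no-argument constructor.
final class DefaultPluggableRegistry implements PluggableRegistry {
    public DefaultPluggableRegistry() {
    }

    @Override
    public String name() {
        return "default";
    }
}

final class RegistryFactorySketch {
    // Loads the class by name and instantiates it through its no-argument constructor.
    static PluggableRegistry create(String className) throws ReflectiveOperationException {
        Class<?> clazz = Class.forName(className);
        Object instance = clazz.getDeclaredConstructor().newInstance();
        // Fail with a clear message if the configured class has the wrong type.
        if (!(instance instanceof PluggableRegistry)) {
            throw new IllegalArgumentException(className + " does not implement PluggableRegistry");
        }
        return (PluggableRegistry) instance;
    }
}
```

Checking the type before casting turns a misconfigured class name into a descriptive error instead of a bare `ClassCastException`.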
Verifying this change
This change is already covered by existing tests which create a producer or consumer successfully.
Does this pull request potentially affect one of the following parts:
If yes was chosen, please highlight the changes
Documentation
Check the box below or label this PR directly (if you have committer privilege).
Need to update docs?
doc-required
this PR adds a new config which needs to be explained in the docs