[PIP-193] [feature][connectors] Add support for a transform Function in Sinks #16740
LGTM
@@ -384,7 +386,12 @@ private void sendOutputMessage(Record srcRecord, Object output) throws Exception
    try {
        this.sink.write(sinkRecord);
    } catch (Exception e) {
        log.info("Encountered exception in sink write: ", e);
        if (e instanceof ClassCastException && functionClassLoader != componentClassLoader) {
Can we do this check at sink create or start?
You can't, because each record can be different.
In Java, even if you declare a class with generic types, you can always force the system to return something of the wrong type,
so this check must stay here.
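To illustrate the point about generics, here is a minimal, self-contained sketch (not code from this PR). The class `ErasureDemo` and its methods are hypothetical names. It shows that a raw type can smuggle an `Integer` into a `List<String>`, and that the `ClassCastException` only appears when the value is actually used, which is why a check at create or start time cannot catch it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical demo class, not part of Pulsar.
class ErasureDemo {

    // Returns a List<String> that actually holds an Integer.
    // The unchecked cast compiles (with a warning) because generic
    // type arguments are erased and not verified at runtime.
    @SuppressWarnings({"unchecked", "rawtypes"})
    static List<String> pollutedList() {
        List raw = new ArrayList();
        raw.add(42);                // an Integer sneaks in through the raw type
        return (List<String>) raw;  // no runtime check happens here
    }

    // Stands in for a sink write that expects a String payload.
    // Returns true if consuming the record raised a ClassCastException.
    static boolean writeFails(List<String> records) {
        try {
            String value = records.get(0); // the cast to String is checked here
            System.out.println("wrote " + value.length() + " chars");
            return false;
        } catch (ClassCastException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println(writeFails(pollutedList()));
        System.out.println(writeFails(List.of("ok")));
    }
}
```

The declared type of the list is identical in both calls; only the per-record use reveals the mismatch.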
There is also a question about the input type of user-defined functions. In the current implementation, the consumer in the sink sets the schema according to the generic definition of the sink; usually, we use GenericObject. If the user-defined function does not match the generic type, the current logic takes the following path instead of sending the record to the sink. Is this the expected behavior?
Lines 354 to 360 in 5df15dd
if (result.getUserException() != null) {
    Exception t = result.getUserException();
    log.warn("Encountered exception when processing message {}",
            srcRecord, t);
    stats.incrUserExceptions(t);
    srcRecord.fail();
} else {
Maybe we need to validate the input parameter types of the user-defined function at the sink start?
If you have a Function with the Sink, we use the input type of the Function to configure the PulsarSource instead of the Sink type. See here.
Does that answer your concerns?
Oh, sorry, I missed this.
However, if users want to use it, the input parameter type may need to be defined carefully. In general, users can only use GenericObject, which triggers the registration of an auto_consumer type schema, because only then can inputs from multiple topics be processed.
Am I right?
You can use the same input type as if you had a regular Sink or Function. If you want to consume any type of Schema then yes, you should use GenericObject or byte[]. If you know that the Schema will always be String, you can use String as input.
...ons/java-examples/src/main/java/org/apache/pulsar/functions/api/examples/RecordFunction.java
}
AbstractSinkRecord<?> sinkRecord;
if (output instanceof Record) {
    sinkRecord = new OutputRecordSinkRecord<>(srcRecord, (Record) output);
    Record record = (Record) output;
what about moving this block to a method ?
done
    finalSchema = schema;
}

sinkRecord = new OutputRecordSinkRecord<>(srcRecord, record) {
What about creating a named class? It would help Sink developers when they dump the "Record" object (and especially us when we debug problems with this feature :-) )
I changed the code to use distinct constructors. It does not introduce a new class though.
@@ -881,4 +963,98 @@ private void setupOutput(ContextImpl contextImpl) throws Exception {
            Thread.currentThread().setContextClassLoader(this.instanceClassLoader);
        }
    }

    private static <T> Schema<T> getSinkSchema(Record<?> record, Class<T> clazz) {
we could move these 3 methods to some other class that does similar things, like TopicSchema.
but I have no strong opinion
We added these methods because we don't have a topic, so it feels odd to put them in a class named TopicSchema.
import org.apache.pulsar.common.protocol.schema.SchemaVersion;
import org.apache.pulsar.common.schema.SchemaInfo;

class SinkSchemaInfoProvider implements SchemaInfoProvider {
what about adding a javadoc here ?
done
good
class SinkSchemaInfoProvider implements SchemaInfoProvider {

    AtomicLong latestVersion = new AtomicLong(0);
    ConcurrentHashMap<SchemaVersion, SchemaInfo> schemaInfos = new ConcurrentHashMap<>();
it is not very clear to me (by reading the code) who is populating these maps.
maybe we could add explicit accessors ?
I cut the code a little. Is it clearer now ?
good
LGTM
LGTM
Following feedback, I renamed
da621e0 to 84cf65c
If we detect that a Function is coupled with the Sink, we encode the record value with the Function schema and decode it with the Sink schema. The Sink schema is created from the schema type of the first record received. For each record, a new schema version is created the first time the hash of that record's schema is seen.
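The per-record versioning described above can be sketched roughly as follows. This is an illustration under stated assumptions, not the PR's `SinkSchemaInfoProvider`: the class `SchemaVersionTracker`, the use of a `String` schema definition, and keying on `String.hashCode()` are all hypothetical simplifications.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical sketch: assigns a new version the first time a schema hash is seen.
class SchemaVersionTracker {

    private final AtomicLong latestVersion = new AtomicLong(0);
    private final ConcurrentHashMap<Integer, Long> versionsByHash = new ConcurrentHashMap<>();

    // Returns the version for this schema definition. A new version is
    // allocated only when the hash has not been seen before.
    // Assumption: hash collisions are ignored in this sketch.
    long versionFor(String schemaDefinition) {
        return versionsByHash.computeIfAbsent(
                schemaDefinition.hashCode(),
                hash -> latestVersion.incrementAndGet());
    }

    // Highest version allocated so far (0 if no schema has been seen).
    long latestVersion() {
        return latestVersion.get();
    }

    public static void main(String[] args) {
        SchemaVersionTracker tracker = new SchemaVersionTracker();
        System.out.println(tracker.versionFor("{\"type\":\"string\"}"));
        System.out.println(tracker.versionFor("{\"type\":\"int\"}"));
        System.out.println(tracker.versionFor("{\"type\":\"string\"}"));
    }
}
```

Records that reuse a known schema get the version allocated earlier, so the version counter only grows when a genuinely new schema hash arrives.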
@cbornet It will help users a lot if you can add the docs for this feature. Do you have any plans for that?
Transform functions are not part of 2.11, so we need to wait for the release before writing the docs...
@cbornet the milestone tag is still 2.11.
Yes, not included in 2.11.
Hi @cbornet, since 2.11 has been released, now is a good time to start adding the docs for this feature to the next version of the docs. Feel free to share your plans and anything I can help with. Thank you.
Motivation
For PIP 193 : Sink preprocessing Function (#16739).
Implements changes to support the Sink transform function, except for LocalRunner support.
Modifications
See #16739 Implementation section
Verifying this change
This change added tests and can be verified as follows:
TODO