Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[INLONG-11494][Sort] Add Kafka connector on Flink 1.18 #11501

Merged
merged 5 commits into from
Nov 26, 2024

Conversation

PeterZh6
Copy link
Contributor

Fixes #11494 [Feature][Sort] Add Kafka connector on Flink 1.18

Motivation

This PR adds a Kafka connector to inlong-sort for Flink 1.18 support.

Modifications

  • Migrated the Kafka connector from flink-kafka-connector 3.2.0 to support Flink 1.18.
  • Updated the relevant Maven pom.xml to reflect the changes.

Verifying this Change

  • This change is a trivial rework/code cleanup without any test coverage.

  • This change is already covered by existing tests, such as:
    (please describe tests)

  • This change added tests and can be verified as follows:

    Further formal testing will be introduced in #11495.
    Currently, a KafkaProducer is used to send messages to Kafka, and these messages are consumed by sort-connectors-v1.18, which is introduced in this PR.
    Since there is no verified functional sink yet, the built-in file-system sink is used in this test.

    In the example below, we can see that the messages in Kafka are successfully extracted and written to /tmp/output_file.

Example Flink SQL:

CREATE TABLE kafka_source (
    `message` STRING
) WITH (
    'connector' = 'kafka-inlong',
    'topic' = 'test-topic',
    'properties.bootstrap.servers' = 'kafka:9092',
    'properties.group.id' = 'flink-group',
    'scan.startup.mode' = 'earliest-offset',
    'format' = 'json'
);

CREATE TABLE file_sink (
    `message` STRING
) WITH (
    'connector' = 'filesystem',
    'path' = 'file:///tmp/output_file',
    'format' = 'json'
);

INSERT INTO file_sink
SELECT * FROM kafka_source;

Output:
The messages in Kafka have been successfully extracted and transported to /tmp/output_file. See the screenshot below for the result:

kafka_test2_edited

Documentation

  • Does this pull request introduce a new feature? Yes
  • If yes, how is the feature documented? N/A
  • If a feature is not applicable for documentation, explain why: The version change of the sort connectors is not reflected on the official website.

@PeterZh6
Copy link
Contributor Author

Sink function of this connector not tested yet.

@aloyszhang
Copy link
Contributor

please add license if there are some copied codes from other project.

@PeterZh6
Copy link
Contributor Author

PeterZh6 commented Nov 19, 2024

please add license if there are some copied codes from other project.

Thanks for the reminder!
All the copied codes are from flink-connector-kafka. I believe they all follow Apache 2.0 license, so I don't have to add the license once more.

@aloyszhang
Copy link
Contributor

Please ref to inlong/licenses/inlong-sort-connectors/LICENSE, you should add the copied files to this file.

@PeterZh6
Copy link
Contributor Author

Please ref to inlong/licenses/inlong-sort-connectors/LICENSE, you should add the copied files to this file.

oh I see, sorry for the misunderstanding

@dockerzhang dockerzhang requested review from XiaoYou201, aloyszhang and vernedeng and removed request for XiaoYou201 November 23, 2024 18:14
aloyszhang
aloyszhang previously approved these changes Nov 24, 2024
@aloyszhang aloyszhang merged commit 83ef6a5 into apache:master Nov 26, 2024
11 checks passed
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

[Feature][Sort] Add Kafka connector on Flink 1.18
3 participants