Skip to content

Commit

Permalink
release 1.5.0
Browse files Browse the repository at this point in the history
  • Loading branch information
zgdgod committed Nov 27, 2024
1 parent fd5d06d commit ebabada
Show file tree
Hide file tree
Showing 52 changed files with 680 additions and 343 deletions.
2 changes: 1 addition & 1 deletion holo-shipper/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.73</version>
<version>1.2.83</version>
</dependency>
<dependency>
<groupId>junit</groupId>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ public class FindIncompatibleFlinkJobs {
static List<IncompatibleResult> incompatibleResults = new ArrayList<>();

public static void main(String[] args) throws Exception {

if (args.length != 5) {
System.out.println("Need 5 params. Please usage: java -cp find-incompatible-flink-jobs-1.0.jar " +
"com.alibaba.hologres.FindIncompatibleFlinkJobs <region> <workspace_url> <access_key_id> <access_key_secret> <binlog/rpc>");
return;
}
String endpoint = REGION.get(args[0]);
if (endpoint == null) {
System.out.println("Invalid region");
Expand Down Expand Up @@ -61,6 +65,12 @@ public static void main(String[] args) throws Exception {
ListDeploymentsResponse listDeploymentsResponse =
client.listDeploymentsWithOptions(namespace, listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions());

if (listDeploymentsResponse.getBody() == null) {
System.out.println("No deployments");
} else if (listDeploymentsResponse.getBody().getData() == null && listDeploymentsResponse.getBody().getErrorMessage() != null) {
System.out.println(listDeploymentsResponse.getBody().getErrorMessage());
break;
}
for (Deployment deployment : listDeploymentsResponse.getBody().getData()) {
String deploymentName = deployment.getName();
String deploymentVersion = deployment.getEngineVersion();
Expand Down
9 changes: 6 additions & 3 deletions hologres-connector-examples/README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,17 @@
# hologres-connectors-examples

Examples for Hologres connectors

# 模块介绍

* [hologres-connector-flink-examples](hologres-connector-flink-examples)

hologres flink connector 的 Examples
hologres flink connector 的 Examples

* [hologres-connector-flink-ordergen](hologres-connector-flink-ordergen)

hologres flink connector 的 订单数据源表生成工具,使用方式见 [flink自定义connector文档](https://help.aliyun.com/document_detail/193520.html)
hologres flink connector 的
订单数据源表生成工具,使用方式见 [flink自定义connector文档](https://help.aliyun.com/document_detail/193520.html)
* [hologres-connector-spark-examples](hologres-connector-spark-examples)

hologres spark connector 的 Examples
hologres spark connector 的 Examples

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@

<groupId>com.alibaba.hologres</groupId>
<artifactId>hologres-connector-flink-examples</artifactId>
<version>1.4.3-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>

<properties>
<java.version>1.8</java.version>
Expand All @@ -23,7 +23,7 @@
<dependency>
<groupId>com.alibaba.hologres</groupId>
<artifactId>hologres-connector-flink-1.15</artifactId>
<version>1.4.3-SNAPSHOT</version>
<version>1.5.0-SNAPSHOT</version>
<classifier>jar-with-dependencies</classifier>
</dependency>
<dependency>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,9 @@

import static org.apache.flink.table.api.Expressions.$;

/** A Flink data stream example and SQL sinking data to Hologres. */
/**
* A Flink data stream example and SQL sinking data to Hologres.
*/
public class FlinkDSAndSQLToHoloExample {

/**
Expand Down Expand Up @@ -55,13 +57,13 @@ public static void main(String[] args) throws Exception {
"Adam",
new BigDecimal("123.11"),
new Timestamp(System.currentTimeMillis())
),
),
new SourceItem(
234,
"Bob",
new BigDecimal("000.11"),
new Timestamp(System.currentTimeMillis())
));
));

Table table = tEnv.fromDataStream(source);
table.printSchema();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,9 @@
import java.math.BigDecimal;
import java.sql.Timestamp;

/** A Flink data streak example sinking data to Hologres. */
/**
* A Flink data streak example sinking data to Hologres.
*/
public class FlinkDataStreamToHoloExample {
/**
* Hologres DDL. create table sink_table(user_id bigint, user_name text, price decimal(38,
Expand Down Expand Up @@ -93,7 +95,9 @@ public static void main(String[] args) throws Exception {
env.execute("Insert");
}

/** 将用户POJO数据转换至Hologres Record的实现. */
/**
* 将用户POJO数据转换至Hologres Record的实现.
*/
public static class RecordConverter implements HologresRecordConverter<SourceItem, Record> {
private HologresConnectionParam hologresConnectionParam;
private HologresTableSchema tableSchema;
Expand All @@ -114,9 +118,9 @@ public Record convertFrom(SourceItem record) {
result.setObject(2, record.price);
result.setObject(3, record.saleTimestamp);
/* 在DataStream作业中,用户需要使用自定义的OutputFormatSinkFunction以及RecordConverter,如果要支持消息的回撤,需要在此处对convert结果设置MutationType。 需要hologres-connector 1.3.2及以上版本 */
if (record.eventType == EventType.DELETE) {
result.setType(MutationType.DELETE);
}
if (record.eventType == EventType.DELETE) {
result.setType(MutationType.DELETE);
}
return result;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,17 @@
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;


/** A Flink data stream example and SQL sinking data to Hologres. */
/**
* A Flink data stream example and SQL sinking data to Hologres.
*/
public class FlinkSQLSourceAndSinkExample {

/**
* Hologres DDL. create table source_table(user_id bigint, user_name text, price decimal(38,
* 2),sale_timestamp timestamptz);
* insert into source_table values(123,'Adam',123.11,'2022-05-19 14:33:05.418+08');
* insert into source_table values(456,'Bob',123.45,'2022-05-19 14:33:05.418+08');
*
* <p>
* Hologres DDL. create table sink_table(user_id bigint, user_name text, price decimal(38,
* 2),sale_timestamp timestamptz);
*
Expand All @@ -43,47 +45,47 @@ public static void main(String[] args) throws Exception {
String sinkTableName = commandLine.getOptionValue("sinkTableName");

EnvironmentSettings.Builder streamBuilder =
EnvironmentSettings.newInstance().inStreamingMode();
EnvironmentSettings.newInstance().inStreamingMode();
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
StreamTableEnvironment tEnv =
StreamTableEnvironment.create(env, streamBuilder.build());
StreamTableEnvironment.create(env, streamBuilder.build());

String createHologresSourceTable =
String.format(
"create table source("
+ " user_id bigint,"
+ " user_name string,"
+ " price decimal(38,2),"
+ " sale_timestamp timestamp"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s'"
+ ")",
database, sourceTableName, userName, password, endPoint);
String.format(
"create table source("
+ " user_id bigint,"
+ " user_name string,"
+ " price decimal(38,2),"
+ " sale_timestamp timestamp"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'endpoint' = '%s'"
+ ")",
database, sourceTableName, userName, password, endPoint);
tEnv.executeSql(createHologresSourceTable);

String createHologresTable =
String.format(
"create table sink("
+ " user_id bigint,"
+ " user_name string,"
+ " price decimal(38,2),"
+ " sale_timestamp timestamp"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'jdbcRetryCount'='20',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'jdbcCopyWriteMode' = 'true',"
+ " 'endpoint' = '%s'"
+ ")",
database, sinkTableName, userName, password, endPoint);
String.format(
"create table sink("
+ " user_id bigint,"
+ " user_name string,"
+ " price decimal(38,2),"
+ " sale_timestamp timestamp"
+ ") with ("
+ " 'connector'='hologres',"
+ " 'jdbcRetryCount'='20',"
+ " 'dbname' = '%s',"
+ " 'tablename' = '%s',"
+ " 'username' = '%s',"
+ " 'password' = '%s',"
+ " 'jdbcCopyWriteMode' = 'true',"
+ " 'endpoint' = '%s'"
+ ")",
database, sinkTableName, userName, password, endPoint);
tEnv.executeSql(createHologresTable);

tEnv.executeSql("insert into sink select * from source");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,9 @@
import java.util.ArrayList;
import java.util.List;

/** A Flink sql example sinking data to Hologres. */
/**
* A Flink sql example sinking data to Hologres.
*/
public class FlinkSQLToHoloExample {

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/** A Flink data stream demo sink data to hologres and do custom partition. */
/**
* A Flink data stream demo sink data to hologres and do custom partition.
*/
public class FlinkSQLToHoloRePartitionExample {
private static final transient Logger LOG =
LoggerFactory.getLogger(FlinkSQLToHoloRePartitionExample.class);
Expand All @@ -35,7 +37,7 @@ public static void main(String[] args) throws Exception {
String tableName = commandLine.getOptionValue("tablename");

Configuration conf = new Configuration();
conf.setString(RestOptions.BIND_PORT,"8081");
conf.setString(RestOptions.BIND_PORT, "8081");

EnvironmentSettings.Builder streamBuilder =
EnvironmentSettings.newInstance().inStreamingMode();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,20 +3,26 @@
import java.math.BigDecimal;
import java.sql.Timestamp;

/** SourceItem. */
/**
* SourceItem.
*/
public class SourceItem {
/** for example: event type. */
/**
* for example: event type.
*/
public enum EventType {
INSERT,
DELETE
}

public EventType eventType = EventType.INSERT;
public long userId;
public String userName;
public BigDecimal price;
public Timestamp saleTimestamp;

public SourceItem() {}
public SourceItem() {
}

public SourceItem(long userId, String userName, BigDecimal price, Timestamp saleTimestamp) {
this.userId = userId;
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
log4j.rootLogger=info,console

log4j.appender.console=org.apache.log4j.ConsoleAppender
log4j.appender.console.Threshold=INFO
log4j.appender.console.ImmediateFlush=true
Expand Down
Original file line number Diff line number Diff line change
@@ -1,11 +1,12 @@
# Flink-connector-ordergen

hologres flink connector 的 订单数据源表生成工具
hologres flink connector 的 订单数据源表生成工具

### 编译

运行```mvn package -DskipTests```

### 使用方式见

[flink自定义connector文档](https://help.aliyun.com/document_detail/193520.html)
[flink自定义connector文档](https://help.aliyun.com/document_detail/193520.html)

Loading

0 comments on commit ebabada

Please sign in to comment.