Skip to content

Commit

Permalink
Fix failure when single quote exists in Pinot condition
Browse files Browse the repository at this point in the history
  • Loading branch information
ebyhr committed Apr 24, 2024
1 parent cdbe06e commit 879d852
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,9 @@ private static String inClauseValues(String columnName, String operator, List<Ob

/**
 * Renders {@code value} as a single-quoted string literal for use in a Pinot condition.
 * <p>
 * The value's string form is taken and every embedded single quote is doubled
 * ({@code a'quote} becomes {@code 'a''quote'}), so the value cannot terminate the literal early.
 *
 * @param value the value to quote; {@code null} is rendered as {@code 'null'}
 * @return the escaped string form of {@code value} wrapped in single quotes
 */
private static String singleQuote(Object value)
{
    // Escape the string form of every value, not only String instances, so that
    // other types (e.g. a CharSequence) containing a quote are handled as well.
    String text = String.valueOf(value);
    return "'" + text.replace("'", "''") + "'";
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,7 @@ public abstract class BasePinotConnectorSmokeTest
// If a broker query does not supply a limit, pinot defaults to 10 rows
private static final int DEFAULT_PINOT_LIMIT_FOR_BROKER_QUERIES = 10;
private static final String ALL_TYPES_TABLE = "alltypes";
private static final String STRING_TYPE_TABLE = "string_type_table";
private static final String DATE_TIME_FIELDS_TABLE = "date_time_fields";
private static final String MIXED_CASE_COLUMN_NAMES_TABLE = "mixed_case";
private static final String MIXED_CASE_DISTINCT_TABLE = "mixed_case_distinct";
Expand Down Expand Up @@ -143,6 +144,7 @@ protected QueryRunner createQueryRunner()
pinot.start();

createAndPopulateAllTypesTopic(kafka, pinot);
createAndPopulateStringTypeTopic(kafka, pinot);
createAndPopulateMixedCaseTableAndTopic(kafka, pinot);
createAndPopulateMixedCaseDistinctTableAndTopic(kafka, pinot);
createAndPopulateTooManyRowsTable(kafka, pinot);
Expand Down Expand Up @@ -193,6 +195,17 @@ private void createAndPopulateAllTypesTopic(TestingKafka kafka, TestingPinotClus
pinot.addRealTimeTable(getClass().getClassLoader().getResourceAsStream("alltypes_realtimeSpec.json"), ALL_TYPES_TABLE);
}

/**
 * Creates the Kafka topic for the string-type table, publishes the single record that
 * contains a single quote, and then registers the Pinot schema and realtime table
 * definition loaded from the test resources.
 *
 * @param kafka the Kafka instance that receives the topic and the record
 * @param pinot the Pinot cluster in which the schema and realtime table are created
 * @throws Exception if any of the Kafka or Pinot calls fails
 */
private void createAndPopulateStringTypeTopic(TestingKafka kafka, TestingPinotCluster pinot)
        throws Exception
{
    kafka.createTopic(STRING_TYPE_TABLE);

    // A single keyless message is enough for the predicate test
    ProducerRecord<String, GenericRecord> quoteMessage = new ProducerRecord<>(STRING_TYPE_TABLE, null, createStringSingleQuoteRecord());
    kafka.sendMessages(ImmutableList.of(quoteMessage).stream(), schemaRegistryAwareProducer(kafka));

    // Schema first, then the realtime table spec
    ClassLoader resources = getClass().getClassLoader();
    pinot.createSchema(resources.getResourceAsStream("string_schema.json"), STRING_TYPE_TABLE);
    pinot.addRealTimeTable(resources.getResourceAsStream("string_realtimeSpec.json"), STRING_TYPE_TABLE);
}

private void createAndPopulateMixedCaseTableAndTopic(TestingKafka kafka, TestingPinotCluster pinot)
throws Exception
{
Expand Down Expand Up @@ -715,6 +728,20 @@ private static GenericRecord createNullRecord()
.build();
}

/**
 * Builds the Avro record published to the string-type topic: {@code string_col} holds a
 * value containing a single quote and {@code updated_at} holds the shared initial
 * timestamp in epoch milliseconds.
 * <p>
 * NOTE(review): string_schema.json derives {@code updated_at_seconds} from a field named
 * {@code updatedAt}, while this record names the field {@code updated_at} - confirm the
 * transform resolves as intended.
 *
 * @return the record with both fields populated
 */
private static GenericRecord createStringSingleQuoteRecord()
{
    // Both fields are declared optional in the Avro schema
    Schema avroSchema = SchemaBuilder.record("string_type_table")
            .fields()
            .name("string_col").type().optional().stringType()
            .name("updated_at").type().optional().longType()
            .endRecord();

    GenericRecordBuilder recordBuilder = new GenericRecordBuilder(avroSchema);
    recordBuilder.set("string_col", "a'quote");
    recordBuilder.set("updated_at", initialUpdatedAt.toEpochMilli());
    return recordBuilder.build();
}

private static GenericRecord createArrayNullRecord()
{
Schema schema = getAllTypesAvroSchema();
Expand Down Expand Up @@ -1264,6 +1291,14 @@ public void testCount()
assertThat(result.getRowCount()).isEqualTo(DEFAULT_PINOT_LIMIT_FOR_BROKER_QUERIES);
}

/**
 * Verifies that an equality predicate whose string literal contains a single quote
 * matches the stored row, and that a different quoted value matches nothing.
 */
@Test
public void testStringPredicateWithSingleQuote()
{
    // Regression test for https://github.com/trinodb/trino/issues/21681
    String predicatePrefix = "SELECT true FROM " + STRING_TYPE_TABLE + " WHERE string_col = ";

    // The value stored in the table is a'quote
    assertQuery(predicatePrefix + "'a''quote'", "VALUES true");
    // A quoted value that is not present must yield no rows
    assertQueryReturnsEmptyResult(predicatePrefix + "'a''empty'");
}

@Test
public void testNullBehavior()
{
Expand Down
44 changes: 44 additions & 0 deletions plugin/trino-pinot/src/test/resources/string_realtimeSpec.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,44 @@
{
"tableName": "string_type_table",
"tableType": "REALTIME",
"segmentsConfig": {
"timeColumnName": "updated_at_seconds",
"timeType": "SECONDS",
"retentionTimeUnit": "DAYS",
"retentionTimeValue": "365",
"segmentPushType": "APPEND",
"segmentPushFrequency": "daily",
"segmentAssignmentStrategy": "BalanceNumSegmentAssignmentStrategy",
"schemaName": "string_type_table",
"replicasPerPartition": "1"
},
"tenants": {
"broker": "DefaultTenant",
"server": "DefaultTenant"
},
"tableIndexConfig": {
"loadMode": "MMAP",
"invertedIndexColumns": ["string_col"],
"sortedColumn": ["updated_at_seconds"],
"noDictionaryColumns": [],
"streamConfigs": {
"streamType": "kafka",
"stream.kafka.consumer.type": "LowLevel",
"stream.kafka.topic.name": "string_type_table",
"stream.kafka.decoder.class.name": "org.apache.pinot.plugin.inputformat.avro.confluent.KafkaConfluentSchemaRegistryAvroMessageDecoder",
"stream.kafka.consumer.factory.class.name": "org.apache.pinot.plugin.stream.kafka20.KafkaConsumerFactory",
"stream.kafka.decoder.prop.schema.registry.rest.url": "http://schema-registry:8081",
"stream.kafka.zk.broker.url": "zookeeper:2181/",
"stream.kafka.broker.list": "kafka:9092",
"realtime.segment.flush.threshold.time": "24h",
"realtime.segment.flush.threshold.size": "0",
"realtime.segment.flush.desired.size": "1M",
"isolation.level": "read_committed",
"stream.kafka.consumer.prop.auto.offset.reset": "smallest",
"stream.kafka.consumer.prop.group.id": "string_type_table"
}
},
"metadata": {
"customConfigs": {}
}
}
19 changes: 19 additions & 0 deletions plugin/trino-pinot/src/test/resources/string_schema.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
{
"schemaName": "string_type_table",
"dimensionFieldSpecs": [
{
"name": "string_col",
"dataType": "STRING"
}
],
"dateTimeFieldSpecs": [
{
"name": "updated_at_seconds",
"dataType": "LONG",
"defaultNullValue" : 0,
"format": "1:SECONDS:EPOCH",
"transformFunction": "toEpochSeconds(updatedAt)",
"granularity" : "1:SECONDS"
}
]
}

0 comments on commit 879d852

Please sign in to comment.