Skip to content

Commit

Permalink
chore: add Range integration test to validate reading Range values (#2475)
Browse files Browse the repository at this point in the history

* Add integration test for reading Arrow format

* fix lint

* add range type integration test

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

* remove unnecessary comment

* remove integration test decoder

* add range integration tests for v1beta1 and v1beta2

* update integration tests to use CTAS to be more deterministic

* 🦉 Updates from OwlBot post-processor

See https://github.com/googleapis/repo-automation-bots/blob/main/packages/owl-bot/README.md

---------

Co-authored-by: Owl Bot <gcf-owl-bot[bot]@users.noreply.github.com>
  • Loading branch information
PhongChuong and gcf-owl-bot[bot] authored May 2, 2024
1 parent ed414b5 commit e15b9bc
Show file tree
Hide file tree
Showing 3 changed files with 309 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.google.cloud.RetryOption;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Mode;
Expand Down Expand Up @@ -170,7 +171,9 @@ public static void beforeClass() throws IOException {
RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create();
bigquery = bigqueryHelper.getOptions().getService();
DatasetInfo datasetInfo =
DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build();
DatasetInfo.newBuilder(/* datasetId bigquery= */ DATASET)
.setDescription(DESCRIPTION)
.build();
bigquery.create(datasetInfo);
LOG.info("Created test dataset: " + DATASET);
}
Expand All @@ -188,7 +191,7 @@ public static void afterClass() {
}

@Test
public void testSimpleRead() {
public void testSimpleReadAvro() {
String table =
BigQueryResource.FormatTableResource(
/* projectId = */ "bigquery-public-data",
Expand Down Expand Up @@ -222,6 +225,110 @@ public void testSimpleRead() {
assertEquals(164_656, rowCount);
}

@Test
public void testSimpleReadArrow() {
  // Reads the public shakespeare sample table in Arrow format and verifies the
  // total row count, without decoding individual record batches.
  String srcTable =
      BigQueryResource.FormatTableResource(
          /* projectId = */ "bigquery-public-data",
          /* datasetId = */ "samples",
          /* tableId = */ "shakespeare");

  ReadSession.Builder sessionBuilder =
      ReadSession.newBuilder().setTable(srcTable).setDataFormat(DataFormat.ARROW);
  ReadSession readSession =
      client.createReadSession(
          /* parent = */ parentProjectId,
          /* readSession = */ sessionBuilder.build(),
          /* maxStreamCount = */ 1);
  assertEquals(
      String.format(
          "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s",
          srcTable, readSession.toString()),
      1,
      readSession.getStreamsCount());

  // An empty table may expose no streams. If an anonymous (cached) table yields no
  // streams, materialize the query results into a named table instead of consuming
  // the cache directly.
  Preconditions.checkState(readSession.getStreamsCount() > 0);

  // Read everything from the first (and only) stream.
  ReadRowsRequest request =
      ReadRowsRequest.newBuilder()
          .setReadStream(readSession.getStreams(0).getName())
          .build();

  long totalRows = 0;
  for (ReadRowsResponse response : client.readRowsCallable().call(request)) {
    // ARROW was requested, so each response must carry an Arrow record batch.
    Preconditions.checkState(response.hasArrowRecordBatch());
    totalRows += response.getRowCount();
  }
  assertEquals(164_656, totalRows);
}

@Test
public void testRangeType() throws InterruptedException {
  // Materializes a single-row table containing DATE, DATETIME and TIMESTAMP RANGE
  // columns via CTAS, so that the subsequent read is deterministic, then verifies
  // that the row can be read back in Arrow format.
  String tableName = "test_range_type";
  QueryJobConfiguration ctas =
      QueryJobConfiguration.newBuilder(
              String.format(
                  "CREATE TABLE %s AS SELECT RANGE(DATE '2020-01-01', DATE '2020-12-31') as date, \n"
                      + "RANGE(DATETIME '2020-01-01T12:00:00', DATETIME '2020-12-31T12:00:00') as datetime, \n"
                      + "RANGE(TIMESTAMP '2014-01-01 07:00:00.000000+00:00', TIMESTAMP '2015-01-01 07:00:00.000000+00:00') as timestamp",
                  tableName))
          .setDefaultDataset(DatasetId.of(DATASET))
          .setUseLegacySql(false)
          .build();
  bigquery.query(ctas);

  String rangeTable =
      BigQueryResource.FormatTableResource(
          /* projectId = */ ServiceOptions.getDefaultProjectId(),
          /* datasetId = */ DATASET,
          /* tableId = */ tableName);

  ReadSession.Builder sessionBuilder =
      ReadSession.newBuilder().setTable(rangeTable).setDataFormat(DataFormat.ARROW);
  ReadSession readSession =
      client.createReadSession(
          /* parent = */ parentProjectId,
          /* readSession = */ sessionBuilder.build(),
          /* maxStreamCount = */ 1);
  assertEquals(
      String.format(
          "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s",
          rangeTable, readSession.toString()),
      1,
      readSession.getStreamsCount());

  // An empty table may expose no streams. If an anonymous (cached) table yields no
  // streams, materialize the query results into a named table instead of consuming
  // the cache directly.
  Preconditions.checkState(readSession.getStreamsCount() > 0);

  // Read everything from the first (and only) stream.
  ReadRowsRequest request =
      ReadRowsRequest.newBuilder()
          .setReadStream(readSession.getStreams(0).getName())
          .build();

  long totalRows = 0;
  for (ReadRowsResponse response : client.readRowsCallable().call(request)) {
    // ARROW was requested, so each response must carry an Arrow record batch.
    Preconditions.checkState(response.hasArrowRecordBatch());
    totalRows += response.getRowCount();
  }
  // The CTAS above produces exactly one row.
  assertEquals(1, totalRows);
}

@Test
public void testSimpleReadAndResume() {
String table =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.RetryOption;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Mode;
Expand Down Expand Up @@ -225,6 +226,100 @@ public void testSimpleRead() {
assertEquals(164_656, rowCount);
}

@Test
public void testSimpleReadArrow() {
  // Reads the public shakespeare sample table in Arrow format through the
  // v1beta1 surface and verifies the total row count.
  TableReference shakespeare =
      TableReference.newBuilder()
          .setProjectId("bigquery-public-data")
          .setDatasetId("samples")
          .setTableId("shakespeare")
          .build();

  ReadSession session =
      client.createReadSession(
          CreateReadSessionRequest.newBuilder()
              .setParent(parentProjectId)
              .setRequestedStreams(1)
              .setTableReference(shakespeare)
              .setFormat(DataFormat.ARROW)
              .build());
  assertEquals(
      String.format(
          "Did not receive expected number of streams for table reference '%s' CreateReadSession response:%n%s",
          TextFormat.shortDebugString(shakespeare), session.toString()),
      1,
      session.getStreamsCount());

  // v1beta1 addresses reads by stream position rather than stream name.
  ReadRowsRequest request =
      ReadRowsRequest.newBuilder()
          .setReadPosition(StreamPosition.newBuilder().setStream(session.getStreams(0)).build())
          .build();

  long totalRows = 0;
  for (ReadRowsResponse response : client.readRowsCallable().call(request)) {
    // ARROW was requested, so each response must carry an Arrow record batch.
    Preconditions.checkState(response.hasArrowRecordBatch());
    totalRows += response.getRowCount();
  }

  assertEquals(164_656, totalRows);
}

@Test
public void testRangeType() throws InterruptedException {
  // Materializes a single-row table containing DATE, DATETIME and TIMESTAMP RANGE
  // columns via CTAS, so that the subsequent read is deterministic, then verifies
  // that the row can be read back in Arrow format through the v1beta1 surface.
  String tableName = "test_range_type";
  QueryJobConfiguration createTable =
      QueryJobConfiguration.newBuilder(
              String.format(
                  "CREATE TABLE %s AS SELECT RANGE(DATE '2020-01-01', DATE '2020-12-31') as date, \n"
                      + "RANGE(DATETIME '2020-01-01T12:00:00', DATETIME '2020-12-31T12:00:00') as datetime, \n"
                      + "RANGE(TIMESTAMP '2014-01-01 07:00:00.000000+00:00', TIMESTAMP '2015-01-01 07:00:00.000000+00:00') as timestamp",
                  tableName))
          .setDefaultDataset(DatasetId.of(DATASET))
          .setUseLegacySql(false)
          .build();
  bigquery.query(createTable);

  TableReference tableReference =
      TableReference.newBuilder()
          .setProjectId(ServiceOptions.getDefaultProjectId())
          .setDatasetId(DATASET)
          .setTableId(tableName)
          .build();

  // Fixed: local was previously misnamed "createReadSessionRequestrequest".
  CreateReadSessionRequest createReadSessionRequest =
      CreateReadSessionRequest.newBuilder()
          .setParent(parentProjectId)
          .setRequestedStreams(1)
          .setTableReference(tableReference)
          .setFormat(DataFormat.ARROW)
          .build();
  ReadSession session = client.createReadSession(createReadSessionRequest);
  assertEquals(
      String.format(
          "Did not receive expected number of streams for table reference '%s' CreateReadSession response:%n%s",
          TextFormat.shortDebugString(tableReference), session.toString()),
      1,
      session.getStreamsCount());

  // v1beta1 addresses reads by stream position rather than stream name.
  StreamPosition readPosition =
      StreamPosition.newBuilder().setStream(session.getStreams(0)).build();

  ReadRowsRequest readRowsRequest =
      ReadRowsRequest.newBuilder().setReadPosition(readPosition).build();

  long rowCount = 0;
  ServerStream<ReadRowsResponse> stream = client.readRowsCallable().call(readRowsRequest);
  for (ReadRowsResponse response : stream) {
    // ARROW was requested, so each response must carry an Arrow record batch.
    Preconditions.checkState(response.hasArrowRecordBatch());
    rowCount += response.getRowCount();
  }

  // The CTAS above produces exactly one row.
  assertEquals(1, rowCount);
}

@Test
public void testSimpleReadAndResume() {
TableReference tableReference =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.cloud.RetryOption;
import com.google.cloud.ServiceOptions;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.DatasetId;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.Field.Mode;
Expand Down Expand Up @@ -221,6 +222,110 @@ public void testSimpleRead() {
assertEquals(164_656, rowCount);
}

@Test
public void testSimpleReadArrow() {
  // Reads the public shakespeare sample table in Arrow format through the
  // v1beta2 surface and verifies the total row count.
  String srcTable =
      com.google.cloud.bigquery.storage.v1.it.BigQueryResource.FormatTableResource(
          /* projectId = */ "bigquery-public-data",
          /* datasetId = */ "samples",
          /* tableId = */ "shakespeare");

  ReadSession.Builder sessionBuilder =
      ReadSession.newBuilder().setTable(srcTable).setDataFormat(DataFormat.ARROW);
  ReadSession readSession =
      client.createReadSession(
          /* parent = */ parentProjectId,
          /* readSession = */ sessionBuilder.build(),
          /* maxStreamCount = */ 1);
  assertEquals(
      String.format(
          "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s",
          srcTable, readSession.toString()),
      1,
      readSession.getStreamsCount());

  // An empty table may expose no streams. If an anonymous (cached) table yields no
  // streams, materialize the query results into a named table instead of consuming
  // the cache directly.
  Preconditions.checkState(readSession.getStreamsCount() > 0);

  // Read everything from the first (and only) stream.
  ReadRowsRequest request =
      ReadRowsRequest.newBuilder()
          .setReadStream(readSession.getStreams(0).getName())
          .build();

  long totalRows = 0;
  for (ReadRowsResponse response : client.readRowsCallable().call(request)) {
    // ARROW was requested, so each response must carry an Arrow record batch.
    Preconditions.checkState(response.hasArrowRecordBatch());
    totalRows += response.getRowCount();
  }
  assertEquals(164_656, totalRows);
}

@Test
public void testRangeType() throws InterruptedException {
  // Materializes a single-row table containing DATE, DATETIME and TIMESTAMP RANGE
  // columns via CTAS, so that the subsequent read is deterministic, then verifies
  // that the row can be read back in Arrow format through the v1beta2 surface.
  String tableName = "test_range_type";
  QueryJobConfiguration ctas =
      QueryJobConfiguration.newBuilder(
              String.format(
                  "CREATE TABLE %s AS SELECT RANGE(DATE '2020-01-01', DATE '2020-12-31') as date, \n"
                      + "RANGE(DATETIME '2020-01-01T12:00:00', DATETIME '2020-12-31T12:00:00') as datetime, \n"
                      + "RANGE(TIMESTAMP '2014-01-01 07:00:00.000000+00:00', TIMESTAMP '2015-01-01 07:00:00.000000+00:00') as timestamp",
                  tableName))
          .setDefaultDataset(DatasetId.of(DATASET))
          .setUseLegacySql(false)
          .build();
  bigquery.query(ctas);

  String rangeTable =
      com.google.cloud.bigquery.storage.v1.it.BigQueryResource.FormatTableResource(
          /* projectId = */ ServiceOptions.getDefaultProjectId(),
          /* datasetId = */ DATASET,
          /* tableId = */ tableName);

  ReadSession.Builder sessionBuilder =
      ReadSession.newBuilder().setTable(rangeTable).setDataFormat(DataFormat.ARROW);
  ReadSession readSession =
      client.createReadSession(
          /* parent = */ parentProjectId,
          /* readSession = */ sessionBuilder.build(),
          /* maxStreamCount = */ 1);
  assertEquals(
      String.format(
          "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s",
          rangeTable, readSession.toString()),
      1,
      readSession.getStreamsCount());

  // An empty table may expose no streams. If an anonymous (cached) table yields no
  // streams, materialize the query results into a named table instead of consuming
  // the cache directly.
  Preconditions.checkState(readSession.getStreamsCount() > 0);

  // Read everything from the first (and only) stream.
  ReadRowsRequest request =
      ReadRowsRequest.newBuilder()
          .setReadStream(readSession.getStreams(0).getName())
          .build();

  long totalRows = 0;
  for (ReadRowsResponse response : client.readRowsCallable().call(request)) {
    // ARROW was requested, so each response must carry an Arrow record batch.
    Preconditions.checkState(response.hasArrowRecordBatch());
    totalRows += response.getRowCount();
  }
  // The CTAS above produces exactly one row.
  assertEquals(1, totalRows);
}

@Test
public void testSimpleReadAndResume() {
String table =
Expand Down

0 comments on commit e15b9bc

Please sign in to comment.