diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java index e18cb1f54b..b1397bbd72 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1/it/ITBigQueryStorageTest.java @@ -33,6 +33,7 @@ import com.google.cloud.RetryOption; import com.google.cloud.ServiceOptions; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Field.Mode; @@ -170,7 +171,9 @@ public static void beforeClass() throws IOException { RemoteBigQueryHelper bigqueryHelper = RemoteBigQueryHelper.create(); bigquery = bigqueryHelper.getOptions().getService(); DatasetInfo datasetInfo = - DatasetInfo.newBuilder(/* datasetId = */ DATASET).setDescription(DESCRIPTION).build(); + DatasetInfo.newBuilder(/* datasetId bigquery= */ DATASET) + .setDescription(DESCRIPTION) + .build(); bigquery.create(datasetInfo); LOG.info("Created test dataset: " + DATASET); } @@ -188,7 +191,7 @@ public static void afterClass() { } @Test - public void testSimpleRead() { + public void testSimpleReadAvro() { String table = BigQueryResource.FormatTableResource( /* projectId = */ "bigquery-public-data", @@ -222,6 +225,110 @@ public void testSimpleRead() { assertEquals(164_656, rowCount); } + @Test + public void testSimpleReadArrow() { + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ "bigquery-public-data", + /* datasetId = */ "samples", + /* tableId = */ "shakespeare"); + + ReadSession session = + client.createReadSession( + /* parent = */ parentProjectId, + /* readSession = */ ReadSession.newBuilder() + .setTable(table) + .setDataFormat(DataFormat.ARROW) + .build(), + /* maxStreamCount = */ 1); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", + table, session.toString()), + 1, + session.getStreamsCount()); + + // Assert that there are streams available in the session. An empty table may not have + // data available. If no sessions are available for an anonymous (cached) table, consider + // writing results of a query to a named table rather than consuming cached results + // directly. + Preconditions.checkState(session.getStreamsCount() > 0); + + // Use the first stream to perform reading. + String streamName = session.getStreams(0).getName(); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(streamName).build(); + + long rowCount = 0; + // Process each block of rows as they arrive and decode using our simple row reader. + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + Preconditions.checkState(response.hasArrowRecordBatch()); + rowCount += response.getRowCount(); + } + assertEquals(164_656, rowCount); + } + + @Test + public void testRangeType() throws InterruptedException { + // Create table with Range values. + String tableName = "test_range_type"; + TableId tableId = TableId.of(DATASET, tableName); + QueryJobConfiguration createTable = + QueryJobConfiguration.newBuilder( + String.format( + "CREATE TABLE %s AS SELECT RANGE(DATE '2020-01-01', DATE '2020-12-31') as date, \n" + + "RANGE(DATETIME '2020-01-01T12:00:00', DATETIME '2020-12-31T12:00:00') as datetime, \n" + + "RANGE(TIMESTAMP '2014-01-01 07:00:00.000000+00:00', TIMESTAMP '2015-01-01 07:00:00.000000+00:00') as timestamp", + tableName)) + .setDefaultDataset(DatasetId.of(DATASET)) + .setUseLegacySql(false) + .build(); + bigquery.query(createTable); + + String table = + BigQueryResource.FormatTableResource( + /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* datasetId = */ DATASET, + /* tableId = */ tableId.getTable()); + + ReadSession session = + client.createReadSession( + /* parent = */ parentProjectId, + /* readSession = */ ReadSession.newBuilder() + .setTable(table) + .setDataFormat(DataFormat.ARROW) + .build(), + /* maxStreamCount = */ 1); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", + table, session.toString()), + 1, + session.getStreamsCount()); + + // Assert that there are streams available in the session. An empty table may not have + // data available. If no sessions are available for an anonymous (cached) table, consider + // writing results of a query to a named table rather than consuming cached results + // directly. + Preconditions.checkState(session.getStreamsCount() > 0); + + // Use the first stream to perform reading. + String streamName = session.getStreams(0).getName(); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(streamName).build(); + + long rowCount = 0; + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + Preconditions.checkState(response.hasArrowRecordBatch()); + rowCount += response.getRowCount(); + } + assertEquals(1, rowCount); + } + @Test public void testSimpleReadAndResume() { String table = diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java index 933bb77ab1..68777c386f 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta1/it/ITBigQueryStorageTest.java @@ -31,6 +31,7 @@ import com.google.cloud.RetryOption; import com.google.cloud.ServiceOptions; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Field.Mode; @@ -225,6 +226,100 @@ public void testSimpleRead() { assertEquals(164_656, rowCount); } + @Test + public void testSimpleReadArrow() { + TableReference tableReference = + TableReference.newBuilder() + .setProjectId("bigquery-public-data") + .setDatasetId("samples") + .setTableId("shakespeare") + .build(); + + CreateReadSessionRequest request = + CreateReadSessionRequest.newBuilder() + .setParent(parentProjectId) + .setRequestedStreams(1) + .setTableReference(tableReference) + .setFormat(DataFormat.ARROW) + .build(); + ReadSession session = client.createReadSession(request); + assertEquals( + String.format( + "Did not receive expected number of streams for table reference '%s' CreateReadSession response:%n%s", + TextFormat.shortDebugString(tableReference), session.toString()), + 1, + session.getStreamsCount()); + + StreamPosition readPosition = + StreamPosition.newBuilder().setStream(session.getStreams(0)).build(); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadPosition(readPosition).build(); + + long rowCount = 0; + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + Preconditions.checkState(response.hasArrowRecordBatch()); + rowCount += response.getRowCount(); + } + + assertEquals(164_656, rowCount); + } + + @Test + public void testRangeType() throws InterruptedException { + // Create table with Range values. + String tableName = "test_range_type"; + QueryJobConfiguration createTable = + QueryJobConfiguration.newBuilder( + String.format( + "CREATE TABLE %s AS SELECT RANGE(DATE '2020-01-01', DATE '2020-12-31') as date, \n" + + "RANGE(DATETIME '2020-01-01T12:00:00', DATETIME '2020-12-31T12:00:00') as datetime, \n" + + "RANGE(TIMESTAMP '2014-01-01 07:00:00.000000+00:00', TIMESTAMP '2015-01-01 07:00:00.000000+00:00') as timestamp", + tableName)) + .setDefaultDataset(DatasetId.of(DATASET)) + .setUseLegacySql(false) + .build(); + bigquery.query(createTable); + + TableReference tableReference = + TableReference.newBuilder() + .setProjectId(ServiceOptions.getDefaultProjectId()) + .setDatasetId(DATASET) + .setTableId(tableName) + .build(); + + CreateReadSessionRequest createReadSessionRequestrequest = + CreateReadSessionRequest.newBuilder() + .setParent(parentProjectId) + .setRequestedStreams(1) + .setTableReference(tableReference) + .setFormat(DataFormat.ARROW) + .build(); + ReadSession session = client.createReadSession(createReadSessionRequestrequest); + assertEquals( + String.format( + "Did not receive expected number of streams for table reference '%s' CreateReadSession response:%n%s", + TextFormat.shortDebugString(tableReference), session.toString()), + 1, + session.getStreamsCount()); + + StreamPosition readPosition = + StreamPosition.newBuilder().setStream(session.getStreams(0)).build(); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadPosition(readPosition).build(); + + long rowCount = 0; + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + Preconditions.checkState(response.hasArrowRecordBatch()); + rowCount += response.getRowCount(); + } + + assertEquals(1, rowCount); + } + @Test public void testSimpleReadAndResume() { TableReference tableReference = diff --git a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java index fdf440e44a..539b3b2d05 100644 --- a/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java +++ b/google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/it/ITBigQueryStorageTest.java @@ -31,6 +31,7 @@ import com.google.cloud.RetryOption; import com.google.cloud.ServiceOptions; import com.google.cloud.bigquery.BigQuery; +import com.google.cloud.bigquery.DatasetId; import com.google.cloud.bigquery.DatasetInfo; import com.google.cloud.bigquery.Field; import com.google.cloud.bigquery.Field.Mode; @@ -221,6 +222,110 @@ public void testSimpleRead() { assertEquals(164_656, rowCount); } + @Test + public void testSimpleReadArrow() { + String table = + com.google.cloud.bigquery.storage.v1.it.BigQueryResource.FormatTableResource( + /* projectId = */ "bigquery-public-data", + /* datasetId = */ "samples", + /* tableId = */ "shakespeare"); + + ReadSession session = + client.createReadSession( + /* parent = */ parentProjectId, + /* readSession = */ ReadSession.newBuilder() + .setTable(table) + .setDataFormat(DataFormat.ARROW) + .build(), + /* maxStreamCount = */ 1); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", + table, session.toString()), + 1, + session.getStreamsCount()); + + // Assert that there are streams available in the session. An empty table may not have + // data available. If no sessions are available for an anonymous (cached) table, consider + // writing results of a query to a named table rather than consuming cached results + // directly. + Preconditions.checkState(session.getStreamsCount() > 0); + + // Use the first stream to perform reading. + String streamName = session.getStreams(0).getName(); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(streamName).build(); + + long rowCount = 0; + // Process each block of rows as they arrive and decode using our simple row reader. + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + Preconditions.checkState(response.hasArrowRecordBatch()); + rowCount += response.getRowCount(); + } + assertEquals(164_656, rowCount); + } + + @Test + public void testRangeType() throws InterruptedException { + // Create table with Range values. + String tableName = "test_range_type"; + TableId tableId = TableId.of(DATASET, tableName); + QueryJobConfiguration createTable = + QueryJobConfiguration.newBuilder( + String.format( + "CREATE TABLE %s AS SELECT RANGE(DATE '2020-01-01', DATE '2020-12-31') as date, \n" + + "RANGE(DATETIME '2020-01-01T12:00:00', DATETIME '2020-12-31T12:00:00') as datetime, \n" + + "RANGE(TIMESTAMP '2014-01-01 07:00:00.000000+00:00', TIMESTAMP '2015-01-01 07:00:00.000000+00:00') as timestamp", + tableName)) + .setDefaultDataset(DatasetId.of(DATASET)) + .setUseLegacySql(false) + .build(); + bigquery.query(createTable); + + String table = + com.google.cloud.bigquery.storage.v1.it.BigQueryResource.FormatTableResource( + /* projectId = */ ServiceOptions.getDefaultProjectId(), + /* datasetId = */ DATASET, + /* tableId = */ tableId.getTable()); + + ReadSession session = + client.createReadSession( + /* parent = */ parentProjectId, + /* readSession = */ ReadSession.newBuilder() + .setTable(table) + .setDataFormat(DataFormat.ARROW) + .build(), + /* maxStreamCount = */ 1); + assertEquals( + String.format( + "Did not receive expected number of streams for table '%s' CreateReadSession response:%n%s", + table, session.toString()), + 1, + session.getStreamsCount()); + + // Assert that there are streams available in the session. An empty table may not have + // data available. If no sessions are available for an anonymous (cached) table, consider + // writing results of a query to a named table rather than consuming cached results + // directly. + Preconditions.checkState(session.getStreamsCount() > 0); + + // Use the first stream to perform reading. + String streamName = session.getStreams(0).getName(); + + ReadRowsRequest readRowsRequest = + ReadRowsRequest.newBuilder().setReadStream(streamName).build(); + + long rowCount = 0; + ServerStream stream = client.readRowsCallable().call(readRowsRequest); + for (ReadRowsResponse response : stream) { + Preconditions.checkState(response.hasArrowRecordBatch()); + rowCount += response.getRowCount(); + } + assertEquals(1, rowCount); + } + @Test public void testSimpleReadAndResume() { String table =