[HUDI-8353] Remove unnecessary glob path usage in the tests (#12093)
yihua authored Oct 23, 2024
1 parent 30f1465 commit 66597e5
Showing 18 changed files with 81 additions and 137 deletions.
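The change is the same in every file: test and example code used to append a partition-depth glob (for example `tablePath + "/*/*/*/*"`) when reading a Hudi table through the Spark datasource, and now passes the table base path alone, since the datasource discovers partitions itself when given the base path. Below is a minimal, hypothetical sketch of the before/after read pattern; it is not taken from the commit, and the object name, the local Spark session, and the table location /tmp/hudi_trips_cow are assumptions for illustration (a real run also needs the hudi-spark bundle on the classpath).

import org.apache.spark.sql.SparkSession

object GlobFreeHudiRead {
  def main(args: Array[String]): Unit = {
    // Hypothetical local session; Hudi generally expects the Kryo serializer.
    val spark = SparkSession.builder()
      .appName("glob-free-hudi-read")
      .master("local[*]")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .getOrCreate()

    val basePath = "/tmp/hudi_trips_cow" // hypothetical table location

    // Old pattern removed by this commit: glob over the partition directories.
    // val df = spark.read.format("hudi").load(basePath + "/*/*/*")

    // New pattern: load from the table base path and let Hudi resolve the partitions.
    val df = spark.read.format("hudi").load(basePath)
    df.createOrReplaceTempView("hudi_ro_table")
    spark.sql("SELECT count(*) FROM hudi_ro_table").show()

    spark.stop()
  }
}

Each hunk below applies this same substitution to its respective test or example.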
2 changes: 1 addition & 1 deletion docker/demo/sparksql-bootstrap-prep-source.commands
@@ -17,7 +17,7 @@
*/
import org.apache.spark.sql.functions.col

-val df = spark.read.format("org.apache.hudi").load("/user/hive/warehouse/stock_ticks_cow/*/*/*").drop("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_file_name", "_hoodie_commit_seqno", "_hoodie_partition_path")
+val df = spark.read.format("org.apache.hudi").load("/user/hive/warehouse/stock_ticks_cow").drop("_hoodie_commit_time", "_hoodie_record_key", "_hoodie_file_name", "_hoodie_commit_seqno", "_hoodie_partition_path")
// TODO(HUDI-4944): fix the test to use a partition column with slashes (`/`) included
// in the value. Currently it fails the tests due to slash encoding.
df.write.format("parquet").save("/user/hive/warehouse/stock_ticks_cow_bs_src/2018/08/31/")
@@ -152,10 +152,7 @@ public static Dataset<Row> insertOverwriteData(SparkSession spark, JavaSparkCont
*/
public static void queryData(SparkSession spark, JavaSparkContext jsc, String tablePath, String tableName,
HoodieExampleDataGenerator<HoodieAvroPayload> dataGen) {
-Dataset<Row> roViewDF = spark
-    .read()
-    .format("hudi")
-    .load(tablePath + "/*/*/*/*");
+Dataset<Row> roViewDF = spark.read().format("hudi").load(tablePath);

roViewDF.createOrReplaceTempView("hudi_ro_table");

@@ -202,7 +199,7 @@ public static Dataset<Row> updateData(SparkSession spark, JavaSparkContext jsc,
*/
public static Dataset<Row> delete(SparkSession spark, String tablePath, String tableName) {

-Dataset<Row> roViewDF = spark.read().format("hudi").load(tablePath + "/*/*/*/*");
+Dataset<Row> roViewDF = spark.read().format("hudi").load(tablePath);
roViewDF.createOrReplaceTempView("hudi_ro_table");
Dataset<Row> toBeDeletedDf = spark.sql("SELECT begin_lat, begin_lon, driver, end_lat, end_lon, fare, partitionpath, rider, ts, uuid FROM hudi_ro_table limit 2");
Dataset<Row> df = toBeDeletedDf.select("uuid", "partitionpath", "ts");
@@ -90,10 +90,7 @@ object HoodieDataSourceExample {
* Load the data files into a DataFrame.
*/
def queryData(spark: SparkSession, tablePath: String, tableName: String, dataGen: HoodieExampleDataGenerator[HoodieAvroPayload]): Unit = {
-val roViewDF = spark.
-  read.
-  format("hudi").
-  load(tablePath + "/*/*/*/*")
+val roViewDF = spark.read.format("hudi").load(tablePath)

roViewDF.createOrReplaceTempView("hudi_ro_table")

@@ -136,7 +133,7 @@
*/
def delete(spark: SparkSession, tablePath: String, tableName: String): Unit = {

-val roViewDF = spark.read.format("hudi").load(tablePath + "/*/*/*/*")
+val roViewDF = spark.read.format("hudi").load(tablePath)
roViewDF.createOrReplaceTempView("hudi_ro_table")
val df = spark.sql("select uuid, partitionpath, ts from hudi_ro_table limit 2")

@@ -51,7 +51,7 @@ public Logger getLogger() {
public Dataset<Row> getDatasetToValidate(SparkSession session, ExecutionContext context,
StructType inputSchema) {
String partitionPathField = context.getWriterContext().getProps().getString(DataSourceWriteOptions.PARTITIONPATH_FIELD().key());
-String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath + (partitionPathField.isEmpty() ? "/" : "/*/*/*");
+String hudiPath = context.getHoodieTestSuiteWriter().getCfg().targetBasePath;
Dataset<Row> hudiDf = session.read().option(HoodieMetadataConfig.ENABLE.key(), String.valueOf(context.getHoodieTestSuiteWriter().getCfg().enableMetadataOnRead))
.format("hudi").load(hudiPath);
return hudiDf.drop(HoodieRecord.COMMIT_TIME_METADATA_FIELD).drop(HoodieRecord.COMMIT_SEQNO_METADATA_FIELD).drop(HoodieRecord.RECORD_KEY_METADATA_FIELD)
@@ -222,10 +222,7 @@ public void run() throws Exception {
/**
* Read & do some queries
*/
-Dataset<Row> snapshotQueryDF = spark.read().format("org.apache.hudi")
-    // pass any path glob, can include hoodie & non-hoodie
-    // datasets
-    .load(tablePath + (nonPartitionedTable ? "/*" : "/*/*/*/*"));
+Dataset<Row> snapshotQueryDF = spark.read().format("org.apache.hudi").load(tablePath);
snapshotQueryDF.registerTempTable("hoodie_ro");
spark.sql("describe hoodie_ro").show();
// all trips whose fare amount was greater than 2.
@@ -321,10 +321,7 @@ public int addInputAndValidateIngestion(SparkSession spark, FileSystem fs, Strin
/**
* Read & do some queries
*/
-Dataset<Row> hoodieROViewDF = spark.read().format("hudi")
-    // pass any path glob, can include hoodie & non-hoodie
-    // datasets
-    .load(tablePath + "/*/*/*/*");
+Dataset<Row> hoodieROViewDF = spark.read().format("hudi").load(tablePath);
hoodieROViewDF.registerTempTable("hoodie_ro");
spark.sql("describe hoodie_ro").show();
// all trips whose fare amount was greater than 2.
@@ -109,7 +109,7 @@ public void test() throws Exception {

List<Row> rows = spark().read().format("org.apache.hudi")
.option("hoodie.datasource.query.type", "snapshot")
-.load(config.getBasePath() + "/*/*")
+.load(config.getBasePath())
.select("id", "name", "age", "ts", "part")
.collectAsList();
assertEquals(2, rows.size());
@@ -320,10 +320,7 @@ public void testConcurrentWrite(boolean rowWriterEnable) throws IOException {
}

private List<Row> readRecords() {
-Dataset<Row> roViewDF = sparkSession
-    .read()
-    .format("hudi")
-    .load(basePath + "/*/*/*/*");
+Dataset<Row> roViewDF = sparkSession.read().format("hudi").load(basePath);
roViewDF.createOrReplaceTempView("hudi_ro_table");
return sparkSession.sqlContext().sql("select * from hudi_ro_table").collectAsList();
}
@@ -166,10 +166,7 @@ private List<WriteStatus> writeData(String commitTime, int totalRecords, boolean
}

private List<Row> readRecords() {
-Dataset<Row> roViewDF = sparkSession
-    .read()
-    .format("hudi")
-    .load(basePath + "/*/*/*/*");
+Dataset<Row> roViewDF = sparkSession.read().format("hudi").load(basePath);
roViewDF.createOrReplaceTempView("clutering_table");
return sparkSession.sqlContext().sql("select * from clutering_table").collectAsList();
}
@@ -621,8 +621,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, noReconciliationOpts, df1)

-val snapshotDF1 = spark.read.format("org.apache.hudi")
-  .load(tempBasePath + "/*/*/*/*")
+val snapshotDF1 = spark.read.format("org.apache.hudi").load(tempBasePath)
assertEquals(10, snapshotDF1.count())

assertEquals(df1.except(dropMetaFields(snapshotDF1)).count(), 0)
@@ -632,8 +631,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
val df2 = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, noReconciliationOpts, df2)

-val snapshotDF2 = spark.read.format("org.apache.hudi")
-  .load(tempBasePath + "/*/*/*/*")
+val snapshotDF2 = spark.read.format("org.apache.hudi").load(tempBasePath)
assertEquals(10, snapshotDF2.count())

// Ensure 2nd batch of updates matches.
@@ -649,8 +647,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
// write to Hudi with new column
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, noReconciliationOpts, df3)

-val snapshotDF3 = spark.read.format("org.apache.hudi")
-  .load(tempBasePath + "/*/*/*/*")
+val snapshotDF3 = spark.read.format("org.apache.hudi").load(tempBasePath)
assertEquals(15, snapshotDF3.count())

// Ensure 3d batch matches
@@ -669,8 +666,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
val df4 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, reconciliationOpts, df4)

-val snapshotDF4 = spark.read.format("org.apache.hudi")
-  .load(tempBasePath + "/*/*/*/*")
+val snapshotDF4 = spark.read.format("org.apache.hudi").load(tempBasePath)

assertEquals(25, snapshotDF4.count())

@@ -710,7 +706,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
)
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, allowOpts, df5)

-val snapshotDF5 = spark.read.format("org.apache.hudi").load(tempBasePath + "/*/*/*/*")
+val snapshotDF5 = spark.read.format("org.apache.hudi").load(tempBasePath)
assertEquals(35, snapshotDF5.count())
assertEquals(df5.intersect(dropMetaFields(snapshotDF5)).except(df5).count, 0)

@@ -795,8 +791,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
val df1 = spark.createDataFrame(sc.parallelize(recordsSeq), structType)
// write to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Overwrite, fooTableModifier, df1)
-val snapshotDF1 = spark.read.format("org.apache.hudi")
-  .load(tempBasePath + "/*/*/*/*")
+val snapshotDF1 = spark.read.format("org.apache.hudi").load(tempBasePath)
assertEquals(10, snapshotDF1.count())
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf1 = dropMetaFields(snapshotDF1)
@@ -806,8 +801,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
val updatesDf = spark.createDataFrame(sc.parallelize(updatesSeq), structType)
// write updates to Hudi
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, updatesDf)
-val snapshotDF2 = spark.read.format("org.apache.hudi")
-  .load(tempBasePath + "/*/*/*/*")
+val snapshotDF2 = spark.read.format("org.apache.hudi").load(tempBasePath)
assertEquals(10, snapshotDF2.count())
// remove metadata columns so that expected and actual DFs can be compared as is
val trimmedDf2 = dropMetaFields(snapshotDF2)
@@ -839,8 +833,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {

fooTableModifier = fooTableModifier.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, recordsToDelete)
-val snapshotDF3 = spark.read.format("org.apache.hudi")
-  .load(tempBasePath + "/*/*/*/*")
+val snapshotDF3 = spark.read.format("org.apache.hudi").load(tempBasePath)
assertEquals(0, snapshotDF3.filter(entry => {
val partitionPath = entry.getString(3)
!partitionPath.equals(HoodieTestDataGenerator.DEFAULT_THIRD_PARTITION_PATH)
@@ -861,8 +854,7 @@ def testBulkInsertForDropPartitionColumn(): Unit = {
fooTableModifier = fooTableModifier.updated(DataSourceWriteOptions.OPERATION.key(), WriteOperationType.DELETE_PARTITION.name())
val recordsToDelete = spark.emptyDataFrame
HoodieSparkSqlWriter.write(sqlContext, SaveMode.Append, fooTableModifier, recordsToDelete)
-val snapshotDF3 = spark.read.format("org.apache.hudi")
-  .load(tempBasePath + "/*/*/*/*")
+val snapshotDF3 = spark.read.format("org.apache.hudi").load(tempBasePath)
snapshotDF3.show()
assertEquals(0, snapshotDF3.filter(entry => {
val partitionPath = entry.getString(3)
@@ -17,6 +17,7 @@

package org.apache.hudi.functional

+import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport}
import org.apache.hudi.HoodieConversionUtils.toJavaOption
import org.apache.hudi.common.model.{HoodieRecord, HoodieTableType, OverwriteWithLatestAvroPayload}
import org.apache.hudi.common.table.{HoodieTableConfig, TableSchemaResolver}
@@ -26,14 +27,13 @@ import org.apache.hudi.exception.SchemaCompatibilityException
import org.apache.hudi.functional.TestBasicSchemaEvolution.{dropColumn, injectColumnAt}
import org.apache.hudi.testutils.HoodieSparkClientTestBase
import org.apache.hudi.util.JFunction
-import org.apache.hudi.{AvroConversionUtils, DataSourceWriteOptions, ScalaAssertionSupport}

import org.apache.hadoop.fs.FileSystem
+import org.apache.spark.sql.{functions, HoodieUnsafeUtils, Row, SaveMode, SparkSession, SparkSessionExtensions}
import org.apache.spark.sql.hudi.HoodieSparkSessionExtension
import org.apache.spark.sql.types.{IntegerType, LongType, StringType, StructField, StructType}
-import org.apache.spark.sql.{HoodieUnsafeUtils, Row, SaveMode, SparkSession, SparkSessionExtensions, functions}
-import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.api.{AfterEach, BeforeEach}
+import org.junit.jupiter.api.Assertions.assertEquals
import org.junit.jupiter.params.ParameterizedTest
import org.junit.jupiter.params.provider.CsvSource

Expand Down Expand Up @@ -119,23 +119,17 @@ class TestBasicSchemaEvolution extends HoodieSparkClientTestBase with ScalaAsser
.save(basePath)
}

-def loadTable(loadAllVersions: Boolean = true): (StructType, Seq[Row]) = {
+def loadTable(): (StructType, Seq[Row]) = {
val tableMetaClient = createMetaClient(spark, basePath)

tableMetaClient.reloadActiveTimeline()

val resolver = new TableSchemaResolver(tableMetaClient)
val latestTableSchema = AvroConversionUtils.convertAvroSchemaToStructType(resolver.getTableAvroSchema(false))

-val tablePath = if (loadAllVersions) {
-  s"$basePath/*/*"
-} else {
-  basePath
-}

val df =
spark.read.format("org.apache.hudi")
-.load(tablePath)
+.load(basePath)
.drop(HoodieRecord.HOODIE_META_COLUMNS.asScala.toSeq: _*)
.orderBy(functions.col("_row_key").cast(IntegerType))
