[Enhancement] combined count when loadfile in dpp (#34787)
Signed-off-by: kevin wan <[email protected]>
Co-authored-by: mingge <[email protected]>
(cherry picked from commit 154ae93)
MaxWk authored and mergify[bot] committed Nov 21, 2023
1 parent 8372dbc commit d5b37b6
Showing 1 changed file with 38 additions and 17 deletions.
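
The change below groups the input files of a load job by the column values parsed from their paths, loads each group with a single Spark read, and updates the scanned-row accumulator once per load instead of once per file. A minimal, self-contained sketch of the grouping step follows; the parseColumnsFromPath helper and the sample paths are hypothetical stand-ins for DppUtils.parseColumnsFromPath, not the actual Doris code.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class GroupFilesByPathColumns {

    // Hypothetical stand-in for DppUtils.parseColumnsFromPath: treat every
    // "key=value" segment of the path as a path-derived column value.
    static List<String> parseColumnsFromPath(String path) {
        List<String> values = new ArrayList<>();
        for (String segment : path.split("/")) {
            if (segment.contains("=")) {
                values.add(segment.substring(segment.indexOf('=') + 1));
            }
        }
        return values;
    }

    public static void main(String[] args) {
        List<String> files = Arrays.asList(
                "hdfs://nn/warehouse/dt=2023-11-20/part-0.parquet",
                "hdfs://nn/warehouse/dt=2023-11-20/part-1.parquet",
                "hdfs://nn/warehouse/dt=2023-11-21/part-0.parquet");

        // Same shape as the commit: path-column values -> file paths sharing those values.
        Map<List<String>, List<String>> filesGroupByPathColumn = new HashMap<>();
        for (String file : files) {
            filesGroupByPathColumn
                    .computeIfAbsent(parseColumnsFromPath(file), k -> new ArrayList<>())
                    .add(file);
        }

        // Prints two groups: [2023-11-20] with two files and [2023-11-21] with one.
        filesGroupByPathColumn.forEach((columns, paths) ->
                System.out.println(columns + " -> " + paths));
    }
}
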
@@ -675,17 +675,18 @@ private StructType constructSrcSchema(EtlJobConfig.EtlFileGroup fileGroup, EtlJo
      * then "*" will likely be cast to wrong output according to its field type such as NULL for INT
      * in {@link SparkDpp#convertSrcDataframeToDstDataframe}
      */
-    private Dataset<Row> loadDataFromPath(SparkSession spark,
-                                          EtlJobConfig.EtlFileGroup fileGroup,
-                                          String fileUrl,
-                                          EtlJobConfig.EtlIndex baseIndex) throws SparkDppException {
-        List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
+    private Dataset<Row> loadDataFromPaths(SparkSession spark,
+                                           EtlJobConfig.EtlFileGroup fileGroup,
+                                           List<String> fileUrls,
+                                           List<String> columnValueFromPath,
+                                           EtlJobConfig.EtlIndex baseIndex) throws SparkDppException {
+
         StructType srcSchema = constructSrcSchema(fileGroup, baseIndex);
         Dataset<Row> sourceData = null;
         if (StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "orc")) {
-            sourceData = spark.read().orc(fileUrl);
+            sourceData = spark.read().orc(fileUrls.toArray(new String[] {}));
         } else if (StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "parquet")) {
-            sourceData = spark.read().parquet(fileUrl);
+            sourceData = spark.read().parquet(fileUrls.toArray(new String[] {}));
         }
         if (fileGroup.columnsFromPath != null) {
             for (int i = 0; i < fileGroup.columnsFromPath.size(); i++) {
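
A hedged sketch of the new ORC read path: DataFrameReader.orc (and parquet) is a varargs method, so one call can load every file of a group as a single Dataset. The local session and file locations below are assumptions for illustration only, not part of the commit.

import java.util.Arrays;
import java.util.List;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

public class ReadGroupedOrcFiles {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("read-grouped-orc")
                .master("local[*]")
                .getOrCreate();

        // Hypothetical file group sharing the same path-derived column values.
        List<String> fileUrls = Arrays.asList(
                "/tmp/dpp/dt=2023-11-20/part-0.orc",
                "/tmp/dpp/dt=2023-11-20/part-1.orc");

        // DataFrameReader.orc(String... paths) is varargs, so the List<String> is turned
        // into an array and the whole group is loaded in one read.
        Dataset<Row> sourceData = spark.read().orc(fileUrls.toArray(new String[] {}));
        System.out.println("rows in this group: " + sourceData.count());

        spark.stop();
    }
}
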
@@ -699,7 +700,6 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
             throw new SparkDppException("The schema of file and table must be equal. " +
                     "file schema: " + sourceData.schema().treeString() + ", table schema: " + srcSchema.treeString());
         }
-        scannedRowsAcc.add(sourceData.count());
         // TODO: data quality check for orc/parquet load
         // Check process is roughly the same as the hive load, but there are some bugs to fix.
         // Uncomment below when method checkDataFromHiveWithStrictMode is ready.
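
The removed line is the point of the "combined count": Dataset.count() is an action that launches a Spark job, so counting inside the per-file loader meant one extra job per input file. A small sketch of the difference, assuming a local session and two hypothetical Parquet files:

import org.apache.spark.sql.SparkSession;

public class PerFileVersusCombinedCount {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("per-file-vs-combined-count")
                .master("local[*]")
                .getOrCreate();

        String[] files = {"/tmp/dpp/part-0.parquet", "/tmp/dpp/part-1.parquet"};

        // Old behaviour: one read and one count() per file, i.e. one Spark job per file.
        long total = 0;
        for (String file : files) {
            total += spark.read().parquet(file).count();
        }

        // New behaviour: all files of the group in one read, one count(), one job.
        long combined = spark.read().parquet(files).count();

        System.out.println(total + " == " + combined);
        spark.stop();
    }
}
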
@@ -715,19 +715,19 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
      * then "*" will likely be cast to wrong output according to its field type such as NULL for INT
      * in {@link SparkDpp#convertSrcDataframeToDstDataframe}
      */
-    private Dataset<Row> loadDataFromPath(SparkSession spark,
-                                          EtlJobConfig.EtlFileGroup fileGroup,
-                                          String fileUrl,
-                                          EtlJobConfig.EtlIndex baseIndex,
-                                          List<EtlJobConfig.EtlColumn> columns) throws SparkDppException {
-        List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
+    private Dataset<Row> loadDataFromPaths(SparkSession spark,
+                                           EtlJobConfig.EtlFileGroup fileGroup,
+                                           List<String> fileUrls,
+                                           List<String> columnValueFromPath,
+                                           EtlJobConfig.EtlIndex baseIndex,
+                                           List<EtlJobConfig.EtlColumn> columns) throws SparkDppException {
         // for getting schema to check source data
         Map<String, Integer> dstColumnNameToIndex = new HashMap<String, Integer>();
         for (int i = 0; i < baseIndex.columns.size(); i++) {
             dstColumnNameToIndex.put(baseIndex.columns.get(i).columnName, i);
         }
         StructType srcSchema = constructSrcSchema(fileGroup, baseIndex);
-        JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
+        JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrls.toArray(new String[] {})).toJavaRDD();
         int columnSize = srcSchema.size() - columnValueFromPath.size();
         List<ColumnParser> parsers = new ArrayList<>();
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
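
The text-format path follows the same pattern: DataFrameReader.textFile also accepts multiple paths, so the whole group becomes one Dataset<String> before it is turned into a JavaRDD for line parsing. A minimal sketch, with assumed local paths:

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.sql.SparkSession;

public class ReadGroupedTextFiles {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("read-grouped-text")
                .master("local[*]")
                .getOrCreate();

        // Hypothetical file locations; textFile(String... paths) reads every file of the
        // group into a single Dataset<String>, which is then processed line by line.
        JavaRDD<String> lines = spark.read()
                .textFile("/tmp/dpp/dt=2023-11-20/data-0.csv", "/tmp/dpp/dt=2023-11-20/data-1.csv")
                .toJavaRDD();

        System.out.println("lines in this file group: " + lines.count());
        spark.stop();
    }
}
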
@@ -989,32 +989,53 @@ private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
                 if (fileStatuses == null) {
                     throw new SparkDppException("fs list status failed: " + filePath);
                 }
+                Map<List<String>, List<String>> filesGroupByPathColumn = new HashMap<>();
                 for (FileStatus fileStatus : fileStatuses) {
                     if (fileStatus.isDirectory()) {
                         continue;
                     }
                     fileNumberAcc.add(1);
                     fileSizeAcc.add(fileStatus.getLen());
+                    String dataFilePath = fileStatus.getPath().toString();
+                    List<String> columnValuesFromPath = DppUtils.parseColumnsFromPath(dataFilePath, fileGroup.columnsFromPath);
+                    List<String> groupedFilePaths = filesGroupByPathColumn.getOrDefault(columnValuesFromPath, new ArrayList<>());
+                    groupedFilePaths.add(dataFilePath);
+                    filesGroupByPathColumn.put(columnValuesFromPath, groupedFilePaths);
+                }
+
+                for (Map.Entry<List<String>, List<String>> entry : filesGroupByPathColumn.entrySet()) {
+                    List<String> columnValuesFromPath = entry.getKey();
+                    List<String> groupedFilePaths = entry.getValue();
+
                     if (fileGroup.fileFormat != null &&
                             (StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "orc") ||
                                     StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "parquet"))) {
-                        dataframe = loadDataFromPath(spark, fileGroup, fileStatus.getPath().toString(), baseIndex);
+                        dataframe = loadDataFromPaths(spark, fileGroup, groupedFilePaths, columnValuesFromPath, baseIndex);
                     } else {
-                        dataframe = loadDataFromPath(spark, fileGroup, fileStatus.getPath().toString(),
+                        dataframe = loadDataFromPaths(spark, fileGroup, groupedFilePaths, columnValuesFromPath,
                                 baseIndex, baseIndex.columns);
                     }
+
                     dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
                     if (fileGroupDataframe == null) {
                         fileGroupDataframe = dataframe;
                     } else {
                         fileGroupDataframe = fileGroupDataframe.union(dataframe);
                     }
                 }
+
             } catch (Exception e) {
                 LOG.warn("parse path failed:" + filePath);
                 throw e;
             }
         }

+        if (fileGroup.fileFormat != null &&
+                (StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "orc") ||
+                        StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "parquet"))) {
+            scannedRowsAcc.add(fileGroupDataframe.count());
+        }
+
         return fileGroupDataframe;
     }
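
With the per-file count gone, the scanned-row accumulator for ORC/Parquet loads is fed by a single count() over the final unioned DataFrame. A sketch of that pattern, using an assumed local session and hypothetical per-group DataFrames in place of the loadDataFromPaths results:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.util.LongAccumulator;

public class CombinedScannedRowCount {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .appName("combined-count")
                .master("local[*]")
                .getOrCreate();

        // Stand-in for SparkDpp's scannedRowsAcc.
        LongAccumulator scannedRowsAcc = spark.sparkContext().longAccumulator("scanned.rows");

        // Hypothetical per-group DataFrames; in SparkDpp these come from loadDataFromPaths.
        Dataset<Row> group1 = spark.read().parquet("/tmp/dpp/dt=2023-11-20");
        Dataset<Row> group2 = spark.read().parquet("/tmp/dpp/dt=2023-11-21");

        // Union first, then count once: the accumulator is updated by a single job
        // instead of one count() job per input file.
        Dataset<Row> fileGroupDataframe = group1.union(group2);
        scannedRowsAcc.add(fileGroupDataframe.count());

        System.out.println("scanned rows: " + scannedRowsAcc.value());
        spark.stop();
    }
}
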
