Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] combined count when loadfile in dpp (backport #34787) #35461

Merged
merged 1 commit into from
Nov 21, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -675,17 +675,18 @@ private StructType constructSrcSchema(EtlJobConfig.EtlFileGroup fileGroup, EtlJo
* then "*" will likely be cast to wrong output according to its field type such as NULL for INT
* in {@link SparkDpp#convertSrcDataframeToDstDataframe}
*/
private Dataset<Row> loadDataFromPath(SparkSession spark,
EtlJobConfig.EtlFileGroup fileGroup,
String fileUrl,
EtlJobConfig.EtlIndex baseIndex) throws SparkDppException {
List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
private Dataset<Row> loadDataFromPaths(SparkSession spark,
EtlJobConfig.EtlFileGroup fileGroup,
List<String> fileUrls,
List<String> columnValueFromPath,
EtlJobConfig.EtlIndex baseIndex) throws SparkDppException {

StructType srcSchema = constructSrcSchema(fileGroup, baseIndex);
Dataset<Row> sourceData = null;
if (StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "orc")) {
sourceData = spark.read().orc(fileUrl);
sourceData = spark.read().orc(fileUrls.toArray(new String[] {}));
} else if (StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "parquet")) {
sourceData = spark.read().parquet(fileUrl);
sourceData = spark.read().parquet(fileUrls.toArray(new String[] {}));
}
if (fileGroup.columnsFromPath != null) {
for (int i = 0; i < fileGroup.columnsFromPath.size(); i++) {
Expand All @@ -699,7 +700,6 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
throw new SparkDppException("The schema of file and table must be equal. " +
"file schema: " + sourceData.schema().treeString() + ", table schema: " + srcSchema.treeString());
}
scannedRowsAcc.add(sourceData.count());
// TODO: data quality check for orc/parquet load
// Check process is roughly the same as the hive load, but there are some bugs to fix.
// Uncomment below when method checkDataFromHiveWithStrictMode is ready.
Expand All @@ -715,19 +715,19 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
* then "*" will likely be cast to wrong output according to its field type such as NULL for INT
* in {@link SparkDpp#convertSrcDataframeToDstDataframe}
*/
private Dataset<Row> loadDataFromPath(SparkSession spark,
EtlJobConfig.EtlFileGroup fileGroup,
String fileUrl,
EtlJobConfig.EtlIndex baseIndex,
List<EtlJobConfig.EtlColumn> columns) throws SparkDppException {
List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
private Dataset<Row> loadDataFromPaths(SparkSession spark,
EtlJobConfig.EtlFileGroup fileGroup,
List<String> fileUrls,
List<String> columnValueFromPath,
EtlJobConfig.EtlIndex baseIndex,
List<EtlJobConfig.EtlColumn> columns) throws SparkDppException {
// for getting schema to check source data
Map<String, Integer> dstColumnNameToIndex = new HashMap<String, Integer>();
for (int i = 0; i < baseIndex.columns.size(); i++) {
dstColumnNameToIndex.put(baseIndex.columns.get(i).columnName, i);
}
StructType srcSchema = constructSrcSchema(fileGroup, baseIndex);
JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrls.toArray(new String[] {})).toJavaRDD();
int columnSize = srcSchema.size() - columnValueFromPath.size();
List<ColumnParser> parsers = new ArrayList<>();
for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
Expand Down Expand Up @@ -989,32 +989,53 @@ private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
if (fileStatuses == null) {
throw new SparkDppException("fs list status failed: " + filePath);
}
Map<List<String>, List<String>> filesGroupByPathColumn = new HashMap<>();
for (FileStatus fileStatus : fileStatuses) {
if (fileStatus.isDirectory()) {
continue;
}
fileNumberAcc.add(1);
fileSizeAcc.add(fileStatus.getLen());
String dataFilePath = fileStatus.getPath().toString();
List<String> columnValuesFromPath = DppUtils.parseColumnsFromPath(dataFilePath, fileGroup.columnsFromPath);
List<String> groupedFilePaths = filesGroupByPathColumn.getOrDefault(columnValuesFromPath, new ArrayList<>());
groupedFilePaths.add(dataFilePath);
filesGroupByPathColumn.put(columnValuesFromPath, groupedFilePaths);
}

for (Map.Entry<List<String>, List<String>> entry : filesGroupByPathColumn.entrySet()) {
List<String> columnValuesFromPath = entry.getKey();
List<String> groupedFilePaths = entry.getValue();

if (fileGroup.fileFormat != null &&
(StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "orc") ||
StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "parquet"))) {
dataframe = loadDataFromPath(spark, fileGroup, fileStatus.getPath().toString(), baseIndex);
dataframe = loadDataFromPaths(spark, fileGroup, groupedFilePaths, columnValuesFromPath, baseIndex);
} else {
dataframe = loadDataFromPath(spark, fileGroup, fileStatus.getPath().toString(),
dataframe = loadDataFromPaths(spark, fileGroup, groupedFilePaths, columnValuesFromPath,
baseIndex, baseIndex.columns);
}

dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
if (fileGroupDataframe == null) {
fileGroupDataframe = dataframe;
} else {
fileGroupDataframe = fileGroupDataframe.union(dataframe);
}
}

} catch (Exception e) {
LOG.warn("parse path failed:" + filePath);
throw e;
}
}

if (fileGroup.fileFormat != null &&
(StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "orc") ||
StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "parquet"))) {
scannedRowsAcc.add(fileGroupDataframe.count());
}

return fileGroupDataframe;
}

Expand Down
Loading