diff --git a/fe/spark-dpp/src/main/java/com/starrocks/load/loadv2/dpp/SparkDpp.java b/fe/spark-dpp/src/main/java/com/starrocks/load/loadv2/dpp/SparkDpp.java
index 853bbf62b5d7fe..938e91e24368e8 100644
--- a/fe/spark-dpp/src/main/java/com/starrocks/load/loadv2/dpp/SparkDpp.java
+++ b/fe/spark-dpp/src/main/java/com/starrocks/load/loadv2/dpp/SparkDpp.java
@@ -662,17 +662,18 @@ private StructType constructSrcSchema(EtlJobConfig.EtlFileGroup fileGroup, EtlJo
      * then "*" will likely be cast to wrong output according to its field type such as NULL for INT
      * in {@link SparkDpp#convertSrcDataframeToDstDataframe}
      */
-    private Dataset<Row> loadDataFromPath(SparkSession spark,
-                                          EtlJobConfig.EtlFileGroup fileGroup,
-                                          String fileUrl,
-                                          EtlJobConfig.EtlIndex baseIndex) throws SparkDppException {
-        List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
+    private Dataset<Row> loadDataFromPaths(SparkSession spark,
+                                           EtlJobConfig.EtlFileGroup fileGroup,
+                                           List<String> fileUrls,
+                                           List<String> columnValueFromPath,
+                                           EtlJobConfig.EtlIndex baseIndex) throws SparkDppException {
+
         StructType srcSchema = constructSrcSchema(fileGroup, baseIndex);
         Dataset<Row> sourceData = null;
         if (StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "orc")) {
-            sourceData = spark.read().orc(fileUrl);
+            sourceData = spark.read().orc(fileUrls.toArray(new String[] {}));
         } else if (StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "parquet")) {
-            sourceData = spark.read().parquet(fileUrl);
+            sourceData = spark.read().parquet(fileUrls.toArray(new String[] {}));
         }
         if (fileGroup.columnsFromPath != null) {
             for (int i = 0; i < fileGroup.columnsFromPath.size(); i++) {
@@ -686,7 +687,6 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
             throw new SparkDppException("The schema of file and table must be equal. " +
                     "file schema: " + sourceData.schema().treeString() + ", table schema: " + srcSchema.treeString());
         }
-        scannedRowsAcc.add(sourceData.count());
         // TODO: data quality check for orc/parquet load
         // Check process is roughly the same as the hive load, but there are some bugs to fix.
         // Uncomment below when method checkDataFromHiveWithStrictMode is ready.
@@ -702,19 +702,19 @@ private Dataset<Row> loadDataFromPath(SparkSession spark,
      * then "*" will likely be cast to wrong output according to its field type such as NULL for INT
      * in {@link SparkDpp#convertSrcDataframeToDstDataframe}
      */
-    private Dataset<Row> loadDataFromPath(SparkSession spark,
-                                          EtlJobConfig.EtlFileGroup fileGroup,
-                                          String fileUrl,
-                                          EtlJobConfig.EtlIndex baseIndex,
-                                          List<String> columns) throws SparkDppException {
-        List<String> columnValueFromPath = DppUtils.parseColumnsFromPath(fileUrl, fileGroup.columnsFromPath);
+    private Dataset<Row> loadDataFromPaths(SparkSession spark,
+                                           EtlJobConfig.EtlFileGroup fileGroup,
+                                           List<String> fileUrls,
+                                           List<String> columnValueFromPath,
+                                           EtlJobConfig.EtlIndex baseIndex,
+                                           List<String> columns) throws SparkDppException {
         // for getting schema to check source data
         Map<String, Integer> dstColumnNameToIndex = new HashMap<String, Integer>();
         for (int i = 0; i < baseIndex.columns.size(); i++) {
             dstColumnNameToIndex.put(baseIndex.columns.get(i).columnName, i);
         }
         StructType srcSchema = constructSrcSchema(fileGroup, baseIndex);
-        JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrl).toJavaRDD();
+        JavaRDD<String> sourceDataRdd = spark.read().textFile(fileUrls.toArray(new String[] {})).toJavaRDD();
         int columnSize = srcSchema.size() - columnValueFromPath.size();
         List<ColumnParser> parsers = new ArrayList<>();
         for (EtlJobConfig.EtlColumn column : baseIndex.columns) {
@@ -976,20 +976,33 @@ private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
                 if (fileStatuses == null) {
                     throw new SparkDppException("fs list status failed: " + filePath);
                 }
+                Map<List<String>, List<String>> filesGroupByPathColumn = new HashMap<>();
                 for (FileStatus fileStatus : fileStatuses) {
                     if (fileStatus.isDirectory()) {
                         continue;
                     }
                     fileNumberAcc.add(1);
                     fileSizeAcc.add(fileStatus.getLen());
+                    String dataFilePath = fileStatus.getPath().toString();
+                    List<String> columnValuesFromPath = DppUtils.parseColumnsFromPath(dataFilePath, fileGroup.columnsFromPath);
+                    List<String> groupedFilePaths = filesGroupByPathColumn.getOrDefault(columnValuesFromPath, new ArrayList<>());
+                    groupedFilePaths.add(dataFilePath);
+                    filesGroupByPathColumn.put(columnValuesFromPath, groupedFilePaths);
+                }
+
+                for (Map.Entry<List<String>, List<String>> entry : filesGroupByPathColumn.entrySet()) {
+                    List<String> columnValuesFromPath = entry.getKey();
+                    List<String> groupedFilePaths = entry.getValue();
+
                     if (fileGroup.fileFormat != null &&
                             (StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "orc") ||
                                     StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "parquet"))) {
-                        dataframe = loadDataFromPath(spark, fileGroup, fileStatus.getPath().toString(), baseIndex);
+                        dataframe = loadDataFromPaths(spark, fileGroup, groupedFilePaths, columnValuesFromPath, baseIndex);
                     } else {
-                        dataframe = loadDataFromPath(spark, fileGroup, fileStatus.getPath().toString(),
+                        dataframe = loadDataFromPaths(spark, fileGroup, groupedFilePaths, columnValuesFromPath,
                                 baseIndex, baseIndex.columns);
                     }
+
                     dataframe = convertSrcDataframeToDstDataframe(baseIndex, dataframe, dstTableSchema, fileGroup);
                     if (fileGroupDataframe == null) {
                         fileGroupDataframe = dataframe;
@@ -997,11 +1010,19 @@ private Dataset<Row> loadDataFromFilePaths(SparkSession spark,
                         fileGroupDataframe = fileGroupDataframe.union(dataframe);
                     }
                 }
+
             } catch (Exception e) {
                 LOG.warn("parse path failed:" + filePath);
                 throw e;
             }
         }
+
+        if (fileGroup.fileFormat != null &&
+                (StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "orc") ||
+                        StringUtils.equalsIgnoreCase(fileGroup.fileFormat, "parquet"))) {
+            scannedRowsAcc.add(fileGroupDataframe.count());
+        }
+
         return fileGroupDataframe;
     }
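Note (not part of the patch): the change above collects the data files under each load path, groups them by the column values parsed from their paths, and then reads every group with a single spark.read().orc(...)/parquet(...)/textFile(...) call, updating scannedRowsAcc once per file group rather than once per file. A minimal standalone sketch of that grouping step follows; it only illustrates the idea, the class name is hypothetical, and parseColumnsFromPath below is a simplified stand-in for DppUtils.parseColumnsFromPath.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PathGroupingSketch {

    // Simplified stand-in for DppUtils.parseColumnsFromPath: pull the "key=value"
    // segments for the requested keys out of a file path.
    static List<String> parseColumnsFromPath(String filePath, List<String> columnsFromPath) {
        List<String> values = new ArrayList<>();
        for (String column : columnsFromPath) {
            for (String segment : filePath.split("/")) {
                if (segment.startsWith(column + "=")) {
                    values.add(segment.substring(column.length() + 1));
                }
            }
        }
        return values;
    }

    // Group file paths by their path-derived column values; each group can then be
    // handed to one Spark reader call as a String[] instead of one call per file.
    static Map<List<String>, List<String>> groupByPathColumns(List<String> filePaths,
                                                              List<String> columnsFromPath) {
        Map<List<String>, List<String>> groups = new HashMap<>();
        for (String filePath : filePaths) {
            List<String> key = parseColumnsFromPath(filePath, columnsFromPath);
            groups.computeIfAbsent(key, k -> new ArrayList<>()).add(filePath);
        }
        return groups;
    }

    public static void main(String[] args) {
        List<String> paths = List.of(
                "hdfs://ns/warehouse/dt=2024-01-01/part-0.parquet",
                "hdfs://ns/warehouse/dt=2024-01-01/part-1.parquet",
                "hdfs://ns/warehouse/dt=2024-01-02/part-0.parquet");
        Map<List<String>, List<String>> groups = groupByPathColumns(paths, List.of("dt"));
        // Expected: [2024-01-01] -> two files, [2024-01-02] -> one file.
        groups.forEach((k, v) -> System.out.println(k + " -> " + v));
    }
}

Grouping by the parsed path columns is what makes the batched read safe: every file in a group shares the same columns-from-path values, so those values can still be attached as constant columns to the combined DataFrame.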