-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[Enhancement] combined count when loadfile in dpp #34787
Conversation
@@ -1297,7 +1318,7 @@ private void writeDppResult(DppResult dppResult) throws Exception { | |||
URI uri = new URI(outputPath); | |||
Path filePath = new Path(resultFilePath); | |||
try (FileSystem fs = FileSystem.get(uri, serializableHadoopConf.value()); | |||
FSDataOutputStream outputStream = fs.create(filePath)) { | |||
FSDataOutputStream outputStream = fs.create(filePath)) { | |||
Gson gson = new Gson(); | |||
outputStream.write(gson.toJson(dppResult).getBytes()); | |||
outputStream.write('\n'); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The most risky bug in this code is:
There could be unexpected behavior when counting the number of rows with scannedRowsAcc.add(fileGroupDataframe.count());
due to a costly action being triggered on the Spark DataFrame within a transformation process. This line will force the evaluation of all transformations applied to the fileGroupDataframe
up to this point, which can be highly inefficient if the DataFrame is large as it triggers a full scan of the data.
You can modify the code like this:
// Move the row count until after the transformations are complete and an action is required.
// Thus, avoiding triggering multiple actions unnecessarily.
// The exact spot to place the count action depends on the broader context of how the produced DataFrame is utilized downstream.
// Ensure there's a valid action where the result of the dataframe needs to be materialized before counting the rows.
Note that without additional context on how fileGroupDataframe
is used later on, I cannot provide an exact location in the code where .count()
should be placed. It's important that you review the logic to ensure scannedRowsAcc.add(...)
is only called at a point where evaluating the DataFrame is unavoidable or beneficial for subsequent operations (like caching or writing to disk).
fe/spark-dpp/src/main/java/com/starrocks/load/loadv2/etl/EtlJobConfig.java
Outdated
Show resolved
Hide resolved
f074eae
to
bfabd7a
Compare
} catch (Exception e) { | ||
LOG.warn("parse path failed:" + filePath); | ||
throw e; | ||
} | ||
} | ||
|
||
if (fileGroup.fileFormat != null && |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit:
Set<String> formats = new HashSet<>(Arrays.asList("orc", "parquet"));
if (fileGroup.fileFormat != null && formats.contains(fileGroup.fileFormat.toLowerCase())) {
scannedRowsAcc.add(fileGroupDataframe.count());
}
Head branch was pushed to by a user without write access
7f2b4f0
to
0f4ef16
Compare
Signed-off-by: kevin wan <[email protected]>
0f4ef16
to
5101e40
Compare
Kudos, SonarCloud Quality Gate passed! 0 Bugs 0.0% Coverage The version of Java (11.0.21) you have used to run this analysis is deprecated and we will stop accepting it soon. Please update to at least Java 17. |
@Mergifyio backport branch-3.2 |
@Mergifyio backport branch-3.1 |
@Mergifyio backport branch-3.0 |
@Mergifyio backport branch-2.5 |
✅ Backports have been created
|
✅ Backports have been created
|
✅ Backports have been created
|
✅ Backports have been created
|
Signed-off-by: kevin wan <[email protected]> Co-authored-by: mingge <[email protected]> (cherry picked from commit 154ae93)
Signed-off-by: kevin wan <[email protected]> Co-authored-by: mingge <[email protected]> (cherry picked from commit 154ae93)
Signed-off-by: kevin wan <[email protected]> Co-authored-by: mingge <[email protected]> (cherry picked from commit 154ae93)
Signed-off-by: kevin wan <[email protected]> Co-authored-by: mingge <[email protected]> (cherry picked from commit 154ae93)
[FE Incremental Coverage Report]✅ pass : 0 / 0 (0%) |
[BE Incremental Coverage Report]✅ pass : 0 / 0 (0%) |
Signed-off-by: kevin wan <[email protected]> Co-authored-by: mingge <[email protected]> (cherry picked from commit 154ae93)
Signed-off-by: kevin wan <[email protected]> Co-authored-by: mingge <[email protected]> (cherry picked from commit 154ae93)
Signed-off-by: kevin wan <[email protected]> Co-authored-by: mingge <[email protected]> (cherry picked from commit 154ae93)
Signed-off-by: kevin wan <[email protected]> Co-authored-by: mingge <[email protected]> (cherry picked from commit 154ae93)
Fixes #14292
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check: