
[SPARK-50286][SQL] Correctly propagate SQL options to WriteBuilder #48822

Open
wants to merge 3 commits into base: master

Conversation

pan3793 (Member) commented Nov 12, 2024

What changes were proposed in this pull request?

SPARK-49098 introduced a SQL syntax that allows users to set table options in DSv2 write cases, but unfortunately, the options set via SQL are not propagated correctly to the underlying DSv2 WriteBuilder:

INSERT INTO $t1 WITH (`write.split-size` = 10) SELECT ...
df.writeTo(t1).option("write.split-size", "10").append()

From the user's perspective, the above two are equivalent, but the internal implementations differ slightly. Both of them construct an

AppendData(r: DataSourceV2Relation, ..., writeOptions, ...)

but the SQL options are carried by r.options, while the DataFrame API options are carried by writeOptions. Currently, only the latter is propagated to the WriteBuilder; the former is silently dropped. This PR fixes the issue by merging the two option maps.
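As a rough, illustrative sketch (not the actual patch), the merge could look like the following; note that with Scala's Map ++ operator the right-hand operand wins on duplicate keys:

import scala.jdk.CollectionConverters._
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Illustrative sketch only: combine the SQL-provided relation options with the
// DataFrame API write options before they reach the DSv2 WriteBuilder.
def mergedWriteOptions(
    relationOptions: CaseInsensitiveStringMap, // carried by r.options (SQL WITH (...))
    writeOptions: Map[String, String] // carried by AppendData's writeOptions
  ): Map[String, String] = {
  // Values from writeOptions override relationOptions on duplicate keys.
  relationOptions.asScala.toMap ++ writeOptions
}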

An additional question: if the user only uses SQL or the DataFrame API to construct the query, only one of the two "options" maps will be filled. But if the user assembles the LogicalPlan directly, there is a chance that r.options and writeOptions contain duplicate keys; which one should take effect?

Why are the changes needed?

Correctly propagate SQL options to WriteBuilder, to complete the feature added in SPARK-49098, so that DSv2 implementations like Iceberg can benefit.

Does this PR introduce any user-facing change?

No, it's an unreleased feature.

How was this patch tested?

UTs added by SPARK-36680 and SPARK-49098 are also updated to check that SQL options are correctly propagated to the physical plan.

Was this patch authored or co-authored using generative AI tooling?

No.

@github-actions github-actions bot added the SQL label Nov 12, 2024
@pan3793 pan3793 marked this pull request as ready for review November 13, 2024 07:30
pan3793 (Member, Author) commented Nov 13, 2024

@@ -44,7 +46,7 @@ object V2Writes extends Rule[LogicalPlan] with PredicateHelper {

   override def apply(plan: LogicalPlan): LogicalPlan = plan transformDown {
     case a @ AppendData(r: DataSourceV2Relation, query, options, _, None, _) =>
-      val writeBuilder = newWriteBuilder(r.table, options, query.schema)
+      val writeBuilder = newWriteBuilder(r.table, r.options.asScala.toMap ++ options, query.schema)
Contributor

Can we add an assert that only one of them can be non-empty?
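For illustration, such a guard might look roughly like this self-contained sketch; the parameter names merely stand in for r.options (the SQL WITH (...) options) and the AppendData write options, and this is not code from the PR:

// Hypothetical sketch of the suggested assert; it assumes the SQL path fills the
// relation options and the DataFrame API path fills the write options, never both.
def assertAtMostOneOptionMapSet(
    relationOptions: Map[String, String],
    writeOptions: Map[String, String]): Unit = {
  assert(relationOptions.isEmpty || writeOptions.isEmpty,
    "Expected at most one of relation options and write options to be non-empty")
}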

import org.apache.spark.sql.execution.CommandResultExec
import org.apache.spark.sql.execution.datasources.v2._

class DataSourceV2OptionSuite extends DatasourceV2SQLBase {
Contributor

Suggested change
-class DataSourceV2OptionSuite extends DatasourceV2SQLBase {
+class DataSourceV2OptionSQLSuite extends DatasourceV2SQLBase {

Contributor

since this is testing SQL API only.

@cloud-fan (Contributor) left a comment

This is a good catch!

@dongjoon-hyun (Member) left a comment

+1, LGTM. Thank you for the fix, @pan3793 .

@szehon-ho (Contributor) left a comment

Thanks, I did not realize this. It looks, from @cloud-fan's comment, like only one can be set?

}
}

test("SPARK-36680, SPARK-50286: Supports Dynamic Table Options for SQL Insert Overwrite") {
Contributor

This is my fault, but we can optionally change the first JIRA in these tests to SPARK-49098, as it's the one that added the support for inserts?

pan3793 (Member, Author) commented Nov 14, 2024

... that only one can be set

@szehon-ho yes, as mentioned in the description, DataFrame API options go to writeOptions while SQL options go to r.options. I think they won't be set together in normal cases, but it would be great if someone could double-check that.

pan3793 (Member, Author) commented Nov 14, 2024

Wait, I forgot about SessionConfigSupport.

In fact, I submitted a PR to Iceberg to support this feature, but unfortunately, that patch doesn't seem to be getting attention. @szehon-ho, do you think we can re-open it and get it in? If so, the assumption would not hold.

... that only one can be set

and we should define the priority; I think it should be:

  1. options from SQL
  2. options from DataFrame API
  3. options from session configuration

Currently, if there are duplicated options, 2 overrides 3; see:

val finalOptions = sessionOptions.filter { case (k, _) => !optionsWithPath.contains(k) } ++
  optionsWithPath.originalMap
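For illustration, the proposed precedence could be expressed with plain Scala maps (a minimal sketch with made-up values, not the actual Spark code); Scala's Map ++ operator keeps the right-hand value on duplicate keys:

// Illustrative only: session configuration < DataFrame API options < SQL options.
val sessionOptions = Map("write.split-size" -> "5")
val dataFrameOptions = Map("write.split-size" -> "10")
val sqlOptions = Map("write.split-size" -> "20")

val effectiveOptions = sessionOptions ++ dataFrameOptions ++ sqlOptions
assert(effectiveOptions("write.split-size") == "20") // the SQL option takes effect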

@cloud-fan, do you think the proposed priority makes sense, or do you have other ideas?

@cloud-fan (Contributor) commented

Yeah, 3 should have lower priority.
