[SPARK-50130][SQL][FOLLOWUP] Simplify the resolution of LazyOuterReference #48820

Open · wants to merge 2 commits into master
Conversation

@cloud-fan (Contributor) commented on Nov 12, 2024:

What changes were proposed in this pull request?

This is a followup of #48664 to simplify the code. The new workflow is:

  1. The Column API creates LazyOuterReference
  2. QueryExecution does lazy analysis if its main query contains LazyOuterReference. Eager analysis is still performed if only subquery expressions contain LazyOuterReference.
  3. The column resolution framework is updated to resolve LazyOuterReference

After this simplification, we no longer need the special logic to strip LazyOuterReference on the DataFrame side, nor the extra flag in the subquery expressions.
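For illustration, a minimal sketch of this flow in PySpark, using the .outer() and .scalar() APIs that appear in the discussion below (the DataFrames and column names here are made up for the example, not taken from the PR):

from pyspark.sql import functions as sf

# Illustrative data; `spark` is an existing SparkSession, as in the examples below.
l = spark.createDataFrame([(1, 1.0), (2, 2.0)], ["a", "b"])
r = spark.createDataFrame([(1, 3.0), (2, 4.0)], ["c", "d"])

# sf.col("a").outer() creates a LazyOuterReference, so the intermediate DataFrame
# below is analyzed lazily: its main query contains the LazyOuterReference.
sub = r.where(sf.col("c") == sf.col("a").outer()).select(sf.sum("d")).scalar()

# The outer query contains the LazyOuterReference only inside a subquery
# expression, so it is still analyzed eagerly and `a` resolves against l.
l.select("a", sub).show()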

Why are the changes needed?

cleanup

Does this PR introduce any user-facing change?

no

How was this patch tested?

existing tests

Was this patch authored or co-authored using generative AI tooling?

no

The github-actions bot added the SQL label on Nov 12, 2024.
@ueshin (Member) left a comment:


@cloud-fan Thanks for taking a look!

I forgot to add one test case in #48664; we also want this one to fail (#48828):

from pyspark.sql import functions as sf

l = spark.createDataFrame(
    [(1, 1.0), (1, 2.0), (2, 1.0), (2, 2.0), (3, 3.0), (None, None), (None, 5.0), (6, None)],
    ["a", "b"],
)

r = spark.createDataFrame(
    [(2, 3.0), (2, 3.0), (3, 2.0), (4, 1.0), (None, None), (None, 5.0), (6, None)],
    ["c", "d"],
)

l.select(
    "a",
    (
        r
        .where(sf.col("b") == sf.col("a").outer())
        .select(sf.sum("d"))
        .scalar()
    ),
).show()

This query should fail because sf.col("b") needs .outer(), but it passes with this change. cc @allisonwang-db

- lazy val isLazyAnalysis: Boolean = logical.containsAnyPattern(LAZY_ANALYSIS_EXPRESSION)
+ lazy val isLazyAnalysis: Boolean = {
+   // Only check the main query as we can resolve LazyOuterReference inside subquery expressions.
+   logical.exists(_.expressions.exists(_.exists(_.isInstanceOf[LazyOuterReference])))
+ }
Member commented:

oh, logical.containsAnyPattern(LAZY_ANALYSIS_EXPRESSION) will check subqueries?
I also intended to check the main query. Thanks for the correction!

Member commented:

nit: it should be LazyAnalysisExpression, although currently only LazyOuterReference?

- def name: String =
-   nameParts.map(n => if (n.contains(".")) s"`$n`" else n).mkString(".")
+ def name: String = nameParts.map(quoteIfNeeded).mkString(".")
Member commented:

👍

@cloud-fan (Contributor, author) commented:

@ueshin After thinking more about it, I think the new test case should pass instead of fail. My opinion is:

  1. In SQL, users can just write unqualified column names to reference the outer plan (see the SQL sketch after this list); we should allow the same in the DataFrame API.
  2. Eventually, Spark Connect will be the main API and DataFrame analysis will always be lazy. Then .outer() is only needed to avoid ambiguity when there are column name conflicts, i.e. to explicitly reference the outer plan.
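To make point 1 concrete, here is a hypothetical SQL example (tables and data invented for illustration, not from this PR): the correlated scalar subquery references the outer column a with no qualification and no special syntax, which is the SQL behavior the DataFrame API would mirror:

# Hypothetical SQL illustration of point 1 (tables and data are made up): the
# unqualified `a` inside the correlated subquery is not found in r, so it
# resolves against the outer table l, with no extra syntax required.
spark.createDataFrame([(1, 1.0), (2, 2.0)], ["a", "b"]).createOrReplaceTempView("l")
spark.createDataFrame([(1, 3.0), (2, 4.0)], ["c", "d"]).createOrReplaceTempView("r")

spark.sql("""
    SELECT a, (SELECT sum(d) FROM r WHERE c = a) AS total
    FROM l
""").show()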

  case u: UnresolvedAttribute =>
-   resolveOuterReference(u.nameParts, outerPlan.get).getOrElse(u)
+   resolve(u.nameParts).getOrElse(u)
  case u: LazyOuterReference =>
@cloud-fan (Contributor, author) commented:

This is the new change; the other changes just revert the previous changes.

@ueshin (Member) commented on Nov 13, 2024:

@cloud-fan

In SQL, users can just write un-qualified column names to reference the outer plan. We should allow the same in DataFrame API.

In that case, sf.col("a") should also be allowed instead of sf.col("a").outer() in the above example?

l.select(
    "a",
    (
        r
        .where(sf.col("b") == sf.col("a"))
        .select(sf.sum("d"))
        .scalar()
    ),
).show()

Otherwise, users may not know why .outer() is necessary for a but not for b.

As long as we need at least one .outer() to make the analysis lazy, it should have a consistent meaning; otherwise we need another way to trigger lazy analysis.
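As an illustration of why at least one .outer() is currently required (a sketch reusing r and sf from the example above, not a test from this PR): with no .outer() at all there is no LazyOuterReference, so Classic Spark analyzes the intermediate DataFrame eagerly and fails, because neither a nor b is a column of r.

# Sketch, not a test from this PR: with no .outer() at all there is no
# LazyOuterReference, so Classic Spark analyzes r.where(...) eagerly and raises
# an unresolved-column error right here, because r only has columns c and d.
r.where(sf.col("b") == sf.col("a")).select(sf.sum("d")).scalar()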

cc @allisonwang-db who suggested the current outer() behavior.

@cloud-fan (Contributor, author) commented:

@ueshin I think .where(sf.col("b") == sf.col("a")) should be allowed in Spark Connect. For now we need at least one .outer() to trigger lazy analysis in Classic Spark, but it's not really a problem for Spark Connect.

@ueshin (Member) commented on Nov 14, 2024:

@cloud-fan Yes, it's easy for Spark Connect to support that; it doesn't even need LazyOuterReference or UnresolvedOuterReference. I prototyped it with Spark Connect.
I'm just wondering whether we can introduce this difference between Classic and Connect. I think not; they should have the same behavior.

@cloud-fan (Contributor, author) commented:

I think it's fine for Classic Spark to have more limitations. We are moving users from Classic Spark to Spark Connect, not the other direction. This also makes DataFrame more consistent with SQL.
