Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add pre_execution_queries parameter to run setup queries before main query on Postgres and MySQL source #729

Open
wants to merge 10 commits into
base: main
Choose a base branch
from

Conversation

jsjasonseba
Copy link

This PR attempts to solve: #728
Also related issues: #645 #471 #656 #512

This PR adds pre_execution_queries parameter to run setup queries before main query. SET statements are typically applied on session/connection scope, so the pre-execution queries must be applied to each connection on each partition. This is implemented by running the pre-execution queries using a connection before assigning the connection to SourcePartition. This will allow all queries run by each connection on each partition to have the necessary settings.

Currently implemented in Postgres and MySQL sources.

I am quite new with rust and I noticed this change is quite breaking to the rust API since rust does not support default args. Please let me know if there is a better way to do this.

@jsjasonseba
Copy link
Author

Hi @wangxiaoying, I have fixed the tests and applied formatting. Apologies for missing them earlier. Could you please rerun the workflow?

@wangxiaoying
Copy link
Contributor

Thank you @jsjasonseba for the PR! I will take a deeper look at it by this weekend.

Copy link
Contributor

@wangxiaoying wangxiaoying left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you @jsjasonseba for the PR, I think it looks good in general!

I left a few comments in the review. Please let me know if you have any questions!

@@ -107,6 +107,8 @@ where
self.origin_query = query;
}

fn set_pre_execution_queries(&mut self, _pre_execution_queries: Option<&[String]>) {}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

mark with unimplemented for unsupported sources?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow that's neat, just learned about this. Will implement them soon.

let mut conn = self.pool.get()?;

if let Some(queries) = &self.pre_execution_queries {
for query in queries {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

rename the inner query (e.g., to pre_query) so that it won't be confused with the outer query variable?

dst: &'w mut D,
queries: &[Q],
origin_query: Option<String>,
pre_execution_queries: Option<&[String]>,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Instead of adding a new parameter to the constructor, maybe it is better to add a set_pre_execution_queries function? When using the dispatcher, we can:

  1. let dispatcher = Dispatcher::new(...); -- remain the same
  2. dispatcher.set_pre_execution_queries(...); --- optional, only set when needed
  3. dispatcher.run() -- remain the same

I think it could be a way to avoid the breaking change. What do you think?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, I think this a better approach. Thanks for the suggestion.

@@ -1238,4 +1238,33 @@ def test_postgres_partition_with_orderby_limit_desc(postgres_url: str) -> None:
},
)
df.sort_values(by="test_int", inplace=True, ignore_index=True)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We probably also need to add a test for applying pre_execution_queries with partitioning enabled. To make the test case easier, we can use query = [list of partitioned query] to directly specify the query for each partition. For example:

query = [
        "SELECT CAST(name AS TEXT) AS name, CAST(setting AS INTEGER) FROM pg_settings WHERE name = 'statement_timeout'", 
        "SELECT CAST(name AS TEXT) AS name, CAST(setting AS INTEGER) FROM pg_settings WHERE name = 'idle_in_transaction_session_timeout'"
]
pre_execution_query = pre_execution_query = [
        "SET SESSION statement_timeout = 2151",
        "SET SESSION idle_in_transaction_session_timeout = 2252",
    ]

df = read_sql(postgres_url, query, pre_execution_query=pre_execution_query)

expected = pd.DataFrame(
        index=range(2),
        data={
            "name": pd.Series(["statement_timeout", "idle_in_transaction_session_timeout"], dtype="str"),
            "setting": pd.Series([2151, 2252], dtype="Int64"),
        },
)

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants