-
Notifications
You must be signed in to change notification settings - Fork 1.8k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
[BugFix] Fix thrift rpc not reopen after failed #49619
Merged
Merged
Conversation
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Seaven
previously approved these changes
Aug 8, 2024
murphyatwork
previously approved these changes
Aug 8, 2024
wyb
reviewed
Aug 8, 2024
kevincai
previously approved these changes
Aug 8, 2024
stdpain
dismissed stale reviews from kevincai, murphyatwork, and Seaven
via
August 9, 2024 01:57
6eaacca
stdpain
force-pushed
the
fix_thrift_rpc_failed
branch
2 times, most recently
from
August 9, 2024 02:00
6eaacca
to
9c9deda
Compare
wyb
previously approved these changes
Aug 9, 2024
Signed-off-by: stdpain <[email protected]>
stdpain
force-pushed
the
fix_thrift_rpc_failed
branch
from
August 9, 2024 03:52
9c9deda
to
1cc15dd
Compare
[FE Incremental Coverage Report]✅ pass : 0 / 0 (0%) |
[BE Incremental Coverage Report]❌ fail : 0 / 6 (00.00%) file detail
|
@Mergifyio backport branch-3.3 |
@Mergifyio backport branch-3.2 |
@Mergifyio backport branch-3.1 |
@Mergifyio backport branch-3.0 |
✅ Backports have been created
|
✅ Backports have been created
|
✅ Backports have been created
|
✅ Backports have been created
|
mergify bot
pushed a commit
that referenced
this pull request
Aug 9, 2024
## Why I'm doing: When an error occurs on rpc. client->reopen needs to be called, otherwise client parsing will return to the connection pool. This causes other rpc's to fail. ``` ~ClientConnection() { if (_client != nullptr) { _client_cache->release_client(&_client); } } ``` This programming model is very bad, but this PR needs backport. i will refactor the client cache related logic in the next PR. ## What I'm doing: Fixes case: ``` W0805 16:55:14.940848 1227 thrift_rpc_helper.cpp:86] call frontend service failed, address=TNetworkAddress(=****/), port=9020), reason=invalid TType ``` reproduce case: apply this patch to FE ``` diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 750c9e759c..f45512f424 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -1303,6 +1303,9 @@ public class Config extends ConfigBase { @ConfField public static boolean enable_udf = false; + @ConfField(mutable = true) + public static int sleep_times = 30; + @ConfField(mutable = true) public static boolean enable_decimal_v3 = true; diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index 5efbe38d83..af02ecbd7d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -589,6 +589,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (!params.isSetUser_ident()) { throw new TException("missed user_identity"); } + if (Config.sleep_times > 0) { + try { + Thread.sleep(1000 * Config.sleep_times); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } // TODO: check privilege UserIdentity userIdentity = UserIdentity.fromThrift(params.getUser_ident()); @@ -1146,6 +1153,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { @OverRide public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException { + if (Config.sleep_times > 0) { + try { + Thread.sleep(1000 * Config.sleep_times); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr()); } ``` then send the query multi times. we will see be: ``` W20240808 21:01:52.651160 139898530641472 pipeline_driver_executor.cpp:346] [Driver] Fail to report exec state: fragment_instance_id=38ad1356-5586-11ef-a78f-c26b621cd046, status: Internal erro r: ReportExecStatus() to TNetworkAddress(hostname=172.17.0.1, port=8505) failed: THRIFT_EAGAIN (timed out), retry_times=1 W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple ase retry later W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple ase retry later ``` Signed-off-by: stdpain <[email protected]> (cherry picked from commit c8bd900)
44 tasks
mergify bot
pushed a commit
that referenced
this pull request
Aug 9, 2024
## Why I'm doing: When an error occurs on rpc. client->reopen needs to be called, otherwise client parsing will return to the connection pool. This causes other rpc's to fail. ``` ~ClientConnection() { if (_client != nullptr) { _client_cache->release_client(&_client); } } ``` This programming model is very bad, but this PR needs backport. i will refactor the client cache related logic in the next PR. ## What I'm doing: Fixes case: ``` W0805 16:55:14.940848 1227 thrift_rpc_helper.cpp:86] call frontend service failed, address=TNetworkAddress(=****/), port=9020), reason=invalid TType ``` reproduce case: apply this patch to FE ``` diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 750c9e759c..f45512f424 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -1303,6 +1303,9 @@ public class Config extends ConfigBase { @ConfField public static boolean enable_udf = false; + @ConfField(mutable = true) + public static int sleep_times = 30; + @ConfField(mutable = true) public static boolean enable_decimal_v3 = true; diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index 5efbe38d83..af02ecbd7d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -589,6 +589,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (!params.isSetUser_ident()) { throw new TException("missed user_identity"); } + if (Config.sleep_times > 0) { + try { + Thread.sleep(1000 * Config.sleep_times); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } // TODO: check privilege UserIdentity userIdentity = UserIdentity.fromThrift(params.getUser_ident()); @@ -1146,6 +1153,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { @OverRide public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException { + if (Config.sleep_times > 0) { + try { + Thread.sleep(1000 * Config.sleep_times); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr()); } ``` then send the query multi times. we will see be: ``` W20240808 21:01:52.651160 139898530641472 pipeline_driver_executor.cpp:346] [Driver] Fail to report exec state: fragment_instance_id=38ad1356-5586-11ef-a78f-c26b621cd046, status: Internal erro r: ReportExecStatus() to TNetworkAddress(hostname=172.17.0.1, port=8505) failed: THRIFT_EAGAIN (timed out), retry_times=1 W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple ase retry later W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple ase retry later ``` Signed-off-by: stdpain <[email protected]> (cherry picked from commit c8bd900)
mergify bot
pushed a commit
that referenced
this pull request
Aug 9, 2024
## Why I'm doing: When an error occurs on rpc. client->reopen needs to be called, otherwise client parsing will return to the connection pool. This causes other rpc's to fail. ``` ~ClientConnection() { if (_client != nullptr) { _client_cache->release_client(&_client); } } ``` This programming model is very bad, but this PR needs backport. i will refactor the client cache related logic in the next PR. ## What I'm doing: Fixes case: ``` W0805 16:55:14.940848 1227 thrift_rpc_helper.cpp:86] call frontend service failed, address=TNetworkAddress(=****/), port=9020), reason=invalid TType ``` reproduce case: apply this patch to FE ``` diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 750c9e759c..f45512f424 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -1303,6 +1303,9 @@ public class Config extends ConfigBase { @ConfField public static boolean enable_udf = false; + @ConfField(mutable = true) + public static int sleep_times = 30; + @ConfField(mutable = true) public static boolean enable_decimal_v3 = true; diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index 5efbe38d83..af02ecbd7d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -589,6 +589,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (!params.isSetUser_ident()) { throw new TException("missed user_identity"); } + if (Config.sleep_times > 0) { + try { + Thread.sleep(1000 * Config.sleep_times); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } // TODO: check privilege UserIdentity userIdentity = UserIdentity.fromThrift(params.getUser_ident()); @@ -1146,6 +1153,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { @OverRide public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException { + if (Config.sleep_times > 0) { + try { + Thread.sleep(1000 * Config.sleep_times); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr()); } ``` then send the query multi times. we will see be: ``` W20240808 21:01:52.651160 139898530641472 pipeline_driver_executor.cpp:346] [Driver] Fail to report exec state: fragment_instance_id=38ad1356-5586-11ef-a78f-c26b621cd046, status: Internal erro r: ReportExecStatus() to TNetworkAddress(hostname=172.17.0.1, port=8505) failed: THRIFT_EAGAIN (timed out), retry_times=1 W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple ase retry later W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple ase retry later ``` Signed-off-by: stdpain <[email protected]> (cherry picked from commit c8bd900) # Conflicts: # be/src/util/thrift_rpc_helper.cpp # be/test/util/thrift_rpc_helper_test.cpp
This was referenced Aug 9, 2024
mergify bot
pushed a commit
that referenced
this pull request
Aug 9, 2024
## Why I'm doing: When an error occurs on rpc. client->reopen needs to be called, otherwise client parsing will return to the connection pool. This causes other rpc's to fail. ``` ~ClientConnection() { if (_client != nullptr) { _client_cache->release_client(&_client); } } ``` This programming model is very bad, but this PR needs backport. i will refactor the client cache related logic in the next PR. ## What I'm doing: Fixes case: ``` W0805 16:55:14.940848 1227 thrift_rpc_helper.cpp:86] call frontend service failed, address=TNetworkAddress(=****/), port=9020), reason=invalid TType ``` reproduce case: apply this patch to FE ``` diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 750c9e759c..f45512f424 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -1303,6 +1303,9 @@ public class Config extends ConfigBase { @ConfField public static boolean enable_udf = false; + @ConfField(mutable = true) + public static int sleep_times = 30; + @ConfField(mutable = true) public static boolean enable_decimal_v3 = true; diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index 5efbe38d83..af02ecbd7d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -589,6 +589,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (!params.isSetUser_ident()) { throw new TException("missed user_identity"); } + if (Config.sleep_times > 0) { + try { + Thread.sleep(1000 * Config.sleep_times); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } // TODO: check privilege UserIdentity userIdentity = UserIdentity.fromThrift(params.getUser_ident()); @@ -1146,6 +1153,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { @OverRide public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException { + if (Config.sleep_times > 0) { + try { + Thread.sleep(1000 * Config.sleep_times); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr()); } ``` then send the query multi times. we will see be: ``` W20240808 21:01:52.651160 139898530641472 pipeline_driver_executor.cpp:346] [Driver] Fail to report exec state: fragment_instance_id=38ad1356-5586-11ef-a78f-c26b621cd046, status: Internal erro r: ReportExecStatus() to TNetworkAddress(hostname=172.17.0.1, port=8505) failed: THRIFT_EAGAIN (timed out), retry_times=1 W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple ase retry later W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple ase retry later ``` Signed-off-by: stdpain <[email protected]> (cherry picked from commit c8bd900) # Conflicts: # be/src/util/thrift_rpc_helper.cpp # be/test/util/thrift_rpc_helper_test.cpp
44 tasks
wanpengfei-git
pushed a commit
that referenced
this pull request
Aug 9, 2024
…9645) Co-authored-by: stdpain <[email protected]>
wanpengfei-git
pushed a commit
that referenced
this pull request
Aug 9, 2024
…9644) Co-authored-by: stdpain <[email protected]>
stdpain
added a commit
that referenced
this pull request
Aug 19, 2024
When an error occurs on rpc. client->reopen needs to be called, otherwise client parsing will return to the connection pool. This causes other rpc's to fail. ``` ~ClientConnection() { if (_client != nullptr) { _client_cache->release_client(&_client); } } ``` This programming model is very bad, but this PR needs backport. i will refactor the client cache related logic in the next PR. Fixes case: ``` W0805 16:55:14.940848 1227 thrift_rpc_helper.cpp:86] call frontend service failed, address=TNetworkAddress(=****/), port=9020), reason=invalid TType ``` reproduce case: apply this patch to FE ``` diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java index 750c9e759c..f45512f424 100644 --- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java +++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java @@ -1303,6 +1303,9 @@ public class Config extends ConfigBase { @ConfField public static boolean enable_udf = false; + @ConfField(mutable = true) + public static int sleep_times = 30; + @ConfField(mutable = true) public static boolean enable_decimal_v3 = true; diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java index 5efbe38d83..af02ecbd7d 100644 --- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java +++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java @@ -589,6 +589,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { if (!params.isSetUser_ident()) { throw new TException("missed user_identity"); } + if (Config.sleep_times > 0) { + try { + Thread.sleep(1000 * Config.sleep_times); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } // TODO: check privilege UserIdentity userIdentity = UserIdentity.fromThrift(params.getUser_ident()); @@ -1146,6 +1153,13 @@ public class FrontendServiceImpl implements FrontendService.Iface { @OverRide public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException { + if (Config.sleep_times > 0) { + try { + Thread.sleep(1000 * Config.sleep_times); + } catch (InterruptedException e) { + throw new RuntimeException(e); + } + } return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr()); } ``` then send the query multi times. we will see be: ``` W20240808 21:01:52.651160 139898530641472 pipeline_driver_executor.cpp:346] [Driver] Fail to report exec state: fragment_instance_id=38ad1356-5586-11ef-a78f-c26b621cd046, status: Internal erro r: ReportExecStatus() to TNetworkAddress(hostname=172.17.0.1, port=8505) failed: THRIFT_EAGAIN (timed out), retry_times=1 W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple ase retry later W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple ase retry later ``` Signed-off-by: stdpain <[email protected]> (cherry picked from commit c8bd900) Signed-off-by: stdpain <[email protected]>
wanpengfei-git
pushed a commit
that referenced
this pull request
Aug 19, 2024
…9646) Signed-off-by: stdpain <[email protected]> Co-authored-by: stdpain <[email protected]>
24 tasks
This was referenced Aug 27, 2024
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
Why I'm doing:
When an error occurs on rpc. client->reopen needs to be called, otherwise client parsing will return to the connection pool. This causes other rpc's to fail.
This programming model is very bad, but this PR needs backport. i will refactor the client cache related logic in the next PR.
What I'm doing:
Fixes case:
reproduce case:
apply this patch to FE
then send the query multi times.
we will see be:
What type of PR is this:
Does this PR entail a change in behavior?
If yes, please specify the type of change:
Checklist:
Bugfix cherry-pick branch check:
Documentation PRs only:
If you are submitting a PR that adds or changes English documentation and have not
included Chinese documentation, then you can check the box to request GPT to translate the
English doc to Chinese. Please ensure to uncheck the Do not translate box if translation is needed.
The workflow will generate a new PR with the Chinese translation after this PR is merged.