Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[BugFix] Fix thrift rpc not reopen after failed (backport #49619) #49646

Merged
merged 1 commit into from
Aug 19, 2024

Conversation

mergify[bot]
Copy link
Contributor

@mergify mergify bot commented Aug 9, 2024

Why I'm doing:

When an error occurs on rpc. client->reopen needs to be called, otherwise client parsing will return to the connection pool. This causes other rpc's to fail.

    ~ClientConnection() {
        if (_client != nullptr) {
            _client_cache->release_client(&_client);
        }
    }

This programming model is very bad, but this PR needs backport. i will refactor the client cache related logic in the next PR.

What I'm doing:

Fixes case:

W0805 16:55:14.940848  1227 thrift_rpc_helper.cpp:86] call frontend service failed, address=TNetworkAddress(=****/), port=9020), reason=invalid TType

reproduce case:

apply this patch to FE

diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
index 750c9e759c..f45512f424 100644
--- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java
+++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -1303,6 +1303,9 @@ public class Config extends ConfigBase {
     @ConfField
     public static boolean enable_udf = false;
 
+    @ConfField(mutable = true)
+    public static int sleep_times = 30;
+
     @ConfField(mutable = true)
     public static boolean enable_decimal_v3 = true;
 
diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
index 5efbe38d83..af02ecbd7d 100644
--- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
@@ -589,6 +589,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         if (!params.isSetUser_ident()) {
             throw new TException("missed user_identity");
         }
+        if (Config.sleep_times > 0) {
+            try {
+                Thread.sleep(1000 * Config.sleep_times);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
         // TODO: check privilege
         UserIdentity userIdentity = UserIdentity.fromThrift(params.getUser_ident());
 
@@ -1146,6 +1153,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
     @Override
     public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException {
+        if (Config.sleep_times > 0) {
+            try {
+                Thread.sleep(1000 * Config.sleep_times);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
         return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr());
     }

then send the query multi times.
we will see be:

W20240808 21:01:52.651160 139898530641472 pipeline_driver_executor.cpp:346] [Driver] Fail to report exec state: fragment_instance_id=38ad1356-5586-11ef-a78f-c26b621cd046, status: Internal erro
r: ReportExecStatus() to TNetworkAddress(hostname=172.17.0.1, port=8505) failed:
THRIFT_EAGAIN (timed out), retry_times=1
W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple
ase retry later
W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple
ase retry later

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

Bugfix cherry-pick branch check:

  • I have checked the version labels which the pr will be auto-backported to the target branch
    • 3.3
    • 3.2
    • 3.1
    • 3.0
    • 2.5

Documentation PRs only:

If you are submitting a PR that adds or changes English documentation and have not
included Chinese documentation, then you can check the box to request GPT to translate the
English doc to Chinese. Please ensure to uncheck the Do not translate box if translation is needed.
The workflow will generate a new PR with the Chinese translation after this PR is merged.

  • Yes, translate English markdown files with GPT
  • Do not translate

This is an automatic backport of pull request #49619 done by [Mergify](https://mergify.com). ## Why I'm doing:

When an error occurs on rpc. client->reopen needs to be called, otherwise client parsing will return to the connection pool. This causes other rpc's to fail.

    ~ClientConnection() {
        if (_client != nullptr) {
            _client_cache->release_client(&_client);
        }
    }

This programming model is very bad, but this PR needs backport. i will refactor the client cache related logic in the next PR.

What I'm doing:

Fixes case:

W0805 16:55:14.940848  1227 thrift_rpc_helper.cpp:86] call frontend service failed, address=TNetworkAddress(=****/), port=9020), reason=invalid TType

reproduce case:

apply this patch to FE

diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
index 750c9e759c..f45512f424 100644
--- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java
+++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -1303,6 +1303,9 @@ public class Config extends ConfigBase {
     @ConfField
     public static boolean enable_udf = false;
 
+    @ConfField(mutable = true)
+    public static int sleep_times = 30;
+
     @ConfField(mutable = true)
     public static boolean enable_decimal_v3 = true;
 
diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
index 5efbe38d83..af02ecbd7d 100644
--- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
@@ -589,6 +589,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         if (!params.isSetUser_ident()) {
             throw new TException("missed user_identity");
         }
+        if (Config.sleep_times > 0) {
+            try {
+                Thread.sleep(1000 * Config.sleep_times);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
         // TODO: check privilege
         UserIdentity userIdentity = UserIdentity.fromThrift(params.getUser_ident());
 
@@ -1146,6 +1153,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
 
     @Override
     public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException {
+        if (Config.sleep_times > 0) {
+            try {
+                Thread.sleep(1000 * Config.sleep_times);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
         return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr());
     }

then send the query multi times.
we will see be:

W20240808 21:01:52.651160 139898530641472 pipeline_driver_executor.cpp:346] [Driver] Fail to report exec state: fragment_instance_id=38ad1356-5586-11ef-a78f-c26b621cd046, status: Internal erro
r: ReportExecStatus() to TNetworkAddress(hostname=172.17.0.1, port=8505) failed:
THRIFT_EAGAIN (timed out), retry_times=1
W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple
ase retry later
W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple
ase retry later

What type of PR is this:

  • BugFix
  • Feature
  • Enhancement
  • Refactor
  • UT
  • Doc
  • Tool

Does this PR entail a change in behavior?

  • Yes, this PR will result in a change in behavior.
  • No, this PR will not result in a change in behavior.

If yes, please specify the type of change:

  • Interface/UI changes: syntax, type conversion, expression evaluation, display information
  • Parameter changes: default values, similar parameters but with different default values
  • Policy changes: use new policy to replace old one, functionality automatically enabled
  • Feature removed
  • Miscellaneous: upgrade & downgrade compatibility, etc.

Checklist:

  • I have added test cases for my bug fix or my new feature
  • This pr needs user documentation (for new or modified features or behaviors)
    • I have added documentation for my new feature or new function
  • This is a backport pr

@mergify mergify bot added the conflicts label Aug 9, 2024
Copy link
Contributor Author

mergify bot commented Aug 9, 2024

Cherry-pick of c8bd900 has failed:

On branch mergify/bp/branch-3.1/pr-49619
Your branch is up to date with 'origin/branch-3.1'.

You are currently cherry-picking commit c8bd9004d9.
  (fix conflicts and run "git cherry-pick --continue")
  (use "git cherry-pick --skip" to skip this patch)
  (use "git cherry-pick --abort" to cancel the cherry-pick operation)

Changes to be committed:
	modified:   be/src/exec/pipeline/audit_statistics_reporter.cpp
	modified:   be/src/exec/pipeline/exec_state_reporter.cpp
	modified:   be/src/exec/pipeline/query_context.cpp
	modified:   be/src/runtime/fragment_mgr.cpp

Unmerged paths:
  (use "git add/rm <file>..." as appropriate to mark resolution)
	both modified:   be/src/util/thrift_rpc_helper.cpp
	deleted by us:   be/test/util/thrift_rpc_helper_test.cpp

To fix up this pull request, you can check it out locally. See documentation: https://docs.github.com/en/pull-requests/collaborating-with-pull-requests/reviewing-changes-in-pull-requests/checking-out-pull-requests-locally

@wanpengfei-git wanpengfei-git enabled auto-merge (squash) August 9, 2024 06:19
@mergify mergify bot closed this Aug 9, 2024
auto-merge was automatically disabled August 9, 2024 06:19

Pull request was closed

Copy link
Contributor Author

mergify bot commented Aug 9, 2024

@mergify[bot]: Backport conflict, please reslove the conflict and resubmit the pr

@mergify mergify bot deleted the mergify/bp/branch-3.1/pr-49619 branch August 9, 2024 06:19
@stdpain stdpain restored the mergify/bp/branch-3.1/pr-49619 branch August 19, 2024 02:49
@stdpain stdpain reopened this Aug 19, 2024
@wanpengfei-git wanpengfei-git enabled auto-merge (squash) August 19, 2024 02:50
When an error occurs on rpc. client->reopen needs to be called, otherwise client parsing will return to the connection pool. This causes other rpc's to fail.
```
    ~ClientConnection() {
        if (_client != nullptr) {
            _client_cache->release_client(&_client);
        }
    }
```

This programming model is very bad, but this PR needs backport. i will refactor the client cache related logic in the next PR.

Fixes case:
```
W0805 16:55:14.940848  1227 thrift_rpc_helper.cpp:86] call frontend service failed, address=TNetworkAddress(=****/), port=9020), reason=invalid TType
```

reproduce case:

apply this patch to FE
```
diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
index 750c9e759c..f45512f424 100644
--- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java
+++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -1303,6 +1303,9 @@ public class Config extends ConfigBase {
     @ConfField
     public static boolean enable_udf = false;

+    @ConfField(mutable = true)
+    public static int sleep_times = 30;
+
     @ConfField(mutable = true)
     public static boolean enable_decimal_v3 = true;

diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
index 5efbe38d83..af02ecbd7d 100644
--- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
@@ -589,6 +589,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         if (!params.isSetUser_ident()) {
             throw new TException("missed user_identity");
         }
+        if (Config.sleep_times > 0) {
+            try {
+                Thread.sleep(1000 * Config.sleep_times);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
         // TODO: check privilege
         UserIdentity userIdentity = UserIdentity.fromThrift(params.getUser_ident());

@@ -1146,6 +1153,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {

     @OverRide
     public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException {
+        if (Config.sleep_times > 0) {
+            try {
+                Thread.sleep(1000 * Config.sleep_times);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
         return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr());
     }
```
then send the query multi times.
we will see be:
```
W20240808 21:01:52.651160 139898530641472 pipeline_driver_executor.cpp:346] [Driver] Fail to report exec state: fragment_instance_id=38ad1356-5586-11ef-a78f-c26b621cd046, status: Internal erro
r: ReportExecStatus() to TNetworkAddress(hostname=172.17.0.1, port=8505) failed:
THRIFT_EAGAIN (timed out), retry_times=1
W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple
ase retry later
W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple
ase retry later
```

Signed-off-by: stdpain <[email protected]>
(cherry picked from commit c8bd900)

Signed-off-by: stdpain <[email protected]>
@stdpain stdpain force-pushed the mergify/bp/branch-3.1/pr-49619 branch from c046669 to 649a3e6 Compare August 19, 2024 02:55
@wanpengfei-git wanpengfei-git merged commit b6f0072 into branch-3.1 Aug 19, 2024
31 checks passed
@wanpengfei-git wanpengfei-git deleted the mergify/bp/branch-3.1/pr-49619 branch August 19, 2024 03:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants