Skip to content

Commit

Permalink
[BugFix] Fix thrift rpc not reopen after failed (#49619)
Browse files Browse the repository at this point in the history
## Why I'm doing:

When an error occurs on an RPC, client->reopen needs to be called; otherwise the broken client is returned to the connection pool by the destructor shown below. This causes subsequent RPCs that reuse the client to fail.
```
    ~ClientConnection() {
        if (_client != nullptr) {
            _client_cache->release_client(&_client);
        }
    }
```

This programming model is very bad, but this PR needs to be backported, so the change is kept minimal. I will refactor the client-cache-related logic in a follow-up PR.

## What I'm doing:

Fixes case:
```
W0805 16:55:14.940848  1227 thrift_rpc_helper.cpp:86] call frontend service failed, address=TNetworkAddress(=****/), port=9020), reason=invalid TType
```

reproduce case:

apply this patch to FE
```
diff --git a/fe/fe-core/src/main/java/com/starrocks/common/Config.java b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
index 750c9e759c..f45512f424 100644
--- a/fe/fe-core/src/main/java/com/starrocks/common/Config.java
+++ b/fe/fe-core/src/main/java/com/starrocks/common/Config.java
@@ -1303,6 +1303,9 @@ public class Config extends ConfigBase {
     @ConfField
     public static boolean enable_udf = false;

+    @ConfField(mutable = true)
+    public static int sleep_times = 30;
+
     @ConfField(mutable = true)
     public static boolean enable_decimal_v3 = true;

diff --git a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
index 5efbe38d83..af02ecbd7d 100644
--- a/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
+++ b/fe/fe-core/src/main/java/com/starrocks/service/FrontendServiceImpl.java
@@ -589,6 +589,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {
         if (!params.isSetUser_ident()) {
             throw new TException("missed user_identity");
         }
+        if (Config.sleep_times > 0) {
+            try {
+                Thread.sleep(1000 * Config.sleep_times);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
         // TODO: check privilege
         UserIdentity userIdentity = UserIdentity.fromThrift(params.getUser_ident());

@@ -1146,6 +1153,13 @@ public class FrontendServiceImpl implements FrontendService.Iface {

     @Override
     public TReportExecStatusResult reportExecStatus(TReportExecStatusParams params) throws TException {
+        if (Config.sleep_times > 0) {
+            try {
+                Thread.sleep(1000 * Config.sleep_times);
+            } catch (InterruptedException e) {
+                throw new RuntimeException(e);
+            }
+        }
         return QeProcessorImpl.INSTANCE.reportExecStatus(params, getClientAddr());
     }
```
Then send the query multiple times.
We will see the following on the BE:
```
W20240808 21:01:52.651160 139898530641472 pipeline_driver_executor.cpp:346] [Driver] Fail to report exec state: fragment_instance_id=38ad1356-5586-11ef-a78f-c26b621cd046, status: Internal erro
r: ReportExecStatus() to TNetworkAddress(hostname=172.17.0.1, port=8505) failed:
THRIFT_EAGAIN (timed out), retry_times=1
W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple
ase retry later
W20240808 21:01:52.789011 139897653958208 thrift_rpc_helper.cpp:135] Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=172.17.0.1, port=8505).The FE may be busy, ple
ase retry later
```

Signed-off-by: stdpain <[email protected]>
(cherry picked from commit c8bd900)
  • Loading branch information
stdpain authored and mergify[bot] committed Aug 9, 2024
1 parent 177aaf1 commit 8cfd732
Show file tree
Hide file tree
Showing 6 changed files with 9 additions and 8 deletions.
1 change: 1 addition & 0 deletions be/src/exec/pipeline/audit_statistics_reporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ Status AuditStatisticsReporter::report_audit_statistics(const TReportAuditStatis

rpc_status = Status(res.status);
} catch (TException& e) {
(void)coord.reopen(config::thrift_rpc_timeout_ms);
std::stringstream msg;
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/exec_state_reporter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& para

rpc_status = Status(res.status);
} catch (TException& e) {
(void)coord.reopen(config::thrift_rpc_timeout_ms);
std::stringstream msg;
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
Expand Down Expand Up @@ -293,6 +294,7 @@ Status ExecStateReporter::report_epoch(const TMVMaintenanceTasks& params, ExecEn

rpc_status = Status::OK();
} catch (TException& e) {
(void)coord.reopen(config::thrift_rpc_timeout_ms);
std::stringstream msg;
msg << "mvReport() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
Expand Down
2 changes: 2 additions & 0 deletions be/src/exec/pipeline/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -643,6 +643,7 @@ void QueryContextManager::report_fragments(

VLOG_ROW << "debug: reportExecStatus params is " << apache::thrift::ThriftDebugString(params).c_str();

// TODO: refactor me
try {
try {
fe_connection->batchReportExecStatus(res, report_batch);
Expand All @@ -658,6 +659,7 @@ void QueryContextManager::report_fragments(
}

} catch (TException& e) {
(void)fe_connection.reopen(config::thrift_rpc_timeout_ms);
std::stringstream msg;
msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
Expand Down
2 changes: 2 additions & 0 deletions be/src/runtime/fragment_mgr.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil

rpc_status = Status(res.status);
} catch (TException& e) {
(void)coord.reopen(config::thrift_rpc_timeout_ms);
std::stringstream msg;
msg << "ReportExecStatus() to " << _coord_addr << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
Expand Down Expand Up @@ -763,6 +764,7 @@ void FragmentMgr::report_fragments(const std::vector<TUniqueId>& non_pipeline_ne
}

} catch (TException& e) {
(void)fe_connection.reopen(config::thrift_rpc_timeout_ms);
std::stringstream msg;
msg << "ReportExecStatus() to " << fragment_exec_state->coord_addr() << " failed:\n" << e.what();
LOG(WARNING) << msg.str();
Expand Down
6 changes: 0 additions & 6 deletions be/src/util/thrift_rpc_helper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,12 +67,6 @@ Status ThriftRpcHelper::rpc_impl(std::function<void(ClientConnection<FrontendSer
try {
callback(client);
return Status::OK();
} catch (apache::thrift::protocol::TProtocolException& e) {
if (e.getType() == apache::thrift::protocol::TProtocolException::TProtocolExceptionType::INVALID_DATA) {
ss << "FE RPC response parsing failure, address=" << address << ".The FE may be busy, please retry later";
} else {
ss << "FE RPC failure, address=" << address << ", reason=" << e.what();
}
} catch (apache::thrift::TException& e) {
ss << "FE RPC failure, address=" << address << ", reason=" << e.what();
}
Expand Down
4 changes: 2 additions & 2 deletions be/test/util/thrift_rpc_helper_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,8 +41,8 @@ TEST_F(ThriftRpcHelperTest, fe_rpc_impl) {
client, addr);
EXPECT_STATUS(Status::ThriftRpcError(""), st);
EXPECT_EQ(
"Rpc error: FE RPC response parsing failure, address=TNetworkAddress(hostname=127.0.0.1, "
"port=9020).The FE may be busy, please retry later",
"Rpc error: FE RPC failure, address=TNetworkAddress(hostname=127.0.0.1, port=9020), reason=invalid "
"TType",
st.to_string());
}
{
Expand Down

0 comments on commit 8cfd732

Please sign in to comment.