diff --git a/be/src/exec/pipeline/audit_statistics_reporter.cpp b/be/src/exec/pipeline/audit_statistics_reporter.cpp index 9263e7c022e55..d0f316c288b1d 100644 --- a/be/src/exec/pipeline/audit_statistics_reporter.cpp +++ b/be/src/exec/pipeline/audit_statistics_reporter.cpp @@ -77,6 +77,7 @@ Status AuditStatisticsReporter::report_audit_statistics(const TReportAuditStatis rpc_status = Status(res.status); } catch (TException& e) { + (void)coord.reopen(config::thrift_rpc_timeout_ms); std::stringstream msg; msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what(); LOG(WARNING) << msg.str(); diff --git a/be/src/exec/pipeline/exec_state_reporter.cpp b/be/src/exec/pipeline/exec_state_reporter.cpp index a9c427107de4e..62f9a95f58e3e 100644 --- a/be/src/exec/pipeline/exec_state_reporter.cpp +++ b/be/src/exec/pipeline/exec_state_reporter.cpp @@ -181,6 +181,7 @@ Status ExecStateReporter::report_exec_status(const TReportExecStatusParams& para rpc_status = Status(res.status); } catch (TException& e) { + (void)coord.reopen(config::thrift_rpc_timeout_ms); std::stringstream msg; msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what(); LOG(WARNING) << msg.str(); @@ -293,6 +294,7 @@ Status ExecStateReporter::report_epoch(const TMVMaintenanceTasks& params, ExecEn rpc_status = Status::OK(); } catch (TException& e) { + (void)coord.reopen(config::thrift_rpc_timeout_ms); std::stringstream msg; msg << "mvReport() to " << fe_addr << " failed:\n" << e.what(); LOG(WARNING) << msg.str(); diff --git a/be/src/exec/pipeline/query_context.cpp b/be/src/exec/pipeline/query_context.cpp index 5488dbfae2496..3dd558e86e1a0 100644 --- a/be/src/exec/pipeline/query_context.cpp +++ b/be/src/exec/pipeline/query_context.cpp @@ -639,6 +639,7 @@ void QueryContextManager::report_fragments( VLOG_ROW << "debug: reportExecStatus params is " << apache::thrift::ThriftDebugString(params).c_str(); + // TODO: refactor me try { try { fe_connection->batchReportExecStatus(res, report_batch); @@ -652,6 +653,7 @@ void QueryContextManager::report_fragments( } } catch (TException& e) { + (void)fe_connection.reopen(config::thrift_rpc_timeout_ms); std::stringstream msg; msg << "ReportExecStatus() to " << fe_addr << " failed:\n" << e.what(); LOG(WARNING) << msg.str(); diff --git a/be/src/runtime/fragment_mgr.cpp b/be/src/runtime/fragment_mgr.cpp index 7e161466c1e20..438c1cd27797b 100644 --- a/be/src/runtime/fragment_mgr.cpp +++ b/be/src/runtime/fragment_mgr.cpp @@ -339,6 +339,7 @@ void FragmentExecState::coordinator_callback(const Status& status, RuntimeProfil rpc_status = Status(res.status); } catch (TException& e) { + (void)coord.reopen(config::thrift_rpc_timeout_ms); std::stringstream msg; msg << "ReportExecStatus() to " << _coord_addr << " failed:\n" << e.what(); LOG(WARNING) << msg.str(); @@ -763,6 +764,7 @@ void FragmentMgr::report_fragments(const std::vector& non_pipeline_ne } } catch (TException& e) { + (void)fe_connection.reopen(config::thrift_rpc_timeout_ms); std::stringstream msg; msg << "ReportExecStatus() to " << fragment_exec_state->coord_addr() << " failed:\n" << e.what(); LOG(WARNING) << msg.str();