diff --git a/be/src/runtime/routine_load/data_consumer.cpp b/be/src/runtime/routine_load/data_consumer.cpp index 7eb96b8340f925..fa9b9eb7dfdc04 100644 --- a/be/src/runtime/routine_load/data_consumer.cpp +++ b/be/src/runtime/routine_load/data_consumer.cpp @@ -64,14 +64,13 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) { auto conf_deleter = [conf]() { delete conf; }; DeferOp delete_conf([conf_deleter] { return conf_deleter(); }); - std::string group_id; auto it = ctx->kafka_info->properties.find("group.id"); if (it == ctx->kafka_info->properties.end()) { - group_id = BackendOptions::get_localhost() + "_" + UniqueId::gen_uid().to_string(); + _group_id = BackendOptions::get_localhost() + "_" + UniqueId::gen_uid().to_string(); } else { - group_id = it->second; + _group_id = it->second; } - LOG(INFO) << "init kafka consumer with group id: " << group_id; + LOG(INFO) << "init kafka consumer with group id: " << _group_id; std::string errstr; auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) { @@ -94,7 +93,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) { }; RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers)); - RETURN_IF_ERROR(set_conf("group.id", group_id)); + RETURN_IF_ERROR(set_conf("group.id", _group_id)); // For transaction producer, producer will append one control msg to the group of msgs, // but the control msg will not return to consumer, // so we can't to judge whether the consumption has been completed by offset comparison. @@ -270,10 +269,27 @@ Status KafkaDataConsumer::group_consume(TimedBlockingQueue* q } break; } + case RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED: { + LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr(); + done = true; + st = Status::InternalError(fmt::format( + "kafka consume failed, err: {}. You should add READ permission for this topic: {} in topic ACLs", + msg->errstr(), _topic)); + break; + } + case RdKafka::ERR_GROUP_AUTHORIZATION_FAILED: { + LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr(); + done = true; + st = Status::InternalError(fmt::format( + "kafka consume failed, err: {}. You should add or modify the consumer group '{}' with READ " + "permission for this topic in consumer group ACLs and set the routine load job with " + "`property.group.id` property", msg->errstr(), _group_id)); + break; + } default: LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr(); done = true; - st = Status::InternalError(msg->errstr()); + st = Status::InternalError(fmt::format("kafka consume failed, err: {}", msg->errstr())); break; } @@ -360,13 +376,20 @@ Status KafkaDataConsumer::get_partition_meta(std::vector* partition_ids continue; } - if ((*it)->err() == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART) { + err = (*it)->err(); + if (err == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART) { LOG(WARNING) << "unknown topic: " << _topic; return Status::InternalError(fmt::format("unknown topic: {}", _topic)); - } else if ((*it)->err() != RdKafka::ERR_NO_ERROR) { + } else if (err == RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED) { + std::stringstream ss; + ss << "failed to get kafka topic meta, err: " << RdKafka::err2str(err) + << ". You should add READ permission for this topic: " << _topic << " in topic ACLs"; + LOG(WARNING) << ss.str(); + return Status::InternalError(ss.str()); + } else if (err != RdKafka::ERR_NO_ERROR) { std::stringstream ss; - ss << "err: " << err2str((*it)->err()); - if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) { + ss << "err: " << err2str(err); + if (err == RdKafka::ERR_LEADER_NOT_AVAILABLE) { ss << ", try again"; } LOG(WARNING) << ss.str(); @@ -424,7 +447,8 @@ Status KafkaDataConsumer::commit(const std::string& topic, const std::mapcommitSync(topic_partitions); if (err != RdKafka::ERR_NO_ERROR) { std::stringstream ss; - ss << "failed to commit kafka offset : " << RdKafka::err2str(err); + ss << "failed to commit kafka offset, topic: " << topic << ", group id: " << _group_id << ", err: " + << RdKafka::err2str(err); return Status::InternalError(ss.str()); } return Status::OK(); diff --git a/be/src/runtime/routine_load/data_consumer.h b/be/src/runtime/routine_load/data_consumer.h index e0277238ecd379..bcd5e6e0b91ee6 100644 --- a/be/src/runtime/routine_load/data_consumer.h +++ b/be/src/runtime/routine_load/data_consumer.h @@ -181,6 +181,7 @@ class KafkaDataConsumer : public DataConsumer { private: std::string _brokers; std::string _topic; + std::string _group_id; std::unordered_map _custom_properties; size_t _non_eof_partition_count = 0;