Skip to content

Commit

Permalink
[Enhancement] Improve kafka group authorization failed message in rou…
Browse files Browse the repository at this point in the history
…tine load (#46136)

Signed-off-by: wyb <[email protected]>
(cherry picked from commit 3ac2074)
  • Loading branch information
wyb authored and mergify[bot] committed Jul 17, 2024
1 parent 8df73ec commit d8e782a
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 11 deletions.
47 changes: 36 additions & 11 deletions be/src/runtime/routine_load/data_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,13 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
auto conf_deleter = [conf]() { delete conf; };
DeferOp delete_conf([conf_deleter] { return conf_deleter(); });

std::string group_id;
auto it = ctx->kafka_info->properties.find("group.id");
if (it == ctx->kafka_info->properties.end()) {
group_id = BackendOptions::get_localhost() + "_" + UniqueId::gen_uid().to_string();
_group_id = BackendOptions::get_localhost() + "_" + UniqueId::gen_uid().to_string();
} else {
group_id = it->second;
_group_id = it->second;
}
LOG(INFO) << "init kafka consumer with group id: " << group_id;
LOG(INFO) << "init kafka consumer with group id: " << _group_id;

std::string errstr;
auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
Expand All @@ -94,7 +93,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
};

RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
RETURN_IF_ERROR(set_conf("group.id", group_id));
RETURN_IF_ERROR(set_conf("group.id", _group_id));
// For transaction producer, producer will append one control msg to the group of msgs,
// but the control msg will not return to consumer,
// so we can't to judge whether the consumption has been completed by offset comparison.
Expand Down Expand Up @@ -271,10 +270,28 @@ Status KafkaDataConsumer::group_consume(TimedBlockingQueue<RdKafka::Message*>* q
}
break;
}
case RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED: {
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
st = Status::InternalError(fmt::format(
"kafka consume failed, err: {}. You should add READ permission for this topic: {} in topic ACLs",
msg->errstr(), _topic));
break;
}
case RdKafka::ERR_GROUP_AUTHORIZATION_FAILED: {
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
st = Status::InternalError(fmt::format(
"kafka consume failed, err: {}. You should add or modify the consumer group '{}' with READ "
"permission for this topic: {} in consumer group ACLs and set the routine load job with "
"`property.group.id` property",
msg->errstr(), _group_id, _topic));
break;
}
default:
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
st = Status::InternalError(msg->errstr());
st = Status::InternalError(fmt::format("kafka consume failed, err: {}", msg->errstr()));
break;
}

Expand Down Expand Up @@ -367,13 +384,20 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
continue;
}

if ((*it)->err() == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART) {
err = (*it)->err();
if (err == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART) {
LOG(WARNING) << "unknown topic: " << _topic;
return Status::InternalError(fmt::format("unknown topic: {}", _topic));
} else if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
} else if (err == RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED) {
std::stringstream ss;
ss << "failed to get kafka topic meta, err: " << RdKafka::err2str(err)
<< ". You should add READ permission for this topic: " << _topic << " in topic ACLs";
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
} else if (err != RdKafka::ERR_NO_ERROR) {
std::stringstream ss;
ss << "err: " << err2str((*it)->err());
if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
ss << "err: " << err2str(err);
if (err == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
ss << ", try again";
}
LOG(WARNING) << ss.str();
Expand Down Expand Up @@ -419,7 +443,8 @@ Status KafkaDataConsumer::commit(std::vector<RdKafka::TopicPartition*>& offset)
RdKafka::ErrorCode err = _k_consumer->commitSync(offset);
if (err != RdKafka::ERR_NO_ERROR) {
std::stringstream ss;
ss << "failed to commit kafka offset : " << RdKafka::err2str(err);
ss << "failed to commit kafka offset, topic: " << topic << ", group id: " << _group_id
<< ", err: " << RdKafka::err2str(err);
return Status::InternalError(ss.str());
}
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/routine_load/data_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,6 +184,7 @@ class KafkaDataConsumer : public DataConsumer {
private:
std::string _brokers;
std::string _topic;
std::string _group_id;
std::unordered_map<std::string, std::string> _custom_properties;

size_t _non_eof_partition_count = 0;
Expand Down

0 comments on commit d8e782a

Please sign in to comment.