Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Enhancement] Improve kafka group authorization failed message in routine load #46136

Merged
merged 2 commits into from
Jul 17, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
47 changes: 36 additions & 11 deletions be/src/runtime/routine_load/data_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,14 +64,13 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
auto conf_deleter = [conf]() { delete conf; };
DeferOp delete_conf([conf_deleter] { return conf_deleter(); });

std::string group_id;
auto it = ctx->kafka_info->properties.find("group.id");
if (it == ctx->kafka_info->properties.end()) {
group_id = BackendOptions::get_localhost() + "_" + UniqueId::gen_uid().to_string();
_group_id = BackendOptions::get_localhost() + "_" + UniqueId::gen_uid().to_string();
} else {
group_id = it->second;
_group_id = it->second;
}
LOG(INFO) << "init kafka consumer with group id: " << group_id;
LOG(INFO) << "init kafka consumer with group id: " << _group_id;

std::string errstr;
auto set_conf = [&conf, &errstr](const std::string& conf_key, const std::string& conf_val) {
Expand All @@ -94,7 +93,7 @@ Status KafkaDataConsumer::init(StreamLoadContext* ctx) {
};

RETURN_IF_ERROR(set_conf("metadata.broker.list", ctx->kafka_info->brokers));
RETURN_IF_ERROR(set_conf("group.id", group_id));
RETURN_IF_ERROR(set_conf("group.id", _group_id));
// For a transactional producer, the producer appends one control msg to each group of msgs,
// but the control msg is never returned to the consumer,
// so we can't judge whether the consumption has been completed by offset comparison.
Expand Down Expand Up @@ -270,10 +269,28 @@ Status KafkaDataConsumer::group_consume(TimedBlockingQueue<RdKafka::Message*>* q
}
break;
}
case RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED: {
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
st = Status::InternalError(fmt::format(
"kafka consume failed, err: {}. You should add READ permission for this topic: {} in topic ACLs",
msg->errstr(), _topic));
break;
}
case RdKafka::ERR_GROUP_AUTHORIZATION_FAILED: {
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
st = Status::InternalError(fmt::format(
"kafka consume failed, err: {}. You should add or modify the consumer group '{}' with READ "
"permission for this topic: {} in consumer group ACLs and set the routine load job with "
"`property.group.id` property",
msg->errstr(), _group_id, _topic));
break;
}
default:
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
st = Status::InternalError(msg->errstr());
st = Status::InternalError(fmt::format("kafka consume failed, err: {}", msg->errstr()));
break;
}

Expand Down Expand Up @@ -360,13 +377,20 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
continue;
}

if ((*it)->err() == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART) {
err = (*it)->err();
if (err == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART) {
LOG(WARNING) << "unknown topic: " << _topic;
return Status::InternalError(fmt::format("unknown topic: {}", _topic));
} else if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
} else if (err == RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED) {
std::stringstream ss;
ss << "failed to get kafka topic meta, err: " << RdKafka::err2str(err)
<< ". You should add READ permission for this topic: " << _topic << " in topic ACLs";
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
} else if (err != RdKafka::ERR_NO_ERROR) {
std::stringstream ss;
ss << "err: " << err2str((*it)->err());
if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
ss << "err: " << err2str(err);
if (err == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
ss << ", try again";
}
LOG(WARNING) << ss.str();
Expand Down Expand Up @@ -424,7 +448,8 @@ Status KafkaDataConsumer::commit(const std::string& topic, const std::map<int32_
RdKafka::ErrorCode err = _k_consumer->commitSync(topic_partitions);
if (err != RdKafka::ERR_NO_ERROR) {
std::stringstream ss;
ss << "failed to commit kafka offset : " << RdKafka::err2str(err);
ss << "failed to commit kafka offset, topic: " << topic << ", group id: " << _group_id
<< ", err: " << RdKafka::err2str(err);
return Status::InternalError(ss.str());
}
return Status::OK();
Expand Down
1 change: 1 addition & 0 deletions be/src/runtime/routine_load/data_consumer.h
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@ class KafkaDataConsumer : public DataConsumer {
private:
std::string _brokers;
std::string _topic;
std::string _group_id;
std::unordered_map<std::string, std::string> _custom_properties;

size_t _non_eof_partition_count = 0;
Expand Down
Loading