Skip to content

Commit

Permalink
[Enhancement] Improve kafka topic and group authorization failed message
Browse files Browse the repository at this point in the history
Signed-off-by: wyb <[email protected]>
  • Loading branch information
wyb committed Jul 11, 2024
1 parent 4bdd206 commit c66dde0
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions be/src/runtime/routine_load/data_consumer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -270,10 +270,27 @@ Status KafkaDataConsumer::group_consume(TimedBlockingQueue<RdKafka::Message*>* q
}
break;
}
case RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED: {
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
st = Status::InternalError(fmt::format(
"kafka consume failed, err: {}. You should add READ permission for this topic in topic ACLs.",
msg->errstr()));
break;
}
case RdKafka::ERR_GROUP_AUTHORIZATION_FAILED: {
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
st = Status::InternalError(fmt::format(
"kafka consume failed, err: {}. You should add a consumer group with READ permission for this "
"topic in consumer group ACLs and set the routine load job with `property.group.id` property.",
msg->errstr()));
break;
}
default:
LOG(WARNING) << "kafka consume failed: " << _id << ", msg: " << msg->errstr();
done = true;
st = Status::InternalError(msg->errstr());
st = Status::InternalError(fmt::format("kafka consume failed, err: {}", msg->errstr()));
break;
}

Expand Down Expand Up @@ -360,13 +377,20 @@ Status KafkaDataConsumer::get_partition_meta(std::vector<int32_t>* partition_ids
continue;
}

if ((*it)->err() == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART) {
err = (*it)->err();
if (err == RdKafka::ERR_UNKNOWN_TOPIC_OR_PART) {
LOG(WARNING) << "unknown topic: " << _topic;
return Status::InternalError(fmt::format("unknown topic: {}", _topic));
} else if ((*it)->err() != RdKafka::ERR_NO_ERROR) {
} else if (err == RdKafka::ERR_TOPIC_AUTHORIZATION_FAILED) {
std::stringstream ss;
ss << "failed to get kafka topic: " << _topic << " meta, err: " << RdKafka::err2str(err)
<< ". You should add READ permission for this topic in topic ACLs.";
LOG(WARNING) << ss.str();
return Status::InternalError(ss.str());
} else if (err != RdKafka::ERR_NO_ERROR) {
std::stringstream ss;
ss << "err: " << err2str((*it)->err());
if ((*it)->err() == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
ss << "err: " << err2str(err);
if (err == RdKafka::ERR_LEADER_NOT_AVAILABLE) {
ss << ", try again";
}
LOG(WARNING) << ss.str();
Expand Down

0 comments on commit c66dde0

Please sign in to comment.