
Commit

KAFKA-18672; CoordinatorRecordSerde must validate value version (apache#18749)

CoordinatorRecordSerde does not validate the value's version against the range supported by the current software. This is problematic if a future, unsupported version of the record is read by an older version of the software, because the older software would misinterpret the bytes. Hence, CoordinatorRecordSerde must throw an error if the version is unknown. This is also consistent with the handling in the old coordinator.

Reviewers: Jeff Kim <[email protected]>
dajac authored Feb 3, 2025
1 parent eb01221 commit bf05d2c
Showing 12 changed files with 88 additions and 38 deletions.
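In essence, the change rejects any value version outside the range the generated message class supports instead of silently decoding it. Below is a minimal sketch of that check; it is illustrative only. The helper class name ValueVersionCheck is hypothetical, while ApiMessage, Deserializer.UnknownRecordVersionException, and the lowestSupportedVersion()/highestSupportedVersion() accessors are taken from the diff that follows.

import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.Deserializer;

final class ValueVersionCheck {
    // Throws if the serialized value version falls outside the range that the
    // generated message class can decode, mirroring the guard added to
    // CoordinatorRecordSerde#deserialize in this commit.
    static void validate(short recordType, short valueVersion, ApiMessage valueMessage) {
        if (valueVersion < valueMessage.lowestSupportedVersion()
                || valueVersion > valueMessage.highestSupportedVersion()) {
            throw new Deserializer.UnknownRecordVersionException(recordType, valueVersion);
        }
    }
}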
@@ -28,23 +28,6 @@
*/
public interface CoordinatorLoader<U> extends AutoCloseable {

/**
* UnknownRecordTypeException is thrown when the Deserializer encounters
* an unknown record type.
*/
class UnknownRecordTypeException extends RuntimeException {
private final short unknownType;

public UnknownRecordTypeException(short unknownType) {
super(String.format("Found an unknown record type %d", unknownType));
this.unknownType = unknownType;
}

public short unknownType() {
return unknownType;
}
}

/**
* Object that is returned as part of the future from load(). Holds the partition load time and the
* end time.
@@ -76,6 +76,11 @@ public CoordinatorRecord deserialize(

final ApiMessage valueMessage = apiMessageValueFor(recordType);
final short valueVersion = readVersion(valueBuffer, "value");

if (valueVersion < valueMessage.lowestSupportedVersion() || valueVersion > valueMessage.highestSupportedVersion()) {
throw new UnknownRecordVersionException(recordType, valueVersion);
}

readMessage(valueMessage, valueBuffer, valueVersion, "value");

return CoordinatorRecord.record(
@@ -24,6 +24,42 @@
* @param <T> The record type.
*/
public interface Deserializer<T> {
/**
* UnknownRecordTypeException is thrown when the Deserializer encounters
* an unknown record type.
*/
class UnknownRecordTypeException extends RuntimeException {
private final short unknownType;

public UnknownRecordTypeException(short unknownType) {
super(String.format("Found an unknown record type %d", unknownType));
this.unknownType = unknownType;
}

public short unknownType() {
return unknownType;
}
}

class UnknownRecordVersionException extends RuntimeException {
private final short type;
private final short unknownVersion;

public UnknownRecordVersionException(short type, short unknownVersion) {
super(String.format("Found an unknown record version %d for %d type", unknownVersion, type));
this.type = type;
this.unknownVersion = unknownVersion;
}

public short type() {
return type;
}

public short unknownVersion() {
return unknownVersion;
}
}

/**
* Deserializes the key and the value.
*
@@ -23,7 +23,8 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{ControlRecordType, FileRecords, MemoryRecords}
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.Time
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.{LoadSummary, UnknownRecordTypeException}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.LoadSummary
import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.{CoordinatorLoader, CoordinatorPlayback, Deserializer}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.server.util.KafkaScheduler
2 changes: 1 addition & 1 deletion core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -37,8 +37,8 @@ import org.apache.kafka.common.protocol.{ApiMessage, ByteBufferAccessor}
import org.apache.kafka.common.record._
import org.apache.kafka.common.utils.Utils
import org.apache.kafka.coordinator.group.generated.{GroupMetadataValue, GroupMetadataValueJsonConverter, CoordinatorRecordJsonConverters => GroupCoordinatorRecordJsonConverters, CoordinatorRecordType => GroupCoordinatorRecordType}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde
import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.group.GroupCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.ShareCoordinatorRecordSerde
import org.apache.kafka.coordinator.share.generated.{CoordinatorRecordJsonConverters => ShareCoordinatorRecordJsonConverters}
@@ -24,7 +24,7 @@ import org.apache.kafka.common.errors.NotLeaderOrFollowerException
import org.apache.kafka.common.record.{ControlRecordType, EndTransactionMarker, FileRecords, MemoryRecords, RecordBatch, SimpleRecord}
import org.apache.kafka.common.requests.TransactionResult
import org.apache.kafka.common.utils.{MockTime, Time}
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.Deserializer.UnknownRecordTypeException
import org.apache.kafka.coordinator.common.runtime.{CoordinatorPlayback, Deserializer}
import org.apache.kafka.server.storage.log.FetchIsolation
import org.apache.kafka.storage.internals.log.{FetchDataInfo, LogOffsetMetadata}
@@ -18,7 +18,6 @@

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;

@@ -31,7 +30,7 @@ protected ApiMessage apiMessageKeyFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}

@@ -40,7 +39,7 @@ protected ApiMessage apiMessageValueFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
}
@@ -18,8 +18,8 @@

import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataKey;
import org.apache.kafka.coordinator.group.generated.ConsumerGroupMetadataValue;
import org.apache.kafka.coordinator.group.generated.CoordinatorRecordType;
@@ -121,8 +121,8 @@ public void testDeserializeWithInvalidRecordType() {

ByteBuffer valueBuffer = ByteBuffer.allocate(64);

CoordinatorLoader.UnknownRecordTypeException ex =
assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
Deserializer.UnknownRecordTypeException ex =
assertThrows(Deserializer.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}
@@ -198,6 +198,34 @@ public void testDeserializeWithInvalidValueBytes() {
ex.getMessage());
}

@Test
public void testDeserializeWithInvalidValueVersion() {
GroupCoordinatorRecordSerde serde = new GroupCoordinatorRecordSerde();

ApiMessage key = new ConsumerGroupMetadataKey().setGroupId("foo");
ByteBuffer keyBuffer = MessageUtil.toCoordinatorTypePrefixedByteBuffer(key);

ByteBuffer valueBuffer1 = ByteBuffer.allocate(2);
valueBuffer1.putShort((short) (ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1));
valueBuffer1.rewind();

Deserializer.UnknownRecordVersionException ex =
assertThrows(Deserializer.UnknownRecordVersionException.class,
() -> serde.deserialize(keyBuffer, valueBuffer1));
assertEquals(key.apiKey(), ex.type());
assertEquals(ConsumerGroupMetadataValue.HIGHEST_SUPPORTED_VERSION + 1, ex.unknownVersion());

keyBuffer.rewind();
ByteBuffer valueBuffer2 = ByteBuffer.allocate(2);
valueBuffer2.putShort((short) (ConsumerGroupMetadataValue.LOWEST_SUPPORTED_VERSION - 1));
valueBuffer2.rewind();

ex = assertThrows(Deserializer.UnknownRecordVersionException.class,
() -> serde.deserialize(keyBuffer, valueBuffer2));
assertEquals(key.apiKey(), ex.type());
assertEquals(ConsumerGroupMetadataValue.LOWEST_SUPPORTED_VERSION - 1, ex.unknownVersion());
}

@Test
public void testDeserializeAllRecordTypes() {
for (CoordinatorRecordType record : CoordinatorRecordType.values()) {
@@ -19,7 +19,6 @@

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;

@@ -29,7 +28,7 @@ protected ApiMessage apiMessageKeyFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}

@@ -38,7 +37,7 @@ protected ApiMessage apiMessageValueFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
}
@@ -19,8 +19,8 @@
import org.apache.kafka.common.Uuid;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.coordinator.share.generated.CoordinatorRecordType;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotKey;
import org.apache.kafka.coordinator.share.generated.ShareSnapshotValue;
@@ -113,8 +113,8 @@ public void testDeserializeWithInvalidRecordType() {

ByteBuffer valueBuffer = ByteBuffer.allocate(64);

CoordinatorLoader.UnknownRecordTypeException ex =
assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
Deserializer.UnknownRecordTypeException ex =
assertThrows(Deserializer.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}
@@ -18,7 +18,6 @@

import org.apache.kafka.common.errors.UnsupportedVersionException;
import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecordSerde;
import org.apache.kafka.coordinator.transaction.generated.CoordinatorRecordType;

@@ -29,7 +28,7 @@ protected ApiMessage apiMessageKeyFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordKey();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}

@@ -38,7 +37,7 @@ protected ApiMessage apiMessageValueFor(short recordType) {
try {
return CoordinatorRecordType.fromId(recordType).newRecordValue();
} catch (UnsupportedVersionException ex) {
throw new CoordinatorLoader.UnknownRecordTypeException(recordType);
throw new UnknownRecordTypeException(recordType);
}
}
}
@@ -18,8 +18,8 @@

import org.apache.kafka.common.protocol.ApiMessage;
import org.apache.kafka.common.protocol.MessageUtil;
import org.apache.kafka.coordinator.common.runtime.CoordinatorLoader;
import org.apache.kafka.coordinator.common.runtime.CoordinatorRecord;
import org.apache.kafka.coordinator.common.runtime.Deserializer;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogKey;
import org.apache.kafka.coordinator.transaction.generated.TransactionLogValue;
import org.apache.kafka.server.common.ApiMessageAndVersion;
@@ -120,8 +120,8 @@ public void testDeserializeWithInvalidRecordType() {

ByteBuffer valueBuffer = ByteBuffer.allocate(64);

CoordinatorLoader.UnknownRecordTypeException ex =
assertThrows(CoordinatorLoader.UnknownRecordTypeException.class,
Deserializer.UnknownRecordTypeException ex =
assertThrows(Deserializer.UnknownRecordTypeException.class,
() -> serde.deserialize(keyBuffer, valueBuffer));
assertEquals((short) 255, ex.unknownType());
}