From c4a808d58aa3d6b64bd491f24548b47c716b0185 Mon Sep 17 00:00:00 2001 From: Bret McGuire Date: Wed, 4 Sep 2024 11:26:40 -0500 Subject: [PATCH] PYTHON-1369 Extend driver vector support to arbitrary subtypes and fix handling of variable length types (OSS C* 5.0) (#1217) --- cassandra/__init__.py | 6 - cassandra/cqltypes.py | 97 ++++++++-- cassandra/encoder.py | 5 + cassandra/marshal.py | 46 ++++- tests/integration/__init__.py | 7 +- tests/integration/standard/test_types.py | 189 ++++++++++++++++++- tests/unit/test_types.py | 226 ++++++++++++++++++----- 7 files changed, 504 insertions(+), 72 deletions(-) diff --git a/cassandra/__init__.py b/cassandra/__init__.py index 4a5b8b29a3..045fc98cdc 100644 --- a/cassandra/__init__.py +++ b/cassandra/__init__.py @@ -744,9 +744,3 @@ def __init__(self, msg, excs=[]): if excs: complete_msg += ("\nThe following exceptions were observed: \n - " + '\n - '.join(str(e) for e in excs)) Exception.__init__(self, complete_msg) - -class VectorDeserializationFailure(DriverException): - """ - The driver was unable to deserialize a given vector - """ - pass diff --git a/cassandra/cqltypes.py b/cassandra/cqltypes.py index 12d90e2746..4c3af57887 100644 --- a/cassandra/cqltypes.py +++ b/cassandra/cqltypes.py @@ -48,8 +48,8 @@ int32_pack, int32_unpack, int64_pack, int64_unpack, float_pack, float_unpack, double_pack, double_unpack, varint_pack, varint_unpack, point_be, point_le, - vints_pack, vints_unpack) -from cassandra import util, VectorDeserializationFailure + vints_pack, vints_unpack, uvint_unpack, uvint_pack) +from cassandra import util _little_endian_flag = 1 # we always serialize LE import ipaddress @@ -392,6 +392,9 @@ def cass_parameterized_type(cls, full=False): """ return cls.cass_parameterized_type_with(cls.subtypes, full=full) + @classmethod + def serial_size(cls): + return None # it's initially named with a _ to avoid registering it as a real type, but # client programs may want to use the name still for isinstance(), etc @@ -457,10 +460,12 @@ def serialize(uuid, protocol_version): except AttributeError: raise TypeError("Got a non-UUID object for a UUID value") + @classmethod + def serial_size(cls): + return 16 class BooleanType(_CassandraType): typename = 'boolean' - serial_size = 1 @staticmethod def deserialize(byts, protocol_version): @@ -470,6 +475,10 @@ def deserialize(byts, protocol_version): def serialize(truth, protocol_version): return int8_pack(truth) + @classmethod + def serial_size(cls): + return 1 + class ByteType(_CassandraType): typename = 'tinyint' @@ -500,7 +509,6 @@ def serialize(var, protocol_version): class FloatType(_CassandraType): typename = 'float' - serial_size = 4 @staticmethod def deserialize(byts, protocol_version): @@ -510,10 +518,12 @@ def deserialize(byts, protocol_version): def serialize(byts, protocol_version): return float_pack(byts) + @classmethod + def serial_size(cls): + return 4 class DoubleType(_CassandraType): typename = 'double' - serial_size = 8 @staticmethod def deserialize(byts, protocol_version): @@ -523,10 +533,12 @@ def deserialize(byts, protocol_version): def serialize(byts, protocol_version): return double_pack(byts) + @classmethod + def serial_size(cls): + return 8 class LongType(_CassandraType): typename = 'bigint' - serial_size = 8 @staticmethod def deserialize(byts, protocol_version): @@ -536,10 +548,12 @@ def deserialize(byts, protocol_version): def serialize(byts, protocol_version): return int64_pack(byts) + @classmethod + def serial_size(cls): + return 8 class Int32Type(_CassandraType): typename = 'int' - serial_size = 4 @staticmethod def deserialize(byts, protocol_version): @@ -549,6 +563,9 @@ def deserialize(byts, protocol_version): def serialize(byts, protocol_version): return int32_pack(byts) + @classmethod + def serial_size(cls): + return 4 class IntegerType(_CassandraType): typename = 'varint' @@ -645,6 +662,9 @@ def serialize(v, protocol_version): return int64_pack(int(timestamp)) + @classmethod + def serial_size(cls): + return 8 class TimestampType(DateType): pass @@ -652,7 +672,6 @@ class TimestampType(DateType): class TimeUUIDType(DateType): typename = 'timeuuid' - serial_size = 16 def my_timestamp(self): return util.unix_time_from_uuid1(self.val) @@ -668,6 +687,9 @@ def serialize(timeuuid, protocol_version): except AttributeError: raise TypeError("Got a non-UUID object for a UUID value") + @classmethod + def serial_size(cls): + return 16 class SimpleDateType(_CassandraType): typename = 'date' @@ -699,7 +721,6 @@ def serialize(val, protocol_version): class ShortType(_CassandraType): typename = 'smallint' - serial_size = 2 @staticmethod def deserialize(byts, protocol_version): @@ -709,10 +730,14 @@ def deserialize(byts, protocol_version): def serialize(byts, protocol_version): return int16_pack(byts) - class TimeType(_CassandraType): typename = 'time' - serial_size = 8 + # Time should be a fixed size 8 byte type but Cassandra 5.0 code marks it as + # variable size... and we have to match what the server expects since the server + # uses that specification to encode data of that type. + #@classmethod + #def serial_size(cls): + # return 8 @staticmethod def deserialize(byts, protocol_version): @@ -1409,6 +1434,11 @@ class VectorType(_CassandraType): vector_size = 0 subtype = None + @classmethod + def serial_size(cls): + serialized_size = cls.subtype.serial_size() + return cls.vector_size * serialized_size if serialized_size is not None else None + @classmethod def apply_parameters(cls, params, names): assert len(params) == 2 @@ -1418,19 +1448,50 @@ def apply_parameters(cls, params, names): @classmethod def deserialize(cls, byts, protocol_version): - serialized_size = getattr(cls.subtype, "serial_size", None) - if not serialized_size: - raise VectorDeserializationFailure("Cannot determine serialized size for vector with subtype %s" % cls.subtype.__name__) - indexes = (serialized_size * x for x in range(0, cls.vector_size)) - return [cls.subtype.deserialize(byts[idx:idx + serialized_size], protocol_version) for idx in indexes] + serialized_size = cls.subtype.serial_size() + if serialized_size is not None: + expected_byte_size = serialized_size * cls.vector_size + if len(byts) != expected_byte_size: + raise ValueError( + "Expected vector of type {0} and dimension {1} to have serialized size {2}; observed serialized size of {3} instead"\ + .format(cls.subtype.typename, cls.vector_size, expected_byte_size, len(byts))) + indexes = (serialized_size * x for x in range(0, cls.vector_size)) + return [cls.subtype.deserialize(byts[idx:idx + serialized_size], protocol_version) for idx in indexes] + + idx = 0 + rv = [] + while (len(rv) < cls.vector_size): + try: + size, bytes_read = uvint_unpack(byts[idx:]) + idx += bytes_read + rv.append(cls.subtype.deserialize(byts[idx:idx + size], protocol_version)) + idx += size + except: + raise ValueError("Error reading additional data during vector deserialization after successfully adding {} elements"\ + .format(len(rv))) + + # If we have any additional data in the serialized vector treat that as an error as well + if idx < len(byts): + raise ValueError("Additional bytes remaining after vector deserialization completed") + return rv @classmethod def serialize(cls, v, protocol_version): + v_length = len(v) + if cls.vector_size != v_length: + raise ValueError( + "Expected sequence of size {0} for vector of type {1} and dimension {0}, observed sequence of length {2}"\ + .format(cls.vector_size, cls.subtype.typename, v_length)) + + serialized_size = cls.subtype.serial_size() buf = io.BytesIO() for item in v: - buf.write(cls.subtype.serialize(item, protocol_version)) + item_bytes = cls.subtype.serialize(item, protocol_version) + if serialized_size is None: + buf.write(uvint_pack(len(item_bytes))) + buf.write(item_bytes) return buf.getvalue() @classmethod def cql_parameterized_type(cls): - return "%s<%s, %s>" % (cls.typename, cls.subtype.typename, cls.vector_size) + return "%s<%s, %s>" % (cls.typename, cls.subtype.cql_parameterized_type(), cls.vector_size) diff --git a/cassandra/encoder.py b/cassandra/encoder.py index 31d90549f4..e834550fd3 100644 --- a/cassandra/encoder.py +++ b/cassandra/encoder.py @@ -21,6 +21,7 @@ log = logging.getLogger(__name__) from binascii import hexlify +from decimal import Decimal import calendar import datetime import math @@ -59,6 +60,7 @@ class Encoder(object): def __init__(self): self.mapping = { float: self.cql_encode_float, + Decimal: self.cql_encode_decimal, bytearray: self.cql_encode_bytes, str: self.cql_encode_str, int: self.cql_encode_object, @@ -217,3 +219,6 @@ def cql_encode_ipaddress(self, val): is suitable for ``inet`` type columns. """ return "'%s'" % val.compressed + + def cql_encode_decimal(self, val): + return self.cql_encode_float(float(val)) \ No newline at end of file diff --git a/cassandra/marshal.py b/cassandra/marshal.py index 726f0819eb..a527a9e1d7 100644 --- a/cassandra/marshal.py +++ b/cassandra/marshal.py @@ -111,7 +111,6 @@ def vints_unpack(term): # noqa return tuple(values) - def vints_pack(values): revbytes = bytearray() values = [int(v) for v in values[::-1]] @@ -143,3 +142,48 @@ def vints_pack(values): revbytes.reverse() return bytes(revbytes) + +def uvint_unpack(bytes): + first_byte = bytes[0] + + if (first_byte & 128) == 0: + return (first_byte,1) + + num_extra_bytes = 8 - (~first_byte & 0xff).bit_length() + rv = first_byte & (0xff >> num_extra_bytes) + for idx in range(1,num_extra_bytes + 1): + new_byte = bytes[idx] + rv <<= 8 + rv |= new_byte & 0xff + + return (rv, num_extra_bytes + 1) + +def uvint_pack(val): + rv = bytearray() + if val < 128: + rv.append(val) + else: + v = val + num_extra_bytes = 0 + num_bits = v.bit_length() + # We need to reserve (num_extra_bytes+1) bits in the first byte + # ie. with 1 extra byte, the first byte needs to be something like '10XXXXXX' # 2 bits reserved + # ie. with 8 extra bytes, the first byte needs to be '11111111' # 8 bits reserved + reserved_bits = num_extra_bytes + 1 + while num_bits > (8-(reserved_bits)): + num_extra_bytes += 1 + num_bits -= 8 + reserved_bits = min(num_extra_bytes + 1, 8) + rv.append(v & 0xff) + v >>= 8 + + if num_extra_bytes > 8: + raise ValueError('Value %d is too big and cannot be encoded as vint' % val) + + # We can now store the last bits in the first byte + n = 8 - num_extra_bytes + v |= (0xff >> n << n) + rv.append(abs(v)) + + rv.reverse() + return bytes(rv) diff --git a/tests/integration/__init__.py b/tests/integration/__init__.py index 5aa702c727..e389742b74 100644 --- a/tests/integration/__init__.py +++ b/tests/integration/__init__.py @@ -330,9 +330,10 @@ def _id_and_mark(f): greaterthanorequalcass36 = unittest.skipUnless(CASSANDRA_VERSION >= Version('3.6'), 'Cassandra version 3.6 or greater required') greaterthanorequalcass3_10 = unittest.skipUnless(CASSANDRA_VERSION >= Version('3.10'), 'Cassandra version 3.10 or greater required') greaterthanorequalcass3_11 = unittest.skipUnless(CASSANDRA_VERSION >= Version('3.11'), 'Cassandra version 3.11 or greater required') -greaterthanorequalcass40 = unittest.skipUnless(CASSANDRA_VERSION >= Version('4.0-a'), 'Cassandra version 4.0 or greater required') -lessthanorequalcass40 = unittest.skipUnless(CASSANDRA_VERSION <= Version('4.0-a'), 'Cassandra version less or equal to 4.0 required') -lessthancass40 = unittest.skipUnless(CASSANDRA_VERSION < Version('4.0-a'), 'Cassandra version less than 4.0 required') +greaterthanorequalcass40 = unittest.skipUnless(CASSANDRA_VERSION >= Version('4.0'), 'Cassandra version 4.0 or greater required') +greaterthanorequalcass50 = unittest.skipUnless(CASSANDRA_VERSION >= Version('5.0-beta'), 'Cassandra version 5.0 or greater required') +lessthanorequalcass40 = unittest.skipUnless(CASSANDRA_VERSION <= Version('4.0'), 'Cassandra version less or equal to 4.0 required') +lessthancass40 = unittest.skipUnless(CASSANDRA_VERSION < Version('4.0'), 'Cassandra version less than 4.0 required') lessthancass30 = unittest.skipUnless(CASSANDRA_VERSION < Version('3.0'), 'Cassandra version less then 3.0 required') greaterthanorequaldse68 = unittest.skipUnless(DSE_VERSION and DSE_VERSION >= Version('6.8'), "DSE 6.8 or greater required for this test") greaterthanorequaldse67 = unittest.skipUnless(DSE_VERSION and DSE_VERSION >= Version('6.7'), "DSE 6.7 or greater required for this test") diff --git a/tests/integration/standard/test_types.py b/tests/integration/standard/test_types.py index 016c2b9785..55bf117ace 100644 --- a/tests/integration/standard/test_types.py +++ b/tests/integration/standard/test_types.py @@ -14,9 +14,17 @@ import unittest -from datetime import datetime import ipaddress import math +import random +import string +import socket +import uuid + +from datetime import datetime, date, time, timedelta +from decimal import Decimal +from functools import partial + from packaging.version import Version import cassandra @@ -31,7 +39,7 @@ from tests.integration import use_singledc, execute_until_pass, notprotocolv1, \ BasicSharedKeyspaceUnitTestCase, greaterthancass21, lessthancass30, greaterthanorequaldse51, \ - DSE_VERSION, greaterthanorequalcass3_10, requiredse, TestCluster + DSE_VERSION, greaterthanorequalcass3_10, requiredse, TestCluster, greaterthanorequalcass50 from tests.integration.datatype_utils import update_datatypes, PRIMITIVE_DATATYPES, COLLECTION_TYPES, PRIMITIVE_DATATYPES_KEYS, \ get_sample, get_all_samples, get_collection_sample @@ -1291,3 +1299,180 @@ def run_inserts_at_version(self, proto_ver): finally: session.cluster.shutdown() + +@greaterthanorequalcass50 +class TypeTestsVector(BasicSharedKeyspaceUnitTestCase): + + def _get_first_j(self, rs): + rows = rs.all() + self.assertEqual(len(rows), 1) + return rows[0].j + + def _get_row_simple(self, idx, table_name): + rs = self.session.execute("select j from {0}.{1} where i = {2}".format(self.keyspace_name, table_name, idx)) + return self._get_first_j(rs) + + def _get_row_prepared(self, idx, table_name): + cql = "select j from {0}.{1} where i = ?".format(self.keyspace_name, table_name) + ps = self.session.prepare(cql) + rs = self.session.execute(ps, [idx]) + return self._get_first_j(rs) + + def _round_trip_test(self, subtype, subtype_fn, test_fn, use_positional_parameters=True): + + table_name = subtype.replace("<","A").replace(">", "B").replace(",", "C") + "isH" + + def random_subtype_vector(): + return [subtype_fn() for _ in range(3)] + + ddl = """CREATE TABLE {0}.{1} ( + i int PRIMARY KEY, + j vector<{2}, 3>)""".format(self.keyspace_name, table_name, subtype) + self.session.execute(ddl) + + if use_positional_parameters: + cql = "insert into {0}.{1} (i,j) values (%s,%s)".format(self.keyspace_name, table_name) + expected1 = random_subtype_vector() + data1 = {1:random_subtype_vector(), 2:expected1, 3:random_subtype_vector()} + for k,v in data1.items(): + # Attempt a set of inserts using the driver's support for positional params + self.session.execute(cql, (k,v)) + + cql = "insert into {0}.{1} (i,j) values (?,?)".format(self.keyspace_name, table_name) + expected2 = random_subtype_vector() + ps = self.session.prepare(cql) + data2 = {4:random_subtype_vector(), 5:expected2, 6:random_subtype_vector()} + for k,v in data2.items(): + # Add some additional rows via prepared statements + self.session.execute(ps, [k,v]) + + # Use prepared queries to gather data from the rows we added via simple queries and vice versa + if use_positional_parameters: + observed1 = self._get_row_prepared(2, table_name) + for idx in range(0, 3): + test_fn(observed1[idx], expected1[idx]) + + observed2 = self._get_row_simple(5, table_name) + for idx in range(0, 3): + test_fn(observed2[idx], expected2[idx]) + + def test_round_trip_integers(self): + self._round_trip_test("int", partial(random.randint, 0, 2 ** 31), self.assertEqual) + self._round_trip_test("bigint", partial(random.randint, 0, 2 ** 63), self.assertEqual) + self._round_trip_test("smallint", partial(random.randint, 0, 2 ** 15), self.assertEqual) + self._round_trip_test("tinyint", partial(random.randint, 0, (2 ** 7) - 1), self.assertEqual) + self._round_trip_test("varint", partial(random.randint, 0, 2 ** 63), self.assertEqual) + + def test_round_trip_floating_point(self): + _almost_equal_test_fn = partial(self.assertAlmostEqual, places=5) + def _random_decimal(): + return Decimal(random.uniform(0.0, 100.0)) + + # Max value here isn't really connected to max value for floating point nums in IEEE 754... it's used here + # mainly as a convenient benchmark + self._round_trip_test("float", partial(random.uniform, 0.0, 100.0), _almost_equal_test_fn) + self._round_trip_test("double", partial(random.uniform, 0.0, 100.0), _almost_equal_test_fn) + self._round_trip_test("decimal", _random_decimal, _almost_equal_test_fn) + + def test_round_trip_text(self): + def _random_string(): + return ''.join(random.choice(string.ascii_uppercase + string.digits) for _ in range(24)) + + self._round_trip_test("ascii", _random_string, self.assertEqual) + self._round_trip_test("text", _random_string, self.assertEqual) + + def test_round_trip_date_and_time(self): + _almost_equal_test_fn = partial(self.assertAlmostEqual, delta=timedelta(seconds=1)) + def _random_datetime(): + return datetime.today() - timedelta(hours=random.randint(0,18), days=random.randint(1,1000)) + def _random_date(): + return _random_datetime().date() + def _random_time(): + return _random_datetime().time() + + self._round_trip_test("date", _random_date, self.assertEqual) + self._round_trip_test("time", _random_time, self.assertEqual) + self._round_trip_test("timestamp", _random_datetime, _almost_equal_test_fn) + + def test_round_trip_uuid(self): + self._round_trip_test("uuid", uuid.uuid1, self.assertEqual) + self._round_trip_test("timeuuid", uuid.uuid1, self.assertEqual) + + def test_round_trip_miscellany(self): + def _random_bytes(): + return random.getrandbits(32).to_bytes(4,'big') + def _random_boolean(): + return random.choice([True, False]) + def _random_duration(): + return Duration(random.randint(0,11), random.randint(0,11), random.randint(0,10000)) + def _random_inet(): + return socket.inet_ntoa(_random_bytes()) + + self._round_trip_test("boolean", _random_boolean, self.assertEqual) + self._round_trip_test("duration", _random_duration, self.assertEqual) + self._round_trip_test("inet", _random_inet, self.assertEqual) + self._round_trip_test("blob", _random_bytes, self.assertEqual) + + def test_round_trip_collections(self): + def _random_seq(): + return [random.randint(0,100000) for _ in range(8)] + def _random_set(): + return set(_random_seq()) + def _random_map(): + return {k:v for (k,v) in zip(_random_seq(), _random_seq())} + + # Goal here is to test collections of both fixed and variable size subtypes + self._round_trip_test("list", _random_seq, self.assertEqual) + self._round_trip_test("list", _random_seq, self.assertEqual) + self._round_trip_test("set", _random_set, self.assertEqual) + self._round_trip_test("set", _random_set, self.assertEqual) + self._round_trip_test("map", _random_map, self.assertEqual) + self._round_trip_test("map", _random_map, self.assertEqual) + self._round_trip_test("map", _random_map, self.assertEqual) + self._round_trip_test("map", _random_map, self.assertEqual) + + def test_round_trip_vector_of_vectors(self): + def _random_vector(): + return [random.randint(0,100000) for _ in range(2)] + + self._round_trip_test("vector", _random_vector, self.assertEqual) + self._round_trip_test("vector", _random_vector, self.assertEqual) + + def test_round_trip_tuples(self): + def _random_tuple(): + return (random.randint(0,100000),random.randint(0,100000)) + + # Unfortunately we can't use positional parameters when inserting tuples because the driver will try to encode + # them as lists before sending them to the server... and that confuses the parsing logic. + self._round_trip_test("tuple", _random_tuple, self.assertEqual, use_positional_parameters=False) + self._round_trip_test("tuple", _random_tuple, self.assertEqual, use_positional_parameters=False) + self._round_trip_test("tuple", _random_tuple, self.assertEqual, use_positional_parameters=False) + self._round_trip_test("tuple", _random_tuple, self.assertEqual, use_positional_parameters=False) + + def test_round_trip_udts(self): + def _udt_equal_test_fn(udt1, udt2): + self.assertEqual(udt1.a, udt2.a) + self.assertEqual(udt1.b, udt2.b) + + self.session.execute("create type {}.fixed_type (a int, b int)".format(self.keyspace_name)) + self.session.execute("create type {}.mixed_type_one (a int, b varint)".format(self.keyspace_name)) + self.session.execute("create type {}.mixed_type_two (a varint, b int)".format(self.keyspace_name)) + self.session.execute("create type {}.var_type (a varint, b varint)".format(self.keyspace_name)) + + class GeneralUDT: + def __init__(self, a, b): + self.a = a + self.b = b + + self.cluster.register_user_type(self.keyspace_name,'fixed_type', GeneralUDT) + self.cluster.register_user_type(self.keyspace_name,'mixed_type_one', GeneralUDT) + self.cluster.register_user_type(self.keyspace_name,'mixed_type_two', GeneralUDT) + self.cluster.register_user_type(self.keyspace_name,'var_type', GeneralUDT) + + def _random_udt(): + return GeneralUDT(random.randint(0,100000),random.randint(0,100000)) + + self._round_trip_test("fixed_type", _random_udt, _udt_equal_test_fn) + self._round_trip_test("mixed_type_one", _random_udt, _udt_equal_test_fn) + self._round_trip_test("mixed_type_two", _random_udt, _udt_equal_test_fn) + self._round_trip_test("var_type", _random_udt, _udt_equal_test_fn) diff --git a/tests/unit/test_types.py b/tests/unit/test_types.py index 25641c046d..aba11d4ced 100644 --- a/tests/unit/test_types.py +++ b/tests/unit/test_types.py @@ -20,7 +20,7 @@ from binascii import unhexlify import cassandra -from cassandra import util, VectorDeserializationFailure +from cassandra import util from cassandra.cqltypes import ( CassandraType, DateRangeType, DateType, DecimalType, EmptyValue, LongType, SetType, UTF8Type, @@ -196,6 +196,16 @@ def test_parse_casstype_vector(self): self.assertEqual(3, ctype.vector_size) self.assertEqual(FloatType, ctype.subtype) + def test_parse_casstype_vector_of_vectors(self): + inner_type = "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 4)" + ctype = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(%s, 3)" % (inner_type)) + self.assertTrue(issubclass(ctype, VectorType)) + self.assertEqual(3, ctype.vector_size) + sub_ctype = ctype.subtype + self.assertTrue(issubclass(sub_ctype, VectorType)) + self.assertEqual(4, sub_ctype.vector_size) + self.assertEqual(FloatType, sub_ctype.subtype) + def test_empty_value(self): self.assertEqual(str(EmptyValue()), 'EMPTY') @@ -309,8 +319,44 @@ def test_cql_quote(self): self.assertEqual(cql_quote('test'), "'test'") self.assertEqual(cql_quote(0), '0') - def test_vector_round_trip_types_with_serialized_size(self): - # Test all the types which specify a serialized size... see PYTHON-1371 for details + +class VectorTests(unittest.TestCase): + def _normalize_set(self, val): + if isinstance(val, set) or isinstance(val, util.SortedSet): + return frozenset([self._normalize_set(v) for v in val]) + return val + + def _round_trip_compare_fn(self, first, second): + if isinstance(first, float): + self.assertAlmostEqual(first, second, places=5) + elif isinstance(first, list): + self.assertEqual(len(first), len(second)) + for (felem, selem) in zip(first, second): + self._round_trip_compare_fn(felem, selem) + elif isinstance(first, set) or isinstance(first, frozenset): + self.assertEqual(len(first), len(second)) + first_norm = self._normalize_set(first) + second_norm = self._normalize_set(second) + self.assertEqual(first_norm, second_norm) + elif isinstance(first, dict): + for ((fk,fv), (sk,sv)) in zip(first.items(), second.items()): + self._round_trip_compare_fn(fk, sk) + self._round_trip_compare_fn(fv, sv) + else: + self.assertEqual(first,second) + + def _round_trip_test(self, data, ctype_str): + ctype = parse_casstype_args(ctype_str) + data_bytes = ctype.serialize(data, 0) + serialized_size = ctype.subtype.serial_size() + if serialized_size: + self.assertEqual(serialized_size * len(data), len(data_bytes)) + result = ctype.deserialize(data_bytes, 0) + self.assertEqual(len(data), len(result)) + for idx in range(0,len(data)): + self._round_trip_compare_fn(data[idx], result[idx]) + + def test_round_trip_basic_types_with_fixed_serialized_size(self): self._round_trip_test([True, False, False, True], \ "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.BooleanType, 4)") self._round_trip_test([3.4, 2.9, 41.6, 12.0], \ @@ -325,55 +371,151 @@ def test_vector_round_trip_types_with_serialized_size(self): "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.TimeUUIDType, 4)") self._round_trip_test([3, 2, 41, 12], \ "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.ShortType, 4)") - self._round_trip_test([datetime.time(1,1,1), datetime.time(2,2,2), datetime.time(3,3,3)], \ - "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.TimeType, 3)") - def test_vector_round_trip_types_without_serialized_size(self): - # Test all the types which do not specify a serialized size... see PYTHON-1371 for details + def test_round_trip_basic_types_without_fixed_serialized_size(self): # Varints - with self.assertRaises(VectorDeserializationFailure): - self._round_trip_test([3, 2, 41, 12], \ - "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.IntegerType, 4)") + self._round_trip_test([3, 2, 41, 12], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.IntegerType, 4)") # ASCII text - with self.assertRaises(VectorDeserializationFailure): - self._round_trip_test(["abc", "def", "ghi", "jkl"], \ - "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.AsciiType, 4)") + self._round_trip_test(["abc", "def", "ghi", "jkl"], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.AsciiType, 4)") # UTF8 text - with self.assertRaises(VectorDeserializationFailure): - self._round_trip_test(["abc", "def", "ghi", "jkl"], \ - "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.UTF8Type, 4)") + self._round_trip_test(["abc", "def", "ghi", "jkl"], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.UTF8Type, 4)") + # Time is something of a weird one. By rights it should be a fixed size type but C* code marks it as variable + # size. We're forced to follow the C* code base (since that's who'll be providing the data we're parsing) so + # we match what they're doing. + self._round_trip_test([datetime.time(1,1,1), datetime.time(2,2,2), datetime.time(3,3,3)], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.TimeType, 3)") # Duration (containts varints) - with self.assertRaises(VectorDeserializationFailure): - self._round_trip_test([util.Duration(1,1,1), util.Duration(2,2,2), util.Duration(3,3,3)], \ - "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.DurationType, 3)") - # List (of otherwise serializable type) - with self.assertRaises(VectorDeserializationFailure): - self._round_trip_test([[3.4], [2.9], [41.6], [12.0]], \ - "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.ListType(org.apache.cassandra.db.marshal.FloatType), 4)") - # Set (of otherwise serializable type) - with self.assertRaises(VectorDeserializationFailure): - self._round_trip_test([set([3.4]), set([2.9]), set([41.6]), set([12.0])], \ - "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.SetType(org.apache.cassandra.db.marshal.FloatType), 4)") - # Map (of otherwise serializable types) - with self.assertRaises(VectorDeserializationFailure): - self._round_trip_test([{1:3.4}, {2:2.9}, {3:41.6}, {4:12.0}], \ + self._round_trip_test([util.Duration(1,1,1), util.Duration(2,2,2), util.Duration(3,3,3)], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.DurationType, 3)") + + def test_round_trip_collection_types(self): + # List (subtype of fixed size) + self._round_trip_test([[1, 2, 3, 4], [5, 6], [7, 8, 9, 10], [11, 12]], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.ListType \ + (org.apache.cassandra.db.marshal.Int32Type), 4)") + # Set (subtype of fixed size) + self._round_trip_test([set([1, 2, 3, 4]), set([5, 6]), set([7, 8, 9, 10]), set([11, 12])], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.SetType \ + (org.apache.cassandra.db.marshal.Int32Type), 4)") + # Map (subtype of fixed size) + self._round_trip_test([{1:1.2}, {2:3.4}, {3:5.6}, {4:7.8}], \ "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.MapType \ (org.apache.cassandra.db.marshal.Int32Type,org.apache.cassandra.db.marshal.FloatType), 4)") + # List (subtype without fixed size) + self._round_trip_test([["one","two"], ["three","four"], ["five","six"], ["seven","eight"]], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.ListType \ + (org.apache.cassandra.db.marshal.AsciiType), 4)") + # Set (subtype without fixed size) + self._round_trip_test([set(["one","two"]), set(["three","four"]), set(["five","six"]), set(["seven","eight"])], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.SetType \ + (org.apache.cassandra.db.marshal.AsciiType), 4)") + # Map (subtype without fixed size) + self._round_trip_test([{1:"one"}, {2:"two"}, {3:"three"}, {4:"four"}], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.MapType \ + (org.apache.cassandra.db.marshal.IntegerType,org.apache.cassandra.db.marshal.AsciiType), 4)") + # List of lists (subtype without fixed size) + data = [[["one","two"],["three"]], [["four"],["five"]], [["six","seven","eight"]], [["nine"]]] + ctype = "org.apache.cassandra.db.marshal.VectorType\ + (org.apache.cassandra.db.marshal.ListType\ + (org.apache.cassandra.db.marshal.ListType\ + (org.apache.cassandra.db.marshal.AsciiType)), 4)" + self._round_trip_test(data, ctype) + # Set of sets (subtype without fixed size) + data = [set([frozenset(["one","two"]),frozenset(["three"])]),\ + set([frozenset(["four"]),frozenset(["five"])]),\ + set([frozenset(["six","seven","eight"])]), + set([frozenset(["nine"])])] + ctype = "org.apache.cassandra.db.marshal.VectorType\ + (org.apache.cassandra.db.marshal.SetType\ + (org.apache.cassandra.db.marshal.SetType\ + (org.apache.cassandra.db.marshal.AsciiType)), 4)" + self._round_trip_test(data, ctype) + # Map of maps (subtype without fixed size) + data = [{100:{1:"one",2:"two",3:"three"}},\ + {200:{4:"four",5:"five"}},\ + {300:{}},\ + {400:{6:"six"}}] + ctype = "org.apache.cassandra.db.marshal.VectorType\ + (org.apache.cassandra.db.marshal.MapType\ + (org.apache.cassandra.db.marshal.Int32Type,\ + org.apache.cassandra.db.marshal.MapType \ + (org.apache.cassandra.db.marshal.IntegerType,org.apache.cassandra.db.marshal.AsciiType)), 4)" + self._round_trip_test(data, ctype) + + def test_round_trip_vector_of_vectors(self): + # Subytpes of subtypes with a fixed size + self._round_trip_test([[1.2, 3.4], [5.6, 7.8], [9.10, 11.12], [13.14, 15.16]], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.VectorType \ + (org.apache.cassandra.db.marshal.FloatType,2), 4)") + + # Subytpes of subtypes without a fixed size + self._round_trip_test([["one", "two"], ["three", "four"], ["five", "six"], ["seven", "eight"]], \ + "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.VectorType \ + (org.apache.cassandra.db.marshal.AsciiType,2), 4)") + + # parse_casstype_args() is tested above... we're explicitly concerned about cql_parapmeterized_type() output here + def test_cql_parameterized_type(self): + # Base vector functionality + ctype = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 4)") + self.assertEqual(ctype.cql_parameterized_type(), "org.apache.cassandra.db.marshal.VectorType") - def _round_trip_test(self, data, ctype_str): - ctype = parse_casstype_args(ctype_str) - data_bytes = ctype.serialize(data, 0) - serialized_size = getattr(ctype.subtype, "serial_size", None) - if serialized_size: - self.assertEqual(serialized_size * len(data), len(data_bytes)) - result = ctype.deserialize(data_bytes, 0) - self.assertEqual(len(data), len(result)) - for idx in range(0,len(data)): - self.assertAlmostEqual(data[idx], result[idx], places=5) + # Test vector-of-vectors + inner_type = "org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 4)" + ctype = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(%s, 3)" % (inner_type)) + inner_parsed_type = "org.apache.cassandra.db.marshal.VectorType" + self.assertEqual(ctype.cql_parameterized_type(), "org.apache.cassandra.db.marshal.VectorType<%s, 3>" % (inner_parsed_type)) + + def test_serialization_fixed_size_too_small(self): + ctype = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 5)") + with self.assertRaisesRegex(ValueError, "Expected sequence of size 5 for vector of type float and dimension 5, observed sequence of length 4"): + ctype.serialize([1.2, 3.4, 5.6, 7.8], 0) - def test_vector_cql_parameterized_type(self): + def test_serialization_fixed_size_too_big(self): ctype = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 4)") - self.assertEqual(ctype.cql_parameterized_type(), "org.apache.cassandra.db.marshal.VectorType") + with self.assertRaisesRegex(ValueError, "Expected sequence of size 4 for vector of type float and dimension 4, observed sequence of length 5"): + ctype.serialize([1.2, 3.4, 5.6, 7.8, 9.10], 0) + + def test_serialization_variable_size_too_small(self): + ctype = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.IntegerType, 5)") + with self.assertRaisesRegex(ValueError, "Expected sequence of size 5 for vector of type varint and dimension 5, observed sequence of length 4"): + ctype.serialize([1, 2, 3, 4], 0) + + def test_serialization_variable_size_too_big(self): + ctype = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.IntegerType, 4)") + with self.assertRaisesRegex(ValueError, "Expected sequence of size 4 for vector of type varint and dimension 4, observed sequence of length 5"): + ctype.serialize([1, 2, 3, 4, 5], 0) + + def test_deserialization_fixed_size_too_small(self): + ctype_four = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 4)") + ctype_four_bytes = ctype_four.serialize([1.2, 3.4, 5.6, 7.8], 0) + ctype_five = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 5)") + with self.assertRaisesRegex(ValueError, "Expected vector of type float and dimension 5 to have serialized size 20; observed serialized size of 16 instead"): + ctype_five.deserialize(ctype_four_bytes, 0) + + def test_deserialization_fixed_size_too_big(self): + ctype_five = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 5)") + ctype_five_bytes = ctype_five.serialize([1.2, 3.4, 5.6, 7.8, 9.10], 0) + ctype_four = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.FloatType, 4)") + with self.assertRaisesRegex(ValueError, "Expected vector of type float and dimension 4 to have serialized size 16; observed serialized size of 20 instead"): + ctype_four.deserialize(ctype_five_bytes, 0) + + def test_deserialization_variable_size_too_small(self): + ctype_four = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.IntegerType, 4)") + ctype_four_bytes = ctype_four.serialize([1, 2, 3, 4], 0) + ctype_five = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.IntegerType, 5)") + with self.assertRaisesRegex(ValueError, "Error reading additional data during vector deserialization after successfully adding 4 elements"): + ctype_five.deserialize(ctype_four_bytes, 0) + + def test_deserialization_variable_size_too_big(self): + ctype_five = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.IntegerType, 5)") + ctype_five_bytes = ctype_five.serialize([1, 2, 3, 4, 5], 0) + ctype_four = parse_casstype_args("org.apache.cassandra.db.marshal.VectorType(org.apache.cassandra.db.marshal.IntegerType, 4)") + with self.assertRaisesRegex(ValueError, "Additional bytes remaining after vector deserialization completed"): + ctype_four.deserialize(ctype_five_bytes, 0) + ZERO = datetime.timedelta(0)