Skip to content

Commit

Permalink
[CALCITE-5011] CassandraAdapterDataTypesTest fails with initializatio…
Browse files Browse the repository at this point in the history
…n error

Cache Cassandra sessions based on "hostname, port, keyspace, username, password" information.
  • Loading branch information
asolimando authored and liyafan82 committed Mar 4, 2022
1 parent 2376a3a commit 4fb1a42
Show file tree
Hide file tree
Showing 3 changed files with 76 additions and 82 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.datastax.oss.driver.api.core.cql.ResultSet;
import com.datastax.oss.driver.api.core.cql.Row;
import com.datastax.oss.driver.api.core.data.TupleValue;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;

import org.checkerframework.checker.nullness.qual.Nullable;

Expand Down Expand Up @@ -87,7 +88,7 @@ class CassandraEnumerator implements Enumerator<Object> {
private @Nullable Object currentRowField(int index) {
assert current != null;
final Object o = current.get(index,
CassandraSchema.CODEC_REGISTRY.codecFor(
CodecRegistry.DEFAULT.codecFor(
current.getColumnDefinitions().get(index).getType()));

return convertToEnumeratorObject(o);
Expand Down Expand Up @@ -124,7 +125,7 @@ class CassandraEnumerator implements Enumerator<Object> {
return IntStream.range(0, numComponents)
.mapToObj(i ->
tupleValue.get(i,
CassandraSchema.CODEC_REGISTRY.codecFor(
CodecRegistry.DEFAULT.codecFor(
tupleValue.getType().getComponentTypes().get(i)))
).map(this::convertToEnumeratorObject)
.map(Objects::requireNonNull) // "null" cannot appear inside collections
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,13 +54,10 @@
import com.datastax.oss.driver.api.core.type.MapType;
import com.datastax.oss.driver.api.core.type.SetType;
import com.datastax.oss.driver.api.core.type.TupleType;
import com.datastax.oss.driver.api.core.type.codec.registry.CodecRegistry;
import com.google.common.collect.ImmutableMap;

import org.checkerframework.checker.nullness.qual.Nullable;
import org.slf4j.Logger;

import java.net.InetSocketAddress;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
Expand All @@ -80,83 +77,24 @@ public class CassandraSchema extends AbstractSchema {
final String name;
final Hook.Closeable hook;

static final CodecRegistry CODEC_REGISTRY = CodecRegistry.DEFAULT;
static final CqlToSqlTypeConversionRules CQL_TO_SQL_TYPE =
CqlToSqlTypeConversionRules.instance();

protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();

private static final int DEFAULT_CASSANDRA_PORT = 9042;

/**
* Creates a Cassandra schema.
*
* @param host Cassandra host, e.g. "localhost"
* @param keyspace Cassandra keyspace name, e.g. "twissandra"
*/
@SuppressWarnings("unused")
public CassandraSchema(String host, String keyspace, SchemaPlus parentSchema, String name) {
this(host, DEFAULT_CASSANDRA_PORT, keyspace, null, null, parentSchema, name);
}

/**
* Creates a Cassandra schema.
*
* @param host Cassandra host, e.g. "localhost"
* @param port Cassandra port, e.g. 9042
* @param keyspace Cassandra keyspace name, e.g. "twissandra"
* @param session a Cassandra session
* @param parentSchema the parent schema
* @param name the schema name
*/
@SuppressWarnings("unused")
public CassandraSchema(String host, int port, String keyspace,
SchemaPlus parentSchema, String name) {
this(host, port, keyspace, null, null, parentSchema, name);
}

/**
* Creates a Cassandra schema.
*
* @param host Cassandra host, e.g. "localhost"
* @param keyspace Cassandra keyspace name, e.g. "twissandra"
* @param username Cassandra username
* @param password Cassandra password
*/
public CassandraSchema(String host, String keyspace, @Nullable String username,
@Nullable String password, SchemaPlus parentSchema, String name) {
this(host, DEFAULT_CASSANDRA_PORT, keyspace, username, password, parentSchema, name);
}

/**
* Creates a Cassandra schema.
*
* @param host Cassandra host, e.g. "localhost"
* @param port Cassandra port, e.g. 9042
* @param keyspace Cassandra keyspace name, e.g. "twissandra"
* @param username Cassandra username
* @param password Cassandra password
*/
public CassandraSchema(String host, int port, String keyspace, @Nullable String username,
@Nullable String password, SchemaPlus parentSchema, String name) {
public CassandraSchema(CqlSession session, SchemaPlus parentSchema, String name) {
super();

this.keyspace = keyspace;
try {
if (username != null && password != null) {
this.session = CqlSession.builder()
.addContactPoint(new InetSocketAddress(host, port))
.withAuthCredentials(username, password)
.withKeyspace(keyspace)
.withLocalDatacenter("datacenter1")
.build();
} else {
this.session = CqlSession.builder()
.addContactPoint(new InetSocketAddress(host, port))
.withKeyspace(keyspace)
.withLocalDatacenter("datacenter1")
.build();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
this.session = session;
this.keyspace = session.getKeyspace()
.orElseThrow(() -> new RuntimeException("No keyspace for session " + session.getName()))
.asInternal();
this.parentSchema = parentSchema;
this.name = name;
this.hook = prepareHook();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,36 +19,91 @@
import org.apache.calcite.schema.Schema;
import org.apache.calcite.schema.SchemaFactory;
import org.apache.calcite.schema.SchemaPlus;
import org.apache.calcite.util.trace.CalciteTrace;

import com.datastax.oss.driver.api.core.CqlSession;
import com.google.common.collect.ImmutableSet;

import org.slf4j.Logger;

import java.net.InetSocketAddress;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.stream.Collectors;

/**
* Factory that creates a {@link CassandraSchema}.
*/
@SuppressWarnings("UnusedDeclaration")
public class CassandraSchemaFactory implements SchemaFactory {

private static final int DEFAULT_CASSANDRA_PORT = 9042;
private static final Map<Map<String, Object>, CqlSession> INFO_TO_SESSION =
new ConcurrentHashMap<>();
private static final Set<String> SESSION_DEFINING_KEYS = ImmutableSet.of(
"host", "port", "keyspace", "username", "password");
protected static final Logger LOGGER = CalciteTrace.getPlannerTracer();

public CassandraSchemaFactory() {
super();
}

@Override public Schema create(SchemaPlus parentSchema, String name,
Map<String, Object> operand) {
Map map = (Map) operand;
String host = (String) map.get("host");
String keyspace = (String) map.get("keyspace");
String username = (String) map.get("username");
String password = (String) map.get("password");

final Map<String, Object> sessionMap = projectMapOverKeys(operand, SESSION_DEFINING_KEYS);

INFO_TO_SESSION.computeIfAbsent(sessionMap, m -> {
String host = (String) m.get("host");
String keyspace = (String) m.get("keyspace");
String username = (String) m.get("username");
String password = (String) m.get("password");
int port = getPort(m);

if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Creating session for info {}", m);
}
try {
if (username != null && password != null) {
return CqlSession.builder()
.addContactPoint(new InetSocketAddress(host, port))
.withAuthCredentials(username, password)
.withKeyspace(keyspace)
.withLocalDatacenter("datacenter1")
.build();
} else {
return CqlSession.builder()
.addContactPoint(new InetSocketAddress(host, port))
.withKeyspace(keyspace)
.withLocalDatacenter("datacenter1")
.build();
}
} catch (Exception e) {
throw new RuntimeException(e);
}
});

return new CassandraSchema(INFO_TO_SESSION.get(sessionMap), parentSchema, name);
}

private static Map<String, Object> projectMapOverKeys(
Map<String, Object> map, Set<String> keysToKeep) {
return map.entrySet().stream()
.filter(e -> keysToKeep.contains(e.getKey()))
.collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue));
}

private static int getPort(Map<String, Object> map) {
if (map.containsKey("port")) {
Object portObj = map.get("port");
int port;
if (portObj instanceof String) {
port = Integer.parseInt((String) portObj);
return Integer.parseInt((String) portObj);
} else {
port = (int) portObj;
return (int) portObj;
}
return new CassandraSchema(host, port, keyspace, username, password, parentSchema, name);
} else {
return new CassandraSchema(host, keyspace, username, password, parentSchema, name);
return DEFAULT_CASSANDRA_PORT;
}
}
}

0 comments on commit 4fb1a42

Please sign in to comment.