Merge pull request #7 from opencredo/config_def_test_and_validate

Added unit tests around config and other minor tweaks.

rufusfnash authored Jan 24, 2019
2 parents c29aed9 + e525a88 commit acf920b
Showing 14 changed files with 279 additions and 93 deletions.
23 changes: 19 additions & 4 deletions README.md
@@ -5,8 +5,8 @@ kafka-connect-venafi-tpp is a [Kafka connector](http://kafka.apache.org/document

# Development
To manually install the connector:
1. Build the Jar with `mvn package`
2. Find the Jar in your target folder called venafi-tpp-log-connector-<version you're building>.jar
1. Build the JAR with `mvn package`
2. Find the JAR in your target folder called venafi-tpp-log-connector-<version you're building>.jar
3. Create a connect property file
```
name=venafi
@@ -20,11 +20,26 @@ venafi.password=placeholder_password
venafi.batch.size=100
venafi.poll.interval=1000
```
This is filled with the default values as provided by the config definition [class](???)
This is filled with the default values as provided by the [config definition class](./src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java).
Whilst all `venafi` fields are currently optional and will default to the above, please change `venafi.base.url`, `venafi.username` and `venafi.password`.
4. Create a directory to place these files, e.g. `<path-to-confluent>/share/kafka/plugins`.
5. Add this directory to the plugin path in your Connect worker properties file (see the snippet below).
6. Then call: `bin/confluent load venafi -d venafi.properties`
If you need to unload/reload it use: `bin/confluent unload venafi`

For more information please look at the [Confluent instructions on manually installing connectors](https://docs.confluent.io/current/connect/managing/install.html#connect-install-connectors).
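
For reference, the worker properties entry from step 5 might look like this (a minimal sketch, assuming the plugin directory created in step 4):
```
plugin.path=<path-to-confluent>/share/kafka/plugins
```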

---
If you need to unload/reload it, use: `bin/confluent unload venafi`
If you intend to change the JAR, stop the cluster, swap in the new JAR, then start the cluster again.

# Useful commands while developing
```
sudo bin/confluent start
sudo bin/confluent status
sudo bin/confluent load venafi -d ~/venafi.properties
sudo bin/confluent status venafi
sudo bin/kafka-topics --list --zookeeper localhost:2181
sudo bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-offsets --from-beginning
sudo bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic TPP-LOGS
sudo bin/confluent log connect
```
@@ -4,6 +4,7 @@
import com.google.gson.JsonDeserializer;
import com.google.gson.JsonElement;
import com.google.gson.JsonParseException;
import org.apache.kafka.connect.errors.ConnectException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@@ -30,15 +31,8 @@ private static ZonedDateTime getParsedDate(String dateTimeString) {
try {
return ZonedDateTime.parse(dateTimeString);
} catch (DateTimeParseException e) {
//swallow exception for now
log.debug("Failed to parse to ZonedDateTime format", e);
}

try {
return ZonedDateTime.parse(dateTimeString + "Z");
} catch (DateTimeParseException up) {
log.debug("Failed to parse to ZonedDateTime format with added Z.", up);
throw up;
throw new ConnectException(e);
}
}

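The net effect of this hunk: instead of retrying with a trailing `Z` and swallowing the first failure, parsing now fails fast by wrapping the parse error in Connect's exception type. A minimal sketch of the resulting method, assuming the `java.time` imports already present in the class:

```java
private static ZonedDateTime getParsedDate(String dateTimeString) {
    try {
        return ZonedDateTime.parse(dateTimeString);
    } catch (DateTimeParseException e) {
        // Surface the parse failure to the Connect framework instead of retrying.
        throw new ConnectException(e);
    }
}
```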
TppLogSourceConfig.java
@@ -8,15 +8,12 @@

public class TppLogSourceConfig extends AbstractConfig {
public static final String BASE_URL_CONFIG = "venafi.base.url";
private static final String BASE_URL_DEFAULT = "https://localhost:443/vedsdk";
private static final String BASE_URL_DOC = "Url to TPP api with /VEDSDK";

public static final String USERNAME_CONFIG = "venafi.username";
private static final String USERNAME_DEFAULT = "placeholder_username";
private static final String USERNAME_DOC = "The username to use with the /VEDSDK api.";

public static final String PASSWORD_CONFIG = "venafi.password";
private static final String PASSWORD_DEFAULT = "placeholder_password";
private static final String PASSWORD_DOC = "The password to use with the /VEDSDK api.";

public static final String TOPIC_CONFIG = "venafi.topic";
@@ -31,13 +28,15 @@ public class TppLogSourceConfig {
private static final int POLL_INTERVAL_DEFAULT = 1000;
private static final String POLL_INTERVAL_DOC = "Poll interval in milliseconds.";

public static final int MAX_BATCH_SIZE = 10000;
public static final int MIN_BATCH_SIZE = 2;
public static final ConfigDef CONFIG_DEF = new ConfigDef()
.define(BASE_URL_CONFIG, ConfigDef.Type.STRING, BASE_URL_DEFAULT, ConfigDef.Importance.HIGH, BASE_URL_DOC)
.define(TOPIC_CONFIG, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC)
.define(BATCH_SIZE, ConfigDef.Type.INT, BATCH_SIZE_DEFAULT, ConfigDef.Importance.LOW, BATCH_SIZE_DOC)
.define(BASE_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH, BASE_URL_DOC)
.define(TOPIC_CONFIG, ConfigDef.Type.STRING, TOPIC_DEFAULT, new NonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH, TOPIC_DOC)
.define(BATCH_SIZE, ConfigDef.Type.INT, BATCH_SIZE_DEFAULT, ConfigDef.Range.between(MIN_BATCH_SIZE, MAX_BATCH_SIZE), ConfigDef.Importance.LOW, BATCH_SIZE_DOC)
.define(POLL_INTERVAL, ConfigDef.Type.INT, POLL_INTERVAL_DEFAULT, ConfigDef.Importance.LOW, POLL_INTERVAL_DOC)
.define(USERNAME_CONFIG, ConfigDef.Type.STRING, USERNAME_DEFAULT, ConfigDef.Importance.HIGH, USERNAME_DOC)
.define(PASSWORD_CONFIG, ConfigDef.Type.STRING, PASSWORD_DEFAULT, ConfigDef.Importance.HIGH, PASSWORD_DOC);
.define(USERNAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, USERNAME_DOC)
.define(PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, PASSWORD_DOC);

public TppLogSourceConfig(Map<String, ?> props) {
super(CONFIG_DEF, props);
Expand All @@ -59,3 +58,12 @@ Map<String, String> returnPropertiesWithDefaultsValuesIfMissing() {
return config;
}
}

final class NonEmptyStringWithoutControlChars extends ConfigDef.NonEmptyStringWithoutControlChars {
//Only here to create a nice human-readable name when exporting to documentation.
@Override
public String toString() {
return "non-empty string and no ISO control characters";
}
}
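
To see what the stricter definition buys: `venafi.base.url`, `venafi.username` and `venafi.password` no longer ship defaults, so omitting any of them fails validation, and `venafi.batch.size` must fall within [2, 10000]. A hedged sketch of both behaviours (the URL and credentials below are made-up values):

```java
import org.apache.kafka.common.config.ConfigException;

import java.util.HashMap;
import java.util.Map;

public class ConfigSketch {
    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("venafi.base.url", "https://tpp.example.com/vedsdk"); // made-up URL
        props.put("venafi.username", "some-user");                      // made-up credentials
        props.put("venafi.password", "some-password");
        new TppLogSourceConfig(props); // passes: required keys present, the rest use defaults

        props.put("venafi.batch.size", "1"); // below MIN_BATCH_SIZE (2)
        try {
            new TppLogSourceConfig(props);
        } catch (ConfigException e) {
            // Range.between(MIN_BATCH_SIZE, MAX_BATCH_SIZE) rejects the value.
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```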

@@ -43,7 +43,7 @@ public Class<? extends Task> taskClass() {
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
if (maxTasks != 1) {
throw new IllegalArgumentException("max Tasks should be set to 1.");
log.info("Ignoring maxTasks as there can only be one.");
}
List<Map<String, String>> configs = new ArrayList<>(maxTasks);
Map<String, String> taskConfig = new HashMap<>();
@@ -64,6 +64,6 @@ public ConfigDef config() {

@Override
public String version() {
return "1.0";
return VersionUtil.getVersion();
}
}
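
Since the middle of `taskConfigs` is cut off in this view, here is a hedged sketch of the resulting method; the `connectorProperties` field is an assumption standing in for however the connector stores its configuration:

```java
@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
    if (maxTasks != 1) {
        log.info("Ignoring maxTasks as there can only be one.");
    }
    List<Map<String, String>> configs = new ArrayList<>(maxTasks);
    Map<String, String> taskConfig = new HashMap<>();
    taskConfig.putAll(connectorProperties); // assumption: the single task reuses the connector's props
    configs.add(taskConfig);
    return configs; // always exactly one task config, whatever maxTasks says
}
```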
TppLogSourceTask.java
@@ -24,7 +24,7 @@ public class TppLogSourceTask extends SourceTask {
private String baseUrl;
private String topic;
private String batchSize;
private int apiOffset;
private long apiOffset;
private Long interval;
private Long last_execution = 0L;
private TokenClient tokenClient;
@@ -35,7 +35,7 @@ static boolean isNotNullOrBlank(String str) {

@Override
public String version() {
return "1.0";
return VersionUtil.getVersion();
}

@Override
@@ -54,9 +54,9 @@ public void start(Map<String, String> props) {
fromDate = lastRead;
}

Integer lastApiOffset = (Integer) persistedMap.get(LAST_API_OFFSET);
if (lastApiOffset != null) {
apiOffset = lastApiOffset;
Object lastApiOffset = persistedMap.get(LAST_API_OFFSET);
if (lastApiOffset != null && lastApiOffset instanceof Long) {
apiOffset = (Long) lastApiOffset;
}


@@ -90,7 +90,7 @@ public List<SourceRecord> poll() {

private List<SourceRecord> getTppLogsAsSourceRecords(String token) {

int loopOffset = 0;
long loopOffset = 0;

List<EventLog> jsonLogs = getTppLogs(token, fromDate, apiOffset);
ArrayList<SourceRecord> records = new ArrayList<>();
@@ -108,20 +108,20 @@ private List<SourceRecord> getTppLogsAsSourceRecords(String token) {
return records;
}

private SourceRecord buildSourceRecord(EventLog eventLog, String lastRead, Integer apiOffset) {
private SourceRecord buildSourceRecord(EventLog eventLog, String lastRead, Long apiOffset) {
Map<String, Object> sourceOffset = buildSourceOffset(lastRead, apiOffset);
Map<String, Object> sourcePartition = buildSourcePartition();
return new SourceRecord(sourcePartition, sourceOffset, topic, EventLog.TppLogSchema(), eventLog.toStruct());
}

private int calculateLoopOffset(int currentLoopOffset, String newFromDate, String oldFromDate) {
private long calculateLoopOffset(long currentLoopOffset, String newFromDate, String oldFromDate) {
if (newFromDate.equals(oldFromDate)) {
return ++currentLoopOffset;
}
return 1;
return 1L;
}

private int calculateApiOffset(int currentLoopOffset, List<EventLog> jsonLogs) {
private long calculateApiOffset(long currentLoopOffset, List<EventLog> jsonLogs) {
if (jsonLogs.size() == currentLoopOffset) {
return apiOffset + currentLoopOffset;
}
@@ -132,14 +132,14 @@ private Map<String, Object> buildSourcePartition() {
return Collections.singletonMap(URL, baseUrl);
}

private Map<String, Object> buildSourceOffset(String lastRead, Integer apiOffset) {
private Map<String, Object> buildSourceOffset(String lastRead, Long apiOffset) {
Map<String, Object> sourceOffset = new HashMap<>();
sourceOffset.put(LAST_READ, lastRead);
sourceOffset.put(LAST_API_OFFSET, apiOffset);
return sourceOffset;
}

List<EventLog> getTppLogs(String token, String date, int offset) {
List<EventLog> getTppLogs(String token, String date, long offset) {
LogResponse logResponse = LogsClient.getLogs(token, date, baseUrl, batchSize, offset);

return logResponse.getLogEvents();
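The int-to-long switch lines up the in-memory offset with what Connect hands back on restart: persisted offsets come back from the offset storage reader as `Long`, which is why `start()` now checks `instanceof Long` instead of casting to `Integer`. A small sketch of the round trip (the string keys stand in for the `LAST_READ` and `LAST_API_OFFSET` constants, whose values are not shown here):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetRoundTripSketch {
    public static void main(String[] args) {
        // What buildSourceOffset() stores alongside each record:
        Map<String, Object> sourceOffset = new HashMap<>();
        sourceOffset.put("last_read", "2019-01-24T00:00:00Z");
        sourceOffset.put("last_api_offset", 100L);

        // What start() receives after a restart; numeric offsets arrive as Long.
        Object lastApiOffset = sourceOffset.get("last_api_offset");
        long apiOffset = 0L;
        if (lastApiOffset instanceof Long) {
            apiOffset = (Long) lastApiOffset;
        }
        System.out.println(apiOffset); // prints 100
    }
}
```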
VersionUtil.java (new file)
@@ -0,0 +1,8 @@
package com.opencredo.connect.venafi.tpp.log;

public class VersionUtil {
public static String getVersion() {
return "1.0.0";

}
}
TppLog.java
@@ -23,7 +23,7 @@ public interface TppLog {

//If we ever need to send a query param containing +,
// be aware the TPP server decodes plus as space, so we'd have to actively encode + as %2B
default LogResponse getLogs(String token, String fromTime, String limit, int offset) {
default LogResponse getLogs(String token, String fromTime, String limit, long offset) {
Map<String, Object> queryParams = new HashMap<>();
queryParams.put(FROM_TIME, fromTime);
queryParams.put(LIMIT, limit);
TppPlatformAuthorization.java
@@ -8,7 +8,7 @@
public interface TppPlatformAuthorization {


@RequestLine("POST /authorize")
@RequestLine("POST /authorize/")
@Headers("Content-Type: application/json")
TppToken getToken(Credentials credentials);

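For context, a hedged sketch of how this Feign interface might be built and called, mirroring the `Feign.builder()` pattern visible in `LogsClient` below; the Gson encoder/decoder choice and the credential values are assumptions, not shown in this diff:

```java
import feign.Feign;
import feign.gson.GsonDecoder;
import feign.gson.GsonEncoder;
import feign.slf4j.Slf4jLogger;

public class TokenCallSketch {
    public static void main(String[] args) {
        TppPlatformAuthorization auth = Feign.builder()
                .logger(new Slf4jLogger())
                .encoder(new GsonEncoder()) // assumption: JSON request body via Gson
                .decoder(new GsonDecoder())
                .target(TppPlatformAuthorization.class, "https://localhost:443/vedsdk");

        // POSTs the credentials as JSON to <base-url>/authorize/ (note the new trailing slash).
        TppToken token = auth.getToken(new Credentials("some-user", "some-password"));
    }
}
```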
LogsClient.java
@@ -16,7 +16,7 @@
public class LogsClient {
private static final org.slf4j.Logger log = LoggerFactory.getLogger(LogsClient.class);

public static LogResponse getLogs(String token, String date, String baseUrl, String batchSize, int offset) {
public static LogResponse getLogs(String token, String date, String baseUrl, String batchSize, long offset) {
try {
return Feign.builder()
.logger(new Slf4jLogger())
TokenClient.java
@@ -18,10 +18,10 @@

public class TokenClient {

private static final org.slf4j.Logger log = LoggerFactory.getLogger(TokenClient.class);
private String tokenValue;
private ZonedDateTime tokenExpiry = ZonedDateTime.now();
private Credentials credentials;
private static final org.slf4j.Logger log = LoggerFactory.getLogger(TokenClient.class);

public TokenClient(String username, String password) {
credentials = new Credentials(username, password);
EventLog.java
@@ -29,16 +29,16 @@ public class EventLog {
public static final Schema SCHEMA = SchemaBuilder.struct()
.name(EventLog.class.getSimpleName())
.field(CLIENT_TIMESTAMP, Timestamp.SCHEMA)
.field(COMPONENT, Schema.STRING_SCHEMA)
.field(COMPONENT_ID, Schema.OPTIONAL_INT32_SCHEMA)
.field(COMPONENT_SUBSYSTEM, Schema.OPTIONAL_STRING_SCHEMA)
.field(EVENT_ID, Schema.OPTIONAL_STRING_SCHEMA)
.field(GROUPING, Schema.INT32_SCHEMA)
.field(ID, Schema.INT64_SCHEMA)
.field(NAME, Schema.STRING_SCHEMA)
.field(SERVER_TIMESTAMP, Timestamp.SCHEMA)
.field(SEVERITY, Schema.STRING_SCHEMA)
.field(SOURCE_IP, Schema.STRING_SCHEMA)
.field(COMPONENT, Schema.OPTIONAL_STRING_SCHEMA)
.field(COMPONENT_ID, Schema.OPTIONAL_INT32_SCHEMA)
.field(COMPONENT_SUBSYSTEM, Schema.OPTIONAL_STRING_SCHEMA)
.field(EVENT_ID, Schema.OPTIONAL_STRING_SCHEMA)
.field(TEXT_1, Schema.OPTIONAL_STRING_SCHEMA)
.field(TEXT_2, Schema.OPTIONAL_STRING_SCHEMA)
.field(VALUE_1, Schema.OPTIONAL_INT32_SCHEMA)
@@ -137,15 +137,16 @@ public Struct toStruct() {

Struct tppLog = new Struct(TppLogSchema())
.put(CLIENT_TIMESTAMP, Date.from(getClientTimestamp().toInstant()))
.put(COMPONENT, getComponent())

.put(GROUPING, getGrouping())
.put(ID, getId())
.put(NAME, getName())
.put(SERVER_TIMESTAMP, Date.from(getServerTimestamp().toInstant()))
.put(SEVERITY, getSeverity())
.put(SOURCE_IP, getSourceIP());

if (getComponent() != null) {
tppLog.put(COMPONENT, getComponent());
}
if (getEventId() != null) {
tppLog.put(EVENT_ID, getEventId());
}
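The schema change and the null guard go together: `Struct.put` validates eagerly, so writing `null` into a field whose schema is not optional throws a `DataException`. A small sketch of the distinction (the literal field name is illustrative; the schema above uses the `COMPONENT` constant):

```java
import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.data.SchemaBuilder;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.DataException;

public class OptionalFieldSketch {
    public static void main(String[] args) {
        Schema required = SchemaBuilder.struct()
                .field("Component", Schema.STRING_SCHEMA).build();
        Schema optional = SchemaBuilder.struct()
                .field("Component", Schema.OPTIONAL_STRING_SCHEMA).build();

        new Struct(optional).put("Component", null); // fine: optional fields accept null

        try {
            new Struct(required).put("Component", null);
        } catch (DataException e) {
            // A required field rejects null on put(), hence the if-null guards in toStruct().
            System.out.println("Rejected: " + e.getMessage());
        }
    }
}
```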