From 9c786189ae98dd61789060ab1e6b357c92669e61 Mon Sep 17 00:00:00 2001 From: Rufus Nash Date: Thu, 24 Jan 2019 14:26:41 +0000 Subject: [PATCH 01/10] Added unit tests around config and other minor tweaks. --- README.md | 23 ++- .../venafi/tpp/log/TppLogSourceConfig.java | 15 +- .../venafi/tpp/log/TppLogSourceConnector.java | 4 +- .../venafi/tpp/log/TppLogSourceTask.java | 24 +-- .../connect/venafi/tpp/log/VersionUtil.java | 8 + .../connect/venafi/tpp/log/api/TppLog.java | 2 +- .../tpp/log/api/TppPlatformAuthorization.java | 2 +- .../venafi/tpp/log/api/client/LogsClient.java | 2 +- .../venafi/tpp/log/model/EventLog.java | 13 +- .../tpp/log/EventLogSourceTaskTest.java | 14 +- .../tpp/log/TppLogSourceConfigTest.java | 154 ++++++++++++++++++ .../tpp/log/TppLogSourceConnectorTest.java | 42 ++--- 12 files changed, 244 insertions(+), 59 deletions(-) create mode 100644 src/main/java/com/opencredo/connect/venafi/tpp/log/VersionUtil.java create mode 100644 src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java diff --git a/README.md b/README.md index a1f8bce..ee8c38f 100644 --- a/README.md +++ b/README.md @@ -5,8 +5,8 @@ kafka-connect-venafi-tpp is a [Kafka connector](http://kafka.apache.org/document # Development To manually install the connector: -1. Build the Jar with `mvn package` -2. Find the Jar in your target folder called venafi-tpp-log-connector-.jar +1. Build the JAR with `mvn package` +2. Find the JAR in your target folder called venafi-tpp-log-connector-.jar 3. Create a connect property file ``` name=venafi @@ -20,11 +20,26 @@ venafi.password=placeholder_password venafi.batch.size=100 venafi.poll.interval=1000 ``` -This is filled with the default values as provided by the config definition [class](???) +This is filled with the default values as provided by the config definition [class](../src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java). Whilst all `venafi` fields are currently optional and will default to above please change `venafi.base.url`, `venafi.username` and `venafi.password`. 4. Create a directory to place this files e.g. `/share/kafka/plugins`. 5. Add this to the plugin path in your Connect properties file. 6. Then call: `bin/confluent load venafi -d venafi.properties` -If you need to unload/reload it use: `bin/confluent unload venafi` For more information please look at the [Confluent instructions on manually installing connectors](https://docs.confluent.io/current/connect/managing/install.html#connect-install-connectors). + +--- +If you need to unload/reload it use: `bin/confluent unload venafi` +If you intend to change the JAR please stop, change the JAR then start the cluster. + +# Useful commands while developing +``` +sudo bin/confluent start +sudo bin/confluent status +sudo bin/confluent load venafi -d ~/venafi.properties +sudo bin/confluent status venafi +sudo bin/kafka-topics --list --zookeeper localhost:2181 +sudo bin/kafka-console-consumer --bootstrap-server localhost:9092 --topic connect-offsets --from-beginning +sudo bin/kafka-avro-console-consumer --bootstrap-server localhost:9092 --topic TPP-LOGS +sudo bin/confluent log connect +``` diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java index d3380a2..5d92f32 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java @@ -8,15 +8,12 @@ public class TppLogSourceConfig extends AbstractConfig { public static final String BASE_URL_CONFIG = "venafi.base.url"; - private static final String BASE_URL_DEFAULT = "https://localhost:443/vedsdk"; private static final String BASE_URL_DOC = "Url to TPP api with /VEDSDK"; public static final String USERNAME_CONFIG = "venafi.username"; - private static final String USERNAME_DEFAULT = "placeholder_username"; private static final String USERNAME_DOC = "The username to use with the /VEDSDK api."; public static final String PASSWORD_CONFIG = "venafi.password"; - private static final String PASSWORD_DEFAULT = "placeholder_password"; private static final String PASSWORD_DOC = "The password to use with the /VEDSDK api."; public static final String TOPIC_CONFIG = "venafi.topic"; @@ -31,13 +28,15 @@ public class TppLogSourceConfig extends AbstractConfig { private static final int POLL_INTERVAL_DEFAULT = 1000; private static final String POLL_INTERVAL_DOC = "Poll interval in milliseconds."; + public static final int MAX_BATCH_SIZE = 10000; + public static final int MIN_BATCH_SIZE = 2; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(BASE_URL_CONFIG, ConfigDef.Type.STRING, BASE_URL_DEFAULT, ConfigDef.Importance.HIGH, BASE_URL_DOC) - .define(TOPIC_CONFIG, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.Importance.HIGH, TOPIC_DOC) - .define(BATCH_SIZE, ConfigDef.Type.INT, BATCH_SIZE_DEFAULT, ConfigDef.Importance.LOW, BATCH_SIZE_DOC) + .define(BASE_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars() , ConfigDef.Importance.HIGH, BASE_URL_DOC) + .define(TOPIC_CONFIG, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH, TOPIC_DOC) + .define(BATCH_SIZE, ConfigDef.Type.INT, BATCH_SIZE_DEFAULT, ConfigDef.Range.between(MIN_BATCH_SIZE, MAX_BATCH_SIZE), ConfigDef.Importance.LOW, BATCH_SIZE_DOC) .define(POLL_INTERVAL, ConfigDef.Type.INT, POLL_INTERVAL_DEFAULT, ConfigDef.Importance.LOW, POLL_INTERVAL_DOC) - .define(USERNAME_CONFIG, ConfigDef.Type.STRING, USERNAME_DEFAULT, ConfigDef.Importance.HIGH, USERNAME_DOC) - .define(PASSWORD_CONFIG, ConfigDef.Type.STRING, PASSWORD_DEFAULT, ConfigDef.Importance.HIGH, PASSWORD_DOC); + .define(USERNAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, USERNAME_DOC) + .define(PASSWORD_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, PASSWORD_DOC); public TppLogSourceConfig(Map props) { super(CONFIG_DEF, props); diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnector.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnector.java index 530b491..69c6f98 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnector.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnector.java @@ -43,7 +43,7 @@ public Class taskClass() { @Override public List> taskConfigs(int maxTasks) { if (maxTasks != 1) { - throw new IllegalArgumentException("max Tasks should be set to 1."); + log.info("Ignoring maxTasks as there is can only be one."); } List> configs = new ArrayList<>(maxTasks); Map taskConfig = new HashMap<>(); @@ -64,6 +64,6 @@ public ConfigDef config() { @Override public String version() { - return "1.0"; + return VersionUtil.getVersion(); } } diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceTask.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceTask.java index 8be73cd..410f1d0 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceTask.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceTask.java @@ -24,7 +24,7 @@ public class TppLogSourceTask extends SourceTask { private String baseUrl; private String topic; private String batchSize; - private int apiOffset; + private long apiOffset; private Long interval; private Long last_execution = 0L; private TokenClient tokenClient; @@ -35,7 +35,7 @@ static boolean isNotNullOrBlank(String str) { @Override public String version() { - return "1.0"; + return VersionUtil.getVersion(); } @Override @@ -54,9 +54,9 @@ public void start(Map props) { fromDate = lastRead; } - Integer lastApiOffset = (Integer) persistedMap.get(LAST_API_OFFSET); - if (lastApiOffset != null) { - apiOffset = lastApiOffset; + Object lastApiOffset = persistedMap.get(LAST_API_OFFSET); + if (lastApiOffset != null && lastApiOffset instanceof Long) { + apiOffset = (Long) lastApiOffset; } @@ -90,7 +90,7 @@ public List poll() { private List getTppLogsAsSourceRecords(String token) { - int loopOffset = 0; + long loopOffset = 0; List jsonLogs = getTppLogs(token, fromDate, apiOffset); ArrayList records = new ArrayList<>(); @@ -108,20 +108,20 @@ private List getTppLogsAsSourceRecords(String token) { return records; } - private SourceRecord buildSourceRecord(EventLog eventLog, String lastRead, Integer apiOffset) { + private SourceRecord buildSourceRecord(EventLog eventLog, String lastRead, Long apiOffset) { Map sourceOffset = buildSourceOffset(lastRead, apiOffset); Map sourcePartition = buildSourcePartition(); return new SourceRecord(sourcePartition, sourceOffset, topic, EventLog.TppLogSchema(), eventLog.toStruct()); } - private int calculateLoopOffset(int currentLoopOffset, String newFromDate, String oldFromDate) { + private long calculateLoopOffset(long currentLoopOffset, String newFromDate, String oldFromDate) { if (newFromDate.equals(oldFromDate)) { return ++currentLoopOffset; } - return 1; + return 1L; } - private int calculateApiOffset(int currentLoopOffset, List jsonLogs) { + private long calculateApiOffset(long currentLoopOffset, List jsonLogs) { if (jsonLogs.size() == currentLoopOffset) { return apiOffset + currentLoopOffset; } @@ -132,14 +132,14 @@ private Map buildSourcePartition() { return Collections.singletonMap(URL, baseUrl); } - private Map buildSourceOffset(String lastRead, Integer apiOffset) { + private Map buildSourceOffset(String lastRead, Long apiOffset) { Map sourceOffset = new HashMap<>(); sourceOffset.put(LAST_READ, lastRead); sourceOffset.put(LAST_API_OFFSET, apiOffset); return sourceOffset; } - List getTppLogs(String token, String date, int offset) { + List getTppLogs(String token, String date, long offset) { LogResponse logResponse = LogsClient.getLogs(token, date, baseUrl, batchSize, offset); return logResponse.getLogEvents(); diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/VersionUtil.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/VersionUtil.java new file mode 100644 index 0000000..d0085d5 --- /dev/null +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/VersionUtil.java @@ -0,0 +1,8 @@ +package com.opencredo.connect.venafi.tpp.log; + +public class VersionUtil { + public static String getVersion() { + return "1.0.0"; + + } +} diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/api/TppLog.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/api/TppLog.java index bf24f3d..e7bc113 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/api/TppLog.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/api/TppLog.java @@ -23,7 +23,7 @@ public interface TppLog { //If we in the future need to send a query Param with + // be aware TPP server decodes plus as space so we'd have to actively encode + as %2B - default LogResponse getLogs(String token, String fromTime, String limit, int offset) { + default LogResponse getLogs(String token, String fromTime, String limit, long offset) { Map queryParams = new HashMap<>(); queryParams.put(FROM_TIME, fromTime); queryParams.put(LIMIT, limit); diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/api/TppPlatformAuthorization.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/api/TppPlatformAuthorization.java index bee00c8..1daffb4 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/api/TppPlatformAuthorization.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/api/TppPlatformAuthorization.java @@ -8,7 +8,7 @@ public interface TppPlatformAuthorization { - @RequestLine("POST /authorize") + @RequestLine("POST /authorize/") @Headers("Content-Type: application/json") TppToken getToken(Credentials credentials); diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/api/client/LogsClient.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/api/client/LogsClient.java index dd0cc60..9d50293 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/api/client/LogsClient.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/api/client/LogsClient.java @@ -16,7 +16,7 @@ public class LogsClient { private static final org.slf4j.Logger log = LoggerFactory.getLogger(LogsClient.class); - public static LogResponse getLogs(String token, String date, String baseUrl, String batchSize, int offset) { + public static LogResponse getLogs(String token, String date, String baseUrl, String batchSize, long offset) { try { return Feign.builder() .logger(new Slf4jLogger()) diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/model/EventLog.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/model/EventLog.java index e579b3a..aa26a9f 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/model/EventLog.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/model/EventLog.java @@ -29,16 +29,16 @@ public class EventLog { public static final Schema SCHEMA = SchemaBuilder.struct() .name(EventLog.class.getSimpleName()) .field(CLIENT_TIMESTAMP, Timestamp.SCHEMA) - .field(COMPONENT, Schema.STRING_SCHEMA) - .field(COMPONENT_ID, Schema.OPTIONAL_INT32_SCHEMA) - .field(COMPONENT_SUBSYSTEM, Schema.OPTIONAL_STRING_SCHEMA) - .field(EVENT_ID, Schema.OPTIONAL_STRING_SCHEMA) .field(GROUPING, Schema.INT32_SCHEMA) .field(ID, Schema.INT64_SCHEMA) .field(NAME, Schema.STRING_SCHEMA) .field(SERVER_TIMESTAMP, Timestamp.SCHEMA) .field(SEVERITY, Schema.STRING_SCHEMA) .field(SOURCE_IP, Schema.STRING_SCHEMA) + .field(COMPONENT, Schema.OPTIONAL_STRING_SCHEMA) + .field(COMPONENT_ID, Schema.OPTIONAL_INT32_SCHEMA) + .field(COMPONENT_SUBSYSTEM, Schema.OPTIONAL_STRING_SCHEMA) + .field(EVENT_ID, Schema.OPTIONAL_STRING_SCHEMA) .field(TEXT_1, Schema.OPTIONAL_STRING_SCHEMA) .field(TEXT_2, Schema.OPTIONAL_STRING_SCHEMA) .field(VALUE_1, Schema.OPTIONAL_INT32_SCHEMA) @@ -137,8 +137,6 @@ public Struct toStruct() { Struct tppLog = new Struct(TppLogSchema()) .put(CLIENT_TIMESTAMP, Date.from(getClientTimestamp().toInstant())) - .put(COMPONENT, getComponent()) - .put(GROUPING, getGrouping()) .put(ID, getId()) .put(NAME, getName()) @@ -146,6 +144,9 @@ public Struct toStruct() { .put(SEVERITY, getSeverity()) .put(SOURCE_IP, getSourceIP()); + if (getComponent() != null) { + tppLog.put(COMPONENT, getComponent()); + } if (getEventId() != null) { tppLog.put(EVENT_ID, getEventId()); } diff --git a/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java b/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java index 6131a70..d954543 100644 --- a/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java +++ b/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java @@ -57,6 +57,12 @@ private void shutdown() { wireMockServer.shutdown(); } + @Test + public void as_a_task_I_should_return_a_version() { + TppLogSourceTask task = given_a_task_is_setup(); + assertEquals("1.0.0", task.version()); + } + @Test public void as_a_client_I_want_a_token() { @@ -123,7 +129,7 @@ public void as_a_client_I_want_a_token_to_only_regenerate_while_away_token_expir @Test public void as_a_task_I_want_a_valid_context() { - SourceTaskContext mockSourceTaskContext = given_a_mock_source_context_with(getTodayPlus(2), 1); + SourceTaskContext mockSourceTaskContext = given_a_mock_source_context_with(getTodayPlus(2), 1L); given_the_mock_will_respond_to_auth(); given_the_mock_will_respond_to_log_for_offsetsStorage(); @@ -225,7 +231,7 @@ private List when_the_task_is_polled(TppLogSourceTask task) { return task.poll(); } - private SourceTaskContext given_a_mock_source_context_with(ZonedDateTime lastReadDate, Integer lastApiOffset) { + private SourceTaskContext given_a_mock_source_context_with(ZonedDateTime lastReadDate, Long lastApiOffset) { Map config = new HashMap<>(2); config.put(LAST_READ, lastReadDate.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME)); config.put(LAST_API_OFFSET, lastApiOffset); @@ -233,7 +239,7 @@ private SourceTaskContext given_a_mock_source_context_with(ZonedDateTime lastRea return given_a_mock_source_context_with(config); } - private SourceTaskContext given_a_mock_source_context_with(Map config) { + private SourceTaskContext given_a_mock_source_context_with(Map config) { SourceTaskContext mockSourceTaskContext = Mockito.mock(SourceTaskContext.class); OffsetStorageReader mockOffsetStorageReader = Mockito.mock(OffsetStorageReader.class); Mockito.when(mockOffsetStorageReader.offset(Mockito.anyMap())).thenReturn(config); @@ -276,6 +282,8 @@ private Map getTaskConfig() { Map config = new HashMap<>(); config.put(BASE_URL_CONFIG, wireMockServer.baseUrl()); config.put(POLL_INTERVAL, "0"); + config.put(USERNAME_CONFIG, "placeholder_username"); + config.put(PASSWORD_CONFIG, "placeholder_password"); return new TppLogSourceConfig(config).returnPropertiesWithDefaultsValuesIfMissing(); } diff --git a/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java new file mode 100644 index 0000000..537d29b --- /dev/null +++ b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java @@ -0,0 +1,154 @@ +package com.opencredo.connect.venafi.tpp.log; + +import org.apache.kafka.common.config.ConfigException; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.function.Executable; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; + +import static com.opencredo.connect.venafi.tpp.log.TppLogSourceConfig.*; +import static org.junit.jupiter.api.Assertions.*; + +class TppLogSourceConfigTest { + + public static final String EMPTY_STRING = ""; + public static final int TOO_LARGE_BATCH_SIZE = MAX_BATCH_SIZE + 1; + public static final int TOO_SMALL_BATCH_SIZE = MIN_BATCH_SIZE - 1; + + @Test + void as_a_Config_I_should_be_able_to_generate_a_guide() { + assertNotNull(CONFIG_DEF.toEnrichedRst()); + assertNotEquals("", CONFIG_DEF.toEnrichedRst()); + } + + @Test + void as_a_config_I_should_throw_a_config_exception_if_nothing_is_set() { + Map props = Collections.emptyMap(); + Executable executingConfig = given_an_config_with(props); + then_I_expect_a_config_exception_when_I_run_this(executingConfig); + } + + @Test + void as_a_config_I_should_throw_a_config_exception_if_minimum_properties_are_set() { + Map props = given_a_minimum_list_of_properties(); + Executable executingConfig = given_an_config_with(props); + then_I_expect_no_config_exception_when_run_this(executingConfig); + } + + private Map given_a_minimum_list_of_properties() { + Map props = new HashMap<>(); + props.put(BASE_URL_CONFIG, "https://localhost:443"); + props.put(USERNAME_CONFIG, "placeholder_username"); + props.put(PASSWORD_CONFIG, "placeholder_password"); + return props; + } + + @Test + void as_a_config_I_should_throw_a_config_exception_if_base_URL_is_not_set() { + Map props = given_a_minimum_list_of_properties(); + given_this_is_removed_from(BASE_URL_CONFIG, props); + Executable executingConfig = given_an_config_with(props); + ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); + assertEquals(noDefaultValueFor(BASE_URL_CONFIG), exception.getMessage()); + } + + @Test + void as_a_config_I_should_throw_a_config_exception_if_password_is_not_set() { + Map props = given_a_minimum_list_of_properties(); + given_this_is_removed_from(PASSWORD_CONFIG, props); + Executable executingConfig = given_an_config_with(props); + ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); + assertEquals(noDefaultValueFor(PASSWORD_CONFIG), exception.getMessage()); + } + + @Test + void as_a_config_I_should_throw_a_config_exception_if_username_is_not_set() { + Map props = given_a_minimum_list_of_properties(); + given_this_is_removed_from(USERNAME_CONFIG, props); + Executable executingConfig = given_an_config_with(props); + ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); + assertEquals(noDefaultValueFor(USERNAME_CONFIG), exception.getMessage()); + } + + + @Test + void as_a_config_I_should_throw_a_config_exception_if_base_URL_is_invalid() { + Map props = given_a_minimum_list_of_properties(); + given_this_is_overriden_with(BASE_URL_CONFIG, EMPTY_STRING, props); + Executable executingConfig = given_an_config_with(props); + ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); + assertEquals(invalidConfigForNonEmptyString(BASE_URL_CONFIG, EMPTY_STRING), exception.getMessage()); + } + + @Test + void as_a_config_I_should_throw_a_config_exception_if_topic_is_invalid() { + Map props = given_a_minimum_list_of_properties(); + given_this_is_overriden_with(TOPIC_CONFIG, EMPTY_STRING, props); + Executable executingConfig = given_an_config_with(props); + ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); + assertEquals(invalidConfigForNonEmptyString(TOPIC_CONFIG, EMPTY_STRING), exception.getMessage()); + } + + @Test + void as_a_config_I_should_throw_a_config_exception_if_batch_size_is_too_small() { + Map props = given_a_minimum_list_of_properties(); + given_this_is_overriden_with(BATCH_SIZE, TOO_SMALL_BATCH_SIZE, props); + Executable executingConfig = given_an_config_with(props); + ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); + assertEquals(invalidConfigForTooSmallValue(BATCH_SIZE, TOO_SMALL_BATCH_SIZE, MIN_BATCH_SIZE), exception.getMessage()); + } + + @Test + void as_a_config_I_should_throw_a_config_exception_if_batch_size_is_too_big() { + Map props = given_a_minimum_list_of_properties(); + given_this_is_overriden_with(BATCH_SIZE, TOO_LARGE_BATCH_SIZE, props); + Executable executingConfig = given_an_config_with(props); + ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); + assertEquals(invalidConfigForTooLargeValue(BATCH_SIZE, TOO_LARGE_BATCH_SIZE, MAX_BATCH_SIZE), exception.getMessage()); + } + + private String noDefaultValueFor(String key) { + return "Missing required configuration \"" + key + "\" which has no default value."; + } + + private String invalidConfigForTooSmallValue(String key, int value, int min) { + return invalidValueForConfig(key, Integer.toString(value)) + ": Value must be at least " + min; + } + + private String invalidConfigForTooLargeValue(String key, int value, int max) { + return invalidValueForConfig(key, Integer.toString(value)) + ": Value must be no more than " + max; + } + + private String invalidConfigForNonEmptyString(String key, String value) { + return invalidValueForConfig(key, value) + ": String may not be empty"; + } + + private String invalidValueForConfig(String key, String value) { + return "Invalid value " + value + " for configuration " + key; + } + + private void given_this_is_removed_from(String key, Map props) { + props.remove(key); + } + + private void given_this_is_overriden_with(String key, Object value, Map props) { + props.put(key, value); + } + + private ConfigException then_I_expect_a_config_exception_when_I_run_this(Executable executingConfig) { + return assertThrows(ConfigException.class, executingConfig); + + } + + + private void then_I_expect_no_config_exception_when_run_this(Executable executingConfig) { + assertDoesNotThrow(executingConfig); + + } + + private Executable given_an_config_with(Map props) { + return () -> new TppLogSourceConfig(props); + } +} \ No newline at end of file diff --git a/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnectorTest.java b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnectorTest.java index 7cd3693..f7f21f2 100644 --- a/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnectorTest.java +++ b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnectorTest.java @@ -41,17 +41,9 @@ private void shutdown() { } @Test - void as_a_connector_I_should_be_able_to_start_up_with_empty_properties() throws IllegalAccessException, InstantiationException, InterruptedException { - TppLogSourceConnector connector = given_a_connector(); - - when_the_connector_is_started_with_no_properties(connector); - SourceTask sourceTask = then_I_should_be_able_to_get_a_source_task_from_the_connector(connector); - Map taskProperties = then_I_can_get_the_task_properties(connector); - - when_the_task_is_started(sourceTask, taskProperties); - List records = then_the_task_can_be_polled(sourceTask); - - assertEquals(0, records.size()); + public void as_a_task_I_should_return_a_version() { + TppLogSourceConnector source = given_a_source(); + assertEquals("1.0.0", source.version()); } @Test @@ -59,15 +51,16 @@ void as_a_connector_I_should_be_able_to_start_up_with_some_properties() throws I given_the_mock_will_respond_to_auth(); given_the_mock_will_respond_to_log(); - TppLogSourceConnector connector = given_a_connector(); - - when_the_connector_is_started_with_no_properties(connector); - SourceTask sourceTask = then_I_should_be_able_to_get_a_source_task_from_the_connector(connector); - Map taskProperties = then_I_can_get_the_task_properties(connector); + TppLogSourceConnector source = given_a_source(); + when_the_source_is_started_with_minimum_properties(source); + SourceTask sourceTask = then_I_should_be_able_to_get_a_source_task_from_the_connector(source); + Map taskProperties = then_I_can_get_the_task_properties(source); when_the_task_is_started(sourceTask, taskProperties); List records = then_the_task_can_be_polled(sourceTask); - assertEquals(0, records.size()); + assertEquals(2, records.size()); + wireMockServer.verify(1, postRequestedFor(urlPathMatching(AUTHORIZE_API_REGEX_PATH))); + wireMockServer.verify(1, getRequestedFor(urlPathMatching(LOG_API_REGEX_PATH))); } private List then_the_task_can_be_polled(SourceTask sourceTask) throws InterruptedException { @@ -88,14 +81,21 @@ private SourceTask then_I_should_be_able_to_get_a_source_task_from_the_connector return (SourceTask) task; } - private void when_the_connector_is_started_with_no_properties(TppLogSourceConnector connector) { - when_the_connector_is_started_with_properties(connector, new HashMap<>()); + private void when_the_source_is_started_with_minimum_properties(TppLogSourceConnector source) { + Map props = new HashMap<>(); + props.put(TppLogSourceConfig.BASE_URL_CONFIG, wireMockServer.baseUrl()); + props.put(TppLogSourceConfig.USERNAME_CONFIG, "placeholder_username"); + props.put(TppLogSourceConfig.PASSWORD_CONFIG, "placeholder_password"); + + + when_the_source_is_started_with_properties(source, props); } - private void when_the_connector_is_started_with_properties(TppLogSourceConnector connector,Map props){ + private void when_the_source_is_started_with_properties(TppLogSourceConnector connector, Map props) { connector.start(props); } - private TppLogSourceConnector given_a_connector() { + + private TppLogSourceConnector given_a_source() { return new TppLogSourceConnector(); } From e2a820d41803d79a52bf68142f11452ffe74b12b Mon Sep 17 00:00:00 2001 From: rufusfnash <46454852+rufusfnash@users.noreply.github.com> Date: Thu, 24 Jan 2019 14:28:48 +0000 Subject: [PATCH 02/10] Update README.md Tweak to readme --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index ee8c38f..098d3b2 100644 --- a/README.md +++ b/README.md @@ -20,7 +20,7 @@ venafi.password=placeholder_password venafi.batch.size=100 venafi.poll.interval=1000 ``` -This is filled with the default values as provided by the config definition [class](../src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java). +This is filled with the default values as provided by the [config definition class](./src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java). Whilst all `venafi` fields are currently optional and will default to above please change `venafi.base.url`, `venafi.username` and `venafi.password`. 4. Create a directory to place this files e.g. `/share/kafka/plugins`. 5. Add this to the plugin path in your Connect properties file. From 3b8a987d8b666da00b85576d2193a67be8ca22af Mon Sep 17 00:00:00 2001 From: chbalzer <33552278+chbalzer@users.noreply.github.com> Date: Thu, 24 Jan 2019 14:55:12 +0000 Subject: [PATCH 03/10] Update README.md Co-Authored-By: rufusfnash <46454852+rufusfnash@users.noreply.github.com> --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 098d3b2..ecfc659 100644 --- a/README.md +++ b/README.md @@ -30,7 +30,7 @@ For more information please look at the [Confluent instructions on manually inst --- If you need to unload/reload it use: `bin/confluent unload venafi` -If you intend to change the JAR please stop, change the JAR then start the cluster. +If you intend to change the JAR please stop, change the JAR, then start the cluster. # Useful commands while developing ``` From bfba343bb908336a2f7581448f72d022c9bc382c Mon Sep 17 00:00:00 2001 From: Rufus Nash Date: Thu, 24 Jan 2019 14:59:30 +0000 Subject: [PATCH 04/10] minor auto format classes --- .../venafi/tpp/log/TppLogSourceConfig.java | 13 +++++- .../connect/venafi/tpp/log/VersionUtil.java | 2 +- .../tpp/log/api/client/TokenClient.java | 2 +- .../tpp/log/EventLogSourceTaskTest.java | 43 +++++++++---------- 4 files changed, 33 insertions(+), 27 deletions(-) diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java index 5d92f32..ec8ae3b 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfig.java @@ -31,8 +31,8 @@ public class TppLogSourceConfig extends AbstractConfig { public static final int MAX_BATCH_SIZE = 10000; public static final int MIN_BATCH_SIZE = 2; public static final ConfigDef CONFIG_DEF = new ConfigDef() - .define(BASE_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE,ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars() , ConfigDef.Importance.HIGH, BASE_URL_DOC) - .define(TOPIC_CONFIG, ConfigDef.Type.STRING, TOPIC_DEFAULT, ConfigDef.NonEmptyStringWithoutControlChars.nonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH, TOPIC_DOC) + .define(BASE_URL_CONFIG, ConfigDef.Type.STRING, ConfigDef.NO_DEFAULT_VALUE, new NonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH, BASE_URL_DOC) + .define(TOPIC_CONFIG, ConfigDef.Type.STRING, TOPIC_DEFAULT, new NonEmptyStringWithoutControlChars(), ConfigDef.Importance.HIGH, TOPIC_DOC) .define(BATCH_SIZE, ConfigDef.Type.INT, BATCH_SIZE_DEFAULT, ConfigDef.Range.between(MIN_BATCH_SIZE, MAX_BATCH_SIZE), ConfigDef.Importance.LOW, BATCH_SIZE_DOC) .define(POLL_INTERVAL, ConfigDef.Type.INT, POLL_INTERVAL_DEFAULT, ConfigDef.Importance.LOW, POLL_INTERVAL_DOC) .define(USERNAME_CONFIG, ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, USERNAME_DOC) @@ -58,3 +58,12 @@ Map returnPropertiesWithDefaultsValuesIfMissing() { return config; } } + +final class NonEmptyStringWithoutControlChars extends ConfigDef.NonEmptyStringWithoutControlChars { + //Only here to create nice human readable for exporting to documentation. + @Override + public String toString() { + return "non-empty string and no ISO control characters"; + } +} + diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/VersionUtil.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/VersionUtil.java index d0085d5..91cd59e 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/VersionUtil.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/VersionUtil.java @@ -2,7 +2,7 @@ public class VersionUtil { public static String getVersion() { - return "1.0.0"; + return "1.0.0"; } } diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/api/client/TokenClient.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/api/client/TokenClient.java index 3bfcc7d..6fb997b 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/api/client/TokenClient.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/api/client/TokenClient.java @@ -18,10 +18,10 @@ public class TokenClient { + private static final org.slf4j.Logger log = LoggerFactory.getLogger(TokenClient.class); private String tokenValue; private ZonedDateTime tokenExpiry = ZonedDateTime.now(); private Credentials credentials; - private static final org.slf4j.Logger log = LoggerFactory.getLogger(TokenClient.class); public TokenClient(String username, String password) { credentials = new Credentials(username, password); diff --git a/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java b/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java index d954543..66cef80 100644 --- a/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java +++ b/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java @@ -43,6 +43,26 @@ static ZonedDateTime getTodayPlus(int seconds) { return TODAY.plusSeconds(seconds); } + static String createLogEventBody(ZonedDateTime dateTime) { + return " {\n" + + " \"ClientTimestamp\": \"" + dateTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME) + "\",\n" + + " \"Component\": \"\\\\VED\\\\Policy\\\\certificates\\\\_Discovered\\\\TrustNet\\\\defaultwebsite.lab.venafi.com - 83\",\n" + + " \"ComponentId\": 123185,\n" + + " \"ComponentSubsystem\": \"Config\",\n" + + " \"Data\": null,\n" + + " \"Grouping\": 0,\n" + + " \"Id\": 1835016,\n" + + " \"Name\": \"Certificate Revocation - CRL Failure\",\n" + + " \"ServerTimestamp\": \"" + dateTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME) + "\",\n" + + " \"Severity\": \"Info\",\n" + + " \"SourceIP\": \"[::1]\",\n" + + " \"Text1\": \"CN=traininglab-Root-CA, DC=traininglab, DC=local\",\n" + + " \"Text2\": \"ldap:///CN=traininglab-Root-CA(1),CN=server1,CN=CDP,CN=Public%20Key%20Services,CN=Services,CN=Configuration,DC=traininglab,DC=local?certificateRevocationList?base?objectClass=cRLDistributionPoint\",\n" + + " \"Value1\": 0,\n" + + " \"Value2\": 0\n" + + " }\n"; + } + String getStringOfTodayPlus(int seconds) { return getTodayPlus(seconds).format(DateTimeFormatter.ISO_OFFSET_DATE_TIME); } @@ -126,7 +146,6 @@ public void as_a_client_I_want_a_token_to_only_regenerate_while_away_token_expir } - @Test public void as_a_task_I_want_a_valid_context() { SourceTaskContext mockSourceTaskContext = given_a_mock_source_context_with(getTodayPlus(2), 1L); @@ -184,7 +203,6 @@ public void as_a_client_I_want_some_logs_and_handle_token_expiry() { then_the_logs_are_of_size(logs, 0); } - @Test public void as_a_client_I_want_to_paginate_logs() { @@ -226,7 +244,6 @@ public void as_a_connector_I_want_to_pass_an_object_as_a_struct() { System.out.println(record); } - private List when_the_task_is_polled(TppLogSourceTask task) { return task.poll(); } @@ -445,24 +462,4 @@ private void given_the_mock_will_respond_to_log_for_empty_offsetsStorage() { )); } - static String createLogEventBody(ZonedDateTime dateTime) { - return " {\n" + - " \"ClientTimestamp\": \"" + dateTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME) + "\",\n" + - " \"Component\": \"\\\\VED\\\\Policy\\\\certificates\\\\_Discovered\\\\TrustNet\\\\defaultwebsite.lab.venafi.com - 83\",\n" + - " \"ComponentId\": 123185,\n" + - " \"ComponentSubsystem\": \"Config\",\n" + - " \"Data\": null,\n" + - " \"Grouping\": 0,\n" + - " \"Id\": 1835016,\n" + - " \"Name\": \"Certificate Revocation - CRL Failure\",\n" + - " \"ServerTimestamp\": \"" + dateTime.format(DateTimeFormatter.ISO_OFFSET_DATE_TIME) + "\",\n" + - " \"Severity\": \"Info\",\n" + - " \"SourceIP\": \"[::1]\",\n" + - " \"Text1\": \"CN=traininglab-Root-CA, DC=traininglab, DC=local\",\n" + - " \"Text2\": \"ldap:///CN=traininglab-Root-CA(1),CN=server1,CN=CDP,CN=Public%20Key%20Services,CN=Services,CN=Configuration,DC=traininglab,DC=local?certificateRevocationList?base?objectClass=cRLDistributionPoint\",\n" + - " \"Value1\": 0,\n" + - " \"Value2\": 0\n" + - " }\n"; - } - } \ No newline at end of file From 0989955c4680aa580336307ad06783b5d4af035d Mon Sep 17 00:00:00 2001 From: rufusfnash <46454852+rufusfnash@users.noreply.github.com> Date: Thu, 24 Jan 2019 15:02:07 +0000 Subject: [PATCH 05/10] Update TppLogSourceConnector.java --- .../opencredo/connect/venafi/tpp/log/TppLogSourceConnector.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnector.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnector.java index 69c6f98..92c9f0b 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnector.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnector.java @@ -43,7 +43,7 @@ public Class taskClass() { @Override public List> taskConfigs(int maxTasks) { if (maxTasks != 1) { - log.info("Ignoring maxTasks as there is can only be one."); + log.info("Ignoring maxTasks as there can only be one."); } List> configs = new ArrayList<>(maxTasks); Map taskConfig = new HashMap<>(); From 8b283789c3b2afaefac8f9ec6fa79e744ca0ecc5 Mon Sep 17 00:00:00 2001 From: chbalzer <33552278+chbalzer@users.noreply.github.com> Date: Thu, 24 Jan 2019 15:05:09 +0000 Subject: [PATCH 06/10] Update src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java Co-Authored-By: rufusfnash <46454852+rufusfnash@users.noreply.github.com> --- .../connect/venafi/tpp/log/TppLogSourceConfigTest.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java index 537d29b..ad01fae 100644 --- a/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java +++ b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java @@ -20,7 +20,7 @@ class TppLogSourceConfigTest { @Test void as_a_Config_I_should_be_able_to_generate_a_guide() { assertNotNull(CONFIG_DEF.toEnrichedRst()); - assertNotEquals("", CONFIG_DEF.toEnrichedRst()); + assertNotEquals("", CONFIG_DEF.toEnrichedRst().trim()); } @Test @@ -151,4 +151,4 @@ private void then_I_expect_no_config_exception_when_run_this(Executable executin private Executable given_an_config_with(Map props) { return () -> new TppLogSourceConfig(props); } -} \ No newline at end of file +} From 02b8db01ca69d6cb8270df41f47dcea1166f7c0f Mon Sep 17 00:00:00 2001 From: chbalzer <33552278+chbalzer@users.noreply.github.com> Date: Thu, 24 Jan 2019 15:07:38 +0000 Subject: [PATCH 07/10] Update src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java Co-Authored-By: rufusfnash <46454852+rufusfnash@users.noreply.github.com> --- .../connect/venafi/tpp/log/TppLogSourceConfigTest.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java index ad01fae..6ccf7f6 100644 --- a/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java +++ b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java @@ -31,7 +31,7 @@ void as_a_config_I_should_throw_a_config_exception_if_nothing_is_set() { } @Test - void as_a_config_I_should_throw_a_config_exception_if_minimum_properties_are_set() { + void as_a_config_I_should_not_throw_a_config_exception_if_minimum_properties_are_set() { Map props = given_a_minimum_list_of_properties(); Executable executingConfig = given_an_config_with(props); then_I_expect_no_config_exception_when_run_this(executingConfig); From e9226b697eaf25ba0db0c3474d776c7038a1588b Mon Sep 17 00:00:00 2001 From: Rufus Nash Date: Thu, 24 Jan 2019 15:13:54 +0000 Subject: [PATCH 08/10] tweaks per PR --- .../tpp/log/TppLogSourceConfigTest.java | 22 +++++++++---------- .../tpp/log/TppLogSourceConnectorTest.java | 8 ++++--- 2 files changed, 16 insertions(+), 14 deletions(-) diff --git a/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java index 537d29b..0bdcf42 100644 --- a/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java +++ b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConfigTest.java @@ -26,14 +26,14 @@ void as_a_Config_I_should_be_able_to_generate_a_guide() { @Test void as_a_config_I_should_throw_a_config_exception_if_nothing_is_set() { Map props = Collections.emptyMap(); - Executable executingConfig = given_an_config_with(props); + Executable executingConfig = given_a_config_with(props); then_I_expect_a_config_exception_when_I_run_this(executingConfig); } @Test - void as_a_config_I_should_throw_a_config_exception_if_minimum_properties_are_set() { + void as_a_config_I_should_not_throw_a_config_exception_if_minimum_properties_are_set() { Map props = given_a_minimum_list_of_properties(); - Executable executingConfig = given_an_config_with(props); + Executable executingConfig = given_a_config_with(props); then_I_expect_no_config_exception_when_run_this(executingConfig); } @@ -49,7 +49,7 @@ private Map given_a_minimum_list_of_properties() { void as_a_config_I_should_throw_a_config_exception_if_base_URL_is_not_set() { Map props = given_a_minimum_list_of_properties(); given_this_is_removed_from(BASE_URL_CONFIG, props); - Executable executingConfig = given_an_config_with(props); + Executable executingConfig = given_a_config_with(props); ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); assertEquals(noDefaultValueFor(BASE_URL_CONFIG), exception.getMessage()); } @@ -58,7 +58,7 @@ void as_a_config_I_should_throw_a_config_exception_if_base_URL_is_not_set() { void as_a_config_I_should_throw_a_config_exception_if_password_is_not_set() { Map props = given_a_minimum_list_of_properties(); given_this_is_removed_from(PASSWORD_CONFIG, props); - Executable executingConfig = given_an_config_with(props); + Executable executingConfig = given_a_config_with(props); ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); assertEquals(noDefaultValueFor(PASSWORD_CONFIG), exception.getMessage()); } @@ -67,7 +67,7 @@ void as_a_config_I_should_throw_a_config_exception_if_password_is_not_set() { void as_a_config_I_should_throw_a_config_exception_if_username_is_not_set() { Map props = given_a_minimum_list_of_properties(); given_this_is_removed_from(USERNAME_CONFIG, props); - Executable executingConfig = given_an_config_with(props); + Executable executingConfig = given_a_config_with(props); ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); assertEquals(noDefaultValueFor(USERNAME_CONFIG), exception.getMessage()); } @@ -77,7 +77,7 @@ void as_a_config_I_should_throw_a_config_exception_if_username_is_not_set() { void as_a_config_I_should_throw_a_config_exception_if_base_URL_is_invalid() { Map props = given_a_minimum_list_of_properties(); given_this_is_overriden_with(BASE_URL_CONFIG, EMPTY_STRING, props); - Executable executingConfig = given_an_config_with(props); + Executable executingConfig = given_a_config_with(props); ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); assertEquals(invalidConfigForNonEmptyString(BASE_URL_CONFIG, EMPTY_STRING), exception.getMessage()); } @@ -86,7 +86,7 @@ void as_a_config_I_should_throw_a_config_exception_if_base_URL_is_invalid() { void as_a_config_I_should_throw_a_config_exception_if_topic_is_invalid() { Map props = given_a_minimum_list_of_properties(); given_this_is_overriden_with(TOPIC_CONFIG, EMPTY_STRING, props); - Executable executingConfig = given_an_config_with(props); + Executable executingConfig = given_a_config_with(props); ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); assertEquals(invalidConfigForNonEmptyString(TOPIC_CONFIG, EMPTY_STRING), exception.getMessage()); } @@ -95,7 +95,7 @@ void as_a_config_I_should_throw_a_config_exception_if_topic_is_invalid() { void as_a_config_I_should_throw_a_config_exception_if_batch_size_is_too_small() { Map props = given_a_minimum_list_of_properties(); given_this_is_overriden_with(BATCH_SIZE, TOO_SMALL_BATCH_SIZE, props); - Executable executingConfig = given_an_config_with(props); + Executable executingConfig = given_a_config_with(props); ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); assertEquals(invalidConfigForTooSmallValue(BATCH_SIZE, TOO_SMALL_BATCH_SIZE, MIN_BATCH_SIZE), exception.getMessage()); } @@ -104,7 +104,7 @@ void as_a_config_I_should_throw_a_config_exception_if_batch_size_is_too_small() void as_a_config_I_should_throw_a_config_exception_if_batch_size_is_too_big() { Map props = given_a_minimum_list_of_properties(); given_this_is_overriden_with(BATCH_SIZE, TOO_LARGE_BATCH_SIZE, props); - Executable executingConfig = given_an_config_with(props); + Executable executingConfig = given_a_config_with(props); ConfigException exception = then_I_expect_a_config_exception_when_I_run_this(executingConfig); assertEquals(invalidConfigForTooLargeValue(BATCH_SIZE, TOO_LARGE_BATCH_SIZE, MAX_BATCH_SIZE), exception.getMessage()); } @@ -148,7 +148,7 @@ private void then_I_expect_no_config_exception_when_run_this(Executable executin } - private Executable given_an_config_with(Map props) { + private Executable given_a_config_with(Map props) { return () -> new TppLogSourceConfig(props); } } \ No newline at end of file diff --git a/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnectorTest.java b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnectorTest.java index f7f21f2..26f19e4 100644 --- a/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnectorTest.java +++ b/src/test/java/com/opencredo/connect/venafi/tpp/log/TppLogSourceConnectorTest.java @@ -25,6 +25,8 @@ class TppLogSourceConnectorTest { public static final int ONE_MAX_TASK = 1; public static final int FIRST_VALUE_IN_LIST = 0; + public static final int EXPECTED_NUMBER_OF_LOG_ENTRIES_RETURNED_BY_MOCK = 2; + public static final int CALLED_ONCE = 1; private WireMockServer wireMockServer = new WireMockServer( new WireMockConfiguration().dynamicPort() .extensions(new ResponseTemplateTransformer(false)) @@ -58,9 +60,9 @@ void as_a_connector_I_should_be_able_to_start_up_with_some_properties() throws I when_the_task_is_started(sourceTask, taskProperties); List records = then_the_task_can_be_polled(sourceTask); - assertEquals(2, records.size()); - wireMockServer.verify(1, postRequestedFor(urlPathMatching(AUTHORIZE_API_REGEX_PATH))); - wireMockServer.verify(1, getRequestedFor(urlPathMatching(LOG_API_REGEX_PATH))); + assertEquals(EXPECTED_NUMBER_OF_LOG_ENTRIES_RETURNED_BY_MOCK, records.size()); + wireMockServer.verify(CALLED_ONCE, postRequestedFor(urlPathMatching(AUTHORIZE_API_REGEX_PATH))); + wireMockServer.verify(CALLED_ONCE, getRequestedFor(urlPathMatching(LOG_API_REGEX_PATH))); } private List then_the_task_can_be_polled(SourceTask sourceTask) throws InterruptedException { From a6dcd9e71f7eb7b07aed5ec3fd52a88004fd58cd Mon Sep 17 00:00:00 2001 From: rufusfnash <46454852+rufusfnash@users.noreply.github.com> Date: Thu, 24 Jan 2019 15:21:05 +0000 Subject: [PATCH 09/10] Update EventLogSourceTaskTest.java Removing old comment --- .../connect/venafi/tpp/log/EventLogSourceTaskTest.java | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java b/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java index 66cef80..14826a4 100644 --- a/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java +++ b/src/test/java/com/opencredo/connect/venafi/tpp/log/EventLogSourceTaskTest.java @@ -195,7 +195,6 @@ public void as_a_client_I_want_some_logs() { public void as_a_client_I_want_some_logs_and_handle_token_expiry() { given_the_mock_will_respond_to_auth(); -// given_the_mock_will_respond_to_log(); given_the_mock_will_respond_to_log_as_expired_token(); TppLogSourceTask task = given_a_task_is_setup(); @@ -462,4 +461,4 @@ private void given_the_mock_will_respond_to_log_for_empty_offsetsStorage() { )); } -} \ No newline at end of file +} From e525a88b9f2ceeddc09a7412083aaa8dac143d3c Mon Sep 17 00:00:00 2001 From: Rufus Nash Date: Thu, 24 Jan 2019 15:34:55 +0000 Subject: [PATCH 10/10] Minor clean up tweaks. --- .../log/Deserializer/ZonedDateTimeDeserializer.java | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/src/main/java/com/opencredo/connect/venafi/tpp/log/Deserializer/ZonedDateTimeDeserializer.java b/src/main/java/com/opencredo/connect/venafi/tpp/log/Deserializer/ZonedDateTimeDeserializer.java index 806bb34..1663343 100644 --- a/src/main/java/com/opencredo/connect/venafi/tpp/log/Deserializer/ZonedDateTimeDeserializer.java +++ b/src/main/java/com/opencredo/connect/venafi/tpp/log/Deserializer/ZonedDateTimeDeserializer.java @@ -4,6 +4,7 @@ import com.google.gson.JsonDeserializer; import com.google.gson.JsonElement; import com.google.gson.JsonParseException; +import org.apache.kafka.connect.errors.ConnectException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -30,15 +31,8 @@ private static ZonedDateTime getParsedDate(String dateTimeString) { try { return ZonedDateTime.parse(dateTimeString); } catch (DateTimeParseException e) { - //swallow exception for now log.debug("Failed to parse to ZonedDateTime format", e); - } - - try { - return ZonedDateTime.parse(dateTimeString + "Z"); - } catch (DateTimeParseException up) { - log.debug("Failed to parse to ZonedDateTime format with added Z.", up); - throw up; + throw new ConnectException(e); } }