diff --git a/.gitignore b/.gitignore
index c8f5680..85098c7 100644
--- a/.gitignore
+++ b/.gitignore
@@ -37,3 +37,6 @@ mvnwdddd
mvnw.cmd
ddddmvn/
+*.txt
+src/main/resources/application-local.yml
+
diff --git a/pom.xml b/pom.xml
index c14cc05..f2b3067 100644
--- a/pom.xml
+++ b/pom.xml
@@ -96,6 +96,13 @@
5.3.1
+
+        <dependency>
+            <groupId>software.amazon.awssdk</groupId>
+            <artifactId>s3</artifactId>
+            <version>2.26.8</version>
+        </dependency>
+
diff --git a/src/main/java/com/licious/cflogprocessor/CloudfrontLogProcessorApplication.java b/src/main/java/com/licious/cflogprocessor/CloudfrontLogProcessorApplication.java
index a5aa90d..4f4d883 100644
--- a/src/main/java/com/licious/cflogprocessor/CloudfrontLogProcessorApplication.java
+++ b/src/main/java/com/licious/cflogprocessor/CloudfrontLogProcessorApplication.java
@@ -2,8 +2,10 @@
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
+import org.springframework.scheduling.annotation.EnableScheduling;
@SpringBootApplication
+@EnableScheduling
public class CloudfrontLogProcessorApplication {
public static void main(String[] args) {
diff --git a/src/main/java/com/licious/cflogprocessor/config/ClickhouseConfig.java b/src/main/java/com/licious/cflogprocessor/config/ClickhouseConfig.java
index 7e8bc6e..73feeff 100644
--- a/src/main/java/com/licious/cflogprocessor/config/ClickhouseConfig.java
+++ b/src/main/java/com/licious/cflogprocessor/config/ClickhouseConfig.java
@@ -7,8 +7,7 @@
import org.springframework.context.annotation.Configuration;
@Configuration
-@ConditionalOnProperty(name = "writer.destination", havingValue = "CLICKHOUSE")
-//@ConfigurationProperties(prefix = "clickhouse.datasource")
+@ConditionalOnProperty(name = "writer.destination.clickhouse", havingValue = "true")
public class ClickhouseConfig {
@Value("${clickhouse.datasource.user}")
diff --git a/src/main/java/com/licious/cflogprocessor/config/ElasticsearchConfig.java b/src/main/java/com/licious/cflogprocessor/config/ElasticsearchConfig.java
index 3faa393..32205e7 100644
--- a/src/main/java/com/licious/cflogprocessor/config/ElasticsearchConfig.java
+++ b/src/main/java/com/licious/cflogprocessor/config/ElasticsearchConfig.java
@@ -7,7 +7,7 @@
import org.springframework.context.annotation.Configuration;
@Configuration
-@ConditionalOnProperty(name = "writer.destination", havingValue = "ELASTICSEARCH")
+@ConditionalOnProperty(name = "writer.destination.elasticsearch", havingValue = "true")
public class ElasticsearchConfig {
@Value("${elasticsearch.datasource.host}")
diff --git a/src/main/java/com/licious/cflogprocessor/config/S3WriterConfig.java b/src/main/java/com/licious/cflogprocessor/config/S3WriterConfig.java
new file mode 100644
index 0000000..890af4f
--- /dev/null
+++ b/src/main/java/com/licious/cflogprocessor/config/S3WriterConfig.java
@@ -0,0 +1,23 @@
+package com.licious.cflogprocessor.config;
+
+import com.licious.cflogprocessor.datasource.S3Writer;
+import org.springframework.beans.factory.annotation.Value;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Registers the {@link S3Writer} bean only when the S3 destination is
+ * switched on via {@code writer.destination.s3=true}.
+ */
+@Configuration
+@ConditionalOnProperty(name = "writer.destination.s3", havingValue = "true")
+public class S3WriterConfig {
+
+    // Target bucket for flushed log batches.
+    @Value("${s3.datasource.bucket.name}")
+    private String bucketName;
+
+    // AWS region of the bucket (e.g. ap-south-1).
+    @Value("${s3.datasource.region}")
+    private String region;
+
+    @Bean
+    public S3Writer s3Writer() {
+        return new S3Writer(bucketName, region);
+    }
+}
diff --git a/src/main/java/com/licious/cflogprocessor/config/StdoutConfig.java b/src/main/java/com/licious/cflogprocessor/config/StdoutConfig.java
new file mode 100644
index 0000000..e83a8e4
--- /dev/null
+++ b/src/main/java/com/licious/cflogprocessor/config/StdoutConfig.java
@@ -0,0 +1,15 @@
+package com.licious.cflogprocessor.config;
+
+import com.licious.cflogprocessor.datasource.StdoutWriter;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+/**
+ * Registers the {@link StdoutWriter} bean only when the stdout destination
+ * is switched on via {@code writer.destination.stdout=true}.
+ */
+@Configuration
+@ConditionalOnProperty(name = "writer.destination.stdout", havingValue = "true")
+public class StdoutConfig {
+    @Bean
+    public StdoutWriter stdoutWriter() {
+        return new StdoutWriter();
+    }
+}
diff --git a/src/main/java/com/licious/cflogprocessor/config/WriterProperties.java b/src/main/java/com/licious/cflogprocessor/config/WriterProperties.java
deleted file mode 100644
index 4066fa2..0000000
--- a/src/main/java/com/licious/cflogprocessor/config/WriterProperties.java
+++ /dev/null
@@ -1,26 +0,0 @@
-package com.licious.cflogprocessor.config;
-
-import lombok.Getter;
-import org.springframework.beans.factory.annotation.Value;
-import org.springframework.context.annotation.Configuration;
-import org.springframework.context.annotation.PropertySource;
-
-@Configuration
-@PropertySource("classpath:application.yml")
-@Getter
-public class WriterProperties {
-
- @Value("${writer.destination}")
- private WriterDestination logDestination;
-
- public enum WriterDestination {
- CLICKHOUSE,
- ELASTICSEARCH,
- STDOUT
- }
-
-
-
-
-
-}
diff --git a/src/main/java/com/licious/cflogprocessor/datasource/ClickhouseWriter.java b/src/main/java/com/licious/cflogprocessor/datasource/ClickhouseWriter.java
index e906005..6205e3f 100644
--- a/src/main/java/com/licious/cflogprocessor/datasource/ClickhouseWriter.java
+++ b/src/main/java/com/licious/cflogprocessor/datasource/ClickhouseWriter.java
@@ -1,5 +1,6 @@
package com.licious.cflogprocessor.datasource;
+import com.licious.cflogprocessor.service.SimpleRecordProcessor;
import org.springframework.core.env.Environment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.licious.cflogprocessor.formatter.CloudfrontLogEntry;
@@ -13,17 +14,13 @@
import java.util.Date;
import java.util.List;
-public class ClickhouseWriter {
+public class ClickhouseWriter implements Writer {
private final String clickHouseUrl;
private final String clickHouseUsername;
private final String clickHousePassword;
-
- //@Autowired
- private Environment env;
- private static final Logger logger = LoggerFactory.getLogger(ClickhouseWriter.class);
- private static final int BATCH_SIZE = 999999;
- private static final ObjectMapper objectMapper = new ObjectMapper();
+ private static final int BATCH_SIZE = 50000;
+ private final ObjectMapper objectMapper = new ObjectMapper();
private final List buffer = new ArrayList<>();
@@ -33,19 +30,20 @@ public ClickhouseWriter(String user, String password, String url) {
this.clickHousePassword = password;
this.clickHouseUrl = url;
}
- public synchronized void write(CloudfrontLogEntry logEntry){
+
+ @Override
+ public synchronized void write(CloudfrontLogEntry logEntry) throws Exception {
try {
buffer.add(logEntry);
if (buffer.size() >= BATCH_SIZE) {
writeBatchData();
}
- }catch (Exception e){
- logger.error("Error occurred while writing log entry: {}", e.getMessage());
+ } catch (Exception e) {
+ throw new Exception("Error occurred while writing log entry");
}
}
- public void writeBatchData() {
- logger.info("Inside writeBatchData");
- logger.info("Clickhouse Details:"+clickHouseUrl,clickHouseUsername,clickHousePassword);
+
+ private void writeBatchData() {
try (Connection conn = DriverManager.getConnection(clickHouseUrl, clickHouseUsername, clickHousePassword)) {
conn.setAutoCommit(false); // Disable auto-commit for batching
@@ -101,7 +99,7 @@ public String serialize(CloudfrontLogEntry logEntry) throws Exception {
return objectMapper.writeValueAsString(logEntry);
}
- private Long convertEpochSeconds(String epochString) throws Exception{
+ private Long convertEpochSeconds(String epochString) throws Exception {
try {
double epochSeconds = Double.parseDouble(epochString);
long milliseconds = (long) (epochSeconds * 1000);
diff --git a/src/main/java/com/licious/cflogprocessor/datasource/CompositeWriter.java b/src/main/java/com/licious/cflogprocessor/datasource/CompositeWriter.java
new file mode 100644
index 0000000..ac982ba
--- /dev/null
+++ b/src/main/java/com/licious/cflogprocessor/datasource/CompositeWriter.java
@@ -0,0 +1,31 @@
+package com.licious.cflogprocessor.datasource;
+
+import com.licious.cflogprocessor.formatter.CloudfrontLogEntry;
+import lombok.Getter;
+import org.springframework.beans.factory.annotation.Autowired;
+import org.springframework.stereotype.Component;
+
+import java.util.List;
+
+/**
+ * Fans each log entry out to every enabled {@link Writer}. Spring injects
+ * all Writer beans registered by the conditional writer @Configuration
+ * classes (Clickhouse, Elasticsearch, S3, stdout).
+ */
+@Component
+@Getter
+public class CompositeWriter implements Writer {
+
+    // All enabled destination writers; fixed at construction time.
+    private final List<Writer> writers;
+
+    @Autowired
+    public CompositeWriter(List<Writer> writers) {
+        this.writers = writers;
+    }
+
+    /**
+     * Writes the entry to every registered destination in order.
+     * A failure in one writer aborts the remaining writes and propagates.
+     */
+    @Override
+    public void write(CloudfrontLogEntry logEntry) throws Exception {
+        for (Writer writer : writers) {
+            writer.write(logEntry);
+        }
+    }
+}
diff --git a/src/main/java/com/licious/cflogprocessor/datasource/ElasticsearchWriter.java b/src/main/java/com/licious/cflogprocessor/datasource/ElasticsearchWriter.java
index 9cf2088..31c4768 100644
--- a/src/main/java/com/licious/cflogprocessor/datasource/ElasticsearchWriter.java
+++ b/src/main/java/com/licious/cflogprocessor/datasource/ElasticsearchWriter.java
@@ -14,7 +14,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-public class ElasticsearchWriter {
+public class ElasticsearchWriter implements Writer {
private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Logger logger = LoggerFactory.getLogger(ElasticsearchWriter.class);
@@ -23,11 +23,12 @@ public class ElasticsearchWriter {
private final String scheme;
public ElasticsearchWriter(String host, int port, String scheme) {
- this.host = host;
- this.port = port;
- this.scheme = scheme;
+ this.host = host;
+ this.port = port;
+ this.scheme = scheme;
}
+ @Override
public void write(CloudfrontLogEntry logEntry) throws Exception {
// Serialize LogEntry to JSON
@@ -36,7 +37,7 @@ public void write(CloudfrontLogEntry logEntry) throws Exception {
// Setup Elasticsearch client - http://10.1.3.216:9200
try (RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost(host, port, scheme)))) {
-
+
IndexRequest request = new IndexRequest("cloudfrontlogs");
request.source(json, XContentType.JSON);
diff --git a/src/main/java/com/licious/cflogprocessor/datasource/S3Writer.java b/src/main/java/com/licious/cflogprocessor/datasource/S3Writer.java
new file mode 100644
index 0000000..2f5060f
--- /dev/null
+++ b/src/main/java/com/licious/cflogprocessor/datasource/S3Writer.java
@@ -0,0 +1,73 @@
+package com.licious.cflogprocessor.datasource;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.licious.cflogprocessor.formatter.CloudfrontLogEntry;
+import org.springframework.scheduling.annotation.Scheduled;
+import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
+import software.amazon.awssdk.regions.Region;
+import software.amazon.awssdk.services.s3.S3Client;
+import software.amazon.awssdk.services.s3.model.PutObjectRequest;
+import software.amazon.awssdk.services.s3.model.S3Exception;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CopyOnWriteArrayList;
+
+public class S3Writer implements Writer {
+
+ private final S3Client s3Client;
+ private final String bucketName;
+ private final ObjectMapper objectMapper;
+ private final List buffer;
+
+ public S3Writer(String bucketName, String region) {
+ this.s3Client = S3Client.builder()
+ .region(Region.of(region))
+ .credentialsProvider(ProfileCredentialsProvider.create())
+ .build();
+ this.bucketName = bucketName;
+ this.buffer = new CopyOnWriteArrayList<>();
+ this.objectMapper = new ObjectMapper();
+ }
+
+ @Override
+ public void write(CloudfrontLogEntry cloudfrontLogEntry) throws Exception {
+ buffer.add(cloudfrontLogEntry);
+ }
+
+ @Scheduled(fixedRate = 60000)
+ public void flush() {
+ if (buffer.isEmpty()) {
+ return;
+ }
+
+ try {
+ List entriesToWrite = new ArrayList<>(buffer);
+ buffer.clear();
+
+ StringBuilder sb = new StringBuilder();
+ for (CloudfrontLogEntry entry : entriesToWrite) {
+ sb.append(objectMapper.writeValueAsString(entry)).append("\n");
+ }
+
+ String logEntries = sb.toString();
+ InputStream inputStream = new ByteArrayInputStream(logEntries.getBytes(StandardCharsets.UTF_8));
+ String objectKey = "logs/" + Instant.now().toString() + ".log"; // Generate a unique key for S3
+
+ PutObjectRequest putObjectRequest = PutObjectRequest.builder()
+ .bucket(bucketName)
+ .key(objectKey)
+ .build();
+
+ s3Client.putObject(putObjectRequest, software.amazon.awssdk.core.sync.RequestBody.fromInputStream(inputStream, logEntries.length()));
+ } catch (S3Exception | IOException e) {
+ e.printStackTrace();
+ }
+ }
+}
diff --git a/src/main/java/com/licious/cflogprocessor/datasource/StdoutWriter.java b/src/main/java/com/licious/cflogprocessor/datasource/StdoutWriter.java
new file mode 100644
index 0000000..2bca2e8
--- /dev/null
+++ b/src/main/java/com/licious/cflogprocessor/datasource/StdoutWriter.java
@@ -0,0 +1,15 @@
+package com.licious.cflogprocessor.datasource;
+
+import com.licious.cflogprocessor.formatter.CloudfrontLogEntry;
+
+/**
+ * Writer that prints each log entry to standard output — intended for local
+ * development and debugging.
+ */
+public class StdoutWriter implements Writer {
+
+    /** Prints the entry using its toString() representation. */
+    @Override
+    public void write(CloudfrontLogEntry logEntry) throws Exception {
+        System.out.println(logEntry);
+    }
+}
diff --git a/src/main/java/com/licious/cflogprocessor/datasource/Writer.java b/src/main/java/com/licious/cflogprocessor/datasource/Writer.java
new file mode 100644
index 0000000..aa4ee29
--- /dev/null
+++ b/src/main/java/com/licious/cflogprocessor/datasource/Writer.java
@@ -0,0 +1,7 @@
+package com.licious.cflogprocessor.datasource;
+
+import com.licious.cflogprocessor.formatter.CloudfrontLogEntry;
+
+/**
+ * Sink for parsed Cloudfront log entries. Implementations may buffer
+ * internally (Clickhouse and S3 writers do), so a successful write() does
+ * not imply the entry is durably stored yet.
+ */
+public interface Writer {
+    /**
+     * Accepts one log entry for delivery to this destination.
+     *
+     * @param logEntry parsed Cloudfront access-log record
+     * @throws Exception if the entry cannot be accepted or delivered
+     */
+    void write(CloudfrontLogEntry logEntry) throws Exception;
+}
diff --git a/src/main/java/com/licious/cflogprocessor/service/KinesisConsumerService.java b/src/main/java/com/licious/cflogprocessor/service/KinesisConsumerService.java
index 8a52c02..cb4de1d 100644
--- a/src/main/java/com/licious/cflogprocessor/service/KinesisConsumerService.java
+++ b/src/main/java/com/licious/cflogprocessor/service/KinesisConsumerService.java
@@ -28,7 +28,6 @@ public class KinesisConsumerService {
@Value("${kinesis.applicationName}")
private String applicationName;
-
@Autowired
private SimpleRecordProcessor recordProcessorFactory;
@@ -41,21 +40,21 @@ public void init() {
new DefaultAWSCredentialsProviderChain(),
java.util.UUID.randomUUID().toString()
)
- .withRegionName(awsRegion)
- .withInitialPositionInStream(InitialPositionInStream.LATEST);
+ .withRegionName(awsRegion)
+ .withInitialPositionInStream(InitialPositionInStream.LATEST);
AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard()
.withCredentials(new DefaultAWSCredentialsProviderChain());
clientBuilder.setRegion(awsRegion);
+
+ logger.debug("Worker Starting:");
Worker worker = new Worker.Builder()
.recordProcessorFactory(() -> recordProcessorFactory)
- // Specify other configurations such as stream name, application name, etc.
.config(kinesisClientLibConfiguration)
.kinesisClient(clientBuilder.build())
.build();
- // Start the worker in a separate thread
Thread workerThread = new Thread(worker);
workerThread.start();
}
diff --git a/src/main/java/com/licious/cflogprocessor/service/SimpleRecordProcessor.java b/src/main/java/com/licious/cflogprocessor/service/SimpleRecordProcessor.java
index c17c3ab..1127769 100644
--- a/src/main/java/com/licious/cflogprocessor/service/SimpleRecordProcessor.java
+++ b/src/main/java/com/licious/cflogprocessor/service/SimpleRecordProcessor.java
@@ -5,32 +5,28 @@
import com.amazonaws.services.kinesis.clientlibrary.types.ProcessRecordsInput;
import com.amazonaws.services.kinesis.clientlibrary.types.ShutdownInput;
import com.amazonaws.services.kinesis.model.Record;
-import com.licious.cflogprocessor.config.WriterProperties;
-import com.licious.cflogprocessor.datasource.ClickhouseWriter;
+import com.licious.cflogprocessor.datasource.CompositeWriter;
import com.licious.cflogprocessor.datasource.ElasticsearchWriter;
+import com.licious.cflogprocessor.datasource.Writer;
import com.licious.cflogprocessor.formatter.CloudfrontLogEntry;
import com.licious.cflogprocessor.formatter.CloudfrontLogEntrySerializer;
+import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.lang.Nullable;
import org.springframework.stereotype.Service;
+import javax.script.Compilable;
import java.nio.charset.StandardCharsets;
+import java.util.ArrayList;
import java.util.List;
@Service
public class SimpleRecordProcessor implements IRecordProcessor {
- private final WriterProperties writerProperties;
- private final ElasticsearchWriter elasticsearchWriter;
- private final ClickhouseWriter clickhouseWriter;
+ @Autowired
+ private final CompositeWriter compositeWriter;
-
- public SimpleRecordProcessor(
- WriterProperties writerProperties,
- @Nullable ClickhouseWriter clickhouseWriter,
- @Nullable ElasticsearchWriter elasticsearchWriter) {
- this.writerProperties = writerProperties;
- this.clickhouseWriter = clickhouseWriter;
- this.elasticsearchWriter = elasticsearchWriter;
+ public SimpleRecordProcessor(CompositeWriter compositeWriter) {
+ this.compositeWriter = compositeWriter;
}
@Override
@@ -41,39 +37,22 @@ public void initialize(InitializationInput initializationInput) {
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
List records = processRecordsInput.getRecords();
+
for (Record record : records) {
- // Assuming the data in the record is UTF-8 encoded
String data = new String(record.getData().array(), StandardCharsets.UTF_8);
-
CloudfrontLogEntry logEntry = CloudfrontLogEntrySerializer.parseRecord(data);
-
try {
-
- if (writerProperties.getLogDestination() == WriterProperties.WriterDestination.CLICKHOUSE) {
- if (clickhouseWriter != null) {
- clickhouseWriter.write(logEntry);
- }
- } else if (writerProperties.getLogDestination() == WriterProperties.WriterDestination.ELASTICSEARCH) {
- if (elasticsearchWriter != null) {
- elasticsearchWriter.write(logEntry);
- }
- } else {
- // Handle STDOUT or other destinations
- System.out.println("Writing to logs");
- System.out.println(logEntry);
- }
+ compositeWriter.write(logEntry);
} catch (Exception e) {
e.printStackTrace();
}
-
}
-
- // Consider implementing checkpoint logic here
}
@Override
public void shutdown(ShutdownInput shutdownInput) {
// Shutdown logic if needed
}
+
}
diff --git a/src/main/resources/application.yml b/src/main/resources/application.yml
index 1b0fa61..6a72220 100644
--- a/src/main/resources/application.yml
+++ b/src/main/resources/application.yml
@@ -1,5 +1,10 @@
spring.application.name: log-processor
-logging.level.root: INFO
+
+logging:
+ level:
+ org:
+ springframework: DEBUG
+ springframework.beans.factory: TRACE
aws.region: ${AWS_REGION}
@@ -9,10 +14,14 @@ kinesis:
# Writer to enable.
-# Options: CLICKHOUSE, ELASTICSEARCH, STDOUT
+# Options: CLICKHOUSE, ELASTICSEARCH, STDOUT, S3
#
writer:
- destination: ${WRITER_DESTINATION_DATASOURCE:STDOUT}
+  destination:
+    clickhouse: ${WRITER_DESTINATION_CLICKHOUSE:false}
+    elasticsearch: ${WRITER_DESTINATION_ELASTICSEARCH:false}
+    stdout: ${WRITER_DESTINATION_STDOUT:true}
+    file: ${WRITER_DESTINATION_FILE:false}
+    s3: ${WRITER_DESTINATION_S3:false}
# ClickHouse specific configurations when Clickhouse writer is enabled
clickhouse:
@@ -30,3 +39,10 @@ elasticsearch:
#user: ${WRITER_DATASOURCE_ELASTICSEARCH_USER:admin}
#password: ${WRITER_DATASOURCE_ELASTICSEARCH_PASSWORD:default}
+# S3 specific configurations when S3 writer is enabled.
+s3:
+ datasource:
+ bucket:
+ name: ${WRITER_DATASOURCE_S3_BUCKETNAME}
+ region: ${WRITER_DATASOURCE_S3_REGION:ap-south-1}
+