added support for multiple writers #4

Open · wants to merge 1 commit into main
3 changes: 3 additions & 0 deletions .gitignore
@@ -37,3 +37,6 @@ mvnwdddd
mvnw.cmd
ddddmvn/

*.txt
src/main/resources/application-local.yml

7 changes: 7 additions & 0 deletions pom.xml
@@ -96,6 +96,13 @@
<version>5.3.1</version>
</dependency>

<dependency>
<groupId>software.amazon.awssdk</groupId>
<artifactId>s3</artifactId>
<version>2.26.8</version>
</dependency>



</dependencies>

src/main/java/com/licious/cflogprocessor/CloudfrontLogProcessorApplication.java
@@ -2,8 +2,10 @@

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.scheduling.annotation.EnableScheduling;

@SpringBootApplication
@EnableScheduling
public class CloudfrontLogProcessorApplication {

public static void main(String[] args) {
src/main/java/com/licious/cflogprocessor/config/ClickhouseConfig.java
@@ -7,8 +7,7 @@
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(name = "writer.destination", havingValue = "CLICKHOUSE")
//@ConfigurationProperties(prefix = "clickhouse.datasource")
@ConditionalOnProperty(name = "writer.destination.clickhouse", havingValue = "true")
public class ClickhouseConfig {

@Value("${clickhouse.datasource.user}")
src/main/java/com/licious/cflogprocessor/config/ElasticsearchConfig.java
@@ -7,7 +7,7 @@
import org.springframework.context.annotation.Configuration;

@Configuration
-@ConditionalOnProperty(name = "writer.destination", havingValue = "ELASTICSEARCH")
+@ConditionalOnProperty(name = "writer.destination.elasticsearch", havingValue = "true")
public class ElasticsearchConfig {

@Value("${elasticsearch.datasource.host}")
23 changes: 23 additions & 0 deletions src/main/java/com/licious/cflogprocessor/config/S3WriterConfig.java
@@ -0,0 +1,23 @@
package com.licious.cflogprocessor.config;

import com.licious.cflogprocessor.datasource.S3Writer;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(name = "writer.destination.s3", havingValue = "true")
public class S3WriterConfig {

@Value("${s3.datasource.bucket.name}")
private String bucketName;

@Value("${s3.datasource.region}")
private String region;

@Bean
public S3Writer s3Writer() {
return new S3Writer(bucketName, region);
}
}
15 changes: 15 additions & 0 deletions src/main/java/com/licious/cflogprocessor/config/StdoutConfig.java
@@ -0,0 +1,15 @@
package com.licious.cflogprocessor.config;

import com.licious.cflogprocessor.datasource.StdoutWriter;
import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
@ConditionalOnProperty(name = "writer.destination.stdout", havingValue = "true")
public class StdoutConfig {
@Bean
public StdoutWriter stdoutWriter() {
return new StdoutWriter();
}
}
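
Taken together, the four config classes replace the old single-valued `writer.destination` switch with one boolean flag per destination, so any combination of writers can be enabled at once (for example `writer.destination.stdout: true` plus `writer.destination.s3: true` in the git-ignored `application-local.yml`). A minimal sketch of how such a flag behaves, using Spring Boot's `ApplicationContextRunner` (a test utility, not part of this PR):

```java
import org.springframework.boot.test.context.runner.ApplicationContextRunner;

import static org.assertj.core.api.Assertions.assertThat;

class StdoutConfigFlagSketch {

    // With the flag set, StdoutConfig contributes a StdoutWriter bean;
    // with the flag absent, no bean is registered at all.
    void flagTogglesBean() {
        new ApplicationContextRunner()
                .withUserConfiguration(StdoutConfig.class)
                .withPropertyValues("writer.destination.stdout=true")
                .run(context -> assertThat(context).hasSingleBean(StdoutWriter.class));

        new ApplicationContextRunner()
                .withUserConfiguration(StdoutConfig.class)
                .run(context -> assertThat(context).doesNotHaveBean(StdoutWriter.class));
    }
}
```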

This file was deleted.

src/main/java/com/licious/cflogprocessor/datasource/ClickhouseWriter.java
@@ -1,5 +1,6 @@
package com.licious.cflogprocessor.datasource;

import com.licious.cflogprocessor.service.SimpleRecordProcessor;
import org.springframework.core.env.Environment;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.licious.cflogprocessor.formatter.CloudfrontLogEntry;
@@ -13,17 +14,13 @@
import java.util.Date;
import java.util.List;

-public class ClickhouseWriter {
+public class ClickhouseWriter implements Writer {

private final String clickHouseUrl;
private final String clickHouseUsername;
private final String clickHousePassword;

-    //@Autowired
-    private Environment env;
     private static final Logger logger = LoggerFactory.getLogger(ClickhouseWriter.class);
-    private static final int BATCH_SIZE = 999999;
-    private static final ObjectMapper objectMapper = new ObjectMapper();
+    private static final int BATCH_SIZE = 50000;
+    private final ObjectMapper objectMapper = new ObjectMapper();

private final List<CloudfrontLogEntry> buffer = new ArrayList<>();

@@ -33,19 +30,20 @@ public ClickhouseWriter(String user, String password, String url) {
this.clickHousePassword = password;
this.clickHouseUrl = url;
}
-    public synchronized void write(CloudfrontLogEntry logEntry){
+    @Override
+    public synchronized void write(CloudfrontLogEntry logEntry) throws Exception {
         try {
             buffer.add(logEntry);
             if (buffer.size() >= BATCH_SIZE) {
                 writeBatchData();
             }
-        }catch (Exception e){
-            logger.error("Error occurred while writing log entry: {}", e.getMessage());
+        } catch (Exception e) {
+            throw new Exception("Error occurred while writing log entry");
         }
     }

-    public void writeBatchData() {
-        logger.info("Inside writeBatchData");
-        logger.info("Clickhouse Details:"+clickHouseUrl,clickHouseUsername,clickHousePassword);
+    private void writeBatchData() {
try (Connection conn = DriverManager.getConnection(clickHouseUrl, clickHouseUsername, clickHousePassword)) {
conn.setAutoCommit(false); // Disable auto-commit for batching
@@ -101,7 +99,7 @@ public String serialize(CloudfrontLogEntry logEntry) throws Exception {
return objectMapper.writeValueAsString(logEntry);
}

private Long convertEpochSeconds(String epochString) throws Exception {
try {
double epochSeconds = Double.parseDouble(epochString);
long milliseconds = (long) (epochSeconds * 1000);
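
The body of `writeBatchData()` is collapsed in this view; the visible lines only show that a JDBC connection is opened with auto-commit disabled. For orientation, a hedged sketch of what a ClickHouse batch insert along these lines typically looks like — the table name, column list, and the `getTimestamp()` getter are assumptions, not code from this PR (it also assumes `java.sql.PreparedStatement` is imported alongside `Connection` and `DriverManager`):

```java
// Hypothetical sketch of writeBatchData(); table and columns are assumptions.
private void writeBatchData() throws Exception {
    String sql = "INSERT INTO cloudfront_logs (event_time, raw_json) VALUES (?, ?)";
    try (Connection conn = DriverManager.getConnection(clickHouseUrl, clickHouseUsername, clickHousePassword);
         PreparedStatement stmt = conn.prepareStatement(sql)) {
        conn.setAutoCommit(false); // one commit per batch, as in the visible diff
        for (CloudfrontLogEntry entry : buffer) {
            stmt.setLong(1, convertEpochSeconds(entry.getTimestamp())); // getter is hypothetical
            stmt.setString(2, serialize(entry));                        // serialize() is in the diff
            stmt.addBatch();
        }
        stmt.executeBatch();
        conn.commit();
        buffer.clear();
    }
}
```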
31 changes: 31 additions & 0 deletions src/main/java/com/licious/cflogprocessor/datasource/CompositeWriter.java
@@ -0,0 +1,31 @@
package com.licious.cflogprocessor.datasource;

import com.licious.cflogprocessor.formatter.CloudfrontLogEntry;
import lombok.Getter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Lazy;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Service;

import java.util.ArrayList;
import java.util.List;

@Component
@Getter
public class CompositeWriter implements Writer {
private List<Writer> writers;

@Autowired
public CompositeWriter(List<Writer> writers) {
this.writers = writers;
}

@Override
public void write(CloudfrontLogEntry logEntry) throws Exception {
for (Writer writer : writers) {
writer.write(logEntry);
}
}

}
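
Note that `CompositeWriter` propagates the first exception, so one failing destination prevents the remaining writers from receiving the entry. (Spring's collection injection excludes the bean currently being constructed, so the composite does not inject itself into `writers`.) If independent delivery per destination is preferred, a variant sketch — not part of this PR, and assuming an `org.slf4j.LoggerFactory` import:

```java
// Variant sketch: isolate failures so one bad destination does not
// block the others from seeing the entry.
@Override
public void write(CloudfrontLogEntry logEntry) {
    for (Writer writer : writers) {
        try {
            writer.write(logEntry);
        } catch (Exception e) {
            // Log and continue with the remaining writers.
            LoggerFactory.getLogger(CompositeWriter.class)
                    .error("{} failed: {}", writer.getClass().getSimpleName(), e.getMessage());
        }
    }
}
```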
src/main/java/com/licious/cflogprocessor/datasource/ElasticsearchWriter.java
@@ -14,7 +14,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

-public class ElasticsearchWriter {
+public class ElasticsearchWriter implements Writer {

private static final ObjectMapper objectMapper = new ObjectMapper();
private static final Logger logger = LoggerFactory.getLogger(ElasticsearchWriter.class);
@@ -23,11 +23,12 @@ public class ElasticsearchWriter {
private final String scheme;

    public ElasticsearchWriter(String host, int port, String scheme) {
        this.host = host;
        this.port = port;
        this.scheme = scheme;
    }

@Override
public void write(CloudfrontLogEntry logEntry) throws Exception {

// Serialize LogEntry to JSON
@@ -36,7 +37,7 @@ public void write(CloudfrontLogEntry logEntry) throws Exception {
// Setup Elasticsearch client - http://10.1.3.216:9200
try (RestHighLevelClient client = new RestHighLevelClient(
RestClient.builder(new HttpHost(host, port, scheme)))) {

IndexRequest request = new IndexRequest("cloudfrontlogs");
request.source(json, XContentType.JSON);

73 changes: 73 additions & 0 deletions src/main/java/com/licious/cflogprocessor/datasource/S3Writer.java
@@ -0,0 +1,73 @@
package com.licious.cflogprocessor.datasource;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.licious.cflogprocessor.formatter.CloudfrontLogEntry;
import org.springframework.scheduling.annotation.Scheduled;
import software.amazon.awssdk.auth.credentials.ProfileCredentialsProvider;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.S3Exception;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

public class S3Writer implements Writer {

private final S3Client s3Client;
private final String bucketName;
private final ObjectMapper objectMapper;
private final List<CloudfrontLogEntry> buffer;

public S3Writer(String bucketName, String region) {
this.s3Client = S3Client.builder()
.region(Region.of(region))
.credentialsProvider(ProfileCredentialsProvider.create())
.build();
this.bucketName = bucketName;
this.buffer = new CopyOnWriteArrayList<>();
this.objectMapper = new ObjectMapper();
}

@Override
public void write(CloudfrontLogEntry cloudfrontLogEntry) throws Exception {
buffer.add(cloudfrontLogEntry);
}

@Scheduled(fixedRate = 60000)
public void flush() {
if (buffer.isEmpty()) {
return;
}

try {
List<CloudfrontLogEntry> entriesToWrite = new ArrayList<>(buffer);
buffer.clear();

StringBuilder sb = new StringBuilder();
for (CloudfrontLogEntry entry : entriesToWrite) {
sb.append(objectMapper.writeValueAsString(entry)).append("\n");
}

String logEntries = sb.toString();
byte[] payload = logEntries.getBytes(StandardCharsets.UTF_8); // byte length can differ from String#length()
InputStream inputStream = new ByteArrayInputStream(payload);
String objectKey = "logs/" + Instant.now().toString() + ".log"; // Generate a unique key for S3

PutObjectRequest putObjectRequest = PutObjectRequest.builder()
.bucket(bucketName)
.key(objectKey)
.build();

s3Client.putObject(putObjectRequest, software.amazon.awssdk.core.sync.RequestBody.fromInputStream(inputStream, payload.length));
} catch (S3Exception | IOException e) {
e.printStackTrace();
}
}
}
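
Two things worth noting about this class. First, the `@Scheduled` flush only fires because `@EnableScheduling` was added to the application class above. Second, `flush()` snapshots the buffer and then calls `buffer.clear()`; any entry added between those two statements is silently dropped, and `CopyOnWriteArrayList` copies the whole list on every `add`. (Separately, `ProfileCredentialsProvider` requires a configured local AWS profile; `DefaultCredentialsProvider` is the usual alternative for env vars or instance roles.) A drop-free variant sketch, assuming the buffer is switched to a `java.util.concurrent.ConcurrentLinkedQueue`:

```java
// Variant sketch: drain a queue instead of copy-then-clear.
private final ConcurrentLinkedQueue<CloudfrontLogEntry> queue = new ConcurrentLinkedQueue<>();

@Scheduled(fixedRate = 60000)
public void flush() {
    List<CloudfrontLogEntry> entriesToWrite = new ArrayList<>();
    CloudfrontLogEntry entry;
    while ((entry = queue.poll()) != null) { // poll() removes atomically, so entries
        entriesToWrite.add(entry);           // written concurrently are never lost
    }
    if (entriesToWrite.isEmpty()) {
        return;
    }
    // ...serialize entriesToWrite and upload exactly as in flush() above...
}
```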
15 changes: 15 additions & 0 deletions src/main/java/com/licious/cflogprocessor/datasource/StdoutWriter.java
@@ -0,0 +1,15 @@
package com.licious.cflogprocessor.datasource;

import com.licious.cflogprocessor.formatter.CloudfrontLogEntry;

public class StdoutWriter implements Writer {

public StdoutWriter() {

}

@Override
public void write(CloudfrontLogEntry logEntry) throws Exception {
System.out.println(logEntry);
}
}
7 changes: 7 additions & 0 deletions src/main/java/com/licious/cflogprocessor/datasource/Writer.java
@@ -0,0 +1,7 @@
package com.licious.cflogprocessor.datasource;

import com.licious.cflogprocessor.formatter.CloudfrontLogEntry;

public interface Writer {
void write(CloudfrontLogEntry logEntry) throws Exception;
}
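
This small interface is the seam of the PR: every destination implements it, each config class registers its implementation behind a flag, and Spring injects all registered beans into `CompositeWriter`. Adding another destination is then mechanical; a hypothetical sketch (the `FileLogWriter` name and file-based destination are invented for illustration, and would be paired with a config class gated on its own `writer.destination.*` flag):

```java
// Hypothetical new destination following the PR's pattern.
public class FileLogWriter implements Writer {

    private final java.nio.file.Path path;

    public FileLogWriter(String path) {
        this.path = java.nio.file.Path.of(path);
    }

    @Override
    public void write(CloudfrontLogEntry logEntry) throws Exception {
        // Append one line per entry; relies on CloudfrontLogEntry#toString.
        java.nio.file.Files.writeString(path, logEntry + System.lineSeparator(),
                java.nio.file.StandardOpenOption.CREATE,
                java.nio.file.StandardOpenOption.APPEND);
    }
}
```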
src/main/java/com/licious/cflogprocessor/service/KinesisConsumerService.java
@@ -28,7 +28,6 @@ public class KinesisConsumerService {
@Value("${kinesis.applicationName}")
private String applicationName;


@Autowired
private SimpleRecordProcessor recordProcessorFactory;

@@ -41,21 +40,21 @@ public void init() {
new DefaultAWSCredentialsProviderChain(),
java.util.UUID.randomUUID().toString()
)
                .withRegionName(awsRegion)
                .withInitialPositionInStream(InitialPositionInStream.LATEST);

AmazonKinesisClientBuilder clientBuilder = AmazonKinesisClientBuilder.standard()
.withCredentials(new DefaultAWSCredentialsProviderChain());
clientBuilder.setRegion(awsRegion);


logger.debug("Worker Starting:");
Worker worker = new Worker.Builder()
.recordProcessorFactory(() -> recordProcessorFactory)
// Specify other configurations such as stream name, application name, etc.
.config(kinesisClientLibConfiguration)
.kinesisClient(clientBuilder.build())
.build();

// Start the worker in a separate thread
Thread workerThread = new Thread(worker);
workerThread.start();
}