diff --git a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java index d8e6aaf9..d561e18a 100644 --- a/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java +++ b/singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java @@ -39,7 +39,6 @@ import java.util.regex.Pattern; import software.amazon.awssdk.services.s3.S3Client; -import software.amazon.awssdk.services.s3.model.ObjectCannedACL; /** * A LogStreamWriter for Singer that writes to S3 (writer.type=s3). @@ -68,7 +67,6 @@ public class S3Writer implements LogStreamWriter { // Custom Thresholds private int maxFileSizeMB; private int minUploadTime; - private int maxRetries; private Pattern filenamePattern; private List fileNameTokens = new ArrayList<>(); private boolean filenameParsingEnabled = false; @@ -129,8 +127,6 @@ public S3Writer(LogStream logStream, S3WriterConfig s3WriterConfig, S3Uploader s private void initialize() { this.maxFileSizeMB = s3WriterConfig.getMaxFileSizeMB(); this.minUploadTime = s3WriterConfig.getMinUploadTimeInSeconds(); - this.maxRetries = s3WriterConfig.getMaxRetries(); - if (s3WriterConfig.isSetFilenamePattern() && s3WriterConfig.isSetFilenameTokens()) { this.filenameParsingEnabled = true; this.filenamePattern = Pattern.compile(s3WriterConfig.getFilenamePattern()); @@ -182,13 +178,16 @@ public boolean isCommittableWriter() { } /** - * Takes the fullPathPrefix and removes all slashes and replaces them with underscores. + * Get or construct buffer file name based on the log stream name. + * The buffer file naming convention is "log_name.dir_name.file_name.buffer". + * + * @return the buffer file name */ - public static String sanitizeFileName(String fullPathPrefix) { - if (fullPathPrefix.startsWith("/")) { - fullPathPrefix = fullPathPrefix.substring(1); - } - return fullPathPrefix.replace("/", "_"); + public String getBufferFileName() { + return (bufferFile != null) ? bufferFile.getName() + : logName + "." + logStream.getLogDir().substring(1) + .replace("/", "_") + "." + + logStream.getLogStreamName() + ".buffer"; } /** @@ -203,13 +202,13 @@ public static String sanitizeFileName(String fullPathPrefix) { public synchronized void startCommit(boolean isDraining) throws LogStreamWriterException { try { if (!bufferFile.exists()) { - bufferFile.createNewFile(); + resetBufferFile(); } bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(bufferFile, true)); } catch (IOException e) { - throw new RuntimeException("Failed to create buffer file: " + bufferFile.getName(), e); + throw new RuntimeException("Failed to create buffer file: " + getBufferFileName(), e); } if (uploadFuture == null) { scheduleUploadTask(); @@ -250,7 +249,7 @@ private void scheduleUploadTask() { private void uploadDiskBufferedFileToS3() throws IOException { File fileToUpload = - new File(BUFFER_DIR, bufferFile.getName() + "." + FORMATTER.format(new Date())); + new File(BUFFER_DIR, getBufferFileName() + "." + FORMATTER.format(new Date())); String fileFormat = generateS3ObjectKey(); try { Files.move(bufferFile.toPath(), fileToUpload.toPath()); @@ -266,7 +265,7 @@ private void uploadDiskBufferedFileToS3() throws IOException { } fileToUpload.delete(); } catch (IOException e) { - LOG.error("Failed to rename buffer file " + bufferFile.getName(), e); + LOG.error("Failed to rename buffer file " + getBufferFileName(), e); } } @@ -318,24 +317,15 @@ private void writeMessageToBuffer(LogMessageAndPosition logMessageAndPosition) * @throws IOException */ private void resetBufferFile() throws IOException { - String - bufferFileName = - sanitizeFileName(logStream.getFullPathPrefix()) + ".buffer." + UUID.randomUUID() - .toString().substring(0, 8); - bufferFile = new File(BUFFER_DIR, bufferFileName); - bufferFile.createNewFile(); + bufferFile = new File(BUFFER_DIR, getBufferFileName()); + if (!bufferFile.createNewFile()) { + LOG.info( + "Buffer file for log stream {} already exists, continue with existing buffer file: {}", + logName, getBufferFileName()); + } bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(bufferFile, true)); } - /** - * Helper function to get the remaining part of the host name after the cluster prefix, - * typically a UUID. - */ - public static String extractHostSuffix(String inputStr) { - String[] parts = inputStr.split("-"); - return parts[parts.length - 1]; - } - private Matcher extractTokensFromFilename(String logFileName) { Matcher matcher = filenamePattern.matcher(logFileName); if (!matcher.matches()) { @@ -507,7 +497,7 @@ public void close() throws IOException { uploadDiskBufferedFileToS3(); bufferFile.delete(); } catch (IOException e) { - LOG.error("Failed to close bufferedWriter or upload buffer file: " + bufferFile.getName(), + LOG.error("Failed to close bufferedWriter or upload buffer file: " + getBufferFileName(), e); } } diff --git a/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java b/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java index 6c87ae99..17b76298 100644 --- a/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java +++ b/singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java @@ -63,8 +63,7 @@ public void setUp() { s3WriterConfig.setMaxRetries(3); // Initialize the S3Writer with mock dependencies - s3Writer = - new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); + s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); } @After @@ -75,62 +74,7 @@ public void tearDown() throws IOException { } // reset hostname SingerUtils.setHostname(SingerUtils.getHostname(), "-"); - } - - @Test - public void testSanitizeFileName() { - String fullPathPrefix = "/var/logs/app"; - String expected = "var_logs_app"; - String result = s3Writer.sanitizeFileName(fullPathPrefix); - assertEquals(expected, result); - - fullPathPrefix = "var/logs/app"; - expected = "var_logs_app"; - result = s3Writer.sanitizeFileName(fullPathPrefix); - assertEquals(expected, result); - - fullPathPrefix = "/var/logs/app/"; - expected = "var_logs_app_"; - result = s3Writer.sanitizeFileName(fullPathPrefix); - assertEquals(expected, result); - - fullPathPrefix = "/"; - expected = ""; - result = s3Writer.sanitizeFileName(fullPathPrefix); - assertEquals(expected, result); - - fullPathPrefix = ""; - expected = ""; - result = s3Writer.sanitizeFileName(fullPathPrefix); - assertEquals(expected, result); - } - - @Test - public void testExtractHostSuffix() { - String hostname = "app-server-12345"; - String expected = "12345"; - String result = S3Writer.extractHostSuffix(hostname); - assertEquals(expected, result); - - hostname = "app-12345"; - expected = "12345"; - result = S3Writer.extractHostSuffix(hostname); - assertEquals(expected, result); - - hostname = "12345"; - expected = "12345"; - result = S3Writer.extractHostSuffix(hostname); - assertEquals(expected, result); - - hostname = "app-server"; - expected = "server"; - result = S3Writer.extractHostSuffix(hostname); - assertEquals(expected, result); - - hostname = ""; - expected = ""; - result = S3Writer.extractHostSuffix(hostname); - assertEquals(expected, result); + s3Writer.close(); } @Test @@ -146,21 +90,8 @@ public void testWriteLogMessageToCommit() throws Exception { s3Writer.endCommit(1, false); // Verify that the messages are written to the buffer file - String - bufferFileNamePrefix = - s3Writer.sanitizeFileName(logStream.getFullPathPrefix()) + ".buffer."; - File tmpDir = new File(tempPath); - File bufferFile = null; - File [] tmpFiles = tmpDir.listFiles(); - boolean bufferFileExists = false; - for (File file : tmpFiles) { - if (file.getName().startsWith(bufferFileNamePrefix)) { - bufferFileExists = true; - bufferFile = file; - break; - } - } - assertTrue(bufferFileExists); + File bufferFile = new File(tempPath + "/" + s3Writer.getBufferFileName()); + assertTrue(bufferFile.exists()); String content = new String(Files.readAllBytes(bufferFile.toPath())); assertTrue(content.contains("test message")); } @@ -210,6 +141,37 @@ public void testUploadIsScheduled() throws Exception { verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class)); } + @Test + public void testResumeFromExistingBufferFile() throws Exception { + // Prepare log message + ByteBuffer messageBuffer = ByteBuffer.wrap("This is message 1 :".getBytes()); + LogMessage logMessage = new LogMessage(messageBuffer); + LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null); + + // Write log message to commit + s3Writer.startCommit(false); + s3Writer.writeLogMessageToCommit(logMessageAndPosition, false); + + // Create a new S3Writer with the same buffer file and write another message to simulate resuming + S3Writer + newS3Writer = + new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); + messageBuffer = ByteBuffer.wrap(" This is message 2".getBytes()); + logMessage = new LogMessage(messageBuffer); + logMessageAndPosition = new LogMessageAndPosition(logMessage, null); + + // Write log message to commit + newS3Writer.startCommit(false); + newS3Writer.writeLogMessageToCommit(logMessageAndPosition, false); + + // Verify that the messages are written to the buffer file + File bufferFile = new File(tempPath + "/" + newS3Writer.getBufferFileName()); + assertTrue(bufferFile.exists()); + String content = new String(Files.readAllBytes(bufferFile.toPath())); + assertTrue(content.contains("This is message 1 : This is message 2")); + newS3Writer.close(); + } + @Test public void testObjectKeyGeneration() { // Custom and default tokens used @@ -223,24 +185,25 @@ public void testObjectKeyGeneration() { s3WriterConfig.setBucket("bucket-name"); s3WriterConfig.setFilenamePattern("(?[^-]+)-(?[^.]+)\\.(?\\d+)"); s3WriterConfig.setFilenameTokens(Arrays.asList("namespace", "filename", "index")); - s3Writer = - new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); + s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); + // Check key prefix String[] objectKeyParts = s3Writer.generateS3ObjectKey().split("/"); assertEquals(4, objectKeyParts.length); assertEquals("my-path", objectKeyParts[0]); assertEquals("my_namespace", objectKeyParts[1]); assertEquals(logStream.getSingerLog().getSingerLogConfig().getName(), objectKeyParts[2]); + // Check last part of object key String[] keySuffixParts = objectKeyParts[3].split("\\."); assertEquals(3, keySuffixParts.length); assertEquals("test_log-0", keySuffixParts[0]); assertNotEquals("{{S}}", keySuffixParts[1]); assertEquals(2, keySuffixParts[1].length()); + // Custom tokens provided but filename pattern does not match s3WriterConfig.setFilenamePattern("(?[^.]+)\\.(?\\d+).0"); - s3Writer = - new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); + s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath); objectKeyParts = s3Writer.generateS3ObjectKey().split("/"); assertEquals("%{namespace}", objectKeyParts[1]); keySuffixParts = objectKeyParts[3].split("\\."); @@ -280,10 +243,9 @@ public void testClose() throws Exception { // Verify that the buffer file was correctly handled String - bufferFileName = - s3Writer.sanitizeFileName(logStream.getFullPathPrefix()) + ".buffer.log"; + bufferFileName = s3Writer.getBufferFileName(); File bufferFile = new File(FilenameUtils.concat(tempPath, bufferFileName)); - assertTrue(!bufferFile.exists()); + assertFalse(bufferFile.exists()); assertEquals(0, bufferFile.length()); verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class)); }