Skip to content

Commit

Permalink
Merge branch 'pinterest:master' into bump_version_08092
Browse files Browse the repository at this point in the history
  • Loading branch information
jfzunigac authored Dec 5, 2024
2 parents 5da44a9 + eb7d908 commit f1f009b
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 110 deletions.
50 changes: 20 additions & 30 deletions singer/src/main/java/com/pinterest/singer/writer/s3/S3Writer.java
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@
import java.util.regex.Pattern;

import software.amazon.awssdk.services.s3.S3Client;
import software.amazon.awssdk.services.s3.model.ObjectCannedACL;

/**
* A LogStreamWriter for Singer that writes to S3 (writer.type=s3).
Expand Down Expand Up @@ -68,7 +67,6 @@ public class S3Writer implements LogStreamWriter {
// Custom Thresholds
private int maxFileSizeMB;
private int minUploadTime;
private int maxRetries;
private Pattern filenamePattern;
private List<String> fileNameTokens = new ArrayList<>();
private boolean filenameParsingEnabled = false;
Expand Down Expand Up @@ -129,8 +127,6 @@ public S3Writer(LogStream logStream, S3WriterConfig s3WriterConfig, S3Uploader s
private void initialize() {
this.maxFileSizeMB = s3WriterConfig.getMaxFileSizeMB();
this.minUploadTime = s3WriterConfig.getMinUploadTimeInSeconds();
this.maxRetries = s3WriterConfig.getMaxRetries();

if (s3WriterConfig.isSetFilenamePattern() && s3WriterConfig.isSetFilenameTokens()) {
this.filenameParsingEnabled = true;
this.filenamePattern = Pattern.compile(s3WriterConfig.getFilenamePattern());
Expand Down Expand Up @@ -182,13 +178,16 @@ public boolean isCommittableWriter() {
}

/**
* Takes the fullPathPrefix and removes all slashes and replaces them with underscores.
* Get or construct buffer file name based on the log stream name.
* The buffer file naming convention is "log_name.dir_name.file_name.buffer".
*
* @return the buffer file name
*/
public static String sanitizeFileName(String fullPathPrefix) {
if (fullPathPrefix.startsWith("/")) {
fullPathPrefix = fullPathPrefix.substring(1);
}
return fullPathPrefix.replace("/", "_");
public String getBufferFileName() {
return (bufferFile != null) ? bufferFile.getName()
: logName + "." + logStream.getLogDir().substring(1)
.replace("/", "_") + "."
+ logStream.getLogStreamName() + ".buffer";
}

/**
Expand All @@ -203,13 +202,13 @@ public static String sanitizeFileName(String fullPathPrefix) {
public synchronized void startCommit(boolean isDraining) throws LogStreamWriterException {
try {
if (!bufferFile.exists()) {
bufferFile.createNewFile();
resetBufferFile();
}

bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(bufferFile, true));

} catch (IOException e) {
throw new RuntimeException("Failed to create buffer file: " + bufferFile.getName(), e);
throw new RuntimeException("Failed to create buffer file: " + getBufferFileName(), e);
}
if (uploadFuture == null) {
scheduleUploadTask();
Expand Down Expand Up @@ -250,7 +249,7 @@ private void scheduleUploadTask() {
private void uploadDiskBufferedFileToS3() throws IOException {
File
fileToUpload =
new File(BUFFER_DIR, bufferFile.getName() + "." + FORMATTER.format(new Date()));
new File(BUFFER_DIR, getBufferFileName() + "." + FORMATTER.format(new Date()));
String fileFormat = generateS3ObjectKey();
try {
Files.move(bufferFile.toPath(), fileToUpload.toPath());
Expand All @@ -266,7 +265,7 @@ private void uploadDiskBufferedFileToS3() throws IOException {
}
fileToUpload.delete();
} catch (IOException e) {
LOG.error("Failed to rename buffer file " + bufferFile.getName(), e);
LOG.error("Failed to rename buffer file " + getBufferFileName(), e);
}
}

Expand Down Expand Up @@ -318,24 +317,15 @@ private void writeMessageToBuffer(LogMessageAndPosition logMessageAndPosition)
* @throws IOException
*/
private void resetBufferFile() throws IOException {
  // Use the name from getBufferFileName() only. Creating a randomly-suffixed file first would
  // make getBufferFileName() return that random name (bufferFile would be non-null), so an
  // existing buffer file for this log stream could never be found again.
  bufferFile = new File(BUFFER_DIR, getBufferFileName());
  if (!bufferFile.createNewFile()) {
    // createNewFile() returns false when the file is already present; keep it and append.
    LOG.info(
        "Buffer file for log stream {} already exists, continue with existing buffer file: {}",
        logName, getBufferFileName());
  }
  // Append mode preserves whatever an existing buffer file already contains.
  bufferedOutputStream = new BufferedOutputStream(new FileOutputStream(bufferFile, true));
}

/**
 * Helper function to get the remaining part of the host name after the cluster prefix,
 * typically a UUID.
 *
 * <p>The input is split on "-" and the last piece is returned. An input without any dash is
 * returned unchanged, and an input made up only of dashes yields an empty string.
 *
 * @param inputStr the host name to inspect; must not be null
 * @return the text after the last "-" separator, or an empty string if there is none
 */
public static String extractHostSuffix(String inputStr) {
  String[] parts = inputStr.split("-");
  // String.split discards trailing empty strings, so "-" or "---" produces an empty array;
  // indexing it would throw ArrayIndexOutOfBoundsException.
  if (parts.length == 0) {
    return "";
  }
  return parts[parts.length - 1];
}

private Matcher extractTokensFromFilename(String logFileName) {
Matcher matcher = filenamePattern.matcher(logFileName);
if (!matcher.matches()) {
Expand Down Expand Up @@ -507,7 +497,7 @@ public void close() throws IOException {
uploadDiskBufferedFileToS3();
bufferFile.delete();
} catch (IOException e) {
LOG.error("Failed to close bufferedWriter or upload buffer file: " + bufferFile.getName(),
LOG.error("Failed to close bufferedWriter or upload buffer file: " + getBufferFileName(),
e);
}
}
Expand Down
122 changes: 42 additions & 80 deletions singer/src/test/java/com/pinterest/singer/writer/s3/S3WriterTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,7 @@ public void setUp() {
s3WriterConfig.setMaxRetries(3);

// Initialize the S3Writer with mock dependencies
s3Writer =
new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
}

@After
Expand All @@ -75,62 +74,7 @@ public void tearDown() throws IOException {
}
// reset hostname
SingerUtils.setHostname(SingerUtils.getHostname(), "-");
}

@Test
public void testSanitizeFileName() {
  // Each row is {path prefix passed in, sanitized name expected back}.
  String[][] cases = {
      {"/var/logs/app", "var_logs_app"},   // leading slash dropped
      {"var/logs/app", "var_logs_app"},    // no leading slash
      {"/var/logs/app/", "var_logs_app_"}, // trailing slash becomes an underscore
      {"/", ""},                           // lone slash
      {"", ""},                            // empty input
  };
  for (String[] testCase : cases) {
    String sanitized = s3Writer.sanitizeFileName(testCase[0]);
    assertEquals(testCase[1], sanitized);
  }
}

@Test
public void testExtractHostSuffix() {
String hostname = "app-server-12345";
String expected = "12345";
String result = S3Writer.extractHostSuffix(hostname);
assertEquals(expected, result);

hostname = "app-12345";
expected = "12345";
result = S3Writer.extractHostSuffix(hostname);
assertEquals(expected, result);

hostname = "12345";
expected = "12345";
result = S3Writer.extractHostSuffix(hostname);
assertEquals(expected, result);

hostname = "app-server";
expected = "server";
result = S3Writer.extractHostSuffix(hostname);
assertEquals(expected, result);

hostname = "";
expected = "";
result = S3Writer.extractHostSuffix(hostname);
assertEquals(expected, result);
s3Writer.close();
}

@Test
Expand All @@ -146,21 +90,8 @@ public void testWriteLogMessageToCommit() throws Exception {
s3Writer.endCommit(1, false);

// Verify that the messages are written to the buffer file
String
bufferFileNamePrefix =
s3Writer.sanitizeFileName(logStream.getFullPathPrefix()) + ".buffer.";
File tmpDir = new File(tempPath);
File bufferFile = null;
File [] tmpFiles = tmpDir.listFiles();
boolean bufferFileExists = false;
for (File file : tmpFiles) {
if (file.getName().startsWith(bufferFileNamePrefix)) {
bufferFileExists = true;
bufferFile = file;
break;
}
}
assertTrue(bufferFileExists);
File bufferFile = new File(tempPath + "/" + s3Writer.getBufferFileName());
assertTrue(bufferFile.exists());
String content = new String(Files.readAllBytes(bufferFile.toPath()));
assertTrue(content.contains("test message"));
}
Expand Down Expand Up @@ -210,6 +141,37 @@ public void testUploadIsScheduled() throws Exception {
verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class));
}

@Test
public void testResumeFromExistingBufferFile() throws Exception {
  // Fixed charset so the byte/String round trip does not depend on the platform default.
  // Fully qualified so this method does not rely on an import that may be absent.
  final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;

  // Prepare log message
  ByteBuffer messageBuffer = ByteBuffer.wrap("This is message 1 :".getBytes(utf8));
  LogMessage logMessage = new LogMessage(messageBuffer);
  LogMessageAndPosition logMessageAndPosition = new LogMessageAndPosition(logMessage, null);

  // Write log message to commit
  s3Writer.startCommit(false);
  s3Writer.writeLogMessageToCommit(logMessageAndPosition, false);

  // Create a new S3Writer for the same log stream and buffer directory and write another
  // message to simulate resuming
  S3Writer newS3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
  try {
    messageBuffer = ByteBuffer.wrap(" This is message 2".getBytes(utf8));
    logMessage = new LogMessage(messageBuffer);
    logMessageAndPosition = new LogMessageAndPosition(logMessage, null);

    // Write log message to commit
    newS3Writer.startCommit(false);
    newS3Writer.writeLogMessageToCommit(logMessageAndPosition, false);

    // Verify that both messages ended up, in order, in the same buffer file
    File bufferFile = new File(tempPath, newS3Writer.getBufferFileName());
    assertTrue(bufferFile.exists());
    String content = new String(Files.readAllBytes(bufferFile.toPath()), utf8);
    assertTrue(content.contains("This is message 1 : This is message 2"));
  } finally {
    // Close even when an assertion fails so the second writer is not left open for the
    // tests that run afterwards.
    newS3Writer.close();
  }
}

@Test
public void testObjectKeyGeneration() {
// Custom and default tokens used
Expand All @@ -223,24 +185,25 @@ public void testObjectKeyGeneration() {
s3WriterConfig.setBucket("bucket-name");
s3WriterConfig.setFilenamePattern("(?<namespace>[^-]+)-(?<filename>[^.]+)\\.(?<index>\\d+)");
s3WriterConfig.setFilenameTokens(Arrays.asList("namespace", "filename", "index"));
s3Writer =
new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);

// Check key prefix
String[] objectKeyParts = s3Writer.generateS3ObjectKey().split("/");
assertEquals(4, objectKeyParts.length);
assertEquals("my-path", objectKeyParts[0]);
assertEquals("my_namespace", objectKeyParts[1]);
assertEquals(logStream.getSingerLog().getSingerLogConfig().getName(), objectKeyParts[2]);

// Check last part of object key
String[] keySuffixParts = objectKeyParts[3].split("\\.");
assertEquals(3, keySuffixParts.length);
assertEquals("test_log-0", keySuffixParts[0]);
assertNotEquals("{{S}}", keySuffixParts[1]);
assertEquals(2, keySuffixParts[1].length());

// Custom tokens provided but filename pattern does not match
s3WriterConfig.setFilenamePattern("(?<filename>[^.]+)\\.(?<index>\\d+).0");
s3Writer =
new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
s3Writer = new S3Writer(logStream, s3WriterConfig, mockS3Uploader, tempPath);
objectKeyParts = s3Writer.generateS3ObjectKey().split("/");
assertEquals("%{namespace}", objectKeyParts[1]);
keySuffixParts = objectKeyParts[3].split("\\.");
Expand Down Expand Up @@ -280,10 +243,9 @@ public void testClose() throws Exception {

// Verify that the buffer file was correctly handled
String
bufferFileName =
s3Writer.sanitizeFileName(logStream.getFullPathPrefix()) + ".buffer.log";
bufferFileName = s3Writer.getBufferFileName();
File bufferFile = new File(FilenameUtils.concat(tempPath, bufferFileName));
assertTrue(!bufferFile.exists());
assertFalse(bufferFile.exists());
assertEquals(0, bufferFile.length());
verify(mockS3Uploader, atLeastOnce()).upload(any(S3ObjectUpload.class));
}
Expand Down

0 comments on commit f1f009b

Please sign in to comment.