Skip to content

Commit

Permalink
One filesystem per outputstream.
Browse files Browse the repository at this point in the history
  • Loading branch information
duongkame committed Jul 12, 2024
1 parent 440477a commit 1d273df
Showing 1 changed file with 26 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FSDataOutputStream;
Expand Down Expand Up @@ -77,12 +78,14 @@ public class HsyncGenerator extends BaseFreonGenerator implements Callable<Void>
private Timer timer;

private OzoneConfiguration configuration;
private FileSystem fileSystem;
private FileSystem[] fileSystems;
private FSDataOutputStream[] outputStreams;
private byte[] data;
private Path[] files;
private AtomicInteger[] callsPerFile;

public HsyncGenerator() {
}
private byte[] data;

@VisibleForTesting
HsyncGenerator(OzoneConfiguration ozoneConfiguration) {
Expand All @@ -97,13 +100,20 @@ public Void call() throws Exception {
configuration = freon.createOzoneConfiguration();
}
URI uri = URI.create(rootPath);
fileSystem = FileSystem.get(uri, configuration);

fileSystems = new FileSystem[numberOfFiles];
outputStreams = new FSDataOutputStream[numberOfFiles];
files = new Path[numberOfFiles];
callsPerFile = new AtomicInteger[numberOfFiles];
for (int i = 0; i < numberOfFiles; i++) {
FileSystem fileSystem = FileSystem.get(uri, configuration);
Path file = new Path(rootPath + "/" + generateObjectName(i));
fileSystem.mkdirs(file.getParent());
outputStreams[i] = fileSystem.create(file);
fileSystems[i] = fileSystem;
files[i] = file;
callsPerFile[i] = new AtomicInteger();

LOG.info("Created file for testing: {}", file);
}

Expand All @@ -116,17 +126,28 @@ public Void call() throws Exception {
for (FSDataOutputStream outputStream : outputStreams) {
outputStream.close();
}
fileSystem.close();
for (FileSystem fs : fileSystems) {
fs.close();
}
}

StringBuilder distributionReport = new StringBuilder();
for (int i = 0; i < numberOfFiles; i++) {
distributionReport.append("\t").append(files[i]).append(": ").append(callsPerFile[i]).append("\n");
}

LOG.info("Hsync generator finished, calls distribution: \n {}", distributionReport);

return null;
}

private void sendHsync(long counter) throws Exception {
timer.time(() -> {
FSDataOutputStream outputStream = outputStreams[((int) counter) % numberOfFiles];
int i = ((int) counter) % numberOfFiles;
FSDataOutputStream outputStream = outputStreams[i];
outputStream.write(data);
outputStream.hsync();
callsPerFile[i].incrementAndGet();
return null;
});
}
Expand Down

0 comments on commit 1d273df

Please sign in to comment.