Skip to content

Commit

Permalink
Added streaming chunk support
Browse files Browse the repository at this point in the history
While working on Sediment, I realized that I was causing one extra copy
of data when writing to the WAL than was needed, since internally the
WAL already has a buffered writer. This commit adds the ability to write
a chunk using a type that implements io::Write, provided that you know
the length of the chunk ahead of time.

The main change to support this is to move the crc from the header of
the chunk to be the tail end of the chunk instead.
  • Loading branch information
ecton committed Jan 2, 2023
1 parent 77b2764 commit 9946e27
Show file tree
Hide file tree
Showing 4 changed files with 138 additions and 39 deletions.
93 changes: 84 additions & 9 deletions src/entry.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
use std::io::{self, Write};

use crc32c::crc32c_append;
use parking_lot::MutexGuard;

use crate::{
log_file::{LogFile, LogFileWriter},
to_io_result::ToIoResult,
Expand Down Expand Up @@ -96,23 +99,34 @@ impl<'a> EntryWriter<'a> {
/// Appends a chunk of data to this log entry. Each chunk of data is able to
/// be read using [`Entry::read_chunk`](crate::Entry).
pub fn write_chunk(&mut self, data: &[u8]) -> io::Result<ChunkRecord> {
let length = u32::try_from(data.len()).to_io()?;
let mut writer = self.begin_chunk(u32::try_from(data.len()).to_io()?)?;
writer.write_all(data)?;
writer.finish()
}

/// Begins writing a chunk with the given `length`.
///
/// The writer returned already contains an internal buffer. This function
/// can be used to write a complex payload without needing to first
/// combine it in another buffer.
pub fn begin_chunk(&mut self, length: u32) -> io::Result<ChunkWriter<'_>> {
let mut file = self.file.as_ref().expect("already dropped").lock();

let position = LogPosition {
file_id: file.id(),
offset: file.position(),
};
let crc = crc32c::crc32c(data);
let mut header = [CHUNK; 9];
header[1..5].copy_from_slice(&crc.to_le_bytes());
header[5..].copy_from_slice(&length.to_le_bytes());
file.write_all(&header)?;
file.write_all(data)?;

Ok(ChunkRecord {
file.write_all(&[CHUNK])?;
file.write_all(&length.to_le_bytes())?;

Ok(ChunkWriter {
file,
position,
crc,
length,
bytes_remaining: length,
crc32: 0,
finished: false,
})
}
}
Expand All @@ -125,6 +139,67 @@ impl<'a> Drop for EntryWriter<'a> {
}
}

pub struct ChunkWriter<'a> {
file: MutexGuard<'a, LogFileWriter>,
position: LogPosition,
length: u32,
bytes_remaining: u32,
crc32: u32,
finished: bool,
}

impl<'a> ChunkWriter<'a> {
pub fn finish(mut self) -> io::Result<ChunkRecord> {
self.write_tail()?;
Ok(ChunkRecord {
position: self.position,
crc: self.crc32,
length: self.length,
})
}

fn write_tail(&mut self) -> io::Result<()> {
self.finished = true;

if self.bytes_remaining != 0 {
return Err(io::Error::new(
io::ErrorKind::Other,
"written length does not match expected length",
));
}

self.file.write_all(&self.crc32.to_le_bytes())
}
}

impl<'a> Drop for ChunkWriter<'a> {
fn drop(&mut self) {
if !self.finished {
self.write_tail()
.expect("chunk writer dropped without finishing");
}
}
}

impl<'a> Write for ChunkWriter<'a> {
fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
let bytes_to_write = buf
.len()
.min(usize::try_from(self.bytes_remaining).to_io()?);

let bytes_written = self.file.write(&buf[..bytes_to_write])?;
if bytes_written > 0 {
self.bytes_remaining -= u32::try_from(bytes_written).to_io()?;
self.crc32 = crc32c_append(self.crc32, &buf[..bytes_written]);
}
Ok(bytes_written)
}

fn flush(&mut self) -> io::Result<()> {
Ok(())
}
}

/// The position of a chunk of data within a [`WriteAheadLog`].
#[derive(Debug, Clone, Copy, Eq, PartialEq)]
pub struct LogPosition {
Expand Down
31 changes: 20 additions & 11 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl WriteAheadLog {
Recovery::Recover => {
while let Some(mut entry) = reader.read_entry()? {
manager.recover(&mut entry)?;
while let Some(mut chunk) = match entry.read_chunk()? {
while let Some(chunk) = match entry.read_chunk()? {
ReadChunkResult::Chunk(chunk) => Some(chunk),
ReadChunkResult::EndOfEntry | ReadChunkResult::AbortedEntry => None,
} {
Expand Down Expand Up @@ -368,14 +368,13 @@ impl WriteAheadLog {
)?;
file.seek(SeekFrom::Start(position.offset))?;
let mut reader = BufReader::new(file);
let mut header_bytes = [0; 9];
let mut header_bytes = [0; 5];
reader.read_exact(&mut header_bytes)?;
let crc32 = u32::from_le_bytes(header_bytes[1..5].try_into().expect("u32 is 4 bytes"));
let length = u32::from_le_bytes(header_bytes[5..9].try_into().expect("u32 is 4 bytes"));
let length = u32::from_le_bytes(header_bytes[1..5].try_into().expect("u32 is 4 bytes"));

Ok(ChunkReader {
reader,
stored_crc32: crc32,
stored_crc32: None,
length,
bytes_remaining: length,
read_crc32: 0,
Expand Down Expand Up @@ -438,7 +437,7 @@ pub struct ChunkReader {
reader: BufReader<File>,
bytes_remaining: u32,
length: u32,
stored_crc32: u32,
stored_crc32: Option<u32>,
read_crc32: u32,
}

Expand All @@ -462,13 +461,23 @@ impl ChunkReader {
/// Returns true if the stored checksum matches the computed checksum during
/// read.
///
/// This function returns `None` if the chunk hasn't been read completely.
#[must_use]
pub const fn crc_is_valid(&self) -> Option<bool> {
/// This function will only return `Ok()` if the chunk has been fully read.
pub fn crc_is_valid(&mut self) -> io::Result<bool> {
if self.bytes_remaining == 0 {
Some(self.stored_crc32 == self.read_crc32)
if self.stored_crc32.is_none() {
let mut stored_crc32 = [0; 4];
// Bypass our internal read, otherwise our crc would include the
// crc read itself.
self.reader.read_exact(&mut stored_crc32)?;
self.stored_crc32 = Some(u32::from_le_bytes(stored_crc32));
}

Ok(self.stored_crc32.expect("already initialized") == self.read_crc32)
} else {
None
Err(io::Error::new(
io::ErrorKind::Other,
"crc cannot be checked before reading all chunk bytes",
))
}
}
}
Expand Down
50 changes: 32 additions & 18 deletions src/log_file.rs
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,7 @@ impl SegmentReader {
if let Some(id) = self.current_entry_id.take() {
// Skip the remainder of the current entry.
let mut entry = Entry { id, reader: self };
while let Some(mut chunk) = match entry.read_chunk()? {
while let Some(chunk) = match entry.read_chunk()? {
ReadChunkResult::Chunk(chunk) => Some(chunk),
_ => None,
} {
Expand Down Expand Up @@ -420,7 +420,7 @@ impl<'entry> Entry<'entry> {

match self.reader.file.buffer().first().copied() {
Some(CHUNK) => {
let mut header_bytes = [0; 9];
let mut header_bytes = [0; 5];
let offset = self.reader.file.stream_position()?;
self.reader.file.read_exact(&mut header_bytes)?;
Ok(ReadChunkResult::Chunk(EntryChunk {
Expand All @@ -430,11 +430,9 @@ impl<'entry> Entry<'entry> {
},
entry: self,
calculated_crc: 0,
stored_crc: u32::from_le_bytes(
header_bytes[1..5].try_into().expect("u32 is 4 bytes"),
),
stored_crc32: None,
bytes_remaining: u32::from_le_bytes(
header_bytes[5..].try_into().expect("u32 is 4 bytes"),
header_bytes[1..5].try_into().expect("u32 is 4 bytes"),
),
}))
}
Expand Down Expand Up @@ -498,8 +496,8 @@ pub struct EntryChunk<'chunk, 'entry> {
entry: &'chunk mut Entry<'entry>,
position: LogPosition,
bytes_remaining: u32,
stored_crc: u32,
calculated_crc: u32,
stored_crc32: Option<u32>,
}

impl<'chunk, 'entry> EntryChunk<'chunk, 'entry> {
Expand All @@ -516,14 +514,24 @@ impl<'chunk, 'entry> EntryChunk<'chunk, 'entry> {
}

/// Returns true if the CRC has been validated, or false if the computed crc
/// is different than the stored crc. Returns `None` if not all data has
/// been read yet.
#[must_use]
pub const fn check_crc(&self) -> Option<bool> {
if self.bytes_remaining > 0 {
None
/// is different than the stored crc. Returns an error if the chunk has not
/// been fully read yet.
pub fn check_crc(&mut self) -> io::Result<bool> {
if self.bytes_remaining == 0 {
if self.stored_crc32.is_none() {
let mut stored_crc32 = [0; 4];
// Bypass our internal read, otherwise our crc would include the
// crc read itself.
self.entry.reader.file.read_exact(&mut stored_crc32)?;
self.stored_crc32 = Some(u32::from_le_bytes(stored_crc32));
}

Ok(self.stored_crc32.expect("already initialized") == self.calculated_crc)
} else {
Some(self.stored_crc == self.calculated_crc)
Err(io::Error::new(
io::ErrorKind::Other,
"crc cannot be checked before reading all chunk bytes",
))
}
}

Expand All @@ -535,12 +543,18 @@ impl<'chunk, 'entry> EntryChunk<'chunk, 'entry> {
}

/// Advances past the end of this chunk without reading the remaining bytes.
pub fn skip_remaining_bytes(&mut self) -> io::Result<()> {
if self.bytes_remaining > 0 {
pub fn skip_remaining_bytes(mut self) -> io::Result<()> {
self.skip_remaining_bytes_internal()
}

/// Advances past the end of this chunk without reading the remaining bytes.
fn skip_remaining_bytes_internal(&mut self) -> io::Result<()> {
if self.bytes_remaining > 0 || self.stored_crc32.is_none() {
// Skip past the remaining bytes plus the crc.
self.entry
.reader
.file
.seek(SeekFrom::Current(i64::from(self.bytes_remaining)))?;
.seek(SeekFrom::Current(i64::from(self.bytes_remaining + 4)))?;
self.bytes_remaining = 0;
}
Ok(())
Expand All @@ -565,7 +579,7 @@ impl<'chunk, 'entry> Read for EntryChunk<'chunk, 'entry> {

impl<'chunk, 'entry> Drop for EntryChunk<'chunk, 'entry> {
fn drop(&mut self) {
self.skip_remaining_bytes()
self.skip_remaining_bytes_internal()
.expect("error while skipping remaining bytes");
}
}
Expand Down
3 changes: 2 additions & 1 deletion src/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,7 @@ fn basic() {
let wal = WriteAheadLog::recover(&dir, checkpointer.clone()).unwrap();

let invocations = checkpointer.invocations.lock();
println!("Invocations: {:?}", invocations);
assert_eq!(invocations.len(), 2);
match &invocations[0] {
CheckpointCall::ShouldRecoverSegment { version_info } => {
Expand All @@ -113,7 +114,7 @@ fn basic() {
let mut buffer = Vec::new();
reader.read_to_end(&mut buffer).unwrap();
assert_eq!(buffer, message);
assert_eq!(reader.crc_is_valid(), Some(true));
assert!(reader.crc_is_valid().expect("error validating crc"));
}

#[derive(Debug, Default, Clone)]
Expand Down

0 comments on commit 9946e27

Please sign in to comment.