Feature: treat trailing zeros as unfinished write and truncate the file

Trailing zeros can occur if EXT4 is mounted in `data=writeback` mode,
in which data and metadata (file length) may be written to disk in
arbitrary order.

In such a case, the zero bytes starting at the first un-decodable
WALRecord are truncated, and the chunk is considered successfully opened.
drmingdrmer committed Dec 12, 2024
1 parent faa4fd9 commit 757542b
Showing 4 changed files with 246 additions and 2 deletions.
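
The commit message above describes a simple recovery rule: an all-zero tail after the last decodable record is an unfinished write and may be truncated, while any non-zero byte in the tail indicates damage. Below is a minimal, self-contained sketch of that rule using only the standard library; `RecoveryAction`, `decide_recovery`, and the demo file layout are illustrative assumptions, not this crate's API.

use std::fs::OpenOptions;
use std::io::{self, Read, Seek, SeekFrom, Write};

#[derive(Debug)]
enum RecoveryAction {
    Truncate,     // all-zero tail: unfinished write, safe to cut off
    ReportDamage, // non-zero tail: real corruption, surface the error
}

fn decide_recovery(tail: &[u8]) -> RecoveryAction {
    if tail.iter().all(|&b| b == 0) {
        RecoveryAction::Truncate
    } else {
        RecoveryAction::ReportDamage
    }
}

fn main() -> io::Result<()> {
    // Simulate `data=writeback`: the file length reaches disk before the
    // record bytes do, leaving a zero-filled tail after a crash.
    let path = std::env::temp_dir().join("wal_tail_demo");
    let mut f = OpenOptions::new()
        .read(true)
        .write(true)
        .create(true)
        .truncate(true)
        .open(&path)?;
    f.write_all(b"record-1")?; // the decodable prefix (8 bytes)
    f.set_len(8 + 4096)?; // length extended, data never written => zeros

    // Read everything after the last decodable record.
    let mut tail = Vec::new();
    f.seek(SeekFrom::Start(8))?;
    f.read_to_end(&mut tail)?;

    match decide_recovery(&tail) {
        RecoveryAction::Truncate => {
            f.set_len(8)?; // drop the unfinished write
            f.sync_all()?;
        }
        RecoveryAction::ReportDamage => {
            return Err(io::Error::new(io::ErrorKind::InvalidData, "damaged tail"));
        }
    }

    std::fs::remove_file(&path)?;
    Ok(())
}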
112 changes: 112 additions & 0 deletions src/chunk/mod.rs
@@ -6,6 +6,8 @@ mod record_iterator;
use std::fs::File;
use std::fs::OpenOptions;
use std::io;
use std::io::BufReader;
use std::io::Read;
use std::io::Seek;
use std::marker::PhantomData;
use std::sync::Arc;
@@ -14,6 +16,8 @@ use codeq::error_context_ext::ErrorContextExt;
use codeq::Decode;
use codeq::OffsetSize;
use codeq::Segment;
use log::error;
use log::warn;
use record_iterator::RecordIterator;

use crate::chunk::chunk_id::ChunkId;
@@ -33,6 +37,13 @@ pub struct Chunk<T> {
/// Global offset means the offset since the first chunk, not this chunk.
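///
/// For example, a record at local offset 66 within chunk `ChunkId(509)`
/// has global offset `509 + 66 = 575`.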
pub(crate) global_offsets: Vec<u64>,

/// If the chunk truncated an unfinished write, this field is set to the
/// file size before truncation.
///
/// For testing purposes only.
#[allow(dead_code)]
pub(crate) truncated: Option<u64>,

pub(crate) _p: PhantomData<T>,
}

@@ -103,11 +114,13 @@ where T: Types
) -> Result<(Self, Vec<WALRecord<T>>), io::Error> {
let f = Self::open_chunk_file(&config, chunk_id)?;
let arc_f = Arc::new(f);
let file_size = arc_f.metadata()?.len();
let it = Self::load_records_iter(&config, arc_f.clone(), chunk_id)?;

let mut record_offsets = vec![chunk_id.offset()];
let mut records = Vec::new();
let mut truncate = false;
let mut truncated = None;

for res in it {
match res {
@@ -116,13 +129,43 @@ where T: Types
records.push(record);
}
Err(io_err) => {
let global_offset = record_offsets.last().copied().unwrap();

if io_err.kind() == io::ErrorKind::UnexpectedEof {
// Incomplete record, discard it and all the following
// records.
truncate = config.truncate_incomplete_record();
if truncate {
break;
}
} else {
// Maybe a damaged record, or an unfinished write with
// trailing zeros.
//
// Trailing zeros can occur if EXT4 is mounted in
// `data=writeback` mode, in which data and metadata
// (file length) may be written to disk in arbitrary order.

let all_zero = Self::verify_trailing_zeros(
arc_f.clone(),
global_offset - chunk_id.offset(),
chunk_id,
)?;

if all_zero {
warn!(
"Trailing zeros detected at {} in chunk {}; Treat it as unfinished write",
global_offset,
chunk_id
);
truncate = config.truncate_incomplete_record();
if truncate {
break;
}
} else {
error!("Found damaged bytes: {}", io_err);
}
}

return Err(io_err);
@@ -134,17 +177,86 @@ where T: Types
arc_f
.set_len(*record_offsets.last().unwrap() - chunk_id.offset())?;
arc_f.sync_all()?;
truncated = Some(file_size);
}

let chunk = Self {
f: arc_f,
global_offsets: record_offsets,
truncated,
_p: Default::default(),
};

Ok((chunk, records))
}

/// Checks if a file contains only zero bytes from a specified offset to the
/// end.
///
/// This is used to distinguish partially written data from corruption: it
/// reads the file in chunks and verifies that every byte after the given
/// offset is zero, which indicates an incomplete or interrupted write whose
/// remaining space was zero-filled.
fn verify_trailing_zeros(
mut file: Arc<File>,
mut start_offset: u64,
chunk_id: ChunkId,
) -> Result<bool, io::Error> {
let file_size = file.metadata()?.len();

if start_offset > file_size {
return Err(io::Error::new(
io::ErrorKind::InvalidInput,
format!(
"Start offset {} exceeds file size {}",
start_offset, file_size
),
));
}

if file_size == start_offset {
return Ok(true);
}

const WARN_THRESHOLD: u64 = 64 * 1024; // 64KB
if file_size - start_offset > WARN_THRESHOLD {
warn!(
"Large maybe damaged section detected: {} bytes to the end; in chunk {}",
file_size - start_offset,
chunk_id
);
}

file.seek(io::SeekFrom::Start(start_offset))?;

const BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16MB
const READ_CHUNK_SIZE: usize = 1024; // 1KB
let mut reader = BufReader::with_capacity(BUFFER_SIZE, file);
let mut buffer = vec![0; READ_CHUNK_SIZE];

loop {
let n = reader.read(&mut buffer)?;
if n == 0 {
break;
}

for (i, byt) in buffer.iter().enumerate().take(n) {
if *byt != 0 {
error!(
"Non-zero byte detected at offset {} in chunk {}",
start_offset + i as u64,
chunk_id
);
return Ok(false);
}
}

start_offset += n as u64;
}
Ok(true)
}

#[allow(clippy::type_complexity)]
pub(crate) fn dump(
config: &Config,
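A note on the scanning strategy in `verify_trailing_zeros` above: the file is wrapped in a `BufReader` with a 16 MB capacity while the scan itself proceeds through a 1 KB buffer. Presumably the large capacity amortizes read syscalls over a potentially long zero tail, while the small scan buffer keeps the comparison loop simple and lets the function report the exact offset of the first non-zero byte; the separate 64 KB warning threshold flags suspect regions large enough that genuine corruption becomes more likely than an interrupted append.
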
1 change: 1 addition & 0 deletions src/chunk/open_chunk.rs
@@ -38,6 +38,7 @@ where T: Types
let chunk = Chunk {
f: Arc::new(f),
global_offsets: record_offsets,
truncated: None,
_p: Default::default(),
};

4 changes: 2 additions & 2 deletions src/raft_log/raft_log.rs
@@ -181,13 +181,13 @@ impl<T: Types> RaftLog<T> {
let dir_lock = file_lock::FileLock::new(config.clone())
.context(|| format!("open RaftLog in '{}'", config.dir))?;

- let offsets = Self::load_chunk_ids(&config)?;
+ let chunk_ids = Self::load_chunk_ids(&config)?;

let mut sm = RaftLogStateMachine::new(&config);
let mut closed = BTreeMap::new();
let mut prev_end_offset = None;

- for chunk_id in offsets {
+ for chunk_id in chunk_ids {
if let Some(prev_end) = prev_end_offset {
if prev_end != chunk_id.offset() {
let message = format!(
131 changes: 131 additions & 0 deletions src/tests/test_raft_log.rs
@@ -534,6 +534,137 @@ ChunkId(00_000_000_000_000_000_610)
Ok(())
}

/// The last record is discarded if it was not completely written and is
/// filled with zeros.
///
/// Trailing zeros can occur if EXT4 is mounted in `data=writeback` mode,
/// in which data and metadata (file length) may be written to disk in
/// arbitrary order.
#[test]
fn test_re_open_unfinished_tailing_zero_chunk() -> Result<(), io::Error> {
for append_zeros in [3, 1024 * 33] {
let mut ctx = TestContext::new()?;
let config = &mut ctx.config;

config.chunk_max_records = Some(5);

let (state, logs) = {
let mut rl = ctx.new_raft_log()?;
build_sample_data_purge_upto_3(&mut rl)?;

(
rl.log_state().clone(),
rl.read(0, 1000).collect::<Result<Vec<_>, _>>()?,
)
};

// Append several zero bytes
{
let chunk_id = ChunkId(509);
let f = Chunk::<TestTypes>::open_chunk_file(&ctx.config, chunk_id)?;
f.set_len(129 + append_zeros)?;
}

// Re-open
{
let rl = ctx.new_raft_log()?;

let last_closed = rl.wal.closed.last_key_value().unwrap().1;
assert_eq!(last_closed.chunk.truncated, Some(129 + append_zeros));

assert_eq!(state, rl.log_state().clone());
assert_eq!(logs, rl.read(0, 1000).collect::<Result<Vec<_>, _>>()?);

let dump = rl.dump().write_to_string()?;
println!("After reopen:\n{}", dump);

assert_eq!(
indoc! {r#"
RaftLog:
ChunkId(00_000_000_000_000_000_324)
R-00000: [000_000_000, 000_000_050) 50: State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
R-00001: [000_000_050, 000_000_078) 28: PurgeUpto((1, 1))
R-00002: [000_000_078, 000_000_115) 37: Append((2, 4), "world")
R-00003: [000_000_115, 000_000_150) 35: Append((2, 5), "foo")
R-00004: [000_000_150, 000_000_185) 35: Append((2, 6), "bar")
ChunkId(00_000_000_000_000_000_509)
R-00000: [000_000_000, 000_000_066) 66: State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
R-00001: [000_000_066, 000_000_101) 35: Append((2, 7), "wow")
R-00002: [000_000_101, 000_000_129) 28: PurgeUpto((2, 3))
ChunkId(00_000_000_000_000_000_638)
R-00000: [000_000_000, 000_000_066) 66: State(RaftLogState { vote: None, last: Some((2, 7)), committed: Some((1, 2)), purged: Some((2, 3)), user_data: None })
"#},
dump
);
}
}

Ok(())
}

#[test]
fn test_re_open_unfinished_tailing_not_all_zero_chunk() -> Result<(), io::Error>
{
let append_zeros = 1024 * 32;

let mut ctx = TestContext::new()?;
let config = &mut ctx.config;

config.chunk_max_records = Some(5);

{
let mut rl = ctx.new_raft_log()?;
build_sample_data_purge_upto_3(&mut rl)?;
}

// Append several zero bytes followed by a one
{
let chunk_id = ChunkId(509);
let mut f = Chunk::<TestTypes>::open_chunk_file(&ctx.config, chunk_id)?;
f.set_len(129 + append_zeros)?;

f.seek(io::SeekFrom::Start(129 + append_zeros))?;
f.write_u8(1)?;
}

// Re-open
{
let res = ctx.new_raft_log();
assert!(res.is_err());
assert_eq!(
"crc32 checksum mismatch: expected fd59b8d, got 0, \
while Record::decode(); \
when:(decode Record at offset 129); \
when:(iterate ChunkId(00_000_000_000_000_000_509))",
res.unwrap_err().to_string()
);

let dump =
Dump::<TestTypes>::new(ctx.arc_config())?.write_to_string()?;
println!("After reopen:\n{}", dump);

assert_eq!(
indoc! {r#"
RaftLog:
ChunkId(00_000_000_000_000_000_324)
R-00000: [000_000_000, 000_000_050) 50: State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
R-00001: [000_000_050, 000_000_078) 28: PurgeUpto((1, 1))
R-00002: [000_000_078, 000_000_115) 37: Append((2, 4), "world")
R-00003: [000_000_115, 000_000_150) 35: Append((2, 5), "foo")
R-00004: [000_000_150, 000_000_185) 35: Append((2, 6), "bar")
ChunkId(00_000_000_000_000_000_509)
R-00000: [000_000_000, 000_000_066) 66: State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
R-00001: [000_000_066, 000_000_101) 35: Append((2, 7), "wow")
R-00002: [000_000_101, 000_000_129) 28: PurgeUpto((2, 3))
Error: crc32 checksum mismatch: expected fd59b8d, got 0, while Record::decode(); when:(decode Record at offset 129); when:(iterate ChunkId(00_000_000_000_000_000_509))
"#},
dump
);
}

Ok(())
}

/// A damaged last record of a non-last chunk will not be truncated, but is
/// considered damage.
#[test]
Expand Down
