diff --git a/src/chunk/mod.rs b/src/chunk/mod.rs
index 97ead1b..c79a6d1 100644
--- a/src/chunk/mod.rs
+++ b/src/chunk/mod.rs
@@ -6,6 +6,8 @@ mod record_iterator;
 use std::fs::File;
 use std::fs::OpenOptions;
 use std::io;
+use std::io::BufReader;
+use std::io::Read;
 use std::io::Seek;
 use std::marker::PhantomData;
 use std::sync::Arc;
@@ -14,6 +16,8 @@ use codeq::error_context_ext::ErrorContextExt;
 use codeq::Decode;
 use codeq::OffsetSize;
 use codeq::Segment;
+use log::error;
+use log::warn;
 use record_iterator::RecordIterator;
 
 use crate::chunk::chunk_id::ChunkId;
@@ -33,6 +37,13 @@ pub struct Chunk<T> {
     /// Global offset means the offset since the first chunk, not this chunk.
     pub(crate) global_offsets: Vec<u64>,
 
+    /// If the chunk had an unfinished write that was truncated, this field
+    /// is set to the file size before the truncation.
+    ///
+    /// For testing purposes only.
+    #[allow(dead_code)]
+    pub(crate) truncated: Option<u64>,
+
     pub(crate) _p: PhantomData<T>,
 }
 
@@ -103,11 +114,13 @@ where T: Types
     ) -> Result<(Self, Vec>), io::Error> {
         let f = Self::open_chunk_file(&config, chunk_id)?;
         let arc_f = Arc::new(f);
+        let file_size = arc_f.metadata()?.len();
         let it = Self::load_records_iter(&config, arc_f.clone(), chunk_id)?;
 
         let mut record_offsets = vec![chunk_id.offset()];
         let mut records = Vec::new();
         let mut truncate = false;
+        let mut truncated = None;
 
         for res in it {
             match res {
@@ -116,6 +129,8 @@ where T: Types
                     records.push(record);
                 }
                 Err(io_err) => {
+                    let global_offset = record_offsets.last().copied().unwrap();
+
                     if io_err.kind() == io::ErrorKind::UnexpectedEof {
                         // Incomplete record, discard it and all the following
                         // records.
@@ -123,6 +138,34 @@ where T: Types
                         truncate = config.truncate_incomplete_record();
                         if truncate {
                             break;
                         }
+                    } else {
+                        // Possibly damaged data, or an unfinished write with
+                        // trailing zeros.
+                        //
+                        // Trailing zeros can occur if EXT4 is mounted in
+                        // `data=writeback` mode, in which data and metadata
+                        // (the file length) may be written to disk in
+                        // arbitrary order.
+
+                        let all_zero = Self::verify_trailing_zeros(
+                            arc_f.clone(),
+                            global_offset - chunk_id.offset(),
+                            chunk_id,
+                        )?;
+
+                        if all_zero {
+                            warn!(
+                                "Trailing zeros detected at {} in chunk {}; treating them as an unfinished write",
+                                global_offset,
+                                chunk_id
+                            );
+                            truncate = config.truncate_incomplete_record();
+                            if truncate {
+                                break;
+                            }
+                        } else {
+                            error!("Found damaged bytes: {}", io_err);
+                        }
                     }
                     return Err(io_err);
@@ -134,17 +177,86 @@ where T: Types
             arc_f
                 .set_len(*record_offsets.last().unwrap() - chunk_id.offset())?;
             arc_f.sync_all()?;
+            truncated = Some(file_size);
         }
 
         let chunk = Self {
             f: arc_f,
             global_offsets: record_offsets,
+            truncated,
             _p: Default::default(),
         };
 
         Ok((chunk, records))
     }
 
+    /// Checks if a file contains only zero bytes from a specified offset to
+    /// the end.
+    ///
+    /// This function is used to detect and validate partially written or
+    /// corrupted data. It reads the file in chunks and verifies that all
+    /// bytes after the given offset are zeros. This is particularly useful
+    /// for detecting incomplete or interrupted writes where the remaining
+    /// space may have been zero-filled.
+    fn verify_trailing_zeros(
+        mut file: Arc<File>,
+        mut start_offset: u64,
+        chunk_id: ChunkId,
+    ) -> Result<bool, io::Error> {
+        let file_size = file.metadata()?.len();
+
+        if start_offset > file_size {
+            return Err(io::Error::new(
+                io::ErrorKind::InvalidInput,
+                format!(
+                    "Start offset {} exceeds file size {}",
+                    start_offset, file_size
+                ),
+            ));
+        }
+
+        if file_size == start_offset {
+            return Ok(true);
+        }
+
+        const WARN_THRESHOLD: u64 = 64 * 1024; // 64 KB
+        if file_size - start_offset > WARN_THRESHOLD {
+            warn!(
+                "Large possibly-damaged section detected: {} bytes to the end; in chunk {}",
+                file_size - start_offset,
+                chunk_id
+            );
+        }
+
+        file.seek(io::SeekFrom::Start(start_offset))?;
+
+        const BUFFER_SIZE: usize = 16 * 1024 * 1024; // 16 MB
+        const READ_CHUNK_SIZE: usize = 1024; // 1 KB
+        let mut reader = BufReader::with_capacity(BUFFER_SIZE, file);
+        let mut buffer = vec![0; READ_CHUNK_SIZE];
+
+        loop {
+            let n = reader.read(&mut buffer)?;
+            if n == 0 {
+                break;
+            }
+
+            for (i, byte) in buffer.iter().enumerate().take(n) {
+                if *byte != 0 {
+                    error!(
+                        "Non-zero byte detected at offset {} in chunk {}",
+                        start_offset + i as u64,
+                        chunk_id
+                    );
+                    return Ok(false);
+                }
+            }
+
+            start_offset += n as u64;
+        }
+        Ok(true)
+    }
+
     #[allow(clippy::type_complexity)]
     pub(crate) fn dump(
         config: &Config,
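// ---------------------------------------------------------------------------
// Illustration (editor's sketch, not part of the patch): the "unfinished
// write with trailing zeros" scenario that `verify_trailing_zeros` guards
// against can be reproduced portably, because growing a file with
// `File::set_len` fills the new range with zeros, much like EXT4
// `data=writeback` persisting a new file length before the data itself
// reaches the disk. The temp-file name below is hypothetical.
// ---------------------------------------------------------------------------
use std::fs::OpenOptions;
use std::io::{Read, Seek, SeekFrom, Write};

fn main() -> std::io::Result<()> {
    let path = std::env::temp_dir().join("writeback_zero_tail_demo");
    let mut f = OpenOptions::new()
        .create(true)
        .truncate(true)
        .read(true)
        .write(true)
        .open(&path)?;

    f.write_all(b"complete record")?; // 15 bytes of valid data
    f.set_len(15 + 8)?; // the file length grows, but no data is written

    // Everything past the valid data reads back as zeros.
    let mut tail = Vec::new();
    f.seek(SeekFrom::Start(15))?;
    f.read_to_end(&mut tail)?;
    assert_eq!(tail, vec![0u8; 8]);
    Ok(())
}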
diff --git a/src/chunk/open_chunk.rs b/src/chunk/open_chunk.rs
index cdede5c..1b14113 100644
--- a/src/chunk/open_chunk.rs
+++ b/src/chunk/open_chunk.rs
@@ -38,6 +38,7 @@ where T: Types
         let chunk = Chunk {
             f: Arc::new(f),
             global_offsets: record_offsets,
+            truncated: None,
             _p: Default::default(),
         };
 
diff --git a/src/raft_log/raft_log.rs b/src/raft_log/raft_log.rs
index 30507f5..50c2a57 100644
--- a/src/raft_log/raft_log.rs
+++ b/src/raft_log/raft_log.rs
@@ -181,13 +181,13 @@ impl RaftLog {
         let dir_lock = file_lock::FileLock::new(config.clone())
             .context(|| format!("open RaftLog in '{}'", config.dir))?;
 
-        let offsets = Self::load_chunk_ids(&config)?;
+        let chunk_ids = Self::load_chunk_ids(&config)?;
 
         let mut sm = RaftLogStateMachine::new(&config);
         let mut closed = BTreeMap::new();
         let mut prev_end_offset = None;
 
-        for chunk_id in offsets {
+        for chunk_id in chunk_ids {
             if let Some(prev_end) = prev_end_offset {
                 if prev_end != chunk_id.offset() {
                     let message = format!(
diff --git a/src/tests/test_raft_log.rs b/src/tests/test_raft_log.rs
index a024f4b..9a8a7d4 100644
--- a/src/tests/test_raft_log.rs
+++ b/src/tests/test_raft_log.rs
@@ -534,6 +534,137 @@ ChunkId(00_000_000_000_000_000_610)
     Ok(())
 }
 
+/// The last record is discarded if it was not completely written and is
+/// filled with zeros.
+///
+/// Trailing zeros can happen if EXT4 is mounted in `data=writeback` mode,
+/// in which data and metadata (the file length) may be written to disk in
+/// arbitrary order.
+#[test]
+fn test_re_open_unfinished_trailing_zero_chunk() -> Result<(), io::Error> {
+    for append_zeros in [3, 1024 * 33] {
+        let mut ctx = TestContext::new()?;
+        let config = &mut ctx.config;
+
+        config.chunk_max_records = Some(5);
+
+        let (state, logs) = {
+            let mut rl = ctx.new_raft_log()?;
+            build_sample_data_purge_upto_3(&mut rl)?;
+
+            (
+                rl.log_state().clone(),
+                rl.read(0, 1000).collect::<Result<Vec<_>, _>>()?,
+            )
+        };
+
+        // Append several zero bytes.
+        {
+            let chunk_id = ChunkId(509);
+            let f = Chunk::::open_chunk_file(&ctx.config, chunk_id)?;
+            f.set_len(129 + append_zeros)?;
+        }
+
+        // Re-open
+        {
+            let rl = ctx.new_raft_log()?;
+
+            let last_closed = rl.wal.closed.last_key_value().unwrap().1;
+            assert_eq!(last_closed.chunk.truncated, Some(129 + append_zeros));
+
+            assert_eq!(state, rl.log_state().clone());
+            assert_eq!(logs, rl.read(0, 1000).collect::<Result<Vec<_>, _>>()?);
+
+            let dump = rl.dump().write_to_string()?;
+            println!("After reopen:\n{}", dump);
+
+            assert_eq!(
+                indoc! {r#"
+RaftLog:
+ChunkId(00_000_000_000_000_000_324)
+ R-00000: [000_000_000, 000_000_050) 50: State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
+ R-00001: [000_000_050, 000_000_078) 28: PurgeUpto((1, 1))
+ R-00002: [000_000_078, 000_000_115) 37: Append((2, 4), "world")
+ R-00003: [000_000_115, 000_000_150) 35: Append((2, 5), "foo")
+ R-00004: [000_000_150, 000_000_185) 35: Append((2, 6), "bar")
+ChunkId(00_000_000_000_000_000_509)
+ R-00000: [000_000_000, 000_000_066) 66: State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
+ R-00001: [000_000_066, 000_000_101) 35: Append((2, 7), "wow")
+ R-00002: [000_000_101, 000_000_129) 28: PurgeUpto((2, 3))
+ChunkId(00_000_000_000_000_000_638)
+ R-00000: [000_000_000, 000_000_066) 66: State(RaftLogState { vote: None, last: Some((2, 7)), committed: Some((1, 2)), purged: Some((2, 3)), user_data: None })
+"#},
+                dump
+            );
+        }
+    }
+
+    Ok(())
+}
+
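// ---------------------------------------------------------------------------
// Illustration (editor's sketch, not part of the patch): the recovery policy
// the test above exercises, distilled from the `Chunk::open` changes in
// src/chunk/mod.rs. The names below are illustrative, not the crate's API.
// A decode failure at the tail of the last chunk is repaired by truncation
// only when the tail is merely incomplete (unexpected EOF) or zero-filled
// (an unfinished write), and only if the config permits it; non-zero garbage
// is real damage, so the error is propagated instead.
// ---------------------------------------------------------------------------
use std::io;

/// What to do with the tail of the last chunk when a record fails to decode.
#[derive(Debug, PartialEq)]
enum TailAction {
    /// Cut the file back to the end of the last good record.
    Truncate,
    /// Surface the decode error to the caller.
    Fail,
}

fn tail_action(
    err_kind: io::ErrorKind,
    tail_is_all_zero: bool,
    truncate_incomplete_record: bool, // mirrors `Config::truncate_incomplete_record()`
) -> TailAction {
    let unfinished =
        err_kind == io::ErrorKind::UnexpectedEof || tail_is_all_zero;
    if unfinished && truncate_incomplete_record {
        TailAction::Truncate
    } else {
        TailAction::Fail
    }
}

fn main() {
    // An all-zero tail is truncated when the config allows it...
    assert_eq!(
        tail_action(io::ErrorKind::InvalidData, true, true),
        TailAction::Truncate
    );
    // ...but non-zero garbage always fails, regardless of the config.
    assert_eq!(
        tail_action(io::ErrorKind::InvalidData, false, true),
        TailAction::Fail
    );
}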
+#[test]
+fn test_re_open_unfinished_trailing_not_all_zero_chunk() -> Result<(), io::Error>
+{
+    let append_zeros = 1024 * 32;
+
+    let mut ctx = TestContext::new()?;
+    let config = &mut ctx.config;
+
+    config.chunk_max_records = Some(5);
+
+    {
+        let mut rl = ctx.new_raft_log()?;
+        build_sample_data_purge_upto_3(&mut rl)?;
+    }
+
+    // Append several zero bytes followed by a one.
+    {
+        let chunk_id = ChunkId(509);
+        let mut f = Chunk::::open_chunk_file(&ctx.config, chunk_id)?;
+        f.set_len(129 + append_zeros)?;
+
+        f.seek(io::SeekFrom::Start(129 + append_zeros))?;
+        f.write_u8(1)?;
+    }
+
+    // Re-open
+    {
+        let res = ctx.new_raft_log();
+        assert!(res.is_err());
+        assert_eq!(
+            "crc32 checksum mismatch: expected fd59b8d, got 0, \
+             while Record::decode(); \
+             when:(decode Record at offset 129); \
+             when:(iterate ChunkId(00_000_000_000_000_000_509))",
+            res.unwrap_err().to_string()
+        );
+
+        let dump =
+            Dump::::new(ctx.arc_config())?.write_to_string()?;
+        println!("After reopen:\n{}", dump);
+
+        assert_eq!(
+            indoc! {r#"
+RaftLog:
+ChunkId(00_000_000_000_000_000_324)
+ R-00000: [000_000_000, 000_000_050) 50: State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None })
+ R-00001: [000_000_050, 000_000_078) 28: PurgeUpto((1, 1))
+ R-00002: [000_000_078, 000_000_115) 37: Append((2, 4), "world")
+ R-00003: [000_000_115, 000_000_150) 35: Append((2, 5), "foo")
+ R-00004: [000_000_150, 000_000_185) 35: Append((2, 6), "bar")
+ChunkId(00_000_000_000_000_000_509)
+ R-00000: [000_000_000, 000_000_066) 66: State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None })
+ R-00001: [000_000_066, 000_000_101) 35: Append((2, 7), "wow")
+ R-00002: [000_000_101, 000_000_129) 28: PurgeUpto((2, 3))
+Error: crc32 checksum mismatch: expected fd59b8d, got 0, while Record::decode(); when:(decode Record at offset 129); when:(iterate ChunkId(00_000_000_000_000_000_509))
+"#},
+            dump
+        );
+    }
+
+    Ok(())
+}
+
{r#" +RaftLog: +ChunkId(00_000_000_000_000_000_324) + R-00000: [000_000_000, 000_000_050) 50: State(RaftLogState { vote: None, last: Some((2, 3)), committed: Some((1, 2)), purged: None, user_data: None }) + R-00001: [000_000_050, 000_000_078) 28: PurgeUpto((1, 1)) + R-00002: [000_000_078, 000_000_115) 37: Append((2, 4), "world") + R-00003: [000_000_115, 000_000_150) 35: Append((2, 5), "foo") + R-00004: [000_000_150, 000_000_185) 35: Append((2, 6), "bar") +ChunkId(00_000_000_000_000_000_509) + R-00000: [000_000_000, 000_000_066) 66: State(RaftLogState { vote: None, last: Some((2, 6)), committed: Some((1, 2)), purged: Some((1, 1)), user_data: None }) + R-00001: [000_000_066, 000_000_101) 35: Append((2, 7), "wow") + R-00002: [000_000_101, 000_000_129) 28: PurgeUpto((2, 3)) +Error: crc32 checksum mismatch: expected fd59b8d, got 0, while Record::decode(); when:(decode Record at offset 129); when:(iterate ChunkId(00_000_000_000_000_000_509)) +"#}, + dump + ); + } + + Ok(()) +} + /// A damaged last record of non-last chunk will not be truncated, but is /// considered a damage. #[test]