diff --git a/src/era/dump.rs b/src/era/dump.rs index a72bd51..cc3a9b1 100644 --- a/src/era/dump.rs +++ b/src/era/dump.rs @@ -1,4 +1,3 @@ -use anyhow::anyhow; use std::fs::File; use std::io::BufWriter; use std::io::Write; @@ -12,7 +11,7 @@ use crate::era::xml; use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine}; use crate::pdata::array::{self, ArrayBlock}; use crate::pdata::array_walker::*; -use crate::pdata::bitset::read_bitset; +use crate::pdata::bitset::read_bitset_no_err; use crate::pdata::btree_walker::btree_to_map; //------------------------------------------ @@ -78,6 +77,7 @@ fn mk_context(opts: &EraDumpOptions) -> anyhow::Result { Ok(Context { engine }) } +// notify the visitor about the marked blocks only fn dump_writeset( engine: Arc, out: &mut dyn MetadataVisitor, @@ -85,27 +85,68 @@ fn dump_writeset( ws: &Writeset, repair: bool, ) -> anyhow::Result<()> { - let (bits, errs) = read_bitset(engine.clone(), ws.root, ws.nr_bits as usize, repair); // TODO: deal with broken writeset - if errs.is_some() { - return Err(anyhow!( - "errors in writeset of era {}: {}", - era, - errs.unwrap() - )); - } + let bits = read_bitset_no_err(engine.clone(), ws.root, ws.nr_bits as usize, repair)?; out.writeset_b(&ir::Writeset { era, nr_bits: ws.nr_bits, })?; - for b in 0..ws.nr_bits { - let wbit = ir::WritesetBit { - block: b, - value: bits.contains(b as usize).unwrap_or(false), - }; - out.writeset_bit(&wbit)?; + + // [begin, end) denotes the range of set bits. + let mut begin: u32 = 0; + let mut end: u32 = 0; + for (index, entry) in bits.as_slice().iter().enumerate() { + let mut n = *entry; + + if n == u32::MAX { + end = std::cmp::min(end + 32, ws.nr_bits); + continue; + } + + while n > 0 { + let zeros = n.trailing_zeros(); + if zeros > 0 { + if end > begin { + let m = ir::MarkedBlocks { + begin, + len: end - begin, + }; + out.writeset_blocks(&m)?; + } + n >>= zeros; + end += zeros; + begin = end; + } + + let ones = n.trailing_ones(); + n >>= ones; + end = std::cmp::min(end + ones, ws.nr_bits); + } + + // emit the range if it ends before the entry boundary + let endpos = ((index as u32) << 5) + 32; + if end < endpos { + if end > begin { + let m = ir::MarkedBlocks { + begin, + len: end - begin, + }; + out.writeset_blocks(&m)?; + } + begin = endpos; + end = begin; + } } + + if end > begin { + let m = ir::MarkedBlocks { + begin, + len: end - begin, + }; + out.writeset_blocks(&m)?; + } + out.writeset_e()?; Ok(()) @@ -164,7 +205,7 @@ pub fn dump(opts: EraDumpOptions) -> anyhow::Result<()> { } else { writer = Box::new(BufWriter::new(std::io::stdout())); } - let mut out = xml::XmlWriter::new(writer); + let mut out = xml::XmlWriter::new(writer, false); if opts.logical { dump_metadata_logical(ctx.engine, &mut out, &sb, opts.repair) diff --git a/src/era/ir.rs b/src/era/ir.rs index 0dc1342..947a4f6 100644 --- a/src/era/ir.rs +++ b/src/era/ir.rs @@ -17,9 +17,9 @@ pub struct Writeset { } #[derive(Clone)] -pub struct WritesetBit { - pub block: u32, - pub value: bool, +pub struct MarkedBlocks { + pub begin: u32, + pub len: u32, } #[derive(Clone)] @@ -42,7 +42,7 @@ pub trait MetadataVisitor { fn writeset_b(&mut self, ws: &Writeset) -> Result; fn writeset_e(&mut self) -> Result; - fn writeset_bit(&mut self, wbit: &WritesetBit) -> Result; + fn writeset_blocks(&mut self, blocks: &MarkedBlocks) -> Result; fn era_b(&mut self) -> Result; fn era_e(&mut self) -> Result; diff --git a/src/era/restore.rs b/src/era/restore.rs index bbd05db..127abca 100644 --- a/src/era/restore.rs +++ b/src/era/restore.rs @@ -70,7 +70,8 @@ pub struct Restorer<'a> { writeset_builder: Option>, // bitset current_writeset: Option, era_array_builder: Option>, - writeset_buf: (u32, u64), // (index in u64 array, value) + writeset_entry: u64, + entry_index: u32, in_section: Section, } @@ -83,7 +84,8 @@ impl<'a> Restorer<'a> { writeset_builder: None, current_writeset: None, era_array_builder: None, - writeset_buf: (0, 0), + writeset_entry: 0, + entry_index: 0, in_section: Section::None, } } @@ -170,8 +172,8 @@ impl<'a> MetadataVisitor for Restorer<'a> { return Err(anyhow!("not in superblock")); } self.writeset_builder = Some(ArrayBuilder::new(div_up(ws.nr_bits as u64, 64))); - self.writeset_buf.0 = 0; - self.writeset_buf.1 = 0; + self.entry_index = 0; + self.writeset_entry = 0; self.current_writeset = Some(ws.clone()); self.in_section = Section::Writeset; Ok(Visit::Continue) @@ -185,7 +187,7 @@ impl<'a> MetadataVisitor for Restorer<'a> { if let Some(mut builder) = self.writeset_builder.take() { if let Some(ws) = self.current_writeset.take() { // push the trailing bits - builder.push_value(self.w, self.writeset_buf.0 as u64, self.writeset_buf.1)?; + builder.push_value(self.w, self.entry_index as u64, self.writeset_entry)?; let root = builder.complete(self.w)?; self.writesets.insert( @@ -206,20 +208,47 @@ impl<'a> MetadataVisitor for Restorer<'a> { Ok(Visit::Continue) } - fn writeset_bit(&mut self, wbit: &ir::WritesetBit) -> Result { - if wbit.value { - let index = wbit.block >> 6; - let mask = 1 << (wbit.block & 63); - if index == self.writeset_buf.0 { - self.writeset_buf.1 |= mask; - } else { - let builder = self.writeset_builder.as_mut().unwrap(); - builder.push_value(self.w, self.writeset_buf.0 as u64, self.writeset_buf.1)?; - self.writeset_buf.0 = index; - self.writeset_buf.1 = mask; - } + fn writeset_blocks(&mut self, blocks: &ir::MarkedBlocks) -> Result { + let first = blocks.begin; + let last = first + blocks.len - 1; // inclusive + let mut idx = first >> 6; + let last_idx = last >> 6; // inclusive + let builder = self.writeset_builder.as_mut().unwrap(); + + // emit the bufferred bits + if idx > self.entry_index { + builder.push_value(self.w, self.entry_index as u64, self.writeset_entry)?; + self.entry_index = idx; + self.writeset_entry = 0; } + // buffer the bits of the first entry + let bi_first = first & 63; + if idx == last_idx { + let bi_last = last & 63; + let mask = 1u64 << bi_last; + self.writeset_entry |= (mask ^ mask.wrapping_sub(1)) & (u64::MAX << bi_first); + + return Ok(Visit::Continue); + } + + self.writeset_entry |= u64::MAX << bi_first; + + // emit the all-1 entries if necessary + while idx < last_idx { + builder.push_value(self.w, self.entry_index as u64, self.writeset_entry)?; + self.entry_index += 1; + self.writeset_entry = u64::MAX; + idx += 1; + } + + // buffer the bits of the last entry + builder.push_value(self.w, self.entry_index as u64, self.writeset_entry)?; + let bi_last = last & 63; + let mask = 1u64 << bi_last; + self.entry_index += 1; + self.writeset_entry |= mask ^ mask.wrapping_sub(1); + Ok(Visit::Continue) } diff --git a/src/era/xml.rs b/src/era/xml.rs index 4153a1f..7b54b79 100644 --- a/src/era/xml.rs +++ b/src/era/xml.rs @@ -11,12 +11,18 @@ use crate::xml::*; pub struct XmlWriter { w: Writer, + compact: bool, + nr_blocks: u32, + emitted_blocks: u32, } impl XmlWriter { - pub fn new(w: W) -> XmlWriter { + pub fn new(w: W, compact: bool) -> XmlWriter { XmlWriter { w: Writer::new_with_indent(w, 0x20, 2), + compact, + nr_blocks: 0, + emitted_blocks: 0, } } } @@ -29,8 +35,10 @@ impl MetadataVisitor for XmlWriter { elem.push_attribute(mk_attr(b"block_size", sb.block_size)); elem.push_attribute(mk_attr(b"nr_blocks", sb.nr_blocks)); elem.push_attribute(mk_attr(b"current_era", sb.current_era)); - self.w.write_event(Event::Start(elem))?; + + self.nr_blocks = sb.nr_blocks; + Ok(Visit::Continue) } @@ -46,21 +54,55 @@ impl MetadataVisitor for XmlWriter { elem.push_attribute(mk_attr(b"era", ws.era)); elem.push_attribute(mk_attr(b"nr_bits", ws.nr_bits)); self.w.write_event(Event::Start(elem))?; + + self.emitted_blocks = 0; + Ok(Visit::Continue) } fn writeset_e(&mut self) -> Result { + if !self.compact { + for b in self.emitted_blocks..self.nr_blocks { + let tag = b"bit"; + let mut elem = BytesStart::owned(tag.to_vec(), tag.len()); + elem.push_attribute(mk_attr(b"block", b)); + elem.push_attribute(mk_attr(b"value", "false")); + self.w.write_event(Event::Empty(elem))?; + } + } self.w .write_event(Event::End(BytesEnd::borrowed(b"writeset")))?; Ok(Visit::Continue) } - fn writeset_bit(&mut self, wbit: &WritesetBit) -> Result { - let tag = b"bit"; - let mut elem = BytesStart::owned(tag.to_vec(), tag.len()); - elem.push_attribute(mk_attr(b"block", wbit.block)); - elem.push_attribute(mk_attr(b"value", wbit.value)); - self.w.write_event(Event::Empty(elem))?; + fn writeset_blocks(&mut self, blocks: &MarkedBlocks) -> Result { + if self.compact { + let tag = b"marked"; + let mut elem = BytesStart::owned(tag.to_vec(), tag.len()); + elem.push_attribute(mk_attr(b"block_begin", blocks.begin)); + elem.push_attribute(mk_attr(b"len", blocks.len)); + self.w.write_event(Event::Empty(elem))?; + } else { + for b in self.emitted_blocks..blocks.begin { + let tag = b"bit"; + let mut elem = BytesStart::owned(tag.to_vec(), tag.len()); + elem.push_attribute(mk_attr(b"block", b)); + elem.push_attribute(mk_attr(b"value", "false")); + self.w.write_event(Event::Empty(elem))?; + } + + let end = blocks.begin + blocks.len; + for b in blocks.begin..end { + let tag = b"bit"; + let mut elem = BytesStart::owned(tag.to_vec(), tag.len()); + elem.push_attribute(mk_attr(b"block", b)); + elem.push_attribute(mk_attr(b"value", "true")); + self.w.write_event(Event::Empty(elem))?; + } + + self.emitted_blocks = end; + } + Ok(Visit::Continue) } @@ -139,7 +181,7 @@ fn parse_writeset(e: &BytesStart) -> Result { }) } -fn parse_writeset_bit(e: &BytesStart) -> Result { +fn parse_writeset_bit(e: &BytesStart) -> Result> { let tag = "bit"; let mut block: Option = None; let mut value: Option = None; @@ -153,9 +195,36 @@ fn parse_writeset_bit(e: &BytesStart) -> Result { } } - Ok(WritesetBit { - block: check_attr(tag, "block", block)?, - value: check_attr(tag, "value", value)?, + check_attr(tag, "block", block)?; + check_attr(tag, "value", value)?; + + if let Some(true) = value { + Ok(Some(MarkedBlocks { + begin: block.unwrap(), + len: 1, + })) + } else { + Ok(None) + } +} + +fn parse_writeset_blocks(e: &BytesStart) -> Result { + let tag = "marked"; + let mut begin: Option = None; + let mut len: Option = None; + + for a in e.attributes() { + let kv = a.unwrap(); + match kv.key { + b"block_begin" => begin = Some(u32_val(&kv)?), + b"len" => len = Some(u32_val(&kv)?), + _ => return bad_attr(tag, kv.key), + } + } + + Ok(MarkedBlocks { + begin: check_attr(tag, "block_begin", begin)?, + len: check_attr(tag, "len", len)?, }) } @@ -198,7 +267,14 @@ where _ => return Err(anyhow!("Parse error at byte {}", reader.buffer_position())), }, Ok(Event::Empty(ref e)) => match e.name() { - b"bit" => visitor.writeset_bit(&parse_writeset_bit(e)?), + b"bit" => { + if let Some(b) = parse_writeset_bit(e)? { + visitor.writeset_blocks(&b) + } else { + Ok(Visit::Continue) + } + } + b"marked" => visitor.writeset_blocks(&parse_writeset_blocks(e)?), b"era" => visitor.era(&parse_era(e)?), _ => return Err(anyhow!("Parse error at byte {}", reader.buffer_position())), }, diff --git a/src/pdata/bitset.rs b/src/pdata/bitset.rs index fcd0942..86352b9 100644 --- a/src/pdata/bitset.rs +++ b/src/pdata/bitset.rs @@ -2,10 +2,13 @@ use fixedbitset::FixedBitSet; use std::sync::{Arc, Mutex}; use crate::io_engine::IoEngine; +use crate::math::div_up; use crate::pdata::array::{self, ArrayBlock}; use crate::pdata::array_walker::{ArrayVisitor, ArrayWalker}; use crate::pdata::space_map::*; +//------------------------------------------ + pub struct CheckedBitSet { bits: FixedBitSet, } @@ -30,6 +33,8 @@ impl CheckedBitSet { } } +//------------------------------------------ + struct BitsetVisitor { nr_bits: usize, bits: Mutex, @@ -72,6 +77,55 @@ impl ArrayVisitor for BitsetVisitor { } } +//------------------------------------------ + +struct BitsetCollector { + bits: Mutex, + nr_bits: usize, +} + +impl BitsetCollector { + fn new(nr_bits: usize) -> BitsetCollector { + BitsetCollector { + bits: Mutex::new(FixedBitSet::with_capacity(nr_bits)), + nr_bits, + } + } + + pub fn get_bitset(self) -> FixedBitSet { + self.bits.into_inner().unwrap() + } +} + +impl ArrayVisitor for BitsetCollector { + fn visit(&self, index: u64, b: ArrayBlock) -> array::Result<()> { + let mut bitset = self.bits.lock().unwrap(); + let mut idx = (index as usize * b.header.max_entries as usize) << 1; // index of u32 in bitset array + let idx_end = div_up(self.nr_bits, 32); + let mut dest = bitset.as_mut_slice().iter_mut().skip(idx); + for entry in b.values.iter() { + let lower = (*entry & (u32::MAX as u64)) as u32; + *(dest.next().ok_or_else(|| { + array::value_err(format!("bitset size exceeds limit: {} bits", self.nr_bits)) + })?) = lower; + idx += 1; + + if idx == idx_end { + break; + } + + let upper = (*entry >> 32) as u32; + *(dest.next().ok_or_else(|| { + array::value_err(format!("bitset size exceeds limit: {} bits", self.nr_bits)) + })?) = upper; + idx += 1; + } + Ok(()) + } +} + +//------------------------------------------ + // TODO: multi-threaded is possible pub fn read_bitset( engine: Arc, @@ -106,3 +160,15 @@ pub fn read_bitset_with_sm( }; Ok((v.get_bitset(), e)) } + +pub fn read_bitset_no_err( + engine: Arc, + root: u64, + nr_bits: usize, + ignore_none_fatal: bool, +) -> array::Result { + let w = ArrayWalker::new(engine, ignore_none_fatal); + let mut v = BitsetCollector::new(nr_bits); + w.walk(&mut v, root)?; + Ok(v.get_bitset()) +}