[era_dump/restore (rust)] Support displaying writeset blocks in ranges

- Group adjacent marked blocks into ranges
- Show marked blocks only, and ignore untouched blocks
This commit is contained in:
Ming-Hung Tsai 2021-09-29 16:57:04 +08:00
parent 4559039066
commit 8fa7f96dfd
5 changed files with 263 additions and 51 deletions

View File

@ -1,4 +1,3 @@
use anyhow::anyhow;
use std::fs::File;
use std::io::BufWriter;
use std::io::Write;
@ -12,7 +11,7 @@ use crate::era::xml;
use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine};
use crate::pdata::array::{self, ArrayBlock};
use crate::pdata::array_walker::*;
use crate::pdata::bitset::read_bitset;
use crate::pdata::bitset::read_bitset_no_err;
use crate::pdata::btree_walker::btree_to_map;
//------------------------------------------
@ -78,6 +77,7 @@ fn mk_context(opts: &EraDumpOptions) -> anyhow::Result<Context> {
Ok(Context { engine })
}
// notify the visitor about the marked blocks only
fn dump_writeset(
engine: Arc<dyn IoEngine + Send + Sync>,
out: &mut dyn MetadataVisitor,
@ -85,27 +85,68 @@ fn dump_writeset(
ws: &Writeset,
repair: bool,
) -> anyhow::Result<()> {
let (bits, errs) = read_bitset(engine.clone(), ws.root, ws.nr_bits as usize, repair);
// TODO: deal with broken writeset
if errs.is_some() {
return Err(anyhow!(
"errors in writeset of era {}: {}",
era,
errs.unwrap()
));
}
let bits = read_bitset_no_err(engine.clone(), ws.root, ws.nr_bits as usize, repair)?;
out.writeset_b(&ir::Writeset {
era,
nr_bits: ws.nr_bits,
})?;
for b in 0..ws.nr_bits {
let wbit = ir::WritesetBit {
block: b,
value: bits.contains(b as usize).unwrap_or(false),
};
out.writeset_bit(&wbit)?;
// [begin, end) denotes the range of set bits.
let mut begin: u32 = 0;
let mut end: u32 = 0;
for (index, entry) in bits.as_slice().iter().enumerate() {
let mut n = *entry;
if n == u32::MAX {
end = std::cmp::min(end + 32, ws.nr_bits);
continue;
}
while n > 0 {
let zeros = n.trailing_zeros();
if zeros > 0 {
if end > begin {
let m = ir::MarkedBlocks {
begin,
len: end - begin,
};
out.writeset_blocks(&m)?;
}
n >>= zeros;
end += zeros;
begin = end;
}
let ones = n.trailing_ones();
n >>= ones;
end = std::cmp::min(end + ones, ws.nr_bits);
}
// emit the range if it ends before the entry boundary
let endpos = ((index as u32) << 5) + 32;
if end < endpos {
if end > begin {
let m = ir::MarkedBlocks {
begin,
len: end - begin,
};
out.writeset_blocks(&m)?;
}
begin = endpos;
end = begin;
}
}
if end > begin {
let m = ir::MarkedBlocks {
begin,
len: end - begin,
};
out.writeset_blocks(&m)?;
}
out.writeset_e()?;
Ok(())
@ -164,7 +205,7 @@ pub fn dump(opts: EraDumpOptions) -> anyhow::Result<()> {
} else {
writer = Box::new(BufWriter::new(std::io::stdout()));
}
let mut out = xml::XmlWriter::new(writer);
let mut out = xml::XmlWriter::new(writer, false);
if opts.logical {
dump_metadata_logical(ctx.engine, &mut out, &sb, opts.repair)

View File

@ -17,9 +17,9 @@ pub struct Writeset {
}
#[derive(Clone)]
pub struct WritesetBit {
pub block: u32,
pub value: bool,
pub struct MarkedBlocks {
pub begin: u32,
pub len: u32,
}
#[derive(Clone)]
@ -42,7 +42,7 @@ pub trait MetadataVisitor {
fn writeset_b(&mut self, ws: &Writeset) -> Result<Visit>;
fn writeset_e(&mut self) -> Result<Visit>;
fn writeset_bit(&mut self, wbit: &WritesetBit) -> Result<Visit>;
fn writeset_blocks(&mut self, blocks: &MarkedBlocks) -> Result<Visit>;
fn era_b(&mut self) -> Result<Visit>;
fn era_e(&mut self) -> Result<Visit>;

View File

@ -70,7 +70,8 @@ pub struct Restorer<'a> {
writeset_builder: Option<ArrayBuilder<u64>>, // bitset
current_writeset: Option<ir::Writeset>,
era_array_builder: Option<ArrayBuilder<u32>>,
writeset_buf: (u32, u64), // (index in u64 array, value)
writeset_entry: u64,
entry_index: u32,
in_section: Section,
}
@ -83,7 +84,8 @@ impl<'a> Restorer<'a> {
writeset_builder: None,
current_writeset: None,
era_array_builder: None,
writeset_buf: (0, 0),
writeset_entry: 0,
entry_index: 0,
in_section: Section::None,
}
}
@ -170,8 +172,8 @@ impl<'a> MetadataVisitor for Restorer<'a> {
return Err(anyhow!("not in superblock"));
}
self.writeset_builder = Some(ArrayBuilder::new(div_up(ws.nr_bits as u64, 64)));
self.writeset_buf.0 = 0;
self.writeset_buf.1 = 0;
self.entry_index = 0;
self.writeset_entry = 0;
self.current_writeset = Some(ws.clone());
self.in_section = Section::Writeset;
Ok(Visit::Continue)
@ -185,7 +187,7 @@ impl<'a> MetadataVisitor for Restorer<'a> {
if let Some(mut builder) = self.writeset_builder.take() {
if let Some(ws) = self.current_writeset.take() {
// push the trailing bits
builder.push_value(self.w, self.writeset_buf.0 as u64, self.writeset_buf.1)?;
builder.push_value(self.w, self.entry_index as u64, self.writeset_entry)?;
let root = builder.complete(self.w)?;
self.writesets.insert(
@ -206,20 +208,47 @@ impl<'a> MetadataVisitor for Restorer<'a> {
Ok(Visit::Continue)
}
fn writeset_bit(&mut self, wbit: &ir::WritesetBit) -> Result<Visit> {
if wbit.value {
let index = wbit.block >> 6;
let mask = 1 << (wbit.block & 63);
if index == self.writeset_buf.0 {
self.writeset_buf.1 |= mask;
} else {
let builder = self.writeset_builder.as_mut().unwrap();
builder.push_value(self.w, self.writeset_buf.0 as u64, self.writeset_buf.1)?;
self.writeset_buf.0 = index;
self.writeset_buf.1 = mask;
}
fn writeset_blocks(&mut self, blocks: &ir::MarkedBlocks) -> Result<Visit> {
let first = blocks.begin;
let last = first + blocks.len - 1; // inclusive
let mut idx = first >> 6;
let last_idx = last >> 6; // inclusive
let builder = self.writeset_builder.as_mut().unwrap();
// emit the bufferred bits
if idx > self.entry_index {
builder.push_value(self.w, self.entry_index as u64, self.writeset_entry)?;
self.entry_index = idx;
self.writeset_entry = 0;
}
// buffer the bits of the first entry
let bi_first = first & 63;
if idx == last_idx {
let bi_last = last & 63;
let mask = 1u64 << bi_last;
self.writeset_entry |= (mask ^ mask.wrapping_sub(1)) & (u64::MAX << bi_first);
return Ok(Visit::Continue);
}
self.writeset_entry |= u64::MAX << bi_first;
// emit the all-1 entries if necessary
while idx < last_idx {
builder.push_value(self.w, self.entry_index as u64, self.writeset_entry)?;
self.entry_index += 1;
self.writeset_entry = u64::MAX;
idx += 1;
}
// buffer the bits of the last entry
builder.push_value(self.w, self.entry_index as u64, self.writeset_entry)?;
let bi_last = last & 63;
let mask = 1u64 << bi_last;
self.entry_index += 1;
self.writeset_entry |= mask ^ mask.wrapping_sub(1);
Ok(Visit::Continue)
}

View File

@ -11,12 +11,18 @@ use crate::xml::*;
pub struct XmlWriter<W: Write> {
w: Writer<W>,
compact: bool,
nr_blocks: u32,
emitted_blocks: u32,
}
impl<W: Write> XmlWriter<W> {
pub fn new(w: W) -> XmlWriter<W> {
pub fn new(w: W, compact: bool) -> XmlWriter<W> {
XmlWriter {
w: Writer::new_with_indent(w, 0x20, 2),
compact,
nr_blocks: 0,
emitted_blocks: 0,
}
}
}
@ -29,8 +35,10 @@ impl<W: Write> MetadataVisitor for XmlWriter<W> {
elem.push_attribute(mk_attr(b"block_size", sb.block_size));
elem.push_attribute(mk_attr(b"nr_blocks", sb.nr_blocks));
elem.push_attribute(mk_attr(b"current_era", sb.current_era));
self.w.write_event(Event::Start(elem))?;
self.nr_blocks = sb.nr_blocks;
Ok(Visit::Continue)
}
@ -46,21 +54,55 @@ impl<W: Write> MetadataVisitor for XmlWriter<W> {
elem.push_attribute(mk_attr(b"era", ws.era));
elem.push_attribute(mk_attr(b"nr_bits", ws.nr_bits));
self.w.write_event(Event::Start(elem))?;
self.emitted_blocks = 0;
Ok(Visit::Continue)
}
fn writeset_e(&mut self) -> Result<Visit> {
if !self.compact {
for b in self.emitted_blocks..self.nr_blocks {
let tag = b"bit";
let mut elem = BytesStart::owned(tag.to_vec(), tag.len());
elem.push_attribute(mk_attr(b"block", b));
elem.push_attribute(mk_attr(b"value", "false"));
self.w.write_event(Event::Empty(elem))?;
}
}
self.w
.write_event(Event::End(BytesEnd::borrowed(b"writeset")))?;
Ok(Visit::Continue)
}
fn writeset_bit(&mut self, wbit: &WritesetBit) -> Result<Visit> {
let tag = b"bit";
let mut elem = BytesStart::owned(tag.to_vec(), tag.len());
elem.push_attribute(mk_attr(b"block", wbit.block));
elem.push_attribute(mk_attr(b"value", wbit.value));
self.w.write_event(Event::Empty(elem))?;
fn writeset_blocks(&mut self, blocks: &MarkedBlocks) -> Result<Visit> {
if self.compact {
let tag = b"marked";
let mut elem = BytesStart::owned(tag.to_vec(), tag.len());
elem.push_attribute(mk_attr(b"block_begin", blocks.begin));
elem.push_attribute(mk_attr(b"len", blocks.len));
self.w.write_event(Event::Empty(elem))?;
} else {
for b in self.emitted_blocks..blocks.begin {
let tag = b"bit";
let mut elem = BytesStart::owned(tag.to_vec(), tag.len());
elem.push_attribute(mk_attr(b"block", b));
elem.push_attribute(mk_attr(b"value", "false"));
self.w.write_event(Event::Empty(elem))?;
}
let end = blocks.begin + blocks.len;
for b in blocks.begin..end {
let tag = b"bit";
let mut elem = BytesStart::owned(tag.to_vec(), tag.len());
elem.push_attribute(mk_attr(b"block", b));
elem.push_attribute(mk_attr(b"value", "true"));
self.w.write_event(Event::Empty(elem))?;
}
self.emitted_blocks = end;
}
Ok(Visit::Continue)
}
@ -139,7 +181,7 @@ fn parse_writeset(e: &BytesStart) -> Result<Writeset> {
})
}
fn parse_writeset_bit(e: &BytesStart) -> Result<WritesetBit> {
fn parse_writeset_bit(e: &BytesStart) -> Result<Option<MarkedBlocks>> {
let tag = "bit";
let mut block: Option<u32> = None;
let mut value: Option<bool> = None;
@ -153,9 +195,36 @@ fn parse_writeset_bit(e: &BytesStart) -> Result<WritesetBit> {
}
}
Ok(WritesetBit {
block: check_attr(tag, "block", block)?,
value: check_attr(tag, "value", value)?,
check_attr(tag, "block", block)?;
check_attr(tag, "value", value)?;
if let Some(true) = value {
Ok(Some(MarkedBlocks {
begin: block.unwrap(),
len: 1,
}))
} else {
Ok(None)
}
}
fn parse_writeset_blocks(e: &BytesStart) -> Result<MarkedBlocks> {
let tag = "marked";
let mut begin: Option<u32> = None;
let mut len: Option<u32> = None;
for a in e.attributes() {
let kv = a.unwrap();
match kv.key {
b"block_begin" => begin = Some(u32_val(&kv)?),
b"len" => len = Some(u32_val(&kv)?),
_ => return bad_attr(tag, kv.key),
}
}
Ok(MarkedBlocks {
begin: check_attr(tag, "block_begin", begin)?,
len: check_attr(tag, "len", len)?,
})
}
@ -198,7 +267,14 @@ where
_ => return Err(anyhow!("Parse error at byte {}", reader.buffer_position())),
},
Ok(Event::Empty(ref e)) => match e.name() {
b"bit" => visitor.writeset_bit(&parse_writeset_bit(e)?),
b"bit" => {
if let Some(b) = parse_writeset_bit(e)? {
visitor.writeset_blocks(&b)
} else {
Ok(Visit::Continue)
}
}
b"marked" => visitor.writeset_blocks(&parse_writeset_blocks(e)?),
b"era" => visitor.era(&parse_era(e)?),
_ => return Err(anyhow!("Parse error at byte {}", reader.buffer_position())),
},

View File

@ -2,10 +2,13 @@ use fixedbitset::FixedBitSet;
use std::sync::{Arc, Mutex};
use crate::io_engine::IoEngine;
use crate::math::div_up;
use crate::pdata::array::{self, ArrayBlock};
use crate::pdata::array_walker::{ArrayVisitor, ArrayWalker};
use crate::pdata::space_map::*;
//------------------------------------------
pub struct CheckedBitSet {
bits: FixedBitSet,
}
@ -30,6 +33,8 @@ impl CheckedBitSet {
}
}
//------------------------------------------
struct BitsetVisitor {
nr_bits: usize,
bits: Mutex<CheckedBitSet>,
@ -72,6 +77,55 @@ impl ArrayVisitor<u64> for BitsetVisitor {
}
}
//------------------------------------------
struct BitsetCollector {
bits: Mutex<FixedBitSet>,
nr_bits: usize,
}
impl BitsetCollector {
fn new(nr_bits: usize) -> BitsetCollector {
BitsetCollector {
bits: Mutex::new(FixedBitSet::with_capacity(nr_bits)),
nr_bits,
}
}
pub fn get_bitset(self) -> FixedBitSet {
self.bits.into_inner().unwrap()
}
}
impl ArrayVisitor<u64> for BitsetCollector {
fn visit(&self, index: u64, b: ArrayBlock<u64>) -> array::Result<()> {
let mut bitset = self.bits.lock().unwrap();
let mut idx = (index as usize * b.header.max_entries as usize) << 1; // index of u32 in bitset array
let idx_end = div_up(self.nr_bits, 32);
let mut dest = bitset.as_mut_slice().iter_mut().skip(idx);
for entry in b.values.iter() {
let lower = (*entry & (u32::MAX as u64)) as u32;
*(dest.next().ok_or_else(|| {
array::value_err(format!("bitset size exceeds limit: {} bits", self.nr_bits))
})?) = lower;
idx += 1;
if idx == idx_end {
break;
}
let upper = (*entry >> 32) as u32;
*(dest.next().ok_or_else(|| {
array::value_err(format!("bitset size exceeds limit: {} bits", self.nr_bits))
})?) = upper;
idx += 1;
}
Ok(())
}
}
//------------------------------------------
// TODO: multi-threaded is possible
pub fn read_bitset(
engine: Arc<dyn IoEngine + Send + Sync>,
@ -106,3 +160,15 @@ pub fn read_bitset_with_sm(
};
Ok((v.get_bitset(), e))
}
pub fn read_bitset_no_err(
engine: Arc<dyn IoEngine + Send + Sync>,
root: u64,
nr_bits: usize,
ignore_none_fatal: bool,
) -> array::Result<FixedBitSet> {
let w = ArrayWalker::new(engine, ignore_none_fatal);
let mut v = BitsetCollector::new(nr_bits);
w.walk(&mut v, root)?;
Ok(v.get_bitset())
}