[era_dump (rust)] Implement logical dump

This commit is contained in:
Ming-Hung Tsai 2021-10-06 23:08:47 +08:00
parent cbc3baba45
commit 89e568b897

View File

@ -1,6 +1,10 @@
use anyhow::Result;
use fixedbitset::FixedBitSet;
use std::convert::TryFrom;
use std::fs::File;
use std::io::BufWriter;
use std::io::Write;
use std::ops::Deref;
use std::path::Path;
use std::sync::{Arc, Mutex};
@ -12,7 +16,7 @@ use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine};
use crate::pdata::array::{self, ArrayBlock};
use crate::pdata::array_walker::*;
use crate::pdata::bitset::read_bitset_no_err;
use crate::pdata::btree_walker::btree_to_map;
use crate::pdata::btree_walker::{btree_to_key_set, btree_to_map};
//------------------------------------------
@ -52,6 +56,112 @@ impl<'a> ArrayVisitor<u32> for EraEmitter<'a> {
//------------------------------------------
trait Archive {
fn set(&mut self, key: u32, value: u32) -> Result<()>;
fn get(&self, key: u32) -> Result<u32>;
}
// in-core archive of writeset eras
struct EraArchive<T> {
digested_era: u32, // maximum possible era in the era array
deltas: Vec<T>,
}
fn new_era_archive(nr_blocks: u32, archived_min: u32, nr_writesets: u32) -> Box<dyn Archive> {
match nr_writesets + 1 {
0..=255 => Box::new(EraArchive {
digested_era: archived_min.wrapping_sub(1),
deltas: vec![0u8; nr_blocks as usize],
}),
256..=65535 => Box::new(EraArchive {
digested_era: archived_min.wrapping_sub(1),
deltas: vec![0u16; nr_blocks as usize],
}),
_ => Box::new(EraArchive {
digested_era: archived_min.wrapping_sub(1),
deltas: vec![0u32; nr_blocks as usize],
}),
}
}
impl<T: std::convert::TryFrom<u32>> Archive for EraArchive<T>
where
T: Copy + Into<u32> + TryFrom<u32>,
<T as TryFrom<u32>>::Error: std::fmt::Debug,
{
fn set(&mut self, block: u32, delta: u32) -> Result<()> {
self.deltas[block as usize] = T::try_from(delta).unwrap();
Ok(())
}
fn get(&self, block: u32) -> Result<u32> {
let delta: u32 = self.deltas[block as usize].into();
if delta > 0 {
Ok(self.digested_era.wrapping_add(delta))
} else {
Ok(0)
}
}
}
//------------------------------------------
struct Inner<'a> {
emitter: &'a mut dyn MetadataVisitor,
era_archive: &'a dyn Archive,
}
struct LogicalEraEmitter<'a> {
inner: Mutex<Inner<'a>>,
}
impl<'a> LogicalEraEmitter<'a> {
pub fn new(
emitter: &'a mut dyn MetadataVisitor,
era_archive: &'a dyn Archive,
) -> LogicalEraEmitter<'a> {
LogicalEraEmitter {
inner: Mutex::new(Inner {
emitter,
era_archive,
}),
}
}
}
impl<'a> ArrayVisitor<u32> for LogicalEraEmitter<'a> {
fn visit(&self, index: u64, b: ArrayBlock<u32>) -> array::Result<()> {
let mut inner = self.inner.lock().unwrap();
let begin = index as u32 * b.header.max_entries;
let end = begin + b.header.nr_entries;
for (v, block) in b.values.iter().zip(begin..end) {
let archived = inner
.era_archive
.get(block)
.map_err(|e| array::value_err(format!("{}", e)))?;
let era = if archived > 0 {
ir::Era {
block,
era: archived,
}
} else {
ir::Era { block, era: *v }
};
inner
.emitter
.era(&era)
.map_err(|e| array::value_err(format!("{}", e)))?;
}
Ok(())
}
}
//------------------------------------------
pub struct EraDumpOptions<'a> {
pub input: &'a Path,
pub output: Option<&'a Path>,
@ -185,16 +295,76 @@ pub fn dump_metadata(
Ok(())
}
pub fn dump_metadata_logical(
_engine: Arc<dyn IoEngine + Send + Sync>,
_out: &mut dyn MetadataVisitor,
_sb: &Superblock,
_repair: bool,
) -> anyhow::Result<()> {
// TODO
//-----------------------------------------
fn collate_writeset(index: u32, bitset: &FixedBitSet, archive: &mut dyn Archive) -> Result<()> {
let era_delta = index + 1;
for (i, entry) in bitset.as_slice().iter().enumerate() {
let mut bi = (i << 5) as u32;
let mut n = *entry;
while n > 0 {
if n & 0x1 > 0 {
archive.set(bi, era_delta)?;
}
n >>= 1;
bi += 1;
}
}
Ok(())
}
fn collate_writesets(
engine: Arc<dyn IoEngine + Send + Sync>,
sb: &Superblock,
repair: bool,
) -> Result<Box<dyn Archive>> {
let mut path = vec![0];
let writesets =
btree_to_map::<Writeset>(&mut path, engine.clone(), repair, sb.writeset_tree_root)?;
let archived_min = writesets.iter().next().map_or(0u32, |(k, _v)| *k as u32);
let mut archive = new_era_archive(sb.nr_blocks, archived_min, writesets.len() as u32);
for (index, (_era, ws)) in writesets.iter().enumerate() {
let bitset = read_bitset_no_err(engine.clone(), ws.root, ws.nr_bits as usize, repair)?;
collate_writeset(index as u32, &bitset, archive.as_mut())?;
}
Ok(archive)
}
pub fn dump_metadata_logical(
engine: Arc<dyn IoEngine + Send + Sync>,
out: &mut dyn MetadataVisitor,
sb: &Superblock,
repair: bool,
) -> anyhow::Result<()> {
let era_archive = collate_writesets(engine.clone(), sb, repair)?;
let xml_sb = ir::Superblock {
uuid: "".to_string(),
block_size: sb.data_block_size,
nr_blocks: sb.nr_blocks,
current_era: sb.current_era,
};
out.superblock_b(&xml_sb)?;
out.era_b()?;
let w = ArrayWalker::new(engine, repair);
let mut emitter = LogicalEraEmitter::new(out, era_archive.deref());
w.walk(&mut emitter, sb.era_array_root)?;
out.era_e()?;
out.superblock_e()?;
out.eof()?;
Ok(())
}
//-----------------------------------------
pub fn dump(opts: EraDumpOptions) -> anyhow::Result<()> {
let ctx = mk_context(&opts)?;
let sb = read_superblock(ctx.engine.as_ref(), SUPERBLOCK_LOCATION)?;
@ -207,7 +377,15 @@ pub fn dump(opts: EraDumpOptions) -> anyhow::Result<()> {
}
let mut out = xml::XmlWriter::new(writer, false);
if opts.logical {
let mut path = vec![0];
let writesets = btree_to_key_set::<Writeset>(
&mut path,
ctx.engine.clone(),
opts.repair,
sb.writeset_tree_root,
)?;
if opts.logical && !writesets.is_empty() {
dump_metadata_logical(ctx.engine, &mut out, &sb, opts.repair)
} else {
dump_metadata(ctx.engine, &mut out, &sb, opts.repair)