[era_dump (rust)] First code drop

This commit is contained in:
Ming-Hung Tsai 2021-09-16 17:09:47 +08:00
parent 8a1399e3bb
commit 55a81c0b9f
7 changed files with 408 additions and 0 deletions

View File

@ -36,6 +36,8 @@ fn main_() -> Result<()> {
cache_restore::run(&new_args);
} else if name_eq(name, "era_check") {
era_check::run(&new_args);
} else if name_eq(name, "era_dump") {
era_dump::run(&new_args);
} else if name_eq(name, "thin_check") {
thin_check::run(&new_args);
} else if name_eq(name, "thin_dump") {

79
src/commands/era_dump.rs Normal file
View File

@ -0,0 +1,79 @@
extern crate clap;
use clap::{App, Arg};
use std::path::Path;
use std::process;
use crate::commands::utils::*;
use crate::era::dump::{dump, EraDumpOptions};
//------------------------------------------
pub fn run(args: &[std::ffi::OsString]) {
let parser = App::new("era_dump")
.version(crate::version::tools_version())
.about("Dump the era metadata to stdout in XML format")
// flags
.arg(
Arg::with_name("ASYNC_IO")
.help("Force use of io_uring for synchronous io")
.long("async-io")
.hidden(true),
)
.arg(
Arg::with_name("LOGICAL")
.help("Fold any unprocessed write sets into the final era array")
.long("logical"),
)
.arg(
Arg::with_name("REPAIR")
.help("Repair the metadata whilst dumping it")
.short("r")
.long("repair"),
)
// options
.arg(
Arg::with_name("OUTPUT")
.help("Specify the output file rather than stdout")
.short("o")
.long("output")
.value_name("FILE"),
)
// arguments
.arg(
Arg::with_name("INPUT")
.help("Specify the input device to dump")
.required(true)
.index(1),
);
let matches = parser.get_matches_from(args);
let input_file = Path::new(matches.value_of("INPUT").unwrap());
let output_file = if matches.is_present("OUTPUT") {
Some(Path::new(matches.value_of("OUTPUT").unwrap()))
} else {
None
};
// Create a temporary report just in case these checks
// need to report anything.
let report = std::sync::Arc::new(crate::report::mk_simple_report());
check_input_file(input_file, &report);
check_file_not_tiny(input_file, &report);
drop(report);
let opts = EraDumpOptions {
input: input_file,
output: output_file,
async_io: matches.is_present("ASYNC_IO"),
logical: matches.is_present("LOGICAL"),
repair: matches.is_present("REPAIR"),
};
if let Err(reason) = dump(opts) {
eprintln!("{}", reason);
process::exit(1);
}
}
//------------------------------------------

View File

@ -3,6 +3,7 @@ pub mod cache_dump;
pub mod cache_repair;
pub mod cache_restore;
pub mod era_check;
pub mod era_dump;
pub mod thin_check;
pub mod thin_dump;
pub mod thin_metadata_pack;

176
src/era/dump.rs Normal file
View File

@ -0,0 +1,176 @@
use anyhow::anyhow;
use std::fs::File;
use std::io::BufWriter;
use std::io::Write;
use std::path::Path;
use std::sync::{Arc, Mutex};
use crate::era::ir::{self, MetadataVisitor};
use crate::era::superblock::*;
use crate::era::writeset::Writeset;
use crate::era::xml;
use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine};
use crate::pdata::array::{self, ArrayBlock};
use crate::pdata::array_walker::*;
use crate::pdata::bitset::read_bitset;
use crate::pdata::btree_walker::btree_to_map;
//------------------------------------------
const MAX_CONCURRENT_IO: u32 = 1024;
//-----------------------------------------
struct EraEmitter<'a> {
emitter: Mutex<&'a mut dyn MetadataVisitor>,
}
impl<'a> EraEmitter<'a> {
pub fn new(emitter: &'a mut dyn MetadataVisitor) -> EraEmitter {
EraEmitter {
emitter: Mutex::new(emitter),
}
}
}
impl<'a> ArrayVisitor<u32> for EraEmitter<'a> {
fn visit(&self, index: u64, b: ArrayBlock<u32>) -> array::Result<()> {
let begin = index as u32 * b.header.max_entries;
let end = begin + b.header.nr_entries;
for (v, block) in b.values.iter().zip(begin..end) {
let era = ir::Era { block, era: *v };
self.emitter
.lock()
.unwrap()
.era(&era)
.map_err(|e| array::value_err(format!("{}", e)))?;
}
Ok(())
}
}
//------------------------------------------
pub struct EraDumpOptions<'a> {
pub input: &'a Path,
pub output: Option<&'a Path>,
pub async_io: bool,
pub logical: bool,
pub repair: bool,
}
struct Context {
engine: Arc<dyn IoEngine + Send + Sync>,
}
fn mk_context(opts: &EraDumpOptions) -> anyhow::Result<Context> {
let engine: Arc<dyn IoEngine + Send + Sync>;
if opts.async_io {
engine = Arc::new(AsyncIoEngine::new(opts.input, MAX_CONCURRENT_IO, false)?);
} else {
let nr_threads = std::cmp::max(8, num_cpus::get() * 2);
engine = Arc::new(SyncIoEngine::new(opts.input, nr_threads, false)?);
}
Ok(Context { engine })
}
fn dump_writeset(
engine: Arc<dyn IoEngine + Send + Sync>,
out: &mut dyn MetadataVisitor,
era: u32,
ws: &Writeset,
repair: bool,
) -> anyhow::Result<()> {
let (bits, errs) = read_bitset(engine.clone(), ws.root, ws.nr_bits as usize, repair);
// TODO: deal with broken writeset
if errs.is_some() {
return Err(anyhow!(
"errors in writeset of era {}: {}",
era,
errs.unwrap()
));
}
out.writeset_b(&ir::Writeset {
era,
nr_bits: ws.nr_bits,
})?;
for b in 0..ws.nr_bits {
let wbit = ir::WritesetBit {
block: b,
value: bits.contains(b as usize).unwrap_or(false),
};
out.writeset_bit(&wbit)?;
}
out.writeset_e()?;
Ok(())
}
pub fn dump_metadata(
engine: Arc<dyn IoEngine + Send + Sync>,
out: &mut dyn MetadataVisitor,
sb: &Superblock,
repair: bool,
) -> anyhow::Result<()> {
let xml_sb = ir::Superblock {
uuid: "".to_string(),
block_size: sb.data_block_size,
nr_blocks: sb.nr_blocks,
current_era: sb.current_era,
};
out.superblock_b(&xml_sb)?;
let mut path = vec![0];
let writesets =
btree_to_map::<Writeset>(&mut path, engine.clone(), repair, sb.writeset_tree_root)?;
for (era, ws) in writesets.iter() {
dump_writeset(engine.clone(), out, *era as u32, ws, repair)?;
}
out.era_b()?;
let w = ArrayWalker::new(engine.clone(), repair);
let mut emitter = EraEmitter::new(out);
w.walk(&mut emitter, sb.era_array_root)?;
out.era_e()?;
out.superblock_e()?;
out.eof()?;
Ok(())
}
pub fn dump_metadata_logical(
_engine: Arc<dyn IoEngine + Send + Sync>,
_out: &mut dyn MetadataVisitor,
_sb: &Superblock,
_repair: bool,
) -> anyhow::Result<()> {
// TODO
Ok(())
}
pub fn dump(opts: EraDumpOptions) -> anyhow::Result<()> {
let ctx = mk_context(&opts)?;
let sb = read_superblock(ctx.engine.as_ref(), SUPERBLOCK_LOCATION)?;
let writer: Box<dyn Write>;
if opts.output.is_some() {
writer = Box::new(BufWriter::new(File::create(opts.output.unwrap())?));
} else {
writer = Box::new(BufWriter::new(std::io::stdout()));
}
let mut out = xml::XmlWriter::new(writer);
if opts.logical {
dump_metadata_logical(ctx.engine, &mut out, &sb, opts.repair)
} else {
dump_metadata(ctx.engine, &mut out, &sb, opts.repair)
}
}
//------------------------------------------

54
src/era/ir.rs Normal file
View File

@ -0,0 +1,54 @@
use anyhow::Result;
//------------------------------------------
#[derive(Clone)]
pub struct Superblock {
pub uuid: String,
pub block_size: u32,
pub nr_blocks: u32,
pub current_era: u32,
}
#[derive(Clone)]
pub struct Writeset {
pub era: u32,
pub nr_bits: u32,
}
#[derive(Clone)]
pub struct WritesetBit {
pub block: u32,
pub value: bool,
}
#[derive(Clone)]
pub struct Era {
pub block: u32,
pub era: u32,
}
//------------------------------------------
#[derive(Clone)]
pub enum Visit {
Continue,
Stop,
}
pub trait MetadataVisitor {
fn superblock_b(&mut self, sb: &Superblock) -> Result<Visit>;
fn superblock_e(&mut self) -> Result<Visit>;
fn writeset_b(&mut self, ws: &Writeset) -> Result<Visit>;
fn writeset_e(&mut self) -> Result<Visit>;
fn writeset_bit(&mut self, wbit: &WritesetBit) -> Result<Visit>;
fn era_b(&mut self) -> Result<Visit>;
fn era_e(&mut self) -> Result<Visit>;
fn era(&mut self, era: &Era) -> Result<Visit>;
fn eof(&mut self) -> Result<Visit>;
}
//------------------------------------------

View File

@ -1,3 +1,6 @@
pub mod check;
pub mod dump;
pub mod ir;
pub mod superblock;
pub mod writeset;
pub mod xml;

93
src/era/xml.rs Normal file
View File

@ -0,0 +1,93 @@
use anyhow::Result;
use quick_xml::events::{BytesEnd, BytesStart, Event};
use quick_xml::Writer;
use std::io::Write;
use crate::era::ir::*;
use crate::xml::*;
//---------------------------------------
pub struct XmlWriter<W: Write> {
w: Writer<W>,
}
impl<W: Write> XmlWriter<W> {
pub fn new(w: W) -> XmlWriter<W> {
XmlWriter {
w: Writer::new_with_indent(w, 0x20, 2),
}
}
}
impl<W: Write> MetadataVisitor for XmlWriter<W> {
fn superblock_b(&mut self, sb: &Superblock) -> Result<Visit> {
let tag = b"superblock";
let mut elem = BytesStart::owned(tag.to_vec(), tag.len());
elem.push_attribute(mk_attr(b"uuid", sb.uuid.clone()));
elem.push_attribute(mk_attr(b"block_size", sb.block_size));
elem.push_attribute(mk_attr(b"nr_blocks", sb.nr_blocks));
elem.push_attribute(mk_attr(b"current_era", sb.current_era));
self.w.write_event(Event::Start(elem))?;
Ok(Visit::Continue)
}
fn superblock_e(&mut self) -> Result<Visit> {
self.w
.write_event(Event::End(BytesEnd::borrowed(b"superblock")))?;
Ok(Visit::Continue)
}
fn writeset_b(&mut self, ws: &Writeset) -> Result<Visit> {
let tag = b"writeset";
let mut elem = BytesStart::owned(tag.to_vec(), tag.len());
elem.push_attribute(mk_attr(b"era", ws.era));
elem.push_attribute(mk_attr(b"nr_bits", ws.nr_bits));
self.w.write_event(Event::Start(elem))?;
Ok(Visit::Continue)
}
fn writeset_e(&mut self) -> Result<Visit> {
self.w
.write_event(Event::End(BytesEnd::borrowed(b"writeset")))?;
Ok(Visit::Continue)
}
fn writeset_bit(&mut self, wbit: &WritesetBit) -> Result<Visit> {
let tag = b"bit";
let mut elem = BytesStart::owned(tag.to_vec(), tag.len());
elem.push_attribute(mk_attr(b"block", wbit.block));
elem.push_attribute(mk_attr(b"value", wbit.value));
self.w.write_event(Event::Empty(elem))?;
Ok(Visit::Continue)
}
fn era_b(&mut self) -> Result<Visit> {
let tag = b"era_array";
let elem = BytesStart::owned(tag.to_vec(), tag.len());
self.w.write_event(Event::Start(elem))?;
Ok(Visit::Continue)
}
fn era_e(&mut self) -> Result<Visit> {
self.w
.write_event(Event::End(BytesEnd::borrowed(b"era_array")))?;
Ok(Visit::Continue)
}
fn era(&mut self, era: &Era) -> Result<Visit> {
let tag = b"era";
let mut elem = BytesStart::owned(tag.to_vec(), tag.len());
elem.push_attribute(mk_attr(b"block", era.block));
elem.push_attribute(mk_attr(b"era", era.era));
self.w.write_event(Event::Empty(elem))?;
Ok(Visit::Continue)
}
fn eof(&mut self) -> Result<Visit> {
Ok(Visit::Continue)
}
}
//------------------------------------------