From ce94ba73a554e8db58aa282d809a57c21b633992 Mon Sep 17 00:00:00 2001 From: Ming-Hung Tsai Date: Thu, 20 May 2021 11:42:17 +0800 Subject: [PATCH] [cache_restore (rust)] First draft --- src/bin/cache_restore.rs | 77 +++++++++++ src/cache/mod.rs | 1 + src/cache/restore.rs | 273 +++++++++++++++++++++++++++++++++++++ src/pdata/array_builder.rs | 60 +++++--- src/pdata/mod.rs | 1 + 5 files changed, 393 insertions(+), 19 deletions(-) create mode 100644 src/bin/cache_restore.rs create mode 100644 src/cache/restore.rs diff --git a/src/bin/cache_restore.rs b/src/bin/cache_restore.rs new file mode 100644 index 0000000..9375c91 --- /dev/null +++ b/src/bin/cache_restore.rs @@ -0,0 +1,77 @@ +extern crate clap; +extern crate thinp; + +use atty::Stream; +use clap::{App, Arg}; +use std::path::Path; +use std::process; +use std::process::exit; +use std::sync::Arc; +use thinp::cache::restore::{restore, CacheRestoreOptions}; +use thinp::file_utils; +use thinp::report::*; + +fn main() { + let parser = App::new("cache_restore") + .version(thinp::version::tools_version()) + .about("Convert XML format metadata to binary.") + .arg( + Arg::with_name("OVERRIDE_MAPPING_ROOT") + .help("Specify a mapping root to use") + .long("override-mapping-root") + .value_name("OVERRIDE_MAPPING_ROOT") + .takes_value(true), + ) + .arg( + Arg::with_name("INPUT") + .help("Specify the input xml") + .short("i") + .long("input") + .value_name("INPUT") + .required(true), + ) + .arg( + Arg::with_name("OUTPUT") + .help("Specify the output device to check") + .short("o") + .long("output") + .value_name("OUTPUT") + .required(true), + ) + .arg( + Arg::with_name("SYNC_IO") + .help("Force use of synchronous io") + .long("sync-io"), + ); + + let matches = parser.get_matches(); + let input_file = Path::new(matches.value_of("INPUT").unwrap()); + let output_file = Path::new(matches.value_of("OUTPUT").unwrap()); + + if !file_utils::file_exists(input_file) { + eprintln!("Couldn't find input file '{:?}'.", &input_file); + exit(1); + } + + let report; + + if matches.is_present("QUIET") { + report = std::sync::Arc::new(mk_quiet_report()); + } else if atty::is(Stream::Stdout) { + report = std::sync::Arc::new(mk_progress_bar_report()); + } else { + report = Arc::new(mk_simple_report()); + } + + let opts = CacheRestoreOptions { + input: &input_file, + output: &output_file, + async_io: !matches.is_present("SYNC_IO"), + report, + }; + + if let Err(reason) = restore(opts) { + println!("{}", reason); + process::exit(1); + } +} diff --git a/src/cache/mod.rs b/src/cache/mod.rs index 5aa61fa..ceb8867 100644 --- a/src/cache/mod.rs +++ b/src/cache/mod.rs @@ -2,5 +2,6 @@ pub mod check; pub mod dump; pub mod hint; pub mod mapping; +pub mod restore; pub mod superblock; pub mod xml; diff --git a/src/cache/restore.rs b/src/cache/restore.rs new file mode 100644 index 0000000..1e55452 --- /dev/null +++ b/src/cache/restore.rs @@ -0,0 +1,273 @@ +use anyhow::{anyhow, Result}; + +use std::convert::TryInto; +use std::fs::OpenOptions; +use std::path::Path; +use std::sync::Arc; + +use crate::cache::hint::Hint; +use crate::cache::mapping::{Mapping, MappingFlags}; +use crate::cache::superblock::*; +use crate::cache::xml::{self, MetadataVisitor, Visit}; +use crate::io_engine::*; +use crate::math::*; +use crate::pdata::array_builder::*; +use crate::pdata::space_map::*; +use crate::report::*; +use crate::write_batcher::*; + +//------------------------------------------ + +const MAX_CONCURRENT_IO: u32 = 1024; + +//------------------------------------------ + +pub struct CacheRestoreOptions<'a> { + pub input: &'a Path, + pub output: &'a Path, + pub async_io: bool, + pub report: Arc, +} + +struct Context { + _report: Arc, + engine: Arc, +} + +fn mk_context(opts: &CacheRestoreOptions) -> anyhow::Result { + let engine: Arc; + + if opts.async_io { + engine = Arc::new(AsyncIoEngine::new(opts.output, MAX_CONCURRENT_IO, true)?); + } else { + let nr_threads = std::cmp::max(8, num_cpus::get() * 2); + engine = Arc::new(SyncIoEngine::new(opts.output, nr_threads, true)?); + } + + Ok(Context { + _report: opts.report.clone(), + engine, + }) +} + +//------------------------------------------ + +struct RestoreResult { + sb: xml::Superblock, + mapping_root: u64, + dirty_root: Option, + hint_root: u64, + discard_root: u64, +} + +struct Restorer<'a> { + write_batcher: &'a mut WriteBatcher, + sb: Option, + mapping_builder: Option>, + dirty_builder: Option>, + hint_builder: Option>, + mapping_root: Option, + dirty_root: Option, + hint_root: Option, + discard_root: Option, + dirty_bits: (u32, u64), +} + +impl<'a> Restorer<'a> { + fn new(w: &'a mut WriteBatcher) -> Restorer<'a> { + Restorer { + write_batcher: w, + sb: None, + mapping_builder: None, + dirty_builder: None, + hint_builder: None, + mapping_root: None, + dirty_root: None, + hint_root: None, + discard_root: None, + dirty_bits: (0, 0), + } + } + + fn get_result(self) -> Result { + if self.sb.is_none() || self.discard_root.is_none() { + return Err(anyhow!("No superblock found in xml file")); + } + if self.mapping_root.is_none() || self.hint_root.is_none() { + return Err(anyhow!("No mappings or hints sections in xml file")); + } + Ok(RestoreResult { + sb: self.sb.unwrap(), + mapping_root: self.mapping_root.unwrap(), + dirty_root: self.dirty_root, + hint_root: self.hint_root.unwrap(), + discard_root: self.discard_root.unwrap(), + }) + } +} + +impl<'a> MetadataVisitor for Restorer<'a> { + fn superblock_b(&mut self, sb: &xml::Superblock) -> Result { + self.sb = Some(sb.clone()); + self.write_batcher.alloc()?; + self.mapping_builder = Some(ArrayBuilder::new(sb.nr_cache_blocks as u64)); + self.dirty_builder = Some(ArrayBuilder::new(div_up(sb.nr_cache_blocks as u64, 64))); + self.hint_builder = Some(ArrayBuilder::new(sb.nr_cache_blocks as u64)); + + let discard_builder = ArrayBuilder::::new(0); // discard bitset is optional + self.discard_root = Some(discard_builder.complete(self.write_batcher)?); + + Ok(Visit::Continue) + } + + fn superblock_e(&mut self) -> Result { + Ok(Visit::Continue) + } + + fn mappings_b(&mut self) -> Result { + Ok(Visit::Continue) + } + + fn mappings_e(&mut self) -> Result { + let mut mapping_builder = None; + std::mem::swap(&mut self.mapping_builder, &mut mapping_builder); + if let Some(builder) = mapping_builder { + self.mapping_root = Some(builder.complete(self.write_batcher)?); + } + + // push the bufferred trailing bits + let b = self.dirty_builder.as_mut().unwrap(); + b.push_value( + self.write_batcher, + self.dirty_bits.0 as u64, + self.dirty_bits.1, + )?; + + let mut dirty_builder = None; + std::mem::swap(&mut self.dirty_builder, &mut dirty_builder); + if let Some(builder) = dirty_builder { + self.dirty_root = Some(builder.complete(self.write_batcher)?); + } + + Ok(Visit::Continue) + } + + fn mapping(&mut self, m: &xml::Map) -> Result { + let map = Mapping { + oblock: m.oblock, + flags: MappingFlags::Valid as u32, + }; + let mapping_builder = self.mapping_builder.as_mut().unwrap(); + mapping_builder.push_value(self.write_batcher, m.cblock as u64, map)?; + + if m.dirty { + let index = m.cblock >> 6; + let bi = m.cblock & 63; + if index == self.dirty_bits.0 { + self.dirty_bits.1 |= 1 << bi; + } else { + let dirty_builder = self.dirty_builder.as_mut().unwrap(); + dirty_builder.push_value( + self.write_batcher, + self.dirty_bits.0 as u64, + self.dirty_bits.1, + )?; + self.dirty_bits.0 = index; + self.dirty_bits.1 = 0; + } + } + + Ok(Visit::Continue) + } + + fn hints_b(&mut self) -> Result { + Ok(Visit::Continue) + } + + fn hints_e(&mut self) -> Result { + let mut hint_builder = None; + std::mem::swap(&mut self.hint_builder, &mut hint_builder); + if let Some(builder) = hint_builder { + self.hint_root = Some(builder.complete(self.write_batcher)?); + } + Ok(Visit::Continue) + } + + fn hint(&mut self, h: &xml::Hint) -> Result { + let hint = Hint { + hint: h.data[..].try_into().unwrap(), + }; + let hint_builder = self.hint_builder.as_mut().unwrap(); + hint_builder.push_value(self.write_batcher, h.cblock as u64, hint)?; + Ok(Visit::Continue) + } + + fn discards_b(&mut self) -> Result { + Ok(Visit::Continue) + } + + fn discards_e(&mut self) -> Result { + Ok(Visit::Continue) + } + + fn discard(&mut self, _d: &xml::Discard) -> Result { + Ok(Visit::Continue) + } + + fn eof(&mut self) -> Result { + Ok(Visit::Continue) + } +} + +//------------------------------------------ + +pub fn restore(opts: CacheRestoreOptions) -> Result<()> { + let input = OpenOptions::new() + .read(true) + .write(false) + .open(opts.input)?; + + let ctx = mk_context(&opts)?; + + let sm = core_sm(ctx.engine.get_nr_blocks(), u32::MAX); + let mut w = WriteBatcher::new(ctx.engine.clone(), sm.clone(), ctx.engine.get_batch_size()); + + let mut restorer = Restorer::new(&mut w); + xml::read(input, &mut restorer)?; + let result = restorer.get_result()?; + + w.flush()?; + + let sb = Superblock { + flags: SuperblockFlags { + clean_shutdown: true, + needs_check: false, + }, + block: SUPERBLOCK_LOCATION, + version: 2, + policy_name: result.sb.policy.as_bytes().to_vec(), + policy_version: vec![2, 0, 0], + policy_hint_size: result.sb.hint_width, + metadata_sm_root: vec![0; SPACE_MAP_ROOT_SIZE], + mapping_root: result.mapping_root, + dirty_root: result.dirty_root, + hint_root: result.hint_root, + discard_root: result.discard_root, + discard_block_size: 0, + discard_nr_blocks: 0, + data_block_size: result.sb.block_size, + cache_blocks: result.sb.nr_cache_blocks, + compat_flags: 0, + compat_ro_flags: 0, + incompat_flags: 0, + read_hits: 0, + read_misses: 9, + write_hits: 0, + write_misses: 0, + }; + write_superblock(ctx.engine.as_ref(), SUPERBLOCK_LOCATION, &sb)?; + + Ok(()) +} + +//------------------------------------------ diff --git a/src/pdata/array_builder.rs b/src/pdata/array_builder.rs index da94f5f..8b7d540 100644 --- a/src/pdata/array_builder.rs +++ b/src/pdata/array_builder.rs @@ -6,25 +6,24 @@ use std::io::Cursor; use crate::checksum; use crate::io_engine::*; use crate::pdata::array::*; +use crate::pdata::btree_builder::*; use crate::pdata::unpack::*; use crate::write_batcher::*; //------------------------------------------ -pub struct ArrayBuilder { +pub struct ArrayBlockBuilder { array_io: ArrayIO, max_entries_per_block: usize, values: VecDeque<(u64, V)>, - array_blocks: Vec, + array_blocks: Vec, nr_entries: u64, nr_emitted: u64, nr_queued: u64, } -struct ArraySummary { - block: u64, - index: u64, - nr_entries: usize, +pub struct ArrayBuilder { + block_builder: ArrayBlockBuilder, } struct ArrayIO { @@ -43,9 +42,9 @@ fn calc_max_entries() -> usize { //------------------------------------------ -impl ArrayBuilder { - pub fn new(nr_entries: u64) -> ArrayBuilder { - ArrayBuilder { +impl ArrayBlockBuilder { + pub fn new(nr_entries: u64) -> ArrayBlockBuilder { + ArrayBlockBuilder { array_io: ArrayIO::new(), max_entries_per_block: calc_max_entries::(), values: VecDeque::new(), @@ -56,7 +55,7 @@ impl ArrayBuilder { } } - fn push_value(&mut self, w: &mut WriteBatcher, index: u64, v: V) -> Result<()> { + pub fn push_value(&mut self, w: &mut WriteBatcher, index: u64, v: V) -> Result<()> { assert!(index >= self.nr_emitted + self.nr_queued); assert!(index < self.nr_entries); @@ -70,8 +69,9 @@ impl ArrayBuilder { Ok(()) } - fn complete(mut self, w: &mut WriteBatcher) -> Result> { + pub fn complete(mut self, w: &mut WriteBatcher) -> Result> { if self.nr_emitted + self.nr_queued < self.nr_entries { + // FIXME: flushing with a default values looks confusing self.push_value(w, self.nr_entries - 1, Default::default())?; } self.emit_all(w)?; @@ -112,26 +112,24 @@ impl ArrayBuilder { let len = self.values.front().unwrap().0 - self.nr_emitted + 1; if len <= nr_free as u64 { let (_, v) = self.values.pop_front().unwrap(); - values.resize_with(len as usize - 1, Default::default); + if len > 1 { + values.resize_with(values.len() + len as usize - 1, Default::default); + } values.push(v); nr_free -= len as usize; self.nr_emitted += len; self.nr_queued -= len; } else { - values.resize_with(nr_free, Default::default); + values.resize_with(values.len() + nr_free as usize, Default::default); self.nr_emitted += nr_free as u64; self.nr_queued -= nr_free as u64; + nr_free = 0; } } - let nr_entries = values.len(); let wresult = self.array_io.write(w, values)?; - self.array_blocks.push(ArraySummary { - block: wresult.loc, - index: self.nr_emitted / self.max_entries_per_block as u64, - nr_entries, - }); + self.array_blocks.push(wresult.loc); Ok(()) } @@ -139,6 +137,30 @@ impl ArrayBuilder { //------------------------------------------ +impl ArrayBuilder { + pub fn new(nr_entries: u64) -> ArrayBuilder { + ArrayBuilder { + block_builder: ArrayBlockBuilder::::new(nr_entries), + } + } + + pub fn push_value(&mut self, w: &mut WriteBatcher, index: u64, v: V) -> Result<()> { + self.block_builder.push_value(w, index, v) + } + + pub fn complete(self, w: &mut WriteBatcher) -> Result { + let blocks = self.block_builder.complete(w)?; + let mut index_builder = Builder::::new(Box::new(NoopRC {})); + + for (i, b) in blocks.iter().enumerate() { + index_builder.push_value(w, i as u64, *b)?; + } + index_builder.complete(w) + } +} + +//------------------------------------------ + impl ArrayIO { pub fn new() -> ArrayIO { ArrayIO { diff --git a/src/pdata/mod.rs b/src/pdata/mod.rs index 56e8f2c..2ae347b 100644 --- a/src/pdata/mod.rs +++ b/src/pdata/mod.rs @@ -1,4 +1,5 @@ pub mod array; +pub mod array_builder; pub mod array_walker; pub mod bitset; pub mod btree;