[thin_check (rust)] Add a threaded version of btree walk.

Bottom-level mappings use this if there are few devices.  Performance
is a bit slower for io_uring, and much slower for sync io (which I think
is due to io scheduling that I can't do much about).
Joe Thornber 2020-08-21 10:10:49 +01:00
parent b01a0a46d1
commit cda92de441
2 changed files with 149 additions and 48 deletions
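
To make the new calling convention concrete, here is a minimal sketch (not part of the commit) of how the two walking strategies are dispatched, written as if it lived inside the crate. `walk_all_roots` and `mk_visitor` are invented names for illustration; `BTreeWalker::walk`, `walk_threaded`, and the 64-root threshold are taken from the diff below.

use std::sync::Arc;

use anyhow::Result;
use threadpool::ThreadPool;

use crate::pdata::btree::{walk_threaded, BTreeWalker, NodeVisitor};
use crate::pdata::unpack::Unpack;

fn walk_all_roots<NV, V>(
    w: Arc<BTreeWalker>,
    pool: &ThreadPool,
    roots: &[u64],
    mk_visitor: impl Fn() -> NV,
) -> Result<()>
where
    NV: NodeVisitor<V> + Send + Sync + 'static,
    V: Unpack,
{
    if roots.len() > 64 {
        // Many thin devices: each device's tree gets its own single-threaded
        // walk, and the pool spreads the devices across threads.
        for root in roots {
            let w = w.clone();
            let v = mk_visitor();
            let root = *root;
            pool.execute(move || {
                // Errors are dropped here, as in the commit's own closure.
                let _r = w.walk(&v, root);
            });
        }
        pool.join();
    } else {
        // Few devices: walk each tree with the new threaded walker, which
        // fans the child nodes of a single root out across the same pool.
        for root in roots {
            walk_threaded(w.clone(), pool, Arc::new(mk_visitor()), *root)?;
        }
    }
    Ok(())
}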

src/pdata/btree.rs

@@ -2,6 +2,7 @@ use anyhow::{anyhow, Result};
 use nom::{number::complete::*, IResult};
 use std::collections::BTreeMap;
 use std::sync::{Arc, Mutex};
+use threadpool::ThreadPool;
 
 use crate::checksum;
 use crate::io_engine::*;
@@ -153,8 +154,8 @@ pub trait NodeVisitor<V: Unpack> {
 #[derive(Clone)]
 pub struct BTreeWalker {
-    pub engine: Arc<dyn IoEngine + Send + Sync>,
-    pub sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
+    engine: Arc<dyn IoEngine + Send + Sync>,
+    sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
     ignore_non_fatal: bool,
 }
@@ -194,7 +195,7 @@ impl BTreeWalker {
         Ok(count)
     }
 
-    fn walk_nodes<NV, V>(&mut self, visitor: &mut NV, bs: &[u64]) -> Result<()>
+    fn walk_nodes<NV, V>(&self, visitor: &NV, bs: &[u64]) -> Result<()>
     where
         NV: NodeVisitor<V>,
         V: Unpack,
@@ -215,7 +216,7 @@ impl BTreeWalker {
         Ok(())
     }
 
-    fn walk_node<NV, V>(&mut self, visitor: &mut NV, b: &Block, is_root: bool) -> Result<()>
+    fn walk_node<NV, V>(&self, visitor: &NV, b: &Block, is_root: bool) -> Result<()>
     where
         NV: NodeVisitor<V>,
         V: Unpack,
@@ -237,7 +238,11 @@ impl BTreeWalker {
             } => {
                 self.walk_nodes(visitor, &values)?;
             }
-            Leaf { header, keys, values } => {
+            Leaf {
+                header,
+                keys,
+                values,
+            } => {
                 visitor.visit(&header, &keys, &values)?;
             }
         }
@@ -245,19 +250,7 @@ impl BTreeWalker {
         Ok(())
     }
 
-    pub fn walk_b<NV, V>(&mut self, visitor: &mut NV, root: &Block) -> Result<()>
-    where
-        NV: NodeVisitor<V>,
-        V: Unpack,
-    {
-        if self.sm_inc(root.loc)? > 0 {
-            Ok(())
-        } else {
-            self.walk_node(visitor, &root, true)
-        }
-    }
-
-    pub fn walk<NV, V>(&mut self, visitor: &mut NV, root: u64) -> Result<()>
+    pub fn walk<NV, V>(&self, visitor: &NV, root: u64) -> Result<()>
     where
         NV: NodeVisitor<V>,
         V: Unpack,
@@ -272,6 +265,99 @@ impl BTreeWalker {
     }
 }
 
+//--------------------------------
+
+fn walk_node_threaded<NV, V>(
+    w: Arc<BTreeWalker>,
+    pool: &ThreadPool,
+    visitor: Arc<NV>,
+    b: &Block,
+    is_root: bool,
+) -> Result<()>
+where
+    NV: NodeVisitor<V> + Send + Sync + 'static,
+    V: Unpack,
+{
+    use Node::*;
+
+    let bt = checksum::metadata_block_type(b.get_data());
+    if bt != checksum::BT::NODE {
+        return Err(anyhow!("checksum failed for node {}, {:?}", b.loc, bt));
+    }
+
+    let node = unpack_node::<V>(&b.get_data(), w.ignore_non_fatal, is_root)?;
+
+    match node {
+        Internal {
+            header: _h,
+            keys: _k,
+            values,
+        } => {
+            walk_nodes_threaded(w, pool, visitor, &values)?;
+        }
+        Leaf {
+            header,
+            keys,
+            values,
+        } => {
+            visitor.visit(&header, &keys, &values)?;
+        }
+    }
+
+    Ok(())
+}
+
+fn walk_nodes_threaded<NV, V>(
+    w: Arc<BTreeWalker>,
+    pool: &ThreadPool,
+    visitor: Arc<NV>,
+    bs: &[u64],
+) -> Result<()>
+where
+    NV: NodeVisitor<V> + Send + Sync + 'static,
+    V: Unpack,
+{
+    let mut blocks = Vec::new();
+    for b in bs {
+        if w.sm_inc(*b)? == 0 {
+            blocks.push(Block::new(*b));
+        }
+    }
+
+    w.engine.read_many(&mut blocks)?;
+
+    for b in blocks {
+        let w = w.clone();
+        let visitor = visitor.clone();
+        pool.execute(move || {
+            // FIXME: return result
+            w.walk_node(visitor.as_ref(), &b, false);
+        });
+    }
+    pool.join();
+
+    Ok(())
+}
+
+pub fn walk_threaded<NV, V>(
+    w: Arc<BTreeWalker>,
+    pool: &ThreadPool,
+    visitor: Arc<NV>,
+    root: u64,
+) -> Result<()>
+where
+    NV: NodeVisitor<V> + Send + Sync + 'static,
+    V: Unpack,
+{
+    if w.sm_inc(root)? > 0 {
+        Ok(())
+    } else {
+        let mut root = Block::new(root);
+        w.engine.read(&mut root)?;
+        walk_node_threaded(w, pool, visitor, &root, true)
+    }
+}
+
 //------------------------------------------
 
 struct ValueCollector<V> {
@@ -302,10 +388,10 @@ pub fn btree_to_map<V: Unpack + Clone>(
     ignore_non_fatal: bool,
     root: u64,
 ) -> Result<BTreeMap<u64, V>> {
-    let mut walker = BTreeWalker::new(engine, ignore_non_fatal);
-    let mut visitor = ValueCollector::<V>::new();
+    let walker = BTreeWalker::new(engine, ignore_non_fatal);
+    let visitor = ValueCollector::<V>::new();
 
-    walker.walk(&mut visitor, root)?;
+    walker.walk(&visitor, root)?;
     Ok(visitor.values.into_inner().unwrap())
 }
@@ -315,10 +401,10 @@ pub fn btree_to_map_with_sm<V: Unpack + Clone>(
     ignore_non_fatal: bool,
     root: u64,
 ) -> Result<BTreeMap<u64, V>> {
-    let mut walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?;
-    let mut visitor = ValueCollector::<V>::new();
+    let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?;
+    let visitor = ValueCollector::<V>::new();
 
-    walker.walk(&mut visitor, root)?;
+    walker.walk(&visitor, root)?;
     Ok(visitor.values.into_inner().unwrap())
 }

src/thin/check.rs

@@ -9,7 +9,7 @@ use threadpool::ThreadPool;
 
 use crate::checksum;
 use crate::io_engine::{AsyncIoEngine, Block, IoEngine, SyncIoEngine};
-use crate::pdata::btree::{btree_to_map, btree_to_map_with_sm, BTreeWalker, NodeHeader, NodeVisitor};
+use crate::pdata::btree::*;
 use crate::pdata::space_map::*;
 use crate::pdata::unpack::*;
 use crate::report::*;
@@ -162,14 +162,14 @@ fn check_space_map(
     // overflow btree
     {
-        let mut v = OverflowChecker::new(&*sm);
-        let mut w;
+        let v = OverflowChecker::new(&*sm);
+        let w;
         if metadata_sm.is_none() {
             w = BTreeWalker::new(engine.clone(), false);
         } else {
             w = BTreeWalker::new_with_sm(engine.clone(), metadata_sm.unwrap().clone(), false)?;
         }
-        w.walk(&mut v, root.ref_count_root)?;
+        w.walk(&v, root.ref_count_root)?;
     }
 
     let mut blocks = Vec::with_capacity(entries.len());
@@ -357,24 +357,34 @@ fn check_mapping_bottom_level(
 ) -> Result<()> {
     ctx.report.set_sub_title("mapping tree");
 
-    for (_thin_id, root) in roots {
-        let mut w = BTreeWalker::new_with_sm(ctx.engine.clone(), metadata_sm.clone(), false)?;
-        let data_sm = data_sm.clone();
-        let root = *root;
-        ctx.pool.execute(move || {
-            let mut v = BottomLevelVisitor { data_sm };
-            // FIXME: return error
-            match w.walk(&mut v, root) {
-                Err(e) => {
-                    eprintln!("walk failed {:?}", e);
-                    std::process::abort();
-                }
-                Ok(_result) => {}
-            }
-        });
-    }
-
-    ctx.pool.join();
+    let w = Arc::new(BTreeWalker::new_with_sm(
+        ctx.engine.clone(),
+        metadata_sm.clone(),
+        false,
+    )?);
+
+    if roots.len() > 64 {
+        ctx.report.info("spreading load across devices");
+        for (_thin_id, root) in roots {
+            let data_sm = data_sm.clone();
+            let root = *root;
+            let v = BottomLevelVisitor { data_sm };
+            let w = w.clone();
+            ctx.pool.execute(move || {
+                let _r = w.walk(&v, root);
+            });
+        }
+        ctx.pool.join();
+    } else {
+        ctx.report.info("spreading load within device");
+        for (_thin_id, root) in roots {
+            let w = w.clone();
+            let data_sm = data_sm.clone();
+            let root = *root;
+            let v = Arc::new(BottomLevelVisitor { data_sm });
+            walk_threaded(w, &ctx.pool, v, root)?
+        }
+    }
 
     Ok(())
 }
@@ -385,7 +395,11 @@ fn mk_context(opts: &ThinCheckOptions) -> Result<Context> {
     if opts.async_io {
         nr_threads = std::cmp::min(4, num_cpus::get());
-        engine = Arc::new(AsyncIoEngine::new(opts.dev, MAX_CONCURRENT_IO, opts.auto_repair)?);
+        engine = Arc::new(AsyncIoEngine::new(
+            opts.dev,
+            MAX_CONCURRENT_IO,
+            opts.auto_repair,
+        )?);
     } else {
         nr_threads = num_cpus::get() * 2;
         engine = Arc::new(SyncIoEngine::new(opts.dev, nr_threads, opts.auto_repair)?);
@@ -504,7 +518,8 @@ pub fn check(opts: ThinCheckOptions) -> Result<()> {
     )?;
 
     // Now the counts should be correct and we can check it.
-    let metadata_leaks = check_space_map(&ctx, "metadata", entries, None, metadata_sm.clone(), root)?;
+    let metadata_leaks =
+        check_space_map(&ctx, "metadata", entries, None, metadata_sm.clone(), root)?;
     bail_out(&ctx, "metadata space map")?;
 
     if opts.auto_repair {