[thin_check (rust)] Reinstate walk_node_threaded

Joe Thornber 2020-09-23 15:35:09 +01:00
parent f4c3098e02
commit 34052c540c
2 changed files with 238 additions and 95 deletions

src/pdata/btree.rs

@@ -1,18 +1,18 @@
-use anyhow::{anyhow};
+use anyhow::anyhow;
 use byteorder::{ReadBytesExt, WriteBytesExt};
+use data_encoding::BASE64;
 use nom::{number::complete::*, IResult};
 use std::collections::BTreeMap;
 use std::fmt;
 use std::sync::{Arc, Mutex};
 use thiserror::Error;
 use threadpool::ThreadPool;
-use data_encoding::BASE64;
 use crate::checksum;
 use crate::io_engine::*;
+use crate::pack::vm;
 use crate::pdata::space_map::*;
 use crate::pdata::unpack::*;
-use crate::pack::vm;
 //------------------------------------------
@@ -181,10 +181,10 @@ fn test_split_range() {
 fn split_one(path: &Vec<u64>, kr: &KeyRange, k: u64) -> Result<(KeyRange, KeyRange)> {
     match kr.split(k) {
         None => {
-            return Err(node_err(path, &format!(
-                "couldn't split key range {} at {}",
-                kr, k
-            )));
+            return Err(node_err(
+                path,
+                &format!("couldn't split key range {} at {}", kr, k),
+            ));
         }
         Some(pair) => Ok(pair),
     }
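Here kr.split(k) divides a key range at k and yields None when k falls outside the range; that None is what split_one converts into the node error above. A minimal stand-alone sketch of the assumed behaviour, using a simplified KeyRange rather than the one defined in this module:

    #[derive(Clone, Debug, PartialEq)]
    struct KeyRange {
        start: Option<u64>, // None means unbounded below
        end: Option<u64>,   // None means unbounded above
    }

    impl KeyRange {
        // Split the half-open range at k; None if k lies outside it.
        fn split(&self, k: u64) -> Option<(KeyRange, KeyRange)> {
            let in_range =
                self.start.map_or(true, |s| k >= s) && self.end.map_or(true, |e| k <= e);
            if in_range {
                Some((
                    KeyRange { start: self.start, end: Some(k) },
                    KeyRange { start: Some(k), end: self.end },
                ))
            } else {
                None
            }
        }
    }

    fn main() {
        let kr = KeyRange { start: Some(10), end: Some(100) };
        assert!(kr.split(5).is_none()); // the error path taken in split_one
        let (lhs, rhs) = kr.split(50).unwrap();
        assert_eq!(lhs, KeyRange { start: Some(10), end: Some(50) });
        assert_eq!(rhs, KeyRange { start: Some(50), end: Some(100) });
    }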
@@ -222,7 +222,7 @@ pub fn encode_node_path(path: &[u64]) -> String {
     let mut buffer: Vec<u8> = Vec::with_capacity(128);
     let mut cursor = std::io::Cursor::new(&mut buffer);
     assert!(path.len() < 256);
     // The first entry is normally the superblock (0), so we
     // special case this.
     if path.len() > 0 && path[0] == 0 {
@@ -234,17 +234,19 @@ pub fn encode_node_path(path: &[u64]) -> String {
         cursor.write_u8(count as u8).unwrap();
         vm::pack_u64s(&mut cursor, path).unwrap();
     }
     BASE64.encode(&buffer)
 }
 pub fn decode_node_path(text: &str) -> anyhow::Result<Vec<u64>> {
     let mut buffer = vec![0; 128];
     let bytes = &mut buffer[0..BASE64.decode_len(text.len()).unwrap()];
-    BASE64.decode_mut(text.as_bytes(), &mut bytes[0..]).map_err(|_| anyhow!("bad node path. Unable to base64 decode."))?;
+    BASE64
+        .decode_mut(text.as_bytes(), &mut bytes[0..])
+        .map_err(|_| anyhow!("bad node path. Unable to base64 decode."))?;
     let mut input = std::io::Cursor::new(bytes);
     let mut count = input.read_u8()?;
     let mut prepend_zero = false;
     if (count & 0x1) == 0 {
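The pair above gives thin_check a compact way to embed a btree node path in an error message: the u64 path is packed by vm::pack_u64s, base64-encoded, and intended to be recovered verbatim by decode_node_path. A round-trip check of the kind one might add in this module (hedged: assumes both functions are in scope; the example paths are illustrative):

    #[test]
    fn node_path_round_trip() {
        // Paths normally begin with the superblock (block 0), which the
        // encoder special-cases; a non-zero start should survive too.
        for path in [vec![0], vec![0, 17, 4096], vec![5, 9, 13]] {
            let text = encode_node_path(&path);
            assert_eq!(decode_node_path(&text).unwrap(), path);
        }
    }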
@@ -344,7 +346,10 @@ impl fmt::Display for BTreeError {
     }
 }
 pub fn node_err(path: &Vec<u64>, msg: &str) -> BTreeError {
-    BTreeError::Path(path.clone(), Box::new(BTreeError::NodeError(msg.to_string())))
+    BTreeError::Path(
+        path.clone(),
+        Box::new(BTreeError::NodeError(msg.to_string())),
+    )
 }
 fn node_err_s(path: &Vec<u64>, msg: String) -> BTreeError {
@@ -359,7 +364,7 @@ pub fn value_err(msg: String) -> BTreeError {
     BTreeError::ValueError(msg)
 }
-fn aggregate_error(rs: Vec<BTreeError>) -> BTreeError {
+pub fn aggregate_error(rs: Vec<BTreeError>) -> BTreeError {
     BTreeError::Aggregate(rs)
 }
@@ -457,19 +462,22 @@ pub fn unpack_node<V: Unpack>(
     NodeHeader::unpack(data).map_err(|_e| node_err(path, "couldn't parse node header"))?;
     if header.is_leaf && header.value_size != V::disk_size() {
-        return Err(node_err_s(path, format!(
-            "value_size mismatch: expected {}, was {}",
-            V::disk_size(),
-            header.value_size
-        )));
+        return Err(node_err_s(
+            path,
+            format!(
+                "value_size mismatch: expected {}, was {}",
+                V::disk_size(),
+                header.value_size
+            ),
+        ));
     }
     let elt_size = header.value_size + 8;
     if elt_size as usize * header.max_entries as usize + NODE_HEADER_SIZE > BLOCK_SIZE {
-        return Err(node_err_s(path, format!(
-            "max_entries is too large ({})",
-            header.max_entries
-        )));
+        return Err(node_err_s(
+            path,
+            format!("max_entries is too large ({})", header.max_entries),
+        ));
     }
     if header.nr_entries > header.max_entries {
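The max_entries bound above is simple geometry: every entry holds an 8-byte key plus a value_size-byte value, and header plus entries must fit in one metadata block. A worked check under assumed constants (BLOCK_SIZE = 4096 and NODE_HEADER_SIZE = 32 here; the crate's actual values may differ):

    const BLOCK_SIZE: usize = 4096; // assumed metadata block size
    const NODE_HEADER_SIZE: usize = 32; // assumed on-disk header size

    fn max_entries_fits(value_size: u32, max_entries: u32) -> bool {
        let elt_size = value_size as usize + 8; // 8-byte key + value
        elt_size * max_entries as usize + NODE_HEADER_SIZE <= BLOCK_SIZE
    }

    fn main() {
        // 8-byte values -> 16-byte entries -> at most (4096 - 32) / 16 = 254.
        assert!(max_entries_fits(8, 254));
        assert!(!max_entries_fits(8, 255)); // rejected as "max_entries is too large"
    }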
@@ -484,10 +492,13 @@ pub fn unpack_node<V: Unpack>(
         if !is_root {
             let min = header.max_entries / 3;
             if header.nr_entries < min {
-                return Err(node_err_s(path, format!(
-                    "too few entries {}, expected at least {}",
-                    header.nr_entries, min
-                )));
+                return Err(node_err_s(
+                    path,
+                    format!(
+                        "too few entries {}, expected at least {}",
+                        header.nr_entries, min
+                    ),
+                ));
             }
         }
     }
@@ -530,8 +541,14 @@ pub fn unpack_node<V: Unpack>(
 pub trait NodeVisitor<V: Unpack> {
     // &self is deliberately non mut to allow the walker to use multiple threads.
-    fn visit(&self, path: &Vec<u64>, kr: &KeyRange, header: &NodeHeader, keys: &[u64], values: &[V])
-        -> Result<()>;
+    fn visit(
+        &self,
+        path: &Vec<u64>,
+        kr: &KeyRange,
+        header: &NodeHeader,
+        keys: &[u64],
+        values: &[V],
+    ) -> Result<()>;
 }
 #[derive(Clone)]
@@ -625,7 +642,7 @@ impl BTreeWalker {
         let mut errs: Vec<BTreeError> = Vec::new();
         let mut blocks = Vec::with_capacity(bs.len());
-        let mut filtered_krs = Vec::with_capacity(bs.len());
+        let mut filtered_krs = Vec::with_capacity(krs.len());
         for i in 0..bs.len() {
             if self.sm_inc(bs[i]) == 0 {
                 // Node not yet seen
@@ -695,10 +712,11 @@ impl BTreeWalker {
         let bt = checksum::metadata_block_type(b.get_data());
         if bt != checksum::BT::NODE {
-            return Err(
-                node_err_s(path, format!("checksum failed for node {}, {:?}", b.loc, bt))
-                    .keys_context(kr),
-            );
+            return Err(node_err_s(
+                path,
+                format!("checksum failed for node {}, {:?}", b.loc, bt),
+            )
+            .keys_context(kr));
         }
         let node = unpack_node::<V>(path, &b.get_data(), self.ignore_non_fatal, is_root)?;
@@ -767,9 +785,9 @@ impl BTreeWalker {
 //--------------------------------
-/*
-fn walk_node_threaded<NV, V>(
+fn walk_node_threaded_<NV, V>(
     w: Arc<BTreeWalker>,
+    path: &mut Vec<u64>,
     pool: &ThreadPool,
     visitor: Arc<NV>,
     kr: &KeyRange,
@@ -784,80 +802,140 @@ where
     let bt = checksum::metadata_block_type(b.get_data());
     if bt != checksum::BT::NODE {
-        return Err(node_err_s(format!(
-            "checksum failed for node {}, {:?}",
-            b.loc, bt
-        )));
+        return Err(node_err_s(
+            path,
+            format!("checksum failed for node {}, {:?}", b.loc, bt),
+        )
+        .keys_context(kr));
     }
-    let node = unpack_node::<V>(&b.get_data(), w.ignore_non_fatal, is_root)?;
+    let node = unpack_node::<V>(path, &b.get_data(), w.ignore_non_fatal, is_root)?;
     match node {
         Internal { keys, values, .. } => {
-            let krs = BTreeWalker::split_key_ranges(&kr, &keys)?;
-            walk_nodes_threaded(w, pool, visitor, &krs, &values)?;
+            let krs = split_key_ranges(path, &kr, &keys)?;
+            let errs = walk_nodes_threaded(w.clone(), path, pool, visitor, &krs, &values);
+            return w.build_aggregate(b.loc, errs);
         }
         Leaf {
             header,
             keys,
             values,
         } => {
-            visitor.visit(kr, &header, &keys, &values)?;
+            visitor.visit(path, kr, &header, &keys, &values)?;
         }
     }
     Ok(())
 }
-fn walk_nodes_threaded<NV, V>(
+fn walk_node_threaded<NV, V>(
     w: Arc<BTreeWalker>,
+    path: &mut Vec<u64>,
     pool: &ThreadPool,
     visitor: Arc<NV>,
-    krs: &[KeyRange],
-    bs: &[u64],
+    kr: &KeyRange,
+    b: &Block,
+    is_root: bool,
 ) -> Result<()>
 where
     NV: NodeVisitor<V> + Send + Sync + 'static,
     V: Unpack,
 {
-    let mut blocks = Vec::new();
-    for b in bs {
-        if w.sm_inc(*b)? == 0 {
-            blocks.push(*b);
-        }
-    }
-    let rblocks = convert_io_err(w.engine.read_many(&blocks[0..]))?;
-    let mut i = 0;
-    for b in rblocks {
-        match b {
-            Err(_) => {
-                // FIXME: aggregate these
-                return Err(io_err());
-            }
-            Ok(b) => {
-                let w = w.clone();
-                let visitor = visitor.clone();
-                let kr = krs[i].clone();
-                pool.execute(move || {
-                    let result = w.walk_node(visitor.as_ref(), &kr, &b, false);
-                    if result.is_err() {
-                        todo!();
-                    }
-                });
-            }
-        }
-        i += 1;
-    }
-    pool.join();
-    Ok(())
+    path.push(b.loc);
+    let r = walk_node_threaded_(w, path, pool, visitor, kr, b, is_root);
+    path.pop();
+    r
+}
+fn walk_nodes_threaded<NV, V>(
+    w: Arc<BTreeWalker>,
+    path: &mut Vec<u64>,
+    pool: &ThreadPool,
+    visitor: Arc<NV>,
+    krs: &[KeyRange],
+    bs: &[u64],
+) -> Vec<BTreeError>
+where
+    NV: NodeVisitor<V> + Send + Sync + 'static,
+    V: Unpack,
+{
+    assert_eq!(krs.len(), bs.len());
+    let mut errs: Vec<BTreeError> = Vec::new();
+    let mut blocks = Vec::with_capacity(bs.len());
+    let mut filtered_krs = Vec::with_capacity(krs.len());
+    for i in 0..bs.len() {
+        if w.sm_inc(bs[i]) == 0 {
+            // Node not yet seen
+            blocks.push(bs[i]);
+            filtered_krs.push(krs[i].clone());
+        } else {
+            // This node has already been checked ...
+            match w.failed(bs[i]) {
+                None => {
+                    // ... it was clean so we can ignore.
+                }
+                Some(e) => {
+                    // ... there was an error
+                    errs.push(e.clone());
+                }
+            }
+        }
+    }
+    match w.engine.read_many(&blocks[0..]) {
+        Err(_) => {
+            // IO completely failed error every block
+            for (i, b) in blocks.iter().enumerate() {
+                let e = io_err(path).keys_context(&filtered_krs[i]);
+                errs.push(e.clone());
+                w.set_fail(*b, e);
+            }
+        }
+        Ok(rblocks) => {
+            let mut i = 0;
+            let errs = Arc::new(Mutex::new(Vec::new()));
+            for rb in rblocks {
+                match rb {
+                    Err(_) => {
+                        let e = io_err(path).keys_context(&filtered_krs[i]);
+                        let mut errs = errs.lock().unwrap();
+                        errs.push(e.clone());
+                        w.set_fail(blocks[i], e);
+                    }
+                    Ok(b) => {
+                        let w = w.clone();
+                        let visitor = visitor.clone();
+                        let kr = filtered_krs[i].clone();
+                        let errs = errs.clone();
+                        let mut path = path.clone();
+                        pool.execute(move || {
+                            match w.walk_node(&mut path, visitor.as_ref(), &kr, &b, false) {
+                                Err(e) => {
+                                    let mut errs = errs.lock().unwrap();
+                                    errs.push(e);
+                                }
+                                Ok(()) => {}
+                            }
+                        });
+                    }
+                }
+                i += 1;
+            }
+            pool.join();
+        }
+    }
+    errs
 }
 pub fn walk_threaded<NV, V>(
+    path: &mut Vec<u64>,
     w: Arc<BTreeWalker>,
     pool: &ThreadPool,
     visitor: Arc<NV>,
@@ -867,19 +945,23 @@ where
     NV: NodeVisitor<V> + Send + Sync + 'static,
     V: Unpack,
 {
-    if w.sm_inc(root)? > 0 {
-        Ok(())
+    if w.sm_inc(root) > 0 {
+        if let Some(e) = w.failed(root) {
+            Err(e.clone())
+        } else {
+            Ok(())
+        }
     } else {
-        let root = convert_io_err(w.engine.read(root))?;
+        let root = w.engine.read(root).map_err(|_| io_err(path))?;
         let kr = KeyRange {
             start: None,
             end: None,
         };
-        walk_node_threaded(w, pool, visitor, &kr, &root, true)
+        walk_node_threaded(w, path, pool, visitor, &kr, &root, true)
     }
 }
-*/
+/*
 pub fn walk_threaded<NV, V>(
     path: &mut Vec<u64>,
     w: Arc<BTreeWalker>,
@@ -893,6 +975,7 @@ where
 {
     w.walk(path, visitor.as_ref(), root)
 }
+*/
 //------------------------------------------
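The reinstated walker gathers failures from worker threads instead of aborting on the first one: each job pushes into a shared Vec behind an Arc<Mutex<...>>, pool.join() waits for the queue to drain, and the errors are aggregated afterwards. A stand-alone sketch of that pattern, under the assumption that per-block checking can fail independently (check_block and BlockError are hypothetical stand-ins, not crate APIs):

    use std::sync::{Arc, Mutex};
    use threadpool::ThreadPool;

    #[derive(Debug)]
    struct BlockError(u64);

    fn check_block(loc: u64) -> Result<(), BlockError> {
        // Pretend every seventh block is corrupt.
        if loc % 7 == 0 { Err(BlockError(loc)) } else { Ok(()) }
    }

    fn main() {
        let pool = ThreadPool::new(4);
        let errs = Arc::new(Mutex::new(Vec::new()));
        for loc in 1..20u64 {
            let errs = errs.clone();
            pool.execute(move || {
                if let Err(e) = check_block(loc) {
                    errs.lock().unwrap().push(e);
                }
            });
        }
        pool.join();
        // All workers are done, so the Arc has a single owner again.
        let errs = Arc::try_unwrap(errs).unwrap().into_inner().unwrap();
        println!("{} blocks failed: {:?}", errs.len(), errs);
    }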
@@ -910,7 +993,14 @@ impl<V> ValueCollector<V> {
 // FIXME: should we be using Copy rather than clone? (Yes)
 impl<V: Unpack + Copy> NodeVisitor<V> for ValueCollector<V> {
-    fn visit(&self, _path: &Vec<u64>, _kr: &KeyRange, _h: &NodeHeader, keys: &[u64], values: &[V]) -> Result<()> {
+    fn visit(
+        &self,
+        _path: &Vec<u64>,
+        _kr: &KeyRange,
+        _h: &NodeHeader,
+        keys: &[u64],
+        values: &[V],
+    ) -> Result<()> {
         let mut vals = self.values.lock().unwrap();
         for n in 0..keys.len() {
             vals.insert(keys[n], values[n].clone());
@@ -950,7 +1040,7 @@ pub fn btree_to_map_with_sm<V: Unpack + Copy>(
 //------------------------------------------
 struct ValuePathCollector<V> {
-    values: Mutex<BTreeMap<u64, (Vec<u64>, V)>>
+    values: Mutex<BTreeMap<u64, (Vec<u64>, V)>>,
 }
 impl<V> ValuePathCollector<V> {
@@ -962,7 +1052,14 @@ impl<V> ValuePathCollector<V> {
 }
 impl<V: Unpack + Clone> NodeVisitor<V> for ValuePathCollector<V> {
-    fn visit(&self, path: &Vec<u64>, _kr: &KeyRange, _h: &NodeHeader, keys: &[u64], values: &[V]) -> Result<()> {
+    fn visit(
+        &self,
+        path: &Vec<u64>,
+        _kr: &KeyRange,
+        _h: &NodeHeader,
+        keys: &[u64],
+        values: &[V],
+    ) -> Result<()> {
         let mut vals = self.values.lock().unwrap();
         for n in 0..keys.len() {
             vals.insert(keys[n], (path.clone(), values[n].clone()));

src/thin/check.rs

@@ -13,8 +13,8 @@ use crate::pdata::btree::{self, *};
 use crate::pdata::space_map::*;
 use crate::pdata::unpack::*;
 use crate::report::*;
-use crate::thin::superblock::*;
 use crate::thin::block_time::*;
+use crate::thin::superblock::*;
 //------------------------------------------
@@ -25,7 +25,14 @@ struct BottomLevelVisitor {
 //------------------------------------------
 impl NodeVisitor<BlockTime> for BottomLevelVisitor {
-    fn visit(&self, _path: &Vec<u64>, _kr: &KeyRange, _h: &NodeHeader, _k: &[u64], values: &[BlockTime]) -> btree::Result<()> {
+    fn visit(
+        &self,
+        _path: &Vec<u64>,
+        _kr: &KeyRange,
+        _h: &NodeHeader,
+        _k: &[u64],
+        values: &[BlockTime],
+    ) -> btree::Result<()> {
         // FIXME: do other checks
         if values.len() == 0 {
@@ -99,7 +106,14 @@ impl<'a> OverflowChecker<'a> {
 }
 impl<'a> NodeVisitor<u32> for OverflowChecker<'a> {
-    fn visit(&self, _path: &Vec<u64>, _kr: &KeyRange, _h: &NodeHeader, keys: &[u64], values: &[u32]) -> btree::Result<()> {
+    fn visit(
+        &self,
+        _path: &Vec<u64>,
+        _kr: &KeyRange,
+        _h: &NodeHeader,
+        keys: &[u64],
+        values: &[u32],
+    ) -> btree::Result<()> {
         for n in 0..keys.len() {
             let k = keys[n];
             let v = values[n];
@@ -164,7 +178,7 @@ fn check_space_map(
         match b {
             Err(_e) => {
                 todo!();
-            },
+            }
             Ok(b) => {
                 if checksum::metadata_block_type(&b.get_data()) != checksum::BT::BITMAP {
                     report.fatal(&format!(
@@ -358,20 +372,34 @@ fn check_mapping_bottom_level(
         false,
     )?);
-    if roots.len() > 64000 {
+    // We want to print out errors as we progress, so we aggregate for each thin and print
+    // at that point.
+    let mut failed = false;
+    if roots.len() > 64 {
         ctx.report.info("spreading load across devices");
+        let errs = Arc::new(Mutex::new(Vec::new()));
         for (_thin_id, (path, root)) in roots {
             let data_sm = data_sm.clone();
             let root = *root;
             let v = BottomLevelVisitor { data_sm };
             let w = w.clone();
             let mut path = path.clone();
+            let errs = errs.clone();
             ctx.pool.execute(move || {
-                // FIXME: propogate errors + share fails.
-                let _r = w.walk(&mut path, &v, root);
+                if let Err(e) = w.walk(&mut path, &v, root) {
+                    let mut errs = errs.lock().unwrap();
+                    errs.push(e);
+                }
             });
         }
         ctx.pool.join();
+        let errs = Arc::try_unwrap(errs).unwrap().into_inner().unwrap();
+        if errs.len() > 0 {
+            eprintln!("{}", aggregate_error(errs));
+            failed = true;
+        }
     } else {
         ctx.report.info("spreading load within device");
         for (_thin_id, (path, root)) in roots {
@@ -380,12 +408,19 @@ fn check_mapping_bottom_level(
             let root = *root;
             let v = Arc::new(BottomLevelVisitor { data_sm });
             let mut path = path.clone();
-            // FIXME: propogate errors + share fails.
-            walk_threaded(&mut path, w, &ctx.pool, v, root)?
+            if let Err(e) = walk_threaded(&mut path, w, &ctx.pool, v, root) {
+                failed = true;
+                eprintln!("{}", e);
+            }
         }
     }
-    Ok(())
+    if failed {
+        Err(anyhow!("Check of mappings failed"))
+    } else {
+        Ok(())
+    }
 }
 fn mk_context(opts: &ThinCheckOptions) -> Result<Context> {
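check_mapping_bottom_level now reports as it goes rather than failing fast: each walk prints its error immediately, a flag records that something went wrong, and one summary error is returned at the end. The shape of that control flow, reduced to a sketch (walk_device is a hypothetical stand-in for the walks above):

    use anyhow::{anyhow, Result};

    fn walk_device(id: u64) -> Result<()> {
        if id == 3 { Err(anyhow!("device {}: bad mapping", id)) } else { Ok(()) }
    }

    fn check_all(ids: &[u64]) -> Result<()> {
        let mut failed = false;
        for id in ids {
            if let Err(e) = walk_device(*id) {
                eprintln!("{}", e); // report now, keep checking the rest
                failed = true;
            }
        }
        if failed {
            Err(anyhow!("Check of mappings failed"))
        } else {
            Ok(())
        }
    }

    fn main() {
        assert!(check_all(&[1, 2, 3, 4]).is_err());
    }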
@@ -464,15 +499,19 @@ pub fn check(opts: ThinCheckOptions) -> Result<()> {
     // mapping top level
     report.set_sub_title("mapping tree");
-    let roots =
-        btree_to_map_with_path::<u64>(&mut path, engine.clone(), metadata_sm.clone(), false, sb.mapping_root)?;
+    let roots = btree_to_map_with_path::<u64>(
+        &mut path,
+        engine.clone(),
+        metadata_sm.clone(),
+        false,
+        sb.mapping_root,
+    )?;
     // mapping bottom level
     let root = unpack::<SMRoot>(&sb.data_sm_root[0..])?;
     let data_sm = core_sm(root.nr_blocks, nr_devs as u32);
     check_mapping_bottom_level(&ctx, &metadata_sm, &data_sm, &roots)?;
     bail_out(&ctx, "mapping tree")?;
+    eprintln!("checked mapping");
     report.set_sub_title("data space map");
     let root = unpack::<SMRoot>(&sb.data_sm_root[0..])?;
@@ -523,8 +562,15 @@ pub fn check(opts: ThinCheckOptions) -> Result<()> {
     )?;
     // Now the counts should be correct and we can check it.
-    let metadata_leaks =
-        check_space_map(&mut path, &ctx, "metadata", entries, None, metadata_sm.clone(), root)?;
+    let metadata_leaks = check_space_map(
+        &mut path,
+        &ctx,
+        "metadata",
+        entries,
+        None,
+        metadata_sm.clone(),
+        root,
+    )?;
     bail_out(&ctx, "metadata space map")?;
     if opts.auto_repair {