use anyhow::anyhow;
use byteorder::{ReadBytesExt, WriteBytesExt};
use data_encoding::BASE64;
use nom::{number::complete::*, IResult};
use std::collections::BTreeMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use thiserror::Error;
use threadpool::ThreadPool;

use crate::checksum;
use crate::io_engine::*;
use crate::pack::vm;
use crate::pdata::space_map::*;
use crate::pdata::unpack::*;

// FIXME: check that keys are in ascending order between nodes.

//------------------------------------------

#[derive(Clone, Debug, PartialEq)]
pub struct KeyRange {
    start: Option<u64>,
    end: Option<u64>, // This is the one-past-the-end value
}

impl fmt::Display for KeyRange {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match (self.start, self.end) {
            (None, None) => write!(f, "[..]"),
            (None, Some(e)) => write!(f, "[..{}]", e),
            (Some(s), None) => write!(f, "[{}..]", s),
            (Some(s), Some(e)) => write!(f, "[{}..{}]", s, e),
        }
    }
}

impl KeyRange {
    // None will be returned if either range would be zero length
    fn split(&self, n: u64) -> Option<(KeyRange, KeyRange)> {
        match (self.start, self.end) {
            (None, None) => Some((
                KeyRange {
                    start: None,
                    end: Some(n),
                },
                KeyRange {
                    start: Some(n),
                    end: None,
                },
            )),
            (None, Some(e)) => {
                if n < e {
                    Some((
                        KeyRange {
                            start: None,
                            end: Some(n),
                        },
                        KeyRange {
                            start: Some(n),
                            end: Some(e),
                        },
                    ))
                } else {
                    None
                }
            }
            (Some(s), None) => {
                if s < n {
                    Some((
                        KeyRange {
                            start: Some(s),
                            end: Some(n),
                        },
                        KeyRange {
                            start: Some(n),
                            end: None,
                        },
                    ))
                } else {
                    None
                }
            }
            (Some(s), Some(e)) => {
                if s < n && n < e {
                    Some((
                        KeyRange {
                            start: Some(s),
                            end: Some(n),
                        },
                        KeyRange {
                            start: Some(n),
                            end: Some(e),
                        },
                    ))
                } else {
                    None
                }
            }
        }
    }
}

#[test]
fn test_split_range() {
    struct Test(Option<u64>, Option<u64>, u64, Option<(KeyRange, KeyRange)>);

    let tests = vec![
        Test(
            None,
            None,
            100,
            Some((
                KeyRange {
                    start: None,
                    end: Some(100),
                },
                KeyRange {
                    start: Some(100),
                    end: None,
                },
            )),
        ),
        Test(None, Some(100), 1000, None),
        Test(
            None,
            Some(100),
            50,
            Some((
                KeyRange {
                    start: None,
                    end: Some(50),
                },
                KeyRange {
                    start: Some(50),
                    end: Some(100),
                },
            )),
        ),
        Test(None, Some(100), 100, None),
        Test(Some(100), None, 50, None),
        Test(
            Some(100),
            None,
            150,
            Some((
                KeyRange {
                    start: Some(100),
                    end: Some(150),
                },
                KeyRange {
                    start: Some(150),
                    end: None,
                },
            )),
        ),
        Test(Some(100), Some(200), 50, None),
        Test(Some(100), Some(200), 250, None),
        Test(
            Some(100),
            Some(200),
            150,
            Some((
                KeyRange {
                    start: Some(100),
                    end: Some(150),
                },
                KeyRange {
                    start: Some(150),
                    end: Some(200),
                },
            )),
        ),
    ];

    for Test(start, end, n, expected) in tests {
        let kr = KeyRange { start, end };
        let actual = kr.split(n);
        assert_eq!(actual, expected);
    }
}

fn split_one(path: &Vec<u64>, kr: &KeyRange, k: u64) -> Result<(KeyRange, KeyRange)> {
    match kr.split(k) {
        None => Err(node_err(
            path,
            &format!("couldn't split key range {} at {}", kr, k),
        )),
        Some(pair) => Ok(pair),
    }
}

fn split_key_ranges(path: &Vec<u64>, kr: &KeyRange, keys: &[u64]) -> Result<Vec<KeyRange>> {
    let mut krs = Vec::with_capacity(keys.len());

    if keys.is_empty() {
        return Err(node_err(path, "split_key_ranges: no keys present"));
    }

    // The first key gives the lower bound
    let mut kr = KeyRange {
        start: Some(keys[0]),
        end: kr.end,
    };

    for i in 1..keys.len() {
        let (first, rest) = split_one(path, &kr, keys[i])?;
        krs.push(first);
        kr = rest;
    }

    krs.push(kr);

    Ok(krs)
}
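// A minimal sketch (not from the original source) showing how
// split_key_ranges() behaves: each key becomes the inclusive start of a
// sub-range, and the parent range's end caps the final one.
#[test]
fn test_split_key_ranges_sketch() {
    let path = vec![];
    let parent = KeyRange {
        start: None,
        end: Some(100),
    };
    let krs = split_key_ranges(&path, &parent, &[10, 20, 50]).unwrap();
    assert_eq!(
        krs,
        vec![
            KeyRange { start: Some(10), end: Some(20) },
            KeyRange { start: Some(20), end: Some(50) },
            KeyRange { start: Some(50), end: Some(100) },
        ]
    );
}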
//------------------------------------------

pub fn encode_node_path(path: &[u64]) -> String {
    let mut buffer: Vec<u8> = Vec::with_capacity(128);
    let mut cursor = std::io::Cursor::new(&mut buffer);
    assert!(path.len() < 256);

    // The first entry is normally the superblock (0), so we special case
    // this.  The count byte is shifted left one bit; the bottom bit flags
    // whether the leading 0 is explicit.
    if !path.is_empty() && path[0] == 0 {
        let count = ((path.len() as u8) - 1) << 1;
        cursor.write_u8(count).unwrap();
        vm::pack_u64s(&mut cursor, &path[1..]).unwrap();
    } else {
        let count = ((path.len() as u8) << 1) | 1;
        cursor.write_u8(count).unwrap();
        vm::pack_u64s(&mut cursor, path).unwrap();
    }

    BASE64.encode(&buffer)
}

pub fn decode_node_path(text: &str) -> anyhow::Result<Vec<u64>> {
    let mut buffer = vec![0; 128];
    let bytes = &mut buffer[0..BASE64.decode_len(text.len()).unwrap()];
    BASE64
        .decode_mut(text.as_bytes(), &mut bytes[0..])
        .map_err(|_| anyhow!("bad node path.  Unable to base64 decode."))?;

    let mut input = std::io::Cursor::new(bytes);

    let mut count = input.read_u8()?;

    // An even count byte means an implicit 0 as the first entry.
    let prepend_zero = (count & 0x1) == 0;
    count >>= 1;

    let count = count as usize;
    if count == 0 {
        // A bare superblock path ([0]) encodes to a zero count byte.
        return Ok(if prepend_zero { vec![0u64] } else { Vec::new() });
    }

    let mut output = Vec::with_capacity(count * 8);
    let mut cursor = std::io::Cursor::new(&mut output);

    let mut vm = vm::VM::new();
    let written = vm.exec(&mut input, &mut cursor, count * 8)?;
    assert_eq!(written, count * 8);

    let mut cursor = std::io::Cursor::new(&mut output);
    let mut path = vm::unpack_u64s(&mut cursor, count)?;

    if prepend_zero {
        let mut full_path = vec![0u64];
        full_path.append(&mut path);
        Ok(full_path)
    } else {
        Ok(path)
    }
}

#[test]
fn test_encode_path() {
    struct Test(Vec<u64>);

    let tests = vec![
        Test(vec![]),
        Test(vec![1]),
        Test(vec![1, 2]),
        Test(vec![1, 2, 3, 4]),
    ];

    for t in tests {
        let encoded = encode_node_path(&t.0[0..]);
        eprintln!("encoded = '{}'", &encoded);
        let decoded = decode_node_path(&encoded).unwrap();
        assert_eq!(decoded, &t.0[0..]);
    }
}

//------------------------------------------

const NODE_HEADER_SIZE: usize = 32;

#[derive(Error, Clone, Debug)]
pub enum BTreeError {
    // #[error("io error")]
    IoError, // (std::io::Error), // FIXME: we can't clone an io_error

    // #[error("node error: {0}")]
    NodeError(String),

    // #[error("value error: {0}")]
    ValueError(String),

    // #[error("keys: {0:?}")]
    KeyContext(KeyRange, Box<BTreeError>),

    // #[error("aggregate: {0:?}")]
    Aggregate(Vec<BTreeError>),

    // #[error("{0:?}, {1}")]
    Path(Vec<u64>, Box<BTreeError>),
}

impl fmt::Display for BTreeError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            BTreeError::IoError => write!(f, "io error"),
            BTreeError::NodeError(msg) => write!(f, "node error: {}", msg),
            BTreeError::ValueError(msg) => write!(f, "value error: {}", msg),
            BTreeError::KeyContext(kr, be) => write!(f, "{}, affecting keys {}", be, kr),
            BTreeError::Aggregate(errs) => {
                for e in errs {
                    write!(f, "{}", e)?
                }
                Ok(())
            }
            BTreeError::Path(path, e) => write!(f, "{} {}", e, encode_node_path(path)),
        }
    }
}

pub fn node_err(path: &Vec<u64>, msg: &str) -> BTreeError {
    BTreeError::Path(
        path.clone(),
        Box::new(BTreeError::NodeError(msg.to_string())),
    )
}

fn node_err_s(path: &Vec<u64>, msg: String) -> BTreeError {
    BTreeError::Path(path.clone(), Box::new(BTreeError::NodeError(msg)))
}

pub fn io_err(path: &Vec<u64>) -> BTreeError {
    BTreeError::Path(path.clone(), Box::new(BTreeError::IoError))
}

pub fn value_err(msg: String) -> BTreeError {
    BTreeError::ValueError(msg)
}

fn aggregate_error(rs: Vec<BTreeError>) -> BTreeError {
    BTreeError::Aggregate(rs)
}

impl BTreeError {
    pub fn keys_context(self, keys: &KeyRange) -> BTreeError {
        BTreeError::KeyContext(keys.clone(), Box::new(self))
    }
}

pub type Result<T> = std::result::Result<T, BTreeError>;
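// A small sketch (not in the original source) of how the error helpers
// compose: node_err() records the btree path, and keys_context() wraps
// the result with the key range being walked.  Display unwinds the
// nesting from the innermost error outwards.
#[test]
fn test_error_context_sketch() {
    let path = vec![0, 17];
    let kr = KeyRange {
        start: Some(5),
        end: Some(10),
    };
    let err = node_err(&path, "bad node").keys_context(&kr);
    let msg = format!("{}", err);
    assert!(msg.contains("node error: bad node"));
    assert!(msg.contains("affecting keys [5..10]"));
}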
//------------------------------------------

#[derive(Debug, Clone, Copy)]
pub struct NodeHeader {
    pub block: u64,
    pub is_leaf: bool,
    pub nr_entries: u32,
    pub max_entries: u32,
    pub value_size: u32,
}

#[allow(dead_code)]
const INTERNAL_NODE: u32 = 1;
const LEAF_NODE: u32 = 2;

impl Unpack for NodeHeader {
    fn disk_size() -> u32 {
        32
    }

    fn unpack(data: &[u8]) -> IResult<&[u8], NodeHeader> {
        let (i, _csum) = le_u32(data)?;
        let (i, flags) = le_u32(i)?;
        let (i, block) = le_u64(i)?;
        let (i, nr_entries) = le_u32(i)?;
        let (i, max_entries) = le_u32(i)?;
        let (i, value_size) = le_u32(i)?;
        let (i, _padding) = le_u32(i)?;

        Ok((
            i,
            NodeHeader {
                block,
                is_leaf: flags == LEAF_NODE,
                nr_entries,
                max_entries,
                value_size,
            },
        ))
    }
}
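// A sketch (not part of the original file) of the 32-byte on-disk layout
// NodeHeader::unpack() parses: csum, flags, block, nr_entries,
// max_entries, value_size and padding, all little-endian.  The values
// below are arbitrary.
#[test]
fn test_unpack_node_header_sketch() {
    use byteorder::LittleEndian;

    let mut buf: Vec<u8> = Vec::new();
    buf.write_u32::<LittleEndian>(0).unwrap(); // csum (not verified here)
    buf.write_u32::<LittleEndian>(LEAF_NODE).unwrap(); // flags
    buf.write_u64::<LittleEndian>(123).unwrap(); // block
    buf.write_u32::<LittleEndian>(2).unwrap(); // nr_entries
    buf.write_u32::<LittleEndian>(126).unwrap(); // max_entries
    buf.write_u32::<LittleEndian>(8).unwrap(); // value_size
    buf.write_u32::<LittleEndian>(0).unwrap(); // padding

    let (_, hdr) = NodeHeader::unpack(&buf).unwrap();
    assert!(hdr.is_leaf);
    assert_eq!(hdr.block, 123);
    assert_eq!(hdr.nr_entries, 2);
    assert_eq!(hdr.max_entries, 126);
    assert_eq!(hdr.value_size, 8);
}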
#[derive(Clone)]
pub enum Node<V: Unpack> {
    Internal {
        header: NodeHeader,
        keys: Vec<u64>,
        values: Vec<u64>,
    },
    Leaf {
        header: NodeHeader,
        keys: Vec<u64>,
        values: Vec<V>,
    },
}

impl<V: Unpack> Node<V> {
    pub fn get_header(&self) -> &NodeHeader {
        use Node::*;
        match self {
            Internal { header, .. } => header,
            Leaf { header, .. } => header,
        }
    }
}

pub fn convert_result<'a, V>(path: &Vec<u64>, r: IResult<&'a [u8], V>) -> Result<(&'a [u8], V)> {
    r.map_err(|_e| node_err(path, "parse error"))
}

pub fn convert_io_err<V>(path: &Vec<u64>, r: std::io::Result<V>) -> Result<V> {
    r.map_err(|_| io_err(path))
}

pub fn unpack_node<V: Unpack>(
    path: &Vec<u64>,
    data: &[u8],
    ignore_non_fatal: bool,
    is_root: bool,
) -> Result<Node<V>> {
    use nom::multi::count;

    let (i, header) =
        NodeHeader::unpack(data).map_err(|_e| node_err(path, "couldn't parse node header"))?;

    if header.is_leaf && header.value_size != V::disk_size() {
        return Err(node_err_s(
            path,
            format!(
                "value_size mismatch: expected {}, was {}",
                V::disk_size(),
                header.value_size
            ),
        ));
    }

    let elt_size = header.value_size + 8;
    if elt_size as usize * header.max_entries as usize + NODE_HEADER_SIZE > BLOCK_SIZE {
        return Err(node_err_s(
            path,
            format!("max_entries is too large ({})", header.max_entries),
        ));
    }

    if header.nr_entries > header.max_entries {
        return Err(node_err(path, "nr_entries > max_entries"));
    }

    if !ignore_non_fatal {
        if header.max_entries % 3 != 0 {
            return Err(node_err(path, "max_entries is not divisible by 3"));
        }

        if !is_root {
            let min = header.max_entries / 3;
            if header.nr_entries < min {
                return Err(node_err_s(
                    path,
                    format!(
                        "too few entries {}, expected at least {}",
                        header.nr_entries, min
                    ),
                ));
            }
        }
    }

    let (i, keys) = convert_result(path, count(le_u64, header.nr_entries as usize)(i))?;

    let mut last = None;
    for k in &keys {
        if let Some(l) = last {
            if k <= l {
                return Err(node_err(path, "keys out of order"));
            }
        }

        last = Some(k);
    }

    let nr_free = header.max_entries - header.nr_entries;
    let (i, _padding) = convert_result(path, count(le_u64, nr_free as usize)(i))?;

    if header.is_leaf {
        let (_i, values) = convert_result(path, count(V::unpack, header.nr_entries as usize)(i))?;

        Ok(Node::Leaf {
            header,
            keys,
            values,
        })
    } else {
        let (_i, values) = convert_result(path, count(le_u64, header.nr_entries as usize)(i))?;

        Ok(Node::Internal {
            header,
            keys,
            values,
        })
    }
}

//------------------------------------------

pub trait NodeVisitor<V: Unpack> {
    // &self is deliberately non mut to allow the walker to use multiple threads.
    fn visit(
        &self,
        path: &Vec<u64>,
        kr: &KeyRange,
        header: &NodeHeader,
        keys: &[u64],
        values: &[V],
    ) -> Result<()>;
}
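// A minimal visitor sketch (not from the original source).  It just
// counts leaf entries; since visit() takes &self, mutable state has to
// live behind a Mutex (or an atomic), as ValueCollector does below.
#[allow(dead_code)]
struct EntryCounter {
    count: Mutex<u64>,
}

impl<V: Unpack> NodeVisitor<V> for EntryCounter {
    fn visit(
        &self,
        _path: &Vec<u64>,
        _kr: &KeyRange,
        _header: &NodeHeader,
        keys: &[u64],
        _values: &[V],
    ) -> Result<()> {
        *self.count.lock().unwrap() += keys.len() as u64;
        Ok(())
    }
}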
#[derive(Clone)]
pub struct BTreeWalker {
    engine: Arc<dyn IoEngine + Send + Sync>,
    sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
    fails: Arc<Mutex<BTreeMap<u64, BTreeError>>>,
    ignore_non_fatal: bool,
}

impl BTreeWalker {
    pub fn new(engine: Arc<dyn IoEngine + Send + Sync>, ignore_non_fatal: bool) -> BTreeWalker {
        let nr_blocks = engine.get_nr_blocks() as usize;
        BTreeWalker {
            engine,
            sm: Arc::new(Mutex::new(RestrictedSpaceMap::new(nr_blocks as u64))),
            fails: Arc::new(Mutex::new(BTreeMap::new())),
            ignore_non_fatal,
        }
    }

    pub fn new_with_sm(
        engine: Arc<dyn IoEngine + Send + Sync>,
        sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
        ignore_non_fatal: bool,
    ) -> Result<BTreeWalker> {
        {
            let sm = sm.lock().unwrap();
            assert_eq!(sm.get_nr_blocks().unwrap(), engine.get_nr_blocks());
        }

        Ok(BTreeWalker {
            engine,
            sm,
            fails: Arc::new(Mutex::new(BTreeMap::new())),
            ignore_non_fatal,
        })
    }

    fn failed(&self, b: u64) -> Option<BTreeError> {
        let fails = self.fails.lock().unwrap();
        fails.get(&b).cloned()
    }

    fn set_fail(&self, b: u64, err: BTreeError) {
        // FIXME: should we monitor the size of fails, and abort if too many errors?
        let mut fails = self.fails.lock().unwrap();
        fails.insert(b, err);
    }

    // Atomically increments the ref count, and returns the _old_ count.
    fn sm_inc(&self, b: u64) -> u32 {
        let mut sm = self.sm.lock().unwrap();
        let count = sm.get(b).unwrap();
        sm.inc(b, 1).unwrap();
        count
    }

    fn build_aggregate(&self, b: u64, errs: Vec<BTreeError>) -> Result<()> {
        match errs.len() {
            0 => Ok(()),
            1 => {
                let e = errs[0].clone();
                self.set_fail(b, e.clone());
                Err(e)
            }
            _ => {
                let e = aggregate_error(errs);
                self.set_fail(b, e.clone());
                Err(e)
            }
        }
    }

    fn walk_nodes<NV, V>(
        &self,
        path: &mut Vec<u64>,
        visitor: &NV,
        krs: &[KeyRange],
        bs: &[u64],
    ) -> Vec<BTreeError>
    where
        NV: NodeVisitor<V>,
        V: Unpack,
    {
        assert_eq!(krs.len(), bs.len());
        let mut errs: Vec<BTreeError> = Vec::new();

        // Filter out blocks we've already seen, keeping the key ranges
        // aligned with the blocks that remain.
        let mut blocks = Vec::with_capacity(bs.len());
        let mut filtered_krs = Vec::with_capacity(krs.len());
        for i in 0..bs.len() {
            if self.sm_inc(bs[i]) == 0 {
                // Node not yet seen
                blocks.push(bs[i]);
                filtered_krs.push(krs[i].clone());
            } else {
                // This node has already been checked ...
                match self.failed(bs[i]) {
                    None => {
                        // ... it was clean so we can ignore.
                    }
                    Some(e) => {
                        // ... there was an error
                        errs.push(e);
                    }
                }
            }
        }

        match self.engine.read_many(&blocks[0..]) {
            Err(_) => {
                // IO completely failed, error every block
                for (i, b) in blocks.iter().enumerate() {
                    let e = io_err(path).keys_context(&filtered_krs[i]);
                    errs.push(e.clone());
                    self.set_fail(*b, e);
                }
            }
            Ok(rblocks) => {
                for (i, rb) in rblocks.into_iter().enumerate() {
                    match rb {
                        Err(_) => {
                            let e = io_err(path).keys_context(&filtered_krs[i]);
                            errs.push(e.clone());
                            self.set_fail(blocks[i], e);
                        }
                        Ok(b) => {
                            if let Err(e) =
                                self.walk_node(path, visitor, &filtered_krs[i], &b, false)
                            {
                                errs.push(e);
                            }
                        }
                    }
                }
            }
        }

        errs
    }

    fn walk_node_<NV, V>(
        &self,
        path: &mut Vec<u64>,
        visitor: &NV,
        kr: &KeyRange,
        b: &Block,
        is_root: bool,
    ) -> Result<()>
    where
        NV: NodeVisitor<V>,
        V: Unpack,
    {
        use Node::*;

        let bt = checksum::metadata_block_type(b.get_data());
        if bt != checksum::BT::NODE {
            return Err(node_err_s(
                path,
                format!("checksum failed for node {}, {:?}", b.loc, bt),
            )
            .keys_context(kr));
        }

        let node = unpack_node::<V>(path, b.get_data(), self.ignore_non_fatal, is_root)?;

        match node {
            Internal { keys, values, .. } => {
                let krs = split_key_ranges(path, kr, &keys)?;
                let errs = self.walk_nodes(path, visitor, &krs, &values);
                self.build_aggregate(b.loc, errs)
            }
            Leaf {
                header,
                keys,
                values,
            } => {
                if let Err(e) = visitor.visit(path, kr, &header, &keys, &values) {
                    let e = BTreeError::Path(path.clone(), Box::new(e));
                    self.set_fail(b.loc, e.clone());
                    return Err(e);
                }
                Ok(())
            }
        }
    }

    fn walk_node<NV, V>(
        &self,
        path: &mut Vec<u64>,
        visitor: &NV,
        kr: &KeyRange,
        b: &Block,
        is_root: bool,
    ) -> Result<()>
    where
        NV: NodeVisitor<V>,
        V: Unpack,
    {
        path.push(b.loc);
        let r = self.walk_node_(path, visitor, kr, b, is_root);
        path.pop();
        r
    }

    pub fn walk<NV, V>(&self, path: &mut Vec<u64>, visitor: &NV, root: u64) -> Result<()>
    where
        NV: NodeVisitor<V>,
        V: Unpack,
    {
        if self.sm_inc(root) > 0 {
            // Already visited; fail again if it failed before.
            if let Some(e) = self.failed(root) {
                Err(e)
            } else {
                Ok(())
            }
        } else {
            let root = self.engine.read(root).map_err(|_| io_err(path))?;
            let kr = KeyRange {
                start: None,
                end: None,
            };
            self.walk_node(path, visitor, &kr, &root, true)
        }
    }
}
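// Usage sketch (not in the original file).  Assuming `engine` is any
// Arc<dyn IoEngine + Send + Sync> and `root` is a valid btree root
// block, a walk with the EntryCounter above looks like:
//
//   let walker = BTreeWalker::new(engine, false);
//   let counter = EntryCounter { count: Mutex::new(0) };
//   let mut path = Vec::new();
//   walker.walk::<_, u64>(&mut path, &counter, root)?;
//   println!("{} entries", counter.count.lock().unwrap());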
//--------------------------------

/*
fn walk_node_threaded<NV, V>(
    w: Arc<BTreeWalker>,
    pool: &ThreadPool,
    visitor: Arc<NV>,
    kr: &KeyRange,
    b: &Block,
    is_root: bool,
) -> Result<()>
where
    NV: NodeVisitor<V> + Send + Sync + 'static,
    V: Unpack,
{
    use Node::*;

    let bt = checksum::metadata_block_type(b.get_data());
    if bt != checksum::BT::NODE {
        return Err(node_err_s(format!(
            "checksum failed for node {}, {:?}",
            b.loc, bt
        )));
    }

    let node = unpack_node::<V>(&b.get_data(), w.ignore_non_fatal, is_root)?;

    match node {
        Internal { keys, values, .. } => {
            let krs = BTreeWalker::split_key_ranges(&kr, &keys)?;
            walk_nodes_threaded(w, pool, visitor, &krs, &values)?;
        }
        Leaf {
            header,
            keys,
            values,
        } => {
            visitor.visit(kr, &header, &keys, &values)?;
        }
    }

    Ok(())
}

fn walk_nodes_threaded<NV, V>(
    w: Arc<BTreeWalker>,
    pool: &ThreadPool,
    visitor: Arc<NV>,
    krs: &[KeyRange],
    bs: &[u64],
) -> Result<()>
where
    NV: NodeVisitor<V> + Send + Sync + 'static,
    V: Unpack,
{
    let mut blocks = Vec::new();
    for b in bs {
        if w.sm_inc(*b)? == 0 {
            blocks.push(*b);
        }
    }

    let rblocks = convert_io_err(w.engine.read_many(&blocks[0..]))?;

    let mut i = 0;
    for b in rblocks {
        match b {
            Err(_) => {
                // FIXME: aggregate these
                return Err(io_err());
            }
            Ok(b) => {
                let w = w.clone();
                let visitor = visitor.clone();
                let kr = krs[i].clone();
                pool.execute(move || {
                    let result = w.walk_node(visitor.as_ref(), &kr, &b, false);
                    if result.is_err() {
                        todo!();
                    }
                });
            }
        }
        i += 1;
    }

    pool.join();

    Ok(())
}

pub fn walk_threaded<NV, V>(
    w: Arc<BTreeWalker>,
    pool: &ThreadPool,
    visitor: Arc<NV>,
    root: u64,
) -> Result<()>
where
    NV: NodeVisitor<V> + Send + Sync + 'static,
    V: Unpack,
{
    if w.sm_inc(root)? > 0 {
        Ok(())
    } else {
        let root = convert_io_err(w.engine.read(root))?;
        let kr = KeyRange {
            start: None,
            end: None,
        };
        walk_node_threaded(w, pool, visitor, &kr, &root, true)
    }
}
*/

pub fn walk_threaded<NV, V>(
    path: &mut Vec<u64>,
    w: Arc<BTreeWalker>,
    _pool: &ThreadPool,
    visitor: Arc<NV>,
    root: u64,
) -> Result<()>
where
    NV: NodeVisitor<V> + Send + Sync + 'static,
    V: Unpack,
{
    w.walk(path, visitor.as_ref(), root)
}

//------------------------------------------

struct ValueCollector<V> {
    values: Mutex<BTreeMap<u64, V>>,
}

impl<V> ValueCollector<V> {
    fn new() -> ValueCollector<V> {
        ValueCollector {
            values: Mutex::new(BTreeMap::new()),
        }
    }
}

// FIXME: should we be using Copy rather than clone? (Yes)
impl<V: Unpack + Clone> NodeVisitor<V> for ValueCollector<V> {
    fn visit(
        &self,
        _path: &Vec<u64>,
        _kr: &KeyRange,
        _h: &NodeHeader,
        keys: &[u64],
        values: &[V],
    ) -> Result<()> {
        let mut vals = self.values.lock().unwrap();
        for n in 0..keys.len() {
            vals.insert(keys[n], values[n].clone());
        }

        Ok(())
    }
}

pub fn btree_to_map<V: Unpack + Clone>(
    path: &mut Vec<u64>,
    engine: Arc<dyn IoEngine + Send + Sync>,
    ignore_non_fatal: bool,
    root: u64,
) -> Result<BTreeMap<u64, V>> {
    let walker = BTreeWalker::new(engine, ignore_non_fatal);
    let visitor = ValueCollector::<V>::new();

    walker.walk(path, &visitor, root)?;
    Ok(visitor.values.into_inner().unwrap())
}

pub fn btree_to_map_with_sm<V: Unpack + Clone>(
    path: &mut Vec<u64>,
    engine: Arc<dyn IoEngine + Send + Sync>,
    sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
    ignore_non_fatal: bool,
    root: u64,
) -> Result<BTreeMap<u64, V>> {
    let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?;
    let visitor = ValueCollector::<V>::new();

    walker.walk(path, &visitor, root)?;
    Ok(visitor.values.into_inner().unwrap())
}

//------------------------------------------

struct ValuePathCollector<V> {
    values: Mutex<BTreeMap<u64, (Vec<u64>, V)>>,
}

impl<V> ValuePathCollector<V> {
    fn new() -> ValuePathCollector<V> {
        ValuePathCollector {
            values: Mutex::new(BTreeMap::new()),
        }
    }
}

impl<V: Unpack + Clone> NodeVisitor<V> for ValuePathCollector<V> {
    fn visit(
        &self,
        path: &Vec<u64>,
        _kr: &KeyRange,
        _h: &NodeHeader,
        keys: &[u64],
        values: &[V],
    ) -> Result<()> {
        let mut vals = self.values.lock().unwrap();
        for n in 0..keys.len() {
            vals.insert(keys[n], (path.clone(), values[n].clone()));
        }

        Ok(())
    }
}

pub fn btree_to_map_with_path<V: Unpack + Clone>(
    path: &mut Vec<u64>,
    engine: Arc<dyn IoEngine + Send + Sync>,
    sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
    ignore_non_fatal: bool,
    root: u64,
) -> Result<BTreeMap<u64, (Vec<u64>, V)>> {
    let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?;
    let visitor = ValuePathCollector::<V>::new();

    walker.walk(path, &visitor, root)?;
    Ok(visitor.values.into_inner().unwrap())
}

//------------------------------------------
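// Usage sketch (not in the original file): collecting every key/value
// pair of a btree of u64 values into an in-memory map.  As above,
// `engine` and `root` are assumed to be supplied by the caller.
//
//   let mut path = Vec::new();
//   let map: BTreeMap<u64, u64> = btree_to_map(&mut path, engine, false, root)?;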