From 04e0eb3a66550ff04d24edb80a0574a0651144d1 Mon Sep 17 00:00:00 2001 From: Joe Thornber Date: Wed, 9 Dec 2020 13:22:32 +0000 Subject: [PATCH] [thin_restore (rust)] rewrite the btree_builder Now copes with adding shared leaves. --- src/pdata/btree_builder.rs | 584 ++++++++++++++++++++++++------------- src/pdata/space_map.rs | 17 +- src/thin/restore.rs | 65 ++++- src/write_batcher.rs | 5 +- 4 files changed, 465 insertions(+), 206 deletions(-) diff --git a/src/pdata/btree_builder.rs b/src/pdata/btree_builder.rs index 05567cd..4403cc7 100644 --- a/src/pdata/btree_builder.rs +++ b/src/pdata/btree_builder.rs @@ -13,6 +13,56 @@ use crate::write_batcher::*; //------------------------------------------ +/// A little ref counter abstraction. Used to manage counts for btree +/// values (eg, the block/time in a thin mapping tree). +pub trait RefCounter { + fn get(&self, v: &Value) -> Result; + fn inc(&mut self, v: &Value) -> Result<()>; + fn dec(&mut self, v: &Value) -> Result<()>; +} + +/// Wraps a space map up to become a RefCounter. +struct SMRefCounter { + sm: Arc>, +} + +impl RefCounter for SMRefCounter { + fn get(&self, v: &u64) -> Result { + self.sm.lock().unwrap().get(*v) + } + + fn inc(&mut self, v: &u64) -> Result<()> { + self.sm.lock().unwrap().inc(*v, 1) + } + + fn dec(&mut self, v: &u64) -> Result<()> { + self.sm.lock().unwrap().dec(*v)?; + Ok(()) + } +} + +//------------------------------------------ + +// Building a btree for a given set of values is straight forward. +// But often we want to merge shared subtrees into the btree we're +// building, which _is_ complicated. Requiring rebalancing of nodes, +// and careful copy-on-write operations so we don't disturb the shared +// subtree. +// +// To avoid these problems this code never produces shared internal nodes. +// With the large fan out of btrees this isn't really a problem; we'll +// allocate more nodes than optimum, but not many compared to the number +// of leaves. Also we can pack the leaves much better than the kernel +// does due to out of order insertions. +// +// There are thus two stages to building a btree. +// +// i) Produce a list of populated leaves. These leaves may well be shared. +// ii) Build the upper levels of the btree above the leaves. + +//------------------------------------------ + +/// Pack the given node ready to write to disk. fn pack_node(node: &Node, w: &mut W) -> Result<()> { match node { Node::Internal { @@ -58,100 +108,13 @@ fn pack_node(node: &Node, w: &mut W) -> R Ok(()) } -//------------------------------------------ - -fn calc_max_entries() -> usize { - let elt_size = 8 + V::disk_size() as usize; - ((BLOCK_SIZE - NodeHeader::disk_size() as usize) / elt_size) as usize +pub struct WriteResult { + first_key: u64, + loc: u64, } -//------------------------------------------ - -struct Entries { - pub max_entries: usize, - entries: VecDeque<(u64, V)>, -} - -enum Action { - EmitNode(Vec, Vec), // keys, values -} - -use Action::*; - -impl Entries { - pub fn new(max_entries: usize) -> Entries { - Entries { - max_entries, - entries: VecDeque::new(), - } - } - - pub fn add_entry(&mut self, k: u64, v: V) -> Vec> { - let mut result = Vec::new(); - - if self.full() { - let (keys, values) = self.pop(self.max_entries); - result.push(EmitNode(keys, values)); - } - - self.entries.push_back((k, v)); - result - } - - fn complete_(&mut self, result: &mut Vec>) { - let n = self.entries.len(); - - if n >= self.max_entries { - let n1 = n / 2; - let n2 = n - n1; - let (keys1, values1) = self.pop(n1); - let (keys2, values2) = self.pop(n2); - - result.push(EmitNode(keys1, values1)); - result.push(EmitNode(keys2, values2)); - } else if n > 0 { - let (keys, values) = self.pop(n); - result.push(EmitNode(keys, values)); - } - } - - pub fn complete(&mut self) -> Vec> { - let mut result = Vec::new(); - self.complete_(&mut result); - result - } - - fn full(&self) -> bool { - self.entries.len() >= 2 * self.max_entries - } - - fn pop(&mut self, count: usize) -> (Vec, Vec) { - let mut keys = Vec::new(); - let mut values = Vec::new(); - - for _i in 0..count { - let (k, v) = self.entries.pop_front().unwrap(); - keys.push(k); - values.push(v); - } - - (keys, values) - } -} - -//------------------------------------------ - -#[allow(dead_code)] -pub struct NodeSummary { - block: u64, - nr_entries: usize, - key_low: u64, - key_high: u64, // inclusive -} - -//------------------------------------------ - -fn write_node_(w: &mut WriteBatcher, mut node: Node) -> Result<(u64, u64)> { +/// Write a node to a free metadata block. +fn write_node_(w: &mut WriteBatcher, mut node: Node) -> Result { let keys = node.get_keys(); let first_key = keys.first().unwrap_or(&0u64).clone(); @@ -163,155 +126,380 @@ fn write_node_(w: &mut WriteBatcher, mut node: Node) -> Res pack_node(&node, &mut cursor)?; w.write(b, checksum::BT::NODE)?; - Ok((first_key, loc)) + Ok(WriteResult { first_key, loc }) } -fn write_leaf( - w: &mut WriteBatcher, - keys: Vec, - values: Vec, -) -> Result<(u64, u64)> { - let header = NodeHeader { - block: 0, - is_leaf: true, - nr_entries: keys.len() as u32, - max_entries: calc_max_entries::() as u32, - value_size: V::disk_size(), - }; - - let node = Node::Leaf { - header, - keys, - values, - }; - - write_node_(w, node) +/// A node writer takes a Vec of values and packs them into +/// a btree node. It's up to the specific implementation to +/// decide if it produces internal or leaf nodes. +pub trait NodeIO { + fn write(&self, w: &mut WriteBatcher, keys: Vec, values: Vec) -> Result; + fn read( + &self, + engine: &Arc, + block: u64, + ) -> Result<(Vec, Vec)>; } -fn write_internal(w: &mut WriteBatcher, keys: Vec, values: Vec) -> Result<(u64, u64)> { - let header = NodeHeader { - block: 0, - is_leaf: false, - nr_entries: keys.len() as u32, - max_entries: calc_max_entries::() as u32, - value_size: u64::disk_size(), - }; +struct LeafIO {} - let node: Node = Node::Internal { - header, - keys, - values, - }; +impl NodeIO for LeafIO { + fn write(&self, w: &mut WriteBatcher, keys: Vec, values: Vec) -> Result { + let header = NodeHeader { + block: 0, + is_leaf: true, + nr_entries: keys.len() as u32, + max_entries: calc_max_entries::() as u32, + value_size: V::disk_size(), + }; - write_node_(w, node) + let node = Node::Leaf { + header, + keys, + values, + }; + + write_node_(w, node) + } + + fn read( + &self, + engine: &Arc, + block: u64, + ) -> Result<(Vec, Vec)> { + let b = engine.read(block)?; + let path = Vec::new(); + match unpack_node::(&path, b.get_data(), true, true)? { + Node::Internal { .. } => { + panic!("unexpected internal node"); + } + Node::Leaf { keys, values, .. } => Ok((keys, values)), + } + } } -pub struct Builder { - w: WriteBatcher, - entries: Entries, +struct InternalIO {} - max_internal_entries: usize, - internal_entries: Vec>, +impl NodeIO for InternalIO { + fn write(&self, w: &mut WriteBatcher, keys: Vec, values: Vec) -> Result { + let header = NodeHeader { + block: 0, + is_leaf: false, + nr_entries: keys.len() as u32, + max_entries: calc_max_entries::() as u32, + value_size: u64::disk_size(), + }; - root: u64, + let node: Node = Node::Internal { + header, + keys, + values, + }; + + write_node_(w, node) + } + + fn read( + &self, + engine: &Arc, + block: u64, + ) -> Result<(Vec, Vec)> { + let b = engine.read(block)?; + let path = Vec::new(); + match unpack_node::(&path, b.get_data(), true, true)? { + Node::Internal { keys, values, .. } => Ok((keys, values)), + Node::Leaf { .. } => { + panic!("unexpected leaf node"); + } + } + } } -impl Builder { +//------------------------------------------ + +/// What is the maximum number of entries of a given size we can fit in +/// a btree node? +fn calc_max_entries() -> usize { + let elt_size = 8 + V::disk_size() as usize; + ((BLOCK_SIZE - NodeHeader::disk_size() as usize) / elt_size) as usize +} + +//------------------------------------------ + +/// This takes a sequence of values or nodes, and builds a vector of leaf nodes. +/// Care is taken to make sure that all nodes are at least half full unless there's +/// only a single node. +pub struct NodeBuilder { + batcher: WriteBatcher, + nio: Box>, + value_rc: Box>, + max_entries_per_node: usize, + values: VecDeque<(u64, V)>, + nodes: Vec, +} + +/// When the builder is including pre-built nodes it has to decide whether +/// to use the node as given, or read it and import the values directly +/// for balancing reasons. This struct is used to stop us re-reading +/// the NodeHeaders of nodes that are shared multiple times. +#[derive(Clone)] +pub struct NodeSummary { + block: u64, + key: u64, + nr_entries: usize, + + /// This node was passed in pre-built. Important for deciding if + /// we need to adjust the ref counts if we unpack. + shared: bool, +} + +impl NodeBuilder { + /// Create a new NodeBuilder pub fn new( - engine: Arc, - sm: Arc>, - ) -> Builder { - let max_entries = calc_max_entries::(); - let max_internal_entries = calc_max_entries::(); - - Builder { - w: WriteBatcher::new(engine, sm, 256), - entries: Entries::new(max_entries), - max_internal_entries, - internal_entries: Vec::new(), - root: 0, + batcher: WriteBatcher, + nio: Box>, + value_rc: Box>, + ) -> Self { + NodeBuilder { + batcher, + nio, + value_rc, + max_entries_per_node: calc_max_entries::(), + values: VecDeque::new(), + nodes: Vec::new(), } } - pub fn add_entry(&mut self, k: u64, v: V) -> Result<()> { - let actions = self.entries.add_entry(k, v); - for a in actions { - self.perform_action(a)?; + /// Push a single value. This may emit a new node, hence the Result + /// return type. The value's ref count will be incremented. + pub fn push_value(&mut self, key: u64, val: V) -> Result<()> { + // Have we got enough values to emit a node? We try and keep + // at least max_entries_per_node entries unflushed so we + // can ensure the final node is balanced properly. + if self.values.len() == self.max_entries_per_node * 2 { + self.emit_node()?; } + self.value_rc.inc(&val)?; + self.values.push_back((key, val)); Ok(()) } - pub fn add_leaf_node(&mut self, leaf: &NodeSummary) -> Result<()> { - match leaf.nr_entries { - n if n == 0 => { - // Do nothing - }, - n if n < (self.entries.max_entries / 2) => { - // FIXME: what if we've already queued a handful of entries for a node? - // Add the entries individually - todo!(); - }, - _n => { - let actions = self.entries.complete(); - for a in actions { - self.perform_action(a)?; - } - self.add_internal_entry(0, leaf.key_low, leaf.block)?; + /// Push a number of prebuilt, shared nodes. The builder may decide to not + /// use a shared node, instead reading the values and packing them + /// directly. This may do IO to emit nodes, so returns a Result. + /// Any shared nodes that are used have their block incremented in + /// the space map. Will only increment the ref count for values + /// contained in the nodes if it unpacks them. + pub fn push_nodes(&mut self, nodes: &Vec) -> Result<()> { + assert!(nodes.len() > 0); + + // As a sanity check we make sure that all the shared nodes contain the + // minimum nr of entries. + let half_full = self.max_entries_per_node / 2; + for n in nodes { + if n.nr_entries < half_full { + panic!("under populated node"); + } + } + + // Decide if we're going to use the pre-built nodes. + if self.values.len() < half_full { + // To avoid writing an under populated node we have to grab some + // values from the first of the shared nodes. + let (keys, values) = self.read_node(nodes.get(0).unwrap().block)?; + + for i in 0..keys.len() { + self.value_rc.inc(&values[i])?; + self.values.push_back((keys[i], values[i].clone())); + } + + // Flush all the values. + self.emit_all()?; + + // Add the remaining nodes. + for i in 1..nodes.len() { + let n = nodes.get(i).unwrap(); + self.batcher.sm.lock().unwrap().inc(n.block, 1)?; + self.nodes.push(n.clone()); + } + } else { + // Flush all the values. + self.emit_all()?; + + // add the nodes + for n in nodes { + self.batcher.sm.lock().unwrap().inc(n.block, 1)?; + self.nodes.push(n.clone()); } } Ok(()) } - pub fn complete(mut self) -> Result { - let actions = self.entries.complete(); - for a in actions { - self.perform_action(a)?; + /// Signal that no more values or nodes will be pushed. Returns a + /// vector of the built nodes. Consumes the builder. + pub fn complete(mut self) -> Result> { + let half_full = self.max_entries_per_node / 2; + + if (self.nodes.len() > 0) && (self.values.len() < half_full) { + // We don't have enough values to emit a node. So we're going to + // have to rebalance with the previous node. + self.unshift_node()?; } - self.w.flush()?; - Ok(self.root) + + self.emit_all()?; + Ok(self.nodes) } - //-------------------- + //------------------------- - fn add_internal_entry(&mut self, level: usize, k: u64, v: u64) -> Result<()> { - if self.internal_entries.len() == level { - self.internal_entries - .push(Entries::new(self.max_internal_entries)); + // We're only interested in the keys and values from the node, and + // not whether it's a leaf or internal node. + fn read_node(&self, block: u64) -> Result<(Vec, Vec)> { + self.nio.read(&self.batcher.engine, block) + } + + /// Writes a node with the first 'nr_entries' values. + fn emit_values(&mut self, nr_entries: usize) -> Result<()> { + assert!(self.values.len() <= nr_entries); + + // Write the node + let mut keys = Vec::new(); + let mut values = Vec::new(); + + for _i in 0..nr_entries { + let (k, v) = self.values.pop_front().unwrap(); + keys.push(k); + values.push(v); } - let actions = self.internal_entries[level].add_entry(k, v); - - for a in actions { - self.perform_internal_action(level, a)?; - } + let wresult = self.nio.write(&mut self.batcher, keys, values)?; + // Push a summary to the 'nodes' vector. + self.nodes.push(NodeSummary { + block: wresult.loc, + key: wresult.first_key, + nr_entries, + shared: false, + }); Ok(()) } - fn perform_internal_action(&mut self, level: usize, action: Action) -> Result<()> { - match action { - EmitNode(keys, values) => { - let (k, loc) = write_internal(&mut self.w, keys, values)?; - self.add_internal_entry(level + 1, k, loc)?; - self.root = loc; - }, - } - - Ok(()) + /// Writes a full node. + fn emit_node(&mut self) -> Result<()> { + self.emit_values(self.max_entries_per_node) } - fn perform_action(&mut self, action: Action) -> Result<()> { - match action { - EmitNode(keys, values) => { - let (k, loc) = write_leaf(&mut self.w, keys, values)?; - self.add_internal_entry(0, k, loc)?; - }, + /// Emits all remaining values. Panics if there are more than 2 * + /// max_entries_per_node values. + fn emit_all(&mut self) -> Result<()> { + match self.values.len() { + 0 => { + // There's nothing to emit + Ok(()) + } + n if n <= self.max_entries_per_node => { + // Emit a single node. + self.emit_values(n) + } + n if n <= self.max_entries_per_node * 2 => { + // Emit two nodes. + let n1 = n / 2; + let n2 = n - n1; + self.emit_values(n1)?; + self.emit_values(n2) + } + _ => { + panic!("self.values shouldn't have more than 2 * max_entries_per_node entries"); + } } + } + + /// Pops the last node, and prepends it's values to 'self.values'. Used + /// to rebalance when we have insufficient values for a final node. The + /// node is decremented in the space map. + fn unshift_node(&mut self) -> Result<()> { + let ls = self.nodes.pop().unwrap(); + let (keys, values) = self.read_node(ls.block)?; + self.batcher.sm.lock().unwrap().dec(ls.block)?; + + let mut vals = VecDeque::new(); + + for i in 0..keys.len() { + // We only need to inc the values if the node was pre built. + if ls.shared { + self.value_rc.inc(&values[i])?; + } + vals.push_back((keys[i], values[i].clone())); + } + + vals.append(&mut self.values); + std::mem::swap(&mut self.values, &mut vals); Ok(()) } } //------------------------------------------ + +pub struct Builder { + engine: Arc, + sm: Arc>, + leaf_builder: NodeBuilder, +} + +const BATCH_SIZE: usize = 128; + +impl Builder { + pub fn new( + engine: Arc, + sm: Arc>, + value_rc: Box>, + ) -> Builder { + Builder { + engine: engine.clone(), + sm: sm.clone(), + leaf_builder: NodeBuilder::new( + WriteBatcher::new(engine.clone(), sm.clone(), BATCH_SIZE), + Box::new(LeafIO {}), + value_rc, + ), + } + } + + pub fn push_value(&mut self, k: u64, v: V) -> Result<()> { + self.leaf_builder.push_value(k, v) + } + + pub fn push_leaves(&mut self, leaves: &Vec) -> Result<()> { + self.leaf_builder.push_nodes(leaves) + } + + pub fn complete(self) -> Result { + let mut nodes = self.leaf_builder.complete()?; + + // Now we iterate, adding layers of internal nodes until we end + // up with a single root. + while nodes.len() > 1 { + let mut builder = NodeBuilder::new( + WriteBatcher::new(self.engine.clone(), self.sm.clone(), BATCH_SIZE), + Box::new(InternalIO {}), + Box::new(SMRefCounter { + sm: self.sm.clone(), + }), + ); + + for n in nodes { + builder.push_value(n.key, n.block)?; + } + + nodes = builder.complete()?; + } + + assert!(nodes.len() == 1); + Ok(nodes[0].block) + } +} + +//------------------------------------------ diff --git a/src/pdata/space_map.rs b/src/pdata/space_map.rs index d060c7b..deba551 100644 --- a/src/pdata/space_map.rs +++ b/src/pdata/space_map.rs @@ -2,8 +2,8 @@ use anyhow::{anyhow, Result}; use byteorder::{LittleEndian, WriteBytesExt}; use fixedbitset::FixedBitSet; use nom::{multi::count, number::complete::*, IResult}; -use std::sync::{Arc, Mutex}; use std::boxed::Box; +use std::sync::{Arc, Mutex}; use crate::io_engine::*; use crate::pdata::unpack::{Pack, Unpack}; @@ -226,13 +226,22 @@ pub trait SpaceMap { fn get_nr_allocated(&self) -> Result; fn get(&self, b: u64) -> Result; - // Returns the old ref count + /// Returns the old ref count fn set(&mut self, b: u64, v: u32) -> Result; fn inc(&mut self, begin: u64, len: u64) -> Result<()>; - // Finds a block with a zero reference count. Increments the - // count. + /// Returns true if the block is now free + fn dec(&mut self, b: u64) -> Result { + let old = self.get(b)?; + assert!(old > 0); + self.set(b, old - 1)?; + + Ok(old == 1) + } + + /// Finds a block with a zero reference count. Increments the + /// count. fn alloc(&mut self) -> Result>; } diff --git a/src/thin/restore.rs b/src/thin/restore.rs index 905393f..5135fab 100644 --- a/src/thin/restore.rs +++ b/src/thin/restore.rs @@ -1,9 +1,62 @@ use anyhow::Result; + +use std::collections::{BTreeMap, BTreeSet}; +use std::fs::OpenOptions; use std::path::Path; use std::sync::Arc; use crate::report::*; +use crate::thin::block_time::*; +use crate::thin::device_detail::*; +use crate::thin::superblock::*; +use crate::thin::xml::{self, *}; + +//------------------------------------------ + +#[derive(Default)] +struct Pass1 { + // +} + +impl MetadataVisitor for Pass1 { + fn superblock_b(&mut self, sb: &xml::Superblock) -> Result { + todo!(); + } + + fn superblock_e(&mut self) -> Result { + todo!(); + } + + fn def_shared_b(&mut self, name: &str) -> Result { + todo!(); + } + + fn def_shared_e(&mut self) -> Result { + todo!(); + } + + fn device_b(&mut self, d: &Device) -> Result { + todo!(); + } + + fn device_e(&mut self) -> Result { + todo!(); + } + + fn map(&mut self, m: &Map) -> Result { + todo!(); + } + + fn ref_shared(&mut self, name: &str) -> Result { + todo!(); + } + + fn eof(&mut self) -> Result { + todo!(); + } +} + //------------------------------------------ pub struct ThinRestoreOptions<'a> { @@ -15,8 +68,16 @@ pub struct ThinRestoreOptions<'a> { //------------------------------------------ -pub fn restore(_opts: ThinRestoreOptions) -> Result<()> { - todo!(); +pub fn restore(opts: ThinRestoreOptions) -> Result<()> { + let input = OpenOptions::new() + .read(true) + .write(false) + .open(opts.input)?; + + let mut pass = Pass1::default(); + xml::read(input, &mut pass)?; + + Ok(()) } //------------------------------------------ diff --git a/src/write_batcher.rs b/src/write_batcher.rs index 029a721..f6ef877 100644 --- a/src/write_batcher.rs +++ b/src/write_batcher.rs @@ -7,9 +7,10 @@ use crate::pdata::space_map::*; //------------------------------------------ +#[derive(Clone)] pub struct WriteBatcher { - engine: Arc, - sm: Arc>, + pub engine: Arc, + pub sm: Arc>, batch_size: usize, queue: Vec,