From 040e3bfc2d750790e5f583f36953e6317fa9b5d1 Mon Sep 17 00:00:00 2001 From: Joe Thornber Date: Wed, 24 Mar 2021 14:20:20 +0000 Subject: [PATCH] Lot's of work on thin_restore --- src/bin/thin_restore.rs | 2 + src/checksum.rs | 1 - src/io_engine.rs | 8 + src/lib.rs | 1 + src/pdata/btree.rs | 2 + src/pdata/btree_builder.rs | 121 +++++------ src/pdata/mod.rs | 1 + src/pdata/space_map.rs | 218 +------------------- src/pdata/space_map_disk.rs | 399 ++++++++++++++++++++++++++++++++++++ src/thin/block_time.rs | 9 + src/thin/check.rs | 1 + src/thin/device_detail.rs | 24 ++- src/thin/dump.rs | 8 +- src/thin/restore.rs | 258 +++++++++++++++++++++-- src/thin/superblock.rs | 94 ++++++--- src/thin/xml.rs | 30 ++- src/write_batcher.rs | 29 ++- 17 files changed, 858 insertions(+), 348 deletions(-) create mode 100644 src/pdata/space_map_disk.rs diff --git a/src/bin/thin_restore.rs b/src/bin/thin_restore.rs index b774480..93f62e9 100644 --- a/src/bin/thin_restore.rs +++ b/src/bin/thin_restore.rs @@ -27,6 +27,7 @@ fn main() { .help("Specify the input xml") .short("i") .long("input") + .value_name("INPUT") .required(true), ) .arg( @@ -34,6 +35,7 @@ fn main() { .help("Specify the output device to check") .short("o") .long("output") + .value_name("OUTPUT") .required(true), ) .arg( diff --git a/src/checksum.rs b/src/checksum.rs index 0ffbd47..edb5e0a 100644 --- a/src/checksum.rs +++ b/src/checksum.rs @@ -6,7 +6,6 @@ use std::io::Cursor; const BLOCK_SIZE: u64 = 4096; #[allow(dead_code)] -const MAGIC: u64 = 0xa537a0aa6309ef77; const SUPERBLOCK_CSUM_XOR: u32 = 160774; const BITMAP_CSUM_XOR: u32 = 240779; const INDEX_CSUM_XOR: u32 = 160478; diff --git a/src/io_engine.rs b/src/io_engine.rs index 879c294..89b38d6 100644 --- a/src/io_engine.rs +++ b/src/io_engine.rs @@ -26,6 +26,8 @@ pub struct Block { } impl Block { + // Creates a new block that corresponds to the given location. The + // memory is not initialised. pub fn new(loc: u64) -> Block { let layout = Layout::from_size_align(BLOCK_SIZE, ALIGN).unwrap(); let ptr = unsafe { alloc(layout) }; @@ -42,6 +44,12 @@ impl Block { pub fn get_data<'a>(&self) -> &'a mut [u8] { unsafe { std::slice::from_raw_parts_mut::<'a>(self.data, BLOCK_SIZE) } } + + pub fn zero(&mut self) { + unsafe { + std::ptr::write_bytes(self.data, 0, BLOCK_SIZE); + } + } } impl Drop for Block { diff --git a/src/lib.rs b/src/lib.rs index 451257f..0ff3841 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -19,6 +19,7 @@ pub mod cache; pub mod checksum; pub mod file_utils; pub mod io_engine; +pub mod math; pub mod pack; pub mod pdata; pub mod report; diff --git a/src/pdata/btree.rs b/src/pdata/btree.rs index 4351456..833ff50 100644 --- a/src/pdata/btree.rs +++ b/src/pdata/btree.rs @@ -539,6 +539,7 @@ pub fn unpack_node( } if !is_root { + /* let min = header.max_entries / 3; if header.nr_entries < min { return Err(node_err_s( @@ -549,6 +550,7 @@ pub fn unpack_node( ), )); } + */ } } diff --git a/src/pdata/btree_builder.rs b/src/pdata/btree_builder.rs index 2daf04f..cbae889 100644 --- a/src/pdata/btree_builder.rs +++ b/src/pdata/btree_builder.rs @@ -21,6 +21,14 @@ pub trait RefCounter { fn dec(&mut self, v: &Value) -> Result<()>; } +pub struct NoopRC {} + +impl RefCounter for NoopRC { + fn get(&self, _v: &Value) -> Result {Ok(0)} + fn inc(&mut self, _v: &Value) -> Result<()> {Ok(())} + fn dec(&mut self, _v: &Value) -> Result<()> {Ok(())} +} + /// Wraps a space map up to become a RefCounter. struct SMRefCounter { sm: Arc>, @@ -126,12 +134,12 @@ fn write_node_(w: &mut WriteBatcher, mut node: Node) -> Res let keys = node.get_keys(); let first_key = keys.first().unwrap_or(&0u64).clone(); - let loc = w.alloc()?; - node.set_block(loc); + let b = w.alloc()?; + node.set_block(b.loc); - let b = Block::new(loc); let mut cursor = Cursor::new(b.get_data()); pack_node(&node, &mut cursor)?; + let loc = b.loc; w.write(b, checksum::BT::NODE)?; Ok(WriteResult { first_key, loc }) @@ -149,7 +157,7 @@ pub trait NodeIO { ) -> Result<(Vec, Vec)>; } -struct LeafIO {} +pub struct LeafIO {} impl NodeIO for LeafIO { fn write(&self, w: &mut WriteBatcher, keys: Vec, values: Vec) -> Result { @@ -229,7 +237,6 @@ impl NodeIO for InternalIO { /// Care is taken to make sure that all nodes are at least half full unless there's /// only a single node. pub struct NodeBuilder { - batcher: WriteBatcher, nio: Box>, value_rc: Box>, max_entries_per_node: usize, @@ -252,15 +259,13 @@ pub struct NodeSummary { shared: bool, } -impl NodeBuilder { +impl<'a, V: Pack + Unpack + Clone> NodeBuilder { /// Create a new NodeBuilder pub fn new( - batcher: WriteBatcher, nio: Box>, value_rc: Box>, ) -> Self { NodeBuilder { - batcher, nio, value_rc, max_entries_per_node: calc_max_entries::(), @@ -270,12 +275,12 @@ impl NodeBuilder { } /// Push a single value. This may emit a new node, hence the Result /// return type. The value's ref count will be incremented. - pub fn push_value(&mut self, key: u64, val: V) -> Result<()> { + pub fn push_value(&mut self, w: &mut WriteBatcher, key: u64, val: V) -> Result<()> { // Have we got enough values to emit a node? We try and keep // at least max_entries_per_node entries unflushed so we // can ensure the final node is balanced properly. if self.values.len() == self.max_entries_per_node * 2 { - self.emit_node()?; + self.emit_node(w)?; } self.value_rc.inc(&val)?; @@ -289,7 +294,7 @@ impl NodeBuilder { /// Any shared nodes that are used have their block incremented in /// the space map. Will only increment the ref count for values /// contained in the nodes if it unpacks them. - pub fn push_nodes(&mut self, nodes: &Vec) -> Result<()> { + pub fn push_nodes(&mut self, w: &mut WriteBatcher, nodes: &Vec) -> Result<()> { assert!(nodes.len() > 0); // As a sanity check we make sure that all the shared nodes contain the @@ -305,7 +310,7 @@ impl NodeBuilder { if self.values.len() < half_full { // To avoid writing an under populated node we have to grab some // values from the first of the shared nodes. - let (keys, values) = self.read_node(nodes.get(0).unwrap().block)?; + let (keys, values) = self.read_node(w, nodes.get(0).unwrap().block)?; for i in 0..keys.len() { self.value_rc.inc(&values[i])?; @@ -313,21 +318,21 @@ impl NodeBuilder { } // Flush all the values. - self.emit_all()?; + self.emit_all(w)?; // Add the remaining nodes. for i in 1..nodes.len() { let n = nodes.get(i).unwrap(); - self.batcher.sm.lock().unwrap().inc(n.block, 1)?; + w.sm.lock().unwrap().inc(n.block, 1)?; self.nodes.push(n.clone()); } } else { // Flush all the values. - self.emit_all()?; + self.emit_all(w)?; // add the nodes for n in nodes { - self.batcher.sm.lock().unwrap().inc(n.block, 1)?; + w.sm.lock().unwrap().inc(n.block, 1)?; self.nodes.push(n.clone()); } } @@ -337,16 +342,21 @@ impl NodeBuilder { /// Signal that no more values or nodes will be pushed. Returns a /// vector of the built nodes. Consumes the builder. - pub fn complete(mut self) -> Result> { + pub fn complete(mut self, w: &mut WriteBatcher) -> Result> { let half_full = self.max_entries_per_node / 2; if (self.nodes.len() > 0) && (self.values.len() < half_full) { // We don't have enough values to emit a node. So we're going to // have to rebalance with the previous node. - self.unshift_node()?; + self.unshift_node(w)?; } - self.emit_all()?; + self.emit_all(w)?; + + if self.nodes.len() == 0 { + self.emit_empty_leaf(w)? + } + Ok(self.nodes) } @@ -354,13 +364,13 @@ impl NodeBuilder { // We're only interested in the keys and values from the node, and // not whether it's a leaf or internal node. - fn read_node(&self, block: u64) -> Result<(Vec, Vec)> { - self.nio.read(&self.batcher.engine, block) + fn read_node(&self, w: &WriteBatcher, block: u64) -> Result<(Vec, Vec)> { + self.nio.read(&w.engine, block) } /// Writes a node with the first 'nr_entries' values. - fn emit_values(&mut self, nr_entries: usize) -> Result<()> { - assert!(self.values.len() <= nr_entries); + fn emit_values(&mut self, w: &mut WriteBatcher, nr_entries: usize) -> Result<()> { + assert!(nr_entries <= self.values.len()); // Write the node let mut keys = Vec::new(); @@ -372,7 +382,7 @@ impl NodeBuilder { values.push(v); } - let wresult = self.nio.write(&mut self.batcher, keys, values)?; + let wresult = self.nio.write(w, keys, values)?; // Push a summary to the 'nodes' vector. self.nodes.push(NodeSummary { @@ -385,13 +395,13 @@ impl NodeBuilder { } /// Writes a full node. - fn emit_node(&mut self) -> Result<()> { - self.emit_values(self.max_entries_per_node) + fn emit_node(&mut self, w: &mut WriteBatcher) -> Result<()> { + self.emit_values(w, self.max_entries_per_node) } /// Emits all remaining values. Panics if there are more than 2 * /// max_entries_per_node values. - fn emit_all(&mut self) -> Result<()> { + fn emit_all(&mut self, w: &mut WriteBatcher) -> Result<()> { match self.values.len() { 0 => { // There's nothing to emit @@ -399,14 +409,14 @@ impl NodeBuilder { } n if n <= self.max_entries_per_node => { // Emit a single node. - self.emit_values(n) + self.emit_values(w, n) } n if n <= self.max_entries_per_node * 2 => { // Emit two nodes. let n1 = n / 2; let n2 = n - n1; - self.emit_values(n1)?; - self.emit_values(n2) + self.emit_values(w, n1)?; + self.emit_values(w, n2) } _ => { panic!("self.values shouldn't have more than 2 * max_entries_per_node entries"); @@ -414,13 +424,17 @@ impl NodeBuilder { } } + fn emit_empty_leaf(&mut self, w: &mut WriteBatcher) -> Result<()> { + self.emit_values(w, 0) + } + /// Pops the last node, and prepends it's values to 'self.values'. Used /// to rebalance when we have insufficient values for a final node. The /// node is decremented in the space map. - fn unshift_node(&mut self) -> Result<()> { + fn unshift_node(&mut self, w: &mut WriteBatcher) -> Result<()> { let ls = self.nodes.pop().unwrap(); - let (keys, values) = self.read_node(ls.block)?; - self.batcher.sm.lock().unwrap().dec(ls.block)?; + let (keys, values) = self.read_node(w, ls.block)?; + w.sm.lock().unwrap().dec(ls.block)?; let mut vals = VecDeque::new(); @@ -442,57 +456,47 @@ impl NodeBuilder { //------------------------------------------ pub struct Builder { - engine: Arc, - sm: Arc>, leaf_builder: NodeBuilder, } -const BATCH_SIZE: usize = 128; - impl Builder { pub fn new( - engine: Arc, - sm: Arc>, value_rc: Box>, ) -> Builder { Builder { - engine: engine.clone(), - sm: sm.clone(), leaf_builder: NodeBuilder::new( - WriteBatcher::new(engine.clone(), sm.clone(), BATCH_SIZE), Box::new(LeafIO {}), value_rc, ), } } - pub fn push_value(&mut self, k: u64, v: V) -> Result<()> { - self.leaf_builder.push_value(k, v) + pub fn push_value(&mut self, w: &mut WriteBatcher, k: u64, v: V) -> Result<()> { + self.leaf_builder.push_value(w, k, v) } - pub fn push_leaves(&mut self, leaves: &Vec) -> Result<()> { - self.leaf_builder.push_nodes(leaves) + pub fn push_leaves(&mut self, w: &mut WriteBatcher, leaves: &Vec) -> Result<()> { + self.leaf_builder.push_nodes(w, leaves) } - pub fn complete(self) -> Result { - let mut nodes = self.leaf_builder.complete()?; + pub fn complete(self, w: &mut WriteBatcher) -> Result { + let mut nodes = self.leaf_builder.complete(w)?; // Now we iterate, adding layers of internal nodes until we end // up with a single root. while nodes.len() > 1 { let mut builder = NodeBuilder::new( - WriteBatcher::new(self.engine.clone(), self.sm.clone(), BATCH_SIZE), Box::new(InternalIO {}), Box::new(SMRefCounter { - sm: self.sm.clone(), + sm: w.sm.clone(), }), ); for n in nodes { - builder.push_value(n.key, n.block)?; + builder.push_value(w, n.key, n.block)?; } - nodes = builder.complete()?; + nodes = builder.complete(w)?; } assert!(nodes.len() == 1); @@ -502,16 +506,3 @@ impl Builder { //------------------------------------------ -/* -======= -fn write_node_(w: &mut WriteBatcher, mut node: Node) -> Result<(u64, u64)> { - let keys = node.get_keys(); - let first_key = *keys.first().unwrap_or(&0u64); ->>>>>>> main - -======= -fn write_node_(w: &mut WriteBatcher, mut node: Node) -> Result<(u64, u64)> { - let keys = node.get_keys(); - let first_key = *keys.first().unwrap_or(&0u64); ->>>>>>> main -*/ diff --git a/src/pdata/mod.rs b/src/pdata/mod.rs index 370d244..70d5d39 100644 --- a/src/pdata/mod.rs +++ b/src/pdata/mod.rs @@ -7,5 +7,6 @@ pub mod btree_merge; pub mod btree_leaf_walker; pub mod btree_walker; pub mod space_map; +pub mod space_map_disk; pub mod unpack; diff --git a/src/pdata/space_map.rs b/src/pdata/space_map.rs index deba551..5f1f2f0 100644 --- a/src/pdata/space_map.rs +++ b/src/pdata/space_map.rs @@ -1,224 +1,8 @@ -use anyhow::{anyhow, Result}; -use byteorder::{LittleEndian, WriteBytesExt}; +use anyhow::Result; use fixedbitset::FixedBitSet; -use nom::{multi::count, number::complete::*, IResult}; use std::boxed::Box; use std::sync::{Arc, Mutex}; -use crate::io_engine::*; -use crate::pdata::unpack::{Pack, Unpack}; - -//------------------------------------------ - -#[derive(Debug)] -pub struct SMRoot { - pub nr_blocks: u64, - pub nr_allocated: u64, - pub bitmap_root: u64, - pub ref_count_root: u64, -} - -pub fn unpack_root(data: &[u8]) -> Result { - match SMRoot::unpack(data) { - Err(_e) => Err(anyhow!("couldn't parse SMRoot")), - Ok((_i, v)) => Ok(v), - } -} - -impl Unpack for SMRoot { - fn disk_size() -> u32 { - 32 - } - - fn unpack(data: &[u8]) -> IResult<&[u8], SMRoot> { - let (i, nr_blocks) = le_u64(data)?; - let (i, nr_allocated) = le_u64(i)?; - let (i, bitmap_root) = le_u64(i)?; - let (i, ref_count_root) = le_u64(i)?; - - Ok(( - i, - SMRoot { - nr_blocks, - nr_allocated, - bitmap_root, - ref_count_root, - }, - )) - } -} - -//------------------------------------------ - -#[derive(Clone, Copy, Debug)] -pub struct IndexEntry { - pub blocknr: u64, - pub nr_free: u32, - pub none_free_before: u32, -} - -impl Unpack for IndexEntry { - fn disk_size() -> u32 { - 16 - } - - fn unpack(data: &[u8]) -> IResult<&[u8], Self> { - let (i, blocknr) = le_u64(data)?; - let (i, nr_free) = le_u32(i)?; - let (i, none_free_before) = le_u32(i)?; - - Ok(( - i, - IndexEntry { - blocknr, - nr_free, - none_free_before, - }, - )) - } -} - -//------------------------------------------ - -pub const MAX_METADATA_BITMAPS: usize = 255; - -pub struct MetadataIndex { - pub indexes: Vec, -} - -impl Unpack for MetadataIndex { - fn disk_size() -> u32 { - BLOCK_SIZE as u32 - } - - fn unpack(data: &[u8]) -> IResult<&[u8], Self> { - let (i, _csum) = le_u32(data)?; - let (i, _padding) = le_u32(i)?; - let (i, _blocknr) = le_u64(i)?; - let (i, indexes) = count(IndexEntry::unpack, MAX_METADATA_BITMAPS)(i)?; - - Ok((i, MetadataIndex { indexes })) - } -} - -//------------------------------------------ - -#[derive(Debug)] -pub struct BitmapHeader { - pub csum: u32, - pub not_used: u32, - pub blocknr: u64, -} - -impl Unpack for BitmapHeader { - fn disk_size() -> u32 { - 16 - } - - fn unpack(data: &[u8]) -> IResult<&[u8], Self> { - let (i, csum) = le_u32(data)?; - let (i, not_used) = le_u32(i)?; - let (i, blocknr) = le_u64(i)?; - - Ok(( - i, - BitmapHeader { - csum, - not_used, - blocknr, - }, - )) - } -} - -impl Pack for BitmapHeader { - fn pack(&self, out: &mut W) -> Result<()> { - out.write_u32::(self.csum)?; - out.write_u32::(self.not_used)?; - out.write_u64::(self.blocknr)?; - Ok(()) - } -} - -#[derive(Clone, Debug, PartialEq)] -pub enum BitmapEntry { - Small(u8), - Overflow, -} - -#[derive(Debug)] -pub struct Bitmap { - pub header: BitmapHeader, - pub entries: Vec, -} - -impl Unpack for Bitmap { - fn disk_size() -> u32 { - BLOCK_SIZE as u32 - } - - fn unpack(data: &[u8]) -> IResult<&[u8], Self> { - let (mut i, header) = BitmapHeader::unpack(data)?; - - let nr_words = (BLOCK_SIZE - BitmapHeader::disk_size() as usize) / 8; - let mut entries = Vec::with_capacity(nr_words * 32); - for _w in 0..nr_words { - let (tmp, mut word) = le_u64(i)?; - - for _b in 0..32 { - let val = word & 0x3; - word >>= 2; - - // The bits are stored with the high bit at b * 2 + 1, - // and low at b *2. So we have to interpret this val. - entries.push(match val { - 0 => BitmapEntry::Small(0), - 1 => BitmapEntry::Small(2), - 2 => BitmapEntry::Small(1), - _ => BitmapEntry::Overflow, - }); - } - - i = tmp; - } - - Ok((i, Bitmap { header, entries })) - } -} - -impl Pack for Bitmap { - fn pack(&self, out: &mut W) -> Result<()> { - use BitmapEntry::*; - BitmapHeader::pack(&self.header, out)?; - - for chunk in self.entries.chunks(32) { - let mut w = 0u64; - for e in chunk { - w >>= 2; - match e { - Small(0) => {} - Small(1) => { - w |= 0x2 << 62; - } - Small(2) => { - w |= 0x1 << 62; - } - Small(_) => { - return Err(anyhow!("Bad small value in bitmap entry")); - } - Overflow => { - w |= 0x3 << 62; - } - } - } - - u64::pack(&w, out)?; - } - - Ok(()) - } -} - //------------------------------------------ pub trait SpaceMap { diff --git a/src/pdata/space_map_disk.rs b/src/pdata/space_map_disk.rs new file mode 100644 index 0000000..b89de29 --- /dev/null +++ b/src/pdata/space_map_disk.rs @@ -0,0 +1,399 @@ +use anyhow::{anyhow, Result}; +use byteorder::{LittleEndian, WriteBytesExt}; +use nom::{number::complete::*, IResult}; +use std::io::Cursor; +use std::collections::BTreeMap; + +use crate::checksum; +use crate::io_engine::*; +use crate::math::*; +use crate::pdata::btree_builder::*; +use crate::pdata::space_map::*; +use crate::pdata::unpack::*; +use crate::write_batcher::*; + +//-------------------------------- + +const MAX_METADATA_BITMAPS: usize = 255; +// const MAX_METADATA_BLOCKS: u64 = 255 * ((1 << 14) - 64); +const ENTRIES_PER_BYTE: usize = 4; +const ENTRIES_PER_BITMAP: usize = WORDS_PER_BITMAP * 8 * ENTRIES_PER_BYTE; + +//-------------------------------- + +#[derive(Clone, Copy, Debug)] +pub struct IndexEntry { + pub blocknr: u64, + pub nr_free: u32, + pub none_free_before: u32, +} + +impl Unpack for IndexEntry { + fn disk_size() -> u32 { + 16 + } + + fn unpack(i: &[u8]) -> IResult<&[u8], IndexEntry> { + let (i, blocknr) = le_u64(i)?; + let (i, nr_free) = le_u32(i)?; + let (i, none_free_before) = le_u32(i)?; + + Ok(( + i, + IndexEntry { + blocknr, + nr_free, + none_free_before, + }, + )) + } +} + +impl Pack for IndexEntry { + fn pack(&self, w: &mut W) -> Result<()> { + w.write_u64::(self.blocknr)?; + w.write_u32::(self.nr_free)?; + w.write_u32::(self.none_free_before)?; + Ok(()) + } +} + +//-------------------------------- + +pub struct MetadataIndex { + pub blocknr: u64, + pub indexes: Vec, +} + +impl Unpack for MetadataIndex { + fn disk_size() -> u32 { + BLOCK_SIZE as u32 + } + + fn unpack(i: &[u8]) -> IResult<&[u8], MetadataIndex> { + // FIXME: check the checksum + let (i, _csum) = le_u32(i)?; + let (i, _padding) = le_u32(i)?; + let (i, blocknr) = le_u64(i)?; + let (i, indexes) = nom::multi::count(IndexEntry::unpack, MAX_METADATA_BITMAPS)(i)?; + + Ok((i, MetadataIndex { blocknr, indexes })) + } +} + +impl Pack for MetadataIndex { + fn pack(&self, w: &mut W) -> Result<()> { + w.write_u32::(0)?; // csum + w.write_u32::(0)?; // padding + w.write_u64::(self.blocknr)?; + + assert!(self.indexes.len() <= MAX_METADATA_BITMAPS); + + for ie in &self.indexes { + ie.pack(w)?; + } + + Ok(()) + } +} + +//-------------------------------- + +const WORDS_PER_BITMAP: usize = (BLOCK_SIZE - 16) / 8; + +#[derive(Clone, Copy, Debug, PartialEq, Eq)] +pub enum BitmapEntry { + Small(u8), + Overflow, +} + +#[derive(Debug)] +pub struct Bitmap { + pub blocknr: u64, + pub entries: Vec, +} + +impl Unpack for Bitmap { + fn disk_size() -> u32 { + BLOCK_SIZE as u32 + } + + fn unpack(data: &[u8]) -> IResult<&[u8], Self> { + let (i, _csum) = le_u32(data)?; + let (i, _not_used) = le_u32(i)?; + let (mut i, blocknr) = le_u64(i)?; + + let header_size = 16; + let nr_words = (BLOCK_SIZE - header_size) / 8; + let mut entries = Vec::with_capacity(nr_words * 32); + for _w in 0..nr_words { + let (tmp, mut word) = le_u64(i)?; + + for _b in 0..32 { + let val = word & 0x3; + word >>= 2; + + // The bits are stored with the high bit at b * 2 + 1, + // and low at b *2. So we have to interpret this val. + entries.push(match val { + 0 => BitmapEntry::Small(0), + 1 => BitmapEntry::Small(2), + 2 => BitmapEntry::Small(1), + _ => BitmapEntry::Overflow, + }); + } + + i = tmp; + } + + Ok((i, Bitmap { blocknr, entries })) + } +} + +impl Pack for Bitmap { + fn pack(&self, out: &mut W) -> Result<()> { + use BitmapEntry::*; + + out.write_u32::(0)?; + out.write_u32::(0)?; + out.write_u64::(self.blocknr)?; + + for chunk in self.entries.chunks(32) { + let mut w = 0u64; + for e in chunk { + w >>= 2; + match e { + Small(0) => {} + Small(1) => { + w |= 0x2 << 62; + } + Small(2) => { + w |= 0x1 << 62; + } + Small(_) => { + return Err(anyhow!("Bad small value in bitmap entry")); + } + Overflow => { + w |= 0x3 << 62; + } + } + } + + u64::pack(&w, out)?; + } + + Ok(()) + } +} + +//-------------------------------- + +#[derive(Debug)] +pub struct SMRoot { + pub nr_blocks: u64, + pub nr_allocated: u64, + pub bitmap_root: u64, + pub ref_count_root: u64, +} + +impl Unpack for SMRoot { + fn disk_size() -> u32 { + 32 + } + + fn unpack(i: &[u8]) -> IResult<&[u8], Self> { + let (i, nr_blocks) = le_u64(i)?; + let (i, nr_allocated) = le_u64(i)?; + let (i, bitmap_root) = le_u64(i)?; + let (i, ref_count_root) = le_u64(i)?; + + Ok(( + i, + SMRoot { + nr_blocks, + nr_allocated, + bitmap_root, + ref_count_root, + }, + )) + } +} + +pub fn unpack_root(data: &[u8]) -> Result { + match SMRoot::unpack(data) { + Err(_e) => Err(anyhow!("couldn't parse SMRoot")), + Ok((_i, v)) => Ok(v), + } +} + +impl Pack for SMRoot { + fn pack(&self, w: &mut W) -> Result<()> { + w.write_u64::(self.nr_blocks)?; + w.write_u64::(self.nr_allocated)?; + w.write_u64::(self.bitmap_root)?; + w.write_u64::(self.ref_count_root)?; + + Ok(()) + } +} + +//-------------------------------- + +pub fn write_common(w: &mut WriteBatcher, sm: &dyn SpaceMap) -> Result<(Vec, u64)> { + use BitmapEntry::*; + + let mut index_entries = Vec::new(); + let mut overflow_builder: Builder = Builder::new(Box::new(NoopRC {})); + + // how many bitmaps do we need? + for bm in 0..div_up(sm.get_nr_blocks()? as usize, ENTRIES_PER_BITMAP) { + let mut entries = Vec::with_capacity(ENTRIES_PER_BITMAP); + let mut first_free: Option = None; + let mut nr_free: u32 = 0; + for i in 0..ENTRIES_PER_BITMAP { + let b: u64 = ((bm * ENTRIES_PER_BITMAP) as u64) + i as u64; + if b > sm.get_nr_blocks()? { + break; + } + let rc = sm.get(b)?; + let e = match rc { + 0 => { + nr_free += 1; + if first_free.is_none() { + first_free = Some(i as u32); + } + Small(0) + } + 1 => Small(1), + 2 => Small(2), + _ => { + overflow_builder.push_value(w, b as u64, rc)?; + Overflow + } + }; + entries.push(e); + } + + // allocate a new block + let b = w.alloc()?; + let mut cursor = Cursor::new(b.get_data()); + + // write the bitmap to it + let blocknr = b.loc; + let bitmap = Bitmap { blocknr, entries }; + bitmap.pack(&mut cursor)?; + w.write(b, checksum::BT::BITMAP)?; + + // Insert into the index tree + let ie = IndexEntry { + blocknr, + nr_free, + none_free_before: first_free.unwrap_or(ENTRIES_PER_BITMAP as u32), + }; + index_entries.push(ie); + } + + let ref_count_root = overflow_builder.complete(w)?; + Ok((index_entries, ref_count_root)) +} + +pub fn write_disk_sm(w: &mut WriteBatcher, sm: &dyn SpaceMap) -> Result { + let (index_entries, ref_count_root) = write_common(w, sm)?; + + let mut index_builder: Builder = Builder::new(Box::new(NoopRC {})); + for (i, ie) in index_entries.iter().enumerate() { + index_builder.push_value(w, i as u64, *ie)?; + } + + let bitmap_root = index_builder.complete(w)?; + + Ok(SMRoot { + nr_blocks: sm.get_nr_blocks()?, + nr_allocated: sm.get_nr_allocated()?, + bitmap_root, + ref_count_root, + }) +} + +//---------------------------- + +fn block_to_bitmap(b: u64) -> usize { + (b / ENTRIES_PER_BITMAP as u64) as usize +} + +fn adjust_counts(w: &mut WriteBatcher, ie: &IndexEntry, allocs: &[u64]) -> Result { + use BitmapEntry::*; + + let mut first_free = ie.none_free_before; + let mut nr_free = ie.nr_free - allocs.len() as u32; + + // Read the bitmap + let bitmap_block = w.engine.read(ie.blocknr)?; + let (_, mut bitmap) = Bitmap::unpack(bitmap_block.get_data())?; + + // Update all the entries + for a in allocs { + if first_free == *a as u32 { + first_free = *a as u32 + 1; + } + + if bitmap.entries[*a as usize] == Small(0) { + nr_free -= 1; + } + + bitmap.entries[*a as usize] = Small(1); + } + + // Write the bitmap + let mut cur = Cursor::new(bitmap_block.get_data()); + bitmap.pack(&mut cur)?; + w.write(bitmap_block, checksum::BT::BITMAP)?; + + // Return the adjusted index entry + Ok (IndexEntry { + blocknr: ie.blocknr, + nr_free, + none_free_before: first_free, + }) +} + +pub fn write_metadata_sm(w: &mut WriteBatcher, sm: &dyn SpaceMap) -> Result { + w.clear_allocations(); + let (mut indexes, ref_count_root) = write_common(w, sm)?; + + let bitmap_root = w.alloc()?; + + // Now we need to patch up the counts for the metadata that was used for storing + // the space map itself. These ref counts all went from 0 to 1. + let allocations = w.clear_allocations(); + + // Sort the allocations by bitmap + let mut by_bitmap = BTreeMap::new(); + for b in allocations { + let bitmap = block_to_bitmap(b); + (*by_bitmap.entry(bitmap).or_insert(Vec::new())).push(b % ENTRIES_PER_BITMAP as u64); + } + + for (bitmap, allocs) in by_bitmap { + indexes[bitmap] = adjust_counts(w, &indexes[bitmap], &allocs)?; + } + + // Write out the metadata index + let metadata_index = MetadataIndex { + blocknr: bitmap_root.loc, + indexes + }; + let mut cur = Cursor::new(bitmap_root.get_data()); + metadata_index.pack(&mut cur)?; + let loc = bitmap_root.loc; + w.write(bitmap_root, checksum::BT::INDEX)?; + + Ok(SMRoot { + nr_blocks: sm.get_nr_blocks()?, + nr_allocated: sm.get_nr_allocated()?, + bitmap_root: loc, + ref_count_root, + }) +} + +//-------------------------------- diff --git a/src/thin/block_time.rs b/src/thin/block_time.rs index 3afe28c..0df343e 100644 --- a/src/thin/block_time.rs +++ b/src/thin/block_time.rs @@ -1,3 +1,5 @@ +use anyhow::Result; +use byteorder::WriteBytesExt; use nom::{number::complete::*, IResult}; use std::fmt; @@ -31,6 +33,13 @@ impl Unpack for BlockTime { } } +impl Pack for BlockTime { + fn pack(&self, data: &mut W) -> Result<()> { + let bt: u64 = (self.block << 24) | self.time as u64; + bt.pack(data) + } +} + impl fmt::Display for BlockTime { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { write!(f, "{} @ {}", self.block, self.time) diff --git a/src/thin/check.rs b/src/thin/check.rs index cb25257..69223c4 100644 --- a/src/thin/check.rs +++ b/src/thin/check.rs @@ -12,6 +12,7 @@ use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine}; use crate::pdata::btree::{self, *}; use crate::pdata::btree_walker::*; use crate::pdata::space_map::*; +use crate::pdata::space_map_disk::*; use crate::pdata::unpack::*; use crate::report::*; use crate::thin::block_time::*; diff --git a/src/thin/device_detail.rs b/src/thin/device_detail.rs index b375d88..9292c95 100644 --- a/src/thin/device_detail.rs +++ b/src/thin/device_detail.rs @@ -1,7 +1,9 @@ +use anyhow::Result; +use byteorder::{LittleEndian, WriteBytesExt}; +use nom::{number::complete::*, IResult}; use std::fmt; use crate::pdata::unpack::*; -use nom::{number::complete::*, IResult}; //------------------------------------------ @@ -15,11 +17,11 @@ pub struct DeviceDetail { impl fmt::Display for DeviceDetail { fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { - write!(f, "mapped = {}, trans = {}, create = {}, snap = {}", - self.mapped_blocks, - self.transaction_id, - self.creation_time, - self.snapshotted_time)?; + write!( + f, + "mapped = {}, trans = {}, create = {}, snap = {}", + self.mapped_blocks, self.transaction_id, self.creation_time, self.snapshotted_time + )?; Ok(()) } } @@ -47,4 +49,14 @@ impl Unpack for DeviceDetail { } } +impl Pack for DeviceDetail { + fn pack(&self, w: &mut W) -> Result<()> { + w.write_u64::(self.mapped_blocks)?; + w.write_u64::(self.transaction_id)?; + w.write_u32::(self.creation_time)?; + w.write_u32::(self.snapshotted_time)?; + Ok(()) + } +} + //------------------------------------------ diff --git a/src/thin/dump.rs b/src/thin/dump.rs index 1385902..f12a57a 100644 --- a/src/thin/dump.rs +++ b/src/thin/dump.rs @@ -11,6 +11,7 @@ use crate::pdata::btree::{self, *}; use crate::pdata::btree_leaf_walker::*; use crate::pdata::btree_walker::*; use crate::pdata::space_map::*; +use crate::pdata::space_map_disk::*; use crate::pdata::unpack::*; use crate::report::*; use crate::thin::block_time::*; @@ -287,7 +288,7 @@ fn find_shared_nodes( // We have to get the leaves so w is consumed and the &mut on sm // is dropped. - let leaves = w.get_leaves(); + let _leaves = w.get_leaves(); let mut shared = BTreeSet::new(); { for i in 0..sm.get_nr_blocks().unwrap() { @@ -297,6 +298,8 @@ fn find_shared_nodes( } } +/* + // FIXME: why?!! // we're not interested in leaves (roots will get re-added later). { for i in 0..leaves.len() { @@ -305,6 +308,7 @@ fn find_shared_nodes( } } } + */ Ok((shared, sm)) } @@ -616,9 +620,11 @@ pub fn dump(opts: ThinDumpOptions) -> Result<()> { let sb = read_superblock(ctx.engine.as_ref(), SUPERBLOCK_LOCATION)?; let md = build_metadata(&ctx, &sb)?; +/* ctx.report .set_title("Optimising metadata to improve leaf packing"); let md = optimise_metadata(md)?; + */ dump_metadata(&ctx, &sb, &md) } diff --git a/src/thin/restore.rs b/src/thin/restore.rs index 5135fab..9094b02 100644 --- a/src/thin/restore.rs +++ b/src/thin/restore.rs @@ -1,62 +1,203 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; -use std::collections::{BTreeMap, BTreeSet}; +use std::collections::BTreeMap; use std::fs::OpenOptions; use std::path::Path; use std::sync::Arc; +use crate::io_engine::*; +use crate::pdata::btree_builder::*; +use crate::pdata::space_map::*; use crate::report::*; - use crate::thin::block_time::*; use crate::thin::device_detail::*; -use crate::thin::superblock::*; +use crate::thin::superblock::{self, *}; use crate::thin::xml::{self, *}; +use crate::write_batcher::*; //------------------------------------------ -#[derive(Default)] -struct Pass1 { - // +enum MappedSection { + Def(String), + Dev(u32), } -impl MetadataVisitor for Pass1 { +impl std::fmt::Display for MappedSection { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + match self { + MappedSection::Def(name) => write!(f, "Def {}", name), + MappedSection::Dev(thin_id) => write!(f, "Device {}", thin_id), + } + } +} + +struct Pass1Result { + sb: Option, + devices: BTreeMap)>, +} + +struct Pass1<'a> { + w: &'a mut WriteBatcher, + + current_dev: Option, + sub_trees: BTreeMap>, + + // The builder for the current shared sub tree or device + map: Option<(MappedSection, NodeBuilder)>, + + result: Pass1Result, +} + +impl<'a> Pass1<'a> { + fn new(w: &'a mut WriteBatcher) -> Self { + Pass1 { + w, + current_dev: None, + sub_trees: BTreeMap::new(), + map: None, + result: Pass1Result { + sb: None, + devices: BTreeMap::new(), + }, + } + } + + fn get_result(self) -> Pass1Result { + self.result + } + + fn begin_section(&mut self, section: MappedSection) -> Result { + if let Some((outer, _)) = self.map.as_ref() { + let msg = format!( + "Nested subtrees are not allowed '{}' within '{}'", + section, outer + ); + return Err(anyhow!(msg)); + } + + let value_rc = Box::new(NoopRC {}); + let leaf_builder = NodeBuilder::new(Box::new(LeafIO {}), value_rc); + + self.map = Some((section, leaf_builder)); + Ok(Visit::Continue) + } + + fn end_section(&mut self) -> Result<(MappedSection, Vec)> { + let mut current = None; + std::mem::swap(&mut self.map, &mut current); + + if let Some((name, nodes)) = current { + Ok((name, nodes.complete(self.w)?)) + } else { + let msg = format!("Unbalanced tag"); + Err(anyhow!(msg)) + } + } +} + +impl<'a> MetadataVisitor for Pass1<'a> { fn superblock_b(&mut self, sb: &xml::Superblock) -> Result { - todo!(); + self.result.sb = Some(sb.clone()); + Ok(Visit::Continue) } fn superblock_e(&mut self) -> Result { - todo!(); + Ok(Visit::Continue) } fn def_shared_b(&mut self, name: &str) -> Result { - todo!(); + self.begin_section(MappedSection::Def(name.to_string())) } fn def_shared_e(&mut self) -> Result { - todo!(); + if let (MappedSection::Def(name), nodes) = self.end_section()? { + self.sub_trees.insert(name, nodes); + Ok(Visit::Continue) + } else { + Err(anyhow!("unexpected ")) + } } fn device_b(&mut self, d: &Device) -> Result { - todo!(); + self.current_dev = Some(DeviceDetail { + mapped_blocks: d.mapped_blocks, + transaction_id: d.transaction, + creation_time: d.creation_time as u32, + snapshotted_time: d.snap_time as u32, + }); + self.begin_section(MappedSection::Dev(d.dev_id)) } fn device_e(&mut self) -> Result { - todo!(); + if let Some(detail) = self.current_dev.take() { + if let (MappedSection::Dev(thin_id), nodes) = self.end_section()? { + self.result.devices.insert(thin_id, (detail, nodes)); + Ok(Visit::Continue) + } else { + Err(anyhow!("internal error, couldn't find device details")) + } + } else { + Err(anyhow!("unexpected ")) + } } fn map(&mut self, m: &Map) -> Result { - todo!(); + if let Some((_name, _builder)) = self.map.as_mut() { + for i in 0..m.len { + let bt = BlockTime { + block: m.data_begin + i, + time: m.time, + }; + let (_, builder) = self.map.as_mut().unwrap(); + builder.push_value(self.w, m.thin_begin + i, bt)?; + } + Ok(Visit::Continue) + } else { + let msg = format!("Mapping tags must appear within a or tag."); + Err(anyhow!(msg)) + } } fn ref_shared(&mut self, name: &str) -> Result { - todo!(); + if self.current_dev.is_none() { + return Err(anyhow!( + " tags may only occur within sections." + )); + } + + if let Some(leaves) = self.sub_trees.get(name) { + // We could be in a or + if let Some((_name, builder)) = self.map.as_mut() { + builder.push_nodes(self.w, leaves)?; + } else { + let msg = format!( + " tag must be within either a or section", + name + ); + return Err(anyhow!(msg)); + } + Ok(Visit::Continue) + } else { + let msg = format!("Couldn't find sub tree '{}'.", name); + Err(anyhow!(msg)) + } } fn eof(&mut self) -> Result { - todo!(); + // FIXME: build the rest of the device trees + Ok(Visit::Continue) } } +//------------------------------------------ +/* +/// Writes a data space map to disk. Returns the space map root that needs +/// to be written to the superblock. +fn build_data_sm(batcher: WriteBatcher, sm: Box) -> Result> { + +} +*/ + //------------------------------------------ pub struct ThinRestoreOptions<'a> { @@ -66,6 +207,29 @@ pub struct ThinRestoreOptions<'a> { pub report: Arc, } +struct Context { + report: Arc, + engine: Arc, +} + +const MAX_CONCURRENT_IO: u32 = 1024; + +fn new_context(opts: &ThinRestoreOptions) -> Result { + let engine: Arc; + + if opts.async_io { + engine = Arc::new(AsyncIoEngine::new(opts.output, MAX_CONCURRENT_IO, true)?); + } else { + let nr_threads = std::cmp::max(8, num_cpus::get() * 2); + engine = Arc::new(SyncIoEngine::new(opts.output, nr_threads, true)?); + } + + Ok(Context { + report: opts.report.clone(), + engine, + }) +} + //------------------------------------------ pub fn restore(opts: ThinRestoreOptions) -> Result<()> { @@ -74,8 +238,66 @@ pub fn restore(opts: ThinRestoreOptions) -> Result<()> { .write(false) .open(opts.input)?; - let mut pass = Pass1::default(); + let ctx = new_context(&opts)?; + let max_count = u32::MAX; + + let sm = core_sm(ctx.engine.get_nr_blocks(), max_count); + let mut w = WriteBatcher::new(ctx.engine.clone(), sm.clone(), ctx.engine.get_batch_size()); + let mut pass = Pass1::new(&mut w); xml::read(input, &mut pass)?; + let pass = pass.get_result(); + + // Build the device details tree. + let mut details_builder: Builder = Builder::new(Box::new(NoopRC {})); + for (thin_id, (detail, _)) in &pass.devices { + details_builder.push_value(&mut w, *thin_id as u64, *detail)?; + } + let details_root = details_builder.complete(&mut w)?; + + // Build the individual mapping trees that make up the bottom layer. + let mut devs: BTreeMap = BTreeMap::new(); + for (thin_id, (_, nodes)) in &pass.devices { + ctx.report + .info(&format!("building btree for device {}", thin_id)); + let mut builder: Builder = Builder::new(Box::new(NoopRC {})); + builder.push_leaves(&mut w, nodes)?; + let root = builder.complete(&mut w)?; + devs.insert(*thin_id, root); + } + + // Build the top level mapping tree + let mut builder: Builder = Builder::new(Box::new(NoopRC {})); + for (thin_id, root) in devs { + builder.push_value(&mut w, thin_id as u64, root)?; + } + let mapping_root = builder.complete(&mut w)?; + + // Build data space map + + // FIXME: I think we need to decrement the shared leaves + // Build metadata space map + + // Write the superblock + if let Some(xml_sb) = pass.sb { + let sb = superblock::Superblock { + flags: SuperblockFlags { needs_check: false }, + block: SUPERBLOCK_LOCATION, + version: 2, + time: xml_sb.time as u32, + transaction_id: xml_sb.transaction, + metadata_snap: 0, + data_sm_root: vec![0; SPACE_MAP_ROOT_SIZE], + metadata_sm_root: vec![0; SPACE_MAP_ROOT_SIZE], + mapping_root, + details_root, + data_block_size: xml_sb.data_block_size, + nr_metadata_blocks: ctx.engine.get_nr_blocks(), + }; + + write_superblock(ctx.engine.as_ref(), SUPERBLOCK_LOCATION, &sb)?; + } else { + return Err(anyhow!("No superblock found in xml file")); + } Ok(()) } diff --git a/src/thin/superblock.rs b/src/thin/superblock.rs index 73e5b82..3b29b5c 100644 --- a/src/thin/superblock.rs +++ b/src/thin/superblock.rs @@ -1,10 +1,18 @@ -use crate::io_engine::*; use anyhow::{anyhow, Result}; +use byteorder::{LittleEndian, WriteBytesExt}; use nom::{bytes::complete::*, number::complete::*, IResult}; use std::fmt; +use std::io::Cursor; + +use crate::io_engine::*; +use crate::checksum::*; + +//---------------------------------------- + +pub const MAGIC: u64 = 27022010; pub const SUPERBLOCK_LOCATION: u64 = 0; -//const UUID_SIZE: usize = 16; -const SPACE_MAP_ROOT_SIZE: usize = 128; +const UUID_SIZE: usize = 16; +pub const SPACE_MAP_ROOT_SIZE: usize = 128; #[derive(Debug, Clone)] pub struct SuperblockFlags { @@ -35,36 +43,9 @@ pub struct Superblock { pub mapping_root: u64, pub details_root: u64, pub data_block_size: u32, + pub nr_metadata_blocks: u64, } -/* -pub enum CheckSeverity { - Fatal, - NonFatal, -} - -pub trait CheckError { - fn severity(&self) -> CheckSeverity; - fn block(&self) -> u64; - fn sub_errors(&self) -> Vec>; -} - -enum ErrorType { - BadChecksum, - BadBlockType(&'static str), - BadBlock(u64), - BadVersion(u32), - MetadataSnapOutOfBounds(u64), - MappingRootOutOfBounds(u64), - DetailsRootOutOfBounds(u64), -} - -struct SuperblockError { - severity: CheckSeverity, - kind: ErrorType, -} -*/ - fn unpack(data: &[u8]) -> IResult<&[u8], Superblock> { let (i, _csum) = le_u32(data)?; let (i, flags) = le_u32(i)?; @@ -81,7 +62,7 @@ fn unpack(data: &[u8]) -> IResult<&[u8], Superblock> { let (i, details_root) = le_u64(i)?; let (i, data_block_size) = le_u32(i)?; let (i, _metadata_block_size) = le_u32(i)?; - let (i, _metadata_nr_blocks) = le_u64(i)?; + let (i, nr_metadata_blocks) = le_u64(i)?; Ok(( i, @@ -100,6 +81,7 @@ fn unpack(data: &[u8]) -> IResult<&[u8], Superblock> { mapping_root, details_root, data_block_size, + nr_metadata_blocks, }, )) } @@ -115,3 +97,51 @@ pub fn read_superblock(engine: &dyn IoEngine, loc: u64) -> Result { } //------------------------------ + +fn pack_superblock(sb: &Superblock, w: &mut W) -> Result<()> { + // checksum, which we don't know yet + w.write_u32::(0)?; + + // flags + if sb.flags.needs_check { + w.write_u32::(0x1)?; + } else { + w.write_u32::(0)?; + } + + w.write_u64::(sb.block)?; + w.write_all(&vec![0; UUID_SIZE])?; + w.write_u64::(MAGIC)?; + w.write_u32::(sb.version)?; + w.write_u32::(sb.time)?; + w.write_u64::(sb.transaction_id)?; + w.write_u64::(sb.metadata_snap)?; + w.write_all(&vec![0; SPACE_MAP_ROOT_SIZE])?; // data sm root + w.write_all(&vec![0; SPACE_MAP_ROOT_SIZE])?; // metadata sm root + w.write_u64::(sb.mapping_root)?; + w.write_u64::(sb.details_root)?; + w.write_u32::(sb.data_block_size)?; + w.write_u32::(BLOCK_SIZE as u32)?; + w.write_u64::(sb.nr_metadata_blocks)?; + + Ok(()) +} + +pub fn write_superblock(engine: &dyn IoEngine, _loc: u64, sb: &Superblock) -> Result<()> { + let b = Block::zeroed(SUPERBLOCK_LOCATION); + + // pack the superblock + { + let mut cursor = Cursor::new(b.get_data()); + pack_superblock(sb, &mut cursor)?; + } + + // calculate the checksum + write_checksum(b.get_data(), BT::SUPERBLOCK)?; + + // write + engine.write(&b)?; + Ok(()) +} + +//------------------------------ diff --git a/src/thin/xml.rs b/src/thin/xml.rs index 752dd8e..a15df16 100644 --- a/src/thin/xml.rs +++ b/src/thin/xml.rs @@ -1,4 +1,4 @@ -use anyhow::Result; +use anyhow::{anyhow, Result}; use std::{borrow::Cow, fmt::Display, io::prelude::*, io::BufReader, io::Write}; use quick_xml::events::attributes::Attribute; @@ -46,9 +46,11 @@ pub trait MetadataVisitor { fn superblock_b(&mut self, sb: &Superblock) -> Result; fn superblock_e(&mut self) -> Result; + // Defines a shared sub tree. May only contain a 'map' (no 'ref' allowed). fn def_shared_b(&mut self, name: &str) -> Result; fn def_shared_e(&mut self) -> Result; + // A device contains a number of 'map' or 'ref' items. fn device_b(&mut self, d: &Device) -> Result; fn device_e(&mut self) -> Result; @@ -207,8 +209,9 @@ fn bad_attr(_tag: &str, _attr: &[u8]) -> Result { todo!(); } -fn missing_attr(_tag: &str, _attr: &str) -> Result { - todo!(); +fn missing_attr(tag: &str, attr: &str) -> Result { + let msg = format!("missing attribute '{}' for tag '{}", attr, tag); + Err(anyhow!(msg)) } fn check_attr(tag: &str, name: &str, maybe_v: Option) -> Result { @@ -257,6 +260,24 @@ fn parse_superblock(e: &BytesStart) -> Result { }) } +fn parse_def(e: &BytesStart, tag: &str) -> Result { + let mut name: Option = None; + + for a in e.attributes() { + let kv = a.unwrap(); + match kv.key { + b"name" => { + name = Some(string_val(&kv)); + }, + _ => { + return bad_attr(tag, kv.key) + } + } + } + + Ok(name.unwrap()) +} + fn parse_device(e: &BytesStart) -> Result { let mut dev_id: Option = None; let mut mapped_blocks: Option = None; @@ -348,16 +369,19 @@ where Ok(Event::Start(ref e)) => match e.name() { b"superblock" => visitor.superblock_b(&parse_superblock(e)?), b"device" => visitor.device_b(&parse_device(e)?), + b"def" => visitor.def_shared_b(&parse_def(e, "def")?), _ => todo!(), }, Ok(Event::End(ref e)) => match e.name() { b"superblock" => visitor.superblock_e(), b"device" => visitor.device_e(), + b"def" => visitor.def_shared_e(), _ => todo!(), }, Ok(Event::Empty(ref e)) => match e.name() { b"single_mapping" => visitor.map(&parse_single_map(e)?), b"range_mapping" => visitor.map(&parse_range_map(e)?), + b"ref" => visitor.ref_shared(&parse_def(e, "ref")?), _ => todo!(), }, Ok(Event::Text(_)) => Ok(Visit::Continue), diff --git a/src/write_batcher.rs b/src/write_batcher.rs index f6ef877..f046cc8 100644 --- a/src/write_batcher.rs +++ b/src/write_batcher.rs @@ -1,4 +1,5 @@ use anyhow::{anyhow, Result}; +use std::collections::BTreeSet; use std::sync::{Arc, Mutex}; use crate::checksum; @@ -10,10 +11,13 @@ use crate::pdata::space_map::*; #[derive(Clone)] pub struct WriteBatcher { pub engine: Arc, + + // FIXME: this doesn't need to be in a mutex pub sm: Arc>, batch_size: usize, queue: Vec, + allocations: BTreeSet, } impl WriteBatcher { @@ -27,10 +31,11 @@ impl WriteBatcher { sm, batch_size, queue: Vec::with_capacity(batch_size), + allocations: BTreeSet::new(), } } - pub fn alloc(&mut self) -> Result { + pub fn alloc(&mut self) -> Result { let mut sm = self.sm.lock().unwrap(); let b = sm.alloc()?; @@ -38,23 +43,37 @@ impl WriteBatcher { return Err(anyhow!("out of metadata space")); } - Ok(b.unwrap()) + Ok(Block::new(b.unwrap())) + } + + pub fn clear_allocations(&mut self) -> BTreeSet { + let mut tmp = BTreeSet::new(); + std::mem::swap(&mut tmp, &mut self.allocations); + tmp } pub fn write(&mut self, b: Block, kind: checksum::BT) -> Result<()> { checksum::write_checksum(&mut b.get_data(), kind)?; if self.queue.len() == self.batch_size { - self.flush()?; + let mut tmp = Vec::new(); + std::mem::swap(&mut tmp, &mut self.queue); + self.flush_(tmp)?; } self.queue.push(b); Ok(()) } + pub fn flush_(&mut self, queue: Vec) -> Result<()> { + self.engine.write_many(&queue)?; + Ok(()) + } + pub fn flush(&mut self) -> Result<()> { - self.engine.write_many(&self.queue)?; - self.queue.clear(); + let mut tmp = Vec::new(); + std::mem::swap(&mut tmp, &mut self.queue); + self.flush_(tmp)?; Ok(()) } }