diff --git a/src/pdata/btree_builder.rs b/src/pdata/btree_builder.rs index 3fad55e..b5a320e 100644 --- a/src/pdata/btree_builder.rs +++ b/src/pdata/btree_builder.rs @@ -36,10 +36,16 @@ impl RefCounter for NoopRC { } /// Wraps a space map up to become a RefCounter. -struct SMRefCounter { +pub struct SMRefCounter { sm: Arc>, } +impl SMRefCounter { + pub fn new(sm: Arc>) -> SMRefCounter { + SMRefCounter { sm } + } +} + impl RefCounter for SMRefCounter { fn get(&self, v: &u64) -> Result { self.sm.lock().unwrap().get(*v) @@ -135,12 +141,16 @@ pub struct WriteResult { loc: u64, } -/// Write a node to a free metadata block. -fn write_node_(w: &mut WriteBatcher, mut node: Node) -> Result { +/// Write a node to a free metadata block, and mark the block as reserved, +/// without increasing its reference count. +fn write_reserved_node_( + w: &mut WriteBatcher, + mut node: Node, +) -> Result { let keys = node.get_keys(); let first_key = *keys.first().unwrap_or(&0u64); - let b = w.alloc()?; + let b = w.reserve()?; node.set_block(b.loc); let mut cursor = Cursor::new(b.get_data()); @@ -177,7 +187,7 @@ impl NodeIO for LeafIO { values, }; - write_node_(w, node) + write_reserved_node_(w, node) } fn read(&self, w: &mut WriteBatcher, block: u64) -> Result<(Vec, Vec)> { @@ -210,7 +220,7 @@ impl NodeIO for InternalIO { values, }; - write_node_(w, node) + write_reserved_node_(w, node) } fn read(&self, w: &mut WriteBatcher, block: u64) -> Result<(Vec, Vec)> { @@ -314,7 +324,6 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder { // Add the remaining nodes. 
for i in 1..nodes.len() { let n = nodes.get(i).unwrap(); - w.sm.lock().unwrap().inc(n.block, 1)?; self.nodes.push(n.clone()); } } else { @@ -323,7 +332,6 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder { // add the nodes for n in nodes { - w.sm.lock().unwrap().inc(n.block, 1)?; self.nodes.push(n.clone()); } } @@ -425,7 +433,6 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder { fn unshift_node(&mut self, w: &mut WriteBatcher) -> Result<()> { let ls = self.nodes.pop().unwrap(); let (keys, values) = self.read_node(w, ls.block)?; - w.sm.lock().unwrap().dec(ls.block)?; let mut vals = VecDeque::new(); @@ -473,7 +480,7 @@ impl Builder { while nodes.len() > 1 { let mut builder = NodeBuilder::new( Box::new(InternalIO {}), - Box::new(SMRefCounter { sm: w.sm.clone() }), + Box::new(SMRefCounter::new(w.sm.clone())), ); for n in nodes { @@ -484,7 +491,14 @@ impl Builder { } assert!(nodes.len() == 1); - Ok(nodes[0].block) + + // The root is expected to be referenced by only one parent, + // hence the ref count is increased before the availability + // of its parent. 
+ let root = nodes[0].block; + w.sm.lock().unwrap().inc(root, 1)?; + + Ok(root) } } diff --git a/src/thin/restore.rs b/src/thin/restore.rs index 7f1bfa7..e128c91 100644 --- a/src/thin/restore.rs +++ b/src/thin/restore.rs @@ -330,7 +330,6 @@ pub fn restore(opts: ThinRestoreOptions) -> Result<()> { // Build data space map let data_sm_root = build_data_sm(&mut w, pass.data_sm.lock().unwrap().deref())?; - // FIXME: I think we need to decrement the shared leaves // Build metadata space map let metadata_sm_root = build_metadata_sm(&mut w)?; diff --git a/src/write_batcher.rs b/src/write_batcher.rs index da2c042..6784b11 100644 --- a/src/write_batcher.rs +++ b/src/write_batcher.rs @@ -1,5 +1,6 @@ use anyhow::{anyhow, Result}; use std::collections::BTreeSet; +use std::ops::DerefMut; use std::sync::{Arc, Mutex}; use crate::checksum; @@ -17,7 +18,33 @@ pub struct WriteBatcher { batch_size: usize, queue: Vec, + + // The actual blocks allocated or reserved by this WriteBatcher allocations: BTreeSet, + + // The reserved range covers all the blocks allocated or reserved by this + // WriteBatcher, and the blocks already occupied. No blocks in this range + // are expected to be freed, hence a single range is used for the representation. 
+ reserved: std::ops::Range, +} + +pub fn find_free(sm: &mut dyn SpaceMap, reserved: &std::ops::Range) -> Result { + let nr_blocks = sm.get_nr_blocks()?; + let mut b; + if reserved.end >= reserved.start { + b = sm.find_free(reserved.end, nr_blocks)?; + if b.is_none() { + b = sm.find_free(0, reserved.start)?; + } + } else { + b = sm.find_free(reserved.end, reserved.start)?; + } + + if b.is_none() { + return Err(anyhow!("out of metadata space")); + } + + Ok(b.unwrap()) } impl WriteBatcher { @@ -26,37 +53,59 @@ impl WriteBatcher { sm: Arc>, batch_size: usize, ) -> WriteBatcher { + let alloc_begin = sm.lock().unwrap().get_alloc_begin().unwrap_or(0); + WriteBatcher { engine, sm, batch_size, queue: Vec::with_capacity(batch_size), allocations: BTreeSet::new(), + reserved: std::ops::Range { + start: alloc_begin, + end: alloc_begin, + }, } } pub fn alloc(&mut self) -> Result { let mut sm = self.sm.lock().unwrap(); - let b = sm.alloc()?; + let b = find_free(sm.deref_mut(), &self.reserved)?; + self.reserved.end = b + 1; + self.allocations.insert(b); - if b.is_none() { - return Err(anyhow!("out of metadata space")); - } + sm.set(b, 1)?; - self.allocations.insert(b.unwrap()); - Ok(Block::new(b.unwrap())) + Ok(Block::new(b)) } pub fn alloc_zeroed(&mut self) -> Result { let mut sm = self.sm.lock().unwrap(); - let b = sm.alloc()?; + let b = find_free(sm.deref_mut(), &self.reserved)?; + self.reserved.end = b + 1; + self.allocations.insert(b); - if b.is_none() { - return Err(anyhow!("out of metadata space")); - } + sm.set(b, 1)?; - self.allocations.insert(b.unwrap()); - Ok(Block::zeroed(b.unwrap())) + Ok(Block::zeroed(b)) + } + + pub fn reserve(&mut self) -> Result { + let mut sm = self.sm.lock().unwrap(); + let b = find_free(sm.deref_mut(), &self.reserved)?; + self.reserved.end = b + 1; + self.allocations.insert(b); + + Ok(Block::new(b)) + } + + pub fn reserve_zeroed(&mut self) -> Result { + let mut sm = self.sm.lock().unwrap(); + let b = find_free(sm.deref_mut(), &self.reserved)?; 
+ self.reserved.end = b + 1; + self.allocations.insert(b); + + Ok(Block::zeroed(b)) } pub fn clear_allocations(&mut self) -> BTreeSet {