Merge pull request #185 from mingnus/2021-07-29-btree-builder-fixes

Fix reference counting in btree construction
Joe Thornber 2021-08-06 08:35:46 +01:00 committed by GitHub
commit b58e42bb95
8 changed files with 171 additions and 79 deletions

src/cache/repair.rs
View File

@@ -6,7 +6,7 @@ use crate::cache::dump::*;
use crate::cache::restore::*;
use crate::cache::superblock::*;
use crate::io_engine::*;
use crate::pdata::space_map::*;
use crate::pdata::space_map_metadata::*;
use crate::report::*;
use crate::write_batcher::*;
@@ -32,11 +32,11 @@ fn new_context(opts: &CacheRepairOptions) -> Result<Context> {
let engine_out: Arc<dyn IoEngine + Send + Sync>;
if opts.async_io {
engine_in = Arc::new(AsyncIoEngine::new(opts.input, MAX_CONCURRENT_IO, true)?);
engine_in = Arc::new(AsyncIoEngine::new(opts.input, MAX_CONCURRENT_IO, false)?);
engine_out = Arc::new(AsyncIoEngine::new(opts.output, MAX_CONCURRENT_IO, true)?);
} else {
let nr_threads = std::cmp::max(8, num_cpus::get() * 2);
engine_in = Arc::new(SyncIoEngine::new(opts.input, nr_threads, true)?);
engine_in = Arc::new(SyncIoEngine::new(opts.input, nr_threads, false)?);
engine_out = Arc::new(SyncIoEngine::new(opts.output, nr_threads, true)?);
}
@@ -54,7 +54,7 @@ pub fn repair(opts: CacheRepairOptions) -> Result<()> {
let sb = read_superblock(ctx.engine_in.as_ref(), SUPERBLOCK_LOCATION)?;
let sm = core_sm(ctx.engine_out.get_nr_blocks(), u32::MAX);
let sm = core_metadata_sm(ctx.engine_out.get_nr_blocks(), u32::MAX);
let mut w = WriteBatcher::new(
ctx.engine_out.clone(),
sm.clone(),

View File

@@ -14,7 +14,6 @@ use crate::cache::xml;
use crate::io_engine::*;
use crate::math::*;
use crate::pdata::array_builder::*;
use crate::pdata::space_map::*;
use crate::pdata::space_map_metadata::*;
use crate::pdata::unpack::Pack;
use crate::report::*;
@@ -259,7 +258,7 @@ pub fn restore(opts: CacheRestoreOptions) -> Result<()> {
let ctx = mk_context(&opts)?;
let sm = core_sm(ctx.engine.get_nr_blocks(), u32::MAX);
let sm = core_metadata_sm(ctx.engine.get_nr_blocks(), u32::MAX);
let mut w = WriteBatcher::new(ctx.engine.clone(), sm.clone(), ctx.engine.get_batch_size());
// build cache mappings

View File

@@ -141,16 +141,12 @@ pub struct WriteResult {
loc: u64,
}
/// Write a node to a free metadata block, and mark the block as reserved,
/// without increasing its reference count.
fn write_reserved_node_<V: Unpack + Pack>(
w: &mut WriteBatcher,
mut node: Node<V>,
) -> Result<WriteResult> {
/// Write a node to a free metadata block.
fn write_node_<V: Unpack + Pack>(w: &mut WriteBatcher, mut node: Node<V>) -> Result<WriteResult> {
let keys = node.get_keys();
let first_key = *keys.first().unwrap_or(&0u64);
let b = w.reserve()?;
let b = w.alloc()?;
node.set_block(b.loc);
let mut cursor = Cursor::new(b.get_data());
@@ -187,7 +183,7 @@ impl<V: Unpack + Pack> NodeIO<V> for LeafIO {
values,
};
write_reserved_node_(w, node)
write_node_(w, node)
}
fn read(&self, w: &mut WriteBatcher, block: u64) -> Result<(Vec<u64>, Vec<V>)> {
@@ -220,7 +216,7 @@ impl NodeIO<u64> for InternalIO {
values,
};
write_reserved_node_(w, node)
write_node_(w, node)
}
fn read(&self, w: &mut WriteBatcher, block: u64) -> Result<(Vec<u64>, Vec<u64>)> {
@@ -246,6 +242,7 @@ pub struct NodeBuilder<V: Pack + Unpack> {
max_entries_per_node: usize,
values: VecDeque<(u64, V)>,
nodes: Vec<NodeSummary>,
shared: bool,
}
/// When the builder is including pre-built nodes it has to decide whether
@@ -265,22 +262,29 @@ pub struct NodeSummary {
impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
/// Create a new NodeBuilder
pub fn new(nio: Box<dyn NodeIO<V>>, value_rc: Box<dyn RefCounter<V>>) -> Self {
pub fn new(nio: Box<dyn NodeIO<V>>, value_rc: Box<dyn RefCounter<V>>, shared: bool) -> Self {
NodeBuilder {
nio,
value_rc,
max_entries_per_node: calc_max_entries::<V>(),
values: VecDeque::new(),
nodes: Vec::new(),
shared,
}
}
/// Push a single value. This may emit a new node, hence the Result
/// return type. The value's ref count will be incremented.
pub fn push_value(&mut self, w: &mut WriteBatcher, key: u64, val: V) -> Result<()> {
// Unshift the previously pushed node since it is not the root
let half_full = self.max_entries_per_node / 2;
if self.nodes.len() == 1 && (self.nodes.last().unwrap().nr_entries < half_full) {
self.unshift_node(w)?;
}
// Have we got enough values to emit a node? We try and keep
// at least max_entries_per_node entries unflushed so we
// can ensure the final node is balanced properly.
if self.values.len() == self.max_entries_per_node * 2 {
else if self.values.len() == self.max_entries_per_node * 2 {
self.emit_node(w)?;
}
@@ -289,6 +293,19 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
Ok(())
}
// To avoid writing an under populated node we have to grab some
// values from the first of the shared nodes.
fn append_values(&mut self, w: &mut WriteBatcher, node: &NodeSummary) -> Result<()> {
let (keys, values) = self.read_node(w, node.block)?;
for i in 0..keys.len() {
self.value_rc.inc(&values[i])?;
self.values.push_back((keys[i], values[i].clone()));
}
Ok(())
}
/// Push a number of prebuilt, shared nodes. The builder may decide to not
/// use a shared node, instead reading the values and packing them
/// directly. This may do IO to emit nodes, so returns a Result.
@@ -298,41 +315,67 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
pub fn push_nodes(&mut self, w: &mut WriteBatcher, nodes: &[NodeSummary]) -> Result<()> {
assert!(!nodes.is_empty());
// Assume the node is a shared root if it is the first one pushed.
// A leaf that is the root may hold any number of entries.
let maybe_root = (nodes.len() == 1) && self.nodes.is_empty() && self.values.is_empty();
if maybe_root {
let n = &nodes[0];
w.sm.lock().unwrap().inc(n.block, 1)?;
self.nodes.push(n.clone());
return Ok(());
}
// As a sanity check we make sure that all the shared nodes contain the
// minimum nr of entries.
// A single shared node might be under populated (less than half-full)
// due to btree removal, or even underfull (<33% residency) due to kernel issues.
// Such nodes will be merged into their siblings.
let half_full = self.max_entries_per_node / 2;
for n in nodes {
if n.nr_entries < half_full {
panic!("under populated node");
if nodes.len() > 1 {
for n in nodes {
if n.nr_entries < half_full {
panic!("under populated node");
}
}
}
// Unshift the previously pushed node since it is not the root
if self.nodes.len() == 1 && (self.nodes.last().unwrap().nr_entries < half_full) {
self.unshift_node(w)?;
}
// Decide if we're going to use the pre-built nodes.
if !self.values.is_empty() && (self.values.len() < half_full) {
// To avoid writing an under populated node we have to grab some
// values from the first of the shared nodes.
let (keys, values) = self.read_node(w, nodes.get(0).unwrap().block)?;
let mut nodes_iter = nodes.iter();
let n = nodes_iter.next();
self.append_values(w, n.unwrap())?;
for i in 0..keys.len() {
self.value_rc.inc(&values[i])?;
self.values.push_back((keys[i], values[i].clone()));
}
// Do not flush if there are no succeeding nodes,
// so that we produce more compact metadata.
if nodes.len() > 1 {
// Flush all the values.
self.emit_all(w)?;
// Flush all the values.
self.emit_all(w)?;
// Add the remaining nodes.
for i in 1..nodes.len() {
let n = nodes.get(i).unwrap();
self.nodes.push(n.clone());
// Add the remaining nodes.
for n in nodes_iter {
w.sm.lock().unwrap().inc(n.block, 1)?;
self.nodes.push(n.clone());
}
}
} else {
// Flush all the values.
self.emit_all(w)?;
// add the nodes
for n in nodes {
self.nodes.push(n.clone());
if nodes[0].nr_entries < half_full {
// An under populated nodes[0] implies nodes.len() == 1,
// and it has to be merged into its siblings.
self.append_values(w, &nodes[0])?;
} else {
// Add the nodes.
for n in nodes {
w.sm.lock().unwrap().inc(n.block, 1)?;
self.nodes.push(n.clone());
}
}
}
@@ -388,7 +431,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
block: wresult.loc,
key: wresult.first_key,
nr_entries,
shared: false,
shared: self.shared,
});
Ok(())
}
@@ -433,6 +476,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
fn unshift_node(&mut self, w: &mut WriteBatcher) -> Result<()> {
let ls = self.nodes.pop().unwrap();
let (keys, values) = self.read_node(w, ls.block)?;
w.sm.lock().unwrap().dec(ls.block)?;
let mut vals = VecDeque::new();
@@ -460,7 +504,7 @@ pub struct BTreeBuilder<V: Unpack + Pack> {
impl<V: Unpack + Pack + Clone> BTreeBuilder<V> {
pub fn new(value_rc: Box<dyn RefCounter<V>>) -> BTreeBuilder<V> {
BTreeBuilder {
leaf_builder: NodeBuilder::new(Box::new(LeafIO {}), value_rc),
leaf_builder: NodeBuilder::new(Box::new(LeafIO {}), value_rc, false),
}
}
@@ -486,10 +530,7 @@ pub fn build_btree(w: &mut WriteBatcher, leaves: Vec<NodeSummary>) -> Result<u64
// up with a single root.
let mut nodes = leaves;
while nodes.len() > 1 {
let mut builder = NodeBuilder::new(
Box::new(InternalIO {}),
Box::new(SMRefCounter::new(w.sm.clone())),
);
let mut builder = NodeBuilder::new(Box::new(InternalIO {}), Box::new(NoopRC {}), false);
for n in nodes {
builder.push_value(w, n.key, n.block)?;
@@ -500,13 +541,36 @@ pub fn build_btree(w: &mut WriteBatcher, leaves: Vec<NodeSummary>) -> Result<u64
assert!(nodes.len() == 1);
// The root is expected to be referenced by only one parent,
// hence the ref count is increased before its parent
// becomes available.
let root = nodes[0].block;
w.sm.lock().unwrap().inc(root, 1)?;
Ok(root)
}
//------------------------------------------
// The pre-built nodes and the contained values were initialized with
// a ref count of 1, which is analogous to a "temporary snapshot" of
// potentially shared leaves. We have to drop those temporary references
// to pre-built nodes at the end of device building, and also decrease
// the ref counts of the contained values if a pre-built leaf is no
// longer referenced.
pub fn release_leaves<V: Pack + Unpack>(
w: &mut WriteBatcher,
leaves: &[NodeSummary],
value_rc: &mut dyn RefCounter<V>,
) -> Result<()> {
let nio = LeafIO {};
for n in leaves {
let deleted = w.sm.lock().unwrap().dec(n.block)?;
if deleted {
let (_, values) = nio.read(w, n.block)?;
for v in values {
value_rc.dec(&v)?;
}
}
}
Ok(())
}
//------------------------------------------
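The ref-counting scheme these builder changes converge on can be summarized with a toy model (the types below are illustrative stand-ins, not the real thinp API): every pre-built leaf starts with a temporary reference of 1, push_nodes() adds another when it adopts a leaf as a shared node, and release_leaves() drops the temporary one once the devices are built, so a leaf's values are only decremented when the leaf itself becomes unreferenced.

use std::collections::HashMap;

// Toy block ref counts keyed by block number.
struct ToySpaceMap {
    counts: HashMap<u64, u32>,
}

impl ToySpaceMap {
    fn inc(&mut self, b: u64) {
        *self.counts.entry(b).or_insert(0) += 1;
    }

    // Returns true when the count drops to zero, i.e. the block is freed.
    fn dec(&mut self, b: u64) -> bool {
        let c = self.counts.get_mut(&b).expect("unknown block");
        *c -= 1;
        *c == 0
    }
}

fn main() {
    let mut sm = ToySpaceMap { counts: HashMap::new() };
    let leaf = 42u64;

    sm.inc(leaf);           // temporary reference taken when the leaf was built
    sm.inc(leaf);           // push_nodes() adopts the leaf as a shared node
    assert!(!sm.dec(leaf)); // release_leaves(): drop the temporary reference
    assert!(sm.dec(leaf));  // the adopting tree is deleted later: the leaf is
                            // freed, and only now would its values be dec'd
}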

View File

@@ -1,6 +1,8 @@
use anyhow::Result;
use fixedbitset::FixedBitSet;
use num_traits::Bounded;
use std::boxed::Box;
use std::convert::{TryFrom, TryInto};
use std::sync::{Arc, Mutex};
//------------------------------------------
@@ -61,7 +63,16 @@ where
impl<V> SpaceMap for CoreSpaceMap<V>
where
V: Copy + Default + Eq + std::ops::AddAssign + From<u8> + Into<u32>,
V: Copy
+ Default
+ Eq
+ std::ops::AddAssign
+ From<u8>
+ Into<u32>
+ Bounded
+ TryFrom<u32>
+ std::cmp::PartialOrd,
<V as TryFrom<u32>>::Error: std::fmt::Debug,
{
fn get_nr_blocks(&self) -> Result<u64> {
Ok(self.counts.len() as u64)
@@ -77,8 +88,8 @@ where
fn set(&mut self, b: u64, v: u32) -> Result<u32> {
let old = self.counts[b as usize];
assert!(v < 0xff); // FIXME: we can't assume this
self.counts[b as usize] = V::from(v as u8);
assert!(v <= V::max_value().into());
self.counts[b as usize] = v.try_into().unwrap(); // FIXME: do not panic
if old == V::from(0u8) && v != 0 {
self.nr_allocated += 1;
@@ -91,12 +102,14 @@ where
fn inc(&mut self, begin: u64, len: u64) -> Result<()> {
for b in begin..(begin + len) {
if self.counts[b as usize] == V::from(0u8) {
let c = &mut self.counts[b as usize];
assert!(*c < V::max_value());
if *c == V::from(0u8) {
// FIXME: can we get a ref to save dereferencing counts twice?
self.nr_allocated += 1;
self.counts[b as usize] = V::from(1u8);
*c = V::from(1u8);
} else {
self.counts[b as usize] += V::from(1u8);
*c += V::from(1u8);
}
}
Ok(())
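A minimal sketch of the bound check introduced in set() above, assuming a counter type with the same trait bounds as CoreSpaceMap's V (checked_set is a hypothetical helper, not code from this commit): a u8-backed map now refuses counts above u8::MAX instead of silently truncating them as the old `v as u8` cast did.

use anyhow::{ensure, Result};
use num_traits::Bounded;
use std::convert::TryFrom;

// Hypothetical helper mirroring the new CoreSpaceMap::set() check.
fn checked_set<V>(slot: &mut V, v: u32) -> Result<()>
where
    V: Bounded + Into<u32> + TryFrom<u32>,
    <V as TryFrom<u32>>::Error: std::fmt::Debug,
{
    // Refuse counts the backing type cannot represent (e.g. > 255 for u8).
    ensure!(v <= V::max_value().into(), "count {} out of range", v);
    *slot = V::try_from(v).unwrap();
    Ok(())
}

fn main() -> Result<()> {
    let mut c: u8 = 0;
    checked_set(&mut c, 200)?;                  // fits in a u8
    assert!(checked_set(&mut c, 300).is_err()); // rejected rather than wrapped
    Ok(())
}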

View File

@@ -2,9 +2,11 @@ use anyhow::{anyhow, Result};
use byteorder::{LittleEndian, WriteBytesExt};
use nom::{number::complete::*, IResult};
use std::io::Cursor;
use std::sync::{Arc, Mutex};
use crate::checksum;
use crate::io_engine::*;
use crate::pdata::space_map::*;
use crate::pdata::space_map_common::*;
use crate::pdata::unpack::*;
use crate::write_batcher::*;
@@ -12,6 +14,7 @@ use crate::write_batcher::*;
//------------------------------------------
const MAX_METADATA_BITMAPS: usize = 255;
const MAX_METADATA_BLOCKS: usize = MAX_METADATA_BITMAPS * ENTRIES_PER_BITMAP;
//------------------------------------------
@@ -102,6 +105,15 @@ fn adjust_counts(
})
}
//------------------------------------------
pub fn core_metadata_sm(nr_blocks: u64, max_count: u32) -> Arc<Mutex<dyn SpaceMap + Send + Sync>> {
core_sm(
std::cmp::min(nr_blocks, MAX_METADATA_BLOCKS as u64),
max_count,
)
}
pub fn write_metadata_sm(w: &mut WriteBatcher) -> Result<SMRoot> {
let r1 = w.get_reserved_range();
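core_metadata_sm() above merely clamps the in-core map's size to what the on-disk metadata space map can address. A rough sketch of that arithmetic, using a placeholder for ENTRIES_PER_BITMAP (the real constant is defined in space_map_common.rs and is not shown in this diff):

// Placeholder value only; the real ENTRIES_PER_BITMAP comes from
// space_map_common.rs and depends on the metadata block size.
const ENTRIES_PER_BITMAP: usize = 32_768;
const MAX_METADATA_BITMAPS: usize = 255;
const MAX_METADATA_BLOCKS: usize = MAX_METADATA_BITMAPS * ENTRIES_PER_BITMAP;

// The metadata space map indexes at most MAX_METADATA_BITMAPS bitmap
// blocks, so the in-core map is never sized beyond what the on-disk
// format can address, however large the device is.
fn clamp_metadata_blocks(nr_blocks: u64) -> u64 {
    std::cmp::min(nr_blocks, MAX_METADATA_BLOCKS as u64)
}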

View File

@@ -3,7 +3,7 @@ use std::path::Path;
use std::sync::Arc;
use crate::io_engine::*;
use crate::pdata::space_map::*;
use crate::pdata::space_map_metadata::*;
use crate::report::*;
use crate::thin::dump::*;
use crate::thin::metadata::*;
@@ -33,11 +33,11 @@ fn new_context(opts: &ThinRepairOptions) -> Result<Context> {
let engine_out: Arc<dyn IoEngine + Send + Sync>;
if opts.async_io {
engine_in = Arc::new(AsyncIoEngine::new(opts.input, MAX_CONCURRENT_IO, true)?);
engine_in = Arc::new(AsyncIoEngine::new(opts.input, MAX_CONCURRENT_IO, false)?);
engine_out = Arc::new(AsyncIoEngine::new(opts.output, MAX_CONCURRENT_IO, true)?);
} else {
let nr_threads = std::cmp::max(8, num_cpus::get() * 2);
engine_in = Arc::new(SyncIoEngine::new(opts.input, nr_threads, true)?);
engine_in = Arc::new(SyncIoEngine::new(opts.input, nr_threads, false)?);
engine_out = Arc::new(SyncIoEngine::new(opts.output, nr_threads, true)?);
}
@@ -57,7 +57,7 @@ pub fn repair(opts: ThinRepairOptions) -> Result<()> {
let md = build_metadata(ctx.engine_in.clone(), &sb)?;
let md = optimise_metadata(md)?;
let sm = core_sm(ctx.engine_out.get_nr_blocks(), u32::MAX);
let sm = core_metadata_sm(ctx.engine_out.get_nr_blocks(), u32::MAX);
let mut w = WriteBatcher::new(
ctx.engine_out.clone(),
sm.clone(),

View File

@@ -101,7 +101,8 @@ impl<'a> Restorer<'a> {
let value_rc = Box::new(MappingRC {
sm: self.data_sm.as_ref().unwrap().clone(),
});
let leaf_builder = NodeBuilder::new(Box::new(LeafIO {}), value_rc);
let shared = matches!(section, MappedSection::Def(_));
let leaf_builder = NodeBuilder::new(Box::new(LeafIO {}), value_rc, shared);
self.current_map = Some((section, leaf_builder));
Ok(Visit::Continue)
@@ -134,9 +135,26 @@ impl<'a> Restorer<'a> {
Ok((details_root, mapping_root))
}
// Release the temporary references to the leaves of pre-built subtrees.
// The ref counts of the contained child values are also decreased if a
// leaf is no longer referenced.
fn release_subtrees(&mut self) -> Result<()> {
let mut value_rc = MappingRC {
sm: self.data_sm.as_ref().unwrap().clone(),
};
for (_, leaves) in self.sub_trees.iter() {
release_leaves(self.w, &leaves, &mut value_rc)?;
}
Ok(())
}
fn finalize(&mut self) -> Result<()> {
let (details_root, mapping_root) = self.build_device_details()?;
self.release_subtrees()?;
// Build data space map
let data_sm = self.data_sm.as_ref().unwrap();
let data_sm_root = build_data_sm(self.w, data_sm.lock().unwrap().deref())?;
@@ -333,7 +351,7 @@ pub fn restore(opts: ThinRestoreOptions) -> Result<()> {
let ctx = new_context(&opts)?;
let max_count = u32::MAX;
let sm = core_sm(ctx.engine.get_nr_blocks(), max_count);
let sm = core_metadata_sm(ctx.engine.get_nr_blocks(), max_count);
let mut w = WriteBatcher::new(ctx.engine.clone(), sm.clone(), ctx.engine.get_batch_size());
let mut restorer = Restorer::new(&mut w, ctx.report);
xml::read(input, &mut restorer)?;
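A small sketch of the decision begin_section() now makes, assuming MappedSection has roughly the shape below (the variant payloads are illustrative): only leaves built under a <def> block can later be referenced by several devices, so only those builders mark their NodeSummary entries as shared.

// Illustrative shape; the real enum lives in thin/restore.rs.
enum MappedSection {
    Def(String), // a named, potentially shared subtree
    Dev(u32),    // mappings for a single thin device
}

fn is_shared(section: &MappedSection) -> bool {
    // Mirrors: let shared = matches!(section, MappedSection::Def(_));
    matches!(section, MappedSection::Def(_))
}

fn main() {
    assert!(is_shared(&MappedSection::Def("snap-origin".to_string())));
    assert!(!is_shared(&MappedSection::Dev(1)));
}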

View File

@@ -18,9 +18,11 @@ pub struct WriteBatcher {
batch_size: usize,
queue: Vec<Block>,
// The reserved range covers all the blocks allocated or reserved by this
// WriteBatcher, and the blocks already occupied. No blocks in this range
// are expected to be freed, hence a single range is used for the representation.
// The reserved range keeps track of all the blocks allocated.
// An allocated block won't be reused even if it is freed later.
// In other words, the WriteBatcher performs allocation in a
// transactional fashion, which simplifies block allocation
// as well as tracking.
reserved: std::ops::Range<u64>,
}
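A toy sketch of the allocation policy described in the comment above (simplified: the real allocator consults the space map via find_free(); here every block below reserved.end is simply treated as used): blocks are only handed out from beyond the reserved range, so a block freed during the run is never handed out again.

// Toy stand-in for WriteBatcher's transactional allocator.
struct ToyBatcher {
    reserved: std::ops::Range<u64>,
    nr_blocks: u64,
}

impl ToyBatcher {
    fn alloc(&mut self) -> Option<u64> {
        // Never look below reserved.end, even if earlier blocks were freed
        // again; the range only ever grows within one run.
        let b = self.reserved.end;
        if b >= self.nr_blocks {
            return None;
        }
        self.reserved.end = b + 1;
        Some(b)
    }
}

fn main() {
    let mut w = ToyBatcher { reserved: 0..0, nr_blocks: 3 };
    assert_eq!(w.alloc(), Some(0));
    assert_eq!(w.alloc(), Some(1));
    // Even if block 0 were freed here, the next allocation is block 2.
    assert_eq!(w.alloc(), Some(2));
    assert_eq!(w.alloc(), None);
}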
@@ -83,22 +85,6 @@ impl WriteBatcher {
Ok(Block::zeroed(b))
}
pub fn reserve(&mut self) -> Result<Block> {
let mut sm = self.sm.lock().unwrap();
let b = find_free(sm.deref_mut(), &self.reserved)?;
self.reserved.end = b + 1;
Ok(Block::new(b))
}
pub fn reserve_zeroed(&mut self) -> Result<Block> {
let mut sm = self.sm.lock().unwrap();
let b = find_free(sm.deref_mut(), &self.reserved)?;
self.reserved.end = b + 1;
Ok(Block::zeroed(b))
}
pub fn get_reserved_range(&self) -> std::ops::Range<u64> {
std::ops::Range {
start: self.reserved.start,