work in progress

This commit is contained in:
Joe Thornber 2020-11-04 12:38:35 +00:00
parent 37ea0280df
commit 1ae62adec6
15 changed files with 1722 additions and 791 deletions

View File

@ -50,6 +50,7 @@ unsafe impl Send for Block {}
pub trait IoEngine {
fn get_nr_blocks(&self) -> u64;
fn get_batch_size(&self) -> usize;
fn read(&self, b: u64) -> Result<Block>;
// The whole io could fail, or individual blocks
@ -167,6 +168,10 @@ impl IoEngine for SyncIoEngine {
self.nr_blocks
}
fn get_batch_size(&self) -> usize {
1
}
fn read(&self, loc: u64) -> Result<Block> {
SyncIoEngine::read_(&mut self.get(), loc)
}
@ -346,6 +351,10 @@ impl IoEngine for AsyncIoEngine {
inner.nr_blocks
}
fn get_batch_size(&self) -> usize {
self.inner.lock().unwrap().queue_len as usize
}
fn read(&self, b: u64) -> Result<Block> {
let mut inner = self.inner.lock().unwrap();
let fd = types::Target::Fd(inner.input.as_raw_fd());

View File

@ -25,3 +25,4 @@ pub mod report;
pub mod shrink;
pub mod thin;
pub mod version;
pub mod write_batcher;

View File

@ -2,24 +2,28 @@ use anyhow::anyhow;
use byteorder::{LittleEndian, ReadBytesExt, WriteBytesExt};
use data_encoding::BASE64;
use nom::{number::complete::*, IResult};
use std::collections::BTreeMap;
use std::fmt;
use std::sync::{Arc, Mutex};
use thiserror::Error;
use threadpool::ThreadPool;
use crate::checksum;
use crate::io_engine::*;
use crate::pack::vm;
use crate::pdata::space_map::*;
use crate::pdata::unpack::*;
//------------------------------------------
#[derive(Clone, Debug, PartialEq)]
pub struct KeyRange {
start: Option<u64>,
end: Option<u64>, // This is the one-past-the-end value
pub start: Option<u64>,
pub end: Option<u64>, // This is the one-past-the-end value
}
impl KeyRange {
pub fn new() -> KeyRange {
KeyRange {
start: None,
end: None,
}
}
}
impl fmt::Display for KeyRange {
@ -190,7 +194,7 @@ fn split_one(path: &Vec<u64>, kr: &KeyRange, k: u64) -> Result<(KeyRange, KeyRan
}
}
fn split_key_ranges(path: &Vec<u64>, kr: &KeyRange, keys: &[u64]) -> Result<Vec<KeyRange>> {
pub fn split_key_ranges(path: &Vec<u64>, kr: &KeyRange, keys: &[u64]) -> Result<Vec<KeyRange>> {
let mut krs = Vec::with_capacity(keys.len());
if keys.len() == 0 {
@ -352,7 +356,7 @@ pub fn node_err(path: &Vec<u64>, msg: &str) -> BTreeError {
)
}
fn node_err_s(path: &Vec<u64>, msg: String) -> BTreeError {
pub fn node_err_s(path: &Vec<u64>, msg: String) -> BTreeError {
BTreeError::Path(path.clone(), Box::new(BTreeError::NodeError(msg)))
}
@ -579,565 +583,3 @@ pub fn unpack_node<V: Unpack>(
}
//------------------------------------------
pub trait NodeVisitor<V: Unpack> {
// &self is deliberately non mut to allow the walker to use multiple threads.
fn visit(
&self,
path: &Vec<u64>,
keys: &KeyRange,
header: &NodeHeader,
keys: &[u64],
values: &[V],
) -> Result<()>;
// Nodes may be shared and thus visited multiple times. The walker avoids
// doing repeated IO, but it does call this method to keep the visitor up to
// date.
fn visit_again(&self, path: &Vec<u64>, b: u64) -> Result<()>;
fn end_walk(&self) -> Result<()>;
}
#[derive(Clone)]
pub struct BTreeWalker {
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
fails: Arc<Mutex<BTreeMap<u64, BTreeError>>>,
ignore_non_fatal: bool,
}
impl BTreeWalker {
pub fn new(engine: Arc<dyn IoEngine + Send + Sync>, ignore_non_fatal: bool) -> BTreeWalker {
let nr_blocks = engine.get_nr_blocks() as usize;
let r: BTreeWalker = BTreeWalker {
engine,
sm: Arc::new(Mutex::new(RestrictedSpaceMap::new(nr_blocks as u64))),
fails: Arc::new(Mutex::new(BTreeMap::new())),
ignore_non_fatal,
};
r
}
pub fn new_with_sm(
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
ignore_non_fatal: bool,
) -> Result<BTreeWalker> {
{
let sm = sm.lock().unwrap();
assert_eq!(sm.get_nr_blocks().unwrap(), engine.get_nr_blocks());
}
Ok(BTreeWalker {
engine,
sm,
fails: Arc::new(Mutex::new(BTreeMap::new())),
ignore_non_fatal,
})
}
fn failed(&self, b: u64) -> Option<BTreeError> {
let fails = self.fails.lock().unwrap();
match fails.get(&b) {
None => None,
Some(e) => Some(e.clone()),
}
}
fn set_fail(&self, b: u64, err: BTreeError) {
// FIXME: should we monitor the size of fails, and abort if too many errors?
let mut fails = self.fails.lock().unwrap();
fails.insert(b, err);
}
// Atomically increments the ref count, and returns the _old_ count.
fn sm_inc(&self, b: u64) -> u32 {
let mut sm = self.sm.lock().unwrap();
let count = sm.get(b).unwrap();
sm.inc(b, 1).unwrap();
count
}
fn build_aggregate(&self, b: u64, errs: Vec<BTreeError>) -> Result<()> {
match errs.len() {
0 => Ok(()),
1 => {
let e = errs[0].clone();
self.set_fail(b, e.clone());
Err(e)
}
_ => {
let e = aggregate_error(errs);
self.set_fail(b, e.clone());
Err(e)
}
}
}
fn walk_nodes<NV, V>(
&self,
path: &mut Vec<u64>,
visitor: &NV,
krs: &[KeyRange],
bs: &[u64],
) -> Vec<BTreeError>
where
NV: NodeVisitor<V>,
V: Unpack,
{
assert_eq!(krs.len(), bs.len());
let mut errs: Vec<BTreeError> = Vec::new();
let mut blocks = Vec::with_capacity(bs.len());
let mut filtered_krs = Vec::with_capacity(krs.len());
for i in 0..bs.len() {
if self.sm_inc(bs[i]) == 0 {
// Node not yet seen
blocks.push(bs[i]);
filtered_krs.push(krs[i].clone());
} else {
// This node has already been checked ...
match self.failed(bs[i]) {
None => {
// ... it was clean.
if let Err(e) = visitor.visit_again(path, bs[i]) {
// ... but the visitor isn't happy
errs.push(e.clone());
}
}
Some(e) => {
// ... there was an error
errs.push(e.clone());
}
}
}
}
match self.engine.read_many(&blocks[0..]) {
Err(_) => {
// IO completely failed, error every block
for (i, b) in blocks.iter().enumerate() {
let e = io_err(path).keys_context(&filtered_krs[i]);
errs.push(e.clone());
self.set_fail(*b, e);
}
}
Ok(rblocks) => {
let mut i = 0;
for rb in rblocks {
match rb {
Err(_) => {
let e = io_err(path).keys_context(&filtered_krs[i]);
errs.push(e.clone());
self.set_fail(blocks[i], e);
}
Ok(b) => match self.walk_node(path, visitor, &filtered_krs[i], &b, false) {
Err(e) => {
errs.push(e);
}
Ok(()) => {}
},
}
i += 1;
}
}
}
errs
}
fn walk_node_<NV, V>(
&self,
path: &mut Vec<u64>,
visitor: &NV,
kr: &KeyRange,
b: &Block,
is_root: bool,
) -> Result<()>
where
NV: NodeVisitor<V>,
V: Unpack,
{
use Node::*;
let bt = checksum::metadata_block_type(b.get_data());
if bt != checksum::BT::NODE {
return Err(node_err_s(
path,
format!("checksum failed for node {}, {:?}", b.loc, bt),
)
.keys_context(kr));
}
let node = unpack_node::<V>(path, &b.get_data(), self.ignore_non_fatal, is_root)?;
match node {
Internal { keys, values, .. } => {
let krs = split_key_ranges(path, &kr, &keys)?;
let errs = self.walk_nodes(path, visitor, &krs, &values);
return self.build_aggregate(b.loc, errs);
}
Leaf {
header,
keys,
values,
} => {
if let Err(e) = visitor.visit(path, &kr, &header, &keys, &values) {
let e = BTreeError::Path(path.clone(), Box::new(e.clone()));
self.set_fail(b.loc, e.clone());
return Err(e);
}
}
}
Ok(())
}
fn walk_node<NV, V>(
&self,
path: &mut Vec<u64>,
visitor: &NV,
kr: &KeyRange,
b: &Block,
is_root: bool,
) -> Result<()>
where
NV: NodeVisitor<V>,
V: Unpack,
{
path.push(b.loc);
let r = self.walk_node_(path, visitor, kr, b, is_root);
path.pop();
visitor.end_walk()?;
r
}
pub fn walk<NV, V>(&self, path: &mut Vec<u64>, visitor: &NV, root: u64) -> Result<()>
where
NV: NodeVisitor<V>,
V: Unpack,
{
if self.sm_inc(root) > 0 {
if let Some(e) = self.failed(root) {
Err(e.clone())
} else {
visitor.visit_again(path, root)
}
} else {
let root = self.engine.read(root).map_err(|_| io_err(path))?;
let kr = KeyRange {
start: None,
end: None,
};
self.walk_node(path, visitor, &kr, &root, true)
}
}
}
//--------------------------------
fn walk_node_threaded_<NV, V>(
w: Arc<BTreeWalker>,
path: &mut Vec<u64>,
pool: &ThreadPool,
visitor: Arc<NV>,
kr: &KeyRange,
b: &Block,
is_root: bool,
) -> Result<()>
where
NV: NodeVisitor<V> + Send + Sync + 'static,
V: Unpack,
{
use Node::*;
let bt = checksum::metadata_block_type(b.get_data());
if bt != checksum::BT::NODE {
return Err(node_err_s(
path,
format!("checksum failed for node {}, {:?}", b.loc, bt),
)
.keys_context(kr));
}
let node = unpack_node::<V>(path, &b.get_data(), w.ignore_non_fatal, is_root)?;
match node {
Internal { keys, values, .. } => {
let krs = split_key_ranges(path, &kr, &keys)?;
let errs = walk_nodes_threaded(w.clone(), path, pool, visitor, &krs, &values);
return w.build_aggregate(b.loc, errs);
}
Leaf {
header,
keys,
values,
} => {
visitor.visit(path, kr, &header, &keys, &values)?;
}
}
Ok(())
}
fn walk_node_threaded<NV, V>(
w: Arc<BTreeWalker>,
path: &mut Vec<u64>,
pool: &ThreadPool,
visitor: Arc<NV>,
kr: &KeyRange,
b: &Block,
is_root: bool,
) -> Result<()>
where
NV: NodeVisitor<V> + Send + Sync + 'static,
V: Unpack,
{
path.push(b.loc);
let r = walk_node_threaded_(w, path, pool, visitor.clone(), kr, b, is_root);
path.pop();
visitor.end_walk()?;
r
}
fn walk_nodes_threaded<NV, V>(
w: Arc<BTreeWalker>,
path: &mut Vec<u64>,
pool: &ThreadPool,
visitor: Arc<NV>,
krs: &[KeyRange],
bs: &[u64],
) -> Vec<BTreeError>
where
NV: NodeVisitor<V> + Send + Sync + 'static,
V: Unpack,
{
assert_eq!(krs.len(), bs.len());
let mut errs: Vec<BTreeError> = Vec::new();
let mut blocks = Vec::with_capacity(bs.len());
let mut filtered_krs = Vec::with_capacity(krs.len());
for i in 0..bs.len() {
if w.sm_inc(bs[i]) == 0 {
// Node not yet seen
blocks.push(bs[i]);
filtered_krs.push(krs[i].clone());
} else {
// This node has already been checked ...
match w.failed(bs[i]) {
None => {
// ... it was clean.
if let Err(e) = visitor.visit_again(path, bs[i]) {
// ... but the visitor isn't happy
errs.push(e.clone());
}
}
Some(e) => {
// ... there was an error
errs.push(e.clone());
}
}
}
}
match w.engine.read_many(&blocks[0..]) {
Err(_) => {
// IO completely failed error every block
for (i, b) in blocks.iter().enumerate() {
let e = io_err(path).keys_context(&filtered_krs[i]);
errs.push(e.clone());
w.set_fail(*b, e);
}
}
Ok(rblocks) => {
let mut i = 0;
let errs = Arc::new(Mutex::new(Vec::new()));
for rb in rblocks {
match rb {
Err(_) => {
let e = io_err(path).keys_context(&filtered_krs[i]);
let mut errs = errs.lock().unwrap();
errs.push(e.clone());
w.set_fail(blocks[i], e);
}
Ok(b) => {
let w = w.clone();
let visitor = visitor.clone();
let kr = filtered_krs[i].clone();
let errs = errs.clone();
let mut path = path.clone();
pool.execute(move || {
match w.walk_node(&mut path, visitor.as_ref(), &kr, &b, false) {
Err(e) => {
let mut errs = errs.lock().unwrap();
errs.push(e);
}
Ok(()) => {}
}
});
}
}
i += 1;
}
pool.join();
}
}
errs
}
pub fn walk_threaded<NV, V>(
path: &mut Vec<u64>,
w: Arc<BTreeWalker>,
pool: &ThreadPool,
visitor: Arc<NV>,
root: u64,
) -> Result<()>
where
NV: NodeVisitor<V> + Send + Sync + 'static,
V: Unpack,
{
if w.sm_inc(root) > 0 {
if let Some(e) = w.failed(root) {
Err(e.clone())
} else {
visitor.visit_again(path, root)
}
} else {
let root = w.engine.read(root).map_err(|_| io_err(path))?;
let kr = KeyRange {
start: None,
end: None,
};
walk_node_threaded(w, path, pool, visitor, &kr, &root, true)
}
}
//------------------------------------------
struct ValueCollector<V> {
values: Mutex<BTreeMap<u64, V>>,
}
impl<V> ValueCollector<V> {
fn new() -> ValueCollector<V> {
ValueCollector {
values: Mutex::new(BTreeMap::new()),
}
}
}
// FIXME: should we be using Copy rather than clone? (Yes)
impl<V: Unpack + Copy> NodeVisitor<V> for ValueCollector<V> {
fn visit(
&self,
_path: &Vec<u64>,
_kr: &KeyRange,
_h: &NodeHeader,
keys: &[u64],
values: &[V],
) -> Result<()> {
let mut vals = self.values.lock().unwrap();
for n in 0..keys.len() {
vals.insert(keys[n], values[n].clone());
}
Ok(())
}
fn visit_again(&self, _path: &Vec<u64>, _b: u64) -> Result<()> {
Ok(())
}
fn end_walk(&self) -> Result<()> {
Ok(())
}
}
pub fn btree_to_map<V: Unpack + Copy>(
path: &mut Vec<u64>,
engine: Arc<dyn IoEngine + Send + Sync>,
ignore_non_fatal: bool,
root: u64,
) -> Result<BTreeMap<u64, V>> {
let walker = BTreeWalker::new(engine, ignore_non_fatal);
let visitor = ValueCollector::<V>::new();
walker.walk(path, &visitor, root)?;
Ok(visitor.values.into_inner().unwrap())
}
pub fn btree_to_map_with_sm<V: Unpack + Copy>(
path: &mut Vec<u64>,
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
ignore_non_fatal: bool,
root: u64,
) -> Result<BTreeMap<u64, V>> {
let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?;
let visitor = ValueCollector::<V>::new();
walker.walk(path, &visitor, root)?;
Ok(visitor.values.into_inner().unwrap())
}
//------------------------------------------
struct ValuePathCollector<V> {
values: Mutex<BTreeMap<u64, (Vec<u64>, V)>>,
}
impl<V> ValuePathCollector<V> {
fn new() -> ValuePathCollector<V> {
ValuePathCollector {
values: Mutex::new(BTreeMap::new()),
}
}
}
impl<V: Unpack + Clone> NodeVisitor<V> for ValuePathCollector<V> {
fn visit(
&self,
path: &Vec<u64>,
_kr: &KeyRange,
_h: &NodeHeader,
keys: &[u64],
values: &[V],
) -> Result<()> {
let mut vals = self.values.lock().unwrap();
for n in 0..keys.len() {
vals.insert(keys[n], (path.clone(), values[n].clone()));
}
Ok(())
}
fn visit_again(&self, _path: &Vec<u64>, _b: u64) -> Result<()> {
Ok(())
}
fn end_walk(&self) -> Result<()> {
Ok(())
}
}
pub fn btree_to_map_with_path<V: Unpack + Copy>(
path: &mut Vec<u64>,
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
ignore_non_fatal: bool,
root: u64,
) -> Result<BTreeMap<u64, (Vec<u64>, V)>> {
let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?;
let visitor = ValuePathCollector::<V>::new();
walker.walk(path, &visitor, root)?;
Ok(visitor.values.into_inner().unwrap())
}
//------------------------------------------

View File

@ -1,4 +1,4 @@
use anyhow::{anyhow, Result};
use anyhow::Result;
use byteorder::{LittleEndian, WriteBytesExt};
use std::collections::VecDeque;
use std::io::Cursor;
@ -9,6 +9,7 @@ use crate::io_engine::*;
use crate::pdata::btree::*;
use crate::pdata::space_map::*;
use crate::pdata::unpack::*;
use crate::write_batcher::*;
//------------------------------------------
@ -67,24 +68,16 @@ fn calc_max_entries<V: Unpack>() -> usize {
//------------------------------------------
struct Entries<V> {
max_entries: usize,
pub max_entries: usize,
entries: VecDeque<(u64, V)>,
}
enum Action<V> {
Noop,
WriteSingle {
keys: Vec<u64>,
values: Vec<V>,
},
WritePair {
keys1: Vec<u64>,
values1: Vec<V>,
keys2: Vec<u64>,
values2: Vec<V>,
},
EmitNode(Vec<u64>, Vec<V>), // keys, values
}
use Action::*;
impl<V> Entries<V> {
pub fn new(max_entries: usize) -> Entries<V> {
Entries {
@ -93,20 +86,19 @@ impl<V> Entries<V> {
}
}
pub fn add_entry(&mut self, k: u64, v: V) -> Action<V> {
let result = if self.full() {
pub fn add_entry(&mut self, k: u64, v: V) -> Vec<Action<V>> {
let mut result = Vec::new();
if self.full() {
let (keys, values) = self.pop(self.max_entries);
Action::WriteSingle { keys, values }
} else {
Action::Noop
};
result.push(EmitNode(keys, values));
}
self.entries.push_back((k, v));
result
}
pub fn complete(&mut self) -> Action<V> {
fn complete_(&mut self, result: &mut Vec<Action<V>>) {
let n = self.entries.len();
if n >= self.max_entries {
@ -115,20 +107,20 @@ impl<V> Entries<V> {
let (keys1, values1) = self.pop(n1);
let (keys2, values2) = self.pop(n2);
Action::WritePair {
keys1,
values1,
keys2,
values2,
}
result.push(EmitNode(keys1, values1));
result.push(EmitNode(keys2, values2));
} else if n > 0 {
let (keys, values) = self.pop(n);
Action::WriteSingle { keys, values }
} else {
Action::Noop
result.push(EmitNode(keys, values));
}
}
pub fn complete(&mut self) -> Vec<Action<V>> {
let mut result = Vec::new();
self.complete_(&mut result);
result
}
fn full(&self) -> bool {
self.entries.len() >= 2 * self.max_entries
}
@ -149,55 +141,11 @@ impl<V> Entries<V> {
//------------------------------------------
struct WriteBatcher {
engine: Arc<Box<dyn IoEngine>>,
sm: Arc<Mutex<dyn SpaceMap>>,
batch_size: usize,
queue: Vec<Block>,
}
impl WriteBatcher {
fn new(
engine: Arc<Box<dyn IoEngine>>,
sm: Arc<Mutex<dyn SpaceMap>>,
batch_size: usize,
) -> WriteBatcher {
WriteBatcher {
engine,
sm,
batch_size,
queue: Vec::with_capacity(batch_size),
}
}
fn alloc(&mut self) -> Result<u64> {
let mut sm = self.sm.lock().unwrap();
let b = sm.alloc()?;
if b.is_none() {
return Err(anyhow!("out of metadata space"));
}
Ok(b.unwrap())
}
fn write(&mut self, b: Block) -> Result<()> {
checksum::write_checksum(&mut b.get_data(), checksum::BT::NODE)?;
if self.queue.len() == self.batch_size {
self.flush()?;
}
self.queue.push(b);
Ok(())
}
fn flush(&mut self) -> Result<()> {
self.engine.write_many(&self.queue)?;
self.queue.clear();
Ok(())
}
pub struct NodeSummary {
block: u64,
nr_entries: usize,
key_low: u64,
key_high: u64, // inclusive
}
//------------------------------------------
@ -208,11 +156,11 @@ fn write_node_<V: Unpack + Pack>(w: &mut WriteBatcher, mut node: Node<V>) -> Res
let loc = w.alloc()?;
node.set_block(loc);
let b = Block::new(loc);
let mut cursor = Cursor::new(b.get_data());
pack_node(&node, &mut cursor)?;
w.write(b)?;
w.write(b, checksum::BT::NODE)?;
Ok((first_key, loc))
}
@ -268,7 +216,10 @@ pub struct Builder<V: Unpack + Pack> {
}
impl<V: Unpack + Pack> Builder<V> {
pub fn new(engine: Arc<Box<dyn IoEngine>>, sm: Arc<Mutex<dyn SpaceMap>>) -> Builder<V> {
pub fn new(
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap>>,
) -> Builder<V> {
let max_entries = calc_max_entries::<V>();
let max_internal_entries = calc_max_entries::<u64>();
@ -282,13 +233,41 @@ impl<V: Unpack + Pack> Builder<V> {
}
pub fn add_entry(&mut self, k: u64, v: V) -> Result<()> {
let action = self.entries.add_entry(k, v);
self.perform_action(action)
let actions = self.entries.add_entry(k, v);
for a in actions {
self.perform_action(a)?;
}
Ok(())
}
pub fn add_leaf_node(&mut self, leaf: &NodeSummary) -> Result<()> {
match leaf.nr_entries {
n if n == 0 => {
// Do nothing
},
n if n < (self.entries.max_entries / 2) => {
// FIXME: what if we've already queued a handful of entries for a node?
// Add the entries individually
todo!();
},
n => {
let actions = self.entries.complete();
for a in actions {
self.perform_action(a)?;
}
self.add_internal_entry(0, leaf.key_low, leaf.block)?;
}
}
Ok(())
}
pub fn complete(mut self) -> Result<u64> {
let action = self.entries.complete();
self.perform_action(action)?;
let actions = self.entries.complete();
for a in actions {
self.perform_action(a)?;
}
self.w.flush()?;
Ok(self.root)
}
@ -297,33 +276,26 @@ impl<V: Unpack + Pack> Builder<V> {
fn add_internal_entry(&mut self, level: usize, k: u64, v: u64) -> Result<()> {
if self.internal_entries.len() == level {
self.internal_entries.push(Entries::new(self.max_internal_entries));
self.internal_entries
.push(Entries::new(self.max_internal_entries));
}
let action = self.internal_entries[level].add_entry(k, v);
self.perform_internal_action(level, action)
let actions = self.internal_entries[level].add_entry(k, v);
for a in actions {
self.perform_internal_action(level, a)?;
}
Ok(())
}
fn perform_internal_action(&mut self, level: usize, action: Action<u64>) -> Result<()> {
match action {
Action::Noop => {}
Action::WriteSingle { keys, values } => {
EmitNode(keys, values) => {
let (k, loc) = write_internal(&mut self.w, keys, values)?;
self.add_internal_entry(level + 1, k, loc)?;
self.root = loc;
}
Action::WritePair {
keys1,
values1,
keys2,
values2,
} => {
let (k, loc) = write_leaf(&mut self.w, keys1, values1)?;
self.add_internal_entry(level + 1, k, loc)?;
let (k, loc) = write_leaf(&mut self.w, keys2, values2)?;
self.add_internal_entry(level + 1, k, loc)?;
}
},
}
Ok(())
@ -331,23 +303,10 @@ impl<V: Unpack + Pack> Builder<V> {
fn perform_action<V2: Unpack + Pack>(&mut self, action: Action<V2>) -> Result<()> {
match action {
Action::Noop => {}
Action::WriteSingle { keys, values } => {
EmitNode(keys, values) => {
let (k, loc) = write_leaf(&mut self.w, keys, values)?;
self.add_internal_entry(0, k, loc)?;
}
Action::WritePair {
keys1,
values1,
keys2,
values2,
} => {
let (k, loc) = write_leaf(&mut self.w, keys1, values1)?;
self.add_internal_entry(0, k, loc)?;
let (k, loc) = write_leaf(&mut self.w, keys2, values2)?;
self.add_internal_entry(0, k, loc)?;
}
},
}
Ok(())
@ -355,10 +314,3 @@ impl<V: Unpack + Pack> Builder<V> {
}
//------------------------------------------
#[test]
fn fail() {
assert!(false);
}
//------------------------------------------

View File

@ -0,0 +1,245 @@
use fixedbitset::FixedBitSet;
use std::sync::{Arc, Mutex};
use crate::checksum;
use crate::io_engine::*;
use crate::pdata::btree::*;
use crate::pdata::space_map::*;
use crate::pdata::unpack::*;
//------------------------------------------
pub trait LeafVisitor<V: Unpack> {
fn visit(&mut self, kr: &KeyRange, b: u64) -> Result<()>;
// Nodes may be shared and thus visited multiple times. The walker avoids
// doing repeated IO, but it does call this method to keep the visitor up to
// date. b may be an internal node obviously.
fn visit_again(&mut self, b: u64) -> Result<()>;
fn end_walk(&mut self) -> Result<()>;
}
// This is useful if you just want to get the space map counts from the walk.
pub struct NoopLeafVisitor {}
impl<V: Unpack> LeafVisitor<V> for NoopLeafVisitor {
fn visit(&mut self, kr: &KeyRange, b: u64) -> Result<()> {
Ok(())
}
fn visit_again(&mut self, b: u64) -> Result<()> {
Ok(())
}
fn end_walk(&mut self) -> Result<()> {
Ok(())
}
}
pub struct LeafWalker<'a> {
engine: Arc<dyn IoEngine + Send + Sync>,
sm: &'a mut dyn SpaceMap,
leaves: FixedBitSet,
ignore_non_fatal: bool,
}
impl<'a> LeafWalker<'a> {
pub fn new(
engine: Arc<dyn IoEngine + Send + Sync>,
sm: &'a mut dyn SpaceMap,
ignore_non_fatal: bool,
) -> LeafWalker<'a> {
let nr_blocks = engine.get_nr_blocks() as usize;
LeafWalker {
engine,
sm,
leaves: FixedBitSet::with_capacity(nr_blocks),
ignore_non_fatal,
}
}
// Atomically increments the ref count, and returns the _old_ count.
fn sm_inc(&mut self, b: u64) -> u32 {
let sm = &mut self.sm;
let count = sm.get(b).unwrap();
sm.inc(b, 1).unwrap();
count
}
fn walk_nodes<LV, V>(
&mut self,
depth: usize,
path: &mut Vec<u64>,
visitor: &mut LV,
krs: &[KeyRange],
bs: &[u64],
) -> Result<()>
where
LV: LeafVisitor<V>,
V: Unpack,
{
assert_eq!(krs.len(), bs.len());
let mut errs: Vec<BTreeError> = Vec::new();
let mut blocks = Vec::with_capacity(bs.len());
let mut filtered_krs = Vec::with_capacity(krs.len());
for i in 0..bs.len() {
if self.sm_inc(bs[i]) == 0 {
// Node not yet seen
blocks.push(bs[i]);
filtered_krs.push(krs[i].clone());
} else {
// This node has already been checked ...
if let Err(e) = visitor.visit_again(bs[i]) {
// ... but the visitor isn't happy
errs.push(e.clone());
}
}
}
let rblocks = self
.engine
.read_many(&blocks[0..])
.map_err(|_e| io_err(path))?;
let mut i = 0;
for rb in rblocks {
match rb {
Err(_) => {
return Err(io_err(path).keys_context(&filtered_krs[i]));
}
Ok(b) => {
self.walk_node(depth - 1, path, visitor, &filtered_krs[i], &b, false)?;
}
}
i += 1;
}
Ok(())
}
fn walk_node_<LV, V>(
&mut self,
depth: usize,
path: &mut Vec<u64>,
visitor: &mut LV,
kr: &KeyRange,
b: &Block,
is_root: bool,
) -> Result<()>
where
LV: LeafVisitor<V>,
V: Unpack,
{
use Node::*;
let bt = checksum::metadata_block_type(b.get_data());
if bt != checksum::BT::NODE {
return Err(node_err_s(
path,
format!("checksum failed for node {}, {:?}", b.loc, bt),
)
.keys_context(kr));
}
let node = unpack_node::<V>(path, &b.get_data(), self.ignore_non_fatal, is_root)?;
if let Internal { keys, values, .. } = node {
let krs = split_key_ranges(path, &kr, &keys)?;
if depth == 0 {
for i in 0..krs.len() {
self.sm.inc(values[i], 1).expect("sm.inc() failed");
for v in &values {
self.leaves.insert(*v as usize);
}
visitor.visit(&krs[i], values[i])?;
}
Ok(())
} else {
self.walk_nodes(depth, path, visitor, &krs, &values)
}
} else {
Err(node_err(path, "btree nodes are not all at the same depth."))
}
}
fn walk_node<LV, V>(
&mut self,
depth: usize,
path: &mut Vec<u64>,
visitor: &mut LV,
kr: &KeyRange,
b: &Block,
is_root: bool,
) -> Result<()>
where
LV: LeafVisitor<V>,
V: Unpack,
{
path.push(b.loc);
let r = self.walk_node_(depth, path, visitor, kr, b, is_root);
path.pop();
visitor.end_walk()?;
r
}
fn get_depth<V: Unpack>(&self, path: &mut Vec<u64>, root: u64, is_root: bool) -> Result<usize> {
use Node::*;
let b = self.engine.read(root).map_err(|_| io_err(path))?;
let bt = checksum::metadata_block_type(b.get_data());
if bt != checksum::BT::NODE {
return Err(node_err_s(
path,
format!("checksum failed for node {}, {:?}", root, bt),
));
}
let node = unpack_node::<V>(path, &b.get_data(), self.ignore_non_fatal, is_root)?;
match node {
Internal { values, .. } => {
let n = self.get_depth::<V>(path, values[0], false)?;
Ok(n + 1)
}
Leaf { .. } => Ok(0),
}
}
pub fn walk<LV, V>(&mut self, path: &mut Vec<u64>, visitor: &mut LV, root: u64) -> Result<()>
where
LV: LeafVisitor<V>,
V: Unpack,
{
let kr = KeyRange {
start: None,
end: None,
};
let depth = self.get_depth::<V>(path, root, true)?;
if depth == 0 {
self.sm_inc(root);
self.leaves.insert(root as usize);
visitor.visit(&kr, root)?;
Ok(())
} else {
if self.sm_inc(root) > 0 {
visitor.visit_again(root)
} else {
let root = self.engine.read(root).map_err(|_| io_err(path))?;
self.walk_node(depth - 1, path, visitor, &kr, &root, true)
}
}
}
// Call this to extract the leaves bitset after you've done your walking.
pub fn get_leaves(self) -> FixedBitSet {
self.leaves
}
}
//------------------------------------------

136
src/pdata/btree_merge.rs Normal file
View File

@ -0,0 +1,136 @@
use anyhow::{anyhow, Result};
use byteorder::{LittleEndian, WriteBytesExt};
use std::collections::VecDeque;
use std::io::Cursor;
use std::sync::{Arc, Mutex};
use crate::checksum;
use crate::io_engine::*;
use crate::pdata::btree;
use crate::pdata::btree::*;
use crate::pdata::btree_walker::*;
use crate::pdata::space_map::*;
use crate::pdata::unpack::*;
use crate::write_batcher::*;
//------------------------------------------
// The subtrees will often consist of a single under populated leaf node. Given this
// we're going to merge by:
// i) Building an ordered list of all leaf nodes across all subtrees.
// ii) Merge leaf nodes where they can be packed more efficiently (non destructively to original subtrees).
// iii) Build higher levels from scratch. There are very few of these internal nodes compared to leaves anyway.
struct NodeSummary {
block: u64,
nr_entries: usize,
key_low: u64,
key_high: u64, // inclusive
}
struct LVInner {
last_key: Option<u64>,
leaves: Vec<NodeSummary>,
}
struct LeafVisitor {
inner: Mutex<LVInner>,
}
impl LeafVisitor {
fn new() -> LeafVisitor {
LeafVisitor {
inner: Mutex::new(LVInner {
last_key: None,
leaves: Vec::new(),
}),
}
}
}
impl<V: Unpack> NodeVisitor<V> for LeafVisitor {
fn visit(
&self,
path: &Vec<u64>,
kr: &KeyRange,
header: &NodeHeader,
keys: &[u64],
values: &[V],
) -> btree::Result<()> {
// ignore empty nodes
if keys.len() == 0 {
return Ok(());
}
let mut inner = self.inner.lock().unwrap();
// Check keys are ordered.
if inner.leaves.len() > 0 {
let last_key = inner.leaves.last().unwrap().key_high;
if keys[0] <= last_key {
return Err(BTreeError::NodeError(
"unable to merge btrees: sub trees out of order".to_string(),
));
}
}
let l = NodeSummary {
block: *path.last().unwrap(),
nr_entries: keys.len(),
key_low: keys[0],
key_high: *keys.last().unwrap(),
};
inner.leaves.push(l);
Ok(())
}
fn visit_again(&self, path: &Vec<u64>, b: u64) -> btree::Result<()> {
Ok(())
}
fn end_walk(&self) -> btree::Result<()> {
Ok(())
}
}
pub type AEngine = Arc<dyn IoEngine + Send + Sync>;
fn collect_leaves<V: Unpack>(engine: AEngine, roots: &[u64]) -> Result<Vec<NodeSummary>> {
let lv = LeafVisitor::new();
let walker = BTreeWalker::new(engine, false);
let mut path = Vec::new();
for root in roots {
walker.walk::<LeafVisitor, V>(&mut path, &lv, *root)?;
}
Ok(lv.inner.into_inner().unwrap().leaves)
}
//------------------------------------------
fn optimise_leaves<V: Unpack + Pack>(
batcher: &mut WriteBatcher,
lvs: Vec<NodeSummary>,
) -> Result<Vec<NodeSummary>> {
// FIXME: implement
Ok(lvs)
}
//------------------------------------------
pub fn merge<V: Unpack + Pack>(
engine: AEngine,
sm: Arc<Mutex<dyn SpaceMap>>,
roots: &[u64],
) -> Result<u64> {
let lvs = collect_leaves::<V>(engine.clone(), roots)?;
let mut batcher = WriteBatcher::new(engine, sm, 256);
let lvs = optimise_leaves::<V>(&mut batcher, lvs)?;
todo!();
}
//------------------------------------------

573
src/pdata/btree_walker.rs Normal file
View File

@ -0,0 +1,573 @@
use std::collections::BTreeMap;
use std::sync::{Arc, Mutex};
use threadpool::ThreadPool;
use crate::checksum;
use crate::io_engine::*;
use crate::pdata::btree::*;
use crate::pdata::space_map::*;
use crate::pdata::unpack::*;
//------------------------------------------
pub trait NodeVisitor<V: Unpack> {
// &self is deliberately non mut to allow the walker to use multiple threads.
fn visit(
&self,
path: &Vec<u64>,
kr: &KeyRange,
header: &NodeHeader,
keys: &[u64],
values: &[V],
) -> Result<()>;
// Nodes may be shared and thus visited multiple times. The walker avoids
// doing repeated IO, but it does call this method to keep the visitor up to
// date.
fn visit_again(&self, path: &Vec<u64>, b: u64) -> Result<()>;
fn end_walk(&self) -> Result<()>;
}
#[derive(Clone)]
pub struct BTreeWalker {
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
fails: Arc<Mutex<BTreeMap<u64, BTreeError>>>,
ignore_non_fatal: bool,
}
impl BTreeWalker {
pub fn new(engine: Arc<dyn IoEngine + Send + Sync>, ignore_non_fatal: bool) -> BTreeWalker {
let nr_blocks = engine.get_nr_blocks() as usize;
let r: BTreeWalker = BTreeWalker {
engine,
sm: Arc::new(Mutex::new(RestrictedSpaceMap::new(nr_blocks as u64))),
fails: Arc::new(Mutex::new(BTreeMap::new())),
ignore_non_fatal,
};
r
}
pub fn new_with_sm(
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
ignore_non_fatal: bool,
) -> Result<BTreeWalker> {
{
let sm = sm.lock().unwrap();
assert_eq!(sm.get_nr_blocks().unwrap(), engine.get_nr_blocks());
}
Ok(BTreeWalker {
engine,
sm,
fails: Arc::new(Mutex::new(BTreeMap::new())),
ignore_non_fatal,
})
}
fn failed(&self, b: u64) -> Option<BTreeError> {
let fails = self.fails.lock().unwrap();
match fails.get(&b) {
None => None,
Some(e) => Some(e.clone()),
}
}
fn set_fail(&self, b: u64, err: BTreeError) {
// FIXME: should we monitor the size of fails, and abort if too many errors?
let mut fails = self.fails.lock().unwrap();
fails.insert(b, err);
}
// Atomically increments the ref count, and returns the _old_ count.
fn sm_inc(&self, b: u64) -> u32 {
let mut sm = self.sm.lock().unwrap();
let count = sm.get(b).unwrap();
sm.inc(b, 1).unwrap();
count
}
fn build_aggregate(&self, b: u64, errs: Vec<BTreeError>) -> Result<()> {
match errs.len() {
0 => Ok(()),
1 => {
let e = errs[0].clone();
self.set_fail(b, e.clone());
Err(e)
}
_ => {
let e = aggregate_error(errs);
self.set_fail(b, e.clone());
Err(e)
}
}
}
fn walk_nodes<NV, V>(
&self,
path: &mut Vec<u64>,
visitor: &NV,
krs: &[KeyRange],
bs: &[u64],
) -> Vec<BTreeError>
where
NV: NodeVisitor<V>,
V: Unpack,
{
assert_eq!(krs.len(), bs.len());
let mut errs: Vec<BTreeError> = Vec::new();
let mut blocks = Vec::with_capacity(bs.len());
let mut filtered_krs = Vec::with_capacity(krs.len());
for i in 0..bs.len() {
if self.sm_inc(bs[i]) == 0 {
// Node not yet seen
blocks.push(bs[i]);
filtered_krs.push(krs[i].clone());
} else {
// This node has already been checked ...
match self.failed(bs[i]) {
None => {
// ... it was clean.
if let Err(e) = visitor.visit_again(path, bs[i]) {
// ... but the visitor isn't happy
errs.push(e.clone());
}
}
Some(e) => {
// ... there was an error
errs.push(e.clone());
}
}
}
}
match self.engine.read_many(&blocks[0..]) {
Err(_) => {
// IO completely failed, error every block
for (i, b) in blocks.iter().enumerate() {
let e = io_err(path).keys_context(&filtered_krs[i]);
errs.push(e.clone());
self.set_fail(*b, e);
}
}
Ok(rblocks) => {
let mut i = 0;
for rb in rblocks {
match rb {
Err(_) => {
let e = io_err(path).keys_context(&filtered_krs[i]);
errs.push(e.clone());
self.set_fail(blocks[i], e);
}
Ok(b) => match self.walk_node(path, visitor, &filtered_krs[i], &b, false) {
Err(e) => {
errs.push(e);
}
Ok(()) => {}
},
}
i += 1;
}
}
}
errs
}
fn walk_node_<NV, V>(
&self,
path: &mut Vec<u64>,
visitor: &NV,
kr: &KeyRange,
b: &Block,
is_root: bool,
) -> Result<()>
where
NV: NodeVisitor<V>,
V: Unpack,
{
use Node::*;
let bt = checksum::metadata_block_type(b.get_data());
if bt != checksum::BT::NODE {
return Err(node_err_s(
path,
format!("checksum failed for node {}, {:?}", b.loc, bt),
)
.keys_context(kr));
}
let node = unpack_node::<V>(path, &b.get_data(), self.ignore_non_fatal, is_root)?;
match node {
Internal { keys, values, .. } => {
let krs = split_key_ranges(path, &kr, &keys)?;
let errs = self.walk_nodes(path, visitor, &krs, &values);
return self.build_aggregate(b.loc, errs);
}
Leaf {
header,
keys,
values,
} => {
if let Err(e) = visitor.visit(path, &kr, &header, &keys, &values) {
let e = BTreeError::Path(path.clone(), Box::new(e.clone()));
self.set_fail(b.loc, e.clone());
return Err(e);
}
}
}
Ok(())
}
fn walk_node<NV, V>(
&self,
path: &mut Vec<u64>,
visitor: &NV,
kr: &KeyRange,
b: &Block,
is_root: bool,
) -> Result<()>
where
NV: NodeVisitor<V>,
V: Unpack,
{
path.push(b.loc);
let r = self.walk_node_(path, visitor, kr, b, is_root);
path.pop();
visitor.end_walk()?;
r
}
pub fn walk<NV, V>(&self, path: &mut Vec<u64>, visitor: &NV, root: u64) -> Result<()>
where
NV: NodeVisitor<V>,
V: Unpack,
{
if self.sm_inc(root) > 0 {
if let Some(e) = self.failed(root) {
Err(e.clone())
} else {
visitor.visit_again(path, root)
}
} else {
let root = self.engine.read(root).map_err(|_| io_err(path))?;
let kr = KeyRange {
start: None,
end: None,
};
self.walk_node(path, visitor, &kr, &root, true)
}
}
}
//--------------------------------
fn walk_node_threaded_<NV, V>(
w: Arc<BTreeWalker>,
path: &mut Vec<u64>,
pool: &ThreadPool,
visitor: Arc<NV>,
kr: &KeyRange,
b: &Block,
is_root: bool,
) -> Result<()>
where
NV: NodeVisitor<V> + Send + Sync + 'static,
V: Unpack,
{
use Node::*;
let bt = checksum::metadata_block_type(b.get_data());
if bt != checksum::BT::NODE {
return Err(node_err_s(
path,
format!("checksum failed for node {}, {:?}", b.loc, bt),
)
.keys_context(kr));
}
let node = unpack_node::<V>(path, &b.get_data(), w.ignore_non_fatal, is_root)?;
match node {
Internal { keys, values, .. } => {
let krs = split_key_ranges(path, &kr, &keys)?;
let errs = walk_nodes_threaded(w.clone(), path, pool, visitor, &krs, &values);
return w.build_aggregate(b.loc, errs);
}
Leaf {
header,
keys,
values,
} => {
visitor.visit(path, kr, &header, &keys, &values)?;
}
}
Ok(())
}
fn walk_node_threaded<NV, V>(
w: Arc<BTreeWalker>,
path: &mut Vec<u64>,
pool: &ThreadPool,
visitor: Arc<NV>,
kr: &KeyRange,
b: &Block,
is_root: bool,
) -> Result<()>
where
NV: NodeVisitor<V> + Send + Sync + 'static,
V: Unpack,
{
path.push(b.loc);
let r = walk_node_threaded_(w, path, pool, visitor.clone(), kr, b, is_root);
path.pop();
visitor.end_walk()?;
r
}
fn walk_nodes_threaded<NV, V>(
w: Arc<BTreeWalker>,
path: &mut Vec<u64>,
pool: &ThreadPool,
visitor: Arc<NV>,
krs: &[KeyRange],
bs: &[u64],
) -> Vec<BTreeError>
where
NV: NodeVisitor<V> + Send + Sync + 'static,
V: Unpack,
{
assert_eq!(krs.len(), bs.len());
let mut errs: Vec<BTreeError> = Vec::new();
let mut blocks = Vec::with_capacity(bs.len());
let mut filtered_krs = Vec::with_capacity(krs.len());
for i in 0..bs.len() {
if w.sm_inc(bs[i]) == 0 {
// Node not yet seen
blocks.push(bs[i]);
filtered_krs.push(krs[i].clone());
} else {
// This node has already been checked ...
match w.failed(bs[i]) {
None => {
// ... it was clean.
if let Err(e) = visitor.visit_again(path, bs[i]) {
// ... but the visitor isn't happy
errs.push(e.clone());
}
}
Some(e) => {
// ... there was an error
errs.push(e.clone());
}
}
}
}
match w.engine.read_many(&blocks[0..]) {
Err(_) => {
// IO completely failed error every block
for (i, b) in blocks.iter().enumerate() {
let e = io_err(path).keys_context(&filtered_krs[i]);
errs.push(e.clone());
w.set_fail(*b, e);
}
}
Ok(rblocks) => {
let mut i = 0;
let errs = Arc::new(Mutex::new(Vec::new()));
for rb in rblocks {
match rb {
Err(_) => {
let e = io_err(path).keys_context(&filtered_krs[i]);
let mut errs = errs.lock().unwrap();
errs.push(e.clone());
w.set_fail(blocks[i], e);
}
Ok(b) => {
let w = w.clone();
let visitor = visitor.clone();
let kr = filtered_krs[i].clone();
let errs = errs.clone();
let mut path = path.clone();
pool.execute(move || {
match w.walk_node(&mut path, visitor.as_ref(), &kr, &b, false) {
Err(e) => {
let mut errs = errs.lock().unwrap();
errs.push(e);
}
Ok(()) => {}
}
});
}
}
i += 1;
}
pool.join();
}
}
errs
}
pub fn walk_threaded<NV, V>(
path: &mut Vec<u64>,
w: Arc<BTreeWalker>,
pool: &ThreadPool,
visitor: Arc<NV>,
root: u64,
) -> Result<()>
where
NV: NodeVisitor<V> + Send + Sync + 'static,
V: Unpack,
{
if w.sm_inc(root) > 0 {
if let Some(e) = w.failed(root) {
Err(e.clone())
} else {
visitor.visit_again(path, root)
}
} else {
let root = w.engine.read(root).map_err(|_| io_err(path))?;
let kr = KeyRange {
start: None,
end: None,
};
walk_node_threaded(w, path, pool, visitor, &kr, &root, true)
}
}
//------------------------------------------
struct ValueCollector<V> {
values: Mutex<BTreeMap<u64, V>>,
}
impl<V> ValueCollector<V> {
fn new() -> ValueCollector<V> {
ValueCollector {
values: Mutex::new(BTreeMap::new()),
}
}
}
// FIXME: should we be using Copy rather than clone? (Yes)
impl<V: Unpack + Copy> NodeVisitor<V> for ValueCollector<V> {
fn visit(
&self,
_path: &Vec<u64>,
_kr: &KeyRange,
_h: &NodeHeader,
keys: &[u64],
values: &[V],
) -> Result<()> {
let mut vals = self.values.lock().unwrap();
for n in 0..keys.len() {
vals.insert(keys[n], values[n].clone());
}
Ok(())
}
fn visit_again(&self, _path: &Vec<u64>, _b: u64) -> Result<()> {
Ok(())
}
fn end_walk(&self) -> Result<()> {
Ok(())
}
}
pub fn btree_to_map<V: Unpack + Copy>(
path: &mut Vec<u64>,
engine: Arc<dyn IoEngine + Send + Sync>,
ignore_non_fatal: bool,
root: u64,
) -> Result<BTreeMap<u64, V>> {
let walker = BTreeWalker::new(engine, ignore_non_fatal);
let visitor = ValueCollector::<V>::new();
walker.walk(path, &visitor, root)?;
Ok(visitor.values.into_inner().unwrap())
}
pub fn btree_to_map_with_sm<V: Unpack + Copy>(
path: &mut Vec<u64>,
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
ignore_non_fatal: bool,
root: u64,
) -> Result<BTreeMap<u64, V>> {
let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?;
let visitor = ValueCollector::<V>::new();
walker.walk(path, &visitor, root)?;
Ok(visitor.values.into_inner().unwrap())
}
//------------------------------------------
struct ValuePathCollector<V> {
values: Mutex<BTreeMap<u64, (Vec<u64>, V)>>,
}
impl<V> ValuePathCollector<V> {
fn new() -> ValuePathCollector<V> {
ValuePathCollector {
values: Mutex::new(BTreeMap::new()),
}
}
}
impl<V: Unpack + Clone> NodeVisitor<V> for ValuePathCollector<V> {
fn visit(
&self,
path: &Vec<u64>,
_kr: &KeyRange,
_h: &NodeHeader,
keys: &[u64],
values: &[V],
) -> Result<()> {
let mut vals = self.values.lock().unwrap();
for n in 0..keys.len() {
vals.insert(keys[n], (path.clone(), values[n].clone()));
}
Ok(())
}
fn visit_again(&self, _path: &Vec<u64>, _b: u64) -> Result<()> {
Ok(())
}
fn end_walk(&self) -> Result<()> {
Ok(())
}
}
pub fn btree_to_map_with_path<V: Unpack + Copy>(
path: &mut Vec<u64>,
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap + Send + Sync>>,
ignore_non_fatal: bool,
root: u64,
) -> Result<BTreeMap<u64, (Vec<u64>, V)>> {
let walker = BTreeWalker::new_with_sm(engine, sm, ignore_non_fatal)?;
let visitor = ValuePathCollector::<V>::new();
walker.walk(path, &visitor, root)?;
Ok(visitor.values.into_inner().unwrap())
}
//------------------------------------------

View File

@ -1,5 +1,8 @@
pub mod btree;
pub mod btree_builder;
pub mod btree_merge;
pub mod btree_leaf_walker;
pub mod btree_walker;
pub mod space_map;
pub mod unpack;

View File

@ -3,6 +3,7 @@ use byteorder::{LittleEndian, WriteBytesExt};
use fixedbitset::FixedBitSet;
use nom::{multi::count, number::complete::*, IResult};
use std::sync::{Arc, Mutex};
use std::boxed::Box;
use crate::io_engine::*;
use crate::pdata::unpack::{Pack, Unpack};
@ -328,6 +329,16 @@ pub fn core_sm(nr_entries: u64, max_count: u32) -> Arc<Mutex<dyn SpaceMap + Send
}
}
pub fn core_sm_without_mutex(nr_entries: u64, max_count: u32) -> Box<dyn SpaceMap> {
if max_count <= u8::MAX as u32 {
Box::new(CoreSpaceMap::<u8>::new(nr_entries))
} else if max_count <= u16::MAX as u32 {
Box::new(CoreSpaceMap::<u16>::new(nr_entries))
} else {
Box::new(CoreSpaceMap::<u32>::new(nr_entries))
}
}
//------------------------------------------
// This in core space map can only count to one, useful when walking

View File

@ -153,17 +153,17 @@ impl SimpleInner {
impl ReportInner for SimpleInner {
fn set_title(&mut self, txt: &str) {
println!("{}", txt);
eprintln!("{}", txt);
}
fn set_sub_title(&mut self, txt: &str) {
println!("{}", txt);
eprintln!("{}", txt);
}
fn progress(&mut self, percent: u8) {
let elapsed = self.last_progress.elapsed().unwrap();
if elapsed > std::time::Duration::from_secs(5) {
println!("Progress: {}%", percent);
eprintln!("Progress: {}%", percent);
self.last_progress = std::time::SystemTime::now();
}
}

View File

@ -9,6 +9,7 @@ use threadpool::ThreadPool;
use crate::checksum;
use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine};
use crate::pdata::btree::{self, *};
use crate::pdata::btree_walker::*;
use crate::pdata::space_map::*;
use crate::pdata::unpack::*;
use crate::report::*;

View File

@ -1,15 +1,21 @@
use anyhow::Result;
use anyhow::{anyhow, Result};
use std::collections::{BTreeMap, BTreeSet};
use std::io::Write;
use std::ops::DerefMut;
use std::path::Path;
use std::sync::{Arc, Mutex};
use crate::io_engine::{AsyncIoEngine, IoEngine, SyncIoEngine};
use crate::checksum;
use crate::io_engine::{AsyncIoEngine, Block, IoEngine, SyncIoEngine};
use crate::pdata::btree::{self, *};
use crate::pdata::btree_leaf_walker::*;
use crate::pdata::btree_walker::*;
use crate::pdata::space_map::*;
use crate::pdata::unpack::*;
use crate::report::*;
use crate::thin::block_time::*;
use crate::thin::device_detail::*;
use crate::thin::runs::*;
use crate::thin::superblock::*;
use crate::thin::xml::{self, MetadataVisitor};
@ -166,51 +172,124 @@ fn mk_context(opts: &ThinDumpOptions) -> Result<Context> {
//------------------------------------------
struct NoopVisitor {}
type DefId = u64;
type ThinId = u32;
impl<V: Unpack> btree::NodeVisitor<V> for NoopVisitor {
fn visit(
&self,
_path: &Vec<u64>,
_kr: &btree::KeyRange,
_h: &btree::NodeHeader,
_k: &[u64],
_values: &[V],
) -> btree::Result<()> {
#[derive(Clone)]
enum Entry {
Leaf(u64),
Ref(DefId),
}
#[derive(Clone)]
struct Mapping {
kr: KeyRange,
entries: Vec<Entry>,
}
#[derive(Clone)]
struct Device {
thin_id: ThinId,
detail: DeviceDetail,
map: Mapping,
}
#[derive(Clone)]
struct Def {
def_id: DefId,
map: Mapping,
}
#[derive(Clone)]
struct Metadata {
defs: Vec<Def>,
devs: Vec<Device>,
}
//------------------------------------------
struct CollectLeaves {
leaves: Vec<Entry>,
}
impl CollectLeaves {
fn new() -> CollectLeaves {
CollectLeaves { leaves: Vec::new() }
}
}
impl LeafVisitor<BlockTime> for CollectLeaves {
fn visit(&mut self, _kr: &KeyRange, b: u64) -> btree::Result<()> {
self.leaves.push(Entry::Leaf(b));
Ok(())
}
fn visit_again(&self, _path: &Vec<u64>, _b: u64) -> btree::Result<()> {
fn visit_again(&mut self, b: u64) -> btree::Result<()> {
self.leaves.push(Entry::Ref(b));
Ok(())
}
fn end_walk(&self) -> btree::Result<()> {
fn end_walk(&mut self) -> btree::Result<()> {
Ok(())
}
}
fn find_shared_nodes(
fn collect_leaves(
ctx: &Context,
nr_metadata_blocks: u64,
roots: &BTreeMap<u64, u64>,
) -> Result<(BTreeSet<u64>, Arc<Mutex<dyn SpaceMap + Send + Sync>>)> {
// By default the walker uses a restricted space map that can only count to 1. So
// we explicitly create a full sm.
let sm = core_sm(nr_metadata_blocks, roots.len() as u32);
let w = BTreeWalker::new_with_sm(ctx.engine.clone(), sm.clone(), false)?;
shared: &mut BTreeSet<u64>,
mut sm: Box<dyn SpaceMap>,
) -> Result<BTreeMap<u64, Vec<Entry>>> {
let mut map: BTreeMap<u64, Vec<Entry>> = BTreeMap::new();
let mut path = Vec::new();
path.push(0);
ctx.report.set_title(&format!(
"Collecting leaves for {} shared nodes",
shared.len()
));
for (thin_id, root) in roots {
ctx.report.info(&format!("scanning {}", thin_id));
let v = NoopVisitor {};
w.walk::<NoopVisitor, BlockTime>(&mut path, &v, *root)?;
// FIXME: we don't want any leaves in shared.
for r in shared.iter() {
let old_count = sm.get(*r).expect("couldn't get count from space map.");
sm.set(*r, 0).expect("couldn't set count in space map.");
let mut w = LeafWalker::new(ctx.engine.clone(), sm.deref_mut(), false);
let mut v = CollectLeaves::new();
let mut path = Vec::new();
path.push(0);
// ctx.report.set_title(&format!("collecting {}", *r));
w.walk::<CollectLeaves, BlockTime>(&mut path, &mut v, *r)?;
sm.set(*r, old_count)
.expect("couldn't set count in space map.");
map.insert(*r, v.leaves);
}
Ok(map)
}
//------------------------------------------
fn find_shared_nodes(
ctx: &Context,
roots: &BTreeMap<u64, (Vec<u64>, u64)>,
) -> Result<(BTreeSet<u64>, Box<dyn SpaceMap>)> {
let nr_metadata_blocks = ctx.engine.get_nr_blocks();
let mut sm = core_sm_without_mutex(nr_metadata_blocks, roots.len() as u32);
let mut v = NoopLeafVisitor {};
let mut w = LeafWalker::new(ctx.engine.clone(), sm.deref_mut(), false);
for (thin_id, (path, root)) in roots {
let mut path = path.clone();
ctx.report.info(&format!("scanning {}", thin_id));
w.walk::<NoopLeafVisitor, BlockTime>(&mut path, &mut v, *root)?;
}
// We have to get the leaves so w is consumed and the &mut on sm
// is dropped.
let leaves = w.get_leaves();
let mut shared = BTreeSet::new();
{
let sm = sm.lock().unwrap();
for i in 0..sm.get_nr_blocks().unwrap() {
if sm.get(i).expect("couldn't get count from space map.") > 1 {
shared.insert(i);
@ -218,69 +297,280 @@ fn find_shared_nodes(
}
}
return Ok((shared, sm));
// we're not interested in leaves (roots will get re-added later).
{
for i in 0..leaves.len() {
if leaves.contains(i) {
shared.remove(&(i as u64));
}
}
}
Ok((shared, sm))
}
//------------------------------------------
fn dump_node(
ctx: &Context,
out: &mut dyn xml::MetadataVisitor,
root: u64,
sm: &Arc<Mutex<dyn SpaceMap + Send + Sync>>,
force: bool, // sets the ref count for the root to zero to force output.
) -> Result<()> {
let w = BTreeWalker::new_with_sm(ctx.engine.clone(), sm.clone(), false)?;
let mut path = Vec::new();
path.push(0);
let v = MappingVisitor::new(out);
// Temporarily set the ref count for the root to zero.
let mut old_count = 0;
if force {
let mut sm = sm.lock().unwrap();
old_count = sm.get(root).unwrap();
sm.set(root, 0)?;
}
w.walk::<MappingVisitor, BlockTime>(&mut path, &v, root)?;
// Reset the ref count for root.
if force {
let mut sm = sm.lock().unwrap();
sm.set(root, old_count)?;
}
Ok(())
}
//------------------------------------------
pub fn dump(opts: ThinDumpOptions) -> Result<()> {
let ctx = mk_context(&opts)?;
fn build_metadata(ctx: &Context, sb: &Superblock) -> Result<Metadata> {
let report = &ctx.report;
let engine = &ctx.engine;
// superblock
report.set_title("Reading superblock");
let sb = read_superblock(engine.as_ref(), SUPERBLOCK_LOCATION)?;
let metadata_root = unpack::<SMRoot>(&sb.metadata_sm_root[0..])?;
let data_root = unpack::<SMRoot>(&sb.data_sm_root[0..])?;
//let metadata_root = unpack::<SMRoot>(&sb.metadata_sm_root[0..])?;
//let data_root = unpack::<SMRoot>(&sb.data_sm_root[0..])?;
let mut path = Vec::new();
path.push(0);
report.set_title("Reading device details");
let devs = btree_to_map::<DeviceDetail>(&mut path, engine.clone(), true, sb.details_root)?;
let details = btree_to_map::<DeviceDetail>(&mut path, engine.clone(), true, sb.details_root)?;
report.set_title("Reading mappings roots");
let roots = btree_to_map::<u64>(&mut path, engine.clone(), true, sb.mapping_root)?;
let roots;
{
let sm = Arc::new(Mutex::new(RestrictedSpaceMap::new(engine.get_nr_blocks())));
roots =
btree_to_map_with_path::<u64>(&mut path, engine.clone(), sm, true, sb.mapping_root)?;
}
report.set_title("Finding shared mappings");
let (shared, sm) = find_shared_nodes(&ctx, metadata_root.nr_blocks, &roots)?;
report.info(&format!("{} shared nodes found", shared.len()));
let (mut shared, sm) = find_shared_nodes(ctx, &roots)?;
// Add in the roots, because they may not be shared.
for (_thin_id, (_path, root)) in &roots {
shared.insert(*root);
}
let entry_map = collect_leaves(&ctx, &mut shared, sm)?;
let mut defs = Vec::new();
let mut devs = Vec::new();
let mut seen = BTreeSet::new();
for (thin_id, (_path, root)) in roots {
let id = thin_id as u64;
let detail = details.get(&id).expect("couldn't find device details");
seen.insert(root);
let es = entry_map.get(&root).unwrap();
let kr = KeyRange::new(); // FIXME: finish
devs.push(Device {
thin_id: thin_id as u32,
detail: detail.clone(),
map: Mapping {
kr,
entries: es.to_vec(),
},
});
}
for b in shared {
if !seen.contains(&b) {
let es = entry_map.get(&b).unwrap();
let kr = KeyRange::new(); // FIXME: finish
defs.push(Def {
def_id: b,
map: Mapping {
kr,
entries: es.to_vec(),
},
});
}
}
Ok(Metadata { defs, devs })
}
//------------------------------------------
fn gather_entries(g: &mut Gatherer, es: &Vec<Entry>) {
g.new_seq();
for e in es {
match e {
Entry::Leaf(b) => {
g.next(*b);
}
Entry::Ref(_id) => {
g.new_seq();
}
}
}
}
fn entries_to_runs(runs: &BTreeMap<u64, Vec<u64>>, es: &Vec<Entry>) -> Vec<Entry> {
use Entry::*;
let mut result = Vec::new();
let mut entry_index = 0;
while entry_index < es.len() {
match es[entry_index] {
Ref(id) => {
result.push(Ref(id));
entry_index += 1;
}
Leaf(b) => {
if let Some(run) = runs.get(&b) {
result.push(Ref(b));
entry_index += run.len();
} else {
result.push(Leaf(b));
entry_index += 1;
}
}
}
}
result
}
// FIXME: do we really need to track kr?
// FIXME: I think this may be better done as part of restore.
fn optimise_metadata(md: Metadata) -> Result<Metadata> {
use Entry::*;
let mut g = Gatherer::new();
for d in &md.defs {
gather_entries(&mut g, &d.map.entries);
}
for d in &md.devs {
gather_entries(&mut g, &d.map.entries);
}
let mut defs = Vec::new();
let mut devs = Vec::new();
let mut runs = BTreeMap::new();
for run in g.gather() {
runs.insert(run[0], run);
}
eprintln!("{} runs", runs.len());
// The runs become additional defs that just contain leaves.
for (head, run) in runs.iter() {
let kr = KeyRange::new();
let entries: Vec<Entry> = run.iter().map(|b| Leaf(*b)).collect();
defs.push(Def {
def_id: *head,
map: Mapping { kr, entries },
});
}
// Expand old defs to use the new atomic runs
for d in &md.defs {
let kr = KeyRange::new();
let entries = entries_to_runs(&runs, &d.map.entries);
defs.push(Def {
def_id: d.def_id,
map: Mapping { kr, entries },
});
}
// Expand old devs to use the new atomic runs
for d in &md.devs {
let kr = KeyRange::new();
let entries = entries_to_runs(&runs, &d.map.entries);
devs.push(Device {
thin_id: d.thin_id,
detail: d.detail,
map: Mapping { kr, entries },
});
}
Ok(Metadata { defs, devs })
}
//------------------------------------------
fn emit_leaf(out: &mut dyn xml::MetadataVisitor, b: &Block) -> Result<()> {
use Node::*;
let v = MappingVisitor::new(out);
let path = Vec::new();
let kr = KeyRange::new();
let bt = checksum::metadata_block_type(b.get_data());
if bt != checksum::BT::NODE {
return Err(anyhow!(format!(
"checksum failed for node {}, {:?}",
b.loc, bt
)));
}
let node = unpack_node::<BlockTime>(&path, &b.get_data(), true, true)?;
match node {
Internal { .. } => {
return Err(anyhow!("not a leaf"));
}
Leaf {
header,
keys,
values,
} => {
if let Err(_e) = v.visit(&path, &kr, &header, &keys, &values) {
return Err(anyhow!("couldn't emit leaf node"));
}
}
}
Ok(())
}
fn read_for<T>(engine: Arc<dyn IoEngine>, blocks: &[u64], mut t: T) -> Result<()>
where
T: FnMut(Block) -> Result<()>,
{
for cs in blocks.chunks(engine.get_batch_size()) {
for b in engine
.read_many(cs)
.map_err(|_e| anyhow!("read_many failed"))?
{
t(b.map_err(|_e| anyhow!("read of individual block failed"))?)?;
}
}
Ok(())
}
fn emit_leaves(ctx: &Context, out: &mut dyn xml::MetadataVisitor, ls: &[u64]) -> Result<()> {
let proc = |b| {
emit_leaf(out, &b)?;
Ok(())
};
read_for(ctx.engine.clone(), ls, proc)
}
fn emit_entries<W: Write>(
ctx: &Context,
out: &mut xml::XmlWriter<W>,
entries: &Vec<Entry>,
) -> Result<()> {
let mut leaves = Vec::new();
for e in entries {
match e {
Entry::Leaf(b) => {
leaves.push(*b);
}
Entry::Ref(id) => {
if leaves.len() > 0 {
emit_leaves(&ctx, out, &leaves[0..])?;
leaves.clear();
}
let str = format!("{}", id);
out.ref_shared(&str)?;
}
}
}
if leaves.len() > 0 {
emit_leaves(&ctx, out, &leaves[0..])?;
}
Ok(())
}
fn dump_metadata(ctx: &Context, sb: &Superblock, md: &Metadata) -> Result<()> {
let data_root = unpack::<SMRoot>(&sb.data_sm_root[0..])?;
let mut out = xml::XmlWriter::new(std::io::stdout());
let xml_sb = xml::Superblock {
uuid: "".to_string(),
@ -294,25 +584,24 @@ pub fn dump(opts: ThinDumpOptions) -> Result<()> {
};
out.superblock_b(&xml_sb)?;
report.set_title("Dumping shared regions");
for b in shared {
out.def_shared_b(&format!("{}", b))?;
dump_node(&ctx, &mut out, b, &sm, true)?;
ctx.report.set_title("Dumping shared regions");
for d in &md.defs {
out.def_shared_b(&format!("{}", d.def_id))?;
emit_entries(ctx, &mut out, &d.map.entries)?;
out.def_shared_e()?;
}
report.set_title("Dumping mappings");
for (thin_id, detail) in devs {
let d = xml::Device {
dev_id: thin_id as u32,
mapped_blocks: detail.mapped_blocks,
transaction: detail.transaction_id,
creation_time: detail.creation_time as u64,
snap_time: detail.snapshotted_time as u64,
ctx.report.set_title("Dumping devices");
for dev in &md.devs {
let device = xml::Device {
dev_id: dev.thin_id,
mapped_blocks: dev.detail.mapped_blocks,
transaction: dev.detail.transaction_id,
creation_time: dev.detail.creation_time as u64,
snap_time: dev.detail.snapshotted_time as u64,
};
out.device_b(&d)?;
let root = roots.get(&thin_id).unwrap();
dump_node(&ctx, &mut out, *root, &sm, false)?;
out.device_b(&device)?;
emit_entries(ctx, &mut out, &dev.map.entries)?;
out.device_e()?;
}
out.superblock_e()?;
@ -321,3 +610,16 @@ pub fn dump(opts: ThinDumpOptions) -> Result<()> {
}
//------------------------------------------
pub fn dump(opts: ThinDumpOptions) -> Result<()> {
let ctx = mk_context(&opts)?;
let sb = read_superblock(ctx.engine.as_ref(), SUPERBLOCK_LOCATION)?;
let md = build_metadata(&ctx, &sb)?;
ctx.report
.set_title("Optimising metadata to improve leaf packing");
let md = optimise_metadata(md)?;
dump_metadata(&ctx, &sb, &md)
}
//------------------------------------------

View File

@ -1,7 +1,8 @@
pub mod block_time;
pub mod device_detail;
pub mod superblock;
pub mod check;
pub mod device_detail;
pub mod dump;
pub mod restore;
pub mod runs;
pub mod superblock;
pub mod xml;

194
src/thin/runs.rs Normal file
View File

@ -0,0 +1,194 @@
use anyhow::{anyhow, Result};
use std::collections::{BTreeMap, BTreeSet};
use std::mem;
//------------------------------------------
#[derive(Clone, Debug)]
struct Entry {
neighbours: BTreeSet<u64>,
}
impl Entry {
fn first_neighbour(&self) -> Option<u64> {
self.neighbours.iter().cloned().next()
}
}
pub struct Gatherer {
prev: Option<u64>,
heads: BTreeSet<u64>,
tails: BTreeSet<u64>,
entries: BTreeMap<u64, Entry>,
}
impl Gatherer {
pub fn new() -> Gatherer {
Gatherer {
prev: None,
heads: BTreeSet::new(),
tails: BTreeSet::new(),
entries: BTreeMap::new(),
}
}
fn is_head(&self, b: u64) -> bool {
self.heads.contains(&b)
}
fn mark_head(&mut self, b: u64) {
self.heads.insert(b);
}
fn is_tail(&self, b: u64) -> bool {
self.tails.contains(&b)
}
fn mark_tail(&mut self, b: u64) {
self.tails.insert(b);
}
pub fn new_seq(&mut self) {
if let Some(b) = self.prev {
self.mark_tail(b);
}
self.prev = None;
}
pub fn next(&mut self, b: u64) {
if let Some(prev) = self.prev {
let e = self.entries.get_mut(&prev).unwrap();
e.neighbours.insert(b);
} else {
self.mark_head(b);
}
if self.entries.get(&b).is_none() {
let e = Entry {
neighbours: BTreeSet::new(),
};
self.entries.insert(b, e);
}
self.prev = Some(b);
}
fn extract_seq(&self, mut b: u64) -> Vec<u64> {
let mut r = Vec::new();
// FIXME: remove
assert!(self.is_head(b));
loop {
r.push(b);
if self.is_tail(b) {
return r;
}
let e = self.entries.get(&b).unwrap();
b = e.first_neighbour().unwrap();
}
}
fn complete_heads_and_tails(&mut self) {
let mut tails = BTreeSet::new();
// add extra tails
for (b, e) in self.entries.iter() {
if e.neighbours.len() != 1 {
tails.insert(*b);
}
if let Some(n) = e.first_neighbour() {
if self.is_head(n) {
tails.insert(*b);
}
}
}
for t in tails {
self.mark_tail(t);
}
// Now we need to mark entries that follow a tail as heads.
let mut heads = mem::take(&mut self.heads);
for t in &self.tails {
if let Some(e) = self.entries.get(&t) {
for n in &e.neighbours {
heads.insert(*n);
}
}
}
mem::swap(&mut heads, &mut self.heads);
}
// Returns atomic subsequences.
pub fn gather(&mut self) -> Vec<Vec<u64>> {
// close the last sequence.
self.new_seq();
self.complete_heads_and_tails();
// FIXME: there must be a 'map'
let mut seqs = Vec::new();
for b in &self.heads {
seqs.push(self.extract_seq(*b));
}
seqs
}
}
//------------------------------------------
#[cfg(test)]
mod tests {
use super::*;
#[test]
fn gather() {
struct Test(Vec<Vec<u64>>, Vec<Vec<u64>>);
let tests = vec![
Test(vec![], vec![]),
Test(vec![vec![1]], vec![vec![1]]),
Test(vec![vec![1, 2, 3]], vec![vec![1, 2, 3]]),
Test(vec![vec![1, 2], vec![1, 2, 3]], vec![vec![1, 2], vec![3]]),
Test(
vec![vec![1, 2, 3, 4], vec![2, 3, 4, 5]],
vec![vec![1], vec![2, 3, 4], vec![5]],
),
Test(
vec![vec![2, 3, 4, 5], vec![1, 2, 3, 4]],
vec![vec![1], vec![2, 3, 4], vec![5]],
),
Test(
vec![
vec![1, 2, 3, 4],
vec![2, 3, 4, 5, 6],
vec![3, 4],
vec![5, 6],
],
vec![vec![1], vec![2], vec![3, 4], vec![5, 6]],
),
];
for t in tests {
eprintln!("new test case");
let mut g = Gatherer::new();
for s in t.0 {
g.new_seq();
for b in s {
g.next(b);
}
}
let seqs = g.gather();
assert_eq!(seqs, t.1);
}
}
}
//------------------------------------------

61
src/write_batcher.rs Normal file
View File

@ -0,0 +1,61 @@
use anyhow::{anyhow, Result};
use std::sync::{Arc, Mutex};
use crate::checksum;
use crate::io_engine::*;
use crate::pdata::space_map::*;
//------------------------------------------
pub struct WriteBatcher {
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap>>,
batch_size: usize,
queue: Vec<Block>,
}
impl WriteBatcher {
pub fn new(
engine: Arc<dyn IoEngine + Send + Sync>,
sm: Arc<Mutex<dyn SpaceMap>>,
batch_size: usize,
) -> WriteBatcher {
WriteBatcher {
engine,
sm,
batch_size,
queue: Vec::with_capacity(batch_size),
}
}
pub fn alloc(&mut self) -> Result<u64> {
let mut sm = self.sm.lock().unwrap();
let b = sm.alloc()?;
if b.is_none() {
return Err(anyhow!("out of metadata space"));
}
Ok(b.unwrap())
}
pub fn write(&mut self, b: Block, kind: checksum::BT) -> Result<()> {
checksum::write_checksum(&mut b.get_data(), kind)?;
if self.queue.len() == self.batch_size {
self.flush()?;
}
self.queue.push(b);
Ok(())
}
pub fn flush(&mut self) -> Result<()> {
self.engine.write_many(&self.queue)?;
self.queue.clear();
Ok(())
}
}
//------------------------------------------