[thin_restore (rust)] Apply several fixes
- Fix reading queued blocks - Fix unnecessary block shadowing when there's no remaining values - Prevent superblock from overwritten - Flush queued writes before updating superblock
This commit is contained in:
parent
e9899ac610
commit
4b4584c830
@ -152,7 +152,7 @@ pub trait NodeIO<V: Unpack + Pack> {
|
|||||||
fn write(&self, w: &mut WriteBatcher, keys: Vec<u64>, values: Vec<V>) -> Result<WriteResult>;
|
fn write(&self, w: &mut WriteBatcher, keys: Vec<u64>, values: Vec<V>) -> Result<WriteResult>;
|
||||||
fn read(
|
fn read(
|
||||||
&self,
|
&self,
|
||||||
engine: &Arc<dyn IoEngine + Send + Sync>,
|
w: &mut WriteBatcher,
|
||||||
block: u64,
|
block: u64,
|
||||||
) -> Result<(Vec<u64>, Vec<V>)>;
|
) -> Result<(Vec<u64>, Vec<V>)>;
|
||||||
}
|
}
|
||||||
@ -180,10 +180,10 @@ impl<V: Unpack + Pack> NodeIO<V> for LeafIO {
|
|||||||
|
|
||||||
fn read(
|
fn read(
|
||||||
&self,
|
&self,
|
||||||
engine: &Arc<dyn IoEngine + Send + Sync>,
|
w: &mut WriteBatcher,
|
||||||
block: u64,
|
block: u64,
|
||||||
) -> Result<(Vec<u64>, Vec<V>)> {
|
) -> Result<(Vec<u64>, Vec<V>)> {
|
||||||
let b = engine.read(block)?;
|
let b = w.read(block)?;
|
||||||
let path = Vec::new();
|
let path = Vec::new();
|
||||||
match unpack_node::<V>(&path, b.get_data(), true, true)? {
|
match unpack_node::<V>(&path, b.get_data(), true, true)? {
|
||||||
Node::Internal { .. } => {
|
Node::Internal { .. } => {
|
||||||
@ -217,10 +217,10 @@ impl NodeIO<u64> for InternalIO {
|
|||||||
|
|
||||||
fn read(
|
fn read(
|
||||||
&self,
|
&self,
|
||||||
engine: &Arc<dyn IoEngine + Send + Sync>,
|
w: &mut WriteBatcher,
|
||||||
block: u64,
|
block: u64,
|
||||||
) -> Result<(Vec<u64>, Vec<u64>)> {
|
) -> Result<(Vec<u64>, Vec<u64>)> {
|
||||||
let b = engine.read(block)?;
|
let b = w.read(block)?;
|
||||||
let path = Vec::new();
|
let path = Vec::new();
|
||||||
match unpack_node::<u64>(&path, b.get_data(), true, true)? {
|
match unpack_node::<u64>(&path, b.get_data(), true, true)? {
|
||||||
Node::Internal { keys, values, .. } => Ok((keys, values)),
|
Node::Internal { keys, values, .. } => Ok((keys, values)),
|
||||||
@ -307,7 +307,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Decide if we're going to use the pre-built nodes.
|
// Decide if we're going to use the pre-built nodes.
|
||||||
if self.values.len() < half_full {
|
if (self.values.len() > 0) && (self.values.len() < half_full) {
|
||||||
// To avoid writing an under populated node we have to grab some
|
// To avoid writing an under populated node we have to grab some
|
||||||
// values from the first of the shared nodes.
|
// values from the first of the shared nodes.
|
||||||
let (keys, values) = self.read_node(w, nodes.get(0).unwrap().block)?;
|
let (keys, values) = self.read_node(w, nodes.get(0).unwrap().block)?;
|
||||||
@ -345,7 +345,7 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
|
|||||||
pub fn complete(mut self, w: &mut WriteBatcher) -> Result<Vec<NodeSummary>> {
|
pub fn complete(mut self, w: &mut WriteBatcher) -> Result<Vec<NodeSummary>> {
|
||||||
let half_full = self.max_entries_per_node / 2;
|
let half_full = self.max_entries_per_node / 2;
|
||||||
|
|
||||||
if (self.nodes.len() > 0) && (self.values.len() < half_full) {
|
if (self.values.len() > 0) && (self.values.len() < half_full) && (self.nodes.len() > 0) {
|
||||||
// We don't have enough values to emit a node. So we're going to
|
// We don't have enough values to emit a node. So we're going to
|
||||||
// have to rebalance with the previous node.
|
// have to rebalance with the previous node.
|
||||||
self.unshift_node(w)?;
|
self.unshift_node(w)?;
|
||||||
@ -364,8 +364,8 @@ impl<'a, V: Pack + Unpack + Clone> NodeBuilder<V> {
|
|||||||
|
|
||||||
// We're only interested in the keys and values from the node, and
|
// We're only interested in the keys and values from the node, and
|
||||||
// not whether it's a leaf or internal node.
|
// not whether it's a leaf or internal node.
|
||||||
fn read_node(&self, w: &WriteBatcher, block: u64) -> Result<(Vec<u64>, Vec<V>)> {
|
fn read_node(&self, w: &mut WriteBatcher, block: u64) -> Result<(Vec<u64>, Vec<V>)> {
|
||||||
self.nio.read(&w.engine, block)
|
self.nio.read(w, block)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Writes a node with the first 'nr_entries' values.
|
/// Writes a node with the first 'nr_entries' values.
|
||||||
|
@ -98,6 +98,7 @@ impl<'a> Pass1<'a> {
|
|||||||
impl<'a> MetadataVisitor for Pass1<'a> {
|
impl<'a> MetadataVisitor for Pass1<'a> {
|
||||||
fn superblock_b(&mut self, sb: &xml::Superblock) -> Result<Visit> {
|
fn superblock_b(&mut self, sb: &xml::Superblock) -> Result<Visit> {
|
||||||
self.result.sb = Some(sb.clone());
|
self.result.sb = Some(sb.clone());
|
||||||
|
self.w.alloc()?;
|
||||||
Ok(Visit::Continue)
|
Ok(Visit::Continue)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -277,6 +278,8 @@ pub fn restore(opts: ThinRestoreOptions) -> Result<()> {
|
|||||||
// FIXME: I think we need to decrement the shared leaves
|
// FIXME: I think we need to decrement the shared leaves
|
||||||
// Build metadata space map
|
// Build metadata space map
|
||||||
|
|
||||||
|
w.flush()?;
|
||||||
|
|
||||||
// Write the superblock
|
// Write the superblock
|
||||||
if let Some(xml_sb) = pass.sb {
|
if let Some(xml_sb) = pass.sb {
|
||||||
let sb = superblock::Superblock {
|
let sb = superblock::Superblock {
|
||||||
|
@ -65,6 +65,18 @@ impl WriteBatcher {
|
|||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
pub fn read(&mut self, blocknr: u64) -> Result<Block> {
|
||||||
|
for b in self.queue.iter().rev() {
|
||||||
|
if b.loc == blocknr {
|
||||||
|
let r = Block::new(b.loc);
|
||||||
|
r.get_data().copy_from_slice(b.get_data());
|
||||||
|
return Ok(r);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
self.engine.read(blocknr).map_err(|_| anyhow!("read block error"))
|
||||||
|
}
|
||||||
|
|
||||||
pub fn flush_(&mut self, queue: Vec<Block>) -> Result<()> {
|
pub fn flush_(&mut self, queue: Vec<Block>) -> Result<()> {
|
||||||
self.engine.write_many(&queue)?;
|
self.engine.write_many(&queue)?;
|
||||||
Ok(())
|
Ok(())
|
||||||
|
Loading…
x
Reference in New Issue
Block a user