From 2a77036fa8618b884e5551b27d26752c91529876 Mon Sep 17 00:00:00 2001 From: Ming-Hung Tsai Date: Fri, 14 May 2021 21:27:27 +0800 Subject: [PATCH] [array_builder] First draft Not tested yet --- src/pdata/array_builder.rs | 190 +++++++++++++++++++++++++++++++++++++ 1 file changed, 190 insertions(+) create mode 100644 src/pdata/array_builder.rs diff --git a/src/pdata/array_builder.rs b/src/pdata/array_builder.rs new file mode 100644 index 0000000..da94f5f --- /dev/null +++ b/src/pdata/array_builder.rs @@ -0,0 +1,190 @@ +use anyhow::Result; +use byteorder::WriteBytesExt; +use std::collections::VecDeque; +use std::io::Cursor; + +use crate::checksum; +use crate::io_engine::*; +use crate::pdata::array::*; +use crate::pdata::unpack::*; +use crate::write_batcher::*; + +//------------------------------------------ + +pub struct ArrayBuilder { + array_io: ArrayIO, + max_entries_per_block: usize, + values: VecDeque<(u64, V)>, + array_blocks: Vec, + nr_entries: u64, + nr_emitted: u64, + nr_queued: u64, +} + +struct ArraySummary { + block: u64, + index: u64, + nr_entries: usize, +} + +struct ArrayIO { + dummy: std::marker::PhantomData, +} + +struct WriteResult { + loc: u64, +} + +//------------------------------------------ + +fn calc_max_entries() -> usize { + (BLOCK_SIZE - ArrayBlockHeader::disk_size() as usize) / V::disk_size() as usize +} + +//------------------------------------------ + +impl ArrayBuilder { + pub fn new(nr_entries: u64) -> ArrayBuilder { + ArrayBuilder { + array_io: ArrayIO::new(), + max_entries_per_block: calc_max_entries::(), + values: VecDeque::new(), + array_blocks: Vec::new(), + nr_entries, + nr_emitted: 0, + nr_queued: 0, + } + } + + fn push_value(&mut self, w: &mut WriteBatcher, index: u64, v: V) -> Result<()> { + assert!(index >= self.nr_emitted + self.nr_queued); + assert!(index < self.nr_entries); + + self.values.push_back((index, v)); + self.nr_queued = index - self.nr_emitted + 1; + + if self.nr_queued > self.max_entries_per_block as u64 { + self.emit_blocks(w)?; + } + + Ok(()) + } + + fn complete(mut self, w: &mut WriteBatcher) -> Result> { + if self.nr_emitted + self.nr_queued < self.nr_entries { + self.push_value(w, self.nr_entries - 1, Default::default())?; + } + self.emit_all(w)?; + Ok(self.array_blocks) + } + + /// Emit all the remaining queued values + fn emit_all(&mut self, w: &mut WriteBatcher) -> Result<()> { + match self.nr_queued { + 0 => { + // There's nothing to emit + Ok(()) + } + n if n <= self.max_entries_per_block as u64 => self.emit_values(w), + _ => { + panic!( + "There shouldn't be more than {} queued values", + self.max_entries_per_block + ); + } + } + } + + /// Emit one or more fully utilized array blocks + fn emit_blocks(&mut self, w: &mut WriteBatcher) -> Result<()> { + while self.nr_queued > self.max_entries_per_block as u64 { + self.emit_values(w)?; + } + Ok(()) + } + + /// Emit an array block with the queued values + fn emit_values(&mut self, w: &mut WriteBatcher) -> Result<()> { + let mut values = Vec::::with_capacity(self.max_entries_per_block); + let mut nr_free = self.max_entries_per_block; + + while !self.values.is_empty() && nr_free > 0 { + let len = self.values.front().unwrap().0 - self.nr_emitted + 1; + if len <= nr_free as u64 { + let (_, v) = self.values.pop_front().unwrap(); + values.resize_with(len as usize - 1, Default::default); + values.push(v); + nr_free -= len as usize; + self.nr_emitted += len; + self.nr_queued -= len; + } else { + values.resize_with(nr_free, Default::default); + self.nr_emitted += nr_free as u64; + self.nr_queued -= nr_free as u64; + } + } + + let nr_entries = values.len(); + let wresult = self.array_io.write(w, values)?; + + self.array_blocks.push(ArraySummary { + block: wresult.loc, + index: self.nr_emitted / self.max_entries_per_block as u64, + nr_entries, + }); + + Ok(()) + } +} + +//------------------------------------------ + +impl ArrayIO { + pub fn new() -> ArrayIO { + ArrayIO { + dummy: std::marker::PhantomData, + } + } + + fn write(&self, w: &mut WriteBatcher, values: Vec) -> Result { + let header = ArrayBlockHeader { + csum: 0, + max_entries: calc_max_entries::() as u32, + nr_entries: values.len() as u32, + value_size: V::disk_size(), + blocknr: 0, + }; + + let ablock = ArrayBlock { header, values }; + + write_array_block(w, ablock) + } +} + +fn write_array_block( + w: &mut WriteBatcher, + mut ablock: ArrayBlock, +) -> Result { + let b = w.alloc()?; + ablock.set_block(b.loc); + + let mut cursor = Cursor::new(b.get_data()); + pack_array_block(&ablock, &mut cursor)?; + let loc = b.loc; + w.write(b, checksum::BT::ARRAY)?; + + Ok(WriteResult { loc }) +} + +fn pack_array_block( + ablock: &ArrayBlock, + w: &mut W, +) -> Result<()> { + ablock.header.pack(w)?; + for v in ablock.values.iter() { + v.pack(w)?; + } + Ok(()) +} + +//------------------------------------------