[array_builder] Simplify array building process
This commit is contained in:
parent
60b65ebe7a
commit
88e7f8fd69
@ -1,10 +1,10 @@
|
|||||||
use anyhow::Result;
|
use anyhow::{anyhow, Result};
|
||||||
use byteorder::WriteBytesExt;
|
use byteorder::WriteBytesExt;
|
||||||
use std::collections::VecDeque;
|
|
||||||
use std::io::Cursor;
|
use std::io::Cursor;
|
||||||
|
|
||||||
use crate::checksum;
|
use crate::checksum;
|
||||||
use crate::io_engine::*;
|
use crate::io_engine::*;
|
||||||
|
use crate::math::*;
|
||||||
use crate::pdata::array::*;
|
use crate::pdata::array::*;
|
||||||
use crate::pdata::btree_builder::*;
|
use crate::pdata::btree_builder::*;
|
||||||
use crate::pdata::unpack::*;
|
use crate::pdata::unpack::*;
|
||||||
@ -14,12 +14,10 @@ use crate::write_batcher::*;
|
|||||||
|
|
||||||
pub struct ArrayBlockBuilder<V: Unpack + Pack> {
|
pub struct ArrayBlockBuilder<V: Unpack + Pack> {
|
||||||
array_io: ArrayIO<V>,
|
array_io: ArrayIO<V>,
|
||||||
max_entries_per_block: usize,
|
nr_entries: u64, // size of the array
|
||||||
values: VecDeque<(u64, V)>,
|
entries_per_block: usize,
|
||||||
array_blocks: Vec<u64>,
|
array_blocks: Vec<u64>, // emitted array blocks
|
||||||
nr_entries: u64,
|
values: Vec<V>, // internal buffer
|
||||||
nr_emitted: u64,
|
|
||||||
nr_queued: u64,
|
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct ArrayBuilder<V: Unpack + Pack> {
|
pub struct ArrayBuilder<V: Unpack + Pack> {
|
||||||
@ -44,91 +42,68 @@ fn calc_max_entries<V: Unpack>() -> usize {
|
|||||||
|
|
||||||
impl<V: Unpack + Pack + Clone + Default> ArrayBlockBuilder<V> {
|
impl<V: Unpack + Pack + Clone + Default> ArrayBlockBuilder<V> {
|
||||||
pub fn new(nr_entries: u64) -> ArrayBlockBuilder<V> {
|
pub fn new(nr_entries: u64) -> ArrayBlockBuilder<V> {
|
||||||
|
let entries_per_block = calc_max_entries::<V>();
|
||||||
|
let nr_blocks = div_up(nr_entries, entries_per_block as u64) as usize;
|
||||||
|
let next_cap = std::cmp::min(nr_entries, entries_per_block as u64) as usize;
|
||||||
|
|
||||||
ArrayBlockBuilder {
|
ArrayBlockBuilder {
|
||||||
array_io: ArrayIO::new(),
|
array_io: ArrayIO::new(),
|
||||||
max_entries_per_block: calc_max_entries::<V>(),
|
|
||||||
values: VecDeque::new(),
|
|
||||||
array_blocks: Vec::new(),
|
|
||||||
nr_entries,
|
nr_entries,
|
||||||
nr_emitted: 0,
|
entries_per_block,
|
||||||
nr_queued: 0,
|
array_blocks: Vec::with_capacity(nr_blocks),
|
||||||
|
values: Vec::<V>::with_capacity(next_cap),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn push_value(&mut self, w: &mut WriteBatcher, index: u64, v: V) -> Result<()> {
|
pub fn push_value(&mut self, w: &mut WriteBatcher, index: u64, v: V) -> Result<()> {
|
||||||
assert!(index >= self.nr_emitted + self.nr_queued);
|
let bi = index / self.entries_per_block as u64;
|
||||||
assert!(index < self.nr_entries);
|
let i = (index % self.entries_per_block as u64) as usize;
|
||||||
|
|
||||||
self.values.push_back((index, v));
|
if bi < self.array_blocks.len() as u64 || i < self.values.len() || index >= self.nr_entries
|
||||||
self.nr_queued = index - self.nr_emitted + 1;
|
{
|
||||||
|
return Err(anyhow!("array index out of bounds"));
|
||||||
if self.nr_queued > self.max_entries_per_block as u64 {
|
|
||||||
self.emit_blocks(w)?;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
while (self.array_blocks.len() as u64) < bi {
|
||||||
|
self.emit_block(w)?;
|
||||||
|
}
|
||||||
|
|
||||||
|
if i > self.values.len() + 1 {
|
||||||
|
self.values.resize_with(i - 1, Default::default);
|
||||||
|
}
|
||||||
|
self.values.push(v);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn complete(mut self, w: &mut WriteBatcher) -> Result<Vec<u64>> {
|
pub fn complete(mut self, w: &mut WriteBatcher) -> Result<Vec<u64>> {
|
||||||
if self.nr_emitted + self.nr_queued < self.nr_entries {
|
// Emit all the remaining queued values
|
||||||
// FIXME: flushing with a default values looks confusing
|
let nr_blocks = self.array_blocks.capacity();
|
||||||
self.push_value(w, self.nr_entries - 1, Default::default())?;
|
while self.array_blocks.len() < nr_blocks {
|
||||||
|
self.emit_block(w)?;
|
||||||
}
|
}
|
||||||
self.emit_all(w)?;
|
|
||||||
Ok(self.array_blocks)
|
Ok(self.array_blocks)
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Emit all the remaining queued values
|
/// Emit a fully utilized array block
|
||||||
fn emit_all(&mut self, w: &mut WriteBatcher) -> Result<()> {
|
fn emit_block(&mut self, w: &mut WriteBatcher) -> Result<()> {
|
||||||
match self.nr_queued {
|
let nr_blocks = self.array_blocks.capacity();
|
||||||
0 => {
|
let cur_bi = self.array_blocks.len();
|
||||||
// There's nothing to emit
|
let next_cap;
|
||||||
Ok(())
|
if cur_bi < nr_blocks - 1 {
|
||||||
}
|
let next_begin = (cur_bi as u64 + 1) * self.entries_per_block as u64;
|
||||||
n if n <= self.max_entries_per_block as u64 => self.emit_values(w),
|
next_cap =
|
||||||
_ => {
|
std::cmp::min(self.nr_entries - next_begin, self.entries_per_block as u64) as usize;
|
||||||
panic!(
|
} else {
|
||||||
"There shouldn't be more than {} queued values",
|
next_cap = 0;
|
||||||
self.max_entries_per_block
|
|
||||||
);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Emit one or more fully utilized array blocks
|
|
||||||
fn emit_blocks(&mut self, w: &mut WriteBatcher) -> Result<()> {
|
|
||||||
while self.nr_queued > self.max_entries_per_block as u64 {
|
|
||||||
self.emit_values(w)?;
|
|
||||||
}
|
|
||||||
Ok(())
|
|
||||||
}
|
|
||||||
|
|
||||||
/// Emit an array block with the queued values
|
|
||||||
fn emit_values(&mut self, w: &mut WriteBatcher) -> Result<()> {
|
|
||||||
let mut values = Vec::<V>::with_capacity(self.max_entries_per_block);
|
|
||||||
let mut nr_free = self.max_entries_per_block;
|
|
||||||
|
|
||||||
while !self.values.is_empty() && nr_free > 0 {
|
|
||||||
let len = self.values.front().unwrap().0 - self.nr_emitted + 1;
|
|
||||||
if len <= nr_free as u64 {
|
|
||||||
let (_, v) = self.values.pop_front().unwrap();
|
|
||||||
if len > 1 {
|
|
||||||
values.resize_with(values.len() + len as usize - 1, Default::default);
|
|
||||||
}
|
|
||||||
values.push(v);
|
|
||||||
nr_free -= len as usize;
|
|
||||||
self.nr_emitted += len;
|
|
||||||
self.nr_queued -= len;
|
|
||||||
} else {
|
|
||||||
values.resize_with(values.len() + nr_free as usize, Default::default);
|
|
||||||
self.nr_emitted += nr_free as u64;
|
|
||||||
self.nr_queued -= nr_free as u64;
|
|
||||||
nr_free = 0;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
let mut values = Vec::<V>::with_capacity(next_cap);
|
||||||
|
std::mem::swap(&mut self.values, &mut values);
|
||||||
|
|
||||||
|
values.resize_with(values.capacity(), Default::default);
|
||||||
let wresult = self.array_io.write(w, values)?;
|
let wresult = self.array_io.write(w, values)?;
|
||||||
|
|
||||||
self.array_blocks.push(wresult.loc);
|
self.array_blocks.push(wresult.loc);
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
|
Loading…
Reference in New Issue
Block a user