diff --git a/Cargo.lock b/Cargo.lock index 0478876..a41568a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -619,6 +619,7 @@ dependencies = [ "rand", "tempfile", "thiserror", + "threadpool", ] [[package]] @@ -650,6 +651,15 @@ dependencies = [ "lazy_static", ] +[[package]] +name = "threadpool" +version = "1.8.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d050e60b33d41c19108b32cea32164033a9013fe3b46cbd4457559bfbf77afaa" +dependencies = [ + "num_cpus", +] + [[package]] name = "unicode-width" version = "0.1.8" diff --git a/Cargo.toml b/Cargo.toml index ddcf6bc..e4c1fd1 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -23,6 +23,7 @@ num-traits = "0.2" quick-xml = "0.18" rand = "0.7" tempfile = "3.1" +threadpool = "1.8" thiserror = "1.0" [dev-dependencies] diff --git a/src/block_manager.rs b/src/block_manager.rs index f813012..8e7dc5d 100644 --- a/src/block_manager.rs +++ b/src/block_manager.rs @@ -6,7 +6,7 @@ use std::fs::OpenOptions; use std::io; use std::io::{Read, Seek}; use std::os::unix::fs::OpenOptionsExt; -use std::os::unix::io::AsRawFd; +use std::os::unix::io::{RawFd, AsRawFd}; use std::path::Path; use std::sync::{Arc, Mutex}; use io_uring::opcode::{self, types}; @@ -45,6 +45,8 @@ impl Drop for Block { } } +unsafe impl Send for Block {} + //------------------------------------------ pub trait IoEngine { @@ -105,9 +107,11 @@ impl IoEngine for SyncIoEngine { //------------------------------------------ pub struct AsyncIoEngine { + queue_len: u32, ring: IoUring, nr_blocks: u64, - input: File, + fd: RawFd, + input: Arc, } impl AsyncIoEngine { @@ -118,14 +122,29 @@ impl AsyncIoEngine { .custom_flags(libc::O_DIRECT) .open(path)?; - Ok(AsyncIoEngine { + Ok (AsyncIoEngine { + queue_len, ring: IoUring::new(queue_len)?, nr_blocks: get_nr_blocks(path)?, - input, + fd: input.as_raw_fd(), + input: Arc::new(input), }) } } +impl Clone for AsyncIoEngine { + fn clone(&self) -> AsyncIoEngine { + eprintln!("in clone, queue_len = {}", self.queue_len); + AsyncIoEngine { + queue_len: self.queue_len, + ring: IoUring::new(self.queue_len).expect("couldn't create uring"), + nr_blocks: self.nr_blocks, + fd: self.fd, + input: self.input.clone(), + } + } +} + impl IoEngine for AsyncIoEngine { fn get_nr_blocks(&self) -> u64 { self.nr_blocks diff --git a/src/thin/check.rs b/src/thin/check.rs index c9a4742..c867b89 100644 --- a/src/thin/check.rs +++ b/src/thin/check.rs @@ -6,8 +6,9 @@ use std::collections::HashSet; use std::error::Error; use std::path::Path; use std::sync::{Arc, Mutex}; -use std::thread; +use std::thread::{self, spawn}; use std::time::{Duration, Instant}; +use threadpool::ThreadPool; use crate::block_manager::{AsyncIoEngine, Block, IoEngine, SyncIoEngine, BLOCK_SIZE}; use crate::checksum; @@ -140,19 +141,22 @@ impl ValueType for ValueU64 { //------------------------------------------ trait NodeVisitor { - fn visit<'a>(&mut self, w: &mut BTreeWalker<'a>, b: &Block, node: &Node) -> Result<()>; + fn visit<'a>(&mut self, w: &BTreeWalker, b: &Block, node: &Node) -> Result<()>; } -struct BTreeWalker<'a> { - engine: &'a mut dyn IoEngine, - seen: &'a mut FixedBitSet, +#[derive(Clone)] +struct BTreeWalker { + engine: Arc>, + seen: Arc>, } -impl<'a> BTreeWalker<'a> { - fn new(engine: &'a mut dyn IoEngine, seen: &'a mut FixedBitSet) -> BTreeWalker<'a> { +impl BTreeWalker { + fn new(engine: AsyncIoEngine) -> BTreeWalker { let nr_blocks = engine.get_nr_blocks() as usize; - assert_eq!(seen.len(), nr_blocks); - let r: BTreeWalker<'a> = BTreeWalker { engine, seen }; + let r: BTreeWalker = BTreeWalker { + engine: Arc::new(Mutex::new(engine)), + seen: Arc::new(Mutex::new(FixedBitSet::with_capacity(nr_blocks))), + }; r } @@ -162,13 +166,17 @@ impl<'a> BTreeWalker<'a> { V: ValueType, { let mut blocks = Vec::new(); + let seen = self.seen.lock().unwrap(); for b in bs { - if !self.seen[*b as usize] { + if !seen[*b as usize] { blocks.push(Block::new(*b)); } } + drop(seen); - self.engine.read_many(&mut blocks)?; + let mut engine = self.engine.lock().unwrap(); + engine.read_many(&mut blocks)?; + drop(engine); for b in blocks { self.walk_node(visitor, &b)?; @@ -182,7 +190,9 @@ impl<'a> BTreeWalker<'a> { NV: NodeVisitor, V: ValueType, { - self.seen.insert(b.loc as usize); + let mut seen = self.seen.lock().unwrap(); + seen.insert(b.loc as usize); + drop(seen); let bt = checksum::metadata_block_type(b.get_data()); if bt != checksum::BT::NODE { @@ -234,16 +244,49 @@ impl ValueType for ValueBlockTime { struct TopLevelVisitor {} impl NodeVisitor for TopLevelVisitor { - fn visit(&mut self, w: &mut BTreeWalker, _b: &Block, node: &Node) -> Result<()> { + fn visit(&mut self, w: &BTreeWalker, _b: &Block, node: &Node) -> Result<()> { if let Node::Leaf { header: _h, keys, values, } = node { - let mut v = BottomLevelVisitor {}; - w.walk_nodes(&mut v, values)?; + let mut blocks = Vec::new(); + let mut thin_ids = Vec::new(); + let seen = w.seen.lock().unwrap(); + for n in 0..keys.len() { + let b = values[n]; + if !seen[b as usize] { + thin_ids.push(keys[n]); + blocks.push(Block::new(b)); + } + } + drop(seen); + + let mut engine = w.engine.lock().unwrap(); + engine.read_many(&mut blocks)?; + drop(engine); + + // FIXME: with a thread pool we need to return errors another way. + let nr_workers = 16; + let pool = ThreadPool::new(nr_workers); + + let mut n = 0; + for b in blocks { + let thin_id = thin_ids[n]; + n += 1; + + let mut w = w.clone(); + pool.execute(move || { + let mut v = BottomLevelVisitor {}; + w.walk_node(&mut v, &b); + eprintln!("checked thin_dev {}", thin_id); + }); + } + + pool.join(); } + Ok(()) } } @@ -251,12 +294,7 @@ impl NodeVisitor for TopLevelVisitor { struct BottomLevelVisitor {} impl NodeVisitor for BottomLevelVisitor { - fn visit( - &mut self, - _w: &mut BTreeWalker, - _b: &Block, - _node: &Node, - ) -> Result<()> { + fn visit(&mut self, _w: &BTreeWalker, _b: &Block, _node: &Node) -> Result<()> { Ok(()) } } @@ -275,10 +313,12 @@ pub fn check(dev: &Path) -> Result<()> { engine.read(&mut root)?; let mut seen = FixedBitSet::with_capacity(engine.get_nr_blocks() as usize); - let mut w = BTreeWalker::new(&mut engine, &mut seen); + let mut w = BTreeWalker::new(engine); let mut visitor = TopLevelVisitor {}; let result = w.walk_node(&mut visitor, &root)?; println!("read mapping tree in {} ms", now.elapsed().as_millis()); Ok(()) } + +//------------------------------------------