From 0f865856ed9d23c569b47e712f59dc691c117684 Mon Sep 17 00:00:00 2001 From: Joe Thornber Date: Mon, 10 Aug 2020 10:44:47 +0100 Subject: [PATCH] [thin_check (rust)] Improve SyncIoEngine. Now opens the file multiple times so different threads can do io in parallel. --- src/io_engine.rs | 45 ++++++++++++++++++++++++++++++++++++--------- src/thin/check.rs | 7 ++++--- 2 files changed, 40 insertions(+), 12 deletions(-) diff --git a/src/io_engine.rs b/src/io_engine.rs index 5b0a313..e89fae7 100644 --- a/src/io_engine.rs +++ b/src/io_engine.rs @@ -4,11 +4,11 @@ use io_uring::IoUring; use std::alloc::{alloc, dealloc, Layout}; use std::fs::File; use std::fs::OpenOptions; -use std::io::{self, Seek, Read}; +use std::io::{self, Read, Seek}; use std::os::unix::fs::OpenOptionsExt; use std::os::unix::io::{AsRawFd, RawFd}; use std::path::Path; -use std::sync::{Arc, Mutex}; +use std::sync::{Arc, Mutex, Condvar}; //------------------------------------------ @@ -62,22 +62,48 @@ fn get_nr_blocks(path: &Path) -> io::Result { pub struct SyncIoEngine { nr_blocks: u64, - input: Mutex, + files: Mutex>, + cvar: Condvar, } impl SyncIoEngine { - pub fn new(path: &Path) -> Result { - let input = OpenOptions::new() + fn open_file(path: &Path) -> Result { + let file = OpenOptions::new() .read(true) .write(false) .custom_flags(libc::O_DIRECT) .open(path)?; + Ok(file) + } + + pub fn new(path: &Path, nr_files: usize) -> Result { + let mut files = Vec::new(); + for _n in 0..nr_files { + files.push(SyncIoEngine::open_file(path)?); + } + Ok(SyncIoEngine { nr_blocks: get_nr_blocks(path)?, - input: Mutex::new(input), + files: Mutex::new(files), + cvar: Condvar::new(), }) } + + fn get(&self) -> File { + let mut files = self.files.lock().unwrap(); + + while files.len() == 0 { + files = self.cvar.wait(files).unwrap(); + } + files.pop().unwrap() + } + + fn put(&self, f: File) { + let mut files = self.files.lock().unwrap(); + files.push(f); + self.cvar.notify_one(); + } } impl IoEngine for SyncIoEngine { @@ -85,21 +111,22 @@ impl IoEngine for SyncIoEngine { self.nr_blocks } - fn read(&self, b: &mut Block) -> Result<()> { - let mut input = self.input.lock().unwrap(); + let mut input = self.get(); input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; input.read_exact(&mut b.get_data())?; + self.put(input); Ok(()) } fn read_many(&self, blocks: &mut Vec) -> Result<()> { - let mut input = self.input.lock().unwrap(); + let mut input = self.get(); for b in blocks { input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; input.read_exact(&mut b.get_data())?; } + self.put(input); Ok(()) } diff --git a/src/thin/check.rs b/src/thin/check.rs index a93eaaf..508a206 100644 --- a/src/thin/check.rs +++ b/src/thin/check.rs @@ -265,8 +265,9 @@ impl<'a> NodeVisitor for OverflowChecker<'a> { const MAX_CONCURRENT_IO: u32 = 1024; pub fn check(dev: &Path) -> Result<()> { - //let engine = Arc::new(AsyncIoEngine::new(dev, MAX_CONCURRENT_IO)?); - let engine: Arc = Arc::new(SyncIoEngine::new(dev)?); + let nr_threads = 4; + let engine = Arc::new(AsyncIoEngine::new(dev, MAX_CONCURRENT_IO)?); + //let engine: Arc = Arc::new(SyncIoEngine::new(dev, nr_threads)?); let now = Instant::now(); let sb = read_superblock(engine.as_ref(), SUPERBLOCK_LOCATION)?; @@ -295,7 +296,7 @@ pub fn check(dev: &Path) -> Result<()> { let data_sm; { // FIXME: with a thread pool we need to return errors another way. - let nr_workers = 4; + let nr_workers = nr_threads; let pool = ThreadPool::new(nr_workers); let seen = Arc::new(Mutex::new(FixedBitSet::with_capacity( engine.get_nr_blocks() as usize,