[thin_check (rust)] Improve SyncIoEngine.
Now opens the file multiple times so different threads can do io in parallel.
This commit is contained in:
parent
f0df17af9e
commit
0f865856ed
@ -4,11 +4,11 @@ use io_uring::IoUring;
|
||||
use std::alloc::{alloc, dealloc, Layout};
|
||||
use std::fs::File;
|
||||
use std::fs::OpenOptions;
|
||||
use std::io::{self, Seek, Read};
|
||||
use std::io::{self, Read, Seek};
|
||||
use std::os::unix::fs::OpenOptionsExt;
|
||||
use std::os::unix::io::{AsRawFd, RawFd};
|
||||
use std::path::Path;
|
||||
use std::sync::{Arc, Mutex};
|
||||
use std::sync::{Arc, Mutex, Condvar};
|
||||
|
||||
//------------------------------------------
|
||||
|
||||
@ -62,22 +62,48 @@ fn get_nr_blocks(path: &Path) -> io::Result<u64> {
|
||||
|
||||
pub struct SyncIoEngine {
|
||||
nr_blocks: u64,
|
||||
input: Mutex<File>,
|
||||
files: Mutex<Vec<File>>,
|
||||
cvar: Condvar,
|
||||
}
|
||||
|
||||
impl SyncIoEngine {
|
||||
pub fn new(path: &Path) -> Result<SyncIoEngine> {
|
||||
let input = OpenOptions::new()
|
||||
fn open_file(path: &Path) -> Result<File> {
|
||||
let file = OpenOptions::new()
|
||||
.read(true)
|
||||
.write(false)
|
||||
.custom_flags(libc::O_DIRECT)
|
||||
.open(path)?;
|
||||
|
||||
Ok(file)
|
||||
}
|
||||
|
||||
pub fn new(path: &Path, nr_files: usize) -> Result<SyncIoEngine> {
|
||||
let mut files = Vec::new();
|
||||
for _n in 0..nr_files {
|
||||
files.push(SyncIoEngine::open_file(path)?);
|
||||
}
|
||||
|
||||
Ok(SyncIoEngine {
|
||||
nr_blocks: get_nr_blocks(path)?,
|
||||
input: Mutex::new(input),
|
||||
files: Mutex::new(files),
|
||||
cvar: Condvar::new(),
|
||||
})
|
||||
}
|
||||
|
||||
fn get(&self) -> File {
|
||||
let mut files = self.files.lock().unwrap();
|
||||
|
||||
while files.len() == 0 {
|
||||
files = self.cvar.wait(files).unwrap();
|
||||
}
|
||||
files.pop().unwrap()
|
||||
}
|
||||
|
||||
fn put(&self, f: File) {
|
||||
let mut files = self.files.lock().unwrap();
|
||||
files.push(f);
|
||||
self.cvar.notify_one();
|
||||
}
|
||||
}
|
||||
|
||||
impl IoEngine for SyncIoEngine {
|
||||
@ -85,21 +111,22 @@ impl IoEngine for SyncIoEngine {
|
||||
self.nr_blocks
|
||||
}
|
||||
|
||||
|
||||
fn read(&self, b: &mut Block) -> Result<()> {
|
||||
let mut input = self.input.lock().unwrap();
|
||||
let mut input = self.get();
|
||||
input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?;
|
||||
input.read_exact(&mut b.get_data())?;
|
||||
self.put(input);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn read_many(&self, blocks: &mut Vec<Block>) -> Result<()> {
|
||||
let mut input = self.input.lock().unwrap();
|
||||
let mut input = self.get();
|
||||
for b in blocks {
|
||||
input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?;
|
||||
input.read_exact(&mut b.get_data())?;
|
||||
}
|
||||
self.put(input);
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
@ -265,8 +265,9 @@ impl<'a> NodeVisitor<u32> for OverflowChecker<'a> {
|
||||
const MAX_CONCURRENT_IO: u32 = 1024;
|
||||
|
||||
pub fn check(dev: &Path) -> Result<()> {
|
||||
//let engine = Arc::new(AsyncIoEngine::new(dev, MAX_CONCURRENT_IO)?);
|
||||
let engine: Arc<dyn IoEngine + Send + Sync> = Arc::new(SyncIoEngine::new(dev)?);
|
||||
let nr_threads = 4;
|
||||
let engine = Arc::new(AsyncIoEngine::new(dev, MAX_CONCURRENT_IO)?);
|
||||
//let engine: Arc<dyn IoEngine + Send + Sync> = Arc::new(SyncIoEngine::new(dev, nr_threads)?);
|
||||
|
||||
let now = Instant::now();
|
||||
let sb = read_superblock(engine.as_ref(), SUPERBLOCK_LOCATION)?;
|
||||
@ -295,7 +296,7 @@ pub fn check(dev: &Path) -> Result<()> {
|
||||
let data_sm;
|
||||
{
|
||||
// FIXME: with a thread pool we need to return errors another way.
|
||||
let nr_workers = 4;
|
||||
let nr_workers = nr_threads;
|
||||
let pool = ThreadPool::new(nr_workers);
|
||||
let seen = Arc::new(Mutex::new(FixedBitSet::with_capacity(
|
||||
engine.get_nr_blocks() as usize,
|
||||
|
Loading…
x
Reference in New Issue
Block a user