diff --git a/src/block_manager.rs b/src/block_manager.rs index 8e7dc5d..ba8bf81 100644 --- a/src/block_manager.rs +++ b/src/block_manager.rs @@ -1,4 +1,6 @@ use anyhow::{anyhow, Result}; +use io_uring::opcode::{self, types}; +use io_uring::IoUring; use std::alloc::{alloc, dealloc, Layout}; use std::collections::HashMap; use std::fs::File; @@ -6,11 +8,9 @@ use std::fs::OpenOptions; use std::io; use std::io::{Read, Seek}; use std::os::unix::fs::OpenOptionsExt; -use std::os::unix::io::{RawFd, AsRawFd}; +use std::os::unix::io::{AsRawFd, RawFd}; use std::path::Path; use std::sync::{Arc, Mutex}; -use io_uring::opcode::{self, types}; -use io_uring::IoUring; //------------------------------------------ @@ -51,8 +51,8 @@ unsafe impl Send for Block {} pub trait IoEngine { fn get_nr_blocks(&self) -> u64; - fn read(&mut self, block: &mut Block) -> Result<()>; - fn read_many(&mut self, blocks: &mut Vec) -> Result<()>; + fn read(&self, block: &mut Block) -> Result<()>; + fn read_many(&self, blocks: &mut Vec) -> Result<()>; } fn get_nr_blocks(path: &Path) -> io::Result { @@ -62,6 +62,7 @@ fn get_nr_blocks(path: &Path) -> io::Result { //------------------------------------------ +/* pub struct SyncIoEngine { nr_blocks: u64, input: File, @@ -103,10 +104,10 @@ impl IoEngine for SyncIoEngine { Ok(()) } } - +*/ //------------------------------------------ -pub struct AsyncIoEngine { +pub struct AsyncIoEngine_ { queue_len: u32, ring: IoUring, nr_blocks: u64, @@ -114,6 +115,10 @@ pub struct AsyncIoEngine { input: Arc, } +pub struct AsyncIoEngine { + inner: Mutex, +} + impl AsyncIoEngine { pub fn new(path: &Path, queue_len: u32) -> Result { let input = OpenOptions::new() @@ -122,75 +127,86 @@ impl AsyncIoEngine { .custom_flags(libc::O_DIRECT) .open(path)?; - Ok (AsyncIoEngine { - queue_len, - ring: IoUring::new(queue_len)?, - nr_blocks: get_nr_blocks(path)?, - fd: input.as_raw_fd(), - input: Arc::new(input), + Ok(AsyncIoEngine { + inner: Mutex::new(AsyncIoEngine_ { + queue_len, + ring: IoUring::new(queue_len)?, + nr_blocks: get_nr_blocks(path)?, + fd: input.as_raw_fd(), + input: Arc::new(input), + }), }) } } impl Clone for AsyncIoEngine { fn clone(&self) -> AsyncIoEngine { - eprintln!("in clone, queue_len = {}", self.queue_len); - AsyncIoEngine { - queue_len: self.queue_len, - ring: IoUring::new(self.queue_len).expect("couldn't create uring"), - nr_blocks: self.nr_blocks, - fd: self.fd, - input: self.input.clone(), + let inner = self.inner.lock().unwrap(); + eprintln!("in clone, queue_len = {}", inner.queue_len); + AsyncIoEngine {inner: Mutex::new(AsyncIoEngine_ { + queue_len: inner.queue_len, + ring: IoUring::new(inner.queue_len).expect("couldn't create uring"), + nr_blocks: inner.nr_blocks, + fd: inner.fd, + input: inner.input.clone(), + }), } } } impl IoEngine for AsyncIoEngine { fn get_nr_blocks(&self) -> u64 { - self.nr_blocks + let inner = self.inner.lock().unwrap(); + inner.nr_blocks } - fn read(&mut self, b: &mut Block) -> Result<()> { - let fd = types::Target::Fd(self.input.as_raw_fd()); - let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32).offset(b.loc as i64 * BLOCK_SIZE as i64); + fn read(&self, b: &mut Block) -> Result<()> { + let mut inner = self.inner.lock().unwrap(); + let fd = types::Target::Fd(inner.input.as_raw_fd()); + let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32) + .offset(b.loc as i64 * BLOCK_SIZE as i64); unsafe { - let mut queue = self.ring.submission().available(); - queue.push(read_e.build().user_data(1)) + let mut queue = inner.ring.submission().available(); + queue + .push(read_e.build().user_data(1)) .ok() .expect("queue is full"); } - self.ring.submit_and_wait(1)?; + inner.ring.submit_and_wait(1)?; - let cqes = self.ring.completion().available().collect::>(); + let cqes = inner.ring.completion().available().collect::>(); - // FIXME: return proper errors - assert_eq!(cqes.len(), 1); - assert_eq!(cqes[0].user_data(), 1); - assert_eq!(cqes[0].result(), BLOCK_SIZE as i32); + // FIXME: return proper errors + assert_eq!(cqes.len(), 1); + assert_eq!(cqes[0].user_data(), 1); + assert_eq!(cqes[0].result(), BLOCK_SIZE as i32); Ok(()) } - fn read_many(&mut self, blocks: &mut Vec) -> Result<()> { + fn read_many(&self, blocks: &mut Vec) -> Result<()> { + let mut inner = self.inner.lock().unwrap(); let count = blocks.len(); - let fd = types::Target::Fd(self.input.as_raw_fd()); + let fd = types::Target::Fd(inner.input.as_raw_fd()); for b in blocks.into_iter() { - let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32).offset(b.loc as i64 * BLOCK_SIZE as i64); + let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32) + .offset(b.loc as i64 * BLOCK_SIZE as i64); unsafe { - let mut queue = self.ring.submission().available(); - queue.push(read_e.build().user_data(1)) + let mut queue = inner.ring.submission().available(); + queue + .push(read_e.build().user_data(1)) .ok() .expect("queue is full"); } } - self.ring.submit_and_wait(count)?; + inner.ring.submit_and_wait(count)?; - let cqes = self.ring.completion().available().collect::>(); + let cqes = inner.ring.completion().available().collect::>(); // FIXME: return proper errors assert_eq!(cqes.len(), count); diff --git a/src/thin/check.rs b/src/thin/check.rs index c867b89..471fad7 100644 --- a/src/thin/check.rs +++ b/src/thin/check.rs @@ -10,7 +10,7 @@ use std::thread::{self, spawn}; use std::time::{Duration, Instant}; use threadpool::ThreadPool; -use crate::block_manager::{AsyncIoEngine, Block, IoEngine, SyncIoEngine, BLOCK_SIZE}; +use crate::block_manager::{AsyncIoEngine, Block, IoEngine, BLOCK_SIZE}; use crate::checksum; use crate::thin::superblock::*;