[thin_check (rust)] Switch to a different io_uring crate.

This one works.
This commit is contained in:
Joe Thornber 2020-07-28 12:57:30 +01:00
parent a90294e279
commit e9abdd9c88
4 changed files with 76 additions and 42 deletions

21
Cargo.lock generated
View File

@ -250,6 +250,16 @@ dependencies = [
"libc", "libc",
] ]
[[package]]
name = "io-uring"
version = "0.3.5"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "a31e11f8867575fc79a3e73e5f554d0b7386bc4a6f469039e8a83136c724fd81"
dependencies = [
"bitflags",
"libc",
]
[[package]] [[package]]
name = "lazy_static" name = "lazy_static"
version = "1.4.0" version = "1.4.0"
@ -527,15 +537,6 @@ dependencies = [
"winapi", "winapi",
] ]
[[package]]
name = "rio"
version = "0.9.3"
source = "registry+https://github.com/rust-lang/crates.io-index"
checksum = "ce02a35f6fbcc9c5ce0674f17d33fb56afbe0bec6f6263affed4b1ebf594d95d"
dependencies = [
"libc",
]
[[package]] [[package]]
name = "ryu" name = "ryu"
version = "1.0.5" version = "1.0.5"
@ -605,6 +606,7 @@ dependencies = [
"fixedbitset", "fixedbitset",
"flate2", "flate2",
"futures", "futures",
"io-uring",
"libc", "libc",
"nix", "nix",
"nom", "nom",
@ -615,7 +617,6 @@ dependencies = [
"quickcheck", "quickcheck",
"quickcheck_macros", "quickcheck_macros",
"rand", "rand",
"rio",
"tempfile", "tempfile",
"thiserror", "thiserror",
] ]

View File

@ -13,6 +13,7 @@ crc32c = "0.4"
fixedbitset = "0.3" fixedbitset = "0.3"
futures = "0.3" futures = "0.3"
flate2 = "1.0" flate2 = "1.0"
io-uring = "0.3"
libc = "0.2.71" libc = "0.2.71"
nix = "0.17" nix = "0.17"
nom = "5.1" nom = "5.1"
@ -21,7 +22,6 @@ num-derive = "0.3"
num-traits = "0.2" num-traits = "0.2"
quick-xml = "0.18" quick-xml = "0.18"
rand = "0.7" rand = "0.7"
rio = "0.9"
tempfile = "3.1" tempfile = "3.1"
thiserror = "1.0" thiserror = "1.0"

View File

@ -1,5 +1,4 @@
use anyhow::{anyhow, Result}; use anyhow::{anyhow, Result};
use rio::{self, Completion, Rio};
use std::alloc::{alloc, dealloc, Layout}; use std::alloc::{alloc, dealloc, Layout};
use std::collections::HashMap; use std::collections::HashMap;
use std::fs::File; use std::fs::File;
@ -7,14 +6,17 @@ use std::fs::OpenOptions;
use std::io; use std::io;
use std::io::{Read, Seek}; use std::io::{Read, Seek};
use std::os::unix::fs::OpenOptionsExt; use std::os::unix::fs::OpenOptionsExt;
use std::os::unix::io::AsRawFd;
use std::path::Path; use std::path::Path;
use std::sync::{Arc, Mutex}; use std::sync::{Arc, Mutex};
use io_uring::opcode::{self, types};
use io_uring::IoUring;
//------------------------------------------
pub const BLOCK_SIZE: usize = 4096; pub const BLOCK_SIZE: usize = 4096;
const ALIGN: usize = 4096; const ALIGN: usize = 4096;
// FIXME: introduce a cache
// FIXME: use O_DIRECT
#[derive(Debug)] #[derive(Debug)]
pub struct Block { pub struct Block {
pub loc: u64, pub loc: u64,
@ -84,7 +86,8 @@ impl IoEngine for SyncIoEngine {
} }
fn read(&mut self, b: &mut Block) -> Result<()> { fn read(&mut self, b: &mut Block) -> Result<()> {
self.input.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?; self.input
.seek(io::SeekFrom::Start(b.loc * BLOCK_SIZE as u64))?;
self.input.read_exact(&mut b.get_data())?; self.input.read_exact(&mut b.get_data())?;
Ok(()) Ok(())
@ -101,53 +104,83 @@ impl IoEngine for SyncIoEngine {
//------------------------------------------ //------------------------------------------
/*
pub struct AsyncIoEngine { pub struct AsyncIoEngine {
ring: Rio, ring: IoUring,
nr_blocks: u64, nr_blocks: u64,
input: File, input: File,
} }
impl AsyncIoEngine { impl AsyncIoEngine {
pub fn new(path: &Path) -> Result<IoEngine> { pub fn new(path: &Path, queue_len: u32) -> Result<AsyncIoEngine> {
let input = OpenOptions::new() let input = OpenOptions::new()
.read(true) .read(true)
.write(false) .write(false)
.custom_flags(libc::O_DIRECT) .custom_flags(libc::O_DIRECT)
.open(path)?; .open(path)?;
let ring = rio::new()?; Ok(AsyncIoEngine {
ring: IoUring::new(queue_len)?,
Ok(IoEngine {
ring,
nr_blocks: get_nr_blocks(path)?, nr_blocks: get_nr_blocks(path)?,
input, input,
}) })
} }
}
pub fn read(&self, blocks: &mut Vec<Block>) -> Result<()> { impl IoEngine for AsyncIoEngine {
// FIXME: using a bounce buffer as a hack, since b.get_data() will not have fn get_nr_blocks(&self) -> u64 {
// a big enough lifetime. self.nr_blocks
let mut bounce_buffer = vec![0; blocks.len() * BLOCK_SIZE]; }
let mut completions = Vec::new();
for n in 0..blocks.len() { fn read(&mut self, b: &mut Block) -> Result<()> {
let b = &blocks[n]; let fd = types::Target::Fd(self.input.as_raw_fd());
let at = b.loc * BLOCK_SIZE as u64; let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32).offset(b.loc as i64 * BLOCK_SIZE as i64);
let completion = self.ring.read_at(&self.input, &slice, at);
completions.push(completion); unsafe {
let mut queue = self.ring.submission().available();
queue.push(read_e.build().user_data(1))
.ok()
.expect("queue is full");
} }
for c in completions { self.ring.submit_and_wait(1)?;
let n = c.wait()?;
if n != BLOCK_SIZE { let cqes = self.ring.completion().available().collect::<Vec<_>>();
return Err(anyhow!("short read"));
// FIXME: return proper errors
assert_eq!(cqes.len(), 1);
assert_eq!(cqes[0].user_data(), 1);
assert_eq!(cqes[0].result(), BLOCK_SIZE as i32);
Ok(())
}
fn read_many(&mut self, blocks: &mut Vec<Block>) -> Result<()> {
let count = blocks.len();
let fd = types::Target::Fd(self.input.as_raw_fd());
for b in blocks.into_iter() {
let read_e = opcode::Read::new(fd, b.data, BLOCK_SIZE as u32).offset(b.loc as i64 * BLOCK_SIZE as i64);
unsafe {
let mut queue = self.ring.submission().available();
queue.push(read_e.build().user_data(1))
.ok()
.expect("queue is full");
} }
} }
// copy out of the bounce buffer self.ring.submit_and_wait(count)?;
let cqes = self.ring.completion().available().collect::<Vec<_>>();
// FIXME: return proper errors
assert_eq!(cqes.len(), count);
for c in &cqes {
assert_eq!(c.result(), BLOCK_SIZE as i32);
}
Ok(()) Ok(())
} }
} }
*/
//------------------------------------------

View File

@ -7,7 +7,7 @@ use std::sync::{Arc, Mutex};
use std::thread; use std::thread;
use std::time::{Duration, Instant}; use std::time::{Duration, Instant};
use crate::block_manager::{Block, IoEngine, SyncIoEngine, BLOCK_SIZE}; use crate::block_manager::{Block, IoEngine, AsyncIoEngine, SyncIoEngine, BLOCK_SIZE};
use crate::checksum; use crate::checksum;
use crate::thin::superblock::*; use crate::thin::superblock::*;
@ -220,7 +220,8 @@ fn walk_node<E: IoEngine>(
} }
pub fn check(dev: &Path) -> Result<()> { pub fn check(dev: &Path) -> Result<()> {
let mut engine = SyncIoEngine::new(dev)?; //let mut engine = SyncIoEngine::new(dev)?;
let mut engine = AsyncIoEngine::new(dev, 256)?;
let now = Instant::now(); let now = Instant::now();
let sb = read_superblock(&mut engine, SUPERBLOCK_LOCATION)?; let sb = read_superblock(&mut engine, SUPERBLOCK_LOCATION)?;
@ -232,8 +233,7 @@ pub fn check(dev: &Path) -> Result<()> {
walk_node(&mut engine, &mut seen, MappingLevel::Top, &root)?; walk_node(&mut engine, &mut seen, MappingLevel::Top, &root)?;
println!( println!(
"read superblock, mapping root at {}, {} ms", "read mapping tree in {} ms",
sb.mapping_root,
now.elapsed().as_millis() now.elapsed().as_millis()
); );