#include "base/container_of.h" #include "block-cache/io_engine.h" #include #include #include #include #include #include using namespace bcache; using namespace boost; using namespace std; //---------------------------------------------------------------- control_block_set::control_block_set(unsigned nr) : cbs_(nr) { for (auto i = 0u; i < nr; i++) free_cbs_.insert(i); } iocb * control_block_set::alloc(unsigned context) { if (free_cbs_.empty()) return nullptr; auto it = free_cbs_.begin(); cblock &cb = cbs_[*it]; cb.context = context; free_cbs_.erase(it); return &cb.cb; } void control_block_set::free(iocb *cb) { cblock *b = base::container_of(cb, &cblock::cb); unsigned index = b - &cbs_[0]; free_cbs_.insert(index); } unsigned control_block_set::context(iocb *cb) const { cblock *b = base::container_of(cb, &cblock::cb); return b->context; } //---------------------------------------------------------------- aio_engine::aio_engine(unsigned max_io) : aio_context_(0), cbs_(max_io) { int r = io_setup(max_io, &aio_context_); if (r < 0) throw runtime_error("io_setup failed"); } aio_engine::~aio_engine() { io_destroy(aio_context_); } aio_engine::handle aio_engine::open_file(std::string const &path, mode m, sharing s) { int flags = (m == M_READ_ONLY) ? O_RDONLY : O_RDWR; if (s == EXCLUSIVE) flags |= O_EXCL; int fd = ::open(path.c_str(), O_DIRECT | flags); if (fd < 0) { ostringstream out; out << "unable to open '" << path << "'"; throw runtime_error(out.str()); } descriptors_.push_back(base::unique_fd(fd)); return static_cast(fd); } void aio_engine::close_file(handle h) { for (auto it = descriptors_.begin(); it != descriptors_.end(); ++it) { unsigned it_h = it->get(); if (it_h == h) { descriptors_.erase(it); return; } } ostringstream out; out << "unknown descriptor (" << h << ")"; throw runtime_error(out.str()); } bool aio_engine::issue_io(handle h, dir d, sector_t b, sector_t e, void *data, unsigned context) { if (reinterpret_cast(data) & (PAGE_SIZE - 1)) throw runtime_error("Data passed to issue_io must be page aligned\n"); iocb *cb; cb = cbs_.alloc(context); if (!cb) return false; memset(cb, 0, sizeof(*cb)); cb->aio_fildes = static_cast(h); cb->u.c.buf = data; cb->u.c.offset = b << SECTOR_SHIFT; cb->u.c.nbytes = (e - b) << SECTOR_SHIFT; cb->aio_lio_opcode = (d == D_READ) ? IO_CMD_PREAD : IO_CMD_PWRITE; int r = io_submit(aio_context_, 1, &cb); return r == 1; } optional aio_engine::wait() { return wait_(NULL); } optional aio_engine::wait(unsigned µsec) { timespec start = micro_to_ts(microsec); timespec stop = start; auto r = wait_(&stop); microsec = ts_to_micro(stop) - microsec; return r; } boost::optional aio_engine::wait_(timespec *ts) { int r; struct io_event event; memset(&event, 0, sizeof(event)); r = io_getevents(aio_context_, 1, 1, &event, ts); if (r < 0) { std::ostringstream out; out << "io_getevents failed: " << r; throw std::runtime_error(out.str()); } if (r == 0) { return optional(); } iocb *cb = reinterpret_cast(event.obj); unsigned context = cbs_.context(cb); if (event.res == cb->u.c.nbytes) { cbs_.free(cb); return optional(make_pair(true, context)); } else if (static_cast(event.res) < 0) { cbs_.free(cb); return optional(make_pair(false, context)); } else { cbs_.free(cb); return optional(make_pair(false, context)); } // shouldn't get here return optional(make_pair(false, 0)); } struct timespec aio_engine::micro_to_ts(unsigned micro) { timespec ts; ts.tv_sec = micro / 1000000u; ts.tv_nsec = (micro % 1000000) * 1000; return ts; } unsigned aio_engine::ts_to_micro(timespec const &ts) { unsigned micro = ts.tv_sec * 1000000; micro += ts.tv_nsec / 1000; return micro; } //----------------------------------------------------------------