From c8fec7ec4055d0f4bcbca8e7d80c9ca419f3325c Mon Sep 17 00:00:00 2001 From: Joe Thornber Date: Thu, 14 Apr 2016 08:54:32 +0100 Subject: [PATCH] [cache_writeback] Coded, needs testing --- Makefile.in | 3 + base/unique_handle.h | 67 ++++++++++++++ block-cache/copier.cc | 120 ++++++++++++++++++++++++- block-cache/copier.h | 64 ++++++++++++-- block-cache/io_engine.cc | 175 +++++++++++++++++++++++++++++++++++++ block-cache/io_engine.h | 82 +++++++++++++++++ block-cache/mem_pool.cc | 57 ++++++++++++ block-cache/mem_pool.h | 45 ++++++++++ caching/cache_writeback.cc | 19 +++- 9 files changed, 620 insertions(+), 12 deletions(-) create mode 100644 base/unique_handle.h create mode 100644 block-cache/io_engine.cc create mode 100644 block-cache/io_engine.h create mode 100644 block-cache/mem_pool.cc create mode 100644 block-cache/mem_pool.h diff --git a/Makefile.in b/Makefile.in index 87f2fd7..d2e1e97 100644 --- a/Makefile.in +++ b/Makefile.in @@ -36,6 +36,9 @@ SOURCE=\ base/rolling_hash.cc \ base/xml_utils.cc \ block-cache/block_cache.cc \ + block-cache/copier.cc \ + block-cache/io_engine.cc \ + block-cache/mem_pool.cc \ caching/cache_check.cc \ caching/cache_dump.cc \ caching/cache_metadata_size.cc \ diff --git a/base/unique_handle.h b/base/unique_handle.h new file mode 100644 index 0000000..9c44e1b --- /dev/null +++ b/base/unique_handle.h @@ -0,0 +1,67 @@ +#ifndef BASE_UNIQUE_HANDLE_H +#define BASE_UNIQUE_HANDLE_H + +#include +#include +#include + +//---------------------------------------------------------------- + +namespace base { + template + class unique_handle + { + public: + unique_handle(std::nullptr_t = nullptr) + : id_(TNul) { + } + + unique_handle(T x) + : id_(x) { + } + + explicit operator bool() const { + return id_ != TNul; + } + + operator T&() { + return id_; + } + + operator T() const { + return id_; + } + + T *operator&() { + return &id_; + } + + const T *operator&() const { + return &id_; + } + + friend bool operator == (unique_handle a, unique_handle b) { return a.id_ == b.id_; } + friend bool operator != (unique_handle a, unique_handle b) { return a.id_ != b.id_; } + friend bool operator == (unique_handle a, std::nullptr_t) { return a.id_ == TNul; } + friend bool operator != (unique_handle a, std::nullptr_t) { return a.id_ != TNul; } + friend bool operator == (std::nullptr_t, unique_handle b) { return TNul == b.id_; } + friend bool operator != (std::nullptr_t, unique_handle b) { return TNul != b.id_; } + + private: + T id_; + }; + + //-------------------------------- + + struct fd_deleter { + typedef unique_handle pointer; + void operator()(pointer p) { + ::close(p); + } + }; + typedef std::unique_ptr unique_fd; +} + +//---------------------------------------------------------------- + +#endif diff --git a/block-cache/copier.cc b/block-cache/copier.cc index 60ca93b..c130e14 100644 --- a/block-cache/copier.cc +++ b/block-cache/copier.cc @@ -1,7 +1,123 @@ #include "block-cache/copier.h" -//---------------------------------------------------------------- - +#include +using namespace bcache; +using namespace boost; +using namespace std; + +//---------------------------------------------------------------- + +copier::copier(string const &src, string const &dest, + sector_t block_size, size_t mem) + : pool_(block_size, mem), + block_size_(block_size), + nr_blocks_(mem / block_size), + engine_(nr_blocks_), + src_handle_(engine_.open_file(src, io_engine::READ_ONLY)), + dest_handle_(engine_.open_file(dest, io_engine::READ_WRITE)), + genkey_count_(0) +{ +} + +void +copier::issue(copy_op const &op) +{ + auto data = pool_.alloc(); + if (!data) { + wait_(); + data = pool_.alloc(); + + if (!data) + // Shouldn't get here + throw runtime_error("couldn't allocate buffer"); + } + + copy_job job(op, *data); + job.op.read_complete = job.op.write_complete = false; + unsigned key = genkey(); // used as context for the io_engine + + engine_.issue_io(src_handle_, + io_engine::READ, + to_sector(op.src_b), + to_sector(op.src_e), + *data, + key); + jobs_.insert(make_pair(key, job)); +} + +unsigned +copier::nr_pending() const +{ + return jobs_.size() + complete_.size(); +} + +boost::optional +copier::wait() +{ + while (complete_.empty() && !jobs_.empty()) + wait_(); + + if (complete_.empty()) + return optional(); + + else { + auto op = complete_.front(); + complete_.pop_front(); + return optional(op); + } +} + +void +copier::wait_() +{ + auto p = engine_.wait(); + auto it = jobs_.find(p.second); + if (it == jobs_.end()) + throw runtime_error("Internal error. Lost track of copy job."); + + copy_job j = it->second; + if (!p.first) { + // IO was unsuccessful + jobs_.erase(it); + complete(j); + return; + } + + // IO was successful + if (!j.op.read_complete) { + j.op.read_complete = true; + engine_.issue_io(dest_handle_, + io_engine::WRITE, + to_sector(j.op.dest_b), + to_sector(j.op.dest_b + (j.op.src_e - j.op.src_b)), + j.data, + it->first); + + } else { + j.op.write_complete = true; + jobs_.erase(it); + complete(j); + } +} + +void +copier::complete(copy_job const &j) +{ + pool_.free(j.data); + complete_.push_back(j.op); +} + +sector_t +copier::to_sector(block_address b) const +{ + return b * block_size_; +} + +unsigned +copier::genkey() +{ + return genkey_count_++; +} //---------------------------------------------------------------- diff --git a/block-cache/copier.h b/block-cache/copier.h index 41ed82e..28fdd8a 100644 --- a/block-cache/copier.h +++ b/block-cache/copier.h @@ -1,27 +1,75 @@ #ifndef BLOCK_CACHE_COPIER_H #define BLOCK_CACHE_COPIER_H -#include "block_cache.h" +#include "block-cache/io_engine.h" +#include "block-cache/mem_pool.h" #include +#include +#include //---------------------------------------------------------------- namespace bcache { + using block_address = uint64_t; + + struct copy_op { + copy_op() + : read_complete(false), + write_complete(false) { + } + + block_address src_b, src_e; + block_address dest_b; + + bool read_complete; + bool write_complete; + }; + + class copy_job { + public: + copy_job(copy_op const &op_, void *data_) + : op(op_), data(data_) { + } + + copy_op op; + void *data; + }; + class copier { public: - // block size in sectors copier(std::string const &src, std::string const &dest, - unsigned block_size); - ~copier(); + sector_t block_size, size_t mem); - // Returns the number of sectors copied - unsigned copy(block_address from, block_address to); + sector_t get_block_size() const { + return block_size_; + } - unsigned get_block_size() const; + // Blocks if out of memory. + void issue(copy_op const &op); + + unsigned nr_pending() const; + boost::optional wait(); private: - unsigned block_size_; + void wait_(); + void complete(copy_job const &j); + + sector_t to_sector(block_address b) const; + unsigned genkey(); + + mempool pool_; + sector_t block_size_; + unsigned nr_blocks_; + io_engine engine_; + io_engine::handle src_handle_; + io_engine::handle dest_handle_; + unsigned genkey_count_; + + using job_map = std::map; + using op_list = std::list; + job_map jobs_; + op_list complete_; }; } diff --git a/block-cache/io_engine.cc b/block-cache/io_engine.cc new file mode 100644 index 0000000..5580c8a --- /dev/null +++ b/block-cache/io_engine.cc @@ -0,0 +1,175 @@ +#include "base/container_of.h" +#include "block-cache/io_engine.h" + +#include +#include +#include +#include +#include +#include + +using namespace bcache; +using namespace boost; +using namespace std; + +#define SECTOR_SHIFT 9 + +//---------------------------------------------------------------- + +control_block_set::control_block_set(unsigned nr) + : cbs_(nr) +{ + for (auto i = 0u; i < nr; i++) + free_cbs_.insert(i); +} + +iocb * +control_block_set::alloc(unsigned context) +{ + if (free_cbs_.empty()) + return nullptr; + + auto it = free_cbs_.begin(); + + cblock &cb = cbs_[*it]; + cb.context = context; + free_cbs_.erase(it); + + return &cb.cb; +} + +void +control_block_set::free(iocb *cb) +{ + cblock *b = base::container_of(cb, &cblock::cb); + unsigned index = b - &cbs_[0]; + free_cbs_.insert(index); +} + +unsigned +control_block_set::context(iocb *cb) const +{ + cblock *b = base::container_of(cb, &cblock::cb); + return b->context; +} + +//---------------------------------------------------------------- + +io_engine::io_engine(unsigned max_io) + : aio_context_(0), + cbs_(max_io), + events_(max_io) +{ + int r = io_setup(max_io, &aio_context_); + if (r < 0) + throw runtime_error("io_setup failed"); +} + +io_engine::~io_engine() +{ + io_destroy(aio_context_); +} + +io_engine::handle +io_engine:: open_file(std::string const &path, mode m) +{ + int flags = (m == READ_ONLY) ? O_RDONLY : O_RDWR; + int fd = ::open(path.c_str(), O_DIRECT | flags); + if (fd < 0) { + ostringstream out; + out << "unable to open '" << path << "'"; + throw runtime_error(out.str()); + } + + descriptors_.push_back(base::unique_fd(fd)); + + return static_cast(fd); +} + +void +io_engine::close_file(handle h) +{ + for (auto it = descriptors_.begin(); it != descriptors_.end(); ++it) { + unsigned it_h = it->get(); + if (it_h == h) { + descriptors_.erase(it); + return; + } + } + + ostringstream out; + out << "unknown descriptor (" << h << ")"; + throw runtime_error(out.str()); +} + +bool +io_engine::issue_io(handle h, dir d, sector_t b, sector_t e, void *data, unsigned context) +{ + auto cb = cbs_.alloc(context); + if (!cb) + return false; + + memset(cb, 0, sizeof(*cb)); + cb->aio_fildes = static_cast(h); + cb->u.c.buf = data; + cb->u.c.offset = b << SECTOR_SHIFT; + cb->u.c.nbytes = (e - b) << SECTOR_SHIFT; + + cb->aio_lio_opcode = (d == READ) ? IO_CMD_PREAD : IO_CMD_PWRITE; + + int r = io_submit(aio_context_, 1, &cb); + if (r != 1) { + std::ostringstream out; + out << "couldn't issue " + << ((d == READ) ? "READ" : "WRITE") + << " io: io_submit "; + if (r < 0) + out << "failed with " << r; + else + out << "succeeded, but queued no io"; + + throw std::runtime_error(out.str()); + } + + return true; +} + +std::pair +io_engine::wait() +{ + int r; + unsigned i; + + r = io_getevents(aio_context_, 1, events_.size(), &events_[0], NULL); + if (r < 0) { + std::ostringstream out; + out << "io_getevents failed: " << r; + throw std::runtime_error(out.str()); + } + + for (i = 0; i < static_cast(r); i++) { + io_event const &e = events_[i]; + iocb *cb = reinterpret_cast(e.obj); + unsigned context = cbs_.context(cb); + cbs_.free(cb); + + if (e.res == cb->u.c.nbytes) + return make_pair(true, context); + + else { + std::ostringstream out; + out << "io failed" + << ", e.res = " << e.res + << ", e.res2 = " << e.res2 + << ", offset = " << cb->u.c.offset + << ", nbytes = " << cb->u.c.nbytes; + return make_pair(false, context); + } + } + + + // shouldn't get here + return make_pair(false, 0); +} + +//---------------------------------------------------------------- diff --git a/block-cache/io_engine.h b/block-cache/io_engine.h new file mode 100644 index 0000000..07d97a8 --- /dev/null +++ b/block-cache/io_engine.h @@ -0,0 +1,82 @@ +#ifndef BLOCK_CACHE_IO_ENGINE_H +#define BLOCK_CACHE_IO_ENGINE_H + +#include "base/unique_handle.h" + +#include +#include +#include +#include +#include + +//---------------------------------------------------------------- + +namespace bcache { + using sector_t = uint64_t; + + //---------------- + + class control_block_set { + public: + control_block_set(unsigned nr); + + iocb *alloc(unsigned context); + void free(iocb *); + + unsigned context(iocb *) const; + + private: + struct cblock { + unsigned context; + struct iocb cb; + }; + + std::set free_cbs_; + std::vector cbs_; + }; + + //---------------- + + class io_engine { + public: + enum mode { + READ_ONLY, + READ_WRITE + }; + + enum dir { + READ, + WRITE + }; + + // max_io is the maximum nr of concurrent ios expected + io_engine(unsigned max_io); + ~io_engine(); + + using handle = unsigned; + + handle open_file(std::string const &path, mode m); + void close_file(handle h); + + // returns false if there are insufficient resources to + // queue the IO + bool issue_io(handle h, dir d, sector_t b, sector_t e, void *data, unsigned context); + + // returns (success, context) + std::pair wait(); + + private: + std::list descriptors_; + + io_context_t aio_context_; + control_block_set cbs_; + std::vector events_; + + io_engine(io_engine const &) = delete; + io_engine &operator =(io_engine const &) = delete; + }; +} + +//---------------------------------------------------------------- + +#endif diff --git a/block-cache/mem_pool.cc b/block-cache/mem_pool.cc new file mode 100644 index 0000000..f655fd0 --- /dev/null +++ b/block-cache/mem_pool.cc @@ -0,0 +1,57 @@ +#include "block-cache/mem_pool.h" + +#include + +using namespace bcache; +using namespace boost; +using namespace mempool_detail; + +#define PAGE_SIZE 4096 + +//---------------------------------------------------------------- + +mempool::mempool(size_t block_size, size_t total_mem) +{ + mem_ = alloc_aligned(total_mem, PAGE_SIZE); + + unsigned nr_blocks = total_mem / block_size; + for (auto i = 0u; i < nr_blocks; i++) + free(static_cast(mem_) + (block_size * i)); +} + +mempool::~mempool() +{ + ::free(mem_); +} + +boost::optional +mempool::alloc() +{ + if (free_.empty()) + return optional(); + + mempool_detail::alloc_block &b = free_.front(); + free_.pop_front(); + return optional(reinterpret_cast(&b)); +} + +void +mempool::free(void *data) +{ + mempool_detail::alloc_block *b = reinterpret_cast(data); + free_.push_front(*b); +} + +void * +mempool::alloc_aligned(size_t len, size_t alignment) +{ + void *result = NULL; + int r = posix_memalign(&result, alignment, len); + if (r) + return NULL; + + return result; +} + +//---------------------------------------------------------------- + diff --git a/block-cache/mem_pool.h b/block-cache/mem_pool.h new file mode 100644 index 0000000..61c1812 --- /dev/null +++ b/block-cache/mem_pool.h @@ -0,0 +1,45 @@ +#ifndef BLOCK_CACHE_MEM_POOL_H +#define BLOCK_CACHE_MEM_POOL_H + +#include +#include +#include + +namespace bi = boost::intrusive; + +//---------------------------------------------------------------- + +namespace bcache { + // FIXME: move to base? + + namespace mempool_detail { + struct alloc_block : public bi::list_base_hook<> { + }; + }; + + class mempool { + public: + mempool(size_t block_size, size_t total_mem); + ~mempool(); + + boost::optional alloc(); + void free(void *data); + + private: + static void *alloc_aligned(size_t len, size_t alignment); + + using block_list = bi::list; + + void *mem_; + block_list free_; + + //---------------- + + mempool(mempool const &) = delete; + mempool &operator =(mempool const &) = delete; + }; +} + +//---------------------------------------------------------------- + +#endif diff --git a/caching/cache_writeback.cc b/caching/cache_writeback.cc index 042ae74..d81655a 100644 --- a/caching/cache_writeback.cc +++ b/caching/cache_writeback.cc @@ -19,8 +19,13 @@ using namespace std; namespace { struct flags { + flags() + : cache_size(1024 * 1024 * 128) { + } + using maybe_string = boost::optional; + size_t cache_size; maybe_string metadata_dev; maybe_string origin_dev; maybe_string fast_dev; @@ -41,13 +46,22 @@ namespace { if (only_dirty_ && !(m.flags_ & M_DIRTY)) return; - auto sectors_copied = copier_.copy(cblock, m.oblock_); + copy_op cop; + cop.src_b = cblock; + cop.src_e = cblock + 1ull; + cop.dest_b = m.oblock_; + + // blocks + copier_.issue(cop); stats_.blocks_issued++; + +#if 0 if (sectors_copied < block_size_) { stats_.blocks_failed++; stats_.sectors_failed += block_size_ - sectors_copied; } +#endif } struct copy_stats { @@ -107,7 +121,8 @@ namespace { block_manager<>::ptr bm = open_bm(*f.metadata_dev, block_manager<>::READ_ONLY); metadata md(bm, metadata::OPEN); - copier c(*f.fast_dev, *f.origin_dev, md.sb_.data_block_size); + copier c(*f.fast_dev, *f.origin_dev, + md.sb_.data_block_size, f.cache_size); copy_visitor cv(c, clean_shutdown(md)); ignore_damage_visitor dv; walk_mapping_array(*md.mappings_, cv, dv);