[cache_writeback] Coded, needs testing

This commit is contained in:
Joe Thornber 2016-04-14 08:54:32 +01:00
parent 4573ebb218
commit c8fec7ec40
9 changed files with 620 additions and 12 deletions

View File

@ -36,6 +36,9 @@ SOURCE=\
base/rolling_hash.cc \
base/xml_utils.cc \
block-cache/block_cache.cc \
block-cache/copier.cc \
block-cache/io_engine.cc \
block-cache/mem_pool.cc \
caching/cache_check.cc \
caching/cache_dump.cc \
caching/cache_metadata_size.cc \

67
base/unique_handle.h Normal file
View File

@ -0,0 +1,67 @@
#ifndef BASE_UNIQUE_HANDLE_H
#define BASE_UNIQUE_HANDLE_H
#include <list>
#include <memory>
#include <unistd.h>
//----------------------------------------------------------------
namespace base {
template <typename T, T TNul = T()>
class unique_handle
{
public:
unique_handle(std::nullptr_t = nullptr)
: id_(TNul) {
}
unique_handle(T x)
: id_(x) {
}
explicit operator bool() const {
return id_ != TNul;
}
operator T&() {
return id_;
}
operator T() const {
return id_;
}
T *operator&() {
return &id_;
}
const T *operator&() const {
return &id_;
}
friend bool operator == (unique_handle a, unique_handle b) { return a.id_ == b.id_; }
friend bool operator != (unique_handle a, unique_handle b) { return a.id_ != b.id_; }
friend bool operator == (unique_handle a, std::nullptr_t) { return a.id_ == TNul; }
friend bool operator != (unique_handle a, std::nullptr_t) { return a.id_ != TNul; }
friend bool operator == (std::nullptr_t, unique_handle b) { return TNul == b.id_; }
friend bool operator != (std::nullptr_t, unique_handle b) { return TNul != b.id_; }
private:
T id_;
};
//--------------------------------
struct fd_deleter {
typedef unique_handle<int, -1> pointer;
void operator()(pointer p) {
::close(p);
}
};
typedef std::unique_ptr<int, fd_deleter> unique_fd;
}
//----------------------------------------------------------------
#endif

View File

@ -1,7 +1,123 @@
#include "block-cache/copier.h"
//----------------------------------------------------------------
#include <stdexcept>
using namespace bcache;
using namespace boost;
using namespace std;
//----------------------------------------------------------------
copier::copier(string const &src, string const &dest,
sector_t block_size, size_t mem)
: pool_(block_size, mem),
block_size_(block_size),
nr_blocks_(mem / block_size),
engine_(nr_blocks_),
src_handle_(engine_.open_file(src, io_engine::READ_ONLY)),
dest_handle_(engine_.open_file(dest, io_engine::READ_WRITE)),
genkey_count_(0)
{
}
void
copier::issue(copy_op const &op)
{
auto data = pool_.alloc();
if (!data) {
wait_();
data = pool_.alloc();
if (!data)
// Shouldn't get here
throw runtime_error("couldn't allocate buffer");
}
copy_job job(op, *data);
job.op.read_complete = job.op.write_complete = false;
unsigned key = genkey(); // used as context for the io_engine
engine_.issue_io(src_handle_,
io_engine::READ,
to_sector(op.src_b),
to_sector(op.src_e),
*data,
key);
jobs_.insert(make_pair(key, job));
}
unsigned
copier::nr_pending() const
{
return jobs_.size() + complete_.size();
}
boost::optional<copy_op>
copier::wait()
{
while (complete_.empty() && !jobs_.empty())
wait_();
if (complete_.empty())
return optional<copy_op>();
else {
auto op = complete_.front();
complete_.pop_front();
return optional<copy_op>(op);
}
}
void
copier::wait_()
{
auto p = engine_.wait();
auto it = jobs_.find(p.second);
if (it == jobs_.end())
throw runtime_error("Internal error. Lost track of copy job.");
copy_job j = it->second;
if (!p.first) {
// IO was unsuccessful
jobs_.erase(it);
complete(j);
return;
}
// IO was successful
if (!j.op.read_complete) {
j.op.read_complete = true;
engine_.issue_io(dest_handle_,
io_engine::WRITE,
to_sector(j.op.dest_b),
to_sector(j.op.dest_b + (j.op.src_e - j.op.src_b)),
j.data,
it->first);
} else {
j.op.write_complete = true;
jobs_.erase(it);
complete(j);
}
}
void
copier::complete(copy_job const &j)
{
pool_.free(j.data);
complete_.push_back(j.op);
}
sector_t
copier::to_sector(block_address b) const
{
return b * block_size_;
}
unsigned
copier::genkey()
{
return genkey_count_++;
}
//----------------------------------------------------------------

View File

@ -1,27 +1,75 @@
#ifndef BLOCK_CACHE_COPIER_H
#define BLOCK_CACHE_COPIER_H
#include "block_cache.h"
#include "block-cache/io_engine.h"
#include "block-cache/mem_pool.h"
#include <string>
#include <list>
#include <map>
//----------------------------------------------------------------
namespace bcache {
using block_address = uint64_t;
struct copy_op {
copy_op()
: read_complete(false),
write_complete(false) {
}
block_address src_b, src_e;
block_address dest_b;
bool read_complete;
bool write_complete;
};
class copy_job {
public:
copy_job(copy_op const &op_, void *data_)
: op(op_), data(data_) {
}
copy_op op;
void *data;
};
class copier {
public:
// block size in sectors
copier(std::string const &src, std::string const &dest,
unsigned block_size);
~copier();
sector_t block_size, size_t mem);
// Returns the number of sectors copied
unsigned copy(block_address from, block_address to);
sector_t get_block_size() const {
return block_size_;
}
unsigned get_block_size() const;
// Blocks if out of memory.
void issue(copy_op const &op);
unsigned nr_pending() const;
boost::optional<copy_op> wait();
private:
unsigned block_size_;
void wait_();
void complete(copy_job const &j);
sector_t to_sector(block_address b) const;
unsigned genkey();
mempool pool_;
sector_t block_size_;
unsigned nr_blocks_;
io_engine engine_;
io_engine::handle src_handle_;
io_engine::handle dest_handle_;
unsigned genkey_count_;
using job_map = std::map<unsigned, copy_job>;
using op_list = std::list<copy_op>;
job_map jobs_;
op_list complete_;
};
}

175
block-cache/io_engine.cc Normal file
View File

@ -0,0 +1,175 @@
#include "base/container_of.h"
#include "block-cache/io_engine.h"
#include <errno.h>
#include <fcntl.h>
#include <sstream>
#include <stdexcept>
#include <sys/stat.h>
#include <sys/types.h>
using namespace bcache;
using namespace boost;
using namespace std;
#define SECTOR_SHIFT 9
//----------------------------------------------------------------
control_block_set::control_block_set(unsigned nr)
: cbs_(nr)
{
for (auto i = 0u; i < nr; i++)
free_cbs_.insert(i);
}
iocb *
control_block_set::alloc(unsigned context)
{
if (free_cbs_.empty())
return nullptr;
auto it = free_cbs_.begin();
cblock &cb = cbs_[*it];
cb.context = context;
free_cbs_.erase(it);
return &cb.cb;
}
void
control_block_set::free(iocb *cb)
{
cblock *b = base::container_of(cb, &cblock::cb);
unsigned index = b - &cbs_[0];
free_cbs_.insert(index);
}
unsigned
control_block_set::context(iocb *cb) const
{
cblock *b = base::container_of(cb, &cblock::cb);
return b->context;
}
//----------------------------------------------------------------
io_engine::io_engine(unsigned max_io)
: aio_context_(0),
cbs_(max_io),
events_(max_io)
{
int r = io_setup(max_io, &aio_context_);
if (r < 0)
throw runtime_error("io_setup failed");
}
io_engine::~io_engine()
{
io_destroy(aio_context_);
}
io_engine::handle
io_engine:: open_file(std::string const &path, mode m)
{
int flags = (m == READ_ONLY) ? O_RDONLY : O_RDWR;
int fd = ::open(path.c_str(), O_DIRECT | flags);
if (fd < 0) {
ostringstream out;
out << "unable to open '" << path << "'";
throw runtime_error(out.str());
}
descriptors_.push_back(base::unique_fd(fd));
return static_cast<handle>(fd);
}
void
io_engine::close_file(handle h)
{
for (auto it = descriptors_.begin(); it != descriptors_.end(); ++it) {
unsigned it_h = it->get();
if (it_h == h) {
descriptors_.erase(it);
return;
}
}
ostringstream out;
out << "unknown descriptor (" << h << ")";
throw runtime_error(out.str());
}
bool
io_engine::issue_io(handle h, dir d, sector_t b, sector_t e, void *data, unsigned context)
{
auto cb = cbs_.alloc(context);
if (!cb)
return false;
memset(cb, 0, sizeof(*cb));
cb->aio_fildes = static_cast<int>(h);
cb->u.c.buf = data;
cb->u.c.offset = b << SECTOR_SHIFT;
cb->u.c.nbytes = (e - b) << SECTOR_SHIFT;
cb->aio_lio_opcode = (d == READ) ? IO_CMD_PREAD : IO_CMD_PWRITE;
int r = io_submit(aio_context_, 1, &cb);
if (r != 1) {
std::ostringstream out;
out << "couldn't issue "
<< ((d == READ) ? "READ" : "WRITE")
<< " io: io_submit ";
if (r < 0)
out << "failed with " << r;
else
out << "succeeded, but queued no io";
throw std::runtime_error(out.str());
}
return true;
}
std::pair<bool, io_engine::handle>
io_engine::wait()
{
int r;
unsigned i;
r = io_getevents(aio_context_, 1, events_.size(), &events_[0], NULL);
if (r < 0) {
std::ostringstream out;
out << "io_getevents failed: " << r;
throw std::runtime_error(out.str());
}
for (i = 0; i < static_cast<unsigned>(r); i++) {
io_event const &e = events_[i];
iocb *cb = reinterpret_cast<iocb *>(e.obj);
unsigned context = cbs_.context(cb);
cbs_.free(cb);
if (e.res == cb->u.c.nbytes)
return make_pair(true, context);
else {
std::ostringstream out;
out << "io failed"
<< ", e.res = " << e.res
<< ", e.res2 = " << e.res2
<< ", offset = " << cb->u.c.offset
<< ", nbytes = " << cb->u.c.nbytes;
return make_pair(false, context);
}
}
// shouldn't get here
return make_pair(false, 0);
}
//----------------------------------------------------------------

82
block-cache/io_engine.h Normal file
View File

@ -0,0 +1,82 @@
#ifndef BLOCK_CACHE_IO_ENGINE_H
#define BLOCK_CACHE_IO_ENGINE_H
#include "base/unique_handle.h"
#include <boost/optional.hpp>
#include <ctype.h>
#include <libaio.h>
#include <set>
#include <string>
//----------------------------------------------------------------
namespace bcache {
using sector_t = uint64_t;
//----------------
class control_block_set {
public:
control_block_set(unsigned nr);
iocb *alloc(unsigned context);
void free(iocb *);
unsigned context(iocb *) const;
private:
struct cblock {
unsigned context;
struct iocb cb;
};
std::set<unsigned> free_cbs_;
std::vector<cblock> cbs_;
};
//----------------
class io_engine {
public:
enum mode {
READ_ONLY,
READ_WRITE
};
enum dir {
READ,
WRITE
};
// max_io is the maximum nr of concurrent ios expected
io_engine(unsigned max_io);
~io_engine();
using handle = unsigned;
handle open_file(std::string const &path, mode m);
void close_file(handle h);
// returns false if there are insufficient resources to
// queue the IO
bool issue_io(handle h, dir d, sector_t b, sector_t e, void *data, unsigned context);
// returns (success, context)
std::pair<bool, unsigned> wait();
private:
std::list<base::unique_fd> descriptors_;
io_context_t aio_context_;
control_block_set cbs_;
std::vector<io_event> events_;
io_engine(io_engine const &) = delete;
io_engine &operator =(io_engine const &) = delete;
};
}
//----------------------------------------------------------------
#endif

57
block-cache/mem_pool.cc Normal file
View File

@ -0,0 +1,57 @@
#include "block-cache/mem_pool.h"
#include <stdlib.h>
using namespace bcache;
using namespace boost;
using namespace mempool_detail;
#define PAGE_SIZE 4096
//----------------------------------------------------------------
mempool::mempool(size_t block_size, size_t total_mem)
{
mem_ = alloc_aligned(total_mem, PAGE_SIZE);
unsigned nr_blocks = total_mem / block_size;
for (auto i = 0u; i < nr_blocks; i++)
free(static_cast<unsigned char *>(mem_) + (block_size * i));
}
mempool::~mempool()
{
::free(mem_);
}
boost::optional<void *>
mempool::alloc()
{
if (free_.empty())
return optional<void *>();
mempool_detail::alloc_block &b = free_.front();
free_.pop_front();
return optional<void *>(reinterpret_cast<void *>(&b));
}
void
mempool::free(void *data)
{
mempool_detail::alloc_block *b = reinterpret_cast<mempool_detail::alloc_block *>(data);
free_.push_front(*b);
}
void *
mempool::alloc_aligned(size_t len, size_t alignment)
{
void *result = NULL;
int r = posix_memalign(&result, alignment, len);
if (r)
return NULL;
return result;
}
//----------------------------------------------------------------

45
block-cache/mem_pool.h Normal file
View File

@ -0,0 +1,45 @@
#ifndef BLOCK_CACHE_MEM_POOL_H
#define BLOCK_CACHE_MEM_POOL_H
#include <boost/intrusive/list.hpp>
#include <boost/optional.hpp>
#include <list>
namespace bi = boost::intrusive;
//----------------------------------------------------------------
namespace bcache {
// FIXME: move to base?
namespace mempool_detail {
struct alloc_block : public bi::list_base_hook<> {
};
};
class mempool {
public:
mempool(size_t block_size, size_t total_mem);
~mempool();
boost::optional<void *> alloc();
void free(void *data);
private:
static void *alloc_aligned(size_t len, size_t alignment);
using block_list = bi::list<mempool_detail::alloc_block>;
void *mem_;
block_list free_;
//----------------
mempool(mempool const &) = delete;
mempool &operator =(mempool const &) = delete;
};
}
//----------------------------------------------------------------
#endif

View File

@ -19,8 +19,13 @@ using namespace std;
namespace {
struct flags {
flags()
: cache_size(1024 * 1024 * 128) {
}
using maybe_string = boost::optional<string>;
size_t cache_size;
maybe_string metadata_dev;
maybe_string origin_dev;
maybe_string fast_dev;
@ -41,13 +46,22 @@ namespace {
if (only_dirty_ && !(m.flags_ & M_DIRTY))
return;
auto sectors_copied = copier_.copy(cblock, m.oblock_);
copy_op cop;
cop.src_b = cblock;
cop.src_e = cblock + 1ull;
cop.dest_b = m.oblock_;
// blocks
copier_.issue(cop);
stats_.blocks_issued++;
#if 0
if (sectors_copied < block_size_) {
stats_.blocks_failed++;
stats_.sectors_failed += block_size_ - sectors_copied;
}
#endif
}
struct copy_stats {
@ -107,7 +121,8 @@ namespace {
block_manager<>::ptr bm = open_bm(*f.metadata_dev, block_manager<>::READ_ONLY);
metadata md(bm, metadata::OPEN);
copier c(*f.fast_dev, *f.origin_dev, md.sb_.data_block_size);
copier c(*f.fast_dev, *f.origin_dev,
md.sb_.data_block_size, f.cache_size);
copy_visitor cv(c, clean_shutdown(md));
ignore_damage_visitor dv;
walk_mapping_array(*md.mappings_, cv, dv);