some streamlining of the block interface

This commit is contained in:
Joe Thornber 2011-07-14 14:05:07 +01:00
parent e77ed189bb
commit a0cae447f6
3 changed files with 260 additions and 77 deletions

74
block.h
View File

@ -9,10 +9,27 @@
#include <boost/optional.hpp> #include <boost/optional.hpp>
#include <boost/shared_ptr.hpp> #include <boost/shared_ptr.hpp>
#include <string.h>
//---------------------------------------------------------------- //----------------------------------------------------------------
namespace persistent_data { namespace persistent_data {
class count_adjuster {
public:
count_adjuster(unsigned &c)
: c_(c) {
c_++;
}
~count_adjuster() {
c_--;
}
private:
unsigned &c_;
};
typedef uint64_t block_address; typedef uint64_t block_address;
template <uint32_t BlockSize> template <uint32_t BlockSize>
@ -43,16 +60,25 @@ namespace persistent_data {
typedef boost::optional<typename validator::ptr> maybe_validator; typedef boost::optional<typename validator::ptr> maybe_validator;
block(block_address location, block(block_address location,
const_buffer &data,
unsigned &count,
unsigned &type_count,
bool is_superblock = false,
maybe_validator v = maybe_validator()) maybe_validator v = maybe_validator())
: location_(location), : location_(location),
adjuster_(count),
type_adjuster_(type_count),
validator_(v), validator_(v),
initialised_(false) { is_superblock_(is_superblock) {
::memcpy(data_, data, sizeof(data));
} }
block_address location_; block_address location_;
count_adjuster adjuster_;
count_adjuster type_adjuster_;
buffer data_; buffer data_;
maybe_validator validator_; maybe_validator validator_;
bool initialised_; bool is_superblock_;
}; };
class read_ref { class read_ref {
@ -64,6 +90,7 @@ namespace persistent_data {
const_buffer &data() const; const_buffer &data() const;
protected: protected:
friend class block_manager;
typename block::ptr block_; typename block::ptr block_;
}; };
@ -93,33 +120,56 @@ namespace persistent_data {
// Validator variants // Validator variants
read_ref read_ref
read_lock(block_address location, read_lock(block_address location,
typename validator::ptr const &v) const; typename validator::ptr v) const;
boost::optional<read_ref> boost::optional<read_ref>
read_try_lock(block_address location, read_try_lock(block_address location,
typename validator::ptr const &v) const; typename validator::ptr v) const;
write_ref write_ref
write_lock(block_address location, write_lock(block_address location,
typename validator::ptr const &v); typename validator::ptr v);
write_ref write_ref
write_lock_zero(block_address location, write_lock_zero(block_address location,
typename validator::ptr const &v); typename validator::ptr v);
// Use this to commit changes // The super block is the one that should be written last.
void flush(write_ref super_block); // Unlocking this block triggers the following events:
//
// i) synchronous write of all dirty blocks _except_ the
// superblock.
//
// ii) synchronous write of superblock
//
// If any locks are held at the time of the superblock
// being unlocked then an exception will be thrown.
write_ref superblock(block_address b);
write_ref superblock_zero(block_address b);
write_ref superblock(block_address b,
typename validator::ptr v);
write_ref superblock_zero(block_address b,
typename validator::ptr v);
// If you aren't using a superblock, then this flush method
// will write all dirty data. Throws if any locks are
// held.
void flush();
private: private:
void check(block_address b) const; void check(block_address b) const;
void read_block(block &b) const; void read_buffer(block_address location, buffer &buf) const;
void write_block(block const &b); void write_buffer(block_address location, const_buffer &buf);
void zero_block(block &b); void zero_buffer(buffer &buf) const;
void write_and_release(block *b); void read_release(block *b) const;
void write_release(block *b);
int fd_; int fd_;
block_address nr_blocks_; block_address nr_blocks_;
mutable unsigned lock_count_;
mutable unsigned superblock_count_;
mutable unsigned ordinary_count_;
}; };
} }

168
block.tcc
View File

@ -52,7 +52,10 @@ block_manager<BlockSize>::write_ref::data()
template <uint32_t BlockSize> template <uint32_t BlockSize>
block_manager<BlockSize>::block_manager(std::string const &path, block_address nr_blocks) block_manager<BlockSize>::block_manager(std::string const &path, block_address nr_blocks)
: nr_blocks_(nr_blocks) : nr_blocks_(nr_blocks),
lock_count_(0),
superblock_count_(0),
ordinary_count_(0)
{ {
fd_ = ::open(path.c_str(), O_RDWR | O_CREAT, 0666); fd_ = ::open(path.c_str(), O_RDWR | O_CREAT, 0666);
if (fd_ < 0) if (fd_ < 0)
@ -71,9 +74,12 @@ block_manager<BlockSize>::read_lock(block_address location) const
{ {
check(location); check(location);
typename block::ptr b(new block(location)); buffer buf;
read_block(*b); read_buffer(location, buf);
return read_ref(b);
return read_ref(
typename block::ptr(
new block(location, buf, lock_count_, ordinary_count_)));
} }
template <uint32_t BlockSize> template <uint32_t BlockSize>
@ -89,9 +95,12 @@ block_manager<BlockSize>::write_lock(block_address location)
{ {
check(location); check(location);
typename block::ptr b(new block(location), bind(&block_manager::write_and_release, this, _1)); buffer buf;
read_block(*b); read_buffer(location, buf);
return write_ref(b); return write_ref(
typename block::ptr(
new block(location, buf, lock_count_, ordinary_count_),
bind(&block_manager::write_release, this, _1)));
} }
template <uint32_t BlockSize> template <uint32_t BlockSize>
@ -100,27 +109,30 @@ block_manager<BlockSize>::write_lock_zero(block_address location)
{ {
check(location); check(location);
typename block::ptr b(new block(location), bind(&block_manager<BlockSize>::write_and_release, this, _1)); buffer buf;
zero_block(*b); zero_buffer(buf);
typename block::ptr b(new block(location, buf, lock_count_, ordinary_count_),
bind(&block_manager<BlockSize>::write_release, this, _1));
return write_ref(b); return write_ref(b);
} }
template <uint32_t BlockSize> template <uint32_t BlockSize>
typename block_manager<BlockSize>::read_ref typename block_manager<BlockSize>::read_ref
block_manager<BlockSize>::read_lock(block_address location, block_manager<BlockSize>::read_lock(block_address location,
typename block_manager<BlockSize>::validator::ptr const &v) const typename block_manager<BlockSize>::validator::ptr v) const
{ {
check(location); check(location);
typename block::ptr b(new block(location, v)); buffer buf;
read_block(*b); read_buffer(location, buf);
typename block::ptr b(new block(location, buf, lock_count_, ordinary_count_, false, v));
return read_ref(b); return read_ref(b);
} }
template <uint32_t BlockSize> template <uint32_t BlockSize>
optional<typename block_manager<BlockSize>::read_ref> optional<typename block_manager<BlockSize>::read_ref>
block_manager<BlockSize>::read_try_lock(block_address location, block_manager<BlockSize>::read_try_lock(block_address location,
typename block_manager<BlockSize>::validator::ptr const &v) const typename block_manager<BlockSize>::validator::ptr v) const
{ {
return read_lock(location, v); return read_lock(location, v);
} }
@ -128,51 +140,118 @@ block_manager<BlockSize>::read_try_lock(block_address location,
template <uint32_t BlockSize> template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::write_lock(block_address location, block_manager<BlockSize>::write_lock(block_address location,
typename block_manager<BlockSize>::validator::ptr const &v) typename block_manager<BlockSize>::validator::ptr v)
{ {
check(location); check(location);
typename block::ptr b(new block(location, v), buffer buf;
bind(&block_manager::write_and_release, this, _1)); read_buffer(location, buf);
read_block(*b); typename block::ptr b(new block(location, buf, lock_count_, ordinary_count_, false, v),
bind(&block_manager::write_release, this, _1));
return write_ref(b); return write_ref(b);
} }
template <uint32_t BlockSize> template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::write_lock_zero(block_address location, block_manager<BlockSize>::write_lock_zero(block_address location,
typename block_manager<BlockSize>::validator::ptr const &v) typename block_manager<BlockSize>::validator::ptr v)
{ {
check(location); check(location);
typename block::ptr b(new block(location, v), buffer buf;
bind(&block_manager::write_and_release, this, _1)); zero_buffer(buf);
zero_block(*b); typename block::ptr b(new block(location, buf, lock_count_, ordinary_count_, false, v),
bind(&block_manager::write_release, this, _1));
return write_ref(b);
}
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::superblock(block_address location)
{
check(location);
if (superblock_count_ > 0)
throw runtime_error("already have superblock");
buffer buf;
read_buffer(location, buf);
typename block::ptr b(new block(location, buf, lock_count_, superblock_count_, true),
bind(&block_manager::write_release, this, _1));
return write_ref(b);
}
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::superblock_zero(block_address location)
{
check(location);
if (superblock_count_ > 0)
throw runtime_error("already have superblock");
buffer buf;
zero_buffer(buf);
typename block::ptr b(new block(location, buf, lock_count_, superblock_count_, true),
bind(&block_manager::write_release, this, _1));
return write_ref(b);
}
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::superblock(block_address location,
typename block_manager<BlockSize>::validator::ptr v)
{
if (superblock_count_ > 0)
throw runtime_error("already have superblock");
check(location);
buffer buf;
read_buffer(location, buf);
typename block::ptr b(new block(location, buf, lock_count_, superblock_count_, true, v),
bind(&block_manager::write_release, this, _1));
return write_ref(b);
}
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::superblock_zero(block_address location,
typename block_manager<BlockSize>::validator::ptr v)
{
if (superblock_count_ > 0)
throw runtime_error("already have superblock");
check(location);
buffer buf;
zero_buffer(buf);
typename block::ptr b(new block(location, buf, lock_count_, superblock_count_, true, v),
bind(&block_manager::write_release, this, _1));
return write_ref(b); return write_ref(b);
} }
template <uint32_t BlockSize> template <uint32_t BlockSize>
void void
block_manager<BlockSize>::flush(block_manager<BlockSize>::write_ref super_block) block_manager<BlockSize>::flush()
{ {
// FIXME: the caller still holds the write_ref, so the superblock if (lock_count_ > 0)
// will get written twice throw runtime_error("asked to flush while locks are still held");
write_block(super_block);
::fsync(fd_); ::fsync(fd_);
} }
template <uint32_t BlockSize> template <uint32_t BlockSize>
void void
block_manager<BlockSize>::read_block(block &b) const block_manager<BlockSize>::read_buffer(block_address b, block_manager<BlockSize>::buffer &buffer) const
{ {
off_t r; off_t r;
r = ::lseek(fd_, BlockSize * b.location_, SEEK_SET); r = ::lseek(fd_, BlockSize * b, SEEK_SET);
if (r == (off_t) -1) if (r == (off_t) -1)
throw std::runtime_error("lseek failed"); throw std::runtime_error("lseek failed");
ssize_t n; ssize_t n;
size_t remaining = BlockSize; size_t remaining = BlockSize;
unsigned char *buf = b.data_; unsigned char *buf = buffer;
do { do {
n = ::read(fd_, buf, remaining); n = ::read(fd_, buf, remaining);
if (n > 0) { if (n > 0) {
@ -183,22 +262,20 @@ block_manager<BlockSize>::read_block(block &b) const
if (n < 0) if (n < 0)
throw std::runtime_error("read failed"); throw std::runtime_error("read failed");
b.initialised_ = true;
} }
template <uint32_t BlockSize> template <uint32_t BlockSize>
void void
block_manager<BlockSize>::write_block(block const &b) block_manager<BlockSize>::write_buffer(block_address b, block_manager<BlockSize>::const_buffer &buffer)
{ {
off_t r; off_t r;
r = ::lseek(fd_, BlockSize * b.location_, SEEK_SET); r = ::lseek(fd_, BlockSize * b, SEEK_SET);
if (r == (off_t) -1) if (r == (off_t) -1)
throw std::runtime_error("lseek failed"); throw std::runtime_error("lseek failed");
ssize_t n; ssize_t n;
size_t remaining = BlockSize; size_t remaining = BlockSize;
unsigned char const *buf = b.data_; unsigned char const *buf = buffer;
do { do {
n = ::write(fd_, buf, remaining); n = ::write(fd_, buf, remaining);
if (n > 0) { if (n > 0) {
@ -213,23 +290,32 @@ block_manager<BlockSize>::write_block(block const &b)
template <uint32_t BlockSize> template <uint32_t BlockSize>
void void
block_manager<BlockSize>::zero_block(block &b) block_manager<BlockSize>::zero_buffer(block_manager<BlockSize>::buffer &buffer) const
{ {
memset(b.data_, 0, BlockSize); memset(buffer, 0, BlockSize);
b.initialised_ = true; }
// FIXME: we don't need this anymore
template <uint32_t BlockSize>
void
block_manager<BlockSize>::read_release(block *b) const
{
delete b;
} }
template <uint32_t BlockSize> template <uint32_t BlockSize>
void void
block_manager<BlockSize>::write_and_release(block *b) block_manager<BlockSize>::write_release(block *b)
{ {
if (b->initialised_) { if (b->is_superblock_) {
if (lock_count_ != 1)
throw runtime_error("superblock isn't the last block");
}
if (b->validator_) if (b->validator_)
(*b->validator_)->prepare(*b); (*b->validator_)->prepare(*b);
write_block(*b); write_buffer(b->location_, b->data_);
}
delete b; delete b;
} }

View File

@ -1,7 +1,5 @@
#include "block.h" #include "block.h"
#include <iostream>
#define BOOST_TEST_MODULE BlockManagerTests #define BOOST_TEST_MODULE BlockManagerTests
#include <boost/test/included/unit_test.hpp> #include <boost/test/included/unit_test.hpp>
@ -10,6 +8,10 @@ using namespace std;
//---------------------------------------------------------------- //----------------------------------------------------------------
namespace { namespace {
block_manager<4096>::ptr create_bm(block_address nr = 1024) {
return block_manager<4096>::ptr(new block_manager<4096>("./test.data", nr));
}
template <uint32_t BlockSize> template <uint32_t BlockSize>
void check_all_bytes(typename block_manager<BlockSize>::read_ref const &rr, int v) { void check_all_bytes(typename block_manager<BlockSize>::read_ref const &rr, int v) {
auto data = rr.data(); auto data = rr.data();
@ -41,49 +43,45 @@ BOOST_AUTO_TEST_CASE(bad_path)
BOOST_AUTO_TEST_CASE(out_of_range_access) BOOST_AUTO_TEST_CASE(out_of_range_access)
{ {
block_manager<4096> bm("./test.data", 1024); auto bm = create_bm(1024);
BOOST_CHECK_THROW(bm.read_lock(1024), runtime_error); BOOST_CHECK_THROW(bm->read_lock(1024), runtime_error);
} }
BOOST_AUTO_TEST_CASE(read_lock_all_blocks) BOOST_AUTO_TEST_CASE(read_lock_all_blocks)
{ {
block_address const nr = 64; block_address const nr = 64;
block_manager<4096> bm("./test.data", nr); auto bm = create_bm(nr);
for (unsigned i = 0; i < nr; i++) for (unsigned i = 0; i < nr; i++)
bm.read_lock(i); bm->read_lock(i);
} }
BOOST_AUTO_TEST_CASE(write_lock_all_blocks) BOOST_AUTO_TEST_CASE(write_lock_all_blocks)
{ {
block_address const nr = 64; block_address const nr = 64;
block_manager<4096> bm("./test.data", nr); auto bm = create_bm(nr);
for (unsigned i = 0; i < nr; i++) for (unsigned i = 0; i < nr; i++)
bm.write_lock(i); bm->write_lock(i);
} }
BOOST_AUTO_TEST_CASE(writes_persist) BOOST_AUTO_TEST_CASE(writes_persist)
{ {
block_address const nr = 64; block_address const nr = 64;
block_manager<4096> bm("./test.data", nr); auto bm = create_bm(nr);
for (unsigned i = 0; i < nr; i++) { for (unsigned i = 0; i < nr; i++) {
auto wr = bm.write_lock(i); auto wr = bm->write_lock(i);
::memset(wr.data(), i, 4096); ::memset(wr.data(), i, 4096);
} }
for (unsigned i = 0; i < nr; i++) { for (unsigned i = 0; i < nr; i++) {
auto rr = bm.read_lock(i); auto rr = bm->read_lock(i);
check_all_bytes<4096>(rr, i % 256); check_all_bytes<4096>(rr, i % 256);
} }
} }
BOOST_AUTO_TEST_CASE(write_lock_zero_zeroes) BOOST_AUTO_TEST_CASE(write_lock_zero_zeroes)
{ {
block_address const nr = 64; auto bm = create_bm(64);
block_manager<4096> bm("./test.data", nr); check_all_bytes<4096>(bm->write_lock_zero(23), 0);
check_all_bytes<4096>(bm.write_lock_zero(23), 0);
} }
BOOST_AUTO_TEST_CASE(different_block_sizes) BOOST_AUTO_TEST_CASE(different_block_sizes)
@ -105,21 +103,70 @@ BOOST_AUTO_TEST_CASE(different_block_sizes)
BOOST_AUTO_TEST_CASE(read_validator_works) BOOST_AUTO_TEST_CASE(read_validator_works)
{ {
typename block_manager<4096>::block_manager::validator::ptr v(new zero_validator<4096>()); typename block_manager<4096>::block_manager::validator::ptr v(new zero_validator<4096>());
block_manager<4096> bm("./test.data", 64); auto bm = create_bm(64);
bm.write_lock_zero(0); bm->write_lock_zero(0);
bm.read_lock(0, v); bm->read_lock(0, v);
} }
BOOST_AUTO_TEST_CASE(write_validator_works) BOOST_AUTO_TEST_CASE(write_validator_works)
{ {
auto bm = create_bm(64);
typename block_manager<4096>::block_manager::validator::ptr v(new zero_validator<4096>()); typename block_manager<4096>::block_manager::validator::ptr v(new zero_validator<4096>());
block_manager<4096> bm("./test.data", 64);
{ {
auto wr = bm.write_lock(0, v); auto wr = bm->write_lock(0, v);
::memset(wr.data(), 23, sizeof(wr.data())); ::memset(wr.data(), 23, sizeof(wr.data()));
} }
check_all_bytes<4096>(bm.read_lock(0), 0); check_all_bytes<4096>(bm->read_lock(0), 0);
} }
BOOST_AUTO_TEST_CASE(cannot_have_two_superblocks)
{
auto bm = create_bm();
auto superblock = bm->superblock(0);
BOOST_CHECK_THROW(bm->superblock(1), runtime_error);
}
BOOST_AUTO_TEST_CASE(can_have_subsequent_superblocks)
{
auto bm = create_bm();
{ auto superblock = bm->superblock(0); }
{ auto superblock = bm->superblock(0); }
}
BOOST_AUTO_TEST_CASE(superblocks_can_change_address)
{
auto bm = create_bm();
{ auto superblock = bm->superblock(0); }
{ auto superblock = bm->superblock(1); }
}
BOOST_AUTO_TEST_CASE(superblock_must_be_last)
{
auto bm = create_bm();
{
auto rr = bm->read_lock(63);
{
BOOST_CHECK_THROW(bm->superblock(0), runtime_error);
}
}
}
BOOST_AUTO_TEST_CASE(references_can_be_copied)
{
auto bm = create_bm();
auto wr1 = bm->write_lock(0);
auto wr2(wr1);
}
BOOST_AUTO_TEST_CASE(flush_throws_if_held_locks)
{
auto bm = create_bm();
auto wr = bm->write_lock(0);
BOOST_CHECK_THROW(bm->flush(), runtime_error);
}
// cannot write lock the same block more than once
//---------------------------------------------------------------- //----------------------------------------------------------------