factor out index_store abstraction for disk space maps

This commit is contained in:
Joe Thornber 2011-11-10 14:43:15 +00:00
parent 9d0686fce7
commit 0bd9aa33bf
3 changed files with 208 additions and 146 deletions

View File

@ -38,7 +38,7 @@ test-programs: $(TEST_PROGRAMS)
.SUFFIXES: .cc .o .d
%.d: %.cc
g++ -MM -MT $(subst .cc,.o,$<) $(CPPFLAGS) $< > $@.$$$$; \
g++ -MM -MT $(subst .cc,.o,$<) $(CPPFLAGS) $< > $@.$$$$; \
sed 's,\([^ :]*\)\.o[ :]*,\1.o $@ : ,g' < $@.$$$$ > $@; \
rm -f $@.$$$$

View File

@ -197,24 +197,41 @@ namespace {
}
};
class index_store {
public:
typedef boost::shared_ptr<index_store> ptr;
virtual void resize(block_address nr_indexes) = 0;
virtual index_entry find_ie(block_address b) const = 0;
virtual void save_ie(block_address b, struct index_entry ie) = 0;
virtual void commit_ies() = 0;
virtual ptr clone() const = 0;
virtual block_address get_root() const = 0;
virtual void check(block_counter &counter, block_address nr_index_entries) const = 0;
};
unsigned const ENTRIES_PER_BLOCK = (MD_BLOCK_SIZE - sizeof(bitmap_header)) * 4;
class sm_disk_base : public checked_space_map {
public:
typedef boost::shared_ptr<sm_disk_base> ptr;
typedef transaction_manager::read_ref read_ref;
typedef transaction_manager::write_ref write_ref;
sm_disk_base(transaction_manager::ptr tm)
sm_disk_base(index_store::ptr indexes,
transaction_manager::ptr tm)
: tm_(tm),
entries_per_block_((MD_BLOCK_SIZE - sizeof(bitmap_header)) * 4),
indexes_(indexes),
nr_blocks_(0),
nr_allocated_(0),
ref_counts_(tm_, ref_count_traits::ref_counter()) {
}
sm_disk_base(transaction_manager::ptr tm,
sm_disk_base(index_store::ptr indexes,
transaction_manager::ptr tm,
sm_root const &root)
: tm_(tm),
entries_per_block_((MD_BLOCK_SIZE - sizeof(bitmap_header)) * 4),
indexes_(indexes),
nr_blocks_(root.nr_blocks_),
nr_allocated_(root.nr_allocated_),
ref_counts_(tm_, root.ref_count_root_, ref_count_traits::ref_counter()) {
@ -276,18 +293,18 @@ namespace {
block_address new_block() {
// FIXME: silly to always start searching from the
// beginning.
block_address nr_indexes = div_up<block_address>(nr_blocks_, entries_per_block_);
block_address nr_indexes = div_up<block_address>(nr_blocks_, ENTRIES_PER_BLOCK);
for (block_address index = 0; index < nr_indexes; index++) {
index_entry ie = find_ie(index);
bitmap bm(tm_, ie);
optional<unsigned> maybe_b = bm.find_free(0, (index == nr_indexes - 1) ?
nr_blocks_ % entries_per_block_ : entries_per_block_);
nr_blocks_ % ENTRIES_PER_BLOCK : ENTRIES_PER_BLOCK);
if (maybe_b) {
block_address b = *maybe_b;
save_ie(index, bm.get_ie());
nr_allocated_++;
b = (index * entries_per_block_) + b;
b = (index * ENTRIES_PER_BLOCK) + b;
assert(get_count(b) == 1);
return b;
}
@ -303,15 +320,17 @@ namespace {
virtual void extend(block_address extra_blocks) {
block_address nr_blocks = nr_blocks_ + extra_blocks;
block_address bitmap_count = div_up<block_address>(nr_blocks, entries_per_block_);
block_address old_bitmap_count = div_up<block_address>(nr_blocks_, entries_per_block_);
block_address bitmap_count = div_up<block_address>(nr_blocks, ENTRIES_PER_BLOCK);
block_address old_bitmap_count = div_up<block_address>(nr_blocks_, ENTRIES_PER_BLOCK);
indexes_->resize(bitmap_count);
for (block_address i = old_bitmap_count; i < bitmap_count; i++) {
write_ref wr = tm_->new_block(bitmap_validator());
index_entry ie;
ie.blocknr_ = wr.get_location();
ie.nr_free_ = i == (bitmap_count - 1) ?
(nr_blocks % entries_per_block_) : entries_per_block_;
(nr_blocks % ENTRIES_PER_BLOCK) : ENTRIES_PER_BLOCK;
ie.none_free_before_ = 0;
save_ie(i, ie);
@ -323,6 +342,9 @@ namespace {
virtual void check(block_counter &counter) const {
ref_count_checker::ptr v(new ref_count_checker(counter));
ref_counts_.visit(v);
block_address nr_entries = div_up<block_address>(get_nr_blocks(), ENTRIES_PER_BLOCK);
indexes_->check(counter, nr_entries);
}
struct look_aside_iterator : public iterator {
@ -343,16 +365,36 @@ namespace {
virtual void iterate(iterator &it) const {
look_aside_iterator wrapper(*this, it);
unsigned nr_indexes = div_up<block_address>(nr_blocks_, entries_per_block_);
unsigned nr_indexes = div_up<block_address>(nr_blocks_, ENTRIES_PER_BLOCK);
for (unsigned i = 0; i < nr_indexes; i++) {
unsigned hi = (i == nr_indexes - 1) ? (nr_blocks_ % entries_per_block_) : entries_per_block_;
unsigned hi = (i == nr_indexes - 1) ? (nr_blocks_ % ENTRIES_PER_BLOCK) : ENTRIES_PER_BLOCK;
index_entry ie = find_ie(i);
bitmap bm(tm_, ie);
bm.iterate(i * entries_per_block_, hi, wrapper);
bm.iterate(i * ENTRIES_PER_BLOCK, hi, wrapper);
}
}
size_t root_size() {
return sizeof(sm_root_disk);
}
void copy_root(void *dest, size_t len) {
sm_root_disk d;
sm_root v;
if (len < sizeof(d))
throw runtime_error("root too small");
v.nr_blocks_ = sm_disk_base::get_nr_blocks();
v.nr_allocated_ = sm_disk_base::get_nr_allocated();
v.bitmap_root_ = get_index_store()->get_root();
v.ref_count_root_ = sm_disk_base::get_ref_count_root();
sm_root_traits::pack(v, d);
::memcpy(dest, &d, sizeof(d));
}
protected:
transaction_manager::ptr get_tm() const {
return tm_;
@ -366,29 +408,38 @@ namespace {
return ref_counts_.get_root();
}
unsigned get_entries_per_block() const {
return entries_per_block_;
index_store::ptr get_index_store() const {
return indexes_;
}
private:
virtual index_entry find_ie(block_address b) const = 0;
virtual void save_ie(block_address b, struct index_entry ie) = 0;
virtual void commit_ies() = 0;
// FIXME: remove these, they're just for the transistion
index_entry find_ie(block_address b) const {
return indexes_->find_ie(b);
}
void save_ie(block_address b, struct index_entry ie) {
return indexes_->save_ie(b, ie);
}
void commit_ies() {
return indexes_->commit_ies();
}
ref_t lookup_bitmap(block_address b) const {
index_entry ie = find_ie(b / entries_per_block_);
index_entry ie = find_ie(b / ENTRIES_PER_BLOCK);
bitmap bm(tm_, ie);
return bm.lookup(b % entries_per_block_);
return bm.lookup(b % ENTRIES_PER_BLOCK);
}
void insert_bitmap(block_address b, unsigned n) {
if (n > 3)
throw runtime_error("bitmap can only hold 2 bit values");
index_entry ie = find_ie(b / entries_per_block_);
index_entry ie = find_ie(b / ENTRIES_PER_BLOCK);
bitmap bm(tm_, ie);
bm.insert(b % entries_per_block_, n);
save_ie(b / entries_per_block_, bm.get_ie());
bm.insert(b % ENTRIES_PER_BLOCK, n);
save_ie(b / ENTRIES_PER_BLOCK, bm.get_ie());
}
ref_t lookup_ref_count(block_address b) const {
@ -410,7 +461,7 @@ namespace {
}
transaction_manager::ptr tm_;
uint32_t entries_per_block_;
index_store::ptr indexes_;
block_address nr_blocks_;
block_address nr_allocated_;
@ -470,53 +521,26 @@ namespace {
set<block_address> seen_indexes_;
};
class sm_disk : public sm_disk_base {
class btree_index_store : public index_store {
public:
typedef boost::shared_ptr<sm_disk> ptr;
typedef boost::shared_ptr<btree_index_store> ptr;
sm_disk(transaction_manager::ptr tm)
: sm_disk_base(tm),
bitmaps_(sm_disk_base::get_tm(), index_entry_traits::ref_counter()) {
btree_index_store(transaction_manager::ptr tm)
: tm_(tm),
bitmaps_(tm, index_entry_traits::ref_counter()) {
}
sm_disk(transaction_manager::ptr tm,
sm_root const &root)
: sm_disk_base(tm, root),
bitmaps_(sm_disk_base::get_tm(), root.bitmap_root_, index_entry_traits::ref_counter()) {
btree_index_store(transaction_manager::ptr tm,
block_address root)
: tm_(tm),
bitmaps_(tm, root, index_entry_traits::ref_counter()) {
}
size_t root_size() {
return sizeof(sm_root_disk);
virtual void resize(block_address nr_entries) {
// No op
}
void copy_root(void *dest, size_t len) {
sm_root_disk d;
sm_root v;
if (len < sizeof(d))
throw runtime_error("root too small");
v.nr_blocks_ = sm_disk_base::get_nr_blocks();
v.nr_allocated_ = sm_disk_base::get_nr_allocated();
v.bitmap_root_ = bitmaps_.get_root();
v.ref_count_root_ = sm_disk_base::get_ref_count_root();
sm_root_traits::pack(v, d);
::memcpy(dest, &d, sizeof(d));
}
void check(block_counter &counter) const {
sm_disk_base::check(counter);
bitmap_tree_validator::ptr v(new bitmap_tree_validator(counter));
bitmaps_.visit(v);
block_address nr_entries = div_up<block_address>(get_nr_blocks(), get_entries_per_block());
v->check_all_index_entries_present(nr_entries);
}
private:
index_entry find_ie(block_address ie_index) const {
virtual index_entry find_ie(block_address ie_index) const {
uint64_t key[1] = {ie_index};
optional<index_entry> mindex = bitmaps_.lookup(key);
if (!mindex)
@ -525,90 +549,80 @@ namespace {
return *mindex;
}
void save_ie(block_address ie_index, struct index_entry ie) {
virtual void save_ie(block_address ie_index, struct index_entry ie) {
uint64_t key[1] = {ie_index};
bitmaps_.insert(key, ie);
}
void commit_ies() {
virtual void commit_ies() {
// No op
}
btree<1, index_entry_traits> bitmaps_;
};
class sm_metadata : public sm_disk_base {
public:
typedef boost::shared_ptr<sm_metadata> ptr;
sm_metadata(transaction_manager::ptr tm)
: sm_disk_base(tm),
entries_(MAX_METADATA_BITMAPS) {
block_manager<>::write_ref wr = tm->new_block(index_validator());
bitmap_root_ = wr.get_location();
virtual index_store::ptr clone() const {
return index_store::ptr(new btree_index_store(tm_, bitmaps_.get_root()));
}
sm_metadata(transaction_manager::ptr tm,
sm_root const &root)
: sm_disk_base(tm, root),
bitmap_root_(root.bitmap_root_),
entries_(MAX_METADATA_BITMAPS) {
load_ies();
virtual block_address get_root() const {
return bitmaps_.get_root();
}
size_t root_size() {
return sizeof(sm_root_disk);
}
// FIXME: common code
void copy_root(void *dest, size_t len) {
sm_root_disk d;
sm_root v;
if (len < sizeof(d))
throw runtime_error("root too small");
v.nr_blocks_ = sm_disk_base::get_nr_blocks();
v.nr_allocated_ = sm_disk_base::get_nr_allocated();
v.bitmap_root_ = bitmap_root_;
v.ref_count_root_ = sm_disk_base::get_ref_count_root();
sm_root_traits::pack(v, d);
::memcpy(dest, &d, sizeof(d));
}
void check(block_counter &counter) const {
sm_disk_base::check(counter);
counter.inc(bitmap_root_);
for (unsigned i = 0; i < entries_.size(); i++)
if (entries_[i].blocknr_ != 0) // superblock
counter.inc(entries_[i].blocknr_);
virtual void check(block_counter &counter, block_address nr_index_entries) const {
bitmap_tree_validator::ptr v(new bitmap_tree_validator(counter));
bitmaps_.visit(v);
v->check_all_index_entries_present(nr_index_entries);
}
private:
index_entry find_ie(block_address ie_index) const {
transaction_manager::ptr tm_;
btree<1, index_entry_traits> bitmaps_;
};
class sm_disk : public sm_disk_base {
public:
typedef boost::shared_ptr<sm_disk> ptr;
sm_disk(transaction_manager::ptr tm)
: sm_disk_base(index_store::ptr(new btree_index_store(tm)), tm) {
}
sm_disk(transaction_manager::ptr tm,
sm_root const &root)
: sm_disk_base(index_store::ptr(new btree_index_store(tm, root.bitmap_root_)), tm, root) {
}
};
class metadata_index_store : public index_store {
public:
typedef boost::shared_ptr<metadata_index_store> ptr;
metadata_index_store(transaction_manager::ptr tm)
: tm_(tm) {
block_manager<>::write_ref wr = tm_->new_block(index_validator());
bitmap_root_ = wr.get_location();
}
metadata_index_store(transaction_manager::ptr tm, block_address root, block_address nr_indexes)
: tm_(tm),
bitmap_root_(root) {
resize(nr_indexes);
load_ies();
}
virtual void resize(block_address nr_indexes) {
entries_.resize(nr_indexes);
}
virtual index_entry find_ie(block_address ie_index) const {
return entries_[ie_index];
}
void save_ie(block_address ie_index, struct index_entry ie) {
virtual void save_ie(block_address ie_index, struct index_entry ie) {
entries_[ie_index] = ie;
}
void load_ies() {
block_manager<>::read_ref rr =
sm_disk_base::get_tm()->read_lock(bitmap_root_, index_validator());
metadata_index const *mdi = reinterpret_cast<metadata_index const *>(&rr.data());
unsigned nr_indexes = div_up<block_address>(sm_disk_base::get_nr_blocks(),
sm_disk_base::get_entries_per_block());
for (unsigned i = 0; i < nr_indexes; i++)
index_entry_traits::unpack(*(mdi->index + i), entries_[i]);
}
void commit_ies() {
virtual void commit_ies() {
std::pair<block_manager<>::write_ref, bool> p =
sm_disk_base::get_tm()->shadow(bitmap_root_, index_validator());
tm_->shadow(bitmap_root_, index_validator());
bitmap_root_ = p.first.get_location();
metadata_index *mdi = reinterpret_cast<metadata_index *>(&p.first.data());
@ -617,9 +631,55 @@ namespace {
index_entry_traits::pack(entries_[i], mdi->index[i]);
}
virtual index_store::ptr clone() const {
return index_store::ptr(new metadata_index_store(tm_, bitmap_root_, entries_.size()));
}
virtual block_address get_root() const {
return bitmap_root_;
}
virtual void check(block_counter &counter, block_address nr_index_entries) const {
counter.inc(bitmap_root_);
for (unsigned i = 0; i < entries_.size(); i++)
if (entries_[i].blocknr_ != 0) // superblock
counter.inc(entries_[i].blocknr_);
}
private:
void load_ies() {
block_manager<>::read_ref rr =
tm_->read_lock(bitmap_root_, index_validator());
metadata_index const *mdi = reinterpret_cast<metadata_index const *>(&rr.data());
for (unsigned i = 0; i < entries_.size(); i++)
index_entry_traits::unpack(*(mdi->index + i), entries_[i]);
}
transaction_manager::ptr tm_;
block_address bitmap_root_;
std::vector<index_entry> entries_;
};
class sm_metadata : public sm_disk_base {
public:
typedef boost::shared_ptr<sm_metadata> ptr;
sm_metadata(transaction_manager::ptr tm)
: sm_disk_base(index_store::ptr(new metadata_index_store(tm)), tm) {
}
sm_metadata(transaction_manager::ptr tm,
sm_root const &root)
: sm_disk_base(mk_index_store(tm, root.bitmap_root_, root.nr_blocks_), tm, root) {
}
private:
index_store::ptr mk_index_store(transaction_manager::ptr tm, block_address root, block_address nr_blocks) const {
block_address nr_indexes = div_up<block_address>(nr_blocks, ENTRIES_PER_BLOCK);
return index_store::ptr(new metadata_index_store(tm, root, nr_indexes));
}
};
}
//----------------------------------------------------------------

View File

@ -36,7 +36,7 @@ namespace {
public:
sm_recursive(checked_space_map::ptr sm)
: sm_(sm),
recursing_(false) {
depth_(0) {
}
virtual block_address get_nr_blocks() const {
@ -54,7 +54,7 @@ namespace {
}
virtual void set_count(block_address b, ref_t c) {
if (recursing_)
if (depth_)
add_op(block_op(block_op::SET, b, c));
else {
recursing_lock lock(*this);
@ -68,7 +68,7 @@ namespace {
}
virtual void inc(block_address b) {
if (recursing_)
if (depth_)
add_op(block_op(block_op::INC, b));
else {
recursing_lock lock(*this);
@ -77,7 +77,7 @@ namespace {
}
virtual void dec(block_address b) {
if (recursing_)
if (depth_)
add_op(block_op(block_op::DEC, b));
else {
recursing_lock lock(*this);
@ -86,13 +86,15 @@ namespace {
}
virtual block_address new_block() {
cant_recurse("new_block");
// new_block can recurse, because we know it's
// looking up entries in the _previous_
// transaction.
recursing_lock lock(*this);
return sm_->new_block();
}
virtual bool count_possibly_greater_than_one(block_address b) const {
if (recursing_)
if (depth_)
return true;
else {
@ -107,6 +109,10 @@ namespace {
return sm_->extend(extra_blocks);
}
virtual void iterate(iterator &it) const {
sm_->iterate(it);
}
virtual size_t root_size() {
cant_recurse("root_size");
recursing_const_lock lock(*this);
@ -119,7 +125,7 @@ namespace {
return sm_->copy_root(dest, len);
}
virtual void check(block_counter &counter) const {
virtual void check(persistent_data::block_counter &counter) const {
cant_recurse("check");
recursing_const_lock lock(*this);
return sm_->check(counter);
@ -158,23 +164,19 @@ namespace {
}
void cant_recurse(string const &method) const {
if (recursing_)
if (depth_)
throw runtime_error("recursive '" + method + "' not supported");
}
void set_recursing() const {
recursing_ = true;
}
struct recursing_lock {
recursing_lock(sm_recursive &smr)
: smr_(smr) {
smr_.recursing_ = true;
smr_.depth_++;
}
~recursing_lock() {
smr_.flush_ops();
smr_.recursing_ = false;
if (!--smr_.depth_)
smr_.flush_ops();
}
private:
@ -184,11 +186,11 @@ namespace {
struct recursing_const_lock {
recursing_const_lock(sm_recursive const &smr)
: smr_(smr) {
smr_.recursing_ = true;
smr_.depth_++;
}
~recursing_const_lock() {
smr_.recursing_ = false;
smr_.depth_--;
}
private:
@ -196,7 +198,7 @@ namespace {
};
checked_space_map::ptr sm_;
mutable bool recursing_;
mutable int depth_;
enum op {
BOP_INC,