#include "block.h"

// NOTE(review): in the reviewed copy of this file every angle-bracketed span
// (include targets, template parameter/argument lists) was missing.  They
// have been reconstructed from usage below -- confirm against block.h.
#include <errno.h>
#include <fcntl.h>
#include <sys/stat.h>
#include <sys/types.h>
#include <unistd.h>

#include <boost/bind.hpp>
#include <stdexcept>

using namespace boost;
using namespace persistent_data;
using namespace std;

//----------------------------------------------------------------

// Wraps a shared block pointer to give read-only access to it.
template <uint32_t BlockSize>
block_manager<BlockSize>::read_ref::read_ref(typename block_manager<BlockSize>::block::ptr b)
	: block_(b)
{
}

// Returns the address of the referenced block.
template <uint32_t BlockSize>
block_address
block_manager<BlockSize>::read_ref::get_location() const
{
	return block_->location_;
}

// Returns the block's data as a const buffer.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::const_buffer &
block_manager<BlockSize>::read_ref::data() const
{
	return block_->data_;
}

// Wraps a shared block pointer to give read/write access to it.
template <uint32_t BlockSize>
block_manager<BlockSize>::write_ref::write_ref(typename block_manager<BlockSize>::block::ptr b)
	: read_ref(b)
{
}

// Returns the block's data as a mutable buffer.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::buffer &
block_manager<BlockSize>::write_ref::data()
{
	return read_ref::block_->data_;
}

//----------------------------------------------------------------

// Opens (creating if necessary) the file at |path| for read/write access.
// Throws std::runtime_error if the file cannot be opened.
template <uint32_t BlockSize>
block_manager<BlockSize>::block_manager(std::string const &path, block_address nr_blocks)
	: nr_blocks_(nr_blocks),
	  lock_count_(0),
	  superblock_count_(0),
	  ordinary_count_(0)
{
	fd_ = ::open(path.c_str(), O_RDWR | O_CREAT, 0666);
	if (fd_ < 0)
		throw std::runtime_error("couldn't open file");
}

// Closes the underlying file descriptor.
template <uint32_t BlockSize>
block_manager<BlockSize>::~block_manager()
{
	::close(fd_);
}

// Reads the block at |location| and returns a read reference to it.  The
// lock is dropped (via read_release) when the last reference goes away.
//
// In this and every other lock acquirer below, the lock is registered
// _before_ the block and its shared pointer exist.  That way a failing
// register_lock() can never trigger the release deleter for a lock that
// was not taken, and the registration is rolled back if constructing the
// block throws.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::read_ref
block_manager<BlockSize>::read_lock(block_address location) const
{
	check(location);

	buffer buf;
	read_buffer(location, buf);

	register_lock(location, READ_LOCK);
	block *raw;
	try {
		raw = new block(location, buf, lock_count_, ordinary_count_);
	} catch (...) {
		unregister_lock(location, READ_LOCK);
		throw;
	}

	return read_ref(
		typename block::ptr(
			raw, boost::bind(&block_manager<BlockSize>::read_release, this, _1)));
}

// Currently identical to read_lock(); any failure surfaces as an exception
// rather than an empty optional.
template <uint32_t BlockSize>
boost::optional<typename block_manager<BlockSize>::read_ref>
block_manager<BlockSize>::read_try_lock(block_address location) const
{
	return read_lock(location);
}

// Reads the block at |location| and returns a write reference to it.  The
// data is written back (via write_release) when the last reference goes
// away.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::write_lock(block_address location)
{
	check(location);

	buffer buf;
	read_buffer(location, buf);

	register_lock(location, WRITE_LOCK);
	block *raw;
	try {
		raw = new block(location, buf, lock_count_, ordinary_count_);
	} catch (...) {
		unregister_lock(location, WRITE_LOCK);
		throw;
	}

	return write_ref(
		typename block::ptr(
			raw, boost::bind(&block_manager<BlockSize>::write_release, this, _1)));
}

// As write_lock(), but the buffer starts zeroed instead of being read from
// the file.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::write_lock_zero(block_address location)
{
	check(location);

	buffer buf;
	zero_buffer(buf);

	register_lock(location, WRITE_LOCK);
	block *raw;
	try {
		raw = new block(location, buf, lock_count_, ordinary_count_);
	} catch (...) {
		unregister_lock(location, WRITE_LOCK);
		throw;
	}

	typename block::ptr b(raw, boost::bind(&block_manager<BlockSize>::write_release, this, _1));
	return write_ref(b);
}

// As read_lock(location), additionally attaching validator |v| to the block.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::read_ref
block_manager<BlockSize>::read_lock(block_address location,
				    typename block_manager<BlockSize>::validator::ptr v) const
{
	check(location);

	buffer buf;
	read_buffer(location, buf);

	register_lock(location, READ_LOCK);
	block *raw;
	try {
		raw = new block(location, buf, lock_count_, ordinary_count_, false, v);
	} catch (...) {
		unregister_lock(location, READ_LOCK);
		throw;
	}

	typename block::ptr b(raw, boost::bind(&block_manager<BlockSize>::read_release, this, _1));
	return read_ref(b);
}

// Currently identical to read_lock(location, v).
template <uint32_t BlockSize>
boost::optional<typename block_manager<BlockSize>::read_ref>
block_manager<BlockSize>::read_try_lock(block_address location,
					typename block_manager<BlockSize>::validator::ptr v) const
{
	return read_lock(location, v);
}

// As write_lock(location), additionally attaching validator |v| to the
// block; write_release() calls the validator's prepare() before writing.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::write_lock(block_address location,
				     typename block_manager<BlockSize>::validator::ptr v)
{
	check(location);

	buffer buf;
	read_buffer(location, buf);

	register_lock(location, WRITE_LOCK);
	block *raw;
	try {
		raw = new block(location, buf, lock_count_, ordinary_count_, false, v);
	} catch (...) {
		unregister_lock(location, WRITE_LOCK);
		throw;
	}

	typename block::ptr b(raw, boost::bind(&block_manager<BlockSize>::write_release, this, _1));
	return write_ref(b);
}

// As write_lock_zero(location), additionally attaching validator |v|.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::write_lock_zero(block_address location,
					  typename block_manager<BlockSize>::validator::ptr v)
{
	check(location);

	buffer buf;
	zero_buffer(buf);

	register_lock(location, WRITE_LOCK);
	block *raw;
	try {
		raw = new block(location, buf, lock_count_, ordinary_count_, false, v);
	} catch (...) {
		unregister_lock(location, WRITE_LOCK);
		throw;
	}

	typename block::ptr b(raw, boost::bind(&block_manager<BlockSize>::write_release, this, _1));
	return write_ref(b);
}

// Write locks |location| as the superblock.  Only one superblock may be held
// at a time; throws std::runtime_error otherwise.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::superblock(block_address location)
{
	check(location);

	if (superblock_count_ > 0)
		throw std::runtime_error("already have superblock");

	buffer buf;
	read_buffer(location, buf);

	register_lock(location, WRITE_LOCK);
	block *raw;
	try {
		raw = new block(location, buf, lock_count_, superblock_count_, true);
	} catch (...) {
		unregister_lock(location, WRITE_LOCK);
		throw;
	}

	typename block::ptr b(raw, boost::bind(&block_manager<BlockSize>::write_release, this, _1));
	return write_ref(b);
}

// As superblock(location), but the buffer starts zeroed.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::superblock_zero(block_address location)
{
	check(location);

	if (superblock_count_ > 0)
		throw std::runtime_error("already have superblock");

	buffer buf;
	zero_buffer(buf);

	register_lock(location, WRITE_LOCK);
	block *raw;
	try {
		raw = new block(location, buf, lock_count_, superblock_count_, true);
	} catch (...) {
		unregister_lock(location, WRITE_LOCK);
		throw;
	}

	typename block::ptr b(raw, boost::bind(&block_manager<BlockSize>::write_release, this, _1));
	return write_ref(b);
}

// As superblock(location), additionally attaching validator |v|.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::superblock(block_address location,
				     typename block_manager<BlockSize>::validator::ptr v)
{
	if (superblock_count_ > 0)
		throw std::runtime_error("already have superblock");

	check(location);

	buffer buf;
	read_buffer(location, buf);

	register_lock(location, WRITE_LOCK);
	block *raw;
	try {
		raw = new block(location, buf, lock_count_, superblock_count_, true, v);
	} catch (...) {
		unregister_lock(location, WRITE_LOCK);
		throw;
	}

	typename block::ptr b(raw, boost::bind(&block_manager<BlockSize>::write_release, this, _1));
	return write_ref(b);
}

// As superblock_zero(location), additionally attaching validator |v|.
template <uint32_t BlockSize>
typename block_manager<BlockSize>::write_ref
block_manager<BlockSize>::superblock_zero(block_address location,
					  typename block_manager<BlockSize>::validator::ptr v)
{
	if (superblock_count_ > 0)
		throw std::runtime_error("already have superblock");

	check(location);

	buffer buf;
	zero_buffer(buf);

	register_lock(location, WRITE_LOCK);
	block *raw;
	try {
		raw = new block(location, buf, lock_count_, superblock_count_, true, v);
	} catch (...) {
		unregister_lock(location, WRITE_LOCK);
		throw;
	}

	typename block::ptr b(raw, boost::bind(&block_manager<BlockSize>::write_release, this, _1));
	return write_ref(b);
}

// Syncs the file to stable storage.  Throws std::runtime_error if any locks
// are still held.
// NOTE(review): the result of fsync() is ignored -- confirm that is intended.
template <uint32_t BlockSize>
void
block_manager<BlockSize>::flush()
{
	if (lock_count_ > 0)
		throw std::runtime_error("asked to flush while locks are still held");

	::fsync(fd_);
}

// Fills |buffer| with the BlockSize bytes stored at block |b|.
// Throws std::runtime_error if the seek or the read fails.
template <uint32_t BlockSize>
void
block_manager<BlockSize>::read_buffer(block_address b,
				      typename block_manager<BlockSize>::buffer &buffer) const
{
	off_t r = ::lseek(fd_, BlockSize * b, SEEK_SET);
	if (r == static_cast<off_t>(-1))
		throw std::runtime_error("lseek failed");

	ssize_t n;
	size_t remaining = BlockSize;
	unsigned char *buf = buffer;
	do {
		n = ::read(fd_, buf, remaining);
		if (n > 0) {
			remaining -= n;
			buf += n;
		}
		// read() signals an interrupted/would-block call by returning -1
		// with the reason in errno, not by returning the error number.
	} while (remaining &&
		 ((n > 0) || (n < 0 && (errno == EINTR || errno == EAGAIN))));

	if (n < 0)
		throw std::runtime_error("read failed");

	// End of file before a full block was read: zero the tail so callers
	// never see uninitialised bytes.
	if (remaining)
		::memset(buf, 0, remaining);
}

// Writes the BlockSize bytes in |buffer| to block |b|.
// Throws std::runtime_error if the seek fails or the block cannot be written
// in full.
template <uint32_t BlockSize>
void
block_manager<BlockSize>::write_buffer(block_address b,
				       typename block_manager<BlockSize>::const_buffer &buffer)
{
	off_t r = ::lseek(fd_, BlockSize * b, SEEK_SET);
	if (r == static_cast<off_t>(-1))
		throw std::runtime_error("lseek failed");

	ssize_t n;
	size_t remaining = BlockSize;
	unsigned char const *buf = buffer;
	do {
		n = ::write(fd_, buf, remaining);
		if (n > 0) {
			remaining -= n;
			buf += n;
		}
		// As with read(), retry conditions are reported through errno.
	} while (remaining &&
		 ((n > 0) || (n < 0 && (errno == EINTR || errno == EAGAIN))));

	// A write that stops short without an error is still a failed write.
	if (n < 0 || remaining)
		throw std::runtime_error("write failed");
}

// Sets all BlockSize bytes of |buffer| to zero.
template <uint32_t BlockSize>
void
block_manager<BlockSize>::zero_buffer(typename block_manager<BlockSize>::buffer &buffer) const
{
	::memset(buffer, 0, BlockSize);
}

// FIXME: we don't need this anymore
// Deleter for read locked blocks: drops the lock registration and frees |b|.
template <uint32_t BlockSize>
void
block_manager<BlockSize>::read_release(block *b) const
{
	unregister_lock(b->location_, READ_LOCK);
	delete b;
}

// Deleter for write locked blocks: runs the validator's prepare() if one is
// attached, writes the data back, drops the lock registration and frees |b|.
// The superblock must be the last block released.
// NOTE(review): this runs as a shared pointer deleter and can throw; on any
// throw |b| is neither unregistered nor freed -- confirm callers cope.
template <uint32_t BlockSize>
void
block_manager<BlockSize>::write_release(block *b)
{
	if (b->is_superblock_) {
		if (lock_count_ != 1)
			throw std::runtime_error("superblock isn't the last block");
	}

	if (b->validator_)
		(*b->validator_)->prepare(*b);

	write_buffer(b->location_, b->data_);
	unregister_lock(b->location_, WRITE_LOCK);
	delete b;
}

// Throws std::runtime_error if |b| is not a valid block address.
template <uint32_t BlockSize>
void
block_manager<BlockSize>::check(block_address b) const
{
	if (b >= nr_blocks_)
		throw std::runtime_error("block address out of bounds");
}

// Returns the number of blocks this manager was created with.
template <uint32_t BlockSize>
block_address
block_manager<BlockSize>::get_nr_blocks() const
{
	return nr_blocks_;
}

// The lock acquirers above call register_lock() before constructing the
// block and undo the registration if construction throws.
template void block_manager::register_lock(block_address b, lock_type t) const { typename held_map::iterator it = held_locks_.find(b); if (it == held_locks_.end()) held_locks_.insert(make_pair(b, make_pair(t, 1))); else { if (it->second.first != t) throw std::runtime_error("lock type mismatch when locking"); if (it->second.first == WRITE_LOCK) throw std::runtime_error("cannot hold concurrent write locks"); it->second.second++; } } template void block_manager::unregister_lock(block_address b, lock_type t) const { typename held_map::iterator it = held_locks_.find(b); if (it == held_locks_.end()) throw std::runtime_error("lock not held"); if (it->second.first != t) throw std::runtime_error("lock type mismatch when unlocking"); it->second.second--; if (it->second.second == 0) held_locks_.erase(it); } //----------------------------------------------------------------