From a124b7ce2639391607fac832edf418be4f43cb68 Mon Sep 17 00:00:00 2001 From: Joe Thornber Date: Tue, 14 Jun 2016 16:27:17 +0100 Subject: [PATCH] [block-cache] Fix some bugs in the copier --- block-cache/copier.cc | 87 ++++++++++++++++++++++++++++++++------- block-cache/copier.h | 9 ++++ unit-tests/copier_t.cc | 87 +++++++++++++++++++++++++++++---------- unit-tests/io_engine_t.cc | 63 ++++++++++++++++++---------- 4 files changed, 188 insertions(+), 58 deletions(-) diff --git a/block-cache/copier.cc b/block-cache/copier.cc index 9910b58..b5c21ef 100644 --- a/block-cache/copier.cc +++ b/block-cache/copier.cc @@ -11,12 +11,12 @@ using namespace std; copier::copier(io_engine &engine, string const &src, string const &dest, sector_t block_size, size_t mem) - : pool_(block_size * 512, mem), + : pool_(block_size * 512, mem, PAGE_SIZE), block_size_(block_size), nr_blocks_(mem / block_size), engine_(engine), - src_handle_(engine_.open_file(src, io_engine::READ_ONLY)), - dest_handle_(engine_.open_file(dest, io_engine::READ_WRITE)), + src_handle_(engine_.open_file(src, io_engine::M_READ_ONLY)), + dest_handle_(engine_.open_file(dest, io_engine::M_READ_WRITE)), genkey_count_(0) { } @@ -30,14 +30,14 @@ copier::~copier() void copier::issue(copy_op const &op) { - auto data = pool_.alloc(); - if (!data) { - wait_(); - data = pool_.alloc(); + void *data; - if (!data) - // Shouldn't get here - throw runtime_error("couldn't allocate buffer"); + while (!(data = pool_.alloc())) { + wait_(); + + // data may still not be present because the wait_ could + // have completed a read and issued the corresponding + // write. } copy_job job(op, data); @@ -45,7 +45,7 @@ copier::issue(copy_op const &op) unsigned key = genkey(); // used as context for the io_engine auto r = engine_.issue_io(src_handle_, - io_engine::READ, + io_engine::D_READ, to_sector(op.src_b), to_sector(op.src_e), data, @@ -53,6 +53,7 @@ copier::issue(copy_op const &op) if (r) jobs_.insert(make_pair(key, job)); + else complete(job); } @@ -66,23 +67,74 @@ copier::nr_pending() const boost::optional copier::wait() { - while (!jobs_.empty() && complete_.empty()) + if (complete_.empty()) wait_(); + return wait_complete(); +} + +boost::optional +copier::wait(unsigned µ) +{ if (complete_.empty()) + wait_(micro); + return wait_complete(); +} + +bool +copier::pending() const +{ + return !jobs_.empty(); +} + +boost::optional +copier::wait_complete() +{ + if (complete_.empty()) { return optional(); - else { + } else { auto op = complete_.front(); complete_.pop_front(); return optional(op); } } +void +copier::wait_(unsigned µ) +{ + optional mp; + + if (!pending()) + return; + + + bool completed = false; + while (pending() && !completed) { + mp = engine_.wait(micro); + if (mp) + completed = wait_successful(*mp); + + if (!micro) + break; + } +} + void copier::wait_() { - auto p = engine_.wait(); + bool completed = false; + + while (pending() && !completed) { + auto mp = engine_.wait(); + if (mp) + completed = wait_successful(*mp); + } +} + +bool +copier::wait_successful(io_engine::wait_result const &p) +{ auto it = jobs_.find(p.second); if (it == jobs_.end()) throw runtime_error("Internal error. Lost track of copy job."); @@ -92,26 +144,29 @@ copier::wait_() // IO was unsuccessful complete(j); jobs_.erase(it); - return; + return true; } // IO was successful if (!j.op.read_complete) { j.op.read_complete = true; if (!engine_.issue_io(dest_handle_, - io_engine::WRITE, + io_engine::D_WRITE, to_sector(j.op.dest_b), to_sector(j.op.dest_b + (j.op.src_e - j.op.src_b)), j.data, it->first)) { complete(j); jobs_.erase(it); + return true; } + return false; } else { j.op.write_complete = true; complete(j); jobs_.erase(it); + return true; } } diff --git a/block-cache/copier.h b/block-cache/copier.h index 5eb2a51..bd49966 100644 --- a/block-cache/copier.h +++ b/block-cache/copier.h @@ -32,6 +32,10 @@ namespace bcache { write_complete(false) { } + bool operator <(copy_op const &rhs) const { + return dest_b < rhs.dest_b; + } + bool success() const { return read_complete && write_complete; } @@ -69,8 +73,13 @@ namespace bcache { unsigned nr_pending() const; boost::optional wait(); + boost::optional wait(unsigned µ); private: + bool pending() const; + bool wait_successful(io_engine::wait_result const &p); + boost::optional wait_complete(); + void wait_(unsigned µ); void wait_(); void complete(copy_job const &j); diff --git a/unit-tests/copier_t.cc b/unit-tests/copier_t.cc index 0185d9e..1621c4c 100644 --- a/unit-tests/copier_t.cc +++ b/unit-tests/copier_t.cc @@ -20,7 +20,6 @@ #include "block-cache/copier.h" #include "test_utils.h" - #include using namespace boost; @@ -31,18 +30,32 @@ using namespace testing; //---------------------------------------------------------------- namespace { + unsigned const BLOCK_SIZE = 64u; + using wait_result = io_engine::wait_result; + + ostream &operator <<(ostream &out, wait_result const &wr) { + out << "wait_result[" << wr.first << ", " << wr.second << "]"; + return out; + } + + ostream &operator <<(ostream &out, optional const &mwr) { + if (mwr) { + out << "Just[wait_result[" << mwr->first << ", " << mwr->second << "]]"; + } else + out << "Nothing"; + return out; + } + class io_engine_mock : public io_engine { public: MOCK_METHOD3(open_file, handle(string const &, mode, sharing)); MOCK_METHOD1(close_file, void(handle)); MOCK_METHOD6(issue_io, bool(handle, dir, sector_t, sector_t, void *, unsigned)); - MOCK_METHOD0(wait, wait_result()); + MOCK_METHOD0(wait, optional()); + MOCK_METHOD1(wait, optional(unsigned &)); }; - unsigned const BLOCK_SIZE = 64u; - using wait_result = io_engine::wait_result; - class CopierTests : public Test { public: CopierTests() @@ -51,9 +64,9 @@ namespace { } unique_ptr make_copier() { - EXPECT_CALL(engine_, open_file(src_file_, io_engine::READ_ONLY, io_engine::EXCLUSIVE)). + EXPECT_CALL(engine_, open_file(src_file_, io_engine::M_READ_ONLY, io_engine::EXCLUSIVE)). WillOnce(Return(SRC_HANDLE)); - EXPECT_CALL(engine_, open_file(dest_file_, io_engine::READ_WRITE, io_engine::EXCLUSIVE)). + EXPECT_CALL(engine_, open_file(dest_file_, io_engine::M_READ_WRITE, io_engine::EXCLUSIVE)). WillOnce(Return(DEST_HANDLE)); EXPECT_CALL(engine_, close_file(SRC_HANDLE)).Times(1); @@ -64,11 +77,15 @@ namespace { BLOCK_SIZE, 1 * 1024 * 1024)); } + static optional make_wr(bool success, unsigned context) { + return optional(wait_result(success, context)); + } + void issue_successful_op(copier &c, copy_op &op, unsigned context) { InSequence dummy; unsigned nr_pending = c.nr_pending(); - EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::READ, + EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ, op.src_b * BLOCK_SIZE, op.src_e * BLOCK_SIZE, _, context)). WillOnce(Return(true)); @@ -77,15 +94,15 @@ namespace { ASSERT_TRUE(c.nr_pending() == nr_pending + 1); EXPECT_CALL(engine_, wait()). - WillOnce(Return(wait_result(true, context))); + WillOnce(Return(make_wr(true, context))); - EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::WRITE, + EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::D_WRITE, op.dest_b * BLOCK_SIZE, (op.dest_b + (op.src_e - op.src_b)) * BLOCK_SIZE, _, context)). WillOnce(Return(true)); EXPECT_CALL(engine_, wait()). - WillOnce(Return(wait_result(true, context))); + WillOnce(Return(make_wr(true, context))); auto mop = c.wait(); ASSERT_EQ(c.nr_pending(), nr_pending); @@ -124,7 +141,7 @@ TEST_F(CopierTests, unsuccessful_issue_read) auto c = make_copier(); InSequence dummy; - EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::READ, 0, BLOCK_SIZE, _, 0)). + EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ, 0, BLOCK_SIZE, _, 0)). WillOnce(Return(false)); c->issue(op1); @@ -141,13 +158,13 @@ TEST_F(CopierTests, unsuccessful_read) auto c = make_copier(); InSequence dummy; - EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::READ, 0, BLOCK_SIZE, _, 0)). + EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ, 0, BLOCK_SIZE, _, 0)). WillOnce(Return(true)); c->issue(op1); ASSERT_EQ(c->nr_pending(), 1u); EXPECT_CALL(engine_, wait()). - WillOnce(Return(wait_result(false, 0u))); + WillOnce(Return(make_wr(false, 0u))); ASSERT_EQ(c->nr_pending(), 1u); auto mop = c->wait(); @@ -161,17 +178,17 @@ TEST_F(CopierTests, unsuccessful_issue_write) auto c = make_copier(); InSequence dummy; - EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::READ, 0, BLOCK_SIZE, _, 0)). + EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ, 0, BLOCK_SIZE, _, 0)). WillOnce(Return(true)); c->issue(op1); ASSERT_EQ(c->nr_pending(), 1u); EXPECT_CALL(engine_, wait()). - WillOnce(Return(wait_result(true, 0u))); + WillOnce(Return(make_wr(true, 0u))); ASSERT_EQ(c->nr_pending(), 1u); - EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::WRITE, 0, BLOCK_SIZE, _, 0)). + EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::D_WRITE, 0, BLOCK_SIZE, _, 0)). WillOnce(Return(false)); auto mop = c->wait(); @@ -187,19 +204,19 @@ TEST_F(CopierTests, unsuccessful_write) auto c = make_copier(); InSequence dummy; - EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::READ, 0, BLOCK_SIZE, _, 0)). + EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ, 0, BLOCK_SIZE, _, 0)). WillOnce(Return(true)); c->issue(op1); ASSERT_EQ(c->nr_pending(), 1u); EXPECT_CALL(engine_, wait()). - WillOnce(Return(wait_result(true, 0u))); + WillOnce(Return(make_wr(true, 0u))); - EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::WRITE, 0, BLOCK_SIZE, _, 0)). + EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::D_WRITE, 0, BLOCK_SIZE, _, 0)). WillOnce(Return(true)); EXPECT_CALL(engine_, wait()). - WillOnce(Return(wait_result(false, 0u))); + WillOnce(Return(make_wr(false, 0u))); auto mop = c->wait(); ASSERT_EQ(c->nr_pending(), 0u); @@ -225,4 +242,32 @@ TEST_F(CopierTests, copy_different_blocks) } } +TEST_F(CopierTests, wait_can_timeout) +{ + copy_op op1(0, 1, 0); + auto c = make_copier(); + + InSequence dummy; + EXPECT_CALL(engine_, issue_io(SRC_HANDLE, io_engine::D_READ, 0, BLOCK_SIZE, _, 0)). + WillOnce(Return(true)); + c->issue(op1); + + ASSERT_EQ(c->nr_pending(), 1u); + + unsigned micro = 10000; + EXPECT_CALL(engine_, wait(micro)). + WillOnce(Return(make_wr(true, 0u))); + ASSERT_EQ(c->nr_pending(), 1u); + + EXPECT_CALL(engine_, issue_io(DEST_HANDLE, io_engine::D_WRITE, 0, BLOCK_SIZE, _, 0)). + WillOnce(Return(true)); + + EXPECT_CALL(engine_, wait(micro)). + WillOnce(DoAll(SetArgReferee<0>(0u), Return(optional()))); + + auto mop = c->wait(micro); + ASSERT_FALSE(mop); + ASSERT_EQ(c->nr_pending(), 1u); +} + //---------------------------------------------------------------- diff --git a/unit-tests/io_engine_t.cc b/unit-tests/io_engine_t.cc index f1b3a9e..2790eb6 100644 --- a/unit-tests/io_engine_t.cc +++ b/unit-tests/io_engine_t.cc @@ -64,8 +64,8 @@ TEST_F(IOEngineTests, empty_test) TEST_F(IOEngineTests, open_and_close) { - auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); - auto dest_handle = engine_->open_file(dest_file_.get_path(), io_engine::READ_WRITE); + auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY); + auto dest_handle = engine_->open_file(dest_file_.get_path(), io_engine::M_READ_WRITE); ASSERT_TRUE(src_handle != dest_handle); engine_->close_file(src_handle); engine_->close_file(dest_handle); @@ -74,17 +74,17 @@ TEST_F(IOEngineTests, open_and_close) TEST_F(IOEngineTests, you_can_read_a_read_only_handle) { unsigned nr_sectors = 8; - auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); + auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY); void *data = pool_.alloc(); bool r = engine_->issue_io(src_handle, - io_engine::READ, + io_engine::D_READ, 0, nr_sectors, data, 123); ASSERT_TRUE(r); auto wr = engine_->wait(); - ASSERT_TRUE(wr.first); - ASSERT_TRUE(wr.second == 123); + ASSERT_TRUE(wr->first); + ASSERT_TRUE(wr->second == 123); engine_->close_file(src_handle); pool_.free(data); @@ -94,10 +94,10 @@ TEST_F(IOEngineTests, you_can_read_a_read_only_handle) TEST_F(IOEngineTests, you_cannot_write_to_a_read_only_handle) { unsigned nr_sectors = 8; - auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); + auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY); void *data = pool_.alloc(); bool r = engine_->issue_io(src_handle, - io_engine::WRITE, + io_engine::D_WRITE, 0, nr_sectors, data, 0); @@ -109,17 +109,17 @@ TEST_F(IOEngineTests, you_cannot_write_to_a_read_only_handle) TEST_F(IOEngineTests, you_can_write_to_a_read_write_handle) { unsigned nr_sectors = 8; - auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); + auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY); void *data = pool_.alloc(); bool r = engine_->issue_io(src_handle, - io_engine::READ, + io_engine::D_READ, 0, nr_sectors, data, 123); ASSERT_TRUE(r); auto wr = engine_->wait(); - ASSERT_TRUE(wr.first); - ASSERT_TRUE(wr.second == 123); + ASSERT_TRUE(wr->first); + ASSERT_TRUE(wr->second == 123); engine_->close_file(src_handle); pool_.free(data); @@ -128,16 +128,16 @@ TEST_F(IOEngineTests, you_can_write_to_a_read_write_handle) TEST_F(IOEngineTests, final_block_read_succeeds) { unsigned nr_sectors = 8; - auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); + auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY); void *data = pool_.alloc(); bool r = engine_->issue_io(src_handle, - io_engine::READ, + io_engine::D_READ, meg(32) - nr_sectors, meg(32), data, 123); ASSERT_TRUE(r); auto wr = engine_->wait(); - ASSERT_TRUE(wr.first); + ASSERT_TRUE(wr->first); engine_->close_file(src_handle); pool_.free(data); @@ -147,16 +147,16 @@ TEST_F(IOEngineTests, final_block_read_succeeds) TEST_F(IOEngineTests, out_of_bounds_read_fails) { unsigned nr_sectors = 8; - auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::READ_ONLY); + auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY); void *data = pool_.alloc(); bool r = engine_->issue_io(src_handle, - io_engine::READ, + io_engine::D_READ, meg(32), meg(32) + nr_sectors, data, 123); ASSERT_TRUE(r); auto wr = engine_->wait(); - ASSERT_FALSE(wr.first); + ASSERT_FALSE(wr->first); engine_->close_file(src_handle); pool_.free(data); @@ -166,20 +166,41 @@ TEST_F(IOEngineTests, out_of_bounds_read_fails) TEST_F(IOEngineTests, out_of_bounds_write_succeeds) { unsigned nr_sectors = 8; - auto handle = engine_->open_file(dest_file_.get_path(), io_engine::READ_WRITE); + auto handle = engine_->open_file(dest_file_.get_path(), io_engine::M_READ_WRITE); void *data = pool_.alloc(); bool r = engine_->issue_io(handle, - io_engine::WRITE, + io_engine::D_WRITE, meg(32), meg(32) + nr_sectors, data, 123); ASSERT_TRUE(r); auto wr = engine_->wait(); - ASSERT_TRUE(wr.first); + ASSERT_TRUE(wr->first); engine_->close_file(handle); pool_.free(data); } +TEST_F(IOEngineTests, succeed_with_timeout) +{ + unsigned nr_sectors = 8; + auto src_handle = engine_->open_file(src_file_.get_path(), io_engine::M_READ_ONLY); + void *data = pool_.alloc(); + bool r = engine_->issue_io(src_handle, + io_engine::D_READ, + 0, nr_sectors, + data, + 123); + ASSERT_TRUE(r); + unsigned micro = 10; + auto wr = engine_->wait(micro); + ASSERT_TRUE(wr->first); + ASSERT_TRUE(wr->second == 123); + + engine_->close_file(src_handle); + pool_.free(data); +} + + //----------------------------------------------------------------