Commit fc33f2a0 authored by Sergey Yakubov

update datacache

parent 70a33dfe
@@ -2,6 +2,8 @@
#include <iostream>
#include <chrono>
#include <algorithm>
namespace asapo {
@@ -27,16 +29,16 @@ void* DataCache::AllocateSlot(uint64_t size) {
auto addr = cache_.get() + cur_pointer_;
cur_pointer_ += size;
if (!CleanOldSlots()) {
if (!CleanOldSlots(size)) {
cur_pointer_ = tmp;
return nullptr;
}
return addr;
}
void* DataCache::GetFreeSlot(uint64_t size, uint64_t* id) {
void* DataCache::GetFreeSlotAndLock(uint64_t size, CacheMeta** meta) {
std::lock_guard<std::mutex> lock{mutex_};
*meta = nullptr;
if (!CheckAllocationSize(size)) {
return nullptr;
}
@@ -46,9 +48,10 @@ void* DataCache::GetFreeSlot(uint64_t size, uint64_t* id) {
return nullptr;
}
*id = GetNextId();
auto id = GetNextId();
meta_.emplace_back(CacheMeta{*id, addr, size, false});
*meta = new CacheMeta{id, addr, size, 1};
meta_.emplace_back(std::unique_ptr<CacheMeta> {*meta});
return addr;
}
@@ -57,12 +60,12 @@ uint64_t DataCache::GetNextId() {
counter_++;
std::chrono::system_clock::time_point now = std::chrono::system_clock::now();
uint32_t timeMillis = std::chrono::duration_cast<std::chrono::milliseconds>(now.time_since_epoch()).count();
return (uint64_t)timeMillis << 32 | counter_;
return (uint64_t) timeMillis << 32 | counter_;
}
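// Illustrative note (not part of this commit): GetNextId() packs a millisecond
// timestamp into the upper 32 bits of the id and the running counter_ into the
// lower 32 bits, so both parts can be recovered later, e.g.:
//   uint32_t counter = static_cast<uint32_t>(id);        // low 32 bits
//   uint32_t millis  = static_cast<uint32_t>(id >> 32);  // high 32 bits
// (the GetFreeSlotCreatesCorrectIds test below checks exactly this layout).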
bool DataCache::SlotTooCloseToCurrentPointer(const CacheMeta& meta) {
bool DataCache::SlotTooCloseToCurrentPointer(const CacheMeta* meta) {
uint64_t dist;
uint64_t shift = (uint8_t*) meta.addr - cache_.get();
uint64_t shift = (uint8_t*) meta->addr - cache_.get();
if (shift > cur_pointer_) {
dist = shift - cur_pointer_;
} else {
@@ -71,37 +74,45 @@ bool DataCache::SlotTooCloseToCurrentPointer(const CacheMeta& meta) {
return dist < cache_size_ * keepunlocked_ratio_;
}
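// Illustrative note (not part of this commit): this guard refuses reads from
// slots whose data sits within cache_size_ * keepunlocked_ratio_ bytes behind
// the write pointer, since that region is the next to be overwritten. With the
// test fixture's keepunlocked_ratio of 0.2, a slot filling 90% of the cache is
// refused for reading while a 70% slot is accepted - see
// PrepareToReadFailsIfTooCloseToCurrentPointer and PrepareToReadOk below.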
void* DataCache::GetSlotToReadAndLock(uint64_t id, uint64_t* size) {
// reading is allowed even if the slot is already locked: a lock held by another reader is fine, and a write lock should not happen at this point
void* DataCache::GetSlotToReadAndLock(uint64_t id, CacheMeta** meta) {
std::lock_guard<std::mutex> lock{mutex_};
for (auto& meta : meta_) {
if (meta.id == id) {
if (SlotTooCloseToCurrentPointer(meta)) {
for (auto& meta_rec : meta_) {
if (meta_rec->id == id) {
if (SlotTooCloseToCurrentPointer(meta_rec.get())) {
return nullptr;
}
*size = meta.size;
meta.locked = true;
return meta.addr;
meta_rec->lock++;
*meta = meta_rec.get();
return meta_rec->addr;
}
}
*meta = nullptr;
return nullptr;
}
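// Illustrative usage of the new read path (a sketch, not part of this commit).
// Every successful GetSlotToReadAndLock() must be paired with UnlockSlot(),
// otherwise the slot can never be reclaimed by CleanOldSlots():
//
//   CacheMeta* meta = nullptr;
//   if (void* data = cache.GetSlotToReadAndLock(id, &meta)) {
//       // ... read meta->size bytes from data ...
//       cache.UnlockSlot(meta);
//   }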
bool DataCache::CleanOldSlots() {
uint64_t last_ok = meta_.size();
for (int64_t i = last_ok - 1; i >= 0; i--) {
uint64_t start_position = (uint8_t*) meta_[i].addr - cache_.get();
if (start_position > cur_pointer_) {
last_ok = i;
bool Intersects(uint64_t left1, uint64_t right1, uint64_t left2, uint64_t right2) {
return (left1 >= left2 && left1 < right2) || (right1 <= right2 && right1 > left2);
}
bool DataCache::CleanOldSlots(uint64_t size) {
uint64_t n_del = 0;
for (uint64_t i = 0; i < meta_.size(); i++) {
uint64_t start_position = (uint8_t*) meta_[i]->addr - cache_.get();
if (Intersects(start_position, start_position + meta_[i]->size, cur_pointer_ - size, cur_pointer_)) {
n_del++;
} else {
break;
}
}
for (int64_t i = 0; i < last_ok; i++) {
if (meta_[i].locked) return false;
for (uint64_t i = 0; i < n_del; i++) {
if (meta_[i]->lock > 0) return false;
}
if (last_ok != 0) {
meta_.erase(meta_.begin(), meta_.begin() + last_ok);
}
meta_.erase(meta_.begin(), meta_.begin() + n_del);
return true;
}
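// Illustrative walk-through (assumed numbers, not part of this commit): with a
// 100-byte cache holding slots A[0,10), B[10,20), C[20,90), D[90,100), a new
// 30-byte allocation wraps cur_pointer_ back to the start, so the new slot
// occupies [0,30). CleanOldSlots(30) finds that A, B and C intersect
// [cur_pointer_ - 30, cur_pointer_) = [0,30) and erases them, but only if none
// of them still has lock > 0; otherwise the whole allocation is rejected
// (see GetFreeSlotRemovesOldMetadataRecords below).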
@@ -109,4 +120,13 @@ bool DataCache::CheckAllocationSize(uint64_t size) {
return size <= cache_size_;
}
bool DataCache::UnlockSlot(CacheMeta* meta) {
if (meta == nullptr) {
return false;
}
std::lock_guard<std::mutex> lock{mutex_};
meta->lock = std::max(0, (int)meta->lock - 1);
return true;
}
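// Illustrative usage of the new write path (a sketch, not part of this
// commit). GetFreeSlotAndLock() hands out the slot already write-locked
// (lock == 1), so the writer must unlock once the buffer is filled:
//
//   CacheMeta* slot = nullptr;
//   if (void* buf = cache.GetFreeSlotAndLock(size, &slot)) {
//       // ... copy size bytes into buf ...
//       cache.UnlockSlot(slot);
//   }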
}
\ No newline at end of file
@@ -15,14 +15,16 @@ struct CacheMeta {
uint64_t id;
void* addr;
uint64_t size;
bool locked;
uint lock;
};
class DataCache {
public:
explicit DataCache(uint64_t cache_size_gb, float keepunlocked_ratio);
VIRTUAL void* GetFreeSlot(uint64_t size, uint64_t* id);
void* GetSlotToReadAndLock(uint64_t id, uint64_t* size);
VIRTUAL void* GetFreeSlotAndLock(uint64_t size, CacheMeta** meta);
void* GetSlotToReadAndLock(uint64_t id, CacheMeta** meta);
VIRTUAL bool UnlockSlot(CacheMeta* meta);
~DataCache() = default;
private:
uint64_t cache_size_;
float keepunlocked_ratio_;
@@ -30,9 +32,9 @@ class DataCache {
uint64_t cur_pointer_ = 0;
std::unique_ptr<uint8_t[]> cache_;
std::mutex mutex_;
std::deque<CacheMeta> meta_;
bool SlotTooCloseToCurrentPointer(const CacheMeta& meta);
bool CleanOldSlots();
std::deque<std::unique_ptr<CacheMeta>> meta_;
bool SlotTooCloseToCurrentPointer(const CacheMeta* meta);
bool CleanOldSlots(uint64_t size);
void* AllocateSlot(uint64_t size);
bool CheckAllocationSize(uint64_t size);
uint64_t GetNextId();
@@ -20,10 +20,10 @@ Error Request::PrepareDataBuffer() {
return err;
}
} else {
uint64_t slot_id;
data_ptr = cache__->GetFreeSlot(request_header_.data_size, &slot_id);
CacheMeta* slot;
data_ptr = cache__->GetFreeSlotAndLock(request_header_.data_size, &slot);
if (data_ptr) {
slot_id_ = slot_id;
slot_meta_ = slot;
} else {
return ErrorTemplates::kMemoryAllocationError.Generate("cannot allocate slot in cache");
}
@@ -37,6 +37,9 @@ Error Request::ReceiveData() {
return err;
}
io__->Receive(socket_fd_, GetData(), request_header_.data_size, &err);
if (slot_meta_) {
cache__->UnlockSlot(slot_meta_);
}
return err;
}
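// Illustrative note (not part of this commit): PrepareDataBuffer() above now
// keeps the cache slot write-locked while the payload is received over the
// socket, and ReceiveData() releases that lock right after io__->Receive()
// returns, so CleanOldSlots() cannot recycle a buffer that is still being
// written into.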
@@ -118,8 +121,13 @@ void Request::SetBeamline(std::string beamline) {
const std::string& Request::GetBeamline() const {
return beamline_;
}
uint64_t Request::GetSlotId() const {
return slot_id_;
if (slot_meta_) {
return slot_meta_->id;
} else {
return 0;
}
}
std::unique_ptr<Request> RequestFactory::GenerateRequest(const GenericRequestHeader&
@@ -53,7 +53,7 @@ class Request {
std::string origin_uri_;
std::string beamtime_id_;
std::string beamline_;
uint64_t slot_id_{0};
CacheMeta* slot_meta_ = nullptr;
};
class RequestFactory {
@@ -25,7 +25,7 @@ Error RequestHandlerDbWrite::ProcessRequest(Request* request) const {
return err;
}
return InsertRecordToDb(request);
return InsertRecordToDb(request);
}
@@ -56,8 +56,8 @@ Error RequestHandlerDbWrite::InsertRecordToDb(const Request* request) const {
auto err = db_client__->Insert(file_info, true);
if (!err) {
log__->Debug(std::string{"insert record id "} + std::to_string(file_info.id) + " to " + kDBCollectionName + " in " +
db_name_ +
" at " + GetReceiverConfig()->broker_db_uri);
db_name_ +
" at " + GetReceiverConfig()->broker_db_uri);
}
return err;
}
@@ -66,7 +66,9 @@ class MockRequest: public Request {
class MockDataCache: public DataCache {
public:
MockDataCache(): DataCache(0, 0) {};
MOCK_METHOD2(GetFreeSlot, void* (uint64_t size, uint64_t* id));
MOCK_METHOD2(GetFreeSlotAndLock, void* (uint64_t size, CacheMeta** meta));
MOCK_METHOD1(UnlockSlot, bool(CacheMeta* meta));
};
@@ -15,6 +15,7 @@ using ::testing::_;
using ::testing::ElementsAreArray;
using asapo::DataCache;
using asapo::CacheMeta;
namespace {
@@ -26,6 +27,8 @@ class DataCacheTests : public Test {
uint64_t expected_size = 10;
uint64_t expected_val = 1;
float expected_keepunlocked_ratio = 0.2;
CacheMeta* meta1;
CacheMeta* meta2;
DataCache cache{expected_cache_size, expected_keepunlocked_ratio};
void SetUp() override {
}
@@ -34,8 +37,7 @@ class DataCacheTests : public Test {
};
TEST_F(DataCacheTests, GetFreeSlotFailsDueToSize) {
uint64_t id;
auto addr = cache.GetFreeSlot(expected_cache_size + 1, &id);
auto addr = cache.GetFreeSlotAndLock(expected_cache_size + 1, &meta1);
ASSERT_THAT(addr, Eq(nullptr));
}
@@ -46,121 +48,172 @@ void set_array(uint8_t* addr, uint64_t size, uint8_t val) {
}
TEST_F(DataCacheTests, GetFreeSlotOK) {
uint64_t id;
uint8_t* addr = (uint8_t*) cache.GetFreeSlot(1, &id);
uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1);
set_array(addr, 1, 2);
ASSERT_THAT(addr[0], Eq(2));
ASSERT_THAT(id, Gt(0));
ASSERT_THAT(meta1->id, Gt(0));
}
TEST_F(DataCacheTests, GetFreeSlotStartsFromLastPointer) {
uint64_t id1, id2;
uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlot(1, &id1);
uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1);
set_array(ini_addr, 1, 2);
uint8_t* addr = (uint8_t*) cache.GetFreeSlot(1, &id2);
uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta2);
set_array(addr, 1, 1);
ASSERT_THAT(ini_addr[0], Eq(2));
ASSERT_THAT(ini_addr[1], Eq(1));
ASSERT_THAT(id1, Ne(id2));
ASSERT_THAT(meta1->id, Ne(meta2->id));
}
TEST_F(DataCacheTests, GetFreeSlotLocks) {
uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1);
uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2);
ASSERT_THAT(ini_addr, Ne(nullptr));
ASSERT_THAT(addr, Eq(nullptr));
ASSERT_THAT(meta1, Ne(nullptr));
ASSERT_THAT(meta2, Eq(nullptr));
}
TEST_F(DataCacheTests, GetFreeSlotStartsFromBeginIfNotFit) {
uint64_t id1, id2;
uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlot(1, &id1);
uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(1, &meta1);
auto id = meta1->id;
set_array(ini_addr, 1, 2);
uint8_t* addr = (uint8_t*) cache.GetFreeSlot(expected_cache_size, &id2);
cache.UnlockSlot(meta1);
uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2);
set_array(addr, expected_cache_size, 1);
ASSERT_THAT(ini_addr[0], Eq(1));
ASSERT_THAT(id1, Ne(id2));
ASSERT_THAT(id, Ne(meta2->id));
}
TEST_F(DataCacheTests, GetFreeSlotCannotWriteIfAlreadyWriting) {
cache.GetFreeSlotAndLock(1, &meta1);
uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2);
ASSERT_THAT(addr, Eq(nullptr));
ASSERT_THAT(meta2, Eq(nullptr));
}
TEST_F(DataCacheTests, PrepareToReadIdNotFound) {
uint64_t size, id;
uint64_t id;
id = 0;
uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(id, &size);
uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(id, &meta1);
ASSERT_THAT(addr, Eq(nullptr));
ASSERT_THAT(meta1, Eq(nullptr));
}
TEST_F(DataCacheTests, PrepareToReadOk) {
uint64_t size, id;
uint64_t data_size = expected_cache_size * 0.7;
uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlot(data_size, &id);
set_array(ini_addr, data_size, expected_val);
uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(data_size, &meta1);
uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(id, &size);
uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, &meta2);
ASSERT_THAT(addr, Eq(ini_addr));
ASSERT_THAT(size, Eq(data_size));
ASSERT_THAT(meta1, Eq(meta2));
ASSERT_THAT(meta2->size, Eq(data_size));
}
TEST_F(DataCacheTests, PrepareToReadFailsIfTooCloseToCurrentPointer) {
uint64_t size, id;
auto data_size = expected_cache_size * 0.9;
cache.GetFreeSlot(data_size, &id);
cache.GetFreeSlotAndLock(data_size, &meta1);
uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(id, &size);
uint8_t* addr = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, &meta2);
ASSERT_THAT(addr, Eq(nullptr));
}
TEST_F(DataCacheTests, GetFreeSlotRemovesOldMetadataRecords) {
uint64_t id1, id2, id3, id4, id5, size;
cache.GetFreeSlot(10, &id1);
cache.GetFreeSlot(10, &id2);
cache.GetFreeSlot(expected_cache_size - 30, &id3);
cache.GetFreeSlot(10, &id4);
cache.GetFreeSlot(30, &id5);
uint8_t* addr1 = (uint8_t*) cache.GetSlotToReadAndLock(id1, &size);
uint8_t* addr2 = (uint8_t*) cache.GetSlotToReadAndLock(id2, &size);
uint8_t* addr3 = (uint8_t*) cache.GetSlotToReadAndLock(id3, &size);
uint8_t* addr4 = (uint8_t*) cache.GetSlotToReadAndLock(id4, &size);
CacheMeta* meta3, *meta4, *meta5;
CacheMeta* meta;
cache.GetFreeSlotAndLock(10, &meta1);
cache.GetFreeSlotAndLock(10, &meta2);
cache.GetFreeSlotAndLock(expected_cache_size - 30, &meta3);
cache.GetFreeSlotAndLock(10, &meta4);
cache.GetFreeSlotAndLock(30, &meta5);
uint8_t* addr1 = (uint8_t*) cache.GetSlotToReadAndLock(meta1->id, &meta);
uint8_t* addr2 = (uint8_t*) cache.GetSlotToReadAndLock(meta2->id, &meta);
uint8_t* addr3 = (uint8_t*) cache.GetSlotToReadAndLock(meta3->id, &meta);
uint8_t* addr4 = (uint8_t*) cache.GetSlotToReadAndLock(meta4->id, &meta);
ASSERT_THAT(addr1, Eq(nullptr));
ASSERT_THAT(addr2, Eq(nullptr));
ASSERT_THAT(addr3, Eq(nullptr));
ASSERT_THAT(addr4, Ne(nullptr));
ASSERT_THAT(size, Eq(10));
ASSERT_THAT(meta->size, Eq(10));
}
TEST_F(DataCacheTests, CannotGetFreeSlotIfNeedCleanOnebeingReaded) {
uint64_t id1, id2, size;
CacheMeta* meta;
uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlot(10, &id1);
uint8_t* ini_addr = (uint8_t*) cache.GetFreeSlotAndLock(10, &meta1);
auto res = cache.GetSlotToReadAndLock(meta1->id, &meta);
uint8_t* addr = (uint8_t*) cache.GetFreeSlotAndLock(expected_cache_size, &meta2);
ASSERT_THAT(ini_addr, Ne(nullptr));
ASSERT_THAT(res, Eq(ini_addr));
ASSERT_THAT(addr, Eq(nullptr));
}
auto res = cache.GetSlotToReadAndLock(id1, &size);
TEST_F(DataCacheTests, CanGetFreeSlotIfWasUnlocked) {
CacheMeta* meta;
cache.GetFreeSlotAndLock(10, &meta1);
cache.UnlockSlot(meta1);
cache.GetSlotToReadAndLock(meta1->id, &meta);
cache.UnlockSlot(meta);
auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2);
uint8_t* addr = (uint8_t*) cache.GetFreeSlot(expected_cache_size, &id2);
ASSERT_THAT(addr, Ne(nullptr));
}
TEST_F(DataCacheTests, IncreasLockForEveryRead) {
CacheMeta* meta;
cache.GetFreeSlotAndLock(10, &meta1);
cache.GetSlotToReadAndLock(meta1->id, &meta);
cache.GetSlotToReadAndLock(meta1->id, &meta);
cache.UnlockSlot(meta);
auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2);
ASSERT_THAT(res, Ne(nullptr));
ASSERT_THAT(ini_addr, Ne(nullptr));
ASSERT_THAT(addr, Eq(nullptr));
}
TEST_F(DataCacheTests, DecreasLockForEveryUnlock) {
CacheMeta* meta;
cache.GetFreeSlotAndLock(10, &meta1);
cache.UnlockSlot(meta1);
TEST_F(DataCacheTests, GetFreeSlotCheckIds) {
uint64_t id1, id2, id3, id4;
cache.GetFreeSlot(10, &id1);
cache.GetSlotToReadAndLock(meta1->id, &meta);
cache.GetSlotToReadAndLock(meta1->id, &meta);
cache.UnlockSlot(meta);
cache.UnlockSlot(meta);
auto addr = cache.GetFreeSlotAndLock(expected_cache_size, &meta2);
ASSERT_THAT(addr, Ne(nullptr));
}
TEST_F(DataCacheTests, GetFreeSlotCreatesCorrectIds) {
CacheMeta* meta3, *meta4;
cache.GetFreeSlotAndLock(10, &meta1);
std::this_thread::sleep_for(std::chrono::milliseconds(100));
cache.GetFreeSlot(10, &id2);
cache.GetFreeSlotAndLock(10, &meta2);
std::this_thread::sleep_for(std::chrono::milliseconds(10));
cache.GetFreeSlot(10, &id3);
cache.GetFreeSlotAndLock(10, &meta3);
std::this_thread::sleep_for(std::chrono::milliseconds(1));
cache.GetFreeSlot(10, &id4);
cache.GetFreeSlotAndLock(10, &meta4);
auto c1 = static_cast<uint32_t>(id1);
auto c2 = static_cast<uint32_t>(id2);
auto c3 = static_cast<uint32_t>(id3);
auto c4 = static_cast<uint32_t>(id4);
auto c1 = static_cast<uint32_t>(meta1->id);
auto c2 = static_cast<uint32_t>(meta2->id);
auto c3 = static_cast<uint32_t>(meta3->id);
auto c4 = static_cast<uint32_t>(meta4->id);
auto t1 = id1 >> 32;
auto t2 = id2 >> 32;
auto t3 = id3 >> 32;
auto t4 = id4 >> 32;
auto t1 = meta1->id >> 32;
auto t2 = meta2->id >> 32;
auto t3 = meta3->id >> 32;
auto t4 = meta4->id >> 32;
ASSERT_THAT(c2, Eq(c1 + 1));
ASSERT_THAT(c3, Eq(c2 + 1));
@@ -175,6 +228,4 @@ TEST_F(DataCacheTests, GetFreeSlotCheckIds) {
ASSERT_THAT(t4 - t3, Ge(1));
}
}
@@ -120,11 +120,15 @@ TEST_F(RequestTests, HandleReturnsErrorOnDataReceive) {
TEST_F(RequestTests, HandleGetsMemoryFromCache) {
request->cache__ = &mock_cache;
EXPECT_CALL(mock_cache, GetFreeSlot(data_size_, _)).WillOnce(
DoAll(SetArgPointee<1>(expected_slot_id),
asapo::CacheMeta meta;
meta.id = expected_slot_id;
EXPECT_CALL(mock_cache, GetFreeSlotAndLock(data_size_, _)).WillOnce(
DoAll(SetArgPointee<1>(&meta),
Return(&mock_cache)
));
EXPECT_CALL(mock_cache, UnlockSlot(&meta));
auto err = request->Handle(stat);
ASSERT_THAT(request->GetSlotId(), Eq(expected_slot_id));
@@ -134,10 +138,13 @@ TEST_F(RequestTests, HandleGetsMemoryFromCache) {
TEST_F(RequestTests, ErrorGetMemoryFromCache) {
request->cache__ = &mock_cache;
EXPECT_CALL(mock_cache, GetFreeSlot(data_size_, _)).WillOnce(
EXPECT_CALL(mock_cache, GetFreeSlotAndLock(data_size_, _)).WillOnce(
Return(nullptr)
);
EXPECT_CALL(mock_cache, UnlockSlot(_)).Times(0);
auto err = request->Handle(stat);
ASSERT_THAT(request->GetSlotId(), Eq(0));
......@@ -53,5 +53,5 @@ $1 localhost:8400 ${beamtime_id2} 100 900 4 0 100 &
#producerid=`echo $!`
$2 ${proxy_address} ${beamtime_id1} 2 $token1 5000 | tee /dev/stderr | grep "Processed 1000 file(s)"
$2 ${proxy_address} ${beamtime_id2} 2 $token2 5000 | tee /dev/stderr | grep "Processed 900 file(s)"
$2 ${proxy_address} ${beamtime_id1} 2 $token1 1000 | tee /dev/stderr | grep "Processed 1000 file(s)"
$2 ${proxy_address} ${beamtime_id2} 2 $token2 1000 | tee /dev/stderr | grep "Processed 900 file(s)"