add support for rwlock#1031
Closed
hairet wants to merge 5 commits into
Closed
Conversation
Contributor
|
这个实现性能有很大优化吧。这个无脑 butex 唤醒有问题吧?理想状态下应该有个 flag 标识有没有真的有 bthread 在等待。 |
Contributor
|
我的实现是这样的,由于 bthread 的 butex 没有按 tag 唤醒,有可能产生读写者同时唤醒(惊群) #pragma once
#include <bthread/bthread.h>
#include <bthread/butex.h>
#include <butil/atomicops.h>
#include <cassert>
namespace bcache {
class SingleWriterBthreadRWLock {
public:
SingleWriterBthreadRWLock() : flag_(bthread::butex_create_checked<unsigned int>()) { *flag_ = 0; }
~SingleWriterBthreadRWLock() { bthread::butex_destroy(flag_); }
void lock_shared() { CHECK_EQ(0, ReadLock()); }
void unlock_shared() { ReadUnlock(); }
void lock() { CHECK_EQ(0, WriteLock()); }
void unlock() { WriteUnlock(); }
private:
int ReadLock() {
auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
unsigned int state;
wait_stage:
while ((state = f.load(std::memory_order_relaxed)) & kLockMask) {
unsigned int after = state | kWaitMask;
while (!(state & kWaitMask) && !f.compare_exchange_weak(state, after, std::memory_order_relaxed,
std::memory_order_relaxed)) {
if (!(state & kLockMask)) {
goto lock_stage;
}
after = state | kWaitMask;
}
if (bthread::butex_wait(flag_, after, nullptr) < 0 && errno != EWOULDBLOCK && errno != EINTR) {
return errno;
}
}
lock_stage:
if (f.fetch_add(1, std::memory_order_acquire) & kLockMask) {
state = f.fetch_sub(1, std::memory_order_relaxed) - 1;
if ((state & (kWaitMask | kReaderCountMask)) == kWaitMask) {
state = f.fetch_and(~kWaitMask, std::memory_order_relaxed);
if (state & kWaitMask) {
bthread::butex_wake_all(flag_);
}
}
if (state & kLockMask) {
goto wait_stage;
} else {
goto lock_stage;
}
}
return 0;
}
void ReadUnlock() {
auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
unsigned int state = f.fetch_sub(1, std::memory_order_release) - 1;
if ((state & (kWaitMask | kReaderCountMask)) == kWaitMask) {
assert(state & kLockMask);
state = f.fetch_and(~kWaitMask, std::memory_order_relaxed);
if (state & kWaitMask) {
bthread::butex_wake_all(flag_);
}
}
}
int WriteLock() {
auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
unsigned int state;
if ((state = f.fetch_or(kLockMask, std::memory_order_acquire))) {
assert(!(state & (kLockMask | kWaitMask)));
state |= kLockMask;
do {
assert(state & kLockMask);
unsigned int after = state | kWaitMask;
while (!(state & kWaitMask) &&
!f.compare_exchange_weak(state, after, std::memory_order_relaxed,
std::memory_order_acquire)) {
assert(state & kLockMask);
if (!(state & kReaderCountMask)) {
return 0;
}
after = state | kWaitMask;
}
if (bthread::butex_wait(flag_, after, nullptr) < 0 && errno != EWOULDBLOCK &&
errno != EINTR) {
WriteUnlock();
return errno;
}
} while ((state = f.load(std::memory_order_acquire)) & kReaderCountMask);
}
return 0;
}
void WriteUnlock() {
auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
unsigned int state = f.fetch_and(~(kLockMask | kWaitMask), std::memory_order_release);
assert(state & kLockMask);
if (state & kWaitMask) {
bthread::butex_wake_all(flag_);
}
}
private:
static constexpr unsigned int kLockMask = 1U << 31U;
static constexpr unsigned int kWaitMask = 1U << 30U;
static constexpr unsigned int kReaderCountMask = (1U << 30U) - 1U;
unsigned int* const flag_;
};
class GeneralBthreadRWLock {
public:
void lock_shared() { internal_rwlock_.lock_shared(); }
void unlock_shared() { internal_rwlock_.unlock_shared(); }
void lock() {
writer_mtx_.lock();
internal_rwlock_.lock();
}
void unlock() {
internal_rwlock_.unlock();
writer_mtx_.unlock();
}
private:
bthread::Mutex writer_mtx_;
SingleWriterBthreadRWLock internal_rwlock_;
};
} // namespace bcache
|
polyrabbit
reviewed
Mar 5, 2022
| if(r != 0) { | ||
| if(bthread::butex_wait(whole, r, NULL) < 0 && | ||
| errno != EWOULDBLOCK && errno != EINTR) { | ||
| whole->fetch_sub(1); |
There was a problem hiding this comment.
这里需要减一的为啥是whole?我感觉应该是w_wait_count吧?
Author
There was a problem hiding this comment.
是的,应该是w_wait_count,我们内部版本很久以前改过了, 很久没来这里更新了,抱歉
Contributor
|
@hairet 这个PR和master冲突了,可以解决一下吗 |
Closed
Contributor
|
Closed this as completed in #2752. |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
补充写优先读写锁实现,变量和状态应该算是比较简洁的了,参考了内核写优先设计。
UT里有性能相关测试