Skip to content

add support for rwlock#1031

Closed
hairet wants to merge 5 commits into
apache:masterfrom
hairet:rwlock_branche
Closed

add support for rwlock#1031
hairet wants to merge 5 commits into
apache:masterfrom
hairet:rwlock_branche

Conversation

@hairet
Copy link
Copy Markdown

@hairet hairet commented Jan 19, 2020

补充写优先读写锁实现,变量和状态应该算是比较简洁的了,参考了内核写优先设计。
UT里有性能相关测试

@JimChengLin
Copy link
Copy Markdown
Contributor

这个实现性能有很大优化吧。这个无脑 butex 唤醒有问题吧?理想状态下应该有个 flag 标识有没有真的有 bthread 在等待。

@JimChengLin
Copy link
Copy Markdown
Contributor

JimChengLin commented May 10, 2021

我的实现是这样的,由于 bthread 的 butex 没有按 tag 唤醒,有可能产生读写者同时唤醒(惊群)

#pragma once

#include <bthread/bthread.h>
#include <bthread/butex.h>
#include <butil/atomicops.h>

#include <cassert>

namespace bcache {

class SingleWriterBthreadRWLock {
 public:
    SingleWriterBthreadRWLock() : flag_(bthread::butex_create_checked<unsigned int>()) { *flag_ = 0; }

    ~SingleWriterBthreadRWLock() { bthread::butex_destroy(flag_); }

    void lock_shared() { CHECK_EQ(0, ReadLock()); }

    void unlock_shared() { ReadUnlock(); }

    void lock() { CHECK_EQ(0, WriteLock()); }

    void unlock() { WriteUnlock(); }

 private:
    int ReadLock() {
        auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
        unsigned int state;
    wait_stage:
        while ((state = f.load(std::memory_order_relaxed)) & kLockMask) {
            unsigned int after = state | kWaitMask;
            while (!(state & kWaitMask) && !f.compare_exchange_weak(state, after, std::memory_order_relaxed,
                                                                    std::memory_order_relaxed)) {
                if (!(state & kLockMask)) {
                    goto lock_stage;
                }
                after = state | kWaitMask;
            }
            if (bthread::butex_wait(flag_, after, nullptr) < 0 && errno != EWOULDBLOCK && errno != EINTR) {
                return errno;
            }
        }
    lock_stage:
        if (f.fetch_add(1, std::memory_order_acquire) & kLockMask) {
            state = f.fetch_sub(1, std::memory_order_relaxed) - 1;
            if ((state & (kWaitMask | kReaderCountMask)) == kWaitMask) {
                state = f.fetch_and(~kWaitMask, std::memory_order_relaxed);
                if (state & kWaitMask) {
                    bthread::butex_wake_all(flag_);
                }
            }
            if (state & kLockMask) {
                goto wait_stage;
            } else {
                goto lock_stage;
            }
        }
        return 0;
    }

    void ReadUnlock() {
        auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
        unsigned int state = f.fetch_sub(1, std::memory_order_release) - 1;
        if ((state & (kWaitMask | kReaderCountMask)) == kWaitMask) {
            assert(state & kLockMask);
            state = f.fetch_and(~kWaitMask, std::memory_order_relaxed);
            if (state & kWaitMask) {
                bthread::butex_wake_all(flag_);
            }
        }
    }

    int WriteLock() {
        auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
        unsigned int state;
        if ((state = f.fetch_or(kLockMask, std::memory_order_acquire))) {
            assert(!(state & (kLockMask | kWaitMask)));
            state |= kLockMask;
            do {
                assert(state & kLockMask);
                unsigned int after = state | kWaitMask;
                while (!(state & kWaitMask) &&
                       !f.compare_exchange_weak(state, after, std::memory_order_relaxed,
                                                std::memory_order_acquire)) {
                    assert(state & kLockMask);
                    if (!(state & kReaderCountMask)) {
                        return 0;
                    }
                    after = state | kWaitMask;
                }
                if (bthread::butex_wait(flag_, after, nullptr) < 0 && errno != EWOULDBLOCK &&
                    errno != EINTR) {
                    WriteUnlock();
                    return errno;
                }
            } while ((state = f.load(std::memory_order_acquire)) & kReaderCountMask);
        }
        return 0;
    }

    void WriteUnlock() {
        auto& f = *reinterpret_cast<butil::atomic<unsigned int>*>(flag_);
        unsigned int state = f.fetch_and(~(kLockMask | kWaitMask), std::memory_order_release);
        assert(state & kLockMask);
        if (state & kWaitMask) {
            bthread::butex_wake_all(flag_);
        }
    }

 private:
    static constexpr unsigned int kLockMask = 1U << 31U;
    static constexpr unsigned int kWaitMask = 1U << 30U;
    static constexpr unsigned int kReaderCountMask = (1U << 30U) - 1U;

    unsigned int* const flag_;
};

class GeneralBthreadRWLock {
 public:
    void lock_shared() { internal_rwlock_.lock_shared(); }

    void unlock_shared() { internal_rwlock_.unlock_shared(); }

    void lock() {
        writer_mtx_.lock();
        internal_rwlock_.lock();
    }

    void unlock() {
        internal_rwlock_.unlock();
        writer_mtx_.unlock();
    }

 private:
    bthread::Mutex writer_mtx_;
    SingleWriterBthreadRWLock internal_rwlock_;
};

}  // namespace bcache

Comment thread src/bthread/rwlock.cpp Outdated
if(r != 0) {
if(bthread::butex_wait(whole, r, NULL) < 0 &&
errno != EWOULDBLOCK && errno != EINTR) {
whole->fetch_sub(1);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里需要减一的为啥是whole?我感觉应该是w_wait_count吧?

Copy link
Copy Markdown
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

是的,应该是w_wait_count,我们内部版本很久以前改过了, 很久没来这里更新了,抱歉

@wwbmmm
Copy link
Copy Markdown
Contributor

wwbmmm commented Sep 5, 2022

@hairet 这个PR和master冲突了,可以解决一下吗

@chenBright
Copy link
Copy Markdown
Contributor

Closed this as completed in #2752.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants