From 49b01ea7b6c22b77b0938e9241a6d38225a8f50e Mon Sep 17 00:00:00 2001 From: LiYiChao Date: Sun, 30 Oct 2022 18:18:28 +0800 Subject: [PATCH] support SharedMutex. --- src/bthread/shared_mutex.cpp | 77 ++++++++++++++++++++++++++++++++++++ src/bthread/shared_mutex.h | 49 +++++++++++++++++++++++ 2 files changed, 126 insertions(+) create mode 100644 src/bthread/shared_mutex.cpp create mode 100644 src/bthread/shared_mutex.h diff --git a/src/bthread/shared_mutex.cpp b/src/bthread/shared_mutex.cpp new file mode 100644 index 0000000000..e01bcff058 --- /dev/null +++ b/src/bthread/shared_mutex.cpp @@ -0,0 +1,77 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "shared_mutex.h" +#include "butex.h" +#include "butil/logging.h" + +namespace bthread { + +SharedMutex::SharedMutex(): _reader_count(0), _reader_wait(0) { + _writer_butex = butex_create_checked(); + *_writer_butex = 0; + _reader_butex = butex_create_checked(); + *_reader_butex = 0; +} +SharedMutex::~SharedMutex() { + butex_destroy(_writer_butex); + butex_destroy(_reader_butex); +} + +void SharedMutex::lock_shared() { + if (_reader_count.fetch_add(1) < 0) { + butex_wait(_reader_butex, 1, nullptr); + *_reader_butex -= 1; + } +} + +void SharedMutex::unlock_shared() { + int32_t r = _reader_count.fetch_add(-1); + if (r < 0) { + unlock_shared_slow(r); + } +} + +void SharedMutex::unlock_shared_slow(int32_t r) { + CHECK(r != 0 && r != -max_readers) << "unlock of unlocked SharedMutex"; + if (_reader_wait.fetch_add(-1) == 1) { + *_writer_butex = 1; + butex_wake(_writer_butex); + } +} + +void SharedMutex::lock() { + _w.lock(); + int32_t r = _reader_count.fetch_add(-max_readers); + if (r != 0 && _reader_wait.fetch_add(r) + r != 0) { + butex_wait(_writer_butex, 1, nullptr); + *_writer_butex = 0; + } +} + +void SharedMutex::unlock() { + int32_t r = _reader_count.fetch_add(max_readers) + max_readers; + for(int32_t i = 0; i < r; i++) { + *_reader_butex += 1; + butex_wake(_reader_butex); + } + _w.unlock(); +} +} + + + diff --git a/src/bthread/shared_mutex.h b/src/bthread/shared_mutex.h new file mode 100644 index 0000000000..027dc0b310 --- /dev/null +++ b/src/bthread/shared_mutex.h @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#ifndef BTHREAD_SHARED_MUTEX_H +#define BTHREAD_SHARED_MUTEX_H + +#include "mutex.h" + +namespace bthread { + +// compatible with c++17 std::shared_mutex, migration from golang +// see https://github.com/golang/go/blob/master/src/sync/rwmutex.go +class SharedMutex { +public: + SharedMutex(); + ~SharedMutex(); + void lock_shared(); + void unlock_shared(); + void lock(); + void unlock(); + +private: + DISALLOW_COPY_AND_ASSIGN(SharedMutex); + void unlock_shared_slow(int32_t r); + + static constexpr int32_t max_readers = 1 << 30; + Mutex _w; + uint32_t* _writer_butex; + uint32_t* _reader_butex; + butil::atomic _reader_count; + butil::atomic _reader_wait; +}; +} + +#endif //BTHREAD_SHARED_MUTEX_H \ No newline at end of file