Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
77 changes: 77 additions & 0 deletions src/bthread/shared_mutex.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#include "shared_mutex.h"
#include "butex.h"
#include "butil/logging.h"

namespace bthread {

SharedMutex::SharedMutex(): _reader_count(0), _reader_wait(0) {
Comment thread
lycplus marked this conversation as resolved.
_writer_butex = butex_create_checked<uint32_t>();
*_writer_butex = 0;
_reader_butex = butex_create_checked<uint32_t>();
*_reader_butex = 0;
}
Comment thread
lycplus marked this conversation as resolved.
SharedMutex::~SharedMutex() {
butex_destroy(_writer_butex);
butex_destroy(_reader_butex);
}

void SharedMutex::lock_shared() {
if (_reader_count.fetch_add(1) < 0) {
butex_wait(_reader_butex, 1, nullptr);
*_reader_butex -= 1;
}
}

void SharedMutex::unlock_shared() {
int32_t r = _reader_count.fetch_add(-1);
if (r < 0) {
unlock_shared_slow(r);
}
}

void SharedMutex::unlock_shared_slow(int32_t r) {
CHECK(r != 0 && r != -max_readers) << "unlock of unlocked SharedMutex";
if (_reader_wait.fetch_add(-1) == 1) {
*_writer_butex = 1;
butex_wake(_writer_butex);
Comment thread
lycplus marked this conversation as resolved.
}
}

void SharedMutex::lock() {
_w.lock();
int32_t r = _reader_count.fetch_add(-max_readers);
if (r != 0 && _reader_wait.fetch_add(r) + r != 0) {
butex_wait(_writer_butex, 1, nullptr);
*_writer_butex = 0;
}
}

void SharedMutex::unlock() {
int32_t r = _reader_count.fetch_add(max_readers) + max_readers;
for(int32_t i = 0; i < r; i++) {
*_reader_butex += 1;
butex_wake(_reader_butex);
Comment thread
lycplus marked this conversation as resolved.
}
_w.unlock();
}
}



49 changes: 49 additions & 0 deletions src/bthread/shared_mutex.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

#ifndef BTHREAD_SHARED_MUTEX_H
#define BTHREAD_SHARED_MUTEX_H

#include "mutex.h"
Comment thread
lycplus marked this conversation as resolved.

namespace bthread {

// compatible with c++17 std::shared_mutex, migration from golang
// see https://github.com/golang/go/blob/master/src/sync/rwmutex.go
class SharedMutex {
public:
SharedMutex();
~SharedMutex();
void lock_shared();
void unlock_shared();
void lock();
void unlock();

private:
DISALLOW_COPY_AND_ASSIGN(SharedMutex);
void unlock_shared_slow(int32_t r);

static constexpr int32_t max_readers = 1 << 30;
Mutex _w;
uint32_t* _writer_butex;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

改成butil::atomic<uint32_t>*,保证操作的内存序

uint32_t* _reader_butex;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

同上

butil::atomic<int32_t> _reader_count;
butil::atomic<int32_t> _reader_wait;
};
}

#endif //BTHREAD_SHARED_MUTEX_H