Add redlock implementation and test to demonstrate a distributed lock using redis.
This commit is contained in:
parent
4258a48d7d
commit
4b9bf8640f
136
include/resply.h
136
include/resply.h
|
@ -14,8 +14,9 @@
|
|||
#include <cstddef>
|
||||
#include <sstream>
|
||||
#include <type_traits>
|
||||
#include <iostream>
|
||||
#include <functional>
|
||||
#include <initializer_list>
|
||||
#include <random>
|
||||
|
||||
|
||||
namespace resply {
|
||||
|
@ -235,6 +236,11 @@ namespace resply {
|
|||
*/
|
||||
const std::string& port() const;
|
||||
|
||||
/*! \brief Checks if the client is connected to a redis server.
|
||||
* \return If the client is connected.
|
||||
*/
|
||||
bool is_connected() const;
|
||||
|
||||
/*! \brief Creates a new pipelined client using this client.
|
||||
* \return A pipelined client.
|
||||
*/
|
||||
|
@ -296,4 +302,132 @@ namespace resply {
|
|||
/*! \brief Internal client implementation. */
|
||||
std::unique_ptr<ClientImpl> impl_;
|
||||
};
|
||||
|
||||
/*! \brief Implementation for a distributed lock based on the Redlock algorithm.
|
||||
*
|
||||
* \see https://redis.io/topics/distlock
|
||||
*/
|
||||
class Redlock {
|
||||
public:
|
||||
/*! \brief Constructs a new distributed lock.
|
||||
* \param resource_name Name of the lock.
|
||||
* \param hosts List of redis servers to lock.
|
||||
*/
|
||||
Redlock(std::string resource_name, const std::vector<std::string>& hosts);
|
||||
|
||||
/*! \brief Constructs a new distributed lock.
|
||||
* \param resource_name Name of the lock.
|
||||
* \param clients List of redis clients to use.
|
||||
*/
|
||||
Redlock(std::string resource_name, std::vector<std::shared_ptr<Client>> clients);
|
||||
|
||||
/*! \brief Constructs a new distributed lock.
|
||||
* \param resource_name Name of the lock.
|
||||
* \param hosts List of redis servers to lock.
|
||||
*/
|
||||
Redlock(std::string resource_name, const std::initializer_list<std::string> hosts);
|
||||
|
||||
/*! \brief Constructs a new distributed lock.
|
||||
* \param resource_name Name of the lock.
|
||||
* \param clients List of redis clients to use.
|
||||
*/
|
||||
Redlock(std::string resource_name, std::initializer_list<std::shared_ptr<Client>> clients);
|
||||
|
||||
/*! \brief Unlocks the distributed lock if needed. */
|
||||
~Redlock();
|
||||
|
||||
/*! \brief Connects all clients to the server.
|
||||
*
|
||||
* This is only needed if the Redlock is constructed using hostnames
|
||||
* or the clients passed into are not connected yet.
|
||||
*/
|
||||
void initialize();
|
||||
|
||||
/*! \brief Locks the distributed lock.
|
||||
* \param ttl Lifetime of the lock.
|
||||
* \return The validity time of the lock.
|
||||
*/
|
||||
size_t lock(size_t ttl);
|
||||
|
||||
/*! \brief Unlocks the distributed lock. */
|
||||
void unlock();
|
||||
|
||||
/*! \brief Gets the number of retries to acquire the lock.
|
||||
* \return The number of retries to the acquire the lock.
|
||||
*/
|
||||
size_t retry_count() const { return retry_count_; }
|
||||
|
||||
/*! \brief Sets the number of retries to acquire the lock.
|
||||
* \param count The new number of retries to acquire the lock.
|
||||
*/
|
||||
void retry_count(size_t count) { retry_count_ = count; }
|
||||
|
||||
/*! \brief Gets the maximum retry delay in milliseconds.
|
||||
* \return The maximum retry delay in milliseconds.
|
||||
*
|
||||
* The actual retry delay is random, this is the upper limit for delay.
|
||||
*/
|
||||
size_t retry_delay_max() const { return retry_delay_max_; }
|
||||
|
||||
/*! \brief Sets the maximum retry delay in milliseconds.
|
||||
* \param delay The maximum retry delay in milliseconds.
|
||||
*
|
||||
* \see retry_delay_max()
|
||||
*/
|
||||
void retry_delay_max(size_t delay) { retry_delay_max_ = delay; }
|
||||
|
||||
private:
|
||||
/*! \brief Acquires the lock on a single instance.
|
||||
* \param client The instance to acquire the lock on.
|
||||
* \param ttl The intended lifetime of the lock.
|
||||
* \return True if the lock was successfully acquired, otherwise false.
|
||||
*/
|
||||
bool lock_instance(std::shared_ptr<Client> client, size_t ttl);
|
||||
|
||||
/*! \brief Releases the lock on a single instance.
|
||||
*
|
||||
* It just tries to unlock and will not care wethever it was successful
|
||||
* or not.
|
||||
*/
|
||||
void unlock_instance(std::shared_ptr<Client> client);
|
||||
|
||||
/*! \brief Generates a random delay value based on #retry_delay_max_
|
||||
* \return The generated random delay.
|
||||
*/
|
||||
std::chrono::milliseconds get_random_delay();
|
||||
|
||||
/*! \brief The clients this distributed lock will try the lock on. */
|
||||
std::vector<std::shared_ptr<Client>> clients_;
|
||||
|
||||
/*! \brief Name of the lock. */
|
||||
const std::string resource_name_;
|
||||
|
||||
/*! \brief Unique randomly-generated lock value. */
|
||||
const std::string lock_value_;
|
||||
|
||||
/*! \brief Amount of times #lock will try to acquire the lock. */
|
||||
size_t retry_count_;
|
||||
|
||||
/*! \brief Maximum retry delay in milliseconds. */
|
||||
size_t retry_delay_max_;
|
||||
|
||||
/*! \brief Random number generator for #get_random_delay(). */
|
||||
std::mt19937 random_number_gen_;
|
||||
|
||||
/*! \brief Generates a random, unique lock value. */
|
||||
static std::string generate_lock_value();
|
||||
|
||||
/*! \brief Lua script for unlocking the lock. */
|
||||
static std::string UNLOCK_SCRIPT_;
|
||||
|
||||
/*! \brief Clock drift divisor.
|
||||
*
|
||||
* This is used to calculate the clock drift to account for based
|
||||
* on the targeted lifetime of the lock.
|
||||
*
|
||||
* The clock drift is caluclated as following:
|
||||
* Lifetime of the lock / clock drift divisor
|
||||
*/
|
||||
static constexpr size_t CLOCK_DRIFT_DIV = 100;
|
||||
};
|
||||
}
|
||||
|
|
147
src/libresply.cc
147
src/libresply.cc
|
@ -13,6 +13,10 @@
|
|||
#include <numeric>
|
||||
#include <unordered_map>
|
||||
#include <cctype>
|
||||
#include <fstream>
|
||||
#include <array>
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
|
||||
#include <asio.hpp>
|
||||
|
||||
|
@ -31,6 +35,16 @@ bool check_asio_error(asio::error_code& error_code)
|
|||
return !!error_code;
|
||||
}
|
||||
|
||||
long get_system_clock_ms()
|
||||
{
|
||||
namespace chrono = std::chrono;
|
||||
|
||||
auto now{chrono::system_clock::now()};
|
||||
auto millisec{chrono::time_point_cast<chrono::milliseconds>(now)};
|
||||
|
||||
return millisec.time_since_epoch().count();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
|
||||
|
@ -173,6 +187,11 @@ public:
|
|||
return port_;
|
||||
}
|
||||
|
||||
bool is_connected() const
|
||||
{
|
||||
return socket_.is_open();
|
||||
}
|
||||
|
||||
bool in_subscribed_mode() const
|
||||
{
|
||||
return channel_callbacks_.size();
|
||||
|
@ -249,6 +268,7 @@ void Client::connect() { impl_->connect(); }
|
|||
void Client::close() { impl_->close(); }
|
||||
const std::string& Client::host() const { return impl_->host(); }
|
||||
const std::string& Client::port() const { return impl_->port(); }
|
||||
bool Client::is_connected() const { return impl_->is_connected(); }
|
||||
|
||||
bool Client::in_subscribed_mode() const
|
||||
{
|
||||
|
@ -305,5 +325,132 @@ Client::Pipeline& Client::Pipeline::finish_command(const std::string& command)
|
|||
return *this;
|
||||
}
|
||||
|
||||
|
||||
Redlock::Redlock(std::string resource_name, const std::vector<std::string>& hosts) :
|
||||
resource_name_{resource_name}, lock_value_{generate_lock_value()},
|
||||
retry_count_{3}, retry_delay_max_{250}, random_number_gen_{std::random_device()()}
|
||||
{
|
||||
for (const std::string& host: hosts) {
|
||||
clients_.push_back(std::make_shared<Client>(host));
|
||||
}
|
||||
}
|
||||
|
||||
Redlock::Redlock(std::string resource_name, std::vector<std::shared_ptr<Client>> clients) :
|
||||
clients_{std::move(clients)}, resource_name_{resource_name}, lock_value_{generate_lock_value()},
|
||||
retry_count_{3}, retry_delay_max_{250}, random_number_gen_{std::random_device()()}
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
Redlock::Redlock(std::string resource_name, const std::initializer_list<std::string> hosts) :
|
||||
resource_name_{resource_name}, lock_value_{generate_lock_value()},
|
||||
retry_count_{3}, retry_delay_max_{250}, random_number_gen_{std::random_device()()}
|
||||
{
|
||||
for (const std::string& host: hosts) {
|
||||
clients_.push_back(std::make_shared<Client>(host));
|
||||
}
|
||||
}
|
||||
|
||||
Redlock::Redlock(std::string resource_name, std::initializer_list<std::shared_ptr<Client>> clients) :
|
||||
clients_{std::move(clients)}, resource_name_{resource_name}, lock_value_{generate_lock_value()},
|
||||
retry_count_{3}, retry_delay_max_{250}, random_number_gen_{std::random_device()()}
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
Redlock::~Redlock()
|
||||
{
|
||||
unlock();
|
||||
}
|
||||
|
||||
void Redlock::initialize()
|
||||
{
|
||||
for (auto& client: clients_) {
|
||||
if (!client->is_connected()) {
|
||||
client->connect();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
size_t Redlock::lock(size_t ttl)
|
||||
{
|
||||
for (size_t retries{}; retries < retry_count_; retries++) {
|
||||
size_t locked{};
|
||||
long start_time{get_system_clock_ms()};
|
||||
|
||||
for (auto& client: clients_) {
|
||||
locked += lock_instance(client, ttl);
|
||||
}
|
||||
|
||||
size_t drift{ttl / CLOCK_DRIFT_DIV};
|
||||
size_t valid_time{ttl - (get_system_clock_ms() - start_time) - drift};
|
||||
|
||||
// We need to have at least N/2 + 1 instances locked
|
||||
if (locked >= (clients_.size() / 2 + 1) && valid_time > 0) {
|
||||
return valid_time;
|
||||
} else {
|
||||
unlock();
|
||||
}
|
||||
|
||||
// Retry after random delay
|
||||
std::this_thread::sleep_for(get_random_delay());
|
||||
}
|
||||
|
||||
return 0;
|
||||
}
|
||||
|
||||
void Redlock::unlock()
|
||||
{
|
||||
for (auto& client: clients_) {
|
||||
unlock_instance(client);
|
||||
}
|
||||
}
|
||||
|
||||
bool Redlock::lock_instance(std::shared_ptr<Client> client, size_t ttl)
|
||||
{
|
||||
auto result{client->command("set", resource_name_, lock_value_, "NX", "PX", ttl)};
|
||||
|
||||
return result.type == Result::Type::String && result.string == "OK";
|
||||
}
|
||||
|
||||
void Redlock::unlock_instance(std::shared_ptr<Client> client)
|
||||
{
|
||||
client->command("eval", UNLOCK_SCRIPT_, 1, resource_name_, lock_value_);
|
||||
}
|
||||
|
||||
std::chrono::milliseconds Redlock::get_random_delay()
|
||||
{
|
||||
std::uniform_int_distribution<> dist(1, retry_delay_max_);
|
||||
|
||||
return std::chrono::milliseconds{dist(random_number_gen_)};
|
||||
}
|
||||
|
||||
std::string Redlock::generate_lock_value()
|
||||
{
|
||||
static const std::string BASE36_LUT{"0123456789abcdefghijklmnopqrstuvwxyz"};
|
||||
|
||||
std::ifstream file{"/dev/urandom", std::ios_base::binary};
|
||||
std::array<char, 20> buffer;
|
||||
file.read(buffer.data(), 20);
|
||||
|
||||
std::string uid;
|
||||
for (unsigned char byte: buffer) {
|
||||
while (byte) {
|
||||
uid += BASE36_LUT[byte % 36];
|
||||
byte /= 36;
|
||||
}
|
||||
}
|
||||
|
||||
return uid;
|
||||
}
|
||||
|
||||
std::string Redlock::UNLOCK_SCRIPT_ = R"(
|
||||
if redis.call('get', KEYS[1]) == ARGV[1] then
|
||||
return redis.call('del', KEYS[1])
|
||||
else
|
||||
return 0
|
||||
end
|
||||
)";
|
||||
|
||||
}
|
||||
|
||||
|
|
36
tests/distlock.cc
Normal file
36
tests/distlock.cc
Normal file
|
@ -0,0 +1,36 @@
|
|||
//
|
||||
// Copyright 2018 Christoph Heiss <me@christoph-heiss.me>
|
||||
// Distributed under the Boost Software License, Version 1.0.
|
||||
//
|
||||
// See accompanying file LICENSE in the project root directory
|
||||
// or copy at http://www.boost.org/LICENSE_1_0.txt
|
||||
//
|
||||
|
||||
#include <iostream>
|
||||
#include "resply.h"
|
||||
|
||||
|
||||
int main()
|
||||
{
|
||||
resply::Redlock rlock1{"resply-test", {
|
||||
"localhost:6379", "localhost:6380", "localhost:6381",
|
||||
"localhost:6382", "localhost:6383"
|
||||
}};
|
||||
rlock1.initialize();
|
||||
|
||||
resply::Redlock rlock2{"resply-test", {
|
||||
"localhost:6379", "localhost:6380", "localhost:6381",
|
||||
"localhost:6382", "localhost:6383"
|
||||
}};
|
||||
rlock2.initialize();
|
||||
|
||||
std::cout << "Locking lock 1 (should succeed) ... ";
|
||||
size_t status1{rlock1.lock(750)};
|
||||
std::cout << (status1 ? "success" : "failed") << std::endl;
|
||||
|
||||
std::cout << "Locking lock 2 (should fail) ... ";
|
||||
size_t status2{rlock2.lock(500)};
|
||||
std::cout << (status2 ? "success" : "failed") << std::endl;
|
||||
|
||||
return status1 && !status2;
|
||||
}
|
Reference in a new issue