From 4b9bf8640fd3fcd4e1ebc00a0fb3270442b4e76f Mon Sep 17 00:00:00 2001 From: Christoph Heiss Date: Mon, 5 Mar 2018 15:50:32 +0100 Subject: [PATCH] Add redlock implementation and test to demonstrate a distributed lock using redis. --- include/resply.h | 136 +++++++++++++++++++++++++++++++++++++++++- src/libresply.cc | 147 ++++++++++++++++++++++++++++++++++++++++++++++ tests/distlock.cc | 36 ++++++++++++ 3 files changed, 318 insertions(+), 1 deletion(-) create mode 100644 tests/distlock.cc diff --git a/include/resply.h b/include/resply.h index d13415e..35689e2 100644 --- a/include/resply.h +++ b/include/resply.h @@ -14,8 +14,9 @@ #include #include #include -#include #include +#include +#include namespace resply { @@ -235,6 +236,11 @@ namespace resply { */ const std::string& port() const; + /*! \brief Checks if the client is connected to a redis server. + * \return If the client is connected. + */ + bool is_connected() const; + /*! \brief Creates a new pipelined client using this client. * \return A pipelined client. */ @@ -296,4 +302,132 @@ namespace resply { /*! \brief Internal client implementation. */ std::unique_ptr impl_; }; + + /*! \brief Implementation for a distributed lock based on the Redlock algorithm. + * + * \see https://redis.io/topics/distlock + */ + class Redlock { + public: + /*! \brief Constructs a new distributed lock. + * \param resource_name Name of the lock. + * \param hosts List of redis servers to lock. + */ + Redlock(std::string resource_name, const std::vector& hosts); + + /*! \brief Constructs a new distributed lock. + * \param resource_name Name of the lock. + * \param clients List of redis clients to use. + */ + Redlock(std::string resource_name, std::vector> clients); + + /*! \brief Constructs a new distributed lock. + * \param resource_name Name of the lock. + * \param hosts List of redis servers to lock. + */ + Redlock(std::string resource_name, const std::initializer_list hosts); + + /*! \brief Constructs a new distributed lock. + * \param resource_name Name of the lock. + * \param clients List of redis clients to use. + */ + Redlock(std::string resource_name, std::initializer_list> clients); + + /*! \brief Unlocks the distributed lock if needed. */ + ~Redlock(); + + /*! \brief Connects all clients to the server. + * + * This is only needed if the Redlock is constructed using hostnames + * or the clients passed into are not connected yet. + */ + void initialize(); + + /*! \brief Locks the distributed lock. + * \param ttl Lifetime of the lock. + * \return The validity time of the lock. + */ + size_t lock(size_t ttl); + + /*! \brief Unlocks the distributed lock. */ + void unlock(); + + /*! \brief Gets the number of retries to acquire the lock. + * \return The number of retries to the acquire the lock. + */ + size_t retry_count() const { return retry_count_; } + + /*! \brief Sets the number of retries to acquire the lock. + * \param count The new number of retries to acquire the lock. + */ + void retry_count(size_t count) { retry_count_ = count; } + + /*! \brief Gets the maximum retry delay in milliseconds. + * \return The maximum retry delay in milliseconds. + * + * The actual retry delay is random, this is the upper limit for delay. + */ + size_t retry_delay_max() const { return retry_delay_max_; } + + /*! \brief Sets the maximum retry delay in milliseconds. + * \param delay The maximum retry delay in milliseconds. + * + * \see retry_delay_max() + */ + void retry_delay_max(size_t delay) { retry_delay_max_ = delay; } + + private: + /*! \brief Acquires the lock on a single instance. + * \param client The instance to acquire the lock on. + * \param ttl The intended lifetime of the lock. + * \return True if the lock was successfully acquired, otherwise false. + */ + bool lock_instance(std::shared_ptr client, size_t ttl); + + /*! \brief Releases the lock on a single instance. + * + * It just tries to unlock and will not care wethever it was successful + * or not. + */ + void unlock_instance(std::shared_ptr client); + + /*! \brief Generates a random delay value based on #retry_delay_max_ + * \return The generated random delay. + */ + std::chrono::milliseconds get_random_delay(); + + /*! \brief The clients this distributed lock will try the lock on. */ + std::vector> clients_; + + /*! \brief Name of the lock. */ + const std::string resource_name_; + + /*! \brief Unique randomly-generated lock value. */ + const std::string lock_value_; + + /*! \brief Amount of times #lock will try to acquire the lock. */ + size_t retry_count_; + + /*! \brief Maximum retry delay in milliseconds. */ + size_t retry_delay_max_; + + /*! \brief Random number generator for #get_random_delay(). */ + std::mt19937 random_number_gen_; + + /*! \brief Generates a random, unique lock value. */ + static std::string generate_lock_value(); + + /*! \brief Lua script for unlocking the lock. */ + static std::string UNLOCK_SCRIPT_; + + /*! \brief Clock drift divisor. + * + * This is used to calculate the clock drift to account for based + * on the targeted lifetime of the lock. + * + * The clock drift is caluclated as following: + * Lifetime of the lock / clock drift divisor + */ + static constexpr size_t CLOCK_DRIFT_DIV = 100; + }; } diff --git a/src/libresply.cc b/src/libresply.cc index 9e4c8e2..565e2d6 100644 --- a/src/libresply.cc +++ b/src/libresply.cc @@ -13,6 +13,10 @@ #include #include #include +#include +#include +#include +#include #include @@ -31,6 +35,16 @@ bool check_asio_error(asio::error_code& error_code) return !!error_code; } +long get_system_clock_ms() +{ + namespace chrono = std::chrono; + + auto now{chrono::system_clock::now()}; + auto millisec{chrono::time_point_cast(now)}; + + return millisec.time_since_epoch().count(); +} + } @@ -173,6 +187,11 @@ public: return port_; } + bool is_connected() const + { + return socket_.is_open(); + } + bool in_subscribed_mode() const { return channel_callbacks_.size(); @@ -249,6 +268,7 @@ void Client::connect() { impl_->connect(); } void Client::close() { impl_->close(); } const std::string& Client::host() const { return impl_->host(); } const std::string& Client::port() const { return impl_->port(); } +bool Client::is_connected() const { return impl_->is_connected(); } bool Client::in_subscribed_mode() const { @@ -305,5 +325,132 @@ Client::Pipeline& Client::Pipeline::finish_command(const std::string& command) return *this; } + +Redlock::Redlock(std::string resource_name, const std::vector& hosts) : + resource_name_{resource_name}, lock_value_{generate_lock_value()}, + retry_count_{3}, retry_delay_max_{250}, random_number_gen_{std::random_device()()} +{ + for (const std::string& host: hosts) { + clients_.push_back(std::make_shared(host)); + } +} + +Redlock::Redlock(std::string resource_name, std::vector> clients) : + clients_{std::move(clients)}, resource_name_{resource_name}, lock_value_{generate_lock_value()}, + retry_count_{3}, retry_delay_max_{250}, random_number_gen_{std::random_device()()} +{ + +} + +Redlock::Redlock(std::string resource_name, const std::initializer_list hosts) : + resource_name_{resource_name}, lock_value_{generate_lock_value()}, + retry_count_{3}, retry_delay_max_{250}, random_number_gen_{std::random_device()()} +{ + for (const std::string& host: hosts) { + clients_.push_back(std::make_shared(host)); + } +} + +Redlock::Redlock(std::string resource_name, std::initializer_list> clients) : + clients_{std::move(clients)}, resource_name_{resource_name}, lock_value_{generate_lock_value()}, + retry_count_{3}, retry_delay_max_{250}, random_number_gen_{std::random_device()()} +{ + +} + +Redlock::~Redlock() +{ + unlock(); +} + +void Redlock::initialize() +{ + for (auto& client: clients_) { + if (!client->is_connected()) { + client->connect(); + } + } +} + +size_t Redlock::lock(size_t ttl) +{ + for (size_t retries{}; retries < retry_count_; retries++) { + size_t locked{}; + long start_time{get_system_clock_ms()}; + + for (auto& client: clients_) { + locked += lock_instance(client, ttl); + } + + size_t drift{ttl / CLOCK_DRIFT_DIV}; + size_t valid_time{ttl - (get_system_clock_ms() - start_time) - drift}; + + // We need to have at least N/2 + 1 instances locked + if (locked >= (clients_.size() / 2 + 1) && valid_time > 0) { + return valid_time; + } else { + unlock(); + } + + // Retry after random delay + std::this_thread::sleep_for(get_random_delay()); + } + + return 0; +} + +void Redlock::unlock() +{ + for (auto& client: clients_) { + unlock_instance(client); + } +} + +bool Redlock::lock_instance(std::shared_ptr client, size_t ttl) +{ + auto result{client->command("set", resource_name_, lock_value_, "NX", "PX", ttl)}; + + return result.type == Result::Type::String && result.string == "OK"; +} + +void Redlock::unlock_instance(std::shared_ptr client) +{ + client->command("eval", UNLOCK_SCRIPT_, 1, resource_name_, lock_value_); +} + +std::chrono::milliseconds Redlock::get_random_delay() +{ + std::uniform_int_distribution<> dist(1, retry_delay_max_); + + return std::chrono::milliseconds{dist(random_number_gen_)}; +} + +std::string Redlock::generate_lock_value() +{ + static const std::string BASE36_LUT{"0123456789abcdefghijklmnopqrstuvwxyz"}; + + std::ifstream file{"/dev/urandom", std::ios_base::binary}; + std::array buffer; + file.read(buffer.data(), 20); + + std::string uid; + for (unsigned char byte: buffer) { + while (byte) { + uid += BASE36_LUT[byte % 36]; + byte /= 36; + } + } + + return uid; +} + +std::string Redlock::UNLOCK_SCRIPT_ = R"( +if redis.call('get', KEYS[1]) == ARGV[1] then + return redis.call('del', KEYS[1]) +else + return 0 +end +)"; + } diff --git a/tests/distlock.cc b/tests/distlock.cc new file mode 100644 index 0000000..c402ff2 --- /dev/null +++ b/tests/distlock.cc @@ -0,0 +1,36 @@ +// +// Copyright 2018 Christoph Heiss +// Distributed under the Boost Software License, Version 1.0. +// +// See accompanying file LICENSE in the project root directory +// or copy at http://www.boost.org/LICENSE_1_0.txt +// + +#include +#include "resply.h" + + +int main() +{ + resply::Redlock rlock1{"resply-test", { + "localhost:6379", "localhost:6380", "localhost:6381", + "localhost:6382", "localhost:6383" + }}; + rlock1.initialize(); + + resply::Redlock rlock2{"resply-test", { + "localhost:6379", "localhost:6380", "localhost:6381", + "localhost:6382", "localhost:6383" + }}; + rlock2.initialize(); + + std::cout << "Locking lock 1 (should succeed) ... "; + size_t status1{rlock1.lock(750)}; + std::cout << (status1 ? "success" : "failed") << std::endl; + + std::cout << "Locking lock 2 (should fail) ... "; + size_t status2{rlock2.lock(500)}; + std::cout << (status2 ? "success" : "failed") << std::endl; + + return status1 && !status2; +}