diff --git a/protos/rslp.proto b/protos/rslp.proto index 0cc00e2..7db82d5 100644 --- a/protos/rslp.proto +++ b/protos/rslp.proto @@ -25,3 +25,7 @@ message Command { } +service ProtoAdapter { + rpc execute(Command) returns (Command) {} + rpc subscribe(Command) returns (stream Command) {} +} diff --git a/src/proxy.cc b/src/proxy.cc index 1a22da2..ee2d29e 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -16,6 +16,7 @@ #include #include #include +#include #include #include "asio.hpp" @@ -24,29 +25,40 @@ #include "nlohmann/json.hpp" #include "resply.h" #include "rslp.pb.h" +#include "grpc++/grpc++.h" + +#pragma GCC diagnostic push +#pragma GCC diagnostic ignored "-Wunused-parameter" +#include "rslp.grpc.pb.h" +#pragma GCC diagnostic pop using json = nlohmann::json; using namespace google; -const std::string LOGGER_NAME{"proxy-log"}; - - struct Options { Options() : config_path{".proxy-conf.json"}, daemonize{}, log_path{"proxy.log"}, - port{6543}, remote_host{"localhost:6379"}, verbose{} { } + protobuf_port{6543}, grpc_port{"6544"}, remote_host{"localhost:6379"}, verbose{} { } std::string config_path; bool daemonize; std::string log_path; - unsigned short port; + unsigned short protobuf_port; + std::string grpc_port; std::string remote_host; bool verbose; }; -static Options parse_commandline(int argc, char** argv) +namespace { + +const std::string GLOBAL_LOGGER_NAME{"proxy"}; + +void resply_result_to_rslp_data(rslp::Command_Data* data, const resply::Result& result); + + +Options parse_commandline(int argc, char** argv) { Options options; bool show_help{}, show_version{}; @@ -57,8 +69,10 @@ static Options parse_commandline(int argc, char** argv) clipp::option("-d", "--daemonize").set(options.daemonize).doc("Fork to background."), clipp::option("-l", "--log-path").set(options.log_path) .doc("Path to the log file [default: $CWD/proxy.log] (Only applies when not daemonized.)"), - clipp::option("-p", "--port").set(options.port) - .doc("Port to listen on [default: 6543]"), + clipp::option("--protobuf-port").set(options.protobuf_port) + .doc("Port the protobuf server should listen on [default: 6543]"), + clipp::option("--grpc-port").set(options.grpc_port) + .doc("Port the gRPC server should listen on [default: 6544]"), clipp::option("-r", "--remote-host").set(options.remote_host) .doc("Host (redis-server) to connect to [default: localhost:6379]"), clipp::option("-v", "--verbose").set(options.verbose).doc("Enable verbose logging."), @@ -83,17 +97,16 @@ static Options parse_commandline(int argc, char** argv) return options; } - -static void cleanup() +void cleanup() { - spdlog::get(LOGGER_NAME)->info("Shutting down."); + spdlog::get(GLOBAL_LOGGER_NAME)->info("Shutting down."); spdlog::drop_all(); google::protobuf::ShutdownProtobufLibrary(); } -static void install_signal_handler() +void install_signal_handler() { std::thread{[&]() { ::sigset_t sigset; @@ -110,10 +123,9 @@ static void install_signal_handler() }}.detach(); } - -static void daemonize_process() +void daemonize_process() { - auto logger{spdlog::get(LOGGER_NAME)}; + auto logger{spdlog::get(GLOBAL_LOGGER_NAME)}; pid_t pid{::fork()}; @@ -146,16 +158,75 @@ static void daemonize_process() ::freopen("/dev/null", "w", ::stderr); } +void resply_result_to_rslp(rslp::Command& command, const resply::Result& result) +{ + using Type = resply::Result::Type; + + switch (result.type) { + case Type::ProtocolError: + case Type::IOError: + command.add_data()->set_err(result.string); + break; + + case Type::String: + command.add_data()->set_str(result.string); + break; + + case Type::Integer: + command.add_data()->set_int_(result.integer); + break; + + case Type::Array: + for (const auto& element: result.array) { + resply_result_to_rslp_data(command.add_data(), element); + } + + break; + + case Type::Nil: + break; + } +} + +void resply_result_to_rslp_data(rslp::Command_Data* data, const resply::Result& result) +{ + using Type = resply::Result::Type; + + switch (result.type) { + case Type::String: + data->set_str(result.string); + break; + + case Type::Integer: + data->set_int_(result.integer); + break; + + case Type::Array: + for (const auto& element: result.array) { + resply_result_to_rslp(*data->mutable_array(), element); + } + break; + + default: + /* Cannot happen */ + break; + } +} + +} + class ProtobufAdapter { public: - ProtobufAdapter(const std::string& redis_host, asio::io_context& io_context) : - client_{redis_host}, socket_{io_context}, logger_{spdlog::get(LOGGER_NAME)} + ProtobufAdapter(const std::string& redis_host, asio::io_context& io_context, + std::shared_ptr logsink) : + client_{redis_host}, socket_{io_context}, + logger_{std::make_shared(LOGGER_NAME, logsink)} { } ~ProtobufAdapter() { - logger_->info("Connection to {} closed.", remote_address_); + logger_->info("[{}] Connection closed.", remote_address_); } asio::ip::tcp::socket& socket() @@ -183,8 +254,7 @@ public: rslp::Command command; command.ParseFromString(request); - std::string debug_string{command.ShortDebugString()}; - logger_->debug("Received protobuf message '{}' from {}", debug_string, remote_address_); + logger_->debug("[{}] Received message '{}'", remote_address_, command.ShortDebugString()); std::vector resply_command; for (const auto& arg: command.data()) { @@ -252,77 +322,162 @@ private: asio::write(socket_, asio::buffer(output.data(), output.size())); } - static void resply_result_to_rslp(rslp::Command& command, const resply::Result& result) - { - using Type = resply::Result::Type; - - switch (result.type) { - case Type::ProtocolError: - case Type::IOError: - command.add_data()->set_err(result.string); - break; - - case Type::String: - command.add_data()->set_str(result.string); - break; - - case Type::Integer: - command.add_data()->set_int_(result.integer); - break; - - case Type::Array: - for (const auto& element: result.array) { - resply_result_to_rslp_data(command.add_data(), element); - } - - break; - - case Type::Nil: - break; - } - } - - static void resply_result_to_rslp_data(rslp::Command_Data* data, const resply::Result& result) - { - using Type = resply::Result::Type; - - switch (result.type) { - case Type::String: - data->set_str(result.string); - break; - - case Type::Integer: - data->set_int_(result.integer); - break; - - case Type::Array: - for (const auto& element: result.array) { - resply_result_to_rslp(*data->mutable_array(), element); - } - break; - - default: - /* Cannot happen */ - break; - } - } resply::Client client_; asio::ip::tcp::socket socket_; std::shared_ptr logger_; std::string remote_address_; + + const std::string LOGGER_NAME{"ProtobufAdapter"}; +}; + +class ProtobufServer { +public: + void start(const Options& options, std::shared_ptr logsink) + { + auto logger{std::make_shared(LOGGER_NAME, logsink)}; + + asio::io_context io_context; + asio::ip::tcp::acceptor acceptor{io_context}; + + try { + acceptor = {io_context, {asio::ip::tcp::v4(), options.protobuf_port}}; + } catch (const asio::system_error& ex) { + logger->error("Could not start protobuf server on 0.0.0.0:{}, exiting! ({})", + options.protobuf_port, ex.what()); + return; + } + + logger->info("Started protobuf server on 0.0.0.0:{}", options.protobuf_port); + + for (;;) { + auto server{std::make_shared( + options.remote_host, io_context, logsink + )}; + acceptor.accept(server->socket()); + + std::thread{&ProtobufAdapter::start, server}.detach(); + } + } + +private: + const std::string LOGGER_NAME{"ProtobufServer"}; +}; + + +class GrpcAdapter final : public rslp::ProtoAdapter::Service { +public: + GrpcAdapter(const std::string& redis_host, std::shared_ptr logsink) : + client_{redis_host}, logger_{std::make_shared(LOGGER_NAME, logsink)} + { } + + void initialize() + { + client_.connect(); + } + + grpc::Status execute(grpc::ServerContext* context, const rslp::Command* request, + rslp::Command* response) override + { + std::vector command; + for (const auto& arg: request->data()) { + command.push_back(arg.str()); + } + + if (command.size()) { + std::string name{command.front()}; + std::transform(name.begin(), name.end(), name.begin(), ::tolower); + + if (name == "subscribe" || name == "psubscribe") { + logger_->warn("[{}] Received subscription command in execute() rpc, ignoring!", + context->peer()); + return grpc::Status{ + grpc::StatusCode::INVALID_ARGUMENT, + "SUBSCRIBE/PSUBSCRIBE can only be used with rpc subscribe()!" + }; + } + } + + logger_->debug("[{}] execute(): {}", context->peer(), request->ShortDebugString()); + + resply::Result result{client_.command(command)}; + resply_result_to_rslp(*response, result); + + return grpc::Status::OK; + } + + grpc::Status subscribe(grpc::ServerContext* context, const rslp::Command* request, + grpc::ServerWriter* writer) override + { + std::vector command; + for (const auto& arg: request->data()) { + command.push_back(arg.str()); + } + + if (command.size()) { + std::string name{command.front()}; + std::transform(name.begin(), name.end(), name.begin(), ::tolower); + + if (name != "subscribe" && name != "psubscribe") { + logger_->warn("Received non-subscription command in subscribe() rpc, ignoring!"); + return grpc::Status{ + grpc::StatusCode::INVALID_ARGUMENT, + "subscribe() rpc can only be used with SUBSCRIBE/PSUBSCRIBE!" + }; + } + } + + logger_->debug("[{}] subscribe(): {}", context->peer(), request->ShortDebugString()); + + resply::Result result{client_.command(command)}; + + client_.listen_for_messages([writer](const auto& channel, const auto& message) { + rslp::Command response; + response.add_data()->set_str("message"); + response.add_data()->set_str(channel); + response.add_data()->set_str(message); + + writer->Write(response); + }); + + return grpc::Status::OK; + } + +private: + resply::Client client_; + std::shared_ptr logger_; + + const std::string LOGGER_NAME{"GrpcAdapter"}; +}; + +class GrpcServer { +public: + void start(const Options& options, std::shared_ptr logsink) + { + grpc::ServerBuilder builder; + builder.AddListeningPort("0.0.0.0:" + options.grpc_port, grpc::InsecureServerCredentials()); + + GrpcAdapter adapter{options.remote_host, logsink}; + adapter.initialize(); + builder.RegisterService(&adapter); + + auto server{builder.BuildAndStart()}; + server->Wait(); + } }; class Proxy { public: - Proxy(const Options& options) : - options_{options}, logger_{spdlog::stdout_color_mt(LOGGER_NAME)} - { } + Proxy(const Options& options) : options_{options} { } - [[noreturn]] void run() { + std::shared_ptr logsink{ + std::make_shared() + }; + logger_.reset(new spdlog::logger(LOGGER_NAME, logsink)); + setup_logger(); read_config_file(); @@ -333,39 +488,29 @@ public: // Drop the console logger and create a rotating file logger. spdlog::drop(LOGGER_NAME); - logger_ = spdlog::rotating_logger_mt(LOGGER_NAME, options_.log_path, - 1048576 * 10, 10); + logsink.reset(new spdlog::sinks::rotating_file_sink_mt( + options_.log_path, 1048576 * 10, 10) + ); + logger_.reset(new spdlog::logger(LOGGER_NAME, logsink)); + setup_logger(); } - listen_for_connections(); + std::thread protobuf_server{[this, logsink]() { + ProtobufServer server; + server.start(options_, logsink); + }}; + + std::thread grpc_server{[this, logsink]() { + GrpcServer server; + server.start(options_, logsink); + }}; + + protobuf_server.join(); + grpc_server.join(); } private: - [[noreturn]] - void listen_for_connections() - { - asio::io_context io_context; - asio::ip::tcp::acceptor acceptor{io_context}; - - try { - acceptor = {io_context, {asio::ip::tcp::v4(), options_.port}}; - } catch (const asio::system_error& ex) { - logger_->error("Could not listen on 0.0.0.0:{}, exiting! ({})", - options_.port, ex.what()); - std::exit(3); - } - - logger_->info("Started listening on 0.0.0.0:{}", options_.port); - - for (;;) { - auto server{std::make_shared(options_.remote_host, io_context)}; - acceptor.accept(server->socket()); - - std::thread{&ProtobufAdapter::start, server}.detach(); - } - } - void read_config_file() { if (!options_.config_path.length()) { @@ -396,6 +541,8 @@ private: const Options& options_; std::shared_ptr logger_; json config_; + + const std::string LOGGER_NAME{GLOBAL_LOGGER_NAME}; }; @@ -407,4 +554,6 @@ int main(int argc, char* argv[]) install_signal_handler(); Proxy{options}.run(); + + return 0; }