Implement gRPC-based server.

Also do a small cleanup regarding the logging facility.
This commit is contained in:
Christoph Heiss 2018-03-06 10:23:10 +01:00
parent 27e39a3585
commit ebb20044d9
2 changed files with 258 additions and 105 deletions

View file

@ -25,3 +25,7 @@ message Command {
} }
service ProtoAdapter {
rpc execute(Command) returns (Command) {}
rpc subscribe(Command) returns (stream Command) {}
}

View file

@ -16,6 +16,7 @@
#include <thread> #include <thread>
#include <memory> #include <memory>
#include <cctype> #include <cctype>
#include <algorithm>
#include <arpa/inet.h> #include <arpa/inet.h>
#include "asio.hpp" #include "asio.hpp"
@ -24,29 +25,40 @@
#include "nlohmann/json.hpp" #include "nlohmann/json.hpp"
#include "resply.h" #include "resply.h"
#include "rslp.pb.h" #include "rslp.pb.h"
#include "grpc++/grpc++.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
#include "rslp.grpc.pb.h"
#pragma GCC diagnostic pop
using json = nlohmann::json; using json = nlohmann::json;
using namespace google; using namespace google;
const std::string LOGGER_NAME{"proxy-log"};
struct Options { struct Options {
Options() : Options() :
config_path{".proxy-conf.json"}, daemonize{}, log_path{"proxy.log"}, config_path{".proxy-conf.json"}, daemonize{}, log_path{"proxy.log"},
port{6543}, remote_host{"localhost:6379"}, verbose{} { } protobuf_port{6543}, grpc_port{"6544"}, remote_host{"localhost:6379"}, verbose{} { }
std::string config_path; std::string config_path;
bool daemonize; bool daemonize;
std::string log_path; std::string log_path;
unsigned short port; unsigned short protobuf_port;
std::string grpc_port;
std::string remote_host; std::string remote_host;
bool verbose; bool verbose;
}; };
static Options parse_commandline(int argc, char** argv) namespace {
const std::string GLOBAL_LOGGER_NAME{"proxy"};
void resply_result_to_rslp_data(rslp::Command_Data* data, const resply::Result& result);
Options parse_commandline(int argc, char** argv)
{ {
Options options; Options options;
bool show_help{}, show_version{}; bool show_help{}, show_version{};
@ -57,8 +69,10 @@ static Options parse_commandline(int argc, char** argv)
clipp::option("-d", "--daemonize").set(options.daemonize).doc("Fork to background."), clipp::option("-d", "--daemonize").set(options.daemonize).doc("Fork to background."),
clipp::option("-l", "--log-path").set(options.log_path) clipp::option("-l", "--log-path").set(options.log_path)
.doc("Path to the log file [default: $CWD/proxy.log] (Only applies when not daemonized.)"), .doc("Path to the log file [default: $CWD/proxy.log] (Only applies when not daemonized.)"),
clipp::option("-p", "--port").set(options.port) clipp::option("--protobuf-port").set(options.protobuf_port)
.doc("Port to listen on [default: 6543]"), .doc("Port the protobuf server should listen on [default: 6543]"),
clipp::option("--grpc-port").set(options.grpc_port)
.doc("Port the gRPC server should listen on [default: 6544]"),
clipp::option("-r", "--remote-host").set(options.remote_host) clipp::option("-r", "--remote-host").set(options.remote_host)
.doc("Host (redis-server) to connect to [default: localhost:6379]"), .doc("Host (redis-server) to connect to [default: localhost:6379]"),
clipp::option("-v", "--verbose").set(options.verbose).doc("Enable verbose logging."), clipp::option("-v", "--verbose").set(options.verbose).doc("Enable verbose logging."),
@ -83,17 +97,16 @@ static Options parse_commandline(int argc, char** argv)
return options; return options;
} }
void cleanup()
static void cleanup()
{ {
spdlog::get(LOGGER_NAME)->info("Shutting down."); spdlog::get(GLOBAL_LOGGER_NAME)->info("Shutting down.");
spdlog::drop_all(); spdlog::drop_all();
google::protobuf::ShutdownProtobufLibrary(); google::protobuf::ShutdownProtobufLibrary();
} }
static void install_signal_handler() void install_signal_handler()
{ {
std::thread{[&]() { std::thread{[&]() {
::sigset_t sigset; ::sigset_t sigset;
@ -110,10 +123,9 @@ static void install_signal_handler()
}}.detach(); }}.detach();
} }
void daemonize_process()
static void daemonize_process()
{ {
auto logger{spdlog::get(LOGGER_NAME)}; auto logger{spdlog::get(GLOBAL_LOGGER_NAME)};
pid_t pid{::fork()}; pid_t pid{::fork()};
@ -146,16 +158,75 @@ static void daemonize_process()
::freopen("/dev/null", "w", ::stderr); ::freopen("/dev/null", "w", ::stderr);
} }
void resply_result_to_rslp(rslp::Command& command, const resply::Result& result)
{
using Type = resply::Result::Type;
switch (result.type) {
case Type::ProtocolError:
case Type::IOError:
command.add_data()->set_err(result.string);
break;
case Type::String:
command.add_data()->set_str(result.string);
break;
case Type::Integer:
command.add_data()->set_int_(result.integer);
break;
case Type::Array:
for (const auto& element: result.array) {
resply_result_to_rslp_data(command.add_data(), element);
}
break;
case Type::Nil:
break;
}
}
void resply_result_to_rslp_data(rslp::Command_Data* data, const resply::Result& result)
{
using Type = resply::Result::Type;
switch (result.type) {
case Type::String:
data->set_str(result.string);
break;
case Type::Integer:
data->set_int_(result.integer);
break;
case Type::Array:
for (const auto& element: result.array) {
resply_result_to_rslp(*data->mutable_array(), element);
}
break;
default:
/* Cannot happen */
break;
}
}
}
class ProtobufAdapter { class ProtobufAdapter {
public: public:
ProtobufAdapter(const std::string& redis_host, asio::io_context& io_context) : ProtobufAdapter(const std::string& redis_host, asio::io_context& io_context,
client_{redis_host}, socket_{io_context}, logger_{spdlog::get(LOGGER_NAME)} std::shared_ptr<spdlog::sinks::sink> logsink) :
client_{redis_host}, socket_{io_context},
logger_{std::make_shared<spdlog::logger>(LOGGER_NAME, logsink)}
{ } { }
~ProtobufAdapter() ~ProtobufAdapter()
{ {
logger_->info("Connection to {} closed.", remote_address_); logger_->info("[{}] Connection closed.", remote_address_);
} }
asio::ip::tcp::socket& socket() asio::ip::tcp::socket& socket()
@ -183,8 +254,7 @@ public:
rslp::Command command; rslp::Command command;
command.ParseFromString(request); command.ParseFromString(request);
std::string debug_string{command.ShortDebugString()}; logger_->debug("[{}] Received message '{}'", remote_address_, command.ShortDebugString());
logger_->debug("Received protobuf message '{}' from {}", debug_string, remote_address_);
std::vector<std::string> resply_command; std::vector<std::string> resply_command;
for (const auto& arg: command.data()) { for (const auto& arg: command.data()) {
@ -252,77 +322,162 @@ private:
asio::write(socket_, asio::buffer(output.data(), output.size())); asio::write(socket_, asio::buffer(output.data(), output.size()));
} }
static void resply_result_to_rslp(rslp::Command& command, const resply::Result& result)
{
using Type = resply::Result::Type;
switch (result.type) {
case Type::ProtocolError:
case Type::IOError:
command.add_data()->set_err(result.string);
break;
case Type::String:
command.add_data()->set_str(result.string);
break;
case Type::Integer:
command.add_data()->set_int_(result.integer);
break;
case Type::Array:
for (const auto& element: result.array) {
resply_result_to_rslp_data(command.add_data(), element);
}
break;
case Type::Nil:
break;
}
}
static void resply_result_to_rslp_data(rslp::Command_Data* data, const resply::Result& result)
{
using Type = resply::Result::Type;
switch (result.type) {
case Type::String:
data->set_str(result.string);
break;
case Type::Integer:
data->set_int_(result.integer);
break;
case Type::Array:
for (const auto& element: result.array) {
resply_result_to_rslp(*data->mutable_array(), element);
}
break;
default:
/* Cannot happen */
break;
}
}
resply::Client client_; resply::Client client_;
asio::ip::tcp::socket socket_; asio::ip::tcp::socket socket_;
std::shared_ptr<spdlog::logger> logger_; std::shared_ptr<spdlog::logger> logger_;
std::string remote_address_; std::string remote_address_;
const std::string LOGGER_NAME{"ProtobufAdapter"};
};
class ProtobufServer {
public:
void start(const Options& options, std::shared_ptr<spdlog::sinks::sink> logsink)
{
auto logger{std::make_shared<spdlog::logger>(LOGGER_NAME, logsink)};
asio::io_context io_context;
asio::ip::tcp::acceptor acceptor{io_context};
try {
acceptor = {io_context, {asio::ip::tcp::v4(), options.protobuf_port}};
} catch (const asio::system_error& ex) {
logger->error("Could not start protobuf server on 0.0.0.0:{}, exiting! ({})",
options.protobuf_port, ex.what());
return;
}
logger->info("Started protobuf server on 0.0.0.0:{}", options.protobuf_port);
for (;;) {
auto server{std::make_shared<ProtobufAdapter>(
options.remote_host, io_context, logsink
)};
acceptor.accept(server->socket());
std::thread{&ProtobufAdapter::start, server}.detach();
}
}
private:
const std::string LOGGER_NAME{"ProtobufServer"};
};
class GrpcAdapter final : public rslp::ProtoAdapter::Service {
public:
GrpcAdapter(const std::string& redis_host, std::shared_ptr<spdlog::sinks::sink> logsink) :
client_{redis_host}, logger_{std::make_shared<spdlog::logger>(LOGGER_NAME, logsink)}
{ }
void initialize()
{
client_.connect();
}
grpc::Status execute(grpc::ServerContext* context, const rslp::Command* request,
rslp::Command* response) override
{
std::vector<std::string> command;
for (const auto& arg: request->data()) {
command.push_back(arg.str());
}
if (command.size()) {
std::string name{command.front()};
std::transform(name.begin(), name.end(), name.begin(), ::tolower);
if (name == "subscribe" || name == "psubscribe") {
logger_->warn("[{}] Received subscription command in execute() rpc, ignoring!",
context->peer());
return grpc::Status{
grpc::StatusCode::INVALID_ARGUMENT,
"SUBSCRIBE/PSUBSCRIBE can only be used with rpc subscribe()!"
};
}
}
logger_->debug("[{}] execute(): {}", context->peer(), request->ShortDebugString());
resply::Result result{client_.command(command)};
resply_result_to_rslp(*response, result);
return grpc::Status::OK;
}
grpc::Status subscribe(grpc::ServerContext* context, const rslp::Command* request,
grpc::ServerWriter<rslp::Command>* writer) override
{
std::vector<std::string> command;
for (const auto& arg: request->data()) {
command.push_back(arg.str());
}
if (command.size()) {
std::string name{command.front()};
std::transform(name.begin(), name.end(), name.begin(), ::tolower);
if (name != "subscribe" && name != "psubscribe") {
logger_->warn("Received non-subscription command in subscribe() rpc, ignoring!");
return grpc::Status{
grpc::StatusCode::INVALID_ARGUMENT,
"subscribe() rpc can only be used with SUBSCRIBE/PSUBSCRIBE!"
};
}
}
logger_->debug("[{}] subscribe(): {}", context->peer(), request->ShortDebugString());
resply::Result result{client_.command(command)};
client_.listen_for_messages([writer](const auto& channel, const auto& message) {
rslp::Command response;
response.add_data()->set_str("message");
response.add_data()->set_str(channel);
response.add_data()->set_str(message);
writer->Write(response);
});
return grpc::Status::OK;
}
private:
resply::Client client_;
std::shared_ptr<spdlog::logger> logger_;
const std::string LOGGER_NAME{"GrpcAdapter"};
};
class GrpcServer {
public:
void start(const Options& options, std::shared_ptr<spdlog::sinks::sink> logsink)
{
grpc::ServerBuilder builder;
builder.AddListeningPort("0.0.0.0:" + options.grpc_port, grpc::InsecureServerCredentials());
GrpcAdapter adapter{options.remote_host, logsink};
adapter.initialize();
builder.RegisterService(&adapter);
auto server{builder.BuildAndStart()};
server->Wait();
}
}; };
class Proxy { class Proxy {
public: public:
Proxy(const Options& options) : Proxy(const Options& options) : options_{options} { }
options_{options}, logger_{spdlog::stdout_color_mt(LOGGER_NAME)}
{ }
[[noreturn]]
void run() void run()
{ {
std::shared_ptr<spdlog::sinks::sink> logsink{
std::make_shared<spdlog::sinks::ansicolor_stdout_sink_mt>()
};
logger_.reset(new spdlog::logger(LOGGER_NAME, logsink));
setup_logger(); setup_logger();
read_config_file(); read_config_file();
@ -333,39 +488,29 @@ public:
// Drop the console logger and create a rotating file logger. // Drop the console logger and create a rotating file logger.
spdlog::drop(LOGGER_NAME); spdlog::drop(LOGGER_NAME);
logger_ = spdlog::rotating_logger_mt(LOGGER_NAME, options_.log_path, logsink.reset(new spdlog::sinks::rotating_file_sink_mt(
1048576 * 10, 10); options_.log_path, 1048576 * 10, 10)
);
logger_.reset(new spdlog::logger(LOGGER_NAME, logsink));
setup_logger(); setup_logger();
} }
listen_for_connections(); std::thread protobuf_server{[this, logsink]() {
ProtobufServer server;
server.start(options_, logsink);
}};
std::thread grpc_server{[this, logsink]() {
GrpcServer server;
server.start(options_, logsink);
}};
protobuf_server.join();
grpc_server.join();
} }
private: private:
[[noreturn]]
void listen_for_connections()
{
asio::io_context io_context;
asio::ip::tcp::acceptor acceptor{io_context};
try {
acceptor = {io_context, {asio::ip::tcp::v4(), options_.port}};
} catch (const asio::system_error& ex) {
logger_->error("Could not listen on 0.0.0.0:{}, exiting! ({})",
options_.port, ex.what());
std::exit(3);
}
logger_->info("Started listening on 0.0.0.0:{}", options_.port);
for (;;) {
auto server{std::make_shared<ProtobufAdapter>(options_.remote_host, io_context)};
acceptor.accept(server->socket());
std::thread{&ProtobufAdapter::start, server}.detach();
}
}
void read_config_file() void read_config_file()
{ {
if (!options_.config_path.length()) { if (!options_.config_path.length()) {
@ -396,6 +541,8 @@ private:
const Options& options_; const Options& options_;
std::shared_ptr<spdlog::logger> logger_; std::shared_ptr<spdlog::logger> logger_;
json config_; json config_;
const std::string LOGGER_NAME{GLOBAL_LOGGER_NAME};
}; };
@ -407,4 +554,6 @@ int main(int argc, char* argv[])
install_signal_handler(); install_signal_handler();
Proxy{options}.run(); Proxy{options}.run();
return 0;
} }