Add synchronous gRPC proxy server.

This commit is contained in:
Christoph Heiss 2018-03-06 14:59:46 +01:00
parent ab685a66e7
commit 67050147f2

View file

@ -17,6 +17,7 @@
#include <memory>
#include <cctype>
#include <algorithm>
#include <functional>
#include <arpa/inet.h>
#include "asio.hpp"
@ -26,6 +27,7 @@
#include "resply.h"
#include "rslp.pb.h"
#include "grpc++/grpc++.h"
#include "grpc/support/log.h"
#pragma GCC diagnostic push
#pragma GCC diagnostic ignored "-Wunused-parameter"
@ -53,7 +55,7 @@ struct Options {
namespace {
const std::string GLOBAL_LOGGER_NAME{"proxy"};
const std::string GLOBAL_LOGGER_NAME{"Proxy"};
void resply_result_to_rslp_data(rslp::Command_Data* data, const resply::Result& result);
@ -323,12 +325,12 @@ private:
}
const std::string LOGGER_NAME{"ProtobufAdapter"};
resply::Client client_;
asio::ip::tcp::socket socket_;
std::shared_ptr<spdlog::logger> logger_;
std::string remote_address_;
const std::string LOGGER_NAME{"ProtobufAdapter"};
std::shared_ptr<spdlog::logger> logger_;
};
class ProtobufServer {
@ -352,7 +354,7 @@ public:
for (;;) {
auto server{std::make_shared<ProtobufAdapter>(
options.remote_host, io_context, logsink
options.remote_host, io_context, logsink
)};
acceptor.accept(server->socket());
@ -383,6 +385,7 @@ public:
for (const auto& arg: request->data()) {
command.push_back(arg.str());
}
logger_->debug("[{}] execute(): {}", context->peer(), request->ShortDebugString());
if (command.size()) {
std::string name{command.front()};
@ -431,6 +434,10 @@ public:
resply::Result result{client_.command(command)};
rslp::Command response;
resply_result_to_rslp(response, result);
writer->Write(response);
client_.listen_for_messages([writer](const auto& channel, const auto& message) {
rslp::Command response;
response.add_data()->set_str("message");
@ -444,16 +451,19 @@ public:
}
private:
const std::string LOGGER_NAME{"GrpcAdapter"};
resply::Client client_;
std::shared_ptr<spdlog::logger> logger_;
const std::string LOGGER_NAME{"GrpcAdapter"};
};
class GrpcServer {
public:
void start(const Options& options, std::shared_ptr<spdlog::sinks::sink> logsink)
{
logger_.reset(new spdlog::logger(LOGGER_NAME, logsink));
setup_grpc_logging();
grpc::ServerBuilder builder;
builder.AddListeningPort("0.0.0.0:" + options.grpc_port, grpc::InsecureServerCredentials());
@ -461,9 +471,43 @@ public:
adapter.initialize();
builder.RegisterService(&adapter);
auto server{builder.BuildAndStart()};
server->Wait();
server_ = builder.BuildAndStart();
server_->Wait();
}
void shutdown()
{
server_->Shutdown();
}
private:
void setup_grpc_logging()
{
gpr_set_log_function([](gpr_log_func_args *args) {
// We cannot use LOGGER_NAME here as we cannot capture
// any variables.
auto logger{spdlog::get("GrpcServer")};
switch (args->severity) {
case GPR_LOG_SEVERITY_DEBUG:
logger->debug("{}", args->message);
break;
case GPR_LOG_SEVERITY_INFO:
logger->info("{}", args->message);
break;
case GPR_LOG_SEVERITY_ERROR:
logger->error("{}", args->message);
break;
}
});
}
const std::string LOGGER_NAME{"GrpcServer"};
std::shared_ptr<spdlog::logger> logger_;
std::unique_ptr<grpc::Server> server_;
};
@ -477,6 +521,7 @@ public:
std::make_shared<spdlog::sinks::ansicolor_stdout_sink_mt>()
};
logger_.reset(new spdlog::logger(LOGGER_NAME, logsink));
spdlog::register_logger(logger_);
setup_logger();
read_config_file();
@ -496,18 +541,14 @@ public:
setup_logger();
}
std::thread protobuf_server{[this, logsink]() {
ProtobufServer server;
server.start(options_, logsink);
}};
std::thread grpc_server{[this, logsink]() {
GrpcServer server;
server.start(options_, logsink);
}};
ProtobufServer protobuf_server;
std::thread{&ProtobufServer::start, &protobuf_server, std::cref(options_), logsink}.detach();
protobuf_server.join();
grpc_server.join();
GrpcServer grpc_server;
std::thread{&GrpcServer::start, &grpc_server, std::cref(options_), logsink}.detach();
for (;;);
}
private: