diff --git a/src/proxy.cc b/src/proxy.cc index ee2d29e..d920c3f 100644 --- a/src/proxy.cc +++ b/src/proxy.cc @@ -17,6 +17,7 @@ #include #include #include +#include #include #include "asio.hpp" @@ -26,6 +27,7 @@ #include "resply.h" #include "rslp.pb.h" #include "grpc++/grpc++.h" +#include "grpc/support/log.h" #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wunused-parameter" @@ -53,7 +55,7 @@ struct Options { namespace { -const std::string GLOBAL_LOGGER_NAME{"proxy"}; +const std::string GLOBAL_LOGGER_NAME{"Proxy"}; void resply_result_to_rslp_data(rslp::Command_Data* data, const resply::Result& result); @@ -323,12 +325,12 @@ private: } + const std::string LOGGER_NAME{"ProtobufAdapter"}; + resply::Client client_; asio::ip::tcp::socket socket_; - std::shared_ptr logger_; std::string remote_address_; - - const std::string LOGGER_NAME{"ProtobufAdapter"}; + std::shared_ptr logger_; }; class ProtobufServer { @@ -352,7 +354,7 @@ public: for (;;) { auto server{std::make_shared( - options.remote_host, io_context, logsink + options.remote_host, io_context, logsink )}; acceptor.accept(server->socket()); @@ -383,6 +385,7 @@ public: for (const auto& arg: request->data()) { command.push_back(arg.str()); } + logger_->debug("[{}] execute(): {}", context->peer(), request->ShortDebugString()); if (command.size()) { std::string name{command.front()}; @@ -431,6 +434,10 @@ public: resply::Result result{client_.command(command)}; + rslp::Command response; + resply_result_to_rslp(response, result); + writer->Write(response); + client_.listen_for_messages([writer](const auto& channel, const auto& message) { rslp::Command response; response.add_data()->set_str("message"); @@ -444,16 +451,19 @@ public: } private: + const std::string LOGGER_NAME{"GrpcAdapter"}; + resply::Client client_; std::shared_ptr logger_; - - const std::string LOGGER_NAME{"GrpcAdapter"}; }; class GrpcServer { public: void start(const Options& options, std::shared_ptr logsink) { + logger_.reset(new spdlog::logger(LOGGER_NAME, logsink)); + setup_grpc_logging(); + grpc::ServerBuilder builder; builder.AddListeningPort("0.0.0.0:" + options.grpc_port, grpc::InsecureServerCredentials()); @@ -461,9 +471,43 @@ public: adapter.initialize(); builder.RegisterService(&adapter); - auto server{builder.BuildAndStart()}; - server->Wait(); + server_ = builder.BuildAndStart(); + server_->Wait(); } + + void shutdown() + { + server_->Shutdown(); + } + +private: + void setup_grpc_logging() + { + gpr_set_log_function([](gpr_log_func_args *args) { + // We cannot use LOGGER_NAME here as we cannot capture + // any variables. + auto logger{spdlog::get("GrpcServer")}; + + switch (args->severity) { + case GPR_LOG_SEVERITY_DEBUG: + logger->debug("{}", args->message); + break; + + case GPR_LOG_SEVERITY_INFO: + logger->info("{}", args->message); + break; + + case GPR_LOG_SEVERITY_ERROR: + logger->error("{}", args->message); + break; + } + }); + } + + const std::string LOGGER_NAME{"GrpcServer"}; + + std::shared_ptr logger_; + std::unique_ptr server_; }; @@ -477,6 +521,7 @@ public: std::make_shared() }; logger_.reset(new spdlog::logger(LOGGER_NAME, logsink)); + spdlog::register_logger(logger_); setup_logger(); read_config_file(); @@ -496,18 +541,14 @@ public: setup_logger(); } - std::thread protobuf_server{[this, logsink]() { - ProtobufServer server; - server.start(options_, logsink); - }}; - std::thread grpc_server{[this, logsink]() { - GrpcServer server; - server.start(options_, logsink); - }}; + ProtobufServer protobuf_server; + std::thread{&ProtobufServer::start, &protobuf_server, std::cref(options_), logsink}.detach(); - protobuf_server.join(); - grpc_server.join(); + GrpcServer grpc_server; + std::thread{&GrpcServer::start, &grpc_server, std::cref(options_), logsink}.detach(); + + for (;;); } private: