Implement simple ProtobufRedisAdapter.
Currently, this just prints out the received messages as traces.
This commit is contained in:
parent
6c135fa512
commit
d768a8e4ef
98
src/proxy.cc
98
src/proxy.cc
|
@ -14,7 +14,10 @@
|
|||
#include <cstdlib>
|
||||
#include <csignal>
|
||||
#include <thread>
|
||||
#include <memory>
|
||||
#include <cctype>
|
||||
|
||||
#include "asio.hpp"
|
||||
#include "clipp.h"
|
||||
#include "spdlog/spdlog.h"
|
||||
#include "nlohmann/json.hpp"
|
||||
|
@ -22,6 +25,7 @@
|
|||
#include "rslp.pb.h"
|
||||
|
||||
using json = nlohmann::json;
|
||||
using namespace google;
|
||||
|
||||
const std::string LOGGER_NAME{"proxy-log"};
|
||||
|
||||
|
@ -131,6 +135,11 @@ static void daemonize_process()
|
|||
std::exit(2);
|
||||
}
|
||||
|
||||
// Change working directory
|
||||
//if (::chdir("/") < 0) {
|
||||
//logger->warn("Could not change working directory, current directory might be locked! Reason: {}", ::strerror(errno));
|
||||
//}
|
||||
|
||||
// Change iostreams
|
||||
::freopen("/dev/null", "r", ::stdin);
|
||||
::freopen("/dev/null", "w", ::stdout);
|
||||
|
@ -138,6 +147,67 @@ static void daemonize_process()
|
|||
}
|
||||
|
||||
|
||||
class ProtobufRedisAdapter {
|
||||
public:
|
||||
ProtobufRedisAdapter(const std::string& redis_host, asio::io_service& io_service) :
|
||||
client_{redis_host}, socket_{io_service}, logger_{spdlog::get(LOGGER_NAME)}
|
||||
{ }
|
||||
|
||||
~ProtobufRedisAdapter()
|
||||
{
|
||||
logger_->info("Connection to {} closed.", remote_address_);
|
||||
}
|
||||
|
||||
asio::ip::tcp::socket& socket()
|
||||
{
|
||||
return socket_;
|
||||
}
|
||||
|
||||
void start()
|
||||
{
|
||||
{
|
||||
auto endpoint{socket_.remote_endpoint()};
|
||||
remote_address_ = endpoint.address().to_string() + ':' + std::to_string(endpoint.port());
|
||||
}
|
||||
|
||||
logger_->info("New connection from {}.", remote_address_);
|
||||
|
||||
for (;;) {
|
||||
asio::error_code error_code;
|
||||
|
||||
uint32_t size;
|
||||
auto size_buffer{asio::buffer(&size, 4)};
|
||||
asio::read(socket_, size_buffer, asio::transfer_exactly(4), error_code);
|
||||
if (error_code)
|
||||
break;
|
||||
|
||||
std::string input(size, '\0');
|
||||
auto input_buffer{asio::buffer(&input[0], size)};
|
||||
asio::read(socket_, input_buffer, asio::transfer_exactly(size), error_code);
|
||||
if (error_code)
|
||||
break;
|
||||
|
||||
rslp::Command command;
|
||||
command.ParseFromString(input);
|
||||
|
||||
{
|
||||
std::string str{command.DebugString()};
|
||||
str.erase(std::remove_if(str.begin(), str.end(), ::isspace), str.end());
|
||||
|
||||
logger_->trace("Received {}", str);
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private:
|
||||
resply::Client client_;
|
||||
asio::ip::tcp::socket socket_;
|
||||
std::shared_ptr<spdlog::logger> logger_;
|
||||
std::string remote_address_;
|
||||
};
|
||||
|
||||
|
||||
static void install_signal_handler()
|
||||
{
|
||||
std::thread{[&]() {
|
||||
|
@ -156,8 +226,25 @@ static void install_signal_handler()
|
|||
}
|
||||
|
||||
|
||||
#include <chrono>
|
||||
#include <thread>
|
||||
[[noreturn]]
|
||||
static void listen_for_connections(const Options& options)
|
||||
{
|
||||
asio::io_service io_service;
|
||||
|
||||
asio::ip::tcp::endpoint endpoint{asio::ip::tcp::v4(), options.port};
|
||||
asio::ip::tcp::acceptor acceptor{io_service, endpoint};
|
||||
|
||||
spdlog::get(LOGGER_NAME)->info("Started listening on 0.0.0.0:{}", options.port);
|
||||
|
||||
for (;;) {
|
||||
auto server{std::make_shared<ProtobufRedisAdapter>(options.remote_host, io_service)};
|
||||
acceptor.accept(server->socket());
|
||||
|
||||
std::thread{&ProtobufRedisAdapter::start, server}.detach();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
int main(int argc, char* argv[])
|
||||
{
|
||||
GOOGLE_PROTOBUF_VERIFY_VERSION;
|
||||
|
@ -181,10 +268,5 @@ int main(int argc, char* argv[])
|
|||
}
|
||||
|
||||
install_signal_handler();
|
||||
|
||||
for (;;) {
|
||||
std::this_thread::sleep_for(std::chrono::seconds(1));
|
||||
}
|
||||
|
||||
return 0;
|
||||
listen_for_connections(options);
|
||||
}
|
||||
|
|
Reference in a new issue