Process received protobuf commands and send responses back.
This commit is contained in:
parent
1933db3442
commit
8ef46d3ce0
94
src/proxy.cc
94
src/proxy.cc
|
@ -172,14 +172,46 @@ public:
|
|||
|
||||
logger_->info("New connection from {}.", remote_address_);
|
||||
|
||||
client_.connect();
|
||||
|
||||
for (;;) {
|
||||
std::string request{receive_data()};
|
||||
if (!request.length()) {
|
||||
break;
|
||||
}
|
||||
|
||||
rslp::Command command;
|
||||
command.ParseFromString(request);
|
||||
|
||||
std::string debug_string{command.ShortDebugString()};
|
||||
logger_->debug("Received '{}' on {}", debug_string, remote_address_);
|
||||
|
||||
std::vector<std::string> resply_command;
|
||||
const auto arguments{command.mutable_data()};
|
||||
|
||||
for (int i{}; i < arguments->size(); i++) {
|
||||
resply_command.push_back(arguments->Get(i).str());
|
||||
}
|
||||
|
||||
resply::Result result{client_.command(resply_command)};
|
||||
|
||||
rslp::Command response;
|
||||
resply_result_to_rslp(response, result);
|
||||
|
||||
send_data(response);
|
||||
}
|
||||
}
|
||||
|
||||
private:
|
||||
std::string receive_data()
|
||||
{
|
||||
asio::error_code error_code;
|
||||
|
||||
uint32_t size;
|
||||
{
|
||||
auto buffer{asio::buffer(&size, 4)};
|
||||
asio::read(socket_, buffer, asio::transfer_exactly(4), error_code);
|
||||
if (error_code) { break; }
|
||||
if (error_code) { return {}; }
|
||||
|
||||
size = ntohl(size);
|
||||
}
|
||||
|
@ -188,23 +220,65 @@ public:
|
|||
{
|
||||
auto buffer{asio::buffer(&data[0], size)};
|
||||
asio::read(socket_, buffer, asio::transfer_exactly(size), error_code);
|
||||
if (error_code) { break; }
|
||||
if (error_code) { return {}; }
|
||||
}
|
||||
|
||||
rslp::Command command;
|
||||
command.ParseFromString(data);
|
||||
return data;
|
||||
}
|
||||
|
||||
void send_data(const rslp::Command& command)
|
||||
{
|
||||
std::string str{command.DebugString()};
|
||||
str.erase(std::remove_if(str.begin(), str.end(), ::isspace), str.end());
|
||||
std::string output;
|
||||
command.SerializeToString(&output);
|
||||
|
||||
logger_->debug("Received '{}' on {}", str, remote_address_);
|
||||
uint32_t size{htonl(static_cast<uint32_t>(output.size()))};
|
||||
asio::write(socket_, asio::buffer(&size, 4));
|
||||
asio::write(socket_, asio::buffer(output.data(), output.size()));
|
||||
}
|
||||
|
||||
static void resply_result_to_rslp(rslp::Command& command, const resply::Result& result) {
|
||||
using Type = resply::Result::Type;
|
||||
|
||||
switch (result.type) {
|
||||
case Type::String: {
|
||||
command.set_type(rslp::Command::String);
|
||||
auto data{command.add_data()};
|
||||
data->set_str(result.string);
|
||||
break;
|
||||
}
|
||||
|
||||
case Type::Integer: {
|
||||
command.set_type(rslp::Command::Integer);
|
||||
auto data{command.add_data()};
|
||||
data->set_int_(result.integer);
|
||||
break;
|
||||
}
|
||||
|
||||
case Type::Array: {
|
||||
command.set_type(rslp::Command::Array);
|
||||
|
||||
for (const auto& element: result.array) {
|
||||
auto data{command.add_data()};
|
||||
resply_result_to_rslp(*data->mutable_subdata(), element);
|
||||
}
|
||||
|
||||
break;
|
||||
}
|
||||
|
||||
case Type::ProtocolError:
|
||||
case Type::IOError: {
|
||||
command.set_type(rslp::Command::Error);
|
||||
auto data{command.add_data()};
|
||||
data->set_str(result.string);
|
||||
break;
|
||||
}
|
||||
|
||||
case Type::Nil:
|
||||
command.set_type(rslp::Command::Nil);
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
private:
|
||||
resply::Client client_;
|
||||
asio::ip::tcp::socket socket_;
|
||||
std::shared_ptr<spdlog::logger> logger_;
|
||||
|
|
Reference in a new issue