Completely rewrite the response parser.

This commit is contained in:
Christoph Heiss 2018-02-22 17:04:19 +01:00
parent 131a42c77a
commit f4e13cb016
4 changed files with 276 additions and 108 deletions

68
include/resp-parser.h Normal file
View file

@ -0,0 +1,68 @@
//
// Copyright 2018 Christoph Heiss <me@christoph-heiss.me>
// Distributed under the Boost Software License, Version 1.0.
//
// See accompanying file LICENSE in the project root directory
// or copy at http://www.boost.org/LICENSE_1_0.txt
//
#pragma once
#include <cstddef>
#include <istream>
#include "resply.h"
/*! \brief A streaming parser for RESP.
*
* This parser is written after the specs at <https://redis.io/topics/protocol>.
*/
class RespParser {
public:
RespParser()
: state_{State::NeedType}, remaining_bytes_{READ_UNTIL_EOL},
remaining_elements_{0}
{ }
/*! \brief Does the actual parsing of the data.
* \param stream Content to parse.
* \return Status if still more data is needed.
*/
bool parse(std::istream& stream);
/*! \brief Returns the (current) result.
* \return The current result.
*
* Should only be called after the parsing is complete, i.e. #parse returned false.
*/
const resply::Result& result() const { return result_; }
private:
/*! \brief Represents the current internal parser state. */
enum class State {
NeedType, /*!< The parser needs a datatype for the result. */
NeedSize, /*!< The parser needs a size for the result. */
NeedData, /*!< The parser needs some (more) data. */
Finished /*!< Parsing is finished (duh.) */
};
/*! \brief The response was either a Simple String or Error - which means reading until CRLF */
const long READ_UNTIL_EOL = -1;
/*! \brief Sets the parser state according to the type for further parsing. */
void parse_type(char type);
void parse_type(char type, resply::Result& result);
/*! \brief Sets #state_ and #remaining_ accordingly for further parsing. */
void parse_size(const std::string& buffer);
/*! \brief Consume data and puts them into the result. */
void parse_data(const std::string& buffer);
resply::Result result_;
State state_;
long remaining_bytes_;
long remaining_elements_;
};

View file

@ -35,6 +35,7 @@ namespace resply {
};
Result() : type{Type::Nil} { }
Result(long long integer) : type{Type::Integer}, integer{integer} { }
/*! \brief Holds the type of the response. */
Type type;

View file

@ -118,122 +118,22 @@ public:
asio::write(socket_, asio::buffer(command), error_code);
check_asio_error(error_code);
Result result;
RespParser parser;
asio::streambuf buffer;
size_t remaining{};
bool cont{false};
do {
asio::read(socket_, buffer, error_code);
buffer.consume(buffer.size());
asio::read_until(socket_, buffer, '\n');
check_asio_error(error_code);
std::istream stream{&buffer};
cont = parser.parse(stream);
} while (cont);
parse_response(result, buffer, remaining);
} while (remaining);
return result;
return parser.result();
}
private:
static void parse_response(Result& result, asio::streambuf& streambuf, size_t& remaining)
{
std::string buffer{asio::buffers_begin(streambuf.data()),
asio::buffers_end(streambuf.data())};
if (!remaining) {
// First pass
parse_type(result, buffer, remaining);
} else {
// All other
continue_parse_type(result, buffer, remaining);
}
streambuf.consume(streambuf.size());
}
static void parse_type(Result& result, const std::string& buffer, size_t& remaining)
{
switch (buffer.front()) {
case '+':
result.type = Result::Type::String;
// Exclude the final \r\n bytes
result.string += buffer.substr(1, buffer.length() - 3);
break;
case '-':
result.type = Result::Type::ProtocolError;
// Exclude the final \r\n bytes
result.string += buffer.substr(1, buffer.length() - 3);
break;
case ':':
result.type = Result::Type::Integer;
result.integer = std::stoll(buffer.substr(1));
break;
case '$': {
size_t size;
long length{std::stol(buffer.substr(1), &size)};
if (length == -1) {
result.type = Result::Type::Nil;
break;
}
result.type = Result::Type::String;
if (static_cast<size_t>(length) < buffer.length()) {
result.string += buffer.substr(size + 3, buffer.length() - size - 3);
} else {
result.string += buffer.substr(size + 3);
remaining = length - result.string.length();
}
break;
}
case '*': {
size_t size;
long length{std::stol(buffer.substr(1), &size)};
if (length == -1) {
result.type = Result::Type::Nil;
break;
}
result.type = Result::Type::Array;
remaining = length;
while (remaining) {
}
break;
}
default:
// Error
break;
}
}
static void continue_parse_type(Result& result, const std::string& buffer, size_t& remaining)
{
switch (result.type) {
case Result::Type::String:
case Result::Type::ProtocolError:
case Result::Type::IOError:
result.string += buffer.substr(0, remaining);
if (remaining == buffer.length() - 2) {
remaining = 0;
} else {
remaining -= buffer.length();
}
default:
break;
}
}
const std::string host_;
const std::string port_;
const size_t timeout_;

199
src/resp-parser.cc Normal file
View file

@ -0,0 +1,199 @@
//
// Copyright 2018 Christoph Heiss <me@christoph-heiss.me>
// Distributed under the Boost Software License, Version 1.0.
//
// See accompanying file LICENSE in the project root directory
// or copy at http://www.boost.org/LICENSE_1_0.txt
//
#include <iostream>
#include <string>
#include <cstdlib>
#include "resp-parser.h"
using resply::Result;
enum RespTypes {
SIMPLE_STRING = '+',
ERROR = '-',
INTEGER = ':',
BULK_STRING = '$',
ARRAY = '*'
};
bool RespParser::parse(std::istream& stream)
{
std::string line;
while (state_ != State::Finished && !stream.eof()) {
switch (state_) {
case State::NeedType:
parse_type(stream.get());
break;
case State::NeedSize:
std::getline(stream, line);
line.pop_back();
parse_size(line);
break;
case State::NeedData: {
std::getline(stream, line);
if (line.back() == '\r') {
line.push_back('\n');
}
parse_data(line);
break;
}
case State::Finished:
break;
}
// Update internal stream state.
// Especially, sets the eofbit appropriatly if we read all data.
// This is needed because when using .read() does not update the
// internal stream state, i.e. does not set eofbit even if all data
// was read.
stream.peek();
}
if (result_.type == Result::Type::Array && !remaining_elements_) {
state_ = State::Finished;
}
return remaining_bytes_ <= 0 || remaining_elements_ ? false : state_ != State::Finished;
}
void RespParser::parse_type(char type)
{
if (result_.type == Result::Type::Array) {
Result result;
parse_type(type, result);
result_.array.push_back(result);
} else {
parse_type(type, result_);
}
}
void RespParser::parse_type(char type, Result& result)
{
switch (type) {
case RespTypes::SIMPLE_STRING:
result.type = Result::Type::String;
state_ = State::NeedData;
break;
case RespTypes::ERROR:
result.type = Result::Type::ProtocolError;
state_ = State::NeedData;
break;
case RespTypes::INTEGER:
result.type = Result::Type::Integer;
state_ = State::NeedData;
break;
case RespTypes::BULK_STRING:
result.type = Result::Type::String;
state_ = State::NeedSize;
break;
case RespTypes::ARRAY:
result.type = Result::Type::Array;
state_ = State::NeedSize;
break;
default:
result.type = Result::Type::ProtocolError;
result.string = "Parsing error.";
state_ = State::Finished;
break;
}
}
void RespParser::parse_size(const std::string& buffer)
{
long size{std::stol(buffer)};
if (size == -1) {
Result& result = result_.type == Result::Type::Array ? result_.array.back() : result_;
result.type = Result::Type::Nil;
state_ = State::Finished;
} else {
if (result_.type == Result::Type::Array && !remaining_elements_) {
remaining_elements_ = size;
state_ = State::NeedType;
} else {
remaining_bytes_ = size;
state_ = State::NeedData;
}
}
}
void RespParser::parse_data(const std::string& buffer)
{
Result::Type type{result_.type == Result::Type::Array ? result_.array.back().type : result_.type};
switch (type) {
case Result::Type::String:
case Result::Type::ProtocolError:
case Result::Type::IOError:
if (result_.type == Result::Type::Array) {
result_.array.back().string += buffer;
} else {
result_.string += buffer;
}
if (remaining_bytes_ == READ_UNTIL_EOL) {
// Response was a simple string
state_ = State::Finished;
result_.string.pop_back();
result_.string.pop_back();
} else {
remaining_bytes_ -= buffer.length();
if (remaining_bytes_ <= 0) {
if (result_.type == Result::Type::Array) {
result_.array.back().string.pop_back();
result_.array.back().string.pop_back();
remaining_elements_--;
} else {
state_ = State::Finished;
result_.string.pop_back();
result_.string.pop_back();
}
}
}
break;
case Result::Type::Integer:
if (result_.type == Result::Type::Array) {
result_.array.emplace_back(std::stoll(buffer));
remaining_elements_--;
} else {
result_.integer = std::stoll(buffer);
state_ = State::Finished;
}
break;
case Result::Type::Array:
default:
break;
}
if (remaining_bytes_ <= 0 && remaining_elements_ > 0) {
state_ = State::NeedType;
}
}