diff --git a/include/resp-parser.h b/include/resp-parser.h
new file mode 100644
index 0000000..8a1c327
--- /dev/null
+++ b/include/resp-parser.h
@@ -0,0 +1,89 @@
+//
+// Copyright 2018 Christoph Heiss
+// Distributed under the Boost Software License, Version 1.0.
+//
+// See accompanying file LICENSE in the project root directory
+// or copy at http://www.boost.org/LICENSE_1_0.txt
+//
+
+#pragma once
+
+#include <istream>
+#include <string>
+#include "resply.h"
+
+
+/*! \brief A streaming parser for RESP.
+ *
+ * This parser is written after the specs at <https://redis.io/topics/protocol>.
+ *
+ * A response may be fed in over several #parse calls; incomplete lines and
+ * bulk strings are carried over to the next call. Arrays may hold strings,
+ * errors, integers and nil values. Arrays inside of arrays are not supported
+ * and are reported as a protocol error.
+ */
+class RespParser {
+public:
+    RespParser()
+        : state_{State::NeedType}, remaining_bytes_{READ_UNTIL_EOL},
+          remaining_elements_{0}
+    { }
+
+    /*! \brief Does the actual parsing of the data.
+     *  \param stream Content to parse.
+     *  \return \c true if still more data is needed, \c false once the result is complete.
+     */
+    bool parse(std::istream& stream);
+
+    /*! \brief Returns the (current) result.
+     *  \return The current result.
+     *
+     * Should only be called after the parsing is complete, i.e. #parse returned false.
+     */
+    const resply::Result& result() const { return result_; }
+
+private:
+    /*! \brief Represents the current internal parser state. */
+    enum class State {
+        NeedType, /*!< The parser needs a datatype for the result. */
+        NeedSize, /*!< The parser needs a size for the result. */
+        NeedData, /*!< The parser needs some (more) data. */
+        Finished /*!< Parsing is finished (duh.) */
+    };
+
+    /*! \brief Value of #remaining_bytes_ while the data is a single line
+     *         (Simple String, Error or Integer) - which means reading until CRLF.
+     */
+    static constexpr long READ_UNTIL_EOL = -1;
+
+    /*! \brief Sets the parser state according to the type for further parsing. */
+    void parse_type(char type);
+    void parse_type(char type, resply::Result& result);
+
+    /*! \brief Sets #state_ and #remaining_bytes_ or #remaining_elements_
+     *         accordingly for further parsing.
+     */
+    void parse_size(const std::string& buffer);
+
+    /*! \brief Consumes data and puts it into the result.
+     *  \param buffer A whole line without its CRLF, or a raw piece of a bulk string.
+     */
+    void parse_data(const std::string& buffer);
+
+    /*! \brief Returns the value being filled: the newest array element or the result itself. */
+    resply::Result& current();
+
+    /*! \brief Marks the value being filled as complete and picks the next state. */
+    void finish_value();
+
+    /*! \brief Drops everything parsed so far and finishes with a protocol error. */
+    void fail(const char* message);
+
+    resply::Result result_;
+    State state_;
+    long remaining_bytes_;
+    long remaining_elements_;
+    std::string pending_; /*!< Beginning of a line whose end has not arrived yet. */
+};
+
+
diff --git a/include/resply.h b/include/resply.h
index 02165a8..c066828 100644
--- a/include/resply.h
+++ b/include/resply.h
@@ -35,6 +35,7 @@ namespace resply {
         };
 
         Result() : type{Type::Nil} { }
+        Result(long long integer) : type{Type::Integer}, integer{integer} { }
 
         /*! \brief Holds the type of the response. */
         Type type;
diff --git a/src/libresply.cc b/src/libresply.cc
index 9240448..f8aeaaf 100644
--- a/src/libresply.cc
+++ b/src/libresply.cc
@@ -118,122 +118,23 @@ public:
         asio::write(socket_, asio::buffer(command), error_code);
         check_asio_error(error_code);
 
-        Result result;
+        RespParser parser;
         asio::streambuf buffer;
-        size_t remaining{};
+        bool cont{false};
 
         do {
-            asio::read(socket_, buffer, error_code);
+            buffer.consume(buffer.size());
+            // Hand over error_code, otherwise the check below tests a stale value.
+            asio::read_until(socket_, buffer, '\n', error_code);
             check_asio_error(error_code);
+            std::istream stream{&buffer};
+            cont = parser.parse(stream);
+        } while (cont);
 
-            parse_response(result, buffer, remaining);
-        } while (remaining);
-
-        return result;
+        return parser.result();
     }
 
 private:
-    static void parse_response(Result& result, asio::streambuf& streambuf, size_t& remaining)
-    {
-        std::string buffer{asio::buffers_begin(streambuf.data()),
-                           asio::buffers_end(streambuf.data())};
-
-        if (!remaining) {
-            // First pass
-            parse_type(result, buffer, remaining);
-        } else {
-            // All other
-            continue_parse_type(result, buffer, remaining);
-        }
-
-        streambuf.consume(streambuf.size());
-    }
-
-    static void parse_type(Result& result, const std::string& buffer, size_t& remaining)
-    {
-        switch (buffer.front()) {
-        case '+':
-            result.type = Result::Type::String;
-            // Exclude the final \r\n bytes
-            result.string += buffer.substr(1, buffer.length() - 3);
-
-            break;
-
-
-        case '-':
-            result.type = Result::Type::ProtocolError;
-            // Exclude the final \r\n bytes
-            result.string += buffer.substr(1, buffer.length() - 3);
-
-            break;
-
-        case ':':
-            result.type = Result::Type::Integer;
-            result.integer = std::stoll(buffer.substr(1));
-
-            break;
-
-        case '$': {
-            size_t size;
-            long length{std::stol(buffer.substr(1), &size)};
-
-            if (length == -1) {
-                result.type = Result::Type::Nil;
-                break;
-            }
-
-            result.type = Result::Type::String;
-            if (static_cast<size_t>(length) < buffer.length()) {
-                result.string += buffer.substr(size + 3, buffer.length() - size - 3);
-            } else {
-                result.string += buffer.substr(size + 3);
-                remaining = length - result.string.length();
-            }
-
-            break;
-        }
-        case '*': {
-            size_t size;
-            long length{std::stol(buffer.substr(1), &size)};
-
-            if (length == -1) {
-                result.type = Result::Type::Nil;
-                break;
-            }
-
-            result.type = Result::Type::Array;
-            remaining = length;
-
-            while (remaining) {
-
-            }
-
-            break;
-        }
-        default:
-            // Error
-            break;
-        }
-    }
-
-    static void continue_parse_type(Result& result, const std::string& buffer, size_t& remaining)
-    {
-        switch (result.type) {
-        case Result::Type::String:
-        case Result::Type::ProtocolError:
-        case Result::Type::IOError:
-            result.string += buffer.substr(0, remaining);
-
-            if (remaining == buffer.length() - 2) {
-                remaining = 0;
-            } else {
-                remaining -= buffer.length();
-            }
-        default:
-            break;
-        }
-    }
-
     const std::string host_;
     const std::string port_;
     const size_t timeout_;
diff --git a/src/resp-parser.cc b/src/resp-parser.cc
new file mode 100644
index 0000000..8ca7492
--- /dev/null
+++ b/src/resp-parser.cc
@@ -0,0 +1,253 @@
+//
+// Copyright 2018 Christoph Heiss
+// Distributed under the Boost Software License, Version 1.0.
+//
+// See accompanying file LICENSE in the project root directory
+// or copy at http://www.boost.org/LICENSE_1_0.txt
+//
+
+#include <istream>
+#include <limits>
+#include <stdexcept>
+#include <string>
+
+#include "resp-parser.h"
+
+
+using resply::Result;
+
+/*! \brief The first byte of a RESP value, which announces its type. */
+enum RespTypes {
+    SIMPLE_STRING = '+',
+    ERROR = '-',
+    INTEGER = ':',
+    BULK_STRING = '$',
+    ARRAY = '*'
+};
+
+
+bool RespParser::parse(std::istream& stream)
+{
+    using Traits = std::istream::traits_type;
+
+    // peek() tells whether unread input is left without consuming it, so
+    // every pass through the loop consumes at least one byte.
+    while (state_ != State::Finished && stream.peek() != Traits::eof()) {
+        if (state_ == State::NeedType) {
+            parse_type(Traits::to_char_type(stream.get()));
+            continue;
+        }
+
+        std::string chunk;
+        std::getline(stream, chunk);
+
+        // getline() sets eofbit when the input ends before a '\n' was found,
+        // so a clear eofbit means that a line terminator was consumed.
+        const bool saw_newline{!stream.eof()};
+
+        if (state_ == State::NeedData && remaining_bytes_ != READ_UNTIL_EOL) {
+            // Bulk string payload: it is counted in bytes and may contain
+            // '\n' itself, so the one getline() swallowed is put back.
+            if (saw_newline) {
+                chunk.push_back('\n');
+            }
+
+            parse_data(chunk);
+            continue;
+        }
+
+        // Sizes, simple strings, errors and integers are a single line that
+        // may arrive in several pieces; collect it until its end shows up.
+        pending_ += chunk;
+        if (!saw_newline) {
+            break;
+        }
+
+        if (!pending_.empty() && pending_.back() == '\r') {
+            pending_.pop_back();
+        }
+
+        std::string line;
+        line.swap(pending_);
+
+        if (state_ == State::NeedSize) {
+            parse_size(line);
+        } else {
+            parse_data(line);
+        }
+    }
+
+    return state_ != State::Finished;
+}
+
+
+void RespParser::parse_type(char type)
+{
+    if (result_.type != Result::Type::Array) {
+        parse_type(type, result_);
+    } else if (type == RespTypes::ARRAY) {
+        fail("Nested arrays are not supported.");
+    } else {
+        // Fill the element in place, so that it is added exactly once.
+        result_.array.emplace_back();
+        parse_type(type, result_.array.back());
+    }
+}
+
+
+void RespParser::parse_type(char type, Result& result)
+{
+    switch (type) {
+    case RespTypes::SIMPLE_STRING:
+        result.type = Result::Type::String;
+        remaining_bytes_ = READ_UNTIL_EOL;
+        state_ = State::NeedData;
+        break;
+
+    case RespTypes::ERROR:
+        result.type = Result::Type::ProtocolError;
+        remaining_bytes_ = READ_UNTIL_EOL;
+        state_ = State::NeedData;
+        break;
+
+    case RespTypes::INTEGER:
+        result.type = Result::Type::Integer;
+        remaining_bytes_ = READ_UNTIL_EOL;
+        state_ = State::NeedData;
+        break;
+
+    case RespTypes::BULK_STRING:
+        result.type = Result::Type::String;
+        state_ = State::NeedSize;
+        break;
+
+    case RespTypes::ARRAY:
+        result.type = Result::Type::Array;
+        state_ = State::NeedSize;
+        break;
+
+    default:
+        fail("Parsing error.");
+        break;
+    }
+}
+
+
+void RespParser::parse_size(const std::string& buffer)
+{
+    long size{};
+
+    try {
+        size = std::stol(buffer);
+    } catch (const std::logic_error&) {
+        // std::invalid_argument or std::out_of_range: not a usable number.
+        fail("Parsing error.");
+        return;
+    }
+
+    if (size < -1 || size > std::numeric_limits<long>::max() - 2) {
+        fail("Parsing error.");
+        return;
+    }
+
+    // While elements are outstanding, a size can only belong to a bulk string.
+    const bool array_header{result_.type == Result::Type::Array && remaining_elements_ == 0};
+
+    if (array_header) {
+        if (size == -1) {
+            // Null array
+            result_.type = Result::Type::Nil;
+            state_ = State::Finished;
+        } else if (size == 0) {
+            state_ = State::Finished;
+        } else {
+            remaining_elements_ = size;
+            state_ = State::NeedType;
+        }
+    } else if (size == -1) {
+        // Null bulk string
+        current().type = Result::Type::Nil;
+        finish_value();
+    } else {
+        // The payload is followed by a CRLF which has to be consumed as well.
+        remaining_bytes_ = size + 2;
+        state_ = State::NeedData;
+    }
+}
+
+
+void RespParser::parse_data(const std::string& buffer)
+{
+    Result& target = current();
+
+    switch (target.type) {
+    case Result::Type::String:
+    case Result::Type::ProtocolError:
+    case Result::Type::IOError:
+        target.string += buffer;
+
+        if (remaining_bytes_ == READ_UNTIL_EOL) {
+            // A complete line that was handed over without its CRLF.
+            finish_value();
+            break;
+        }
+
+        remaining_bytes_ -= static_cast<long>(buffer.length());
+        if (remaining_bytes_ <= 0) {
+            // Payload complete, drop the trailing CRLF.
+            const std::string::size_type length{target.string.length()};
+            target.string.resize(length < 2 ? 0 : length - 2);
+            finish_value();
+        }
+
+        break;
+
+    case Result::Type::Integer:
+        try {
+            target.integer = std::stoll(buffer);
+        } catch (const std::logic_error&) {
+            // fail() may destroy `target`, so it must not be used afterwards.
+            fail("Parsing error.");
+            break;
+        }
+
+        finish_value();
+        break;
+
+    case Result::Type::Array:
+    default:
+        // parse_type() requests data for none of these types; do not wait for any.
+        fail("Parsing error.");
+        break;
+    }
+}
+
+
+Result& RespParser::current()
+{
+    const bool in_array{result_.type == Result::Type::Array && !result_.array.empty()};
+
+    return in_array ? result_.array.back() : result_;
+}
+
+
+void RespParser::finish_value()
+{
+    remaining_bytes_ = READ_UNTIL_EOL;
+
+    if (result_.type == Result::Type::Array && --remaining_elements_ > 0) {
+        state_ = State::NeedType;
+    } else {
+        state_ = State::Finished;
+    }
+}
+
+
+void RespParser::fail(const char* message)
+{
+    result_.array.clear();
+    result_.type = Result::Type::ProtocolError;
+    result_.string = message;
+    pending_.clear();
+    state_ = State::Finished;
+}