Compare commits

...

9 commits

12 changed files with 82 additions and 24 deletions

View file

@ -43,6 +43,10 @@
pkg-config
];
buildInputs = with final; [
boost
];
checkInputs = with final; [
gtest
];

View file

@ -9,10 +9,13 @@ add_subdirectory(utils)
configure_file(config.h.in config.h)
find_package(Boost REQUIRED COMPONENTS thread)
target_link_libraries(kraken PRIVATE
csv
engine
parse
Boost::thread
)
install(TARGETS kraken)

View file

@ -66,6 +66,6 @@ struct FlushOrder {
auto operator<=>(FlushOrder const&) const = default;
};
using Order = std::variant<TradeOrder, CancelOrder, FlushOrder>;
using Order = std::variant<FlushOrder, TradeOrder, CancelOrder>;
} // namespace kraken

View file

@ -4,10 +4,7 @@
namespace kraken::csv {
namespace {
// for convenience, use a stringstream which does not accept string_view inputs
csv_line_type parse_line(std::string const& line) {
csv_line_type read_csv_line(std::string const& line) {
auto parsed = csv_line_type{};
auto input = std::istringstream(line);
@ -18,8 +15,6 @@ csv_line_type parse_line(std::string const& line) {
return parsed;
}
} // namespace
csv_type read_csv(std::istream& input, CsvHeader header) {
auto parsed = std::vector<csv_line_type>{};
@ -30,7 +25,7 @@ csv_type read_csv(std::istream& input, CsvHeader header) {
continue;
}
parsed.emplace_back(parse_line(std::move(line)));
parsed.emplace_back(read_csv_line(std::move(line)));
}
return parsed;

View file

@ -14,6 +14,9 @@ enum class CsvHeader {
KEEP,
};
/// Parse a single CSV line, no error checking.
csv_line_type read_csv_line(std::string const& input);
/// Parse a CSV file from an input-stream, return a vector of parsed lines.
csv_type read_csv(std::istream& input, CsvHeader header = CsvHeader::SKIP);

View file

@ -10,6 +10,10 @@ csv::csv_type const& CsvEngineListener::output() const {
return output_;
}
csv::csv_type& CsvEngineListener::output() {
return output_;
}
void CsvEngineListener::on_acknowledgement(User user, UserOrderId id) {
output_.emplace_back(csv::csv_line_type{
"A",

View file

@ -14,6 +14,7 @@ struct CsvEngineListener : EngineListener {
virtual ~CsvEngineListener();
csv::csv_type const& output() const;
csv::csv_type& output();
/// Called when a new trade or cancel order has been acknowledged.
void on_acknowledgement(User user, UserOrderId id) override;

View file

@ -91,10 +91,14 @@ Engine::Engine(std::shared_ptr<EngineListener> listener,
CrossBehaviour cross_behaviour)
: listener_(listener), cross_behaviour_(cross_behaviour) {}
void Engine::process_single_order(Order const& order) {
std::visit([this](auto const& trade_order) { (*this)(trade_order); },
order);
}
void Engine::process_orders(std::vector<Order> const& orders) {
for (auto const& order : orders) {
std::visit([this](auto const& trade_order) { (*this)(trade_order); },
order);
process_single_order(order);
}
}

View file

@ -25,6 +25,9 @@ struct Engine {
Engine(std::shared_ptr<EngineListener> listener,
CrossBehaviour cross_behaviour = CrossBehaviour::REJECT);
/// Process a single order, triggerring the listener appropriately.
void process_single_order(Order const& order);
/// Process orders, triggerring the listener on each event.
void process_orders(std::vector<Order> const& orders);

View file

@ -1,5 +1,9 @@
#include <iostream>
#include <thread>
#include <boost/lockfree/spsc_queue.hpp>
#include "csv/read-csv.hh"
#include "csv/write-csv.hh"
#include "engine/csv-engine-listener.hh"
#include "engine/engine.hh"
@ -15,13 +19,41 @@ int main(int argc, char** argv) {
}
}
auto const orders = kraken::parse::parse_orders(std::cin);
// Up to 512 pending orders.
auto pending_orders = boost::lockfree::spsc_queue<kraken::Order>(512);
auto listener = std::make_shared<kraken::engine::CsvEngineListener>();
auto writer = std::jthread([&](std::stop_token stop_token) {
auto listener = std::make_shared<kraken::engine::CsvEngineListener>();
auto engine = kraken::engine::Engine(listener, cross_behaviour);
auto engine = kraken::engine::Engine(listener, cross_behaviour);
while (true) {
auto order = kraken::Order();
while (!pending_orders.pop(order)) {
// FIXME: busy wait
// Check that we didn't miss an order between last 'pop' and
// stop request, just in case.
if (stop_token.stop_requested() && pending_orders.empty()) {
return;
}
}
engine.process_single_order(order);
auto& output = listener->output();
kraken::csv::write_csv(std::cout, output);
output.resize(0);
}
});
engine.process_orders(orders);
auto reader = std::jthread([&]() {
for (std::string line; std::getline(std::cin, line);) {
auto const order = kraken::parse::parse_single_order(
kraken::csv::read_csv_line(line));
while (!pending_orders.push(order)) {
// FIXME: busy wait
}
}
// EOF, bring process orders and bring
writer.request_stop();
});
kraken::csv::write_csv(std::cout, listener->output());
reader.join();
}

View file

@ -57,20 +57,24 @@ CancelOrder cancel_from_raw(csv::csv_line_type const& raw_order) {
} // namespace
Order parse_single_order(csv::csv_line_type const& order) {
if (order[0] == "N") {
return trade_from_raw(order);
} else if (order[0] == "C") {
return cancel_from_raw(order);
} else if (order[0] == "F") {
return FlushOrder{};
} else {
throw ParseError("Not a valid order");
}
}
std::vector<Order> parse_orders(std::istream& input) {
auto const raw_orders = read_csv(input, kraken::csv::CsvHeader::KEEP);
auto orders = std::vector<Order>{};
for (auto const& raw_order : raw_orders) {
if (raw_order[0] == "N") {
orders.emplace_back(trade_from_raw(raw_order));
} else if (raw_order[0] == "C") {
orders.emplace_back(cancel_from_raw(raw_order));
} else if (raw_order[0] == "F") {
orders.emplace_back(FlushOrder{});
} else {
throw ParseError("Not a valid order");
}
orders.emplace_back(parse_single_order(raw_order));
}
return orders;

View file

@ -5,6 +5,7 @@
#include <vector>
#include "book/order.hh"
#include "csv/csv.hh"
namespace kraken::parse {
@ -12,6 +13,10 @@ struct ParseError : public std::runtime_error {
using std::runtime_error::runtime_error;
};
/// Convert a single CSV input line to an `Order`.
Order parse_single_order(csv::csv_line_type const& order);
/// Parse orders from an input stream as CSV.
std::vector<Order> parse_orders(std::istream& input);
} // namespace kraken::parse