From 7c2a25a6b00fd19ea6ec062468b6a63177380635 Mon Sep 17 00:00:00 2001 From: Bruno BELANYI Date: Thu, 24 Mar 2022 17:50:52 +0100 Subject: [PATCH] kraken: make multi-threaded --- src/kraken.cc | 47 +++++++++++++++++++++++++++++++++++++---------- 1 file changed, 37 insertions(+), 10 deletions(-) diff --git a/src/kraken.cc b/src/kraken.cc index e228e55..995dd76 100644 --- a/src/kraken.cc +++ b/src/kraken.cc @@ -1,4 +1,7 @@ #include +#include + +#include #include "csv/read-csv.hh" #include "csv/write-csv.hh" @@ -16,17 +19,41 @@ int main(int argc, char** argv) { } } - auto listener = std::make_shared(); - auto engine = kraken::engine::Engine(listener, cross_behaviour); + // Up to 512 pending orders. + auto pending_orders = boost::lockfree::spsc_queue(512); - for (std::string line; std::getline(std::cin, line);) { - auto const order = kraken::parse::parse_single_order( - kraken::csv::read_csv_line(line)); + auto writer = std::jthread([&](std::stop_token stop_token) { + auto listener = std::make_shared(); + auto engine = kraken::engine::Engine(listener, cross_behaviour); - engine.process_single_order(order); + while (true) { + auto order = kraken::Order(); + while (!pending_orders.pop(order)) { + // FIXME: busy wait + // Check that we didn't miss an order between last 'pop' and + // stop request, just in case. + if (stop_token.stop_requested() && pending_orders.empty()) { + return; + } + } + engine.process_single_order(order); + auto& output = listener->output(); + kraken::csv::write_csv(std::cout, output); + output.resize(0); + } + }); - auto& output = listener->output(); - kraken::csv::write_csv(std::cout, output); - output.resize(0); - } + auto reader = std::jthread([&]() { + for (std::string line; std::getline(std::cin, line);) { + auto const order = kraken::parse::parse_single_order( + kraken::csv::read_csv_line(line)); + while (!pending_orders.push(order)) { + // FIXME: busy wait + } + } + // EOF, bring process orders and bring + writer.request_stop(); + }); + + reader.join(); }