From 5ce2cfcc8fd1ac82b5bd22bedc12a59ef2f5e01a Mon Sep 17 00:00:00 2001 From: Ryan <16667079+mccaffers@users.noreply.github.com> Date: Thu, 25 Jun 2026 09:18:56 +0100 Subject: [PATCH 1/2] deduplicating Elastic logic, reorganising definitions, improving exception logging, and adding CLI options to the load command to support different strategies --- .github/workflows/build.yml | 6 + source/load/loadCommand.cppm | 18 ++- source/load/sweep/randomStrategySweep.cppm | 16 ++ source/main.cpp | 2 +- source/run/operations.cppm | 6 + source/run/reporting/elasticClient.cppm | 119 +-------------- source/run/reporting/tradingResults.cpp | 11 +- source/shared/redis/consumer/drainRuns.cpp | 150 +++++++++++++++++++ source/shared/redis/consumer/drainRuns.hpp | 32 ++++ source/shared/redis/consumer/redisRunner.cpp | 147 ++---------------- source/shared/redis/consumer/runQueue.cpp | 4 +- source/shared/redis/consumer/runQueue.hpp | 2 +- source/shared/reporting/elasticPublisher.cpp | 146 ++++++++++++++++++ source/shared/reporting/elasticPublisher.hpp | 37 +++++ source/shared/reporting/engineException.hpp | 32 ++++ 15 files changed, 462 insertions(+), 266 deletions(-) create mode 100644 source/shared/redis/consumer/drainRuns.cpp create mode 100644 source/shared/redis/consumer/drainRuns.hpp create mode 100644 source/shared/reporting/elasticPublisher.cpp create mode 100644 source/shared/reporting/elasticPublisher.hpp create mode 100644 source/shared/reporting/engineException.hpp diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml index f457627..8254a86 100644 --- a/.github/workflows/build.yml +++ b/.github/workflows/build.yml @@ -137,6 +137,11 @@ jobs: # Performs the SonarQube scan using the captured build commands and the # downloaded coverage report. Requires GitHub and SonarQube tokens. + # sonar.cfamily.enableModules turns on the analyzer's (experimental) C++20 + # named-modules support; this project is module-heavy (`import std;` plus + # .cppm interface units), so without it the analyzer can't parse those TUs. + # NB: keep these as --define args, not a `#` comment inside the folded + # `args: >` block — a `#` there would be passed to the scanner literally. - name: SonarQube Scan uses: SonarSource/sonarqube-scan-action@v4.2.1 env: @@ -145,4 +150,5 @@ jobs: with: args: > --define sonar.cfamily.compile-commands="${{ env.BUILD_WRAPPER_OUT_DIR }}/compile_commands.json" + --define sonar.cfamily.enableModules=true --define sonar.coverageReportPaths=artifact/sonarqube-generic-coverage.xml diff --git a/source/load/loadCommand.cppm b/source/load/loadCommand.cppm index 87e2d01..3574c14 100644 --- a/source/load/loadCommand.cppm +++ b/source/load/loadCommand.cppm @@ -27,18 +27,26 @@ import makeStrategy; // sweep::makeStrategy export class LoadCommand { public: - static int run(); + static int run(int argc, const char* argv[]); }; -int LoadCommand::run() { +int LoadCommand::run(const int argc, const char* argv[]) { // One RUN_ID identifies the whole sweep; each combination becomes its own // queue entry, distinguished by its parameter values. const auto runId = boost::uuids::to_string(boost::uuids::random_generator()()); - // Build random here - // TODO Configurable? maybe in the future ENV variable? - const auto generator = sweep::buildRandomStrategySweep(); + // Select the sweep generator from the command line, e.g. `load random`. + // Defaults to "random" — the only generator today — when omitted. An + // unknown name is a usage error, not a crash, so report it and bail. + const std::string_view sweepName = argc > 2 ? argv[2] : "random"; + sweep::ParameterGenerator generator; + try { + generator = sweep::buildSweep(sweepName); + } catch (const std::exception& ex) { + std::println(stderr, "LoadCommand: {}", ex.what()); + return 1; + } const auto combinations = generator.generateAllCombinations(); diff --git a/source/load/sweep/randomStrategySweep.cppm b/source/load/sweep/randomStrategySweep.cppm index 565c4e0..adc305f 100644 --- a/source/load/sweep/randomStrategySweep.cppm +++ b/source/load/sweep/randomStrategySweep.cppm @@ -6,6 +6,10 @@ module; +#include +#include +#include + #include "shared/utilities/parameterSweep.hpp" export module randomStrategySweep; @@ -24,4 +28,16 @@ ParameterGenerator buildRandomStrategySweep() { return generator; } +// Resolves a generator by name (as passed on the `load` command line). Throws +// std::invalid_argument for an unknown name so the caller can report it. A new +// generator is one extra branch here plus an entry in `valid`. +ParameterGenerator buildSweep(std::string_view name) { + constexpr std::string_view valid = "random"; + if (name == "random") { + return buildRandomStrategySweep(); + } + throw std::invalid_argument("unknown sweep generator '" + std::string(name) + + "' (valid: " + std::string(valid) + ")"); +} + } // namespace sweep diff --git a/source/main.cpp b/source/main.cpp index d4b7ad5..f7300cc 100644 --- a/source/main.cpp +++ b/source/main.cpp @@ -19,7 +19,7 @@ int main(const int argc, const char* argv[]) { const std::string_view subcommand = argv[1]; - if (subcommand == "load") return LoadCommand::run(); + if (subcommand == "load") return LoadCommand::run(argc, argv); if (subcommand == "run") return RunCommand::run(argc, argv); std::println(std::cerr, "Error: unknown subcommand '{}'.", subcommand); diff --git a/source/run/operations.cppm b/source/run/operations.cppm index 7eea7e7..297464a 100644 --- a/source/run/operations.cppm +++ b/source/run/operations.cppm @@ -7,6 +7,7 @@ module; #include "shared/utilities/backtestLog.hpp" +#include "shared/reporting/elasticPublisher.hpp" #include "shared/tradingDefinitions/config/configuration.hpp" #include "run/reporting/tradingResults.hpp" @@ -144,7 +145,12 @@ void Operations::run(const std::vector& ticks, } catch (const std::exception& e) { backtest_log::error(std::string("Operations: outcome put failed: ") + e.what()); + // Best-effort: also record the failure in Elasticsearch (noexcept). + elastic::putEngineException( + {elastic::nowIsoUtc(), "Operations", e.what(), config.RUN_ID}); } catch (...) { backtest_log::error("Operations: outcome put failed: unknown error"); + elastic::putEngineException( + {elastic::nowIsoUtc(), "Operations", "unknown error", config.RUN_ID}); } } diff --git a/source/run/reporting/elasticClient.cppm b/source/run/reporting/elasticClient.cppm index a4eb51b..b8e8dc9 100644 --- a/source/run/reporting/elasticClient.cppm +++ b/source/run/reporting/elasticClient.cppm @@ -6,131 +6,28 @@ module; -#include -#include -#include #include -#include "shared/utilities/backtestLog.hpp" -#include "shared/utilities/env.hpp" +#include "shared/reporting/elasticPublisher.hpp" #include "run/reporting/tradingResults.hpp" export module elasticClient; -import std; // replaces , - -// Minimal Elasticsearch HTTP client — PUT-only, for indexing run outcomes. -// Host is read from $ELASTIC_HOST (default http://localhost:9200) with optional -// HTTP basic auth from $ELASTIC_USER / $ELASTIC_USER_PASSWORD. Completed runs -// land in index "trading_results"; runs cut off early (loss limit) land in -// "trading_failures". Each doc gets a freshly generated UUID. +// Typed front-end over the shared Elasticsearch publisher (elastic::putDocument +// in shared/reporting). Completed runs land in index "trading_results"; runs +// cut off early (loss limit) land in "trading_failures". The transport, env +// config and auth all live in the shared publisher so the run path and the +// shared redis-consumer path report through one implementation. export class ElasticClient { public: static int putTradingResults(const TradingResults& results); static int putTradingFailure(const TradingFailure& failure); }; -namespace { - -void ensureCurlInit() { - static const struct CurlGlobal { - CurlGlobal() { curl_global_init(CURL_GLOBAL_ALL); } - ~CurlGlobal() { curl_global_cleanup(); } - } guard; - (void)guard; -} - -std::string generateUuid() { - static thread_local boost::uuids::random_generator gen; - return boost::uuids::to_string(gen()); -} - -// Swallow the response body so curl does not dump it to stdout (its default -// behaviour when no write callback is configured). -std::size_t discardResponse(char* /*ptr*/, std::size_t size, std::size_t nmemb, - void* /*userdata*/) { - return size * nmemb; -} - -// Shared PUT of one JSON document into the given index. Both public entry -// points only differ in target index and body shape. -int putDocument(const std::string& index, const std::string& body) { - // Allow runs to opt out of result reporting entirely (e.g. local backtests - // with no Elastic instance). On by default to preserve existing behaviour. - if (env::getOr("ELASTIC_ENABLED", "1") == "0") { - return 0; - } - - ensureCurlInit(); - - const std::string host = env::getOr("ELASTIC_HOST", "http://localhost:9200"); - const std::string user = env::getOr("ELASTIC_USER", ""); - const std::string password = env::getOr("ELASTIC_USER_PASSWORD", ""); - const std::string url = host + "/" + index + "/_doc/" + generateUuid(); - - CURL* curl = curl_easy_init(); - if (!curl) { - backtest_log::error("ElasticClient: curl_easy_init failed"); - return 1; - } - - struct curl_slist* headers = nullptr; - headers = curl_slist_append(headers, "Content-Type: application/json"); - - curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); - curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); - - // HTTP basic auth when credentials are supplied via the environment. - if (!user.empty()) { - curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); - curl_easy_setopt(curl, CURLOPT_USERNAME, user.c_str()); - curl_easy_setopt(curl, CURLOPT_PASSWORD, password.c_str()); - } - curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT"); - curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str()); - curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, static_cast(body.size())); - - // Discard the response body instead of letting curl print it to stdout. - curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, discardResponse); - - // Require TLS 1.2 or newer and enforce certificate / hostname verification - // for any HTTPS endpoint (Sonar cpp:S4423 / S5527). - curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); - curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 1L); - curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 2L); - - const CURLcode rc = curl_easy_perform(curl); - - long httpStatus = 0; - curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpStatus); - - curl_slist_free_all(headers); - curl_easy_cleanup(curl); - - if (rc != CURLE_OK) { - backtest_log::error(std::string("ElasticClient: PUT failed: ") - + curl_easy_strerror(rc)); - return 2; - } - if (httpStatus < 200 || httpStatus >= 300) { - backtest_log::error("ElasticClient: HTTP " + std::to_string(httpStatus) - + " from " + url); - return 3; - } - // Per-strategy success line is skipped under concurrent backtests (quiet). - if (!backtest_log::is_quiet()) { - std::cout << "ElasticClient: PUT " << url << " (HTTP " << httpStatus << ")" - << std::endl; - } - return 0; -} - -} // namespace - int ElasticClient::putTradingResults(const TradingResults& results) { - return putDocument("trading_results", nlohmann::json(results).dump()); + return elastic::putDocument("trading_results", nlohmann::json(results).dump()); } int ElasticClient::putTradingFailure(const TradingFailure& failure) { - return putDocument("trading_failures", nlohmann::json(failure).dump()); + return elastic::putDocument("trading_failures", nlohmann::json(failure).dump()); } diff --git a/source/run/reporting/tradingResults.cpp b/source/run/reporting/tradingResults.cpp index 8dd614b..4900473 100644 --- a/source/run/reporting/tradingResults.cpp +++ b/source/run/reporting/tradingResults.cpp @@ -6,17 +6,10 @@ #include "run/reporting/tradingResults.hpp" -#include -#include +#include "shared/reporting/elasticPublisher.hpp" std::string TradingResults::nowIsoUtc() { - const auto now = std::chrono::system_clock::to_time_t( - std::chrono::system_clock::now()); - std::tm tm_buf{}; - gmtime_r(&now, &tm_buf); - char buf[32]; - std::strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%SZ", &tm_buf); - return std::string{buf}; + return elastic::nowIsoUtc(); } void to_json(nlohmann::json& j, const TradingResultsStats& s) { diff --git a/source/shared/redis/consumer/drainRuns.cpp b/source/shared/redis/consumer/drainRuns.cpp new file mode 100644 index 0000000..48deeaf --- /dev/null +++ b/source/shared/redis/consumer/drainRuns.cpp @@ -0,0 +1,150 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- + +#include "shared/redis/consumer/drainRuns.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "shared/utilities/jsonParser.hpp" +#include "shared/utilities/queueKeys.hpp" +#include "shared/utilities/threadPool.hpp" +#include "shared/reporting/elasticPublisher.hpp" +#include "shared/redis/consumer/runQueue.hpp" +#include "run/execution/runnerBridge.hpp" + +namespace asio = boost::asio; +namespace redis = boost::redis; + +namespace redis_runner { + +asio::awaitable drainRuns(std::shared_ptr conn, + std::string questdbHost) { + int exitCode = 0; + + try { + + // Get CPU threads available, max of 6 + const unsigned hw = std::thread::hardware_concurrency(); + ThreadPool pool(std::clamp(hw != 0 ? hw : 6u, 1u, 6u)); + + // Loop forever, claiming one run per iteration. An empty queue makes us + // wait and re-peek (below); only an exception blows the loop + bool waitingLogged = false; + for (;;) { + const std::optional descriptorB64 = co_await run_queue::peekRunTail(conn); + if (!descriptorB64.has_value()) { + // Queue empty: stay alive and poll until work reappears. The timer + // is co_awaited, so this suspends (not a busy wait) while keeping + // the io_context and the Redis connection alive. + if (!waitingLogged) { + std::println("RedisRunner: queue empty, waiting for work..."); + waitingLogged = true; + } + asio::steady_timer timer(co_await asio::this_coro::executor); + timer.expires_after(std::chrono::seconds(1)); + co_await timer.async_wait(asio::use_awaitable); + continue; // re-peek; never exit just because the queue is empty + } + waitingLogged = false; // got a run; re-arm the idle log for next time + + const tradingDefinitions::RunConfiguration runCfg = JsonParser::parseRunConfigurationFromBase64(*descriptorB64); + const std::string strategyKey = queue_keys::strategyKey(runCfg.RUN_ID); + std::println("RedisRunner: picked up RUN_ID={} SYMBOLS={} LAST_MONTHS={}", runCfg.RUN_ID, runCfg.SYMBOLS, runCfg.LAST_MONTHS); + + // Pull this run's tick data once, then reuse across every strategy. + // `ticks` is an opaque shared handle (see runnerBridge.hpp) so this + // TU never names PriceData or imports a module. + const std::shared_ptr ticks = + bridgeLoadTicks(questdbHost, runCfg.SYMBOLS, runCfg.LAST_MONTHS); + + // Backtests run on the pool but every task reads this run's `ticks` + // by reference (no copies). This guard joins all in-flight backtests + // before `ticks` is destroyed, even if a co_await below throws. + // wait() never throws, so it is safe during stack unwinding. + struct Quiesce { + ThreadPool& pool; + ~Quiesce() { pool.wait(); } + } quiesce{pool}; + + // Drain the run's strategy list, competing with any other workers. + // Redis stays on this coroutine thread; only the CPU-bound backtest + // is handed to the pool. submit() applies backpressure, so we keep + // popping at the rate the workers can absorb. + int strategiesRun = 0; + for (;;) { + const std::optional strategyB64 = + co_await run_queue::popStrategy(conn, strategyKey); + if (!strategyB64.has_value()) { + break; // strategy list drained + } + + // Reassemble the Configuration the rest of the pipeline expects. + // Parsing stays on this thread; the worker only runs the backtest. + tradingDefinitions::Configuration config{ + .RUN_ID = runCfg.RUN_ID, + .SYMBOLS = runCfg.SYMBOLS, + .LAST_MONTHS = runCfg.LAST_MONTHS, + .STARTING_BALANCE = runCfg.STARTING_BALANCE, + .MAX_LOSS_PERCENT = runCfg.MAX_LOSS_PERCENT, + .MAX_OPEN_TRADES = runCfg.MAX_OPEN_TRADES, + .REPORT_FAILURES = runCfg.REPORT_FAILURES, + .STRATEGY = JsonParser::parseStrategyFromBase64(*strategyB64), + }; + pool.submit([&ticks, cfg = std::move(config)]() { + bridgeRunOnTicks(*ticks, cfg); + }); + ++strategiesRun; + } + + // Wait for this run's backtests to finish, then surface the first + // failure (if any) so it aborts the drain exactly as the old + // synchronous call did. + pool.wait(); + if (std::exception_ptr err = pool.takeError()) { + std::rethrow_exception(err); + } + + // Retire the run. A failure here propagates and aborts the loop, we + // never re-peek the same run and reload its ticks in a tight loop. + co_await run_queue::removeRun(conn, *descriptorB64); + + std::println("RedisRunner: completed RUN_ID={} ({} strateg{})", + runCfg.RUN_ID, strategiesRun, + strategiesRun == 1 ? "y" : "ies"); + } + } catch (const std::exception& ex) { + std::println(stderr, "RedisRunner aborted: {}", ex.what()); + elastic::putEngineException( + {elastic::nowIsoUtc(), "drainRuns", ex.what(), ""}); + exitCode = 3; + } catch (...) { + std::println(stderr, "RedisRunner aborted: unknown error"); + elastic::putEngineException( + {elastic::nowIsoUtc(), "drainRuns", "unknown error", ""}); + exitCode = 3; + } + + // Only reached when an exception aborted the drain — the loop waits on an + // empty queue rather than exiting. Tear the connection down so + // io_context::run() can return. + conn->cancel(); + co_return exitCode; +} + +} // namespace redis_runner diff --git a/source/shared/redis/consumer/drainRuns.hpp b/source/shared/redis/consumer/drainRuns.hpp new file mode 100644 index 0000000..c1c6e8c --- /dev/null +++ b/source/shared/redis/consumer/drainRuns.hpp @@ -0,0 +1,32 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- + +#pragma once + +#include +#include + +#include +#include + +// The runner's per-run drain loop, split out of redisRunner.cpp so that TU stays +// focused on orchestration (run()). Like runQueue, this is a purely textual unit: +// it uses ThreadPool, whose std::condition_variable_any::wait instantiation the +// toolchain miscompiles when the same TU also imports a C++ module, so it reaches +// the backtest engine only through runnerBridge.hpp's global-module functions and +// imports nothing itself. +namespace redis_runner { + +// Drains BACKTESTING_QUEUE_RUN on a single long-lived connection. For each run it +// loads the QuestDB ticks once, drains that run's strategy list, then retires the +// run. When the queue is empty it waits and re-peeks rather than exiting, so the +// worker stays up as a daemon; only a Redis/DB/decode error leaves the loop +// (return 3). +boost::asio::awaitable drainRuns( + std::shared_ptr conn, + std::string questdbHost); + +} // namespace redis_runner diff --git a/source/shared/redis/consumer/redisRunner.cpp b/source/shared/redis/consumer/redisRunner.cpp index 83a385f..7256f04 100644 --- a/source/shared/redis/consumer/redisRunner.cpp +++ b/source/shared/redis/consumer/redisRunner.cpp @@ -6,158 +6,26 @@ #include "shared/redis/consumer/redisRunner.hpp" -#include -#include -#include #include #include -#include #include #include -#include -#include #include #include -#include -#include -#include #include #include "shared/utilities/backtestLog.hpp" -#include "shared/utilities/jsonParser.hpp" -#include "shared/utilities/queueKeys.hpp" +#include "shared/reporting/elasticPublisher.hpp" #include "shared/redis/connection/redisConnection.hpp" -#include "shared/redis/consumer/runQueue.hpp" -#include "run/execution/runnerBridge.hpp" -#include "shared/utilities/threadPool.hpp" +#include "shared/redis/consumer/drainRuns.hpp" namespace asio = boost::asio; -namespace redis = boost::redis; - -namespace { - -// Drains BACKTESTING_QUEUE_RUN on a single long-lived connection. For each run it -// loads the QuestDB ticks once, drains that run's strategy list, then retires the -// run. When the queue is empty it waits and re-peeks rather than exiting, so the -// worker stays up as a daemon; only a Redis/DB/decode error leaves the loop -// (return 3). -asio::awaitable drainRuns(std::shared_ptr conn, - std::string questdbHost) { - int exitCode = 0; - - try { - - // Get CPU threads available, max of 6 - const unsigned hw = std::thread::hardware_concurrency(); - ThreadPool pool(std::clamp(hw == 0 ? 6u : hw, 1u, 6u)); - - // Loop forever, claiming one run per iteration. An empty queue makes us - // wait and re-peek (below); only an exception blows the loop - bool waitingLogged = false; - for (;;) { - const std::optional descriptorB64 = co_await run_queue::peekRunTail(conn); - if (!descriptorB64.has_value()) { - // Queue empty: stay alive and poll until work reappears. The timer - // is co_awaited, so this suspends (not a busy wait) while keeping - // the io_context and the Redis connection alive. - if (!waitingLogged) { - std::println("RedisRunner: queue empty, waiting for work..."); - waitingLogged = true; - } - asio::steady_timer timer(co_await asio::this_coro::executor); - timer.expires_after(std::chrono::seconds(1)); - co_await timer.async_wait(asio::use_awaitable); - continue; // re-peek; never exit just because the queue is empty - } - waitingLogged = false; // got a run; re-arm the idle log for next time - - const tradingDefinitions::RunConfiguration runCfg = JsonParser::parseRunConfigurationFromBase64(*descriptorB64); - const std::string strategyKey = queue_keys::strategyKey(runCfg.RUN_ID); - std::println("RedisRunner: picked up RUN_ID={} SYMBOLS={} LAST_MONTHS={}", runCfg.RUN_ID, runCfg.SYMBOLS, runCfg.LAST_MONTHS); - - // Pull this run's tick data once, then reuse across every strategy. - // `ticks` is an opaque shared handle (see runnerBridge.hpp) so this - // TU never names PriceData or imports a module. - const std::shared_ptr ticks = - bridgeLoadTicks(questdbHost, runCfg.SYMBOLS, runCfg.LAST_MONTHS); - - // Backtests run on the pool but every task reads this run's `ticks` - // by reference (no copies). This guard joins all in-flight backtests - // before `ticks` is destroyed, even if a co_await below throws. - // wait() never throws, so it is safe during stack unwinding. - struct Quiesce { - ThreadPool& pool; - ~Quiesce() { pool.wait(); } - } quiesce{pool}; - - // Drain the run's strategy list, competing with any other workers. - // Redis stays on this coroutine thread; only the CPU-bound backtest - // is handed to the pool. submit() applies backpressure, so we keep - // popping at the rate the workers can absorb. - int strategiesRun = 0; - for (;;) { - const std::optional strategyB64 = - co_await run_queue::popStrategy(conn, strategyKey); - if (!strategyB64.has_value()) { - break; // strategy list drained - } - - // Reassemble the Configuration the rest of the pipeline expects. - // Parsing stays on this thread; the worker only runs the backtest. - tradingDefinitions::Configuration config{ - .RUN_ID = runCfg.RUN_ID, - .SYMBOLS = runCfg.SYMBOLS, - .LAST_MONTHS = runCfg.LAST_MONTHS, - .STARTING_BALANCE = runCfg.STARTING_BALANCE, - .MAX_LOSS_PERCENT = runCfg.MAX_LOSS_PERCENT, - .MAX_OPEN_TRADES = runCfg.MAX_OPEN_TRADES, - .REPORT_FAILURES = runCfg.REPORT_FAILURES, - .STRATEGY = JsonParser::parseStrategyFromBase64(*strategyB64), - }; - pool.submit([&ticks, cfg = std::move(config)]() { - bridgeRunOnTicks(*ticks, cfg); - }); - ++strategiesRun; - } - - // Wait for this run's backtests to finish, then surface the first - // failure (if any) so it aborts the drain exactly as the old - // synchronous call did. - pool.wait(); - if (std::exception_ptr err = pool.takeError()) { - std::rethrow_exception(err); - } - - // Retire the run. A failure here propagates and aborts the loop, we - // never re-peek the same run and reload its ticks in a tight loop. - co_await run_queue::removeRun(conn, *descriptorB64); - - std::println("RedisRunner: completed RUN_ID={} ({} strateg{})", - runCfg.RUN_ID, strategiesRun, - strategiesRun == 1 ? "y" : "ies"); - } - } catch (const std::exception& ex) { - std::println(stderr, "RedisRunner aborted: {}", ex.what()); - exitCode = 3; - } catch (...) { - std::println(stderr, "RedisRunner aborted: unknown error"); - exitCode = 3; - } - - // Only reached when an exception aborted the drain — the loop waits on an - // empty queue rather than exiting. Tear the connection down so - // io_context::run() can return. - conn->cancel(); - co_return exitCode; -} - -} // namespace int RedisRunner::run(const std::string& questdbHost, const std::string& redisHost, const int redisPort) { - + backtest_log::set_quiet(true); // Mute logs to prevent interleaved thread spam asio::io_context ioc; // Set up the async event loop @@ -170,7 +38,7 @@ int RedisRunner::run(const std::string& questdbHost, // Launch the async task to process runs asio::co_spawn( ioc, - drainRuns(conn, questdbHost), + redis_runner::drainRuns(conn, questdbHost), [&result, &error](std::exception_ptr e, int r) { // Completion callback if (e) { error = e; // Save exception if it failed @@ -188,10 +56,15 @@ int RedisRunner::run(const std::string& questdbHost, std::rethrow_exception(error); } catch (const std::exception& ex) { std::println(stderr, "RedisRunner failed: {}", ex.what()); + // Surface the top-level failure in Elasticsearch alongside run + // outcomes. No RUN_ID is in scope here — this is a whole-process + // failure, not a single run. + elastic::putEngineException( + {elastic::nowIsoUtc(), "RedisRunner", ex.what(), ""}); } return 3; // Exit with error status } // Return the final execution status code return result; -} \ No newline at end of file +} diff --git a/source/shared/redis/consumer/runQueue.cpp b/source/shared/redis/consumer/runQueue.cpp index 2946679..850300a 100644 --- a/source/shared/redis/consumer/runQueue.cpp +++ b/source/shared/redis/consumer/runQueue.cpp @@ -24,8 +24,8 @@ namespace { // A nil reply (RPOP/LINDEX out of range) maps to nullopt. The connection is NOT // cancelled here, it stays open for the rest of the worker loop. asio::awaitable> execOptionalString( - std::shared_ptr conn, - redis::request req) { + const std::shared_ptr conn, + const redis::request req) { redis::response> resp; co_await conn->async_exec(req, resp, asio::use_awaitable); co_return std::move(std::get<0>(resp).value()); diff --git a/source/shared/redis/consumer/runQueue.hpp b/source/shared/redis/consumer/runQueue.hpp index f35f73e..e594c35 100644 --- a/source/shared/redis/consumer/runQueue.hpp +++ b/source/shared/redis/consumer/runQueue.hpp @@ -16,7 +16,7 @@ // Redis queue access for the runner's drain loop. These coroutines issue one // command each on the shared, long-lived connection and never cancel it — the // connection stays open for the rest of the worker loop. Keeping them out of -// redisRunner.cpp leaves that TU focused on orchestration (drainRuns + run()). +// the drain loop (see drainRuns.cpp) keeps each TU small and single-purpose. namespace run_queue { // Non-destructively reads the run that RPOP would take (the queue tail, i.e. the diff --git a/source/shared/reporting/elasticPublisher.cpp b/source/shared/reporting/elasticPublisher.cpp new file mode 100644 index 0000000..4b8326e --- /dev/null +++ b/source/shared/reporting/elasticPublisher.cpp @@ -0,0 +1,146 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- + +#include "shared/reporting/elasticPublisher.hpp" + +#include +#include +#include +#include +#include + +#include +#include +#include +#include + +#include "shared/utilities/backtestLog.hpp" +#include "shared/utilities/env.hpp" + +namespace { + +void ensureCurlInit() { + static const struct CurlGlobal { + CurlGlobal() { curl_global_init(CURL_GLOBAL_ALL); } + ~CurlGlobal() { curl_global_cleanup(); } + } guard; + (void)guard; +} + +std::string generateUuid() { + static thread_local boost::uuids::random_generator gen; + return boost::uuids::to_string(gen()); +} + +// Swallow the response body so curl does not dump it to stdout (its default +// behaviour when no write callback is configured). +std::size_t discardResponse(char* /*ptr*/, std::size_t size, std::size_t nmemb, + void* /*userdata*/) { + return size * nmemb; +} + +} // namespace + +namespace elastic { + +std::string nowIsoUtc() { + const auto now = std::chrono::system_clock::to_time_t( + std::chrono::system_clock::now()); + std::tm tm_buf{}; + gmtime_r(&now, &tm_buf); + char buf[32]; + std::strftime(buf, sizeof(buf), "%Y-%m-%dT%H:%M:%SZ", &tm_buf); + return std::string{buf}; +} + +int putDocument(const std::string& index, const std::string& body) { + // Allow runs to opt out of result reporting entirely (e.g. local backtests + // with no Elastic instance). On by default to preserve existing behaviour. + if (env::getOr("ELASTIC_ENABLED", "1") == "0") { + return 0; + } + + ensureCurlInit(); + + const std::string host = env::getOr("ELASTIC_HOST", "http://localhost:9200"); + const std::string user = env::getOr("ELASTIC_USER", ""); + const std::string password = env::getOr("ELASTIC_USER_PASSWORD", ""); + const std::string url = host + "/" + index + "/_doc/" + generateUuid(); + + CURL* curl = curl_easy_init(); + if (!curl) { + backtest_log::error("ElasticPublisher: curl_easy_init failed"); + return 1; + } + + struct curl_slist* headers = nullptr; + headers = curl_slist_append(headers, "Content-Type: application/json"); + + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_HTTPHEADER, headers); + + // HTTP basic auth when credentials are supplied via the environment. + if (!user.empty()) { + curl_easy_setopt(curl, CURLOPT_HTTPAUTH, CURLAUTH_BASIC); + curl_easy_setopt(curl, CURLOPT_USERNAME, user.c_str()); + curl_easy_setopt(curl, CURLOPT_PASSWORD, password.c_str()); + } + curl_easy_setopt(curl, CURLOPT_CUSTOMREQUEST, "PUT"); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.c_str()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, static_cast(body.size())); + + // Discard the response body instead of letting curl print it to stdout. + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, discardResponse); + + // Require TLS 1.2 or newer and enforce certificate / hostname verification + // for any HTTPS endpoint (Sonar cpp:S4423 / S5527). + curl_easy_setopt(curl, CURLOPT_SSLVERSION, CURL_SSLVERSION_TLSv1_2); + curl_easy_setopt(curl, CURLOPT_SSL_VERIFYPEER, 1L); + curl_easy_setopt(curl, CURLOPT_SSL_VERIFYHOST, 2L); + + const CURLcode rc = curl_easy_perform(curl); + + long httpStatus = 0; + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &httpStatus); + + curl_slist_free_all(headers); + curl_easy_cleanup(curl); + + if (rc != CURLE_OK) { + backtest_log::error(std::string("ElasticPublisher: PUT failed: ") + + curl_easy_strerror(rc)); + return 2; + } + if (httpStatus < 200 || httpStatus >= 300) { + backtest_log::error("ElasticPublisher: HTTP " + std::to_string(httpStatus) + + " from " + url); + return 3; + } + // Per-strategy success line is skipped under concurrent backtests (quiet). + if (!backtest_log::is_quiet()) { + std::cout << "ElasticPublisher: PUT " << url << " (HTTP " << httpStatus + << ")" << std::endl; + } + return 0; +} + +int putEngineException(const EngineException& ex) noexcept { + // Callers report from inside catch handlers, so this must never throw back + // out. nlohmann's dump() throws type_error.316 on invalid UTF-8, and the + // message is arbitrary exception text — guard it explicitly. + try { + return putDocument("engine_exceptions", nlohmann::json(ex).dump()); + } catch (const std::exception& e) { + backtest_log::error( + std::string("ElasticPublisher: failed to report engine exception: ") + + e.what()); + return -1; + } catch (...) { + return -1; + } +} + +} // namespace elastic diff --git a/source/shared/reporting/elasticPublisher.hpp b/source/shared/reporting/elasticPublisher.hpp new file mode 100644 index 0000000..5f82dab --- /dev/null +++ b/source/shared/reporting/elasticPublisher.hpp @@ -0,0 +1,37 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- + +#pragma once + +#include + +#include "shared/reporting/engineException.hpp" + +// Minimal Elasticsearch HTTP client — PUT-only, for indexing run outcomes and +// engine exceptions. Host is read from $ELASTIC_HOST (default +// http://localhost:9200) with optional HTTP basic auth from $ELASTIC_USER / +// $ELASTIC_USER_PASSWORD. Reporting can be disabled entirely with +// $ELASTIC_ENABLED=0. +// +// Lives in shared/ (as a plain header, not a module) so both the run path +// (elasticClient) and the shared redis-consumer path (redisRunner, drainRuns, +// which import nothing) can report through one implementation. +namespace elastic { + +// PUT one JSON document into `index`, with a freshly generated UUID id. +// Returns 0 on success (or when reporting is disabled), non-zero otherwise. +int putDocument(const std::string& index, const std::string& body); + +// ISO-8601 UTC timestamp ("YYYY-MM-DDTHH:MM:SSZ"). +std::string nowIsoUtc(); + +// Convenience: serialise and PUT an engine exception into "engine_exceptions". +// noexcept by contract — every caller invokes this from a catch handler where a +// throw would mask the original failure (and, on a worker, abort the drain +// loop). Serialisation/transport errors are logged and swallowed. +int putEngineException(const EngineException& ex) noexcept; + +} // namespace elastic diff --git a/source/shared/reporting/engineException.hpp b/source/shared/reporting/engineException.hpp new file mode 100644 index 0000000..d264301 --- /dev/null +++ b/source/shared/reporting/engineException.hpp @@ -0,0 +1,32 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- + +#pragma once + +#include + +#include + +// Wire shape for an engine-level exception (as opposed to a trading result or +// failure). Captures the failure so it lands in Elasticsearch alongside run +// outcomes for searchability. `source` names the component that caught it +// (e.g. "RedisRunner", "drainRuns", "Operations") and `RUN_ID` is filled when +// a run is in scope, empty for top-level failures. +struct EngineException { + std::string timestamp; // serialised as @timestamp for Kibana + std::string source; + std::string message; + std::string RUN_ID; +}; + +inline void to_json(nlohmann::json& j, const EngineException& e) { + j = nlohmann::json{ + {"@timestamp", e.timestamp}, + {"source", e.source}, + {"message", e.message}, + {"RUN_ID", e.RUN_ID}, + }; +} From 9c97a8ef3dcb40ac4af775a34bc5107ddb2083ef Mon Sep 17 00:00:00 2001 From: Ryan <16667079+mccaffers@users.noreply.github.com> Date: Sat, 27 Jun 2026 12:33:35 +0100 Subject: [PATCH 2/2] Refactoring trading results to be numbers instead of strings --- scripts/ingest.sh | 42 +++++ source/ingest/ingestCommand.cppm | 97 +++++++++++ source/ingest/questdbIngestClient.cpp | 197 +++++++++++++++++++++++ source/ingest/questdbIngestClient.hpp | 58 +++++++ source/ingest/tickPacket.cppm | 134 +++++++++++++++ source/ingest/udpPorts.hpp | 20 +++ source/ingest/udpReceiver.cpp | 117 ++++++++++++++ source/ingest/udpReceiver.hpp | 51 ++++++ source/main.cpp | 2 + source/run/reporting/tradingResults.cpp | 45 ++++-- source/shared/utilities/decimalJson.hpp | 9 ++ source/shared/utilities/symbolScale.cppm | 128 ++++++++------- tests/CMakeLists.txt | 1 + tests/tickPacket.cpp | 151 +++++++++++++++++ 14 files changed, 983 insertions(+), 69 deletions(-) create mode 100755 scripts/ingest.sh create mode 100644 source/ingest/ingestCommand.cppm create mode 100644 source/ingest/questdbIngestClient.cpp create mode 100644 source/ingest/questdbIngestClient.hpp create mode 100644 source/ingest/tickPacket.cppm create mode 100644 source/ingest/udpPorts.hpp create mode 100644 source/ingest/udpReceiver.cpp create mode 100644 source/ingest/udpReceiver.hpp create mode 100644 tests/tickPacket.cpp diff --git a/scripts/ingest.sh b/scripts/ingest.sh new file mode 100755 index 0000000..bc42579 --- /dev/null +++ b/scripts/ingest.sh @@ -0,0 +1,42 @@ +#!/bin/bash +# Builds the engine and runs the `ingest` subcommand: it binds a UDP socket and +# streams decoded ticks (tickPacket) into the converted QuestDB via ILP-over-HTTP +# (source/ingest/ingestCommand.cppm). Blocks until SIGINT/SIGTERM. +# +# Config is read from the environment by IngestCommand (defaults in parens): +# QUESTDB_HOST QuestDB host (127.0.0.1) +# QUESTDB_ILP_PORT converted-DB ILP/HTTP port (9001) +# INGEST_BIND_ADDR UDP bind address (127.0.0.1) +# INGEST_UDP_PORT UDP bind port (11111, == UDPPorts.PortSave) +# +# An optional first argument overrides the bind port (argv[2] to the engine): +# sh scripts/ingest.sh # bind 11111 (or $INGEST_UDP_PORT) +# sh scripts/ingest.sh 22222 # bind 22222 + +current_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )" + +if ! source "$current_dir/build.sh"; then + echo "Error: Build failed. Aborting." + exit 1 +fi + +if [ ! -f "$BUILD_DIR/$EXECUTABLE_NAME" ]; then + echo "Error: Executable $EXECUTABLE_NAME not found in $BUILD_DIR." + ls -la "$BUILD_DIR" + exit 1 +fi + +# Non-fatal QuestDB reachability check: the receiver still binds and buffers if +# QuestDB is down (the writer sheds once its buffer fills), so warn rather than +# abort. +quest_host="${QUESTDB_HOST:-127.0.0.1}" +quest_port="${QUESTDB_ILP_PORT:-9001}" +# Probe a real endpoint (/exec), not "/": QuestDB's web console root can stall +# indefinitely, which made a 1s timeout report a healthy DB as unreachable. +if ! curl --fail --silent --max-time 2 "http://$quest_host:$quest_port/exec?query=SELECT%201" >/dev/null 2>&1; then + echo "Warning: QuestDB not reachable at http://$quest_host:$quest_port — writes will be dropped until it comes up." +fi + +# exec so signals reach the engine directly for clean shutdown. "$@" passes an +# optional bind-port override straight through to the subcommand. +exec ./"$BUILD_DIR/$EXECUTABLE_NAME" ingest "$@" diff --git a/source/ingest/ingestCommand.cppm b/source/ingest/ingestCommand.cppm new file mode 100644 index 0000000..b4e018c --- /dev/null +++ b/source/ingest/ingestCommand.cppm @@ -0,0 +1,97 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- +// +// ingestCommand — the `ingest` subcommand. Receives tick datagrams over UDP, +// decodes them (tickPacket) and persists each into the local QuestDB via ILP +// over HTTP (questdbIngestClient), so the backtester's `run` path can read them +// back through the unchanged DatabaseConnection. +// +// The GMF only #includes Asio-/curl-free headers (the receiver and writer hide +// those behind pimpls), so it is safe to `import std` here. + +module; + +#include "ingest/questdbIngestClient.hpp" +#include "ingest/udpPorts.hpp" +#include "ingest/udpReceiver.hpp" +#include "shared/utilities/env.hpp" + +export module ingestCommand; + +import std; +import priceData; // PriceData +import tickPacket; // ingest::decodeTick + +export class IngestCommand { +public: + static int run(int argc, const char* argv[]); +}; + +namespace { + +// Parse a port from a string, returning `fallback` on empty/garbage input so a +// stray env var can't crash startup. +std::uint16_t parsePort(std::string_view text, std::uint16_t fallback) { + unsigned value = 0; + const auto [ptr, ec] = std::from_chars(text.data(), text.data() + text.size(), value); + if (ec != std::errc{} || value == 0 || value > 65535) { + return fallback; + } + return static_cast(value); +} + +} // namespace + +int IngestCommand::run(const int argc, const char* argv[]) { + const std::string questHost = env::getOr("QUESTDB_HOST", "127.0.0.1"); + // 9001 is the converted (INT32) QuestDB's HTTP/ILP port — the instance the + // backtester reads and that convert.py writes to. (9000 is the legacy DOUBLE + // source DB.) Override with $QUESTDB_ILP_PORT. + const std::uint16_t ilpPort = parsePort(env::getOr("QUESTDB_ILP_PORT", ""), 9001); + const std::string bindAddr = env::getOr("INGEST_BIND_ADDR", "127.0.0.1"); + + // Bind port: CLI arg (argv[2]) overrides $INGEST_UDP_PORT overrides kSave. + std::uint16_t bindPort = parsePort(env::getOr("INGEST_UDP_PORT", ""), udp_ports::kSave); + if (argc >= 3) { + bindPort = parsePort(argv[2], bindPort); + } + + ingest::QuestdbIngestClient writer(questHost, ilpPort); + + std::atomic received{0}; + std::atomic dropped{0}; + + ingest::UdpReceiver receiver( + bindAddr, bindPort, [&](std::span bytes) { + const auto tick = ingest::decodeTick(bytes); + if (!tick) { + dropped.fetch_add(1, std::memory_order_relaxed); + return; + } + // ILP line: table = symbol, integer fields ask/bid, designated + // timestamp in nanoseconds (QuestDB HTTP default precision). This + // yields exactly the ask/bid/timestamp columns the read path expects. + const auto tsNanos = std::chrono::duration_cast( + tick->timestamp.time_since_epoch()) + .count(); + writer.enqueueLine(std::format("{} ask={}i,bid={}i {}\n", + tick->symbol, tick->ask, tick->bid, tsNanos)); + received.fetch_add(1, std::memory_order_relaxed); + }); + + std::println("IngestCommand: starting; binding udp://{}:{} -> QuestDB ILP http://{}:{}/write", + bindAddr, bindPort, questHost, ilpPort); + + if (!receiver.run()) { // binds the socket, then blocks until SIGINT/SIGTERM + return 1; // bind failed (bad address / port in use) — logged + } + + // decode-dropped: bad size / unknown symbol. queue-dropped: shed by the + // writer when QuestDB couldn't keep up and the buffer hit its cap. + std::println("IngestCommand: shutting down (received={}, decode-dropped={}, queue-dropped={})", + received.load(), dropped.load(), writer.droppedLines()); + return 0; +} diff --git a/source/ingest/questdbIngestClient.cpp b/source/ingest/questdbIngestClient.cpp new file mode 100644 index 0000000..af6a06a --- /dev/null +++ b/source/ingest/questdbIngestClient.cpp @@ -0,0 +1,197 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- +// +// QuestDB ILP-over-HTTP writer. libcurl and the batching worker thread are +// confined to this classic translation unit (no `import std`), so curl's +// C-macro pollution never reaches the module boundary — the same isolation +// elasticPublisher.cpp uses for the Elasticsearch client. + +#include "ingest/questdbIngestClient.hpp" + +#include +#include +#include +#include +#include +#include +#include +#include + +#include + +#include "shared/utilities/backtestLog.hpp" + +namespace { + +// Bound how long a single POST can block, so a slow/black-holed QuestDB can't +// wedge the worker thread (and therefore shutdown, which joins it) indefinitely. +constexpr long kConnectTimeoutSeconds = 5; +constexpr long kTransferTimeoutSeconds = 10; + +void ensureCurlInit() { + static const struct CurlGlobal { + CurlGlobal() { curl_global_init(CURL_GLOBAL_ALL); } + ~CurlGlobal() { curl_global_cleanup(); } + } guard; + (void)guard; +} + +// Swallow the response body so curl does not dump it to stdout (its default when +// no write callback is set). QuestDB replies 204 on success, 400 + a JSON body +// describing the offending line on a parse/type error. +std::size_t discardResponse(char* /*ptr*/, std::size_t size, std::size_t nmemb, + void* /*userdata*/) { + return size * nmemb; +} + +} // namespace + +namespace ingest { + +struct QuestdbIngestClient::Impl { + std::string url; // http://host:port/write + std::size_t batchSize; + std::chrono::milliseconds flushInterval; + std::size_t maxQueue; + + std::mutex mtx; + std::condition_variable cv; // plain CV (not _any): avoids the + // import-std condition_variable_any + // link bug, and this is a classic TU + std::deque queue; + std::size_t droppedTotal = 0; // guarded by mtx + + CURL* curl = nullptr; // owned by the worker thread only + std::jthread worker; + + Impl(std::string host, std::uint16_t port, std::size_t batch, + std::chrono::milliseconds interval, std::size_t maxQueueLines) + : url("http://" + std::move(host) + ":" + std::to_string(port) + "/write"), + batchSize(batch), + flushInterval(interval), + maxQueue(maxQueueLines) { + ensureCurlInit(); + curl = curl_easy_init(); + if (!curl) { + backtest_log::error("QuestdbIngestClient: curl_easy_init failed"); + } + worker = std::jthread([this](std::stop_token st) { workerLoop(st); }); + } + + ~Impl() { + worker.request_stop(); + cv.notify_all(); + worker.join(); // drain the queue before tearing down curl + if (curl) { + curl_easy_cleanup(curl); + } + } + + void workerLoop(std::stop_token st) { + while (!st.stop_requested()) { + { + std::unique_lock lock(mtx); + cv.wait_for(lock, flushInterval, + [&] { return !queue.empty() || st.stop_requested(); }); + } + drainAndPost(); + } + // Final drain after stop so queued ticks are not lost on shutdown. One + // batch per POST keeps each body bounded by the transfer timeout. + while (drainAndPost()) { + } + } + + // Move up to batchSize lines off the queue and POST them. Returns false when + // the queue was empty (nothing sent). + bool drainAndPost() { + std::string batch; + { + std::unique_lock lock(mtx); + const std::size_t n = std::min(queue.size(), batchSize); + for (std::size_t i = 0; i < n; ++i) { + batch += std::move(queue.front()); + queue.pop_front(); + } + } + if (batch.empty()) { + return false; + } + post(batch); + return true; + } + + void post(const std::string& body) { + if (!curl) { + return; // init failed earlier; error already logged + } + curl_easy_reset(curl); + curl_easy_setopt(curl, CURLOPT_URL, url.c_str()); + curl_easy_setopt(curl, CURLOPT_POST, 1L); + curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.data()); + curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, static_cast(body.size())); + curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, discardResponse); + // Bound blocking time so a dead/slow QuestDB can't wedge the worker. + curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, kConnectTimeoutSeconds); + curl_easy_setopt(curl, CURLOPT_TIMEOUT, kTransferTimeoutSeconds); + + const CURLcode rc = curl_easy_perform(curl); + if (rc != CURLE_OK) { + backtest_log::error(std::string("QuestdbIngestClient: POST failed: ") + + curl_easy_strerror(rc)); + return; + } + long status = 0; + curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status); + if (status < 200 || status >= 300) { + backtest_log::error("QuestdbIngestClient: HTTP " + std::to_string(status) + + " from " + url); + } + } +}; + +QuestdbIngestClient::QuestdbIngestClient(std::string host, std::uint16_t port, + std::size_t batchSize, + std::chrono::milliseconds flushInterval, + std::size_t maxQueue) + : impl_(std::make_unique(std::move(host), port, batchSize, flushInterval, + maxQueue)) {} + +QuestdbIngestClient::~QuestdbIngestClient() = default; + +void QuestdbIngestClient::enqueueLine(std::string ilpLine) { + bool wake = false; + std::size_t dropped = 0; + { + std::scoped_lock lock(impl_->mtx); + if (impl_->queue.size() >= impl_->maxQueue) { + // Queue is full (QuestDB can't keep up). Drop the oldest line to keep + // memory bounded and retain the freshest ticks. + impl_->queue.pop_front(); + dropped = ++impl_->droppedTotal; + } + impl_->queue.push_back(std::move(ilpLine)); + wake = impl_->queue.size() >= impl_->batchSize; + } + // Warn on the first drop and then sparsely, so a sustained overload doesn't + // flood the log. Done outside the lock. + if (dropped == 1 || (dropped != 0 && dropped % 100000 == 0)) { + backtest_log::error("QuestdbIngestClient: queue full (cap " + + std::to_string(impl_->maxQueue) + + "), dropping oldest lines; total dropped=" + + std::to_string(dropped)); + } + if (wake) { + impl_->cv.notify_one(); // a full batch is ready — flush now + } +} + +std::size_t QuestdbIngestClient::droppedLines() const { + std::scoped_lock lock(impl_->mtx); + return impl_->droppedTotal; +} + +} // namespace ingest diff --git a/source/ingest/questdbIngestClient.hpp b/source/ingest/questdbIngestClient.hpp new file mode 100644 index 0000000..7fa8a1b --- /dev/null +++ b/source/ingest/questdbIngestClient.hpp @@ -0,0 +1,58 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- + +#pragma once + +#include +#include +#include +#include +#include + +// Writes ticks into QuestDB using the InfluxDB Line Protocol over HTTP +// (POST http://:/write), reusing the project's existing libcurl +// dependency (mirrors elasticPublisher). Lines are queued from the receive +// thread and flushed in batches by an internal worker thread, so a slow QuestDB +// never stalls the UDP receive loop. +// +// The libcurl implementation and the worker thread live entirely in the .cpp +// (pimpl), keeping this header light enough to #include from a module's global +// module fragment. +namespace ingest { + +class QuestdbIngestClient { +public: + // host/port address the QuestDB ILP-over-HTTP endpoint. batchSize caps how + // many lines go in one POST; flushInterval bounds how long a partial batch + // waits before being sent. maxQueue caps how many unsent lines are buffered + // when QuestDB is slow/down — past it the oldest lines are dropped so memory + // stays bounded rather than growing without limit. + explicit QuestdbIngestClient(std::string host, + std::uint16_t port = 9001, + std::size_t batchSize = 1000, + std::chrono::milliseconds flushInterval = + std::chrono::milliseconds(100), + std::size_t maxQueue = 1'000'000); + ~QuestdbIngestClient(); + + QuestdbIngestClient(const QuestdbIngestClient&) = delete; + QuestdbIngestClient& operator=(const QuestdbIngestClient&) = delete; + + // Queue one ready-to-send ILP line (must already end in '\n'). Thread-safe + // and non-blocking; the worker thread does the actual HTTP POST. If the queue + // is at capacity the oldest queued line is dropped to make room. + void enqueueLine(std::string ilpLine); + + // Total lines dropped so far because the queue was full (QuestDB couldn't + // keep up). Thread-safe. + [[nodiscard]] std::size_t droppedLines() const; + +private: + struct Impl; + std::unique_ptr impl_; +}; + +} // namespace ingest diff --git a/source/ingest/tickPacket.cppm b/source/ingest/tickPacket.cppm new file mode 100644 index 0000000..1624e35 --- /dev/null +++ b/source/ingest/tickPacket.cppm @@ -0,0 +1,134 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- +// +// tickPacket — the on-the-wire UDP tick format and its decoder. +// +// Replaces the C# side's MemoryPack serialisation (which has no C++ port) with a +// fixed, language-neutral binary layout that both ends produce/parse by hand. +// Each datagram is exactly kPacketSize bytes, little-endian: +// +// offset size field +// 0 8 double bid // raw price, e.g. 1.10000 +// 8 8 double ask +// 16 8 int64 tsMicros // Unix epoch microseconds, UTC +// 24 16 char symbol[16] // ASCII, NUL-padded, e.g. "EURUSD\0\0..." +// // total = 40 bytes +// +// x86-64 and ARM64 (the only deploy targets) are little-endian, so the bytes map +// straight onto the native scalars with no byte-swapping; the C# sender writes +// little-endian explicitly to make that contract intentional. +// +// decodeTick turns those bytes into the engine's existing PriceData: it scales +// the real bid/ask into the stored INT32 "points" via the symbol's price +// multiplier (symbol_scale::getPriceScale) so the ingest writes exactly what the +// backtester's read path already expects. + +export module tickPacket; + +import std; // , , , , , , + // , , , , +import priceData; // PriceData +import symbolScale; // symbol_scale::getPriceScale, kUnknown + +export namespace ingest { + +// Fixed packet geometry (see the header comment for the byte map). +inline constexpr std::size_t kBidOffset = 0; +inline constexpr std::size_t kAskOffset = 8; +inline constexpr std::size_t kTsOffset = 16; +inline constexpr std::size_t kSymbolOffset = 24; +inline constexpr std::size_t kSymbolSize = 16; +inline constexpr std::size_t kPacketSize = 40; + +namespace detail { + +// Reinterpret the first sizeof(T) bytes of `bytes` as a little-endian T. On the +// little-endian deploy targets this is a straight bit_cast (no swap); bit_cast +// keeps it well-defined (no reinterpret_cast / aliasing UB). +template +[[nodiscard]] T readLE(std::span bytes) noexcept { + std::array raw{}; + std::ranges::copy(bytes.first(sizeof(T)), raw.begin()); + return std::bit_cast(raw); +} + +} // namespace detail + +// Decode one datagram into a PriceData. Returns nullopt — i.e. "drop this tick" +// rather than a hard error, since UDP is best-effort and a stray/garbled or +// malformed datagram must not take the ingest down — for any of: +// - a wrong-sized packet, +// - a zero bid or ask (the bad-data signature found in the historical feed; +// the C# producer already guards this at SubListener.cs:126 — we re-check +// here as defense in depth against a future non-guarding producer), +// - a non-positive timestamp (unset / pre-1970 — the null/epoch signature of +// the same bad rows), +// - an unknown symbol (multiplier 0), +// - a scaled price that overflows INT32 (latent: real prices sit ~300x below +// the limit, but we never silently store a truncated value). +[[nodiscard]] inline std::optional decodeTick(std::span bytes) { + if (bytes.size() != kPacketSize) { + return std::nullopt; + } + + const double bid = detail::readLE(bytes.subspan(kBidOffset)); + const double ask = detail::readLE(bytes.subspan(kAskOffset)); + const std::int64_t tsMicros = detail::readLE(bytes.subspan(kTsOffset)); + + // Drop zero-price ticks: a real quote always has a non-zero bid and ask, so + // a 0 here is corrupt/unset data, not a tradable price. + if (bid == 0.0 || ask == 0.0) { + return std::nullopt; + } + + // Drop unset / pre-epoch timestamps: a real tick is always strictly after + // the Unix epoch, so <= 0 is the null-date signature. + if (tsMicros <= 0) { + return std::nullopt; + } + + // Symbol: NUL-padded ASCII in a fixed 16-byte field. Copy out the bytes and + // trim at the first NUL. + std::array symBuf{}; + std::ranges::transform(bytes.subspan(kSymbolOffset, kSymbolSize), symBuf.begin(), + [](std::byte b) { return static_cast(b); }); + const auto nul = std::ranges::find(symBuf, '\0'); + const std::string symbol(symBuf.begin(), nul); + + const int multiplier = symbol_scale::getPriceScale(symbol); + if (multiplier == symbol_scale::kUnknown) { + return std::nullopt; + } + + // decimal -> double -> scaled INT32. llround recovers the exact scaled + // integer despite double's inability to represent e.g. 1.10001 exactly + // (1.10001 * 100000 = 110000.999... -> 110001). Returns nullopt if the + // result won't fit in INT32 — we never quietly truncate into PriceData's + // storage. (llround returns long long, so the comparison is done in 64-bit + // before the narrowing cast.) + const auto scale = [multiplier](double price) -> std::optional { + const long long scaled = std::llround(price * multiplier); + if (scaled < std::numeric_limits::min() || + scaled > std::numeric_limits::max()) { + return std::nullopt; + } + return static_cast(scaled); + }; + + const std::optional bidScaled = scale(bid); + const std::optional askScaled = scale(ask); + if (!bidScaled || !askScaled) { + return std::nullopt; + } + + const auto timestamp = std::chrono::system_clock::time_point( + std::chrono::duration_cast( + std::chrono::microseconds(tsMicros))); + + return PriceData(*askScaled, *bidScaled, timestamp, symbol); +} + +} // namespace ingest diff --git a/source/ingest/udpPorts.hpp b/source/ingest/udpPorts.hpp new file mode 100644 index 0000000..931753d --- /dev/null +++ b/source/ingest/udpPorts.hpp @@ -0,0 +1,20 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- + +#pragma once + +#include + +// The C# streamer (vortex/shared/UDPPorts.cs) fans each serialized tick out to +// several UDP sinks. This engine only consumes the persistence stream, so the +// ingest cares about exactly one port: PortSave. Keep kSave in lockstep with +// UDPPorts.PortSave. The bind port is also overridable at runtime +// (CLI arg / $INGEST_UDP_PORT). +namespace udp_ports { + +inline constexpr std::uint16_t kSave = 11111; // == UDPPorts.PortSave + +} // namespace udp_ports diff --git a/source/ingest/udpReceiver.cpp b/source/ingest/udpReceiver.cpp new file mode 100644 index 0000000..f3587a8 --- /dev/null +++ b/source/ingest/udpReceiver.cpp @@ -0,0 +1,117 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- +// +// Boost.Asio UDP receiver implementation. All Asio includes are confined to this +// translation unit (classic includes only, no `import std`) so their templates +// never reach the module boundary — mirroring how boostRedisImpl.cpp / +// redisConnection.cpp isolate Boost.Asio from the rest of the build. + +#include "ingest/udpReceiver.hpp" + +#include +#include +#include +#include + +#include + +#include "shared/utilities/backtestLog.hpp" + +namespace ingest { + +namespace asio = boost::asio; +using asio::ip::udp; + +struct UdpReceiver::Impl { + asio::io_context ioc; + udp::socket socket; // opened/bound in open(), not the ctor + asio::signal_set signals; + udp::endpoint sender; // filled in per datagram (source address) + std::array buffer{}; // one max-size datagram, reused + Handler handler; + std::string bindAddr; + std::uint16_t port; + + Impl(std::string addr, std::uint16_t p, Handler h) + : socket(ioc), + signals(ioc, SIGINT, SIGTERM), + handler(std::move(h)), + bindAddr(std::move(addr)), + port(p) {} + + // Open and bind the socket. Returns false (with a logged reason) instead of + // throwing, so a bad address or an already-bound port is a clean error + // rather than an uncaught exception / std::terminate. + bool open() { + boost::system::error_code ec; + const auto address = asio::ip::make_address(bindAddr, ec); + if (ec) { + backtest_log::error("UdpReceiver: invalid bind address '" + bindAddr + + "': " + ec.message()); + return false; + } + const udp::endpoint endpoint(address, port); + if (const auto openEc = socket.open(endpoint.protocol(), ec); openEc) { + backtest_log::error("UdpReceiver: socket open failed: " + openEc.message()); + return false; + } + if (const auto bindEc = socket.bind(endpoint, ec); bindEc) { + backtest_log::error("UdpReceiver: cannot bind " + bindAddr + ":" + + std::to_string(port) + ": " + bindEc.message()); + return false; + } + return true; + } + + void receive() { + socket.async_receive_from( + asio::buffer(buffer), sender, + [this](const boost::system::error_code& ec, std::size_t n) { + if (!ec) { + handler(std::span(buffer.data(), n)); + } else if (ec == asio::error::operation_aborted) { + return; // socket closed during shutdown — stop re-arming + } else { + backtest_log::error("UdpReceiver: " + ec.message()); + } + if (socket.is_open()) { + receive(); // re-arm for the next datagram + } + }); + } + + void shutdown() { + boost::system::error_code ec; + if (const auto closeEc = socket.close(ec); closeEc) { + backtest_log::error("UdpReceiver: socket close failed: " + closeEc.message()); + } + ioc.stop(); + } +}; + +UdpReceiver::UdpReceiver(std::string bindAddr, std::uint16_t port, Handler onDatagram) + : impl_(std::make_unique(std::move(bindAddr), port, std::move(onDatagram))) {} + +UdpReceiver::~UdpReceiver() = default; + +bool UdpReceiver::run() { + if (!impl_->open()) { + return false; // bind failed; reason already logged + } + impl_->signals.async_wait( + [this](const boost::system::error_code&, int) { impl_->shutdown(); }); + impl_->receive(); + impl_->ioc.run(); + return true; +} + +void UdpReceiver::stop() { + // Hand the teardown to the io_context thread so the socket is only touched + // from there (Asio objects are not thread-safe for concurrent use). + asio::post(impl_->ioc, [this] { impl_->shutdown(); }); +} + +} // namespace ingest diff --git a/source/ingest/udpReceiver.hpp b/source/ingest/udpReceiver.hpp new file mode 100644 index 0000000..41eaeda --- /dev/null +++ b/source/ingest/udpReceiver.hpp @@ -0,0 +1,51 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- + +#pragma once + +#include +#include +#include +#include +#include +#include + +// A minimal UDP datagram receiver. The Boost.Asio implementation is hidden +// behind a pimpl so this header stays free of Asio's heavy templates — that +// keeps it safe to #include from a C++23 module's global module fragment +// (ingestCommand.cppm), the same isolation the repo uses for Boost.Redis +// (boostRedisImpl.cpp) and the Redis connection (redisConnection.hpp). +namespace ingest { + +class UdpReceiver { +public: + // Invoked once per datagram with a view over the received bytes. The span is + // valid only for the duration of the call (it aliases an internal buffer). + // Called on the run() thread. + using Handler = std::function)>; + + UdpReceiver(std::string bindAddr, std::uint16_t port, Handler onDatagram); + ~UdpReceiver(); + + UdpReceiver(const UdpReceiver&) = delete; + UdpReceiver& operator=(const UdpReceiver&) = delete; + + // Bind the socket and run the receive loop. Blocks until stop() is called or + // SIGINT/SIGTERM is received (the receiver installs its own signal handler + // for clean shutdown). Returns false without blocking if the socket cannot be + // bound (bad address / port already in use) — the cause is logged. Returns + // true after a clean shutdown. + [[nodiscard]] bool run(); + + // Unblock run() from another thread (or a signal). Idempotent. + void stop(); + +private: + struct Impl; + std::unique_ptr impl_; +}; + +} // namespace ingest diff --git a/source/main.cpp b/source/main.cpp index f7300cc..5e17d3c 100644 --- a/source/main.cpp +++ b/source/main.cpp @@ -9,6 +9,7 @@ import std; // backtesting engine headers import loadCommand; import runCommand; +import ingestCommand; int main(const int argc, const char* argv[]) { @@ -21,6 +22,7 @@ int main(const int argc, const char* argv[]) { if (subcommand == "load") return LoadCommand::run(argc, argv); if (subcommand == "run") return RunCommand::run(argc, argv); + if (subcommand == "ingest") return IngestCommand::run(argc, argv); std::println(std::cerr, "Error: unknown subcommand '{}'.", subcommand); return 1; diff --git a/source/run/reporting/tradingResults.cpp b/source/run/reporting/tradingResults.cpp index 4900473..ebe32b3 100644 --- a/source/run/reporting/tradingResults.cpp +++ b/source/run/reporting/tradingResults.cpp @@ -12,9 +12,13 @@ std::string TradingResults::nowIsoUtc() { return elastic::nowIsoUtc(); } +// decimalToJsonNumber emits decimal64_t fields as JSON numbers (not the +// string-encoded form the shared adl_serializer uses) so Elasticsearch/Kibana +// types them numerically. These reporting structs are output-only — never parsed +// back — so the numeric form is safe here. See shared/utilities/decimalJson.hpp. void to_json(nlohmann::json& j, const TradingResultsStats& s) { j = nlohmann::json{ - {"finalPnl", s.finalPnl}, + {"finalPnl", decimalToJsonNumber(s.finalPnl)}, {"tradesOpened", s.tradesOpened}, {"tradesClosed", s.tradesClosed}, {"openedLong", s.openedLong}, @@ -25,27 +29,46 @@ void to_json(nlohmann::json& j, const TradingResultsStats& s) { {"losers", s.losers}, {"breakeven", s.breakeven}, {"liquidated", s.liquidated}, - {"performanceScore", s.performanceScore}, - {"winRate", s.winRate}, - {"tradeRatio", s.tradeRatio}, - {"expectancyScore", s.expectancyScore}, - {"calmarScore", s.calmarScore}, - {"confidenceMultiplier", s.confidenceMultiplier}, - {"maxDrawdownPercent", s.maxDrawdownPercent}, + {"performanceScore", decimalToJsonNumber(s.performanceScore)}, + {"winRate", decimalToJsonNumber(s.winRate)}, + {"tradeRatio", decimalToJsonNumber(s.tradeRatio)}, + {"expectancyScore", decimalToJsonNumber(s.expectancyScore)}, + {"calmarScore", decimalToJsonNumber(s.calmarScore)}, + {"confidenceMultiplier", decimalToJsonNumber(s.confidenceMultiplier)}, + {"maxDrawdownPercent", decimalToJsonNumber(s.maxDrawdownPercent)}, }; if (s.avgPnl) { - j["avgPnl"] = *s.avgPnl; + j["avgPnl"] = decimalToJsonNumber(*s.avgPnl); } else { j["avgPnl"] = nullptr; } } +namespace { +// Reporting-only view of the run Configuration: starts from the shared serializer +// (which string-encodes decimals and the pip/size ints for the Redis sweep +// round-trip) and overwrites only those numeric fields with real JSON numbers, so +// Elasticsearch types them numerically. Reaching in by key keeps this robust if +// Configuration gains fields, and leaves the shared to_json untouched for Redis. +nlohmann::json reportConfigJson(const tradingDefinitions::Configuration& c) { + nlohmann::json j = c; + j["STARTING_BALANCE"] = decimalToJsonNumber(c.STARTING_BALANCE); + j["MAX_LOSS_PERCENT"] = decimalToJsonNumber(c.MAX_LOSS_PERCENT); + const auto& v = c.STRATEGY.TRADING_VARIABLES; + auto& tv = j["STRATEGY"]["TRADING_VARIABLES"]; + tv["STOP_DISTANCE_IN_PIPS"] = v.STOP_DISTANCE_IN_PIPS; + tv["LIMIT_DISTANCE_IN_PIPS"] = v.LIMIT_DISTANCE_IN_PIPS; + tv["TRADING_SIZE"] = v.TRADING_SIZE; + return j; +} +} // namespace + void to_json(nlohmann::json& j, const TradingResults& r) { j = nlohmann::json{ {"RUN_ID", r.RUN_ID}, {"@timestamp", r.timestamp}, {"durationSeconds", r.durationSeconds}, - {"config", r.config}, + {"config", reportConfigJson(r.config)}, {"results", r.results}, }; } @@ -56,7 +79,7 @@ void to_json(nlohmann::json& j, const TradingFailure& f) { {"@timestamp", f.timestamp}, {"durationSeconds", f.durationSeconds}, {"reason", f.reason}, - {"config", f.config}, + {"config", reportConfigJson(f.config)}, {"results", f.results}, }; } diff --git a/source/shared/utilities/decimalJson.hpp b/source/shared/utilities/decimalJson.hpp index 2c56de3..319a43f 100644 --- a/source/shared/utilities/decimalJson.hpp +++ b/source/shared/utilities/decimalJson.hpp @@ -49,3 +49,12 @@ struct adl_serializer { } }; } + +// Reporting/visualisation helper: emit a decimal as a JSON *number* (double) so +// downstream tools (Kibana) can aggregate and do maths on it. The deliberate +// opposite of the adl_serializer above, which string-encodes decimals to preserve +// exact literals on the config round-trip (the Redis sweep queue). Use ONLY on +// output-only paths that are never parsed back into a decimal. +inline nlohmann::json decimalToJsonNumber(const boost::decimal::decimal64_t& value) { + return static_cast(value); +} diff --git a/source/shared/utilities/symbolScale.cppm b/source/shared/utilities/symbolScale.cppm index a3df2e9..1230a1a 100644 --- a/source/shared/utilities/symbolScale.cppm +++ b/source/shared/utilities/symbolScale.cppm @@ -42,9 +42,20 @@ export namespace symbol_scale { // brace-initialization below. `string_view` is safe to store here because // the strings it points to are static string literals with program-long // lifetime. +// +// Two independent scaling factors live on each entry: +// - `scale` : points-per-pip (10 / 100 / 1000), used by the backtester to +// convert between integer points and pips (see get()). +// - `priceScale` : the multiplier that turns a real decimal price into the +// scaled INT32 stored in QuestDB (FX majors x100000, JPY pairs +// & metals x1000, indices/commodities x100 — see priceData). +// Used by the UDP ingest path to scale incoming prices (see +// getPriceScale()). It is NOT derivable from `scale` alone: FX +// majors and JPY pairs share scale 10 but differ here. struct Entry { std::string_view symbol; int scale; + int priceScale; }; // Three keywords doing three different jobs on this one declaration: @@ -59,38 +70,40 @@ struct Entry { // // IMPORTANT: this table MUST stay sorted ascending by symbol. The // `static_assert` below enforces that at compile time. -// Values are integer price points per pip (see header comment). FX majors and -// JPY pairs are 10, indices/commodities 100, metals 1000. +// Columns are {symbol, scale (points-per-pip), priceScale (decimal->INT32 +// multiplier)}. scale: FX majors & JPY pairs 10, indices/commodities 100, +// metals 1000. priceScale: FX majors 100000, JPY pairs & metals 1000, +// indices/commodities 100. inline constexpr std::array kTable{{ - {"AUDNZD", 10}, - {"AUDUSD", 10}, - {"AUSIDXAUD", 100}, - {"BRENTCMDUSD", 100}, - {"COPPERCMDUSD", 100}, - {"DEUIDXEUR", 100}, - {"EURAUD", 10}, - {"EURCHF", 10}, - {"EURGBP", 10}, - {"EURJPY", 10}, - {"EURNOK", 10}, - {"EURUSD", 10}, - {"FRAIDXEUR", 100}, - {"GBPJPY", 10}, - {"GBPUSD", 10}, - {"GBRIDXGBP", 100}, - {"HKGIDXHKD", 100}, - {"JPNIDXJPY", 100}, - {"LIGHTCMDUSD", 100}, - {"NZDUSD", 10}, - {"USA30IDXUSD", 100}, - {"USA500IDXUSD", 100}, - {"USATECHIDXUSD", 100}, - {"USDCAD", 10}, - {"USDCHF", 10}, - {"USDJPY", 10}, - {"USDSEK", 10}, - {"XAGUSD", 1000}, - {"XAUUSD", 1000}, + {"AUDNZD", 10, 100000}, + {"AUDUSD", 10, 100000}, + {"AUSIDXAUD", 100, 100}, + {"BRENTCMDUSD", 100, 100}, + {"COPPERCMDUSD", 100, 100}, + {"DEUIDXEUR", 100, 100}, + {"EURAUD", 10, 100000}, + {"EURCHF", 10, 100000}, + {"EURGBP", 10, 100000}, + {"EURJPY", 10, 1000}, + {"EURNOK", 10, 100000}, + {"EURUSD", 10, 100000}, + {"FRAIDXEUR", 100, 100}, + {"GBPJPY", 10, 1000}, + {"GBPUSD", 10, 100000}, + {"GBRIDXGBP", 100, 100}, + {"HKGIDXHKD", 100, 100}, + {"JPNIDXJPY", 100, 100}, + {"LIGHTCMDUSD", 100, 100}, + {"NZDUSD", 10, 100000}, + {"USA30IDXUSD", 100, 100}, + {"USA500IDXUSD", 100, 100}, + {"USATECHIDXUSD", 100, 100}, + {"USDCAD", 10, 100000}, + {"USDCHF", 10, 100000}, + {"USDJPY", 10, 1000}, + {"USDSEK", 10, 100000}, + {"XAGUSD", 1000, 1000}, + {"XAUUSD", 1000, 1000}, }}; // Compile-time invariant check. The pattern is an IIFE — Immediately @@ -114,45 +127,44 @@ static_assert([] { // than corrupting silently. inline constexpr int kUnknown = 0; -// `[[nodiscard]]` : compiler warning if the caller ignores the returned -// value (this function has no other purpose, so -// ignoring the result is almost always a bug). -// `constexpr` : callable at compile time. When the symbol is a -// literal known to the compiler, the entire binary -// search is folded away and the result becomes a -// constant in the generated assembly. -// `noexcept` : promises this function will not throw. Lets the -// compiler skip exception-handling bookkeeping at -// call sites. -// Parameter is `std::string_view` (by value — it's just a pointer + -// length, cheap to copy) so callers can pass `std::string`, string -// literals, or `const char*` without converting or allocating. -[[nodiscard]] constexpr int get(std::string_view symbol) noexcept { - // Standard binary search over the sorted table. - // `lo` and `hi` are the half-open range [lo, hi) of indices still - // in play. Each iteration halves the range, so for 29 entries we - // do at most 5 iterations. +// Shared lookup behind get()/getPriceScale(): returns the matching entry or +// nullptr. `constexpr` so the whole search folds to a constant at compile time +// when the symbol is a literal; `noexcept` skips exception bookkeeping at call +// sites. The `std::string_view` parameter (pointer + length, cheap to copy) lets +// callers pass std::string, string literals, or const char* without allocating. +constexpr const Entry* findEntry(std::string_view symbol) noexcept { + // Standard binary search over the sorted table. [lo, hi) is the half-open + // range still in play; each iteration halves it (<=5 steps for 29 entries). + // `lo + ((hi - lo) >> 1)` is the overflow-safe midpoint idiom (`>> 1` is /2). std::size_t lo = 0; std::size_t hi = kTable.size(); while (lo < hi) { - // `lo + ((hi - lo) >> 1)` is the overflow-safe way to compute - // the midpoint. `(lo + hi) / 2` would be wrong if the indices - // were near std::size_t's max; not a real risk here, but it's - // the canonical idiom worth learning. `>> 1` is just `/ 2`. const std::size_t mid = lo + ((hi - lo) >> 1); const auto& entry = kTable[mid]; - - // Three-way compare on the symbol. `string_view::operator<` - // does a lexicographic comparison (essentially memcmp). + // `string_view::operator<` is a lexicographic (memcmp-like) compare. if (entry.symbol < symbol) { lo = mid + 1; // target is in the upper half } else if (symbol < entry.symbol) { hi = mid; // target is in the lower half } else { - return entry.scale; // exact match + return &entry; // exact match } } - return kUnknown; + return nullptr; +} + +// Points-per-pip for the symbol (10 / 100 / 1000), or kUnknown if not found. +[[nodiscard]] constexpr int get(std::string_view symbol) noexcept { + const Entry* entry = findEntry(symbol); + return entry ? entry->scale : kUnknown; +} + +// decimal->INT32 price multiplier (e.g. EURUSD 1.10001 -> 110001), or kUnknown. +// Used by the UDP ingest to scale incoming prices; kUnknown means "drop the +// tick" rather than scale by zero. +[[nodiscard]] constexpr int getPriceScale(std::string_view symbol) noexcept { + const Entry* entry = findEntry(symbol); + return entry ? entry->priceScale : kUnknown; } } // namespace symbol_scale diff --git a/tests/CMakeLists.txt b/tests/CMakeLists.txt index 82f6ba8..e5181a4 100644 --- a/tests/CMakeLists.txt +++ b/tests/CMakeLists.txt @@ -8,6 +8,7 @@ add_executable(unit_tests db.cpp sweep.cpp tradeManager.cpp + tickPacket.cpp ) target_link_libraries(unit_tests PRIVATE diff --git a/tests/tickPacket.cpp b/tests/tickPacket.cpp new file mode 100644 index 0000000..8273ddf --- /dev/null +++ b/tests/tickPacket.cpp @@ -0,0 +1,151 @@ +// Backtesting Engine in C++ +// +// (c) 2026 Ryan McCaffery | https://mccaffers.com +// This code is licensed under MIT license (see LICENSE.txt for details) +// --------------------------------------- + +#include + +#include +#include +#include +#include +#include +#include +#include + +import tickPacket; +import priceData; + +namespace { + +// Build the 40-byte little-endian packet exactly as the C# sender would +// (see the layout in tickPacket.cppm), so the test exercises the real contract. +std::array makePacket(double bid, double ask, + std::int64_t tsMicros, + const std::string& symbol) { + std::array packet{}; + + const auto put = [&](std::size_t offset, auto value) { + const auto raw = std::bit_cast>(value); + std::ranges::copy(raw, packet.begin() + offset); + }; + put(ingest::kBidOffset, bid); + put(ingest::kAskOffset, ask); + put(ingest::kTsOffset, tsMicros); + + for (std::size_t i = 0; i < symbol.size() && i < ingest::kSymbolSize; ++i) { + packet[ingest::kSymbolOffset + i] = static_cast(symbol[i]); + } + return packet; +} + +// A valid (strictly post-epoch) timestamp for the tests that aren't exercising +// the timestamp guard: 2024-06-26T00:00:00Z = 1719360000 s. decodeTick now drops +// tsMicros <= 0, so these must pass a real date rather than 0. +constexpr std::int64_t kValidTs = 1'719'360'000'000'000LL; + +} // namespace + +TEST_CASE("decodeTick parses a packet into scaled PriceData", "[tickPacket]") { + // 2024-06-26T00:00:00Z = 1719360000 s = 1719360000000000 us. + const std::int64_t tsMicros = 1'719'360'000'000'000LL; + const auto packet = makePacket(/*bid=*/1.10000, /*ask=*/1.10001, tsMicros, "EURUSD"); + + const auto tick = ingest::decodeTick(packet); + REQUIRE(tick.has_value()); + CHECK(tick->symbol == "EURUSD"); + // EURUSD price multiplier is 100000: 1.10000 -> 110000, 1.10001 -> 110001. + CHECK(tick->bid == 110000); + CHECK(tick->ask == 110001); + CHECK(std::chrono::duration_cast( + tick->timestamp.time_since_epoch()) + .count() == tsMicros); +} + +TEST_CASE("decodeTick scales by the per-symbol multiplier", "[tickPacket]") { + SECTION("JPY pair uses x1000") { + const auto packet = makePacket(156.123, 156.125, kValidTs, "USDJPY"); + const auto tick = ingest::decodeTick(packet); + REQUIRE(tick.has_value()); + CHECK(tick->bid == 156123); + CHECK(tick->ask == 156125); + } + SECTION("index uses x100") { + const auto packet = makePacket(5432.10, 5432.20, kValidTs, "USA500IDXUSD"); + const auto tick = ingest::decodeTick(packet); + REQUIRE(tick.has_value()); + CHECK(tick->bid == 543210); + CHECK(tick->ask == 543220); + } +} + +TEST_CASE("decodeTick rejects malformed input", "[tickPacket]") { + SECTION("wrong packet size") { + std::array tooSmall{}; + CHECK_FALSE(ingest::decodeTick(tooSmall).has_value()); + } + SECTION("unknown symbol is dropped") { + const auto packet = makePacket(1.0, 1.0, kValidTs, "NOPE"); + CHECK_FALSE(ingest::decodeTick(packet).has_value()); + } +} + +TEST_CASE("decodeTick golden vector pins the on-the-wire byte layout", "[tickPacket]") { + // The exact 40 bytes of the documented little-endian contract (the byte map + // in tickPacket.cppm) for bid 1.10000 / ask 1.10001 / + // ts 2024-06-26T00:00:00Z (1719360000000000 us) / "EURUSD". Unlike the + // makePacket-based tests, these literals are positional — they do NOT route + // through kBidOffset/kAskOffset — so a future swap or shift of the decoder's + // offset constants breaks this test even though the self-consistent tests + // stay green. (To turn this into a true cross-language check, regenerate + // these bytes from the real C# Serialize output for the same tick.) + constexpr std::array golden{ + std::byte{0x9A}, std::byte{0x99}, std::byte{0x99}, std::byte{0x99}, + std::byte{0x99}, std::byte{0x99}, std::byte{0xF1}, std::byte{0x3F}, // bid 1.10000 + std::byte{0x0B}, std::byte{0x5E}, std::byte{0xF4}, std::byte{0x15}, + std::byte{0xA4}, std::byte{0x99}, std::byte{0xF1}, std::byte{0x3F}, // ask 1.10001 + std::byte{0x00}, std::byte{0x80}, std::byte{0x0A}, std::byte{0xB2}, + std::byte{0xBF}, std::byte{0x1B}, std::byte{0x06}, std::byte{0x00}, // ts 1719360000000000 + std::byte{0x45}, std::byte{0x55}, std::byte{0x52}, std::byte{0x55}, + std::byte{0x53}, std::byte{0x44}, std::byte{0x00}, std::byte{0x00}, // "EURUSD" + std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, + std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, std::byte{0x00}, + }; + + const auto tick = ingest::decodeTick(golden); + REQUIRE(tick.has_value()); + CHECK(tick->symbol == "EURUSD"); + CHECK(tick->bid == 110000); + CHECK(tick->ask == 110001); + CHECK(std::chrono::duration_cast( + tick->timestamp.time_since_epoch()) + .count() == 1'719'360'000'000'000LL); +} + +TEST_CASE("decodeTick drops corrupt ticks (hardening guards)", "[tickPacket]") { + SECTION("zero bid is dropped") { + const auto packet = makePacket(/*bid=*/0.0, /*ask=*/1.10001, kValidTs, "EURUSD"); + CHECK_FALSE(ingest::decodeTick(packet).has_value()); + } + SECTION("zero ask is dropped") { + const auto packet = makePacket(/*bid=*/1.10000, /*ask=*/0.0, kValidTs, "EURUSD"); + CHECK_FALSE(ingest::decodeTick(packet).has_value()); + } + SECTION("zero (epoch) timestamp is dropped") { + const auto packet = makePacket(1.10000, 1.10001, /*tsMicros=*/0, "EURUSD"); + CHECK_FALSE(ingest::decodeTick(packet).has_value()); + } + SECTION("negative timestamp is dropped") { + const auto packet = makePacket(1.10000, 1.10001, /*tsMicros=*/-1, "EURUSD"); + CHECK_FALSE(ingest::decodeTick(packet).has_value()); + } + SECTION("price that overflows INT32 when scaled is dropped") { + // EURUSD scales x100000; INT32 max is 2,147,483,647, so any price above + // ~21474.83 overflows the scaled integer. A real EURUSD quote never gets + // near this (hence "latent"), but the guard must still reject it rather + // than store a truncated value. 30000 * 100000 = 3,000,000,000 > INT32. + const auto packet = makePacket(/*bid=*/1.10000, /*ask=*/30000.0, kValidTs, "EURUSD"); + CHECK_FALSE(ingest::decodeTick(packet).has_value()); + } +}