Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions .github/workflows/build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -137,6 +137,11 @@ jobs:

# Performs the SonarQube scan using the captured build commands and the
# downloaded coverage report. Requires GitHub and SonarQube tokens.
# sonar.cfamily.enableModules turns on the analyzer's (experimental) C++20
# named-modules support; this project is module-heavy (`import std;` plus
# .cppm interface units), so without it the analyzer can't parse those TUs.
# NB: keep these as --define args, not a `#` comment inside the folded
# `args: >` block — a `#` there would be passed to the scanner literally.
- name: SonarQube Scan
uses: SonarSource/sonarqube-scan-action@v4.2.1
env:
Expand All @@ -145,4 +150,5 @@ jobs:
with:
args: >
--define sonar.cfamily.compile-commands="${{ env.BUILD_WRAPPER_OUT_DIR }}/compile_commands.json"
--define sonar.cfamily.enableModules=true
--define sonar.coverageReportPaths=artifact/sonarqube-generic-coverage.xml
42 changes: 42 additions & 0 deletions scripts/ingest.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
#!/bin/bash
# Builds the engine and runs the `ingest` subcommand: it binds a UDP socket and
# streams decoded ticks (tickPacket) into the converted QuestDB via ILP-over-HTTP
# (source/ingest/ingestCommand.cppm). Blocks until SIGINT/SIGTERM.
#
# Config is read from the environment by IngestCommand (defaults in parens):
# QUESTDB_HOST QuestDB host (127.0.0.1)
# QUESTDB_ILP_PORT converted-DB ILP/HTTP port (9001)
# INGEST_BIND_ADDR UDP bind address (127.0.0.1)
# INGEST_UDP_PORT UDP bind port (11111, == UDPPorts.PortSave)
#
# An optional first argument overrides the bind port (argv[2] to the engine).
# This script uses bash-only features (BASH_SOURCE, `source`), so run it with
# bash (or execute it directly), not a plain POSIX `sh`:
# bash scripts/ingest.sh # bind 11111 (or $INGEST_UDP_PORT)
# bash scripts/ingest.sh 22222 # bind 22222

# Absolute path of the directory containing this script, so build.sh is found
# regardless of the caller's working directory.
current_dir="$( cd "$( dirname "${BASH_SOURCE[0]}" )" && pwd )"

# build.sh is sourced rather than executed. BUILD_DIR and EXECUTABLE_NAME are
# used below but never assigned here — presumably build.sh exports them (verify
# against scripts/build.sh).
if ! source "$current_dir/build.sh"; then
echo "Error: Build failed. Aborting."
exit 1
fi

# Sanity check that the build actually produced the binary; list the build
# directory to help diagnose a wrong name/path.
if [ ! -f "$BUILD_DIR/$EXECUTABLE_NAME" ]; then
echo "Error: Executable $EXECUTABLE_NAME not found in $BUILD_DIR."
ls -la "$BUILD_DIR"
exit 1
fi

# Non-fatal QuestDB reachability check: the receiver still binds and buffers if
# QuestDB is down (the writer sheds once its buffer fills), so warn rather than
# abort.
# Same defaults as IngestCommand uses for QUESTDB_HOST / QUESTDB_ILP_PORT.
quest_host="${QUESTDB_HOST:-127.0.0.1}"
quest_port="${QUESTDB_ILP_PORT:-9001}"
# Probe a real endpoint (/exec), not "/": QuestDB's web console root can stall
# indefinitely, which made a 1s timeout report a healthy DB as unreachable.
# --fail turns HTTP error statuses into a non-zero exit; --max-time bounds the
# probe to 2 seconds.
if ! curl --fail --silent --max-time 2 "http://$quest_host:$quest_port/exec?query=SELECT%201" >/dev/null 2>&1; then
echo "Warning: QuestDB not reachable at http://$quest_host:$quest_port — writes will be dropped until it comes up."
fi

# exec so signals reach the engine directly for clean shutdown. "$@" passes an
# optional bind-port override straight through to the subcommand.
# NOTE(review): the "./" prefix assumes BUILD_DIR is relative to the current
# working directory — confirm against build.sh.
exec ./"$BUILD_DIR/$EXECUTABLE_NAME" ingest "$@"
97 changes: 97 additions & 0 deletions source/ingest/ingestCommand.cppm
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
// Backtesting Engine in C++
//
// (c) 2026 Ryan McCaffery | https://mccaffers.com
// This code is licensed under MIT license (see LICENSE.txt for details)
// ---------------------------------------
//
// ingestCommand — the `ingest` subcommand. Receives tick datagrams over UDP,
// decodes them (tickPacket) and persists each into the local QuestDB via ILP
// over HTTP (questdbIngestClient), so the backtester's `run` path can read them
// back through the unchanged DatabaseConnection.
//
// The GMF only #includes Asio-/curl-free headers (the receiver and writer hide
// those behind pimpls), so it is safe to `import std` here.

module;

#include "ingest/questdbIngestClient.hpp"
#include "ingest/udpPorts.hpp"
#include "ingest/udpReceiver.hpp"
#include "shared/utilities/env.hpp"

export module ingestCommand;

import std;
import priceData; // PriceData
import tickPacket; // ingest::decodeTick

export class IngestCommand {
public:
static int run(int argc, const char* argv[]);
};

namespace {

// Parse a TCP/UDP port from `text`, returning `fallback` when the input is not
// a usable port, so a stray env var or CLI argument can't crash startup.
//
// The whole string must be a decimal number in [1, 65535]. Empty input, a
// sign, whitespace, trailing garbage ("8080xyz"), 0 and out-of-range values all
// yield `fallback`.
std::uint16_t parsePort(std::string_view text, std::uint16_t fallback) {
    const char* const first = text.data();
    const char* const last = first + text.size();

    unsigned value = 0;
    const auto [ptr, ec] = std::from_chars(first, last, value);

    // from_chars stops at the first non-digit and still reports success, so
    // require that it consumed the entire input; otherwise "80abc" would be
    // silently accepted as port 80.
    const bool parsedWholeInput = (ec == std::errc{}) && (ptr == last);
    if (!parsedWholeInput || value == 0 || value > 65535) {
        return fallback;
    }
    return static_cast<std::uint16_t>(value);
}

} // namespace

int IngestCommand::run(const int argc, const char* argv[]) {
const std::string questHost = env::getOr("QUESTDB_HOST", "127.0.0.1");
// 9001 is the converted (INT32) QuestDB's HTTP/ILP port — the instance the
// backtester reads and that convert.py writes to. (9000 is the legacy DOUBLE
// source DB.) Override with $QUESTDB_ILP_PORT.
const std::uint16_t ilpPort = parsePort(env::getOr("QUESTDB_ILP_PORT", ""), 9001);
const std::string bindAddr = env::getOr("INGEST_BIND_ADDR", "127.0.0.1");

// Bind port: CLI arg (argv[2]) overrides $INGEST_UDP_PORT overrides kSave.
std::uint16_t bindPort = parsePort(env::getOr("INGEST_UDP_PORT", ""), udp_ports::kSave);
if (argc >= 3) {
bindPort = parsePort(argv[2], bindPort);
}

ingest::QuestdbIngestClient writer(questHost, ilpPort);

std::atomic<std::uint64_t> received{0};
std::atomic<std::uint64_t> dropped{0};

ingest::UdpReceiver receiver(
bindAddr, bindPort, [&](std::span<const std::byte> bytes) {
const auto tick = ingest::decodeTick(bytes);
if (!tick) {
dropped.fetch_add(1, std::memory_order_relaxed);
return;
}
// ILP line: table = symbol, integer fields ask/bid, designated
// timestamp in nanoseconds (QuestDB HTTP default precision). This
// yields exactly the ask/bid/timestamp columns the read path expects.
const auto tsNanos = std::chrono::duration_cast<std::chrono::nanoseconds>(
tick->timestamp.time_since_epoch())
.count();
writer.enqueueLine(std::format("{} ask={}i,bid={}i {}\n",
tick->symbol, tick->ask, tick->bid, tsNanos));
received.fetch_add(1, std::memory_order_relaxed);
});

std::println("IngestCommand: starting; binding udp://{}:{} -> QuestDB ILP http://{}:{}/write",
bindAddr, bindPort, questHost, ilpPort);

if (!receiver.run()) { // binds the socket, then blocks until SIGINT/SIGTERM
return 1; // bind failed (bad address / port in use) — logged
}

// decode-dropped: bad size / unknown symbol. queue-dropped: shed by the
// writer when QuestDB couldn't keep up and the buffer hit its cap.
std::println("IngestCommand: shutting down (received={}, decode-dropped={}, queue-dropped={})",
received.load(), dropped.load(), writer.droppedLines());
return 0;
}
197 changes: 197 additions & 0 deletions source/ingest/questdbIngestClient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,197 @@
// Backtesting Engine in C++
//
// (c) 2026 Ryan McCaffery | https://mccaffers.com
// This code is licensed under MIT license (see LICENSE.txt for details)
// ---------------------------------------
//
// QuestDB ILP-over-HTTP writer. libcurl and the batching worker thread are
// confined to this classic translation unit (no `import std`), so curl's
// C-macro pollution never reaches the module boundary — the same isolation
// elasticPublisher.cpp uses for the Elasticsearch client.

#include "ingest/questdbIngestClient.hpp"

#include <algorithm>
#include <condition_variable>
#include <deque>
#include <mutex>
#include <stop_token>
#include <string>
#include <thread>
#include <utility>

#include <curl/curl.h>

#include "shared/utilities/backtestLog.hpp"

namespace {

// Bound how long a single POST can block, so a slow/black-holed QuestDB can't
// wedge the worker thread (and therefore shutdown, which joins it) indefinitely.
constexpr long kConnectTimeoutSeconds = 5;
constexpr long kTransferTimeoutSeconds = 10;

void ensureCurlInit() {
static const struct CurlGlobal {

Check failure on line 35 in source/ingest/questdbIngestClient.cpp

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Customize this struct's copy constructor to participate in resource management. Customize or delete its copy assignment operator. Also consider whether move operations should be customized.

See more on https://sonarcloud.io/project/issues?id=mccaffers_backtesting-engine-cpp&issues=AZ8I6taMHYiRC2s7ZYHC&open=AZ8I6taMHYiRC2s7ZYHC&pullRequest=54
CurlGlobal() { curl_global_init(CURL_GLOBAL_ALL); }
~CurlGlobal() { curl_global_cleanup(); }
} guard;

Check warning on line 38 in source/ingest/questdbIngestClient.cpp

View check run for this annotation

SonarQubeCloud / SonarCloud Code Analysis

Declare this variable in a separate statement.

See more on https://sonarcloud.io/project/issues?id=mccaffers_backtesting-engine-cpp&issues=AZ8I6taMHYiRC2s7ZYHB&open=AZ8I6taMHYiRC2s7ZYHB&pullRequest=54
(void)guard;
}

// Write callback that throws the response body away, so curl does not dump it
// to stdout (its default when no write callback is set). QuestDB replies 204 on
// success, 400 + a JSON body describing the offending line on a parse/type
// error.
//
// Installed via CURLOPT_WRITEFUNCTION, so the parameter list — including the
// untyped `void*` user-data pointer — is dictated by libcurl's callback
// contract and cannot use a more specific type. Returning the full byte count
// (size * nmemb) reports the chunk as consumed.
std::size_t discardResponse(char* /*ptr*/, std::size_t size, std::size_t nmemb,
                            void* /*userdata*/) {
    const std::size_t bytesConsumed = size * nmemb;
    return bytesConsumed;
}

} // namespace

namespace ingest {

// Private implementation of QuestdbIngestClient: a mutex-guarded line queue
// plus one worker thread that concatenates queued ILP lines into batches and
// POSTs them to QuestDB's /write endpoint with libcurl.
struct QuestdbIngestClient::Impl {
    std::string url;                          // http://host:port/write
    std::size_t batchSize;                    // max lines per POST; always >= 1
    std::chrono::milliseconds flushInterval;  // longest wait while the queue is empty
    std::size_t maxQueue;                     // queue cap applied by enqueueLine()

    std::mutex mtx;
    std::condition_variable cv;      // plain CV (not _any): avoids the
                                     // import-std condition_variable_any
                                     // link bug, and this is a classic TU
    std::deque<std::string> queue;   // guarded by mtx
    std::size_t droppedTotal = 0;    // guarded by mtx

    CURL* curl = nullptr;  // owned by the worker thread only
    std::jthread worker;

    // Builds the target URL, creates the curl handle and starts the worker.
    // A `batch` of 0 is raised to 1: with a zero batch size drainAndPost()
    // would never remove anything, so the worker would spin on a non-empty
    // queue and the shutdown drain would discard every queued line.
    Impl(std::string host, std::uint16_t port, std::size_t batch,
         std::chrono::milliseconds interval, std::size_t maxQueueLines)
        : url("http://" + std::move(host) + ":" + std::to_string(port) + "/write"),
          batchSize(std::max<std::size_t>(batch, 1)),
          flushInterval(interval),
          maxQueue(maxQueueLines) {
        ensureCurlInit();
        curl = curl_easy_init();
        if (!curl) {
            backtest_log::error("QuestdbIngestClient: curl_easy_init failed");
        }
        worker = std::jthread([this](std::stop_token st) { workerLoop(st); });
    }

    // Owns a thread and a curl handle: not copyable.
    Impl(const Impl&) = delete;
    Impl& operator=(const Impl&) = delete;

    // Stops the worker, waits for it to drain the queue, then frees curl.
    ~Impl() {
        {
            // Publish the stop request while holding mtx. The worker evaluates
            // its wait predicate under the same mutex, so it either observes
            // the stop before blocking or is already blocked when notify_all()
            // fires. Without the lock the notification can be lost and
            // shutdown stalls for up to flushInterval.
            std::scoped_lock lock(mtx);
            worker.request_stop();
        }
        cv.notify_all();
        if (worker.joinable()) {
            worker.join();  // drain the queue before tearing down curl
        }
        if (curl) {
            curl_easy_cleanup(curl);
        }
    }

    // Worker thread body: sleep until there is something to send (or the flush
    // interval elapses, or stop is requested), send one batch, repeat.
    void workerLoop(std::stop_token st) {
        while (!st.stop_requested()) {
            {
                std::unique_lock lock(mtx);
                cv.wait_for(lock, flushInterval,
                            [this, &st] { return !queue.empty() || st.stop_requested(); });
            }
            drainAndPost();
        }
        // Final drain after stop so queued ticks are not lost on shutdown. One
        // batch per POST keeps each body bounded by the transfer timeout.
        while (drainAndPost()) {
            // Intentionally empty: drainAndPost() does the work and returns
            // false once the queue is empty.
        }
    }

    // Move up to batchSize lines off the queue and POST them. Returns false when
    // the queue was empty (nothing sent).
    bool drainAndPost() {
        std::string batch;
        {
            std::unique_lock lock(mtx);
            const std::size_t n = std::min(queue.size(), batchSize);
            for (std::size_t i = 0; i < n; ++i) {
                // Plain append: operator+= takes a const reference, so moving
                // the element would not save a copy.
                batch += queue.front();
                queue.pop_front();
            }
        }
        if (batch.empty()) {
            return false;
        }
        post(batch);  // network I/O happens outside the lock
        return true;
    }

    // POSTs `body` (one or more newline-terminated ILP lines) to `url`.
    // Transport errors and non-2xx statuses are logged; the batch is not
    // retried. Does nothing if the curl handle could not be created.
    void post(const std::string& body) {
        if (!curl) {
            return;  // init failed earlier; error already logged
        }
        curl_easy_reset(curl);
        curl_easy_setopt(curl, CURLOPT_URL, url.c_str());
        curl_easy_setopt(curl, CURLOPT_POST, 1L);
        curl_easy_setopt(curl, CURLOPT_POSTFIELDS, body.data());
        curl_easy_setopt(curl, CURLOPT_POSTFIELDSIZE, static_cast<long>(body.size()));
        curl_easy_setopt(curl, CURLOPT_WRITEFUNCTION, discardResponse);
        // Bound blocking time so a dead/slow QuestDB can't wedge the worker.
        curl_easy_setopt(curl, CURLOPT_CONNECTTIMEOUT, kConnectTimeoutSeconds);
        curl_easy_setopt(curl, CURLOPT_TIMEOUT, kTransferTimeoutSeconds);

        if (const CURLcode rc = curl_easy_perform(curl); rc != CURLE_OK) {
            backtest_log::error(std::string("QuestdbIngestClient: POST failed: ")
                                + curl_easy_strerror(rc));
            return;
        }
        long status = 0;
        curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &status);
        if (status < 200 || status >= 300) {
            backtest_log::error("QuestdbIngestClient: HTTP " + std::to_string(status)
                                + " from " + url);
        }
    }
};

QuestdbIngestClient::QuestdbIngestClient(std::string host, std::uint16_t port,
std::size_t batchSize,
std::chrono::milliseconds flushInterval,
std::size_t maxQueue)
: impl_(std::make_unique<Impl>(std::move(host), port, batchSize, flushInterval,
maxQueue)) {}

// Defaulted here, in the translation unit where Impl is a complete type.
// Destroying impl_ runs ~Impl, which stops and joins the worker thread before
// releasing the curl handle.
QuestdbIngestClient::~QuestdbIngestClient() = default;

// Queues one ILP line for the background worker to POST.
//
// If the queue is already at its cap, the oldest queued line is discarded to
// make room (and counted in droppedLines()). The worker is woken as soon as a
// full batch is waiting; smaller amounts are picked up when its flush interval
// elapses.
void QuestdbIngestClient::enqueueLine(std::string ilpLine) {
    bool wake = false;
    std::size_t dropped = 0;
    {
        std::scoped_lock lock(impl_->mtx);
        // Queue is full (QuestDB can't keep up). Drop the oldest line to keep
        // memory bounded and retain the freshest ticks. The `!empty()` guard
        // matters when the cap is 0: `size() >= 0` is always true, and
        // pop_front() on an empty deque is undefined behaviour.
        if (!impl_->queue.empty() && impl_->queue.size() >= impl_->maxQueue) {
            impl_->queue.pop_front();
            dropped = ++impl_->droppedTotal;
        }
        impl_->queue.push_back(std::move(ilpLine));
        wake = impl_->queue.size() >= impl_->batchSize;
    }
    // Warn on the first drop and then sparsely, so a sustained overload doesn't
    // flood the log. Done outside the lock.
    if (dropped == 1 || (dropped != 0 && dropped % 100000 == 0)) {
        backtest_log::error("QuestdbIngestClient: queue full (cap "
                            + std::to_string(impl_->maxQueue)
                            + "), dropping oldest lines; total dropped="
                            + std::to_string(dropped));
    }
    if (wake) {
        impl_->cv.notify_one();  // a full batch is ready — flush now
    }
}

// Returns how many queued lines have been discarded so far because the queue
// was at its cap. The counter is read under the queue mutex, so this may be
// called while other threads are enqueueing.
std::size_t QuestdbIngestClient::droppedLines() const {
    const std::lock_guard guard(impl_->mtx);
    const std::size_t total = impl_->droppedTotal;
    return total;
}

} // namespace ingest
Loading
Loading