Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 54 additions & 11 deletions cpp/include/raft/core/detail/nvtx_range_stack.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,14 @@

#include <raft/core/detail/macros.hpp>

#include <atomic>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <stack>
#include <string>
#include <utility>
#include <vector>

namespace raft {
namespace common::nvtx {
Expand All @@ -35,6 +37,17 @@ class current_range {
return {value_, depth_};
}

/**
* Read the full root->leaf range path with instance ids, formatted as
* "name#id > name#id > ..." (empty when no range is active).
* This identifies the exact nvtx range stack responsible for an allocation.
*/
auto get_path() const -> std::string
{
std::lock_guard lock(mu_);
return path_;
}

operator std::string() const
{
std::lock_guard lock(mu_);
Expand All @@ -45,38 +58,68 @@ class current_range {
mutable std::mutex mu_;
std::string value_;
std::size_t depth_{0};
std::string path_;

void set(const char* name, std::size_t depth)
void set(const char* name, std::size_t depth, std::string path)
{
std::lock_guard lock(mu_);
value_ = name ? name : "";
depth_ = depth;
path_ = std::move(path);
}
};

namespace detail {

RAFT_EXPORT inline std::atomic<std::uint64_t> range_instance_counter{0};

struct nvtx_range_name_stack {
void push(const char* name)
{
stack_.emplace(name);
current_->set(name, stack_.size());
ensure_current();
auto id = range_instance_counter.fetch_add(1, std::memory_order_relaxed) + 1;
stack_.emplace_back(id, name ? name : "");
current_->set(stack_.back().second.c_str(), stack_.size(), build_path());
}

void pop()
{
if (!stack_.empty()) { stack_.pop(); }
current_->set(stack_.empty() ? nullptr : stack_.top().c_str(), stack_.size());
ensure_current();
if (!stack_.empty()) { stack_.pop_back(); }
current_->set(
stack_.empty() ? nullptr : stack_.back().second.c_str(), stack_.size(), build_path());
}

auto current() const -> std::shared_ptr<const current_range> { return current_; }
[[nodiscard]] auto current() const -> std::shared_ptr<const current_range>
{
ensure_current();
return current_;
}

private:
std::stack<std::string> stack_{};
std::shared_ptr<current_range> current_{std::make_shared<current_range>()};
void ensure_current() const
{
if (!current_) { current_ = std::make_shared<current_range>(); }
}

// Serialize the active stack as "name#id > name#id > ..." (outer -> inner).
[[nodiscard]] auto build_path() const -> std::string
{
std::string path;
for (auto const& [id, name] : stack_) {
if (!path.empty()) { path += " > "; }
path += name;
path += '#';
path += std::to_string(id);
}
return path;
}

std::vector<std::pair<std::uint64_t, std::string>> stack_{};
mutable std::shared_ptr<current_range> current_{std::make_shared<current_range>()};
};

inline thread_local nvtx_range_name_stack range_name_stack_instance{};
RAFT_EXPORT inline thread_local nvtx_range_name_stack range_name_stack_instance{};

} // namespace detail

Expand All @@ -85,7 +128,7 @@ inline thread_local nvtx_range_name_stack range_name_stack_instance{};
* Pass the returned shared_ptr to another thread to read this thread's current NVTX range name at
* any time.
*/
inline auto thread_local_current_range() -> std::shared_ptr<const current_range>
RAFT_EXPORT inline auto thread_local_current_range() -> std::shared_ptr<const current_range>
{
return detail::range_name_stack_instance.current();
}
Expand Down
193 changes: 193 additions & 0 deletions cpp/include/raft/mr/allocation_event_monitor.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,193 @@
/*
* SPDX-FileCopyrightText: Copyright (c) 2026, NVIDIA CORPORATION.
* SPDX-License-Identifier: Apache-2.0
*/
#pragma once

#include <raft/core/detail/macros.hpp>

#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <memory>
#include <mutex>
#include <ostream>
#include <string>
#include <thread>
#include <utility>
#include <vector>

namespace raft {
namespace mr {

/**
* @brief A single allocation or deallocation event, captured on the allocating thread.
*/
struct allocation_event {
int source_id{0}; //< which registered source this belongs to
std::int64_t current{0}; //< source's live bytes after this event
std::int64_t total_alloc{0}; //< cumulative bytes allocated (this source)
std::int64_t total_freed{0}; //< cumulative bytes freed (this source)
std::size_t nvtx_depth{0}; //< NVTX stack depth at event time
std::string nvtx_range; //< NVTX range name active at event time
std::int64_t event_bytes{0}; //< signed bytes for THIS event (+alloc / -free)
std::string alloc_range; //< responsible range path "name#id > ..."
// captured at ALLOCATION time (empty if unknown)
std::chrono::steady_clock::time_point timestamp{}; //< when the event happened
};

/**
* @brief Thread-safe multi-producer / single-consumer queue of allocation_events.
*/
class allocation_event_queue {
public:
/** @brief Append an event (any thread). */
void push(allocation_event event)
{
{
std::lock_guard<std::mutex> lock(mtx_);
events_.push_back(std::move(event));
}
cv_.notify_one();
}

/**
* @brief Block until events are available or the queue is stopped, then move
* all pending events into `out`.
*
* @return false once the queue is stopped AND drained (consumer should exit),
* true otherwise.
*/
bool wait_and_take(std::vector<allocation_event>& out)
{
std::unique_lock<std::mutex> lock(mtx_);
cv_.wait(lock, [this] { return stopped_ || !events_.empty(); });
out.clear();
out.swap(events_);
return !(stopped_ && out.empty());
}

/** @brief Signal the consumer to drain and exit. */
void stop()
{
{
std::lock_guard<std::mutex> lock(mtx_);
stopped_ = true;
}
cv_.notify_all();
}

private:
std::mutex mtx_;
std::condition_variable cv_;
std::vector<allocation_event> events_;
bool stopped_{false};
};

/**
* @brief Consumes allocation_events from a queue and writes one CSV row per
* event from a background thread.
*/
class allocation_event_monitor {
public:
explicit allocation_event_monitor(std::ostream& out) : out_(out) {}

~allocation_event_monitor() { stop(); }

allocation_event_monitor(allocation_event_monitor const&) = delete;
allocation_event_monitor& operator=(allocation_event_monitor const&) = delete;

[[nodiscard]] auto get_queue() const noexcept -> std::shared_ptr<allocation_event_queue>
{
return queue_;
}

/**
* @brief Register a named source and return its id (column-group index).
* Must be called before start().
*/
auto register_source(std::string name) -> int
{
int id = static_cast<int>(source_names_.size()); // TODO (huuanhhuyn) conflict id?
source_names_.push_back(std::move(name));
view_.emplace_back();
return id;
}

void start()
{
if (worker_.joinable()) { return; }
write_header();
worker_ = std::thread([this] { run(); });
}

void stop()
{
if (!worker_.joinable()) { return; }
queue_->stop(); // drains the queue and causes the worker to exit its loop
worker_.join();
}

private:
struct source_view {
std::int64_t current{0};
std::int64_t total_alloc{0};
std::int64_t total_freed{0};
};

void write_header()
{
out_ << "timestamp_us";
for (auto const& name : source_names_) {
out_ << ',' << name << "_current," << name << "_peak," << name << "_total_alloc," << name
<< "_total_freed";
}
out_ << ",nvtx_depth,nvtx_range,event_source,event_bytes,alloc_range\n";
out_.flush();
}

void run()
{
std::vector<allocation_event> batch;
for (;;) {
bool keep_going = queue_->wait_and_take(batch);
for (auto const& event : batch) {
write_row(event);
}
out_.flush();
if (!keep_going) { break; }
}
}

void write_row(allocation_event const& event)
{
if (event.source_id >= 0 && event.source_id < static_cast<int>(view_.size())) {
view_[event.source_id] = source_view{event.current, event.total_alloc, event.total_freed};
}

auto us =
std::chrono::duration_cast<std::chrono::microseconds>(event.timestamp - start_).count();
out_ << us;
for (auto const& v : view_) {
out_ << ',' << v.current << ',' << v.current << ',' << v.total_alloc << ',' << v.total_freed;
}
out_ << ',' << event.nvtx_depth << ",\"" << event.nvtx_range << "\"";

auto const* src_name =
(event.source_id >= 0 && event.source_id < static_cast<int>(source_names_.size()))
? source_names_[event.source_id].c_str()
: "";
out_ << ',' << src_name << ',' << event.event_bytes << ",\"" << event.alloc_range << "\"\n";
}

std::ostream& out_;
std::shared_ptr<allocation_event_queue> queue_{std::make_shared<allocation_event_queue>()};
std::vector<std::string> source_names_;
std::vector<source_view> view_;
std::chrono::steady_clock::time_point start_{std::chrono::steady_clock::now()};
std::thread worker_;
};

} // namespace mr
} // namespace raft
6 changes: 3 additions & 3 deletions cpp/include/raft/mr/host_memory_resource.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ struct default_host_resource_holder {
}
};

inline default_host_resource_holder default_host_resource_holder_{};
RAFT_EXPORT inline default_host_resource_holder default_host_resource_holder_{};

} // namespace detail

Expand All @@ -56,7 +56,7 @@ inline default_host_resource_holder default_host_resource_holder_{};
* Returns raft::mr::host_resource_ref pointing to the resource installed
* via set_default_host_resource(), or new_delete_resource() if none was set.
*/
inline auto get_default_host_resource() -> raft::mr::host_resource_ref
RAFT_EXPORT inline auto get_default_host_resource() -> raft::mr::host_resource_ref
{
return detail::default_host_resource_holder_.get();
}
Expand All @@ -70,7 +70,7 @@ inline auto get_default_host_resource() -> raft::mr::host_resource_ref
* @param ref Non-owning reference to the resource to install.
* @return The previous default host resource ref.
*/
inline auto set_default_host_resource(raft::mr::host_resource_ref ref)
RAFT_EXPORT inline auto set_default_host_resource(raft::mr::host_resource_ref ref)
-> raft::mr::host_resource_ref
{
return detail::default_host_resource_holder_.set(ref);
Expand Down
Loading
Loading