Skip to content
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions contrib/tiflash-columnar-hub/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
[target.'cfg(not(target_os = "macos"))']
rustflags = ["-C", "link-arg=-Wl,--allow-multiple-definition"]

[env]
PROMETHEUS_METRIC_NAME_PREFIX = "tiflash_proxy_"
3 changes: 1 addition & 2 deletions contrib/tiflash-columnar-hub/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions contrib/tiflash-columnar-hub/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ keys = { git = "https://github.com/tidbcloud/cloud-storage-engine.git", branch =
kvengine = { git = "https://github.com/tidbcloud/cloud-storage-engine.git", branch = "cloud-engine" }
kvenginepb = { git = "https://github.com/tidbcloud/cloud-storage-engine.git", branch = "cloud-engine" }
pd_client = { git = "https://github.com/tidbcloud/cloud-storage-engine.git", branch = "cloud-engine", default-features = false }
prometheus = { version = "=0.13.0", features = ["nightly", "push"], default-features = true }
security = { git = "https://github.com/tidbcloud/cloud-storage-engine.git", branch = "cloud-engine", default-features = false }
tikv_util = { git = "https://github.com/tidbcloud/cloud-storage-engine.git", branch = "cloud-engine" }
url = { version = "2", default-features = true }
Expand All @@ -26,6 +27,7 @@ tokio-executor = { git = "https://github.com/tikv/tokio", branch = "tokio-timer-
lindera = { git = "https://github.com/breezewish/lindera", branch = "v0.43.1-tokio-1.24" }
tantivy = { git = "https://github.com/breezewish/tikv-tantivy.git", branch = "patch-0.22.1" }
pprof = { git = "https://github.com/tikv/pprof-rs.git", rev = "01cff82dbe6fe110a707bf2b38d8ebb1d14a18f8" }
prometheus = { git = "https://github.com/solotzg/rust-prometheus.git", rev = "b4fe98a06a58d29f9b9987a0d7186f6ed5230193", features = ["nightly", "push"], default-features = true }
kvproto = { git = "https://github.com/pingcap/kvproto.git" }
tipb = { git = "https://github.com/pingcap/tipb.git" }

Expand Down
1 change: 1 addition & 0 deletions contrib/tiflash-columnar-hub/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ export PROXY_BUILD_RUSTC_VERSION := $(shell rustc --version 2> /dev/null || echo
export PROXY_BUILD_GIT_HASH ?= $(shell git -C $(PROJECT_DIR) rev-parse HEAD 2> /dev/null || echo ${BUILD_INFO_GIT_FALLBACK})
export PROXY_BUILD_GIT_BRANCH ?= $(shell git -C $(PROJECT_DIR) rev-parse --abbrev-ref HEAD 2> /dev/null || echo ${BUILD_INFO_GIT_FALLBACK})
export PROXY_PROFILE ?= debug
export PROMETHEUS_METRIC_NAME_PREFIX ?= tiflash_proxy_

# `kvengine` pulls in both `usearch` and the standalone `simsimd` crate from
# cloud-storage-engine. They currently export the same C symbols, so allow the
Expand Down
2 changes: 1 addition & 1 deletion contrib/tiflash-columnar-hub/hub-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ num_cpus = "1"
pd_client = { workspace = true }
pprof = { version = "0.15", default-features = false, features = ["flamegraph", "cpp", "framehop-unwinder", "protobuf-codec"] }
pprof_util = { version = "0.8.2", features = ["flamegraph"] }
prometheus = { version = "=0.13.0", features = ["nightly", "push"], default-features = true }
prometheus = { workspace = true }
protobuf = { version = "2.8", features = ["bytes"], default-features = true }
quick_cache = "0.6.14"
regex = "1"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::sync::atomic::{AtomicPtr, Ordering};
use crate::{
interfaces_ffi::{
BaseBuffView, EngineStoreServerHelper, EngineStoreServerStatus, HttpRequestRes, MsgPBType,
RaftStoreProxyFFIHelper, RawCppStringPtr, RawVoidPtr, RAFT_STORE_PROXY_MAGIC_NUMBER,
RAFT_STORE_PROXY_VERSION,
RaftStoreProxyFFIHelper, RawCppStringPtr, RawVoidPtr, StoreStats,
RAFT_STORE_PROXY_MAGIC_NUMBER, RAFT_STORE_PROXY_VERSION,
},
UnwrapExternCFunc,
};
Expand All @@ -41,6 +41,7 @@ unsafe impl Sync for EngineStoreServerHelper {}
pub trait EngineStoreServerHelperExt {
fn check(&self);
fn set_proxy(&self, proxy: &mut RaftStoreProxyFFIHelper);
fn handle_compute_store_stats(&self) -> StoreStats;
fn handle_get_engine_store_server_status(&self) -> EngineStoreServerStatus;
fn handle_http_request(
&self,
Expand All @@ -66,6 +67,10 @@ impl EngineStoreServerHelperExt for EngineStoreServerHelper {
}
}

fn handle_compute_store_stats(&self) -> StoreStats {
unsafe { self.fn_handle_compute_store_stats.into_inner()(self.inner) }
}

fn handle_get_engine_store_server_status(&self) -> EngineStoreServerStatus {
unsafe { self.fn_handle_get_engine_store_server_status.into_inner()(self.inner) }
}
Expand Down
38 changes: 35 additions & 3 deletions contrib/tiflash-columnar-hub/hub-runtime/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
process,
sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc,
Arc, Once,
},
thread,
time::{Duration, SystemTime, UNIX_EPOCH},
Expand Down Expand Up @@ -240,6 +240,25 @@ const STORE_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
const HEARTBEAT_SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(200);
const STORE_TOMBSTONE_WAIT_TIMEOUT: Duration = Duration::from_secs(30);
const STORE_TOMBSTONE_POLL_INTERVAL: Duration = Duration::from_secs(1);
fn init_metrics() {
static INIT: Once = Once::new();

INIT.call_once(|| {
tikv_util::metrics::monitor_process().unwrap_or_else(|err| {
panic!("failed to start process monitor: {}", err);
});
tikv_util::metrics::warn_if_kernel_metrics_disabled();
tikv_util::metrics::monitor_threads("").unwrap_or_else(|err| {
panic!("failed to start thread monitor: {}", err);
});
tikv_util::metrics::monitor_system_psi("").unwrap_or_else(|err| {
panic!("failed to start PSI monitor: {}", err);
});
tikv_util::metrics::monitor_allocator_stats("").unwrap_or_else(|err| {
panic!("failed to monitor allocator stats: {}", err);
});
});
}

fn load_config(path: Option<&OsStr>) -> ConfigFile {
path.map_or_else(ConfigFile::default, |path| {
Expand Down Expand Up @@ -916,6 +935,14 @@ fn collect_store_space_stats(data_dir: &Path) -> Option<(u64, u64)> {
}
}

fn collect_store_space_stats_from_engine_store() -> Option<(u64, u64)> {
let stats = get_engine_store_server_helper().handle_compute_store_stats();
if stats.fs_stats.ok == 0 {
return None;
}
Some((stats.fs_stats.capacity_size, stats.fs_stats.avail_size))
Comment thread
yongman marked this conversation as resolved.
}

fn build_store_heartbeat_stats_from_space(
store_id: u64,
start_time: u32,
Expand Down Expand Up @@ -946,7 +973,8 @@ fn build_store_heartbeat_stats(
last_report_ts: u64,
data_dir: &Path,
) -> Option<pdpb::StoreStats> {
let (capacity, available) = collect_store_space_stats(data_dir)?;
let (capacity, available) = collect_store_space_stats_from_engine_store()
.or_else(|| collect_store_space_stats(data_dir))?;
if capacity == 0 {
warn!(
"skip store heartbeat because disk capacity is unavailable";
Expand Down Expand Up @@ -1321,6 +1349,8 @@ pub unsafe fn run_proxy(argc: c_int, argv: *const *const c_char, helper_ptr: *co
process::exit(0);
}

init_metrics();

let security_mgr = Arc::new(SecurityManager::new(&config.security).unwrap());
let env = Arc::new(EnvBuilder::new().cq_count(1).name_prefix("pd").build());
let pd_client: Arc<dyn PdClient> =
Expand Down Expand Up @@ -1797,7 +1827,9 @@ log-rotation-size = "1024MiB"
));
fs::create_dir_all(&temp_dir).unwrap();

let stats = build_store_heartbeat_stats(9527, 123, 456, &temp_dir).unwrap();
let (capacity, available) = collect_store_space_stats(&temp_dir).unwrap();
let stats =
build_store_heartbeat_stats_from_space(9527, 123, 456, capacity, available).unwrap();
assert_eq!(stats.get_store_id(), 9527);
assert_eq!(stats.get_start_time(), 123);
assert!(stats.get_capacity() > 0);
Expand Down
8 changes: 8 additions & 0 deletions contrib/tiflash-proxy-cmake/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,7 @@ list(FILTER _TIFLASH_PROXY_SRCS EXCLUDE REGEX "/target/")
set(_TIFLASH_PROXY_MAKEFILE "${_TIFLASH_PROXY_SOURCE_DIR}/Makefile")
set(_TIFLASH_PROXY_CARGO_MANIFEST "${_TIFLASH_PROXY_SOURCE_DIR}/Cargo.toml")
set(_TIFLASH_PROXY_CARGO_LOCK "${_TIFLASH_PROXY_SOURCE_DIR}/Cargo.lock")
set(_TIFLASH_PROXY_CARGO_CONFIG "${_TIFLASH_PROXY_SOURCE_DIR}/.cargo/config.toml")
set(_TIFLASH_PROXY_RUST_TOOLCHAIN "${TiFlash_SOURCE_DIR}/rust-toolchain.toml")
if (EXISTS "${_TIFLASH_PROXY_SOURCE_DIR}/rust-toolchain.toml")
set(_TIFLASH_PROXY_RUST_TOOLCHAIN "${_TIFLASH_PROXY_SOURCE_DIR}/rust-toolchain.toml")
Expand All @@ -128,6 +129,10 @@ endif()
# Build in the build directory instead of the default source directory
set(TIFLASH_RUST_ENV "CARGO_TARGET_DIR=${CMAKE_CURRENT_BINARY_DIR}" ${TIFLASH_RUST_ENV})

if (ENABLE_NEXT_GEN_COLUMNAR)
set(TIFLASH_RUST_ENV "PROMETHEUS_METRIC_NAME_PREFIX=tiflash_proxy_" ${TIFLASH_RUST_ENV})
Comment thread
yongman marked this conversation as resolved.
endif()

# Set CMAKE_POLICY_VERSION_MINIMUM to support CMake 4.0+ with older crate CMakeLists.txt files.
set(TIFLASH_RUST_ENV "CMAKE_POLICY_VERSION_MINIMUM=3.5" ${TIFLASH_RUST_ENV})

Expand Down Expand Up @@ -169,6 +174,9 @@ set(_TIFLASH_PROXY_CUSTOM_DEPENDS
if (EXISTS "${_TIFLASH_PROXY_CARGO_LOCK}")
list(APPEND _TIFLASH_PROXY_CUSTOM_DEPENDS "${_TIFLASH_PROXY_CARGO_LOCK}")
endif()
if (EXISTS "${_TIFLASH_PROXY_CARGO_CONFIG}")
list(APPEND _TIFLASH_PROXY_CUSTOM_DEPENDS "${_TIFLASH_PROXY_CARGO_CONFIG}")
endif()

add_custom_command(OUTPUT ${_TIFLASH_PROXY_LIBRARY}
COMMENT "Building TiFlash Proxy using ${_TIFLASH_PROXY_BUILD_PROFILE} profile"
Expand Down