Skip to content
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions contrib/tiflash-columnar-hub/.cargo/config.toml
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
[target.'cfg(not(target_os = "macos"))']
rustflags = ["-C", "link-arg=-Wl,--allow-multiple-definition"]
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ use std::sync::atomic::{AtomicPtr, Ordering};
use crate::{
interfaces_ffi::{
BaseBuffView, EngineStoreServerHelper, EngineStoreServerStatus, HttpRequestRes, MsgPBType,
RaftStoreProxyFFIHelper, RawCppStringPtr, RawVoidPtr, RAFT_STORE_PROXY_MAGIC_NUMBER,
RAFT_STORE_PROXY_VERSION,
RaftStoreProxyFFIHelper, RawCppStringPtr, RawVoidPtr, StoreStats,
RAFT_STORE_PROXY_MAGIC_NUMBER, RAFT_STORE_PROXY_VERSION,
},
UnwrapExternCFunc,
};
Expand All @@ -41,6 +41,7 @@ unsafe impl Sync for EngineStoreServerHelper {}
pub trait EngineStoreServerHelperExt {
fn check(&self);
fn set_proxy(&self, proxy: &mut RaftStoreProxyFFIHelper);
fn handle_compute_store_stats(&self) -> StoreStats;
fn handle_get_engine_store_server_status(&self) -> EngineStoreServerStatus;
fn handle_http_request(
&self,
Expand All @@ -66,6 +67,10 @@ impl EngineStoreServerHelperExt for EngineStoreServerHelper {
}
}

fn handle_compute_store_stats(&self) -> StoreStats {
unsafe { self.fn_handle_compute_store_stats.into_inner()(self.inner) }
}

fn handle_get_engine_store_server_status(&self) -> EngineStoreServerStatus {
unsafe { self.fn_handle_get_engine_store_server_status.into_inner()(self.inner) }
}
Expand Down
46 changes: 43 additions & 3 deletions contrib/tiflash-columnar-hub/hub-runtime/src/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::{
process,
sync::{
atomic::{AtomicBool, AtomicU8, Ordering},
Arc,
Arc, Once,
},
thread,
time::{Duration, SystemTime, UNIX_EPOCH},
Expand Down Expand Up @@ -240,6 +240,27 @@ const STORE_HEARTBEAT_INTERVAL: Duration = Duration::from_secs(10);
const HEARTBEAT_SHUTDOWN_POLL_INTERVAL: Duration = Duration::from_millis(200);
const STORE_TOMBSTONE_WAIT_TIMEOUT: Duration = Duration::from_secs(30);
const STORE_TOMBSTONE_POLL_INTERVAL: Duration = Duration::from_secs(1);
const METRICS_PREFIX: &str = "tiflash_proxy";

fn init_metrics() {
static INIT: Once = Once::new();

INIT.call_once(|| {
tikv_util::metrics::monitor_process().unwrap_or_else(|err| {
panic!("failed to start process monitor: {}", err);
});
tikv_util::metrics::warn_if_kernel_metrics_disabled();
tikv_util::metrics::monitor_threads(METRICS_PREFIX).unwrap_or_else(|err| {
panic!("failed to start thread monitor: {}", err);
});
tikv_util::metrics::monitor_system_psi(METRICS_PREFIX).unwrap_or_else(|err| {
panic!("failed to start PSI monitor: {}", err);
});
tikv_util::metrics::monitor_allocator_stats(METRICS_PREFIX).unwrap_or_else(|err| {
panic!("failed to monitor allocator stats: {}", err);
});
});
}

fn load_config(path: Option<&OsStr>) -> ConfigFile {
path.map_or_else(ConfigFile::default, |path| {
Expand Down Expand Up @@ -916,6 +937,14 @@ fn collect_store_space_stats(data_dir: &Path) -> Option<(u64, u64)> {
}
}

fn collect_store_space_stats_from_engine_store() -> Option<(u64, u64)> {
let stats = get_engine_store_server_helper().handle_compute_store_stats();
if stats.fs_stats.ok == 0 {
return None;
}
Some((stats.fs_stats.capacity_size, stats.fs_stats.avail_size))
Comment thread
yongman marked this conversation as resolved.
}

fn build_store_heartbeat_stats_from_space(
store_id: u64,
start_time: u32,
Expand Down Expand Up @@ -946,7 +975,8 @@ fn build_store_heartbeat_stats(
last_report_ts: u64,
data_dir: &Path,
) -> Option<pdpb::StoreStats> {
let (capacity, available) = collect_store_space_stats(data_dir)?;
let (capacity, available) = collect_store_space_stats_from_engine_store()
.or_else(|| collect_store_space_stats(data_dir))?;
if capacity == 0 {
warn!(
"skip store heartbeat because disk capacity is unavailable";
Expand Down Expand Up @@ -1321,6 +1351,8 @@ pub unsafe fn run_proxy(argc: c_int, argv: *const *const c_char, helper_ptr: *co
process::exit(0);
}

init_metrics();

let security_mgr = Arc::new(SecurityManager::new(&config.security).unwrap());
let env = Arc::new(EnvBuilder::new().cq_count(1).name_prefix("pd").build());
let pd_client: Arc<dyn PdClient> =
Expand Down Expand Up @@ -1797,7 +1829,15 @@ log-rotation-size = "1024MiB"
));
fs::create_dir_all(&temp_dir).unwrap();

let stats = build_store_heartbeat_stats(9527, 123, 456, &temp_dir).unwrap();
let (capacity, available) = collect_store_space_stats(&temp_dir).unwrap();
let stats = build_store_heartbeat_stats_from_space(
9527,
123,
456,
capacity,
available,
)
.unwrap();
assert_eq!(stats.get_store_id(), 9527);
assert_eq!(stats.get_start_time(), 123);
assert!(stats.get_capacity() > 0);
Expand Down