Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
19 commits
Select commit Hold shift + click to select a range
75bd8fd
storage: enable FileCache on disagg storage nodes
JaySon-Huang Jun 27, 2026
b97fcf8
storage: wire write FileCache builder hook for background reads
JaySon-Huang Jun 27, 2026
1b5c65d
storage: add MetaV2 merged file collector for write FileCache staging
JaySon-Huang Jun 27, 2026
85bf22d
storage: stage MetaV2 merged files via FileCache in DMFileReader
JaySon-Huang Jun 27, 2026
092469a
storage: use TiFlash metrics for write FileCache staging
JaySon-Huang Jun 27, 2026
d04e529
storage: add segment integration tests for write FileCache staging
JaySon-Huang Jun 28, 2026
7e2e5de
Storages: Add dmf_disk_bytes to logging
JaySon-Huang Jun 28, 2026
7d00a6e
storage: add prepare/remote upload subtask metrics and stable column …
JaySon-Huang Jun 28, 2026
e0da308
storage: log prepare and remote upload timing in merge finish
JaySon-Huang Jun 28, 2026
865773b
storage: log prepare and remote upload timing in split finish
JaySon-Huang Jun 28, 2026
2e5dc31
Format codes
JaySon-Huang Jun 28, 2026
8800b34
disable flaky fap test
JaySon-Huang Jun 28, 2026
1814fd9
storage: tag FileCache downloadImpl with foreground/background type
JaySon-Huang Jun 29, 2026
65d944e
storage: stage standalone MetaV2 column files for write FileCache
JaySon-Huang Jun 29, 2026
6cc0912
storage: log elapsed time for slow split ingest after 10s
JaySon-Huang Jun 29, 2026
41a1981
storage: add ingestFiles duration metric and elapsed logging
JaySon-Huang Jun 29, 2026
6b08e65
Format codes
JaySon-Huang Jun 29, 2026
4f49df1
tests: disable flaky fap test
JaySon-Huang Jun 30, 2026
063328c
test: fix flaky DisaggReadSnapshot — skip background tasks during wri…
JaySon-Huang Jun 11, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 23 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,12 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_seg_split_fg, {"type", "seg_split_fg"}), \
F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \
F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \
F(type_place_index_update, {"type", "place_index_update"})) \
F(type_place_index_update, {"type", "place_index_update"}), \
F(type_prepare_merge_delta, {"type", "prepare_merge_delta"}), \
F(type_prepare_merge, {"type", "prepare_merge"}), \
F(type_prepare_split_physical, {"type", "prepare_split_physical"}), \
F(type_remote_upload, {"type", "remote_upload"}), \
F(type_ingest, {"type", "ingest"})) \
M(tiflash_storage_subtask_duration_seconds, \
"Bucketed histogram of storage's sub task duration", \
Histogram, /* increase the bucket from 10ms to 87 minutes */ \
Expand All @@ -274,7 +279,12 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.010, 2, 20}), \
F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.010, 2, 20}), \
F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.010, 2, 20}), \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.010, 2, 20})) \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.010, 2, 20}), \
F(type_prepare_merge_delta, {{"type", "prepare_merge_delta"}}, ExpBuckets{0.010, 2, 20}), \
F(type_prepare_merge, {{"type", "prepare_merge"}}, ExpBuckets{0.010, 2, 20}), \
F(type_prepare_split_physical, {{"type", "prepare_split_physical"}}, ExpBuckets{0.010, 2, 20}), \
F(type_remote_upload, {{"type", "remote_upload"}}, ExpBuckets{0.010, 2, 20}), \
F(type_ingest, {{"type", "ingest"}}, ExpBuckets{0.010, 2, 20})) \
M(tiflash_storage_subtask_throughput_bytes, \
"Calculate the throughput of (maybe foreground) tasks of storage in bytes", \
Counter, /**/ \
Expand Down Expand Up @@ -925,6 +935,17 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
Gauge, \
F(type_bg_downloading_count, {{"type", "bg_downloading_count"}}), \
F(type_bg_download_queue_count, {{"type", "bg_download_queue_count"}})) \
M(tiflash_storage_write_filecache_staging, \
"Write-side FileCache local staging for background DMFile reads", \
Counter, \
F(type_attempt, {"type", "attempt"}), \
F(type_object, {"type", "object"}), \
F(type_download_ok, {"type", "download_ok"}), \
F(type_download_failed, {"type", "download_failed"})) \
M(tiflash_storage_write_filecache_staging_bytes, \
"Bytes staged by write FileCache local staging", \
Counter, \
F(type_staged, {"type", "staged"})) \
M(tiflash_system_seconds, \
"system calls duration in seconds", \
Histogram, \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ struct Settings
M(SettingDouble, dt_filecache_max_downloading_count_scale, 10.0, "Max queue size of download task count of FileCache = number of logical cpu cores * dt_filecache_max_downloading_count_scale.") \
M(SettingUInt64, dt_filecache_min_age_seconds, 1800, "Files of the same priority can only be evicted from files that were not accessed within `dt_filecache_min_age_seconds` seconds.") \
M(SettingUInt64, dt_filecache_wait_on_downloading_ms, 0, "When a remote cache lookup sees the same key is already being downloaded, wait up to this many milliseconds for that download to finish. 0 disables the bounded wait.") \
M(SettingBool, dt_enable_write_filecache, false, "Enable local FileCache staging for disaggregated tiflash write nodes when FileCache is available.") \
M(SettingBool, dt_enable_fetch_memtableset, true, "Whether fetching delta cache in FetchDisaggPages") \
M(SettingUInt64, dt_fetch_pages_packet_limit_size, 512 * 1024, "Response packet bytes limit of FetchDisaggPages, 0 means one page per packet") \
M(SettingDouble, dt_fetch_page_concurrency_scale, 4.0, "Concurrency of fetching pages of one query equals to num_streams * dt_fetch_page_concurrency_scale.") \
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -673,8 +673,9 @@ try
const auto is_disagg_compute_mode = global_context->getSharedContextDisagg()->isDisaggregatedComputeMode();
const auto is_disagg_storage_mode = global_context->getSharedContextDisagg()->isDisaggregatedStorageMode();
const auto not_disagg_mode = global_context->getSharedContextDisagg()->notDisaggregatedMode();
const bool enable_remote_cache = is_disagg_compute_mode || is_disagg_storage_mode;
const auto [remote_cache_paths, remote_cache_capacity_quota]
= storage_config.remote_cache_config.getCacheDirInfos(is_disagg_compute_mode);
= storage_config.remote_cache_config.getCacheDirInfos(enable_remote_cache);
global_context->initializePathCapacityMetric( //
global_capacity_quota, //
storage_config.main_data_paths,
Expand All @@ -690,7 +691,7 @@ try
storage_config.kvstore_data_path, //
global_context->getPathCapacity(),
global_context->getFileProvider());
if (const auto & config = storage_config.remote_cache_config; config.isCacheEnabled() && is_disagg_compute_mode)
if (const auto & config = storage_config.remote_cache_config; config.isCacheEnabled() && enable_remote_cache)
{
config.initCacheDir();
FileCache::initialize(
Expand All @@ -702,6 +703,11 @@ try
// here so startup-time values like dt_filecache_wait_on_downloading_ms take effect immediately
// instead of waiting for a later config reload.
FileCache::instance()->updateConfig(global_context->getSettingsRef());
LOG_INFO(
log,
"Initialized FileCache for disaggregated remote cache (compute_mode={} storage_mode={})",
is_disagg_compute_mode,
is_disagg_storage_mode);
}

/// Determining PageStorage run mode based on current files on disk and storage config.
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Server/StorageConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -732,13 +732,14 @@ UInt64 StorageRemoteCacheConfig::getReservedCapacity() const
return capacity * reserved_rate;
}

std::pair<Strings, std::vector<size_t>> StorageRemoteCacheConfig::getCacheDirInfos(bool is_compute_mode) const
std::pair<Strings, std::vector<size_t>> StorageRemoteCacheConfig::getCacheDirInfos(bool is_disagg_mode) const
{
if (is_compute_mode && isCacheEnabled())
if (is_disagg_mode && isCacheEnabled())
{
return {
Strings{getDTFileCacheDir(), getPageCacheDir()},
std::vector<size_t>{getDTFileCapacity(), getPageCapacity()}};
std::vector<size_t>{getDTFileCapacity(), getPageCapacity()},
};
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/StorageConfigParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ struct StorageRemoteCacheConfig
UInt64 getReservedCapacity() const;
void parse(const String & content, const LoggerPtr & log);

std::pair<Strings, std::vector<size_t>> getCacheDirInfos(bool is_compute_mode) const;
// `is_disagg_mode` is true when the process is a disaggregated compute or storage node that may use remote cache.
std::pair<Strings, std::vector<size_t>> getCacheDirInfos(bool is_disagg_mode) const;
};

struct TiFlashStorageConfig
Expand Down
41 changes: 41 additions & 0 deletions dbms/src/Server/tests/gtest_storage_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,47 @@ delta_rate = 1.1
}
CATCH

TEST_F(StorageConfigTest, RemoteCacheDirInfosForDisaggMode)
try
{
auto log = Logger::get("StorageConfigTest.RemoteCacheDirInfosForDisaggMode");
const String config_str = R"(
[storage]
[storage.main]
dir = ["123"]
[storage.remote.cache]
dir = "/tmp/StorageConfigTest/RemoteCacheDirInfosForDisaggMode"
capacity = 10000000
dtfile_level = 11
delta_rate = 0.33
)";

auto config = loadConfigFromString(config_str);
size_t global_capacity_quota = 0;
TiFlashStorageConfig storage;
std::tie(global_capacity_quota, storage) = TiFlashStorageConfig::parseSettings(*config, log);

const auto & cache_config = storage.remote_cache_config;
ASSERT_TRUE(cache_config.isCacheEnabled());

{
const auto [paths, capacities] = cache_config.getCacheDirInfos(false);
ASSERT_TRUE(paths.empty());
ASSERT_TRUE(capacities.empty());
}

{
const auto [paths, capacities] = cache_config.getCacheDirInfos(true);
ASSERT_EQ(paths.size(), 2);
ASSERT_EQ(capacities.size(), 2);
ASSERT_EQ(paths[0], cache_config.getDTFileCacheDir());
ASSERT_EQ(paths[1], cache_config.getPageCacheDir());
ASSERT_EQ(capacities[0], cache_config.getDTFileCapacity());
ASSERT_EQ(capacities[1], cache_config.getPageCapacity());
}
}
CATCH

TEST_F(StorageConfigTest, TempPath)
try
{
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,10 @@ class DeltaMergeStore
#endif

using TaskQueue = std::queue<BackgroundTask, std::list<BackgroundTask>>;
std::mutex mutex;
TaskQueue light_tasks;
TaskQueue heavy_tasks;

std::mutex mutex;

public:
size_t length()
{
Expand Down
66 changes: 55 additions & 11 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_Ingest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@

#include <Common/EventRecorder.h>
#include <Common/FailPoint.h>
#include <Common/Stopwatch.h>
#include <Common/TiFlashMetrics.h>
#include <Interpreters/Context.h>
#include <Interpreters/SharedContexts/Disagg.h>
Expand Down Expand Up @@ -228,6 +229,25 @@ Segments DeltaMergeStore::ingestDTFilesUsingColumnFile(
return updated_segments;
}

namespace
{
constexpr double slow_split_ingest_log_threshold_seconds = 10.0;

struct SplitIngestLogContext
{
const Stopwatch & watch;

bool isSlow() const { return watch.elapsedSeconds() > slow_split_ingest_log_threshold_seconds; }

double elapsedSeconds() const { return watch.elapsedSeconds(); }

Poco::Message::Priority debugOrInfoLevel() const
{
return isSlow() ? Poco::Message::PRIO_INFORMATION : Poco::Message::PRIO_DEBUG;
}
};
} // namespace

/**
* Accept a target ingest range and a vector of DTFiles, ingest these DTFiles (clipped by the target ingest range)
* using logical split.
Expand All @@ -245,6 +265,9 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit(
const DMFiles & files,
bool clear_data_in_range)
{
Stopwatch watch;
SplitIngestLogContext split_ingest_log{watch};

{
RUNTIME_CHECK(files.size() == external_files.size(), files.size(), external_files.size());
for (size_t i = 0; i < files.size(); ++i)
Expand Down Expand Up @@ -331,9 +354,11 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit(
* ↑ The segment we ingest DMFile into
*/

LOG_DEBUG(
LOG_IMPL(
log,
"Table ingest using split - split ingest phase - begin, ingest_range={}, files_n={}",
split_ingest_log.debugOrInfoLevel(),
"Table ingest using split - split ingest phase - begin, elapsed_seconds={:.3f} ingest_range={} files_n={}",
split_ingest_log.elapsedSeconds(),
ingest_range.toDebugString(),
files.size());

Expand Down Expand Up @@ -389,8 +414,10 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit(

LOG_INFO(
log,
"Table ingest using split - split ingest phase - Try to ingest file into segment, file_idx={} "
"Table ingest using split - split ingest phase - Try to ingest file into segment, "
"elapsed_seconds={:.3f} file_idx={} "
"file_id=dmf_{} file_ingest_range={} segment={} segment_ingest_range={}",
split_ingest_log.elapsedSeconds(),
file_idx,
files[file_idx]->fileId(),
file_ingest_range.toDebugString(),
Expand All @@ -412,15 +439,17 @@ Segments DeltaMergeStore::ingestDTFilesUsingSplit(
}
else
{
// this segment is abandoned, or may be split into multiples.
// this segment is abandoned, or may be split into multiples, or running segmentMerge/segmentMergeDelta, etc.
// retry with current range and file and find segment again.
}
}
}

LOG_DEBUG(
LOG_IMPL(
log,
"Table ingest using split - split ingest phase - finished, updated_segments_n={}",
split_ingest_log.debugOrInfoLevel(),
"Table ingest using split - split ingest phase - finished, elapsed_seconds={:.3f} updated_segments_n={}",
split_ingest_log.elapsedSeconds(),
updated_segments.size());

return std::vector<SegmentPtr>(updated_segments.begin(), updated_segments.end());
Expand Down Expand Up @@ -592,6 +621,11 @@ UInt64 DeltaMergeStore::ingestFiles(
throw Exception(msg);
}

GET_METRIC(tiflash_storage_subtask_count, type_ingest).Increment();
Stopwatch watch_ingest;
SCOPE_EXIT(
{ GET_METRIC(tiflash_storage_subtask_duration_seconds, type_ingest).Observe(watch_ingest.elapsedSeconds()); });

{
// `ingestDTFilesUsingSplit` requires external_files to be not overlapped. Otherwise the results will be incorrect.
// Here we verify the external_files are ordered and not overlapped.
Expand Down Expand Up @@ -651,8 +685,6 @@ UInt64 DeltaMergeStore::ingestFiles(
}
}

EventRecorder write_block_recorder(ProfileEvents::DMWriteFile, ProfileEvents::DMWriteFileNS);

auto delegate = dm_context->path_pool->getStableDiskDelegator();
auto file_provider = dm_context->global_context.getFileProvider();

Expand Down Expand Up @@ -787,10 +819,21 @@ UInt64 DeltaMergeStore::ingestFiles(
if (has_segments || !external_files.empty())
{
if (use_split_replace)
updated_segments
= ingestDTFilesUsingSplit(dm_context, range, external_files, files, clear_data_in_range);
{
// For large files, we use split+replace to ingest the files into stable layer directly.
// Check the `ingestDTFilesUsingSplit` for the details steps.
updated_segments = ingestDTFilesUsingSplit( //
dm_context,
range,
external_files,
files,
clear_data_in_range);
}
else
{
// For small files, we ingest them into the delta layer directly, which is more efficient than split+replace.
updated_segments = ingestDTFilesUsingColumnFile(dm_context, range, files, clear_data_in_range);
}
}
}

Expand Down Expand Up @@ -837,7 +880,8 @@ UInt64 DeltaMergeStore::ingestFiles(

LOG_INFO(
log,
"Table ingest files - finished ingested files into segments, {} clear={}",
"Table ingest files - finished ingested files into segments, elapsed_seconds={:.3f} {} clear={}",
watch_ingest.elapsedSeconds(),
get_ingest_info(),
clear_data_in_range);
}
Expand Down
Loading