Skip to content
Open
Show file tree
Hide file tree
Changes from 13 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
6edeb6e
storage: enable FileCache on disagg storage nodes
JaySon-Huang Jun 27, 2026
1e4cd02
storage: wire write FileCache builder hook for background reads
JaySon-Huang Jun 27, 2026
fa2cffd
storage: add MetaV2 merged file collector for write FileCache staging
JaySon-Huang Jun 27, 2026
6cd745f
storage: stage MetaV2 merged files via FileCache in DMFileReader
JaySon-Huang Jun 27, 2026
ef8166c
storage: use TiFlash metrics for write FileCache staging
JaySon-Huang Jun 27, 2026
444044b
storage: add segment integration tests for write FileCache staging
JaySon-Huang Jun 28, 2026
28408f3
Storages: Add dmf_disk_bytes to logging
JaySon-Huang Jun 28, 2026
8620ad2
storage: add prepare/remote upload subtask metrics and stable column …
JaySon-Huang Jun 28, 2026
8ae7f2e
storage: log prepare and remote upload timing in merge finish
JaySon-Huang Jun 28, 2026
0f8b43c
storage: log prepare and remote upload timing in split finish
JaySon-Huang Jun 28, 2026
ab8750a
Format codes
JaySon-Huang Jun 28, 2026
ef66e9b
disable flaky fap test
JaySon-Huang Jun 28, 2026
3a23788
storage: tag FileCache downloadImpl with foreground/background type
JaySon-Huang Jun 29, 2026
d575a25
storage: stage standalone MetaV2 column files for write FileCache
JaySon-Huang Jun 29, 2026
7d2f613
storage: log elapsed time for slow split ingest after 10s
JaySon-Huang Jun 29, 2026
a98accc
storage: add ingestFiles duration metric and elapsed logging
JaySon-Huang Jun 29, 2026
5a86402
Format codes
JaySon-Huang Jun 29, 2026
3936355
Replenish the design docs
JaySon-Huang Jun 29, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
23 changes: 21 additions & 2 deletions dbms/src/Common/TiFlashMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,11 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_seg_split_fg, {"type", "seg_split_fg"}), \
F(type_seg_split_ingest, {"type", "seg_split_ingest"}), \
F(type_seg_merge_bg_gc, {"type", "seg_merge_bg_gc"}), \
F(type_place_index_update, {"type", "place_index_update"})) \
F(type_place_index_update, {"type", "place_index_update"}), \
F(type_prepare_merge_delta, {"type", "prepare_merge_delta"}), \
F(type_prepare_merge, {"type", "prepare_merge"}), \
F(type_prepare_split_physical, {"type", "prepare_split_physical"}), \
F(type_remote_upload, {"type", "remote_upload"})) \
M(tiflash_storage_subtask_duration_seconds, \
"Bucketed histogram of storage's sub task duration", \
Histogram, /* increase the bucket from 10ms to 87 minutes */ \
Expand All @@ -288,7 +292,11 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
F(type_seg_split_fg, {{"type", "seg_split_fg"}}, ExpBuckets{0.010, 2, 20}), \
F(type_seg_split_ingest, {{"type", "seg_split_ingest"}}, ExpBuckets{0.010, 2, 20}), \
F(type_seg_merge_bg_gc, {{"type", "seg_merge_bg_gc"}}, ExpBuckets{0.010, 2, 20}), \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.010, 2, 20})) \
F(type_place_index_update, {{"type", "place_index_update"}}, ExpBuckets{0.010, 2, 20}), \
F(type_prepare_merge_delta, {{"type", "prepare_merge_delta"}}, ExpBuckets{0.010, 2, 20}), \
F(type_prepare_merge, {{"type", "prepare_merge"}}, ExpBuckets{0.010, 2, 20}), \
F(type_prepare_split_physical, {{"type", "prepare_split_physical"}}, ExpBuckets{0.010, 2, 20}), \
F(type_remote_upload, {{"type", "remote_upload"}}, ExpBuckets{0.010, 2, 20})) \
M(tiflash_storage_subtask_throughput_bytes, \
"Calculate the throughput of (maybe foreground) tasks of storage in bytes", \
Counter, /**/ \
Expand Down Expand Up @@ -940,6 +948,17 @@ static_assert(RAFT_REGION_BIG_WRITE_THRES * 4 < RAFT_REGION_BIG_WRITE_MAX, "Inva
Gauge, \
F(type_bg_downloading_count, {{"type", "bg_downloading_count"}}), \
F(type_bg_download_queue_count, {{"type", "bg_download_queue_count"}})) \
M(tiflash_storage_write_filecache_staging, \
"Write-side FileCache local staging for background DMFile reads", \
Counter, \
F(type_attempt, {"type", "attempt"}), \
F(type_object, {"type", "object"}), \
F(type_download_ok, {"type", "download_ok"}), \
F(type_download_failed, {"type", "download_failed"})) \
M(tiflash_storage_write_filecache_staging_bytes, \
"Bytes staged by write FileCache local staging", \
Counter, \
F(type_staged, {"type", "staged"})) \
M(tiflash_system_seconds, \
"system calls duration in seconds", \
Histogram, \
Expand Down
1 change: 1 addition & 0 deletions dbms/src/Interpreters/Settings.h
Original file line number Diff line number Diff line change
Expand Up @@ -252,6 +252,7 @@ struct Settings
M(SettingDouble, dt_filecache_max_downloading_count_scale, 10.0, "Max queue size of download task count of FileCache = number of logical cpu cores * dt_filecache_max_downloading_count_scale.") \
M(SettingUInt64, dt_filecache_min_age_seconds, 1800, "Files of the same priority can only be evicted from files that were not accessed within `dt_filecache_min_age_seconds` seconds.") \
M(SettingUInt64, dt_filecache_wait_on_downloading_ms, 0, "When a remote cache lookup sees the same key is already being downloaded, wait up to this many milliseconds for that download to finish. 0 disables the bounded wait.") \
M(SettingBool, dt_enable_write_filecache, false, "Enable local FileCache staging for disaggregated tiflash write nodes when FileCache is available.") \
M(SettingBool, dt_enable_fetch_memtableset, true, "Whether fetching delta cache in FetchDisaggPages") \
M(SettingUInt64, dt_fetch_pages_packet_limit_size, 512 * 1024, "Response packet bytes limit of FetchDisaggPages, 0 means one page per packet") \
M(SettingDouble, dt_fetch_page_concurrency_scale, 4.0, "Concurrency of fetching pages of one query equals to num_streams * dt_fetch_page_concurrency_scale.") \
Expand Down
10 changes: 8 additions & 2 deletions dbms/src/Server/Server.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -752,8 +752,9 @@ try
const auto is_disagg_compute_mode = global_context->getSharedContextDisagg()->isDisaggregatedComputeMode();
const auto is_disagg_storage_mode = global_context->getSharedContextDisagg()->isDisaggregatedStorageMode();
const auto not_disagg_mode = global_context->getSharedContextDisagg()->notDisaggregatedMode();
const bool enable_remote_cache = is_disagg_compute_mode || is_disagg_storage_mode;
const auto [remote_cache_paths, remote_cache_capacity_quota]
= storage_config.remote_cache_config.getCacheDirInfos(is_disagg_compute_mode);
= storage_config.remote_cache_config.getCacheDirInfos(enable_remote_cache);
global_context->initializePathCapacityMetric( //
global_capacity_quota, //
storage_config.main_data_paths,
Expand All @@ -769,7 +770,7 @@ try
storage_config.kvstore_data_path, //
global_context->getPathCapacity(),
global_context->getFileProvider());
if (const auto & config = storage_config.remote_cache_config; config.isCacheEnabled() && is_disagg_compute_mode)
if (const auto & config = storage_config.remote_cache_config; config.isCacheEnabled() && enable_remote_cache)
{
config.initCacheDir();
FileCache::initialize(
Expand All @@ -781,6 +782,11 @@ try
// here so startup-time values like dt_filecache_wait_on_downloading_ms take effect immediately
// instead of waiting for a later config reload.
FileCache::instance()->updateConfig(global_context->getSettingsRef());
LOG_INFO(
log,
"Initialized FileCache for disaggregated remote cache (compute_mode={} storage_mode={})",
is_disagg_compute_mode,
is_disagg_storage_mode);
}

/// Determining PageStorage run mode based on current files on disk and storage config.
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Server/StorageConfigParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -732,13 +732,14 @@ UInt64 StorageRemoteCacheConfig::getReservedCapacity() const
return capacity * reserved_rate;
}

std::pair<Strings, std::vector<size_t>> StorageRemoteCacheConfig::getCacheDirInfos(bool is_compute_mode) const
std::pair<Strings, std::vector<size_t>> StorageRemoteCacheConfig::getCacheDirInfos(bool is_disagg_mode) const
{
if (is_compute_mode && isCacheEnabled())
if (is_disagg_mode && isCacheEnabled())
{
return {
Strings{getDTFileCacheDir(), getPageCacheDir()},
std::vector<size_t>{getDTFileCapacity(), getPageCapacity()}};
std::vector<size_t>{getDTFileCapacity(), getPageCapacity()},
};
}
else
{
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/Server/StorageConfigParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,8 @@ struct StorageRemoteCacheConfig
UInt64 getReservedCapacity() const;
void parse(const String & content, const LoggerPtr & log);

std::pair<Strings, std::vector<size_t>> getCacheDirInfos(bool is_compute_mode) const;
// `is_disagg_mode` is true when the process is a disaggregated compute or storage node that may use remote cache.
std::pair<Strings, std::vector<size_t>> getCacheDirInfos(bool is_disagg_mode) const;
};

struct TiFlashStorageConfig
Expand Down
41 changes: 41 additions & 0 deletions dbms/src/Server/tests/gtest_storage_config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1000,6 +1000,47 @@ delta_rate = 1.1
}
CATCH

TEST_F(StorageConfigTest, RemoteCacheDirInfosForDisaggMode)
try
{
auto log = Logger::get("StorageConfigTest.RemoteCacheDirInfosForDisaggMode");
const String config_str = R"(
[storage]
[storage.main]
dir = ["123"]
[storage.remote.cache]
dir = "/tmp/StorageConfigTest/RemoteCacheDirInfosForDisaggMode"
capacity = 10000000
dtfile_level = 11
delta_rate = 0.33
)";

auto config = loadConfigFromString(config_str);
size_t global_capacity_quota = 0;
TiFlashStorageConfig storage;
std::tie(global_capacity_quota, storage) = TiFlashStorageConfig::parseSettings(*config, log);

const auto & cache_config = storage.remote_cache_config;
ASSERT_TRUE(cache_config.isCacheEnabled());

{
const auto [paths, capacities] = cache_config.getCacheDirInfos(false);
ASSERT_TRUE(paths.empty());
ASSERT_TRUE(capacities.empty());
}

{
const auto [paths, capacities] = cache_config.getCacheDirInfos(true);
ASSERT_EQ(paths.size(), 2);
ASSERT_EQ(capacities.size(), 2);
ASSERT_EQ(paths[0], cache_config.getDTFileCacheDir());
ASSERT_EQ(paths[1], cache_config.getPageCacheDir());
ASSERT_EQ(capacities[0], cache_config.getDTFileCapacity());
ASSERT_EQ(capacities[1], cache_config.getPageCapacity());
}
}
CATCH

TEST_F(StorageConfigTest, TempPath)
try
{
Expand Down
3 changes: 1 addition & 2 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore.h
Original file line number Diff line number Diff line change
Expand Up @@ -170,11 +170,10 @@ class DeltaMergeStore
#endif

using TaskQueue = std::queue<BackgroundTask, std::list<BackgroundTask>>;
std::mutex mutex;
TaskQueue light_tasks;
TaskQueue heavy_tasks;

std::mutex mutex;

public:
size_t length()
{
Expand Down
27 changes: 22 additions & 5 deletions dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -279,8 +279,13 @@ SegmentPair DeltaMergeStore::segmentSplit(

LOG_INFO(
log,
"Split - {} - Finish, segment is split into two, old_segment={} new_left={} new_right={}",
"Split - {} - Finish, segment is split into two, reason={} "
"prepare_seconds={:.3f} remote_upload_seconds={:.3f} "
"old_segment={} new_left={} new_right={}",
split_info.is_logical ? "SplitLogical" : "SplitPhysical",
magic_enum::enum_name(reason),
split_info.prepare_seconds,
split_info.remote_upload_seconds,
segment->info(),
new_left->info(),
new_right->info());
Expand Down Expand Up @@ -395,7 +400,9 @@ SegmentPtr DeltaMergeStore::segmentMerge(
});

WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter());
auto merged_stable = Segment::prepareMerge(dm_context, schema_snap, ordered_segments, ordered_snapshots, wbs);
const auto prepare_result
= Segment::prepareMerge(dm_context, schema_snap, ordered_segments, ordered_snapshots, wbs);
auto merged_stable = prepare_result.stable;
wbs.writeLogAndData();
merged_stable->enableDMFilesGC(dm_context);

Expand Down Expand Up @@ -437,9 +444,13 @@ SegmentPtr DeltaMergeStore::segmentMerge(

LOG_INFO(
log,
"Merge - Finish, {} segments are merged into one, reason={} merged={} segments_to_merge={}",
"Merge - Finish, {} segments are merged into one, reason={} "
"prepare_seconds={:.3f} remote_upload_seconds={:.3f} "
"merged={} segments_to_merge={}",
ordered_segments.size(),
magic_enum::enum_name(reason),
prepare_result.prepare_seconds,
prepare_result.remote_upload_seconds,
merged->info(),
Segment::info(ordered_segments));
}
Expand Down Expand Up @@ -1273,7 +1284,8 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(

WriteBatches wbs(*storage_pool, dm_context.getWriteLimiter());

auto new_stable = segment->prepareMergeDelta(dm_context, schema_snap, segment_snap, wbs);
const auto prepare_result = segment->prepareMergeDelta(dm_context, schema_snap, segment_snap, wbs);
auto new_stable = prepare_result.stable;
wbs.writeLogAndData();
new_stable->enableDMFilesGC(dm_context);

Expand Down Expand Up @@ -1307,7 +1319,12 @@ SegmentPtr DeltaMergeStore::segmentMergeDelta(

LOG_INFO(
log,
"MergeDelta - Finish, delta is merged, old_segment={} new_segment={}",
"MergeDelta - Finish, delta is merged, reason={} "
"prepare_seconds={:.3f} remote_upload_seconds={:.3f} "
"old_segment={} new_segment={}",
magic_enum::enum_name(reason),
prepare_result.prepare_seconds,
prepare_result.remote_upload_seconds,
segment->info(),
new_segment->info());
}
Expand Down
12 changes: 12 additions & 0 deletions dbms/src/Storages/DeltaMerge/File/DMFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ class DMFileMetaV2Test;
class DMStoreForSegmentReadTaskTest;
} // namespace tests

struct LocalReadObject;
std::vector<LocalReadObject> collectMetaV2MergedFilesForLocalRead(
const DMFilePtr & dmfile,
const ColumnDefines & read_columns,
const LoggerPtr & log,
const String & tracing_id);

class DMFile : private boost::noncopyable
{
Expand Down Expand Up @@ -132,6 +138,7 @@ class DMFile : private boost::noncopyable
const DMFileMeta::PackProperties & getPackProperties() const { return meta->pack_properties; }
const ColumnStats & getColumnStats() const { return meta->column_stats; }
const std::unordered_set<ColId> & getColumnIndices() const { return meta->column_indices; }
size_t getNumColumns() const { return meta->column_stats.size(); }

// only used in gtest
void clearPackProperties() const { meta->pack_properties.clear_property(); }
Expand Down Expand Up @@ -361,6 +368,11 @@ class DMFile : private boost::noncopyable
friend class ColumnReadStream;
friend class DMFilePackFilter;
friend class DMFileBlockInputStreamBuilder;
friend std::vector<LocalReadObject> collectMetaV2MergedFilesForLocalRead(
const DMFilePtr & dmfile,
const ColumnDefines & read_columns,
const LoggerPtr & log,
const String & tracing_id);
friend class tests::DMFileTest;
friend class tests::DMFileMetaV2Test;
friend class tests::DMStoreForSegmentReadTaskTest;
Expand Down
11 changes: 10 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#include <Common/config.h> // For ENABLE_CLARA
#include <Interpreters/Context.h>
#include <Storages/DeltaMerge/File/DMFileBlockInputStream.h>
#include <Storages/DeltaMerge/File/DMFileLocalStaging.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Perf.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Reader.h>
#include <Storages/DeltaMerge/Index/VectorIndex/Stream/Ctx.h>
Expand Down Expand Up @@ -91,6 +92,13 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::buildNoLocalIndex(
tracing_id);
}

auto local_read_files = tryDownloadMetaV2MergedFilesForLocalRead(
dmfile,
read_columns,
enable_write_filecache_local_read,
Logger::get(tracing_id),
tracing_id);

DMFileReader reader(
dmfile,
read_columns,
Expand All @@ -111,7 +119,8 @@ DMFileBlockInputStreamPtr DMFileBlockInputStreamBuilder::buildNoLocalIndex(
tracing_id,
max_sharing_column_bytes_for_all,
scan_context,
read_tag);
read_tag,
std::move(local_read_files));

return std::make_shared<DMFileBlockInputStream>(std::move(reader), max_sharing_column_bytes_for_all > 0);
}
Expand Down
8 changes: 7 additions & 1 deletion dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -192,6 +192,12 @@ class DMFileBlockInputStreamBuilder
return *this;
}

DMFileBlockInputStreamBuilder & enableWriteFileCacheLocalRead(bool enable)
{
enable_write_filecache_local_read = enable;
return *this;
}

private:
DMFileBlockInputStreamPtr buildNoLocalIndex(
const DMFilePtr & dmfile,
Expand Down Expand Up @@ -234,8 +240,8 @@ class DMFileBlockInputStreamBuilder
private:
FileProviderPtr file_provider;

bool enable_write_filecache_local_read = false;
// clean read

bool enable_handle_clean_read = false;
bool is_fast_scan = false;
bool enable_del_clean_read = false;
Expand Down
Loading