diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index 87c620f4200..b41ee1b5c82 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -22,6 +22,7 @@ #include "cluster/slot_import.h" #include "cluster/sync_migrate_context.h" #include "commander.h" +#include "common/time_util.h" #include "error_constants.h" #include "status.h" @@ -243,8 +244,25 @@ class CommandClusterX : public Commander { return CommandTable::ParseSlotRanges(args.back(), slot_ranges_); } + // CLUSTERX HEARTBEAT + if (subcommand_ == "heartbeat") { + if (args.size() != 5) return {Status::RedisParseErr, errWrongNumOfArguments}; + master_node_id_ = args[2]; + + auto parse_lease_ms = ParseInt(args[3], 10); + if (!parse_lease_ms) return {Status::RedisParseErr, "lease_ms is not an integer or out of range"}; + if (*parse_lease_ms == 0) return {Status::RedisParseErr, "invalid lease_ms: must be greater than 0"}; + lease_ms_ = *parse_lease_ms; + + auto parse_election_version = ParseInt(args[4], 10); + if (!parse_election_version) + return {Status::RedisParseErr, "election_version is not an integer or out of range"}; + election_version_ = *parse_election_version; + return Status::OK(); + } + return {Status::RedisParseErr, - "CLUSTERX command, CLUSTERX VERSION|MYID|SETNODEID|SETNODES|SETSLOT|MIGRATE|FLUSHSLOTS"}; + "CLUSTERX command, CLUSTERX VERSION|MYID|SETNODEID|SETNODES|SETSLOT|MIGRATE|FLUSHSLOTS|HEARTBEAT"}; } Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { @@ -310,6 +328,22 @@ class CommandClusterX : public Commander { } *output = redis::RESP_OK; + } else if (subcommand_ == "heartbeat") { + // Only renew lease if master_node_id matches our own node id and we are a cluster master. + // Slave nodes and nodes with a different id fall through and return normal node info. + if (master_node_id_ == srv->cluster->GetMyId() && !srv->cluster->IsNotMaster()) { + uint64_t local_ver = srv->storage->GetLocalElectionVersion(); + if (election_version_ < local_ver) { + // Stale controller: do not renew the lease. Return an error so the controller can investigate. + return {Status::RedisExecErr, + fmt::format("election version mismatch: local={}, received={}", local_ver, election_version_)}; + } + srv->storage->UpdateLease(election_version_, util::GetTimeStampMS() + lease_ms_); + } + // Return the same node info format as the INFO command (Replication + Keyspace sections) + // so the controller can reuse parseClusterNodeInfo() for both paths. + // Only two sections are returned to keep the response lightweight. + *output = conn->VerbatimString("txt", srv->GetInfo(conn->GetNamespace(), {"Replication", "Keyspace"})); } else { return {Status::RedisExecErr, "Invalid cluster command options"}; } @@ -330,6 +364,11 @@ class CommandClusterX : public Commander { bool sync_migrate_ = false; int sync_migrate_timeout_ = 0; std::unique_ptr sync_migrate_ctx_ = nullptr; + + // HEARTBEAT fields + std::string master_node_id_; + uint64_t lease_ms_ = 0; + uint64_t election_version_ = 0; }; static uint64_t GenerateClusterFlag(uint64_t flags, const std::vector &args) { diff --git a/src/config/config.cc b/src/config/config.cc index 31694809138..e139d45bfb1 100644 --- a/src/config/config.cc +++ b/src/config/config.cc @@ -93,6 +93,12 @@ const std::vector> cache_types{[] { const std::vector> migration_types{{"redis-command", MigrationType::kRedisCommand}, {"raw-key-value", MigrationType::kRawKeyValue}}; +const std::vector> master_lease_modes{ + {"disabled", MasterLeaseMode::kDisabled}, + {"log-only", MasterLeaseMode::kLogOnly}, + {"block-write", MasterLeaseMode::kBlockWrite}, +}; + std::string TrimRocksDbPrefix(std::string s) { constexpr std::string_view prefix = "rocksdb."; if (!util::StartsWithICase(s, prefix)) return s; @@ -243,6 +249,8 @@ Config::Config() { {"json-max-nesting-depth", false, new IntField(&json_max_nesting_depth, 1024, 0, INT_MAX)}, {"json-storage-format", false, new EnumField(&json_storage_format, json_storage_formats, JsonStorageFormat::JSON)}, + {"master-lease-mode", false, + new EnumField(&master_lease_mode, master_lease_modes, MasterLeaseMode::kDisabled)}, {"txn-context-enabled", true, new YesNoField(&txn_context_enabled, false)}, {"skip-block-cache-deallocation-on-close", false, new YesNoField(&skip_block_cache_deallocation_on_close, false)}, {"histogram-bucket-boundaries", true, new StringField(&histogram_bucket_boundaries_str_, "")}, diff --git a/src/config/config.h b/src/config/config.h index 675bbd34c24..805f0f1be9a 100644 --- a/src/config/config.h +++ b/src/config/config.h @@ -74,6 +74,8 @@ const std::vector> slowlog_dump_logfile_le enum class BlockCacheType { kCacheTypeLRU = 0, kCacheTypeHCC }; +enum class MasterLeaseMode { kDisabled, kLogOnly, kBlockWrite }; + struct CLIOptions { std::string conf_file; std::vector> cli_options; @@ -196,6 +198,9 @@ struct Config { int json_max_nesting_depth = 1024; JsonStorageFormat json_storage_format = JsonStorageFormat::JSON; + // master lease + MasterLeaseMode master_lease_mode = MasterLeaseMode::kDisabled; + // Enable transactional mode in engine::Context bool txn_context_enabled = false; diff --git a/src/server/server.cc b/src/server/server.cc index cc1f5b67793..6df7ebb3a28 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -297,6 +297,11 @@ Status Server::AddMaster(const std::string &host, uint32_t port, bool force_reco if (GetConfig()->master_use_repl_port) master_listen_port += 1; replication_thread_ = std::make_unique(host, master_listen_port, this); + // Reset lease before starting the replication thread so that any local writes occurring + // between Start() and the next HEARTBEAT are not blocked by a previously-expired lease. + // (Replication writes bypass writeToDB() entirely and are unaffected, but internal writes + // such as cron tasks could be blocked if the old lease is expired.) + storage->ResetLease(); auto s = replication_thread_->Start([this]() { return PrepareRestoreDB(); }, [this]() { this->is_loading_ = false; diff --git a/src/storage/storage.cc b/src/storage/storage.cc index ca0f17daabc..1b994a4d6dc 100644 --- a/src/storage/storage.cc +++ b/src/storage/storage.cc @@ -719,6 +719,23 @@ rocksdb::Status Storage::Write(engine::Context &ctx, const rocksdb::WriteOptions rocksdb::Status Storage::writeToDB(engine::Context &ctx, const rocksdb::WriteOptions &options, rocksdb::WriteBatch *updates) { + // Master lease check: applied here so it covers both Storage::Write() and CommitTxn(). + // Only active when master_lease_mode != disabled. Read mode once to avoid TOCTOU. + auto lease_mode = config_->master_lease_mode; + if (lease_mode != MasterLeaseMode::kDisabled) { + uint64_t deadline = lease_deadline_ms_.load(std::memory_order_relaxed); + // deadline == 0 means cold start (never renewed): writes always allowed. + if (deadline > 0 && util::GetTimeStampMS() > deadline) { + if (lease_mode == MasterLeaseMode::kBlockWrite) { + return rocksdb::Status::Aborted( + "Write rejected: master lease expired (master_lease_mode=block-write)"); + } else { // kLogOnly + LOG(ERROR) << "Master lease expired but write allowed (master_lease_mode=log-only)"; + // TODO: increment stats counter lease_expired_writes + } + } + } + // No point trying to commit an empty write batch: in fact this will fail on read-only DBs // even if the write batch is empty. if (updates->Count() == 0) { diff --git a/src/storage/storage.h b/src/storage/storage.h index 69afeff2829..e4c853b3bc7 100644 --- a/src/storage/storage.h +++ b/src/storage/storage.h @@ -313,6 +313,24 @@ class Storage { Status CommitTxn(); ObserverOrUniquePtr GetWriteBatchBase(); + // Master lease management. Thread-safe; called from CLUSTERX HEARTBEAT handler. + void UpdateLease(uint64_t election_version, uint64_t deadline_ms) { + // Both stores use memory_order_relaxed. The two atomics are independent: + // writeToDB() reads only lease_deadline_ms_ (not local_election_version_), + // so no cross-thread ordering constraint exists between them. + // In the rare concurrent-HEARTBEAT case, the last writer wins; the legitimate + // controller will overwrite within one probe cycle (bounded by lease_ms). + lease_deadline_ms_.store(deadline_ms, std::memory_order_relaxed); + local_election_version_.store(election_version, std::memory_order_relaxed); + } + void ResetLease() { + // Clear deadline first to ensure concurrent readers stop renewing immediately. + lease_deadline_ms_.store(0, std::memory_order_relaxed); + local_election_version_.store(0, std::memory_order_relaxed); + } + uint64_t GetLeaseDeadlineMs() const { return lease_deadline_ms_.load(std::memory_order_relaxed); } + uint64_t GetLocalElectionVersion() const { return local_election_version_.load(std::memory_order_relaxed); } + Storage(const Storage &) = delete; Storage &operator=(const Storage &) = delete; @@ -389,6 +407,11 @@ class Storage { // is_txn_mode_ is used to determine whether the current Storage is in transactional mode, // .i.e, in "EXEC" command(CommandExec). std::atomic is_txn_mode_ = false; + // Master lease: tracks the deadline (ms timestamp) until which this node holds the lease. + // 0 = never renewed (cold start), writes always allowed in that case. + std::atomic lease_deadline_ms_{0}; + // Tracks the election term version from the last HEARTBEAT renewal; used to reject stale renewals. + std::atomic local_election_version_{0}; // txn_write_batch_ is used as the global write batch for the transaction mode, // all writes will be grouped in this write batch when entering the transaction mode, // then write it at once when committing. diff --git a/tests/cppunit/config_test.cc b/tests/cppunit/config_test.cc index b7d931be461..2862c60a9bb 100644 --- a/tests/cppunit/config_test.cc +++ b/tests/cppunit/config_test.cc @@ -263,3 +263,26 @@ TEST(Config, DisableL0Slowdown) { ASSERT_TRUE(slowdown_is(50)); ASSERT_TRUE(stop_is(50)); } + +TEST(Config, MasterLeaseMode) { + Config config; + // Default is disabled + EXPECT_EQ(config.master_lease_mode, MasterLeaseMode::kDisabled); + + // Set via string + auto s = config.Set(nullptr, "master-lease-mode", "log-only"); + ASSERT_TRUE(s.IsOK()); + EXPECT_EQ(config.master_lease_mode, MasterLeaseMode::kLogOnly); + + s = config.Set(nullptr, "master-lease-mode", "block-write"); + ASSERT_TRUE(s.IsOK()); + EXPECT_EQ(config.master_lease_mode, MasterLeaseMode::kBlockWrite); + + s = config.Set(nullptr, "master-lease-mode", "disabled"); + ASSERT_TRUE(s.IsOK()); + EXPECT_EQ(config.master_lease_mode, MasterLeaseMode::kDisabled); + + // Invalid value + s = config.Set(nullptr, "master-lease-mode", "invalid"); + EXPECT_FALSE(s.IsOK()); +} diff --git a/tests/cppunit/lease_test.cc b/tests/cppunit/lease_test.cc new file mode 100644 index 00000000000..72315b31a58 --- /dev/null +++ b/tests/cppunit/lease_test.cc @@ -0,0 +1,124 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include + +#include + +#include "common/time_util.h" + +// Standalone tests for lease atomic logic, mirroring Storage's internal behavior. +// NOTE: Storage cannot be instantiated without RocksDB, so these tests verify the +// identical atomic logic directly. This is an intentional tradeoff: if Storage's +// method implementations diverge from this logic, these tests won't catch it. + +TEST(Lease, UpdateLeaseStoresCorrectValues) { + std::atomic deadline{0}; + std::atomic version{0}; + + uint64_t now = util::GetTimeStampMS(); + uint64_t lease_ms = 2000; + // Mirrors UpdateLease(5, now + lease_ms): deadline written first, then version + deadline.store(now + lease_ms, std::memory_order_relaxed); + version.store(5, std::memory_order_relaxed); + + EXPECT_EQ(version.load(std::memory_order_relaxed), 5U); + EXPECT_GE(deadline.load(std::memory_order_relaxed), now + lease_ms - 10); + EXPECT_LE(deadline.load(std::memory_order_relaxed), now + lease_ms + 10); +} + +TEST(Lease, ResetLeaseZerosDeadlineAndVersion) { + std::atomic deadline{12345}; + std::atomic version{99}; + + // Mirrors ResetLease(): deadline cleared first, then version + deadline.store(0, std::memory_order_relaxed); + version.store(0, std::memory_order_relaxed); + + EXPECT_EQ(deadline.load(std::memory_order_relaxed), 0U); + EXPECT_EQ(version.load(std::memory_order_relaxed), 0U); +} + +TEST(Lease, DeadlineZeroMeansNeverExpired) { + // deadline == 0 is the cold-start state: writes always allowed + std::atomic deadline{0}; + uint64_t now = util::GetTimeStampMS(); + // Simulate the check in Storage::Write() + bool expired = (deadline.load(std::memory_order_relaxed) > 0 && now > deadline.load(std::memory_order_relaxed)); + EXPECT_FALSE(expired); +} + +TEST(Lease, DeadlineInFutureNotExpired) { + std::atomic deadline{0}; + uint64_t now = util::GetTimeStampMS(); + deadline.store(now + 5000, std::memory_order_relaxed); + bool expired = (deadline.load(std::memory_order_relaxed) > 0 && now > deadline.load(std::memory_order_relaxed)); + EXPECT_FALSE(expired); +} + +TEST(Lease, DeadlineInPastExpired) { + std::atomic deadline{0}; + uint64_t now = util::GetTimeStampMS(); + deadline.store(now - 1000, std::memory_order_relaxed); // 1 second ago + bool expired = (deadline.load(std::memory_order_relaxed) > 0 && now > deadline.load(std::memory_order_relaxed)); + EXPECT_TRUE(expired); +} + +TEST(Lease, ElectionVersionGuard) { + // Mirrors the version guard in CLUSTERX HEARTBEAT Execute(): + // if received_version >= local_version -> renew; else -> reject + std::atomic local_ver{10}; + std::atomic deadline{0}; + + // Case: received version >= local -> renew + uint64_t received = 10; + if (received >= local_ver.load(std::memory_order_relaxed)) { + deadline.store(util::GetTimeStampMS() + 2000, std::memory_order_relaxed); + local_ver.store(received, std::memory_order_relaxed); + } + EXPECT_EQ(local_ver.load(std::memory_order_relaxed), 10U); + EXPECT_GT(deadline.load(std::memory_order_relaxed), 0U); + + // Case: received version < local -> no renew + deadline.store(0, std::memory_order_relaxed); + received = 9; + if (received >= local_ver.load(std::memory_order_relaxed)) { + deadline.store(util::GetTimeStampMS() + 2000, std::memory_order_relaxed); + local_ver.store(received, std::memory_order_relaxed); + } + EXPECT_EQ(deadline.load(std::memory_order_relaxed), 0U); // not renewed + EXPECT_EQ(local_ver.load(std::memory_order_relaxed), 10U); // unchanged +} + +TEST(Lease, ResetOnRoleTransition) { + // Simulate: node had a lease as master, then became slave via SLAVEOF + std::atomic deadline{util::GetTimeStampMS() + 5000}; + std::atomic version{42}; + + // Simulate SLAVEOF path: ResetLease() + deadline.store(0, std::memory_order_relaxed); + version.store(0, std::memory_order_relaxed); + + // After reset, the Write() check should treat it as cold start (writes allowed) + uint64_t now = util::GetTimeStampMS(); + bool expired = (deadline.load(std::memory_order_relaxed) > 0 && now > deadline.load(std::memory_order_relaxed)); + EXPECT_FALSE(expired); // deadline==0 means not expired + EXPECT_EQ(version.load(std::memory_order_relaxed), 0U); +} diff --git a/tests/gocase/integration/cluster/lease_test.go b/tests/gocase/integration/cluster/lease_test.go new file mode 100644 index 00000000000..6589cafcc83 --- /dev/null +++ b/tests/gocase/integration/cluster/lease_test.go @@ -0,0 +1,269 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +package cluster + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/apache/kvrocks/tests/gocase/util" +) + +const ( + testNodeID = "07c37dfeb235213a872192d90877d0cd55635b91" + // A short lease so tests don't have to wait long for expiry. + shortLeaseMs = 200 +) + +// initClusterMaster sets up a single-node cluster in master mode and returns a helper +// function that builds the clusterx SETNODES argument for that node. +func initClusterMaster(t *testing.T, srv *util.KvrocksServer) { + t.Helper() + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODEID", testNodeID).Err()) + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383", testNodeID, srv.Host(), srv.Port()) + require.NoError(t, rdb.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) +} + +// TestLeaseDisabledMode verifies that master_lease_mode=disabled (default) has no effect. +func TestLeaseDisabledMode(t *testing.T) { + t.Parallel() + srv := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "master-lease-mode": "disabled", + }) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + initClusterMaster(t, srv) + + // Renew once with a very short lease. + require.NoError(t, rdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID, shortLeaseMs, 1).Err()) + + // Wait for the lease to expire. + time.Sleep(time.Duration(shortLeaseMs+100) * time.Millisecond) + + // Writes should still succeed because mode is disabled. + require.NoError(t, rdb.Set(ctx, "key-disabled", "value", 0).Err()) +} + +// TestLeaseBlockWriteMode verifies that writes are rejected after lease expiry in block-write mode. +func TestLeaseBlockWriteMode(t *testing.T) { + t.Parallel() + srv := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "master-lease-mode": "block-write", + }) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + initClusterMaster(t, srv) + + t.Run("cold start: writes allowed before first HEARTBEAT", func(t *testing.T) { + require.NoError(t, rdb.Set(ctx, "cold-key", "value", 0).Err()) + }) + + t.Run("write succeeds when lease is valid", func(t *testing.T) { + // Renew with a generous lease. + require.NoError(t, rdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID, 5000, 1).Err()) + require.NoError(t, rdb.Set(ctx, "valid-key", "value", 0).Err()) + }) + + t.Run("write rejected after lease expiry", func(t *testing.T) { + // Renew with a very short lease. + require.NoError(t, rdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID, shortLeaseMs, 1).Err()) + // Wait for the lease to expire. + time.Sleep(time.Duration(shortLeaseMs+100) * time.Millisecond) + // Write should be rejected with the specific lease-expired error. + err := rdb.Set(ctx, "expired-key", "value", 0).Err() + require.Error(t, err) + require.Contains(t, err.Error(), "master lease expired") + }) + + t.Run("write succeeds again after lease is renewed", func(t *testing.T) { + // Renew again with a generous lease. + require.NoError(t, rdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID, 5000, 1).Err()) + require.NoError(t, rdb.Set(ctx, "renewed-key", "value", 0).Err()) + }) +} + +// TestLeaseLogOnlyMode verifies that writes succeed after expiry in log-only mode. +func TestLeaseLogOnlyMode(t *testing.T) { + t.Parallel() + srv := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "master-lease-mode": "log-only", + }) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + initClusterMaster(t, srv) + + // Renew with a very short lease. + require.NoError(t, rdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID, shortLeaseMs, 1).Err()) + // Wait for the lease to expire. + time.Sleep(time.Duration(shortLeaseMs+100) * time.Millisecond) + + // Write should succeed even though the lease is expired. + require.NoError(t, rdb.Set(ctx, "log-only-key", "value", 0).Err()) +} + +// TestHeartbeatElectionVersionMismatch verifies that a stale election_version is rejected. +func TestHeartbeatElectionVersionMismatch(t *testing.T) { + t.Parallel() + srv := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "master-lease-mode": "block-write", + }) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + initClusterMaster(t, srv) + + // First, renew with version 5 to establish local_election_version = 5. + require.NoError(t, rdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID, 5000, 5).Err()) + + // Now send a stale version (< 5). The lease must not be renewed and an error returned. + err := rdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID, 5000, 4).Err() + require.Error(t, err) + require.Contains(t, err.Error(), "election version mismatch") +} + +// TestHeartbeatNonMatchingNodeID verifies that HEARTBEAT with a mismatched master_node_id +// does not renew the lease but returns a normal info response (no error). +func TestHeartbeatNonMatchingNodeID(t *testing.T) { + t.Parallel() + srv := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "master-lease-mode": "block-write", + }) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + initClusterMaster(t, srv) + + // Use a different node ID so the node does not renew its lease. + differentNodeID := "aabbccddeeff00112233445566778899aabbccdd" + result, err := rdb.Do(ctx, "clusterx", "HEARTBEAT", differentNodeID, 5000, 1).Text() + require.NoError(t, err) + // Normal INFO response returned; no lease renewal. + require.Contains(t, result, "role:") + + // Wait for any hypothetical lease to expire (there should be none since deadline == 0). + time.Sleep(time.Duration(shortLeaseMs+100) * time.Millisecond) + + // Writes should still succeed because no lease was renewed (cold start: deadline == 0). + require.NoError(t, rdb.Set(ctx, "no-lease-key", "value", 0).Err()) +} + +// TestHeartbeatLeaseResetOnSlaveOf verifies that becoming a slave resets the lease so that +// replication writes are not blocked. +func TestHeartbeatLeaseResetOnSlaveOf(t *testing.T) { + t.Parallel() + srv := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "master-lease-mode": "block-write", + }) + defer srv.Close() + + masterSrv := util.StartServer(t, map[string]string{}) + defer masterSrv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + masterRdb := masterSrv.NewClient() + defer func() { require.NoError(t, masterRdb.Close()) }() + + initClusterMaster(t, srv) + + // Establish a short-lived lease on the node (which is currently a cluster master). + require.NoError(t, rdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID, shortLeaseMs, 1).Err()) + // Wait for the lease to expire so block-write mode is active. + time.Sleep(time.Duration(shortLeaseMs+100) * time.Millisecond) + err := rdb.Set(ctx, "pre-slaveof-key", "blocked-value", 0).Err() + require.Error(t, err) + require.Contains(t, err.Error(), "master lease expired") + + // Demote the node to slave. This must reset the lease. + util.SlaveOf(t, rdb, masterSrv) + util.WaitForSync(t, rdb) + + // After becoming a slave, the lease is reset (deadline == 0). Replication writes + // go through writeToDB() and must not be blocked. We verify by writing to master and + // confirming the replica syncs successfully. + require.NoError(t, masterRdb.Set(ctx, "master-key", "replicated-value", 0).Err()) + util.WaitForOffsetSync(t, masterRdb, rdb, 5*time.Second) +} + +// TestHeartbeatInvalidArgs verifies validation of HEARTBEAT arguments. +func TestHeartbeatInvalidArgs(t *testing.T) { + t.Parallel() + srv := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer srv.Close() + + ctx := context.Background() + rdb := srv.NewClient() + defer func() { require.NoError(t, rdb.Close()) }() + + initClusterMaster(t, srv) + + t.Run("missing arguments", func(t *testing.T) { + require.Error(t, rdb.Do(ctx, "clusterx", "HEARTBEAT").Err()) + require.Error(t, rdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID).Err()) + require.Error(t, rdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID, 1000).Err()) + }) + + t.Run("lease_ms zero", func(t *testing.T) { + err := rdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID, 0, 1).Err() + require.Error(t, err) + require.Contains(t, err.Error(), "invalid lease_ms") + }) + + t.Run("cluster not enabled", func(t *testing.T) { + noClusterSrv := util.StartServer(t, map[string]string{}) + defer noClusterSrv.Close() + noClusterRdb := noClusterSrv.NewClient() + defer func() { require.NoError(t, noClusterRdb.Close()) }() + require.ErrorContains(t, noClusterRdb.Do(ctx, "clusterx", "HEARTBEAT", testNodeID, 1000, 1).Err(), "not enabled") + }) +}