diff --git a/src/cluster/cluster.cc b/src/cluster/cluster.cc index 20fe3d36293..8b8e7badbb5 100644 --- a/src/cluster/cluster.cc +++ b/src/cluster/cluster.cc @@ -29,6 +29,7 @@ #include #include "cluster/cluster_defs.h" +#include "cluster/cluster_failover.h" #include "commands/commander.h" #include "common/io_util.h" #include "fmt/format.h" @@ -221,6 +222,9 @@ Status Cluster::SetClusterNodes(const std::string &nodes_str, int64_t version, b // Clear migrated and imported slot info migrated_slots_.clear(); imported_slots_.clear(); + if (srv_->cluster_failover) { + srv_->cluster_failover->ResetFailoverState(); + } return Status::OK(); } @@ -447,6 +451,12 @@ Status Cluster::GetClusterInfo(std::string *cluster_infos) { std::string import_infos; srv_->slot_import->GetImportInfo(&import_infos); *cluster_infos += import_infos; + + if (srv_->cluster_failover) { + std::string failover_info; + srv_->cluster_failover->GetFailoverInfo(&failover_info); + *cluster_infos += failover_info; + } } return Status::OK(); @@ -898,6 +908,10 @@ Status Cluster::CanExecByMySelf(const redis::CommandAttributes *attributes, cons uint64_t flags = attributes->GenerateFlags(cmd_tokens, *srv_->GetConfig()); + if (srv_->cluster_failover && srv_->cluster_failover->IsWriteForbidden() && (flags & redis::kCmdWrite)) { + return {Status::RedisTryAgain, "Failover in progress"}; + } + if (myself_ && myself_ == slots_nodes_[slot]) { // We use central controller to manage the topology of the cluster. // Server can't change the topology directly, so we record the migrated slots @@ -976,3 +990,51 @@ Status Cluster::Reset() { unlink(srv_->GetConfig()->NodesFilePath().data()); return Status::OK(); } + +StatusOr> Cluster::GetNodeIPPort(const std::string &node_id) { + auto it = nodes_.find(node_id); + if (it == nodes_.end()) { + return {Status::NotOK, "Node not found"}; + } + return std::make_pair(it->second->host, it->second->port); +} + +Status Cluster::OnTakeOver() { + info("[Failover] OnTakeOver received myself_: {}", myself_ ? myself_->id : "null"); + if (!myself_) { + return {Status::NotOK, "Cluster is not initialized"}; + } + if (myself_->role == kClusterMaster) { + info("[Failover] OnTakeOver myself_ is master, return"); + return Status::OK(); + } + + std::string old_master_id = myself_->master_id; + if (old_master_id.empty()) { + info("[Failover] OnTakeOver no master to takeover, return"); + return {Status::NotOK, "No master to takeover"}; + } + + for (int i = 0; i < kClusterSlots; i++) { + if (slots_nodes_[i] && slots_nodes_[i]->id == old_master_id) { + imported_slots_.insert(i); + } + } + info("[Failover] OnTakeOver Success "); + return Status::OK(); +} + +void Cluster::SetMySlotsMigrated(const std::string &dst_ip_port) { + // It is called by failover thread. + auto exclusivity = srv_->WorkExclusivityGuard(); + + for (int i = 0; i < kClusterSlots; i++) { + if (slots_nodes_[i] == myself_) { + migrated_slots_[i] = dst_ip_port; + } + } +} + +bool Cluster::IsSlotImported(int slot) const { + return imported_slots_.count(slot) > 0; +} diff --git a/src/cluster/cluster.h b/src/cluster/cluster.h index 468c154d4d8..3fbe6ae770a 100644 --- a/src/cluster/cluster.h +++ b/src/cluster/cluster.h @@ -93,6 +93,11 @@ class Cluster { Status DumpClusterNodes(const std::string &file); Status LoadClusterNodes(const std::string &file_path); Status Reset(); + Status OnTakeOver(); + + StatusOr> GetNodeIPPort(const std::string &node_id); + void SetMySlotsMigrated(const std::string &dst_ip_port); + bool IsSlotImported(int slot) const; static bool SubCommandIsExecExclusive(const std::string &subcommand); diff --git a/src/cluster/cluster_failover.cc b/src/cluster/cluster_failover.cc new file mode 100644 index 00000000000..934a8e333a8 --- /dev/null +++ b/src/cluster/cluster_failover.cc @@ -0,0 +1,293 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#include "cluster_failover.h" + +#include + +#include "cluster/cluster.h" +#include "common/io_util.h" +#include "common/time_util.h" +#include "logging.h" +#include "server/redis_reply.h" +#include "server/server.h" + +ClusterFailover::ClusterFailover(Server *srv) : srv_(srv) { + t_ = std::thread([this]() { loop(); }); +} + +ClusterFailover::~ClusterFailover() { + { + std::lock_guard lock(mutex_); + stop_thread_ = true; + cv_.notify_all(); + } + if (t_.joinable()) t_.join(); +} + +Status ClusterFailover::Run(std::string slave_node_id, int timeout_ms) { + std::lock_guard lock(mutex_); + if (state_ != FailoverState::kNone && state_ != FailoverState::kFailed) { + return {Status::NotOK, "Failover is already in progress"}; + } + + if (srv_->IsSlave()) { + return {Status::NotOK, "Current node is a slave, can't failover"}; + } + + slave_node_id_ = std::move(slave_node_id); + timeout_ms_ = timeout_ms; + state_ = FailoverState::kStarted; + failover_job_triggered_ = true; + cv_.notify_all(); + return Status::OK(); +} + +void ClusterFailover::loop() { + while (true) { + std::unique_lock lock(mutex_); + cv_.wait(lock, [this]() { return stop_thread_ || failover_job_triggered_; }); + + if (stop_thread_) return; + + if (failover_job_triggered_) { + failover_job_triggered_ = false; + lock.unlock(); + runFailoverProcess(); + } + } +} + +void ClusterFailover::runFailoverProcess() { + auto ip_port = srv_->cluster->GetNodeIPPort(slave_node_id_); + if (!ip_port.IsOK()) { + error("[Failover] slave node not found in cluster {}", slave_node_id_); + abortFailover("Slave node not found in cluster"); + return; + } + node_ip_port_ = ip_port.GetValue().first + ":" + std::to_string(ip_port.GetValue().second); + node_ip_ = ip_port.GetValue().first; + node_port_ = ip_port.GetValue().second; + info("[Failover] slave node {} {} failover state: {}", slave_node_id_, node_ip_port_, static_cast(state_.load())); + state_ = FailoverState::kCheck; + + auto s = checkSlaveStatus(); + if (!s.IsOK()) { + abortFailover(s.Msg()); + return; + } + + s = checkSlaveLag(); + if (!s.IsOK()) { + abortFailover("Slave lag check failed: " + s.Msg()); + return; + } + + info("[Failover] slave node {} {} check slave status success, enter pause state", slave_node_id_, node_ip_port_); + start_time_ms_ = util::GetTimeStampMS(); + // Enter Pause state (Stop writing) + state_ = FailoverState::kPause; + // Get current sequence + target_seq_ = srv_->storage->LatestSeqNumber(); + info("[Failover] slave node {} {} target sequence {}", slave_node_id_, node_ip_port_, target_seq_); + + state_ = FailoverState::kSyncWait; + s = waitReplicationSync(); + if (!s.IsOK()) { + abortFailover(s.Msg()); + return; + } + info("[Failover] slave node {} {} wait replication sync success, enter switch state, cost {} ms", slave_node_id_, + node_ip_port_, util::GetTimeStampMS() - start_time_ms_); + + state_ = FailoverState::kSwitch; + s = sendTakeoverCmd(); + if (!s.IsOK()) { + abortFailover(s.Msg()); + return; + } + + // Redirect slots + srv_->cluster->SetMySlotsMigrated(node_ip_port_); + + state_ = FailoverState::kSuccess; + info("[Failover] success {} {}", slave_node_id_, node_ip_port_); +} + +Status ClusterFailover::checkSlaveLag() { + auto start_offset_status = srv_->GetSlaveReplicationOffset(node_ip_port_); + if (!start_offset_status.IsOK()) { + return {Status::NotOK, "Failed to get slave offset: " + start_offset_status.Msg()}; + } + uint64_t start_offset = *start_offset_status; + int64_t start_sampling_ms = util::GetTimeStampMS(); + + // Wait 3s or half of timeout, but at least a bit to measure speed + int64_t wait_time = std::max(100, std::min(3000, timeout_ms_ / 2)); + std::this_thread::sleep_for(std::chrono::milliseconds(wait_time)); + + auto end_offset_status = srv_->GetSlaveReplicationOffset(node_ip_port_); + if (!end_offset_status.IsOK()) { + return {Status::NotOK, "Failed to get slave offset: " + end_offset_status.Msg()}; + } + uint64_t end_offset = *end_offset_status; + int64_t end_sampling_ms = util::GetTimeStampMS(); + + double elapsed_sec = (end_sampling_ms - start_sampling_ms) / 1000.0; + if (elapsed_sec <= 0) elapsed_sec = 0.001; + + uint64_t bytes = 0; + if (end_offset > start_offset) bytes = end_offset - start_offset; + double speed = bytes / elapsed_sec; + + uint64_t master_seq = srv_->storage->LatestSeqNumber(); + uint64_t lag = 0; + if (master_seq > end_offset) lag = master_seq - end_offset; + + if (lag == 0) return Status::OK(); + + if (speed <= 0.1) { // Basically 0 + return {Status::NotOK, fmt::format("Slave is not replicating (lag: {})", lag)}; + } + + double required_sec = lag / speed; + int64_t required_ms = static_cast(required_sec * 1000); + + int64_t elapsed_total = end_sampling_ms - start_sampling_ms; + int64_t remaining = timeout_ms_ - elapsed_total; + + if (required_ms > remaining) { + return {Status::NotOK, fmt::format("Estimated catchup time {}ms > remaining time {}ms (lag: {}, speed: {:.2f}/s)", + required_ms, remaining, lag, speed)}; + } + + info("[Failover] check: lag={}, speed={:.2f}/s, estimated_time={}ms, remaining={}ms", lag, speed, required_ms, + remaining); + return Status::OK(); +} + +Status ClusterFailover::checkSlaveStatus() { + // We could try to connect, but GetSlaveReplicationOffset checks connection. + auto offset = srv_->GetSlaveReplicationOffset(node_ip_port_); + if (!offset.IsOK()) { + error("[Failover] slave node {} {} not connected or not syncing", slave_node_id_, node_ip_port_); + return {Status::NotOK, "Slave not connected or not syncing"}; + } + info("[Failover] slave node {} {} is connected and syncing offset {}", slave_node_id_, node_ip_port_, offset.Msg()); + return Status::OK(); +} + +Status ClusterFailover::waitReplicationSync() { + while (true) { + if (util::GetTimeStampMS() - start_time_ms_ > static_cast(timeout_ms_)) { + return {Status::NotOK, "Timeout waiting for replication sync"}; + } + + auto offset_status = srv_->GetSlaveReplicationOffset(node_ip_port_); + if (!offset_status.IsOK()) { + return {Status::NotOK, "Failed to get slave offset: " + offset_status.Msg()}; + } + + if (*offset_status >= target_seq_) { + return Status::OK(); + } + + std::this_thread::sleep_for(std::chrono::milliseconds(10)); + } +} + +Status ClusterFailover::sendTakeoverCmd() { + auto s = util::SockConnect(node_ip_, node_port_); + if (!s.IsOK()) { + return {Status::NotOK, "Failed to connect to slave: " + s.Msg()}; + } + int fd = *s; + + std::string pass = srv_->GetConfig()->requirepass; + if (!pass.empty()) { + std::string auth_cmd = redis::ArrayOfBulkStrings({"AUTH", pass}); + auto s_auth = util::SockSend(fd, auth_cmd); + if (!s_auth.IsOK()) { + close(fd); + return {Status::NotOK, "Failed to send AUTH: " + s_auth.Msg()}; + } + auto s_line = util::SockReadLine(fd); + if (!s_line.IsOK() || s_line.GetValue().substr(0, 3) != "+OK") { + close(fd); + return {Status::NotOK, "AUTH failed"}; + } + } + + std::string cmd = redis::ArrayOfBulkStrings({"CLUSTERX", "TAKEOVER"}); + auto s_send = util::SockSend(fd, cmd); + if (!s_send.IsOK()) { + close(fd); + return {Status::NotOK, "Failed to send TAKEOVER: " + s_send.Msg()}; + } + + auto s_resp = util::SockReadLine(fd); + close(fd); + + if (!s_resp.IsOK()) { + return {Status::NotOK, "Failed to read TAKEOVER response: " + s_resp.Msg()}; + } + + if (s_resp.GetValue().substr(0, 3) != "+OK") { + return {Status::NotOK, "TAKEOVER failed: " + s_resp.GetValue()}; + } + + return Status::OK(); +} + +void ClusterFailover::abortFailover(const std::string &reason) { + error("[Failover] node {} {} failover failed: {}", slave_node_id_, node_ip_port_, reason); + state_ = FailoverState::kFailed; +} + +void ClusterFailover::GetFailoverInfo(std::string *info) { + *info = "cluster_failover_state:"; + switch (state_.load()) { + case FailoverState::kNone: + *info += "none"; + break; + case FailoverState::kStarted: + *info += "started"; + break; + case FailoverState::kCheck: + *info += "check_slave"; + break; + case FailoverState::kPause: + *info += "pause_write"; + break; + case FailoverState::kSyncWait: + *info += "wait_sync"; + break; + case FailoverState::kSwitch: + *info += "switching"; + break; + case FailoverState::kSuccess: + *info += "success"; + break; + case FailoverState::kFailed: + *info += "failed"; + break; + } + *info += "\r\n"; +} diff --git a/src/cluster/cluster_failover.h b/src/cluster/cluster_failover.h new file mode 100644 index 00000000000..bf62cdc9694 --- /dev/null +++ b/src/cluster/cluster_failover.h @@ -0,0 +1,74 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +#pragma once + +#include +#include +#include +#include +#include + +#include "status.h" + +class Server; + +enum class FailoverState { kNone = 0, kStarted, kCheck, kPause, kSyncWait, kSwitch, kSuccess, kFailed }; + +class ClusterFailover { + public: + explicit ClusterFailover(Server *srv); + ~ClusterFailover(); + + Status Run(std::string slave_node_id, int timeout_ms); + bool IsWriteForbidden() { + auto s = state_.load(); + return s == FailoverState::kPause || s == FailoverState::kSyncWait || s == FailoverState::kSwitch; + } + std::string GetSlaveNodeId() { return slave_node_id_; } + void GetFailoverInfo(std::string *info); + void ResetFailoverState() { state_ = FailoverState::kNone; } + + private: + void loop(); + void runFailoverProcess(); + + Status checkSlaveStatus(); + Status checkSlaveLag(); + Status waitReplicationSync(); + Status sendTakeoverCmd(); + void abortFailover(const std::string &reason); + + Server *srv_; + std::atomic state_{FailoverState::kNone}; + std::string slave_node_id_; + std::string node_ip_port_; + std::string node_ip_; + int node_port_ = 0; + int timeout_ms_ = 0; + uint64_t target_seq_ = 0; + int64_t start_time_ms_ = 0; + + std::thread t_; + std::mutex mutex_; + std::condition_variable cv_; + bool stop_thread_ = false; + std::atomic failover_job_triggered_{false}; +}; diff --git a/src/commands/cmd_cluster.cc b/src/commands/cmd_cluster.cc index 7a16ddd9ab1..db8b0d511bd 100644 --- a/src/commands/cmd_cluster.cc +++ b/src/commands/cmd_cluster.cc @@ -19,6 +19,7 @@ */ #include "cluster/cluster_defs.h" +#include "cluster/cluster_failover.h" #include "cluster/slot_import.h" #include "cluster/sync_migrate_context.h" #include "commander.h" @@ -237,7 +238,29 @@ class CommandClusterX : public Commander { return Status::OK(); } - return {Status::RedisParseErr, "CLUSTERX command, CLUSTERX VERSION|MYID|SETNODEID|SETNODES|SETSLOT|MIGRATE"}; + if (subcommand_ == "failover") { + if (args.size() != 3 && args.size() != 4) return {Status::RedisParseErr, errWrongNumOfArguments}; + + slave_node_id_ = args_[2]; + + if (args.size() == 4) { + auto parse_result = ParseInt(args_[3], 10); + if (!parse_result) return {Status::RedisParseErr, "Invalid timeout"}; + if (*parse_result < 0) return {Status::RedisParseErr, errTimeoutIsNegative}; + failover_timeout_ = *parse_result; + } else { + failover_timeout_ = 1000; + } + return Status::OK(); + } + + if (subcommand_ == "takeover") { + if (args.size() != 2) return {Status::RedisParseErr, errWrongNumOfArguments}; + return Status::OK(); + } + + return {Status::RedisParseErr, + "CLUSTERX command, CLUSTERX VERSION|MYID|SETNODEID|SETNODES|SETSLOT|MIGRATE|FAILOVER"}; } Status Execute([[maybe_unused]] engine::Context &ctx, Server *srv, Connection *conn, std::string *output) override { @@ -289,6 +312,20 @@ class CommandClusterX : public Commander { } else { return s; } + } else if (subcommand_ == "failover") { + Status s = srv->cluster_failover->Run(slave_node_id_, failover_timeout_); + if (s.IsOK()) { + *output = redis::RESP_OK; + } else { + return s; + } + } else if (subcommand_ == "takeover") { + Status s = srv->cluster->OnTakeOver(); + if (s.IsOK()) { + *output = redis::RESP_OK; + } else { + return s; + } } else { return {Status::RedisExecErr, "Invalid cluster command options"}; } @@ -309,6 +346,8 @@ class CommandClusterX : public Commander { bool sync_migrate_ = false; int sync_migrate_timeout_ = 0; std::unique_ptr sync_migrate_ctx_ = nullptr; + std::string slave_node_id_; + int failover_timeout_ = 0; }; static uint64_t GenerateClusterFlag(uint64_t flags, const std::vector &args) { diff --git a/src/server/redis_connection.cc b/src/server/redis_connection.cc index e6154868956..39f6c7043cc 100644 --- a/src/server/redis_connection.cc +++ b/src/server/redis_connection.cc @@ -37,6 +37,7 @@ #endif #include "commands/blocking_commander.h" +#include "cluster/redis_slot.h" #include "redis_connection.h" #include "scope_exit.h" #include "server.h" @@ -480,6 +481,25 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { continue; } + // Get slot for imported_slots_ check (needed for failover scenario) + int slot = -1; + if (config->cluster_enabled && (cmd_flags & kCmdWrite)) { + std::vector key_indexes; + attributes->ForEachKeyRange( + [&](const std::vector &, redis::CommandKeyRange key_range) { + key_range.ForEachKeyIndex([&](int i) { key_indexes.push_back(i); }, cmd_tokens.size()); + }, + cmd_tokens); + if (!key_indexes.empty()) { + for (auto i : key_indexes) { + if (i < static_cast(cmd_tokens.size())) { + slot = GetSlotIdFromKey(cmd_tokens[i]); + break; + } + } + } + } + if (config->cluster_enabled) { s = srv_->cluster->CanExecByMySelf(attributes, cmd_tokens, this); if (!s.IsOK()) { @@ -502,8 +522,16 @@ void Connection::ExecuteCommands(std::deque *to_process_cmds) { } if (config->slave_readonly && srv_->IsSlave() && (cmd_flags & kCmdWrite)) { - Reply(redis::Error({Status::RedisReadOnly, "You can't write against a read only slave."})); - continue; + // Allow write if slot is in imported_slots_ (failover scenario) + // The slot is already imported via OnTakeOver(), but topology hasn't been updated yet + bool allow_write = false; + if (config->cluster_enabled && slot >= 0) { + allow_write = srv_->cluster->IsSlotImported(slot); + } + if (!allow_write) { + Reply(redis::Error({Status::RedisReadOnly, "You can't write against a read only slave."})); + continue; + } } if ((cmd_flags & kCmdWrite) && !(cmd_flags & kCmdNoDBSizeCheck) && srv_->storage->ReachedDBSizeLimit()) { diff --git a/src/server/server.cc b/src/server/server.cc index a952cdadd28..929e0e30e3e 100644 --- a/src/server/server.cc +++ b/src/server/server.cc @@ -38,6 +38,7 @@ #include #include +#include "cluster/cluster_failover.h" #include "commands/command_parser.h" #include "commands/commander.h" #include "common/string_util.h" @@ -181,6 +182,7 @@ Status Server::Start() { if (config_->cluster_enabled) { // Create objects used for slot migration slot_migrator = std::make_unique(this); + cluster_failover = std::make_unique(this); if (config_->persist_cluster_nodes_enabled) { auto s = cluster->LoadClusterNodes(config_->NodesFilePath()); @@ -2210,3 +2212,14 @@ AuthResult Server::AuthenticateUser(const std::string &user_password, std::strin *ns = kDefaultNamespace; return AuthResult::IS_ADMIN; } + +StatusOr Server::GetSlaveReplicationOffset(const std::string &node_ip_port) { + + std::shared_lock guard(slave_threads_mu_); + for (const auto &slave : slave_threads_) { + if (slave->GetConn()->GetAnnounceAddr() == node_ip_port) { + return slave->GetAckSeq(); + } + } + return {Status::NotOK, "Slave not connected"}; +} diff --git a/src/server/server.h b/src/server/server.h index 597ea00aec6..e5a1acf7bca 100644 --- a/src/server/server.h +++ b/src/server/server.h @@ -177,6 +177,7 @@ class ServerLogData { class SlotImport; class SlotMigrator; +class ClusterFailover; class Server { public: @@ -335,6 +336,7 @@ class Server { static inline std::atomic unix_time_secs = 0; std::unique_ptr slot_migrator; std::unique_ptr slot_import; + std::unique_ptr cluster_failover; void UpdateWatchedKeysFromArgs(const std::vector &args, const redis::CommandAttributes &attr); void UpdateWatchedKeysManually(const std::vector &keys); @@ -342,6 +344,7 @@ class Server { static bool IsWatchedKeysModified(redis::Connection *conn); void ResetWatchedKeys(redis::Connection *conn); std::list> GetSlaveHostAndPort(); + StatusOr GetSlaveReplicationOffset(const std::string &node_ip_port); Namespace *GetNamespace() { return &namespace_; } AuthResult AuthenticateUser(const std::string &user_password, std::string *ns); diff --git a/src/storage/scripting.cc b/src/storage/scripting.cc index c8ddfc6677d..b172ce77c70 100644 --- a/src/storage/scripting.cc +++ b/src/storage/scripting.cc @@ -865,8 +865,16 @@ int RedisGenericCommand(lua_State *lua, int raise_error) { } if (config->slave_readonly && srv->IsSlave() && (cmd_flags & redis::kCmdWrite)) { - PushError(lua, "READONLY You can't write against a read only slave."); - return raise_error ? RaiseError(lua) : 1; + // Allow write if slot is in imported_slots_ (failover scenario) + // The slot is already imported via OnTakeOver(), but topology hasn't been updated yet + bool allow_write = false; + if (config->cluster_enabled && script_run_ctx && script_run_ctx->current_slot >= 0) { + allow_write = srv->cluster->IsSlotImported(script_run_ctx->current_slot); + } + if (!allow_write) { + PushError(lua, "READONLY You can't write against a read only slave."); + return raise_error ? RaiseError(lua) : 1; + } } if (!config->slave_serve_stale_data && srv->IsSlave() && cmd_name != "info" && cmd_name != "slaveof" && diff --git a/tests/gocase/go.mod b/tests/gocase/go.mod index 7ec1c0e2b96..709d3dc1f9d 100644 --- a/tests/gocase/go.mod +++ b/tests/gocase/go.mod @@ -1,9 +1,8 @@ module github.com/apache/kvrocks/tests/gocase -go 1.24.0 +go 1.25 require ( - github.com/linxGnu/grocksdb v1.10.2 github.com/redis/go-redis/v9 v9.14.0 github.com/shirou/gopsutil/v4 v4.25.8 github.com/stretchr/testify v1.11.1 diff --git a/tests/gocase/integration/failover/failover_test.go b/tests/gocase/integration/failover/failover_test.go new file mode 100644 index 00000000000..febfdf830d2 --- /dev/null +++ b/tests/gocase/integration/failover/failover_test.go @@ -0,0 +1,972 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ + +package failover + +import ( + "context" + "fmt" + "strings" + "testing" + "time" + + "github.com/apache/kvrocks/tests/gocase/util" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" +) + +// testNameWrapper wraps testing.TB to sanitize test names for MkdirTemp +// This is needed because subtest names contain "/" which causes MkdirTemp to fail +type testNameWrapper struct { + testing.TB + sanitizedName string +} + +func (w *testNameWrapper) Name() string { + return w.sanitizedName +} + +// sanitizeTestName replaces path separators in test names to avoid issues with MkdirTemp +func sanitizeTestName(tb testing.TB) testing.TB { + sanitizedName := strings.ReplaceAll(tb.Name(), "/", "_") + return &testNameWrapper{TB: tb, sanitizedName: sanitizedName} +} + +// startServerWithSanitizedName starts a server with a sanitized test name +func startServerWithSanitizedName(t testing.TB, configs map[string]string) *util.KvrocksServer { + return util.StartServer(sanitizeTestName(t), configs) +} + +type FailoverState string + +const ( + FailoverStateNone FailoverState = "none" + FailoverStateStarted FailoverState = "started" + FailoverStateCheckSlave FailoverState = "check_slave" + FailoverStatePauseWrite FailoverState = "pause_write" + FailoverStateWaitSync FailoverState = "wait_sync" + FailoverStateSwitching FailoverState = "switching" + FailoverStateSuccess FailoverState = "success" + FailoverStateFailed FailoverState = "failed" +) + +// TestFailoverBasicFlow tests the basic failover process and custom timeout parameter. +// Test Case 1.1: Basic Failover Flow - Master successfully transfers control to Slave +// Test Case 1.2: Failover with Custom Timeout - Using custom timeout parameter +func TestFailoverBasicFlow(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + // Wait for replication to establish + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test Case 1.1: Basic Failover Flow + t.Run("FAILOVER - Basic failover flow", func(t *testing.T) { + // Write some data + require.NoError(t, masterClient.Set(ctx, "key1", "value1", 0).Err()) + require.NoError(t, masterClient.Set(ctx, "key2", "value2", 0).Err()) + + // Start failover + result := masterClient.Do(ctx, "clusterx", "failover", slaveID) + if result.Err() != nil { + t.Logf("FAILOVER command error: %v", result.Err()) + } + require.NoError(t, result.Err(), "FAILOVER command should succeed") + require.Equal(t, "OK", result.Val()) + + // Wait for failover to complete + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + + // Verify slots are migrated (MOVED response) + require.ErrorContains(t, masterClient.Set(ctx, "key1", "newvalue", 0).Err(), "MOVED") + require.ErrorContains(t, masterClient.Get(ctx, "key1").Err(), "MOVED") + + // Verify data is accessible on new master (slave) + require.Equal(t, "value1", slaveClient.Get(ctx, "key1").Val()) + require.Equal(t, "value2", slaveClient.Get(ctx, "key2").Val()) + }) + + // Test Case 1.2: Failover with Custom Timeout + t.Run("FAILOVER - Failover with custom timeout", func(t *testing.T) { + // Reset failover state by updating topology + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Start failover with custom timeout + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "5000").Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + }) +} + +// TestFailoverFailureCases tests various failure scenarios and timeout values. +// Test Case 2.1: Slave Node Not Found - Specified slave_node_id is not in cluster +// Test Case 2.2: Slave Not Connected - Slave node exists but no replication connection +// Test Case 3.5: Different Timeout Values - Testing various timeout values (0, 100, 10000) +func TestFailoverFailureCases(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + // Test Case 2.1: Slave Node Not Found + t.Run("FAILOVER - Failover to non-existent node", func(t *testing.T) { + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383", masterID, master.Host(), master.Port()) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + nonExistentID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx99" + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", nonExistentID).Val()) + waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second) + }) + + // Test Case 2.2: Slave Not Connected (node exists as master, not slave) + t.Run("FAILOVER - Failover to non-slave node", func(t *testing.T) { + slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + // Set slave as master (not slave) + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d master -", slaveID, slave.Host(), slave.Port()) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second) + }) + + // Test Case 3.5: Invalid timeout value (negative) + t.Run("FAILOVER - Invalid timeout value", func(t *testing.T) { + slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err()) + + // Negative timeout should return error + require.Error(t, masterClient.Do(ctx, "clusterx", "failover", slaveID, "-1").Err()) + }) + + // Test Case 3.5: Different Timeout Values (0, 100, 10000) + t.Run("FAILOVER - Different timeout values", func(t *testing.T) { + slave := startServerWithSanitizedName(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "4").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "4").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test with timeout = 0 when slave is already synced (lag=0) + // When lag=0, failover should succeed because no waiting is needed + // But if slave has lag, it will fail + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "0").Val()) + // Wait for either success or failed state + require.Eventually(t, func() bool { + info := masterClient.ClusterInfo(ctx).Val() + return strings.Contains(info, "cluster_failover_state:success") || + strings.Contains(info, "cluster_failover_state:failed") + }, 5*time.Second, 100*time.Millisecond) + + // Check final state - can be success (if lag=0) or failed (if lag>0) + finalInfo := masterClient.ClusterInfo(ctx).Val() + if strings.Contains(finalInfo, "cluster_failover_state:success") { + t.Logf("timeout=0 with lag=0: failover succeeded as expected") + } else if strings.Contains(finalInfo, "cluster_failover_state:failed") { + t.Logf("timeout=0: failover failed (slave may have lag)") + } + + // Reset for next test + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "5").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "5").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test with timeout = 0 when slave has lag + // Create lag by writing data to master without waiting for sync + for i := 0; i < 100; i++ { + require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i), 0).Err()) + } + // Don't wait for sync, start failover immediately to create lag scenario + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "0").Val()) + // With lag, timeout=0 should fail + waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second) + + // Reset and test with small timeout + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "6").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "6").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "100").Val()) + // Small timeout may fail, but should start + time.Sleep(200 * time.Millisecond) + info := masterClient.ClusterInfo(ctx).Val() + require.True(t, strings.Contains(info, "cluster_failover_state:failed") || + strings.Contains(info, "cluster_failover_state:success") || + strings.Contains(info, "cluster_failover_state:wait_sync") || + strings.Contains(info, "cluster_failover_state:switching")) + + // Reset and test with large timeout + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "7").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "7").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second) + }) +} + +// TestFailoverConcurrency tests concurrent failover scenarios. +// Test Case 3.1: Duplicate Failover - Cannot start failover when one is in progress +// Test Case 3.2: Restart After Failure - Can restart failover after previous failure +func TestFailoverConcurrency(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test Case 3.1: Duplicate Failover + t.Run("FAILOVER - Cannot start failover when one is in progress", func(t *testing.T) { + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + + // Try to start another failover immediately - should return error + // Wait a bit to ensure first failover has started + time.Sleep(100 * time.Millisecond) + result := masterClient.Do(ctx, "clusterx", "failover", slaveID) + // second failover may return an error indicating a failover is already in progress. + _, err := result.Result() + if err != nil { + require.Contains(t, err.Error(), "Failover is already in progress") + } else { + // should not reach here + require.Fail(t, "second failover should return error") + } + + // We verify the first one completes successfully + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + }) + + // Test Case 3.2: Restart After Failure + t.Run("FAILOVER - Can restart after failure", func(t *testing.T) { + // Reset state + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Start a failover with very short timeout + // If slave is synced (lag=0), it may succeed; if slave has lag, it will fail + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "1").Val()) + // Accept either success or failed state + require.Eventually(t, func() bool { + info := masterClient.ClusterInfo(ctx).Val() + return strings.Contains(info, "cluster_failover_state:success") || + strings.Contains(info, "cluster_failover_state:failed") + }, 5*time.Second, 100*time.Millisecond) + + // Can restart after failure + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "3").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second) + }) +} + +// TestFailoverWriteBlocking tests write and read request behavior during failover. +// Test Case 3.3: Write Requests During Failover - Write requests return TRYAGAIN in blocking states +// Test Case 3.4: Read Requests During Failover - Read requests are not blocked +func TestFailoverWriteBlocking(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test Case 3.3: Write Requests During Failover + t.Run("FAILOVER - Write requests blocked during failover", func(t *testing.T) { + // Write initial data + require.NoError(t, masterClient.Set(ctx, "testkey", "testvalue", 0).Err()) + + // Start failover with long timeout to observe blocking + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val()) + + // Try to write during failover - should return TRYAGAIN in blocking states + // Poll for blocking state (pause_write, wait_sync, or switching) + for i := 0; i < 50; i++ { + time.Sleep(50 * time.Millisecond) + err := masterClient.Set(ctx, "testkey", "newvalue", 0).Err() + if err != nil && (strings.Contains(err.Error(), "TRYAGAIN") || strings.Contains(err.Error(), "Failover in progress")) { + break + } + // Check if failover already completed + info := masterClient.ClusterInfo(ctx).Val() + if strings.Contains(info, "cluster_failover_state:success") { + break + } + } + // At least one write should have been blocked, or failover completed very quickly + waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second) + + // After success, writes should return MOVED + require.ErrorContains(t, masterClient.Set(ctx, "testkey", "newvalue2", 0).Err(), "MOVED") + }) + + // Test Case 3.4: Read Requests During Failover + t.Run("FAILOVER - Read requests not blocked during failover", func(t *testing.T) { + // Reset state + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + require.NoError(t, masterClient.Set(ctx, "readkey", "readvalue", 0).Err()) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000").Val()) + + // Reads should work during failover (not blocked) + // Try reading multiple times during failover + for i := 0; i < 10; i++ { + time.Sleep(100 * time.Millisecond) + val := masterClient.Get(ctx, "readkey").Val() + require.Equal(t, "readvalue", val) + // Check if failover completed + info := masterClient.ClusterInfo(ctx).Val() + if strings.Contains(info, "cluster_failover_state:success") { + break + } + } + waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second) + }) +} + +// TestFailoverWithAuth tests failover with password authentication. +// Test Case 1.4: Failover with Password Authentication - Cluster configured with requirepass +func TestFailoverWithAuth(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "requirepass": "password123", + }) + defer func() { master.Close() }() + masterClient := master.NewClient() + masterClient = redis.NewClient(&redis.Options{ + Addr: master.HostPort(), + Password: "password123", + }) + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := startServerWithSanitizedName(t, map[string]string{ + "cluster-enabled": "yes", + "requirepass": "password123", + "masterauth": "password123", + }) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + slaveClient = redis.NewClient(&redis.Options{ + Addr: slave.HostPort(), + Password: "password123", + }) + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test Case 1.4: Failover with Password Authentication + t.Run("FAILOVER - Failover with authentication", func(t *testing.T) { + require.NoError(t, masterClient.Set(ctx, "authkey", "authvalue", 0).Err()) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + + // Verify data on new master + require.Equal(t, "authvalue", slaveClient.Get(ctx, "authkey").Val()) + }) +} + +// TestFailoverStateQuery tests querying failover state information. +// Test Case 4.1: CLUSTER INFO State Output - Query failover state and verify all state transitions +func TestFailoverStateQuery(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test Case 4.1: CLUSTER INFO State Output + t.Run("FAILOVER - Query failover state via CLUSTER INFO", func(t *testing.T) { + // Initial state should be none + info := masterClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:none") + + // Start failover + result := masterClient.Do(ctx, "clusterx", "failover", slaveID) + if result.Err() != nil { + t.Logf("FAILOVER command error: %v", result.Err()) + } + require.NoError(t, result.Err(), "FAILOVER command should succeed") + require.Equal(t, "OK", result.Val()) + + // Wait for success + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + + // Verify state is success + info = masterClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:success") + }) + + // Test Case 4.1: All State Transitions + t.Run("FAILOVER - All state transitions", func(t *testing.T) { + // Reset state + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Initial state: none + info := masterClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:none") + + // Start failover + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + + // We may catch intermediate states, but they're very fast + // The important thing is we transition through them and end at success + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + + // Verify final state + info = masterClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:success") + }) +} + +// TestFailoverTakeoverCommand tests the TAKEOVER command handling on slave. +// Test Case 5.2: TAKEOVER Command Processing - Slave receives and processes TAKEOVER command +func TestFailoverTakeoverCommand(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test Case 5.2: TAKEOVER Command Processing + t.Run("FAILOVER - TAKEOVER command on slave", func(t *testing.T) { + // Slave should accept TAKEOVER command + require.Equal(t, "OK", slaveClient.Do(ctx, "clusterx", "takeover").Val()) + + // Verify imported slots are set + // After takeover, slave should be able to serve the slots + require.NoError(t, masterClient.Set(ctx, "takeoverkey", "takeovervalue", 0).Err()) + + // Start failover to test the full flow + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "2").Err()) + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + }) +} + +// TestFailoverDataConsistency tests data consistency after failover. +// Test Case 5.4: Data Consistency Verification - All data is replicated to new master without loss +func TestFailoverDataConsistency(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test Case 5.4: Data Consistency Verification + t.Run("FAILOVER - Data consistency after failover", func(t *testing.T) { + // Write various types of data + require.NoError(t, masterClient.Set(ctx, "string_key", "string_value", 0).Err()) + require.NoError(t, masterClient.LPush(ctx, "list_key", "item1", "item2", "item3").Err()) + require.NoError(t, masterClient.HSet(ctx, "hash_key", "field1", "value1", "field2", "value2").Err()) + require.NoError(t, masterClient.SAdd(ctx, "set_key", "member1", "member2").Err()) + require.NoError(t, masterClient.ZAdd(ctx, "zset_key", redis.Z{Score: 1.0, Member: "member1"}).Err()) + + // Start failover with longer timeout to ensure data sync + result := masterClient.Do(ctx, "clusterx", "failover", slaveID, "10000") + if result.Err() != nil { + t.Logf("FAILOVER command error: %v", result.Err()) + } + require.NoError(t, result.Err(), "FAILOVER command should succeed") + require.Equal(t, "OK", result.Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 15*time.Second) + + // Verify all data is on new master + require.Equal(t, "string_value", slaveClient.Get(ctx, "string_key").Val()) + require.EqualValues(t, []string{"item3", "item2", "item1"}, slaveClient.LRange(ctx, "list_key", 0, -1).Val()) + require.Equal(t, map[string]string{"field1": "value1", "field2": "value2"}, slaveClient.HGetAll(ctx, "hash_key").Val()) + require.EqualValues(t, []string{"member1", "member2"}, slaveClient.SMembers(ctx, "set_key").Val()) + require.EqualValues(t, []redis.Z{{Score: 1.0, Member: "member1"}}, slaveClient.ZRangeWithScores(ctx, "zset_key", 0, -1).Val()) + }) +} + +// TestFailoverStateReset tests failover state reset after topology update. +// Test Case 5.1: SETNODES Reset State - Controller updates topology and resets failover state +func TestFailoverStateReset(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test Case 5.1: SETNODES Reset State + t.Run("FAILOVER - State reset after SETNODES", func(t *testing.T) { + // Start and complete failover + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + waitForFailoverState(t, masterClient, FailoverStateSuccess, 10*time.Second) + + // Update topology (simulating controller update) + newClusterNodes := fmt.Sprintf("%s %s %d slave %s\n", masterID, master.Host(), master.Port(), slaveID) + newClusterNodes += fmt.Sprintf("%s %s %d master - 0-16383", slaveID, slave.Host(), slave.Port()) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", newClusterNodes, "2").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", newClusterNodes, "2").Err()) + + // State should be reset to none + // After SETNODES, the original master becomes slave, and original slave becomes master + // Wait for replication relationship to be re-established + // The new master (slaveClient) should have a slave connection + require.Eventually(t, func() bool { + info := slaveClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 20*time.Second, 200*time.Millisecond) + + // Verify failover state is reset to none on the new master + info := slaveClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:none") + }) +} + +// Helper functions + +func waitForFailoverState(t testing.TB, client *redis.Client, state FailoverState, timeout time.Duration) { + var lastInfo string + require.Eventually(t, func() bool { + info := client.ClusterInfo(context.Background()).Val() + if info != lastInfo && strings.Contains(info, "cluster_failover_state:") { + // Log state changes for debugging + t.Logf("Failover state: %s", info) + lastInfo = info + } + return strings.Contains(info, fmt.Sprintf("cluster_failover_state:%s", state)) + }, timeout, 100*time.Millisecond) +} + +// TestFailoverSlaveNotConnected tests failover to a slave that is not connected. +// Test Case 2.2: Slave Not Connected - Slave node exists in topology but no replication connection +func TestFailoverSlaveNotConnected(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + + // Set up cluster topology but don't establish replication + // Slave exists in topology but is not connected + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + // Try to failover to unconnected slave + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second) + + // Verify error state + info := masterClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:failed") +} + +// TestFailoverWaitSyncTimeout tests failover timeout when waiting for replication sync. +// Test Case 2.6: Wait Sync Timeout - waitReplicationSync exceeds timeout +func TestFailoverWaitSyncTimeout(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Write some data to create lag + for i := 0; i < 100; i++ { + require.NoError(t, masterClient.Set(ctx, fmt.Sprintf("key%d", i), fmt.Sprintf("value%d", i), 0).Err()) + } + + // Start failover with very short timeout to trigger timeout + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID, "1").Val()) + waitForFailoverState(t, masterClient, FailoverStateFailed, 5*time.Second) + + // Verify timeout error + info := masterClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:failed") +} + +// TestFailoverAuthFailure tests failover with incorrect authentication. +// Test Case 2.8: AUTH Failed - Password Incorrect - requirepass configured but password is wrong +func TestFailoverAuthFailure(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "requirepass": "correctpass", + }) + defer func() { master.Close() }() + masterClient := master.NewClient() + masterClient = redis.NewClient(&redis.Options{ + Addr: master.HostPort(), + Password: "correctpass", + }) + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + // Slave with different password (simulating auth failure scenario) + // Note: In real scenario, master would try to connect with its own password + // but if slave has different password, AUTH would fail + // However, in our test setup, both use same config, so we test with wrong password scenario + slave := util.StartServer(t, map[string]string{ + "cluster-enabled": "yes", + "requirepass": "wrongpass", // Different password + }) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + slaveClient = redis.NewClient(&redis.Options{ + Addr: slave.HostPort(), + Password: "wrongpass", + }) + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + // Note: Replication won't establish due to password mismatch, but failover will try + // The failover will fail when trying to send TAKEOVER command with wrong password + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + // This should fail during TAKEOVER due to AUTH failure + waitForFailoverState(t, masterClient, FailoverStateFailed, 10*time.Second) + + info := masterClient.ClusterInfo(ctx).Val() + require.Contains(t, info, "cluster_failover_state:failed") +} + +// TestFailoverStateTransitions tests observing various failover state transitions. +// Test Case 4.1: State Transitions - Verify failover progresses through expected states +func TestFailoverStateTransitions(t *testing.T) { + ctx := context.Background() + + master := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { master.Close() }() + masterClient := master.NewClient() + defer func() { require.NoError(t, masterClient.Close()) }() + masterID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx00" + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODEID", masterID).Err()) + + slave := util.StartServer(t, map[string]string{"cluster-enabled": "yes"}) + defer func() { slave.Close() }() + slaveClient := slave.NewClient() + defer func() { require.NoError(t, slaveClient.Close()) }() + slaveID := "xxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxxx01" + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODEID", slaveID).Err()) + + clusterNodes := fmt.Sprintf("%s %s %d master - 0-16383\n", masterID, master.Host(), master.Port()) + clusterNodes += fmt.Sprintf("%s %s %d slave %s", slaveID, slave.Host(), slave.Port(), masterID) + require.NoError(t, masterClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + require.NoError(t, slaveClient.Do(ctx, "clusterx", "SETNODES", clusterNodes, "1").Err()) + + require.Eventually(t, func() bool { + info := masterClient.Info(ctx, "replication").Val() + return strings.Contains(info, "connected_slaves:1") + }, 10*time.Second, 100*time.Millisecond) + + // Test Case 4.1: State Transitions + t.Run("FAILOVER - Verify all possible states appear", func(t *testing.T) { + // Start failover + require.Equal(t, "OK", masterClient.Do(ctx, "clusterx", "failover", slaveID).Val()) + + // Poll for states - we may catch intermediate states + statesSeen := make(map[FailoverState]bool) + for i := 0; i < 100; i++ { + time.Sleep(50 * time.Millisecond) + info := masterClient.ClusterInfo(ctx).Val() + if strings.Contains(info, "cluster_failover_state:none") { + statesSeen[FailoverStateNone] = true + } + if strings.Contains(info, "cluster_failover_state:started") { + statesSeen[FailoverStateStarted] = true + } + if strings.Contains(info, "cluster_failover_state:check_slave") { + statesSeen[FailoverStateCheckSlave] = true + } + if strings.Contains(info, "cluster_failover_state:pause_write") { + statesSeen[FailoverStatePauseWrite] = true + } + if strings.Contains(info, "cluster_failover_state:wait_sync") { + statesSeen[FailoverStateWaitSync] = true + } + if strings.Contains(info, "cluster_failover_state:switching") { + statesSeen[FailoverStateSwitching] = true + } + if strings.Contains(info, "cluster_failover_state:success") { + statesSeen[FailoverStateSuccess] = true + break + } + if strings.Contains(info, "cluster_failover_state:failed") { + statesSeen[FailoverStateFailed] = true + break + } + } + + // We should at least see success or failed + require.True(t, statesSeen[FailoverStateSuccess] || statesSeen[FailoverStateFailed], + "Should reach either success or failed state") + }) +}