From be7654b5e077075718b2f7615e1374ef56cc0609 Mon Sep 17 00:00:00 2001 From: Zeke Gabrielse Date: Tue, 10 Feb 2026 12:36:00 -0600 Subject: [PATCH 1/2] add base for online cutover lib --- config/initializers/online_cutover.rb | 12 + lib/online_cutover.rb | 412 +++++++++++++++++++++++ lib/tasks/online_cutover.rake | 94 ++++++ spec/lib/online_cutover_spec.rb | 452 ++++++++++++++++++++++++++ 4 files changed, 970 insertions(+) create mode 100644 config/initializers/online_cutover.rb create mode 100644 lib/online_cutover.rb create mode 100644 lib/tasks/online_cutover.rake create mode 100644 spec/lib/online_cutover_spec.rb diff --git a/config/initializers/online_cutover.rb b/config/initializers/online_cutover.rb new file mode 100644 index 0000000000..530a05793f --- /dev/null +++ b/config/initializers/online_cutover.rb @@ -0,0 +1,12 @@ +# frozen_string_literal: true + +require_dependency Rails.root / 'lib' / 'online_cutover' + +OnlineCutover.configure do |config| + config.enabled = Keygen.server? || Keygen.worker? + + config.replica_available = -> { Keygen.database.read_replica_available? } + config.replica_enabled = -> { Keygen.database.read_replica_enabled? } + + config.quiesce_timeout = ENV.fetch('ONLINE_CUTOVER_QUIESCE_TIMEOUT') { 30 }.to_i +end diff --git a/lib/online_cutover.rb b/lib/online_cutover.rb new file mode 100644 index 0000000000..a10d8048e5 --- /dev/null +++ b/lib/online_cutover.rb @@ -0,0 +1,412 @@ +# frozen_string_literal: true + +require 'active_support' +require 'active_record' +require 'singleton' +require 'json' + +# coordinates zero-downtime database cutover across web and worker processes for a promoted replica +# +# == Cutover procedure +# +# 1. run `rake online_cutover:quiesce` to pause new work and complete in-flight work +# 2. manually monitor replication lag until 0 +# 3. manually promote replica +# 4. run `rake online_cutover:promote` to route all traffic to promoted replica +# 5. run `rake online_cutover:resume` to resume work +# +# == Abort procedure +# +# run `rake online_cutover:abort` to route all traffic to primary +# +module OnlineCutover + class QuiesceTimeoutError < StandardError; end + class InvalidPhaseError < StandardError; end + class InvalidRoutingError < StandardError; end + + REDIS_KEY_PREFIX = 'online_cutover' + REDIS_STATE_KEY = "#{REDIS_KEY_PREFIX}:state" + REDIS_CHANNEL = "#{REDIS_KEY_PREFIX}:notifications" + + PHASE_NORMAL = 'normal' + PHASE_QUIESCING = 'quiescing' + PHASES = [PHASE_NORMAL, PHASE_QUIESCING].freeze + + ROUTING_NORMAL = 'normal' + ROUTING_PROMOTED = 'promoted' + ROUTING_ABORTED = 'aborted' + ROUTINGS = [ROUTING_NORMAL, ROUTING_PROMOTED, ROUTING_ABORTED].freeze + + SHARD_NORMAL = 'normal' + SHARD_PROMOTED = 'promoted' + SHARD_ABORTED = 'aborted' + + class Configuration + # the primary database key that will be replaced + attr_accessor :primary_database_key + + # the replica database key that will be promoted + attr_accessor :replica_database_key + + # how long to block requests during quiescing phase before timing out + attr_accessor :quiesce_timeout + + # ttl for redis state keys (should be longer than any expected cutover) + attr_accessor :state_ttl + + # whether the replica is available + attr_writer :replica_available + + # whether the replica is enabled + attr_writer :replica_enabled + + # whether to enable cutover functionality + attr_writer :enabled + + # whether to enable debug mode + attr_writer :debug + + def initialize + @primary_database_key = :primary + @replica_database_key = :replica + @quiesce_timeout = 30.seconds + @state_ttl = 1.hour + @replica_available = true + @replica_enabled = true + @enabled = true + @debug = Rails.env.local? + end + + def replica_available? = (@replica_available in Proc) ? @replica_available.call : !!@replica_available + def replica_enabled? = (@replica_enabled in Proc) ? @replica_enabled.call : !!@replica_enabled + def enabled? = (@enabled in Proc) ? @enabled.call : !!@enabled + def debug? = (@debug in Proc) ? @debug.call : !!@debug + end + + class << self + def configuration = @configuration ||= Configuration.new + def configure + yield configuration + end + + # helper for accessing state singleton + def state = State.instance + + def current_phase = state.phase.to_s.inquiry + def current_routing = state.routing.to_s.inquiry + def current_shard + shard = case current_routing + when ROUTING_PROMOTED then SHARD_PROMOTED + when ROUTING_ABORTED then SHARD_ABORTED + else SHARD_NORMAL + end + + shard.to_s.inquiry + end + + # state mutations used by operational rake tasks + def set_phase!(phase) + raise InvalidPhaseError, "invalid phase: #{phase}" unless PHASES.include?(phase) + + state.set_phase!(phase) + end + + def set_routing!(routing) + raise InvalidRoutingError, "invalid routing: #{routing}" unless ROUTINGS.include?(routing) + + state.set_routing!(routing) + end + + # whether cutover is available i.e. replica available and enabled + def available? = configuration.replica_available? && configuration.replica_enabled? + + # whether cutover is enabled + def enabled? = configuration.enabled? + + # returns whether cutover is started i.e. we'er not in a nominal state + def started? = !current_phase.normal? || !current_routing.normal? + + def status + { + quiesce_timeout: configuration.quiesce_timeout, + phase: current_phase, + routing: current_routing, + shard: current_shard, + available: available?, + enabled: enabled?, + started: started?, + } + end + end + + class State + DEFAULT_STATE = { phase: PHASE_NORMAL, routing: ROUTING_NORMAL }.freeze + + include Singleton + + attr_reader :phase, + :routing + + def initialize + @mutex = Mutex.new + @phase = DEFAULT_STATE[:phase] + @routing = DEFAULT_STATE[:routing] + end + + def set_phase!(phase) + @mutex.synchronize do + state = current_state.merge(phase:) + + redis do |conn| + conn.multi do |txn| + txn.set(REDIS_STATE_KEY, JSON.generate(state), ex: OnlineCutover.configuration.state_ttl.to_i) + txn.publish(REDIS_CHANNEL, JSON.generate(event: 'phase.changed', phase:)) + end + end + + @phase = phase + end + end + + def set_routing!(routing) + @mutex.synchronize do + state = current_state.merge(routing:) + + redis do |conn| + conn.multi do |txn| + txn.set(REDIS_STATE_KEY, JSON.generate(state), ex: OnlineCutover.configuration.state_ttl.to_i) + txn.publish(REDIS_CHANNEL, JSON.generate(event: 'routing.changed', routing:)) + end + end + + @routing = routing + end + end + + def sync_from_redis! + @mutex.synchronize do + state = current_state + + @phase = state[:phase] + @routing = state[:routing] + end + end + + def update_local_phase(phase) + @mutex.synchronize { @phase = phase } + end + + def update_local_routing(routing) + @mutex.synchronize { @routing = routing } + end + + # helper for resetting state between tests + def reset! + @phase = DEFAULT_STATE[:phase] + @routing = DEFAULT_STATE[:routing] + end + + private + + attr_writer :phase, + :routing + + def current_state + if state = redis { it.get(REDIS_STATE_KEY) } + JSON.parse(state, symbolize_names: true) + else + DEFAULT_STATE + end + end + + def redis(&) = Rails.cache.redis.then(&) + end + + module Model + extend ActiveSupport::Concern + + class_methods do + def connects_to_with_cutover(config: OnlineCutover.configuration) + primary = config.primary_database_key + replica = config.replica_database_key + + # NB(ezekg) we're using shards to leverage ActiveRecord's connected_to(shard:) for dynamic + # connection switching during cutover, not for traditional horizontal sharding. + connects_to shards: { + SHARD_NORMAL => { writing: primary, reading: replica }, + SHARD_PROMOTED => { writing: replica, reading: replica }, + SHARD_ABORTED => { writing: primary, reading: primary }, + } + end + end + end + + class Middleware + POLL_INTERVAL = 0.1.seconds + + def initialize(app) + @app = app + end + + def call(env) + if OnlineCutover.current_phase.quiescing? + wait_for_resume_or_timeout! + end + + ActiveRecord::Base.connected_to(shard: OnlineCutover.current_shard) do + @app.call(env) + end + rescue QuiesceTimeoutError + [ + 503, + { 'Content-Type' => 'text/plain', 'Retry-After' => '5' }, + ['Service Unavailable'], + ] + end + + private + + def wait_for_resume_or_timeout! + timeout = OnlineCutover.configuration.quiesce_timeout + deadline = Time.current + timeout + + while OnlineCutover.current_phase.quiescing? + if Time.current > deadline + raise QuiesceTimeoutError, "Quiesce timeout exceeded (#{timeout}s)" + end + + sleep POLL_INTERVAL + end + end + end + + class SidekiqMiddleware + POLL_INTERVAL = 0.1.seconds + + def call(worker, job, queue) + if OnlineCutover.current_phase.quiescing? + wait_for_resume_or_retry! + end + + ActiveRecord::Base.connected_to(shard: OnlineCutover.current_shard) do + yield + end + end + + private + + def wait_for_resume_or_retry! + timeout = OnlineCutover.configuration.quiesce_timeout + deadline = Time.current + timeout + + while OnlineCutover.current_phase.quiescing? + if Time.current > deadline + raise QuiesceTimeoutError, "Quiesce timeout exceeded (#{timeout}s) - job will be retried" + end + + sleep POLL_INTERVAL + end + end + end + + class Subscriber + include Singleton + + def initialize + @thread = nil + end + + def start + return if @thread&.alive? + + State.instance.sync_from_redis! # initial state + + @thread = Thread.new { subscribe } + @thread.name = 'online_cutover_subscriber' + @thread.abort_on_exception = true + end + + def stop + @thread&.kill + @thread = nil + end + + def running? + !!@thread&.alive? + end + + private + + def subscribe + redis do |conn| + conn.subscribe(REDIS_CHANNEL) do |on| + on.message do |channel, message| + handle_message(message) + end + end + end + end + + def handle_message(raw_message) + message = JSON.parse(raw_message, symbolize_names: true) + + case message[:event] + when 'phase.changed' + State.instance.update_local_phase(message[:phase]) + when 'routing.changed' + State.instance.update_local_routing(message[:routing]) + end + end + + def redis(&) = Rails.cache.redis.then(&) + end + + class Railtie < Rails::Railtie + initializer 'online_cutover.middleware' do |app| + app.config.after_initialize do + next unless OnlineCutover.enabled? + + app.middleware.insert_before( + ActiveRecord::Middleware::DatabaseSelector, + OnlineCutover::Middleware + ) + end + end + + initializer 'online_cutover.active_record' do + ActiveSupport.on_load(:active_record) do + # configure shard-based connection switching for all models (still overridable per-model) + Rails.application.config.after_initialize do + next unless OnlineCutover.enabled? + + ApplicationRecord.include OnlineCutover::Model + ApplicationRecord.connects_to_with_cutover + end + end + end + + initializer 'online_cutover.sidekiq' do + next unless defined?(Sidekiq) + + Sidekiq.configure_server do |config| + config.server_middleware do |chain| + chain.add OnlineCutover::SidekiqMiddleware + end + end + end + + config.after_initialize do + next unless OnlineCutover.enabled? + + # start Pub/Sub subscriber + OnlineCutover::Subscriber.instance.start + end + + config.after_initialize do + # graceful shutdown + at_exit do + OnlineCutover::Subscriber.instance.stop if OnlineCutover::Subscriber.instance.running? + end + end + end +end diff --git a/lib/tasks/online_cutover.rake b/lib/tasks/online_cutover.rake new file mode 100644 index 0000000000..7b86e6cdc0 --- /dev/null +++ b/lib/tasks/online_cutover.rake @@ -0,0 +1,94 @@ +# frozen_string_literal: true + +namespace :online_cutover do + desc 'print current status' + task status: :environment do + status = OnlineCutover.status + + puts "phase: #{status[:phase]}" + puts "routing: #{status[:routing]}" + puts "started: #{status[:started]}" + puts "quiesce timeout: #{status[:quiesce_timeout].to_i}s" + end + + desc 'set quiescing phase - blocks new work' + task quiesce: :environment do + if OnlineCutover.current_phase.quiescing? + puts 'already in quiescing phase.' + + exit 0 + end + + puts 'setting phase to quiescing...' + + OnlineCutover.set_phase!(OnlineCutover::PHASE_QUIESCING) + + puts "phase is now: #{OnlineCutover.current_phase}" + puts + puts 'new requests/jobs will now block before acquiring database connections. next:' + puts + puts '1. wait for existing work to complete' + puts '2. wait for replication lag to be 0' + puts '3. perform the database promotion' + puts + puts 'after promotion, run: rake online_cutover:promote' + end + + desc 'set promoted routing - all traffic routed to promoted replica' + task promote: :environment do + unless OnlineCutover.current_phase.quiescing? + warn "WARNING: system is not in quiescing phase (current: #{OnlineCutover.current_phase})" + exit 1 + end + + puts 'setting routing to promoted...' + + OnlineCutover.set_routing!(OnlineCutover::ROUTING_PROMOTED) + + puts "routing is now: #{OnlineCutover.current_routing}" + puts + puts 'all database traffic now routes to the promoted replica.' + puts + puts 'to resume normal traffic, run: rake online_cutover:resume' + end + + desc 'resume normal operations on promoted database' + task resume: :environment do + unless OnlineCutover.current_routing.promoted? + warn "WARNING: system is not in promoted routing (current: #{OnlineCutover.current_routing})" + exit 1 + end + + puts 'resuming traffic...' + + OnlineCutover.set_phase!(OnlineCutover::PHASE_NORMAL) + + puts "phase is now: #{OnlineCutover.current_phase}" + puts + puts 'traffic has resumed. Cutover complete.' + end + + desc 'abort cutover - all traffic routed to primary' + task abort: :environment do + puts 'aborting cutover...' + + OnlineCutover.set_routing!(OnlineCutover::ROUTING_ABORTED) + OnlineCutover.set_phase!(OnlineCutover::PHASE_NORMAL) + + puts "routing is now: #{OnlineCutover.current_routing}" + puts "phase is now: #{OnlineCutover.current_phase}" + puts + puts 'cutover aborted. All traffic routed to primary.' + puts 'the replica will NOT be used.' + end + + desc 'reset cutover - write traffic routed to primary with reads routed to replica' + task reset: :environment do + puts 'resetting cutover state...' + + OnlineCutover.set_routing!(OnlineCutover::ROUTING_NORMAL) + OnlineCutover.set_phase!(OnlineCutover::PHASE_NORMAL) + + puts 'state reset to defaults (phase: normal, routing: normal).' + end +end diff --git a/spec/lib/online_cutover_spec.rb b/spec/lib/online_cutover_spec.rb new file mode 100644 index 0000000000..4dfa016133 --- /dev/null +++ b/spec/lib/online_cutover_spec.rb @@ -0,0 +1,452 @@ +# frozen_string_literal: true + +require 'rails_helper' +require 'spec_helper' + +require_dependency Rails.root / 'lib' / 'online_cutover' + +describe OnlineCutover do + let(:redis) { Rails.cache.redis } + + after do + OnlineCutover.state.reset! + end + + describe '.configure' do + around do |example| + config_was = described_class.instance_variable_get(:@configuration) + described_class.instance_variable_set(:@configuration, nil) + + example.run + ensure + described_class.instance_variable_set(:@configuration, config_was) + end + + it 'should yield configuration' do + described_class.configure do |config| + config.quiesce_timeout = 60.seconds + config.state_ttl = 2.hours + end + + expect(described_class.configuration.quiesce_timeout).to eq(60.seconds) + expect(described_class.configuration.state_ttl).to eq(2.hours) + end + + it 'should have sensible defaults' do + expect(described_class.configuration.quiesce_timeout).to eq(30.seconds) + expect(described_class.configuration.state_ttl).to eq(1.hour) + end + + it 'should support proc for debug' do + described_class.configure { it.debug = -> { true } } + + expect(described_class.configuration.debug?).to be(true) + end + end + + describe 'phase transitions' do + it 'should default to normal phase' do + expect(described_class.current_phase).to eq(OnlineCutover::PHASE_NORMAL) + end + + it 'should transition to quiescing' do + described_class.set_phase!(OnlineCutover::PHASE_QUIESCING) + + expect(described_class.current_phase).to eq(OnlineCutover::PHASE_QUIESCING) + expect(described_class.current_phase.quiescing?).to be(true) + expect(described_class.current_phase.normal?).to be(false) + end + + it 'should transition back to normal' do + described_class.set_phase!(OnlineCutover::PHASE_QUIESCING) + described_class.set_phase!(OnlineCutover::PHASE_NORMAL) + + expect(described_class.current_phase).to eq(OnlineCutover::PHASE_NORMAL) + expect(described_class.current_phase.normal?).to be(true) + end + + it 'should persist to Redis' do + described_class.set_phase!(OnlineCutover::PHASE_QUIESCING) + + raw = redis.then { it.get(OnlineCutover::REDIS_STATE_KEY) } + state = JSON.parse(raw, symbolize_names: true) + + expect(state[:phase]).to eq(OnlineCutover::PHASE_QUIESCING) + end + + it 'should reject invalid phases' do + expect { described_class.set_phase!('invalid') }.to raise_error(OnlineCutover::InvalidPhaseError) + end + end + + describe 'routing transitions' do + it 'should default to current routing' do + expect(described_class.current_routing).to eq(OnlineCutover::ROUTING_NORMAL) + end + + it 'should transition to promoted' do + described_class.set_routing!(OnlineCutover::ROUTING_PROMOTED) + + expect(described_class.current_routing).to eq(OnlineCutover::ROUTING_PROMOTED) + expect(described_class.current_routing.promoted?).to be(true) + expect(described_class.current_routing.normal?).to be(false) + end + + it 'should transition to aborted' do + described_class.set_routing!(OnlineCutover::ROUTING_ABORTED) + + expect(described_class.current_routing).to eq(OnlineCutover::ROUTING_ABORTED) + expect(described_class.current_routing.aborted?).to be(true) + end + + it 'should persist to Redis' do + described_class.set_routing!(OnlineCutover::ROUTING_PROMOTED) + + raw = redis.then { it.get(OnlineCutover::REDIS_STATE_KEY) } + state = JSON.parse(raw, symbolize_names: true) + + expect(state[:routing]).to eq(OnlineCutover::ROUTING_PROMOTED) + end + + it 'should reject invalid routings' do + expect { described_class.set_routing!('invalid') }.to raise_error(OnlineCutover::InvalidRoutingError) + end + end + + describe '.started?' do + it 'should return false when in nominal state' do + expect(described_class.started?).to be(false) + end + + it 'should return true when quiescing' do + described_class.set_phase!(OnlineCutover::PHASE_QUIESCING) + + expect(described_class.started?).to be(true) + end + + it 'should return true when promoted' do + described_class.set_routing!(OnlineCutover::ROUTING_PROMOTED) + + expect(described_class.started?).to be(true) + end + + it 'should return true when aborted' do + described_class.set_routing!(OnlineCutover::ROUTING_ABORTED) + + expect(described_class.started?).to be(true) + end + end + + describe '.status' do + it 'should return current state' do + status = described_class.status + + expect(status[:phase]).to eq(OnlineCutover::PHASE_NORMAL) + expect(status[:routing]).to eq(OnlineCutover::ROUTING_NORMAL) + expect(status[:started]).to be(false) + expect(status[:quiesce_timeout]).to eq(30.seconds) + end + end + + describe OnlineCutover::State do + describe '#sync_from_redis!' do + it 'should sync phase from Redis' do + redis.then do |conn| + conn.set(OnlineCutover::REDIS_STATE_KEY, JSON.generate(phase: OnlineCutover::PHASE_QUIESCING, routing: OnlineCutover::ROUTING_NORMAL)) + end + + described_class.instance.sync_from_redis! + + expect(OnlineCutover.current_phase).to eq(OnlineCutover::PHASE_QUIESCING) + end + + it 'should sync routing from Redis' do + redis.then do |conn| + conn.set(OnlineCutover::REDIS_STATE_KEY, JSON.generate(phase: OnlineCutover::PHASE_NORMAL, routing: OnlineCutover::ROUTING_PROMOTED)) + end + + described_class.instance.sync_from_redis! + + expect(OnlineCutover.current_routing).to eq(OnlineCutover::ROUTING_PROMOTED) + end + + it 'should handle missing key gracefully' do + redis.then { it.del(OnlineCutover::REDIS_STATE_KEY) } + + described_class.instance.sync_from_redis! + + expect(OnlineCutover.current_phase).to eq(OnlineCutover::PHASE_NORMAL) + expect(OnlineCutover.current_routing).to eq(OnlineCutover::ROUTING_NORMAL) + end + end + + describe '#update_local_phase' do + it 'should update local phase without touching Redis' do + described_class.instance.update_local_phase(OnlineCutover::PHASE_QUIESCING) + + expect(OnlineCutover.current_phase).to eq(OnlineCutover::PHASE_QUIESCING) + + # Redis should still have default + raw = redis.then { it.get(OnlineCutover::REDIS_STATE_KEY) } + expect(raw).to be_nil + end + end + + describe '#update_local_routing' do + it 'should update local routing without touching Redis' do + described_class.instance.update_local_routing(OnlineCutover::ROUTING_PROMOTED) + + expect(OnlineCutover.current_routing).to eq(OnlineCutover::ROUTING_PROMOTED) + + # Redis should still have default + raw = redis.then { it.get(OnlineCutover::REDIS_STATE_KEY) } + expect(raw).to be_nil + end + end + end + + describe OnlineCutover::Middleware do + let(:app) { ->(env) { [200, { 'Content-Type' => 'text/plain' }, ['OK']] } } + let(:middleware) { described_class.new(app) } + let(:env) { Rack::MockRequest.env_for('/test') } + + context 'when not quiescing' do + it 'should pass through normally' do + status, headers, body = middleware.call(env) + + expect(status).to eq(200) + expect(headers).to include('Content-Type' => 'text/plain') + expect(body).to eq(['OK']) + end + end + + context 'when quiescing' do + before do + OnlineCutover.set_phase!(OnlineCutover::PHASE_QUIESCING) + end + + it 'should block until resumed' do + Thread.new do + sleep 0.1 # resume after a brief delay + + OnlineCutover.set_phase!(OnlineCutover::PHASE_NORMAL) + end + + status, headers, body = middleware.call(env) + + expect(status).to eq(200) + end + + context 'when timeout expires' do + around do |example| + timeout_was, OnlineCutover.configuration.quiesce_timeout = OnlineCutover.configuration.quiesce_timeout, 0.1.seconds + + example.run + ensure + OnlineCutover.configuration.quiesce_timeout = timeout_was + end + + it 'should return 503' do + status, headers, body = middleware.call(env) + + expect(status).to eq(503) + expect(headers).to include('Content-Type' => 'text/plain', 'Retry-After' => '5') + expect(body).to eq(['Service Unavailable']) + end + end + end + + context 'with shard routing' do + let(:app) { ->(env) { [200, {}, [ActiveRecord::Base.current_shard.to_s]] } } + + context 'when routing is default' do + it 'should use current shard' do + status, headers, body = middleware.call(env) + + expect(body.first).to eq('normal') + end + end + + context 'when routing is current' do + before do + OnlineCutover.set_routing!(OnlineCutover::ROUTING_NORMAL) + end + + it 'should use current shard' do + status, headers, body = middleware.call(env) + + expect(body.first).to eq(OnlineCutover::ROUTING_NORMAL) + end + end + + context 'when routing is promoted' do + before do + OnlineCutover.set_routing!(OnlineCutover::ROUTING_PROMOTED) + end + + it 'should use promoted shard' do + status, headers, body = middleware.call(env) + + expect(body.first).to eq(OnlineCutover::ROUTING_PROMOTED) + end + end + + context 'when routing is aborted' do + before do + OnlineCutover.set_routing!(OnlineCutover::ROUTING_ABORTED) + end + + it 'should use aborted shard' do + status, headers, body = middleware.call(env) + + expect(body.first).to eq(OnlineCutover::ROUTING_ABORTED) + end + end + end + end + + describe OnlineCutover::SidekiqMiddleware do + let(:middleware) { described_class.new } + let(:worker) { double('worker') } + let(:job) { {} } + let(:queue) { 'default' } + + context 'when not quiescing' do + it 'should yield to the block' do + yielded = false + + middleware.call(worker, job, queue) { yielded = true } + + expect(yielded).to be(true) + end + end + + context 'when quiescing' do + before do + OnlineCutover.set_phase!(OnlineCutover::PHASE_QUIESCING) + end + + it 'should block until resumed' do + Thread.new do + sleep 0.1 + OnlineCutover.set_phase!(OnlineCutover::PHASE_NORMAL) + end + + yielded = false + middleware.call(worker, job, queue) { yielded = true } + + expect(yielded).to be(true) + end + + context 'when timeout expires' do + around do |example| + config_was = OnlineCutover.configuration.quiesce_timeout + OnlineCutover.configuration.quiesce_timeout = 0.1.seconds + + example.run + ensure + OnlineCutover.configuration.quiesce_timeout = config_was + end + + it 'should raise' do + expect { + middleware.call(worker, job, queue) { } + }.to raise_error(OnlineCutover::QuiesceTimeoutError) + end + end + end + + context 'when routing is normal' do + before do + OnlineCutover.set_routing!(OnlineCutover::ROUTING_NORMAL) + end + + it 'should use normal shard' do + shard = nil + + middleware.call(worker, job, queue) { shard = ActiveRecord::Base.current_shard } + + expect(shard).to eq(OnlineCutover::ROUTING_NORMAL) + end + end + + context 'when routing is promoted' do + before do + OnlineCutover.set_routing!(OnlineCutover::ROUTING_PROMOTED) + end + + it 'should use promoted shard' do + shard = nil + + middleware.call(worker, job, queue) { shard = ActiveRecord::Base.current_shard } + + expect(shard).to eq(OnlineCutover::ROUTING_PROMOTED) + end + end + + context 'when routing is aborted' do + before do + OnlineCutover.set_routing!(OnlineCutover::ROUTING_ABORTED) + end + + it 'should use aborted shard' do + shard = nil + + middleware.call(worker, job, queue) { shard = ActiveRecord::Base.current_shard } + + expect(shard).to eq(OnlineCutover::ROUTING_ABORTED) + end + end + end + + describe '.current_shard' do + it 'should return current shard by default' do + expect(described_class.current_shard).to eq(OnlineCutover::SHARD_NORMAL) + end + + it 'should return promoted shard when promoted' do + OnlineCutover.set_routing!(OnlineCutover::ROUTING_PROMOTED) + + expect(described_class.current_shard).to eq(OnlineCutover::SHARD_PROMOTED) + end + + it 'should return aborted shard when aborted' do + OnlineCutover.set_routing!(OnlineCutover::ROUTING_ABORTED) + + expect(described_class.current_shard).to eq(OnlineCutover::SHARD_ABORTED) + end + end + + describe OnlineCutover::Subscriber do + describe '#start' do + it 'should sync from redis' do + redis.then do |conn| + conn.set(OnlineCutover::REDIS_STATE_KEY, JSON.generate(phase: OnlineCutover::PHASE_QUIESCING, routing: OnlineCutover::ROUTING_NORMAL)) + end + + described_class.instance.start + + expect(OnlineCutover.current_phase).to eq(OnlineCutover::PHASE_QUIESCING) + ensure + described_class.instance.stop + end + + it 'should be running after start' do + described_class.instance.start + + expect(described_class.instance.running?).to be(true) + ensure + described_class.instance.stop + end + end + + describe '#stop' do + it 'should stop the subscriber' do + described_class.instance.start + described_class.instance.stop + + expect(described_class.instance.running?).to be(false) + end + end + end +end From 0932e2fa8abd179c7f04bf3cc16490009d0fdd6c Mon Sep 17 00:00:00 2001 From: Zeke Gabrielse Date: Tue, 10 Feb 2026 16:44:32 -0600 Subject: [PATCH 2/2] stfu sentry --- config/initializers/sentry.rb | 2 ++ 1 file changed, 2 insertions(+) diff --git a/config/initializers/sentry.rb b/config/initializers/sentry.rb index 65ac3b5035..9b7b572a94 100644 --- a/config/initializers/sentry.rb +++ b/config/initializers/sentry.rb @@ -7,5 +7,7 @@ config.profiles_sample_rate = ENV.fetch('SENTRY_PROFILES_SAMPLE_RATE') { 0.0 }.to_f config.excluded_exceptions += ENV.fetch('SENTRY_EXCLUDED_EXCEPTIONS') { '' }.split(',') config.environment = Rails.env + config.sdk_logger = Sentry::Logger.new(STDOUT) + config.sdk_logger.level = Logger::WARN end end