From 78592034b3cea3c167cbfd88574e43063d625c37 Mon Sep 17 00:00:00 2001 From: Jonathan Reams Date: Wed, 17 Jan 2024 13:41:22 -0500 Subject: [PATCH 01/10] Ensure sync client always sends error before UNBIND --- src/realm/sync/config.hpp | 2 + src/realm/sync/noinst/client_impl_base.cpp | 36 +- test/object-store/sync/flx_sync.cpp | 326 +++++++++++------- .../util/sync/flx_sync_harness.hpp | 1 + test/object-store/util/test_utils.hpp | 31 ++ 5 files changed, 249 insertions(+), 147 deletions(-) diff --git a/src/realm/sync/config.hpp b/src/realm/sync/config.hpp index 1ac0c374468..75371cc90ff 100644 --- a/src/realm/sync/config.hpp +++ b/src/realm/sync/config.hpp @@ -131,6 +131,8 @@ enum class SyncClientHookEvent { ErrorMessageReceived, SessionSuspended, BindMessageSent, + IdentMessageSent, + ClientErrorMessageSent, BootstrapBatchAboutToProcess, }; diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index 5f7c03c8b33..cff11a7365a 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -1671,7 +1671,7 @@ void Session::activate() m_download_progress = m_progress.download; REALM_ASSERT_3(m_last_version_available, >=, m_progress.upload.client_version); - logger.debug("last_version_available = %1", m_last_version_available); // Throws + logger.debug("last_version_available = %1", m_last_version_available); // Throws logger.debug("progress_download_server_version = %1", m_progress.download.server_version); // Throws logger.debug("progress_download_client_version = %1", m_progress.download.last_integrated_client_version); // Throws @@ -1754,6 +1754,9 @@ void Session::send_message() REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state); REALM_ASSERT(m_enlisted_to_send); m_enlisted_to_send = false; + if (m_error_to_send && m_bind_message_sent && have_client_file_ident()) + return send_json_error_message(); // Throws + if (m_state == Deactivating || m_error_message_received || m_suspended) { // Deactivation has been initiated. If the UNBIND message has not been // sent yet, there is no point in sending it. Instead, we can let the @@ -1777,6 +1780,11 @@ void Session::send_message() if (!m_bind_message_sent) return send_bind_message(); // Throws + // Stop sending upload, mark and query messages when the client detects an error. + if (m_client_error) { + return; + } + if (!m_ident_message_sent) { if (have_client_file_ident()) send_ident_message(); // Throws @@ -1791,13 +1799,6 @@ void Session::send_message() return send_test_command_message(); } - if (m_error_to_send) - return send_json_error_message(); // Throws - - // Stop sending upload, mark and query messages when the client detects an error. - if (m_client_error) { - return; - } if (m_target_download_mark > m_last_download_mark_sent) return send_mark_message(); // Throws @@ -1943,7 +1944,8 @@ void Session::send_ident_message() m_conn.initiate_write_message(out, this); // Throws m_ident_message_sent = true; - + call_debug_hook(SyncClientHookEvent::IdentMessageSent, m_progress, m_last_sent_flx_query_version, + DownloadBatchState::SteadyState, 0); // Other messages may be waiting to be sent enlist_to_send(); // Throws } @@ -2153,8 +2155,9 @@ void Session::send_unbind_message() void Session::send_json_error_message() { - REALM_ASSERT_EX(m_state == Active, m_state); - REALM_ASSERT(m_ident_message_sent); + REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state); + REALM_ASSERT(m_bind_message_sent); + REALM_ASSERT(have_client_file_ident()); REALM_ASSERT(!m_unbind_message_sent); REALM_ASSERT(m_error_to_send); REALM_ASSERT(m_client_error); @@ -2162,19 +2165,22 @@ void Session::send_json_error_message() ClientProtocol& protocol = m_conn.get_client_protocol(); OutputBuffer& out = m_conn.get_output_buffer(); session_ident_type session_ident = get_ident(); - auto protocol_error = m_client_error->error_for_server; + auto protocol_error = static_cast(m_client_error->error_for_server); auto message = util::format("%1", m_client_error->to_status()); - logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, static_cast(protocol_error), + logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, protocol_error, session_ident); // Throws nlohmann::json error_body_json; error_body_json["message"] = std::move(message); - protocol.make_json_error_message(out, session_ident, static_cast(protocol_error), + protocol.make_json_error_message(out, session_ident, protocol_error, error_body_json.dump()); // Throws m_conn.initiate_write_message(out, this); // Throws - m_error_to_send = false; + + call_debug_hook(SyncClientHookEvent::ClientErrorMessageSent, + ProtocolErrorInfo(protocol_error, message, IsFatal{false})); + enlist_to_send(); // Throws } diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 467d29e318a..9012884934c 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -2665,20 +2665,32 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot SyncConfig::FLXSyncEnabled{}); interrupted_realm_config.cache = false; - auto check_interrupted_state = [&](const DBRef& realm) { - auto tr = realm->start_read(); - auto top_level = tr->get_table("class_TopLevel"); - REQUIRE(top_level); - REQUIRE(top_level->is_empty()); - - auto sub_store = sync::SubscriptionStore::create(realm); - auto version_info = sub_store->get_version_info(); - REQUIRE(version_info.latest == 1); - REQUIRE(version_info.active == 0); - auto latest_subs = sub_store->get_latest(); - REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping); - REQUIRE(latest_subs.size() == 1); - REQUIRE(latest_subs.at(0).object_class_name == "TopLevel"); + auto check_interrupted_state = [&] { + _impl::RealmCoordinator::assert_no_open_realms(); + DBOptions options; + options.encryption_key = test_util::crypt_key(); + auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options); + auto logger = util::Logger::get_default_logger(); + sync::PendingBootstrapStore bootstrap_store(realm, *logger); + REQUIRE(bootstrap_store.has_pending()); + auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16); + REQUIRE(pending_batch.query_version == 1); + REQUIRE(pending_batch.progress); + { + auto tr = realm->start_read(); + auto top_level = tr->get_table("class_TopLevel"); + REQUIRE(top_level); + REQUIRE(top_level->is_empty()); + + auto sub_store = sync::SubscriptionStore::create(realm); + auto version_info = sub_store->get_version_info(); + REQUIRE(version_info.latest == 1); + REQUIRE(version_info.active == 0); + auto latest_subs = sub_store->get_latest(); + REQUIRE(latest_subs.state() == sync::SubscriptionSet::State::Bootstrapping); + REQUIRE(latest_subs.size() == 1); + REQUIRE(latest_subs.at(0).object_class_name == "TopLevel"); + } }; auto mutate_realm = [&] { @@ -2690,7 +2702,9 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot }); }; - SECTION("unknown exception occurs during bootstrap application on session startup") { + SECTION("exception occurs during bootstrap application on session startup") { + enum ExceptionType { Novel, BadChangeset }; + auto exception_to_throw = GENERATE(ExceptionType::Novel, ExceptionType::BadChangeset); { auto [interrupted_promise, interrupted] = util::make_promise_future(); Realm::Config config = interrupted_realm_config; @@ -2727,53 +2741,98 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - _impl::RealmCoordinator::assert_no_open_realms(); - - // Open up the realm without the sync client attached and verify that the realm got interrupted in the state - // we expected it to be in. - { - DBOptions options; - options.encryption_key = test_util::crypt_key(); - auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options); - auto logger = util::Logger::get_default_logger(); - sync::PendingBootstrapStore bootstrap_store(realm, *logger); - REQUIRE(bootstrap_store.has_pending()); - auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16); - REQUIRE(pending_batch.query_version == 1); - REQUIRE(pending_batch.progress); - - check_interrupted_state(realm); - } - - auto error_pf = util::make_promise_future(); - interrupted_realm_config.sync_config->error_handler = - [promise = std::make_shared>(std::move(error_pf.promise))]( - std::shared_ptr, SyncError error) { - promise->emplace_value(std::move(error)); - }; + check_interrupted_state(); + enum class State { + Interrupted, + ExceptionThrown, + ClientErrorMessageSentToServer, + ClientErrorPropagatedToHandler + }; + TestingStateMachine state(State::Interrupted); + + std::optional propagated_error; + interrupted_realm_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { + propagated_error = std::move(error); + state.transition_with([&](State cur_state) -> std::optional { + REQUIRE(cur_state == State::ExceptionThrown); + return State::ClientErrorPropagatedToHandler; + }); + }; interrupted_realm_config.sync_config->on_sync_client_event_hook = - [&, download_message_received = false](std::weak_ptr, - const SyncClientHookData& data) mutable { + [&, download_message_received = false, bind_message_sent = false, + ident_message_sent = false](std::weak_ptr, const SyncClientHookData& data) mutable { if (data.event == SyncClientHookEvent::DownloadMessageReceived) { download_message_received = true; } - if (data.event != SyncClientHookEvent::BootstrapBatchAboutToProcess) { - return SyncClientHookAction::NoAction; + else if (data.event == SyncClientHookEvent::IdentMessageSent) { + ident_message_sent = true; + } + else if (data.event == SyncClientHookEvent::BindMessageSent) { + bind_message_sent = true; + } + else if (data.event == SyncClientHookEvent::BootstrapBatchAboutToProcess) { + REQUIRE(!ident_message_sent); + REQUIRE(!download_message_received); + state.transition_with([&](State cur_state) -> std::optional { + REQUIRE(cur_state == State::Interrupted); + return State::ExceptionThrown; + }); + switch (exception_to_throw) { + case ExceptionType::Novel: + throw NovelException{}; + case ExceptionType::BadChangeset: + throw sync::IntegrationException(ErrorCodes::BadChangeset, "simulated failure"); + } + } + else if (data.event == SyncClientHookEvent::ClientErrorMessageSent) { + REQUIRE(bind_message_sent); + REQUIRE(!ident_message_sent); + state.transition_with([&](State cur_state) -> std::optional { + REQUIRE(cur_state == State::ClientErrorPropagatedToHandler); + return State::ClientErrorMessageSentToServer; + }); } - REQUIRE(!download_message_received); - throw NovelException{}; return SyncClientHookAction::NoAction; }; - auto realm = Realm::get_shared_realm(interrupted_realm_config); - const auto& error = error_pf.future.get(); - REQUIRE(!error.is_fatal); - REQUIRE(error.server_requests_action == sync::ProtocolErrorInfo::Action::Warning); - REQUIRE(error.status == ErrorCodes::UnknownError); - REQUIRE_THAT(error.status.reason(), - Catch::Matchers::ContainsSubstring("Oh no, a really weird exception happened!")); + auto wait_for_client_error = GENERATE(true, false); + { + auto realm = Realm::get_shared_realm(interrupted_realm_config); + if (wait_for_client_error) { + state.wait_for(State::ClientErrorPropagatedToHandler); + } + else { + state.wait_for(State::ExceptionThrown); + } + } + + std::vector server_errors; + timed_sleeping_wait_for([&] { + server_errors = + harness.session().app_session().admin_api.get_errors(harness.session().app_session().server_app_id); + return !server_errors.empty(); + }); + + std::pair expected_error = [&] { + switch (exception_to_throw) { + case ExceptionType::Novel: + return std::make_pair(ErrorCodes::UnknownError, NovelException{}.what()); + case ExceptionType::BadChangeset: + return std::make_pair(ErrorCodes::BadChangeset, "simulated failure"); + } + }(); + if (wait_for_client_error) { + REQUIRE(propagated_error); + REQUIRE(!propagated_error->is_fatal); + REQUIRE(propagated_error->server_requests_action == sync::ProtocolErrorInfo::Action::Warning); + REQUIRE(propagated_error->status == expected_error.first); + REQUIRE_THAT(propagated_error->status.reason(), + Catch::Matchers::ContainsSubstring(expected_error.second)); + } + + REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring(expected_error.second))); } SECTION("exception occurs during bootstrap application") { @@ -2820,23 +2879,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - _impl::RealmCoordinator::assert_no_open_realms(); - - // Open up the realm without the sync client attached and verify that the realm got interrupted in the state - // we expected it to be in. - { - DBOptions options; - options.encryption_key = test_util::crypt_key(); - auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options); - util::StderrLogger logger; - sync::PendingBootstrapStore bootstrap_store(realm, logger); - REQUIRE(bootstrap_store.has_pending()); - auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16); - REQUIRE(pending_batch.query_version == 1); - REQUIRE(pending_batch.progress); - - check_interrupted_state(realm); - } + check_interrupted_state(); auto realm = Realm::get_shared_realm(interrupted_realm_config); auto table = realm->read_group().get_table("class_TopLevel"); @@ -2850,6 +2893,14 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot for (auto& id : obj_ids_at_end) { REQUIRE(table->find_primary_key(Mixed{id})); } + + std::vector server_errors; + timed_sleeping_wait_for([&] { + server_errors = + harness.session().app_session().admin_api.get_errors(harness.session().app_session().server_app_id); + return !server_errors.empty(); + }); + REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring(error_status.reason()))); } SECTION("interrupted before final bootstrap message") { @@ -2889,25 +2940,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - _impl::RealmCoordinator::assert_no_open_realms(); - - // Open up the realm without the sync client attached and verify that the realm got interrupted in the state - // we expected it to be in. - { - DBOptions options; - options.encryption_key = test_util::crypt_key(); - auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options); - auto logger = util::Logger::get_default_logger(); - sync::PendingBootstrapStore bootstrap_store(realm, *logger); - REQUIRE(bootstrap_store.has_pending()); - auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16); - REQUIRE(pending_batch.query_version == 1); - REQUIRE(!pending_batch.progress); - REQUIRE(pending_batch.remaining_changesets == 0); - REQUIRE(pending_batch.changesets.size() == 1); - - check_interrupted_state(realm); - } + check_interrupted_state(); // Now we'll open a different realm and make some changes that would leave orphan objects on the client // if the bootstrap batches weren't being cached until lastInBatch were true. @@ -2966,25 +2999,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - _impl::RealmCoordinator::assert_no_open_realms(); - - // Open up the realm without the sync client attached and verify that the realm got interrupted in the state - // we expected it to be in. - { - DBOptions options; - options.encryption_key = test_util::crypt_key(); - auto realm = DB::create(sync::make_client_replication(), interrupted_realm_config.path, options); - auto logger = util::Logger::get_default_logger(); - sync::PendingBootstrapStore bootstrap_store(realm, *logger); - REQUIRE(bootstrap_store.has_pending()); - auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16); - REQUIRE(pending_batch.query_version == 1); - REQUIRE(static_cast(pending_batch.progress)); - REQUIRE(pending_batch.remaining_changesets == 0); - REQUIRE(pending_batch.changesets.size() == 6); - - check_interrupted_state(realm); - } + check_interrupted_state(); // Now we'll open a different realm and make some changes that would leave orphan objects on the client // if the bootstrap batches weren't being cached until lastInBatch were true. @@ -3250,6 +3265,16 @@ TEST_CASE("flx: data ingest", "[sync][flx][data ingest][baas]") { auto err = error_future.get(); CHECK(error_count == 2); + + std::vector server_errors; + timed_sleeping_wait_for([&] { + server_errors = harness->session().app_session().admin_api.get_errors( + harness->session().app_session().server_app_id); + return !server_errors.empty(); + }); + REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring( + "BadChangeset: Failed to transform received changeset: Schema mismatch: " + "'Asymmetric' is asymmetric on one side, but not on the other."s))); }); } @@ -3537,34 +3562,71 @@ TEST_CASE("flx: send client error", "[sync][flx][baas]") { // An integration error is simulated while bootstrapping. // This results in the client sending an error message to the server. SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); - config.sync_config->simulate_integration_error = true; - - auto [error_promise, error_future] = util::make_promise_future(); - auto error_count = 0; - auto err_handler = [promise = util::CopyablePromiseHolder(std::move(error_promise)), - &error_count](std::shared_ptr, SyncError err) mutable { - ++error_count; - if (error_count == 1) { - // Bad changeset detected by the client. - CHECK(err.status == ErrorCodes::BadChangeset); - } - else if (error_count == 2) { - // Server asking for a client reset. - CHECK(err.status == ErrorCodes::SyncClientResetRequired); - CHECK(err.is_client_reset_requested()); - promise.get_promise().emplace_value(std::move(err)); - } + auto mutate_subscriptions = [&](SharedRealm realm) { + auto table = realm->read_group().get_table("class_TopLevel"); + auto new_query = realm->get_latest_subscription_set().make_mutable_copy(); + new_query.insert_or_assign(Query(table)); + new_query.commit(); }; - config.sync_config->error_handler = err_handler; - auto realm = Realm::get_shared_realm(config); - auto table = realm->read_group().get_table("class_TopLevel"); - auto new_query = realm->get_latest_subscription_set().make_mutable_copy(); - new_query.insert_or_assign(Query(table)); - new_query.commit(); + SECTION("immediately close session after bad changeset") { + auto [error_promise, error_future] = util::make_promise_future(); + config.sync_config->on_sync_client_event_hook = + [promise = util::CopyablePromiseHolder(std::move(error_promise))]( + std::weak_ptr weak_sess, const SyncClientHookData& data) mutable { + if (data.event != SyncClientHookEvent::BootstrapBatchAboutToProcess) { + return SyncClientHookAction::NoAction; + } + + auto sess = weak_sess.lock(); + sess->pause(); + + promise.get_promise().emplace_value(); + throw sync::IntegrationException(ErrorCodes::BadChangeset, "simulated failure"); + }; + + auto realm = Realm::get_shared_realm(config); + mutate_subscriptions(realm); + + error_future.get(); + } - auto err = error_future.get(); - CHECK(error_count == 2); + SECTION("check for client reset error") { + config.sync_config->simulate_integration_error = true; + auto [error_promise, error_future] = util::make_promise_future(); + auto error_count = 0; + auto err_handler = [promise = util::CopyablePromiseHolder(std::move(error_promise)), + &error_count](std::shared_ptr, SyncError err) mutable { + ++error_count; + if (error_count == 1) { + // Bad changeset detected by the client. + CHECK(err.status == ErrorCodes::BadChangeset); + } + else if (error_count == 2) { + // Server asking for a client reset. + CHECK(err.status == ErrorCodes::SyncClientResetRequired); + CHECK(err.is_client_reset_requested()); + promise.get_promise().emplace_value(std::move(err)); + } + }; + + config.sync_config->error_handler = err_handler; + + auto realm = Realm::get_shared_realm(config); + mutate_subscriptions(realm); + + auto err = error_future.get(); + CHECK(error_count == 2); + } + + std::vector server_errors; + timed_sleeping_wait_for([&] { + server_errors = + harness.session().app_session().admin_api.get_errors(harness.session().app_session().server_app_id); + return !server_errors.empty(); + }); + REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring( + "BadChangeset: simulated failure (ProtocolErrorCode=201)"s))); } TEST_CASE("flx: bootstraps contain all changes", "[sync][flx][bootstrap][baas]") { diff --git a/test/object-store/util/sync/flx_sync_harness.hpp b/test/object-store/util/sync/flx_sync_harness.hpp index 5c4b0701268..bdcb61541e2 100644 --- a/test/object-store/util/sync/flx_sync_harness.hpp +++ b/test/object-store/util/sync/flx_sync_harness.hpp @@ -22,6 +22,7 @@ #include #include +#include #include diff --git a/test/object-store/util/test_utils.hpp b/test/object-store/util/test_utils.hpp index 7ae8cb73c00..9d090f93933 100644 --- a/test/object-store/util/test_utils.hpp +++ b/test/object-store/util/test_utils.hpp @@ -66,6 +66,37 @@ class TestingStateMachine { E m_cur_state; }; +template +class VectorElemMatchesMatcher final : public Catch::Matchers::MatcherGenericBase { +public: + explicit VectorElemMatchesMatcher(Matcher&& matcher) + : m_matcher(std::move(matcher)) + { + } + + template + bool match(Container const& vec) const + { + return std::any_of(vec.begin(), vec.end(), [&](const auto& e) { + return m_matcher.match(e); + }); + } + + std::string describe() const override + { + return util::format("VectorElemMatches(%1)", m_matcher.toString()); + } + +private: + Matcher m_matcher; +}; + +template +inline auto VectorElemMatches(Matcher&& matcher) +{ + return VectorElemMatchesMatcher(std::move(matcher)); +} + template class ExceptionMatcher final : public Catch::Matchers::MatcherBase { public: From 9c7c9561692b07fe82768a64a87ccc2d94d2483b Mon Sep 17 00:00:00 2001 From: Jonathan Reams Date: Wed, 17 Jan 2024 14:03:22 -0500 Subject: [PATCH 02/10] ensure realm is destroyed before throwing --- test/object-store/sync/flx_sync.cpp | 52 +++++++++++++++-------------- 1 file changed, 27 insertions(+), 25 deletions(-) diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 9012884934c..54dd68f8c5a 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -3562,33 +3562,36 @@ TEST_CASE("flx: send client error", "[sync][flx][baas]") { // An integration error is simulated while bootstrapping. // This results in the client sending an error message to the server. SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); - auto mutate_subscriptions = [&](SharedRealm realm) { - auto table = realm->read_group().get_table("class_TopLevel"); - auto new_query = realm->get_latest_subscription_set().make_mutable_copy(); - new_query.insert_or_assign(Query(table)); - new_query.commit(); - }; - SECTION("immediately close session after bad changeset") { - auto [error_promise, error_future] = util::make_promise_future(); - config.sync_config->on_sync_client_event_hook = - [promise = util::CopyablePromiseHolder(std::move(error_promise))]( - std::weak_ptr weak_sess, const SyncClientHookData& data) mutable { - if (data.event != SyncClientHookEvent::BootstrapBatchAboutToProcess) { - return SyncClientHookAction::NoAction; - } - - auto sess = weak_sess.lock(); - sess->pause(); - - promise.get_promise().emplace_value(); - throw sync::IntegrationException(ErrorCodes::BadChangeset, "simulated failure"); - }; + enum class State { Initial, AboutToThrow, RealmDestroyed, Thrown }; + TestingStateMachine state(State::Initial); + config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr, + const SyncClientHookData& data) mutable { + if (data.event != SyncClientHookEvent::BootstrapBatchAboutToProcess) { + return SyncClientHookAction::NoAction; + } - auto realm = Realm::get_shared_realm(config); - mutate_subscriptions(realm); + state.transition_with([&](State cur_state) -> std::optional { + REQUIRE(cur_state == State::Initial); + return State::AboutToThrow; + }); + state.wait_for(State::RealmDestroyed); + state.transition_with([&](State cur_state) -> std::optional { + REQUIRE(cur_state == State::RealmDestroyed); + return State::Thrown; + }); + throw sync::IntegrationException(ErrorCodes::BadChangeset, "simulated failure"); + }; - error_future.get(); + { + auto realm = Realm::get_shared_realm(config); + state.wait_for(State::AboutToThrow); + } + state.transition_with([&](State cur_state) -> std::optional { + REQUIRE(cur_state == State::AboutToThrow); + return State::RealmDestroyed; + }); + state.wait_for(State::Thrown); } SECTION("check for client reset error") { @@ -3613,7 +3616,6 @@ TEST_CASE("flx: send client error", "[sync][flx][baas]") { config.sync_config->error_handler = err_handler; auto realm = Realm::get_shared_realm(config); - mutate_subscriptions(realm); auto err = error_future.get(); CHECK(error_count == 2); From fa9e5cd35cf221d7ca8ffadcf94f1eb305231351 Mon Sep 17 00:00:00 2001 From: Jonathan Reams Date: Wed, 17 Jan 2024 14:51:57 -0500 Subject: [PATCH 03/10] fix bootstrap batching test --- test/object-store/sync/flx_sync.cpp | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 54dd68f8c5a..271621476a0 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -2665,7 +2665,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot SyncConfig::FLXSyncEnabled{}); interrupted_realm_config.cache = false; - auto check_interrupted_state = [&] { + auto check_interrupted_state = [&](bool bootstrap_fully_received) { _impl::RealmCoordinator::assert_no_open_realms(); DBOptions options; options.encryption_key = test_util::crypt_key(); @@ -2675,7 +2675,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot REQUIRE(bootstrap_store.has_pending()); auto pending_batch = bootstrap_store.peek_pending(1024 * 1024 * 16); REQUIRE(pending_batch.query_version == 1); - REQUIRE(pending_batch.progress); + REQUIRE(pending_batch.progress.has_value() == bootstrap_fully_received); { auto tr = realm->start_read(); auto top_level = tr->get_table("class_TopLevel"); @@ -2741,7 +2741,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - check_interrupted_state(); + check_interrupted_state(true); enum class State { Interrupted, ExceptionThrown, @@ -2879,7 +2879,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - check_interrupted_state(); + check_interrupted_state(true); auto realm = Realm::get_shared_realm(interrupted_realm_config); auto table = realm->read_group().get_table("class_TopLevel"); @@ -2940,7 +2940,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - check_interrupted_state(); + check_interrupted_state(false); // Now we'll open a different realm and make some changes that would leave orphan objects on the client // if the bootstrap batches weren't being cached until lastInBatch were true. @@ -2999,7 +2999,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot realm->close(); } - check_interrupted_state(); + check_interrupted_state(true); // Now we'll open a different realm and make some changes that would leave orphan objects on the client // if the bootstrap batches weren't being cached until lastInBatch were true. From 06fed60310e8bd9a143b3dca3f0964358a427c40 Mon Sep 17 00:00:00 2001 From: Jonathan Reams Date: Wed, 17 Jan 2024 15:47:57 -0500 Subject: [PATCH 04/10] fix asan --- src/realm/sync/client.cpp | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index b0fd90b7002..9853f632fbf 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -1019,13 +1019,13 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con int64_t query_version, DownloadBatchState batch_state, size_t num_changesets) { - if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { - return SyncClientHookAction::NoAction; - } if (REALM_UNLIKELY(m_state != State::Active)) { return SyncClientHookAction::NoAction; } + if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { + return SyncClientHookAction::NoAction; + } SyncClientHookData data; data.event = event; data.batch_state = batch_state; @@ -1038,13 +1038,15 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info) { - if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { + if (REALM_UNLIKELY(m_state != State::Active)) { return SyncClientHookAction::NoAction; } - if (REALM_UNLIKELY(m_state != State::Active)) { + + if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { return SyncClientHookAction::NoAction; } + SyncClientHookData data; data.event = event; data.batch_state = DownloadBatchState::SteadyState; From 8a346f7755bc8dd4c6498d2c0b87b7e34ae7232f Mon Sep 17 00:00:00 2001 From: Jonathan Reams Date: Thu, 18 Jan 2024 12:48:18 -0500 Subject: [PATCH 05/10] extend SessionImpl lifecycle to send errors while Deactivating --- src/realm/sync/client.cpp | 6 ++ src/realm/sync/noinst/client_impl_base.cpp | 72 ++++++++++++---------- src/realm/sync/noinst/client_impl_base.hpp | 8 ++- test/object-store/sync/flx_sync.cpp | 6 +- 4 files changed, 52 insertions(+), 40 deletions(-) diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 9853f632fbf..f90abb10694 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -962,6 +962,9 @@ void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch SubscriptionStore* SessionImpl::get_flx_subscription_store() { + if (m_state != State::Active) { + return nullptr; + } // Should never be called if session is not active REALM_ASSERT_EX(m_state == State::Active, m_state); return m_wrapper.get_flx_subscription_store(); @@ -969,6 +972,9 @@ SubscriptionStore* SessionImpl::get_flx_subscription_store() MigrationStore* SessionImpl::get_migration_store() { + if (m_state != State::Active) { + return nullptr; + } // Should never be called if session is not active REALM_ASSERT_EX(m_state == State::Active, m_state); return m_wrapper.get_migration_store(); diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index cff11a7365a..776673d522e 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -350,10 +350,6 @@ void Connection::initiate_session_deactivation(Session* sess) if (sess->m_state == Session::Deactivated) { finish_session_deactivation(sess); } - if (REALM_UNLIKELY(--m_num_active_sessions == 0)) { - if (m_activated && m_state == ConnectionState::disconnected) - m_on_idle->trigger(); - } } @@ -403,6 +399,11 @@ void ClientImpl::Connection::finish_session_deactivation(Session* sess) auto ident = sess->m_ident; m_sessions.erase(ident); m_session_history.erase(ident); + + if (REALM_UNLIKELY(--m_num_active_sessions == 0)) { + if (m_activated && m_state == ConnectionState::disconnected) + m_on_idle->trigger(); + } } void Connection::force_close() @@ -424,20 +425,13 @@ void Connection::force_close() m_disconnect_delay_in_progress = false; } - // We must copy any session pointers we want to close to a vector because force_closing - // the session may remove it from m_sessions and invalidate the iterator uses to loop - // through the map. By copying to a separate vector we ensure our iterators remain valid. - std::vector to_close; - for (auto& session_pair : m_sessions) { - if (session_pair.second->m_state == Session::State::Active) { - to_close.push_back(session_pair.second.get()); + for (auto it = m_sessions.begin(); it != m_sessions.end();) { + auto cur_sess_it = it++; + if (cur_sess_it->second->m_state == Session::Active) { + cur_sess_it->second->force_close(); } } - for (auto& sess : to_close) { - sess->force_close(); - } - logger.debug("Force closed idle connection"); } @@ -832,9 +826,9 @@ void Connection::handle_connection_established() fast_reconnect = true; } - for (auto& p : m_sessions) { - Session& sess = *p.second; - sess.connection_established(fast_reconnect); // Throws + for (auto it = m_sessions.begin(); it != m_sessions.end();) { + auto cur_sess_it = it++; + cur_sess_it->second->connection_established(fast_reconnect); } report_connection_state_change(ConnectionState::connected); // Throws @@ -1174,8 +1168,11 @@ void Connection::disconnect(const SessionErrorInfo& info) auto j = i++; Session& sess = *j->second; sess.connection_lost(); // Throws - if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated) + if (sess.m_state == Session::Unactivated || sess.m_state == Session::Deactivated) { m_sessions.erase(j); + REALM_ASSERT(m_num_active_sessions); + --m_num_active_sessions; + } } } @@ -1592,7 +1589,7 @@ void Session::on_integration_failure(const IntegrationException& error) // Since the deactivation process has not been initiated, the UNBIND // message cannot have been sent unless an ERROR message was received. REALM_ASSERT(m_suspended || m_error_message_received || !m_unbind_message_sent); - if (m_ident_message_sent && !m_error_message_received && !m_suspended) { + if (m_bind_message_sent && !m_error_message_received && !m_suspended) { ensure_enlisted_to_send(); // Throws } } @@ -1710,9 +1707,6 @@ void Session::initiate_deactivation() m_state = Deactivating; - if (!m_suspended) - m_conn.one_less_active_unsuspended_session(); // Throws - if (m_enlisted_to_send) { REALM_ASSERT(!unbind_process_complete()); return; @@ -1721,14 +1715,17 @@ void Session::initiate_deactivation() // Deactivate immediately if the BIND message has not yet been sent and the // session is not enlisted to send, or if the unbinding process has already // completed. - if (!m_bind_message_sent || unbind_process_complete()) { + if ((!m_bind_message_sent || unbind_process_complete()) && !m_error_to_send) { complete_deactivation(); // Throws // Life cycle state is now Deactivated return; } - // Ready to send the UNBIND message, if it has not already been sent - if (!m_unbind_message_sent) { + // Ready to send the UNBIND message, if it has not already been sent, unless we've + // never sent the BIND message but still have an error message to send. In that case + // when the connection becomes connected we'll send the error message and immediately + // complete de-activation. + if (!m_unbind_message_sent && m_bind_message_sent) { enlist_to_send(); // Throws return; } @@ -1739,8 +1736,9 @@ void Session::complete_deactivation() { REALM_ASSERT_EX(m_state == Deactivating, m_state); m_state = Deactivated; - - logger.debug("Deactivation completed"); // Throws + if (!m_suspended) + m_conn.one_less_active_unsuspended_session(); // Throws + logger.debug("Deactivation completed"); // Throws } @@ -1754,10 +1752,13 @@ void Session::send_message() REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state); REALM_ASSERT(m_enlisted_to_send); m_enlisted_to_send = false; - if (m_error_to_send && m_bind_message_sent && have_client_file_ident()) - return send_json_error_message(); // Throws - if (m_state == Deactivating || m_error_message_received || m_suspended) { + if (m_error_to_send) { + send_json_error_message(); // Throws + return; + } + + if ((m_state == Deactivating || m_error_message_received || m_suspended) && !m_error_to_send) { // Deactivation has been initiated. If the UNBIND message has not been // sent yet, there is no point in sending it. Instead, we can let the // deactivation process complete. @@ -1780,6 +1781,7 @@ void Session::send_message() if (!m_bind_message_sent) return send_bind_message(); // Throws + // Stop sending upload, mark and query messages when the client detects an error. if (m_client_error) { return; @@ -2156,8 +2158,6 @@ void Session::send_unbind_message() void Session::send_json_error_message() { REALM_ASSERT_EX(m_state == Active || m_state == Deactivating, m_state); - REALM_ASSERT(m_bind_message_sent); - REALM_ASSERT(have_client_file_ident()); REALM_ASSERT(!m_unbind_message_sent); REALM_ASSERT(m_error_to_send); REALM_ASSERT(m_client_error); @@ -2181,7 +2181,11 @@ void Session::send_json_error_message() call_debug_hook(SyncClientHookEvent::ClientErrorMessageSent, ProtocolErrorInfo(protocol_error, message, IsFatal{false})); - enlist_to_send(); // Throws + // If we are not active then enlisting to send will crash - we're likely the last + // message to get sent for this session. + if (m_state == Active) { + enlist_to_send(); // Throws + } } diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index 6f67fc2b1aa..f076a47d7ec 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -1457,7 +1457,7 @@ inline bool ClientImpl::Session::unbind_process_complete() const noexcept inline void ClientImpl::Session::connection_established(bool fast_reconnect) { - REALM_ASSERT(m_state == Active); + REALM_ASSERT(m_state == Active || (m_state == Deactivating && m_error_to_send)); if (!fast_reconnect && !get_client().m_disable_upload_activation_delay) { // Disallow immediate activation of the upload process, even if download @@ -1504,6 +1504,12 @@ inline void ClientImpl::Session::message_sent() // No message will be sent after the UNBIND message REALM_ASSERT(!m_unbind_message_send_complete); + if (m_state == Deactivating && m_client_error && !m_bind_message_sent) { + REALM_ASSERT(m_client_error); + complete_deactivation(); + return; + } + if (m_unbind_message_sent) { REALM_ASSERT(!m_enlisted_to_send); diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 271621476a0..8155c7dbe41 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -2760,7 +2760,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot }; interrupted_realm_config.sync_config->on_sync_client_event_hook = - [&, download_message_received = false, bind_message_sent = false, + [&, download_message_received = false, ident_message_sent = false](std::weak_ptr, const SyncClientHookData& data) mutable { if (data.event == SyncClientHookEvent::DownloadMessageReceived) { download_message_received = true; @@ -2768,9 +2768,6 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot else if (data.event == SyncClientHookEvent::IdentMessageSent) { ident_message_sent = true; } - else if (data.event == SyncClientHookEvent::BindMessageSent) { - bind_message_sent = true; - } else if (data.event == SyncClientHookEvent::BootstrapBatchAboutToProcess) { REQUIRE(!ident_message_sent); REQUIRE(!download_message_received); @@ -2786,7 +2783,6 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot } } else if (data.event == SyncClientHookEvent::ClientErrorMessageSent) { - REQUIRE(bind_message_sent); REQUIRE(!ident_message_sent); state.transition_with([&](State cur_state) -> std::optional { REQUIRE(cur_state == State::ClientErrorPropagatedToHandler); From 89f81c92c6c2dbe6ecfedc3cb315cfef0cc45277 Mon Sep 17 00:00:00 2001 From: Jonathan Reams Date: Thu, 18 Jan 2024 15:08:47 -0500 Subject: [PATCH 06/10] wait for error message to finish sending before deactivating --- src/realm/sync/client.cpp | 6 ------ src/realm/sync/noinst/client_impl_base.cpp | 20 ++++++++++---------- src/realm/sync/noinst/client_impl_base.hpp | 21 +++++++++++++++++---- test/object-store/sync/flx_sync.cpp | 5 +++-- 4 files changed, 30 insertions(+), 22 deletions(-) diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index f90abb10694..9853f632fbf 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -962,9 +962,6 @@ void SessionImpl::on_flx_sync_progress(int64_t version, DownloadBatchState batch SubscriptionStore* SessionImpl::get_flx_subscription_store() { - if (m_state != State::Active) { - return nullptr; - } // Should never be called if session is not active REALM_ASSERT_EX(m_state == State::Active, m_state); return m_wrapper.get_flx_subscription_store(); @@ -972,9 +969,6 @@ SubscriptionStore* SessionImpl::get_flx_subscription_store() MigrationStore* SessionImpl::get_migration_store() { - if (m_state != State::Active) { - return nullptr; - } // Should never be called if session is not active REALM_ASSERT_EX(m_state == State::Active, m_state); return m_wrapper.get_migration_store(); diff --git a/src/realm/sync/noinst/client_impl_base.cpp b/src/realm/sync/noinst/client_impl_base.cpp index 776673d522e..2879811ec99 100644 --- a/src/realm/sync/noinst/client_impl_base.cpp +++ b/src/realm/sync/noinst/client_impl_base.cpp @@ -1715,7 +1715,7 @@ void Session::initiate_deactivation() // Deactivate immediately if the BIND message has not yet been sent and the // session is not enlisted to send, or if the unbinding process has already // completed. - if ((!m_bind_message_sent || unbind_process_complete()) && !m_error_to_send) { + if ((!m_bind_message_sent || unbind_process_complete()) && !pending_client_error()) { complete_deactivation(); // Throws // Life cycle state is now Deactivated return; @@ -1758,11 +1758,11 @@ void Session::send_message() return; } - if ((m_state == Deactivating || m_error_message_received || m_suspended) && !m_error_to_send) { + if (m_state == Deactivating || m_error_message_received || m_suspended) { // Deactivation has been initiated. If the UNBIND message has not been // sent yet, there is no point in sending it. Instead, we can let the // deactivation process complete. - if (!m_bind_message_sent) { + if (!m_bind_message_sent && !pending_client_error()) { return complete_deactivation(); // Throws // Life cycle state is now Deactivated } @@ -1783,7 +1783,7 @@ void Session::send_message() // Stop sending upload, mark and query messages when the client detects an error. - if (m_client_error) { + if (m_error_message_sent) { return; } @@ -2162,12 +2162,13 @@ void Session::send_json_error_message() REALM_ASSERT(m_error_to_send); REALM_ASSERT(m_client_error); + auto client_error = std::move(m_client_error); ClientProtocol& protocol = m_conn.get_client_protocol(); OutputBuffer& out = m_conn.get_output_buffer(); session_ident_type session_ident = get_ident(); - auto protocol_error = static_cast(m_client_error->error_for_server); + auto protocol_error = static_cast(client_error->error_for_server); - auto message = util::format("%1", m_client_error->to_status()); + auto message = util::format("%1", client_error->to_status()); logger.info("Sending: ERROR \"%1\" (error_code=%2, session_ident=%3)", message, protocol_error, session_ident); // Throws @@ -2177,13 +2178,12 @@ void Session::send_json_error_message() error_body_json.dump()); // Throws m_conn.initiate_write_message(out, this); // Throws m_error_to_send = false; + m_error_message_sent = true; call_debug_hook(SyncClientHookEvent::ClientErrorMessageSent, ProtocolErrorInfo(protocol_error, message, IsFatal{false})); - // If we are not active then enlisting to send will crash - we're likely the last - // message to get sent for this session. - if (m_state == Active) { + if (m_state == Active && m_bind_message_sent) { enlist_to_send(); // Throws } } @@ -2356,7 +2356,7 @@ Status Session::receive_download_message(const SyncProgress& progress, std::uint // Ignore download messages when the client detects an error. This is to prevent transforming the same bad // changeset over and over again. - if (m_client_error) { + if (m_error_message_sent) { logger.debug("Ignoring download message because the client detected an integration error"); return Status::OK(); } diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index f076a47d7ec..162b8977b03 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -1040,6 +1040,8 @@ class ClientImpl::Session { bool m_error_message_received; // Session specific ERROR message received bool m_unbound_message_received; // UNBOUND message received bool m_error_to_send; + bool m_error_message_sent; + bool m_error_message_send_complete; // True when there is a new FLX sync query we need to send to the server. util::Optional m_pending_flx_sub_set; @@ -1159,6 +1161,7 @@ class ClientImpl::Session { // session is in the Active state, and the unbinding process has completed // (unbind_process_complete()). bool unbind_process_complete() const noexcept; + bool pending_client_error() const noexcept; void activate(); void initiate_deactivation(); @@ -1455,6 +1458,11 @@ inline bool ClientImpl::Session::unbind_process_complete() const noexcept return (m_unbind_message_send_complete && (m_error_message_received || m_unbound_message_received)); } +inline bool ClientImpl::Session::pending_client_error() const noexcept +{ + return m_error_to_send || (m_error_message_sent && !m_error_message_send_complete); +} + inline void ClientImpl::Session::connection_established(bool fast_reconnect) { REALM_ASSERT(m_state == Active || (m_state == Deactivating && m_error_to_send)); @@ -1504,10 +1512,12 @@ inline void ClientImpl::Session::message_sent() // No message will be sent after the UNBIND message REALM_ASSERT(!m_unbind_message_send_complete); - if (m_state == Deactivating && m_client_error && !m_bind_message_sent) { - REALM_ASSERT(m_client_error); - complete_deactivation(); - return; + if (m_error_message_sent) { + m_error_message_send_complete = true; + if (m_state == Deactivating && !m_bind_message_sent) { + complete_deactivation(); + return; + } } if (m_unbind_message_sent) { @@ -1556,6 +1566,9 @@ inline void ClientImpl::Session::reset_protocol_state() noexcept m_enlisted_to_send = false; m_bind_message_sent = false; m_error_to_send = false; + m_error_message_sent = false; + m_error_message_send_complete = false; + m_ident_message_sent = false; m_unbind_message_sent = false; m_unbind_message_send_complete = false; diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 8155c7dbe41..68a4b1db51e 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -2759,6 +2759,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot }); }; + auto wait_for_client_error = GENERATE(true, false); interrupted_realm_config.sync_config->on_sync_client_event_hook = [&, download_message_received = false, ident_message_sent = false](std::weak_ptr, const SyncClientHookData& data) mutable { @@ -2785,7 +2786,8 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot else if (data.event == SyncClientHookEvent::ClientErrorMessageSent) { REQUIRE(!ident_message_sent); state.transition_with([&](State cur_state) -> std::optional { - REQUIRE(cur_state == State::ClientErrorPropagatedToHandler); + REQUIRE((cur_state == State::ClientErrorPropagatedToHandler || + cur_state == State::ExceptionThrown)); return State::ClientErrorMessageSentToServer; }); } @@ -2793,7 +2795,6 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot return SyncClientHookAction::NoAction; }; - auto wait_for_client_error = GENERATE(true, false); { auto realm = Realm::get_shared_realm(interrupted_realm_config); if (wait_for_client_error) { From 2966d37a937f316ad812cfab1a17f8268e82e498 Mon Sep 17 00:00:00 2001 From: Jonathan Reams Date: Thu, 18 Jan 2024 17:35:24 -0500 Subject: [PATCH 07/10] more robust testing of app log entries --- src/realm/sync/client.cpp | 32 ++++++++----------- src/realm/sync/noinst/client_impl_base.hpp | 17 +++++----- test/object-store/sync/flx_sync.cpp | 27 ++++++++++------ .../object-store/util/sync/baas_admin_api.cpp | 9 ++++-- .../object-store/util/sync/baas_admin_api.hpp | 3 +- 5 files changed, 47 insertions(+), 41 deletions(-) diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 9853f632fbf..63e5ec73fad 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -162,8 +162,9 @@ class SessionWrapper final : public util::AtomicRefCountBase, DB::CommitListener util::UniqueFunction m_progress_handler; util::UniqueFunction m_connection_state_change_listener; - std::function m_debug_hook; - bool m_in_debug_hook = false; + // This gets passed to the SessionImpl constructor and owned by the SessionImpl after + // actualization so that it can outlive the SessionWrapper. + std::function m_debug_hook_for_sess_impl; SessionReason m_session_reason; @@ -988,15 +989,15 @@ SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data REALM_ASSERT_EX(m_state == State::Active, m_state); // Make sure we don't call the debug hook recursively. - if (m_wrapper.m_in_debug_hook) { + if (m_in_debug_hook) { return SyncClientHookAction::NoAction; } - m_wrapper.m_in_debug_hook = true; + m_in_debug_hook = true; auto in_hook_guard = util::make_scope_exit([&]() noexcept { - m_wrapper.m_in_debug_hook = false; + m_in_debug_hook = false; }); - auto action = m_wrapper.m_debug_hook(data); + auto action = m_debug_hook(data); switch (action) { case realm::SyncClientHookAction::SuspendWithRetryableError: { SessionErrorInfo err_info(Status{ErrorCodes::RuntimeError, "hook requested error"}, IsFatal{false}); @@ -1019,11 +1020,7 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con int64_t query_version, DownloadBatchState batch_state, size_t num_changesets) { - if (REALM_UNLIKELY(m_state != State::Active)) { - return SyncClientHookAction::NoAction; - } - - if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { + if (REALM_LIKELY(!m_debug_hook)) { return SyncClientHookAction::NoAction; } SyncClientHookData data; @@ -1038,11 +1035,7 @@ SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, con SyncClientHookAction SessionImpl::call_debug_hook(SyncClientHookEvent event, const ProtocolErrorInfo& error_info) { - if (REALM_UNLIKELY(m_state != State::Active)) { - return SyncClientHookAction::NoAction; - } - - if (REALM_LIKELY(!m_wrapper.m_debug_hook)) { + if (REALM_LIKELY(!m_debug_hook)) { return SyncClientHookAction::NoAction; } @@ -1140,7 +1133,7 @@ SessionWrapper::SessionWrapper(ClientImpl& client, DBRef db, std::shared_ptr sess = std::make_unique(*this, conn); // Throws + conn.update_connect_info(m_http_request_path_prefix, m_signed_access_token); // Throws + std::unique_ptr sess = std::make_unique( + *this, conn, m_client.get_next_session_ident(), std::move(m_debug_hook_for_sess_impl)); // Throws if (sync_mode == SyncServerMode::FLX) { m_flx_pending_bootstrap_store = std::make_unique(m_db, sess->logger); } diff --git a/src/realm/sync/noinst/client_impl_base.hpp b/src/realm/sync/noinst/client_impl_base.hpp index 162b8977b03..ad41b65bb66 100644 --- a/src/realm/sync/noinst/client_impl_base.hpp +++ b/src/realm/sync/noinst/client_impl_base.hpp @@ -866,7 +866,8 @@ class ClientImpl::Session { /// The specified transaction reporter (via the config object) is guaranteed /// to not be called before activation, and also not after initiation of /// deactivation. - Session(SessionWrapper&, ClientImpl::Connection&); + Session(SessionWrapper&, ClientImpl::Connection&, session_ident_type, + std::function); ~Session(); void force_close(); @@ -1138,13 +1139,14 @@ class ClientImpl::Session { SessionWrapper& m_wrapper; + std::function m_debug_hook; + bool m_in_debug_hook = false; + request_ident_type m_last_pending_test_command_ident = 0; std::list m_pending_test_commands; static std::string make_logger_prefix(session_ident_type); - Session(SessionWrapper& wrapper, Connection&, session_ident_type); - bool do_recognize_sync_version(version_type) noexcept; bool have_client_file_ident() const noexcept; @@ -1420,12 +1422,8 @@ inline void ClientImpl::Session::request_download_completion_notification() ensure_enlisted_to_send(); // Throws } -inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn) - : Session{wrapper, conn, conn.get_client().get_next_session_ident()} // Throws -{ -} - -inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn, session_ident_type ident) +inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn, session_ident_type ident, + std::function debug_hook) : logger_ptr{std::make_shared(make_logger_prefix(ident), conn.logger_ptr)} // Throws , logger{*logger_ptr} , m_conn{conn} @@ -1434,6 +1432,7 @@ inline ClientImpl::Session::Session(SessionWrapper& wrapper, Connection& conn, s , m_is_flx_sync_session(conn.is_flx_sync_connection()) , m_fix_up_object_ids(get_client().m_fix_up_object_ids) , m_wrapper{wrapper} + , m_debug_hook(std::move(debug_hook)) { if (get_client().m_disable_upload_activation_delay) m_allow_upload = true; diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 68a4b1db51e..20c0464563c 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -2801,14 +2801,14 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot state.wait_for(State::ClientErrorPropagatedToHandler); } else { - state.wait_for(State::ExceptionThrown); + state.wait_for(State::ClientErrorMessageSentToServer); } } std::vector server_errors; timed_sleeping_wait_for([&] { - server_errors = - harness.session().app_session().admin_api.get_errors(harness.session().app_session().server_app_id); + server_errors = harness.session().app_session().admin_api.get_errors( + harness.session().app_session().server_app_id, "SYNC_ERROR"); return !server_errors.empty(); }); @@ -2893,8 +2893,8 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot std::vector server_errors; timed_sleeping_wait_for([&] { - server_errors = - harness.session().app_session().admin_api.get_errors(harness.session().app_session().server_app_id); + server_errors = harness.session().app_session().admin_api.get_errors( + harness.session().app_session().server_app_id, "SYNC_ERROR"); return !server_errors.empty(); }); REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring(error_status.reason()))); @@ -3266,7 +3266,7 @@ TEST_CASE("flx: data ingest", "[sync][flx][data ingest][baas]") { std::vector server_errors; timed_sleeping_wait_for([&] { server_errors = harness->session().app_session().admin_api.get_errors( - harness->session().app_session().server_app_id); + harness->session().app_session().server_app_id, "SYNC_ERROR"); return !server_errors.empty(); }); REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring( @@ -3560,10 +3560,17 @@ TEST_CASE("flx: send client error", "[sync][flx][baas]") { // This results in the client sending an error message to the server. SyncTestFile config(harness.app()->current_user(), harness.schema(), SyncConfig::FLXSyncEnabled{}); SECTION("immediately close session after bad changeset") { - enum class State { Initial, AboutToThrow, RealmDestroyed, Thrown }; + enum class State { Initial, AboutToThrow, RealmDestroyed, Thrown, ClientErrorMessageSent }; TestingStateMachine state(State::Initial); config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr, const SyncClientHookData& data) mutable { + if (data.event == SyncClientHookEvent::ClientErrorMessageSent) { + state.transition_with([&](State cur_state) -> std::optional { + REQUIRE(cur_state == State::Thrown); + return State::ClientErrorMessageSent; + }); + return SyncClientHookAction::NoAction; + } if (data.event != SyncClientHookEvent::BootstrapBatchAboutToProcess) { return SyncClientHookAction::NoAction; } @@ -3588,7 +3595,7 @@ TEST_CASE("flx: send client error", "[sync][flx][baas]") { REQUIRE(cur_state == State::AboutToThrow); return State::RealmDestroyed; }); - state.wait_for(State::Thrown); + state.wait_for(State::ClientErrorMessageSent); } SECTION("check for client reset error") { @@ -3620,8 +3627,8 @@ TEST_CASE("flx: send client error", "[sync][flx][baas]") { std::vector server_errors; timed_sleeping_wait_for([&] { - server_errors = - harness.session().app_session().admin_api.get_errors(harness.session().app_session().server_app_id); + server_errors = harness.session().app_session().admin_api.get_errors( + harness.session().app_session().server_app_id, "SYNC_ERROR"); return !server_errors.empty(); }); REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring( diff --git a/test/object-store/util/sync/baas_admin_api.cpp b/test/object-store/util/sync/baas_admin_api.cpp index 0b8a0074bfb..328b10bba57 100644 --- a/test/object-store/util/sync/baas_admin_api.cpp +++ b/test/object-store/util/sync/baas_admin_api.cpp @@ -556,10 +556,15 @@ std::vector AdminAPISession::get_services(const std::s } -std::vector AdminAPISession::get_errors(const std::string& app_id) const +std::vector AdminAPISession::get_errors(const std::string& app_id, + std::optional error_type) const { auto endpoint = apps()[app_id]["logs"]; - auto response = endpoint.get_json({{"errors_only", "true"}}); + auto req = std::vector>{{"errors_only", "true"}}; + if (error_type) { + req.push_back({"type", *error_type}); + } + auto response = endpoint.get_json(std::move(req)); std::vector errors; const auto& logs = response["logs"]; std::transform(logs.begin(), logs.end(), std::back_inserter(errors), [](const auto& err) { diff --git a/test/object-store/util/sync/baas_admin_api.hpp b/test/object-store/util/sync/baas_admin_api.hpp index fcf3d5e3ab4..24beee1a532 100644 --- a/test/object-store/util/sync/baas_admin_api.hpp +++ b/test/object-store/util/sync/baas_admin_api.hpp @@ -111,7 +111,8 @@ class AdminAPISession { }; std::vector get_services(const std::string& app_id) const; - std::vector get_errors(const std::string& app_id) const; + std::vector get_errors(const std::string& app_id, + std::optional type = std::nullopt) const; Service get_sync_service(const std::string& app_id) const; ServiceConfig get_config(const std::string& app_id, const Service& service) const; ServiceConfig disable_sync(const std::string& app_id, const std::string& service_id, From 90b57ad81f968b69534860f14c31a35fa683d085 Mon Sep 17 00:00:00 2001 From: Jonathan Reams Date: Thu, 18 Jan 2024 19:27:24 -0500 Subject: [PATCH 08/10] try to make sure we're getting only our applog entries --- test/object-store/sync/flx_sync.cpp | 11 +++++++---- test/object-store/util/sync/baas_admin_api.cpp | 11 ++++++----- test/object-store/util/sync/baas_admin_api.hpp | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 20c0464563c..0661e91b7b0 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -2808,7 +2808,8 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot std::vector server_errors; timed_sleeping_wait_for([&] { server_errors = harness.session().app_session().admin_api.get_errors( - harness.session().app_session().server_app_id, "SYNC_ERROR"); + harness.session().app_session().server_app_id, + {{"user_id", harness.app()->current_user()->identity()}}); return !server_errors.empty(); }); @@ -2894,7 +2895,8 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot std::vector server_errors; timed_sleeping_wait_for([&] { server_errors = harness.session().app_session().admin_api.get_errors( - harness.session().app_session().server_app_id, "SYNC_ERROR"); + harness.session().app_session().server_app_id, + {{"user_id", harness.app()->current_user()->identity()}}); return !server_errors.empty(); }); REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring(error_status.reason()))); @@ -3266,7 +3268,8 @@ TEST_CASE("flx: data ingest", "[sync][flx][data ingest][baas]") { std::vector server_errors; timed_sleeping_wait_for([&] { server_errors = harness->session().app_session().admin_api.get_errors( - harness->session().app_session().server_app_id, "SYNC_ERROR"); + harness->session().app_session().server_app_id, + {{"user_id", harness->app()->current_user()->identity()}}); return !server_errors.empty(); }); REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring( @@ -3628,7 +3631,7 @@ TEST_CASE("flx: send client error", "[sync][flx][baas]") { std::vector server_errors; timed_sleeping_wait_for([&] { server_errors = harness.session().app_session().admin_api.get_errors( - harness.session().app_session().server_app_id, "SYNC_ERROR"); + harness.session().app_session().server_app_id, {{"user_id", harness.app()->current_user()->identity()}}); return !server_errors.empty(); }); REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring( diff --git a/test/object-store/util/sync/baas_admin_api.cpp b/test/object-store/util/sync/baas_admin_api.cpp index 328b10bba57..46885f0e656 100644 --- a/test/object-store/util/sync/baas_admin_api.cpp +++ b/test/object-store/util/sync/baas_admin_api.cpp @@ -557,14 +557,15 @@ std::vector AdminAPISession::get_services(const std::s std::vector AdminAPISession::get_errors(const std::string& app_id, - std::optional error_type) const + std::vector> filters) const { auto endpoint = apps()[app_id]["logs"]; - auto req = std::vector>{{"errors_only", "true"}}; - if (error_type) { - req.push_back({"type", *error_type}); + if (!std::any_of(filters.begin(), filters.end(), [](auto& pair) { + return pair.first == "errors_only"; + })) { + filters.push_back({"errors_only", "true"}); } - auto response = endpoint.get_json(std::move(req)); + auto response = endpoint.get_json(std::move(filters)); std::vector errors; const auto& logs = response["logs"]; std::transform(logs.begin(), logs.end(), std::back_inserter(errors), [](const auto& err) { diff --git a/test/object-store/util/sync/baas_admin_api.hpp b/test/object-store/util/sync/baas_admin_api.hpp index 24beee1a532..550e3f03b69 100644 --- a/test/object-store/util/sync/baas_admin_api.hpp +++ b/test/object-store/util/sync/baas_admin_api.hpp @@ -112,7 +112,7 @@ class AdminAPISession { std::vector get_services(const std::string& app_id) const; std::vector get_errors(const std::string& app_id, - std::optional type = std::nullopt) const; + std::vector> filters = {}) const; Service get_sync_service(const std::string& app_id) const; ServiceConfig get_config(const std::string& app_id, const Service& service) const; ServiceConfig disable_sync(const std::string& app_id, const std::string& service_id, From 3338c04ebc6e61552a94e9111c32b983816130ba Mon Sep 17 00:00:00 2001 From: Jonathan Reams Date: Thu, 18 Jan 2024 20:30:20 -0500 Subject: [PATCH 09/10] hooks no longer need an active session --- src/realm/sync/client.cpp | 3 --- 1 file changed, 3 deletions(-) diff --git a/src/realm/sync/client.cpp b/src/realm/sync/client.cpp index 63e5ec73fad..45fb761e3aa 100644 --- a/src/realm/sync/client.cpp +++ b/src/realm/sync/client.cpp @@ -985,9 +985,6 @@ void SessionImpl::on_flx_sync_version_complete(int64_t version) SyncClientHookAction SessionImpl::call_debug_hook(const SyncClientHookData& data) { - // Should never be called if session is not active - REALM_ASSERT_EX(m_state == State::Active, m_state); - // Make sure we don't call the debug hook recursively. if (m_in_debug_hook) { return SyncClientHookAction::NoAction; From 9bc6f345ba6b7feab42fc53d8a06e111291a0951 Mon Sep 17 00:00:00 2001 From: Jonathan Reams Date: Fri, 19 Jan 2024 00:00:49 -0500 Subject: [PATCH 10/10] omg that race was so obvious --- test/object-store/sync/flx_sync.cpp | 48 ++++++++++++++--------------- 1 file changed, 23 insertions(+), 25 deletions(-) diff --git a/test/object-store/sync/flx_sync.cpp b/test/object-store/sync/flx_sync.cpp index 0661e91b7b0..7f9478ca649 100644 --- a/test/object-store/sync/flx_sync.cpp +++ b/test/object-store/sync/flx_sync.cpp @@ -2745,21 +2745,21 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot enum class State { Interrupted, ExceptionThrown, + ClientErrorPropagatedToHandler, + RealmDestroyed, ClientErrorMessageSentToServer, - ClientErrorPropagatedToHandler }; TestingStateMachine state(State::Interrupted); std::optional propagated_error; interrupted_realm_config.sync_config->error_handler = [&](std::shared_ptr, SyncError error) { propagated_error = std::move(error); - state.transition_with([&](State cur_state) -> std::optional { + state.transition_with([&](State cur_state) { REQUIRE(cur_state == State::ExceptionThrown); return State::ClientErrorPropagatedToHandler; }); }; - auto wait_for_client_error = GENERATE(true, false); interrupted_realm_config.sync_config->on_sync_client_event_hook = [&, download_message_received = false, ident_message_sent = false](std::weak_ptr, const SyncClientHookData& data) mutable { @@ -2772,7 +2772,7 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot else if (data.event == SyncClientHookEvent::BootstrapBatchAboutToProcess) { REQUIRE(!ident_message_sent); REQUIRE(!download_message_received); - state.transition_with([&](State cur_state) -> std::optional { + state.transition_with([&](State cur_state) { REQUIRE(cur_state == State::Interrupted); return State::ExceptionThrown; }); @@ -2785,9 +2785,9 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot } else if (data.event == SyncClientHookEvent::ClientErrorMessageSent) { REQUIRE(!ident_message_sent); - state.transition_with([&](State cur_state) -> std::optional { - REQUIRE((cur_state == State::ClientErrorPropagatedToHandler || - cur_state == State::ExceptionThrown)); + state.wait_for(State::RealmDestroyed); + state.transition_with([&](State cur_state) { + REQUIRE(cur_state == State::RealmDestroyed); return State::ClientErrorMessageSentToServer; }); } @@ -2797,14 +2797,15 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot { auto realm = Realm::get_shared_realm(interrupted_realm_config); - if (wait_for_client_error) { - state.wait_for(State::ClientErrorPropagatedToHandler); - } - else { - state.wait_for(State::ClientErrorMessageSentToServer); - } + state.wait_for(State::ClientErrorPropagatedToHandler); } + state.transition_with([&](State cur_state) { + REQUIRE(cur_state == State::ClientErrorPropagatedToHandler); + return State::RealmDestroyed; + }); + state.wait_for(State::ClientErrorMessageSentToServer); + std::vector server_errors; timed_sleeping_wait_for([&] { server_errors = harness.session().app_session().admin_api.get_errors( @@ -2821,14 +2822,11 @@ TEST_CASE("flx: bootstrap batching prevents orphan documents", "[sync][flx][boot return std::make_pair(ErrorCodes::BadChangeset, "simulated failure"); } }(); - if (wait_for_client_error) { - REQUIRE(propagated_error); - REQUIRE(!propagated_error->is_fatal); - REQUIRE(propagated_error->server_requests_action == sync::ProtocolErrorInfo::Action::Warning); - REQUIRE(propagated_error->status == expected_error.first); - REQUIRE_THAT(propagated_error->status.reason(), - Catch::Matchers::ContainsSubstring(expected_error.second)); - } + REQUIRE(propagated_error); + REQUIRE(!propagated_error->is_fatal); + REQUIRE(propagated_error->server_requests_action == sync::ProtocolErrorInfo::Action::Warning); + REQUIRE(propagated_error->status == expected_error.first); + REQUIRE_THAT(propagated_error->status.reason(), Catch::Matchers::ContainsSubstring(expected_error.second)); REQUIRE_THAT(server_errors, VectorElemMatches(Catch::Matchers::ContainsSubstring(expected_error.second))); } @@ -3568,7 +3566,7 @@ TEST_CASE("flx: send client error", "[sync][flx][baas]") { config.sync_config->on_sync_client_event_hook = [&](std::weak_ptr, const SyncClientHookData& data) mutable { if (data.event == SyncClientHookEvent::ClientErrorMessageSent) { - state.transition_with([&](State cur_state) -> std::optional { + state.transition_with([&](State cur_state) { REQUIRE(cur_state == State::Thrown); return State::ClientErrorMessageSent; }); @@ -3578,12 +3576,12 @@ TEST_CASE("flx: send client error", "[sync][flx][baas]") { return SyncClientHookAction::NoAction; } - state.transition_with([&](State cur_state) -> std::optional { + state.transition_with([&](State cur_state) { REQUIRE(cur_state == State::Initial); return State::AboutToThrow; }); state.wait_for(State::RealmDestroyed); - state.transition_with([&](State cur_state) -> std::optional { + state.transition_with([&](State cur_state) { REQUIRE(cur_state == State::RealmDestroyed); return State::Thrown; }); @@ -3594,7 +3592,7 @@ TEST_CASE("flx: send client error", "[sync][flx][baas]") { auto realm = Realm::get_shared_realm(config); state.wait_for(State::AboutToThrow); } - state.transition_with([&](State cur_state) -> std::optional { + state.transition_with([&](State cur_state) { REQUIRE(cur_state == State::AboutToThrow); return State::RealmDestroyed; });