Skip to content
Merged
Show file tree
Hide file tree
Changes from 5 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 9 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,15 @@ api.set_provider(NoOpProvider())
open_feature_client = api.get_client()
```

`set_provider()` is non-blocking: it registers the provider immediately and runs initialization in a background thread.
Flag evaluations during the initialization window return the default value with a `PROVIDER_NOT_READY` error code.
Use `set_provider_and_wait()` if you need to ensure the provider is ready before proceeding:

```python
# blocks until the provider is initialized (or raises on failure)
api.set_provider_and_wait(NoOpProvider())
```

In some situations, it may be beneficial to register multiple providers in the same application.
This is possible using [domains](#domains), which is covered in more detail below.

Expand Down
8 changes: 8 additions & 0 deletions openfeature/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
"remove_handler",
"set_evaluation_context",
"set_provider",
"set_provider_and_wait",
"set_transaction_context",
"set_transaction_context_propagator",
"shutdown",
Expand All @@ -52,6 +53,13 @@ def set_provider(provider: FeatureProvider, domain: str | None = None) -> None:
provider_registry.set_provider(domain, provider)


def set_provider_and_wait(provider: FeatureProvider, domain: str | None = None) -> None:
if domain is None:
provider_registry.set_default_provider(provider, wait_for_init=True)
else:
provider_registry.set_provider(domain, provider, wait_for_init=True)


def clear_providers() -> None:
provider_registry.clear_providers()
_event_support.clear()
Expand Down
173 changes: 130 additions & 43 deletions openfeature/provider/_registry.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
import threading

from openfeature._event_support import run_handlers_for_provider
from openfeature.evaluation_context import EvaluationContext, get_evaluation_context
from openfeature.event import (
Expand All @@ -13,77 +15,144 @@ class ProviderRegistry:
_default_provider: FeatureProvider
_providers: dict[str, FeatureProvider]
_provider_status: dict[FeatureProvider, ProviderStatus]
_lock: threading.RLock

def __init__(self) -> None:
self._lock = threading.RLock()
self._default_provider = NoOpProvider()
self._providers = {}
self._provider_status = {
self._default_provider: ProviderStatus.READY,
}

def set_provider(self, domain: str, provider: FeatureProvider) -> None:
def set_provider(
self, domain: str, provider: FeatureProvider, wait_for_init: bool = False
) -> None:
if provider is None:
raise GeneralError(error_message="No provider")
if domain is None:
raise GeneralError(error_message="No domain")
providers = self._providers
if domain in providers:
old_provider = providers[domain]
del providers[domain]
if (
old_provider != self._default_provider
and old_provider not in providers.values()
):
self._shutdown_provider(old_provider)
if provider != self._default_provider and provider not in providers.values():
self._initialize_provider(provider)
providers[domain] = provider

old_provider: FeatureProvider | None = None
needs_init = False
with self._lock:
old_provider = self._providers.get(domain)
self._providers[domain] = provider
already_bound = provider is self._default_provider or any(
p is provider for d, p in self._providers.items() if d != domain
)
if not already_bound:
needs_init = True
self._provider_status[provider] = ProviderStatus.NOT_READY

if needs_init:
self._initialize_provider(provider, wait_for_init=wait_for_init)
Comment thread
jonathannorris marked this conversation as resolved.

# old-provider shutdown is always async so a hanging shutdown() cannot
# block set_provider.
if old_provider is not None and old_provider is not provider:
self._shutdown_if_unused(old_provider)

def get_provider(self, domain: str | None) -> FeatureProvider:
if domain is None:
return self._default_provider
return self._providers.get(domain, self._default_provider)

def set_default_provider(self, provider: FeatureProvider) -> None:
def set_default_provider(
self, provider: FeatureProvider, wait_for_init: bool = False
) -> None:
if provider is None:
raise GeneralError(error_message="No provider")
if (
self._default_provider
and self._default_provider not in self._providers.values()
):
self._shutdown_provider(self._default_provider)
self._default_provider = provider

if self._default_provider not in self._providers.values():
self._initialize_provider(provider)
old_provider: FeatureProvider | None = None
needs_init = False
with self._lock:
old_provider = self._default_provider
self._default_provider = provider
if (
provider is not old_provider
and provider not in self._providers.values()
):
needs_init = True
self._provider_status[provider] = ProviderStatus.NOT_READY

if needs_init:
self._initialize_provider(provider, wait_for_init=wait_for_init)

if old_provider is not None and old_provider is not provider:
self._shutdown_if_unused(old_provider)

def get_default_provider(self) -> FeatureProvider:
return self._default_provider

def clear_providers(self) -> None:
self.shutdown()
self._providers.clear()
self._default_provider = NoOpProvider()
self._provider_status = {
self._default_provider: ProviderStatus.READY,
}
with self._lock:
self._providers.clear()
self._default_provider = NoOpProvider()
self._provider_status = {
self._default_provider: ProviderStatus.READY,
}

def shutdown(self) -> None:
for provider in {self._default_provider, *self._providers.values()}:
with self._lock:
providers = {self._default_provider, *self._providers.values()}

for provider in providers:
self._shutdown_provider(provider)

def _get_evaluation_context(self) -> EvaluationContext:
return get_evaluation_context()

def _initialize_provider(self, provider: FeatureProvider) -> None:
def _initialize_provider(
self, provider: FeatureProvider, wait_for_init: bool
) -> None:
provider.attach(self.dispatch_event)
if not hasattr(provider, "initialize"):
# nothing async to do; dispatch READY synchronously.
self.dispatch_event(
provider, ProviderEvent.PROVIDER_READY, ProviderEventDetails()
)
return
if wait_for_init:
self._run_initialize(provider, raise_on_error=True)
return

thread = threading.Thread(
target=self._run_initialize,
args=(provider,),
kwargs={"raise_on_error": False},
daemon=True,
)
thread.start()

def _run_initialize(
self, provider: FeatureProvider, raise_on_error: bool = False
) -> None:
try:
if hasattr(provider, "initialize"):
provider.initialize(self._get_evaluation_context())
provider.initialize(self._get_evaluation_context())
# stale init: provider was replaced/shut down during initialize(); drop event.
# Check active registration, not _provider_status, since replaced providers
# remain in _provider_status until async shutdown pops them.
with self._lock:
if (
provider is not self._default_provider
and provider not in self._providers.values()
):
return
self.dispatch_event(
provider, ProviderEvent.PROVIDER_READY, ProviderEventDetails()
)
except Exception as err:
# stale init: provider was replaced/shut down during initialize(); drop event.
# Check active registration, not _provider_status, since replaced providers
# remain in _provider_status until async shutdown pops them.
with self._lock:
if (
provider is not self._default_provider
and provider not in self._providers.values()
):
return
error_code = (
err.error_code
if isinstance(err, OpenFeatureError)
Expand All @@ -97,12 +166,29 @@ def _initialize_provider(self, provider: FeatureProvider) -> None:
error_code=error_code,
),
)
if raise_on_error:
raise

def _shutdown_if_unused(self, provider: FeatureProvider) -> None:
# only shut down if no longer referenced. shutdown runs on a daemon
# thread so a hanging shutdown() cannot block the caller.
with self._lock:
if provider is self._default_provider:
return
if provider in self._providers.values():
return

thread = threading.Thread(
target=self._shutdown_provider, args=(provider,), daemon=True
)
thread.start()

def _shutdown_provider(self, provider: FeatureProvider) -> None:
try:
if hasattr(provider, "shutdown"):
provider.shutdown()
del self._provider_status[provider]
with self._lock:
self._provider_status.pop(provider, None)
except Exception as err:
self.dispatch_event(
provider,
Expand Down Expand Up @@ -132,17 +218,18 @@ def _update_provider_status(
event: ProviderEvent,
details: ProviderEventDetails,
) -> None:
if event == ProviderEvent.PROVIDER_READY:
self._provider_status[provider] = ProviderStatus.READY
elif event == ProviderEvent.PROVIDER_STALE:
self._provider_status[provider] = ProviderStatus.STALE
elif event == ProviderEvent.PROVIDER_ERROR:
status = (
ProviderStatus.FATAL
if details.error_code == ErrorCode.PROVIDER_FATAL
else ProviderStatus.ERROR
)
self._provider_status[provider] = status
with self._lock:
if event == ProviderEvent.PROVIDER_READY:
self._provider_status[provider] = ProviderStatus.READY
elif event == ProviderEvent.PROVIDER_STALE:
self._provider_status[provider] = ProviderStatus.STALE
elif event == ProviderEvent.PROVIDER_ERROR:
status = (
ProviderStatus.FATAL
if details.error_code == ErrorCode.PROVIDER_FATAL
else ProviderStatus.ERROR
)
self._provider_status[provider] = status


provider_registry = ProviderRegistry()
2 changes: 1 addition & 1 deletion tests/conftest.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,5 @@ def clear_providers():

@pytest.fixture()
def no_op_provider_client():
api.set_provider(NoOpProvider())
api.set_provider_and_wait(NoOpProvider())
return api.get_client()
4 changes: 2 additions & 2 deletions tests/features/steps/metadata_steps.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,13 @@
from behave import given, then

from openfeature.api import get_client, set_provider
from openfeature.api import get_client, set_provider_and_wait
from openfeature.provider.in_memory_provider import InMemoryProvider
from tests.features.data import IN_MEMORY_FLAGS


@given("a stable provider")
def step_impl_stable_provider(context):
set_provider(InMemoryProvider(IN_MEMORY_FLAGS))
set_provider_and_wait(InMemoryProvider(IN_MEMORY_FLAGS))
context.client = get_client()


Expand Down
6 changes: 3 additions & 3 deletions tests/features/steps/steps.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@

from behave import given, then, when

from openfeature.api import get_client, set_provider
from openfeature.api import get_client, set_provider_and_wait
from openfeature.client import OpenFeatureClient
from openfeature.evaluation_context import EvaluationContext
from openfeature.exception import ErrorCode
Expand All @@ -28,13 +28,13 @@ def step_impl_resolved_should_be(context, flag_type, key, expected_reason):

@given("a provider is registered with cache disabled")
def step_impl_provider_without_cache(context):
set_provider(InMemoryProvider(IN_MEMORY_FLAGS))
set_provider_and_wait(InMemoryProvider(IN_MEMORY_FLAGS))
context.client = get_client()


@given("a provider is registered")
def step_impl_provider(context):
set_provider(InMemoryProvider(IN_MEMORY_FLAGS))
set_provider_and_wait(InMemoryProvider(IN_MEMORY_FLAGS))
context.client = get_client()


Expand Down
Loading
Loading