diff --git a/.tekton/integration-test-eaas.yaml b/.tekton/integration-test-eaas.yaml index c8e1a413..5da2d2b2 100644 --- a/.tekton/integration-test-eaas.yaml +++ b/.tekton/integration-test-eaas.yaml @@ -77,6 +77,386 @@ spec: - name: ownerUid value: $(context.pipelineRun.uid) + - name: deploy-openldap + runAfter: + - provision-environment + taskSpec: + params: + - name: kubeconfig-secret + type: string + steps: + - name: create-openldap + image: quay.io/konflux-ci/appstudio-utils:latest + script: | + #!/usr/bin/env bash + set -euo pipefail + + KUBECONFIG=/tmp/kubeconfig + kubectl get secret $(params.kubeconfig-secret) -o jsonpath='{.data.kubeconfig}' | base64 -d > $KUBECONFIG + export KUBECONFIG + + echo "==========================================" + echo "Deploying LDAP Server (Python/ldaptor)" + echo "==========================================" + + # Deploy a Python-based in-memory LDAP server using ldaptor. + # osixia/openldap:1.5.0 requires root and fails in OpenShift's + # restricted-v2 SCC. ldaptor runs as an arbitrary UID on a + # non-privileged port (1389), so no SCC changes are needed. + + kubectl apply -f - <<'EOFYAML' + apiVersion: v1 + kind: ConfigMap + metadata: + name: ldap-server-script + data: + server.py: | + """ + Minimal in-memory LDAP server using ldaptor. + + Serves posixGroup entries under ou=groups,dc=example,dc=com with + anonymous-read access so that CTS's query_ldap_groups() can + retrieve group membership without a bind DN. + """ + import io + from twisted.internet import reactor + from twisted.internet.protocol import ServerFactory + from twisted.python.components import registerAdapter + from ldaptor.inmemory import fromLDIFFile + from ldaptor.interfaces import IConnectedLDAPEntry + from ldaptor.protocols.ldap.ldapserver import LDAPServer + + LDIF = b"""\ + dn: dc=example,dc=com + dc: example + objectClass: top + objectClass: domain + + dn: ou=groups,dc=example,dc=com + ou: groups + objectClass: top + objectClass: organizationalUnit + + dn: cn=cts-builders,ou=groups,dc=example,dc=com + cn: cts-builders + objectClass: top + objectClass: posixGroup + gidNumber: 5501 + memberUid: builder@example.com + + dn: cn=readonly-users,ou=groups,dc=example,dc=com + cn: readonly-users + objectClass: top + objectClass: posixGroup + gidNumber: 5502 + memberUid: readonly@example.com + + """ + + class LDAPServerFactory(ServerFactory): + protocol = LDAPServer + + def __init__(self, root): + self.root = root + + def buildProtocol(self, addr): + proto = self.protocol() + proto.factory = self + return proto + + registerAdapter( + lambda f: f.root, LDAPServerFactory, IConnectedLDAPEntry + ) + + def start(root): + factory = LDAPServerFactory(root) + reactor.listenTCP(1389, factory, interface="0.0.0.0") + print("LDAP server listening on port 1389", flush=True) + + d = fromLDIFFile(io.BytesIO(LDIF)) + d.addCallback(start) + reactor.run() + EOFYAML + + kubectl apply -f - <<'EOFYAML' + apiVersion: apps/v1 + kind: Deployment + metadata: + name: openldap + labels: + app: openldap + spec: + replicas: 1 + selector: + matchLabels: + app: openldap + template: + metadata: + labels: + app: openldap + spec: + containers: + - name: openldap + image: quay.io/konflux-ci/appstudio-utils:latest + command: ["/bin/bash", "-c"] + args: + - | + set -e + export HOME=/tmp + echo "Installing ldaptor and twisted..." + python3 -m ensurepip + python3 -m pip install --target /tmp/ldap-deps --quiet ldaptor twisted + echo "Starting LDAP server..." + export PYTHONPATH=/tmp/ldap-deps + exec python3 /scripts/server.py + ports: + - containerPort: 1389 + name: ldap + readinessProbe: + tcpSocket: + port: 1389 + initialDelaySeconds: 15 + periodSeconds: 5 + timeoutSeconds: 3 + failureThreshold: 12 + resources: + requests: + memory: "128Mi" + cpu: "50m" + limits: + memory: "256Mi" + cpu: "200m" + volumeMounts: + - name: ldap-script + mountPath: /scripts + readOnly: true + volumes: + - name: ldap-script + configMap: + name: ldap-server-script + --- + apiVersion: v1 + kind: Service + metadata: + name: openldap + labels: + app: openldap + spec: + ports: + - port: 1389 + targetPort: 1389 + name: ldap + selector: + app: openldap + EOFYAML + + echo "Waiting for LDAP server to be ready..." + if ! kubectl wait --for=condition=available --timeout=180s deployment/openldap; then + echo "LDAP deployment failed! Debug info:" + kubectl describe deployment openldap + kubectl describe pod -l app=openldap + kubectl logs -l app=openldap --tail=50 || echo "No logs available" + exit 1 + fi + echo "✓ LDAP server is ready" + params: + - name: kubeconfig-secret + value: $(tasks.provision-environment.results.secretRef) + + - name: deploy-dex + runAfter: + - provision-environment + taskSpec: + params: + - name: kubeconfig-secret + type: string + steps: + - name: create-dex + image: quay.io/konflux-ci/appstudio-utils:latest + script: | + #!/usr/bin/env bash + set -euo pipefail + + KUBECONFIG=/tmp/kubeconfig + kubectl get secret $(params.kubeconfig-secret) -o jsonpath='{.data.kubeconfig}' | base64 -d > $KUBECONFIG + export KUBECONFIG + + echo "==========================================" + echo "Generating Dex TLS certificate" + echo "==========================================" + + # Generate a self-signed CA and Dex server certificate (SAN: DNS:dex) + # so that OIDCOAuthVerifyJwksUri can use https://dex:5556/keys + TMPDIR=$(mktemp -d) + + # Generate CA key and cert + openssl genrsa -out "$TMPDIR/ca.key" 4096 2>/dev/null + openssl req -x509 -new -nodes -key "$TMPDIR/ca.key" \ + -sha256 -days 365 \ + -subj "/CN=dex-test-ca" \ + -out "$TMPDIR/ca.crt" 2>/dev/null + + # Generate Dex server key and CSR + openssl genrsa -out "$TMPDIR/dex.key" 4096 2>/dev/null + openssl req -new -key "$TMPDIR/dex.key" \ + -subj "/CN=dex" \ + -out "$TMPDIR/dex.csr" 2>/dev/null + + # Sign the Dex cert with the CA, adding SAN for DNS:dex + cat > "$TMPDIR/dex.ext" <<'EXTEOF' + [SAN] + subjectAltName=DNS:dex + EXTEOF + openssl x509 -req -in "$TMPDIR/dex.csr" \ + -CA "$TMPDIR/ca.crt" -CAkey "$TMPDIR/ca.key" -CAcreateserial \ + -out "$TMPDIR/dex.crt" -days 365 -sha256 \ + -extfile "$TMPDIR/dex.ext" -extensions SAN 2>/dev/null + + # Store CA cert and Dex TLS cert/key in Kubernetes Secrets + kubectl create secret generic dex-tls \ + --from-file=tls.crt="$TMPDIR/dex.crt" \ + --from-file=tls.key="$TMPDIR/dex.key" + + kubectl create secret generic dex-ca \ + --from-file=ca.crt="$TMPDIR/ca.crt" + + rm -rf "$TMPDIR" + echo "✓ Dex TLS certificate generated" + + echo "==========================================" + echo "Deploying Dex OIDC Provider" + echo "==========================================" + + kubectl apply -f - <<'EOFYAML' + apiVersion: v1 + kind: ConfigMap + metadata: + name: dex-config + data: + config.yaml: | + issuer: https://dex:5556 + + storage: + type: memory + + web: + https: 0.0.0.0:5556 + tlsCert: /etc/dex/tls/tls.crt + tlsKey: /etc/dex/tls/tls.key + + logger: + level: debug + + enablePasswordDB: true + + staticPasswords: + - email: builder@example.com + hash: "$2y$10$6cZyh42YBb2Q5.fE9nj3mu3UHs21NUmVP2fpU63EM3/ecJuHVscEC" + username: builder + userID: "builder" + - email: readonly@example.com + hash: "$2y$10$6cZyh42YBb2Q5.fE9nj3mu3UHs21NUmVP2fpU63EM3/ecJuHVscEC" + username: readonly + userID: "readonly" + + staticClients: + - id: cts-integration + secret: cts-integration-secret + name: CTS Integration + redirectURIs: + - http://cts:8080/redirect_uri + grantTypes: + - password + - authorization_code + + oauth2: + skipApprovalScreen: true + passwordConnector: local + EOFYAML + + kubectl apply -f - <<'EOFYAML' + apiVersion: apps/v1 + kind: Deployment + metadata: + name: dex + labels: + app: dex + spec: + replicas: 1 + selector: + matchLabels: + app: dex + template: + metadata: + labels: + app: dex + spec: + containers: + - name: dex + image: ghcr.io/dexidp/dex:v2.41.1 + args: ["dex", "serve", "/etc/dex/config.yaml"] + ports: + - containerPort: 5556 + name: https + resources: + requests: + memory: "64Mi" + cpu: "50m" + limits: + memory: "256Mi" + cpu: "500m" + readinessProbe: + httpGet: + path: /.well-known/openid-configuration + port: 5556 + scheme: HTTPS + initialDelaySeconds: 5 + periodSeconds: 5 + timeoutSeconds: 5 + failureThreshold: 12 + volumeMounts: + - name: dex-config + mountPath: /etc/dex + readOnly: true + - name: dex-tls + mountPath: /etc/dex/tls + readOnly: true + volumes: + - name: dex-config + configMap: + name: dex-config + - name: dex-tls + secret: + secretName: dex-tls + --- + apiVersion: v1 + kind: Service + metadata: + name: dex + labels: + app: dex + spec: + ports: + - port: 5556 + targetPort: 5556 + name: https + selector: + app: dex + EOFYAML + + echo "Waiting for Dex to be ready..." + if ! kubectl wait --for=condition=available --timeout=180s deployment/dex; then + echo "Dex deployment failed! Debug info:" + kubectl describe deployment dex + kubectl describe pod -l app=dex + kubectl logs -l app=dex --tail=50 || echo "No logs available" + exit 1 + fi + echo "✓ Dex is ready" + params: + - name: kubeconfig-secret + value: $(tasks.provision-environment.results.secretRef) + - name: deploy-database runAfter: - provision-environment @@ -215,6 +595,8 @@ spec: - name: deploy-cts runAfter: - deploy-database + - deploy-openldap + - deploy-dex taskSpec: params: - name: kubeconfig-secret @@ -241,7 +623,7 @@ spec: echo "Image: $IMAGE" # Create ConfigMap with production configuration and httpd config for CTS - kubectl apply -f - < WSGIProcessGroup cts WSGIApplicationGroup %{GLOBAL} + + AuthType auth-openidc + # Dex access tokens do not include a scope claim in the JWT payload. + # Inject it here so load_openidc_user's validate_scopes() check passes. + # Scoped to Bearer requests only so that Kerberos requests (which lack + # OIDC_ environ variables) are still routed to load_krb_user_from_request + # by load_oidc_or_krb_user_from_request instead of always hitting the + # OIDC branch. + SetEnv OIDC_CLAIM_scope "openid email" + + + AuthType openid-connect + + + + Require expr %{REQUEST_URI} !~ /userinfo/ + Require method GET + + Require valid-user + - EOF + EOFYAML # Deploy CTS kubectl apply -f - < /tmp/dex-ca.crt + + echo 'Installing pytest and requests...' python3 -m ensurepip - python3 -m pip install --user pytest + python3 -m pip install --target /tmp/test-deps --quiet pytest requests echo '' echo 'Cloning repository...' @@ -492,7 +930,7 @@ spec: echo '' echo 'Running pytest...' - CTS_URL=http://cts:8080 python3 -m pytest tests/test_integration_api.py -v -s -o addopts= + PYTHONPATH=/tmp/test-deps REQUESTS_CA_BUNDLE=/tmp/dex-ca.crt CTS_URL=http://cts:8080 AUTH_BACKEND=oidc_or_kerberos DEX_URL=https://dex:5556 python3 -m pytest tests/test_integration_api.py -v -s -o addopts= " TEST_RESULT=$? set -e diff --git a/tests/test_integration_api.py b/tests/test_integration_api.py index b270ad76..7c4d8fa6 100755 --- a/tests/test_integration_api.py +++ b/tests/test_integration_api.py @@ -7,16 +7,37 @@ In CI, the tests run in a pod deployed to the same namespace as the CTS service, so they can access it directly via: http://cts:5005 + +For OIDC auth tests, set AUTH_BACKEND=openidc or AUTH_BACKEND=oidc_or_kerberos. +These tests are skipped when AUTH_BACKEND is "noauth" or unset. """ import json import os +import ssl +import urllib.parse from urllib.request import urlopen, Request from urllib.error import HTTPError, URLError import pytest +def _make_ssl_context(): + """Return an ssl.SSLContext that trusts the CA in REQUESTS_CA_BUNDLE (if set). + + urllib.request.urlopen does not honour the REQUESTS_CA_BUNDLE environment + variable (only the *requests* library does). When the integration tests run + against a Dex instance that uses a self-signed certificate, we need to load + the CA cert explicitly so that the ROPC token requests succeed. + """ + ca_bundle = os.environ.get("REQUESTS_CA_BUNDLE") or os.environ.get("SSL_CERT_FILE") + if ca_bundle: + ctx = ssl.create_default_context(cafile=ca_bundle) + else: + ctx = ssl.create_default_context() + return ctx + + class HTTPClient: """Simple HTTP client for making requests to the CTS API""" @@ -46,7 +67,7 @@ def _request(self, method, path, json_data=None): if e.headers.get("Content-Type", "").startswith("application/json"): return e.code, json.loads(error_data) return e.code, error_data.decode("utf-8") - except: + except Exception: return e.code, None except URLError as e: raise Exception(f"Failed to connect to {url}: {e}") @@ -68,6 +89,74 @@ def delete(self, path): return self._request("DELETE", path) +class AuthHTTPClient(HTTPClient): + """HTTP client that injects an Authorization: Bearer header on every request.""" + + def __init__(self, base_url, token): + super().__init__(base_url) + self.token = token + + def _request(self, method, path, json_data=None): + url = f"{self.base_url}{path}" + req = Request(url, method=method) + req.add_header("Authorization", f"Bearer {self.token}") + if json_data: + req.add_header("Content-Type", "application/json") + req.data = json.dumps(json_data).encode("utf-8") + + try: + with urlopen(req, timeout=10) as response: + data = response.read() + if response.headers.get("Content-Type", "").startswith( + "application/json" + ): + return response.status, json.loads(data) + return response.status, data.decode("utf-8") + except HTTPError as e: + try: + error_data = e.read() + if e.headers.get("Content-Type", "").startswith("application/json"): + return e.code, json.loads(error_data) + return e.code, error_data.decode("utf-8") + except Exception: + return e.code, None + except URLError as e: + raise Exception(f"Failed to connect to {url}: {e}") + + +def _get_oidc_token(username, password): + """Obtain an OIDC access token from Dex via the Resource Owner Password Credentials grant.""" + dex_base_url = os.environ.get("DEX_URL", "https://dex:5556") + dex_token_url = f"{dex_base_url}/token" + payload = urllib.parse.urlencode( + { + "grant_type": "password", + "client_id": "cts-integration", + "client_secret": "cts-integration-secret", + "username": username, + "password": password, + "scope": "openid email", + } + ).encode("utf-8") + + req = Request(dex_token_url, data=payload, method="POST") + req.add_header("Content-Type", "application/x-www-form-urlencoded") + try: + with urlopen(req, timeout=10, context=_make_ssl_context()) as resp: + token_data = json.loads(resp.read()) + return token_data["access_token"] + except HTTPError as e: + error_body = e.read().decode("utf-8") + raise Exception( + f"Failed to obtain token for {username}: HTTP {e.code}: {error_body}" + ) + + +def _is_oidc_backend(): + """Return True when AUTH_BACKEND is set to an OIDC-enabled value.""" + return os.environ.get("AUTH_BACKEND") in ("openidc", "oidc_or_kerberos") + + @pytest.fixture(scope="module") def http_client(): """HTTP client fixture that reads CTS_URL from environment""" @@ -80,6 +169,54 @@ def http_client(): return HTTPClient(base_url=base_url) +@pytest.fixture(scope="module") +def write_http_client(): + """HTTP client with write access. + + Returns an AuthHTTPClient authenticated as 'builder' when AUTH_BACKEND is an + OIDC-enabled value (openidc or oidc_or_kerberos), or a plain HTTPClient when + running in noauth mode. Workflow tests that POST, PATCH, or DELETE should use + this fixture so they work in both configurations. + """ + base_url = os.environ.get("CTS_URL") + if not base_url: + pytest.skip("Must set CTS_URL environment variable") + + if _is_oidc_backend(): + token = _get_oidc_token("builder@example.com", "password") + return AuthHTTPClient(base_url=base_url, token=token) + + return HTTPClient(base_url=base_url) + + +@pytest.fixture(scope="module") +def auth_http_client_builder(): + """AuthHTTPClient authenticated as the 'builder' user (has ALLOWED_BUILDERS access).""" + if not _is_oidc_backend(): + pytest.skip("OIDC auth tests require AUTH_BACKEND=openidc or oidc_or_kerberos") + + base_url = os.environ.get("CTS_URL") + if not base_url: + pytest.skip("Must set CTS_URL environment variable") + + token = _get_oidc_token("builder@example.com", "password") + return AuthHTTPClient(base_url=base_url, token=token) + + +@pytest.fixture(scope="module") +def auth_http_client_readonly(): + """AuthHTTPClient authenticated as the 'readonly' user (no write permissions).""" + if not _is_oidc_backend(): + pytest.skip("OIDC auth tests require AUTH_BACKEND=openidc or oidc_or_kerberos") + + base_url = os.environ.get("CTS_URL") + if not base_url: + pytest.skip("Must set CTS_URL environment variable") + + token = _get_oidc_token("readonly@example.com", "password") + return AuthHTTPClient(base_url=base_url, token=token) + + def _create_compose_info( release_short, release_version, date, compose_type="test", respin=1 ): @@ -231,18 +368,20 @@ def test_composes_list(http_client): print(f" Found {len(data['items'])} composes") -def test_composes_pagination(http_client): +def test_composes_pagination(write_http_client): """Test that pagination parameters work correctly""" # Import 3 test composes compose_ids = [] for i in range(1, 4): - response = import_compose(http_client, "PaginationTest", "1.0", f"2025010{i}") + response = import_compose( + write_http_client, "PaginationTest", "1.0", f"2025010{i}" + ) compose_ids.append(response["payload"]["compose"]["id"]) print(f" Imported {len(compose_ids)} composes for pagination test") # Test page 1 with per_page=2 - status, data = http_client.get("/api/1/composes/?page=1&per_page=2") + status, data = write_http_client.get("/api/1/composes/?page=1&per_page=2") assert status == 200 assert isinstance(data, dict) assert "items" in data @@ -256,7 +395,7 @@ def test_composes_pagination(http_client): print(f" Page 1 (per_page=2): {len(data['items'])} items, total: {total}") # Test page 2 with per_page=2 - should have 1 item (we imported 3 total) - status, data = http_client.get("/api/1/composes/?page=2&per_page=2") + status, data = write_http_client.get("/api/1/composes/?page=2&per_page=2") assert status == 200 assert "items" in data assert ( @@ -292,11 +431,11 @@ def test_404_handling(http_client): # Workflow tests -def test_workflow_tag_creation(http_client): +def test_workflow_tag_creation(write_http_client): """Test creating a tag and managing taggers/untaggers""" # Step 1: Create a tag data = create_tag( - http_client, + write_http_client, "integration-test-tag", "Tag created during integration testing", "https://example.com/docs/integration-test", @@ -310,30 +449,30 @@ def test_workflow_tag_creation(http_client): print(f" 2. Initial taggers: {data['taggers']}, untaggers: {data['untaggers']}") # Step 2: Add a tagger - data = add_tagger(http_client, tag_id, "test-user") + data = add_tagger(write_http_client, tag_id, "test-user") print(f" 3. Added tagger 'test-user': taggers={data['taggers']}") # Step 3: Add an untagger - data = add_untagger(http_client, tag_id, "other-user") + data = add_untagger(write_http_client, tag_id, "other-user") assert "test-user" in data["taggers"] print(f" 4. Added untagger 'other-user': untaggers={data['untaggers']}") # Step 4: Add another tagger - data = add_tagger(http_client, tag_id, "another-user") + data = add_tagger(write_http_client, tag_id, "another-user") assert set(data["taggers"]) == {"test-user", "another-user"} print(f" 5. Added tagger 'another-user': taggers={data['taggers']}") # Step 5: Remove a tagger - data = remove_tagger(http_client, tag_id, "test-user") + data = remove_tagger(write_http_client, tag_id, "test-user") assert "another-user" in data["taggers"] print(f" 6. Removed tagger 'test-user': taggers={data['taggers']}") # Step 6: Remove the untagger - data = remove_untagger(http_client, tag_id, "other-user") + data = remove_untagger(write_http_client, tag_id, "other-user") print(f" 7. Removed untagger 'other-user': untaggers={data['untaggers']}") # Step 7: Verify final state - status, final_data = http_client.get(f"/api/1/tags/{tag_id}") + status, final_data = write_http_client.get(f"/api/1/tags/{tag_id}") assert status == 200 assert final_data["taggers"] == ["another-user"] assert final_data["untaggers"] == [] @@ -343,29 +482,29 @@ def test_workflow_tag_creation(http_client): print(" ✓ Tag creation and tagger/untagger management completed successfully") -def test_workflow_compose_import(http_client): +def test_workflow_compose_import(write_http_client): """Test importing a compose""" - data = import_compose(http_client, "IntegrationTest", "1.0", "20250101") + data = import_compose(write_http_client, "IntegrationTest", "1.0", "20250101") compose_id = data["payload"]["compose"]["id"] print(f" Imported compose: {compose_id}") -def test_workflow_respin_increment(http_client): +def test_workflow_respin_increment(write_http_client): """Test that respin numbers are automatically incremented for duplicate composes""" # Import first compose - response1 = import_compose(http_client, "RespinTest", "1.0", "20250102") + response1 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") compose_id1 = response1["payload"]["compose"]["id"] respin1 = response1["payload"]["compose"]["respin"] print(f" 1. First compose: {compose_id1} (respin: {respin1})") # Import second compose with same release/date - respin should auto-increment - response2 = import_compose(http_client, "RespinTest", "1.0", "20250102") + response2 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") compose_id2 = response2["payload"]["compose"]["id"] respin2 = response2["payload"]["compose"]["respin"] print(f" 2. Second compose: {compose_id2} (respin: {respin2})") # Import third compose - respin should increment again - response3 = import_compose(http_client, "RespinTest", "1.0", "20250102") + response3 = import_compose(write_http_client, "RespinTest", "1.0", "20250102") compose_id3 = response3["payload"]["compose"]["id"] respin3 = response3["payload"]["compose"]["respin"] print(f" 3. Third compose: {compose_id3} (respin: {respin3})") @@ -386,11 +525,11 @@ def test_workflow_respin_increment(http_client): print(f" ✓ Respin auto-increment verified: {respin1} → {respin2} → {respin3}") -def test_workflow_full_lifecycle(http_client): +def test_workflow_full_lifecycle(write_http_client): """Test complete workflow: create tag, import compose, tag it, untag it""" # Step 1: Create a tag tag_response = create_tag( - http_client, + write_http_client, "workflow-test", "Tag for workflow testing", "https://example.com/docs/workflow", @@ -400,34 +539,109 @@ def test_workflow_full_lifecycle(http_client): print(f" 1. Created tag: {tag_name} (ID: {tag_id})") # Step 2: Import a compose - compose_response = import_compose(http_client, "WorkflowTest", "1.0", "20250101") + compose_response = import_compose( + write_http_client, "WorkflowTest", "1.0", "20250101" + ) compose_id = compose_response["payload"]["compose"]["id"] print(f" 2. Imported compose: {compose_id}") # Verify compose has no tags initially - status, compose_data = http_client.get(f"/api/1/composes/{compose_id}") + status, compose_data = write_http_client.get(f"/api/1/composes/{compose_id}") assert status == 200 assert "tags" in compose_data initial_tags = compose_data.get("tags", []) print(f" 3. Initial tags: {initial_tags}") # Step 3: Tag the compose - tag_result = tag_compose(http_client, compose_id, tag_name) + tag_result = tag_compose(write_http_client, compose_id, tag_name) print(f" 4. Tagged compose with '{tag_name}': {tag_result.get('tags', [])}") # Step 4: Verify tag was applied - status, compose_data = http_client.get(f"/api/1/composes/{compose_id}") + status, compose_data = write_http_client.get(f"/api/1/composes/{compose_id}") assert status == 200 assert tag_name in compose_data.get("tags", []) print(f" 5. Verified tags: {compose_data.get('tags', [])}") # Step 5: Untag the compose - untag_result = untag_compose(http_client, compose_id, tag_name) + untag_result = untag_compose(write_http_client, compose_id, tag_name) print(f" 6. Untagged compose: {untag_result.get('tags', [])}") # Step 6: Verify tag was removed - status, compose_data = http_client.get(f"/api/1/composes/{compose_id}") + status, compose_data = write_http_client.get(f"/api/1/composes/{compose_id}") assert status == 200 assert tag_name not in compose_data.get("tags", []) print(f" 7. Final tags: {compose_data.get('tags', [])}") print(" ✓ Full workflow completed successfully") + + +# OIDC authentication tests +# These are skipped when AUTH_BACKEND is "noauth" or unset. + + +def test_auth_unauthenticated_write_returns_401(http_client): + """Unauthenticated POST to a write endpoint must return 401 when openidc is active.""" + if not _is_oidc_backend(): + pytest.skip("OIDC auth tests require AUTH_BACKEND=openidc or oidc_or_kerberos") + + compose_info = _create_compose_info("AuthTest", "1.0", "20260101") + status, data = http_client.post("/api/1/composes/", {"compose_info": compose_info}) + assert ( + status == 401 + ), f"Expected 401 for unauthenticated POST, got {status}. Response: {data}" + # Confirm the unauthenticated path is indeed blocked (positive + negative check) + assert status != 200, "Unauthenticated write must not succeed" + + +def test_auth_builder_can_post_compose(auth_http_client_builder): + """Authenticated 'builder' user (in ALLOWED_BUILDERS) can POST a compose.""" + if not _is_oidc_backend(): + pytest.skip("OIDC auth tests require AUTH_BACKEND=openidc or oidc_or_kerberos") + + compose_info = _create_compose_info("AuthBuilderTest", "1.0", "20260101") + status, data = auth_http_client_builder.post( + "/api/1/composes/", {"compose_info": compose_info} + ) + assert ( + status == 200 + ), f"Expected 200 for authenticated builder POST, got {status}. Response: {data}" + assert isinstance(data, dict) + assert "payload" in data + assert "compose" in data["payload"] + compose_id = data["payload"]["compose"]["id"] + assert compose_id, "Compose ID must be non-empty" + + +def test_auth_unauthorized_user_returns_403(auth_http_client_readonly): + """Authenticated 'readonly' user (not in ALLOWED_BUILDERS) gets 403 on write endpoints.""" + if not _is_oidc_backend(): + pytest.skip("OIDC auth tests require AUTH_BACKEND=openidc or oidc_or_kerberos") + + compose_info = _create_compose_info("AuthReadonlyTest", "1.0", "20260101") + status, data = auth_http_client_readonly.post( + "/api/1/composes/", {"compose_info": compose_info} + ) + assert ( + status == 403 + ), f"Expected 403 for readonly user POST, got {status}. Response: {data}" + # Confirm write is actually blocked (not silently succeeding) + assert status != 200, "Unauthorized user must not be able to write" + + +def test_auth_get_endpoints_accessible_without_token(http_client): + """GET endpoints remain accessible without authentication (mod_auth_openidc pass-through).""" + if not _is_oidc_backend(): + pytest.skip("OIDC auth tests require AUTH_BACKEND=openidc or oidc_or_kerberos") + + # Listing composes must work without a token + status, data = http_client.get("/api/1/composes/") + assert ( + status == 200 + ), f"Expected 200 for unauthenticated GET /api/1/composes/, got {status}" + assert "items" in data, "GET response must contain 'items' key" + + # Listing tags must also work without a token + status, data = http_client.get("/api/1/tags/") + assert ( + status == 200 + ), f"Expected 200 for unauthenticated GET /api/1/tags/, got {status}" + assert isinstance(data, dict), "GET /api/1/tags/ must return a dict"