Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion pr_agent/algo/cli_args.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ def validate_user_args(args: list) -> (bool, str):

# decode forbidden args
# b64encode('word'.encode()).decode()
_encoded_args = 'c2hhcmVkX3NlY3JldA==:dXNlcg==:c3lzdGVt:ZW5hYmxlX2NvbW1lbnRfYXBwcm92YWw=:ZW5hYmxlX21hbnVhbF9hcHByb3ZhbA==:ZW5hYmxlX2F1dG9fYXBwcm92YWw=:YXBwcm92ZV9wcl9vbl9zZWxmX3Jldmlldw==:YmFzZV91cmw=:dXJs:YXBwX25hbWU=:c2VjcmV0X3Byb3ZpZGVy:Z2l0X3Byb3ZpZGVy:c2tpcF9rZXlz:b3BlbmFpLmtleQ==:QU5BTFlUSUNTX0ZPTERFUg==:dXJp:YXBwX2lk:d2ViaG9va19zZWNyZXQ=:YmVhcmVyX3Rva2Vu:UEVSU09OQUxfQUNDRVNTX1RPS0VO:b3ZlcnJpZGVfZGVwbG95bWVudF90eXBl:cHJpdmF0ZV9rZXk=:bG9jYWxfY2FjaGVfcGF0aA==:ZW5hYmxlX2xvY2FsX2NhY2hl:amlyYV9iYXNlX3VybA==:YXBpX2Jhc2U=:YXBpX3R5cGU=:YXBpX3ZlcnNpb24=:c2tpcF9rZXlz'
_encoded_args = 'c2hhcmVkX3NlY3JldA==:dXNlcg==:c3lzdGVt:ZW5hYmxlX2NvbW1lbnRfYXBwcm92YWw=:ZW5hYmxlX21hbnVhbF9hcHByb3ZhbA==:ZW5hYmxlX2F1dG9fYXBwcm92YWw=:YXBwcm92ZV9wcl9vbl9zZWxmX3Jldmlldw==:YmFzZV91cmw=:dXJs:YXBwX25hbWU=:c2VjcmV0X3Byb3ZpZGVy:Z2l0X3Byb3ZpZGVy:c2tpcF9rZXlz:b3BlbmFpLmtleQ==:QU5BTFlUSUNTX0ZPTERFUg==:dXJp:YXBwX2lk:d2ViaG9va19zZWNyZXQ=:YmVhcmVyX3Rva2Vu:UEVSU09OQUxfQUNDRVNTX1RPS0VO:b3ZlcnJpZGVfZGVwbG95bWVudF90eXBl:cHJpdmF0ZV9rZXk=:bG9jYWxfY2FjaGVfcGF0aA==:ZW5hYmxlX2xvY2FsX2NhY2hl:amlyYV9iYXNlX3VybA==:YXBpX2Jhc2U=:YXBpX3R5cGU=:YXBpX3ZlcnNpb24=:c2tpcF9rZXlz:cHJfaGVscF9kb2NzLnJlcG9fdXJs:cHJfaGVscF9kb2NzLnJlcG9fZGVmYXVsdF9icmFuY2g=:cHJfaGVscF9kb2NzLmRvY3NfcGF0aA=='

forbidden_cli_args = []
for e in _encoded_args.split(':'):
Expand Down
16 changes: 7 additions & 9 deletions pr_agent/git_providers/bitbucket_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -624,24 +624,22 @@ def get_pr_labels(self, update=False):
pass
#Clone related
def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:
if "bitbucket.org" not in repo_url_to_clone:
get_logger().error("Repo URL is not a valid bitbucket URL.")
return None

(scheme, base_url) = repo_url_to_clone.split("bitbucket.org")
if not all([scheme, base_url]):
get_logger().error(f"repo_url_to_clone: {repo_url_to_clone} is not a valid bitbucket URL.")
# Validate the requested url against bitbucket.org exactly, rather than a weak substring
# check which would let 'bitbucket.org.attacker.tld' leak the token (issue #2445).
repo_path = self._validate_clone_url_and_extract_path(repo_url_to_clone, "bitbucket.org")
if not repo_path:
return None

if self.auth_type == "basic":
# Basic auth with token
clone_url = f"{scheme}x-token-auth:{self.basic_token}@bitbucket.org{base_url}"
token = self.basic_token
elif self.auth_type == "bearer":
# Bearer token
clone_url = f"{scheme}x-token-auth:{self.bearer_token}@bitbucket.org{base_url}"
token = self.bearer_token
else:
# This case should ideally not be reached if __init__ validates auth_type
get_logger().error(f"Unsupported or uninitialized auth_type: {getattr(self, 'auth_type', 'N/A')}. Returning None")
return None

clone_url = f"https://x-token-auth:{token}@bitbucket.org{repo_path}"
return clone_url
30 changes: 23 additions & 7 deletions pr_agent/git_providers/bitbucket_server_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -540,15 +540,25 @@ def _get_merge_base(self):
return f"rest/api/latest/projects/{self.workspace_slug}/repos/{self.repo_slug}/pull-requests/{self.pr_num}/merge-base"
# Clone related
def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:
if 'bitbucket.' not in repo_url_to_clone:
get_logger().error("Repo URL is not a valid bitbucket URL.")
return None
bearer_token = self.bearer_token
if not bearer_token:
get_logger().error("No bearer token provided. Returning None")
return None
# Return unmodified URL as the token is passed via HTTP headers in _clone_inner, as seen below.
return repo_url_to_clone

# The bearer token is sent as an Authorization header to whatever host this URL points at
# (see _clone_inner). A weak substring check ("bitbucket." in url) would let
# 'bitbucket.attacker.tld' receive the token (issue #2445), so validate the host exactly
# against the configured Bitbucket Server and rebuild the URL from its trusted authority.
base_parsed = urlparse(self.bitbucket_server_url) if self.bitbucket_server_url else None
if not base_parsed or not base_parsed.hostname:
get_logger().error("Missing or unparseable bitbucket server url. Returning None")
return None
repo_path = self._validate_clone_url_and_extract_path(repo_url_to_clone, base_parsed.hostname)
if not repo_path:
return None

scheme = (base_parsed.scheme or "https") + "://"
return f"{scheme}{base_parsed.netloc}{repo_path}"

#Overriding the shell command, since for some reason usage of x-token-auth doesn't work, as mentioned here:
# https://stackoverflow.com/questions/56760396/cloning-bitbucket-server-repo-with-access-tokens
Expand All @@ -558,8 +568,14 @@ def _clone_inner(self, repo_url: str, dest_folder: str, operation_timeout_in_sec
#Shouldn't happen since this is checked in _prepare_clone, therefore - throwing an exception.
raise RuntimeError(f"Bearer token is required!")

cli_args = shlex.split(f"git clone -c http.extraHeader='Authorization: Bearer {bearer_token}' "
f"--filter=blob:none --depth 1 {repo_url} {dest_folder}")
# Build argv as a list (not shlex.split of an f-string) so neither the token nor the repo url
# can be re-parsed into extra git arguments (issue #2445 hardening).
cli_args = [
"git", "clone",
"-c", f"http.extraHeader=Authorization: Bearer {bearer_token}",
"--filter=blob:none", "--depth", "1",
repo_url, dest_folder,
]

ssl_env = get_git_ssl_env()

Expand Down
43 changes: 43 additions & 0 deletions pr_agent/git_providers/git_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import shutil
import subprocess
from typing import Optional, Tuple
from urllib.parse import urlparse

from pr_agent.algo.types import FilePatchInfo
from pr_agent.algo.utils import Range, process_description
Expand Down Expand Up @@ -109,6 +110,48 @@ def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:
get_logger().warning("Not implemented! Returning None")
return None

@staticmethod
def _validate_clone_url_and_extract_path(repo_url_to_clone: str, allowed_host: str) -> Optional[str]:
"""Structurally validate a clone URL against a single allowed host and return its path.

Providers embed an auth token into the clone URL (or an Authorization header). A naive
substring check such as ``allowed_host in repo_url_to_clone`` lets an attacker-controlled
host that merely *contains* the allowed host pass validation, e.g. ``github.com.attacker.tld``
or ``https://github.com@attacker.tld/...``, causing the token to be sent to that host
(issue #2445). This helper parses the URL and requires the host to match ``allowed_host``
exactly, rejecting embedded credentials and non-http(s) schemes.

Returns the repository path (leading slash included) on success, or None if the URL must
be rejected. The caller is responsible for rebuilding the clone URL from a trusted host.
"""
if not allowed_host:
get_logger().error("No allowed host configured for clone url validation")
return None
normalized_url = (repo_url_to_clone or "").strip()
if "://" not in normalized_url: # tolerate scheme-less inputs like 'github.com/owner/repo'
normalized_url = "https://" + normalized_url
parsed = urlparse(normalized_url)
if parsed.scheme not in ("http", "https"):
get_logger().error(f"url to clone: {repo_url_to_clone} has an unsupported scheme")
return None
# Reject embedded credentials (userinfo) to avoid 'https://allowed_host@attacker.tld/...' tricks.
if parsed.username or parsed.password:
get_logger().error(f"url to clone: {repo_url_to_clone} must not contain embedded credentials")
return None
if (parsed.hostname or "").lower() != allowed_host.lower():
get_logger().error(f"url to clone: {repo_url_to_clone} host does not match allowed host: {allowed_host}")
return None
repo_path = parsed.path
if not repo_path or repo_path == "/":
get_logger().error(f"url to clone: {repo_url_to_clone} is malformed - missing repository path")
return None
# A legitimate repository path never contains whitespace/control chars. Rejecting them prevents
# argument injection where the path is later interpolated into a 'git clone ...' command string.
if any(ch.isspace() for ch in repo_path):
get_logger().error(f"url to clone: {repo_url_to_clone} path contains whitespace")
return None
return repo_path

# Does a shallow clone, using a forked process to support a timeout guard.
# In case operation has failed, it is expected to throw an exception as this method does not return a value.
def _clone_inner(self, repo_url: str, dest_folder: str, operation_timeout_in_seconds: int=None) -> None:
Expand Down
21 changes: 8 additions & 13 deletions pr_agent/git_providers/gitea_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -717,25 +717,20 @@ def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:

gitea_token = self.gitea_access_token
gitea_base_url = self.base_url
scheme = gitea_base_url.split("://")[0]
scheme += "://"
if not all([gitea_token, gitea_base_url]):
get_logger().error("Either missing auth token or missing base url")
return None
base_url = gitea_base_url.split(scheme)[1]
if not base_url:
get_logger().error(f"Base url: {gitea_base_url} has an empty base url")
return None
if base_url not in repo_url_to_clone:
get_logger().error(f"url to clone: {repo_url_to_clone} does not contain {base_url}")
return None
repo_full_name = repo_url_to_clone.split(base_url)[-1]

# Validate the requested url against the configured gitea host exactly, rather than a weak
# substring check which would let 'gitea.com.attacker.tld' leak the token (issue #2445).
base_parsed = urlparse(gitea_base_url)
repo_full_name = self._validate_clone_url_and_extract_path(repo_url_to_clone, base_parsed.hostname)
if not repo_full_name:
get_logger().error(f"url to clone: {repo_url_to_clone} is malformed")
return None

clone_url = scheme
clone_url += f"{gitea_token}@{base_url}{repo_full_name}"
# Rebuild from the trusted authority (host[:port]) so the token cannot reach a different host.
scheme = (base_parsed.scheme or "https") + "://"
clone_url = f"{scheme}{gitea_token}@{base_parsed.netloc}{repo_full_name}"
return clone_url

class RepoApi(giteapy.RepositoryApi):
Expand Down
21 changes: 8 additions & 13 deletions pr_agent/git_providers/github_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -1207,23 +1207,18 @@ def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:
if not all([github_token, github_base_url]):
get_logger().error("Either missing auth token or missing base url")
return None
if scheme not in github_base_url:
get_logger().error(f"Base url: {github_base_url} is missing prefix: {scheme}")
return None
github_com = github_base_url.split(scheme)[1] # e.g. 'github.com' or github.<org>.com
if not github_com:
get_logger().error(f"Base url: {github_base_url} has an empty base url")
return None
if github_com not in repo_url_to_clone:
get_logger().error(f"url to clone: {repo_url_to_clone} does not contain {github_com}")
return None
repo_full_name = repo_url_to_clone.split(github_com)[-1]

# Determine the single host we are allowed to clone from (e.g. 'github.com' or 'github.<org>.com')
# and validate the requested url against it, rather than a weak substring check (issue #2445).
base_parsed = urlparse(github_base_url)
repo_full_name = self._validate_clone_url_and_extract_path(repo_url_to_clone, base_parsed.hostname)
if not repo_full_name:
get_logger().error(f"url to clone: {repo_url_to_clone} is malformed")
return None

# Always rebuild the clone url from the trusted authority (host[:port]) so the token can never
# reach a different host, while preserving any non-standard port of the configured base url.
clone_url = scheme
if self.deployment_type == 'app':
clone_url += "git:"
clone_url += f"{github_token}@{github_com}{repo_full_name}"
clone_url += f"{github_token}@{base_parsed.netloc}{repo_full_name}"
return clone_url
20 changes: 12 additions & 8 deletions pr_agent/git_providers/gitlab_provider.py
Original file line number Diff line number Diff line change
Expand Up @@ -955,14 +955,16 @@ def generate_link_to_relevant_line_number(self, suggestion) -> str:
return ""
#Clone related
def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:
if "gitlab." not in repo_url_to_clone:
get_logger().error(f"Repo URL: {repo_url_to_clone} is not a valid gitlab URL.")
return None
(scheme, base_url) = repo_url_to_clone.split("gitlab.")
access_token = getattr(self.gl, 'oauth_token', None) or getattr(self.gl, 'private_token', None)
if not all([scheme, access_token, base_url]):
get_logger().error(f"Either no access token found, or repo URL: {repo_url_to_clone} "
f"is missing prefix: {scheme} and/or base URL: {base_url}.")
if not access_token:
get_logger().error("No access token found")
return None

# Validate the requested url against the configured gitlab host exactly, rather than a weak
# substring check ("gitlab." in url) which would let 'gitlab.attacker.tld' leak the token (issue #2445).
base_parsed = urlparse(self.gitlab_url)
repo_full_name = self._validate_clone_url_and_extract_path(repo_url_to_clone, base_parsed.hostname)
if not repo_full_name:
return None

#Note that the ""official"" method found here:
Expand All @@ -972,5 +974,7 @@ def _prepare_clone_url_with_token(self, repo_url_to_clone: str) -> str | None:
# For example: For repo url: https://gitlab.codium-inc.com/qodo/autoscraper.git
# Then to clone one will issue: 'git clone https://oauth2:<access token>@gitlab.codium-inc.com/qodo/autoscraper.git'

clone_url = f"{scheme}oauth2:{access_token}@gitlab.{base_url}"
# Rebuild from the trusted authority (host[:port]) so the token cannot reach a different host.
scheme = (base_parsed.scheme or "https") + "://"
clone_url = f"{scheme}oauth2:{access_token}@{base_parsed.netloc}{repo_full_name}"
Comment on lines +977 to +979

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Action required

1. Gitlab clone url embeds token 📎 Requirement gap ⛨ Security

The code constructs clone URLs for multiple git providers (GitLab, GitHub, Gitea, and Bitbucket
Cloud) by embedding provider access tokens directly in the URL (e.g., https://oauth2:<*******t;@... /
https://<token>@... / https://x-token-auth:<*******t;@...). This violates the requirement to avoid
placing secrets in clone URLs where they can leak via logs, process arguments, or stored git remote
strings.
Agent Prompt
## Issue description
Several provider implementations build git clone URLs by embedding authentication tokens directly in the URL (GitLab `oauth2:<token>@...`, GitHub/Gitea `<token>@...`, Bitbucket Cloud `x-token-auth:<token>@...`), which violates compliance requirements.

## Issue Context
Compliance (PR Compliance ID 10) requires that tokens/secrets are not embedded in URLs used for git operations because they can be exposed via logs, process lists/args, or persisted git remote strings. Prefer authentication approaches that keep secrets out of URLs, such as git credential helpers or `http.extraHeader`-based auth.

## Fix Focus Areas
- pr_agent/git_providers/gitlab_provider.py[977-980]
- pr_agent/git_providers/github_provider.py[1220-1223]
- pr_agent/git_providers/gitea_provider.py[731-734]
- pr_agent/git_providers/bitbucket_provider.py[633-645]

ⓘ Copy this prompt and use it to remediate the issue with your preferred AI generation tools

return clone_url
Loading
Loading