Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 59 additions & 6 deletions maigret/submit.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@
from typing import Any, Dict, List, Optional, Tuple
from urllib.parse import urlparse

from aiohttp import ClientSession, TCPConnector
import aiohttp
from aiohttp import ClientSession, TCPConnector, CookieJar
from colorama import Fore, Style

from .activation import import_aiohttp_cookies
Expand Down Expand Up @@ -147,6 +148,38 @@ def extract_username_dialog(url):
)
return entered_username if entered_username else supposed_username

@staticmethod
def _parse_cookie_file(cookie_file: str) -> Optional[aiohttp.CookieJar]:
"""Parse a Netscape/Mozilla-format cookies.txt file into an aiohttp CookieJar.

Returns None if the file does not exist or cannot be read.
"""
if not cookie_file or not os.path.exists(cookie_file):
return None
try:
return import_aiohttp_cookies(cookie_file)
except Exception:
return None

def _inject_cookie_headers(
self, headers: Optional[dict], cookie_jar: Optional[aiohttp.CookieJar], url: str
) -> dict:
"""Merge cookies applicable to *url* from *cookie_jar* into *headers*."""
effective_headers = dict(headers) if headers else {}
if not cookie_jar:
return effective_headers
cookie_strs = [
f"{cookie.key}={cookie.value}"
for cookie in cookie_jar
if cookie.value # skip empty/placeholder values
]
if cookie_strs:
existing = effective_headers.get("Cookie", "")
effective_headers["Cookie"] = (
f"{existing}; {';'.join(cookie_strs)}" if existing else "; ".join(cookie_strs)
)
return effective_headers

# TODO: replace with checking.py/SimpleAiohttpChecker call
@staticmethod
async def get_html_response_to_compare(
Expand All @@ -170,7 +203,7 @@ async def check_features_manually(
self,
username: str,
url_exists: str,
cookie_filename="", # TODO: use cookies
cookie_filename="",
session: Optional[ClientSession] = None,
follow_redirects=False,
headers: Optional[dict] = None,
Expand All @@ -181,17 +214,37 @@ async def check_features_manually(
username.lower(), random_username
)

cookie_jar = self._parse_cookie_file(cookie_filename)

# If a cookie file is provided, use it to build request headers.
# The cookie jar is passed directly to get_html_response_to_compare via
# a session that is configured with that jar, overriding any
# pre-configured session cookie state for this manual check.
_session = session or self.session
_own_session = False
try:
session = session or self.session
if cookie_jar is not None:
# Build a session with the derived cookie jar and the same
# connector / SSL config as the default session.
ssl_ctx = __import__("ssl").create_default_context()
ssl_ctx.check_hostname = False
ssl_ctx.verify_mode = __import__("ssl").CERT_NONE
connector = TCPConnector(ssl=ssl_ctx)
_session = ClientSession(cookie_jar=cookie_jar, connector=connector, trust_env=True)
_own_session = True
headers_for_url = self._inject_cookie_headers(headers, cookie_jar, url_exists)
first_html_response, first_status = await self.get_html_response_to_compare(
url_exists, session, follow_redirects, headers
url_exists, _session, follow_redirects, headers_for_url
)
headers_for_404 = self._inject_cookie_headers(headers, cookie_jar, url_of_non_existing_account)
second_html_response, second_status = (
await self.get_html_response_to_compare(
url_of_non_existing_account, session, follow_redirects, headers
url_of_non_existing_account, _session, follow_redirects, headers_for_404
)
)
await session.close()
if _own_session:
await _session.close()
_session = None
except Exception as e:
self.logger.error(
f"Error while getting HTTP response for username {username}: {e}",
Expand Down