Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion maigret/activation.py
Original file line number Diff line number Diff line change
Expand Up @@ -149,7 +149,7 @@ def import_aiohttp_cookies(cookiestxt_filename):

cookies_list = []
for domain in cookies_obj._cookies.values(): # type: ignore[attr-defined]
for key, cookie in list(domain.values())[0].items():
for key, cookie in next(iter(domain.values())).items():
c: Morsel = Morsel()
c.set(key, cookie.value, cookie.value)
c["domain"] = cookie.domain
Expand Down
257 changes: 122 additions & 135 deletions maigret/checking.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@
import ssl
import sys
from typing import Any, Dict, List, Optional, Tuple
from unittest.mock import Mock
from urllib.parse import quote
from maigret.error_detection import detect_error_page

# Third party imports
import aiodns
Expand All @@ -21,6 +21,18 @@
ClientConnectorError,
ServerDisconnectedError,
)
from python_socks import _errors as proxy_errors
from socid_extractor import extract # type: ignore[import-not-found]

# Local imports
from . import errors
from .activation import ParsingActivator, import_aiohttp_cookies
from .error_detection import detect_error_page
from .errors import CheckError
from .executors import AsyncioQueueGeneratorExecutor
from .result import MaigretCheckResult, MaigretCheckStatus, KeywordMatchStatus, SiteResult
from .sites import MaigretDatabase, MaigretSite
from .utils import ascii_data_display, get_random_user_agent, is_plausible_username


_DNS_ERROR_MARKERS = (
Expand All @@ -44,19 +56,6 @@ def _is_dns_error(exc: Exception) -> bool:
return True
text = str(exc).lower()
return any(m in text for m in _DNS_ERROR_MARKERS)
from python_socks import _errors as proxy_errors
from socid_extractor import extract # type: ignore[import-not-found]

from unittest.mock import Mock

# Local imports
from . import errors
from .activation import ParsingActivator, import_aiohttp_cookies
from .errors import CheckError
from .executors import AsyncioQueueGeneratorExecutor
from .result import MaigretCheckResult, MaigretCheckStatus, KeywordMatchStatus, SiteResult
from .sites import MaigretDatabase, MaigretSite
from .utils import ascii_data_display, get_random_user_agent, is_plausible_username


SUPPORTED_IDS = (
Expand Down Expand Up @@ -167,26 +166,20 @@ async def _make_request(
self, session, url, headers, allow_redirects, timeout, method, logger, payload=None
) -> Tuple[Optional[str], int, Optional[CheckError]]:
try:
if method.lower() == 'get':
request_method = session.get
elif method.lower() == 'post':
request_method = session.post
elif method.lower() == 'head':
request_method = session.head
else:
request_method = session.get
method = method.lower()
request_method = getattr(
session, method if method in ('get', 'post', 'head') else 'get'
)

kwargs = {
'url': url,
'headers': headers,
'allow_redirects': allow_redirects,
'timeout': timeout,
}
if payload and method.lower() == 'post':
if headers and headers.get('Content-Type') == 'application/x-www-form-urlencoded':
kwargs['data'] = payload
else:
kwargs['json'] = payload
if payload and method == 'post':
is_form = headers and headers.get('Content-Type') == 'application/x-www-form-urlencoded'
kwargs['data' if is_form else 'json'] = payload

async with request_method(**kwargs) as response:
status_code = response.status
Expand All @@ -213,14 +206,11 @@ async def _make_request(
except KeyboardInterrupt:
return None, 0, CheckError("Interrupted")
except Exception as e:
if sys.version_info.minor > 6 and (
isinstance(e, ssl.SSLCertVerificationError)
or isinstance(e, ssl.SSLError)
):
is_ssl = sys.version_info.minor > 6 and isinstance(e, (ssl.SSLCertVerificationError, ssl.SSLError))
if is_ssl:
return None, 0, CheckError("SSL", str(e))
else:
logger.debug(e, exc_info=True)
return None, 0, CheckError("Unexpected", str(e))
logger.debug(e, exc_info=True)
return None, 0, CheckError("Unexpected", str(e))

async def check(self) -> Tuple[Optional[str], int, Optional[CheckError]]:
from aiohttp_socks import ProxyConnector
Expand Down Expand Up @@ -696,21 +686,17 @@ def process_site_result(
# Keyword detection logic
keywords = results_info.get("keywords", [])
keyword_match_status = None

if keywords and html_text:
keywords_found = []
for keyword in keywords:
if keyword.lower() in html_text.lower():
keywords_found.append(keyword)


if not keywords or not html_text:
keyword_match_status = KeywordMatchStatus.NO_KEYWORDS
else:
keywords_found = [k for k in keywords if k.lower() in html_text.lower()]
if keywords_found:
keyword_match_status = KeywordMatchStatus.KEYWORD_FOUND
logger.debug(f"Keywords found in {site.name}: {keywords_found}")
else:
keyword_match_status = KeywordMatchStatus.KEYWORDS_NOT_FOUND
logger.debug(f"No keywords found in {site.name}")
else:
keyword_match_status = KeywordMatchStatus.NO_KEYWORDS

def build_result(status, **kwargs):
return MaigretCheckResult(
Expand All @@ -732,36 +718,40 @@ def build_result(status, **kwargs):
error=check_error,
context=str(check_error),
)
elif check_type == "message":
# Checks if the error message is in the HTML
is_absence_detected = any(
[(absence_flag in html_text) for absence_flag in site.absence_strs]
)
if not is_absence_detected and is_presense_detected:
result = build_result(MaigretCheckStatus.CLAIMED)
else:
result = build_result(MaigretCheckStatus.AVAILABLE)
elif check_type == "status_code":
# Checks if the status code of the response is 2XX
if 200 <= status_code < 300:
result = build_result(MaigretCheckStatus.CLAIMED)
else:
result = build_result(MaigretCheckStatus.AVAILABLE)
elif check_type == "response_url":
# For this detection method, we have turned off the redirect.
# So, there is no need to check the response URL: it will always
# match the request. Instead, we will ensure that the response
# code indicates that the request was successful (i.e. no 404, or
# forward to some odd redirect).
if 200 <= status_code < 300 and is_presense_detected:
result = build_result(MaigretCheckStatus.CLAIMED)
else:
result = build_result(MaigretCheckStatus.AVAILABLE)
else:
# It should be impossible to ever get here...
raise ValueError(
f"Unknown check type '{check_type}' for " f"site '{site.name}'"
)
def _check_message():
# Checks if the error message is in the HTML
is_absence_detected = any(
absence_flag in html_text for absence_flag in site.absence_strs
)
if not is_absence_detected and is_presense_detected:
return MaigretCheckStatus.CLAIMED
return MaigretCheckStatus.AVAILABLE

def _check_status_code():
# Checks if the status code of the response is 2XX
if 200 <= status_code < 300:
return MaigretCheckStatus.CLAIMED
return MaigretCheckStatus.AVAILABLE

def _check_response_url():
# For this detection method, we have turned off the redirect.
# So, there is no need to check the response URL: it will
# always match the request. Instead, we ensure that the
# response code indicates success (no 404 / odd redirect).
if 200 <= status_code < 300 and is_presense_detected:
return MaigretCheckStatus.CLAIMED
return MaigretCheckStatus.AVAILABLE

_checkers = {
"message": _check_message,
"status_code": _check_status_code,
"response_url": _check_response_url,
}
checker_fn = _checkers.get(check_type)
if checker_fn is None:
raise ValueError(f"Unknown check type '{check_type}' for site '{site.name}'")
result = build_result(checker_fn())

extracted_ids_data = {}

Expand Down Expand Up @@ -861,17 +851,23 @@ def make_site_result(
MaigretCheckStatus.ILLEGAL,
error=CheckError("Check is disabled"),
)
results_site["checker"] = checker
return results_site

# current username type could not be applied
elif site.type != options["id_type"]:
if site.type != options["id_type"]:
results_site["status"] = MaigretCheckResult(
username,
site.name,
url,
MaigretCheckStatus.ILLEGAL,
error=CheckError('Unsupported identifier type', f'Want "{site.type}"'),
)
results_site["checker"] = checker
return results_site

# username is not allowed.
elif site.regex_check and re.search(site.regex_check, username) is None:
if site.regex_check and re.search(site.regex_check, username) is None:
results_site["status"] = MaigretCheckResult(
username,
site.name,
Expand All @@ -884,71 +880,62 @@ def make_site_result(
results_site["url_user"] = ""
results_site["http_status"] = ""
results_site["response_text"] = ""
# query_notify.update(results_site["status"])
else:
# URL of user on site (if it exists)
results_site["url_user"] = url
url_probe = site.url_probe
if url_probe is None:
# Probe URL is normal one seen by people out on the web.
url_probe = url
else:
# There is a special URL for probing existence separate
# from where the user profile normally can be found.
url_probe = url_probe.format(
urlMain=site.url_main,
urlSubpath=site.url_subpath,
username=username,
)
results_site["checker"] = checker
return results_site

for k, v in site.get_params.items():
url_probe += f"&{k}={v}"

results_site["url_probe"] = url_probe
# URL of user on site (if it exists)
results_site["url_user"] = url
url_probe = site.url_probe
if url_probe is None:
# Probe URL is normal one seen by people out on the web.
url_probe = url
else:
# There is a special URL for probing existence separate
# from where the user profile normally can be found.
url_probe = url_probe.format(
urlMain=site.url_main,
urlSubpath=site.url_subpath,
username=username,
)

if site.request_method:
request_method = site.request_method.lower()
elif site.check_type == "status_code" and site.request_head_only:
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
request_method = 'head'
else:
# Either this detect method needs the content associated
# with the GET response, or this specific website will
# not respond properly unless we request the whole page.
request_method = 'get'

payload = None
if site.request_payload:
payload = {}
for k, v in site.request_payload.items():
if isinstance(v, str):
payload[k] = v.format(username=username)
else:
payload[k] = v
for k, v in site.get_params.items():
url_probe += f"&{k}={v}"

results_site["url_probe"] = url_probe

request_method = 'get'
if site.request_method:
request_method = site.request_method.lower()
elif site.check_type == "status_code" and site.request_head_only:
# In most cases when we are detecting by status code,
# it is not necessary to get the entire body: we can
# detect fine with just the HEAD response.
request_method = 'head'

payload = None
if site.request_payload:
payload = {
k: v.format(username=username) if isinstance(v, str) else v
for k, v in site.request_payload.items()
}

if site.check_type == "response_url":
# Site forwards request to a different URL if username not
# found. Disallow the redirect so we can capture the
# http status from the original URL request.
allow_redirects = False
else:
# Allow whatever redirect that the site wants to do.
# The final result of the request will be what is available.
allow_redirects = True

future = checker.prepare(
method=request_method,
url=url_probe,
headers=headers,
allow_redirects=allow_redirects,
timeout=options['timeout'],
payload=payload,
)
# Site forwards request to a different URL if username not found
# (response_url check). Disallow the redirect so we can capture the
# http status from the original URL request. For every other check
# type, allow whatever redirect the site wants to do.
allow_redirects = site.check_type != "response_url"

future = checker.prepare(
method=request_method,
url=url_probe,
headers=headers,
allow_redirects=allow_redirects,
timeout=options['timeout'],
payload=payload,
)

# Store future request object in the results object
results_site["future"] = future
# Store future request object in the results object
results_site["future"] = future

results_site["checker"] = checker

Expand Down
Loading