Cloudflare improvements (#1448)

* Upgraded cloudscraper to fix multiple issues with providers that use an anti-bot page.

* Fixed subs4series provider. It now requires an anti-captcha provider to download subtitles. One captcha will have to be solved for each download. #1442
pull/1451/head
morpheus65535 3 years ago committed by GitHub
parent 058ae489f0
commit cb420628f8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23

@ -144,11 +144,11 @@ export const ProviderList: Readonly<ProviderInfo[]> = [
{
key: "subs4series",
name: "Subs4Series",
description: "Greek Subtitles Provider",
description:
"Greek Subtitles Provider. Require anti-captcha provider to solve on each download.",
},
{
key: "subscene",
description: "Requires Anti-Captcha Provider",
defaultKey: {
username: "",
password: "",
@ -184,7 +184,8 @@ export const ProviderList: Readonly<ProviderInfo[]> = [
{
key: "tusubtitulo",
name: "Tusubtitulo.com",
description: "LATAM Spanish / Spanish / English Subtitles Provider for TV Shows",
description:
"LATAM Spanish / Spanish / English Subtitles Provider for TV Shows",
},
{ key: "tvsubtitles", name: "TVSubtitles" },
{ key: "wizdom", description: "Wizdom.xyz Subtitles Provider." },

@ -1,243 +1,755 @@
# ------------------------------------------------------------------------------- #
import logging
import re
import requests
import sys
import ssl
from copy import deepcopy
from time import sleep
from collections import OrderedDict
from copy import deepcopy
from requests.sessions import Session
from requests.adapters import HTTPAdapter
from requests.packages.urllib3.util.ssl_ import create_urllib3_context
from requests.sessions import Session
from requests_toolbelt.utils import dump
from .interpreters import JavaScriptInterpreter
from .user_agent import User_Agent
from time import sleep
# ------------------------------------------------------------------------------- #
try:
from requests_toolbelt.utils import dump
import brotli
except ImportError:
pass
try:
import brotli
import copyreg
except ImportError:
pass
import copy_reg as copyreg
try:
from HTMLParser import HTMLParser
except ImportError:
if sys.version_info >= (3, 4):
import html
else:
from html.parser import HTMLParser
try:
from urlparse import urlparse
from urlparse import urlunparse
from urlparse import urlparse, urljoin
except ImportError:
from urllib.parse import urlparse
from urllib.parse import urlunparse
from urllib.parse import urlparse, urljoin
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
__version__ = '1.1.9'
from .exceptions import (
CloudflareLoopProtection,
CloudflareCode1020,
CloudflareIUAMError,
CloudflareSolveError,
CloudflareChallengeError,
CloudflareCaptchaError,
CloudflareCaptchaProvider
)
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
from .interpreters import JavaScriptInterpreter
from .captcha import Captcha
from .user_agent import User_Agent
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
__version__ = '1.2.58'
# ------------------------------------------------------------------------------- #
class CipherSuiteAdapter(HTTPAdapter):
def __init__(self, cipherSuite=None, **kwargs):
self.cipherSuite = cipherSuite
__attrs__ = [
'ssl_context',
'max_retries',
'config',
'_pool_connections',
'_pool_maxsize',
'_pool_block',
'source_address'
]
if hasattr(ssl, 'PROTOCOL_TLS'):
self.ssl_context = create_urllib3_context(
ssl_version=getattr(ssl, 'PROTOCOL_TLSv1_3', ssl.PROTOCOL_TLSv1_2),
ciphers=self.cipherSuite
)
else:
self.ssl_context = create_urllib3_context(ssl_version=ssl.PROTOCOL_TLSv1)
def __init__(self, *args, **kwargs):
    """Create an adapter bound to a specific TLS context and source address.

    The following keyword arguments are consumed here; every remaining
    keyword argument is forwarded to the parent adapter's ``__init__``.
    Positional arguments are accepted but not forwarded.

    ssl_context:    ready-made ``ssl.SSLContext``; used untouched when given.
    cipherSuite:    OpenSSL cipher string applied to a newly created context.
    source_address: local IP string, or an ``(ip, port)`` tuple. A bare
                    string is expanded to ``(ip, 0)``.

    Raises:
        TypeError: if ``source_address`` is neither a string nor a tuple.
    """
    self.ssl_context = kwargs.pop('ssl_context', None)
    self.cipherSuite = kwargs.pop('cipherSuite', None)
    self.source_address = kwargs.pop('source_address', None)

    if self.source_address:
        if isinstance(self.source_address, str):
            self.source_address = (self.source_address, 0)

        if not isinstance(self.source_address, tuple):
            raise TypeError(
                "source_address must be IP address string or (ip, port) tuple"
            )

    if not self.ssl_context:
        self.ssl_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)

        # set_ciphers() only accepts a string; when no suite was requested,
        # keep the default ciphers instead of failing with a TypeError.
        if self.cipherSuite:
            self.ssl_context.set_ciphers(self.cipherSuite)

        self.ssl_context.set_ecdh_curve('prime256v1')
        self.ssl_context.options |= (ssl.OP_NO_SSLv2 | ssl.OP_NO_SSLv3 | ssl.OP_NO_TLSv1 | ssl.OP_NO_TLSv1_1)

    super(CipherSuiteAdapter, self).__init__(**kwargs)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def init_poolmanager(self, *args, **kwargs):
    """Build the pool manager with this adapter's TLS context and source address."""
    kwargs.update(
        ssl_context=self.ssl_context,
        source_address=self.source_address,
    )
    parent_adapter = super(CipherSuiteAdapter, self)
    return parent_adapter.init_poolmanager(*args, **kwargs)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def proxy_manager_for(self, *args, **kwargs):
    """Build the proxy manager with this adapter's TLS context and source address."""
    kwargs.update(
        ssl_context=self.ssl_context,
        source_address=self.source_address,
    )
    parent_adapter = super(CipherSuiteAdapter, self)
    return parent_adapter.proxy_manager_for(*args, **kwargs)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
class CloudScraper(Session):
def __init__(self, *args, **kwargs):
self.debug = kwargs.pop('debug', False)
self.delay = kwargs.pop('delay', None)
self.interpreter = kwargs.pop('interpreter', 'js2py')
self.allow_brotli = kwargs.pop('allow_brotli', True if 'brotli' in sys.modules.keys() else False)
self.cipherSuite = None
self.cipherSuite = kwargs.pop('cipherSuite', None)
self.ssl_context = kwargs.pop('ssl_context', None)
self.interpreter = kwargs.pop('interpreter', 'native')
self.captcha = kwargs.pop('captcha', {})
self.requestPreHook = kwargs.pop('requestPreHook', None)
self.requestPostHook = kwargs.pop('requestPostHook', None)
self.source_address = kwargs.pop('source_address', None)
self.doubleDown = kwargs.pop('doubleDown', True)
self.allow_brotli = kwargs.pop(
'allow_brotli',
True if 'brotli' in sys.modules.keys() else False
)
self.user_agent = User_Agent(
allow_brotli=self.allow_brotli,
browser=kwargs.pop('browser', None)
)
self._solveDepthCnt = 0
self.solveDepth = kwargs.pop('solveDepth', 3)
super(CloudScraper, self).__init__(*args, **kwargs)
# pylint: disable=E0203
if 'requests' in self.headers['User-Agent']:
# ------------------------------------------------------------------------------- #
# Set a random User-Agent if no custom User-Agent has been set
self.headers = User_Agent(allow_brotli=self.allow_brotli).headers
# ------------------------------------------------------------------------------- #
self.headers = self.user_agent.headers
if not self.cipherSuite:
self.cipherSuite = self.user_agent.cipherSuite
if isinstance(self.cipherSuite, list):
self.cipherSuite = ':'.join(self.cipherSuite)
self.mount(
'https://',
CipherSuiteAdapter(
cipherSuite=self.cipherSuite,
ssl_context=self.ssl_context,
source_address=self.source_address
)
)
self.mount('https://', CipherSuiteAdapter(self.loadCipherSuite()))
# purely to allow us to pickle dump
copyreg.pickle(ssl.SSLContext, lambda obj: (obj.__class__, (obj.protocol,)))
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# Allow us to pickle our session back with all variables
# ------------------------------------------------------------------------------- #
@staticmethod
def debugRequest(req):
try:
print(dump.dump_all(req).decode('utf-8'))
except: # noqa
pass
def __getstate__(self):
    """Return the instance ``__dict__`` itself (not a copy) as the pickled state."""
    return self.__dict__
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# Allow replacing actual web request call via subclassing
# ------------------------------------------------------------------------------- #
def loadCipherSuite(self):
if self.cipherSuite:
return self.cipherSuite
def perform_request(self, method, url, *args, **kwargs):
    """Send the request through the parent class's ``request``.

    Kept as a separate method so a subclass can override it to replace
    the actual web request call.
    """
    parent_session = super(CloudScraper, self)
    return parent_session.request(method, url, *args, **kwargs)
self.cipherSuite = ''
# ------------------------------------------------------------------------------- #
# Raise an Exception with no stacktrace and reset depth counter.
# ------------------------------------------------------------------------------- #
if hasattr(ssl, 'PROTOCOL_TLS'):
ciphers = [
'ECDHE-ECDSA-AES128-GCM-SHA256', 'ECDHE-RSA-AES128-GCM-SHA256', 'ECDHE-ECDSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES256-GCM-SHA384', 'ECDHE-ECDSA-CHACHA20-POLY1305-SHA256', 'ECDHE-RSA-CHACHA20-POLY1305-SHA256',
'ECDHE-RSA-AES128-CBC-SHA', 'ECDHE-RSA-AES256-CBC-SHA', 'RSA-AES128-GCM-SHA256', 'RSA-AES256-GCM-SHA384',
'ECDHE-RSA-AES128-GCM-SHA256', 'RSA-AES256-SHA', '3DES-EDE-CBC'
]
def simpleException(self, exception, msg):
    """Reset the solve-depth counter and raise ``exception(msg)``.

    ``sys.tracebacklimit`` is set to 0 first, which is a process-wide
    setting, so the error is reported without a stack trace.
    """
    sys.tracebacklimit = 0
    self._solveDepthCnt = 0
    error = exception(msg)
    raise error
if hasattr(ssl, 'PROTOCOL_TLSv1_3'):
ciphers.insert(0, ['GREASE_3A', 'GREASE_6A', 'AES128-GCM-SHA256', 'AES256-GCM-SHA256', 'AES256-GCM-SHA384', 'CHACHA20-POLY1305-SHA256'])
# ------------------------------------------------------------------------------- #
# debug the request via the response
# ------------------------------------------------------------------------------- #
ctx = ssl.SSLContext(getattr(ssl, 'PROTOCOL_TLSv1_3', ssl.PROTOCOL_TLSv1_2))
@staticmethod
def debugRequest(req):
    """Print a dump of the exchange behind ``req`` for debugging.

    Bytes that are not valid UTF-8 are shown as backslash escapes. A
    ``ValueError`` from the dump is reported on stdout instead of being raised.
    """
    try:
        raw_dump = dump.dump_all(req)
        readable = raw_dump.decode('utf-8', errors='backslashreplace')
        print(readable)
    except ValueError as e:
        print(f"Debug Error: {getattr(e, 'message', e)}")
for cipher in ciphers:
try:
ctx.set_ciphers(cipher)
self.cipherSuite = '{}:{}'.format(self.cipherSuite, cipher).rstrip(':')
except ssl.SSLError:
pass
# ------------------------------------------------------------------------------- #
# Unescape / decode html entities
# ------------------------------------------------------------------------------- #
return self.cipherSuite
@staticmethod
def unescape(html_text):
if sys.version_info >= (3, 0):
if sys.version_info >= (3, 4):
return html.unescape(html_text)
##########################################################################################################################################################
return HTMLParser().unescape(html_text)
def request(self, method, url, *args, **kwargs):
ourSuper = super(CloudScraper, self)
resp = ourSuper.request(method, url, *args, **kwargs)
return HTMLParser().unescape(html_text)
if resp.headers.get('Content-Encoding') == 'br':
# ------------------------------------------------------------------------------- #
# Decode Brotli on older versions of urllib3 manually
# ------------------------------------------------------------------------------- #
def decodeBrotli(self, resp):
if requests.packages.urllib3.__version__ < '1.25.1' and resp.headers.get('Content-Encoding') == 'br':
if self.allow_brotli and resp._content:
resp._content = brotli.decompress(resp.content)
else:
logging.warning('Brotli content detected, But option is disabled, we will not continue.')
return resp
logging.warning(
f'You\'re running urllib3 {requests.packages.urllib3.__version__}, Brotli content detected, '
'Which requires manual decompression, '
'But option allow_brotli is set to False, '
'We will not continue to decompress.'
)
return resp
# ------------------------------------------------------------------------------- #
# Our hijacker request function
# ------------------------------------------------------------------------------- #
def request(self, method, url, *args, **kwargs):
# pylint: disable=E0203
if kwargs.get('proxies') and kwargs.get('proxies') != self.proxies:
self.proxies = kwargs.get('proxies')
# ------------------------------------------------------------------------------- #
# Pre-Hook the request via user defined function.
# ------------------------------------------------------------------------------- #
if self.requestPreHook:
(method, url, args, kwargs) = self.requestPreHook(
self,
method,
url,
*args,
**kwargs
)
# ------------------------------------------------------------------------------- #
# Make the request via requests.
# ------------------------------------------------------------------------------- #
response = self.decodeBrotli(
self.perform_request(method, url, *args, **kwargs)
)
# ------------------------------------------------------------------------------- #
# Debug the request via the Response object.
# ------------------------------------------------------------------------------- #
# Debug request
if self.debug:
self.debugRequest(resp)
self.debugRequest(response)
# ------------------------------------------------------------------------------- #
# Post-Hook the request aka Post-Hook the response via user defined function.
# ------------------------------------------------------------------------------- #
if self.requestPostHook:
response = self.requestPostHook(self, response)
if self.debug:
self.debugRequest(response)
# Check if Cloudflare anti-bot is on
if self.isChallengeRequest(resp):
if resp.request.method != 'GET':
# Work around if the initial request is not a GET,
# Supersede with a GET then re-request the original METHOD.
self.request('GET', resp.url)
resp = ourSuper.request(method, url, *args, **kwargs)
else:
# Solve Challenge
resp = self.sendChallengeResponse(resp, **kwargs)
if self.is_Challenge_Request(response):
# ------------------------------------------------------------------------------- #
# Try to solve the challenge and send it back
# ------------------------------------------------------------------------------- #
if self._solveDepthCnt >= self.solveDepth:
_ = self._solveDepthCnt
self.simpleException(
CloudflareLoopProtection,
f"!!Loop Protection!! We have tried to solve {_} time(s) in a row."
)
return resp
self._solveDepthCnt += 1
##########################################################################################################################################################
response = self.Challenge_Response(response, **kwargs)
else:
if not response.is_redirect and response.status_code not in [429, 503]:
self._solveDepthCnt = 0
return response
# ------------------------------------------------------------------------------- #
# check if the response contains a valid Cloudflare Bot Fight Mode challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def isChallengeRequest(resp):
if resp.headers.get('Server', '').startswith('cloudflare'):
if b'why_captcha' in resp.content or b'/cdn-cgi/l/chk_captcha' in resp.content:
raise ValueError('Captcha')
def is_BFM_Challenge(resp):
    """Detect a Cloudflare Bot Fight Mode challenge page.

    Returns the regex match (truthy) when the response is served by
    Cloudflare and embeds the Bot Fight Mode script, ``None`` when the
    script is absent, and ``False`` when the server is not Cloudflare or
    ``resp`` lacks the expected attributes.
    """
    bfm_pattern = (
        r"\/cdn-cgi\/bm\/cv\/\d+\/api\.js.*?"
        r"window\['__CF\$cv\$params'\]\s*=\s*{"
    )

    try:
        server = resp.headers.get('Server', '')
        if not server.startswith('cloudflare'):
            return False

        return re.search(bfm_pattern, resp.text, re.M | re.S)
    except AttributeError:
        return False
# ------------------------------------------------------------------------------- #
# check if the response contains a valid Cloudflare challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def is_IUAM_Challenge(resp):
try:
return (
resp.status_code in [429, 503]
and all(s in resp.content for s in [b'jschl_vc', b'jschl_answer'])
resp.headers.get('Server', '').startswith('cloudflare')
and resp.status_code in [429, 503]
and re.search(
r'<form .*?="challenge-form" action="/.*?__cf_chl_jschl_tk__=\S+"',
resp.text,
re.M | re.S
)
)
except AttributeError:
pass
return False
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# check if the response contains new Cloudflare challenge
# ------------------------------------------------------------------------------- #
def sendChallengeResponse(self, resp, **original_kwargs):
body = resp.text
@staticmethod
def is_New_IUAM_Challenge(resp):
try:
return (
resp.headers.get('Server', '').startswith('cloudflare')
and resp.status_code in [429, 503]
and re.search(
r'cpo.src\s*=\s*"/cdn-cgi/challenge-platform/\S+orchestrate/jsch/v1',
resp.text,
re.M | re.S
)
and re.search(r'window._cf_chl_enter\s*[\(=]', resp.text, re.M | re.S)
)
except AttributeError:
pass
# Cloudflare requires a delay before solving the challenge
if not self.delay:
try:
delay = float(re.search(r'submit\(\);\r?\n\s*},\s*([0-9]+)', body).group(1)) / float(1000)
if isinstance(delay, (int, float)):
self.delay = delay
except: # noqa
pass
return False
sleep(self.delay)
# ------------------------------------------------------------------------------- #
# check if the response contains a v2 hCaptcha Cloudflare challenge
# ------------------------------------------------------------------------------- #
parsed_url = urlparse(resp.url)
domain = parsed_url.netloc
submit_url = '{}://{}/cdn-cgi/l/chk_jschl'.format(parsed_url.scheme, domain)
@staticmethod
def is_New_Captcha_Challenge(resp):
try:
return (
CloudScraper.is_Captcha_Challenge(resp)
and re.search(
r'cpo.src\s*=\s*"/cdn-cgi/challenge-platform/\S+orchestrate/captcha/v1',
resp.text,
re.M | re.S
)
and re.search(r'\s*id="trk_captcha_js"', resp.text, re.M | re.S)
)
except AttributeError:
pass
cloudflare_kwargs = deepcopy(original_kwargs)
return False
# ------------------------------------------------------------------------------- #
# check if the response contains a Cloudflare hCaptcha challenge
# ------------------------------------------------------------------------------- #
@staticmethod
def is_Captcha_Challenge(resp):
    """Detect a Cloudflare captcha challenge page.

    Returns the regex match (truthy) for a Cloudflare-served 403 whose
    body holds a captcha challenge form action, ``None`` when the form is
    absent, and ``False`` when the server or status does not fit or
    ``resp`` lacks the expected attributes.
    """
    try:
        server = resp.headers.get('Server', '')
        if not server.startswith('cloudflare'):
            return False

        if resp.status_code != 403:
            return False

        return re.search(
            r'action="/\S+__cf_chl_captcha_tk__=\S+',
            resp.text,
            re.M | re.DOTALL
        )
    except AttributeError:
        return False
# ------------------------------------------------------------------------------- #
# check if the response contains Firewall 1020 Error
# ------------------------------------------------------------------------------- #
@staticmethod
def is_Firewall_Blocked(resp):
    """Detect a Cloudflare firewall block page (error code 1020).

    Returns the regex match (truthy) for a Cloudflare-served 403 whose
    body shows error code 1020, ``None`` when the code is absent, and
    ``False`` when the server or status does not fit or ``resp`` lacks
    the expected attributes.
    """
    try:
        server = resp.headers.get('Server', '')
        if not server.startswith('cloudflare'):
            return False

        if resp.status_code != 403:
            return False

        return re.search(
            r'<span class="cf-error-code">1020</span>',
            resp.text,
            re.M | re.DOTALL
        )
    except AttributeError:
        return False
# ------------------------------------------------------------------------------- #
# Wrapper for is_Captcha_Challenge, is_IUAM_Challenge, is_Firewall_Blocked
# ------------------------------------------------------------------------------- #
def is_Challenge_Request(self, resp):
    """Decide whether ``resp`` is a Cloudflare challenge this class handles.

    A firewall block (code 1020) and the version 2 captcha / IUAM
    challenges are reported through ``simpleException``. Returns ``True``
    for a version 1 captcha or IUAM challenge and ``False`` otherwise.
    """
    if self.is_Firewall_Blocked(resp):
        self.simpleException(
            CloudflareCode1020,
            'Cloudflare has blocked this request (Code 1020 Detected).'
        )

    # Both version 2 detectors report through the same exception type.
    version2_checks = (
        (
            self.is_New_Captcha_Challenge,
            'Detected a Cloudflare version 2 Captcha challenge, This feature is not available in the opensource (free) version.'
        ),
        (
            self.is_New_IUAM_Challenge,
            'Detected a Cloudflare version 2 challenge, This feature is not available in the opensource (free) version.'
        ),
    )
    for detector, message in version2_checks:
        if detector(resp):
            self.simpleException(CloudflareChallengeError, message)

    solvable = self.is_Captcha_Challenge(resp) or self.is_IUAM_Challenge(resp)
    if not solvable:
        return False

    if self.debug:
        print('Detected a Cloudflare version 1 challenge.')
    return True
# ------------------------------------------------------------------------------- #
# Try to solve cloudflare javascript challenge.
# ------------------------------------------------------------------------------- #
def IUAM_Challenge_Response(self, body, url, interpreter):
try:
params = OrderedDict()
formPayload = re.search(
r'<form (?P<form>.*?="challenge-form" '
r'action="(?P<challengeUUID>.*?'
r'__cf_chl_jschl_tk__=\S+)"(.*?)</form>)',
body,
re.M | re.DOTALL
).groupdict()
if not all(key in formPayload for key in ['form', 'challengeUUID']):
self.simpleException(
CloudflareIUAMError,
"Cloudflare IUAM detected, unfortunately we can't extract the parameters correctly."
)
s = re.search(r'name="s"\svalue="(?P<s_value>[^"]+)', body)
if s:
params['s'] = s.group('s_value')
payload = OrderedDict()
for challengeParam in re.findall(r'^\s*<input\s(.*?)/>', formPayload['form'], re.M | re.S):
inputPayload = dict(re.findall(r'(\S+)="(\S+)"', challengeParam))
if inputPayload.get('name') in ['r', 'jschl_vc', 'pass']:
payload.update({inputPayload['name']: inputPayload['value']})
params.update(
[
('jschl_vc', re.search(r'name="jschl_vc" value="(\w+)"', body).group(1)),
('pass', re.search(r'name="pass" value="(.+?)"', body).group(1))
]
except AttributeError:
self.simpleException(
CloudflareIUAMError,
"Cloudflare IUAM detected, unfortunately we can't extract the parameters correctly."
)
params = cloudflare_kwargs.setdefault('params', params)
hostParsed = urlparse(url)
try:
payload['jschl_answer'] = JavaScriptInterpreter.dynamicImport(
interpreter
).solveChallenge(body, hostParsed.netloc)
except Exception as e:
raise ValueError('Unable to parse Cloudflare anti-bots page: {} {}'.format(e.message, BUG_REPORT))
# Solve the Javascript challenge
params['jschl_answer'] = JavaScriptInterpreter.dynamicImport(self.interpreter).solveChallenge(body, domain)
# Requests transforms any request into a GET after a redirect,
# so the redirect has to be handled manually here to allow for
# performing other types of requests even as the first request.
cloudflare_kwargs['allow_redirects'] = False
redirect = self.request(resp.request.method, submit_url, **cloudflare_kwargs)
redirect_location = urlparse(redirect.headers['Location'])
if not redirect_location.netloc:
redirect_url = urlunparse(
(
parsed_url.scheme,
domain,
redirect_location.path,
redirect_location.params,
redirect_location.query,
redirect_location.fragment
self.simpleException(
CloudflareIUAMError,
f"Unable to parse Cloudflare anti-bots page: {getattr(e, 'message', e)}"
)
return {
'url': f"{hostParsed.scheme}://{hostParsed.netloc}{self.unescape(formPayload['challengeUUID'])}",
'data': payload
}
# ------------------------------------------------------------------------------- #
# Try to solve the Captcha challenge via 3rd party.
# ------------------------------------------------------------------------------- #
def captcha_Challenge_Response(self, provider, provider_params, body, url):
try:
formPayload = re.search(
r'<form (?P<form>.*?="challenge-form" '
r'action="(?P<challengeUUID>.*?__cf_chl_captcha_tk__=\S+)"(.*?)</form>)',
body,
re.M | re.DOTALL
).groupdict()
if not all(key in formPayload for key in ['form', 'challengeUUID']):
self.simpleException(
CloudflareCaptchaError,
"Cloudflare Captcha detected, unfortunately we can't extract the parameters correctly."
)
payload = OrderedDict(
re.findall(
r'(name="r"\svalue|data-ray|data-sitekey|name="cf_captcha_kind"\svalue)="(.*?)"',
formPayload['form']
)
)
return self.request(resp.request.method, redirect_url, **original_kwargs)
return self.request(resp.request.method, redirect.headers['Location'], **original_kwargs)
captchaType = 'reCaptcha' if payload['name="cf_captcha_kind" value'] == 're' else 'hCaptcha'
except (AttributeError, KeyError):
self.simpleException(
CloudflareCaptchaError,
"Cloudflare Captcha detected, unfortunately we can't extract the parameters correctly."
)
# ------------------------------------------------------------------------------- #
# Pass proxy parameter to provider to solve captcha.
# ------------------------------------------------------------------------------- #
if self.proxies and self.proxies != self.captcha.get('proxy'):
self.captcha['proxy'] = self.proxies
# ------------------------------------------------------------------------------- #
# Pass User-Agent if provider supports it to solve captcha.
# ------------------------------------------------------------------------------- #
self.captcha['User-Agent'] = self.headers['User-Agent']
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# Submit job to provider to request captcha solve.
# ------------------------------------------------------------------------------- #
captchaResponse = Captcha.dynamicImport(
provider.lower()
).solveCaptcha(
captchaType,
url,
payload['data-sitekey'],
provider_params
)
# ------------------------------------------------------------------------------- #
# Parse and handle the response of solved captcha.
# ------------------------------------------------------------------------------- #
dataPayload = OrderedDict([
('r', payload.get('name="r" value', '')),
('cf_captcha_kind', payload['name="cf_captcha_kind" value']),
('id', payload.get('data-ray')),
('g-recaptcha-response', captchaResponse)
])
if captchaType == 'hCaptcha':
dataPayload.update({'h-captcha-response': captchaResponse})
hostParsed = urlparse(url)
return {
'url': f"{hostParsed.scheme}://{hostParsed.netloc}{self.unescape(formPayload['challengeUUID'])}",
'data': dataPayload
}
# ------------------------------------------------------------------------------- #
# Attempt to handle and send the challenge response back to cloudflare
# ------------------------------------------------------------------------------- #
def Challenge_Response(self, resp, **kwargs):
    """Answer the challenge found in ``resp`` and return the follow-up response.

    Captcha challenges are handed to the provider configured in
    ``self.captcha``; any other challenge is treated as an IUAM JavaScript
    challenge and solved with ``self.interpreter`` after a delay. The answer
    is POSTed back without following redirects, and a resulting redirect is
    followed manually using the original request method.

    resp:   the challenge response.
    kwargs: keyword arguments of the original request; a deep copy is
            extended for the submission and the follow-up request.

    Failures are reported through ``self.simpleException`` with
    CloudflareCaptchaProvider (no usable provider), CloudflareIUAMError
    (delay not found in the page) or CloudflareSolveError (HTTP 400 on submit).
    """
    if self.is_Captcha_Challenge(resp):
        # ------------------------------------------------------------------------------- #
        # double down on the request as some websites are only checking
        # if cfuid is populated before issuing Captcha.
        # ------------------------------------------------------------------------------- #
        if self.doubleDown:
            resp = self.decodeBrotli(
                self.perform_request(resp.request.method, resp.url, **kwargs)
            )
        # The repeated request may have come back without a captcha; nothing left to solve.
        if not self.is_Captcha_Challenge(resp):
            return resp
        # ------------------------------------------------------------------------------- #
        # if no captcha provider raise a runtime error.
        # ------------------------------------------------------------------------------- #
        if not self.captcha or not isinstance(self.captcha, dict) or not self.captcha.get('provider'):
            self.simpleException(
                CloudflareCaptchaProvider,
                "Cloudflare Captcha detected, unfortunately you haven't loaded an anti Captcha provider "
                "correctly via the 'captcha' parameter."
            )
        # ------------------------------------------------------------------------------- #
        # if provider is return_response, return the response without doing anything.
        # ------------------------------------------------------------------------------- #
        if self.captcha.get('provider') == 'return_response':
            return resp
        # ------------------------------------------------------------------------------- #
        # Submit request to parser wrapper to solve captcha
        # ------------------------------------------------------------------------------- #
        submit_url = self.captcha_Challenge_Response(
            self.captcha.get('provider'),
            self.captcha,
            resp.text,
            resp.url
        )
    else:
        # ------------------------------------------------------------------------------- #
        # Cloudflare requires a delay before solving the challenge
        # ------------------------------------------------------------------------------- #
        if not self.delay:
            try:
                # The page's setTimeout value is in milliseconds; convert to seconds.
                delay = float(
                    re.search(
                        r'submit\(\);\r?\n\s*},\s*([0-9]+)',
                        resp.text
                    ).group(1)
                ) / float(1000)
                # Stored on the instance, so later challenges reuse this delay.
                if isinstance(delay, (int, float)):
                    self.delay = delay
            except (AttributeError, ValueError):
                self.simpleException(
                    CloudflareIUAMError,
                    "Cloudflare IUAM possibility malformed, issue extracing delay value."
                )
        sleep(self.delay)
        # ------------------------------------------------------------------------------- #
        submit_url = self.IUAM_Challenge_Response(
            resp.text,
            resp.url,
            self.interpreter
        )
    # ------------------------------------------------------------------------------- #
    # Send the Challenge Response back to Cloudflare
    # ------------------------------------------------------------------------------- #
    if submit_url:
        # Merge newValue into obj[name]; when the entry is missing or has no
        # update() method, it is replaced by a fresh dict first.
        def updateAttr(obj, name, newValue):
            try:
                obj[name].update(newValue)
                return obj[name]
            except (AttributeError, KeyError):
                obj[name] = {}
                obj[name].update(newValue)
                return obj[name]
        # Work on a copy so the caller's kwargs stay untouched.
        cloudflare_kwargs = deepcopy(kwargs)
        cloudflare_kwargs['allow_redirects'] = False
        cloudflare_kwargs['data'] = updateAttr(
            cloudflare_kwargs,
            'data',
            submit_url['data']
        )
        urlParsed = urlparse(resp.url)
        cloudflare_kwargs['headers'] = updateAttr(
            cloudflare_kwargs,
            'headers',
            {
                'Origin': f'{urlParsed.scheme}://{urlParsed.netloc}',
                'Referer': resp.url
            }
        )
        challengeSubmitResponse = self.request(
            'POST',
            submit_url['url'],
            **cloudflare_kwargs
        )
        if challengeSubmitResponse.status_code == 400:
            self.simpleException(
                CloudflareSolveError,
                'Invalid challenge answer detected, Cloudflare broken?'
            )
        # ------------------------------------------------------------------------------- #
        # Return response if Cloudflare is doing content pass through instead of 3xx
        # else request with redirect URL also handle protocol scheme change http -> https
        # ------------------------------------------------------------------------------- #
        if not challengeSubmitResponse.is_redirect:
            return challengeSubmitResponse
        else:
            # Start again from the original kwargs: the challenge payload is not re-sent.
            cloudflare_kwargs = deepcopy(kwargs)
            cloudflare_kwargs['headers'] = updateAttr(
                cloudflare_kwargs,
                'headers',
                {'Referer': challengeSubmitResponse.url}
            )
            # A Location without a host is relative; resolve it against the submit URL.
            if not urlparse(challengeSubmitResponse.headers['Location']).netloc:
                redirect_location = urljoin(
                    challengeSubmitResponse.url,
                    challengeSubmitResponse.headers['Location']
                )
            else:
                redirect_location = challengeSubmitResponse.headers['Location']
            return self.request(
                resp.request.method,
                redirect_location,
                **cloudflare_kwargs
            )
    # ------------------------------------------------------------------------------- #
    # We shouldn't be here...
    # Re-request the original query and/or process again....
    # ------------------------------------------------------------------------------- #
    return self.request(resp.request.method, resp.url, **kwargs)
# ------------------------------------------------------------------------------- #
@classmethod
def create_scraper(cls, sess=None, **kwargs):
@ -247,31 +759,41 @@ class CloudScraper(Session):
scraper = cls(**kwargs)
if sess:
attrs = ['auth', 'cert', 'cookies', 'headers', 'hooks', 'params', 'proxies', 'data']
for attr in attrs:
for attr in ['auth', 'cert', 'cookies', 'headers', 'hooks', 'params', 'proxies', 'data']:
val = getattr(sess, attr, None)
if val:
setattr(scraper, attr, val)
return scraper
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# Functions for integrating cloudscraper with other applications and scripts
# ------------------------------------------------------------------------------- #
@classmethod
def get_tokens(cls, url, **kwargs):
scraper = cls.create_scraper(
debug=kwargs.pop('debug', False),
delay=kwargs.pop('delay', None),
interpreter=kwargs.pop('interpreter', 'js2py'),
allow_brotli=kwargs.pop('allow_brotli', True),
**{
field: kwargs.pop(field, None) for field in [
'allow_brotli',
'browser',
'debug',
'delay',
'doubleDown',
'captcha',
'interpreter',
'source_address'
'requestPreHook',
'requestPostHook'
] if field in kwargs
}
)
try:
resp = scraper.get(url, **kwargs)
resp.raise_for_status()
except Exception:
logging.error('"{}" returned an error. Could not collect tokens.'.format(url))
logging.error(f'"{url}" returned an error. Could not collect tokens.')
raise
domain = urlparse(resp.url).netloc
@ -279,11 +801,15 @@ class CloudScraper(Session):
cookie_domain = None
for d in scraper.cookies.list_domains():
if d.startswith('.') and d in ('.{}'.format(domain)):
if d.startswith('.') and d in (f'.{domain}'):
cookie_domain = d
break
else:
raise ValueError('Unable to find Cloudflare cookies. Does the site actually have Cloudflare IUAM ("I\'m Under Attack Mode") enabled?')
cls.simpleException(
CloudflareIUAMError,
"Unable to find Cloudflare cookies. Does the site actually "
"have Cloudflare IUAM (I'm Under Attack Mode) enabled?"
)
return (
{
@ -293,7 +819,7 @@ class CloudScraper(Session):
scraper.headers['User-Agent']
)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
@classmethod
def get_cookie_string(cls, url, **kwargs):
@ -304,7 +830,16 @@ class CloudScraper(Session):
return '; '.join('='.join(pair) for pair in tokens.items()), user_agent
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
# Import-time notice: when the OpenSSL linked into this interpreter is older
# than 1.1.1, print the message below (it cites missing TLS 1.3 support).
if ssl.OPENSSL_VERSION_INFO < (1, 1, 1):
    print(
        f"DEPRECATION: The OpenSSL being used by this python install ({ssl.OPENSSL_VERSION}) does not meet the minimum supported "
        "version (>= OpenSSL 1.1.1) in order to support TLS 1.3 required by Cloudflare, "
        "You may encounter an unexpected Captcha or cloudflare 1020 blocks."
    )
# ------------------------------------------------------------------------------- #
# Module-level aliases so callers can use these CloudScraper classmethods
# directly from the package namespace.
create_scraper = CloudScraper.create_scraper
get_tokens = CloudScraper.get_tokens

@ -0,0 +1,260 @@
from __future__ import absolute_import
import requests
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
from ..exceptions import (
CaptchaServiceUnavailable,
CaptchaAPIError,
CaptchaTimeout,
CaptchaParameter,
CaptchaBadJobID,
CaptchaReportError
)
try:
import polling2
except ImportError:
raise ImportError("Please install the python module 'polling2' via pip")
from . import Captcha
class captchaSolver(Captcha):
    """2Captcha backend for reCaptcha and hCaptcha challenges.

    A job is submitted to ``in.php``; its answer is then polled from
    ``res.php``. When polling times out, the job is reported with
    ``action=reportbad`` before a ``CaptchaTimeout`` is raised.
    """

    def __init__(self):
        super(captchaSolver, self).__init__('2captcha')
        self.host = 'https://2captcha.com'
        self.session = requests.Session()

    # ------------------------------------------------------------------------------- #

    @staticmethod
    def checkErrorStatus(response, request_type):
        """Raise when *response* is a server error or a known API error.

        Args:
            response: HTTP response returned by the 2Captcha endpoint.
            request_type: ``'in.php'`` or ``'res.php'``; selects which table
                of error codes is consulted.

        Raises:
            CaptchaServiceUnavailable: the HTTP status is 500 or 502.
            CaptchaAPIError: the JSON payload has ``status == 0`` and its
                ``request`` value is one of the known error codes.
        """
        if response.status_code in [500, 502]:
            raise CaptchaServiceUnavailable(f'2Captcha: Server Side Error {response.status_code}')

        errors = {
            'in.php': {
                "ERROR_WRONG_USER_KEY": "You've provided api_key parameter value is in incorrect format, it should contain 32 symbols.",
                "ERROR_KEY_DOES_NOT_EXIST": "The api_key you've provided does not exists.",
                "ERROR_ZERO_BALANCE": "You don't have sufficient funds on your account.",
                "ERROR_PAGEURL": "pageurl parameter is missing in your request.",
                "ERROR_NO_SLOT_AVAILABLE":
                    "No Slots Available.\nYou can receive this error in two cases:\n"
                    "1. If you solve ReCaptcha: the queue of your captchas that are not distributed to workers is too long. "
                    "Queue limit changes dynamically and depends on total amount of captchas awaiting solution and usually it's between 50 and 100 captchas.\n"
                    "2. If you solve Normal Captcha: your maximum rate for normal captchas is lower than current rate on the server."
                    "You can change your maximum rate in your account's settings.",
                "ERROR_IP_NOT_ALLOWED": "The request is sent from the IP that is not on the list of your allowed IPs.",
                "IP_BANNED": "Your IP address is banned due to many frequent attempts to access the server using wrong authorization keys.",
                "ERROR_BAD_TOKEN_OR_PAGEURL":
                    "You can get this error code when sending ReCaptcha V2. "
                    "That happens if your request contains invalid pair of googlekey and pageurl. "
                    "The common reason for that is that ReCaptcha is loaded inside an iframe hosted on another domain/subdomain.",
                "ERROR_GOOGLEKEY":
                    "You can get this error code when sending ReCaptcha V2. "
                    "That means that sitekey value provided in your request is incorrect: it's blank or malformed.",
                "MAX_USER_TURN": "You made more than 60 requests within 3 seconds.Your account is banned for 10 seconds. Ban will be lifted automatically."
            },
            'res.php': {
                "ERROR_CAPTCHA_UNSOLVABLE":
                    "We are unable to solve your captcha - three of our workers were unable solve it "
                    "or we didn't get an answer within 90 seconds (300 seconds for ReCaptcha V2). "
                    "We will not charge you for that request.",
                "ERROR_WRONG_USER_KEY": "You've provided api_key parameter value in incorrect format, it should contain 32 symbols.",
                "ERROR_KEY_DOES_NOT_EXIST": "The api_key you've provided does not exists.",
                "ERROR_WRONG_ID_FORMAT": "You've provided captcha ID in wrong format. The ID can contain numbers only.",
                "ERROR_WRONG_CAPTCHA_ID": "You've provided incorrect captcha ID.",
                "ERROR_BAD_DUPLICATES":
                    "Error is returned when 100% accuracy feature is enabled. "
                    "The error means that max numbers of tries is reached but min number of matches not found.",
                "REPORT_NOT_RECORDED": "Error is returned to your complain request if you already complained lots of correctly solved captchas.",
                "ERROR_IP_ADDRES":
                    "You can receive this error code when registering a pingback (callback) IP or domain."
                    "That happes if your request is coming from an IP address that doesn't match the IP address of your pingback IP or domain.",
                "ERROR_TOKEN_EXPIRED": "You can receive this error code when sending GeeTest. That error means that challenge value you provided is expired.",
                "ERROR_EMPTY_ACTION": "Action parameter is missing or no value is provided for action parameter."
            }
        }

        rPayload = response.json()
        # An unrecognised request_type has no known error codes; fall back to
        # an empty table instead of failing on ``in None``.
        knownErrors = errors.get(request_type, {})
        if rPayload.get('status') == 0 and rPayload.get('request') in knownErrors:
            raise CaptchaAPIError(
                f"{rPayload['request']} {knownErrors.get(rPayload['request'])}"
            )

    # ------------------------------------------------------------------------------- #

    def _pollResult(self, action, jobID):
        """Poll ``res.php`` with *action* for *jobID* until the API answers with status 1.

        Returns the successful response. ``polling2.poll`` is given a 180
        second timeout and a 5 second step between attempts.
        """
        def _checkRequest(response):
            self.checkErrorStatus(response, 'res.php')
            if response.ok and response.json().get('status') == 1:
                return response
            return None

        return polling2.poll(
            lambda: self.session.get(
                f'{self.host}/res.php',
                params={
                    'key': self.api_key,
                    'action': action,
                    'id': jobID,
                    'json': '1'
                },
                timeout=30
            ),
            check_success=_checkRequest,
            step=5,
            timeout=180
        )

    # ------------------------------------------------------------------------------- #

    def reportJob(self, jobID):
        """Report *jobID* as a bad solve.

        Returns:
            True when the report was accepted.

        Raises:
            CaptchaBadJobID: *jobID* is empty.
            CaptchaReportError: polling returned a falsy result.
        """
        if not jobID:
            raise CaptchaBadJobID(
                "2Captcha: Error bad job id to request Captcha."
            )

        if self._pollResult('reportbad', jobID):
            return True

        raise CaptchaReportError(
            "2Captcha: Error - Failed to report bad Captcha solve."
        )

    # ------------------------------------------------------------------------------- #

    def requestJob(self, jobID):
        """Fetch the solved answer for *jobID*.

        Returns:
            The ``request`` field of the successful JSON payload.

        Raises:
            CaptchaBadJobID: *jobID* is empty.
            CaptchaTimeout: polling returned a falsy result.
        """
        if not jobID:
            raise CaptchaBadJobID("2Captcha: Error bad job id to request Captcha.")

        response = self._pollResult('get', jobID)
        if response:
            return response.json().get('request')

        raise CaptchaTimeout(
            "2Captcha: Error failed to solve Captcha."
        )

    # ------------------------------------------------------------------------------- #

    def requestSolve(self, captchaType, url, siteKey):
        """Submit a new solve job and return its job id.

        Args:
            captchaType: ``'reCaptcha'`` selects the reCaptcha method; any
                other value is sent as hCaptcha.
            url: page URL on which the captcha appears.
            siteKey: site key of the captcha widget.

        Raises:
            CaptchaBadJobID: polling returned a falsy result.
        """
        def _checkRequest(response):
            self.checkErrorStatus(response, 'in.php')
            if response.ok and response.json().get("status") == 1 and response.json().get('request'):
                return response
            return None

        data = {
            'key': self.api_key,
            'pageurl': url,
            'json': 1,
            'soft_id': 2905
        }

        if captchaType == 'reCaptcha':
            # The 2Captcha API names this method "userrecaptcha"; the previous
            # value 'userrcaptcha' was a typo.
            data.update({'method': 'userrecaptcha', 'googlekey': siteKey})
        else:
            data.update({'method': 'hcaptcha', 'sitekey': siteKey})

        if self.proxy:
            data.update(
                {
                    'proxy': self.proxy,
                    'proxytype': self.proxyType
                }
            )

        response = polling2.poll(
            lambda: self.session.post(
                f'{self.host}/in.php',
                data=data,
                allow_redirects=False,
                timeout=30
            ),
            check_success=_checkRequest,
            step=5,
            timeout=180
        )

        if response:
            return response.json().get('request')

        raise CaptchaBadJobID(
            '2Captcha: Error no job id was returned.'
        )

    # ------------------------------------------------------------------------------- #

    def getCaptchaAnswer(self, captchaType, url, siteKey, captchaParams):
        """Solve a captcha end to end and return the answer token.

        Args:
            captchaType: passed through to ``requestSolve``.
            url: page URL on which the captcha appears.
            siteKey: site key of the captcha widget.
            captchaParams: mapping that must contain ``api_key``; an optional
                ``proxy`` mapping (its ``https`` entry is used) is forwarded
                to the service unless ``no_proxy`` is truthy.

        Raises:
            CaptchaParameter: ``api_key`` is missing or the proxy URL has no
                scheme / netloc.
            CaptchaTimeout: a polling step timed out.
        """
        jobID = None

        if not captchaParams.get('api_key'):
            raise CaptchaParameter(
                "2Captcha: Missing api_key parameter."
            )

        self.api_key = captchaParams.get('api_key')

        if captchaParams.get('proxy') and not captchaParams.get('no_proxy'):
            hostParsed = urlparse(captchaParams.get('proxy', {}).get('https'))

            if not hostParsed.scheme:
                raise CaptchaParameter('Cannot parse proxy correctly, bad scheme')

            if not hostParsed.netloc:
                raise CaptchaParameter('Cannot parse proxy correctly, bad netloc')

            self.proxyType = hostParsed.scheme
            self.proxy = hostParsed.netloc
        else:
            self.proxy = None

        try:
            jobID = self.requestSolve(captchaType, url, siteKey)
            return self.requestJob(jobID)
        except polling2.TimeoutException:
            # jobID is still None when the submit step itself timed out; only
            # report jobs that were actually created.
            try:
                if jobID:
                    self.reportJob(jobID)
            except polling2.TimeoutException:
                raise CaptchaTimeout(
                    f"2Captcha: Captcha solve took to long and also failed reporting the job the job id {jobID}."
                )

            raise CaptchaTimeout(
                f"2Captcha: Captcha solve took to long to execute job id {jobID}, aborting."
            )
# ------------------------------------------------------------------------------- #
# Instantiate once at import time: the constructor hands the name '2captcha'
# to the Captcha base class.
captchaSolver()

@ -0,0 +1,212 @@
from __future__ import absolute_import
import re
import requests
try:
import polling
except ImportError:
raise ImportError(
"Please install the python module 'polling' via pip or download it from "
"https://github.com/justiniso/polling/"
)
from ..exceptions import (
reCaptchaServiceUnavailable,
reCaptchaAPIError,
reCaptchaTimeout,
reCaptchaParameter,
reCaptchaBadJobID
)
from . import reCaptcha
class captchaSolver(reCaptcha):
    """9kw.eu backend for reCaptcha and hCaptcha challenges.

    A captcha is uploaded with ``action=usercaptchaupload`` and its answer is
    polled with ``action=usercaptchacorrectdata``.

    NOTE(review): this class depends on the ``reCaptcha`` base class and the
    ``reCaptcha*`` exception names imported at the top of this module;
    confirm that the package still exports those names.
    """

    def __init__(self):
        super(captchaSolver, self).__init__('9kw')
        self.host = 'https://www.9kw.eu/index.cgi'
        self.maxtimeout = 180
        self.session = requests.Session()

    # ------------------------------------------------------------------------------- #

    @staticmethod
    def checkErrorStatus(response):
        """Raise when *response* is a server error or carries a 9kw error code.

        The error code is read from the ``error`` field of a JSON body, or
        from a leading ``00<digits>`` prefix of a plain-text body.

        Raises:
            reCaptchaServiceUnavailable: the HTTP status is 500 or 502.
            reCaptchaAPIError: a non-zero error code was found.
        """
        if response.status_code in [500, 502]:
            raise reCaptchaServiceUnavailable(
                f'9kw: Server Side Error {response.status_code}'
            )

        error_codes = {
            1: 'No API Key available.',
            2: 'No API key found.',
            3: 'No active API key found.',
            4: 'API Key has been disabled by the operator. ',
            5: 'No user found.',
            6: 'No data found.',
            7: 'Found No ID.',
            8: 'found No captcha.',
            9: 'No image found.',
            10: 'Image size not allowed.',
            11: 'credit is not sufficient.',
            12: 'what was done.',
            13: 'No answer contain.',
            14: 'Captcha already been answered.',
            15: 'Captcha to quickly filed.',
            16: 'JD check active.',
            17: 'Unknown problem.',
            18: 'Found No ID.',
            19: 'Incorrect answer.',
            20: 'Do not timely filed (Incorrect UserID).',
            21: 'Link not allowed.',
            22: 'Prohibited submit.',
            23: 'Entering prohibited.',
            24: 'Too little credit.',
            25: 'No entry found.',
            26: 'No Conditions accepted.',
            27: 'No coupon code found in the database.',
            28: 'Already unused voucher code.',
            29: 'maxTimeout under 60 seconds.',
            30: 'User not found.',
            31: 'An account is not yet 24 hours in system.',
            32: 'An account does not have the full rights.',
            33: 'Plugin needed a update.',
            34: 'No HTTPS allowed.',
            35: 'No HTTP allowed.',
            36: 'Source not allowed.',
            37: 'Transfer denied.',
            38: 'Incorrect answer without space',
            39: 'Incorrect answer with space',
            40: 'Incorrect answer with not only numbers',
            41: 'Incorrect answer with not only A-Z, a-z',
            42: 'Incorrect answer with not only 0-9, A-Z, a-z',
            43: 'Incorrect answer with not only [0-9,- ]',
            44: 'Incorrect answer with not only [0-9A-Za-z,- ]',
            45: 'Incorrect answer with not only coordinates',
            46: 'Incorrect answer with not only multiple coordinates',
            47: 'Incorrect answer with not only data',
            48: 'Incorrect answer with not only rotate number',
            49: 'Incorrect answer with not only text',
            50: 'Incorrect answer with not only text and too short',
            51: 'Incorrect answer with not enough chars',
            52: 'Incorrect answer with too many chars',
            53: 'Incorrect answer without no or yes',
            54: 'Assignment was not found.'
        }

        if response.text.startswith('{'):
            raw_code = response.json().get('error')
            error_code = int(raw_code) if raw_code else 0
        else:
            # A plain-text body without the "00<code>" prefix is not an error
            # report; previously the missing match raised AttributeError.
            match = re.search(r'^00(?P<error_code>\d+)', response.text)
            error_code = int(match.group('error_code')) if match else 0

        if error_code:
            # Codes outside the table still get a readable message.
            raise reCaptchaAPIError(
                error_codes.get(error_code, f'Unknown error code {error_code}.')
            )

    # ------------------------------------------------------------------------------- #

    def requestJob(self, jobID):
        """Poll for the answer of *jobID* and return it.

        Raises:
            reCaptchaBadJobID: *jobID* is empty.
            reCaptchaTimeout: polling returned a falsy result.
        """
        if not jobID:
            raise reCaptchaBadJobID(
                "9kw: Error bad job id to request reCaptcha against."
            )

        def _checkRequest(response):
            if response.ok and response.json().get('answer') != 'NO DATA':
                return response

            self.checkErrorStatus(response)
            return None

        response = polling.poll(
            lambda: self.session.get(
                self.host,
                params={
                    'apikey': self.api_key,
                    'action': 'usercaptchacorrectdata',
                    'id': jobID,
                    'info': 1,
                    'json': 1
                }
            ),
            check_success=_checkRequest,
            step=10,
            timeout=(self.maxtimeout + 10)
        )

        if response:
            return response.json().get('answer')

        raise reCaptchaTimeout("9kw: Error failed to solve reCaptcha.")

    # ------------------------------------------------------------------------------- #

    def requestSolve(self, captchaType, url, siteKey):
        """Upload a captcha job and return its ``captchaid``.

        Args:
            captchaType: ``'reCaptcha'`` or ``'hCaptcha'``.
            url: page URL on which the captcha appears.
            siteKey: site key of the captcha widget.

        Raises:
            reCaptchaBadJobID: polling returned a falsy result.
        """
        def _checkRequest(response):
            if response.ok and response.text.startswith('{') and response.json().get('captchaid'):
                return response

            self.checkErrorStatus(response)
            return None

        captchaMap = {
            'reCaptcha': 'recaptchav2',
            'hCaptcha': 'hcaptcha'
        }

        response = polling.poll(
            lambda: self.session.post(
                self.host,
                data={
                    'apikey': self.api_key,
                    'action': 'usercaptchaupload',
                    'interactive': 1,
                    'file-upload-01': siteKey,
                    'oldsource': captchaMap[captchaType],
                    'pageurl': url,
                    'maxtimeout': self.maxtimeout,
                    'json': 1
                },
                allow_redirects=False
            ),
            check_success=_checkRequest,
            step=5,
            timeout=(self.maxtimeout + 10)
        )

        if response:
            return response.json().get('captchaid')

        raise reCaptchaBadJobID('9kw: Error no valid job id was returned.')

    # ------------------------------------------------------------------------------- #

    def getCaptchaAnswer(self, captchaType, url, siteKey, reCaptchaParams):
        """Solve a captcha end to end and return the answer.

        Args:
            captchaType: passed through to ``requestSolve``.
            url: page URL on which the captcha appears.
            siteKey: site key of the captcha widget.
            reCaptchaParams: mapping that must contain ``api_key``; optional
                ``maxtimeout`` overrides the default, and optional ``proxy``
                is installed as the proxies of this solver's own session.

        Raises:
            reCaptchaParameter: ``api_key`` is missing.
            reCaptchaTimeout: a polling step timed out.
        """
        jobID = None

        if not reCaptchaParams.get('api_key'):
            raise reCaptchaParameter("9kw: Missing api_key parameter.")

        self.api_key = reCaptchaParams.get('api_key')

        if reCaptchaParams.get('maxtimeout'):
            self.maxtimeout = reCaptchaParams.get('maxtimeout')

        # Read the same key that is tested: the old code checked 'proxy' but
        # assigned the value of 'proxies', which set the session proxies to None.
        proxies = reCaptchaParams.get('proxy')
        if proxies:
            self.session.proxies = proxies

        try:
            jobID = self.requestSolve(captchaType, url, siteKey)
            return self.requestJob(jobID)
        except polling.TimeoutException:
            raise reCaptchaTimeout(
                f"9kw: reCaptcha solve took to long to execute 'captchaid' {jobID}, aborting."
            )
# ------------------------------------------------------------------------------- #
# Instantiate once at import time: the constructor hands the name '9kw' to
# its base class (presumably registering the solver - confirm in the base).
captchaSolver()

@ -0,0 +1,47 @@
import abc
import logging
import sys
# Choose the abstract base class: abc.ABC where the interpreter is 3.4 or
# newer, otherwise an equivalent empty base built through the ABCMeta metaclass.
if sys.version_info >= (3, 4):
    ABC = abc.ABC # noqa
else:
    ABC = abc.ABCMeta('ABC', (), {})
# ------------------------------------------------------------------------------- #
# Registry of solver instances keyed by provider name; Captcha.__init__ writes
# to it and Captcha.dynamicImport reads from it.
captchaSolvers = {}
# ------------------------------------------------------------------------------- #
class Captcha(ABC):
    """Abstract base for anti-captcha providers.

    Each concrete provider registers its instance in ``captchaSolvers`` under
    a provider name when it is constructed.
    """

    @abc.abstractmethod
    def __init__(self, name):
        # Record this instance under its provider name.
        captchaSolvers[name] = self

    # ------------------------------------------------------------------------------- #

    @classmethod
    def dynamicImport(cls, name):
        """Return the solver registered as *name*, importing its module if needed.

        The module ``<this class's module>.<name>`` is imported when *name* is
        not registered yet. An ``ImportError`` is logged and re-raised (with
        ``sys.tracebacklimit`` set to 0) when the import fails or does not
        leave a ``Captcha`` instance in the registry.
        """
        if name in captchaSolvers:
            return captchaSolvers[name]

        try:
            __import__(f'{cls.__module__}.{name}')
            registered = captchaSolvers.get(name)
            if not isinstance(registered, Captcha):
                raise ImportError('The anti captcha provider was not initialized.')
        except ImportError as err:
            sys.tracebacklimit = 0
            logging.error(f'Unable to load {name} anti captcha provider -> {err}')
            raise

        return captchaSolvers[name]

    # ------------------------------------------------------------------------------- #

    @abc.abstractmethod
    def getCaptchaAnswer(self, captchaType, url, siteKey, captchaParams):
        """Provider hook: solve the captcha and return its answer."""
        pass

    # ------------------------------------------------------------------------------- #

    def solveCaptcha(self, captchaType, url, siteKey, captchaParams):
        """Public entry point; forwards all arguments to ``getCaptchaAnswer``."""
        answer = self.getCaptchaAnswer(captchaType, url, siteKey, captchaParams)
        return answer

@ -0,0 +1,109 @@
from __future__ import absolute_import
from ..exceptions import (
CaptchaParameter,
CaptchaTimeout,
CaptchaAPIError
)
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
try:
from python_anticaptcha import (
AnticaptchaClient,
NoCaptchaTaskProxylessTask,
HCaptchaTaskProxyless,
NoCaptchaTask,
HCaptchaTask,
AnticaptchaException
)
except ImportError:
raise ImportError(
"Please install/upgrade the python module 'python_anticaptcha' via "
"pip install python-anticaptcha or https://github.com/ad-m/python-anticaptcha/"
)
import sys
from . import Captcha
class captchaSolver(Captcha):
    """anti-captcha.com backend built on the ``python_anticaptcha`` client."""

    def __init__(self):
        # Compare versions numerically. A plain string comparison would rank
        # '0.10' below '0.6' and reject a newer release.
        installed = self._versionTuple(sys.modules['python_anticaptcha'].__version__)
        if installed < (0, 6):
            raise ImportError(
                "Please upgrade the python module 'python_anticaptcha' via "
                "pip install -U python-anticaptcha or https://github.com/ad-m/python-anticaptcha/"
            )
        super(captchaSolver, self).__init__('anticaptcha')

    # ------------------------------------------------------------------------------- #

    @staticmethod
    def _versionTuple(version):
        """Turn the leading dotted integers of *version* into a tuple of ints.

        Parsing stops at the first component that does not start with a
        digit, so ``'0.10.1rc1'`` becomes ``(0, 10, 1)``.
        """
        numbers = []
        for piece in str(version).split('.'):
            digits = ''
            for char in piece:
                if not char.isdigit():
                    break
                digits += char
            if not digits:
                break
            numbers.append(int(digits))
        return tuple(numbers)

    # ------------------------------------------------------------------------------- #

    def parseProxy(self, url, user_agent):
        """Split proxy *url* into the keyword arguments the proxied task classes take."""
        parsed = urlparse(url)

        return dict(
            proxy_type=parsed.scheme,
            proxy_address=parsed.hostname,
            proxy_port=parsed.port,
            proxy_login=parsed.username,
            proxy_password=parsed.password,
            user_agent=user_agent
        )

    # ------------------------------------------------------------------------------- #

    def getCaptchaAnswer(self, captchaType, url, siteKey, captchaParams):
        """Solve a captcha through anti-captcha and return the solution response.

        Args:
            captchaType: ``'reCaptcha'`` or ``'hCaptcha'``.
            url: page URL on which the captcha appears.
            siteKey: site key of the captcha widget.
            captchaParams: mapping that must contain ``api_key``; an optional
                ``proxy`` mapping (its ``https`` entry is used) selects the
                proxied task classes unless ``no_proxy`` is truthy.

        Raises:
            CaptchaParameter: ``api_key`` is missing.
            NotImplementedError: the client lacks ``createTaskSmee``.
            CaptchaTimeout: waiting on the job raised ``AnticaptchaException``.
            CaptchaAPIError: the job result has no ``solution`` key.
        """
        if not captchaParams.get('api_key'):
            raise CaptchaParameter("anticaptcha: Missing api_key parameter.")

        client = AnticaptchaClient(captchaParams.get('api_key'))

        if captchaParams.get('proxy') and not captchaParams.get('no_proxy'):
            captchaMap = {
                'reCaptcha': NoCaptchaTask,
                'hCaptcha': HCaptchaTask
            }

            proxy = self.parseProxy(
                captchaParams.get('proxy', {}).get('https'),
                captchaParams.get('User-Agent', '')
            )

            task = captchaMap[captchaType](
                url,
                siteKey,
                **proxy
            )
        else:
            captchaMap = {
                'reCaptcha': NoCaptchaTaskProxylessTask,
                'hCaptcha': HCaptchaTaskProxyless
            }
            task = captchaMap[captchaType](url, siteKey)

        if not hasattr(client, 'createTaskSmee'):
            raise NotImplementedError(
                "Please upgrade 'python_anticaptcha' via pip or download it from "
                "https://github.com/ad-m/python-anticaptcha/tree/hcaptcha"
            )

        job = client.createTaskSmee(task, timeout=180)

        try:
            job.join(maximum_time=180)
        except (AnticaptchaException) as e:
            raise CaptchaTimeout(f"{getattr(e, 'message', e)}")

        if 'solution' in job._last_result:
            return job.get_solution_response()

        raise CaptchaAPIError('Job did not return `solution` key in payload.')
# ------------------------------------------------------------------------------- #
# Instantiate once at import time: the constructor hands the name
# 'anticaptcha' to the Captcha base class.
captchaSolver()

@ -0,0 +1,190 @@
from __future__ import absolute_import
import requests
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
from ..exceptions import (
CaptchaServiceUnavailable,
CaptchaAPIError,
CaptchaTimeout,
CaptchaParameter,
CaptchaBadJobID
)
try:
import polling2
except ImportError:
raise ImportError("Please install the python module 'polling2' via pip")
from . import Captcha
class captchaSolver(Captcha):
    """CapMonster Cloud backend for reCaptcha and hCaptcha challenges.

    A task is created through ``/createTask`` and its result is polled from
    ``/getTaskResult``.
    """

    def __init__(self):
        super(captchaSolver, self).__init__('capmonster')
        self.host = 'https://api.capmonster.cloud'
        self.session = requests.Session()

    # ------------------------------------------------------------------------------- #

    @staticmethod
    def checkErrorStatus(response):
        """Raise when *response* is a server error or reports ``errorId == 1``.

        Raises:
            CaptchaServiceUnavailable: the HTTP status is 500 or 502.
            CaptchaAPIError: the payload reports an error; the message is its
                ``errorDescription`` when present, else its ``errorCode``.
        """
        if response.status_code in [500, 502]:
            raise CaptchaServiceUnavailable(
                f'CapMonster: Server Side Error {response.status_code}'
            )

        payload = response.json()
        # .get() keeps a payload without these keys from surfacing as KeyError.
        if payload.get('errorId') == 1:
            if 'errorDescription' in payload:
                raise CaptchaAPIError(
                    payload['errorDescription']
                )
            raise CaptchaAPIError(payload.get('errorCode'))

    # ------------------------------------------------------------------------------- #

    def requestJob(self, taskID):
        """Poll the result of *taskID* and return its ``gRecaptchaResponse``.

        Raises:
            CaptchaBadJobID: *taskID* is empty.
            CaptchaTimeout: polling returned a falsy result.
        """
        if not taskID:
            raise CaptchaBadJobID(
                'CapMonster: Error bad task id to request Captcha.'
            )

        def _checkRequest(response):
            self.checkErrorStatus(response)

            if response.ok and response.json()['status'] == 'ready':
                return True

            return None

        response = polling2.poll(
            lambda: self.session.post(
                f'{self.host}/getTaskResult',
                json={
                    'clientKey': self.clientKey,
                    'taskId': taskID
                },
                timeout=30
            ),
            check_success=_checkRequest,
            step=5,
            timeout=180
        )

        if response:
            return response.json()['solution']['gRecaptchaResponse']

        raise CaptchaTimeout(
            "CapMonster: Error failed to solve Captcha."
        )

    # ------------------------------------------------------------------------------- #

    def requestSolve(self, captchaType, url, siteKey):
        """Create a solve task and return its ``taskId``.

        Args:
            captchaType: ``'reCaptcha'`` selects ``NoCaptchaTask``; any other
                value selects ``HCaptchaTask``. ``Proxyless`` is appended to
                the task type when no proxy is configured.
            url: page URL on which the captcha appears.
            siteKey: site key of the captcha widget.

        Raises:
            CaptchaBadJobID: polling returned a falsy result.
        """
        def _checkRequest(response):
            self.checkErrorStatus(response)

            if response.ok and response.json()['taskId']:
                return True

            return None

        data = {
            'clientKey': self.clientKey,
            'task': {
                'websiteURL': url,
                'websiteKey': siteKey,
                'softId': 37,
                'type': 'NoCaptchaTask' if captchaType == 'reCaptcha' else 'HCaptchaTask'
            }
        }

        if self.proxy:
            data['task'].update(self.proxy)
        else:
            data['task']['type'] = f"{data['task']['type']}Proxyless"

        response = polling2.poll(
            lambda: self.session.post(
                f'{self.host}/createTask',
                json=data,
                allow_redirects=False,
                timeout=30
            ),
            check_success=_checkRequest,
            step=5,
            timeout=180
        )

        if response:
            return response.json()['taskId']

        raise CaptchaBadJobID(
            'CapMonster: Error no task id was returned.'
        )

    # ------------------------------------------------------------------------------- #

    def getCaptchaAnswer(self, captchaType, url, siteKey, captchaParams):
        """Solve a captcha end to end and return the answer token.

        Args:
            captchaType: passed through to ``requestSolve``.
            url: page URL on which the captcha appears.
            siteKey: site key of the captcha widget.
            captchaParams: mapping that must contain ``clientKey``; an
                optional ``proxy`` mapping (its ``https`` entry is used) is
                forwarded to the service unless ``no_proxy`` is truthy.

        Raises:
            CaptchaParameter: ``clientKey`` is missing, or the proxy URL has
                no scheme, no netloc, or no usable port.
            CaptchaTimeout: a polling step timed out.
        """
        taskID = None

        if not captchaParams.get('clientKey'):
            raise CaptchaParameter(
                "CapMonster: Missing clientKey parameter."
            )

        self.clientKey = captchaParams.get('clientKey')

        if captchaParams.get('proxy') and not captchaParams.get('no_proxy'):
            hostParsed = urlparse(captchaParams.get('proxy', {}).get('https'))

            if not hostParsed.scheme:
                raise CaptchaParameter('Cannot parse proxy correctly, bad scheme')

            if not hostParsed.netloc:
                raise CaptchaParameter('Cannot parse proxy correctly, bad netloc')

            ports = {
                'http': 80,
                'https': 443
            }

            # Take the default port from the parsed scheme. The old code read
            # self.proxy['proxyType'] while self.proxy was still being built,
            # i.e. unset or left over from an earlier call.
            proxyPort = hostParsed.port if hostParsed.port else ports.get(hostParsed.scheme)
            if not proxyPort:
                raise CaptchaParameter('Cannot parse proxy correctly, bad port')

            self.proxy = {
                'proxyType': hostParsed.scheme,
                'proxyAddress': hostParsed.hostname,
                'proxyPort': proxyPort,
                'proxyLogin': hostParsed.username,
                'proxyPassword': hostParsed.password,
            }
        else:
            self.proxy = None

        try:
            taskID = self.requestSolve(captchaType, url, siteKey)
            return self.requestJob(taskID)
        except polling2.TimeoutException:
            # This class defines no reportJob, so the former report attempt
            # ended in AttributeError; surface the timeout directly instead.
            raise CaptchaTimeout(
                "CapMonster: Captcha solve took to long to execute "
                f"task id {taskID}, aborting."
            )
# ------------------------------------------------------------------------------- #
# Instantiate once at import time: the constructor hands the name
# 'capmonster' to the Captcha base class.
captchaSolver()

@ -0,0 +1,268 @@
from __future__ import absolute_import
import json
import requests
try:
from urlparse import urlparse
except ImportError:
from urllib.parse import urlparse
try:
import polling2
except ImportError:
raise ImportError("Please install the python module 'polling2' via pip")
from ..exceptions import (
CaptchaServiceUnavailable,
CaptchaTimeout,
CaptchaParameter,
CaptchaBadJobID,
CaptchaReportError
)
from . import Captcha
class captchaSolver(Captcha):
    """DeathByCaptcha backend for reCaptcha and hCaptcha challenges.

    A captcha is submitted to ``/captcha`` and its answer is polled from
    ``/captcha/<id>``; a job that times out is reported via
    ``/captcha/<id>/report``.
    """

    def __init__(self):
        super(captchaSolver, self).__init__('deathbycaptcha')
        # NOTE(review): plain http, and username/password are posted to this
        # host - confirm whether the API can be reached over https instead.
        self.host = 'http://api.dbcapi.me/api'
        self.session = requests.Session()

    # ------------------------------------------------------------------------------- #

    @staticmethod
    def checkErrorStatus(response):
        """Raise ``CaptchaServiceUnavailable`` for HTTP status 400, 403 or 503."""
        errors = dict(
            [
                (400, "DeathByCaptcha: 400 Bad Request"),
                (403, "DeathByCaptcha: 403 Forbidden - Invalid credentails or insufficient credits."),
                # (500, "DeathByCaptcha: 500 Internal Server Error."),
                (503, "DeathByCaptcha: 503 Service Temporarily Unavailable.")
            ]
        )

        if response.status_code in errors:
            raise CaptchaServiceUnavailable(errors.get(response.status_code))

    # ------------------------------------------------------------------------------- #

    def login(self, username, password):
        """Store the credentials and check the account through ``/user``.

        Raises:
            CaptchaServiceUnavailable: the account is banned, has a zero
                balance, or the endpoint answered with a known error status.
        """
        self.username = username
        self.password = password

        def _checkRequest(response):
            if response.ok:
                if response.json().get('is_banned'):
                    raise CaptchaServiceUnavailable('DeathByCaptcha: Your account is banned.')

                # The field is spelled 'balance'; the former 'balanace' lookup
                # always returned None, so this check could never fire.
                if response.json().get('balance') == 0:
                    raise CaptchaServiceUnavailable('DeathByCaptcha: insufficient credits.')

                return response

            self.checkErrorStatus(response)
            return None

        response = polling2.poll(
            lambda: self.session.post(
                f'{self.host}/user',
                headers={'Accept': 'application/json'},
                data={
                    'username': self.username,
                    'password': self.password
                }
            ),
            check_success=_checkRequest,
            step=10,
            timeout=120
        )

        # debugRequest is not defined by this class; call it only when some
        # base or subclass provides it instead of failing with AttributeError.
        debugHook = getattr(self, 'debugRequest', None)
        if callable(debugHook):
            debugHook(response)

    # ------------------------------------------------------------------------------- #

    def reportJob(self, jobID):
        """Report *jobID* as a failed solve.

        Returns:
            True when the report request answered with status 200.

        Raises:
            CaptchaBadJobID: *jobID* is empty.
            CaptchaReportError: polling returned a falsy result.
        """
        if not jobID:
            raise CaptchaBadJobID(
                "DeathByCaptcha: Error bad job id to report failed reCaptcha."
            )

        def _checkRequest(response):
            if response.status_code == 200:
                return response

            self.checkErrorStatus(response)
            return None

        response = polling2.poll(
            lambda: self.session.post(
                f'{self.host}/captcha/{jobID}/report',
                headers={'Accept': 'application/json'},
                data={
                    'username': self.username,
                    'password': self.password
                }
            ),
            check_success=_checkRequest,
            step=10,
            timeout=180
        )

        if response:
            return True

        raise CaptchaReportError(
            "DeathByCaptcha: Error report failed reCaptcha."
        )

    # ------------------------------------------------------------------------------- #

    def requestJob(self, jobID):
        """Poll for the answer of *jobID* and return its ``text`` field.

        Raises:
            CaptchaBadJobID: *jobID* is empty.
            CaptchaTimeout: polling returned a falsy result.
        """
        if not jobID:
            raise CaptchaBadJobID(
                "DeathByCaptcha: Error bad job id to request reCaptcha."
            )

        def _checkRequest(response):
            if response.ok and response.json().get('text'):
                return response

            self.checkErrorStatus(response)
            return None

        response = polling2.poll(
            lambda: self.session.get(
                f'{self.host}/captcha/{jobID}',
                headers={'Accept': 'application/json'}
            ),
            check_success=_checkRequest,
            step=10,
            timeout=180
        )

        if response:
            return response.json().get('text')

        raise CaptchaTimeout(
            "DeathByCaptcha: Error failed to solve reCaptcha."
        )

    # ------------------------------------------------------------------------------- #

    def requestSolve(self, captchaType, url, siteKey):
        """Submit a captcha job and return its ``captcha`` id.

        Args:
            captchaType: ``'reCaptcha'`` is sent as type 4 with
                ``token_params``; any other value is sent as type 7 with
                ``hcaptcha_params``.
            url: page URL on which the captcha appears.
            siteKey: site key of the captcha widget.

        Raises:
            CaptchaBadJobID: polling returned a falsy result.
        """
        def _checkRequest(response):
            if response.ok and response.json().get("is_correct") and response.json().get('captcha'):
                return response

            self.checkErrorStatus(response)
            return None

        isReCaptcha = captchaType == 'reCaptcha'

        # Both captcha kinds share the payload shape; only the site-key field
        # name, the type code and the parameter name differ.
        jPayload = {
            'googlekey' if isReCaptcha else 'sitekey': siteKey,
            'pageurl': url
        }

        if self.proxy:
            jPayload.update({
                'proxy': self.proxy,
                'proxytype': self.proxyType
            })

        data = {
            'username': self.username,
            'password': self.password,
            'type': '4' if isReCaptcha else '7',
            'token_params' if isReCaptcha else 'hcaptcha_params': json.dumps(jPayload)
        }

        response = polling2.poll(
            lambda: self.session.post(
                f'{self.host}/captcha',
                headers={'Accept': 'application/json'},
                data=data,
                allow_redirects=False
            ),
            check_success=_checkRequest,
            step=10,
            timeout=180
        )

        if response:
            return response.json().get('captcha')

        raise CaptchaBadJobID(
            'DeathByCaptcha: Error no job id was returned.'
        )

    # ------------------------------------------------------------------------------- #

    def getCaptchaAnswer(self, captchaType, url, siteKey, captchaParams):
        """Solve a captcha end to end and return the answer text.

        Args:
            captchaType: passed through to ``requestSolve``.
            url: page URL on which the captcha appears.
            siteKey: site key of the captcha widget.
            captchaParams: mapping that must contain ``username`` and
                ``password``; an optional ``proxy`` mapping (its ``https``
                entry is used) is forwarded to the service unless
                ``no_proxy`` is truthy.

        Raises:
            CaptchaParameter: a credential is missing or the proxy URL has no
                scheme / netloc.
            CaptchaTimeout: a polling step timed out.
        """
        jobID = None

        for param in ['username', 'password']:
            if not captchaParams.get(param):
                raise CaptchaParameter(
                    f"DeathByCaptcha: Missing '{param}' parameter."
                )
            setattr(self, param, captchaParams.get(param))

        if captchaParams.get('proxy') and not captchaParams.get('no_proxy'):
            hostParsed = urlparse(captchaParams.get('proxy', {}).get('https'))

            if not hostParsed.scheme:
                raise CaptchaParameter('Cannot parse proxy correctly, bad scheme')

            if not hostParsed.netloc:
                raise CaptchaParameter('Cannot parse proxy correctly, bad netloc')

            self.proxyType = hostParsed.scheme.upper()
            self.proxy = captchaParams.get('proxy', {}).get('https')
        else:
            self.proxy = None

        try:
            jobID = self.requestSolve(captchaType, url, siteKey)
            return self.requestJob(jobID)
        except polling2.TimeoutException:
            # jobID is still None when the submit step itself timed out; only
            # report jobs that were actually created.
            try:
                if jobID:
                    self.reportJob(jobID)
            except polling2.TimeoutException:
                raise CaptchaTimeout(
                    f"DeathByCaptcha: Captcha solve took to long and also failed reporting the job id {jobID}."
                )

            raise CaptchaTimeout(
                f"DeathByCaptcha: Captcha solve took to long to execute job id {jobID}, aborting."
            )
# ------------------------------------------------------------------------------- #
# Instantiate once at import time: the constructor hands the name
# 'deathbycaptcha' to the Captcha base class.
captchaSolver()

@ -0,0 +1,111 @@
# -*- coding: utf-8 -*-
# ------------------------------------------------------------------------------- #
"""
cloudscraper.exceptions
~~~~~~~~~~~~~~~~~~~
This module contains the set of cloudscraper exceptions.
"""
# ------------------------------------------------------------------------------- #
class CloudflareException(Exception):
    """Root of every Cloudflare-related error raised by cloudscraper."""


class CloudflareLoopProtection(CloudflareException):
    """Raised by the recursive depth protection."""


class CloudflareCode1020(CloudflareException):
    """Raised for a Cloudflare code 1020 block."""


class CloudflareIUAMError(CloudflareException):
    """Raised when IUAM parameters cannot be extracted from the Cloudflare payload."""


class CloudflareChallengeError(CloudflareException):
    """Raised when a new Cloudflare challenge is detected."""


class CloudflareSolveError(CloudflareException):
    """Raised when there is an issue solving a Cloudflare challenge."""


class CloudflareCaptchaError(CloudflareException):
    """Raised when Captcha parameters cannot be extracted from the Cloudflare payload."""


class CloudflareCaptchaProvider(CloudflareException):
    """Raised when no Captcha provider is loaded for Cloudflare."""
# ------------------------------------------------------------------------------- #
class CaptchaException(Exception):
    """Root of every error raised by cloudscraper captcha providers."""


class CaptchaServiceUnavailable(CaptchaException):
    """Raised when an external service cannot be reached."""


class CaptchaAPIError(CaptchaException):
    """Raised for an error reported in an API response."""


class CaptchaAccountError(CaptchaException):
    """Raised for a problem with the captcha provider account."""


class CaptchaTimeout(CaptchaException):
    """Raised when the captcha provider takes too long."""


class CaptchaParameter(CaptchaException):
    """Raised for a bad or missing parameter."""


class CaptchaBadJobID(CaptchaException):
    """Raised for an invalid job id."""


class CaptchaReportError(CaptchaException):
    """Raised when a bad solve cannot be reported to the captcha provider."""

@ -0,0 +1,72 @@
import json
import platform
import requests
import ssl
import sys
import urllib3
from collections import OrderedDict
from . import __version__ as cloudscraper_version
# ------------------------------------------------------------------------------- #
def getPossibleCiphers():
    """Return the sorted names of all ciphers a default client context accepts.

    The context is opened up with ``set_ciphers('ALL')`` first. When the
    ``ssl`` module lacks a needed attribute, the string
    ``'get_ciphers() is unsupported'`` is returned instead of a list.
    """
    try:
        tls_context = ssl.create_default_context(ssl.Purpose.SERVER_AUTH)
        tls_context.set_ciphers('ALL')
        names = [entry['name'] for entry in tls_context.get_ciphers()]
    except AttributeError:
        return 'get_ciphers() is unsupported'

    names.sort()
    return names
# ------------------------------------------------------------------------------- #
def _pythonVersion():
    """Return ``{'name': ..., 'version': ...}`` for the running interpreter.

    On PyPy the version is taken from ``sys.pypy_version_info`` (with the
    release level appended unless it is ``'final'``); otherwise it is
    ``platform.python_version()``.
    """
    implementation = platform.python_implementation()

    if implementation != 'PyPy':
        return {
            'name': implementation,
            'version': platform.python_version()
        }

    pypy = sys.pypy_version_info
    version = f'{pypy.major}.{pypy.minor}.{pypy.micro}'
    if pypy.releaselevel != 'final':
        version = f'{version}{pypy.releaselevel}'

    return {
        'name': implementation,
        'version': version
    }
# ------------------------------------------------------------------------------- #
def systemInfo():
    """
    Collect environment diagnostics into an ordered mapping.

    Returns:
        An OrderedDict with, in order: 'platform' (system/release, or
        'Unknown' for both if querying them raises IOError), 'interpreter',
        'cloudscraper', 'requests', 'urllib3' and 'OpenSSL' (itself an
        OrderedDict of 'version' and 'ciphers').
    """
    try:
        platform_info = {'system': platform.system(), 'release': platform.release()}
    except IOError:
        platform_info = {'system': 'Unknown', 'release': 'Unknown'}

    report = OrderedDict()
    report['platform'] = platform_info
    report['interpreter'] = _pythonVersion()
    report['cloudscraper'] = cloudscraper_version
    report['requests'] = requests.__version__
    report['urllib3'] = urllib3.__version__

    openssl_info = OrderedDict()
    openssl_info['version'] = ssl.OPENSSL_VERSION
    openssl_info['ciphers'] = getPossibleCiphers()
    report['OpenSSL'] = openssl_info

    return report
# ------------------------------------------------------------------------------- #
# When run as a script, print the collected diagnostics as JSON indented by 4.
if __name__ == '__main__':
    print(json.dumps(systemInfo(), indent=4))

@ -1,27 +1,31 @@
import re
import sys
import logging
import abc
from ..exceptions import CloudflareSolveError
if sys.version_info >= (3, 4):
ABC = abc.ABC # noqa
else:
ABC = abc.ABCMeta('ABC', (), {})
##########################################################################################################################################################
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
interpreters = {}
# ------------------------------------------------------------------------------- #
class JavaScriptInterpreter(ABC):
# ------------------------------------------------------------------------------- #
@abc.abstractmethod
def __init__(self, name):
interpreters[name] = self
# ------------------------------------------------------------------------------- #
@classmethod
def dynamicImport(cls, name):
if name not in interpreters:
@ -35,55 +39,18 @@ class JavaScriptInterpreter(ABC):
return interpreters[name]
# ------------------------------------------------------------------------------- #
@abc.abstractmethod
def eval(self, jsEnv, js):
pass
# ------------------------------------------------------------------------------- #
def solveChallenge(self, body, domain):
try:
js = re.search(
r'setTimeout\(function\(\){\s+(var s,t,o,p,b,r,e,a,k,i,n,g,f.+?\r?\n[\s\S]+?a\.value =.+?)\r?\n',
body
).group(1)
return '{0:.10f}'.format(float(self.eval(body, domain)))
except Exception:
raise ValueError('Unable to identify Cloudflare IUAM Javascript on website. {}'.format(BUG_REPORT))
js = re.sub(r'\s{2,}', ' ', js, flags=re.MULTILINE | re.DOTALL).replace('\'; 121\'', '')
js += '\na.value;'
jsEnv = '''
String.prototype.italics=function(str) {{return "<i>" + this + "</i>";}};
var document = {{
createElement: function () {{
return {{ firstChild: {{ href: "https://{domain}/" }} }}
}},
getElementById: function () {{
return {{"innerHTML": "{innerHTML}"}};
}}
}};
'''
try:
innerHTML = re.search(
r'<div(?: [^<>]*)? id="([^<>]*?)">([^<>]*?)</div>',
body,
re.MULTILINE | re.DOTALL
raise CloudflareSolveError(
'Error trying to solve Cloudflare IUAM Javascript, they may have changed their technique.'
)
innerHTML = innerHTML.group(2) if innerHTML else ''
except: # noqa
logging.error('Error extracting Cloudflare IUAM Javascript. {}'.format(BUG_REPORT))
raise
try:
result = self.eval(
re.sub(r'\s{2,}', ' ', jsEnv.format(domain=domain, innerHTML=innerHTML), flags=re.MULTILINE | re.DOTALL),
js
)
float(result)
except Exception:
logging.error('Error executing Cloudflare IUAM Javascript. {}'.format(BUG_REPORT))
raise
return result

@ -0,0 +1,103 @@
from __future__ import absolute_import
import os
import sys
import ctypes.util
from ctypes import c_void_p, c_size_t, byref, create_string_buffer, CDLL
from . import JavaScriptInterpreter
from .encapsulated import template
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
    """
    Challenge interpreter backend that runs the generated JavaScript payload
    through the ChakraCore shared library, loaded with ``ctypes``.
    """

    # ------------------------------------------------------------------------------- #

    def __init__(self):
        # Pass this backend's name, 'chakracore', to the base initialiser.
        super(ChallengeInterpreter, self).__init__('chakracore')

    # ------------------------------------------------------------------------------- #

    def eval(self, body, domain):
        """
        Evaluate the payload built by ``template(body, domain)`` in ChakraCore.

        :param body: passed unchanged to ``template``.
        :param domain: passed unchanged to ``template``.
        :return: ``value`` of the ctypes string buffer that received the
            script result converted to a string.
        :raises RuntimeError: if the ChakraCore library cannot be located or
            cannot be loaded.
        """
        chakraCoreLibrary = None

        # check current working directory.
        for _libraryFile in ['libChakraCore.so', 'libChakraCore.dylib', 'ChakraCore.dll']:
            if os.path.isfile(os.path.join(os.getcwd(), _libraryFile)):
                chakraCoreLibrary = os.path.join(os.getcwd(), _libraryFile)
                # NOTE(review): `continue` keeps scanning, so the last file name
                # found wins; `break` may have been intended - confirm.
                continue

        # Nothing in the working directory: fall back to the system library search.
        if not chakraCoreLibrary:
            chakraCoreLibrary = ctypes.util.find_library('ChakraCore')

        if not chakraCoreLibrary:
            # Setting tracebacklimit to 0 is process-wide, not local to this error.
            sys.tracebacklimit = 0
            raise RuntimeError(
                'ChakraCore library not found in current path or any of your system library paths, '
                'please download from https://www.github.com/VeNoMouS/cloudscraper/tree/ChakraCore/, '
                'or https://github.com/Microsoft/ChakraCore/'
            )

        try:
            chakraCore = CDLL(chakraCoreLibrary)
        except OSError:
            sys.tracebacklimit = 0
            raise RuntimeError('There was an error loading the ChakraCore library {}'.format(chakraCoreLibrary))

        # On non-Windows platforms DllMain is invoked by hand.
        # NOTE(review): 1 and 2 presumably stand for process/thread attach - confirm.
        if sys.platform != 'win32':
            chakraCore.DllMain(0, 1, 0)
            chakraCore.DllMain(0, 2, 0)

        # The script source is handed over as a UTF-16 encoded buffer.
        script = create_string_buffer(template(body, domain).encode('utf-16'))

        runtime = c_void_p()
        chakraCore.JsCreateRuntime(0, 0, byref(runtime))

        context = c_void_p()
        chakraCore.JsCreateContext(runtime, byref(context))
        chakraCore.JsSetCurrentContext(context)

        # Name under which the script is run.
        # NOTE(review): a Python 3 `str` is passed here without argtypes; verify
        # the library receives the encoding it expects.
        fname = c_void_p()
        chakraCore.JsCreateString(
            'iuam-challenge.js',
            len('iuam-challenge.js'),
            byref(fname)
        )

        scriptSource = c_void_p()
        chakraCore.JsCreateExternalArrayBuffer(
            script,
            len(script),
            0,
            0,
            byref(scriptSource)
        )

        jsResult = c_void_p()
        # NOTE(review): 0x02 is an opaque flag value here - presumably tells the
        # engine the source buffer is UTF-16; confirm against the ChakraCore API.
        chakraCore.JsRun(scriptSource, 0, fname, 0x02, byref(jsResult))

        resultJSString = c_void_p()
        chakraCore.JsConvertValueToString(jsResult, byref(resultJSString))

        # First JsCopyString call passes no buffer and only fills stringLength;
        # the second call copies into a buffer one byte larger than that length.
        stringLength = c_size_t()
        chakraCore.JsCopyString(resultJSString, 0, 0, byref(stringLength))

        resultSTR = create_string_buffer(stringLength.value + 1)
        chakraCore.JsCopyString(
            resultJSString,
            byref(resultSTR),
            stringLength.value + 1,
            0
        )

        chakraCore.JsDisposeRuntime(runtime)

        return resultSTR.value


# ------------------------------------------------------------------------------- #

# Instantiate once at import time so the base initialiser runs for 'chakracore'.
ChallengeInterpreter()

@ -0,0 +1,62 @@
import logging
import re
# ------------------------------------------------------------------------------- #
def template(body, domain):
    """
    Build the JavaScript payload that is handed to an interpreter backend.

    The challenge script is cut out of ``body`` and prefixed with a small
    stub environment: an ``italics`` string helper, a ``subVars`` table built
    from the ``<div id="<k><n>">`` elements found in ``body``, and a
    ``document`` object whose ``createElement`` reports ``https://<domain>/``
    and whose ``getElementById`` looks values up in ``subVars``.

    :param body: page source containing the challenge script.
    :param domain: host name substituted into the stub ``document``.
    :return: the whitespace-collapsed stub environment followed by the
        extracted challenge script.
    :raises ValueError: if the challenge script cannot be located in ``body``.
    :raises Exception: any error raised while extracting ``k`` or the div
        values is logged and re-raised unchanged.
    """
    BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'

    try:
        js = re.search(
            r'setTimeout\(function\(\){\s+(.*?a\.value\s*=\s*\S+toFixed\(10\);)',
            body,
            re.M | re.S
        ).group(1)
    except Exception:
        raise ValueError('Unable to identify Cloudflare IUAM Javascript on website. {}'.format(BUG_REPORT))

    jsEnv = '''String.prototype.italics=function(str) {{return "<i>" + this + "</i>";}};
        var subVars= {{{subVars}}};
        var document = {{
            createElement: function () {{
                return {{ firstChild: {{ href: "https://{domain}/" }} }}
            }},
            getElementById: function (str) {{
                return {{"innerHTML": subVars[str]}};
            }}
        }};
    '''

    try:
        # Drop the setInterval wrapper so only the URL-scheme match remains.
        js = js.replace(
            r"(setInterval(function(){}, 100),t.match(/https?:\/\//)[0]);",
            r"t.match(/https?:\/\//)[0];"
        )

        # `k` is the id prefix shared by the divs that hold the sub-expressions.
        k = re.search(r" k\s*=\s*'(?P<k>\S+)';", body).group('k')
        r = re.compile(r'<div id="{}(?P<id>\d+)">\s*(?P<jsfuck>[^<>]*)</div>'.format(k))

        subVars = ''
        for m in r.finditer(body):
            subVars = '{}\n\t\t{}{}: {},\n'.format(subVars, k, m.group('id'), m.group('jsfuck'))
        # Strip the trailing ",\n" left by the last entry.
        subVars = subVars[:-2]

    except Exception:
        logging.error('Error extracting Cloudflare IUAM Javascript. {}'.format(BUG_REPORT))
        raise

    # The flags must be passed by keyword: the 4th positional argument of
    # re.sub is `count`, so passing them positionally silently capped the
    # number of whitespace runs that were collapsed.
    environment = re.sub(
        r'\s{2,}',
        ' ',
        jsEnv.format(
            domain=domain,
            subVars=subVars
        ),
        flags=re.MULTILINE | re.DOTALL
    )

    return '{}{}'.format(environment, js)
# ------------------------------------------------------------------------------- #

@ -6,27 +6,39 @@ import base64
from . import JavaScriptInterpreter
from .encapsulated import template
from .jsunfuck import jsunfuck
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
# ------------------------------------------------------------------------------- #
def __init__(self):
super(ChallengeInterpreter, self).__init__('js2py')
def eval(self, jsEnv, js):
# ------------------------------------------------------------------------------- #
def eval(self, body, domain):
jsPayload = template(body, domain)
if js2py.eval_js('(+(+!+[]+[+!+[]]+(!![]+[])[!+[]+!+[]+!+[]]+[!+[]+!+[]]+[+[]])+[])[+!+[]]') == '1':
logging.warning('WARNING - Please upgrade your js2py https://github.com/PiotrDabkowski/Js2Py, applying work around for the meantime.')
js = jsunfuck(js)
jsPayload = jsunfuck(jsPayload)
def atob(s):
return base64.b64decode('{}'.format(s)).decode('utf-8')
js2py.disable_pyimport()
context = js2py.EvalJs({'atob': atob})
result = context.eval('{}{}'.format(jsEnv, js))
result = context.eval(jsPayload)
return result
# ------------------------------------------------------------------------------- #
ChallengeInterpreter()

@ -0,0 +1,233 @@
from __future__ import absolute_import
import ast
import re
import operator as op
import pyparsing
from ..exceptions import CloudflareSolveError
from . import JavaScriptInterpreter
# ------------------------------------------------------------------------------- #
# Maps AST operator node types to the functions that implement them.
_OP_MAP = {
    ast.Add: op.add,
    ast.Sub: op.sub,
    ast.Mult: op.mul,
    ast.Div: op.truediv,
    ast.Invert: op.neg,
}

# ------------------------------------------------------------------------------- #


class Calc(ast.NodeVisitor):
    """
    Tiny arithmetic evaluator built on the ``ast`` module.

    Only binary operations listed in ``_OP_MAP`` and numeric literals are
    understood; the expression is parsed, never passed to ``eval``.
    """

    def visit_BinOp(self, node):
        """Apply the mapped operator to the evaluated left and right operands."""
        return _OP_MAP[type(node.op)](self.visit(node.left), self.visit(node.right))

    # ------------------------------------------------------------------------------- #

    def visit_Num(self, node):
        """Return the value of a numeric literal (interpreters that emit/dispatch ``Num``)."""
        return node.n

    # ------------------------------------------------------------------------------- #

    def visit_Constant(self, node):
        """
        Return the value of a literal on interpreters that emit ``Constant``.

        Without this handler, literals are only evaluated through the
        deprecated ``visit_Num`` fallback; where that fallback is gone they
        would evaluate to ``None``.
        """
        return node.value

    # ------------------------------------------------------------------------------- #

    def visit_Expr(self, node):
        """Evaluate the expression wrapped by an expression statement."""
        return self.visit(node.value)

    # ------------------------------------------------------------------------------- #

    @classmethod
    def doMath(cls, expression):
        """
        Parse ``expression`` and return the value of its first statement.

        :param expression: source text of an arithmetic expression.
        :return: the computed number.
        """
        tree = ast.parse(expression)
        calc = cls()
        return calc.visit(tree.body[0])
# ------------------------------------------------------------------------------- #
class Parentheses(object):
    """
    Repairs strings with unbalanced parentheses by deleting as few
    parenthesis characters as possible.
    """

    def fix(self, s):
        """
        Return every distinct balanced string found by the search, in
        discovery order. Candidates already examined are remembered in
        ``self.visited``.
        """
        self.visited = {s}
        repaired = []
        self.dfs(s, self.invalid(s), repaired)
        return repaired

    # ------------------------------------------------------------------------------- #

    def dfs(self, s, n, res):
        """
        Depth-first search: ``n`` is the unmatched count of ``s``; balanced
        strings are appended to ``res``.
        """
        if n == 0:
            res.append(s)
            return

        for index, char in enumerate(s):
            if char not in ('(', ')'):
                continue
            candidate = s[:index] + s[index + 1:]
            if candidate in self.visited:
                continue
            remaining = self.invalid(candidate)
            # Only follow removals that strictly reduce the imbalance.
            if remaining >= n:
                continue
            self.visited.add(candidate)
            self.dfs(candidate, remaining, res)

    # ------------------------------------------------------------------------------- #

    def invalid(self, s):
        """Count the parentheses in ``s`` that have no matching partner."""
        unmatched_open = unmatched_close = 0
        for char in s:
            if char == '(':
                unmatched_open += 1
            elif char == ')':
                if unmatched_open:
                    unmatched_open -= 1
                else:
                    unmatched_close += 1
        return unmatched_open + unmatched_close
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
    """
    Challenge interpreter backend that computes the answer in pure Python,
    by rewriting the obfuscated arithmetic instead of running JavaScript.
    """

    def __init__(self):
        # Pass this backend's name, 'native', to the base initialiser.
        super(ChallengeInterpreter, self).__init__('native')

    # ------------------------------------------------------------------------------- #

    def eval(self, body, domain):
        """
        Compute the challenge answer from the page source.

        :param body: page source containing the challenge script.
        :param domain: host name; single characters of it are used as
            operands by some challenge expressions.
        :return: the answer formatted with ten decimal places.
        :raises CloudflareSolveError: if the challenge, the ``k`` expression,
            the ``k`` id prefix or the div values cannot be extracted.
        """

        operators = {
            '+': op.add,
            '-': op.sub,
            '*': op.mul,
            '/': op.truediv
        }

        # ------------------------------------------------------------------------------- #

        def flatten(lists):
            # Recursively flatten nested lists into one flat list.
            return sum(map(flatten, lists), []) if isinstance(lists, list) else [lists]

        # ------------------------------------------------------------------------------- #

        def jsfuckToNumber(jsFuck):
            # "Clean Up" JSFuck: rewrite the truthy/empty-array idioms as digits.
            jsFuck = jsFuck.replace('!+[]', '1').replace('!![]', '1').replace('[]', '0')
            jsFuck = jsFuck.lstrip('+').replace('(+', '(').replace(' ', '')
            # Take the first balanced variant of the expression.
            jsFuck = Parentheses().fix(jsFuck)[0]

            # Hackery Parser for Math: a '+' token closes the current group;
            # each group is evaluated on its own and the results are
            # concatenated as digits.
            stack = []
            bstack = []

            for i in flatten(pyparsing.nestedExpr().parseString(jsFuck).asList()):
                if i == '+':
                    stack.append(bstack)
                    bstack = []
                    continue
                bstack.append(i)
            stack.append(bstack)

            return int(''.join([str(Calc.doMath(''.join(i))) for i in stack]))

        # ------------------------------------------------------------------------------- #

        def divisorMath(payload, needle, domain):
            # Evaluate "<numerator>/<denominator>"; when the denominator
            # contains `needle`, it combines a number with the character code
            # of one character of `domain`.
            jsfuckMath = payload.split('/')
            if needle in jsfuckMath[1]:
                expression = re.findall(r"^(.*?)(.)\(function", jsfuckMath[1])[0]

                expression_value = operators[expression[1]](
                    float(jsfuckToNumber(expression[0])),
                    float(ord(domain[jsfuckToNumber(jsfuckMath[1][
                        jsfuckMath[1].find('"("+p+")")}') + len('"("+p+")")}'):-2
                    ])]))
                )
            else:
                expression_value = jsfuckToNumber(jsfuckMath[1])

            expression_value = jsfuckToNumber(jsfuckMath[0]) / float(expression_value)

            return expression_value

        # ------------------------------------------------------------------------------- #

        def challengeSolve(body, domain):
            jschl_answer = 0

            try:
                jsfuckChallenge = re.search(
                    r"setTimeout\(function\(\){\s+var.*?f,\s*(?P<variable>\w+).*?:(?P<init>\S+)};"
                    r".*?\('challenge-form'\);.*?;(?P<challenge>.*?a\.value)\s*=\s*\S+\.toFixed\(10\);",
                    body,
                    re.DOTALL | re.MULTILINE
                ).groupdict()
            except AttributeError:
                raise CloudflareSolveError('There was an issue extracting "jsfuckChallenge" from the Cloudflare challenge.')

            kJSFUCK = re.search(r'(;|)\s*k.=(?P<kJSFUCK>\S+);', jsfuckChallenge['challenge'], re.S | re.M)
            if kJSFUCK:
                try:
                    kJSFUCK = jsfuckToNumber(kJSFUCK.group('kJSFUCK'))
                except IndexError:
                    raise CloudflareSolveError('There was an issue extracting "kJSFUCK" from the Cloudflare challenge.')

                try:
                    kID = re.search(r"\s*k\s*=\s*'(?P<kID>\S+)';", body).group('kID')
                except (AttributeError, IndexError):
                    # A failed search returns None, so .group() raises
                    # AttributeError rather than IndexError.
                    raise CloudflareSolveError('There was an issue extracting "kID" from the Cloudflare challenge.')

                try:
                    r = re.compile(r'<div id="{}(?P<id>\d+)">\s*(?P<jsfuck>[^<>]*)</div>'.format(kID))

                    kValues = {}
                    for m in r.finditer(body):
                        kValues[int(m.group('id'))] = m.group('jsfuck')

                    jsfuckChallenge['k'] = kValues[kJSFUCK]
                except (AttributeError, IndexError, KeyError):
                    # kValues is a dict: a missing id surfaces as KeyError.
                    raise CloudflareSolveError('There was an issue extracting "kValues" from the Cloudflare challenge.')

            # Each match yields (operator, expression) for one "<variable> op= expr;" step.
            jsfuckChallenge['challenge'] = re.finditer(
                r'{}.*?([+\-*/])=(.*?);(?=a\.value|{})'.format(
                    jsfuckChallenge['variable'],
                    jsfuckChallenge['variable']
                ),
                jsfuckChallenge['challenge']
            )

            # ------------------------------------------------------------------------------- #

            # Starting value of the answer.
            if '/' in jsfuckChallenge['init']:
                val = jsfuckChallenge['init'].split('/')
                jschl_answer = jsfuckToNumber(val[0]) / float(jsfuckToNumber(val[1]))
            else:
                jschl_answer = jsfuckToNumber(jsfuckChallenge['init'])

            # ------------------------------------------------------------------------------- #

            # Apply every step to the running answer, in order.
            for expressionMatch in jsfuckChallenge['challenge']:
                oper, expression = expressionMatch.groups()

                if '/' in expression:
                    expression_value = divisorMath(expression, 'function(p)', domain)
                else:
                    if 'Element' in expression:
                        expression_value = divisorMath(jsfuckChallenge['k'], '"("+p+")")}', domain)
                    else:
                        expression_value = jsfuckToNumber(expression)

                jschl_answer = operators[oper](jschl_answer, expression_value)

            # ------------------------------------------------------------------------------- #

            # if not jsfuckChallenge['k'] and '+ t.length' in body:
            #    jschl_answer += len(domain)

            # ------------------------------------------------------------------------------- #

            return '{0:.10f}'.format(jschl_answer)

        # ------------------------------------------------------------------------------- #

        return challengeSolve(body, domain)


# ------------------------------------------------------------------------------- #

# Instantiate once at import time so the base initialiser runs for 'native'.
ChallengeInterpreter()

@ -1,22 +1,23 @@
import base64
import logging
import subprocess
import sys
from . import JavaScriptInterpreter
from .encapsulated import template
##########################################################################################################################################################
BUG_REPORT = 'Cloudflare may have changed their technique, or there may be a bug in the script.'
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
# ------------------------------------------------------------------------------- #
def __init__(self):
super(ChallengeInterpreter, self).__init__('nodejs')
def eval(self, jsEnv, js):
# ------------------------------------------------------------------------------- #
def eval(self, body, domain):
try:
js = 'var atob = function(str) {return Buffer.from(str, "base64").toString("binary");};' \
'var challenge = atob("%s");' \
@ -24,23 +25,25 @@ class ChallengeInterpreter(JavaScriptInterpreter):
'var options = {filename: "iuam-challenge.js", timeout: 4000};' \
'var answer = require("vm").runInNewContext(challenge, context, options);' \
'process.stdout.write(String(answer));' \
% base64.b64encode('{}{}'.format(jsEnv, js).encode('UTF-8')).decode('ascii')
% base64.b64encode(template(body, domain).encode('UTF-8')).decode('ascii')
return subprocess.check_output(['node', '-e', js])
except OSError as e:
if e.errno == 2:
raise EnvironmentError(
'Missing Node.js runtime. Node is required and must be in the PATH (check with `node -v`). Your Node binary may be called `nodejs` rather than `node`, '
'in which case you may need to run `apt-get install nodejs-legacy` on some Debian-based systems. (Please read the cloudscraper'
' README\'s Dependencies section: https://github.com/VeNoMouS/cloudscraper#dependencies.'
'Missing Node.js runtime. Node is required and must be in the PATH (check with `node -v`).\n\n'
'Your Node binary may be called `nodejs` rather than `node`, '
'in which case you may need to run `apt-get install nodejs-legacy` on some Debian-based systems.\n\n'
'(Please read the cloudscraper README\'s Dependencies section: '
'https://github.com/VeNoMouS/cloudscraper#dependencies.)'
)
raise
except Exception:
logging.error('Error executing Cloudflare IUAM Javascript. %s' % BUG_REPORT)
raise
sys.tracebacklimit = 0
raise RuntimeError('Error executing Cloudflare IUAM Javascript in nodejs')
pass
# ------------------------------------------------------------------------------- #
ChallengeInterpreter()

@ -0,0 +1,33 @@
from __future__ import absolute_import
import sys
try:
import v8eval
except ImportError:
sys.tracebacklimit = 0
raise RuntimeError('Please install the python module v8eval either via pip or download it from https://github.com/sony/v8eval')
from . import JavaScriptInterpreter
from .encapsulated import template
# ------------------------------------------------------------------------------- #
class ChallengeInterpreter(JavaScriptInterpreter):
    """
    Challenge interpreter backend that evaluates the generated JavaScript
    payload with the ``v8eval`` module.
    """

    def __init__(self):
        # Pass this backend's name, 'v8', to the base initialiser.
        super(ChallengeInterpreter, self).__init__('v8')

    # ------------------------------------------------------------------------------- #

    def eval(self, body, domain):
        """
        Evaluate the payload built by ``template(body, domain)`` in V8.

        :param body: passed unchanged to ``template``.
        :param domain: passed unchanged to ``template``.
        :return: whatever ``v8eval.V8().eval`` returns for the payload.
        :raises RuntimeError: if the engine raises ``TypeError`` or
            ``v8eval.V8Error``.
        """
        try:
            return v8eval.V8().eval(template(body, domain))
        except (TypeError, v8eval.V8Error):
            # The exception used to be constructed but never raised, which
            # made this method silently return None on engine failure.
            raise RuntimeError('We encountered an error running the V8 Engine.')


# ------------------------------------------------------------------------------- #

# Instantiate once at import time so the base initialiser runs for 'v8'.
ChallengeInterpreter()

@ -1,38 +1,124 @@
import os
import json
import os
import random
import logging
import re
import sys
import ssl
from collections import OrderedDict
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
class User_Agent():
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def __init__(self, *args, **kwargs):
self.headers = None
self.cipherSuite = []
self.loadUserAgent(*args, **kwargs)
##########################################################################################################################################################
# ------------------------------------------------------------------------------- #
def filterAgents(self, user_agents):
    """
    Collect the entries that match the selected platform.

    For each device type that is enabled on this instance ('mobile' first,
    then 'desktop'), the non-empty mapping stored under ``self.platform`` is
    merged into the result; on key clashes the 'desktop' entry wins.

    :param user_agents: mapping with 'mobile' and 'desktop' sub-mappings
        keyed by platform.
    :return: a new dict holding the merged entries.
    """
    matches = {}
    for device_type, enabled in (('mobile', self.mobile), ('desktop', self.desktop)):
        if not enabled:
            continue
        per_platform = user_agents[device_type]
        if self.platform in per_platform and per_platform[self.platform]:
            matches.update(per_platform[self.platform])
    return matches
# ------------------------------------------------------------------------------- #
def tryMatchCustom(self, user_agents):
    """
    Look for the custom User-Agent text among the known agent strings.

    On the first browser whose space-joined agent strings contain
    ``self.custom``, that browser's headers (with 'User-Agent' set to the
    custom value) and cipher suite are stored on the instance.

    :param user_agents: mapping with 'user_agents', 'headers' and
        'cipherSuite' sections.
    :return: True when a browser matched, otherwise False.
    """
    for platforms in user_agents['user_agents'].values():
        for browsers in platforms.values():
            for browser, agent_strings in browsers.items():
                haystack = ' '.join(agent_strings)
                if not re.search(re.escape(self.custom), haystack):
                    continue
                self.headers = user_agents['headers'][browser]
                self.headers['User-Agent'] = self.custom
                self.cipherSuite = user_agents['cipherSuite'][browser]
                return True
    return False
# ------------------------------------------------------------------------------- #
def loadUserAgent(self, *args, **kwargs):
browser = kwargs.pop('browser', 'chrome')
self.browser = kwargs.pop('browser', None)
self.platforms = ['linux', 'windows', 'darwin', 'android', 'ios']
self.browsers = ['chrome', 'firefox']
if isinstance(self.browser, dict):
self.custom = self.browser.get('custom', None)
self.platform = self.browser.get('platform', None)
self.desktop = self.browser.get('desktop', True)
self.mobile = self.browser.get('mobile', True)
self.browser = self.browser.get('browser', None)
else:
self.custom = kwargs.pop('custom', None)
self.platform = kwargs.pop('platform', None)
self.desktop = kwargs.pop('desktop', True)
self.mobile = kwargs.pop('mobile', True)
if not self.desktop and not self.mobile:
sys.tracebacklimit = 0
raise RuntimeError("Sorry you can't have mobile and desktop disabled at the same time.")
with open(os.path.join(os.path.dirname(__file__), 'browsers.json'), 'r') as fp:
user_agents = json.load(
fp,
object_pairs_hook=OrderedDict
)
if self.custom:
if not self.tryMatchCustom(user_agents):
self.cipherSuite = [
ssl._DEFAULT_CIPHERS,
'!AES128-SHA',
'!ECDHE-RSA-AES256-SHA',
]
self.headers = OrderedDict([
('User-Agent', self.custom),
('Accept', 'text/html,application/xhtml+xml,application/xml;q=0.9,image/webp,image/apng,*/*;q=0.8'),
('Accept-Language', 'en-US,en;q=0.9'),
('Accept-Encoding', 'gzip, deflate, br')
])
else:
if self.browser and self.browser not in self.browsers:
sys.tracebacklimit = 0
raise RuntimeError(f'Sorry "{self.browser}" browser is not valid, valid browsers are [{", ".join(self.browsers)}].')
if not self.platform:
self.platform = random.SystemRandom().choice(self.platforms)
if self.platform not in self.platforms:
sys.tracebacklimit = 0
raise RuntimeError(f'Sorry the platform "{self.platform}" is not valid, valid platforms are [{", ".join(self.platforms)}]')
filteredAgents = self.filterAgents(user_agents['user_agents'])
with open(os.path.join(os.path.dirname(__file__), 'browsers.json'), 'r') as file:
user_agents = json.load(file, object_pairs_hook=OrderedDict)
if not self.browser:
# has to be at least one in there...
while not filteredAgents.get(self.browser):
self.browser = random.SystemRandom().choice(list(filteredAgents.keys()))
if not user_agents.get(browser):
logging.error('Sorry "{}" browser User-Agent was not found.'.format(browser))
raise
if not filteredAgents[self.browser]:
sys.tracebacklimit = 0
raise RuntimeError(f'Sorry "{self.browser}" browser was not found with a platform of "{self.platform}".')
user_agent = random.choice(user_agents.get(browser))
self.cipherSuite = user_agents['cipherSuite'][self.browser]
self.headers = user_agents['headers'][self.browser]
self.headers = user_agent.get('headers')
self.headers['User-Agent'] = random.choice(user_agent.get('User-Agent'))
self.headers['User-Agent'] = random.SystemRandom().choice(filteredAgents[self.browser])
if not kwargs.get('allow_brotli', False):
if 'br' in self.headers['Accept-Encoding']:
self.headers['Accept-Encoding'] = ','.join([encoding for encoding in self.headers['Accept-Encoding'].split(',') if encoding.strip() != 'br']).strip()
if not kwargs.get('allow_brotli', False) and 'br' in self.headers['Accept-Encoding']:
self.headers['Accept-Encoding'] = ','.join([
encoding for encoding in self.headers['Accept-Encoding'].split(',') if encoding.strip() != 'br'
]).strip()

File diff suppressed because it is too large Load Diff

File diff suppressed because it is too large Load Diff

@ -92,7 +92,7 @@ class CFSession(CloudScraper):
# Check if Cloudflare anti-bot is on
try:
if self.isChallengeRequest(resp):
if self.is_Challenge_Request(resp):
if resp.request.method != 'GET':
# Work around if the initial request is not a GET,
# Supersede with a GET then re-request the original METHOD.

@ -7,17 +7,20 @@ import os
import rarfile
import re
import zipfile
import cloudscraper
from subzero.language import Language
from guessit import guessit
from requests import Session
from subliminal.providers import ParserBeautifulSoup, Provider
from subliminal.cache import SHOW_EXPIRATION_TIME, region
from dogpile.cache.api import NO_VALUE
from subliminal.score import get_equivalent_release_groups
from subliminal.subtitle import SUBTITLE_EXTENSIONS, Subtitle, fix_line_ending
from subliminal.utils import sanitize, sanitize_release_group
from subliminal.video import Episode
from subliminal_patch.http import RetryingCFSession
from subliminal_patch.pitcher import pitchers, load_verification, store_verification
from subliminal_patch.subtitle import guess_matches
logger = logging.getLogger(__name__)
@ -81,9 +84,10 @@ class Subs4SeriesProvider(Provider):
def __init__(self):
self.session = None
self.captcha_session = None
def initialize(self):
self.session = cloudscraper.create_scraper(debug=False)
self.session = RetryingCFSession()
self.session.headers['User-Agent'] = 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, ' \
'like Gecko) Chrome/83.0.4103.116 Safari/537.36'
# We don't use FIRST_THOUSAND_OR_SO_USER_AGENTS list because it includes mobile browser that get redirected to
@ -204,12 +208,41 @@ class Subs4SeriesProvider(Provider):
if isinstance(subtitle, Subs4SeriesSubtitle):
# download the subtitle
logger.info('Downloading subtitle %r', subtitle)
r = self.session.get(subtitle.download_link, headers={'Referer': subtitle.page_link}, timeout=10)
r.raise_for_status()
if not r.content:
logger.debug('Unable to download subtitle. No data returned from provider')
return
data = {"my_recaptcha_challenge_field": "manual_challenge"}
tries = 0
while tries <= 3:
tries += 1
r = self.session.get(subtitle.download_link, headers={'Referer': subtitle.page_link}, timeout=10)
if "g-recaptcha" in r.text or "grecaptcha" in r.text:
logger.info('Subs4series: Solving captcha. This might take a couple of minutes, but should only '
'happen once every so often')
for g, s in (("g-recaptcha-response", r'g-recaptcha.+?data-sitekey=\"(.+?)\"'),
("recaptcha_response", r'grecaptcha.execute\(\'(.+?)\',')):
site_key = re.search(s, r.text).group(1)
if site_key:
break
if not site_key:
logger.error("Subs4series: Captcha site-key not found!")
return
pitcher = pitchers.get_pitcher()("Subs4series", subtitle.download_link, site_key,
user_agent=self.session.headers["User-Agent"],
cookies=self.session.cookies.get_dict(),
headers={'Referer': subtitle.page_link},
is_invisible=True)
result = pitcher.throw()
if not result:
if tries >= 3:
raise Exception("Subs4series: Couldn't solve captcha!")
logger.info("Subs4series: Couldn't solve captcha! Retrying")
continue
else:
data['g-recaptcha-response'] = result
logger.info("Subs4series: Captcha solved. Trying to download subtitles...")
break
soup = ParserBeautifulSoup(r.content, ['lxml', 'html.parser'])
download_element = soup.select_one('a.style55ws')
@ -226,8 +259,10 @@ class Subs4SeriesProvider(Provider):
self.apply_anti_block(subtitle)
download_url = self.server_url + target
r = self.session.get(download_url, headers={'Referer': subtitle.download_link}, timeout=10)
r.raise_for_status()
r = self.session.post(download_url, data, headers={'Referer': subtitle.download_link},
allow_redirects=True, timeout=10)
if r.status_code == 403:
raise Exception("Subs4series: captcha expired waiting to be solved.")
if not r.content:
logger.debug('Unable to download subtitle. No data returned from provider')

@ -7,6 +7,7 @@ bidict=0.18.4
bottle-fdsend=0.1.1
bottle=0.12.13
chardet=3.0.4
cloudscraper=1.2.58
dogpile.cache=0.6.5
engineio=4.0.2dev
enzyme=0.4.1
@ -24,6 +25,7 @@ peewee=3.14.4
py-pretty=1
pycountry=18.2.23
pyga=2.6.1
pyparsing=2.4.7
pysrt=1.1.1
pytz=2021.1
rarfile=3.0

Loading…
Cancel
Save