Refractored sites.py, sherlock.py and notify.py.

pull/1246/head
benni347 2 years ago
parent a3e2f7c214
commit f4673cc13c

@ -7,7 +7,7 @@ from result import QueryStatus
from colorama import Fore, Style, init
class QueryNotify():
class QueryNotify:
"""Query Notify Object.
Base class that describes methods available to notify the results of
@ -15,6 +15,7 @@ class QueryNotify():
It is intended that other classes inherit from this base class and
override the methods to implement specific functionality.
"""
def __init__(self, result=None):
"""Create Query Notify Object.
@ -32,7 +33,7 @@ class QueryNotify():
self.result = result
return
# return
def start(self, message=None):
"""Notify Start.
@ -51,7 +52,7 @@ class QueryNotify():
Nothing.
"""
return
# return
def update(self, result):
"""Notify Update.
@ -70,7 +71,7 @@ class QueryNotify():
self.result = result
return
# return
def finish(self, message=None):
"""Notify Finish.
@ -89,7 +90,7 @@ class QueryNotify():
Nothing.
"""
return
# return
def __str__(self):
"""Convert Object To String.
@ -100,9 +101,7 @@ class QueryNotify():
Return Value:
Nicely formatted string to get information about this object.
"""
result = str(self.result)
return result
return str(self.result)
class QueryNotifyPrint(QueryNotify):
@ -110,6 +109,7 @@ class QueryNotifyPrint(QueryNotify):
Query notify class that prints results.
"""
def __init__(self, result=None, verbose=False, color=True, print_all=False):
"""Create Query Notify Print Object.
@ -155,14 +155,14 @@ class QueryNotifyPrint(QueryNotify):
title = "Checking username"
if self.color:
print(Style.BRIGHT + Fore.GREEN + "[" +
Fore.YELLOW + "*" +
Fore.GREEN + f"] {title}" +
Fore.WHITE + f" {message}" +
Fore.GREEN + " on:")
Fore.YELLOW + "*" +
Fore.GREEN + f"] {title}" +
Fore.WHITE + f" {message}" +
Fore.GREEN + " on:")
else:
print(f"[*] {title} {message} on:")
return
# return
def update(self, result):
"""Notify Update.
@ -179,7 +179,7 @@ class QueryNotifyPrint(QueryNotify):
"""
self.result = result
if self.verbose == False or self.result.query_time is None:
if self.verbose is False or self.result.query_time is None:
response_time_text = ""
else:
response_time_text = f" [{round(self.result.query_time * 1000)} ms]"
@ -202,23 +202,23 @@ class QueryNotifyPrint(QueryNotify):
if self.print_all:
if self.color:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
response_time_text +
Fore.GREEN + f" {self.result.site_name}:" +
Fore.YELLOW + " Not Found!"))
Fore.RED + "-" +
Fore.WHITE + "]" +
response_time_text +
Fore.GREEN + f" {self.result.site_name}:" +
Fore.YELLOW + " Not Found!"))
else:
print(f"[-]{response_time_text} {self.result.site_name}: Not Found!")
elif result.status == QueryStatus.UNKNOWN:
if self.print_all:
if self.color:
print(Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
Fore.GREEN + f" {self.result.site_name}:" +
Fore.RED + f" {self.result.context}" +
Fore.YELLOW + f" ")
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
Fore.GREEN + f" {self.result.site_name}:" +
Fore.RED + f" {self.result.context}" +
Fore.YELLOW + ' '))
else:
print(f"[-] {self.result.site_name}: {self.result.context} ")
@ -236,8 +236,9 @@ class QueryNotifyPrint(QueryNotify):
else:
# It should be impossible to ever get here...
raise ValueError(f"Unknown Query Status '{str(result.status)}' for "
f"site '{self.result.site_name}'")
raise ValueError(
f"Unknown Query Status '{result.status}' for site '{self.result.site_name}'"
)
return
@ -250,6 +251,4 @@ class QueryNotifyPrint(QueryNotify):
Return Value:
Nicely formatted string to get information about this object.
"""
result = str(self.result)
return result
return str(self.result)

@ -22,16 +22,14 @@ from torrequest import TorRequest
from result import QueryStatus
from result import QueryResult
from notify import QueryNotifyPrint
from sites import SitesInformation
from sites import SitesInformation
module_name = "Sherlock: Find Usernames Across Social Networks"
__version__ = "0.14.0"
class SherlockFuturesSession(FuturesSession):
def request(self, method, url, hooks={}, *args, **kwargs):
def request(self, method, url, hooks=None, *args, **kwargs):
"""Request URL.
This extends the FuturesSession request method to calculate a response
@ -53,6 +51,8 @@ class SherlockFuturesSession(FuturesSession):
Request object.
"""
# Record the start time for the request.
if hooks is None:
hooks = {}
start = monotonic()
def response_time(resp, *args, **kwargs):
@ -95,12 +95,11 @@ class SherlockFuturesSession(FuturesSession):
def get_response(request_future, error_type, social_network):
# Default for Response object if some failure occurs.
response = None
error_context = "General Unknown Error"
expection_text = None
exception_text = None
try:
response = request_future.result()
if response.status_code:
@ -108,21 +107,21 @@ def get_response(request_future, error_type, social_network):
error_context = None
except requests.exceptions.HTTPError as errh:
error_context = "HTTP Error"
expection_text = str(errh)
exception_text = str(errh)
except requests.exceptions.ProxyError as errp:
error_context = "Proxy Error"
expection_text = str(errp)
exception_text = str(errp)
except requests.exceptions.ConnectionError as errc:
error_context = "Error Connecting"
expection_text = str(errc)
exception_text = str(errc)
except requests.exceptions.Timeout as errt:
error_context = "Timeout Error"
expection_text = str(errt)
exception_text = str(errt)
except requests.exceptions.RequestException as err:
error_context = "Unknown Error"
expection_text = str(err)
exception_text = str(err)
return response, error_context, expection_text
return response, error_context, exception_text
def interpolate_string(object, username):
@ -190,15 +189,14 @@ def sherlock(username, site_data, query_notify,
# Limit number of workers to 20.
# This is probably vastly overkill.
if len(site_data) >= 20:
max_workers=20
max_workers = 20
else:
max_workers=len(site_data)
max_workers = len(site_data)
# Create multi-threaded session for all requests.
session = SherlockFuturesSession(max_workers=max_workers,
session=underlying_session)
# Results from analysis of all sites
results_total = {}
@ -206,10 +204,9 @@ def sherlock(username, site_data, query_notify,
for social_network, net_info in site_data.items():
# Results from analysis of this specific site
results_site = {}
results_site = {"url_main": net_info.get("urlMain")}
# Record URL of main site
results_site["url_main"] = net_info.get("urlMain")
# A user agent is needed because some sites don't return the correct
# information since they think that we are bots (Which we actually are...)
@ -227,7 +224,7 @@ def sherlock(username, site_data, query_notify,
# Don't make request if username is invalid for the site
regex_check = net_info.get("regexCheck")
if regex_check and re.search(regex_check, username) is None:
# No need to do the check at the site: this user name is not allowed.
# No need to do the check at the site: this username is not allowed.
results_site["status"] = QueryResult(username,
social_network,
url,
@ -254,7 +251,7 @@ def sherlock(username, site_data, query_notify,
elif request_method == "PUT":
request = session.put
else:
raise RuntimeError( f"Unsupported request_method for {url}")
raise RuntimeError(f"Unsupported request_method for {url}")
if request_payload is not None:
request_payload = interpolate_string(request_payload, username)
@ -300,10 +297,10 @@ def sherlock(username, site_data, query_notify,
)
else:
future = request(url=url_probe, headers=headers,
allow_redirects=allow_redirects,
timeout=timeout,
json=request_payload
)
allow_redirects=allow_redirects,
timeout=timeout,
json=request_payload
)
# Store future in data for access later
net_info["request_future"] = future
@ -312,7 +309,7 @@ def sherlock(username, site_data, query_notify,
if unique_tor:
underlying_request.reset_identity()
# Add this site's results into final dictionary with all of the other results.
# Add this site's results into final dictionary with all the other results.
results_total[social_network] = results_site
# Open the file containing account links
@ -334,7 +331,7 @@ def sherlock(username, site_data, query_notify,
# Retrieve future and ensure it has finished
future = net_info["request_future"]
r, error_text, expection_text = get_response(request_future=future,
r, error_text, exception_text = get_response(request_future=future,
error_type=error_type,
social_network=social_network)
@ -365,13 +362,13 @@ def sherlock(username, site_data, query_notify,
# error_flag True denotes no error found in the HTML
# error_flag False denotes error found in the HTML
error_flag = True
errors=net_info.get("errorMsg")
errors = net_info.get("errorMsg")
# errors will hold the error message
# it can be string or list
# by insinstance method we can detect that
# by isinstance method we can detect that
# and handle the case for strings as normal procedure
# and if its list we can iterate the errors
if isinstance(errors,str):
if isinstance(errors, str):
# Checks if the error message is in the HTML
# if error is present we will set flag to False
if errors in r.text:
@ -431,7 +428,6 @@ def sherlock(username, site_data, query_notify,
raise ValueError(f"Unknown Error Type '{error_type}' for "
f"site '{social_network}'")
# Notify caller about results of query.
query_notify.update(result)
@ -477,8 +473,7 @@ def timeout_check(value):
def main():
version_string = f"%(prog)s {__version__}\n" + \
version_string = f"%(prog)s {__version__}\n" + \
f"{requests.__description__}: {requests.__version__}\n" + \
f"Python: {platform.python_version()}"
@ -486,11 +481,11 @@ def main():
description=f"{module_name} (Version {__version__})"
)
parser.add_argument("--version",
action="version", version=version_string,
action="version", version=version_string,
help="Display version information and dependencies."
)
parser.add_argument("--verbose", "-v", "-d", "--debug",
action="store_true", dest="verbose", default=False,
action="store_true", dest="verbose", default=False,
help="Display extra debugging information and metrics."
)
parser.add_argument("--folderoutput", "-fo", dest="folderoutput",
@ -506,7 +501,7 @@ def main():
action="store_true", dest="unique_tor", default=False,
help="Make requests over Tor with new Tor circuit after each request; increases runtime; requires Tor to be installed and in system path.")
parser.add_argument("--csv",
action="store_true", dest="csv", default=False,
action="store_true", dest="csv", default=False,
help="Create Comma-Separated Values (CSV) File."
)
parser.add_argument("--site",
@ -528,15 +523,15 @@ def main():
"Default timeout is infinity. "
"A longer timeout will be more likely to get results from slow sites. "
"On the other hand, this may cause a long delay to gather all results."
)
)
parser.add_argument("--print-all",
action="store_true", dest="print_all",
help="Output sites where the username was not found."
)
)
parser.add_argument("--print-found",
action="store_false", dest="print_all", default=False,
help="Output sites where the username was found."
)
)
parser.add_argument("--no-color",
action="store_true", dest="no_color", default=False,
help="Don't color terminal output"
@ -570,7 +565,6 @@ def main():
except Exception as error:
print(f"A problem occurred while checking for an update: {error}")
# Argument check
# TODO regex check on args.proxy
if args.tor and (args.proxy is not None):
@ -582,7 +576,8 @@ def main():
if args.tor or args.unique_tor:
print("Using Tor to make requests")
print("Warning: some websites might refuse connecting over Tor, so note that using this option might increase connection errors.")
print(
"Warning: some websites might refuse connecting over Tor, so note that using this option might increase connection errors.")
# Check if both output methods are entered as input.
if args.output is not None and args.folderoutput is not None:
@ -594,7 +589,6 @@ def main():
print("You can only use --output with a single username")
sys.exit(1)
# Create object with all information about sites we are aware of.
try:
if args.local:
@ -608,10 +602,7 @@ def main():
# Create original dictionary from SitesInformation() object.
# Eventually, the rest of the code will be updated to use the new object
# directly, but this will glue the two pieces together.
site_data_all = {}
for site in sites:
site_data_all[site.name] = site.information
site_data_all = {site.name: site.information for site in sites}
if args.site_list is None:
# Not desired to look at a sub-set of sites
site_data = site_data_all

@ -1,21 +1,18 @@
"""Sherlock Sites Information Module
This module supports storing information about web sites.
This module supports storing information about websites.
This is the raw data that will be used to search for usernames.
"""
import os
import json
import operator
import requests
import sys
class SiteInformation():
class SiteInformation:
def __init__(self, name, url_home, url_username_format, username_claimed,
username_unclaimed, information):
"""Create Site Information Object.
Contains information about a specific web site.
Contains information about a specific website.
Keyword Arguments:
self -- This object.
@ -30,13 +27,13 @@ class SiteInformation():
indicates that the individual
usernames would show up under the
"https://somesite.com/users/" area of
the web site.
the website.
username_claimed -- String containing username which is known
to be claimed on web site.
to be claimed on website.
username_unclaimed -- String containing username which is known
to be unclaimed on web site.
to be unclaimed on website.
information -- Dictionary containing all known information
about web site.
about website.
NOTE: Custom information about how to
actually detect the existence of the
username will be included in this
@ -49,13 +46,13 @@ class SiteInformation():
Nothing.
"""
self.name = name
self.url_home = url_home
self.name = name
self.url_home = url_home
self.url_username_format = url_username_format
self.username_claimed = username_claimed
self.username_unclaimed = username_unclaimed
self.information = information
self.username_claimed = username_claimed
self.username_unclaimed = username_unclaimed
self.information = information
return
@ -72,11 +69,11 @@ class SiteInformation():
return f"{self.name} ({self.url_home})"
class SitesInformation():
class SitesInformation:
def __init__(self, data_file_path=None):
"""Create Sites Information Object.
Contains information about all supported web sites.
Contains information about all supported websites.
Keyword Arguments:
self -- This object.
@ -109,7 +106,7 @@ class SitesInformation():
if data_file_path is None:
# The default data file is the live data.json which is in the GitHub repo. The reason why we are using
# this instead of the local one is so that the user has the most up to date data. This prevents
# this instead of the local one is so that the user has the most up-to-date data. This prevents
# users from creating issue about false positives which has already been fixed or having outdated data
data_file_path = "https://raw.githubusercontent.com/sherlock-project/sherlock/master/sherlock/resources/data.json"
@ -117,26 +114,29 @@ class SitesInformation():
if not data_file_path.lower().endswith(".json"):
raise FileNotFoundError(f"Incorrect JSON file extension for data file '{data_file_path}'.")
if "http://" == data_file_path[:7].lower() or "https://" == data_file_path[:8].lower():
if (
data_file_path[:7].lower() == "http://"
or data_file_path[:8].lower() == "https://"
):
# Reference is to a URL.
try:
response = requests.get(url=data_file_path)
except Exception as error:
raise FileNotFoundError(f"Problem while attempting to access "
f"data file URL '{data_file_path}': "
f"{str(error)}"
)
if response.status_code == 200:
try:
site_data = response.json()
except Exception as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}': {str(error)}."
)
else:
raise FileNotFoundError(
f"Problem while attempting to access data file URL '{data_file_path}': {error}"
)
if response.status_code != 200:
raise FileNotFoundError(f"Bad response while accessing "
f"data file URL '{data_file_path}'."
)
)
try:
site_data = response.json()
except Exception as error:
raise ValueError(
f"Problem parsing json contents at '{data_file_path}': {error}."
)
else:
# Reference is to a file.
try:
@ -144,17 +144,18 @@ class SitesInformation():
try:
site_data = json.load(file)
except Exception as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}': {str(error)}."
)
except FileNotFoundError as error:
raise ValueError(
f"Problem parsing json contents at '{data_file_path}': {error}."
)
except FileNotFoundError:
raise FileNotFoundError(f"Problem while attempting to access "
f"data file '{data_file_path}'."
)
)
self.sites = {}
# Add all of site information from the json file to internal site list.
# Add all site information from the json file to internal site list.
for site_name in site_data:
try:
@ -165,12 +166,11 @@ class SitesInformation():
site_data[site_name]["username_claimed"],
site_data[site_name]["username_unclaimed"],
site_data[site_name]
)
)
except KeyError as error:
raise ValueError(f"Problem parsing json contents at "
f"'{data_file_path}': "
f"Missing attribute {str(error)}."
)
raise ValueError(
f"Problem parsing json contents at '{data_file_path}': Missing attribute {error}."
)
return
@ -184,9 +184,7 @@ class SitesInformation():
List of strings containing names of sites.
"""
site_names = sorted([site.name for site in self], key=str.lower)
return site_names
return sorted([site.name for site in self], key=str.lower)
def __iter__(self):
"""Iterator For Object.

@ -3,9 +3,10 @@ This module generates the listing of supported sites
which can be found in sites.md
It also organizes all the sites in alphanumeric order
"""
import json
pool = list()
pool = []
with open("sherlock/resources/data.json", "r", encoding="utf-8") as data_file:
data = json.load(data_file)

Loading…
Cancel
Save