black formatted, fixed interpolated_string, and some function naming

pull/1957/head
sondreal 11 months ago
parent 7ec56895a3
commit b0ec99e959

@ -26,6 +26,7 @@ from result import QueryResult
from notify import QueryNotifyPrint
from sites import SitesInformation
from colorama import init
from argparse import ArgumentTypeError
module_name = "Sherlock: Find Usernames Across Social Networks"
__version__ = "0.14.3"
@ -91,10 +92,9 @@ class SherlockFuturesSession(FuturesSession):
# No response hook was already defined, so install it ourselves.
hooks["response"] = [response_time]
return super(SherlockFuturesSession, self).request(method,
url,
hooks=hooks,
*args, **kwargs)
return super(SherlockFuturesSession, self).request(
method, url, hooks=hooks, *args, **kwargs
)
def get_response(request_future, error_type, social_network):
@ -127,42 +127,43 @@ def get_response(request_future, error_type, social_network):
return response, error_context, exception_text
def interpolate_string(object, username):
"""Insert a string into the string properties of an object recursively."""
def interpolate_string(input_object, username):
if isinstance(input_object, str):
return input_object.replace("{}", username)
elif isinstance(input_object, dict):
return {k: interpolate_string(v, username) for k, v in input_object.items()}
elif isinstance(input_object, list):
return [interpolate_string(i, username) for i in input_object]
return input_object
if isinstance(object, str):
return object.replace("{}", username)
elif isinstance(object, dict):
for key, value in object.items():
object[key] = interpolate_string(value, username)
elif isinstance(object, list):
for i in object:
object[i] = interpolate_string(object[i], username)
return object
def CheckForParameter(username):
'''checks if {?} exists in the username
if exist it means that sherlock is looking for more multiple username'''
return ("{?}" in username)
def check_for_parameter(username):
"""checks if {?} exists in the username
if exist it means that sherlock is looking for more multiple username"""
return "{?}" in username
checksymbols = []
checksymbols = ["_", "-", "."]
def MultipleUsernames(username):
'''replace the parameter with with symbols and return a list of usernames'''
def multiple_usernames(username):
"""replace the parameter with with symbols and return a list of usernames"""
allUsernames = []
for i in checksymbols:
allUsernames.append(username.replace("{?}", i))
return allUsernames
def sherlock(username, site_data, query_notify,
tor=False, unique_tor=False,
proxy=None, timeout=60):
def sherlock(
username,
site_data,
query_notify,
tor=False,
unique_tor=False,
proxy=None,
timeout=60,
):
"""Run Sherlock Analysis.
Checks for existence of username on various social media sites.
@ -214,15 +215,15 @@ def sherlock(username, site_data, query_notify,
max_workers = len(site_data)
# Create multi-threaded session for all requests.
session = SherlockFuturesSession(max_workers=max_workers,
session=underlying_session)
session = SherlockFuturesSession(
max_workers=max_workers, session=underlying_session
)
# Results from analysis of all sites
results_total = {}
# First create futures for all requests. This allows for the requests to run in parallel
for social_network, net_info in site_data.items():
# Results from analysis of this specific site
results_site = {"url_main": net_info.get("urlMain")}
@ -245,10 +246,9 @@ def sherlock(username, site_data, query_notify,
regex_check = net_info.get("regexCheck")
if regex_check and re.search(regex_check, username) is None:
# No need to do the check at the site: this username is not allowed.
results_site["status"] = QueryResult(username,
social_network,
url,
QueryStatus.ILLEGAL)
results_site["status"] = QueryResult(
username, social_network, url, QueryStatus.ILLEGAL
)
results_site["url_user"] = ""
results_site["http_status"] = ""
results_site["response_text"] = ""
@ -309,18 +309,22 @@ def sherlock(username, site_data, query_notify,
# This future starts running the request in a new thread, doesn't block the main thread
if proxy is not None:
proxies = {"http": proxy, "https": proxy}
future = request(url=url_probe, headers=headers,
proxies=proxies,
allow_redirects=allow_redirects,
timeout=timeout,
json=request_payload
)
future = request(
url=url_probe,
headers=headers,
proxies=proxies,
allow_redirects=allow_redirects,
timeout=timeout,
json=request_payload,
)
else:
future = request(url=url_probe, headers=headers,
allow_redirects=allow_redirects,
timeout=timeout,
json=request_payload
)
future = request(
url=url_probe,
headers=headers,
allow_redirects=allow_redirects,
timeout=timeout,
json=request_payload,
)
# Store future in data for access later
net_info["request_future"] = future
@ -335,7 +339,6 @@ def sherlock(username, site_data, query_notify,
# Open the file containing account links
# Core logic: If tor requests, make them here. If multi-threaded requests, wait for responses
for social_network, net_info in site_data.items():
# Retrieve results again
results_site = results_total.get(social_network)
@ -352,9 +355,9 @@ def sherlock(username, site_data, query_notify,
# Retrieve future and ensure it has finished
future = net_info["request_future"]
r, error_text, exception_text = get_response(request_future=future,
error_type=error_type,
social_network=social_network)
r, error_text, exception_text = get_response(
request_future=future, error_type=error_type, social_network=social_network
)
# Get response time for response of our request.
try:
@ -424,16 +427,19 @@ def sherlock(username, site_data, query_notify,
query_status = QueryStatus.AVAILABLE
else:
# It should be impossible to ever get here...
raise ValueError(f"Unknown Error Type '{error_type}' for "
f"site '{social_network}'")
raise ValueError(
f"Unknown Error Type '{error_type}' for " f"site '{social_network}'"
)
# Notify caller about results of query.
result = QueryResult(username=username,
site_name=social_network,
site_url_user=url,
status=query_status,
query_time=response_time,
context=error_context)
result = QueryResult(
username=username,
site_name=social_network,
site_url_user=url,
status=query_status,
query_time=response_time,
context=error_context,
)
query_notify.update(result)
# Save status of request
@ -463,16 +469,13 @@ def timeout_check(value):
NOTE: Will raise an exception if the timeout in invalid.
"""
from argparse import ArgumentTypeError
try:
timeout = float(value)
except:
raise ArgumentTypeError(f"Timeout '{value}' must be a number.")
if timeout <= 0:
if value <= 0:
raise ArgumentTypeError(
f"Timeout '{value}' must be greater than 0.0s.")
return timeout
f"Invalid timeout value: {value}. Timeout must be a positive number."
)
return float(value)
def handler(signal_received, frame):
@ -484,86 +487,159 @@ def handler(signal_received, frame):
def main():
version_string = f"%(prog)s {__version__}\n" + \
f"{requests.__description__}: {requests.__version__}\n" + \
f"Python: {platform.python_version()}"
parser = ArgumentParser(formatter_class=RawDescriptionHelpFormatter,
description=f"{module_name} (Version {__version__})"
)
parser.add_argument("--version",
action="version", version=version_string,
help="Display version information and dependencies."
)
parser.add_argument("--verbose", "-v", "-d", "--debug",
action="store_true", dest="verbose", default=False,
help="Display extra debugging information and metrics."
)
parser.add_argument("--folderoutput", "-fo", dest="folderoutput",
help="If using multiple usernames, the output of the results will be saved to this folder."
)
parser.add_argument("--output", "-o", dest="output",
help="If using single username, the output of the result will be saved to this file."
)
parser.add_argument("--tor", "-t",
action="store_true", dest="tor", default=False,
help="Make requests over Tor; increases runtime; requires Tor to be installed and in system path.")
parser.add_argument("--unique-tor", "-u",
action="store_true", dest="unique_tor", default=False,
help="Make requests over Tor with new Tor circuit after each request; increases runtime; requires Tor to be installed and in system path.")
parser.add_argument("--csv",
action="store_true", dest="csv", default=False,
help="Create Comma-Separated Values (CSV) File."
)
parser.add_argument("--xlsx",
action="store_true", dest="xlsx", default=False,
help="Create the standard file for the modern Microsoft Excel spreadsheet (xslx)."
)
parser.add_argument("--site",
action="append", metavar="SITE_NAME",
dest="site_list", default=None,
help="Limit analysis to just the listed sites. Add multiple options to specify more than one site."
)
parser.add_argument("--proxy", "-p", metavar="PROXY_URL",
action="store", dest="proxy", default=None,
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080"
)
parser.add_argument("--json", "-j", metavar="JSON_FILE",
dest="json_file", default=None,
help="Load data from a JSON file or an online, valid, JSON file.")
parser.add_argument("--timeout",
action="store", metavar="TIMEOUT",
dest="timeout", type=timeout_check, default=60,
help="Time (in seconds) to wait for response to requests (Default: 60)"
)
parser.add_argument("--print-all",
action="store_true", dest="print_all", default=False,
help="Output sites where the username was not found."
)
parser.add_argument("--print-found",
action="store_true", dest="print_found", default=True,
help="Output sites where the username was found (also if exported as file)."
)
parser.add_argument("--no-color",
action="store_true", dest="no_color", default=False,
help="Don't color terminal output"
)
parser.add_argument("username",
nargs="+", metavar="USERNAMES",
action="store",
help="One or more usernames to check with social networks. Check similar usernames using {%%} (replace to '_', '-', '.')."
)
parser.add_argument("--browse", "-b",
action="store_true", dest="browse", default=False,
help="Browse to all results on default browser.")
parser.add_argument("--local", "-l",
action="store_true", default=False,
help="Force the use of the local data.json file.")
parser.add_argument("--nsfw",
action="store_true", default=False,
help="Include checking of NSFW sites from default list.")
version_string = (
f"%(prog)s {__version__}\n"
+ f"{requests.__description__}: {requests.__version__}\n"
+ f"Python: {platform.python_version()}"
)
parser = ArgumentParser(
formatter_class=RawDescriptionHelpFormatter,
description=f"{module_name} (Version {__version__})",
)
parser.add_argument(
"--version",
action="version",
version=version_string,
help="Display version information and dependencies.",
)
parser.add_argument(
"--verbose",
"-v",
"-d",
"--debug",
action="store_true",
dest="verbose",
default=False,
help="Display extra debugging information and metrics.",
)
parser.add_argument(
"--folderoutput",
"-fo",
dest="folderoutput",
help="If using multiple usernames, the output of the results will be saved to this folder.",
)
parser.add_argument(
"--output",
"-o",
dest="output",
help="If using single username, the output of the result will be saved to this file.",
)
parser.add_argument(
"--tor",
"-t",
action="store_true",
dest="tor",
default=False,
help="Make requests over Tor; increases runtime; requires Tor to be installed and in system path.",
)
parser.add_argument(
"--unique-tor",
"-u",
action="store_true",
dest="unique_tor",
default=False,
help="Make requests over Tor with new Tor circuit after each request; increases runtime; requires Tor to be installed and in system path.",
)
parser.add_argument(
"--csv",
action="store_true",
dest="csv",
default=False,
help="Create Comma-Separated Values (CSV) File.",
)
parser.add_argument(
"--xlsx",
action="store_true",
dest="xlsx",
default=False,
help="Create the standard file for the modern Microsoft Excel spreadsheet (xslx).",
)
parser.add_argument(
"--site",
action="append",
metavar="SITE_NAME",
dest="site_list",
default=None,
help="Limit analysis to just the listed sites. Add multiple options to specify more than one site.",
)
parser.add_argument(
"--proxy",
"-p",
metavar="PROXY_URL",
action="store",
dest="proxy",
default=None,
help="Make requests over a proxy. e.g. socks5://127.0.0.1:1080",
)
parser.add_argument(
"--json",
"-j",
metavar="JSON_FILE",
dest="json_file",
default=None,
help="Load data from a JSON file or an online, valid, JSON file.",
)
parser.add_argument(
"--timeout",
action="store",
metavar="TIMEOUT",
dest="timeout",
type=timeout_check,
default=60,
help="Time (in seconds) to wait for response to requests (Default: 60)",
)
parser.add_argument(
"--print-all",
action="store_true",
dest="print_all",
default=False,
help="Output sites where the username was not found.",
)
parser.add_argument(
"--print-found",
action="store_true",
dest="print_found",
default=True,
help="Output sites where the username was found (also if exported as file).",
)
parser.add_argument(
"--no-color",
action="store_true",
dest="no_color",
default=False,
help="Don't color terminal output",
)
parser.add_argument(
"username",
nargs="+",
metavar="USERNAMES",
action="store",
help="One or more usernames to check with social networks. Check similar usernames using {%%} (replace to '_', '-', '.').",
)
parser.add_argument(
"--browse",
"-b",
action="store_true",
dest="browse",
default=False,
help="Browse to all results on default browser.",
)
parser.add_argument(
"--local",
"-l",
action="store_true",
default=False,
help="Force the use of the local data.json file.",
)
parser.add_argument(
"--nsfw",
action="store_true",
default=False,
help="Include checking of NSFW sites from default list.",
)
args = parser.parse_args()
@ -573,14 +649,17 @@ def main():
# Check for newer version of Sherlock. If it exists, let the user know about it
try:
r = requests.get(
"https://raw.githubusercontent.com/sherlock-project/sherlock/master/sherlock/sherlock.py")
"https://raw.githubusercontent.com/sherlock-project/sherlock/master/sherlock/sherlock.py"
)
remote_version = str(re.findall('__version__ = "(.*)"', r.text)[0])
local_version = __version__
if remote_version != local_version:
print("Update Available!\n" +
f"You are running version {local_version}. Version {remote_version} is available at https://github.com/sherlock-project/sherlock")
print(
"Update Available!\n"
+ f"You are running version {local_version}. Version {remote_version} is available at https://github.com/sherlock-project/sherlock"
)
except Exception as error:
print(f"A problem occurred while checking for an update: {error}")
@ -598,7 +677,8 @@ def main():
print("Using Tor to make requests")
print(
"Warning: some websites might refuse connecting over Tor, so note that using this option might increase connection errors.")
"Warning: some websites might refuse connecting over Tor, so note that using this option might increase connection errors."
)
if args.no_color:
# Disable color output.
@ -620,8 +700,9 @@ def main():
# Create object with all information about sites we are aware of.
try:
if args.local:
sites = SitesInformation(os.path.join(
os.path.dirname(__file__), "resources/data.json"))
sites = SitesInformation(
os.path.join(os.path.dirname(__file__), "resources/data.json")
)
else:
sites = SitesInformation(args.json_file)
except Exception as error:
@ -654,35 +735,34 @@ def main():
site_missing.append(f"'{site}'")
if site_missing:
print(
f"Error: Desired sites not found: {', '.join(site_missing)}.")
print(f"Error: Desired sites not found: {', '.join(site_missing)}.")
if not site_data:
sys.exit(1)
# Create notify object for query results.
query_notify = QueryNotifyPrint(result=None,
verbose=args.verbose,
print_all=args.print_all,
browse=args.browse)
query_notify = QueryNotifyPrint(
result=None, verbose=args.verbose, print_all=args.print_all, browse=args.browse
)
# Run report on all specified users.
all_usernames = []
for username in args.username:
if (CheckForParameter(username)):
for name in MultipleUsernames(username):
if check_for_parameter(username):
for name in multiple_usernames(username):
all_usernames.append(name)
else:
all_usernames.append(username)
for username in all_usernames:
results = sherlock(username,
site_data,
query_notify,
tor=args.tor,
unique_tor=args.unique_tor,
proxy=args.proxy,
timeout=args.timeout)
results = sherlock(
username,
site_data,
query_notify,
tor=args.tor,
unique_tor=args.unique_tor,
proxy=args.proxy,
timeout=args.timeout,
)
if args.output:
result_file = args.output
@ -701,8 +781,7 @@ def main():
if dictionary.get("status").status == QueryStatus.CLAIMED:
exists_counter += 1
file.write(dictionary["url_user"] + "\n")
file.write(
f"Total Websites Username Detected On : {exists_counter}\n")
file.write(f"Total Websites Username Detected On : {exists_counter}\n")
if args.csv:
result_file = f"{username}.csv"
@ -712,33 +791,41 @@ def main():
os.makedirs(args.folderoutput, exist_ok=True)
result_file = os.path.join(args.folderoutput, result_file)
with open(result_file, "w", newline='', encoding="utf-8") as csv_report:
with open(result_file, "w", newline="", encoding="utf-8") as csv_report:
writer = csv.writer(csv_report)
writer.writerow(["username",
"name",
"url_main",
"url_user",
"exists",
"http_status",
"response_time_s"
]
)
writer.writerow(
[
"username",
"name",
"url_main",
"url_user",
"exists",
"http_status",
"response_time_s",
]
)
for site in results:
if args.print_found and not args.print_all and results[site]["status"].status != QueryStatus.CLAIMED:
if (
args.print_found
and not args.print_all
and results[site]["status"].status != QueryStatus.CLAIMED
):
continue
response_time_s = results[site]["status"].query_time
if response_time_s is None:
response_time_s = ""
writer.writerow([username,
site,
results[site]["url_main"],
results[site]["url_user"],
str(results[site]["status"].status),
results[site]["http_status"],
response_time_s
]
)
writer.writerow(
[
username,
site,
results[site]["url_main"],
results[site]["url_user"],
str(results[site]["status"].status),
results[site]["http_status"],
response_time_s,
]
)
if args.xlsx:
usernames = []
names = []
@ -749,7 +836,11 @@ def main():
response_time_s = []
for site in results:
if args.print_found and not args.print_all and results[site]["status"].status != QueryStatus.CLAIMED:
if (
args.print_found
and not args.print_all
and results[site]["status"].status != QueryStatus.CLAIMED
):
continue
if response_time_s is None:
@ -763,8 +854,18 @@ def main():
exists.append(str(results[site]["status"].status))
http_status.append(results[site]["http_status"])
DataFrame = pd.DataFrame({"username": usernames, "name": names, "url_main": url_main, "url_user": url_user, "exists": exists, "http_status": http_status, "response_time_s": response_time_s})
DataFrame.to_excel(f'{username}.xlsx', sheet_name='sheet1', index=False)
DataFrame = pd.DataFrame(
{
"username": usernames,
"name": names,
"url_main": url_main,
"url_user": url_user,
"exists": exists,
"http_status": http_status,
"response_time_s": response_time_s,
}
)
DataFrame.to_excel(f"{username}.xlsx", sheet_name="sheet1", index=False)
print()
query_notify.finish()

@ -1,4 +1,4 @@
import imp
import importlib
import unittest
import sys
sys.path.append('../')
@ -7,9 +7,9 @@ import sherlock as sh
checksymbols = []
checksymbols = ["_", "-", "."]
"""Test for mulriple usernames.
"""Test for multiple usernames.
This test ensures that the function MultipleUsernames works properly. More specific,
This test ensures that the function multiple_usernames works properly. More specific,
different scenarios are tested and only usernames that contain this specific sequence: {?}
should return positive.
@ -23,7 +23,7 @@ class TestMultipleUsernames(unittest.TestCase):
def test_area(self):
test_usernames = ["test{?}test" , "test{?feo" , "test"]
for name in test_usernames:
if(sh.CheckForParameter(name)):
self.assertAlmostEqual(sh.MultipleUsernames(name), ["test_test" , "test-test" , "test.test"])
if(sh.check_for_parameter(name)):
self.assertAlmostEqual(sh.multiple_usernames(name), ["test_test" , "test-test" , "test.test"])
else:
self.assertAlmostEqual(name, name)
Loading…
Cancel
Save