feat: expose response times

- Add hook to FuturesSession to calculate absolute response time (ms)
- In verbose mode, display response times for each request
- Always outputs response times to CSV
pull/109/head
Tejasvi Nareddy 6 years ago
parent 7e5bac0cb8
commit 4c3967fd6d

@ -13,11 +13,12 @@ import os
import sys
import platform
import re
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from concurrent.futures import ThreadPoolExecutor
from time import time
import requests
from colorama import Back, Fore, Style, init
from argparse import ArgumentParser, RawDescriptionHelpFormatter
from concurrent.futures import ThreadPoolExecutor
from colorama import Fore, Style, init
from requests_futures.sessions import FuturesSession
from torrequest import TorRequest
@ -28,28 +29,75 @@ amount=0
# TODO: fix tumblr
# Extends FutureSession to add response time metric
# This is taken (almost) directly from here: https://github.com/ross/requests-futures#working-in-the-background
class ElapsedFuturesSession(FuturesSession):
def request(self, method, url, hooks={}, *args, **kwargs):
start = time()
def timing(r, *args, **kwargs):
elapsed_sec = time() - start
r.elapsed = round(elapsed_sec * 1000)
try:
if isinstance(hooks['response'], (list, tuple)):
# needs to be first so we don't time other hooks execution
hooks['response'].insert(0, timing)
else:
hooks['response'] = [timing, hooks['response']]
except KeyError:
hooks['response'] = timing
return super(ElapsedFuturesSession, self).request(method, url, hooks=hooks, *args, **kwargs)
def open_file(fname):
return open(fname, "a")
def write_to_file(url, f):
f.write(url + "\n")
def final_score(amount, f):
f.write("Total: "+str(amount) + "\n")
def print_error(err, errstr, var, debug=False):
def print_error(err, errstr, var, verbose=False):
print(Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
Fore.RED + f" {errstr}" +
Fore.YELLOW + f" {err if debug else var}")
Fore.YELLOW + f" {err if verbose else var}")
def create_response_time(response_time, verbose):
return " [{} ms]".format(response_time) if verbose else ""
def print_found(social_network, url, response_time, verbose=False):
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.GREEN + "+" +
Fore.WHITE + "]" +
create_response_time(response_time, verbose) +
Fore.GREEN + " {}:").format(social_network), url)
def print_not_found(social_network, response_time, verbose=False):
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
create_response_time(response_time, verbose) +
Fore.GREEN + " {}:" +
Fore.YELLOW + " Not Found!").format(social_network))
def get_response(request_future, error_type, social_network, verbose=False):
try:
rsp = request_future.result()
if rsp.status_code:
return rsp, error_type
return rsp, error_type, rsp.elapsed
except requests.exceptions.HTTPError as errh:
print_error(errh, "HTTP Error:", social_network, verbose)
except requests.exceptions.ConnectionError as errc:
@ -58,7 +106,7 @@ def get_response(request_future, error_type, social_network, verbose=False):
print_error(errt, "Timeout Error:", social_network, verbose)
except requests.exceptions.RequestException as err:
print_error(err, "Unknown error:", social_network, verbose)
return None, ""
return None, "", -1
def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False):
@ -119,8 +167,8 @@ def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False):
underlying_request = TorRequest()
underlying_session = underlying_request.session()
# Create multi-threaded session for all requests
session = FuturesSession(executor=executor, session=underlying_session)
# Create multi-threaded session for all requests. Use our custom FuturesSession that exposes response time
session = ElapsedFuturesSession(executor=executor, session=underlying_session)
# Results from analysis of all sites
results_total = {}
@ -193,7 +241,7 @@ def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False):
# Retrieve future and ensure it has finished
future = net_info["request_future"]
r, error_type = get_response(request_future=future,
r, error_type, response_time = get_response(request_future=future,
error_type=error_type,
social_network=social_network,
verbose=verbose)
@ -212,59 +260,35 @@ def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False):
error = net_info.get("errorMsg")
# Checks if the error message is in the HTML
if not error in r.text:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.GREEN + "+" +
Fore.WHITE + "]" +
Fore.GREEN + " {}:").format(social_network), url)
print_found(social_network, url, response_time, verbose)
write_to_file(url, f)
exists = "yes"
amount=amount+1
else:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
Fore.GREEN + " {}:" +
Fore.YELLOW + " Not Found!").format(social_network))
print_not_found(social_network, response_time, verbose)
exists = "no"
elif error_type == "status_code":
# Checks if the status code of the response is 2XX
if not r.status_code >= 300 or r.status_code < 200:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.GREEN + "+" +
Fore.WHITE + "]" +
Fore.GREEN + " {}:").format(social_network), url)
print_found(social_network, url, response_time, verbose)
write_to_file(url, f)
exists = "yes"
amount=amount+1
else:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
Fore.GREEN + " {}:" +
Fore.YELLOW + " Not Found!").format(social_network))
print_not_found(social_network, response_time, verbose)
exists = "no"
elif error_type == "response_url":
error = net_info.get("errorUrl")
# Checks if the redirect url is the same as the one defined in data.json
if not error in r.url:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.GREEN + "+" +
Fore.WHITE + "]" +
Fore.GREEN + " {}:").format(social_network), url)
print_found(social_network, url, response_time, verbose)
write_to_file(url, f)
exists = "yes"
amount=amount+1
else:
print((Style.BRIGHT + Fore.WHITE + "[" +
Fore.RED + "-" +
Fore.WHITE + "]" +
Fore.GREEN + " {}:" +
Fore.YELLOW + " Not Found!").format(social_network))
print_not_found(social_network, response_time, verbose)
exists = "no"
elif error_type == "":
@ -281,6 +305,7 @@ def sherlock(username, site_data, verbose=False, tor=False, unique_tor=False):
# Save results from request
results_site['http_status'] = http_status
results_site['response_text'] = response_text
results_site['response_time_ms'] = response_time
# Add this site's results into final dictionary with all of the other results.
results_total[social_network] = results_site
@ -393,7 +418,8 @@ def main():
'url_main',
'url_user',
'exists',
'http_status'
'http_status',
'response_time_ms'
]
)
for site in results:
@ -402,9 +428,11 @@ def main():
results[site]['url_main'],
results[site]['url_user'],
results[site]['exists'],
results[site]['http_status']
results[site]['http_status'],
results[site]['response_time_ms']
]
)
if __name__ == "__main__":
main()
Loading…
Cancel
Save