# coding=utf-8
from __future__ import absolute_import
import logging
import os
import io
import time
import urllib . parse
from json . decoder import JSONDecodeError
from zipfile import ZipFile
from guessit import guessit
from requests import Session
from subliminal import Episode , Movie
from subliminal . utils import sanitize
from subliminal_patch . providers import Provider
from subliminal_patch . subtitle import Subtitle , guess_matches
from subliminal_patch . providers . mixins import ProviderSubtitleArchiveMixin
from subzero . language import Language
BASE_URL = " https://argenteam.net/ "
API_URL = BASE_URL + " api/v1/ "
logger = logging . getLogger ( __name__ )
class ArgenteamSubtitle ( Subtitle ) :
provider_name = " argenteam "
hearing_impaired_verifiable = False
def __init__ ( self , language , page_link , download_link , release_info , matches ) :
super ( ArgenteamSubtitle , self ) . __init__ ( language , page_link = page_link )
self . page_link = page_link
self . download_link = download_link
self . found_matches = matches
self . _release_info = release_info
# Original subtitle filename guessed from the URL
self . release_info = urllib . parse . unquote ( self . download_link . split ( " / " ) [ - 1 ] )
@property
def id ( self ) :
return self . download_link
def get_matches ( self , video ) :
type_ = " episode " if isinstance ( video , Episode ) else " movie "
self . found_matches | = guess_matches (
video ,
guessit ( self . release_info , { " type " : type_ } ) ,
)
self . found_matches | = guess_matches (
video ,
guessit ( self . _release_info , { " type " : type_ } ) ,
)
return self . found_matches
class ArgenteamProvider ( Provider , ProviderSubtitleArchiveMixin ) :
provider_name = " argenteam "
# Safe to assume every subtitle from Argenteam is Latam Spanish
languages = { Language ( " spa " , " MX " ) }
video_types = ( Episode , Movie )
subtitle_class = ArgenteamSubtitle
hearing_impaired_verifiable = False
language_list = list ( languages )
multi_result_throttle = 2 # seconds
def __init__ ( self ) :
self . session = None
def initialize ( self ) :
self . session = Session ( )
self . session . headers . update (
{ " User-Agent " : os . environ . get ( " SZ_USER_AGENT " , " Sub-Zero/2 " ) }
)
def terminate ( self ) :
self . session . close ( )
def search_ids ( self , title , * * kwargs ) :
query = title
titles = kwargs . get ( " titles " ) or [ ]
is_episode = False
if kwargs . get ( " season " ) and kwargs . get ( " episode " ) :
is_episode = True
query = f " { title } S { kwargs [ ' season ' ] : 02 } E { kwargs [ ' episode ' ] : 02 } "
logger . debug ( f " Searching ID (episode: { is_episode } ) for { query } " )
r = self . session . get ( API_URL + " search " , params = { " q " : query } , timeout = 10 )
r . raise_for_status ( )
try :
results = r . json ( )
except JSONDecodeError :
return [ ]
if not results . get ( " results " ) :
return [ ]
match_ids = [ ]
for result in results [ " results " ] :
if result [ " type " ] == " movie " and is_episode :
continue
imdb = f " tt { result . get ( ' imdb ' , ' n/a ' ) } "
if not is_episode and imdb == kwargs . get ( " imdb_id " ) :
logger . debug ( " Movie matched by IMDB ID, taking shortcut " )
match_ids = [ result [ " id " ] ]
break
# advanced title check in case of multiple movie results
title_year = kwargs . get ( " year " ) and kwargs . get ( " title " )
if results [ " total " ] > 1 and not is_episode and title_year :
sanitized = sanitize ( result [ " title " ] )
titles = [ f " { sanitize ( name ) } { kwargs [ ' year ' ] } " for name in titles ]
if sanitized not in titles :
continue
match_ids . append ( result [ " id " ] )
if match_ids :
ids = " , " . join ( str ( id ) for id in match_ids )
logger . debug ( " Found matching IDs: %s " , ids )
else :
logger . debug ( " Nothing found from %s query " , query )
return match_ids
def get_query_matches ( self , video , * * kwargs ) :
matches = set ( )
if isinstance ( video , Episode ) and kwargs . get ( " movie_kind " ) == " episode " :
if video . series and (
sanitize ( kwargs . get ( " title " ) )
in (
sanitize ( name ) for name in [ video . series ] + video . alternative_series
)
) :
matches . add ( " series " )
if video . season and kwargs . get ( " season " ) == video . season :
matches . add ( " season " )
if video . episode and kwargs . get ( " episode " ) == video . episode :
matches . add ( " episode " )
if video . tvdb_id and kwargs . get ( " tvdb_id " ) == str ( video . tvdb_id ) :
matches . add ( " tvdb_id " )
# year (year is not available for series, but we assume it matches)
matches . add ( " year " )
elif isinstance ( video , Movie ) and kwargs . get ( " movie_kind " ) == " movie " :
if video . title and (
sanitize ( kwargs . get ( " title " ) )
in ( sanitize ( name ) for name in [ video . title ] + video . alternative_titles )
) :
matches . add ( " title " )
if video . imdb_id and f " tt { kwargs . get ( ' imdb_id ' ) } " == str ( video . imdb_id ) :
matches . add ( " imdb_id " )
if video . year and kwargs . get ( " year " ) == video . year :
matches . add ( " year " )
else :
logger . info ( f " { kwargs . get ( ' movie_kind ' ) } is not a valid movie_kind " )
return matches
def combine_release_info ( self , release_dict ) :
keys = ( " source " , " codec " , " tags " , " team " )
combine = [ release_dict . get ( key ) for key in keys if release_dict . get ( key ) ]
if combine :
return " . " . join ( combine )
return " Unknown "
def query ( self , title , video , titles = None ) :
is_episode = isinstance ( video , Episode )
season = episode = None
url = API_URL + " movie "
if is_episode :
season = video . season
episode = video . episode
url = API_URL + " episode "
argenteam_ids = self . search_ids (
title , season = season , episode = episode , titles = titles
)
else :
argenteam_ids = self . search_ids (
title , year = video . year , imdb_id = video . imdb_id , titles = titles
)
if not argenteam_ids :
return [ ]
language = self . language_list [ 0 ]
subtitles = [ ]
has_multiple_ids = len ( argenteam_ids ) > 1
for aid in argenteam_ids :
response = self . session . get ( url , params = { " id " : aid } , timeout = 10 )
response . raise_for_status ( )
try :
content = response . json ( )
except JSONDecodeError :
continue
if not content or not content . get ( " releases " ) :
continue
imdb_id = year = None
returned_title = title
if not is_episode and " info " in content :
imdb_id = content [ " info " ] . get ( " imdb " )
year = content [ " info " ] . get ( " year " )
returned_title = content [ " info " ] . get ( " title " , title )
for r in content [ " releases " ] :
for s in r [ " subtitles " ] :
movie_kind = " episode " if is_episode else " movie "
page_link = f " { BASE_URL } { movie_kind } / { aid } "
release_info = self . combine_release_info ( r )
download_link = s [ " uri " ] . replace ( " http:// " , " https:// " )
matches_ = self . get_query_matches (
video ,
movie_kind = movie_kind ,
season = season ,
episode = episode ,
title = returned_title ,
year = year ,
imdb_id = imdb_id ,
tvdb_id = content . get ( " tvdb " ) ,
)
subtitles . append (
ArgenteamSubtitle (
language ,
page_link ,
download_link ,
release_info ,
matches_ ,
)
)
if has_multiple_ids :
time . sleep ( self . multi_result_throttle )
return subtitles
def list_subtitles ( self , video , languages ) :
if isinstance ( video , Episode ) :
titles = [ video . series ] + video . alternative_series [ : 2 ]
else :
titles = [ video . title ] + video . alternative_titles [ : 2 ]
for title in titles :
subs = self . query ( title , video , titles = titles )
if subs :
return subs
time . sleep ( self . multi_result_throttle )
return [ ]
def download_subtitle ( self , subtitle ) :
# download as a zip
logger . info ( " Downloading subtitle %r " , subtitle )
r = self . session . get ( subtitle . download_link , timeout = 10 )
r . raise_for_status ( )
# open the zip
with ZipFile ( io . BytesIO ( r . content ) ) as zf :
subtitle . content = self . get_subtitle_from_archive ( subtitle , zf )