|
|
|
@ -1,5 +1,7 @@
|
|
|
|
|
from __future__ import absolute_import
|
|
|
|
|
import logging
|
|
|
|
|
import time
|
|
|
|
|
from datetime import timedelta
|
|
|
|
|
|
|
|
|
|
from requests import Session
|
|
|
|
|
|
|
|
|
@ -122,6 +124,13 @@ whisper_languages = {
|
|
|
|
|
|
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
|
|
|
|
def set_log_level(newLevel="INFO"):
|
|
|
|
|
newLevel = newLevel.upper()
|
|
|
|
|
# print(f'WhisperAI log level changing from {logging._levelToName[logger.getEffectiveLevel()]} to {newLevel}')
|
|
|
|
|
logger.setLevel(getattr(logging, newLevel))
|
|
|
|
|
|
|
|
|
|
# initialize to default above
|
|
|
|
|
set_log_level()
|
|
|
|
|
|
|
|
|
|
@functools.lru_cache(2)
|
|
|
|
|
def encode_audio_stream(path, ffmpeg_path, audio_stream_language=None):
|
|
|
|
@ -138,7 +147,8 @@ def encode_audio_stream(path, ffmpeg_path, audio_stream_language=None):
|
|
|
|
|
.run(cmd=[ffmpeg_path, "-nostdin"], capture_stdout=True, capture_stderr=True)
|
|
|
|
|
|
|
|
|
|
except ffmpeg.Error as e:
|
|
|
|
|
raise RuntimeError(f"Failed to load audio: {e.stderr.decode()}") from e
|
|
|
|
|
logger.warning(f"ffmpeg failed to load audio: {e.stderr.decode()}")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
logger.debug(f"Finished encoding audio stream in {path} with no errors")
|
|
|
|
|
|
|
|
|
@ -161,6 +171,9 @@ def whisper_get_language_reverse(alpha3):
|
|
|
|
|
return wl
|
|
|
|
|
raise ValueError
|
|
|
|
|
|
|
|
|
|
def language_from_alpha3(lang):
|
|
|
|
|
name = Language(lang).name
|
|
|
|
|
return name
|
|
|
|
|
|
|
|
|
|
class WhisperAISubtitle(Subtitle):
|
|
|
|
|
'''Whisper AI Subtitle.'''
|
|
|
|
@ -198,12 +211,10 @@ class WhisperAIProvider(Provider):
|
|
|
|
|
for lan in whisper_languages:
|
|
|
|
|
languages.update({whisper_get_language(lan, whisper_languages[lan])})
|
|
|
|
|
|
|
|
|
|
languages.update(set(Language.rebuild(lang, hi=True) for lang in languages))
|
|
|
|
|
languages.update(set(Language.rebuild(lang, forced=True) for lang in languages))
|
|
|
|
|
|
|
|
|
|
video_types = (Episode, Movie)
|
|
|
|
|
|
|
|
|
|
def __init__(self, endpoint=None, timeout=None, ffmpeg_path=None):
|
|
|
|
|
def __init__(self, endpoint=None, timeout=None, ffmpeg_path=None, loglevel=None):
|
|
|
|
|
set_log_level(loglevel)
|
|
|
|
|
if not endpoint:
|
|
|
|
|
raise ConfigurationError('Whisper Web Service Endpoint must be provided')
|
|
|
|
|
|
|
|
|
@ -230,12 +241,16 @@ class WhisperAIProvider(Provider):
|
|
|
|
|
def detect_language(self, path) -> Language:
|
|
|
|
|
out = encode_audio_stream(path, self.ffmpeg_path)
|
|
|
|
|
|
|
|
|
|
if out == None:
|
|
|
|
|
logger.info(f"Whisper cannot detect language of {path} because of missing/bad audio track")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
r = self.session.post(f"{self.endpoint}/detect-language",
|
|
|
|
|
params={'encode': 'false'},
|
|
|
|
|
files={'audio_file': out},
|
|
|
|
|
timeout=(5, self.timeout))
|
|
|
|
|
timeout=(self.timeout, self.timeout))
|
|
|
|
|
|
|
|
|
|
logger.info(f"Whisper detected language of {path} as {r.json()['detected_language']}")
|
|
|
|
|
logger.debug(f"Whisper detected language of {path} as {r.json()['detected_language']}")
|
|
|
|
|
|
|
|
|
|
return whisper_get_language(r.json()["language_code"], r.json()["detected_language"])
|
|
|
|
|
|
|
|
|
@ -262,6 +277,11 @@ class WhisperAIProvider(Provider):
|
|
|
|
|
else:
|
|
|
|
|
# We must detect the language manually
|
|
|
|
|
detected_lang = self.detect_language(video.original_path)
|
|
|
|
|
if detected_lang == None:
|
|
|
|
|
sub.task = "error"
|
|
|
|
|
# tell the user what is wrong
|
|
|
|
|
sub.release_info = "bad/missing audio track - cannot transcribe"
|
|
|
|
|
return sub
|
|
|
|
|
|
|
|
|
|
if detected_lang != language:
|
|
|
|
|
sub.task = "translate"
|
|
|
|
@ -270,9 +290,11 @@ class WhisperAIProvider(Provider):
|
|
|
|
|
|
|
|
|
|
if sub.task == "translate":
|
|
|
|
|
if language.alpha3 != "eng":
|
|
|
|
|
logger.info(f"Translation only possible from {language} to English")
|
|
|
|
|
logger.debug(f"Translation only possible from {language} to English")
|
|
|
|
|
return None
|
|
|
|
|
|
|
|
|
|
# tell the user what we are about to do
|
|
|
|
|
sub.release_info = f"{sub.task} {language_from_alpha3(sub.audio_language)} audio -> {language_from_alpha3(language.alpha3)} SRT"
|
|
|
|
|
logger.debug(f"Whisper ({video.original_path}): {sub.audio_language} -> {language.alpha3} [TASK: {sub.task}]")
|
|
|
|
|
|
|
|
|
|
return sub
|
|
|
|
@ -285,11 +307,29 @@ class WhisperAIProvider(Provider):
|
|
|
|
|
# Invoke Whisper through the API. This may take a long time depending on the file.
|
|
|
|
|
# TODO: This loads the entire file into memory, find a good way to stream the file in chunks
|
|
|
|
|
|
|
|
|
|
out = None
|
|
|
|
|
if subtitle.task != "error":
|
|
|
|
|
out = encode_audio_stream(subtitle.video.original_path, self.ffmpeg_path, subtitle.force_audio_stream)
|
|
|
|
|
if out == None:
|
|
|
|
|
logger.info(f"Whisper cannot process {subtitle.video.original_path} because of missing/bad audio track")
|
|
|
|
|
subtitle.content = None
|
|
|
|
|
return
|
|
|
|
|
|
|
|
|
|
if subtitle.task == "transcribe":
|
|
|
|
|
output_language = subtitle.audio_language
|
|
|
|
|
else:
|
|
|
|
|
output_language = "eng"
|
|
|
|
|
|
|
|
|
|
logger.info(f'Starting WhisperAI {subtitle.task} to {language_from_alpha3(output_language)} for {subtitle.video.original_path}')
|
|
|
|
|
startTime = time.time()
|
|
|
|
|
|
|
|
|
|
r = self.session.post(f"{self.endpoint}/asr",
|
|
|
|
|
params={'task': subtitle.task, 'language': whisper_get_language_reverse(subtitle.audio_language), 'output': 'srt', 'encode': 'false'},
|
|
|
|
|
files={'audio_file': out},
|
|
|
|
|
timeout=(5, self.timeout))
|
|
|
|
|
timeout=(self.timeout, self.timeout))
|
|
|
|
|
|
|
|
|
|
endTime = time.time()
|
|
|
|
|
elapsedTime = timedelta(seconds=round(endTime - startTime))
|
|
|
|
|
logger.info(f'Completed WhisperAI {subtitle.task} to {language_from_alpha3(output_language)} in {elapsedTime} for {subtitle.video.original_path}')
|
|
|
|
|
|
|
|
|
|
subtitle.content = r.content
|
|
|
|
|