@ -13,6 +13,7 @@ using MediaBrowser.Model.MediaInfo;
using MediaBrowser.Server.Implementations.LiveTv.EmbyTV ;
using System.Collections.Generic ;
using System.Linq ;
using MediaBrowser.Common.Extensions ;
namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
@ -26,8 +27,10 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly CancellationTokenSource _liveStreamCancellationTokenSource = new CancellationTokenSource ( ) ;
private readonly TaskCompletionSource < bool > _liveStreamTaskCompletionSource = new TaskCompletionSource < bool > ( ) ;
private readonly MulticastStream _multicastStream ;
public HdHomerunLiveStream ( MediaSourceInfo mediaSource , IFileSystem fileSystem , IHttpClient httpClient , ILogger logger , IServerApplicationPaths appPaths , IServerApplicationHost appHost )
public HdHomerunLiveStream ( MediaSourceInfo mediaSource , string originalStreamId , IFileSystem fileSystem , IHttpClient httpClient , ILogger logger , IServerApplicationPaths appPaths , IServerApplicationHost appHost )
: base ( mediaSource )
{
_fileSystem = fileSystem ;
@ -35,6 +38,8 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_logger = logger ;
_appPaths = appPaths ;
_appHost = appHost ;
OriginalStreamId = originalStreamId ;
_multicastStream = new MulticastStream ( _logger ) ;
}
protected override async Task OpenInternal ( CancellationToken openCancellationToken )
@ -44,22 +49,18 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
var mediaSource = OriginalMediaSource ;
var url = mediaSource . Path ;
var tempFile = Path . Combine ( _appPaths . TranscodingTempPath , Guid . NewGuid ( ) . ToString ( "N" ) + ".ts" ) ;
Directory . CreateDirectory ( Path . GetDirectoryName ( tempFile ) ) ;
_logger . Info ( "Opening HDHR Live stream from {0} to {1}" , url , tempFile ) ;
var output = _fileSystem . GetFileStream ( tempFile , FileMode . Create , FileAccess . Write , FileShare . Read ) ;
_logger . Info ( "Opening HDHR Live stream from {0}" , url ) ;
var taskCompletionSource = new TaskCompletionSource < bool > ( ) ;
StartStreaming ToTempFile ( output , tempFile , url , taskCompletionSource , _liveStreamCancellationTokenSource . Token ) ;
StartStreaming ( url , taskCompletionSource , _liveStreamCancellationTokenSource . Token ) ;
//OpenedMediaSource.Protocol = MediaProtocol.File;
//OpenedMediaSource.Path = tempFile;
//OpenedMediaSource.ReadAtNativeFramerate = true;
OpenedMediaSource . Path = _appHost . GetLocalApiUrl ( "localhost" ) + "/LiveTv/LiveStreamFiles/" + Path. GetFileNameWithoutExtension ( tempFile ) + "/stream.ts" ;
OpenedMediaSource . Path = _appHost . GetLocalApiUrl ( "localhost" ) + "/LiveTv/LiveStreamFiles/" + UniqueId + "/stream.ts" ;
OpenedMediaSource . Protocol = MediaProtocol . Http ;
OpenedMediaSource . SupportsDirectPlay = false ;
OpenedMediaSource . SupportsDirectStream = true ;
@ -78,178 +79,67 @@ namespace MediaBrowser.Server.Implementations.LiveTv.TunerHosts.HdHomerun
return _liveStreamTaskCompletionSource . Task ;
}
private async Task StartStreaming ToTempFile ( Stream outputStream , string tempFilePath , string url , TaskCompletionSource < bool > openTaskCompletionSource , CancellationToken cancellationToken )
private async Task StartStreaming ( string url , TaskCompletionSource < bool > openTaskCompletionSource , CancellationToken cancellationToken )
{
await Task . Run ( async ( ) = >
{
using ( outputStream )
{
var isFirstAttempt = true ;
var isFirstAttempt = true ;
while ( ! cancellationToken . IsCancellationRequested )
while ( ! cancellationToken . IsCancellationRequested )
{
try
{
try
using ( var response = await _httpClient . SendAsync ( new HttpRequestOptions
{
using ( var response = await _httpClient . SendAsync ( new HttpRequestOptions
{
Url = url ,
CancellationToken = cancellationToken ,
BufferContent = false
Url = url ,
CancellationToken = cancellationToken ,
BufferContent = false
} , "GET" ) . ConfigureAwait ( false ) )
{
_logger . Info ( "Opened HDHR stream from {0}" , url ) ;
} , "GET" ) . ConfigureAwait ( false ) )
if ( ! cancellationToken . IsCancellationRequested )
{
_logger . Info ( " Opened HDHR stream from {0}", url ) ;
_logger . Info ( " Beginning multicastStream.CopyUntilCancelled" ) ;
if ( ! cancellationToken . IsCancellationRequested )
Action onStarted = null ;
if ( isFirstAttempt )
{
_logger . Info ( "Beginning DirectRecorder.CopyUntilCancelled" ) ;
Action onStarted = null ;
if ( isFirstAttempt )
{
onStarted = ( ) = > ResolveWhenExists ( openTaskCompletionSource , tempFilePath , cancellationToken ) ;
}
await CopyUntilCancelled ( response . Content , outputStream , onStarted , cancellationToken ) . ConfigureAwait ( false ) ;
onStarted = ( ) = > openTaskCompletionSource . TrySetResult ( true ) ;
}
}
}
catch ( OperationCanceledException )
{
break ;
}
catch ( Exception ex )
{
if ( isFirstAttempt )
{
_logger . ErrorException ( "Error opening live stream:" , ex ) ;
openTaskCompletionSource . TrySetException ( ex ) ;
break ;
}
_logger . ErrorException ( "Error copying live stream, will reopen" , ex ) ;
await _multicastStream . CopyUntilCancelled ( response . Content , onStarted , cancellationToken ) . ConfigureAwait ( false ) ;
}
}
isFirstAttempt = false ;
}
}
_liveStreamTaskCompletionSource . TrySetResult ( true ) ;
DeleteTempFile ( tempFilePath ) ;
} ) . ConfigureAwait ( false ) ;
}
private readonly List < Tuple < Stream , CancellationToken , TaskCompletionSource < bool > > > _additionalStreams = new List < Tuple < Stream , CancellationToken , TaskCompletionSource < bool > > > ( ) ;
public Task CopyToAsync ( Stream stream , CancellationToken cancellationToken )
{
var taskCompletionSource = new TaskCompletionSource < bool > ( ) ;
_additionalStreams . Add ( new Tuple < Stream , CancellationToken , TaskCompletionSource < bool > > ( stream , cancellationToken , taskCompletionSource ) ) ;
return taskCompletionSource . Task ;
}
private void PopAdditionalStream ( Tuple < Stream , CancellationToken , TaskCompletionSource < bool > > stream , Exception exception )
{
if ( _additionalStreams . Remove ( stream ) )
{
stream . Item3 . TrySetException ( exception ) ;
}
}
private const int BufferSize = 81920 ;
private async Task CopyUntilCancelled ( Stream source , Stream target , Action onStarted , CancellationToken cancellationToken )
{
while ( ! cancellationToken . IsCancellationRequested )
{
var bytesRead = await CopyToAsyncInternal ( source , target , BufferSize , onStarted , cancellationToken ) . ConfigureAwait ( false ) ;
onStarted = null ;
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if ( bytesRead = = 0 )
{
await Task . Delay ( 100 ) . ConfigureAwait ( false ) ;
}
}
}
private async Task < int > CopyToAsyncInternal ( Stream source , Stream destination , Int32 bufferSize , Action onStarted , CancellationToken cancellationToken )
{
byte [ ] buffer = new byte [ bufferSize ] ;
int bytesRead ;
int totalBytesRead = 0 ;
while ( ( bytesRead = await source . ReadAsync ( buffer , 0 , buffer . Length , cancellationToken ) . ConfigureAwait ( false ) ) ! = 0 )
{
await destination . WriteAsync ( buffer , 0 , bytesRead , cancellationToken ) . ConfigureAwait ( false ) ;
var additionalStreams = _additionalStreams . ToList ( ) ;
foreach ( var additionalStream in additionalStreams )
{
cancellationToken . ThrowIfCancellationRequested ( ) ;
try
catch ( OperationCanceledException )
{
await additionalStream . Item1 . WriteAsync ( buffer , 0 , bytesRead , additionalStream . Item2 ) . ConfigureAwait ( false ) ;
break ;
}
catch ( Exception ex )
{
_logger . ErrorException ( "Error writing HDHR data to stream" , ex ) ;
if ( isFirstAttempt )
{
_logger . ErrorException ( "Error opening live stream:" , ex ) ;
openTaskCompletionSource . TrySetException ( ex ) ;
break ;
}
PopAdditionalStream ( additionalStream , ex ) ;
_logger . ErrorException ( "Error copying live stream, will reopen" , ex ) ;
}
}
totalBytesRead + = bytesRead ;
if ( onStarted ! = null )
{
onStarted ( ) ;
isFirstAttempt = false ;
}
onStarted = null ;
}
return totalBytesRead ;
}
private async void ResolveWhenExists ( TaskCompletionSource < bool > taskCompletionSource , string file , CancellationToken cancellationToken )
{
while ( ! File . Exists ( file ) & & ! cancellationToken . IsCancellationRequested )
{
await Task . Delay ( 50 ) . ConfigureAwait ( false ) ;
}
_liveStreamTaskCompletionSource . TrySetResult ( true ) ;
taskCompletionSource . TrySetResult ( tru e) ;
} ) . ConfigureAwait ( false ) ;
}
p rivate async void DeleteTempFile ( string path )
public Task CopyToAsync ( Stream stream , CancellationToken cancellationToken )
{
for ( var i = 0 ; i < 10 ; i + + )
{
try
{
File . Delete ( path ) ;
return ;
}
catch ( FileNotFoundException )
{
return ;
}
catch ( DirectoryNotFoundException )
{
return ;
}
catch ( Exception ex )
{
_logger . ErrorException ( "Error deleting temp file {0}" , ex , path ) ;
}
await Task . Delay ( 1000 ) . ConfigureAwait ( false ) ;
}
return _multicastStream . CopyToAsync ( stream ) ;
}
}
}