@ -16,6 +16,8 @@ using MediaBrowser.Model.Logging;
using MediaBrowser.Model.MediaInfo ;
using MediaBrowser.Model.Net ;
using MediaBrowser.Model.System ;
using System.Globalization ;
using MediaBrowser.Controller.IO ;
namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{
@ -32,6 +34,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly INetworkManager _networkManager ;
private readonly string _tempFilePath ;
private bool _enableFileBuffer = false ;
private readonly MulticastStream _multicastStream ;
public HdHomerunUdpStream ( MediaSourceInfo mediaSource , string originalStreamId , IHdHomerunChannelCommands channelCommands , int numTuners , IFileSystem fileSystem , IHttpClient httpClient , ILogger logger , IServerApplicationPaths appPaths , IServerApplicationHost appHost , ISocketFactory socketFactory , INetworkManager networkManager , IEnvironmentInfo environment )
: base ( mediaSource , environment , fileSystem )
@ -44,6 +48,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_channelCommands = channelCommands ;
_numTuners = numTuners ;
_tempFilePath = Path . Combine ( appPaths . TranscodingTempPath , UniqueId + ".ts" ) ;
_multicastStream = new MulticastStream ( _logger ) ;
}
protected override async Task OpenInternal ( CancellationToken openCancellationToken )
@ -121,10 +126,17 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
if ( ! cancellationToken . IsCancellationRequested )
{
FileSystem . CreateDirectory ( FileSystem . GetDirectoryName ( _tempFilePath ) ) ;
using ( var fileStream = FileSystem . GetFileStream ( _tempFilePath , FileOpenMode . Create , FileAccessMode . Write , FileShareMode . Read , FileOpenOptions . Asynchronous ) )
if ( _enableFileBuffer )
{
await CopyTo ( udpClient , fileStream , openTaskCompletionSource , cancellationToken ) . ConfigureAwait ( false ) ;
FileSystem . CreateDirectory ( FileSystem . GetDirectoryName ( _tempFilePath ) ) ;
using ( var fileStream = FileSystem . GetFileStream ( _tempFilePath , FileOpenMode . Create , FileAccessMode . Write , FileShareMode . Read , FileOpenOptions . None ) )
{
CopyTo ( udpClient , fileStream , openTaskCompletionSource , cancellationToken ) ;
}
}
else
{
await _multicastStream . CopyUntilCancelled ( new UdpClientStream ( udpClient ) , ( ) = > Resolve ( openTaskCompletionSource ) , cancellationToken ) . ConfigureAwait ( false ) ;
}
}
}
@ -166,80 +178,111 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
} ) ;
}
public Task CopyToAsync ( Stream s tream, CancellationToken cancellationToken )
public async Task CopyToAsync ( Stream outputS tream, CancellationToken cancellationToken )
{
return CopyFileTo ( _tempFilePath , false , stream , cancellationToken ) ;
}
if ( ! _enableFileBuffer )
{
await _multicastStream . CopyToAsync ( outputStream , cancellationToken ) . ConfigureAwait ( false ) ;
return ;
}
protected async Task CopyFileTo ( string path , bool allowEndOfFile , Stream outputStream , CancellationToken cancellationToken )
{
var eofCount = 0 ;
var path = _tempFilePath ;
long startPosition = - 2 5 000;
long startPosition = - 20000 ;
if ( startPosition < 0 )
{
var length = FileSystem . GetFileInfo ( path ) . Length ;
startPosition = Math . Max ( length - startPosition , 0 ) ;
}
using ( var inputStream = GetInputStream ( path , startPosition , true ) )
_logger . Info ( "Live stream starting position is {0} bytes" , startPosition . ToString ( CultureInfo . InvariantCulture ) ) ;
var allowAsync = Environment . OperatingSystem ! = MediaBrowser . Model . System . OperatingSystem . Windows ;
// use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039
using ( var inputStream = GetInputStream ( path , startPosition , allowAsync ) )
{
if ( startPosition > 0 )
{
inputStream . Position = startPosition ;
}
while ( eofCount < 20 | | ! allowEndOfFile )
while ( ! cancellationToken . IsCancellationRequested )
{
var bytesRead = await AsyncStreamCopier . CopyStream ( inputStream , outputStream , 81920 , 4 , cancellationToken ) . ConfigureAwait ( false ) ;
long bytesRead ;
if ( allowAsync )
{
bytesRead = await AsyncStreamCopier . CopyStream ( inputStream , outputStream , 81920 , 2 , cancellationToken ) . ConfigureAwait ( false ) ;
}
else
{
StreamHelper . CopyTo ( inputStream , outputStream , 81920 , cancellationToken ) ;
bytesRead = 1 ;
}
//var position = fs.Position;
//_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path);
if ( bytesRead = = 0 )
{
eofCount + + ;
await Task . Delay ( 100 , cancellationToken ) . ConfigureAwait ( false ) ;
}
else
{
eofCount = 0 ;
}
}
}
}
private static int RtpHeaderBytes = 12 ;
private Task CopyTo ( ISocket udpClient , Stream outputStream , TaskCompletionSource < bool > openTaskCompletionSource , CancellationToken cancellationToken )
private void CopyTo ( ISocket udpClient , Stream target , TaskCompletionSource < bool > openTaskCompletionSource , CancellationToken cancellationToken )
{
return CopyStream ( _socketFactory . CreateNetworkStream ( udpClient , false ) , outputStream , 81920 , 4 , openTaskCompletionSource , cancellationToken ) ;
}
var source = _socketFactory . CreateNetworkStream ( udpClient , false ) ;
var bufferSize = 81920 ;
private Task CopyStream ( Stream source , Stream target , int bufferSize , int bufferCount , TaskCompletionSource < bool > openTaskCompletionSource , CancellationToken cancellationToken )
{
var copier = new AsyncStreamCopier ( source , target , 0 , cancellationToken , false , bufferSize , bufferCount ) ;
copier . IndividualReadOffset = RtpHeaderBytes ;
byte [ ] buffer = new byte [ bufferSize ] ;
int read ;
var resolved = false ;
var taskCompletion = new TaskCompletionSource < long > ( ) ;
while ( ( read = source . Read ( buffer , 0 , buffer . Length ) ) ! = 0 )
{
cancellationToken . ThrowIfCancellationRequested ( ) ;
copier . TaskCompletionSource = taskCompletion ;
read - = RtpHeaderBytes ;
var result = copier . BeginCopy ( StreamCopyCallback , copier ) ;
if ( read > 0 )
{
target . Write ( buffer , RtpHeaderBytes , read ) ;
}
if ( openTaskCompletionSource ! = null )
{
Resolve ( openTaskCompletionSource ) ;
openTaskCompletionSource = null ;
if ( ! resolved )
{
resolved = true ;
Resolve ( openTaskCompletionSource ) ;
}
}
if ( result . CompletedSynchronously )
{
StreamCopyCallback ( result ) ;
}
//var copier = new AsyncStreamCopier(source, target, 0, cancellationToken, false, bufferSize, bufferCount);
//copier.IndividualReadOffset = RtpHeaderBytes;
//var taskCompletion = new TaskCompletionSource<long>();
//copier.TaskCompletionSource = taskCompletion;
//var result = copier.BeginCopy(StreamCopyCallback, copier);
cancellationToken . Register ( ( ) = > taskCompletion . TrySetCanceled ( ) ) ;
//if (openTaskCompletionSource != null)
//{
// Resolve(openTaskCompletionSource);
// openTaskCompletionSource = null;
//}
return taskCompletion . Task ;
//if (result.CompletedSynchronously)
//{
// StreamCopyCallback(result);
//}
//cancellationToken.Register(() => taskCompletion.TrySetCanceled());
//return taskCompletion.Task;
}
private void StreamCopyCallback ( IAsyncResult result )
@ -258,5 +301,155 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
}
}
public class UdpClientStream : Stream
{
private static int RtpHeaderBytes = 12 ;
private static int PacketSize = 1316 ;
private readonly ISocket _udpClient ;
bool disposed ;
public UdpClientStream ( ISocket udpClient ) : base ( )
{
_udpClient = udpClient ;
}
public override async Task < int > ReadAsync ( byte [ ] buffer , int offset , int count , CancellationToken cancellationToken )
{
if ( buffer = = null )
throw new ArgumentNullException ( "buffer" ) ;
if ( offset + count < 0 )
throw new ArgumentOutOfRangeException ( "offset + count must not be negative" , "offset+count" ) ;
if ( offset + count > buffer . Length )
throw new ArgumentException ( "offset + count must not be greater than the length of buffer" , "offset+count" ) ;
if ( disposed )
throw new ObjectDisposedException ( typeof ( UdpClientStream ) . ToString ( ) ) ;
// This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
// The RTP header will be stripped so see how many reads we need to make to fill the buffer.
int numReads = count / PacketSize ;
int totalBytesRead = 0 ;
byte [ ] receiveBuffer = new byte [ 81920 ] ;
for ( int i = 0 ; i < numReads ; + + i )
{
var data = await _udpClient . ReceiveAsync ( receiveBuffer , 0 , receiveBuffer . Length , cancellationToken ) . ConfigureAwait ( false ) ;
var bytesRead = data . ReceivedBytes - RtpHeaderBytes ;
// remove rtp header
Buffer . BlockCopy ( data . Buffer , RtpHeaderBytes , buffer , offset , bytesRead ) ;
offset + = bytesRead ;
totalBytesRead + = bytesRead ;
}
return totalBytesRead ;
}
public override int Read ( byte [ ] buffer , int offset , int count )
{
if ( buffer = = null )
throw new ArgumentNullException ( "buffer" ) ;
if ( offset + count < 0 )
throw new ArgumentOutOfRangeException ( "offset + count must not be negative" , "offset+count" ) ;
if ( offset + count > buffer . Length )
throw new ArgumentException ( "offset + count must not be greater than the length of buffer" , "offset+count" ) ;
if ( disposed )
throw new ObjectDisposedException ( typeof ( UdpClientStream ) . ToString ( ) ) ;
// This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
// The RTP header will be stripped so see how many reads we need to make to fill the buffer.
int numReads = count / PacketSize ;
int totalBytesRead = 0 ;
byte [ ] receiveBuffer = new byte [ 81920 ] ;
for ( int i = 0 ; i < numReads ; + + i )
{
var receivedBytes = _udpClient . Receive ( receiveBuffer , 0 , receiveBuffer . Length ) ;
var bytesRead = receivedBytes - RtpHeaderBytes ;
// remove rtp header
Buffer . BlockCopy ( receiveBuffer , RtpHeaderBytes , buffer , offset , bytesRead ) ;
offset + = bytesRead ;
totalBytesRead + = bytesRead ;
}
return totalBytesRead ;
}
protected override void Dispose ( bool disposing )
{
disposed = true ;
}
public override bool CanRead
{
get
{
throw new NotImplementedException ( ) ;
}
}
public override bool CanSeek
{
get
{
throw new NotImplementedException ( ) ;
}
}
public override bool CanWrite
{
get
{
throw new NotImplementedException ( ) ;
}
}
public override long Length
{
get
{
throw new NotImplementedException ( ) ;
}
}
public override long Position
{
get
{
throw new NotImplementedException ( ) ;
}
set
{
throw new NotImplementedException ( ) ;
}
}
public override void Flush ( )
{
throw new NotImplementedException ( ) ;
}
public override long Seek ( long offset , SeekOrigin origin )
{
throw new NotImplementedException ( ) ;
}
public override void SetLength ( long value )
{
throw new NotImplementedException ( ) ;
}
public override void Write ( byte [ ] buffer , int offset , int count )
{
throw new NotImplementedException ( ) ;
}
}
}
}