@ -130,7 +130,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
onStarted = ( ) = > openTaskCompletionSource . TrySetResult ( true ) ;
}
await _multicastStream . CopyUntilCancelled ( udpClient , onStarted , cancellationToken ) . ConfigureAwait ( false ) ;
await _multicastStream . CopyUntilCancelled ( new UdpClientStream ( udpClient ) , onStarted , cancellationToken ) . ConfigureAwait ( false ) ;
}
}
catch ( OperationCanceledException ex )
@ -167,4 +167,127 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
return _multicastStream . CopyToAsync ( stream ) ;
}
}
// This handles the ReadAsync function only of a Stream object
// This is used to wrap a UDP socket into a stream for MulticastStream which only uses ReadAsync
public class UdpClientStream : Stream
{
private static int RtpHeaderBytes = 12 ;
private static int PacketSize = 1316 ;
private readonly ISocket _udpClient ;
bool disposed ;
public UdpClientStream ( ISocket udpClient ) : base ( )
{
_udpClient = udpClient ;
}
public override async Task < int > ReadAsync ( byte [ ] buffer , int offset , int count , CancellationToken cancellationToken )
{
if ( buffer = = null )
throw new ArgumentNullException ( "buffer" ) ;
if ( offset + count < 0 )
throw new ArgumentOutOfRangeException ( "offset + count must not be negative" , "offset+count" ) ;
if ( offset + count > buffer . Length )
throw new ArgumentException ( "offset + count must not be greater than the length of buffer" , "offset+count" ) ;
if ( disposed )
throw new ObjectDisposedException ( typeof ( UdpClientStream ) . ToString ( ) ) ;
// This will always receive a 1328 packet size (PacketSize + RtpHeaderSize)
// The RTP header will be stripped so see how many reads we need to make to fill the buffer.
int numReads = count / PacketSize ;
int totalBytesRead = 0 ;
for ( int i = 0 ; i < numReads ; + + i )
{
var data = await _udpClient . ReceiveAsync ( cancellationToken ) . ConfigureAwait ( false ) ;
var bytesRead = data . ReceivedBytes - RtpHeaderBytes ;
// remove rtp header
Buffer . BlockCopy ( data . Buffer , RtpHeaderBytes , buffer , offset , bytesRead ) ;
offset + = bytesRead ;
totalBytesRead + = bytesRead ;
}
return totalBytesRead ;
}
protected override void Dispose ( bool disposing )
{
disposed = true ;
}
public override bool CanRead
{
get
{
throw new NotImplementedException ( ) ;
}
}
public override bool CanSeek
{
get
{
throw new NotImplementedException ( ) ;
}
}
public override bool CanWrite
{
get
{
throw new NotImplementedException ( ) ;
}
}
public override long Length
{
get
{
throw new NotImplementedException ( ) ;
}
}
public override long Position
{
get
{
throw new NotImplementedException ( ) ;
}
set
{
throw new NotImplementedException ( ) ;
}
}
public override void Flush ( )
{
throw new NotImplementedException ( ) ;
}
public override int Read ( byte [ ] buffer , int offset , int count )
{
throw new NotImplementedException ( ) ;
}
public override long Seek ( long offset , SeekOrigin origin )
{
throw new NotImplementedException ( ) ;
}
public override void SetLength ( long value )
{
throw new NotImplementedException ( ) ;
}
public override void Write ( byte [ ] buffer , int offset , int count )
{
throw new NotImplementedException ( ) ;
}
}
}