update live stream management

pull/1154/head
Luke Pulverenti 7 years ago
parent 141e5b56b0
commit 9b71812325

@ -411,8 +411,6 @@ namespace Emby.Server.Implementations.HttpServer
host = host ?? string.Empty; host = host ?? string.Empty;
_logger.Debug("Validating host {0}", host);
if (_networkManager.IsInPrivateAddressSpace(host)) if (_networkManager.IsInPrivateAddressSpace(host))
{ {
hosts.Add("localhost"); hosts.Add("localhost");

@ -1222,7 +1222,7 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
_logger.Info("Closing live stream {0}", id); _logger.Info("Closing live stream {0}", id);
await stream.Close().ConfigureAwait(false); stream.Close();
_logger.Info("Live stream {0} closed successfully", id); _logger.Info("Live stream {0} closed successfully", id);
} }
} }
@ -1286,9 +1286,9 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
Id = timer.Id Id = timer.Id
}; };
if (_activeRecordings.TryAdd(timer.Id, activeRecordingInfo)) if (!_activeRecordings.ContainsKey(timer.Id))
{ {
await RecordStream(timer, recordingEndDate, activeRecordingInfo, activeRecordingInfo.CancellationTokenSource.Token).ConfigureAwait(false); await RecordStream(timer, recordingEndDate, activeRecordingInfo).ConfigureAwait(false);
} }
else else
{ {
@ -1397,8 +1397,7 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
return Path.Combine(recordPath, recordingFileName); return Path.Combine(recordPath, recordingFileName);
} }
private async Task RecordStream(TimerInfo timer, DateTime recordingEndDate, private async Task RecordStream(TimerInfo timer, DateTime recordingEndDate, ActiveRecordingInfo activeRecordingInfo)
ActiveRecordingInfo activeRecordingInfo, CancellationToken cancellationToken)
{ {
if (timer == null) if (timer == null)
{ {
@ -1420,19 +1419,18 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
if (programInfo != null) if (programInfo != null)
{ {
CopyProgramInfoToTimerInfo(programInfo, timer); CopyProgramInfoToTimerInfo(programInfo, timer);
//activeRecordingInfo.Program = programInfo;
} }
string seriesPath = null; string seriesPath = null;
var recordPath = GetRecordingPath(timer, out seriesPath); var recordPath = GetRecordingPath(timer, out seriesPath);
var recordingStatus = RecordingStatus.New; var recordingStatus = RecordingStatus.New;
var recorder = await GetRecorder().ConfigureAwait(false);
string liveStreamId = null; string liveStreamId = null;
try try
{ {
var recorder = await GetRecorder().ConfigureAwait(false);
var allMediaSources = await GetChannelStreamMediaSources(timer.ChannelId, CancellationToken.None).ConfigureAwait(false); var allMediaSources = await GetChannelStreamMediaSources(timer.ChannelId, CancellationToken.None).ConfigureAwait(false);
_logger.Info("Opening recording stream from tuner provider"); _logger.Info("Opening recording stream from tuner provider");
@ -1442,14 +1440,10 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
var mediaStreamInfo = liveStreamInfo.Item2; var mediaStreamInfo = liveStreamInfo.Item2;
liveStreamId = mediaStreamInfo.Id; liveStreamId = mediaStreamInfo.Id;
// HDHR doesn't seem to release the tuner right away after first probing with ffmpeg
//await Task.Delay(3000, cancellationToken).ConfigureAwait(false);
recordPath = recorder.GetOutputPath(mediaStreamInfo, recordPath); recordPath = recorder.GetOutputPath(mediaStreamInfo, recordPath);
recordPath = EnsureFileUnique(recordPath, timer.Id); recordPath = EnsureFileUnique(recordPath, timer.Id);
_libraryMonitor.ReportFileSystemChangeBeginning(recordPath); _libraryMonitor.ReportFileSystemChangeBeginning(recordPath);
activeRecordingInfo.Path = recordPath;
var duration = recordingEndDate - DateTime.UtcNow; var duration = recordingEndDate - DateTime.UtcNow;
@ -1459,6 +1453,10 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
Action onStarted = async () => Action onStarted = async () =>
{ {
activeRecordingInfo.Path = recordPath;
_activeRecordings.TryAdd(timer.Id, activeRecordingInfo);
timer.Status = RecordingStatus.InProgress; timer.Status = RecordingStatus.InProgress;
_timerProvider.AddOrUpdate(timer, false); _timerProvider.AddOrUpdate(timer, false);
@ -1470,7 +1468,7 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
EnforceKeepUpTo(timer, seriesPath); EnforceKeepUpTo(timer, seriesPath);
}; };
await recorder.Record(liveStreamInfo.Item1 as IDirectStreamProvider, mediaStreamInfo, recordPath, duration, onStarted, cancellationToken).ConfigureAwait(false); await recorder.Record(liveStreamInfo.Item1 as IDirectStreamProvider, mediaStreamInfo, recordPath, duration, onStarted, activeRecordingInfo.CancellationTokenSource.Token).ConfigureAwait(false);
recordingStatus = RecordingStatus.Completed; recordingStatus = RecordingStatus.Completed;
_logger.Info("Recording completed: {0}", recordPath); _logger.Info("Recording completed: {0}", recordPath);

@ -203,7 +203,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
var uri = new Uri(GetApiUrl(info, false)); var uri = new Uri(GetApiUrl(info, false));
using (var manager = new HdHomerunManager(_socketFactory)) using (var manager = new HdHomerunManager(_socketFactory, Logger))
{ {
// Legacy HdHomeruns are IPv4 only // Legacy HdHomeruns are IPv4 only
var ipInfo = _networkManager.ParseIpAddress(uri.Host); var ipInfo = _networkManager.ParseIpAddress(uri.Host);

@ -22,8 +22,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly IHttpClient _httpClient; private readonly IHttpClient _httpClient;
private readonly IServerApplicationHost _appHost; private readonly IServerApplicationHost _appHost;
private readonly TaskCompletionSource<bool> _liveStreamTaskCompletionSource = new TaskCompletionSource<bool>();
public HdHomerunHttpStream(MediaSourceInfo mediaSource, string originalStreamId, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, IEnvironmentInfo environment) public HdHomerunHttpStream(MediaSourceInfo mediaSource, string originalStreamId, IFileSystem fileSystem, IHttpClient httpClient, ILogger logger, IServerApplicationPaths appPaths, IServerApplicationHost appHost, IEnvironmentInfo environment)
: base(mediaSource, environment, fileSystem, logger, appPaths) : base(mediaSource, environment, fileSystem, logger, appPaths)
{ {
@ -32,7 +30,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
OriginalStreamId = originalStreamId; OriginalStreamId = originalStreamId;
} }
protected override Task OpenInternal(CancellationToken openCancellationToken) public override async Task Open(CancellationToken openCancellationToken)
{ {
LiveStreamCancellationTokenSource.Token.ThrowIfCancellationRequested(); LiveStreamCancellationTokenSource.Token.ThrowIfCancellationRequested();
@ -40,11 +38,26 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
var url = mediaSource.Path; var url = mediaSource.Path;
FileSystem.CreateDirectory(FileSystem.GetDirectoryName(TempFilePath));
Logger.Info("Opening HDHR Live stream from {0}", url); Logger.Info("Opening HDHR Live stream from {0}", url);
var taskCompletionSource = new TaskCompletionSource<bool>(); var response = await _httpClient.SendAsync(new HttpRequestOptions
{
Url = url,
CancellationToken = CancellationToken.None,
BufferContent = false,
StartStreaming(url, taskCompletionSource, LiveStreamCancellationTokenSource.Token); // Increase a little bit
TimeoutMs = 30000,
EnableHttpCompression = false
}, "GET").ConfigureAwait(false);
Logger.Info("Opened HDHR stream from {0}", url);
StartStreaming(response, LiveStreamCancellationTokenSource.Token);
//OpenedMediaSource.Protocol = MediaProtocol.File; //OpenedMediaSource.Protocol = MediaProtocol.File;
//OpenedMediaSource.Path = tempFile; //OpenedMediaSource.Path = tempFile;
@ -58,50 +71,30 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
//OpenedMediaSource.SupportsDirectPlay = false; //OpenedMediaSource.SupportsDirectPlay = false;
//OpenedMediaSource.SupportsDirectStream = true; //OpenedMediaSource.SupportsDirectStream = true;
//OpenedMediaSource.SupportsTranscoding = true; //OpenedMediaSource.SupportsTranscoding = true;
return taskCompletionSource.Task;
//await Task.Delay(5000).ConfigureAwait(false);
} }
public override async Task Close() public override void Close()
{ {
Logger.Info("Closing HDHR live stream"); Logger.Info("Closing HDHR live stream");
LiveStreamCancellationTokenSource.Cancel(); LiveStreamCancellationTokenSource.Cancel();
await _liveStreamTaskCompletionSource.Task.ConfigureAwait(false);
await DeleteTempFile(TempFilePath).ConfigureAwait(false);
} }
private Task StartStreaming(string url, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) private Task StartStreaming(HttpResponseInfo response, CancellationToken cancellationToken)
{ {
return Task.Run(async () => return Task.Run(async () =>
{ {
try try
{ {
using (var response = await _httpClient.SendAsync(new HttpRequestOptions using (response)
{
Url = url,
CancellationToken = cancellationToken,
BufferContent = false,
// Increase a little bit
TimeoutMs = 30000,
EnableHttpCompression = false
}, "GET").ConfigureAwait(false))
{ {
using (var stream = response.Content) using (var stream = response.Content)
{ {
Logger.Info("Opened HDHR stream from {0}", url); Logger.Info("Beginning HdHomerunHttpStream stream to file");
Logger.Info("Beginning multicastStream.CopyUntilCancelled");
FileSystem.CreateDirectory(FileSystem.GetDirectoryName(TempFilePath)); FileSystem.CreateDirectory(FileSystem.GetDirectoryName(TempFilePath));
using (var fileStream = FileSystem.GetFileStream(TempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None)) using (var fileStream = FileSystem.GetFileStream(TempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
{ {
StreamHelper.CopyTo(stream, fileStream, 81920, () => Resolve(openTaskCompletionSource), cancellationToken); StreamHelper.CopyTo(stream, fileStream, 81920, null, cancellationToken);
} }
} }
} }
@ -114,7 +107,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
Logger.ErrorException("Error copying live stream.", ex); Logger.ErrorException("Error copying live stream.", ex);
} }
_liveStreamTaskCompletionSource.TrySetResult(true); await DeleteTempFile(TempFilePath).ConfigureAwait(false);
}); });
} }

@ -6,6 +6,7 @@ using System.Text.RegularExpressions;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using MediaBrowser.Model.Net; using MediaBrowser.Model.Net;
using MediaBrowser.Model.Logging;
namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{ {
@ -89,9 +90,12 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly ISocketFactory _socketFactory; private readonly ISocketFactory _socketFactory;
private IpAddressInfo _remoteIp; private IpAddressInfo _remoteIp;
public HdHomerunManager(ISocketFactory socketFactory) private ILogger _logger;
public HdHomerunManager(ISocketFactory socketFactory, ILogger logger)
{ {
_socketFactory = socketFactory; _socketFactory = socketFactory;
_logger = logger;
} }
public void Dispose() public void Dispose()
@ -140,6 +144,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_lockkey = (uint)rand.Next(); _lockkey = (uint)rand.Next();
} }
var lockKeyValue = _lockkey.Value;
var ipEndPoint = new IpEndPointInfo(_remoteIp, HdHomeRunPort); var ipEndPoint = new IpEndPointInfo(_remoteIp, HdHomeRunPort);
for (int i = 0; i < numTuners; ++i) for (int i = 0; i < numTuners; ++i)
@ -148,7 +154,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
continue; continue;
_activeTuner = i; _activeTuner = i;
var lockKeyString = String.Format("{0:d}", _lockkey.Value); var lockKeyString = String.Format("{0:d}", lockKeyValue);
var lockkeyMsg = CreateSetMessage(i, "lockkey", lockKeyString, null); var lockkeyMsg = CreateSetMessage(i, "lockkey", lockKeyString, null);
await tcpClient.SendToAsync(lockkeyMsg, 0, lockkeyMsg.Length, ipEndPoint, cancellationToken).ConfigureAwait(false); await tcpClient.SendToAsync(lockkeyMsg, 0, lockkeyMsg.Length, ipEndPoint, cancellationToken).ConfigureAwait(false);
var response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); var response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false);
@ -160,27 +166,27 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
var commandList = commands.GetCommands(); var commandList = commands.GetCommands();
foreach(Tuple<string,string> command in commandList) foreach(Tuple<string,string> command in commandList)
{ {
var channelMsg = CreateSetMessage(i, command.Item1, command.Item2, _lockkey.Value); var channelMsg = CreateSetMessage(i, command.Item1, command.Item2, lockKeyValue);
await tcpClient.SendToAsync(channelMsg, 0, channelMsg.Length, ipEndPoint, cancellationToken).ConfigureAwait(false); await tcpClient.SendToAsync(channelMsg, 0, channelMsg.Length, ipEndPoint, cancellationToken).ConfigureAwait(false);
response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false);
// parse response to make sure it worked // parse response to make sure it worked
if (!ParseReturnMessage(response.Buffer, response.ReceivedBytes, out returnVal)) if (!ParseReturnMessage(response.Buffer, response.ReceivedBytes, out returnVal))
{ {
await ReleaseLockkey(tcpClient).ConfigureAwait(false); await ReleaseLockkey(tcpClient, lockKeyValue).ConfigureAwait(false);
continue; continue;
} }
} }
var targetValue = String.Format("rtp://{0}:{1}", localIp, localPort); var targetValue = String.Format("rtp://{0}:{1}", localIp, localPort);
var targetMsg = CreateSetMessage(i, "target", targetValue, _lockkey.Value); var targetMsg = CreateSetMessage(i, "target", targetValue, lockKeyValue);
await tcpClient.SendToAsync(targetMsg, 0, targetMsg.Length, ipEndPoint, cancellationToken).ConfigureAwait(false); await tcpClient.SendToAsync(targetMsg, 0, targetMsg.Length, ipEndPoint, cancellationToken).ConfigureAwait(false);
response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false);
// parse response to make sure it worked // parse response to make sure it worked
if (!ParseReturnMessage(response.Buffer, response.ReceivedBytes, out returnVal)) if (!ParseReturnMessage(response.Buffer, response.ReceivedBytes, out returnVal))
{ {
await ReleaseLockkey(tcpClient).ConfigureAwait(false); await ReleaseLockkey(tcpClient, lockKeyValue).ConfigureAwait(false);
continue; continue;
} }
@ -201,7 +207,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
foreach (Tuple<string, string> command in commandList) foreach (Tuple<string, string> command in commandList)
{ {
var channelMsg = CreateSetMessage(_activeTuner, command.Item1, command.Item2, _lockkey.Value); var channelMsg = CreateSetMessage(_activeTuner, command.Item1, command.Item2, _lockkey);
await tcpClient.SendToAsync(channelMsg, 0, channelMsg.Length, new IpEndPointInfo(_remoteIp, HdHomeRunPort), cancellationToken).ConfigureAwait(false); await tcpClient.SendToAsync(channelMsg, 0, channelMsg.Length, new IpEndPointInfo(_remoteIp, HdHomeRunPort), cancellationToken).ConfigureAwait(false);
var response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false); var response = await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, cancellationToken).ConfigureAwait(false);
// parse response to make sure it worked // parse response to make sure it worked
@ -216,24 +222,28 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
public async Task StopStreaming() public async Task StopStreaming()
{ {
if (!_lockkey.HasValue) var lockKey = _lockkey;
if (!lockKey.HasValue)
return; return;
using (var socket = _socketFactory.CreateTcpSocket(_remoteIp, HdHomeRunPort)) using (var socket = _socketFactory.CreateTcpSocket(_remoteIp, HdHomeRunPort))
{ {
await ReleaseLockkey(socket).ConfigureAwait(false); await ReleaseLockkey(socket, lockKey.Value).ConfigureAwait(false);
} }
} }
private async Task ReleaseLockkey(ISocket tcpClient) private async Task ReleaseLockkey(ISocket tcpClient, uint lockKeyValue)
{ {
var releaseTarget = CreateSetMessage(_activeTuner, "target", "none", _lockkey); _logger.Info("HdHomerunManager.ReleaseLockkey {0}", lockKeyValue);
var releaseTarget = CreateSetMessage(_activeTuner, "target", "none", lockKeyValue);
await tcpClient.SendToAsync(releaseTarget, 0, releaseTarget.Length, new IpEndPointInfo(_remoteIp, HdHomeRunPort), CancellationToken.None).ConfigureAwait(false); await tcpClient.SendToAsync(releaseTarget, 0, releaseTarget.Length, new IpEndPointInfo(_remoteIp, HdHomeRunPort), CancellationToken.None).ConfigureAwait(false);
var receiveBuffer = new byte[8192]; var receiveBuffer = new byte[8192];
await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, CancellationToken.None).ConfigureAwait(false); await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, CancellationToken.None).ConfigureAwait(false);
var releaseKeyMsg = CreateSetMessage(_activeTuner, "lockkey", "none", _lockkey); var releaseKeyMsg = CreateSetMessage(_activeTuner, "lockkey", "none", lockKeyValue);
_lockkey = null; _lockkey = null;
await tcpClient.SendToAsync(releaseKeyMsg, 0, releaseKeyMsg.Length, new IpEndPointInfo(_remoteIp, HdHomeRunPort), CancellationToken.None).ConfigureAwait(false); await tcpClient.SendToAsync(releaseKeyMsg, 0, releaseKeyMsg.Length, new IpEndPointInfo(_remoteIp, HdHomeRunPort), CancellationToken.None).ConfigureAwait(false);
await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, CancellationToken.None).ConfigureAwait(false); await tcpClient.ReceiveAsync(receiveBuffer, 0, receiveBuffer.Length, CancellationToken.None).ConfigureAwait(false);

@ -1,23 +1,16 @@
using System; using System;
using System.Collections.Generic;
using System.IO; using System.IO;
using System.Linq;
using System.Text;
using System.Threading; using System.Threading;
using System.Threading.Tasks; using System.Threading.Tasks;
using Emby.Server.Implementations.IO;
using MediaBrowser.Common.Net; using MediaBrowser.Common.Net;
using MediaBrowser.Controller; using MediaBrowser.Controller;
using MediaBrowser.Controller.Library; using MediaBrowser.Controller.Library;
using MediaBrowser.Controller.LiveTv;
using MediaBrowser.Model.Dto; using MediaBrowser.Model.Dto;
using MediaBrowser.Model.IO; using MediaBrowser.Model.IO;
using MediaBrowser.Model.Logging; using MediaBrowser.Model.Logging;
using MediaBrowser.Model.MediaInfo; using MediaBrowser.Model.MediaInfo;
using MediaBrowser.Model.Net; using MediaBrowser.Model.Net;
using MediaBrowser.Model.System; using MediaBrowser.Model.System;
using System.Globalization;
using MediaBrowser.Controller.IO;
namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
{ {
@ -26,7 +19,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
private readonly IServerApplicationHost _appHost; private readonly IServerApplicationHost _appHost;
private readonly ISocketFactory _socketFactory; private readonly ISocketFactory _socketFactory;
private readonly TaskCompletionSource<bool> _liveStreamTaskCompletionSource = new TaskCompletionSource<bool>();
private readonly IHdHomerunChannelCommands _channelCommands; private readonly IHdHomerunChannelCommands _channelCommands;
private readonly int _numTuners; private readonly int _numTuners;
private readonly INetworkManager _networkManager; private readonly INetworkManager _networkManager;
@ -42,7 +34,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
_numTuners = numTuners; _numTuners = numTuners;
} }
protected override Task OpenInternal(CancellationToken openCancellationToken) public override async Task Open(CancellationToken openCancellationToken)
{ {
LiveStreamCancellationTokenSource.Token.ThrowIfCancellationRequested(); LiveStreamCancellationTokenSource.Token.ThrowIfCancellationRequested();
@ -51,11 +43,53 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
var uri = new Uri(mediaSource.Path); var uri = new Uri(mediaSource.Path);
var localPort = _networkManager.GetRandomUnusedUdpPort(); var localPort = _networkManager.GetRandomUnusedUdpPort();
FileSystem.CreateDirectory(FileSystem.GetDirectoryName(TempFilePath));
Logger.Info("Opening HDHR UDP Live stream from {0}", uri.Host); Logger.Info("Opening HDHR UDP Live stream from {0}", uri.Host);
var remoteAddress = _networkManager.ParseIpAddress(uri.Host);
IpAddressInfo localAddress = null;
using (var tcpSocket = _socketFactory.CreateSocket(remoteAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp, false))
{
try
{
tcpSocket.Connect(new IpEndPointInfo(remoteAddress, HdHomerunManager.HdHomeRunPort));
localAddress = tcpSocket.LocalEndPoint.IpAddress;
tcpSocket.Close();
}
catch (Exception)
{
Logger.Error("Unable to determine local ip address for Legacy HDHomerun stream.");
return;
}
}
var udpClient = _socketFactory.CreateUdpSocket(localPort);
var hdHomerunManager = new HdHomerunManager(_socketFactory, Logger);
try
{
// send url to start streaming
await hdHomerunManager.StartStreaming(remoteAddress, localAddress, localPort, _channelCommands, _numTuners, openCancellationToken).ConfigureAwait(false);
}
catch (Exception ex)
{
using (udpClient)
{
using (hdHomerunManager)
{
if (!(ex is OperationCanceledException))
{
Logger.ErrorException("Error opening live stream:", ex);
}
throw;
}
}
}
var taskCompletionSource = new TaskCompletionSource<bool>(); var taskCompletionSource = new TaskCompletionSource<bool>();
StartStreaming(uri.Host, localPort, taskCompletionSource, LiveStreamCancellationTokenSource.Token); StartStreaming(udpClient, hdHomerunManager, remoteAddress, localAddress, localPort, taskCompletionSource, LiveStreamCancellationTokenSource.Token);
//OpenedMediaSource.Protocol = MediaProtocol.File; //OpenedMediaSource.Protocol = MediaProtocol.File;
//OpenedMediaSource.Path = tempFile; //OpenedMediaSource.Path = tempFile;
@ -67,86 +101,47 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
//OpenedMediaSource.SupportsDirectStream = true; //OpenedMediaSource.SupportsDirectStream = true;
//OpenedMediaSource.SupportsTranscoding = true; //OpenedMediaSource.SupportsTranscoding = true;
return taskCompletionSource.Task;
//await Task.Delay(5000).ConfigureAwait(false); //await Task.Delay(5000).ConfigureAwait(false);
await taskCompletionSource.Task.ConfigureAwait(false);
} }
public override Task Close() public override void Close()
{ {
Logger.Info("Closing HDHR UDP live stream"); Logger.Info("Closing HDHR UDP live stream");
LiveStreamCancellationTokenSource.Cancel(); LiveStreamCancellationTokenSource.Cancel();
return _liveStreamTaskCompletionSource.Task;
} }
private Task StartStreaming(string remoteIp, int localPort, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) private Task StartStreaming(ISocket udpClient, HdHomerunManager hdHomerunManager, IpAddressInfo remoteAddress, IpAddressInfo localAddress, int localPort, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{ {
return Task.Run(async () => return Task.Run(async () =>
{ {
var isFirstAttempt = true; using (udpClient)
using (var udpClient = _socketFactory.CreateUdpSocket(localPort))
{
using (var hdHomerunManager = new HdHomerunManager(_socketFactory))
{ {
var remoteAddress = _networkManager.ParseIpAddress(remoteIp); using (hdHomerunManager)
IpAddressInfo localAddress = null;
using (var tcpSocket = _socketFactory.CreateSocket(remoteAddress.AddressFamily, SocketType.Stream, ProtocolType.Tcp, false))
{ {
try try
{ {
tcpSocket.Connect(new IpEndPointInfo(remoteAddress, HdHomerunManager.HdHomeRunPort)); await CopyTo(udpClient, TempFilePath, openTaskCompletionSource, cancellationToken).ConfigureAwait(false);
localAddress = tcpSocket.LocalEndPoint.IpAddress;
tcpSocket.Close();
}
catch (Exception)
{
Logger.Error("Unable to determine local ip address for Legacy HDHomerun stream.");
return;
}
}
while (!cancellationToken.IsCancellationRequested)
{
try
{
// send url to start streaming
await hdHomerunManager.StartStreaming(remoteAddress, localAddress, localPort, _channelCommands, _numTuners, cancellationToken).ConfigureAwait(false);
Logger.Info("Opened HDHR UDP stream from {0}", remoteAddress);
if (!cancellationToken.IsCancellationRequested)
{
FileSystem.CreateDirectory(FileSystem.GetDirectoryName(TempFilePath));
using (var fileStream = FileSystem.GetFileStream(TempFilePath, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
{
CopyTo(udpClient, fileStream, openTaskCompletionSource, cancellationToken);
}
}
} }
catch (OperationCanceledException ex) catch (OperationCanceledException ex)
{ {
Logger.Info("HDHR UDP stream cancelled or timed out from {0}", remoteAddress); Logger.Info("HDHR UDP stream cancelled or timed out from {0}", remoteAddress);
openTaskCompletionSource.TrySetException(ex); openTaskCompletionSource.TrySetException(ex);
break;
} }
catch (Exception ex) catch (Exception ex)
{
if (isFirstAttempt)
{ {
Logger.ErrorException("Error opening live stream:", ex); Logger.ErrorException("Error opening live stream:", ex);
openTaskCompletionSource.TrySetException(ex); openTaskCompletionSource.TrySetException(ex);
break;
} }
Logger.ErrorException("Error copying live stream, will reopen", ex); try
{
await hdHomerunManager.StopStreaming().ConfigureAwait(false);
} }
catch
{
isFirstAttempt = false;
} }
await hdHomerunManager.StopStreaming().ConfigureAwait(false);
_liveStreamTaskCompletionSource.TrySetResult(true);
} }
} }
@ -163,24 +158,31 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
} }
private static int RtpHeaderBytes = 12; private static int RtpHeaderBytes = 12;
private void CopyTo(ISocket udpClient, Stream target, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken) private async Task CopyTo(ISocket udpClient, string file, TaskCompletionSource<bool> openTaskCompletionSource, CancellationToken cancellationToken)
{ {
var source = _socketFactory.CreateNetworkStream(udpClient, false);
var bufferSize = 81920; var bufferSize = 81920;
byte[] buffer = new byte[bufferSize]; byte[] buffer = new byte[bufferSize];
int read; int read;
var resolved = false; var resolved = false;
while ((read = source.Read(buffer, 0, buffer.Length)) != 0) using (var source = _socketFactory.CreateNetworkStream(udpClient, false))
{
using (var fileStream = FileSystem.GetFileStream(file, FileOpenMode.Create, FileAccessMode.Write, FileShareMode.Read, FileOpenOptions.None))
{
var currentCancellationToken = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken, new CancellationTokenSource(TimeSpan.FromSeconds(30)).Token).Token;
while ((read = await source.ReadAsync(buffer, 0, buffer.Length, currentCancellationToken).ConfigureAwait(false)) != 0)
{ {
cancellationToken.ThrowIfCancellationRequested(); cancellationToken.ThrowIfCancellationRequested();
currentCancellationToken = cancellationToken;
read -= RtpHeaderBytes; read -= RtpHeaderBytes;
if (read > 0) if (read > 0)
{ {
target.Write(buffer, RtpHeaderBytes, read); fileStream.Write(buffer, RtpHeaderBytes, read);
} }
if (!resolved) if (!resolved)
@ -190,6 +192,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun
} }
} }
} }
}
}
public class UdpClientStream : Stream public class UdpClientStream : Stream
{ {

@ -47,19 +47,13 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
TempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts"); TempFilePath = Path.Combine(appPaths.TranscodingTempPath, UniqueId + ".ts");
} }
public Task Open(CancellationToken cancellationToken) public virtual Task Open(CancellationToken openCancellationToken)
{
return OpenInternal(cancellationToken);
}
protected virtual Task OpenInternal(CancellationToken cancellationToken)
{ {
return Task.FromResult(true); return Task.FromResult(true);
} }
public virtual Task Close() public virtual void Close()
{ {
return Task.FromResult(true);
} }
protected Stream GetInputStream(string path, bool allowAsyncFileRead) protected Stream GetInputStream(string path, bool allowAsyncFileRead)
@ -76,6 +70,11 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts
protected async Task DeleteTempFile(string path, int retryCount = 0) protected async Task DeleteTempFile(string path, int retryCount = 0)
{ {
if (retryCount == 0)
{
Logger.Info("Deleting temp file {0}", path);
}
try try
{ {
FileSystem.DeleteFile(path); FileSystem.DeleteFile(path);

@ -59,8 +59,8 @@ namespace MediaBrowser.Controller.LiveTv
public interface ILiveStream public interface ILiveStream
{ {
Task Open(CancellationToken cancellationToken); Task Open(CancellationToken openCancellationToken);
Task Close(); void Close();
int ConsumerCount { get; } int ConsumerCount { get; }
string OriginalStreamId { get; set; } string OriginalStreamId { get; set; }
bool EnableStreamSharing { get; set; } bool EnableStreamSharing { get; set; }

@ -1,3 +1,3 @@
using System.Reflection; using System.Reflection;
[assembly: AssemblyVersion("3.2.33.19")] [assembly: AssemblyVersion("3.2.33.20")]

Loading…
Cancel
Save