re-organize file streaming

pull/1154/head
Luke Pulverenti 7 years ago
parent 29185eb9bd
commit a660aa001e

@ -2,6 +2,7 @@
using System.Net;
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;
using Emby.Common.Implementations.Networking;
using MediaBrowser.Model.Net;
using MediaBrowser.Model.Logging;
@ -59,7 +60,7 @@ namespace Emby.Common.Implementations.Net
#if NET46
Socket.Close();
#else
Socket.Dispose();
Socket.Dispose();
#endif
}
@ -96,6 +97,46 @@ namespace Emby.Common.Implementations.Net
_acceptor.StartAccept();
}
#if NET46
public Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken)
{
var options = TransmitFileOptions.Disconnect | TransmitFileOptions.ReuseSocket | TransmitFileOptions.UseKernelApc;
var completionSource = new TaskCompletionSource<bool>();
var result = Socket.BeginSendFile(path, preBuffer, postBuffer, options, new AsyncCallback(FileSendCallback), new Tuple<Socket, string, TaskCompletionSource<bool>>(Socket, path, completionSource));
return completionSource.Task;
}
private void FileSendCallback(IAsyncResult ar)
{
// Retrieve the socket from the state object.
Tuple<Socket, string, TaskCompletionSource<bool>> data = (Tuple<Socket, string, TaskCompletionSource<bool>>)ar.AsyncState;
var client = data.Item1;
var path = data.Item2;
var taskCompletion = data.Item3;
// Complete sending the data to the remote device.
try {
client.EndSendFile(ar);
taskCompletion.TrySetResult(true);
}
catch(SocketException ex){
_logger.Info("Socket.SendFile failed for {0}. error code {1}", path, ex.SocketErrorCode);
taskCompletion.TrySetException(ex);
}catch(Exception ex){
taskCompletion.TrySetException(ex);
}
}
#else
public Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken)
{
throw new NotImplementedException();
}
#endif
public void Dispose()
{
Socket.Dispose();

@ -59,6 +59,7 @@ namespace Emby.Common.Implementations.Net
if (remotePort < 0) throw new ArgumentException("remotePort cannot be less than zero.", "remotePort");
var retVal = new Socket(AddressFamily.InterNetwork, System.Net.Sockets.SocketType.Stream, System.Net.Sockets.ProtocolType.Tcp);
try
{
retVal.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);

@ -613,7 +613,7 @@ namespace Emby.Server.Core
CertificatePath = GetCertificatePath(true);
Certificate = GetCertificate(CertificatePath);
HttpServer = HttpServerFactory.CreateServer(this, LogManager, ServerConfigurationManager, NetworkManager, MemoryStreamFactory, "Emby", "web/index.html", textEncoding, SocketFactory, CryptographyProvider, JsonSerializer, XmlSerializer, EnvironmentInfo, Certificate, SupportsDualModeSockets);
HttpServer = HttpServerFactory.CreateServer(this, LogManager, ServerConfigurationManager, NetworkManager, MemoryStreamFactory, "Emby", "web/index.html", textEncoding, SocketFactory, CryptographyProvider, JsonSerializer, XmlSerializer, EnvironmentInfo, Certificate, FileSystemManager, SupportsDualModeSockets);
HttpServer.GlobalResponse = LocalizationManager.GetLocalizedString("StartupEmbyServerIsLoading");
RegisterSingleInstance(HttpServer, false);
progress.Report(10);

@ -45,6 +45,7 @@ namespace Emby.Server.Core
IXmlSerializer xml,
IEnvironmentInfo environment,
ICertificate certificate,
IFileSystem fileSystem,
bool enableDualModeSockets)
{
var logger = logManager.GetLogger("HttpServer");
@ -65,7 +66,8 @@ namespace Emby.Server.Core
certificate,
new StreamFactory(),
GetParseFn,
enableDualModeSockets);
enableDualModeSockets,
fileSystem);
}
private static Func<string, object> GetParseFn(Type propertyType)

@ -84,6 +84,7 @@
<Compile Include="FileOrganization\NameUtils.cs" />
<Compile Include="FileOrganization\OrganizerScheduledTask.cs" />
<Compile Include="FileOrganization\TvFolderOrganizer.cs" />
<Compile Include="HttpServer\FileWriter.cs" />
<Compile Include="HttpServer\GetSwaggerResource.cs" />
<Compile Include="HttpServer\HttpListenerHost.cs" />
<Compile Include="HttpServer\HttpResultFactory.cs" />

@ -0,0 +1,188 @@
using System;
using System.Collections.Generic;
using System.Globalization;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.IO;
using MediaBrowser.Model.Logging;
using MediaBrowser.Model.Services;
namespace Emby.Server.Implementations.HttpServer
{
public class FileWriter : IHttpResult
{
private ILogger Logger { get; set; }
private string RangeHeader { get; set; }
private bool IsHeadRequest { get; set; }
private long RangeStart { get; set; }
private long RangeEnd { get; set; }
private long RangeLength { get; set; }
private long TotalContentLength { get; set; }
public Action OnComplete { get; set; }
public Action OnError { get; set; }
private static readonly CultureInfo UsCulture = new CultureInfo("en-US");
public List<Cookie> Cookies { get; private set; }
/// <summary>
/// The _options
/// </summary>
private readonly IDictionary<string, string> _options = new Dictionary<string, string>();
/// <summary>
/// Gets the options.
/// </summary>
/// <value>The options.</value>
public IDictionary<string, string> Headers
{
get { return _options; }
}
public string Path { get; set; }
public FileWriter(string path, string contentType, string rangeHeader, ILogger logger, IFileSystem fileSystem)
{
if (string.IsNullOrEmpty(contentType))
{
throw new ArgumentNullException("contentType");
}
Path = path;
Logger = logger;
RangeHeader = rangeHeader;
Headers["Content-Type"] = contentType;
TotalContentLength = fileSystem.GetFileInfo(path).Length;
if (string.IsNullOrWhiteSpace(rangeHeader))
{
Headers["Content-Length"] = TotalContentLength.ToString(UsCulture);
StatusCode = HttpStatusCode.OK;
}
else
{
Headers["Accept-Ranges"] = "bytes";
StatusCode = HttpStatusCode.PartialContent;
SetRangeValues();
}
Cookies = new List<Cookie>();
}
/// <summary>
/// Sets the range values.
/// </summary>
private void SetRangeValues()
{
var requestedRange = RequestedRanges[0];
// If the requested range is "0-", we can optimize by just doing a stream copy
if (!requestedRange.Value.HasValue)
{
RangeEnd = TotalContentLength - 1;
}
else
{
RangeEnd = requestedRange.Value.Value;
}
RangeStart = requestedRange.Key;
RangeLength = 1 + RangeEnd - RangeStart;
// Content-Length is the length of what we're serving, not the original content
Headers["Content-Length"] = RangeLength.ToString(UsCulture);
Headers["Content-Range"] = string.Format("bytes {0}-{1}/{2}", RangeStart, RangeEnd, TotalContentLength);
}
/// <summary>
/// The _requested ranges
/// </summary>
private List<KeyValuePair<long, long?>> _requestedRanges;
/// <summary>
/// Gets the requested ranges.
/// </summary>
/// <value>The requested ranges.</value>
protected List<KeyValuePair<long, long?>> RequestedRanges
{
get
{
if (_requestedRanges == null)
{
_requestedRanges = new List<KeyValuePair<long, long?>>();
// Example: bytes=0-,32-63
var ranges = RangeHeader.Split('=')[1].Split(',');
foreach (var range in ranges)
{
var vals = range.Split('-');
long start = 0;
long? end = null;
if (!string.IsNullOrEmpty(vals[0]))
{
start = long.Parse(vals[0], UsCulture);
}
if (!string.IsNullOrEmpty(vals[1]))
{
end = long.Parse(vals[1], UsCulture);
}
_requestedRanges.Add(new KeyValuePair<long, long?>(start, end));
}
}
return _requestedRanges;
}
}
public async Task WriteToAsync(IResponse response, CancellationToken cancellationToken)
{
try
{
// Headers only
if (IsHeadRequest)
{
return;
}
if (string.IsNullOrWhiteSpace(RangeHeader) || (RangeStart <= 0 && RangeEnd >= TotalContentLength - 1))
{
Logger.Info("Transmit file {0}", Path);
await response.TransmitFile(Path, 0, 0, cancellationToken).ConfigureAwait(false);
return;
}
await response.TransmitFile(Path, RangeStart, RangeEnd, cancellationToken).ConfigureAwait(false);
}
finally
{
if (OnComplete != null)
{
OnComplete();
}
}
}
public string ContentType { get; set; }
public IRequest RequestContext { get; set; }
public object Response { get; set; }
public int Status { get; set; }
public HttpStatusCode StatusCode
{
get { return (HttpStatusCode)Status; }
set { Status = (int)value; }
}
public string StatusDescription { get; set; }
}
}

@ -52,6 +52,7 @@ namespace Emby.Server.Implementations.HttpServer
private readonly ISocketFactory _socketFactory;
private readonly ICryptoProvider _cryptoProvider;
private readonly IFileSystem _fileSystem;
private readonly IJsonSerializer _jsonSerializer;
private readonly IXmlSerializer _xmlSerializer;
private readonly ICertificate _certificate;
@ -70,8 +71,7 @@ namespace Emby.Server.Implementations.HttpServer
ILogger logger,
IServerConfigurationManager config,
string serviceName,
string defaultRedirectPath, INetworkManager networkManager, IMemoryStreamFactory memoryStreamProvider, ITextEncoding textEncoding, ISocketFactory socketFactory, ICryptoProvider cryptoProvider, IJsonSerializer jsonSerializer, IXmlSerializer xmlSerializer, IEnvironmentInfo environment, ICertificate certificate, IStreamFactory streamFactory, Func<Type, Func<string, object>> funcParseFn, bool enableDualModeSockets)
: base()
string defaultRedirectPath, INetworkManager networkManager, IMemoryStreamFactory memoryStreamProvider, ITextEncoding textEncoding, ISocketFactory socketFactory, ICryptoProvider cryptoProvider, IJsonSerializer jsonSerializer, IXmlSerializer xmlSerializer, IEnvironmentInfo environment, ICertificate certificate, IStreamFactory streamFactory, Func<Type, Func<string, object>> funcParseFn, bool enableDualModeSockets, IFileSystem fileSystem)
{
Instance = this;
@ -89,6 +89,7 @@ namespace Emby.Server.Implementations.HttpServer
_streamFactory = streamFactory;
_funcParseFn = funcParseFn;
_enableDualModeSockets = enableDualModeSockets;
_fileSystem = fileSystem;
_config = config;
_logger = logger;
@ -226,7 +227,8 @@ namespace Emby.Server.Implementations.HttpServer
_cryptoProvider,
_streamFactory,
_enableDualModeSockets,
GetRequest);
GetRequest,
_fileSystem);
}
private IHttpRequest GetRequest(HttpListenerContext httpContext)

@ -474,10 +474,6 @@ namespace Emby.Server.Implementations.HttpServer
{
throw new ArgumentNullException("cacheKey");
}
if (options.ContentFactory == null)
{
throw new ArgumentNullException("factoryFn");
}
var key = cacheKey.ToString("N");
@ -560,30 +556,43 @@ namespace Emby.Server.Implementations.HttpServer
{
var rangeHeader = requestContext.Headers.Get("Range");
var stream = await factoryFn().ConfigureAwait(false);
if (!isHeadRequest && !string.IsNullOrWhiteSpace(options.Path) && options.FileShare == FileShareMode.Read)
{
return new FileWriter(options.Path, contentType, rangeHeader, _logger, _fileSystem)
{
OnComplete = options.OnComplete,
OnError = options.OnError
};
}
if (!string.IsNullOrEmpty(rangeHeader))
{
var stream = await factoryFn().ConfigureAwait(false);
return new RangeRequestWriter(rangeHeader, stream, contentType, isHeadRequest, _logger)
{
OnComplete = options.OnComplete
};
}
else
{
var stream = await factoryFn().ConfigureAwait(false);
responseHeaders["Content-Length"] = stream.Length.ToString(UsCulture);
responseHeaders["Content-Length"] = stream.Length.ToString(UsCulture);
if (isHeadRequest)
{
stream.Dispose();
if (isHeadRequest)
{
stream.Dispose();
return GetHttpResult(new byte[] { }, contentType, true);
}
return GetHttpResult(new byte[] { }, contentType, true);
}
return new StreamWriter(stream, contentType, _logger)
{
OnComplete = options.OnComplete,
OnError = options.OnError
};
return new StreamWriter(stream, contentType, _logger)
{
OnComplete = options.OnComplete,
OnError = options.OnError
};
}
}
using (var stream = await factoryFn().ConfigureAwait(false))

@ -27,10 +27,11 @@ namespace Emby.Server.Implementations.HttpServer.SocketSharp
private readonly ISocketFactory _socketFactory;
private readonly ICryptoProvider _cryptoProvider;
private readonly IStreamFactory _streamFactory;
private readonly IFileSystem _fileSystem;
private readonly Func<HttpListenerContext, IHttpRequest> _httpRequestFactory;
private readonly bool _enableDualMode;
public WebSocketSharpListener(ILogger logger, ICertificate certificate, IMemoryStreamFactory memoryStreamProvider, ITextEncoding textEncoding, INetworkManager networkManager, ISocketFactory socketFactory, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, bool enableDualMode, Func<HttpListenerContext, IHttpRequest> httpRequestFactory)
public WebSocketSharpListener(ILogger logger, ICertificate certificate, IMemoryStreamFactory memoryStreamProvider, ITextEncoding textEncoding, INetworkManager networkManager, ISocketFactory socketFactory, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, bool enableDualMode, Func<HttpListenerContext, IHttpRequest> httpRequestFactory, IFileSystem fileSystem)
{
_logger = logger;
_certificate = certificate;
@ -42,6 +43,7 @@ namespace Emby.Server.Implementations.HttpServer.SocketSharp
_streamFactory = streamFactory;
_enableDualMode = enableDualMode;
_httpRequestFactory = httpRequestFactory;
_fileSystem = fileSystem;
}
public Action<Exception, IRequest, bool> ErrorHandler { get; set; }
@ -54,7 +56,7 @@ namespace Emby.Server.Implementations.HttpServer.SocketSharp
public void Start(IEnumerable<string> urlPrefixes)
{
if (_listener == null)
_listener = new HttpListener(_logger, _cryptoProvider, _streamFactory, _socketFactory, _networkManager, _textEncoding, _memoryStreamProvider);
_listener = new HttpListener(_logger, _cryptoProvider, _streamFactory, _socketFactory, _networkManager, _textEncoding, _memoryStreamProvider, _fileSystem);
_listener.EnableDualMode = _enableDualMode;

@ -3,6 +3,9 @@ using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.IO;
using MediaBrowser.Model.Logging;
using SocketHttpListener.Net;
using HttpListenerResponse = SocketHttpListener.Net.HttpListenerResponse;
@ -189,5 +192,10 @@ namespace Emby.Server.Implementations.HttpServer.SocketSharp
public void ClearCookies()
{
}
public Task TransmitFile(string path, long offset, long count, CancellationToken cancellationToken)
{
return _response.TransmitFile(path, offset, count, cancellationToken);
}
}
}

@ -1276,6 +1276,14 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
return;
}
var registration = await _liveTvManager.GetRegistrationInfo("dvr").ConfigureAwait(false);
if (!registration.IsValid)
{
_logger.Warn("Emby Premiere required to use Emby DVR.");
OnTimerOutOfDate(timer);
return;
}
var activeRecordingInfo = new ActiveRecordingInfo
{
CancellationTokenSource = new CancellationTokenSource(),
@ -2319,6 +2327,9 @@ namespace Emby.Server.Implementations.LiveTv.EmbyTV
{
UpdateExistingTimerWithNewMetadata(existingTimer, timer);
// Needed by ShouldCancelTimerForSeriesTimer
timer.IsManual = existingTimer.IsManual;
if (ShouldCancelTimerForSeriesTimer(seriesTimer, timer))
{
existingTimer.Status = RecordingStatus.Cancelled;

@ -12,45 +12,6 @@ namespace Emby.Server.Implementations.Services
{
public static class ResponseHelper
{
private static async Task<bool> WriteToOutputStream(IResponse response, object result)
{
var asyncStreamWriter = result as IAsyncStreamWriter;
if (asyncStreamWriter != null)
{
await asyncStreamWriter.WriteToAsync(response.OutputStream, CancellationToken.None).ConfigureAwait(false);
return true;
}
var streamWriter = result as IStreamWriter;
if (streamWriter != null)
{
streamWriter.WriteTo(response.OutputStream);
return true;
}
var stream = result as Stream;
if (stream != null)
{
using (stream)
{
await stream.CopyToAsync(response.OutputStream).ConfigureAwait(false);
return true;
}
}
var bytes = result as byte[];
if (bytes != null)
{
response.ContentType = "application/octet-stream";
response.SetContentLength(bytes.Length);
await response.OutputStream.WriteAsync(bytes, 0, bytes.Length).ConfigureAwait(false);
return true;
}
return false;
}
public static Task WriteToResponse(IResponse httpRes, IRequest httpReq, object result)
{
if (result == null)
@ -155,6 +116,13 @@ namespace Emby.Server.Implementations.Services
return;
}
var fileWriter = result as FileWriter;
if (fileWriter != null)
{
await fileWriter.WriteToAsync(response, CancellationToken.None).ConfigureAwait(false);
return;
}
var stream = result as Stream;
if (stream != null)
{

@ -1,7 +1,7 @@
<?xml version="1.0" encoding="utf-8"?>
<packages>
<package id="Emby.XmlTv" version="1.0.7" targetFramework="portable45-net45+win8" />
<package id="MediaBrowser.Naming" version="1.0.4" targetFramework="portable45-net45+win8" />
<package id="MediaBrowser.Naming" version="1.0.5" targetFramework="portable45-net45+win8" />
<package id="SQLitePCL.pretty" version="1.1.0" targetFramework="portable45-net45+win8" />
<package id="SQLitePCLRaw.core" version="1.1.2" targetFramework="portable45-net45+win8" />
<package id="UniversalDetector" version="1.0.1" targetFramework="portable45-net45+win8" />

@ -206,6 +206,8 @@ namespace MediaBrowser.Controller.Entities
void SetImage(ItemImageInfo image, int index);
double? GetDefaultPrimaryImageAspectRatio();
int? ProductionYear { get; set; }
}
public static class HasImagesExtensions

@ -23,21 +23,18 @@ namespace MediaBrowser.Controller.Net
public Action OnComplete { get; set; }
public Action OnError { get; set; }
public string Path { get; set; }
public FileShareMode FileShare { get; set; }
public StaticResultOptions()
{
ResponseHeaders = new Dictionary<string, string>(StringComparer.OrdinalIgnoreCase);
FileShare = FileShareMode.Read;
}
}
public class StaticFileResultOptions : StaticResultOptions
{
public string Path { get; set; }
public FileShareMode FileShare { get; set; }
public StaticFileResultOptions()
{
FileShare = FileShareMode.Read;
}
}
}

@ -1,4 +1,6 @@
using System;
using System.Threading;
using System.Threading.Tasks;
namespace MediaBrowser.Model.Net
{
@ -13,6 +15,7 @@ namespace MediaBrowser.Model.Net
void Bind(IpEndPointInfo endpoint);
void Connect(IpEndPointInfo endPoint);
void StartAccept(Action<IAcceptSocket> onAccept, Func<bool> isClosed);
Task SendFile(string path, byte[] preBuffer, byte[] postBuffer, CancellationToken cancellationToken);
}
public class SocketCreateException : Exception

@ -8,4 +8,9 @@ namespace MediaBrowser.Model.Services
{
Task WriteToAsync(Stream responseStream, CancellationToken cancellationToken);
}
public interface IFileWriter
{
Task WriteToAsync(Stream responseStream, CancellationToken cancellationToken);
}
}

@ -2,6 +2,8 @@
using System.Collections.Generic;
using System.IO;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
namespace MediaBrowser.Model.Services
{
@ -151,6 +153,7 @@ namespace MediaBrowser.Model.Services
//Add Metadata to Response
Dictionary<string, object> Items { get; }
}
Task TransmitFile(string path, long offset, long count, CancellationToken cancellationToken);
}
}

@ -1,3 +1,3 @@
using System.Reflection;
[assembly: AssemblyVersion("3.2.7.2")]
[assembly: AssemblyVersion("3.2.7.3")]

@ -32,8 +32,9 @@ namespace SocketHttpListener.Net
private readonly ISocketFactory _socketFactory;
private readonly ITextEncoding _textEncoding;
private readonly IMemoryStreamFactory _memoryStreamFactory;
private readonly IFileSystem _fileSystem;
public EndPointListener(HttpListener listener, IpAddressInfo addr, int port, bool secure, ICertificate cert, ILogger logger, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, ISocketFactory socketFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding)
public EndPointListener(HttpListener listener, IpAddressInfo addr, int port, bool secure, ICertificate cert, ILogger logger, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, ISocketFactory socketFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem)
{
this.listener = listener;
_logger = logger;
@ -42,6 +43,7 @@ namespace SocketHttpListener.Net
_socketFactory = socketFactory;
_memoryStreamFactory = memoryStreamFactory;
_textEncoding = textEncoding;
_fileSystem = fileSystem;
this.secure = secure;
this.cert = cert;
@ -107,7 +109,7 @@ namespace SocketHttpListener.Net
return;
}
HttpConnection conn = await HttpConnection.Create(_logger, accepted, listener, listener.secure, listener.cert, _cryptoProvider, _streamFactory, _memoryStreamFactory, _textEncoding).ConfigureAwait(false);
HttpConnection conn = await HttpConnection.Create(_logger, accepted, listener, listener.secure, listener.cert, _cryptoProvider, _streamFactory, _memoryStreamFactory, _textEncoding, _fileSystem).ConfigureAwait(false);
//_logger.Debug("Adding unregistered connection to {0}. Id: {1}", accepted.RemoteEndPoint, connectionId);
lock (listener.unregistered)

@ -5,6 +5,7 @@ using System.Linq;
using System.Net;
using System.Reflection;
using System.Threading.Tasks;
using MediaBrowser.Model.IO;
using MediaBrowser.Model.Logging;
using MediaBrowser.Model.Net;
using SocketHttpListener.Primitives;
@ -105,7 +106,7 @@ namespace SocketHttpListener.Net
}
else
{
epl = new EndPointListener(listener, addr, port, secure, listener.Certificate, logger, listener.CryptoProvider, listener.StreamFactory, listener.SocketFactory, listener.MemoryStreamFactory, listener.TextEncoding);
epl = new EndPointListener(listener, addr, port, secure, listener.Certificate, logger, listener.CryptoProvider, listener.StreamFactory, listener.SocketFactory, listener.MemoryStreamFactory, listener.TextEncoding, listener.FileSystem);
p[port] = epl;
}

@ -35,13 +35,14 @@ namespace SocketHttpListener.Net
ICertificate cert;
Stream ssl_stream;
private ILogger _logger;
private readonly ILogger _logger;
private readonly ICryptoProvider _cryptoProvider;
private readonly IMemoryStreamFactory _memoryStreamFactory;
private readonly ITextEncoding _textEncoding;
private readonly IStreamFactory _streamFactory;
private readonly IFileSystem _fileSystem;
private HttpConnection(ILogger logger, IAcceptSocket sock, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding)
private HttpConnection(ILogger logger, IAcceptSocket sock, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem)
{
_logger = logger;
this.sock = sock;
@ -51,6 +52,7 @@ namespace SocketHttpListener.Net
_cryptoProvider = cryptoProvider;
_memoryStreamFactory = memoryStreamFactory;
_textEncoding = textEncoding;
_fileSystem = fileSystem;
_streamFactory = streamFactory;
}
@ -82,9 +84,9 @@ namespace SocketHttpListener.Net
Init();
}
public static async Task<HttpConnection> Create(ILogger logger, IAcceptSocket sock, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding)
public static async Task<HttpConnection> Create(ILogger logger, IAcceptSocket sock, EndPointListener epl, bool secure, ICertificate cert, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem)
{
var connection = new HttpConnection(logger, sock, epl, secure, cert, cryptoProvider, streamFactory, memoryStreamFactory, textEncoding);
var connection = new HttpConnection(logger, sock, epl, secure, cert, cryptoProvider, streamFactory, memoryStreamFactory, textEncoding, fileSystem);
await connection.InitStream().ConfigureAwait(false);
@ -121,7 +123,7 @@ namespace SocketHttpListener.Net
position = 0;
input_state = InputState.RequestLine;
line_state = LineState.None;
context = new HttpListenerContext(this, _logger, _cryptoProvider, _memoryStreamFactory, _textEncoding);
context = new HttpListenerContext(this, _logger, _cryptoProvider, _memoryStreamFactory, _textEncoding, _fileSystem);
}
public bool IsClosed
@ -213,7 +215,9 @@ namespace SocketHttpListener.Net
if (context.Response.SendChunked || isExpect100Continue || context.Request.IsWebSocketRequest || true)
{
o_stream = new ResponseStream(stream, context.Response, _memoryStreamFactory, _textEncoding);
var supportsDirectSocketAccess = !context.Response.SendChunked && !isExpect100Continue && !secure;
o_stream = new ResponseStream(stream, context.Response, _memoryStreamFactory, _textEncoding, _fileSystem, sock, supportsDirectSocketAccess);
}
else
{

@ -18,6 +18,7 @@ namespace SocketHttpListener.Net
internal ICryptoProvider CryptoProvider { get; private set; }
internal IStreamFactory StreamFactory { get; private set; }
internal ISocketFactory SocketFactory { get; private set; }
internal IFileSystem FileSystem { get; private set; }
internal ITextEncoding TextEncoding { get; private set; }
internal IMemoryStreamFactory MemoryStreamFactory { get; private set; }
internal INetworkManager NetworkManager { get; private set; }
@ -39,7 +40,7 @@ namespace SocketHttpListener.Net
public Action<HttpListenerContext> OnContext { get; set; }
public HttpListener(ILogger logger, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, ISocketFactory socketFactory, INetworkManager networkManager, ITextEncoding textEncoding, IMemoryStreamFactory memoryStreamFactory)
public HttpListener(ILogger logger, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, ISocketFactory socketFactory, INetworkManager networkManager, ITextEncoding textEncoding, IMemoryStreamFactory memoryStreamFactory, IFileSystem fileSystem)
{
_logger = logger;
CryptoProvider = cryptoProvider;
@ -48,19 +49,20 @@ namespace SocketHttpListener.Net
NetworkManager = networkManager;
TextEncoding = textEncoding;
MemoryStreamFactory = memoryStreamFactory;
FileSystem = fileSystem;
prefixes = new HttpListenerPrefixCollection(logger, this);
registry = new Dictionary<HttpListenerContext, HttpListenerContext>();
connections = new Dictionary<HttpConnection, HttpConnection>();
auth_schemes = AuthenticationSchemes.Anonymous;
}
public HttpListener(ICertificate certificate, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, ISocketFactory socketFactory, INetworkManager networkManager, ITextEncoding textEncoding, IMemoryStreamFactory memoryStreamFactory)
:this(new NullLogger(), certificate, cryptoProvider, streamFactory, socketFactory, networkManager, textEncoding, memoryStreamFactory)
public HttpListener(ICertificate certificate, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, ISocketFactory socketFactory, INetworkManager networkManager, ITextEncoding textEncoding, IMemoryStreamFactory memoryStreamFactory, IFileSystem fileSystem)
:this(new NullLogger(), certificate, cryptoProvider, streamFactory, socketFactory, networkManager, textEncoding, memoryStreamFactory, fileSystem)
{
}
public HttpListener(ILogger logger, ICertificate certificate, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, ISocketFactory socketFactory, INetworkManager networkManager, ITextEncoding textEncoding, IMemoryStreamFactory memoryStreamFactory)
: this(logger, cryptoProvider, streamFactory, socketFactory, networkManager, textEncoding, memoryStreamFactory)
public HttpListener(ILogger logger, ICertificate certificate, ICryptoProvider cryptoProvider, IStreamFactory streamFactory, ISocketFactory socketFactory, INetworkManager networkManager, ITextEncoding textEncoding, IMemoryStreamFactory memoryStreamFactory, IFileSystem fileSystem)
: this(logger, cryptoProvider, streamFactory, socketFactory, networkManager, textEncoding, memoryStreamFactory, fileSystem)
{
_certificate = certificate;
}

@ -18,20 +18,18 @@ namespace SocketHttpListener.Net
HttpConnection cnc;
string error;
int err_status = 400;
private readonly ILogger _logger;
private readonly ICryptoProvider _cryptoProvider;
private readonly IMemoryStreamFactory _memoryStreamFactory;
private readonly ITextEncoding _textEncoding;
internal HttpListenerContext(HttpConnection cnc, ILogger logger, ICryptoProvider cryptoProvider, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding)
internal HttpListenerContext(HttpConnection cnc, ILogger logger, ICryptoProvider cryptoProvider, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem)
{
this.cnc = cnc;
_logger = logger;
_cryptoProvider = cryptoProvider;
_memoryStreamFactory = memoryStreamFactory;
_textEncoding = textEncoding;
request = new HttpListenerRequest(this, _textEncoding);
response = new HttpListenerResponse(this, _logger, _textEncoding);
response = new HttpListenerResponse(this, logger, _textEncoding, fileSystem);
}
internal int ErrorStatus

@ -3,6 +3,9 @@ using System.Globalization;
using System.IO;
using System.Net;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.IO;
using MediaBrowser.Model.Logging;
using MediaBrowser.Model.Text;
using SocketHttpListener.Primitives;
@ -32,12 +35,14 @@ namespace SocketHttpListener.Net
private readonly ILogger _logger;
private readonly ITextEncoding _textEncoding;
private readonly IFileSystem _fileSystem;
internal HttpListenerResponse(HttpListenerContext context, ILogger logger, ITextEncoding textEncoding)
internal HttpListenerResponse(HttpListenerContext context, ILogger logger, ITextEncoding textEncoding, IFileSystem fileSystem)
{
this.context = context;
_logger = logger;
_textEncoding = textEncoding;
_fileSystem = fileSystem;
}
internal bool CloseConnection
@ -366,7 +371,7 @@ namespace SocketHttpListener.Net
{
if (chunked)
{
return ;
return;
}
Version v = context.Request.ProtocolVersion;
@ -509,5 +514,10 @@ namespace SocketHttpListener.Net
cookies.Add(cookie);
}
public Task TransmitFile(string path, long offset, long count, CancellationToken cancellationToken)
{
return ((ResponseStream)OutputStream).TransmitFile(path, offset, count, cancellationToken);
}
}
}

@ -5,6 +5,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using MediaBrowser.Model.IO;
using MediaBrowser.Model.Net;
using MediaBrowser.Model.Text;
using SocketHttpListener.Primitives;
@ -22,12 +23,18 @@ namespace SocketHttpListener.Net
Stream stream;
private readonly IMemoryStreamFactory _memoryStreamFactory;
private readonly ITextEncoding _textEncoding;
private readonly IFileSystem _fileSystem;
private readonly IAcceptSocket _socket;
private readonly bool _supportsDirectSocketAccess;
internal ResponseStream(Stream stream, HttpListenerResponse response, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding)
internal ResponseStream(Stream stream, HttpListenerResponse response, IMemoryStreamFactory memoryStreamFactory, ITextEncoding textEncoding, IFileSystem fileSystem, IAcceptSocket socket, bool supportsDirectSocketAccess)
{
this.response = response;
_memoryStreamFactory = memoryStreamFactory;
_textEncoding = textEncoding;
_fileSystem = fileSystem;
_socket = socket;
_supportsDirectSocketAccess = supportsDirectSocketAccess;
this.stream = stream;
}
@ -299,5 +306,80 @@ namespace SocketHttpListener.Net
{
throw new NotSupportedException();
}
public Task TransmitFile(string path, long offset, long count, CancellationToken cancellationToken)
{
//if (_supportsDirectSocketAccess && offset == 0 && count == 0 && !response.SendChunked)
//{
// return TransmitFileOverSocket(path, offset, count, cancellationToken);
//}
return TransmitFileManaged(path, offset, count, cancellationToken);
}
private readonly byte[] _emptyBuffer = new byte[] { };
private async Task TransmitFileOverSocket(string path, long offset, long count, CancellationToken cancellationToken)
{
MemoryStream ms = GetHeaders(response, _memoryStreamFactory, false);
var buffer = new byte[] {};
if (ms != null)
{
ms.Position = 0;
byte[] msBuffer;
_memoryStreamFactory.TryGetBuffer(ms, out msBuffer);
buffer = msBuffer;
}
await _socket.SendFile(path, buffer, _emptyBuffer, cancellationToken).ConfigureAwait(false);
}
private async Task TransmitFileManaged(string path, long offset, long count, CancellationToken cancellationToken)
{
var chunked = response.SendChunked;
if (!chunked)
{
await WriteAsync(_emptyBuffer, 0, 0, cancellationToken).ConfigureAwait(false);
}
using (var fs = _fileSystem.GetFileStream(path, FileOpenMode.Open, FileAccessMode.Read, FileShareMode.Read, true))
{
if (offset > 0)
{
fs.Position = offset;
}
var targetStream = chunked ? this : stream;
if (count > 0)
{
await CopyToInternalAsync(fs, targetStream, count, cancellationToken).ConfigureAwait(false);
}
else
{
await fs.CopyToAsync(targetStream, 81920, cancellationToken).ConfigureAwait(false);
}
}
}
private async Task CopyToInternalAsync(Stream source, Stream destination, long copyLength, CancellationToken cancellationToken)
{
var array = new byte[81920];
int count;
while ((count = await source.ReadAsync(array, 0, array.Length, cancellationToken).ConfigureAwait(false)) != 0)
{
var bytesToCopy = Math.Min(count, copyLength);
await destination.WriteAsync(array, 0, Convert.ToInt32(bytesToCopy), cancellationToken).ConfigureAwait(false);
copyLength -= bytesToCopy;
if (copyLength <= 0)
{
break;
}
}
}
}
}

Loading…
Cancel
Save