diff --git a/Emby.Server.Implementations/Emby.Server.Implementations.csproj b/Emby.Server.Implementations/Emby.Server.Implementations.csproj index 719510fc39..bcda149d63 100644 --- a/Emby.Server.Implementations/Emby.Server.Implementations.csproj +++ b/Emby.Server.Implementations/Emby.Server.Implementations.csproj @@ -129,7 +129,6 @@ - diff --git a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs b/Emby.Server.Implementations/IO/AsyncStreamCopier.cs deleted file mode 100644 index 9e5ce0604c..0000000000 --- a/Emby.Server.Implementations/IO/AsyncStreamCopier.cs +++ /dev/null @@ -1,459 +0,0 @@ -using System; -using System.IO; -using System.Threading; -using System.Threading.Tasks; - -namespace Emby.Server.Implementations.IO -{ - public class AsyncStreamCopier : IDisposable - { - // size in bytes of the buffers in the buffer pool - private const int DefaultBufferSize = 81920; - private readonly int _bufferSize; - // number of buffers in the pool - private const int DefaultBufferCount = 4; - private readonly int _bufferCount; - - // indexes of the next buffer to read into/write from - private int _nextReadBuffer = -1; - private int _nextWriteBuffer = -1; - - // the buffer pool, implemented as an array, and used in a cyclic way - private readonly byte[][] _buffers; - // sizes in bytes of the available (read) data in the buffers - private readonly int[] _sizes; - // the streams... - private Stream _source; - private Stream _target; - private readonly bool _closeStreamsOnEnd; - - // number of buffers that are ready to be written - private int _buffersToWrite; - // flag indicating that there is still a read operation to be scheduled - // (source end of stream not reached) - private volatile bool _moreDataToRead; - // the result of the whole operation, returned by BeginCopy() - private AsyncResult _asyncResult; - // any exception that occurs during an async operation - // stored here for rethrow - private Exception _exception; - - public TaskCompletionSource TaskCompletionSource; - private long _bytesToRead; - private long _totalBytesWritten; - private CancellationToken _cancellationToken; - public int IndividualReadOffset = 0; - - public AsyncStreamCopier(Stream source, - Stream target, - long bytesToRead, - CancellationToken cancellationToken, - bool closeStreamsOnEnd = false, - int bufferSize = DefaultBufferSize, - int bufferCount = DefaultBufferCount) - { - if (source == null) - throw new ArgumentNullException("source"); - if (target == null) - throw new ArgumentNullException("target"); - if (!source.CanRead) - throw new ArgumentException("Cannot copy from a non-readable stream."); - if (!target.CanWrite) - throw new ArgumentException("Cannot copy to a non-writable stream."); - _source = source; - _target = target; - _moreDataToRead = true; - _closeStreamsOnEnd = closeStreamsOnEnd; - _bufferSize = bufferSize; - _bufferCount = bufferCount; - _buffers = new byte[_bufferCount][]; - _sizes = new int[_bufferCount]; - _bytesToRead = bytesToRead; - _cancellationToken = cancellationToken; - } - - ~AsyncStreamCopier() - { - // ensure any exception cannot be ignored - ThrowExceptionIfNeeded(); - } - - public static Task CopyStream(Stream source, Stream target, int bufferSize, int bufferCount, CancellationToken cancellationToken) - { - return CopyStream(source, target, 0, bufferSize, bufferCount, cancellationToken); - } - - public static Task CopyStream(Stream source, Stream target, long size, int bufferSize, int bufferCount, CancellationToken cancellationToken) - { - var copier = new AsyncStreamCopier(source, target, size, cancellationToken, false, bufferSize, bufferCount); - var taskCompletion = new TaskCompletionSource(); - - copier.TaskCompletionSource = taskCompletion; - - var result = copier.BeginCopy(StreamCopyCallback, copier); - - if (result.CompletedSynchronously) - { - StreamCopyCallback(result); - } - - cancellationToken.Register(() => taskCompletion.TrySetCanceled()); - - return taskCompletion.Task; - } - - private static void StreamCopyCallback(IAsyncResult result) - { - var copier = (AsyncStreamCopier)result.AsyncState; - var taskCompletion = copier.TaskCompletionSource; - - try - { - copier.EndCopy(result); - taskCompletion.TrySetResult(copier._totalBytesWritten); - } - catch (Exception ex) - { - taskCompletion.TrySetException(ex); - } - } - - public void Dispose() - { - if (_asyncResult != null) - _asyncResult.Dispose(); - if (_closeStreamsOnEnd) - { - if (_source != null) - { - _source.Dispose(); - _source = null; - } - if (_target != null) - { - _target.Dispose(); - _target = null; - } - } - GC.SuppressFinalize(this); - ThrowExceptionIfNeeded(); - } - - public IAsyncResult BeginCopy(AsyncCallback callback, object state) - { - // avoid concurrent start of the copy on separate threads - if (Interlocked.CompareExchange(ref _asyncResult, new AsyncResult(callback, state), null) != null) - throw new InvalidOperationException("A copy operation has already been started on this object."); - // allocate buffers - for (int i = 0; i < _bufferCount; i++) - _buffers[i] = new byte[_bufferSize]; - - // we pass false to BeginRead() to avoid completing the async result - // immediately which would result in invoking the callback - // when the method fails synchronously - BeginRead(false); - // throw exception synchronously if there is one - ThrowExceptionIfNeeded(); - return _asyncResult; - } - - public void EndCopy(IAsyncResult ar) - { - if (ar != _asyncResult) - throw new InvalidOperationException("Invalid IAsyncResult object."); - - if (!_asyncResult.IsCompleted) - _asyncResult.AsyncWaitHandle.WaitOne(); - - if (_closeStreamsOnEnd) - { - _source.Close(); - _source = null; - _target.Close(); - _target = null; - } - - //_logger.Info("AsyncStreamCopier {0} bytes requested. {1} bytes transferred", _bytesToRead, _totalBytesWritten); - ThrowExceptionIfNeeded(); - } - - /// - /// Here we'll throw a pending exception if there is one, - /// and remove it from our instance, so we know it has been consumed. - /// - private void ThrowExceptionIfNeeded() - { - if (_exception != null) - { - var exception = _exception; - _exception = null; - throw exception; - } - } - - private void BeginRead(bool completeOnError = true) - { - if (!_moreDataToRead) - { - return; - } - if (_asyncResult.IsCompleted) - return; - int bufferIndex = Interlocked.Increment(ref _nextReadBuffer) % _bufferCount; - - try - { - _source.BeginRead(_buffers[bufferIndex], 0, _bufferSize, EndRead, bufferIndex); - } - catch (Exception exception) - { - _exception = exception; - if (completeOnError) - _asyncResult.Complete(false); - } - } - - private void BeginWrite() - { - if (_asyncResult.IsCompleted) - return; - // this method can actually be called concurrently!! - // indeed, let's say we call a BeginWrite, and the thread gets interrupted - // just after making the IO request. - // At that moment, the thread is still in the method. And then the IO request - // ends (extremely fast io, or caching...), EndWrite gets called - // on another thread, and calls BeginWrite again! There we have it! - // That is the reason why an Interlocked is needed here. - int bufferIndex = Interlocked.Increment(ref _nextWriteBuffer) % _bufferCount; - - try - { - int bytesToWrite; - if (_bytesToRead > 0) - { - var bytesLeftToWrite = _bytesToRead - _totalBytesWritten; - bytesToWrite = Convert.ToInt32(Math.Min(_sizes[bufferIndex], bytesLeftToWrite)); - } - else - { - bytesToWrite = _sizes[bufferIndex]; - } - - _target.BeginWrite(_buffers[bufferIndex], IndividualReadOffset, bytesToWrite - IndividualReadOffset, EndWrite, null); - - _totalBytesWritten += bytesToWrite; - } - catch (Exception exception) - { - _exception = exception; - _asyncResult.Complete(false); - } - } - - private void EndRead(IAsyncResult ar) - { - try - { - int read = _source.EndRead(ar); - _moreDataToRead = read > 0; - var bufferIndex = (int)ar.AsyncState; - _sizes[bufferIndex] = read; - } - catch (Exception exception) - { - _exception = exception; - _asyncResult.Complete(false); - return; - } - - if (_moreDataToRead && !_cancellationToken.IsCancellationRequested) - { - int usedBuffers = Interlocked.Increment(ref _buffersToWrite); - // if we incremented from zero to one, then it means we just - // added the single buffer to write, so a writer could not - // be busy, and we have to schedule one. - if (usedBuffers == 1) - BeginWrite(); - // test if there is at least a free buffer, and schedule - // a read, as we have read some data - if (usedBuffers < _bufferCount) - BeginRead(); - } - else - { - // we did not add a buffer, because no data was read, and - // there is no buffer left to write so this is the end... - if (Thread.VolatileRead(ref _buffersToWrite) == 0) - { - _asyncResult.Complete(false); - } - } - } - - private void EndWrite(IAsyncResult ar) - { - try - { - _target.EndWrite(ar); - } - catch (Exception exception) - { - _exception = exception; - _asyncResult.Complete(false); - return; - } - - int buffersLeftToWrite = Interlocked.Decrement(ref _buffersToWrite); - // no reader could be active if all buffers were full of data waiting to be written - bool noReaderIsBusy = buffersLeftToWrite == _bufferCount - 1; - // note that it is possible that both a reader and - // a writer see the end of the copy and call Complete - // on the _asyncResult object. That race condition is handled by - // Complete that ensures it is only executed fully once. - - long bytesLeftToWrite; - if (_bytesToRead > 0) - { - bytesLeftToWrite = _bytesToRead - _totalBytesWritten; - } - else - { - bytesLeftToWrite = 1; - } - - if (!_moreDataToRead || bytesLeftToWrite <= 0 || _cancellationToken.IsCancellationRequested) - { - // at this point we know no reader can schedule a read or write - if (Thread.VolatileRead(ref _buffersToWrite) == 0) - { - // nothing left to write, so it is the end - _asyncResult.Complete(false); - return; - } - } - else - // here, we know we have something left to read, - // so schedule a read if no read is busy - if (noReaderIsBusy) - BeginRead(); - - // also schedule a write if we are sure we did not write the last buffer - // note that if buffersLeftToWrite is zero and a reader has put another - // buffer to write between the time we decremented _buffersToWrite - // and now, that reader will also schedule another write, - // as it will increment _buffersToWrite from zero to one - if (buffersLeftToWrite > 0) - BeginWrite(); - } - } - - internal class AsyncResult : IAsyncResult, IDisposable - { - // Fields set at construction which never change while - // operation is pending - private readonly AsyncCallback _asyncCallback; - private readonly object _asyncState; - - // Fields set at construction which do change after - // operation completes - private const int StatePending = 0; - private const int StateCompletedSynchronously = 1; - private const int StateCompletedAsynchronously = 2; - private int _completedState = StatePending; - - // Field that may or may not get set depending on usage - private ManualResetEvent _waitHandle; - - internal AsyncResult( - AsyncCallback asyncCallback, - object state) - { - _asyncCallback = asyncCallback; - _asyncState = state; - } - - internal bool Complete(bool completedSynchronously) - { - bool result = false; - - // The _completedState field MUST be set prior calling the callback - int prevState = Interlocked.CompareExchange(ref _completedState, - completedSynchronously ? StateCompletedSynchronously : - StateCompletedAsynchronously, StatePending); - if (prevState == StatePending) - { - // If the event exists, set it - if (_waitHandle != null) - _waitHandle.Set(); - - if (_asyncCallback != null) - _asyncCallback(this); - - result = true; - } - - return result; - } - - #region Implementation of IAsyncResult - - public Object AsyncState { get { return _asyncState; } } - - public bool CompletedSynchronously - { - get - { - return Thread.VolatileRead(ref _completedState) == - StateCompletedSynchronously; - } - } - - public WaitHandle AsyncWaitHandle - { - get - { - if (_waitHandle == null) - { - bool done = IsCompleted; - var mre = new ManualResetEvent(done); - if (Interlocked.CompareExchange(ref _waitHandle, - mre, null) != null) - { - // Another thread created this object's event; dispose - // the event we just created - mre.Close(); - } - else - { - if (!done && IsCompleted) - { - // If the operation wasn't done when we created - // the event but now it is done, set the event - _waitHandle.Set(); - } - } - } - return _waitHandle; - } - } - - public bool IsCompleted - { - get - { - return Thread.VolatileRead(ref _completedState) != - StatePending; - } - } - #endregion - - public void Dispose() - { - if (_waitHandle != null) - { - _waitHandle.Dispose(); - _waitHandle = null; - } - } - } -} diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs index d2e9c8bf02..84adf0cfc9 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunHttpStream.cs @@ -158,58 +158,7 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun public Task CopyToAsync(Stream stream, CancellationToken cancellationToken) { - if (_enableFileBuffer) - { - return CopyFileTo(_tempFilePath, stream, cancellationToken); - } return _multicastStream.CopyToAsync(stream, cancellationToken); - //return CopyFileTo(_tempFilePath, stream, cancellationToken); - } - - protected async Task CopyFileTo(string path, Stream outputStream, CancellationToken cancellationToken) - { - long startPosition = -20000; - if (startPosition < 0) - { - var length = FileSystem.GetFileInfo(path).Length; - startPosition = Math.Max(length - startPosition, 0); - } - - _logger.Info("Live stream starting position is {0} bytes", startPosition.ToString(CultureInfo.InvariantCulture)); - - var allowAsync = Environment.OperatingSystem != MediaBrowser.Model.System.OperatingSystem.Windows; - // use non-async filestream along with read due to https://github.com/dotnet/corefx/issues/6039 - - using (var inputStream = GetInputStream(path, startPosition, allowAsync)) - { - if (startPosition > 0) - { - inputStream.Position = startPosition; - } - - while (!cancellationToken.IsCancellationRequested) - { - long bytesRead; - - if (allowAsync) - { - bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 2, cancellationToken).ConfigureAwait(false); - } - else - { - StreamHelper.CopyTo(inputStream, outputStream, 81920, cancellationToken); - bytesRead = 1; - } - - //var position = fs.Position; - //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); - - if (bytesRead == 0) - { - await Task.Delay(100, cancellationToken).ConfigureAwait(false); - } - } - } } } } diff --git a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs index 5ad6e2e161..69fe59b4a0 100644 --- a/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs +++ b/Emby.Server.Implementations/LiveTv/TunerHosts/HdHomerun/HdHomerunUdpStream.cs @@ -211,15 +211,8 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun { long bytesRead; - if (allowAsync) - { - bytesRead = await AsyncStreamCopier.CopyStream(inputStream, outputStream, 81920, 2, cancellationToken).ConfigureAwait(false); - } - else - { - StreamHelper.CopyTo(inputStream, outputStream, 81920, cancellationToken); - bytesRead = 1; - } + StreamHelper.CopyTo(inputStream, outputStream, 81920, cancellationToken); + bytesRead = 1; //var position = fs.Position; //_logger.Debug("Streamed {0} bytes to position {1} from file {2}", bytesRead, position, path); @@ -285,22 +278,6 @@ namespace Emby.Server.Implementations.LiveTv.TunerHosts.HdHomerun //return taskCompletion.Task; } - private void StreamCopyCallback(IAsyncResult result) - { - var copier = (AsyncStreamCopier)result.AsyncState; - var taskCompletion = copier.TaskCompletionSource; - - try - { - copier.EndCopy(result); - taskCompletion.TrySetResult(0); - } - catch (Exception ex) - { - taskCompletion.TrySetException(ex); - } - } - public class UdpClientStream : Stream { private static int RtpHeaderBytes = 12;