Merge pull request #10801 from MarkCiliaVincenti/AsyncKeyedLock-migration

AsyncKeyedLock migration
pull/10955/head
Claus Vium 4 months ago committed by GitHub
commit 1e955c4347
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194

@ -78,6 +78,7 @@
- [Marenz](https://github.com/Marenz)
- [marius-luca-87](https://github.com/marius-luca-87)
- [mark-monteiro](https://github.com/mark-monteiro)
- [MarkCiliaVincenti](https://github.com/MarkCiliaVincenti)
- [Matt07211](https://github.com/Matt07211)
- [Maxr1998](https://github.com/Maxr1998)
- [mcarlton00](https://github.com/mcarlton00)

@ -4,6 +4,7 @@
</PropertyGroup>
<!-- Run "dotnet list package (dash,dash)outdated" to see the latest versions of each package.-->
<ItemGroup Label="Package Dependencies">
<PackageVersion Include="AsyncKeyedLock" Version="6.3.4" />
<PackageVersion Include="AutoFixture.AutoMoq" Version="4.18.1" />
<PackageVersion Include="AutoFixture.Xunit2" Version="4.18.1" />
<PackageVersion Include="AutoFixture" Version="4.18.1" />

@ -11,6 +11,7 @@ using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using AsyncKeyedLock;
using Jellyfin.Data.Entities;
using Jellyfin.Data.Enums;
using Jellyfin.Extensions;
@ -52,7 +53,7 @@ namespace Emby.Server.Implementations.Library
private readonly IDirectoryService _directoryService;
private readonly ConcurrentDictionary<string, ILiveStream> _openStreams = new ConcurrentDictionary<string, ILiveStream>(StringComparer.OrdinalIgnoreCase);
private readonly SemaphoreSlim _liveStreamSemaphore = new SemaphoreSlim(1, 1);
private readonly AsyncNonKeyedLocker _liveStreamLocker = new(1);
private readonly JsonSerializerOptions _jsonOptions = JsonDefaults.Options;
private IMediaSourceProvider[] _providers;
@ -468,12 +469,10 @@ namespace Emby.Server.Implementations.Library
public async Task<Tuple<LiveStreamResponse, IDirectStreamProvider>> OpenLiveStreamInternal(LiveStreamRequest request, CancellationToken cancellationToken)
{
await _liveStreamSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
MediaSourceInfo mediaSource;
ILiveStream liveStream;
try
using (await _liveStreamLocker.LockAsync(cancellationToken).ConfigureAwait(false))
{
var (provider, keyId) = GetProvider(request.OpenToken);
@ -493,10 +492,6 @@ namespace Emby.Server.Implementations.Library
_openStreams[mediaSource.LiveStreamId] = liveStream;
}
finally
{
_liveStreamSemaphore.Release();
}
try
{
@ -837,9 +832,7 @@ namespace Emby.Server.Implementations.Library
{
ArgumentException.ThrowIfNullOrEmpty(id);
await _liveStreamSemaphore.WaitAsync().ConfigureAwait(false);
try
using (await _liveStreamLocker.LockAsync().ConfigureAwait(false))
{
if (_openStreams.TryGetValue(id, out ILiveStream liveStream))
{
@ -858,10 +851,6 @@ namespace Emby.Server.Implementations.Library
}
}
}
finally
{
_liveStreamSemaphore.Release();
}
}
private (IMediaSourceProvider MediaSourceProvider, string KeyId) GetProvider(string key)
@ -898,7 +887,7 @@ namespace Emby.Server.Implementations.Library
CloseLiveStream(key).GetAwaiter().GetResult();
}
_liveStreamSemaphore.Dispose();
_liveStreamLocker.Dispose();
}
}
}

@ -294,9 +294,7 @@ public class DynamicHlsController : BaseJellyfinApiController
if (!System.IO.File.Exists(playlistPath))
{
var transcodingLock = _transcodeManager.GetTranscodingLock(playlistPath);
await transcodingLock.WaitAsync(cancellationToken).ConfigureAwait(false);
try
using (await _transcodeManager.LockAsync(playlistPath, cancellationToken).ConfigureAwait(false))
{
if (!System.IO.File.Exists(playlistPath))
{
@ -326,10 +324,6 @@ public class DynamicHlsController : BaseJellyfinApiController
}
}
}
finally
{
transcodingLock.Release();
}
}
job ??= _transcodeManager.OnTranscodeBeginRequest(playlistPath, TranscodingJobType);
@ -1442,95 +1436,80 @@ public class DynamicHlsController : BaseJellyfinApiController
return await GetSegmentResult(state, playlistPath, segmentPath, segmentExtension, segmentId, job, cancellationToken).ConfigureAwait(false);
}
var transcodingLock = _transcodeManager.GetTranscodingLock(playlistPath);
await transcodingLock.WaitAsync(cancellationToken).ConfigureAwait(false);
var released = false;
var startTranscoding = false;
try
using (await _transcodeManager.LockAsync(playlistPath, cancellationToken).ConfigureAwait(false))
{
var startTranscoding = false;
if (System.IO.File.Exists(segmentPath))
{
job = _transcodeManager.OnTranscodeBeginRequest(playlistPath, TranscodingJobType);
transcodingLock.Release();
released = true;
_logger.LogDebug("returning {0} [it exists, try 2]", segmentPath);
return await GetSegmentResult(state, playlistPath, segmentPath, segmentExtension, segmentId, job, cancellationToken).ConfigureAwait(false);
}
else
{
var currentTranscodingIndex = GetCurrentTranscodingIndex(playlistPath, segmentExtension);
var segmentGapRequiringTranscodingChange = 24 / state.SegmentLength;
if (segmentId == -1)
{
_logger.LogDebug("Starting transcoding because fmp4 init file is being requested");
startTranscoding = true;
segmentId = 0;
}
else if (currentTranscodingIndex is null)
{
_logger.LogDebug("Starting transcoding because currentTranscodingIndex=null");
startTranscoding = true;
}
else if (segmentId < currentTranscodingIndex.Value)
{
_logger.LogDebug("Starting transcoding because requestedIndex={0} and currentTranscodingIndex={1}", segmentId, currentTranscodingIndex);
startTranscoding = true;
}
else if (segmentId - currentTranscodingIndex.Value > segmentGapRequiringTranscodingChange)
{
_logger.LogDebug("Starting transcoding because segmentGap is {0} and max allowed gap is {1}. requestedIndex={2}", segmentId - currentTranscodingIndex.Value, segmentGapRequiringTranscodingChange, segmentId);
startTranscoding = true;
}
if (startTranscoding)
{
// If the playlist doesn't already exist, startup ffmpeg
try
{
await _transcodeManager.KillTranscodingJobs(streamingRequest.DeviceId, streamingRequest.PlaySessionId, p => false)
.ConfigureAwait(false);
var currentTranscodingIndex = GetCurrentTranscodingIndex(playlistPath, segmentExtension);
var segmentGapRequiringTranscodingChange = 24 / state.SegmentLength;
if (currentTranscodingIndex.HasValue)
{
DeleteLastFile(playlistPath, segmentExtension, 0);
}
if (segmentId == -1)
{
_logger.LogDebug("Starting transcoding because fmp4 init file is being requested");
startTranscoding = true;
segmentId = 0;
}
else if (currentTranscodingIndex is null)
{
_logger.LogDebug("Starting transcoding because currentTranscodingIndex=null");
startTranscoding = true;
}
else if (segmentId < currentTranscodingIndex.Value)
{
_logger.LogDebug("Starting transcoding because requestedIndex={0} and currentTranscodingIndex={1}", segmentId, currentTranscodingIndex);
startTranscoding = true;
}
else if (segmentId - currentTranscodingIndex.Value > segmentGapRequiringTranscodingChange)
{
_logger.LogDebug("Starting transcoding because segmentGap is {0} and max allowed gap is {1}. requestedIndex={2}", segmentId - currentTranscodingIndex.Value, segmentGapRequiringTranscodingChange, segmentId);
startTranscoding = true;
}
streamingRequest.StartTimeTicks = streamingRequest.CurrentRuntimeTicks;
if (startTranscoding)
{
// If the playlist doesn't already exist, startup ffmpeg
try
{
await _transcodeManager.KillTranscodingJobs(streamingRequest.DeviceId, streamingRequest.PlaySessionId, p => false)
.ConfigureAwait(false);
state.WaitForPath = segmentPath;
job = await _transcodeManager.StartFfMpeg(
state,
playlistPath,
GetCommandLineArguments(playlistPath, state, false, segmentId),
Request.HttpContext.User.GetUserId(),
TranscodingJobType,
cancellationTokenSource).ConfigureAwait(false);
}
catch
if (currentTranscodingIndex.HasValue)
{
state.Dispose();
throw;
DeleteLastFile(playlistPath, segmentExtension, 0);
}
// await WaitForMinimumSegmentCount(playlistPath, 1, cancellationTokenSource.Token).ConfigureAwait(false);
streamingRequest.StartTimeTicks = streamingRequest.CurrentRuntimeTicks;
state.WaitForPath = segmentPath;
job = await _transcodeManager.StartFfMpeg(
state,
playlistPath,
GetCommandLineArguments(playlistPath, state, false, segmentId),
Request.HttpContext.User.GetUserId(),
TranscodingJobType,
cancellationTokenSource).ConfigureAwait(false);
}
else
catch
{
job = _transcodeManager.OnTranscodeBeginRequest(playlistPath, TranscodingJobType);
if (job?.TranscodingThrottler is not null)
{
await job.TranscodingThrottler.UnpauseTranscoding().ConfigureAwait(false);
}
state.Dispose();
throw;
}
// await WaitForMinimumSegmentCount(playlistPath, 1, cancellationTokenSource.Token).ConfigureAwait(false);
}
}
finally
{
if (!released)
else
{
transcodingLock.Release();
job = _transcodeManager.OnTranscodeBeginRequest(playlistPath, TranscodingJobType);
if (job?.TranscodingThrottler is not null)
{
await job.TranscodingThrottler.UnpauseTranscoding().ConfigureAwait(false);
}
}
}

@ -93,9 +93,7 @@ public static class FileStreamResponseHelpers
return new OkResult();
}
var transcodingLock = transcodeManager.GetTranscodingLock(outputPath);
await transcodingLock.WaitAsync(cancellationTokenSource.Token).ConfigureAwait(false);
try
using (await transcodeManager.LockAsync(outputPath, cancellationTokenSource.Token).ConfigureAwait(false))
{
TranscodingJob? job;
if (!File.Exists(outputPath))
@ -117,9 +115,5 @@ public static class FileStreamResponseHelpers
var stream = new ProgressiveFileStream(outputPath, job, transcodeManager);
return new FileStreamResult(stream, contentType);
}
finally
{
transcodingLock.Release();
}
}
}

@ -26,6 +26,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="AsyncKeyedLock" />
<PackageReference Include="EFCoreSecondLevelCacheInterceptor" />
<PackageReference Include="System.Linq.Async" />
<PackageReference Include="Microsoft.EntityFrameworkCore.Sqlite" />

@ -6,6 +6,7 @@ using System.Linq;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using AsyncKeyedLock;
using Jellyfin.Data.Entities;
using MediaBrowser.Common.Configuration;
using MediaBrowser.Controller.Configuration;
@ -37,7 +38,7 @@ public class TrickplayManager : ITrickplayManager
private readonly IDbContextFactory<JellyfinDbContext> _dbProvider;
private readonly IApplicationPaths _appPaths;
private static readonly SemaphoreSlim _resourcePool = new(1, 1);
private static readonly AsyncNonKeyedLocker _resourcePool = new(1);
private static readonly string[] _trickplayImgExtensions = { ".jpg" };
/// <summary>
@ -107,93 +108,92 @@ public class TrickplayManager : ITrickplayManager
var imgTempDir = string.Empty;
var outputDir = GetTrickplayDirectory(video, width);
await _resourcePool.WaitAsync(cancellationToken).ConfigureAwait(false);
try
using (await _resourcePool.LockAsync(cancellationToken).ConfigureAwait(false))
{
if (!replace && Directory.Exists(outputDir) && (await GetTrickplayResolutions(video.Id).ConfigureAwait(false)).ContainsKey(width))
{
_logger.LogDebug("Found existing trickplay files for {ItemId}. Exiting.", video.Id);
return;
}
// Extract images
// Note: Media sources under parent items exist as their own video/item as well. Only use this video stream for trickplay.
var mediaSource = video.GetMediaSources(false).Find(source => Guid.Parse(source.Id).Equals(video.Id));
if (mediaSource is null)
try
{
_logger.LogDebug("Found no matching media source for item {ItemId}", video.Id);
return;
}
if (!replace && Directory.Exists(outputDir) && (await GetTrickplayResolutions(video.Id).ConfigureAwait(false)).ContainsKey(width))
{
_logger.LogDebug("Found existing trickplay files for {ItemId}. Exiting.", video.Id);
return;
}
var mediaPath = mediaSource.Path;
var mediaStream = mediaSource.VideoStream;
var container = mediaSource.Container;
// Extract images
// Note: Media sources under parent items exist as their own video/item as well. Only use this video stream for trickplay.
var mediaSource = video.GetMediaSources(false).Find(source => Guid.Parse(source.Id).Equals(video.Id));
_logger.LogInformation("Creating trickplay files at {Width} width, for {Path} [ID: {ItemId}]", width, mediaPath, video.Id);
imgTempDir = await _mediaEncoder.ExtractVideoImagesOnIntervalAccelerated(
mediaPath,
container,
mediaSource,
mediaStream,
width,
TimeSpan.FromMilliseconds(options.Interval),
options.EnableHwAcceleration,
options.ProcessThreads,
options.Qscale,
options.ProcessPriority,
_encodingHelper,
cancellationToken).ConfigureAwait(false);
if (mediaSource is null)
{
_logger.LogDebug("Found no matching media source for item {ItemId}", video.Id);
return;
}
if (string.IsNullOrEmpty(imgTempDir) || !Directory.Exists(imgTempDir))
{
throw new InvalidOperationException("Null or invalid directory from media encoder.");
}
var mediaPath = mediaSource.Path;
var mediaStream = mediaSource.VideoStream;
var container = mediaSource.Container;
_logger.LogInformation("Creating trickplay files at {Width} width, for {Path} [ID: {ItemId}]", width, mediaPath, video.Id);
imgTempDir = await _mediaEncoder.ExtractVideoImagesOnIntervalAccelerated(
mediaPath,
container,
mediaSource,
mediaStream,
width,
TimeSpan.FromMilliseconds(options.Interval),
options.EnableHwAcceleration,
options.ProcessThreads,
options.Qscale,
options.ProcessPriority,
_encodingHelper,
cancellationToken).ConfigureAwait(false);
if (string.IsNullOrEmpty(imgTempDir) || !Directory.Exists(imgTempDir))
{
throw new InvalidOperationException("Null or invalid directory from media encoder.");
}
var images = _fileSystem.GetFiles(imgTempDir, _trickplayImgExtensions, false, false)
.Select(i => i.FullName)
.OrderBy(i => i)
.ToList();
var images = _fileSystem.GetFiles(imgTempDir, _trickplayImgExtensions, false, false)
.Select(i => i.FullName)
.OrderBy(i => i)
.ToList();
// Create tiles
var trickplayInfo = CreateTiles(images, width, options, outputDir);
// Create tiles
var trickplayInfo = CreateTiles(images, width, options, outputDir);
// Save tiles info
try
{
if (trickplayInfo is not null)
// Save tiles info
try
{
trickplayInfo.ItemId = video.Id;
await SaveTrickplayInfo(trickplayInfo).ConfigureAwait(false);
if (trickplayInfo is not null)
{
trickplayInfo.ItemId = video.Id;
await SaveTrickplayInfo(trickplayInfo).ConfigureAwait(false);
_logger.LogInformation("Finished creation of trickplay files for {0}", mediaPath);
_logger.LogInformation("Finished creation of trickplay files for {0}", mediaPath);
}
else
{
throw new InvalidOperationException("Null trickplay tiles info from CreateTiles.");
}
}
else
catch (Exception ex)
{
throw new InvalidOperationException("Null trickplay tiles info from CreateTiles.");
_logger.LogError(ex, "Error while saving trickplay tiles info.");
// Make sure no files stay in metadata folders on failure
// if tiles info wasn't saved.
Directory.Delete(outputDir, true);
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error while saving trickplay tiles info.");
// Make sure no files stay in metadata folders on failure
// if tiles info wasn't saved.
Directory.Delete(outputDir, true);
_logger.LogError(ex, "Error creating trickplay images.");
}
}
catch (Exception ex)
{
_logger.LogError(ex, "Error creating trickplay images.");
}
finally
{
_resourcePool.Release();
if (!string.IsNullOrEmpty(imgTempDir))
finally
{
Directory.Delete(imgTempDir, true);
if (!string.IsNullOrEmpty(imgTempDir))
{
Directory.Delete(imgTempDir, true);
}
}
}
}

@ -96,9 +96,10 @@ public interface ITranscodeManager
public void OnTranscodeEndRequest(TranscodingJob job);
/// <summary>
/// Gets the transcoding lock.
/// Transcoding lock.
/// </summary>
/// <param name="outputPath">The output path of the transcoded file.</param>
/// <returns>A <see cref="SemaphoreSlim"/>.</returns>
public SemaphoreSlim GetTranscodingLock(string outputPath);
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>An <see cref="IDisposable"/>.</returns>
ValueTask<IDisposable> LockAsync(string outputPath, CancellationToken cancellationToken);
}

@ -8,6 +8,7 @@ using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using AsyncKeyedLock;
using MediaBrowser.Common.Configuration;
using MediaBrowser.Common.Extensions;
using MediaBrowser.Controller.Entities;
@ -22,7 +23,7 @@ using Microsoft.Extensions.Logging;
namespace MediaBrowser.MediaEncoding.Attachments
{
public sealed class AttachmentExtractor : IAttachmentExtractor
public sealed class AttachmentExtractor : IAttachmentExtractor, IDisposable
{
private readonly ILogger<AttachmentExtractor> _logger;
private readonly IApplicationPaths _appPaths;
@ -30,8 +31,11 @@ namespace MediaBrowser.MediaEncoding.Attachments
private readonly IMediaEncoder _mediaEncoder;
private readonly IMediaSourceManager _mediaSourceManager;
private readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphoreLocks =
new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly AsyncKeyedLocker<string> _semaphoreLocks = new(o =>
{
o.PoolSize = 20;
o.PoolInitialFill = 1;
});
public AttachmentExtractor(
ILogger<AttachmentExtractor> logger,
@ -84,11 +88,7 @@ namespace MediaBrowser.MediaEncoding.Attachments
string outputPath,
CancellationToken cancellationToken)
{
var semaphore = _semaphoreLocks.GetOrAdd(outputPath, key => new SemaphoreSlim(1, 1));
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
using (await _semaphoreLocks.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
{
if (!Directory.Exists(outputPath))
{
@ -99,10 +99,6 @@ namespace MediaBrowser.MediaEncoding.Attachments
cancellationToken).ConfigureAwait(false);
}
}
finally
{
semaphore.Release();
}
}
public async Task ExtractAllAttachmentsExternal(
@ -111,11 +107,7 @@ namespace MediaBrowser.MediaEncoding.Attachments
string outputPath,
CancellationToken cancellationToken)
{
var semaphore = _semaphoreLocks.GetOrAdd(outputPath, key => new SemaphoreSlim(1, 1));
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
using (await _semaphoreLocks.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
{
if (!File.Exists(Path.Join(outputPath, id)))
{
@ -131,10 +123,6 @@ namespace MediaBrowser.MediaEncoding.Attachments
}
}
}
finally
{
semaphore.Release();
}
}
private async Task ExtractAllAttachmentsInternal(
@ -256,11 +244,7 @@ namespace MediaBrowser.MediaEncoding.Attachments
string outputPath,
CancellationToken cancellationToken)
{
var semaphore = _semaphoreLocks.GetOrAdd(outputPath, key => new SemaphoreSlim(1, 1));
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
using (await _semaphoreLocks.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
{
if (!File.Exists(outputPath))
{
@ -271,10 +255,6 @@ namespace MediaBrowser.MediaEncoding.Attachments
cancellationToken).ConfigureAwait(false);
}
}
finally
{
semaphore.Release();
}
}
private async Task ExtractAttachmentInternal(
@ -379,5 +359,11 @@ namespace MediaBrowser.MediaEncoding.Attachments
var prefix = filename.AsSpan(0, 1);
return Path.Join(_appPaths.DataPath, "attachments", prefix, filename);
}
/// <inheritdoc />
public void Dispose()
{
_semaphoreLocks.Dispose();
}
}
}

@ -11,6 +11,7 @@ using System.Text.Json;
using System.Text.RegularExpressions;
using System.Threading;
using System.Threading.Tasks;
using AsyncKeyedLock;
using Jellyfin.Extensions;
using Jellyfin.Extensions.Json;
using Jellyfin.Extensions.Json.Converters;
@ -60,7 +61,7 @@ namespace MediaBrowser.MediaEncoding.Encoder
private readonly IServerConfigurationManager _serverConfig;
private readonly string _startupOptionFFmpegPath;
private readonly SemaphoreSlim _thumbnailResourcePool;
private readonly AsyncNonKeyedLocker _thumbnailResourcePool;
private readonly object _runningProcessesLock = new object();
private readonly List<ProcessWrapper> _runningProcesses = new List<ProcessWrapper>();
@ -116,7 +117,7 @@ namespace MediaBrowser.MediaEncoding.Encoder
_jsonSerializerOptions.Converters.Add(new JsonBoolStringConverter());
var semaphoreCount = 2 * Environment.ProcessorCount;
_thumbnailResourcePool = new SemaphoreSlim(semaphoreCount, semaphoreCount);
_thumbnailResourcePool = new(semaphoreCount);
}
/// <inheritdoc />
@ -754,8 +755,7 @@ namespace MediaBrowser.MediaEncoding.Encoder
{
bool ranToCompletion;
await _thumbnailResourcePool.WaitAsync(cancellationToken).ConfigureAwait(false);
try
using (await _thumbnailResourcePool.LockAsync(cancellationToken).ConfigureAwait(false))
{
StartProcess(processWrapper);
@ -776,10 +776,6 @@ namespace MediaBrowser.MediaEncoding.Encoder
ranToCompletion = false;
}
}
finally
{
_thumbnailResourcePool.Release();
}
var exitCode = ranToCompletion ? processWrapper.ExitCode ?? 0 : -1;
var file = _fileSystem.GetFileInfo(tempExtractPath);
@ -908,8 +904,7 @@ namespace MediaBrowser.MediaEncoding.Encoder
{
bool ranToCompletion = false;
await _thumbnailResourcePool.WaitAsync(cancellationToken).ConfigureAwait(false);
try
using (await _thumbnailResourcePool.LockAsync(cancellationToken).ConfigureAwait(false))
{
StartProcess(processWrapper);
@ -963,10 +958,6 @@ namespace MediaBrowser.MediaEncoding.Encoder
StopProcess(processWrapper, 1000);
}
}
finally
{
_thumbnailResourcePool.Release();
}
var exitCode = ranToCompletion ? processWrapper.ExitCode ?? 0 : -1;

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<!-- ProjectGuid is only included as a requirement for SonarQube analysis -->
<PropertyGroup>
@ -22,6 +22,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="AsyncKeyedLock" />
<PackageReference Include="BDInfo" />
<PackageReference Include="libse" />
<PackageReference Include="Microsoft.Extensions.Http" />

@ -1,7 +1,6 @@
#pragma warning disable CS1591
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Diagnostics;
using System.Diagnostics.CodeAnalysis;
@ -12,6 +11,7 @@ using System.Net.Http;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using AsyncKeyedLock;
using MediaBrowser.Common;
using MediaBrowser.Common.Configuration;
using MediaBrowser.Common.Extensions;
@ -28,7 +28,7 @@ using UtfUnknown;
namespace MediaBrowser.MediaEncoding.Subtitles
{
public sealed class SubtitleEncoder : ISubtitleEncoder
public sealed class SubtitleEncoder : ISubtitleEncoder, IDisposable
{
private readonly ILogger<SubtitleEncoder> _logger;
private readonly IApplicationPaths _appPaths;
@ -41,8 +41,11 @@ namespace MediaBrowser.MediaEncoding.Subtitles
/// <summary>
/// The _semaphoreLocks.
/// </summary>
private readonly ConcurrentDictionary<string, SemaphoreSlim> _semaphoreLocks =
new ConcurrentDictionary<string, SemaphoreSlim>();
private readonly AsyncKeyedLocker<string> _semaphoreLocks = new(o =>
{
o.PoolSize = 20;
o.PoolInitialFill = 1;
});
public SubtitleEncoder(
ILogger<SubtitleEncoder> logger,
@ -293,16 +296,6 @@ namespace MediaBrowser.MediaEncoding.Subtitles
throw new ArgumentException("Unsupported format: " + format);
}
/// <summary>
/// Gets the lock.
/// </summary>
/// <param name="filename">The filename.</param>
/// <returns>System.Object.</returns>
private SemaphoreSlim GetLock(string filename)
{
return _semaphoreLocks.GetOrAdd(filename, _ => new SemaphoreSlim(1, 1));
}
/// <summary>
/// Converts the text subtitle to SRT.
/// </summary>
@ -313,21 +306,13 @@ namespace MediaBrowser.MediaEncoding.Subtitles
/// <returns>Task.</returns>
private async Task ConvertTextSubtitleToSrt(MediaStream subtitleStream, MediaSourceInfo mediaSource, string outputPath, CancellationToken cancellationToken)
{
var semaphore = GetLock(outputPath);
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
using (await _semaphoreLocks.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
{
if (!File.Exists(outputPath))
{
await ConvertTextSubtitleToSrtInternal(subtitleStream, mediaSource, outputPath, cancellationToken).ConfigureAwait(false);
}
}
finally
{
semaphore.Release();
}
}
/// <summary>
@ -472,7 +457,7 @@ namespace MediaBrowser.MediaEncoding.Subtitles
/// <returns>Task.</returns>
private async Task ExtractAllTextSubtitles(MediaSourceInfo mediaSource, CancellationToken cancellationToken)
{
var semaphores = new List<SemaphoreSlim>();
var locks = new List<AsyncKeyedLockReleaser<string>>();
var extractableStreams = new List<MediaStream>();
try
@ -484,16 +469,16 @@ namespace MediaBrowser.MediaEncoding.Subtitles
{
var outputPath = GetSubtitleCachePath(mediaSource, subtitleStream.Index, "." + GetTextSubtitleFormat(subtitleStream));
var semaphore = GetLock(outputPath);
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
var @lock = _semaphoreLocks.GetOrAdd(outputPath);
await @lock.SemaphoreSlim.WaitAsync(cancellationToken).ConfigureAwait(false);
if (File.Exists(outputPath))
{
semaphore.Release();
@lock.Dispose();
continue;
}
semaphores.Add(semaphore);
locks.Add(@lock);
extractableStreams.Add(subtitleStream);
}
@ -508,9 +493,9 @@ namespace MediaBrowser.MediaEncoding.Subtitles
}
finally
{
foreach (var semaphore in semaphores)
foreach (var @lock in locks)
{
semaphore.Release();
@lock.Dispose();
}
}
}
@ -657,16 +642,12 @@ namespace MediaBrowser.MediaEncoding.Subtitles
string outputPath,
CancellationToken cancellationToken)
{
var semaphore = GetLock(outputPath);
await semaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
var subtitleStreamIndex = EncodingHelper.FindIndex(mediaSource.MediaStreams, subtitleStream);
try
using (await _semaphoreLocks.LockAsync(outputPath, cancellationToken).ConfigureAwait(false))
{
if (!File.Exists(outputPath))
{
var subtitleStreamIndex = EncodingHelper.FindIndex(mediaSource.MediaStreams, subtitleStream);
var args = _mediaEncoder.GetInputArgument(mediaSource.Path, mediaSource);
if (subtitleStream.IsExternal)
@ -682,10 +663,6 @@ namespace MediaBrowser.MediaEncoding.Subtitles
cancellationToken).ConfigureAwait(false);
}
}
finally
{
semaphore.Release();
}
}
private async Task ExtractTextSubtitleInternal(
@ -901,6 +878,12 @@ namespace MediaBrowser.MediaEncoding.Subtitles
}
}
/// <inheritdoc />
public void Dispose()
{
_semaphoreLocks.Dispose();
}
#pragma warning disable CA1034 // Nested types should not be visible
// Only public for the unit tests
public readonly record struct SubtitleInfo

@ -4,10 +4,12 @@ using System.Diagnostics;
using System.Globalization;
using System.IO;
using System.Linq;
using System.Runtime.CompilerServices;
using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using AsyncKeyedLock;
using Jellyfin.Data.Enums;
using Jellyfin.Extensions;
using MediaBrowser.Common;
@ -43,7 +45,11 @@ public sealed class TranscodeManager : ITranscodeManager, IDisposable
private readonly IAttachmentExtractor _attachmentExtractor;
private readonly List<TranscodingJob> _activeTranscodingJobs = new();
private readonly Dictionary<string, SemaphoreSlim> _transcodingLocks = new();
private readonly AsyncKeyedLocker<string> _transcodingLocks = new(o =>
{
o.PoolSize = 20;
o.PoolInitialFill = 1;
});
/// <summary>
/// Initializes a new instance of the <see cref="TranscodeManager"/> class.
@ -224,11 +230,6 @@ public sealed class TranscodeManager : ITranscodeManager, IDisposable
}
}
lock (_transcodingLocks)
{
_transcodingLocks.Remove(job.Path!);
}
job.Stop();
if (delete(job.Path!))
@ -625,11 +626,6 @@ public sealed class TranscodeManager : ITranscodeManager, IDisposable
}
}
lock (_transcodingLocks)
{
_transcodingLocks.Remove(path);
}
if (!string.IsNullOrWhiteSpace(state.Request.DeviceId))
{
_sessionManager.ClearTranscodingInfo(state.Request.DeviceId);
@ -705,21 +701,6 @@ public sealed class TranscodeManager : ITranscodeManager, IDisposable
}
}
/// <inheritdoc />
public SemaphoreSlim GetTranscodingLock(string outputPath)
{
lock (_transcodingLocks)
{
if (!_transcodingLocks.TryGetValue(outputPath, out SemaphoreSlim? result))
{
result = new SemaphoreSlim(1, 1);
_transcodingLocks[outputPath] = result;
}
return result;
}
}
private void OnPlaybackProgress(object? sender, PlaybackProgressEventArgs e)
{
if (!string.IsNullOrWhiteSpace(e.PlaySessionId))
@ -742,10 +723,23 @@ public sealed class TranscodeManager : ITranscodeManager, IDisposable
}
}
/// <summary>
/// Transcoding lock.
/// </summary>
/// <param name="outputPath">The output path of the transcoded file.</param>
/// <param name="cancellationToken">The cancellation token.</param>
/// <returns>An <see cref="IDisposable"/>.</returns>
[MethodImpl(MethodImplOptions.AggressiveInlining)]
public ValueTask<IDisposable> LockAsync(string outputPath, CancellationToken cancellationToken)
{
return _transcodingLocks.LockAsync(outputPath, cancellationToken);
}
/// <inheritdoc />
public void Dispose()
{
_sessionManager.PlaybackProgress -= OnPlaybackProgress;
_sessionManager.PlaybackStart -= OnPlaybackProgress;
_transcodingLocks.Dispose();
}
}

@ -7,6 +7,7 @@ using System.Net.Mime;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using AsyncKeyedLock;
using Jellyfin.Data.Entities;
using MediaBrowser.Common.Extensions;
using MediaBrowser.Controller;
@ -38,7 +39,7 @@ public sealed class ImageProcessor : IImageProcessor, IDisposable
private readonly IServerApplicationPaths _appPaths;
private readonly IImageEncoder _imageEncoder;
private readonly SemaphoreSlim _parallelEncodingLimit;
private readonly AsyncNonKeyedLocker _parallelEncodingLimit;
private bool _disposed;
@ -68,7 +69,7 @@ public sealed class ImageProcessor : IImageProcessor, IDisposable
semaphoreCount = 2 * Environment.ProcessorCount;
}
_parallelEncodingLimit = new(semaphoreCount, semaphoreCount);
_parallelEncodingLimit = new(semaphoreCount);
}
private string ResizedImageCachePath => Path.Combine(_appPaths.ImageCachePath, "resized-images");
@ -193,18 +194,13 @@ public sealed class ImageProcessor : IImageProcessor, IDisposable
{
if (!File.Exists(cacheFilePath))
{
// Limit number of parallel (more precisely: concurrent) image encodings to prevent a high memory usage
await _parallelEncodingLimit.WaitAsync().ConfigureAwait(false);
string resultPath;
try
// Limit number of parallel (more precisely: concurrent) image encodings to prevent a high memory usage
using (await _parallelEncodingLimit.LockAsync().ConfigureAwait(false))
{
resultPath = _imageEncoder.EncodeImage(originalImagePath, dateModified, cacheFilePath, autoOrient, orientation, quality, options, outputFormat);
}
finally
{
_parallelEncodingLimit.Release();
}
if (string.Equals(resultPath, originalImagePath, StringComparison.OrdinalIgnoreCase))
{

@ -21,4 +21,8 @@
<Compile Include="..\..\SharedVersion.cs" />
</ItemGroup>
<ItemGroup>
<PackageReference Include="AsyncKeyedLock" />
</ItemGroup>
</Project>

@ -8,6 +8,7 @@ using System.Linq;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using AsyncKeyedLock;
using Jellyfin.Data.Entities;
using Jellyfin.Data.Enums;
using Jellyfin.Extensions;
@ -50,7 +51,7 @@ namespace Jellyfin.LiveTv.Channels
private readonly IFileSystem _fileSystem;
private readonly IProviderManager _providerManager;
private readonly IMemoryCache _memoryCache;
private readonly SemaphoreSlim _resourcePool = new SemaphoreSlim(1, 1);
private readonly AsyncNonKeyedLocker _resourcePool = new(1);
private readonly JsonSerializerOptions _jsonOptions = JsonDefaults.Options;
private bool _disposed = false;
@ -811,9 +812,7 @@ namespace Jellyfin.LiveTv.Channels
{
}
await _resourcePool.WaitAsync(cancellationToken).ConfigureAwait(false);
try
using (await _resourcePool.LockAsync(cancellationToken).ConfigureAwait(false))
{
try
{
@ -860,10 +859,6 @@ namespace Jellyfin.LiveTv.Channels
return result;
}
finally
{
_resourcePool.Release();
}
}
private async Task CacheResponse(ChannelItemResult result, string path)

@ -14,6 +14,7 @@ using System.Text;
using System.Threading;
using System.Threading.Tasks;
using System.Xml;
using AsyncKeyedLock;
using Jellyfin.Data.Enums;
using Jellyfin.Data.Events;
using Jellyfin.Extensions;
@ -68,7 +69,7 @@ namespace Jellyfin.LiveTv.EmbyTV
private readonly ConcurrentDictionary<string, EpgChannelData> _epgChannels =
new ConcurrentDictionary<string, EpgChannelData>(StringComparer.OrdinalIgnoreCase);
private readonly SemaphoreSlim _recordingDeleteSemaphore = new SemaphoreSlim(1, 1);
private readonly AsyncNonKeyedLocker _recordingDeleteSemaphore = new(1);
private bool _disposed;
@ -1444,9 +1445,7 @@ namespace Jellyfin.LiveTv.EmbyTV
return;
}
await _recordingDeleteSemaphore.WaitAsync().ConfigureAwait(false);
try
using (await _recordingDeleteSemaphore.LockAsync().ConfigureAwait(false))
{
if (_disposed)
{
@ -1499,10 +1498,6 @@ namespace Jellyfin.LiveTv.EmbyTV
}
}
}
finally
{
_recordingDeleteSemaphore.Release();
}
}
private void DeleteLibraryItemsForTimers(List<TimerInfo> timers)

@ -1,4 +1,4 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">
<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<GenerateDocumentationFile>true</GenerateDocumentationFile>
@ -11,6 +11,7 @@
</ItemGroup>
<ItemGroup>
<PackageReference Include="AsyncKeyedLock" />
<PackageReference Include="Jellyfin.XmlTv" />
<PackageReference Include="System.Linq.Async" />
</ItemGroup>

@ -16,6 +16,7 @@ using System.Text;
using System.Text.Json;
using System.Threading;
using System.Threading.Tasks;
using AsyncKeyedLock;
using Jellyfin.Extensions;
using Jellyfin.Extensions.Json;
using Jellyfin.LiveTv.Listings.SchedulesDirectDtos;
@ -35,7 +36,7 @@ namespace Jellyfin.LiveTv.Listings
private readonly ILogger<SchedulesDirect> _logger;
private readonly IHttpClientFactory _httpClientFactory;
private readonly SemaphoreSlim _tokenSemaphore = new SemaphoreSlim(1, 1);
private readonly AsyncNonKeyedLocker _tokenLock = new(1);
private readonly ConcurrentDictionary<string, NameValuePair> _tokens = new ConcurrentDictionary<string, NameValuePair>();
private readonly JsonSerializerOptions _jsonOptions = JsonDefaults.Options;
@ -573,27 +574,25 @@ namespace Jellyfin.LiveTv.Listings
}
}
await _tokenSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
var result = await GetTokenInternal(username, password, cancellationToken).ConfigureAwait(false);
savedToken.Name = result;
savedToken.Value = DateTime.UtcNow.Ticks.ToString(CultureInfo.InvariantCulture);
return result;
}
catch (HttpRequestException ex)
using (await _tokenLock.LockAsync(cancellationToken).ConfigureAwait(false))
{
if (ex.StatusCode.HasValue && ex.StatusCode.Value == HttpStatusCode.BadRequest)
try
{
_tokens.Clear();
_lastErrorResponse = DateTime.UtcNow;
var result = await GetTokenInternal(username, password, cancellationToken).ConfigureAwait(false);
savedToken.Name = result;
savedToken.Value = DateTime.UtcNow.Ticks.ToString(CultureInfo.InvariantCulture);
return result;
}
catch (HttpRequestException ex)
{
if (ex.StatusCode.HasValue && ex.StatusCode.Value == HttpStatusCode.BadRequest)
{
_tokens.Clear();
_lastErrorResponse = DateTime.UtcNow;
}
throw;
}
finally
{
_tokenSemaphore.Release();
throw;
}
}
}
@ -801,7 +800,7 @@ namespace Jellyfin.LiveTv.Listings
if (disposing)
{
_tokenSemaphore?.Dispose();
_tokenLock?.Dispose();
}
_disposed = true;

Loading…
Cancel
Save