Use .NET 6's `ParallelForEachAsync(...)`

pull/811/head
Tyrrrz 3 years ago
parent b8567d384f
commit 008bb2f591

@ -14,7 +14,6 @@ using DiscordChatExporter.Core.Exceptions;
using DiscordChatExporter.Core.Exporting;
using DiscordChatExporter.Core.Exporting.Filtering;
using DiscordChatExporter.Core.Exporting.Partitioning;
using DiscordChatExporter.Core.Utils.Extensions;
namespace DiscordChatExporter.Cli.Commands.Base;
@ -68,36 +67,47 @@ public abstract class ExportCommandBase : TokenCommandBase
await console.Output.WriteLineAsync($"Exporting {channels.Count} channel(s)...");
await console.CreateProgressTicker().StartAsync(async progressContext =>
{
await channels.ParallelForEachAsync(async channel =>
{
try
await Parallel.ForEachAsync(
channels,
new ParallelOptions
{
MaxDegreeOfParallelism = Math.Max(1, ParallelLimit),
CancellationToken = cancellationToken
},
async (channel, innerCancellationToken) =>
{
await progressContext.StartTaskAsync($"{channel.Category.Name} / {channel.Name}", async progress =>
try
{
var guild = await Discord.GetGuildAsync(channel.GuildId, cancellationToken);
var request = new ExportRequest(
guild,
channel,
OutputPath,
ExportFormat,
After,
Before,
PartitionLimit,
MessageFilter,
ShouldDownloadMedia,
ShouldReuseMedia,
DateFormat
await progressContext.StartTaskAsync(
$"{channel.Category.Name} / {channel.Name}",
async progress =>
{
var guild = await Discord.GetGuildAsync(channel.GuildId, innerCancellationToken);
var request = new ExportRequest(
guild,
channel,
OutputPath,
ExportFormat,
After,
Before,
PartitionLimit,
MessageFilter,
ShouldDownloadMedia,
ShouldReuseMedia,
DateFormat
);
await Exporter.ExportChannelAsync(request, progress, innerCancellationToken);
}
);
await Exporter.ExportChannelAsync(request, progress, cancellationToken);
});
}
catch (DiscordChatExporterException ex) when (!ex.IsFatal)
{
errors[channel] = ex.Message;
}
catch (DiscordChatExporterException ex) when (!ex.IsFatal)
{
errors[channel] = ex.Message;
}
}
}, Math.Max(ParallelLimit, 1), cancellationToken);
);
});
// Print result

@ -48,10 +48,9 @@ public partial record ExportRequest
Snowflake? after = null,
Snowflake? before = null)
{
// Formats path
outputPath = Regex.Replace(outputPath, "%.", m =>
PathEx.EscapePath(m.Value switch
PathEx.EscapeFileName(m.Value switch
{
"%g" => guild.Id.ToString(),
"%G" => guild.Name,
@ -118,9 +117,6 @@ public partial record ExportRequest
// File extension
buffer.Append($".{format.GetFileExtension()}");
// Replace invalid chars
PathEx.EscapePath(buffer);
return buffer.ToString();
return PathEx.EscapeFileName(buffer.ToString());
}
}

@ -104,6 +104,6 @@ internal partial class MediaDownloader
var fileNameWithoutExtension = Path.GetFileNameWithoutExtension(fileName);
var fileExtension = Path.GetExtension(fileName);
return PathEx.EscapePath(fileNameWithoutExtension.Truncate(42) + '-' + urlHash + fileExtension);
return PathEx.EscapeFileName(fileNameWithoutExtension.Truncate(42) + '-' + urlHash + fileExtension);
}
}

@ -1,8 +1,5 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;
namespace DiscordChatExporter.Core.Utils.Extensions;
@ -23,29 +20,4 @@ public static class AsyncExtensions
public static ValueTaskAwaiter<IReadOnlyList<T>> GetAwaiter<T>(
this IAsyncEnumerable<T> asyncEnumerable) =>
asyncEnumerable.AggregateAsync().GetAwaiter();
public static async ValueTask ParallelForEachAsync<T>(
this IEnumerable<T> source,
Func<T, ValueTask> handleAsync,
int degreeOfParallelism,
CancellationToken cancellationToken = default)
{
using var semaphore = new SemaphoreSlim(degreeOfParallelism);
await Task.WhenAll(source.Select(async item =>
{
// ReSharper disable once AccessToDisposedClosure
await semaphore.WaitAsync(cancellationToken);
try
{
await handleAsync(item);
}
finally
{
// ReSharper disable once AccessToDisposedClosure
semaphore.Release();
}
}));
}
}

@ -1,17 +1,20 @@
using System.IO;
using System.Collections.Generic;
using System.IO;
using System.Text;
namespace DiscordChatExporter.Core.Utils;
public static class PathEx
{
public static StringBuilder EscapePath(StringBuilder pathBuffer)
private static readonly HashSet<char> InvalidFileNameChars = new(Path.GetInvalidFileNameChars());
public static string EscapeFileName(string path)
{
foreach (var invalidChar in Path.GetInvalidFileNameChars())
pathBuffer.Replace(invalidChar, '_');
var buffer = new StringBuilder(path.Length);
return pathBuffer;
}
foreach (var c in path)
buffer.Append(!InvalidFileNameChars.Contains(c) ? c : '_');
public static string EscapePath(string path) => EscapePath(new StringBuilder(path)).ToString();
return buffer.ToString();
}
}

@ -210,39 +210,46 @@ public class RootViewModel : Screen
var operations = ProgressManager.CreateOperations(dialog.Channels!.Count);
var successfulExportCount = 0;
await dialog.Channels.Zip(operations).ParallelForEachAsync(async tuple =>
{
var (channel, operation) = tuple;
try
{
var request = new ExportRequest(
dialog.Guild!,
channel!,
dialog.OutputPath!,
dialog.SelectedFormat,
dialog.After?.Pipe(Snowflake.FromDate),
dialog.Before?.Pipe(Snowflake.FromDate),
dialog.PartitionLimit,
dialog.MessageFilter,
dialog.ShouldDownloadMedia,
_settingsService.ShouldReuseMedia,
_settingsService.DateFormat
);
await exporter.ExportChannelAsync(request, operation);
Interlocked.Increment(ref successfulExportCount);
}
catch (DiscordChatExporterException ex) when (!ex.IsFatal)
await Parallel.ForEachAsync(
dialog.Channels.Zip(operations),
new ParallelOptions
{
Notifications.Enqueue(ex.Message.TrimEnd('.'));
}
finally
MaxDegreeOfParallelism = Math.Max(1, _settingsService.ParallelLimit)
},
async (tuple, cancellationToken) =>
{
operation.Dispose();
var (channel, operation) = tuple;
try
{
var request = new ExportRequest(
dialog.Guild!,
channel,
dialog.OutputPath!,
dialog.SelectedFormat,
dialog.After?.Pipe(Snowflake.FromDate),
dialog.Before?.Pipe(Snowflake.FromDate),
dialog.PartitionLimit,
dialog.MessageFilter,
dialog.ShouldDownloadMedia,
_settingsService.ShouldReuseMedia,
_settingsService.DateFormat
);
await exporter.ExportChannelAsync(request, operation, cancellationToken);
Interlocked.Increment(ref successfulExportCount);
}
catch (DiscordChatExporterException ex) when (!ex.IsFatal)
{
Notifications.Enqueue(ex.Message.TrimEnd('.'));
}
finally
{
operation.Dispose();
}
}
}, Math.Max(1, _settingsService.ParallelLimit));
);
// Notify of overall completion
if (successfulExportCount > 0)

Loading…
Cancel
Save