You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
recyclarr/src/Common/Extensions/RxExtensions.cs

48 lines
1.7 KiB

using System;
using System.Reactive.Disposables;
using System.Reactive.Linq;
using Serilog;
namespace Common.Extensions;
/// <summary>
/// Diagnostic helpers for <see cref="IObservable{T}"/> pipelines.
/// </summary>
public static class RxExtensions
{
    /// <summary>
    /// Wraps <paramref name="source"/> in an observable that writes a Debug-level log entry for each
    /// stage it passes through: when the wrapper is obtained, when it is subscribed to, on every
    /// OnNext / OnError / OnCompleted notification, and when the subscription is disposed.
    /// Each entry (except the final "Subscription completed" one) includes the managed thread id
    /// it was written from.
    /// </summary>
    /// <typeparam name="T">Element type of the observable sequence.</typeparam>
    /// <param name="source">The sequence to observe. Notifications are forwarded unchanged.</param>
    /// <param name="log">Logger that receives the diagnostic entries.</param>
    /// <param name="opName">
    /// Label prefixed to every log entry; defaults to <c>"IObservable"</c> when <c>null</c>.
    /// </param>
    /// <returns>An observable that forwards the notifications of <paramref name="source"/>.</returns>
    /// <exception cref="ArgumentNullException">
    /// <paramref name="source"/> or <paramref name="log"/> is <c>null</c>.
    /// </exception>
    public static IObservable<T> Spy<T>(this IObservable<T> source, ILogger log, string? opName = null)
    {
        // Fail fast at the call site instead of at (possibly much later) subscription time.
        if (source is null)
        {
            throw new ArgumentNullException(nameof(source));
        }

        if (log is null)
        {
            throw new ArgumentNullException(nameof(log));
        }

        opName ??= "IObservable";
        log.Debug("{OpName}: Observable obtained on Thread: {ThreadId}",
            opName,
            Environment.CurrentManagedThreadId);

        return Observable.Create<T>(obs =>
        {
            log.Debug("{OpName}: Subscribed to on Thread: {ThreadId}",
                opName,
                Environment.CurrentManagedThreadId);

            try
            {
                var subscription = source
                    .Do(
                        x => log.Debug("{OpName}: OnNext({Result}) on Thread: {ThreadId}", opName, x,
                            Environment.CurrentManagedThreadId),
                        // Pass the exception itself so the log event carries the stack trace;
                        // the rendered message text is the same as before.
                        ex => log.Debug(ex, "{OpName}: OnError({Result}) on Thread: {ThreadId}", opName, ex.Message,
                            Environment.CurrentManagedThreadId),
                        () => log.Debug("{OpName}: OnCompleted() on Thread: {ThreadId}", opName,
                            Environment.CurrentManagedThreadId))
                    .Subscribe(obs);

                // The inner subscription is listed first, so it is torn down before the
                // "Cleaned up" entry is written.
                return new CompositeDisposable(
                    subscription,
                    Disposable.Create(() => log.Debug(
                        "{OpName}: Cleaned up on Thread: {ThreadId}",
                        opName,
                        Environment.CurrentManagedThreadId)));
            }
            finally
            {
                // Runs once the Subscribe call has returned (or thrown), i.e. when the act of
                // subscribing is finished -- not when the sequence itself completes.
                log.Debug("{OpName}: Subscription completed", opName);
            }
        });
    }
}