# Extensions
**Repository Path**: mirrors_reactiveui/Extensions
## Basic Information
- **Project Name**: Extensions
- **Description**: Extensions for System.Reactive
- **Primary Language**: Unknown
- **License**: MIT
- **Default Branch**: main
- **Homepage**: None
- **GVP Project**: No
## Statistics
- **Stars**: 0
- **Forks**: 0
- **Created**: 2025-09-17
- **Last Updated**: 2026-05-10
## Categories & Tags
**Categories**: Uncategorized
**Tags**: None
## README
# ReactiveUI.Extensions
A focused collection of high-value Reactive Extensions (Rx) operators that do **not** ship with `System.Reactive` but are commonly needed when building reactive .NET applications.

The goal of this library is to:

- Reduce boilerplate for frequent reactive patterns (timers, buffering, throttling, heartbeats, etc.)
- Provide pragmatic, allocation-aware helpers for performance-sensitive scenarios
- Avoid additional dependencies: only `System.Reactive` is required

Supported Target Frameworks: `.NET Framework 4.6.2`, `.NET Framework 4.7.2`, `.NET Framework 4.8.1`, `.NET 8`, `.NET 9`, `.NET 10`.
---
## Table of Contents
1. [Installation](#installation)
2. [Quick Start](#quick-start)
3. [API Catalog](#api-catalog)
4. [Operator Categories & Examples](#operator-categories--examples)
   - [Null / Signal Helpers](#null--signal-helpers)
   - [Timing, Scheduling & Flow Control](#timing-scheduling--flow-control)
   - [Inactivity / Liveness](#inactivity--liveness)
   - [Error Handling & Resilience](#error-handling--resilience)
   - [Combining, Partitioning & Logical Helpers](#combining-partitioning--logical-helpers)
   - [Async / Task Integration](#async--task-integration)
   - [Backpressure / Conflation](#backpressure--conflation)
   - [Selective & Conditional Emission](#selective--conditional-emission)
   - [Buffering & Transformation](#buffering--transformation)
   - [Subscription / Side Effects](#subscription--side-effects)
   - [Utility & Miscellaneous](#utility--miscellaneous)
5. [Async Observables (`IObservableAsync<T>`)](#async-observables-iobservableasynct)
   - [Async Quick Start](#async-quick-start)
   - [Async API Catalog](#async-api-catalog)
   - [Core Interfaces & Types](#core-interfaces--types)
   - [Factory Methods](#factory-methods)
   - [Transformation Operators](#transformation-operators)
   - [Filtering Operators](#filtering-operators)
   - [Combining & Merging](#combining--merging)
   - [Error Handling & Retry](#error-handling--retry)
   - [Timing & Scheduling (Async)](#timing--scheduling-async)
   - [Aggregation & Element Operators](#aggregation--element-operators)
   - [Side Effects & Lifecycle](#side-effects--lifecycle)
   - [Multicasting & Sharing](#multicasting--sharing)
   - [Bridging & Conversion](#bridging--conversion)
   - [Subjects](#subjects)
   - [Async Disposables](#async-disposables)
6. [Performance Notes](#performance-notes)
7. [Thread Safety](#thread-safety)
8. [License](#license)
---
## Installation
```bash
# Package coming soon (example)
dotnet add package ReactiveUI.Extensions
```
Reference the project directly while developing locally.
---
## Quick Start
```csharp
using System;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using ReactiveUI.Extensions;

var source = Observable.Interval(TimeSpan.FromMilliseconds(120))
    .Take(10)
    .Select(i => (long?)(i % 3 == 0 ? null : i));

// 1. Filter nulls + convert to a Unit signal.
var signal = source.WhereIsNotNull().AsSignal();

// 2. Add a heartbeat if the upstream goes quiet for 500ms.
var withHeartbeat = source.WhereIsNotNull()
    .Heartbeat(TimeSpan.FromMilliseconds(500), Scheduler.Default);

// 3. Retry with exponential backoff up to 5 times.
var resilient = Observable.Defer(() =>
        Observable.Throw<int>(new InvalidOperationException("Boom")))
    .RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100));

// 4. Conflate bursty updates.
var conflated = source.Conflate(TimeSpan.FromMilliseconds(300), Scheduler.Default);

using (conflated.Subscribe(Console.WriteLine))
{
    Console.ReadLine();
}
```
---
## API Catalog
Below is the full list of extension methods (grouped logically).
Some overloads omitted for brevity.
| Category | Operators |
|----------|-----------|
| Null & Signal | `WhereIsNotNull`, `AsSignal` |
| Timing & Scheduling | `SyncTimer`, `Schedule` (overloads), `ScheduleSafe`, `ThrottleFirst`, `ThrottleDistinct`, `DebounceImmediate` |
| Inactivity / Liveness | `Heartbeat`, `DetectStale`, `BufferUntilInactive` |
| Error Handling | `CatchIgnore`, `CatchAndReturn`, `OnErrorRetry` (overloads), `RetryWithBackoff` |
| Combining & Aggregation | `CombineLatestValuesAreAllTrue`, `CombineLatestValuesAreAllFalse`, `GetMax`, `GetMin`, `Partition` |
| Logical / Boolean | `Not`, `WhereTrue`, `WhereFalse` |
| Async / Task | `SelectAsyncSequential`, `SelectLatestAsync`, `SelectAsyncConcurrent`, `SubscribeAsync` (overloads), `SynchronizeSynchronous`, `SynchronizeAsync`, `SubscribeSynchronous` (overloads), `ToHotTask` |
| Backpressure | `Conflate` |
| Filtering / Conditional | `Filter` (Regex), `TakeUntil` (predicate), `WaitUntil`, `SampleLatest`, `SwitchIfEmpty`, `DropIfBusy` |
| Buffering | `BufferUntil`, `BufferUntilInactive`, `BufferUntilIdle`, `Pairwise`, `ScanWithInitial` |
| Transformation & Utility | `Shuffle`, `ForEach`, `FromArray`, `Using`, `While`, `Start`, `OnNext` (params helper), `DoOnSubscribe`, `DoOnDispose`, `ToReadOnlyBehavior`, `ToPropertyObservable` |
---
## Operator Categories & Examples
### Null / Signal Helpers
```csharp
IObservable<string?> raw = GetPossiblyNullStream();
IObservable<string> cleaned = raw.WhereIsNotNull();
IObservable<Unit> signal = cleaned.AsSignal();
```
### Timing, Scheduling & Flow Control
```csharp
// Shared timer for a given period (one underlying timer per distinct TimeSpan)
var sharedTimer = ReactiveExtensions.SyncTimer(TimeSpan.FromSeconds(1));
// Delay emission of a single value
42.Schedule(TimeSpan.FromMilliseconds(250), Scheduler.Default)
    .Subscribe(v => Console.WriteLine($"Delayed: {v}"));
// Safe scheduling when a scheduler may be null
IScheduler? maybeScheduler = null;
maybeScheduler.ScheduleSafe(() => Console.WriteLine("Ran inline"));
// ThrottleFirst: allow first item per window, ignore rest
var throttled = Observable.Interval(TimeSpan.FromMilliseconds(50))
    .ThrottleFirst(TimeSpan.FromMilliseconds(200));
// DebounceImmediate: emit first immediately then debounce rest
var debounced = Observable.Interval(TimeSpan.FromMilliseconds(40))
    .DebounceImmediate(TimeSpan.FromMilliseconds(250));
// ThrottleDistinct: throttle but only emit when the value actually changes
var source = Observable.Interval(TimeSpan.FromMilliseconds(50)).Take(20);
var distinctThrottled = source.ThrottleDistinct(TimeSpan.FromMilliseconds(200));
```
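The `ThrottleFirst` comment above ("allow first item per window, ignore rest") can be made concrete with a small simulation over timestamped events. This is only a sketch of the general technique, not the library's implementation: `ThrottleFirstSim` is a hypothetical helper, and it assumes that each window opens at the moment an item is accepted, which the README does not state.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;

// Hypothetical helper (not part of the library): replays timestamped events and
// keeps only the first one in each window, dropping the rest.
static List<T> ThrottleFirstSim<T>(IEnumerable<(TimeSpan At, T Value)> events, TimeSpan window)
{
    var kept = new List<T>();
    var windowEnd = TimeSpan.MinValue; // no window is open yet

    foreach (var (at, value) in events)
    {
        if (at >= windowEnd)
        {
            kept.Add(value);         // first item of a new window passes through
            windowEnd = at + window; // assumption: the window starts at the accepted item
        }
    }

    return kept;
}

// Ten events, 50 ms apart (values 0..9), throttled with a 200 ms window.
var events = Enumerable.Range(0, 10)
    .Select(i => (At: TimeSpan.FromMilliseconds(50.0 * (i + 1)), Value: i));

var kept = ThrottleFirstSim(events, TimeSpan.FromMilliseconds(200.0));
Console.WriteLine(string.Join(", ", kept)); // 0, 4, 8
```

Under these assumptions the items at 50 ms, 250 ms, and 450 ms pass through, and everything else falls inside an open window.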
### Inactivity / Liveness
```csharp
// Heartbeat emits IHeartbeat where IsHeartbeat == true during quiet periods
var heartbeats = Observable.Interval(TimeSpan.FromMilliseconds(400))
    .Take(5)
    .Heartbeat(TimeSpan.FromMilliseconds(300), Scheduler.Default);
// DetectStale emits IStale: one stale marker after inactivity, or fresh update wrappers
var staleAware = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(500))
    .Take(3)
    .DetectStale(TimeSpan.FromMilliseconds(300), Scheduler.Default);
// BufferUntilInactive groups events separated by inactivity
var bursts = Observable.Interval(TimeSpan.FromMilliseconds(60)).Take(20);
var groups = bursts.BufferUntilInactive(TimeSpan.FromMilliseconds(200));
```
### Error Handling & Resilience
```csharp
var flaky = Observable.Create<int>(o =>
{
    o.OnNext(1);
    o.OnError(new InvalidOperationException("Fail"));
    return () => { };
});
// Ignore all errors and complete silently
var flakySafe = flaky.CatchIgnore();
// Replace error with a fallback value
var withFallback = flaky.CatchAndReturn(-1);
// Retry only specific exception type with logging
var retried = flaky.OnErrorRetry<int, InvalidOperationException>(ex => Console.WriteLine(ex.Message), retryCount: 3);
// Retry with exponential backoff
var backoff = flaky.RetryWithBackoff(maxRetries: 5, initialDelay: TimeSpan.FromMilliseconds(100));
```
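To give a feel for what "exponential backoff" means for the `maxRetries: 5, initialDelay: 100ms` call above, the following sketch computes a delay schedule. It assumes the delay simply doubles after every failed attempt, with no jitter and no upper bound; the library's actual multiplier and capping policy are not described here, and `BackoffDelay` is a hypothetical helper used only for illustration.

```csharp
using System;
using System.Linq;

// Hypothetical helper: delay before retry number `attempt` (0-based),
// assuming the delay doubles each time with no jitter and no upper bound.
static TimeSpan BackoffDelay(TimeSpan initialDelay, int attempt) =>
    TimeSpan.FromMilliseconds(initialDelay.TotalMilliseconds * Math.Pow(2, attempt));

var initial = TimeSpan.FromMilliseconds(100.0);
var delays = Enumerable.Range(0, 5)
    .Select(attempt => BackoffDelay(initial, attempt))
    .ToList();

// Prints the delay in milliseconds before each of the five retries.
Console.WriteLine(string.Join(", ", delays.Select(d => d.TotalMilliseconds))); // 100, 200, 400, 800, 1600
```

With a doubling policy, five retries starting at 100 ms wait about 3.1 seconds in total, which is worth keeping in mind when choosing `maxRetries`.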
### Combining, Partitioning & Logical Helpers
```csharp
var a = Observable.Interval(TimeSpan.FromMilliseconds(150)).Select(i => i % 2 == 0);
var b = Observable.Interval(TimeSpan.FromMilliseconds(170)).Select(i => i % 3 == 0);
var allTrue = new[] { a, b }.CombineLatestValuesAreAllTrue();
var allFalse = new[] { a, b }.CombineLatestValuesAreAllFalse();
var numbers = Observable.Range(1, 10);
var (even, odd) = numbers.Partition(n => n % 2 == 0); // Partition stream
var toggles = a.Not(); // Negate booleans
```
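For readers who want to see the idea behind `CombineLatestValuesAreAllTrue`, the same result can be approximated with standard `System.Reactive` operators only. This is a sketch of the general technique, not the library's implementation; in particular, whether the library suppresses repeated values is not stated in this README.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;
using System.Reactive.Subjects;

var a = new BehaviorSubject<bool>(true);
var b = new BehaviorSubject<bool>(false);

// Standard System.Reactive only: take the latest value of every source
// and reduce each snapshot to a single boolean.
IObservable<bool> allTrue = Observable
    .CombineLatest(new IObservable<bool>[] { a, b })
    .Select(latest => latest.All(x => x));

var seen = new List<bool>();
using var subscription = allTrue.Subscribe(seen.Add);

b.OnNext(true);  // both sources are now true
a.OnNext(false); // one source flipped back to false

Console.WriteLine(string.Join(", ", seen)); // False, True, False
```

`BehaviorSubject` is used here only so that each source has an initial value and the output is deterministic.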
### Async / Task Integration
```csharp
IObservable<int> inputs = Observable.Range(1, 5);
// Sequential (preserves order)
var seq = inputs.SelectAsyncSequential(async i => { await Task.Delay(50); return i * 2; });
// Latest only (cancels previous)
var latest = inputs.SelectLatestAsync(async i => { await Task.Delay(100); return i; });
// Limited parallelism
var concurrent = inputs.SelectAsyncConcurrent(async i => { await Task.Delay(100); return i; }, maxConcurrency: 2);
// Asynchronous subscription (serializing tasks)
inputs.SubscribeAsync(async i => await Task.Delay(10));
// Synchronous gate: ensures per-item async completion before next is emitted
inputs.SubscribeSynchronous(async i => await Task.Delay(25));
// ToHotTask: convert an observable to a Task that starts immediately
var source = Observable.Return(42);
var task = source.ToHotTask();
var result = await task; // 42
```
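The three projection styles above (sequential, latest-only, bounded concurrency) correspond to well-known compositions of standard `System.Reactive` operators. The sketch below shows those compositions as a point of comparison; it is not the library's implementation, and `DoubleAsync` is a hypothetical stand-in for real asynchronous work.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;

// Hypothetical async operation used only for illustration.
static async Task<int> DoubleAsync(int i)
{
    await Task.Delay(20);
    return i * 2;
}

var inputs = Observable.Range(1, 3);

// Sequential: Concat subscribes to each deferred task only after the previous one completes.
var sequential = inputs.Select(i => Observable.FromAsync(() => DoubleAsync(i))).Concat();

// Latest only: Switch unsubscribes from the previous inner sequence when a new one arrives.
var latestOnly = inputs.Select(i => Observable.FromAsync(() => DoubleAsync(i))).Switch();

// Bounded parallelism: Merge(2) keeps at most two inner sequences subscribed at once.
var bounded = inputs.Select(i => Observable.FromAsync(() => DoubleAsync(i))).Merge(2);

IList<int> ordered = await sequential.ToList();
Console.WriteLine(string.Join(", ", ordered)); // 2, 4, 6
```

Only the sequential variant guarantees that results arrive in input order; the bounded variant may interleave results depending on task timing.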
### Backpressure / Conflation
```csharp
// Conflate: enforce minimum spacing between emissions while always outputting the most recent value
var noisy = Observable.Interval(TimeSpan.FromMilliseconds(20)).Take(30);
var conflated = noisy.Conflate(TimeSpan.FromMilliseconds(200), Scheduler.Default);
```
### Selective & Conditional Emission
```csharp
// TakeUntil predicate (inclusive)
var untilFive = Observable.Range(1, 100).TakeUntil(x => x == 5);
// WaitUntil first match then complete
var firstEven = Observable.Range(1, 10).WaitUntil(x => x % 2 == 0);
// SampleLatest: sample the latest value whenever a trigger fires
var source = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10);
var trigger = Observable.Interval(TimeSpan.FromMilliseconds(300)).Take(3);
var sampled = source.SampleLatest(trigger);
// SwitchIfEmpty: provide a fallback if the source completes without emitting
var empty = Observable.Empty<int>();
var fallback = Observable.Return(42);
var result = empty.SwitchIfEmpty(fallback); // emits 42
// DropIfBusy: drop values if the previous async operation is still running
var inputs = Observable.Range(1, 5);
var processed = inputs.DropIfBusy(async x => { await Task.Delay(200); Console.WriteLine(x); });
```
### Buffering & Transformation
```csharp
// BufferUntil - collect chars between delimiters
var chars = "<a><b><c>".ToCharArray().ToObservable();
var frames = chars.BufferUntil('<', '>'); // emits "<a>", "<b>", "<c>"
// Shuffle arrays in-place
var arrays = Observable.Return(new[] { 1, 2, 3, 4, 5 });
var shuffled = arrays.Shuffle();
// BufferUntilIdle: emit a batch when the stream goes quiet
var events = Observable.Interval(TimeSpan.FromMilliseconds(100)).Take(10);
var batches = events.BufferUntilIdle(TimeSpan.FromMilliseconds(250));
// Pairwise: emit consecutive pairs
var numbers = Observable.Range(1, 5);
var pairs = numbers.Pairwise(); // emits (1,2), (2,3), (3,4), (4,5)
// ScanWithInitial: scan that always emits the initial value first
var values = Observable.Return(5);
var accumulated = values.ScanWithInitial(10, (acc, x) => acc + x); // emits 10, then 15
```
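`Pairwise` and `ScanWithInitial` can both be understood in terms of standard `System.Reactive` operators. The sketch below reproduces the outputs quoted in the comments above using only `Zip`, `Skip`, `Scan`, and `StartWith`. It is an illustration of the semantics, not the library's implementation, which is described elsewhere in this README as using a single source subscription.

```csharp
using System;
using System.Collections.Generic;
using System.Reactive.Linq;

var numbers = Observable.Range(1, 5);

// Pairwise sketch: zip the sequence with itself shifted by one element.
// This subscribes to `numbers` twice, so it is only suitable for cold, repeatable sources.
var pairs = numbers.Zip(numbers.Skip(1), (previous, current) => (previous, current));

// ScanWithInitial sketch: a regular Scan with the seed emitted up front.
var accumulated = Observable.Return(5)
    .Scan(10, (acc, x) => acc + x)
    .StartWith(10);

var pairList = pairs.ToList().Wait();     // blocks until the sequence completes
var totals = accumulated.ToList().Wait();

Console.WriteLine(string.Join(", ", pairList)); // (1, 2), (2, 3), (3, 4), (4, 5)
Console.WriteLine(string.Join(", ", totals));   // 10, 15
```

For hot sources, the double subscription in the `Zip` approach can miss or duplicate items, which is one reason a dedicated operator is preferable.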
### Subscription / Side Effects
```csharp
var stream = Observable.Range(1, 3)
    .DoOnSubscribe(() => Console.WriteLine("Subscribed"))
    .DoOnDispose(() => Console.WriteLine("Disposed"));
using (stream.Subscribe(Console.WriteLine))
{
    // auto dispose at using end
}
```
### Utility & Miscellaneous
```csharp
// Emit list contents quickly with low allocations
var listSource = Observable.Return<IEnumerable<int>>(new List<int> { 1, 2, 3 });
listSource.ForEach().Subscribe(Console.WriteLine);
// Using helper for deterministic disposal
var value = new MemoryStream().Using(ms => ms.Length);
// While loop (reactive)
var counter = 0;
ReactiveExtensions.While(() => counter++ < 3, () => Console.WriteLine(counter))
.Subscribe();
// Batch push with OnNext params
var subj = new Subject<int>();
subj.OnNext(1, 2, 3, 4);
// ToReadOnlyBehavior: create a read-only behavior subject
var (observable, observer) = ReactiveExtensions.ToReadOnlyBehavior(10);
observer.OnNext(20); // observable emits 10, then 20
// ToPropertyObservable: observe property changes on INotifyPropertyChanged
public class ViewModel : INotifyPropertyChanged
{
    private string _name;
    public string Name
    {
        get => _name;
        set { _name = value; PropertyChanged?.Invoke(this, new PropertyChangedEventArgs(nameof(Name))); }
    }
    public event PropertyChangedEventHandler? PropertyChanged;
}
var vm = new ViewModel();
var nameChanges = vm.ToPropertyObservable(x => x.Name);
vm.Name = "Hello"; // observable emits "Hello"
```
---
## Async Observables (`IObservableAsync<T>`)
A fully **async-native** observable framework that mirrors the `System.Reactive` programming model but uses `ValueTask`, `CancellationToken`, and `IAsyncDisposable` throughout. Every observer callback (`OnNextAsync`, `OnErrorResumeAsync`, `OnCompletedAsync`) is asynchronous, enabling true end-to-end `async`/`await` pipelines without blocking threads.
**Key design goals:**
- **Async all the way down** – no synchronous observer callbacks; every notification is `ValueTask`-based
- **Cancellation-first** – every operator and subscription accepts `CancellationToken`
- **Async disposal** – subscriptions return `IAsyncDisposable` for clean async resource cleanup
- **Interop** – bidirectional bridge between `IObservable<T>` and `IObservableAsync<T>`
- **Configurable concurrency** – subjects offer serial or concurrent publishing, stateful or stateless modes
> **Note:** Async Observables require **.NET 8** or later.
### Async Quick Start
```csharp
using ReactiveUI.Extensions.Async;
using ReactiveUI.Extensions.Async.Subjects;
using ReactiveUI.Extensions.Async.Disposables;
// 1. Create an async observable from a factory
var source = ObservableAsync.Range(1, 5);
// 2. Apply operators – filter, transform, throttle
var pipeline = source
    .Where(x => x % 2 != 0) // keep odd numbers
    .Select(x => x * 10) // multiply by 10
    .Do(x => Console.WriteLine($"Processing: {x}"));
// 3. Subscribe asynchronously
await using var subscription = await pipeline.SubscribeAsync(
    async (value, ct) => Console.WriteLine($"Received: {value}"),
    CancellationToken.None);
// 4. Use subjects for multicasting
var subject = SubjectAsync.Create<string>();
await using var sub = await subject.Values.SubscribeAsync(
    async (msg, ct) => Console.WriteLine(msg),
    CancellationToken.None);
await subject.OnNextAsync("Hello, async Rx!", CancellationToken.None);
// 5. Bridge from classic IObservable
var classic = Observable.Interval(TimeSpan.FromMilliseconds(200)).Take(5);
var asyncVersion = classic.ToObservableAsync();
// 6. Aggregate results
int count = await source.CountAsync(CancellationToken.None);
int first = await source.FirstAsync(CancellationToken.None);
var items = await source.ToListAsync(CancellationToken.None);
```
### Async API Catalog
| Category | Operators / Types |
|----------|-------------------|
| Core Interfaces | `IObservableAsync<T>`, `IObserverAsync<T>`, `ObservableAsync<T>`, `ConnectableObservableAsync<T>` |
| Factory Methods | `Create`, `CreateAsBackgroundJob`, `Return`, `Empty`, `Never`, `Throw`, `Range`, `Interval`, `Timer`, `Defer`, `FromAsync`, `ToObservableAsync` (Task / IAsyncEnumerable / IEnumerable) |
| Transformation | `Select` (sync & async), `SelectMany`, `Scan` (sync & async), `Cast`, `OfType`, `GroupBy` |
| Filtering | `Where` (sync & async), `Take`, `Skip`, `TakeWhile`, `SkipWhile`, `TakeUntil` (observable / task / token / predicate), `Distinct`, `DistinctBy`, `DistinctUntilChanged`, `DistinctUntilChangedBy` |
| Combining | `Merge` (overloads), `Concat` (overloads), `Switch`, `CombineLatest` (2–8 sources + enumerable overloads), `Zip`, `Prepend`, `StartWith` |
| Error Handling | `Catch`, `CatchAndIgnoreErrorResume`, `CatchIgnore`, `CatchAndReturn`, `Retry` (infinite & count-limited), `OnErrorResumeAsFailure` |
| Timing | `Throttle`, `Delay`, `Timeout` (with optional fallback), `ThrottleDistinct`, `DebounceUntil` |
| Aggregation | `AggregateAsync`, `CountAsync`, `LongCountAsync`, `AnyAsync`, `AllAsync`, `ContainsAsync`, `FirstAsync`, `FirstOrDefaultAsync`, `LastAsync`, `LastOrDefaultAsync`, `SingleAsync`, `SingleOrDefaultAsync`, `ToListAsync`, `ToDictionaryAsync` |
| Side Effects | `Do` (sync & async), `DoOnSubscribe` (sync & async), `LogErrors`, `DropIfBusy`, `OnDispose` (sync & async), `Using` |
| Multicasting | `Publish`, `StatelessPublish`, `ReplayLatest`, `RefCount`, `Multicast` |
| Scheduling | `ObserveOn` (AsyncContext / SynchronizationContext / TaskScheduler / IScheduler), `ObserveOnSafe`, `ObserveOnIf`, `Yield` |
| Subscription | `SubscribeAsync` (multiple overloads), `ForEachAsync`, `WaitCompletionAsync` |
| Conversion | `ToObservable` (async → classic), `ToObservableAsync` (classic → async), `ToAsyncEnumerable`, `Wrap` |
| Helpers | `AsSignal`, `CombineLatestValuesAreAllTrue`, `CombineLatestValuesAreAllFalse`, `ForEach`, `GetMax`, `GetMin`, `LatestOrDefault`, `Not`, `Pairwise`, `Partition`, `ReplayLastOnSubscribe`, `ScanWithInitial` (sync & async), `SkipWhileNull`, `Start`, `WaitUntil`, `WhereFalse`, `WhereIsNotNull`, `WhereTrue` |
| Subjects | `SubjectAsync.Create`, `SubjectAsync.CreateBehavior`, `SubjectAsync.CreateReplayLatest` |
| Disposables | `DisposableAsync`, `CompositeDisposableAsync` |
| Mixins | `AsObserverAsync`, `MapValues`, `ToDisposableAsync` |
---
### Core Interfaces & Types
| Type | Description |
|------|-------------|
| `IObservableAsync<T>` | Async push-based notification provider. `SubscribeAsync` returns `ValueTask<IAsyncDisposable>`. |
| `IObserverAsync<T>` | Receives async notifications: `OnNextAsync`, `OnErrorResumeAsync`, `OnCompletedAsync`. Implements `IAsyncDisposable`. |
| `ObservableAsync<T>` | Abstract base class for building custom async observables. Override `SubscribeAsyncCore`. |
| `ConnectableObservableAsync<T>` | Extends `ObservableAsync<T>` with `ConnectAsync` for deferred subscription to the underlying source. |
| `ISubjectAsync<T>` | Async subject: exposes `Values` (`IObservableAsync<T>`) and publishing methods (`OnNextAsync`, `OnErrorResumeAsync`, `OnCompletedAsync`). |
| `Result` | Completion token carrying either success or a failure `Exception`. Used with `OnCompletedAsync`. |
| `AsyncContext` | Wraps a `SynchronizationContext` or `TaskScheduler` for scheduling async observer notifications. |
```csharp
// Implementing a custom async observable
public class TickObservable : ObservableAsync<int>
{
    protected override async ValueTask<IAsyncDisposable> SubscribeAsyncCore(
        IObserverAsync<int> observer, CancellationToken cancellationToken)
    {
        for (int i = 0; i < 5 && !cancellationToken.IsCancellationRequested; i++)
        {
            await observer.OnNextAsync(i, cancellationToken);
            await Task.Delay(100, cancellationToken);
        }

        await observer.OnCompletedAsync(Result.Success);
        return DisposableAsync.Empty;
    }
}
```
### Factory Methods
| Method | Description |
|--------|-------------|
| `ObservableAsync.Create(subscribe)` | Create from an async delegate that receives `(observer, ct)` and returns `IAsyncDisposable`. |
| `ObservableAsync.CreateAsBackgroundJob(subscribe)` | Like `Create`, but the subscribe delegate runs as a background job, returning immediately. |
| `ObservableAsync.Return(value)` | Emits a single value then completes. |
| `ObservableAsync.Empty()` | Completes immediately without emitting any values. |
| `ObservableAsync.Never()` | Never emits and never completes. |
| `ObservableAsync.Throw(exception)` | Completes immediately with the specified error. |
| `ObservableAsync.Range(start, count)` | Emits a sequence of integers. |
| `ObservableAsync.Interval(period)` | Emits incrementing `long` values at a regular interval. Accepts optional `TimeProvider`. |
| `ObservableAsync.Timer(dueTime)` | Emits `0L` after the specified delay. Supports one-shot and periodic overloads. Accepts optional `TimeProvider`. |
| `ObservableAsync.Defer(factory)` | Defers creation of the observable until subscription time. |
| `ObservableAsync.FromAsync(func)` | Wraps an async function as a single-element observable. |
| `task.ToObservableAsync()` | Converts a `Task<T>` to an async observable. |
| `asyncEnumerable.ToObservableAsync()` | Converts an `IAsyncEnumerable<T>` to an async observable. |
| `enumerable.ToObservableAsync()` | Converts an `IEnumerable<T>` to an async observable. |
```csharp
// Create from factory delegate
var custom = ObservableAsync.Create<string>(async (observer, ct) =>
{
    await observer.OnNextAsync("Hello", ct);
    await observer.OnNextAsync("World", ct);
    await observer.OnCompletedAsync(Result.Success);
    return DisposableAsync.Empty;
});
// From a Task
var fromTask = Task.FromResult(42).ToObservableAsync();
// From IAsyncEnumerable
async IAsyncEnumerable<int> GenerateAsync()
{
    for (int i = 0; i < 5; i++)
    {
        await Task.Delay(50);
        yield return i;
    }
}
var fromAsyncEnum = GenerateAsync().ToObservableAsync();
// Periodic timer (with TimeProvider support)
var ticks = ObservableAsync.Interval(TimeSpan.FromSeconds(1));
// Deferred creation
var deferred = ObservableAsync.Defer(() =>
    ObservableAsync.Return(DateTime.Now.Second));
```
### Transformation Operators
| Operator | Description |
|----------|-------------|
| `Select(selector)` | Projects each element using a synchronous transform function. |
| `Select(asyncSelector)` | Projects each element using an async transform function. |
| `SelectMany(selector)` | Flattens nested async observables produced by a projection. Multiple overloads for collection selectors. |
| `Scan(seed, accumulator)` | Applies an accumulator over the sequence, emitting each intermediate result. Sync and async overloads. |
| `Cast<TResult>()` | Casts each element to the specified type. |
| `OfType<TResult>()` | Keeps only the elements that are assignable to the specified type. |
| `GroupBy(keySelector)` | Groups elements by key, emitting a `GroupedAsyncObservable` per group. |
```csharp
var source = ObservableAsync.Range(1, 5);
// Sync projection
var doubled = source.Select(x => x * 2);
// Async projection
var projected = source.Select(async (x, ct) =>
{
    await Task.Delay(10, ct);
    return x.ToString();
});
// Flat map: each item produces a sub-sequence
var flattened = source.SelectMany(x => ObservableAsync.Range(x * 10, 3));
// Running total with Scan
var runningTotal = source.Scan(0, (acc, x) => acc + x); // 1, 3, 6, 10, 15
// Group by even/odd
var grouped = source.GroupBy(x => x % 2 == 0 ? "even" : "odd");
```
### Filtering Operators
| Operator | Description |
|----------|-------------|
| `Where(predicate)` | Filters elements using a synchronous predicate. |
| `Where(asyncPredicate)` | Filters elements using an async predicate. |
| `Take(count)` | Takes the first N elements then completes. |
| `Skip(count)` | Skips the first N elements. |
| `TakeWhile(predicate)` | Takes elements while the predicate holds, then completes. |
| `SkipWhile(predicate)` | Skips elements while the predicate holds, then emits the rest. |
| `TakeUntil(other)` | Takes elements until another async observable, Task, CancellationToken, or predicate signals. |
| `Distinct()` | Emits only elements not previously seen. |
| `DistinctBy(keySelector)` | Emits only elements with unique keys. |
| `DistinctUntilChanged()` | Suppresses consecutive duplicate elements. |
| `DistinctUntilChangedBy(keySelector)` | Suppresses consecutive elements with the same key. |
```csharp
var source = ObservableAsync.Range(1, 10);
// Basic filter
var evens = source.Where(x => x % 2 == 0); // 2, 4, 6, 8, 10
// Async filter
var asyncFiltered = source.Where(async (x, ct) =>
{
    await Task.Delay(1, ct);
    return x > 5;
});
// Take and Skip
var firstThree = source.Take(3); // 1, 2, 3
var skipThree = source.Skip(3); // 4, 5, 6, 7, 8, 9, 10
// TakeWhile / SkipWhile
var whileLow = source.TakeWhile(x => x < 4); // 1, 2, 3
var afterLow = source.SkipWhile(x => x < 4); // 4, 5, 6, 7, 8, 9, 10
// TakeUntil with a cancellation token
using var cts = new CancellationTokenSource(TimeSpan.FromSeconds(5));
var bounded = ObservableAsync.Interval(TimeSpan.FromMilliseconds(100))
    .TakeUntil(cts.Token);
// Distinct and DistinctUntilChanged
var items = new[] { 1, 2, 2, 3, 1, 3 }.ToObservableAsync();
var unique = items.Distinct(); // 1, 2, 3
var noConsecDups = items.DistinctUntilChanged(); // 1, 2, 3, 1, 3
```
### Combining & Merging
| Operator | Description |
|----------|-------------|
| `Merge(sources)` | Merges multiple async observables, interleaving values as they arrive. Supports max concurrency. |
| `Concat(sources)` | Concatenates multiple async observables sequentially – subscribes to the next only after the previous completes. |
| `Switch()` | Flattens a higher-order async observable by always switching to the most recent inner sequence. |
| `CombineLatest(other, selector)` | Combines the latest values from 2–8 sources whenever any source emits. |
| `Zip(other)` | Pairs elements from two sources by index. Optional result selector. |
| `Prepend(value)` | Prepends a single value before the source sequence. |
| `StartWith(values)` | Prepends one or more values before the source sequence. |
```csharp
var a = ObservableAsync.Range(1, 3); // 1, 2, 3
var b = ObservableAsync.Range(10, 3); // 10, 11, 12
// Merge: interleave two streams
var merged = ObservableAsync.Merge(a, b);
// Concat: sequential
var sequential = ObservableAsync.Concat(a, b); // 1, 2, 3, 10, 11, 12
// CombineLatest
var combined = a.CombineLatest(b, (x, y) => $"{x}+{y}");
// Zip by index
var zipped = a.Zip(b, (x, y) => x + y); // 11, 13, 15
// Prepend / StartWith
var withPrefix = a.Prepend(0); // 0, 1, 2, 3
var withMany = a.StartWith([-2, -1, 0]); // -2, -1, 0, 1, 2, 3
```
### Error Handling & Retry
| Operator | Description |
|----------|-------------|
| `Catch(handler)` | Catches errors and switches to a fallback async observable. |
| `CatchAndIgnoreErrorResume()` | Catches error-resume notifications and suppresses them, allowing the sequence to continue. |
| `Retry()` | Re-subscribes to the source indefinitely on error. |
| `Retry(count)` | Re-subscribes up to `count` times on error. |
| `OnErrorResumeAsFailure()` | Converts `OnErrorResumeAsync` calls into `OnCompletedAsync` with a `Result.Failure`. |
```csharp
// Catch and switch to fallback
var flaky = ObservableAsync.Throw<int>(new InvalidOperationException("Oops"));
var safe = flaky.Catch(ex => ObservableAsync.Return(-1)); // emits -1
// Retry up to 3 times
var retried = flaky.Retry(3);
// Retry indefinitely
var forever = flaky.Retry();
// Convert error-resume to failure completion
var strict = someSource.OnErrorResumeAsFailure();
```
### Timing & Scheduling (Async)
| Operator | Description |
|----------|-------------|
| `Throttle(timeSpan)` | Suppresses values that arrive within the specified window after the last emission. Accepts optional `TimeProvider`. |
| `Delay(timeSpan)` | Delays every emission by the specified duration. Accepts optional `TimeProvider`. |
| `Timeout(timeSpan)` | Raises a `TimeoutException` if no value arrives within the specified window. |
| `Timeout(timeSpan, fallback)` | Switches to a fallback source on timeout instead of faulting. |
| `ObserveOn(context)` | Shifts observer notifications to the specified `AsyncContext`, `SynchronizationContext`, `TaskScheduler`, or `IScheduler`. |
| `Yield()` | Yields control (via `Task.Yield`) between each notification, ensuring other work can interleave. |
```csharp
var source = ObservableAsync.Interval(TimeSpan.FromMilliseconds(50)).Take(10);
// Throttle rapid emissions (debounce)
var throttled = source.Throttle(TimeSpan.FromMilliseconds(200));
// Delay each value by 100ms
var delayed = source.Delay(TimeSpan.FromMilliseconds(100));
// Timeout if nothing arrives in 2 seconds
var guarded = source.Timeout(TimeSpan.FromSeconds(2));
// Timeout with fallback
var withFallback = source.Timeout(
    TimeSpan.FromSeconds(2),
    ObservableAsync.Return(999L));
// Schedule notifications onto a specific context
var onContext = source.ObserveOn(new AsyncContext(SynchronizationContext.Current!));
// Yield between each notification
var yielded = source.Yield();
```
### Aggregation & Element Operators
| Operator | Description |
|----------|-------------|
| `AggregateAsync(seed, accumulator)` | Applies an accumulator over the sequence and returns the final result. |
| `CountAsync()` | Returns the number of elements. |
| `LongCountAsync()` | Returns the element count as `long`. |
| `AnyAsync()` / `AnyAsync(predicate)` | Returns `true` if the sequence has any elements (optionally matching a predicate). |
| `AllAsync(predicate)` | Returns `true` if all elements match the predicate. |
| `ContainsAsync(value)` | Returns `true` if the sequence contains the specified value. |
| `FirstAsync()` | Returns the first element or throws if empty. |
| `FirstOrDefaultAsync()` | Returns the first element or `default` if empty. |
| `LastAsync()` | Returns the last element or throws if empty. |
| `LastOrDefaultAsync()` | Returns the last element or `default` if empty. |
| `SingleAsync()` | Returns the only element; throws if empty or more than one. |
| `SingleOrDefaultAsync()` | Returns the only element or `default`; throws if more than one. |
| `ToListAsync()` | Collects all elements into a `List<T>`. |
| `ToDictionaryAsync(keySelector)` | Collects all elements into a `Dictionary<TKey, T>`. |
| `ForEachAsync(action)` | Invokes an async action for each element. |
| `WaitCompletionAsync()` | Awaits the completion of the observable without capturing values. |
```csharp
var source = ObservableAsync.Range(1, 5);
var ct = CancellationToken.None;
int count = await source.CountAsync(ct); // 5
int sum = await source.AggregateAsync(0, (a, x) => a + x, ct); // 15
bool hasEvens = await source.AnyAsync(x => x % 2 == 0, ct); // true
bool allPos = await source.AllAsync(x => x > 0, ct); // true
bool has3 = await source.ContainsAsync(3, ct); // true
int first = await source.FirstAsync(ct); // 1
int last = await source.LastAsync(ct); // 5
List<int> all = await source.ToListAsync(ct); // [1, 2, 3, 4, 5]
var dict = await source.ToDictionaryAsync(x => x.ToString(), ct); // {"1":1, "2":2, ...}
// Iterate with async action
await source.ForEachAsync(async (x, ct2) =>
    Console.WriteLine($"Item: {x}"), ct);
// Wait for completion without capturing values
await source.WaitCompletionAsync(ct);
```
### Async Helpers & Operators
These additions bring the async surface closer to the high-value helper operators already available in the synchronous `ReactiveExtensions` API, while staying fully async-native and composed from `IObservableAsync<T>`, `IObserverAsync<T>`, `ValueTask`, and `IAsyncDisposable`.
| Method / Operator | Description |
|-------------------|-------------|
| `AsSignal()` | Converts any async observable into `IObservableAsync<Unit>` by replacing each value with `Unit.Default`. |
| `CatchIgnore()` | Suppresses terminal failure and completes with an empty sequence instead. |
| `CatchIgnore(onError)` | Suppresses a matching terminal exception, invokes the callback, then completes. |
| `CatchAndReturn(fallback)` | Replaces terminal failure with a single fallback value. |
| `CatchAndReturn(fallbackFactory)` | Maps a matching terminal exception to a fallback value and completes successfully. |
| `CombineLatest(IEnumerable<IObservableAsync<T>>)` | Combines the latest value from every source in an enumerable collection and emits snapshots as `IList<T>`. |
| `CombineLatest(IEnumerable<IObservableAsync<T>>, selector)` | Combines an enumerable collection of sources and projects the latest snapshot into a result. |
| `CombineLatestValuesAreAllTrue()` | Emits `true` when the latest value from every boolean source is `true`. |
| `CombineLatestValuesAreAllFalse()` | Emits `true` when the latest value from every boolean source is `false`. |
| `DoOnSubscribe(action)` | Executes a synchronous action each time the sequence is subscribed. |
| `DoOnSubscribe(asyncAction)` | Executes an async action each time the sequence is subscribed. |
| `DropIfBusy(asyncAction)` | Ignores new values while the previous async action is still running, emitting only accepted values. |
| `ForEach()` | Flattens `IObservableAsync<IEnumerable<T>>` into `IObservableAsync<T>`. |
| `GetMax()` | Computes the maximum of the latest values across multiple async observable sources. |
| `GetMin()` | Computes the minimum of the latest values across multiple async observable sources. |
| `LatestOrDefault(defaultValue)` | Starts with a default value, then emits distinct latest source values. |
| `LogErrors(logger)` | Invokes an error logger for `OnErrorResumeAsync` notifications without altering the sequence. |
| `Not()` | Negates each boolean value in the sequence. |
| `ObserveOnSafe(asyncContext)` | Applies `ObserveOn` only when a non-null `AsyncContext` is provided. |
| `ObserveOnSafe(taskScheduler)` | Applies `ObserveOn` only when a non-null `TaskScheduler` is provided. |
| `ObserveOnIf(condition, asyncContext)` | Conditionally observes on the provided `AsyncContext`. |
| `ObserveOnIf(condition, taskScheduler)` | Conditionally observes on the provided `TaskScheduler`. |
| `Pairwise()` | Emits adjacent `(Previous, Current)` pairs from a single source subscription. |
| `Partition(predicate)` | Splits a source into two async observables: matching values and non-matching values. |
| `ReplayLastOnSubscribe(initialValue)` | Shares the sequence and replays the latest observed value, beginning with an initial value. |
| `ScanWithInitial(initial, accumulator)` | Emits the initial accumulator value first, then each intermediate accumulation. Sync and async overloads are available. |
| `SkipWhileNull()` | Skips `null` values until the first non-null value arrives, then emits all remaining values. |
| `Start(action)` | Executes an action as an async observable that emits `Unit.Default` when complete. |
| `Start(function)` | Executes a function as an async observable that emits the computed result. |
| `ThrottleDistinct()` | Applies `DistinctUntilChanged`, then `Throttle`, then `DistinctUntilChanged` again to reduce noisy duplicates. |
| `DebounceUntil()` | Delays values unless the provided condition is immediately satisfied. |
| `WaitUntil(predicate)` | Emits the first value that satisfies a predicate, then completes. |
| `WhereFalse()` | Filters a boolean sequence to `false` values only. |
| `WhereIsNotNull()` | Filters out `null` values and narrows the resulting type to non-nullable. |
| `WhereTrue()` | Filters a boolean sequence to `true` values only. |
```csharp
// AsSignal: convert any payload stream into Unit notifications
var saveClicks = ObservableAsync.Range(1, 3).AsSignal();
// CatchIgnore: suppress terminal failure and complete silently
var ignoredFailure = ObservableAsync.Throw<int>(new InvalidOperationException("boom"))
    .CatchIgnore();
// CatchIgnore: handle a specific exception type then complete
var ignoredWithHandler = ObservableAsync.Throw<int>(new TimeoutException())
    .CatchIgnore(ex => Console.WriteLine(ex.Message));
// CatchAndReturn: replace failure with a fallback value
var recovered = ObservableAsync.Throw<int>(new InvalidOperationException("boom"))
    .CatchAndReturn(-1);
// CatchAndReturn: map a matching exception to a fallback value
var recoveredTyped = ObservableAsync.Throw<int>(new TimeoutException("late"))
    .CatchAndReturn(ex => ex.Message.Length);
// CombineLatest over IEnumerable<IObservableAsync<T>>
IObservableAsync<int>[] numberSources =
[
    ObservableAsync.Return(1),
    ObservableAsync.Return(10),
    ObservableAsync.Return(100),
];
var latestSnapshots = numberSources.CombineLatest();
var latestSum = numberSources.CombineLatest(values => values.Sum());
// CombineLatestValuesAreAllTrue / CombineLatestValuesAreAllFalse
IObservableAsync<bool>[] flags =
[
    ObservableAsync.Return(true),
    ObservableAsync.Return(false),
];
var allTrue = flags.CombineLatestValuesAreAllTrue();
var allFalse = flags.CombineLatestValuesAreAllFalse();
// DoOnSubscribe: run logic when a subscription starts
var withSubscribeLog = ObservableAsync.Range(1, 3)
.DoOnSubscribe(() => Console.WriteLine("Subscribed"));
// DoOnSubscribe async overload
var withAsyncSubscribeLog = ObservableAsync.Range(1, 3)
    .DoOnSubscribe(async ct =>
    {
        await Task.Delay(10, ct);
        Console.WriteLine("Subscribed asynchronously");
    });
// DropIfBusy: ignore new values while previous async work is still running
var searchRequests = ObservableAsync.Range(1, 10)
    .DropIfBusy(async (value, ct) =>
    {
        await Task.Delay(50, ct);
        Console.WriteLine($"Processed {value}");
    });
// ForEach: flatten IEnumerable batches into a single element stream
IObservableAsync<IEnumerable<int>> batches = new[]
{
    new[] { 1, 2 },
    new[] { 3, 4 },
}.ToObservableAsync();
var flattened = batches.ForEach();
// GetMax / GetMin across latest values
var maxValues = ObservableAsync.Return(2).GetMax(ObservableAsync.Return(5), ObservableAsync.Return(3));
var minValues = ObservableAsync.Return(2).GetMin(ObservableAsync.Return(5), ObservableAsync.Return(3));
// LatestOrDefault: emit an initial default before distinct latest source values
var latestName = ObservableAsync.Return("Alice").LatestOrDefault("(unknown)");
// LogErrors: inspect error notifications without changing the pipeline shape
var loggedErrors = ObservableAsync.Throw<int>(new InvalidOperationException("boom"))
    .LogErrors(ex => Console.WriteLine($"Logged: {ex.Message}"));
// Not / WhereTrue / WhereFalse helpers for boolean streams
var bools = new[] { true, false, true, false }.ToObservableAsync();
var negated = bools.Not();
var onlyTrue = bools.WhereTrue();
var onlyFalse = bools.WhereFalse();
// ObserveOnSafe: only switch context when one is available
var currentContext = AsyncContext.GetCurrent();
var observedSafely = ObservableAsync.Range(1, 3).ObserveOnSafe(currentContext);
var observedOnSchedulerSafely = ObservableAsync.Range(1, 3).ObserveOnSafe(TaskScheduler.Default);
// ObserveOnIf: conditional context switch
var maybeObserved = ObservableAsync.Range(1, 3).ObserveOnIf(true, currentContext);
var maybeObservedOnScheduler = ObservableAsync.Range(1, 3).ObserveOnIf(true, TaskScheduler.Default);
// Pairwise: emit adjacent value pairs
var pairs = ObservableAsync.Range(1, 4).Pairwise(); // (1,2), (2,3), (3,4)
// Partition: split a source into matching and non-matching branches
var (evens, odds) = ObservableAsync.Range(1, 6)
.Partition(x => x % 2 == 0);
// ReplayLastOnSubscribe: share the sequence and replay latest value to new subscribers
var replayed = ObservableAsync.Range(1, 3)
.ReplayLastOnSubscribe(0);
// ScanWithInitial: emit seed first, then running accumulation
var runningSum = ObservableAsync.Range(1, 3)
.ScanWithInitial(0, (acc, value) => acc + value); // 0, 1, 3, 6
// ScanWithInitial async overload
var runningSumAsync = ObservableAsync.Range(1, 3)
    .ScanWithInitial(0, async (acc, value, ct) =>
    {
        await Task.Delay(1, ct);
        return acc + value;
    });
// SkipWhileNull / WhereIsNotNull: null-aware helpers for reference streams
string?[] names = [null, null, "Alice", null, "Bob"];
var afterFirstValue = names.ToObservableAsync().SkipWhileNull();
var withoutNulls = names.ToObservableAsync().WhereIsNotNull();
// Start helpers
var startedAction = ObservableAsync.Start(() => Console.WriteLine("Action ran"));
var startedFunction = ObservableAsync.Start(() => DateTime.UtcNow.Year);
// ThrottleDistinct: reduce duplicate noise around throttled updates
var throttledDistinct = ObservableAsync.Interval(TimeSpan.FromMilliseconds(50))
.Select(x => x % 2)
.ThrottleDistinct(TimeSpan.FromMilliseconds(100));
// DebounceUntil: bypass the delay when the condition is already satisfied
var debouncedUntil = ObservableAsync.Range(1, 5)
.DebounceUntil(TimeSpan.FromMilliseconds(100), value => value == 5);
// WaitUntil: emit the first matching value and complete
var firstLargeValue = ObservableAsync.Range(1, 10)
.WaitUntil(x => x > 7);
```
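To make the emission order of `Pairwise` and `ScanWithInitial` concrete, the following is a minimal pull-based analogue over `IEnumerable<T>`. It is only a sketch of the documented semantics, not the library's implementation: `SequenceSketches`, `PairwiseSketch`, and `ScanWithInitialSketch` are hypothetical names introduced for illustration, and the real operators work on `IObservableAsync<T>`.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration helpers; not part of ReactiveUI.Extensions.
public static class SequenceSketches
{
    // Yields (previous, current) for every adjacent pair; the first element only primes the state.
    public static IEnumerable<(T Previous, T Current)> PairwiseSketch<T>(IEnumerable<T> source)
    {
        var hasPrevious = false;
        var previous = default(T)!;
        foreach (var current in source)
        {
            if (hasPrevious)
            {
                yield return (previous, current);
            }

            previous = current;
            hasPrevious = true;
        }
    }

    // Yields the seed first, then every intermediate accumulation.
    public static IEnumerable<TAcc> ScanWithInitialSketch<T, TAcc>(
        IEnumerable<T> source,
        TAcc initial,
        Func<TAcc, T, TAcc> accumulator)
    {
        var acc = initial;
        yield return acc;
        foreach (var value in source)
        {
            acc = accumulator(acc, value);
            yield return acc;
        }
    }
}
```

For the input `1, 2, 3, 4`, `PairwiseSketch` yields `(1, 2)`, `(2, 3)`, `(3, 4)`; for `1, 2, 3` with seed `0` and addition, `ScanWithInitialSketch` yields `0, 1, 3, 6`, matching the comments in the example above.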
### Side Effects & Lifecycle
| Operator | Description |
|----------|-------------|
| `Do(onNext)` | Performs a synchronous side effect for each element without altering the sequence. |
| `Do(asyncOnNext)` | Performs an async side effect for each element. |
| `OnDispose(action)` | Registers a synchronous action to run when the subscription is disposed. |
| `OnDispose(asyncAction)` | Registers an async action to run on disposal. |
| `Using(resourceFactory, observableFactory)` | Creates a disposable resource tied to the subscription lifetime. |
```csharp
var source = ObservableAsync.Range(1, 3);
// Sync side effect
var logged = source.Do(x => Console.WriteLine($"[LOG] {x}"));
// Async side effect
var asyncLogged = source.Do(async (x, ct) =>
{
    await Task.Delay(1, ct);
    Console.WriteLine($"[ASYNC LOG] {x}");
});
// Cleanup on disposal
var withCleanup = source.OnDispose(() => Console.WriteLine("Cleaned up!"));
// Async cleanup
var withAsyncCleanup = source.OnDispose(async () =>
{
    await Task.Delay(10);
    Console.WriteLine("Async cleanup done!");
});
// Using: tie a resource to the subscription lifetime
var withResource = ObservableAsync.Using(
() => new MemoryStream(),
stream => ObservableAsync.Return(stream.Length));
```
### Multicasting & Sharing
| Operator | Description |
|----------|-------------|
| `Publish()` | Returns a `ConnectableObservableAsync<T>` that multicasts the source to multiple observers using a serial subject. |
| `StatelessPublish()` | Returns a connectable observable using a stateless serial subject. |
| `ReplayLatest()` | Returns a connectable observable that replays the most recent value to new subscribers. |
| `RefCount()` | Automatically connects a connectable observable when the first observer subscribes and disconnects when the last unsubscribes. |
| `Multicast(subjectFactory)` | General-purpose multicasting with a custom subject factory. |
```csharp
var source = ObservableAsync.Interval(TimeSpan.FromMilliseconds(100)).Take(5);
// Publish + explicit connect
var published = source.Publish();
await using var sub1 = await published.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Sub1: {v}"), CancellationToken.None);
await using var sub2 = await published.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Sub2: {v}"), CancellationToken.None);
await using var connection = await published.ConnectAsync(CancellationToken.None);
// RefCount: auto-connect on first subscriber, auto-disconnect on last
var shared = source.Publish().RefCount();
await using var sub3 = await shared.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Shared: {v}"), CancellationToken.None);
// ReplayLatest: new subscribers get the most recent value immediately
var replayed = source.ReplayLatest().RefCount();
```
### Bridging & Conversion
| Method | Description |
|--------|-------------|
| `observable.ToObservableAsync()` | Wraps a classic `IObservable<T>` as `IObservableAsync<T>`. Synchronous notifications are forwarded sequentially through async callbacks. |
| `asyncObservable.ToObservable()` | Exposes an `IObservableAsync<T>` as a classic `IObservable<T>`. Async callbacks are awaited; the synchronous observer is notified on the completing thread. |
| `asyncObservable.ToAsyncEnumerable()` | Converts an async observable to `IAsyncEnumerable<T>` for consumption with `await foreach`. |
| `source.Wrap(converter)` | Wraps each value using a converter function producing a new async observable type. |
```csharp
// Classic IObservable<T> → IObservableAsync<T>
IObservable<int> classic = Observable.Range(1, 5);
IObservableAsync<int> async1 = classic.ToObservableAsync();
// IObservableAsync<T> → classic IObservable<T>
IObservable<int> backToClassic = async1.ToObservable();
// IObservableAsync<T> → IAsyncEnumerable<T>
var asyncEnum = ObservableAsync.Range(1, 5).ToAsyncEnumerable();
await foreach (var item in asyncEnum)
{
    Console.WriteLine(item);
}
```
### Subjects
Subjects act as both an observer and an observable, enabling manual publishing of values to multiple subscribers.
| Factory Method | Description |
|----------------|-------------|
| `SubjectAsync.Create<T>()` | Creates a subject with default options (serial publishing, stateful). |
| `SubjectAsync.Create<T>(options)` | Creates a subject with configurable publishing (`Serial` / `Concurrent`) and statefulness. |
| `SubjectAsync.CreateBehavior<T>(startValue)` | Creates a behavior subject that replays the latest value (starting with `startValue`) to new subscribers. |
| `SubjectAsync.CreateReplayLatest<T>()` | Creates a subject that replays only the most recent value to new subscribers (no initial value required). |
**Publishing options:**
| Option | Behavior |
|--------|----------|
| `PublishingOption.Serial` | Notifications are delivered to observers one at a time. Safe for single-producer scenarios. |
| `PublishingOption.Concurrent` | Notifications are delivered concurrently to all observers. Useful when observers are independent. |
```csharp
// Basic subject
var subject = SubjectAsync.Create<int>();
await using var sub = await subject.Values.SubscribeAsync(
async (value, ct) => Console.WriteLine($"Got: {value}"),
CancellationToken.None);
await subject.OnNextAsync(1, CancellationToken.None);
await subject.OnNextAsync(2, CancellationToken.None);
await subject.OnCompletedAsync(Result.Success);
// Behavior subject: new subscribers receive the current value
var behavior = SubjectAsync.CreateBehavior("initial");
await using var behaviorSub = await behavior.Values.SubscribeAsync(
async (value, ct) => Console.WriteLine($"Behavior: {value}"), // prints "initial"
CancellationToken.None);
await behavior.OnNextAsync("updated", CancellationToken.None); // prints "updated"
// Concurrent publishing for fan-out scenarios
var concurrent = SubjectAsync.Create(new SubjectCreationOptions
{
PublishingOption = PublishingOption.Concurrent,
IsStateless = false
});
// ReplayLatest: replays the last value without a required start value
var replay = SubjectAsync.CreateReplayLatest<double>();
await replay.OnNextAsync(3.14, CancellationToken.None);
// Late subscriber still receives 3.14
await using var lateSub = await replay.Values.SubscribeAsync(
async (v, ct) => Console.WriteLine($"Replay: {v}"),
CancellationToken.None);
```
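The difference between the two publishing options can be pictured with plain delegates. The sketch below is an assumption-laden illustration of "one at a time" versus "concurrently to all observers", not the library's subject implementation: `FanOutSketch`, `PublishSerialAsync`, and `PublishConcurrentAsync` are hypothetical names, and observers are modelled as simple `Func<T, ValueTask>` callbacks.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Hypothetical illustration helpers; not part of ReactiveUI.Extensions.
public static class FanOutSketch
{
    // Serial: each observer callback is awaited before the next one starts.
    public static async ValueTask PublishSerialAsync<T>(
        IReadOnlyList<Func<T, ValueTask>> observers, T value)
    {
        foreach (var observer in observers)
        {
            await observer(value);
        }
    }

    // Concurrent: every callback is started first, then all are awaited together.
    public static async ValueTask PublishConcurrentAsync<T>(
        IReadOnlyList<Func<T, ValueTask>> observers, T value)
    {
        var pending = observers.Select(observer => observer(value).AsTask()).ToArray();
        await Task.WhenAll(pending);
    }
}
```

In the serial variant a slow observer delays every observer after it, which keeps ordering simple. In the concurrent variant, one notification takes roughly as long as the slowest observer, but the callbacks may overlap and must tolerate that.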
### Async Disposables
| Type | Description |
|------|-------------|
| `DisposableAsync.Empty` | A no-op `IAsyncDisposable` – useful as a default or placeholder. |
| `DisposableAsync.Create(func)` | Creates an `IAsyncDisposable` from a `Func<ValueTask>` delegate. Ensures the delegate runs at most once. |
| `CompositeDisposableAsync` | A thread-safe collection of `IAsyncDisposable` objects that are disposed together. Supports `Add`, `Remove`, and bulk disposal. |
```csharp
// Create from a delegate
var disposable = DisposableAsync.Create(async () =>
{
    await Task.Delay(10);
    Console.WriteLine("Resource released");
});
await disposable.DisposeAsync();
// Compose multiple async disposables
var composite = new CompositeDisposableAsync();
composite.Add(DisposableAsync.Create(() => { Console.WriteLine("A"); return default; }));
composite.Add(DisposableAsync.Create(() => { Console.WriteLine("B"); return default; }));
Console.WriteLine($"Count: {composite.Count}"); // 2
await composite.DisposeAsync(); // disposes A and B
Console.WriteLine($"IsDisposed: {composite.IsDisposed}"); // true
```
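The "runs at most once" guarantee described for `DisposableAsync.Create` is commonly achieved by atomically swapping the stored delegate for `null`. The following is a minimal sketch of that idea, assuming a modern .NET target where `IAsyncDisposable` is built in. `OnceAsyncDisposable` is a hypothetical type written for illustration, and the library's actual implementation may differ.

```csharp
using System;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical illustration type; not the library's DisposableAsync implementation.
public sealed class OnceAsyncDisposable : IAsyncDisposable
{
    private Func<ValueTask>? _dispose;

    public OnceAsyncDisposable(Func<ValueTask> dispose) =>
        _dispose = dispose ?? throw new ArgumentNullException(nameof(dispose));

    public ValueTask DisposeAsync()
    {
        // Atomically take the delegate; only the first caller observes a non-null value.
        var dispose = Interlocked.Exchange(ref _dispose, null);

        // Later (or racing) callers get an already-completed ValueTask.
        return dispose is null ? default : dispose();
    }
}
```

Because the swap is a single atomic operation, two threads calling `DisposeAsync` at the same time cannot both invoke the delegate. Note that in this sketch the losing caller does not wait for the winner's cleanup to finish.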
---
## Performance Notes
- The `FastForEach` path avoids iterator allocations for `List<T>`, `IList<T>`, and arrays.
- `SyncTimer` ensures only one shared timer per period, reducing timer overhead.
- `Conflate` helps tame high-frequency producers without dropping the final value of a burst.
- `Heartbeat` and `DetectStale` use lightweight scheduling primitives.
- Most operators avoid capturing lambdas in hot loops where practical.
- Async observable operators use `ValueTask` throughout to minimize allocations on the hot path.
- `ObserverAsync` base class detects concurrent re-entrant calls, raising `ConcurrentObserverCallsException` in debug scenarios.
- Subject variants (serial vs. concurrent, stateful vs. stateless) let you choose the right trade-off between safety and throughput.
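
The first note above, about avoiding iterator allocations, usually comes down to checking the concrete collection type before falling back to the `IEnumerable<T>` enumerator. The following is a rough sketch of that technique under stated assumptions: `ForEachSketch.FastForEachSketch` is a hypothetical helper, not the library's `FastForEach`, whose signature and internals are not shown in this document.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical illustration helper; not part of ReactiveUI.Extensions.
public static class ForEachSketch
{
    public static void FastForEachSketch<T>(IEnumerable<T> source, Action<T> action)
    {
        switch (source)
        {
            case T[] array:
                // Index-based loop: no enumerator object is created.
                for (var i = 0; i < array.Length; i++)
                {
                    action(array[i]);
                }

                break;
            case List<T> list:
                // foreach over the concrete List<T> uses its struct enumerator, so nothing is boxed.
                foreach (var item in list)
                {
                    action(item);
                }

                break;
            case IList<T> indexable:
                // Other indexable collections: iterate by index instead of GetEnumerator().
                for (var i = 0; i < indexable.Count; i++)
                {
                    action(indexable[i]);
                }

                break;
            default:
                // General fallback: typically allocates an enumerator on the heap.
                foreach (var item in source)
                {
                    action(item);
                }

                break;
        }
    }
}
```

The order of the cases matters: arrays and `List<T>` also implement `IList<T>`, so the more specific patterns are tested first.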
## Thread Safety
- All operators are pure functional transformations unless documented otherwise.
- `SyncTimer` uses a `ConcurrentDictionary` and returns a hot `IConnectableObservable` that connects once per unique `TimeSpan`.
- Methods returning shared observables (`SyncTimer`, `Partition` result sequences) are safe for multi-subscriber usage unless the upstream is inherently side-effecting.
- `ISubjectAsync<T>` subjects with `PublishingOption.Serial` serialize notifications; `PublishingOption.Concurrent` variants deliver to all observers concurrently.
- `CompositeDisposableAsync` is thread-safe for concurrent `Add`/`Remove`/`DisposeAsync` calls.
## License
MIT – see LICENSE file.
---
## Contributing
Issues / PRs welcome. Please keep additions dependency-free and focused on broadly useful reactive patterns.
---
## Change Log (Excerpt)
(Keep this section updated as the library evolves.)
- Added async task projection helpers (`SelectAsyncSequential`, `SelectLatestAsync`, `SelectAsyncConcurrent`).
- Added liveness operators (`Heartbeat`, `DetectStale`, `BufferUntilInactive`).
- Added resilience (`RetryWithBackoff`, expanded `OnErrorRetry` overloads).
- Added flow control (`Conflate`, `ThrottleFirst`, `DebounceImmediate`, `ThrottleDistinct`).
- Added buffering and transformation operators (`BufferUntilIdle`, `Pairwise`, `ScanWithInitial`).
- Added filtering and conditional operators (`SampleLatest`, `SwitchIfEmpty`, `DropIfBusy`).
- Added utility operators (`ToReadOnlyBehavior`, `ToHotTask`, `ToPropertyObservable`).
- Fixed `SynchronizeSynchronous` to properly propagate OnError and OnCompleted events.
- Removed the `DisposeWith` extension; use `System.Reactive.Disposables.Fluent` from `System.Reactive` instead.
- Added fully async-native observable framework (`IObservableAsync<T>`, `IObserverAsync<T>`) with 60+ operators, subjects, bridge extensions, and async disposables.
---
Happy reactive coding!