Custom Operators and Operator Development
1. Creating Custom Pipeable Operators
| Concept | Pattern | Description | Use Case |
|---|---|---|---|
| Pipeable Operator | function<T, R>(Observable<T>): Observable<R> | Function that transforms observables, chainable with pipe() | Creating reusable transformation logic |
| Operator Factory | function(...args) { return (source$) => ... } | Higher-order function that returns an operator | Parameterized custom operators |
| lift() Method (deprecated) | source.lift(operator) | Legacy operator creation method | Use pipeable operators instead |
| Observable Constructor | new Observable(subscriber => ...) | Manual observable creation within operator | Full control over emission logic |
| pipe() Composition | pipe(operator1, operator2, ...) | Combine multiple operators into one | Building complex operators from simple ones |
Example: Basic custom pipeable operator
// Simple operator without parameters
function doubleValue<T extends number>() {
return (source$: Observable<T>): Observable<T> => {
return new Observable(subscriber => {
return source$.subscribe({
next: value => subscriber.next((value * 2) as T),
error: err => subscriber.error(err),
complete: () => subscriber.complete()
});
});
};
}
// Usage
of(1, 2, 3).pipe(
doubleValue()
).subscribe(console.log); // 2, 4, 6
Example: Parameterized custom operator
// Operator factory pattern
function multiplyBy<T extends number>(multiplier: number) {
return (source$: Observable<T>): Observable<T> => {
return new Observable(subscriber => {
return source$.subscribe({
next: value => subscriber.next((value * multiplier) as T),
error: err => subscriber.error(err),
complete: () => subscriber.complete()
});
});
};
}
// Usage
of(1, 2, 3).pipe(
multiplyBy(10)
).subscribe(console.log); // 10, 20, 30
// Alternative definition: more concise, built from existing operators
function multiplyBy<T extends number>(multiplier: number) {
  return (source$: Observable<T>) => source$.pipe(
    map(value => (value * multiplier) as T)
  );
}
// Alternative definition: even more concise, returning the map operator directly
const multiplyBy = <T extends number>(multiplier: number) =>
  map<T, T>(value => (value * multiplier) as T);
Example: Operator with state management
// Custom operator with internal state
function rateLimit<T>(count: number, timeWindow: number) {
return (source$: Observable<T>): Observable<T> => {
return new Observable(subscriber => {
const timestamps: number[] = [];
return source$.subscribe({
next: value => {
const now = Date.now();
// Remove timestamps outside window
while (timestamps.length > 0 &&
timestamps[0] < now - timeWindow) {
timestamps.shift();
}
if (timestamps.length < count) {
timestamps.push(now);
subscriber.next(value);
} else {
console.warn('Rate limit exceeded');
}
},
error: err => subscriber.error(err),
complete: () => subscriber.complete()
});
});
};
}
// Usage: Max 5 emissions per 1000ms
clicks$.pipe(
rateLimit(5, 1000)
).subscribe(event => processClick(event));
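Example: Sliding-window logic in isolation (sketch)

The timestamp bookkeeping inside rateLimit can be pulled out into a plain function, which makes the window behavior easier to reason about and to test without real timers. The sketch below is a hypothetical helper: createSlidingWindowLimiter and its injectable now clock are assumptions for illustration, not part of RxJS.

```typescript
// Hypothetical helper that mirrors the timestamp handling in rateLimit above.
// The `now` parameter is an injectable clock (assumption) so time can be faked.
function createSlidingWindowLimiter(
  count: number,
  timeWindow: number,
  now: () => number = Date.now
): () => boolean {
  const timestamps: number[] = [];
  // Returns true when an event may pass, false when the limit is exceeded
  return () => {
    const current = now();
    // Drop timestamps that have fallen out of the window
    while (timestamps.length > 0 && timestamps[0] < current - timeWindow) {
      timestamps.shift();
    }
    if (timestamps.length < count) {
      timestamps.push(current);
      return true;
    }
    return false;
  };
}

// Simulate ten events 100ms apart with a limit of 3 per 500ms
let fakeNow = 0;
const tryAcquire = createSlidingWindowLimiter(3, 500, () => fakeNow);
const passed: number[] = [];
for (let t = 100; t <= 1000; t += 100) {
  fakeNow = t;
  if (tryAcquire()) passed.push(t);
}
console.log(passed); // [100, 200, 300, 700, 800, 900]
```

Because the window slides, capacity frees up one slot at a time as old timestamps expire, rather than resetting all at once every 500ms.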
2. Operator Function Composition Patterns
| Pattern | Syntax | Description | Benefit |
|---|---|---|---|
| Pipe Composition | pipe(op1, op2, op3) | Chain operators sequentially | Readable, maintainable operator chains |
| Operator Wrapping | source => source.pipe(...ops) | Create operator from operator chain | Encapsulate complex logic |
| Higher-Order Composition | (...args) => source => pipe(...) | Parameterized operator composition | Flexible, reusable operators |
| Conditional Operators | condition ? operator1 : operator2 | Dynamic operator selection | Runtime operator switching |
| Operator Arrays | pipe(...operatorArray) | Dynamic operator chains from arrays | Programmatic pipeline construction |
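Example: Composition as left-to-right function application (sketch)

The patterns in this table rest on one idea: an operator is a function from one source to another, and composing operators means applying them left to right. A minimal sketch of that idea, using plain arrays as stand-ins for observables so that it runs without RxJS. ArrayOperator, pipeFrom, mapOp, and filterOp are hypothetical names for illustration only.

```typescript
// Stand-in for an operator: a function from one "source" to another.
// Plain arrays replace observables here (assumption, for illustration only).
type ArrayOperator<T> = (source: T[]) => T[];

// Compose any number of operators left to right, like pipe(op1, op2, ...)
function pipeFrom<T>(...operators: ArrayOperator<T>[]): ArrayOperator<T> {
  return source => operators.reduce((acc, operator) => operator(acc), source);
}

// Operator factories: parameterized functions that return an operator
const mapOp = <T>(fn: (value: T) => T): ArrayOperator<T> =>
  source => source.map(fn);
const filterOp = <T>(predicate: (value: T) => boolean): ArrayOperator<T> =>
  source => source.filter(predicate);

// Conditional selection and operator arrays build the chain at runtime
const onlyMultiplesOf20 = true;
const steps: ArrayOperator<number>[] = [mapOp(n => n * 10)];
if (onlyMultiplesOf20) {
  steps.push(filterOp(n => n % 20 === 0));
}

const pipeline = pipeFrom(...steps);
console.log(pipeline([1, 2, 3, 4])); // [20, 40]
```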
Example: Composing operators into reusable functions
// Create reusable operator combinations
const debugOperator = <T>(label: string) => (source$: Observable<T>) =>
  source$.pipe(
    // One tap with a full observer covers next, error, and complete
    tap({
      next: value => console.log(`[${label}] Next:`, value),
      error: err => console.error(`[${label}] Error:`, err),
      complete: () => console.log(`[${label}] Complete`)
    })
  );
// Compose multiple operators
const retryWithBackoff = (maxRetries: number) => <T>(source$: Observable<T>) =>
  source$.pipe(
    // retry() with a config object (RxJS 7+) replaces the deprecated retryWhen()
    retry({
      count: maxRetries,
      // Linear backoff: wait 1s before the 1st retry, 2s before the 2nd, ...
      delay: (_error, retryCount) => timer(retryCount * 1000)
    })
  );
// Usage
http.get('/api/data').pipe(
debugOperator('API Call'),
retryWithBackoff(3)
).subscribe(data => console.log(data));
Example: Dynamic operator composition
// Build operator chain dynamically
function createFilterPipeline(config: {
  debounce?: number;
  distinct?: boolean;
  minLength?: number;
}) {
  const { debounce, distinct, minLength } = config;
  const operators: MonoTypeOperatorFunction<string>[] = [];
  if (debounce) {
    operators.push(debounceTime(debounce));
  }
  if (distinct) {
    operators.push(distinctUntilChanged());
  }
  if (minLength) {
    operators.push(filter(value => value.length >= minLength));
  }
  // Spreading an array into pipe() does not type-check against its overloads,
  // so apply the operators one by one with reduce
  return (source$: Observable<string>): Observable<string> =>
    operators.reduce((acc$, operator) => operator(acc$), source$);
}
// Usage
searchInput$.pipe(
createFilterPipeline({
debounce: 300,
distinct: true,
minLength: 3
}),
switchMap(term => searchAPI(term))
).subscribe(results => displayResults(results));
3. Reusable Operator Libraries
| Library Type | Example | Organization | Best Practice |
|---|---|---|---|
| Utility Operators | filterNullish, tapOnce, delayedRetry | Group by functionality | Single responsibility per operator |
| Domain Operators | validateEmail, parseJSON, sanitizeHTML | Group by domain logic | Business logic encapsulation |
| Integration Operators | fromWebSocket, toLocalStorage, withAuth | Group by integration type | External system interactions |
| Performance Operators | memoize, shareWithExpiry, bufferOptimal | Group by optimization type | Performance enhancement |
Example: Building a custom operator library
// operators/filtering.ts
export const filterNullish = <T>() => (source$: Observable<T | null | undefined>): Observable<T> =>
source$.pipe(filter((value): value is T => value != null));
export const filterByType = <T, K extends keyof T>(
key: K,
type: string
) => filter<T>(item => typeof item[key] === type);
// operators/transformation.ts
export const parseJSON = <T>() => (source$: Observable<string>): Observable<T> =>
source$.pipe(
map(str => JSON.parse(str) as T),
catchError(err => {
console.error('JSON Parse Error:', err);
return EMPTY;
})
);
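// Collects all values and emits a single array when the source completes
// (RxJS also ships a built-in toArray operator with this behavior)
export const toArray = <T>() => (source$: Observable<T>): Observable<T[]> =>
  source$.pipe(
    reduce((acc: T[], value) => [...acc, value], [] as T[])
  );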
// operators/timing.ts
export const debounceAfterFirst = <T>(duration: number) =>
  (source$: Observable<T>): Observable<T> =>
    source$.pipe(
      // connect() (RxJS 7+) shares one source subscription between both branches
      connect(shared$ => merge(
        // The first value passes through immediately
        shared$.pipe(take(1)),
        // Every later value is debounced
        shared$.pipe(skip(1), debounceTime(duration))
      ))
    );
// Usage
import { filterNullish, parseJSON, debounceAfterFirst } from './operators';
apiResponse$.pipe(
filterNullish(),
parseJSON<UserData>(),
debounceAfterFirst(300)
).subscribe(data => console.log(data));
Example: Domain-specific operator library
// operators/http.ts
export const withRetry = (maxRetries = 3, delayMs = 1000) => <T>(source$: Observable<T>) =>
source$.pipe(
retry({
count: maxRetries,
delay: delayMs
})
);
export const withTimeout = (ms: number, fallback?: any) => <T>(source$: Observable<T>) =>
source$.pipe(
timeout(ms),
catchError(err =>
fallback !== undefined ? of(fallback) : throwError(() => err)
)
);
export const withCache = <T>(cacheTime = 60000) => (source$: Observable<T>) =>
source$.pipe(
shareReplay({
bufferSize: 1,
refCount: true,
windowTime: cacheTime
})
);
// Combine operators
export const httpRequest = <T>(config?: {
retries?: number;
timeout?: number;
cache?: number;
}) => (source$: Observable<T>) => {
let pipeline = source$;
if (config?.retries) {
pipeline = pipeline.pipe(withRetry(config.retries));
}
if (config?.timeout) {
pipeline = pipeline.pipe(withTimeout(config.timeout));
}
if (config?.cache) {
pipeline = pipeline.pipe(withCache(config.cache));
}
return pipeline;
};
// Usage
http.get<User[]>('/api/users').pipe(
httpRequest({ retries: 3, timeout: 5000, cache: 60000 })
).subscribe(users => console.log(users));
4. Higher-Order Operator Development
| Type | Pattern | Description | Example |
|---|---|---|---|
| Flattening Operators | source => inner$ => result$ | Transform and flatten inner observables | Custom switchMap variant |
| Buffering Operators | source => buffer$[] => emit | Accumulate and emit batches | Smart buffering logic |
| Windowing Operators | source => window$ => Observable<Observable> | Group emissions into observable windows | Custom window logic |
| Joining Operators | source1 + source2 => combined$ | Combine multiple observables with custom logic | Special combination patterns |
Example: Custom flattening operator
// switchMapWithPriority: Cancel low-priority, keep high-priority
function switchMapWithPriority<T, R>(
  project: (value: T) => Observable<R>,
  getPriority: (value: T) => number
) {
  return (source$: Observable<T>): Observable<R> => {
    return new Observable(subscriber => {
      let currentPriority = -Infinity;
      let innerSubscription: Subscription | null = null;
      let sourceCompleted = false;
      const subscription = source$.subscribe({
        next: value => {
          const priority = getPriority(value);
          // Only switch if new priority is higher or equal
          if (priority >= currentPriority) {
            currentPriority = priority;
            innerSubscription?.unsubscribe();
            innerSubscription = project(value).subscribe({
              next: result => subscriber.next(result),
              error: err => subscriber.error(err),
              complete: () => {
                currentPriority = -Infinity;
                innerSubscription = null;
                // Finish if the source completed while this inner was active
                if (sourceCompleted) {
                  subscriber.complete();
                }
              }
            });
          }
        },
        error: err => subscriber.error(err),
        complete: () => {
          sourceCompleted = true;
          // Complete now only if no inner observable is still running
          if (!innerSubscription || innerSubscription.closed) {
            subscriber.complete();
          }
        }
      });
      return () => {
        subscription.unsubscribe();
        innerSubscription?.unsubscribe();
      };
    });
  };
}
// Usage
interface SearchRequest {
query: string;
priority: number; // 1=normal, 2=high, 3=critical
}
searchRequests$.pipe(
switchMapWithPriority(
req => searchAPI(req.query),
req => req.priority
)
).subscribe(results => displayResults(results));
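Example: Which requests get started (sketch)

To see which requests switchMapWithPriority would actually start, the switching rule can be replayed over a list of priorities. This is a simplified model, and acceptedRequests is a hypothetical helper: it assumes that no inner observable completes between requests, so the current priority is never reset.

```typescript
// Simplified model of the switching rule above.
// Assumption: no inner observable completes between requests,
// so currentPriority is never reset to -Infinity.
function acceptedRequests(priorities: number[]): number[] {
  let currentPriority = -Infinity;
  const accepted: number[] = [];
  priorities.forEach((priority, index) => {
    // Same rule as the operator: switch only on higher or equal priority
    if (priority >= currentPriority) {
      currentPriority = priority;
      accepted.push(index);
    }
  });
  return accepted;
}

// Priorities: normal, critical, high, critical, normal
console.log(acceptedRequests([1, 3, 2, 3, 1])); // [0, 1, 3]
```

The requests at index 2 and 4 are dropped rather than queued, which matters when lower-priority work must not be lost.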
Example: Custom buffering operator
// bufferUntilSizeOrTime: Buffer until size OR time limit
function bufferUntilSizeOrTime<T>(size: number, timeMs: number) {
return (source$: Observable<T>): Observable<T[]> => {
return new Observable(subscriber => {
let buffer: T[] = [];
let timer: any = null;
const emit = () => {
if (buffer.length > 0) {
subscriber.next([...buffer]);
buffer = [];
}
if (timer) {
clearTimeout(timer);
timer = null;
}
};
const resetTimer = () => {
if (timer) clearTimeout(timer);
timer = setTimeout(emit, timeMs);
};
const subscription = source$.subscribe({
next: value => {
buffer.push(value);
if (buffer.length === 1) {
resetTimer(); // Start timer on first item
}
if (buffer.length >= size) {
emit(); // Emit when size reached
}
},
error: err => {
emit(); // Emit remaining buffer on error
subscriber.error(err);
},
complete: () => {
emit(); // Emit remaining buffer on complete
subscriber.complete();
}
});
return () => {
if (timer) clearTimeout(timer);
subscription.unsubscribe();
};
});
};
}
// Usage: Batch API calls (max 10 items OR every 1 second)
userActions$.pipe(
bufferUntilSizeOrTime(10, 1000),
filter(batch => batch.length > 0),
mergeMap(batch => sendBatchToAPI(batch))
).subscribe(response => console.log('Batch sent:', response));
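Example: Modeling the batches on a timeline (sketch)

The interaction between the size limit and the timer is easier to check on a fixed timeline than with real timeouts. The sketch below is a hypothetical model (simulateBatches and TimedEvent are assumptions for illustration) of what bufferUntilSizeOrTime emits for a finite source that completes. It assumes that when an event arrives at exactly the moment the timer expires, the timer fires first.

```typescript
interface TimedEvent<T> {
  time: number; // arrival time in ms
  value: T;
}

// Hypothetical model of bufferUntilSizeOrTime for a finite, completed source.
// Assumption: if an event arrives exactly when the timer expires, the timer wins.
function simulateBatches<T>(events: TimedEvent<T>[], size: number, timeMs: number): T[][] {
  const batches: T[][] = [];
  let buffer: T[] = [];
  let bufferStart = 0; // arrival time of the first item in the current buffer

  const flush = () => {
    if (buffer.length > 0) {
      batches.push(buffer);
      buffer = [];
    }
  };

  for (const event of events) {
    // The timer started by the first buffered item fires before this event
    if (buffer.length > 0 && event.time >= bufferStart + timeMs) flush();
    if (buffer.length === 0) bufferStart = event.time;
    buffer.push(event.value);
    if (buffer.length >= size) flush(); // size limit reached
  }
  flush(); // source completed: emit the remainder
  return batches;
}

const timeline = [0, 10, 20, 30, 1500, 1600].map((time, i) => ({ time, value: i }));
console.log(simulateBatches(timeline, 3, 1000)); // [[0, 1, 2], [3], [4, 5]]
```

The first batch is cut by the size limit, the second by the timer, and the third by completion.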
5. Operator Testing and Validation
| Testing Type | Tool | Description | Focus |
|---|---|---|---|
| Marble Testing | TestScheduler | Visual testing with marble diagrams | Timing and sequence verification |
| Unit Testing | Jest/Mocha | Test operator logic in isolation | Correctness and edge cases |
| Integration Testing | TestBed | Test operators in real scenarios | Integration with other operators |
| Performance Testing | Benchmarking | Measure execution time and memory | Efficiency and optimization |
| Type Testing | TypeScript | Verify type safety and inference | Type correctness and generics |
Example: Marble testing custom operators
import { TestScheduler } from 'rxjs/testing';
describe('Custom multiplyBy operator', () => {
let scheduler: TestScheduler;
beforeEach(() => {
scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
});
it('should multiply values by multiplier', () => {
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold('a-b-c|', { a: 1, b: 2, c: 3 });
const expected = 'a-b-c|';
const values = { a: 10, b: 20, c: 30 };
const result$ = source$.pipe(multiplyBy(10));
expectObservable(result$).toBe(expected, values);
});
});
it('should handle errors correctly', () => {
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold('a-b-#', { a: 1, b: 2 });
const expected = 'a-b-#';
const values = { a: 5, b: 10 };
const result$ = source$.pipe(multiplyBy(5));
expectObservable(result$).toBe(expected, values);
});
});
it('should complete when source completes', () => {
scheduler.run(({ cold, expectObservable }) => {
const source$ = cold('a-b-c-|', { a: 2, b: 4, c: 6 });
const expected = 'a-b-c-|';
const values = { a: 4, b: 8, c: 12 };
const result$ = source$.pipe(multiplyBy(2));
expectObservable(result$).toBe(expected, values);
});
});
});
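Example: Reading a marble diagram as frames (sketch)

Inside TestScheduler.run(), each character of a marble diagram advances virtual time by one frame, and values inside parentheses are emitted on the frame of the opening parenthesis. The sketch below is a simplified, hypothetical parser (parseMarble is not an RxJS API) that shows how the diagrams in these tests map to frames. It assumes no whitespace, no time-progression syntax such as 10ms, and no subscription markers.

```typescript
type MarbleEvent =
  | { frame: number; kind: 'next'; key: string }
  | { frame: number; kind: 'complete' }
  | { frame: number; kind: 'error' };

// Simplified, hypothetical parser: supports '-', value keys, '|', '#', and groups.
function parseMarble(diagram: string): MarbleEvent[] {
  const events: MarbleEvent[] = [];
  let groupStart = -1; // frame of the enclosing '(' or -1 when outside a group
  for (let i = 0; i < diagram.length; i++) {
    const ch = diagram[i];
    if (ch === '-') continue; // one frame passes, nothing emitted
    if (ch === '(') { groupStart = i; continue; }
    if (ch === ')') { groupStart = -1; continue; }
    // Everything inside a group happens on the frame of its '('
    const frame = groupStart >= 0 ? groupStart : i;
    if (ch === '|') events.push({ frame, kind: 'complete' });
    else if (ch === '#') events.push({ frame, kind: 'error' });
    else events.push({ frame, kind: 'next', key: ch });
  }
  return events;
}

console.log(parseMarble('a-b-c|').map(e => e.frame)); // [0, 2, 4, 5]
console.log(parseMarble('(abc)').map(e => e.frame));  // [0, 0, 0]
```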
Example: Testing operator with state
describe('rateLimit operator', () => {
let scheduler: TestScheduler;
beforeEach(() => {
scheduler = new TestScheduler((actual, expected) => {
expect(actual).toEqual(expected);
});
});
  it('should limit emissions within time window', () => {
    scheduler.run(({ cold, expectObservable }) => {
      // rateLimit reads Date.now(), which TestScheduler does not virtualize,
      // so use a window far longer than this synchronous burst takes to run
      const source$ = cold('(abcde)', { a: 1, b: 2, c: 3, d: 4, e: 5 });
      // Only first 3 should pass (limit: 3 per window)
      const expected = '(abc)';
      const values = { a: 1, b: 2, c: 3 };
      const result$ = source$.pipe(rateLimit(3, 1000));
      expectObservable(result$).toBe(expected, values);
    });
  });
it('should reset limit after time window', (done) => {
const emissions: number[] = [];
// Use real time for this test
interval(100).pipe(
take(10),
rateLimit(3, 500) // 3 per 500ms
).subscribe({
next: value => emissions.push(value),
complete: () => {
// Sliding window: values at ~100-300ms and ~700-900ms pass, so expect about 6
expect(emissions.length).toBeGreaterThanOrEqual(6);
expect(emissions.length).toBeLessThanOrEqual(7);
done();
}
});
});
});
Example: Type testing
// Type testing with TypeScript
import { Observable, of } from 'rxjs';
import { expectType } from 'tsd';
// Test type inference
const numberObs$ = of(1, 2, 3);
const multiplied$ = numberObs$.pipe(multiplyBy(10));
expectType<Observable<number>>(multiplied$);
// Test generic constraints
interface User { id: number; name: string; }
const users$ = of<User>({ id: 1, name: 'Alice' });
// This should work
const filtered$ = users$.pipe(filterByType('name', 'string'));
expectType<Observable<User>>(filtered$);
// Test error cases (should not compile)
// const invalid$ = of('string').pipe(multiplyBy(10)); // Error: string not assignable to number
// Test operator signature
type MultiplyByOperator = <T extends number>(
multiplier: number
) => (source$: Observable<T>) => Observable<T>;
expectType<MultiplyByOperator>(multiplyBy);
6. Community Operator Patterns and Libraries
| Library/Pattern | Category | Key Operators | Use Case |
|---|---|---|---|
| rxjs-etc | Utilities | tapIf, filterMap, pluckFirst | Common utility operators |
| rxjs-spy | Debugging | tag, snapshot, deck | Observable debugging and inspection |
| ngx-operators | Angular | filterNil, mapArray, ofType | Angular-specific operators |
| Custom Retry | Error Handling | exponentialBackoff, retryStrategy | Advanced retry patterns |
| Caching | Performance | cacheable, stale-while-revalidate | Response caching strategies |
| State Management | State | scan-with-reducer, snapshot | Observable state patterns |
Example: Common community operator patterns
// filterNil: Remove null and undefined
const filterNil = <T>() => filter<T | null | undefined, T>(
(value): value is T => value != null
);
// tapIf: Conditional side effect
const tapIf = <T>(
predicate: (value: T) => boolean,
fn: (value: T) => void
) => tap<T>(value => {
if (predicate(value)) fn(value);
});
// filterMap: Combine filter and map
const filterMap = <T, R>(
fn: (value: T) => R | null | undefined
) => (source$: Observable<T>): Observable<R> =>
source$.pipe(
map(fn),
filter((value): value is R => value != null)
);
// Usage
apiResponse$.pipe(
filterNil(),
tapIf(
data => data.isImportant,
data => logImportantData(data)
),
filterMap(data => data.value > 0 ? data : null)
).subscribe(result => console.log(result));
Example: Advanced retry with exponential backoff
// Exponential backoff retry strategy
const retryWithExponentialBackoff = (config: {
  maxRetries?: number;
  initialDelay?: number;
  maxDelay?: number;
  backoffMultiplier?: number;
  shouldRetry?: (error: any) => boolean;
} = {}) => <T>(source$: Observable<T>): Observable<T> => {
  const {
    maxRetries = 3,
    initialDelay = 1000,
    maxDelay = 30000,
    backoffMultiplier = 2,
    shouldRetry = () => true
  } = config;
  return source$.pipe(
    // retry() with a config object (RxJS 7+) replaces the deprecated retryWhen()
    retry({
      count: maxRetries,
      delay: (error, retryAttempt) => {
        // Don't retry errors that the caller marked as non-retryable
        if (!shouldRetry(error)) {
          return throwError(() => error);
        }
        // Calculate delay with exponential backoff (retryAttempt starts at 1)
        const delay = Math.min(
          initialDelay * Math.pow(backoffMultiplier, retryAttempt - 1),
          maxDelay
        );
        console.log(`Retry ${retryAttempt}/${maxRetries} after ${delay}ms`);
        return timer(delay);
      }
    })
  );
};
// Usage
http.get('/api/data').pipe(
retryWithExponentialBackoff({
maxRetries: 5,
initialDelay: 500,
maxDelay: 10000,
backoffMultiplier: 2,
shouldRetry: (error) => error.status !== 404
})
).subscribe({
next: data => console.log('Success:', data),
error: err => console.error('Failed after retries:', err)
});
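Example: Computing the backoff schedule (sketch)

The delay formula is the part of this operator most worth checking by hand. The sketch below isolates it in a pure function so the schedule for a given configuration can be inspected directly. BackoffConfig and backoffDelay are hypothetical names for illustration, and the attempt number is assumed to be 1-based.

```typescript
interface BackoffConfig {
  initialDelay: number;
  maxDelay: number;
  backoffMultiplier: number;
}

// Delay before the given retry attempt (1-based, assumption), capped at maxDelay
function backoffDelay(retryAttempt: number, config: BackoffConfig): number {
  const { initialDelay, maxDelay, backoffMultiplier } = config;
  return Math.min(
    initialDelay * Math.pow(backoffMultiplier, retryAttempt - 1),
    maxDelay
  );
}

// Schedule for the usage example above: 5 retries, 500ms initial, x2, 10s cap
const backoffConfig: BackoffConfig = {
  initialDelay: 500,
  maxDelay: 10000,
  backoffMultiplier: 2
};
const schedule = [1, 2, 3, 4, 5].map(attempt => backoffDelay(attempt, backoffConfig));
console.log(schedule); // [500, 1000, 2000, 4000, 8000]
```

With these settings the cap is never reached. A sixth attempt would compute 16000ms and be limited to 10000ms.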
Example: Caching with stale-while-revalidate
// Stale-while-revalidate caching pattern
const staleWhileRevalidate = <T>(config: {
cacheTime?: number;
staleTime?: number;
}) => {
const { cacheTime = 60000, staleTime = 5000 } = config;
let cached: { value: T; timestamp: number } | null = null;
return (source$: Observable<T>): Observable<T> => {
return new Observable(subscriber => {
const now = Date.now();
// Return cached value if fresh
if (cached && now - cached.timestamp < staleTime) {
subscriber.next(cached.value);
subscriber.complete();
return;
}
// Return stale cache while revalidating
if (cached && now - cached.timestamp < cacheTime) {
subscriber.next(cached.value);
// Fetch fresh data in background
source$.subscribe({
next: value => {
cached = { value, timestamp: Date.now() };
},
error: err => console.warn('Revalidation failed:', err)
});
subscriber.complete();
return;
}
// No cache or cache expired, fetch fresh
return source$.subscribe({
next: value => {
cached = { value, timestamp: Date.now() };
subscriber.next(value);
},
error: err => subscriber.error(err),
complete: () => subscriber.complete()
});
});
};
};
// Usage: create the operator once so its cache survives across subscriptions.
// One operator instance holds one cached value, so use one instance per resource.
const cacheUser1 = staleWhileRevalidate<User>({
  staleTime: 5000, // Fresh for 5 seconds
  cacheTime: 60000 // Valid for 60 seconds
});
const user1$ = http.get<User>('/api/users/1').pipe(cacheUser1);
// First subscription: fetches from API
user1$.subscribe(user => console.log('Fresh:', user));
// Within 5s: returns cached instantly
setTimeout(() => {
  user1$.subscribe(user => console.log('Cached:', user));
}, 2000);
// After 5s but before 60s: returns stale, revalidates in background
setTimeout(() => {
  user1$.subscribe(user => console.log('Stale:', user));
}, 10000);
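Example: Classifying a cache entry by age (sketch)

The operator's three branches depend only on the age of the cached entry. The sketch below restates those comparisons as a pure function, which makes the boundaries explicit. CacheState and classifyCache are hypothetical names for illustration, and the defaults copy the ones used above.

```typescript
type CacheState = 'fresh' | 'stale' | 'expired';

// Classify a cached entry by its age, using the same comparisons as the operator
function classifyCache(ageMs: number, staleTime = 5000, cacheTime = 60000): CacheState {
  if (ageMs < staleTime) return 'fresh'; // serve from cache, no request
  if (ageMs < cacheTime) return 'stale'; // serve from cache, revalidate in background
  return 'expired';                      // wait for a fresh response
}

console.log(classifyCache(2000));  // "fresh"
console.log(classifyCache(10000)); // "stale"
console.log(classifyCache(60000)); // "expired"
```

Both comparisons are strict, so an entry that is exactly staleTime old already counts as stale, and one that is exactly cacheTime old counts as expired.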
Best Practices:
- Keep operators pure - no side effects except in tap-like operators
- Use TypeScript generics for type safety and inference
- Document operators with JSDoc comments including marble diagrams
- Handle unsubscription properly to prevent memory leaks
- Write tests for all custom operators, especially edge cases
- Follow naming conventions - verb-based names (filterBy, mapTo, etc.)
Common Pitfalls:
- Not handling errors properly in custom operators
- Creating memory leaks by not cleaning up subscriptions
- Using stateful logic without considering multiple subscriptions
- Ignoring backpressure in buffering operators
- Over-engineering - use existing operators when possible
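Example: Shared state versus per-subscription state (sketch)

The pitfall about stateful logic comes down to where the state is declared. State created in the operator factory is shared by every subscription, while state created inside the new Observable(subscriber => ...) callback is created fresh each time. The sketch below shows the difference with plain arrays as stand-ins for observables, where each call stands for one subscription. ArrayOp, withIndexShared, and withIndexPerRun are hypothetical names for illustration.

```typescript
// Stand-in for an operator over arrays (assumption, for illustration only)
type ArrayOp<T, R> = (source: T[]) => R[];

// Pitfall: the counter lives in the factory closure and is shared by every run
function withIndexShared<T>(): ArrayOp<T, [number, T]> {
  let index = 0;
  return source => source.map((value): [number, T] => [index++, value]);
}

// Fix: create the state inside the returned function, once per run
function withIndexPerRun<T>(): ArrayOp<T, [number, T]> {
  return source => {
    let index = 0;
    return source.map((value): [number, T] => [index++, value]);
  };
}

const shared = withIndexShared<string>();
shared(['a', 'b']);
console.log(shared(['c'])); // [[2, 'c']] - the index leaked from the first run

const perRun = withIndexPerRun<string>();
perRun(['a', 'b']);
console.log(perRun(['c'])); // [[0, 'c']]
```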
Section 13 Summary
- Pipeable operators return functions that transform observables - highly composable
- Operator factories allow parameterization using higher-order functions
- Compose operators from existing ones for reusability and clarity
- Build libraries organized by functionality, domain, or integration type
- Test operators thoroughly using marble diagrams and unit tests
- Community patterns provide proven solutions for common use cases