Custom Operators and Operator Development

1. Creating Custom Pipeable Operators

Concept | Pattern | Description | Use Case
Pipeable Operator | function<T, R>(Observable<T>): Observable<R> | Function that transforms observables, chainable with pipe() | Creating reusable transformation logic
Operator Factory | function(...args) { return (source$) => ... } | Higher-order function that returns an operator | Parameterized custom operators
lift() Method (deprecated) | source.lift(operator) | Legacy operator creation method | Use pipeable operators instead
Observable Constructor | new Observable(subscriber => ...) | Manual observable creation within operator | Full control over emission logic
pipe() Composition | pipe(operator1, operator2, ...) | Combine multiple operators into one | Building complex operators from simple ones
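
The pipe() composition row can be illustrated with the standalone pipe function exported by rxjs. This is a minimal sketch; the operator name squareEvens is hypothetical and not part of any library.

```typescript
import { of, pipe } from 'rxjs';
import { filter, map } from 'rxjs/operators';

// Hypothetical operator: keep even numbers, then square them.
// The standalone pipe() (not the Observable method) combines
// several operators into one reusable operator function.
const squareEvens = () =>
  pipe(
    filter((n: number) => n % 2 === 0),
    map((n: number) => n * n)
  );

const squared: number[] = [];
of(1, 2, 3, 4).pipe(squareEvens()).subscribe(n => squared.push(n));
console.log(squared); // [4, 16]
```

Because the result is an ordinary operator function, it can be passed to any pipe() call like a built-in operator.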

Example: Basic custom pipeable operator

// Simple operator without parameters
function doubleValue<T extends number>() {
  return (source$: Observable<T>): Observable<T> => {
    return new Observable(subscriber => {
      return source$.subscribe({
        next: value => subscriber.next((value * 2) as T),
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      });
    });
  };
}

// Usage
of(1, 2, 3).pipe(
  doubleValue()
).subscribe(console.log); // 2, 4, 6

Example: Parameterized custom operator

// Operator factory pattern
function multiplyBy<T extends number>(multiplier: number) {
  return (source$: Observable<T>): Observable<T> => {
    return new Observable(subscriber => {
      return source$.subscribe({
        next: value => subscriber.next((value * multiplier) as T),
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      });
    });
  };
}

// Usage
of(1, 2, 3).pipe(
  multiplyBy(10)
).subscribe(console.log); // 10, 20, 30

// More concise alternative: build on existing operators
// (these are alternative definitions; keep only one multiplyBy per module)
function multiplyBy<T extends number>(multiplier: number) {
  return (source$: Observable<T>) => source$.pipe(
    map(value => (value * multiplier) as T)
  );
}

// Even more concise alternative: return the map operator directly
const multiplyBy = <T extends number>(multiplier: number) =>
  map<T, T>(value => (value * multiplier) as T);

Example: Operator with state management

// Custom operator with internal state
function rateLimit<T>(count: number, timeWindow: number) {
  return (source$: Observable<T>): Observable<T> => {
    return new Observable(subscriber => {
      const timestamps: number[] = [];
      
      return source$.subscribe({
        next: value => {
          const now = Date.now();
          // Remove timestamps outside window
          while (timestamps.length > 0 && 
                 timestamps[0] < now - timeWindow) {
            timestamps.shift();
          }
          
          if (timestamps.length < count) {
            timestamps.push(now);
            subscriber.next(value);
          } else {
            console.warn('Rate limit exceeded');
          }
        },
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      });
    });
  };
}

// Usage: Max 5 emissions per 1000ms
clicks$.pipe(
  rateLimit(5, 1000)
).subscribe(event => processClick(event));

2. Operator Function Composition Patterns

Pattern | Syntax | Description | Benefit
Pipe Composition | pipe(op1, op2, op3) | Chain operators sequentially | Readable, maintainable operator chains
Operator Wrapping | source => source.pipe(...ops) | Create operator from operator chain | Encapsulate complex logic
Higher-Order Composition | (...args) => source => pipe(...) | Parameterized operator composition | Flexible, reusable operators
Conditional Operators | condition ? operator1 : operator2 | Dynamic operator selection | Runtime operator switching
Operator Arrays | pipe(...operatorArray) | Dynamic operator chains from arrays | Programmatic pipeline construction
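
The conditional operator row could look like the following sketch. The factory name dedupeIf is hypothetical; the idea is that the "off" branch returns a pass-through operator so the call site always receives something valid to put in pipe().

```typescript
import { MonoTypeOperatorFunction, of } from 'rxjs';
import { distinctUntilChanged } from 'rxjs/operators';

// Hypothetical factory: drop consecutive duplicates only when enabled.
// When disabled, the returned operator hands the source back unchanged.
function dedupeIf<T>(enabled: boolean): MonoTypeOperatorFunction<T> {
  return enabled ? distinctUntilChanged<T>() : source$ => source$;
}

const withDedupe: number[] = [];
const withoutDedupe: number[] = [];

of(1, 1, 2, 2, 3).pipe(dedupeIf<number>(true)).subscribe(v => withDedupe.push(v));
of(1, 1, 2, 2, 3).pipe(dedupeIf<number>(false)).subscribe(v => withoutDedupe.push(v));

console.log(withDedupe);    // [1, 2, 3]
console.log(withoutDedupe); // [1, 1, 2, 2, 3]
```

The condition is evaluated once, when the pipeline is built, not per emitted value.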

Example: Composing operators into reusable functions

// Create reusable operator combinations
const debugOperator = <T>(label: string) => (source$: Observable<T>) =>
  source$.pipe(
    // A single tap with a full observer covers next, error, and complete
    tap({
      next: value => console.log(`[${label}] Next:`, value),
      error: err => console.error(`[${label}] Error:`, err),
      complete: () => console.log(`[${label}] Complete`)
    })
  );

// Compose multiple operators
// retry() with a config object (RxJS 7+) replaces the deprecated retryWhen()
const retryWithBackoff = (maxRetries: number) => <T>(source$: Observable<T>) =>
  source$.pipe(
    retry({
      count: maxRetries,
      // retryCount starts at 1, so the delays grow linearly: 1s, 2s, 3s, ...
      delay: (_error, retryCount) => timer(retryCount * 1000)
    })
  );

// Usage
http.get('/api/data').pipe(
  debugOperator('API Call'),
  retryWithBackoff(3)
).subscribe(data => console.log(data));

Example: Dynamic operator composition

// Build operator chain dynamically (the pipeline operates on string values)
function createFilterPipeline(config: {
  debounce?: number;
  distinct?: boolean;
  minLength?: number;
}): MonoTypeOperatorFunction<string> {
  const operators: MonoTypeOperatorFunction<string>[] = [];
  const { minLength } = config;
  
  if (config.debounce) {
    operators.push(debounceTime(config.debounce));
  }
  
  if (config.distinct) {
    operators.push(distinctUntilChanged());
  }
  
  if (minLength) {
    operators.push(
      filter(value => value.length >= minLength)
    );
  }
  
  // Apply the operators one by one; spreading an array into pipe() does not type-check
  return source$ => operators.reduce((acc$, op) => acc$.pipe(op), source$);
}

// Usage
searchInput$.pipe(
  createFilterPipeline({
    debounce: 300,
    distinct: true,
    minLength: 3
  }),
  switchMap(term => searchAPI(term))
).subscribe(results => displayResults(results));

3. Reusable Operator Libraries

Library Type | Example | Organization | Best Practice
Utility Operators | filterNullish, tapOnce, delayedRetry | Group by functionality | Single responsibility per operator
Domain Operators | validateEmail, parseJSON, sanitizeHTML | Group by domain logic | Business logic encapsulation
Integration Operators | fromWebSocket, toLocalStorage, withAuth | Group by integration type | External system interactions
Performance Operators | memoize, shareWithExpiry, bufferOptimal | Group by optimization type | Performance enhancement
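
The table names tapOnce as a utility operator but the section does not define it. One possible shape, offered as an assumption rather than a reference implementation, runs a side effect for the first emission of each subscription only.

```typescript
import { MonoTypeOperatorFunction, defer, of } from 'rxjs';
import { tap } from 'rxjs/operators';

// Hypothetical tapOnce: call fn for the first value only.
// defer() creates the `done` flag on each subscribe, so separate
// subscribers do not share it.
function tapOnce<T>(fn: (value: T) => void): MonoTypeOperatorFunction<T> {
  return source$ =>
    defer(() => {
      let done = false;
      return source$.pipe(
        tap(value => {
          if (!done) {
            done = true;
            fn(value);
          }
        })
      );
    });
}

const firstSeen: number[] = [];
const allValues: number[] = [];

of(10, 20, 30)
  .pipe(tapOnce<number>(v => { firstSeen.push(v); }))
  .subscribe(v => allValues.push(v));

console.log(firstSeen); // [10]
console.log(allValues); // [10, 20, 30]
```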

Example: Building a custom operator library

// operators/filtering.ts
export const filterNullish = <T>() => (source$: Observable<T | null | undefined>): Observable<T> =>
  source$.pipe(filter((value): value is T => value != null));

export const filterByType = <T, K extends keyof T>(
  key: K, 
  type: string
) => filter<T>(item => typeof item[key] === type);

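// operators/transformation.ts
// Note: catchError sits after map, so the first parse error is logged and the
// stream then completes (EMPTY) instead of continuing with later values
export const parseJSON = <T>() => (source$: Observable<string>): Observable<T> =>
  source$.pipe(
    map(str => JSON.parse(str) as T),
    catchError(err => {
      console.error('JSON Parse Error:', err);
      return EMPTY;
    })
  );

// Emits the running array after every value. RxJS's built-in toArray emits
// once, on completion, so this operator uses a different name
export const accumulateToArray = <T>() => (source$: Observable<T>): Observable<T[]> =>
  source$.pipe(
    scan((acc: T[], value) => [...acc, value], [] as T[])
  );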

// operators/timing.ts
// Pass the first value through immediately, debounce everything after it.
// connect() (RxJS 7+) shares one source subscription between both branches.
export const debounceAfterFirst = <T>(duration: number) => 
  (source$: Observable<T>): Observable<T> =>
    source$.pipe(
      connect(shared$ =>
        merge(
          shared$.pipe(take(1)),
          shared$.pipe(skip(1), debounceTime(duration))
        )
      )
    );

// Usage
import { filterNullish, parseJSON, debounceAfterFirst } from './operators';

apiResponse$.pipe(
  filterNullish(),
  parseJSON<UserData>(),
  debounceAfterFirst(300)
).subscribe(data => console.log(data));

Example: Domain-specific operator library

// operators/http.ts
export const withRetry = (maxRetries = 3, delayMs = 1000) => <T>(source$: Observable<T>) =>
  source$.pipe(
    retry({
      count: maxRetries,
      delay: delayMs
    })
  );

export const withTimeout = (ms: number, fallback?: any) => <T>(source$: Observable<T>) =>
  source$.pipe(
    timeout(ms),
    catchError(err => 
      fallback !== undefined ? of(fallback) : throwError(() => err)
    )
  );

export const withCache = (cacheTime = 60000) => <T>(source$: Observable<T>) =>
  source$.pipe(
    shareReplay({
      bufferSize: 1,
      refCount: true,
      windowTime: cacheTime
    })
  );

// Combine operators
export const httpRequest = <T>(config?: {
  retries?: number;
  timeout?: number;
  cache?: number;
}) => (source$: Observable<T>) => {
  let pipeline = source$;
  
  if (config?.retries) {
    pipeline = pipeline.pipe(withRetry(config.retries));
  }
  if (config?.timeout) {
    pipeline = pipeline.pipe(withTimeout(config.timeout));
  }
  if (config?.cache) {
    pipeline = pipeline.pipe(withCache(config.cache));
  }
  
  return pipeline;
};

// Usage
http.get<User[]>('/api/users').pipe(
  httpRequest({ retries: 3, timeout: 5000, cache: 60000 })
).subscribe(users => console.log(users));

4. Higher-Order Operator Development

Type | Pattern | Description | Example
Flattening Operators | source => inner$ => result$ | Transform and flatten inner observables | Custom switchMap variant
Buffering Operators | source => buffer$[] => emit | Accumulate and emit batches | Smart buffering logic
Windowing Operators | source => window$ => Observable<Observable> | Group emissions into observable windows | Custom window logic
Joining Operators | source1 + source2 => combined$ | Combine multiple observables with custom logic | Special combination patterns
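
The joining row has no example elsewhere in this section. The sketch below shows one way to combine a source with a second stream, using the built-in withLatestFrom; the operator name scaleByLatest and its semantics are assumptions made for illustration.

```typescript
import { Observable, of } from 'rxjs';
import { map, withLatestFrom } from 'rxjs/operators';

// Hypothetical joining operator: multiply each source value by the most
// recent value of factor$. Source values that arrive before factor$ has
// emitted anything are dropped (that is how withLatestFrom behaves).
function scaleByLatest(factor$: Observable<number>) {
  return (source$: Observable<number>): Observable<number> =>
    source$.pipe(
      withLatestFrom(factor$),
      map(([value, factor]) => value * factor)
    );
}

const scaled: number[] = [];
of(1, 2, 3).pipe(scaleByLatest(of(10))).subscribe(v => scaled.push(v));
console.log(scaled); // [10, 20, 30]
```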

Example: Custom flattening operator

// switchMapWithPriority: Cancel low-priority, keep high-priority
function switchMapWithPriority<T, R>(
  project: (value: T) => Observable<R>,
  getPriority: (value: T) => number
) {
  return (source$: Observable<T>): Observable<R> => {
    return new Observable(subscriber => {
      let currentPriority = -Infinity;
      let innerSubscription: Subscription | null = null;
      let sourceCompleted = false;
      
      const subscription = source$.subscribe({
        next: value => {
          const priority = getPriority(value);
          
          // Only switch if new priority is higher or equal
          if (priority >= currentPriority) {
            currentPriority = priority;
            innerSubscription?.unsubscribe();
            
            innerSubscription = project(value).subscribe({
              next: result => subscriber.next(result),
              error: err => subscriber.error(err),
              complete: () => {
                currentPriority = -Infinity;
                innerSubscription = null;
                // Finish if the source ended while this inner observable was active
                if (sourceCompleted) subscriber.complete();
              }
            });
          }
        },
        error: err => subscriber.error(err),
        complete: () => {
          sourceCompleted = true;
          if (!innerSubscription || innerSubscription.closed) {
            subscriber.complete();
          }
        }
      });
      
      return () => {
        subscription.unsubscribe();
        innerSubscription?.unsubscribe();
      };
    });
  };
}

// Usage
interface SearchRequest {
  query: string;
  priority: number; // 1=normal, 2=high, 3=critical
}

searchRequests$.pipe(
  switchMapWithPriority(
    req => searchAPI(req.query),
    req => req.priority
  )
).subscribe(results => displayResults(results));

Example: Custom buffering operator

// bufferUntilSizeOrTime: Buffer until size OR time limit
function bufferUntilSizeOrTime<T>(size: number, timeMs: number) {
  return (source$: Observable<T>): Observable<T[]> => {
    return new Observable(subscriber => {
      let buffer: T[] = [];
      let timer: ReturnType<typeof setTimeout> | null = null;
      
      const emit = () => {
        if (buffer.length > 0) {
          subscriber.next([...buffer]);
          buffer = [];
        }
        if (timer) {
          clearTimeout(timer);
          timer = null;
        }
      };
      
      const resetTimer = () => {
        if (timer) clearTimeout(timer);
        timer = setTimeout(emit, timeMs);
      };
      
      const subscription = source$.subscribe({
        next: value => {
          buffer.push(value);
          
          if (buffer.length === 1) {
            resetTimer(); // Start timer on first item
          }
          
          if (buffer.length >= size) {
            emit(); // Emit when size reached
          }
        },
        error: err => {
          emit(); // Emit remaining buffer on error
          subscriber.error(err);
        },
        complete: () => {
          emit(); // Emit remaining buffer on complete
          subscriber.complete();
        }
      });
      
      return () => {
        if (timer) clearTimeout(timer);
        subscription.unsubscribe();
      };
    });
  };
}

// Usage: Batch API calls (max 10 items OR every 1 second)
userActions$.pipe(
  bufferUntilSizeOrTime(10, 1000),
  filter(batch => batch.length > 0),
  mergeMap(batch => sendBatchToAPI(batch))
).subscribe(response => console.log('Batch sent:', response));

5. Operator Testing and Validation

Testing Type | Tool | Description | Focus
Marble Testing | TestScheduler | Visual testing with marble diagrams | Timing and sequence verification
Unit Testing | Jest/Mocha | Test operator logic in isolation | Correctness and edge cases
Integration Testing | TestBed (Angular) | Test operators in real scenarios | Integration with other operators
Performance Testing | Benchmarking | Measure execution time and memory | Efficiency and optimization
Type Testing | TypeScript | Verify type safety and inference | Type correctness and generics
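
For the plain unit-testing row, synchronous sources can be checked without marbles or a test framework. The sketch below assumes a hypothetical helper named record and a simplified copy of multiplyBy; a Jest or Mocha test would wrap the same checks in it() blocks.

```typescript
import { Observable, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Simplified copy of the operator under test.
const multiplyBy = (multiplier: number) =>
  map((value: number) => value * multiplier);

// Hypothetical helper: subscribe and record what the observable does.
// Only meaningful for sources that emit synchronously.
function record<T>(source$: Observable<T>) {
  const log = { values: [] as T[], error: undefined as unknown, completed: false };
  source$.subscribe({
    next: value => log.values.push(value),
    error: err => { log.error = err; },
    complete: () => { log.completed = true; }
  });
  return log;
}

// A source that fails immediately, to check error pass-through.
const failing$ = new Observable<number>(subscriber =>
  subscriber.error(new Error('boom'))
);

const ok = record(of(1, 2, 3).pipe(multiplyBy(10)));
const failed = record(failing$.pipe(multiplyBy(10)));

console.log(ok.values, ok.completed);         // [10, 20, 30] true
console.log(failed.values, failed.completed); // [] false
```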

Example: Marble testing custom operators

import { TestScheduler } from 'rxjs/testing';

describe('Custom multiplyBy operator', () => {
  let scheduler: TestScheduler;
  
  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });
  
  it('should multiply values by multiplier', () => {
    scheduler.run(({ cold, expectObservable }) => {
      const source$ = cold('a-b-c|', { a: 1, b: 2, c: 3 });
      const expected =     'a-b-c|';
      const values = { a: 10, b: 20, c: 30 };
      
      const result$ = source$.pipe(multiplyBy(10));
      
      expectObservable(result$).toBe(expected, values);
    });
  });
  
  it('should handle errors correctly', () => {
    scheduler.run(({ cold, expectObservable }) => {
      const source$ = cold('a-b-#', { a: 1, b: 2 });
      const expected =     'a-b-#';
      const values = { a: 5, b: 10 };
      
      const result$ = source$.pipe(multiplyBy(5));
      
      expectObservable(result$).toBe(expected, values);
    });
  });
  
  it('should complete when source completes', () => {
    scheduler.run(({ cold, expectObservable }) => {
      const source$ = cold('a-b-c-|', { a: 2, b: 4, c: 6 });
      const expected =     'a-b-c-|';
      const values = { a: 4, b: 8, c: 12 };
      
      const result$ = source$.pipe(multiplyBy(2));
      
      expectObservable(result$).toBe(expected, values);
    });
  });
});

Example: Testing operator with state

describe('rateLimit operator', () => {
  let scheduler: TestScheduler;
  
  beforeEach(() => {
    scheduler = new TestScheduler((actual, expected) => {
      expect(actual).toEqual(expected);
    });
  });
  
  it('should limit emissions within time window', () => {
    scheduler.run(({ cold, expectObservable }) => {
      // Emit 5 values in the same frame
      const source$ = cold('(abcde)', { a: 1, b: 2, c: 3, d: 4, e: 5 });
      // Only first 3 should pass (limit: 3 per window)
      const expected =     '(abc)';
      const values = { a: 1, b: 2, c: 3 };
      
      // rateLimit reads Date.now(), which TestScheduler does not virtualize,
      // so the window is in real milliseconds and only needs to outlast the burst
      const result$ = source$.pipe(rateLimit(3, 1000));
      
      expectObservable(result$).toBe(expected, values);
    });
  });
  
  it('should reset limit after time window', (done) => {
    const emissions: number[] = [];
    
    // Use real time for this test
    interval(100).pipe(
      take(10),
      rateLimit(3, 500) // 3 per 500ms
    ).subscribe({
      next: value => emissions.push(value),
      complete: () => {
        // Should get ~6 values (3 in first 500ms, 3 in next 500ms)
        expect(emissions.length).toBeGreaterThanOrEqual(6);
        expect(emissions.length).toBeLessThanOrEqual(7);
        done();
      }
    });
  });
});

Example: Type testing

// Type testing with TypeScript
import { Observable, of } from 'rxjs';
import { expectType } from 'tsd';

// Test type inference
const numberObs$ = of(1, 2, 3);
const multiplied$ = numberObs$.pipe(multiplyBy(10));
expectType<Observable<number>>(multiplied$);

// Test generic constraints
interface User { id: number; name: string; }
const users$ = of<User>({ id: 1, name: 'Alice' });

// This should work
const filtered$ = users$.pipe(filterByType('name', 'string'));
expectType<Observable<User>>(filtered$);

// Test error cases (should not compile)
// const invalid$ = of('string').pipe(multiplyBy(10)); // Error: string not assignable to number

// Test operator signature
type MultiplyByOperator = <T extends number>(
  multiplier: number
) => (source$: Observable<T>) => Observable<T>;

expectType<MultiplyByOperator>(multiplyBy);

6. Community Operator Patterns and Libraries

Library/Pattern | Category | Key Operators | Use Case
rxjs-etc | Utilities | tapIf, filterMap, pluckFirst | Common utility operators
rxjs-spy | Debugging | tag, snapshot, deck | Observable debugging and inspection
ngx-operators | Angular | filterNil, mapArray, ofType | Angular-specific operators
Custom Retry | Error Handling | exponentialBackoff, retryStrategy | Advanced retry patterns
Caching | Performance | cacheable, stale-while-revalidate | Response caching strategies
State Management | State | scan-with-reducer, snapshot | Observable state patterns
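
The scan-with-reducer pattern in the last row is not shown elsewhere in this section. A minimal sketch, assuming a hypothetical reduceState operator and a toy counter reducer, might look like this.

```typescript
import { Observable, from } from 'rxjs';
import { scan, startWith } from 'rxjs/operators';

type CounterAction = { type: 'increment' } | { type: 'add'; amount: number };

// Hypothetical operator: fold a stream of actions into a stream of states.
// startWith emits the initial state before any action arrives.
const reduceState = <S, A>(reducer: (state: S, action: A) => S, initial: S) =>
  (action$: Observable<A>): Observable<S> =>
    action$.pipe(scan(reducer, initial), startWith(initial));

const counterReducer = (state: number, action: CounterAction): number =>
  action.type === 'increment' ? state + 1 : state + action.amount;

const actions: CounterAction[] = [{ type: 'increment' }, { type: 'add', amount: 5 }];

const states: number[] = [];
from(actions)
  .pipe(reduceState(counterReducer, 0))
  .subscribe(state => states.push(state));

console.log(states); // [0, 1, 6]
```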

Example: Common community operator patterns

// filterNil: Remove null and undefined
const filterNil = <T>() => filter<T | null | undefined, T>(
  (value): value is T => value != null
);

// tapIf: Conditional side effect
const tapIf = <T>(
  predicate: (value: T) => boolean,
  fn: (value: T) => void
) => tap<T>(value => {
  if (predicate(value)) fn(value);
});

// filterMap: Combine filter and map
const filterMap = <T, R>(
  fn: (value: T) => R | null | undefined
) => (source$: Observable<T>): Observable<R> =>
  source$.pipe(
    map(fn),
    filter((value): value is R => value != null)
  );

// Usage
apiResponse$.pipe(
  filterNil(),
  tapIf(
    data => data.isImportant,
    data => logImportantData(data)
  ),
  filterMap(data => data.value > 0 ? data : null)
).subscribe(result => console.log(result));

Example: Advanced retry with exponential backoff

// Exponential backoff retry strategy
const retryWithExponentialBackoff = (config: {
  maxRetries?: number;
  initialDelay?: number;
  maxDelay?: number;
  backoffMultiplier?: number;
  shouldRetry?: (error: any) => boolean;
} = {}) => <T>(source$: Observable<T>): Observable<T> => {
  const {
    maxRetries = 3,
    initialDelay = 1000,
    maxDelay = 30000,
    backoffMultiplier = 2,
    shouldRetry = () => true
  } = config;
  
  // retry() with a config object (RxJS 7+) replaces the deprecated retryWhen()
  return source$.pipe(
    retry({
      count: maxRetries,
      delay: (error, retryAttempt) => {
        // Stop immediately for errors that should not be retried
        if (!shouldRetry(error)) {
          return throwError(() => error);
        }
        
        // Calculate delay with exponential backoff (retryAttempt starts at 1)
        const delay = Math.min(
          initialDelay * Math.pow(backoffMultiplier, retryAttempt - 1),
          maxDelay
        );
        
        console.log(`Retry ${retryAttempt}/${maxRetries} after ${delay}ms`);
        
        return timer(delay);
      }
    })
  );
};

// Usage
http.get('/api/data').pipe(
  retryWithExponentialBackoff({
    maxRetries: 5,
    initialDelay: 500,
    maxDelay: 10000,
    backoffMultiplier: 2,
    shouldRetry: (error) => error.status !== 404
  })
).subscribe({
  next: data => console.log('Success:', data),
  error: err => console.error('Failed after retries:', err)
});

Example: Caching with stale-while-revalidate

// Stale-while-revalidate caching pattern
const staleWhileRevalidate = <T>(config: {
  cacheTime?: number;
  staleTime?: number;
}) => {
  const { cacheTime = 60000, staleTime = 5000 } = config;
  let cached: { value: T; timestamp: number } | null = null;
  
  return (source$: Observable<T>): Observable<T> => {
    return new Observable(subscriber => {
      const now = Date.now();
      
      // Return cached value if fresh
      if (cached && now - cached.timestamp < staleTime) {
        subscriber.next(cached.value);
        subscriber.complete();
        return;
      }
      
      // Return stale cache while revalidating
      if (cached && now - cached.timestamp < cacheTime) {
        subscriber.next(cached.value);
        
        // Fetch fresh data in background
        source$.subscribe({
          next: value => {
            cached = { value, timestamp: Date.now() };
          },
          error: err => console.warn('Revalidation failed:', err)
        });
        
        subscriber.complete();
        return;
      }
      
      // No cache or cache expired, fetch fresh
      return source$.subscribe({
        next: value => {
          cached = { value, timestamp: Date.now() };
          subscriber.next(value);
        },
        error: err => subscriber.error(err),
        complete: () => subscriber.complete()
      });
    });
  };
};

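// Usage: build the operator once per user so later calls share its cache.
// Calling staleWhileRevalidate() inside getUser() on every call would create
// a new, empty cache each time and always hit the API.
const userCaches = new Map<number, (source$: Observable<User>) => Observable<User>>();

function getUser(id: number): Observable<User> {
  let cache = userCaches.get(id);
  if (!cache) {
    cache = staleWhileRevalidate<User>({
      staleTime: 5000,    // Fresh for 5 seconds
      cacheTime: 60000    // Valid for 60 seconds
    });
    userCaches.set(id, cache);
  }
  return http.get<User>(`/api/users/${id}`).pipe(cache);
}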

// First call: fetches from API
getUser(1).subscribe(user => console.log('Fresh:', user));

// Within 5s: returns cached instantly
setTimeout(() => {
  getUser(1).subscribe(user => console.log('Cached:', user));
}, 2000);

// After 5s but before 60s: returns stale, revalidates in background
setTimeout(() => {
  getUser(1).subscribe(user => console.log('Stale:', user));
}, 10000);

Best Practices:
  • Keep operators pure - no side effects except in tap-like operators
  • Use TypeScript generics for type safety and inference
  • Document operators with JSDoc comments including marble diagrams
  • Handle unsubscription properly to prevent memory leaks
  • Write tests for all custom operators, especially edge cases
  • Follow naming conventions - verb-based names (filterBy, mapTo, etc.)
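
As an illustration of the JSDoc and naming points, a documented operator could look like the sketch below. The operator nonEmptyLength is hypothetical, and the marble diagram in the comment is only a convention for readers, not something tooling interprets.

```typescript
import { Observable, of } from 'rxjs';
import { filter, map } from 'rxjs/operators';

/**
 * Drops empty strings and emits the length of every remaining string.
 *
 * Marble diagram (values shown inline):
 *   source: --"ab"--""--"xyz"--|
 *   result: --2---------3------|
 *
 * @returns An operator from Observable<string> to Observable<number>.
 */
function nonEmptyLength() {
  return (source$: Observable<string>): Observable<number> =>
    source$.pipe(
      filter(text => text.length > 0),
      map(text => text.length)
    );
}

const lengths: number[] = [];
of('ab', '', 'xyz').pipe(nonEmptyLength()).subscribe(n => lengths.push(n));
console.log(lengths); // [2, 3]
```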

Common Pitfalls:
  • Not handling errors properly in custom operators
  • Creating memory leaks by not cleaning up subscriptions
  • Using stateful logic without considering multiple subscriptions
  • Ignoring backpressure in buffering operators
  • Over-engineering - use existing operators when possible
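
The pitfall about stateful logic and multiple subscriptions can be made concrete. In the sketch below both operator names are hypothetical; the first keeps its counter in the factory closure, the second creates it per subscription with defer().

```typescript
import { Observable, defer, of } from 'rxjs';
import { map } from 'rxjs/operators';

// Pitfall: `count` is created once, when the operator is built, so every
// subscription to the resulting observable increments the same counter.
const numberShared = <T>() => {
  let count = 0;
  return (source$: Observable<T>): Observable<[number, T]> =>
    source$.pipe(map((value): [number, T] => [++count, value]));
};

// Safer: defer() runs its factory on each subscribe, so `count` starts fresh.
const numberPerSubscription = <T>() =>
  (source$: Observable<T>): Observable<[number, T]> =>
    defer(() => {
      let count = 0;
      return source$.pipe(map((value): [number, T] => [++count, value]));
    });

const shared$ = of('a', 'b').pipe(numberShared<string>());
const isolated$ = of('a', 'b').pipe(numberPerSubscription<string>());

const sharedCounts: number[] = [];
const isolatedCounts: number[] = [];

// Subscribe twice to each observable and record the counters seen.
for (let i = 0; i < 2; i++) {
  shared$.subscribe(([n]) => sharedCounts.push(n));
  isolated$.subscribe(([n]) => isolatedCounts.push(n));
}

console.log(sharedCounts);   // [1, 2, 3, 4]
console.log(isolatedCounts); // [1, 2, 1, 2]
```

Shared state is sometimes intended, as in a cache, but it should be a deliberate choice rather than an accident of where a variable was declared.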

Section 13 Summary

  • Pipeable operators return functions that transform observables - highly composable
  • Operator factories allow parameterization using higher-order functions
  • Compose operators from existing ones for reusability and clarity
  • Build libraries organized by functionality, domain, or integration type
  • Test operators thoroughly using marble diagrams and unit tests
  • Community patterns provide proven solutions for common use cases