Performance Optimization and Memory Management

1. Subscription Management and Automatic Cleanup

Pattern Implementation Description Use Case
Manual Unsubscribe subscription.unsubscribe() Explicitly clean up single subscription Simple component cleanup
Subscription Container sub.add(sub1); sub.add(sub2) Compose multiple subscriptions Managing multiple subscriptions together
takeUntil Pattern takeUntil(destroy$) Auto-unsubscribe based on notifier Component lifecycle management
async Pipe (Angular) {{ obs$ | async }} Framework handles subscription lifecycle Template-driven subscriptions
SubSink Pattern Custom subscription manager class Centralized subscription management Complex components with many subscriptions
finalize Operator finalize(() => cleanup()) Execute cleanup on unsubscribe or complete Resource cleanup (timers, connections)

Example: Subscription container pattern

class MyComponent {
  private subscriptions = new Subscription();
  
  ngOnInit() {
    // Add multiple subscriptions to container
    this.subscriptions.add(
      interval(1000).subscribe(n => console.log(n))
    );
    
    this.subscriptions.add(
      fromEvent(button, 'click').subscribe(e => this.handleClick(e))
    );
    
    this.subscriptions.add(
      this.dataService.getData().subscribe(data => this.data = data)
    );
  }
  
  ngOnDestroy() {
    // Unsubscribe all at once
    this.subscriptions.unsubscribe();
  }
}

Example: takeUntil pattern for automatic cleanup

class MyComponent implements OnDestroy {
  private destroy$ = new Subject<void>();
  
  ngOnInit() {
    // All subscriptions auto-complete when destroy$ emits
    interval(1000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(n => console.log(n));
    
    this.userService.getUser().pipe(
      takeUntil(this.destroy$)
    ).subscribe(user => this.user = user);
    
    fromEvent(document, 'click').pipe(
      takeUntil(this.destroy$)
    ).subscribe(e => this.handleClick(e));
  }
  
  ngOnDestroy() {
    // Single emission completes all subscriptions
    this.destroy$.next();
    this.destroy$.complete();
  }
}

Example: SubSink pattern for advanced subscription management

// Custom SubSink class
class SubSink {
  private subscriptions: Subscription[] = [];
  
  add(...subs: Subscription[]) {
    this.subscriptions.push(...subs);
  }
  
  unsubscribe() {
    this.subscriptions.forEach(sub => sub.unsubscribe());
    this.subscriptions = [];
  }
  
  get count() {
    return this.subscriptions.length;
  }
}

// Usage in component
class MyComponent {
  private subs = new SubSink();
  
  ngOnInit() {
    this.subs.add(
      timer(1000).subscribe(() => console.log('tick')),
      this.api.getData().subscribe(data => this.data = data),
      this.route.params.subscribe(params => this.id = params.id)
    );
    
    console.log(`Active subscriptions: ${this.subs.count}`);
  }
  
  ngOnDestroy() {
    this.subs.unsubscribe();
  }
}

2. Operator Chain Optimization and Pipeline Efficiency

Optimization Inefficient Efficient Benefit
Operator Order map().filter() filter().map() Reduce unnecessary transformations
Early Termination map().map().take(1) take(1).map().map() Stop processing early
Shared Computation Multiple subscriptions share() or shareReplay() Avoid duplicate work
Flattening Strategy Wrong operator choice switchMap vs mergeMap vs concatMap Match concurrency to use case
Avoid Nested Subscribes subscribe(x => obs.subscribe()) switchMap/mergeMap/concatMap Cleaner code, better memory management
Memoization Recalculate every time Cache expensive operations Reduce computation overhead

Example: Operator ordering optimization

// INEFFICIENT - processes all 1000 items before filtering
range(1, 1000).pipe(
  map(n => expensiveTransform(n)),  // 1000 operations
  filter(n => n > 500)               // Then filters
).subscribe();

// EFFICIENT - filters first, reducing transformations
range(1, 1000).pipe(
  filter(n => n > 500),              // Filters to ~500 items
  map(n => expensiveTransform(n))   // Only 500 operations
).subscribe();

// EVEN BETTER - early termination
range(1, 1000).pipe(
  filter(n => n > 500),
  take(10),                          // Stop after 10
  map(n => expensiveTransform(n))   // Only 10 operations
).subscribe();

Example: Avoiding nested subscriptions

// ANTI-PATTERN - nested subscriptions (hard to manage, memory leaks)
this.userService.getUser(userId).subscribe(user => {
  this.orderService.getOrders(user.id).subscribe(orders => {
    this.productService.getProducts(orders[0].id).subscribe(products => {
      console.log(products);
    });
  });
});

// BETTER - flattening operators
this.userService.getUser(userId).pipe(
  switchMap(user => this.orderService.getOrders(user.id)),
  switchMap(orders => this.productService.getProducts(orders[0].id))
).subscribe(products => {
  console.log(products);
});

// BEST - with error handling and cleanup
this.userService.getUser(userId).pipe(
  switchMap(user => this.orderService.getOrders(user.id)),
  switchMap(orders => this.productService.getProducts(orders[0].id)),
  catchError(err => {
    console.error('Error:', err);
    return of([]);
  }),
  takeUntil(this.destroy$)
).subscribe(products => {
  console.log(products);
});

Example: Choosing the right flattening operator

// switchMap - cancels previous, keeps latest (search/autocomplete)
searchInput$.pipe(
  debounceTime(300),
  switchMap(term => this.api.search(term))  // Cancel old searches
).subscribe(results => displayResults(results));

// mergeMap - concurrent processing (independent operations)
userIds$.pipe(
  mergeMap(id => this.api.getUser(id), 5)   // Max 5 concurrent
).subscribe(user => processUser(user));

// concatMap - sequential, ordered (must complete in order)
queue$.pipe(
  concatMap(task => this.processTask(task))  // One at a time
).subscribe(result => console.log(result));

// exhaustMap - ignore while busy (prevent duplicate submissions)
saveButton$.pipe(
  exhaustMap(() => this.api.save(data))      // Ignore clicks while saving
).subscribe(response => console.log('Saved:', response));

3. Memory Leak Detection and Prevention

Leak Source Symptom Prevention Detection Method
Unsubscribed Observables Growing memory usage Always unsubscribe or use takeUntil Chrome DevTools memory profiler
Long-lived Subjects Retained observers Complete subjects, unsubscribe Check observer count
Event Listeners DOM nodes not GC'd Use fromEvent with takeUntil Event listener count in DevTools
Timers/Intervals Background activity Unsubscribe from interval/timer Network/console activity when idle
shareReplay Misuse Cached subscriptions never release Use refCount: true and windowTime Memory snapshots over time
Closure Captures Large objects retained Limit closure scope, use finalize Heap snapshot analysis

Example: Common memory leak scenarios and fixes

// MEMORY LEAK - interval never stops
class LeakyComponent {
  ngOnInit() {
    interval(1000).subscribe(n => console.log(n));
    // Leak: subscription never cleaned up
  }
}

// FIXED - properly managed subscription
class FixedComponent implements OnDestroy {
  private destroy$ = new Subject<void>();
  
  ngOnInit() {
    interval(1000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(n => console.log(n));
  }
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

// MEMORY LEAK - Subject with many subscribers never completes
class DataService {
  private data$ = new Subject<any>();
  
  subscribe() {
    return this.data$.subscribe();  // Subscribers accumulate
  }
  
  // Missing: complete() or cleanup mechanism
}

// FIXED - proper Subject lifecycle
class DataService implements OnDestroy {
  private data$ = new Subject<any>();
  
  subscribe() {
    return this.data$.asObservable();
  }
  
  ngOnDestroy() {
    this.data$.complete();  // Release all subscribers
  }
}

Example: Detecting memory leaks

// Add leak detection helper
class LeakDetector {
  private subscriptions = new Map<string, Subscription>();
  
  track(name: string, subscription: Subscription) {
    if (this.subscriptions.has(name)) {
      console.warn(`Subscription '${name}' already exists`);
    }
    this.subscriptions.set(name, subscription);
  }
  
  check() {
    const active = Array.from(this.subscriptions.entries())
      .filter(([_, sub]) => !sub.closed);
    
    if (active.length > 0) {
      console.warn(`${active.length} active subscriptions:`, 
        active.map(([name]) => name));
    }
    return active.length;
  }
  
  cleanup() {
    this.subscriptions.forEach(sub => sub.unsubscribe());
    this.subscriptions.clear();
  }
}

// Usage
const detector = new LeakDetector();

detector.track('interval', interval(1000).subscribe());
detector.track('http', http.get('/api').subscribe());

// Check for leaks before component destruction
detector.check();  // Warns if subscriptions still active
detector.cleanup();  // Clean up all

Example: shareReplay memory leak prevention

// MEMORY LEAK - shareReplay without refCount
const data$ = this.http.get('/api/data').pipe(
  shareReplay(1)  // Keeps subscription alive forever
);

// Multiple components subscribe
data$.subscribe(d1 => console.log(d1));
data$.subscribe(d2 => console.log(d2));
// Even after components destroy, HTTP subscription remains

// FIXED - use refCount to allow cleanup
const data$ = this.http.get('/api/data').pipe(
  shareReplay({ bufferSize: 1, refCount: true })
  // Unsubscribes from source when no subscribers
);

// BETTER - add time limit for cache
const data$ = this.http.get('/api/data').pipe(
  shareReplay({ 
    bufferSize: 1, 
    refCount: true,
    windowTime: 60000  // Cache for 60 seconds
  })
);

// BEST - complete observable pattern
const data$ = this.http.get('/api/data').pipe(
  shareReplay({ bufferSize: 1, refCount: true }),
  takeUntil(this.serviceDestroy$)  // Complete on service destroy
);

4. Cold vs Hot Observable Performance Characteristics

Characteristic Cold Observable Hot Observable Performance Impact
Execution Per subscriber (unicast) Shared (multicast) Hot avoids duplicate work
Memory N × subscription overhead 1 × shared overhead Hot uses less memory
Side Effects Repeated per subscription Executed once Hot prevents duplicate API calls
Late Subscribers Get full sequence Miss past emissions Consider with shareReplay
When to Use Independent data per subscriber Shared data across subscribers Match pattern to use case

Example: Cold observable behavior

// Cold observable - separate execution per subscriber
const cold$ = new Observable(subscriber => {
  console.log('Observable execution started');
  
  // Expensive operation
  const data = fetchExpensiveData();
  subscriber.next(data);
  subscriber.complete();
});

// Each subscription triggers new execution
cold$.subscribe(data => console.log('Sub 1:', data));
// Console: "Observable execution started"
// Console: "Sub 1: data"

cold$.subscribe(data => console.log('Sub 2:', data));
// Console: "Observable execution started" (again!)
// Console: "Sub 2: data"

// Performance issue: fetchExpensiveData() called twice!

Example: Converting cold to hot for better performance

// Convert cold to hot with share()
const hot$ = new Observable(subscriber => {
  console.log('Observable execution started');
  const data = fetchExpensiveData();
  subscriber.next(data);
  subscriber.complete();
}).pipe(
  share()  // Multicast to all subscribers
);

// First subscription triggers execution
hot$.subscribe(data => console.log('Sub 1:', data));
// Console: "Observable execution started"
// Console: "Sub 1: data"

// Second subscription reuses same execution
hot$.subscribe(data => console.log('Sub 2:', data));
// Console: "Sub 2: data" (no duplicate execution!)

// Performance benefit: fetchExpensiveData() called only once

// Use shareReplay to cache for late subscribers
const cached$ = expensive$.pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);

// Even late subscribers get cached result
setTimeout(() => {
  cached$.subscribe(data => console.log('Late sub:', data));
  // Gets cached data without re-execution
}, 5000);

Example: Real-world HTTP optimization

// Service with cold HTTP observable
class UserService {
  getUsers() {
    return this.http.get<User[]>('/api/users');
    // Cold - each subscription = new HTTP request
  }
}

// Component using service
class UserListComponent {
  users$ = this.userService.getUsers();
  
  ngOnInit() {
    // Multiple subscriptions
    this.users$.subscribe(u => this.userCount = u.length);
    this.users$.subscribe(u => this.firstUser = u[0]);
    this.users$.subscribe(u => this.lastUser = u[u.length - 1]);
    // Problem: 3 HTTP requests for same data!
  }
}

// OPTIMIZED - share the observable
class UserService {
  getUsers() {
    return this.http.get<User[]>('/api/users').pipe(
      shareReplay({ bufferSize: 1, refCount: true })
    );
    // Hot - single HTTP request, shared result
  }
}

// Now component makes only 1 HTTP request
// All 3 subscriptions share the same response

5. Share Strategy Selection (share, shareReplay, refCount)

Operator Behavior Cache Best For
share() Multicast while subscribers exist No replay Real-time streams, live data
shareReplay(1) Cache last value forever Infinite Config data, rarely changes (with refCount!)
shareReplay({refCount: true}) Cache while subscribers, cleanup when none Temporary API responses, user data
shareReplay({windowTime}) Cache for time duration Time-limited Polling data, time-sensitive cache
publish() + refCount() Manual multicast control Custom Advanced scenarios, custom logic

Example: Choosing the right sharing strategy

// share() - for real-time data (WebSocket, events)
const clicks$ = fromEvent(button, 'click').pipe(
  share()  // Share emissions while any subscriber exists
);

clicks$.subscribe(e => console.log('Handler 1'));
clicks$.subscribe(e => console.log('Handler 2'));
// Both handlers receive same click events

// shareReplay(1) - for static/config data
const config$ = this.http.get('/api/config').pipe(
  shareReplay({ bufferSize: 1, refCount: true })
  // IMPORTANT: use refCount to prevent memory leaks
);

// Late subscribers get cached config
config$.subscribe(c => this.applyConfig(c));
setTimeout(() => {
  config$.subscribe(c => console.log('Cached:', c));
}, 5000);

// shareReplay with windowTime - for time-sensitive data
const stockPrice$ = this.api.getStockPrice('AAPL').pipe(
  shareReplay({ 
    bufferSize: 1, 
    refCount: true,
    windowTime: 10000  // Cache for 10 seconds
  })
);

// Subscribers within 10s get cached price
// After 10s, new request is made

Example: Share strategy comparison

// Test observable that logs executions
const test$ = defer(() => {
  console.log('Execution!');
  return of(Math.random());
});

// 1. No sharing - multiple executions
console.log('--- No sharing ---');
test$.subscribe(v => console.log('A:', v));  // Execution! A: 0.123
test$.subscribe(v => console.log('B:', v));  // Execution! B: 0.456

// 2. share() - single execution, no replay
console.log('--- share() ---');
const shared$ = test$.pipe(share());
shared$.subscribe(v => console.log('A:', v));  // Execution! A: 0.789
shared$.subscribe(v => console.log('B:', v));  // B: 0.789 (same value)
setTimeout(() => {
  shared$.subscribe(v => console.log('C:', v));  // Execution! C: 0.321 (new)
}, 100);

// 3. shareReplay() - single execution, replay to late subscribers
console.log('--- shareReplay ---');
const replayed$ = test$.pipe(shareReplay(1));
replayed$.subscribe(v => console.log('A:', v));  // Execution! A: 0.555
replayed$.subscribe(v => console.log('B:', v));  // B: 0.555 (replayed)
setTimeout(() => {
  replayed$.subscribe(v => console.log('C:', v));  // C: 0.555 (replayed!)
}, 100);

// 4. shareReplay with refCount - cleanup when no subscribers
console.log('--- shareReplay with refCount ---');
const refCounted$ = test$.pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);
const sub1 = refCounted$.subscribe(v => console.log('A:', v));  // Execution!
const sub2 = refCounted$.subscribe(v => console.log('B:', v));  // Replayed
sub1.unsubscribe();
sub2.unsubscribe();  // All unsubscribed - source cleaned up
setTimeout(() => {
  refCounted$.subscribe(v => console.log('C:', v));  // Execution! (new)
}, 100);

6. Backpressure Handling and Buffer Management

Strategy Operator Behavior Use Case
Sampling sample(), sampleTime() Periodically emit most recent value Fast producers, slow consumers
Throttling throttleTime(), throttle() Emit first, ignore rest for duration Rate limiting, preventing spam
Debouncing debounceTime(), debounce() Emit after silence period Search input, form validation
Buffering buffer(), bufferTime() Collect values, emit batches Batch API requests, analytics
Windowing window(), windowTime() Group into observable windows Complex batching logic
Dropping take(), first(), filter() Selectively drop emissions Limiting data volume

Example: Handling fast producers with sampling

// Problem: Mouse moves emit 100+ events per second
const mouseMoves$ = fromEvent<MouseEvent>(document, 'mousemove');

// BAD - processes every event (performance issue)
mouseMoves$.subscribe(e => {
  updateUI(e.clientX, e.clientY);  // Called 100+ times/sec
});

// GOOD - sample at reasonable rate
mouseMoves$.pipe(
  sampleTime(50)  // Sample every 50ms (20 updates/sec)
).subscribe(e => {
  updateUI(e.clientX, e.clientY);  // Called 20 times/sec
});

// ALTERNATIVE - throttle (emit first, ignore rest)
mouseMoves$.pipe(
  throttleTime(50, undefined, { leading: true, trailing: false })
).subscribe(e => {
  updateUI(e.clientX, e.clientY);
});

Example: Buffering for batch processing

// Collect analytics events and send in batches
const analyticsEvent$ = new Subject<AnalyticsEvent>();

// Buffer events and send every 10 seconds or 100 events
analyticsEvent$.pipe(
  bufferTime(10000, null, 100),  // Time OR count
  filter(batch => batch.length > 0),
  mergeMap(batch => this.api.sendAnalytics(batch))
).subscribe(
  response => console.log('Batch sent:', response),
  error => console.error('Batch failed:', error)
);

// Track events
analyticsEvent$.next({ type: 'click', target: 'button1' });
analyticsEvent$.next({ type: 'view', page: '/home' });
// ... events accumulate and sent in batch

// Alternative: buffer with custom trigger
const flushTrigger$ = new Subject<void>();

analyticsEvent$.pipe(
  buffer(flushTrigger$),  // Buffer until trigger emits
  filter(batch => batch.length > 0)
).subscribe(batch => this.api.sendAnalytics(batch));

// Manually trigger flush
flushTrigger$.next();  // Sends accumulated events

Example: Backpressure with concurrency control

// Problem: Processing 1000 files simultaneously
const files$ = from(largeFileArray);  // 1000+ files

// BAD - processes all concurrently (memory issue)
files$.pipe(
  mergeMap(file => this.processFile(file))
).subscribe();  // 1000+ concurrent operations!

// GOOD - limit concurrency
files$.pipe(
  mergeMap(file => this.processFile(file), 5)  // Max 5 concurrent
).subscribe(
  result => console.log('Processed:', result),
  error => console.error('Error:', error)
);

// ALTERNATIVE - use concatMap for sequential (slow but safe)
files$.pipe(
  concatMap(file => this.processFile(file))  // One at a time
).subscribe();

// ADVANCED - adaptive concurrency based on system load
let currentConcurrency = 5;

files$.pipe(
  mergeMap(file => 
    this.processFile(file).pipe(
      tap(() => {
        // Adjust based on performance
        if (performance.now() % 1000 < 500) {
          currentConcurrency = Math.min(10, currentConcurrency + 1);
        } else {
          currentConcurrency = Math.max(1, currentConcurrency - 1);
        }
      })
    ),
    currentConcurrency
  )
).subscribe();

Example: Preventing queue overflow

// Queue with overflow protection
class BoundedQueue<T> {
  private queue$ = new Subject<T>();
  private queueSize = 0;
  private maxSize = 100;
  
  enqueue(item: T): boolean {
    if (this.queueSize >= this.maxSize) {
      console.warn('Queue full, dropping item');
      return false;  // Drop item
    }
    
    this.queueSize++;
    this.queue$.next(item);
    return true;
  }
  
  process(processor: (item: T) => Observable<any>, concurrency = 1) {
    return this.queue$.pipe(
      mergeMap(item => 
        processor(item).pipe(
          finalize(() => this.queueSize--)
        ),
        concurrency
      )
    );
  }
}

// Usage
const queue = new BoundedQueue<Task>();

queue.process(task => this.processTask(task), 3)
  .subscribe(result => console.log('Processed:', result));

// Add tasks
tasks.forEach(task => {
  if (!queue.enqueue(task)) {
    console.error('Task dropped:', task);
  }
});
Performance Best Practices:
  • Always unsubscribe from long-lived observables
  • Use takeUntil for automatic cleanup in components
  • Choose correct flattening operator for your use case
  • Share expensive computations with share/shareReplay
  • Use refCount: true with shareReplay to prevent leaks
  • Apply backpressure strategies for high-volume streams
  • Limit concurrency in mergeMap for resource-intensive operations
  • Monitor memory with Chrome DevTools heap snapshots
Common Performance Issues:
  • Missing unsubscribe - number one cause of memory leaks
  • shareReplay without refCount - subscriptions never cleanup
  • Nested subscriptions - hard to manage, causes leaks
  • Wrong flattening operator - unnecessary work or race conditions
  • No concurrency limits - resource exhaustion
  • Unbounded buffers - memory overflow
  • Hot observables without cleanup - event listeners pile up

Section 15 Summary

  • Subscription management via takeUntil, containers, or finalize prevents leaks
  • Operator ordering and early termination significantly improve performance
  • Memory leaks from unsubscribed observables detectable via DevTools profiling
  • Cold observables repeat work; hot observables share - choose wisely
  • Sharing strategies - share() for real-time, shareReplay() for caching (with refCount!)
  • Backpressure managed via sampling, throttling, buffering, or concurrency limits