Core Observable Lifecycle and Subscription Management

1. subscribe() Method and Observer Pattern

| Method | Syntax | Description | Use Case |
| --- | --- | --- | --- |
| subscribe(observer) | obs$.subscribe({next, error, complete}) | Observer object; each callback is optional | Full control over all lifecycle events |
| subscribe(nextFn) | obs$.subscribe(val => {}) | Shorthand for the next callback only | Simple value consumption without error handling |
| subscribe(next, error) | obs$.subscribe(val => {}, err => {}) | Positional next and error callbacks (deprecated) | Legacy code; prefer the observer object |
| subscribe(next, error, complete) | obs$.subscribe(val => {}, err => {}, () => {}) | Positional callbacks for all three events (deprecated) | Legacy code; prefer the observer object |

Example: Different subscribe patterns

import { of, throwError, interval } from 'rxjs';
import { take } from 'rxjs/operators';

// Full observer object pattern
of(1, 2, 3).subscribe({
  next: value => console.log('Next:', value),
  error: err => console.error('Error:', err),
  complete: () => console.log('Complete!')
});

// Shorthand next only
of(10, 20, 30).subscribe(val => console.log(val));

// Next and error handlers (observer object; positional callbacks are deprecated)
throwError(() => new Error('Fail')).subscribe({
  next: val => console.log(val),
  error: err => console.error('Caught:', err.message)
});

// All three handlers
interval(1000).pipe(take(3)).subscribe({
  next: val => console.log('Value:', val),
  error: err => console.error('Error:', err),
  complete: () => console.log('Stream completed')
});
Note: The observer object pattern {next, error, complete} is preferred: it is clearer, avoids positional parameter confusion, and replaces the deprecated multi-callback signatures.
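
To make the difference between the shorthand and the observer object concrete, here is a minimal sketch of how a subscribe-style function might normalize its argument and enforce the rule that nothing is delivered after error or complete. It does not use RxJS; `toSafeObserver`, `PartialObs`, `FullObs`, and `log` are hypothetical names chosen for illustration, and the library's real internals differ.

```typescript
// Hypothetical model, not RxJS source: normalizes subscribe() arguments.
type PartialObs<T> = {
  next?: (value: T) => void;
  error?: (err: unknown) => void;
  complete?: () => void;
};

interface FullObs<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

// Accept either a bare next callback or a partial observer object.
function toSafeObserver<T>(arg: ((value: T) => void) | PartialObs<T>): FullObs<T> {
  const partial: PartialObs<T> = typeof arg === 'function' ? { next: arg } : arg;
  let stopped = false; // set once error or complete has been delivered

  return {
    next(value) {
      if (!stopped) partial.next?.(value);
    },
    error(err) {
      if (stopped) return;
      stopped = true;
      // Simplification: a missing error handler just drops the error here.
      partial.error?.(err);
    },
    complete() {
      if (stopped) return;
      stopped = true;
      partial.complete?.();
    },
  };
}

const log: string[] = [];

// Shorthand form: only next is handled.
const shorthand = toSafeObserver<number>(v => log.push(`short:${v}`));
shorthand.next(1);
shorthand.complete(); // no complete callback supplied, nothing to call

// Object form: callbacks are named, so their order cannot be confused.
const full = toSafeObserver<number>({
  next: v => log.push(`full:${v}`),
  complete: () => log.push('full:done'),
});
full.next(2);
full.complete();
full.next(3); // ignored: the observer has already completed
```

After this runs, `log` holds the three delivered notifications and nothing for the late `next(3)` call.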

2. Subscription Object and unsubscribe() Method

| Property/Method | Syntax | Description | Behavior |
| --- | --- | --- | --- |
| Subscription | const sub = obs$.subscribe() | Object representing the active subscription lifecycle | Returned by subscribe(), manages cleanup |
| unsubscribe() | sub.unsubscribe() | Cancels the subscription and triggers cleanup logic | Stops emissions, runs teardown functions |
| closed | sub.closed | Boolean indicating whether the subscription is closed | True after unsubscribe, completion, or error |

Example: Subscription lifecycle management

import { interval, Subscription } from 'rxjs';

// Create and store subscription
const subscription = interval(1000).subscribe(
  val => console.log('Tick:', val)
);

// Check subscription state
console.log('Closed?', subscription.closed); // false

// Unsubscribe after 5 seconds
setTimeout(() => {
  subscription.unsubscribe();
  console.log('Closed?', subscription.closed); // true
}, 5000);

// Component lifecycle example
class MyComponent {
  // Optional because it is only assigned once onInit() runs
  private subscription?: Subscription;

  onInit() {
    this.subscription = interval(1000).subscribe(
      val => this.updateUI(val)
    );
  }

  onDestroy() {
    // Always unsubscribe to prevent memory leaks
    this.subscription?.unsubscribe();
  }

  private updateUI(value: number) {
    console.log('Render:', value);
  }
}
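Warning: Always call unsubscribe() on subscriptions to observables that do not complete on their own, such as intervals, periodic timers, and event streams. Until you do, the source keeps running and keeps its callbacks (and everything they reference) in memory.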
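
The effect of unsubscribe() on a push-based source can be sketched without RxJS. In the model below, `ManualSource` and `Handle` are hypothetical stand-ins for an observable and its Subscription; the point is only that teardown removes the listener, that `closed` flips to true, and that a second unsubscribe() call is harmless.

```typescript
// Hypothetical push source used only for illustration; it is not an RxJS type.
interface Handle {
  readonly closed: boolean;
  unsubscribe(): void;
}

class ManualSource<T> {
  private listeners = new Set<(value: T) => void>();

  subscribe(listener: (value: T) => void): Handle {
    const listeners = this.listeners;
    listeners.add(listener);
    let closed = false;
    return {
      get closed() {
        return closed;
      },
      unsubscribe() {
        if (closed) return; // calling unsubscribe twice is harmless
        closed = true;
        listeners.delete(listener); // teardown: release the reference
      },
    };
  }

  emit(value: T): void {
    // Iterate over a copy so listeners may unsubscribe while being notified.
    Array.from(this.listeners).forEach(listener => listener(value));
  }

  get listenerCount(): number {
    return this.listeners.size;
  }
}

const source = new ManualSource<number>();
const received: number[] = [];
const handle = source.subscribe(v => received.push(v));

source.emit(1);
const closedBefore = handle.closed;
handle.unsubscribe();
handle.unsubscribe(); // second call is a no-op
source.emit(2); // not delivered: the listener was removed
const closedAfter = handle.closed;
```

Only the value emitted before unsubscribe() reaches `received`, and the source no longer holds a reference to the listener afterwards.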

3. Subscription add() and remove() for Composition

| Method | Syntax | Description | Use Case |
| --- | --- | --- | --- |
| add() | parentSub.add(childSub) | Adds a child subscription to the parent; unsubscribing the parent also unsubscribes its children | Group multiple subscriptions for bulk cleanup |
| add(teardown) | sub.add(() => cleanup()) | Adds a custom teardown function to the subscription | Custom cleanup logic (close connections, clear timers) |
| remove() | parentSub.remove(childSub) | Removes a child subscription from the parent without unsubscribing it | Manage a subscription separately before parent cleanup |

Example: Composing subscriptions

import { interval, fromEvent, Subscription } from 'rxjs';

// Parent subscription
const parentSub = interval(1000).subscribe(
  val => console.log('Parent:', val)
);

// Add child subscriptions
const childSub1 = interval(500).subscribe(val => console.log('Child1:', val));
const childSub2 = fromEvent(document, 'click').subscribe(() => console.log('Click'));

parentSub.add(childSub1);
parentSub.add(childSub2);

// Add custom teardown logic
parentSub.add(() => console.log('Custom cleanup executed'));

// Unsubscribing parent unsubscribes all children
setTimeout(() => {
  parentSub.unsubscribe(); // Cleans up parent + all children
}, 5000);

// Practical component example
class DataService {
  private subscriptions = new Subscription();
  
  startStreams() {
    this.subscriptions.add(
      interval(1000).subscribe(val => this.process(val))
    );
    this.subscriptions.add(
      fromEvent(window, 'resize').subscribe(() => this.handleResize())
    );
  }
  
  stopStreams() {
    this.subscriptions.unsubscribe(); // Stops all streams
    this.subscriptions = new Subscription(); // Reset for reuse
  }
}
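
The parent/child semantics of add() and remove() can be modelled in a few lines. The sketch below is a simplified, RxJS-free model: `MiniSubscription`, `Teardown`, and `run` are hypothetical names, and the real Subscription class handles more cases (for example, errors thrown by teardowns). It also shows one behaviour that is easy to miss: adding to an already closed parent runs the child's cleanup immediately.

```typescript
// Hypothetical model of parent/child composition; not the RxJS implementation.
type Teardown = (() => void) | { unsubscribe(): void };

function run(teardown: Teardown): void {
  if (typeof teardown === 'function') teardown();
  else teardown.unsubscribe();
}

class MiniSubscription {
  closed = false;
  private teardowns = new Set<Teardown>();

  constructor(initialTeardown?: () => void) {
    if (initialTeardown) this.teardowns.add(initialTeardown);
  }

  add(teardown: Teardown): void {
    if (this.closed) {
      run(teardown); // parent already closed: clean up the child immediately
    } else {
      this.teardowns.add(teardown);
    }
  }

  remove(teardown: Teardown): void {
    this.teardowns.delete(teardown); // detaches without unsubscribing
  }

  unsubscribe(): void {
    if (this.closed) return;
    this.closed = true;
    const pending = Array.from(this.teardowns);
    this.teardowns.clear();
    pending.forEach(t => run(t));
  }
}

const events: string[] = [];
const parent = new MiniSubscription(() => events.push('parent'));
const childA = new MiniSubscription(() => events.push('childA'));
const childB = new MiniSubscription(() => events.push('childB'));

parent.add(childA);
parent.add(childB);
parent.add(() => events.push('custom'));
parent.remove(childB); // childB is now managed separately

parent.unsubscribe(); // runs parent, childA, and the custom teardown

const late = new MiniSubscription(() => events.push('late'));
parent.add(late); // parent is closed, so late is unsubscribed at once
```

In this model `childB` stays open after the parent is unsubscribed because it was removed first, while `late` is closed as soon as it is added.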

4. AutoUnsubscribe Patterns and Lifecycle Management

| Pattern | Implementation | Description | Framework |
| --- | --- | --- | --- |
| takeUntil() | obs$.pipe(takeUntil(destroy$)) | Automatically unsubscribes when the notifier emits | Framework-agnostic, manual control |
| async pipe | {{ obs$ \| async }} | Template subscription with automatic cleanup | Angular built-in |
| takeUntilDestroyed | obs$.pipe(takeUntilDestroyed()) | Automatic cleanup tied to the component lifecycle | Angular v16+ |
| Decorator pattern | @AutoUnsubscribe() | Class decorator for automatic unsubscription | Custom implementation |

Example: AutoUnsubscribe patterns

import { interval, fromEvent, Subject } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

// takeUntil pattern (most common)
class Component {
  private destroy$ = new Subject<void>();
  
  ngOnInit() {
    interval(1000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(val => console.log(val));
    
    fromEvent(window, 'resize').pipe(
      takeUntil(this.destroy$)
    ).subscribe(() => this.handleResize());
  }
  
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

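// Angular async pipe (template)
// component.ts (assumes HttpClient is provided; inject() requires Angular v14+)
import { inject } from '@angular/core';
import { HttpClient } from '@angular/common/http';

export class MyComponent {
  private http = inject(HttpClient);
  data$ = this.http.get('/api/data');
}
// template.html
// <div>{{ data$ | async }}</div>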

// Angular v16+ takeUntilDestroyed
import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

export class ModernComponent {
  constructor() {
    interval(1000).pipe(
      takeUntilDestroyed() // Automatically tied to component lifecycle
    ).subscribe(val => console.log(val));
  }
}

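// Custom decorator pattern
function AutoUnsubscribe() {
  return function (constructor: Function) {
    const original = constructor.prototype.ngOnDestroy;
    constructor.prototype.ngOnDestroy = function (this: Record<string, any>) {
      // Unsubscribe every property that looks like a subscription.
      // Subjects also expose unsubscribe(), so they are closed here too.
      for (const prop in this) {
        const property = this[prop];
        if (property && typeof property.unsubscribe === 'function') {
          property.unsubscribe();
        }
      }
      // Run the component's own ngOnDestroy, if it defined one
      original?.apply(this);
    };
  };
}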
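
To show why takeUntil(destroy$) releases the inner subscription, here is a small synchronous model of the idea. It is a sketch under stated assumptions rather than the operator's actual implementation: `Source`, `Sink`, `MiniSubject`, and `takeUntilModel` are hypothetical names, and error handling is left out.

```typescript
// Hypothetical, synchronous model of takeUntil; the RxJS operator differs internally.
type Unsubscribe = () => void;

interface Sink<T> {
  next(value: T): void;
  complete(): void;
}

type Source<T> = (sink: Sink<T>) => Unsubscribe;

// A tiny multicast source that values can be pushed into by hand.
class MiniSubject<T> {
  private sinks = new Set<Sink<T>>();

  readonly source: Source<T> = sink => {
    this.sinks.add(sink);
    return () => {
      this.sinks.delete(sink);
    };
  };

  next(value: T): void {
    Array.from(this.sinks).forEach(sink => sink.next(value));
  }

  get observerCount(): number {
    return this.sinks.size;
  }
}

// Mirror the source until the notifier emits, then complete and release both.
function takeUntilModel<T>(source: Source<T>, notifier: Source<unknown>): Source<T> {
  return sink => {
    let done = false;
    let stopSource: Unsubscribe = () => {};
    let stopNotifier: Unsubscribe = () => {};

    const finish = (notifyComplete: boolean) => {
      if (done) return;
      done = true;
      stopSource();
      stopNotifier();
      if (notifyComplete) sink.complete();
    };

    // Subscribe to the notifier first so an early emission is not missed.
    stopNotifier = notifier({ next: () => finish(true), complete: () => {} });
    if (done) {
      stopNotifier(); // notifier fired synchronously during subscription
      return () => {};
    }
    stopSource = source({
      next: value => { if (!done) sink.next(value); },
      complete: () => finish(true),
    });
    if (done) stopSource(); // source completed synchronously
    return () => finish(false);
  };
}

const ticks = new MiniSubject<number>();
const destroy = new MiniSubject<void>();
const seen: Array<number | string> = [];

takeUntilModel(ticks.source, destroy.source)({
  next: v => seen.push(v),
  complete: () => seen.push('complete'),
});

ticks.next(1);
ticks.next(2);
destroy.next(); // plays the role of destroy$.next() in ngOnDestroy
ticks.next(3); // ignored: the inner subscription was already released
```

Once the notifier emits, both `ticks` and `destroy` have zero observers in this model, which is the property that makes the pattern safe for component cleanup.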

5. Memory Leak Prevention and Subscription Cleanup

| Issue | Symptom | Solution | Prevention |
| --- | --- | --- | --- |
| Infinite observables | Interval/timer continues after the component is destroyed | Use takeUntil() or manual unsubscribe | Always clean up long-running streams |
| Event listeners | DOM event handlers accumulate and slow performance | Unsubscribe fromEvent subscriptions | Use async pipe or takeUntil |
| HTTP requests | Response callback runs after the component is destroyed | Requests complete on their own after the response; take(1) only documents intent, while takeUntil() cancels in-flight requests | Consider using async pipe in templates |
| Subject retention | Subjects hold references to observers | Call complete() on subjects in cleanup | Complete subjects in ngOnDestroy/cleanup |

Example: Memory leak prevention

import { interval, fromEvent, Subject, Subscription } from 'rxjs';
import { takeUntil, take } from 'rxjs/operators';

// BAD - Memory leak
class LeakyComponent {
  ngOnInit() {
    interval(1000).subscribe(val => console.log(val)); // Never stops!
    fromEvent(window, 'scroll').subscribe(() => {}); // Listener never removed!
  }
}

// GOOD - Proper cleanup
class SafeComponent {
  private destroy$ = new Subject<void>();
  private dataSubject$ = new Subject<any>();
  
  ngOnInit() {
    // Auto-cleanup with takeUntil
    interval(1000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(val => console.log(val));
    
    // Event listener cleanup
    fromEvent(window, 'scroll').pipe(
      takeUntil(this.destroy$)
    ).subscribe(() => this.handleScroll());
    
    // HTTP request - auto-completes but explicit is better
    this.http.get('/api/data').pipe(
      take(1) // Ensure single emission
    ).subscribe(data => this.processData(data));
  }
  
  ngOnDestroy() {
    // Cleanup pattern
    this.destroy$.next();
    this.destroy$.complete();
    
    // Complete subjects to release observers
    this.dataSubject$.complete();
  }
}

// Subscription tracking for debugging
class DebugComponent {
  private subs: Subscription[] = [];
  
  ngOnInit() {
    this.subs.push(interval(1000).subscribe());
    this.subs.push(fromEvent(window, 'click').subscribe());
    console.log('Active subscriptions:', this.subs.length);
  }
  
  ngOnDestroy() {
    this.subs.forEach(sub => sub.unsubscribe());
    console.log('Cleaned up', this.subs.length, 'subscriptions');
  }
}
Warning: Observables that never complete on their own (interval, periodic timer, fromEvent) are a common source of memory leaks. Always pair them with takeUntil() or an explicit unsubscribe.
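
The listener accumulation described above can be reproduced in isolation. The following sketch replaces the DOM and the component framework with hypothetical stand-ins (`FakeEventTarget`, `LeakyWidget`, `SafeWidget`, `mountAndDestroy`) so that the number of retained listeners can be counted directly after several create/destroy cycles.

```typescript
// Hypothetical stand-ins for a DOM event target and two components.
type Listener = () => void;

class FakeEventTarget {
  private listeners = new Set<Listener>();

  addEventListener(listener: Listener): void {
    this.listeners.add(listener);
  }

  removeEventListener(listener: Listener): void {
    this.listeners.delete(listener);
  }

  get listenerCount(): number {
    return this.listeners.size;
  }
}

class LeakyWidget {
  private target: FakeEventTarget;

  constructor(target: FakeEventTarget) {
    this.target = target;
  }

  init(): void {
    this.target.addEventListener(() => this.onScroll());
  }

  destroy(): void {
    // Nothing is removed: the closure keeps this widget reachable from the target.
  }

  private onScroll(): void {}
}

class SafeWidget {
  private target: FakeEventTarget;
  private cleanup: Array<() => void> = [];

  constructor(target: FakeEventTarget) {
    this.target = target;
  }

  init(): void {
    const listener = () => this.onScroll();
    this.target.addEventListener(listener);
    // Record how to undo the registration, as a subscription teardown would.
    this.cleanup.push(() => this.target.removeEventListener(listener));
  }

  destroy(): void {
    this.cleanup.forEach(fn => fn());
    this.cleanup = [];
  }

  private onScroll(): void {}
}

// Simulate a component being created and destroyed several times.
function mountAndDestroy(make: () => { init(): void; destroy(): void }, times: number): void {
  for (let i = 0; i < times; i++) {
    const widget = make();
    widget.init();
    widget.destroy();
  }
}

const leakyTarget = new FakeEventTarget();
const safeTarget = new FakeEventTarget();
mountAndDestroy(() => new LeakyWidget(leakyTarget), 3);
mountAndDestroy(() => new SafeWidget(safeTarget), 3);

const leakedListeners = leakyTarget.listenerCount; // one per destroyed leaky widget
const remainingListeners = safeTarget.listenerCount; // all removed on destroy
```

Each destroyed `LeakyWidget` leaves one listener behind, and that listener keeps the widget itself reachable, whereas `SafeWidget` leaves the target empty.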

6. Subscription Containers and Group Management

| Pattern | Implementation | Description | Advantage |
| --- | --- | --- | --- |
| Subscription bag | subs = new Subscription() | Single parent subscription containing all children | Single unsubscribe call cleans everything |
| Array container | subs: Subscription[] = [] | Array of subscriptions for iteration | Easy to track count and iterate |
| Map container | subs = new Map<string, Subscription>() | Named subscriptions for selective cleanup | Selective unsubscribe by key |
| SubSink pattern | subs.sink = obs$.subscribe() | Third-party library for subscription management | Simplified API, automatic cleanup |

Example: Subscription container patterns

import { Subscription, interval, fromEvent } from 'rxjs';

// Subscription bag pattern (recommended)
class SubscriptionBagComponent {
  private subscriptions = new Subscription();
  
  ngOnInit() {
    this.subscriptions.add(
      interval(1000).subscribe(val => this.tick(val))
    );
    this.subscriptions.add(
      fromEvent(window, 'resize').subscribe(() => this.resize())
    );
  }
  
  ngOnDestroy() {
    this.subscriptions.unsubscribe();
  }
}

// Array container pattern
class ArrayContainerComponent {
  private subscriptions: Subscription[] = [];
  
  ngOnInit() {
    this.subscriptions.push(interval(1000).subscribe());
    this.subscriptions.push(fromEvent(document, 'click').subscribe());
  }
  
  ngOnDestroy() {
    this.subscriptions.forEach(sub => sub.unsubscribe());
    this.subscriptions = [];
  }
}

// Map container for selective cleanup
class MapContainerComponent {
  private subscriptions = new Map<string, Subscription>();
  
  ngOnInit() {
    this.subscriptions.set('timer', 
      interval(1000).subscribe(val => console.log(val))
    );
    this.subscriptions.set('clicks', 
      fromEvent(document, 'click').subscribe(() => console.log('click'))
    );
  }
  
  stopTimer() {
    this.subscriptions.get('timer')?.unsubscribe();
    this.subscriptions.delete('timer');
  }
  
  ngOnDestroy() {
    this.subscriptions.forEach(sub => sub.unsubscribe());
    this.subscriptions.clear();
  }
}

// Reusable subscription manager
class SubscriptionManager {
  // A Set makes it possible to count and iterate the tracked subscriptions
  private subs = new Set<Subscription>();

  add(subscription: Subscription): void {
    this.subs.add(subscription);
  }

  unsubscribeAll(): void {
    this.subs.forEach(sub => sub.unsubscribe());
    this.subs.clear();
  }

  get count(): number {
    // Count only subscriptions that are still open
    return Array.from(this.subs).filter(sub => !sub.closed).length;
  }
}
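
One pitfall of the Map container is that calling set() with an existing key overwrites the stored subscription without unsubscribing it. A keyed registry that guards against this might look like the sketch below. `KeyedSubscriptions` and `fakeSubscription` are hypothetical names, and the locally declared `Unsubscribable` interface is only assumed to match the shape of what subscribe() returns, so that the example runs without RxJS.

```typescript
// Hypothetical helper; Unsubscribable is a local stand-in for a Subscription.
interface Unsubscribable {
  unsubscribe(): void;
}

class KeyedSubscriptions {
  private entries = new Map<string, Unsubscribable>();

  // Registering under an existing key unsubscribes the previous entry first.
  set(key: string, subscription: Unsubscribable): void {
    this.entries.get(key)?.unsubscribe();
    this.entries.set(key, subscription);
  }

  // Returns true if a subscription was found and stopped.
  stop(key: string): boolean {
    const entry = this.entries.get(key);
    if (!entry) return false;
    entry.unsubscribe();
    this.entries.delete(key);
    return true;
  }

  stopAll(): void {
    this.entries.forEach(entry => entry.unsubscribe());
    this.entries.clear();
  }

  get size(): number {
    return this.entries.size;
  }
}

// Stand-in for a real Subscription that records when it is stopped.
function fakeSubscription(name: string, log: string[]): Unsubscribable {
  return { unsubscribe: () => { log.push(`stopped:${name}`); } };
}

const stopLog: string[] = [];
const registry = new KeyedSubscriptions();
registry.set('timer', fakeSubscription('timer-1', stopLog));
registry.set('timer', fakeSubscription('timer-2', stopLog)); // replaces and stops timer-1
registry.set('clicks', fakeSubscription('clicks', stopLog));

const stoppedTimer = registry.stop('timer'); // key exists
const stoppedMissing = registry.stop('missing'); // key was never registered
registry.stopAll();
```

With a real Subscription in place of `fakeSubscription`, the same registry could back the `stopTimer()` style of selective cleanup shown in the Map container example.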

Section 2 Summary

  • subscribe() returns a Subscription object for lifecycle management
  • unsubscribe() must be called to prevent memory leaks in long-running streams
  • The add() method composes subscriptions for bulk cleanup
  • The takeUntil(destroy$) pattern is preferred for automatic cleanup
  • Always clean up intervals, timers, and event listeners, and complete subjects in destroy hooks
  • Use subscription containers (bag/array/map) for organized group management