Performance Optimization and Memory Management
1. Subscription Management and Automatic Cleanup
| Pattern | Implementation | Description | Use Case |
|---|---|---|---|
| Manual Unsubscribe | subscription.unsubscribe() | Explicitly clean up single subscription | Simple component cleanup |
| Subscription Container | sub.add(sub1); sub.add(sub2) | Compose multiple subscriptions | Managing multiple subscriptions together |
| takeUntil Pattern | takeUntil(destroy$) | Auto-unsubscribe based on notifier | Component lifecycle management |
| async Pipe (Angular) | {{ obs$ \| async }} | Framework handles subscription lifecycle | Template-driven subscriptions |
| SubSink Pattern | Custom subscription manager class | Centralized subscription management | Complex components with many subscriptions |
| finalize Operator | finalize(() => cleanup()) | Execute cleanup on unsubscribe or complete | Resource cleanup (timers, connections) |
Example: Subscription container pattern
class MyComponent {
  private subscriptions = new Subscription();
  ngOnInit() {
    // Add multiple subscriptions to container
    this.subscriptions.add(
      interval(1000).subscribe(n => console.log(n))
    );
    this.subscriptions.add(
      fromEvent(button, 'click').subscribe(e => this.handleClick(e))
    );
    this.subscriptions.add(
      this.dataService.getData().subscribe(data => this.data = data)
    );
  }
  ngOnDestroy() {
    // Unsubscribe all at once
    this.subscriptions.unsubscribe();
  }
}
Example: takeUntil pattern for automatic cleanup
class MyComponent implements OnDestroy {
  private destroy$ = new Subject<void>();
  ngOnInit() {
    // All subscriptions auto-complete when destroy$ emits
    interval(1000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(n => console.log(n));
    this.userService.getUser().pipe(
      takeUntil(this.destroy$)
    ).subscribe(user => this.user = user);
    fromEvent(document, 'click').pipe(
      takeUntil(this.destroy$)
    ).subscribe(e => this.handleClick(e));
  }
  ngOnDestroy() {
    // Single emission completes all subscriptions
    this.destroy$.next();
    this.destroy$.complete();
  }
}
Example: SubSink pattern for advanced subscription management
// Custom SubSink class
class SubSink {
  private subscriptions: Subscription[] = [];
  add(...subs: Subscription[]) {
    this.subscriptions.push(...subs);
  }
  unsubscribe() {
    this.subscriptions.forEach(sub => sub.unsubscribe());
    this.subscriptions = [];
  }
  get count() {
    return this.subscriptions.length;
  }
}
// Usage in component
class MyComponent {
  private subs = new SubSink();
  ngOnInit() {
    this.subs.add(
      timer(1000).subscribe(() => console.log('tick')),
      this.api.getData().subscribe(data => this.data = data),
      this.route.params.subscribe(params => this.id = params.id)
    );
    console.log(`Active subscriptions: ${this.subs.count}`);
  }
  ngOnDestroy() {
    this.subs.unsubscribe();
  }
}
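One gap in the SubSink class above: a subscription added after unsubscribe() has run is stored but never closed. The sketch below shows one possible variant that closes late additions immediately, which is how RxJS's own Subscription.add behaves once the parent is closed. ClosingSubSink, fakeSubscription, and the locally declared Unsubscribable interface are assumptions made to keep the sketch dependency-free; they are not part of the original class.

```typescript
// Local stand-in for anything with an unsubscribe() method (hypothetical, dependency-free).
interface Unsubscribable {
  unsubscribe(): void;
}

class ClosingSubSink {
  private subscriptions: Unsubscribable[] = [];
  private closed = false;

  add(...subs: Unsubscribable[]): void {
    if (this.closed) {
      // Sink already torn down: close late arrivals right away instead of leaking them.
      subs.forEach(sub => sub.unsubscribe());
      return;
    }
    this.subscriptions.push(...subs);
  }

  unsubscribe(): void {
    this.closed = true;
    // Swap the list out first so a teardown that touches the sink cannot disturb iteration.
    const pending = this.subscriptions;
    this.subscriptions = [];
    pending.forEach(sub => sub.unsubscribe());
  }

  get count(): number {
    return this.subscriptions.length;
  }
}

// Fake subscription that records its name when it is closed.
function fakeSubscription(log: string[], label: string): Unsubscribable {
  return { unsubscribe: () => { log.push(label); } };
}

const closedLog: string[] = [];
const sink = new ClosingSubSink();
sink.add(fakeSubscription(closedLog, 'a'), fakeSubscription(closedLog, 'b'));
sink.unsubscribe();                             // closes 'a' and 'b'
sink.add(fakeSubscription(closedLog, 'late'));  // closed immediately, not stored
```

Whether late additions should be closed silently or reported as a bug is a design choice; logging a warning in the closed branch is a reasonable alternative.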
2. Operator Chain Optimization and Pipeline Efficiency
| Optimization | Inefficient | Efficient | Benefit |
|---|---|---|---|
| Operator Order | map().filter() | filter().map() | Reduce unnecessary transformations |
| Early Termination | map().map().take(1) | take(1).map().map() | Stop processing early |
| Shared Computation | Multiple subscriptions | share() or shareReplay() | Avoid duplicate work |
| Flattening Strategy | Wrong operator choice | switchMap vs mergeMap vs concatMap | Match concurrency to use case |
| Avoid Nested Subscribes | subscribe(x => obs.subscribe()) | switchMap/mergeMap/concatMap | Cleaner code, better memory management |
| Memoization | Recalculate every time | Cache expensive operations | Reduce computation overhead |
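The Memoization row has no worked example in this section. As a minimal sketch, assuming the expensive step is a pure function of a single argument, results can be cached by argument and the cached function passed to map(). The names memoize, slowSquare, and fastSquare are hypothetical.

```typescript
// Cache results of a pure, single-argument function (hypothetical helper).
function memoize<A, R>(fn: (arg: A) => R): (arg: A) => R {
  const cache = new Map<A, R>();
  return (arg: A): R => {
    if (cache.has(arg)) {
      return cache.get(arg) as R;
    }
    const result = fn(arg);
    cache.set(arg, result);
    return result;
  };
}

let slowSquareCalls = 0;
const slowSquare = (n: number): number => {
  slowSquareCalls++;            // stands in for real, costly work
  return n * n;
};

const fastSquare = memoize(slowSquare);
const squares = [3, 4, 3, 3, 4].map(n => fastSquare(n));
// squares is [9, 16, 9, 9, 16]; slowSquare ran only twice (for 3 and for 4)
```

The cache here is unbounded, so a long-lived stream with many distinct inputs would need an eviction policy to avoid trading CPU time for a memory leak.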
Example: Operator ordering optimization
// INEFFICIENT - transforms all 1000 items before filtering
range(1, 1000).pipe(
  map(n => expensiveTransform(n)), // 1000 operations
  filter(n => n > 500) // Then filters the transformed values
).subscribe();
// EFFICIENT - filters first, reducing transformations
// (only equivalent when the predicate can be evaluated on the untransformed value)
range(1, 1000).pipe(
  filter(n => n > 500), // Filters to 500 items
  map(n => expensiveTransform(n)) // Only 500 operations
).subscribe();
// EVEN BETTER - early termination
range(1, 1000).pipe(
  filter(n => n > 500),
  take(10), // Stop after 10
  map(n => expensiveTransform(n)) // Only 10 operations
).subscribe();
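The operation counts in the comments above can be checked without RxJS. The following sketch models the three orderings on a plain array and counts calls to a stand-in expensiveTransform (here it just doubles its input, an assumption for illustration). The predicate for the map-first variant is rewritten to act on the transformed value so all variants select the same items.

```typescript
let transformCalls = 0;
function expensiveTransform(n: number): number {
  transformCalls++;
  return n * 2;                 // placeholder for costly work
}

const numbers: number[] = [];
for (let n = 1; n <= 1000; n++) numbers.push(n);

// Runs a pipeline and reports its output plus how often expensiveTransform ran.
function measure(run: () => number[]): { output: number[]; calls: number } {
  transformCalls = 0;
  const output = run();
  return { output, calls: transformCalls };
}

// map -> filter: every item is transformed, then tested on the transformed value.
const mapFirst = measure(() =>
  numbers.map(n => expensiveTransform(n)).filter(v => v > 1000)
);

// filter -> map: same selection expressed on the raw value, so only survivors are transformed.
const filterFirst = measure(() =>
  numbers.filter(n => n > 500).map(n => expensiveTransform(n))
);

// filter -> take(10) -> map: stop pulling values once 10 have passed the filter.
const earlyExit = measure(() => {
  const out: number[] = [];
  for (const n of numbers) {
    if (out.length === 10) break;
    if (n > 500) out.push(expensiveTransform(n));
  }
  return out;
});

console.log(mapFirst.calls, filterFirst.calls, earlyExit.calls); // 1000 500 10
```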
Example: Avoiding nested subscriptions
// ANTI-PATTERN - nested subscriptions (hard to manage, memory leaks)
this.userService.getUser(userId).subscribe(user => {
  this.orderService.getOrders(user.id).subscribe(orders => {
    this.productService.getProducts(orders[0].id).subscribe(products => {
      console.log(products);
    });
  });
});
// BETTER - flattening operators
this.userService.getUser(userId).pipe(
  switchMap(user => this.orderService.getOrders(user.id)),
  switchMap(orders => this.productService.getProducts(orders[0].id))
).subscribe(products => {
  console.log(products);
});
// BEST - with error handling and cleanup
this.userService.getUser(userId).pipe(
  switchMap(user => this.orderService.getOrders(user.id)),
  switchMap(orders => this.productService.getProducts(orders[0].id)),
  catchError(err => {
    console.error('Error:', err);
    return of([]);
  }),
  takeUntil(this.destroy$)
).subscribe(products => {
  console.log(products);
});
Example: Choosing the right flattening operator
// switchMap - cancels previous, keeps latest (search/autocomplete)
searchInput$.pipe(
  debounceTime(300),
  switchMap(term => this.api.search(term)) // Cancel old searches
).subscribe(results => displayResults(results));
// mergeMap - concurrent processing (independent operations)
userIds$.pipe(
  mergeMap(id => this.api.getUser(id), 5) // Max 5 concurrent
).subscribe(user => processUser(user));
// concatMap - sequential, ordered (must complete in order)
queue$.pipe(
  concatMap(task => this.processTask(task)) // One at a time
).subscribe(result => console.log(result));
// exhaustMap - ignore while busy (prevent duplicate submissions)
saveButton$.pipe(
  exhaustMap(() => this.api.save(data)) // Ignore clicks while saving
).subscribe(response => console.log('Saved:', response));
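To make the "ignore while busy" policy of exhaustMap concrete, here is a rough callback-based model of it. This is not how RxJS implements the operator; ignoreWhileBusy, Task, and the simulated save are hypothetical names, and a task signals completion by calling done() instead of completing an inner observable.

```typescript
// A task reports completion by calling done() (stand-in for an inner observable completing).
type Task = (done: () => void) => void;

// Returns a trigger that starts the task only when no previous run is still in flight.
function ignoreWhileBusy(task: Task): () => boolean {
  let busy = false;
  return (): boolean => {
    if (busy) {
      return false;              // dropped, like a click during an in-flight save
    }
    busy = true;
    task(() => { busy = false; });
    return true;                 // started
  };
}

// Simulated save that finishes only when finishSave() is called.
let savesStarted = 0;
let finishSave: () => void = () => {};
const triggerSave = ignoreWhileBusy(done => {
  savesStarted++;
  finishSave = done;
});

const firstClick = triggerSave();   // true: starts save #1
const secondClick = triggerSave();  // false: save #1 is still running
finishSave();                       // save #1 completes
const thirdClick = triggerSave();   // true: starts save #2
```

By contrast, a switchMap-style policy would cancel save #1 on the second click, and a concatMap-style policy would queue the second click until save #1 finishes.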
3. Memory Leak Detection and Prevention
| Leak Source | Symptom | Prevention | Detection Method |
|---|---|---|---|
| Unsubscribed Observables | Growing memory usage | Always unsubscribe or use takeUntil | Chrome DevTools memory profiler |
| Long-lived Subjects | Retained observers | Complete subjects, unsubscribe | Check observer count |
| Event Listeners | DOM nodes not GC'd | Use fromEvent with takeUntil | Event listener count in DevTools |
| Timers/Intervals | Background activity | Unsubscribe from interval/timer | Network/console activity when idle |
| shareReplay Misuse | Cached subscriptions never release | Use refCount: true and windowTime | Memory snapshots over time |
| Closure Captures | Large objects retained | Limit closure scope, use finalize | Heap snapshot analysis |
Example: Common memory leak scenarios and fixes
// MEMORY LEAK - interval never stops
class LeakyComponent {
  ngOnInit() {
    interval(1000).subscribe(n => console.log(n));
    // Leak: subscription never cleaned up
  }
}
// FIXED - properly managed subscription
class FixedComponent implements OnDestroy {
  private destroy$ = new Subject<void>();
  ngOnInit() {
    interval(1000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(n => console.log(n));
  }
  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}
// MEMORY LEAK - Subject with many subscribers never completes
class DataService {
  private data$ = new Subject<any>();
  subscribe() {
    return this.data$.subscribe(); // Subscribers accumulate
  }
  // Missing: complete() or cleanup mechanism
}
// FIXED - proper Subject lifecycle
class DataService implements OnDestroy {
  private data$ = new Subject<any>();
  // Expose a read-only stream; callers subscribe and own the resulting Subscription
  observe() {
    return this.data$.asObservable();
  }
  ngOnDestroy() {
    this.data$.complete(); // Completes and releases all remaining subscribers
  }
}
Example: Detecting memory leaks
// Add leak detection helper
class LeakDetector {
  private subscriptions = new Map<string, Subscription>();
  track(name: string, subscription: Subscription) {
    const previous = this.subscriptions.get(name);
    if (previous) {
      console.warn(`Subscription '${name}' already exists; closing the previous one`);
      previous.unsubscribe(); // Otherwise the overwritten subscription could never be cleaned up
    }
    this.subscriptions.set(name, subscription);
  }
  check() {
    const active = Array.from(this.subscriptions.entries())
      .filter(([_, sub]) => !sub.closed);
    if (active.length > 0) {
      console.warn(`${active.length} active subscriptions:`,
        active.map(([name]) => name));
    }
    return active.length;
  }
  cleanup() {
    this.subscriptions.forEach(sub => sub.unsubscribe());
    this.subscriptions.clear();
  }
}
// Usage
const detector = new LeakDetector();
detector.track('interval', interval(1000).subscribe());
detector.track('http', http.get('/api').subscribe());
// Check for leaks before component destruction
detector.check(); // Warns if subscriptions still active
detector.cleanup(); // Clean up all
Example: shareReplay memory leak prevention
// MEMORY LEAK - shareReplay without refCount
const data$ = this.http.get('/api/data').pipe(
  shareReplay(1) // refCount defaults to false: unsubscribing never tears down the source
);
// Multiple components subscribe
data$.subscribe(d1 => console.log(d1));
data$.subscribe(d2 => console.log(d2));
// Even after the components are destroyed, the cached response stays in memory.
// With a source that never completes (interval, WebSocket) the source subscription
// also stays open; a one-shot HTTP request ends its own subscription on completion.
// FIXED - use refCount to allow cleanup
const data$ = this.http.get('/api/data').pipe(
  shareReplay({ bufferSize: 1, refCount: true })
  // Unsubscribes from a still-running source when the last subscriber leaves
);
// BETTER - add time limit for cache
const data$ = this.http.get('/api/data').pipe(
  shareReplay({
    bufferSize: 1,
    refCount: true,
    windowTime: 60000 // Replayed value expires after 60 seconds
  })
);
// BEST - complete observable pattern
const data$ = this.http.get('/api/data').pipe(
  shareReplay({ bufferSize: 1, refCount: true }),
  takeUntil(this.serviceDestroy$) // Complete on service destroy
);
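What refCount: true changes can be modeled without RxJS: the shared source is connected when the first subscriber arrives and disconnected when the last one leaves. The sketch below is a simplified model under that assumption only. RefCountedSource and its connect callback are hypothetical, and the replay buffer and source completion are deliberately left out.

```typescript
type Teardown = () => void;

// Connects to a source on the first subscriber and disconnects after the last one leaves.
class RefCountedSource {
  private subscribers = 0;
  private disconnect: Teardown | null = null;
  private connect: () => Teardown;

  // connect() starts the underlying work and returns the function that stops it.
  constructor(connect: () => Teardown) {
    this.connect = connect;
  }

  subscribe(): Teardown {
    if (this.subscribers === 0) {
      this.disconnect = this.connect();
    }
    this.subscribers++;
    let subscribed = true;
    return () => {
      if (!subscribed) return;            // make unsubscribe idempotent
      subscribed = false;
      this.subscribers--;
      if (this.subscribers === 0 && this.disconnect) {
        this.disconnect();                // last subscriber gone: release the source
        this.disconnect = null;
      }
    };
  }
}

const lifecycle: string[] = [];
const sharedSource = new RefCountedSource(() => {
  lifecycle.push('connect');
  return () => { lifecycle.push('disconnect'); };
});

const unsubA = sharedSource.subscribe();  // connect
const unsubB = sharedSource.subscribe();  // reuses the existing connection
unsubA();
unsubB();                                 // disconnect
const unsubC = sharedSource.subscribe();  // connect again
unsubC();                                 // disconnect
```

Without reference counting, the disconnect step never runs on unsubscribe, which is the situation the first variant above warns about for sources that do not complete on their own.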
4. Cold vs Hot Observable Performance Characteristics
| Characteristic | Cold Observable | Hot Observable | Performance Impact |
|---|---|---|---|
| Execution | Per subscriber (unicast) | Shared (multicast) | Hot avoids duplicate work |
| Memory | N × subscription overhead | 1 × shared overhead | Hot uses less memory |
| Side Effects | Repeated per subscription | Executed once | Hot prevents duplicate API calls |
| Late Subscribers | Get full sequence | Miss past emissions | Consider with shareReplay |
| When to Use | Independent data per subscriber | Shared data across subscribers | Match pattern to use case |
Example: Cold observable behavior
// Cold observable - separate execution per subscriber
const cold$ = new Observable(subscriber => {
  console.log('Observable execution started');
  // Expensive operation
  const data = fetchExpensiveData();
  subscriber.next(data);
  subscriber.complete();
});
// Each subscription triggers new execution
cold$.subscribe(data => console.log('Sub 1:', data));
// Console: "Observable execution started"
// Console: "Sub 1: data"
cold$.subscribe(data => console.log('Sub 2:', data));
// Console: "Observable execution started" (again!)
// Console: "Sub 2: data"
// Performance issue: fetchExpensiveData() called twice!
Example: Converting cold to hot for better performance
// Convert cold to hot with share()
const hot$ = new Observable(subscriber => {
  console.log('Observable execution started');
  // Emit asynchronously so both subscribers below attach before the source completes
  const id = setTimeout(() => {
    subscriber.next(fetchExpensiveData());
    subscriber.complete();
  });
  return () => clearTimeout(id); // Teardown if every subscriber leaves early
}).pipe(
  share() // Multicast to all current subscribers
);
// First subscription triggers execution
hot$.subscribe(data => console.log('Sub 1:', data));
// Console: "Observable execution started"
// Second subscription joins the execution already in progress
hot$.subscribe(data => console.log('Sub 2:', data));
// Console (once the timeout fires): "Sub 1: data", then "Sub 2: data"
// Performance benefit: fetchExpensiveData() called only once
// Note: share() resets when the source completes, so a subscriber that arrives
// after completion starts a new execution. Use shareReplay to cache the result.
const cached$ = cold$.pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);
cached$.subscribe(data => console.log('First sub:', data)); // Triggers the single execution
// Even late subscribers get cached result
setTimeout(() => {
  cached$.subscribe(data => console.log('Late sub:', data));
  // Gets cached data without re-execution (the source has already completed)
}, 5000);
Example: Real-world HTTP optimization
// Service with cold HTTP observable
class UserService {
  getUsers() {
    return this.http.get<User[]>('/api/users');
    // Cold - each subscription = new HTTP request
  }
}
// Component using service
class UserListComponent {
  users$ = this.userService.getUsers();
  ngOnInit() {
    // Multiple subscriptions
    this.users$.subscribe(u => this.userCount = u.length);
    this.users$.subscribe(u => this.firstUser = u[0]);
    this.users$.subscribe(u => this.lastUser = u[u.length - 1]);
    // Problem: 3 HTTP requests for same data!
  }
}
// OPTIMIZED - share the observable
class UserService {
  getUsers() {
    return this.http.get<User[]>('/api/users').pipe(
      shareReplay({ bufferSize: 1, refCount: true })
    );
    // Hot - single HTTP request, shared result
  }
}
// Now component makes only 1 HTTP request
// All 3 subscriptions share the same response
5. Share Strategy Selection (share, shareReplay, refCount)
| Operator | Behavior | Cache | Best For |
|---|---|---|---|
| share() | Multicast while subscribers exist | No replay | Real-time streams, live data |
| shareReplay(1) | Cache last value; source is not unsubscribed when subscribers leave (refCount: false) | Indefinite | Config data that rarely changes, from a source that completes |
| shareReplay({refCount: true}) | Cache while subscribers, cleanup when none | Temporary | API responses, user data |
| shareReplay({windowTime}) | Cache for time duration | Time-limited | Polling data, time-sensitive cache |
| publish() + refCount() | Manual multicast control (deprecated in RxJS 7; share() with a config object covers the same cases) | Custom | Advanced scenarios, custom logic |
Example: Choosing the right sharing strategy
// share() - for real-time data (WebSocket, events)
const clicks$ = fromEvent(button, 'click').pipe(
  share() // Share emissions while any subscriber exists
);
clicks$.subscribe(e => console.log('Handler 1'));
clicks$.subscribe(e => console.log('Handler 2'));
// Both handlers receive same click events
// shareReplay - for static/config data
const config$ = this.http.get('/api/config').pipe(
  shareReplay({ bufferSize: 1, refCount: true })
  // IMPORTANT: use refCount to prevent memory leaks
);
// Late subscribers get cached config
config$.subscribe(c => this.applyConfig(c));
setTimeout(() => {
  config$.subscribe(c => console.log('Cached:', c));
}, 5000);
// shareReplay with windowTime - for time-sensitive data
const stockPrice$ = this.api.getStockPrice('AAPL').pipe(
  shareReplay({
    bufferSize: 1,
    refCount: true,
    windowTime: 10000 // Replayed value expires after 10 seconds
  })
);
// Subscribers within 10s of the emission get the cached price
// After 10s the cached value has expired. If getStockPrice() is a one-shot request
// that has already completed, a later subscriber receives no value and no new
// request is made; to refresh periodically, drive the request from a timer
// (for example timer(0, 10000) with switchMap) before sharing.
Example: Share strategy comparison
import { defer, timer } from 'rxjs';
import { map, share, shareReplay } from 'rxjs/operators';
// Test observable that logs executions. It emits asynchronously (after 10ms) so
// that subscribers added in the same tick can join one execution; with a
// synchronous source, share() would complete and reset before B subscribes.
const test$ = defer(() => {
  console.log('Execution!');
  return timer(10).pipe(map(() => Math.random()));
});
// 1. No sharing - multiple executions
console.log('--- No sharing ---');
test$.subscribe(v => console.log('A:', v)); // Execution! A: 0.123
test$.subscribe(v => console.log('B:', v)); // Execution! B: 0.456
// 2. share() - single execution, no replay
console.log('--- share() ---');
const shared$ = test$.pipe(share());
shared$.subscribe(v => console.log('A:', v)); // Execution! A: 0.789
shared$.subscribe(v => console.log('B:', v)); // B: 0.789 (same value)
setTimeout(() => {
  // The source completed at ~10ms and share() reset, so this starts over
  shared$.subscribe(v => console.log('C:', v)); // Execution! C: 0.321 (new)
}, 100);
// 3. shareReplay() - single execution, replay to late subscribers
console.log('--- shareReplay ---');
const replayed$ = test$.pipe(shareReplay(1));
replayed$.subscribe(v => console.log('A:', v)); // Execution! A: 0.555
replayed$.subscribe(v => console.log('B:', v)); // B: 0.555 (same execution)
setTimeout(() => {
  replayed$.subscribe(v => console.log('C:', v)); // C: 0.555 (replayed!)
}, 100);
// 4. shareReplay with refCount - cleanup when no subscribers
console.log('--- shareReplay with refCount ---');
const refCounted$ = test$.pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);
const sub1 = refCounted$.subscribe(v => console.log('A:', v)); // Execution!
const sub2 = refCounted$.subscribe(v => console.log('B:', v)); // Joins the same execution
sub1.unsubscribe();
sub2.unsubscribe(); // All unsubscribed before the source emitted - source cleaned up
setTimeout(() => {
  refCounted$.subscribe(v => console.log('C:', v)); // Execution! (new)
}, 100);
// Had the source completed before the last unsubscribe, C would get the replayed value instead
6. Backpressure Handling and Buffer Management
| Strategy | Operator | Behavior | Use Case |
|---|---|---|---|
| Sampling | sample(), sampleTime() | Periodically emit most recent value | Fast producers, slow consumers |
| Throttling | throttleTime(), throttle() | Emit first, ignore rest for duration | Rate limiting, preventing spam |
| Debouncing | debounceTime(), debounce() | Emit after silence period | Search input, form validation |
| Buffering | buffer(), bufferTime() | Collect values, emit batches | Batch API requests, analytics |
| Windowing | window(), windowTime() | Group into observable windows | Complex batching logic |
| Dropping | take(), first(), filter() | Selectively drop emissions | Limiting data volume |
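The first three rows are easy to confuse. As an illustration only, the sketch below models each policy as a pure function over a list of event timestamps (milliseconds, ascending) and returns the timestamps of the events that survive. The function names are hypothetical, the models assume the stream stays open after the last event, and events that land exactly on a timer boundary are not modeled (real operators resolve those ties through scheduling).

```typescript
// throttleTime(d), leading only: emit an event, then ignore events for d ms.
function throttleLeading(times: number[], duration: number): number[] {
  const out: number[] = [];
  let acceptFrom = -Infinity;               // earliest time the next event is accepted
  for (const t of times) {
    if (t >= acceptFrom) {
      out.push(t);
      acceptFrom = t + duration;
    }
  }
  return out;
}

// debounceTime(d): an event survives only if nothing follows it within d ms.
function debounceQuiet(times: number[], quiet: number): number[] {
  const out: number[] = [];
  let pending: number | null = null;
  for (const t of times) {
    if (pending !== null && t - pending >= quiet) out.push(pending);
    pending = t;
  }
  if (pending !== null) out.push(pending);  // emitted once the stream goes quiet
  return out;
}

// sampleTime(d): at each tick (d, 2d, ...) emit the latest event since the previous tick, if any.
function samplePeriodic(times: number[], period: number): number[] {
  const out: number[] = [];
  let latest: number | null = null;
  let nextTick = period;
  for (const t of times) {
    while (t >= nextTick) {                 // fire every tick that elapsed before this event
      if (latest !== null) { out.push(latest); latest = null; }
      nextTick += period;
    }
    latest = t;
  }
  if (latest !== null) out.push(latest);    // picked up by the first tick after the last event
  return out;
}

const eventTimes = [0, 10, 20, 30, 60, 70, 125, 135, 310];
const throttled = throttleLeading(eventTimes, 50); // [0, 60, 125, 310]
const debounced = debounceQuiet(eventTimes, 50);   // [70, 135, 310]
const sampled = samplePeriodic(eventTimes, 50);    // [30, 70, 135, 310]
```

Throttling keeps the first event of each burst, debouncing keeps the last event before a pause, and sampling keeps whatever was most recent at each tick.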
Example: Handling fast producers with sampling
// Problem: Mouse moves emit 100+ events per second
const mouseMoves$ = fromEvent<MouseEvent>(document, 'mousemove');
// BAD - processes every event (performance issue)
mouseMoves$.subscribe(e => {
  updateUI(e.clientX, e.clientY); // Called 100+ times/sec
});
// GOOD - sample at reasonable rate
mouseMoves$.pipe(
  sampleTime(50) // Sample every 50ms (20 updates/sec)
).subscribe(e => {
  updateUI(e.clientX, e.clientY); // Called 20 times/sec
});
// ALTERNATIVE - throttle (emit first, ignore rest)
mouseMoves$.pipe(
  throttleTime(50, undefined, { leading: true, trailing: false })
).subscribe(e => {
  updateUI(e.clientX, e.clientY);
});
Example: Buffering for batch processing
// Collect analytics events and send in batches
const analyticsEvent$ = new Subject<AnalyticsEvent>();
// Buffer events and send every 10 seconds or 100 events
analyticsEvent$.pipe(
  bufferTime(10000, null, 100), // Time OR count
  filter(batch => batch.length > 0),
  mergeMap(batch => this.api.sendAnalytics(batch))
).subscribe({
  next: response => console.log('Batch sent:', response),
  error: error => console.error('Batch failed:', error)
});
// Track events
analyticsEvent$.next({ type: 'click', target: 'button1' });
analyticsEvent$.next({ type: 'view', page: '/home' });
// ... events accumulate and are sent in a batch
// Alternative: buffer with custom trigger
const flushTrigger$ = new Subject<void>();
analyticsEvent$.pipe(
  buffer(flushTrigger$), // Buffer until trigger emits
  filter(batch => batch.length > 0),
  mergeMap(batch => this.api.sendAnalytics(batch)) // Flatten so the request is actually subscribed and sent
).subscribe();
// Manually trigger flush
flushTrigger$.next(); // Sends accumulated events
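The "time OR count" rule used by bufferTime above can be sketched as a small class with an explicit flush. This is an illustrative model, not the operator's implementation: Batcher and its send callback are hypothetical, and the time half is represented by calling flush() from whatever timer the application uses.

```typescript
// Collects items and hands them to `send` when the batch is full or flush() is called.
class Batcher<T> {
  private items: T[] = [];
  private maxSize: number;
  private send: (batch: T[]) => void;

  constructor(maxSize: number, send: (batch: T[]) => void) {
    this.maxSize = maxSize;
    this.send = send;
  }

  add(item: T): void {
    this.items.push(item);
    if (this.items.length >= this.maxSize) {
      this.flush();                        // count limit reached
    }
  }

  // Call from a timer (for example every 10 s) to cover the "time" half of the rule.
  flush(): void {
    if (this.items.length === 0) return;   // never send empty batches
    const batch = this.items;
    this.items = [];
    this.send(batch);
  }
}

const sentBatches: string[][] = [];
const batcher = new Batcher<string>(3, batch => { sentBatches.push(batch); });

['click', 'view', 'scroll', 'click'].forEach(e => batcher.add(e)); // first three go out as one batch
batcher.flush();   // timer tick: sends the remaining ['click']
batcher.flush();   // nothing pending: no empty batch is sent
```

Capping the batch size is what keeps the buffer bounded when events arrive faster than the timer fires.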
Example: Backpressure with concurrency control
// Problem: Processing 1000 files simultaneously
const files$ = from(largeFileArray); // 1000+ files
// BAD - processes all concurrently (memory issue)
files$.pipe(
  mergeMap(file => this.processFile(file))
).subscribe(); // 1000+ concurrent operations!
// GOOD - limit concurrency
files$.pipe(
  mergeMap(file => this.processFile(file), 5) // Max 5 concurrent
).subscribe({
  next: result => console.log('Processed:', result),
  error: error => console.error('Error:', error)
});
// ALTERNATIVE - use concatMap for sequential (slow but safe)
files$.pipe(
  concatMap(file => this.processFile(file)) // One at a time
).subscribe();
// ADVANCED - choose the limit from system capacity when the pipeline is built
// Note: mergeMap reads its concurrency argument once, at pipeline creation.
// Reassigning a variable later does not change the limit of a running pipeline,
// so truly adaptive concurrency needs a custom queue rather than a mutable counter.
const concurrency = pickConcurrency(); // hypothetical helper, e.g. based on CPU count or free memory
files$.pipe(
  mergeMap(file => this.processFile(file), concurrency)
).subscribe();
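The effect of a concurrency limit can be modeled without RxJS. The sketch below is a rough callback-based analogue of mergeMap(project, limit): at most `limit` workers run at once, and each completion pulls the next queued item. runLimited and ItemWorker are hypothetical names, and a worker signals completion by calling done() instead of completing an inner observable.

```typescript
// A worker processes one item and calls done() when finished.
type ItemWorker<T> = (item: T, done: () => void) => void;

// Starts at most `limit` workers at once; each completion starts the next queued item.
function runLimited<T>(items: T[], limit: number, worker: ItemWorker<T>): void {
  const queue = items.slice();
  let inFlight = 0;

  const startNext = (): void => {
    while (inFlight < limit && queue.length > 0) {
      const item = queue.shift() as T;
      inFlight++;
      let finished = false;
      worker(item, () => {
        if (finished) return;              // ignore duplicate done() calls
        finished = true;
        inFlight--;
        startNext();
      });
    }
  };

  startNext();
}

// Simulation: workers park their completion callbacks so the test controls timing.
const pendingDone: Array<() => void> = [];
const processedFiles: string[] = [];
let runningNow = 0;
let peakRunning = 0;

runLimited(['a', 'b', 'c', 'd', 'e', 'f', 'g'], 3, (file, done) => {
  runningNow++;
  peakRunning = Math.max(peakRunning, runningNow);
  pendingDone.push(() => { runningNow--; processedFiles.push(file); done(); });
});

// Complete work in arrival order until nothing is pending.
while (pendingDone.length > 0) {
  const finish = pendingDone.shift() as () => void;
  finish();
}
// peakRunning is 3 and all seven files were processed
```

Items beyond the limit wait in the queue, so the queue itself can still grow without bound if the producer is much faster than the workers.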
Example: Preventing queue overflow
// Queue with overflow protection
class BoundedQueue<T> {
  private queue$ = new Subject<T>();
  private queueSize = 0;
  private maxSize = 100;
  enqueue(item: T): boolean {
    if (this.queueSize >= this.maxSize) {
      console.warn('Queue full, dropping item');
      return false; // Drop item
    }
    this.queueSize++;
    this.queue$.next(item);
    return true;
  }
  process(processor: (item: T) => Observable<any>, concurrency = 1) {
    return this.queue$.pipe(
      mergeMap(item =>
        processor(item).pipe(
          finalize(() => this.queueSize--)
        ),
        concurrency
      )
    );
  }
}
// Usage
const queue = new BoundedQueue<Task>();
queue.process(task => this.processTask(task), 3)
  .subscribe(result => console.log('Processed:', result));
// Add tasks
tasks.forEach(task => {
  if (!queue.enqueue(task)) {
    console.error('Task dropped:', task);
  }
});
Performance Best Practices:
- Always unsubscribe from long-lived observables
- Use takeUntil for automatic cleanup in components
- Choose correct flattening operator for your use case
- Share expensive computations with share/shareReplay
- Use refCount: true with shareReplay to prevent leaks
- Apply backpressure strategies for high-volume streams
- Limit concurrency in mergeMap for resource-intensive operations
- Monitor memory with Chrome DevTools heap snapshots
Common Performance Issues:
- Missing unsubscribe - number one cause of memory leaks
- shareReplay without refCount - the source subscription is never cleaned up
- Nested subscriptions - hard to manage, causes leaks
- Wrong flattening operator - unnecessary work or race conditions
- No concurrency limits - resource exhaustion
- Unbounded buffers - memory overflow
- Hot observables without cleanup - event listeners pile up
Section 15 Summary
- Subscription management via takeUntil, containers, or finalize prevents leaks
- Operator ordering and early termination significantly improve performance
- Memory leaks from subscriptions that are never unsubscribed are detectable via DevTools profiling
- Cold observables repeat work; hot observables share - choose wisely
- Sharing strategies - share() for real-time, shareReplay() for caching (with refCount!)
- Backpressure managed via sampling, throttling, buffering, or concurrency limits