Subjects and Multicasting Patterns

1. Subject for Multicast and Imperative Emission

Type Syntax Description Characteristics
Subject new Subject<T>() Both Observable and Observer - multicasts to multiple subscribers Hot observable, no initial value, no replay
next() subject.next(value) Imperative emission to all active subscribers Push values manually
error() subject.error(err) Emit error to all subscribers, terminates subject Cannot emit after error
complete() subject.complete() Notify completion to all subscribers, terminates subject Cannot emit after complete

Example: Subject basic usage and multicasting

import { Subject } from 'rxjs';

// Create Subject
const subject = new Subject<number>();

// Subscribe multiple observers
subject.subscribe(val => console.log('Observer A:', val));
subject.subscribe(val => console.log('Observer B:', val));

// Emit values imperatively
subject.next(1);
subject.next(2);
// Output:
// Observer A: 1
// Observer B: 1
// Observer A: 2
// Observer B: 2

// Late subscriber misses previous emissions
subject.next(3);
subject.subscribe(val => console.log('Observer C:', val));
subject.next(4);
// Output:
// Observer A: 3, Observer B: 3
// Observer A: 4, Observer B: 4, Observer C: 4

// Subject as Observer
const observable$ = interval(1000).pipe(take(3));
observable$.subscribe(subject); // Pipe observable into subject
subject.subscribe(val => console.log('Via subject:', val));

// Practical: Event bus
class EventBus {
  private subject = new Subject<any>();
  
  emit(event: any) {
    this.subject.next(event);
  }
  
  on(eventType: string) {
    return this.subject.pipe(
      filter(event => event.type === eventType)
    );
  }
}

const bus = new EventBus();
bus.on('user:login').subscribe(event => console.log('User logged in:', event.data));
bus.emit({ type: 'user:login', data: { userId: 123 } });

// Practical: Component communication
class DataService {
  private dataUpdated = new Subject<any>();
  
  dataUpdated$ = this.dataUpdated.asObservable(); // Expose as Observable
  
  updateData(data: any) {
    this.processData(data);
    this.dataUpdated.next(data); // Notify subscribers
  }
}

// Practical: Click stream
const clicks = new Subject<MouseEvent>();
clicks.subscribe(event => console.log('Clicked at:', event.clientX, event.clientY));

document.addEventListener('click', event => clicks.next(event));

// Complete subject to stop emissions
subject.complete();
subject.next(5); // This will not emit - subject is completed
Note: Subjects are hot observables - late subscribers don't receive previous emissions. Always call complete() to properly terminate and release resources.

2. BehaviorSubject for State Management with Initial Value

Feature Syntax Description Use Case
BehaviorSubject new BehaviorSubject<T>(initialValue) Requires initial value, replays current value to new subscribers State management, current values
getValue() subject.getValue() Synchronously returns current value (not available on regular Subject) Imperative state access
value property subject.value Read-only property for current value (same as getValue()) State inspection

Example: BehaviorSubject for state management

import { BehaviorSubject } from 'rxjs';

// Create with initial value
const count$ = new BehaviorSubject<number>(0);

// New subscriber immediately receives current value
count$.subscribe(val => console.log('Subscriber A:', val)); // 0

count$.next(1);
count$.next(2);
// Output: Subscriber A: 1, Subscriber A: 2

// Late subscriber gets latest value immediately
count$.subscribe(val => console.log('Subscriber B:', val)); // 2
count$.next(3);
// Output: Subscriber A: 3, Subscriber B: 3

// Synchronous value access
console.log('Current value:', count$.getValue()); // 3
console.log('Via property:', count$.value); // 3

// Practical: User authentication state
class AuthService {
  private isAuthenticatedSubject = new BehaviorSubject<boolean>(false);
  isAuthenticated$ = this.isAuthenticatedSubject.asObservable();
  
  login(credentials: any) {
    // Perform login...
    this.isAuthenticatedSubject.next(true);
  }
  
  logout() {
    this.isAuthenticatedSubject.next(false);
  }
  
  get isAuthenticated(): boolean {
    return this.isAuthenticatedSubject.value;
  }
}

const auth = new AuthService();
auth.isAuthenticated$.subscribe(status => 
  console.log('Auth status:', status)
); // Immediately: false

auth.login({ username: 'alice' }); // Emits: true

// Practical: Application state store
class Store<T> {
  private state$: BehaviorSubject<T>;
  
  constructor(initialState: T) {
    this.state$ = new BehaviorSubject<T>(initialState);
  }
  
  select<K extends keyof T>(key: K): Observable<T[K]> {
    return this.state$.pipe(
      map(state => state[key]),
      distinctUntilChanged()
    );
  }
  
  update(partialState: Partial<T>) {
    this.state$.next({ ...this.state$.value, ...partialState });
  }
  
  get snapshot(): T {
    return this.state$.value;
  }
}

const store = new Store({ user: null, loading: false, error: null });
store.select('loading').subscribe(loading => toggleSpinner(loading));
store.update({ loading: true });

// Practical: Form state
class FormState {
  private formValue$ = new BehaviorSubject<any>({
    name: '',
    email: '',
    age: 0
  });
  
  value$ = this.formValue$.asObservable();
  
  updateField(field: string, value: any) {
    const current = this.formValue$.value;
    this.formValue$.next({ ...current, [field]: value });
  }
  
  get currentValue() {
    return this.formValue$.value;
  }
  
  reset() {
    this.formValue$.next({ name: '', email: '', age: 0 });
  }
}

// Practical: Theme management
const theme$ = new BehaviorSubject<'light' | 'dark'>('light');
theme$.subscribe(theme => document.body.className = theme);
theme$.next('dark'); // Switch theme

// Practical: Derived state
const items$ = new BehaviorSubject<any[]>([]);
const itemCount$ = items$.pipe(map(items => items.length));
const hasItems$ = items$.pipe(map(items => items.length > 0));

itemCount$.subscribe(count => console.log('Count:', count)); // 0
items$.next([1, 2, 3]); // Count: 3

3. ReplaySubject for Historical Value Replay

Configuration Syntax Description Replay Behavior
Buffer size new ReplaySubject<T>(bufferSize) Replays last N emissions to new subscribers Stores N most recent values
Time window new ReplaySubject<T>(bufferSize, windowTime) Replays values emitted within time window (ms) Age-based filtering
Infinite replay new ReplaySubject<T>() Replays all values to new subscribers (memory warning!) Complete history

Example: ReplaySubject value replay

import { ReplaySubject } from 'rxjs';

// Replay last 3 values
const replay$ = new ReplaySubject<number>(3);

replay$.next(1);
replay$.next(2);
replay$.next(3);
replay$.next(4);

// New subscriber receives last 3 values
replay$.subscribe(val => console.log('Subscriber A:', val));
// Output: 1, 2, 3, 4 (wait, buffer is 3, so: 2, 3, 4)

replay$.next(5);
// Output: Subscriber A: 5

// Another late subscriber
replay$.subscribe(val => console.log('Subscriber B:', val));
// Output: Subscriber B: 3, 4, 5

// ReplaySubject with time window
const replayTime$ = new ReplaySubject<number>(100, 1000); // 100 items, 1 second

replayTime$.next(1);
setTimeout(() => replayTime$.next(2), 500);
setTimeout(() => replayTime$.next(3), 1500);

setTimeout(() => {
  replayTime$.subscribe(val => console.log('Timed:', val));
  // Only receives values within last 1 second
}, 2000);

// Practical: Chat message history
class ChatService {
  private messages$ = new ReplaySubject<Message>(50); // Last 50 messages
  
  messages = this.messages$.asObservable();
  
  sendMessage(message: Message) {
    this.messages$.next(message);
    this.saveToServer(message);
  }
  
  // New users joining see recent chat history
  joinChat() {
    return this.messages$; // Replays last 50 messages
  }
}

// Practical: Activity log
class ActivityLogger {
  private log$ = new ReplaySubject<LogEntry>(100, 5 * 60 * 1000); // Last 5 minutes
  
  logActivity(entry: LogEntry) {
    this.log$.next({ ...entry, timestamp: Date.now() });
  }
  
  getRecentActivity() {
    return this.log$.asObservable();
  }
}

// Practical: Undo/Redo functionality
class UndoRedoService {
  private actions$ = new ReplaySubject<Action>(20); // Last 20 actions
  
  recordAction(action: Action) {
    this.actions$.next(action);
  }
  
  getHistory() {
    const history: Action[] = [];
    this.actions$.subscribe(action => history.push(action));
    return history;
  }
}

// Practical: Real-time sensor data
class SensorMonitor {
  private readings$ = new ReplaySubject<Reading>(10, 60000); // Last 10, max 1 min old
  
  recordReading(reading: Reading) {
    this.readings$.next(reading);
  }
  
  getRecentReadings() {
    return this.readings$.pipe(
      scan((acc, reading) => [...acc, reading], [])
    );
  }
}

// Warning: Infinite replay (memory intensive)
const infiniteReplay$ = new ReplaySubject<number>(); // All values!
for (let i = 0; i < 10000; i++) {
  infiniteReplay$.next(i);
}
// New subscriber receives ALL 10,000 values - memory leak risk!

// Practical: Command history
class CommandHistory {
  private commands$ = new ReplaySubject<Command>(30);
  
  execute(command: Command) {
    command.execute();
    this.commands$.next(command);
  }
  
  getHistory(): Command[] {
    const history: Command[] = [];
    const sub = this.commands$.subscribe(cmd => history.push(cmd));
    sub.unsubscribe();
    return history;
  }
}
Warning: ReplaySubject without buffer size stores ALL values in memory. Always specify buffer size to prevent memory leaks.

4. AsyncSubject for Last Value Emission

Feature Syntax Description Emission Pattern
AsyncSubject new AsyncSubject<T>() Emits only the last value when completed Single emission on complete
Behavior Buffers last value until complete() Similar to Promise - emits once when "resolved" Late subscribers get same final value

Example: AsyncSubject final value emission

import { AsyncSubject } from 'rxjs';

// Create AsyncSubject
const async$ = new AsyncSubject<number>();

// Subscribe before emissions
async$.subscribe(val => console.log('Subscriber A:', val));

async$.next(1);
async$.next(2);
async$.next(3);
// No output yet - waiting for complete()

async$.complete(); // Triggers emission of last value
// Output: Subscriber A: 3

// Late subscriber still gets last value
async$.subscribe(val => console.log('Subscriber B:', val));
// Output: Subscriber B: 3

// Practical: One-time calculation result
class Calculator {
  calculate(operation: string): Observable<number> {
    const result$ = new AsyncSubject<number>();
    
    setTimeout(() => {
      const result = this.performCalculation(operation);
      result$.next(result);
      result$.complete(); // Emit final result
    }, 1000);
    
    return result$.asObservable();
  }
}

// Practical: Async initialization
class AppInitializer {
  private initialized$ = new AsyncSubject<boolean>();
  
  init() {
    Promise.all([
      this.loadConfig(),
      this.loadUserData(),
      this.connectServices()
    ]).then(() => {
      this.initialized$.next(true);
      this.initialized$.complete();
    });
    
    return this.initialized$.asObservable();
  }
}

const initializer = new AppInitializer();
initializer.init().subscribe(ready => {
  if (ready) console.log('App ready!');
});

// Practical: Final test result
class TestRunner {
  runTests(): Observable<TestResult> {
    const result$ = new AsyncSubject<TestResult>();
    
    this.executeTests().then(results => {
      const finalResult = this.aggregateResults(results);
      result$.next(finalResult);
      result$.complete();
    });
    
    return result$;
  }
}

// Practical: Migration completion
class DataMigration {
  migrate(): Observable<MigrationStatus> {
    const status$ = new AsyncSubject<MigrationStatus>();
    
    this.performMigration()
      .then(result => {
        status$.next({ 
          success: true, 
          recordsMigrated: result.count 
        });
        status$.complete();
      })
      .catch(err => {
        status$.error(err);
      });
    
    return status$;
  }
}

// Compare with Promise
const promise = new Promise(resolve => {
  setTimeout(() => resolve('Promise result'), 1000);
});

const asyncSubject = new AsyncSubject();
setTimeout(() => {
  asyncSubject.next('AsyncSubject result');
  asyncSubject.complete();
}, 1000);

// Both emit once when resolved/completed
promise.then(val => console.log(val));
asyncSubject.subscribe(val => console.log(val));

// Error handling
const errorAsync$ = new AsyncSubject<number>();
errorAsync$.subscribe({
  next: val => console.log('Value:', val),
  error: err => console.error('Error:', err)
});

errorAsync$.next(1);
errorAsync$.next(2);
errorAsync$.error(new Error('Failed'));
// Output: Error: Failed (no value emission)

5. share and shareReplay for Reference Counting

Operator Syntax Description Replay Behavior
share share(config?) Multicasts observable, shares single subscription among all subscribers No replay - late subscribers miss previous values
shareReplay shareReplay(bufferSize?, windowTime?) Like share but replays buffered values to new subscribers Replays last N values or time window
shareReplay (config) shareReplay({ bufferSize, refCount }) Configurable replay with reference counting control refCount: true completes when all unsubscribe

Example: Sharing and replay strategies

import { interval, of } from 'rxjs';
import { share, shareReplay, take, tap } from 'rxjs/operators';

// WITHOUT share - each subscriber creates new subscription
const cold$ = interval(1000).pipe(
  take(3),
  tap(val => console.log('Source emission:', val))
);

cold$.subscribe(val => console.log('Sub A:', val));
cold$.subscribe(val => console.log('Sub B:', val));
// Creates 2 separate interval instances!

// WITH share - single shared subscription
const hot$ = interval(1000).pipe(
  take(3),
  tap(val => console.log('Source emission:', val)),
  share()
);

hot$.subscribe(val => console.log('Sub A:', val));
hot$.subscribe(val => console.log('Sub B:', val));
// Single interval shared between subscribers

// shareReplay - late subscribers get history
const replay$ = interval(1000).pipe(
  take(5),
  tap(val => console.log('Source:', val)),
  shareReplay(2) // Buffer last 2 values
);

replay$.subscribe(val => console.log('Early:', val));

setTimeout(() => {
  replay$.subscribe(val => console.log('Late:', val));
  // Receives last 2 buffered values immediately
}, 3500);

// Practical: Expensive HTTP call sharing
const users$ = this.http.get('/api/users').pipe(
  shareReplay(1) // Cache result, share with all subscribers
);

// Multiple components can subscribe without duplicate requests
users$.subscribe(users => this.displayInComponent1(users));
users$.subscribe(users => this.displayInComponent2(users));
users$.subscribe(users => this.displayInComponent3(users));
// Only ONE HTTP request made!

// shareReplay with refCount
const data$ = this.http.get('/api/data').pipe(
  shareReplay({ bufferSize: 1, refCount: true })
);
// When last subscriber unsubscribes, cache is cleared

// Practical: WebSocket connection sharing
class WebSocketService {
  private connection$ = this.connectWebSocket().pipe(
    share() // Share single WebSocket connection
  );
  
  messages$ = this.connection$.pipe(
    filter(msg => msg.type === 'message')
  );
  
  notifications$ = this.connection$.pipe(
    filter(msg => msg.type === 'notification')
  );
}

// Practical: Current user sharing
class UserService {
  currentUser$ = this.http.get('/api/current-user').pipe(
    shareReplay({ bufferSize: 1, refCount: false })
    // Keeps cached even if all unsubscribe
  );
}

// share vs shareReplay comparison
const source$ = interval(1000).pipe(take(4));

// share - no replay
const shared$ = source$.pipe(share());
shared$.subscribe(val => console.log('A:', val));
setTimeout(() => {
  shared$.subscribe(val => console.log('B:', val)); // Misses early values
}, 2500);

// shareReplay - with replay
const sharedReplay$ = source$.pipe(shareReplay(2));
sharedReplay$.subscribe(val => console.log('A:', val));
setTimeout(() => {
  sharedReplay$.subscribe(val => console.log('B:', val)); // Gets last 2 values
}, 2500);

// Practical: Configuration sharing
const config$ = this.http.get('/api/config').pipe(
  shareReplay(1) // All components get same config
);

// Multiple feature modules
config$.subscribe(config => this.featureA.init(config));
config$.subscribe(config => this.featureB.init(config));
config$.subscribe(config => this.featureC.init(config));
Note: Use share() for hot observables without replay. Use shareReplay(1) for caching expensive operations like HTTP requests.

6. multicast and publish Operators for Custom Multicasting

Operator Syntax Description Connection
multicast multicast(subjectFactory) Multicasts using provided Subject/BehaviorSubject/ReplaySubject factory Returns ConnectableObservable - manual connect()
publish publish() Shorthand for multicast(() => new Subject()) Requires connect() to start
publishReplay publishReplay(bufferSize?) Shorthand for multicast(() => new ReplaySubject(bufferSize)) Replays to late subscribers
refCount publish().refCount() Auto-connects on first subscription, disconnects when count reaches 0 Automatic connection management

Example: Custom multicasting patterns

import { interval, Subject, ReplaySubject } from 'rxjs';
import { multicast, publish, publishReplay, refCount, take, tap } from 'rxjs/operators';

// multicast with Subject
const source$ = interval(1000).pipe(
  take(4),
  tap(val => console.log('Source:', val)),
  multicast(new Subject())
);

source$.subscribe(val => console.log('A:', val));
source$.subscribe(val => console.log('B:', val));

// Must call connect() to start
source$.connect();

// publish - simpler syntax
const published$ = interval(1000).pipe(
  take(3),
  publish()
);

const sub1 = published$.subscribe(val => console.log('Sub1:', val));
const sub2 = published$.subscribe(val => console.log('Sub2:', val));

published$.connect(); // Start emissions

// publishReplay - with buffering
const publishedReplay$ = interval(1000).pipe(
  take(5),
  publishReplay(2) // Buffer last 2
);

publishedReplay$.subscribe(val => console.log('Early:', val));
publishedReplay$.connect();

setTimeout(() => {
  publishedReplay$.subscribe(val => console.log('Late:', val));
  // Gets buffered values
}, 3500);

// refCount - automatic connection management
const autoCounted$ = interval(1000).pipe(
  take(5),
  tap(val => console.log('Source:', val)),
  publish(),
  refCount() // Auto-connect on first sub, auto-disconnect on zero subs
);

const subscription1 = autoCounted$.subscribe(val => console.log('A:', val));
// Source starts here (first subscriber)

setTimeout(() => {
  const subscription2 = autoCounted$.subscribe(val => console.log('B:', val));
  // Joins existing stream
}, 2000);

setTimeout(() => {
  subscription1.unsubscribe();
  // Source still running (subscription2 active)
}, 3000);

setTimeout(() => {
  subscription2.unsubscribe();
  // Source stops (zero subscribers)
}, 4000);

// Practical: Shared expensive operation
const expensiveCalc$ = of(null).pipe(
  tap(() => console.log('Expensive calculation...')),
  map(() => performExpensiveCalculation()),
  publishReplay(1),
  refCount()
);

expensiveCalc$.subscribe(result => console.log('User 1:', result));
expensiveCalc$.subscribe(result => console.log('User 2:', result));
// Only one calculation performed

// Practical: Controlled broadcast
class NotificationService {
  private notifications$ = new Subject<Notification>();
  
  private broadcast$ = this.notifications$.pipe(
    multicast(new ReplaySubject<Notification>(5))
  );
  
  constructor() {
    this.broadcast$.connect(); // Start broadcasting
  }
  
  send(notification: Notification) {
    this.notifications$.next(notification);
  }
  
  subscribe() {
    return this.broadcast$; // Subscribers get last 5 notifications
  }
}

// Modern alternative: share() is preferred
// Instead of: publish().refCount()
// Use: share()
const modern$ = interval(1000).pipe(
  take(5),
  share() // Equivalent to publish().refCount()
);

// Instead of: publishReplay(1).refCount()
// Use: shareReplay(1)
const modernReplay$ = interval(1000).pipe(
  take(5),
  shareReplay(1) // Equivalent to publishReplay(1).refCount()
);

Section 7 Summary

  • Subject is both Observable and Observer - multicast without replay
  • BehaviorSubject requires initial value, replays current value to new subscribers
  • ReplaySubject buffers N values, replays to late subscribers (specify buffer to avoid leaks)
  • AsyncSubject emits only last value on completion (Promise-like behavior)
  • share() for hot multicasting, shareReplay(1) for caching expensive operations
  • Modern pattern: prefer share/shareReplay over publish/multicast operators