Framework Integration Patterns

1. Angular RxJS Integration and async Pipe

| Feature | Syntax | Description | Benefit |
| --- | --- | --- | --- |
| async Pipe | `{{ observable$ \| async }}` | Subscribe/unsubscribe automatically in template | No manual subscription management |
| HttpClient | `http.get<T>(url)` | Returns observables for HTTP requests | Built-in RxJS integration |
| Reactive Forms | `formControl.valueChanges` | Observable streams for form values | Reactive form validation |
| Router Events | `router.events` | Observable stream of navigation events | React to route changes |
| Event Emitters | `@Output() event = new EventEmitter()` | Component communication with observables | Type-safe event handling |
| Service State | BehaviorSubject in services | Centralized state management | Share state across components |

Example: async pipe for automatic subscription management

// Component
/**
 * Renders the user list, with a "Loading..." placeholder until `users$`
 * produces a value and an error banner fed by `error$`.
 *
 * Both streams are consumed through the `async` pipe, so the component has
 * no manual subscriptions and needs no `ngOnDestroy`.
 */
@Component({
  selector: 'app-user-list',
  template: `
    <div *ngIf="users$ | async as users; else loading">
      <div *ngFor="let user of users">
        {{ user.name }}
      </div>
    </div>
    
    <ng-template #loading>
      <div>Loading...</div>
    </ng-template>
    
    <!-- Error handling -->
    <div *ngIf="error$ | async as error" class="error">
      {{ error }}
    </div>
  `
})
export class UserListComponent {
  users$: Observable<User[]>;
  error$ = new Subject<string>();

  constructor(private userService: UserService) {
    // On failure: publish the message for the banner and fall back to an
    // empty list so the list part of the template still renders.
    const reportAndRecover = (failure: any) => {
      this.error$.next(failure.message);
      return of([]);
    };

    this.users$ = this.userService.getUsers().pipe(catchError(reportAndRecover));
  }
}

Example: Reactive forms with RxJS

/**
 * Type-ahead search box: turns the input's `valueChanges` stream into a
 * debounced, de-duplicated stream of search results rendered via `async`.
 */
@Component({
  selector: 'app-search',
  template: `
    <input [formControl]="searchControl" placeholder="Search...">
    <div *ngFor="let result of results$ | async">
      {{ result.title }}
    </div>
  `
})
export class SearchComponent implements OnInit {
  searchControl = new FormControl('');
  // Assigned in ngOnInit, hence the definite-assignment marker.
  results$!: Observable<SearchResult[]>;

  constructor(private searchService: SearchService) {}

  ngOnInit() {
    this.results$ = this.searchControl.valueChanges.pipe(
      debounceTime(300),           // Wait for user to stop typing
      distinctUntilChanged(),      // Only if value changed
      // A FormControl can emit null (e.g. after reset()), so guard before
      // reading .length; also narrows the term to string for search().
      filter((term): term is string =>
        typeof term === 'string' && term.length >= 3  // Minimum 3 characters
      ),
      switchMap(term =>
        this.searchService.search(term).pipe(
          catchError(() => of([]))  // A failed request yields an empty result list
        )
      ),
      shareReplay({ bufferSize: 1, refCount: true })  // Replay latest results to late subscribers
    );
  }
}

Example: Angular service with state management

// User service with BehaviorSubject
/**
 * Root-scoped store for the signed-in user.
 *
 * State is held in private BehaviorSubjects; consumers only get the
 * read-only `currentUser$` and `loading$` streams.
 */
@Injectable({ providedIn: 'root' })
export class UserService {
  private currentUserSubject = new BehaviorSubject<User | null>(null);
  public currentUser$ = this.currentUserSubject.asObservable();

  private loadingSubject = new BehaviorSubject<boolean>(false);
  public loading$ = this.loadingSubject.asObservable();

  constructor(private http: HttpClient) {}

  /**
   * Fetches the current user and publishes it on `currentUser$`.
   *
   * The loading flag is raised when the returned observable is subscribed
   * (not when this method is called) and lowered in the teardown, which runs
   * on success, on error and on early unsubscribe alike. An observable that
   * is never subscribed, or a request cancelled mid-flight, therefore cannot
   * leave `loading$` stuck at `true`.
   *
   * @returns a cold observable emitting the loaded user; HTTP errors are
   *          passed through to the subscriber unchanged.
   */
  loadCurrentUser() {
    return new Observable<User>(subscriber => {
      this.loadingSubject.next(true);

      const request = this.http
        .get<User>('/api/user/current')
        .pipe(tap(user => this.currentUserSubject.next(user)))
        .subscribe(subscriber);

      return () => {
        request.unsubscribe();
        this.loadingSubject.next(false);
      };
    });
  }

  /**
   * Sends the updated user to the backend and publishes the server's
   * response on `currentUser$`.
   */
  updateUser(user: User) {
    return this.http.put<User>(`/api/users/${user.id}`, user).pipe(
      tap(updatedUser => this.currentUserSubject.next(updatedUser))
    );
  }

  /** Clears the current user by publishing `null`. */
  logout() {
    this.currentUserSubject.next(null);
  }
}

// Component using the service
/**
 * Header that greets the current user and shows a loading indicator, both
 * driven by UserService streams through the `async` pipe.
 */
@Component({
  template: `
    <div *ngIf="user$ | async as user">
      Welcome, {{ user.name }}!
    </div>
    <div *ngIf="loading$ | async">Loading...</div>
  `
})
export class HeaderComponent {
  user$: UserService['currentUser$'];
  loading$: UserService['loading$'];

  constructor(private userService: UserService) {
    // Assigned here rather than in field initializers: with define-semantics
    // class fields (useDefineForClassFields) initializers run before the
    // parameter property is set, so `this.userService` would be undefined.
    this.user$ = this.userService.currentUser$;
    this.loading$ = this.userService.loading$;
  }
}

2. React Hooks Integration with RxJS

| Hook Pattern | Purpose | Implementation | Use Case |
| --- | --- | --- | --- |
| useObservable | Subscribe to observable | useState + useEffect with observable | Render observable values |
| useSubject | Create managed subject | useRef for subject + cleanup | Event streams in components |
| useEventObservable | Convert events to observable | fromEvent with cleanup | DOM event handling |
| useObservableState | Observable-driven state | BehaviorSubject + useState | Complex state management |
| useRxEffect | Side effects with observables | useEffect with observable pipeline | API calls, subscriptions |

Example: Custom useObservable hook

// useObservable.ts
/**
 * Subscribes to `observable$` and returns its most recently emitted value,
 * or `initialValue` until the first emission.
 *
 * The subscription is torn down on unmount and re-created whenever the
 * observable's identity changes. Stream errors are logged to the console.
 */
function useObservable<T>(
  observable$: Observable<T>,
  initialValue: T
): T {
  const [latest, setLatest] = useState<T>(initialValue);

  useEffect(() => {
    const logError = (err: unknown) => console.error('Observable error:', err);
    const sub = observable$.subscribe({ next: setLatest, error: logError });

    return () => {
      sub.unsubscribe();
    };
  }, [observable$]);

  return latest;
}

// Usage in component
/** Shows a user's name and email, or a loading placeholder until loaded. */
function UserProfile({ userId }: { userId: string }) {
  // Memoized on userId so useObservable only re-subscribes when the id changes.
  const user$ = useMemo(() => userService.getUser(userId), [userId]);
  const user = useObservable(user$, null);

  return user ? (
    <div>
      <h1>{user.name}</h1>
      <p>{user.email}</p>
    </div>
  ) : (
    <div>Loading...</div>
  );
}

Example: useSubject for event handling

// useSubject.ts
/**
 * Creates a Subject owned by the component and returns a read-only view of
 * it together with an `emit` function.
 *
 * Both returned values keep the same identity across renders. This matters
 * because `asObservable()` returns a new wrapper on every call: returning it
 * unmemoized would invalidate every `useMemo`/`useEffect` that lists the
 * observable as a dependency, forcing a re-subscribe on each render and
 * resetting operators such as `debounceTime` and `distinctUntilChanged`.
 *
 * The subject is deliberately not completed on unmount. Effect cleanup also
 * runs during React StrictMode's simulated unmount/remount, and a completed
 * Subject cannot be reused, which would leave `emit` a silent no-op
 * afterwards. Consumers release their own subscriptions instead.
 *
 * @returns `[observable$, emit]`
 */
function useSubject<T>(): [Observable<T>, (value: T) => void] {
  // Lazy initializer: the Subject is created once, on the first render.
  const [subject] = useState(() => new Subject<T>());

  const observable$ = useMemo(() => subject.asObservable(), [subject]);

  const emit = useCallback((value: T) => {
    subject.next(value);
  }, [subject]);

  return [observable$, emit];
}

// Usage: Search with debounce
/** Search box whose keystrokes are debounced before hitting the search API. */
function SearchComponent() {
  const [search$, emitSearch] = useSubject<string>();

  // Keystrokes -> debounced, de-duplicated terms -> latest search response.
  const results$ = useMemo(
    () =>
      search$.pipe(
        debounceTime(300),
        distinctUntilChanged(),
        switchMap(term => searchAPI(term))
      ),
    [search$]
  );
  const results = useObservable(results$, []);

  const rows = results.map(r => <div key={r.id}>{r.title}</div>);

  return (
    <>
      <input
        onChange={e => emitSearch(e.target.value)}
        placeholder="Search..."
      />
      {rows}
    </>
  );
}

Example: Advanced observable state management

// useObservableState.ts
/**
 * Component state backed by a BehaviorSubject, exposed both as a plain value
 * (for rendering) and as an observable (for deriving streams).
 *
 * The returned observable keeps one identity for the component's lifetime.
 * `asObservable()` creates a new wrapper on each call; handing out a fresh
 * one every render would make any `useMemo(..., [state$])` consumer rebuild
 * its pipeline and re-subscribe on every render. Because a BehaviorSubject
 * replays its current value on subscribe, that can loop forever:
 * re-subscribe, emit, set state, render, re-subscribe.
 *
 * The subject is not completed in the effect cleanup: cleanup also runs
 * during React StrictMode's simulated remount, and a completed subject would
 * silently ignore all later updates.
 *
 * @param initialState first value of the state
 * @returns `[state, updateState, state$]`; `updateState` accepts a value or
 *          an updater function receiving the previous value
 */
function useObservableState<T>(
  initialState: T
): [T, (value: T | ((prev: T) => T)) => void, Observable<T>] {
  const [state$] = useState(() => new BehaviorSubject<T>(initialState));
  const [state, setState] = useState<T>(initialState);
  const stateStream$ = useMemo(() => state$.asObservable(), [state$]);

  useEffect(() => {
    // Wrap in an updater so a function-valued T is stored, not invoked.
    const sub = state$.subscribe(value => setState(() => value));
    return () => sub.unsubscribe();
  }, [state$]);

  const updateState = useCallback((value: T | ((prev: T) => T)) => {
    const newValue = typeof value === 'function'
      ? (value as (prev: T) => T)(state$.value)
      : value;
    state$.next(newValue);
  }, [state$]);

  return [state, updateState, stateStream$];
}

// Usage: Complex state with derived values
/** Todo list that derives its summary counts from the todos stream. */
function TodoList() {
  const [todos, setTodos, todos$] = useObservableState<Todo[]>([]);

  // Summary counts recomputed from every emission of the todos stream.
  const stats$ = useMemo(
    () =>
      todos$.pipe(
        map(list => {
          const completed = list.filter(t => t.done).length;
          return {
            total: list.length,
            completed,
            pending: list.length - completed
          };
        })
      ),
    [todos$]
  );
  const stats = useObservable(stats$, { total: 0, completed: 0, pending: 0 });

  const addTodo = (text: string) => {
    const created = { id: Date.now(), text, done: false };
    setTodos(prev => [...prev, created]);
  };

  const items = todos.map(todo => <TodoItem key={todo.id} todo={todo} />);

  return (
    <div>
      <div>Total: {stats.total}, Completed: {stats.completed}</div>
      {items}
    </div>
  );
}

3. Vue.js Composition API and RxJS

| Composable | Purpose | Implementation | Use Case |
| --- | --- | --- | --- |
| useObservable | Observable to ref | ref + watchEffect with subscription | Reactive observable values |
| useSubscription | Manage subscription lifecycle | onUnmounted cleanup | Side effect subscriptions |
| fromRef | Ref to observable | Observable from watch | Convert reactive refs |
| useRxState | BehaviorSubject state | Subject + ref sync | Observable state management |

Example: Vue composables for RxJS

// useObservable.ts
import { ref, onUnmounted, watch, Ref } from 'vue';
import { Observable } from 'rxjs';

/**
 * Mirrors an observable into a Vue ref.
 *
 * Subscribes immediately, writes every emission into the returned ref, and
 * unsubscribes when the owning component unmounts. Stream errors are logged
 * to the console.
 *
 * @param observable$ stream to mirror
 * @param initialValue value of the ref before the first emission
 */
export function useObservable<T>(
  observable$: Observable<T>,
  initialValue?: T
): Ref<T | undefined> {
  const latest = ref<T | undefined>(initialValue);

  const sub = observable$.subscribe({
    next(emitted) {
      latest.value = emitted;
    },
    error(err) {
      console.error('Observable error:', err);
    }
  });

  onUnmounted(() => sub.unsubscribe());

  return latest;
}

// useSubscription.ts
/**
 * Collects subscriptions and unsubscribes all of them when the owning
 * component unmounts.
 *
 * @returns `{ add }`; call `add(subscription)` to register one for cleanup
 */
export function useSubscription() {
  const tracked: Subscription[] = [];

  onUnmounted(() => {
    for (const sub of tracked) {
      sub.unsubscribe();
    }
  });

  const add = (subscription: Subscription) => {
    tracked.push(subscription);
  };

  return { add };
}

// Usage in Vue component
export default {
  setup() {
    const userService = inject('userService');

    // Convert observable to ref
    const users = useObservable(
      userService.getUsers(),
      []
    );

    const searchTerm = ref('');

    // Create observable from ref. A Vue ref (or computed) is not an
    // Observable and has no .pipe(), so bridge it with watch(): each change
    // of the ref becomes an emission. watch() returns its stop function,
    // which serves as the observable's teardown.
    const searchTerm$ = new Observable<string>(subscriber =>
      watch(searchTerm, term => subscriber.next(term))
    );

    const searchResults = useObservable(
      searchTerm$.pipe(
        debounceTime(300),
        switchMap(term => searchAPI(term))
      ),
      []
    );

    return {
      users,
      searchTerm,
      searchResults
    };
  }
};

Example: Vue RxJS state management

// composables/useRxState.ts
/**
 * Component state held in a BehaviorSubject and mirrored into a Vue ref.
 *
 * On unmount the internal subscription is released and the subject is
 * completed.
 *
 * @param initialValue first value of the state
 * @returns `state` (read-only ref), `setState` (accepts a value or an
 *          updater receiving the previous value) and `state$` (observable
 *          view of the subject)
 */
export function useRxState<T>(initialValue: T) {
  const subject = new BehaviorSubject<T>(initialValue);
  const state = ref<T>(initialValue);

  const subscription = subject.subscribe(value => {
    state.value = value;
  });

  onUnmounted(() => {
    subscription.unsubscribe();
    subject.complete();
  });

  const setState = (value: T | ((prev: T) => T)) => {
    // The cast is required: for an unconstrained T, `typeof value ===
    // 'function'` does not narrow `value` to a callable type.
    const newValue = typeof value === 'function'
      ? (value as (prev: T) => T)(subject.value)
      : value;
    subject.next(newValue);
  };

  return {
    state: readonly(state),
    setState,
    state$: subject.asObservable()
  };
}

// Usage
export default {
  setup() {
    const todoState = useRxState<Todo[]>([]);
    const todos = todoState.state;
    const setTodos = todoState.setState;

    // Derived state: number of completed todos, recomputed on every change.
    const completedCount$ = todoState.state$.pipe(
      map(list => list.filter(t => t.completed).length)
    );
    const completedCount = useObservable(completedCount$, 0);

    // Appends a new, not-yet-completed todo.
    const addTodo = (text: string) => {
      setTodos(prev => {
        const created = { id: Date.now(), text, completed: false };
        return [...prev, created];
      });
    };

    return { todos, completedCount, addTodo };
  }
};

4. Node.js Streams Integration

| Integration | Method | Description | Use Case |
| --- | --- | --- | --- |
| Readable to Observable | `fromEvent(stream, 'data')` | Convert Node.js readable stream | File reading, HTTP response |
| Observable to Writable | Subscribe and write to stream | Pipe observable to writable stream | File writing, HTTP request |
| Transform Stream | Observable operators as Transform | Use RxJS operators on streams | Data processing pipelines |
| EventEmitter | `fromEvent(emitter, 'event')` | Convert EventEmitter to observable | Event-driven architectures |

Example: Node.js stream to observable

import { fromEvent, merge } from 'rxjs';
import { map, takeUntil } from 'rxjs/operators';
import * as fs from 'fs';
import * as readline from 'readline';

// Read file line by line with RxJS
/**
 * Streams a file line by line as an observable.
 *
 * Each subscription opens its own read stream. Unsubscribing early (for
 * example via `take`) closes the readline interface and destroys the stream.
 *
 * @param filePath path of the file to read
 * @returns an observable that emits each line, completes when the readline
 *          interface closes, and errors if reading fails
 */
function readFileLines(filePath: string): Observable<string> {
  return new Observable(subscriber => {
    const readStream = fs.createReadStream(filePath);
    const rl = readline.createInterface({
      input: readStream,
      crlfDelay: Infinity
    });

    // File-level failures (missing file, permission denied, ...) are emitted
    // on the underlying stream and may not be forwarded by the readline
    // interface. Without this listener they would surface as an unhandled
    // 'error' event instead of reaching the subscriber.
    readStream.on('error', err => subscriber.error(err));

    rl.on('line', line => subscriber.next(line));
    rl.on('close', () => subscriber.complete());
    rl.on('error', err => subscriber.error(err));

    return () => {
      rl.close();
      readStream.destroy();
    };
  });
}

// Usage: Process large files
// Upper-case every line, keep those containing 'ERROR', stop after 100 matches.
const isErrorLine = (line: string) => line.includes('ERROR');

readFileLines('./large-file.txt')
  .pipe(
    map(line => line.toUpperCase()),
    filter(isErrorLine),
    take(100)
  )
  .subscribe({
    next(line) {
      console.log(line);
    },
    complete() {
      console.log('Done processing');
    }
  });

// Convert stream events to observable
const readStream = fs.createReadStream('./data.txt');

const data$ = fromEvent(readStream, 'data');
const end$ = fromEvent(readStream, 'end');
// Re-throw the stream's 'error' event as an observable error. Using it only
// as a takeUntil notifier would end the pipeline with a normal completion,
// so the subscriber's error handler would never run.
const error$ = fromEvent(readStream, 'error').pipe(
  map(err => { throw err; })
);

merge(data$, error$).pipe(
  map((chunk: Buffer) => chunk.toString()),
  takeUntil(end$)  // Complete once the stream has been fully read
).subscribe({
  next: chunk => processChunk(chunk),
  error: err => console.error('Stream error:', err),
  complete: () => console.log('Stream ended')
});

Example: Observable to Node.js stream

import { interval } from 'rxjs';
import { map } from 'rxjs/operators';
import * as fs from 'fs';

// Write observable data to file
/**
 * Writes every value of `observable$` to `writeStream` as one JSON document
 * per line, then ends the stream.
 *
 * Backpressure: an observable is push-based and cannot be paused, so a
 * `false` return from `write()` is not acted upon. The chunk has still been
 * accepted into the stream's internal buffer, so no data is lost; a very
 * fast source only costs memory. Unsubscribing at that point, as an earlier
 * version did, drops all remaining values and leaves the promise pending.
 *
 * @param observable$ source of values; each must be JSON-serializable
 * @param writeStream destination; it is ended when the source completes or
 *                    errors
 * @returns a promise that resolves once the stream has emitted 'finish'
 *          (all data flushed) and rejects with the first source or stream
 *          error
 */
function observableToStream<T>(
  observable$: Observable<T>,
  writeStream: fs.WriteStream
): Promise<void> {
  return new Promise<void>((resolve, reject) => {
    // Declared up front so handlers never read it before initialization,
    // even when the source emits synchronously inside subscribe().
    let subscription: { unsubscribe(): void } | undefined;
    let failed = false;

    const fail = (err: unknown) => {
      if (failed) return;
      failed = true;
      subscription?.unsubscribe();
      reject(err);
    };

    writeStream.on('error', fail);
    // 'finish' fires after end() once buffered data has been flushed.
    writeStream.once('finish', () => resolve());

    subscription = observable$.subscribe({
      next: value => {
        if (failed) return;
        writeStream.write(JSON.stringify(value) + '\n');
      },
      error: err => {
        writeStream.end();
        fail(err);
      },
      complete: () => {
        writeStream.end();
      }
    });

    // The stream may have failed while a synchronous source was emitting.
    if (failed) subscription.unsubscribe();
  });
}

// Usage: Stream data to file
const writeStream = fs.createWriteStream('./output.jsonl');

// One record per second, 100 records in total.
const toRecord = (n: number) => ({ id: n, timestamp: Date.now() });
const data$ = interval(1000).pipe(take(100), map(toRecord));

const reportSuccess = () => console.log('Write complete');
const reportFailure = (err: unknown) => console.error('Write error:', err);

observableToStream(data$, writeStream)
  .then(reportSuccess)
  .catch(reportFailure);

5. WebSocket Integration Patterns

| Pattern | Implementation | Description | Feature |
| --- | --- | --- | --- |
| webSocket() Function | `webSocket(url)` | RxJS built-in WebSocket subject | Bidirectional communication |
| Reconnection | `retryWhen()` with delay | Auto-reconnect on disconnect | Connection resilience |
| Message Queue | Buffer messages during disconnect | Queue messages until reconnected | Guaranteed delivery |
| Heartbeat | Periodic ping/pong | Keep connection alive | Connection monitoring |
| Multiplexing | Share single WebSocket | Multiple logical channels | Resource efficiency |

Example: Basic WebSocket observable

import { webSocket, WebSocketSubject } from 'rxjs/webSocket';

// Create WebSocket observable
// Lifecycle observers: log when the socket opens and closes.
const openObserver = {
  next: () => console.log('WebSocket connected')
};
const closeObserver = {
  next: () => console.log('WebSocket disconnected')
};

// Create WebSocket observable
const ws$: WebSocketSubject<any> = webSocket({
  url: 'ws://localhost:8080',
  openObserver,
  closeObserver
});

// Subscribe to messages
ws$.subscribe({
  next(msg) {
    console.log('Message:', msg);
  },
  error(err) {
    console.error('WebSocket error:', err);
  },
  complete() {
    console.log('WebSocket completed');
  }
});

// Send messages, in order
const outgoing = [
  { type: 'subscribe', channel: 'updates' },
  { type: 'message', data: 'Hello Server!' }
];
for (const message of outgoing) {
  ws$.next(message);
}

// Closing the connection:
// ws$.complete();

Example: WebSocket with auto-reconnect

import { webSocket } from 'rxjs/webSocket';
import { retryWhen, delay, tap } from 'rxjs/operators';

/**
 * Opens a WebSocket stream to `url` that re-subscribes 5 seconds after every
 * error, logging connection, disconnection and reconnect attempts.
 */
function createWebSocketWithReconnect(url: string) {
  const socket$ = webSocket({
    url,
    openObserver: {
      next: () => console.log('Connected to WebSocket')
    },
    closeObserver: {
      next: () => console.log('Disconnected from WebSocket')
    }
  });

  return socket$.pipe(
    retryWhen(errors =>
      errors.pipe(
        tap(() => console.log('Connection lost, reconnecting...')),
        delay(5000)  // Wait 5s before reconnect
      )
    )
  );
}

// Usage with automatic reconnection
const ws$ = createWebSocketWithReconnect('ws://localhost:8080');

ws$.subscribe({
  next(msg) {
    handleMessage(msg);
  },
  error(err) {
    console.error('Fatal error:', err);
  }
});

// Advanced: Exponential backoff
/**
 * Opens a WebSocket stream to `url` that retries with exponential backoff.
 *
 * The wait doubles with each attempt (2s, 4s, 8s, ...) and is capped at 30s.
 * Once ten retries have been made, the next error is rethrown to the
 * subscriber.
 */
function createResilientWebSocket(url: string) {
  // Counts attempts; rethrows the error once ten retries have been made.
  const countAttempt = (retryCount: number, error: unknown): number => {
    if (retryCount >= 10) throw error;
    console.log(`Retry ${retryCount + 1}/10`);
    return retryCount + 1;
  };

  const backoff = (retryCount: number) =>
    timer(Math.min(1000 * 2 ** retryCount, 30000));

  return webSocket(url).pipe(
    retryWhen(errors =>
      errors.pipe(
        scan(countAttempt, 0),
        delayWhen(backoff)
      )
    )
  );
}

Example: WebSocket with message queue and multiplexing

// WebSocket service with message queue
/**
 * WebSocket wrapper that reconnects automatically, queues outgoing messages
 * while disconnected, and lets consumers listen to a subset of incoming
 * messages by `type` or by `channel`.
 */
class WebSocketService {
  private ws$: WebSocketSubject<any>;
  private connectionState$ = new BehaviorSubject<'connected' | 'disconnected'>('disconnected');
  private messageQueue: any[] = [];
  // Relay for incoming messages. Consumers subscribe to this rather than to
  // ws$ directly: a subscription on the raw socket subject receives the
  // connection error and terminates, while this relay is fed by the single
  // retrying subscription below and therefore survives reconnects.
  private incoming$ = new Subject<any>();

  constructor(url: string) {
    this.ws$ = webSocket({
      url,
      openObserver: {
        next: () => {
          this.connectionState$.next('connected');
          this.flushQueue();
        }
      },
      closeObserver: {
        next: () => this.connectionState$.next('disconnected')
      }
    });

    // Auto-reconnect: re-subscribe 5s after every error and forward each
    // message into the relay.
    this.ws$.pipe(
      retryWhen(errors => errors.pipe(delay(5000)))
    ).subscribe(msg => this.incoming$.next(msg));
  }

  /** Sends `message` now if connected; otherwise queues it until the socket opens. */
  send(message: any) {
    if (this.connectionState$.value === 'connected') {
      this.ws$.next(message);
    } else {
      this.messageQueue.push(message);
    }
  }

  /** Sends all queued messages in the order they were queued. */
  private flushQueue() {
    while (this.messageQueue.length > 0) {
      const msg = this.messageQueue.shift();
      this.ws$.next(msg);
    }
  }

  /** Emits the `data` of every incoming message whose `type` equals `messageType`. */
  on<T>(messageType: string): Observable<T> {
    return this.select<T>('type', messageType);
  }

  /** Multiplexing: emits the `data` of every incoming message whose `channel` equals `channelName`. */
  channel<T>(channelName: string): Observable<T> {
    return this.select<T>('channel', channelName);
  }

  /** Shared implementation of `on` and `channel`: match one field, unwrap `data`. */
  private select<T>(key: 'type' | 'channel', expected: string): Observable<T> {
    return this.incoming$.pipe(
      filter((msg: any) => msg[key] === expected),
      map((msg: any) => msg.data as T)
    );
  }
}

// Usage
const wsService = new WebSocketService('ws://localhost:8080');

// Subscribe to user updates
const userUpdates$ = wsService.on<User>('user-update');
userUpdates$.subscribe(user => {
  console.log('User updated:', user);
});

// Subscribe to notifications
const notifications$ = wsService.on<Notification>('notification');
notifications$.subscribe(notification => {
  showNotification(notification);
});

// Send messages, in order
const outgoingMessages = [
  { type: 'subscribe', channel: 'users' },
  { type: 'message', data: 'Hello' }
];
for (const message of outgoingMessages) {
  wsService.send(message);
}

6. Service Worker and RxJS Integration

| Pattern | Implementation | Description | Use Case |
| --- | --- | --- | --- |
| Message Channel | fromEvent on message port | Observable messaging with SW | Background sync, push notifications |
| Cache Strategy | Observable cache operators | Implement cache-first, network-first | Offline-first apps |
| Background Sync | Queue with retry logic | Sync data when online | Offline data persistence |
| Push Notifications | fromEvent on push events | Handle push notifications | Real-time updates |

Example: Service Worker messaging with RxJS

// In main app
/**
 * Observable facade over service-worker messaging for the main app.
 *
 * Incoming messages are exposed on `messages$`; `on(type)` narrows them to
 * one message type. In browsers without service-worker support the service
 * does nothing: no listener is attached and `postMessage` is a no-op.
 */
class ServiceWorkerService {
  private messageSubject = new Subject<any>();
  public messages$ = this.messageSubject.asObservable();

  constructor() {
    if ('serviceWorker' in navigator) {
      // Listen for messages from SW. Attached immediately instead of after
      // `ready` resolves, so messages posted in the meantime are not missed.
      fromEvent<MessageEvent>(navigator.serviceWorker, 'message')
        .pipe(
          map(event => event.data)
        )
        .subscribe(message => this.messageSubject.next(message));
    }
  }

  /**
   * Posts `message` to the active service worker once the registration is
   * ready. Nothing is sent if service workers are unsupported or the
   * registration has no active worker.
   */
  async postMessage(message: any) {
    // Same feature check as the constructor: without it, reading `.ready`
    // throws a TypeError in browsers that lack navigator.serviceWorker.
    if (!('serviceWorker' in navigator)) {
      console.warn('Service workers are not supported; message not sent');
      return;
    }
    const registration = await navigator.serviceWorker.ready;
    registration.active?.postMessage(message);
  }

  /** Emits the `data` of every incoming message whose `type` equals `type`. */
  on<T>(type: string): Observable<T> {
    return this.messages$.pipe(
      filter(msg => msg.type === type),
      map(msg => msg.data as T)
    );
  }
}

// Usage
const swService = new ServiceWorkerService();

// Listen for sync complete
swService.on<{ count: number }>('sync-complete').subscribe(
  data => console.log(`Synced ${data.count} items`)
);

// Request background sync. postMessage is async, so handle a rejection
// explicitly rather than leaving a floating promise.
swService.postMessage({
  type: 'background-sync',
  data: { items: [/* items to sync */] }
}).catch(err => console.error('Failed to post message to service worker:', err));

Example: Cache strategies with RxJS

// Offline-first HTTP service
/**
 * HTTP helper implementing three offline-first caching strategies on top of
 * the Cache Storage API. Responses are written to the 'api-cache' cache;
 * lookups use `caches.match`, which searches all caches.
 */
class OfflineHttpService {
  constructor(private http: HttpClient) {}

  /**
   * Cache-first: emits the cached response if there is one; otherwise
   * fetches from the network and stores the result.
   */
  cacheFirst<T>(url: string): Observable<T> {
    return from(caches.match(url)).pipe(
      mergeMap(cached =>
        cached
          ? from(cached.json() as Promise<T>)
          : this.fetchAndCache<T>(url)  // Not in cache, fetch from network
      )
    );
  }

  /**
   * Network-first: tries the network with a 5s timeout and refreshes the
   * cache on success. On any failure it falls back to the cached response
   * and errors with 'No cached data' if none exists.
   */
  networkFirst<T>(url: string): Observable<T> {
    return this.http.get<T>(url).pipe(
      timeout(5000),
      tap(data => this.store(url, data)),
      catchError(() =>
        // Network failed, try cache
        from(caches.match(url)).pipe(
          mergeMap(cached =>
            cached
              ? from(cached.json() as Promise<T>)
              : throwError(() => new Error('No cached data'))
          )
        )
      )
    );
  }

  /**
   * Stale-while-revalidate: emits the cached value first (if any), then the
   * fresh network value, skipping the second emission when the two are
   * identical.
   */
  staleWhileRevalidate<T>(url: string): Observable<T> {
    const cache$ = from(caches.match(url)).pipe(
      filter(response => !!response),
      mergeMap(response => from(response!.json() as Promise<T>))
    );

    const network$ = this.fetchAndCache<T>(url);

    // Return cache immediately, then network
    return concat(cache$, network$).pipe(
      // Compare serialized content: the default reference comparison can
      // never match, because the cached and network payloads are parsed into
      // separate objects.
      distinctUntilChanged((previous, current) =>
        JSON.stringify(previous) === JSON.stringify(current)
      )
    );
  }

  /** Fetches `url` from the network and writes the response into the cache. */
  private fetchAndCache<T>(url: string): Observable<T> {
    return this.http.get<T>(url).pipe(
      tap(data => this.store(url, data))
    );
  }

  /**
   * Best-effort write of `data` (as JSON) under `url` in 'api-cache'.
   * A failed write is logged and never breaks the data stream.
   */
  private store(url: string, data: unknown): void {
    caches.open('api-cache')
      .then(cache => cache.put(url, new Response(JSON.stringify(data))))
      .catch(err => console.warn('Cache write failed:', err));
  }
}

// Usage
const offlineHttp = new OfflineHttpService(http);

// Get data with cache-first
const users$ = offlineHttp.cacheFirst<User[]>('/api/users');
users$.subscribe(users => {
  displayUsers(users);
});
Framework Integration Best Practices:
  • Angular: Use async pipe for automatic subscription management
  • React: Create custom hooks for observable integration and cleanup
  • Vue: Use composables with onUnmounted for proper cleanup
  • Node.js: Handle backpressure when converting streams
  • WebSocket: Implement reconnection logic and message queuing
  • Service Workers: Use RxJS for offline-first patterns

Section 16 Summary

  • Angular async pipe provides automatic subscription lifecycle management
  • React hooks require custom implementation for observable integration and cleanup
  • Vue composables with onUnmounted enable reactive observable patterns
  • Node.js streams convert to/from observables for unified data processing
  • WebSocket subjects enable bidirectional communication with retry logic
  • Service Workers with RxJS implement sophisticated offline-first strategies