Observable Creation Functions and Syntax

1. of, from, and fromEvent Creation Operators

| Operator | Syntax | Description | Use Case |
| --- | --- | --- | --- |
| of | of(...values) | Creates an Observable that emits the specified values synchronously, then completes | Static value sequences, testing, immediate emission |
| from | from(input) | Converts an array, Promise, iterable, or Observable-like input into an Observable stream | Array iteration, Promise conversion, async/await integration |
| fromEvent | fromEvent(target, eventName) | Creates an Observable from DOM events or a Node.js EventEmitter | Click handlers, keyboard input, mouse events, custom events |

Example: Basic creation operators

import { of, from, fromEvent } from 'rxjs';

// of - emit static values
of(1, 2, 3).subscribe(val => console.log(val)); // 1, 2, 3

// from - convert array to observable
from([10, 20, 30]).subscribe(val => console.log(val)); // 10, 20, 30

// from - convert promise to observable
from(fetch('/api/data')).subscribe(response => console.log(response));

// fromEvent - DOM event to observable
const clicks$ = fromEvent(document, 'click');
clicks$.subscribe(event => console.log('Clicked at:', event.clientX, event.clientY));

2. interval, timer, and range Observable Creators

| Operator | Syntax | Description | Emission Pattern |
| --- | --- | --- | --- |
| interval | interval(period) | Emits sequential numbers, starting from 0, every period milliseconds | 0, 1, 2, 3... (every period ms) |
| timer | timer(delay, period?) | Emits once after delay; if period is given, keeps emitting at that interval | Initial delay, then interval emissions |
| range | range(start, count?) | Synchronously emits a sequence of numbers from start to start+count-1 | Synchronous number sequence |

Example: Time-based and sequence creators

import { interval, timer, range } from 'rxjs';
import { take } from 'rxjs/operators';

// interval - emit every 1000ms
interval(1000).pipe(take(3))
  .subscribe(n => console.log('Interval:', n)); // 0, 1, 2

// timer - emit after 2s, then every 1s
timer(2000, 1000).pipe(take(3))
  .subscribe(n => console.log('Timer:', n)); // 0 (after 2s), 1, 2

// timer - single emission after delay
timer(3000).subscribe(() => console.log('Delayed action'));

// range - synchronous sequence
range(1, 5).subscribe(n => console.log('Range:', n)); // 1, 2, 3, 4, 5

3. Promise Conversion and async Observable Integration

| Method | Syntax | Description | Behavior |
| --- | --- | --- | --- |
| from(promise) | from(promiseObj) | Converts a Promise to an Observable that emits the resolved value or the rejection error | Single emission on resolve, error on reject |
| defer | defer(() => promise) | Observable factory: the Promise is created per subscription (lazy) | Fresh Promise instance for each subscriber |
| async/await | await firstValueFrom(obs$) | Converts an Observable to a Promise for async/await syntax | Waits for first emission, then resolves |

Example: Promise and async integration

import { from, defer, firstValueFrom } from 'rxjs';

// Convert Promise to Observable
const promise = fetch('/api/user');
from(promise).subscribe({
  next: response => console.log('Success:', response),
  error: error => console.error('Error:', error)
});

// Lazy Promise creation with defer
const lazyPromise$ = defer(() => fetch('/api/data'));
lazyPromise$.subscribe(); // Fetch happens only on subscribe

// Convert Observable to Promise for async/await
async function fetchData() {
  const obs$ = from(fetch('/api/items'));
  const response = await firstValueFrom(obs$);
  return response.json();
}
Note: Use defer when each subscription needs a fresh Promise. from(promise) wraps a Promise that is already running by the time it is passed in, so every subscriber receives the same settled result.

4. ajax and fetch Observable Wrappers

| Function | Syntax | Description | Features |
| --- | --- | --- | --- |
| ajax | ajax(urlOrRequest) | RxJS XMLHttpRequest wrapper that composes with pipeable operators | Progress events, timeout, cancellation on unsubscribe; retry via the retry operator |
| ajax.get | ajax.get(url, headers?) | Convenience method for GET requests | Automatic JSON parsing, CORS support |
| ajax.post | ajax.post(url, body, headers?) | Convenience method for POST requests | JSON body serialization, custom headers |
| from(fetch) | from(fetch(url, options)) | Modern Fetch API wrapped in an Observable | Promise-based, streaming, simpler API |

Example: HTTP requests with ajax and fetch

import { ajax } from 'rxjs/ajax';
import { from, of } from 'rxjs';
import { map, catchError, switchMap } from 'rxjs/operators';

// ajax GET request
ajax.get('https://api.example.com/users')
  .pipe(
    map(response => response.response),
    catchError(error => of({ error: error.message }))
  )
  .subscribe(data => console.log(data));

// ajax POST request
ajax.post('https://api.example.com/users', 
  { name: 'John', email: 'john@example.com' },
  { 'Content-Type': 'application/json' }
).subscribe(response => console.log(response));

// Custom ajax configuration
ajax({
  url: 'https://api.example.com/data',
  method: 'PUT',
  headers: { 'Authorization': 'Bearer token123' },
  body: { value: 42 },
  timeout: 5000
}).subscribe(response => console.log(response));

// Fetch API as Observable
from(fetch('https://api.example.com/items'))
  .pipe(
    switchMap(response => from(response.json()))
  )
  .subscribe(data => console.log(data));

5. Custom Observable Creation with new Observable()

| Component | Syntax | Description | Purpose |
| --- | --- | --- | --- |
| Observable constructor | new Observable(subscriber => {...}) | Creates a custom Observable with full control over emissions | Custom data sources, complex logic |
| subscriber.next() | subscriber.next(value) | Emits a value to observers | Push data to subscribers |
| subscriber.error() | subscriber.error(err) | Signals an error and terminates the stream | Error handling and propagation |
| subscriber.complete() | subscriber.complete() | Signals successful completion | Terminate stream normally |
| Teardown function | return () => cleanup() | Cleanup logic executed on unsubscribe | Resource cleanup, event removal |

Example: Custom Observable implementation

import { Observable } from 'rxjs';

// Custom Observable with manual emissions
const custom$ = new Observable(subscriber => {
  subscriber.next(1);
  subscriber.next(2);
  subscriber.next(3);
  subscriber.complete();
  
  // Teardown function for cleanup
  return () => console.log('Unsubscribed');
});

custom$.subscribe({
  next: val => console.log('Value:', val),
  complete: () => console.log('Complete')
});

// Custom event-based Observable
const createEventObservable = (element, eventName) => {
  return new Observable(subscriber => {
    const handler = (event) => subscriber.next(event);
    element.addEventListener(eventName, handler);
    
    // Cleanup on unsubscribe
    return () => element.removeEventListener(eventName, handler);
  });
};

// WebSocket Observable
const websocket$ = new Observable(subscriber => {
  const ws = new WebSocket('wss://example.com/socket');
  
  ws.onmessage = (event) => subscriber.next(event.data);
  ws.onerror = (error) => subscriber.error(error);
  ws.onclose = () => subscriber.complete();
  
  return () => ws.close();
});
Warning: Always return a teardown function to prevent memory leaks. Use it to remove event listeners, clear timers, and close connections.

6. EMPTY, NEVER, and throwError Constants

| Constant | Syntax | Behavior | Use Case |
| --- | --- | --- | --- |
| EMPTY | EMPTY | Completes immediately without emitting any values | Placeholder, conditional logic, cancel operations |
| NEVER | NEVER | Never emits values and never completes (infinite stream) | Testing, keep-alive streams, blocking scenarios |
| throwError | throwError(() => error) | Immediately emits an error notification, no values | Error injection, testing error handling, fallback errors |

Example: Special Observable constants

import { EMPTY, NEVER, throwError, of } from 'rxjs';
import { catchError, map } from 'rxjs/operators';

// EMPTY - completes immediately
EMPTY.subscribe({
  next: val => console.log('Value:', val),      // Never called
  complete: () => console.log('Complete')        // Immediately called
});

// EMPTY in conditional logic
const shouldProcess = false;
(shouldProcess ? of(1, 2, 3) : EMPTY)
  .subscribe(val => console.log(val)); // No output

// NEVER - infinite stream
NEVER.subscribe({
  next: val => console.log(val),        // Never called
  complete: () => console.log('Done')   // Never called
});

// throwError - immediate error
throwError(() => new Error('Custom error'))
  .subscribe({
    next: val => console.log(val),
    error: err => console.error('Error:', err.message)
  });

// Practical use in error handling
of(1, 2, 3).pipe(
  map(n => {
    if (n === 2) throw new Error('Invalid value');
    return n * 10;
  }),
  catchError(err => throwError(() => new Error('Transformed: ' + err.message)))
).subscribe({
  next: val => console.log(val),
  error: err => console.error(err)
});

Section 1 Summary

  • of() and from() convert static values/arrays/promises to Observables
  • fromEvent() bridges DOM/Node events to reactive streams
  • interval(), timer(), range() create time-based and sequential emissions
  • ajax wraps XMLHttpRequest with cancellation on unsubscribe and composes with operators such as retry
  • new Observable() enables full custom control for complex data sources
  • EMPTY, NEVER, throwError handle special control flow scenarios