RxJS Operators

Back

Loading concept...

๐ŸŽญ RxJS Operators: The Magic Factory Workers

Imagine a toy factory where raw materials flow through different machines. Each machine does ONE special job โ€” painting, shaping, sorting, or packaging. RxJS operators are exactly like these machines! Data flows in, gets transformed, and flows out ready for use.


๐ŸŒŠ The River of Data

Think of your app as a river. Data flows through it like water. Observables are the river itself. Operators are the special machines along the riverbank that:

  • Create new streams of water
  • Transform the water (maybe add color!)
  • Filter out the dirty bits
  • Combine multiple streams into one
  • Handle problems when the flow breaks

Letโ€™s meet each type of factory worker!


๐Ÿญ 1. Creation Operators โ€” The Spring Makers

Creation operators start a new stream of data from scratch. Theyโ€™re like opening a tap!

of() โ€” The Instant Deliverer

Wraps values and sends them one by one, then says โ€œdone!โ€

import { of } from 'rxjs';

of('๐ŸŽ', '๐ŸŠ', '๐Ÿ‹').subscribe(
  fruit => console.log(fruit)
);
// Output: ๐ŸŽ  ๐ŸŠ  ๐Ÿ‹

from() โ€” The Array Unpacker

Takes a collection and sends each item separately.

import { from } from 'rxjs';

from([1, 2, 3]).subscribe(
  num => console.log(num)
);
// Output: 1  2  3

interval() โ€” The Ticking Clock

Sends a number every X milliseconds, forever.

import { interval } from 'rxjs';

interval(1000).subscribe(
  count => console.log(count)
);
// Output: 0  1  2  3 ... (every second)

timer() โ€” The Delayed Start

Waits, then ticks. Like setting an alarm.

import { timer } from 'rxjs';

timer(2000, 1000).subscribe(
  count => console.log(count)
);
// Waits 2s, then: 0  1  2 ... (every 1s)

fromEvent() โ€” The Event Listener

Turns DOM events into a stream.

import { fromEvent } from 'rxjs';

fromEvent(button, 'click').subscribe(
  () => console.log('Clicked!')
);

๐Ÿ”„ 2. Transformation Operators โ€” The Shape Shifters

These operators change whatโ€™s flowing through the stream.

map() โ€” The Translator

Changes each value into something new.

import { of } from 'rxjs';
import { map } from 'rxjs/operators';

of(1, 2, 3).pipe(
  map(n => n * 10)
).subscribe(console.log);
// Output: 10  20  30

pluck() โ€” The Property Picker

Grabs one property from objects.

import { of } from 'rxjs';
import { pluck } from 'rxjs/operators';

of(
  { name: 'Ana', age: 25 },
  { name: 'Bob', age: 30 }
).pipe(
  pluck('name')
).subscribe(console.log);
// Output: Ana  Bob

scan() โ€” The Running Total Keeper

Like reduce(), but shows every step.

import { of } from 'rxjs';
import { scan } from 'rxjs/operators';

of(1, 2, 3, 4).pipe(
  scan((total, n) => total + n, 0)
).subscribe(console.log);
// Output: 1  3  6  10

reduce() โ€” The Final Answer

Collects everything, emits only the final result.

import { of } from 'rxjs';
import { reduce } from 'rxjs/operators';

of(1, 2, 3, 4).pipe(
  reduce((sum, n) => sum + n, 0)
).subscribe(console.log);
// Output: 10

๐Ÿšฆ 3. Filtering Operators โ€” The Bouncers

These let only certain values through.

filter() โ€” The Gatekeeper

Only passes values that meet your rule.

import { of } from 'rxjs';
import { filter } from 'rxjs/operators';

of(1, 2, 3, 4, 5).pipe(
  filter(n => n % 2 === 0)
).subscribe(console.log);
// Output: 2  4

take() โ€” The Counter

Takes only the first N values, then stops.

import { interval } from 'rxjs';
import { take } from 'rxjs/operators';

interval(500).pipe(
  take(3)
).subscribe(console.log);
// Output: 0  1  2 (then completes)

takeUntil() โ€” The Until-Stopper

Keeps taking until another observable fires.

import { interval, timer } from 'rxjs';
import { takeUntil } from 'rxjs/operators';

interval(500).pipe(
  takeUntil(timer(2000))
).subscribe(console.log);
// Output: 0  1  2 (stops at 2 seconds)

first() & last() โ€” The Edge Catchers

Grab just the first or last value.

of('a', 'b', 'c').pipe(first())
  .subscribe(console.log); // a

of('a', 'b', 'c').pipe(last())
  .subscribe(console.log); // c

distinctUntilChanged() โ€” The Duplicate Detector

Ignores value if itโ€™s the same as previous.

of(1, 1, 2, 2, 3, 1).pipe(
  distinctUntilChanged()
).subscribe(console.log);
// Output: 1  2  3  1

debounceTime() โ€” The Wait-and-See

Waits for silence before passing the last value.

fromEvent(input, 'keyup').pipe(
  debounceTime(300)
).subscribe(/* fires 300ms after typing stops */);

๐ŸŽข 4. Higher-Order Mapping โ€” The Stream Managers

Sometimes one value creates a whole new stream. These operators handle streams-within-streams!

The Big Picture

Outer Stream: --A------B------C-->
                |      |      |
Inner Streams: โ–ผa1-a2  โ–ผb1-b2  โ–ผc1-c2

Higher-order operators decide how to handle overlapping inner streams.


๐Ÿ—บ๏ธ 5. Mapping Operators Compared

Hereโ€™s the secret sauce โ€” choosing the RIGHT one!

mergeMap โ€” The Juggler ๐Ÿคน

Runs all inner streams at the same time.

clicks.pipe(
  mergeMap(() => http.get('/data'))
).subscribe(console.log);
// 3 clicks = 3 parallel requests

Use when: Order doesnโ€™t matter, run everything!

switchMap โ€” The Channel Changer ๐Ÿ“บ

Cancels the previous stream when a new one starts.

searchInput.pipe(
  debounceTime(300),
  switchMap(term => http.get(`/search?q=${term}`))
).subscribe(console.log);
// Only latest search matters!

Use when: Only the latest matters (search, autocomplete).

concatMap โ€” The Polite Queue ๐Ÿšถโ€โ™‚๏ธ

Waits for each stream to finish before starting next.

clicks.pipe(
  concatMap(() => timer(1000))
).subscribe(console.log);
// Each click waits its turn

Use when: Order matters, one at a time.

exhaustMap โ€” The Busy Bouncer ๐Ÿ™…

Ignores new requests while one is in progress.

submitBtn.pipe(
  exhaustMap(() => http.post('/save'))
).subscribe(console.log);
// No double-submits!

Use when: Prevent duplicate actions.


๐Ÿ“Š Quick Comparison Table

Operator Parallel? Cancels? Queues? Best For
mergeMap โœ… โŒ โŒ Parallel work
switchMap โŒ โœ… โŒ Latest only
concatMap โŒ โŒ โœ… Sequential
exhaustMap โŒ โŒ โŒ No overlaps

๐Ÿค 6. Combination Operators โ€” The Team Builders

Merge multiple streams into one!

combineLatest() โ€” The Latest Combo

Emits when ANY source emits, combining latest from each.

import { combineLatest, of } from 'rxjs';

combineLatest([
  of('Hello'),
  of('World')
]).subscribe(console.log);
// Output: ['Hello', 'World']

merge() โ€” The Race Track

All streams race together, first come first served.

import { merge, interval } from 'rxjs';
import { map, take } from 'rxjs/operators';

merge(
  interval(500).pipe(map(() => 'A'), take(2)),
  interval(300).pipe(map(() => 'B'), take(2))
).subscribe(console.log);
// Output: B  A  B  A

forkJoin() โ€” The Group Photo

Waits for ALL streams to complete, then emits last values.

import { forkJoin } from 'rxjs';

forkJoin([
  http.get('/user'),
  http.get('/posts')
]).subscribe(([user, posts]) => {
  console.log('All loaded!', user, posts);
});

zip() โ€” The Pair Matcher

Pairs values by their index, waits for a match.

import { zip, of } from 'rxjs';

zip(
  of('๐Ÿ•', '๐Ÿ”', '๐ŸŒฎ'),
  of('Cola', 'Fanta', 'Sprite')
).subscribe(console.log);
// ['๐Ÿ•', 'Cola'] ['๐Ÿ”', 'Fanta'] ...

withLatestFrom() โ€” The Snapshot

Main stream grabs the latest from another.

clicks.pipe(
  withLatestFrom(currentUser$)
).subscribe(([click, user]) => {
  console.log('Clicked by', user);
});

๐Ÿšจ 7. Error Handling Operators โ€” The Safety Net

Errors happen. These operators keep your stream alive!

catchError() โ€” The Fixer

Catches errors and returns a backup value or stream.

http.get('/data').pipe(
  catchError(err => {
    console.log('Oops!', err);
    return of({ fallback: true });
  })
).subscribe(console.log);

retry() โ€” The Try-Again

Retries the source N times before giving up.

http.get('/flaky-api').pipe(
  retry(3)
).subscribe(console.log);
// Tries up to 4 times total

retryWhen() โ€” The Smart Retry

Custom retry logic with delays.

http.get('/api').pipe(
  retryWhen(errors =>
    errors.pipe(delay(1000), take(3))
  )
).subscribe(console.log);
// Retries 3 times, 1 second apart

finalize() โ€” The Cleanup Crew

Always runs when stream ends (complete OR error).

http.get('/data').pipe(
  finalize(() => hideSpinner())
).subscribe({
  next: data => showData(data),
  error: err => showError(err)
});

๐ŸŽฏ 8. takeUntilDestroyed โ€” The Angular Guardian

Angular 16+ brought a superhero for cleanup!

The Problem

Components subscribe to observables. When the component dies, subscriptions must die too โ€” or you get memory leaks!

Old Way (Manual Cleanup)

export class OldComponent implements OnDestroy {
  private destroy$ = new Subject<void>();

  ngOnInit() {
    interval(1000).pipe(
      takeUntil(this.destroy$)
    ).subscribe(console.log);
  }

  ngOnDestroy() {
    this.destroy$.next();
    this.destroy$.complete();
  }
}

New Way (takeUntilDestroyed) โœจ

import { takeUntilDestroyed } from '@angular/core/rxjs-interop';

@Component({...})
export class NewComponent {
  constructor() {
    interval(1000).pipe(
      takeUntilDestroyed()
    ).subscribe(console.log);
    // Auto-cleanup when component dies!
  }
}

Rules for takeUntilDestroyed()

  1. Call it in constructor or field initializer (not ngOnInit!)
  2. Or pass DestroyRef explicitly:
export class MyComponent {
  private destroyRef = inject(DestroyRef);

  ngOnInit() {
    interval(1000).pipe(
      takeUntilDestroyed(this.destroyRef)
    ).subscribe(console.log);
  }
}

๐Ÿงฉ Putting It All Together

graph TD A["Data Source"] --> B["Creation Operator"] B --> C["Transform with map/scan"] C --> D["Filter unwanted values"] D --> E{Higher-Order?} E -->|Yes| F["switchMap/mergeMap/etc"] E -->|No| G["Combine with others"] F --> G G --> H["Handle Errors"] H --> I["takeUntilDestroyed"] I --> J["Subscribe!"]

๐ŸŽ‰ You Made It!

You now understand the factory workers of RxJS:

Worker Type Job Example Operators
๐Ÿญ Creation Start streams of, from, interval
๐Ÿ”„ Transform Change values map, scan, pluck
๐Ÿšฆ Filter Block values filter, take, debounceTime
๐ŸŽข Higher-Order Handle streams switchMap, mergeMap
๐Ÿค Combine Merge streams combineLatest, forkJoin
๐Ÿšจ Errors Stay safe catchError, retry
๐ŸŽฏ Cleanup No leaks takeUntilDestroyed

Remember: Operators are just functions that take a stream in and return a new stream out. Chain them like LEGO blocks to build exactly what you need!

โ€œRxJS is like having a superpower for managing async data. Once you understand operators, you can handle anything!โ€

Loading story...

Story - Premium Content

Please sign in to view this story and start learning.

Upgrade to Premium to unlock full access to all stories.

Stay Tuned!

Story is coming soon.

Story Preview

Story - Premium Content

Please sign in to view this concept and start learning.

Upgrade to Premium to unlock full access to all content.