Mastering RxJS Operators in Angular

Unlock the power of Reactive Extensions for JavaScript (RxJS) operators. Learn how to transform, filter, and manipulate data streams using map, filter, tap, switchMap, and more in Angular applications.

Welcome to the RxJS Pipeline. Think of an Observable as a stream of water (data) flowing through a pipe.

source$ = of(1, 2, 3, 4, 5);

Transformation Operators

Transformation operators change the data flowing through the stream. map transforms each value (like Array.prototype.map). To extract a property, use map(x => x.prop); the older pluck operator is deprecated. scan acts like a reducer, but emits the accumulated value after every emission instead of only once at the end.
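As a minimal sketch of these operators, the snippet below runs map and scan over the same values as source$. The user objects and the array names are illustrative assumptions, not part of the lesson. Because of() emits synchronously, each array is already filled when subscribe returns.

```typescript
import { of } from 'rxjs';
import { map, scan } from 'rxjs/operators';

const source$ = of(1, 2, 3, 4, 5);

// map: transform each value, like Array.prototype.map
const doubled: number[] = [];
source$.pipe(map(n => n * 2)).subscribe(v => doubled.push(v));

// map used in place of the deprecated pluck: extract one property
// (the user objects are made up for illustration)
const names: string[] = [];
of({ name: 'Ada' }, { name: 'Grace' })
  .pipe(map(user => user.name))
  .subscribe(v => names.push(v));

// scan: emit the running total after every value
const runningTotals: number[] = [];
source$
  .pipe(scan((total, n) => total + n, 0))
  .subscribe(v => runningTotals.push(v));

console.log(doubled);       // [2, 4, 6, 8, 10]
console.log(names);         // ['Ada', 'Grace']
console.log(runningTotals); // [1, 3, 6, 10, 15]
```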

Filtering Operators

Filtering operators determine whether a value passes through to the subscriber. filter keeps values that satisfy a condition, take(n) completes after n emissions, first() emits only the first value and then completes, and debounceTime emits the latest value only after a quiet period with no new emissions (great for search).
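A small sketch of filter, take, and first on the numbers 1 to 5 might look like this. The variable names are illustrative, and debounceTime is left out because it depends on real timers rather than synchronous values.

```typescript
import { of } from 'rxjs';
import { filter, first, take } from 'rxjs/operators';

const source$ = of(1, 2, 3, 4, 5);

// filter: only values that satisfy the condition pass through
const greaterThanTwo: number[] = [];
source$.pipe(filter(n => n > 2)).subscribe(v => greaterThanTwo.push(v));

// take(2): pass two values, then complete the stream
const takeLog: string[] = [];
source$.pipe(take(2)).subscribe({
  next: v => takeLog.push(`next ${v}`),
  complete: () => takeLog.push('complete'),
});

// first(): pass the first value only, then complete
const onlyFirst: number[] = [];
source$.pipe(first()).subscribe(v => onlyFirst.push(v));

console.log(greaterThanTwo); // [3, 4, 5]
console.log(takeLog);        // ['next 1', 'next 2', 'complete']
console.log(onlyFirst);      // [1]
```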

Higher-Order Mapping

Higher-order mapping operators are crucial in Angular. switchMap cancels previous inner observables (good for search), mergeMap runs them concurrently (good for writes), and concatMap queues them (good for order-sensitive tasks).
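To make the difference concrete, here is a hedged sketch that pushes one fixed sequence of events through each of the three operators. The run helper, the hand-driven Subjects a$ and b$, and the value labels are all assumptions made for illustration; in an Angular app the inner Observables would typically be HTTP requests.

```typescript
import { Observable, OperatorFunction, Subject } from 'rxjs';
import { concatMap, mergeMap, switchMap } from 'rxjs/operators';

type Project = (key: string) => Observable<string>;

// Hypothetical helper: replays the same events through the given
// flattening operator and returns what the subscriber received.
function run(
  flatten: (project: Project) => OperatorFunction<string, string>
): string[] {
  const outer$ = new Subject<string>();
  const a$ = new Subject<string>(); // inner stream for key "a"
  const b$ = new Subject<string>(); // inner stream for key "b"
  const received: string[] = [];

  outer$
    .pipe(flatten(key => (key === 'a' ? a$ : b$)))
    .subscribe(v => received.push(v));

  outer$.next('a'); // inner "a" starts
  a$.next('a1');
  outer$.next('b'); // inner "b" is requested while "a" is still active
  a$.next('a2');
  b$.next('b1');
  a$.complete();
  b$.next('b2');
  return received;
}

const switched = run(project => switchMap(project));
const merged = run(project => mergeMap(project));
const queued = run(project => concatMap(project));

console.log(switched); // ['a1', 'b1', 'b2']       "a" is dropped once "b" arrives
console.log(merged);   // ['a1', 'a2', 'b1', 'b2'] both run side by side
console.log(queued);   // ['a1', 'a2', 'b2']       "b" starts only after "a" completes
```

In the concatMap run, b1 is missing only because a Subject does not replay values emitted before it is subscribed to. With a cold inner Observable such as an HTTP request, nothing would be lost; the request would simply start later.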

Utility & Error Handling

Utility operators handle side effects and error management. tap lets you "spy" on the stream or perform actions such as logging without affecting the data. catchError intercepts an error and swaps in a fallback Observable, so the subscriber receives the fallback instead of the error.
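The following sketch shows tap and catchError working together. The thrown 'boom' error and the fallback value -1 are arbitrary choices for illustration.

```typescript
import { of } from 'rxjs';
import { catchError, map, tap } from 'rxjs/operators';

const log: string[] = [];
const received: number[] = [];

of(1, 2, 3)
  .pipe(
    // tap observes each value without changing it
    tap(n => log.push(`saw ${n}`)),
    map(n => {
      if (n === 3) {
        throw new Error('boom'); // simulated failure
      }
      return n * 10;
    }),
    // catchError replaces the failed stream with a fallback Observable
    catchError((err: Error) => {
      log.push(`caught ${err.message}`);
      return of(-1);
    })
  )
  .subscribe({
    next: v => received.push(v),
    complete: () => log.push('complete'),
  });

console.log(received); // [10, 20, -1]
console.log(log);      // ['saw 1', 'saw 2', 'saw 3', 'caught boom', 'complete']
```

Note that the stream still ends after the fallback is delivered: catchError prevents the error from reaching the subscriber, but it does not resume the original source.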

Practice Zone


Interactive Test 1: Operator Roles

Match the Operator to its Behavior.

Drag in the corresponding order.


Drag the options:

map()
filter()
switchMap()

Complete the code:

Transforms Value______
Blocks/Allows Data______
Switches & Cancels______

Interactive Test 2: Typeahead Logic

Complete this Typeahead Search implementation.

Fill in the blanks in each box.

this.searchControl.valueChanges.pipe(
  ______(300), // Wait 300ms
  ______(), // Ignore if same as previous
  ______(term => this.api.search(term))
).subscribe();

Practice Example: Build a Pipe

Create a stream that takes numbers 1-5, filters for even numbers only, and then multiplies them by 10.

import { from } from 'rxjs';
import { filter, map } from 'rxjs/operators';

const source$ = from([1, 2, 3, 4, 5]);

source$.pipe(
  filter(num => num % 2 === 0),
  map(num => num * 10)
).subscribe(console.log);
// Output: 20, 40

Knowledge Check

Which operator is best for an HTTP search input where you want to cancel previous pending requests?


The Art of RxJS Operators

Operators are pure functions that enable a functional programming style for dealing with collections with operations like map, filter, concat, reduce, etc.


1. Flattening Maps: The "Switch", "Merge", and "Concat"

In Angular, you often map a value to another Observable (like an HTTP request). You get an "Observable of an Observable." You must flatten this. Choosing the right operator is critical:

  • switchMap: The "Latest and Greatest". If a new value comes in, it unsubscribes from the previous inner Observable. Perfect for Search.
  • mergeMap: The "Parallel Processor". It subscribes to every new inner Observable immediately and handles them all at once. Good for deleting multiple items.
  • concatMap: The "Orderly Queue". It waits for the previous inner Observable to complete before subscribing to the next. Good for sequential updates.
  • exhaustMap: The "Do Not Disturb". It ignores new values while an inner Observable is active. Good for login buttons (prevents double clicking).
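exhaustMap is the least familiar of the four, so here is a rough sketch of the double-click case. The fakeLogin function, the pending list, and the token strings are hypothetical stand-ins for a real login request.

```typescript
import { Observable, Subject } from 'rxjs';
import { exhaustMap } from 'rxjs/operators';

const clicks$ = new Subject<void>();
const pending: Subject<string>[] = []; // one entry per started request
const tokens: string[] = [];

// Hypothetical stand-in for an HTTP login call; the test code
// resolves each request by hand through the pending list.
function fakeLogin(): Observable<string> {
  const request$ = new Subject<string>();
  pending.push(request$);
  return request$;
}

clicks$.pipe(exhaustMap(() => fakeLogin())).subscribe(t => tokens.push(t));

clicks$.next(); // starts request 1
clicks$.next(); // ignored: request 1 is still active
pending[0].next('token-1');
pending[0].complete();

clicks$.next(); // starts request 2, because request 1 has finished
pending[1].next('token-2');
pending[1].complete();

console.log(pending.length); // 2 (three clicks, two requests)
console.log(tokens);         // ['token-1', 'token-2']
```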
// Search Example with switchMap
term$.pipe(
  debounceTime(400),
  distinctUntilChanged(),
  switchMap(term => this.http.get('/search?q=' + term))
).subscribe(results => ...);

2. Combining Streams

Sometimes you need data from multiple sources.

  • combineLatest: Emits an array of the latest values from all source Observables whenever any of them emit. (Requires all to emit at least once.)
  • forkJoin: Waits for all Observables to complete, then emits the last value from each. Think Promise.all.
  • withLatestFrom: Used inside a pipe. It grabs the latest value from another stream only when the source stream emits.

3. Error Handling

If an error occurs in an Observable, the stream dies. To keep the app alive, use catchError.

this.http.get('/api/data').pipe(
  catchError(error => {
    console.error('Error occurred:', error);
    return of([]); // Return fallback value so stream completes successfully
  })
);

Pro Tip: Always put catchError inside the inner pipe of a switchMap if you want the main stream to stay alive after a failed HTTP request.
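The tip can be illustrated with a sketch that compares both placements of catchError. The fakeSearch function and the 'bad' search term are hypothetical stand-ins for a failing HTTP request, and the factory form of throwError assumes RxJS 7 or later.

```typescript
import { Observable, Subject, of, throwError } from 'rxjs';
import { catchError, switchMap } from 'rxjs/operators';

// Hypothetical stand-in for this.http.get(...): fails for the term "bad"
function fakeSearch(term: string): Observable<string[]> {
  return term === 'bad'
    ? throwError(() => new Error('HTTP 500'))
    : of([`${term}-result`]);
}

// catchError inside the inner pipe: only the failed request is replaced
const innerTerm$ = new Subject<string>();
const innerResults: string[][] = [];
innerTerm$
  .pipe(
    switchMap(term =>
      fakeSearch(term).pipe(catchError(() => of([] as string[])))
    )
  )
  .subscribe(r => innerResults.push(r));

// catchError on the outer pipe: the whole stream ends after the fallback
const outerTerm$ = new Subject<string>();
const outerResults: string[][] = [];
outerTerm$
  .pipe(
    switchMap(term => fakeSearch(term)),
    catchError(() => of([] as string[]))
  )
  .subscribe(r => outerResults.push(r));

for (const term of ['a', 'bad', 'b']) {
  innerTerm$.next(term);
  outerTerm$.next(term);
}

console.log(innerResults); // [['a-result'], [], ['b-result']]
console.log(outerResults); // [['a-result'], []]  the search for "b" never runs
```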

RxJS Glossary

Operator
A function that creates a new Observable based on the current Observable. It is a pure operation: the previous Observable stays unmodified.
Pipe
A method on Observables used to compose operators, for example obs$.pipe(op1(), op2()).
Stream
A sequence of data elements made available over time. Can be anything: variables, user inputs, properties, caches, data structures, etc.
Higher-Order Observable
An Observable that emits other Observables. Operators like switchMap and mergeMap are used to flatten these.
Subscription
Represents the execution of an Observable. It is primarily used to cancel the execution (unsubscribe).
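
As a brief sketch of the Subscription entry, the snippet below cancels an execution by calling unsubscribe. The ticks$ Subject is an illustrative source chosen so that the example runs synchronously.

```typescript
import { Subject } from 'rxjs';

const ticks$ = new Subject<number>();
const seen: number[] = [];

// subscribe returns a Subscription that represents this execution
const subscription = ticks$.subscribe(n => seen.push(n));

ticks$.next(1);
ticks$.next(2);
subscription.unsubscribe(); // cancel the execution
ticks$.next(3);             // no longer delivered to the callback

console.log(seen);                // [1, 2]
console.log(subscription.closed); // true
```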