Observable stream

ObservableStream ⇐ Observable

Kind: global class
Extends: Observable

new ObservableStream()

The ObservableStream class extends Observable and adds methods for transforming, filtering, and combining streams of data.

observableStream.map(transformFn) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream instance with transformed data

Param Type Description
transformFn function The function to transform the data

Example

// Example 1: Transforming an API data stream
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const transformedStream = observableStream.map(data => data.map(item => item * 2));

// Example 2: Transforming a user event stream
const clickStream = new ObservableStream(subscriber => {
  document.addEventListener('click', event => subscriber.next(event));
});
const clickObservableStream = ObservableStream.from(clickStream);
const positionStream = clickObservableStream.map(event => ({ x: event.clientX, y: event.clientY }));

observableStream.filter(predicateFn) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream instance with filtered data

Param Type Description
predicateFn function The function to filter the data

Example

// Example 1: Filtering an API data stream
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const filteredStream = observableStream.filter(data => data.someProperty === 'someValue');

// Example 2: Filtering a user event stream
const clickStream = new ObservableStream(subscriber => {
  document.addEventListener('click', event => subscriber.next(event));
});
const clickObservableStream = ObservableStream.from(clickStream);
const filteredClickStream = clickObservableStream.filter(event => event.target.id === 'someId');

observableStream.reduce(reducerFn, initialValue) ⇒ Promise

Kind: instance method of ObservableStream
Returns: Promise - A promise that resolves with the reduced value

Param Type Description
reducerFn function The function to reduce the data
initialValue any The initial value for the reducer

Example

// Example 1: Reducing an API data stream
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const reducedValuePromise = observableStream.reduce((acc, data) => acc + data.someProperty, 0);

// Example 2: Reducing a user event stream
const clickStream = new ObservableStream(subscriber => {
  document.addEventListener('click', event => subscriber.next(event));
});
const clickObservableStream = ObservableStream.from(clickStream);
// Counts clicks; the promise can only resolve once the stream completes
const clickCountPromise = clickObservableStream.reduce((acc, event) => acc + 1, 0);

observableStream.takeUntil(notifier) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that completes when the notifier emits

Param Type Description
notifier Observable The Observable whose emission completes this stream

Example

// Example 1: Completing an API data stream when another stream emits
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const notifierStream = new ObservableStream(subscriber => {
  setTimeout(() => subscriber.next(), 5000);
});
const completedStream = observableStream.takeUntil(notifierStream);

// Example 2: Completing a user event stream when another stream emits
const clickStream = new ObservableStream(subscriber => {
  document.addEventListener('click', event => subscriber.next(event));
});
const clickObservableStream = ObservableStream.from(clickStream);
const clickNotifierStream = new ObservableStream(subscriber => {
  setTimeout(() => subscriber.next(), 5000);
});
const completedClickStream = clickObservableStream.takeUntil(clickNotifierStream);
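
The cutoff that takeUntil applies can be pictured with a small timeline model. This is a sketch rather than the library's implementation: takeUntilModel is a hypothetical helper, events are plain { t, value } objects with t in milliseconds, and it assumes that values arriving at or after the notifier's emission are discarded.

```javascript
// Hypothetical model: keep only the values that arrive before the notifier fires.
function takeUntilModel(events, notifierAt) {
  return events
    .filter((event) => event.t < notifierAt)
    .map((event) => event.value);
}

// Three clicks; the notifier emits at 5000 ms, as in the examples above.
const timedClicks = [
  { t: 1000, value: 'click 1' },
  { t: 4000, value: 'click 2' },
  { t: 6000, value: 'click 3' },
];

const keptClicks = takeUntilModel(timedClicks, 5000);
console.log(keptClicks); // [ 'click 1', 'click 2' ]
```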

observableStream.take(n) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that completes after emitting n values

Param Type Description
n number The number of values to take

Example

// Example 1: Taking a certain number of values from an API data stream
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const takenStream = observableStream.take(5);

// Example 2: Taking a certain number of values from a user event stream
const clickStream = new ObservableStream(subscriber => {
  document.addEventListener('click', event => subscriber.next(event));
});
const clickObservableStream = ObservableStream.from(clickStream);
const takenClickStream = clickObservableStream.take(5);

observableStream.drop(n) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that skips the first n values and emits every value after them

Param Type Description
n number The number of values to drop

Example

// Example 1: Dropping a certain number of values from an API data stream
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const droppedStream = observableStream.drop(5);

// Example 2: Dropping a certain number of values from a user event stream
const clickStream = new ObservableStream(subscriber => {
  document.addEventListener('click', event => subscriber.next(event));
});
const clickObservableStream = ObservableStream.from(clickStream);
const droppedClickStream = clickObservableStream.drop(5);
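
As a rough mental model, take(n) and drop(n) split one sequence of emissions at the same point. The sketch below uses plain arrays in place of streams; takeModel and dropModel are hypothetical helpers for illustration, not part of ObservableStream.

```javascript
// Hypothetical array models of take and drop.
const takeModel = (values, n) => values.slice(0, n); // the first n values
const dropModel = (values, n) => values.slice(n);    // everything after them

const emitted = [0, 1, 2, 3, 4, 5, 6, 7];
const firstFive = takeModel(emitted, 5);
const afterFive = dropModel(emitted, 5);

console.log(firstFive); // [ 0, 1, 2, 3, 4 ]
console.log(afterFive); // [ 5, 6, 7 ]
```

Concatenating the two results gives back the original sequence, which is a quick way to check that nothing is emitted twice or lost.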

observableStream.flatMap(transformFn) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that emits the values from the inner Observables

Param Type Description
transformFn function The function to transform the data into Observables

Example

// Example 1: Transforming an API data stream into inner Observables
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const flatMappedStream = observableStream.flatMap(data => ObservableStream.from(fetch(`https://api.example.com/data/${data.id}`).then(response => response.json())));

// Example 2: Transforming a user event stream into inner Observables
const clickStream = new ObservableStream(subscriber => {
  document.addEventListener('click', event => subscriber.next(event));
});
const positionStream = clickStream.flatMap(event => ObservableStream.from({ x: event.clientX, y: event.clientY }));

// Example 3: Transforming a stream of search terms into a stream of search results
const searchTerms = new ObservableStream(subscriber => {
  const input = document.querySelector('#search-input');
  input.addEventListener('input', event => subscriber.next(event.target.value));
});
const searchResults = searchTerms.debounce(300).flatMap(term => ObservableStream.from(fetch(`https://api.example.com/search?q=${encodeURIComponent(term)}`).then(response => response.json())));

observableStream.switchMap(transformFn) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that emits the values from the inner Observables

Param Type Description
transformFn function The function to transform the data into Observables

Example

// Example 1: Transforming click events into Observables
const clickStream = new ObservableStream();
document.addEventListener('click', (event) => clickStream.push(event));
const positionStream = clickStream.switchMap((event) => {
  return new ObservableStream((subscriber) => {
    subscriber.next({ x: event.clientX, y: event.clientY });
    subscriber.complete();
  });
});
positionStream.subscribe({
  next: (position) => console.log(`Clicked at position: ${position.x}, ${position.y}`),
  error: (err) => console.error(err),
});

// Example 2: Transforming API responses into Observables
const apiStream = new ObservableStream();
fetch('https://api.example.com/data')
  .then((response) => response.json())
  .then((data) => apiStream.push(data))
  .catch((error) => apiStream.error(error));
// transformData stands in for your own transformation function
const transformedStream = apiStream.switchMap((data) => {
  return new ObservableStream((subscriber) => {
    subscriber.next(transformData(data));
    subscriber.complete();
  });
});
transformedStream.subscribe({
  next: (transformedData) => console.log(transformedData),
  error: (err) => console.error(err),
});
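
flatMap and switchMap carry the same description in this reference, so it may help to see how they would differ, along with concatMap, if they follow the conventional reactive-stream meanings. That is an assumption, not something this page confirms. The sketch models each inner Observable as a request with an arrival time and a latency; the data and the helpers mergeModel, switchModel, and concatModel are hypothetical.

```javascript
// Hypothetical search requests: `at` is when the term was typed and
// `latency` is how long its response takes (both in milliseconds).
const requests = [
  { term: 'a', at: 0, latency: 300 },
  { term: 'ab', at: 100, latency: 50 },
  { term: 'abc', at: 400, latency: 100 },
];

// Merge model (assumed flatMap behaviour): every result is kept,
// ordered by the time its response arrives.
function mergeModel(reqs) {
  return reqs
    .map((r) => ({ term: r.term, doneAt: r.at + r.latency }))
    .sort((x, y) => x.doneAt - y.doneAt)
    .map((r) => r.term);
}

// Switch model (assumed switchMap behaviour): a request is abandoned
// if the next one starts before it has finished.
function switchModel(reqs) {
  return reqs
    .filter((r, i) => i === reqs.length - 1 || r.at + r.latency <= reqs[i + 1].at)
    .map((r) => r.term);
}

// Concat model (assumed concatMap behaviour): requests run one at a
// time in source order, each waiting for the previous one to finish.
function concatModel(reqs) {
  let freeAt = 0;
  return reqs.map((r) => {
    const startAt = Math.max(r.at, freeAt);
    freeAt = startAt + r.latency;
    return { term: r.term, doneAt: freeAt };
  });
}

console.log(mergeModel(requests));  // [ 'ab', 'a', 'abc' ]
console.log(switchModel(requests)); // [ 'ab', 'abc' ]
console.log(concatModel(requests).map((r) => r.term)); // [ 'a', 'ab', 'abc' ]
```

Under these assumptions the merge model can deliver a stale result ('a') after a newer one ('ab'), the switch model drops the stale result, and the concat model preserves source order at the cost of waiting.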

observableStream.toArray() ⇒ Promise

Kind: instance method of ObservableStream
Returns: Promise - A promise that resolves with an array of all values emitted by the Observable
Example

// Example: Collecting all emitted values from an ObservableStream
const numberStream = new ObservableStream();
// Start collecting before pushing so that no values are missed
numberStream.toArray().then((values) => console.log(values)); // Logs: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
for (let i = 0; i < 10; i++) {
  numberStream.push(i);
}
numberStream.end();

observableStream.forEach(callback) ⇒ Promise

Kind: instance method of ObservableStream
Returns: Promise - A promise that resolves when the Observable completes

Param Type Description
callback function The function to call for each value emitted by the Observable

Example

// Example: Logging each value emitted by an ObservableStream
const numberStream = new ObservableStream();
// Register the callback before pushing so that no values are missed
numberStream.forEach((value) => console.log(value)); // Logs each number from 0 to 9
for (let i = 0; i < 10; i++) {
  numberStream.push(i);
}
numberStream.end();

observableStream.every(predicate) ⇒ Promise

Kind: instance method of ObservableStream
Returns: Promise - A promise that resolves with a boolean indicating whether every value satisfies the predicate

Param Type Description
predicate function The function to test each value

Example

// Example: Checking if all emitted values are even
const numberStream = new ObservableStream();
// Register the check before pushing so that no values are missed
numberStream.every((value) => value % 2 === 0).then((allEven) => console.log(allEven)); // Logs: false
for (let i = 0; i < 10; i++) {
  numberStream.push(i);
}
numberStream.end();

observableStream.find(predicate) ⇒ Promise

Kind: instance method of ObservableStream
Returns: Promise - A promise that resolves with the first value that satisfies the predicate

Param Type Description
predicate function The function to test each value

Example

// Example: Finding the first emitted value that is greater than 5
const numberStream = new ObservableStream();
// Register the search before pushing so that no values are missed
numberStream.find((value) => value > 5).then((value) => console.log(value)); // Logs: 6
for (let i = 0; i < 10; i++) {
  numberStream.push(i);
}
numberStream.end();

observableStream.some(predicate) ⇒ Promise

Kind: instance method of ObservableStream
Returns: Promise - A promise that resolves with a boolean indicating whether some value satisfies the predicate

Param Type Description
predicate function The function to test each value

Example

// Example: Checking if any emitted values are greater than 5
const numberStream = new ObservableStream();
// Register the check before pushing so that no values are missed
numberStream.some((value) => value > 5).then((anyGreaterThan5) => console.log(anyGreaterThan5)); // Logs: true
for (let i = 0; i < 10; i++) {
  numberStream.push(i);
}
numberStream.end();

observableStream.finally(callback) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that calls the callback when it completes

Param Type Description
callback function The function to call when the Observable completes

Example

// Example: Logging a message when the ObservableStream completes
const numberStream = new ObservableStream();
const finalStream = numberStream.finally(() => console.log('Stream completed'));
// Subscribe before pushing so that no values are missed
finalStream.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error(err),
}); // Logs each number from 0 to 9, then logs 'Stream completed'
for (let i = 0; i < 10; i++) {
  numberStream.push(i);
}
numberStream.end();

observableStream.toState() ⇒ ObservableState

Converts the ObservableStream to an ObservableState

Kind: instance method of ObservableStream
Returns: ObservableState - A new ObservableState that represents the current value of the stream
Example

// Example: Converting an ObservableStream to an ObservableState
const numberStream = new ObservableStream();
const numberState = numberStream.toState();
// Subscribe before pushing so that no values are missed
numberState.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error(err),
}); // Logs each number from 0 to 9
for (let i = 0; i < 10; i++) {
  numberStream.push(i);
}
numberStream.end();

observableStream.push(value)

Pushes a value to the observers. The value can be an Observable, an async iterable, an iterable, a Promise, or any other value.

Kind: instance method of ObservableStream

Param Type Description
value any The value to push

Example

// Example 1: Pushing values from an Observable
const sourceStream = new ObservableStream();
const targetStream = new ObservableStream();
// Subscribe and connect the streams before pushing so that no values are missed
targetStream.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error(err),
}); // Logs each number from 0 to 9
targetStream.push(sourceStream);
for (let i = 0; i < 10; i++) {
  sourceStream.push(i);
}
sourceStream.end();

// Example 2: Pushing values from a Promise
const promiseStream = new ObservableStream();
const promise = new Promise((resolve) => {
  setTimeout(() => resolve('Hello, world!'), 1000);
});
promiseStream.push(promise);
promiseStream.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error(err),
}); // Logs 'Hello, world!' after 1 second

observableStream.plug(stream)

Subscribes to a stream and pushes its values to the observers.

Kind: instance method of ObservableStream

Param Type Description
stream ObservableStream The stream to plug

Example

// Example: Plugging one ObservableStream into another
const sourceStream = new ObservableStream();
const targetStream = new ObservableStream();
// Subscribe and plug the source in before pushing so that no values are missed
targetStream.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error(err),
}); // Logs each number from 0 to 9
targetStream.plug(sourceStream);
for (let i = 0; i < 10; i++) {
  sourceStream.push(i);
}
sourceStream.end();

observableStream.end()

Ends the stream by calling the complete method of each observer.

Kind: instance method of ObservableStream
Example

// Example: Ending an ObservableStream
const numberStream = new ObservableStream();
// Subscribe first: end() calls complete on the observers registered at that moment
numberStream.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error(err),
  complete: () => console.log('Stream completed'),
}); // Logs each number from 0 to 9, then logs 'Stream completed'
for (let i = 0; i < 10; i++) {
  numberStream.push(i);
}
numberStream.end();

observableStream.catchError(fn) ⇒ ObservableStream

Catches errors on the ObservableStream and replaces them with a new stream.

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that replaces the original stream when an error occurs.

Param Type Description
fn function A function that receives the error and returns a new ObservableStream.

Example

// Example: Catching and handling errors in an ObservableStream
const numberStream = new ObservableStream();
const errorHandledStream = numberStream.catchError((error) => {
  console.error(error);
  return new ObservableStream((subscriber) => {
    subscriber.next('Error handled');
    subscriber.complete();
  });
});
// Subscribe before pushing so that no values are missed
errorHandledStream.subscribe({
  next: (value) => console.log(value),
  error: (err) => console.error(err),
}); // Logs each number from 0 to 4, logs the error, then logs 'Error handled'
for (let i = 0; i < 5; i++) {
  numberStream.push(i);
}
numberStream.error(new Error('Something went wrong'));

observableStream.debounce(delay) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that emits the latest value after the debounce delay

Param Type Description
delay number The debounce delay in milliseconds

Example

// Example: Debouncing an ObservableStream of click events
const clickStream = new ObservableStream();
document.addEventListener('click', (event) => clickStream.push(event));
const debouncedStream = clickStream.debounce(500);
debouncedStream.subscribe({
  next: (event) => console.log(`Clicked at position: ${event.clientX}, ${event.clientY}`),
  error: (err) => console.error(err),
}); // Logs a click's position only once 500 milliseconds have passed without another click
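
The effect of the delay is easiest to see on a timeline. The sketch below is a model, not the library's implementation: it assumes trailing-edge debouncing (a value is emitted only if no newer value arrives within the delay), and debounceModel and the { t, value } event shape are hypothetical.

```javascript
// Hypothetical model: an event survives only if the next event arrives
// at least `delay` ms later (or if it is the last event).
function debounceModel(events, delay) {
  return events
    .filter((event, i) => i === events.length - 1 || events[i + 1].t - event.t >= delay)
    .map((event) => ({ value: event.value, emittedAt: event.t + delay }));
}

// Two bursts of clicks: A and B close together, then C and D close together.
const burstClicks = [
  { t: 0, value: 'A' },
  { t: 100, value: 'B' },
  { t: 700, value: 'C' },
  { t: 900, value: 'D' },
];

const debounced = debounceModel(burstClicks, 500);
console.log(debounced);
// [ { value: 'B', emittedAt: 600 }, { value: 'D', emittedAt: 1400 } ]
```

Only the last click of each burst survives, and it is emitted 500 milliseconds after it occurred.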

observableStream.tap(sideEffectFn) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that is identical to the source

Param Type Description
sideEffectFn function The function to call for its side effect on each value

Example

// Example: Logging each value emitted by an ObservableStream
const numberStream = new ObservableStream();
const loggedStream = numberStream.tap((value) => console.log(value));
// Subscribe before pushing so that no values are missed
loggedStream.subscribe({
  next: (value) => {},
  error: (err) => console.error(err),
}); // Logs each number from 0 to 9
for (let i = 0; i < 10; i++) {
  numberStream.push(i);
}
numberStream.end();

observableStream.throttle(duration) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that emits a value then ignores subsequent source values for duration milliseconds, then repeats this process.

Param Type Description
duration number The throttle duration in milliseconds

Example

// Example 1: Throttling scroll events
const scrollStream = new ObservableStream(subscriber => {
  window.addEventListener('scroll', event => subscriber.next(event));
});
const throttledScrollStream = scrollStream.throttle(200);
throttledScrollStream.subscribe({
  next: (event) => console.log('Scroll event:', event),
  error: (err) => console.error(err),
});

// Example 2: Throttling search input for autocomplete
const searchInput = document.querySelector('#search-input');
const searchStream = new ObservableStream(subscriber => {
  searchInput.addEventListener('input', event => subscriber.next(event.target.value));
});
const throttledSearchStream = searchStream.throttle(300);
throttledSearchStream.subscribe({
  next: (searchTerm) => console.log('Search term:', searchTerm),
  error: (err) => console.error(err),
});
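
The "emit, then ignore for duration milliseconds" rule described above can be traced on a timeline. The sketch below is only a model of that rule, not the library's code; throttleModel and the { t, value } event shape are hypothetical.

```javascript
// Hypothetical model: emit an event, then ignore everything until
// `duration` ms have passed since that emission.
function throttleModel(events, duration) {
  const emitted = [];
  let openAt = -Infinity; // time from which the next event is accepted
  for (const event of events) {
    if (event.t >= openAt) {
      emitted.push(event.value);
      openAt = event.t + duration;
    }
  }
  return emitted;
}

// Scroll events labelled by their timestamp in milliseconds.
const scrolls = [0, 50, 150, 210, 300, 420].map((t) => ({ t, value: `scroll@${t}` }));

const throttled = throttleModel(scrolls, 200);
console.log(throttled); // [ 'scroll@0', 'scroll@210', 'scroll@420' ]
```

Unlike debounce, the first event of a burst is emitted immediately and later ones are dropped, which suits continuous sources such as scrolling.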

observableStream.distinctUntilChanged() ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that emits each value from the source Observable only if it differs from the value immediately before it.
Example

// Example 1: Filtering out consecutive duplicate search terms
const searchInput = document.querySelector('#search-input');
const searchStream = new ObservableStream(subscriber => {
  searchInput.addEventListener('input', event => subscriber.next(event.target.value));
});
const distinctSearchStream = searchStream.distinctUntilChanged();
distinctSearchStream.subscribe({
  next: (searchTerm) => console.log('Search term:', searchTerm),
  error: (err) => console.error(err),
});

// Example 2: Filtering out consecutive duplicate API responses
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const distinctDataStream = observableStream.distinctUntilChanged();
distinctDataStream.subscribe({
  next: (data) => console.log('API data:', data),
  error: (err) => console.error(err),
});
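
This reference does not say how values are compared. The sketch below assumes strict equality (!==), which is a common choice but an assumption here; distinctUntilChangedModel is a hypothetical array-based helper, not part of ObservableStream.

```javascript
// Hypothetical model: keep a value only if it differs (by !==) from
// the value immediately before it.
function distinctUntilChangedModel(values) {
  return values.filter((value, i) => i === 0 || value !== values[i - 1]);
}

// Consecutive duplicates collapse; a repeat after a change is kept.
const typedTerms = ['a', 'a', 'ab', 'ab', 'a'];
const distinctTerms = distinctUntilChangedModel(typedTerms);
console.log(distinctTerms); // [ 'a', 'ab', 'a' ]

// Under strict equality, two separately parsed objects with the same
// content are different values, so both pass through.
const parsedResponses = [{ id: 1 }, { id: 1 }];
const distinctResponses = distinctUntilChangedModel(parsedResponses);
console.log(distinctResponses.length); // 2
```

If the comparison is indeed by identity, deduplicating API responses would need the payloads reduced to a comparable primitive first, for example with map.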

observableStream.concatMap(transformFn) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that emits the results of applying a given transform function to each value emitted by the source ObservableStream, sequentially.

Param Type Description
transformFn function The function to transform each value in the source ObservableStream

Example

// Example 1: Transforming a stream of search terms into a stream of search results
const searchInput = document.querySelector('#search-input');
const searchStream = new ObservableStream(subscriber => {
  searchInput.addEventListener('input', event => subscriber.next(event.target.value));
});
const resultsStream = searchStream.concatMap(searchTerm =>
  ObservableStream.from(fetch(`https://api.example.com/search?query=${encodeURIComponent(searchTerm)}`).then(response => response.json()))
);
resultsStream.subscribe({
  next: (results) => console.log('Search results:', results),
  error: (err) => console.error(err),
});

// Example 2: Transforming a stream of click events into a stream of clicked elements
const clickStream = new ObservableStream(subscriber => {
  document.addEventListener('click', event => subscriber.next(event.target));
});
const elementsStream = clickStream.concatMap(target =>
  ObservableStream.from(Promise.resolve(target))
);
elementsStream.subscribe({
  next: (element) => console.log('Clicked element:', element),
  error: (err) => console.error(err),
});

observableStream.combineLatest(...observables) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that emits an array with the latest values from each source ObservableStream, whenever any source ObservableStream emits.

Param Type Description
...observables ObservableStream The source ObservableStreams

Example

// Example 1: Combining multiple API data streams
const apiDataStream1 = fetch('https://api.example.com/data1').then(response => response.json());
const apiDataStream2 = fetch('https://api.example.com/data2').then(response => response.json());
const observableStream1 = ObservableStream.from(apiDataStream1);
const observableStream2 = ObservableStream.from(apiDataStream2);
const combinedStream = observableStream1.combineLatest(observableStream2);
combinedStream.subscribe({
  next: ([data1, data2]) => console.log('API data:', data1, data2),
  error: (err) => console.error(err),
});

// Example 2: Combining multiple user event streams
const clickStream = new ObservableStream(subscriber => {
  document.addEventListener('click', event => subscriber.next(event));
});
const scrollStream = new ObservableStream(subscriber => {
  window.addEventListener('scroll', event => subscriber.next(event));
});
const combinedEventStream = clickStream.combineLatest(scrollStream);
combinedEventStream.subscribe({
  next: ([clickEvent, scrollEvent]) => console.log('User events:', clickEvent, scrollEvent),
  error: (err) => console.error(err),
});
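
To see which arrays come out and when, the sketch below replays an interleaved sequence of emissions. It assumes the conventional rule that nothing is emitted until every source has produced at least one value, which this reference does not state; combineLatestModel and the { source, value } event shape are hypothetical.

```javascript
// Hypothetical model: `events` is the interleaved emission order, and
// `source` is the index of the stream that emitted the value.
function combineLatestModel(events, sourceCount) {
  const latest = new Array(sourceCount).fill(undefined);
  const seen = new Set();
  const output = [];
  for (const { source, value } of events) {
    latest[source] = value;
    seen.add(source);
    // Assumed rule: wait until every source has emitted at least once.
    if (seen.size === sourceCount) {
      output.push([...latest]);
    }
  }
  return output;
}

// Source 0 models clicks, source 1 models scroll events.
const interleaved = [
  { source: 0, value: 'click1' },
  { source: 0, value: 'click2' },
  { source: 1, value: 'scroll1' },
  { source: 0, value: 'click3' },
  { source: 1, value: 'scroll2' },
];

const combined = combineLatestModel(interleaved, 2);
console.log(combined);
// [ [ 'click2', 'scroll1' ], [ 'click3', 'scroll1' ], [ 'click3', 'scroll2' ] ]
```

Under this assumption 'click1' never appears in the output, because it was superseded before the scroll source produced its first value.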

observableStream.startWith(...initialValues) ⇒ ObservableStream

Kind: instance method of ObservableStream
Returns: ObservableStream - A new ObservableStream that emits the specified initial values, followed by all values emitted by the source ObservableStream.

Param Type Description
...initialValues any The initial values to start with

Example

// Example 1: Prepending an API data stream with a loading state
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const loadingStream = observableStream.startWith('loading');
loadingStream.subscribe({
  next: (state) => console.log('State:', state),
  error: (err) => console.error(err),
});

// Example 2: Prepending a user event stream with an initial event
const clickStream = new ObservableStream(subscriber => {
  document.addEventListener('click', event => subscriber.next(event));
});
const initialEvent = { type: 'initial' };
const eventStream = clickStream.startWith(initialEvent);
eventStream.subscribe({
  next: (event) => console.log('Event:', event),
  error: (err) => console.error(err),
});

ObservableStream.from(value) ⇒ ObservableStream

Kind: static method of ObservableStream
Returns: ObservableStream - A new ObservableStream that emits the values produced by the given value

Param Type Description
value any The value to create an Observable from

Example

// Example 1: Creating an ObservableStream from a user event stream
const clickStream = new ObservableStream(subscriber => {
  document.addEventListener('click', event => subscriber.next(event));
});
const observableStream = ObservableStream.from(clickStream);