Observable stream
ObservableStream ⇐ Observable
Kind: global class
Extends: Observable
- ObservableStream ⇐
Observable
- new ObservableStream()
- instance
- .map(transformFn) ⇒
ObservableStream
- .filter(predicateFn) ⇒
ObservableStream
- .reduce(reducerFn, initialValue) ⇒
Promise
- .takeUntil(notifier) ⇒
ObservableStream
- .take(n) ⇒
ObservableStream
- .drop(n) ⇒
ObservableStream
- .flatMap(transformFn) ⇒
ObservableStream
- .switchMap(transformFn) ⇒
ObservableStream
- .toArray() ⇒
Promise
- .forEach(callback) ⇒
Promise
- .every(predicate) ⇒
Promise
- .find(predicate) ⇒
Promise
- .some(predicate) ⇒
Promise
- .finally(callback) ⇒
ObservableStream
- .toState() ⇒
ObservableState
- .push(value)
- .plug(stream)
- .end()
- .catchError(fn) ⇒
ObservableStream
- .debounce(delay) ⇒
ObservableStream
- .tap(sideEffectFn) ⇒
ObservableStream
- .throttle(duration) ⇒
ObservableStream
- .distinctUntilChanged() ⇒
ObservableStream
- .concatMap(transformFn) ⇒
ObservableStream
- .combineLatest(...observables) ⇒
ObservableStream
- .startWith(...initialValues) ⇒
ObservableStream
- .map(transformFn) ⇒
- static
new ObservableStream()
ObservableStream class that extends Observable and provides additional methods for data transformation
observableStream.map(transformFn) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream instance with transformed data
Param | Type | Description |
---|---|---|
transformFn | function |
The function to transform the data |
Example
// Example 1: Transforming an API data stream
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const transformedStream = observableStream.map(data => data.map(item => item * 2));
// Example 2: Transforming a user event stream
const clickStream = new ObservableStream(subscriber => {
document.addEventListener('click', event => subscriber.next(event));
});
const observableStream = ObservableStream.from(clickStream);
const transformedStream = observableStream.map(event => ({ x: event.clientX, y: event.clientY }));
observableStream.filter(predicateFn) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream instance with filtered data
Param | Type | Description |
---|---|---|
predicateFn | function |
The function to filter the data |
Example
// Example 1: Filtering an API data stream
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const filteredStream = observableStream.filter(data => data.someProperty === 'someValue');
// Example 2: Filtering a user event stream
const clickStream = new ObservableStream(subscriber => {
document.addEventListener('click', event => subscriber.next(event));
});
const observableStream = ObservableStream.from(clickStream);
const filteredStream = observableStream.filter(event => event.target.id === 'someId');
observableStream.reduce(reducerFn, initialValue) ⇒ Promise
Kind: instance method of ObservableStream
Returns: Promise
- A promise that resolves with the reduced value
Param | Type | Description |
---|---|---|
reducerFn | function |
The function to reduce the data |
initialValue | any |
The initial value for the reducer |
Example
// Example 1: Reducing an API data stream
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const reducedValuePromise = observableStream.reduce((acc, data) => acc + data.someProperty, 0);
// Example 2: Reducing a user event stream
const clickStream = new ObservableStream(subscriber => {
document.addEventListener('click', event => subscriber.next(event));
});
const observableStream = ObservableStream.from(clickStream);
const reducedValuePromise = observableStream.reduce((acc, event) => acc + 1, 0);
observableStream.takeUntil(notifier) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that completes when the notifier emits
Param | Type | Description |
---|---|---|
notifier | Observable |
The Observable that will complete this Observable |
Example
// Example 1: Completing an API data stream when another stream emits
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const notifierStream = new ObservableStream(subscriber => {
setTimeout(() => subscriber.next(), 5000);
});
const completedStream = observableStream.takeUntil(notifierStream);
// Example 2: Completing a user event stream when another stream emits
const clickStream = new ObservableStream(subscriber => {
document.addEventListener('click', event => subscriber.next(event));
});
const observableStream = ObservableStream.from(clickStream);
const notifierStream = new ObservableStream(subscriber => {
setTimeout(() => subscriber.next(), 5000);
});
const completedStream = observableStream.takeUntil(notifierStream);
observableStream.take(n) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that completes after emitting n values
Param | Type | Description |
---|---|---|
n | number |
The number of values to take |
Example
// Example 1: Taking a certain number of values from an API data stream
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const takenStream = observableStream.take(5);
// Example 2: Taking a certain number of values from a user event stream
const clickStream = new ObservableStream(subscriber => {
document.addEventListener('click', event => subscriber.next(event));
});
const observableStream = ObservableStream.from(clickStream);
const takenStream = observableStream.take(5);
observableStream.drop(n) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that starts emitting after n values have been emitted
Param | Type | Description |
---|---|---|
n | number |
The number of values to drop |
Example
// Example 1: Dropping a certain number of values from an API data stream
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const droppedStream = observableStream.drop(5);
// Example 2: Dropping a certain number of values from a user event stream
const clickStream = new ObservableStream(subscriber => {
document.addEventListener('click', event => subscriber.next(event));
});
const observableStream = ObservableStream.from(clickStream);
const droppedStream = observableStream.drop(5);
observableStream.flatMap(transformFn) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that emits the values from the inner Observables
Param | Type | Description |
---|---|---|
transformFn | function |
The function to transform the data into Observables |
Example
// Example 1: Transforming an API data stream into inner Observables
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const flatMappedStream = observableStream.flatMap(data => ObservableStream.from(fetch(`https://api.example.com/data/${data.id}`).then(response => response.json())));
// Example 2: Transforming a user event stream into inner Observables
const clickStream = new ObservableStream(subscriber => {
document.addEventListener('click', event => subscriber.next(event));
});
const positionStream = clickStream.flatMap(event => ObservableStream.from({ x: event.clientX, y: event.clientY }));
// Example 3: Transforming a stream of search terms into a stream of search results
const searchTerms = new ObservableStream(subscriber => {
const input = document.querySelector('#search-input');
input.addEventListener('input', event => subscriber.next(event.target.value));
});
const searchResults = searchTerms.debounce(300).flatMap(term => ObservableStream.from(fetch(`https://api.example.com/search?q=${term}`).then(response => response.json())));
observableStream.switchMap(transformFn) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that emits the values from the inner Observables
Param | Type | Description |
---|---|---|
transformFn | function |
The function to transform the data into Observables |
Example
// Example 1: Transforming click events into Observables
const clickStream = new ObservableStream();
document.addEventListener('click', (event) => clickStream.push(event));
const positionStream = clickStream.switchMap((event) => {
return new ObservableStream((subscriber) => {
subscriber.push({ x: event.clientX, y: event.clientY });
subscriber.complete();
});
});
positionStream.subscribe({
next: (position) => console.log(`Clicked at position: ${position.x}, ${position.y}`),
error: (err) => console.error(err),
});
// Example 2: Transforming API responses into Observables
const apiStream = new ObservableStream();
fetch('https://api.example.com/data')
.then((response) => response.json())
.then((data) => apiStream.push(data))
.catch((error) => apiStream.error(error));
const transformedStream = apiStream.switchMap((data) => {
return new ObservableStream((subscriber) => {
subscriber.push(transformData(data));
subscriber.complete();
});
});
transformedStream.subscribe({
next: (transformedData) => console.log(transformedData),
error: (err) => console.error(err),
});
observableStream.toArray() ⇒ Promise
Kind: instance method of ObservableStream
Returns: Promise
- A promise that resolves with an array of all values emitted by the Observable
Example
// Example: Collecting all emitted values from an ObservableStream
const numberStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
numberStream.push(i);
}
numberStream.end();
numberStream.toArray().then((values) => console.log(values)); // Logs: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
observableStream.forEach(callback) ⇒ Promise
Kind: instance method of ObservableStream
Returns: Promise
- A promise that resolves when the Observable completes
Param | Type | Description |
---|---|---|
callback | function |
The function to call for each value emitted by the Observable |
Example
// Example: Logging each value emitted by an ObservableStream
const numberStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
numberStream.push(i);
}
numberStream.end();
numberStream.forEach((value) => console.log(value)); // Logs each number from 0 to 9
observableStream.every(predicate) ⇒ Promise
Kind: instance method of ObservableStream
Returns: Promise
- A promise that resolves with a boolean indicating whether every value satisfies the predicate
Param | Type | Description |
---|---|---|
predicate | function |
The function to test each value |
Example
// Example: Checking if all emitted values are even
const numberStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
numberStream.push(i);
}
numberStream.end();
numberStream.every((value) => value % 2 === 0).then((allEven) => console.log(allEven)); // Logs: false
observableStream.find(predicate) ⇒ Promise
Kind: instance method of ObservableStream
Returns: Promise
- A promise that resolves with the first value that satisfies the predicate
Param | Type | Description |
---|---|---|
predicate | function |
The function to test each value |
Example
// Example: Finding the first emitted value that is greater than 5
const numberStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
numberStream.push(i);
}
numberStream.end();
numberStream.find((value) => value > 5).then((value) => console.log(value)); // Logs: 6
observableStream.some(predicate) ⇒ Promise
Kind: instance method of ObservableStream
Returns: Promise
- A promise that resolves with a boolean indicating whether some value satisfies the predicate
Param | Type | Description |
---|---|---|
predicate | function |
The function to test each value |
Example
// Example: Checking if any emitted values are greater than 5
const numberStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
numberStream.push(i);
}
numberStream.end();
numberStream.some((value) => value > 5).then((anyGreaterThan5) => console.log(anyGreaterThan5)); // Logs: true
observableStream.finally(callback) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that calls the callback when it completes
Param | Type | Description |
---|---|---|
callback | function |
The function to call when the Observable completes |
Example
// Example: Logging a message when the ObservableStream completes
const numberStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
numberStream.push(i);
}
numberStream.end();
const finalStream = numberStream.finally(() => console.log('Stream completed'));
finalStream.subscribe({
next: (value) => console.log(value),
error: (err) => console.error(err),
}); // Logs each number from 0 to 9, then logs 'Stream completed'
observableStream.toState() ⇒ ObservableState
Converts the ObservableStream to an ObservableState
Kind: instance method of ObservableStream
Returns: ObservableState
- A new ObservableState that represents the current value of the stream
Example
// Example: Converting an ObservableStream to an ObservableState
const numberStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
numberStream.push(i);
}
numberStream.end();
const numberState = numberStream.toState();
numberState.subscribe({
next: (value) => console.log(value),
error: (err) => console.error(err),
}); // Logs each number from 0 to 9
observableStream.push(value)
Pushes a value to the observers. The value can be an Observable, an async iterable, an iterable, a Promise, or any other value.
Kind: instance method of ObservableStream
Param | Type | Description |
---|---|---|
value | any |
The value to push |
Example
// Example 1: Pushing values from an Observable
const sourceStream = new ObservableStream();
const targetStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
sourceStream.push(i);
}
sourceStream.end();
targetStream.push(sourceStream);
targetStream.subscribe({
next: (value) => console.log(value),
error: (err) => console.error(err),
}); // Logs each number from 0 to 9
// Example 2: Pushing values from a Promise
const promiseStream = new ObservableStream();
const promise = new Promise((resolve) => {
setTimeout(() => resolve('Hello, world!'), 1000);
});
promiseStream.push(promise);
promiseStream.subscribe({
next: (value) => console.log(value),
error: (err) => console.error(err),
}); // Logs 'Hello, world!' after 1 second
observableStream.plug(stream)
Subscribes to a stream and pushes its values to the observers.
Kind: instance method of ObservableStream
Param | Type | Description |
---|---|---|
stream | ObservableStream |
The stream to plug |
Example
// Example: Plugging one ObservableStream into another
const sourceStream = new ObservableStream();
const targetStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
sourceStream.push(i);
}
sourceStream.end();
targetStream.plug(sourceStream);
targetStream.subscribe({
next: (value) => console.log(value),
error: (err) => console.error(err),
}); // Logs each number from 0 to 9
observableStream.end()
Ends the stream by calling the complete method of each observer.
Kind: instance method of ObservableStream
Example
// Example: Ending an ObservableStream
const numberStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
numberStream.push(i);
}
numberStream.end();
numberStream.subscribe({
next: (value) => console.log(value),
error: (err) => console.error(err),
complete: () => console.log('Stream completed'),
}); // Logs each number from 0 to 9, then logs 'Stream completed'
observableStream.catchError(fn) ⇒ ObservableStream
Catches errors on the ObservableStream and replaces them with a new stream.
Kind: instance method of ObservableStream
Returns: ObservableStream
- - Returns a new ObservableStream that replaces the original stream when an error occurs.
Param | Type | Description |
---|---|---|
fn | function |
A function that receives the error and returns a new ObservableStream. |
Example
// Example: Catching and handling errors in an ObservableStream
const numberStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
if (i === 5) {
numberStream.error(new Error('Something went wrong'));
} else {
numberStream.push(i);
}
}
numberStream.end();
const errorHandledStream = numberStream.catchError((error) => {
console.error(error);
return new ObservableStream((subscriber) => {
subscriber.push('Error handled');
subscriber.complete();
});
});
errorHandledStream.subscribe({
next: (value) => console.log(value),
error: (err) => console.error(err),
}); // Logs each number from 0 to 4, logs the error, then logs 'Error handled'
observableStream.debounce(delay) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that emits the latest value after the debounce delay
Param | Type | Description |
---|---|---|
delay | number |
The debounce delay in milliseconds |
Example
// Example: Debouncing an ObservableStream of click events
const clickStream = new ObservableStream();
document.addEventListener('click', (event) => clickStream.push(event));
const debouncedStream = clickStream.debounce(500);
debouncedStream.subscribe({
next: (event) => console.log(`Clicked at position: ${event.clientX}, ${event.clientY}`),
error: (err) => console.error(err),
}); // Logs the position of the last click event that occurred at least 500 milliseconds after the previous click event
observableStream.tap(sideEffectFn) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that is identical to the source
Param | Type | Description |
---|---|---|
sideEffectFn | function |
The function to perform side effect |
Example
// Example: Logging each value emitted by an ObservableStream
const numberStream = new ObservableStream();
for (let i = 0; i < 10; i++) {
numberStream.push(i);
}
numberStream.end();
const loggedStream = numberStream.tap((value) => console.log(value));
loggedStream.subscribe({
next: (value) => {},
error: (err) => console.error(err),
}); // Logs each number from 0 to 9
observableStream.throttle(duration) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that emits a value then ignores subsequent source values for duration milliseconds, then repeats this process.
Param | Type | Description |
---|---|---|
duration | number |
The throttle duration in milliseconds |
Example
// Example 1: Throttling scroll events
const scrollStream = new ObservableStream(subscriber => {
window.addEventListener('scroll', event => subscriber.next(event));
});
const throttledScrollStream = scrollStream.throttle(200);
throttledScrollStream.subscribe({
next: (event) => console.log('Scroll event:', event),
error: (err) => console.error(err),
});
// Example 2: Throttling search input for autocomplete
const searchInput = document.querySelector('#search-input');
const searchStream = new ObservableStream(subscriber => {
searchInput.addEventListener('input', event => subscriber.next(event.target.value));
});
const throttledSearchStream = searchStream.throttle(300);
throttledSearchStream.subscribe({
next: (searchTerm) => console.log('Search term:', searchTerm),
error: (err) => console.error(err),
});
observableStream.distinctUntilChanged() ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that emits all items emitted by the source Observable that are distinct by comparison from the previous item.
Example
// Example 1: Filtering out consecutive duplicate search terms
const searchInput = document.querySelector('#search-input');
const searchStream = new ObservableStream(subscriber => {
searchInput.addEventListener('input', event => subscriber.next(event.target.value));
});
const distinctSearchStream = searchStream.distinctUntilChanged();
distinctSearchStream.subscribe({
next: (searchTerm) => console.log('Search term:', searchTerm),
error: (err) => console.error(err),
});
// Example 2: Filtering out consecutive duplicate API responses
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const distinctDataStream = observableStream.distinctUntilChanged();
distinctDataStream.subscribe({
next: (data) => console.log('API data:', data),
error: (err) => console.error(err),
});
observableStream.concatMap(transformFn) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that emits the results of applying a given transform function to each value emitted by the source ObservableStream, sequentially.
Param | Type | Description |
---|---|---|
transformFn | function |
The function to transform each value in the source ObservableStream |
Example
// Example 1: Transforming a stream of search terms into a stream of search results
const searchInput = document.querySelector('#search-input');
const searchStream = new ObservableStream(subscriber => {
searchInput.addEventListener('input', event => subscriber.next(event.target.value));
});
const resultsStream = searchStream.concatMap(searchTerm =>
ObservableStream.from(fetch(`https://api.example.com/search?query=${searchTerm}`).then(response => response.json()))
);
resultsStream.subscribe({
next: (results) => console.log('Search results:', results),
error: (err) => console.error(err),
});
// Example 2: Transforming a stream of click events into a stream of clicked elements
const clickStream = new ObservableStream(subscriber => {
document.addEventListener('click', event => subscriber.next(event.target));
});
const elementsStream = clickStream.concatMap(target =>
ObservableStream.from(Promise.resolve(target))
);
elementsStream.subscribe({
next: (element) => console.log('Clicked element:', element),
error: (err) => console.error(err),
});
observableStream.combineLatest(...observables) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that emits an array with the latest values from each source ObservableStream, whenever any source ObservableStream emits.
Param | Type | Description |
---|---|---|
...observables | ObservableStream |
The source ObservableStreams |
Example
// Example 1: Combining multiple API data streams
const apiDataStream1 = fetch('https://api.example.com/data1').then(response => response.json());
const apiDataStream2 = fetch('https://api.example.com/data2').then(response => response.json());
const observableStream1 = ObservableStream.from(apiDataStream1);
const observableStream2 = ObservableStream.from(apiDataStream2);
const combinedStream = observableStream1.combineLatest(observableStream2);
combinedStream.subscribe({
next: ([data1, data2]) => console.log('API data:', data1, data2),
error: (err) => console.error(err),
});
// Example 2: Combining multiple user event streams
const clickStream = new ObservableStream(subscriber => {
document.addEventListener('click', event => subscriber.next(event));
});
const scrollStream = new ObservableStream(subscriber => {
window.addEventListener('scroll', event => subscriber.next(event));
});
const combinedStream = clickStream.combineLatest(scrollStream);
combinedStream.subscribe({
next: ([clickEvent, scrollEvent]) => console.log('User events:', clickEvent, scrollEvent),
error: (err) => console.error(err),
});
observableStream.startWith(...initialValues) ⇒ ObservableStream
Kind: instance method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that emits the specified initial values, followed by all values emitted by the source ObservableStream.
Param | Type | Description |
---|---|---|
...initialValues | any |
The initial values to start with |
Example
// Example 1: Prepending an API data stream with a loading state
const apiDataStream = fetch('https://api.example.com/data').then(response => response.json());
const observableStream = ObservableStream.from(apiDataStream);
const loadingStream = observableStream.startWith('loading');
loadingStream.subscribe({
next: (state) => console.log('State:', state),
error: (err) => console.error(err),
});
// Example 2: Prepending a user event stream with an initial event
const clickStream = new ObservableStream(subscriber => {
document.addEventListener('click', event => subscriber.next(event));
});
const initialEvent = { type: 'initial' };
const eventStream = clickStream.startWith(initialEvent);
eventStream.subscribe({
next: (event) => console.log('Event:', event),
error: (err) => console.error(err),
});
ObservableStream.from(value) ⇒ ObservableStream
Kind: static method of ObservableStream
Returns: ObservableStream
- A new ObservableStream that emits the values from the value
Param | Type | Description |
---|---|---|
value | any |
The value to create an Observable from |
Example