Observable
public class Observable<Element> : Subscribable, ObservableConvertible
An abstract base class for observable sequences.
Subclasses must override subscribe(_:) to provide the subscription behavior.
For a callback-style observable, use Observables.create and provide a SubscriptionHandler.
Completion contract: After onComplete() is forwarded to a downstream observer,
no further onNext events will be delivered through that subscription.
The subscription is considered terminated and its resources are released.
Warning
This class is not generically Thread safe. Usage, including operators, subscription and emission of events, must be done from a single queue. When the source emits on a different queue than the caller, usesubscribeOn immediately before subscribing,
to ensure subscription happens on the source’s queue.
-
A handler called upon subscription to an observable with the given observer.
Declaration
Swift
public typealias SubscriptionHandler = (any Observer<Element>) -> any Disposable -
Subscribes an
Observerto receive elements and completion.Declaration
Swift
@discardableResult public func subscribe<O>(_ observer: O) -> any Disposable where Element == O.Element, O : ObserverReturn Value
A
Disposablerepresenting the subscription. Disposing it cancels event delivery (the observer will no longer receiveonNextoronComplete). -
Ensures that the subscription to the source observable happens on the provided queue.
Warning
Returns aSubscribable, not anObservable. Do not chain operators after this — operators use non-thread-safeObservers that would race if the upstream emits on the specified queue while disposal happens from the caller’s thread. Usesubscribe(onNext:onComplete:)as the final step aftersubscribeOn.Declaration
Swift
func subscribeOn(_ queue: TealiumQueue) -> some Subscribable<Element> -
Ensures that downstream observers receive events on the provided queue.
Warning
The subscription (and therefore disposal) must happen from the same queue. Disposing from a different thread races with event delivery on the specified queue.Declaration
Swift
func observeOn(_ queue: TealiumQueue) -> Observable<Element> -
Transforms the events provided to the observable into new events before calling the observers of the new observable.
Declaration
Swift
func map<Result>(_ transform: @escaping (Element) -> Result) -> Observable<Result> -
Transforms the events provided to the observable into new events, stripping out the nil events, before calling the observers of the new observable.
Declaration
Swift
func compactMap<Result>(_ transform: @escaping (Element) -> Result?) -> Observable<Result> -
Only report the events that are included by the provided filter.
Declaration
Swift
func filter(_ isIncluded: @escaping (Element) -> Bool) -> Observable<Element> -
Returns an observable that emits subsequent values only if they are different from the last one emitted by the underlying observable.
Declaration
Swift
func distinct(isEqual: @escaping (Element, Element) -> Bool) -> Observable<Element> -
Returns an observable that ignores the first N emitted events.
Declaration
Swift
func ignore(_ count: Int) -> Observable<Element> -
Returns an observable that ignores the first emitted event.
Declaration
Swift
func ignoreFirst() -> Observable<Element> -
On subscription emits the provided elements before providing the other events from the original observable.
Declaration
Swift
func startWith(_ elements: Element...) -> Observable<Element> -
Transforms each event into a new observable, subscribing to all of them and flattening their emissions.
Completion: Completes only when the upstream AND all inner observables have completed.
Declaration
Swift
func flatMap<Result>(_ selector: @escaping (Element) -> Observable<Result>) -> Observable<Result>Parameters
selectorthe function that will return a new observable when an event is emitted by the original observable.
Return Value
an observable that flattens the observables returned by the selector and emits all of their events.
-
Transforms an event by providing a new observable that is flattened in the observable that is returned by this method. Every new observable returned will cancel the old observable subscriptions, therefore only emitting events for the latest returned observable.
Warning
If the observable returned fromselector, on subscription, synchronously emits a new element upstream, then the selector will be triggered again. This can cause a endless loop in which we endlessly resubscribe to the returned observable. If this is the case, make sure to have an exit condition, from which the subscription doesn’t publish elements upstream anymore, to avoid blocking the thread in which this operator is being called.As a very simplified example, the following code causes an endless loop:
let subject = Subject<Int>() _ = subject.asObservable().flatMapLatest { value in Observable<Int> { observer in // This is the block that is called on each `subscribe` call subject.onNext(value + 1) observer.onNext(value) return Subscription(onDispose: {}) } }.subscribe { _ in } subject.onNext(0)The following, instead, has an exit condition, so it’s safe to use:
let subject = Subject<Int>() _ = subject.asObservable().flatMapLatest { value in Observable<Int> { observer in // This is the block that is called on each `subscribe` call if value < 10 { subject.onNext(value + 1) } observer.onNext(value) return Subscription(onDispose: {}) } }.subscribe { _ in } subject.onNext(0)Note
more complex examples can be created where the upstream emit is less clear, so use this with caution.
Declaration
Swift
func flatMapLatest<Result>(_ selector: @escaping (Element) -> Observable<Result>) -> Observable<Result>Parameters
selectorthe function that will return a new observable when an event is emitted by the original observable.
Return Value
an observable that flattens the observable returned by the selector and emits all of the events from the latest returned observable.
-
Returns a new observable that emits the events of the original observable and all other observables passed as parameters.
Completion: Completes only when all merged sources have completed.
Declaration
Swift
func merge(_ otherObservables: Observable<Element>...) -> Observable<Element> -
Returns an observable that emits only the first event matching the filter, then completes.
If you don’t provide a block then the first event will always be taken.
Completion: Completes immediately after the first matching element is emitted. Also completes (without emitting) if the upstream completes before a match is found.
Declaration
Swift
func first(where isIncluded: @escaping (Element) -> Bool = { _ in true }) -> Observable<Element> -
Combines the latest values from this observable and the provided observable into a tuple.
The first emission occurs once both observables have emitted at least one event. After that, a new tuple is emitted each time either source emits a new value.
Completion: Completes when both sources complete, or early if either source completes without ever having emitted a value.
Declaration
Swift
func combineLatest<Other>(_ otherObservable: Observable<Other>) -> Observable<(Element, Other)> -
Unsubscribes and subscribes again on each event while the condition is met.
This is mainly used for cold observables that, when subscribed, start a new stream from zero. Use when you want to trigger the underlying observable to restart every time.
Completion: Completes when the upstream completes without emitting a matching element, or when the predicate returns
false.Warning
If the underlying observable always emits a new event and the condition is always met, this will end up calling endlessly until, eventually, the app will crash for stack overflow or out of memory exceptions. You need to treat the underlying observable as a recursive function and make sure there is an exit condition.Declaration
Swift
func resubscribingWhile(_ isIncluded: @escaping (Element) -> Bool) -> Observable<Element> -
Returns an observable that emits elements while the condition is met, then completes. If
inclusiveistrue, the first element failing the condition is also emitted before completion.Completion: Completes when the predicate returns
false, or when the upstream completes.Declaration
Swift
func takeWhile(_ isIncluded: @escaping (Element) -> Bool, inclusive: Bool = false) -> Observable<Element> -
Returns a
Singlethat only emits the first value from the underlying observable, on the givenTealiumQueue.Declaration
Swift
func asSingle(queue: TealiumQueue) -> Single<Element> -
Returns an observable that will emit values in a possibly asynchronous manner determined by the given
block.Declaration
Swift
func callback<Result>(from block: @escaping (_ element: Element, _ completion: @escaping (Result) -> Void) -> Void) -> Observable<Result>Parameters
blocka block of code, to be executed with the next value from the source, along with the observer with which to emit downstream.
-
Returns an observable that will emit values in a possibly asynchronous manner determined by the given
block.Declaration
Swift
func callback<Result>(fromDisposable block: @escaping (_ element: Element, _ completion: @escaping (Result) -> Void) -> any Disposable) -> Observable<Result>Parameters
blocka block of code, to be executed with the next value from the source, along with the observer with which to emit downstream, disposable by the returned
Disposableobject. -
Returns an observable that will emit the last element received after the provided delay if no other event is emitted in the meantime.
If the provided milliseconds are 0 or less, this behaves like an
observeOn, dispatching events immediately on the provided queue, potentially synchronously if the element was emitted from that same queue.Declaration
Swift
func debounce(_ milliseconds: Int, on queue: TealiumQueue) -> Observable<Element>Parameters
millisecondsThe time to wait before emitting the elements
queueThe queue on which the debouncer is working on.
-
Returns an observable that will emit each value received after the provided delay.
If the provided milliseconds are 0 or less, this behaves like an
observeOn, dispatching events immediately on the provided queue, potentially synchronously if the element was emitted from that same queue.Declaration
Swift
func delay(_ milliseconds: Int, on queue: TealiumQueue) -> Observable<Element>Parameters
millisecondsThe time to wait before emitting the elements
queueThe queue on which the debouncer is working on.
-
Subscribes the observer only once and then automatically disposes it.
This is meant to be used when you only need one observer to be registered once. Use the standalone
first()operator if multiple observers all need to register for one event.Warning
This method must be called from the thread on which the
Observableemits events. Calling from a different thread can cause race conditions and crashes.Declaration
Swift
@discardableResult func subscribeOnce(_ observer: @escaping (Element) -> Void) -> any DisposableReturn Value
a
Disposablethat can be used to dispose this observer before the first event is sent to the observer, in case it’s not needed any longer. -
Subscribes the given
observerto receive updates, including anObserver.onCompletenotification when the upstream source terminates. The subscription is stored in the givencomposite.Upon the
Observer.onCompletesignal, theobserveris removed from the givencomposite.Warning
adding to the
compositehappens on the caller thread, and removals may happen on a different one. As such, users should be certain that eithercompositeis thread-safe, or that the caller and disposal thread are the same.Declaration
Swift
@discardableResult func subscribe(composite: any CompositeDisposable, observer: any Observer<Element>) -> any DisposableReturn Value
A
Disposablefor cancelling the subscription. Can be ignored as it will be stored in the composite until completed.
-
Only emits new events if the last one is different from the new one.
Declaration
Swift
func distinct() -> Observable<Element>
View on GitHub