Observable

public class Observable<Element> : Subscribable, ObservableConvertible

An abstract base class for observable sequences.

Subclasses must override subscribe(_:) to provide the subscription behavior. For a callback-style observable, use Observables.create and provide a SubscriptionHandler.

Completion contract: After onComplete() is forwarded to a downstream observer, no further onNext events will be delivered through that subscription. The subscription is considered terminated and its resources are released.

Warning

This class is not generically Thread safe. Usage, including operators, subscription and emission of events, must be done from a single queue. When the source emits on a different queue than the caller, use subscribeOn immediately before subscribing, to ensure subscription happens on the source’s queue.
  • A handler called upon subscription to an observable with the given observer.

    Declaration

    Swift

    public typealias SubscriptionHandler = (any Observer<Element>) -> any Disposable
  • Subscribes an Observer to receive elements and completion.

    Declaration

    Swift

    @discardableResult
    public func subscribe<O>(_ observer: O) -> any Disposable where Element == O.Element, O : Observer

    Return Value

    A Disposable representing the subscription. Disposing it cancels event delivery (the observer will no longer receive onNext or onComplete).

  • Ensures that the subscription to the source observable happens on the provided queue.

    Warning

    Returns a Subscribable, not an Observable. Do not chain operators after this — operators use non-thread-safe Observers that would race if the upstream emits on the specified queue while disposal happens from the caller’s thread. Use subscribe(onNext:onComplete:) as the final step after subscribeOn.

    Declaration

    Swift

    func subscribeOn(_ queue: TealiumQueue) -> some Subscribable<Element>
  • Ensures that downstream observers receive events on the provided queue.

    Warning

    The subscription (and therefore disposal) must happen from the same queue. Disposing from a different thread races with event delivery on the specified queue.

    Declaration

    Swift

    func observeOn(_ queue: TealiumQueue) -> Observable<Element>
  • Transforms the events provided to the observable into new events before calling the observers of the new observable.

    Declaration

    Swift

    func map<Result>(_ transform: @escaping (Element) -> Result) -> Observable<Result>
  • Transforms the events provided to the observable into new events, stripping out the nil events, before calling the observers of the new observable.

    Declaration

    Swift

    func compactMap<Result>(_ transform: @escaping (Element) -> Result?) -> Observable<Result>
  • Only report the events that are included by the provided filter.

    Declaration

    Swift

    func filter(_ isIncluded: @escaping (Element) -> Bool) -> Observable<Element>
  • Returns an observable that emits subsequent values only if they are different from the last one emitted by the underlying observable.

    Declaration

    Swift

    func distinct(isEqual: @escaping (Element, Element) -> Bool) -> Observable<Element>
  • Returns an observable that ignores the first N emitted events.

    Declaration

    Swift

    func ignore(_ count: Int) -> Observable<Element>
  • Returns an observable that ignores the first emitted event.

    Declaration

    Swift

    func ignoreFirst() -> Observable<Element>
  • On subscription emits the provided elements before providing the other events from the original observable.

    Declaration

    Swift

    func startWith(_ elements: Element...) -> Observable<Element>
  • Transforms each event into a new observable, subscribing to all of them and flattening their emissions.

    Completion: Completes only when the upstream AND all inner observables have completed.

    Declaration

    Swift

    func flatMap<Result>(_ selector: @escaping (Element) -> Observable<Result>) -> Observable<Result>

    Parameters

    selector

    the function that will return a new observable when an event is emitted by the original observable.

    Return Value

    an observable that flattens the observables returned by the selector and emits all of their events.

  • Transforms an event by providing a new observable that is flattened in the observable that is returned by this method. Every new observable returned will cancel the old observable subscriptions, therefore only emitting events for the latest returned observable.

    Warning

    If the observable returned from selector, on subscription, synchronously emits a new element upstream, then the selector will be triggered again. This can cause a endless loop in which we endlessly resubscribe to the returned observable. If this is the case, make sure to have an exit condition, from which the subscription doesn’t publish elements upstream anymore, to avoid blocking the thread in which this operator is being called.

    As a very simplified example, the following code causes an endless loop:

     let subject = Subject<Int>()
     _ = subject.asObservable().flatMapLatest { value in
         Observable<Int> { observer in
             // This is the block that is called on each `subscribe` call
             subject.onNext(value + 1)
             observer.onNext(value)
             return Subscription(onDispose: {})
         }
     }.subscribe { _ in }
     subject.onNext(0)
    

    The following, instead, has an exit condition, so it’s safe to use:

     let subject = Subject<Int>()
     _ = subject.asObservable().flatMapLatest { value in
         Observable<Int> { observer in
             // This is the block that is called on each `subscribe` call
             if value < 10 {
                 subject.onNext(value + 1)
             }
             observer.onNext(value)
             return Subscription(onDispose: {})
         }
     }.subscribe { _ in }
     subject.onNext(0)
    

    Note

    more complex examples can be created where the upstream emit is less clear, so use this with caution.

    Declaration

    Swift

    func flatMapLatest<Result>(_ selector: @escaping (Element) -> Observable<Result>) -> Observable<Result>

    Parameters

    selector

    the function that will return a new observable when an event is emitted by the original observable.

    Return Value

    an observable that flattens the observable returned by the selector and emits all of the events from the latest returned observable.

  • Returns a new observable that emits the events of the original observable and all other observables passed as parameters.

    Completion: Completes only when all merged sources have completed.

    Declaration

    Swift

    func merge(_ otherObservables: Observable<Element>...) -> Observable<Element>
  • Returns an observable that emits only the first event matching the filter, then completes.

    If you don’t provide a block then the first event will always be taken.

    Completion: Completes immediately after the first matching element is emitted. Also completes (without emitting) if the upstream completes before a match is found.

    Declaration

    Swift

    func first(where isIncluded: @escaping (Element) -> Bool = { _ in true }) -> Observable<Element>
  • Combines the latest values from this observable and the provided observable into a tuple.

    The first emission occurs once both observables have emitted at least one event. After that, a new tuple is emitted each time either source emits a new value.

    Completion: Completes when both sources complete, or early if either source completes without ever having emitted a value.

    Declaration

    Swift

    func combineLatest<Other>(_ otherObservable: Observable<Other>) -> Observable<(Element, Other)>
  • Unsubscribes and subscribes again on each event while the condition is met.

    This is mainly used for cold observables that, when subscribed, start a new stream from zero. Use when you want to trigger the underlying observable to restart every time.

    Completion: Completes when the upstream completes without emitting a matching element, or when the predicate returns false.

    Warning

    If the underlying observable always emits a new event and the condition is always met, this will end up calling endlessly until, eventually, the app will crash for stack overflow or out of memory exceptions. You need to treat the underlying observable as a recursive function and make sure there is an exit condition.

    Declaration

    Swift

    func resubscribingWhile(_ isIncluded: @escaping (Element) -> Bool) -> Observable<Element>
  • Returns an observable that emits elements while the condition is met, then completes. If inclusive is true, the first element failing the condition is also emitted before completion.

    Completion: Completes when the predicate returns false, or when the upstream completes.

    Declaration

    Swift

    func takeWhile(_ isIncluded: @escaping (Element) -> Bool, inclusive: Bool = false) -> Observable<Element>
  • Returns a Single that only emits the first value from the underlying observable, on the given TealiumQueue.

    Declaration

    Swift

    func asSingle(queue: TealiumQueue) -> Single<Element>
  • Returns an observable that will emit values in a possibly asynchronous manner determined by the given block.

    Declaration

    Swift

    func callback<Result>(from block: @escaping (_ element: Element, _ completion: @escaping (Result) -> Void) -> Void) -> Observable<Result>

    Parameters

    block

    a block of code, to be executed with the next value from the source, along with the observer with which to emit downstream.

  • Returns an observable that will emit values in a possibly asynchronous manner determined by the given block.

    Declaration

    Swift

    func callback<Result>(fromDisposable block: @escaping (_ element: Element, _ completion: @escaping (Result) -> Void) -> any Disposable) -> Observable<Result>

    Parameters

    block

    a block of code, to be executed with the next value from the source, along with the observer with which to emit downstream, disposable by the returned Disposable object.

  • Returns an observable that will emit the last element received after the provided delay if no other event is emitted in the meantime.

    If the provided milliseconds are 0 or less, this behaves like an observeOn, dispatching events immediately on the provided queue, potentially synchronously if the element was emitted from that same queue.

    Declaration

    Swift

    func debounce(_ milliseconds: Int, on queue: TealiumQueue) -> Observable<Element>

    Parameters

    milliseconds

    The time to wait before emitting the elements

    queue

    The queue on which the debouncer is working on.

  • Returns an observable that will emit each value received after the provided delay.

    If the provided milliseconds are 0 or less, this behaves like an observeOn, dispatching events immediately on the provided queue, potentially synchronously if the element was emitted from that same queue.

    Declaration

    Swift

    func delay(_ milliseconds: Int, on queue: TealiumQueue) -> Observable<Element>

    Parameters

    milliseconds

    The time to wait before emitting the elements

    queue

    The queue on which the debouncer is working on.

  • Subscribes the observer only once and then automatically disposes it.

    This is meant to be used when you only need one observer to be registered once. Use the standalone first() operator if multiple observers all need to register for one event.

    Warning

    This method must be called from the thread on which the Observable emits events. Calling from a different thread can cause race conditions and crashes.

    Declaration

    Swift

    @discardableResult
    func subscribeOnce(_ observer: @escaping (Element) -> Void) -> any Disposable

    Return Value

    a Disposable that can be used to dispose this observer before the first event is sent to the observer, in case it’s not needed any longer.

  • Subscribes the given observer to receive updates, including an Observer.onComplete notification when the upstream source terminates. The subscription is stored in the given composite.

    Upon the Observer.onComplete signal, the observer is removed from the given composite.

    Warning

    adding to the composite happens on the caller thread, and removals may happen on a different one. As such, users should be certain that either composite is thread-safe, or that the caller and disposal thread are the same.

    Declaration

    Swift

    @discardableResult
    func subscribe(composite: any CompositeDisposable, observer: any Observer<Element>) -> any Disposable

    Return Value

    A Disposable for cancelling the subscription. Can be ignored as it will be stored in the composite until completed.

Available where Element: Equatable

  • Only emits new events if the last one is different from the new one.

    Declaration

    Swift

    func distinct() -> Observable<Element>