Observable

public class Observable<Element> : Subscribable

A class that lets you subscribe to events until the returned subscription is disposed.

  • A handler that is called with the observer each time the observable is subscribed to.

    Declaration

    Swift

    public typealias SubscriptionHandler = (@escaping Observer) -> Disposable
  • Declaration

    Swift

    public func subscribe(_ observer: @escaping Observer) -> Disposable
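
    The relationship between the subscription handler and subscribe can be sketched as follows. MiniObservable and MiniDisposable are hypothetical stand-ins written for this example, not the library's types, and the sketch assumes that Observer is a closure of type (Element) -> Void.

    ```swift
    // Hypothetical stand-ins for illustration only; not the library's implementation.
    final class MiniDisposable {
        private var onDispose: (() -> Void)?

        init(_ onDispose: @escaping () -> Void) { self.onDispose = onDispose }

        // Runs the cleanup closure at most once.
        func dispose() {
            onDispose?()
            onDispose = nil
        }
    }

    struct MiniObservable<Element> {
        typealias Observer = (Element) -> Void   // assumed shape of an observer
        typealias SubscriptionHandler = (@escaping Observer) -> MiniDisposable

        let handler: SubscriptionHandler

        // Every subscribe call runs the handler again with the new observer.
        func subscribe(_ observer: @escaping Observer) -> MiniDisposable {
            handler(observer)
        }
    }

    func runSubscribeDemo() -> (events: [Int], disposed: Bool) {
        var events: [Int] = []
        var disposed = false
        let numbers = MiniObservable<Int> { observer in
            observer(1)
            observer(2)
            return MiniDisposable { disposed = true }
        }
        let subscription = numbers.subscribe { events.append($0) }
        subscription.dispose()
        return (events, disposed)
    }

    let subscribeDemo = runSubscribeDemo()
    print(subscribeDemo.events, subscribeDemo.disposed)
    ```

    In this sketch the handler emits synchronously during subscribe, and disposing the returned object runs the cleanup closure that the handler provided.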
  • Ensures that Observers to the returned observable are always subscribed on the provided queue.

    Warning

    Must be called as the last item in the observable chain. Otherwise, subsequent operators will be subscribed on the calling thread.

    Declaration

    Swift

    func subscribeOn(_ queue: TealiumQueue) -> any Subscribable<Element>
  • Ensures that Observers to the returned observable are always called on the provided queue.

    Declaration

    Swift

    func observeOn(_ queue: TealiumQueue) -> Observable<Element>
  • Transforms the events provided to the observable into new events before calling the observers of the new observable.

    Declaration

    Swift

    func map<Result>(_ transform: @escaping (Element) -> Result) -> Observable<Result>
  • Transforms the events provided to the observable into new events, stripping out the nil events, before calling the observers of the new observable.

    Declaration

    Swift

    func compactMap<Result>(_ transform: @escaping (Element) -> Result?) -> Observable<Result>
  • Only reports the events that are included by the provided filter.

    Declaration

    Swift

    func filter(_ isIncluded: @escaping (Element) -> Bool) -> Observable<Element>
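
    As a rough illustration of how map, compactMap and filter compose, here is a minimal sketch. MiniObservable and its from(_:) helper are hypothetical stand-ins, not the library's implementation, and the sketch leaves out disposal to stay short.

    ```swift
    // Hypothetical stand-in for illustration only; not the library's implementation.
    struct MiniObservable<Element> {
        typealias Observer = (Element) -> Void   // assumed shape of an observer

        let handler: (@escaping Observer) -> Void

        func subscribe(_ observer: @escaping Observer) { handler(observer) }

        // Hypothetical helper: emits the given elements on every subscription.
        static func from(_ elements: [Element]) -> MiniObservable<Element> {
            MiniObservable<Element> { observer in elements.forEach(observer) }
        }

        func map<Result>(_ transform: @escaping (Element) -> Result) -> MiniObservable<Result> {
            MiniObservable<Result> { observer in
                self.subscribe { element in observer(transform(element)) }
            }
        }

        func compactMap<Result>(_ transform: @escaping (Element) -> Result?) -> MiniObservable<Result> {
            MiniObservable<Result> { observer in
                self.subscribe { element in
                    // nil results are dropped instead of being forwarded.
                    if let value = transform(element) { observer(value) }
                }
            }
        }

        func filter(_ isIncluded: @escaping (Element) -> Bool) -> MiniObservable<Element> {
            MiniObservable<Element> { observer in
                self.subscribe { element in
                    if isIncluded(element) { observer(element) }
                }
            }
        }
    }

    func runOperatorDemo() -> [String] {
        var output: [String] = []
        MiniObservable.from(["1", "two", "3", "4"])
            .compactMap { Int($0) }      // drops "two", because Int("two") is nil
            .filter { $0 % 2 == 1 }      // keeps the odd numbers 1 and 3
            .map { "odd \($0)" }         // turns each number back into a String
            .subscribe { output.append($0) }
        return output
    }

    let operatorDemo = runOperatorDemo()
    print(operatorDemo)
    ```

    Each operator returns a new observable that subscribes to the previous one, so nothing runs until the final subscribe call.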
  • Transforms an event by providing a new observable that is flattened into the observable returned by this method.

    Declaration

    Swift

    func flatMap<Result>(_ selector: @escaping (Element) -> Observable<Result>) -> Observable<Result>

    Parameters

    selector

    the function that will return a new observable when an event is published by the original observable.

    Return Value

    an observable that flattens the observables returned by the selector and emits all of their events.

  • Transforms an event by providing a new observable that is flattened into the observable returned by this method. Each newly returned observable cancels the subscription to the previous one, so only events from the latest returned observable are emitted.

    Warning

    If the observable returned from selector synchronously publishes a new element upstream when it is subscribed to, the selector is triggered again. This can cause an endless loop in which the returned observable is resubscribed to over and over. If this can happen, make sure there is an exit condition after which the subscription no longer publishes elements upstream, to avoid blocking the thread on which this operator is called.

    As a very simplified example, the following code causes an endless loop:

     let subject = Subject<Int>()
     _ = subject.asObservable().flatMapLatest { value in
         Observable<Int> { observer in
             // This is the block that is called on each `subscribe` call
             subject.publish(value + 1)
             observer(value)
             return Subscription(unsubscribe: {})
         }
     }.subscribe { _ in }
     subject.publish(0)
    

    The following, instead, has an exit condition, so it’s safe to use:

     let subject = Subject<Int>()
     _ = subject.asObservable().flatMapLatest { value in
         Observable<Int> { observer in
             // This is the block that is called on each `subscribe` call
             if value < 10 {
                 subject.publish(value + 1)
             }
             observer(value)
             return Subscription(unsubscribe: {})
         }
     }.subscribe { _ in }
     subject.publish(0)
    

    Note

    More complex examples can be created where the upstream publish is less obvious, so use this operator with caution.

    Declaration

    Swift

    func flatMapLatest<Result>(_ selector: @escaping (Element) -> Observable<Result>) -> Observable<Result>

    Parameters

    selector

    the function that will return a new observable when an event is published by the original observable.

    Return Value

    an observable that flattens the observable returned by the selector and emits all of the events from the latest returned observable.
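
    The difference between flatMap and flatMapLatest can be sketched with the following simplified model. MiniObservable, MiniSubject and Cancel are hypothetical stand-ins written for this example, not the library's types; only the cancel-the-previous-subscription behavior described above is modeled.

    ```swift
    // Hypothetical stand-ins for illustration only; not the library's implementation.
    typealias Cancel = () -> Void

    struct MiniObservable<Element> {
        let handler: (@escaping (Element) -> Void) -> Cancel

        func subscribe(_ observer: @escaping (Element) -> Void) -> Cancel { handler(observer) }

        // Keeps every inner subscription alive.
        func flatMap<Result>(_ selector: @escaping (Element) -> MiniObservable<Result>) -> MiniObservable<Result> {
            MiniObservable<Result> { observer in
                var inner: [Cancel] = []
                let outer = self.subscribe { element in
                    let cancel = selector(element).subscribe(observer)
                    inner.append(cancel)
                }
                return {
                    outer()
                    inner.forEach { $0() }
                }
            }
        }

        // Cancels the previous inner subscription before subscribing to the new one.
        func flatMapLatest<Result>(_ selector: @escaping (Element) -> MiniObservable<Result>) -> MiniObservable<Result> {
            MiniObservable<Result> { observer in
                var latest: Cancel?
                let outer = self.subscribe { element in
                    latest?()
                    latest = selector(element).subscribe(observer)
                }
                return {
                    outer()
                    latest?()
                }
            }
        }
    }

    final class MiniSubject<Element> {
        private var observers: [Int: (Element) -> Void] = [:]
        private var nextID = 0

        func publish(_ element: Element) {
            // Iterate over a snapshot of the keys, in subscription order.
            for id in observers.keys.sorted() {
                if let observer = observers[id] { observer(element) }
            }
        }

        func asObservable() -> MiniObservable<Element> {
            MiniObservable<Element> { observer in
                let id = self.nextID
                self.nextID += 1
                self.observers[id] = observer
                return { self.observers[id] = nil }
            }
        }
    }

    func runFlatMapDemo() -> (all: [String], latest: [String]) {
        let outer = MiniSubject<String>()
        let a = MiniSubject<String>()
        let b = MiniSubject<String>()
        var all: [String] = []
        var latest: [String] = []
        let selector: (String) -> MiniObservable<String> = { name in
            name == "a" ? a.asObservable() : b.asObservable()
        }
        _ = outer.asObservable().flatMap(selector).subscribe { all.append($0) }
        _ = outer.asObservable().flatMapLatest(selector).subscribe { latest.append($0) }

        outer.publish("a")   // both chains subscribe to `a`
        a.publish("a1")      // received by both chains
        outer.publish("b")   // flatMapLatest cancels its subscription to `a` here
        a.publish("a2")      // received by the flatMap chain only
        b.publish("b1")      // received by both chains
        return (all, latest)
    }

    let flatMapDemo = runFlatMapDemo()
    print(flatMapDemo.all, flatMapDemo.latest)
    ```

    In this model the flatMap chain collects a1, a2 and b1, while the flatMapLatest chain collects only a1 and b1, because the subscription to the first inner observable is cancelled as soon as the second one is returned.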

  • On subscription emits the provided elements before providing the other events from the original observable.

    Declaration

    Swift

    func startWith(_ elements: Element...) -> Observable<Element>
  • Returns a new observable that emits the events of the original observable and of the otherObservables passed as parameters.

    Declaration

    Swift

    func merge(_ otherObservables: Observable<Element>...) -> Observable<Element>
  • Returns an observable that emits only the first event that is included by the provided filter.

    If you don’t provide a block then the first event will always be taken. After the first event is published it automatically disposes the observer.

    Declaration

    Swift

    func first(where isIncluded: @escaping (Element) -> Bool = { _ in true }) -> Observable<Element>
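
    A minimal sketch of the take-one-then-dispose behavior described above. MiniSubject and its first(where:then:) method are hypothetical stand-ins for this example; the real operator returns an Observable instead of taking the observer directly.

    ```swift
    // Hypothetical stand-in for illustration only; not the library's implementation.
    final class MiniSubject<Element> {
        private var observers: [Int: (Element) -> Void] = [:]
        private var nextID = 0

        var observerCount: Int { observers.count }

        func publish(_ element: Element) {
            // Iterate over a snapshot of the keys so observers can remove themselves.
            for id in observers.keys.sorted() {
                if let observer = observers[id] { observer(element) }
            }
        }

        // Returns a closure that removes the observer again.
        func subscribe(_ observer: @escaping (Element) -> Void) -> () -> Void {
            let id = nextID
            nextID += 1
            observers[id] = observer
            return { self.observers[id] = nil }
        }

        // Delivers only the first matching element, then unsubscribes.
        func first(where isIncluded: @escaping (Element) -> Bool,
                   then observer: @escaping (Element) -> Void) {
            var cancel: (() -> Void)?
            cancel = subscribe { element in
                guard isIncluded(element) else { return }
                cancel?()
                observer(element)
            }
        }
    }

    func runFirstDemo() -> (events: [Int], remainingObservers: Int) {
        let subject = MiniSubject<Int>()
        var events: [Int] = []
        subject.first(where: { $0 > 2 }, then: { events.append($0) })
        [1, 2, 3, 4].forEach(subject.publish)
        return (events, subject.observerCount)
    }

    let firstDemo = runFirstDemo()
    print(firstDemo.events, firstDemo.remainingObservers)
    ```

    Only 3 reaches the observer: 1 and 2 do not pass the filter, and 4 arrives after the observer has already been removed.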
  • Returns a new observable that will emit events with a tuple containing the last event of the original and the provided observable.

    The first event will be fired once both observables have emitted at least one event. After that, a new event with the tuple will be emitted every time either of the two emits a new event.

    Declaration

    Swift

    func combineLatest<Other>(_ otherObservable: Observable<Other>) -> Observable<(Element, Other)>
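
    The bookkeeping behind this behavior can be sketched as follows. CombineLatestSketch is a hypothetical helper written for this example; it models only the latest-value tracking, not subscriptions or disposal.

    ```swift
    // Hypothetical helper for illustration only; not the library's implementation.
    final class CombineLatestSketch<First, Second> {
        private var first: First?
        private var second: Second?
        private let observer: ((First, Second)) -> Void

        init(observer: @escaping ((First, Second)) -> Void) {
            self.observer = observer
        }

        func receiveFirst(_ value: First) {
            first = value
            emitIfReady()
        }

        func receiveSecond(_ value: Second) {
            second = value
            emitIfReady()
        }

        private func emitIfReady() {
            // Nothing is emitted until both sides have produced at least one value.
            if let first = first, let second = second {
                observer((first, second))
            }
        }
    }

    func runCombineLatestDemo() -> [String] {
        var output: [String] = []
        let combiner = CombineLatestSketch<Int, String> { pair in
            output.append("\(pair.0)\(pair.1)")
        }
        combiner.receiveFirst(1)      // no event yet: the second side has not emitted
        combiner.receiveSecond("a")   // emits (1, "a")
        combiner.receiveFirst(2)      // emits (2, "a")
        combiner.receiveSecond("b")   // emits (2, "b")
        return output
    }

    let combineLatestDemo = runCombineLatestDemo()
    print(combineLatestDemo)
    ```

    The first value on its own produces no event; every later value from either side is paired with the most recent value from the other side.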
  • Returns an observable that ignores the first N published events.

    Declaration

    Swift

    func ignore(_ count: Int) -> Observable<Element>
  • Returns an observable that ignores the first published event.

    Declaration

    Swift

    func ignoreFirst() -> Observable<Element>
  • Unsubscribes and subscribes again on each event while the condition is met.

    This is mainly used for cold observables that, when subscribed, start a new stream from zero. Use when you want to trigger the underlying observable to restart every time.

    Warning

    If the underlying observable always emits a new event and the condition is always met, this will resubscribe endlessly until the app eventually crashes with a stack overflow or runs out of memory. Treat the underlying observable like a recursive function and make sure there is an exit condition.

    Declaration

    Swift

    func resubscribingWhile(_ isIncluded: @escaping (Element) -> Bool) -> Observable<Element>
  • Returns an observable that automatically unsubscribes when the provided condition is no longer met. If inclusive is true, the last element will also be published.

    Declaration

    Swift

    func takeWhile(_ isIncluded: @escaping (Element) -> Bool, inclusive: Bool = false) -> Observable<Element>
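
    A minimal sketch of one reading of this operator. takeWhileObserver is a hypothetical helper, not part of the library, and the sketch assumes that "the last element" means the first element that fails the condition.

    ```swift
    // Hypothetical helper for illustration only; not the library's implementation.
    // Wraps an observer and stops forwarding once the condition fails.
    func takeWhileObserver<Element>(
        _ isIncluded: @escaping (Element) -> Bool,
        inclusive: Bool = false,
        downstream: @escaping (Element) -> Void
    ) -> (Element) -> Void {
        var finished = false
        return { element in
            if finished { return }
            if isIncluded(element) {
                downstream(element)
            } else {
                // A real operator would also dispose its upstream subscription here.
                finished = true
                // Assumed meaning of `inclusive`: forward the element that failed the condition.
                if inclusive { downstream(element) }
            }
        }
    }

    func runTakeWhileDemo(inclusive: Bool) -> [Int] {
        var output: [Int] = []
        let observer: (Int) -> Void = takeWhileObserver(
            { $0 < 3 },
            inclusive: inclusive,
            downstream: { output.append($0) }
        )
        [1, 2, 3, 1].forEach(observer)
        return output
    }

    let exclusiveDemo = runTakeWhileDemo(inclusive: false)
    let inclusiveDemo = runTakeWhileDemo(inclusive: true)
    print(exclusiveDemo, inclusiveDemo)
    ```

    Under this assumption, the trailing 1 is never forwarded in either mode, because forwarding stops permanently once 3 fails the condition.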
  • Returns an observable that emits subsequent values only if they are different from the last one emitted by the underlying observable.

    Declaration

    Swift

    func distinct(isEqual: @escaping (Element, Element) -> Bool) -> Observable<Element>
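
    The comparison with the previous value can be sketched like this. distinctObserver is a hypothetical helper, not part of the library; it compares each element with the last one received from upstream, following the description above.

    ```swift
    // Hypothetical helper for illustration only; not the library's implementation.
    // Wraps an observer so that an element equal to the previous one is skipped.
    func distinctObserver<Element>(
        isEqual: @escaping (Element, Element) -> Bool,
        downstream: @escaping (Element) -> Void
    ) -> (Element) -> Void {
        var last: Element?
        return { element in
            let previous = last
            last = element   // always remember the latest upstream value
            if let previous = previous, isEqual(previous, element) { return }
            downstream(element)
        }
    }

    func runDistinctDemo() -> [String] {
        var output: [String] = []
        let observer: (String) -> Void = distinctObserver(
            isEqual: { $0.lowercased() == $1.lowercased() },   // case-insensitive comparison
            downstream: { output.append($0) }
        )
        ["a", "A", "b", "b", "a"].forEach(observer)
        return output
    }

    let distinctDemo = runDistinctDemo()
    print(distinctDemo)
    ```

    Only consecutive duplicates are dropped in this sketch, so the final "a" is forwarded again because the value before it was "b".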
  • Returns a Single that only emits the first value from the underlying observable, on the given TealiumQueue.

    Declaration

    Swift

    func asSingle(queue: TealiumQueue) -> any Single<Element>
  • Returns an observable that will emit values in a possibly asynchronous manner determined by the given block.

    Declaration

    Swift

    func callback<Result>(from block: @escaping (Element, @escaping Observable<Result>.Observer) -> Void) -> Observable<Result>

    Parameters

    block

    a block of code, to be executed with the next value from the source, along with the observer with which to emit downstream.

  • Returns an observable that will emit values in a possibly asynchronous manner determined by the given block.

    Declaration

    Swift

    func callback<Result>(fromDisposable block: @escaping (Element, @escaping Observable<Result>.Observer) -> Disposable) -> Observable<Result>

    Parameters

    block

    a block of code, to be executed with the next value from the source, along with the observer with which to emit downstream, disposable by the returned Disposable object.

  • Returns an observable that will emit the last element received after the provided delay if no other event is emitted in the meantime.

    Declaration

    Swift

    func debounce(delay: TimeInterval, debouncer: DebouncerProtocol) -> Observable<Element>

    Parameters

    delay

    The delay to wait before emitting the elements.

    debouncer

    The debouncer handling the debounce operation.

  • Returns an observable that re-emits each value from the original observable after the given time has elapsed.

    Declaration

    Swift

    func delay(_ time: DispatchTimeInterval, on queue: TealiumQueue) -> Observable<Element>
  • Subscribes the observer only once and then automatically disposes it.

    This is meant to be used when you only need one observer to be registered once. Use the standalone first() operator if multiple observers all need to register for one event.

    Declaration

    Swift

    @discardableResult
    func subscribeOnce(_ observer: @escaping Observer) -> Disposable

    Return Value

    a Disposable that can be used to dispose this observer before the first event is sent to the observer, in case it’s not needed any longer.

Available where Element: Equatable

  • Only emits new events if the last one is different from the new one.

    Declaration

    Swift

    func distinct() -> Observable<Element>