【Swift】RxSwift入門 ③

はじめに

RxSwift入門の第3回目です。

今回はObservableの便利なOperatorの紹介と、Schedulerを用いたマルチスレッドの利用方法を紹介したいと思います。

それと最後に、非同期にHTTP通信を行う少し踏み込んだサンプルコードを載せたいと思います。

第2回目はこちら
第1回目はこちら

サンプルコードの動作確認の環境は以下の通りです。

  • macOS: Mojave (10.14.6)
  • Xcode: 11.3
  • CocoaPods: 1.4.0
  • Swift: 5
  • RxSwift: 5.0.1

コードはXcodeのPlaygroundで動作するようになっています。
環境作りについては第2回目に記載しています。

生成に関するOperator

Create

create は前回少し触れましたが、要素の発行を自前で行うマニュアルなObservableを作成するためのOperatorです。

サンプルコード↓

func createSample() {
    
    let disposeBag = DisposeBag()
    
    // マニュアルなObservableの作成
    let observable = Observable<String>
        .create({ observer in
            observer.onNext("🍺")
            observer.onNext("🍶")
            observer.onNext("🍷")
            observer.onCompleted()
            
            return Disposables.create {
                print("Observable: Dispose")
            }
        })
    
    // Observer購読
    observable
        .subscribe(onNext: { element in
            print("Observer: \(element)")
        }, onDisposed: {
            print("Observer: onDisposed")
        })
        .disposed(by: disposeBag)
}

createSample()

実行結果↓

Observer: 🍺
Observer: 🍶
Observer: 🍷
Observer: onDisposed
Observable: Dispose

createに渡すクロージャーではsubscribeしてきたObserver(実際はそれをラップしたクラスのインスタンス)が受け取れるので、これに対してonNextと、正常終了ならonCompleted、エラー終了の場合はonErrorを使ってイベントを発生させてあげます。

クロージャーの最後はDisposableまたはCancelableのインスタンスを作って返してあげます。

「Disposable」はObservableやObserverにdisposeの挙動を付加するためのプロトコルで、「Cancelable」はObservableにdisposeしたかどうかの判定の挙動を付加するためのプロトコルです。

サンプルにあるDisposables.create({…})を呼ぶと、Cancelableを実装した「AnonymousDisposable」クラスのインスタンスが作成されます。
このDisposables.create({…})で渡したクロージャーはObservableがdisposeされると呼ばれ、Observable.createの中で作成したリソースを解放したりするための場として使用できます。

サンプルのように特に解放の必要もない場合は、単に引数なしのDisposables.create()を使用することができます。
これを呼び出すと「NopDisposable」という空っぽの挙動を持つクラスのインスタンスが作成されます。
多くのケースではこれで十分かと思います。

Deferred

deferred はObserverがsubscribeしたタイミングでObservableの生成を行い、かつObserverごとにObservableを生成するOperatorです。

サンプルコード↓

func deferredSample() {
    
    let disposeBag = DisposeBag()
    var count = 0
    
    // Observer毎に作られるObservableの作成
    let observable = Observable<Date>.deferred({
        count += 1
        print("Create Observable: \(count)")
        return Observable<Date>.just(Date())
    })
    
    // Observer購読1
    observable
        .subscribe(onNext: { element in
            print("Observer1: \(element)")
        })
        .disposed(by: disposeBag)
    
    // 2秒待つ
    Thread.sleep(until: Date(timeIntervalSinceNow: 2))
    
    // Observer購読2
    observable
        .subscribe(onNext: { element in
            print("Observer2: \(element)")
        })
        .disposed(by: disposeBag)
}

deferredSample()

実行結果↓

Create Observable: 1
Observer1: 2020-02-03 06:10:43 +0000
Create Observable: 2
Observer2: 2020-02-03 06:10:45 +0000

deferredの引数として、Observable生成用のクロージャーを渡します。

「Dateのインスタンスを要素として流すObservable」を生成する記述を書き、sleep 2秒を挟んで2回subscribeすることで、クロージャーがsubscribeするごとに呼ばれ、都度Observableが生成されていることがわかります。

試しにdeferredではなく、単に
let observable = Observable<Date>.just(Date())
とすると、2回とも同じ時刻が表示される(= 同じ要素の使い回しがされている)ことが確認できるかと思います。

Timer

timer は指定した時間が経過した時に要素を発行するObservableを生成します。

サンプルコード↓

func timerSample() {

    // 一定時間後に発行するObservableの作成
    let observable = Observable<Int>
        .timer(.seconds(3), scheduler: MainScheduler.instance)

    print(Date())
    
    // Observer購読
    _ = observable
        .subscribe(onNext: { element in
            print("Observer: \(element), Date: \(Date())")
        })
}

timerSample()

実行結果↓

2020-02-03 06:43:12 +0000
Observer: 0, Date: 2020-02-03 06:43:15 +0000

subscribeして3秒後に要素が発行されることが確認できます。

この時渡される要素は 0 固定で、Observable<Int>のジェネリクス部分をStringなどにするとコンパイルエラーになります。

また、
.timer(.seconds(3), period: .seconds(1), scheduler: MainScheduler.instance)
のように period という引数を指定することもでき、これを指定すると指定時間後の要素発行からさらに、periodの時間ごとに 0 からインクリメントした値を要素として連続した発行が行われるようになります。

引数でschedulerにMainScheduler.instanceを渡していますが、これは下記「Scheduler」の項目にて説明しますので、ひとまず無視してください。

また、disposed(by:)を使うとイベントが来る前に解放してしまうため、このサンプルでは省略しています。

Interval

interval は指定した時間毎に要素を発行するObservableを生成します。

サンプルコード↓

func intervalSample() {
    
    // 一定間隔で発行するObservableの作成
    let observable = Observable<Int>
        .interval(.seconds(2), scheduler: MainScheduler.instance)

    print(Date())

    // Observer購読
    _ = observable
        .subscribe(onNext: { element in
            print("Observer: \(element), Date: \(Date())")
        })
}

intervalSample()

実行結果↓

2020-02-03 06:59:10 +0000
Observer: 0, Date: 2020-02-03 06:59:12 +0000
Observer: 1, Date: 2020-02-03 06:59:14 +0000
Observer: 2, Date: 2020-02-03 06:59:16 +0000

渡される要素は 0 からのインクリメント値固定です。

これは実は1つ前の timer のラッパーで、
.timer(.seconds(2), period: .seconds(2), scheduler: MainScheduler.instance)
という呼び出しとイコールになります。

また、動かすと永遠に発行を続けるため、適当なところでXcodeから実行を止めてください。

あとの説明はtimerと一緒です。

発行に関するOperator

TakeUntil

takeUntil は引数として渡したObservableが何かしらの要素を発行した時に、自身の発行を完了させる(onCompletedを起こす)Operatorです。

サンプルコード↓

func takeUntilSample() {
    
    // 一定時間後に発行するObservableの作成
    let timerObservable = Observable<Int>
        .timer(.seconds(5), scheduler: MainScheduler.asyncInstance)
    
    print(Date())
    
    // Observer購読
    _ = timerObservable
        .subscribe({ event in
            print("Timer Observer: \(event), Date: \(Date())")
        })

    // 一定間隔で発行するObservableの作成
    let observable = Observable<Int>
        .interval(.seconds(1), scheduler: MainScheduler.asyncInstance)
        .takeUntil(timerObservable)
    
    // Observer購読
    _ = observable
        .subscribe({ event in
            print("Observer: \(event), Date: \(Date())")
        })
}

takeUntilSample()

実行結果↓

2020-02-03 08:31:06 +0000
Observer: next(0), Date: 2020-02-03 08:31:07 +0000
Observer: next(1), Date: 2020-02-03 08:31:08 +0000
Observer: next(2), Date: 2020-02-03 08:31:09 +0000
Observer: next(3), Date: 2020-02-03 08:31:10 +0000
Timer Observer: next(0), Date: 2020-02-03 08:31:11 +0000
Timer Observer: completed, Date: 2020-02-03 08:31:11 +0000
Observer: completed, Date: 2020-02-03 08:31:11 +0000

先程の永遠に続くintervalを、timerを使って止めてみました。

結果を見ての通り、5秒後にtimerのnextイベントが発生し、それをトリガーとしてintervalがcompletedになっているのが確認できます。

MainScheduler.asyncInstanceについては後述します。

Connectable

Connectable とは、Observableに要素の発行に関するより高度な振る舞いを加えるための仕組みのことを指します。
(振る舞いが加えられたObservableを「Connectable Observable」と呼びます)

具体的に言うと、ColdなObservableでもObserverがsubscribeした時点では発行は行われず、connectメソッドを呼ぶと、購読しているすべてのObserverに対して一斉に発行が開始されるようになります。

ObservableをConnectable Observableにするためのメソッドはいくつかありますが、ここでは代表的な publishmulticast の2つを紹介したいと思います。

publish

publish はObservableをConnectable Observableに変換する標準的な方法です。

サンプルコード↓

func connectableSample() {
    
    let disposeBag = DisposeBag()
    
    // ConnectableなObservable作成
    let observable = Observable<String>
        .of("🍺","🍶","🍷")
        .publish()
    
    // Observer購読1
    observable
        .subscribe({ event in
            print("Observer1: \(event)")
        })
        .disposed(by: disposeBag)
    
    // Observer購読2
    observable
        .subscribe({ event in
            print("Observer2: \(event)")
        })
        .disposed(by: disposeBag)
    
    // 発行開始
    _ = observable.connect()
}

connectableSample()

実行結果↓

Observer1: next(🍺)
Observer2: next(🍺)
Observer1: next(🍶)
Observer2: next(🍶)
Observer1: next(🍷)
Observer2: next(🍷)
Observer1: completed
Observer2: completed

通常のObservableだと1つ目のObserverへの発行が完了してから2つ目に移りますが、Connectableにすることで、connectを呼んだタイミングで2つ同時に発行が行われていることが確認できます。

multicast

multicast は自前で用意したSubjectを渡して、それ経由で一斉発行を行います。

サンプルコード↓

func multicastSample() {
    
    let disposeBag = DisposeBag()
    
    // Subject作成
    let subject = PublishSubject<String>()
    
    subject
        .subscribe({ event in
            print("Subject: \(event)")
        })
        .disposed(by: disposeBag)
    
    // ConnectableなObservable作成
    let observable = Observable<String>
        .of("🍺","🍶","🍷")
        .multicast(subject)
    
    // Observer購読1
    observable
        .subscribe({ event in
            print("Observer1: \(event)")
        })
        .disposed(by: disposeBag)
    
    // Observer購読2
    observable
        .subscribe({ event in
            print("Observer2: \(event)")
        })
        .disposed(by: disposeBag)
    
    // Subjectから発行してみる
    subject.onNext("🥃")
    
    // 発行開始
    _ = observable.connect()
    
    // このお茶は誰にも届かない。。
    subject.onNext("🍵")
}

multicastSample()

実行結果↓

Subject: next(🥃)
Observer1: next(🥃)
Observer2: next(🥃)
Subject: next(🍺)
Observer1: next(🍺)
Observer2: next(🍺)
Subject: next(🍶)
Observer1: next(🍶)
Observer2: next(🍶)
Subject: next(🍷)
Observer1: next(🍷)
Observer2: next(🍷)
Subject: completed
Observer1: completed
Observer2: completed

自作のsubjectを通すので、subject.onNextでもObservableのObserverに発行されることが確認でき、かつconnectを呼ぶことで、subjectのObserverにも要素が発行されることも確認することができます。
まさにマルチキャスト。

ただし、connectを呼ぶとsubjectまでcompletedになってしまうため、それ以降はsubjectからの発行までできなくなってしまいます。

ちなみに最初のpublishは、中で無名のPublishSubjectを作ってmulticastを呼んでいますので、実質2つのコードに差異はほとんどありません。

変換に関するOperator

Filter / Take / Buffer / DistinctUntilChanged

4つまとめてご紹介。
filter は条件による要素の絞り込みです。
take は要素を先頭から指定した数だけ取得します。
buffer は指定した時間が経つまで、あるいは指定した数だけ発行されるまで、要素を溜め込みます。
distinctUntilChanged は同じ要素の連続を省きます。

では、サンプルコード↓

func filterSample() {
    
    let disposeBag = DisposeBag()
    
    // Observableの作成
    let observable = Observable<String>
        .of("🍺", "🍶", "🍶", "🍷")
    
    // Observer購読 [filter]
    observable
        .filter({ element -> Bool in
            element != "🍺" // ビールはちょっと・・・
        })
        .subscribe(onNext: { element in
            print("filter: \(element)")
        })
        .disposed(by: disposeBag)
    
    // Observer購読 [take]
    observable
        .take(2) // 2杯までしか飲めません
        .subscribe(onNext: { element in
            print("take: \(element)")
        })
        .disposed(by: disposeBag)

    // Observer購読 [buffer]
    observable
        .buffer(timeSpan: .never, count: 2, scheduler: MainScheduler.instance) // 2杯ずつ持ってこい
        .subscribe(onNext: { element in
            print("buffer: \(element)")
        })
        .disposed(by: disposeBag)
    
    // Observer購読 [distinctUntilChanged]
    observable
        .distinctUntilChanged() // 同じものは飲まん!
        .subscribe(onNext: { element in
            print("distinctUntilChanged: \(element)")
        })
        .disposed(by: disposeBag)
}

filterSample()

実行結果↓

filter: 🍶
filter: 🍶
filter: 🍷
take: 🍺
take: 🍶
buffer: ["🍺", "🍶"]
buffer: ["🍶", "🍷"]
buffer: []
distinctUntilChanged: 🍺
distinctUntilChanged: 🍶
distinctUntilChanged: 🍷

お酒の飲み方は人それぞれ。
特に難しいこともありませんね。

bufferが最後、空の配列が発行されるのが気になりますが。

FlatMap

flatMap は最初なかなか理解がしづらく、だけど一番利便性が高くて利用率も高いOperatorではないでしょうか。

これは上流のObservableの流れを汲んで、新たなObservableに流れを置き換えてしまうOperatorです。

サンプルコード↓

func flatMapSample() {
    
    let disposeBag = DisposeBag()
    
    // Observable作成
    Observable<String>
        .of("Hello", "Nice")
        .flatMap({ element in
            // 新たなObservableに変換
            Observable<String>
                .of("🇯🇵", "🇺🇸", "🇨🇳")
                .map({ value in
                    "\(element), \(value)!"
                })
        })
        .subscribe(onNext: { element in
            print(element)
        })
        .disposed(by: disposeBag)
}

flatMapSample()

実行結果↓

Hello, 🇯🇵!
Hello, 🇺🇸!
Nice, 🇯🇵!
Hello, 🇨🇳!
Nice, 🇺🇸!
Nice, 🇨🇳!

最初のObservableの要素を取りつつ、flatMap内で作成した新たなObservableの流れに接続され、その結果がObserverに流れていっているのがわかるかと思います。

前回紹介したmapはObservableの流れはそのままに、要素の中身・型だけを変換していたのに対し、このflatMapはObservableごと変換してしまうわけです。

ただ、flatMapの性質はわかっても、最初はこれの何が嬉しいのかがわからないかと思います。

例えば、もしこのサンプルと同じことをflatMapなしでやろうとすると、以下のような感じになるかと思います↓

func nonFlatMapSample() {
    
    let disposeBag = DisposeBag()
    
    // Observable作成
    Observable<String>
        .of("Hello", "Nice")
        .subscribe(onNext: { element in
            
            // Observable作成
            Observable<String>
                .of("🇯🇵", "🇺🇸", "🇨🇳")
                .map({ value in
                    "\(element), \(value)!"
                })
                .subscribe(onNext: { element in
                    print(element)
                })
                .disposed(by: disposeBag)
        })
        .disposed(by: disposeBag)
}

nonFlatMapSample()

見事にネストしてますね。
これが2つ目のObservableの流れを受けて、さらに別なObservableを用いたいと思うと……。

しかし、flatMapなら3段、4段となってもネストは深くなりません。
flatMapの下にもう1つflatMapが置かれるだけです。

ネストが深くなってしまう状況をフラットにしつつObservableの変換をしてくれる、だからflatMapという名前なんだということがわかります。

「それはわかったけど、Observableを何段も重ねて使うことなんてある?」

あるんです。
それは今回の一番最後に記載する「非同期にHTTP通信する」サンプルコードを見ていただければと思います。

結合に関するOperator

Merge

merge は複数のObservableを1つにまとめるOperatorです。

サンプルコード↓

func mergeSample() {
    
    let disposeBag = DisposeBag()
    
    // Subject作成
    let subject1 = PublishSubject<String>()
    let subject2 = PublishSubject<String>()

    // Observable作成
    let observable = Observable<String>
        .merge([subject1, subject2])
    
    // Observer購読
    observable
        .subscribe({ event in
            print("Observer: \(event)")
        })
        .disposed(by: disposeBag)
    
    // 発行
    subject1.onNext("🍵")
    subject2.onNext("☕")
}

mergeSample()

実行結果↓

Observer: next(🍵)
Observer: next(☕)

2つのsubjectを1本化したObservableに対してsubscribeすれば、そのどちらから発行されても受け取れる。

実にイージー。

CombineLatest

combineLatest は複数のObservableを1本化するところまではmergeと同じで、そこからさらに、どのObservableでも発行を起こした時に、すべてのObservableの最後に発行された要素をそれぞれ持ち寄って、それらを1つの要素に合体させたものを下流へと流すというOperatorです。

サンプルコード↓

func combineLatestSample() {
    
    let disposeBag = DisposeBag()
    
    // Subject作成
    let subject1 = PublishSubject<String>()
    let subject2 = PublishSubject<Int>()

    // Observable作成
    let observable = Observable<String>
        .combineLatest(subject1, subject2) { element1, element2 in
            "\(element1)が\(element2)杯"
    }
    
    // Observer購読
    observable
        .subscribe(onNext: { element in
            print("Observer: \(element)")
        })
        .disposed(by: disposeBag)
    
    // 発行
    subject1.onNext("🥤")
    subject1.onNext("🍵")
    subject2.onNext(2)
    subject1.onNext("☕")
    subject1.onNext("🥛")
    subject2.onNext(1)
    subject2.onNext(3)
    subject1.onNext("🍹")
}

combineLatestSample()

実行結果↓

Observer: 🍵が2杯
Observer: ☕が2杯
Observer: 🥛が2杯
Observer: 🥛が1杯
Observer: 🥛が3杯
Observer: 🍹が3杯

subject1が最初にonNextを行った時、まだsubject2は1つも発行を行っていないため、Observerへは何も通知されません。

その後、subject2が初めてonNextを行ったタイミングで、subject1の最後の要素である「🍵」と合わせて「🍵が2杯」という結果がObserverへと通知されているのがわかります。

あとは簡単ですね。

WithLatestFrom

withLatestFrom はcombineLatestと似ていますが、combineLatestがどのObservableが発行してもObserverに通知が行くのと違って、withLatestFromはその主体となるObservableが決まっています。

サンプルコード↓

func withLatestFromSample() {
    
    let disposeBag = DisposeBag()
    
    // Subject作成
    let subject1 = PublishSubject<String>()
    let subject2 = PublishSubject<Int>()

    // Observable作成
    let observable = subject1
        .withLatestFrom(subject2) { element1, element2 in
            "\(element1)が\(element2)杯"
    }
    
    // Observer購読
    observable
        .subscribe(onNext: { element in
            print("Observer: \(element)")
        })
        .disposed(by: disposeBag)
    
    // 発行
    subject1.onNext("🥤")
    subject1.onNext("🍵")
    subject2.onNext(2)
    subject1.onNext("☕")
    subject1.onNext("🥛")
    subject2.onNext(1)
    subject2.onNext(3)
    subject1.onNext("🍹")
}

withLatestFromSample()

実行結果↓

Observer: ☕が2杯
Observer: 🥛が2杯
Observer: 🍹が3杯

subject1からwithLatestFromを呼んでいます。
これにより、subject1がここでは発行の主体となります。

で、subject2が未発行な状況ではcombineLatestと同じ。
ただしその後も、subject2のonNextではObserverへは何も行かず、subject1がonNextを行った時に、subject2の最後の要素をもらって新たな要素へと結合し、Observerに通知しているのがわかります。

マルチスレッド

Scheduler

上記いくつかのサンプルで出てきましたが、RxSwiftでマルチスレッドを実現させるためにはこのSchedulerというものを使います。

Schedulerは、Observableから連なる各処理をどのように実行するか(即時、一定間隔で、バックグラウンドで等)を決定するためのコンポーネントです。

このSchedulerを使い分けることで、処理をメインスレッドで実行するのかあるいはサブスレッドにするのかや、直列で処理するのか並行にするのかを決めることができます。

※Schedulerは内部でGCD(Grand Central Dispatch)を使用しているため、その知識も多少必要になります。

ここでは、よく使用する4つに絞って紹介したいと思います。

MainScheduler

メインスレッドで動作する直列キューのSchedulerです。

通常は自分でインスタンスを作らず、SingletonインスタンスであるinstanceプロパティまたはasyncInstanceプロパティを使用します。

この両者の違いは、instanceは現在の処理がメインキュー(DispatchQueue.main)にいる場合はすぐにスケジュールされたアクションを実行し、そうでない場合は非同期にアクションをスケジュール(DispatchQueue.main.async)するのに対し、asyncInstanceは常に非同期にアクションをスケジュールします。

また、このSchedulerは後述するobserveOnというOperatorに最適化されています。

ConcurrentMainScheduler

メインスレッドで動作するSchedulerです。
Concurrentと名前に付いていますが、メインキューでメインスレッドなので並行には処理されません。

こちらも自分でインスタンスを作らず、Singletonインスタンスであるinstanceプロパティを使用します。

また、このSchedulerは後述するsubscribeOnというOperatorに最適化されています。

SerialDispatchQueueScheduler

直列キューのSchedulerです。

メインスレッドかサブスレッドかは、インスタンス生成時に渡すキューによって変わります。

ただし、上記のMainSchedulerはSerialDispatchQueueSchedulerのサブクラスであるため、メインスレッドで使う場合はこちらを使わず、MainSchedulerを使用します。

なお、生成時に並行キューを渡しても、内部では直列で動作する(アクションのスケジュールは内部で別途作った直列キューで行われる)ため、並行処理を行いたい場合は次のConcurrentDispatchQueueSchedulerを使用してください。

生成のサンプルコード↓

// 名前だけ指定して生成(内部で直列キューが作られ、それでアクションも実行される)
let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "any.unique.name")

// キューを指定して生成(内部で直列キューが作られ、そのターゲット(実行用)キューに引数のキューが使われる)
let scheduler = SerialDispatchQueueScheduler(queue: DispatchQueue(label: "any.unique.name.1"), internalSerialQueueName: "any.unique.name.2")

// QOSを指定して生成(内部で直列キューが作られ、そのターゲット(実行用)キューにQOS指定のグローバルキューが使われる)
let scheduler = SerialDispatchQueueScheduler(qos: .default, internalSerialQueueName: "any.unique.name")

“any.unique.name”の部分はリバースドメイン形式にするなどして、他と被らないように適宜変換してください。

ConcurrentDispatchQueueScheduler

並行キューのSchedulerです。

メインスレッドかサブスレッドかは、インスタンス生成時に渡すキューによって変わります。

また、先程のSerialDispatchQueueSchedulerと違い、こちらは引数に直列キューを渡すとアクションも直列に処理されてしまうので注意が必要です。

生成のサンプルコード↓

// キューを指定して生成(渡したキューがそのまま使われる)
let scheduler = ConcurrentDispatchQueueScheduler(queue: DispatchQueue.global(qos: .default))

// QOSを指定して生成(内部でQOS指定の並行キューが作られる)
let scheduler = ConcurrentDispatchQueueScheduler(qos: .default)

Schedulerの使用方法

Schedulerを使ってマルチスレッドに処理させる方法は、大きく分けて2つあります。
1つは、Observableのインスタンスを生成する時にSchedulerを渡す方法、
もう1つは、subscribeOnやobserveOnを使う方法です。

最初に前者のサンプルコードを記載します↓

func schedulerSample1() {
    
    // バックグラウンド&直列なスケジューラーの作成
    let bgScheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "hogehoge")

    _ = Observable<String>
        .just("🇯🇵", scheduler: bgScheduler)
        .map({ element in
            "\(element)\nmap: \(Thread.current)"
        })
        .subscribe(onNext: { element in
            print("\(element)\nsubscribe: \(Thread.current)")
        })
}

schedulerSample1()

実行結果↓

🇯🇵
map: <NSThread: 0x60000357a400>{number = 4, name = (null)}
subscribe: <NSThread: 0x60000357a400>{number = 4, name = (null)}

justのscheduleパラメータにSerialDispatchQueueSchedulerを渡した結果、mapとsubscribeの中がサブスレッドで実行されているのが確認できます。

では次に、subscribeOnとobserveOnを使った方法を試してみます。

subscribeOn

subscribeOn は、subscribeを呼んでから後の処理のSchedulerを決定するためのOperatorです。
簡単に言えば、一連の処理のデフォルトのSchedulerになります。

サンプルコード↓

func subscribeOnSample() {
    
    // バックグラウンド&直列なスケジューラーの作成
    let bgScheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "hogehoge")

    _ = Observable<String>
        .create({ observer in
            observer.onNext("create: \(Thread.current)")
            observer.onCompleted()
            return Disposables.create()
        })
        .map({ element in
            "\(element)\nmap: \(Thread.current)"
        })
        .subscribeOn(bgScheduler)
        .subscribe(onNext: { element in
            print("\(element)\nsubscribe: \(Thread.current)")
        })
}

subscribeOnSample()

実行結果↓

create: <NSThread: 0x6000007c8ac0>{number = 5, name = (null)}
map: <NSThread: 0x6000007c8ac0>{number = 5, name = (null)}
subscribe: <NSThread: 0x6000007c8ac0>{number = 5, name = (null)}

Observableの生成をcreateにしてみましたが、この中もサブスレッドで実行されていることがわかります。

上でも少し触れましたが、subscribeOnでメインスレッドを指定したい場合は、このOperatorに最適化されたConcurrentMainSchedulerを使用します。

observeOn

observeOn は、宣言的に書いた一連の処理において、observeOnを書いた以降の処理のSchedulerを決定するためのOperatorです。
簡単に言えば、途中の切り替えスイッチみたいなものです。

サンプルコード↓

func observeOnSample() {
    
    // バックグラウンド&直列なスケジューラーの作成
    let bgScheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "hogehoge")

    _ = Observable<String>
        .create({ observer in
            observer.onNext("create: \(Thread.current)")
            observer.onCompleted()
            return Disposables.create()
        })
        .observeOn(bgScheduler) // 以下、サブスレッド
        .map({ element in
            "\(element)\nmap: \(Thread.current)"
        })
        .observeOn(MainScheduler.instance) // 以下、メインスレッド
        .subscribe(onNext: { element in
            print("\(element)\nsubscribe: \(Thread.current)")
        })
}

observeOnSample()

実行結果↓

create: <NSThread: 0x6000031d10c0>{number = 1, name = main}
map: <NSThread: 0x6000031c1a80>{number = 6, name = (null)}
subscribe: <NSThread: 0x6000031d10c0>{number = 1, name = main}

createはメイン、mapの前にサブへの切り替えがあるのでmapはサブで、そしてsubscribeの前にメインに戻す記述があるのでsubscribeはメインで、それぞれ処理されているのがわかります。

また、サンプルコード内でもそうしていますが、observeOnでメインスレッドを指定したい場合はこのOperatorに最適化されたMainSchedulerを使用します。

subscribeOn + observeOn

一応、2つを合わせて使ってみます↓

func schedulerSample2() {
    
    // バックグラウンド&直列なスケジューラーの作成
    let bgScheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "hogehoge")

    _ = Observable<String>
        .create({ observer in
            observer.onNext("create: \(Thread.current)")
            observer.onCompleted()
            return Disposables.create()
        })
        .observeOn(MainScheduler.instance) // 以下、メインスレッド
        .map({ element in
            "\(element)\nmap1: \(Thread.current)"
        })
        .observeOn(bgScheduler) // 以下、サブスレッド
        .map({ element in
            "\(element)\nmap2: \(Thread.current)"
        })
        .observeOn(MainScheduler.instance) // 以下、メインスレッド
        .subscribeOn(bgScheduler) // 最初はサブスレッド
        .subscribe(onNext: { element in
            print("\(element)\nsubscribe: \(Thread.current)")
        })
}

schedulerSample2()

実行結果↓

create: <NSThread: 0x600003101300>{number = 4, name = (null)}
map1: <NSThread: 0x60000311c600>{number = 1, name = main}
map2: <NSThread: 0x600003101300>{number = 4, name = (null)}
subscribe: <NSThread: 0x60000311c600>{number = 1, name = main}

複雑なのはこのコードそのもので、動作的には問題ありませんね。

並行処理

上記はすべて直列処理でしたので、最後に並行処理を試してみます。

サンプルコード↓

func concurrentSample() {
    
    // バックグラウンド&並行なスケジューラーの作成
    let bgScheduler = ConcurrentDispatchQueueScheduler(qos: .default)
    
    // Observable作成
    let observable = Observable<Int>.create({ observer in
        for i in 0..<5 {
            observer.onNext(i)
            Thread.sleep(until: Date(timeIntervalSinceNow: Double.random(in: 0.0..<1.0)))
        }
        observer.onCompleted()
        return Disposables.create()
    })
    
    // Observer1購読
    _ = observable
        .subscribeOn(bgScheduler)
        .subscribe({ event in
            print("🍎: \(event), Thread: \(Thread.current)")
        })
    
    // Observer2購読
    _ = observable
        .subscribeOn(bgScheduler)
        .subscribe({ event in
            print("🍵: \(event), Thread: \(Thread.current)")
        })
}

concurrentSample()

実行結果↓

🍎: next(0), Thread: <NSThread: 0x600003c46780>{number = 3, name = (null)}
🍵: next(0), Thread: <NSThread: 0x600003c45400>{number = 5, name = (null)}
🍎: next(1), Thread: <NSThread: 0x600003c46780>{number = 3, name = (null)}
🍵: next(1), Thread: <NSThread: 0x600003c45400>{number = 5, name = (null)}
🍵: next(2), Thread: <NSThread: 0x600003c45400>{number = 5, name = (null)}
🍎: next(2), Thread: <NSThread: 0x600003c46780>{number = 3, name = (null)}
🍎: next(3), Thread: <NSThread: 0x600003c46780>{number = 3, name = (null)}
🍎: next(4), Thread: <NSThread: 0x600003c46780>{number = 3, name = (null)}
🍵: next(3), Thread: <NSThread: 0x600003c45400>{number = 5, name = (null)}
🍎: completed, Thread: <NSThread: 0x600003c46780>{number = 3, name = (null)}
🍵: next(4), Thread: <NSThread: 0x600003c45400>{number = 5, name = (null)}
🍵: completed, Thread: <NSThread: 0x600003c45400>{number = 5, name = (null)}

ランダムの秒数でSleepを掛けているため、実行するたびに結果が変わります。

ともあれ、別々のスレッドで並行して処理が走っているのが確認できました。

非同期にHTTP通信してみる

ここまでに紹介したものをいくつか使って、非同期にHTTP通信を行うちょっとそれっぽいコードを書いてみたいと思います。

概要としては、「インターネットからRSSを取得、そのXMLからタイトルを抽出して一覧表示する」というものになります。(RSSはVer2.0を想定しています。ソース中のRSSへのURLは適宜書き換えてください)

では、サンプルコードです↓

/// HTTP通信を行うユーティリティクラス
class HTTP {
    /// GET
    static func get(url: URL) -> Observable<String> {
        
        return Observable<String>.create({ observer in
            
            let request = URLRequest(url: url)
            let task = URLSession.shared.dataTask(with: request) { data, response, error in
                if let error = error {
                    observer.onError(error)
                }
                else {
                    observer.onNext(String(data: data!, encoding: .utf8)!)
                    observer.onCompleted()
                }
            }
            task.resume()
            
            return Disposables.create()
        })
    }
}

/// RSSユーティリティクラス
class RSS {
    /// タイトル一覧取得
    static func getTitles(_ xml: String) -> Observable<String> {
        
        return Observable<String>.create { observer in
            do {
                // 正規表現でタイトル抽出
                let pattern = "<title>([^<]+)</title>"
                let regex = try NSRegularExpression(pattern: pattern, options: [
                    .anchorsMatchLines, .dotMatchesLineSeparators
                ])
                let range = NSRange(location: 0, length: xml.count)
                let results = regex.matches(in: xml, options: [], range: range)
                
                for result in results {
                    let start = xml.index(xml.startIndex, offsetBy: result.range(at: 1).location)
                    let end = xml.index(start, offsetBy: result.range(at: 1).length)
                    let text = String(xml[start..<end])
                    observer.onNext(text)
                }
                
                observer.onCompleted()
            }
            catch {
                observer.onError(error)
            }
            
            return Disposables.create()
        }
    }
}

/// API通信クラス
class API {
    /// RSSのURL定義
    private static let rssURLs = [
        "(RSSのURL 1つ目)",
        "(RSSのURL 2つ目)",
    ]
    
    /// サブスレッド処理用のスケジューラー
    private static let scheduler = SerialDispatchQueueScheduler(internalSerialQueueName: "hogehoge")

    /// RSSのタイトル一覧取得
    static func getRSSTitles() -> Observable<[String]> {
        
        return Observable<String>
            .from(rssURLs)
            .flatMap({ url in
                return HTTP.get(url: URL(string: url)!)
            })
            .flatMap({ xml in
                return RSS.getTitles(xml)
                    .skip(1) // 1個目はRSSのタイトルなので飛ばす
                    .take(3) // 最新3件だけ
            })
            .buffer(timeSpan: .never, count: 0, scheduler: scheduler)
            .observeOn(MainScheduler.instance) // UI処理とかしやすいようにメインスレッドにして返してあげる
            .subscribeOn(scheduler) // サブスレッドで処理
    }
}

let disposeBag = DisposeBag()

func asyncSample() {

    API.getRSSTitles()
        .subscribe(onNext: { element in
            print(element.joined(separator: "\n"))
        })
        .disposed(by: disposeBag)
}

asyncSample()

いかがでしょうか?

実戦で使うにはエラー処理とかクラス分けとかネーミングとかAlamofire使えよとかいろいろ問題山積です。
RSSもXMLをパースせず正規表現でタイトル引っこ抜いてるだけですし。

ただ、このサンプルで「RxSwiftはこんな感じに使っていけるんだ」ということを少しでも掴んでいただけたのなら、それで十分です。

おわりに

思った以上に長くなってしまいました。
でもこれでも紹介しきれておらず、まだまだRxSwiftには便利なOperatorがたくさんあります。

ReactiveXのサイトなどで他にどんなものがあるのか確認できますので、少しずつ覚えていくとさらにコーディングが捗るのではないかと思います。

RxSwiftについては以上となります。
この記事が誰かのお役に立てたなら幸いです。