【Swift】RxSwift入門 ②

はじめに

RxSwift入門の第2回目です。

今回はコードを書くための環境の準備から、簡単なサンプルコードを用いて処理の流れを説明するところまで記載します。

第1回目はこちら

サンプルコードの動作確認の環境は以下の通りです。

  • macOS: Mojave (10.14.6)
  • Xcode: 11.3
  • CocoaPods: 1.4.0
  • Swift: 5
  • RxSwift: 5.0.1

まずは準備

プロジェクト作成

Xcodeを起動して、プロジェクトを作成します。
名前はもちろん任意ですが、ひとまずここでは「LearnRxSwift」とします。

Podfile作成

ターミナルでプロジェクトの中に潜って、CocoaPodsのコマンドを実行。

pod init

作成された Podfile を開いて、以下のようにします。
(RxSwiftとRxCocoaの2行を追加)

# Uncomment the next line to define a global platform for your project
# platform :ios, '9.0'

target 'LearnRxSwift' do
  # Comment the next line if you're not using Swift and don't want to use dynamic frameworks
  use_frameworks!

  # Pods for LearnRxSwift
  pod 'RxSwift', '~> 5'

end

ファイルを保存したらインストールを実行します。

pod install

Playground作成

プロジェクトを一度閉じて、CocoaPodsが作成したワークスペース(ここではLearnRxSwift.xcworkspace)から開き直します。

開けたら、Xcodeのメニュー File > New > Playground… をクリックしてPlaygroundを作成します。(ひとまず Blank でOK)

簡単なサンプルコード

Playgroundに以下のようなコードを書いてみます。

import RxSwift

let prices = [100, 250, 560, 980]
let taxRate = 1.08

Observable
    .from(prices)
    .map({ price in
        Int(Double(price) * taxRate)
    })
    .subscribe({ event in
        print(event)
    })
    .dispose()

これを実行すると、以下のような結果が表示されるかと思います。

next(108)
next(270)
next(604)
next(1058)
completed

pricesで定義された複数の価格に対し、taxRateで定義された税率を掛けていくというだけのサンプルです。
(現在の消費税は10%ですけど、結果的に面白くないのでここでは8%にしています)

んで。
このコードをパッと目にして、何をやっているかわかりますでしょうか?

ReactiveXの謳い文句である宣言的に書かれており、pricesとtaxRate、結果表示などから何となくはわかるかと思いますが、細かく見ていくと「Observableって何?」「subscribe?」「dispose? 要るのこれ?」「どのタイミングで処理が動き出してるの?」「nextやcompletedって?」など、いろいろと疑問が浮かぶのではないでしょうか?

正直、私は最初疑問だらけでした。

あまりに疑問だらけで、「クセが強いんじゃあ〜」「これなら↓の感じで書いたほうがいいじゃん。宣言的だし!」などと思ってRxSwiftの学習を投げ出しかけました。

prices
    .map({
        Int(Double($0) * taxRate)
    })
    .forEach({ value in
        print(value)
    })

そのとおり、こういう処理を書くためならわざわざRxSwiftを使う必要はないと思います。

ただ、これはあくまでRxSwiftを理解するためのコードなので、ひとまず我慢してポイントごとの説明をしていきたいと思います。

Observable

Observable は、RxSwiftにおいては「処理を記述する“起点”」と捉えてください。

RxSwiftはObserverパターンに則ったライブラリなので、何をするにもまずはObserverが観測できるもの、すなわちObservableを立てることから始まります。

「いや、Subjectは?」と思われたそこのアナタ!

ひとまず忘れてください……。
後で出てきます。

from

from はObservableのクラスメソッドの1つで、Observableのインスタンスを生成するメソッドです。

引数として配列を受けるようになっており、ここで渡した配列の各要素(Element)が「Subjectの起こした“動き”」として、1つずつ順番にObserverに発行(Emit)されていくことになります。

fromの他には、1つの要素だけを渡す just や 複数を渡す of 、自分で要素を発行するマニュアルなObservableを作成する create (この例は別途記載します)などがあります。

// 1つの要素
Observable.just("あいうえお")

// 固定数の複数要素
Observable.of("a", "b", "c", "d")

map

map は配列が持つmapと同じで、流れてくる各要素の加工が行えるメソッドです。
(fromやmapなどのメソッドのことを、ReactiveXでは“Operator”と呼んでいます)

ただし、Observableのmapは戻り値として Observable<Result>(このResultはジェネリクスの名前なので、実際には任意の型になります)を取るので、

.map({ price in
    Int(Double(price) * taxRate)
})

を以下のようにすると、

.map({ price in
    Double(price) * taxRate // Intへのキャストを削除
})

mapの戻り値は Observable<Double> となります。

subscribe

subscribe は文字通り「購読」を意味し、Observableが発行する各要素を受け取って何かしらの処理を行う購読者(Observer。Subscriberとも)を引数として渡します。

と同時に、このsubscribeを呼び出すことで from や map で記述した処理が動き出し、Observer(今回のサンプルではクロージャー)に各要素が渡されていくようになります。

イメージとしては、subscribeを呼ぶまでが「装置作り」で、subscribeが「装置を動かす」ことになります。

また、サンプルでは event を唯一の引数として受け取るクロージャーを渡していますが、この event は列挙型(enum)であり、RxSwiftでは以下のように定義されています。

public enum Event<Element> {
    /// Next element is produced.
    case next(Element)

    /// Sequence terminated with an error.
    case error(Swift.Error)

    /// Sequence completed successfully.
    case completed
}

このコメントに書いてある通りですが、Observableから1つの要素が発行され、Observerに渡される時のイベントが next 、すべての要素を渡し終えて処理が完了した時のイベントが completed 、そして途中で何かしらのエラーが発生した場合のイベントが error となっています。

サンプルコードを実行した時に「next(108)」や「completed」などと表示されていたのは、クロージャーに渡されていたのがこのEventのインスタンスだったからです。

ちなみに、クロージャーは↓のように、イベントごとに個別で書くことも可能です。

.subscribe(onNext: { element in
    print(element)
}, onError: { error in
    print(error)
}, onCompleted: {
    print("completed")
}, onDisposed: {
    print("disposed")
})

どのイベントも引数を省略することもできるので、onNextだけとかonCompletedだけなど、特定のイベントだけ必要な場合はこちらが便利です。
(個別の場合、onDiposedのイベントまで拾えるのでいいですね)

dispose

dispose はリソース解放のためのメソッドです。

subscribeによって動作している処理をキャンセルし、Observableが内部で保持しているObserverを解放してくれます。

ただし、今回のサンプルのようにsubscribeと同時にすぐに処理が行われ、そのまま完了してしまうケースでは、disposeを呼ばずともリソースは解放されます。
また、同期的に処理が行われるため、disposeを呼んでも途中キャンセルをすることもできません。

更に言うと、多くのケースでdisposeを呼ぶことは悪手とされており、代わりとしてDisposeBagやtakeUntilメソッド(別途説明)を使用することが主となります。

disposeが必要とされるのは、例えばボタンのタップ動作の監視など、不定期にイベントが発生し、Subject側からは終了を決められないケースです。
こうしたケースで任意のタイミングで購読処理の終了と解放を行うために、disposeが使われます。

そのため、今回のサンプルでは厳密にはdiposeは書かなくても良いのですが、しかしこれは公式↓でも言及しているように、リソース解放を確実とするため、ケースによらず常に書いていくことが推奨されています。

Using dispose bags or takeUntil operator is a robust way of making sure resources are cleaned up. We recommend using them in production even if the sequences will terminate in finite time.

RxSwift: ReactiveX for Swift

ここまでのまとめ

いかがでしょうか?

ここまでで、基本となる以下の流れが理解できたのではないかと思います。

  1. 「Observable」を書く
  2. Observableのインスタンスを作る(from、just、ofなど)
  3. mapなどで要素の加工を行う
  4. subscribeで購読(処理)を開始する
  5. Observerで要素を受けて何かの処理を行う
  6. disposeで後片付けする

次はもう1つ、別なサンプルを書いてみたいと思います。

別パターンのサンプルコード

次に以下のようなコードを書いてみます。

import RxSwift

func test() {
    
    let disposeBag = DisposeBag()
    
    // Subject作成
    let subject = PublishSubject<String>()
    
    subject.onNext("🍎")
    
    // Observer1購読
    subject
        .subscribe(onNext: { element in
            print("Observer: 1 - Event: \(element)")
        }, onCompleted: {
            print("Observer: 1 - Event: completed")
        }, onDisposed: {
            print("Observer: 1 - Event: disposed")
        })
        .disposed(by: disposeBag)
    
    subject.onNext("🍣")
    
    // Observer2購読
    subject
        .map({ element in
            "\(element) is nice!"
        })
        .subscribe(onNext: { element in
            print("Observer: 2 - Event: \(element)")
        }, onCompleted: {
            print("Observer: 2 - Event: completed")
        }, onDisposed: {
            print("Observer: 2 - Event: disposed")
        })
        .disposed(by: disposeBag)

    subject.onNext("🍔")
    
    // Observer3購読
    subject
        .subscribe(onNext: { element in
            print("Observer: 3 - Event: \(element)")
        }, onCompleted: {
            print("Observer: 3 - Event: completed")
        }, onDisposed: {
            print("Observer: 3 - Event: disposed")
        })
        .disposed(by: disposeBag)
    
    subject.onNext("🍜")
    
    subject.onCompleted()
}

test()

実行してみます。

Observer: 1 - Event: 🍣
Observer: 1 - Event: 🍔
Observer: 2 - Event: 🍔 is nice!
Observer: 1 - Event: 🍜
Observer: 2 - Event: 🍜 is nice!
Observer: 3 - Event: 🍜
Observer: 1 - Event: completed
Observer: 1 - Event: disposed
Observer: 2 - Event: completed
Observer: 2 - Event: disposed
Observer: 3 - Event: completed
Observer: 3 - Event: disposed

↑こんな結果が得られたかと思います。

Subjectが登場し、動作的にもよりObserverパターンっぽい感じがします。
また、DisposeBagを使って少し実用的な形にもなっています。

以下、コードのポイントを説明します。

PublishSubject

PublishSubject は複数あるSubjectの1つです。

onNextメソッドを使うことで、購読しているObserverたちに要素を発行することができます。

最初に「🍎」を発行していますが、これはまだObserverが0なので意味をなしません。
その後、1つずつObserverをsubscribeさせていくことで、「🍣」、「🍔」、「🍜」と発行されたものを受け取る人が増えていくことが確認できるかと思います。

また、SubjectはObservableを継承していますので、先程のサンプルのように、mapを挟んで加工を加えた形で購読を行うことも可能です。

その他のSubject

ちなみに、PublishSubjectの他に、以下のようなSubjectがあります。

BehaviorSubject
PublishSubjectの挙動に加え、Observerが新規に加わった時、一番最後に発行された要素をそのObserverに対して発行する挙動を取ります。

上記サンプルで言うと、Observer1が加えられた時に、Observer1は「🍎」を受け取ります。
また、Observer2が加えられた時は、Observer2は「🍣」を受け取ります。
そして、Observer3は「🍔」。

この挙動から、BehaviorSubjectは最低1つは発行済みの要素を持っていないといけないため、インスタンス生成時に1つの要素を引数として渡す必要があります。

ReplaySubject
PublishSubjectの挙動に加え、Observerが新規に加わった時、最後からbufferSize(インスタンス生成時の引数)で指定された数までの要素をそのObserverに対して発行する挙動を取ります。

例えばbufferSizeに 2 を指定した場合、上記サンプルでは、Observer1が加えられた時にObserver1は「🍎」を受け取ります。
Observer2が加えられた時は、Observer2は「🍎」と「🍣」を受け取ります。
Observer3は「🍣」と「🍔」です。

AsyncSubject
これは少し変わっていて、最後にonNextで発行された要素を、onCompletedが呼ばれた後に1つだけ発行する挙動を持つSubjectです。

上記サンプルで言うと、各ObserverともonCompletedの後で「🍜」だけを受け取ります。

他のSubjectはonNextと同時に要素がObserverへ届けられ、そしてonCompletedが呼ばれた後では、いくらonNextを呼んでもそれ以降の発行はされません。

逆にこのAsyncSubjectは、onCompletedを呼ぶ前にいくらonNextを呼んでも要素は発行されず、onCompletedが呼ばれた後で初めて(最後の要素のみ)発行が行われます。

しかし、onCompletedが呼ばれた後で新たなObserverを追加した場合、そのObserverにも発行がされるため、例えばAPI通信などの非同期処理を監視するような処理(通信が終わっていなければ終わりを待つ、すでに終わっているなら即時結果をもらう)に使うことができます。

ちなみに、onCompletedが呼ばれなければ一切の発行はされません。

以下に各Subjectの生成例を載せますので、サンプルコードのsubjectを置き換えて挙動の変化を試してみてください。

// BehaviorSubject
let subject = BehaviorSubject<String>(value: "☕")

// ReplaySubject
let subject = ReplaySubject<String>.create(bufferSize: 2)

// AsyncSubject
let subject = AsyncSubject<String>()

DisposeBag

subscribeをした後、最初のサンプルのようにすぐ dispose() を呼んでしまうと、そこでそのObserverは破棄されてしまいます。
今回のサンプルのように不定期に発行が行われる場合、それだと困りますね。

かと言って、それぞれのObserverを配列などで保持しておき、必要な時にdisposeしていくのも面倒でやりたくありません。

そんな時に使うのがこの DisposeBag です。

DisposeBagのインスタンスを作成しておき、subscribeの後のdisposed(by:)メソッドでそれを渡しておけば、DisposeBagのインスタンスが破棄されるタイミングで一緒にそのObserverも破棄してくれます。

上記サンプルで言うと、test関数の中で宣言していますので、test関数を抜けたタイミングでdisposeBagが破棄され、と同時に3つのObserverも破棄されます。

ここまでのまとめ

いかがでしょうか?
最初のサンプルが理解できていれば、これもそんなに難しくはないかと思います。

ここで大事なポイントとなるのは、最初のサンプルはsubscribeと同時に発行が始まっていますが、今回のサンプルではsubscribeはあくまで購読の設定をしただけで、実際に発行を受けるのはsubjectがonNext(AsyncSubjectならonCompleted)を呼んだ時、ということです。

RxSwiftを使う場合、ほとんどのケースで2つのサンプルの内のどちらかのパターンになると思いますので、これでRxSwiftの約半分を理解したも同然と言えるのではないでしょうか。

“Hot” and “Cold”

最後にひとつ。

これは用語の問題なのですが、ReactiveXの世界では最初のサンプルのように、subscribeするとすぐに要素の発行が始まるもの、逆に言うとsubscribeされるまでは何の発行も行われないObservableのことを「Cold なObservable」と呼んでいます。

一方、2つ目のサンプルのように、誰かがsubscribeする前から動いて(発行が始まって)おり、Observerは途中からそれに参加して発行を受けるようなObservableのことを「Hot なObservable」と呼んでいます。

サーバーのコールドスタンバイ、ホットスタンバイと同じイメージですね。

HotやColdといった言葉はコードを書く上では不要なのですが、ドキュメントを読むと出てくる言葉なので覚えておくと役に立つかもしれません。

おわりに

第3回に続きます。

次回はObservableのいろいろなメソッドの紹介やスレッドの制御など、細かい部分に着目して書きたいと思います。