Reactive Programming - RxJava


Reactive Programming là gì?

https://github.com/amitshekhariitbhu/RxJava2-Android-Samples 

https://medium.com/capital-one-tech/rxjava2-android-mvvm-lifecycle-app-structure-with-retrofit-2-cf903849f49e

Reactive Programming về cơ bản là dựa trên sự kiện lập trình không đồng bộ. Mọi thứ bạn thấy là một luồng dữ liệu không đồng bộ (asynchronous data stream), cái mà có thể quan sát được và một hành động sẽ được thực hiện khi nó phát ra các giá trị. Bạn có thể tạo ra luồng dữ liệu này từ bất kỳ thứ gì : thay đổi biến, sự kiện click, http call, data storage, errors và có thể là không thứ gì. Khi nói đến đến bất đồng bộ có nghĩa là mọi module code thì mỗi module chạy trên từng thread riêng của nó, và do đó cùng một lúc có nhiều khối mã được thực thi. Một lợi thế của bất đồng bộ là khi mọi nhiệm vụ chạy trên thread riêng của nó, tất cả các nhiệm vụ có thể bắt đầu đồng thời và lượng thời gian để hoàn thành nhiệm vụ là nhanh hơn khi ta thực hiện tuần tự. Khi nói đến các ứng dụng cho mobile, khi các tác vụ chạy trên background thread, bạn có thể đạt được trải nghiệm người dùng liền mạch mà không block main thread.
  • Lấy một ví dụ đơn giản là x = y + z, trong đó tổng của y và z được gán cho x. Trong reactive programming, khi giá trị y thay đổi thì x cũng tự động thay đổi theo mà không cần phải thực hiện lại câu lệnh x = y + z. Điều này có thể nhận được khi ta lắng nghe, quan sát giá trị của y và z.
  • Một mảng có thể là một luồng dữ liệu và mỗi hành động có thể được thực hiện khi mỗi phần tử của mảng đó được phát ra. Có thể bạn muốn lọc lấy các số chẵn và bỏ qua các số lẻ chẳng hạn. Điều này có thể thực hiện khi bạn thực hiện vòng lặp thông thường và các câu lệnh có điều kiện. Nhưng trong reactive programming bạn có thể đạt được điều này theo một cách tiếp cận khác. Khi bạn bắt đầu một ứng dụng bằng cách áp dụng reactive programming, cách mà các bạn chọn architecture và viết code hoàn toàn khác so với trước đây. Ứng dụng sẽ vô cùng mạnh mẽ khi bạn dùng Clean Architecture, MVP, MVVM, …
RxJava là một trong những Reactive Extension, dành cho ngôn ngữ Java. Về cơ bản nó là một thư viện follow theo Observer Pattern. Bạn có thể tạo ra bất kì luồng dữ liệu không đồng bộ trên bất kỳ thread nào, chuyển đổi dữ liệu và dữ liệu này được sử dụng bởi Observer trên bất kỳ thread nào. Thư viện này cung cấp nhiều toán tử tuyệt vời như Map, Combine, Merge, Filter, .... có thể áp dụng cho một luồng dữ liệu.

RxAndroid là một loại Rx dành cho nền tảng Android. Nó được hình thành từ RxJava với vài lớp được thêm vào. Cụ thể hơn, Schedulers được giới thiệu trong RxAndroid (AndroidSchedulers.mainThread()) đóng vai trò quan trọng trong việc hỗ trợ đa luồng trong các ứng dụng Android. Schedulers về cơ bản quyết định. Có rất nhiều loại Schedulers có sẵn nhưng Schedulers.io() và AndroidSchedulers.mainThread() là được dùng nhiều nhất

Phần cơ bản nhất của Reactive bao gồm các Observable và các Subscriber (1). 
Observables: là nguồn dữ liệu
Subscribers (or observers) : lắng nghe từ Observables
Có một ví dụ về cách mà các item được phát ra. Một Observable sẽ phát ra một số các item (bao gồm cả không có item). Sau đó nó sẽ kết thúc việc phát itemt do đã hoàn thành hay là có lỗi xảy ra. Với mỗi Subscrbver mà Observable có thì nó sẽ gọi đến hàm Subscriber.onNext() một số lần nào đó, cùng với đó là một trong hai phương thức Subscriber.onCompleted() hoặc Subscriber.onError().

RxJava gồm một bộ sưu tập khổng lồ các toán tử chủ yếu nhằm giúp chúng ta sửa đổi, lọc, hợp nhất và chuyển đổi dữ liệu được phát ra bởi Observables. Chúng ta có thể tìm thấy danh sách đầy đủ các toán tử trên trang chủ của nó (Xem Thêm…). Nhưng do nó quá đồ sộ nên chúng ta sẽ mất rất nhiều thời gian để có thể hiểu hết được các toán tử này.
Nói 1 cách đơn giản, RxJava là 1 thư viện “tiện lợi” cho việc xư lý bất đồng bộ, nhưng mà ko hẳn là như vậy. Nếu tìm hiểu sâu hơn 1 chút, các bạn sẽ có cảm giác thư viện này mang màu sắc “đổi mới, cách tân” chứ chỉ ko dừng ở khía cạnh “tiện lợi”.
Trong bài viết này, chúng ta sẽ cùng tìm hiểu một số toán tử hữu ích thường hay được sử dụng (thành thật thì bản thân mình cũng là một beginnger về RxJava nên có thiếu sót các bạn đựng ngại mà comment góp ý ạ).

1. From

Observable.from() nhận một tập các item và phát ra mỗi lần một item:

Observable.from("url1", "url2", "url3")
        .subscribe({ url -> System.out.println(url) })
Chúng ta có thể sử dụng trong trường hợp cụ thể :
query("db")
        .subscribe({ urls ->
            Observable.from(urls)
                    .subscribe({ url -> System.out.println(url) })
        })
Chúng ta đã xử lý được vấn đề vòng lặp for-each bằng cách sử dụng Observable.from(), nhưng mà code lại trở nên phức tạp hơn vì có tới hai subscription. Code trở nên xấu và khó thay đổi hơn, nó cũng phá vỡ một vài tính năng quan trọng của RxJava. Và chúng ta sẽ sử dụng 1 cách tốt hơn dưới đây.

2. Map

Map sẽ chuyển đổi các item được phát ra bởi 1 Observable bằng cách áp dụng mỗi hàm cho mỗi item, dễ hiểu hơn thì nó dùng để chuyển đối 1 item thành 1 item khác

Observable.just(1, 2, 3)
        .map({ i -> 10 * i })
        .subscribe({ i -> Log.v("Result", "" + i) })Kết quả: 
Result: 10, 20, 30

3. FlatMap

Observable.flatMap() biến đổi danh sách những items từ Observable vào Observables khác.

Dưới đây là cách giải quyết vấn đề:
query("db")
        .flatMap(object : Func1<List<String>, Observable<String>>() {
            fun call(urls: List<String>): Observable<String> {
                return Observable.from(urls)
            }
        })
        .subscribe({ url -> System.out.println(url) })
Chúng ta cũng có thể rút gọn gắn code bằng cách sử dụng lambda.
query("db")
        .flatMap({ urls -> Observable.from(urls) })
        .subscribe({ url -> System.out.println(url) })
Observable.flatMap() ở đây trả về một Subscriber và chúng ta nhìn thấy nó không trả về 1 List<String> mà nó nhận 1 chuỗi các String độc lập bởi Observable.from().
Điểm khác biệt chính giữa Map và FlatMap là FlatMap bản thân nó sẽ trả về một Observable. Nó được dùng để map trên các hoạt động bất đồng bộ.

4. Filter, take

Tiếp đến sẽ là Filter trả ra những items thoả mãn với điều kiện kiểm tra.

Ví dụ về filter dưới đây sẽ lọc ra những items % 2 == 0.
Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .filter({ i -> i % 2 == 0 })
        .subscribe({ i -> Log.v("Result", "Numbers = [ $i ]") })
Ví dụ dưới sẽ kết hợp filtertake.
Observable.just(1,2,3,4,5,6,7,8,9,10)
        .filter({ i -> i %2 == 0 })
        .take(2)
        .subscribe({ title -> System.out.println(i) })
Take sẽ định nghĩa số items được trả ra. Nếu số lượng items nhỏ hơn số lượng đã định nghĩa thì nó sẽ kết thúc nhanh hơn.

5. GroupBy

Chia các thành của Observable thành tập hợp Observables mà mỗi 1 loại trả ra là 1 tập hợp các items của Observable.

Observable.just(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
        .groupBy({ i -> i % 2 == 0 })
        .subscribe({ grouped ->
            grouped.toList()
                    .subscribe({ integers -> Log.v("Result", integers + "Even(" + grouped.getKey() + ")") })
        })Kết quả:
Result: [1, 3, 5, 7, 9]Even(false)
Result: [2, 4, 6, 8, 10]Even(true)

6. First

Trả ra item đầu tiên bởi Observable



Observable.just(1, 2, 3, 4, 5, 6)
        .first().subscribe({ i -> Log.v("Result", "" + i) })Kết quả: 
Result: 1

7. Last

Trả ra item cuối cùng bởi Observable.

Observable.just(1, 2, 3, 4, 5, 6)
        .last().subscribe({ i -> Log.v("Result", "" + i) })Kết quả: 
Result: 6

8. Reduce

Áp dụng chức năng cho tường item trả ra theo tuần tự cho đến giá trị cuối cùng( như ví dụ dưới đây mình viết là cộng chuỗi).

Observable.just("abc", "123", "321", "1345")
        .reduce { t1, t2 ->
            return@reduce t1 + t2
        }
        .subscribe(getObserver())private fun getObserver(): MaybeObserver<String> {
    return object : MaybeObserver<String> {
        override fun onSuccess(t: String) {
            Log.d("Result", " onSuccess : value : $t")
        }
        override fun onSubscribe(d: Disposable) {
            Log.d("Result", " onSubscribe : " + d.isDisposed)
        }
        override fun onError(e: Throwable) {
            Log.d("Result", " onError : " + e.message)
        }

        override fun onComplete() {
            Log.d("Result", " onComplete")
        }
    }
}Kết quả: Result:  onSuccess : value : abc1233211345

9. Concat

Nó sẽ ghép 2 hay nhiều Observable lại với nhau rồi thực hiện tuần tự từ Observable đầu tiên đến hết Observable cuối cùng và trả về chung 1 kết quả trong 1 danh sách mảng.

val aStrings = arrayOf("A1", "A2", "A3", "A4")
val bStrings = arrayOf("B1", "B2", "B3")

val aObservable = Observable.fromArray(*aStrings)
val bObservable = Observable.fromArray(*bStrings)

Observable.concat(aObservable, bObservable)
        .subscribe(getObserver())private fun getObserver(): Observer<String> {
    return object : Observer<String> {
        override fun onSubscribe(d: Disposable) {
            Log.d("Result", " onSubscribe : " + d.isDisposed)
        }

        override fun onNext(value: String) {
            Log.d("Result", " onNext : value : $value")
        }

        override fun onError(e: Throwable) {
            Log.d("Result", " onError : " + e.message)
        }

        override fun onComplete() {
            Log.d("Result", " onComplete")
        }
    }
}Kết quả: 
Result:  onNext : value : A1
Result:  onNext : value : A2
Result:  onNext : value : A3
Result:  onNext : value : A4
Result:  onNext : value : B1
Result:  onNext : value : B2
Result:  onNext : value : B3
Result:  onComplete
Như các bạn thấy ở aObservable và bObservable kết quả trả về của nó là dữ liệu từ 2 nguồn khác nhau, nhưng kết quả chúng ta nhận được là 1 danh sách dữ liệu được hợp từ 2 nguồn khác nhau.

10. Merge

Sử dụng toán tử merge để hợp nhất Observable.
Hàm merge trong RxJava giúp chúng ta thực hiện đồng thời nhiều Observable và trả về riêng lẻ các kết quả của Observable sau khi thực hiện xong Observable đó.

class UserTest(var name: String)Observable
        .merge(getAObservable(), getBObservable())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeWith(object : Observer<UserTest> {
            override fun onSubscribe(d: Disposable) {}

            override fun onNext(user: UserTest) {
                Log.e("Result", user.name)
            }

            override fun onError(e: Throwable) {}

            override fun onComplete() {}
        })private fun getAObservable(): Observable<UserTest> {
    val names = arrayOf("A1", "A2", "A3")
    val users = ArrayList<UserTest>()
    for (name in names) {
        users.add(UserTest(name))
    }
    return Observable
            .create(ObservableOnSubscribe<UserTest> { emitter ->
                for (user in users) {
                    if (!emitter.isDisposed) {
                        Thread.sleep(1000)
                        emitter.onNext(user)
                    }
                }
                if (!emitter.isDisposed) {
                    emitter.onComplete()
                }
            }).subscribeOn(Schedulers.io())
}private fun getBObservable(): Observable<UserTest> {
    val names = arrayOf("B1", "B2", "B3", "B4")
    val users = ArrayList<UserTest>()
    for (name in names) {
        users.add(UserTest(name))
    }
    return Observable
            .create(ObservableOnSubscribe<UserTest> { emitter ->
                for (user in users) {
                    if (!emitter.isDisposed) {
                        Thread.sleep(500)
                        emitter.onNext(user)
                    }
                }
                if (!emitter.isDisposed) {
                    emitter.onComplete()
                }
            }).subscribeOn(Schedulers.io())
}Kết quả: 
Result:  B1
Result:  A1
Result:  B2
Result:  B3
Result:  A2
Result:  B4
Result:  A3
Ở đây trong hàm onNext() chúng ta thấy RxJava trả về từng phần tử sau khi đã thực hiện xong các Observable.
Kết quả Merge trả ra tổng 7 items và không theo thứ tự.”B1", “A1”, “B2”, “B3”, “A2”, “B4”, “A3” hoặc cũng có thể là 1 kết quả nào đó.

11. Zip

Zip trong RxJava giúp bạn thực hiện đồng thời nhiều Observable và gộp các kết quả của các Observable lại trong một kết quả trả về.

val indexes = Arrays.asList(0, 1, 2, 3, 4)
val letters = Arrays.asList("a", "b", "c", "d", "e")
val indexesObservable = Observable.fromIterable<Int>(indexes)
val lettersObservable = Observable.fromIterable<String>(letters)
Observable.zip(indexesObservable, lettersObservable, mergeEmittedItems())
        .subscribe(object : Observer<String> {
            override fun onSubscribe(d: Disposable) {}
            override fun onNext(value: String) {
                Log.e("Result", value)
            }
            override fun onError(e: Throwable) {}
            override fun onComplete() {}
        })private fun mergeEmittedItems(): BiFunction<Int, String, String> {
    return object : BiFunction<Int, String, String> {
        override fun apply(t1: Int, t2: String): String {
            return "[$t1] $t2"
        }
    }
}
Kết quả: 
Result:  [0] a
Result:  [1] b
Result:  [2] c
Result:  [3] d
Result:  [4] e

12. Debounce

Observable chỉ phát ra tín hiệu sau khi đã qua 1 khoảng thời gian được xác định và trong khỏang thời gian đó nó không thể phát ra tín hiệu nào.
RxTextView.textChanges(edtTest)
        .debounce(2000, TimeUnit.MILLISECONDS)
        .map { charSequence ->
            charSequence.toString()
        }
        .subscribe { string ->
            //todo something
        }

13. Throttle

Throttle giới hạn số lần gọi hàm trong một khoảng thời gian. Khi một hàm dùng throttle, throttle sẽ gọi hàm này nhiều nhất 1 lần mỗi X mili giây với X là khoảng thời gian mà ta cài đặt.
RxTextView.textChanges(edtTest)
        .throttleFirst(2000, TimeUnit.MILLISECONDS)
        .map { charSequence ->
            charSequence.toString()
        }
        .subscribe { string ->
            //todo something
        }


14. Count

Đếm số lượng items trả ra bởi Observable và chỉ trả ra giá trị đếm.
Observable.just(1, 2, 3, 4, 5,7)
        .count()
        .subscribe(object : SingleObserver<Long> {
            override fun onSuccess(t: Long) {
                Log.d("Result", "Count: $t")
            }
            override fun onSubscribe(d: Disposable) {}
            override fun onError(e: Throwable) {}
        })Kết quả: 
Result:  Count: 6


15. Max

Trả ra item Observable có giá trị lớn nhất.
Integer[] numbers = {2, 30, 22, 5, 60, 1}

Observable<Integer> observable = Observable.from(numbers)

MathObservable
        .max(observable)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {}
            @Override
            public void onError(Throwable e) {}
            @Override
            public void onNext(Integer integer) {
                Log.d("Result", "Max value: " + integer)
            }
        });Kết quả: 
Result:  Max value: 60

16. Min

Trả ra item Observable có giá trị nhỏ nhất.
Integer[] numbers = {2, 30, 22, 5, 60, 1};

Observable<Integer> observable = Observable.from(numbers);

MathObservable
        .min(observable)
        .subscribe(new Subscriber<Integer>() {
            @Override
            public void onCompleted() {}
            @Override
            public void onError(Throwable e) {}
            @Override
            public void onNext(Integer integer) {
                Log.d("Result", "Min value: " + integer);
            }
        });Kết quả: 
Result:  Min value: 1







Comments