Introduction à RxJava

Introduction

Une des grandes problématiques des développeurs Android est le traitement des taches longues, dites asynchrones. Les petites mains de Google nous avaient pour cela fourni des AsyncTask, qui ont un comportement très limité et une utilisation très austère. En 2015 ce mot est à bannir du dictionnaire “Androidophone” 😀

Oubliez la définitivement, passez à RxJava !

Asynchrone

La plupart des tâches “lourdes” sous Android sont à effectuer dans un Thread différent de l’affichage graphique. Cette précaution sert à ne pas ralentir l’affichage d’une vue, ou (pire encore) à ne pas bloquer l’utilisateur sur un écran.

Parmi ces tâches nous avons principalement des appels réseau, de la lecture de fichiers, ou encore des algorithmes assez lourds comme un tri, une recherche, etc.

Si vous avez l’habitude d’utiliser Retrofit ou toute autre librairie, vous cachez vos traitements derrière des Listener ou Callbacks :

objet.methodeLongue(parametre1, parametre2, new Callback() {

     public void onMethodeLongueFinieReussie(int resultat) {
     
     }

     public void onMethodeLongueFinieErreur(Throwable erreur) {
     
     }
});

Cette méthode fonctionne très bien, le seul reproche qu’on peut lui faire est l’enchaînement des appels, qui va s’avérer un peu lourd.
En effet, si au retour du callback, nous devons réutiliser la valeur reçue en paramètre d’une nouvelle action longue :

metier.methodeLongue(parametre1, new Callback() {

     public void onMethodeLongueFinieReussie(String resultat1) {

     	metier.methodeLongue2(resultat1, parametre2, new Callback() {

     		public void onMethodeLongueFinieReussie(String resultat2) {

     			metier.methodeLongue3(resultat2, new Callback() {

     				public void onMethodeLongueFinieReussie(String resultatFinal) {
     	                             //action finale
     				}

     				public void onMethodeLongueFinieErreur(int resultat) {
     
     				}
			});
     		}

     		public void onMethodeLongueFinieErreur(int resultat){
     
     		}
	});

     }

     public void onMethodeLongueFinieErreur(int resultat){
     
     }
});

Le code devient assez vite compliqué à reprendre… Rx rend l’enchainement de ces taches beaucoup plus faciles à comprendre !

Reactive Programming / Observer Pattern

RxJava est basé sur le patron de conception Observable, semblable à nos CallBack, mais permettant de définir ici deux objets :

  • l’opération longue, nommé Observable
  • le notifié, nommé Subscriber

Opérations longues / Observable

Un Observable peux être compris comme un Runnable, il va contenir une méthode qui va être executé dans un Thread différent, nous pourrons par exemple dans celui-ci gérer un appel réseau.

Par exemple :


OperationLongue operationLongue = OperationLongue.nouvelle(new Observable.OnSubscribe<Object>() {
            
            @Override
            public void execution(Notifieur notifieur) {
                //ce code est executé dans un thread
               
                //je peux par exemple faire un appel reseau bloquant
                User user = webservice.getUser("florent37");

                //une fois l'objet récupéré, il faut l'envoyer au notifieur,
                notifieur.nouvelleValeur(user);

                //puis dire au notifieur que nous avons finit
                notifieur.finit();
            }

        });

Ce code s’écrit en RxJava de la façon suivante

Observable<User> userObservable = Observable.create(new Observable.OnSubscribe<User>() {
            @Override
            public void call(Subscriber<? super Object> subscriber) {
                
                //je peux par exemple faire un appel reseau bloquant
                User user = webservice.getUser("florent37");

                //une fois l'objet récupéré, il faut l'envoyer au notifieur,
                subscriber.onNext(user);

                //puis dire au notifieur que nous avons finit
                subscriber.onCompleted();
            }
        });

Ecouter nos opérations longues / Subscriber

Une fois nos opérations longues terminées, nous souhaitons récupérer le résultat, pour cela nous utiliserons des Subscriber

operationLongue.vaNotifier(new Notifié<User>(){
     public void nouvelleValeur(User valeur){
         //appelé à chaque fois qu'on reçoit un objet User
     }

     public void operationLongueFinie(){
         //appelé lorsque l'observable a finit d'envoyer l'ensemble des objets User
     }

     public void erreur(Throwable error){
     }
});

Ce code va s’écrire en RxJava de la façon suivante

userObservable.subscribe(new Subscriber<User>(){
     public void onNext(Object result){
         //appelé à chaque fois que l'observable reçoit un objet User
     }

     public void onCompleted(){
         //appelé lorsque l'observable a finit d'envoyer l'ensemble des objets User
     }

     public void onError(Throwable error){
     }
});

Dans beaucoup de cas, nous aurons envie d’être notifié qu’une seule fois, par exemple lors d’un retour de webservice, nous ne recevrons qu’une seule valeur, dans ce cas nous pouvons aussi utiliser l’objet Action

userObservable.subscribe(new Action1<User>(){
     public void call(User result){
         //appelé après fois que l'observable ait envoyé un objet User
     }
});

De cette façon, plus besoin de regarder si le onComplete() est appelé.

RxJava

Ce patron de conception est implémenté en java dans la bibliothèque RxJava

Une version spécifique pour Android est disponible, c’est celle-ci que nous utiliserons par la suite
https://github.com/ReactiveX/RxAndroid

Importer RX

Rien de plus simple, comme toute librairie elle s’importe via gradle

compile 'io.reactivex:rxandroid:1.2.1'
compile 'io.reactivex:rxjava:1.1.6'

Schedulers

En utilisant RxJava / RxAndroid, il est possible de définir sur quel Thread s’exécutera notre opération longue, pour cela il suffit d’appeller la méthode .subscribeOn avec un Scheduler, par exemple avec Schedulers.newThread().

Le subscriber (donc celui qui va recevoir le résultat de notre opération longue) peux aussi être appellé dans un Thread choisi, sur Android ce sera souvent le mainThread. Dans ce cas nous appellerons .observeOn avec AndroidSchedulers.mainThread()

Observable.create(new Observable.OnSubscribe<String>() {
                        @Override
                        public void call(Subscriber<? super Object> subscriber) {
                               //code appellé dans un newThread
                               subscriber.onNext("valeur");
                               subscriber.onComplete();
                        }
                 })

          .subscribeOn(Schedulers.newThread()) //Schedulers dans lequel le code de l'observable est éxécuté
          .observeOn(AndroidSchedulers.mainThread()) //Schedulers dans lequel le code du subscriber est éxécuté
          
          .subscribe(new Action1<String>() {
                         public void call(String result){
                               //code appellé dans le mainTread
                         }
          });

FlatMap – Exécuter des opérations l’une après l’autre

Revenons à notre enchainement de callback, ce que nous aurions voulu est une suite d’action comme suivant :

metier.appelAsynchrone(parametre1)
                .success(new Callback<Object>() {
                    public onMethodeLongueFinieReussie(Object resultat1){
                        //on utilise cet objet pour un deuxième appel
			return metier.appelAsynchrone2(resultat1, parametre2);
                    }
                })
                .success(new Callback<Object>() {
                    public onMethodeLongueFinieReussie(Object resultat2){
                            //on utilise cet objet pour un dernier appel
                    	return metier.appelAsynchrone3(resultat2);
                    }
                })
                .success(new Callback<Object>(){
                    public onMethodeLongueFinieReussie(Object resultatFinal){
                    	//action finale
                    }
                });

Pourra s’écrire OperationLongue / Notifié

metier.appelAsynchrone(parametre1)
                .puisFaire(new NouvelleAction(){

                    @Override
                    public OperationLongue faire(Resultat1 resultat1) {
                        //on utilise cet objet pour un deuxième appel
                        return metier.appelAsynchrone2(resultat1, parametre2)
                    }

                })
                .puisFaire(new NouvelleAction(){

                    @Override
                    public OperationLongue faire(Resultat2 resultat2) {
                        return metier.appelAsynchrone3(resultat)
                    }

                })
                .vaNotifier(new Notifié<Object>() {

                    @Override
                    public void nouvelleValeur(Object resultatFinal) {
                        //action finale
                    }

                });

Ce qui donne, traduit en RxJava

metier.appelAsynchrone(parametre1)
                
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                
                //j'attend que appelAsynchrone soit terminé puis je travaillerai avec son résultat

                .flatMap(new Func1<Resultat1, Observable<Resultat2>>() {
 
                    @Override
                    public Observable<Resultat2> call(Resultat1 resultat1) {
                        //on utilise cet objet pour un deuxième appel
                        return metier.appelAsynchrone2(resultat1, parametre2)
                    }
 
                })

                //j'attend que appelAsynchrone2 soit terminé puis je travaillerai avec son résultat

                .flatMap(new Func1<Resultat2, Observable<Resultat3>>() {
 
                    @Override
                    public Observable<Resultat3> call(Resultat2 resultat2) {
                        return metier.appelAsynchrone3(resultat2)
                    }
 
                })

                //j'attend que appelAsynchrone3 soit terminé puis je travaillerai avec son résultat

                .subscribe(new Action1<Resultat3>() {
 
                    @Override
                    public void call(Resultat3 resultatFinal) {
                        //action finale
                    }
 
                });


flatMap permet d’effectuer une action en utilisant un valeur fournie par le onNext d’un observable. Avec cette opération, je peux attendre que l’appelAsynchrone(parametre1) soit finit pour utiliser le resultat1 pour créer un observable sur appelSynchrone2.

Les flatMap s’exécutent sur un Observable et doivent retourner un Observable. Ce dernier sera utilisé pour la suite des opérations.

Le résultat de notre enchainement d’opération sera accessible dans le .subscribe() final

Zip – Attendre que plusieurs opérations soient finies

Autre opération fastidieuse en Android, lancer plusieurs opérations asynchrones, par exemple pour aller chercher plusieurs configurations avant le lancement d’un écran, puis attendre que toutes les données soient arrivées avant d’éxécuter notre méthode.

Cette opération est gratuite en Rx !

Il nous suffit de créer un Observable pour chacune de nos actions longue, puis d’utiliser la méthode Zip sur ceux ci.

Zip va exécuter en parallèle nos observables puis finalement vous donner le résultat de ceux-ci.

String username = "florent37";

Observable<User> userObservable = webservice.getUser(username);
Observable<Configuration> configObservable = webservice.getConfiguration(username);
Observable<List<Comment>> commentairesObservable = webservice.getComments(username);

Observable<UserWithComments> userWithCommentsObservable = Observable.zip(
                        userObservable,
                        configObservable,
                        commentairesObservable,
                        new Func3<User, Configuration, List<Comment>>() {
                                @Override
                                public UserWithComments call(User user, Configuration configuration, List<Comment> comments) {
                                      //ce code est appellé une fois les 3 observables terminés
                                      user.setComments(comments);
                                      user.setConfiguration(configuration);
                                }
                        }
);     

userWithCommentsObservable.subscribe(new Action1<UserWithComments>() {
 
                    @Override
                    public void call(UserWithComments userWithComments) {
                        //action
                    }
 
                })
}

Conclusion

Rx est une librairie très puissante et facilitant la vie des développeurs Android, je ne vous ai montré ici qu’un infime partie de ce qu’il est possible de faire en utilisant RxJava.
Les observable possèdent une multitude de méthodes, de filtres ou d’opération permettant de les modifier, il devient très simple d’enchainer des actions avec Rx.

Si vous voulez allez plus loin, et vous voulez aller plus loin, je vous invite à vous rendre sur le site officiel
!

Laisser un commentaire

Votre adresse de messagerie ne sera pas publiée. Les champs obligatoires sont indiqués avec *