RxJS.

Wat is Reactive Programming?

Reactive programming is een manier van programmeren met asynchrone datastreams te verwerken. Je weet vaak niet óf en wanneer je de volgende elementen op een stream gaat ontvangen.
Datastream

  • Een Observable opent een continue communicatiekanaal met de datastream.
  • Observables kunnen zowel cold als hot zijn.
    • Cold observable (unicast):
      Cold observables zijn lazy. Ze doen niets totdat iemand hen begint te observeren (abonneren/subscribe). Elke geabonneerde krijgt zo zijn eigen stream.
    • Hot observable (multicast):
      Hot-streams zijn reeds vóór het abonnement actief. Bijvoorbeeld een temperatuursensor die elke minuut een waarde uitzendt. Elke geabonneerde krijgt nu dezelfde stream.
  • Binnen een observable kan je de inkomende elementen eventueel nog manipuleren voor je het aan de geabonneerde doorgeeft.
  • Het meest gebruikte framework voor reactive programming is http://reactivex.io/. Hier vind je bibliotheken voor o.a. Java, Python, PHP, JavaScript (RxJS), ....
  • Angular en Firebase steunen voor een groot deel op RxJS. Denk hierbij aan reactive forms, HTTP-requests, Firebase authenticatie, Cloud Firestore, ...
  • RxJS kent tal van operators. Enkel de meest essentiële operators worden op deze pagina kort toegelicht.
  • Een observable-naam eindig je bij voorkeur met een $-teken. Bijboorbeeld user$.

Een interactief marble diagram van de operators geeft vaak een duidelijk beeld van wat er in de achtergrond gebeurt.

Reactive programming with RxJS and angular

Zie video (1:54:45) Reactive programming with RxJS and angular

new Observable<number>(): datastream van getallen

Bestudeer volgende code: https://stackblitz.com/edit/tmk-observable-number.

Als eerste wordt een nieuwe observable obs$ aangemaakt. Binnen de observable kan je drie methodes gebruiken.

  • observer.next(...): plaatst een nieuwe waarde in de stream.
  • observer.error(): zendt een error naar de subscribers.
  • observer.complete(): de stream is teneinde.

In deze toepassing zal obs$ elke seconde een willekeurig getal tussen 0 en 99 uitzenden. Na iets meer dan 5sec stopt de stream.

Binnen Angular kan je op twee manieren op een strean abonneren (subscribe).

  • In de component.
    Via this.obs$.subscribe(). Merk op dat we deze functie aan de variabele subscription koppelen. Dit is nodig omdat je bij het verlaten van de component (ngOnDestroy) moet unsubscriben , zodat je het gereserveerde geheugen weer vrijgeeft.
  • In de template.
    Via de async-pipe {{ obs$ | async}}. Hier hoef je niet te unsudscriben. Angular doet dit automatisch.

Merk op dat obs$ een cold observable is. Elke geabonneerde (één in de component en vier in de template) krijgt zijn eigen stream aan willekeurige getallen.

Datastream met getallen:

new Observable<Counter>(): datastream van objecten

Bestudeer volgende code: https://stackblitz.com/edit/tmk-observable-object.

Een stream hoeft zich niet te beperken tot een getal of en string. Later in de toepassing zal je merken dat er vaak volledige objecten worden doorgestuurd. De verwerking is uiteraard hetzelfde, maar wat als je bepaalde waardes uit het object dynamisch, bijvoorbeeld via property binding, in de template gaat verwerken? In dit voorbeeld zie je hoe je dit best aanpakt.

De observable obs$ bevat nu een teller (i) en genereert elke seconde een willekeurig getal. Beide worden samen in de key text verwerkt. Afhankelijk of het getal even of oneven is, krijgt de key class een andere waarde.

Is het willekeurig getal bijvoorbeeld 30, dan is dit het object dat wordt uitgezonden:

{
  "text": "Package 1 send: <b>30</b>",
  "class": "alert-success"
}

Kijk nu in de template hoe je de key class en de key text via property binding aan een alert-box kan koppelen.

Bij het openen van de pagina, is het object null. Pas na de eerste seconde zal het object een waarde bevatten. Om fouten te vermijden, gebruik je niet counter.class, maar wel counter?.class. Het vraagteken betekent dat de code pas wordt uitgevoerd indien de key class ook echt aanwezig is.

In de eerste twee voorbeelden hebben we een observable aangemaakt met new Observable(). Dit is prima, maar stel dat je enkel een teller met gehele getallen wilt aanmaken of je wilt een bestaande array omvormen tot een observable, dan kan het eenvoudiger.
Een volledig overzicht voor het aanmaken van observables vind je hier: http://reactivex.io/documentation/operators.html#creating

from(): van array naar observable.

Bestudeer volgende code: https://stackblitz.com/edit/tmk-observable-fromarray.

In dit voorbeeld wordt een array (via from()) omgezet naar een observable en dadelijk gevolgd door subscribe. Binnen de subscription wordt elke kleur uit de array aan de lokale variabele colors[] toegevoegd en vervolgens getoond in de template.

Merk op dat de preloader niet op de pagina verschijnt. De preloader krijgt immers niet de tijd om getoond te worden omdat de omzetting van de array naar een observable dadelijk gebeurt.

Waarom een lokale array omvormen tot een observable?
Dit is inderdaad niet erg nuttig in dit voorbeeld. In werkelijkheid komen de gegevens meestal van een externe bron en dan ga je de data asynchroon via HTTP inlezen. Dit brengt natuurlijk altijd een bepaalde tijdsvertraging met zich mee.
Een HTTP-call in Angular resulteert trouwens altijd in een observable, zij het dan van één element.

Dit gedrag kunnen we met dit voorbeeld perfect simuleren. Het enige dat je hiervoor hoeft te doen, is een tijdsvertraging in te bouwen. Hiervoor gebruik je de "pipable operator" delay().

Voeg na from(this.rainbowArr) een delay van 3sec toe:


        

De preloader is nu gedurende 3sec zichtbaar.

timer(): stream van gehele getallen.

Bestudeer volgende code: https://stackblitz.com/edit/tmk-observable-timer.

timer(0, 1000) maakt een observable die elke seconde een geheel getal in de stream plaatst.

De code is niet echt spectaculair, maar in de volgende oefening gaan we de getallen manipuleren voor we ze aan de component geven.

RxJS operators.

Operators zijn functies die de data in een stream manipuleren. In voorgaande voorbeelden hebben we trouwens al enkele operators gebruikt (tap() en delay()). Sinds RxJS 6 moet je alle operators binnen pipe() plaatsen. En, zoals we ook in het voorbeeld gezien hebben, komt pipe() altijd NET NA het aanmaken van de observable.

RxJS bevat operators om data te transformeren, te filteren, te combineren, enz.
Een volledig overzicht vind je hier: http://reactivex.io/documentation/operators.html#categorized

In dit deel bouw je verder op ons laatste voorbeeld. Aan de hand van enkele operators tonen we hoe je de inkomende stroom van getallen kan manipuleren voor je op de stream subscribed.

Alle operators importeer je van "rsjs/operators".

Clone voorgaand voorbeeld en experimenteer met de verschillende operators.

  • tap()
    Handig om tussentijdse waarde te debuggen.
    
                In de console zal je merken dat elk getal tweemaal verschijnt! We hebben hier een cold observable.
                In de template ga je tweemaal subscriben, vandaar dat de functie ook tweemaal wordt uitgevoerd.
              
  • shareReplay()
    Maak van een cold observable (unicast) een hot observable (multicast).
    
                In de console verschijnt elk getal maar één keer.
              
  • take(n)
    Laat enkel de eerste n-waardes door.
    
                Zie ook first(),
                last(), ...
              
  • filter()
    Enkel waardes die aan de voorwaarde voldoen, mogen door.
    
              
  • sample(x)
    Laat om de x ms een waarde door.
    
              
  • map()
    Transformeer de inkomende waarde naar een nieuwe waarde.
    
              
  • finalize()
    Voer, op het einde van een stream of na een fout, een functie uit.