Con l’arrivo di RxJS 6 è stato introdotto il metodo pipe(), che se da un lato rende gli operatori un po’ più scomodi da scrivere rispetto a prima, dall’altro porta dei grossi vantaggi: non siamo più costretti a modificare il prototype degli Observable (fiuu!) e gli operatori diventano delle semplici funzioni pure.

In questo articolo vediamo come e perché creare degli operatori custom: è estremamente facile!

Cos’è un operatore

Un operatore è una semplice funzione, che accetta un Observable e ritorna un Observable. Solitamente, gli operatori sono funzioni pure, ovvero non modificano i dati in ingresso né variabili al di fuori della funzione stessa, e dati gli stessi parametri di ingresso ritornano sempre lo stesso valore.

L’operatore quindi crea a sua volta un nuovo Observable, ma lo fa in modo trasparente: l’utilizzatore non si accorge di questo nuovo Observable, gli sembra semplicemente che i valori emessi dall’Observable di partenza siano stati in qualche modo alterati.

Ci sono molte tipologie di operatori: combinazione, condizionali, creazione, trasformazione, filtraggio, e così via… Abbiamo insomma un vasto arsenale che possiamo usare per manipolare i nostri stream di dati. E quindi, perché ci potrebbe essere la necessità di crearne di custom? Vediamo qualche esempio.

Non troviamo l’operatore giusto in RxJS

Devo dirvi la verità: solo una volta mi è personalmente capitato di dover scrivere un operatore manualmente perché non riuscivo a trovare una combinazione che mi soddisfacesse con gli operatori esistenti.

Il caso era questo: mi serviva un operatore come takeWhile(), ma volevo che venisse emesso anche il valore che faceva completare l’Observable. In poche parole…

const source$ = from([1, 2, 3, 4, 5]);

/**
 * Nel subscribe arrivano: 1, 2, 3.
 * Quando arriva il 4, la condizione diventa falsa e si ferma tutto.
*/
source$.pipe(
  takeWhile(item => item < 4)
).subscribe();

// ...ma a me serve anche il 4!

Da un po’ di tempo è stato introdotto un secondo parametro in takeWhile, che se settato a true ci fa passare anche il valore che ci interessa, ma questo parametro non c’era quando mi serviva, e quindi la strada più veloce era creare un operatore custom.

/**
 * Ora nel subscribe arrivano: 1, 2, 3, 4.
*/
source$.pipe(
  takeWhile(item => item < 4, true)
).subscribe();

Come l’ho creato? Non è difficilissimo, ma nemmeno troppo immediato. Prima di tutto ho dovuto creare una Higher Order Function (non sapete cos’è? c’è un articolo!) perché c’era bisogno di passare una callback al nostro operatore (questa: item => item < 4).

// Higher Order Function
function takeWhileWithLatest(callback) {

  // Ritorniamo il nostro operatore
  return function myOperator(source) {
    // Creiamo il nuovo Observable
    return Observable.create(subscriber => {
      ...
   });
  }
}

E poi bisogna occuparsi della logica, ovvero: sottoscriverci all’Observable di partenza (source), verificare la condizione (con la callback) ed emettere solo i valori che ci interessano.

function takeWhileWithLatest(callback) {

  return function myOperator(source) {
    return Observable.create(subscriber => {
      // Ci sottoscriviamo al vecchio Observable
      const subscription = source.subscribe(value => {

        // Verifichiamo che il nostro valore faccia scattare la condizione
        const meetsCondition = callback(value);

        // Lo emettiamo in qualsiasi caso
        subscriber.next(value);

        // Se questo valore ha fatto scattare la condizione, completiamo
        if (meetsCondition) {
          subscriber.complete();
        }
        
      },
      err => subscriber.error(err),
      () => subscriber.complete());

      return subscription;
   });
  }
}

Complicato? Un pochino, ma a volte serve. Serve appunto se proprio non riusciamo a cavarcela con gli operatori esistenti. Probabilmente ripensandoci un modo per cavarmela senza l’operatore custom l’avrei potuto trovare, ma non importa: questo è per dirvi che, mal che vada, c’è questo approccio.

Ma se possiamo cavarcela con operatori esistenti ben venga, è molto più facile!

Riutilizzare Operatori esistenti

Facciamo un esempio: è buona prassi, quando si tratta di creare un form di ricerca, utilizzare gli operatori debounceTime() e distinctUntilChanged() per assicurarci di aspettare un po’ di tempo dopo che l’utente ha finito di scrivere, e verificare che il valore corrente non sia lo stesso della chiamata precedente, in questo modo:

const source$ = fromEvent(myInput, 'keyup');

source$.pipe(
  debounceTime(500),
  distinctUntilChanged(),
  switchMap(value => ajax(...))
).subscribe(result => {
  console.log('API found: ', result);
});

Se già utilizzate Angular, so già che avrete visto questo esempio mille volte. Quindi, al posto di ripetere quella sequenza mille volte, potremmo racchiuderne la logica in un operatore custom:

function liveSearch() {
  // Questa volta utilizziamo una Arrow Function per chiarezza
  return input$ => input$.pipe(
    debounceTime(500),
    distinctUntilChanged(),
  );
}

source$.pipe(
  liveSearch(),
  switchMap(value => ajax(...))
).subscribe(result => {
  console.log('API found: ', result);
});

Come vedete non c’è più bisogno di fare un subscribe all’Observable di partenza, ci basta utilizzare pipe() e gli operatori esistenti, ottenendo codice dichiarativo molto più bello da leggere!

Possiamo anche fare uno step successivo e parametrizzare la nostra factory, passandole il valore di debouceTime e includendo lo switchMap:

function liveSearch(time, callback) {
  return input$ => input$.pipe(
    debounceTime(time),
    distinctUntilChanged(),
    switchMap(callback)
  );
}

source$.pipe(
  liveSearch(500, value => ajax(...)),
).subscribe(result => {
  console.log('API found: ', result);
});

Come ultimo step, se usate TypeScript, tipizzatelo:

function liveSearch<T>(time: number, callback: (string) => ObservableInput<T>): OperatorFunction<string, T> {
  return input$ => input$.pipe(
    debounceTime(time),
    distinctUntilChanged(),
    switchMap(callback)
  );
}

Cos’è quell’ObservableInput? Significa che possiamo specificare un Observable, una Promise o un valore Array-like: in pratica, ciò che viene accettato da switchMap.

La funzione pipe()

Come ultima cosa, RxJS ci mette a disposizione una ulteriore utility per semplificare ancora di più i nostri operatori: non solo pipe() è un metodo della classe Observable, è anche una funzione stand-alone che possiamo utilizzare per raggruppare degli operatori!

Ecco il nostro codice finale: semplice, pulito, senza fronzoli.

import { pipe } from 'rxjs';

function liveSearch(time, callback) {
  return pipe(
    debounceTime(time),
    distinctUntilChanged(),
    switchMap(callback)
  );
}

Conclusioni

Creare operatori custom da RxJS è diventato un gioco da ragazzi. Non fatelo se non vi serve! Ma se doveste trovare una particolare esigenza non soddisfatta dagli operatori esistenti (molto difficile) oppure trovaste un set di operatori ripetuto molte volte all’interno della vostra app, fateci un pensiero.

Ti è piaciuto l’articolo? Condividilo!

Rilasciare risorse gratuite non è facile: un sacco di tempo vola fra la ricerca dell’idea, la stesura, la preparazione di codice funzionante e la revisione. Se ti piace ciò che facciamo, condividi l’articolo: nel peggiore dei casi ti daranno del secchione!

Ti interesserà anche…

Iscriviti alla nostra newsletter!

Rimani aggiornato per quando rilasceremo nuovi articoli, nuovi corsinuove promozioni e nuove risorse gratuite. Ti promettiamo che useremo la tua email con prudenza, e non la condivideremo con nessuno senza il tuo consenso!

Abbiamo a cuore la tua privacy. Niente spam. Leggi la nostra privacy policy qui.

Menu