Archive for the 'Programación Reactiva' Category

RkStreams, streams reactivos en JavaScript/NodeJS (1)

Tuesday, January 26th, 2016

Esta vez no tengo un caso de uso concreto, pero es algo que es interesante de implementar usando simplicity pays, baby steps, y el flujo de trabajo de TDD (Test-Driven Development). No son como los streams nativos de NodeJS. En este proyecto:

https://github.com/ajlopez/RkStreams

Un stream es como un canal, por el que entran y salen simples objetos/valores JavaScript. Debe ser parecido a algunos conceptos de Redux (pero sin manejo de estado inmutable) y se debe parecer a los mailboxes del lenguaje Elm.

Lo interesante que la funcionalidad que logré implementar hasta ahora sale muy simple usando TDD. Es un buen ejercicio, y pueden ver siempre el historial de commits para ver cómo el proyecto va creciendo en capacidades de a poco, como respuesta a un nuevo test que se plantea, de pasar de “rojo” a “verde”.

Veamos un primer test, crear y usar un stream:

var rks = require('..');

exports['create and use stream'] = function (test) {
    test.async();
    
    var stream = rks.stream();
    
    stream.process(function (data) {
        test.ok(data);
        test.equal(data, 1);
        test.done();
    });
    
    stream.post(1);
};

Como es usual en mis proyectos, no expongo una “clase” de JavaScript, sino un método factoría .stream(). Luego el .post() permite enviar un objeto por el stream. Y el .process() permite tener funciones digamos subscriptoras, que procesan cada mensaje que llegue al stream. Esto permite separar la generación de eventos/objetos, de su proceso.

Pero los streams también puede producir otros streams. Un ejemplo clásico es transformar los objetos, con un .map(), que construye un nuevo stream:

exports['map stream'] = function (test) {
    test.async();
    
    var counter = 0;
    
    var stream = rks.stream();
    
    stream.map(function (x) { return x * 2; }).process(function (data) {
        counter++;
        
        test.ok(data);
        test.equal(data, counter * 2);
        
        if (data === 6)
            test.done();
    });
    
    stream.post(1);
    stream.post(2);
    stream.post(3);
};

O también podemos unir dos streams, con .merge(), de nuevo, devuelve un nuevo stream:

exports['post three messages to streams and merge'] = function (test) {
    test.async();
    
    var counter = 0;
    
    var stream1 = rks.stream();
    var stream2 = rks.stream();
    
    stream1.merge(stream2)
        .process(function (data) {
        counter++;
        
        test.ok(data);
        test.equal(data, counter);
        
        if (data === 3)
            test.done();
    });
    
    stream1.post(1);
    stream2.post(2);
    stream1.post(3);
};


O podemos crear un nuevo stream que sólo emita los objetos que cumplan con un predicado, usando .filter():

exports['filter stream'] = function (test) {
    test.async();
    
    var stream = rks.stream();
    
    stream
        .filter(function (x) { return x % 2 === 0})
        .filter(function (x) { return x % 5 === 0})
        .process(function (data) {
            test.ok(data);
            test.equal(data, 10);
            test.done();
        });
    
    for (var k = 1; k <= 10; k++)
        stream.post(k);
};

Tengo pendiente decidir el tratamiento de las excepciones en los filtros, mapeos y procesos. También quiero implementar un .branch() que permite que un stream se alimente de otro.

En próximos posts: esas nuevas funcionalidades, .process() selectivo, y ver cómo va quedando por dentro la implementación (a hoy, son unas pocas líneas de código).

Nos leemos!

Angel “Java” Lopez
http://www.ajlopez.com
http://twitter.com/ajlopez