Esta vez no tengo un caso de uso concreto, pero es algo que es interesante de implementar usando simplicity pays, baby steps, y el flujo de trabajo de TDD (Test-Driven Development). No son como los streams nativos de NodeJS. En este proyecto:
https://github.com/ajlopez/RkStreams
Un stream es como un canal, por el que entran y salen simples objetos/valores JavaScript. Debe ser parecido a algunos conceptos de Redux (pero sin manejo de estado inmutable) y se debe parecer a los mailboxes del lenguaje Elm.
Lo interesante que la funcionalidad que logré implementar hasta ahora sale muy simple usando TDD. Es un buen ejercicio, y pueden ver siempre el historial de commits para ver cómo el proyecto va creciendo en capacidades de a poco, como respuesta a un nuevo test que se plantea, de pasar de “rojo” a “verde”.
Veamos un primer test, crear y usar un stream:
var rks = require('..'); exports['create and use stream'] = function (test) { test.async(); var stream = rks.stream(); stream.process(function (data) { test.ok(data); test.equal(data, 1); test.done(); }); stream.post(1); };
Como es usual en mis proyectos, no expongo una “clase” de JavaScript, sino un método factoría .stream(). Luego el .post() permite enviar un objeto por el stream. Y el .process() permite tener funciones digamos subscriptoras, que procesan cada mensaje que llegue al stream. Esto permite separar la generación de eventos/objetos, de su proceso.
Pero los streams también puede producir otros streams. Un ejemplo clásico es transformar los objetos, con un .map(), que construye un nuevo stream:
exports['map stream'] = function (test) { test.async(); var counter = 0; var stream = rks.stream(); stream.map(function (x) { return x * 2; }).process(function (data) { counter++; test.ok(data); test.equal(data, counter * 2); if (data === 6) test.done(); }); stream.post(1); stream.post(2); stream.post(3); };
O también podemos unir dos streams, con .merge(), de nuevo, devuelve un nuevo stream:
exports['post three messages to streams and merge'] = function (test) { test.async(); var counter = 0; var stream1 = rks.stream(); var stream2 = rks.stream(); stream1.merge(stream2) .process(function (data) { counter++; test.ok(data); test.equal(data, counter); if (data === 3) test.done(); }); stream1.post(1); stream2.post(2); stream1.post(3); };
O podemos crear un nuevo stream que sólo emita los objetos que cumplan con un predicado, usando .filter():
exports['filter stream'] = function (test) { test.async(); var stream = rks.stream(); stream .filter(function (x) { return x % 2 === 0}) .filter(function (x) { return x % 5 === 0}) .process(function (data) { test.ok(data); test.equal(data, 10); test.done(); }); for (var k = 1; k <= 10; k++) stream.post(k); };
Tengo pendiente decidir el tratamiento de las excepciones en los filtros, mapeos y procesos. También quiero implementar un .branch() que permite que un stream se alimente de otro.
En próximos posts: esas nuevas funcionalidades, .process() selectivo, y ver cómo va quedando por dentro la implementación (a hoy, son unas pocas líneas de código).
Nos leemos!
Angel “Java” Lopez
http://www.ajlopez.com
http://twitter.com/ajlopez