Stratio Sparta, una forma muy simple de hacer agregaciones con Spark Streaming

Es habitual tener la necesidad de agrupar grandes cantidades de datos en tiempo real, ya sean procedentes del uso de un servicio, de redes sociales como Twitter o de un medio físico como es la meteorología. Una buena forma de procesar toda esta información es con Spark Streaming, para poder disponer en tiempo real de la información para ser servida, pero tiene un inconveniente: hay que programar la lógica.

La gente de Stratio nos cuenta cómo evitar realizar el desarrollo de la propia agregación de datos usando Stratio Sparta, que se encarga de reducir la información en bruto para poder disponer de una información útil para ser servida. Todo a través de una interfaz web muy simple de usar y siendo totalmente flexible. Tras el procesamiento, persistiremos el resultado en un cluster de MongoDB y serviremos en streaming a través de WebSockets con Node.js.

Stratio Sparta es una herramienta realmente flexible y simple de usar, que nos permite realizar transformaciones a la información en bruto con Morphline a través de su interfaz web y además realizar agregaciones por distintas dimensiones y rangos de tiempo.

Primeros pasos con Stratio Sparta

En primer lugar, debemos de instalar Stratio Sparta siguiendo los pasos de la propia documentación. Aunque el ejemplo también se puede realizado con Stratio Manager, que se encarga de montar un cluster de 3 máquinas con MongoDB, Stratio Sparta y Spark sobre Mesos, que nos permite gestionar estas operaciones de mapreduce en nuestro cluster.

Tras la instalación podemos ejecutar Stratio Sparta, que abrirá su servicio en el puerto 9090, donde podremos empezar a configurar los siguientes parámetros:

  • Input: es el origen de los datos y puede proceder de Flume, Kafka, RabbitMQ, de un Socket, de un WebSocket o directamente de Twitter.
  • Output: podemos persistir o mostrar esta información de diferentes formas, como MongoDB, Elasticsearch, Cassandra, Parquet, Redis o directamente en un CSV o en pantalla.
  • Políticas: aquí está el grueso de la configuración y podemos hacer las siguientes acciones:
    • Configurar el input.
    • Configurar los outpus.
    • Realizar transformaciones con Morphlines, de tipo de dato y de fechas.
    • Realizar agregaciones, que pueden ser de distintas dimensiones y diferente granuralidad de tiempo. También debemos de realizar alguna de las diferentes funciones típicas en agrupaciones de datos (como count, sum, max, min…).

Lo primero, en este ejemplo cogeremos los datos de la API de meetup.com que provee un WebSocket con la información que se están creando o actualizando en el servicio, que posteriormente agregaremos por país y haremos un recuento de eventos por hora.

Posteriormente añadimos los 3 nodos con MongoDB como output y establecemos el nombre de la colección que generará (entre otras cosas).

De toda la información en bruto que nos provee el WebSocket, solo cogeremos 3 de los atributos y le daremos el formato adecuado. Estableceremos como output de la transformación country, response y modified (este último para devolver desde Node.js las últimas actualizaciones fácilmente).

{
  "morphline": {
    "id": "morphline1",
    "importCommands": [
      "org.kitesdk.**"
    ],
    "commands": [
      {
        "readJson": {}
      },
      {
        "extractJsonPaths": {
          "paths": {
            "response": "/response",
            "country": "/group/group_country",
            "modified": "/mtime"
          }
        }
      },
      {
        "removeFields": {
          "blacklist": [
            "literal:_attachment_body"
          ]
        }
      }
    ]
  }
}

Tras establecer cómo será la estructura de la información procesada, configuraremos una agregación o cubo de la siguiente forma:

  • Time dimension: nombre del atributo que contendrá la fecha por la cual se agregan los datos (ya sea por días, horas…).
  • Granurality: rango de tiempo en el que se agrega la información. En este caso será por cada hora.
  • Dimensions: agrupamos por country, aunque otra posible agregación sería la de country y response.
  • Operators: aquí haremos varias operaciones, la primera es el recuento de eventos (count) y posteriormente añadirá el último valor de entrada de modified, que era cogido del atributo mtime del WebSocket (con lastValue).

Y tras realizar esta serie de acciones, solo tenemos que establecer la salida (que en este caso es MongoDB) y ejecutar la política. Con esta información ya debería de empezar a producirse las agregaciones de la siguiente forma:

El resultado lo tenemos en tiempo real en nuestro cluster de MongoDB, al que podemos hacerle consultas desde cualquier otra tecnología.

Servidor de WebSocket con Node.js

Para montar nuestro servidor de WebSocket conectado a MongoDB necesitamos haber instalado ambos módulos (websocket y mongodb). Posteriormente podremos leer los registros en la base de datos que serán enviados después por WebSocket:

var MongoClient = require('mongodb').MongoClient;

MongoClient.connect('mongodb://localhost:27017/sparta', function(err, db) {
  db
    .collection('id_country_hour')
    .find()
    .toArray(function(err, docs){
      console.log(docs);
    });
});

Tras ejecutar el script con node, se mostrará por consola el contenido de MongoDB.

Tras comprobar el acceso a la información crearemos nuestro servidor de WebSocket para poder transmitir en tiempo real los cambios producidos por Stratio Sparta.

Para poder servir todos los datos en tiempo real debemos de hacer lo siguiente:

  • En primer lugar, cuando se conecte un nuevo cliente, debemos de almacenar la conexión que nos genera la propia API de WebSockets, para poder hacer un broadcast posteriormente. Al igual que eliminar la conexión tras la desconexión.
  • Cuando se conecte un nuevo cliente tenemos que enviar todos los registros existentes hasta la fecha.
  • Cada cierto tiempo enviamos los cambios que se han realizado en MongoDB, usando el atributo modified para conocer cuáles son los últimos cambios.
  • En el ejemplo, para no enviar demasiados datos de más, filtraremos los países que se enviarán.
  • También filtraremos los atributos a enviar, ya que solo usaremos el atributo country, hour y count.

El ejemplo es breve y se puede consultar en este gist.

Cliente de WebSocket con JavaScript

Una vez que tenemos los datos en nuestro front-end, podemos mostrarlos en forma de gráfico, tabla o con el registro de la consola, por ejemplo. En este caso mostraremos en forma de tabla esta información con console.table para hacer más sencilla la visualización.

var ws = new WebSocket('ws://127.0.0.1:8008/');
var data = {};

ws.onmessage = function(response){
  var responseData = JSON.parse(response.data);

  responseData
    .map(function(row){
      row.hour = new Date(row.hour).toISOString().slice(0, 16).replace('T',' ');
      return row;
    })
    .forEach(function(row){
      if(!data[row.hour]){
        data[row.hour] = {};
      }
      data[row.hour][row.country] = row.count;
    });

  console.clear();
  console.log("Last update:", new Date().toLocaleString());
  console.table(data);
}

El resultado es simple pero funcional:

También podemos realizar una visualización más estética con gráficos. Una forma de realizar gráficas rápidamente es con Chart.js. Simplemente tenemos que formatear los datos para que se ajusten al esquema de datos necesario para Chart.js y obtendremos el siguiente resultado.

Este ejemplo tan simple de visualización de datos también está disponible en este gist.

Es realmente simple manipular información desde JavaScript y hay multitud de librerías de visualización de datos, lo que hace que sea trivial su uso y visualización de datos generados a partir de agregaciones con Stratio Sparta, para poder generar informes y estadísticas. Algo que resulta mucho más eficiente si lo comparamos con almacenar todos los datos y posteriormente procesarlos para mostrarlos.

Vía | Stratio

Ver todos los comentarios en https://www.genbeta.com

VER 0 Comentario

Portada de Genbeta