MapReduce

De Wikipedia, la enciclopedia libre
Saltar a: navegación, búsqueda
MapReduce nace de los principios conocidos desde los años ochenta de la computación distribuida.

MapReduce es un modelo de programación utilizado por Google para dar soporte a la computación paralela sobre grandes colecciones de datos en grupos de computadoras y al commodity computing. El nombre del framework está inspirado en los nombres de dos importantes métodos, macros o funciones en programación funcional: Map y Reduce. MapReduce ha sido adoptado mundialmente, ya que existe una implementación OpenSource denominada Hadoop. Su desarrollo fue liderado inicialmente por Yahoo y actualmente lo realiza el proyecto Apache. En esta década de los años 2010 existen diversas iniciativas similares a Hadoop tanto en la industria como en la academia. Se han escrito implementaciones de bibliotecas de MapReduce en diversos lenguajes de programación como C++, Java y Python.

MapReduce se emplea en la resolución práctica de algunos algorítmos susceptibles de ser paralelizados.[1] No obstante MapReduce no es la solución para cualquier problema, de la misma forma que cualquier problema no puede ser resuelto eficientemente por MapReduce.[2] Por regla general se abordan problemas con datasets de gran tamaño, alcanzando los petabytes de tamaño. Es por esta razón por la que este framework suele ejecutarse en sistema de archivos distribuidos (HDFS).

Historia[editar]

Las primeras implementaciones de Google necesitaban realizar operaciones de multiplicación de grandes matrices para calcular el PageRank, esto es, la clasificación de páginas en una búsqueda. De esta forma se hizo popular MapReduce como un método de cálculo de álgebra lineal. La preocupación por tratar grandes colecciones de datos, llevó a crear algoritmos y frameworks capaces de poder procesar terabytes de información. Una de las primeras aplicaciones capaces de programar MapReduce fue implementado inicialmente en Hadoop, diseñado inicialmente por Doug Cutting,[3] que lo nombró así por el elefante de juguete de su hijo.[4] Fue desarrollado originalmente para apoyar la distribución del proyecto de motor de búsqueda Nutch.[5]

Concepto[editar]

No todos los procesos pueden ser abordados desde el framework MapReduce. Concretamente son abordables sólo aquellos que se pueden disgregar en las operaciones de map() y de reduce() y esto es importante a la hora de poder elegir este framework para resolver un problema. Las funciones Map y Reduce están definidas ambas con respecto a datos estructurados en tuplas del tipo (clave, valor).

Función Map()[editar]

Map toma uno de estos pares de datos con un tipo en un dominio de datos, y devuelve una lista de pares en un dominio diferente:

Map(k1,v1) -> list(k2,v2)
  • La función map(): se encarga del mapeo y es aplicada en paralelo para cada ítem en la entrada de datos. Esto produce una lista de pares (k2,v2) por cada llamada. Después de eso, el framework de MapReduce junta todos los pares con la misma clave de todas las listas y los agrupa, creando un grupo por cada una de las diferentes claves generadas. Desde el punto de vista arquitectural el nodo master toma el input, lo divide en pequeñas piezas o problemas de menor identidad, y los distribuye a los denominados worker nodes. Un worker node puede volver a sub-dividir, dando lugar a una estructura arbórea. El worker node procesa el problema y pasa la respuesta al nodo maestro.

Función Reduce()[editar]

La función reduce es aplicada en paralelo para cada grupo, produciendo una colección de valores para cada dominio:

Reduce(k2, list (v2)) -> list(v3)
  • La función reduce(): cada llamada a Reduce típicamente produce un valor v3 o una llamada vacía, aunque una llamada puede retornar más de un valor. El retorno de todas esas llamadas se recoge como la lista de resultado deseado.

Por lo tanto, el framework MapReduce transforma una lista de pares (clave, valor) en una lista de valores. Este comportamiento es diferente de la combinación "map and reduce" de programación funcional, que acepta una lista arbitraria de valores y devuelve un valor único que combina todos los valores devueltos por mapa.

Arquitectura del MapReduce[editar]

La función map() se ejecuta de forma distribuida a lo largo de varias máquinas. Los datos de entrada, procedentes por regla general de un gran archivo (fichero), se dividen en un conjunto de M particiones de entrada de generalmente 16 megabytes. Estas particiones pueden ser procesadas en diversas máquinas. En una invocación de MapReduce suelen ocurrir varias operaciones:

  • Se procede a dividir las entradas en M particiones de tamaño aproximado de 64 megabytes. El programa MapReduce se comienza a instanciar en las diversas máquinas del cluster. Por regla general, el número de instancias se configura en las aplicaciones.
  • Una de las copias del programa es especial y toma el papel de "maestro". El resto de copias se denominan como "workers" y reciben la asignación de sus tareas desde el master. Se considera que existen una cantidad de M map() tareas y de R reduce(). El "maestro" se encarga de recopilar "workers" en reposo (es decir sin tarea asignada) y le asignará una tarea específica de map() o de reduce(). Un worker sólo puede tener tres estados: reposo, trabajando, completo.
  • Un worker que tenga asignada una tarea específica de map() tomará como entrada la partición que le corresponda. Se dedicará a parsear los pares (clave, valor) para crear una nueva pareja de salida, tal y como se especifica en su programación. Los pares clave y valor producidos por la función map() se almacenan como buffer en la memoria.
  • Periódicamente, los pares clave-valor almacenados en el buffer se escriben en el disco local, repartidos en R regiones. Las regiones de estos pares clave-valor son pasados al master, que es responsable de redirigir a los "workers" que tienen tareas de reduce().
  • Cuando un worker de tipo reduce es notificado por el "maestro" con la localización de una partición, éste emplea llamadas remotas para hacer lecturas de la información almacenada en los discos duros de los diversos workers de tipo map(). cuando un worker de tipo reduce() lee todos los datos intermedios, ordena las claves de tal modo que a se agrupan los datos encontrados que poseen la misma clave. El ordenamiento es necesario debido a que, por regla general, muchas claves de funciones map() diversas pueden ir a una misma función reduce(). En aquellos casos en los que la cantidad de datos intermedios sean muy grandes, se suele emplear un ordenamiento externo.
  • El worker de tipo reduce() itera sobre el conjunto de valores ordenados intermedios, y lo hace por cada una de las claves únicas encontradas. Toma la clave y el conjunto de valores asociados a ella y se los pasa a la función reduce(). La salida de reduce() se añade al archivo (fichero) de salida de MapReduce.
  • Cuando todas las tareas map() y reduce() se han completado, el "maestro" levanta al programa del usuario. Llegados a este punto la llamada MapReduce retorna el control al código de un usuario.

Se considera que ha habido un final de las tareas cuando este control se ha devuelto al usuario. Las salidas se ditribuyen en un fichero completo, o en su defecto se reparten en R ficheros. Estos R ficheros pueden ser la entrada de otro MapReduce o puede ser procesado por cualquier otro programa que necesite estos datos.

Combinador (Agregadores locales)[editar]

En un entorno de clusterización, uno de las límitaciones se encuentra en el transporte de grandes ficheros entre ordenadores que debido a lo limitado de su ancho de banda. En el framework MapReduce la función map() escribe en una memoria intermedia de caracter local, como puede ser un disco duro. La información que se escribe en local es agregada y ordenada por una función agregadora encargada de realizar esta operación. Los valores ordenados son de la forma [k, [v1, v2, v3,] ..., vn]]. De esta forma la función reduce() recibe una lista de valores asociados a una única clave procedente del combinador. Debido a que la latencia de red de ordenadores, y de sus discos suele ser mayor que cualquier otra de las operaciones, cualquier reducción en la cantidad de datos intermedios incrementará la eficiencia de los algoritmos. En MapReduce, cualquier agregación local de los resultados intermedios causa una mejora real de la eficiencia global.

Es por esta razón por la que muchas ditribuciones oficiales de MapReduce suelen incluir operaciones de agregación en local, mediante el uso de funciones capaces de agregar datos localmente. Evitando, o reduciendo en la medida de lo posible el movimiento de grandes ficheros. Bien sea añadidas a las funciones map(), o a los agregadores locales.

Tolerancia a Fallos[editar]

El mecanismo de MapReduce es tolerante a fallos cuando uno de los workers se ve sometido a un fallo. Como MapReduce se ha diseñado para procesos en los que se encuentran involucrados grandes tamaños de datos mediante el empleo de cientos o miles de ordenadores. Aún siendo la probabilidad de fallo baja, es muy posible que uno (o varios) de los workers quede desactivo precisamente por fallo de la máquina que le daba soporte. El "master" periódicamente hace ping a cada worker para comprobar su estatus.

Si no existe respuesta tras un cierto instante de espera, el master interpreta que el worker está desactivado. Cualquier tarea map() que ha sido completa por el worker regresa de inmediato a su estado de espera, y por lo tanto puede resultar elegible para su asignación en otros workers. De forma similar, cualquier función map() (o reduce) que se encuentre en progreso durante el fallo, se resetea a estado de reposo pudiendo ser elegida para su nueva re-asignación.

Las tareas de map() completados se vuelven a re-ejecutar ante un fallo debido en parte a que su salida se almacena en los discos locales de la máquina que falló, y por lo tanto se consideran inaccesibles. Las tareas reduce() completas no son necesarias volver a ser re-ejecutadas debido a que su salida se ha almacenado en el sistema global. cuando la tarea de map() se ejecuta por un worker A y luego por un worker B (debido principalmente a un fallo), en este caso todas las tareas reduce() son notificadas para que eliminen datos procedentes del worker A y acepten las del worker B. De esta forma la ejecucción de MapReduce es resiliente.

Ejemplos[editar]

En la descripción de los ejemplos de uso de MapReduce sólo es necesario describir en detalle como se implementan las operaciones de map() y de reduce() en cada caso. La literatura muestra ejemplos reiterados de conteo de palabras en un documento, de operaciones matriciales y de operaciones de consulta a bases de datos relacionales.

Conteo de palabras[editar]

Este ejemplo de MapReduce es un proceso para contar las apariciones de cada palabra en un conjunto de documentos:

 map(String name, String document):
  // clave: nombre del documento
  // valor: contenido del documento
  for each word w in document:
    EmitIntermediate(w, 1);

La función map() en este caso divide un documento en palabras (es decir lo tokeniza) mediante el empleo de un simple analizador léxico, y emite una serie de tuplas de la foma (clave, valor) donde la clave es la palabra y el valor es "1". Es decir, por ejemplo, del documento "La casa de la pradera" la función map retornaría: ("la", "1"), ("casa", "1"), ("de", "1"), ("la", "1"), ("pradera", "1").

 
 reduce(String word, Iterator partialCounts):
  // word: una palabra
  // partialCounts: una [[Iterador (patrón de diseño)|lista parcial]] para realizar cuentas agregadas
  int result = 0;
  for each v in partialCounts:
    result += ParseInt(v);
  Emit(result);

Aquí, cada documento es dividido en palabras, y cada palabra se cuenta con valor inicial "1" por la función Map, utilizando la palabra como el resultado clave. El framework reúne todos los pares con la misma clave y se alimenta a la misma llamada Reduce, por lo tanto, esta función sólo necesita la suma de todos los valores de su entrada para encontrar el total de las apariciones de esa palabra. En el ejemplo anterior ("la", "1") aparece dos veces debido a que la clave "la" tiene dos ocurrencias, el resto de claves sólo aparece una vez.

Multiplicación de una matriz por un vector[editar]

Los ejemplos de algebra lineal para operaciones de matrices son los más adecuados por la idonidad del framework en estos casos. Supongamos que tenemos una matriz cuadrada M de tamaño nxn. Al elemento ubicado en la fila i y columna j le denominamos mij. Supongamos que tenemos un vector v de tal forma que en la posición j se tiene el elemento vj. De esta forma la resultante de la multiplicación entre la matriz M y el vector v será un vector x de longitud n, de tal forma que el elemento xi es tal que:

x_{i} = \sum_{j=1}^n m_{ij} v_j

Esta operación se realiza sin problema alguno para matrices de varios miles de elementos, siendo costoso para varios millones. El problema de su computación proviene cuando se pretende realizar con centenares de billones. Es por esta razón por la que se asume en la aplicación de MapReduce que n es del orden de 1012. La función map () en este caso toma una fila i de la matriz y completo el vector v para formar pares: (i, mijvj). Es decir de la forma (1, m11v1), (1, m12v2), (1, mi3v3) ... (1, mijvj).

 map(Vector rowMatrix, Vector vector):
  // clave: i -> índice del vector
  // valor: producto de m<sub>ij</sub> por v<sub>j</sub>.
  for each position i in vector:
    EmitIntermediate(i, value);

La función reduce() en este caso sólo tiene que colectar los pares que poseen la misma clave i y sumarlos.

 
 reduce(String word, Iterator partialCounts):
  // word: una palabra
  // partialCounts: una [[Iterador (patrón de diseño)|lista parcial]] para realizar cuentas agregadas
  int result = 0;
  for each v in partialCounts:
    result += ParseInt(v);
  Emit(result);

Usos[editar]

Por regla general se emplea MapReduce en aquellos problemas de Computación concurrente entre los que se encuentran involucrados grandes datasets que deben ser procesandos por una gran cantidad de computadoras (nodos), a los que se refiere de forma colectiva como clusteres (si todos los nodos se encuentran en la misma red de área local y empleando el mismo hardware), o a grids (si los nodos se comparten de forma distribuida a lo largo de extensas zonas geográficas o administrativas, y que generalmente poseen un hardware más heterogéneo). El procesamiento paralelo puede ocurrir con el empleo de datos almacenados tanto en filesystem (no estructurado) o en una database (estructurados).[1] Es por esta razón por la que se emplea en aplicaciones que poseen datos a gran escala, tales como aplicaciones paralelas, indexación web, data mining, y simulación científica.

Referencias[editar]

  1. a b Jeffrey Dean, Sanjay Ghemawat, (2008), MapReduce: simplified data processing on large clusters, Communications of the ACM - 50th anniversary issue: 1958 - 2008, Volume 51 Issue 1, January 2008 Pages 107-113
  2. Anand Rajaraman,Jeffrey David Ullman, (2012), Mining of Massive Datasets
  3. Hadoop creator goes to Cloudera
  4. Ashlee Vance (17-03-2009). «Hadoop, a Free Software Program, Finds Uses Beyond Search». New York Times. Consultado el 20-01-2010. 
  5. "Hadoop contains the distributed computing platform that was formerly a part of Nutch. This includes the Hadoop Distributed Filesystem (HDFS) and an implementation of map/reduce." About Hadoop