Traîtement distribué des données

Master 1 ISD

Données massives
MapReduce

kn@lmf.cnrs.fr

Propriétés et limites des bases SQL

SQL

Le standard SQL spécifie un grand nombre d'aspects des bases de données relationnelles :

Garanties

Le modèle relationnel et les bases de données SQL offrent des garanties très fortes :

Quel impact ?

Maintenir un modèle ACID a un impact énorme sur les performances (création de snapshots, mise en place de verrous coûteux, attente de confirmation par le disque que les données sont sauvées, …).

Comment augmenter les performances et le volume des données ?

Distribution et modèle ACID

Il existe deux modèles de distribution possible :

Le modèle ACID est-il toujours adapté ?

Le modèle ACID est parfois trop contraignant :

Y a t-il un modèle alternatif ?

Théorème de Brewer

Le théorème (énoncé comme une conjecture en 1999 par E. Brewer et prouvé en 2004) dit que dans un système distribué partageant des données, au plus 2 des 3 propriétés suivantes peuvent être optimale :

Le théorème en lui-même est souvent mal interprété. Il a surtout été énoncé (d'après Brewer) pour inciter les gens à explorer d'autres modèles/compromis.

Modèle BASE

Le modèle BASE est régit par les propriétés suivantes :

Exemple concret

On considère un système avec deux nœuds N1 et N2 :

Que peut on faire avec BASE ?

Le modèle BASE permet de créer des systèmes composés de plusieurs nœuds, capable de stocker des données indépendemment et de répondre rapidement. Comment le mettre à profit ?

Paradigme MapReduce

Programmation fonctionnelle, le retour

Un peu de code OCaml pour se détendre

let words = [ "It'"; "s"; "a"; "beautifull"; "day" ] (* On veut compter la longueur totale des chaînes de caractères dans words *) let lengths = List.map (fun s -> String.length s) words (* lengths = [ 3; 1; 1; 10; 3 ] *) let total = List.fold_left (fun acc i -> i + acc) 0 lengths (* total = 18 *)

Les itérateurs map et fold

map
de type (α → β) → α list → β list , prend en argument une fonction et une liste d'éléments et applique la fonction à chaque élément. Elle renvoie la liste des éléments transformés (dans le même ordre).

fold
de type (α → β → α) → α → β list → α, prend une fonction d'agrégat, un accumulateur initial et une liste d'élément qui sont passés tour à tour à la fonction. La valeur finale de l'accumulateur est renvoyée.

Visuellement

map
f f f f f
fold
f
f
f
f
f

Propriétés de map et fold ?

Ces observations sont à la base de MapReduce

MapReduce

Popularisé par Jeffrey Dean et Sanjay Ghemawat dans l'article :
MapReduce: Simplified Data Processing on Large Clusters (OSDI, 2004)

Remarque: le paradigme n'est pas nouveau (BD distribuées, langages fonctionnels), mais l'article l'a popularisé et a permis l'arrivée d'implémentations Open-Source robustes.

Cadre : on dispose d'un grand nombre de machines, dont chacune dispose localement d'un ensemble de données (distinct des autres). Un nœud particulier joue le rôle d'orchestrateur les autres sont des travailleurs (workers).

Une transformation MapReduce se décompose en trois phases :

  1. Phase Map
  2. Phase Shuffle
  3. Phase Reduce (fold)

Préambule

Le programmeur fournit deux transformations :

map(InputKey k, InputValue v) → (OutputKey * IntermediateValue list) reduce(OutputKey, IntermediateValue list) → OutputValue list

Phase Map

Lors de la phase Map, l'orchestrateur exécute une copie de la transformation map sur chaque worker. Chacun ne transforme que les données qu'il possède localement.

La fonction map reçoit ses données d'entrées sous la forme d'une paire clé, valeur (par exemple (nom de fichier, fichier) ou (id, ligne correspondant à l'id dans une table)).

La fonction renvoie comme résultat une liste de valeur transformées associé à une clé de groupe

Phase Shuffle

Lorsque la phase Map est terminée sur tous les nœuds, les données sont échangées entre nœuds et groupées selon la clé de groupe

Cette opération est une barrière, elle ne peut se produire qu'après la fin de la phase Map. De plus elle nécessite l'échange de données sur le réseau (coûteux).

Phase Reduce

L'orchestrateur exécute une copie de la fonction reduce sur chaque nœud. Cette fonction reçoit en argument une clé de groupe et la liste de toutes les valeurs intermédiaires associées, et produit un résultat final par clé.

La liste de ces résultats (pour chaque clé) est renvoyée au programmeur ou stockée sur les nœuds pour être réutilisée dans un nouveau cycle Map/Shuffle/Reduce

Exemple complet : word count

On suppose stocké sur les nœuds des ensembles de fichiers, auxquels on peut accéder sous forme de paires :
(nom de fichier, contenu du fichier)

Exemple complet : word count (Map)

map(InputKey file, InputValue content) { for each word in content { Output(word, 1); } }

Exemple complet : word count (Shuffle)

Exemple complet : word count (Map)

reduce(OutputKey word, InputermediateValue list ones) { int total = 0; for i in ones { total += i; } return total; }

Parallélisme et modèle de coût

Chaque nœud travaille en parallèle lors des phases Map et Reduce.

Un nœud plus lent retarde tout le monde (en pratique, le même calcul peut être lancé de manière redondante sur plusieurs nœuds et le résultat du premier qui termine est gardé)

La phase de Shuffle est coûteuse en accès réseaux, le modèle de coût est différent des BD relationnelles (on va chercher à minimiser les échanges réseaux)

Optimisation : si les fonctions de réduction est associative et commutative, on peut appliquer Reduce localement avant la phase de Shuffle et échanger des paquets de réduits.

Résistance aux pannes

Passage à l'échelle

Le paradigme MapReduce est destiné aux très gros volumes de données. Une BD relationnelle sera toujours plus rapide, qu'un seul nœud. Pour tirer partie du parallélisme il faut un jeu de données inexploitable sur un machine unique (de l'ordre du tera-octet ou peta-octet).

Expressivité (digression en SQL)

On suppose que l'on dispose d'une table Text possédant une unique colonne VARCHAR(100) word.
Donner une requête SQL qui simule WordCount :

SELECT word, SUM(num) FROM (SELECT word, 1 AS num FROM WORD) AS T GROUP BY word;

Quelles autres fonctionnalité de SQL peut on facilement supporter ?

Fonctionnalités non supportées par le modèle

Pourquoi ?

Rappel du pseudo-code de la jointure naturelle entre deux attributs a et b de deux tables A et B :

for each row r in A do for each row s in B do if r.a == s.b then output r &join; s done done

On doit comparer chaque ligne de A, à toutes les lignes de B. Dans un contexte distribué, une grande partie des lignes de B n'est pas locale
&implies; trop d'échanges sur le réseau !

Quelles solutions ?

Les jointures permettent de relier des relations entre elles (via des clés primaires/étrangères). On élimine ces jointures en stockant des données de manière dupliquées:

MOVIE(INTEGER mid, VARCHAR(30) title); PEOPLE(INTEGER pid, VARCHAR(30) name); DIRECTOR(INTEGER pid REFERENCES PEOPLE, INTEGER mid REFERENCES MOVIE);

devient :

MOVIE(INTEGER mid, VARCHAR(30) title, List of (INTEGER,VARCHAR(30)) directors); PEOPLE(INTEGER pid, VARCHAR(30) name);

Note : comme on n'est pas en SQL, on n'est plus contraint par le modèle relationnel « plat ».

Conclusions sur MapReduce

Hadoop et HDFS

Le framework Hadoop

Apache Hadoop a initialement été développé en interne à Yahoo! en 2005. C'est un projet OpenSource, comprenant :

Hadoop Common
ensemble de bibliothèques partagées par le reste de la pile d'application
Hadoop Distributed File System (HDFS)
système de fichiers distribués permettant de stocker des gros volumes de données sur des clusters de machines modestes (comodity servers)
Hadoop YARN
ordonnanceur de tâches, gestion de la reprise sur panne, …
Hadoop MapReduce
bibliothèques permettant de programmer des tâches MapReduce

Utilisation

Hadoop est installable facilement, peut s'exécuter sans privilège particulier et se programme principalement en Java (mais des interfaces existent pour d'autres langages)

Les nœuds communiquent entre eux par des requêtes HTTP ou over ssh.

HDFS

HDFS est un système de fichiers distribués. C'est par lui que transitent les données des tâches MapReduce d'Hadoop. Ces caractéristiques sont :

Encore un peu d'OCaml

let words = [ "It'"; "s"; "a"; "beautifull"; "day" ] let words2 = words @ [ ","; "today"; "!" ]

La liste words n'est pas modifié par la concaténation (à l'inverse de Collection.addAll() en Java qui modifie en place l'argument)

Si un programme (ou plutôt un thread) travaille en parallèle sur words, la concaténation ne le gène pas (en Java &implies; ConcurrentAccesException)

On dit que le type liste est immuable : on ne peut pas modifier le contenu d'une liste une fois crée, on peut juste créer des copies de celle-ci.

HDFS

HDFS est un système de fichier très particulier. Il suppose que les fichiers vont être lus séquentiellement ne permet pas de les modifier (on peut par contre supprimer tout un fichier)

Les fichiers sont découpés en gros blocs (typiquement 64 ou 128 Mo), à contraster avec les blocs d'un disque dur (entre 512 et 4096 octets) et les pages d'une base de données (de 8Ko à 1Mo).

L'accès direct au milieu d'un fichier est relativement coûteux (presque autant que lire le fichier jusqu'à la position donnée)

Une tâche MapReduce a la garantie que les données persistent au moins jusqu'à la fin de la tâche.

Problèmes d'Hadoop MapReduce + HDFS

Architecture logicielle très bas niveau :

L'API MapReduce d'Hadoop :

Démo

On va effectuer les manipulations suivantes :