Le standard SQL spécifie un grand nombre d'aspects des bases de données relationnelles :
Le modèle relationnel et les bases de données SQL offrent des garanties très fortes :
Maintenir un modèle ACID a un impact énorme sur les performances (création de snapshots, mise en place de verrous coûteux, attente de confirmation par le disque que les données sont sauvées, …).
Comment augmenter les performances et le volume des données ?
Il existe deux modèles de distribution possible :
Le modèle ACID est parfois trop contraignant :
Y a t-il un modèle alternatif ?
Le théorème (énoncé comme une conjecture en 1999 par E. Brewer et prouvé en 2004) dit que dans un système distribué partageant des données, au plus 2 des 3 propriétés suivantes peuvent être optimale :
Le théorème en lui-même est souvent mal interprété. Il a surtout été énoncé (d'après Brewer) pour inciter les gens à explorer d'autres modèles/compromis.
Le modèle BASE est régit par les propriétés suivantes :
On considère un système avec deux nœuds N1 et N2 :
Le modèle BASE permet de créer des systèmes composés de plusieurs nœuds, capable de stocker des données indépendemment et de répondre rapidement. Comment le mettre à profit ?
Un peu de code OCaml pour se détendre
let words = [ "It'"; "s"; "a"; "beautifull"; "day" ]
(* On veut compter la longueur totale des chaînes de caractères
dans words *)
let lengths = List.map (fun s -> String.length s) words
(* lengths = [ 3; 1; 1; 10; 3 ] *)
let total = List.fold_left (fun acc i -> i + acc) 0 lengths
(* total = 18 *)
Ces observations sont à la base de MapReduce
Popularisé par Jeffrey Dean et Sanjay Ghemawat dans l'article
:
MapReduce: Simplified Data Processing on Large
Clusters (OSDI, 2004)
Remarque: le paradigme n'est pas nouveau (BD distribuées, langages fonctionnels), mais l'article l'a popularisé et a permis l'arrivée d'implémentations Open-Source robustes.
Cadre : on dispose d'un grand nombre de machines, dont chacune dispose localement d'un ensemble de données (distinct des autres). Un nœud particulier joue le rôle d'orchestrateur les autres sont des travailleurs (workers).
Une transformation MapReduce se décompose en trois phases :
Le programmeur fournit deux transformations :
map(InputKey k, InputValue v) → (OutputKey * IntermediateValue list)
reduce(OutputKey, IntermediateValue list) → OutputValue list
Lors de la phase Map, l'orchestrateur exécute une copie de la transformation map sur chaque worker. Chacun ne transforme que les données qu'il possède localement.
La fonction map reçoit ses données d'entrées sous la forme d'une paire clé, valeur (par exemple (nom de fichier, fichier) ou (id, ligne correspondant à l'id dans une table)).
La fonction renvoie comme résultat une liste de valeur transformées associé à une clé de groupe
Lorsque la phase Map est terminée sur tous les nœuds, les données sont échangées entre nœuds et groupées selon la clé de groupe
Cette opération est une barrière, elle ne peut se produire qu'après la fin de la phase Map. De plus elle nécessite l'échange de données sur le réseau (coûteux).
L'orchestrateur exécute une copie de la fonction reduce sur chaque nœud. Cette fonction reçoit en argument une clé de groupe et la liste de toutes les valeurs intermédiaires associées, et produit un résultat final par clé.
La liste de ces résultats (pour chaque clé) est renvoyée au programmeur ou stockée sur les nœuds pour être réutilisée dans un nouveau cycle Map/Shuffle/Reduce
On suppose stocké sur les nœuds des ensembles de fichiers,
auxquels on peut accéder sous forme de paires :
(nom de fichier, contenu du fichier)
map(InputKey file, InputValue content) {
for each word in content {
Output(word, 1);
}
}
reduce(OutputKey word, InputermediateValue list ones) {
int total = 0;
for i in ones {
total += i;
}
return total;
}
Chaque nœud travaille en parallèle lors des phases Map et Reduce.
Un nœud plus lent retarde tout le monde (en pratique, le même calcul peut être lancé de manière redondante sur plusieurs nœuds et le résultat du premier qui termine est gardé)
La phase de Shuffle est coûteuse en accès réseaux, le modèle de coût est différent des BD relationnelles (on va chercher à minimiser les échanges réseaux)
Optimisation : si les fonctions de réduction est associative et commutative, on peut appliquer Reduce localement avant la phase de Shuffle et échanger des paquets de réduits.
Le paradigme MapReduce est destiné aux très gros volumes de données. Une BD relationnelle sera toujours plus rapide, qu'un seul nœud. Pour tirer partie du parallélisme il faut un jeu de données inexploitable sur un machine unique (de l'ordre du tera-octet ou peta-octet).
On suppose que l'on dispose d'une table Text
possédant une unique colonne VARCHAR(100) word.
Donner une requête SQL qui simule WordCount :
SELECT word, SUM(num) FROM
(SELECT word, 1 AS num
FROM WORD) AS T
GROUP BY word;
Quelles autres fonctionnalité de SQL peut on facilement supporter ?
Pourquoi ?
Rappel du pseudo-code de la jointure naturelle entre deux attributs a et b de deux tables A et B :
for each row r in A
do
for each row s in B
do
if r.a == s.b then output r &join; s
done
done
On doit comparer chaque ligne de A,
à toutes les lignes de B. Dans un contexte
distribué, une grande partie des lignes de B n'est
pas locale
&implies; trop d'échanges sur le réseau !
Les jointures permettent de relier des relations entre elles (via des clés primaires/étrangères). On élimine ces jointures en stockant des données de manière dupliquées:
MOVIE(INTEGER mid, VARCHAR(30) title);
PEOPLE(INTEGER pid, VARCHAR(30) name);
DIRECTOR(INTEGER pid REFERENCES PEOPLE, INTEGER mid REFERENCES MOVIE);
devient :
MOVIE(INTEGER mid, VARCHAR(30) title, List of (INTEGER,VARCHAR(30)) directors);
PEOPLE(INTEGER pid, VARCHAR(30) name);
Note : comme on n'est pas en SQL, on n'est plus contraint par le modèle relationnel « plat ».
Apache Hadoop a initialement été développé en interne à Yahoo! en 2005. C'est un projet OpenSource, comprenant :
Hadoop est installable facilement, peut s'exécuter sans privilège particulier et se programme principalement en Java (mais des interfaces existent pour d'autres langages)
Les nœuds communiquent entre eux par des requêtes HTTP ou over ssh.
HDFS est un système de fichiers distribués. C'est par lui que transitent les données des tâches MapReduce d'Hadoop. Ces caractéristiques sont :
let words = [ "It'"; "s"; "a"; "beautifull"; "day" ]
let words2 = words @ [ ","; "today"; "!" ]
La liste words n'est pas modifié par la concaténation (à l'inverse de Collection.addAll() en Java qui modifie en place l'argument)
Si un programme (ou plutôt un thread) travaille en parallèle sur words, la concaténation ne le gène pas (en Java &implies; ConcurrentAccesException)
On dit que le type liste est immuable : on ne peut pas modifier le contenu d'une liste une fois crée, on peut juste créer des copies de celle-ci.
HDFS est un système de fichier très particulier. Il suppose que les fichiers vont être lus séquentiellement ne permet pas de les modifier (on peut par contre supprimer tout un fichier)
Les fichiers sont découpés en gros blocs (typiquement 64 ou 128 Mo), à contraster avec les blocs d'un disque dur (entre 512 et 4096 octets) et les pages d'une base de données (de 8Ko à 1Mo).
L'accès direct au milieu d'un fichier est relativement coûteux (presque autant que lire le fichier jusqu'à la position donnée)
Une tâche MapReduce a la garantie que les données persistent au moins jusqu'à la fin de la tâche.
Architecture logicielle très bas niveau :
On va effectuer les manipulations suivantes :
Hadoop, HDFS et MapReduce proposent des briques de bases permettant de traîter des gros volumes de données
Ce sont des outils très bas-niveau, on perd le côté déclaratif des bases de données relationnelles (ou XML) alors qu'on voulait uniquement relâcher un peu les contraintes du modèle ACID
Apache Hive est un système d'entrepôt de données (data-warehouse system), i.e. une base de données spécialisée pour le reporting et l'analyse de données
Il a été développé en interne par Facebook avant d'être diffusé sous licence OpenSource
Initialement construit au dessus de Hadoop/HDFS/MapReduce, il supporte maintenant d'autres backends
Les tables HiveQL sont stockées comme des fichiers dans HDFS.
Que se passe-t-il quand on insère ou supprime une seule ligne ?
Un delta entre la table originale et la table modifiée est stocké (dans un nouveau fichier). Lors du chargement d'une portion de table, les delta sont appliqués pour reconstituer la table courante.
Un garbage collector compacte les tables et leur delta quand suffisamment de delta se sont accumulés.
Dans Hive, l'optimiseur de requête ne cherche pas le plan optimal pour les jointures mais cherche à exprimer la requête sous forme d'un arbre (ou plus exactement d'un DAG) dont les nœuds sont des MapReduce.
Le modèle de coût utilisé est complexe, et fait intervenir entre autres les coût des communications réseau
On va effectuer les manipulations suivantes :
Java 8 apporte des traits fonctionnels au langage. Intérêt de la programmation fonctionnelle :
Trois ingrédients :
Une interface fonctionnelle est simplement une interface contenant une seule méthode abstraite. Cette utilisation permet de définir des types de « fonctions ».
class MyIncr implements Function<Integer,Integer> {
//Doit s'appeler apply
Integer apply(Integer x) { return x + 1; }
}
class MyOdd implements Predicate<Integer> {
//Doit s'appler test
boolean test(Integer x) { return x % 2 == 1; }
}
class MyPrint implements Consumer<Integer> {
//Doit s'appler accept
void accept(Integer x) { System.out.println(x); }
}
Nouvelle API de Java 8. Permet de définir de manière déclarative des itérateurs sur des flux. Étant donné un objet qui implémente l'(ancienne) interface Collection<E> (par exemple Vector<E>, List<E>, Set<E>) :
Vector<Integer> v = …;
…
Stream<Integer> s = v.stream();
Que gagne t'on ?
On va pouvoir définir des itérateurs persistants sur les flux, paresseux et parallélisable
Quelle différence avec les opérateurs sur les Collections ?
Les collections sous-jacentes ne sont pas détruites!
Supposons que l'on a un vecteur d'Integer v. On souhaite, tous les incrémenter, les trier, puis afficher ceux qui sont impairs :
Vector<Integer> v = … ;
v.stream().map(new MyIncr())
.sorted()
.filter(new MyOdd())
.forEach(new MyPrint());
Exercice, faire la même chose sans passer par les flux
Il est malcommode de définir les opérations à passer aux itérateurs dans des classes (MyIncr, …, dans l'exemple précédent). Java 8 fournit une syntaxe commode pour définir des fonctions :
Vector<Integer> v = … ;
v.stream().map(i -> i+1)
.sorted()
.filter(i -> i % 2 == 1)
.forEach(i -> System.out.println(i));
La syntaxe générale est :
(Type1 p1, …, Typen pn) -> corps
Le corps peut être soit une expression simple soit un bloc :
(Integer i) -> i+1
(Integer i) -> {
System.out.println(i);
return i+1;
}
Avantage des λ-abstractions ?
PrintWriter out = …;
…
v.stream().forEach(i -> out.println(i) );
Stream<Integer> vs = v.stream().sorted();
if (do_print)
vs.forEach(i -> System.out(i));
//le tri n'est fait que si on force (en utilisant .forEach())