DuckDB + dbplyr : quand votre pipeline donne des résultats différents à chaque exécution

Author : Vincent Guyader
Categories : astuces, base de données, développement
Tags : dbplyr, dplyr, DuckDB
Date :

TL;DR ? L’essentiel en une phrase : DuckDB parallélise l’exécution des requêtes et ne garantit jamais l’ordre des lignes sans ORDER BY explicite. Si une étape de votre pipeline dépend de l’ordre, row_number(), cumsum(), lag(), distinct(.keep_all = TRUE), jointures par inégalité, vous produisez silencieusement des résultats non reproductibles. Cet article présente les quatre patterns qui font mal et comment les corriger.

Le contexte : un pipeline SAS, migré en R

Vous avez hérité (ou écrit) un pipeline de données initialement codé en SAS. Il traite des enregistrements de facturation administrative : rapprochement de lignes avec des tables de référence, application de coefficients variables dans le temps, déduplication sur des identifiants métier, calcul de compteurs cumulatifs. De l’ETL classique.

La migration vers R se passe bien. Vous ouvrez une connexion DuckDB via {DBI}, chargez vos fichiers sources comme tables lazy via {arrow} ou dplyr::tbl(), construisez les transformations avec {dbplyr}, et ne collectez le résultat qu’en toute fin de chaîne. Le code est lisible, vos tests comparent la sortie R à la référence SAS, et ils passent. (peut être utilisez vous {datadiff} pour cela)

Puis vous relancez le pipeline.

Les chiffres sont différents.

Pas radicalement différents. Quelques lignes décalées, quelques montants intervertis. Exactement le genre d’écart qui passerait inaperçu à l’œil nu mais ferait planter un rapport de rapprochement. Vous relancez dix fois. Sept fois le résultat correspond au premier run. Trois fois au second. Vous êtes en train de contempler un non-déterminisme intermittent et dépendant des données, ce qui est la pire variété qui soit.

Cet article documente les quatre causes racines rencontrées en production et les patterns qui y remédient.

Pourquoi DuckDB se comporte différemment de ce qu’on attend

En SAS, le data step traite les lignes dans l’ordre physique, l’ordre dans lequel elles se trouvent sur le disque. Cet ordre est stable. Les procédures comme PROC SORT le rendent explicite. Tout le langage est construit autour de l’idée que l’ordre des lignes compte et est prévisible.

DuckDB est un moteur de requêtes colonnaire et parallèle. Il répartit le travail sur les CPU, traite les données par blocs (vecteurs) et réassemble les résultats. L’ordre dans lequel les blocs sont traités n’est pas garanti. Il dépend du plan de requête, du nombre de threads, de la taille des données et de décisions d’ordonnancement interne qui peuvent changer d’une exécution à l’autre.

Ce n’est pas un bug. C’est le comportement attendu de tout moteur analytique moderne. Le standard SQL ne définit pas d’ordre des lignes sans ORDER BY. DuckDB rend simplement ce fait visible d’une façon que SQLite ou un data frame en mémoire ne font pas, parce qu’il parallélise vraiment.

La conséquence pour les utilisateurs de {dbplyr} : tout code R qui s’appuie implicitement sur l’ordre des lignes, même s’il ressemble à du dplyr ordinaire, produira des résultats imprévisibles une fois traduit en SQL et exécuté par DuckDB.

Source 1, Fonctions window sans ordre explicite

C’est le coupable le plus fréquent.

Le problème

# Ça a l'air normal. Ce ne l'est pas.
data |>
  group_by(entity_id) |>
  mutate(rn = row_number()) |>
  filter(rn == 1)

row_number() sans clause d’ordre attribue les numéros dans l’ordre dans lequel les lignes arrivent à la fonction window. Sous DuckDB, cet ordre est non-déterministe. La ligne que vous conservez est aléatoire.

Le même problème s’applique à cumsum(), lag() et lead() :

# cumsum() accumule dans un ordre aléatoire si les lignes ne sont pas triées au préalable
data |>
  group_by(entity_id, invoice_id, delay) |>
  mutate(counter = cumsum(code == "TYPE_A"))

# lag() lit la ligne "précédente", notion indéfinie si l'ordre l'est aussi
data |>
  group_by(code) |>
  mutate(prev_rate = lag(rate))

La correction : window_order() avant chaque fonction window

dbplyr fournit window_order() pour injecter une clause ORDER BY dans le cadre de la fenêtre. La règle clé : les colonnes listées doivent collectivement briser toutes les égalités au sein d’un groupe, sinon les lignes avec des clés de tri identiques sont toujours traitées dans un ordre aléatoire.

# FAUX, toutes les lignes du même groupe ont des valeurs identiques
# sur ces trois colonnes. L'égalité n'est jamais brisée.
data |>
  window_order(entity_id, invoice_id, delay) |>
  group_by(entity_id, invoice_id, delay) |>
  mutate(rn = row_number())

# CORRECT, row_id est unique par ligne et brise toute égalité
data |>
  window_order(entity_id, invoice_id, delay, row_id) |>
  group_by(entity_id, invoice_id, delay) |>
  mutate(rn = row_number())

Règle : la clé de window_order() doit inclure au moins une colonne unique dans le groupe. Les colonnes du group_by() seules ne suffisent jamais, elles sont par définition identiques pour chaque ligne du groupe.

Source 2, distinct(.keep_all = TRUE)

Le problème

distinct() sans .keep_all est sans danger : il ne conserve que les colonnes listées, qui sont identiques sur toutes les lignes correspondantes par définition. Mais .keep_all = TRUE demande à DuckDB de retourner aussi les autres colonnes depuis une des lignes correspondantes, et il choisit arbitrairement.

# Si plusieurs lignes partagent (client_id, product_id) avec des montants différents,
# le montant retourné est aléatoire
data |>
  distinct(client_id, product_id, .keep_all = TRUE)

# Ajouter un filtre en amont ne suffit pas si le filtre peut toujours
# retourner plusieurs lignes par groupe
data |>
  group_by(client_id, product_id) |>
  filter(date == min(date, na.rm = TRUE)) |>  # égalité sur date → plusieurs lignes
  ungroup() |>
  distinct(client_id, product_id, .keep_all = TRUE)   # ← toujours aléatoire

Option A : summarise() quand on ne veut qu’une valeur agrégée

data |>
  group_by(client_id, product_id) |>
  summarise(
    first_date = min(date, na.rm = TRUE),
    .groups = "drop"
  )

Option B : window_order() + filter(row_number() == 1L) quand on veut toute la ligne

data |>
  group_by(client_id, product_id) |>
  window_order(date, desc(amount)) |>   # choix explicite et déterministe
  filter(row_number() == 1L) |>
  ungroup()

La deuxième option vous force à exprimer quelle ligne vous voulez réellement, ce qui correspond presque toujours à ce que la logique métier attendait en premier lieu.

Source 3, Jointures par inégalité créant un fan-out

Celle-ci est subtile et dépendante des données, ce qui la rend particulièrement dangereuse.

Le problème

Un pattern classique dans les pipelines de facturation consiste à joindre une table de transactions contre une table de référence de tarifs ou coefficients variables dans le temps :

data |>
  left_join(
    ref_rates,
    by = join_by(code, date >= rate_start, date <= rate_end)
  )

Si ref_rates contient deux plages de validité qui se chevauchent pour le même code, par exemple une ligne couvre Jan–Déc et une autre couvre Juil–Déc pour une valeur corrigée, alors chaque transaction dans cette période correspond à deux lignes dans ref_rates. La jointure double ces lignes (fan-out ×2).

Ce fan-out se propage ensuite silencieusement dans toutes les étapes en aval. Votre cumsum() accumule en double. Votre row_number() voit des clés dupliquées et devient non-déterministe même avec un window_order() qui était auparavant suffisant.

Le pire : ce bug ne se manifeste que pour les valeurs de code qui ont des plages chevauchantes dans vos données de référence. Il peut n’affecter qu’une entité sur cinquante, et ressembler à un problème de qualité des données plutôt qu’à un bug structurel du pipeline.

# Vérifier si un fan-out s'est déjà produit
data_after_join |>
  count(entity_id, line_id) |>
  filter(n > 1) |>
  collect()
# Non-vide → fan-out confirmé

La correction : pré-résolution par (clé × date) avant l’equi-join

Au lieu de joindre la table complète des transactions contre la référence avec une condition d’inégalité, construisez d’abord un petit lookup qui associe chaque paire unique (clé, date) à exactement une ligne de référence :

# Étape 1 : identifier toutes les combinaisons (code, date) présentes dans les données
# Étape 2 : appliquer la jointure par inégalité uniquement sur ce lookup réduit
# Étape 3 : dédupliquer à une ligne par (code, date) en choisissant explicitement quelle plage prime
# Étape 4 : relier ce lookup à la table principale par equi-join, aucun fan-out possible

rates_resolved <- data |>
  distinct(code, date) |>
  left_join(
    ref_rates,
    by = join_by(code, date >= rate_start, date <= rate_end)
  ) |>
  group_by(code, date) |>
  window_order(desc(rate_start), desc(rate_end)) |>  # la plage la plus récente prime
  filter(row_number() == 1L) |>
  ungroup() |>
  select(-rate_start, -rate_end)

data <- data |>
  left_join(rates_resolved, by = c("code", "date"))  # equi-join, sans fan-out

Attention : il ne faut pas dédupliquer la table de référence globalement par clé avant la jointure. Cela supprimerait des plages historiques non-chevauchantes qui restent valides pour d’autres dates. La pré-résolution doit être chirurgicale : on ne résout que les paires où plusieurs plages sont simultanément valides pour une date cible donnée.

Source 4, Lignes synthétiques parfaitement identiques

Le problème

Certains pipelines expansent des lignes en fonction d’un champ de quantité : une ligne de facture avec qty = 3 devient trois lignes distinctes. Si on supprime l’index d’expansion après la duplication, les trois lignes deviennent des doublons parfaits, identiques sur chaque colonne. Aucun window_order() ne peut les distinguer.

# L'expansion crée qty lignes identiques, puis supprime le seul discriminant
data |>
  slice(rep(seq_len(n()), times = qty)) |>
  select(-qty)   # ← doublons parfaits désormais

Toute fonction window opérant ensuite sur ces lignes produira des résultats arbitraires, car le moteur n’a aucun moyen d’assigner de façon déterministe des numéros ou un ordre à des objets indiscernables.

La correction : conserver l’index d’expansion comme tiebreaker

# Conserver la position dans l'expansion comme colonne discriminante
expanded <- data |>
  mutate(series = as.integer(series))   # position 1, 2, 3, …

# L'inclure dans chaque window_order en aval
expanded |>
  window_order(entity_id, line_id, series) |>
  group_by(entity_id) |>
  mutate(rn = row_number())

# Ne la supprimer qu'en toute fin de pipeline, après toutes les opérations window
result |>
  select(-series)

La même logique s’applique chaque fois qu’on fait un union_all() de tables pouvant contenir des lignes identiques : ajouter un tag de source avant l’union pour que les étapes en aval puissent s’en servir comme tiebreaker.

Bonus : déduplication dépendante du type de ligne

Un piège apparenté : quand une table contient plusieurs types de lignes qui partagent une colonne de clé, un seul passage de déduplication utilisant le compteur d’un type éliminera silencieusement les lignes de l’autre type.

# records contient des lignes TYPE_A et TYPE_B partageant le même entity_id
# Dédupliquer par (entity_id, counter_a) élimine les lignes TYPE_B
# parce que counter_a est identique pour les deux types au sein d'un même entity_id
records |>
  group_by(entity_id, counter_a) |>
  window_order(entity_id, counter_a, line_id) |>
  filter(row_number() == 1L) |>
  ungroup()

La correction : séparer en branches et appliquer le bon compteur à chaque type :

records_a <- records |>
  filter(type != "TYPE_B") |>
  group_by(entity_id, counter_a) |>
  window_order(entity_id, counter_a, line_id) |>
  filter(row_number() == 1L) |>
  ungroup()

records_b <- records |>
  filter(type == "TYPE_B") |>
  group_by(entity_id, counter_b) |>
  window_order(entity_id, counter_b, line_id) |>
  filter(row_number() == 1L) |>
  ungroup()

records_final <- union_all(records_a, records_b)

Checklist avant de livrer du code DuckDB/dbplyr

À copier dans votre template de revue de code :

Fonctions window
– [ ] Chaque mutate(rn = row_number()) est précédé d’un window_order() dont la clé brise toutes les égalités dans le groupe
– [ ] Chaque mutate(x = cumsum(...)) est précédé d’un window_order() incluant au moins une colonne unique dans le groupe
– [ ] Chaque mutate(prev = lag(...)) est précédé d’un window_order() déterministe
– [ ] Les colonnes du window_order() ne sont pas exclusivement des colonnes du group_by()

distinct()
– [ ] Aucun distinct(..., .keep_all = TRUE) n’est utilisé sans garantie que le filtre en amont retourne exactement une ligne par groupe
– [ ] Tous les distinct(.keep_all = TRUE) ont été remplacés par summarise() ou window_order() + filter(row_number() == 1L)

Jointures par inégalité
– [ ] Chaque join_by(clé, date >= début, date <= fin) est suivi d’une vérification que deux plages de la table de référence ne se chevauchent pas pour la même clé
– [ ] En cas de chevauchement possible, le pattern de pré-résolution (clé × date cible) est utilisé à la place de la jointure directe
– [ ] La déduplication après une jointure par inégalité se fait sur (clé × date cible), pas sur (clé) seule

Lignes synthétiques
– [ ] Chaque slice(rep(...)) ou équivalente conserve une colonne d’index utilisable comme tiebreaker dans les window_order() en aval
– [ ] Cette colonne d’index n’est supprimée qu’après toutes les opérations window

Conclusion

DuckDB est un excellent outil pour ce type de travail, rapide, embarquable, compatible avec Arrow et Parquet, et il se compose élégamment avec {dbplyr}. Mais ce n’est pas un data frame avec une syntaxe SQL. C’est un moteur de requêtes parallèle, et il exposera silencieusement chaque hypothèse implicite que votre code fait sur l’ordre des lignes.

La bonne nouvelle : les quatre patterns décrits ici sont tous corrigeables sans restructurer votre pipeline. Les règles sont simples une fois qu’on les a intégrées :

  1. Toute opération window sensible à l’ordre nécessite un window_order() explicite avec un vrai tiebreaker.
  2. distinct(.keep_all = TRUE) est un code smell sous DuckDB, remplacez-le par un choix explicite.
  3. Les jointures par inégalité nécessitent une étape de pré-résolution si la table de référence peut avoir des plages qui se chevauchent.
  4. Les lignes synthétiques doivent conserver leur index d’expansion jusqu’à la fin.

Le plus comlpiqué, c’est qu’il sagit de « bug » silencieux. Le code s’exécute sans erreur, les tests passent sur certaines données, et la différence entre deux runs peut se limiter à une ligne sur mille. La seule défense est une revue de code systématique sur la checklist ci-dessus, et lancer votre pipeline plus d’une fois pendant le développement.


À propos de l'auteur

Vincent Guyader

Vincent Guyader

Codeur fou, formateur et expert logiciel R


Comments


Also read