DuckDB + dbplyr: When Your Pipeline Gives Different Results Every Time It Runs

Author : Rtask
Categories : database, development, tips
Tags : dbplyr, dplyr, DuckDB, reproducibility
Date :

Short on time? Here’s the gist: DuckDB parallelizes query execution and never guarantees row order unless you explicitly ask for it. If any step in your pipeline is order-sensitive (row_number(), cumsum(), lag(), distinct(.keep_all = TRUE), inequality joins), you are silently producing non-deterministic results. This post shows the four patterns that bite you and how to fix each one.


The Setup: A SAS Pipeline, Now in R

You have inherited (or written) a data pipeline originally coded in SAS. It processes administrative billing records: matching line items against reference tables, applying time-varying coefficients, deduplicating based on business identifiers, computing running counters. Classic ETL work.

The migration to R goes well. You use {DBI} to open a DuckDB connection, load your source files as lazy tables via {arrow} or dplyr::tbl(), build the transformations with {dbplyr}, and collect the result at the very end. Your code is readable, your tests compare the R output to the SAS reference, and they pass (maybe using {datadiff}).

Then you run the pipeline again.

The numbers are different.

Not wildly different. A few rows shifted, a few amounts swapped. Exactly the kind of difference that would slip through a quick visual check but break a reconciliation report. You run it ten more times. Seven match the first run. Three match the second. You are now staring at intermittent, data-dependent non-determinism, which is the worst kind.

This post documents the four root causes we encountered in production and the patterns that fix them.


Why DuckDB Is Different From What You Expect

In SAS, the data step processes rows in physical order, the order they sit on disk. That order is stable. Procedures like PROC SORT make it explicit. The whole language is built around the idea that row order matters and is predictable.

DuckDB is a columnar, parallel query engine. It splits work across CPU cores, processes data in chunks (vectors), and reassembles results. The order in which chunks are processed is not guaranteed. It depends on the query plan, the number of threads, the size of the data, and internal scheduling decisions that can change between runs.

This is not a bug. It is the expected behavior of any modern analytical database. The SQL standard does not define a row order unless you write ORDER BY. DuckDB simply makes this visible in ways that SQLite or an in-memory data frame do not, because it actually parallelizes.

The consequence for {dbplyr} users: any R code that implicitly relies on row order, even if it looks like ordinary dplyr, will produce unpredictable results when translated to SQL and executed by DuckDB.
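To see this concretely, here is a minimal sketch (toy table and column names are ours, assuming {duckdb}, {DBI}, {dplyr} and {dbplyr} are installed): the same lazy query, collected twice, is not guaranteed to keep the same rows.

```r
library(DBI)
library(dplyr)
library(dbplyr)

con <- dbConnect(duckdb::duckdb())

# Toy table, large enough that DuckDB actually splits the scan across threads
dbWriteTable(con, "tx", data.frame(
  entity_id = sample(1:100, 1e6, replace = TRUE),
  amount    = runif(1e6)
))

first_per_entity <- function() {
  tbl(con, "tx") |>
    group_by(entity_id) |>
    mutate(rn = row_number()) |>   # no window_order(): "first" is undefined
    filter(rn == 1) |>
    collect() |>
    arrange(entity_id)
}

run1 <- first_per_entity()
run2 <- first_per_entity()
identical(run1, run2)   # may be FALSE: each run can keep a different row

dbDisconnect(con, shutdown = TRUE)
```

dbplyr even warns about the missing order here; in a long pipeline that warning is easy to miss.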


Source 1, Window Functions Without an Explicit Order

This is the most common culprit.

The problem

# Looks fine. It isn't.
data |>
  group_by(entity_id) |>
  mutate(rn = row_number()) |>
  filter(rn == 1)

row_number() without an order clause assigns numbers in whatever order the rows happen to arrive at the window function. In DuckDB that order is non-deterministic. The row you keep is random.

The same applies to cumsum(), lag(), and lead():

# cumsum() accumulates in random order if rows aren't sorted first
data |>
  group_by(entity_id, invoice_id, delay) |>
  mutate(counter = cumsum(code == "TYPE_A"))

# lag() reads the "previous" row, undefined if order is undefined
data |>
  group_by(code) |>
  mutate(prev_rate = lag(rate))

The fix: window_order() before every window function

dbplyr provides window_order() to inject an ORDER BY clause into the window definition (the OVER clause of the generated SQL). The key is that the columns listed must collectively break all ties within a group; otherwise rows with identical sort keys are still processed in random order.

# WRONG, all rows in the same group have identical values for these three columns
# The tie is never broken
data |>
  window_order(entity_id, invoice_id, delay) |>
  group_by(entity_id, invoice_id, delay) |>
  mutate(rn = row_number())

# CORRECT, row_id is unique per line and breaks every tie
data |>
  window_order(entity_id, invoice_id, delay, row_id) |>
  group_by(entity_id, invoice_id, delay) |>
  mutate(rn = row_number())

Rule: the window_order() key must include at least one column that is unique within the group. The columns of group_by() alone are never sufficient: they are identical for every row in the group by definition.
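Where does that row_id come from? If the source data has no natural unique key, one safe option is to mint it while the data is still an in-memory data frame, before it reaches DuckDB: local row order is deterministic, so the id is stable across runs. A minimal sketch (table and column names are ours):

```r
library(DBI)
library(dplyr)
library(dbplyr)

con <- dbConnect(duckdb::duckdb())

raw <- data.frame(entity_id = c(1, 1, 2), amount = c(10, 10, 7))

# Mint the tiebreaker locally: data frame row order is deterministic
raw <- raw |> mutate(row_id = row_number())

dbWriteTable(con, "tx", raw)

res <- tbl(con, "tx") |>
  window_order(entity_id, row_id) |>   # row_id breaks ties between identical rows
  group_by(entity_id) |>
  mutate(rn = row_number()) |>
  collect()

dbDisconnect(con, shutdown = TRUE)
```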


Source 2, distinct(.keep_all = TRUE)

The problem

distinct() without .keep_all is safe: it only retains the columns listed, which are identical across all matching rows by definition. But .keep_all = TRUE asks DuckDB to also return the other columns from one of the matching rows, and it picks arbitrarily.

# If multiple rows share (client_id, product_id) with different amounts,
# the amount you get back is random
data |>
  distinct(client_id, product_id, .keep_all = TRUE)

# Adding a filter upstream doesn't save you if the filter can still
# return multiple rows per group
data |>
  group_by(client_id, product_id) |>
  filter(date == min(date, na.rm = TRUE)) |>  # ties on date → multiple rows
  ungroup() |>
  distinct(client_id, product_id, .keep_all = TRUE)   # ← still random

Option A: summarise() when you only need one aggregated value

data |>
  group_by(client_id, product_id) |>
  summarise(
    first_date = min(date, na.rm = TRUE),
    .groups = "drop"
  )

Option B: window_order() + filter(row_number() == 1L) when you need the whole row

data |>
  group_by(client_id, product_id) |>
  window_order(date, desc(amount)) |>   # explicit, deterministic choice
  filter(row_number() == 1L) |>
  ungroup()

The second option lets you express which row you actually want, which is almost always what the business logic intended in the first place.
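When in doubt, inspect the SQL that dbplyr generates: the ORDER BY must appear inside the OVER clause. A small sketch with a toy table (our names):

```r
library(DBI)
library(dplyr)
library(dbplyr)

con <- dbConnect(duckdb::duckdb())
dbWriteTable(con, "sales", data.frame(
  client_id  = c(1, 1, 2),
  product_id = c("a", "a", "b"),
  date       = as.Date("2024-01-01") + c(0, 0, 5),
  amount     = c(10, 20, 7)
))

q <- tbl(con, "sales") |>
  group_by(client_id, product_id) |>
  window_order(date, desc(amount)) |>
  filter(row_number() == 1L)

show_query(q)
# The window should read something like:
# ROW_NUMBER() OVER (PARTITION BY client_id, product_id ORDER BY "date", amount DESC)

sql_txt <- remote_query(q)   # same SQL, as a string, for programmatic checks
dbDisconnect(con, shutdown = TRUE)
```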


Source 3, Inequality Joins That Create a Fan-Out

This one is subtle and data-dependent, which makes it especially dangerous.

The problem

A common pattern in billing pipelines is joining a transaction table against a reference table of time-varying rates or coefficients:

data |>
  left_join(
    ref_rates,
    by = join_by(code, date >= rate_start, date <= rate_end)
  )

If ref_rates has two overlapping validity periods for the same code, say one row covers Jan–Dec and another covers Jul–Dec for a corrected value, then every transaction in that period matches two rows in ref_rates. The join doubles those rows (fan-out ×2).

This fan-out then propagates silently through every downstream step. Your cumsum() accumulates double. Your row_number() sees duplicate keys and becomes non-deterministic even with a window_order() that was previously sufficient.

The worst part: this only manifests for the specific code values that happen to have overlapping periods in your reference data. It may affect one entity out of fifty, making it look like a rare data quality issue rather than a structural pipeline bug.

# Verify whether a fan-out has already occurred
data_after_join |>
  count(entity_id, line_id) |>
  filter(n > 1) |>
  collect()
# Non-empty → fan-out confirmed

The fix: pre-resolve by (key × date) before the equi-join

Instead of joining the full transaction table against the reference with an inequality condition, first build a small lookup that maps each unique (key, date) pair to exactly one reference row:

# Step 1: find all unique (code, date) combinations present in the data
# Step 2: apply the inequality join only on this small lookup
# Step 3: deduplicate to one row per (code, date), choosing explicitly which period wins
# Step 4: join back to the full table with a simple equi-join, no fan-out possible

rates_resolved <- data |>
  distinct(code, date) |>
  left_join(
    ref_rates,
    by = join_by(code, date >= rate_start, date <= rate_end)
  ) |>
  group_by(code, date) |>
  window_order(desc(rate_start), desc(rate_end)) |>  # most recent period wins
  filter(row_number() == 1L) |>
  ungroup() |>
  select(-rate_start, -rate_end)

data <- data |>
  left_join(rates_resolved, by = c("code", "date"))  # equi-join, safe

Note that you should not deduplicate the reference table globally by key before the join. That would discard non-overlapping historical periods that are still valid for other dates. The pre-resolution must be surgical: resolve only the pairs where multiple periods are simultaneously valid for a given target date.
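To know whether your reference table needs this treatment at all, you can audit it for overlapping periods with a self-join. A sketch reusing the hypothetical ref_rates columns (inequality joins through join_by() need dplyr ≥ 1.1 and a recent dbplyr):

```r
library(DBI)
library(dplyr)

con <- dbConnect(duckdb::duckdb())
dbWriteTable(con, "ref_rates", data.frame(
  code       = c("X", "X", "Y"),
  rate_start = as.Date(c("2024-01-01", "2024-07-01", "2024-01-01")),
  rate_end   = as.Date(c("2024-12-31", "2024-12-31", "2024-06-30")),
  rate       = c(1.0, 1.1, 2.0)
))

ref <- tbl(con, "ref_rates")

# Two periods for the same code overlap iff each one starts before the other ends
overlapping_codes <- ref |>
  inner_join(
    ref,
    by = join_by(code, x$rate_start <= y$rate_end, y$rate_start <= x$rate_end),
    suffix = c("", "_other")
  ) |>
  filter(rate_start != rate_start_other | rate_end != rate_end_other) |>
  distinct(code) |>
  collect()

overlapping_codes   # code "X": its Jan–Dec and Jul–Dec periods overlap
dbDisconnect(con, shutdown = TRUE)
```

An empty result means a direct inequality join is safe as-is; a non-empty one means you need the pre-resolution step above.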


Source 4, Synthetic Rows That Are Perfectly Identical

The problem

Some pipelines expand rows based on a quantity field: an invoice line with qty = 3 becomes three separate line items. If you discard the expansion index after duplicating, the three rows become perfect duplicates, identical on every column. No window_order() can distinguish them.

# Expansion creates qty identical rows, then throws away the only discriminant
data |>
  slice(rep(seq_len(n()), times = qty)) |>
  select(-qty)   # ← now you have perfect duplicates

Any downstream window function operating on these rows will produce arbitrary results because the engine has no way to deterministically assign numbers or order to indistinguishable objects.

The fix: keep the expansion index as a tiebreaker

# Keep the position within the expansion as a discriminant column.
# tidyr::uncount() with .id does the expansion and mints the index in one
# step (on the in-memory data, before the table reaches DuckDB)
expanded <- data |>
  tidyr::uncount(qty, .id = "series")   # series = 1, 2, …, qty within each source row

# Include it in every downstream window_order
expanded |>
  window_order(entity_id, line_id, series) |>
  group_by(entity_id) |>
  mutate(rn = row_number())

# Drop it only at the very end of the pipeline, after all window operations
result |>
  select(-series)

The same logic applies whenever you union_all() tables that might contain identical rows: add a source tag before the union so downstream steps can use it as a tiebreaker.
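A minimal sketch of that source tag (toy tables, our names): the tag is just another column, cheap to add before the union and easy to drop at the very end.

```r
library(DBI)
library(dplyr)
library(dbplyr)

con <- dbConnect(duckdb::duckdb())
dbWriteTable(con, "a", data.frame(entity_id = 1, amount = 10))
dbWriteTable(con, "b", data.frame(entity_id = 1, amount = 10))  # identical row

tagged <- union_all(
  tbl(con, "a") |> mutate(src = "a"),
  tbl(con, "b") |> mutate(src = "b")
)

res <- tagged |>
  window_order(entity_id, amount, src) |>  # src distinguishes the twins
  group_by(entity_id) |>
  mutate(rn = row_number()) |>
  collect()

dbDisconnect(con, shutdown = TRUE)
```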


Bonus: Type-Dependent Deduplication

A related trap: when a table contains multiple row types that share a key column, a single deduplication pass using one type’s counter will silently drop the other type’s rows.

# records contains TYPE_A and TYPE_B rows sharing the same entity_id
# Deduplicating by (entity_id, counter_a) eliminates TYPE_B rows
# because counter_a is the same for both types within a given entity_id
records |>
  group_by(entity_id, counter_a) |>
  window_order(entity_id, counter_a, line_id) |>
  filter(row_number() == 1L) |>
  ungroup()

The fix is to split into branches and apply the correct counter to each type:

records_a <- records |>
  filter(type != "TYPE_B") |>
  group_by(entity_id, counter_a) |>
  window_order(entity_id, counter_a, line_id) |>
  filter(row_number() == 1L) |>
  ungroup()

records_b <- records |>
  filter(type == "TYPE_B") |>
  group_by(entity_id, counter_b) |>
  window_order(entity_id, counter_b, line_id) |>
  filter(row_number() == 1L) |>
  ungroup()

records_final <- union_all(records_a, records_b)

Checklist Before You Ship DuckDB/dbplyr Code

Copy this into your code review template:

Window functions
– [ ] Every mutate(rn = row_number()) is preceded by window_order() with a key that breaks all ties within the group
– [ ] Every mutate(x = cumsum(...)) is preceded by window_order() that includes at least one column unique within the group
– [ ] Every mutate(prev = lag(...)) is preceded by a deterministic window_order()
– [ ] No window_order() key consists exclusively of group_by() columns

distinct()
– [ ] No distinct(..., .keep_all = TRUE) is used unless the upstream filter is guaranteed to return exactly one row per group
– [ ] All distinct(.keep_all = TRUE) have been replaced by summarise() or window_order() + filter(row_number() == 1L)

Inequality joins
– [ ] Every join_by(key, date >= start, date <= end) is followed by a check that no two periods in the reference table overlap for the same key
– [ ] Where overlap is possible, the pre-resolution pattern (key × target date) is used instead of a direct join
– [ ] Deduplication after an inequality join is on (key × target date), not on (key) alone

Synthetic rows
– [ ] Every slice(rep(...)) or equivalent expansion retains an index column usable as a tiebreaker in downstream window_order() calls
– [ ] That index column is dropped only after all window operations are complete

Type-dependent logic
– [ ] When deduplication logic differs by row type, each type is processed in a separate branch with its own reference counter


How to Detect Residual Non-Determinism

The most direct method: run the pipeline multiple times and compare the aggregate output.

library(purrr)

runs <- map(1:8, function(i) {
  source("pipeline.R")
  result_table |>
    summarise(
      total_amount = sum(amount, na.rm = TRUE),
      n_rows = n()
    ) |>
    collect()
})

map_dfr(runs, identity)
# If total_amount or n_rows varies across the 8 runs → residual non-determinism

If you find variation, binary-search your pipeline: collect intermediate tables at the midpoint of your transformation chain and run the first half N times. If the midpoint is stable, the bug is in the second half. Repeat until you isolate the step where variation first appears.
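A complementary diagnostic (not a fix: it only masks the symptom) is to force single-threaded execution via DuckDB's threads setting. If the variation disappears with one thread, parallel scheduling is confirmed as the trigger, and the order-sensitive step is still in your code waiting to be fixed.

```r
library(DBI)

con <- dbConnect(duckdb::duckdb())

dbExecute(con, "SET threads TO 1;")
n_threads <- dbGetQuery(con, "SELECT current_setting('threads') AS n")$n

# … rerun the suspect section of the pipeline here …

dbExecute(con, "RESET threads;")   # restore the default afterwards
dbDisconnect(con, shutdown = TRUE)
```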


Conclusion

DuckDB is an excellent tool for this kind of work: fast, embeddable, compatible with Arrow and Parquet, and it composes beautifully with {dbplyr}. But it is not a data frame with SQL syntax. It is a parallel query engine, and it will silently expose every assumption your code makes about row order.

The good news: all four patterns described here are fixable without restructuring your pipeline. The rules are simple once you internalize them:

  1. Every order-sensitive window operation needs an explicit window_order() with a true tiebreaker.
  2. distinct(.keep_all = TRUE) is a code smell in DuckDB; replace it with an explicit choice.
  3. Inequality joins need a pre-resolution step if the reference table can have overlapping periods.
  4. Synthetic rows need to keep their expansion index until the end.

The tricky part is that none of these bugs announce themselves. The code runs without errors, tests pass on some data, and the difference between two runs can be as small as one row in a thousand. The only defense is systematic code review against the checklist above, and running your pipeline more than once during development.

