---
title: "Joins in Production"
author: "Gilles Colling"
date: "`r Sys.Date()`"
output: rmarkdown::html_vignette
vignette: >
  %\VignetteIndexEntry{Joins in Production}
  %\VignetteEngine{knitr::rmarkdown}
  %\VignetteEncoding{UTF-8}
---

```{r setup, include = FALSE}
knitr::opts_chunk$set(
  collapse = TRUE,
  comment = "#>",
  fig.width = 6,
  fig.height = 4
)
library(joinspy)
```

At the console, we see the wrong row count, fix the key, and re-run. In a scheduled script, the same issue goes unnoticed until someone looks at the output. This vignette wires joinspy into automated pipelines so that join problems surface as errors or log entries. Every example uses synthetic data and `tempfile()` paths, so the whole thing runs end to end.

Production here means any context where nobody watches the console: cron jobs, CI runs, scheduled reports, Shiny back ends, `targets` pipelines. The failure mode is depressingly uniform. An upstream table gains a duplicate key or a whitespace-padded ID, the join silently multiplies or drops rows, and the numbers in the final report drift. Weeks can pass before anyone traces the drift back to the join, and by then the bad rows have propagated into every table built on top of it.

The tools below convert that slow, silent failure into a loud, immediate one: a non-zero exit status the scheduler flags, a log entry a monitoring script picks up, or a failed unit test in CI. We work through the pieces in increasing order of machinery -- hard assertions, silent instrumented joins, programmatic report inspection, cardinality guards, unit tests for join contracts, logging, multi-join chain analysis, and finally what all of this costs at runtime.

## Assertions with key_check()

`key_check()` returns a single logical -- `TRUE` if no issues were detected, `FALSE` otherwise. Behind that logical sit four scans: duplicate keys, NA keys, leading or trailing whitespace, and case mismatches between the two tables.
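
To make the four scans concrete, here is a rough base-R sketch of what each one looks for. It is an illustration under assumptions, not joinspy's implementation: the helper name `scan_keys_sketch()` is hypothetical, and the package may define each condition differently (for example, which table's duplicates count).

```r
# Hypothetical helper: a base-R approximation of the four key scans.
# Not joinspy's code; the package may define these conditions differently.
scan_keys_sketch <- function(x_keys, y_keys) {
  x_keys <- as.character(x_keys)
  y_keys <- as.character(y_keys)
  all_keys <- c(x_keys, y_keys)

  # Left keys that match a right key only after lower-casing
  x_unmatched <- setdiff(x_keys[!is.na(x_keys)], y_keys)
  case_only <- x_unmatched[tolower(x_unmatched) %in% tolower(y_keys)]

  c(
    duplicates    = anyDuplicated(x_keys) > 0 || anyDuplicated(y_keys) > 0,
    na_keys       = anyNA(all_keys),
    whitespace    = any(grepl("^\\s|\\s$", all_keys)),
    case_mismatch = length(case_only) > 0
  )
}

# One defect of each kind: a padded ID, a lower-case ID, an NA,
# and a duplicated key on the right
flags <- scan_keys_sketch(
  x_keys = c("C01", "C02 ", "c03", NA),
  y_keys = c("C01", "C02", "C03", "C03")
)
print(flags)
```

A gate written in this style would pass only when every flag is `FALSE`. In a real pipeline the function to call remains `key_check()`.
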
The simplest assertion wraps it in `stopifnot()`:

```{r}
orders <- data.frame(
  customer_id = c("C01", "C02", "C03", "C04"),
  amount = c(100, 200, 150, 300),
  stringsAsFactors = FALSE
)

customers <- data.frame(
  customer_id = c("C01", "C02", "C03", "C04"),
  region = c("East", "West", "East", "North"),
  stringsAsFactors = FALSE
)

# This passes -- keys are clean
stopifnot(key_check(orders, customers, by = "customer_id", warn = FALSE))
```

An assertion this cheap can run on every execution. When it passes, it adds nothing to the log; when it fails, `stopifnot()` raises an error, the script exits with a non-zero status, and the scheduler marks the run as failed. That exit status is what makes the pattern work: cron, GitHub Actions, and Airflow all key their alerting off it, so a failed key check becomes a notification without any extra plumbing.

When the keys have problems, the script halts:

```{r error = TRUE}
orders_dirty <- data.frame(
  customer_id = c("C01", "C02 ", "C03 ", "C04"),
  amount = c(100, 200, 150, 300),
  stringsAsFactors = FALSE
)

stopifnot(key_check(orders_dirty, customers, by = "customer_id", warn = FALSE))
```

The two padded IDs (`"C02 "` and `"C03 "`) would silently fail to match in a left join -- the rows survive, the customer columns come back `NA`, and downstream aggregation quietly treats them as customerless orders. Halting at the check, before the join runs, keeps the bad rows out of everything built afterwards.

With `warn = FALSE`, the printed diagnostics are suppressed; in a cron job or CI pipeline, the hard failure is the message. An explicit `if`/`stop` gives us a custom error text:

```{r error = TRUE}
if (!key_check(orders_dirty, customers, by = "customer_id", warn = FALSE)) {
  stop("Key quality check failed for orders-customers join. ",
       "Run join_spy() interactively for details.",
       call. = FALSE)
}
```

The custom message earns its keep months later, when the error lands in a log file at 3 a.m. and whoever reads it has no context.
Naming the join and pointing at `join_spy()` turns a bare `stopifnot` failure into a starting point for diagnosis.

We can also chain the assertion with a repair step -- run `key_check()` first, repair on failure, then re-check:

```{r}
ok <- key_check(orders_dirty, customers, by = "customer_id", warn = FALSE)

if (!ok) {
  repaired <- join_repair(
    orders_dirty, customers,
    by = "customer_id",
    trim_whitespace = TRUE,
    remove_invisible = TRUE
  )
  orders_clean <- repaired$x
  customers_clean <- repaired$y

  # Re-check after repair
  stopifnot(key_check(orders_clean, customers_clean, by = "customer_id", warn = FALSE))
}
```

`join_repair()` only touches mechanical defects -- here, trimming whitespace and stripping invisible Unicode characters from the key columns. It cannot invent missing customers or decide which of two duplicate rows is correct, so the re-check after repair matters: if the second `key_check()` still fails, the problem is structural (true duplicates, NA keys) and the script should stop for a human.

`key_check()` is a binary pass/fail gate; `join_spy()` builds a full report with match rates, expected row counts, and categorized issues. The split mirrors how the checks run: `key_check()` performs the cheap scans and skips the heavier analyses such as near-match detection, so it stays affordable on every execution. In production, `key_check()` runs every time. `join_spy()` is what we reach for when an assertion fails and we need to understand why.

## Silent Joins in Pipelines

The `*_join_spy()` wrappers print diagnostic output by default. In a scheduled script, `.quiet = TRUE` suppresses all printed output while still computing the report internally.

```{r}
sensors <- data.frame(
  sensor_id = c("S01", "S02", "S03", "S04"),
  location = c("Roof", "Basement", "Lobby", "Garage"),
  stringsAsFactors = FALSE
)

readings <- data.frame(
  sensor_id = c("S01", "S02", "S03", "S05"),
  temperature = c(22.1, 18.5, 21.0, 19.3),
  stringsAsFactors = FALSE
)

# Nothing printed
result <- left_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)
```

`.quiet = TRUE` overrides `verbose`, so it wins regardless of what a caller passed for `verbose`. The report is computed either way and stored twice: once in the package environment, where `last_report()` finds it, and once as an attribute on the returned data frame.

```{r}
rpt <- last_report()
rpt$match_analysis$match_rate
```

The match rate is the share of unique left-table keys that found a partner on the right -- here 3 of the 4 sensor IDs, since `S04` has no reading and `S05` has no sensor. The join runs silently; later, we pull the report and check its contents programmatically:

```{r}
rpt <- last_report()

if (rpt$match_analysis$match_rate < 0.95) {
  warning(sprintf(
    "Low match rate (%.1f%%) in sensor join -- check for missing sensor IDs",
    rpt$match_analysis$match_rate * 100
  ))
}
```

The report object is a plain list, so standard R subsetting works for arbitrarily complex validation logic. Anything the printed report shows is reachable by name; the next section walks the full structure.

One caveat: `last_report()` stores only the most recent report. If a script performs three joins in sequence, only the third report survives. To retain earlier reports, capture them explicitly:

```{r}
result1 <- left_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)
report1 <- last_report()

# ... later ...
result2 <- inner_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)
report2 <- last_report()
```

The same report also travels with the result as an attribute, which sidesteps the global state entirely:

```{r}
identical(attr(result1, "join_report"), report1)
```

`attr(result1, "join_report")` returns the report for that specific join no matter how many joins ran since. Inside package code or functions other people call, the attribute is the safer interface; `last_report()` is a console convenience that happens to also work in scripts.

## Inspecting Reports Programmatically

Threshold checks like the one above need to know where the numbers live. A `JoinReport` is an S3 object built on a named list, and its components map one-to-one onto the sections of the printed report:

```{r}
rpt <- join_spy(orders_dirty, customers, by = "customer_id")
names(rpt)
```

`x_summary` and `y_summary` describe the key columns of each table: row count (`n_rows`), unique keys (`n_unique`), duplicated keys (`n_duplicates`), rows affected by duplication (`n_duplicate_rows`), and NA keys (`n_na`). `match_analysis` holds the overlap: `n_matched`, `n_left_only`, `n_right_only`, and `match_rate`. `expected_rows` predicts the row count for each join type (`inner`, `left`, `right`, `full`), and `issues` is a list with one entry per detected problem. Of the remaining components, `by` records the key, `multicolumn_analysis` scores each column of a composite key separately, and `memory_estimate` holds human-readable result-size estimates per join type.

```{r}
rpt$x_summary$n_duplicates
rpt$match_analysis$match_rate
rpt$expected_rows$left
```

The padded IDs cut the match rate to 50% even though neither table has a single duplicate. For alerting, three numbers cover most needs: the match rate, the issue count, and the expected row count for the join type about to run.
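
The match rate and the left-join row prediction can be reproduced by hand, which helps when sanity-checking a threshold. The base-R sketch below follows the definition used in this vignette (the share of unique left keys with a partner on the right). The helper name `match_numbers_sketch()` is hypothetical, and joinspy's own computation may treat NA keys or other edge cases differently.

```r
# Hypothetical helper, not part of joinspy: recompute two report numbers
# directly from the raw key vectors.
match_numbers_sketch <- function(x_keys, y_keys) {
  x_unique <- unique(x_keys)

  # Share of unique left keys that have at least one partner on the right
  match_rate <- mean(x_unique %in% y_keys)

  # Left join: each left row yields one row per matching right row,
  # and exactly one row when nothing matches
  y_counts <- table(y_keys)
  counts <- as.integer(y_counts)
  hits <- counts[match(x_keys, names(y_counts))]
  hits[is.na(hits)] <- 0L
  expected_left <- sum(pmax(hits, 1L))

  list(match_rate = match_rate, expected_left = expected_left)
}

# Same key values as orders_dirty and customers
nums <- match_numbers_sketch(
  x_keys = c("C01", "C02 ", "C03 ", "C04"),
  y_keys = c("C01", "C02", "C03", "C04")
)
nums$match_rate     # 0.5: only C01 and C04 find a partner
nums$expected_left  # 4: unmatched rows are kept, nothing is multiplied
```

The second number is the one to watch in an enrichment join: as soon as a left key hits a duplicated right key, `expected_left` exceeds the left table's row count.
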

```{r}
length(rpt$issues)
vapply(rpt$issues, function(i) i$type, character(1))
```

Each issue is itself a list with a `type`, a `severity` (`"error"`, `"warning"`, or `"info"`), a human-readable `message`, and usually a `details` element holding the offending values. Filtering on severity separates problems that should halt a pipeline from observations that belong in a log:

```{r}
severities <- vapply(rpt$issues, function(i) i$severity, character(1))
severities
sum(severities == "warning")
```

Here the whitespace issue carries `"warning"` severity while the near-match hint is merely `"info"`. Wrapping the thresholds in a function gives the pipeline a single reusable gate:

```{r}
report_gate <- function(rpt, min_match = 0.95) {
  stopifnot(is_join_report(rpt))
  problems <- character(0)

  if (rpt$match_analysis$match_rate < min_match) {
    problems <- c(problems, sprintf(
      "match rate %.0f%% below %.0f%%",
      100 * rpt$match_analysis$match_rate, 100 * min_match
    ))
  }

  sev <- vapply(rpt$issues, function(i) i$severity, character(1))
  if (any(sev == "warning")) {
    problems <- c(problems,
                  sprintf("%d warning-level issue(s)", sum(sev == "warning")))
  }

  problems
}

report_gate(rpt)
```

`is_join_report()` guards the entry point. `last_report()` returns `NULL` before any join has run, and a half-wired pipeline will happily pass that `NULL` along; failing on the class check localizes the bug to the gate's caller. The returned character vector doubles as the alert body -- empty means healthy, anything else is the text of the page.

For metric collection, `summary()` flattens the same numbers into a two-column data frame:

```{r}
summary(rpt)
```

That shape suits accumulating one row of metrics per pipeline run or writing into a database table; `summary(rpt, format = "markdown")` produces a table for inclusion in a rendered report.

## Cardinality Guards

A join that was one-to-one in development can become many-to-many in production when upstream data changes.
`join_strict()` enforces a cardinality constraint and throws an error if it is violated. In development, we use `detect_cardinality()` to understand the actual relationship:

```{r}
products <- data.frame(
  product_id = c("P1", "P2", "P3"),
  name = c("Widget", "Gadget", "Gizmo"),
  stringsAsFactors = FALSE
)

line_items <- data.frame(
  product_id = c("P1", "P1", "P2", "P3", "P3"),
  order_id = c(101, 102, 103, 104, 105),
  stringsAsFactors = FALSE
)

detect_cardinality(products, line_items, by = "product_id")
```

One-to-many: each product appears once in `products` but can appear multiple times in `line_items`. `detect_cardinality()` prints its finding and returns the string invisibly, so the same call works interactively and as a value in a gate (`card <- detect_cardinality(...)`). We encode the expectation in production:

```{r}
result <- join_strict(
  products, line_items,
  by = "product_id",
  type = "left",
  expect = "1:n"
)
nrow(result)
```

If someone loads a `products` table with duplicate product IDs, the script fails immediately:

```{r error = TRUE}
products_bad <- data.frame(
  product_id = c("P1", "P1", "P2", "P3"),
  name = c("Widget", "Widget v2", "Gadget", "Gizmo"),
  stringsAsFactors = FALSE
)

join_strict(
  products_bad, line_items,
  by = "product_id",
  type = "left",
  expect = "1:n"
)
```

This is the classic slow failure made fast. A reference table that was clean for two years gains its first duplicate -- a vendor file gets loaded twice, or a re-run ETL step appends a second copy -- and every downstream left join starts multiplying rows. Without the guard, the symptom is inflated totals in a report some weeks later; with it, the run fails the same night, and the error message carries the cause: the expected cardinality, the one actually found, and the duplicate counts on each side.

The four cardinality levels:

- **1:1** -- lookup to lookup. Each key appears exactly once on both sides.
- **1:n** -- reference on the left, transactions on the right (products to line items, stations to hourly readings).
- **n:1** -- transactions on the left, lookup on the right (sales joined to a region table).
- **n:m** -- duplicates on both sides. Almost always a bug; requiring an explicit `expect = "n:m"` acts as a speed bump.

In practice, `"1:n"` and `"n:1"` cover most production joins. `detect_cardinality()` confirms the relationship during development; the `expect` value is then hard-coded in the production script.

`check_cartesian()` solves a different problem: it warns about Cartesian product explosion when a key has many duplicates on *both* sides. A join can violate a `"1:1"` constraint without triggering a Cartesian explosion (one extra duplicate is enough), and a `"n:m"` join can produce a massive product that `join_strict()` would allow. The two functions complement each other.

```{r}
events_a <- data.frame(id = rep(c("E1", "E2"), each = 20), src = "a",
                       stringsAsFactors = FALSE)
events_b <- data.frame(id = rep(c("E1", "E2"), each = 20), src = "b",
                       stringsAsFactors = FALSE)

chk <- check_cartesian(events_a, events_b, by = "id")
chk$expansion_factor
```

The return value is a list with `has_explosion`, the `expansion_factor`, the predicted `total_inner` row count, and a `worst_keys` data frame naming the keys responsible. As a pipeline gate, `if (chk$has_explosion) stop(...)` catches the blow-up before the join allocates the memory. The 20x factor here turns a 40-row input into an 800-row result; the same arithmetic on a million-row table is what fills a server's RAM at 2 a.m.

## Testing Join Contracts with testthat

The gates so far run when the pipeline runs. A second line of defense runs when the code changes: unit tests that pin down the join contracts -- which columns join which tables, at what cardinality, with what key quality. These contracts rarely appear in documentation; they live in the heads of whoever wrote the pipeline.
Encoding them as testthat expectations lets them survive refactors and staff turnover. Since `key_check()` returns a logical and `detect_cardinality()` returns a string, both drop straight into expectations:

```{r, eval = requireNamespace("testthat", quietly = TRUE)}
library(testthat)

test_that("orders join customers cleanly on customer_id", {
  expect_true(key_check(orders, customers, by = "customer_id", warn = FALSE))
})
```

```{r, eval = requireNamespace("testthat", quietly = TRUE)}
test_that("products to line_items is one-to-many", {
  expect_identical(
    detect_cardinality(products, line_items, by = "product_id"),
    "1:n"
  )
})
```

The cardinality test is the one that pays off. Cardinality violations come from data, and data changes without commits; running this test against a fresh extract in CI, or on a schedule, catches the first duplicate product before the nightly join multiplies line items. A third useful contract is row preservation, written against the prediction in the report:

```{r, eval = requireNamespace("testthat", quietly = TRUE)}
test_that("left join is predicted to preserve order rows", {
  rpt <- join_spy(orders, customers, by = "customer_id")
  expect_equal(rpt$expected_rows$left, nrow(orders))
})
```

`expected_rows$left` equals `nrow(orders)` exactly when no left key matches a duplicated right key, so this single expectation encodes "the enrichment join does not multiply rows" without running the join.

In a package or pipeline repository these tests live in `tests/testthat/test-join-contracts.R` and run with everything else under `testthat::test_dir()` or `R CMD check`. What data they point at is the design decision: against committed fixture files they test the code's assumptions; against a small fresh extract they test the data itself, which makes them a scheduled data-quality job wearing a unit-test interface.

## Logging and Audit Trails

### Manual logging

`log_report()` writes a single report to a file.
The format depends on the file extension: `.txt` and `.log` produce human-readable text, `.json` produces machine-readable JSON, and `.rds` saves the complete R object.

```{r}
report <- join_spy(sensors, readings, by = "sensor_id")

# Text format -- human-readable
txt_log <- tempfile(fileext = ".log")
log_report(report, txt_log)
cat(readLines(txt_log), sep = "\n")
unlink(txt_log)
```

The text entry mirrors the printed report -- key column, per-table summaries, match analysis, expected rows per join type, and an issue tally -- with a separator line so consecutive entries stay readable in one file. By default `log_report()` overwrites; `append = TRUE` adds to the end (text and log formats only), and the timestamp written with each entry means a grep for a date range works on the raw file.

```{r}
# JSON format -- machine-readable
json_log <- tempfile(fileext = ".json")
log_report(report, json_log)
cat(readLines(json_log), sep = "\n")
unlink(json_log)
```

Text format works for tailing logs during a batch run; JSON format feeds into monitoring systems or downstream scripts. Reports can also be saved as `.rds` files, which preserves the full R object for later interactive inspection. The `.rds` route matters when the issues themselves need preserving: text and JSON record issue counts and types, while the RDS file keeps each issue's `details` element -- the actual offending key values -- which is what we want in front of us when reconstructing what went wrong last Tuesday.

### Automatic logging

For scripts with many joins, `set_log_file()` at the top is cleaner than calling `log_report()` after each one. Every subsequent `*_join_spy()` call appends its report to the file.

```{r}
auto_log <- tempfile(fileext = ".log")
set_log_file(auto_log, format = "text")

# These joins are automatically logged
result1 <- left_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)
result2 <- inner_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)

# Check what got logged
cat(readLines(auto_log), sep = "\n")

# Clean up
set_log_file(NULL)
unlink(auto_log)
```

Each wrapper call appends one entry, so the file accumulates a join-by-join history of the run even when every join is `.quiet`. `set_log_file()` returns the previous setting invisibly, which lets a function enable logging for its own joins and then put back whatever the caller had configured:

```{r}
fn_log <- tempfile(fileext = ".log")
previous <- set_log_file(fn_log)
result <- left_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)
set_log_file(previous)
unlink(fn_log)
```

Automatic logging only triggers from `*_join_spy()` wrappers. `join_strict()` and bare `merge()` calls are not logged -- the wrappers are the instrumented path. To combine cardinality enforcement with logging, run `detect_cardinality()` as a separate check and use a `*_join_spy()` wrapper for the actual join.

`get_log_file()` returns the current log path (or `NULL` if logging is disabled), which is useful inside helpers that should log only when the surrounding pipeline asked for it:

```{r}
# Only log if logging is configured
if (!is.null(get_log_file())) {
  message("Logging is active at: ", get_log_file())
}
```

## Reading JSON Logs Downstream

The JSON format exists for consumers that are not R: a dashboard, a cron wrapper in Python, a shell script that greps last night's log. joinspy serializes reports with an internal base-R writer, so the package carries no JSON dependency, and the output is plain JSON that any parser reads. Each entry records the join key, both table summaries, the match analysis, the expected row counts, the issue count and types, and a timestamp.
Each report is appended as its own object, so the file is a sequence of JSON objects; line-oriented tools work on it directly, and a full parser reads it one object at a time.

Reading it back in R needs nothing beyond base functions, so we log two joins and then play the part of the monitoring script:

```{r}
json_log <- tempfile(fileext = ".json")
set_log_file(json_log, format = "json")

r1 <- left_join_spy(sensors, readings, by = "sensor_id", .quiet = TRUE)
r2 <- inner_join_spy(orders, customers, by = "customer_id", .quiet = TRUE)

set_log_file(NULL)
```

The monitoring side reads the file, finds the lines carrying the field it cares about, and extracts the values:

```{r}
log_lines <- readLines(json_log)
rate_lines <- grep('"match_rate"', log_lines, value = TRUE)
rate_lines
```

```{r}
rates <- as.numeric(sub('.*"match_rate": *([0-9.]+).*', "\\1", rate_lines))
rates
which(rates < 0.95)
unlink(json_log)
```

The sensor join logged its 75% match rate, the order join a clean 100%, and the threshold flags entry one. The same extract pattern works on `"n_issues"` for issue-count alerting. It also works on `"logged_at"` for staleness checks -- a pipeline whose newest log entry is two days old has a different problem than a pipeline logging failures, and the timestamp catches it.

For richer processing, any JSON library parses the entries one object at a time; the field names match the report components walked through earlier, so a script written against the JSON and a script written against `last_report()` read the same vocabulary.

## Diagnosing a Multi-Join Pipeline

Real pipelines chain joins: orders pick up customer attributes, customers pick up regions, regions pick up targets. When the final table has the wrong row count, the offending join could be any of them, and checking each one by hand means re-running the pipeline step by step.
`analyze_join_chain()` does that walk in one call -- it takes a named list of tables and a list of join specifications, runs `join_spy()` at every step, carries the intermediate result forward with a left join, and prints a per-step summary.

```{r}
orders_chain <- data.frame(
  order_id = 1:6,
  customer_id = c("C1", "C2", "C2", "C3", "C4", "C4"),
  stringsAsFactors = FALSE
)

customers_chain <- data.frame(
  customer_id = c("C1", "C2", "C3", "C4"),
  region_id = c("R1", "R1", "R2", "R3"),
  stringsAsFactors = FALSE
)

regions <- data.frame(
  region_id = c("R1", "R2"),
  region_name = c("North", "South"),
  stringsAsFactors = FALSE
)
```

```{r}
chain <- analyze_join_chain(
  tables = list(orders = orders_chain, customers = customers_chain,
                regions = regions),
  joins = list(
    list(left = "orders", right = "customers", by = "customer_id"),
    list(left = "result", right = "regions", by = "region_id")
  )
)
```

The literal name `"result"` refers to the accumulated output of the previous steps, so chains of any length need only that one keyword. Step 1 reports the order-side duplicates, which are expected for transaction data joining a lookup. The real finding is at step 2, where the match rate drops: region `"R3"` exists in the customer table and has no row in `regions`. That is the kind of fact that hides for months in a three-join pipeline, because each individual join "works".

The return value is a list with one entry per step, each holding the table names, the `by` columns, and the full `JoinReport`, so everything from the programmatic-inspection section applies per step:

```{r}
chain[[2]]$report$match_analysis$match_rate
chain[[2]]$report$match_analysis$left_only_keys
```

A chain-level gate reduces to one vector:

```{r}
vapply(chain, function(s) length(s$report$issues), integer(1))
```

In production we run the chain analysis at deployment time and whenever an upstream schema changes.
Since it performs each intermediate left join to feed the next step, its cost is comparable to running the pipeline's joins once, which makes it a pre-flight and post-incident tool; the per-run gates stay with `key_check()` and `join_strict()`.

## Sampling for Large Datasets

The `sample` parameter in `join_spy()` runs the analysis on a random subset while the actual join (via a `*_join_spy()` wrapper) still operates on the full data.

```{r}
# Simulate a large dataset
set.seed(42)
big_orders <- data.frame(
  customer_id = sample(paste0("C", sprintf("%04d", 1:5000)), 50000, replace = TRUE),
  amount = round(runif(50000, 10, 500), 2),
  stringsAsFactors = FALSE
)

big_customers <- data.frame(
  customer_id = paste0("C", sprintf("%04d", 1:6000)),
  region = sample(c("North", "South", "East", "West"), 6000, replace = TRUE),
  stringsAsFactors = FALSE
)

# Full analysis
system.time(report_full <- join_spy(big_orders, big_customers, by = "customer_id"))

# Sampled analysis
system.time(report_sampled <- join_spy(big_orders, big_customers, by = "customer_id", sample = 5000))
```

The sampled report records its own provenance in `report$sampling` -- the sample size and the original row counts -- and the printed report flags it, so a log reader can tell estimated diagnostics from exact ones:

```{r}
report_sampled$sampling
```

The sampled report is approximate -- match rates and duplicate counts are estimated from the subset. For production monitoring, we typically care whether the match rate is roughly 95% or roughly 60%, not whether it is 94.7% or 95.1%. Sampling catches systemic problems (wrong key column, widespread encoding issues, duplicate explosion) with a fraction of the runtime. A `set.seed()` ahead of the call makes the sampled diagnostic reproducible, which keeps two runs on identical data from flapping between alert and no alert.

Sampling can miss rare issues. If 0.1% of keys have a zero-width space, a 5,000-row sample from a 10-million-row table might not include any.
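
How likely a sample is to miss a rare defect can be estimated with one line of arithmetic. If a fraction `p` of rows carries the defect and `n` rows are drawn, the chance that none of them is affected is approximately `(1 - p)^n`. This treats the draws as independent, which is a close approximation when the sample is small relative to the table. The helper name `p_miss()` below is hypothetical.

```r
# Hypothetical helper: probability that a random sample of n rows contains
# no affected row, when a fraction p of all rows is affected.
# Assumes independent draws, a close approximation when n is small
# relative to the table.
p_miss <- function(p, n) (1 - p)^n

prevalence <- c(1e-3, 1e-4, 1e-5)  # 0.1%, 0.01%, 0.001% of rows
miss_table <- data.frame(
  prevalence = prevalence,
  p_miss     = round(p_miss(prevalence, n = 5000), 3)
)
miss_table
```

Under this approximation a 5,000-row sample misses a 0.1% defect less than 1% of the time, but misses a 0.01% defect roughly 61% of the time and a 0.001% defect roughly 95% of the time. The rarer the defect, the less a sampled run can be trusted to see it.
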
Running full diagnostics periodically (weekly, or when the upstream source changes) alongside sampled daily runs covers both speed and thoroughness.

## A Complete Production Pattern

Here is a realistic production workflow: a nightly job loads order and customer data, validates keys, repairs if needed, joins with cardinality enforcement, and logs everything.

```{r}
# ============================================================
# Nightly order enrichment pipeline
# ============================================================

# --- Setup logging ---
pipeline_log <- tempfile(fileext = ".log")
set_log_file(pipeline_log, format = "text")

# --- Load data (simulated) ---
orders <- data.frame(
  order_id = 1:6,
  customer_id = c("C001", "C002 ", "C003", "C003", "C004", "C005"),
  amount = c(150, 230, 89, 410, 320, 175),
  stringsAsFactors = FALSE
)

customers <- data.frame(
  customer_id = c("C001", "C002", "C003", "C004", "C005", "C006"),
  name = c("Acme Corp", "Globex", "Initech", "Umbrella", "Soylent", "Wonka"),
  tier = c("gold", "silver", "gold", "bronze", "silver", "gold"),
  stringsAsFactors = FALSE
)

# --- Gate 1: key quality assertion ---
keys_ok <- key_check(orders, customers, by = "customer_id", warn = FALSE)

if (!keys_ok) {
  message("Key issues detected -- attempting repair")
  repaired <- join_repair(
    orders, customers,
    by = "customer_id",
    trim_whitespace = TRUE,
    remove_invisible = TRUE
  )
  orders <- repaired$x
  customers <- repaired$y
}

# --- Gate 2: cardinality check ---
card <- detect_cardinality(orders, customers, by = "customer_id")
if (card == "n:m") {
  set_log_file(NULL)
  unlink(pipeline_log)
  stop("Unexpected n:m cardinality in orders-customers join", call. = FALSE)
}

# --- Join (with auto-logging via *_join_spy) ---
enriched <- left_join_spy(orders, customers, by = "customer_id", .quiet = TRUE)

# --- Gate 3: row count sanity check ---
# A left join should never lose rows from the left table
if (nrow(enriched) < nrow(orders)) {
  set_log_file(NULL)
  unlink(pipeline_log)
  stop("Row count decreased after left join -- possible data corruption",
       call. = FALSE)
}

# --- Output ---
message(sprintf("Pipeline complete: %d enriched orders", nrow(enriched)))
head(enriched)

# --- Review the log ---
if (file.exists(pipeline_log)) {
  cat(readLines(pipeline_log), sep = "\n")
}

# --- Cleanup ---
set_log_file(NULL)
unlink(pipeline_log)
```

The three gates catch different failure modes. Gate 1 catches string-level problems and repairs the mechanical ones; in this run it finds the padded `"C002 "` and trims it, so that customer keeps its name and tier in the enriched output. Gate 2 halts on the one structural problem repair cannot touch: an unexpected many-to-many relationship, the kind that multiplies rows. Gate 3 is the catch-all -- a left join returning fewer rows than its left table means something went wrong at a level the key diagnostics do not see, such as an upstream filter applied to the wrong object. Logging runs throughout because `set_log_file()` was called at the top, so by the time gate 3 runs, the join that preceded it is already on record. The failure branches above delete the log only because this vignette writes to a `tempfile()`; a real job would log to a persistent path and leave the file in place, so that even a run that dies at gate 3 leaves that record behind.

Equally deliberate is what the pattern does not check. It never verifies that `customer_id` is the right column to join on; a clean key on the wrong column passes every gate. It enforces no match-rate floor, so orders for customers missing from the reference table sail through with `NA` names -- adding a fourth gate on `last_report()$match_analysis$match_rate` covers that case when unmatched keys count as failures for the job at hand.
And it validates keys only: a corrupted `amount` column is invisible to every check here, which is why these gates complement value-level validation tools rather than replace them.

The pattern stops scaling somewhere around a dozen joins. At that point the gate blocks become copy-paste boilerplate, the thresholds belong in a config file, and the per-join logic wants to be a function taking two tables, a key, and an expected cardinality. Past that, a pipeline framework such as `targets` is the better skeleton, with these same checks living inside each node. The log file also grows without bound under a daily schedule; naming it by date (`sprintf("enrich-%s.log", Sys.Date())`) keeps each day's audit trail separate and bounded.

## The Cost of Diagnostics

Every gate adds runtime, and on a 50,000-row join it is worth seeing how much. We time the bare join, the instrumented wrapper, and the cheap gate on the tables from the sampling section:

```{r}
t_merge <- system.time(
  merge(big_orders, big_customers, by = "customer_id", all.x = TRUE)
)
t_spy <- system.time(
  left_join_spy(big_orders, big_customers, by = "customer_id", .quiet = TRUE)
)
t_check <- system.time(
  key_check(big_orders, big_customers, by = "customer_id", warn = FALSE)
)

rbind(merge = t_merge, left_join_spy = t_spy, key_check = t_check)[, 1:3]
```

The gap between `merge()` and `left_join_spy()` is the full diagnostic: key summaries, match analysis, row-count prediction, and the string scans. The scans are vectorized, and the one quadratic piece -- near-match detection -- is capped at 50 unmatched keys against 100 candidates regardless of table size, so the diagnostic cost grows roughly linearly with the number of unique keys. `key_check()` runs the cheap subset only, which is why it can sit in front of every join in a script.
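
The effect of such a cap is easy to see in a small sketch. The code below is not joinspy's implementation: it uses base R's `adist()` for edit distances, and the helper name, the distance threshold of 1, and the choice to keep the first keys rather than a random subset are all assumptions made for illustration. The point is only that the distance matrix never exceeds 50 x 100 cells, however large the inputs are.

```r
# Hypothetical helper: capped near-match search using utils::adist().
# The caps mirror the 50 x 100 limit described above; the threshold and
# the use of head() to pick keys are illustrative assumptions.
near_matches_sketch <- function(unmatched, candidates,
                                max_unmatched = 50, max_candidates = 100,
                                max_dist = 1) {
  u <- head(unique(unmatched[!is.na(unmatched)]), max_unmatched)
  k <- head(unique(candidates[!is.na(candidates)]), max_candidates)
  if (length(u) == 0 || length(k) == 0) {
    return(data.frame(key = character(0), nearest = character(0),
                      distance = numeric(0), stringsAsFactors = FALSE))
  }

  d <- adist(u, k)                 # at most 50 x 100 edit distances
  best <- apply(d, 1, which.min)   # closest candidate for each key
  out <- data.frame(
    key      = u,
    nearest  = k[best],
    distance = d[cbind(seq_along(u), best)],
    stringsAsFactors = FALSE
  )
  out[out$distance <= max_dist, , drop = FALSE]
}

near <- near_matches_sketch(
  unmatched  = c("C02 ", "C03 ", "Z99"),
  candidates = c("C01", "C02", "C03", "C04")
)
near
```

The two padded IDs come back paired with their clean counterparts at distance 1, while `"Z99"` is dropped because nothing is close to it. Growing either input beyond the caps changes nothing about the amount of distance work done.
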
When the elapsed column stops looking ignorable -- tables in the tens of millions of rows, or a join inside a tight loop -- the fallback order is: keep `key_check()` on every run, move the full `join_spy()` diagnostic to a `sample =` run, and reserve the unsampled analysis for a weekly job or for the morning after an alert. The numbers above are the budget conversation in miniature: the gate costs a fraction of the join it protects, and the sampling section shows how to hold that fraction steady as the tables grow.