Wednesday, October 27, 2021

Comparing SQLite, DuckDB and Arrow with UN Trade Data

Fri, Aug 27, 2021

Context

This is not a competition. The goal is simply to show how to use the hardware with relative efficiency, and the idea is to show something collaborative rather than competitive.

Assume that you work at customs and your boss asked you to obtain the aggregate exports per year for the US, Canada, the UK, France, Italy, Sri Lanka and Chile in order to conduct a benchmark that compares the speeds of R's native format (RDS), SQLite, DuckDB and Arrow. The statistician on your team pointed you to some RDS files obtained from UN COMTRADE, an official United Nations database with comprehensive bilateral trade records at the product level that goes back to 1962.

Exploring the data

Before proceeding, let’s create a table to see what we’ve got and an approximation of the file sizes in MB. Please note that these files are RDS with GZ compression, so the same data in CSV would be roughly 150 MB per file.

files <- list.files("~/comtrade", pattern = "rds", full.names = T)
sizes <- sapply(files, function(x) { file.size(x) / 1024^2 })

data.frame(
  file = files,
  size_in_mb = sizes
)
##                                                        file size_in_mb
## /home/pacha/comtrade/2011.rds /home/pacha/comtrade/2011.rds   21.30967
## /home/pacha/comtrade/2012.rds /home/pacha/comtrade/2012.rds   21.85174
## /home/pacha/comtrade/2013.rds /home/pacha/comtrade/2013.rds   22.33888
## /home/pacha/comtrade/2014.rds /home/pacha/comtrade/2014.rds   22.39536
## /home/pacha/comtrade/2015.rds /home/pacha/comtrade/2015.rds   22.81787
## /home/pacha/comtrade/2016.rds /home/pacha/comtrade/2016.rds   22.77938
## /home/pacha/comtrade/2017.rds /home/pacha/comtrade/2017.rds   23.13934
## /home/pacha/comtrade/2018.rds /home/pacha/comtrade/2018.rds   22.97443
## /home/pacha/comtrade/2019.rds /home/pacha/comtrade/2019.rds   22.01083
## /home/pacha/comtrade/2020.rds /home/pacha/comtrade/2020.rds   17.98888

Now we can explore the files. All the files have the same columns but contain data for different years. Each row in a file tells us how much of a given product (apples, oranges, bananas, etc.) country X sent to country Y in year T, measured in USD.

head(readRDS("~/comtrade/2011.rds"))
##   year  reporter_iso   partner_iso commodity_code trade_value_usd_exp
## 1 2011 0-unspecified 0-unspecified           0303                8861
## 2 2011 0-unspecified 0-unspecified           0403             1037391
## 3 2011 0-unspecified 0-unspecified           0901                4346
## 4 2011 0-unspecified 0-unspecified           0902              103447
## 5 2011 0-unspecified 0-unspecified           1806              476699
## 6 2011 0-unspecified 0-unspecified           2101               21728
##   trade_value_usd_imp
## 1                  68
## 2                8524
## 3                 374
## 4                 102
## 5                   0
## 6                   0

Creating a SQLite database

This section assumes intermediate SQL knowledge, see A Crash Course on PostgreSQL for R Users in case of questions.

SQLite is the most famous embedded database solution. In order to use SQLite from R, we need to install and load the RSQLite package. We can start by creating a schema, which is basically a structure with 0 rows but with column names and their types (strings, numbers, etc.), and we’ll add rows to it later. We shall also add indexes to our table in order to allow faster filtering, which makes for a fair comparison in the benchmarks that we’ll show later.

library(RSQLite)

sqlite_file <- "~/comtrade/2011_2020.sqlite"

if (!file.exists(sqlite_file)) {
  con <- dbConnect(SQLite(), sqlite_file)
  
  # table ----

  # dbExecute() runs the statement and releases its result set right away
  dbExecute(
    con,
    "CREATE TABLE yrpc (
    year integer NOT NULL,
    reporter_iso varchar(3) NOT NULL,
    partner_iso varchar(3) NOT NULL,
    commodity_code varchar(4) NOT NULL,
    trade_value_usd_exp decimal(16,2) DEFAULT NULL,
    trade_value_usd_imp decimal(16,2) DEFAULT NULL)"
  )
  
  # indexes ----
  
  dbExecute(con, "CREATE INDEX year ON yrpc (year)")
  dbExecute(con, "CREATE INDEX reporter_iso ON yrpc (reporter_iso)")
  
  # copy contents ----
  
  for (x in files) { 
    dbWriteTable(con, "yrpc", readRDS(x), append = TRUE, overwrite = FALSE, row.names = FALSE)
  }
  
  dbDisconnect(con)
  gc() # clear the memory
}
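
To check that an index is actually used when filtering, we can ask SQLite for its query plan. The following is only a minimal sketch on a toy in-memory table: the names toy_con, toy, yrpc_toy and toy_reporter_iso are made up for the illustration and are not part of the database created above.

```r
library(DBI)
library(RSQLite)

# in-memory database, so nothing is written to disk
toy_con <- dbConnect(SQLite(), ":memory:")

# hypothetical toy rows with the same key columns as yrpc
toy <- data.frame(
  year = c(2011L, 2011L, 2012L, 2012L),
  reporter_iso = c("can", "chl", "can", "usa"),
  trade_value_usd_exp = c(10, 20, 30, 40)
)

dbWriteTable(toy_con, "yrpc_toy", toy)
dbExecute(toy_con, "CREATE INDEX toy_reporter_iso ON yrpc_toy (reporter_iso)")

# ask SQLite how it would run a filter on the indexed column
plan <- dbGetQuery(
  toy_con,
  "EXPLAIN QUERY PLAN
   SELECT * FROM yrpc_toy WHERE reporter_iso = 'can'"
)

# the 'detail' column describes the strategy chosen by the planner
print(plan$detail)

dbDisconnect(toy_con)
```

If the plan mentions the index name, the filter is resolved through the index rather than by scanning the whole table.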

Creating a DuckDB database

This section assumes intermediate SQL knowledge, see A Crash Course on PostgreSQL for R Users in case of questions.

DuckDB is a high-performance embedded database for analytics that provides a few enhancements over SQLite, such as increased speed and support for a larger number of columns. In order to use DuckDB from R, we need to install and load the duckdb package. This is analogous to the SQLite example. We can start by creating a schema, which is basically a structure with 0 rows but with column names and their types (strings, numbers, etc.), and we’ll add rows to it later. To be fair, newer versions of DuckDB provide the duckdb_read_csv() function, which saves us from creating a schema, but it doesn’t work with RDS.

library(duckdb)

duckdb_file <- "~/comtrade/2011_2020.duckdb"

if (!file.exists(duckdb_file)) {
  con <- dbConnect(duckdb(), duckdb_file)
  
  # table ----
  
  # dbExecute() runs the statement and releases its result set right away
  dbExecute(
    con,
    "CREATE TABLE yrpc (
    year integer NOT NULL,
    reporter_iso varchar(3) NOT NULL,
    partner_iso varchar(3) NOT NULL,
    commodity_code varchar(4) NOT NULL,
    trade_value_usd_exp decimal(16,2) DEFAULT NULL,
    trade_value_usd_imp decimal(16,2) DEFAULT NULL)"
  )
  
  # indexes ----
  
  dbExecute(con, "CREATE INDEX year ON yrpc (year)")
  dbExecute(con, "CREATE INDEX reporter_iso ON yrpc (reporter_iso)")
  
  # copy contents ----
  
  for (x in files) { 
    dbWriteTable(con, "yrpc", readRDS(x), append = TRUE, overwrite = FALSE, row.names = FALSE)
  }
  
  dbDisconnect(con, shutdown = TRUE)
  gc() # clear the memory
}
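
For comparison, this is roughly how the duckdb_read_csv() route mentioned above could look if the data were available as CSV. It is a sketch under assumptions: the toy data frame, the temporary CSV file and the table name yrpc_toy are invented for the example, and the column types are whatever DuckDB infers instead of the schema we declared by hand.

```r
library(DBI)
library(duckdb)

# hypothetical toy rows written to a temporary CSV file
toy <- data.frame(
  year = c(2011L, 2011L, 2012L, 2012L),
  reporter_iso = c("can", "chl", "can", "usa"),
  trade_value_usd_exp = c(10, 20, 30, 40)
)
toy_csv <- tempfile(fileext = ".csv")
write.csv(toy, toy_csv, row.names = FALSE)

# in-memory DuckDB database, so nothing is written to disk
toy_con <- dbConnect(duckdb())

# create the table and load the rows in one step, without a CREATE TABLE
duckdb_read_csv(toy_con, "yrpc_toy", toy_csv)

toy_rows <- dbGetQuery(toy_con, "SELECT COUNT(*) AS n FROM yrpc_toy")$n
print(toy_rows)

dbDisconnect(toy_con, shutdown = TRUE)
unlink(toy_csv)
```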

Creating Arrow datasets

This procedure should be familiar to most tidyverse users. Unlike SQL, here we shall use partitioning variables, year and reporter, in order to divide our data into smaller parts. This is somewhat similar to SQL indexes, but instead of creating a table with fewer columns (which is what an index is), this creates a structure of folders containing different files according to the partitioning. Reading the resulting data is therefore very efficient, as it allows skipping fragments entirely instead of reading and then filtering, as one would do with RDS (or SQL, although filtering on indexes is very efficient).

library(arrow)
library(dplyr)

arrow_dir <- "~/comtrade/2011_2020"

if (!dir.exists(arrow_dir)) {
  for (x in files) {
    readRDS(x) %>% 
      group_by(year, reporter_iso) %>% 
      write_dataset(arrow_dir, hive_style = FALSE)
    
    gc() # clean memory
  }
}
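
To make the folder structure more concrete, here is a small sketch that writes a toy dataset to a temporary directory and lists what was created. The toy data frame and the toy_dir path are made up for the illustration, and the exact file names inside each folder depend on the arrow version.

```r
library(arrow)
library(dplyr)

# hypothetical toy rows with the same partitioning columns as our data
toy <- data.frame(
  year = c(2011L, 2011L, 2012L, 2012L),
  reporter_iso = c("can", "chl", "can", "usa"),
  trade_value_usd_exp = c(10, 20, 30, 40)
)

toy_dir <- file.path(tempdir(), "toy_arrow")

# the grouping variables become the partitioning variables
toy %>%
  group_by(year, reporter_iso) %>%
  write_dataset(toy_dir, hive_style = FALSE)

# one folder per year, and inside it one folder per reporter
toy_files <- list.files(toy_dir, recursive = TRUE)
print(toy_files)
```

With hive_style = FALSE the folders are named after the values only (such as 2011/can), which is why the column names have to be passed again through the partitioning argument when opening the dataset.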

Comparing what we created

We shall use the bench package to compare the performance of what we created. But before that, let’s compare the file size of our creations.

files2 <- list.files("~/comtrade", pattern = "rds|sqlite|duckdb|parquet", full.names = T, recursive = T)
sizes2 <- sapply(files2, function(x) { file.size(x) / 1024^2 })

tibble(
  file = files2,
  size_in_mb = sizes2
) %>% 
  mutate(extension = gsub(".*\\.", "", file)) %>% 
  group_by(extension) %>% 
  summarise(total_size_in_mb = sum(size_in_mb))
## # A tibble: 4 × 2
##   extension total_size_in_mb
##   <chr>                <dbl>
## 1 duckdb               2538.
## 2 parquet               477.
## 3 rds                   220.
## 4 sqlite               2977.

RDS is the lightest option, but it won’t be the fastest for what we need to do. Now we perform the same task, aggregating our datasets, with the different formats we obtained.

library(bench)
library(purrr)

countries <- c("usa", "can", "gbr", "fra", "ita", "lka", "chl")

# RDS ----

benchmark_rds <- mark(
  map_df(
    files, 
    function(x) {
      readRDS(x) %>% 
        filter(reporter_iso %in% countries) %>% 
        group_by(year, reporter_iso) %>% 
        summarise(
          total_exports_in_usd = sum(trade_value_usd_exp, na.rm = T),
          total_imports_in_usd = sum(trade_value_usd_imp, na.rm = T)
        )
    }
  )
)

# SQLite ----

# we need to open and close a connection for SQLite
con <- dbConnect(SQLite(), sqlite_file)

benchmark_sqlite <- mark(
  tbl(con, "yrpc") %>% 
    filter(reporter_iso %in% countries) %>% 
    group_by(year, reporter_iso) %>% 
    summarise(
      total_exports_in_usd = sum(trade_value_usd_exp, na.rm = T),
      total_imports_in_usd = sum(trade_value_usd_imp, na.rm = T)
    ) %>% 
    # here we need a collect at the end to move the data into R
    collect()
)

dbDisconnect(con) # the shutdown argument only applies to DuckDB

# DuckDB ----

# we need to open and close a connection for DuckDB
con <- dbConnect(duckdb(), duckdb_file)

benchmark_duckdb <- mark(
  tbl(con, "yrpc") %>% 
    filter(reporter_iso %in% countries) %>% 
    group_by(year, reporter_iso) %>% 
    summarise(
      total_exports_in_usd = sum(trade_value_usd_exp, na.rm = T),
      total_imports_in_usd = sum(trade_value_usd_imp, na.rm = T)
    ) %>% 
    # here we need a collect at the end to move the data to R
    collect()
)

dbDisconnect(con, shutdown = T)

# Arrow ----

benchmark_arrow <- mark(
  open_dataset("~/comtrade/2011_2020", 
               partitioning = c("year", "reporter_iso")) %>% 
    filter(reporter_iso %in% countries) %>% 
    group_by(year, reporter_iso) %>% 
    # we need a collect() before summarizing
    collect() %>% 
    summarise(
      total_exports_in_usd = sum(trade_value_usd_exp, na.rm = T),
      total_imports_in_usd = sum(trade_value_usd_imp, na.rm = T)
    )
)
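
In the SQLite and DuckDB cases, nothing is computed in R until collect(): the dplyr verbs are translated to SQL by the dbplyr package and run inside the database. The sketch below shows that translation on a toy in-memory SQLite table. The names toy_con, yrpc_toy, toy_countries, toy_query and toy_result are invented for the example.

```r
library(DBI)
library(RSQLite)
library(dplyr)
library(dbplyr)

toy_con <- dbConnect(SQLite(), ":memory:")

# hypothetical toy rows standing in for the yrpc table
dbWriteTable(
  toy_con, "yrpc_toy",
  data.frame(
    year = c(2011L, 2011L, 2012L, 2012L),
    reporter_iso = c("can", "chl", "can", "usa"),
    trade_value_usd_exp = c(10, 20, 30, 40)
  )
)

toy_countries <- c("can", "usa")

# this only builds a lazy query, no data is read yet
toy_query <- tbl(toy_con, "yrpc_toy") %>%
  filter(reporter_iso %in% toy_countries) %>%
  group_by(year, reporter_iso) %>%
  summarise(
    total_exports_in_usd = sum(trade_value_usd_exp, na.rm = TRUE),
    .groups = "drop"
  )

# print the SQL that will be sent to the database
show_query(toy_query)

# run the query in the database and bring only the result into R
toy_result <- collect(toy_query)
print(toy_result)

dbDisconnect(toy_con)
```

Only the aggregated rows travel from the database into R.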

Now let’s compare the time and the memory (RAM) used to read, filter and summarise in each case.

bind_rows(
  benchmark_rds %>% mutate(format = "R (RDS)") %>% select(format, median_time = median, mem_alloc),
  benchmark_sqlite %>% mutate(format = "SQL (SQLite)") %>% select(format, median_time = median, mem_alloc),
  benchmark_duckdb %>% mutate(format = "SQL (DuckDB)") %>% select(format, median_time = median, mem_alloc),
  benchmark_arrow %>% mutate(format = "Arrow (Parquet)") %>% select(format, median_time = median, mem_alloc)
)
## # A tibble: 4 × 3
##   format          median_time mem_alloc
##   <chr>              <bch:tm> <bch:byt>
## 1 R (RDS)               1.34m    4.08GB
## 2 SQL (SQLite)          5.48s    6.17MB
## 3 SQL (DuckDB)          1.76s  104.66KB
## 4 Arrow (Parquet)       1.36s  453.89MB

Why these differences?

RDS files have to be read completely (220 MB, ~54,000,000 rows and 6 columns) before filtering for the required countries and doing aggregation.

SQLite and DuckDB each store everything in a single large file (about 3 GB and 2.5 GB, respectively), but the indexes we created let their respective packages read a copy of the table that has just the year and reporter_iso columns. This allows very fast filtering, because the index provides the exact location of what we need to read in the large table.

Arrow creates a structure of directories (477 MB in total), so the package navigates through the directory structure and reads just the countries that we need, which can be more efficient than indexing in SQL for some applications like this one.
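
As an illustration of that last point, the following sketch builds a toy partitioned dataset and filters it on a partitioning variable before collecting. The toy data and the toy_dir, toy_ds and toy_can names are made up for the example. The point is that the reporter is encoded in the folder names, so Arrow can decide which files are relevant from the paths alone.

```r
library(arrow)
library(dplyr)

toy_dir <- file.path(tempdir(), "toy_arrow_pruning")

# hypothetical toy rows, partitioned by year and reporter
data.frame(
  year = c(2011L, 2011L, 2012L, 2012L),
  reporter_iso = c("can", "chl", "can", "usa"),
  trade_value_usd_exp = c(10, 20, 30, 40)
) %>%
  group_by(year, reporter_iso) %>%
  write_dataset(toy_dir, hive_style = FALSE)

# the folder levels are mapped back to column names here
toy_ds <- open_dataset(toy_dir, partitioning = c("year", "reporter_iso"))

# the filter refers to a partitioning variable, so the folders
# for other reporters are not needed to answer it
toy_can <- toy_ds %>%
  filter(reporter_iso == "can") %>%
  collect()

print(toy_can)
```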


