Background
In this follow-up post (see part 1 if you missed it), we will explore what happens to the query performance if we read the files straight into Arrow instead of downloading them locally first.
Reading remote CSV files
In the first part, we downloaded the compressed CSV files locally (using the download.file() function) and then used the open_dataset() function on this set of files to make them available to Arrow.
However, it is possible to bypass the local download: we can read the files directly over an Internet connection using the read_csv_arrow() function, passing the file URL as its first argument. Once a file is loaded in memory, we can then write it to disk in the parquet format (we learned in part 1 that this format provided the best compromise between disk space usage and query performance).
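The core of the change is a two-step pipeline: read the remote file into memory with read_csv_arrow(), then write it back to disk with write_parquet(). Here is a minimal sketch for a single day (the destination file name is just an example):
library(arrow)
## read the compressed CSV file for one day directly from its URL ...
logs <- read_csv_arrow("https://cran-logs.rstudio.com/2022/2022-08-01.csv.gz")
## ... and write it back to disk in the parquet format
write_parquet(logs, sink = "2022-08-01.parquet")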
We can then modify the download_daily_package_logs_csv() function from part 1 as follows (changed lines are marked with a # <--- comment at the end of the line).
library(tidyverse)
library(arrow)
## Download the data set for a given date from the RStudio CRAN log website.
## `date` is a single date for which we want the data
## `path` is where we want the data to live
download_daily_package_logs_parquet <- function(date,
                                                path = "~/datasets/cran-logs-parquet-by-day") {
  ## build the URL for the download
  date <- parse_date(date)
  url <- paste0(
    'https://cran-logs.rstudio.com/', date$year, '/', date$date_chr, '.csv.gz'
  )
  ## build the path for the destination of the download
  file <- file.path(
    path,
    paste0("year=", date$year),
    paste0("month=", date$month),
    paste0(date$date_chr, ".parquet") # <--- change extension to .parquet
  )
  ## create the folder if it doesn't exist
  if (!dir.exists(dirname(file))) {
    dir.create(dirname(file), recursive = TRUE)
  }
  ## download the file
  message("Downloading data for ", date$date_chr, " ... ", appendLF = FALSE)
  arrow::read_csv_arrow(url) %>%       # <--- read directly from URL
    arrow::write_parquet(sink = file)  # <--- convert to parquet on disk
  message("done.")
  ## quick check to make sure that the file was created
  if (!file.exists(file)) {
    stop("Download failed for ", date$date_chr, call. = FALSE)
  }
  ## return the path
  file
}
## This function is unchanged from part 1: it validates the date
## and extracts the year and month from it
parse_date <- function(date) {
  stopifnot(
    "`date` must be a date" = inherits(date, "Date"),
    "provide only one date" = identical(length(date), 1L),
    "date must be in the past" = date < Sys.Date()
  )
  list(
    date_chr = as.character(date),
    year = as.POSIXlt(date)$year + 1900L,
    month = as.POSIXlt(date)$mon + 1L
  )
}
Now that we are set up, we can create the set of files the same way we did in part 1.
dates_to_get <- seq(
  as.Date("2022-06-01"),
  as.Date("2022-08-15"),
  by = "day"
)
purrr::walk(dates_to_get, download_daily_package_logs_parquet)
The result is similar to what we achieved in part 1: one file for each day, placed in a folder corresponding to its month. This time, however, instead of compressed CSV files, we have parquet files:
~/datasets/cran-logs-parquet-by-day/
└── year=2022
├── month=6
│ ├── 2022-06-01.parquet
│ ├── 2022-06-02.parquet
│ ├── 2022-06-03.parquet
│ ├── ...
│ └── 2022-06-30.parquet
├── month=7
│ ├── 2022-07-01.parquet
│ ├── 2022-07-02.parquet
│ ├── 2022-07-03.parquet
│ ├── ...
│ └── 2022-07-31.parquet
└── month=8
├── 2022-08-01.parquet
├── 2022-08-02.parquet
├── 2022-08-03.parquet
├── ...
└── 2022-08-15.parquet
Let’s check how large this data is compared to the datasets we created in part 1:
dataset_size <- function(path) {
  fs::dir_info(path, recurse = TRUE) %>%
    filter(type == "file") %>%
    pull(size) %>%
    sum()
}
tribble(
  ~ Format, ~ size,
  "Compressed CSV", dataset_size("~/datasets/cran-logs-csv/"),
  "Arrow", dataset_size("~/datasets/cran-logs-arrow/"),
  "Parquet", dataset_size("~/datasets/cran-logs-parquet/"),
  "Parquet by day", dataset_size("~/datasets/cran-logs-parquet-by-day/")
)
# A tibble: 4 × 2
  Format                  size
  <chr>            <fs::bytes>
1 Compressed CSV         5.01G
2 Arrow                 29.67G
3 Parquet                5.06G
4 Parquet by day         4.63G
The dataset with one parquet file per day is slightly smaller than the one we obtained when we let write_dataset() do its own partitioning, which led to one file per month.
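For reference, letting write_dataset() handle the partitioning itself looks roughly like the sketch below. The output path is hypothetical, and the partitioning variables match the year=/month= directories we created above:
## let write_dataset() do the partitioning: the data gets rewritten
## into one folder (and file) per year/month combination
open_dataset("~/datasets/cran-logs-parquet-by-day", format = "parquet") %>%
  write_dataset(
    path = "~/datasets/cran-logs-parquet-repartitioned",  # <--- hypothetical path
    format = "parquet",
    partitioning = c("year", "month")
  )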
We can now compare how quickly Arrow can read these datasets.
bench::mark(
  parquet = open_dataset("~/datasets/cran-logs-parquet", format = "parquet"),
  parquet_by_day = open_dataset("~/datasets/cran-logs-parquet-by-day", format = "parquet"),
  check = FALSE
)
# A tibble: 2 × 6
  expression          min   median `itr/sec` mem_alloc `gc/sec`
  <bch:expr>     <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
1 parquet        139.43ms 143.66ms      6.62    7.91MB     0
2 parquet_by_day   3.52ms   3.82ms    254.      4.28KB     6.45
Even though there are more files to parse (76 vs. 3), loading the dataset with one parquet file per day is noticeably faster.
cran_logs_parquet <- open_dataset("~/datasets/cran-logs-parquet", format = "parquet")
cran_logs_parquet_by_day <- open_dataset("~/datasets/cran-logs-parquet-by-day", format = "parquet")
Let’s now explore the performance of a few queries on these datasets.
First, how long does it take to compute the number of rows in these datasets:
bench::mark(
  parquet = nrow(cran_logs_parquet),
  parquet_by_day = nrow(cran_logs_parquet_by_day)
)
# A tibble: 2 × 6
  expression          min   median `itr/sec` mem_alloc `gc/sec`
  <bch:expr>     <bch:tm> <bch:tm>     <dbl> <bch:byt>    <dbl>
1 parquet           743µs    773µs     1267.    4.74KB     8.48
2 parquet_by_day    745µs    773µs     1273.    1.97KB    10.7
Not much of a difference.
Let’s now compare the performance of the query we ran in part 1, where we computed the 10 most downloaded packages in the period covered by our dataset.
top_10_packages <- function(data) {
  data %>%
    count(package, sort = TRUE) %>%
    head(10) %>%
    mutate(n_million_downloads = n / 1e6) %>%
    select(-n) %>%
    collect()
}
bench::mark(
  top_10_packages(cran_logs_parquet),
  top_10_packages(cran_logs_parquet_by_day)
)
Warning: Some expressions had a GC in every iteration; so filtering is disabled.
# A tibble: 2 × 6
expression min median `itr/sec` mem_al…¹
<bch:expr> <bch:tm> <bch:tm> <dbl> <bch:by>
1 top_10_packages(cran_logs_parquet) 3.58s 3.58s 0.279 7.19MB
2 top_10_packages(cran_logs_parquet_by_day) 5.76s 5.76s 0.174 165.36KB
# … with 1 more variable: `gc/sec` <dbl>, and abbreviated variable name
# ¹mem_alloc
# ℹ Use `colnames()` to see all variable names
This query runs about 2 seconds faster on the dataset with one parquet file per month than on the dataset with one parquet file per day.
The way a dataset is partitioned has an impact on query performance. If you filter your dataset on a variable used in the partitioning, some of the files can be skipped entirely: Arrow only reads the file(s) containing information relevant to your query. For instance, if a query only touches the month of July, Arrow does not need to look at the files for June or August, which can lead to speed-ups.
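As a sketch of what such a query could look like on our dataset: because the year=/month= folder names follow the hive partitioning convention, year and month are exposed as columns, and filtering on them lets Arrow skip every file outside the requested partition.
## only the files under year=2022/month=7 should need to be read here,
## because `month` is one of the partitioning variables
cran_logs_parquet_by_day %>%
  filter(year == 2022, month == 7) %>%
  count(package, sort = TRUE) %>%
  head(10) %>%
  collect()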
Would the partitioning by day help us run our query faster if we were to compute the 10 most downloaded packages for a single day? After all, in this case, we would only need to look at one of the files in our folder of parquet files, and the file in question would be smaller than one that has all the data for the month. Let’s compare the performance of this query for August 1st, 2022:
top_10_packages_by_day <- function(data) {
  data %>%
    filter(date == as.Date("2022-08-01")) %>%
    count(package, sort = TRUE) %>%
    head(10) %>%
    collect()
}
bench::mark(
  top_10_packages_by_day(cran_logs_parquet),
  top_10_packages_by_day(cran_logs_parquet_by_day)
)
# A tibble: 2 × 6
expression min median itr/s…¹ mem_a…²
<bch:expr> <bch:> <bch:> <dbl> <bch:b>
1 top_10_packages_by_day(cran_logs_parquet) 304ms 348ms 2.87 222KB
2 top_10_packages_by_day(cran_logs_parquet_by_day) 354ms 354ms 2.82 167KB
# … with 1 more variable: `gc/sec` <dbl>, and abbreviated variable names
# ¹`itr/sec`, ²mem_alloc
# ℹ Use `colnames()` to see all variable names
Interestingly, running the query on the monthly parquet files is still a little faster: the query takes somewhat longer on the dataset with one parquet file per day. The overhead of dealing with many small files is not offset by only having to read a single, smaller file for this operation. For the benefits of partitioning to become visible, we would need more data in each parquet file.
We don’t see a performance benefit from having many small files even when the query only needs a single day of data. But how does this partitioning affect the performance of a query that needs to touch rows spread across the whole dataset? Let’s look at a query that computes the number of downloads per day for a given package.
package_downloads_by_day <- function(data, pkg = "arrow") {
  data %>%
    filter(package == pkg) %>%
    count(date) %>%
    arrange(date) %>%
    collect()
}
bench::mark(
  package_downloads_by_day(cran_logs_parquet),
  package_downloads_by_day(cran_logs_parquet_by_day)
)
Warning: Some expressions had a GC in every iteration; so filtering is disabled.
# A tibble: 2 × 6
expression min median `itr/sec`
<bch:expr> <bch:tm> <bch:tm> <dbl>
1 package_downloads_by_day(cran_logs_parquet) 3.31s 3.31s 0.302
2 package_downloads_by_day(cran_logs_parquet_by_day) 4.46s 4.46s 0.224
# … with 2 more variables: mem_alloc <bch:byt>, `gc/sec` <dbl>
# ℹ Use `colnames()` to see all variable names
In this case, the query takes about a third longer on the dataset with one parquet file per day: here, performance suffers from having to look inside many more files.
Conclusion
This small example illustrates that it can be worth exploring how best to partition your dataset to get the most out of the speed that Arrow brings to your queries. In this example, the partitioning that seemed the most “natural” given the format in which the data is provided (one parquet file per day) is not the one that makes queries run fastest.
The variables you include in your queries also have a role to play when deciding how to partition your dataset. It might be best to partition your dataset according to the variables you use most often in your queries.
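For instance, if most of your queries filtered on the country column of the logs, it could pay off to repartition the dataset along that variable. A quick sketch (the output path is just an example):
## rewrite the dataset partitioned by a variable we often filter on
## (here `country`, purely as an illustration)
cran_logs_parquet_by_day %>%
  write_dataset(
    path = "~/datasets/cran-logs-parquet-by-country",  # <--- example path
    format = "parquet",
    partitioning = "country"
  )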
The useR!2022 Arrow tutorial has a convincing demonstration that taking advantage of partitioning for your queries makes them run much faster.
Expand for Session Info
sessioninfo::session_info()
─ Session info ───────────────────────────────────────────────────────────────
setting value
version R version 4.2.1 (2022-06-23)
os Ubuntu 22.04.1 LTS
system x86_64, linux-gnu
ui X11
language en_US
collate en_US.UTF-8
ctype en_US.UTF-8
tz Europe/Paris
date 2022-09-01
pandoc NA (via rmarkdown)
─ Packages ───────────────────────────────────────────────────────────────────
package * version date (UTC) lib source
arrow * 9.0.0 2022-08-10 [1] CRAN (R 4.2.1)
assertthat 0.2.1 2019-03-21 [1] RSPM
backports 1.4.1 2021-12-13 [1] RSPM
bench 1.1.2 2021-11-30 [1] RSPM
bit 4.0.4 2020-08-04 [1] RSPM
bit64 4.0.5 2020-08-30 [1] RSPM
broom 1.0.0 2022-07-01 [1] RSPM
cellranger 1.1.0 2016-07-27 [1] RSPM
cli 3.3.0 2022-04-25 [1] RSPM (R 4.2.0)
colorspace 2.0-3 2022-02-21 [1] RSPM
crayon 1.5.1 2022-03-26 [1] RSPM
DBI 1.1.3 2022-06-18 [1] RSPM
dbplyr 2.2.1 2022-06-27 [1] RSPM
digest 0.6.29 2021-12-01 [1] RSPM
dplyr * 1.0.9 2022-04-28 [1] RSPM
ellipsis 0.3.2 2021-04-29 [1] RSPM
evaluate 0.15 2022-02-18 [1] RSPM
fansi 1.0.3 2022-03-24 [1] RSPM
fastmap 1.1.0 2021-01-25 [1] RSPM
forcats * 0.5.1 2021-01-27 [1] RSPM
fs 1.5.2 2021-12-08 [1] RSPM
gargle 1.2.0 2021-07-02 [1] RSPM
generics 0.1.3 2022-07-05 [1] RSPM
ggplot2 * 3.3.6 2022-05-03 [1] RSPM
glue 1.6.2 2022-02-24 [1] RSPM (R 4.2.0)
googledrive 2.0.0 2021-07-08 [1] RSPM
googlesheets4 1.0.0 2021-07-21 [1] RSPM
gtable 0.3.0 2019-03-25 [1] RSPM
haven 2.5.0 2022-04-15 [1] RSPM
hms 1.1.1 2021-09-26 [1] RSPM
htmltools 0.5.3 2022-07-18 [1] RSPM
httr 1.4.3 2022-05-04 [1] RSPM
jsonlite 1.8.0 2022-02-22 [1] RSPM
knitr 1.39 2022-04-26 [1] RSPM
lifecycle 1.0.1 2021-09-24 [1] RSPM
lubridate 1.8.0 2021-10-07 [1] RSPM
magrittr 2.0.3 2022-03-30 [1] RSPM
modelr 0.1.8 2020-05-19 [1] RSPM
munsell 0.5.0 2018-06-12 [1] RSPM
pillar 1.8.0 2022-07-18 [1] RSPM
pkgconfig 2.0.3 2019-09-22 [1] RSPM
profmem 0.6.0 2020-12-13 [1] RSPM
purrr * 0.3.4 2020-04-17 [1] RSPM
R6 2.5.1 2021-08-19 [1] RSPM
readr * 2.1.2 2022-01-30 [1] RSPM
readxl 1.4.0 2022-03-28 [1] RSPM
reprex 2.0.1 2021-08-05 [1] RSPM
rlang 1.0.4 2022-07-12 [1] RSPM (R 4.2.0)
rmarkdown 2.14 2022-04-25 [1] RSPM
rvest 1.0.2 2021-10-16 [1] RSPM
scales 1.2.0 2022-04-13 [1] RSPM
sessioninfo 1.2.2 2021-12-06 [1] RSPM
stringi 1.7.8 2022-07-11 [1] RSPM
stringr * 1.4.0 2019-02-10 [1] RSPM
tibble * 3.1.8 2022-07-22 [1] RSPM
tidyr * 1.2.0 2022-02-01 [1] RSPM
tidyselect 1.1.2 2022-02-21 [1] RSPM
tidyverse * 1.3.2 2022-07-18 [1] RSPM
tzdb 0.3.0 2022-03-28 [1] RSPM
utf8 1.2.2 2021-07-24 [1] RSPM
vctrs 0.4.1 2022-04-13 [1] RSPM
withr 2.5.0 2022-03-03 [1] RSPM
xfun 0.31 2022-05-10 [1] RSPM
xml2 1.3.3 2021-11-30 [1] RSPM
yaml 2.3.5 2022-02-21 [1] RSPM
[1] /home/francois/.R-library
[2] /usr/lib/R/library
──────────────────────────────────────────────────────────────────────────────