Any budding data scientist is likely to come to an impasse where they learn they have to take on calling API endpoints to GET or to POST data. The httr package is a convenient and intuitive wrapper to the curl package, but it can fall short when you need to open hundreds or even thousands of connections and make requests rapid fire. The curl package comes with the function curl_fetch_multi and multi_run which can coordinate an asynchronous request. The vignette is great, though it can feel a bit sparse for your first foray. This post will cover using the R package curl’s asynch tools. This is going to get a little in-depth, so be warned: this is a technical piece for people already comfortable with synchronous API calls.
# you'll need these packages ---library(curl)library(jsonlite)# I like to work with these packageslibrary(data.table)library(glue)# I like to build these. Since every API has their own flavor of query string # encoding or arrays, it's worth starting fresh most of the time. compose_query <-function(...) { params =c(list(...), recursive =TRUE) param_elements =paste(names(params), params, sep ="=")for (i inseq_along(param_elements)) {if (i == 1L) { query_string <-paste0("?", param_elements[1]) } elseif (i > 1L) { query_string <-paste(query_string, param_elements[i], sep ="&") } } query_string}compose_query(state ="OH")
[1] "?state=OH"
# at the time I wrote this, the call for CA stations was throwing a 500 error. # I am excluding CA from state.abb here. It's kind of a no-no. state.abb <- state.abb[state.abb!="CA"]
Asynchronous Calls
How’s the weather out there?
We’ll us the National Weather Service as our guinea pig. It’s a simple interface with a high rate limit and no fees involved. You can find the specification and information here. To start, we’ll get rolling with a single API call to grab weather stations and see what we’re working with.
The first step is a request header which becomes a part of the request handle. Using the .list args you can put whatever you want or need in the handle. This will get used over and over again for each of your asynch calls farther down the line.
# the spec tells us we need to add a user-agent for authentication. # This is a request header that we'll use to make our calls. # I just used my email and a a very weakly hashed string to keep # my identifier unique. you can use literally anything, and I ask # that you not use my User-Agent string. h <-new_handle() |># create the handlehandle_setheaders(.list =list(`User-Agent`=paste0("[email protected]", digest::digest("pawpawanalytics"))))
Once we’ve built our handle we’ll call an endpoint. The base url is given in the overview section. We’ll call to find the stations in the state of Ohio.
@id @type
1: https://api.weather.gov/stations/0262W wx:ObservationStation
2: https://api.weather.gov/stations/0562W wx:ObservationStation
3: https://api.weather.gov/stations/0731W wx:ObservationStation
4: https://api.weather.gov/stations/0891W wx:ObservationStation
5: https://api.weather.gov/stations/1035W wx:ObservationStation
6: https://api.weather.gov/stations/1360W wx:ObservationStation
stationIdentifier name timeZone
1: 0262W The Wellington School America/New_York
2: 0562W North Central Ohio ESC America/New_York
3: 0731W The Ohio State University America/New_York
4: 0891W Marburn Academy America/New_York
5: 1035W Natural Science Technology Center America/New_York
6: 1360W Fairview High School America/New_York
forecast
1: https://api.weather.gov/zones/forecast/OHZ055
2: https://api.weather.gov/zones/forecast/OHZ018
3: https://api.weather.gov/zones/forecast/OHZ055
4: https://api.weather.gov/zones/forecast/OHZ055
5: https://api.weather.gov/zones/forecast/OHZ003
6: https://api.weather.gov/zones/forecast/OHZ011
county
1: https://api.weather.gov/zones/county/OHC049
2: https://api.weather.gov/zones/county/OHC147
3: https://api.weather.gov/zones/county/OHC049
4: https://api.weather.gov/zones/county/OHC049
5: https://api.weather.gov/zones/county/OHC095
6: https://api.weather.gov/zones/county/OHC035
fireWeatherZone
1: https://api.weather.gov/zones/fire/OHZ055
2: https://api.weather.gov/zones/fire/OHZ018
3: https://api.weather.gov/zones/fire/OHZ055
4: https://api.weather.gov/zones/fire/OHZ055
5: https://api.weather.gov/zones/fire/OHZ003
6: https://api.weather.gov/zones/fire/OHZ011
If we index into the response object we can find a tabular data structure that, super handily, lists the endpoints for various products per station in the state of Ohio. You’ll notice that in processing the response I dropped the elevation column. This column was a list-column of arrays, so wasn’t easily converted to a tabular data set. On the off chance you need to incorporate such things I recommend looking into the function jsonlite::flatten(). For now, just keep in mind that we needed to take an extra step in creating a dataframe or data.table object.
So now we have a base url, a handle, and an understanding of our API’s query parameters. Let’s say we want to gather the station names for every station in the country. We can simply loop through our curl_fetch_memory() and wait 10,000 years for the responses to aggregate… Or… Since the long part isn’t making the call but getting a response, we can make the call and let that simmer while we spin off the remaining calls. This is the heart of asynchronous processing.
BE WARNED
Asynchronous processing cannot rely on order to sort and combine data. Because calls are asynchronous, and the world is random, making an asynch call to query params a, b, c might return the results ordered as c, b, a, which is a big shift away from synchronous operations. If you’re making calls that rely on each other, such as a paging key, asynchronous processes are not going to fit your situation. We’ll see this play out in the example below.
All 50 States
So, we want to gather the station names for all 50 states in the country. Let’s do this asynchronously. We will need a few things. The first is a pool. This is essentially a bank of queries that our asynch call can pull from whenever it’s ready to send off the next request. The next things we’ll need are callback functions. We’ll define a done and a fail function that will be called every time a request is fulfilled. Finally, we’ll use the curl_fetch_multi call to populate our pool. Then multi_run will actually run the calls for us. Let’s see it in action.
pool <-new_pool() # default values are plenty powerful here. # done will take 1 argument, which is the response object of our call. We can # call it res, or any name you choose. bin <-list() # this is the repository for our responses after done or fail. # it starts as an empty list in our calling environment. # to make life easier, you can give yourself a progress bar. This will go # sideways with a parallel call, but for now we'll enjoy it. pb <- progress::progress_bar$new(format =" downloading [:bar] :percent eta: :eta",total =length(state.abb))done <-function(res) {# this portion should look familiar. It's how we wrangled the data for our # single call. bin_resp <- res$content |>rawToChar() |> jsonlite::fromJSON() |> {\(l) l$features$properties[which(names(l$features$properties) !="elevation")]}() |>setDT() # here's where it gets fun. We'll use <<- to go up one level and make an # assignment outside the calling scope of our done call. In this case the# global environment. bin <<-c(bin, list(bin_resp)) pb$tick() # for our progress bar}fail <-function(res) {message(res$status_code) pb$tick() # for our progress bar}# now we need to put in our list of URL's. Since we want all 50 states, we need # 50 URL's that add each state as a query param. We'll use the base R state.abblapply(state.abb, \(x) { query <-compose_query(state = x) h <-new_handle(url =glue("{base_url}/stations{query}")) |>handle_setheaders(.list =list(`User-Agent`=paste0("[email protected]", digest::digest("pawpawanalytics"))))multi_add(h, done, fail, pool = pool)})multi_run(pool = pool)
And voila, there is step 1, generating an asynchronous call that is many times faster than simply looping through calls one at a time.
Organization
If you look at the object returned by the last call, you get a list of data.tables. You can of course use rbind or more directly rbindlist to concatenate them together into one large table. But if you do so you’ll notice pretty quickly that you’re missing an important part of organizing your data. The state is not listed in the returned tables, so we don’t know which stations belong to which states.
Normally you could take the order of state.abb and determine the order of returned calls in a synchronous workflow. But if CA has three times as many stations as Rhode Island and they just happen to be called simultaneously, you can see pretty quickly how the mix up happens. a, b, c can be returned as c, a, b.
In these cases, I recommend using the response object in your done callback function. The state is in the query parameters you used to call the API, simply extract that query param from the response object URL and append it as a column in your data. See below example of how to modify the callback to include the state.
done <-function(res) {# this portion should look familiar. It's how we wrangled the data for our # single call. bin_resp <- res$content |>rawToChar() |> jsonlite::fromJSON() |> {\(l) l$features$properties[which(names(l$features$properties) !="elevation")]}() |>setDT() |> {\(d) d[,# see line below: extract state from query params state := stringr::str_extract(res$url, "(?<=state=)[A-Z]{2}") ]}()# here's where it gets fun. We'll use <<- to go up one level and make an # assignment outside the calling scope of our done call. In this case the# global environment. bin <<-c(bin, list(bin_resp)) pb$tick() # for our progress bar}
Have fun, experiment, go wild. You can shoot me an email on the hire me page if you’re really bamboozled.
Below I’m going to put the whole asynch enchilada into a single function you can use if you want to follow along in your own REPL session.
#' Asynchronous Call for State Stations#'#' @param states a character vector of state abbreviations#' @param user_agent the user agent you supply to weather service API#'asynch_call <-function(states = state.abb, user_agent ="yourUserAgent") { pool <-new_pool() bin <-list() pb <- progress::progress_bar$new(format =" downloading [:bar] :percent eta: :eta",total =length(state.abb)) done <-function(res) { bin_resp <- res$content |>rawToChar() |> jsonlite::fromJSON() |> {\(l) l$features$properties[which(names(l$features$properties) !="elevation")]}() |>setDT() bin <<-c(bin, list(bin_resp)) pb$tick() } fail <-function(res) {message(res$status_code) pb$tick() }lapply(states, \(x) { query <-compose_query(state = x) h <-new_handle(url =glue("{base_url}/stations{query}")) |>handle_setheaders(.list =list(`User-Agent`= user_agent))multi_add(h, done, fail, pool = pool) })multi_run(pool = pool) bin}
What’s Next?
Up next, I invite you to take on parallel computing with your asynchronous calls. Especailly when pushing data this method can up your speed significantly.