Skip to content

Commit

Permalink
pipeline in progress
Browse files Browse the repository at this point in the history
  • Loading branch information
Insang Song committed Mar 26, 2024
1 parent 40a8c46 commit 15a1289
Show file tree
Hide file tree
Showing 7 changed files with 277 additions and 100 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -95,4 +95,7 @@ input/narr
input/nlcd
input/tri
input/NCEP-NCAR-Reanalysis-1
input/nei
input/nei

# pipeline
_targets
83 changes: 45 additions & 38 deletions _targets.R
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,11 @@ source("./tools/pipeline/targets_metalearner.R")
source("./tools/pipeline/targets_predict.R")

# bypass option
Sys.setenv("BTV_DOWNLOAD_PASS" = "FALSE")
Sys.setenv("BTV_DOWNLOAD_PASS" = "TRUE")

# nullify download target if bypass option is set

tar_invalidate(any_of(tar_older(Sys.time() - as.difftime(183, units = "days"))))
# # nullify download target if bypass option is set
if (Sys.getenv("BTV_DOWNLOAD_PASS") == "TRUE") {
target_download <- NULL
}
Expand All @@ -27,52 +29,57 @@ tar_option_set(
"crew", "crew.cluster",
"future", "future.apply", "future.callr",
"sftime", "stars", "rlang", "foreach", "parallelly"),
library = "../../r-libs",
repository = "local",
controller =
crew.cluster::crew_controller_slurm(
slurm_log_output = "output/slurm_pipeline_log.out",
slurm_log_error = "output/slurm_pipeline_error.err",
tasks_max = 32L,
slurm_memory_gigabytes_per_cpu = 8,
slurm_cpus_per_task = 8L,
slurm_time_minutes = NULL,
slurm_partition = "geo"
),
controller = NULL,

# crew.cluster::crew_controller_slurm(
# slurm_log_output = "output/slurm_pipeline_log.out",
# slurm_log_error = "output/slurm_pipeline_error.err",
# tasks_max = 32L,
# slurm_memory_gigabytes_per_cpu = 8,
# slurm_cpus_per_task = 8L,
# slurm_time_minutes = NULL,
# slurm_partition = "geo"
# ),
error = "null",
memory = "persistent",
format = "qs",
storage = "worker",
seed = 202401L
)



list(
target_init,
targets::tar_target(
radii,
command = c(1e3, 1e4, 5e4),
iteration = "vector"
),
# targets::tar_target(
# radii,
# command = c(1e3, 1e4, 5e4),
# iteration = "vector"
# ),
target_download,
target_calculate_fit,
target_calculate_predict,
target_baselearner,
target_metalearner,
target_predict,
# documents and summary statistics
targets::tar_target(
summary_urban_rural,
summary_prediction(
grid_filled,
level = "point",
contrast = "urbanrural"))
,
targets::tar_target(
summary_state,
summary_prediction(
grid_filled,
level = "point",
contrast = "state"
)
)
target_calculate_fit#,
# target_baselearner,
# target_metalearner,
# target_calculate_predict,
# target_predict,
# # documents and summary statistics
# targets::tar_target(
# summary_urban_rural,
# summary_prediction(
# grid_filled,
# level = "point",
# contrast = "urbanrural"))
# ,
# targets::tar_target(
# summary_state,
# summary_prediction(
# grid_filled,
# level = "point",
# contrast = "state"
# )
# )
)

# targets::tar_visnetwork()
Expand Down
25 changes: 25 additions & 0 deletions tools/pipeline/narr_variables.csv
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
"dirs"
"input/narr/air.sfc"
"input/narr/albedo"
"input/narr/apcp"
"input/narr/dswrf"
"input/narr/evap"
"input/narr/hcdc"
"input/narr/hpbl"
"input/narr/lcdc"
"input/narr/lhtfl"
"input/narr/mcdc"
"input/narr/omega"
"input/narr/pr_wtr"
"input/narr/prate"
"input/narr/pres.sfc"
"input/narr/shtfl"
"input/narr/shum"
"input/narr/snowc"
"input/narr/soilm"
"input/narr/tcdc"
"input/narr/ulwrf.sfc"
"input/narr/uwnd.10m"
"input/narr/vis"
"input/narr/vwnd.10m"
"input/narr/weasd"
32 changes: 20 additions & 12 deletions tools/pipeline/pipeline_base_functions.R
Original file line number Diff line number Diff line change
Expand Up @@ -33,16 +33,16 @@ meta_run(varname = "dir_input_modis_mod11")
#' @returns Depending on `fun_aqs` specification.
read_locs <-
function(
fun_aqs = get("import_aqs"),
fun_aqs = amadeus::process_aqs,
...
) {
fun_aqs(...)
}


#' Filter monitors with the minimum POC value
#'
#' @param input_df data.frame/tbl_df/data.table
#' @param path data.frame/tbl_df/data.table
#' @param site_spt Space-time site data.
#' @param locs_id character(1). Name of site id (not monitor id)
#' @param poc_name character(1). Name of column containing POC values.
#' @param date_start character(1).
Expand All @@ -54,11 +54,13 @@ read_locs <-
#' @importFrom dplyr filter
#' @importFrom dplyr ungroup
#' @importFrom data.table data.table
#' @importFrom data.table rbindlist
#' @importFrom rlang sym
#' @export
get_aqs_data <-
function(
path = file.path(mr("dir_output"), mr("file_aqs_pm")),
site_spt = NULL,
locs_id = mr("pointid"),
time_id = mr("timeid"),
poc_name = "POC",
Expand All @@ -73,21 +75,27 @@ get_aqs_data <-
if (!is.character(poc_name)) {
stop("poc_name should be character.\n")
}
aqs_prep <-
amadeus::process_aqs(
path = path,
date = NULL,
return_format = return_format
)
input_df <- readRDS(path)

# aqs_prep <-
# amadeus::process_aqs(
# path = path,
# date = NULL,
# return_format = return_format
# )
input_df <- lapply(path, read.csv) |> data.table::rbindlist()
input_df <- input_df[,
list(
pm2.5 = Arithmetic.Mean,
site_id = sprintf("%02d%03d%04d%05d", State.Code, County.Code, Site.Num, Parameter.Code),
time = as.Date(Date.Local),
POC = POC
)]
poc_filtered <- input_df |>
dplyr::group_by(!!rlang::sym(locs_id), !!rlang::sym(time_id)) |>
dplyr::filter(!!rlang::sym(poc_name) == min(!!rlang::sym(poc_name))) |>
dplyr::ungroup() |>
data.table::data.table()

poc_res <- merge(poc_filtered, aqs_prep, by = c(locs_id, time_id))
poc_res <- merge(poc_filtered, as.data.frame(site_spt), by = c(locs_id, time_id))
return(poc_res)
#nocov end
}
Expand Down
24 changes: 14 additions & 10 deletions tools/pipeline/punchcard.csv
Original file line number Diff line number Diff line change
@@ -1,28 +1,29 @@
varname,value,index,class,command
root_custom,../../../../group/set/Projects/NRT-AP-Model/,0,path,paste0
root_absolute,missing,1,path,getwd
root_relative,.,2,path,paste0
dir_input,input,3,path,file.path
dir_output,output,4,path,file.path
dir_input_aqs,input/aqs,5,path,file.path
dir_input_nei,input/nei,6,path,file.path
dir_input_narrmono,input/narr/monolevel,7,path,file.path
dir_input_narrplevels,input/narr/p_levels,8,path,file.path
dir_input_narr,input/narr,7,path,file.path
file_narr_variables,tools/pipeline/narr_variables.csv,8,path,file.path
dir_input_modis,input/modis,9,path,file.path
dir_input_nlcd,input/nlcd,10,path,file.path
dir_input_ecoregion,input/ecoregions,11,path,file.path
dir_input_koppen,input/koppengeiger,12,path,file.path
dir_input_koppen,input/koppen_geiger,12,path,file.path
dir_input_gmted,input/gmted,13,path,file.path
dir_input_sedac_population,input/sedac_population,14,path,file.path
dir_input_sedac_groads,input/sedac_groads,15,path,file.path
dir_input_hms,input/noaa_hms,16,path,file.path
dir_input_tri,input/tri,17,path,file.path
dir_input_geos,input/geos,18,path,file.path
dir_input_modis_mod11,input/modis/raw/MOD11A1,19,path,file.path
dir_input_modis_mod13,input/modis/raw/MOD13A2,20,path,file.path
dir_input_modis_mcd19,input/modis/raw/MCD19A2,21,path,file.path
dir_input_modis_mod09,input/modis/raw/MOD09GA,22,path,file.path
dir_input_modis_mod06,input/modis/raw/MOD06_L2,23,path,file.path
dir_input_modis_vnp46,input/modis/raw/VNP46A2,24,path,file.path
dir_input_modis_mod11,input/modis/raw/61/MOD11A1,19,path,file.path
dir_input_modis_mod13,input/modis/raw/61/MOD13A2,20,path,file.path
dir_input_modis_mcd19,input/modis/raw/61/MCD19A2,21,path,file.path
dir_input_modis_mod09,input/modis/raw/61/MOD09GA,22,path,file.path
dir_input_modis_mod06,input/modis/raw/61/MOD06_L2,23,path,file.path
dir_input_modis_vnp46,input/modis/raw/5000/VNP46A2,24,path,file.path
y2018,2018,25,indicator,paste0
y2019,2019,26,indicator,paste0
y2020,2020,27,indicator,paste0
Expand Down Expand Up @@ -90,4 +91,7 @@ file_covar_predict_hms,covar_predict_hms.rds,88,status,paste0
file_covar_predict_sedac_population,covar_predict_sedac_population.rds,89,status,paste0
file_covar_predict_sedac_groads,covar_predict_sedac_groads.rds,90,status,paste0
dir_input_nei2017,input/nei/nei_onroad_byregions_2017,91,path,file.path
dir_input_nei2020,input/nei/nei_onroad_byregions_2020,92,path,file.path
dir_input_nei2020,input/nei/nei_onroad_byregions_2020,92,path,file.path
date_start,2018-01-01,93,domain,as.Date
date_end,2022-12-31,94,domain,as.Date
extent,-126|-62|22|52,95,domain,paste0
Loading

0 comments on commit 15a1289

Please sign in to comment.