diff --git a/.gitignore b/.gitignore
index eec6a75a..2c6502e4 100644
--- a/.gitignore
+++ b/.gitignore
@@ -95,4 +95,7 @@ input/narr
 input/nlcd
 input/tri
 input/NCEP-NCAR-Reanalysis-1
-input/nei
\ No newline at end of file
+input/nei
+
+# pipeline
+_targets
\ No newline at end of file
diff --git a/_targets.R b/_targets.R
index 34d20d13..4ecc9f41 100644
--- a/_targets.R
+++ b/_targets.R
@@ -12,9 +12,18 @@ source("./tools/pipeline/targets_metalearner.R")
 source("./tools/pipeline/targets_predict.R")
 
 # bypass option
-Sys.setenv("BTV_DOWNLOAD_PASS" = "FALSE")
+Sys.setenv("BTV_DOWNLOAD_PASS" = "TRUE")
 
+# Invalidate targets older than 183 days so that they are rebuilt.
+# tar_older() reads the metadata store, so skip this step when no
+# metadata exists yet (e.g. the first run after a fresh checkout).
+if (targets::tar_exist_meta()) {
+  tar_invalidate(
+    any_of(tar_older(Sys.time() - as.difftime(183, units = "days")))
+  )
+}
+
 # nullify download target if bypass option is set
 if (Sys.getenv("BTV_DOWNLOAD_PASS") == "TRUE") {
   target_download <- NULL
 }
@@ -27,52 +36,57 @@ tar_option_set(
       "crew", "crew.cluster", "future", "future.apply", "future.callr",
       "sftime", "stars", "rlang",
       "foreach", "parallelly"),
+  library = "../../r-libs",
   repository = "local",
-  controller =
-    crew.cluster::crew_controller_slurm(
-      slurm_log_output = "output/slurm_pipeline_log.out",
-      slurm_log_error = "output/slurm_pipeline_error.err",
-      tasks_max = 32L,
-      slurm_memory_gigabytes_per_cpu = 8,
-      slurm_cpus_per_task = 8L,
-      slurm_time_minutes = NULL,
-      slurm_partition = "geo"
-    ),
+  controller = NULL,
+
+  # crew.cluster::crew_controller_slurm(
+  #   slurm_log_output = "output/slurm_pipeline_log.out",
+  #   slurm_log_error = "output/slurm_pipeline_error.err",
+  #   tasks_max = 32L,
+  #   slurm_memory_gigabytes_per_cpu = 8,
+  #   slurm_cpus_per_task = 8L,
+  #   slurm_time_minutes = NULL,
+  #   slurm_partition = "geo"
+  # ),
   error = "null",
   memory = "persistent",
+  format = "qs",
   storage = "worker",
   seed = 202401L
 )
 
+
+
 list(
   target_init,
-  targets::tar_target(
-    radii,
-    command = c(1e3, 1e4, 5e4),
-    iteration = "vector"
-  ),
+  # targets::tar_target(
+  #   radii,
+  #   command = c(1e3, 1e4, 5e4),
+  #   iteration = "vector"
+  # ),
   target_download,
-  target_calculate_fit,
-  target_calculate_predict,
-  target_baselearner,
-  target_metalearner,
-  target_predict,
-  # documents and summary statistics
-  targets::tar_target(
-    summary_urban_rural,
-    summary_prediction(
-      grid_filled,
-      level = "point",
-      contrast = "urbanrural"))
-  ,
-  targets::tar_target(
-    summary_state,
-    summary_prediction(
-      grid_filled,
-      level = "point",
-      contrast = "state"
-    )
-  )
+  target_calculate_fit#,
+  # target_baselearner,
+  # target_metalearner,
+  # target_calculate_predict,
+  # target_predict,
+  # # documents and summary statistics
+  # targets::tar_target(
+  #   summary_urban_rural,
+  #   summary_prediction(
+  #     grid_filled,
+  #     level = "point",
+  #     contrast = "urbanrural"))
+  # ,
+  # targets::tar_target(
+  #   summary_state,
+  #   summary_prediction(
+  #     grid_filled,
+  #     level = "point",
+  #     contrast = "state"
+  #   )
+  # )
 )
 
 # targets::tar_visnetwork()
diff --git a/tools/pipeline/narr_variables.csv b/tools/pipeline/narr_variables.csv
new file mode 100644
index 00000000..2181c01a
--- /dev/null
+++ b/tools/pipeline/narr_variables.csv
@@ -0,0 +1,25 @@
+"dirs"
+"input/narr/air.sfc"
+"input/narr/albedo"
+"input/narr/apcp"
+"input/narr/dswrf"
+"input/narr/evap"
+"input/narr/hcdc"
+"input/narr/hpbl"
+"input/narr/lcdc"
+"input/narr/lhtfl"
+"input/narr/mcdc"
+"input/narr/omega"
+"input/narr/pr_wtr"
+"input/narr/prate"
+"input/narr/pres.sfc"
+"input/narr/shtfl"
+"input/narr/shum"
+"input/narr/snowc"
+"input/narr/soilm"
+"input/narr/tcdc"
+"input/narr/ulwrf.sfc"
+"input/narr/uwnd.10m"
+"input/narr/vis"
+"input/narr/vwnd.10m"
+"input/narr/weasd"
diff --git a/tools/pipeline/pipeline_base_functions.R b/tools/pipeline/pipeline_base_functions.R
index 3662b14e..0775114c 100644
--- a/tools/pipeline/pipeline_base_functions.R
+++ b/tools/pipeline/pipeline_base_functions.R
@@ -33,7 +33,7 @@ meta_run(varname = "dir_input_modis_mod11")
 #' @returns Depending on `fun_aqs` specification.
read_locs <- function( - fun_aqs = get("import_aqs"), + fun_aqs = amadeus::process_aqs, ... ) { fun_aqs(...) @@ -41,8 +41,8 @@ read_locs <- #' Filter monitors with the minimum POC value -#' -#' @param input_df data.frame/tbl_df/data.table +#' @param path data.frame/tbl_df/data.table +#' @param site_spt Space-time site data. #' @param locs_id character(1). Name of site id (not monitor id) #' @param poc_name character(1). Name of column containing POC values. #' @param date_start character(1). @@ -54,11 +54,13 @@ read_locs <- #' @importFrom dplyr filter #' @importFrom dplyr ungroup #' @importFrom data.table data.table +#' @importFrom data.table rbindlist #' @importFrom rlang sym #' @export get_aqs_data <- function( path = file.path(mr("dir_output"), mr("file_aqs_pm")), + site_spt = NULL, locs_id = mr("pointid"), time_id = mr("timeid"), poc_name = "POC", @@ -73,21 +75,27 @@ get_aqs_data <- if (!is.character(poc_name)) { stop("poc_name should be character.\n") } - aqs_prep <- - amadeus::process_aqs( - path = path, - date = NULL, - return_format = return_format - ) - input_df <- readRDS(path) - + # aqs_prep <- + # amadeus::process_aqs( + # path = path, + # date = NULL, + # return_format = return_format + # ) + input_df <- lapply(path, read.csv) |> data.table::rbindlist() + input_df <- input_df[, + list( + pm2.5 = Arithmetic.Mean, + site_id = sprintf("%02d%03d%04d%05d", State.Code, County.Code, Site.Num, Parameter.Code), + time = as.Date(Date.Local), + POC = POC + )] poc_filtered <- input_df |> dplyr::group_by(!!rlang::sym(locs_id), !!rlang::sym(time_id)) |> dplyr::filter(!!rlang::sym(poc_name) == min(!!rlang::sym(poc_name))) |> dplyr::ungroup() |> data.table::data.table() - poc_res <- merge(poc_filtered, aqs_prep, by = c(locs_id, time_id)) + poc_res <- merge(poc_filtered, as.data.frame(site_spt), by = c(locs_id, time_id)) return(poc_res) #nocov end } diff --git a/tools/pipeline/punchcard.csv b/tools/pipeline/punchcard.csv index ff9d3576..a874fdd5 100644 --- 
a/tools/pipeline/punchcard.csv +++ b/tools/pipeline/punchcard.csv @@ -1,28 +1,29 @@ varname,value,index,class,command +root_custom,../../../../group/set/Projects/NRT-AP-Model/,0,path,paste0 root_absolute,missing,1,path,getwd root_relative,.,2,path,paste0 dir_input,input,3,path,file.path dir_output,output,4,path,file.path dir_input_aqs,input/aqs,5,path,file.path dir_input_nei,input/nei,6,path,file.path -dir_input_narrmono,input/narr/monolevel,7,path,file.path -dir_input_narrplevels,input/narr/p_levels,8,path,file.path +dir_input_narr,input/narr,7,path,file.path +file_narr_variables,tools/pipeline/narr_variables.csv,8,path,file.path dir_input_modis,input/modis,9,path,file.path dir_input_nlcd,input/nlcd,10,path,file.path dir_input_ecoregion,input/ecoregions,11,path,file.path -dir_input_koppen,input/koppengeiger,12,path,file.path +dir_input_koppen,input/koppen_geiger,12,path,file.path dir_input_gmted,input/gmted,13,path,file.path dir_input_sedac_population,input/sedac_population,14,path,file.path dir_input_sedac_groads,input/sedac_groads,15,path,file.path dir_input_hms,input/noaa_hms,16,path,file.path dir_input_tri,input/tri,17,path,file.path dir_input_geos,input/geos,18,path,file.path -dir_input_modis_mod11,input/modis/raw/MOD11A1,19,path,file.path -dir_input_modis_mod13,input/modis/raw/MOD13A2,20,path,file.path -dir_input_modis_mcd19,input/modis/raw/MCD19A2,21,path,file.path -dir_input_modis_mod09,input/modis/raw/MOD09GA,22,path,file.path -dir_input_modis_mod06,input/modis/raw/MOD06_L2,23,path,file.path -dir_input_modis_vnp46,input/modis/raw/VNP46A2,24,path,file.path +dir_input_modis_mod11,input/modis/raw/61/MOD11A1,19,path,file.path +dir_input_modis_mod13,input/modis/raw/61/MOD13A2,20,path,file.path +dir_input_modis_mcd19,input/modis/raw/61/MCD19A2,21,path,file.path +dir_input_modis_mod09,input/modis/raw/61/MOD09GA,22,path,file.path +dir_input_modis_mod06,input/modis/raw/61/MOD06_L2,23,path,file.path 
+dir_input_modis_vnp46,input/modis/raw/5000/VNP46A2,24,path,file.path
 y2018,2018,25,indicator,paste0
 y2019,2019,26,indicator,paste0
 y2020,2020,27,indicator,paste0
@@ -90,4 +91,7 @@ file_covar_predict_hms,covar_predict_hms.rds,88,status,paste0
 file_covar_predict_sedac_population,covar_predict_sedac_population.rds,89,status,paste0
 file_covar_predict_sedac_groads,covar_predict_sedac_groads.rds,90,status,paste0
 dir_input_nei2017,input/nei/nei_onroad_byregions_2017,91,path,file.path
-dir_input_nei2020,input/nei/nei_onroad_byregions_2020,92,path,file.path
\ No newline at end of file
+dir_input_nei2020,input/nei/nei_onroad_byregions_2020,92,path,file.path
+date_start,2018-01-01,93,domain,as.Date
+date_end,2022-12-31,94,domain,as.Date
+extent,-126|-62|22|52,95,domain,paste0
\ No newline at end of file
diff --git a/tools/pipeline/targets_calculate.R b/tools/pipeline/targets_calculate.R
index 5edcc33f..498ec638 100644
--- a/tools/pipeline/targets_calculate.R
+++ b/tools/pipeline/targets_calculate.R
@@ -145,22 +145,22 @@ target_calculate_fit <-
     )
     ,
     targets::tar_target(
-      covariates_narrmono,
-      calculate_multi(
-        locs = sites_spat,
-        path = mr("dir_input_narrmono"),
-        ... # other args
-      )
+      narr_variables,
+      command = read.csv(mr("file_narr_variables"))$dirs,
+      iteration = "vector"
     )
     ,
     targets::tar_target(
-      covariates_narrplevels,
+      covariates_narr,
       calculate_multi(
-        status = status_narrplevels,
         locs = sites_spat,
-        path = mr("dir_input_narrplevels"),
-        ... # other args
-      )
+        path = narr_variables,
+        date = c(mr("date_start"), mr("date_end")),
+        variable = basename(narr_variables),
+        locs_id = mr("pointid")
+      ),
+      pattern = map(narr_variables),
+      iteration = "vector"
     )
     ,
     targets::tar_target(
@@ -170,7 +170,8 @@ target_calculate_fit <-
     ,
     targets::tar_target(
       nei_dirs,
-      command = c(rep(mr("dir_input_nei2017"), 2), rep(mr("dir_input_nei2020"), 3)),
+      command =
+        c(rep(mr("dir_input_nei2017"), 2), rep(mr("dir_input_nei2020"), 3)),
       iteration = "vector"
     )
     ,
@@ -187,20 +188,47 @@ target_calculate_fit <-
       iteration = "vector"
     )
     ,
+    targets::tar_target(
+      gmted_combination_stat,
+      command = rep(
+        c(
+          "Breakline Emphasis", "Systematic Subsample",
+          "Median Statistic", "Minimum Statistic",
+          "Mean Statistic", "Maximum Statistic",
+          "Standard Deviation Statistic"
+        ), 3L),
+      iteration = "vector"
+    )
+    ,
+    targets::tar_target(
+      gmted_combination_res,
+      command = rep(
+        c("7.5 arc-seconds", "15 arc-seconds", "30 arc-seconds"),
+        each = 7L
+      ),
+      iteration = "vector"
+    )
+    ,
     targets::tar_target(
       covariates_gmted,
       calculate_multi(
-        status = status_gmted,
         locs = sites_spat,
         path = mr("dir_input_gmted"),
-        ... # other args
-      )
+        locs_id = mr("pointid"),
+        covariate = "gmted",
+        radius = 0,
+        variable = c(gmted_combination_stat, gmted_combination_res)
+      ),
+      pattern = map(gmted_combination_stat, gmted_combination_res),
+      iteration = "vector"
     )
     ,
     targets::tar_target(
       geos_dates,
-      command = seq(as.Date("2018-01-01"), as.Date("2022-12-31"), by = "1 day"),
-      iteration = "list"
+      command = as.character(
+        seq(as.Date("2018-01-01"), as.Date("2022-12-31"), by = "1 day")
+      ),
+      iteration = "vector"
     )
     ,
     targets::tar_target(
@@ -214,7 +242,7 @@ target_calculate_fit <-
         snap = "out"
       ),
       pattern = map(geos_dates),
-      iteration = "list"
+      iteration = "vector"
     )
     ,
     targets::tar_target(
@@ -222,6 +250,66 @@ target_calculate_fit <-
       command = do.call(rbind, covariates_geos_list)
     )
     ,
+    targets::tar_target(
+      modis_mod06_paths,
+      list.files(
+        mr("dir_input_modis_mod06"),
+        pattern = "hdf$",
+        full.names = TRUE,
+        recursive = TRUE
+      )
+    )
+    ,
+    targets::tar_target(
+      modis_mod11_paths,
+      list.files(
+        mr("dir_input_modis_mod11"),
+        pattern = "hdf$",
+        full.names = TRUE,
+        recursive = TRUE
+      )
+    )
+    ,
+    targets::tar_target(
+      modis_mod13_paths,
+      list.files(
+        mr("dir_input_modis_mod13"),
+        pattern = "hdf$",
+        full.names = TRUE,
+        recursive = TRUE
+      )
+    )
+    ,
+    targets::tar_target(
+      modis_mod09_paths,
+      list.files(
+        mr("dir_input_modis_mod09"),
+        pattern = "hdf$",
+        full.names = TRUE,
+        recursive = TRUE
+      )
+    )
+    ,
+    targets::tar_target(
+      modis_mcd19_paths,
+      list.files(
+        mr("dir_input_modis_mcd19"),
+        pattern = "hdf$",
+        full.names = TRUE,
+        recursive = TRUE
+      )
+    )
+    ,
+    targets::tar_target(
+      modis_vnp46_paths,
+      list.files(
+        mr("dir_input_modis_vnp46"),
+        pattern = "h5$",
+        full.names = TRUE,
+        recursive = TRUE
+      )
+    )
+    ,
     targets::tar_target(
       covariates_modis_mod11,
       calculate_multi(
@@ -240,18 +328,27 @@ target_calculate_fit <-
     ,
     targets::tar_target(
       covariates_modis_mod06,
-      calculate_multi(
-        locs = sites_spat,
-        path = mr("dir_input_modis_mod06"),
-        process_function = NULL,
-        calc_function = amadeus::calc_modis_par,
+      amadeus::calc_modis_par(
+        from = modis_mod06_paths,
+        locs = sf::st_as_sf(sites_spat),
         locs_id = mr("pointid"),
-        from = mr("dir_input_modis_mod06"),
+        # keep the swath preprocessor the previous call used for MOD06_L2
         preprocess = amadeus::process_modis_swath,
         name_covariates = c("MOD_CLCVD_0_", "MOD_CLCVN_0_"),
-        subdataset = "(Cloud_Fraction_Day|Cloud_Fraction_Night)",
-        nthreads = 8
+        subdataset = "(Cloud_Fraction_Day|Cloud_Fraction_Night)"
       )
+      # calculate_multi(
+      #   locs = sites_spat,
+      #   path = mr("dir_input_modis_mod06"),
+      #   process_function = NULL,
+      #   calc_function = amadeus::calc_modis_par,
+      #   locs_id = mr("pointid"),
+      #   from = mr("dir_input_modis_mod06"),
+      #   preprocess = amadeus::process_modis_swath,
+      #   name_covariates = c("MOD_CLCVD_0_", "MOD_CLCVN_0_"),
+      #   subdataset = "(Cloud_Fraction_Day|Cloud_Fraction_Night)",
+      #   nthreads = 8
+      # )
     )
     ,
     targets::tar_target(
@@ -307,19 +404,27 @@ target_calculate_fit <-
     ,
     targets::tar_target(
       covariates_modis_vnp46,
-      calculate_multi(
-        locs = sites_spat,
-        path = mr("dir_input_modis_vnp46"),
-        process_function = NULL,
-        calc_function = amadeus::calc_modis_par,
+      amadeus::calc_modis_par(
+        from = modis_vnp46_paths,
+        locs = sf::st_as_sf(sites_spat),
         locs_id = mr("pointid"),
-        from = mr("dir_input_modis_vnp46"),
-        preprocess = amadeus::process_bluemarble,
         name_covariates = "MOD_LGHTN_0_",
         subdataset = 3,
-        nthreads = 8
-      )
+        preprocess = amadeus::process_bluemarble
+      # calculate_multi(
+      #   locs = sites_spat,
+      #   path = mr("dir_input_modis_vnp46"),
+      #   process_function = NULL,
+      #   calc_function = amadeus::calc_modis_par,
+      #   locs_id = mr("pointid"),
+      #   from = mr("dir_input_modis_vnp46"),
+      #   preprocess = amadeus::process_bluemarble,
+      #   name_covariates = "MOD_LGHTN_0_",
+      #   subdataset = 3,
+      #   nthreads = 8
+      # )
     )
+    )
     ,
     # combine each covariate set into one data.frame (data.table; if any)
     targets::tar_target(
diff --git a/tools/pipeline/targets_initialize.R b/tools/pipeline/targets_initialize.R
index 0e1ac09f..0e2a05b2 100644
---
a/tools/pipeline/targets_initialize.R
+++ b/tools/pipeline/targets_initialize.R
@@ -1,12 +1,42 @@
 target_init <-
   list(
     # tar_target for base directories and files
-    targets::tar_target(sites_spat, read_locs(mr("dir_input_aqs")))
+    # NOTE(review): each `[1:10, ]` below keeps only the first ten rows;
+    # presumably a development shortcut -- confirm before a full run.
+    targets::tar_target(
+      sites_spat,
+      read_locs(
+        path = list.files(
+          path = mr("dir_input_aqs"),
+          pattern = "daily_88101_[0-9]{4}\\.csv$",
+          full.names = TRUE),
+        date = NULL
+      )[1:10, ]
+    )
     ,
-    targets::tar_target(sites_time, read_locs(mr("dir_input_aqs")))
+    targets::tar_target(
+      sites_time,
+      read_locs(
+        path = list.files(
+          path = mr("dir_input_aqs"),
+          pattern = "daily_88101_[0-9]{4}\\.csv$",
+          full.names = TRUE),
+        date = c(mr("date_start"), mr("date_end"))
+      )[1:10, ]
+    )
     ,
     targets::tar_target(
       sites_pm,
-      get_aqs_data()
+      get_aqs_data(
+        path = list.files(
+          path = mr("dir_input_aqs"),
+          pattern = "daily_88101_[0-9]{4}\\.csv$",
+          full.names = TRUE),
+        site_spt = sites_time
+      )[1:10, ]
+    )
+    ,
+    targets::tar_target(
+      time_range,
+      command = c(mr("date_start"), mr("date_end"))
     )
   )
\ No newline at end of file