Commit 494a48b

lakikowolfe committed May 10, 2023
2 parents cb4dc78 + 0745297

Showing 11 changed files with 517 additions and 134 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
@@ -26,6 +26,7 @@ export(style_dashboard)
export(true_false_icon)
export(update_data_flow_manifest)
export(update_dfs_manifest)
export(visualize_component)
import(shiny)
importFrom(golem,activate_js)
importFrom(golem,add_resource_path)
23 changes: 15 additions & 8 deletions R/api_wrappers.R
@@ -46,11 +46,17 @@ get_all_manifests <- function(asset_view,
if (nrow(manifests$content) > 0) {

# pull together in a dataframe
return(data.frame(Component = rep("DataFlow", nrow(manifests$content)),
contributor = rep(sp_name, nrow(manifests$content)),
entityId = manifests$content$dataset_id,
dataset_name = manifests$content$folder_name,
dataset = manifests$content$data_type))
df <- data.frame(Component = rep("DataFlow", nrow(manifests$content)),
contributor = rep(sp_name, nrow(manifests$content)),
entityId = manifests$content$dataset_id,
dataset_name = manifests$content$folder_name,
dataset = manifests$content$data_type)

# update empty cells to "Not Applicable"
df[df == ""] <- "Not Applicable"

return(df)

} else {
return(NULL)
}
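
The added lines normalize every empty cell in the dataframe to "Not Applicable" in a single assignment. A minimal sketch of the same pattern on toy data (values hypothetical):

    # df == "" yields a logical matrix; matching cells are overwritten in place
    df <- data.frame(entityId = c("syn1", "syn2"),
                     dataset = c("Biospecimen", ""))
    df[df == ""] <- "Not Applicable"
    df$dataset
    #> [1] "Biospecimen"    "Not Applicable"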
@@ -74,11 +80,11 @@ calculate_items_per_manifest <- function(df,
asset_view,
input_token,
base_url) {

sapply(1:nrow(df), function(i) {

# dataset of "", "Not Applicable", or NA indicates that there is no manifest
if (df$dataset[i] == "") {
if (df$dataset[i] == "Not Applicable" | df$dataset[i] == "" | is.na(df$dataset[i])) {

manifest_nrow <- "Not Applicable"

@@ -93,14 +99,15 @@ calculate_items_per_manifest <- function(df,
base_url = base_url)
},
error=function(e) {
message(e)
return(NULL)
}
)

# if no manifest is downloaded, return "Not Applicable"
# otherwise count and return the number of rows
manifest_nrow <- ifelse(is.null(manifest$content), "Not Applicable", nrow(manifest$content))
}
}

return(manifest_nrow)
})
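
A hedged usage sketch for the updated helper; the fileview ID, token lookup, and base URL below are placeholders rather than values from this repo:

    # one value per row of df: a row count, or "Not Applicable" when no manifest exists
    num_items <- calculate_items_per_manifest(
      df = dataflow_manifest,                          # dataframe with a dataset column
      asset_view = "syn12345678",                      # hypothetical fileview ID
      input_token = Sys.getenv("SYNAPSE_PAT"),         # Synapse PAT from the environment
      base_url = "https://example-schematic-api.org")  # placeholder schematic API URL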
246 changes: 194 additions & 52 deletions R/manifest.R
@@ -120,7 +120,7 @@ generate_data_flow_manifest_skeleton <- function(asset_view,
# count rows in each manifest listed
if (calc_num_items) {

num_items <- calculate_items_per_manifest(get_all_manifests_out = dfs_manifest,
num_items <- calculate_items_per_manifest(df = dfs_manifest,
asset_view = asset_view,
input_token = input_token,
base_url = base_url)
@@ -165,8 +165,9 @@ update_data_flow_manifest <- function(asset_view,

print(paste0("Checking asset view ", asset_view, " for updates"))
print(paste0("Getting data flow status manifest"))

# get current data flow manifest
dfs_manifest <- tryCatch(
dataflow_manifest_obj <- tryCatch(
{
manifest_download(asset_view = asset_view,
dataset_id = manifest_dataset_id,
@@ -179,8 +180,16 @@
}
)

dataflow_manifest <- dataflow_manifest_obj$content

# if a Uuid column is present, remove it
if (any(grepl("Uuid", names(dataflow_manifest)))) {
idx <- grep("Uuid", names(dataflow_manifest))
dataflow_manifest <- dataflow_manifest[,-idx]
}

# get all manifests for each storage project
print("Getting all manifests")
print(paste0("Getting all manifests under asset view ", asset_view, " from Synapse"))
synapse_manifests <- tryCatch(
{
get_all_manifests(asset_view = asset_view,
@@ -194,81 +203,214 @@
}
)

print("Comparing data flow status manifest to current manifest list")
print("Checking data flow manifest for updates")

# check synapse for new datasets
dataflow_manifest_updated <- update_manifest_add_datasets(dataflow_manifest = dataflow_manifest,
get_all_manifests_out = synapse_manifests,
asset_view = asset_view,
input_token = input_token,
base_url = base_url)

# check synapse for removed datasets
dataflow_manifest_updated <- update_manifest_remove_datasets(dataflow_manifest = dataflow_manifest_updated,
get_all_manifests_out = synapse_manifests,
asset_view = asset_view,
input_token = input_token,
base_url = base_url)

# check synapse for updates to dataset_name column
dataflow_manifest_updated <- update_manifest_column(dataflow_manifest = dataflow_manifest_updated,
get_all_manifests_out = synapse_manifests,
update_column = "dataset_name",
asset_view = asset_view,
recalc_num_items = FALSE,
input_token = input_token,
base_url = base_url)

# check synapse for updates to dataset column
dataflow_manifest_updated <- update_manifest_column(dataflow_manifest = dataflow_manifest_updated,
get_all_manifests_out = synapse_manifests,
update_column = "dataset",
asset_view = asset_view,
recalc_num_items = TRUE,
input_token = input_token,
base_url = base_url)

# compare updated dataflow manifest to initial manifest

changes_made <- !identical(dataflow_manifest, dataflow_manifest_updated)

# if changes have been made submit to synapse
if (changes_made) {
# submit to synapse
# data_type = NULL until LP can fix model/submit endpoint for large manifests
# If no data type is indicated, no validation will be done
message("submitting manifest to Synapse")

# create manifest directory if it doesn't exist yet
if (!file.exists("./manifest/")) {
dir.create("./manifest/")
}

# write to csv for submission
file_path <- "./manifest/synapse_storage_manifest_dataflow.csv"
write.csv(dataflow_manifest_updated, file_path, row.names = FALSE)

# submit to synapse
model_submit(data_type = NULL,
asset_view = asset_view,
dataset_id = manifest_dataset_id,
file_name = file_path,
restrict_rules = TRUE,
input_token = input_token,
manifest_record_type = "table_and_file",
base_url = base_url,
schema_url = "https://raw.githubusercontent.com/Sage-Bionetworks/data_flow/main/inst/data_flow_component.jsonld")
} else {
print("No updates to manifest required at this time")
}

}
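
A usage sketch for the top-level updater; both Synapse IDs and the base URL are hypothetical:

    update_data_flow_manifest(
      asset_view = "syn12345678",                      # hypothetical fileview ID
      manifest_dataset_id = "syn87654321",             # hypothetical folder holding the data flow manifest
      input_token = Sys.getenv("SYNAPSE_PAT"),
      base_url = "https://example-schematic-api.org")  # placeholder schematic API URL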

#' Update manifest with new datasets found in Synapse
#'
#' @param dataflow_manifest A dataFlow manifest
#' @param get_all_manifests_out The output of get_all_manifests, or any dataframe with the columns Component, contributor, entityId, dataset_name, and dataset
#' @param asset_view ID of the view listing all project data assets. For Synapse, this is the Synapse ID of the fileview listing all data assets for a given project (i.e. master_fileview in config.yml)
#' @param input_token Synapse PAT
#' @param base_url Base URL of schematic API (Defaults to AWS version)

update_manifest_add_datasets <- function(dataflow_manifest,
get_all_manifests_out,
asset_view,
input_token,
base_url) {

# compare recent pull of all manifests to data flow manifest
missing_datasets_idx <- !synapse_manifests$entityId %in% dfs_manifest$content$entityId
missing_datasets <- synapse_manifests[missing_datasets_idx,]
# check for new datasets by entityId
new_datasets <- get_all_manifests_out[!get_all_manifests_out$entityId %in% dataflow_manifest$entityId,]

# if there are missing datasets calculate number of items for each dataset and add in missing information
if (nrow(missing_datasets) > 0) {
# if there are new datasets...
if (nrow(new_datasets) > 0) {

print(paste0(nrow(missing_datasets), " new dataset(s) found. Updating data flow status manifest"))
print(paste0(nrow(new_datasets), " new dataset(s) found on Synapse"))

# calculate number of items in each manifest
num_items <- tryCatch(
{
calculate_items_per_manifest(df = missing_datasets,
calculate_items_per_manifest(df = new_datasets,
asset_view = asset_view,
input_token = input_token,
base_url = base_url)
},
error = function(e) {
message("get_all_manifests failed")
message("num_items calculation failed")
message(e)
}
)

# fill dfs manifest rows for missing datasets
# fill data flow manifest rows for missing datasets
# FIXME: Remove hardcoded column names
# This function will break if dataflow schema changes
# Source column names from schema?
missing_datasets$release_scheduled <- rep("Not Applicable", nrow(missing_datasets))
missing_datasets$embargo <- rep("Not Applicable", nrow(missing_datasets))
missing_datasets$standard_compliance <- rep(FALSE, nrow(missing_datasets))
missing_datasets$data_portal <- rep(FALSE, nrow(missing_datasets))
missing_datasets$released <- rep(FALSE, nrow(missing_datasets))
missing_datasets$num_items <- num_items
new_datasets$release_scheduled <- rep("Not Applicable", nrow(new_datasets))
new_datasets$embargo <- rep("Not Applicable", nrow(new_datasets))
new_datasets$standard_compliance <- rep(FALSE, nrow(new_datasets))
new_datasets$data_portal <- rep(FALSE, nrow(new_datasets))
new_datasets$released <- rep(FALSE, nrow(new_datasets))
new_datasets$num_items <- num_items

# remove uuid if present
if (any(names(dfs_manifest$content) == "Uuid")) {
uuid_idx <- grep("Uuid", names(dfs_manifest$content))
dfs_manifest$content <- dfs_manifest$content[,-uuid_idx]
# remove uuid col (prep for rbind)
if (any(grepl("Uuid", names(dataflow_manifest)))) {
uuid_idx <- grep("Uuid", names(dataflow_manifest))
dataflow_manifest <- dataflow_manifest[, -uuid_idx]
}

# tack on missing datasets to end of dfs_status_manifest
updated_dfs_manifest <- rbind(dfs_manifest$content, missing_datasets)
# bind together new dataset rows and data flow manifest
dataflow_manifest <- rbind(dataflow_manifest, new_datasets)

# sort dataframe so that contributor is grouped
updated_dfs_manifest <- updated_dfs_manifest %>%
# rearrange data flow manifest
dataflow_manifest <- dataflow_manifest %>%
dplyr::group_by(contributor) %>%
dplyr::arrange(contributor)
}

return(data.frame(dataflow_manifest))

}
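
The new-dataset check reduces to a set difference on entityId. A minimal sketch with toy dataframes:

    dataflow <- data.frame(entityId = c("syn1", "syn2"))
    synapse <- data.frame(entityId = c("syn1", "syn2", "syn3"))

    # rows on Synapse that the data flow manifest doesn't track yet
    new_datasets <- synapse[!synapse$entityId %in% dataflow$entityId, , drop = FALSE]
    new_datasets$entityId
    #> [1] "syn3"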

#' Remove datasets that are no longer found in Synapse
#'
#' @param dataflow_manifest A dataFlow manifest
#' @param get_all_manifests_out The output of get_all_manifests, or any dataframe with the columns Component, contributor, entityId, dataset_name, and dataset
#' @param asset_view ID of the view listing all project data assets. For Synapse, this is the Synapse ID of the fileview listing all data assets for a given project (i.e. master_fileview in config.yml)
#' @param input_token Synapse PAT
#' @param base_url Base URL of schematic API (Defaults to AWS version)

update_manifest_remove_datasets <- function(dataflow_manifest,
get_all_manifests_out,
asset_view,
input_token,
base_url) {

# check for removed datasets (TRUE = dataset still exists on Synapse and is kept)
remove_idx <- dataflow_manifest$entityId %in% get_all_manifests_out$entityId

# if any of the rows are flagged for removal print a message and remove from manifest
if (any(!remove_idx)) {
n_remove <- sum(!remove_idx)
print(paste0(n_remove, " dataset(s) removed from Synapse"))

dataflow_manifest <- dataflow_manifest[remove_idx,]
}

return(dataflow_manifest)
}
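
Removal is the mirror image: keep only rows whose entityId still exists on Synapse. A minimal sketch:

    dataflow <- data.frame(entityId = c("syn1", "syn2", "syn3"))
    synapse <- data.frame(entityId = c("syn1", "syn3"))

    keep_idx <- dataflow$entityId %in% synapse$entityId
    sum(!keep_idx)  # datasets to drop: 1
    dataflow <- dataflow[keep_idx, , drop = FALSE]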

#' Update dataFlow manifest when dataset folder name changes
#'
#' @param dataflow_manifest A dataFlow manifest
#' @param get_all_manifests_out The output of get_all_manifests, or any dataframe with the columns Component, contributor, entityId, dataset_name, and dataset
#' @param asset_view ID of the view listing all project data assets. For Synapse, this is the Synapse ID of the fileview listing all data assets for a given project (i.e. master_fileview in config.yml)
#' @param update_column Name of the column to be updated
#' @param recalc_num_items TRUE/FALSE. When a dataset's entry is updated, should num_items be recalculated for that manifest?
#' @param input_token Synapse PAT
#' @param base_url Base URL of schematic API (Defaults to AWS version)

update_manifest_column <- function(dataflow_manifest,
get_all_manifests_out,
update_column,
asset_view,
recalc_num_items = FALSE,
input_token,
base_url) {

# arrange by entityId
dataflow_manifest <- dplyr::arrange(dataflow_manifest, entityId)
get_all_manifests_out <- dplyr::arrange(get_all_manifests_out, entityId)

# get logical index of which items have changed
idx <- dataflow_manifest[,update_column] != get_all_manifests_out[, update_column]

# if any items have changed update dataset type column
if (any(idx)) {
n_changed <- sum(idx)
print(paste0("Making ", n_changed, " update(s) to ", update_column, " column"))
dataflow_manifest[idx, update_column] <- get_all_manifests_out[idx, update_column]

# if recalc_num_items = TRUE recalculate number of items in the manifest for updated items
if (recalc_num_items) {
dataflow_manifest$num_items[idx] <- calculate_items_per_manifest(df = dataflow_manifest[idx,],
asset_view = asset_view,
input_token = input_token,
base_url = base_url)
}

}

# rearrange data flow manifest
dataflow_manifest <- dataflow_manifest %>%
dplyr::group_by(contributor) %>%
dplyr::arrange(contributor)

return(data.frame(dataflow_manifest))
}
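
Once both dataframes are sorted by entityId, the column update is a vectorized compare-and-copy. A minimal sketch for update_column = "dataset_name", with toy data:

    dataflow <- data.frame(entityId = c("syn1", "syn2"),
                           dataset_name = c("old name", "unchanged"))
    synapse <- data.frame(entityId = c("syn1", "syn2"),
                          dataset_name = c("new name", "unchanged"))

    # flag cells that differ, then copy the Synapse values over
    idx <- dataflow$dataset_name != synapse$dataset_name
    dataflow$dataset_name[idx] <- synapse$dataset_name[idx]
    dataflow$dataset_name
    #> [1] "new name"  "unchanged"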