in dbViewR/R/selectFromDB.R [35:225]
selectFromDB <- function( queryIn = jsonlite::toJSON(
list(
SELECT =list(COLUMN=c('pathogen','encountered_date','residence_puma','residence_census_tract')),
GROUP_BY =list(COLUMN=c('encountered_week','residence_puma','residence_census_tract')),
SUMMARIZE=list(COLUMN='pathogen', IN= c('h1n1pdm'))
)
), source = 'production',
credentials_path = '/home/rstudio/seattle_flu',
na.rm = FALSE
){
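# parse the query: accept either a JSON string (as built by jsonlite::toJSON) or an equivalent R list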
if(inherits(queryIn, "json")){
queryList <- jsonlite::fromJSON(queryIn)
} else if(is.list(queryIn)){
queryList <- queryIn
} else {
stop('queryIn must be a json string or a list')
}
# connect to database
if(source == 'simulated_data'){
rawData <- RCurl::getURL("https://raw.githubusercontent.com/seattleflu/simulated-data/master/simulated_subject_database.csv")
db <- read.table(text = rawData, header=TRUE, sep=",", stringsAsFactors = FALSE)
} else if(source == 'production'){
# Define standard Pg environment variables for our connection files.
#
# These are set here instead of passed in via the Dockerfile or
# docker-compose.rstudio.yaml file, because this typically runs as an
# rstudio-managed user with its own shell environment separate from the
# base Docker user.
Sys.setenv(PGSERVICEFILE = file.path(credentials_path, ".pg_service.conf"),
PGPASSFILE = file.path(credentials_path, ".pgpass"))
# Connect to database using service definition and credentials in files
# defined by the environment variables above.
rawData <- DBI::dbConnect(RPostgres::Postgres(), service = "seattleflu-production")
db <- DBI::dbGetQuery(rawData, "select distinct * from shipping.incidence_model_observation_v2;")
# db <- DBI::dbGetQuery(rawData, paste('select distinct * from shipping.incidence_model_observation_v1 encounter',
# 'left join shipping.presence_absence_result_v1 taq',
# 'on encounter.sample = taq.sample',
# ';'),sep=' ')
# this logic should be substantially rethought, as I'm mixing sql and dplyr in confusing ways, but it will have to do for now!
# get all samples and nest
db2 <- DBI::dbGetQuery(rawData, 'select distinct * from shipping.presence_absence_result_v1;')
names(db2)[names(db2)=='target'] <- 'pathogen'
# count pathogens found and tests performed
db2 <- db2 %>% dplyr::group_by(sample) %>%
dplyr::mutate(number_pathogens_found = sum(present), number_pathogens_tested = dplyr::n())
# add in "undetected" pathogen for samples that were tested but had no detections,
# carrying the test counts forward so these rows aren't later mistaken for untested samples
db3 <- db2 %>% dplyr::group_by(sample) %>% dplyr::filter(all(present == FALSE) & all(number_pathogens_tested>0)) %>%
dplyr::summarize(pathogen = 'undetected', number_pathogens_found = 0L, number_pathogens_tested = dplyr::n()) %>% dplyr::mutate(present=TRUE)
# join undetecteds with positives
db4 <- dplyr::bind_rows(db2 %>% dplyr::filter(present == TRUE), db3)
# join presence/absence results onto the encounter list
db <- db %>% dplyr::left_join(db4)
# put in "not_yet_tested" for samples with no test results
idx<-is.na(db$number_pathogens_tested)
db$number_pathogens_tested[idx] <- 0
db$pathogen[idx] <- 'not_yet_tested'
db$present[idx] <- TRUE
# fix missing self-test until repaired in db
db$site_type[which(db$site == 'self-test')] <- 'self-test'
DBI::dbDisconnect(rawData)
} else {
stop('unknown source database: ', source)
}
# run query
# this logic will probably move to sql queries in the database instead of dplyr after....
if(!identical(queryList$SELECT, "*")){
#(Needed hack until higher-level shape labels are in database)
if ( any( grepl('residence',queryList$SELECT$COLUMN) | grepl('work',queryList$SELECT$COLUMN) ) ){
if (! any( grepl('cra_name',queryList$SELECT$COLUMN) | grepl('neighbo',queryList$SELECT$COLUMN) ) ){
shp = dbViewR::masterSpatialDB(shape_level = 'census_tract', source = 'wa_geojson')
} else {
shp = dbViewR::masterSpatialDB(shape_level = 'census_tract', source = 'king_county_geojson')
}
# append higher-level spatial labels
# this feature will eventually be in the database, but it's needed for now to index to pumas, cra_name, etc
nestedVariables <- c('cra_name','neighborhood_district_name','puma','city')
for( COLUMN in nestedVariables){
COLNAME <- paste0('residence_',COLUMN)
if( ('residence_census_tract' %in% names(db)) & !(COLNAME %in% names(db)) & (COLNAME %in% names(shp))){
db[[COLNAME]] <- as.character(shp[[COLNAME]][match(db$residence_census_tract,shp$residence_census_tract)])
}
COLNAME <- paste0('work_',COLUMN)
if( ('work_census_tract' %in% names(db)) & !(COLNAME %in% names(db)) & (COLNAME %in% names(shp))){
db[[COLNAME]] <- as.character(shp[[COLNAME]][match(db$work_census_tract,shp$work_census_tract)])
}
}
}
## real flow starts here
db <- db %>% dplyr::select(dplyr::one_of(queryList$SELECT$COLUMN))
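# apply WHERE clauses: each element of queryList whose name contains 'WHERE' filters rows,
# either by set membership (IN) or by an inclusive range (BETWEEN)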
for(FILTER in which(grepl('WHERE',names(queryList)))){
if( any(grepl('IN',names(queryList[[FILTER]])))){
if(any(queryList[[FILTER]]$IN != 'all')){
filter_criteria <- lazyeval::interp(~y %in% x, .values=list(y = as.name(queryList[[FILTER]]$COLUMN), x = queryList[[FILTER]]$IN))
db <- db %>% dplyr::filter_(filter_criteria)
}
} else if( any(grepl('BETWEEN',names(queryList[[FILTER]])))){
filter_criteria_low <- lazyeval::interp(~y >= x, .values=list(y = as.name(queryList[[FILTER]]$COLUMN), x = queryList[[FILTER]]$BETWEEN[1]))
filter_criteria_high <- lazyeval::interp(~y <= x, .values=list(y = as.name(queryList[[FILTER]]$COLUMN), x = queryList[[FILTER]]$BETWEEN[2]))
db <- db %>% dplyr::filter_(filter_criteria_low) %>% dplyr::filter_(filter_criteria_high)
}
}
if('GROUP_BY' %in% names(queryList)){
db<- db %>% dplyr::group_by_(.dots=queryList$GROUP_BY$COLUMN)
}
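# SUMMARIZE returns, per group, the total row count (n) and the count of rows matching
# the requested values (positive); when IN is 'all', positive equals n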
if('SUMMARIZE' %in% names(queryList)){
if (any(queryList$SUMMARIZE$IN != 'all')){
summary_criteria <- lazyeval::interp(~sum(y %in% x), .values=list(y = as.name(queryList$SUMMARIZE$COLUMN), x = queryList$SUMMARIZE$IN))
} else {
summary_criteria <- lazyeval::interp(~n()) # must always output n and positive for downstream interpretation!
}
db <- db %>% dplyr::summarise_(n = lazyeval::interp(~n()), positive = summary_criteria)
}
# "pathogen" column is required for incidenceMapR model definitions
if(!('pathogen' %in% queryList$GROUP_BY$COLUMN)){
if( 'pathogen' %in% queryList$WHERE$COLUMN){
db$pathogen <- paste(queryList$WHERE$IN, collapse='-')
} else {
db$pathogen<-'all'
}
}
}
# type harmonization
for( COLUMN in names(db)[names(db) %in% c('residence_census_tract','residence_cra_name','residence_puma','residence_neighborhood_district_name','residence_city',
'work_census_tract','work_cra_name','work_puma','work_neighborhood_district_name','work_city')]){
db[[COLUMN]] <- as.character(db[[COLUMN]])
}
# drop rows with NA since incidenceMapR (INLA) will ignore them anyway
if(na.rm){
# convert Date columns to character first to avoid "Error in charToDate(x): character string is not in a standard unambiguous format"
dateIdx <- sapply(db, function(x) inherits(x, "Date"))
db[dateIdx] <- lapply(db[dateIdx], as.character)
db <- db %>% replace(. == 'NA', NA) %>% tidyr::drop_na()
# restore Date type
if (any(dateIdx)){
db[dateIdx] <- lapply(db[dateIdx], as.Date)
}
}
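# return the data alongside the query that produced it, so downstream consumers retain provenance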
summarizedData <- list(observedData = db, queryList = queryList)
return(summarizedData)
}
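
## Example usage (illustrative sketch; assumes the public simulated-data CSV is reachable
## and that the columns named below exist in that table):
# query <- list(
#   SELECT    = list(COLUMN = c('pathogen', 'encountered_week', 'residence_puma')),
#   WHERE     = list(COLUMN = 'pathogen', IN = c('h1n1pdm')),
#   GROUP_BY  = list(COLUMN = c('encountered_week', 'residence_puma')),
#   SUMMARIZE = list(COLUMN = 'pathogen', IN = c('h1n1pdm'))
# )
# out <- selectFromDB(queryIn = query, source = 'simulated_data', na.rm = TRUE)
# head(out$observedData)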