in R/R/sqlPackage.R [282:573]
# Runs an R function remotely through T-SQL sp_execute_external_script and
# returns its result to the local session.
#
# The definition of FUN (unless useRemoteFun is TRUE) and of every function in
# includeFun is deparsed into the script text. The ... arguments are serialized
# and embedded in the script as ';'-separated hex bytes. The generated script
# serializes list(result, error message, warnings, captured output) into the
# first column of OutputDataSet, which is decoded again on this side.
#
# Arguments:
#   connection    passed unchanged to execute() together with the query text.
#   FUN           a function object, or the name of a function as a string.
#   ...           arguments for FUN. Each one is evaluated in the caller's
#                 frame when possible; otherwise it is shipped unevaluated.
#   useRemoteFun  if TRUE the definition of FUN is not embedded in the script
#                 and only its name is used in the generated do.call().
#   asuser        a single non-empty string wraps the call in
#                 EXECUTE AS USER / REVERT; any other value is ignored.
#   includeFun    named list of functions whose definitions are embedded in
#                 the script ahead of FUN.
#   languageName  value placed in the @language parameter.
#
# Returns the value FUN produced remotely. Captured remote output is printed
# with cat(), remote warnings are re-signalled with warning(), and a remote
# error or a failed execute() call is raised with stop().
#
sqlRemoteExecuteFun <- function(connection, FUN, ..., useRemoteFun = FALSE, asuser = NULL, includeFun = list(), languageName)
{
  g_scriptFile <- local(g_scriptFile, install.env)

  # asuser is honoured only when it is a single, non-missing, non-empty string
  #
  validUser <- is.character(asuser) && length(asuser) == 1 &&
    !is.na(asuser) && nzchar(asuser)
  if (!validUser)
  {
    asuser <- NULL
  }

  # input processing and checking
  # substitute() and match.fun() must run in this frame: both depend on the
  # call stack of sqlRemoteExecuteFun itself
  #
  if (is.function(FUN))
  {
    funName <- deparse(substitute(FUN))
  }
  else
  {
    if (!is.character(FUN))
    {
      stop("you must provide either a function object or a function name")
    }
    funName <- FUN
    FUN <- match.fun(FUN)
  }

  # captures the R code and formats it to be embedded in t-sql sp_execute_external_script
  #
  deparseforSql <- function(funName, fun)
  {
    funBody <- deparse(fun)

    # add on the function definition
    #
    funBody[1] <- paste(funName, "<-", funBody[1], sep = " ")

    # get rid of tabs and escape single quotes for the T-SQL string literal
    #
    funBody <- gsub("\t", " ", funBody, fixed = TRUE)
    funBody <- gsub("'", "''", funBody, fixed = TRUE)

    # normalise the indentation of the body to 2 spaces so the R code in the
    # TSQL text does not depend on how the function was indented originally
    #
    if (length(funBody) > 1)
    {
      lastLine <- length(funBody)

      # a closing bracket on its own line is aligned with the declaration
      # and excluded from the indentation measurement
      #
      closesWithBrace <- grepl("^ *} *$", funBody[lastLine])
      if (closesWithBrace)
      {
        funBody[lastLine] <- "}"
      }
      lastBodyLine <- if (closesWithBrace) lastLine - 1L else lastLine

      # body lines: everything after the declaration line, ignoring empty
      # lines so they don't affect space counting
      #
      bodyIdx <- seq_len(lastBodyLine)[-1L]
      bodyIdx <- bodyIdx[funBody[bodyIdx] != ""]

      if (length(bodyIdx) > 0)
      {
        # number of spaces at the beginning of each body line
        #
        leadingSpaces <- attr(regexpr("^ *", funBody[bodyIdx]), "match.length")
        extraSpaces <- min(leadingSpaces) - 2L

        # remove extra spaces
        #
        if (extraSpaces > 0)
        {
          funBody[bodyIdx] <- sub(sprintf("^ {%d}", extraSpaces), "", funBody[bodyIdx])
        }
      }
    }

    paste(funBody, collapse = "\n")
  }

  # Attempts to resolve the ellipsis arguments and returns them in a (named)
  # list. Arguments that cannot be evaluated locally are kept as unevaluated
  # expressions so they are evaluated by the remote do.call() instead; this
  # avoids needing every referenced symbol to exist locally.
  #
  tryEvalArgList <- function(...)
  {
    # substituted values are names, symbols or language objects,
    # which avoids the evaluation
    #
    argListSubstitute <- as.list(substitute(list(...)))[-1L]

    # two frames up from here is the caller of sqlRemoteExecuteFun,
    # which is where the arguments were written
    #
    envir <- parent.frame(n = 2)
    lapply(argListSubstitute, function(x)
    {
      res <- try(eval(x, envir = envir), silent = TRUE)
      if (!inherits(res, "try-error")) res else x
    })
  }

  argList <- tryEvalArgList(...)
  binArgListCollapse <- paste0(serialize(argList, NULL), collapse = ";")

  script <- ""
  if (length(includeFun) > 0)
  {
    includeFunNames <- names(includeFun)

    # every function needs a usable name: names() is NULL for a fully unnamed
    # list and contains "" for the unnamed elements of a partially named one
    #
    if (length(includeFunNames) != length(includeFun) ||
        anyNA(includeFunNames) || !all(nzchar(includeFunNames)))
    {
      stop("invalid parameter 'includeFun' requires matching function names to be specified in list", call. = FALSE)
    }

    includeText <- vapply(seq_along(includeFun), function(i)
    {
      deparseforSql(includeFunNames[[i]], includeFun[[i]])
    }, character(1))
    script <- paste0(script, paste0("\n", includeText, collapse = ""))
  }

  if (!useRemoteFun)
  {
    script <- paste0(script, "\n", deparseforSql(funName, FUN))
  }

  # driver code: decodes the arguments, calls the function and serializes
  # result, error, warnings and captured output into OutputDataSet
  #
  script <- paste0(script,
    sprintf("
result <- NULL
funerror <- NULL
funwarnings <- NULL
output <- capture.output(try(
withCallingHandlers({
binArgList <- unlist(lapply(lapply(strsplit(\"%s\",\";\")[[1]], as.hexmode), as.raw))
argList <- as.list(unserialize(binArgList))
result <- do.call(%s, argList)
},
error = function(err)
{
funerror <<- err$message
},
warning = function(warn)
{
funwarnings <<- c(funwarnings, warn$message)
}
), silent = TRUE
))
serializedResult <- as.character(serialize(list(result, funerror, funwarnings, output), NULL))
OutputDataSet <- data.frame(serializedResult, stringsAsFactors = FALSE)[1]
", binArgListCollapse, funName)
  )

  # NOTE(review): asuser and languageName are spliced into the T-SQL text
  # without escaping - confirm callers only pass trusted values.
  #
  query <- paste0("\nEXEC sp_execute_external_script",
                  "\n@language = N'", languageName, "'",
                  "\n,@script = N'", script, "';")
  if (!is.null(asuser))
  {
    query <- paste0("EXECUTE AS USER = '", asuser, "';", query, "\nREVERT;")
  }

  outcome <- tryCatch(
  {
    if (!is.null(g_scriptFile))
    {
      # sys.call() is the call to sqlRemoteExecuteFun; the query is logged
      # only when one of these names appears among its elements
      #
      callingFun <- as.character(as.list(sys.call()))
      if ("findPackages" %in% callingFun ||
          "utils::installed.packages" %in% callingFun)
      {
        cat(sprintf("-- Called from %s\n", callingFun[[1]]), query, "\n",
            file = g_scriptFile, append = TRUE, sep = "")
      }
    }

    sqlResult <- execute(connection, query)
    if (is.data.frame(sqlResult))
    {
      list(success = TRUE, payload = sqlResult[[1]], error = "")
    }
    else
    {
      # error happened, vector of string contains error messages;
      # collapse them so each message ends up on its own line
      #
      list(success = FALSE, payload = NULL,
           error = paste(sqlResult, collapse = "\n"))
    }
  },
  error = function(err)
  {
    list(success = FALSE, payload = NULL, error = conditionMessage(err))
  })

  if (!outcome$success)
  {
    stop(outcome$error, call. = FALSE)
  }

  # the first column holds one hex byte per row: decode and unserialize it
  #
  rawResult <- as.raw(as.hexmode(as.character(outcome$payload)))
  lst <- unserialize(rawResult)
  result <- lst[[1]]
  funerror <- lst[[2]]
  funwarnings <- lst[[3]]
  output <- lst[[4]]

  # replay what happened remotely: output first, then warnings, then the error
  #
  for (line in output)
  {
    cat(paste0(line, "\n"))
  }
  for (w in funwarnings)
  {
    warning(w, call. = FALSE)
  }
  if (!is.null(funerror))
  {
    stop(funerror, call. = FALSE)
  }
  result
}