sqlRemoteExecuteFun <- function()

in R/R/sqlPackage.R [282:573]


# Runs an R function on the server through sp_execute_external_script and
# returns the value it produced to the local session.
#
# The function definition (unless useRemoteFun is TRUE), any helper functions
# in includeFun and the serialized arguments are embedded in a generated R
# script. The script serializes a list of (result, error message, warnings,
# captured output), which is decoded locally: captured output is re-printed
# with cat(), warnings are re-raised with warning() and an error is re-raised
# with stop().
#
# Arguments:
#   connection    passed unchanged to execute()
#   FUN           a function object or the name of a function
#   ...           arguments for FUN; each is evaluated in the caller's frame
#                 when possible, otherwise shipped as an unevaluated expression
#   useRemoteFun  when TRUE the definition of FUN is not embedded in the
#                 script, so the name must already resolve on the server
#   asuser        single non-empty string; when given, the call is wrapped in
#                 EXECUTE AS USER ... REVERT. Anything else is ignored.
#   includeFun    named list of functions whose definitions are prepended to
#                 the script
#   languageName  value used for the @language parameter
#
# Returns the value computed by FUN; stops with the remote error message, or
# with the message(s) reported by execute(), on failure.
#
sqlRemoteExecuteFun <- function(connection, FUN, ..., useRemoteFun = FALSE, asuser = NULL, includeFun = list(), languageName)
{
    g_scriptFile <- local(g_scriptFile, install.env)

    # doubles single quotes so a value can be embedded in a T-SQL '...' literal
    #
    escapeSqlLiteral <- function(s)
    {
        gsub("'", "''", s, fixed = TRUE)
    }

    # anything other than a single, non-missing, non-empty string means
    # "do not impersonate"
    #
    if (!(is.character(asuser) && length(asuser) == 1 && !is.na(asuser) && nzchar(asuser)))
    {
        asuser <- NULL
    }

    # input processing and checking
    #
    if (is.function(FUN))
    {
        funName <- deparse(substitute(FUN))
    }
    else
    {
        if (!is.character(FUN))
        {
            stop("you must provide either a function object or a function name")
        }

        funName <- FUN
        FUN <- match.fun(FUN)
    }

    # captures the R code and formats it to be embedded in t-sql sp_execute_external_script
    #
    deparseforSql <- function(funName, fun)
    {
        funBody <- deparse(fun)

        # add on the function definition
        #
        funBody[1] <- paste(funName, "<-", funBody[1], sep = " ")

        # escape single quotes and get rid of tabs (gsub is vectorised)
        #
        funBody <- gsub("\t", "  ", funBody, fixed = TRUE)
        funBody <- escapeSqlLiteral(funBody)

        # handle the case where the function's code was indented more than
        # 2 spaces and get rid of the extra spaces, otherwise the indentation
        # of the R code in TSQL depends on the indentation of the original
        #
        lineCount <- length(funBody)
        if (lineCount > 1)
        {
            # candidate lines: everything after the declaration line
            #
            bodyLines <- seq_len(lineCount)[-1]

            # a last line holding only a closing bracket is aligned with the
            # declaration and excluded from the indentation measurement
            #
            if (grepl("^ *} *$", funBody[lineCount]))
            {
                funBody[lineCount] <- "}"
                bodyLines <- bodyLines[bodyLines != lineCount]
            }

            # empty lines must not affect space counting
            #
            measured <- bodyLines[funBody[bodyLines] != ""]

            if (length(measured) > 0)
            {
                leadingSpaces <- attr(regexpr("^ *", funBody[measured]), "match.length")
                extra_spaces <- min(leadingSpaces) - 2

                if (extra_spaces > 0)
                {
                    funBody[bodyLines] <- sub(paste0("^ {", extra_spaces, "}"),
                                              "", funBody[bodyLines])
                }
            }
        }

        paste(funBody, collapse = "\n")
    }

    # Attempts to resolve the ellipsis arguments and returns them in a (named)
    # list. Arguments that cannot be evaluated locally are kept as unevaluated
    # expressions, to be resolved when the script runs remotely. This avoids
    # the need to have a particular package loaded locally just to resolve
    # symbols/data that belong to it.
    #
    tryEvalArgList <- function(...)
    {
        # substituted values: names, constants or language objects,
        # nothing is evaluated here
        #
        argListSubstitute <- as.list(substitute(list(...)))[-1L]

        # two frames up is the caller of sqlRemoteExecuteFun, which is where
        # the argument expressions were written
        #
        envir <- parent.frame(n = 2)
        lapply(argListSubstitute, function(x)
        {
            res <- try(eval(x, envir = envir), silent = TRUE)
            if (!inherits(res, "try-error")) res else x
        })
    }

    argList <- tryEvalArgList(...)
    binArgList <- serialize(argList, NULL)
    binArgListCollapse <- paste0(binArgList, collapse = ";")

    script <- ""

    if (length(includeFun) > 0)
    {
        includeFunNames <- names(includeFun)

        # every entry needs a usable name, an unnamed or partially named list
        # would produce an invalid assignment in the script
        #
        if (length(includeFunNames) != length(includeFun) ||
            anyNA(includeFunNames) || !all(nzchar(includeFunNames)))
        {
            stop("invalid parameter 'includeFun' requires matching function names to be specified in list", call. = FALSE)
        }
        for (i in seq_along(includeFun))
        {
            script <- paste0(script, "\n", deparseforSql(includeFunNames[[i]], includeFun[[i]]))
        }
    }

    if (!useRemoteFun)
    {
        script <- paste0(script, "\n", deparseforSql(funName, FUN))
    }

    # driver code: decode the arguments, call the function while recording
    # error, warnings and printed output, and return everything serialized
    # as a one-column data frame of hex strings
    #
    script <- paste0(script,
                     sprintf("
                                 result <- NULL
                                 funerror <- NULL
                                 funwarnings <- NULL
                                 output <- capture.output(try(
                                 withCallingHandlers({
                                 binArgList <- unlist(lapply(lapply(strsplit(\"%s\",\";\")[[1]], as.hexmode), as.raw))
                                 argList <- as.list(unserialize(binArgList))
                                 result <- do.call(%s, argList)
                             },
                             error = function(err)
                             {
                                funerror <<- err$message
                             },
                             warning = function(warn)
                             {
                                funwarnings <<- c(funwarnings, warn$message)
                             }
                             ), silent = TRUE
                             ))

                             serializedResult <- as.character(serialize(list(result, funerror, funwarnings, output), NULL))
                             OutputDataSet <- data.frame(serializedResult, stringsAsFactors = FALSE)[1]
                             ", binArgListCollapse, funName)
                            )

    query <- paste0("\nEXEC sp_execute_external_script"
                    ,"\n@language = N'", escapeSqlLiteral(languageName), "'"
                    ,"\n,@script = N'", script, "';"
    )

    if (!is.null(asuser))
    {
        query <- paste0("EXECUTE AS USER = '", escapeSqlLiteral(asuser), "';",
                        query,
                        "\nREVERT;")
    }

    # run the query; the outcome is either the serialized payload or an
    # error message, failures raised while running are captured as well
    #
    outcome <- tryCatch(
    {
        if (!is.null(g_scriptFile))
        {
            callingFun <- as.character(as.list(sys.call()))

            if ("findPackages" %in% callingFun ||
                "utils::installed.packages" %in% callingFun)
            {
                cat(sprintf("-- Called from %s\n", callingFun[[1]]), file = g_scriptFile, append = TRUE)
                cat(query, file = g_scriptFile, append = TRUE)
                cat("\n", file = g_scriptFile, append = TRUE)
            }
        }

        sqlResult <- execute(connection, query)

        if (is.data.frame(sqlResult))
        {
            list(success = TRUE, payload = sqlResult[[1]])
        }
        else
        {
            # error happened, vector of string contains error messages,
            # join them so that each message ends up on its own line
            #
            list(success = FALSE, error = paste(sqlResult, collapse = "\n"))
        }
    },
    error = function(err)
    {
        list(success = FALSE, error = err$message)
    })

    if (!outcome$success)
    {
        stop(outcome$error, call. = FALSE)
    }

    # hex strings -> raw bytes -> list(result, error, warnings, output)
    #
    lst <- unserialize(as.raw(as.hexmode(as.character(outcome$payload))))

    result <- lst[[1]]
    funerror <- lst[[2]]
    funwarnings <- lst[[3]]
    output <- lst[[4]]

    # replay what the remote call printed, one line at a time
    #
    if (length(output) > 0)
    {
        cat(paste0(output, "\n"), sep = "")
    }

    if (!is.null(funwarnings))
    {
        for (w in funwarnings)
        {
            warning(w, call. = FALSE)
        }
    }

    if (!is.null(funerror))
    {
        stop(funerror, call. = FALSE)
    }

    return(result)
}