in src/backend/utils/misc/fstream/gfile.c [1013:1312]
/*
 * gfile_open
 *
 * Open 'fpath' for reading or writing and set up 'fd' (which is zeroed
 * first) with the read/write/close callbacks matching the kind of source:
 * a Windows named pipe, a transform subprocess (GPFXDIST builds only), or a
 * regular file / FIFO, optionally compressed according to the file suffix.
 *
 * Parameters:
 *   fd              - descriptor to initialize; any previous content is
 *                     overwritten.
 *   fpath           - path of the file or pipe to open.
 *   flags           - GFILE_OPEN_FOR_READ, or one of the write modes;
 *                     GFILE_OPEN_FOR_WRITE_SYNC requests O_SYNC explicitly.
 *   response_code   - set to an HTTP-like status (403/404/415/500/501) on
 *                     the failures reported directly by this function.
 *   response_string - set to a human readable reason on those failures.  For
 *                     open(2) failures it points at a static buffer that is
 *                     overwritten by the next failing call.
 *   transform       - stored in fd->transform only in non-Windows GPFXDIST
 *                     builds; otherwise not used by this function.
 *
 * Returns 0 on success for plain files and 1 on failure.  For subprocess
 * and compressed sources the result of the delegated open routine is
 * returned as-is (presumably following the same convention - TODO confirm).
 */
int gfile_open(gfile_t* fd, const char* fpath, int flags, int* response_code, const char** response_string, struct gpfxdist_t* transform)
{
	/* file suffix (starting at the last '.'), used to pick a decompressor */
	const char* s = strrchr(fpath, '.');
#ifdef WIN32
	bool_t is_win_pipe = FALSE;
#else
	struct stat sta;
	memset(&sta, 0, sizeof(sta));
#endif
	memset(fd, 0, sizeof(*fd));
	/*
	 * check for subprocess and/or named pipe
	 */
#ifdef WIN32
	/* is this a windows named pipe, of the form \\<host>\... */
	if (strlen(fpath) > 2)
	{
		if (fpath[0] == '\\' && fpath[1] == '\\')
		{
			is_win_pipe = TRUE;
			gfile_printf_then_putc_newline("looks like a windows pipe");
		}
	}
	if (is_win_pipe)
	{
		/* Try and open it as a windows named pipe */
		HANDLE pipe = CreateFile(fpath,
								 (flags != GFILE_OPEN_FOR_READ ? GENERIC_WRITE : GENERIC_READ),
								 0, /* no sharing */
								 NULL, /* default security */
								 OPEN_EXISTING, /* file must exist */
								 0, /* default attributes */
								 NULL /* no template */);
		gfile_printf_then_putc_newline("trying to connect to pipe");
		if (pipe != INVALID_HANDLE_VALUE)
		{
			fd->is_win_pipe = TRUE;
			fd->fd.pipefd = pipe;
			gfile_printf_then_putc_newline("connected to pipe");
		}
		else
		{
			/*
			 * Capture the failure code once, before FormatMessage() or the
			 * logging call get a chance to overwrite the last-error value.
			 */
			DWORD err = GetLastError();
			/* stays NULL if FormatMessage() cannot produce a message */
			LPSTR msg = NULL;

			FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER |
						  FORMAT_MESSAGE_FROM_SYSTEM,
						  NULL, err,
						  MAKELANGID(LANG_ENGLISH, SUBLANG_DEFAULT),
						  (LPSTR) & msg, 0, NULL);
			gfile_printf_then_putc_newline("could not create pipe: %s", msg ? msg : "unknown error");
			/* the buffer was allocated on our behalf; release it */
			if (msg)
			{
				LocalFree(msg);
			}
			if (err != ERROR_PIPE_BUSY)
			{
				*response_code = 500;
				*response_string = "could not connect to pipe";
			}
			else
			{
				*response_code = 501;
				*response_string = "pipe is busy, close the pipe and try again";
			}
			return 1;
		}
	}
#else /* not win32 */
#ifdef GPFXDIST
	fd->transform = transform;
	if (fd->transform)
	{
		/* caller wants a subprocess. nothing to do here just yet. */
		gfile_printf_then_putc_newline("looks like a subprocess");
	}
	else
#endif
	{
		/* readers must see an existing, non-directory file */
		if (!fd->is_win_pipe && (flags == GFILE_OPEN_FOR_READ))
		{
			if (stat(fpath, &sta))
			{
				if (errno == EOVERFLOW)
				{
					/*
					 * ENGINF-176
					 *
					 * Some platforms don't support stat'ing of "large files"
					 * accurately (files over 2GB) - SPARC for example. In these
					 * cases the storage size of st_size is too small and the
					 * file size will overflow. Therefore, we look for cases where
					 * overflow had occurred, and resume operation. At least we
					 * know that the file does exist and that's the main goal of
					 * stat'ing here anyway. we set the size to 0, similarly to
					 * the winpipe path, so that negative sizes won't be used.
					 *
					 * TODO: there may be side effects to setting the size to 0,
					 * need to double check.
					 *
					 * TODO: this hack could possibly now be removed after enabling
					 * largefiles via the build process with compiler flags.
					 */
					sta.st_size = 0;
				}
				else
				{
					gfile_printf_then_putc_newline("gfile stat %s failure: %s", fpath, strerror(errno));
					*response_code = 404;
					*response_string = "file not found";
					return 1;
				}
			}
			if (S_ISDIR(sta.st_mode))
			{
				gfile_printf_then_putc_newline("gfile %s is a directory", fpath);
				*response_code = 403;
				*response_string = "Reading a directory is forbidden.";
				return 1;
			}
			fd->compressed_size = sta.st_size;
		}
	}
#endif /* ifdef win32 */
	/* plain file or FIFO: open it ourselves */
	if (NULL == fd->transform && !fd->is_win_pipe)
	{
		int syncFlag = 0;
		int openFlags;
		mode_t openMode;
		int open_errno;
#ifndef WIN32
		/*
		 * MPP-13817 (support opening files without O_SYNC)
		 */
		if (flags & GFILE_OPEN_FOR_WRITE_SYNC)
		{
			/*
			 * caller explicitly requested O_SYNC
			 */
			syncFlag = O_SYNC;
		}
		else if ((stat(fpath, &sta) == 0) && S_ISFIFO(sta.st_mode))
		{
			/*
			 * use O_SYNC since we're writing to another process via a pipe
			 */
			syncFlag = O_SYNC;
		}
#endif
		if (flags != GFILE_OPEN_FOR_READ)
		{
			/* writers always append and create the file when missing */
			openFlags = O_WRONLY | O_CREAT | O_BINARY | O_APPEND | syncFlag;
			openMode = S_IRUSR | S_IWUSR;
		}
		else
		{
			openFlags = O_RDONLY | O_BINARY;
			openMode = 0;
		}
		/* retry the open for as long as it is interrupted by a signal */
		do
		{
#ifdef FRONTEND
			fd->fd.filefd = open(fpath, openFlags, openMode);
#else
			fd->fd.filefd = OpenTransientFile((char *) fpath, openFlags);
#endif
		}
		while (fd->fd.filefd < 0 && errno == EINTR);
		/*
		 * Remember why the open failed right away: the logging call below
		 * may change errno before the response string gets formatted.
		 */
		open_errno = errno;
		if (-1 == fd->fd.filefd)
		{
			/* static because *response_string must outlive this call */
			static char buf[256];
			gfile_printf_then_putc_newline("gfile open (for %s) failed %s: %s",
										   ((flags == GFILE_OPEN_FOR_READ) ? "read" :
											((flags == GFILE_OPEN_FOR_WRITE_SYNC) ? "write (sync)" : "write")),
										   fpath, strerror(open_errno));
			*response_code = 404;
			snprintf(buf, sizeof buf, "file open failure %s: %s", fpath,
					 strerror(open_errno));
			*response_string = buf;
			return 1;
		}
#if !defined(WIN32)
		/* Restrict only one reader session for each PIPE */
		if (S_ISFIFO(sta.st_mode) && (flags == GFILE_OPEN_FOR_READ))
		{
			if (flock (fd->fd.filefd, LOCK_EX | LOCK_NB) != 0)
			{
				/* somebody else holds the lock: give the descriptor back */
				fd->held_pipe_lock = FALSE;
				gfile_printf_then_putc_newline("gfile %s is a pipe", fpath);
				*response_code = 404;
				*response_string = "Multiple reader to a pipe is forbidden.";
				close_filefd(fd->fd.filefd);
				fd->fd.filefd = -1;
				return 1;
			}
			else
			{
				fd->held_pipe_lock = TRUE;
			}
		}
#endif
	}
	/*
	 * prepare to use the appropriate i/o routines
	 */
#ifdef GPFXDIST
	if (fd->transform)
	{
		fd->read = read_subprocess;
		fd->write = write_subprocess;
		fd->close = close_subprocess;
	}
	else
#endif
	if (fd->is_win_pipe)
	{
		fd->read = readwinpipe;
		fd->write = writewinpipe;
		fd->close = closewinpipe;
	}
	else
	{
		fd->read = read_and_retry;
		fd->write = write_and_retry;
		fd->close = nothing_close;
	}
	/*
	 * delegate remaining setup work to an appropriate open routine
	 * or return an error if we can't handle the type
	 */
#ifdef GPFXDIST
	if (fd->transform)
	{
		return subprocess_open(fd, fpath, (flags != GFILE_OPEN_FOR_READ), response_code, response_string);
	}
	else
#endif
	if (s && strcasecmp(s,".gz")==0)
	{
#ifndef HAVE_LIBZ
		gfile_printf_then_putc_newline(".gz not supported");
#else
		/*
		 * flag used by function gfile close
		 */
		fd->compression = GZ_COMPRESSION;
		if (flags != GFILE_OPEN_FOR_READ)
		{
			fd->is_write = TRUE;
		}
		return gz_file_open(fd);
#endif
	}
	else if (s && strcasecmp(s,".bz2")==0)
	{
#ifndef HAVE_LIBBZ2
		gfile_printf_then_putc_newline(".bz2 not supported");
#else
		fd->compression = BZ_COMPRESSION;
		/* only logged here; the open is still delegated to bz_file_open() */
		if (flags != GFILE_OPEN_FOR_READ)
		{
			gfile_printf_then_putc_newline(".bz2 not yet supported for writable tables");
		}
		return bz_file_open(fd);
#endif
	}
	else if (s && strcasecmp(s, ".zst") == 0)
	{
#ifndef USE_ZSTD
		gfile_printf_then_putc_newline(".zst not supported");
#else
		fd->compression = ZSTD_COMPRESSION;
		if (flags != GFILE_OPEN_FOR_READ)
		{
			fd->is_write = TRUE;
		}
		return zstd_file_open(fd);
#endif
	}
	else if (s && strcasecmp(s,".z") == 0)
	{
		gfile_printf_then_putc_newline("gfile compression .z file is not supported");
	}
	else if (s && strcasecmp(s,".zip") == 0)
	{
		gfile_printf_then_putc_newline("gfile compression zip is not supported");
	}
	else
	{
		/* no recognized compression suffix: plain i/o is all that is needed */
		return 0;
	}
	/*
	 * Only reached for a recognized but unsupported suffix.
	 * NOTE(review): the descriptor opened above is still open here; fd->close
	 * has been set, so presumably the caller's close routine releases it -
	 * confirm against callers.
	 */
	*response_code = 415;
	*response_string = "Unsupported File Type";
	return 1;
}