int gfile_open()

in src/backend/utils/misc/fstream/gfile.c [1013:1312]


int gfile_open(gfile_t* fd, const char* fpath, int flags, int* response_code, const char** response_string, struct gpfxdist_t* transform)
{
	const char* s = strrchr(fpath, '.');
#ifdef WIN32
	bool_t is_win_pipe = FALSE;
#else
	struct 		stat sta;
	memset(&sta, 0, sizeof(sta));
#endif

	memset(fd, 0, sizeof(*fd));

	/*
	 * check for subprocess and/or named pipe
	 */
#ifdef WIN32
	/* is this a windows named pipe, of the form \\<host>\... */
	if (strlen(fpath) > 2)
	{
		if (fpath[0] == '\\' && fpath[1] == '\\')
		{
			is_win_pipe = TRUE;
			gfile_printf_then_putc_newline("looks like a windows pipe");
		}
	}

	if (is_win_pipe)
	{
		/* Try and open it as a windows named pipe */
		HANDLE pipe = CreateFile(fpath, 
								 (flags != GFILE_OPEN_FOR_READ ? GENERIC_WRITE : GENERIC_READ),
								 0, /* no sharing */
								 NULL, /* default security */
								 OPEN_EXISTING, /* file must exist */
								 0, /* default attributes */
								 NULL /* no template */);
		gfile_printf_then_putc_newline("trying to connect to pipe");
		if (pipe != INVALID_HANDLE_VALUE)
		{
			fd->is_win_pipe = TRUE;
			fd->fd.pipefd = pipe;
			gfile_printf_then_putc_newline("connected to pipe");
		}
		else
		{
			LPSTR msg;

			FormatMessage(FORMAT_MESSAGE_ALLOCATE_BUFFER |
						  FORMAT_MESSAGE_FROM_SYSTEM,
				   		  NULL, GetLastError(),
						  MAKELANGID(LANG_ENGLISH, SUBLANG_DEFAULT),
						  (LPSTR) & msg, 0, NULL);
			gfile_printf_then_putc_newline("could not create pipe: %s", msg);

			if (GetLastError() != ERROR_PIPE_BUSY)
			{
				*response_code = 500;
				*response_string = "could not connect to pipe";
			}
			else
			{
				*response_code = 501;
				*response_string = "pipe is busy, close the pipe and try again";
			}
			return 1;
		}
	}
#else	/* not win32 */
#ifdef GPFXDIST
	fd->transform = transform;
	if (fd->transform)
	{
		/* caller wants a subprocess. nothing to do here just yet. */
		gfile_printf_then_putc_newline("looks like a subprocess");
	}
	else
#endif
	{
		if (!fd->is_win_pipe && (flags == GFILE_OPEN_FOR_READ))
		{
			if (stat(fpath, &sta))
			{
				if(errno == EOVERFLOW)
				{
					/*
					* ENGINF-176
					* 
					* Some platforms don't support stat'ing of "large files"
					* accurately (files over 2GB) - SPARC for example. In these
					* cases the storage size of st_size is too small and the
					* file size will overflow. Therefore, we look for cases where
					* overflow had occurred, and resume operation. At least we
					* know that the file does exist and that's the main goal of
					* stat'ing here anyway. we set the size to 0, similarly to
					* the winpipe path, so that negative sizes won't be used.
					* 
					* TODO: there may be side effects to setting the size to 0,
					* need to double check.
					* 
					* TODO: this hack could possibly now be removed after enabling
					* largefiles via the build process with compiler flags.
					*/
					sta.st_size = 0;
				}
				else
				{
					gfile_printf_then_putc_newline("gfile stat %s failure: %s", fpath, strerror(errno));
					*response_code = 404;
					*response_string = "file not found";
					return 1;
				}
			}
			if (S_ISDIR(sta.st_mode))
			{
				gfile_printf_then_putc_newline("gfile %s is a directory", fpath);
				*response_code = 403;
				*response_string = "Reading a directory is forbidden.";
				return 1;
			}
			fd->compressed_size = sta.st_size;
		}
	}
#endif	/* ifdef win32 */

	if (NULL == fd->transform && !fd->is_win_pipe)
	{
		int syncFlag = 0;
		int openFlags;
		mode_t openMode;

#ifndef WIN32
		/*
		 * MPP-13817 (support opening files without O_SYNC)
		 */
		if (flags & GFILE_OPEN_FOR_WRITE_SYNC)
		{
			/*
			 * caller explicitly requested O_SYNC
			 */
			syncFlag = O_SYNC;
		}
		else if ((stat(fpath, &sta) == 0) && S_ISFIFO(sta.st_mode))
		{
			/*
			 * use O_SYNC since we're writing to another process via a pipe
			 */
			syncFlag = O_SYNC;
		}
#endif
		if (flags != GFILE_OPEN_FOR_READ)
		{
			openFlags = O_WRONLY | O_CREAT | O_BINARY | O_APPEND | syncFlag;
			openMode = S_IRUSR | S_IWUSR;
		}
		else
		{
			openFlags = O_RDONLY | O_BINARY;
			openMode = 0;
		}

		do
		{
#ifdef FRONTEND
			fd->fd.filefd = open(fpath, openFlags, openMode);
#else
			fd->fd.filefd = OpenTransientFile((char *) fpath, openFlags);
#endif
		}
		while (fd->fd.filefd < 0 && errno == EINTR);

		if (-1 == fd->fd.filefd)
		{
			static char buf[256];
			gfile_printf_then_putc_newline("gfile open (for %s) failed %s: %s",
										((flags == GFILE_OPEN_FOR_READ) ? "read" :
											((flags == GFILE_OPEN_FOR_WRITE_SYNC) ? "write (sync)" : "write")),
										fpath, strerror(errno));
			*response_code = 404;
			snprintf(buf, sizeof buf, "file open failure %s: %s", fpath,
					strerror(errno));
			*response_string = buf;
			return 1;
		}

#if !defined(WIN32)
		/* Restrict only one reader session for each PIPE */
		if (S_ISFIFO(sta.st_mode) && (flags == GFILE_OPEN_FOR_READ))
		{
			if (flock (fd->fd.filefd, LOCK_EX | LOCK_NB) != 0)
			{
				fd->held_pipe_lock = FALSE;
				gfile_printf_then_putc_newline("gfile %s is a pipe", fpath);
				*response_code = 404;
				*response_string = "Multiple reader to a pipe is forbidden.";
				close_filefd(fd->fd.filefd);
				fd->fd.filefd = -1;
				return 1;
			}
			else
			{
				fd->held_pipe_lock = TRUE;
			}
		}
#endif
	}

	/*
	 * prepare to use the appropriate i/o routines 
	 */

#ifdef GPFXDIST
	if (fd->transform)
	{
		fd->read  = read_subprocess;
		fd->write = write_subprocess;
		fd->close = close_subprocess;
	}
	else 
#endif
	if (fd->is_win_pipe)
	{
		fd->read = readwinpipe;
		fd->write = writewinpipe;
		fd->close = closewinpipe;
	}
	else
	{
		fd->read = read_and_retry;
		fd->write = write_and_retry;
		fd->close = nothing_close;
	}

	/*
	 * delegate remaining setup work to an appropriate open routine
	 * or return an error if we can't handle the type
	 */

#ifdef GPFXDIST
	if (fd->transform)
	{
		return subprocess_open(fd, fpath, (flags != GFILE_OPEN_FOR_READ), response_code, response_string);
	}
	else 
#endif
	if (s && strcasecmp(s,".gz")==0)
	{
#ifndef HAVE_LIBZ
		gfile_printf_then_putc_newline(".gz not supported");
#else
		/*
		 * flag used by function gfile close
		 */
		fd->compression = GZ_COMPRESSION;
		
		if (flags != GFILE_OPEN_FOR_READ)
		{
			fd->is_write = TRUE;
		}

		return gz_file_open(fd);
#endif
	}
	else if (s && strcasecmp(s,".bz2")==0)
	{
#ifndef HAVE_LIBBZ2
		gfile_printf_then_putc_newline(".bz2 not supported");
#else
		fd->compression = BZ_COMPRESSION;
		if (flags != GFILE_OPEN_FOR_READ)
			gfile_printf_then_putc_newline(".bz2 not yet supported for writable tables");

		return bz_file_open(fd);
#endif
	}
	else if (s && strcasecmp(s, ".zst") == 0)
	{
#ifndef USE_ZSTD
		gfile_printf_then_putc_newline(".zst not supported");
#else
		fd->compression = ZSTD_COMPRESSION;
		if (flags != GFILE_OPEN_FOR_READ)
		{
			fd->is_write = TRUE;
		}

		return zstd_file_open(fd);
#endif
	}
	else if (s && strcasecmp(s,".z") == 0)
		gfile_printf_then_putc_newline("gfile compression .z file is not supported");
	else if (s && strcasecmp(s,".zip") == 0)
		gfile_printf_then_putc_newline("gfile compression zip is not supported");
	else
		return 0;

	*response_code = 415;
	*response_string = "Unsupported File Type";

	return 1;
}