external-table/src/pxfprotocol.c (135 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information * regarding copyright ownership. The ASF licenses this file * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, * software distributed under the License is distributed on an * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY * KIND, either express or implied. See the License for the * specific language governing permissions and limitations * under the License. * */ #include "pxfbridge.h" #include "pxffilters.h" #if PG_VERSION_NUM >= 120000 #include "extension/gp_exttable_fdw/extaccess.h" #else #include "access/fileam.h" #endif #include "utils/elog.h" /* define magic module unless run as a part of test cases */ #ifndef UNIT_TESTING PG_MODULE_MAGIC; #endif PG_FUNCTION_INFO_V1(pxfprotocol_export); PG_FUNCTION_INFO_V1(pxfprotocol_import); PG_FUNCTION_INFO_V1(pxfprotocol_validate_urls); /* public function declarations */ Datum pxfprotocol_export(PG_FUNCTION_ARGS); Datum pxfprotocol_import(PG_FUNCTION_ARGS); Datum pxfprotocol_validate_urls(PG_FUNCTION_ARGS); /* helper function declarations */ static gphadoop_context *create_context(PG_FUNCTION_ARGS, bool is_import); static void cleanup_context(gphadoop_context *context); static void check_caller(PG_FUNCTION_ARGS, const char *func_name); /* * Validates external table URL */ Datum pxfprotocol_validate_urls(PG_FUNCTION_ARGS) { /* Must be called via the external table format manager */ if (!CALLED_AS_EXTPROTOCOL_VALIDATOR(fcinfo)) elog(ERROR, "cannot execute pxfprotocol_validate_urls outside protocol manager"); /* There must be only ONE url. */ if (EXTPROTOCOL_VALIDATOR_GET_NUM_URLS(fcinfo) != 1) ereport(ERROR, (errcode(ERRCODE_PROTOCOL_VIOLATION), errmsg("number of URLs must be one"))); char *uri_string = EXTPROTOCOL_VALIDATOR_GET_NTH_URL(fcinfo, 1); elog(DEBUG2, "pxfprotocol_validate_urls: uri %s", uri_string); GPHDUri *uri = parseGPHDUri(uri_string); /* No duplicate options. */ GPHDUri_verify_no_duplicate_options(uri); /* Check for existence of core options if profile wasn't supplied */ if (!GPHDUri_opt_exists(uri, PXF_PROFILE)) { List *coreOptions = list_make2(ACCESSOR, RESOLVER); if (EXTPROTOCOL_VALIDATOR_GET_DIRECTION(fcinfo) != EXT_VALIDATE_WRITE) coreOptions = lcons(FRAGMENTER, coreOptions); GPHDUri_verify_core_options_exist(uri, coreOptions); list_free(coreOptions); } freeGPHDUri(uri); PG_RETURN_VOID(); } /* * Writes to an external table */ Datum pxfprotocol_export(PG_FUNCTION_ARGS) { /* Must be called via the external table format manager */ check_caller(fcinfo, "pxfprotocol_export"); /* retrieve user context required for data write */ gphadoop_context *context = (gphadoop_context *) EXTPROTOCOL_GET_USER_CTX(fcinfo); /* last call -- cleanup */ if (EXTPROTOCOL_IS_LAST_CALL(fcinfo)) { cleanup_context(context); EXTPROTOCOL_SET_USER_CTX(fcinfo, NULL); PG_RETURN_INT32(0); } /* first call -- do any desired init */ if (context == NULL) { context = create_context(fcinfo, false); EXTPROTOCOL_SET_USER_CTX(fcinfo, context); gpbridge_export_start(context); } /* Read data */ int bytes_written = gpbridge_write(context, EXTPROTOCOL_GET_DATABUF(fcinfo), EXTPROTOCOL_GET_DATALEN(fcinfo)); PG_RETURN_INT32(bytes_written); } /* * Reads tuples from an external table */ Datum pxfprotocol_import(PG_FUNCTION_ARGS) { /* Must be called via the external table format manager */ check_caller(fcinfo, "pxfprotocol_import"); /* retrieve user context required for data read */ gphadoop_context *context = (gphadoop_context *) EXTPROTOCOL_GET_USER_CTX(fcinfo); /* last call -- cleanup */ if (EXTPROTOCOL_IS_LAST_CALL(fcinfo)) { cleanup_context(context); EXTPROTOCOL_SET_USER_CTX(fcinfo, NULL); PG_RETURN_INT32(0); } /* first call -- do any desired init */ if (context == NULL) { context = create_context(fcinfo, true); EXTPROTOCOL_SET_USER_CTX(fcinfo, context); gpbridge_import_start(context); } /* sometimes an additional call can be executed even when we completed reading data from the stream */ if (context->completed) { PG_RETURN_INT32(0); } /* Read data */ int bytes_read = gpbridge_read(context, EXTPROTOCOL_GET_DATABUF(fcinfo), EXTPROTOCOL_GET_DATALEN(fcinfo)); PG_RETURN_INT32(bytes_read); } /* * Allocates context and sets values for the segment */ static gphadoop_context * create_context(PG_FUNCTION_ARGS, bool is_import) { /* parse and set uri */ GPHDUri *uri = parseGPHDUri(EXTPROTOCOL_GET_URL(fcinfo)); Relation relation = EXTPROTOCOL_GET_RELATION(fcinfo); List *filter_quals = NULL; ProjectionInfo *proj_info = NULL; char *filterstr = NULL; ExternalSelectDescData *desc = EXTPROTOCOL_GET_EXTERNAL_SELECT_DESC(fcinfo); if (desc != NULL) { filter_quals = desc->filter_quals; proj_info = desc->projInfo; if (filter_quals != NULL) { elog(DEBUG1, "create_context: filter_quals is provided"); filterstr = serializePxfFilterQuals(filter_quals); } } /* set context */ gphadoop_context *context = palloc0(sizeof(gphadoop_context)); context->gphd_uri = uri; initStringInfo(&context->uri); context->relation = relation; context->filterstr = filterstr; context->proj_info = proj_info; context->quals = filter_quals; context->completed = false; return context; } /* * De-allocates context and dependent structures. */ static void cleanup_context(gphadoop_context *context) { if (context != NULL) { gpbridge_cleanup(context); pfree(context->uri.data); pfree(context); } } /* * Checks that the caller is External Protocol Manager */ static void check_caller(PG_FUNCTION_ARGS, const char *func_name) { if (!CALLED_AS_EXTPROTOCOL(fcinfo)) ereport(ERROR, (errcode(ERRCODE_INTERNAL_ERROR), errmsg("%s not called by external protocol manager", func_name))); }