in contrib/native/client/example/querySubmitter.cpp [286:587]
int main(int argc, char* argv[]) {
try {
parseArgs(argc, argv);
std::vector<std::string*> queries;
std::string connectStr=qsOptionValues["connectStr"];
std::string schema=qsOptionValues["schema"];
std::string queryList=qsOptionValues["query"];
std::string planList=qsOptionValues["plan"];
std::string api=qsOptionValues["api"];
std::string type_str=qsOptionValues["type"];
std::string logLevel=qsOptionValues["logLevel"];
std::string testCancel=qsOptionValues["testCancel"];
std::string syncSend=qsOptionValues["syncSend"];
std::string hshakeTimeout=qsOptionValues["hshakeTimeout"];
std::string queryTimeout=qsOptionValues["queryTimeout"];
std::string heartbeatFrequency=qsOptionValues["heartbeatFrequency"];
std::string user=qsOptionValues["user"];
std::string password=qsOptionValues["password"];
std::string saslPluginPath=qsOptionValues["saslPluginPath"];
std::string sasl_encrypt=qsOptionValues["sasl_encrypt"];
std::string serviceHost=qsOptionValues["service_host"];
std::string serviceName=qsOptionValues["service_name"];
std::string auth=qsOptionValues["auth"];
std::string enableSSL=qsOptionValues["enableSSL"];
std::string tlsProtocol=qsOptionValues["TLSProtocol"];
std::string certFilePath=qsOptionValues["certFilePath"];
std::string disableHostnameVerification=qsOptionValues["disableHostnameVerification"];
std::string disableCertVerification=qsOptionValues["disableCertVerification"];
std::string useSystemTrustStore = qsOptionValues["useSystemTrustStore"];
std::string customSSLOptions = qsOptionValues["CustomSSLCtxOptions"];
std::string supportComplexTypes = qsOptionValues["supportComplexTypes"];
std::string hostnameOverride = qsOptionValues["hostnameOverride"];
Drill::QueryType type;
if(!validate(type_str, queryList, planList)){
exit(1);
}
Drill::logLevel_t l=getLogLevel(logLevel.c_str());
std::vector<std::string> queryInputs;
if(type_str=="sql" ){
readQueries(queryList, queryInputs);
type=Drill::SQL;
}else if(type_str=="physical" ){
readPlans(planList, queryInputs);
type=Drill::PHYSICAL;
}else if(type_str == "logical"){
readPlans(planList, queryInputs);
type=Drill::LOGICAL;
}else{
readQueries(queryList, queryInputs);
type=Drill::SQL;
}
bTestCancel = !strcmp(testCancel.c_str(), "true")?true:false;
bSyncSend = !strcmp(syncSend.c_str(), "true")?true:false;
std::vector<std::string>::iterator queryInpIter;
std::vector<Drill::RecordIterator*> recordIterators;
std::vector<Drill::RecordIterator*>::iterator recordIterIter;
std::vector<Drill::QueryHandle_t> queryHandles;
std::vector<Drill::QueryHandle_t>::iterator queryHandleIter;
Drill::DrillClient client;
#if defined _WIN32 || defined _WIN64
TCHAR tempPath[MAX_PATH];
GetTempPath(MAX_PATH, tempPath);
char logpathPrefix[MAX_PATH + 128];
strcpy(logpathPrefix,tempPath);
strcat(logpathPrefix, "\\drillclient");
#else
const char* logpathPrefix = "/var/log/drill/drillclient";
#endif
// To log to file
Drill::DrillClient::initLogging(logpathPrefix, l);
// To log to stderr
//Drill::DrillClient::initLogging(NULL, l);
int nQueries=queryInputs.size();
Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. The size of a record batch may vary, but is unlikely to exceed the 256 MB which is the default.
if(!hshakeTimeout.empty()){
Drill::DrillClientConfig::setHandshakeTimeout(atoi(hshakeTimeout.c_str()));
}
if (!queryTimeout.empty()){
Drill::DrillClientConfig::setQueryTimeout(atoi(queryTimeout.c_str()));
}
if(!heartbeatFrequency.empty()) {
Drill::DrillClientConfig::setHeartbeatFrequency(atoi(heartbeatFrequency.c_str()));
}
if (!saslPluginPath.empty()){
Drill::DrillClientConfig::setSaslPluginPath(saslPluginPath.c_str());
}
Drill::DrillUserProperties props;
if(schema.length()>0){
props.setProperty(USERPROP_SCHEMA, schema);
}
if(user.length()>0){
props.setProperty(USERPROP_USERNAME, user);
}
if(password.length()>0){
props.setProperty(USERPROP_PASSWORD, password);
}
if(sasl_encrypt.length()>0){
props.setProperty(USERPROP_SASL_ENCRYPT, sasl_encrypt);
}
if(serviceHost.length()>0){
props.setProperty(USERPROP_SERVICE_HOST, serviceHost);
}
if(serviceName.length()>0){
props.setProperty(USERPROP_SERVICE_NAME, serviceName);
}
if(auth.length()>0){
props.setProperty(USERPROP_AUTH_MECHANISM, auth);
}
if(enableSSL.length()>0){
props.setProperty(USERPROP_USESSL, enableSSL);
if (enableSSL == "true" && certFilePath.length() <= 0 && useSystemTrustStore.length() <= 0){
std::cerr<< "SSL is enabled but no certificate or truststore provided. " << std::endl;
return -1;
}
props.setProperty(USERPROP_TLSPROTOCOL, tlsProtocol);
props.setProperty(USERPROP_CERTFILEPATH, certFilePath);
props.setProperty(USERPROP_DISABLE_HOSTVERIFICATION, disableHostnameVerification);
props.setProperty(USERPROP_DISABLE_CERTVERIFICATION, disableCertVerification);
if (useSystemTrustStore.length() > 0){
props.setProperty(USERPROP_USESYSTEMTRUSTSTORE, useSystemTrustStore);
}
if (customSSLOptions.length() > 0){
props.setProperty(USERPROP_CUSTOM_SSLCTXOPTIONS, customSSLOptions);
}
}
if (supportComplexTypes.length() > 0){
props.setProperty(USERPROP_SUPPORT_COMPLEX_TYPES, supportComplexTypes);
}
if (hostnameOverride.length() > 0) {
props.setProperty(USERPROP_HOSTNAME_OVERRIDE, hostnameOverride);
}
if(client.connect(connectStr.c_str(), &props)!=Drill::CONN_SUCCESS){
std::cerr<< "Failed to connect with error: "<< client.getError() << " (Using:"<<connectStr<<")"<<std::endl;
return -1;
}
std::cout<< "Connected!\n" << std::endl;
if(api=="meta") {
Drill::Metadata* metadata = client.getMetadata();
if (metadata) {
std::cout << "Connector:" << std::endl;
std::cout << "\tname:" << metadata->getConnectorName() << std::endl;
std::cout << "\tversion:" << metadata->getConnectorVersion() << std::endl;
std::cout << std::endl;
std::cout << "Server:" << std::endl;
std::cout << "\tname:" << metadata->getServerName() << std::endl;
std::cout << "\tversion:" << metadata->getServerVersion() << std::endl;
std::cout << std::endl;
std::cout << "Metadata:" << std::endl;
std::cout << "\tall tables are selectable: " << metadata->areAllTableSelectable() << std::endl;
std::cout << "\tcatalog separator: " << metadata->getCatalogSeparator() << std::endl;
std::cout << "\tcatalog term: " << metadata->getCatalogTerm() << std::endl;
std::cout << "\tCOLLATE support: " << metadata->getCollateSupport() << std::endl;
std::cout << "\tcorrelation names: " << metadata->getCorrelationNames() << std::endl;
std::cout << "\tdate time functions: " << boost::algorithm::join(metadata->getDateTimeFunctions(), ", ") << std::endl;
std::cout << "\tdate time literals support: " << metadata->getDateTimeLiteralsSupport() << std::endl;
std::cout << "\tGROUP BY support: " << metadata->getGroupBySupport() << std::endl;
std::cout << "\tidentifier case: " << metadata->getIdentifierCase() << std::endl;
std::cout << "\tidentifier quote string: " << metadata->getIdentifierQuoteString() << std::endl;
std::cout << "\tmax binary literal length: " << metadata->getMaxBinaryLiteralLength() << std::endl;
std::cout << "\tmax catalog name length: " << metadata->getMaxCatalogNameLength() << std::endl;
std::cout << "\tmax char literal length: " << metadata->getMaxCharLiteralLength() << std::endl;
std::cout << "\tmax column name length: " << metadata->getMaxColumnNameLength() << std::endl;
std::cout << "\tmax columns in GROUP BY: " << metadata->getMaxColumnsInGroupBy() << std::endl;
std::cout << "\tmax columns in ORDER BY: " << metadata->getMaxColumnsInOrderBy() << std::endl;
std::cout << "\tmax columns in SELECT: " << metadata->getMaxColumnsInSelect() << std::endl;
std::cout << "\tmax cursor name length: " << metadata->getMaxCursorNameLength() << std::endl;
std::cout << "\tmax logical lob size: " << metadata->getMaxLogicalLobSize() << std::endl;
std::cout << "\tmax row size: " << metadata->getMaxRowSize() << std::endl;
std::cout << "\tmax schema name length: " << metadata->getMaxSchemaNameLength() << std::endl;
std::cout << "\tmax statement length: " << metadata->getMaxStatementLength() << std::endl;
std::cout << "\tmax statements: " << metadata->getMaxStatements() << std::endl;
std::cout << "\tmax table name length: " << metadata->getMaxTableNameLength() << std::endl;
std::cout << "\tmax tables in SELECT: " << metadata->getMaxTablesInSelect() << std::endl;
std::cout << "\tmax user name length: " << metadata->getMaxUserNameLength() << std::endl;
std::cout << "\tNULL collation: " << metadata->getNullCollation() << std::endl;
std::cout << "\tnumeric functions: " << boost::algorithm::join(metadata->getNumericFunctions(), ", ") << std::endl;
std::cout << "\tOUTER JOIN support: " << metadata->getOuterJoinSupport() << std::endl;
std::cout << "\tquoted identifier case: " << metadata->getQuotedIdentifierCase() << std::endl;
std::cout << "\tSQL keywords: " << boost::algorithm::join(metadata->getSQLKeywords(), ",") << std::endl;
std::cout << "\tschema term: " << metadata->getSchemaTerm() << std::endl;
std::cout << "\tsearch escape string: " << metadata->getSearchEscapeString() << std::endl;
std::cout << "\tspecial characters: " << metadata->getSpecialCharacters() << std::endl;
std::cout << "\tstring functions: " << boost::algorithm::join(metadata->getStringFunctions(), ",") << std::endl;
std::cout << "\tsub query support: " << metadata->getSubQuerySupport() << std::endl;
std::cout << "\tsystem functions: " << boost::algorithm::join(metadata->getSystemFunctions(), ",") << std::endl;
std::cout << "\ttable term: " << metadata->getTableTerm() << std::endl;
std::cout << "\tUNION support: " << metadata->getUnionSupport() << std::endl;
std::cout << "\tBLOB included in max row size: " << metadata->isBlobIncludedInMaxRowSize() << std::endl;
std::cout << "\tcatalog at start: " << metadata->isCatalogAtStart() << std::endl;
std::cout << "\tcolumn aliasing supported: " << metadata->isColumnAliasingSupported() << std::endl;
std::cout << "\tLIKE escape clause supported: " << metadata->isLikeEscapeClauseSupported() << std::endl;
std::cout << "\tNULL plus non NULL equals to NULL: " << metadata->isNullPlusNonNullNull() << std::endl;
std::cout << "\tread-only: " << metadata->isReadOnly() << std::endl;
std::cout << "\tSELECT FOR UPDATE supported: " << metadata->isSelectForUpdateSupported() << std::endl;
std::cout << "\ttransaction supported: " << metadata->isTransactionSupported() << std::endl;
std::cout << "\tunrelated columns in ORDER BY supported: " << metadata->isUnrelatedColumnsInOrderBySupported() << std::endl;
client.freeMetadata(&metadata);
} else {
std::cerr << "Cannot get metadata:" << client.getError() << std::endl;
}
} else if(api=="sync"){
Drill::DrillClientError* err=NULL;
Drill::status_t ret;
int nQueries=0;
for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
Drill::RecordIterator* pRecIter = client.submitQuery(type, *queryInpIter, err);
if(pRecIter!=NULL){
recordIterators.push_back(pRecIter);
nQueries++;
}
}
Drill::DrillClientConfig::setBufferLimit(nQueries*2*1024*1024); // 2MB per query. Allows us to hold at least two record batches.
size_t row=0;
for(recordIterIter = recordIterators.begin(); recordIterIter != recordIterators.end(); recordIterIter++) {
// get fields.
row=0;
Drill::RecordIterator* pRecIter=*recordIterIter;
Drill::FieldDefPtr fields= pRecIter->getColDefs();
while((ret=pRecIter->next()), (ret==Drill::QRY_SUCCESS || ret==Drill::QRY_SUCCESS_WITH_INFO) && !pRecIter->hasError()){
fields = pRecIter->getColDefs();
row++;
if( (ret==Drill::QRY_SUCCESS_WITH_INFO && pRecIter->hasSchemaChanged() )|| ( row%100==1)){
for(size_t i=0; i<fields->size(); i++){
std::string name= fields->at(i)->getName();
printf("%s\t", name.c_str());
}
printf("\n");
}
printf("ROW: %ld\t", row);
for(size_t i=0; i<fields->size(); i++){
void* pBuf; size_t sz;
pRecIter->getCol(i, &pBuf, &sz);
print(fields->at(i), pBuf, sz);
}
printf("\n");
if(bTestCancel && row%100==1){
pRecIter->cancel();
printf("Application cancelled the query.\n");
}
}
if(ret!=Drill::QRY_NO_MORE_DATA && ret!=Drill::QRY_CANCEL){
std::cerr<< pRecIter->getError() << std::endl;
}
client.freeQueryIterator(&pRecIter);
}
client.waitForResults();
}else{
if(bSyncSend){
for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
Drill::QueryHandle_t qryHandle;
client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, &qryHandle);
client.registerSchemaChangeListener(&qryHandle, SchemaListener);
if(bTestCancel) {
// Send cancellation request after 5seconds
boost::this_thread::sleep(boost::posix_time::milliseconds(1000));
std::cout<< "\n Cancelling query: " << *queryInpIter << "\n" << std::endl;
client.cancelQuery(qryHandle);
} else {
client.waitForResults();
}
client.freeQueryResources(&qryHandle);
}
}else{
for(queryInpIter = queryInputs.begin(); queryInpIter != queryInputs.end(); queryInpIter++) {
Drill::QueryHandle_t qryHandle;
client.submitQuery(type, *queryInpIter, QueryResultsListener, NULL, &qryHandle);
client.registerSchemaChangeListener(&qryHandle, SchemaListener);
queryHandles.push_back(qryHandle);
}
client.waitForResults();
for(queryHandleIter = queryHandles.begin(); queryHandleIter != queryHandles.end(); queryHandleIter++) {
client.freeQueryResources(&*queryHandleIter);
}
}
}
client.close();
} catch (std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 0;
}