benchmark/select-copy.c (263 lines of code) (raw):
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
#include <arpa/inet.h>
#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/time.h>
#include <libpq-fe.h>
#include <catalog/pg_type_d.h>
/* See the "Binary Format" section in
* https://www.postgresql.org/docs/current/sql-copy.html for
* details. */
/* The last '\0' is also part of the signature. */
static const char signature[11] = "PGCOPY\n\377\r\n";
static const size_t signatureSize = sizeof(signature);
typedef struct {
char* data;
size_t size;
} Buffer;
static bool
read_uint16(Buffer* buffer, uint16_t* output, const char* tag)
{
if (buffer->size < sizeof(uint16_t))
{
fprintf(stderr,
"%s: can't read uint16_t (%d bytes) value: %d",
tag,
(int)sizeof(uint16_t),
(int)(buffer->size));
return false;
}
*output = ntohs(*((uint16_t*)(buffer->data)));
buffer->data += sizeof(uint16_t);
buffer->size -= sizeof(uint16_t);
return true;
}
static bool
read_uint32(Buffer* buffer, uint32_t* output, const char* tag)
{
if (buffer->size < sizeof(uint32_t))
{
fprintf(stderr,
"%s: can't read uint32_t (%d bytes) value: %d",
tag,
(int)sizeof(uint32_t),
(int)(buffer->size));
return false;
}
*output = ntohl(*((uint32_t*)(buffer->data)));
buffer->data += sizeof(uint32_t);
buffer->size -= sizeof(uint32_t);
return true;
}
static bool
parse_header(Buffer* buffer)
{
uint32_t flags;
uint32_t extensionLength;
if (buffer->size < signatureSize)
{
fprintf(stderr,
"Signature (%d bytes) doesn't exist: %d",
(int)signatureSize,
(int)(buffer->size));
return false;
}
if (memcmp(buffer->data, signature, signatureSize) != 0)
{
fprintf(stderr, "Wrong signature: <%.*s>", (int)signatureSize, buffer->data);
return false;
}
buffer->data += signatureSize;
buffer->size -= signatureSize;
if (!read_uint32(buffer, &flags, "header: flags"))
{
return false;
}
if (!read_uint32(buffer, &extensionLength, "header: extension length"))
{
return false;
}
if (buffer->size < extensionLength)
{
fprintf(stderr,
"Too large header extension length: %d: %d",
(int)extensionLength,
(int)(buffer->size));
return false;
}
buffer->data += extensionLength;
buffer->size -= extensionLength;
return true;
}
static bool
parse_tuples(Buffer* buffer, Oid* types, bool* finished)
{
while (buffer->size > 0)
{
uint16_t i;
uint16_t nFields;
if (!read_uint16(buffer, &nFields, "tuple: number of fields"))
{
return false;
}
if (nFields == (uint16_t)-1)
{
*finished = true;
return true;
}
for (i = 0; i < nFields; i++)
{
Oid type = types[i];
uint32_t size;
if (!read_uint32(buffer, &size, "tuple: field size"))
{
return false;
}
if (size == (uint32_t)-1)
{
/* NULL */
continue;
}
switch (type)
{
case INT4OID:
{
uint32_t value;
if (!read_uint32(buffer, &value, "tuple: field: integer"))
{
return false;
}
/* printf("%d\n", (int32_t)value); */
break;
}
case TEXTOID:
{
/* printf("%.*s\n", (int)size, buffer->data); */
buffer->data += size;
buffer->size -= size;
break;
}
default:
fprintf(stderr,
"tuple: field: %u: unsupported type: %u: %u: %d\n",
i,
type,
size,
(int)(buffer->size));
return false;
}
}
}
return true;
}
int
main(int argc, char** argv)
{
PGconn* connection;
PGresult* result;
Oid* types = NULL;
struct timeval before;
struct timeval after;
bool inTuples = false;
if (getenv("PGDATABASE"))
{
connection = PQconnectdb("");
}
else
{
connection = PQconnectdb("dbname=afs_benchmark");
}
if (PQstatus(connection) != CONNECTION_OK)
{
fprintf(stderr, "failed to connect: %s\n", PQerrorMessage(connection));
PQfinish(connection);
return EXIT_FAILURE;
}
gettimeofday(&before, NULL);
result = PQprepare(connection, "", "SELECT * FROM data", 0, NULL);
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
fprintf(stderr,
"failed to prepare to infer schema: %s\n",
PQerrorMessage(connection));
PQclear(result);
PQfinish(connection);
return EXIT_FAILURE;
}
PQclear(result);
result = PQdescribePrepared(connection, "");
if (PQresultStatus(result) != PGRES_COMMAND_OK)
{
fprintf(stderr,
"failed to describe prepared statement to infer schema: %s\n",
PQerrorMessage(connection));
PQclear(result);
PQfinish(connection);
return EXIT_FAILURE;
}
{
int i;
int nFields = PQnfields(result);
types = malloc(sizeof(Oid) * PQnfields(result));
for (i = 0; i < nFields; i++)
{
types[i] = PQftype(result, i);
}
}
PQclear(result);
result = PQexec(connection, "COPY data TO STDOUT (FORMAT binary)");
if (PQresultStatus(result) != PGRES_COPY_OUT)
{
fprintf(stderr, "failed to copy: %s\n", PQerrorMessage(connection));
free(types);
PQclear(result);
PQfinish(connection);
return EXIT_FAILURE;
}
while (true)
{
char* data;
bool finished = false;
Buffer buffer;
int size = PQgetCopyData(connection, &data, 0);
if (size == -1)
{
break;
}
if (size == -2)
{
fprintf(stderr, "failed to read copy data: %s\n", PQerrorMessage(connection));
free(types);
PQclear(result);
PQfinish(connection);
return EXIT_FAILURE;
}
buffer.data = data;
buffer.size = (size_t)size;
if (!inTuples)
{
if (!parse_header(&buffer))
{
free(types);
PQclear(result);
PQfinish(connection);
return EXIT_FAILURE;
}
inTuples = true;
}
if (!parse_tuples(&buffer, types, &finished))
{
free(types);
PQclear(result);
PQfinish(connection);
return EXIT_FAILURE;
}
free(data);
if (finished)
{
break;
}
}
gettimeofday(&after, NULL);
printf("%.3f\n",
(after.tv_sec + (after.tv_usec / 1000000.0)) -
(before.tv_sec + (before.tv_usec / 1000000.0)));
free(types);
PQclear(result);
PQfinish(connection);
return EXIT_SUCCESS;
}