in results.c [3386:3558]
/*
 * LoadFromKeyset
 *
 * Re-reads rows of the current rowset whose keyset entry is flagged
 * CURS_NEEDS_REREAD.  Global row indices from SC_get_rowset_start(stmt)
 * up to (but not including) limitrow are scanned; the ctid of every
 * flagged row is appended to a query that is sent once a batch of
 * keys_per_fetch keys has been collected, and once more for the final
 * partial batch.
 *
 * Query form:
 *   - When res->reload_count > 0 a plan named "_KEYSET_<res pointer>" is
 *     taken to exist already and is EXECUTEd with reload_count arguments;
 *     a short final batch is padded with NULL arguments.
 *   - Otherwise the plan is PREPAREd here from stmt->load_statement and,
 *     on success, res->reload_count is set to its parameter count.
 *
 * Each row that comes back is matched (oid + block number + offset)
 * against the keyset entries of the rowset.  On a match the field values
 * are moved from the temporary result into res->backend_tuples and the
 * entry's CURS_NEEDS_REREAD bit is cleared.
 *
 * Parameters:
 *   stmt            statement owning the cursor; receives error info.
 *   res             result whose keyset / tuple cache is refreshed.
 *   rows_per_fetch  used only to size the plan when one has to be prepared.
 *   limitrow        global row index at which scanning stops (exclusive).
 *
 * Result:
 *   The number of keys submitted for reloading (not the number of rows
 *   actually found), or -1 after an error has been recorded on stmt.
 *
 * The body is bracketed by a macro that turns the return keyword into a
 * compile error so that every exit path reaches the cleanup label, where
 * the query buffer is released.
 */
static SQLLEN LoadFromKeyset(StatementClass *stmt, QResultClass * res, int rows_per_fetch, SQLLEN limitrow)
{
	CSTR	func = "LoadFromKeyset";
	ConnectionClass	*conn = SC_get_conn(stmt);
	SQLLEN	i;
	int	j, rowc, rcnt = 0;
	OID	oid;
	UInt4	blocknum;
	SQLLEN	kres_ridx;
	UInt2	offset;
	PQExpBufferData	qval = {0};	/* zeroed: reads as "broken" until initialised */
	int	keys_per_fetch = 10;

#define return DONT_CALL_RETURN_FROM_HERE???
	/*
	 * i is the global row index, kres_ridx the matching keyset index;
	 * rowc counts the keys placed in the batch being built (-1 marks the
	 * final flush).
	 */
	for (i = SC_get_rowset_start(stmt), kres_ridx = GIdx2KResIdx(i, stmt, res), rowc = 0;; i++, kres_ridx++)
	{
		if (i >= limitrow)
		{
			/* Nothing pending: done. */
			if (!rowc)
				break;
			/* A prepared plan has a fixed arity; pad the unused slots. */
			if (res->reload_count > 0)
			{
				for (j = rowc; j < keys_per_fetch; j++)
				{
					appendPQExpBufferStr(&qval, j ? ",NULL" : "NULL");
				}
			}
			rowc = -1; /* end of loop */
		}
		if (rowc < 0 || rowc >= keys_per_fetch)
		{
			/* The batch is full (or this is the final flush): run it. */
			QResultClass	*qres;

			appendPQExpBufferStr(&qval, ")");
			if (PQExpBufferDataBroken(qval))
			{
				rcnt = -1;
				SC_set_error(stmt, STMT_NO_MEMORY_ERROR, "Out of memory in LoadFromKeyset()", func);
				goto cleanup;
			}
			qres = CC_send_query(conn, qval.data, NULL, CREATE_KEYSET | READ_ONLY_QUERY, stmt);
			if (QR_command_maybe_successful(qres))
			{
				SQLLEN		qrow, k, kidx, l;
				Int2		m;
				TupleField	*tuple, *tuplew;
				UInt4		bln;
				UInt2		off;

				for (qrow = 0; qrow < QR_get_num_total_read(qres); qrow++)
				{
					oid = getOid(qres, qrow);
					getTid(qres, qrow, &blocknum, &offset);
					for (k = SC_get_rowset_start(stmt); k < limitrow; k++)
					{
						/*
						 * k is a global row index.  The keyset must be
						 * addressed with the keyset-relative index, in the
						 * same way the outer loop derives kres_ridx; the
						 * tuple cache has its own conversion below.
						 */
						kidx = GIdx2KResIdx(k, stmt, res);
						getTid(res, kidx, &bln, &off);
						if (oid == getOid(res, kidx) &&
						    bln == blocknum &&
						    off == offset)
						{
							l = GIdx2CacheIdx(k, stmt, res);
							tuple = res->backend_tuples + res->num_fields * l;
							tuplew = qres->backend_tuples + qres->num_fields * qrow;
							/*
							 * Move (not copy) each value into the cache and
							 * detach it from qres so that destroying qres
							 * does not release it.
							 * NOTE(review): the walk is bounded by
							 * res->num_fields for both tuples; presumably
							 * qres has at least that many fields - confirm.
							 */
							for (m = 0; m < res->num_fields; m++, tuple++, tuplew++)
							{
								/*
								 * NOTE(review): a non-NULL value whose len
								 * is 0 is not freed here; confirm whether
								 * zero-length values own heap storage.
								 */
								if (tuple->len > 0 && tuple->value)
									free(tuple->value);
								tuple->value = tuplew->value;
								tuple->len = tuplew->len;
								tuplew->value = NULL;
								tuplew->len = -1;
							}
							res->keyset[kidx].status &= ~CURS_NEEDS_REREAD;
							break;
						}
					}
				}
			}
			else
			{
				SC_set_error(stmt, STMT_EXEC_ERROR, "Data Load Error", func);
				rcnt = -1;
				QR_Destructor(qres);
				break;
			}
			QR_Destructor(qres);
			if (rowc < 0)
				break;
			rowc = 0;	/* start a new batch */
		}
		if (!rowc)
		{
			/*
			 * Start of a batch.  A still-"broken" buffer means this is the
			 * first batch: initialise it and make sure a plan is available.
			 */
			if (PQExpBufferDataBroken(qval))
			{
				initPQExpBuffer(&qval);

				if (res->reload_count > 0)
					keys_per_fetch = res->reload_count;
				else
				{
					char	planname[32];
					int	p;
					QResultClass	*qres;

					if (rows_per_fetch >= pre_fetch_count * 2)
						keys_per_fetch = pre_fetch_count;
					else
						keys_per_fetch = rows_per_fetch;
					if (!keys_per_fetch)
						keys_per_fetch = 2;
					SPRINTF_FIXED(planname, "_KEYSET_%p", res);
					printfPQExpBuffer(&qval, "PREPARE \"%s\"", planname);
					/* One tid parameter type per key slot. */
					for (p = 0; p < keys_per_fetch; p++)
					{
						appendPQExpBufferStr(&qval, p ? ",tid" : "(tid");
					}
					appendPQExpBuffer(&qval, ") as %s where ctid in ", stmt->load_statement);
					/* Matching $1..$n placeholders. */
					for (p = 0; p < keys_per_fetch; p++)
					{
						if (p == 0)
							appendPQExpBufferStr(&qval, "($1");
						else
							appendPQExpBuffer(&qval, ",$%d", p + 1);
					}
					appendPQExpBufferStr(&qval, ")");
					if (PQExpBufferDataBroken(qval))
					{
						rcnt = -1;
						SC_set_error(stmt, STMT_NO_MEMORY_ERROR, "Out of memory in LoadFromKeyset()", func);
						goto cleanup;
					}
					qres = CC_send_query(conn, qval.data, NULL, READ_ONLY_QUERY, stmt);
					if (QR_command_maybe_successful(qres))
					{
						res->reload_count = keys_per_fetch;
					}
					else
					{
						SC_set_error(stmt, STMT_EXEC_ERROR, "Prepare for Data Load Error", func);
						rcnt = -1;
						/*
						 * The failed result is handed to the statement
						 * rather than destroyed here (presumably so its
						 * diagnostics stay available - confirm).
						 */
						SC_set_Result(stmt, qres);
						break;
					}
					QR_Destructor(qres);
				}
			}
			/* (Re)write the statement prefix; this also discards the previous batch text. */
			if (res->reload_count > 0)
			{
				printfPQExpBuffer(&qval, "EXECUTE \"_KEYSET_%p\"(", res);
			}
			else
			{
				printfPQExpBuffer(&qval, "%s where ctid in (", stmt->load_statement);
			}
		}
		if (rcnt >= 0 &&
		    0 != (res->keyset[kres_ridx].status & CURS_NEEDS_REREAD))
		{
			/* Queue this row's ctid as the next argument of the batch. */
			getTid(res, kres_ridx, &blocknum, &offset);
			if (rowc)
				appendPQExpBuffer(&qval, ",'(%u,%u)'", blocknum, offset);
			else
				appendPQExpBuffer(&qval, "'(%u,%u)'", blocknum, offset);
			rowc++;
			rcnt++;
		}
	}
cleanup:
#undef return
	/* A buffer that was never initialised, or that failed, owns nothing. */
	if (!PQExpBufferDataBroken(qval))
		termPQExpBuffer(&qval);
	return rcnt;
}