in src/Apache.IoTDB/SessionPool.cs [426:490]
private async Task<Client> CreateAndOpen(string host, int port, bool enableRpcCompression, int timeout, string sqlDialect, string database, CancellationToken cancellationToken = default)
{
var tcpClient = new TcpClient(host, port);
tcpClient.SendTimeout = timeout;
tcpClient.ReceiveTimeout = timeout;
var transport = new TFramedTransport(new TSocketTransport(tcpClient, null));
if (!transport.IsOpen)
{
await transport.OpenAsync(cancellationToken);
}
var client = enableRpcCompression ?
new IClientRPCService.Client(new TCompactProtocol(transport)) :
new IClientRPCService.Client(new TBinaryProtocol(transport));
var openReq = new TSOpenSessionReq(ProtocolVersion, _zoneId, _username)
{
Password = _password,
};
if (openReq.Configuration == null)
{
openReq.Configuration = new Dictionary<string, string>();
}
openReq.Configuration.Add("sql_dialect", sqlDialect);
if (!String.IsNullOrEmpty(database))
{
openReq.Configuration.Add("db", database);
}
try
{
var openResp = await client.openSessionAsync(openReq, cancellationToken);
if (openResp.ServerProtocolVersion != ProtocolVersion)
{
throw new TException($"Protocol Differ, Client version is {ProtocolVersion} but Server version is {openResp.ServerProtocolVersion}", null);
}
if (openResp.ServerProtocolVersion == 0)
{
throw new TException("Protocol not supported", null);
}
var sessionId = openResp.SessionId;
var statementId = await client.requestStatementIdAsync(sessionId, cancellationToken);
var endpoint = new TEndPoint(host, port);
var returnClient = new Client(
client,
sessionId,
statementId,
transport,
endpoint);
return returnClient;
}
catch (Exception)
{
transport.Close();
throw;
}
}