in elasticsearch5/src/main/java/site/ycsb/db/elasticsearch5/ElasticsearchClient.java [75:139]
/**
 * Connects a transport client to the configured Elasticsearch nodes and makes sure the
 * benchmark index exists.
 *
 * <p>Properties read:
 * <ul>
 *   <li>{@code es.index.key} - name of the index to use.</li>
 *   <li>{@code es.number_of_shards} / {@code es.number_of_replicas} - settings applied when the
 *       index is created.</li>
 *   <li>{@code es.new_index} - when {@code true}, an existing index is deleted and recreated.</li>
 *   <li>{@code es.hosts.list} - comma separated list of {@code host:port} entries.</li>
 *   <li>{@code es.setting.*} - copied into the client settings with the {@code es.setting.}
 *       prefix removed; these take precedence over the built-in defaults.</li>
 * </ul>
 *
 * <p>The method finishes by issuing a cluster health request that waits for green status.
 *
 * @throws DBException declared by the signature; nothing in this method throws it directly
 *     (NOTE(review): it may originate from {@code parseIntegerProperty} - confirm).
 * @throws IllegalArgumentException if an entry of {@code es.hosts.list} is not of the form
 *     {@code host:port}, its host cannot be resolved, or its port is not an integer.
 */
public void init() throws DBException {
  final Properties props = getProperties();

  this.indexKey = props.getProperty("es.index.key", DEFAULT_INDEX_KEY);

  final int numberOfShards = parseIntegerProperty(props, "es.number_of_shards", NUMBER_OF_SHARDS);
  final int numberOfReplicas = parseIntegerProperty(props, "es.number_of_replicas", NUMBER_OF_REPLICAS);
  final boolean newIndex = Boolean.parseBoolean(props.getProperty("es.new_index", "false"));

  // Built-in defaults are applied first so that the user supplied "es.setting.*" properties
  // processed below really do overwrite them.
  final Builder settings = Settings.builder()
      .put("cluster.name", DEFAULT_CLUSTER_NAME)
      .put("client.transport.sniff", true)
      .put("client.transport.ignore_cluster_name", false)
      .put("client.transport.ping_timeout", "30s")
      .put("client.transport.nodes_sampler_interval", "30s");

  // if properties file contains elasticsearch user defined properties
  // add it to the settings file (will overwrite the defaults).
  for (final Entry<Object, Object> e : props.entrySet()) {
    if (e.getKey() instanceof String) {
      final String key = (String) e.getKey();
      if (key.startsWith("es.setting.")) {
        settings.put(key.substring("es.setting.".length()), e.getValue());
      }
    }
  }

  // Falls back to DEFAULT_REMOTE_HOST when es.hosts.list is not set.
  final String[] nodeList = props.getProperty("es.hosts.list", DEFAULT_REMOTE_HOST).split(",");

  client = new PreBuiltTransportClient(settings.build());
  for (final String h : nodeList) {
    client.addTransportAddress(parseTransportAddress(h));
  }

  final boolean exists =
      client.admin().indices()
          .exists(Requests.indicesExistsRequest(indexKey)).actionGet()
          .isExists();

  // A fresh index was requested but one is already present: drop it before recreating.
  if (exists && newIndex) {
    client.admin().indices().prepareDelete(indexKey).get();
  }

  if (!exists || newIndex) {
    client.admin().indices().create(
        new CreateIndexRequest(indexKey)
            .settings(
                Settings.builder()
                    .put("index.number_of_shards", numberOfShards)
                    .put("index.number_of_replicas", numberOfReplicas)
            )).actionGet();
  }

  client.admin().cluster().health(new ClusterHealthRequest().waitForGreenStatus()).actionGet();
}

/**
 * Converts one {@code host:port} entry of {@code es.hosts.list} into a transport address.
 * Surrounding whitespace around the entry, the host and the port is ignored.
 *
 * @throws IllegalArgumentException if the port is missing or not an integer, or the host
 *     cannot be resolved.
 */
private static InetSocketTransportAddress parseTransportAddress(final String hostAndPort) {
  final String[] nodes = hostAndPort.trim().split(":");
  // Without this check a missing port would surface as an ArrayIndexOutOfBoundsException.
  if (nodes.length < 2) {
    throw new IllegalArgumentException(
        "unable to parse host entry [" + hostAndPort + "], expected format is host:port");
  }

  final String host = nodes[0].trim();
  final String portText = nodes[1].trim();

  final InetAddress address;
  try {
    address = InetAddress.getByName(host);
  } catch (UnknownHostException e) {
    throw new IllegalArgumentException("unable to identity host [" + host + "]", e);
  }

  final int port;
  try {
    port = Integer.parseInt(portText);
  } catch (final NumberFormatException e) {
    throw new IllegalArgumentException("unable to parse port [" + portText + "]", e);
  }

  return new InetSocketTransportAddress(address, port);
}