in gremlin-client-demo/src/main/java/software/amazon/neptune/CustomSelectorsDemo.java [90:165]
public void run() {
try {
EndpointsSelector writerSelector = (cluster) -> {
List<NeptuneInstanceMetadata> endpoints = cluster.getInstances().stream()
.filter(NeptuneInstanceMetadata::isPrimary)
.filter(NeptuneInstanceMetadata::isAvailable)
.collect(Collectors.toList());
return endpoints.isEmpty() ?
new EndpointCollection(Collections.singletonList(cluster.getClusterEndpoint())) :
new EndpointCollection(endpoints);
};
EndpointsSelector readerSelector = (cluster) ->
new EndpointCollection(
cluster.getInstances().stream()
.filter(NeptuneInstanceMetadata::isReader)
.filter(NeptuneInstanceMetadata::isAvailable)
.collect(Collectors.toList()));
ClusterEndpointsRefreshAgent refreshAgent = createRefreshAgent();
GremlinCluster writerCluster = createCluster(writerSelector, refreshAgent);
GremlinCluster readerCluster = createCluster(readerSelector, refreshAgent);
GremlinClient writer = writerCluster.connect();
GremlinClient reader = readerCluster.connect();
refreshAgent.startPollingNeptuneAPI(
Arrays.asList(
RefreshTask.refresh(writer, writerSelector),
RefreshTask.refresh(reader, readerSelector)
),
60,
TimeUnit.SECONDS);
GraphTraversalSource gWriter = createGraphTraversalSource(writer);
GraphTraversalSource gReader = createGraphTraversalSource(reader);
for (int i = 0; i < queryCount; i++) {
try {
if (i % 2 == 1) {
List<Map<Object, Object>> results = gReader.V().limit(10).valueMap(true).toList();
for (Map<Object, Object> result : results) {
//Do nothing
}
} else {
gWriter.addV("TestNode").property("my-id", i).next();
}
} catch (Exception e) {
logger.warn("Error processing query: {}", e.getMessage());
}
if (i % 10000 == 0) {
System.out.println();
System.out.println("Number of queries: " + i);
}
}
refreshAgent.close();
writer.close();
reader.close();
writerCluster.close();
readerCluster.close();
} catch (Exception e) {
System.err.println("An error occurred while connecting to Neptune:");
e.printStackTrace();
System.exit(-1);
}
}