in streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/http/RiakHttpPersistWriter.java [74:165]
public void write(StreamsDatum entry) {
Objects.nonNull(client);
String id = null;
String document;
String bucket;
String bucketType;
String contentType;
String charset;
if( StringUtils.isNotBlank(entry.getId())) {
id = entry.getId();
}
if( entry.getDocument() instanceof String) {
document = (String)entry.getDocument();
} else {
try {
document = MAPPER.writeValueAsString(entry.getDocument());
} catch( Exception e ) {
LOGGER.warn("Exception", e);
return;
}
}
if( entry.getMetadata() != null
&& entry.getMetadata().containsKey("bucket")
&& entry.getMetadata().get("bucket") instanceof String
&& StringUtils.isNotBlank((String)entry.getMetadata().get("bucket") )) {
bucket = (String)entry.getMetadata().get("bucket");
} else {
bucket = configuration.getDefaultBucket();
}
if( entry.getMetadata() != null
&& entry.getMetadata().containsKey("bucketType")
&& entry.getMetadata().get("bucketType") instanceof String
&& StringUtils.isNotBlank((String)entry.getMetadata().get("bucketType") )) {
bucketType = (String)entry.getMetadata().get("bucketType");
} else {
bucketType = configuration.getDefaultBucketType();
}
if( entry.getMetadata() != null
&& entry.getMetadata().containsKey("charset")
&& entry.getMetadata().get("charset") instanceof String
&& StringUtils.isNotBlank((String)entry.getMetadata().get("charset") )) {
charset = (String)entry.getMetadata().get("charset");
} else {
charset = configuration.getDefaultCharset();
}
if( entry.getMetadata() != null
&& entry.getMetadata().containsKey("contentType")
&& entry.getMetadata().get("contentType") instanceof String
&& StringUtils.isNotBlank((String)entry.getMetadata().get("contentType") )) {
contentType = (String)entry.getMetadata().get("contentType");
} else {
contentType = configuration.getDefaultContentType();
}
URIBuilder uriBuilder = new URIBuilder(client.baseURI);
if( bucket != null && StringUtils.isNotBlank(bucket)) {
uriBuilder.setPath("/riak/"+bucket);
}
if( id != null && StringUtils.isNotBlank(id)) {
uriBuilder.setPath("/riak/"+bucket+"/"+id);
}
URI uri;
try {
uri = uriBuilder.build();
} catch (URISyntaxException e) {
LOGGER.warn("URISyntaxException", e);
return;
}
HttpPost post = new HttpPost();
post.setHeader("Content-Type", contentType + "; charset=" + charset);
post.setURI(uri);
HttpEntity entity;
try {
entity = new StringEntity(document);
post.setEntity(entity);
} catch (UnsupportedEncodingException e) {
LOGGER.warn("UnsupportedEncodingException", e);
return;
}
try {
HttpResponse response = client.client().execute(post);
} catch (IOException e) {
LOGGER.warn("IOException", e);
return;
}
}