in streams-contrib/streams-persist-riak/src/main/java/org/apache/streams/riak/binary/RiakBinaryPersistWriter.java [71:156]
public void write(StreamsDatum entry) {
Objects.nonNull(client);
String id = null;
String document;
String bucket;
String bucketType;
String contentType;
String charset;
if( StringUtils.isNotBlank(entry.getId())) {
id = entry.getId();
}
if( entry.getDocument() instanceof String) {
document = (String)entry.getDocument();
} else {
try {
document = MAPPER.writeValueAsString(entry.getDocument());
} catch( Exception e ) {
LOGGER.warn("Exception", e);
return;
}
}
if( entry.getMetadata() != null
&& entry.getMetadata().containsKey("bucket")
&& entry.getMetadata().get("bucket") instanceof String
&& StringUtils.isNotBlank((String)entry.getMetadata().get("bucket") )) {
bucket = (String)entry.getMetadata().get("bucket");
} else {
bucket = configuration.getDefaultBucket();
}
if( entry.getMetadata() != null
&& entry.getMetadata().containsKey("bucketType")
&& entry.getMetadata().get("bucketType") instanceof String
&& StringUtils.isNotBlank((String)entry.getMetadata().get("bucketType") )) {
bucketType = (String)entry.getMetadata().get("bucketType");
} else {
bucketType = configuration.getDefaultBucketType();
}
if( entry.getMetadata() != null
&& entry.getMetadata().containsKey("charset")
&& entry.getMetadata().get("charset") instanceof String
&& StringUtils.isNotBlank((String)entry.getMetadata().get("charset") )) {
charset = (String)entry.getMetadata().get("charset");
} else {
charset = configuration.getDefaultCharset();
}
if( entry.getMetadata() != null
&& entry.getMetadata().containsKey("contentType")
&& entry.getMetadata().get("contentType") instanceof String
&& StringUtils.isNotBlank((String)entry.getMetadata().get("contentType") )) {
contentType = (String)entry.getMetadata().get("contentType");
} else {
contentType = configuration.getDefaultContentType();
}
try {
RiakObject riakObject = new RiakObject();
riakObject.setContentType(contentType);
riakObject.setCharset(charset);
riakObject.setValue(BinaryValue.create(document));
Namespace ns = new Namespace(bucketType, bucket);
StoreValue.Builder storeValueBuilder = new StoreValue.Builder(riakObject);
if( id != null && StringUtils.isNotBlank(id)) {
Location location = new Location(ns, id);
storeValueBuilder = storeValueBuilder.withLocation(location);
} else {
storeValueBuilder = storeValueBuilder.withNamespace(ns);
}
StoreValue store = storeValueBuilder.build();
StoreValue.Response storeResponse = client.client().execute(store);
LOGGER.debug("storeResponse", storeResponse);
} catch (InterruptedException e) {
e.printStackTrace();
} catch (ExecutionException e) {
e.printStackTrace();
}
}