in phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java [95:217]
/**
 * Upserts a batch of Flume events whose bodies are JSON documents.
 *
 * <p>For every event the JSON body is parsed and one value per entry of {@code colNames} is
 * looked up with a {@code $.<pattern>} expression. The pattern is the mapping found in
 * {@code jsonSchema} for that column, or the column name itself when no mapping exists. When
 * {@code partialSchema} is set and the mapped pattern yields nothing, the plain column name is
 * tried as a fallback.
 *
 * <p>Events are skipped (not failed) when the body is null or empty, when the body cannot be
 * parsed as JSON, or when any column value is still missing after the lookup.
 *
 * <p>Auto-commit is switched off for the duration of the batch. All executed rows are committed
 * together at the end; if anything fails the pending work is rolled back so that it cannot be
 * committed by a later call. Auto-commit is re-enabled afterwards if it was enabled on entry.
 *
 * @param events the events to persist; must not be null
 * @throws NullPointerException if {@code events}, the connection or the upsert statement is null
 * @throws IllegalArgumentException if the field mapping was flagged as not proper
 * @throws SQLException if any step of the batch fails; the original failure is attached as the
 *         cause and a rollback failure, if any, as a suppressed exception
 */
public void upsertEvents(List<Event> events) throws SQLException {
    if (events == null) {
        throw new NullPointerException("events must not be null");
    }
    if (connection == null) {
        throw new NullPointerException("connection must not be null");
    }
    if (this.upsertStatement == null) {
        throw new NullPointerException("upsertStatement must not be null");
    }
    if (!isProperMapping) {
        throw new IllegalArgumentException("Please verify fields mapping is not properly done..");
    }
    boolean wasAutoCommit = connection.getAutoCommit();
    connection.setAutoCommit(false);
    try (PreparedStatement colUpsert = connection.prepareStatement(upsertStatement)) {
        for (Event event : events) {
            byte[] payloadBytes = event.getBody();
            if (payloadBytes == null || payloadBytes.length == 0) {
                continue;
            }
            // Decode explicitly as UTF-8 instead of relying on the JVM default charset.
            String payload = new String(payloadBytes, java.nio.charset.StandardCharsets.UTF_8);
            JSONObject inputJson;
            try {
                inputJson = new JSONObject(payload);
            } catch (Exception e) {
                logger.debug("payload is not proper json");
                continue;
            }

            // Resolve one string value per configured column.
            Map<String, String> data = new HashMap<String, String>();
            for (String colName : colNames) {
                String pattern = colName;
                if (jsonSchema.has(colName)) {
                    Object obj = jsonSchema.opt(colName);
                    if (null != obj) {
                        pattern = obj.toString();
                    }
                }
                String value = getPatternData(inputJson, "$." + pattern);
                // if field mapping data is null then look for column data
                if (null == value && partialSchema) {
                    value = getPatternData(inputJson, "$." + colName);
                }
                data.put(colName, value);
            }
            Collection<String> values = data.values();
            if (values.contains(null)) {
                logger.debug("payload data {} doesn't match the fields mapping {} ", inputJson, jsonSchema);
                continue;
            }

            // index  = next JDBC parameter position (1-based)
            // offset = position in columnMetadata; it advances even for skipped columns
            int index = 1;
            int offset = 0;
            for (int i = 0; i < colNames.size(); i++, offset++) {
                if (columnMetadata[offset] == null) {
                    continue;
                }
                String colName = colNames.get(i);
                String value = data.get(colName);
                int sqlType = columnMetadata[offset].getSqlType();
                PDataType pDataType = PDataType.fromTypeId(sqlType);
                Object upsertValue;
                if (pDataType.isArrayType()) {
                    // Array columns arrive as a JSON array literal; convert element by element.
                    JSONArray jsonArray = new JSONArray(new JSONTokener(value));
                    Object[] vals = new Object[jsonArray.length()];
                    for (int x = 0; x < jsonArray.length(); x++) {
                        vals[x] = jsonArray.get(x);
                    }
                    String baseTypeSqlName = PDataType.arrayBaseType(pDataType).getSqlTypeName();
                    Array array = connection.createArrayOf(baseTypeSqlName, vals);
                    upsertValue = pDataType.toObject(array, pDataType);
                } else {
                    upsertValue = pDataType.toObject(value);
                }
                bindParameter(colUpsert, index++, upsertValue, sqlType);
            }

            // add headers if necessary
            Map<String, String> headerValues = event.getHeaders();
            for (int i = 0; i < headers.size(); i++, offset++) {
                String headerName = headers.get(i);
                String headerValue = headerValues.get(headerName);
                int sqlType = columnMetadata[offset].getSqlType();
                Object upsertValue = PDataType.fromTypeId(sqlType).toObject(headerValue);
                bindParameter(colUpsert, index++, upsertValue, sqlType);
            }

            if (autoGenerateKey) {
                int sqlType = columnMetadata[offset].getSqlType();
                String generatedRowValue = this.keyGenerator.generate();
                Object rowkeyValue = PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
                colUpsert.setObject(index++, rowkeyValue, sqlType);
            }
            colUpsert.execute();
        }
        connection.commit();
    } catch (Exception ex) {
        // Pass the throwable to the logger so the stack trace is not lost.
        logger.error("An error {} occurred during persisting the event ", ex.getMessage(), ex);
        // Keep the original exception as the cause instead of only copying its message.
        SQLException failure = new SQLException(ex.getMessage(), ex);
        try {
            // Discard rows already executed in this batch so a later commit cannot persist them.
            connection.rollback();
        } catch (SQLException rollbackFailure) {
            failure.addSuppressed(rollbackFailure);
        }
        throw failure;
    } finally {
        if (wasAutoCommit) {
            connection.setAutoCommit(true);
        }
    }
}

/** Binds {@code value} at {@code index}, or SQL NULL of the given type when the value is null. */
private static void bindParameter(PreparedStatement statement, int index, Object value, int sqlType)
        throws SQLException {
    if (value != null) {
        statement.setObject(index, value, sqlType);
    } else {
        statement.setNull(index, sqlType);
    }
}