public void upsertEvents()

in phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java [85:185]


	/**
	 * Parses each Flume event body as a single CSV line and upserts the resulting row
	 * into Phoenix via the pre-built upsert statement.
	 *
	 * Events with an empty body, a column-count mismatch against the configured field
	 * mapping, or a null parsed value are skipped (logged at debug level), not failed.
	 * All surviving rows are executed in one transaction and committed at the end;
	 * auto-commit is restored afterwards if it was enabled on entry.
	 *
	 * @param events the Flume events to persist; must not be null
	 * @throws SQLException if parsing or persisting any event fails; the underlying
	 *         exception is preserved as the cause
	 */
	public void upsertEvents(List<Event> events) throws SQLException {
		if (events == null) {
			throw new NullPointerException("events list must not be null");
		}
		if (connection == null) {
			throw new NullPointerException("connection must not be null");
		}
		if (this.upsertStatement == null) {
			throw new NullPointerException("upsertStatement must not be null");
		}

		boolean wasAutoCommit = connection.getAutoCommit();
		connection.setAutoCommit(false);
		try (PreparedStatement colUpsert = connection.prepareStatement(upsertStatement)) {
			for (Event event : events) {
				byte[] payloadBytes = event.getBody();
				if (payloadBytes == null || payloadBytes.length == 0) {
					continue; // nothing to persist for this event
				}
				// Decode explicitly as UTF-8; the no-arg String(byte[]) constructor uses the
				// platform-default charset, which varies across JVMs/OSes.
				String payload = new String(payloadBytes, java.nio.charset.StandardCharsets.UTF_8);
				CSVRecord csvRecord = csvLineParser.parse(payload);
				if (colNames.size() != csvRecord.size()) {
					logger.debug("payload data {} doesn't match the fields mapping {} ", payload, colNames);
					continue;
				}
				Map<String, String> data = new HashMap<>();
				for (int i = 0; i < csvRecord.size(); i++) {
					data.put(colNames.get(i), csvRecord.get(i));
				}
				if (data.values().contains(null)) {
					logger.debug("payload data {} doesn't match the fields mapping {} ", payload, colNames);
					continue;
				}

				int index = 1;   // 1-based JDBC parameter index
				int offset = 0;  // position into columnMetadata, shared by the three sections below

				// 1) CSV data columns.
				for (int i = 0; i < colNames.size(); i++, offset++) {
					if (columnMetadata[offset] == null) {
						continue; // unresolved column: skip without consuming a parameter slot
					}
					String value = data.get(colNames.get(i));
					int sqlType = columnMetadata[offset].getSqlType();
					PDataType pDataType = PDataType.fromTypeId(sqlType);
					Object upsertValue;
					if (pDataType.isArrayType()) {
						// Split on the configured delimiter, round-trip through JSON to obtain
						// typed elements, then build a SQL ARRAY of the base element type.
						String arrayJson = Arrays.toString(value.split(csvArrayDelimiter));
						JSONArray jsonArray = new JSONArray(new JSONTokener(arrayJson));
						Object[] vals = new Object[jsonArray.length()];
						for (int x = 0; x < jsonArray.length(); x++) {
							vals[x] = jsonArray.get(x);
						}
						String baseTypeSqlName = PDataType.arrayBaseType(pDataType).getSqlTypeName();
						Array array = connection.createArrayOf(baseTypeSqlName, vals);
						upsertValue = pDataType.toObject(array, pDataType);
					} else {
						upsertValue = pDataType.toObject(value);
					}
					if (upsertValue != null) {
						colUpsert.setObject(index++, upsertValue, sqlType);
					} else {
						colUpsert.setNull(index++, sqlType);
					}
				}

				// 2) Flume header columns, if configured.
				// NOTE(review): unlike the CSV section, this assumes columnMetadata[offset]
				// is non-null for every header slot — confirm against how columnMetadata
				// is populated at initialize time.
				Map<String, String> headerValues = event.getHeaders();
				for (int i = 0; i < headers.size(); i++, offset++) {
					String headerValue = headerValues.get(headers.get(i));
					int sqlType = columnMetadata[offset].getSqlType();
					Object upsertValue = PDataType.fromTypeId(sqlType).toObject(headerValue);
					if (upsertValue != null) {
						colUpsert.setObject(index++, upsertValue, sqlType);
					} else {
						colUpsert.setNull(index++, sqlType);
					}
				}

				// 3) Auto-generated row key, if configured.
				if (autoGenerateKey) {
					int sqlType = columnMetadata[offset].getSqlType();
					String generatedRowValue = this.keyGenerator.generate();
					Object rowkeyValue = PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
					colUpsert.setObject(index++, rowkeyValue, sqlType);
				}
				colUpsert.execute();
			}
			connection.commit();
		} catch (Exception ex) {
			// Log the full stack trace and preserve the original exception as the cause
			// rather than discarding it behind getMessage().
			logger.error("An error occurred during persisting the event ", ex);
			throw new SQLException(ex.getMessage(), ex);
		} finally {
			if (wasAutoCommit) {
				connection.setAutoCommit(true);
			}
		}

	}