phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/CsvEventSerializer.java [136:181]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
						Object[] vals = new Object[jsonArray.length()];
						for (int x = 0; x < jsonArray.length(); x++) {
							vals[x] = jsonArray.get(x);
						}
						String baseTypeSqlName = PDataType.arrayBaseType(pDataType).getSqlTypeName();
						Array array = connection.createArrayOf(baseTypeSqlName, vals);
						upsertValue = pDataType.toObject(array, pDataType);
					} else {
						upsertValue = pDataType.toObject(value);
					}
					if (upsertValue != null) {
						colUpsert.setObject(index++, upsertValue, sqlType);
					} else {
						colUpsert.setNull(index++, sqlType);
					}
				}

				// add headers if necessary
				Map<String, String> headerValues = event.getHeaders();
				for (int i = 0; i < headers.size(); i++, offset++) {
					String headerName = headers.get(i);
					String headerValue = headerValues.get(headerName);
					sqlType = columnMetadata[offset].getSqlType();
					Object upsertValue = PDataType.fromTypeId(sqlType).toObject(headerValue);
					if (upsertValue != null) {
						colUpsert.setObject(index++, upsertValue, sqlType);
					} else {
						colUpsert.setNull(index++, sqlType);
					}
				}

				if (autoGenerateKey) {
					sqlType = columnMetadata[offset].getSqlType();
					String generatedRowValue = this.keyGenerator.generate();
					Object rowkeyValue = PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
					colUpsert.setObject(index++, rowkeyValue, sqlType);
				}
				colUpsert.execute();
			}
			connection.commit();
		} catch (Exception ex) {
			logger.error("An error {} occurred during persisting the event ", ex.getMessage());
			throw new SQLException(ex.getMessage());
		} finally {
			if (wasAutoCommit) {
				connection.setAutoCommit(true);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/JsonEventSerializer.java [168:213]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
						Object[] vals = new Object[jsonArray.length()];
						for (int x = 0; x < jsonArray.length(); x++) {
							vals[x] = jsonArray.get(x);
						}
						String baseTypeSqlName = PDataType.arrayBaseType(pDataType).getSqlTypeName();
						Array array = connection.createArrayOf(baseTypeSqlName, vals);
						upsertValue = pDataType.toObject(array, pDataType);
					} else {
						upsertValue = pDataType.toObject(value);
					}
					if (upsertValue != null) {
						colUpsert.setObject(index++, upsertValue, sqlType);
					} else {
						colUpsert.setNull(index++, sqlType);
					}
				}

				// add headers if necessary
				Map<String, String> headerValues = event.getHeaders();
				for (int i = 0; i < headers.size(); i++, offset++) {
					String headerName = headers.get(i);
					String headerValue = headerValues.get(headerName);
					sqlType = columnMetadata[offset].getSqlType();
					Object upsertValue = PDataType.fromTypeId(sqlType).toObject(headerValue);
					if (upsertValue != null) {
						colUpsert.setObject(index++, upsertValue, sqlType);
					} else {
						colUpsert.setNull(index++, sqlType);
					}
				}

				if (autoGenerateKey) {
					sqlType = columnMetadata[offset].getSqlType();
					String generatedRowValue = this.keyGenerator.generate();
					Object rowkeyValue = PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
					colUpsert.setObject(index++, rowkeyValue, sqlType);
				}
				colUpsert.execute();
			}
			connection.commit();
		} catch (Exception ex) {
			logger.error("An error {} occurred during persisting the event ", ex.getMessage());
			throw new SQLException(ex.getMessage());
		} finally {
			if (wasAutoCommit) {
				connection.setAutoCommit(true);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



