public void upsertEvents()

in phoenix5-flume/src/main/java/org/apache/phoenix/flume/serializer/RegexEventSerializer.java [66:148]


    public void upsertEvents(List<Event> events) throws SQLException {
        if (events == null){
            throw new NullPointerException();
        }
        if (connection == null){
            throw new NullPointerException();
        }
        if (this.upsertStatement == null){
            throw new NullPointerException();
        }

       boolean wasAutoCommit = connection.getAutoCommit();
       connection.setAutoCommit(false);
       try (PreparedStatement colUpsert = connection.prepareStatement(upsertStatement)) {
           String value = null;
           Integer sqlType = null;
           for(Event event : events) {
               byte [] payloadBytes = event.getBody();
               if(payloadBytes == null || payloadBytes.length == 0) {
                   continue;
               }
               String payload = new String(payloadBytes);
               Matcher m = inputPattern.matcher(payload.trim());
               
               if (!m.matches()) {
                 logger.debug("payload {} doesn't match the pattern {} ", payload, inputPattern.toString());  
                 continue;
               }
               if (m.groupCount() != colNames.size()) {
                 logger.debug("payload {} size doesn't match the pattern {} ", m.groupCount(), colNames.size());
                 continue;
               }
               int index = 1 ;
               int offset = 0;
               for (int i = 0 ; i <  colNames.size() ; i++,offset++) {
                   if (columnMetadata[offset] == null ) {
                       continue;
                   }
                   
                   value = m.group(i + 1);
                   sqlType = columnMetadata[offset].getSqlType();
                   Object upsertValue = PDataType.fromTypeId(sqlType).toObject(value);
                   if (upsertValue != null) {
                       colUpsert.setObject(index++, upsertValue, sqlType);
                   } else {
                       colUpsert.setNull(index++, sqlType);
                   }
                }
               
               //add headers if necessary
               Map<String,String> headerValues = event.getHeaders();
               for(int i = 0 ; i < headers.size() ; i++ , offset++) {
                
                   String headerName  = headers.get(i);
                   String headerValue = headerValues.get(headerName);
                   sqlType = columnMetadata[offset].getSqlType();
                   Object upsertValue = PDataType.fromTypeId(sqlType).toObject(headerValue);
                   if (upsertValue != null) {
                       colUpsert.setObject(index++, upsertValue, sqlType);
                   } else {
                       colUpsert.setNull(index++, sqlType);
                   }
               }
  
               if(autoGenerateKey) {
                   sqlType = columnMetadata[offset].getSqlType();
                   String generatedRowValue = this.keyGenerator.generate();
                   Object rowkeyValue = PDataType.fromTypeId(sqlType).toObject(generatedRowValue);
                   colUpsert.setObject(index++, rowkeyValue ,sqlType);
               } 
               colUpsert.execute();
           }
           connection.commit();
       } catch(Exception ex){
           logger.error("An error {} occurred during persisting the event ",ex.getMessage());
           throw new SQLException(ex.getMessage());
       } finally {
           if(wasAutoCommit) {
               connection.setAutoCommit(true);
           }
       }
       
    }