public void run()

in metron-platform/metron-data-management/src/main/java/org/apache/metron/dataloads/nonbulk/taxii/TaxiiHandler.java [166:249]


  /**
   * Executes a single poll cycle against the configured TAXII collection.
   *
   * <p>The method returns immediately when {@code inProgress} is already set. Otherwise it
   * builds a poll request for {@code collection} (using {@code subscriptionId} when present,
   * default poll parameters otherwise), restricts it to content newer than {@code beginTime}
   * when one is recorded, and sends it via {@code call}. Every DOM {@link Element} found in the
   * returned content blocks is serialized to XML and handed to {@code extractor}; each extracted
   * key/value whose type passes the {@code allowedIndicatorTypes} filter (an empty filter allows
   * everything) is tagged with source metadata and written as a {@code Put} to the table
   * returned by {@code getTable(hbaseTable)}.
   *
   * <p>On exit (normal or exceptional) {@code inProgress} is cleared and {@code beginTime} is
   * set to the time this cycle started.
   *
   * @throws RuntimeException if sending the request or processing the response fails; the
   *         original exception is logged and preserved as the cause
   */
  public void run() {
    // NOTE(review): this check-then-set on inProgress is not atomic; whether that matters
    // depends on how run() is scheduled - confirm against the caller.
    if (inProgress) {
      return;
    }
    Date ts = new Date();
    LOG.info("Polling...{}", new SimpleDateFormat().format(ts));
    try {
      inProgress = true;
      // Prepare the message to send.
      String sessionID = MessageHelper.generateMessageId();
      PollRequest request = messageFactory.get().createPollRequest()
          .withMessageId(sessionID)
          .withCollectionName(collection);
      if (subscriptionId != null) {
        request = request.withSubscriptionID(subscriptionId);
      } else {
        request = request.withPollParameters(messageFactory.get().createPollParametersType());
      }
      if (beginTime != null) {
        // Construct a GregorianCalendar explicitly: Calendar.getInstance() may return a
        // non-Gregorian calendar for some default locales, which made the former cast fail.
        GregorianCalendar gc = new GregorianCalendar();
        gc.setTime(beginTime);
        XMLGregorianCalendar gTime = null;
        try {
          gTime = DatatypeFactory.newInstance().newXMLGregorianCalendar(gc).normalize();
        } catch (DatatypeConfigurationException e) {
          RuntimeErrors.ILLEGAL_STATE.throwRuntime("Unable to set the begin time due to", e);
        }
        // Drop sub-second precision before using the value as the exclusive lower bound.
        gTime.setFractionalSecond(null);
        LOG.info("Begin Time: {}", gTime);
        request.setExclusiveBeginTimestamp(gTime);
      }

      try {
        PollResponse response = call(request, PollResponse.class);
        LOG.info("Got Poll Response with {} blocks", response.getContentBlocks().size());
        int numProcessed = 0;
        long avgTimeMS = 0;
        long timeStartedBlock = System.currentTimeMillis();
        for (ContentBlock block : response.getContentBlocks()) {
          AnyMixedContentType content = block.getContent();
          for (Object o : content.getContent()) {
            numProcessed++;
            long timeS = System.currentTimeMillis();
            // Only DOM elements are processed; any other content object is counted but skipped.
            if (o instanceof Element) {
              Element element = (Element) o;
              String xml = getStringFromDocument(element.getOwnerDocument());
              // Sample roughly 1% of documents into the debug log.
              if (LOG.isDebugEnabled() && Math.random() < 0.01) {
                LOG.debug("Random Stix doc: {}", xml);
              }
              for (LookupKV<EnrichmentKey, EnrichmentValue> kv : extractor.extract(xml)) {
                if (allowedIndicatorTypes.isEmpty()
                    || allowedIndicatorTypes.contains(kv.getKey().type)) {
                  kv.getValue().getMetadata().put("source_type", "taxii");
                  kv.getValue().getMetadata().put("taxii_url", endpoint.toString());
                  kv.getValue().getMetadata().put("taxii_collection", collection);
                  Put p = converter.toPut(columnFamily, kv.getKey(), kv.getValue());
                  Table table = getTable(hbaseTable);
                  table.put(p);
                  // Two placeholders so the value is actually rendered alongside the key.
                  LOG.info("Found Threat Intel: {} => {}", kv.getKey(), kv.getValue());
                }
              }
            }
            // Accumulates total elapsed ms since the last reset; divided into an average below.
            avgTimeMS += System.currentTimeMillis() - timeS;
          }
          if ((numProcessed + 1) % 100 == 0) {
            // Average over the items actually counted since the last reset. numProcessed is
            // non-zero whenever the condition above holds, so this cannot divide by zero
            // (dividing by the current block's size could, and gave a skewed average).
            LOG.info("Processed {}  in {} ms, avg time: {}", numProcessed, System.currentTimeMillis() - timeStartedBlock, avgTimeMS / numProcessed);
            timeStartedBlock = System.currentTimeMillis();
            avgTimeMS = 0;
            numProcessed = 0;
          }
        }
      } catch (Exception e) {
        LOG.error(e.getMessage(), e);
        throw new RuntimeException("Unable to make request", e);
      }
    } finally {
      inProgress = false;
      // NOTE(review): beginTime advances even when the poll failed, so content published
      // during a failed cycle's window is not requested again - confirm this is intended.
      beginTime = ts;
    }
  }