public void reduce()

in src/java/org/apache/nutch/tools/warc/WARCExporter.java [115:404]


      /**
       * Turns everything collected for one URL into WARC records and writes
       * them to the job output.
       *
       * <p>
       * The values are scanned for a {@link Content}, a {@link CrawlDatum}, a
       * {@link ParseData} and a {@link ParseText}. A main record is always
       * attempted: type <code>response</code> when HTTP headers are available
       * after {@code WARCUtils.fixHttpHeaders}, type <code>resource</code>
       * otherwise. If parse data is present a <code>metadata</code> record
       * (JSON) is emitted, and if parse text is present a
       * <code>conversion</code> record (plain text) is emitted; both refer to
       * the main record through <code>WARC-Refers-To</code>.
       * </p>
       *
       * <p>
       * Nothing is emitted, and a counter in the group "WARCExporter" is
       * incremented instead, when the content or the fetch datum is missing,
       * when {@code ONLY_SUCCESSFUL_RESPONSES} is set and the fetch status is
       * neither success nor not-modified, or when the key cannot be turned
       * into a valid URI.
       * </p>
       *
       * @param key
       *          the URL the values belong to
       * @param values
       *          the wrapped segment parts found for that URL
       * @param context
       *          the reducer context used for configuration, counters and
       *          output
       * @throws IOException
       *           if serialising a record into the in-memory buffer fails
       * @throws InterruptedException
       *           if writing to the context is interrupted
       */
      public void reduce(Text key, Iterable<NutchWritable> values,
          Context context) throws IOException, InterruptedException {
        boolean onlySuccessfulResponses = context.getConfiguration()
            .getBoolean(ONLY_SUCCESSFUL_RESPONSES, false);
        ParseData parseData = null;
        ParseText parseText = null;
        Content content = null;
        CrawlDatum cd = null;

        // The pattern ends with a literal 'Z' (UTC designator), so the
        // formatter has to be pinned to UTC; otherwise the local time of the
        // JVM would be written out and mislabelled as UTC.
        SimpleDateFormat warcdf = new SimpleDateFormat(
            "yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ENGLISH);
        warcdf.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));

        // aggregate the values found
        for (NutchWritable val : values) {
          final Writable value = val.get(); // unwrap
          if (value instanceof Content) {
            content = (Content) value;
          } else if (value instanceof CrawlDatum) {
            cd = (CrawlDatum) value;
          } else if (value instanceof ParseData) {
            parseData = (ParseData) value;
          } else if (value instanceof ParseText) {
            parseText = (ParseText) value;
          }
        }

        // check that we have everything we need
        if (content == null) {
          LOG.info("Missing content for {}", key);
          context.getCounter("WARCExporter", "missing content").increment(1);
          return;
        }

        if (cd == null) {
          LOG.info("Missing fetch datum for {}", key);
          context.getCounter("WARCExporter", "missing metadata").increment(1);
          return;
        }

        if (onlySuccessfulResponses) {
          // Empty responses is everything that was not a regular response
          boolean regularResponse = cd
              .getStatus() == CrawlDatum.STATUS_FETCH_SUCCESS
              || cd.getStatus() == CrawlDatum.STATUS_FETCH_NOTMODIFIED;
          if (!regularResponse) {
            context.getCounter("WARCExporter", "omitted empty response")
                .increment(1);
            return;
          }
        }

        // WARC-Target-URI must be a valid URI. It is identical for every
        // record of this key, so it is validated once, before any record is
        // built or written.
        final String targetUri;
        try {
          targetUri = URI.create(key.toString().replace(" ", "%20"))
              .toASCIIString();
        } catch (IllegalArgumentException e) {
          LOG.error("Invalid URI {} ", key);
          context.getCounter("WARCExporter", "invalid URI").increment(1);
          return;
        }

        // Guard once against a missing payload so that the length passed to
        // fixHttpHeaders, the Content-Length header and the bytes written
        // below always agree.
        byte[] body = content.getContent();
        int bodyLength = (body == null) ? 0 : body.length;

        // were the headers stored as is? Can write a response element then
        String headersVerbatim = content.getMetadata()
            .get("_response.headers_");
        headersVerbatim = WARCUtils.fixHttpHeaders(headersVerbatim,
            bodyLength);
        boolean hasHttpHeaders = StringUtils.isNotBlank(headersVerbatim);
        byte[] httpheaders = new byte[0];
        if (hasHttpHeaders) {
          // check that ends with an empty line
          if (!headersVerbatim.endsWith(CRLF + CRLF)) {
            headersVerbatim += CRLF + CRLF;
          }
          // explicit charset: do not depend on the platform default
          httpheaders = headersVerbatim.getBytes("UTF-8");
        }

        // if the http headers are available write a response record,
        // otherwise fall back to a resource record
        String warcTypeValue = hasHttpHeaders ? "response" : "resource";
        String warcDate = warcdf.format(new Date(cd.getFetchTime()));
        String mainId = UUID.randomUUID().toString();

        StringBuilder buffer = new StringBuilder();
        buffer.append(WARCRecord.WARC_VERSION).append(CRLF);
        buffer.append("WARC-Record-ID").append(": ").append("<urn:uuid:")
            .append(mainId).append(">").append(CRLF);

        // the record block is the http header followed by the payload
        int contentLength = bodyLength + httpheaders.length;
        buffer.append("Content-Length").append(": ")
            .append(Integer.toString(contentLength)).append(CRLF);

        buffer.append("WARC-Date").append(": ").append(warcDate)
            .append(CRLF);

        buffer.append("WARC-Type").append(": ").append(warcTypeValue)
            .append(CRLF);

        // "WARC-IP-Address" if present
        String IP = content.getMetadata().get("_ip_");
        if (StringUtils.isNotBlank(IP)) {
          buffer.append("WARC-IP-Address").append(": ").append(IP)
              .append(CRLF);
        }

        // detect if truncated only for fetch success; compare the status
        // code itself rather than its display name
        if (cd.getStatus() == CrawlDatum.STATUS_FETCH_SUCCESS
            && ParseSegment.isTruncated(content)) {
          buffer.append("WARC-Truncated").append(": ").append("unspecified")
              .append(CRLF);
        }

        buffer.append("WARC-Target-URI").append(": ").append(targetUri)
            .append(CRLF);

        // provide a ContentType if type response
        if (hasHttpHeaders) {
          buffer.append("Content-Type: application/http; msgtype=response")
              .append(CRLF);
        }

        // finished writing the WARC headers, now let's serialize it
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        bos.write(buffer.toString().getBytes("UTF-8"));
        bos.write(CRLF_BYTES); // separate header and block
        bos.write(httpheaders);
        if (body != null) {
          bos.write(body);
        }
        bos.write(CRLF_BYTES);
        bos.write(CRLF_BYTES); // separation between records

        emitWarcRecord(key, bos.toByteArray(), warcTypeValue, context);

        // Do we need to emit a metadata record too?
        if (parseData != null) {
          JsonObject jsonObject = new JsonObject();
          jsonObject.add("contentMeta",
              metadataToJson(parseData.getContentMeta()));
          jsonObject.add("parseMeta", metadataToJson(parseData.getParseMeta()));

          String payload = gson.toJson(jsonObject) + CRLF;
          emitWarcRecord(key, serializeReferringRecord(mainId, warcDate,
              "metadata", "application/json", targetUri, payload), "metadata",
              context);
        }

        // Do we need to emit a text record too?
        if (parseText != null) {
          String payload = parseText.toString() + CRLF;
          emitWarcRecord(key, serializeReferringRecord(mainId, warcDate,
              "conversion", "text/plain", targetUri, payload), "conversion",
              context);
        }
      }

      /**
       * Serialises a secondary WARC record (one that points back to the main
       * record via <code>WARC-Refers-To</code>) into its byte representation:
       * headers, an empty line, the UTF-8 payload and the record separator.
       *
       * @param mainId
       *          UUID of the main record this record refers to
       * @param warcDate
       *          already formatted value for <code>WARC-Date</code>
       * @param warcType
       *          value for <code>WARC-Type</code>
       * @param contentType
       *          value for <code>Content-Type</code>
       * @param targetUri
       *          already validated value for <code>WARC-Target-URI</code>
       * @param payload
       *          the record block; its UTF-8 length becomes
       *          <code>Content-Length</code>
       * @return the serialised record
       * @throws IOException
       *           if writing to the in-memory buffer fails
       */
      private byte[] serializeReferringRecord(String mainId, String warcDate,
          String warcType, String contentType, String targetUri,
          String payload) throws IOException {
        byte[] payloadBytes = payload.getBytes("UTF-8");

        StringBuilder header = new StringBuilder();
        header.append(WARCRecord.WARC_VERSION).append(CRLF);
        header.append("WARC-Record-ID").append(": ").append("<urn:uuid:")
            .append(UUID.randomUUID().toString()).append(">").append(CRLF);
        header.append("WARC-Refers-To").append(": ").append("<urn:uuid:")
            .append(mainId).append(">").append(CRLF);
        header.append("WARC-Date").append(": ").append(warcDate).append(CRLF);
        header.append("WARC-Type").append(": ").append(warcType).append(CRLF);
        header.append("Content-Type").append(": ").append(contentType)
            .append(CRLF);
        header.append("Content-Length").append(": ")
            .append(Integer.toString(payloadBytes.length)).append(CRLF);
        header.append("WARC-Target-URI").append(": ").append(targetUri)
            .append(CRLF);

        ByteArrayOutputStream out = new ByteArrayOutputStream();
        out.write(header.toString().getBytes("UTF-8"));
        out.write(CRLF_BYTES); // separate header and payload
        out.write(payloadBytes);
        out.write(CRLF_BYTES);
        out.write(CRLF_BYTES); // separation between records
        return out.toByteArray();
      }

      /**
       * Parses a serialised record into a {@link WARCRecord} and writes it to
       * the context. A failure to parse or write is logged and counted under
       * "exception" instead of being propagated, so the remaining records of
       * the same URL are still attempted.
       *
       * @param key
       *          the URL the record belongs to, used for logging only
       * @param serialized
       *          the complete record bytes
       * @param recordType
       *          the WARC type of the record, used for logging only
       * @param context
       *          the reducer context to write to and to count on
       * @throws InterruptedException
       *           if writing to the context is interrupted
       */
      private void emitWarcRecord(Text key, byte[] serialized,
          String recordType, Context context) throws InterruptedException {
        try {
          DataInput in = new DataInputStream(
              new ByteArrayInputStream(serialized));
          WARCRecord record = new WARCRecord(in);
          context.write(NullWritable.get(), new WARCWritable(record));
          context.getCounter("WARCExporter", "records generated").increment(1);
        } catch (IOException | IllegalStateException exception) {
          LOG.error("Exception when generating WARC {} record for {} : {}",
              recordType, key, exception.getMessage(), exception);
          context.getCounter("WARCExporter", "exception").increment(1);
        }
      }