in src/java/org/apache/nutch/tools/warc/WARCExporter.java [115:404]
/**
 * Emits the WARC records for a single URL.
 *
 * <p>The values grouped under {@code key} are scanned for the fetched
 * {@link Content}, its fetch {@link CrawlDatum} and, optionally, its
 * {@link ParseData} and {@link ParseText}. If content or fetch datum is
 * missing, if the URL is not a valid URI, or if
 * {@code ONLY_SUCCESSFUL_RESPONSES} is enabled and the fetch was neither
 * successful nor "not modified", nothing is written and a counter in the
 * {@code WARCExporter} group is incremented. Otherwise up to three records
 * are written:
 * <ol>
 * <li>a {@code response} record if verbatim HTTP headers were stored under
 * the {@code _response.headers_} content metadata key, else a
 * {@code resource} record;</li>
 * <li>a {@code metadata} record holding content and parse metadata as JSON,
 * if parse data is present;</li>
 * <li>a {@code conversion} record holding the extracted text, if parse text
 * is present.</li>
 * </ol>
 * The secondary records point back to the main record via
 * {@code WARC-Refers-To}.
 *
 * @param key the URL
 * @param values the segment data collected for that URL
 * @param context the reducer context the records are written to
 * @throws IOException if serializing a record fails
 * @throws InterruptedException if writing to the context is interrupted
 */
public void reduce(Text key, Iterable<NutchWritable> values,
    Context context) throws IOException, InterruptedException {
  boolean onlySuccessfulResponses = context.getConfiguration()
      .getBoolean(ONLY_SUCCESSFUL_RESPONSES, false);
  ParseData parseData = null;
  ParseText parseText = null;
  Content content = null;
  CrawlDatum cd = null;
  // aggregate the values found
  for (NutchWritable val : values) {
    final Writable value = val.get(); // unwrap
    if (value instanceof Content) {
      content = (Content) value;
    } else if (value instanceof CrawlDatum) {
      cd = (CrawlDatum) value;
    } else if (value instanceof ParseData) {
      parseData = (ParseData) value;
    } else if (value instanceof ParseText) {
      parseText = (ParseText) value;
    }
  }

  // check that we have everything we need
  if (content == null) {
    LOG.info("Missing content for {}", key);
    context.getCounter("WARCExporter", "missing content").increment(1);
    return;
  }
  if (cd == null) {
    LOG.info("Missing fetch datum for {}", key);
    context.getCounter("WARCExporter", "missing metadata").increment(1);
    return;
  }
  byte status = cd.getStatus();
  if (onlySuccessfulResponses && status != CrawlDatum.STATUS_FETCH_SUCCESS
      && status != CrawlDatum.STATUS_FETCH_NOTMODIFIED) {
    // everything that was not a regular response counts as empty
    context.getCounter("WARCExporter", "omitted empty response")
        .increment(1);
    return;
  }

  // WARC-Target-URI must be a valid URI; computed once and shared by all
  // records emitted for this key
  String targetUri;
  try {
    targetUri = URI.create(key.toString().replaceAll(" ", "%20"))
        .toASCIIString();
  } catch (IllegalArgumentException e) {
    LOG.error("Invalid URI {} ", key, e);
    context.getCounter("WARCExporter", "invalid URI").increment(1);
    return;
  }

  // The pattern ends in a literal 'Z' (UTC), so the formatter must not use
  // the JVM default time zone.
  SimpleDateFormat warcdf = new SimpleDateFormat(
      "yyyy-MM-dd'T'HH:mm:ss'Z'", Locale.ENGLISH);
  warcdf.setTimeZone(java.util.TimeZone.getTimeZone("UTC"));
  String warcDate = warcdf.format(new Date(cd.getFetchTime()));

  byte[] payloadBytes = content.getContent();
  if (payloadBytes == null) {
    payloadBytes = new byte[0];
  }

  // were the headers stored as is? Can write a response element then
  String headersVerbatim = WARCUtils.fixHttpHeaders(
      content.getMetadata().get("_response.headers_"), payloadBytes.length);
  boolean hasVerbatimHeaders = StringUtils.isNotBlank(headersVerbatim);
  byte[] httpheaders = new byte[0];
  if (hasVerbatimHeaders) {
    // the HTTP header block must end with an empty line
    if (!headersVerbatim.endsWith(CRLF + CRLF)) {
      headersVerbatim += CRLF + CRLF;
    }
    httpheaders = headersVerbatim.getBytes("UTF-8");
  }
  // without verbatim HTTP headers only a resource record can be written
  String warcTypeValue = hasVerbatimHeaders ? "response" : "resource";

  String mainId = UUID.randomUUID().toString();
  StringBuilder buffer = new StringBuilder();
  buffer.append(WARCRecord.WARC_VERSION).append(CRLF);
  buffer.append("WARC-Record-ID: <urn:uuid:").append(mainId).append(">")
      .append(CRLF);
  // the record block holds the HTTP headers (if any) and the raw content
  buffer.append("Content-Length: ")
      .append(payloadBytes.length + httpheaders.length).append(CRLF);
  buffer.append("WARC-Date: ").append(warcDate).append(CRLF);
  buffer.append("WARC-Type: ").append(warcTypeValue).append(CRLF);
  String ip = content.getMetadata().get("_ip_");
  if (StringUtils.isNotBlank(ip)) {
    buffer.append("WARC-IP-Address: ").append(ip).append(CRLF);
  }
  // detect if truncated only for fetch success
  if (status == CrawlDatum.STATUS_FETCH_SUCCESS
      && ParseSegment.isTruncated(content)) {
    buffer.append("WARC-Truncated: unspecified").append(CRLF);
  }
  buffer.append("WARC-Target-URI: ").append(targetUri).append(CRLF);
  if (hasVerbatimHeaders) {
    buffer.append("Content-Type: application/http; msgtype=response")
        .append(CRLF);
  }

  ByteArrayOutputStream bos = new ByteArrayOutputStream();
  bos.write(buffer.toString().getBytes("UTF-8"));
  bos.write(CRLF_BYTES); // end of the WARC headers
  bos.write(httpheaders);
  bos.write(payloadBytes);
  bos.write(CRLF_BYTES);
  bos.write(CRLF_BYTES); // separation between records
  emitWarcRecord(context, key, bos.toByteArray(), warcTypeValue);

  // Do we need to emit a metadata record too?
  if (parseData != null) {
    JsonObject jsonObject = new JsonObject();
    jsonObject.add("contentMeta",
        metadataToJson(parseData.getContentMeta()));
    jsonObject.add("parseMeta", metadataToJson(parseData.getParseMeta()));
    byte[] record = buildRefersToRecord(mainId, warcDate, "metadata",
        "application/json", targetUri, gson.toJson(jsonObject) + CRLF);
    emitWarcRecord(context, key, record, "metadata");
  }

  // Do we need to emit a text record too?
  if (parseText != null) {
    byte[] record = buildRefersToRecord(mainId, warcDate, "conversion",
        "text/plain", targetUri, parseText + CRLF);
    emitWarcRecord(context, key, record, "conversion");
  }
}

/**
 * Serializes a WARC record derived from the main record of a URL (such as a
 * metadata or conversion record). The record points back to the main record
 * via {@code WARC-Refers-To}.
 *
 * @param refersToId UUID of the main record
 * @param warcDate formatted value of the WARC-Date header
 * @param warcType value of the WARC-Type header
 * @param contentType MIME type of the payload
 * @param targetUri value of the WARC-Target-URI header
 * @param payload record payload, encoded as UTF-8
 * @return the serialized record, including the trailing record separator
 * @throws IOException if writing to the in-memory buffer fails
 */
private byte[] buildRefersToRecord(String refersToId, String warcDate,
    String warcType, String contentType, String targetUri, String payload)
    throws IOException {
  byte[] payloadBytes = payload.getBytes("UTF-8");
  StringBuilder header = new StringBuilder();
  header.append(WARCRecord.WARC_VERSION).append(CRLF);
  header.append("WARC-Record-ID: <urn:uuid:")
      .append(UUID.randomUUID().toString()).append(">").append(CRLF);
  header.append("WARC-Refers-To: <urn:uuid:").append(refersToId)
      .append(">").append(CRLF);
  header.append("WARC-Date: ").append(warcDate).append(CRLF);
  header.append("WARC-Type: ").append(warcType).append(CRLF);
  header.append("Content-Type: ").append(contentType).append(CRLF);
  header.append("Content-Length: ").append(payloadBytes.length)
      .append(CRLF);
  header.append("WARC-Target-URI: ").append(targetUri).append(CRLF);
  ByteArrayOutputStream bos = new ByteArrayOutputStream();
  bos.write(header.toString().getBytes("UTF-8"));
  bos.write(CRLF_BYTES); // separate header and payload
  bos.write(payloadBytes);
  bos.write(CRLF_BYTES);
  bos.write(CRLF_BYTES); // separation between records
  return bos.toByteArray();
}

/**
 * Parses a serialized WARC record and writes it to the context. Parse or
 * write failures signalled by IOException/IllegalStateException are logged
 * and counted but do not abort processing of the current key.
 *
 * @param context the reducer context
 * @param key the URL the record belongs to (used for logging)
 * @param recordBytes the serialized record
 * @param recordType record kind, used in the log message only
 * @throws InterruptedException if writing to the context is interrupted
 */
private void emitWarcRecord(Context context, Text key, byte[] recordBytes,
    String recordType) throws InterruptedException {
  try {
    DataInput in = new DataInputStream(new ByteArrayInputStream(recordBytes));
    WARCRecord record = new WARCRecord(in);
    context.write(NullWritable.get(), new WARCWritable(record));
    context.getCounter("WARCExporter", "records generated").increment(1);
  } catch (IOException | IllegalStateException exception) {
    LOG.error("Exception when generating WARC {} record for {} : {}",
        recordType, key, exception.getMessage(), exception);
    context.getCounter("WARCExporter", "exception").increment(1);
  }
}