in hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/mapreduce/MapperOperatorDescriptor.java [74:270]
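// Builds the map-side push runtime: it wires up the Hadoop Mapper, InputFormat and
// input-split provider, defines a RecordWriter that externally sorts each split's
// output, and returns a source operator that runs the mapper over every split and
// pushes the sorted, block-tagged frames downstream.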
public IOperatorNodePushable createPushRuntime(final IHyracksTaskContext ctx,
IRecordDescriptorProvider recordDescProvider, final int partition, final int nPartitions)
throws HyracksDataException {
final HadoopHelper helper = new HadoopHelper(config);
final Configuration conf = helper.getConfiguration();
final Mapper<K1, V1, K2, V2> mapper = helper.getMapper();
final InputFormat<K1, V1> inputFormat = helper.getInputFormat();
final IInputSplitProvider isp = factory.createInputSplitProvider(partition);
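// Synthetic task attempt id: "foo" is a placeholder jtIdentifier and the boolean
// 'true' in this (older, deprecated) TaskAttemptID constructor marks it as a map task.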
final TaskAttemptID taId = new TaskAttemptID("foo", jobId, true, partition, 0);
final TaskAttemptContext taskAttemptContext = helper.createTaskAttemptContext(taId);
final int framesLimit = helper.getSortFrameLimit(ctx);
final IBinaryComparatorFactory[] comparatorFactories = helper.getSortComparatorFactories();
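// Hadoop RecordWriter that collects the mapper's (key, value) output into Hyracks
// frames and feeds them to an external-sort run generator, one sort per input split
// ("block").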
class SortingRecordWriter extends RecordWriter<K2, V2> {
private final ArrayTupleBuilder tb;
private final IFrame frame;
private final FrameTupleAppender fta;
private ExternalSortRunGenerator runGen;
private int blockId;
public SortingRecordWriter() throws HyracksDataException {
tb = new ArrayTupleBuilder(2);
frame = new VSizeFrame(ctx);
fta = new FrameTupleAppender(frame);
}
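// Starts a fresh sort run generator for the next block (input split), sorting on the
// map output key (field 0).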
public void initBlock(int blockId) throws HyracksDataException {
runGen = new ExternalSortRunGenerator(ctx, new int[] { 0 }, null, comparatorFactories,
helper.getMapOutputRecordDescriptorWithoutExtraFields(), Algorithm.MERGE_SORT, framesLimit);
this.blockId = blockId;
}
@Override
public void close(TaskAttemptContext arg0) throws IOException, InterruptedException {
}
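// Serializes the key and value into a two-field tuple; when the current frame fills
// up, the frame is handed to the run generator and the tuple is retried in a fresh frame.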
@Override
public void write(K2 key, V2 value) throws IOException, InterruptedException {
DataOutput dos = tb.getDataOutput();
tb.reset();
key.write(dos);
tb.addFieldEndOffset();
value.write(dos);
tb.addFieldEndOffset();
if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
runGen.nextFrame(frame.getBuffer());
fta.reset(frame, true);
if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
throw new HyracksDataException("Record size (" + tb.getSize() + ") larger than frame size ("
+ frame.getBuffer().capacity() + ")");
}
}
}
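// Flushes any partially filled frame into the run generator, finishes run generation,
// and merges the sorted runs into a writer that appends the block id to each tuple,
// optionally running the combiner first.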
public void sortAndFlushBlock(final IFrameWriter writer) throws HyracksDataException {
if (fta.getTupleCount() > 0) {
runGen.nextFrame(frame.getBuffer());
fta.reset(frame, true);
}
runGen.close();
IFrameWriter delegatingWriter = new IFrameWriter() {
private final FrameTupleAppender appender = new FrameTupleAppender(new VSizeFrame(ctx));
private final FrameTupleAccessor fta = new FrameTupleAccessor(
helper.getMapOutputRecordDescriptorWithoutExtraFields());
private final ArrayTupleBuilder tb = new ArrayTupleBuilder(3);
@Override
public void open() throws HyracksDataException {
}
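// Copies each sorted (key, value) tuple and appends the current block id as a
// trailing int field before forwarding the frame to the downstream writer.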
@Override
public void nextFrame(ByteBuffer buffer) throws HyracksDataException {
fta.reset(buffer);
int n = fta.getTupleCount();
for (int i = 0; i < n; ++i) {
tb.reset();
tb.addField(fta, i, 0);
tb.addField(fta, i, 1);
try {
tb.getDataOutput().writeInt(blockId);
} catch (IOException e) {
throw new HyracksDataException(e);
}
tb.addFieldEndOffset();
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
appender.flush(writer, true);
if (!appender.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
throw new IllegalStateException("Tuple of size " + tb.getSize()
+ " does not fit into an empty frame");
}
}
}
}
@Override
public void close() throws HyracksDataException {
appender.flush(writer, true);
}
@Override
public void fail() throws HyracksDataException {
// nothing to clean up on failure
}
};
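// When a combiner is configured, wrap the block-id-appending writer in a ReduceWriter
// that groups the sorted map output by key, runs the combiner, and routes the combiner's
// output back into frames through the RecordWriter defined below.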
if (helper.hasCombiner()) {
Reducer<K2, V2, K2, V2> combiner = helper.getCombiner();
TaskAttemptID ctaId = new TaskAttemptID("foo", jobId, true, partition, 0);
TaskAttemptContext ctaskAttemptContext = helper.createTaskAttemptContext(ctaId);
final IFrameWriter outputWriter = delegatingWriter;
RecordWriter<K2, V2> recordWriter = new RecordWriter<K2, V2>() {
private final FrameTupleAppender fta = new FrameTupleAppender(new VSizeFrame(ctx));
private final ArrayTupleBuilder tb = new ArrayTupleBuilder(2);
{
outputWriter.open();
}
@Override
public void write(K2 key, V2 value) throws IOException, InterruptedException {
DataOutput dos = tb.getDataOutput();
tb.reset();
key.write(dos);
tb.addFieldEndOffset();
value.write(dos);
tb.addFieldEndOffset();
if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
fta.flush(outputWriter, true);
if (!fta.append(tb.getFieldEndOffsets(), tb.getByteArray(), 0, tb.getSize())) {
throw new IllegalStateException("Tuple of size " + tb.getSize()
+ " does not fit into an empty frame");
}
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
fta.flush(outputWriter, true);
}
};
delegatingWriter = new ReduceWriter<K2, V2, K2, V2>(ctx, helper,
new int[] { HadoopHelper.KEY_FIELD_INDEX }, helper.getGroupingComparatorFactories(),
helper.getMapOutputRecordDescriptorWithoutExtraFields(), combiner, recordWriter, ctaId,
ctaskAttemptContext);
}
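// Merge the generated runs in key order into the (possibly combining) delegating writer.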
IBinaryComparator[] comparators = new IBinaryComparator[comparatorFactories.length];
for (int i = 0; i < comparatorFactories.length; ++i) {
comparators[i] = comparatorFactories[i].createBinaryComparator();
}
ExternalSortRunMerger merger = new ExternalSortRunMerger(ctx, runGen.getSorter(), runGen.getRuns(),
new int[] { 0 }, comparators, null, helper.getMapOutputRecordDescriptorWithoutExtraFields(),
framesLimit, delegatingWriter);
merger.process();
}
}
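// The source operator: pulls input splits from the split provider, runs the user
// Mapper over each split, and sorts and flushes each split's output as a separate block.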
return new AbstractUnaryOutputSourceOperatorNodePushable() {
@SuppressWarnings("unchecked")
@Override
public void initialize() throws HyracksDataException {
try {
writer.open();
SortingRecordWriter recordWriter = new SortingRecordWriter();
InputSplit split = null;
int blockId = 0;
while ((split = isp.next()) != null) {
try {
RecordReader<K1, V1> recordReader = inputFormat.createRecordReader(split,
taskAttemptContext);
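// Initialize the reader under this class's loader so classes referenced by the split
// resolve against the operator's classpath, then restore the task's context class loader.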
ClassLoader ctxCL = Thread.currentThread().getContextClassLoader();
try {
Thread.currentThread().setContextClassLoader(getClass().getClassLoader());
recordReader.initialize(split, taskAttemptContext);
} finally {
Thread.currentThread().setContextClassLoader(ctxCL);
}
recordWriter.initBlock(blockId);
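// Drive the user-defined Mapper over this split through a Hadoop MapContext backed by
// the sorting record writer.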
Mapper<K1, V1, K2, V2>.Context mCtx = new MRContextUtil().createMapContext(conf, taId,
recordReader, recordWriter, null, null, split);
mapper.run(mCtx);
recordReader.close();
recordWriter.sortAndFlushBlock(writer);
++blockId;
} catch (IOException | InterruptedException e) {
throw new HyracksDataException(e);
}
}
} catch (Throwable th) {
writer.fail();
throw th;
} finally {
writer.close();
}
}
};
}