hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopNewPartitionerTuplePartitionComputerFactory.java [40:64]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            ISerializerDeserializer<K> keyIO, ISerializerDeserializer<V> valueIO) {
        super(klass);
        this.keyIO = keyIO;
        this.valueIO = valueIO;
    }

    @Override
    public ITuplePartitionComputer createPartitioner() {
        return new ITuplePartitionComputer() {
            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
            private final DataInputStream dis = new DataInputStream(bbis);

            /**
             * Computes the partition for one tuple by deserializing its first two fields
             * and delegating to {@code instance} (declared outside this block).
             *
             * @param accessor frame accessor holding the tuple
             * @param tIndex   index of the tuple within the accessor
             * @param nParts   number of partitions, forwarded unchanged to {@code getPartition}
             * @return the value returned by {@code instance.getPartition(key, value, nParts)}
             * @throws HyracksDataException propagated from the field deserializers
             */
            @Override
            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
                // Field 0 is read as the key.
                seekToField(accessor, tIndex, 0);
                final K partitionKey = keyIO.deserialize(dis);

                // Field 1 is read as the value.
                seekToField(accessor, tIndex, 1);
                final V partitionValue = valueIO.deserialize(dis);

                return instance.getPartition(partitionKey, partitionValue, nParts);
            }

            /**
             * Points the shared input stream at the start of one field of a tuple.
             * The position is the tuple start offset plus the field-slots length plus
             * the start offset of the requested field.
             */
            private void seekToField(IFrameTupleAccessor accessor, int tIndex, int fIndex) {
                final int fieldPosition = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
                        + accessor.getFieldStartOffset(tIndex, fIndex);
                bbis.setByteBuffer(accessor.getBuffer(), fieldPosition);
            }
        };
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



hyracks/hyracks-dataflow-hadoop/src/main/java/org/apache/hyracks/dataflow/hadoop/data/HadoopPartitionerTuplePartitionComputerFactory.java [40:64]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
            ISerializerDeserializer<K> keyIO, ISerializerDeserializer<V> valueIO) {
        super(klass);
        this.keyIO = keyIO;
        this.valueIO = valueIO;
    }

    @Override
    public ITuplePartitionComputer createPartitioner() {
        return new ITuplePartitionComputer() {
            private final ByteBufferInputStream bbis = new ByteBufferInputStream();
            private final DataInputStream dis = new DataInputStream(bbis);

            /**
             * Determines the target partition of a tuple: field 0 is deserialized as the
             * key, field 1 as the value, and both are handed to {@code instance}
             * (declared outside this block) together with the partition count.
             *
             * @param accessor frame accessor that exposes the tuple's buffer and offsets
             * @param tIndex   position of the tuple inside the accessor
             * @param nParts   partition count passed through to {@code getPartition}
             * @return whatever {@code instance.getPartition(key, value, nParts)} yields
             * @throws HyracksDataException if deserializing either field fails
             */
            @Override
            public int partition(IFrameTupleAccessor accessor, int tIndex, int nParts) throws HyracksDataException {
                rewindTo(accessor, tIndex, 0);
                final K deserializedKey = keyIO.deserialize(dis);

                rewindTo(accessor, tIndex, 1);
                final V deserializedValue = valueIO.deserialize(dis);

                return instance.getPartition(deserializedKey, deserializedValue, nParts);
            }

            /**
             * Re-targets the reusable byte-buffer stream so the next read begins at the
             * given field: tuple start offset + field-slots length + field start offset.
             */
            private void rewindTo(IFrameTupleAccessor accessor, int tIndex, int fieldIndex) {
                final int startOfField = accessor.getTupleStartOffset(tIndex) + accessor.getFieldSlotsLength()
                        + accessor.getFieldStartOffset(tIndex, fieldIndex);
                bbis.setByteBuffer(accessor.getBuffer(), startOfField);
            }
        };
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



