public void unionFields(Record other)

in flink-core/src/main/java/org/apache/flink/types/Record.java [559:748]


    /**
     * Unions the fields of {@code other} into this record.
     *
     * <p>For every position shared by both records, the field of this record is kept unless it
     * is null, in which case the field of {@code other} is taken (which may itself be null). If
     * one record has more fields than the other, its trailing fields are appended. Afterwards
     * this record has {@code max(this.numFields, other.numFields)} fields.
     *
     * <p>The method rebuilds the binary representation of this record into a fresh or recycled
     * buffer, writes the header via {@code serializeHeader}, swaps the previous binary buffer
     * into {@code switchBuffer}, and resets {@code firstModifiedPos} to {@link
     * Integer#MAX_VALUE}. The {@code readFields} and {@code writeFields} arrays are grown (or
     * created) so that they can hold at least the new number of fields.
     *
     * @param other the record whose fields are merged into this record; it is only read
     * @throws RuntimeException if any exception occurs while the field data is copied or
     *     serialized; the original exception is attached as the cause
     */
    public void unionFields(Record other) {
        final int minFields = Math.min(this.numFields, other.numFields);
        final int maxFields = Math.max(this.numFields, other.numFields);

        // Reuse the existing arrays when they are large enough. Note that 'offsets'/'lengths'
        // may therefore alias this.offsets/this.lengths; the loops below always read an index
        // from the source before overwriting the same index in the target.
        final int[] offsets = this.offsets.length >= maxFields ? this.offsets : new int[maxFields];
        final int[] lengths = this.lengths.length >= maxFields ? this.lengths : new int[maxFields];

        if (!(this.isModified() || other.isModified())) {
            // Special (but common) case: neither record reports modifications, so all non-null
            // fields are copied straight from the binary data of the two records.
            // Allocate space for the target buffer first, recycling the switchBuffer if it fits.
            final int estimatedLength = this.binaryLen + other.binaryLen;
            this.serializer.memory =
                    (this.switchBuffer != null && this.switchBuffer.length >= estimatedLength)
                            ? this.switchBuffer
                            : new byte[estimatedLength];
            this.serializer.position = 0;

            try {
                // common loop for the positions present in both records
                for (int i = 0; i < minFields; i++) {
                    final int thisOff = this.offsets[i];
                    if (thisOff == NULL_INDICATOR_OFFSET) {
                        final int otherOff = other.offsets[i];
                        if (otherOff == NULL_INDICATOR_OFFSET) {
                            // null in both records: stays null
                            offsets[i] = NULL_INDICATOR_OFFSET;
                        } else {
                            // take field from other record
                            offsets[i] = this.serializer.position;
                            this.serializer.write(other.binaryData, otherOff, other.lengths[i]);
                            lengths[i] = other.lengths[i];
                        }
                    } else {
                        // this record's field wins: copy it from this record's binary data
                        offsets[i] = this.serializer.position;
                        this.serializer.write(this.binaryData, thisOff, this.lengths[i]);
                        lengths[i] = this.lengths[i];
                    }
                }

                // add the trailing fields from the record that has more fields
                if (minFields != maxFields) {
                    final Record sourceForRemainder = this.numFields > minFields ? this : other;
                    int begin = -1;
                    int end = -1;
                    int offsetDelta = 0;

                    // Go through the offsets and find the byte range [begin, end) spanned by the
                    // non-null trailing fields, translating each offset into the target buffer.
                    // NOTE(review): the single bulk copy below presumably relies on the trailing
                    // fields being stored contiguously and in field order in the source's binary
                    // data - confirm against the serialization format.
                    for (int k = minFields; k < maxFields; k++) {
                        final int off = sourceForRemainder.offsets[k];
                        if (off == NULL_INDICATOR_OFFSET) {
                            offsets[k] = NULL_INDICATOR_OFFSET;
                        } else {
                            end = sourceForRemainder.offsets[k] + sourceForRemainder.lengths[k];
                            if (begin == -1) {
                                // first non null column in the remainder
                                begin = sourceForRemainder.offsets[k];
                                offsetDelta = this.serializer.position - begin;
                            }
                            offsets[k] = sourceForRemainder.offsets[k] + offsetDelta;
                        }
                    }

                    // copy the remaining fields directly as one binary chunk
                    if (begin != -1) {
                        this.serializer.write(sourceForRemainder.binaryData, begin, end - begin);
                    }

                    // the lengths can be copied directly (unless source and target are the same
                    // array, in which case they are already in place)
                    if (lengths != sourceForRemainder.lengths) {
                        System.arraycopy(
                                sourceForRemainder.lengths,
                                minFields,
                                lengths,
                                minFields,
                                maxFields - minFields);
                    }
                }
            } catch (Exception ioex) {
                // The conditional must be parenthesized: '+' binds tighter than '==', so without
                // the parentheses the message prefix would be compared to null and then dropped.
                final String msg = ioex.getMessage();
                throw new RuntimeException(
                        "Error creating field union of record data"
                                + (msg == null ? "." : ": " + msg),
                        ioex);
            }
        } else {
            // The general case, where at least one of the two records reports modifications.
            // Fields marked with MODIFIED_INDICATOR_OFFSET are serialized from the 'writeFields'
            // objects, all other non-null fields are copied from the binary data.
            final int estimatedLength =
                    (this.binaryLen > 0
                                    ? this.binaryLen
                                    : this.numFields * DEFAULT_FIELD_LEN_ESTIMATE)
                            + (other.binaryLen > 0
                                    ? other.binaryLen
                                    : other.numFields * DEFAULT_FIELD_LEN_ESTIMATE);
            this.serializer.memory =
                    (this.switchBuffer != null && this.switchBuffer.length >= estimatedLength)
                            ? this.switchBuffer
                            : new byte[estimatedLength];
            this.serializer.position = 0;

            try {
                // common loop for the positions present in both records
                for (int i = 0; i < minFields; i++) {
                    final int thisOff = this.offsets[i];
                    if (thisOff == NULL_INDICATOR_OFFSET) {
                        final int otherOff = other.offsets[i];
                        if (otherOff == NULL_INDICATOR_OFFSET) {
                            // null in both records: stays null
                            offsets[i] = NULL_INDICATOR_OFFSET;
                        } else if (otherOff == MODIFIED_INDICATOR_OFFSET) {
                            // serialize modified field from other record
                            offsets[i] = this.serializer.position;
                            other.writeFields[i].write(this.serializer);
                            lengths[i] = this.serializer.position - offsets[i];
                        } else {
                            // take field from other record binary
                            offsets[i] = this.serializer.position;
                            this.serializer.write(other.binaryData, otherOff, other.lengths[i]);
                            lengths[i] = other.lengths[i];
                        }
                    } else if (thisOff == MODIFIED_INDICATOR_OFFSET) {
                        // serialize modified field from this record
                        offsets[i] = this.serializer.position;
                        this.writeFields[i].write(this.serializer);
                        lengths[i] = this.serializer.position - offsets[i];
                    } else {
                        // copy field from this record's binary data
                        offsets[i] = this.serializer.position;
                        this.serializer.write(this.binaryData, thisOff, this.lengths[i]);
                        lengths[i] = this.lengths[i];
                    }
                }

                // add the trailing fields from the record that has more fields
                if (minFields != maxFields) {
                    final Record sourceForRemainder = this.numFields > minFields ? this : other;

                    // go through the offsets and transfer the non-null fields one by one
                    for (int k = minFields; k < maxFields; k++) {
                        final int off = sourceForRemainder.offsets[k];
                        if (off == NULL_INDICATOR_OFFSET) {
                            offsets[k] = NULL_INDICATOR_OFFSET;
                        } else if (off == MODIFIED_INDICATOR_OFFSET) {
                            // serialize modified field from the source record
                            offsets[k] = this.serializer.position;
                            sourceForRemainder.writeFields[k].write(this.serializer);
                            lengths[k] = this.serializer.position - offsets[k];
                        } else {
                            // copy field from the source record binary
                            offsets[k] = this.serializer.position;
                            final int len = sourceForRemainder.lengths[k];
                            this.serializer.write(sourceForRemainder.binaryData, off, len);
                            lengths[k] = len;
                        }
                    }
                }
            } catch (Exception ioex) {
                // See the note in the first branch: the conditional needs explicit parentheses.
                final String msg = ioex.getMessage();
                throw new RuntimeException(
                        "Error creating field union of record data"
                                + (msg == null ? "." : ": " + msg),
                        ioex);
            }
        }

        serializeHeader(this.serializer, offsets, maxFields);

        // install the new binary representation; keep the old buffer around for later reuse
        this.switchBuffer = this.binaryData;
        this.binaryData = serializer.memory;
        this.binaryLen = serializer.position;

        this.numFields = maxFields;
        this.offsets = offsets;
        this.lengths = lengths;

        this.firstModifiedPos = Integer.MAX_VALUE;

        // make sure that the object arrays reflect the size as well
        if (this.readFields == null) {
            // nothing to carry over; copying from a null array would throw a
            // NullPointerException
            this.readFields = new Value[maxFields];
        } else if (this.readFields.length < maxFields) {
            final Value[] na = new Value[maxFields];
            System.arraycopy(this.readFields, 0, na, 0, this.readFields.length);
            this.readFields = na;
        }
        this.writeFields =
                (this.writeFields == null || this.writeFields.length < maxFields)
                        ? new Value[maxFields]
                        : this.writeFields;
    }