NormalDocumentPtr SingleDocumentParser::Parse()

in aios/storage/indexlib/indexlib/document/document_parser/normal_parser/single_document_parser.cpp [81:173]


// Builds a NormalDocument from an extended document.
//
// Flow:
//   1. A missing raw document is logged and yields an empty NormalDocumentPtr.
//   2. SKIP_DOC / CHECKPOINT_DOC / UNKNOWN_OP documents skip all field
//      conversion and are handed directly to CreateDocument().
//   3. Otherwise every non-deleted field of the document's region is routed to
//      the index / attribute / summary converters according to the region's
//      schemas and the document operate type.
//   4. Summary serialization, section attributes and pack attributes are
//      appended, then the document is validated.
//
// Returns the result of CreateDocument(), or an empty NormalDocumentPtr when
// Validate() rejects the document.
NormalDocumentPtr SingleDocumentParser::Parse(const IndexlibExtendDocumentPtr& document)
{
    const RawDocumentPtr& rawDoc = document->GetRawDocument();
    if (!rawDoc) {
        IE_LOG(ERROR, "empty raw document!");
        return NormalDocumentPtr();
    }

    // These operate types carry no fields to convert.
    const DocOperateType initialOp = rawDoc->getDocOperateType();
    if (initialOp == SKIP_DOC || initialOp == CHECKPOINT_DOC || initialOp == UNKNOWN_OP) {
        return CreateDocument(document);
    }

    if (mNullFieldAppender) {
        mNullFieldAppender->Append(rawDoc);
    }

    // All schemas are looked up for the region the document belongs to.
    regionid_t region = document->getRegionId();
    const FieldSchemaPtr& fields = mSchema->GetFieldSchema(region);
    const IndexSchemaPtr& indexes = mSchema->GetIndexSchema(region);
    const AttributeSchemaPtr& attributes = mSchema->GetAttributeSchema(region);
    const SummarySchemaPtr& summaries = mSchema->GetSummarySchema(region);

    assert(indexes);
    SetPrimaryKeyField(document, indexes, region);
    if (document->getTokenizeDocument()->getFieldCount() == 0) {
        IE_LOG(DEBUG, "tokenizeDoc is empty while exists index schema");
    }

    DocOperateType docOp = rawDoc->getDocOperateType();
    const bool isDelete = (docOp == DELETE_DOC || docOp == DELETE_SUB_DOC);

    // For delete operations only the field with this name is converted; it
    // stays empty when the region has no region schema.
    string keptFieldOnDelete;
    const RegionSchemaPtr& regionSchema = mSchema->GetRegionSchema(region);
    if (regionSchema) {
        keptFieldOnDelete = regionSchema->GetOrderPreservingFieldName();
    }

    for (FieldSchema::Iterator fieldIt = fields->Begin(); fieldIt != fields->End(); ++fieldIt) {
        // Skip deleted fields, and for delete operations skip every field
        // other than the order-preserving one.
        if ((*fieldIt)->IsDeleted() || (isDelete && (*fieldIt)->GetFieldName() != keptFieldOnDelete)) {
            continue;
        }

        fieldid_t fieldId = (*fieldIt)->GetFieldId();

        if (indexes->IsInIndex(fieldId)) {
            mFieldConvertPtr->convertIndexField(document, *fieldIt);
        }

        // TODO: indexlib need to fix this
        // use full schema to tell if its updatable(set to attr doc)
        if (attributes && attributes->IsInAttribute(fieldId)) {
            // Attribute fields are always converted, except for UPDATE_FIELD
            // where the attribute must be updatable, present in the raw
            // document, and not the primary key field.
            bool convertAttribute = true;
            if (rawDoc->getDocOperateType() == UPDATE_FIELD) {
                const AttributeConfigPtr& attrConfig = attributes->GetAttributeConfigByFieldId(fieldId);
                convertAttribute = attrConfig->IsAttributeUpdatable() && rawDoc->exist(attrConfig->GetAttrName()) &&
                                   indexes->GetPrimaryKeyIndexFieldId() != fieldId;
            }
            if (convertAttribute) {
                mFieldConvertPtr->convertAttributeField(document, *fieldIt);
            }
            // A field handled as an attribute is never converted as summary here.
            continue;
        }

        // Summary fields are not converted for UPDATE_FIELD documents.
        if (summaries && summaries->IsInSummary(fieldId) && rawDoc->getDocOperateType() != UPDATE_FIELD) {
            mFieldConvertPtr->convertSummaryField(document, *fieldIt);
        }
    }

    const ClassifiedDocumentPtr& classifiedDoc = document->getClassifiedDocument();
    const AttributeDocumentPtr& attributeDoc = classifiedDoc->getAttributeDoc();
    if (attributeDoc && attributeDoc->HasFormatError() && mAttributeConvertErrorCounter) {
        mAttributeConvertErrorCounter->Increase(1);
    }

    // Summary is serialized for every non-delete operation when the schema stores it.
    if (mSchema->NeedStoreSummary() && !isDelete) {
        SummaryFormatter summaryFormatter(summaries);
        classifiedDoc->serializeSummaryDocument(summaryFormatter);
    }

    // Section attributes are appended for ADD_DOC only, using a clone of the appender.
    if (mSectionAttrAppender && rawDoc->getDocOperateType() == ADD_DOC) {
        SectionAttributeAppenderPtr sectionAppender(mSectionAttrAppender->Clone());
        sectionAppender->AppendSectionAttribute(classifiedDoc->getIndexDocument());
    }

    // Pack attributes are appended for ADD_DOC only; a failure is traced but
    // does not abort parsing.
    if (mPackAttrAppender && rawDoc->getDocOperateType() == ADD_DOC) {
        const bool packAppended =
            mPackAttrAppender->AppendPackAttribute(classifiedDoc->getAttributeDoc(), classifiedDoc->getPool(), region);
        if (!packAppended) {
            IE_RAW_DOC_TRACE(rawDoc, "parse error: append packAttribute failed.");
        }
    }

    if (Validate(document)) {
        return CreateDocument(document);
    }
    return NormalDocumentPtr();
}