in manifest.go [585:641]
// NewManifestReader opens an Avro object container stream and returns a
// ManifestReader positioned to decode its entries.
//
// The container's "format-version" and "content" metadata are parsed and
// cross-checked against file.Version() and file.ManifestContent(); any
// disagreement, an unparsable value, or a decoder construction failure is
// returned as an error. A key that is absent from the metadata reads as the
// empty string and is therefore rejected by the same checks.
//
// For version 1 manifests the writer schema must be an Avro record; if it is
// not, an error is returned rather than panicking on the type assertion.
func NewManifestReader(file ManifestFile, in io.Reader) (*ManifestReader, error) {
	// A fresh schema cache is passed so this decoder does not use a cache
	// shared with other decoders.
	dec, err := ocf.NewDecoder(in, ocf.WithDecoderSchemaCache(&avro.SchemaCache{}))
	if err != nil {
		return nil, err
	}

	metadata := dec.Metadata()
	sc := dec.Schema()

	// NOTE(review): a missing "format-version" key fails here because
	// Atoi("") is an error — confirm that is intended for all writers.
	formatVersion, err := strconv.Atoi(string(metadata["format-version"]))
	if err != nil {
		return nil, fmt.Errorf("manifest file's 'format-version' metadata is invalid: %w", err)
	}
	if formatVersion != file.Version() {
		return nil, fmt.Errorf("manifest file's 'format-version' metadata indicates version %d, but entry from manifest list indicates version %d",
			formatVersion, file.Version())
	}

	var content ManifestContent
	switch contentStr := string(metadata["content"]); contentStr {
	case "data":
		content = ManifestContentData
	case "deletes":
		content = ManifestContentDeletes
	default:
		return nil, fmt.Errorf("manifest file's 'content' metadata is invalid, should be \"data\" or \"deletes\" but instead is %q",
			contentStr)
	}
	if content != file.ManifestContent() {
		return nil, fmt.Errorf("manifest file's 'content' metadata indicates %q, but entry from manifest list indicates %q",
			content.String(), file.ManifestContent().String())
	}

	isFallback := false
	if formatVersion == 1 {
		// The schema comes from the input file, so its shape is untrusted:
		// check the assertion instead of letting a non-record schema panic.
		rec, ok := sc.(*avro.RecordSchema)
		if !ok {
			return nil, fmt.Errorf("manifest file's schema is invalid: expected an avro record schema but got %T", sc)
		}
		// A v1 manifest whose "snapshot_id" field is not a union is flagged
		// via isFallback; only the first field with that name is inspected.
		// NOTE(review): presumably this selects an alternate entry layout
		// during decoding — confirm against the code that reads isFallback.
		for _, f := range rec.Fields() {
			if f.Name() == "snapshot_id" {
				if f.Type().Type() != avro.Union {
					isFallback = true
				}

				break
			}
		}
	}

	fieldNameToID, fieldIDToType := getFieldIDMap(sc)

	return &ManifestReader{
		dec:           dec,
		file:          file,
		formatVersion: formatVersion,
		isFallback:    isFallback,
		content:       content,
		fieldNameToID: fieldNameToID,
		fieldIDToType: fieldIDToType,
	}, nil
}