in filebeat/input/filestream/prospector.go [85:277]
// Init reconciles registry state with the files currently known to the file
// watcher (p.filewatcher.GetFiles(), keyed by path) before the prospector
// starts. It runs, in order:
//
//   - globalStore.UpdateIdentifiers: entries whose source path is in the
//     current file set get re-keyed with newID, so states written by an input
//     that had no ID are adopted by this one.
//   - prospectorStore.CleanIf (only when p.cleanRemoved is set): entries whose
//     cursor meta cannot be decoded, or whose source path is not in the
//     current file set, are removed.
//   - prospectorStore.UpdateIdentifiers (only when the configured file
//     identity is fingerprint): entries created with the native or path
//     identity are re-keyed to fingerprint keys, provided the stored key still
//     matches the key the old identity produces for the file now at that path.
//   - prospectorStore.TakeOver (only when p.takeOver.Enabled): entries created
//     with the native, path or fingerprint identity are taken over under the
//     same "does the stored key still refer to this file" check.
//
// The visible implementation always returns nil.
func (p *fileProspector) Init(
	prospectorStore,
	globalStore loginp.StoreUpdater,
	newID func(loginp.Source) string,
) error {
	files := p.filewatcher.GetFiles()

	// If this fileProspector belongs to an input that did not have an ID
	// this will find its files in the registry and update them to use the
	// new ID.
	globalStore.UpdateIdentifiers(func(v loginp.Value) (id string, val interface{}) {
		var fm fileMeta
		err := v.UnpackCursorMeta(&fm)
		if err != nil {
			return "", nil
		}

		fd, ok := files[fm.Source]
		if !ok {
			return "", fm
		}

		newKey := newID(p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd}))
		return newKey, fm
	})

	if p.cleanRemoved {
		prospectorStore.CleanIf(func(v loginp.Value) bool {
			var fm fileMeta
			err := v.UnpackCursorMeta(&fm)
			if err != nil {
				// remove faulty entries
				return true
			}

			// Drop states whose file is no longer present in the watcher's
			// current file set.
			_, ok := files[fm.Source]
			return !ok
		})
	}

	identifierName := p.identifier.Name()

	// If the file identity has changed to fingerprint, update the registry
	// keys so we can keep the state. This is only supported from file
	// identities that do not require configuration:
	//  - native (inode + device ID)
	//  - path
	if identifierName != fingerprintName {
		p.logger.Debugf("file identity is '%s', will not migrate registry", identifierName)
	} else {
		p.logger.Debug("trying to migrate file identity to fingerprint")
		prospectorStore.UpdateIdentifiers(func(v loginp.Value) (string, interface{}) {
			var fm fileMeta
			err := v.UnpackCursorMeta(&fm)
			if err != nil {
				return "", nil
			}

			fd, ok := files[fm.Source]
			if !ok {
				return "", fm
			}

			// Return early (do nothing) if:
			//  - The identifiers are the same
			//  - The old identifier is neither native nor path
			oldIdentifierName := fm.IdentifierName
			if oldIdentifierName == identifierName ||
				!(oldIdentifierName == nativeName || oldIdentifierName == pathName) {
				return "", nil
			}

			// Our current file (source) is in the registry, now we need to ensure
			// this registry entry (resource) actually refers to our file. Sources
			// are identified by path. However, as log files rotate the same path
			// can point to a different file.
			//
			// So, to ensure we're dealing with the resource from our current file,
			// we use the old identifier to generate a registry key for the current
			// file we're trying to migrate, if this key matches with the key in the
			// registry, then we proceed to update the registry.
			registryKey := v.Key()
			oldIdentifier, ok := identifiersMap[oldIdentifierName]
			if !ok {
				// This should never happen, but we properly handle it just in case.
				// If we cannot find the identifier, move on to the next entry
				// some identifiers cannot be migrated
				p.logger.Errorf(
					"old file identity '%s' not found while migrating entry to "+
						"new file identity '%s'. If the file still exists, it will be re-ingested",
					oldIdentifierName,
					identifierName,
				)
				return "", nil
			}

			previousIdentifierKey := newID(oldIdentifier.GetSource(
				loginp.FSEvent{
					NewPath:    fm.Source,
					Descriptor: fd,
				}))

			// If the registry key and the key generated by the old identifier
			// do not match, log it at debug level and do nothing: the path now
			// points at a different file than the one the state was recorded for.
			if previousIdentifierKey != registryKey {
				p.logger.Debugf(
					"registry key: '%s' and previous file identity key: '%s', differ, will not migrate. Source: '%s'",
					registryKey, previousIdentifierKey, fm.Source)
				return "", fm
			}

			// The resource matches the file we found in the file system, generate
			// a new registry key and return it alongside the updated meta.
			newKey := newID(p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd}))
			fm.IdentifierName = identifierName
			p.logger.Infof("registry key: '%s' and previous file identity key: '%s', are the same, migrating. Source: '%s'",
				registryKey, previousIdentifierKey, fm.Source)

			return newKey, fm
		})
	}

	// Last, but not least, take over states if needed/enabled.
	if !p.takeOver.Enabled {
		return nil
	}

	// Take over states from other Filestream inputs or the log input
	prospectorStore.TakeOver(func(v loginp.Value) (string, interface{}) {
		var fm fileMeta
		err := v.UnpackCursorMeta(&fm)
		if err != nil {
			return "", nil
		}

		fd, ok := files[fm.Source]
		if !ok {
			return "", fm
		}

		// Return early (do nothing) if:
		//  - The old identifier is neither native, path or fingerprint
		oldIdentifierName := fm.IdentifierName
		if oldIdentifierName != nativeName &&
			oldIdentifierName != pathName &&
			oldIdentifierName != fingerprintName {
			return "", nil
		}

		// Our current file (source) is in the registry, now we need to ensure
		// this registry entry (resource) actually refers to our file. Sources
		// are identified by path, however as log files rotate the same path
		// can point to different files.
		//
		// So to ensure we're dealing with the resource from our current file,
		// we use the old identifier to generate a registry key for the current
		// file we're trying to migrate, if this key matches with the key in the
		// registry, then we proceed to update the registry.
		oldIdentifier, ok := identifiersMap[oldIdentifierName]
		if !ok {
			// This should never happen, but just in case we properly handle it.
			// If we cannot find the identifier, move on to the next entry
			// some identifiers cannot be migrated
			p.logger.Errorf(
				"old file identity '%s' not found while taking over old states, "+
					"new file identity '%s'. If the file still exists, it will be re-ingested",
				oldIdentifierName,
				identifierName,
			)
			return "", nil
		}

		fsEvent := loginp.FSEvent{
			NewPath:    fm.Source,
			Descriptor: fd,
		}

		// The key is expected to have exactly four "::"-separated components.
		// The last two must equal the Source.Name() the old identity produces
		// for this file; the first two are presumably the store prefix and the
		// owning input ID (TODO confirm against the key format in the store).
		// NOTE(review): a path-identity key whose path itself contains "::"
		// splits into more than four parts and is rejected here.
		split := strings.Split(v.Key(), "::")
		if len(split) != 4 {
			// This should never happen.
			p.logger.Errorf("registry key '%s' is in the wrong format, cannot migrate state", v.Key())
			return "", fm
		}

		idFromRegistry := strings.Join(split[2:], "::")
		idFromPreviousIdentity := oldIdentifier.GetSource(fsEvent).Name()
		if idFromPreviousIdentity != idFromRegistry {
			return "", fm
		}

		// Same event as above: the new key is derived from the same file.
		newKey := newID(p.identifier.GetSource(fsEvent))
		fm.IdentifierName = identifierName
		p.logger.Infof("Taking over state: '%s' -> '%s'", v.Key(), newKey)
		return newKey, fm
	})

	return nil
}