func()

in filebeat/input/filestream/prospector.go [85:277]


// Init prepares the registry before the prospector starts, based on the
// files currently returned by p.filewatcher.GetFiles(). It runs, in order:
//
//  1. Re-keys entries in globalStore whose Source is one of those files,
//     using newID and the current file identity. This lets an input that
//     did not have an ID find its files under the new ID.
//  2. If p.cleanRemoved is set, removes from prospectorStore every entry
//     whose cursor metadata cannot be unpacked or whose Source is not among
//     the watched files.
//  3. If the current file identity is fingerprint, re-keys prospectorStore
//     entries created with the native or path identity. This happens only
//     when the key regenerated with the old identity matches the stored key.
//  4. If p.takeOver.Enabled is set, takes over states whose identity is
//     native, path or fingerprint. This happens only when the ID part of the
//     stored key matches the ID regenerated with the old identity.
//
// In the update callbacks below, returning an empty key presumably means
// "leave this entry unchanged". NOTE(review): confirm against the
// loginp.StoreUpdater implementation.
//
// Init currently always returns nil.
func (p *fileProspector) Init(
	prospectorStore,
	globalStore loginp.StoreUpdater,
	newID func(loginp.Source) string,
) error {
	files := p.filewatcher.GetFiles()

	// If this fileProspector belongs to an input that did not have an ID
	// this will find its files in the registry and update them to use the
	// new ID.
	globalStore.UpdateIdentifiers(func(v loginp.Value) (string, interface{}) {
		var fm fileMeta
		err := v.UnpackCursorMeta(&fm)
		if err != nil {
			return "", nil
		}

		fd, ok := files[fm.Source]
		if !ok {
			return "", fm
		}

		newKey := newID(p.identifier.GetSource(loginp.FSEvent{NewPath: fm.Source, Descriptor: fd}))
		return newKey, fm
	})

	if p.cleanRemoved {
		prospectorStore.CleanIf(func(v loginp.Value) bool {
			var fm fileMeta
			err := v.UnpackCursorMeta(&fm)
			if err != nil {
				// remove faulty entries
				return true
			}

			_, ok := files[fm.Source]
			return !ok
		})
	}

	identifierName := p.identifier.Name()

	// If the file identity has changed to fingerprint, update the registry
	// keys so we can keep the state. This is only supported from file
	// identities that do not require configuration:
	//  - native (inode + device ID)
	//  - path
	if identifierName != fingerprintName {
		p.logger.Debugf("file identity is '%s', will not migrate registry", identifierName)
	} else {
		p.logger.Debug("trying to migrate file identity to fingerprint")
		prospectorStore.UpdateIdentifiers(func(v loginp.Value) (string, interface{}) {
			var fm fileMeta
			err := v.UnpackCursorMeta(&fm)
			if err != nil {
				return "", nil
			}

			fd, ok := files[fm.Source]
			if !ok {
				return "", fm
			}

			// Return early (do nothing) if:
			//  - The identifiers are the same
			//  - The old identifier is neither native nor path
			oldIdentifierName := fm.IdentifierName
			if oldIdentifierName == identifierName ||
				!(oldIdentifierName == nativeName || oldIdentifierName == pathName) {
				return "", nil
			}

			// Our current file (source) is in the registry, now we need to ensure
			// this registry entry (resource) actually refers to our file. Sources
			// are identified by path. However, as log files rotate the same path
			// can point to a different file.
			//
			// So, to ensure we're dealing with the resource from our current file,
			// we use the old identifier to generate a registry key for the current
			// file we're trying to migrate, if this key matches with the key in the
			// registry, then we proceed to update the registry.
			registryKey := v.Key()
			oldIdentifier, ok := identifiersMap[oldIdentifierName]
			if !ok {
				// This should never happen, but we properly handle it just in case.
				// If we cannot find the identifier, move on to the next entry
				// some identifiers cannot be migrated
				p.logger.Errorf(
					"old file identity '%s' not found while migrating entry to "+
						"new file identity '%s'. If the file still exists, it will be re-ingested",
					oldIdentifierName,
					identifierName,
				)
				return "", nil
			}

			// The same event describes the file for both the old and the new
			// identity, so build it once.
			fsEvent := loginp.FSEvent{
				NewPath:    fm.Source,
				Descriptor: fd,
			}
			previousIdentifierKey := newID(oldIdentifier.GetSource(fsEvent))

			// If the registry key and the key generated by the old identifier
			// do not match, the entry refers to a different file than the one
			// currently at this path (e.g. after rotation). Log it at debug
			// level and do nothing.
			if previousIdentifierKey != registryKey {
				p.logger.Debugf(
					"registry key: '%s' and previous file identity key: '%s' differ, not migrating. Source: '%s'",
					registryKey, previousIdentifierKey, fm.Source)
				return "", fm
			}

			// The resource matches the file we found in the file system, generate
			// a new registry key and return it alongside the updated meta.
			newKey := newID(p.identifier.GetSource(fsEvent))
			fm.IdentifierName = identifierName
			p.logger.Infof("registry key: '%s' and previous file identity key: '%s', are the same, migrating. Source: '%s'",
				registryKey, previousIdentifierKey, fm.Source)

			return newKey, fm
		})
	}

	// Last, but not least, take over states if needed/enabled.
	if !p.takeOver.Enabled {
		return nil
	}

	// Take over states from other Filestream inputs or the log input
	prospectorStore.TakeOver(func(v loginp.Value) (string, interface{}) {
		var fm fileMeta
		err := v.UnpackCursorMeta(&fm)
		if err != nil {
			return "", nil
		}

		fd, ok := files[fm.Source]
		if !ok {
			return "", fm
		}

		// Return early (do nothing) if:
		//  - The old identifier is neither native, path or fingerprint
		oldIdentifierName := fm.IdentifierName
		if oldIdentifierName != nativeName &&
			oldIdentifierName != pathName &&
			oldIdentifierName != fingerprintName {
			return "", nil
		}

		// Our current file (source) is in the registry, now we need to ensure
		// this registry entry (resource) actually refers to our file. Sources
		// are identified by path, however as log files rotate the same path
		// can point to different files.
		//
		// So to ensure we're dealing with the resource from our current file,
		// we use the old identifier to generate a registry key for the current
		// file we're trying to migrate, if this key matches with the key in the
		// registry, then we proceed to update the registry.
		oldIdentifier, ok := identifiersMap[oldIdentifierName]
		if !ok {
			// This should never happen, but just in case we properly handle it.
			// If we cannot find the identifier, move on to the next entry
			// some identifiers cannot be migrated
			p.logger.Errorf(
				"old file identity '%s' not found while taking over old states, "+
					"new file identity '%s'. If the file still exists, it will be re-ingested",
				oldIdentifierName,
				identifierName,
			)
			return "", nil
		}

		fsEvent := loginp.FSEvent{
			NewPath:    fm.Source,
			Descriptor: fd,
		}

		// The key is expected to have exactly four "::"-separated parts. The
		// last two are compared with the source name produced by the old
		// identity. Any other shape is rejected.
		split := strings.Split(v.Key(), "::")
		if len(split) != 4 {
			// This should never happen.
			p.logger.Errorf("registry key '%s' is in the wrong format, cannot migrate state", v.Key())
			return "", fm
		}

		idFromRegistry := strings.Join(split[2:], "::")
		idFromPreviousIdentity := oldIdentifier.GetSource(fsEvent).Name()
		if idFromPreviousIdentity != idFromRegistry {
			return "", fm
		}

		newKey := newID(p.identifier.GetSource(fsEvent))
		fm.IdentifierName = identifierName
		p.logger.Infof("Taking over state: '%s' -> '%s'", v.Key(), newKey)
		return newKey, fm
	})

	return nil
}