in plugins/input/docker/logmeta/metric_container_info.go [364:508]
func (idf *InputDockerFile) Collect(collector pipeline.Collector) error {
newUpdateTime := helper.GetContainersLastUpdateTime()
if idf.lastUpdateTime != 0 {
// Nothing update, just skip.
if idf.lastUpdateTime >= newUpdateTime {
return nil
}
}
var allCmd *DockerFileUpdateCmdAll
allCmd = nil
// if cache is empty, use update all cmd
if len(idf.lastContainerInfoCache) == 0 {
allCmd = new(DockerFileUpdateCmdAll)
}
newCount, delCount, addResultList, deleteResultList := helper.GetContainerByAcceptedInfoV2(
idf.fullList, idf.matchList,
idf.IncludeLabel, idf.ExcludeLabel,
idf.IncludeLabelRegex, idf.ExcludeLabelRegex,
idf.IncludeEnv, idf.ExcludeEnv,
idf.IncludeEnvRegex, idf.ExcludeEnvRegex,
idf.K8sFilter)
idf.lastUpdateTime = newUpdateTime
// record config result
havingPathkeys := make([]string, 0)
nothavingPathkeys := make([]string, 0)
if newCount != 0 || delCount != 0 {
logger.Infof(idf.context.GetRuntimeContext(), "update match list, new: %v, delete: %v", newCount, delCount)
// Can not return here because we should notify empty update to clear
// cache in docker_path_helper.json.
} else {
logger.Debugf(idf.context.GetRuntimeContext(), "update match list, new: %v, delete: %v", newCount, delCount)
}
dockerInfoDetails := idf.matchList
logger.Debug(idf.context.GetRuntimeContext(), "match list length", len(dockerInfoDetails))
idf.avgInstanceMetric.Add(int64(len(dockerInfoDetails)))
for k, info := range dockerInfoDetails {
if len(idf.LogPath) > 0 && info.ContainerInfo.State.Status == helper.ContainerStatusRunning {
// inputFile
idf.updateMapping(info, allCmd)
} else if len(idf.LogPath) == 0 {
// stdout
idf.updateMapping(info, allCmd)
}
// 容器元信息预览使用
if idf.CollectingContainersMeta && len(idf.LogPath) > 0 {
sourcePath, containerPath := info.FindBestMatchedPath(idf.LogPath)
formatSourcePath := formatPath(sourcePath)
formateContainerPath := formatPath(containerPath)
destPath := helper.GetMountedFilePathWithBasePath(idf.MountPath, formatSourcePath) + idf.LogPath[len(formateContainerPath):]
if ok, err := util.PathExists(destPath); err == nil {
if !ok {
nothavingPathkeys = append(nothavingPathkeys, helper.GetShortID(k))
} else {
havingPathkeys = append(havingPathkeys, helper.GetShortID(k))
}
} else {
nothavingPathkeys = append(nothavingPathkeys, helper.GetShortID(k))
}
}
}
if idf.CollectingContainersMeta {
var configResult *helper.ContainerConfigResult
if len(idf.LogPath) == 0 {
keys := make([]string, 0, len(idf.matchList))
for k := range idf.matchList {
if len(k) > 0 {
keys = append(keys, helper.GetShortID(k))
}
}
configResult = &helper.ContainerConfigResult{
DataType: "container_config_result",
Project: idf.context.GetProject(),
Logstore: idf.context.GetLogstore(),
ConfigName: idf.context.GetConfigName(),
PathExistInputContainerIDs: helper.GetStringFromList(keys),
SourceAddress: "stdout",
InputType: "input_container_stdio",
FlusherType: "flusher_sls",
FlusherTargetAddress: fmt.Sprintf("%s/%s", idf.context.GetProject(), idf.context.GetLogstore()),
}
} else {
configResult = &helper.ContainerConfigResult{
DataType: "container_config_result",
Project: idf.context.GetProject(),
Logstore: idf.context.GetLogstore(),
ConfigName: idf.context.GetConfigName(),
SourceAddress: fmt.Sprintf("%s/**/%s", idf.LogPath, idf.FilePattern),
PathExistInputContainerIDs: helper.GetStringFromList(havingPathkeys),
PathNotExistInputContainerIDs: helper.GetStringFromList(nothavingPathkeys),
InputType: "file_log",
InputIsContainerFile: "true",
FlusherType: "flusher_sls",
FlusherTargetAddress: fmt.Sprintf("%s/%s", idf.context.GetProject(), idf.context.GetLogstore()),
}
}
helper.RecordContainerConfigResultMap(configResult)
if newCount != 0 || delCount != 0 || idf.firstStart {
helper.RecordContainerConfigResultIncrement(configResult)
idf.firstStart = false
}
logger.Debugf(idf.context.GetRuntimeContext(), "update match list, addResultList: %v, deleteResultList: %v", addResultList, deleteResultList)
}
for id := range idf.lastContainerInfoCache {
if c, ok := dockerInfoDetails[id]; !ok {
idf.deleteMetric.Add(1)
idf.notifyStop(id)
idf.deleteMapping(id)
} else if c.Status() != helper.ContainerStatusRunning && len(idf.LogPath) > 0 { // input_file时会触发
if idf.forceReleaseStopContainerFile {
idf.deleteMetric.Add(1)
idf.notifyStop(id)
idf.deleteMapping(id)
} else {
idf.notifyStop(id)
}
}
}
if allCmd != nil {
if len(allCmd.AllCmd) == 0 {
// only update empty if empty flag is true
if idf.updateEmptyFlag {
idf.updateAll(allCmd)
idf.updateEmptyFlag = false
}
} else {
idf.updateAll(allCmd)
idf.updateEmptyFlag = true
}
}
if time.Since(idf.lastClearTime) > time.Hour {
idf.lastContainerInfoCache = make(map[string]ContainerInfoCache)
idf.lastClearTime = time.Now()
}
return nil
}