func()

in plugins/input/docker/logmeta/metric_container_info.go [364:508]


// Collect refreshes the matched container list and pushes the resulting
// changes to the container-path mapping.
//
// A call is a no-op when a previous update time is recorded and
// helper.GetContainersLastUpdateTime reports nothing newer. Otherwise it:
//  1. re-evaluates the include/exclude filters into idf.matchList,
//  2. calls updateMapping for every matched container (in file mode, i.e.
//     LogPath is set, only for running containers),
//  3. records a ContainerConfigResult when CollectingContainersMeta is enabled,
//  4. calls notifyStop (and possibly deleteMapping) for cached containers that
//     are no longer matched or, in file mode, no longer running,
//  5. sends the accumulated "update all" command when the cache was empty on
//     entry, and
//  6. resets lastContainerInfoCache once more than an hour has passed since
//     lastClearTime.
//
// The collector argument is not used. The returned error is always nil.
func (idf *InputDockerFile) Collect(collector pipeline.Collector) error {
	newUpdateTime := helper.GetContainersLastUpdateTime()
	// Nothing changed since the last successful pass, just skip.
	if idf.lastUpdateTime != 0 && idf.lastUpdateTime >= newUpdateTime {
		return nil
	}

	// If the cache is empty, collect everything into a single "update all" command.
	var allCmd *DockerFileUpdateCmdAll
	if len(idf.lastContainerInfoCache) == 0 {
		allCmd = new(DockerFileUpdateCmdAll)
	}

	newCount, delCount, addResultList, deleteResultList := helper.GetContainerByAcceptedInfoV2(
		idf.fullList, idf.matchList,
		idf.IncludeLabel, idf.ExcludeLabel,
		idf.IncludeLabelRegex, idf.ExcludeLabelRegex,
		idf.IncludeEnv, idf.ExcludeEnv,
		idf.IncludeEnvRegex, idf.ExcludeEnvRegex,
		idf.K8sFilter)
	idf.lastUpdateTime = newUpdateTime

	runtimeCtx := idf.context.GetRuntimeContext()
	matchListChanged := newCount != 0 || delCount != 0
	// Do not return early when nothing changed: an empty update must still be
	// notified so that the cache in docker_path_helper.json gets cleared.
	if matchListChanged {
		logger.Infof(runtimeCtx, "update match list, new: %v, delete: %v", newCount, delCount)
	} else {
		logger.Debugf(runtimeCtx, "update match list, new: %v, delete: %v", newCount, delCount)
	}

	dockerInfoDetails := idf.matchList
	logger.Debug(runtimeCtx, "match list length", len(dockerInfoDetails))
	idf.avgInstanceMetric.Add(int64(len(dockerInfoDetails)))

	fileMode := len(idf.LogPath) > 0

	// Short IDs of containers whose resolved log path exists / does not exist
	// on the host; only filled in file mode with CollectingContainersMeta on.
	havingPathkeys := make([]string, 0)
	nothavingPathkeys := make([]string, 0)

	for k, info := range dockerInfoDetails {
		// stdout mode maps every matched container; file mode only running ones.
		if !fileMode || info.ContainerInfo.State.Status == helper.ContainerStatusRunning {
			idf.updateMapping(info, allCmd)
		}

		// Used for the container meta information preview.
		if !idf.CollectingContainersMeta || !fileMode {
			continue
		}
		sourcePath, containerPath := info.FindBestMatchedPath(idf.LogPath)
		formattedSourcePath := formatPath(sourcePath)
		formattedContainerPath := formatPath(containerPath)

		// Part of LogPath below the matched container path. The length guard
		// avoids a slice-bounds panic should the formatted container path ever
		// be longer than LogPath; the suffix is then treated as empty.
		suffix := ""
		if n := len(formattedContainerPath); n <= len(idf.LogPath) {
			suffix = idf.LogPath[n:]
		}
		destPath := helper.GetMountedFilePathWithBasePath(idf.MountPath, formattedSourcePath) + suffix

		// A failed existence check is reported the same way as a missing path.
		if exists, err := util.PathExists(destPath); err == nil && exists {
			havingPathkeys = append(havingPathkeys, helper.GetShortID(k))
		} else {
			nothavingPathkeys = append(nothavingPathkeys, helper.GetShortID(k))
		}
	}

	if idf.CollectingContainersMeta {
		// Fields shared by the stdout and the file flavour of the result.
		configResult := &helper.ContainerConfigResult{
			DataType:             "container_config_result",
			Project:              idf.context.GetProject(),
			Logstore:             idf.context.GetLogstore(),
			ConfigName:           idf.context.GetConfigName(),
			FlusherType:          "flusher_sls",
			FlusherTargetAddress: fmt.Sprintf("%s/%s", idf.context.GetProject(), idf.context.GetLogstore()),
		}
		if fileMode {
			configResult.SourceAddress = fmt.Sprintf("%s/**/%s", idf.LogPath, idf.FilePattern)
			configResult.PathExistInputContainerIDs = helper.GetStringFromList(havingPathkeys)
			configResult.PathNotExistInputContainerIDs = helper.GetStringFromList(nothavingPathkeys)
			configResult.InputType = "file_log"
			configResult.InputIsContainerFile = "true"
		} else {
			// stdout: every matched container with a non-empty ID is reported.
			keys := make([]string, 0, len(idf.matchList))
			for k := range idf.matchList {
				if len(k) > 0 {
					keys = append(keys, helper.GetShortID(k))
				}
			}
			configResult.PathExistInputContainerIDs = helper.GetStringFromList(keys)
			configResult.SourceAddress = "stdout"
			configResult.InputType = "input_container_stdio"
		}

		helper.RecordContainerConfigResultMap(configResult)
		// The incremental record is only written on a change or on the first pass.
		if matchListChanged || idf.firstStart {
			helper.RecordContainerConfigResultIncrement(configResult)
			idf.firstStart = false
		}
		logger.Debugf(runtimeCtx, "update match list, addResultList: %v, deleteResultList: %v", addResultList, deleteResultList)
	}

	for id := range idf.lastContainerInfoCache {
		c, matched := dockerInfoDetails[id]
		switch {
		case !matched:
			// The container is no longer in the match list.
			idf.deleteMetric.Add(1)
			idf.notifyStop(id)
			idf.deleteMapping(id)
		case c.Status() != helper.ContainerStatusRunning && len(idf.LogPath) > 0:
			// Only triggered in file mode: the container is matched but not running.
			if idf.forceReleaseStopContainerFile {
				idf.deleteMetric.Add(1)
				idf.notifyStop(id)
				idf.deleteMapping(id)
			} else {
				idf.notifyStop(id)
			}
		}
	}

	if allCmd != nil {
		// A non-empty command is always sent. An empty one is sent only once
		// after a non-empty one, as tracked by updateEmptyFlag.
		hasCmds := len(allCmd.AllCmd) != 0
		if hasCmds || idf.updateEmptyFlag {
			idf.updateAll(allCmd)
			idf.updateEmptyFlag = hasCmds
		}
	}

	// Reset the cache hourly; the next pass then starts from an empty cache
	// and takes the "update all" path above.
	if time.Since(idf.lastClearTime) > time.Hour {
		idf.lastContainerInfoCache = make(map[string]ContainerInfoCache)
		idf.lastClearTime = time.Now()
	}

	return nil
}