func()

in x-pack/filebeat/input/httpjson/split.go [105:225]


func (s *split) split(ctx context.Context, trCtx *transformContext, root mapstr.M, h handler) error {

	v, err := root.GetValue(s.targetInfo.Name)
	if err != nil && err != mapstr.ErrKeyNotFound { //nolint:errorlint // mapstr.ErrKeyNotFound is never wrapped by GetValue.
		return err
	}

	if v == nil {
		if s.ignoreEmptyValue {
			if s.child != nil {
				return s.child.split(ctx, trCtx, root, h)
			}
			if s.keepParent {
				h.handleEvent(ctx, root)
			}
			return nil
		}
		if s.isRoot {
			if s.keepParent {
				h.handleEvent(ctx, root)
				return errEmptyField
			}
			return errEmptyRootField
		}
		h.handleEvent(ctx, root)
		return errEmptyField
	}

	switch s.kind {
	case "", splitTypeArr:
		varr, ok := v.([]interface{})
		if !ok {
			return errExpectedSplitArr
		}

		if len(varr) == 0 {
			if s.ignoreEmptyValue {
				if s.child != nil {
					return s.child.split(ctx, trCtx, root, h)
				}
				if s.keepParent {
					h.handleEvent(ctx, root)
				}
				return nil
			}
			if s.isRoot {
				h.handleEvent(ctx, root)
				return errEmptyRootField
			}
			h.handleEvent(ctx, root)
			return errEmptyField
		}

		for _, e := range varr {
			err := s.processMessage(ctx, trCtx, root, s.targetInfo.Name, e, h)
			if err != nil {
				s.log.Debug(err)
			}
		}

		return nil
	case splitTypeMap:
		vmap, ok := toMapStr(v, s.targetInfo.Name)
		if !ok {
			return errExpectedSplitObj
		}

		if len(vmap) == 0 {
			if s.ignoreEmptyValue {
				if s.child != nil {
					return s.child.split(ctx, trCtx, root, h)
				}
				if s.keepParent {
					h.handleEvent(ctx, root)
				}
				return nil
			}
			if s.isRoot {
				return errEmptyRootField
			}
			h.handleEvent(ctx, root)
			return errEmptyField
		}

		for k, e := range vmap {
			if err := s.processMessage(ctx, trCtx, root, k, e, h); err != nil {
				s.log.Debug(err)
			}
		}

		return nil
	case splitTypeString:
		vstr, ok := v.(string)
		if !ok {
			return errExpectedSplitString
		}

		if len(vstr) == 0 {
			if s.ignoreEmptyValue {
				if s.child != nil {
					return s.child.split(ctx, trCtx, root, h)
				}
				return nil
			}
			if s.isRoot {
				return errEmptyRootField
			}
			h.handleEvent(ctx, root)
			return errEmptyField
		}
		for _, substr := range strings.Split(vstr, s.delimiter) {
			if err := s.processMessageSplitString(ctx, trCtx, root, substr, h); err != nil {
				s.log.Debug(err)
			}
		}

		return nil
	}

	return errUnknownSplitType
}