func()

in x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go [275:440]


func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (updatedUsers, updatedDevices collections.UUIDSet, err error) {
	var usersDeltaLink, devicesDeltaLink, groupsDeltaLink string

	// Get user changes.
	if !fullSync {
		usersDeltaLink = state.usersLink
		devicesDeltaLink = state.devicesLink
		groupsDeltaLink = state.groupsLink
	}

	var (
		wantUsers    = p.conf.wantUsers()
		changedUsers []*fetcher.User
		userLink     string
	)
	if wantUsers {
		changedUsers, userLink, err = p.fetcher.Users(ctx, usersDeltaLink)
		if err != nil {
			return updatedUsers, updatedDevices, err
		}
		p.logger.Debugf("Received %d users from API", len(changedUsers))
	} else {
		p.logger.Debugf("Skipping user collection from API: dataset=%s", p.conf.Dataset)
	}

	var (
		wantDevices    = p.conf.wantDevices()
		changedDevices []*fetcher.Device
		deviceLink     string
	)
	if wantDevices {
		changedDevices, deviceLink, err = p.fetcher.Devices(ctx, devicesDeltaLink)
		if err != nil {
			return updatedUsers, updatedDevices, err
		}
		p.logger.Debugf("Received %d devices from API", len(changedDevices))
	} else {
		p.logger.Debugf("Skipping device collection from API: dataset=%s", p.conf.Dataset)
	}

	// Get group changes. Groups are required for both users and devices.
	// So always collect these.
	changedGroups, groupLink, err := p.fetcher.Groups(ctx, groupsDeltaLink)
	if err != nil {
		return updatedUsers, updatedDevices, err
	}
	p.logger.Debugf("Received %d groups from API", len(changedGroups))

	state.usersLink = userLink
	state.devicesLink = deviceLink
	state.groupsLink = groupLink

	for _, v := range changedUsers {
		updatedUsers.Add(v.ID)
		state.storeUser(v)
	}
	for _, v := range changedDevices {
		updatedDevices.Add(v.ID)
		state.storeDevice(v)
	}
	for _, v := range changedGroups {
		state.storeGroup(v)
	}

	// Populate group relationships tree.
	for _, g := range changedGroups {
		if g.Deleted {
			for _, u := range state.users {
				if u.TransitiveMemberOf.Contains(g.ID) {
					updatedUsers.Add(u.ID)
				}
			}
			state.relationships.RemoveVertex(g.ID)
			continue
		}

		for _, member := range g.Members {
			switch member.Type {
			case fetcher.MemberGroup:
				if !wantUsers {
					break
				}
				for _, u := range state.users {
					if u.TransitiveMemberOf.Contains(member.ID) {
						updatedUsers.Add(u.ID)
					}
				}
				if member.Deleted {
					state.relationships.RemoveEdge(member.ID, g.ID)
				} else {
					state.relationships.AddEdge(member.ID, g.ID)
				}

			case fetcher.MemberUser:
				if !wantUsers {
					break
				}
				if u, ok := state.users[member.ID]; ok {
					updatedUsers.Add(u.ID)
					if member.Deleted {
						u.MemberOf.Remove(g.ID)
					} else {
						u.MemberOf.Add(g.ID)
					}
				}

			case fetcher.MemberDevice:
				if !wantDevices {
					break
				}
				if d, ok := state.devices[member.ID]; ok {
					updatedDevices.Add(d.ID)
					if member.Deleted {
						d.MemberOf.Remove(g.ID)
					} else {
						d.MemberOf.Add(g.ID)
					}
				}
			}
		}
	}

	// Expand user group memberships.
	if wantUsers {
		updatedUsers.ForEach(func(userID uuid.UUID) {
			u, ok := state.users[userID]
			if !ok {
				p.logger.Errorf("Unable to find user %q in state", userID)
				return
			}
			u.Modified = true
			if u.Deleted {
				p.logger.Debugw("not expanding membership for deleted user", "user", userID)
				return
			}

			u.TransitiveMemberOf = u.MemberOf
			state.relationships.ExpandFromSet(u.MemberOf).ForEach(func(elem uuid.UUID) {
				u.TransitiveMemberOf.Add(elem)
			})
		})
	}

	// Expand device group memberships.
	if wantDevices {
		updatedDevices.ForEach(func(devID uuid.UUID) {
			d, ok := state.devices[devID]
			if !ok {
				p.logger.Errorf("Unable to find device %q in state", devID)
				return
			}
			d.Modified = true
			if d.Deleted {
				p.logger.Debugw("not expanding membership for deleted device", "device", devID)
				return
			}

			d.TransitiveMemberOf = d.MemberOf
			state.relationships.ExpandFromSet(d.MemberOf).ForEach(func(elem uuid.UUID) {
				d.TransitiveMemberOf.Add(elem)
			})
		})
	}

	return updatedUsers, updatedDevices, nil
}