in x-pack/filebeat/input/entityanalytics/provider/azuread/azure.go [275:440]
// doFetch retrieves user, device and group changes from the API, folds them
// into state, and recomputes the transitive group membership of every user
// and device touched by those changes.
//
// When fullSync is false the delta links held in state are handed to the
// fetcher; when it is true empty links are passed instead. Users and devices
// are only fetched when the configuration asks for them (wantUsers and
// wantDevices); groups are always fetched. The links returned by the fetcher
// replace the ones in state once all three fetches have succeeded.
//
// It returns the IDs of the users and devices that were marked as updated.
// If any fetch fails, that error is returned together with empty sets and
// state is not modified.
func (p *azure) doFetch(ctx context.Context, state *stateStore, fullSync bool) (updatedUsers, updatedDevices collections.UUIDSet, err error) {
	// Resume from the stored delta links unless a full sync was requested.
	var usersDeltaLink, devicesDeltaLink, groupsDeltaLink string
	if !fullSync {
		usersDeltaLink = state.usersLink
		devicesDeltaLink = state.devicesLink
		groupsDeltaLink = state.groupsLink
	}

	// Get user changes.
	var (
		wantUsers    = p.conf.wantUsers()
		changedUsers []*fetcher.User
		userLink     string
	)
	if wantUsers {
		changedUsers, userLink, err = p.fetcher.Users(ctx, usersDeltaLink)
		if err != nil {
			return updatedUsers, updatedDevices, err
		}
		p.logger.Debugf("Received %d users from API", len(changedUsers))
	} else {
		p.logger.Debugf("Skipping user collection from API: dataset=%s", p.conf.Dataset)
	}

	// Get device changes.
	var (
		wantDevices    = p.conf.wantDevices()
		changedDevices []*fetcher.Device
		deviceLink     string
	)
	if wantDevices {
		changedDevices, deviceLink, err = p.fetcher.Devices(ctx, devicesDeltaLink)
		if err != nil {
			return updatedUsers, updatedDevices, err
		}
		p.logger.Debugf("Received %d devices from API", len(changedDevices))
	} else {
		p.logger.Debugf("Skipping device collection from API: dataset=%s", p.conf.Dataset)
	}

	// Get group changes. Groups are required for both users and devices.
	// So always collect these.
	changedGroups, groupLink, err := p.fetcher.Groups(ctx, groupsDeltaLink)
	if err != nil {
		return updatedUsers, updatedDevices, err
	}
	p.logger.Debugf("Received %d groups from API", len(changedGroups))

	// Every fetch succeeded, so it is now safe to commit the new links
	// and the changed entities to state.
	state.usersLink = userLink
	state.devicesLink = deviceLink
	state.groupsLink = groupLink

	for _, v := range changedUsers {
		updatedUsers.Add(v.ID)
		state.storeUser(v)
	}
	for _, v := range changedDevices {
		updatedDevices.Add(v.ID)
		state.storeDevice(v)
	}
	for _, v := range changedGroups {
		state.storeGroup(v)
	}

	// markUsersIn flags every user in state whose current transitive
	// membership includes groupID so that it is re-expanded below.
	markUsersIn := func(groupID uuid.UUID) {
		for _, u := range state.users {
			if u.TransitiveMemberOf.Contains(groupID) {
				updatedUsers.Add(u.ID)
			}
		}
	}
	// markDevicesIn is the device counterpart of markUsersIn. Devices are
	// affected by group deletion and nesting changes in the same way users
	// are, so they must be re-expanded too or their transitive membership
	// goes stale.
	markDevicesIn := func(groupID uuid.UUID) {
		for _, d := range state.devices {
			if d.TransitiveMemberOf.Contains(groupID) {
				updatedDevices.Add(d.ID)
			}
		}
	}

	// Populate group relationships tree.
	for _, g := range changedGroups {
		if g.Deleted {
			markUsersIn(g.ID)
			if wantDevices {
				markDevicesIn(g.ID)
			}
			state.relationships.RemoveVertex(g.ID)
			continue
		}

		for _, member := range g.Members {
			switch member.Type {
			case fetcher.MemberGroup:
				// Mark affected entities before touching the graph: the
				// lookup relies on the membership computed previously.
				if wantUsers {
					markUsersIn(member.ID)
				}
				if wantDevices {
					markDevicesIn(member.ID)
				}
				// The nesting graph is shared by users and devices, so it
				// is maintained regardless of which datasets are enabled.
				if member.Deleted {
					state.relationships.RemoveEdge(member.ID, g.ID)
				} else {
					state.relationships.AddEdge(member.ID, g.ID)
				}

			case fetcher.MemberUser:
				if !wantUsers {
					break
				}
				if u, ok := state.users[member.ID]; ok {
					updatedUsers.Add(u.ID)
					if member.Deleted {
						u.MemberOf.Remove(g.ID)
					} else {
						u.MemberOf.Add(g.ID)
					}
				}

			case fetcher.MemberDevice:
				if !wantDevices {
					break
				}
				if d, ok := state.devices[member.ID]; ok {
					updatedDevices.Add(d.ID)
					if member.Deleted {
						d.MemberOf.Remove(g.ID)
					} else {
						d.MemberOf.Add(g.ID)
					}
				}
			}
		}
	}

	// expand returns a newly built set holding the direct memberships plus
	// every group the relationship graph reports for them. A fresh set is
	// built rather than assigning direct and adding to it: if the set type
	// is backed by shared storage (NOTE(review): presumably a map; confirm
	// against collections.UUIDSet), adding to the assigned copy would also
	// grow the direct membership set, and inherited groups could then never
	// be removed from it.
	expand := func(direct collections.UUIDSet) collections.UUIDSet {
		var all collections.UUIDSet
		direct.ForEach(func(id uuid.UUID) {
			all.Add(id)
		})
		state.relationships.ExpandFromSet(direct).ForEach(func(id uuid.UUID) {
			all.Add(id)
		})
		return all
	}

	// Expand user group memberships.
	if wantUsers {
		updatedUsers.ForEach(func(userID uuid.UUID) {
			u, ok := state.users[userID]
			if !ok {
				p.logger.Errorf("Unable to find user %q in state", userID)
				return
			}
			u.Modified = true
			if u.Deleted {
				p.logger.Debugw("not expanding membership for deleted user", "user", userID)
				return
			}
			u.TransitiveMemberOf = expand(u.MemberOf)
		})
	}

	// Expand device group memberships.
	if wantDevices {
		updatedDevices.ForEach(func(devID uuid.UUID) {
			d, ok := state.devices[devID]
			if !ok {
				p.logger.Errorf("Unable to find device %q in state", devID)
				return
			}
			d.Modified = true
			if d.Deleted {
				p.logger.Debugw("not expanding membership for deleted device", "device", devID)
				return
			}
			d.TransitiveMemberOf = expand(d.MemberOf)
		})
	}

	return updatedUsers, updatedDevices, nil
}