// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
// or more contributor license agreements. Licensed under the Elastic License;
// you may not use this file except in compliance with the Elastic License.

// Package okta provides a user identity asset provider for Okta.
package okta
import (
"context"
"errors"
"fmt"
"io"
"io/fs"
"net/http"
"net/url"
"os"
"path/filepath"
"slices"
"strconv"
"strings"
"time"

"github.com/hashicorp/go-retryablehttp"
"go.elastic.co/ecszap"
"go.uber.org/zap"
"go.uber.org/zap/zapcore"

v2 "github.com/elastic/beats/v7/filebeat/input/v2"
"github.com/elastic/beats/v7/libbeat/beat"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/internal/kvstore"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider"
"github.com/elastic/beats/v7/x-pack/filebeat/input/entityanalytics/provider/okta/internal/okta"
"github.com/elastic/beats/v7/x-pack/filebeat/input/internal/httplog"
"github.com/elastic/elastic-agent-libs/config"
"github.com/elastic/elastic-agent-libs/logp"
"github.com/elastic/elastic-agent-libs/mapstr"
"github.com/elastic/elastic-agent-libs/transport/httpcommon"
"github.com/elastic/go-concert/ctxtool"
)
func init() {
err := provider.Register(Name, New)
if err != nil {
panic(err)
}
}
// Name of this provider.
const Name = "okta"
// FullName of this provider, including the input name. Prefer using this
// value for full context, especially if the input name isn't present in an
// adjacent log field.
const FullName = "entity-analytics-" + Name
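// A minimal sketch of an input configuration selecting this provider. The
// Okta setting key names here are assumptions based on the conf fields; see
// conf.go for the authoritative set:
//
//	- type: entity-analytics
//	  enabled: true
//	  id: entity-analytics-okta
//	  provider: okta
//	  okta_domain: ${OKTA_DOMAIN}
//	  okta_token: ${OKTA_TOKEN}
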
// oktaInput implements the provider.Provider interface.
type oktaInput struct {
*kvstore.Manager
cfg conf
client *http.Client
lim *okta.RateLimiter
metrics *inputMetrics
logger *logp.Logger
}
// New creates a new instance of an Okta identity provider.
func New(logger *logp.Logger) (provider.Provider, error) {
p := oktaInput{
cfg: defaultConfig(),
}
p.Manager = &kvstore.Manager{
Logger: logger,
Type: FullName,
Configure: p.configure,
}
return &p, nil
}
// configure configures this provider using the given configuration.
func (p *oktaInput) configure(cfg *config.C) (kvstore.Input, error) {
err := cfg.Unpack(&p.cfg)
if err != nil {
return nil, fmt.Errorf("unable to unpack %s input config: %w", Name, err)
}
return p, nil
}
// Name returns the name of this provider.
func (p *oktaInput) Name() string {
return FullName
}
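// Test is a no-op for this provider; it always reports success.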
func (*oktaInput) Test(v2.TestContext) error { return nil }
// Run will start data collection on this provider.
func (p *oktaInput) Run(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error {
p.logger = inputCtx.Logger.With("provider", Name, "domain", p.cfg.OktaDomain)
p.metrics = newMetrics(inputCtx.ID, nil)
defer p.metrics.Close()
lastSyncTime, _ := getLastSync(store)
syncWaitTime := time.Until(lastSyncTime.Add(p.cfg.SyncInterval))
lastUpdateTime, _ := getLastUpdate(store)
updateWaitTime := time.Until(lastUpdateTime.Add(p.cfg.UpdateInterval))
syncTimer := time.NewTimer(syncWaitTime)
updateTimer := time.NewTimer(updateWaitTime)
// Allow a single fetch operation to obtain limits from the API.
p.lim = okta.NewRateLimiter(p.cfg.LimitWindow, p.cfg.LimitFixed)
if p.cfg.Tracer != nil {
id := sanitizeFileName(inputCtx.IDWithoutName)
p.cfg.Tracer.Filename = strings.ReplaceAll(p.cfg.Tracer.Filename, "*", id)
}
var err error
p.client, err = newClient(ctxtool.FromCanceller(inputCtx.Cancelation), p.cfg, p.logger)
if err != nil {
return err
}
for {
select {
case <-inputCtx.Cancelation.Done():
if !errors.Is(inputCtx.Cancelation.Err(), context.Canceled) {
return inputCtx.Cancelation.Err()
}
return nil
case <-syncTimer.C:
start := time.Now()
if err := p.runFullSync(inputCtx, store, client); err != nil {
p.logger.Errorw("Error running full sync", "error", err)
p.metrics.syncError.Inc()
}
p.metrics.syncTotal.Inc()
p.metrics.syncProcessingTime.Update(time.Since(start).Nanoseconds())
syncTimer.Reset(p.cfg.SyncInterval)
p.logger.Debugf("Next sync expected at: %v", time.Now().Add(p.cfg.SyncInterval))
// Reset the update timer and wait the configured interval. If the
// update timer has already fired, then drain the timer's channel
// before resetting.
if !updateTimer.Stop() {
<-updateTimer.C
}
updateTimer.Reset(p.cfg.UpdateInterval)
p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval))
case <-updateTimer.C:
start := time.Now()
if err := p.runIncrementalUpdate(inputCtx, store, client); err != nil {
p.logger.Errorw("Error running incremental update", "error", err)
p.metrics.updateError.Inc()
}
p.metrics.updateTotal.Inc()
p.metrics.updateProcessingTime.Update(time.Since(start).Nanoseconds())
updateTimer.Reset(p.cfg.UpdateInterval)
p.logger.Debugf("Next update expected at: %v", time.Now().Add(p.cfg.UpdateInterval))
}
}
}
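// newClient returns an *http.Client built from the transport settings in
// cfg, decorated with request tracing and the configured redirect policy,
// and wrapped in a retryablehttp.Client for retry handling.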
func newClient(ctx context.Context, cfg conf, log *logp.Logger) (*http.Client, error) {
c, err := cfg.Request.Transport.Client(clientOptions(cfg.Request.KeepAlive.settings())...)
if err != nil {
return nil, err
}
c = requestTrace(ctx, c, cfg, log)
c.CheckRedirect = checkRedirect(cfg.Request, log)
client := &retryablehttp.Client{
HTTPClient: c,
Logger: newRetryLog(log),
RetryWaitMin: cfg.Request.Retry.getWaitMin(),
RetryWaitMax: cfg.Request.Retry.getWaitMax(),
RetryMax: cfg.Request.Retry.getMaxAttempts(),
CheckRetry: retryablehttp.DefaultRetryPolicy,
Backoff: retryablehttp.DefaultBackoff,
}
return client.StandardClient(), nil
}
// lumberjackTimestamp is a glob expression matching the time format string used
// by lumberjack when rolling over logs, "2006-01-02T15-04-05.000".
// https://github.com/natefinch/lumberjack/blob/4cb27fcfbb0f35cb48c542c5ea80b7c1d18933d0/lumberjack.go#L39
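// Rolled-over trace logs therefore have names matching
// <base>-<lumberjackTimestamp><ext>, which is the glob constructed in
// requestTrace below when cleaning up disabled trace logs.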
const lumberjackTimestamp = "[0-9][0-9][0-9][0-9]-[0-9][0-9]-[0-9][0-9]T[0-9][0-9]-[0-9][0-9]-[0-9][0-9].[0-9][0-9][0-9]"
// requestTrace decorates cli with an httplog.LoggingRoundTripper if cfg.Tracer
// is enabled. If a tracer is configured but disabled, any existing trace logs
// are removed and cli is returned unchanged.
func requestTrace(ctx context.Context, cli *http.Client, cfg conf, log *logp.Logger) *http.Client {
if cfg.Tracer == nil {
return cli
}
if !cfg.Tracer.enabled() {
// We have a trace log name, but we are not enabled,
// so remove all trace logs we own.
err := os.Remove(cfg.Tracer.Filename)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Errorw("failed to remove request trace log", "path", cfg.Tracer.Filename, "error", err)
}
ext := filepath.Ext(cfg.Tracer.Filename)
base := strings.TrimSuffix(cfg.Tracer.Filename, ext)
paths, err := filepath.Glob(base + "-" + lumberjackTimestamp + ext)
if err != nil {
log.Errorw("failed to collect request trace log path names", "error", err)
}
for _, p := range paths {
err = os.Remove(p)
if err != nil && !errors.Is(err, fs.ErrNotExist) {
log.Errorw("failed to remove request trace log", "path", p, "error", err)
}
}
return cli
}
w := zapcore.AddSync(cfg.Tracer)
go func() {
// Close the logger when we are done.
<-ctx.Done()
cfg.Tracer.Close()
}()
core := ecszap.NewCore(
ecszap.NewDefaultEncoderConfig(),
w,
zap.DebugLevel,
)
traceLogger := zap.New(core)
maxBodyLen := cfg.Tracer.MaxSize * 1e6 / 10 // 10% of file max
cli.Transport = httplog.NewLoggingRoundTripper(cli.Transport, traceLogger, maxBodyLen, log)
return cli
}
// sanitizeFileName returns name with ":" and "/" replaced with "_", removing
// repeated instances. The request.tracer.filename may contain ":" when an
// input has a cursor configuration, and macOS Finder treats ":" as a path
// separator, which results in confusing file paths. For example,
// "okta-1:host" becomes "okta-1_host".
func sanitizeFileName(name string) string {
name = strings.ReplaceAll(name, ":", string(filepath.Separator))
name = filepath.Clean(name)
return strings.ReplaceAll(name, string(filepath.Separator), "_")
}
// clientOptions returns the transport options used to construct the client:
// APM HTTP instrumentation and the provided keep-alive settings.
func clientOptions(keepalive httpcommon.WithKeepaliveSettings) []httpcommon.TransportOption {
return []httpcommon.TransportOption{
httpcommon.WithAPMHTTPInstrumentation(),
keepalive,
}
}
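// checkRedirect returns a CheckRedirect function for an http.Client that
// enforces the configured maximum number of redirects and, if configured,
// forwards headers from the previous request, dropping any header in the
// ban list.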
func checkRedirect(cfg *requestConfig, log *logp.Logger) func(*http.Request, []*http.Request) error {
return func(req *http.Request, via []*http.Request) error {
log.Debug("http client: checking redirect")
if len(via) >= cfg.RedirectMaxRedirects {
log.Debug("http client: max redirects exceeded")
return fmt.Errorf("stopped after %d redirects", cfg.RedirectMaxRedirects)
}
if !cfg.RedirectForwardHeaders || len(via) == 0 {
log.Debugf("http client: nothing to do while checking redirects - forward_headers: %v, via: %#v", cfg.RedirectForwardHeaders, via)
return nil
}
prev := via[len(via)-1] // previous request to get headers from
log.Debugf("http client: forwarding headers from previous request: %#v", prev.Header)
req.Header = prev.Header.Clone()
for _, k := range cfg.RedirectHeadersBanList {
log.Debugf("http client: ban header %v", k)
req.Header.Del(k)
}
return nil
}
}
// retryLog is a shim for the retryablehttp.Client.Logger.
type retryLog struct{ log *logp.Logger }
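// newRetryLog returns a retryLog wrapping log, named "retryablehttp" and
// with one caller frame skipped so that logged call sites refer to the
// retryablehttp caller rather than the shim methods.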
func newRetryLog(log *logp.Logger) *retryLog {
return &retryLog{log: log.Named("retryablehttp").WithOptions(zap.AddCallerSkip(1))}
}
func (l *retryLog) Error(msg string, kv ...interface{}) { l.log.Errorw(msg, kv...) }
func (l *retryLog) Info(msg string, kv ...interface{}) { l.log.Infow(msg, kv...) }
func (l *retryLog) Debug(msg string, kv ...interface{}) { l.log.Debugw(msg, kv...) }
func (l *retryLog) Warn(msg string, kv ...interface{}) { l.log.Warnw(msg, kv...) }
// runFullSync performs a full synchronization. It will fetch user and device
// identities from Okta, optionally enriching users with group, factor, and
// role details, and publishes all known entities (regardless of whether they
// have been modified) to the given beat.Client.
func (p *oktaInput) runFullSync(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error {
p.logger.Debugf("Running full sync...")
p.logger.Debugf("Opening new transaction...")
state, err := newStateStore(store)
if err != nil {
return fmt.Errorf("unable to begin transaction: %w", err)
}
p.logger.Debugf("Transaction opened")
defer func() { // If commit is successful, this call to close is a no-op.
closeErr := state.close(false)
if closeErr != nil {
p.logger.Errorw("Error rolling back full sync transaction", "error", closeErr)
}
}()
wantUsers := p.cfg.wantUsers()
wantDevices := p.cfg.wantDevices()
if wantUsers || wantDevices {
ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
p.logger.Debugf("Starting fetch...")
tracker := kvstore.NewTxTracker(ctx)
start := time.Now()
p.publishMarker(start, start, inputCtx.ID, true, client, tracker)
if wantUsers {
err = p.doFetchUsers(ctx, state, true, func(u *User) {
p.publishUser(u, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
}
if wantDevices {
err = p.doFetchDevices(ctx, state, true, func(d *Device) {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
}
end := time.Now()
p.publishMarker(end, end, inputCtx.ID, false, client, tracker)
tracker.Wait()
if ctx.Err() != nil {
return ctx.Err()
}
}
state.lastSync = time.Now()
err = state.close(true)
if err != nil {
return fmt.Errorf("unable to commit state: %w", err)
}
return nil
}
// runIncrementalUpdate will run an incremental update. The process is similar
// to full synchronization, except only users and devices which have changed
// (newly discovered, modified, or deleted) will be published.
func (p *oktaInput) runIncrementalUpdate(inputCtx v2.Context, store *kvstore.Store, client beat.Client) error {
p.logger.Debugf("Running incremental update...")
state, err := newStateStore(store)
if err != nil {
return fmt.Errorf("unable to begin transaction: %w", err)
}
defer func() { // If commit is successful, this call to close is a no-op.
closeErr := state.close(false)
if closeErr != nil {
p.logger.Errorw("Error rolling back incremental update transaction", "error", closeErr)
}
}()
ctx := ctxtool.FromCanceller(inputCtx.Cancelation)
tracker := kvstore.NewTxTracker(ctx)
if p.cfg.wantUsers() {
p.logger.Debugf("Fetching changed users...")
err = p.doFetchUsers(ctx, state, false, func(u *User) {
p.publishUser(u, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
}
if p.cfg.wantDevices() {
p.logger.Debugf("Fetching changed devices...")
err = p.doFetchDevices(ctx, state, false, func(d *Device) {
p.publishDevice(d, state, inputCtx.ID, client, tracker)
})
if err != nil {
return err
}
}
tracker.Wait()
if ctx.Err() != nil {
return ctx.Err()
}
state.lastUpdate = time.Now()
if err = state.close(true); err != nil {
return fmt.Errorf("unable to commit state: %w", err)
}
return nil
}
// doFetchUsers handles fetching user identities from Okta. If fullSync is true,
// then any stored incremental query state will be ignored, forcing a full
// synchronization from Okta. Fetched users are passed to the publish callback
// as they are received.
func (p *oktaInput) doFetchUsers(ctx context.Context, state *stateStore, fullSync bool, publish func(u *User)) error {
if !p.cfg.wantUsers() {
p.logger.Debugf("Skipping user collection from API: dataset=%s", p.cfg.Dataset)
return nil
}
var (
query url.Values
err error
)
// Get user changes.
if !fullSync && state.nextUsers != "" {
query, err = url.ParseQuery(state.nextUsers)
if err != nil {
p.logger.Warnf("failed to parse next query: %v", err)
}
}
if query == nil {
// Use "search" because of recommendation on Okta dev documentation:
// https://developer.okta.com/docs/reference/user-query/.
// Search term of "status pr" is required so that we get DEPROVISIONED
// users; a nil query is more efficient, but excludes these users.
query = url.Values{"search": []string{"status pr"}}
}
if p.cfg.BatchSize > 0 {
// If limit is not specified, the API default is used. For the
// search-based query we are using, this is 200.
//
// See:
// https://developer.okta.com/docs/api/openapi/okta-management/management/tag/User/#tag/User/operation/listUsers!in=query&path=limit&t=request
query.Set("limit", strconv.Itoa(p.cfg.BatchSize))
}
const omit = okta.OmitCredentials | okta.OmitCredentialsLinks | okta.OmitTransitioningToStatus
var (
n int
lastUpdated time.Time
)
for {
batch, h, err := okta.GetUserDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", query, omit, p.lim, p.logger)
if err != nil {
p.logger.Debugf("received %d users from API", n)
return err
}
p.logger.Debugf("received batch of %d users from API", len(batch))
if fullSync {
for _, u := range batch {
publish(p.addUserMetadata(ctx, u, state))
if u.LastUpdated.After(lastUpdated) {
lastUpdated = u.LastUpdated
}
}
} else {
for _, u := range batch {
su := p.addUserMetadata(ctx, u, state)
publish(su)
n++
if u.LastUpdated.After(lastUpdated) {
lastUpdated = u.LastUpdated
}
}
}
next, err := okta.Next(h)
if err != nil {
if err == io.EOF {
break
}
p.logger.Debugf("received %d users from API", n)
return err
}
query = next
}
// Prepare query for next update. This is any record that was updated
// at or after the last updated record we saw this round. Use this rather
// than time.Now() since we may have received stale records. Use ge
// rather than gt since timestamps are second resolution, so we may not
// have a complete set from that timestamp.
query = url.Values{}
query.Add("search", fmt.Sprintf(`lastUpdated ge "%s" and status pr`, lastUpdated.Format(okta.ISO8601)))
state.nextUsers = query.Encode()
p.logger.Debugf("received %d users from API", n)
return nil
}
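// addUserMetadata stores u in state and enriches the stored user with
// group, factor, and role details from the Okta API according to
// p.cfg.EnrichWith. Enrichment failures are logged as warnings and do not
// fail the fetch.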
func (p *oktaInput) addUserMetadata(ctx context.Context, u okta.User, state *stateStore) *User {
su := state.storeUser(u)
switch len(p.cfg.EnrichWith) {
case 1:
if p.cfg.EnrichWith[0] != "none" {
break
}
// A single "none" element disables enrichment entirely.
fallthrough
case 0:
// No enrichment was requested.
return su
}
if slices.Contains(p.cfg.EnrichWith, "groups") {
groups, _, err := okta.GetUserGroupDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.logger)
if err != nil {
p.logger.Warnf("failed to get user group membership for %s: %v", u.ID, err)
} else {
su.Groups = groups
}
}
if slices.Contains(p.cfg.EnrichWith, "factors") {
factors, _, err := okta.GetUserFactors(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.logger)
if err != nil {
p.logger.Warnf("failed to get user factors for %s: %v", u.ID, err)
} else {
su.Factors = factors
}
}
if slices.Contains(p.cfg.EnrichWith, "roles") {
roles, _, err := okta.GetUserRoles(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, u.ID, p.lim, p.logger)
if err != nil {
p.logger.Warnf("failed to get user roles for %s: %v", u.ID, err)
} else {
su.Roles = roles
}
}
return su
}
// doFetchDevices handles fetching device and associated user identities from
// Okta. If fullSync is true, then any stored incremental query state will be
// ignored, forcing a full synchronization from Okta. Fetched devices are
// passed to the publish callback as they are received.
func (p *oktaInput) doFetchDevices(ctx context.Context, state *stateStore, fullSync bool, publish func(d *Device)) error {
if !p.cfg.wantDevices() {
p.logger.Debugf("Skipping device collection from API: dataset=%s", p.cfg.Dataset)
return nil
}
var (
deviceQuery url.Values
userQueryInit url.Values
err error
)
// Get device changes.
if !fullSync && state.nextDevices != "" {
deviceQuery, err = url.ParseQuery(state.nextDevices)
if err != nil {
p.logger.Warnf("failed to parse next query: %v", err)
}
}
if deviceQuery == nil {
// Use "search" because of recommendation on Okta dev documentation:
// https://developer.okta.com/docs/reference/user-query/.
// Search term of "status pr" is required so that we get DEPROVISIONED
// users; a nil query is more efficient, but excludes these users.
// There is no equivalent documentation for devices, so we assume the
// behaviour is the same.
deviceQuery = url.Values{"search": []string{"status pr"}}
}
if p.cfg.BatchSize > 0 {
// If limit is not specified, the API default is used. For the
// search-based query we are using, this is 200.
//
// See:
// https://developer.okta.com/docs/api/openapi/okta-management/management/tag/User/#tag/User/operation/listUsers!in=query&path=limit&t=request
deviceQuery.Set("limit", strconv.Itoa(p.cfg.BatchSize))
}
// Start user queries from the same time point. This must not
// be mutated since we may perform multiple batched gets over
// multiple devices.
userQueryInit = cloneURLValues(deviceQuery)
var (
n int
lastUpdated time.Time
)
for {
batch, h, err := okta.GetDeviceDetails(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, "", deviceQuery, p.lim, p.logger)
if err != nil {
p.logger.Debugf("received %d devices from API", n)
return err
}
p.logger.Debugf("received batch of %d devices from API", len(batch))
for i, d := range batch {
userQuery := cloneURLValues(userQueryInit)
for {
// TODO: Consider softening the response to errors here. If we fail to get users
// from a device, do we want to fail completely? There are arguments in both
// directions. We _could_ keep a multierror and return that in the end, which
// would guarantee progression, but may result in holes in the data. What we are
// doing at the moment (both here and in doFetchUsers) guarantees no holes, but
// at the cost of potentially not making progress.
const omit = okta.OmitCredentials | okta.OmitCredentialsLinks | okta.OmitTransitioningToStatus
users, h, err := okta.GetDeviceUsers(ctx, p.client, p.cfg.OktaDomain, p.cfg.OktaToken, d.ID, userQuery, omit, p.lim, p.logger)
if err != nil {
p.logger.Debugf("received %d device users from API", len(users))
return err
}
p.logger.Debugf("received batch of %d device users from API", len(users))
// Users are not stored in the state as they are in doFetchUsers. We expect
// them to have already been discovered/stored by that call; here they are
// stored associated with the device, undecorated with discovery state. Or,
// if the dataset is set to "devices", then we have been asked not to care
// about this detail.
batch[i].Users = append(batch[i].Users, users...)
next, err := okta.Next(h)
if err != nil {
if err == io.EOF {
break
}
p.logger.Debugf("received %d devices from API", n)
return err
}
userQuery = next
}
}
if fullSync {
for _, d := range batch {
publish(state.storeDevice(d))
if d.LastUpdated.After(lastUpdated) {
lastUpdated = d.LastUpdated
}
}
} else {
for _, d := range batch {
sd := state.storeDevice(d)
publish(sd)
n++
if d.LastUpdated.After(lastUpdated) {
lastUpdated = d.LastUpdated
}
}
}
next, err := okta.Next(h)
if err != nil {
if err == io.EOF {
break
}
p.logger.Debugf("received %d devices from API", n)
return err
}
deviceQuery = next
}
// Prepare query for next update. This is any record that was updated
// at or after the last updated record we saw this round. Use this rather
// than time.Now() since we may have received stale records. Use ge
// rather than gt since timestamps are second resolution, so we may not
// have a complete set from that timestamp.
deviceQuery = url.Values{}
deviceQuery.Add("search", fmt.Sprintf(`lastUpdated ge "%s" and status pr`, lastUpdated.Format(okta.ISO8601)))
state.nextDevices = deviceQuery.Encode()
p.logger.Debugf("received %d devices from API", n)
return nil
}
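// cloneURLValues returns a deep copy of a; append(v[:0:0], v...) copies each
// value slice into a fresh backing array so mutations of the clone cannot be
// observed through the original.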
func cloneURLValues(a url.Values) url.Values {
b := make(url.Values, len(a))
for k, v := range a {
b[k] = append(v[:0:0], v...)
}
return b
}
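// entity is a constraint over the identity types handled by this provider.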
type entity interface {
*User | *Device | okta.User
}
// publishMarker will publish a write marker document using the given beat.Client.
// If start is true, then it will be a start marker, otherwise an end marker.
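// The published document has the form (sketch):
//
//	{"labels": {"identity_source": <inputID>},
//	 "event": {"action": "started", "start": <eventTime>}}
//
// with "completed" and "event.end" used for an end marker.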
func (p *oktaInput) publishMarker(ts, eventTime time.Time, inputID string, start bool, client beat.Client, tracker *kvstore.TxTracker) {
fields := mapstr.M{}
_, _ = fields.Put("labels.identity_source", inputID)
if start {
_, _ = fields.Put("event.action", "started")
_, _ = fields.Put("event.start", eventTime)
} else {
_, _ = fields.Put("event.action", "completed")
_, _ = fields.Put("event.end", eventTime)
}
event := beat.Event{
Timestamp: ts,
Fields: fields,
Private: tracker,
}
tracker.Add()
if start {
p.logger.Debug("Publishing start write marker")
} else {
p.logger.Debug("Publishing end write marker")
}
client.Publish(event)
}
// publishUser will publish a user document using the given beat.Client.
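// The published document has the form (sketch):
//
//	{"okta": <u.User>,
//	 "labels": {"identity_source": <inputID>},
//	 "user": {"id": <u.ID>},
//	 "groups": <u.Groups>,
//	 "event": {"action": "user-discovered"}}
//
// where event.action reflects the user's state: user-discovered,
// user-modified, or user-deleted.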
func (p *oktaInput) publishUser(u *User, state *stateStore, inputID string, client beat.Client, tracker *kvstore.TxTracker) {
userDoc := mapstr.M{}
_, _ = userDoc.Put("okta", u.User)
_, _ = userDoc.Put("labels.identity_source", inputID)
_, _ = userDoc.Put("user.id", u.ID)
_, _ = userDoc.Put("groups", u.Groups)
switch u.State {
case Deleted:
_, _ = userDoc.Put("event.action", "user-deleted")
case Discovered:
_, _ = userDoc.Put("event.action", "user-discovered")
case Modified:
_, _ = userDoc.Put("event.action", "user-modified")
}
event := beat.Event{
Timestamp: time.Now(),
Fields: userDoc,
Private: tracker,
}
tracker.Add()
p.logger.Debugf("Publishing user %q", u.ID)
client.Publish(event)
}
// publishDevice will publish a device document using the given beat.Client.
func (p *oktaInput) publishDevice(d *Device, state *stateStore, inputID string, client beat.Client, tracker *kvstore.TxTracker) {
devDoc := mapstr.M{}
_, _ = devDoc.Put("okta", d.Device)
_, _ = devDoc.Put("labels.identity_source", inputID)
_, _ = devDoc.Put("device.id", d.ID)
switch d.State {
case Deleted:
_, _ = devDoc.Put("event.action", "device-deleted")
case Discovered:
_, _ = devDoc.Put("event.action", "device-discovered")
case Modified:
_, _ = devDoc.Put("event.action", "device-modified")
}
event := beat.Event{
Timestamp: time.Now(),
Fields: devDoc,
Private: tracker,
}
tracker.Add()
p.logger.Debugf("Publishing device %q", d.ID)
client.Publish(event)
}