func syncDeregistered()

in state/manager.go [513:564]


func syncDeregistered(r *Row) {
	mgr.metrics.TableDereg.Inc()
	defer func(st time.Time) {
		mgr.metrics.TableDeregDuration.Record(time.Since(st))
		mgr.metrics.TableDereg.Dec()
	}(time.Now())

	var tablesTotal, tablesSuccess uint64
	syncedState := regStateSynced

	enumerator, err := db.GetEnumerator(r.Service, r.Cluster, r.DB, r.Table, r.Input)
	if err != nil {
		if err.Error() == "DB not found" && r.Cluster != "" && r.DB != "" {
			dbl := &db.Loc{Service: r.Service, Cluster: r.Cluster, Name: r.DB}
			if DeregisterTableFromState(dbl, r.Table, r.Input, r.Output, r.Version, r.ID) {
				syncedState = regStateNotFound
				tablesTotal = 1
				tablesSuccess = 1
			} else {
				tableLocLog(&r.TableLoc).Errorf("Failed to deregister")
				mgr.metrics.SyncErrors.Inc(1)
			}
		} else {
			log.E(err)
		}
	} else {
		sem := semaphore.NewWeighted(regSyncConcurrency)

		for ; enumerator.Next(); tablesTotal++ {
			_ = sem.Acquire(context.Background(), 1)

			go func(dbl *db.Loc) {
				defer sem.Release(1)
				if DeregisterTableFromState(dbl, r.Table, r.Input, r.Output, r.Version, r.ID) {
					atomic.AddUint64(&tablesSuccess, 1)
				} else {
					tableLocLog(&r.TableLoc).Errorf("Failed to deregister")
					mgr.metrics.SyncErrors.Inc(1)
				}
			}(enumerator.Value())
		}

		_ = sem.Acquire(context.Background(), regSyncConcurrency)
	}

	if tablesTotal != 0 && tablesSuccess == tablesTotal {
		if err = util.ExecSQL(mgr.conn, "UPDATE registrations SET sync_state=? WHERE id=?", syncedState, r.ID); err != nil {
			log.E(errors.Wrap(err, "Failed to update sync state in registrations"))
			mgr.metrics.SyncErrors.Inc(1)
		}
	}
}