in state/manager.go [513:564]
// syncDeregistered removes the registration described by r from the state of
// every database location it resolves to and, when every removal succeeded,
// stores the resulting sync state on the registrations row identified by r.ID.
//
// Locations are obtained from db.GetEnumerator. If that lookup fails with the
// exact message "DB not found" while both r.Cluster and r.DB are set, a single
// deregistration is attempted against the location built from r itself and,
// on success, the row is marked regStateNotFound instead of regStateSynced.
// Any other lookup error is only logged.
//
// The TableDereg gauge is incremented for the duration of the call and the
// elapsed time is recorded in TableDeregDuration on return.
func syncDeregistered(r *Row) {
	mgr.metrics.TableDereg.Inc()
	defer func(started time.Time) {
		mgr.metrics.TableDeregDuration.Record(time.Since(started))
		mgr.metrics.TableDereg.Dec()
	}(time.Now())

	var total, succeeded uint64
	finalState := regStateSynced

	enumerator, err := db.GetEnumerator(r.Service, r.Cluster, r.DB, r.Table, r.Input)
	switch {
	case err == nil:
		// Fan out one worker per enumerated location, with at most
		// regSyncConcurrency of them running at any time.
		limiter := semaphore.NewWeighted(regSyncConcurrency)
		for enumerator.Next() {
			_ = limiter.Acquire(context.Background(), 1)
			go func(loc *db.Loc) {
				defer limiter.Release(1)
				if !DeregisterTableFromState(loc, r.Table, r.Input, r.Output, r.Version, r.ID) {
					tableLocLog(&r.TableLoc).Errorf("Failed to deregister")
					mgr.metrics.SyncErrors.Inc(1)
					return
				}
				// Workers run concurrently, so the counter is bumped atomically.
				atomic.AddUint64(&succeeded, 1)
			}(enumerator.Value())
			total++
		}
		// Taking the semaphore's full weight blocks until every in-flight
		// worker has released its slot, i.e. until all of them are done.
		_ = limiter.Acquire(context.Background(), regSyncConcurrency)

	case err.Error() == "DB not found" && r.Cluster != "" && r.DB != "":
		// The enumerator could not find the DB: fall back to the single
		// location named directly by the row.
		loc := &db.Loc{Service: r.Service, Cluster: r.Cluster, Name: r.DB}
		if !DeregisterTableFromState(loc, r.Table, r.Input, r.Output, r.Version, r.ID) {
			tableLocLog(&r.TableLoc).Errorf("Failed to deregister")
			mgr.metrics.SyncErrors.Inc(1)
		} else {
			finalState = regStateNotFound
			total, succeeded = 1, 1
		}

	default:
		log.E(err)
	}

	// Leave the row's sync state alone unless at least one location was
	// processed and none of them failed.
	if total == 0 || succeeded != total {
		return
	}

	err = util.ExecSQL(mgr.conn, "UPDATE registrations SET sync_state=? WHERE id=?", finalState, r.ID)
	if err != nil {
		log.E(errors.Wrap(err, "Failed to update sync state in registrations"))
		mgr.metrics.SyncErrors.Inc(1)
	}
}