in banyand/metadata/schema/etcd.go [188:235]
func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) error {
if !e.closer.AddRunning() {
return ErrClosed
}
defer e.closer.Done()
key, err := metadata.key()
if err != nil {
return err
}
getResp, err := e.client.Get(ctx, key)
if err != nil {
return err
}
if getResp.Count > 1 {
return errUnexpectedNumberOfEntities
}
val, err := proto.Marshal(metadata.Spec.(proto.Message))
if err != nil {
return err
}
replace := getResp.Count > 0
if replace {
existingVal, innerErr := metadata.Unmarshal(getResp.Kvs[0].Value)
if innerErr != nil {
return innerErr
}
// directly return if we have the same entity
if metadata.equal(existingVal) {
return nil
}
modRevision := getResp.Kvs[0].ModRevision
txnResp, txnErr := e.client.Txn(ctx).
If(clientv3.Compare(clientv3.ModRevision(key), "=", modRevision)).
Then(clientv3.OpPut(key, string(val))).
Commit()
if txnErr != nil {
return txnErr
}
if !txnResp.Succeeded {
return errConcurrentModification
}
} else {
return ErrGRPCResourceNotFound
}
e.notifyUpdate(metadata)
return nil
}