func()

in banyand/metadata/schema/etcd.go [188:235]


// update overwrites the value stored under metadata's key with the serialized
// metadata.Spec, but only when an entry already exists under that key.
//
// It returns:
//   - ErrClosed when the registry's closer refuses to register a running task;
//   - errUnexpectedNumberOfEntities when the lookup yields more than one entry;
//   - ErrGRPCResourceNotFound when the lookup yields no entry;
//   - errConcurrentModification when the guarded transaction does not succeed;
//   - any error produced by key derivation, the etcd calls, or (un)marshaling.
//
// When metadata.equal reports that the stored entity matches, nothing is
// written and no update notification is sent.
func (e *etcdSchemaRegistry) update(ctx context.Context, metadata Metadata) error {
	if !e.closer.AddRunning() {
		return ErrClosed
	}
	defer e.closer.Done()

	key, err := metadata.key()
	if err != nil {
		return err
	}

	current, err := e.client.Get(ctx, key)
	if err != nil {
		return err
	}
	if current.Count > 1 {
		return errUnexpectedNumberOfEntities
	}

	// Serialization happens before the existence check so that a marshaling
	// failure is reported ahead of a not-found result.
	payload, err := proto.Marshal(metadata.Spec.(proto.Message))
	if err != nil {
		return err
	}

	// Nothing stored under the key: there is nothing to update.
	if current.Count < 1 {
		return ErrGRPCResourceNotFound
	}

	stored := current.Kvs[0]
	existing, decodeErr := metadata.Unmarshal(stored.Value)
	if decodeErr != nil {
		return decodeErr
	}
	// Skip the write (and the notification) when the entity is unchanged.
	if metadata.equal(existing) {
		return nil
	}

	// Only put the new value if the key still carries the mod revision that
	// was read above.
	unchanged := clientv3.Compare(clientv3.ModRevision(key), "=", stored.ModRevision)
	put := clientv3.OpPut(key, string(payload))
	result, commitErr := e.client.Txn(ctx).If(unchanged).Then(put).Commit()
	switch {
	case commitErr != nil:
		return commitErr
	case !result.Succeeded:
		return errConcurrentModification
	}

	e.notifyUpdate(metadata)
	return nil
}