func()

in banyand/metadata/client.go [109:207]


// PreRun connects to the etcd-backed schema registry and, when
// s.toRegisterNode is set, registers this node in it.
//
// Both phases retry roughly once per second while the failure is
// context.DeadlineExceeded or context.Canceled (node registration also retries
// on schema.ErrGRPCAlreadyExists). A phase gives up with a "... timeout" error
// once s.registryTimeout has elapsed since that phase started, and with an
// "... interrupted" error as soon as SIGHUP/SIGINT/SIGQUIT/SIGTERM is received
// or s.closer is closed. The registration phase is additionally interrupted
// when ctx is done. Any other error is returned unchanged.
//
// For registration, the node and its roles are read from ctx under
// common.ContextNodeKey and common.ContextNodeRolesKey; a missing or wrongly
// typed value yields an error. On success the registered node is stored in
// s.nodeInfo under s.nodeInfoMux.
func (s *clientService) PreRun(ctx context.Context) error {
	l := logger.GetLogger(s.Name())

	stopCh := make(chan struct{})
	// finished is closed on return so the watcher goroutine below never
	// outlives PreRun.
	finished := make(chan struct{})
	defer close(finished)

	sn := make(chan os.Signal, 1)
	signal.Notify(sn,
		syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
	// Undo the registration on return; otherwise these signals would keep being
	// relayed to a channel nobody reads once PreRun is over.
	defer signal.Stop(sn)
	go func() {
		select {
		case si := <-sn:
			l.Info().Msgf("signal received: %s", si)
			close(stopCh)
		case <-s.closer.CloseNotify():
			close(stopCh)
		case <-finished:
		}
	}()

	// pause blocks for one retry interval (one second). It returns nil when the
	// caller should try again, or the error PreRun must return when the wait
	// was cut short. deadline bounds the whole phase; done may be nil, in which
	// case it is never selected.
	//
	// The select deliberately has no default branch: with one, a timer created
	// right before the select could never win and the timeout would be dead
	// code.
	pause := func(phase string, deadline <-chan time.Time, done <-chan struct{}) error {
		retry := time.NewTimer(time.Second)
		defer retry.Stop()
		select {
		case <-stopCh:
			return errors.New(phase + " interrupted")
		case <-s.closer.CloseNotify():
			return errors.New(phase + " interrupted")
		case <-done:
			return errors.New(phase + " interrupted")
		case <-deadline:
			return errors.New(phase + " timeout")
		case <-retry.C:
			return nil
		}
	}

	// Phase 1: connect to the schema registry.
	initDeadline := time.NewTimer(s.registryTimeout)
	defer initDeadline.Stop()
	for {
		var err error
		s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
			schema.Namespace(s.namespace),
			schema.ConfigureServerEndpoints(s.endpoints),
			schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword),
			schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
			schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile),
			schema.ConfigureWatchCheckInterval(s.etcdFullSyncInterval),
		)
		if err == nil {
			break
		}
		if !errors.Is(err, context.DeadlineExceeded) && !errors.Is(err, context.Canceled) {
			return err
		}
		l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("the schema registry init timeout, retrying...")
		if waitErr := pause("pre-run", initDeadline.C, nil); waitErr != nil {
			return waitErr
		}
	}
	if !s.toRegisterNode {
		return nil
	}

	// Phase 2: register this node.
	val := ctx.Value(common.ContextNodeKey)
	if val == nil {
		return errors.New("node id is empty")
	}
	// Checked assertions: a wrongly typed context value is reported as an error
	// instead of panicking during start-up.
	node, ok := val.(common.Node)
	if !ok {
		return errors.New("node id has an unexpected type")
	}
	val = ctx.Value(common.ContextNodeRolesKey)
	if val == nil {
		return errors.New("node roles is empty")
	}
	nodeRoles, ok := val.([]databasev1.Role)
	if !ok {
		return errors.New("node roles has an unexpected type")
	}
	nodeInfo := &databasev1.Node{
		Metadata: &commonv1.Metadata{
			Name: node.NodeID,
		},
		GrpcAddress: node.GrpcAddress,
		HttpAddress: node.HTTPAddress,
		Roles:       nodeRoles,
		Labels:      node.Labels,
		CreatedAt:   timestamppb.Now(),
	}

	registerDeadline := time.NewTimer(s.registryTimeout)
	defer registerDeadline.Stop()
	for {
		// Each attempt gets its own 10s budget derived from ctx.
		attemptCtx, cancel := context.WithTimeout(ctx, time.Second*10)
		err := s.schemaRegistry.RegisterNode(attemptCtx, nodeInfo, s.forceRegisterNode)
		cancel()
		if err == nil {
			l.Info().Stringer("info", nodeInfo).Msg("register node successfully")
			s.nodeInfoMux.Lock()
			s.nodeInfo = nodeInfo
			s.nodeInfoMux.Unlock()
			return nil
		}
		retryable := errors.Is(err, schema.ErrGRPCAlreadyExists) ||
			errors.Is(err, context.DeadlineExceeded) ||
			errors.Is(err, context.Canceled)
		if !retryable {
			return err
		}
		// Log the specific error before backing off.
		l.Warn().Err(err).Strs("etcd-endpoints", s.endpoints).Msg("register node error")
		// ctx.Done() is watched too: once the parent context is cancelled every
		// further attempt would fail immediately with context.Canceled.
		if waitErr := pause("register node", registerDeadline.C, ctx.Done()); waitErr != nil {
			return waitErr
		}
	}
}