in banyand/metadata/client.go [109:207]
func (s *clientService) PreRun(ctx context.Context) error {
stopCh := make(chan struct{})
sn := make(chan os.Signal, 1)
l := logger.GetLogger(s.Name())
signal.Notify(sn,
syscall.SIGHUP, syscall.SIGINT, syscall.SIGQUIT, syscall.SIGTERM)
go func() {
select {
case si := <-sn:
logger.GetLogger(s.Name()).Info().Msgf("signal received: %s", si)
close(stopCh)
case <-s.closer.CloseNotify():
close(stopCh)
}
}()
for {
var err error
s.schemaRegistry, err = schema.NewEtcdSchemaRegistry(
schema.Namespace(s.namespace),
schema.ConfigureServerEndpoints(s.endpoints),
schema.ConfigureEtcdUser(s.etcdUsername, s.etcdPassword),
schema.ConfigureEtcdTLSCAFile(s.etcdTLSCAFile),
schema.ConfigureEtcdTLSCertAndKey(s.etcdTLSCertFile, s.etcdTLSKeyFile),
schema.ConfigureWatchCheckInterval(s.etcdFullSyncInterval),
)
if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
select {
case <-stopCh:
return errors.New("pre-run interrupted")
case <-time.After(s.registryTimeout):
return errors.New("pre-run timeout")
case <-s.closer.CloseNotify():
return errors.New("pre-run interrupted")
default:
l.Warn().Strs("etcd-endpoints", s.endpoints).Msg("the schema registry init timeout, retrying...")
time.Sleep(time.Second)
continue
}
}
if err == nil {
break
}
return err
}
if !s.toRegisterNode {
return nil
}
val := ctx.Value(common.ContextNodeKey)
if val == nil {
return errors.New("node id is empty")
}
node := val.(common.Node)
val = ctx.Value(common.ContextNodeRolesKey)
if val == nil {
return errors.New("node roles is empty")
}
nodeRoles := val.([]databasev1.Role)
nodeInfo := &databasev1.Node{
Metadata: &commonv1.Metadata{
Name: node.NodeID,
},
GrpcAddress: node.GrpcAddress,
HttpAddress: node.HTTPAddress,
Roles: nodeRoles,
Labels: node.Labels,
CreatedAt: timestamppb.Now(),
}
for {
ctxCancelable, cancel := context.WithTimeout(ctx, time.Second*10)
err := s.schemaRegistry.RegisterNode(ctxCancelable, nodeInfo, s.forceRegisterNode)
cancel()
if errors.Is(err, schema.ErrGRPCAlreadyExists) ||
errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
// Log the specific error
l.Warn().Err(err).Strs("etcd-endpoints", s.endpoints).Msg("register node error")
select {
case <-stopCh:
return errors.New("register node interrupted")
case <-time.After(s.registryTimeout):
return errors.New("register node timeout")
case <-s.closer.CloseNotify():
return errors.New("register node interrupted")
default:
time.Sleep(time.Second)
continue
}
}
if err == nil {
l.Info().Stringer("info", nodeInfo).Msg("register node successfully")
s.nodeInfoMux.Lock()
s.nodeInfo = nodeInfo
s.nodeInfoMux.Unlock()
}
return err
}
}