func()

in ingestor/adx/syncer.go [140:197]


func (s *Syncer) EnsureTable(table string, mapping schema.SchemaMapping) error {
	s.mu.RLock()
	if _, ok := s.tables[table]; ok {
		s.mu.RUnlock()
		return nil
	}
	s.mu.RUnlock()

	var columns []columnDef

	for _, v := range mapping {
		columns = append(columns, columnDef{
			name: v.Column,
			typ:  v.DataType,
		})
	}

	var sb strings.Builder
	sb.WriteString(".create-merge table ")
	sb.WriteString(fmt.Sprintf("['%s'] ", table))

	sb.WriteString("(")
	for i, c := range mapping {
		sb.WriteString(fmt.Sprintf("['%s']:%s", c.Column, c.DataType))
		if i < len(mapping)-1 {
			sb.WriteString(", ")
		}
	}
	sb.WriteString(")")

	if logger.IsDebug() {
		logger.Debugf("Creating table %s %s", table, sb.String())
	}

	showStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(sb.String())

	rows, err := s.KustoCli.Mgmt(context.Background(), s.database, showStmt)
	if err != nil {
		return err
	}

	for {
		_, err1, err2 := rows.NextRowOrError()
		if err2 == io.EOF {
			break
		} else if err1 != nil {
			return err1
		} else if err2 != nil {
			return err2
		}
	}

	s.mu.Lock()
	s.tables[table] = struct{}{}
	s.mu.Unlock()

	return nil
}