in ingestor/adx/syncer.go [140:197]
func (s *Syncer) EnsureTable(table string, mapping schema.SchemaMapping) error {
s.mu.RLock()
if _, ok := s.tables[table]; ok {
s.mu.RUnlock()
return nil
}
s.mu.RUnlock()
var columns []columnDef
for _, v := range mapping {
columns = append(columns, columnDef{
name: v.Column,
typ: v.DataType,
})
}
var sb strings.Builder
sb.WriteString(".create-merge table ")
sb.WriteString(fmt.Sprintf("['%s'] ", table))
sb.WriteString("(")
for i, c := range mapping {
sb.WriteString(fmt.Sprintf("['%s']:%s", c.Column, c.DataType))
if i < len(mapping)-1 {
sb.WriteString(", ")
}
}
sb.WriteString(")")
if logger.IsDebug() {
logger.Debugf("Creating table %s %s", table, sb.String())
}
showStmt := kusto.NewStmt("", kusto.UnsafeStmt(unsafe.Stmt{Add: true, SuppressWarning: true})).UnsafeAdd(sb.String())
rows, err := s.KustoCli.Mgmt(context.Background(), s.database, showStmt)
if err != nil {
return err
}
for {
_, err1, err2 := rows.NextRowOrError()
if err2 == io.EOF {
break
} else if err1 != nil {
return err1
} else if err2 != nil {
return err2
}
}
s.mu.Lock()
s.tables[table] = struct{}{}
s.mu.Unlock()
return nil
}