in metastore/schema_fetch.go [101:181]
// applySchemaChange reconciles the tables known to j.schemaMutator with the
// incoming tables list.
//
// For every incoming table:
//   - name not returned by ListTables: the table is created;
//   - name known and the incoming Incarnation is greater: the existing table
//     is deleted and then created again from the incoming definition;
//   - name known, same Incarnation, but the definitions are not deeply equal:
//     the change is checked by j.schemaValidator and, if valid, applied with
//     UpdateTable;
//   - otherwise (including a lower incoming Incarnation) nothing is done.
//
// Afterwards every table returned by ListTables that does not appear in
// tables is deleted.
//
// A failure on one table is passed to reportError and processing moves on to
// the next table. If ListTables itself fails, that error is returned
// immediately and nothing is changed. Otherwise the return value is the most
// recent per-table failure, or nil when every operation succeeded.
func (j *SchemaFetchJob) applySchemaChange(tables []common.Table) (err error) {
	oldTables, err := j.schemaMutator.ListTables()
	if err != nil {
		return err
	}

	// pending[name] is true while an existing table has not been seen in the
	// incoming list; it is flipped to false once the table is matched.
	pending := make(map[string]bool, len(oldTables))
	for _, name := range oldTables {
		pending[name] = true
	}

	// fail reports a per-table failure and records it as the result, so that
	// a later successful operation cannot reset the returned error to nil.
	fail := func(tableName string, cause error) {
		reportError(cause, true, tableName)
		err = cause
	}

	for i := range tables {
		// Work on a per-iteration copy: &table is handed to the mutator and
		// must not alias a variable shared between iterations.
		table := tables[i]

		if _, known := pending[table.Name]; !known {
			// found new table
			if opErr := j.schemaMutator.CreateTable(&table); opErr != nil {
				fail(table.Name, opErr)
				continue
			}
			utils.GetRootReporter().GetCounter(utils.SchemaCreationCount).Inc(1)
			utils.GetLogger().With("table", table.Name).Info("added new table")
			continue
		}

		// The table still exists upstream, so it must survive the deletion
		// pass at the end even if the steps below fail.
		pending[table.Name] = false

		oldTable, opErr := j.schemaMutator.GetTable(table.Name)
		if opErr != nil {
			fail(table.Name, opErr)
			continue
		}

		switch {
		case oldTable.Incarnation < table.Incarnation:
			// found new table incarnation, delete previous table and data
			// then create new table
			if opErr = j.schemaMutator.DeleteTable(table.Name); opErr != nil {
				fail(table.Name, opErr)
				continue
			}
			utils.GetRootReporter().GetCounter(utils.SchemaDeletionCount).Inc(1)
			utils.GetLogger().With("table", table.Name).Info("deleted table")

			if opErr = j.schemaMutator.CreateTable(&table); opErr != nil {
				fail(table.Name, opErr)
				continue
			}
			utils.GetRootReporter().GetCounter(utils.SchemaCreationCount).Inc(1)
			utils.GetLogger().With("table", table.Name).Info("recreated table")

		case oldTable.Incarnation == table.Incarnation && !reflect.DeepEqual(&table, oldTable):
			// found table update: validate the transition before applying it
			j.schemaValidator.SetNewTable(table)
			j.schemaValidator.SetOldTable(*oldTable)
			if opErr = j.schemaValidator.Validate(); opErr != nil {
				fail(table.Name, opErr)
				continue
			}
			if opErr = j.schemaMutator.UpdateTable(table); opErr != nil {
				fail(table.Name, opErr)
				continue
			}
			utils.GetRootReporter().GetCounter(utils.SchemaUpdateCount).Inc(1)
			utils.GetLogger().With("table", table.Name).Info("updated table")
		}
	}

	for name, unmatched := range pending {
		if !unmatched {
			continue
		}
		// found table deletion
		if opErr := j.schemaMutator.DeleteTable(name); opErr != nil {
			fail(name, opErr)
			continue
		}
		utils.GetRootReporter().GetCounter(utils.SchemaDeletionCount).Inc(1)
	}

	return err
}