func()

in metastore/schema_fetch.go [101:181]


func (j *SchemaFetchJob) applySchemaChange(tables []common.Table) (err error) {
	oldTables, err := j.schemaMutator.ListTables()
	if err != nil {
		return
	}

	oldTablesMap := make(map[string]bool)
	for _, oldTableName := range oldTables {
		oldTablesMap[oldTableName] = true
	}

	for _, table := range tables {
		if _, exist := oldTablesMap[table.Name]; !exist {
			// found new table
			err = j.schemaMutator.CreateTable(&table)
			if err != nil {
				reportError(err, true, table.Name)
				continue
			}
			utils.GetRootReporter().GetCounter(utils.SchemaCreationCount).Inc(1)
			utils.GetLogger().With("table", table.Name).Info("added new table")
		} else {
			oldTablesMap[table.Name] = false
			var oldTable *common.Table
			oldTable, err = j.schemaMutator.GetTable(table.Name)
			if err != nil {
				reportError(err, true, table.Name)
				continue
			}
			if oldTable.Incarnation < table.Incarnation {
				// found new table incarnation, delete previous table and data
				// then create new table
				err := j.schemaMutator.DeleteTable(table.Name)
				if err != nil {
					reportError(err, true, table.Name)
					continue
				}
				utils.GetRootReporter().GetCounter(utils.SchemaDeletionCount).Inc(1)
				utils.GetLogger().With("table", table.Name).Info("deleted table")
				err = j.schemaMutator.CreateTable(&table)
				if err != nil {
					reportError(err, true, table.Name)
					continue
				}
				utils.GetRootReporter().GetCounter(utils.SchemaCreationCount).Inc(1)
				utils.GetLogger().With("table", table.Name).Info("recreated table")

			} else if oldTable.Incarnation == table.Incarnation && !reflect.DeepEqual(&table, oldTable) {
				// found table update
				j.schemaValidator.SetNewTable(table)
				j.schemaValidator.SetOldTable(*oldTable)
				err = j.schemaValidator.Validate()
				if err != nil {
					reportError(err, true, table.Name)
					continue
				}
				err = j.schemaMutator.UpdateTable(table)
				if err != nil {
					reportError(err, true, table.Name)
					continue
				}
				utils.GetRootReporter().GetCounter(utils.SchemaUpdateCount).Inc(1)
				utils.GetLogger().With("table", table.Name).Info("updated table")
			}
		}
	}

	for oldTableName, notAddressed := range oldTablesMap {
		if notAddressed {
			// found table deletion
			err = j.schemaMutator.DeleteTable(oldTableName)
			if err != nil {
				reportError(err, true, oldTableName)
				continue
			}
			utils.GetRootReporter().GetCounter(utils.SchemaDeletionCount).Inc(1)
		}
	}

	return
}