controller/handlers/schema.go (203 lines of code) (raw):
// Copyright (c) 2017-2018 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package handlers
import (
"github.com/uber/aresdb/utils"
"net/http"
"github.com/gorilla/mux"
apiCom "github.com/uber/aresdb/api/common"
mutatorCom "github.com/uber/aresdb/controller/mutators/common"
"github.com/uber/aresdb/metastore"
metaCom "github.com/uber/aresdb/metastore/common"
"go.uber.org/fx"
"go.uber.org/zap"
)
const (
forceKey = "force"
logMsg = "Schema request"
)
// SchemaHandlerParams defineds parameters needed to initialize schema handler
type SchemaHandlerParams struct {
fx.In
TableSchemaMutator mutatorCom.TableSchemaMutator
EnumMutator mutatorCom.EnumMutator
Logger *zap.SugaredLogger
}
// SchemaHandler serves schema requests
type SchemaHandler struct {
tableSchemaMutator mutatorCom.TableSchemaMutator
enumMutator mutatorCom.EnumMutator
logger *zap.SugaredLogger
}
// NewSchemaHandler creates a new schema handler
func NewSchemaHandler(p SchemaHandlerParams) SchemaHandler {
return SchemaHandler{
tableSchemaMutator: p.TableSchemaMutator,
enumMutator: p.EnumMutator,
logger: p.Logger,
}
}
// Register adds paths to router
func (h SchemaHandler) Register(router *mux.Router, wrappers ...utils.HTTPHandlerWrapper) {
router.HandleFunc("/{namespace}/tables", utils.ApplyHTTPWrappers(h.AddTable, wrappers...)).Methods(http.MethodPost)
router.HandleFunc("/{namespace}/tables/{table}", utils.ApplyHTTPWrappers(h.GetTable, wrappers...)).Methods(http.MethodGet)
router.HandleFunc("/{namespace}/tables", utils.ApplyHTTPWrappers(h.GetTables, wrappers...)).Methods(http.MethodGet)
router.HandleFunc("/{namespace}/tables/{table}", utils.ApplyHTTPWrappers(h.DeleteTable, wrappers...)).Methods(http.MethodDelete)
router.HandleFunc("/{namespace}/tables/{table}", utils.ApplyHTTPWrappers(h.UpdateTable, wrappers...)).Methods(http.MethodPut)
router.HandleFunc("/{namespace}/tables/{table}/columns/{column}/enum-cases", utils.ApplyHTTPWrappers(h.GetEnumCases, wrappers...)).Methods(http.MethodGet)
router.HandleFunc("/{namespace}/tables/{table}/columns/{column}/enum-cases", utils.ApplyHTTPWrappers(h.ExtendEnumCases, wrappers...)).Methods(http.MethodPost)
router.HandleFunc("/{namespace}/hash", utils.ApplyHTTPWrappers(h.GetHash, wrappers...)).Methods(http.MethodGet)
}
// AddTable swagger:route POST /schema/{namespace}/tables addTable
// adds a new table
//
// Consumes:
// - application/json
func (h SchemaHandler) AddTable(w *utils.ResponseWriter, r *http.Request) {
var req AddTableRequest
req.Body.Config = metastore.DefaultTableConfig
err := apiCom.ReadRequest(r, &req, w.SetRequest)
if err != nil {
w.WriteErrorWithCode(http.StatusBadRequest, err)
return
}
forceUpdate := r.URL.Query().Get(forceKey)
err = h.tableSchemaMutator.CreateTable(req.Namespace, &req.Body, forceUpdate == "true")
if err != nil {
statusCode := http.StatusInternalServerError
if err == mutatorCom.ErrNamespaceDoesNotExist || err == metaCom.ErrTableAlreadyExist {
statusCode = http.StatusBadRequest
}
w.WriteErrorWithCode(statusCode, err)
return
}
w.WriteObject(nil)
}
// GetTable swagger:route GET /schema/{namespace}/tables/{table} getTable
// returns table schema of given table name
func (h SchemaHandler) GetTable(w *utils.ResponseWriter, r *http.Request) {
var req GetTableRequest
err := apiCom.ReadRequest(r, &req, w.SetRequest)
if err != nil {
w.WriteErrorWithCode(http.StatusBadRequest, err)
return
}
table, err := h.tableSchemaMutator.GetTable(req.Namespace, req.TableName)
if err != nil {
statusCode := http.StatusInternalServerError
if mutatorCom.IsNonExist(err) {
statusCode = http.StatusNotFound
err = metaCom.ErrTableDoesNotExist
}
w.WriteErrorWithCode(statusCode, err)
return
}
w.WriteObject(table)
}
// GetTables swagger:route GET /schema/{namespace}/tables getTables
// returns schema of all tables
func (h SchemaHandler) GetTables(w *utils.ResponseWriter, r *http.Request) {
var getTablesRequest GetTablesRequest
err := apiCom.ReadRequest(r, &getTablesRequest, w.SetRequest)
if err != nil {
w.WriteErrorWithCode(http.StatusBadRequest, err)
return
}
// TODO: call membership mutator to update instance status
//if getTablesRequest.InstanceName != "" {
// ...
//}
tableNames, err := h.tableSchemaMutator.ListTables(getTablesRequest.Namespace)
if err != nil {
statusCode := http.StatusInternalServerError
if err == metaCom.ErrTableDoesNotExist {
statusCode = http.StatusBadRequest
}
w.WriteErrorWithCode(statusCode, err)
return
}
tables := make([]metaCom.Table, len(tableNames))
for i, tableName := range tableNames {
table, err := h.tableSchemaMutator.GetTable(getTablesRequest.Namespace, tableName)
if err != nil {
w.WriteError(err)
return
}
tables[i] = *table
}
w.WriteObject(tables)
}
// DeleteTable swagger:route DELETE /schema/{namespace}/tables/{table} deleteTable
// deletes an existing table
func (h SchemaHandler) DeleteTable(w *utils.ResponseWriter, r *http.Request) {
var deleteTableRequest DeleteTableRequest
err := apiCom.ReadRequest(r, &deleteTableRequest, w.SetRequest)
if err != nil {
w.WriteErrorWithCode(http.StatusBadRequest, err)
return
}
err = h.tableSchemaMutator.DeleteTable(deleteTableRequest.Namespace, deleteTableRequest.TableName)
if err != nil {
statusCode := http.StatusInternalServerError
if err == metaCom.ErrTableDoesNotExist {
statusCode = http.StatusBadRequest
}
w.WriteErrorWithCode(statusCode, err)
return
}
w.WriteObject(nil)
}
// UpdateTable swagger:route PUT /schema/{namespace}/tables/{table} updateTable
// updates existing table's schema and config
//
// Consumes:
// - application/json
func (h SchemaHandler) UpdateTable(w *utils.ResponseWriter, r *http.Request) {
var updateTableRequest UpdateTableRequest
updateTableRequest.Body.Config = metastore.DefaultTableConfig
err := apiCom.ReadRequest(r, &updateTableRequest, w.SetRequest)
if err != nil {
w.WriteErrorWithCode(http.StatusBadRequest, err)
return
}
forceUpdate := r.URL.Query().Get(forceKey)
h.logger.Infow(logMsg,
"endpoint", "update",
"event", "received",
"force", forceUpdate)
err = h.tableSchemaMutator.UpdateTable(updateTableRequest.Namespace, updateTableRequest.Body, forceUpdate == "true")
if err != nil {
w.WriteErrorWithCode(http.StatusInternalServerError, err)
return
}
w.WriteObject(updateTableRequest.Body)
}
// GetHash swagger:route GET /schema/{namespace}/hash getSchemaHash
// returns hash of all table schemas in a namespace, hash change means there's a change in any of the schemas
func (h SchemaHandler) GetHash(w *utils.ResponseWriter, r *http.Request) {
var req GetHashRequest
err := apiCom.ReadRequest(r, &req, w.SetRequest)
if err != nil {
w.WriteErrorWithCode(http.StatusBadRequest, err)
return
}
// TODO: call membership mutator to update instance status
//if getTablesRequest.InstanceName != "" {
// ...
//}
hash, err := h.tableSchemaMutator.GetHash(req.Namespace)
if err != nil {
statusCode := http.StatusInternalServerError
if mutatorCom.IsNonExist(err) {
statusCode = http.StatusNotFound
err = metaCom.ErrTableDoesNotExist
}
w.WriteErrorWithCode(statusCode, err)
return
}
w.WriteJSONBytesWithCode(http.StatusOK, []byte(hash), nil)
}
// ExtendEnumCases swagger:route POST /schema/{namespace}/tables/{table}/columns/{column}/enum-cases extendEnumCases
// returns enum ids for given enum cases
func (h SchemaHandler) ExtendEnumCases(w *utils.ResponseWriter, r *http.Request) {
var req ExtendEnumCaseRequest
err := apiCom.ReadRequest(r, &req, w.SetRequest)
if err != nil {
w.WriteErrorWithCode(http.StatusBadRequest, err)
return
}
enumIDs, err := h.enumMutator.ExtendEnumCases(req.Namespace, req.TableName, req.Column, req.Body)
if err != nil {
if err == ErrTableDoesNotExist || err == ErrColumnDoesNotExist {
w.WriteErrorWithCode(http.StatusBadRequest, err)
} else {
w.WriteError(err)
}
return
}
w.WriteObject(enumIDs)
}
// GetEnumCases swagger:route GET /schema/{namespace}/tables/{table}/columns/{column}/enum-cases getEnumCases
// returns all enum cases for given table column
func (h SchemaHandler) GetEnumCases(w *utils.ResponseWriter, r *http.Request) {
var req GetEnumCaseRequest
err := apiCom.ReadRequest(r, &req, w.SetRequest)
if err != nil {
w.WriteErrorWithCode(http.StatusBadRequest, err)
return
}
enumCases, err := h.enumMutator.GetEnumCases(req.Namespace, req.TableName, req.Column)
if err != nil {
if err == ErrTableDoesNotExist || err == ErrColumnDoesNotExist {
w.WriteErrorWithCode(http.StatusBadRequest, err)
} else {
w.WriteError(err)
}
return
}
w.WriteObject(enumCases)
}