func()

in cassandra-bigtable-migration-tools/cassandra-bigtable-proxy/translator/translator_update.go [142:359]


func (t *Translator) TranslateUpdateQuerytoBigtable(queryStr string, isPreparedQuery bool) (*UpdateQueryMapping, error) {
	lowerQuery := strings.ToLower(queryStr)
	query := renameLiterals(queryStr)

	query, PrependColumns := TransformPrepareQueryForPrependList(query)
	lexer := cql.NewCqlLexer(antlr.NewInputStream(query))
	stream := antlr.NewCommonTokenStream(lexer, antlr.TokenDefaultChannel)
	p := cql.NewCqlParser(stream)
	updateObj := p.Update()

	if updateObj == nil {
		return nil, errors.New("error parsing the update object")
	}

	kwUpdateObj := updateObj.KwUpdate()

	if kwUpdateObj == nil {
		return nil, errors.New("error parsing the update object")
	}

	keyspace := updateObj.Keyspace()
	updateObj.DOT()

	table := updateObj.Table()

	if table == nil || keyspace == nil {
		return nil, errors.New("invalid input paramaters found for table or keyspace")
	}

	keyspaceObj := keyspace.OBJECT_NAME()
	if keyspaceObj == nil {
		return nil, errors.New("invalid input paramaters found for keyspace")
	}
	keyspaceName := keyspaceObj.GetText()

	if !t.SchemaMappingConfig.InstanceExists(keyspaceName) {
		return nil, fmt.Errorf("keyspace %s does not exist", keyspaceName)
	}

	if !t.SchemaMappingConfig.TableExist(keyspaceName, table.GetText()) {
		return nil, fmt.Errorf("table %s does not exist", table.GetText())
	}

	tableobj := table.OBJECT_NAME()
	if tableobj == nil {
		return nil, errors.New("invalid input paramaters found for table")
	}
	tableName := tableobj.GetText()
	var err error

	timestampInfo, err := GetTimestampInfoByUpdate(queryStr, updateObj)
	if err != nil {
		return nil, err
	}
	updateObj.KwSet()
	assignmentObj := updateObj.Assignments()
	allAssignmentObj := assignmentObj.AllAssignmentElement()
	if allAssignmentObj == nil {
		return nil, errors.New("error parsing all the assignment object")
	}
	setValues, err := parseAssignments(allAssignmentObj, tableName, t.SchemaMappingConfig, keyspaceName, &PrependColumns)
	if err != nil {
		return nil, err
	}

	var QueryClauses QueryClauses

	if hasWhere(lowerQuery) {
		resp, err := parseWhereByClause(updateObj.WhereSpec(), tableName, t.SchemaMappingConfig, keyspaceName)
		if err != nil {
			return nil, err
		}

		QueryClauses = *resp

		for k, v := range QueryClauses.Params {
			setValues.Params[k] = v
		}

		setValues.ParamKeys = append(setValues.ParamKeys, QueryClauses.ParamKeys...)
	}
	ifExistObj := updateObj.IfExist()
	var ifExist bool = false
	if ifExistObj != nil {
		val := strings.ToLower(ifExistObj.GetText())
		if val == ifExists {
			ifExist = true
		}
	}

	primaryKeys, err := getPrimaryKeys(t.SchemaMappingConfig, tableName, keyspaceName)
	var actualPrimaryKeys []string
	for _, val := range QueryClauses.Clauses {
		actualPrimaryKeys = append(actualPrimaryKeys, val.Column)
	}
	if !ValidateRequiredPrimaryKeys(primaryKeys, actualPrimaryKeys) {
		missingPrime := findFirstMissingKey(primaryKeys, actualPrimaryKeys)
		missingPkColumnType, err := t.SchemaMappingConfig.GetPkKeyType(tableName, keyspaceName, missingPrime)
		if err != nil {
			return nil, err
		}
		return nil, fmt.Errorf("some %s key parts are missing: %s", missingPkColumnType, missingPrime)
	}
	if err != nil {
		return nil, err
	}
	var primkeyvalues []string
	var rowKey string
	var values []interface{}
	var columns []Column
	for _, val := range setValues.UpdateSetValues {
		values = append(values, val.Encrypted)
		columns = append(columns, Column{Name: val.Column, ColumnFamily: t.SchemaMappingConfig.SystemColumnFamily, CQLType: val.CQLType})
	}
	var newValues []interface{} = values
	var newColumns []Column = columns
	var delColumns []Column
	var delColumnFamily []string
	var complexMeta map[string]*ComplexOperation
	var rawOutput *ProcessRawCollectionsOutput // Declare rawOutput here

	if !isPreparedQuery {
		pkValues := make(map[string]interface{})

		for _, key := range primaryKeys {
			for _, clause := range QueryClauses.Clauses {
				if !clause.IsPrimaryKey {
					return nil, errors.New("any value other then primary key is not accepted in where clause")
				}
				if clause.IsPrimaryKey && clause.Operator == "=" && key == clause.Column {
					pv := clause.Value
					if strings.HasPrefix(clause.Value, "@") {
						pv = clause.Value[1:]
					}
					pkValues[clause.Column] = QueryClauses.Params[pv]
					primkeyvalues = append(primkeyvalues, fmt.Sprintf("%v", QueryClauses.Params[pv]))
				}
			}
		}
		if len(primkeyvalues) != len(primaryKeys) {
			return nil, errors.New("missing primary key values in where clause")
		}
		pmks, err := t.SchemaMappingConfig.GetPkByTableNameWithFilter(tableName, keyspaceName, primaryKeys)
		if err != nil {
			return nil, err
		}
		rowKeyBytes, err := createOrderedCodeKey(pmks, pkValues, t.EncodeIntValuesWithBigEndian)
		if err != nil {
			return nil, err
		}
		rowKey = string(rowKeyBytes)

		// Building new colum family, qualifier and values for collection type of data.
		rawInput := ProcessRawCollectionsInput{
			Columns:        columns,
			Values:         values,
			TableName:      tableName,
			Translator:     t,
			KeySpace:       keyspaceName,
			PrependColumns: PrependColumns,
		}
		rawOutput, err = processCollectionColumnsForRawQueries(rawInput)
		if err != nil {
			return nil, fmt.Errorf("error processing raw collection columns: %w", err)
		}
		newColumns = rawOutput.NewColumns
		newValues = rawOutput.NewValues
		delColumnFamily = rawOutput.DelColumnFamily
		delColumns = rawOutput.DelColumns
		complexMeta = rawOutput.ComplexMeta // Assign complexMeta from output

		for _, val := range QueryClauses.Clauses {
			var column Column
			if columns, exists := t.SchemaMappingConfig.TablesMetaData[keyspaceName][tableName][val.Column]; exists {
				column = Column{Name: columns.ColumnName, ColumnFamily: t.SchemaMappingConfig.SystemColumnFamily, CQLType: columns.ColumnType}
			}
			newColumns = append(newColumns, column)

			pv := val.Value
			if strings.HasPrefix(val.Value, "@") {
				pv = val.Value[1:]
			}
			value := fmt.Sprintf("%v", QueryClauses.Params[pv])
			encryVal, err := formatValues(value, column.CQLType, 4)
			if err != nil {
				return nil, err
			}
			newValues = append(newValues, encryVal)
		}
	} else {
		complexMeta, err = t.ProcessComplexUpdate(columns, values, tableName, keyspaceName, PrependColumns)
		if err != nil {
			return nil, err
		}
	}

	updateQueryData := &UpdateQueryMapping{
		Query:                 query,
		QueryType:             UPDATE,
		Table:                 tableName,
		RowKey:                rowKey,
		Columns:               newColumns,
		Values:                newValues,
		DeleteColumnFamilies:  delColumnFamily,
		DeleteColumQualifires: delColumns,
		IfExists:              ifExist,
		Keyspace:              keyspaceName,
		Clauses:               QueryClauses.Clauses,
		Params:                setValues.Params,
		ParamKeys:             setValues.ParamKeys,
		UpdateSetValues:       setValues.UpdateSetValues,
		PrimaryKeys:           primaryKeys,
		TimestampInfo:         timestampInfo,
		ComplexOperation:      complexMeta,
	}

	return updateQueryData, nil
}