client/sql/dataframewriter.go (71 lines of code) (raw):

package sql import ( "fmt" proto "github.com/apache/spark-connect-go/v_3_4/internal/generated" "strings" ) // DataFrameWriter supports writing data frame to storage. type DataFrameWriter interface { // Mode specifies saving mode for the data, e.g. Append, Overwrite, ErrorIfExists. Mode(saveMode string) DataFrameWriter // Format specifies data format (data source type) for the underlying data, e.g. parquet. Format(source string) DataFrameWriter // Save writes data frame to the given path. Save(path string) error } // dataFrameWriterImpl is an implementation of DataFrameWriter interface. type dataFrameWriterImpl struct { sparkSession *sparkSessionImpl relation *proto.Relation saveMode string formatSource string } func (w *dataFrameWriterImpl) Mode(saveMode string) DataFrameWriter { w.saveMode = saveMode return w } func (w *dataFrameWriterImpl) Format(source string) DataFrameWriter { w.formatSource = source return w } func (w *dataFrameWriterImpl) Save(path string) error { saveMode, err := getSaveMode(w.saveMode) if err != nil { return err } var source *string if w.formatSource != "" { source = &w.formatSource } plan := &proto.Plan{ OpType: &proto.Plan_Command{ Command: &proto.Command{ CommandType: &proto.Command_WriteOperation{ WriteOperation: &proto.WriteOperation{ Input: w.relation, Mode: saveMode, Source: source, SaveType: &proto.WriteOperation_Path{ Path: path, }, }, }, }, }, } responseClient, err := w.sparkSession.executePlan(plan) if err != nil { return err } return consumeExecutePlanClient(responseClient) } func getSaveMode(mode string) (proto.WriteOperation_SaveMode, error) { if mode == "" { return proto.WriteOperation_SAVE_MODE_UNSPECIFIED, nil } else if strings.EqualFold(mode, "Append") { return proto.WriteOperation_SAVE_MODE_APPEND, nil } else if strings.EqualFold(mode, "Overwrite") { return proto.WriteOperation_SAVE_MODE_OVERWRITE, nil } else if strings.EqualFold(mode, "ErrorIfExists") { return proto.WriteOperation_SAVE_MODE_ERROR_IF_EXISTS, nil } else if strings.EqualFold(mode, "Ignore") { return proto.WriteOperation_SAVE_MODE_IGNORE, nil } else { return 0, fmt.Errorf("unsupported save mode: %s", mode) } }