in cmd/spark-connect-example-spark-session/main.go [31:162]
func main() {
	flag.Parse()
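	// Build a session against the Spark Connect server whose address is held
	// in the remote flag (defined above this excerpt).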
	spark, err := sql.SparkSession.Builder.Remote(*remote).Build()
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
	defer spark.Stop()
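	// Run a SQL query on the server; the result comes back as a DataFrame handle.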
	df, err := spark.Sql("select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
	log.Printf("DataFrame from sql: select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
	err = df.Show(100, false)
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
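	// Inspect the schema the server reports for the DataFrame.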
	schema, err := df.Schema()
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
	for _, f := range schema.Fields {
		log.Printf("Field in dataframe schema: %s - %s", f.Name, f.DataType.TypeName())
	}
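	// Collect transfers every result row from the server to the client.
	// The query above always yields two rows, so indexing rows[0] below is safe.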
	rows, err := df.Collect()
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
	schema, err = rows[0].Schema()
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
	for _, f := range schema.Fields {
		log.Printf("Field in row: %s - %s", f.Name, f.DataType.TypeName())
	}
	for _, row := range rows {
		log.Printf("Row: %v", row)
	}
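	// Write the DataFrame as parquet, overwriting any previous output at that path.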
	err = df.Write().Mode("overwrite").
		Format("parquet").
		Save("file:///tmp/spark-connect-write-example-output.parquet")
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
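	// Read the parquet output back into a fresh DataFrame.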
	df, err = spark.Read().Format("parquet").
		Load("file:///tmp/spark-connect-write-example-output.parquet")
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
	log.Printf("DataFrame from reading parquet")
	err = df.Show(100, false)
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
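	// Register the DataFrame as a temporary view named "view1"; the booleans
	// request replace-if-exists and a session-scoped (non-global) view.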
	err = df.CreateTempView("view1", true, false)
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
df, err = spark.Sql("select count, word from view1 order by count")
if err != nil {
log.Fatalf("Failed: %s", err.Error())
}
log.Printf("DataFrame from sql: select count, word from view1 order by count")
df.Show(100, false)
log.Printf("Repartition with one partition")
	df, err = df.Repartition(1, nil)
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
	err = df.Write().Mode("overwrite").
		Format("parquet").
		Save("file:///tmp/spark-connect-write-example-output-one-partition.parquet")
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
log.Printf("Repartition with two partitions")
df, err = df.Repartition(2, nil)
if err != nil {
log.Fatalf("Failed: %s", err.Error())
}
err = df.Write().Mode("overwrite").
Format("parquet").
Save("file:///tmp/spark-connect-write-example-output-two-partition.parquet")
if err != nil {
log.Fatalf("Failed: %s", err.Error())
}
log.Printf("Repartition with columns")
	df, err = df.Repartition(0, []string{"word", "count"})
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
	err = df.Write().Mode("overwrite").
		Format("parquet").
		Save("file:///tmp/spark-connect-write-example-output-repartition-with-column.parquet")
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
log.Printf("Repartition by range with columns")
	df, err = df.RepartitionByRange(0, []sql.RangePartitionColumn{
		{
			Name:       "word",
			Descending: true,
		},
	})
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
	err = df.Write().Mode("overwrite").
		Format("parquet").
		Save("file:///tmp/spark-connect-write-example-output-repartition-by-range-with-column.parquet")
	if err != nil {
		log.Fatalf("Failed: %s", err.Error())
	}
}