func main()

in cmd/spark-connect-example-spark-session/main.go [41:206]
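
End-to-end walkthrough of the Go Spark Connect client: build a session, run SQL, filter with an expression string, a column expression, and a typed literal, inspect the schema, collect rows, write and read Parquet, register a temporary view, and repartition by count, by columns, and by range. The snippet assumes the usual spark-connect-go client packages (spark/sql, spark/sql/functions, spark/sql/types, spark/sql/utils) and the remote and filedir flags defined elsewhere in main.go.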


func main() {
	flag.Parse()
	ctx := context.Background()
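	// Connect to the Spark Connect server addressed by the remote flag,
	// and stop the session when main returns.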
	spark, err := sql.NewSessionBuilder().Remote(*remote).Build(ctx)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}
	defer utils.WarnOnError(spark.Stop, func(err error) {})

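	// Run a SQL query; the result is a DataFrame with a single "id" column.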
	df, err := spark.Sql(ctx, "select id from range(100)")
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

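	// Filter with a SQL expression string.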
	df, err = df.FilterByString(ctx, "id < 10")
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}
	err = df.Show(ctx, 100, false)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

	df, err = spark.Sql(ctx, "select * from range(100)")
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

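	// The same filter, expressed with the typed column API instead of a string.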
	df, err = df.Filter(ctx, functions.Col("id").Lt(functions.Expr("10")))
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}
	err = df.Show(ctx, 100, false)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

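	// Filter by comparing the column against a typed literal.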
	df, err = spark.Sql(ctx, "select * from range(100)")
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}
	df, err = df.Filter(ctx, functions.Col("id").Lt(functions.Lit(types.Int64(20))))
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}
	err = df.Show(ctx, 100, false)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

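	// Build a small two-row DataFrame with "word" and "count" columns.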
	df, err = spark.Sql(ctx, "select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

	log.Printf("DataFrame from sql: select 'apple' as word, 123 as count union all select 'orange' as word, 456 as count")
	err = df.Show(ctx, 100, false)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

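	// Fetch the schema and print each field's name and type.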
	schema, err := df.Schema(ctx)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

	for _, f := range schema.Fields {
		log.Printf("Field in dataframe schema: %s - %s", f.Name, f.DataType.TypeName())
	}

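	// Collect pulls every row of the DataFrame back to the client.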
	rows, err := df.Collect(ctx)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

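	// Re-fetch the schema to describe the collected rows.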
	schema, err = df.Schema(ctx)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

	for _, f := range schema.Fields {
		log.Printf("Field in row: %s - %s", f.Name, f.DataType.TypeName())
	}

	for _, row := range rows {
		log.Printf("Row: %v", row)
	}

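	// Write the DataFrame as Parquet under filedir, overwriting earlier output.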
	err = df.Writer().Mode("overwrite").
		Format("parquet").
		Save(ctx, fmt.Sprintf("file://%s/spark-connect-write-example-output.parquet", *filedir))
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

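	// Read the Parquet output back into a DataFrame.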
	df, err = spark.Read().Format("parquet").
		Load(fmt.Sprintf("file://%s/spark-connect-write-example-output.parquet", *filedir))
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

	log.Printf("DataFrame from reading parquet")
	err = df.Show(ctx, 100, false)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

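	// Register the DataFrame as a temporary view named "view1"; the booleans
	// presumably toggle replace-if-exists and global scope.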
	err = df.CreateTempView(ctx, "view1", true, false)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

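	// Query the temporary view like any other table.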
	df, err = spark.Sql(ctx, "select count, word from view1 order by count")
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

	log.Printf("DataFrame from sql: select count, word from view1 order by count")
	err = df.Show(ctx, 100, false)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

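	// Repartition by count only; nil means no partitioning columns.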
	log.Printf("Repartition with one partition")
	df, err = df.Repartition(ctx, 1, nil)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

	err = df.Writer().Mode("overwrite").
		Format("parquet").
		Save(ctx, fmt.Sprintf("file://%s/spark-connect-write-example-output-one-partition.parquet", *filedir))
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

	log.Printf("Repartition with two partitions")
	df, err = df.Repartition(ctx, 2, nil)
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

	err = df.Writer().Mode("overwrite").
		Format("parquet").
		Save(ctx, fmt.Sprintf("file://%s/spark-connect-write-example-output-two-partition.parquet", *filedir))
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

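	// Repartition by columns; a count of 0 presumably leaves the number of
	// partitions up to Spark.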
	log.Printf("Repartition with columns")
	df, err = df.Repartition(ctx, 0, []string{"word", "count"})
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

	err = df.Writer().Mode("overwrite").
		Format("parquet").
		Save(ctx, fmt.Sprintf("file://%s/spark-connect-write-example-output-repartition-with-column.parquet", *filedir))
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

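	// Range-partition using a sort order ("word" descending).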
	log.Printf("Repartition by range with columns")
	df, err = df.RepartitionByRange(ctx, 0, functions.Col("word").Desc())
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}

	err = df.Writer().Mode("overwrite").
		Format("parquet").
		Save(ctx, fmt.Sprintf("file:///%s/spark-connect-write-example-output-repartition-by-range-with-column.parquet", *filedir))
	if err != nil {
		log.Fatalf("Failed: %s", err)
	}
}