Spark with Delta Lake

A minimal Spark batch job that reads client create/update events from JSON, computes the current state of each client, and writes it as a Delta table on Azure Data Lake Storage Gen2 (ADLS).

build.sbt

ThisBuild / version      := "0.1.0"
ThisBuild / scalaVersion := "2.13.8"
ThisBuild / organization := "bkr" 

libraryDependencies ++= List("org.apache.spark" %% "spark-core" % "3.2.0",
                            "org.apache.spark" %% "spark-sql" % "3.2.0", // % "provided"
                            "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.2.0", // % Test
                            "org.apache.spark" %% "spark-avro" % "3.2.0")

libraryDependencies ++= List("org.apache.hadoop" % "hadoop-common" % "3.3.1",
                            "org.apache.hadoop" % "hadoop-azure" % "3.3.1")

libraryDependencies += "org.json4s" %% "json4s-native" % "3.6.12"

libraryDependencies += "io.delta" %% "delta-core" % "1.1.0"

App.scala


import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.types._

// Spark's quick start recommends a main() method instead of extending scala.App,
// which may not work correctly with Spark
object Application {
  def main(args: Array[String]): Unit = {

    println("Running app..")

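    // Replace with real values; prefer reading secrets from the environment over hard-coding them.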
    val adlsAccountName = "ADLS_ACCOUNT_NAME"
    val adlsAccountKey = "ADLS_ACCOUNT_KEY"

    val sparkConfig = new SparkConf().setMaster("local")
    val spark = SparkSession
                            .builder
                            .config(sparkConfig)
                            // Delta Lake settings from the Delta quickstart (required for Delta SQL support)
                            .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
                            .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
                            .getOrCreate()
    import spark.implicits._
    
    spark.sparkContext.setLogLevel("ERROR")
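    // Authenticate to ADLS Gen2 (abfss://) with the storage account key.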
    spark.sparkContext
            .hadoopConfiguration
            .set(s"fs.azure.account.key.$adlsAccountName.dfs.core.windows.net", adlsAccountKey)

    // Matches ClientOperation below; id is LongType to line up with Client.id: Long
    val schema = new StructType()
                    .add("action", StringType)
                    .add("entity", new StructType()
                        .add("id", LongType)
                        .add("name", StringType)
                        .add("lastname", StringType))

    val ops: Dataset[ClientOperation] = spark.read
                                            .schema(schema)
                                            .json("./data/input")
                                            .as[ClientOperation]

    val created: Dataset[Client] = ops.filter(o => o.action == "create").map(_.entity)

    val updated: Dataset[Client] = ops.filter(o => o.action == "update").map(_.entity)

    // left anti join keeps clients that were created but never updated;
    // joining on Seq("id") avoids ambiguous-column issues in this self-join
    val createdNotUpdated = created.join(updated, Seq("id"), "leftanti").as[Client]

    val snapshot = createdNotUpdated.union(updated)

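    // foreach runs on the executors, so println output only reaches this console in local mode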
    snapshot.foreach(e => println(e))

    // snapshot.write.format("delta").save("./data/output")
    snapshot.write.format("delta").save(s"abfss://joker@$adlsAccountName.dfs.core.windows.net/delta-lake/clients-table")
  }
}

case class Client(id: Long, name: String, lastname: String)
case class ClientOperation(action: String, entity: Client)
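
Reading the table back

To sanity-check the write, the table can be loaded with the same Delta source. A minimal sketch, reusing spark and adlsAccountName from App.scala:

    val clients = spark.read
                       .format("delta")
                       .load(s"abfss://joker@$adlsAccountName.dfs.core.windows.net/delta-lake/clients-table")
                       .as[Client]
    clients.show()

Once the table exists, later runs could apply fresh operations with Delta's MERGE API instead of recomputing the snapshot by hand. A sketch, assuming updates is a Dataset[Client] of changed entities and tablePath is the abfss path above (both hypothetical names):

    import io.delta.tables.DeltaTable

    DeltaTable.forPath(spark, tablePath)
              .as("t")
              .merge(updates.toDF.as("s"), "t.id = s.id")
              .whenMatched.updateAll()    // replace the stored row for an existing client
              .whenNotMatched.insertAll() // add clients seen for the first time
              .execute()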