Spark with Delta Lake
build.sbt
ThisBuild / version := "0.1.0"
ThisBuild / scalaVersion := "2.13.8"
ThisBuild / organization := "bkr"

// One version per library family, so all artifacts of a family move in lock-step.
val sparkVersion = "3.2.0"
val hadoopVersion = "3.3.1"

libraryDependencies ++= Seq(
  // Apache Spark
  "org.apache.spark" %% "spark-core" % sparkVersion,
  "org.apache.spark" %% "spark-sql" % sparkVersion, // % "provided"
  "org.apache.spark" %% "spark-sql-kafka-0-10" % sparkVersion, // % Test
  "org.apache.spark" %% "spark-avro" % sparkVersion,
  // Hadoop; hadoop-azure provides the ABFS filesystem used for ADLS Gen2 paths
  "org.apache.hadoop" % "hadoop-common" % hadoopVersion,
  "org.apache.hadoop" % "hadoop-azure" % hadoopVersion,
  // JSON and Delta Lake
  "org.json4s" %% "json4s-native" % "3.6.12",
  "io.delta" %% "delta-core" % "1.1.0"
)
App.scala
import org.apache.spark._
import org.apache.spark.sql._
import org.apache.spark.sql.streaming._
import org.apache.hadoop._
import org.apache.spark.sql.types._
/** Entry point: builds a snapshot of client records from a JSON operation log and writes it to
  * Azure Data Lake Storage Gen2 as a Delta table.
  *
  * The snapshot contains every `update` entity plus every `create` entity whose `id` has no
  * `update`. If one id is updated more than once, every update row is kept; the input has no
  * ordering field that could select the latest one.
  *
  * Defines `main` explicitly rather than extending `App`: the Spark documentation warns that
  * subclasses of `scala.App` may not work correctly.
  */
object Application {

  def main(args: Array[String]): Unit = {
    println("Running app..")

    // Credentials come from the environment so the account key is never committed with the code.
    // NOTE(review): variable names mirror the placeholders previously hard-coded here — confirm.
    val adlsAccountName = requiredEnv("ADLS_ACCOUNT_NAME")
    val adlsAccountKey = requiredEnv("ADLS_ACCOUNT_KEY")

    // setIfMissing keeps "local" as the default but still honours a master supplied by
    // spark-submit (loaded by SparkConf from spark.* system properties); setMaster would
    // silently override it.
    val sparkConfig = new SparkConf().setIfMissing("spark.master", "local")
    val spark = SparkSession
      .builder
      .config(sparkConfig)
      // Session settings the Delta Lake setup guide asks for: SQL extension + Delta catalog.
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .getOrCreate()

    // Always release the session, even when the job fails.
    try buildAndWriteSnapshot(spark, adlsAccountName, adlsAccountKey)
    finally spark.stop()
  }

  /** Reads the operation log, derives the client snapshot, prints it and saves it as Delta.
    *
    * @param spark           active session
    * @param adlsAccountName storage account hosting the `abfss://` output container
    * @param adlsAccountKey  shared key for that storage account
    */
  private def buildAndWriteSnapshot(
      spark: SparkSession,
      adlsAccountName: String,
      adlsAccountKey: String
  ): Unit = {
    import spark.implicits._

    spark.sparkContext.setLogLevel("ERROR")
    // Shared-key credentials for the ABFS (hadoop-azure) filesystem on this storage account.
    spark.sparkContext
      .hadoopConfiguration
      .set(s"fs.azure.account.key.$adlsAccountName.dfs.core.windows.net", adlsAccountKey)

    // Applied to the reader below (it was previously defined but unused). This skips a
    // schema-inference pass over the input and guarantees every field referenced later exists.
    // `id` is LongType to match Client.id.
    val schema = new StructType()
      .add("action", StringType)
      .add("entity", new StructType()
        .add("id", LongType)
        .add("name", StringType)
        .add("lastname", StringType))

    val ops: Dataset[ClientOperation] = spark.read
      .schema(schema)
      .json("./data/input")
      .as[ClientOperation]

    // Column predicates, unlike Scala lambdas, stay visible to the Catalyst optimizer.
    val created: Dataset[Client] = ops.filter($"action" === "create").select($"entity.*").as[Client]
    val updated: Dataset[Client] = ops.filter($"action" === "update").select($"entity.*").as[Client]
    // Joining on the column name avoids resolving `id` against two plans that share `ops`.
    // A left-anti join keeps only the left side's columns (id, name, lastname).
    val createdNotUpdated = created.join(updated, Seq("id"), "leftanti").as[Client]
    val snapshot = createdNotUpdated.union(updated)

    // foreach runs on the executors; with the default local master they live in this JVM.
    snapshot.foreach(e => println(e))
    // snapshot.write.format("delta").save("./data/output")
    // NOTE(review): the default SaveMode is ErrorIfExists, so a re-run fails once the table
    // exists — confirm whether .mode("overwrite") is intended for a snapshot.
    val outputPath = s"abfss://joker@$adlsAccountName.dfs.core.windows.net/delta-lake/clients-table"
    snapshot.write.format("delta").save(outputPath)
  }

  /** Returns the value of environment variable `name`, failing fast when it is not set. */
  private def requiredEnv(name: String): String =
    sys.env.getOrElse(name, throw new IllegalStateException(s"Environment variable $name is not set"))
}
/** A client record, as carried in the `entity` field of a [[ClientOperation]].
  *
  * `id` is a primitive `Long`, so a row whose `id` is null cannot be decoded into this type.
  */
final case class Client(id: Long, name: String, lastname: String)

/** One entry of the input operation log: an `action` applied to a client `entity`.
  *
  * `Application` treats the actions "create" and "update"; any other value is ignored there.
  */
final case class ClientOperation(action: String, entity: Client)