Spark with Scala
build.sbt
ThisBuild / version := "0.1.0"
ThisBuild / scalaVersion := "2.12.13"
ThisBuild / organization := "bkr"

lazy val KafkaStreamProcessing = (project in file("."))
  .settings(
    name := "Bkr.Spark",
    libraryDependencies += "org.apache.spark" %% "spark-core" % "3.1.1",
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "3.1.1",
    libraryDependencies += "org.apache.spark" %% "spark-sql-kafka-0-10" % "3.1.1",
    libraryDependencies += "org.apache.spark" %% "spark-avro" % "3.1.1",
    // hadoop-common and hadoop-azure must be on the same version, or ADLS access will fail at runtime
    libraryDependencies += "org.apache.hadoop" % "hadoop-common" % "3.3.1",
    libraryDependencies += "org.apache.hadoop" % "hadoop-azure" % "3.3.1",
    libraryDependencies += "org.scalatest" %% "scalatest" % "3.0.8" % Test,
    // circe, for deserializing JSON payloads
    libraryDependencies ++= List(
      "io.circe" %% "circe-core" % "0.14.1",
      "io.circe" %% "circe-generic" % "0.14.1",
      "io.circe" %% "circe-parser" % "0.14.1"
    )
  )
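The circe dependencies handle JSON deserialization. As a minimal sketch of how they are typically used (the Reading case class and its fields are illustrative, not from the project):

import io.circe.generic.auto._
import io.circe.parser.decode

// hypothetical payload shape, for illustration only
case class Reading(id: String, value: Double)

val result: Either[io.circe.Error, Reading] =
  decode[Reading]("""{"id":"sensor-1","value":42.0}""")
// result == Right(Reading("sensor-1", 42.0))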
Spark session builder

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession

class SparkSessionBuilder {
  def build(appName: String, config: AppConfig): SparkSession = {
    // run against an embedded local master when configured; otherwise leave the master to spark-submit
    val sparkConfig = config.sparkMasterCluster match {
      case "LOCAL" => new SparkConf()
        .setAppName(appName)
        .setMaster("local")
      case _ => new SparkConf()
        .setAppName(appName)
    }
    val spark = SparkSession
      .builder
      .config(sparkConfig)
      .getOrCreate()
    // account key for Azure Blob Storage (wasbs:// paths)
    spark
      .sparkContext
      .hadoopConfiguration
      .set(s"fs.azure.account.key.${config.azBlobStorageConfig.accountName}.blob.core.windows.net", config.azBlobStorageConfig.accountKey)
    // account key for ADLS Gen2 (abfss:// paths)
    spark
      .sparkContext
      .hadoopConfiguration
      .set(s"fs.azure.account.key.${config.azDataLakeStorageConfig.accountName}.dfs.core.windows.net", config.azDataLakeStorageConfig.accountKey)
    spark
  }
}
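The builder references AppConfig and the two storage config types, which are not shown here. A minimal sketch of the shapes the code assumes, with field names inferred from usage:

case class AzBlobStorageConfig(accountName: String, accountKey: String)
case class AzDataLakeStorageConfig(accountName: String, accountKey: String)

case class AppConfig(
  sparkMasterCluster: String, // "LOCAL" to run embedded; anything else defers to the cluster
  azBlobStorageConfig: AzBlobStorageConfig,
  azDataLakeStorageConfig: AzDataLakeStorageConfig
)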
Azure Blob Storage source
import org.apache.spark.sql.{DataFrame, SparkSession}

class AzBlobStorageDataSource(config: AzBlobStorageConfig) {
  // batch-read a JSON blob over the wasbs:// (Blob Storage) endpoint; Spark infers the schema from the data
  def read(spark: SparkSession, container: String, blob: String): DataFrame = {
    spark.read
      .option("inferSchema", "true")
      .json(s"wasbs://$container@${config.accountName}.blob.core.windows.net/$blob")
  }
}
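Schema inference costs an extra pass over the data. For stable payloads, a variant of the read that supplies an explicit schema avoids that pass; this is a sketch, with field names, container, and account name as placeholders:

import org.apache.spark.sql.types._

val eventSchema = StructType(Seq(
  StructField("id", StringType),
  StructField("value", DoubleType)
))

val df = spark.read
  .schema(eventSchema)
  .json("wasbs://raw@myaccount.blob.core.windows.net/events.json")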
ADLS Gen2 sink
import org.apache.spark.sql.Dataset

class AzDataLakeStorageDataSink(config: AzDataLakeStorageConfig) extends DataSink {
  // coalesce(1) forces a single output file per write, at the cost of writing on a single task
  def save[T](ds: Dataset[T], container: String, blob: String): Unit = {
    ds.coalesce(1)
      .write
      .json(s"abfss://$container@${config.accountName}.dfs.core.windows.net/$blob")
  }
}
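Putting it together. The DataSink trait is not shown above, so its shape below is inferred from the class that extends it; account names, containers, blob paths, and environment variables are placeholders:

// assumed shape of the trait referenced above
trait DataSink {
  def save[T](ds: Dataset[T], container: String, blob: String): Unit
}

object Main extends App {
  // placeholder config; in practice this would come from a config file or environment
  val appConfig = AppConfig(
    sparkMasterCluster = "LOCAL",
    azBlobStorageConfig = AzBlobStorageConfig("myblobaccount", sys.env("BLOB_KEY")),
    azDataLakeStorageConfig = AzDataLakeStorageConfig("mydatalake", sys.env("ADLS_KEY"))
  )

  val spark = new SparkSessionBuilder().build("bkr-spark", appConfig)
  val source = new AzBlobStorageDataSource(appConfig.azBlobStorageConfig)
  val sink = new AzDataLakeStorageDataSink(appConfig.azDataLakeStorageConfig)

  // copy a JSON blob from Blob Storage into ADLS Gen2 as a single file
  val df = source.read(spark, container = "raw", blob = "events.json")
  sink.save(df, container = "curated", blob = "events")

  spark.stop()
}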