Пару месяцев назад я начала изучать Spark, и в какой-то момент столкнулась с проблемой сохранения вычислений Structured Streaming в базе данных Cassandra.
В данном посте я привожу простой пример создания и использования Cassandra Sink для Spark Structured Streaming. Я надеюсь, что пост будет полезен тем, кто недавно начал работать со Spark Structured Streaming и задается вопросом, как выгружать результаты вычислений в базу данных.
Идея приложения очень проста — получить и распарсить сообщения из кафки, выполнить простые трансформации в спарке и сохранить результаты в кассандре.
class CassandraSinkForeach() extends ForeachWriter[org.apache.spark.sql.Row] {
// This class implements the interface ForeachWriter, which has methods that get called
// whenever there is a sequence of rows generated as output
val cassandraDriver = new CassandraDriver();
def open(partitionId: Long, version: Long): Boolean = {
// open connection
println(s"Open connection")
true
}
def process(record: org.apache.spark.sql.Row) = {
println(s"Process new $record")
cassandraDriver.connector.withSessionDo(session =>
session.execute(s"""
insert into ${cassandraDriver.namespace}.${cassandraDriver.foreachTableSink} (fx_marker, timestamp_ms, timestamp_dt)
values('${record(0)}', '${record(1)}', '${record(2)}')""")
)
}
def close(errorOrNull: Throwable): Unit = {
// close the connection
println(s"Close connection")
}
}
val sink = parsed
.writeStream
.queryName("KafkaToCassandraForeach")
.outputMode("update")
.foreach(new CassandraSinkForeach())
.start()
class CassandraDriver extends SparkSessionBuilder {
// This object will be used in CassandraSinkForeach to connect to Cassandra DB from an executor.
// It extends SparkSessionBuilder so to use the same SparkSession on each node.
val spark = buildSparkSession
import spark.implicits._
val connector = CassandraConnector(spark.sparkContext.getConf)
// Define Cassandra's table which will be used as a sink
/* For this app I used the following table:
CREATE TABLE fx.spark_struct_stream_sink (
fx_marker text,
timestamp_ms timestamp,
timestamp_dt date,
primary key (fx_marker));
*/
val namespace = "fx"
val foreachTableSink = "spark_struct_stream_sink"
}
class SparkSessionBuilder extends Serializable {
// Build a spark session. Class is made serializable so to get access to SparkSession in a driver and executors.
// Note here the usage of @transient lazy val
def buildSparkSession: SparkSession = {
@transient lazy val conf: SparkConf = new SparkConf()
.setAppName("Structured Streaming from Kafka to Cassandra")
.set("spark.cassandra.connection.host", "ec2-52-23-103-178.compute-1.amazonaws.com")
.set("spark.sql.streaming.checkpointLocation", "checkpoint")
@transient lazy val spark = SparkSession
.builder()
.config(conf)
.getOrCreate()
spark
}
}
object KafkaToCassandra extends SparkSessionBuilder {
// Main body of the app. It also extends SparkSessionBuilder.
def main(args: Array[String]) {
val spark = buildSparkSession
import spark.implicits._
// Define location of Kafka brokers:
val broker = "ec2-18-209-75-68.compute-1.amazonaws.com:9092,ec2-18-205-142-57.compute-1.amazonaws.com:9092,ec2-50-17-32-144.compute-1.amazonaws.com:9092"
/*Here is an example massage which I get from a Kafka stream. It contains multiple jsons separated by \n
{"timestamp_ms": "1530305100936", "fx_marker": "EUR/GBP"}
{"timestamp_ms": "1530305100815", "fx_marker": "USD/CHF"}
{"timestamp_ms": "1530305100969", "fx_marker": "EUR/CHF"}
{"timestamp_ms": "1530305100011", "fx_marker": "USD/CAD"}
*/
// Read incoming stream
val dfraw = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", broker)
.option("subscribe", "currency_exchange")
.load()
val schema = StructType(
Seq(
StructField("fx_marker", StringType, false),
StructField("timestamp_ms", StringType, false)
)
)
val df = dfraw
.selectExpr("CAST(value AS STRING)").as[String]
.flatMap(_.split("\n"))
val jsons = df.select(from_json($"value", schema) as "data").select("data.*")
// Process data. Create a new date column
val parsed = jsons
.withColumn("timestamp_dt", to_date(from_unixtime($"timestamp_ms"/1000.0, "yyyy-MM-dd HH:mm:ss.SSS")))
.filter("fx_marker != ''")
// Output results into a database
val sink = parsed
.writeStream
.queryName("KafkaToCassandraForeach")
.outputMode("update")
.foreach(new CassandraSinkForeach())
.start()
sink.awaitTermination()
}
}
./bin/spark-submit --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1,datastax:spark-cassandra-connector:2.3.0-s_2.11 --class com.insight.app.CassandraSink.KafkaToCassandra --master spark://ec2-18-232-26-53.compute-1.amazonaws.com:7077 target/cassandra-sink-0.0.1-SNAPSHOT.jar
К сожалению, не доступен сервер mySQL