Spark Streaming using the Kotlin Spark API in Jupyter
Although support in Jupyter is still slightly experimental, you can use Spark Streaming (DStreams) with the Kotlin Spark API. We'll walk through a small but powerful example that reads Wikipedia edits and shows them live, right here in the notebook!
To get started in your own notebook, add the following %use magic and you're off! Unlike the non-streaming %use spark magic, it does not start a SparkSession, because the session has to be created after the JavaStreamingContext has been created.
If you want to use a different version of the API or of Spark (supported Spark versions are 3.0, 3.1, and 3.2), you can specify them like this:
%use spark-streaming(spark=3.2, v=1.1.0)
%useLatestDescriptors
%use spark-streaming
For this example, we'll use Klaxon to convert the JSON data to serializable Kotlin data classes. Other deserialization libraries, such as kotlinx.serialization (%use serialization), work as well. Just make sure that the resulting classes implement java.io.Serializable so they can be used with RDDs, and that they are data classes (or another supported type) so they can be used with Datasets.
%use klaxon
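Why java.io.Serializable? Spark's default serializer (the spark.serializer setting defaults to org.apache.spark.serializer.JavaSerializer) is plain Java serialization, so objects must support it whenever Spark serializes them. The Spark-free sketch below shows that round trip with ObjectOutputStream and ObjectInputStream; the Edit class and the roundTrip helper are hypothetical illustrations, not part of the Kotlin Spark API. Dropping ": Serializable" from Edit would make writeObject fail with java.io.NotSerializableException.

```kotlin
import java.io.ByteArrayInputStream
import java.io.ByteArrayOutputStream
import java.io.ObjectInputStream
import java.io.ObjectOutputStream
import java.io.Serializable

// Hypothetical example class, shaped like the generated ones: nullable properties with defaults
data class Edit(val title: String? = null, val user: String? = null) : Serializable

// Writes a value with plain Java serialization and reads it back as a new object
@Suppress("UNCHECKED_CAST")
fun <T : Serializable> roundTrip(value: T): T {
    val bytes = ByteArrayOutputStream().use { buffer ->
        ObjectOutputStream(buffer).use { it.writeObject(value) }
        buffer.toByteArray()
    }
    return ObjectInputStream(ByteArrayInputStream(bytes)).use { input ->
        input.readObject() as T
    }
}

// Example usage: the copy is a different object with equal properties
val original = Edit(title = "Some page", user = "Some user")
val copy = roundTrip(original)
```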
We want to read data from the stream at https://stream.wikimedia.org/v2/stream/recentchange, whose specification is given at https://stream.wikimedia.org/?spec. A tool like https://app.quicktype.io/ can convert this JSON Schema to Kotlin data classes, and after a couple of small adjustments, we're ready to parse the data!
import com.beust.klaxon.Json
import java.io.Serializable

/**
 * Old and new revision IDs
 */
data class Revision(
    /**
     * (rc_last_oldid)
     */
    val new: Long? = null,
    /**
     * (rc_this_oldid)
     */
    val old: Long? = null,
) : Serializable

/**
 * Length of old and new change
 */
data class Length(
    /**
     * (rc_new_len)
     */
    val new: Long? = null,
    /**
     * (rc_old_len)
     */
    val old: Long? = null,
) : Serializable

data class Meta(
    /**
     * Domain the event or entity pertains to
     */
    val domain: String? = null,
    /**
     * UTC event datetime, in ISO-8601 format
     */
    val dt: String? = null,
    /**
     * Unique ID of this event
     */
    val id: String? = null,
    /**
     * Unique ID of the request that caused the event
     */
    @Json(name = "request_id")
    val requestID: String? = null,
    /**
     * Name of the stream/queue/dataset that this event belongs in
     */
    val stream: String? = null,
    /**
     * Unique URI identifying the event or entity
     */
    val uri: String? = null,
) : Serializable

/**
 * Represents a MW RecentChange event. https://www.mediawiki.org/wiki/Manual:RCFeed
 */
data class Data(
    /**
     * A URI identifying the JSONSchema for this event. This should match a schema's $id in a
     * schema repository. E.g. /schema/title/1.0.0
     */
    @Json(name = "\$schema")
    val schema: String? = null,
    /**
     * (rc_bot)
     */
    val bot: Boolean? = null,
    /**
     * (rc_comment)
     */
    val comment: String? = null,
    /**
     * ID of the recentchange event (rcid).
     */
    val id: Long? = null,
    /**
     * Length of old and new change
     */
    val length: Length? = null,
    /**
     * (rc_log_action)
     */
    @Json(name = "log_action")
    val logAction: String? = null,
    @Json(name = "log_action_comment")
    val logActionComment: String? = null,
    /**
     * (rc_log_id)
     */
    @Json(name = "log_id")
    val logId: Long? = null,
    // /**
    //  * Property only exists if event has rc_params.
    //  */
    // @Json(name = "log_params", ignored = true)
    // val logParams: LogParams? = null,
    /**
     * (rc_log_type)
     */
    @Json(name = "log_type")
    val logType: String? = null,
    val meta: Meta? = null,
    /**
     * (rc_minor).
     */
    val minor: Boolean? = null,
    /**
     * ID of relevant namespace of affected page (rc_namespace, page_namespace). This is -1
     * ("Special") for log events.
     */
    val namespace: Long? = null,
    /**
     * The rc_comment parsed into simple HTML. Optional
     */
    val parsedcomment: String? = null,
    /**
     * (rc_patrolled). This property only exists if patrolling is supported for this event
     * (based on $wgUseRCPatrol, $wgUseNPPatrol).
     */
    val patrolled: Boolean? = null,
    /**
     * Old and new revision IDs
     */
    val revision: Revision? = null,
    /**
     * $wgServerName
     */
    @Json(name = "server_name")
    val serverName: String? = null,
    /**
     * $wgScriptPath
     */
    @Json(name = "server_script_path")
    val serverScriptPath: String? = null,
    /**
     * $wgCanonicalServer
     */
    @Json(name = "server_url")
    val serverUrl: String? = null,
    /**
     * Unix timestamp (derived from rc_timestamp).
     */
    val timestamp: Long? = null,
    /**
     * Full page name, from Title::getPrefixedText.
     */
    val title: String? = null,
    /**
     * Type of recentchange event (rc_type). One of "edit", "new", "log", "categorize", or
     * "external". (See Manual:Recentchanges table#rc_type)
     */
    val type: String? = null,
    /**
     * (rc_user_text)
     */
    val user: String? = null,
    /**
     * wfWikiID ($wgDBprefix, $wgDBname)
     */
    val wiki: String? = null,
) : Serializable
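One detail worth noting in the classes above is the annotation @Json(name = "\$schema") on Data.schema. In a Kotlin string literal, $schema would be parsed as a string template reading a variable called schema, so the dollar sign has to be escaped. The plain-Kotlin snippet below (no Klaxon needed; the value names are purely illustrative) shows that the escaped form and a character template produce the same literal JSON key.

```kotlin
// The JSON key of the schema URI in Wikimedia events, written with an escaped dollar sign
val schemaKey = "\$schema"

// The same key written with a character template instead of an escape
val schemaKeyViaTemplate = "${'$'}schema"

// Without either form, "$schema" would be a template that reads a variable named schema,
// which fails to compile unless such a variable happens to be in scope
```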
Spark already provides a couple of input streams, such as ssc.queueStream(), ssc.socketTextStream(), and ssc.fileStream(). For convenience, however, we create our own Receiver-based stream that reads streaming data from a given URL. For this purpose, we define UrlReceiver and the extension function ssc.urlStream() below.
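import java.io.*
import java.net.*
import kotlin.concurrent.thread
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.api.java.JavaReceiverInputDStream
import org.apache.spark.streaming.api.java.JavaStreamingContext
import org.apache.spark.streaming.receiver.Receiver

/**
 * Receiver that opens [urlString], optionally configures the connection with [runOnUrlConnection],
 * and stores every non-empty line it reads.
 */
class UrlReceiver(val urlString: String, val runOnUrlConnection: URLConnection.() -> Unit = {}) :
    Receiver<String>(StorageLevel.MEMORY_AND_DISK_2()) {

    override fun onStart() {
        // onStart() must not block, so the reading happens on its own thread
        thread(name = "Url Receiver", start = true) {
            try {
                BufferedReader(
                    InputStreamReader(
                        URL(urlString)
                            .openConnection()
                            .apply(runOnUrlConnection) // e.g. set request headers
                            .getInputStream()
                    )
                ).use { bufferedReader ->
                    var msg: String? = bufferedReader.readLine()
                    // isStopped() becomes true once Spark stops the receiver,
                    // which also works correctly if Spark restarts the receiver later
                    while (msg != null && !isStopped()) {
                        if (msg.isNotEmpty()) store(msg)
                        msg = bufferedReader.readLine()
                    }
                }
            } catch (e: IOException) {
                // Let Spark restart the receiver instead of silently losing the stream
                restart("Error reading from $urlString", e)
            }
        }
    }

    override fun onStop() {
        // Nothing to clean up: the reading thread exits after its next read once isStopped() is true
    }
}

fun JavaStreamingContext.urlStream(
    urlString: String,
    runOnUrlConnection: URLConnection.() -> Unit = {},
): JavaReceiverInputDStream<String> = receiverStream(UrlReceiver(urlString, runOnUrlConnection))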
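The core of UrlReceiver is an ordinary line-reading loop. To try that logic without Spark or a network connection, the sketch below pulls it out into a hypothetical readNonEmptyLines helper that takes any java.io.Reader, a stop check playing the role of the receiver's stop condition, and a callback standing in for Receiver.store(). The equally hypothetical collectLines feeds it an in-memory string and can simulate a stop after a given number of stored lines.

```kotlin
import java.io.BufferedReader
import java.io.Reader
import java.io.StringReader

// Hypothetical helper mirroring the receiver loop: reads lines until the input ends
// or isStopped() returns true, passing every non-empty line to store
fun readNonEmptyLines(reader: Reader, isStopped: () -> Boolean, store: (String) -> Unit) {
    BufferedReader(reader).use { bufferedReader ->
        var msg: String? = bufferedReader.readLine()
        while (msg != null && !isStopped()) {
            if (msg.isNotEmpty()) store(msg)
            msg = bufferedReader.readLine()
        }
    }
}

// Hypothetical driver: collects lines from a string, "stopping" after maxLines stored lines
fun collectLines(input: String, maxLines: Int = Int.MAX_VALUE): List<String> {
    val collected = mutableListOf<String>()
    readNonEmptyLines(StringReader(input), isStopped = { collected.size >= maxLines }) { collected += it }
    return collected
}
```

For example, collectLines("{\"a\":1}\n\n{\"b\":2}\n") keeps the two JSON lines and drops the blank one, just as the receiver never stores empty lines.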
Now that we have the data classes and a way to connect to Wikipedia, let's get streaming!
We call withSparkStreaming {} with the batch duration and timeout we want. The timeout is essential in Jupyter: without it, the stream simply keeps running, which is usually not what you want.
Inside, we open a URL stream and set the request properties expected by the Wikimedia EventStreams API. The stream outputs JSON strings, so we attempt to parse each one into our Data class with Klaxon, skipping the values it cannot parse.
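The flatMap { iterator { ... } } combination in the streaming code is a small "parse or skip" idiom: every input produces an iterator with zero or one element, so values that fail to parse simply disappear from the stream. The sketch below shows the same idiom on a plain Kotlin list, with String.toIntOrNull() inside a hypothetical parseOrNull function standing in for the Klaxon parser. If the real parser throws on malformed input instead of returning null, wrapping the call in runCatching { ... }.getOrNull() gives the same effect.

```kotlin
// Hypothetical stand-in for the JSON parser: returns null when parsing fails
fun parseOrNull(raw: String): Int? = raw.trim().toIntOrNull()

// Mirrors the flatMap in the streaming code: each raw value yields zero or one parsed value
fun parseAll(raws: List<String>): List<Int> =
    raws.flatMap { raw ->
        iterator {
            parseOrNull(raw)?.let { yield(it) }
        }.asSequence().toList()
    }

// Example usage: unparseable values are dropped
val parsed = parseAll(listOf("1", "not a number", " 42 ", ""))
```

On an ordinary collection, mapNotNull(::parseOrNull) would do the same; the iterator form fits the streaming code because JavaDStream.flatMap expects a function that returns an Iterator.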
After this, you can do anything you like with the stream! In our example, we filter the stream so that it only contains edits on the English Wikipedia. We then loop over each batch of data in the form of an RDD and convert it to a Dataset, which makes it easy to select and show columns such as the title, user, and comment of each edit event.
val url = "https://stream.wikimedia.org/v2/stream/recentchange"

// Display options for the rendering of Datasets/RDDs:
// DISPLAY_TRUNCATE sets the number of characters per cell, DISPLAY_LIMIT the number of rows
DISPLAY_TRUNCATE = -1
DISPLAY_LIMIT = 20

withSparkStreaming(batchDuration = Durations.seconds(5), timeout = Durations.seconds(20).milliseconds()) {
    setRunAfterStart {
        println("stream running!")
    }

    // Create a stream from the URL, using Klaxon to parse each JSON line into Data
    val stream: JavaDStream<Data> = ssc.urlStream(url) { setRequestProperty("Accept", "application/json") }
        .flatMap { json: String ->
            iterator {
                // Klaxon may throw on input it cannot parse; such lines are skipped
                runCatching { Klaxon().parse<Data>(json) }.getOrNull()?.let { yield(it) }
            }
        }

    // First filter, then loop over each batch as an RDD
    stream
        .filter { it.type == "edit" && it.serverName == "en.wikipedia.org" }
        .foreachRDD { rdd: JavaRDD<Data>, _: Time ->
            // Creating a Dataset requires a SparkSession; as the Spark Streaming guide
            // recommends, we build it from the SparkConf of the RDD
            withSpark(rddForConf = rdd) {
                rdd
                    .toDS() // only possible within withSpark {}
                    .select(
                        col(Data::title),
                        col(Data::user),
                        col(Data::comment),
                    )
                    .showDS(truncate = false) // print the rows as plain text
                    .let { DISPLAY(it) } // or render them fully as a notebook table
            }
        }
}
+-----+----+-------+
|title|user|comment|
+-----+----+-------+
+-----+----+-------+
+-----------------------------------------------------------+--------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|title |user |comment |
+-----------------------------------------------------------+--------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
|Nikolai Rubinshtein |WikiCleanerBot |v2.04b - [[User:WikiCleanerBot#T5|Bot T5 CW#16]] - Fix errors for [[WP:WCW|CW project]] (Unicode control characters) |
|1950 Gator Bowl |PrimeBOT |[[User:PrimeBOT/30|Task 30]]: replace deprecated infobox parameters following [[1089355765#Infobox_college_football_game_and_wrappers|a discussion]]|
|Hakra Ware culture |Sram337 |Added an infobox |
|Beergate |DeFacto |/* COVID-19 regulations and Partygate */ typo |
|Oh My English! |2401:3C00:122:8486:BD59:8A6D:B22E:E3AB| |
|Zavadiv, Mostyska urban hromada, Yavoriv Raion, Lviv Oblast|Ymblanter |restored, the government website link is not anymore working |
|Co-op Live |TomHennell |/* Arena design */ |
|Wikipedia talk:Counter-Vandalism Unit/Archive 3 |Graham87 |fix link after archive move |
|Automobile drag coefficient |Lastroot |Added some drag coefficient data. |
|Ophiocordyceps sinensis |Keith D |Fix cite date error |
|French cruiser Jeanne d'Arc (R97) |Lyndaship |clean-up |
+-----------------------------------------------------------+--------------------------------------+----------------------------------------------------------------------------------------------------------------------------------------------------+
+-------------------------------------+----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title |user |comment |
+-------------------------------------+----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|John Mulaney |Clocksbycoldplay|Added a part about a recent incident at one of Mulaney's shows. |
|1954 Orange Bowl |PrimeBOT |/* top */[[User:PrimeBOT/30|Task 30]]: replace deprecated infobox parameters following [[1089355765#Infobox_college_football_game_and_wrappers|a discussion]] |
|Obed Vargas |Mexedits1 | |
|2022 Formula D season |DanTickner |/* Schedule */ Round 3 winner; Round 2 source |
|Wonder Boy (video game) |Hhhseo | |
|2014 Russell Athletic Bowl |AnomieBOT |Moving refs out of templates |
|Carol Kirkwood |MIDI |double space |
|Hina Khan |Pri2000 |/* External links */ |
|Scaevola eneabba |Allthingsnative |Add description |
|List of members of the 17th Lok Sabha|Dhruv edits |/* Tamil Nadu */ |
|Marikina–Infanta Highway |112.201.163.244 |Added citations. |
|Australian Labor Party |Jay Cracknell |Grammar and spelling |
|2009 SEC Championship Game |PrimeBOT |/* top */[[User:PrimeBOT/30|Task 30]]: replace deprecated infobox parameters following [[1089355765#Infobox_college_football_game_and_wrappers|a discussion]] |
|The Unknown Warrior |Citation bot |Removed parameters. | [[WP:UCB|Use this bot]]. [[WP:DBUG|Report bugs]]. | Suggested by Whoop whoop pull up | [[Category:Tombs of Unknown Soldiers]] | #UCB_Category 33/42|
+-------------------------------------+----------------+-------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
+-------------------------------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|title |user |comment |
+-------------------------------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|Ashley Young |Mediocre Legacy|Stat update |
|Talk:2022 Australian federal election|HiLo48 |/* Include comment from French foreign minister in the international reactions? */ Yes, but kill the boring ones. |
|Laodicea on the Lycus |Rjdeadly |/* Location */ |
|Nikolai Sukhanov |WikiCleanerBot |v2.04b - [[User:WikiCleanerBot#T5|Bot T5 CW#16]] - Fix errors for [[WP:WCW|CW project]] (Unicode control characters) |
|Draft:October (company) |Oculi |/* '''New name and expansion in the Netherlands and Germany (2018-2019)''' */ dates |
|Wikipedia:Disambiguation |Bagumba |/* Primary topic with only one other topic */ clarify: this initially seemed to be guidance on whether to create a dab, but it's actually about a dab that already was created |
|Mount Erebus disaster |AndrewRG10 |/* Mahon inquiry */ Fixed to link to Conspiracy, not conspiracy theory. |
|List of Punjab Kings records |ਜਸਕੀਰਤ | |
|2011 BCS National Championship Game |PrimeBOT |/* top */[[User:PrimeBOT/30|Task 30]]: replace deprecated infobox parameters following [[1089355765#Infobox_college_football_game_and_wrappers|a discussion]] |
|Gragnague station |Markussep |/* Train services */ |
|Kelsey Wingert |Citation bot |Alter: template type, title. Add: title, date, newspaper. Changed bare reference to CS1/2. Removed parameters. Some additions/deletions were parameter name changes. | [[WP:UCB|Use this bot]]. [[WP:DBUG|Report bugs]]. | Suggested by BrownHairedGirl | Linked from User:BrownHairedGirl/Articles_with_new_bare_URL_refs | #UCB_webform_linked 2012/3841|
|French aircraft carrier La Fayette |Lyndaship |clean-up |
+-------------------------------------+---------------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
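As a rough sanity check on the batchDuration and timeout chosen above: the stream stops after about timeout / batchDuration batches, here 20 s / 5 s = 4, which is consistent with the four tables printed above (the first of which happened to be empty). The hypothetical expectedBatches helper below does this arithmetic with java.time.Duration rather than Spark's Durations so it runs without Spark; in practice the count can be off by one, depending on when the timeout fires relative to the batch boundaries.

```kotlin
import java.time.Duration

// Hypothetical helper: how many complete batches fit into the timeout
fun expectedBatches(batchDuration: Duration, timeout: Duration): Long {
    require(!batchDuration.isZero && !batchDuration.isNegative) { "batchDuration must be positive" }
    return timeout.toMillis() / batchDuration.toMillis()
}

// Same values as the withSparkStreaming call in this example
val batches = expectedBatches(Duration.ofSeconds(5), Duration.ofSeconds(20))
```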