为了初始化Spark Streaming程序,一个StreamingContext对象必需被创建,它是Spark Streaming所有流操作的主要入口。一个StreamingContext 对象可以用SparkConf对象创建。
import org.apache.spark._
import org.apache.spark.streaming._
val conf = new SparkConf().setAppName(appName).setMaster(master)
val ssc = new StreamingContext(conf, Seconds(1))
appName
表示你的应用程序显示在集群UI上的名字,master
是一个Spark、Mesos、YARN集群URL 或者一个特殊字符串“local[*]”,它表示程序用本地模式运行。当程序运行在集群中时,你并不希望在程序中硬编码master
,而是希望用spark-submit
启动应用程序,并从spark-submit
中得到 master
的值。对于本地测试或者单元测试,你可以传递“local”字符串在同一个进程内运行Spark Streaming。需要注意的是,它在内部创建了一个SparkContext对象,你可以通过ssc.sparkContext
访问这个SparkContext对象。
批时间片需要根据你的程序的潜在需求以及集群的可用资源来设定,你可以在性能调优那一节获取详细的信息。
可以利用已经存在的SparkContext
对象创建StreamingContext
对象。
import org.apache.spark.streaming._
val sc = ... // existing SparkContext
val ssc = new StreamingContext(sc, Seconds(1))
当一个上下文(context)定义之后,你必须按照以下几步进行操作
streamingContext.start()
方法接收和处理数据;streamingContext.stop()
方法被调用。几点需要注意的地方:
stop()
方法,也会关闭SparkContext对象。如果只想仅关闭StreamingContext对象,设置stop()
的可选参数为false