Spark Training Class Notes
- Course files are on GitHub
$ cp /opt/spark-1.5.1-bin-hadoop2.6.tgz ~
$ tar zxvf spark-1.5.1-bin-hadoop2.6.tgz
$ mv spark-1.5.1-bin-hadoop2.6 spark
Test the shell
Launch interactive shell
$ ~/spark/bin/spark-shell
$ ~/spark/bin/pyspark
Note:
- pyspark has no tab completion; need to find out why
Functional Programming
- Pass a FUNCTION (what to do) as a method parameter, rather than an OBJECT (data)
- i.e., pass in the action to perform, not an object (see the sketch after this list)
- Scala: an FP language built on the JVM
- Java: supports FP since Java 8
- Python: supports FP
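A minimal Scala sketch of the idea (applyTwice and addOne are made-up names for illustration): the caller supplies the behaviour as a function value.

def applyTwice(x: Int, f: Int => Int): Int = f(f(x))   // takes the "what to do" as a parameter

val addOne: Int => Int = _ + 1
applyTwice(3, addOne)       // 5
applyTwice(3, x => x * 2)   // 12: different behaviour passed in, same method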
Running a Spark application in local mode inside an IDE
- Call .setMaster("local") in the code to run locally (a Scala sketch follows the Java line below)
- SparkConf conf = new SparkConf().setAppName("HelloWorld").setMaster("local");
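For comparison, a rough Scala sketch of the same local-mode setup (the object name HelloWorldLocal is made up, not the course's sample project):

import org.apache.spark.{SparkConf, SparkContext}

object HelloWorldLocal {
  def main(args: Array[String]): Unit = {
    // setMaster("local") runs everything in a single local JVM, no cluster required
    val conf = new SparkConf().setAppName("HelloWorld").setMaster("local")
    val sc = new SparkContext(conf)
    println(sc.parallelize(1 to 10).count())   // tiny job to verify the local context works
    sc.stop()
  }
}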
== Lunch break ==
If there is not enough memory for caching RDD partitions (see the sketch after this list)
- MEMORY_ONLY
- The least-recently-used partitions are dropped, then recomputed when necessary
- MEMORY_AND_DISK
- Partitions are spilled to disk and read back when necessary
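A small sketch of choosing a storage level when caching, assuming a spark-shell session (the RDD here is made up for illustration):

import org.apache.spark.storage.StorageLevel

val nums = sc.parallelize(1 to 1000000)        // illustrative RDD
nums.persist(StorageLevel.MEMORY_AND_DISK)     // partitions that do not fit in memory spill to disk
// nums.cache() would be MEMORY_ONLY instead: evicted partitions are recomputed on demand
nums.count()                                   // the first action actually materializes the cache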
RDD Types
- Basic RDD[T]
- Treats each data item as a single value
- Can be converted to the other RDD types (see the sketch after this list)
- PairRDD
- Each data item is a key / value pair
- DoubleRDD
- Data items are convertible to the Scala data type Double
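A small spark-shell sketch of how a basic RDD turns into the other types simply through the shape of its elements (variable names are made up):

val basic = sc.parallelize(1 to 5)        // basic RDD[Int]
val pairs = basic.map(x => (x % 2, x))    // key / value elements enable the PairRDD operations
pairs.reduceByKey(_ + _).collect()        // e.g. Array((0,6), (1,9))
val doubles = basic.map(_.toDouble)       // Double elements enable the DoubleRDD operations
doubles.mean()                            // 3.0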
Creating an RDD from a collection
Start spark-shell
scala> sc.parallelize( 1 to 10)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:16
scala> sc.parallelize(Array("1","2","3"))
res1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at
<console>:16
scala> res0.collect()
res4: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> res1.collect()
res5: Array[String] = Array(1, 2, 3)
Notes:
- If the result is not assigned to a val, the shell binds it to a default name such as res0
scala> val a = sc.parallelize( 1 to 20 )
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:15
scala> a.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
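parallelize also takes an optional second argument for the number of partitions, which matters later for reduce and fold. A quick sketch (the val name b is made up):

val b = sc.parallelize(1 to 20, 4)   // ask for 4 partitions explicitly
b.partitions.size                    // 4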
RDD.map( func )
- map applies the same function to every element of the RDD and returns a new RDD (see the note after the example below)
scala> a.collect
res6: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
scala> a.map( x => x + 1)
res7: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at map at <console>:18
scala> a.collect()
res8: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
scala> res7.collect()
res9: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21)
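Note that map does not change a: it returns a new RDD, and nothing runs until an action such as collect() is called. The same example with a named val (plusOne is a made-up name):

val plusOne = a.map(x => x + 1)   // transformation only: no job runs yet
plusOne.collect()                 // action: Array(2, 3, ..., 21)
a.collect()                       // a itself is unchanged: Array(1, 2, ..., 20)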
RDD.filter( func )
- Keeps only the elements that satisfy the given condition (another sketch follows the example below)
scala> a.collect()
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
scala> a.filter( x => x != 1)
res11: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[4] at filter at <console>:18
scala> res11.collect()
res12: Array[Int] = Array(2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20)
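Another quick filter sketch, keeping only the even numbers:

a.filter(x => x % 2 == 0).collect()   // Array(2, 4, 6, 8, 10, 12, 14, 16, 18, 20)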
RDD.flatMap( func )
- flatMap applies the same function to every element of the RDD, then flattens the results into a single collection (a word-splitting sketch follows at the end of this section)
Create an RDD containing 1 to 10
scala> val rdd = sc.parallelize( 1 to 10 )
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[12] at parallelize at <console>:15
Use .flatMap to produce, for each element x, the range 1 to x.
- In terms of content, it lists 1 to 1, 1 to 2, 1 to 3, ..., up to 1 to 10, all flattened into one collection
scala> rdd.flatMap( x => 1 to x ).collect()
res20: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4, 1, 2, 3, 4, 5, 1, 2, 3, 4, 5, 6, 1, 2, 3, 4, 5, 6, 7, 1, 2, 3, 4, 5, 6, 7, 8, 1, 2, 3, 4, 5, 6, 7, 8, 9, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
The reverse direction
scala> rdd.flatMap( x => x to 3 ).collect()
res21: Array[Int] = Array(1, 2, 3, 2, 3, 3)
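A common flatMap use is splitting lines into words. A small sketch with made-up sentences:

val lines = sc.parallelize(Seq("hello spark", "hello scala"))
lines.map(line => line.split(" ")).collect()       // Array(Array(hello, spark), Array(hello, scala))
lines.flatMap(line => line.split(" ")).collect()   // Array(hello, spark, hello, scala): flattened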
RDD.reduce( func )
scala> var rdd1 = sc.makeRDD( 1 to 10, 3)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[15] at makeRDD at <console>:15
- The trailing 3 specifies 3 partitions
With this reduce the contents are added up, so summing 1 to 10 gives 55
scala> rdd1.reduce( (a,b) => a+b )
res22: Int = 55
Note that when an RDD is spread across multiple partitions, the execution order is not guaranteed. Numeric addition is commutative, so it is unaffected, but string concatenation is not, and can produce unexpected results (an order-preserving alternative is sketched at the end of this section).
scala> var range = ( 'a' to 'z' ).map(_.toString)
range: scala.collection.immutable.IndexedSeq[String] = Vector(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z)
scala> val rdd = sc.parallelize( range, 3 )
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[16] at parallelize at <console>:17
scala> rdd.reduce( (a,b) => a+b)
res23: String = abcdefghrstuvwxyzijklmnopq
- This happens because the results of the 3 partitions are not necessarily combined in order
scala> rdd.partitions.size
res24: Int = 3
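If the concatenated string must stay in order, one workaround is to collect first and join on the driver, since collect() returns partitions in index order. A small sketch:

rdd.collect().mkString   // abcdefghijklmnopqrstuvwxyz
// reduce itself is only order-safe when the function is commutative, e.g. numeric addition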
RDD.fold(zero)(func)
- A generalization of reduce()
- The zero value is applied within each partition
- The zero value is also used when merging the partition results (i.e., it is added once more in the final merge)
- Conceptually a bit like a prefix, or a default value
scala> var rdd1 = sc.makeRDD( 1 to 10, 3 )
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at makeRDD at <console>:15
scala> rdd1.fold(0)( (a,b) => a+b )
res25: Int = 55
scala> rdd1.fold(1)( (a,b) => a+b )
res26: Int = 59
- Why the result is 59 (see the quick check below)
- Summing 1 to 10 gives 55
- The 3 partitions contribute 1 + 1 + 1 (point two above)
- Merging the partition results adds one more 1 (point three above)
- 59 = 55 + 3 + 1
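In other words, the extra amount is (number of partitions + 1) × zero. A quick check with a different partition count (rdd5 is a made-up name):

val rdd5 = sc.makeRDD(1 to 10, 5)   // same data, but 5 partitions
rdd5.fold(1)( (a,b) => a + b )      // 61 = 55 + 5 (one per partition) + 1 (final merge)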
scala> val range = ( 'a' to 'z').map(_.toString)
range: scala.collection.immutable.IndexedSeq[String] = Vector(a, b, c, d, e, f, g, h, i, j, k, l, m, n, o, p, q, r, s, t, u, v, w, x, y, z)
scala> val rdd = sc.parallelize(range,3)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[18] at parallelize at <console>:17
scala> rdd.fold("1")( (a,b) => a+b )
res27: String = 11rstuvwxyz1abcdefgh1ijklmnopq