Spark #19 : Closure mevzusu


Kafa karistiran konulardan birtanesi de spark icerisinde closure kullanimidir. Closure, en azindan bir adet bound variable iceren standalone fonksyonlara denir. Bu ne demek? Bir ornek,

scala> var count = 0
cc: Int = 0

scala> val list = 0 to 10
list: scala.collection.immutable.Range.Inclusive = Range 0 to 10

scala> list.foreach(t => {
     | count += 1
     | println(s"sayac: $count")
     | })

Buradaki sondaki foreach icerisinde bir fonksiyon tanimlamasi var. Ve bu tanimlama bir ustteki scope'da bulunan count degiskenine bir referans iceriyor. Bu fonksiyon bir Java nesnesi ve bu tanimlamanin yapildigi metot sonlandiktan sonra dahi calistirilma ihtimali var. Bu durumda nasil count degiskenine erisecek? Cevap, closure. Isim de oradan geliyor zaten, icteki fonksiyon, count degiskeni uzerine kapaniyor ve bir nevi sarmaliyor. Bu sayede count degiskeninin tanimlanmis oldugu metot sonlanmis dahi olsa, bu degisken closure icerisinde yasamaya devam ediyor. 

Ciktiya bakarsak hayet tahmin edilebilir,


Simdi ayni islemi spark ile yapalim.

scala> var count = 0
count: Int = 0

scala> val rdd = sc.makeRDD(1 to 10, 5)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[120] at makeRDD at <console>:28

scala> rdd.foreach
foreach   foreachAsync   foreachPartition   foreachPartitionAsync

scala> rdd.foreach(x => {
     | count += 1
     | println(s"sayac: $count")
     | })
sayac: 1
sayac: 2
sayac: 1
sayac: 1
sayac: 1
sayac: 2
sayac: 2
sayac: 1
sayac: 2
sayac: 2

scala> println(count)
0

Cok garip seyler oluyor. Oncelikle 1'den 10'a kadar olan sayilardan 5 partitionlu bir RDD olsuturuyoruz. Daha sonra bunun uzerinde foreach ile donup her bir iterasyonda sayaci artirip yazdiriyoruz. Son olarak da count degiskeninin son halini yazdirdim. Hala sifir gozukuyor. Yani hic artirma islemi yapilmamis!

Spark distributed bir yapida oldugu icin, her bir partition kendi bound variable kopyasini aliyor. Yani aslinda her bir partition icin farkli bir count degiskeni var. Buradaki foreach de farkli partitionlar uzerinde calistigi icin count degeri hicbir zaman 2'yi gecemedi. (cunku bir parititonda en fazla 2 eleman var bu ornek icin). Ve son olarak yazdirdigimiz count degiskeni ise, orjinal olan, yani executor'lara kopyalanmayan, driver programda bulunan count degiskeni. Bu degisken uzerinde ise hic islem yapilmadi, artirma olmadi. Haliyle degeri hala sifir. 

Gorulebilecegi gibi, driver program uzerinde bir mutable degisken tanimlayip (var) bunu executors icerisinden mutate etmeye calismak iyi bir fikir degil. Bu spesifik ornek acisindan accumulator kullanmak en mantikli cozum olacaktir. 

Broadcasting
Closure'lar ile ilgili baska bir husus da, bound variable'larin executor'lara yani node'lara, broadcast edildigi yani network uzerinden kopyalandigi gercegidir. Ve de her seferinde. Yani closure fonksiyonu her calistirildiginda, bunu calistiran node, bound variable'lari da driver programdan kopyalar. Bir ornek:

val indexer = Map(...) //1 Mb
rdd.flatMap(rddVal => indexer.get(rddVal))

Bura gorece buyuk bir degiskenimiz var, indexer isimli bir lookup table olsun. Bir alttaki flatMap'de verilen fonksiyon bu lookup table'a bir referans barindiriyor. Bu da demek oluyor ku bu closure cluster'da node'lara serialize edilip dagitilirken bu lookup table da dagitilacak yani kopyalanacak. Hem de closure fonksiyonu executor icerisinde her calistiridiginda. Bu da milyonlarca kere calisma ve data kopyalama demek olabilir. 

Cozum de broadcast kullanmak. Su sekilde degistirirsek kodu:

val indexer = sc.broadcast(Map(...))
rdd.flatMap(rddVal => indexer.value.get(rddVal))

Broadcast degiskeni de elbette netwok uzerinde kopyalanacak ama bu sadece bir referans ve onceki index'e gore cok daha kucuk boyutta. Ve hatta spark bunu node'lara distribute ederken, bit torrent'e benzer bir strateji kullanir. Master node'dan kendi kopyasini alan bir node, diger node'lara da bu variable'i kopyalamaya baslar. 

Iyi haber ise, 1.3'ten itibaren broadcast islemi otomatik olarak yapiliyor. Yani bu duruma kafa yormamiza gerek yok. Sadece bound-variable'i birden fazla stage icerisinde kullanacaksak ya da de-serialized olarak saklamak istersek explicit olarak broadcast metodunu cagirmaliyiz. 

Bu sorunu da hallettigimize gore, diger postta gorusuruz. 


Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

SD #1: Scalability

Threat Modeling 1