Spark 10 : Action

Macera, aksiyon ve tansiyon... hepsi bu postta.

   

  Transformasyonlarin aksine, aksiyonlar lazy degildir. Bir aksyion cagrildigi an, tum hesaplamalar yapilir ve sonuc node'lardan driver programa aktarilir. Eger bir metot RDD dondurmuyor ise, aksyondur. Mesela count, integer dondurur. 

Bazi aksyionlar onceliler node'larda her bir partition uzerinde calistirilarak daha sonra driver'a yonlenirler. Bu sayede mumkun oldugunca shuffle edilecek data azaltilmis olur. Mesela count'ta, oncelikler her bir partition icin count hesaplanarak sadece bu int degeri shuffle edilir. Yoksa tum data shuffle edilip driver uzerinde bunlarin count'ini almak en hafif tabirle lazlik olacaktir. 

Collect
En bodoslama action bu olsa gerek. Diyor ki tum distributed datayi topla bana (driver program) getir. Eger burada cok fazla data varsa driver program out of memory diye bagirarak patlatacaktir. Bir dataset'in distributed olarak tutulmasinda bir sebep olsa gerek degil mi? mesele tek bir makinenin hafizasina sigmamasi olabilir. Bu yuzden collect pek onerilmez. 

Take
Eger ki datadan azcik bir alalim bakalim neheymis diyecek olursak, take metodu ile istedigimiz sayida record'u driver programa toplayabiliriz. RDD'de bulunan ilk n kaydi dondurecektir, yani order'i korur.

scala> res0.take(5).foreach(println)
1
2
3
4
5

bu sekilde ekrana ilk 5 kaydi yazdirabiliriz. Eger ki bu iste biraz randomness olsun istersek, takeSample metodu kullanilabilir. Eger takeSample'i birden fazla kez calistirrsak farkli sonuclar getirecektir.

scala> res0.takeSample(false,5).foreach(println)
14
84
99
95
43

Bu arada top, RDD'deki en buyuk n elemani toplarken takeOrdered ise en kucuk n elemani getirir. 

Count
RDD'deki record sayisini getirir. Ama ki maliyetli bir islemdir. Bu acidan adamlar bir de bunun yaklasik degerini hesaplayan metot gelistirmislar:  countApproxDistinct. Bir tolerans argumani vererek, ne kadar accurate sonuc istediginizi bildirebilirsiniz. Cok buyuk RDD'lerde sayim islmei maliyetli ve sizin tek istediginiz kabacak kac (in the same ballpark derler ya) kayit oldugunu ogrenmek ise kullanisli. 

scala> res0.countApproxDistinct(0.001) // bu deger 0'a ne kadar yaklasirsa o kadar accurate oluyor ama daha uzun suruyor.


Reduce ve Fold Biladerler
Bunlar mahserin iki atlisi gibi fonk. programlamanin da iki atlisidir. Hemen olaya girelim. 

scala> val c = sc.parallelize(1 to 3) 

scala> c.reduce((accumulator, value) => accumulator + value)
res0: Int = 6

Direk sonuc vermesinden de anliyoruz ki bu bir aksiyon. RDD'deki tum elemanlarin uzerinden gecerek tek bir elemena `reduce` ediyoruz. Burada tum elemanlari topladik. Aslinda uzun yazdim ama scala sugarlarindan istifade ederek c.reduce(_+_) diye de yazilabilir. 

Ama spark'in reduce metodu yukaridaki functional programming metodundan biraz farkli. Data distributed oldugu icin akumulatoru garanti etmiyor. Yani aslinda metod signature'i su sekilde:

rdd.reduce((val1, val2) => val1 + val2)

Burada akumulator val1 de olabilir val2 de olabilir. Gerci toplama orneginde bisey farketmiyor. Reduce, akumule edilecek ilk deger olarak RDD'deki ilk item'i aliyor. Bu acidan eger empty bir RDD uzerinde reduce yaparsaniz, hata ile karsilasirsiniz. Burada rdd.isEmpty ile check etmek gerekir.

Ya da fold kullanilabilir ki ilk degeri kabul eden bir metottur. 

Aggregate
Yukaridaki aggregate metotlarin da babasi atasi konumudadir. Cok genel bir fonksiyondur.  Hemen signature;u inceleyelim:

scala> c.aggregate

def aggregate[U](zeroValue: U)(seqOp: (U, Int) => U,combOp: (U, U) => U)(implicit evidence$31: scala.reflect.ClassTag[U]): U

Gorulecegi gibi bir zeroValue aliyor ki, bu da akumulator icin ilk degeri teskil ediyor. Daha sonra iki tane callback metod istiyor bizden. Birincisi, her bir partition'da lokal olarak calistiriliyor. Ve bildigimiz gercek reduce 'deki gibi accumulator ve current_value degerlerini garanti ediyor, adi da seqOp

Ikincisi ise her bir partition'dan seqOp metodu uygulanarak reduce edilmis degerleri nasil combine edecegimizi belirten combOp isimli fonksiyon.

En bastaki reduce fonskyonunu aggregate ile tekrar yazar isek:

scala> c.aggregate(0)((acc, value)=> acc + value, (val1, val2)=> val1+val2)
res10: Int = 6

seklinde ayni sonucu elde edebiliriz.

Bu arada tekrar fonksiyon signature'una dikkat edersek eger, geri dondurdugu tip olarak U goruruz. Yani uzerinde calisilan RDD'nin tipinden fakli olarak bir tip dondurebiliriz. Ilk akumulasyon degeri olarak verdigimiz item'in tipidir hatta donus tipi. Yani,

scala> c.aggregate("")((acc, value)=> acc + value, (val1, val2)=> val1+val2)
res16: String = 231

Seklinde bir string birlestirme islemi yapabiliriz. Ilkakumulasyon degeri olarak bos string verdigimiz icin ilk yapilan islemde Int + String gibi bir islem oluyor ve scala bize bunu bir string olarak donduruyor. (inanmayan shell'de 1 + " hodugum" seklinde deneyebilir). Sonuc olarak da tum RDD elemanlari string olarak islem goruyor ve sonucu veriyor. 

Ama tek bir farkla. Biz `123` diye bir sonuc beklerdik ki order'i kaybettigimizi goruyoruz. Neden? Cunku 2. metot, yani partitionlardan gelen degerleri combine eden fonskyonu bize order'i garanti etmiyor. Ama partiton lokalinde order garanti edilmis durumda. O acidan bu yukaridaki ornegi birkac kere calistirirsaniz, farkli sonuclar elde edebilirsiniz.

o zaman bidahaki sefere kadar, salam ve dua ile.




Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

Threat Modeling 1

Encoding / Decoding