Spark 8 : Transformations

Ya hocam birak, var mi biseyler?


Datayi yuklediysek transform edelim az biraz.

Bir metodun transformer olup olmadigini anlamanin bir cakal yolu da signature'une bakmaktir. Eger o da bir RDD donduruyorsa, lazy bir sekilde, orjinal RDD uzerine uygulanacak islemlerin planini olusturan bir transformer metottan baskasi degildir. 

Tatavami su sekilde birakarak olaya giriyorum.

Map
Adi uzerinde, belli bir degere karsilik baska bir deger "map" ediyoruz. Yani bir listeyi alip her bir eleman uzerinde bir islem uygulayarak (evet layarak) yeni bir liste olusturuyoruz. 

> sc.parallelize(1 to 100).map(_ *2) 

dersem anlasilir herhalde. 


FlatMap
Bu da oncekinin daha flat hali zheuehhas. Tamam tamam. Bu metod verilen transformasyonu once tum elemanlara uygular sonra da ortaya cikan elemanlarin tamamini birlestirip flat bir list yapar. Yani,

> sc.textFile("file:///C:/deneme.txt").flatMap(satir => satir.split(" "))

seklinde yaparsak, oncelike bir text dosyasi okuduk. Sonra bunun her bir satirini split ederek, kelimeleri aldik. Boylece aslinda her bir satir icin elimizde bir liste oldu. Bunu tum satirlar icin yaparsak elimizde listelerden olusan bir liste olur. Bunu da flatten edersek, tek bir liste olur ki bu da flatMap iste.

SortBy
Verilen fonksiyon ile listeyi sort eder. Liste deyip duruyorum ama aslinda RDD iste siz anlayin. 

> sc.parallelize(1 to 100).sortBy(t => t, ascending=false)

bu sefer de 100'den 1'e gelecek sekilde sort etmis oluruz.

Sample
Verilen RDD icerisinden belirli kriterlere gore bir `sample` alma, bir `dadina bakma` hukmunde bir islemdir. spark-shell'de calisirken ya da bir machine learning algoritmasi ile iyi gidecek bir metottur. 

scala> res0.sample(withReplacement=false, fraction = 0.03).collect
res37: Array[Int] = Array(10, 32, 66, 69)

Seklinde bir sample alinabilir. Burada fraction, sample datanin gercek datanin kacta kaci olacagini belirtirken, withReplacement ise sample islemi yapilirken bir elemanin birden fazla kere sample icerisinde yer alip almamasi gerektigini belirtir. Mesela sirf bunu denemek icin biraz fraction'u artirirsak,

scala> res0.sample(withReplacement=true, fraction = 0.1).collect
res44: Array[Int] = Array(1, 12, 13, 27, 29, 30, 34, 39, 47, 47, 55, 74)

Burada 47'nin iki kere gectigini gorebiliriz. 

KeyBy
Kendine iyi bak gibi bisey zannedebilirmisiz zuahshdhs. Cok sapittik bu aralar farkindayim. Neyse, verilen fonksiyon ile her bir item uzerinde key olusturulur ve sonucata key-val iceren bir PairRDD elde edilir.

> val isimler = Array("at", "kedi", "kus")
> sc.parallelize(isimler)
> res0.keyBy(t => t.length)
> res1: Array[(Int, String)] = Array((2, "at"), (4, "kedi"), (3, "kus"))

Konu Ozeti
Haydi simdi ele ele tutusun ve kosup gelin, tum ogrendiklerimizle bir ornek yapalim. Oncelikle guzel bir dosya lazim bize, ornek input dosyasi. Shakespeare kulliyatini buradan tek bir text dosyasi olarak indrebilirsiniz. Yani big-data camiasinda 5 mb'in lafi olmaz ama ornek bu ornek. 

Daha sonra burada en sik gecen kelimeleri (evet yabanci klavyem var), listeleyelim. 

import org.apache.spark.{SparkConf, SparkContext}

object SortWords {
def main(args: Array[String]): Unit = {
val sparkConf = new SparkConf().setAppName("SortWords").setMaster("local[*]")
val sc = new SparkContext(sparkConf)

val textFile = sc.textFile("file:///C:\\data\\shakes.txt")
val tokenized = textFile.flatMap(_.split(" "))
val filteredTokens = tokenized.map(_.trim).filter(_.length > 2)
val keyed = filteredTokens.map(t => (t, 1))
val counts = keyed.reduceByKey(_ + _)
val sortedCounts = counts.sortBy(t => t._2, ascending = false)

sortedCounts.saveAsTextFile("SORTED_WORD_FREQ")
}
}

Sonuclar mantikli:

(the,23407)
(and,18358)
(you,9129)
(that,7543)
(And,7068)
(not,6967)
(with,6771)
(his,6218)
(your,6016)
(for,5629)
(have,5236)
(this,4809)
(thou,4247)
Seklinde devam ediyor. Buradaki kod cok yalap sap yazilmis durumda. Bunu kisaltmak hepimize bir odev olsun eyyyy yoldaslar. Saka lan saka siktiredin aq. Bidahakinde gorusmek uzre. 
Until then,

Hayir, son bir nokta var. Shuffle mevzusu ki, cok onemli bir mevzumuzdur. 

Ogrenci Shuffle'i var mi abi :( ?
Simdi bizim RDD dagitik olarak bulundugu icin, bazi operasyonlar bu parcaciklar uzerinde yapilabiliyor. Mesela Map. Data 100 partition halinde de olsa, bu map islemi her bir partition uzerinde ayri ayri uygulanabiliyor. Yani lokal bir islem. Ancak bazi islemler, mesela distinct, yani her bir elemandan sadece tek bir tane birakma islemi yapilacagi zaman tum partition'larin birbiri ile senkronize edilmesi gerekiyor. Buna da shuffle adi veriliyor. Tabu bu senkronizasyon network uzerinden yapildigi icin baya maliyetli ve uzun zaman alan bir islem haline geliyor. Bundan mumkun oldugunca kacinmak gerekiyor. 

Bir islemin shuffle gerektirip gerektirmedigini anlamanin en iyi yolu, RDD uzerinde toDebugString metodunu cagirarak execution plan'a bakmaktir. (Bu konulara ilerde detayli deginecegiz ama execution plan bir RDD'nin nasil olusturacaginin cetelesini, planini tutan plandir :p)

Ilk ornek bir RDD uzerinde map islmei icin:

scala> sc.parallelize(1 to 100).map(t => t*2).toDebugString
res5: String =
(6) MapPartitionsRDD[4] at map at <console>:26 []
 |  ParallelCollectionRDD[0] at parallelize at <console>:27 []

Ikinci ornek ise shuffle gerektiren, distinct metodu icin:

scala> res0.map(t => t*2).distinct.toDebugString
res6: String =
(6) MapPartitionsRDD[8] at distinct at <console>:26 []
 |  ShuffledRDD[7] at distinct at <console>:26 []
 +-(6) MapPartitionsRDD[6] at distinct at <console>:26 []
    |  MapPartitionsRDD[5] at map at <console>:26 []
    |  ParallelCollectionRDD[0] at parallelize at <console>:27 []

Burada cok acik bir sekilde Shuffle oldugunu gorebiliyoruz. Bu yol ile sadece belirli transformasyonlari degil, tum RDD'nin son hali icin eger shuffle gerekip gerekmedigini gorebiliriz. Spark tuning icin execution plan okuman onemli bir yer tutmaktadir. 

Bizim yukaridaki tam dosya okuma ve sort etme ornegine bakacak olursak,

(2) MapPartitionsRDD[11] at sortBy at SortWords.scala:13 []
 |  ShuffledRDD[10] at sortBy at SortWords.scala:13 []
 +-(2) MapPartitionsRDD[7] at sortBy at SortWords.scala:13 []
    |  ShuffledRDD[6] at reduceByKey at SortWords.scala:12 []
    +-(2) MapPartitionsRDD[5] at map at SortWords.scala:11 []
       |  MapPartitionsRDD[4] at filter at SortWords.scala:10 []
       |  MapPartitionsRDD[3] at map at SortWords.scala:10 []
       |  MapPartitionsRDD[2] at flatMap at SortWords.scala:9 []
       |  file:///C:\data\shakespeare.txt MapPartitionsRDD[1] at textFile at SortWords.scala:8 []
       |  file:///C:\data\shakespeare.txt HadoopRDD[0] at textFile at SortWords.scala:8 []


sortBy ve reduceByKey ile iki kere shuffle oldugunu goruyoruz. Eger production'da olsak baya bir zorlaniriz demek oluyor. Sanirim bunu iyilestirebiliriz, ilerde. 

Simdilik, yallah tazzik.


Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

SD #1: Scalability

Threat Modeling 1