Spark #4 : Hello world

Bu devirde basit dusunebilmek zor is *


Simdi spark-shell ile interactive repl'a gectigimiz zaman, bizim icin 2 sey olusturuluyor:

- Spark context available as 'sc' (master = local[*], app id = local-1606910359074).

- Spark session available as 'spark'.

Buradaki spark bir SparkSession instance'i olup daha cok SparkSql tarafindan kullanilmaktadir. 

sc ise SparkContext. Tum spark uygulamalari bir spark context olusturularak baslatilir. baska turlu olmaz. spark uygulamalari aslinda dagitik uygulamalardir ve bu dagitik uygulamayi manage edebilmek icin bir entry point olmasi lazim. yani bize bir muhattap lazim ki getir goturu yapsin. 

Ilk olarak bir text dosyasi okutarak baslayalim,

val textFile = sc.textFile("file:///spark/README.md")

burada kolaylik olsun diye spark kurulumu yapilan dir'de spark-shell calistirdim ve halihazirda bulunan README.md dosyasini okutuyorum. 

Bunu calistirir isek, bize direkman bir RDD olusturdugunu (resillient distributed dataset, spark'in bel kemigi, cornflex notkasi)

textFile: org.apache.spark.rdd.RDD[String] = README.md MapPartitionsRDD[1]

Burada bir kere daha goruldugu uzere programci olarak bizim muhattabimiz spark context. Ben suan cluster'da kac vm var kac core var kac mb ram var ilgilenmiyorum. Bu makineler nerede? Spark-shell lokal calistidigim icin benim lokal makinemi kullaniyor. Ama ben programi yazarken sanki lokalde calisan bir uygulama yaziyormus gibi takilabiliyorum. Iste bu deployment ile execution'u ayirmak oluyor ve cok guzel oluyor. 

Simdi ben RDD uzerinde 2 cesit islem yapabilirim. Transformation ve action. Transformasyon adindan da anlasiliyor zaten data uzerinde belli basli degisikler yapiliyor. Ve bu islem lazy. Yani gercekten sonuc gerekene kadar bu transformasyonlar erteleniyor. Sadece bu transformasyonlarin nasil yapilacagi saklaniyor. 

Aksiyon ise datanin gercekten disklerden okunmasini, daha once varsa tanimlanmis transformasyonlarin calistirilmasini ve nihayetinde de sonuc datanin drive node'a (pardon buna gelecegiz) dondurulmeisni sagliyor. 

Yani ben simdi burada bir aksiyon olan first metodunu calistirir isem:

textFile.first 

yaparsam, sonuc olarak README.md dosyasindaki ilk satiri bana geri dondurecek:

res0: String = # Apache Spark

Hem sadece first metodunu calistirdigim icin spark tum dosya yerine sadece ilk satiri okuyacak. Bu da aslinda bir anlamda aksiyonlarin bile lazy ozellikleri olduklari gosteriyor. Boyle cakalliklar yapmasa spark'in gercek bigdata ile basetmesi hatta MapReduce'un canina ot tikamasi soz konusu olamazdi.

Simdi textFile rdd'si aslinda bir array gibi dusunulebilir. Her bir eleman aslinda dosyadaki bir satira karsilik geliyor. Ama biz ornek acisindan her bir kelimenin kac kere gectigini hesaplayacagimiz icin bize kelime bazli bir RDD gerekiyor. Bunu flatMap ile yapabiliriz:

val tokenizedText = textFile.flatMap(_.split(" "))

Bu sekilde, bir kontrol edelim, 

tokenizedText.collect

res3: Array[String] = Array(#, Apache, Spark, "", Spark, is, a, unified, analytics, engine, for, large-scale, data, processing., It, provides, high-level, APIs, in, Scala,, Java,, Python,, and, R,, and, an, optimized, engine, that, supports, general, computation, graphs, for ... 

Seklinde dosyadaki kelimelerden olusan bir RDD'miz var.

Simdi her bir kelimeyi sayacagimiz icin, oncelikle tek tek kelimelerden key-value pairler olusturuyoruz:

val countPrep = tokenizedText.map(w => (w, 1))

ve daha sonra reduceByKey ile, herbir distinct kelimeyi sayiyoruz:

val counts = countPrep.reduceByKey((acc, cur) => acc + cur)

Sonucu kontrol edelim:

counts.collect

res6: Array[(String, Int)] = Array((package,1), (this,1), (integration,1), (Python,2), (cluster.,1), (its,1), ([run,1), (There,1), (general,2), (have,1) ...

Seklinde, kelimeleri saydik. Bir de kelime sikligina gore sort edelim ki ne yaptigimizi bilelim:

val sortedCountr = counts.sortBy(kvPair => kvPair._2, false) // false -> buyukten kucuge

ve sonuc:

res7: Array[(String, Int)] = Array(("",73), (the,23), (to,16), (Spark,14), (for,12), (##,9), (a,9), (and,9), (is,7), (run,7), (on,7), (can,6), (also,5), (in,5), (of,5), (Please,4), (*,4), (if,4), (including,4), (an,4), (you,4), (documentation,3), (example,3), (build,3) .. 

Bu noktaya gelene kadar bircok transformasyon uyguladik, iste key-value pairler olustduk sort ettik falan. Eger hic collect calistirmasaydik, data uzerinde hicbir islem yapilmamis olacakti. Sadece transformasyonlari olusturduk. 

Herbir reduce kutucugu distinct bir kelimeye tekabul ediyor. 



Ama aslinda once bir map ile key-val olusturup daha sonra reduceByKey ile saymak bize neyi animsatti? evet, tam anlamiyla bir map-reduce job'u aslinda. Neyse, bunu daha kolay yapmak icin spark bize countByKey diye bir metod sunuyor, onu kullanip gecebiliriz. 


Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

SD #1: Scalability

Threat Modeling 1