Spark #5 : Spark uygulamasi

 Konsole guzel, konsol pratik, interaktif. Ben de ad-hoc queryler icin kullaniyorum ya da experimentation amacli. Ama misal ETL joblari yazmak icin girdili ciktili (I/O) uygulama yazip derleyip ortaya koymamiz lazim. 

Tum spark uygulamari driver program adi verilen tek bir noktadan yonetilir. Driver, gerekli islemleri diger makinelere dagitmak gibi bir gorevi vardir. Bizim meshur sparkContext de bu driver program icerisinde calismaktadir. Driverin gorevleri arasinda:

1. Task'larin olusturulmasi: Execution planin olusturulmasi ve tasklarin worker node'lara gonderilmesi
2. Schedule edilmesi (ya da ettirilmesi)
3. Data locality'nin gozetilmesi
4. Fault tolerance: 


Buradaki Cluster Manager spark'in kendi cluster manageri olabilecegi gibi YARN veya Mesos da olabilir. Ben genelde YARN ile calisildigini gordum. Bu cluster manager sayesinde bizim driver program gerekli resourcelari cluster'dan talep eder. Driver program, cluster manager'e baglandiktan sonra clusterda bulunan executor'larin listesine erisir. Executor bir nevi computation unit olup, data saklama ve isleme gorevi ustlenen, izole, minimum birimdir. Daha sonra yine driver program, cluster manager uzerinden uygulama kodunu (JAR veya Python dosyalari) tum executor node'lara gonderir ki, onlar da kodlari calistirabilsin. Son olarak da tasklari executor node'lara gondermeye baslar ve boylece spark job calismaya baslamis olur. 

Goruldugu gibi bir spark job'un calismaya baslayabilmesi icin bir miktar zaman gerekiyor. 

Bu arada driver program, kendine ait web tabanli bir UI barindirir. Genelde 4040 portu uzerinden bu UI'a ulasabilir ver spark'in o anda ne haltlar karistirdigini gorebiliriz. 

Scala projesi olusturma
Tatavayi bir kenara birakarak, koda geciyoruz. Bize bir scala projesi lazim. Nasil ki node'un npm'i, python'un pip'i varsa scala'nin da sbt'si var. simple build tool. yoksa bunu kurunuz

Bu konuda birkac bisey denedim ama simdilik isime gelen IntelliJ ile bir sbt projesi olusturmak. Daha sonra build.sbt dosyasina Spark dependency'lerini ekliyoruz. Ama burada scala versiyonu ile spark versiyonu uyumlu olmasi cok onemli. Ben biraz eski bir versiyon kullaniyorum.

name := "spark-deneme"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.4"

libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion
)
build.sbt guncelledikten sonra sbt arac panelinden Reload cekiyoruz ve spark indirilerek kuruluyor. 
Yeni bir scala dosyasi olusturup adini HelloSparkApp.scala koyuyurum. 

import org.apache.spark.{SparkConf, SparkContext}

object HelloSparkApp extends App {
val sparkConf = new SparkConf()
.setAppName("deneme")
.setMaster("local[1]")

val sc = new SparkContext(sparkConf)

val textFile = sc.textFile("README.md")
val tokenizedFile = textFile.flatMap(_.split(" "))
val keyVals = tokenizedFile.map(t => (t, 1))
val counts = keyVals.reduceByKey((acc, cur) => acc + cur)
val sortedCounts = counts.sortBy(_._2)
sortedCounts.saveAsTextFile("COUNTS")
}

SparkConf
Bu sefer spark config ve spark contexti kendimiz olusturuyoruz. Burada fluent api'den yararlanarak bircok config tanimlanabilir. setMaster metodu ile uygulama hangi cluster'da calisacaksa onun master url'ini veriyoruz. Ama biz simdilik lokalde calistiracagimiz icin local veriyoruz. 1 ise, kullanacagi core sayisini belirtiyor. Eger * verir isek (locak[*]), spark makinedeki tum core'lari kullanacaktir. Yani lokalde bile parallel calisiyor gahpe.
Gerisi de gecen yazida bahsettigimiz word count, big datanin hello world'u. 

Derle, derle beni derle ...
Simdi kodu yazdigimiza gore bunu cluster'a gonderebiliriz.
sbt package koumutu ile kodu derleyerek jar halinde paketliyorum.  Burada olusan JAR dosyasinin yolunu bize gosteriyor. 
Son olarak da spark-submit ile bu uygulamayi cluster'a gonderiyorum. (lokale yani)
spark-submit --class "HelloSparkApp" --master "local[*]" .\target\scala-2.11\spark-deneme_2.11-0.1.jar

Son tavsiyem de bu isleri windows'da yapmayin. Gene spark-submit esnasinda birsuru sacma sapan hata cikti ki su anda hic o hatalarin detaylarini arastirasim yok.






Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

SD #1: Scalability

Threat Modeling 1