Spark #5 : Spark uygulamasi
Konsole guzel, konsol pratik, interaktif. Ben de ad-hoc queryler icin kullaniyorum ya da experimentation amacli. Ama misal ETL joblari yazmak icin girdili ciktili (I/O) uygulama yazip derleyip ortaya koymamiz lazim.
Tum spark uygulamari driver program adi verilen tek bir noktadan yonetilir. Driver, gerekli islemleri diger makinelere dagitmak gibi bir gorevi vardir. Bizim meshur sparkContext de bu driver program icerisinde calismaktadir. Driverin gorevleri arasinda:
1. Task'larin olusturulmasi: Execution planin olusturulmasi ve tasklarin worker node'lara gonderilmesi
2. Schedule edilmesi (ya da ettirilmesi)
3. Data locality'nin gozetilmesi
4. Fault tolerance:
Goruldugu gibi bir spark job'un calismaya baslayabilmesi icin bir miktar zaman gerekiyor.
Bu arada driver program, kendine ait web tabanli bir UI barindirir. Genelde 4040 portu uzerinden bu UI'a ulasabilir ver spark'in o anda ne haltlar karistirdigini gorebiliriz.
Scala projesi olusturma
Tatavayi bir kenara birakarak, koda geciyoruz. Bize bir scala projesi lazim. Nasil ki node'un npm'i, python'un pip'i varsa scala'nin da sbt'si var. simple build tool. yoksa bunu kurunuz.
Bu konuda birkac bisey denedim ama simdilik isime gelen IntelliJ ile bir sbt projesi olusturmak. Daha sonra build.sbt dosyasina Spark dependency'lerini ekliyoruz. Ama burada scala versiyonu ile spark versiyonu uyumlu olmasi cok onemli. Ben biraz eski bir versiyon kullaniyorum.
name := "spark-deneme"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion,
"org.apache.spark" %% "spark-sql" % sparkVersion
)
build.sbt guncelledikten sonra sbt arac panelinden Reload cekiyoruz ve spark indirilerek kuruluyor.
Yeni bir scala dosyasi olusturup adini HelloSparkApp.scala koyuyurum.
import org.apache.spark.{SparkConf, SparkContext}
object HelloSparkApp extends App {
val sparkConf = new SparkConf()
.setAppName("deneme")
.setMaster("local[1]")
val sc = new SparkContext(sparkConf)
val textFile = sc.textFile("README.md")
val tokenizedFile = textFile.flatMap(_.split(" "))
val keyVals = tokenizedFile.map(t => (t, 1))
val counts = keyVals.reduceByKey((acc, cur) => acc + cur)
val sortedCounts = counts.sortBy(_._2)
sortedCounts.saveAsTextFile("COUNTS")
}
SparkConf
Bu sefer spark config ve spark contexti kendimiz olusturuyoruz. Burada fluent api'den yararlanarak bircok config tanimlanabilir. setMaster metodu ile uygulama hangi cluster'da calisacaksa onun master url'ini veriyoruz. Ama biz simdilik lokalde calistiracagimiz icin local veriyoruz. 1 ise, kullanacagi core sayisini belirtiyor. Eger * verir isek (locak[*]), spark makinedeki tum core'lari kullanacaktir. Yani lokalde bile parallel calisiyor gahpe.
Gerisi de gecen yazida bahsettigimiz word count, big datanin hello world'u.
Derle, derle beni derle ...
Simdi kodu yazdigimiza gore bunu cluster'a gonderebiliriz. sbt package koumutu ile kodu derleyerek jar halinde paketliyorum. Burada olusan JAR dosyasinin yolunu bize gosteriyor.
Son olarak da spark-submit ile bu uygulamayi cluster'a gonderiyorum. (lokale yani)
spark-submit --class "HelloSparkApp" --master "local[*]" .\target\scala-2.11\spark-deneme_2.11-0.1.jar
Son tavsiyem de bu isleri windows'da yapmayin. Gene spark-submit esnasinda birsuru sacma sapan hata cikti ki su anda hic o hatalarin detaylarini arastirasim yok.
Yorumlar
Yorum Gönder