Kayıtlar

spark etiketine sahip yayınlar gösteriliyor

SageMaker Notebook'a S3'ten Jar Import etmek

Emr uzerinde Jupyter notebook calistirmak cok kolay. Tek yapmamiz gereken SageMaker console'a giderek yeni bir notebook olusturmak. Buradaki tek puruz, ekstra bir Jar dosyasindan spark job import etmek ve jupyter notebook uzerinde calismak.  Yapmamiz gereken sey, Jar dosyamizi s3 uzerinde bir yere kopyalayip, daha sonra Jar'i jupyter notebook'a import etmek. %%configure -f {     "conf": {          "spark.jars.packages": "com.jsuereth:scala-arm_2.11:2.0,ml.combust.bundle:bundle-ml_2.11:0.13.0,com.databricks:dbutils-api_2.11:0.0.3"     },     "jars": [           "//s3 jar yolu"     ] } Bu sayede yolunu verdigimiz Jar (lar) icerisinde spark job'larini notebook uzerinde claistirabilir ve cok keyifli bir development environment olusturabiliriz.  Happy hacking.

Zeppelin Notebook Scala'dan Python'a Veri Paylasimi

Cok fazla tatavaya girmeden son donemde karsialstigim bir problemle ilgili cozumumu paylasmak istiyorum. Neden Zeppelin? Oncelikle neden ve ne sekilde zeppelin kullaniyorum? Aslinda tabi ki production-grade uygulamalari zeppelin ile gelistirmiyoruz. sbt bazli baya duz IntelliJ ile gelistirdigimiz scala / spark uygulamalari var. Ancak bazi durumlar oluyor ki, bir metodu alip incelemek istiyorum. Farkli inputlar ile ne gibi sonuclar veriyor gormek istiyorum. Yani elimizdeki mevcut scala / spark projesi ile biraz oynamak istiyorum.  Bunun icin oncelikle scala projesini assemble ederek (sbt assembly plugin kullaniyorum), zeppelin'deki spark interpereter'e import ediyorum. Daha sonra zeppelin notebooklar icerisinde bu projeimin istedigim kisimlarini import ederek cagirabiliyorum.  Gorsellestirme onemli bir mevzumuzdur Ve bir noktada bu sonuclari gorsellestirmem gerekiyor. Ozellikle spatial data ile ugrasirken 2d scatter plot guzel gidiyor diyebilirim. Ancak scala tarafinda bunu ciz...

Data Patterns #1 - Data Summarization Pattern

Resim
Problem Verilen bir DNA dizesinde (mesela "AATGC..."), her bir bazin (A, C, G, T) kac kere gectigini bulunuz. Ornek dna dizilim dosyasi: https://ftp.ncbi.nlm.nih.gov/genomes/INFLUENZA/influenza.fna (1.3 Gb) Cozum Yani aslinda kabaca, verilen text icerisinde gecen distinct karakterleri saymamizi istiyor. Cok basit (kolay demedik) ve temel bir problem.  Ornek bir veri dosyasi icin https://ftp.ncbi.nlm.nih.gov/genomes/INFLUENZA/influenza.fna (1.3 Gb text dosyasi) Input sekline bakacak olursak: >gi|58576|gb|X52226|Influenza A virus (A/FPV/Rostock/34(H7N1)) gene for neuraminidase, genomic RNA AGCAAAAGCAGGAGTTCAAAATGAATCCAAATCAGAAAATAATAACCATTGGGTCAATCTGTATGGGGAT CGGAATAATCAGCCTAATATTACAAATTGGAAACATAATCTCAATGTGGGTTAGTCATTCAATTCAGACT GAAAATCAAAATCACCATGAAGCATGCAACCCAAGCATTGCTGGACAGGATGCAGCTTCAGTGGCACTAG CAGGCAATTCCTCTCTTTGTCCCATTAGTGGGTGGGCTATATACAGTAAAGACAATGGTATAAGAATTGG .... En basta, buyuktur isareti ile baslayan bir yorum satiri var ki bu satiri gozardi etmemiz gerekiyor. Ge...

Spark jobum nasil calisiyor?

Resim
 Spark ogrenmek gercekten cok kolay. Esas hazine spark ile paralelize edilmis bir computation'u inceleyip optimizasyon noktalarini bulup optimize edebilmek. Bazen pata kute yazdigimizi spark job'lari cok performanssiz ya da olabileceginden cok daha az performansli olabiliyor. Bunun icin gelin bir spark job'unun cigerini nasil sokebiliriz inceleyeim: Spark UI. Ever gorsellestirmeler bize cok yardimci olacak.  Timeleline gosterimi Tum joblar, tek bir job veya tek bir stage icin timeline view seklinde olan olaylari gorebiliriz. Bu sayede bircok icgoru elde edebiliriz.  Bu gorsel sparkUI'in giris sayfasindan. Yani tum joblar bir timeline uzerinde gosteriliyor. Oncelikle 4 adet executor'in kayit edildidigini goruyoruz. Hemen sonra 4 farkli job paralel bir sekilde calismaya basliyor. Bir tanesi (kirmizi olan) fail olurken digerleri basariyla tamamlaniyor. Tum job'lar tamamlandiktan sonra bizim spark job sona eriyor ve executor'lar bizden alinarak tekrar clustera...

Spark Dataset ile typed bir gelecek

Resim
Hersey RDD ile basliyor. Kendisi type-safe. Ama tum execution dag bir kara kutu gibi, optimizasyon yazana kalmis durumda, low level bir API. Daha sonra Dataframe geliyor. Tum execution dag optimize edilebiliyor. Sql'e yakin ve bu durumda type safety kaybediliyor, hersey Row adi verilen jenerik bir class uzerinden donuyor.  Son olarak da Sql optimizasyonlari ve typesafety birleserek Dataset'i olusturuyor.  Hatta aslinda Spark 2.0'dan sonra Dataframe ve Dataset birlestirilmis durumda. Yani Dataframe, Dataset'in ozel bir hali gibi. Yani aslinda Dataframe = Dataset[Row] . Dataset Actions Dataframe uzerinde tanimlanan collect, take, foreach gibi action metodlari Row nesnesi collection'u uzerinden calisiyor. Ornek olarak bir df uzerinde collect yaparsak geriye Row listesi donecektir. Bunu da tekrardan parse edip kullanmamiz gerekir. Oyse ki Dataset, olusturuldugu class ne ise Dataset[BizimClass] o class tipinen bir kolleksion dondurecektir. Bu durumda tekrar parse etmeye ...

Spark Basit Join

Resim
En buyuk problemlerden bir tanesi. Ozellikle performans konusunda ciddi bottleneck olusturabiliyor. Yapabilecegimiz en iyi sey, Catalyst Reyizi en iyi sekilde kullanip, maksimum optimizasyon saglayabilmek. Soyle uc bir ornek verelim: Iki tabloyu join etmek icin bir join kriterine ihtiyacimiz var oyle degil mi? Bu bir esitlik olabilir, bir ID uzerinden join etmek icin. Bu esitlik operatorunu bir UDF'e kaydirabiliriz, hayet legit.  scala> val esitUDF = udf((id1: Long, id2: Long) => id1 == id2) scala> val joinedDF = df1.join(df2, esitUDF($"id1", $"id2")) Ancak buradaki UDF bir blackbox, yani catalyst buradaki imperative kod hakkinda bir optimizasyon gerceklestiremedigi gibi, bunu diger execution plan'daki adimlarla da kaynastiramiyor. Bu durumda join islemi bir kartezyen karsilastirma yapmak durumunda kaliyor ve sonuc olarak n2 complexity elde etmis oluyoruz. Ancak biz bu karsilastirmayi efendi gibi SparkSQL ile yazsa idik, scala> val joinedDF = df...

Spark #20 : Dataframe

Resim
Spark DataFrame, Pandas dataframes ile kullanim acisindan benzerdir. Ancak tabi ki de altyapi cok farkli, spark distributed bir yapiya sahip iken, Pandas single core kullanabilmektedir. En nihayetinde de bir database table gibi dusunulebilir. Her satir bir kaydi temsil ederken bir veya daha fazla stuna sahip olabilir. Hemen basit bir ornek ile RDD ile farki gorelim. Diyelim ki elimizde isim - yas iceren bir buyuk veri var. Ayni isme sahip kisilerin yas ortalamasini bulmak istiyoruz. Ornek bir data tanimlayalim,  scala> val rddBireyler = sc.parallelize(Seq(("babur", 20), ("jamiryo", 12), ("babur", 56), ("babur", 44), ("jamiryo", 67))) dfBireyler: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[145] at parallelize at <console>:30 Yas ortalamasi almak icin her bir isimden kac tane oldugunu da bulmamiz gerek. Bu acidan kelime sayma yaklasiminda oldugu gibi her bir kaydi 1 ile baslatiyoruz ve daha sonra bir reduce i...

Spark #19 : Closure mevzusu

Resim
Kafa karistiran konulardan birtanesi de spark icerisinde closure kullanimidir. Closure, en azindan bir adet bound variable iceren standalone fonksyonlara denir. Bu ne demek? Bir ornek, scala> var count = 0 cc: Int = 0 scala> val list = 0 to 10 list: scala.collection.immutable.Range.Inclusive = Range 0 to 10 scala> list.foreach(t => {      | count += 1      | println(s"sayac: $count")      | }) Buradaki sondaki foreach icerisinde bir fonksiyon tanimlamasi var. Ve bu tanimlama bir ustteki scope'da bulunan count degiskenine bir referans iceriyor. Bu fonksiyon bir Java nesnesi ve bu tanimlamanin yapildigi metot sonlandiktan sonra dahi calistirilma ihtimali var. Bu durumda nasil count degiskenine erisecek? Cevap, closure. Isim de oradan geliyor zaten, icteki fonksiyon, count degiskeni uzerine kapaniyor ve bir nevi sarmaliyor. Bu sayede count degiskeninin tanimlanmis oldugu metot sonlanmis dahi olsa, bu degisken closure icerisinde yasa...

Spark #18 : Spark SQL

Resim
Spark ozelliklerini SQL'e benzeyen bir dil ile kullanabilmeyi saglar. Tum sql standardini suan desteklemese de ileride hedef olarak SQL92 standardina uygun olabilmek belirlenmis durumda. Bir ornek: benimData.createOrReplaceTable("sparkTable") sqlContext.sql("SELECT * FROM sparkTable WHERE baziStun == 'bazi deger'") Databricks'teki arkadaslar, spark'in paralel islem gucunu "Big Data" muhendislerinden baska insanlarin da kullanmasini arzu ettiklerini ve SQL adaptasyonuna bu acidan onem verdiklerini soylemisler .  Burada esas amac SQL standardindan da ziyade bir soyutlama katmani yaratmak ve kullaniciya declerative kod yazma imkani saglamak. RDD'ler ile ugrasirken bircok detaya ve islemlerin nasil yapilmasi gerektigine kafa yormak gerekiyor. Ancak SQL ile islerin `nasil` yapilacagini degil de sadece `ne` yapilmasi gerektigini tarif ediyoruz. Gerisi spark tarafindan optimize ediliyor. SparkSQL'e bir alternatif ise Hive'dir. Kisa...

Spark #17 : Spark UI

Resim
Bana gore spark ile verimli joblar yazmanin anahtari problemleri iyi analiz etmek ve gerekli optimizasyonlari yapabilmek. Olcemedigin bir seyi iyilestiremezsin derler. Tam bu noktada SparkUI cok degerli bilgiler sunarak calismakta olan Spark job'unu derinlemesine analiz etmemize olanak sagliyor.  Herhangi bir spark driver program ayaga kalktiginda, basit bit web server calistirarak mevcut program hakinda bilgiler verecektir. Test etmek icin spark-shell'i acip, localhost:4040 adresine gidiyoruz.  Jobs Default aktif tab jobs tabidir. Her bir action, yeni bir spark job anlamina gelir. Cunku daha once de bahsetmistik, her bir action, tum execution dag'inin basta calistirilmasini gerektirir. Bu yuzden listelenen job'lar uzerinde de aciklama olarak cagrilan action ve hangi satirda oldugu gorulebilir.  Tekrar spark-shell'e donup, bir RDD olsuturp bir action calistiralim: > val nums = sc.parallelize(1 to 1000000) > nums.count Bunu yaptiktan sonra SparkUI sekmesinde re...

Spark #16 : Cluster Manager

Resim
Nedir? Cluster manager bir dagitik kernel olarak dusunulebilir. Nasil ki esas kernel tek bir makinede calisan process'leri yonetiyor ve ihtiyaclari olan resource'lari pay ediyor ise, cluster manager de tek bir makineden (daha dogrusu gek bir noktadan) dagitik halde bulunan makineler uzerindeki resourclarin pay edilmeisni saglar.  Neler var? Spark'in kendine ait bir stand-alone cluster manageri mevut. Daha sonra Hadoop'tan YARN geliyor (yet another resource negotiatior). Apache Mesos, biraz daha esnek oldugu soylenmekle beraber YARN'a bir alternatif olarak dusunulebilir.  Spark standalone Eger basit bir manager istiyor isek bunu secebiliyoruz.  --master spark://[HOST]:7077/ seklinde cluster manager'i belirtmemiz gerekiyor. hem client hem de cluster metotlarini destekliyor. default olarak spreadOut ozelligi true olarak geliyor.  spark.deploy.spreadOut=true bu da su demek. Mumkun oldugunca isi dagitmaya calisacak spark cluster manager. Yani her biri 3 core barindir...

Spark #15 : spark-submit

Resim
Bir spark job'u spark-submit ile cluster'a gonderildiginde, bir driver program olsuturulur ve onun main metodu calistirilir. spark-submit aslinda spark'in kok dizininde bulunan bin klasordunde yasayan bir scriptciktir.  Driver programi calistiran process spark-submiti calistiran makinede host edilebilecegi gibi (client mode), cluster uzerindeki bu spark job icin ayrilmis olan master node uzerinde de calisabilir (cluster mode).  Submit gerceklestikten sonra, driver program, cluster manager'den gerekli resource'lari ister. (kac cpu olacak ne kadar RAM gerekli gibi). Gerekli kaynaklar mevcut ise cluster manager bunlari ayaga kaldirmaya baslar. Daha sonra da driver program'in main metodu calismaya baslar. RDD'ler lazy bir sekilde olusturulur ta ki bir action'a denk gelene kadar. Action, o ana kadar olusturulan execurion dag'in gercekten calistirilmasini saglar. Dag execute edilip de eger geri dondurulecek bir deger var ise (mesela count gibi) driver prog...

Spark #14 : Akumulator

Resim
Meslea spark job'u esnasinda ortaya cikan hatalarin sayisini bir sekilde tutmak istiyoruz. Spark, cluster uzerindeki tum node'lardan erisilebilen, guncellenebilen ama deger okumasi sadece driver program uzerinde gerceklestirilebilen bir veri yapisi sunuyor: Accumulator.  Bir ornekle durumu aciklayalim: scala> val acc = sc.longAccumulator("benim akum") scala> val rdd = sc.parallelize(Seq(1,2,3,4,5)) scala> cc1.foreach(x => {             | println(x)             | acc.add(1)             | }) scala> acc.value res18: Long = 5 Seklinde, her adimda acc degerini 1 artiriyoruz. Normalde bakinca cok kolay bir islemmis gibi gelebilir ama unutmayin ki buradaki foreach icerisindeki anonim fonksiyon cluster uzerine dagitiliyor ve farkli makinelerde calisiyor olabilir. Bu da demek oluyor ki, acc isimli akumulatoru biz farkli makinelerden guncelliyoruz. Ve son olarak da driver programda degeri...