Kayıtlar

Aralık, 2020 tarihine ait yayınlar gösteriliyor

Spark #20 : Dataframe

Resim
Spark DataFrame, Pandas dataframes ile kullanim acisindan benzerdir. Ancak tabi ki de altyapi cok farkli, spark distributed bir yapiya sahip iken, Pandas single core kullanabilmektedir. En nihayetinde de bir database table gibi dusunulebilir. Her satir bir kaydi temsil ederken bir veya daha fazla stuna sahip olabilir. Hemen basit bir ornek ile RDD ile farki gorelim. Diyelim ki elimizde isim - yas iceren bir buyuk veri var. Ayni isme sahip kisilerin yas ortalamasini bulmak istiyoruz. Ornek bir data tanimlayalim,  scala> val rddBireyler = sc.parallelize(Seq(("babur", 20), ("jamiryo", 12), ("babur", 56), ("babur", 44), ("jamiryo", 67))) dfBireyler: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[145] at parallelize at <console>:30 Yas ortalamasi almak icin her bir isimden kac tane oldugunu da bulmamiz gerek. Bu acidan kelime sayma yaklasiminda oldugu gibi her bir kaydi 1 ile baslatiyoruz ve daha sonra bir reduce i

Spark #19 : Closure mevzusu

Resim
Kafa karistiran konulardan birtanesi de spark icerisinde closure kullanimidir. Closure, en azindan bir adet bound variable iceren standalone fonksyonlara denir. Bu ne demek? Bir ornek, scala> var count = 0 cc: Int = 0 scala> val list = 0 to 10 list: scala.collection.immutable.Range.Inclusive = Range 0 to 10 scala> list.foreach(t => {      | count += 1      | println(s"sayac: $count")      | }) Buradaki sondaki foreach icerisinde bir fonksiyon tanimlamasi var. Ve bu tanimlama bir ustteki scope'da bulunan count degiskenine bir referans iceriyor. Bu fonksiyon bir Java nesnesi ve bu tanimlamanin yapildigi metot sonlandiktan sonra dahi calistirilma ihtimali var. Bu durumda nasil count degiskenine erisecek? Cevap, closure. Isim de oradan geliyor zaten, icteki fonksiyon, count degiskeni uzerine kapaniyor ve bir nevi sarmaliyor. Bu sayede count degiskeninin tanimlanmis oldugu metot sonlanmis dahi olsa, bu degisken closure icerisinde yasamaya devam ediyor.  Ciktiya ba

Spark #18 : Spark SQL

Resim
Spark ozelliklerini SQL'e benzeyen bir dil ile kullanabilmeyi saglar. Tum sql standardini suan desteklemese de ileride hedef olarak SQL92 standardina uygun olabilmek belirlenmis durumda. Bir ornek: benimData.createOrReplaceTable("sparkTable") sqlContext.sql("SELECT * FROM sparkTable WHERE baziStun == 'bazi deger'") Databricks'teki arkadaslar, spark'in paralel islem gucunu "Big Data" muhendislerinden baska insanlarin da kullanmasini arzu ettiklerini ve SQL adaptasyonuna bu acidan onem verdiklerini soylemisler .  Burada esas amac SQL standardindan da ziyade bir soyutlama katmani yaratmak ve kullaniciya declerative kod yazma imkani saglamak. RDD'ler ile ugrasirken bircok detaya ve islemlerin nasil yapilmasi gerektigine kafa yormak gerekiyor. Ancak SQL ile islerin `nasil` yapilacagini degil de sadece `ne` yapilmasi gerektigini tarif ediyoruz. Gerisi spark tarafindan optimize ediliyor. SparkSQL'e bir alternatif ise Hive'dir. Kisa

Spark #17 : Spark UI

Resim
Bana gore spark ile verimli joblar yazmanin anahtari problemleri iyi analiz etmek ve gerekli optimizasyonlari yapabilmek. Olcemedigin bir seyi iyilestiremezsin derler. Tam bu noktada SparkUI cok degerli bilgiler sunarak calismakta olan Spark job'unu derinlemesine analiz etmemize olanak sagliyor.  Herhangi bir spark driver program ayaga kalktiginda, basit bit web server calistirarak mevcut program hakinda bilgiler verecektir. Test etmek icin spark-shell'i acip, localhost:4040 adresine gidiyoruz.  Jobs Default aktif tab jobs tabidir. Her bir action, yeni bir spark job anlamina gelir. Cunku daha once de bahsetmistik, her bir action, tum execution dag'inin basta calistirilmasini gerektirir. Bu yuzden listelenen job'lar uzerinde de aciklama olarak cagrilan action ve hangi satirda oldugu gorulebilir.  Tekrar spark-shell'e donup, bir RDD olsuturp bir action calistiralim: > val nums = sc.parallelize(1 to 1000000) > nums.count Bunu yaptiktan sonra SparkUI sekmesinde re

Spark #16 : Cluster Manager

Resim
Nedir? Cluster manager bir dagitik kernel olarak dusunulebilir. Nasil ki esas kernel tek bir makinede calisan process'leri yonetiyor ve ihtiyaclari olan resource'lari pay ediyor ise, cluster manager de tek bir makineden (daha dogrusu gek bir noktadan) dagitik halde bulunan makineler uzerindeki resourclarin pay edilmeisni saglar.  Neler var? Spark'in kendine ait bir stand-alone cluster manageri mevut. Daha sonra Hadoop'tan YARN geliyor (yet another resource negotiatior). Apache Mesos, biraz daha esnek oldugu soylenmekle beraber YARN'a bir alternatif olarak dusunulebilir.  Spark standalone Eger basit bir manager istiyor isek bunu secebiliyoruz.  --master spark://[HOST]:7077/ seklinde cluster manager'i belirtmemiz gerekiyor. hem client hem de cluster metotlarini destekliyor. default olarak spreadOut ozelligi true olarak geliyor.  spark.deploy.spreadOut=true bu da su demek. Mumkun oldugunca isi dagitmaya calisacak spark cluster manager. Yani her biri 3 core barindir

Spark #15 : spark-submit

Resim
Bir spark job'u spark-submit ile cluster'a gonderildiginde, bir driver program olsuturulur ve onun main metodu calistirilir. spark-submit aslinda spark'in kok dizininde bulunan bin klasordunde yasayan bir scriptciktir.  Driver programi calistiran process spark-submiti calistiran makinede host edilebilecegi gibi (client mode), cluster uzerindeki bu spark job icin ayrilmis olan master node uzerinde de calisabilir (cluster mode).  Submit gerceklestikten sonra, driver program, cluster manager'den gerekli resource'lari ister. (kac cpu olacak ne kadar RAM gerekli gibi). Gerekli kaynaklar mevcut ise cluster manager bunlari ayaga kaldirmaya baslar. Daha sonra da driver program'in main metodu calismaya baslar. RDD'ler lazy bir sekilde olusturulur ta ki bir action'a denk gelene kadar. Action, o ana kadar olusturulan execurion dag'in gercekten calistirilmasini saglar. Dag execute edilip de eger geri dondurulecek bir deger var ise (mesela count gibi) driver prog

Spark #14 : Akumulator

Resim
Meslea spark job'u esnasinda ortaya cikan hatalarin sayisini bir sekilde tutmak istiyoruz. Spark, cluster uzerindeki tum node'lardan erisilebilen, guncellenebilen ama deger okumasi sadece driver program uzerinde gerceklestirilebilen bir veri yapisi sunuyor: Accumulator.  Bir ornekle durumu aciklayalim: scala> val acc = sc.longAccumulator("benim akum") scala> val rdd = sc.parallelize(Seq(1,2,3,4,5)) scala> cc1.foreach(x => {             | println(x)             | acc.add(1)             | }) scala> acc.value res18: Long = 5 Seklinde, her adimda acc degerini 1 artiriyoruz. Normalde bakinca cok kolay bir islemmis gibi gelebilir ama unutmayin ki buradaki foreach icerisindeki anonim fonksiyon cluster uzerine dagitiliyor ve farkli makinelerde calisiyor olabilir. Bu da demek oluyor ki, acc isimli akumulatoru biz farkli makinelerden guncelliyoruz. Ve son olarak da driver programda degerini okuyoruz. Neresinden bakarsan cok kral bir ozellik bu kadar guzel impleme

Spark #13 : Keşleme

Resim
Spark'in en buyuk avantajlarindan birisi olan ara datalari RAM'de saklamak ve Hadoop'a karsi 100x'e kadar bir performans farki saglamak.  Diyelim ki soyle bir RDD'im var ve bu RDD uzerinde maliyetli islemler yapiyorum. Bunu simule edebilmek icin Thread.sleep kullancagiz.  s cala> val cc = sc.parallelize(Seq(1,2,3,4,5)) scala> val cc1 = cc.map(x => {          | Thread.sleep(10000)          | x*x          | }) Bu asamada cc1.collect calistirirsaniz yaklasik 10 saniye surecektir (eger enaz 5 core'unuz var ise). Simdi de fazla maliyetli olmayan baska bir transformasyon tanimlayalim ve drawdan bir action calistiralim ki dag execute edilsin: scala> cc1.map(x=>x+1).collect Simdi bunun da 10 saniye civari surdugunu gormekteyiz. Neden? Cunku her bir action, tum execution planin tekrar calistirilmasini gerektirir. Eger ki cache'leme yapilmamis ise. Bu durumda cc1 RDD'sine cache ekliyorum.  scala> cc1.cache() Ve cache'in calisabilmesi icin cc1

Spark #12 : Kay-value metodlari

Resim
Evet pekcok transformasyonu gorduk, bircok aksyionu tanidik. Ancak bazi durumlarda islemleri optimize etmek icin kay-value pair'lerinden faydalanabiliyoruz.  Pair RDD'ler uzerinde uygulanabilecek ekstra metotlar su sekildedir: collectAsMap Normal siradan collect gibi ama key-val'lerden bir map (dictionary) olusturuyor. mapValues / flatMapValues sadece value tarafindan islem goren map fonksyonu. reduceByKey Ayni key'e sahip elemanlar uzerinde bir reduce operasyonu calistiriliyor. Taa ilk bastaki word count orneginde bu metot ile ayni kelimeden metin icerisinde kac tane gectigini hesaplamistik. Tekrardan bir ornek yaparsak, Ayni key'e sahip olan elemanlar tek bir partition'a sigmiyor olabilir. Hatta ayni makine uzerinde bile olmayabilir. Bu durumda bir all-to-all operasyon ile data shuffle edilir ve ayni key'e sahip elemanlar tum partitionlar'dan okunarak reduce edilir.  foldByKey / aggregateByKey fold ve aggregate metotlari gibi olsa da key ile gruplanm

Spark #11 : Örtük dönüştürmeler

Resim
Bugun Scala implicit conversions'dan bahsediyoruz. Mesela ki, bir sekilde, iki integer toplamak istedigimde soyle bisey yapmak istiyorum: scala> 1.plus(1) <console>:24: error: value plus is not a member of Int        1.plus(1)       ^ Tabi boyle bisey yok. O zaman bir case class tanimliyorum: scala> case class IntExtensions(value: Int){          |     def plus(operand: Int) = value + operand          | } defined class IntExtensions ve daha sonra: scala> IntExtensions(1).plus(1) res8: Int = 2 Bu sekilde calistirabilirim. Ama bu cok cirkin ve cok uzun. Her seferinde IntExtensions class'ini kullanmak istemiyorum. O zaman scala'nin implicit conversions guzelligini sahneye davet ediyorum: scala> import scala.language.implicitConversions import scala.language.implicitConversions ve daha sonra implicit ve fonskyion tanimliyorum ki her bir integer'e, IntExtensions muamelesi yapabileyim: scala> implicit def intToIntExtensions(value: Int) = {         | IntExt

Paper kosesi #1 : RDD Paper

Resim
Daha once spark paper 'den bahsetmistik. Ama daha pratik ve saniyorum ki daha az tatava icermesi ihtimaline binaen bu paper ile okumalara basliyoruz.  Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing Ozet gec chip (abstract) RDD, programcilara cluster uzerinde datigit olarak in-memory islem yapabilmeyi saglana bir soyutlama katmanidir. Bu yapiyi olusturmaktaki esas motivasyon, suanki uygulamalarin (Hadoop'u kastediyor) inefficient olarak handle ettigi iteratif uygulamalar ve interactive data-mining . Cunku iki durum icin de, datayi hafizada tutmak (Hadoop, bunun aksine her step'i diske yaziyordu), performansi kat be kat artirmaktadir.   Fault tolerance'i elde edebilmek icin, RDD'ler cok detayli state guncellemeleri yerine, kaba (coarse grained) transformasyonlara dayanan cok kisitli shared-memory sunar. (dediklerin iyi gibi ama cok da anlamadik). Buna ragmen, RDD'lerin destekledigi transformasyonlar cok genis yelpaza

Spark 10 : Action

Resim
Macera, aksiyon ve tansiyon... hepsi bu postta.       T ransformasyonlarin aksine, aksiyonlar lazy degildir. Bir aksyion cagrildigi an, tum hesaplamalar yapilir ve sonuc node'lardan driver programa aktarilir. Eger bir metot RDD dondurmuyor ise, aksyondur. Mesela count, integer dondurur.  Bazi aksyionlar onceliler node'larda her bir partition uzerinde calistirilarak daha sonra driver'a yonlenirler. Bu sayede mumkun oldugunca shuffle edilecek data azaltilmis olur. Mesela count'ta, oncelikler her bir partition icin count hesaplanarak sadece bu int degeri shuffle edilir. Yoksa tum data shuffle edilip driver uzerinde bunlarin count'ini almak en hafif tabirle lazlik olacaktir.  Collect En bodoslama action bu olsa gerek. Diyor ki tum distributed datayi topla bana (driver program) getir. Eger burada cok fazla data varsa driver program out of memory diye bagirarak patlatacaktir. Bir dataset'in distributed olarak tutulmasinda bir sebep olsa gerek degil mi? mesele tek bir

Spark 9 : Transformations-2

 Baska bir transformations kosesinde daha birlikteyiz.  mapPartitions Bizim datamiz node'lar uzerinde cluster'da dagitik durumdadir. Bunun uzerinde bir map islemi yapar isek, her bir node, kendi uzerindeki data icin her bir elemana verilen transformasyonu uygulayacaktir. Ama mesela bir database baglantisi yapilmasi gerekiyor ise bu durumda her bir item icin yeni bir database baglantisi gerekecektir. Bu da performans acisindan problem dogurur. Ama mapPartitions kullanarak, verilen bir transformasyona o partition'daki tun datayi bir seferde verebiliriz.  Cok uzattim ama boylelikle her bir partition icin sadece tek bir database baglantisi olusturma imkani yakalariz.  > sc.parallelize(1 to 100) res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25 > scala> res0.partitions.size res1: Int = 6 scala> res0.mapPartitions(partition => {      // partition oncesi hazirlik      partition.map(item => /// biseyler yap)     //