Kayıtlar

bigdata etiketine sahip yayınlar gösteriliyor

Data Patterns #1 - Data Summarization Pattern

Resim
Problem Verilen bir DNA dizesinde (mesela "AATGC..."), her bir bazin (A, C, G, T) kac kere gectigini bulunuz. Ornek dna dizilim dosyasi: https://ftp.ncbi.nlm.nih.gov/genomes/INFLUENZA/influenza.fna (1.3 Gb) Cozum Yani aslinda kabaca, verilen text icerisinde gecen distinct karakterleri saymamizi istiyor. Cok basit (kolay demedik) ve temel bir problem.  Ornek bir veri dosyasi icin https://ftp.ncbi.nlm.nih.gov/genomes/INFLUENZA/influenza.fna (1.3 Gb text dosyasi) Input sekline bakacak olursak: >gi|58576|gb|X52226|Influenza A virus (A/FPV/Rostock/34(H7N1)) gene for neuraminidase, genomic RNA AGCAAAAGCAGGAGTTCAAAATGAATCCAAATCAGAAAATAATAACCATTGGGTCAATCTGTATGGGGAT CGGAATAATCAGCCTAATATTACAAATTGGAAACATAATCTCAATGTGGGTTAGTCATTCAATTCAGACT GAAAATCAAAATCACCATGAAGCATGCAACCCAAGCATTGCTGGACAGGATGCAGCTTCAGTGGCACTAG CAGGCAATTCCTCTCTTTGTCCCATTAGTGGGTGGGCTATATACAGTAAAGACAATGGTATAAGAATTGG .... En basta, buyuktur isareti ile baslayan bir yorum satiri var ki bu satiri gozardi etmemiz gerekiyor. Ge

Hadoop #1: Giris

Resim
Hadoop distributed computing frameworkune genel olarak bir goz atacagiz. Neler var? - HDFS - MapReduce -Yarn Big data da bana mi big? Facbook'un toplamda 300 Petabyte data store ettigi ve gunde 600 terabyte datayi isledigi soyleniyor. Her ay Facook'a 1 milyar kisi log-in oluyor. Her gun 2.7 milyar like veriliyor. Her gun 300 milyon resim Facebook'a yukleniyor.  NSA toplamda 5 exabyte data sakladigi soyleniyor. Gunde 30 petabyte data isledigi ve internet trafiginin %1.6 sini takip ettigi bunlara web aralamalari , ziyaret edilen siteler, telefon konusmalari, kredi karti ve hesap hareketleri finansal ve saglik bilgileri de dahil.  Babalarin babasi Goole ise toplamda 15 exabyte veri barindirdigi soyleniyor. Her gun 100 petabyte datayi isliyor. 60 trilyon web sayfasini indexinde barindiriyor. Her ay toplamda 1 milyardan fazla arama yapiliyor. Her saniye 2.3 milyon arama demek oluyor.  Bigdata sistem gereksinimleri - Storerage : Bigdata sistemlerinde cok fazla data olur (surpris

Spark Dataset ile typed bir gelecek

Resim
Hersey RDD ile basliyor. Kendisi type-safe. Ama tum execution dag bir kara kutu gibi, optimizasyon yazana kalmis durumda, low level bir API. Daha sonra Dataframe geliyor. Tum execution dag optimize edilebiliyor. Sql'e yakin ve bu durumda type safety kaybediliyor, hersey Row adi verilen jenerik bir class uzerinden donuyor.  Son olarak da Sql optimizasyonlari ve typesafety birleserek Dataset'i olusturuyor.  Hatta aslinda Spark 2.0'dan sonra Dataframe ve Dataset birlestirilmis durumda. Yani Dataframe, Dataset'in ozel bir hali gibi. Yani aslinda Dataframe = Dataset[Row] . Dataset Actions Dataframe uzerinde tanimlanan collect, take, foreach gibi action metodlari Row nesnesi collection'u uzerinden calisiyor. Ornek olarak bir df uzerinde collect yaparsak geriye Row listesi donecektir. Bunu da tekrardan parse edip kullanmamiz gerekir. Oyse ki Dataset, olusturuldugu class ne ise Dataset[BizimClass] o class tipinen bir kolleksion dondurecektir. Bu durumda tekrar parse etmeye

Spark Basit Join

Resim
En buyuk problemlerden bir tanesi. Ozellikle performans konusunda ciddi bottleneck olusturabiliyor. Yapabilecegimiz en iyi sey, Catalyst Reyizi en iyi sekilde kullanip, maksimum optimizasyon saglayabilmek. Soyle uc bir ornek verelim: Iki tabloyu join etmek icin bir join kriterine ihtiyacimiz var oyle degil mi? Bu bir esitlik olabilir, bir ID uzerinden join etmek icin. Bu esitlik operatorunu bir UDF'e kaydirabiliriz, hayet legit.  scala> val esitUDF = udf((id1: Long, id2: Long) => id1 == id2) scala> val joinedDF = df1.join(df2, esitUDF($"id1", $"id2")) Ancak buradaki UDF bir blackbox, yani catalyst buradaki imperative kod hakkinda bir optimizasyon gerceklestiremedigi gibi, bunu diger execution plan'daki adimlarla da kaynastiramiyor. Bu durumda join islemi bir kartezyen karsilastirma yapmak durumunda kaliyor ve sonuc olarak n2 complexity elde etmis oluyoruz. Ancak biz bu karsilastirmayi efendi gibi SparkSQL ile yazsa idik, scala> val joinedDF = df

Spark #20 : Dataframe

Resim
Spark DataFrame, Pandas dataframes ile kullanim acisindan benzerdir. Ancak tabi ki de altyapi cok farkli, spark distributed bir yapiya sahip iken, Pandas single core kullanabilmektedir. En nihayetinde de bir database table gibi dusunulebilir. Her satir bir kaydi temsil ederken bir veya daha fazla stuna sahip olabilir. Hemen basit bir ornek ile RDD ile farki gorelim. Diyelim ki elimizde isim - yas iceren bir buyuk veri var. Ayni isme sahip kisilerin yas ortalamasini bulmak istiyoruz. Ornek bir data tanimlayalim,  scala> val rddBireyler = sc.parallelize(Seq(("babur", 20), ("jamiryo", 12), ("babur", 56), ("babur", 44), ("jamiryo", 67))) dfBireyler: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[145] at parallelize at <console>:30 Yas ortalamasi almak icin her bir isimden kac tane oldugunu da bulmamiz gerek. Bu acidan kelime sayma yaklasiminda oldugu gibi her bir kaydi 1 ile baslatiyoruz ve daha sonra bir reduce i

Spark #19 : Closure mevzusu

Resim
Kafa karistiran konulardan birtanesi de spark icerisinde closure kullanimidir. Closure, en azindan bir adet bound variable iceren standalone fonksyonlara denir. Bu ne demek? Bir ornek, scala> var count = 0 cc: Int = 0 scala> val list = 0 to 10 list: scala.collection.immutable.Range.Inclusive = Range 0 to 10 scala> list.foreach(t => {      | count += 1      | println(s"sayac: $count")      | }) Buradaki sondaki foreach icerisinde bir fonksiyon tanimlamasi var. Ve bu tanimlama bir ustteki scope'da bulunan count degiskenine bir referans iceriyor. Bu fonksiyon bir Java nesnesi ve bu tanimlamanin yapildigi metot sonlandiktan sonra dahi calistirilma ihtimali var. Bu durumda nasil count degiskenine erisecek? Cevap, closure. Isim de oradan geliyor zaten, icteki fonksiyon, count degiskeni uzerine kapaniyor ve bir nevi sarmaliyor. Bu sayede count degiskeninin tanimlanmis oldugu metot sonlanmis dahi olsa, bu degisken closure icerisinde yasamaya devam ediyor.  Ciktiya ba

Spark #18 : Spark SQL

Resim
Spark ozelliklerini SQL'e benzeyen bir dil ile kullanabilmeyi saglar. Tum sql standardini suan desteklemese de ileride hedef olarak SQL92 standardina uygun olabilmek belirlenmis durumda. Bir ornek: benimData.createOrReplaceTable("sparkTable") sqlContext.sql("SELECT * FROM sparkTable WHERE baziStun == 'bazi deger'") Databricks'teki arkadaslar, spark'in paralel islem gucunu "Big Data" muhendislerinden baska insanlarin da kullanmasini arzu ettiklerini ve SQL adaptasyonuna bu acidan onem verdiklerini soylemisler .  Burada esas amac SQL standardindan da ziyade bir soyutlama katmani yaratmak ve kullaniciya declerative kod yazma imkani saglamak. RDD'ler ile ugrasirken bircok detaya ve islemlerin nasil yapilmasi gerektigine kafa yormak gerekiyor. Ancak SQL ile islerin `nasil` yapilacagini degil de sadece `ne` yapilmasi gerektigini tarif ediyoruz. Gerisi spark tarafindan optimize ediliyor. SparkSQL'e bir alternatif ise Hive'dir. Kisa

Spark #17 : Spark UI

Resim
Bana gore spark ile verimli joblar yazmanin anahtari problemleri iyi analiz etmek ve gerekli optimizasyonlari yapabilmek. Olcemedigin bir seyi iyilestiremezsin derler. Tam bu noktada SparkUI cok degerli bilgiler sunarak calismakta olan Spark job'unu derinlemesine analiz etmemize olanak sagliyor.  Herhangi bir spark driver program ayaga kalktiginda, basit bit web server calistirarak mevcut program hakinda bilgiler verecektir. Test etmek icin spark-shell'i acip, localhost:4040 adresine gidiyoruz.  Jobs Default aktif tab jobs tabidir. Her bir action, yeni bir spark job anlamina gelir. Cunku daha once de bahsetmistik, her bir action, tum execution dag'inin basta calistirilmasini gerektirir. Bu yuzden listelenen job'lar uzerinde de aciklama olarak cagrilan action ve hangi satirda oldugu gorulebilir.  Tekrar spark-shell'e donup, bir RDD olsuturp bir action calistiralim: > val nums = sc.parallelize(1 to 1000000) > nums.count Bunu yaptiktan sonra SparkUI sekmesinde re

Spark #16 : Cluster Manager

Resim
Nedir? Cluster manager bir dagitik kernel olarak dusunulebilir. Nasil ki esas kernel tek bir makinede calisan process'leri yonetiyor ve ihtiyaclari olan resource'lari pay ediyor ise, cluster manager de tek bir makineden (daha dogrusu gek bir noktadan) dagitik halde bulunan makineler uzerindeki resourclarin pay edilmeisni saglar.  Neler var? Spark'in kendine ait bir stand-alone cluster manageri mevut. Daha sonra Hadoop'tan YARN geliyor (yet another resource negotiatior). Apache Mesos, biraz daha esnek oldugu soylenmekle beraber YARN'a bir alternatif olarak dusunulebilir.  Spark standalone Eger basit bir manager istiyor isek bunu secebiliyoruz.  --master spark://[HOST]:7077/ seklinde cluster manager'i belirtmemiz gerekiyor. hem client hem de cluster metotlarini destekliyor. default olarak spreadOut ozelligi true olarak geliyor.  spark.deploy.spreadOut=true bu da su demek. Mumkun oldugunca isi dagitmaya calisacak spark cluster manager. Yani her biri 3 core barindir

Spark #15 : spark-submit

Resim
Bir spark job'u spark-submit ile cluster'a gonderildiginde, bir driver program olsuturulur ve onun main metodu calistirilir. spark-submit aslinda spark'in kok dizininde bulunan bin klasordunde yasayan bir scriptciktir.  Driver programi calistiran process spark-submiti calistiran makinede host edilebilecegi gibi (client mode), cluster uzerindeki bu spark job icin ayrilmis olan master node uzerinde de calisabilir (cluster mode).  Submit gerceklestikten sonra, driver program, cluster manager'den gerekli resource'lari ister. (kac cpu olacak ne kadar RAM gerekli gibi). Gerekli kaynaklar mevcut ise cluster manager bunlari ayaga kaldirmaya baslar. Daha sonra da driver program'in main metodu calismaya baslar. RDD'ler lazy bir sekilde olusturulur ta ki bir action'a denk gelene kadar. Action, o ana kadar olusturulan execurion dag'in gercekten calistirilmasini saglar. Dag execute edilip de eger geri dondurulecek bir deger var ise (mesela count gibi) driver prog

Spark #14 : Akumulator

Resim
Meslea spark job'u esnasinda ortaya cikan hatalarin sayisini bir sekilde tutmak istiyoruz. Spark, cluster uzerindeki tum node'lardan erisilebilen, guncellenebilen ama deger okumasi sadece driver program uzerinde gerceklestirilebilen bir veri yapisi sunuyor: Accumulator.  Bir ornekle durumu aciklayalim: scala> val acc = sc.longAccumulator("benim akum") scala> val rdd = sc.parallelize(Seq(1,2,3,4,5)) scala> cc1.foreach(x => {             | println(x)             | acc.add(1)             | }) scala> acc.value res18: Long = 5 Seklinde, her adimda acc degerini 1 artiriyoruz. Normalde bakinca cok kolay bir islemmis gibi gelebilir ama unutmayin ki buradaki foreach icerisindeki anonim fonksiyon cluster uzerine dagitiliyor ve farkli makinelerde calisiyor olabilir. Bu da demek oluyor ki, acc isimli akumulatoru biz farkli makinelerden guncelliyoruz. Ve son olarak da driver programda degerini okuyoruz. Neresinden bakarsan cok kral bir ozellik bu kadar guzel impleme

Spark #13 : Keşleme

Resim
Spark'in en buyuk avantajlarindan birisi olan ara datalari RAM'de saklamak ve Hadoop'a karsi 100x'e kadar bir performans farki saglamak.  Diyelim ki soyle bir RDD'im var ve bu RDD uzerinde maliyetli islemler yapiyorum. Bunu simule edebilmek icin Thread.sleep kullancagiz.  s cala> val cc = sc.parallelize(Seq(1,2,3,4,5)) scala> val cc1 = cc.map(x => {          | Thread.sleep(10000)          | x*x          | }) Bu asamada cc1.collect calistirirsaniz yaklasik 10 saniye surecektir (eger enaz 5 core'unuz var ise). Simdi de fazla maliyetli olmayan baska bir transformasyon tanimlayalim ve drawdan bir action calistiralim ki dag execute edilsin: scala> cc1.map(x=>x+1).collect Simdi bunun da 10 saniye civari surdugunu gormekteyiz. Neden? Cunku her bir action, tum execution planin tekrar calistirilmasini gerektirir. Eger ki cache'leme yapilmamis ise. Bu durumda cc1 RDD'sine cache ekliyorum.  scala> cc1.cache() Ve cache'in calisabilmesi icin cc1

Spark #12 : Kay-value metodlari

Resim
Evet pekcok transformasyonu gorduk, bircok aksyionu tanidik. Ancak bazi durumlarda islemleri optimize etmek icin kay-value pair'lerinden faydalanabiliyoruz.  Pair RDD'ler uzerinde uygulanabilecek ekstra metotlar su sekildedir: collectAsMap Normal siradan collect gibi ama key-val'lerden bir map (dictionary) olusturuyor. mapValues / flatMapValues sadece value tarafindan islem goren map fonksyonu. reduceByKey Ayni key'e sahip elemanlar uzerinde bir reduce operasyonu calistiriliyor. Taa ilk bastaki word count orneginde bu metot ile ayni kelimeden metin icerisinde kac tane gectigini hesaplamistik. Tekrardan bir ornek yaparsak, Ayni key'e sahip olan elemanlar tek bir partition'a sigmiyor olabilir. Hatta ayni makine uzerinde bile olmayabilir. Bu durumda bir all-to-all operasyon ile data shuffle edilir ve ayni key'e sahip elemanlar tum partitionlar'dan okunarak reduce edilir.  foldByKey / aggregateByKey fold ve aggregate metotlari gibi olsa da key ile gruplanm