Kayıtlar

Spark 9 : Transformations-2

 Baska bir transformations kosesinde daha birlikteyiz.  mapPartitions Bizim datamiz node'lar uzerinde cluster'da dagitik durumdadir. Bunun uzerinde bir map islemi yapar isek, her bir node, kendi uzerindeki data icin her bir elemana verilen transformasyonu uygulayacaktir. Ama mesela bir database baglantisi yapilmasi gerekiyor ise bu durumda her bir item icin yeni bir database baglantisi gerekecektir. Bu da performans acisindan problem dogurur. Ama mapPartitions kullanarak, verilen bir transformasyona o partition'daki tun datayi bir seferde verebiliriz.  Cok uzattim ama boylelikle her bir partition icin sadece tek bir database baglantisi olusturma imkani yakalariz.  > sc.parallelize(1 to 100) res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25 > scala> res0.partitions.size res1: Int = 6 scala> res0.mapPartitions(partition => {      // partition oncesi hazirlik      partition.ma...

Spark 8 : Transformations

Resim
Ya hocam birak, var mi biseyler? Datayi yuklediysek transform edelim az biraz. Bir metodun transformer olup olmadigini anlamanin bir cakal yolu da signature'une bakmaktir. Eger o da bir RDD donduruyorsa, lazy bir sekilde, orjinal RDD uzerine uygulanacak islemlerin planini olusturan bir transformer metottan baskasi degildir.  Tatavami su sekilde birakarak olaya giriyorum. Map Adi uzerinde, belli bir degere karsilik baska bir deger "map" ediyoruz. Yani bir listeyi alip her bir eleman uzerinde bir islem uygulayarak (evet layarak) yeni bir liste olusturuyoruz.  > sc.parallelize(1 to 100).map(_ *2)  dersem anlasilir herhalde.  FlatMap Bu da oncekinin daha flat hali zheuehhas. Tamam tamam. Bu metod verilen transformasyonu once tum elemanlara uygular sonra da ortaya cikan elemanlarin tamamini birlestirip flat bir list yapar. Yani, > sc.textFile("file:///C:/deneme.txt").flatMap(satir => satir.split(" ")) seklinde yaparsak, oncelike bir text dosyasi ok...

Spark #7 : Data okuma

Resim
Yok be olum, bu sekilde degil. Datayi arap faikten aliyorduk Spark bircok farkli mecradan data okuyabilmektedir. Normal file systems (lokal makine veya uzak makine), Amazon S3 ve HDFS gibi distributed file systems, database, distributed database ve hatta direk olarak memory'den de data yuklenebilmektedir. Spark, datanin  cigerini sokmektedir. Ayrid dosya formati olarak da, text, csv gibi plain formatlar oldugu gibi, parquet (dassagini yedigimiz) ve avro gibi de daha gelismis formatlar okunabilir.   Tatavamizi kenara birakip shell'imizi aciyoruz: > sc.parallelize(1 to 100) > res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25 Goruldugu gibi 1'den 100'e kadar olan sayilari barindiran bir listeyi RDD icerisine yukledik. Burada bir kucuk detay da spark-shell bu olusturulan RDD'yi biz bir degiskene atamasak da, res0 adli degiskene atadi. Buna daha sonra atifta bulunabiliriz: > res0.collect > res1: Array[Int] = A...

Spark #6 : RDD nedir?

Resim
clusterda datalar yeniden dagitiliyor (shuffle) Resilient Distributed Dataset Bir clusterda node'lar uzerinde dagitilmis halde bulunan ve uzerinde paralelde islemler yapilabilen bir koleksiyondur. Spark dokumantasyonu boyle tanimliyor RDD'yi. Yani aslinda bir program yazarken normal , tek bir makinenin hafizasinda bulunan bir kolleksiyon gibi islemler yapabiliyoruz (mesele .map) ama aslinda bu veriyapisi birden fazla makinenin hafizasinda dagitik olarak bulunuyor. Bir parcasi orada bir parcasi burada falan gibi. Hatta parcalarda replikasyon var ki fault tolerant olsun. Iste bur bir soyutlama aslinda. Spark api'si tasarlanirken bu dagitik kolleksiyon uzerinde islem yapmayi sanki normal bir kolleksiyonmus gibi developer'dan sakliyor. Ama burada da bir rist var. Bazi yapilan islemler tum node'lardaki datanin birbiriyle senkronize edilmesini ya da yeniden dagitilmasini (shuffle) gerektiriyor. Eger bunlara dikkat etmezsek, cok ciddi performans kayiplari yasanabiliyor.  T...

Spark #5 : Spark uygulamasi

Resim
 Konsole guzel, konsol pratik, interaktif. Ben de ad-hoc queryler icin kullaniyorum ya da experimentation amacli. Ama misal ETL joblari yazmak icin girdili ciktili (I/O) uygulama yazip derleyip ortaya koymamiz lazim.  Tum spark uygulamari driver program adi verilen tek bir noktadan yonetilir. Driver, gerekli islemleri diger makinelere dagitmak gibi bir gorevi vardir. Bizim meshur sparkContext de bu driver program icerisinde calismaktadir. Driverin gorevleri arasinda: 1. Task'larin olusturulmasi: Execution planin olusturulmasi ve tasklarin worker node'lara gonderilmesi 2. Schedule edilmesi (ya da ettirilmesi) 3. Data locality'nin gozetilmesi 4. Fault tolerance:  Buradaki Cluster Manager spark'in kendi cluster manageri olabilecegi gibi YARN veya Mesos da olabilir. Ben genelde YARN ile calisildigini gordum. Bu cluster manager sayesinde bizim driver program gerekli resourcelari cluster'dan talep eder. Driver program, cluster manager'e baglandiktan sonra clusterda bu...

Airflow ile remote directory mount etme

 Uzak bir makinede dockerize olarak calisan bir airflow instance'im var. Bu airflow instance'i da baska bir remote machine'deki bir directory'e erisip, okuyup yazmasi icab ediyor. Aslinda gecen bir postta bahsetmistim. Dosyalari kopyalamak cok zahmetli onun icin mount etmemiz daha mantikli.  Esasinda isin ozu su ki, eger uzak makinedeki directory zaten mount edilmisse bisey yapma. Eger degilse bunu mount et. Burada 3rd party bir uygulama olan sshf kullandim ki onceki bir yazida bahsetmistim. Bunu kurmak kolay, ve de parolayi da bir cakallik ile buna aktarabiliyoruz.  Esas olay bir bash script'ini uzak makineye gonderip orada calistirmak: bash -c "if mountpoint -q -- '{lokaldeki_mount_noktasi}'; then \     printf 'maprfs is already mounted'; \ else \     echo '{password}' | sshfs -o password_stdin -o allow_other kullanici@uzak_makine:klasor_to_mount { lokaldeki_mount_noktasi }; fi" Bu komutu SSHOperator ile host makineye gonderiyorum...

Spark #4 : Hello world

Resim
Bu devirde basit dusunebilmek zor is * Simdi spark-shell ile interactive repl'a gectigimiz zaman, bizim icin 2 sey olusturuluyor: - Spark context available as 'sc' (master = local[*], app id = local-1606910359074). - Spark session available as 'spark'. Buradaki spark bir SparkSession instance'i olup daha cok SparkSql tarafindan kullanilmaktadir.  sc ise SparkContext. Tum spark uygulamalari bir spark context olusturularak baslatilir. baska turlu olmaz. spark uygulamalari aslinda dagitik uygulamalardir ve bu dagitik uygulamayi manage edebilmek icin bir entry point olmasi lazim. yani bize bir muhattap lazim ki getir goturu yapsin.  Ilk olarak bir text dosyasi okutarak baslayalim, val textFile = sc.textFile("file:///spark/README.md") burada kolaylik olsun diye spark kurulumu yapilan dir'de spark-shell calistirdim ve halihazirda bulunan README.md dosyasini okutuyorum.  Bunu calistirir isek, bize direkman bir RDD olusturdugunu (resillient distributed dat...