Spark #20 : Dataframe


Spark DataFrame, Pandas dataframes ile kullanim acisindan benzerdir. Ancak tabi ki de altyapi cok farkli, spark distributed bir yapiya sahip iken, Pandas single core kullanabilmektedir. En nihayetinde de bir database table gibi dusunulebilir. Her satir bir kaydi temsil ederken bir veya daha fazla stuna sahip olabilir.

Hemen basit bir ornek ile RDD ile farki gorelim. Diyelim ki elimizde isim - yas iceren bir buyuk veri var. Ayni isme sahip kisilerin yas ortalamasini bulmak istiyoruz.

Ornek bir data tanimlayalim, 

scala> val rddBireyler = sc.parallelize(Seq(("babur", 20), ("jamiryo", 12), ("babur", 56), ("babur", 44), ("jamiryo", 67)))
dfBireyler: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[145] at parallelize at <console>:30

Yas ortalamasi almak icin her bir isimden kac tane oldugunu da bulmamiz gerek. Bu acidan kelime sayma yaklasiminda oldugu gibi her bir kaydi 1 ile baslatiyoruz ve daha sonra bir reduce islemi uyguluyoruz.

scala> val rddBireysCounts = rddBireyler.map((isim, yas) => (isim, (yas, 1)))
scala> val rddBireysTotals = rddBireysCounts.reduceByKey((x, y) => (x._1 + y._1, x._2 + y._2))
res57: Array[(String, (Int, Int))] = Array((babur,(120,3)), (jamiryo,(79,2)))

Simdi her bir isim icin toplam yas ve toplam sayiyi bulduktan sonra, bolme islemi kaliyor geriye:

scala> val rddAvg = rddBireysTotals.map(x => (x._1, x._2._1 / x._2._2))
res59: Array[(String, Int)] = Array((babur,40), (jamiryo,39))

Bu sekilde istedigimiz sonucu elde ettik. Baya bir low level ve imprerative bir yontem. Hataya son derece acik. 

Simdi ufak bir hareket ile input rdd'yi dataframe'e cevirelim. 

scala> import spark.sqlContext
scala> import sqlContext.implicits._
scala> dfBireys = rddBireys.toDF("isim", "yas")

Ve daha sonra ortalamayi alalim.

scala> dfBireys.groupBy($"isim").agg(avg($"yas")).show

Seklinde sedece tek satir ve gayet declerative bir sekilde problemimizi cozduk. Zaten sql de dekleratif bir dil degil midiri eyyy insanlar! Ve biliyoruz ki hatta direk SQL de yazabilirdik,

scala> dfBireys.createOrReplaceTempView("Bireys")
scala> sql("SELECT isim, avg(yas) FROM Bireys GROUP BY isim").show

Bir motto der ki, BigData'yi en hizli islemenin yolu onu okumamaktir. Bu da demek oluyor ki gereksiz satir ve stunlarin diskten okunmamasi gerekir. SparkSQL bastan sona bir query icin gerekli olan stunlari bildigi icin, sadece bu stunlar diskten okunabilir. Ozellikle parquet gibi columnar bir dosyadan okunuyorsa ciddi performans getirisi saglanabilir. Ama bu parquet ~ spark ikilisine baska bir postta deginecegim. 

Simdilik bu kadar, gerisini sparksql dokumantasyonunda var. 

To be dewamked...

Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

Threat Modeling 1

Encoding / Decoding