Spark Dataset ile typed bir gelecek

Hersey RDD ile basliyor. Kendisi type-safe. Ama tum execution dag bir kara kutu gibi, optimizasyon yazana kalmis durumda, low level bir API.

Daha sonra Dataframe geliyor. Tum execution dag optimize edilebiliyor. Sql'e yakin ve bu durumda type safety kaybediliyor, hersey Row adi verilen jenerik bir class uzerinden donuyor. 

Son olarak da Sql optimizasyonlari ve typesafety birleserek Dataset'i olusturuyor. 

Hatta aslinda Spark 2.0'dan sonra Dataframe ve Dataset birlestirilmis durumda. Yani Dataframe, Dataset'in ozel bir hali gibi. Yani aslinda Dataframe = Dataset[Row].

Dataset Actions
Dataframe uzerinde tanimlanan collect, take, foreach gibi action metodlari Row nesnesi collection'u uzerinden calisiyor. Ornek olarak bir df uzerinde collect yaparsak geriye Row listesi donecektir. Bunu da tekrardan parse edip kullanmamiz gerekir. Oyse ki Dataset, olusturuldugu class ne ise Dataset[BizimClass] o class tipinen bir kolleksion dondurecektir. Bu durumda tekrar parse etmeye gerek kalmayacak. 

Hatta spark 2.0'dan itibaren toLocalIterator adinda bir action eklenmis. Bu da collect gibi ama onun yerine bir iterator donduruyor. Iterator'ler lazy bir yapiya sahip oldugu icin collect kadar hafizada yer kaplamiyor. En fazla, datadaki en buyuk partition kadar yere ihtiyacinizi oluyor. Ama diger taraftan da birkac farkli spark job olusturdugu icin performans acisindan sikintilari var. Aralarinda his bir trade-off var.

Functional vs Relational
Dataset typed oldugu icin bazi functional transformasyonlar da getirilmis durumda. Ornegin filter vs where vakasini inceleyelim. 

val dsKullanicilar = spar.read.json("hdfs://root/kullanicilar.json").as[Kullanici]
val istanbulKullanicilar = dsKullanicilar.filter(k => k.sehir == "istanbul")

Burada type safety'nin imkanlanrindan faydalandigimizi goruyoruz. Filter metodu izerisindeki fonksiyon bilgimizi bir scala lambdasi. Ama burada kaybettigimiz bir sey var. Bu fonksyion sadece bir Kullanici class'i kabul edip geriye bir boolean donduruyor ki, ilgili row sonuca dahil edilsin mi edilmesin mi seklinde. Bunun disinda spark, bu kod ile ilgili hicbirsey bilmiyor. Yani UDF gibi bir blackbox. En basitinden bir predicate-pushdown yapilamiyor. Yani tum optimizasyonu kaybediyoruz. Ornek olarak spark tum satiri okumak zorunda kaliyor, tum stunlarla birlikte. Cunku hangisinin gerekli oldugunu bilemiyor. 

Bunu yerinde, relational bir action olan where 'e gozatalim:

val dsKullanicilar = spar.read.json("hdfs://root/kullanicilar.json").as[Kullanici]
val istanbulKullanicilar = dsKullanicilar.where($"sehir" === "istanbul")

Burada type safety'i kaybettik. Ama spark simdi ne yapmak istedigimizi biliyor ve istedigi sekilde bu query'i optimize edebiliyor. Bu gibi durumlara dikkat etmek gerekiyor.

Aggregate
Dataset ile aggregate'leri de bir yere kadar type-safe yapabiliyoruz. Bir yere kadar diyorum cunku column typle'lari korunsa da, aggregate sonucu olusacak olan schema icin bir type tanimlanmiyor, Dataset[(String, Int ...)] gibi bir tuple Dataset olusuyor. Bunu da tekrar isterseniz .as[AggregateTip] yaparak bir tipe donusturebilirsiniz. Kucuk bir ornek:
case class Insan(ad: String, yas:Int)
case class InsanAverage(ad: String, ortYas:Int)

val ds = Seq(Insan("joe biden", 75)).toDS

val dsAvg = ds.groupByKey(x => x.ad).agg(
typed.avg[InsanAverage](_.ortYas).as("ortYas").as[Double]
)
Ornegi salladim cunku Joe Biden kac yasinda bilmiyorum. Ama bir problem var ki, bu agg metodu tuple Dataset dondurdugu icin en fazla 4 aggregate yapilmasina izin veriyor. Bunun yerine mapGroups kullanilabilir mi bir bakalim.

Joe biden'larin yaslari toplami ve max yasini aggregate etmek istersek:
val dsAvgg = ds.groupByKey(_.ad).mapGroups((key, values) => (
key,
values.map(_.yas).sum,
values.map(_.yas).max
))
Burada istedigimiz kadar aggregate dondurebiliriz karisan yok. Ancak yine bir problem var. Buradaki bir kara-kutu lambda fonksiyonu ve catalyst bunu optimize edemiyor. 
Vee donup dolasip yine columnar metotlara geri donuyoruz. Typed olan groupByKey yerine, columnar olan groupBy kullanir isek, optimizasyonlardan faydalanabiliyoruz ve istedigimiz sayida aggregation yapabiliyoruz. 
val dsAg = ds.groupBy($"ad").agg(
sum($"yas").as("YasToplami"),
avg($"yas").as("YasOrt"),
max($"yas").as("MaxYas")
)
Bunun donus tipi de yine good-old Dataframe oluyor. 
Demek ki Dataframe'in columnar metotlari performans acisindan daha garantili ve de gayet esnek. Ancak Dataset fonksiyonel transformasyonlari, UDF'lerden daha okunakli kod uretecekse kullanilmali. Cunku zaten UDF bir kara kutu ve optimizasyon yok. Bu durumda Dataset fonk. okunabilirligi artirabilir. Ancak  columnar operasyonlar ile yapilabilecek bir isiniz var ise (yani Spark sql fonksyinonlari ile), UDF yazmak veya Dataset'in fonksyionel transformasyonlarini kullanmak, performans goturusu saglayabilir. Uzzak durunuz. 
Encoder Davasi
Dataset'in type saglayabilmesi, bu Encoder adi verilen mekanizma sayesinde olmaktadir. Normalde, yani type bilgisi gerekmedigi zamanlarda Dataframe ve de Dataset ayni sekilde, binary bir formatta hafizada tutuluyor. Ama Dataset uzerinde bir map yapmak istedigimizde ornegin, encoder devreye girerek bu binary verileri okuyor ve istenilen type'a ceviriyor. Biz de typed bir sekilde uzerinde islemler yapabiliyoruz. Daha sonra encoder tersine calisarak veriler Catalyst'in optimize edebilecegi binary formata ceviriyor (tam adi Tungsten binary format). 
Her bir scala tipi icin tabi ki encoder'a sahip olmak gerekiyor. Cogu primitive type encoder'ine import spark.implicits._ seklinde ulasabiliriz.
Burada benim de aklimda olan bir soru ise, bu encoding-decoding asamasi bir performans kaybi yasatir mi? Bir kere encoder'lar cok hizli calisiyor, runtime code-generation ile searialization ve desearialization icin custom bytecode uretiyor. Bunun yaninda, bir data Tungsten binary format'a encode edildigi zaman 2x daha az hafizada yer kapliyor. Bu da network transferlerinde buyuk avantaj demek. Avantaj ama RDD uzerinde bir avantaj yoksa Dataframe de zaten Tungsten binary formatinda sakliyor datasini ve de encoder kullanmiyor. Burasi arastirmaya deger, bir kapisma gerekiyor Dataframe vs Dataset... 
Join
Iki typed Dataset'i joinledigimizi zaman Dataset[(SolTip, SagTip)] seklinde bir sonuc elde ediliyor. Yine bir ornekle durumu susleyelim:
case class Calisan(ad:String, gorev: Int)
case class Gorev(ad: String, kod: Int)

val calisanlar = Seq(Calisan("joe", 0), Calisan("biden", 1), Calisan("barack", 2)).toDS

val gorevler = Seq(Gorev("baskan", 0), Gorev("yasli", 1), Gorev("emekli", 2)).toDS

val calGor = calisanlar.joinWith(gorevler, calisanlar("gorev") === gorevler("kod"))

calGor.show
+-----------+-----------+
| _1| _2|
+-----------+-----------+
| [joe, 0]|[baskan, 0]|
| [biden, 1]| [yasli, 1]|
|[barack, 2]|[emekli, 2]|
+-----------+-----------+
Default olarak inner_join calistiriyor ama ucuncu parametre olarak yine join tipini belirtebiliyoruz. Bizim ornekte yok ama, outer join yaptigimizda eger eslesme yoksa, iki taraftan birisi null olacaktir. Dataset camiasinda ise null bir problem olabilir. Cunku siz bunu alip uzerinde fonksyionel bir transformasyon yapabilirsiniz ve gelen deger null ise runtime'da patlayacagiz demektir. Bu durumdan kurtulmak icin bu alanlari birer Option ile sarmalayabiliriz ancak bu da performans goturusu saglayacaktir deniliyor (ben test etmedim bilmiyorum). Sadece bu konuya dikkat etmek gerekiyor. 
Kisisel olarak Dataset'in getirisinin goturusunu kurtarmadigini dusunmeye basladim. Haki bokunu odemiyor diye bir laf var bizim oralarda. Dataset icin kullanilabilir sanki. 
Biraz eski bir post olsa da databricks'in Dataset yazisini okuyabilirsiniz. 

Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

Threat Modeling 1

Encoding / Decoding