Spark Basit Join

En buyuk problemlerden bir tanesi. Ozellikle performans konusunda ciddi bottleneck olusturabiliyor. Yapabilecegimiz en iyi sey, Catalyst Reyizi en iyi sekilde kullanip, maksimum optimizasyon saglayabilmek. Soyle uc bir ornek verelim: Iki tabloyu join etmek icin bir join kriterine ihtiyacimiz var oyle degil mi? Bu bir esitlik olabilir, bir ID uzerinden join etmek icin. Bu esitlik operatorunu bir UDF'e kaydirabiliriz, hayet legit. 

scala> val esitUDF = udf((id1: Long, id2: Long) => id1 == id2)
scala> val joinedDF = df1.join(df2, esitUDF($"id1", $"id2"))

Ancak buradaki UDF bir blackbox, yani catalyst buradaki imperative kod hakkinda bir optimizasyon gerceklestiremedigi gibi, bunu diger execution plan'daki adimlarla da kaynastiramiyor. Bu durumda join islemi bir kartezyen karsilastirma yapmak durumunda kaliyor ve sonuc olarak n2 complexity elde etmis oluyoruz. Ancak biz bu karsilastirmayi efendi gibi SparkSQL ile yazsa idik,

scala> val joinedDF = df1.join(df2, $"id1" === $"id2")

Bu durumda catalyst optimzier burada devreye girip bir SortMergeJoin patlatir. Bu da nlogn seklinde cok daha sekilli ve seksi bir complexity demek oluyor. 

Cikarim sun: mumkun oldugunca built-in fonksyionlari kullanin, element uydurmayin. 

Bir tip
Yukaridaki joini yazarken eger column isimleri ayni ise, $"id1" === $"id1" yazmak hata verecektir. Bunun yerine 

scala> df1.join(df2, df1("id") === df2("id")) 

seklinde bir ifade kullanialbilir. Hatta daha da guzel olmasi acisindan, tek bir hamlede:

scala> df1.join(df2, Seq("id"))

kullanilabilir. Ayrica bunun bir diger guzelligi de sonuc df'in sadece tek bir id stunu icermesi. Hatta eger ki sadece ID uzerinden bir join yapilacaksa daha da kisaltip, Seq tanimlamasini da kaldirabiliriz.

scala> df1.join(df2, "id")

Sql join turu
Ucuncu argumanda da join turunu belirtebiliyouz. Default tur, Inner joindir.

inner: sadece join kriterini saglayan satirlari sonuca ekler. ornek olarak yukaridaki ID'ler tutmazsa bu satir sonucta yer almayacaktir.

left_outer: Yani soldaki tablodaki tum satirlari sonuca dahil et, eslesme olmasa dahi. Bu durumda sag tabldan gelen deger olarak null olacaktir. Ama sagdaki eslesme olmayanlari elbette sonuca dahil etmeyecek.

right_outer: Sagdaki tablodaki tum satirlari sonuca dahil et. Bu durumda soldaki stundan gelen degerler eslesme yok ise null olacaktir.

full_outer: Hem soldaki hem de sagdaki tum satirlar sonuc dataframe'de yer alacaktir. 

left_anti: Sadece sol tablodan gelip de sag tabloda bir eslesme bulunamayan satirlari donduru. 

cross: Kartezyen bir carpma islemi yapilir. Yani soldaki bir satir ile sagdaki tum satirlar, eslestirilir. Yani eger solda 3 satir sagda da 4 satir var ise sonucta 12 satirlik bir dataframe elde edilir.

Execution Plan
Simdi tabi yukaridaki join turleri bizim sonuc tablonun nasil olmasini istedigimizi tarif ediyor. Iste id uzerinden bir esitlik ile full_outer join olsun, her iki tablodaki stunlar da korunsun gibi. Peki gercekte bu islem nasil yapilacak? Iki dataframe'in de partitionlari nasil bir arya gelecek de sonuc tabloyu ortaya cikartacak? Buna spark karar veriyor. Bu noktada 3 onemli join tipi mevcut.

SortMergeJoin
Yukarida da bahsettik. sparkin default join turu. Eger broadcast join yapilamayacak kadar buyuk tablolariniz var ise bu kullanilir. Buarada iki tablo da verilen key uzerinden sort edilmis olmalidir. Eger sortlu degilse edilir. Daha sonra da bu iki tablo fermuarin iki yakasi gibi joinlenir. 

Broadcast Join
MergeSortJoin guzel ama join edilecek iki dataframein ilgili partitionlarinin ayni makinede olacaginin garantisi yok. Bu durumda data shuffle edilecek demektir. Bu noktada Broadcast join, genelde buyuk bir dataframe ile gorecek kucuk bir dataframe arasinda joinlerde kullanilir. Spark kucuk olan dataframe'i tum node'lara gonderir ve hafizada tutulur. Burada master node, worker'lara tek tek gondermek yerine bu datayi download eden worker kendisi de diger workerlara kopyalamaya basliyor. Peer-to-peer'e geciyorlar. Ve hatta burada Torrent protokolu kullaniliyor.


Bu durumda cok maliyetli olan shuffle islemine gerek kalmiyor. En performansli execution yontemi budur. Default olarak 10Mb'tan kucuk dataframe'ler icin spark bu tipi kullanir. Max is 8Gb olmasi gerekiyor. 

Kucuk tablo hashlenir. Buyuk tablo uzerinde tek bir iterasyon icerisinde her satir taranir ve verilen ID icin hashlenmis olan kucuk dataframe'den ilgili row bulunur ve join gerceklesmis olur.

Join islemlerine genel bir bakis attik. Buradaki Join stretejilerini tek tek ele alacagimiz, ornekleri ile performans incelemeleri yapip her birinin cigerini sokecegimiz 2021'deki guzel gunlerde gorusmek uzere. Iyi seneler.



Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

Threat Modeling 1

Encoding / Decoding