Spark jobum nasil calisiyor?

 Spark ogrenmek gercekten cok kolay. Esas hazine spark ile paralelize edilmis bir computation'u inceleyip optimizasyon noktalarini bulup optimize edebilmek. Bazen pata kute yazdigimizi spark job'lari cok performanssiz ya da olabileceginden cok daha az performansli olabiliyor. Bunun icin gelin bir spark job'unun cigerini nasil sokebiliriz inceleyeim: Spark UI. Ever gorsellestirmeler bize cok yardimci olacak. 

Timeleline gosterimi
Tum joblar, tek bir job veya tek bir stage icin timeline view seklinde olan olaylari gorebiliriz. Bu sayede bircok icgoru elde edebiliriz. 

Bu gorsel sparkUI'in giris sayfasindan. Yani tum joblar bir timeline uzerinde gosteriliyor. Oncelikle 4 adet executor'in kayit edildidigini goruyoruz. Hemen sonra 4 farkli job paralel bir sekilde calismaya basliyor. Bir tanesi (kirmizi olan) fail olurken digerleri basariyla tamamlaniyor. Tum job'lar tamamlandiktan sonra bizim spark job sona eriyor ve executor'lar bizden alinarak tekrar clustera'a iade ediliyor ki diger  joblar da bunlari kullanabilsin.

Bir adim daha derine inerek buradaki job'lardan birtanesini detayli inceleyeim. Job detaylari sayfasinda su sekilde bir timeline ile karsilasiyoruz:

Bu ornekte, 3 farkli dosyadan word-count hesaplaniyor ve en sonunda bu sonuclar birlestiriliyor. Timeline'da da gorulecegi gibi uc dosya paralel bir sekilde okunarak word-count hesaplanabiliyor. Ancak en sondaki sonuclari birlestirme islemi bu uc dosyanin da hesaplanmasini gerektirdigi icin ensonda bekliyor. 

Bir adim daha derine inerek bir stage'i inceleyelim. 

Bu stage 4 makine uzerine dagilmis toplamda 20 partitiona sahip (hepsi gorulemiyor resimde). Buradaki her bir bar, stage icindeki bir task'i temsil ediyor. Buradan cikarabileceklerimiz:

1. Partitionlar makineler uzerinde gayet esit bir sekilde dagilmis. Bunun barlarin uzunlugunun hemen hemen ayni olmasina bakarak anliyoruz. Eger tek bir bar cok uzun olup digerleri cok kisa olsa idi bu durumda data-skew var diyebilirdik. Bu da paralelizmden yeterince faydalanmamizi engellerdi. 

2. Her bir task, zamaninin cogunu gercek hesaplama icin kullanmis (yesil kisimlar). Network I/O overhead'inin gorece az oldugunu (turuncu kisimlar) gorebiliyoruz. Bu da cok fazla data shuffle edilmedigini gosteriyor ki performans acisindan guzel bir haber. 

3. Her bir executor en fazla iki task calistirabiliyor ayni anda. Bunu da yine barlara bakarak anlayabiliyoruz. Bu durumda executor core sayisini artirarak buradaki paralelizmi artirabilir ve spark job'un toplamda daha kisa surmesini saglayabiliriz. 

Dynamic allocation
Yeri gelmisken dynamic allocation adi verilen spark ozelliginden bahsedebiliriz. Ayni EC2'nin auto-scale'i gibi, Spark yuk arttikca yeni executorlar isteyebilir ve yuk azalinca da kullanilmayan executorlari clustera iade edebilir. Bunu bir timeline gorseli ile irdeleyeim:


Dynamic allocation aktive edildiginde, ilk gozumuze carpan sey, executor alma isinini onceden degil de bir job esnasinda gelistigi oluyor. Normalde buradaki count job'undan once executorlarin reserve edilmesi gerekirdi ama bu sefer job'a gore executor istiyor spark. 

Daha sonra ilk job tamamlandiktan sora, bosa cikan execurotlar yine clustara geri veriliyor. Bu da clusterda calisan diger job'larin bunlari hemen kullanabilmesi ve fazla beklememsi anlamina geliyor ki bu da cluster utilization'u artiran bir avantaj. 

Bizim spark job daha sonra yeni bir job'a basliyor ve yeni executorlar kendisie veriliyor. (sorry for the wording)

Goruldugu gibi gorsel olarak olan olaylari takip edebilmek spark job'undaki bottleneck'lari tespit etmede cok faydali. 

Execution Dag
Aslinda bir spark job'u, bir directed acyclic graph seklinde temsil edilebilen RDD operasyonlarindan olusuyor. Eger biz bu dag'i gorsellestirebilirsek debug etmemiz kolaylasir. 


Bu job basit bir word-count gerceklestiriyor. Oncelikle HDFS uzerinden bir dosya okunuyor, daha sonra her bir satir flatMap ile kelimelere ayriliyor, her bir kelime sayim icim (kelime, 1) seklinde tuple'lara donusturuluyor map ile ve son olarak da reduceByKey ile her bir distinct kelime sayisi hesaplanmis oluyor. 

Mavi kutucuklar spark uygulamasi icerisinde yapilan bir operasyonu gosteriyor. Kutular icerisindeki siyah daireler de bu operasyon sonucu olusan RDD'yi temsil ediyor. 

Buradan cikarimsamalarimiz:
1. Spark shuffle gerektirmeyen operasyonlari tek bir stage icerisinde toplayarak pipeline ediyor. Yani bu operasyonlari tek tek degil de tek bir operasyonmus gibi birbiri ile birlestirerek optimize ediyor. Ornekte bir partition dosyadan okunduktan sonra her bir executor map ve flatMap islemlerini o partition uzerinde lokal olarak uygulayabiiyor. Bu durumda data shuffle etmeye gerek kalmiyor.

2. Ilk RDD'nin cache'lendigini goruyoruz (yesil nokta). HDFS'ten okumak maliyetli bir islem oldugu icin bu cacheleme yapilarak ileride bu RDD'yi tekrar kullanmak gerektiginde en azindan bir kismina hafizadan erisilecek ve performans artisi saglanacak. 

Dag gorsellestirme kendisini esasinda complex joblarda gosterecektir. Ornek olarak ALS (Alternating Least Squares) algoritmasi iki matrisin yaklasik carpimini iteratif olarak hesaplamaktadir. Bunun icin de bircok map, join, groupByKey islemleri kullanilmaktadir. 

Iteratif bir algoritma oldugu icin dogru kisimlari cache'lemek performansi cok ciddi olcude articaktir. Bu Dag gorsellestirme ile kullanicilar hangi RDD'lerin cache'lenmesi gerektigini ve genel olarak spark job'un neden yavas oldugunu anlayabilirler. 

Dag gorselinde de bir stage'e tiklayarak detaylarina inebilmek mumkun. Asagidaki gorsel ALS algoritmasindaki tek bir stage'e ait:


Stage detay gorunumunde her bir RDD icin aciklamarin daha genis verildigini goruyoruz. 

Son olarak da dag gorsellestirme ile Spark SQL integrasyonundan bahsedebiliriz. SparkSQL kullanicilari RDD'lerdeki gibi low-level spark primitifleri yerine daha high-level operasyonlarla ilgilendikleri icin SparkSQL gorsellestirmesini (spark UI'da, SQL sekmesinden ulasilabilir) kullanmalari daha faydali olacaktir. 

Bu post databricksin ilgili yazisindan cevrilmistir. 

Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

SD #1: Scalability

Threat Modeling 1