Spark #14 : Akumulator

Meslea spark job'u esnasinda ortaya cikan hatalarin sayisini bir sekilde tutmak istiyoruz. Spark, cluster uzerindeki tum node'lardan erisilebilen, guncellenebilen ama deger okumasi sadece driver program uzerinde gerceklestirilebilen bir veri yapisi sunuyor: Accumulator. 

Bir ornekle durumu aciklayalim:

scala> val acc = sc.longAccumulator("benim akum")
scala> val rdd = sc.parallelize(Seq(1,2,3,4,5))

scala> cc1.foreach(x => {
            | println(x)
            | acc.add(1)
            | })

scala> acc.value
res18: Long = 5

Seklinde, her adimda acc degerini 1 artiriyoruz. Normalde bakinca cok kolay bir islemmis gibi gelebilir ama unutmayin ki buradaki foreach icerisindeki anonim fonksiyon cluster uzerine dagitiliyor ve farkli makinelerde calisiyor olabilir. Bu da demek oluyor ki, acc isimli akumulatoru biz farkli makinelerden guncelliyoruz. Ve son olarak da driver programda degerini okuyoruz. Neresinden bakarsan cok kral bir ozellik bu kadar guzel implemente edilmis. 

Bu arada, akumulatorleri kullanmanin ne iyi yeri, action metotlaridir. Bu da spark'in fault tolerance yapisindan kaynaklanmaktadir. Eger biz acc degerini lazy olan transformasyonlar icerisinden artirirsak, ve bir noktada bir partition kayboldugunda spark bu transformasyonu tekrar calistirirsa, bu sefer gercegindan daha yuksek bir deger elde ederiz. Yani belirli partition icin acc artirma isi gereksizce 2 kere yapilmis olur. Bundan kacinmak gerekir. 

Bir spark job'unda kac hata olsutugunu bu sayede gorebiliriz demistik. Hatta hata sayisina bakarak job'un outputunun yazilmasi veya job'un fail etmesi gerektigini de belirtebiliriz. Bir error threshold tanimlayabiliriz. 

rdd.foreach (x => {
    try {
        fn(x)
    } catch {
        case _ => hataSayaciAkumulatoru.add(1)
    }
})

seklinde bir yapi ile toplamda kac hatayla karsilasildigi bulunabilir. Daha sonra kayit etme asamasinda kontrol edilerek job'un fail etmesi saglanabilir. 

Dewamke. 



Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

SD #1: Scalability

Threat Modeling 1