Spark #13 : Keşleme


Spark'in en buyuk avantajlarindan birisi olan ara datalari RAM'de saklamak ve Hadoop'a karsi 100x'e kadar bir performans farki saglamak. 

Diyelim ki soyle bir RDD'im var ve bu RDD uzerinde maliyetli islemler yapiyorum. Bunu simule edebilmek icin Thread.sleep kullancagiz. 

scala> val cc = sc.parallelize(Seq(1,2,3,4,5))

scala> val cc1 = cc.map(x => {
         | Thread.sleep(10000)
         | x*x
         | })

Bu asamada cc1.collect calistirirsaniz yaklasik 10 saniye surecektir (eger enaz 5 core'unuz var ise). Simdi de fazla maliyetli olmayan baska bir transformasyon tanimlayalim ve drawdan bir action calistiralim ki dag execute edilsin:

scala> cc1.map(x=>x+1).collect

Simdi bunun da 10 saniye civari surdugunu gormekteyiz. Neden? Cunku her bir action, tum execution planin tekrar calistirilmasini gerektirir. Eger ki cache'leme yapilmamis ise. Bu durumda cc1 RDD'sine cache ekliyorum. 

scala> cc1.cache()

Ve cache'in calisabilmesi icin cc1 uzerinde bir action calistirmam gerekiyor. Aslinda bunu en basta RDD'yi tanimlarken yapmamiz gerekirdi ama ornegi detayli anlayabilmek icin basta yapmadik.

scala> cc1.count

Yime 10 saniye civari suruyor. Ama bu sefer cc1, RAM'de cacheleniyor ve uzerinde baska bir transformasyon calistirdigimizda:

scala> cc1.map(x=>x+1).collect
res16: Array[Int] = Array(2, 5, 10, 17, 26)

Aninda sonuc aliyoruz :D. Neden? Cunku collect calistigi anda spark execution plan'a bakiyor. Goruyor ki, cc1 kismi RAM'de cache'lenmis vaziyette. Onu tekrar maliyetli transformasyonu yaparak olusturmak yerine direk RAM'den getiriyor. Ve son olarak uzerinde ornekteki ufak map metodunu calistiriyor ki bu da maliyetli bir islem degil. 

persist
Aslinda cache de persist metodunun sadece RAM icin olan versiyonu. Persist ise daha genis kapsamli, diske de yazma secenegi sunuyor. Metot signature'une bakarsak:

def persist(newLevel: org.apache.spark.storage.StorageLevel):

Buradaki storageLevel hangi degerleri alabilir ve anlamlari nelerdir, gorelim:

MEMORY_ONLY 
MEMORY_AND_DISK --> eger RAM'de yeterli yer yoksa diske spill et
DISK_ONLY
MEMORY_ONLY_SER --> RAM'de serialized sekilde tut. Daha az yer kaplar ama desearializtion maliyeti getirir.
MEMORY_AND_DISK_SER

ve hatta yukaridaki seceneklerin sonuna _2 ekleyerek, data replikasyonunu 2 olarak belirleyebilir ve her partition'dan 2 ser adet node'lar uzerinde tutulmasini saglayabilirsiniz. Son secenek ise 

OFF_HEAP

Bu da hafizada saklamakla beraber JVM disinda HEAP uzerinde saklamak icin henuz deneysel olan bir ozellik. 

Peki cache'lenmis RDD ile isimiz bitince ne yapmamiz gerekiyor? unpersist(blocking=true) metodunu cagirabilir (arguman ile blocking veya non-blocking olarak) hafizada bu RDD tarafindan kaplanan yeri temizleyebiliriz. 

Aslinda buna cok fazla gerek yoktur. Ilk olarak egere RDD scope disina cikarsa cache kendisini temizleyecektir. Ikinci olarak da bundan daha sonra gelen ve RAM ihtiyaci olan baska bir RDD tarafindan evict edilecektir. Spark, memory eviction stratejisi olarak LRU kullanir.  Yani belirli bir zaman diliminde gecmise bakildiginda en az erisilen hafiza bloklari temizlenecektir

Eger RDD'nin persist seviyesi degistirilmek istenirse oncelikle unpersist calistirilmalidir. 

Dewamke


Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

Threat Modeling 1

Encoding / Decoding