Spark #6 : RDD nedir?

clusterda datalar yeniden dagitiliyor (shuffle)

Resilient Distributed Dataset
Bir clusterda node'lar uzerinde dagitilmis halde bulunan ve uzerinde paralelde islemler yapilabilen bir koleksiyondur. Spark dokumantasyonu boyle tanimliyor RDD'yi. Yani aslinda bir program yazarken normal , tek bir makinenin hafizasinda bulunan bir kolleksiyon gibi islemler yapabiliyoruz (mesele .map) ama aslinda bu veriyapisi birden fazla makinenin hafizasinda dagitik olarak bulunuyor. Bir parcasi orada bir parcasi burada falan gibi. Hatta parcalarda replikasyon var ki fault tolerant olsun.

Iste bur bir soyutlama aslinda. Spark api'si tasarlanirken bu dagitik kolleksiyon uzerinde islem yapmayi sanki normal bir kolleksiyonmus gibi developer'dan sakliyor. Ama burada da bir rist var. Bazi yapilan islemler tum node'lardaki datanin birbiriyle senkronize edilmesini ya da yeniden dagitilmasini (shuffle) gerektiriyor. Eger bunlara dikkat etmezsek, cok ciddi performans kayiplari yasanabiliyor. 

Transformation
Simdi bu RDD uzerinde yapilan islemler (map gibi reduce gibi) lazy yani hemen calistirilmiyor diye bahsetmistik. Her bir uygulanan transformasyon, biriktiriliyor. Ve bir plan cikariliyor. Yani datayi suradan soyle okuyacagiz, su keyler uzerinde gruplama yapacagiz, suradan count alacagiz falan gibi sadece yapilacak olan islem adimlari belirleniyor ve optimzie ediliyor. Bu da aslinda bir asyclic graph yapisi olustuyor. Yani datayi o anda gercekten islemek yerine nasil isleneceginin planini tutuyor sadece. 

Bu sayede eger cluster uzerinde bir node kaybedilirse, orada bulunan RDD, tekrar kendisini nasil olusturmasi gerektigini biliyor. Bu sayedece fault tolerance de saglanmis oluyor. 

Action
Tamam transformasyonlar yani data uzerinde yapilacak manipulasyonlar lazy ama bir noktada da gercekten bu donusumleri yapmak gerekiyor. Bu da bir aksiyon ile mumkun. Ornek olarak count gibi. Siz bir RDD uzerinde bircok filtreleme ve transformasyon uygulayip son olarak da bu islemler sonucuda kac tane kayit donecegini bulmak istediginiz anda, count metodu calistigi anda daha once olusturulan DAG calismaya basliyor. 

Aksiyon calistiktan sonra ya olusan sonucu bir yerlere yazabilir ya da driver programa geri dondurebilir. Eger yazma islemi yok da geri toplama varsa (mesela collect metodu), bu durumda da dikkat edilmezse eger cok fazla kayit donerse driver programi hafiza yonunden patlatip OOM (out of memory) problemlerine yol acilabilir.

Nereden geliyoruz?
Bu arada 2 sekilde RDD yaratabiliniyor. Birincisi ve genel olarak HDFS uzerinden bir dosya okunarak. 2.si ise mevcut bir scala kolleksiyonunu parallel hale getirerek. 

val data = Array(1, 2, 3, 4, 5)
val distData = sc.parallelize(data)

Seklinde. RDD bir kere olusturulduktan sonra uzerinde paralel islemler yapilabilir. Ornek olarak bu listededki sayilarin toplamini bulmak istersek:

distData.reduce((acc, cur) => acc + cur)

Seklinde birsey yapabiliriz. Diger bir onemli bir parametre ise, RDD'nin kac partition'a bolunecegi yani daha dogrusu kac parcaya ayrilip node'lara dagitilacagi. Spark her bir partition icin paralelde bir task calistiracaktir. Yani sizin sadece 1 partitionununz varsa ne yaparsaniz yapin paralelde islem yapamazsiniz. Genel olarak CPU basina 2-4 arasinda partition ayrilmasini salik veriyor spark dokumantasyonu. Ve de ekliyor, Spark sizin cluster'in durumuna bakarak partition sayisini otomatik olarak ayarlamaya calisir ama bunu manuel olarka da set edebilirisiniz. 

Partitinon Mevzusu
Partition sayisi performansa etki eden en buyuk etkenlerden bir tanesi. Uygulama ihtiyaclari ve de cluster durumuna gore tune edilmesi gerekiyor. Gereginden fazla partition olmasi demek, her bir partition'un cok az dataya sahip olmasi ya da hic dataya sahip olmamasi anlamina gelecektir. Bu da gereksiz parallelism ve data-shuffle ile performansi kotu etkiler. Eger gereginden az parititon olur ise, bu sefer cok az paralelizm olur. Cunku spark herbir partiton icin paralelde islem yapabilir. Sadece 1 parititon olan yerde paralelizm olmaz. 

Tabi bunun ust limiti cluster'daki core sayisidir. Her bir core 1 paralel task calistirabilir. Bu acidan gelenek, her bir core basina 2-3 parititon sahibi olmaktir. 


Bana mi partition?
Cogu (belki de tum) spark uygulamalari bir yerden bir data okuyarak basliyor. Spark, her bir HDFS dosya parcacigi icin (default deger her biri icin 64 Mb) bir partition olusturur. Yani biz 3 tane hdsfs partitionu olan bir dosya okur isek spark ile, bu RDD, 3 partition'a sahip olacaktir, eger ozel bir islem yapmazsak repartition gibi. 

Herhangi bir RDD'nin kac partitionu oldugunu ogrenebiliriz:

rdd.partitions.size

to be Dewamked.


Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

SD #1: Scalability

Threat Modeling 1