Paper kosesi #1 : RDD Paper
Daha once spark paper'den bahsetmistik. Ama daha pratik ve saniyorum ki daha az tatava icermesi ihtimaline binaen bu paper ile okumalara basliyoruz.
Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing
Ozet gec chip (abstract)
RDD, programcilara cluster uzerinde datigit olarak in-memory islem yapabilmeyi saglana bir soyutlama katmanidir. Bu yapiyi olusturmaktaki esas motivasyon, suanki uygulamalarin (Hadoop'u kastediyor) inefficient olarak handle ettigi iteratif uygulamalar ve interactive data-mining. Cunku iki durum icin de, datayi hafizada tutmak (Hadoop, bunun aksine her step'i diske yaziyordu), performansi kat be kat artirmaktadir.
Fault tolerance'i elde edebilmek icin, RDD'ler cok detayli state guncellemeleri yerine, kaba (coarse grained) transformasyonlara dayanan cok kisitli shared-memory sunar. (dediklerin iyi gibi ama cok da anlamadik). Buna ragmen, RDD'lerin destekledigi transformasyonlar cok genis yelpazade uygulamalara izin vermektedir. (yogurdum eksi demez tabi)
1. Giris
MapReduce ve Dryad gibi cluster computing framework'leri cokca yayginlasmistir. Bu frameworkler programcilara distributed olarak calisan programlar yazmalarina olanak saglarken isin distributed kismi ve fault tolerance kismi ile ilgilenmemelerini saglarlar.
Ancak frameworkler cluster'in islem gucune erisim saglarken, distributed bir memory kullanimini saglayacak soyutlamalardan yoksundurlar. Bu acidan, hizla yayginlasan bir ugyulama tipi olan ve ara sonuclari tekrar-tekrar kullanan iteratif uygulamalarda cok verimsiz kalmaktadirlar. Ozellikle Page-rang, K-Means clustering ve logistic regression gibi machine learning uygulamalarinda bu ara datanin tekrar tekrar kullanilabilmesi onem tasir.
Diger bir kullanim alani da kullanicinin ayni data uzerinde bircok ad-hoc query calistirarak interaktif olarak data mining yapmasidir. Ne yazik ki gunumuzdeki araclar (gene mapreduce) ara sonuclari diske yazmak durumundadir. Bu da disk I/O ve serialization gibi maliyetler getirerek uygulamanin bottle-neck'ini teskil eder hale gelmektedir. Pregel, HaLoop gibi araclar cikmis olsa da bunlar ozel bazi kullanimlar icin olup genel bir reusable memory abstraction teskil etmemektedir.
Bu paper'da biz datayi cluster uzerinde distributed olarak hafizada tutabilen ve bunlar uzerinde zengin operasyonlar yapip ortaya cikan ara sonuclari da yine hafizada tutan, partition sayisini ayarlanmasina izin veren bir yapidan bahsedecegiz: RDD.
RDD'nin esas fark noktasi fault tolerance'i efficient olarak saglayabilmesidir. Mevcut distributed in-memory araclari, distributed shared memory, key-value stores ve Piccolo data uzerinde cok detayli guncellemelere (fine-grained updates on mutable state) izin vermektedir. Bu durumda fault tolerance'i elde etmenin tek yolu datayi node'lar uzerinde replika etmek veya data uzerine uygulanan update'leri node'lar uzerinde loglamaktir. Iki yontem de buyuk miktarda datanin network uzerinden hareket etmesini gerektirdigi icin cok maliyetli ve verimsizdir.
Buna karsilik RDD; map, filter, join gibi coarse-grained transformasyonlar sunar. Bu transformasyonlar data uzerindeki her bir kayda uygulanmaktadir. Bu sayede bir RDD datanin tamamnini tutmak yerine, o datayi olusturmak icin gerekli adimlari tutar. Eger bir partition kaybolursa, RDD o partitionu onceki RDD'leri kullanarak nasil olusturmasi gerektigini (lineage) bilir ve tekrar olusturabilir. Bu sayede kaybolan bir parittion tekrar verimli bir sekilde olusturulabilmektedir.
fine grained update --> belirli satir/stuna uygulanan guncelleme (db update gibi)
coarse grained transformation --> Tum dataya uygulanan transformasyon (liste isleme gibi)
RDD'nin sundugu coarse grained transformasyonlar kisitli gibi gorulebilir. Ama aksine data islemede genellikle operasyonlar tum data uzerine uygulandigindan, bu is icin cok uygundur. MapReduce, DryadLINQ, SQL, Pregel and HaLoop gibi sistemlerin sundugu tum programlama modellerini RDD kullanarak olusturulabilecegini gostercegiz. (heyt koc be).
2. Resilient Distributed Dataset
RDD, readonly ve partitioned olarak bulunan bir kolleksiyondur. RDD'ler sadece 2 deterministik yol ile uretileblirler: stable bir storage'da bulunan data'dan okumak suretiyle ya da diger RDD'ler kullanilarak.
RDD'ler her zaman materilaze edilmek zorunda degildirler. RDD sahip oldugu her bir partitionun data kaynagindan okunup nasil olusturulacagi bilgisine sahiptirler.
Kullanicilar RDD'nin persistence ve parititoning uzerinde kontrol sahibidirler. Kullanici hangi RDD'nin tekrar tekrar kullanilacagini belirtebilir ve buna uygun bir sakmala medium'u (mesela tamamen hafiza storage) secebilir. RDD'nin elemanlarinin bir key'e bagli olarak cluster uzerinde node'larda partitioned olarak saklanmasi da saglanabilir. Bu yontem placement-optimizations icin cok elverislidir, mesela join edilecek iki dataset ayni sekilde hash-parititon edilmis olmasi saglanabilir.
Spark Programlama Arayuzu
Spark RDD'leri birer programala dili (scala) objesi olarak sunar. Bu yontemde herbir RDD bir obje ve transformasyonlar da bu objeler uzerinde uygulanan metotlardir.
Programcilar stable storage uzerinde bulunan bir dataset uzerinde transformasyon yaparak (map veya filter gibi) RDD olusturmaya baslayabilirler. Daha sonra bu RDD'lere action'lar uygulanarak belirli data uygulamaya geri toplanabilir ya da yine stable bir storage uzerine kaydedilebilir. Action'lara ornek olarak count (dataset icerisinde kac eleman oldugu), collect (datasetteki elementleri programa geri toplar) ve save (datasetteki elemanlari diske yazar) verilebilir. Tum transformasyonlar DryadLINQ'te oldugu bini lazy'dir ve pipeline edilebilir.
Ek olarak, programcilar istedikleri bir RDD uzerinde persist metodu calistirarak hafizada cache'lenmesini saglayabilirler ki tekrar tekrar bu RDD refere edildiginde tum operasyonlar tekrardan calistirilmasin. Cache'leme islemi diske veya once hafizaya sonra yeterli yer olmadiginda diske spill edilecek sekilde ayarlanabilir.
Ornek: Console log madenciligi
Bir web uygulamasi cesitli hatalar uretiyor ve programci HDFS uzerinde bulunan terabytlarca log dosyasi icerisinden arama yapiyor. Spark kullanicisi, bu datayi cluster uzerinde bulunan node'larin hafizasina (RAM) yukleyerek interaktif olarak sorgular calistirabilir.
lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR"))
errors.persist()
Burada ilk satirda HDFS uzerinde bulunan bir dagitik dosyadan okuyarak bir RDD olusturuluyor. Ikinci satirda ilk RDD uzerinde bir filtreleme islemi uygulanarak yeni bir RDD olusturuluyor. (burada filtre expression'u bir scala closure ile saglanmis olduguna dikkat). Son olarak da persisit metodu ile bu RDD'nin hafizada tutularak birden fazla query icin share edilmesi saglaniyor. Bu noktaya kadar cluster uzerinde henuz hic bir islem yapilmadi. Simdi kullanici bir aksiyon calistirarak, ornegin kac error mesaji bulundugunu bulabilir:
errors.count()
Kullanici daha fazla transformasyon uygulayabilir:
// Icerisinde MySQL gecen hata mesajlarini say
errors.filter(_.contains("MySQL")).count()
// icerisinde HDFS gecen hata mesajlarini al ve time-field kismini parse-et
errors.filter(_.contains("HDFS"))
.map(_.split(’\t’)(3))
.collect()
errors RDD'sini refere eden ilk action calistiktan sonra spark, errors RDD'sinin tum partitionlarini RAM'de saklayacak ve bir sonraki querylerde cok buyuk bit performans kazanci saglayacaktir. Ornek olarak yukaridaki filter islemlerinde errors isimli RDD hafizada hazir bulunmaktadir, tekrar dosyadan okumaya gerek yoktur.
Fault tolerance'in nasil elde edildigini gostermek icin RDD'nin kendi ejdadini (lineage) nasil tuttugunu gosterelim:
Burada errors RDD'si uzerinde iki transformasyon daha uygulanmis durumda. Son olarak collect aksiyonu calistirildigi zaman spark bunlari pipeline ederek, bazi task'lar olusturur. Bu tasklar cluster uzerindeki node'lara gonderilerek burada errors RDD'sinin cachelenmis olan partitionlari uzerinde calistirilir. Egere errors RDD'sinin bir partitionu kaybolursa (mesela clusterdaki bir node cokerse), Spark sadece o partitionu tekrar olusturabilir. Yapilmasi gereken lines RDD'si uzerinde karsilik partitionlara gerekli transformasyonlarin uygulanmasidir.
RDD Modelinin Avantajlari
RDD gibi bir distributed memory soyutlamasinin avantajlarini gormek icin bunu distributed shared memory (DSM) sistemler ile karsilastirabiliriz.
DSM sistemlerinde programlar global data icerisinde istedikleri yere (stari/stun gibi dusunelebilir) yazip okyabilmektedirler. DSM cok esnek bir yapidir ancak bu esnek yapisi, comodity hardware'lerden olusan cluster'larda fault tolrance elde etmeyi zorlastirmaktadir. RDD ve DSM arasindaki en buyuk fark, RDD sadece deterministik coarsa grained transformasyonlar ile olusturulabilirken DSM herhangibir kayit uzerinde herhangi bir verinin degistirilebilmesini olanakli kilar. (anladik aq ayni seyi kacinci seferdir anlatiyor C.N.) Bu sayede RDD parittionlari lineage kullanilarak tekrar olusturulabilirler ama DSM sistemleri datanin kendisini replika etmek zorundadirlar.
Bir diger ustunluk de yavas calisan tasklar icin ayni datayi okuyup ayni sonucu ureten birden fazla task calistirilip, birisinin fail olmasi durumunda otekisinin basarili olmasini saglayabilmektir. Ama DSM sistemlerde bu mumkun degildir. Cunku DSM global state'i mutate ettigi icin backup tasklar arasinda cakisma yasanabilir. (bildigin pompa C.N.)
Bu arada okuma esnasinda (write'ta degil), RDD islemleri de fine grained olabilir. Yani gidip istediginiz datasetin istediginiz row'unun istediginiz cell'ini okuyabilirsiniz. Sikinti yoktur.
RDD uzerinde islem yaparken, data locality de hesaba katilacak sekilde tasklar schedule edilebilir ve bu sayede network I/O azaltilarak performans artisi saglanabilir.
Son olarak da RDD'ler icin yeterince RAM yok ise gracefully bir sekilde diske yazilabilir. En kotu ihtimalle mapreduce performansina inilmis olur.
Spark mimarisi: Driver program birden fazla worker olusturarak distributed file systemden datayi okur ve computed RDD'leri hafizada persiste eder. (Turkce katliamina bak C.N.) RDD'ye gelmeyen uygulamalar RDD'ler, kolleksiyondaki tum elemanlara uygulanan genel transformasyonlar uzerine kurulmustur. Ve dogal olarak da, surekli shared state uzerinde fine grained update'ler yapan uygulamalar icin elverisli degildir. Ornek olarak bir web sitesinin data-base yapisi verilebilir. Bu gibi uygulamalar icin RAMCloud, Percolator veya Piccolo gibi sistemler kullanilmalidir. (hic bir tanesini de duymadik aq C.N.) Ancak bizim amacimiz genel bir batch analytics engine sunmak ve bu gibi state management islerini ozel sistemlere birakmak. Spark Programlama Arayuzu Spark'i kullanabilmek icin programcilar bir drive program yazarlar. (yukaridaki figurde gorulecegi gibi). Bu driver program, worker'lardan olusan bir cluster'a baglanarak is yaptirir. (Bir nevi taseron, trabzonlu muteahit C.N.). Driver programda bir veya daha fazla RDD olusturulur, bunlar uzerinde aksiyonlar uygulanir. Ayrica driver program RDD'lerin lineage'lerini de tutar. Worker'lar uzun yasam suresine sahip process'lerdir ve RDD partitionlarini hafizada tutarlar. Log madenceligi ornegindeki gibi kullanicilar RDD'ler uzerinde transformasyonlar yaparlar, ve bunu closure pass ederek (fonksiyon literalleri) yaparlar. Scala bu closure'lari birer Java nesnesi olarak saklar ve serialize ederek network uzerinden worker'lara dagitir ki onlar da bu islemi kendi lokallerinde bulunan RDD paritionlari uzerinde uygulayabilsinler. Ayrica Scala bu closure'lar icerisinden refere edilmis (bound) degiskenleri de saklar ve netwok uzerinden aktarir. Yani su sekilde bir transformasyon yazmak mumkundir: val x = 5 val rdd1PlusFive = rdd1.map(r => r + x) Buradaki x, map icerisindeki fonksiyon literaline `bound` durumundadir ki x de serialize edilerek diger node'lara aktarilir. Bu yapi konsept olarak basit gozukse de, scala'nin closure nesneleriyle alakali bazi sorunlari reflections kullanarak cozmemiz gerekti. Ayrica Spark'i scala interpreter icerisinden calistirabilmek icin de hayli emek sarfetmemiz gerekti ki bunlardan ileride bahsedecegiz. (heralde emek vereceksin, big data camiasi yan gelip yatma yeri degildir C.N.) Spark'da RDD islemleri Asagidaki tabloda RDD'ler uzerinde gerceklestirilebilecek transformasyonlar ve aksiyonlar listelenmistir. Join gibi bazi operasyonlar sadece pair RDD'ler uzerinde gerceklestirilir. |
Transformasyonlar ve aksiyonlar Bunun disinda kullanicilar bir RDD'yi hafizada persist edebilir. Bir RDD'nin partition order'i sorgulanabilir (bir Partitioner class tarafindan tanimlanir) ve baska bir dataset bu partitioner'e gore partition edilebilir. (burayi pek anlamadik) [Aslinda soyle, bir Partitioner class, her bir key degerinin hangi partition'a map edilecegini belirliyor galiba.] Ornek Uygulamalar Logistic regression ve PageRank olmak uzere iki ornek uygulama yapilacaktir. Ayrica parition kontorulunun nasil performans artisi saglayabilecegi gosterilmistir. Logistic Regression Machine learning uygulamalari dogasi geregi iteratif uygulamlardir. Iteratif olarak bir fonksiyonu maksimize etmeye calisan gradient descent gibi mesela. Data memory'de tutuldugunda daha hizli calismalari saglanabilir. Ornekte tipik bir classification algoritmasi olan gradient descent implemente edilmistir. w seklinde bir hyperplane optimize ederek iki nokta setini en iyi sekilde ayirmaya calismaktadir. (spam ve spam olmayan mailler mesela). Random bir w degeri ile baslayarak, her iterasyonda tum data uzerinden gecilerek, w degeri iyilestiriliri. val points = spark.textFile(...)
.map(parsePoint) .persist() var w = // random initial vector for (i <- 1 to ITERATIONS) { val gradient = points.map{ p => p.x * (1/(1+exp(-p.y*(w dot p.x)))-1)*p.y }.reduce((a,b) => a+b) w -= gradient } |
Oncelikle bir text dosyasindan okuyarak, her bir satir bir nokta degerine tekabul edecek sekilde map islemi ugularak bir RDD olusturuyor ve bunu hafizada persist ediyoruz.
Daha sonra tum data uzerinden iteratif olarak w degerini iyilestirmek icin map ve reduce islemlerini uyguluyoruz. Burada points RDD'sini hafizada persist etmek, iteratif islem icin 20X performans artisi saglamaktadir.
PageRank
Algoritma her bir dokuman icin iteratif olarak bu dokumana verilen linklerden yola cikarark bir rank degeri gunceller.
val links = spark.textFile(...).map(...).persist()
var ranks = // (URL, rank) pairlerinden olusan RDD
for (i <- 1 to ITERATIONS) {
// herbir sayfanin katkilari ile rank degerlerini hesapla
val contribs = links.join(ranks).flatMap {
(url, (links, rank)) =>
links.map(dest => (dest, rank/links.size))
}
// herbir url icin saglanan katkilari topla
ranks = contribs.reduceByKey((x,y) => x+y)
.mapValues(sum => a/N + (1-a)*sum)
}
Bu program su sekilde bir RDD lineage olusturuyor:
Bu lineage graph'indan da gorulecegi gibi, her bir iterasyonda bir onceki contribs datasetini ve de statik olarak links datasetini (yani links tabi ki her iterasyonda sabit, degismiyor) kullanarak yeni bir ranks dataset'i (RDD) olusturuyoruz. Iterasyon uzadikca bu graph da haliyle buyuyor.
Bir noktada ranks ve contribs icin lineage cok uzun olacaktir, bur durumda RELIABLE flag'i ile bunlari persist etmek daha performansli olur. [Yani aslinda checkpointing yapilmasi gerekir. Spark bu durumda RDD'leri HDFS uzerine kaydeder ki, bu noktada lineage ile islem yapmak direk dosyadan okumaktan daha az performansli hale gelebilir]
Checkpointing: Spark RDD'yi HDFS'e kaydeder ve lineage bilgisini tamamen siler.
SparkContext.setCheckpointDir(directory: String)
rdd.checkpoint
Graph'a geri donersek, links RDD'sinin ise degismediginden dolayi checkpoint edilmeye ihtiyaci yoktur.
Papapapapapapa partition show
Son olarak, bu ornekteki partition'lamaya mudahale ederek, join'deki shuffle gereksinimini ortadan kaldirabilir ve performans artisi saglayabiliriz. links ve ranks arasindaki join, URL uzerinden yapilmaktadir. Eger ki iki RDD'de ayni key (ornekte URL) uzerinde partition edilmemis ise, join edilecek satirlar baska makinelerde yeraliyor olabilir. Bu da buyuk miktarda veririn network uzerinden tasinmasi anlamina gelir.
links RDD'sinde bir partitioning metodu tanimlar isek (mesela her bir url'i hash edip bunu key olarak alan) daha sonra ayni partition stratejisini ranks RDD'si icin de kullanabiliriz. Bu durumda links ve ranks arasinda gerceklesen join islemi node'lar arasinda hicbir data transferine ihtiyac duymaz. Cunku ayni hashs'e sahip url ve rank degeri ayni makine uzerinde bulunacaktir. (Burasi cok onemli bir nokta, super anlamamiz gerekiyor C.N.)
Ayrica ozel bir Partitioner class yazip, birbirlerine link veren sayfalarin ayni makinede olmasini saglayabiliriz. (Mesela url'leri domain'e gore partition edebiliriz). Tum bu optimizasyonlar partitionBy ile yapilabilir.
links = spark.textFile(...)
.map(...)
.partitionBy(myPartFunc)
.persist()
Bu gibi tutarli (consistent ? ) partition'lama ve join etme Pregel gibi ozel framework'lerin sundugu bir ozellik iken spark bu ozelligi RDD'ler uzerinden genel olarak sunmaktadir.
RDD'lerin temsil edilmesi
RDD'lerde lineage graph yapisi ile temsil edilmektedir. Bu sayede RDD'ler cok cesitli sekillerde compose edilebilir ve transformasyonlar uygulanabilir. Bir RDD icin su bes veri gereklidir:
1- partition'lar (atomik veri setleri)
2- dependency'ler (bu RDD'nin dayandigi onceki RDD'ler)
3- gerektiginde bu RDD'nin parent RDD'lere bagli olarak tekrar hesaplanabilmesini saglayan fonksiyon
4- partitioning schema ve data placement meta-data'si
Ornek olarak HDFS uzerinde bulunan dagitik bir dosyayi temsil eden bir RDD, bu dagitik dosyanin kac partition'dan olustugunu ve herbir partition'nun hangi makinede bulundugunu bilir. Ve bunun uzerine uygulanan bir map transformasyonu ile olusan yeni RDD de ayni sayida partition'a sahiptir ve atasinin partition'lari uzerine bur map islemini uygulayarak kendi elementlerini hespalar.
RDD'leri temsil eden spark apisi |
Burada en enteresan soru, parent RDD'lerin ve parent-child iliskisinin nasil temsil edilecegi idi. Bu asama iki cesit dependency gorduk. Narrow ve Wide.
map: bir RDD uzerinde map uygulanmasi MappedRDD dondurur. Bu RDD, parent RDD'sinin sahip oldugu ayni partiton ve preferredLocations'a sahiptir. Iteror metodu icerisinde kendi parent RDD'si uzerinde uygulanacak (element wise) transformasyonu barindirir.
union: iki RDD'nin sahip oldugu partition'larin birlesitirilmis haline sahip bir RDD dondurur. Her bir child partition, narrow dep. ile hesaplanir.
sample: aslinda map'e cok yakindir ancak sadece RDD'nin bunyesinde her bir partition icin random sayi generator seed'i bulunur ki bu sayede deterministik olarak parent RDD sample edilebilir.
join: join islemi eger iki RDD de ayni partitioner ile hash/range partition yapilmissa iki narrow dependency ile kurtarabiliriz. RDD'lerden bir tanesi partitioneri varsa bir wide bir de narrow dependency gerekit. Eger hic bir partitioning yok ise iki wide dep.'e ihtiyacimiz olur. (Bu noktada join islemi sonucu ucuncu bir RDD olustugu ve bu bahsettigimiz dependecy'lerin bu ucuncu RDD'ye ait oldugunu hatirlatmak isterim. C.N.) Olusan RDD de bir partitioner'e sahip olur. Bu partitioner parent'lerden turetilir ya da default olarak hash partitionerdir.
Implementasyonun Icinden
Spark'i 14000 satir scala kodu ile gelistirdik. Uygulama suanda Mesos resource manager uzerinde calisiyor bu sayede Hadoop, MPI ve diger uygulamalarla resource paylasimi yapabilir. (Bu bira eski bir yaklasim sanirim, hala mesos kullanan var mi bilmiyorum. Yarn, yardi atti eheuehehzhzhhz C.N.)
Her bir spark programi kendi basina bir mesos uygulamasi olarak calisir. Kendi driver programi (master) ve workerlari ile. Bu ayrik spark uygulamalari arasindaki resource paylasimi mesos tarafindan halledilir.
Spark, ilgili Hadoop pluginleri ve api'leri sayesinde her turlu Hadoop input seklini okuyabilir ve modifiye edilmemis scala uzerinde calisir.
Job Scheduling
Spark scheduler'i Dryad'inkine benzer ancak persistent RDD'lerin hangi partitionlarinin hafizada (RAM) hazir bulundugu bilgisininden yararlanir. Kullanici RDD uzerinde bir aksiyon calistirdigi zaman (count veya save gibi)
1- RDD'nin lineage graph'i incelenir
2- stage'lerden olusan bir dag olusturulur
3- her bir stage, mumkun oldugunca fazla pipeline edilmis narrow-dep gerektiren operasyonlardan olusur.
4- stage'lerin sinirlarini shuffle operasyonu belirler. cunku eger bir wide-dep gerektiren operasyona denk gelmissek, data'nin shuffle edilmesi gerekir. Bu da yeni bir stage demek olur. Ya da parent RDD'nin hesaplanilmasini kolaylastiracak sekilde daha onceden hesaplanmis bazi partitionlar var ise scheduler her stage'de eksik partitionlari tamamlar. (burayi pek anlamadik C.N.)
Spark'da stage'lerin durumu |
Spark scheduler joblari schedule ederken data-locality'i hesaba katar, delay-scheduling kullanir. Eger bir taskin, bir partition uzerinde islem yapmasi gerekiyorsa ve bir node RAM'de bu partitionu tutuyor ise, bu task direk o node'a gonderilir.
Arkadaslar, post cok fazla uzadi. Burada kesip bir sonraki bolumde devam etmek dilegiyle.
Yorumlar
Yorum Gönder