Data Patterns #1 - Data Summarization Pattern




Problem
Verilen bir DNA dizesinde (mesela "AATGC..."), her bir bazin (A, C, G, T) kac kere gectigini bulunuz.
Ornek dna dizilim dosyasi: https://ftp.ncbi.nlm.nih.gov/genomes/INFLUENZA/influenza.fna (1.3 Gb)

Cozum
Yani aslinda kabaca, verilen text icerisinde gecen distinct karakterleri saymamizi istiyor. Cok basit (kolay demedik) ve temel bir problem. 

Ornek bir veri dosyasi icin https://ftp.ncbi.nlm.nih.gov/genomes/INFLUENZA/influenza.fna (1.3 Gb text dosyasi)

Input sekline bakacak olursak:

>gi|58576|gb|X52226|Influenza A virus (A/FPV/Rostock/34(H7N1)) gene for neuraminidase, genomic RNA
AGCAAAAGCAGGAGTTCAAAATGAATCCAAATCAGAAAATAATAACCATTGGGTCAATCTGTATGGGGAT
CGGAATAATCAGCCTAATATTACAAATTGGAAACATAATCTCAATGTGGGTTAGTCATTCAATTCAGACT
GAAAATCAAAATCACCATGAAGCATGCAACCCAAGCATTGCTGGACAGGATGCAGCTTCAGTGGCACTAG
CAGGCAATTCCTCTCTTTGTCCCATTAGTGGGTGGGCTATATACAGTAAAGACAATGGTATAAGAATTGG
....

En basta, buyuktur isareti ile baslayan bir yorum satiri var ki bu satiri gozardi etmemiz gerekiyor. Geri kalan satirlardaki karakterleri sayacagiz.

spark-shell'de hizlica dosyaya gozatacak olur isek:

scala> val df = spark.read.text("/home/lombak-sehidi/influenza.fna")
scala> df.count
res0: Long = 19849373

19milyon kusur satir var. Bakalim en performansli sekilde bu karakterleri nasil sayabiliriz?


1. Naive Cozum

Ilk akla gelen yontemde:

1.text dosyasini RDD[String] olarak oku
2. her bir satirdaki (yani RDD elemanindaki) karakterleri split ederek, (k, 1) seklinde tupleler olustur ki, ensonda sayabilelim
3. Olusturdugumuz RDD[(Char, Int)] tuplelerindeki deki elemanlari key bazli olarak reduce edelim.


val rddDna = spark.sparkContext.textFile("/home/lombak/influenza.nfa")

def processDnaLine(line: String): Array[(Char, Int)] = 
    if(line.startsWith(">")){
        Array(('z', 1))      // eger yorum satiri ise bunu z karakteri ile temsil ediyoruz
    }
    else {
        line.map(karakter => (karakter, 1) // her bir karakterden count deger 1 olan tuple olusturuyoruz
    }

val charCounts = rddDna.flatMap(processDnaLine).reduceByKey(_+_)

charCounts.collect


Tupleleri barindiran rdd, aslinda PairRDD diye geciyor ve de ilk eleman, o pairin key'i olarak kabul ediliyor. Bu durumda akla, bu keyi kullanarak kayitlari (yani tupleleri) gruplayip ondan sonra da sayabiliriz gibi bir fikir de geliyor. Yani,

rddDna.flarMap(processDnaLine).groupByKey.mapValues(v => v.sum)

Gorulecegi gibi, once key ile kayitlari gruplayabilir ve daha sonra her bir grup uzerinde calisacak olan mapValues metodu ile her bir grubun (yani distinct karakterin) sayisini bulabiliriz. 


reduceByKey vs groupBy

Temel fark bu iki metodun sayma islemini hangi tarafra yaptigi. Map-side ve de reduce-side olmak uzere iki cesit yaklasim mevcut. Map-side islemler, data shuffle olmadan once, executor uzerinde, daha kucuk data ile yapilir. Islem bittikten sonra cikan sonuc (bu problemde her bir karakterin sayisi) shuffle edilir. Yani egere islem sonucunda ortaya cikan sonuc kucuk ise (bizde kucuk, sadece karakter sayisi) shuffle cok az olacaktir. 

Diger taraftan reduce-side islemlerde ise once butun data shuffle edilir ve istenen islem bu butun data uzerinde gerceklestirilir. groupByKey yaptigimizda tum data gruplara ayrilacak ve tekrar partition olacaktir. Bu da buyuk miktarda shuffle demek olacaktir. 

Yani eger bizim gerceklestirmek istedigimiz islem divide-and-conquer tarzi, datanin kucuk parcaciklarina uygulanip, sonuc alinip, daha sonra bu sonuclar birlestirilerek gerceklestirilebiliyor ise, yani:
- karakter sayisi alma
- en buyuk/kucuk n elemani bulma

gibi islemler buna ornek olabilir, reduceByKey ile bu islemi yapmak cok buyuk performans artisi getirecektir.

Sonuc olarak bu sekilde karakter counterlari bulabiliyoruz. Ancak her bir karakter icin yeni bir kayit (tuple) olusturuyoruz. Ki bu da executor memorysi uzerinde bir baski olusturabilir (cluster setupina gore belki cok az memoryimiz olabilir). Burada kucuk bir optimizasyon yapilabilir. 


2. Tuple optimizasyonu

Her bir karakterden bir tuple olusturup, bunlari toplamak yerine, text dosyasindaki bir satiri komple isleyip buradan 4 farkli (key, n) seklinde tupleler olusturabiliriz. 

Bu sayede olusturdugumuz ara-data miktarini azaltabilir ve executor uzerindeki memory pressure'i azaltabiliriz. 

def processDnaLine2(line: String): Array[(Char, Int)] = 
    if(line.startsWith(">")){
        Array(('z', 1))      // eger yorum satiri ise bunu z karakteri ile temsil ediyoruz
    }
    else {
        // satir uzerinde scala'ya ait olan grupBy calistirdigimizda, her bir distinct karakter uzerinden
        // gruplama yapacak ve bu distinct karakterden kac adet oldugunu bulabilecegiz
        // bu islemin sonucunda, satirda kac tane distinct karakter varsa, (4 olmasi gerek)
        // o sayida tuple uretiyor olacagiz.
        line.groupBy(c => c.toLower)
              .map(e => (e._1, e._2.length))
             .toArray
    }

val charCounts = rddDna.flatMap(processDnaLine2).reduceByKey(_+_)

Bu kucuk optimizasyon disinda hersey ayni oldugu icin, ben cok fazla bir performans artisi gormedim ama eger input data cok daha buyuk olursa, executorlardaki peak memory'de bir fark gormemiz olasi.


3. mapPartitions

Ilk cozumde her bir karakter icin tuple olsuturup, keyleri ayni olanlari topladik. Ikinci cozumde her bir satiri isleyerek, satir basina tupleler olusturduk ve topladik. Buradaki sikinti su ki, yaptigimiz islemleri cok kucuk datalar uzerinde yapiyoruz ve cikan sonuclari tekrardan driver'a gondermemiz gerekiyor. Burada hem shuffle oluyor hem de driver kendi basina bircok sonucu birlestirmek zorunda kaliyor. 

Executor'lar uzerine daha cok yuk bindirebilmek ve mumkun oludugunca sayma islemini execuyor uzerinde lokal olarak yapip sonucu driver'a gondermemiz gerekiyor. 

Bu noktada, yardimizia mapPartitions metodu yetisiyor. Zaten her bir partition tek bir makine uzerinde bulunuyor. Yani bir partition iki makineye dagilmis sekilde olmaz. Bu durumda biz her bir data partitionu uzerinde sayma islemini yaparsak, sonuclari driver'a gonderirsek, minimum shuffle yapmis oluruz ve de executor uzerinde lokal olarak islemin cogunlugu gerceklesmis olur. 

Her bir data partitionu icerisinde birden fazla satir var. Yani bu son yaklasimda, birden fazla satirdaki distinct karakter sayisini tek seferde sayip, sonuclari toplayacagiz. Birden fazla satir dedigim de, toplamda 19 milyon satir uzerinden, bendeki 43 partition (lokalde calistiriyorum, core sayisina bagli olarak degisiklik gosterebilir) uzerine dagitildiginda, 441bin satir yapmaktadir. 

def processPartition(partition: Iterator[String]): Iterator[(Char, Int)] = {
    // bu sefer iteratif olarak satirlari tarayacagiz 
    / /ve her bir karakter sayini bu dictionary icerisinde bir artiracagiz.
    val distinctCharCounts = mutable.Map.empty[Char, Int].withDefaultValue(0)

    // partition icerisindeki kayitlar (dna satirlari) uzerinde donelim
    partition.foreach(dnaLine =>
        if(dnaLine.startsWith(">")){
            distinctCharCounts("z") += 1
        }
        else {
            // satir icindeki karakterler uzerinde donerek,
            // her bir karakterin sayisini 1 artiralim
            dnaLine.foreach(chr => distinctCharCounts(chr)+=1)
        }

    // mapPartitions metodu iterator dondurmemiz istiyor
    distinctCharCounts.iterator
}

// ve yine her bir partitionu isledikten sonra
// olusan (key, val) degerlerini key bazli olarak toplayarak sonucu ekde ediyoruz
val charCounts = rddDna.mapPartitions(processPartition)reduceByKey(_+_)

Bu son cozumde goruldugu uzere, her bir partitionu lokal olarak isleyip, her bir partition'dan sadece 4 tane tuple uretmis olduk. Boylece driver'a gonderilmesi gereken tuple sayisi cok azalmis oldu, shuffle azalmis oldu. 

Bu metot ile toplam calisma zamanini 4.5 dakikadan 1.5 dakikaya indigine ben sahit oldum. 


Data Summary Pattern

Ozet olarak, eger buyuk miktardaki data'dan, kucuk (condensed) bilgi cikarmak gerekiyorsa, ve de yapilacak islem divide-and-conquer seklinde once kucuk parcalara uygulanip sonra birlestirilebilecek bir islem ise, mapPartitions kullanarak bunu executor seviyesinde lokal olarak yapmak cok buyuk performans artisi saglayacaktir. 


Diger bir data pattern'de gorusmek uzere, hoscakalin. 


Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

SD #1: Scalability

Threat Modeling 1