Spark 9 : Transformations-2

 Baska bir transformations kosesinde daha birlikteyiz. 

mapPartitions
Bizim datamiz node'lar uzerinde cluster'da dagitik durumdadir. Bunun uzerinde bir map islemi yapar isek, her bir node, kendi uzerindeki data icin her bir elemana verilen transformasyonu uygulayacaktir. Ama mesela bir database baglantisi yapilmasi gerekiyor ise bu durumda her bir item icin yeni bir database baglantisi gerekecektir. Bu da performans acisindan problem dogurur. Ama mapPartitions kullanarak, verilen bir transformasyona o partition'daki tun datayi bir seferde verebiliriz. 

Cok uzattim ama boylelikle her bir partition icin sadece tek bir database baglantisi olusturma imkani yakalariz. 

> sc.parallelize(1 to 100)
res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25

> scala> res0.partitions.size
res1: Int = 6

scala> res0.mapPartitions(partition => {
    // partition oncesi hazirlik
    partition.map(item => /// biseyler yap)

    // partition islenmesi sonrasi, kapanis
})

buradaki partition'u arguman olarak alan callback fonksiyonu, o RDD kac tane partition varsa o sayida calistirilir. 

union
Iki veya daha fazla RDD birlestirilebilir.
rdd1.union(rdd2) seklinde yeni bir RDD olusturabiliriz.
Eger daha fazla RDD birlestirilecekse, sc.union(rdd1, rdd2, ... ) seklinde yapilabilir. 

Ek olarak yine benzer sekilde intersect ve substract islemleri de uygulanabilir.  

cartesian
Eger RDD'leri birer liste gibi dusunursek, cartesian metodu da iki RDD nin kombinasyonlarini elde etmek yani ic ice iki loop calistirmak gibi dusunulebilir. 

rdd0.map(r1 => rdd2.map(r2 => println(r1, r2)))

seklinde dusunulebilir ama aslinda yukaridaki yontem calismaz. Cunku bir map icerisinden diger bir RDD'ye ulasamayiz. Hemen bir ornek ile pekistirelim:

scala> val r1 = sc.parallelize(1 to 4)
scala> val r2 = sc.parallelize(Array("a", "b", "c"))
scala> r1.cartesian(r2).collect
res12: Array[(Int, String)] = Array((1,a), (1,b), (1,c), (2,a), (2,b), (2,c), (3,a), (3,b), (3,c), (4,a), (4,b), (4,c))

Seklinde sonucu gorebiliriz. 

zip
Iki RDD alip , (ayni uzunlukta) bunlari bir fermuar gibi kapatiyoruz ve tek bir pairRDD elde ediyoruz. 

scala> val r1 = sc.parallelize(1 to 3)
scala> val r2 = sc.parallelize(Array("a", "b", "c"))
scala> r1.zip(r2).collect
res13: Array[(Int, String)] = Array((1,a), (2,b), (3,c))

Seklinde ilk RDD'deki her bir elemani ikinci RDD'deki ayni index'te bulunan eleman ile kaynastirarak yeni bir pairRDD elde edilir.

Aksiyonlarda gorusmek uzere.


Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

Threat Modeling 1

Encoding / Decoding