Spark #12 : Kay-value metodlari


Evet pekcok transformasyonu gorduk, bircok aksyionu tanidik. Ancak bazi durumlarda islemleri optimize etmek icin kay-value pair'lerinden faydalanabiliyoruz. 

Pair RDD'ler uzerinde uygulanabilecek ekstra metotlar su sekildedir:

collectAsMap
Normal siradan collect gibi ama key-val'lerden bir map (dictionary) olusturuyor.

mapValues / flatMapValues
sadece value tarafindan islem goren map fonksyonu.

reduceByKey
Ayni key'e sahip elemanlar uzerinde bir reduce operasyonu calistiriliyor. Taa ilk bastaki word count orneginde bu metot ile ayni kelimeden metin icerisinde kac tane gectigini hesaplamistik. Tekrardan bir ornek yaparsak,

Ayni key'e sahip olan elemanlar tek bir partition'a sigmiyor olabilir. Hatta ayni makine uzerinde bile olmayabilir. Bu durumda bir all-to-all operasyon ile data shuffle edilir ve ayni key'e sahip elemanlar tum partitionlar'dan okunarak reduce edilir. 

foldByKey / aggregateByKey
fold ve aggregate metotlari gibi olsa da key ile gruplanmis halleri gini dusunulebilir. Ayrica fold ve aggergate birer action iken, bu metotlar birer transformasyondur. Shuffle gerektirme ihtimalleri vardir. 

combineByKey
Yukaridaki tum metotlarin atasi pasasi budur. Ancak cogu zaman bu derece bir abstract metot ihtiyac dahilinde degildir. 

join
2 RDD'yi ayni sql'de iki tabloyu join eder gibi join etmek mumkun. Cok basit bir ornek ile:

scala> val t = sc.parallelize(Seq(("a", 1), ("a", 2), ("a", 3), ("b", 5), ("c", 6)))
t: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[0] at parallelize at <console>:24

scala> val q = sc.parallelize(Seq(("a", 12), ("b", 99)))
q: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[3] at parallelize at <console>:24

scala> t.join(q).collect
res2: Array[(String, (Int, Int))] = Array((a,(1,12)), (a,(2,12)), (a,(3,12)), (b,(5,99)))

Burada dikkatimizi ceken sey, default olarak inner join kullanilmasidir. Yani her iki RDD'de de olan elemanlardan olusan bir sonuc elde ediyoruz. Bunun yaninda fullOuterJoin, leftOuterJoin, rightOuterJoin gibi seceneklerimiz de mevcut. (bunlar da birer metot)

cogroup / groupWith
Bu da RDD'leri key'ler uzerinden birlestiren bir metot. Ayni key'e sahip olan elemanlar ayni satirda gruplaniyor. Yukaridaki ornekte kullandigimiz t ve q RDD'leri uzerinden :

scala> t.cogroup(q).collect
res3: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((a,(CompactBuffer(1, 2, 3),CompactBuffer(12))), (b,(CompactBuffer(5),CompactBuffer(99))), (c,(CompactBuffer(6),CompactBuffer())))

Buradaki CompactBuffer ise array tarzindan bir veri tipi. Yani sonuc olarak iki RDD uzerinde bulunan tum key'leri barindiran ve her bir key icin iki RDD'nin de katkilarini ayri ayri (her satir) gruplayan bir sonuc elde ediyoruz. 





Yorumlar

Bu blogdaki popüler yayınlar

Python'da Multithreading ve Multiprocessing

Threat Modeling 1

Encoding / Decoding