Spark #7 : Data okuma
Datayi arap faikten aliyorduk
Spark bircok farkli mecradan data okuyabilmektedir. Normal file systems (lokal makine veya uzak makine), Amazon S3 ve HDFS gibi distributed file systems, database, distributed database ve hatta direk olarak memory'den de data yuklenebilmektedir. Spark, datanin cigerini sokmektedir.
Ayrid dosya formati olarak da, text, csv gibi plain formatlar oldugu gibi, parquet (dassagini yedigimiz) ve avro gibi de daha gelismis formatlar okunabilir.
Tatavamizi kenara birakip shell'imizi aciyoruz:
> sc.parallelize(1 to 100)
> res0: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:25
Goruldugu gibi 1'den 100'e kadar olan sayilari barindiran bir listeyi RDD icerisine yukledik. Burada bir kucuk detay da spark-shell bu olusturulan RDD'yi biz bir degiskene atamasak da, res0 adli degiskene atadi. Buna daha sonra atifta bulunabiliriz:
> res0.collect
> res1: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100)
Burada parallelize metodu ile listeyi distribute ettik ve daha sonra da collect metodu ile spark-shell driver'a geri topladik.
Burada siranin da korundugunu gormekteyiz. Tum RDD traverse islemleri order'i korur, map, filter, flatMap gibi. Bunun yaninda join, partitionBy, sortBy gibi islemler dogasi geregi order'i degistirir.
Bu arada datanin kac partition'a ayrildigini gormek icin res0.partitions.size metodunu calistirabiliriz. Bu sayi, hafizadan yuklenen datalarda clusterdaki core sayisina gore belirlenir. Bende 6 cikiyor o acidan. Hatta her partition'da barinan datayi da listeleyebiliriz. Simdilik biz lokalde calistiyoruz ama normalde bu partition'lar, cluster'daki makinelere dagilmis olarak bulunacakti. Birtek bana mi cok heyecan verici geliyor bu aq :D
scala> res0.partitions.size
res2: Int = 6
Her bir partiondaki data:
scala> res0.foreachPartition(p => println(p.mkString(",")))
1,2,3,4,5,6,7,8,9,10,11,12,13,14,15,16
34,35,36,37,38,39,40,41,42,43,44,45,46,47,48,49,50
17,18,19,20,21,22,23,24,25,26,27,28,29,30,31,32,33
84,85,86,87,88,89,90,91,92,93,94,95,96,97,98,99,100
67,68,69,70,71,72,73,74,75,76,77,78,79,80,81,82,83
51,52,53,54,55,56,57,58,59,60,61,62,63,64,65,66
Tabi burada orderin korunmadigini goruyoruz. Hatta bur foreachPartition metodunu birden fazla kez calistirdiginizda partition'larin orderinin da degistigini gorebilirsiniz. Bu noktayi belki biraz daha arastirmak gerekebilir. Eee, usta olmak kolay degil yegenim.
Dosyadan Okuma ve Okutma
Ama genellikle hafizadan degil de dosyadan gelir datalar. spark-shell'de iken spark.read. dedikten sonra tab'a basarsaniz gelen seceneklerden gorecegiz ki oldukca fazla secenek mevcut.
spark.read.parquet, spark.read.textFile, spark.read.csv bazi ornekler.
Ama bunlarin alayi sc.hadoopRDD adli metoda dayaniyor aslinda. Bu metod cok fazla config aliyor. Bunu kullanarak henuz desteklenmeyen formatlari dahi okutabiliyormusuz (denemedim). Ama su var ki mesela bu metod ile bir XML dosyasi okutmaya calisirsak bize hadoop.streaming.StreamXmlRecordReader adli bir class'a ihtiyac duyariz. Bu da standart spark dagitiminda olmadigi icin ekstra olarak build.sbt'ye eklmemiz ve spark projesini ona gore FAT JAR olarak derlememiz gerekir. Bunu icin de bize bir sbt plug-inin gerekiyor, sbt-assembly.
Sbt Assembly
Bu plugin sayesinde spark ve diger ekstra tum kutuphaneleri tek bir jar icerisinde (Fat Jar ya da Uber Jar adi verilir) toplayip direk spark-submit ile kullanabilir hale geliyoruz.
Plug-ini kurmak icin, kok dizine project klasorune assembly.sbt adli bir dosya olusturup:
addSbtPlugin("com.eed3si9n" % "sbt-assembly" % "0.14.10")
ekliyoruz. Daha sonra build.sbt dosyasina da su eklemeyi yaparsak tamamdir.
name := "spark-deneme"
version := "0.1"
scalaVersion := "2.11.12"
val sparkVersion = "2.3.4"
libraryDependencies ++= Seq(
"org.apache.spark" %% "spark-core" % sparkVersion % Provided,
"org.apache.spark" %% "spark-sql" % sparkVersion % Provided,
"org.apache.hadoop" % "hadoop-streaming" % "2.7.0"
)
assemblyJarName in assembly := s"${name.value.replace(' ', '-')}-${version.value}.jar"Daha sonra sbt assembly komutu ile fat jar'imizi olusturabiliriz. Bunu da daha sonra diger tum ekstra kutuphaneleri de barindiracak sekile spark-submit ile clustera'a gondereibliriz.
to be dewamked...
Yorumlar
Yorum Gönder