Palīdziet vietnes attīstībai, daloties ar rakstu ar draugiem!

Ievads Spark Transformations

Transformācija ir funkcija, kas atgriež jaunu RDD, modificējot esošo(s) RDD. Ievades RDD netiek modificēts, jo RDD ir nemainīgi. Visas transformācijas Spark izpilda slinkā veidā – rezultāti netiek aprēķināti uzreiz. Pārveidojumu aprēķināšana notiek tikai tad, kad tiek veikta noteikta darbība RDD.

Spark transformāciju veidi

Tās ir plaši iedalītas divos veidos:

  • Šaura transformācija: visi dati, kas nepieciešami ierakstu aprēķināšanai vienā nodalījumā, atrodas vienā vecāk-RDD nodalījumā. Tas notiek šādu metožu gadījumā:

karte(), flatMap(), filtrs(), paraugs(), savienība() utt.

  • Plaša transformācija: visi dati, kas nepieciešami ierakstu aprēķināšanai vienā nodalījumā, atrodas vairākos vecāku RDD nodalījumos. Tas notiek šādu metožu gadījumā:

distinct(), groupByKey(), reductionByKey(), join() , repartition() utt.

Spark transformāciju piemēri

Šeit mēs apspriežam tālāk minētos piemērus.

1. Šauras pārvērtības

  • map(): šī funkcija izmanto funkciju kā parametru un piemēro šo funkciju katram RDD elementam.

Kods:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
"val rdd=sc.parallelise(masīvs(10,15,50,100))
"println(Pamata RDD ir:)
">"rdd.foreach(x=print(x+ ))
println()
>val rddNew=rdd.map(x=x+10)
"println(RDD pēc MAP metodes pielietošanas:)
""rddNew.foreach(x=>print(x+ ))

Izvade:

Iepriekš minētajā MAP metodē katrs elements tiek pievienots ar 10, un tas tiek atspoguļots izvadē.

  • FlatMap(): tā ir līdzīga kartei, taču tā var ģenerēt vairākus izvades vienumus, kas atbilst vienam ievades vienumam. Tādējādi funkcijai ir jāatgriež secība, nevis viens vienums.

Kods:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=jauns SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelise(Masīvs(1:2:3,4:5:6))
">"val rddNew=rdd.flatMap(x=x.split(:))
"rddNew.foreach(x=>print(x+ ))

Izvade:

Šī funkcija tika nodota kā parametrs sadala katru ievadi ar “:” un atgriež masīvu, un FlatMap metode izlīdzina masīvu.

  • filter(): Tā izmanto funkciju kā parametru un atgriež visus RDD elementus, kuriem funkcija atgriež true.

Kods:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
""val rdd=sc.parallelise(Array(com.whatsapp.prod,com.facebook.prod,com.instagram.prod,com.whatsapp.test))
""println(Pamata RDD ir:)
">"rdd.foreach(x=print(x+ ))
println()
>"val rddNew=rdd.filter (x=!x.contains(test))
"println(RDD pēc MAP metodes pielietošanas:)
""rddNew.foreach(x=>print(x+ ))

Izvade:

Iepriekš minētajā kodā mēs ņemam virknes, kurās nav vārda “test”.

  • sample(): Tas atgriež daļu datu, ar vai bez aizstāšanas, izmantojot noteiktu nejaušo skaitļu ģeneratora sākumu (tas gan nav obligāti).

Kods:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=jauns SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelise(Masīvs(1,2,3,4,5,6,7,10,11,12,15,20,50))
val rddNew=rdd.sample(false,.5)
"rddNew.foreach(x=>print(x+ ))

Izvade:

Iepriekš minētajā kodā mēs iegūstam izlases veida paraugus bez aizstāšanas.

  • union(): Tas atgriež avota RDD un RDD savienojumu, kas nodots kā parametrs.

Kods:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=jauns SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelise(Masīvs(1,2,3,4,5))
val rdd2=sc.parallelize(Masīvs(-1,-2,-3,-4,-5))
val rddUnion=rdd.union(rdd2)
"rddUnion.foreach(x=>print(x+ ))

Izvade:

Rezultātā RDD rddUnion ir visi elementi no rdd un rdd2.

2. Plašas pārvērtības

  • distinct(): Šī metode atgriež atšķirīgos RDD elementus.

Kods:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
"val rdd=sc.parallelise(masīvs(1,1,3,4,5,5,5))
"println(Pamata RDD ir:)
">"rdd.foreach(x=print(x+ ))
println()
val rddNew=rdd.distinct()
"println(RDD pēc MAP metodes pielietošanas:)
""rddNew.foreach(x=>print(x+ ))

Izvade:

izvadā tiek iegūti atšķirīgie elementi 4,1,3,5.

  • groupByKey(): Šī funkcija ir piemērojama pāru RDD. Pāra veida RDD ir tāds, kura katrs elements ir kortežs, kur pirmais elements ir atslēga, bet otrais elements ir vērtība. Šī funkcija sagrupē visas atslēgai atbilstošās vērtības.

Kods:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=jauns SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Masīvs((a,1),(b,2),(a,3),(b,10),(a,100)))
"val rddNew=rdd.groupByKey()
"rddNew.foreach(x=>print(x+ ))

Izvade:

Kā paredzēts, visas taustiņu “a” un “b” vērtības ir sagrupētas kopā.

  • reduceByKey(): Šī darbība ir piemērojama arī pāra veida RDD. Tajā tiek apkopotas vērtības katrai atslēgai saskaņā ar piegādāto samazināšanas metodi, kurai ir jābūt šāda veida (v,v)=v.

Kods:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=jauns SparkContext(conf)
"sc.setLogLevel(WARN)
""val rdd=sc.parallelize(Masīvs((a,1),(b,2),(a,3),(b,10),(a,100),(c,50)))
">val rddNew=rdd.reduceByKey((x,y)=x+y )
"rddNew.foreach(x=>print(x+ ))

Izvade:

Iepriekšminētajā gadījumā mēs summējam visas atslēgas vērtības.

  • join(): Savienojuma darbība ir piemērojama pāra veida RDD. Savienošanas metode apvieno divas datu kopas, kuru pamatā ir atslēga.

Kods:

"val conf=new SparkConf().setMaster(local).setAppName(testApp)"
val sc=SparkContext.getOrCreate(conf)
"sc.setLogLevel(ERROR)
""val rdd1=sc.parallelise(Masīvs((atslēga1,10),(key2,15),(key3,100)))
""val rdd2=sc.parallelise(Masīvs((atslēga2,11),(key2,20),(key1,75)))
"val rddJoined=rdd1.join(rdd2)
"println(RDD pēc pievienošanās:)
""rddJoined.foreach(x=>print(x+ ))

Izvade:

  • repartition(): Tas nejauši sadala RDD datus to nodalījumu skaitā, kas nodoti kā parametri. Tas var gan palielināt, gan samazināt nodalījumus.

Kods:

"val conf=new SparkConf().setAppName(test).setMaster(local)"
val sc=jauns SparkContext(conf)
"sc.setLogLevel(WARN)
"val rdd=sc.parallelise(masīvs(1,2,3,4,5,10,15,18,243,50),10)
"println(Sadaļas pirms: +rdd.getNumPartitions)
"val rddNew=rdd.repartition(15)
"println(Sadaļas pēc: +rddNew.getNumPartitions)"

Izvade:

Iepriekš minētajā gadījumā mēs palielinām nodalījumus no 10 uz 15.

Palīdziet vietnes attīstībai, daloties ar rakstu ar draugiem!

Kategorija: