RELATEED CONSULTING
相关咨询
选择下列产品马上在线沟通
服务时间:8:30-17:00
你可能遇到了下面的问题
关闭右侧工具栏

新闻中心

这里有您想知道的互联网营销解决方案
怎么理解spark的自定义分区和排序及spark与jdbc-创新互联

这篇文章将为大家详细讲解有关怎么理解spark的自定义分区和排序及spark与jdbc,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

创新互联坚持“要么做到,要么别承诺”的工作理念,服务领域包括:成都网站建设、成都网站设计、企业官网、英文网站、手机端网站、网站推广等服务,满足客户于互联网时代的儋州网站设计、移动媒体设计的需求,帮助企业找到有效的互联网解决方案。努力成为您成熟可靠的网络建设合作伙伴!
//自定义分区
import org.apache.spark.SparkConf
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.Partitioner
object PrimitivePartitionTest {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf
    conf.setMaster("local[2]").setAppName("Partitioner")
    val context = new SparkContext(conf)
    val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)
    //实例化类,并设置分区类
    val partitioner = new CustomPartitioner(2)
    val rdd1 = rdd.partitionBy(partitioner)
    rdd1.saveAsTextFile("c:\\partitioner")
    context.stop()    
  }
}
//自定义分区类继承spark的Partitioner
class CustomPartitioner(val partitions:Int ) extends Partitioner{
     
    def numPartitions: Int= this.partitions
   
    def getPartition(key: Any): Int={
      if(key.toString().length()<=2)
        0
      else
        1      
    }
}
//自定义排序
package hgs.spark.othertest
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import scala.math.Ordered
//自定义排序第一种实现方式,通过继承ordered
class Student(val name:String,var age:Int) extends Ordered[Student] with Serializable{
  def compare(that: Student): Int={
    return this.age-that.age
  }
}
class Boy(val name:String,var age:Int) extends  Serializable{
  
}
//第二种方式通过实现隐式转换实现
object MyPredef{
  implicit def toOrderBoy = new Ordering[Boy]{
   def compare(x: Boy, y: Boy): Int={
     x.age - y.age
   }
  }
}
//引入隐式转换
import MyPredef._
object CutstomOrder {
   def main(args: Array[String]): Unit = {
     val conf = new SparkConf()
     conf.setMaster("local[2]").setAppName("CutstomOrder")
     val context = new SparkContext(conf)
     val rdd = context.parallelize(List(("hgs",2),("wd",44),("cm",99),("zz",100),("xzhh",67)), 2)
     //下面的第二个参数false为降序排列
     //val rdd_sorted = rdd.sortBy(f=>new Student(f._1,f._2), false, 1)
     val rdd_sorted = rdd.sortBy(f=>new Boy(f._1,f._2), false, 1)
     rdd_sorted.saveAsTextFile("d:\\ordered")
     context.stop()
   } 
}
//JDBC
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
import org.apache.spark.rdd.JdbcRDD
import java.sql.Connection
import java.sql.DriverManager
import java.sql.ResultSet
import scala.collection.mutable.ListBuffer
object DataFromJdbcToSpark {
  def main(args: Array[String]): Unit = {
     val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("BroadCastTest")
    val context = new SparkContext(conf)
    val sql = "select name,age from test where id>=? and id <=?"
    var list = new ListBuffer[(String,Int)]()
    //第七个参数是一个自定义的函数,spark会调用该函数,完成自定义的逻辑,y的数据类型是ResultSet,该函数不可以想自己定义的数组添加数据,
    //应为应用的函数会将结果保存在JdbcRDD中
    val jdbcRDD = new JdbcRDD(context,getConnection,sql,1,8,2,y=>{
    (y.getString(1),y.getInt(2))       
    })
     
     println(jdbcRDD.collect().toBuffer)
     context.stop()
    
  }
  
    def getConnection():Connection={
    Class.forName("com.mysql.jdbc.Driver")
    val  conn = DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs","root","123456");
    conn
  }
}
//----------------------------------------------------------------------
package hgs.spark.othertest
import java.sql.Connection
import java.sql.DriverManager
import org.apache.commons.dbutils.QueryRunner
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
//将spark计算后的结果录入数据库
object DataFromSparktoJdbc {
  
  def main(args: Array[String]): Unit = {
    
    val conf = new SparkConf
    conf.setMaster("local[2]").setAppName("DataFromSparktoJdbc")
    val context = new SparkContext(conf)
    val addressrdd= context.textFile("d:\\words")
    val words = addressrdd.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_)
    //println(words.partitions.size)
    var p:Int =0
    words.foreachPartition(iter=>{
      //每个分区一个链接
      val qr = new QueryRunner()
      val conn = getConnection
      println(conn)
      val sql = s"insert into words values(?,?)"
      //可以修改为批量插入效率更高
      while(iter.hasNext){
        val tpm = iter.next()  
        val obj1 :Object = tpm._1
        val obj2 :Object = new Integer(tpm._2)
        //obj1+conn.toString()可以看到数据库的插入数据作用有三个不同的链接
        qr.update(conn, sql,obj1+conn.toString(),obj2)
      }
      //println(conn)
      //println(p)
      conn.close()
      
    })
    words.saveAsTextFile("d:\\wordresult")
  }
  def getConnection():Connection={
    Class.forName("com.mysql.jdbc.Driver")
    val  conn = DriverManager.getConnection("jdbc:mysql://192.168.6.133:3306/hgs","root","123456");
    conn
  }
  
}
//广播变量
package hgs.spark.othertest
import org.apache.spark.SparkConf
import org.apache.spark.SparkContext
object BroadCastTest{
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
    conf.setMaster("local[2]").setAppName("BroadCastTest")
    val context = new SparkContext(conf)
    val addressrdd= context.textFile("d:\\address")
    val splitaddrdd =     addressrdd.map(x=>{      
      val cs = x.split(",")
      (cs(0),cs(1))
    }).collect().toMap
    //广播变量,数据被缓存在每个节点,减少了节点之间的数据传送,可以有效的增加效率,广播出去的可以是任意的数据类型
    val maprdd = context.broadcast(splitaddrdd)
    val namerdd = context.textFile("d:\\name")
    
    val result = namerdd.map(x=>{
      //该出使用了广播的出去的数组
      maprdd.value.getOrElse(x, "UnKnown")      
    })
    println(result.collect().toBuffer)
    context.stop()
  }
}
其他一些知识点
1.spark 广播变量 rdd.brodcastz(rdd),广播变量的用处是将数据汇聚传输到各个excutor上面
	,这样在做数据处理的时候减少了数据的传输
2.wordcount程序
	context.textFile(args(0),1).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_) 
	wordcount程序代码,一个wordcount会产生5个RDD
	sc.textFile() 会产生两个RDD 1.HadoopRDD-> MapPartitionsRDD
	   flatMap() 会产生MapPartitionsRDD
	   map 会产生MapPartitionsRDD
	   reduceByKey 产生ShuuledRDD
	   saveAsTextFile
   
3.缓存数据到内存 rdd.cache   清理缓存 rdd.unpersist(true),rdd.persist存储及级别 cache方法调用的是persist方法
4.spark 远程debug,需要设置sparkcontext.setMaster("spark://xx.xx.xx.xx:7077").setJar("d:/jars/xx.jar")

关于怎么理解spark的自定义分区和排序及spark与jdbc就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。


当前题目:怎么理解spark的自定义分区和排序及spark与jdbc-创新互联
新闻来源:http://scpingwu.com/article/cshipe.html