Spark——JDBC操作MySQL
创始人
2024-05-20 15:33:10
0

文章目录

    • JDBC操作MySQL
    • JDBC读取数据方式
    • JDBC读取MySQL数据

JDBC操作MySQL

在实际的企业级开发环境中,如果数据规模特S别大,此时采用传统的SQL语句去处理的话一般需要分成很多批次处理,而且很容易造成数据库服务宕机,且实际的处理过程可能会非常复杂,通过传统的Java EE等技术可能很难或者不方便实现处理算法,此时采用SparkSQL进行分布式分析处理就可以非常好的解决该问题,在生产环境下,一般会在Spark SQL和具体要操作的DB之间加上一个缓冲层次,例如中间使用Redis或者Kafka。

Spark SQL可以通过JDBC从传统的关系型数据库中读写数据,读取数据后直接生成的是DataFrame,然后再加上借助于Spark SQL丰富的API来进行各种操作。从计算数据规模的角度去讲,集群并行访问数据库数据,调用Data Frame Reader的Format(“JDBC”)的方式说明Spark SQL操作的数据来源是通过JDBC获得,JDBC后端一般都是数据库,例如MySQL、Oracle等。

JDBC读取数据方式

  1. 单Partition(无并发)

    调用函数格式:def jdbc(url: String, table: String, properties: Properties): DataFrame

    • url:代表数据库的JDBC链接地址;
    • table:具体要链接的数据库;

    这种方法是将所有的数据放在一个Partition中进行操作(即并发度为1),意味着无论给的资源有多少,只有一个Task会执行任务,执行效率比较慢,并且容易出现OOM。使用如下,在spark-shell中执行:

    /*此为代码格式,实际中使用应替换相应字段中的内容*/
    val url = "jdbc:mysql://localhost:/database"        
    val tableName = "table"
    // 设置连接用户&密码
    val prop = new java.util.Properties
    prop.setProperty("user","username")  //实际使用中替换username为相应的用户名
    prop.setProperty("password","pwd")   //实际使用中替换pwd为相应的密码
    
  2. 根据Long类型字段分区

    /*此为代码格式,实际中使用应替换相应字段中的内容*/
    def jdbc(
    url: String,
    table: String,
    columnName: String, // 根据该字段分区,需要为整型,比如 id 等
    lowerBound: Long, // 分区的下界
    upperBound: Long, // 分区的上界
    numPartitions: Int,  //分区的个数
    connectionProperties: Properties): DataFrame
    

    根据字段将数据进行分区,放进不同的Partition中,执行效率较快,但是只能根据数据字段作为分区关键字。使用如下:

    /*此为代码格式,实际中使用应替换相应字段中的内容*/
    val url = "jdbc:mysql://mysqlHost:3306/database"
    val tableName = "table"
    val columnName = "colName"
    val lowerBound = 1,
    val upperBound = 10000000,
    val numPartitions = 10,
    // 设置连接用户&密码
    val prop = new java.util.Properties
    prop.setProperty("user","username")
    prop.setProperty("password","pwd")
    

    将字段 colName 中发 1~10000000 条数据分区到 10 个 Partition 中。

  3. 根据任意类型字段分区

    /*此为代码格式,实际中使用应替换相应字段中的内容*/
    jdbc(
    url: String,
    table: String,
    predicates: Array[String],
    connectionProperties: Properties): DataFrame
    

    以下使用时间字段进行分区:

    /*此为代码格式,实际中使用应替换相应字段中的内容*/
    val url = "jdbc:mysql://mysqlHost:3306/database"
    val tableName = "table"
    // 设置连接用户&密码
    val prop = new java.util.Properties
    prop.setProperty("user","username")
    prop.setProperty("password","pwd")
    /**
    * 将 9 月 16-12 月 15 三个月的数据取出,按时间分为 6 个 partition
    * 为了减少事例代码,这里的时间都是写死的
    * modified_time 为时间字段
    */
    val predicates =
    Array(
    "2015-09-16" -> "2015-09-30",
    "2015-10-01" -> "2015-10-15",
    "2015-10-16" -> "2015-10-31",
    "2015-11-01" -> "2015-11-14",
    "2015-11-15" -> "2015-11-30",
    "2015-12-01" -> "2015-12-15"
    ).map {
    case (start, end) =>
    s"cast(modified_time as date) >= date '$start' " + s"AND cast(modified_time
    as date) <= date '$end'"
    }
    

    这种方法可以使用任意字段进行分区,比较灵活,适用于各种场景。以MySQL 3000W数据量为例,如果单分区count,若干分钟就会报OOM;如果分成5~20个分区后,count操作只需要2s,效率会明显提高,这里就凸显出JDBC高并发的优势。Spark高并发度可以大幅度提高读取以及处理数据的速度,但是如果设置过高(大量的Partition同时读取)也可能会将数据源数据库宕掉。

JDBC读取MySQL数据

下面来进行实际操作,首先需要配置MySQL

  • 免密登陆:mysql -uroot
  • 查看数据库:show databases;
  • 使用MySQL数据库:use mysql;

修改表格的权限,目的是为了使其他主机可以远程连接 MySQL,通过此命令可以查看访问用户允许的主机名。

  • 查看所有用户及其host:select host, user from user;
  • 将相应用户数据表中的host字段改成’%':update user set host="%" where user="root";
  • 刷新修改权限flush privileges;

通过命令修改host为%,表示任意IP地址都可以登录。出现ERROR 1062 (23000): Duplicate entry '%-root' for key 'PRIMARY',是因为 user+host 是主键,不能重复,可以不用理会。也可通过以下命令删除user 为空的内容来解决:delete from user where user='';

在MySQL创建数据库和表格,插入数据,查看

create database test;    //创建数据库test
use test;        //进入数据库test
create table people( name varchar(12), age int);         //创建表格people并构建结构
insert into people values ("Andy",30),("Justin",19),("Dela",25),("Magi",20),("Pule",21),("Mike",12);        //向people表中插入数据
select * from people;         //输出people表中全部数据

编写代码读取MySQL表中数据

//导入依赖环境
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.{DataFrame, SQLContext}
import java.util.Properties
val url = "jdbc:mysql://localhost/test"       //MySQL地址及数据库
val username = "root"       //用户名
val sqlContext = new SQLContext(sc)       
sc.setLogLevel("WARN")
val uri = url + "?user=" + username + "&useUnicode=true&characterEncoding=UTF-8"           //设置读取路径及用户名
val properties = new Properties()      //创建JDBC连接信息
properties.put("user","root")
properties.put("driver", "com.mysql.jdbc.Driver")
val df_test: DataFrame = spark.sqlContext.read.jdbc(uri, "people", properties)  //读取数据
df_test.select("name","age").collect().foreach(row => {         //输出数据println("name " + row(0) + ", age" + row(1))
})
df_test.write.mode("append").jdbc(uri,"people",properties)     //向people表中写入读出的数据,相当于people表中有两份一样的数据

相关内容

热门资讯

让人心碎到哭的网名 让人心碎到哭的网名  人总是会经历各种各样的情绪,当我们感到伤心的总是会想要通过各种渠道来摆脱伤心之...
有关梦的网名大全 有关梦的网名大全  网名指在网上使用的名字。由于网络是一个虚拟的世界,为了避免使用真实姓名带来的麻烦...
最好听的男生微信名字 最好听的男生微信名字  一、什么是网名  网名指在网上使用的名字。由于网络是一个虚拟的世界,为了避免...
张自忠将军殉国处 张自忠将军殉国处  区位及景点简介  在湖北襄樊宜城市十里长山上,有一座巍峨的纪念碑,上刻"张上将自...
公司授权委托书 公司授权委托书范本(通用5篇)  公司授权委托书一般包括委托内容、代理人具体信息以及单位名称、双方签...
圭峰山风景名胜区 圭峰山风景名胜区圭峰山风景名胜区座落在新会市会城镇的北面,离市区约3公里,最高山峰达440米,山势雄...
空间动态签名 空间动态签名精选  我很爱笑,我是个活泼可爱的`女孩。  我很爱哭,我是个多愁善感的女孩。  我很爱...
湘菜馆名字 湘菜馆名字  一、什么是名字  名字是指人或者产品、物体的名称,姓名有广义与狭义之分,还有小名、别名...
闹心的微信个性签名 闹心的微信个性签名大全  随着社交平台的兴起,越来越多人喜欢在网上设置个性签名,借助个性签名可以表现...
潮名字微信网名英文 潮名字微信网名英文  微信英文网名(精选270个)  网名指在网上使用的名字。由于网络是一个虚拟的世...
王者荣耀名字2400个 王者荣耀名字2400个  一、王者荣耀简介  《王者荣耀》是由腾讯游戏天美工作室群开发并运行的一款运...
唯美个性签名档 唯美个性签名档  十年以前,你是我的朋友,十年之后,我们就变成陌生人了,唯美个性签名档。下面unjs...
qq名片个性签名汇总   谁若真心对我,我便用命去珍惜她。下面是由yjbys小编为你精心编辑的一份qq名片个性签名汇总,欢...
最新发布的经典个性签名 最新发布的经典个性签名大全  随着社交网络的快速发展,越来越多人喜欢在网上设置个性签名,用以展示自己...
独一无二的二字昵称500个 独一无二的二字昵称500个  什么是网名  网名指在网上使用的名字。由于网络是一个虚拟的世界,为了避...
情人节个性签名 情人节个性签名(精选15篇)  随着网络社交的悄悄演进,越来越多人钟情于在社交平台上不定时地更换个性...
推进全民读书活动 推进全民读书活动推进全民读书活动促进社会主义核心价值体系建设王春霞文化是一个民族的根,一个民族的魂,...
简单有创意的车队名字350个 简单有创意的车队名字350个  名字的作用  姓名诱导性格倾向  不同的姓名诱导着不同特点的的性格。...
秦始皇帝陵博物院 秦始皇帝陵博物院欢迎大家来到举世闻名的秦始皇帝陵博物院。它是以秦始皇兵马俑博物馆为基础,以秦始皇帝陵...
湖北省普通话等级考试报名时间... 湖北省2010年普通话等级考试报名时间通知  一、报名  1、凡自愿参加普通话水平测试的个人可直接到...