【大数据】Spark读取Hbase/Elasticsearch/Kafka/Mysql等数据源
创始人
2024-06-02 16:00:24
0

spark读取数据源

  • 配置maven依赖
  • Spark读取mysql数据
    • 环境信息
    • 代码
  • spark读取kafka
    • 环境信息
    • 代码
  • spark读取hdfs数据
    • 环境信息
    • 代码
  • spark读取Hbase数据源
    • 环境信息
    • 代码
  • spark读取es数据源
    • 环境信息
    • 代码

配置maven依赖

2.123.2.13.3.12.12.31.12.77org.apache.sparkspark-sql_${scala.version}${spark.version}org.apache.sparkspark-hive_${scala.version}${spark.version}org.apache.sparkspark-streaming_${scala.version}${spark.version}org.apache.sparkspark-streaming-kafka-0-10_${scala.version}${spark.version}org.apache.sparkspark-sql-kafka-0-10_${scala.version}${spark.version}com.fasterxml.jackson.corejackson-core${jackson.version}com.fasterxml.jackson.corejackson-databind${jackson.version}com.fasterxml.jackson.corejackson-annotations${jackson.version}com.amazonawsaws-java-sdk-s3${s3.version}org.apache.hadoophadoop-aws${hadoop.version}commons-httpclientcommons-httpclient3.1org.elasticsearchelasticsearch-hadoop7.10.2org.apache.sparkspark-*org.apache.hbase.connectors.sparkhbase-spark1.0.0org.apache.hadoophadoop-hdfsorg.apache.sparkspark-*com.thoughtworks.paranamerparanamer2.8org.slf4jslf4j-log4j121.7.2

Spark读取mysql数据

环境信息

  • mysql地址
  • 用户名
  • 密码
  • 库表

代码

def main(args: Array[String]): Unit = {val Array(url, username, password, table) = argsval sparkConf = new SparkConf().setAppName("Spark Mysql Demo (Scala)")val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()val props = new Properties()props.setProperty("user", username)props.setProperty("password", password)val df: DataFrame = spark.read.jdbc(url, table, props)val rowNumbers: Long = df.count()println("数据总条数据: " + rowNumbers)// select 里的参数位为 mysql 的字段df.select("id").where("id >= 3").show()// 写入数据// df.write.mode(SaveMode.Append).jdbc(url,"tb_02",props)spark.stop()}

spark读取kafka

环境信息

  • brokers地址
  • topics信息
  • 消费组id

代码

  def main(args: Array[String]): Unit = {val Array(brokers, topics, interval, groupId) = argsval sparkConf = new SparkConf().setAppName("Spark Kafka Demo (Scala)")val ssc = new StreamingContext(sparkConf, Seconds(interval.toInt))// kafka参数val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> groupId,"auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean))// 消息val messages = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array(topics), kafkaParams))// 单词统计val lines = messages.map(_.value)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)wordCounts.print()// Start the computationssc.start()ssc.awaitTermination()}

spark读取hdfs数据

环境信息

  • 源地址
  • 读取的分区数
  • 目标地址

代码

  def main(args: Array[String]): Unit = {val Array(src, partition, dest) = argsval sparkConf: SparkConf = new SparkConf().setAppName("Spark HDFS Demo (Scala)")// 1、创建sessionval session: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()// 2、依据sc创建rddval sc: SparkContext = session.sparkContextval file: RDD[String] = sc.textFile(src, partition)file.saveAsTextFile(dest)session.stop()}

spark读取Hbase数据源

环境信息

  • zk的地址
  • zk上的hbase设置的rootDir
  • hbase的master地址
  • table

代码

  def main(args: Array[String]): Unit = {val Array(zookeeper, rootdir, master, table) = argsval sparkConf: SparkConf = new SparkConf().setAppName("Spark HBase Demo (Scala)")// 支持 hive 读写val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()val hbaseConfig: Configuration = HBaseConfiguration.create()hbaseConfig.set("hbase.zookeeper.quorum",zookeeper)hbaseConfig.set("hbase.rootdir", rootdir)hbaseConfig.set("hbase.master", master)// 设置查询的表名hbaseConfig.set(TableInputFormat.INPUT_TABLE, table)val stuRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])val count: Long = stuRDD.count()println("Students RDD Count:" + count)stuRDD.cache()// 遍历输出stuRDD.foreach({ case (_, result) =>val key: String = Bytes.toString(result.getRow)println("Row key:" + key)})spark.stop()}

spark读取es数据源

环境信息

  • es的用户名
  • es的密码
  • es的服务地址
  • es的clusterName,主要是集群权限给关了。需要自己指定
  • es的index

代码

  def main(args: Array[String]): Unit = {val Array(user, password, esIp, clusterName, index) = argsval sparkConf: SparkConf = new SparkConf().setAppName("Spark Es Demo (Scala)").setMaster("local[*]")sparkConf.set("cluster.name", clusterName)sparkConf.set("es.internal.es.cluster.name", clusterName)sparkConf.set("es.internal.es.version", "7.12") // 防止 security_exception: action [cluster:monitor/main] is unauthorizedsparkConf.set("es.index.auto.create", "true")sparkConf.set("es.nodes", esIp)sparkConf.set("es.port", "9200")sparkConf.set("es.mapping.date.rich", "false")sparkConf.set("es.index.read.missing.as.empty", "true")sparkConf.set("es.net.http.auth.user", user) //访问es的用户名sparkConf.set("es.net.http.auth.pass", password) //访问es的密码sparkConf.set("es.nodes.wan.only", "true")sparkConf.set("es.index.read.allow.red.status", "true") // 防止 security_exception: action [cluster:monitor/health] is unauthorizedval sc = new SparkContext(sparkConf)write2Es(sc, index)read2Es(sc, index)sc.stop()}def write2Es(sc: SparkContext, index: String): Unit = {val numbers: Map[String, String] = Map("jsIp" -> "11111","address" -> "11111", "enterprise" -> "北京","xian" -> "11111", "ip" -> "11111","source" -> "11111", "sheng" -> "11111","phone" -> "11111", "shi" -> "11111","ipLong" -> "333", "time" -> "2022-12-27 09:56:50","qsIp" -> "11111", "contacts" -> "11111","email" -> "11111@163.com")val rdd: RDD[Map[String, Any]] = sc.makeRDD(Seq(numbers))EsSpark.saveToEs(rdd, s"${index}/_doc")println("--------------------End-----------------")}def read2Es(sc: SparkContext, index: String): Unit = {val rdd: RDD[(String, collection.Map[String, AnyRef])] = EsSpark.esRDD(sc, s"${index}/_doc")println("------------------rdd.count():" + rdd.count())rdd.foreach(line => {val key: String = line._1val value: collection.Map[String, AnyRef] = line._2println("------------------key:" + key)for (tmp <- value) {val key1: String = tmp._1val value1: AnyRef = tmp._2println("------------------key1:" + key1)println("------------------value1:" + value1)}})}

希望对正在查看文章的您有所帮助,记得关注、评论、收藏,谢谢您

相关内容

热门资讯

最新公司年会主持词 最新公司年会主持词  主持词已成为各种演出活动和集会中不可或缺的一部分。在当下的社会中,主持词的实用...
毕业晚会主持词结束语 毕业晚会主持词结束语  以下是由应届毕业生网PQ小编为大家整理出来的2016年毕业晚会主持词结束语,...
启动会主持词 启动会主持词  利用在中国拥有几千年文化的诗词能够有效提高主持词的感染力。在现在的社会生活中,主持人...
七十大寿祝寿主持词最新 七十大寿祝寿主持词最新  根据活动对象的不同,需要设置不同的主持词。在当下的中国社会,主持词的实用频...
十一国庆节晚会主持词 十一国庆节晚会主持词  在国庆节需要举办相关的晚会,那么晚会的主持词应该怎么写呢?下面是小编分享给大...
中学秋季开学典礼主持词串词 中学秋季开学典礼主持词串词  暑假如同一部赏心悦目的电影,看完了,请你把美好的记忆珍藏在心中,开学日...
适合父亲节的歌曲 适合父亲节的歌曲适合父亲节的歌曲推荐Eric Clapton的.My Father's Eyes,很...
我们的节日端午主持稿 我们的节日端午主持稿(通用12篇)  在当下社会,很多地方都会使用到主持稿,主持稿具有语言过度自然、...
婚礼主持词开场白   第一篇:  尊敬的各位来宾,  很高兴接受一对新人的委托,主持今天这场爱情的盛典,我是主持人张 ...
幸福额度简介及台词 幸福额度简介及台词  《幸福额度》简介  《幸福额度》由陈正道执导,林志玲、陈坤、廖凡主演的爱情电影...
舞蹈大赛主持词 舞蹈大赛主持词集合七篇  利用在中国拥有几千年文化的诗词能够有效提高主持词的感染力。在人们积极参与各...
团代会闭幕词 团代会闭幕词精选3篇  今天,CN人才网小编给大家分享的是团代会闭幕词精选3篇,希望对大家有帮助。 ...
职工代表大会主持词 职工代表大会主持词  ×××××第二十一届四次职工代表大会  开幕式主持词  各位代表:  根据大会...
婚礼主持词(精选10篇) 婚礼主持词(精选10篇)  主持词的格式及写法  1.标题  一般情况下,只需写明“主持词”即可,也...
农村婚庆司仪主持词 农村婚庆司仪主持词  主持词可以采用和历史文化有关的表述方法去写作以提升活动的文化内涵。在现今人们越...
泰坦尼克号英文台词 泰坦尼克号英文台词  导语:泰坦尼克号以1912年泰坦尼克号邮轮在其处女启航时触礁冰山而沉没的事件为...
宋小宝的小品烤串台词 宋小宝的小品烤串台词  宋小宝是我国著名的喜剧演员,下面一起来欣赏宋小宝的小品烤串台词!欢迎阅读! ...
年度总结大会主持稿 年度总结大会主持稿(通用9篇)  总结是事后对某一时期、某一项目或某些工作进行回顾和分析,从而做出带...
会晚宴主持词 主题:突破·创新主持:A: B: C: D:放音乐 暖场1 暖场2晚会开始前五分钟 此音乐完毕后主持...
新世纪福音战士渚薰的经典台词 新世纪福音战士渚薰的经典台词  新世纪福音战士渚薰是具有生命之果的使徒,和凌波丽一样,是一个人造人。...