【大数据】Spark读取Hbase/Elasticsearch/Kafka/Mysql等数据源
创始人
2024-06-02 16:00:24
0

spark读取数据源

  • 配置maven依赖
  • Spark读取mysql数据
    • 环境信息
    • 代码
  • spark读取kafka
    • 环境信息
    • 代码
  • spark读取hdfs数据
    • 环境信息
    • 代码
  • spark读取Hbase数据源
    • 环境信息
    • 代码
  • spark读取es数据源
    • 环境信息
    • 代码

配置maven依赖

2.123.2.13.3.12.12.31.12.77org.apache.sparkspark-sql_${scala.version}${spark.version}org.apache.sparkspark-hive_${scala.version}${spark.version}org.apache.sparkspark-streaming_${scala.version}${spark.version}org.apache.sparkspark-streaming-kafka-0-10_${scala.version}${spark.version}org.apache.sparkspark-sql-kafka-0-10_${scala.version}${spark.version}com.fasterxml.jackson.corejackson-core${jackson.version}com.fasterxml.jackson.corejackson-databind${jackson.version}com.fasterxml.jackson.corejackson-annotations${jackson.version}com.amazonawsaws-java-sdk-s3${s3.version}org.apache.hadoophadoop-aws${hadoop.version}commons-httpclientcommons-httpclient3.1org.elasticsearchelasticsearch-hadoop7.10.2org.apache.sparkspark-*org.apache.hbase.connectors.sparkhbase-spark1.0.0org.apache.hadoophadoop-hdfsorg.apache.sparkspark-*com.thoughtworks.paranamerparanamer2.8org.slf4jslf4j-log4j121.7.2

Spark读取mysql数据

环境信息

  • mysql地址
  • 用户名
  • 密码
  • 库表

代码

def main(args: Array[String]): Unit = {val Array(url, username, password, table) = argsval sparkConf = new SparkConf().setAppName("Spark Mysql Demo (Scala)")val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()val props = new Properties()props.setProperty("user", username)props.setProperty("password", password)val df: DataFrame = spark.read.jdbc(url, table, props)val rowNumbers: Long = df.count()println("数据总条数据: " + rowNumbers)// select 里的参数位为 mysql 的字段df.select("id").where("id >= 3").show()// 写入数据// df.write.mode(SaveMode.Append).jdbc(url,"tb_02",props)spark.stop()}

spark读取kafka

环境信息

  • brokers地址
  • topics信息
  • 消费组id

代码

  def main(args: Array[String]): Unit = {val Array(brokers, topics, interval, groupId) = argsval sparkConf = new SparkConf().setAppName("Spark Kafka Demo (Scala)")val ssc = new StreamingContext(sparkConf, Seconds(interval.toInt))// kafka参数val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> groupId,"auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean))// 消息val messages = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array(topics), kafkaParams))// 单词统计val lines = messages.map(_.value)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)wordCounts.print()// Start the computationssc.start()ssc.awaitTermination()}

spark读取hdfs数据

环境信息

  • 源地址
  • 读取的分区数
  • 目标地址

代码

  def main(args: Array[String]): Unit = {val Array(src, partition, dest) = argsval sparkConf: SparkConf = new SparkConf().setAppName("Spark HDFS Demo (Scala)")// 1、创建sessionval session: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()// 2、依据sc创建rddval sc: SparkContext = session.sparkContextval file: RDD[String] = sc.textFile(src, partition)file.saveAsTextFile(dest)session.stop()}

spark读取Hbase数据源

环境信息

  • zk的地址
  • zk上的hbase设置的rootDir
  • hbase的master地址
  • table

代码

  def main(args: Array[String]): Unit = {val Array(zookeeper, rootdir, master, table) = argsval sparkConf: SparkConf = new SparkConf().setAppName("Spark HBase Demo (Scala)")// 支持 hive 读写val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()val hbaseConfig: Configuration = HBaseConfiguration.create()hbaseConfig.set("hbase.zookeeper.quorum",zookeeper)hbaseConfig.set("hbase.rootdir", rootdir)hbaseConfig.set("hbase.master", master)// 设置查询的表名hbaseConfig.set(TableInputFormat.INPUT_TABLE, table)val stuRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])val count: Long = stuRDD.count()println("Students RDD Count:" + count)stuRDD.cache()// 遍历输出stuRDD.foreach({ case (_, result) =>val key: String = Bytes.toString(result.getRow)println("Row key:" + key)})spark.stop()}

spark读取es数据源

环境信息

  • es的用户名
  • es的密码
  • es的服务地址
  • es的clusterName,主要是集群权限给关了。需要自己指定
  • es的index

代码

  def main(args: Array[String]): Unit = {val Array(user, password, esIp, clusterName, index) = argsval sparkConf: SparkConf = new SparkConf().setAppName("Spark Es Demo (Scala)").setMaster("local[*]")sparkConf.set("cluster.name", clusterName)sparkConf.set("es.internal.es.cluster.name", clusterName)sparkConf.set("es.internal.es.version", "7.12") // 防止 security_exception: action [cluster:monitor/main] is unauthorizedsparkConf.set("es.index.auto.create", "true")sparkConf.set("es.nodes", esIp)sparkConf.set("es.port", "9200")sparkConf.set("es.mapping.date.rich", "false")sparkConf.set("es.index.read.missing.as.empty", "true")sparkConf.set("es.net.http.auth.user", user) //访问es的用户名sparkConf.set("es.net.http.auth.pass", password) //访问es的密码sparkConf.set("es.nodes.wan.only", "true")sparkConf.set("es.index.read.allow.red.status", "true") // 防止 security_exception: action [cluster:monitor/health] is unauthorizedval sc = new SparkContext(sparkConf)write2Es(sc, index)read2Es(sc, index)sc.stop()}def write2Es(sc: SparkContext, index: String): Unit = {val numbers: Map[String, String] = Map("jsIp" -> "11111","address" -> "11111", "enterprise" -> "北京","xian" -> "11111", "ip" -> "11111","source" -> "11111", "sheng" -> "11111","phone" -> "11111", "shi" -> "11111","ipLong" -> "333", "time" -> "2022-12-27 09:56:50","qsIp" -> "11111", "contacts" -> "11111","email" -> "11111@163.com")val rdd: RDD[Map[String, Any]] = sc.makeRDD(Seq(numbers))EsSpark.saveToEs(rdd, s"${index}/_doc")println("--------------------End-----------------")}def read2Es(sc: SparkContext, index: String): Unit = {val rdd: RDD[(String, collection.Map[String, AnyRef])] = EsSpark.esRDD(sc, s"${index}/_doc")println("------------------rdd.count():" + rdd.count())rdd.foreach(line => {val key: String = line._1val value: collection.Map[String, AnyRef] = line._2println("------------------key:" + key)for (tmp <- value) {val key1: String = tmp._1val value1: AnyRef = tmp._2println("------------------key1:" + key1)println("------------------value1:" + value1)}})}

希望对正在查看文章的您有所帮助,记得关注、评论、收藏,谢谢您

相关内容

热门资讯

常用商务英语口语   商务英语是以适应职场生活的语言要求为目的,内容涉及到商务活动的方方面面。下面是小编收集的常用商务...
六年级上册英语第一单元练习题   一、根据要求写单词。  1.dry(反义词)__________________  2.writ...
复活节英文怎么说 复活节英文怎么说?复活节的英语翻译是什么?复活节:Easter;"Easter,anniversar...
2008年北京奥运会主题曲 2008年北京奥运会(第29届夏季奥林匹克运动会),2008年8月8日到2008年8月24日在中华人...
英语道歉信 英语道歉信15篇  在日常生活中,道歉信的使用频率越来越高,通过道歉信,我们可以更好地解释事情发生的...
六年级英语专题训练(连词成句... 六年级英语专题训练(连词成句30题)  1. have,playhouse,many,I,toy,i...
上班迟到情况说明英语   每个人都或多或少的迟到过那么几次,因为各种原因,可能生病,可能因为交通堵车,可能是因为天气冷,有...
小学英语教学论文 小学英语教学论文范文  引导语:英语教育一直都是每个家长所器重的,那么有关小学英语教学论文要怎么写呢...
英语口语学习必看的方法技巧 英语口语学习必看的方法技巧如何才能说流利的英语? 说外语时,我们主要应做到四件事:理解、回答、提问、...
四级英语作文选:Birth ... 四级英语作文范文选:Birth controlSince the Chinese Governmen...
金融专业英语面试自我介绍 金融专业英语面试自我介绍3篇  金融专业的学生面试时,面试官要求用英语做自我介绍该怎么说。下面是小编...
我的李老师走了四年级英语日记... 我的李老师走了四年级英语日记带翻译  我上了五个学期的小学却换了六任老师,李老师是带我们班最长的语文...
小学三年级英语日记带翻译捡玉... 小学三年级英语日记带翻译捡玉米  今天,我和妈妈去外婆家,外婆家有刚剥的`玉米棒上带有玉米籽,好大的...
七年级英语优秀教学设计 七年级英语优秀教学设计  作为一位兢兢业业的人民教师,常常要写一份优秀的教学设计,教学设计是把教学原...
我的英语老师作文 我的英语老师作文(通用21篇)  在日常生活或是工作学习中,大家都有写作文的经历,对作文很是熟悉吧,...
英语老师教学经验总结 英语老师教学经验总结(通用19篇)  总结是指社会团体、企业单位和个人对某一阶段的学习、工作或其完成...
初一英语暑假作业答案 初一英语暑假作业答案  英语练习一(基础训练)第一题1.D2.H3.E4.F5.I6.A7.J8.C...
大学生的英语演讲稿 大学生的英语演讲稿范文(精选10篇)  使用正确的写作思路书写演讲稿会更加事半功倍。在现实社会中,越...
VOA美国之音英语学习网址 VOA美国之音英语学习推荐网址 美国之音网站已经成为语言学习最重要的资源站点,在互联网上还有若干网站...
商务英语期末试卷 Part I Term Translation (20%)Section A: Translate ...