【大数据】Spark读取Hbase/Elasticsearch/Kafka/Mysql等数据源
创始人
2024-06-02 16:00:24
0

spark读取数据源

  • 配置maven依赖
  • Spark读取mysql数据
    • 环境信息
    • 代码
  • spark读取kafka
    • 环境信息
    • 代码
  • spark读取hdfs数据
    • 环境信息
    • 代码
  • spark读取Hbase数据源
    • 环境信息
    • 代码
  • spark读取es数据源
    • 环境信息
    • 代码

配置maven依赖

2.123.2.13.3.12.12.31.12.77org.apache.sparkspark-sql_${scala.version}${spark.version}org.apache.sparkspark-hive_${scala.version}${spark.version}org.apache.sparkspark-streaming_${scala.version}${spark.version}org.apache.sparkspark-streaming-kafka-0-10_${scala.version}${spark.version}org.apache.sparkspark-sql-kafka-0-10_${scala.version}${spark.version}com.fasterxml.jackson.corejackson-core${jackson.version}com.fasterxml.jackson.corejackson-databind${jackson.version}com.fasterxml.jackson.corejackson-annotations${jackson.version}com.amazonawsaws-java-sdk-s3${s3.version}org.apache.hadoophadoop-aws${hadoop.version}commons-httpclientcommons-httpclient3.1org.elasticsearchelasticsearch-hadoop7.10.2org.apache.sparkspark-*org.apache.hbase.connectors.sparkhbase-spark1.0.0org.apache.hadoophadoop-hdfsorg.apache.sparkspark-*com.thoughtworks.paranamerparanamer2.8org.slf4jslf4j-log4j121.7.2

Spark读取mysql数据

环境信息

  • mysql地址
  • 用户名
  • 密码
  • 库表

代码

def main(args: Array[String]): Unit = {val Array(url, username, password, table) = argsval sparkConf = new SparkConf().setAppName("Spark Mysql Demo (Scala)")val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()val props = new Properties()props.setProperty("user", username)props.setProperty("password", password)val df: DataFrame = spark.read.jdbc(url, table, props)val rowNumbers: Long = df.count()println("数据总条数据: " + rowNumbers)// select 里的参数位为 mysql 的字段df.select("id").where("id >= 3").show()// 写入数据// df.write.mode(SaveMode.Append).jdbc(url,"tb_02",props)spark.stop()}

spark读取kafka

环境信息

  • brokers地址
  • topics信息
  • 消费组id

代码

  def main(args: Array[String]): Unit = {val Array(brokers, topics, interval, groupId) = argsval sparkConf = new SparkConf().setAppName("Spark Kafka Demo (Scala)")val ssc = new StreamingContext(sparkConf, Seconds(interval.toInt))// kafka参数val kafkaParams = Map[String, Object]("bootstrap.servers" -> brokers,"key.deserializer" -> classOf[StringDeserializer],"value.deserializer" -> classOf[StringDeserializer],"group.id" -> groupId,"auto.offset.reset" -> "earliest","enable.auto.commit" -> (false: java.lang.Boolean))// 消息val messages = KafkaUtils.createDirectStream[String, String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String, String](Array(topics), kafkaParams))// 单词统计val lines = messages.map(_.value)val words = lines.flatMap(_.split(" "))val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)wordCounts.print()// Start the computationssc.start()ssc.awaitTermination()}

spark读取hdfs数据

环境信息

  • 源地址
  • 读取的分区数
  • 目标地址

代码

  def main(args: Array[String]): Unit = {val Array(src, partition, dest) = argsval sparkConf: SparkConf = new SparkConf().setAppName("Spark HDFS Demo (Scala)")// 1、创建sessionval session: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()// 2、依据sc创建rddval sc: SparkContext = session.sparkContextval file: RDD[String] = sc.textFile(src, partition)file.saveAsTextFile(dest)session.stop()}

spark读取Hbase数据源

环境信息

  • zk的地址
  • zk上的hbase设置的rootDir
  • hbase的master地址
  • table

代码

  def main(args: Array[String]): Unit = {val Array(zookeeper, rootdir, master, table) = argsval sparkConf: SparkConf = new SparkConf().setAppName("Spark HBase Demo (Scala)")// 支持 hive 读写val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()val hbaseConfig: Configuration = HBaseConfiguration.create()hbaseConfig.set("hbase.zookeeper.quorum",zookeeper)hbaseConfig.set("hbase.rootdir", rootdir)hbaseConfig.set("hbase.master", master)// 设置查询的表名hbaseConfig.set(TableInputFormat.INPUT_TABLE, table)val stuRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(hbaseConfig, classOf[TableInputFormat],classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],classOf[org.apache.hadoop.hbase.client.Result])val count: Long = stuRDD.count()println("Students RDD Count:" + count)stuRDD.cache()// 遍历输出stuRDD.foreach({ case (_, result) =>val key: String = Bytes.toString(result.getRow)println("Row key:" + key)})spark.stop()}

spark读取es数据源

环境信息

  • es的用户名
  • es的密码
  • es的服务地址
  • es的clusterName,主要是集群权限给关了。需要自己指定
  • es的index

代码

  def main(args: Array[String]): Unit = {val Array(user, password, esIp, clusterName, index) = argsval sparkConf: SparkConf = new SparkConf().setAppName("Spark Es Demo (Scala)").setMaster("local[*]")sparkConf.set("cluster.name", clusterName)sparkConf.set("es.internal.es.cluster.name", clusterName)sparkConf.set("es.internal.es.version", "7.12") // 防止 security_exception: action [cluster:monitor/main] is unauthorizedsparkConf.set("es.index.auto.create", "true")sparkConf.set("es.nodes", esIp)sparkConf.set("es.port", "9200")sparkConf.set("es.mapping.date.rich", "false")sparkConf.set("es.index.read.missing.as.empty", "true")sparkConf.set("es.net.http.auth.user", user) //访问es的用户名sparkConf.set("es.net.http.auth.pass", password) //访问es的密码sparkConf.set("es.nodes.wan.only", "true")sparkConf.set("es.index.read.allow.red.status", "true") // 防止 security_exception: action [cluster:monitor/health] is unauthorizedval sc = new SparkContext(sparkConf)write2Es(sc, index)read2Es(sc, index)sc.stop()}def write2Es(sc: SparkContext, index: String): Unit = {val numbers: Map[String, String] = Map("jsIp" -> "11111","address" -> "11111", "enterprise" -> "北京","xian" -> "11111", "ip" -> "11111","source" -> "11111", "sheng" -> "11111","phone" -> "11111", "shi" -> "11111","ipLong" -> "333", "time" -> "2022-12-27 09:56:50","qsIp" -> "11111", "contacts" -> "11111","email" -> "11111@163.com")val rdd: RDD[Map[String, Any]] = sc.makeRDD(Seq(numbers))EsSpark.saveToEs(rdd, s"${index}/_doc")println("--------------------End-----------------")}def read2Es(sc: SparkContext, index: String): Unit = {val rdd: RDD[(String, collection.Map[String, AnyRef])] = EsSpark.esRDD(sc, s"${index}/_doc")println("------------------rdd.count():" + rdd.count())rdd.foreach(line => {val key: String = line._1val value: collection.Map[String, AnyRef] = line._2println("------------------key:" + key)for (tmp <- value) {val key1: String = tmp._1val value1: AnyRef = tmp._2println("------------------key1:" + key1)println("------------------value1:" + value1)}})}

希望对正在查看文章的您有所帮助,记得关注、评论、收藏,谢谢您

相关内容

热门资讯

我的心愿四年级100字作文怎... 篇一:我的心愿四年级100字作文怎么写42篇第一篇内容:我希望成为一名优秀的画家我是一个喜欢画画的孩...
感动的事四年级作文300字【... 感动的事四年级作文300字 篇一我的小伙伴我有一个特别好的小伙伴,他叫小明。他是一个非常善良和乐于助...
回乡偶书【优选4篇】 回乡偶书 篇一 回乡偶书 篇二回乡偶书 篇三 重庆市忠县实验小学四年级二班:申晓航         ...
想象四年级作文400字(精简... 想象四年级作文400字 篇一我的未来世界我是一位四年级的学生,我有一个梦想,那就是拥有一个充满科技和...
四年级作文【精彩6篇】 四年级作文 篇一我喜欢的动物我喜欢的动物是小狗。小狗是一种可爱的动物,它们有着憨态可掬的外表和忠诚的...
四年级描写北京故宫的作文40... 四年级描写北京故宫的作文400字 篇一北京故宫是我国古代宫殿建筑的代表之一,它位于北京市中心,是中国...
四年级作文我的暑生活(通用6... 四年级作文我的暑生活 篇一我的暑假生活充满了欢乐和收获。这个暑假,我参加了许多有趣的活动,学到了很多...
玫瑰花和小雨珠四年级作文【通... 玫瑰花和小雨珠四年级作文 篇一玫瑰花和小雨珠我有一个好朋友,她叫小雨珠。她是我们班的班花,而我则是班...
我我快乐300字四年级作文8... 我快乐300字四年级作文 篇一我快乐的一天今天是我快乐的一天。早上我起床后,妈妈给我做了我最喜欢吃的...
我们这美丽的校园四年级作文(... 我们这美丽的校园四年级作文 篇一我热爱我的校园,因为它是如此美丽。每天早上,当我走进校园时,我都被各...
古老而神秘的海神庙四年级作文... 古老而神秘的海神庙四年级作文 篇一海神庙是一座古老而神秘的庙宇,位于一个小岛上。我和我的家人去参观了...
小书迷四年级作文【推荐6篇】 小书迷四年级作文 篇一看见书,我就忍不住心里狂喜。那是一种浓郁的书香,让我仿佛置身于一个神奇的世界。...
五一劳动节四年级作文(精选6... 五一劳动节四年级作文 篇一五一劳动节是每年的五月一日,也是我们最期待的假期之一。这个节日是为了表彰劳...
四年级写我的动物朋友的作文第... 四年级写我的动物朋友的作文第四单元 篇一我的宠物狗我家养了一只非常可爱的宠物狗,它的名字叫小黑。小黑...
热爱生命四年级作文【优秀6篇... 热爱生命四年级作文 篇一热爱生命生命是最宝贵的财富,我们每个人都应该热爱生命。生命只有一次,我们要珍...
以我发现了为题的作文【优选3... 以我发现了为题的作文 篇一我发现了一片神奇的花海今天,我偶然发现了一片令人惊叹的花海,仿佛置身于童话...
小学生四年级以安全为主题作文... 小学生四年级以安全为主题作文 篇一标题:保护自己的安全安全是我们生活中非常重要的一部分,我们每个人都...
小青蛙四年级作文(精彩6篇) 小青蛙四年级作文 篇一:我的好朋友小青蛙我有一个好朋友,他是一只小青蛙。他的名字叫小绿,因为他全身都...
爱的教育作文【精彩6篇】 爱的教育作文 篇一爱,是一种最基本的人类情感,也是一种最重要的教育方式。在教育过程中,爱是最有效的催...
清明扫墓的作文(最新6篇) 清明扫墓的作文 篇一清明扫墓是我国传统的节日之一,也是我最喜欢的节日之一。每年的清明节,我都会和家人...