<properties>
    <scala.version>2.12</scala.version>
    <spark.version>3.2.1</spark.version>
    <hadoop.version>3.3.1</hadoop.version>
    <jackson.version>2.12.3</jackson.version>
    <s3.version>1.12.77</s3.version>
</properties>

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-hive_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_${scala.version}</artifactId>
        <version>${spark.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-core</artifactId>
        <version>${jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-databind</artifactId>
        <version>${jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>com.fasterxml.jackson.core</groupId>
        <artifactId>jackson-annotations</artifactId>
        <version>${jackson.version}</version>
    </dependency>
    <dependency>
        <groupId>com.amazonaws</groupId>
        <artifactId>aws-java-sdk-s3</artifactId>
        <version>${s3.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-aws</artifactId>
        <version>${hadoop.version}</version>
    </dependency>
    <dependency>
        <groupId>commons-httpclient</groupId>
        <artifactId>commons-httpclient</artifactId>
        <version>3.1</version>
    </dependency>
    <dependency>
        <groupId>org.elasticsearch</groupId>
        <artifactId>elasticsearch-hadoop</artifactId>
        <version>7.10.2</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.hbase.connectors.spark</groupId>
        <artifactId>hbase-spark</artifactId>
        <version>1.0.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-*</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>com.thoughtworks.paranamer</groupId>
        <artifactId>paranamer</artifactId>
        <version>2.8</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.2</version>
    </dependency>
</dependencies>
def main(args: Array[String]): Unit = {
  val Array(url, username, password, table) = args
  val sparkConf = new SparkConf().setAppName("Spark Mysql Demo (Scala)")
  val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  val props = new Properties()
  props.setProperty("user", username)
  props.setProperty("password", password)

  val df: DataFrame = spark.read.jdbc(url, table, props)
  val rowNumbers: Long = df.count()
  println("Total row count: " + rowNumbers)

  // the names passed to select/where are MySQL column names
  df.select("id").where("id >= 3").show()

  // write data back to MySQL
  // df.write.mode(SaveMode.Append).jdbc(url, "tb_02", props)
  spark.stop()
}
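The write path is only hinted at in the commented-out line above. A minimal sketch of appending a DataFrame back to MySQL, assuming the same url and props and the target table "tb_02" taken from that comment:

import org.apache.spark.sql.SaveMode

// Append the selected rows into another table; SaveMode.Append keeps the existing data.
val toWrite: DataFrame = df.select("id").where("id >= 3")
toWrite.write
  .mode(SaveMode.Append)
  .jdbc(url, "tb_02", props) // "tb_02" comes from the commented-out example above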
def main(args: Array[String]): Unit = {
  val Array(brokers, topics, interval, groupId) = args
  val sparkConf = new SparkConf().setAppName("Spark Kafka Demo (Scala)")
  val ssc = new StreamingContext(sparkConf, Seconds(interval.toInt))

  // Kafka parameters
  val kafkaParams = Map[String, Object](
    "bootstrap.servers" -> brokers,
    "key.deserializer" -> classOf[StringDeserializer],
    "value.deserializer" -> classOf[StringDeserializer],
    "group.id" -> groupId,
    "auto.offset.reset" -> "earliest",
    "enable.auto.commit" -> (false: java.lang.Boolean)
  )

  // create the direct stream of messages
  val messages = KafkaUtils.createDirectStream[String, String](
    ssc,
    LocationStrategies.PreferConsistent,
    ConsumerStrategies.Subscribe[String, String](Array(topics), kafkaParams)
  )

  // word count
  val lines = messages.map(_.value)
  val words = lines.flatMap(_.split(" "))
  val wordCounts = words.map(x => (x, 1L)).reduceByKey(_ + _)
  wordCounts.print()

  // Start the computation
  ssc.start()
  ssc.awaitTermination()
}
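The pom above also pulls in spark-sql-kafka-0-10, which is the Structured Streaming Kafka source rather than the DStream API used here. As a rough sketch of the same word count with Structured Streaming (the placeholder broker/topic values and console sink are illustrative, not from the original):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions._

val spark = SparkSession.builder().appName("Spark Kafka Structured Demo (Scala)").getOrCreate()
import spark.implicits._

// brokers/topics mirror the arguments of the DStream demo above; values here are placeholders
val brokers = "localhost:9092"
val topics = "test"

// Read the topic as an unbounded streaming DataFrame
val df = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokers)
  .option("subscribe", topics)
  .option("startingOffsets", "earliest")
  .load()

// Kafka values arrive as binary; cast to String before splitting into words
val wordCounts = df.selectExpr("CAST(value AS STRING) AS value")
  .select(explode(split($"value", " ")).as("word"))
  .groupBy("word")
  .count()

// Print running counts to the console; "complete" mode re-emits the full aggregate each trigger
val query = wordCounts.writeStream
  .outputMode("complete")
  .format("console")
  .start()
query.awaitTermination()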
def main(args: Array[String]): Unit = {
  val Array(src, partition, dest) = args
  val sparkConf: SparkConf = new SparkConf().setAppName("Spark HDFS Demo (Scala)")

  // 1. create the session
  val session: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  // 2. create an RDD from the SparkContext
  val sc: SparkContext = session.sparkContext
  val file: RDD[String] = sc.textFile(src, partition.toInt) // minPartitions must be an Int
  file.saveAsTextFile(dest)
  session.stop()
}
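The pom also includes hadoop-aws and aws-java-sdk-s3, which none of the demos use directly. A minimal sketch of pointing the same copy job at S3 through the s3a:// scheme, where the credential values and bucket paths are illustrative assumptions:

// Configure the S3A filesystem on the existing SparkContext (credentials are placeholders)
sc.hadoopConfiguration.set("fs.s3a.access.key", "YOUR_ACCESS_KEY")
sc.hadoopConfiguration.set("fs.s3a.secret.key", "YOUR_SECRET_KEY")
sc.hadoopConfiguration.set("fs.s3a.endpoint", "s3.amazonaws.com")

// The same textFile/saveAsTextFile calls work once src and dest use the s3a:// scheme,
// e.g. src = "s3a://my-bucket/input", dest = "s3a://my-bucket/output"
val s3File: RDD[String] = sc.textFile("s3a://my-bucket/input", partition.toInt)
s3File.saveAsTextFile("s3a://my-bucket/output")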
def main(args: Array[String]): Unit = {
  val Array(zookeeper, rootdir, master, table) = args
  val sparkConf: SparkConf = new SparkConf().setAppName("Spark HBase Demo (Scala)")
  val spark: SparkSession = SparkSession.builder().config(sparkConf).getOrCreate()

  val hbaseConfig: Configuration = HBaseConfiguration.create()
  hbaseConfig.set("hbase.zookeeper.quorum", zookeeper)
  hbaseConfig.set("hbase.rootdir", rootdir)
  hbaseConfig.set("hbase.master", master)
  // set the table to scan
  hbaseConfig.set(TableInputFormat.INPUT_TABLE, table)

  val stuRDD: RDD[(ImmutableBytesWritable, Result)] = spark.sparkContext.newAPIHadoopRDD(
    hbaseConfig,
    classOf[TableInputFormat],
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result]
  )
  val count: Long = stuRDD.count()
  println("Students RDD Count:" + count)
  stuRDD.cache()

  // iterate and print the row keys
  stuRDD.foreach { case (_, result) =>
    val key: String = Bytes.toString(result.getRow)
    println("Row key:" + key)
  }
  spark.stop()
}
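This demo only reads from HBase. A minimal write sketch using the standard TableOutputFormat path, where the "info:name" column family/qualifier and the sample rows are assumptions rather than part of the original:

import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableOutputFormat
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.mapreduce.Job

// Reuse the hbaseConfig built above and point the output at the same table
hbaseConfig.set(TableOutputFormat.OUTPUT_TABLE, table)
val job = Job.getInstance(hbaseConfig)
job.setOutputFormatClass(classOf[TableOutputFormat[ImmutableBytesWritable]])
job.setOutputKeyClass(classOf[ImmutableBytesWritable])
job.setOutputValueClass(classOf[Put])

// Sample rows; the "info" column family is assumed to exist in the target table
val puts = spark.sparkContext.parallelize(Seq(("row-1", "Tom"), ("row-2", "Jerry")))
  .map { case (rowKey, name) =>
    val put = new Put(Bytes.toBytes(rowKey))
    put.addColumn(Bytes.toBytes("info"), Bytes.toBytes("name"), Bytes.toBytes(name))
    (new ImmutableBytesWritable(), put)
  }
puts.saveAsNewAPIHadoopDataset(job.getConfiguration)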
def main(args: Array[String]): Unit = {
  val Array(user, password, esIp, clusterName, index) = args
  val sparkConf: SparkConf = new SparkConf().setAppName("Spark Es Demo (Scala)").setMaster("local[*]")
  sparkConf.set("cluster.name", clusterName)
  sparkConf.set("es.internal.es.cluster.name", clusterName)
  sparkConf.set("es.internal.es.version", "7.12") // avoids security_exception: action [cluster:monitor/main] is unauthorized
  sparkConf.set("es.index.auto.create", "true")
  sparkConf.set("es.nodes", esIp)
  sparkConf.set("es.port", "9200")
  sparkConf.set("es.mapping.date.rich", "false")
  sparkConf.set("es.index.read.missing.as.empty", "true")
  sparkConf.set("es.net.http.auth.user", user) // ES username
  sparkConf.set("es.net.http.auth.pass", password) // ES password
  sparkConf.set("es.nodes.wan.only", "true")
  sparkConf.set("es.index.read.allow.red.status", "true") // avoids security_exception: action [cluster:monitor/health] is unauthorized

  val sc = new SparkContext(sparkConf)
  write2Es(sc, index)
  read2Es(sc, index)
  sc.stop()
}

def write2Es(sc: SparkContext, index: String): Unit = {
  val numbers: Map[String, String] = Map(
    "jsIp" -> "11111", "address" -> "11111", "enterprise" -> "北京",
    "xian" -> "11111", "ip" -> "11111", "source" -> "11111",
    "sheng" -> "11111", "phone" -> "11111", "shi" -> "11111",
    "ipLong" -> "333", "time" -> "2022-12-27 09:56:50",
    "qsIp" -> "11111", "contacts" -> "11111", "email" -> "11111@163.com"
  )
  val rdd: RDD[Map[String, Any]] = sc.makeRDD(Seq(numbers))
  EsSpark.saveToEs(rdd, s"${index}/_doc")
  println("--------------------End-----------------")
}

def read2Es(sc: SparkContext, index: String): Unit = {
  val rdd: RDD[(String, collection.Map[String, AnyRef])] = EsSpark.esRDD(sc, s"${index}/_doc")
  println("------------------rdd.count():" + rdd.count())
  rdd.foreach { line =>
    val key: String = line._1
    val value: collection.Map[String, AnyRef] = line._2
    println("------------------key:" + key)
    for ((key1, value1) <- value) {
      println("------------------key1:" + key1)
      println("------------------value1:" + value1)
    }
  }
}
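EsSpark.esRDD also accepts a query string, which pushes the filter down to Elasticsearch instead of pulling the whole index back into Spark. A small sketch, assuming the same resource layout as above; the field name enterprise comes from the document written in write2Es and the match value is only illustrative:

import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

def readWithQuery(sc: SparkContext, index: String): Unit = {
  // Only documents matching the query are shipped back to Spark
  val query = """{"query": {"match": {"enterprise": "北京"}}}"""
  val rdd: RDD[(String, collection.Map[String, AnyRef])] =
    EsSpark.esRDD(sc, s"${index}/_doc", query)
  println("matched docs: " + rdd.count())
}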
I hope this helps anyone reading this article. Remember to follow, comment, and bookmark. Thank you!