网站首页 > 博客文章 正文
Spring Boot集成Spark SQL进行大数据分析
一、大数据开发者的春天
最近一段时间学习Spark基础知识,Spark开发现在完成集成了SQL。具有SQL功底的同学可以无缝学习Spark,成为Spark开发工程师。
同Flink一样,Spark Streaming支持实时数据计算。Flink也完全兼容SQL,Hive更是数据仓库人员开发的圣剑。我曾经在电商公司做过专门的Hive SQL数据仓库开发,当时还是SSM架构,Hive定时任务作业。
现在Spring Boot称霸了Spring框架开发鳌头,由此衍生的Spring Cloud微服务架构,扩展出来Spring Cloud Alibaba架构。笔者曾经的文章都有讲到,本文分析Spring Boot集成Spark SQL开发的案例。
Spark SQL可以使用MySQL作为数据源,也可以使用Oracle、Hive作为数据源。本文采用Scala格式连接MySQL和Java格式连接Hive进行讲解,暂时连接Hive只测试了Java格式。
二、Maven项目引入
Maven工程引入Spark坐标,其版本号为
<java.version>1.8</java.version>
<scala.version>2.11</scala.version>
<spark.version>2.4.0</spark.version>
Spring Boot版本号为:
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.6.RELEASE</version>
<relativePath/>
</parent>
2.1构建Spring Boot项目
引入需要的Spring Boot依赖坐标:
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-aop</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
2.2引入Spark核心工具
引入Spark核心工具、Scala依赖
dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
引入Spark Streaming:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
引入MySQL数据库坐标:
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
</dependency>
<dependency>
<groupId>org.codehaus.janino</groupId>
<artifactId>janino</artifactId>
<version>3.0.8</version>
</dependency>
2.3引入Spark SQL坐标
引入Spark SQL依赖:
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
引入Spring Boot整合器
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-configuration-processor</artifactId>
<optional>true</optional>
</dependency>
三、主启动工程采用MySQL作为数据源的Spark SQL
主启动工程Scala语言启动,因为引入了Scala语法在,支持Scala启动:
@Configuration
@EnableAutoConfiguration
@ComponentScan
@SpringBootApplication
class SpApp
object springsparkdemoApplication extends App{
SpringApplication.run(classOf[SpApp])
}
3.1核心配置
核心Config配置也采用Scala形式,如下:
import org.springframework.context.annotation.{Bean, Configuration}
import org.apache.spark.{SparkConf, SparkContext}
@Configuration
class HSparkStreamconfig {
private var local= "local"
private var sparkHome = "."
private var sparkName = "sparkProd"
@Bean
def SparkConf: SparkConf = {
var spConf = new SparkConf().setAppName(sparkName).setMaster(local)
return spConf
}
@Bean
def SparkContext = new SparkContext(SparkConf)
}
3.2RestController层
3.2.1示例Controller
数据测试层采用Scala格式RestController如下:
@RestController
@RequestMapping (value = Array("SparkController/spark/"))
@CrossOrigin
class SparkController{
...
}
具体的方法:
@Autowired
var sc:SparkContext = _
@GetMapping(value = Array("prod"))
def prod=
{
val url = "jdbc:mysql://192.16.1.39:3306/taskEmmDB?useUnicode=true&characterEncoding=UTF-8&user=emm&password=wty";
val prop = new Properties();
val sqlContext = new SQLContext(sc);
val df = sqlContext.read.jdbc(url, "t_mem", prop);
df.createOrReplaceTempView("t_mem")
//使用MySQL语句进行查询
var sf = sqlContext.sql("select * from t_mem where p_id = 0")
println("1.------------->" + sf.show().toString())
//println("1.------------->" + sf.rdd.partitions.size)
JSON.parseFull("{honke:1}")
}
此种是Scala连接MySQL的测试Spark SQL
四、采用Hive SQL作为数据源
前面讲到了Spark SQL连接MySQL的用法,下面举例使用Java JDBC连接Hive的方法,Hive作为数据源
加入Hive配置坐标:
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-jdbc</artifactId>
<version>1.2.1</version>
</dependency>
目前测试可用Java JDBC的示例连接Hive为:
String url = "jdbc:hive2://192.16.1.41:10000/default";
try {
Class.forName("org.apache.hive.jdbc.HiveDriver");
} catch (ClassNotFoundException e) {
log.error("异常:", e);
}
Connection conn = DriverManager.getConnection(url,"hadoop","");
Statement stmt = conn.createStatement();
String sql = "SELECT * FROM t_mem limit 10";
ResultSet res = stmt.executeQuery(sql);
while(res.next()){
System.out.println("p_id: "+res.getInt(1)+"\tname: "+res.getString(2)+"\tage:" + res.getInt(3));
}
本文简单讲解了Spark SQL的用法,后面将继续推出Spring Boot集成Hive、集成Flink、微服务系列及阿里微服务系列,敬请期待。
猜你喜欢
- 2024-10-04 TP5 where数组查询(模糊查询--多个查询条件)
- 2024-10-04 sql基础(六)(sql基本知识点)
- 2024-10-04 必知的php数组函数(必知的php数组函数有哪些)
- 2024-10-04 C# 数据操作系列 - 16 SqlSugar 完结篇(最后的精华)
- 2024-10-04 会SQL语句,就能快速开放你的数据接口API
- 2024-10-04 VBA数组与字典解决方案第46讲:进行数据的模糊分类汇总
- 2024-10-04 sqlite3 支持JSON(Sqlite3 支持网络访问)
- 2024-10-04 HeidiSQL 免费的可视化数据库管理工具
- 2024-10-04 实例讲解MyBatisPlus自定义sql注入器方法
- 2024-10-04 【实用技能】Seacms 8.7版本SQL注入分析
你 发表评论:
欢迎- 最近发表
- 标签列表
-
- ifneq (61)
- 字符串长度在线 (61)
- googlecloud (64)
- messagesource (56)
- promise.race (63)
- 2019cad序列号和密钥激活码 (62)
- window.performance (66)
- qt删除文件夹 (72)
- mysqlcaching_sha2_password (64)
- ubuntu升级gcc (58)
- nacos启动失败 (64)
- ssh-add (70)
- jwt漏洞 (58)
- macos14下载 (58)
- yarnnode (62)
- abstractqueuedsynchronizer (64)
- source~/.bashrc没有那个文件或目录 (65)
- springboot整合activiti工作流 (70)
- jmeter插件下载 (61)
- 抓包分析 (60)
- idea创建mavenweb项目 (65)
- vue回到顶部 (57)
- qcombobox样式表 (68)
- tomcatundertow (58)
- pastemac (61)
本文暂时没有评论,来添加一个吧(●'◡'●)