开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Java 中的三大特性

发表于 2021-11-22

这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战

我们都知道 Java 中有三大特性,那就是继承 ,封装和多态 。那我今天我就来说说这几个特性 。

封装

老样子 ,先问问自己为什么会存在这些特性 。首先说封装 ,封装就是使用权限修饰符来实现对属性的隐藏 ,同时提供一些共有的 get / set 方法来对数据进行访问 。这样怎么实现封装的效果的呢 ?我们可以在 get 或 set 方法内部添加一些逻辑控制语句 。比方说在设置年龄的时候 ,我就限制传入的 age 不能大于 120 。代码实现可以这样写 。

1
2
3
4
5
6
7
8
9
10
11
12
csharp复制代码public void setAge(int age){
if(age > 120){
this.age = 120;
}
}
public int getAge(){
if(this.gender == "女"){
System.out.println("女孩子的年龄是你能看的?");
return -1;
}
return this.age;
}

这只是一个简单的实例 ,实际情况是可以优化的 。比方说对小于零的 age 进行限制 。所以我们可以看到对数据的封装有以下优点 ,对成员进行精密的控制且隐藏了实现细节 ,还可以自由的修改方法的内部实现 。

继承

就像是儿子继承老子的财富那样 ,子类也可以继承父类的属性和方法 ,但是有些是不能继承的 ,这要回到前面看一看访问修饰符的问题了 。在 Java 中只允许单继承 ,就是只能有一个老子 。继承是怎么实现的 ?为什么要有继承 ?我们通过 extends 关键字来标识继承的父类 。而且继承是可以传递的 ,A 继承 B ,B 继承 C ,那么 C 也是 A 的父类 。父类也叫超类 ,子类也叫基类 。Object 是所有类的超类 。在子类中调用和父类同名的方法时 ,执行的是子类的方法 ,而可以显示的使用 super 来调用父类的方法 。

继承的优点显而易见 ,可以增加代码的复用性 ,但是我们不能为了继承而去继承 ,还要注意继承和组合的概念 ,继承是 “ is a ” 的概念 ,比方说 cat is a animal . 而组合是 “ has a ” 的概念 ,比方说 our body has a leg . 以上两句话可以设计出如下的结构 。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
scala复制代码class Animal {
sleep(){
System.print.out("动物睡觉");
}
}
// 表示猫是动物
class Cat extends Animal{
sleep(){
System.print.out("猫睡觉");
}
}

class Leg{
}
// 表示身体里有腿
class Body{
Leg leg = new Leg();
}

多态

而最后一个就是多态 ,同样的问自己 ,什么是多态 ?多态有什么用 ?顾名思义 ,多态就是多种状态 。就像是同样的早晨有时候你是精神满满而有时候你是萎靡不振一样 ,我们在调用同一个方法的时候 ,很可能执行的情况也不一样 。这就是 Java 中说的多态 。那为什么会不一样呢 ?还记得刚刚说的子类继承父类的方法吧 ,子类在继承父类方法之后 ,可以对其进行重写 ,当我们调用重写的方法时 ,运行的结果就和父类的结果不一样 。所以说 ,你想要多态 ,先要有继承 。还有一种多态的形式 ,我们称之为 “ 虚拟方法调用 ” 又叫向上转型 ,说的简单一点就是父类的引用指向了子类对象 。像这样 ,父类 Animal 的 a 指向了 Cat 实体 。

1
2
scss复制代码Animal a = new Cat();
a.sleep() // 猫睡觉

需要注意的是在使用虚拟方法调用的时候不能调用父类独有的方法 ,这会在运行时报错 ,在编译时没事 ,编译的时候只编译等号左边 。但是在运行的会看具体的对象 ,在子类中找不到相应的方法自然会报错 。

总结

可以总结一下多态的作用了 ,我们使用多态可以达到解耦合和易扩展的目的 ,我们编写不同模块的功能 ,在调用的时候只需要将父类的引用变个方向就行 。只需提供稳定的子类实现 ,父类只负责调用相应的接口方法就行 。这也就是工厂设计模式的原理 。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

浅析SQL中的JOIN用法

发表于 2021-11-22

「这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战」

前言

  近期在操作数据库的时候,需要经多个表关联起来查询,在很多业务当中,也需要将多个表关联起来查询,相信第一个想法就是使用JOIN链接查询,那么你熟悉JOIN链接查询吗?

JOIN

  在编写SQL中,我们操作多个数据库表时总是使用JOIN进行链接。在实际开发中经常使用到JOIN进行数据库表联查。常用的方式有:INNER JOIN、LEFT JOIN、RIGHT JOIN、FULL JOIN、OUTER JOIN等形式。下面将与实际集合起来进行相关数据演示。

  本次将以MySQL中两个数据库表进行演示。建表SQL语句如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
js复制代码CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(255) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`email` varchar(255) DEFAULT NULL,
`create_time` datetime DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8mb4;

CREATE TABLE `user_subject` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`uid` int(11) DEFAULT NULL,
`subject` varchar(255) DEFAULT NULL,
`score` int(255) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=19 DEFAULT CHARSET=utf8mb4;

图片.png

图片.png

INNER JOIN

  INNER JOIN在SQL中是使用较多的内连接方式,也可以称作等值链接,主要作用是获取两个及两个以上表中相等的数据值。其语法格式一般为:

1
js复制代码SELECT * FROM 表A AS a INNER JOIN 表B AS b ON a.id=b.uid

本次测试使用的SQL如下:

1
js复制代码SELECT * FROM `user` as a INNER JOIN user_subject as b ON a.id=b.uid;

  通过INNER JOIN查询之后,可以发现user表中的小爱没有被查询到,user_subject表中的id为16、17、18的数据没有返回。表中只返回了user表中id与user_subject表中uid相同的列的数据信息。
图片.png
  可以得出如下图中红色部分就是INNER JOIN查询的数据信息。

图片.png

LEFT JOIN

  LEFT JOIN是左连接,主要作用是获取左表中的所有记录,即使在右表没有对应匹配的记录。其语法格式一般为:

1
js复制代码SELECT * FROM 表A as a  LEFT JOIN  表B as b ON a.id=b.uid;

本次测试使用的SQL如下:

1
js复制代码SELECT * FROM `user` as a  LEFT JOIN  user_subject as b ON a.id=b.uid;

  可以看到表user中的所有数据已经列出,而表user_subject中只有与user表相同值得数据被列出。而user表中的小度在表中user_subject没有数据也被列出了。
图片.png
  可以通过图形化理解LEFT JOIN,即包含A表中的所有数据,和包含A表中与B表值相同的数据。

图片.png

RIGHT JOIN

  RIGHT JOIN是右连接:与 LEFT JOIN 恰恰想反,用于获取右表中的所有记录,即使左表没有对应匹配的记录也会查询到。其语法格式一般为:

1
js复制代码SELECT * FROM 表A as a  RIGHT JOIN 表B as b ON a.id=b.uid;

本次测试使用的SQL如下:

1
js复制代码SELECT * FROM `user` as a  RIGHT JOIN user_subject as b ON a.id=b.uid;

  通过执行测试SQL 可以看到,表user_subject中所有的数据都已经被列出,而表user中与表user_subject匹配的数据也被列出来了。
图片.png

  可以通过图形化理解RIGHT JOIN,即包含B表中的所有数据,和包含B表中与A表值相同的数据。

图片.png

FULL JOIN

   FULL JOIN是全连接,主要作用是只要其中一个表中存在匹配,则返回所在行的数据信息。
其语法格式一般为:

1
js复制代码SELECT * FROM 表A as a  FULL JOIN 表B as b ON a.id=b.uid;

本次测试使用的SQL如下:

1
js复制代码SELECT * FROM `user` as a  FULL JOIN user_subject as b ON a.id=b.uid;

图片.png

结语

   好了,以上就是浅析SQL中的JOIN用法,感谢您的阅读,希望您喜欢,如对您有帮助,欢迎点赞收藏。如有不足之处,欢迎评论指正。下次见。

   作者介绍:【小阿杰】一个爱鼓捣的程序猿,JAVA开发者和爱好者。公众号【Java全栈架构师】维护者,欢迎关注阅读交流。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JVM——简单分析GC

发表于 2021-11-22

这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战」

前言

GC常用相关参数和堆内存使用情况 - 掘金 (juejin.cn)

这篇文章讲了GC的常用参数与查看堆内存情况

首先我们创建DemoTest类添加main方法,并添加jvm参数:-Xms20M -Xmx20M -Xmn10M -XX:+UserSerialGC -xx:+PrintGCDetails -verbose:gc

然后运行空的main方法,内容如下(具体意义看上篇文章)

image.png

分析GC

在main方法中创建一个ArrayList里面放入数组byte[],并使该对象为栈帧上的局部变量引用堆里面的对象,那么这个对象在运行期间由于GC Root引用所以它不会被回收。

然后往对象里面加入5M的byte数组,通过上面图看到Eden中已经被占用了百分之45,然后再放入5M显然Eden是存不下的这里肯定就会触发以下垃圾回收,代码如下

1
2
3
4
5
js复制代码    private static final int SEVEN_MB = 7 * 1024 * 1024;
public static void main(String[] args) {
ArrayList list = new ArrayList();
list.add(new Byte[SEVEN_MB]);
}

运行后结果:

1637590815(1).jpg

如果是老年代发生的GC最前面会提示Full GC,新生代发生的GC就叫做GC

DefNew 后面的两个值前面代表回收前的大小后面表示回收后的大小,括号中表示新生代的总大小,在后面跟着的数值就是代表新生代垃圾回收执行的时间。

执行时间之后的信息是整个堆执行前占用情况和执行后占用情况,括号中表示堆的总大小,再之后就是整个堆的回收时间

然后内存区域也发生了改变,Eden被占用了百分之就是,由于GC的时候幸存区From和幸存区To内存会发生交换,所以幸存区内存空间被占用了百分之六十五

image.png

继续往数组中添加byte数组,这里再添加 1M:

1
2
3
4
5
6
7
js复制代码    private static final int SEVEN_MB = 7 * 1024 * 1024;
private static final int _1MB = 1 * 1024 * 1024;
public static void main(String[] args) {
ArrayList list = new ArrayList();
list.add(new Byte[SEVEN_MB]);
list.add(new Byte[_1MB]);
}

再次运行查看GC信息:

image.png

这里触发了两次垃圾回收,老年代中已经被占用了百分之七十六的空间。

继续添加byte数组,这里再添加 15M

1
2
3
4
5
6
7
8
9
arduino复制代码private static final int SEVEN_MB = 7 * 1024 * 1024;
private static final int _15MB = 15 * 1024 * 1024;
private static final int _1MB = 1 * 1024 * 1024;
public static void main(String[] args) {
ArrayList list = new ArrayList();
list.add(new Byte[SEVEN_MB]);
list.add(new Byte[_1MB]);
list.add(new Byte[_15MB]);
}

image.png

再添加15M已经超出了我们设置的堆内存最大大小,所以抛出了堆内存溢出的错误信息,并且还进行了一次 Fubll GC。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

分布式事务解决方案 概念

发表于 2021-11-22

「这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战」。

概念

在分布式系统中,如果使用到事务就会涉及到分布式事务,例如订购业务,订购分为下单和减库存,

下单和减库存是两个独立的服务,如果下单成功,但是减库存失败,就会造成超卖的情况,这时候就要通过分布式事务来解决这种场景。

分布式事务解决方案

XA方案

XA方案也叫做两阶段提交事务方案,例如公司团建,会有个主席来负责组织团建。

第一个阶段,一般主席会提前一周问一下团队里的每个人,问他们去不去,这个时候主席会等待每个人的回答,如果所有人都ok,那么就可以决定一起去,如果任何一个人回答说不去,那么主席就会取消这次活动。

第二阶段就是大家一起去团建。

两阶段提交,有个事务管理器的概念,负责协调多个数据库(资源管理器)的事务,事务管理器会问各个数据库是否准备好,如果每个数据库都ok,就正式提交事务,在各个数据库上执行操作,如果存在数据库回答不ok,就会全部回滚事务。

这种分布式事务方案,比较适合单块应用里跨多个库的分布式事务,但这种方案严重依赖于数据库层面来搞定复杂的事务,所以不适合高并发场景。

TCC方案

TCC 全称是 Try、Confirm、Cancel

这个其实是用到了补偿的概念,分为了三个阶段:

  • Try阶段:这个阶段说的是各个服务的资源做检测以及对资源进行锁定或者预留。
  • Confirm阶段: 这个阶段说的是在各个服务中执行的实际的操作。
  • Cancel阶段:如果任何一个服务的业务方法执行出错,那么这里就需要进行补偿,就是执行已经执行成功的业务逻辑的回滚操作。

例如跨银行转账的时候,要涉及到两个银行的分布式事务,如果用TCC方案来实现,思路是这样的 :

  • try阶段:先把两个银行账户的资金给它冻结住不让操作。
  • confirm阶段:执行实际的转账操作,A银行账户资金扣减,B银行账户的资金增加
  • Cancel阶段: 如果任何一个银行的操作执行失败,那么就需要回滚进行补偿,例如A银行扣减成功,B银行资金增加失败,那就就需要A银行账户资金给加回去。

这种方案也几乎很少人使用,因为这个事务回滚实际上是严重依赖于你自己写代码来回滚和补偿,会造成补偿代码巨大。

这种方案适合一致性要求很高,属于系统中核心之核心的场景,例如常见的资金类场景,就可以用TCC方案。

可靠消息最终一致性方案

基于MQ来实现事务,例如阿里的RocketMQ支持消息事务。

  1. A系统先发送一个prepared消息到mq,如果这个prepared消息发送失败那么就直接取消操作别执行。
  2. 如果消息发送成功,就接着执行本地事务,如果成功就告诉mq发送确认消息,如果失败就告诉mq回滚消息。
  3. 如果发送了确认消息,此时b系统会接收到确认消息,然后执行本地的事务。
  4. mq会自动定时轮询所有prepared消息回调你的接口,询问消息是不是本地事务处理失败,所以没发送确认消息,那是继续重试还是回滚,一般来说这里可以查下数据库看之前本地事务是否执行,如果回滚,这里也会回滚,避免了可能本地事务执行成功,确认消息发送失败了。
  5. 要是系统B的事务失败了,就会进行重试,自动不断重试直到成功。如果实在不行,要么就针对重要的资金类进行回滚,例如B系统本地回滚,想办法通知A也回滚,,或者发送报警来手工回滚和补偿。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Kafka 消费者组 Rebalance 详细过程

发表于 2021-11-22

这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战

相关:Kafka 中的消费者组 | Kafka 之如何避免 Rebalance

Rebalance 的触发

之前的文章介绍了 Rebalance 触发的三大类情况,即主题发生变化、主题分区发生变化、消费者组成员发生变化。其中,实际生产中消费者组成员变化导致的 Rebalance 是最常见的,消费者组成员的依次启动也属于这种情况。

本文主要讨论消费者组成员发生变化时,Rebalance 的详细过程。

协调者

在 Rebalance 的过程,不仅需要消费者组内成员的相互协调,还需要一个叫做「协调者」的角色参与。

协调者,也叫做 Coordinator,它是一个专门为消费者群提供服务的角色,主要负责 Rebalance 的执行、位移管理、组成员管理等。

协调者存在于 Broker 中,每个消费者实例都有其对应的协调者,消费者实例在启动、提交位移等操作时,其实都是在向它对应的协调者所在的 Broker 发送请求。

消费者实例如果找到自己的协调者对应的 Broker 呢?前面我们说过,消费者实例的位移信息保存在 Kafka 内部创建的位移主题中,先找到当前消费者组的数据保存在位移主题的那个分区中,该分区的 Leader Replica 副本所在的 Broker 就是对应的协调者所在的 Broker。

消费者组的几种状态

了解了协调者这个角色,接下来介绍,协调者在管理消费者组成员的时候,会对消费者组标定状态。在协调者的视角,消费者组一共有 5 种状态:

  • Empty:表示组内没有任何成员,但可能存在已经提交的位移数据,而且这些数据还没有过期。
  • Dead:表示组内没有任何成员,而且元信息已经被移除。
  • PreparingRebalance:表示准备开始 Rebalance,所有消费成员都需要重新向协调者请求加入消费者组。
  • CompletingRebalance:表示所有的成员已经重新加入消费者组,正在等待分配方案。
  • Stable:表示稳定状态,也就是完成 Rebalance 后可以正常消费数据的状态。

他们的关系如下图:

流程图.jpg

Rebalance 的过程

当消费者组处于 Stable 状态时,消费者实例会定期向协调者发送心跳通知,起作用就是告诉协调者实例在正常运行,当有新成员计入、成员主动离开、成员失联被动离开的场景下,协调者会通过响应心跳请求的方式,告诉所有实例,要进行 Rebalance 了。

我们分别梳理一下协调者发起新一轮 Rebalance 前,这三种情况的流程。

  • 第一种,当有新的实例加入的时候,新加入的实例会向协调者发送 JoinGroup 请求,协调者收到后,会向所有实例响应新一轮 Rebalance 开始的信息。
  • 第二种,当有成员主动离开消费者组,离组的成语啊你会向协调者发送 LeaveGroup 的请求,协调者收到后,会向所有实例响应新一轮 Rebalance 开始的信息。
  • 第三种,协调者没有在规定时间内接收到某个成员的心跳通知,那么这个成员会被踢出消费者组,测试,协调者会向所有实例响应新一轮 Rebalance 开始的信息。

以上所说的开始新一轮 Rebalance 的信息,其实就是在心跳请求的响应中,告诉成员:Rebalance 要开始了,请重新申请加入消费者组。成员收到后,会重新发送 JoinGroup 请求。

当协调者收到所有成员的 JoinGroup 请求后,会在所有成员中选定一个 Leader,通常是第一个发起 JoinGroup 请求的成员。协调者会把所有成员及其订阅的主题,发给这个 Leader,由它来为每一个成员分配消费的主题分区。

在此过程中,组内的成员会向协调者发送 SyncGroup 请求,等待接收 Leader 分配好的方案。当所有的成员都接收到新的分配方案后,Rebalance 就完成了。

还有一个需要了解的地方是,当 Rebalance 开启的时候,协调者会给成员一定的时间,来提交自己当前的位移信息,然后在开始 JoinGroup 和 SyncGroup 请求。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spark SQL知识点详解与实战

发表于 2021-11-22

数据仓库系列文章

  1. 数仓架构发展史
  2. 数仓建模方法论
  3. 数仓建模分层理论
  4. 数仓建模—宽表的设计
  5. 数仓建模—指标体系
  6. 一文搞懂ETL和ELT的区别
  7. 数据湖知识点
  8. 技术选型 | OLAP大数据技术哪家强?
  9. 数仓相关面试题
  10. 从 0 到 1 学习 Presto,这一篇就够了!
  11. 元数据管理在数据仓库的实践应用
  12. 做中台2年多了,中台到底是什么呢?万字长文来聊一聊中台
  13. 数据仓库之拉链表
  14. sqoop用法之mysql与hive数据导入导出

Spark SQL概述

1、什么是Spark SQL

Spark SQL是Spark用于结构化数据(structured data)处理的Spark模块。

与基本的Spark RDD API不同,Spark SQL的抽象数据类型为Spark提供了关于数据结构和正在执行的计算的更多信息。

在内部,Spark SQL使用这些额外的信息去做一些额外的优化,有多种方式与Spark SQL进行交互,比如: SQL和DatasetAPI。

当计算结果的时候,使用的是相同的执行引擎,不依赖你正在使用哪种API或者语言。这种统一也就意味着开发者可以很容易在不同的API之间进行切换,这些API提供了最自然的方式来表达给定的转换。

Hive是将Hive SQL转换成 MapReduce然后提交到集群上执行,大大简化了编写MapReduce的程序的复杂性,由于MapReduce这种计算模型执行效率比较慢。所以Spark SQL的应运而生,它是将Spark SQL转换成RDD,然后提交到集群执行,执行效率非常快!

Spark SQL它提供了2个编程抽象,类似Spark Core中的RDD

(1)DataFrame

(2)Dataset

2、Spark SQL的特点

1)易整合

无缝的整合了SQL查询和Spark编程

2)统一的数据访问方式

使用相同的方式连接不同的数据源

3)兼容Hive

在已有的仓库上直接运行SQL或者HiveQL

4)标准的数据连接

通过JDBC或者ODBC来连接

3、什么的DataFrame

在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

同时,与Hive类似,DataFrame也支持嵌套数据类型(struct、array和map)。从API易用性的角度上看,DataFrame API提供的是一套高层的关系操作,比函数式的RDD API要更加友好,门槛更低。

图片1

上图直观地体现了DataFrame和RDD的区别。

左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得 Spark SQL 可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。

DataFrame是为数据提供了Schema的视图。可以把它当做数据库中的一张表来对待,DataFrame也是懒执行的,但性能上比RDD要高,主要原因:优化的执行计划,即查询计划通过Spark catalyst optimiser进行优化。比如下面一个例子:

图2

为了说明查询优化,我们来看上图展示的人口数据分析的示例。图中构造了两个DataFrame,将它们join之后又做了一次filter操作。

如果原封不动地执行这个执行计划,最终的执行效率是不高的。因为join是一个代价较大的操作,也可能会产生一个较大的数据集。如果我们能将filter下推到 join下方,先对DataFrame进行过滤,再join过滤后的较小的结果集,便可以有效缩短执行时间。而Spark SQL的查询优化器正是这样做的。简而言之,逻辑查询计划优化就是一个利用基于关系代数的等价变换,将高成本的操作替换为低成本操作的过程。

图片3

4、什么是DataSet

DataSet是分布式数据集合。DataSet是Spark 1.6中添加的一个新抽象,是DataFrame的一个扩展。它提供了RDD的优势(强类型,使用强大的lambda函数的能力)以及Spark SQL优化执行引擎的优点。DataSet也可以使用功能性的转换(操作map,flatMap,filter等等)。

1)是DataFrame API的一个扩展,是SparkSQL最新的数据抽象;

2)用户友好的API风格,既具有类型安全检查也具有DataFrame的查询优化特性;

3)用样例类来定义DataSet中数据的结构信息,样例类中每个属性的名称直接映射到DataSet中的字段名称;

4)DataSet是强类型的。比如可以有DataSet[Car],DataSet[Person]。

5)DataFrame是DataSet的特列,DataFrame=DataSet[Row] ,所以可以通过as方法将DataFrame转换为DataSet。Row是一个类型,跟Car、Person这些的类型一样,所有的表结构信息都用Row来表示。

图片4

Spark SQL编程

1、Spark Session新的起始点

在老的版本中,SparkSQL提供两种SQL查询起始点:一个叫SQLContext,用于Spark自己提供的SQL查询;一个叫HiveContext,用于连接Hive的查询。

SparkSession是Spark最新的SQL查询起始点,实质上是SQLContext和HiveContext的组合,所以在SQLContex和HiveContext上可用的API在SparkSession上同样是可以使用的。SparkSession内部封装了sparkContext,所以计算实际上是由sparkContext完成的。当我们使用 spark-shell 的时候, spark 会自动的创建一个叫做spark的SparkSession, 就像我们以前可以自动获取到一个sc来表示SparkContext

图片1

2、DataFrame

Spark SQL的DataFrame API 允许我们使用 DataFrame 而不用必须去注册临时表或者生成SQL表达式。DataFrame API 既有transformation操作也有action操作,DataFrame的转换从本质上来说更具有关系, 而 DataSet API 提供了更加函数式的 API。

2.1 创建DataFrame

在Spark SQL中SparkSession是创建DataFrame和执行SQL的入口,创建DataFrame有三种方式:通过Spark的数据源进行创建;从一个存在的RDD进行转换;还可以从Hive Table进行查询返回。

2.2 SQL风格语法

SQL语法风格是指我们查询数据的时候使用SQL语句来查询,这种风格的查询必须要有临时视图或者全局视图来辅助

1)创建一个DataFrame

1
2
scala复制代码scala> val df = spark.read.json("/opt/module/spark-local/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2)对DataFrame创建一个临时表

1
scala复制代码scala> df.createOrReplaceTempView("people")

3)通过SQL语句实现查询全表

1
2
scala复制代码scala> val sqlDF = spark.sql("SELECT * FROM people")
sqlDF: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

4)结果展示

1
2
3
4
5
6
7
8
scala复制代码scala> sqlDF.show
+---+--------+
|age| name|
+---+--------+
| 18|qiaofeng|
| 19| duanyu|
| 20| xuzhu|
+---+--------+

 注意:普通临时表是Session范围内的,如果想应用范围内有效,可以使用全局临时表。使用全局临时表时需要全路径访问,如:global_temp.people

5)对于DataFrame创建一个全局表

1
scala复制代码scala> df.createGlobalTempView("people")

6)通过SQL语句实现查询全表

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
scala复制代码scala> spark.sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age| name|
+---+--------+
| 18|qiaofeng|
| 19| duanyu|
| 20| xuzhu|
+---+--------+

scala> spark.newSession().sql("SELECT * FROM global_temp.people").show()
+---+--------+
|age| name|
+---+--------+
| 18|qiaofeng|
| 19| duanyu|
| 20| xuzhu|
+---+--------+

2.3 DSL风格语法

DataFrame提供一个特定领域语言(domain-specific language, DSL)去管理结构化的数据,可以在Scala, Java, Python和R中使用DSL,使用DSL语法风格不必去创建临时视图了。

1)创建一个DataFrame

1
2
scala复制代码scala> val df = spark.read.json("/opt/module/spark-local /people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2)查看DataFrame的Schema信息

1
2
3
4
scala复制代码scala> df.printSchema
root
|-- age: Long (nullable = true)
|-- name: string (nullable = true)

3)只查看”name”列数据

1
2
3
4
5
6
7
8
scala复制代码scala> df.select("name").show()
+--------+
| name|
+--------+
|qiaofeng|
| duanyu|
| xuzhu|
+--------+

4)查看所有列

1
2
3
4
5
6
7
8
scala复制代码scala> df.select("*").show
+--------+---------+
| name |age|
+--------+---------+
|qiaofeng| 18|
| duanyu| 19|
| xuzhu| 20|
+--------+---------+

5)查看”name”列数据以及”age+1”数据

注意:涉及到运算的时候, 每列都必须使用$

1
2
3
4
5
6
7
8
scala复制代码scala> df.select($"name",$"age" + 1).show
+--------+---------+
| name|(age + 1)|
+--------+---------+
|qiaofeng| 19|
| duanyu| 20|
| xuzhu| 21|
+--------+---------+

6)查看”age”大于”19”的数据

1
2
3
4
5
6
scala复制代码scala> df.filter($"age">19).show
+---+-----+
|age| name|
+---+-----+
| 20|xuzhu|
+---+-----+

7)按照”age”分组,查看数据条数

1
2
3
4
5
6
7
8
scala复制代码scala> df.groupBy("age").count.show
+---+-----+
|age|count|
+---+-----+
| 19| 1|
| 18| 1|
| 20| 1|
+---+-----+

2.4 RDD转换为DataFrame

在 IDEA 中开发程序时,如果需要RDD 与DF 或者DS 之间互相操作,那么需要引入import spark.implicits._。

这里的spark不是Scala中的包名,而是创建的sparkSession 对象的变量名称,所以必须先创建 SparkSession 对象再导入。这里的 spark 对象不能使用var 声明,因为 Scala 只支持val 修饰的对象的引入。

spark-shell 中无需导入,自动完成此操作。

1
2
3
4
5
6
7
8
9
scala复制代码scala> val idRDD = sc.textFile("data/id.txt") scala> idRDD.toDF("id").show
+---+
| id|
+---+
| 1|
| 2|
| 3|
| 4|
+---+

实际开发中,一般通过样例类将RDD转换为DataFrame。

1
2
3
4
5
scala复制代码scala> case class User(name:String, age:Int) defined class User
scala> sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF.show
+--------+---+
| name|age|
+--------+---+

2.5 DataFrame转换为RDD

DataFrame其实就是对RDD的封装,所以可以直接获取内部的RDD

1
2
3
4
5
6
7
8
scala复制代码scala> val df = sc.makeRDD(List(("zhangsan",30), ("lisi",40))).map(t=>User(t._1, t._2)).toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: int]

scala> val rdd = df.rdd
rdd: org.apache.spark.rdd.RDD[org.apache.spark.sql.Row] = MapPartitionsRDD[46] at rdd at <console>:25

scala> val array = rdd.collect
array: Array[org.apache.spark.sql.Row] = Array([zhangsan,30], [lisi,40])

注意:此时得到的RDD存储类型为Row

1
2
3
4
scala复制代码scala> array(0)
res28: org.apache.spark.sql.Row = [zhangsan,30] scala> array(0)(0)
res29: Any = zhangsan
scala> array(0).getAs[String]("name") res30: String = zhangsan

3、DataSet

DataSet是具有强类型的数据集合,需要提供对应的类型信息。

3.1 创建DataSet

1)使用样例类序列创建DataSet

1
2
3
4
5
6
7
8
9
10
11
12
13
scala复制代码scala> case class Person(name: String, age: Long)
defined class Person

scala> val caseClassDS = Seq(Person("wangyuyan",2)).toDS()

caseClassDS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]

scala> caseClassDS.show
+---------+---+
| name|age|
+---------+---+
|wangyuyan| 2|
+---------+---+

2)使用基本类型的序列创建DataSet

1
2
3
4
5
6
7
8
9
10
11
12
13
14
scala复制代码scala> val ds = Seq(1,2,3,4,5,6).toDS
ds: org.apache.spark.sql.Dataset[Int] = [value: int]

scala> ds.show
+-----+
|value|
+-----+
| 1|
| 2|
| 3|
| 4|
| 5|
| 6|
+-----+

注意:在实际使用的时候,很少用到把序列转换成DataSet,更多是通过RDD来得到DataSet。

3.2 RDD转换为DataSet

SparkSQL能够自动将包含有样例类的RDD转换成DataSet,样例类定义了table的结构,样例类属性通过反射变成了表的列名。样例类可以包含诸如Seq或者Array等复杂的结构。

1)创建一个RDD

1
2
3
scala复制代码scala> val peopleRDD = sc.textFile("/opt/module/spark-local/people.txt")

peopleRDD: org.apache.spark.rdd.RDD[String] = /opt/module/spark-local/people.txt MapPartitionsRDD[19] at textFile at <console>:24

2)创建一个样例类

1
2
3
4
5
6
scala复制代码scala> case class Person(name:String,age:Int)
defined class Person
3)将RDD转化为DataSet
scala> peopleRDD.map(line => {val fields = line.split(",");Person(fields(0),fields(1). toInt)}).toDS

res0: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]

3.3DataSet转换为RDD

调用rdd方法即可。

1)创建一个DataSet

1
2
3
scala复制代码scala> val DS = Seq(Person("zhangcuishan", 32)).toDS()

DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: Long]

2)将DataSet转换为RDD

1
2
3
scala复制代码scala> DS.rdd

res1: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[6] at rdd at <console>:28

4、DataFrame与DataSet的互操作

4.1 DataFrame转为DataSet

1)创建一个DateFrame

1
2
3
scala复制代码scala> val df = spark.read.json("/opt/module/spark-local/people.json")

df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2)创建一个样例类

1
2
scala复制代码scala> case class Person(name: String,age: Long)
defined class Person

3)将DataFrame转化为DataSet

1
2
3
scala复制代码scala> df.as[Person]

res5: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]

这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便。在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用。

4.2Dataset转为DataFrame

1)创建一个样例类

1
2
scala复制代码scala> case class Person(name: String,age: Long)
defined class Person

2)创建DataSet

1
2
3
scala复制代码scala> val ds = Seq(Person("zhangwuji",32)).toDS()

ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

3)将DataSet转化为DataFrame

1
2
scala复制代码scala> var df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

4)展示

1
2
3
4
5
6
scala复制代码scala> df.show
+---------+---+
| name|age|
+---------+---+
|zhangwuji| 32|
+---------+---+

5、IDEA实践

1)Maven工程添加依赖

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_2.11</artifactId>
<version>2.1.1</version>
</dependency>

2)代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
scala复制代码object SparkSQL01_Demo {
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")

//创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
//RDD=>DataFrame=>DataSet转换需要引入隐式转换规则,否则无法转换
//spark不是包名,是上下文环境对象名
import spark.implicits._

//读取json文件 创建DataFrame {"username": "lisi","age": 18}
val df: DataFrame = spark.read.json("D:\\dev\\workspace\\spark-bak\\spark-bak-00\\input\\test.json")
//df.show()

//SQL风格语法
df.createOrReplaceTempView("user")
//spark.sql("select avg(age) from user").show

//DSL风格语法
//df.select("username","age").show()

//*****RDD=>DataFrame=>DataSet*****
//RDD
val rdd1: RDD[(Int, String, Int)] = spark.sparkContext.makeRDD(List((1,"qiaofeng",30),(2,"xuzhu",28),(3,"duanyu",20)))

//DataFrame
val df1: DataFrame = rdd1.toDF("id","name","age")
//df1.show()

//DateSet
val ds1: Dataset[User] = df1.as[User]
//ds1.show()

//*****DataSet=>DataFrame=>RDD*****
//DataFrame
val df2: DataFrame = ds1.toDF()

//RDD 返回的RDD类型为Row,里面提供的getXXX方法可以获取字段值,类似jdbc处理结果集,但是索引从0开始
val rdd2: RDD[Row] = df2.rdd
//rdd2.foreach(a=>println(a.getString(1)))

//*****RDD=>DataSe*****
rdd1.map{
case (id,name,age)=>User(id,name,age)
}.toDS()

//*****DataSet=>=>RDD*****
ds1.rdd

//释放资源
spark.stop()
}
}
case class User(id:Int,name:String,age:Int)

Spark SQL数据的加载与保存

1、通用的加载与保存方式

1)spark.read.load是加载数据的通用方法

2)df.write.save 是保存数据的通用方法

1.1 数据加载

1)read直接加载数据

1
2
3
scala复制代码scala> spark.read.

csv format jdbc json load option options orc parquet schema table text textFile

注意:加载数据的相关参数需写到上述方法中,如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。

例如:直接加载Json数据

1
2
3
4
5
6
7
scala复制代码scala> spark.read.json("/opt/module/spark-local/people.json").show
+---+--------+
|age| name|
+---+--------+
| 18|qiaofeng|
| 19| duanyu|
| 20| xuzhu|

2)format指定加载数据类型

1
scala复制代码scala> spark.read.format("…")[.option("…")].load("…")

用法详解:

(1)format(“…”):指定加载的数据类型,包括”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”

(2)load(“…”):在”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”格式下需要传入加载数据的路径

(3)option(“…”):在”jdbc”格式下需要传入JDBC相应参数,url、user、password和dbtable

例如:使用format指定加载Json类型数据

1
2
3
4
5
6
7
scala复制代码scala> spark.read.format("json").load ("/opt/module/spark-local/people.json").show
+---+--------+
|age| name|
+---+--------+
| 18|qiaofeng|
| 19| duanyu|
| 20| xuzhu|

3)在文件上直接运行SQL

前面的是使用read API先把文件加载到DataFrame然后再查询,也可以直接在文件上进行查询。

1
2
3
4
5
6
7
8
9
scala复制代码scala>  spark.sql("select * from json.`/opt/module/spark-local/people.json`").show

+---+--------+
|age| name|
+---+--------+
| 18|qiaofeng|
| 19| duanyu|
| 20| xuzhu|
+---+--------+|

说明:json表示文件的格式. 后面的文件具体路径需要用反引号括起来。

1.2 保存数据

1)write直接保存数据

1
2
scala复制代码scala> df.write.
csv jdbc json orc parquet textFile… …

注意:保存数据的相关参数需写到上述方法中。如:textFile需传入加载数据的路径,jdbc需传入JDBC相关参数。

例如:直接将df中数据保存到指定目录

1
2
3
4
scala复制代码//默认保存格式为parquet
scala> df.write.save("/opt/module/spark-local/output")
//可以指定为保存格式,直接保存,不需要再调用save了
scala> df.write.json("/opt/module/spark-local/output")

2)format指定保存数据类型

1
scala复制代码scala> df.write.format("…")[.option("…")].save("…")

用法详解:

(1)format(“…”):指定保存的数据类型,包括”csv”、”jdbc”、”json”、”orc”、”parquet”和”textFile”。

(2)save (“…”):在”csv”、”orc”、”parquet”和”textFile”格式下需要传入保存数据的路径。

(3)option(“…”):在”jdbc”格式下需要传入JDBC相应参数,url、user、password和dbtable

3)文件保存选项

保存操作可以使用 SaveMode, 用来指明如何处理数据,使用mode()方法来设置。有一点很重要: 这些 SaveMode 都是没有加锁的, 也不是原子操作。

SaveMode是一个枚举类,其中的常量包括:

2021-05-13_183502

例如:使用指定format指定保存类型进行保存

1
scala复制代码df.write.mode("append").json("/opt/module/spark-local/output")

1.3 默认数据源

Spark SQL的默认数据源为Parquet格式。数据源为Parquet文件时,Spark SQL可以方便的执行所有的操作,不需要使用format。修改配置项spark.sql.sources.default,可修改默认数据源格式。

1)加载数据

1
2
3
4
5
6
7
8
9
10
scala复制代码val df = spark.read.load("/opt/module/spark-local/examples/src/main/resources/users.parquet").show

+------+--------------+----------------+
| name|favorite_color|favorite_numbers|
+------+--------------+----------------+
|Alyssa| null| [3, 9, 15, 20]|
| Ben| red| []|
+------+--------------+----------------+

df: Unit = ()

2)保存数据

1
2
3
scala复制代码scala> var df = spark.read.json("/opt/module/spark-local/people.json")
//保存为parquet格式
scala> df.write.mode("append").save("/opt/module/spark-local/output")

2、JSON文件

Spark SQL能够自动推测JSON数据集的结构,并将它加载为一个Dataset[Row]。可以通过SparkSession.read.json()去加载一个一个JSON文件。

注意:这个JSON文件不是一个传统的JSON文件,每一行都得是一个JSON串。格式如下:

1
2
3
json复制代码{"name":"Michael"}
{"name":"Andy","age":30}
{"name":"Justin","age":19}

1)导入隐式转换

1
json复制代码import spark.implicits._

2)加载JSON文件

1
2
json复制代码val path = "/opt/module/spark-local/people.json"
val peopleDF = spark.read.json(path)

3)创建临时表

1
json复制代码peopleDF.createOrReplaceTempView("people")

4)数据查询

1
2
3
4
5
6
7
json复制代码val teenagerNamesDF = spark.sql("SELECT name FROM people WHERE age BETWEEN 13 AND 19")
teenagerNamesDF.show()
+------+
| name|
+------+
|Justin|
+------+

3、MySQL

Spark SQL可以通过JDBC从关系型数据库中读取数据的方式创建DataFrame,通过对DataFrame一系列的计算后,还可以将数据再写回关系型数据库中。

**如果使用spark-shell操作,可在启动shell时指定相关的数据库驱动路径或者将相关的数据库驱动放到spark的类路径下。 **

1
2
json复制代码bin/spark-shell 
--jars mysql-connector-java-5.1.27-bin.jar

这里演示在Idea中通过JDBC对Mysql进行操作

3.1 导入依赖

1
2
3
4
5
xml复制代码<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.27</version>
</dependency>

3.2 从JDBC读数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
scala复制代码object SparkSQL02_Datasource {
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")

//创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()

import spark.implicits._

//方式1:通用的load方法读取
spark.read.format("jdbc")
.option("url", "jdbc:mysql://hadoop202:3306/test")
.option("driver", "com.mysql.jdbc.Driver")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "user")
.load().show


//方式2:通用的load方法读取 参数另一种形式
spark.read.format("jdbc")
.options(Map("url"->"jdbc:mysql://hadoop202:3306/test?user=root&password=123456",
"dbtable"->"user","driver"->"com.mysql.jdbc.Driver")).load().show

//方式3:使用jdbc方法读取
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
val df: DataFrame = spark.read.jdbc("jdbc:mysql://hadoop202:3306/test", "user", props)
df.show

//释放资源
spark.stop()
}
}

3.3 向JDBC写数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
scala复制代码object SparkSQL03_Datasource {
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")

//创建SparkSession对象
val spark: SparkSession = SparkSession.builder().config(conf).getOrCreate()
import spark.implicits._

val rdd: RDD[User2] = spark.sparkContext.makeRDD(List(User2("lisi", 20), User2("zs", 30)))
val ds: Dataset[User2] = rdd.toDS
//方式1:通用的方式 format指定写出类型
ds.write
.format("jdbc")
.option("url", "jdbc:mysql://hadoop202:3306/test")
.option("user", "root")
.option("password", "123456")
.option("dbtable", "user")
.mode(SaveMode.Append)
.save()

//方式2:通过jdbc方法
val props: Properties = new Properties()
props.setProperty("user", "root")
props.setProperty("password", "123456")
ds.write.mode(SaveMode.Append).jdbc("jdbc:mysql://hadoop202:3306/test", "user", props)

//释放资源
spark.stop()
}
}

case class User2(name: String, age: Long)

4、Hive

Apache Hive 是 Hadoop 上的 SQL 引擎,Spark SQL编译时可以包含 Hive 支持,也可以不包含。

包含 Hive 支持的 Spark SQL 可以支持 Hive 表访问、UDF (用户自定义函数)以及 Hive 查询语言(HiveQL/HQL)等。需要强调的一点是,如果要在 Spark SQL 中包含Hive 的库,并不需要事先安装 Hive。一般来说,最好还是在编译Spark SQL时引入Hive支持,这样就可以使用这些特性了。如果你下载的是二进制版本的 Spark,它应该已经在编译时添加了 Hive 支持。

若要把 Spark SQL 连接到一个部署好的 Hive 上,你必须把 hive-site.xml 复制到 Spark的配置文件目录中($SPARK_HOME/conf)。即使没有部署好 Hive,Spark SQL 也可以运行,需要注意的是,如果你没有部署好Hive,Spark SQL 会在当前的工作目录中创建出自己的 Hive 元数据仓库,叫作 metastore_db。此外,对于使用部署好的Hive,如果你尝试使用 HiveQL 中的 CREATE TABLE (并非 CREATE EXTERNAL TABLE)语句来创建表,这些表会被放在你默认的文件系统中的 /user/hive/warehouse 目录中(如果你的 classpath 中有配好的 hdfs-site.xml,默认的文件系统就是 HDFS,否则就是本地文件系统)。

spark-shell默认是Hive支持的;代码中是默认不支持的,需要手动指定(加一个参数即可)。

4.1 使用内嵌Hive

如果使用 Spark 内嵌的 Hive, 则什么都不用做, 直接使用即可。

Hive 的元数据存储在 derby 中, 仓库地址:$SPARK_HOME/spark-warehouse。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala复制代码scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
+--------+---------+-----------+

scala> spark.sql("create table aa(id int)")
19/02/09 18:36:10 WARN HiveMetaStore: Location: file:/opt/module/spark-local/spark-warehouse/aa specified for non-external table:aa
res2: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| aa| false|
+--------+---------+-----------+

向表中加载本地数据数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala复制代码scala> spark.sql("load data local inpath './ids.txt' into table aa")
res8: org.apache.spark.sql.DataFrame = []

scala> spark.sql("select * from aa").show
+---+
| id|
+---+
|100|
|101|
|102|
|103|
|104|
|105|
|106|
+---+

在实际使用中, 几乎没有任何人会使用内置的 Hive。

4.2 外部Hive应用

如果Spark要接管Hive外部已经部署好的Hive,需要通过以下几个步骤。

(1)确定原有Hive是正常工作的

(2)需要把hive-site.xml拷贝到spark的conf/目录下

(3)如果以前hive-site.xml文件中,配置过Tez相关信息,注释掉

(4)把Mysql的驱动copy到Spark的jars/目录下

(5)需要提前启动hive服务,hive/bin/hiveservices.sh start

(6)如果访问不到hdfs,则需把core-site.xml和hdfs-site.xml拷贝到conf/目录

启动 spark-shell

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
scala复制代码scala> spark.sql("show tables").show
+--------+---------+-----------+
|database|tableName|isTemporary|
+--------+---------+-----------+
| default| emp| false|
+--------+---------+-----------+

scala> spark.sql("select * from emp").show
19/02/09 19:40:28 WARN LazyStruct: Extra bytes detected at the end of the row! Ignoring similar problems.
+-----+-------+---------+----+----------+------+------+------+
|empno| ename| job| mgr| hiredate| sal| comm|deptno|
+-----+-------+---------+----+----------+------+------+------+
| 7369| SMITH| CLERK|7902|1980-12-17| 800.0| null| 20|
| 7499| ALLEN| SALESMAN|7698| 1981-2-20|1600.0| 300.0| 30|
| 7521| WARD| SALESMAN|7698| 1981-2-22|1250.0| 500.0| 30|
| 7566| JONES| MANAGER|7839| 1981-4-2|2975.0| null| 20|
| 7654| MARTIN| SALESMAN|7698| 1981-9-28|1250.0|1400.0| 30|
| 7698| BLAKE| MANAGER|7839| 1981-5-1|2850.0| null| 30|
| 7782| CLARK| MANAGER|7839| 1981-6-9|2450.0| null| 10|
| 7788| SCOTT| ANALYST|7566| 1987-4-19|3000.0| null| 20|
| 7839| KING|PRESIDENT|null|1981-11-17|5000.0| null| 10|
| 7844| TURNER| SALESMAN|7698| 1981-9-8|1500.0| 0.0| 30|
| 7876| ADAMS| CLERK|7788| 1987-5-23|1100.0| null| 20|
| 7900| JAMES| CLERK|7698| 1981-12-3| 950.0| null| 30|
| 7902| FORD| ANALYST|7566| 1981-12-3|3000.0| null| 20|
| 7934| MILLER| CLERK|7782| 1982-1-23|1300.0| null| 10|
| 7944|zhiling| CLERK|7782| 1982-1-23|1300.0| null| 50|
+-----+-------+---------+----+----------+------+------+------+

4.3 运行Spark SQL CLI

Spark SQLCLI可以很方便的在本地运行Hive元数据服务以及从命令行执行查询任务。在Spark目录下执行如下命令启动Spark SQ LCLI,直接执行SQL语句,类似Hive窗口。

1
scala复制代码bin/spark-sql

4.4 代码中操作Hive

1)添加依赖

1
2
3
4
5
6
7
8
9
10
xml复制代码<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-hive_2.11</artifactId>
<version>2.1.1</version>
</dependency>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>1.2.1</version>
</dependency>

2)拷贝hive-site.xml到resources目录

3)代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scala复制代码object SparkSQL08_Hive{
def main(args: Array[String]): Unit = {
//创建上下文环境配置对象
val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("SparkSQL01_Demo")
val spark: SparkSession = SparkSession
.builder()
.enableHiveSupport()
.master("local[*]")
.appName("SQLTest")
.getOrCreate()
spark.sql("show tables").show()
//释放资源
spark.stop()
}
}

Spark SQL实战

1、数据准备

Spark-sql操作所有的数据均来自Hive,首先在Hive中创建表,并导入数据。一共有3张表:1张用户行为表,1张城市表,1张产品表。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
xml复制代码CREATE TABLE `user_visit_action`(
`date` string,
`user_id` bigint,
`session_id` string,
`page_id` bigint,
`action_time` string,
`search_keyword` string,
`click_category_id` bigint,
`click_product_id` bigint,
`order_category_ids` string,
`order_product_ids` string,
`pay_category_ids` string,
`pay_product_ids` string,
`city_id` bigint)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/user_visit_action.txt' into table sparkpractice.user_visit_action;

CREATE TABLE `product_info`(
`product_id` bigint,
`product_name` string,
`extend_info` string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/product_info.txt' into table sparkpractice.product_info;

CREATE TABLE `city_info`(
`city_id` bigint,
`city_name` string,
`area` string)
row format delimited fields terminated by '\t';
load data local inpath '/opt/module/data/city_info.txt' into table sparkpractice.city_info;

2、需求

2.1 需求简介

这里的热门商品是从点击量的维度来看的,计算各个区域前三大热门商品,并备注上每个商品在主要城市中的分布比例,超过两个城市用其他显示。

例如:

2021-05-13_184130

2.2 思路分析

1)使用sql来完成,碰到复杂的需求,可以使用udf或udaf

2)查询出来所有的点击记录,并与city_info表连接,得到每个城市所在的地区,与Product_info表连接得到产品名称

3)按照地区和商品名称分组,统计出每个商品在每个地区的总点击次数

4)每个地区内按照点击次数降序排列

5)只取前三名,并把结果保存在数据库中

6)城市备注需要自定义UDAF函数

2.3 代码实现

1)UDAF函数定义

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
scala复制代码class AreaClickUDAF extends UserDefinedAggregateFunction {
// 输入数据的类型: 北京 String
override def inputSchema: StructType = {
StructType(StructField("city_name", StringType) :: Nil)
// StructType(Array(StructField("city_name", StringType)))
}

// 缓存的数据的类型: 北京->1000, 天津->5000 Map, 总的点击量 1000/?
override def bufferSchema: StructType = {
// MapType(StringType, LongType) 还需要标注 map的key的类型和value的类型
StructType(StructField("city_count", MapType(StringType, LongType)) :: StructField("total_count", LongType) :: Nil)
}

// 输出的数据类型 "北京21.2%,天津13.2%,其他65.6%" String
override def dataType: DataType = StringType

// 相同的输入是否应用有相同的输出.
override def deterministic: Boolean = true

// 给存储数据初始化
override def initialize(buffer: MutableAggregationBuffer): Unit = {
//初始化map缓存
buffer(0) = Map[String, Long]()
// 初始化总的点击量
buffer(1) = 0L
}

// 分区内合并 Map[城市名, 点击量]
override def update(buffer: MutableAggregationBuffer, input: Row): Unit = {
// 首先拿到城市名, 然后把城市名作为key去查看map中是否存在, 如果存在就把对应的值 +1, 如果不存在, 则直接0+1
val cityName = input.getString(0)
// val map: collection.Map[String, Long] = buffer.getMap[String, Long](0)
val map: Map[String, Long] = buffer.getAs[Map[String, Long]](0)
buffer(0) = map + (cityName -> (map.getOrElse(cityName, 0L) + 1L))
// 碰到一个城市, 则总的点击量要+1
buffer(1) = buffer.getLong(1) + 1L
}

// 分区间的合并
override def merge(buffer1: MutableAggregationBuffer, buffer2: Row): Unit = {
val map1 = buffer1.getAs[Map[String, Long]](0)
val map2 = buffer2.getAs[Map[String, Long]](0)

// 把map1的键值对与map2中的累积, 最后赋值给buffer1
buffer1(0) = map1.foldLeft(map2) {
case (map, (k, v)) =>
map + (k -> (map.getOrElse(k, 0L) + v))
}

buffer1(1) = buffer1.getLong(1) + buffer2.getLong(1)
}

// 最终的输出. "北京21.2%,天津13.2%,其他65.6%"
override def evaluate(buffer: Row): Any = {
val cityCountMap = buffer.getAs[Map[String, Long]](0)
val totalCount = buffer.getLong(1)

var citysRatio: List[CityRemark] = cityCountMap.toList.sortBy(-_._2).take(2).map {
case (cityName, count) => {
CityRemark(cityName, count.toDouble / totalCount)
}
}
// 如果城市的个数超过2才显示其他
if (cityCountMap.size > 2) {
citysRatio = citysRatio :+ CityRemark("其他", citysRatio.foldLeft(1D)(_ - _.cityRatio))
}
citysRatio.mkString(", ")
}
}


case class CityRemark(cityName: String, cityRatio: Double) {
val formatter = new DecimalFormat("0.00%")
override def toString: String = s"$cityName:${formatter.format(cityRatio)}"
}

2)具体实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
scala复制代码object SparkSQL04_TopN {
def main(args: Array[String]): Unit = {
val spark: SparkSession = SparkSession
.builder()
.master("local[2]")
.appName("AreaClickApp")
.enableHiveSupport()
.getOrCreate()
spark.sql("use sparkpractice")
// 0 注册自定义聚合函数
spark.udf.register("city_remark", new AreaClickUDAF)
// 1. 查询出所有的点击记录,并和城市表产品表做内连接
spark.sql(
"""
|select
| c.*,
| v.click_product_id,
| p.product_name
|from user_visit_action v join city_info c join product_info p on v.city_id=c.city_id and v.click_product_id=p.product_id
|where click_product_id>-1
""".stripMargin).createOrReplaceTempView("t1")

// 2. 计算每个区域, 每个产品的点击量
spark.sql(
"""
|select
| t1.area,
| t1.product_name,
| count(*) click_count,
| city_remark(t1.city_name)
|from t1
|group by t1.area, t1.product_name
""".stripMargin).createOrReplaceTempView("t2")

// 3. 对每个区域内产品的点击量进行倒序排列
spark.sql(
"""
|select
| *,
| rank() over(partition by t2.area order by t2.click_count desc) rank
|from t2
""".stripMargin).createOrReplaceTempView("t3")

// 4. 每个区域取top3

spark.sql(
"""
|select
| *
|from t3
|where rank<=3
""".stripMargin).show

//释放资源
spark.stop()

}
}

猜你喜欢

Hive计算最大连续登陆天数

Hadoop 数据迁移用法详解

Hbase修复工具Hbck

数仓建模分层理论

一文搞懂Hive的数据存储与压缩

大数据组件重点学习这几个

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MyBatis-Plus——代码生成器(351+版本)

发表于 2021-11-22

这是我参与11月更文挑战的第14天,活动详情查看:2021最后一次更文挑战


MyBatis-Plus 官方文档:mp.baomidou.com/guide/gener…

image-20211121150529787

这是官网上的文档,从官方文档中给快速生成代码中,可以看出代码生成器的配置结构为:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码
//1、配置数据源
FastAutoGenerator.create("url", "username", "password")
//2、全局配置
.globalConfig(...)
//3、包配置
.packageConfig(...)
//4、策略配置
.strategyConfig(...)
//5、模板引擎配置
.templateEngine(...)
//6、执行
.execute();

我们只需要通过填空的方式去配置数据源(DataSource),全局配置(GlobalConfig),包配置(PackageConfig),策略配置(StrategyConfig)和模板引擎配置(TemplateEngine)即可。

官方文档上也给出了对于每一个配置我们可以进行什么操作。

配置

数据源配置(DataSource)

属性 说明 示例
url jdbc路径 jdbc:mysql://127.0.0.1:3306/mybatis-plus
username 数据库账号 root
password 数据库密码 123456
dbQuery(IDbQuery) 数据库查询 new MySqlQuery()
schema(String) 数据库schema(部分数据库适用) mybatis-plus
typeConvert(ITypeConvert) 数据库类型转换器 new MySqlTypeConvert()
keyWordsHandler(IKeyWordsHandler) 数据库关键字处理器 new MySqlKeyWordsHandler()

全局配置(GlobalConfig)

方法 说明 示例
fileOverride 覆盖已生成文件 默认值:false
disableOpenDir 禁止打开输出目录 默认值:true
outputDir(String) 指定输出目录 /opt/baomidou/ 默认值: windows:D:// linux or mac : /tmp
author(String) 作者名 baomidou 默认值:作者
enableKotlin 开启 kotlin 模式 默认值:false
enableSwagger 开启 swagger 模式 默认值:false
dateType(DateType) 时间策略 DateType.ONLY_DATE=Date 默认值: DateType.TIME_PACK=LocalDateTime
commentDate(String) 注释日期 默认值: yyyy-MM-dd

包配置(PackageConfig)

方法 说明 示例
parent(String) 父包名 默认值:com.baomidou
moduleName(String) 父包模块名 默认值:无
entity(String) Entity 包名 默认值:entity
service(String) Service 包名 默认值:service
serviceImpl(String) Service Impl 包名 默认值:service.impl
mapper(String) Mapper 包名 默认值:mapper
mapperXml(String) Mapper XML 包名 默认值:mapper.xml
controller(String) Controller 包名 默认值:controller
other(String) 自定义文件包名 输出自定义文件时所用到的包名
pathInfo(Map<OutputFile, String>) 路径配置信息 Collections.singletonMap(OutputFile.mapperXml, “D://“)

策略配置(StrategyConfig)

方法 说明 示例
enableCapitalMode 开启大写命名 默认值:false
enableSkipView 开启跳过视图 默认值:false
disableSqlFilter 禁用 sql 过滤 默认值:true,语法不能支持使用 sql 过滤表的话,可以考虑关闭此开关
enableSchema 启用 schema 默认值:false,多 schema 场景的时候打开
likeTable(LikeTable) 模糊表匹配(sql 过滤) likeTable 与 notLikeTable 只能配置一项
notLikeTable(LikeTable) 模糊表排除(sql 过滤) likeTable 与 notLikeTable 只能配置一项
addInclude(String…) 增加表匹配(内存过滤) include 与 exclude 只能配置一项
addExclude(String…) 增加表排除匹配(内存过滤) include 与 exclude 只能配置一项
addTablePrefix(String…) 增加过滤表前缀
addTableSuffix(String…) 增加过滤表后缀
addFieldPrefix(String…) 增加过滤字段前缀
addFieldSuffix(String…) 增加过滤字段后缀
entityBuilder 实体策略配置
controllerBuilder controller 策略配置
mapperBuilder mapper 策略配置
serviceBuilder service 策略配置

模板引擎配置(TemplateEngine)

默认 Velocity ;可选模板引擎 Beetl 或 Freemarker。

模板引擎 代码
Velocity 默认 .templateEngine(new VelocityTemplateEngine())
Freemarker 可选 .templateEngine(new FreemarkerTemplateEngine())
Beetl 可选 .templateEngine(new BeetlTemplateEngine())

代码生成器测试样例

那么知道配置之后我们可以自己写一个操作一下。

步骤:

1、创建测试数据库 mp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
sql复制代码CREATE DATABASE mp;

USE `mp`;

/*Table structure for table `student` */

CREATE TABLE `student` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '学号',
`name` varchar(50) DEFAULT NULL COMMENT '姓名',
`score` double DEFAULT NULL COMMENT '成绩',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

/*Table structure for table `user` */

CREATE TABLE `user` (
`id` int(11) NOT NULL AUTO_INCREMENT COMMENT '用户id',
`username` varchar(50) NOT NULL COMMENT '用户名',
`password` varchar(50) DEFAULT NULL COMMENT '密码',
`create_time` date DEFAULT NULL COMMENT '创建时间',
`modify_time` date DEFAULT NULL COMMENT '最后一次修改时间',
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
Field Type Comment
id int 用户id
username varchar(50) 用户名
password varchar(50) 密码
create_time date 创建时间
modify_time date 最后一次修改时间
Field Type Comment
id int 学号
name varchar(50) 名字
score double 成绩

2、创建一个 Spring-Boot 项目

image-20211121144541404

3、在 pom.xml 中导入相关依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
xml复制代码<!--spring-boot-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<!--SpringBootTest-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<!--swagger-->
<dependency>
<groupId>io.swagger</groupId>
<artifactId>swagger-annotations</artifactId>
<version>1.6.3</version>
</dependency>
<!--lombok-->
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
<!--mysql 驱动-->
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.26</version>
</dependency>
<!--mybatis-plus-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-boot-starter</artifactId>
<version>3.4.3.4</version>
</dependency>
<!--mybatis-plus-generator 生成器-->
<dependency>
<groupId>com.baomidou</groupId>
<artifactId>mybatis-plus-generator</artifactId>
<version>3.5.1</version>
</dependency>
<!--velocity-->
<dependency>
<groupId>org.apache.velocity</groupId>
<artifactId>velocity-engine-core</artifactId>
<version>2.3</version>
</dependency>
<!--freemarker-->
<dependency>
<groupId>org.freemarker</groupId>
<artifactId>freemarker</artifactId>
<version>2.3.31</version>
</dependency>
<!--beetl 模板-->
<dependency>
<groupId>com.ibeetl</groupId>
<artifactId>beetl</artifactId>
<version>3.8.1.RELEASE</version>
</dependency>

注:模板引擎配置,默认 Velocity 可选模板引擎 Beetl 或 Freemarker,实际使用中只导入使用模板对应的依赖即可,不用全部导入。

4、编写一个mian方法,加上框架

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public static void main(String[] args) {
//1、配置数据源
FastAutoGenerator.create("url", "username", "password")
//2、全局配置
.globalConfig(...)
//3、包配置
.packageConfig(...)
//4、策略配置
.strategyConfig(...)
//5、模板引擎配置
.templateEngine(...)
//6、执行
.execute();
}

5、进行数据源配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public static void main(String[] args) {
//1、配置数据源
FastAutoGenerator.create("jdbc:mysql://localhost:3306/mp", "root", "123456")
//2、全局配置
.globalConfig(...)
//3、包配置
.packageConfig(...)
//4、策略配置
.strategyConfig(...)
//5、模板引擎配置
.templateEngine(...)
//6、执行
.execute();
}

6、进行全局配置

注:3.5.1+版本开始支持 lambda 表达式

1
2
3
4
5
6
7
8
9
10
java复制代码	//2、全局配置
.globalConfig(builder -> {
builder.author("Jie") // 设置作者名
.outputDir(System.getProperty("user.dir") + "/src/main/java") //设置输出路径
.commentDate("yyyy-MM-dd hh:mm:ss") //注释日期
.dateType(DateType.ONLY_DATE) //定义生成的实体类中日期的类型 TIME_PACK=LocalDateTime;ONLY_DATE=Date;
.fileOverride() //覆盖之前的文件
.enableSwagger() //开启 swagger 模式
.disableOpenDir(); //禁止打开输出目录,默认打开
});

7、进行 包配置

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码	//3、包配置
.packageConfig(builder -> {
builder.parent("com") // 设置父包名
.moduleName("mp") //设置模块包名
.entity("entity") //pojo 实体类包名
.service("service") //Service 包名
.serviceImpl("serviceImpl") // ***ServiceImpl 包名
.mapper("mapper") //Mapper 包名
.xml("mapper") //Mapper XML 包名
.controller("controller") //Controller 包名
.other("utils") //自定义文件包名
.pathInfo(Collections.singletonMap(OutputFile.mapperXml, System.getProperty("user.dir")+"/src/main/resources/mapper")) //配置 **Mapper.xml 路径信息:项目的 resources 目录的 Mapper 目录下
});

8、策略配置

策略配置中需要对 Mapper,Service,Entity,Controller 类的生成策略进行单独的配置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码	//4、策略配置
.strategyConfig(builder -> {
builder.addInclude("user", "student") // 设置需要生成的数据表名
.addTablePrefix("t_", "c_") // 设置过滤表前缀

//4.1、Mapper策略配置
.mapperBuilder()
.superClass(BaseMapper.class) //设置父类
.formatMapperFileName("%sMapper") //格式化 mapper 文件名称
.enableMapperAnnotation() //开启 @Mapper 注解
.formatXmlFileName("%sXml"); //格式化 Xml 文件名称

//4.2、service 策略配置
.serviceBuilder()
.formatServiceFileName("%sService") //格式化 service 接口文件名称,%s进行匹配表名,如 UserService
.formatServiceImplFileName("%sServiceImpl") //格式化 service 实现类文件名称,%s进行匹配表名,如 UserServiceImpl

//4.3、实体类策略配置
.entityBuilder()
.enableLombok() //开启 Lombok
.disableSerialVersionUID() //不实现 Serializable 接口,不生产 SerialVersionUID
.logicDeleteColumnName("deleted") //逻辑删除字段名
.naming(NamingStrategy.underline_to_camel) //数据库表映射到实体的命名策略:下划线转驼峰命
.columnNaming(NamingStrategy.underline_to_camel) //数据库表字段映射到实体的命名策略:下划线转驼峰命
.addTableFills(
new Column("create_time", FieldFill.INSERT),
new Column("modify_time", FieldFill.INSERT_UPDATE)
) //添加表字段填充,"create_time"字段自动填充为插入时间,"modify_time"字段自动填充为插入修改时间
.enableTableFieldAnnotation() // 开启生成实体时生成字段注解

//4.4、Controller策略配置
.controllerBuilder()
.formatFileName("%sController") //格式化 Controller 类文件名称,%s进行匹配表名,如 UserController
.enableRestStyle() //开启生成 @RestController 控制器

})

9、模板引擎配置

1
2
3
4
5
6
java复制代码	//5、模板引擎
.templateEngine(new VelocityTemplateEngine()) //默认
/*
.templateEngine(new FreemarkerTemplateEngine())
.templateEngine(new BeetlTemplateEngine())
*/

10、执行

1
2
java复制代码	//6、执行
.execute();

执行效果演示:

注:这里使用的是附2的交互式生成,两者源码几乎没有区别。

代码自动生成效果

image-20211121162318620

附1:快速生成样例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
java复制代码
public static void main(String[] args) {
//1、配置数据源
FastAutoGenerator.create("jdbc:mysql://localhost:3306/mp", "root", "123456")
//2、全局配置
.globalConfig(builder -> {
builder.author("Jie") // 设置作者名
.outputDir(System.getProperty("user.dir") + "/src/main/java") //设置输出路径:项目的 java 目录下
.commentDate("yyyy-MM-dd hh:mm:ss") //注释日期
.dateType(DateType.ONLY_DATE) //定义生成的实体类中日期的类型 TIME_PACK=LocalDateTime;ONLY_DATE=Date;
.fileOverride() //覆盖之前的文件
.enableSwagger() //开启 swagger 模式
.disableOpenDir(); //禁止打开输出目录,默认打开
})
//3、包配置
.packageConfig(builder -> {
builder.parent("com") // 设置父包名
.moduleName("mp") //设置模块包名
.entity("entity") //pojo 实体类包名
.service("service") //Service 包名
.serviceImpl("serviceImpl") // ***ServiceImpl 包名
.mapper("mapper") //Mapper 包名
.xml("mapper") //Mapper XML 包名
.controller("controller") //Controller 包名
.other("utils") //自定义文件包名
.pathInfo(Collections.singletonMap(OutputFile.mapperXml, System.getProperty("user.dir")+"/src/main/resources/mapper")) //配置 mapper.xml 路径信息:项目的 resources 目录下
})
//4、策略配置
.strategyConfig(builder -> {
builder.addInclude("user", "student") // 设置需要生成的数据表名
.addTablePrefix("t_", "c_") // 设置过滤表前缀

//4.1、Mapper策略配置
.mapperBuilder()
.superClass(BaseMapper.class) //设置父类
.formatMapperFileName("%sMapper") //格式化 mapper 文件名称
.enableMapperAnnotation() //开启 @Mapper 注解
.formatXmlFileName("%sXml"); //格式化 Xml 文件名称

//4.2、service 策略配置
.serviceBuilder()
.formatServiceFileName("%sService") //格式化 service 接口文件名称,%s进行匹配表名,如 UserService
.formatServiceImplFileName("%sServiceImpl") //格式化 service 实现类文件名称,%s进行匹配表名,如 UserServiceImpl

//4.3、实体类策略配置
.entityBuilder()
.enableLombok() //开启 Lombok
.disableSerialVersionUID() //不实现 Serializable 接口,不生产 SerialVersionUID
.logicDeleteColumnName("deleted") //逻辑删除字段名
.naming(NamingStrategy.underline_to_camel) //数据库表映射到实体的命名策略:下划线转驼峰命
.columnNaming(NamingStrategy.underline_to_camel) //数据库表字段映射到实体的命名策略:下划线转驼峰命
.addTableFills(
new Column("create_time", FieldFill.INSERT),
new Column("modify_time", FieldFill.INSERT_UPDATE)
) //添加表字段填充,"create_time"字段自动填充为插入时间,"modify_time"字段自动填充为插入修改时间
.enableTableFieldAnnotation() // 开启生成实体时生成字段注解

//4.4、Controller策略配置
.controllerBuilder()
.formatFileName("%sController") //格式化 Controller 类文件名称,%s进行匹配表名,如 UserController
.enableRestStyle() //开启生成 @RestController 控制器
})
//5、模板
.templateEngine(new VelocityTemplateEngine())
/*
.templateEngine(new FreemarkerTemplateEngine())
.templateEngine(new BeetlTemplateEngine())
*/
//6、执行
.execute();
}

附2:交互式生成样例代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
java复制代码public static void main(String[] args) {
Scanner scan = new Scanner(System.in);
System.out.println("=====================数据库配置=======================");
System.out.println("请输入 URL");
String url = scan.next();
System.out.println("请输入 username");
String username = scan.next();
System.out.println("请输入 password");
String password = scan.next();

FastAutoGenerator.create(url, username, password)
// 全局配置
.globalConfig((scanner, builder) -> builder.author(scanner.apply("=====================全局配置=======================\n请输入作者名称?"))
.outputDir(System.getProperty("user.dir") + "/src/main/java")
.commentDate("yyyy-MM-dd hh:mm:ss")
.dateType(DateType.TIME_PACK)
.enableSwagger()
.fileOverride()
.enableSwagger()
.disableOpenDir()
)
// 包配置
.packageConfig((scanner, builder) -> builder.parent(scanner.apply("=====================包配置=======================\n请输入包名?"))
.moduleName(scanner.apply("请输入父包模块名?"))
.entity("entity")
.service("service")
.serviceImpl("serviceImpl")
.mapper("mapper")
.xml("mapper")
.other("utils")
.pathInfo(Collections.singletonMap(OutputFile.mapperXml, System.getProperty("user.dir")+"/src/main/resources/mapper"))
)
// 策略配置
.strategyConfig((scanner, builder) -> {
builder.addInclude(getTables(scanner.apply("=====================策略配置=======================\n请输入表名,多个英文逗号分隔?所有输入 all")))
.serviceBuilder()
.formatServiceFileName("%sService")
.formatServiceImplFileName("%sServiceImpl")
.entityBuilder() //实体类策略配置
.enableLombok() //开启 Lombok
.disableSerialVersionUID()
.logicDeleteColumnName("deleted") //逻辑删除字段
.naming(NamingStrategy.underline_to_camel)
.columnNaming(NamingStrategy.underline_to_camel)
.addTableFills(new Column("create_time", FieldFill.INSERT), new Column("modify_time", FieldFill.INSERT_UPDATE))
.enableTableFieldAnnotation() // 开启生成实体时生成字段注解
.controllerBuilder()
.formatFileName("%sController")
.enableRestStyle()
.mapperBuilder()
.superClass(BaseMapper.class)
.formatMapperFileName("%sMapper")
.enableMapperAnnotation() //@mapper
.formatXmlFileName("%sMapper");
})
/*
模板引擎配置,默认 Velocity 可选模板引擎 Beetl 或 Freemarker
.templateEngine(new BeetlTemplateEngine())
.templateEngine(new FreemarkerTemplateEngine())
*/
.execute();
}

// 处理 all 情况
protected static List<String> getTables(String tables) {
return "all".equals(tables) ? Collections.emptyList() : Arrays.asList(tables.split(","));
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

理解TCP的三次握手与四次挥手

发表于 2021-11-22

这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战

TCP是面向连接的协议,运输层依靠tcp协议来传输报文。TCP运输连接的建立和释放是每一次面向连接的通信比不可少的过程。运输过程包括三个阶段,即连接建立、数据传送、连接释放。


TCP报文段的首部格式


由于tcp连接时传输的是报文段,在了解运输过程前先了解下tcp报文段的格式,理解了各字段的意义,对于三次握手和四次挥手的理解很有帮助。

  • 源端口和目的端口: 各占2个字节,分别为源端口号和目的端口号;
  • 序号:占4字节;序号范围是[0,2^32-1] ,当序号增加到2^32-1后,下一个就又回到0。在一个TCP连接中传送的字节流中的每一个字节都按顺序编号。首部的序号字段表示本报文段所发送的数据第一个字节的序号。例如当一个报文段的序号字段值为301,携带的数据共有100字节,那么下一个报文段的数据序号应当从401开始。
  • 确认号:占4字节;是期望收到对方下一个报文段的第一个数据字段的序号。例如服务B收到了A发送的报文段,其首部序号为501,数据长度是200字节,这表明B收到了A发送的到序号700为止的数据。因此B期望下一次收到A的报文序号为701,所以会返回确认号701给A。即确认号(ack)= 收到的序号 + (数据长度) + 1;

注意:在建立TCP连接时是不带数据的,所以确认号刚好等于收到的序号+1;

  • 数据偏移: 占4位。它指出TCP报文段的数据起始处距离TCP报文段起始处有多远。
  • 保留:占6位,保留今后使用,但目前置为0;
  • 紧急URG: 当URG=1,表示紧急指针字段有效,它会告诉系统报文段中有紧急数据,应优先处理。例如用户键盘的Crtl+C终止命令。
  • 确认号ACK: 与上述确认号不同,该确认号只有0或1;当置为1的时候,确认字段才有效;Tcp规定,在连接建立后所有传送的报文段必须把ACK置为1;
  • 推送PSH: push标志,为1表示是带有push标志的数据,指示接收方在接收到该报文段以后,应尽快将这个报文段交给应用程序,而不是在缓冲区排队。
  • 复位RST:重置连接标志,用于重置由于主机崩溃或其他原因而出现错误的连接。或者用于拒绝非法的报文段和拒绝连接请求。当RST=1时,表明TCP连接中出现严重差错(如由于主机崩溃或其他原因),必须释放连接;
  • 同步SYN: 在连接建立时用来同步序号。当SYN=1,而ACK=0时,表示这是一个连接请求报文段。当SYN=1,ACK=1时,表示这是一个连接接收报文段。因此,SYN=1就表示这是一个连接请求或连接接受报文;
  • 终止FIN: 用来释放一个连接。当FIN=1时,表明此报文段的发送方的数据已发送完毕,并要求释放运输连接。
  • 窗口:占2字节。窗口指的是发送本报文段的一方的接受窗口(而不是自己的发送窗口)。窗口值作为接收方让发送方设置其发送窗口的数据;

TCP的三次握手

TCP的连接建立,即TCP的三次挥手。连接建立前,最初两端的TCP进程都处于CLOSED(关闭)状态;

  1. 连接建立时,客户端A会先发送连接请求到服务端B,发送序号seq=x,将SYN同步位置为1,A进入SYN-SENY(即同步已发送)状态;
  2. B收到连接请求报文后,如同意连接,即向A发起确认;将发送序号seq置为y,同步位SYN置为1,确认序号ACK也置为1,同时将确认号ack置为x+1。此时B进入SYN-REVD(即同步收到)的状态;
  3. 客户端A收到B的确认请求后,会再一次发送确认号给B,将确认号ACK置为1,确认号ack=y+1,确认报文段不携带数据的话,是不需要消耗序号的,seq=x+1。这时TCP连接已经建立,A进入ESTABLISHED(已建立连接)状态。当B收到了A的确认后,也进入ESTABLISHED状态。

TCP的四次挥手

TCP的连接释放,即TCP的四次挥手。

  1. 客户A向服务发送连接释放报文,将FIN位置为1,其序号seq=u+1,u表示前面已发送数据的字节值。此时A进入FIN-WAIT-1(终止等待状态1)状态,等待B的确认;
  2. B收到连接释放报文后即发出确认,确认号ack=u+2,seq=v,ACK=1。然后B进入CLOSE-WAIT(关闭等待)状态。此时服务B会通知高层应用进程,A到B这个方向的连接就释放掉了,这时TCP处于半关闭状态(half-close)。但是B到A方向的连接还没释放,还是可以继续发送数据的;A收到B的确认后,就进入FIN-WAIT-2(终止等待状态2)。
  3. 若B已经没有要向A发送的数据,应用进程会通知TCP释放连接。这是B会发出连接释放的报文,将FIN位置为1,继续发送ack=u+2的确认号。置ACK=1,序号seq=w;此后B进入LAST-BACK(最后确认)状态,等待A的确认。
  4. A在收到B的连接释放报文后,必须对此发出确认。将ACK=1,ack=w+1,序号seq=u+2的报文发送给B,B收到确认后,就进入CLOSED(关闭)状态了。然后进入TIME-WAIT(时间等待)状态。注意此时TCP连接还没有释放掉,A必须等待时间等待器设置的2MSL(RFC793 定义了MSL为2分钟,Linux设置成了30s)后,A才进入CLOSED状态。时间MSL叫做最长报文段寿命。

tcp的常见问题

1. 三次握手时,为什么需要第三次确认?

是为了防止已失效的连接请求报文段突然又传送到了服务B,从而产生了错误。这里【已失效的连接请求报文段】指的是当A发起第一个连接请求时,由于网络问题等原因没发送给B,A在等待一段时间后会重新发送报文段,这时候A就连续发送了两条请求报文段,而当连接建立后,此前因网络问题原因滞留的报文又到了服务B,服务B会认为是新的连接请求,从而再发送确认报文给A;如果不经过第三次的握手确认,那么A重传后的请求又可以建立起了连接,从而引发B资源的浪费。

2. TIME-WAIT是什么?为什么要等待2MSL?

TIME-WAIT 指的是客户端的时间等待状态;需要等待2MSL有两个原因。

一是为了保证A发送的最后一个ACK报文段能够达到B,因为这个ACK报文段可能是会丢失,因而使得处于LAST-ACK状态B收不到对已发送FIN+ACK报文段的确认;B就会超时重传这个FIN+ACK报文段,而A在等待的2MSL中就可以收到B超时重传的报文。接着A重传一次确认,重新启动2MSL计时器,最后A、B都正常进入CLOSED状态。如果A在TIME-WAIT状态不等待一段时间,而在发送完ACK报文段后立即释放连接,那么就会收不到B重传的确认报文,因而也不会再发送一次确认报文段。这样,B就无法按照正常步骤进入ClOSED状态了。

二是为了防止已失效的连接请求报文段出现在本连接中。A在发送完最后一个ACK报文段后,再经过2MSL,就可以使本连接持续的时间内所产生的所有报文段从网络中消失。这样下一次新的连接中不会出现这种旧的连接请求报文段了。

参考文章

谢希仁 – 《计算机网络》

网络协议 - TCP 协议详解

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

十大经典排序之:冒泡排序 |快速排序 冒泡排序 快速排序

发表于 2021-11-22

「这是我参与11月更文挑战的第22天,活动详情查看:2021最后一次更文挑战」

冒泡排序

冒泡原理

冒泡排序相信是很多人入门必学算法,是打开我们走向算法大门的钥匙,这个算法的名字由来是因为越小的元素会经由交换慢慢“浮”到数列的顶端(升序或降序排列),就如同碳酸饮料中二氧化碳的气泡最终会上浮到顶端一样,故名“冒泡排序”。

原理:比较两个相邻的元素,将值大的元素交换到右边
直观表达:每一趟遍历,将一个最大的数移到序列末尾

算法实现

1、算法描述

比较相邻的元素,如果前一个比后一个大,交换之。
第一趟排序第1个和第2个一对,比较与交换,随后第2个和第3个一对比较交换,这样直到倒数第2个和最后1个,将最大的数移动到最后一位。
第二趟将第二大的数移动至倒数第二位
……

2、图示
在这里插入图片描述
3、算法空间复杂度和时间复杂度

时间复杂度:

  • 最坏:o(n2n^{2 }n2)
  • 最好:o(nn^{}n)
  • 平均:o(n2n^{2 }n2)

空间复杂度(辅助存储):o(1)

稳定性:稳定

例题

-》用冒泡排序将以下数列按照从小到大的顺序输出:

123,45,6,22,99,1,38,41,-6,0

java代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class Test {
public static void main(String[] args) {
int[] arr=new int[]{123,45,6,22,99,1,38,41,-6,0};
//冒泡排序 比较相邻的元素,如果第一个比第二个大,就交换他们两个

//需进行length-1次冒泡
for(int i=0;i<arr.length-1;i++)
{
for(int j=0;j<arr.length-1-i;j++)
{
if(arr[j]>arr[j+1])
{
int temp=arr[j];
arr[j]=arr[j+1];
arr[j+1]=temp;
}
}
}
System.out.println("从小到大排序后的结果是:");
for(int i=0;i<arr.length;i++)
System.out.print(arr[i]+"\t");
}

}

快速排序

快排原理

同样的,必学算法之一。快速排序(Quicksort)是对冒泡排序算法的一种改进。快速排序使用分治法(Divide and conquer)策略来把一个串行(list)分为两个子串行(sub-lists)。

算法实现

1、算法描述
快速排序,说白了就是给基准数据找其正确索引位置的过程.

  1. 从数列中挑出一个元素,称为 “基准”(pivot);
  2. 重新排序数列,所有元素比基准值小的摆放在基准前面,所有元素比基准值大的摆在基准的后面(相同的数可以到任一边)。在这个分区退出之后,该基准就处于数列的中间位置。这个称为分区(partition)操作;
  3. 递归地(recursive)把小于基准值元素的子数列和大于基准值元素的子数列排序;

2、图示
在这里插入图片描述
3、算法空间复杂度和时间复杂度

时间复杂度:

  • 最坏:o(n2n^{2}n2)
  • 最好:o(nlog⁡2nn\log _{2}nnlog2​n)
  • 平均:o(nlog⁡2nn\log _{2}nnlog2​n)

空间复杂度(辅助存储):o(nlog⁡2nn\log _{2}nnlog2​n)

稳定性:不稳定

例题

用快速排序将以下数列按照从小到大的顺序输出:

-》66,13,51,76,81,26,57,69,23

java代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码public class Test {
static void quick_sort(int q[], int l, int r)
{
if (l >= r) return;

int i = l - 1, j = r + 1, x = q[l + r >> 1];
while (i < j)
{
do i ++ ; while (q[i] < x);
do j -- ; while (q[j] > x);
//交换
if (i < j) {
q[i] = q[i] ^ q[j];
q[j] = q[i] ^ q[j];
q[i] = q[i] ^ q[j];
}

}

quick_sort(q, l, j);
quick_sort(q, j + 1, r);
}


public static void main(String[] args) {
int[] arr=new int[]{66,13,51,76,81,26,57,69,23};
//快速排序 分治
quick_sort(arr, 0, arr.length - 1);


System.out.println("快速排序后的结果是:");
for(int i=0;i<arr.length;i++)
System.out.print(arr[i]+"\t");
}

}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

说说在 Java 中如何比较两个对象

发表于 2021-11-22

Common Lang 中的 Builder 包内提供了一个 DiffBuilder 类,可以比较两个对象,并返回不同的部分。

  1. 首先在要比较对象的类中实现 Diffable 接口,然后实现 DiffResult diff(T obj) 方法。
  2. 在DiffResult diff(T obj) 方法中,新建一个 DiffBuilder 对象,把需要比较的类属性一一放入 DiffBuilder 中。

DiffBuilder 的构造函数有三个入参,lhs 是当前对象,rhs 是要比较的对象,style 是比较结果输出的格式。

1
2
java复制代码DiffBuilder(final T lhs, final T rhs,
final ToStringStyle style)

假设有一个 Person 类定义了三个属性 name、age 和 smoker。以下是使用不同的 ToStringStyle 所对应的输出内容。

ToStringStyle 说明 输出
SHORT_PREFIX_STYLE 简短 Person[name=deniro,smoker=false] differs from Person[name=jack,smoker=true]
DEFAULT_STYLE 默认 org.apache.commons.lang3.builder.Person@9f70c54[name=deniro,smoker=false] differs from org.apache.commons.lang3.builder.Person@234bef66[name=jack,smoker=true]。
MULTI_LINE_STYLE 多行 org.apache.commons.lang3.builder.Person@9f70c54[ name=deniro smoker=false] differs from org.apache.commons.lang3.builder.Person@234bef66[ name=jack smoker=true]
NO_FIELD_NAMES_STYLE 不包含类属性名 org.apache.commons.lang3.builder.Person@9f70c54[deniro,false] differs from org.apache.commons.lang3.builder.Person@234bef66[jack,true]。
SIMPLE_STYLE 简洁 deniro,false differs from jack,true。
NO_CLASS_NAME_STYLE 不包含类名 [name=deniro,smoker=false] differs from [name=jack,smoker=true]。
JSON_STYLE JSON {“name”:”deniro”,”smoker”:false} differs from {“name”:”jack”,”smoker”:true}。

感觉 NO_CLASS_NAME_STYLE 与 JSON_STYLE 看的更清楚。

完整示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码public class Person implements Diffable<Person> {

private static final Logger log = LoggerFactory.getLogger(Person.class);

String name;
int age;
boolean smoker;

public DiffResult diff(Person obj) {
// No need for null check, as NullPointerException correct if obj is null
return new DiffBuilder(this, obj, ToStringStyle.SHORT_PREFIX_STYLE)
.append("name", this.name, obj.name)
.append("age", this.age, obj.age)
.append("smoker", this.smoker, obj.smoker)
.build();
}

public static void main(String[] args) {
Person a = new Person();
a.name = "deniro";
a.age = 22;
a.smoker = false;

Person b = new Person();
b.name = "jack";
b.age = 22;
b.smoker = true;

DiffResult result = a.diff(b);
log.info("result -> {}。", result);
}
}

输出:

1
bash复制代码result -> Person[name=deniro,smoker=false] differs from Person[name=jack,smoker=true]。

如果是个大类,那么需要很多个 append 方法把这个大类的属性放到 DiffBuilder 中,不太方便。可以写个程序生成 DiffBuilder 初始化代码,或者尝试用反射来优化这部分代码。

参考资料

commons.apache.org/proper/comm…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…230231232…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%