开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

TiDB 在微众银行核心批量场景的实践

发表于 2021-10-11

本文根据 PingCAP DevCon 2021 上来自微众银行资深数据库架构师黄蔚的分享整理而成,主要阐述 TiDB 在微众银行的应用实践,包括微众银行选择 TiDB 的背景和 TiDB 的部署架构,以及 TiDB 在贷款核心批量场景的应用,最后分享了基于 TiDB 优化方案的最佳实践和未来规划。

TiDB 的产品优势

从 2018 年底微众银行开始接触 TiDB 的团队,到 2019 年上线,TiDB 在数据库的选型之中展现了很多独有的优势。

TiDB 兼容 MySQL 协议,同时也兼容 MySQL 的生态工具,比如备份、恢复、监控等等,不管是应用本身还是运维或是开发人员,从 MySQL 迁移到 TiDB,其成本和门槛都较低。对于 TiDB 原生的计算、存储分离的架构,用户将不必担心容量或者单机性能的瓶颈,某种程度可以把 TiDB 当作一个很大的 MySQL 来使用。同时 TiDB 的数据多副本强一致的特性对金融场景来说十分重要,TiDB 还天然支持多 IDC 的部署架构,可以支持应用做到同城多活的部署架构。此外,TiDB 开源社区的运营也非常活跃,比如在 AskTUG 平台可以看到很多用户的典型问题的处理方法,包含大量的宝贵经验可以借鉴,可以进一步降低用户使用 TiDB 的门槛。

现在使用 TiDB 的用户越来越多,不管是互联网头部厂商或者金融行业用户都在大量使用,这也是 TiDB 产品越来越成熟的体现,也给更多用户使用 TiDB 带来了更强的信心。

TiDB 在微众银行的部署架构

TiDB 的特性是否能够满足金融机构高可用的架构需求?

这是 TiDB 在微众银行的部署架构,如图所示,首先 TiKV 选择三副本,分别部署在同城的三个数据中心,这样可以实现 IDC 级别的高可用,同时在每个 IDC 部署了一套 TiDB Server,通过绑定到负载均衡器提供 VIP 服务,这样使得应用可以做到多活接入的模式。这套架构也经受过 IDC 级别的真实故障的演练验证,将其中一个 IDC 的网络全部断掉,观察到集群可以快速恢复,我们认为TiDB 能够符合金融场景高可用的要求。

核心批量核算场景

贷款核心批量核算是金融行业比较经典且非常重要的场景,我们将其接入到了 TiDB。下图是之前微众银行贷款核心批量应用场景的架构,左边这部分有很多业务单元,相当于把用户的数据做了单元化拆分,每一个单元化数据可能不一样,但架构和部署模型是一样的,底层用的是单实例数据库,同时批量是在每一个单实例数据库上面运行,最终把批量结果 ETL 到大数据平台给下游使用,那么这个架构有哪些瓶颈或者优化点呢?

\

\

它是一个纯批量的应用,意味着有大量的批量的写入、更新以及运算,而且数据量都特别大,亿级或者十亿级别以上,随着业务快速开展,借据数、用户数和流水数据也在持续增涨,如果使用单机数据库来承载,首先受限于单机数据库的性能上限,跑批耗时会越来越长,而且之前单机数据库负载已经很高,IO、CPU 已经达到 70% ~ 80%,如果想提升跑批效率,应用通过增加并发的方式是有风险的,因为数据库负载太高可能造成主备复制延迟或者遇到故障无法进行快速主备切换,所以效率很难提升;其次单机数据库对这种亿级或者十亿级的表加字段或者数据管理难度非常大,虽然微众银行日常会用 online DDL 工具比如 pt-online-schema-change 来做表变更操作,但也会有小概率锁表风险。另外基于资源利用率考虑,批量系统和联机系统复用了同一套单机数据库,所以如果批量任务造成高负载,很可能会影响联机交易。基于这些背景问题,微众银行借助 TiDB 做了架构优化的升级。

升级改造后的架构如下图所示,可以看到微众银行把各个业务单元的数据通过 DM 工具把数据实时同步和汇总到 TiDB,然后批量 APP 直接基于 TiDB 做批量计算,再把结果传到大数据平台,相当于借助了 TiDB 的水平扩展性来达到批量效率的水平扩展。之前是传统的 MySQL 主备架构,会要求 APP 服务器 要跟 MySQL 的主节点是部署在同一个 IDC,而如果是跨机房访问,网络延时会比较大进而影响批量耗时,所以其他 IDC 的 APP服务器 就处于 standby 的状态,会有一定的资源浪费,而 TiDB 架构的所有 TiKV 节点可以同时读写,所以可以多个 IDC 同时启动批量任务,最大化资源利用率。

\

价值收益

TiDB 在微众银行贷款核心业务场景中的使用,总结有三个主要的价值收益。

  • 批量效率的提高。下图左边是微众银行其中一个贷款业务的账单日的批量耗时对比,可以看到在单实例架构下面,批量大概是跑三个多小时,而微众银行通过借助 TiDB 进行架构的升级优化后,耗时减少到了 30 分钟左右,有绝对效率上的提升。

  • 线性水平扩展。微众银行的需求不仅仅是效率提升,而且要求其做到水平扩展,也就是弹性伸缩。因为随着业务发展,借据量包括用户量在持续增长,如果存在热点或者其他瓶颈,以后想继续提升将十分困难,下图右边是展示其批量耗时的对比情况,在初始的一个资源情况下大概跑 25 分钟,如果数据量翻倍,耗时增加到 50 分钟,如果想把这个耗时降下来再把资源也翻倍,可以发现耗时又降回到 26 分钟左右,可见已经具备线性扩展能力。所以除了效率上的提升,线性扩展能力的一大好处就是随着业务持续的发展,借据数、借据量都在快速增长,这套架构将无需担心业务快速增长可能出现的技术瓶颈,业务可以更加聚焦于产品本身,这是 TiDB 带来的一个实实在在的业务价值。

  • 批量系统与联机交易系统分离。前面提到跟联机系统是因为资源的考虑做了一个复用,现在拆分之后实际上跟联机就已经完全剥离了,并且没有像单机数据库的主备复制延迟,可以最大化资源利用率来提升批量效率。

基于 TiDB 的优化

以上这些收益可以看到比较明显的效果,那么微众银行做了哪些优化或者遇到了哪些问题呢?

  • SQL 模式优化。TiDB 因为本身分布式架构其单条请求时延会相对比 MySQL 更高,所以需要去把一些跟数据库频繁交互的请求进行打包,尽量减少交互,比如把多个 select 改成 in 的方式,把 insert 多条改成 insert 单条多 values 的方式,把多个 update 改成 replace 多条 values 的方式。此外,因为把多个单元化的数据全部汇总到一个 TiDB 集群,那么它的单表数据量一定非常非常大,如果跑了一个比较低效的 SQL,很容易把这个集群搞垮,比如存在着 OOM 的风险,所以需要特别注意 SQL 的审核和调优。再比如早期版本会出现执行计划不准确的问题,在 4.0 版本支持 SQL 执行计划绑定,可以对一些高频的 SQL 进行绑定,使其运行更加稳定。因为微众银行接入 TiDB 比较早期,所以主要使用的是乐观锁模式,应用也做了很多适配,目前适配乐观锁模式的代码已经固化为一个通用模块,新系统接入时直接拿来用就可以了。\
  • 热点与应用并发优化。使用 TiDB 比较多的用户可能会对热点这个问题比较熟悉,前面提到弹性伸缩,而要做弹性伸缩,数据必须足够离散,所以微众银行在前期接入 TiDB 的时候也发现像在 MySQL 上的 Auto Increment 特性,可能会存在热点问题,还比如像用户的卡号、借据号也可能是一些连续的数字,所以微众银行针对这两块做了一些调整或优化,比如把它改成 Auto Random,然后把一些卡号,根据它的数据分布规律,通过算法提前把这些数据的分布区间算出来,再通过 Split Region 打散功能进行预打散,这样大批量瞬时写入时就可以充分利用每一个节点的性能;另外也对低频修改、高频访问的小表进行了应用内的缓存处理,缓解热点读问题。除了数据需要足够离散,应用同样也要做分布式改造优化,因为应用是分布式的,所以需要一个 App Master 节点来做数据分片的工作,然后把分片任务均匀分摊到每一个 App 上做运算,运行期间还需要监测每个分片任务的状态和进度;最终通过数据和应用的协同优化,达到整体具备的水平扩展能力。\
  • 数据同步与数据校验优化。这就是前面提到微众银行通过 DM 工具把各个业务单元的数据汇总起来,早期使用的 DM 1.0 版本不具备高可用特性,这在金融场景下是比较致命的。而在 DM 2.0 版本上包括高可用性,兼容灰度 DDL,易用性等等几个特性都已稳定上线。此外是数据校验部分,因为是核心批量场景,数据同步必须做到数据不丢、不错,所以应用也内嵌了数据 checksum 的逻辑,比如在 MySQL 入库时先对数据进行分片,然后把各个分片的 checksum 值写到表里面,再通过 DM 同步到下游 TiDB,最后应用在跑批期间从 TiDB 把各个分片加载出来,然后再跑一遍对应分片的 checksum,再对上下游的 checksum 值进行比对,通过这样的校验机制以确保数据的一致性。\
  • 故障演练与兜底预案优化。这个系统之前是基于 MySQL 的批量系统,迁到 TiDB 后有一些故障场景表现可能会出现非预期的现象,所以微众银行做了大量故障演练,第一是模拟各个 TiDB 组件节点异常,确保应用可以兼容,同时当出现批量中断后,应用也支持断点续跑;第二是整批次重跑,由于可能会遇到程序 bug 或者非预期问题导致整个批次需要重跑,为了快速恢复重跑现场,应用开发了快速备份和 rename 闪回的功能; 第三是针对极端场景的演练,比如假设 TiDB 库出现了整体不可用,微众银行结合 Dumpling 和 Lightning 进行了整集群的快速备份和恢复,难点包括 DM 同步的还原点快速确认、大表人工预打散等等,最后验证的结果符合正确性和时效性的要求。因为这个架构涉及到较多数据流转,所以针对故障场景做了大量的演练以及对应预案 SOP 的编写。

未来规划

微众银行从 2018 年开始调研及 POC,2019 年上线了第一个 TiDB 的应用,当前 TiDB 在微众银行的应用领域已覆盖了贷款、同业、科技管理、基础科技等等,当前还有多个核心业务场景在做 POC 测试。针对未来的规划有五个方面:

1.TiDB 的云原生 + 容器化。 可以带来比如自动化运维能力的提升、资源调配的能力等等。

2.基于 Redis + TiKV 的持久化方案。 主要是替换 Redis + MySQL 的兜底方案,借助 TiKV 天然的高可用特性来做持久化的方案。

3.基于 SAS 盘低成本应用。 微众银行在行内有很多归档场景,数据量特别大,因为受监管要求需要保留很长时间,针对这种存储容量要求高但低频访问的场景,TiDB 基于 SAS 盘低成本的方向也会做一些试点。

4.国产化 ARM 平台的 TiDB 应用。 去年微众银行已经有业务上了 TiDB ARM,未来随着国产化的趋势,这块将会被继续加大投入力度。

5.TiFlash 的评估与应用。 TiFlash 提供的是 HTAP 的能力,尤其像实时风控以及轻量 AP 查询的场景会带来很大帮助,这也是微众银行未来的重点规划方向。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

把之前CompletableFuture留下的坑给填上。

发表于 2021-10-11

本文已参与「掘力星计划」赢取创作大礼包,挑战创作激励金。

你好呀,我是歪歪。

填个坑吧,把之前一直欠着的 CompletableFuture 给写了,因为后台已经收到过好几次催更的留言了。

这玩意我在之前写的这篇文章中提到过:《面试官问我知不知道异步编程的Future》

因为是重点写 Future 的,所以 CompletableFuture 只是在最后一小节的时候简单的写了一下:

我就直接把当时的例子拿过来改一下吧,先把代码放在这里了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
csharp复制代码public class MainTest {

    public static void main(String[] args) throws Exception {
        CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "-女神:我开始化妆了,好了我叫你。");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "化妆完毕了。";
        }).whenComplete((returnStr, exception) -> {
            if (exception == null) {
                System.out.println(Thread.currentThread().getName() + returnStr);
            } else {
                System.out.println(Thread.currentThread().getName() + "女神放你鸽子了。");
                exception.printStackTrace();
            }
        });
        System.out.println(Thread.currentThread().getName() + "-等女神化妆的时候可以干点自己的事情。");
        Thread.currentThread().join();
    }
}

核心需求就是女神化妆的时候,我可以先去干点自己的事情。

上面的程序运行结果是这样的:

符合我们的预期,没有任何毛病。

但是当你自己去编写程序的时候,有可能会遇到这样的情况:

什么情况,女神还在化妆呢,程序就运行完了?

是的,这就是我要说的第一个关于 CompletableFuture 的知识点:守护线程。

守护线程

你仔细观察前面提到的两个截图,对比一下他们的第 28 行,第二个截图少了一行代码:

Thread.currentThread().join();

这行代码是在干啥事呢?

目的就是阻塞主线程,哪怕你让主线程睡眠也行,反正目的就是把主线程阻塞住。

如果没有这行代码,出现的情况就是主线程直接运行完了,程序也就结束了。

你想想,会是什么原因?

这个时候你脑海里面应该啪的一下,很快就想到“守护线程”这个概念。

主线程是用户线程,这个没啥说的。

所有的用户线程执行完成后, JVM 也就退出了。

因此,出现上面问题的原因我有合理的理由猜测:CompletableFuture 里面执行的任务属于守护线程。

有了理论知识的支撑,并推出这个假设之后,就有了证实的方向,问题就很简单了。

啪的一下在这里打上一个断点,然后 Debug 起来,表达式一写就看出来了,确实是守护线程:

我一般是想要看到具体的代码的,就是得看到把这个线程设置为守护线程的那一行代码,我才会死心。

所以我就去找了一下,还是稍微花了点时间,过程就不描述了,直接说结论吧。

首先 CompletableFuture 默认的线程池是 ForkJoinPool,这个是很容易就能在源码里面找到的:

在 ForkJoinPool 里面,把线程都设置为守护线程的地方就在这里:

java.util.concurrent.ForkJoinPool#registerWorker

你若是想要自己调试的话,那么在这里打上断点之后,可以看一下调用栈,很快就摸清楚这个调用流程了:

另外,我在写文章的过程中还注意到了这个注释:

前面大概就是说 shutdown 和 shutdownNow 对于这个线程池来说没用。

如果,线程池里面的任务需要在程序终止前完成,那么应该在退出前调用 commonPool().awaitQuiescence。

所以,我的程序应该改成这样:

可以,不错,很优雅。

如果,你的异步任务非常重要,必须要执行完成,那么 ForkJoinPool 也给你封装好了一个方法:

java.util.concurrent.ForkJoinPool#quiesceCommonPool

另外,其实 CompletableFuture 也是支持传一个自定义线程池的:

比如,我把前面的程序改成下面这样:

加入指定线程池的逻辑,注释掉主线程 join 的代码,跑起来之后。诶,JVM 一直都在。

你说神奇不神奇?

我想这个原因就不用我来分析了吧?

和 Future 对比

CompletableFuture 其实就是 Future 的升级版。

Future 有的,它都有。

Future 的短板,它补上了。

毕竟一个是 JDK 1.5 时代的产物,另一个是 1.8 时代的作品:

中间跨度了整整 10 年,10 年啊!

所以,后来居上。

给大家对比一下 Future 和 CompletableFuture。

首先对于我个人而言,第一个最直观的感受是获取结果的姿势舒服多了。

我不得不又把这张图拿出来说说了,主要关注下面的两种 future 和 callback:

当我们用 Future 去实现异步,要获取异步结果的时候,是怎么样操作的?

是不是得调用 future.get() 方法去取值。

如果这个时候值已经准备就绪,在 future 里面封装好了,那么万事大吉,直接拿出来就可以用。

但是如果值还没有准备好呢?

是不是就阻塞等待了?

所以我常常说 Future 是一种阉割版的异步模式。

比如还是最开始的例子,如果我用 Future 来做,那么是这样的:

你仔细看我框起来的地方,是 main 线程开始获取结果,获取结果的这个动作把 main 线程给阻塞住了。

你就去洗不了头了,老弟。

好,你说你把获取结果的操作放到最后,没问题。

但是,无论你放在哪里,你都有一个 get 的动作,且你执行这个动作的时候,你也不知道值到底准备好了没,所以有可能出现阻塞等待的情况。

好,那么问题来了:如果消除这个阻塞等待呢?

很简单,换个思路,我们从主动问询,变成等待通知。

女神化妆好了之后,主动通知一下我不就好了吗?

用程序员的话说就是:运行结果出来了,你执行一下我留给你的回调函数不就好了吗?

CompletableFuture 就可以干这个事儿。

用 CompletableFuture 写一遍上面的程序就是这样的:

pool-1-thread-1,女神化妆的这个线程,她好了之后会主动叫你,你明白吗?

这就是我第一次接触到 CompletableFuture 后,学到的第一个让我感到舒服的地方。

这种写法你注意,whenComplete(returnStr, exception) 返回信息和异常信息在这里都有了。

除此之外,这个方法还是带返回值的,你也完全可以像是用 Future 那样通过 get 获取其返回值:

按理来说也就是可以用了。

但是如果你不需要返回值,它还提供了这样的写法:

正常情况和异常情况分开处理。

优雅,非常优雅。

还有更牛的。

前面我们化妆的线程和化妆完成的线程不是同一个线程吗:

假设我们需要两个不同的线程,一个只负责化妆,一个只负责通知。毕竟女神化完妆之后,更加女神了,搞两个线程我寻思也不过分。

改动点小到令人发指:

只需要把调用的方法从 whenComplete 改为 whenCompleteAysn 即可。

同样,这个方法也支持指定线程池:

你可以去看 CompletableFuture 里面有非常多的 Aysn 结尾的方法,大多都是干这个事儿的,以异步的形式在线程池中执行。

如果说上面的介绍让你觉得不过如此,那么再介绍一个 Future 没有的东西。

假设现在需求是这样的。

女神化完妆之后,还要花个一小会选衣服,不过分吧。

也就是说我们现在有两个异步任务,第一个是化妆,第二个是选衣服。

选衣服要在化妆完成之后进行,这两个任务是串行的,用 CompletableFuture 怎么实现呢?

我把代码贴一下,为了看起来更加直观,我没有用链式调用:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
csharp复制代码public class MainTest {

    public static void main(String[] args) throws Exception {
        ExecutorService executorService = Executors.newFixedThreadPool(10);
        //任务一
        CompletableFuture<String> makeUpFuture = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "-女神:我开始化妆了。");
            try {
                TimeUnit.SECONDS.sleep(5);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "化妆完毕了。";
        }, executorService);

        //任务二(makeUpFuture是方法调用方,意思是等makeUpFuture执行完成后执行再执行)
        CompletableFuture<String> dressFuture = makeUpFuture.thenApply(makeUp -> {
            System.out.println(Thread.currentThread().getName() + "-女神:" + makeUp + "我开始选衣服啦,好了我叫你。");
            try {
                TimeUnit.SECONDS.sleep(1);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return makeUp + "衣服也选好啦。靓仔,走去玩儿吧。";
        });

        //获取结果
        dressFuture.thenAccept(result -> {
            System.out.println(Thread.currentThread().getName() + "-" + result);
        });
    }
}

这样输出结果就是这样的:

符合我们的预期。

假设我们想要选衣服的时候换另外一个线程怎么办呢?

别说不知道,这不刚才教你了吗,Async 结尾的方法,得活学活用起来:

前面讲的是多个异步任务串行执行,接下来再说一下并行。

CompletableFuture 里面提供了两个并行的方法:

两个方法的入参都是可变参数,就是一个个异步任务。

allOf 顾名思义就是入参的多个 CompletableFuture 都必须成功,才能继续执行。

而 anyOf 就是入参的多个 CompletableFuture 只要有一个成功就行。

还是举个例子。

假设,我是说假设啊,我是一个海王。

算了,我假设我有一个朋友吧。

他同时追求好几个女朋友。今天他打算约小美和小乖中的一个出门玩,随便哪个都行。谁先化妆完成,就约谁。另外一个就放她鸽子。

这个场景,我们就可以用 anyOf 来模拟,于是就出现了这样的代码:

从输出结果来看,最后和朋友约会的是小美。

都把小美约出来了,必须要一起吃个饭才行,对吧。

那么这个时候朋友问:小美,你想吃点什么呢?

小美肯定会回答:随便,就行,无所谓。

听到这样的回答,朋友心里就有底了,马上给出了一个方案:我们去吃沙县小吃或者黄焖鸡吧,哪一家店等的时间短,我们就去吃哪一家。

于是上面的代码,就变成了这样:

输出结果是这样的:

我把代码都放这里,你粘过去就能跑起来:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
ini复制代码public class MainTest {

    public static void main(String[] args) {
        ExecutorService executorService = Executors.newFixedThreadPool(10);

        CompletableFuture<String> xiaoMei = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "-小美:我开始化妆了,好了我叫你。");
            try {
                int time = ThreadLocalRandom.current().nextInt(5);
                TimeUnit.SECONDS.sleep(time);
                System.out.println(Thread.currentThread().getName() + "-小美,化妆耗时:" + time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "小美:化妆完毕了。";
        }, executorService);

        CompletableFuture<String> xiaoGuai = CompletableFuture.supplyAsync(() -> {
            System.out.println(Thread.currentThread().getName() + "-小乖:我开始化妆了,好了我叫你。");
            try {
                int time = ThreadLocalRandom.current().nextInt(5);
                TimeUnit.SECONDS.sleep(time);
                System.out.println(Thread.currentThread().getName() + "-小乖,化妆耗时:" + time);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return "小乖:化妆完毕了。";
        }, executorService);

        CompletableFuture<Object> girl = CompletableFuture.anyOf(xiaoMei, xiaoGuai);
        girl.thenAccept(result -> {
            System.out.println("我看最后是谁先画完呢 = " + result);
        });

        CompletableFuture<String> eatChooseOne = girl.thenApplyAsync((result) -> {
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result + "这里人少,我们去吃沙县小吃吧!";
        }, executorService);

        CompletableFuture<String> eatChooseTwo = girl.thenApplyAsync((result) -> {
            try {
                TimeUnit.SECONDS.sleep(ThreadLocalRandom.current().nextInt(10));
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return result + "这里人少,我们去吃黄焖鸡吧!";
        }, executorService);

        CompletableFuture.allOf(eatChooseOne, eatChooseTwo).thenAccept(result -> {
            System.out.println("最终结果:" + result);
        });
    }
}

如果你说,小孩子才做选择,大人是全部都要。

那么,你可以试着用一下 allOf,只是需要注意的是,allOf 是不带返回值的。

好了,写到这里我都感觉有点像是 API 教学了,没啥劲。所以 CompletableFuture 还有很多很多的方法,我就不一一介绍了。

再说说 get 方法

最后,再看看 get 方法吧。之前发布的《看完JDK并发包源码的这个性能问题,我惊了!》这篇文章,有朋友看了之后有几个问题,我再串起来讲一下。

CompletableFuture 提交任务的方式有两种:

一种是 supplyAsync 带返回值的。

一种是 runAsync 返回值是 void 的,相当于没有返回值。

比如,我们用 supplyAsync 的时候:

就刻意返回一个 null。

我还可以扩展一下,假设我们的方法用的是 runAsync,本来就没有返回值的。

比如这样:

我们再看一下 get 方法:

你看这里的判断条件是 (r = result) == null。

那么问题就来了,假设这个方法的返回值本来就是 null,也就是我们上面的情况,怎么办呢?

为 null 就有三种情况了:

  • 1.是 runAsync 这种,真的没有返回值,所以就算任务执行完成了,get 出来的确实就是 null。
  • 2.是有返回值的,只是目前任务还没执行完成,所以 result 还是 null。
  • 3.是有返回值的,返回的值就是 null。

怎么去分别出这三种情况呢?

那么就要看看这个 result 赋值的地方了,用脚指头猜也知道在这里搞了一些事情。

所以简单的找寻一番之后,可以找到这个关键的地方:

框起来的代码,目的是为了获取 CompletableFuture 类中的 result 字段的偏移量,并用大写的 RESULT 存储起来。

有经验的朋友看到这里大概就知道它要用 compareAndSwapObject 这个骚操作了:

然后就能找到这几个和 null 相关的地方:

答案就是我框起来的部分:在 CompletableFuture 里面,把 null 也封装到 AltResult 对象里面了。

基于此,可以区分出前面我说的那三种情况。

你看这里有一个专门的 completeNull 方法,其中的调用者就有 AysncRun 方法:

你可以在其调用的地方打上断点,然后把我前面用 runAsync 提交方式的代码跑起来:

再去看看调用栈,调试一下,你就知道 runAsync 这种,真的没有返回值的是怎么处理的了。

核心技术就是把 null 封装到 AltResult 对象里面。

然后如何分別返回值就是 null 的情况呢?

都有一个代表 null 的对象了,那还不简单吗,一个小小的判断就搞定了:

最后,再提一下这个方法:

java.util.concurrent.CompletableFuture#waitingGet

我之前那篇文章里面写了这样一句话:

加入这个自旋,是为了稍晚一点执行后续逻辑中的 park 代码,这个稍重一点的操作。但是我觉得这个 “brief spin-wait” 的收益其实是微乎其微的。

有小伙伴问我 park 的逻辑在哪?

其实就在 waitingGet 的 while 循环的最后一个分支里面,也就是我框起来的部分:

最后你顺着往下 Debug ,就能找到这个地方:

java.util.concurrent.CompletableFuture.Signaller#block

这里不就是 park 的逻辑吗:

打上断点自己玩去吧。

其实还有一种骚操作,我一般不告诉别人,也简单的分享一下吧。

还是拿前面的代码做演示,这个代码你跑起来之后,主线程由于调用了 get 方法,那么势必会阻塞等待异步任务的结果:

你就把它给跑起来,然后点一下这个照相机的图标:

就可以看到这样的画面:

主线程是 park 起来的,在哪被 park 起来的呢?

at java.util.concurrent.CompletableFuture$Signaller.block(CompletableFuture.java:1707)

这不就是我刚刚给你说的方法吗?

然后你在这里打上断点,看一下调用堆栈,不就把主链路玩得明明白白的嘛:

怎么样,这波逆向操作,溜不溜,分分钟就学会了。

找到了 park 的地方,那么在哪儿被 unpark 的呢?

这还不简单吗?

反正我一搜就搜出来了:

然后再在 unpark 这里打上一个断点:

唤醒流程也可以调试的明明白白。

好了,挂起和唤醒都给你定位到关键地方了,就到这,玩去吧。

本文已收录自个人博客,欢迎大家来玩。

www.whywhy.vip/

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一篇万字博文带你入坑爬虫这条不归路 【万字图文&&建议收藏】

发表于 2021-10-11

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

👻最近,很多粉丝私信我问——爬虫到底是什么?学习爬虫到底该从何下手?👻

😬其实,我想说的也是曾经的我身为小白的时候某些大牛对我说过的——很多时候我们都有一颗想要学习新知识的心,却总是畏惧于对想要学习内容的无知,这也是多数人失败甚至后悔终身的:因为他们从来没有开始过!😬

😜借一位几年前带我入坑的前辈的话——坑就在你面前,别总是犹豫徘徊,大胆一点:向前一步,入了这个坑,莽着头就是往前冲,别多想,别回头,终有一天——>你也会成为别人的前辈!😜

在这里插入图片描述


今日份鸡汤已成功送达,目的地:your heart! 💗💗💗

回归正题~
博主写本文的初衷也是想借助本文达到的一个效果:

带领那些想要学习爬虫却一直迟迟不敢下手,或者说那些对爬虫有兴趣想要好好学一学这门技术的童鞋们——正式入坑!!!

💩<-🐷首先,我来回答第一个问题——什么是爬虫?🐷->💩

👉其实你百度也可以百度到一大篇官方化的定义,但是那些对新人不友好,爬虫!总结一句话:就是**模拟浏览器发送请求,获取响应!**👈
在这里插入图片描述

🎈至于第二个问题:学习爬虫到底该从何下手?等你们认认真真看完本篇博文之后再考虑这个问题你们还需不需要我来解答。🎈

在这里插入图片描述

1.爬虫的概念

(1)爬虫的概念(专业化定义):

网络爬虫也叫网络蜘蛛,它特指一类自动批量下载网络资源的程序,这是一个比较口语化的定义。
更加专业和全面对的定义是:网络爬虫是伪装成客户端与服务端进行数据交互的程序。
在这里插入图片描述

(2)爬虫的应用:

  1. 数据采集
      大数据时代来临,数据就是核心,数据就是生产力,越来越多的企业开始注重收集用户数据,而爬虫技术是收集数据的一种重要手段。
1
2
3
powershell复制代码比如:抓取微博评论(机器学习舆情监控)
抓取招聘网站的招聘信息(数据分析,挖掘)
百度新闻网站
  1. 搜索引擎
     百度,谷歌等搜索引擎都是基于爬虫技术。(PS:爬虫界大佬!)
1
2
powershell复制代码知识补给站:
知名的某头条就是靠爬虫发家致富的哦!!!
  1. 模拟操作
     爬虫也被广泛用于模拟用户操作,测试机器人,灌水机器人等。
  2. 软件测试
     爬虫之自动化测试
     虫师
  3. 网络安全
     短信轰炸
     web漏洞扫描
    在这里插入图片描述

(3)爬虫的分类:

🎈 根据不同的标准,爬虫分类也有所不同,常见的三大分类标准及其分类如下: 🎈

第一个:根据爬取的数量不同进行分类:
 ①通用爬虫:通常指搜索引擎的爬虫。
   通用爬虫是搜索引擎抓取系统 (baidu,goole,yahoo等)的重要组成部分 。主要目的是将互联网的网页下载到本地 ,形成一个互联网内容的镜像备份。(但是有一个很大的问题就是它们具有很大的局限性:大部分内容没有用——不同的搜索目的,返回的内容相同!)

 ②聚焦爬虫:针对特定网站的爬虫。
   是面向特定主题需求的一种网络爬虫程序 ,它与通用搜索引擎爬虫的区别在于 :
聚焦爬虫在实施页面抓取时会对内容进行处理筛选,尽量保证只抓取与需求相关的网页信息!


第二个:根据是否获取数据为目的进行分类:
 ①功能性爬虫:
  比如,投票,点赞…
 ②数据增量爬虫:
  比如招聘信息…


第三个:根据url地址和对应的页面内容是否改变,数据增量爬虫又可分为:
 ①基于url地址变化,内容也随之变化的数据增量爬虫;
 ②url地址不变,内容变化的数据增量爬虫。

在这里插入图片描述

(4)爬虫的一般开发流程:

①最简单的单一页面数据的爬取:
  url——>发送请求,获取响应——>提取数据——>保存数据
②多页面数据的爬取:
  发送请求,获取响应——>提取url地址,继续请求

聚焦爬虫开发流程

搜索引擎流程

一般爬虫开发流程

(5)爬虫开发的重难点:

爬虫难点主要分为两个方向:

  1. 数据的获取(PS:自己人何苦为难自己人嘞!)
      网络公共资源都是为用户准备的,为了避免被爬虫采集,服务端会设置非常多的图灵测试,阻止爬虫的恶意爬取,也即是反爬措施。爬虫开发工程师在开发爬虫时,需要解决这些反爬措施。我们在开发爬虫的过程中,有很大一部分的工作就是处理这些反爬措施。
  2. 采集的速度
       大数据时代,需要巨大的数据量,动辄千万条的级别,甚至上亿条。如果采集速度跟不上,耗时过长,那么就达不到商业要求。一般我们会采取并发以及分布式来解决速度上的问题。这也是爬虫开发过程中的另外一个重心。
知识点补给站: robots协议:网站通过robots协议,告诉我们搜索引擎哪些页面可以抓取,哪些页面不能抓取, 但它仅仅是道德层面上的约束。

在这里插入图片描述

2.HTTP和HTTPS

在这里插入图片描述

1
2
3
4
5
6
7
8
9
powershell复制代码知识点补给站:
大多数商业应用采用的架构:
1.c/s 即 client(客户端) server(服务端)

2.b/s 即 browser(浏览器) server(服务端)

3.m/s 即 moblie(移动端) server(服务端)

以上统称为客户端与服务端!!!

在这里插入图片描述
网络爬虫是伪装成客户端与服务端进行数据交互的程序。那么,客户端和服务端该怎样进行数据交互呢?就像我们中国人用中文交流,说的中国的语法,我们可以正常沟通。客户端与服务端如果不统一一下,那不就乱套了,所以在网络传输方面产生了众多协议,HTTP就是其中一种。

(1)HTTP协议

  目前互连网上90%的网络传输都是基于http协议(补充:http协议是一个应用层协议)。(注意:爬取想要的数据前,一定要明确其使用的是什么协议!虽然90%都是基于http协议,但是仍有10%采用的是其他的协议,比如:弹幕可能采取的是websocket协议!这样的话,我们采取传统的爬虫就无法爬取到了。)

HTTP是Hyper Text Transfer Protocol(超文本传输协议)的缩写,是用于从万维网(WWW:World Wide Web )服务器传输超文本到本地浏览器的传送协议。

  HTTP是基于TCP/IP通信协议来传递数据的(HTML 文件, 图片文件, 查询结果等)。
注意:TCP/IP有个面向连接的特性!(意义:保证数据的完整性)
  让咱们生动的了解一下TCP/IP通信协议中的三次握手四次挥手:
在这里插入图片描述

  • 三次握手建立连接:
    🔑客户端说:嘿,服务端girl!我想和你建立连接。(打招呼)
    🔒服务端说:好的呢,我听你的。
    🔑客户端说:真好,那咱们开始数据交互吧(羞羞)。
    .
    .(干羞羞的事ing,进行数据交互)
    .
  • 四次挥手断开连接:
    🔑客户端说:我已经和你交互完数据了,咱断开连接吧!(打招呼)
    🔒服务端说:你确定断开连接嘛?(不舍)
    🔒服务端又说:那你断开连接吧!
    🔑客户端说:好的,那我断开连接了!

(2)HTTP请求流程:

  我们日常用浏览器搜索东西,输入的是URL,浏览器会将其自动转换为HTTP协议。

  一次http请求的基本流程是,有客户端向服务端发起一次请求(request), 而服务器在接收到以后返回给客户端一个响应(response)。所以一次完整的http请求包含请求和响应两部分。

浏览器发送http请求的过程:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
powershell复制代码1.域名解析 --> 

2.发起TCP的3次握手 -->

3.建立TCP连接后发起http请求 -->

4.服务器响应http请求,浏览器得到html代码 -->

5.浏览器解析html代码,并请求html代码中的资源(如js、css、图片等) -->

6.浏览器对页面进行渲染呈现给用户.


知识点补给站:
在网页的右键检查里Network->Name->Request Headers view parsed下
的Connection:keep-alive保持常连接,就不用频繁的三次握手和四次挥手!

在这里插入图片描述

浏览器获取的内容(elements的内容)包含:url地址对应的响应+js+css+pictures 爬虫会获取:url地址对应的响应 爬虫获取的内容和elements的内容不一样,进行数据提取的时候,需要根据url地址对应的响应为准!

(3)URL(浏览器搜索框里的内容!)

 发送http请求时,通过url对网络资源进行定位。

 URL(Uniform Resource Locator),中文叫统一资源定位符。是用来标识某一处资源的地址。也即是我们常说的网址。以下面这个URL为例,介绍下普通URL的各部分组成:
协议+域名(端口默认80)+路径+参数

注意:
 1.http协议的端口号默认为80可以不写;https协议的端口号默认为443可以不写(注意:域名可以确定是哪一台电脑;而端口号是为了确定是那台电脑的哪一个应用!)
 2.域名通常是IP地址的映射,端口号通常是默认的就不写。我们平常搜索时,比如进入百度:www.baidu.com/,这里的https协议…

在这里插入图片描述

(4)HTTP请求格式

  客户端(也就是我们用户)发送一个HTTP请求到服务器的请求消息包括以下部分:请求行,请求头,空行和请求数据。

  一般格式:
在这里插入图片描述
注意:上图中请求行的URL是指(2)URL中的路径!
在这里插入图片描述

1.请求方法:

(1)分类

 根据http标准,http请求可以使用多种请求方法。

 五种请求方法:OPTIONS,PUT,DELETE,TRACE和CONNECT方法。

(2)分类讲解

常用方法 是 GET和POST。

  • GET

1.主要是负责从服务器获取数据
2.URL中添加请求参数,显示在地址栏
3.请求字符串限制 1024个字节
比POST更加高效和方便。

  • POST

1.主要负责向服务器提交数据
2.没有大小限制(但一般是2M)
比’GET’传递数据量大,安全性高。

(3)

在这里插入图片描述

2.请求头:

在这里插入图片描述)在这里插入图片描述)在这里插入图片描述

3.HTTP请求正文(请求数据)

 请求正文通常是使用POST方法进行发送的数据,GET方法是没有请求正文的。

 请求正文跟上面的消息报头由一个空行隔开。

来个承上启下!既然现在请求格式已经OK了,也就是说我们可以让服务端听懂我们说的话了;下面要做的就是让我们能听懂服务端给我们说的话了。

(6)HTTP响应格式

 HTTP响应也由四个部分组成,分别是:状态行(响应行)、消息报头、空行和响应正文。

在这里插入图片描述 一般格式:
在这里插入图片描述

1.HTTP响应状态码:(这里面就有熟悉的404哦!)

 当客户端向服务端发起一次请求后,服务端在返回的响应头中会包含一个HTTP状态码(我们在进行爬虫实战的时候可以通过判断此状态码得知目前的爬虫代码是否OK!)。

 HTTP的状态码是由三位数字来表示的,由第一位数字来表示状态码的类型,一般来说有五种类型:

在这里插入图片描述)注意:重定向就相当于一个中介转接。(所有HTTP响应状态码详解点我查看!)

2.HTTP响应报头:

在这里插入图片描述


(7)总结:

1.HTTP流程总结:

在这里插入图片描述

2.HTTP协议的特点:

HTTP三点注意事项:

  • HTTP是无连接的:无连接的含义是限制每次连接只处理一个请求。服务器处理完客户的请求,并收到客户的应答后,即断开连接。采用这种方式可以节省传输时间。
  • HTTP是媒体独立的:这意味着,只要客户端和服务器知道如何处理的数据内容,任何类型的数据都可以通过HTTP发送。
  • HTTP是无状态的:HTTP协议是无状态协议。无状态是指协议对于事务处理没有记忆能力。缺少状态意味着如果后续处理需要前面的信息,则它必须重传,这样可能导致每次连接传送的数据量增大。另一方面,在服务器不需要先前信息时它的应答就较快。
1
2
3
4
5
6
7
powershell复制代码无状态HTTP官方详解:
HTTP的无状态是指HTTP协议对事务处理是没有记忆能力的,也就是说服务器不知道客户端是什么状态。当我
们向服务器发送请求后,服务器解析此请求,然后返回对应的响应,服务器负责完成这个过程,而且这个过程是完全
独立的,服务器不会记录前后状态的变化,也就是缺少状态记录。这意味着如果后续需要处理前面的信息,则必须重
传,这导致需要额外传递一些前面的重复请求,才能获取后续响应,然而这种效果显然不是我们想要的。为了保持前
后状态,我们肯定不能将前面的请求全部重传一次,这太浪费资源了,对于这种需要用户登录的页面来说,更是棘手。
这时两个用于保持HTTP连接状态的技术就出现了,它们分别是会话和Cookies。下面会介绍到哦!
注意: 无状态的意思是,比如你再一个网页中输入了账号密码登录了QQ空间,但是由于HTTP是无状态的,所以你再在QQ空间里登录QQ邮箱需要再输入一次账号和密码,登录的状态是没有被记忆的。但是可以利用会话技术解决。

3.HTTPS协议:

 加强版的HTTP,公鸡中的战斗机一枚!!!
 HTTPS(全称:Hyper Text Transfer Protocol over Secure Socket Layer 或 Hypertext Transfer Protocol Secure,超文本传输安全协议),是以安全为目标的HTTP通道,简单讲是HTTP的安全版!

 http协议是基于tcp/ip协议的,而https是在http协议的基础之上,再加了一层SSL/TLS协议,数据在传输过程中是加密的。

 注意:HTTPS协议的默认端口是443。

1
2
powershell复制代码http因为是明文传输,而https是密文传输,所以HTTPS比http更安全,
但是性能低,因为解密需要消耗时间!

3.解决http无状态 之 会话技术

 http是无状态的,那服务端怎么区分同一个用户的连续请求呢,这就用到了会话技术:cookie和session。

(1)Cookie

Cookie有时也用其复数形式 Cookies。
  指某些网站为了辨别用户身份、进行 session 跟踪而储存在用户本地终端上的数据(通常经过加密)。最新的规范是 RFC6265 。

Cookie可以理解为一个凭证

  • 1.实际是由服务器发给客户端的特殊信息,
  • 2.这些信息以文本文件的方式存放在客户端,
  • 3.客户端每次向服务器发送请求的时候都会带上这些特殊的信息。
  • 4.服务器在接收到Cookie以后,会验证Cookie的信息,以此来辨别用户的身份。

爬虫中为什么要使用cookie?

  1. 带上cookie的好处:
    ①能够访问登录页面。
    ②正常的浏览器在请求服务器的时候肯定会带上cookie(第一次请求除外),所以对方服务器有可能会通过是否携带cookie来判断我们是否是一个爬虫,对应的能够起到一定的反爬作用。
  2. 带上cookie的坏处:
    ①一套cookie往往对应的是一个用户的信息,请求太频繁有更大的可能性被对方识别为爬虫。
    ②一般使用多账号解决。

(2)Session

 Session,中文经常翻译为会话, 其本来的含义是指有始有终的一系列动作/消息,比如打电话时从拿起电话拨号到挂断电话这中间的一系列过程可以称之为一个session。这个词在各个领域都有在使用。

 而我们web&爬虫领域,一般使用的是其本义,一个浏览器窗口从打开到关闭这个期间。

  Session的目的则是,在一个客户从打开浏览器到关闭浏览器这个期间内,发起的所有请求都可以被识别为同一个用户。 而实现的方式则是,在一个客户打开浏览器开始访问网站的时候,会生成一个cookie,SessionID(注意:SessionID包含于cookie中),这个ID每次的访问都会带上,而服务器会识别这个SessionID并且将与这个SessionID有关的数据保存在服务器上。由此来实现客户端的状态识别。因此session是基于cookie的!

 Session与Cookie相反,Session是存储在服务器上的数据,只由客户端传上来的SessionId来进行判定,所以相对于Cookie,Session的安全性更高。

 一般SessionID会在浏览器被关闭时丢弃,或者服务器会验证Session的活跃程度,例如30分钟某一个SessionID都没有活跃,那么也会被识别为失效。

 session的作用——用来实现客户端和服务的的会话保持!
   会话(状态)保持:①保存cookie;② 实现和服务端的长连接。

(3)cookie和session的区别:

  1. cookie数据存放在客户的浏览器上,session数据放在服务器上;
  2. cookie不是很安全,别人可以分析存放在本地的cookie并进行cookie欺骗;
  3. session会在一定时间内保存在服务器上,当访问增多,会比较占用服务器的性能;
  4. 单个cookie保存的数据不能超过4k,很多浏览器都限制一个站点最多保存20个cookie。

(4)来个图理解理解那么枯燥的文字:

用户首次登录时:
 会在服务器生成一个session表,里面的key是hash生成的数据,value是一系列信息。
同时在客户端本地生成一个文本文件cookie,这里面包含sessionid,而这个sessionid的值为服务器中的hash形式的key。
在这里插入图片描述
用户再次登录时:
 会自动携带sessionid及其值,这个值与服务器里的hash形式的key比较,判断用户是否曾登录成功,如果成功,则获取用户登录的数据,然后返回给用户请求的界面。
在这里插入图片描述

(5)实操一波看看Cookies的属性结构:

(以QQ空间为例!)

  F12打开浏览器开发者工具,然后按如图步骤即为Cookies:(可以看到有很多条目,其中每个条目可以称为Cookie。)
在这里插入图片描述

属性名 属性值讲解
Name 该Cookie的名称。一旦创建,无法修改!
Value 该Cookie的值。如果值为Unicode字符,需要为字符编码;如果值为二进制数据,则需要使用BASE64编码。
Domain 可以访问该Cookie的域名。
Max Age 该Cookie失效的时间,单位为S,通常和Expires一起使用,通过它可以计算出其有效时间。Max Age如果为正数,则该Cookie在Max Age秒后失效;如果为负数,则关闭浏览器时失效,浏览器也不会以任何形式保存该Cookie。
Path 该Cookie的使用路径。如果设置为/path/,则只有路径为/path/的页面可以访问该Cookie;如果设置为/,则本域名下的所有页面都可以访问该Cookie。
Size 此Cookie的大小。
HTTP字段 Cookie的httponly属性。若此属性为true,则只有在HTTP头中会带有此Cookie 的信息,而不能通过document.cookie来访问此Cookie。
Secure 该Cookie是否仅被使用安全协议传输。安全协议有HTTPS和SSL等,在网络上传输数据之前先将数据加密。默认为false。

4.爬虫实战:利用socket下载一张图片

(1)socket学习

socket国外翻译为插座;同时,由于其具备了“套接”和“字”的概念,所以又称为套接字。
在这里插入图片描述

1
2
3
4
5
powershell复制代码知识补给站:(混个眼熟就行了!)
Socket是一种进程间通信机制,提供一种供应用程序
访问通信协议的操作系统调用,使得网络读写数据
和读写本地文件一样容易;Socket是一序列的“指令” ;
已经具备了“套接”(建立网络通讯或进程间通讯)和“字”(可交互的有序指令串)的概念。

在这里插入图片描述

①使用socket简单建造一个服务端:(点我观看另一篇进阶版搭建的TCP服务器端文章)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
python复制代码import socket
# 服务器对象
server = socket.socket()
'''
等同于:server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
socket.AF_INET:使用IPV4;
socket.SOCK_STREAM:创建一个socket套接字。
'''

# 1.绑定服务器
server.bind(("0.0.0.0",8800)) #0.0.0.0是允许所有人来访问;8800是端口号

# 2.监听
server.listen(5)

while True:
# 3.等待连接
# accept是一个阻塞的方法(你不来我就不动!),等待连接,每建立一个连接就会创建一个单独的通道。
# conn:通道参数;addr:通道地址。
conn,addr=server.accept()

# 4.接收数据
data=conn.recv(1024)
print(data)

response="HTTP/1.1 200 OK\r\nContent-Type: text/html;charset=utf-8;\r\n\r\n<h1 style='color:black'>我很帅!<h1>"

# 5.发送数据
conn.send(response.encode())
print("已经响应")

# 6.关闭
server.close()

在本地浏览器中输入:127.0.0.1:8800即可访问到此服务端:
在这里插入图片描述

②使用socket简单建造一个客户端:(爬取百度首页整个界面)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
python复制代码import socket

# 建立服务器对象 通过打印这个client服务器对象可知:默认使用的是IPV4,协议是TCP。
client=socket.socket()

# 1.建立连接
client.connect(("www.baidu.com",80))

# 构造请求报文
data=b"GET / HTTP/1.1\r\nHost: www.baidu.com\r\n\r\n"

# 2.发送请求
client.send(data)
res=b""

# 3.接收数据
temp=client.recv(4096)
while temp:
print("*"*50)
res += temp
temp = client.recv(4096)
print(temp.decode())

# 4.断开连接
client.close()

(2)实战:使用socket来爬取一张漂亮MM的图片:

据说搜狗是没有设置反爬的,刚入门的话就挑软柿子捏,所以我们就来爬爬它。

1.首先,分析网页:

在这里插入图片描述

 而我们要爬取的图片的URL就在头信息里的Request URL中。CV大法即可!

2.上代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
python复制代码#搜狗图片 下载一张
import socket
import re
#搜狗图片
img_url="https://i02piccdn.sogoucdn.com/a3ffebbb779e0baf"

''' 拓展:如何使用HTTPS请求
#HTTPS请求
import ssl
client = ssl.wrap_socket(socket.socket()) #ssl.wrap_socket 一个装饰器
client.connect(('i02piccdn.sogoucdn.com',443))
'''

client = socket.socket()
# 创建连接 注意上面我们爬的是https协议的url,但是我们使用http也行的原因是自动进行了重定向
client.connect(("i02piccdn.sogoucdn.com",80)) #连接服务器,ip地址的映射可以定位到它的服务器

# 构造请求报文
data = "GET /a3ffebbb779e0baf HTTP/1.1\r\nHost:i02piccdn.sogoucdn.com\r\n\r\n"

# 发送数据
client.send(data.encode()) #报文要以字节码的形式

# 接收数据
first_data = client.recv(1024)
print("first_data",first_data)

length = int(re.findall(b"Content-Length: (.*?)\r\n",first_data)[0]) #在列表里,所以加0; 响应的也是字节码形式,所以加b
print(length) #内容长度

# 写这句的原因是在双\r\n后面可能有数据,也可能没有,如果有就直接拿到了
# .*是匹配除了\r\n换行符之外的,后面加个re.S,则也可以匹配\r\n换行符,变成无敌的了!
image_data = re.findall(b"From Inner Cluster \r\n\r\n(.*?)",first_data,re.S)
if image_data:
image_data = image_data[0]
else:
image_data = b""

# 拼接拿到相应长度的数据
while True:
temp = client.recv(1024)
image_data += temp
if len(image_data)>=length:
break
# 4.断开连接
client.close()

# 写入文件
with open("girl.jpg","wb") as f:
f.write(image_data)
3.实现效果:

在这里插入图片描述控制台输出为: first_data b'HTTP/1.1 200 OK\r\nServer: nginx\r\nDate: Thu, 08 Jul 2021 17:04:43 GMT\r\nExpires: Fri, 08 Jul 2022 17:04:43 GMT\r\nX-NWS-UUID-VERIFY: 1266ff4f6f6197f273f603ca87522cc9\r\nExpiration-Time: Sun, 26 Dec 2021 13:11:13 GMT\r\nX-Daa-Tunnel: hop_count=3\r\nAccept-Ranges: bytes\r\nX-Cache-Lookup: Cache Miss\r\nLast-Modified: Sun, 27 Jun 2021 01:11:13 GMT\r\nCache-Control: max-age=31536000\r\nContent-Length: 19594\r\nX-NWS-LOG-UUID: 14051802991302897940\r\nConnection: keep-alive\r\nX-Cache-Lookup: Hit From Inner Cluster\r\n\r\n' 19594

5.In the end!

在这里插入图片描述

从现在做起,坚持下去,一天进步一小点,不久的将来,你会感谢曾经努力的你!

 本博主会持续更新爬虫基础分栏及爬虫实战分栏,认真仔细看完本文的小伙伴们,可以点赞收藏并评论出你们的读后感。并可关注本博主,在今后的日子里阅读更多爬虫文!

1
2
powershell复制代码	如有错误或者言语不恰当的地方可在评论区指出,谢谢!
如转载此文请联系我说明用以意并标注出处及本博主名,谢谢!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

为什么阿里巴巴强制要求使用包装类型定义属性?

发表于 2021-10-11

在阿里巴巴Java开发手册中,对于POJO中如何选择变量的类型也有着一些规定:


这里强制要求使用包装类型,原因是什么呢?

我们来看一段简单的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
csharp复制代码 /**
* @author Hollis
*/
public class BooleanMainTest {
public static void main(String[] args) {
Model model1 = new Model();
System.out.println("default model : " + model1);
}
}

class Model {
/**
* 定一个Boolean类型的success成员变量
*/
private Boolean success;
/**
* 定一个boolean类型的failure成员变量
*/
private boolean failure;

/**
* 覆盖toString方法,使用Java 8 的StringJoiner
*/
@Override
public String toString() {
return new StringJoiner(", ", Model.class.getSimpleName() + "[", "]")
.add("success=" + success)
.add("failure=" + failure)
.toString();
}
}

以上代码输出结果为:

1
csharp复制代码default model : Model[success=null, failure=false]

可以看到,当我们没有设置Model对象的字段的值的时候,Boolean类型的变量会设置默认值为null,而boolean类型的变量会设置默认值为false。

即对象的默认值是null,boolean基本数据类型的默认值是false。

也就是说,包装类型的默认值都是null,而基本数据类型的默认值是一个固定值,如boolean是false,byte、short、int、long是0,float是0.0f等;

我们再举一个扣费的例子,我们做一个扣费系统,扣费时需要从外部的定价系统中读取一个费率的值,我们预期该接口的返回值中会包含一个浮点型的费率字段。当我们取到这个值得时候就使用公式:金额*费率=费用 进行计算,计算结果进行划扣。

如果由于计费系统异常,他可能会返回个默认值,如果这个字段是Double类型的话,该默认值为null,如果该字段是double类型的话,该默认值为0.0。

如果扣费系统对于该费率返回值没做特殊处理的话,拿到null值进行计算会直接报错,阻断程序。拿到0.0可能就直接进行计算,得出接口为0后进行扣费了。这种异常情况就无法被感知。

有人说,那我可以对0.0做特殊判断,如果是0一样可以阻断报错啊。但是,这时候就会产生一个问题,如果允许费率是0的场景又怎么处理呢?

所以,使用基本数据类型只会让方案越来越复杂,坑越来越多。

这种使用包装类型定义变量的方式,通过异常来阻断程序,进而可以被识别到这种线上问题。如果使用基本数据类型的话,系统可能不会报错,进而认为无异常。

以上,就是建议在POJO和RPC的返回值中使用包装类型的原因。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot+Mybatis配置多源数据库

发表于 2021-10-11

方法一

  1. 配置多个数据库
1
2
3
4
5
6
7
8
9
10
11
12
13
yml复制代码spring:
datasource:
master:
driver-class-name: com.mysql.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3305/se?useSSL=false&characterEncoding=utf8&serverTimezone=GMT%2B8
username: root
password: root

movies:
driver-class-name: com.mysql.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3307/se_movies?useSSL=false&characterEncoding=utf8&serverTimezone=GMT%2B8
username: root
password: root
  1. 配置数据库连接
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码package com.my.equipment.config.oldConfig;

import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;

@Configuration
@MapperScan(basePackages = "com.my.equipment.web.seMoviesDao", sqlSessionTemplateRef = "seMoviesSqlSessionTemplate")
public class SeMoviesDatasourceConfig {

@Bean(name = "seMoviesDataSource")
@ConfigurationProperties(prefix = "spring.datasource.movies")
public DataSource testDataSource() {
return DataSourceBuilder.create().build();
}

@Bean(name = "seMoviesSqlSessionFactory")
public SqlSessionFactory testSqlSessionFactory(@Qualifier("seMoviesDataSource") DataSource dataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
return bean.getObject();
}

@Bean(name = "seMoviesTransactionManager")
public DataSourceTransactionManager testTransactionManager(@Qualifier("seMoviesDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}

@Bean(name = "seMoviesSqlSessionTemplate")
public SqlSessionTemplate testSqlSessionTemplate(@Qualifier("seMoviesSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
java复制代码package com.my.equipment.config.oldConfig;


import org.apache.ibatis.session.SqlSessionFactory;

import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;

import javax.sql.DataSource;

@Configuration
@MapperScan(basePackages = "com.my.equipment.web.dao",sqlSessionTemplateRef = "seSqlSessionTemplate")
public class SeDatasourceConfig {


@Bean(name = "seDataSource")
@ConfigurationProperties(prefix = "spring.datasource.master")
@Primary
public DataSource testDataSource() {
return DataSourceBuilder.create().build();
}

@Bean(name = "seSqlSessionFactory")
@Primary
public SqlSessionFactory testSqlSessionFactory(@Qualifier("seDataSource") DataSource dataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
bean.setMapperLocations(new PathMatchingResourcePatternResolver().getResources("classpath:mapper/*.xml"));
return bean.getObject();
}

@Bean(name = "seTransactionManager")
@Primary
public DataSourceTransactionManager testTransactionManager(@Qualifier("seDataSource") DataSource dataSource) {
return new DataSourceTransactionManager(dataSource);
}

@Bean(name = "seSqlSessionTemplate")
@Primary
public SqlSessionTemplate testSqlSessionTemplate(@Qualifier("seSqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}

}

方法2

1.配置多个数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
yml复制代码spring:
datasource:
master:
driver-class-name: com.mysql.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3305/se?useSSL=false&characterEncoding=utf8&serverTimezone=GMT%2B8
username: root
password: root
slave:
driver-class-name: com.mysql.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3310/se?useSSL=false&characterEncoding=utf8&serverTimezone=GMT%2B8
username: root
password: root
movies:
driver-class-name: com.mysql.jdbc.Driver
jdbc-url: jdbc:mysql://localhost:3307/se_movies?useSSL=false&characterEncoding=utf8&serverTimezone=GMT%2B8
username: root
password: root

2.配置数据库连接

定义多元数据库

1
2
3
4
5
6
java复制代码package com.my.equipment.utils;

public enum DBTypeEnum {

MASTER,SLAVE,MOVIES;
}

定义数据源切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
csharp复制代码package com.my.equipment.utils;

public class DBContextHolder {


private static final ThreadLocal<DBTypeEnum> contextHolder=new ThreadLocal<>();

public static void set(DBTypeEnum dbTypeEnum){
contextHolder.set(dbTypeEnum);
}

public static DBTypeEnum get(){
return contextHolder.get();
}

public static void master(){
set(DBTypeEnum.MASTER);
System.out.println("写");
}

public static void slave(){
set(DBTypeEnum.SLAVE);
System.out.println("读");
}

public static void movies(){
set(DBTypeEnum.MOVIES);
System.out.println("movies");
}

public static void clear(){
contextHolder.remove();
}

}

重写路由选择类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
scala复制代码package com.my.equipment.utils;


import org.springframework.jdbc.datasource.lookup.AbstractRoutingDataSource;
import org.springframework.lang.Nullable;

public class MyRoutingDataSource extends AbstractRoutingDataSource {


@Nullable
@Override
protected Object determineCurrentLookupKey() {

return DBContextHolder.get();
}
}

配置Mybatis SqlSessionFactory 和事务管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
kotlin复制代码package com.my.equipment.config;

import org.apache.ibatis.jdbc.SQL;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.core.io.support.PathMatchingResourcePatternResolver;
import org.springframework.core.io.support.ResourcePatternResolver;
import org.springframework.jdbc.datasource.DataSourceTransactionManager;
import org.springframework.transaction.PlatformTransactionManager;
import org.springframework.transaction.annotation.EnableTransactionManagement;

import javax.annotation.Resource;
import javax.sql.DataSource;



@Configuration
@EnableTransactionManagement
public class MyBatisConfig {

@Value("${mybatis.mapper-locations}")
private String mapperLocation;

@Resource(name = "myRoutingDataSource")
private DataSource myRoutingDataSource;

@Bean
public SqlSessionFactory sqlSessionFactory() throws Exception {
SqlSessionFactoryBean sqlSessionFactoryBean=new SqlSessionFactoryBean();
sqlSessionFactoryBean.setDataSource(myRoutingDataSource);

ResourcePatternResolver resolver=new PathMatchingResourcePatternResolver();
sqlSessionFactoryBean.setMapperLocations(resolver.getResources(mapperLocation));
return sqlSessionFactoryBean.getObject();
}

@Bean
public PlatformTransactionManager platformTransactionManager(){
return new DataSourceTransactionManager(myRoutingDataSource);
}
}

配置数据源

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
kotlin复制代码package com.my.equipment.config;


import com.my.equipment.utils.DBTypeEnum;
import com.my.equipment.utils.MyRoutingDataSource;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.boot.jdbc.DataSourceBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;

@Configuration
public class DatasourceConfig {

/**
* 配置从数据库
* @return
*/
@Bean(name = "slaveDataSource")
@ConfigurationProperties("spring.datasource.slave")
public DataSource slaveDataSource(){
return DataSourceBuilder.create().build();
}

/**
* 配置主数据库
* @return
*/
@Bean(name = "masterDataSource")
@ConfigurationProperties("spring.datasource.master")
public DataSource masterDataSource(){
return DataSourceBuilder.create().build();
}

@Bean(name = "moviesDataSource")
@ConfigurationProperties("spring.datasource.movies")
public DataSource moviesDataSource(){
return DataSourceBuilder.create().build();
}


@Bean
public DataSource myRoutingDataSource(@Qualifier("slaveDataSource") DataSource slaveDataSource,
@Qualifier("masterDataSource") DataSource masterDataSource,
@Qualifier("moviesDataSource") DataSource moviesDataSource){



Map<Object,Object> targetDataSource=new HashMap<>();
targetDataSource.put(DBTypeEnum.MASTER,masterDataSource);
targetDataSource.put(DBTypeEnum.SLAVE,slaveDataSource);
targetDataSource.put(DBTypeEnum.MOVIES,moviesDataSource);

MyRoutingDataSource myRoutingDataSource=new MyRoutingDataSource();
//找不到用默认数据源
myRoutingDataSource.setDefaultTargetDataSource(masterDataSource);
//可选择的目标数据源
myRoutingDataSource.setTargetDataSources(targetDataSource);

return myRoutingDataSource;


}

}

切面实现数据源切换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
java复制代码package com.my.equipment.aspect;


import com.my.equipment.utils.DBContextHolder;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.stereotype.Component;

@Aspect
@Component
public class DataSourceAop {



//com.my.equipment.web.dao及其子包下所有的方法
@Pointcut("execution(* com.my.equipment.web.dao..*.*(..))")
public void writePoint(){

}

//com.my.equipment.web.seSlaveDao及其子包下所有的方法(..*代表当前及其子目录)
@Pointcut("execution(* com.my.equipment.web.seSlaveDao..*.*(..))")
public void readPoint(){

}

@Pointcut("execution(* com.my.equipment.web.seMoviesDao..*.*(..))")
public void moviesPoint(){

}

@Before("readPoint()")
public void read(){
DBContextHolder.slave();
}

@Before("writePoint()")
public void write(){
DBContextHolder.master();
}

@Before("moviesPoint()")
public void movies(){
DBContextHolder.movies();
}
}

注意:事务问题,可以发现一但添加了@Transactional,那么它的数据源只会去使用默认的数据源。(由于 AbstractRoutingDataSource中有个机制,如果当前上下文的连接对象为空,获取一个连接对象,然后保存起来,下次doBegin再调用时,就直接用这个连接了,根本不做任何切换(类似于缓存命中!),这就导致切换失败,也许有人会想到提高注入优先级,但是本文的切面是基于Dao的,那么会导致无论你怎么调整优先级,务必是@Transctional优先注入,那么要解决该问题个人目前的思路是:在Controller优先切换数据源,在services中使用@Transactional该方法)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
less复制代码@ApiOperation(value = "结束录制")
@GetMapping(value = "/stop")
public String stop() {
try {
for (Map.Entry<Integer, Process> entry : map.entrySet()) {
rtspToMP4.stopRecord(entry.getValue());
}
DBContextHolder.set(DBTypeEnum.MOVIES);
masterSensorService.saveVideoUrl(names,urls);
} catch (Exception e) {
e.printStackTrace();
return "录制失败";
}
return "结束录制";
}
1
2
3
4
5
6
7
8
9
10
less复制代码@Override
@Transactional
public void saveVideoUrl(List<String> names, List<String> urls) {
for (int i = 0; i < names.size(); i++) {
RecordVideo recordVideo=new RecordVideo();
recordVideo.setUrl(urls.get(i));
recordVideo.setName(names.get(i));
recordVideoMapper.insertSelective(recordVideo);
}
}

但是该方法仍然属于非分布式数据库事务层面,无法做到在一个方法中回滚两个不同的数据源。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Apache Flink 在汽车之家的应用与实践 一、背景及

发表于 2021-10-11

简介: 汽车之家如何基于 Flink 上线了 AutoStream 平台并持续打磨。

本文整理自汽车之家实时计算平台负责人邸星星在 Flink Forward Asia 2020 分享的议题《Apache Flink 在汽车之家的应用及实践》。主要内容包括:

  1. 背景及现状
  2. AutoStream 平台
  3. 基于 Flink 的实时生态建设
  4. 后续规划

一、背景及现状

1. 第一阶段

在 2019 年之前,汽车之家的大部分实时业务都是运行在 Storm 之上的。Storm 作为早期主流的实时计算引擎,凭借简单的 Spout 和 Bolt 编程模型以及集群本身的稳定性,俘获了大批用户,我们在 2016 年搭建了 Storm 平台。

随着实时计算的需求日渐增多,数据规模逐步增大,Storm 在开发及维护成本上都凸显了不足,这里列举几个痛点:

  1. 开发成本高
  2. 我们一直是用的 Lambda 架构,会用 T+1 的离线数据修正实时数据,即最终以离线数据为准,所以计算口径实时要和离线完全保持一致,实时数据开发的需求文档就是离线的 SQL,实时开发人员的核心工作就是把离线的 SQL 翻译成 Storm 代码,期间虽然封装了一些通用的 Bolt 来简化开发,但把离线动辄几百行的 SQL 精准地翻译成代码还是很有挑战的,并且每次运行都要经过打包、上传、重启的一系列的繁琐操作,调试成本很高。
  3. 计算低效
  4. Storm 对状态支持的不好,通常需要借助 Redis、HBase 这类 kv 存储维护中间状态,我们之前是强依赖 Redis。比如常见的计算 UV 的场景,最简单的办法是使用 Redis 的 sadd 命令判断 uid 是否为已经存在,但这种方法会带来很高的网络 IO,同时如果没有提前报备的大促或搞活动导致流量翻倍的情况,很容易把 Redis 内存搞满,运维同学也会被杀个措手不及。同时 Redis 的吞吐能力也限制了整个作业的吞吐量。
  5. 难以维护、管理
  6. 由于采用编写 Storm 代码方式开发,难以分析元数据及血缘关系,同时可读性差,计算口径不透明,业务交接成本很高。
  7. 对数仓不友好
  8. 数据仓库团队是直接对接业务需求的团队,他们更熟悉基于 Hive 的 SQL 开发模式,通常都不擅长 Storm 作业的开发,这导致一些原本是实时的需求,只能退而求其次选择 T+1 的方式给出数据。

在这个阶段,我们支持了最基本的实时计算需求,因为开发门槛比较高,很多实时业务都是由我们平台开发来完成,既做平台,又做数据开发,精力分散很严重。

2. 第二阶段

我们从 2018 年开始调研 Flink 引擎,其相对完备的 SQL 支持,天生对状态的支持吸引了我们,在经过学习调研后,2019 年初开始设计开发 Flink SQL 平台,并于 2019 年中上线了 AutoStream 1.0 平台。平台上线之初就在仓库团队、监控团队和运维团队得以应用,能够快速被用户主要得益于以下几点:

  1. 开发、维护成本低:汽车之家大部分的实时任务可以用 Flink SQL + UDF 实现。平台提供常用的 Source 和 Sink,以及业务开发常用的 UDF,同时用户可以自己编写 UDF。基于 “SQL + 配置” 的方式完成开发,可以满足大部分需求。对于自定义任务,我们提供方便开发使用的 SDK,助力用户快速开发自定义 Flink 任务。平台面向的用户已经不只是专业的数据开发人员了,普通开发、 测试、运维人员经过基本的学习都可以在平台上完成日常的实时数据开发工作,实现平台赋能化。数据资产可管理,SQL 语句本身是结构化的,我们通过解析一个作业的 SQL,结合 source、 sink 的 DDL,可以很容易的知道这个作业的上下游,天然保留血缘关系。
  2. 高性能:Flink 可以完全基于状态 (内存,磁盘) 做计算,对比之前依赖外部存储做计算的场景,性能提升巨。在 818 活动压测期间,改造后的程序可以轻松支持原来几十倍流量的实时计算,且横向扩展性能十分良好。
  3. 全面的监控报警:用户将任务托管在平台上,任务的存续由平台负责,用户专注于任务本身的逻辑开发本身即可。对于 SQL 任务,SQL 的可读性极高,便于维护;对于自定义任务,基于我们 SDK 开发,用户可以更专注于梳理业务逻辑上。不论是 SQL 任务还是 SDK,我们都内嵌了大量监控,并与报警平台关联,方便用户快速发现分析定位并修复任务,提高稳定性。
  4. 赋能业务:支持数仓分层模型,平台提供了良好的 SQL 支持,数仓人员可以借助 SQL,将离线数仓的建设经验应用于实时数仓的建设上,自平台上线后,数仓逐步开始对接实时计算需求。

痛点:

  1. 易用性有待提高,比如用户无法自助管理 UDF,只能使用平台内置的 UDF 或者把打好的 jar 包发给平台管理员,通过人工的方式处理上传问题。
  2. 随着平台作业量的高速增长,平台 on-call 成本非常高。首先我们经常面对一些新用户的基础问题:
  3. 平台的使用问题;
  4. 开发过程中遇到的问题,比如为什么打包报错;
  5. Flink UI 的使用问题;
  6. 监控图形的含义,如何配置报警。
  7. 还有一些不太容易快速给出答案的问题:
  8. Jar 包冲突;
  9. 为什么消费 Kafka 延迟;
  10. 任务为什么报错。
  11. 尤其是延迟问题,我们常见的数据倾斜,GC,反压问题可以直接引导用户去 Flink UI 和监控图表上去查看,但有时候还是需要手动去服务器上查看 jmap、jstack 等信息,有时候还需要生成火焰图来帮助用户定位性能问题。
  12. 初期我们没有和运营团队合作,完全是我们开发人员直接对接处理这些问题,虽然期间补充了大量的文档,但是整体上 on-call 成本还是很高。
  13. 在 Kafka 或 Yarn 出现故障时,没有快速恢复的方案,当面对一些重保业务时,有些捉襟见肘。众所周知,没有永远稳定,不出故障的环境或组件,当有重大故障出现时,需要有快速恢复业务的应对方案。
  14. 资源没有合理管控,存在比较严重的资源浪费的情况。随着使用平台开发任务的用户不断增加,平台的作业数也不断增加。有些用户不能很好的把控集群资源的使用,经常出现过多申请资源的问题,导致作业运行效率低下甚至空闲,造成了资源的浪费。

在 AutoStream1.0 平台这个阶段,基于 SQL 开发的方式极大地降低了实时开发的门槛,各业务方可以自己实现实时业务的开发,同时数仓同学经过简单的学习后,就开始对接实时业务,将我们平台方从大量的业务需求中释放出来,让我们可以专心做平台方面的工作。

3. 当前阶段

针对上面的几个方面,我们有针对性行的做了以下几点升级:

  1. 引入 Jar Service:支持用户自助上传 UDF jar 包,并在 SQL 片段中自助引用,实现自助管理 UDF。同时自定义作业也可以配置 Jar Service 中的 Jar,面对多个作业共用同一个 Jar 的场景,用户只需要在作业中配置 Jar Service 中的 jar 包路径就可以,避免每次上线都重复上传 Jar 的繁琐操作;
  2. 自助诊断:我们开发了动态调整日志级别、自助查看火焰图等功能,方便用户自己定位问题,减轻我们日常 on-call 成本;
  3. 作业健康检查功能:从多个维度分析,为每个 Flink 作业打分,每个低分项都相应的给出建议;
  4. Flink 作业级别的快速容灾恢复:我们建设了两套 YARN 环境,每一个 YARN 对应一个单独的 HDFS,两个 HDFS 之前通过 SNAPSHOT 方式进行 Checkpoint 数据的双向复制,同时在平台上增加了切换集群的功能,在一个 YARN 集群不可用的情况下,用户可以自助在平台上,选择备用集群的 Checkpoint;
  5. Kafka 多集群架构支持:使用我们自研的 Kafka SDK,支持快速切换 Kafka 集群;
  6. 对接预算系统:每个作业占用的资源都直接对应到预算团队,这样一定程度上保证资源不会被其他团队占用,同时每个团队的预算管理员可以查看预算使用明细,了解自己的预算支持了团队内的哪些业务。

目前用户对平台的使用已经趋于熟悉,同时自助健康检查和自助诊断等功能的上线,我们平台方的日常 on-call 频率在逐步降低,开始逐渐进入平台建设的良性循环阶段。

4. 应用场景

汽车之家用于做实时计算的数据主要分为三类:

  1. 客户端日志,也就是我们内部说的点击流日志,包括用户端上报的启动日志、时长日志、PV 日志、点击日志以及各类事件日志,这类主要是用户行为日志,是我们建设实时数仓中流量宽表、UAS 系统、实时画像的基础,在这之上还支持了智能搜索、智能推荐等在线业务;同时基础的流量数据也用于支持各业务线的流量分析、实时效果统计,支持日常运营决策。
  2. 服务端日志,包括 nginx 日志、各类后端应用产生的日志、各种中间件的日志。这些日志数据主要用于后端服务的健康监测、性能监控等场景。
  3. 业务库的实时变更记录,主要有三种:MySQL 的 binlog,SQLServer 的 CDC,TiDB 的 TiCDC 数据,基于这些实时的数据变更记录,我们通过对各种内容数据的抽象与规范,建设了内容中台、资源池等基础服务;也有一些做简单逻辑的业务数据实时统计场景,结果数据用于实时大屏、罗盘等,做数据展现。

以上这三类数据都会实时写入 Kafka 集群,在 Flink 集群中针对不同场景进行计算,结果数据写入到 Redis、MySQL、Elasticsearch、HBase、Kafka、Kylin 等引擎中,用于支持上层应用。

下面列举了一些应用场景:

5. 集群规模

目前 Flink 集群服务器 400+,部署模式为 YARN (80%) 和 Kubernetes,运行作业数 800+,日计算量 1 万亿,峰值每秒处理数据 2000 万条。

二、AutoStream 平台

1. 平台架构

上面是 AutoStream 平台目前的整体架构,主要是以下几部分内容:

  1. AutoStream core System
  2. 这是我们平台的核心服务,负责对元数据服务、Flink 客户端服务、Jar 管理服务及交互结果查询服务进行整合,通过前端页面把平台功能暴露给用户。
  3. 主要包括 SQL 和 Jar 作业的管理、库表信息的管理、UDF 管理、操作记录及历史版本的管理、健康检查、自助诊断、报警管理等模块,同时提供对接外部系统的能力,支持其他系统通过接口方式管理库表信息、SQL 作业信息及作业启停操作等。基于 Akka 任务的生命周期管理和调度系统提供了高效,简单,低延迟的操作保障,提升了用户使用的效率和易用性。
  4. 元数据服务 (Catalog-like Unified Metastore)
  5. 主要对应 Flink Catalog 的后端实现,除了支持基本的库表信息管理外,还支持库表粒度的权限控制,结合我们自身的特点,支持用户组级别的授权。
  6. 底层我们提供了 Plugin Catalog 机制,既可以用于和 Flink 已有的 Catalog 实现做集成,也可以方便我们嵌入自定义的 Catalogs,通过 Plugin 机制可以很容易的重用 HiveCatalog,JdbcCatalog 等,从而保证了库表的周期的一致性。
  7. 同时元数据服务还负责对用户提交的 DML 语句进行解析,识别当前作业的依赖的表信息,用于作业的分析及提交过程,同时可以记录血缘关系。
  8. Jar Service
  9. 平台提供的各类 SDK 在 Jar Service 上进行统一管理,同时用户也可以在平台上把自定义 Jar、UDF jar 等提交到 Jar Service 上统一管理,然后在作业中通过配置或 DDL 引用。
  10. Flink 客户端服务 (Customed Flink Job Client)
  11. 负责把平台上的作业转化成 Flink Job 提交到 Yarn 或 Kubernetes 上,我们在这一层针对 Yarn 和 Kubernetes 做了抽象,统一两种调度框架的行为,对外暴露统一接口及规范化的参数,弱化 Yarn 和 Kubernetes 的差异,为 Flink 作业在两种框架上无缝切换打下了良好的基础。
  12. 每个作业的依赖不尽相同,我们除了对基础依赖的管理以外,还需要支持个性化的依赖。比如不同版本的 SQL SDK,用户自助上传的 Jar、UDF 等,所以不同作业的提交阶段需要做隔离。
  13. 我们采用的是 Jar service + 进程隔离的方式,通过和 Jar Service 对接,根据作业的类型和配置,选用相应的 Jar,并且提交单独的进程中执行,实现物理隔离。
  14. 结果缓存服务 (Result Cache Serivce)
  15. 是一个简易的缓存服务,用于 SQL 作业开发阶段的在线调试场景。当我们分析出用户的 SQL 语句,将 Select 语句的结果集存入缓存服务中;然后用户可以在平台上通过选择 SQL 序号 (每个完整的 SELECT 语句对应一个序号),实时查看 SQL 对应的结果数据,方便用户开发与分析问题。
  16. 内置Connectors (Source & Sink)
  17. 最右侧的部分主要是各种 Source、Sink 的实现,有一些是重用 Flink 提供的 connector,有一些是我们自己开发的 connector。
  18. 针对每一种 connector 我们都添加了必要 Metric,并配置成单独的监控图表,方便用户了解作业运行情况,同时也为定位问题提供数据依据。

2. 基于 SQL 的开发流程

在平台提供以上功能的基础上,用户可以快速的实现 SQL 作业的开发:

  1. 创建一个 SQL 任务;
  2. 编写 DDL 声明 Source 和 Sink;
  3. 编写 DML,完成主要业务逻辑的实现;
  4. 在线查看结果,若数据符合预期,添加 INSERT INTO 语句,写入到指定 Sink 中即可。

平台默认会保存 SQL 每一次的变更记录,用户可以在线查看历史版本,同时我们会记录针对作业的各种操作,在作业维护阶段可以帮助用户追溯变更历史,定位问题。

下面是一个 Demo,用于统计当天的 PV、UV 数据:

3. 基于 Catalog 的元数据管理

元数据管理的主要内容:

  1. 支持权限控制:除了支持基本的库表信息管理外,还支持表粒度的权限控制,结合我们自身的特点,支持用户组级别的授权;
  2. Plugin Catalog 机制:可以组合多种其他 Catalog 实现,复用已有的 Catalog;
  3. 库表生命周期行为统一:用户可以选择平台上的表和底层存储的生命周期统一,避免两边分别维护,重复建表;
  4. 新老版本完全兼容:由于在 AutoStream 1.0 的时候,我们没有单独引入 Metastore 服务,此外 1.0 时期的 DDL SQL 解析模块是自研的组件。所以在建设 MetaStore 服务时,需要考虑历史作业和历史库表信息兼容的问题。
  5. 对于库表信息,新的 MetaStore 在底层将新版和旧版的库表信息转换成统一的存储格式,从而保证了库表信息的兼容性。
  6. 对于作业,这里我们通过抽象接口,并分别提供 V1Service 和 V2Service 两种实现路径,保证了新老作业在用户层面的兼容。

下面是几个模块和 Metastore 交互的示意图:

4. UDXF 管理

我们引入了 Jar Service 服务用来管理各种 Jar,包括用户自定义作业、平台内部 SDK 组件、UDXF 等,在 Jar Service 基础上我们可以很容易的实现 UDXF 的自助管理,在 On k8s 的场景下,我们提供了统一的镜像,Pod 启动后会从 Jar Service 下载对应的 Jar 到容器内部,用于支持作业的启动。

用户提交的 SQL 中如果包含 Function DDL,我们会在 Job Client Service 中会解析 DDL,下载对应的 Jar 到本地。

为了避免和其他作业有依赖冲突,我们每次都会单独启动一个子进程来完成作业提交的操作。UDXF Jar 会被并加入到 classpath 中,我们对 Flink 做了一些修改,作业提交时会把这个 Jar 一并上传到 HDFS 中;同时 AutoSQL SDK 会根据函数名称和类名为当前作业注册 UDF。

5. 监控报警及日志收集

得益于 Flink 完善的 Metric 机制,我们可以方便的添加 Metric,针对 Connector,我们内嵌了丰富的 Metric,并配置了默认的监控看板,通过看板可以查看 CPU、内存、JVM、网络传输、Checkpoint、各种 Connector 的监控图表。同时平台和公司的云监控系统对接,自动生成默认的报警策略,监控存活状态、消费延迟等关键指标。同时用户可以在云监控系统修改默认的报警策略,添加新的报警项实现个性化监控报警。

日志通过云 Filebeat 组件写入到 Elasticsearch 集群,同时开放 Kibana 供用户查询。

整体的监控报警及日志收集架构如下:

6. 健康检查机制

随着作业数的高速增长,出现了很多资源使用不合理的情况,比如前面提到的资源浪费的情况。用户大多时候都是在对接新需求,支持新业务,很少回过头来评估作业的资源配置是否合理,优化资源使用。所以平台规划了一版成本评估的模型,也就是现在说的健康检查机制,平台每天会针对作业做多维度的健康评分,用户可以随时在平台上查看单个作业的得分情况及最近 30 天的得分变化曲线。

低分作业会在用户登录平台时进行提示,并且定期发邮件提醒用户进行优化、整改,在优化作业后用户可以主动触发重新评分,查看优化效果。

我们引入了多维度,基于权重的评分策略,针对 CPU、内存使用率、是否存在空闲 Slot、GC 情况、Kafka 消费延迟、单核每秒处理数据量等多个维度的指标结合计算拓补图进行分析评估,最终产生一个综合分。

每个低分项都会显示低分的原因及参考范围,并显示一些指导建议,辅助用户进行优化。

我们新增了一个 Metric,用一个 0%~100% 的数字体现 TaskManagner CPU 利用率。这样用户可以直观的评估 CPU 是否存在浪费的情况。

下面是作业评分的大致流程:首先我们会收集和整理运行作业的基本信息和 Metrics 信息。然后应用我们设定好的规则,得到基本评分和基础建议信息。最后将得分信息和建议整合,综合评判,得出综合得分和最终的报告。用户可以通过平台查看报告。对于得分较低的作业,我们会发送报警给作业的归属用户。

7. 自助诊断

如之前提到的痛点,用户定位线上问题时,只能求助于我们平台方,造成我们 on-call 工作量很大,同时用户体验也不好,鉴于此,所以我们上线了以下功能:

  1. 动态修改日志级别:我们借鉴了 Storm 的修改日志级别的方式,在 Flink 上实现了类似功能,通过扩展 REST API 和 RPC 接口的方法,支持修改指定 Logger 的到某一日志级别,并支持设置一个过期时间,当过期后,改 Logger 的日志会重新恢复为 INFO 级别;
  2. 支持自助查看线程栈和堆内存信息:Flink UI 中已经支持在线查看线程栈 (jstack),我们直接复用了这个接口;还额外增加了查看堆内存 (jmap) 的接口,方便用户在线查看;
  3. 支持在线生成、查看火焰图:火焰图是定位程序性能问题的一大利器,我们利用了阿里的 arthas 组件,为 Flink 增加了在线查看火焰图的能力,用户遇到性能问题时,可以快速评估性能瓶颈。

8. 基于 Checkpoint 复制的快速容灾

当实时计算应用在重要业务场景时,单个 Yarn 集群一旦出现故障且短期内不可恢复,那么可能会对业务造成较大影响。

在此背景下,我们建设了 Yarn 多集群架构,两个独立的 Yarn 各自对应一套独立的 HDFS 环境,checkpoint 数据定期在两个 HDFS 间相互复制。目前 checkpoint 复制的延迟稳定在 20 分钟内。

同时,在平台层面,我们把切换集群的功能直接开放给用户,用户可以在线查看 checkpoint 的复制情况,选择合适的 checkpoint 后 (当然也可以选择不从 checkpoint 恢复) 进行集群切换,然后重启作业,实现作业在集群间的相对平滑的迁移。

三、基于 Flink 的实时生态建设

AutoStream 平台的核心场景是支持实时计算开发人员的使用,使实时计算开发变得简单高效、可监控、易运维。同时随着平台的逐步完善,我们开始摸索如何对 AutoStream 平台进行重用,如何让 Flink 应用在更多场景下。重用 AutoStream 有以下几点优势:

  1. Flink 本身是优秀的分布式计算框架,有着较高的计算性能,良好的容错能力和成熟的状态管理机制,社区蓬勃发展,功能及稳定性有保障;
  2. AutoStream 有着完善的监控和报警机制,作业运行在平台上,无需单独对接监控系统,同时 Flink 对 Metric 支持很友好,可以方便的添加新的 Metric;
  3. 大量的技术沉淀和运营经验,通过两年多的平台建设,我们在 AutoStream 上已经实现了较为完善的 Flink 作业全生命周期的管理,并建设了 Jar Service 等基础组件,通过简单的上层接口包装,就可以对接其他系统,让其他系统具备实时计算的能力;
  4. 支持 Yarn 和 Kubernetes 部署。

基于以上几点,我们在建设其他系统时,优先重用 AutoStream 平台,以接口调用的方式进行对接,将 Flink 作业全流程的生命周期,完全托管给 AutoStream 平台,各系统优先考虑实现自身的业务逻辑即可。

我们团队内的 AutoDTS (接入及分发任务) 和 AutoKafka (Kafka 集群复制) 系统目前就是依托于 AutoStream 建设的。简单介绍一下集成的方式,以 AutoDTS 为例:

  1. 把任务 Flink 化,AutoDTS 上的接入、分发任务,都是以 Flink 作业的形式存在;
  2. 和 AutoStream 平台对接,调用接口实现 Flink 作业的创建、修改、启动、停止等操作。这里 Flink 作业既可以是 Jar,也可以是 SQL 作业;
  3. AutoDTS 平台根据业务场景,建设个性化的前端页面,个性化的表单数据,表单提交后,可以将表单数据存储到 MySQL 中;同时需要把作业信息以及 Jar 包地址等信息组装成 AutoStream 接口定义的格式,通过接口调用在 AutoStream 平台自动生成一个 Flink 任务,同时保存这个 Flink 任务的 ID;
  4. 启动 AutoDTS 的一个接入任务,直接调用 AutoStream 接口就实现了作业的启动。

1. AutoDTS 数据接入分发平台

AutoDTS 系统主要包含两部分功能:

  1. 数据接入:将数据库中的变更数据 (Change log) 实时写入到 Kafka;
  2. 数据分发:将接入到 Kafka 的数据,实时写入到其他存储引擎。

1.1 AutoDTS 数据接入

下面是数据接入的架构图:

我们维护了基于 Flink 的数据接入 SDK 并定义了统一的 JSON 数据格式,也就是说 MySQL Binlog,SQL Server、 TiDB 的变更数据接入到 Kafka 后,数据格式是一致的,下游业务使用时,基于统一格式做开发,无需关注原始业务库的类型。

数据接入到 Kafka Topic 的同时,Topic 会自动注册为一张 AutoStream 平台上的流表,方便用户使用。

数据接入基于 Flink 建设还有一个额外的好处,就是可以基于 Flink 的精确一次语义,低成本的实现精确一次数据接入,这对支持数据准确性要求很高的业务来说,是一个必要条件。

目前我们在做把业务表中的全量数据接入 Kafka Topic 中,基于 Kafka 的 compact 模式,可以实现 Topic 中同时包含存量数据和增量数据。这对于数据分发场景来说是十分友好的,目前如果想把数据实时同步到其他存储引擎中,需要先基于调度系统,接入一次全量数据,然后再开启实时分发任务,进行变更数据的实时分发。有了 Compact Topic 后,可以省去全量接入的操作。Flink1.12 版本已经对 Compact Topic 做支持,引入 upsert-kafka Connector [1]

[1] cwiki.apache.org/confluence/…

下面是一条样例数据:

默认注册到平台上的流表是 Schemaless 的,用户可以用 JSON 相关的 UDF 获取其中的字段数据。

下面是使用流表的示例:

1.2 AutoDTS 数据分发

我们已经知道,接入到 Kafka 中的数据是可以当做一张流表来使用的,而数据分发任务本质上是把这个流表的数据写入到其他存储引擎,鉴于 AutoStream 平台已经支持多种 Table Sink (Connector),我们只需要根据用户填写的下游存储的类型和地址等信息,就可以通过拼装 SQL 来实现数据的分发。

通过直接重用 Connector 的方式,最大化的避免了重复开发的工作。

下面是一个分发任务对应的 SQL 示例:

2. Kaka 多集群架构

Kafka 在实际应用中,有些场景是需要做 Kafka 多集群架构支持的,下面列举几个常见的场景:

  • 数据冗余灾备,实时复制数据到另一个备用集群,当一个 Kafka 集群不可用时,可以让应用切换到备用集群,快速恢复业务;
  • 集群迁移,当机房合同到期,或者上云时,都需要做集群的迁移,此时需要把集群数据整体复制到新机房的集群,让业务相对平滑迁移;
  • 读写分离场景,使用 Kafka 时,大多数情况都是读多写少,为保证数据写入的稳定性,可以选择建设 Kafka 读写分离集群。

我们目前建设了 Kafka 多集群架构,和 Flink 相关的主要有两块内容:

  1. Kafka 集群间数据复制的程序运行在 Flink 集群中;
  2. 改造了 Flink Kafka Connector,支持快速切换 Kafka 集群。

2.1 整体架构

先来看一下 Kafka 集群间的数据复制,这是建设多集群架构的基础。我们是使用 MirrorMaker2 来实现数据复制的,我们把 MirrorMaker2 改造成普通的 Flink 作业,运行在 Flink 集群中。

我们引入了 Route Service 和 Kafka SDK,实现客户端快速切换访问的 Kafka 集群。

客户端需要依赖我们自己发布的 Kafka SDK,并且配置中不再指定 bootstrap.servers 参数,而是通过设置 cluster.code 参数来声明自己要访问的集群。 SDK 会根据 cluster.code 参数,访问 Route Service 获取集群真正的地址,然后创建 Producer/Consumer 开始生产/消费数据。

SDK 会监听路由规则的变化,当需要切换集群时,只需要在 Route Service 后台切换路由规则,SDK 发现路由集群发生变化时,会重启 Producer/Consumer 实例,切换到新集群。

如果是消费者发生了集群切换,由于 Cluster1 和 Cluster2 中 Topic 的 offset 是不同的,需要通过 Offset Mapping Service 来获取当前 Consumer Group 在 Cluster2 中的 offset,然后从这些 Offset 开始消费,实现相对平滑的集群切换。

2.2 Kafka 集群间的数据复制

我们使用 MirrorMaker2 来实现集群间的数据复制,MirrorMaker2 是 Kafka 2.4 版本引入的,具体以下特性:

  • 自动识别新的 Topic 和 Partition;
  • 自动同步 Topic 配置:Topic 的配置会自动同步到目标集群;
  • 自动同步 ACL;
  • 提供 Offset 的转换工具:支持根据源集群、目标集群及 Group 信息,获取到该 Group 在目标集群的中对应的 Offset 信息;
  • 支持扩展黑白名单策略:可以灵活定制,动态生效。

clusters = primary, backup

primary.bootstrap.servers = vip1:9091

backup.bootstrap.servers = vip2:9092

primary->backup.enabled = true

backup->primary.enabled = true

这段配置完成 primary 到 backup 集群的双向数据复制,primary 集群中的 topic1 中的数据会复制到 backup 集群中的 primary.topic1 这个 Topic 中,目标集群的Topic 命名规则是 sourceCluster.sourceTopicName,可以通过实现 ReplicationPolicy 接口来自定义命名策略。

2.3 MirrorMaker2 相关的 Topic 介绍

  • 源集群中的 Topic
  • heartbeats:存储心跳数据;
  • mm2-offset-syncs.targetCluster.internal:存储源集群 (upstreamOffset) 和目标集群的 offset(downstreamOffset) 对应关系。
  • 目标集群中的 Topic
  • mm2-configs.sourceCluster.internal:connect 框架自带,用来存储配置;
  • mm2-offsets.sourceCluster.internal:connect 框架自带,用来存储 WorkerSourceTask 当前处理的 offset,mm2 场景下是为了当前数据同步到源集群 topic partition 的哪一个 offset,这个更像是 Flink 的 checkpoint 概念;
  • mm2-status.sourceCluster.internal:connect 框架自带,用来存储 connector 状态。

上面三个用的都是 connect runtime 模块中的 KafkaBasedLog 工具类,这个工具类可以读写一个 compact 模式的 topic 数据,此时 MirrorMaker2 把 topic 当作 KV 存储使用。

sourceCluster.checkpoints.internal:记录 sourceCluster consumer group 在当前集群对应的 offset,mm2 会定期从源 kafka 集群读取 topic 对应的 consumer group 提交的 offset, 并写到目标集群的 sourceCluster.checkpoints.internal topic 中。

2.4 MirrorMaker2 的部署

下面是 MirrorMaker2 作业运行的流程,在 AutoKafka 平台上创建一个数据复制作业,会调用 AutoStream 平台接口,相应的创建一个 MM2 类型的作业。启动作业时,会调用 AutoStream 平台的接口把 MM2 作业提交到 Flink 集群中运行。

2.5 路由服务

Route Service 负责处理客户端的路由请求,根据客户端的信息匹配合适的路由规则,将最终路由结果,也就是集群信息返回给客户端。

支持基于集群名称、Topic、Group、ClientID 以及客户端自定义的参数灵活配置路由规则。

下面的例子就是将 Flink 作业 ID 为 1234 的消费者,路由到 cluster_a1 集群。

2.6 Kafka SDK

使用原生的 kafka-clients 是无法和 Route Service 进行通信的,客户端需要依赖我们提供的 Kafka SDK (汽车之家内部开发的 SDK) 能和 Route Service 通信,实现动态路由的效果。

Kafka SDK 实现了 Producer、Consumer 接口,本质是 kafka-clients 的代理,业务做较少的改动就可以引入 Kafka SDK。

业务依赖 Kafka SDK 后,Kafka SDK 会负责和 Route Service 通信,监听路由变化,当发现路由的集群发生变化时,会 close 当前的 Producer/Consumer,创建新的 Producer/Consumer,访问新的集群。

此外 Kafka SDK 还负责将 Producer、Consumer 的 metric 统一上报到云监控系统的 prometheus,通过查看平台预先配置好的仪表盘,可以清晰的看到业务的生产、消费情况。

同时 SDK 会收集一些信息,比如应用名称、IP 端口、进程号等,这些信息可以在 AutoKafka 平台上查到,方便我们和用户共同定位问题。

2.7 Offset Mapping Service

当 Consumer 的路由发生变化并切换集群时,情况有一些复杂,因为目前 MirrorMaker2 是先把数据从源集群消费出来,再写入到目标集群的,同一条数据可以确保写入到目标 topic 的相同分区,但是 offset 和源集群是不同的。

针对这种 offset 不一致的情况,MirrorMaker2 会消费源集群的 __consumer_offsets 数据,加上目标集群对应的 offset,写入到目标集群的 sourceCluster.checkpoints.internal topic 中。

同时,源集群的 mm2-offset-syncs.targetCluster.internal topic 记录了源集群和目标集群 offset 的映射关系,结合这两个 topic,我们建设了 Offset Mapping Service 来完成目标集群的 offset 的转换工作。

所以当 Consumer 需要切换集群时,会调用 Offset Mapping Service 的接口,获取到目标集群的 offsets,然后主动 seek 到这些位置开始消费,这样实现相对平滑的集群切换工作。

2.8 Flink 与 Kafka 多集群架构的集成

由于 Kafka SDK 兼容 kafka-clients 的用法,用户只需要更换依赖,然后设置 cluster.code、Flink.id 等参数即可。

当 Producer/Consumer 发生集群切换后,由于创建了新的 Producer/Consumer 实例,Kafka 的 metric 数据没有重新注册,导致 metric 数据无法正常上报。我们在 AbstractMetricGroup 类中增加了 unregister 方法,在监听 Producer/Consumer 的切换事件时,重新注册 kafka metrics 就可以了。

至此我们完成了 Flink 对 Kafka 多集群架构的支持。

四、后续规划

  1. 目前我们支持的数据统计类场景大多是基于流量数据或用户行为数据的,这些场景对精确一次的语义要求不高,随着目前社区对 Change Log 支持的逐步完善,同时我们的数据接入体系是支持精确一次语义的,并且正在做业务表全量接入到 Kafka 的功能,所以后续可以实现精确一次的数据统计,支持交易、线索、金融类的统计需求。
  2. 一些公司已经提出湖仓一体的理念,数据湖技术确实可以解决一些原有数仓架构的痛点,比如数据不支持更新操作,无法做到准实时的数据查询。目前我们在做一些 Flink 和 Iceberg、Hudi 集成的一些尝试,后续会在公司寻找场景并落地。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

浅入浅出Spring循环依赖

发表于 2021-10-11

小知识,大挑战!本文正在参与「程序员必备小知识」创作活动

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。


在日落大道浪漫出逃
除了风没有人知道

前言

哈喽大家好,我是一条

最近被面试搞得很是烦躁,特别是看到一些面试官上来就黑着个脸,真想赶紧面试结束。

可为了生活,还是得忍着,归根结底还是太菜了!

最近有粉丝问到了循环依赖问题,以后再有人问你,拿这篇“吊打”他。

概念

什么是循环依赖?

多个bean之间相互依赖,形成了一个闭环。比如:A依赖于B、B依赖于C、C依赖于A。

image.png

通常来说,如果问Spring容器内部如何解决循环依赖,一定是指默认的单例Bean中,基于set方法构造注入的属性互相引用的场景。

循环依赖的种类及能否解决如下:

名称 是否可解决循环依赖
构造器循环依赖 否
Setter循环依赖 是
Prototype作用域的循环依赖 否

报错信息

1
ini复制代码Unsatisfied dependency expressed through constructor parameter 0; nested exception is org.springframework.beans.factory.BeanCurrentlyInCreationException: Error creating bean with name ‘myDao’: Requested bean is currently in creation: Is there an unresolvable circular reference?

翻译一下

1
arduino复制代码通过构造函数参数 0 表示的依赖关系未得到满足;嵌套的异常是 创建名称为'myDao'的bean时出错。请求的Bean目前正在创建中。是否存在一个无法解决的循环引用?

异常信息:bean当前创建异常org.springframework.beans.factory.BeanCurrentlyInCreationException。

通俗版理解

两人拿枪对峙

现在甲乙两个人,互相拿枪对峙,甲说乙先放,乙说甲先放。就是不开枪。

哎,就是玩!

相信这个场景大家在电视剧里都见过吧,最后一般是“反派死于话多”。

但是回到我们 spring里,我们是不希望有人死亡的,也就是必须两个bean都创建出来,怎么办?

必须有一人妥协

解决方案就是:必须有一个人先妥协。

甲说:我退一步,我先把弹夹卸了,你把枪放下。

乙一听就感动了,满含热泪的拿枪放下了。

甲一看乙没有打自己,也热泪盈眶,两人紧紧相拥。

从此过上了幸福美满的生活……

Spring版理解

回到我们spring里,先回顾一下bean的生命周期:

  • 实例化
  • 属性赋值
  • 初始化
  • 销毁

简单理解一下的上面的过程

实例化和初始化什么区别?

是不是只差了中间赋值的过程,那只实例化的bean可以使用吗?

当然不可以!

也就是说只实例化的bean是一个半成品,初始化之后才是成品,才可以使用。

现在A依赖B,B依赖A。

A对B说:我要完整的你

b也对a:我要完整的你

ok,两人打起来了,拿枪对峙。怎么解决?是不是得一个人妥协。

a说:算了吧,你给我个你的半成品,我将就一下。

b心里寻思,他用我的半成品创建一个完整的a,然后我就可以创建了。

心里这么想,嘴上就爽快答应着:行,没问题。

如此,a创建了完整的自己,b拿着a也完成了创建。

问题解决。

真的解决了吗?成品和半成品都存在哪里呢?

这就不得不提到大名鼎鼎的三级缓存。

三级缓存

spring提供了三级缓存来存放成品和半成品及工厂。位于DefaultSingletonBeanRegistry类中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码public class DefaultSingletonBeanRegistry extends SimpleAliasRegistry implements SingletonBeanRegistry {   
/**
*一级缓存:单例池
*存放已经初始化的bean——成品
*/
private final Map<String, Object> singletonObjects = new ConcurrentHashMap(256);
/**
*三级缓存:单例工厂的高速缓存
*存放生成bean的工厂
*/
private final Map<String, ObjectFactory<?>> singletonFactories = new HashMap(16);
/**
*二级缓存:早期单例对象的高速缓存
*存放已经实例化但未初始化(未填充属性)的的bean——半成品
*/
private final Map<String, Object> earlySingletonObjects = new HashMap(16);
}

创建过程(简易版)

如果你是面试突击,建议把简易版被下来就可以应付面试了

等有时间再看源码版

假如A依赖B,B依赖A,那么这两个类之间形成了一个循环依赖

  • A先开始创建,通过其无参构造方法创建bean的实例,并将其实例放入到「二级缓存」提前暴露出来。A停止。
  • B开始创建,先去「一级缓存」找A的成品,找不到,再去「二级缓存」里找,还找不到,再去「三级缓存」里找,找到了A的创建工厂,通过工厂,拿到A的半成品,并将A放到「二级缓存」。
  • 拿到A后,B完成创建,将自己放入「一级缓存」。
  • 此时A继续创建,同样从「一级缓存」开始找,拿到B后完成创建,将自己放入「一级缓存」。

创建过程(源码版)

源码版建议配合spring源码边debug边食用。

1、当我们在调用getBean()获取bean时,实际调用的是doGetBean() 方法。doGetBean() 想要获取 beanA ,于是调用 getSingleton() 方法从缓存中查找 beanA

2、在 getSingleton() 方法中,从「一级缓存」中查找,没有,返回 null

3、doGetBean() 方法中获取到 beanA 为 null ,于是走对应的处理逻辑,调用 getSingleton() 的重载方法(参数为 ObjectFactory 的)

4、在 getSingleton()方法中,先将 beanA_name 添加到一个集合中,用于标记该 bean 正在创建中,然后回调匿名内部类的 createBean 方法

5、进入 AbstractAutowireCapableBeanFactory#doCreateBean,先反射调用构造器创建出 beanA 的实例,然后判断,是否为单例,是否允许提前暴露引用(对于单例一般为true)、是否正在创建中(即是否是在第四步的集合中)判断为 true 则将 beanA 添加到「三级缓存」中

6、对 beanA 进行属性填充,此时检测到 beanA 依赖于 beanB ,于是查找 beanB

7、调用 doGetBean() 方法,和上面 beanA 的过程一样,到缓存中查询 beanB ,没有则创建,然后给 beanB 填充属性

8、此时 beanB 依赖于 beanA ,调用 getSingleton() 获取 beanA ,依次从一级、二级、三级缓存中找、此时从「三级缓存」中获取到 beanA 的创建工厂,通过创建工厂获取到 singletonObject ,此时这个 singletonObject 指向的就是上面在 doCreateBean() 方法中实例化的 beanA

9、这样 beanB 就获取到了 beanA 的依赖,于是 beanB 顺利完成初始化,并将 beanA 从「三级缓存」移动到「二级缓存」中

10、随后 beanA 继续他的属性填充工作,此时也获取到了 beanB ,beanA 也随之完成了创建,回到 getSingleton() 方法中继续向下执行,将 beanA 从「二级缓存」移动到「一级缓存」中

image-20210729201728917

最后

朋友给我新设计了个logo,大家觉得哪个好一点呢?

image-20210729202951923
评论区留言可以参与抽奖,掘金官方将在掘力星计划活动结束后,在评论区抽送100份掘金周边。

为了回馈各位粉丝,礼尚往来,给大家准备了一条多年积累下来的优质资源,包括 学习视频、面试资料、珍藏电子书等。领取

在这里插入图片描述

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL 定时备份数据库(包含脚本)

发表于 2021-10-11

在操作数据过程中,可能会导致数据错误,甚至数据库奔溃,而有效的定时备份能很好地保护数据库。本篇文章主要讲述了几种方法进行 MySQL 定时备份数据库。

一. mysqldump命令备份数据

在MySQL中提供了命令行导出数据库数据以及文件的一种方便的工具mysqldump,我们可以通过命令行直接实现数据库内容的导出dump,首先我们简单了解一下mysqldump命令用法:

1
2
css复制代码#MySQLdump常用
mysqldump -u root -p --databases 数据库1 数据库2 > xxx.sql

二. mysqldump常用操作示例

  1. 备份全部数据库的数据和结构
1
bash复制代码mysqldump -uroot -p123456 -A > /data/mysqlDump/mydb.sql

2.备份全部数据库的结构(加 -d 参数)

1
bash复制代码mysqldump -uroot -p123456 -A -d > /data/mysqlDump/mydb.sql
  1. 备份全部数据库的数据(加 -t 参数)
1
bash复制代码mysqldump -uroot -p123456 -A -t > /data/mysqlDump/mydb.sql

4.备份单个数据库的数据和结构(,数据库名mydb)

1
bash复制代码mysqldump -uroot-p123456 mydb > /data/mysqlDump/mydb.sql
  1. 备份单个数据库的结构
1
bash复制代码mysqldump -uroot -p123456 mydb -d > /data/mysqlDump/mydb.sql
  1. 备份单个数据库的数据
1
bash复制代码mysqldump -uroot -p123456 mydb -t > /data/mysqlDump/mydb.sql
  1. 备份多个表的数据和结构(数据,结构的单独备份方法与上同)
1
bash复制代码mysqldump -uroot -p123456 mydb t1 t2 > /data/mysqlDump/mydb.sql
  1. 一次备份多个数据库
1
bash复制代码mysqldump -uroot -p123456 --databases db1 db2 > /data/mysqlDump/mydb.sql

三. 还原 MySQL 备份内容

有两种方式还原,第一种是在 MySQL 命令行中,第二种是使用 SHELL 行完成还原

  1. 在系统命令行中,输入如下实现还原:
1
bash复制代码mysql -uroot -p123456 < /data/mysqlDump/mydb.sql
  1. 在登录进入mysql系统中,通过source指令找到对应系统中的文件进行还原:
1
shell复制代码mysql> source /data/mysqlDump/mydb.sql

在 Linux中,通常使用BASH脚本对需要执行的内容进行编写,加上定时执行命令crontab实现日志自动化生成。

以下代码功能就是针对mysql进行备份,配合crontab,实现备份的内容为近一个月(31天)内的每天的mysql数据库记录。

编写BASH维护固定数量备份文件

在Linux中,使用vi或者vim编写脚本内容并命名为:mysql_dump_script.sh

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
bash复制代码#!/bin/bash

#保存备份个数,备份31天数据
number=31
#备份保存路径
backup_dir=/root/mysqlbackup
#日期
dd=`date +%Y-%m-%d-%H-%M-%S`
#备份工具
tool=mysqldump
#用户名
username=root
#密码
password=TankB214
#将要备份的数据库
database_name=edoctor

#如果文件夹不存在则创建
if [ ! -d $backup_dir ];
then     
    mkdir -p $backup_dir;
fi

#简单写法 mysqldump -u root -p123456 users > /root/mysqlbackup/users-$filename.sql
$tool -u $username -p$password $database_name > $backup_dir/$database_name-$dd.sql

#写创建备份日志
echo "create $backup_dir/$database_name-$dd.dupm" >> $backup_dir/log.txt

#找出需要删除的备份
delfile=`ls -l -crt $backup_dir/*.sql | awk '{print $9 }' | head -1`

#判断现在的备份数量是否大于$number
count=`ls -l -crt $backup_dir/*.sql | awk '{print $9 }' | wc -l`

if [ $count -gt $number ]
then
  #删除最早生成的备份,只保留number数量的备份
  rm $delfile
  #写删除文件日志
  echo "delete $delfile" >> $backup_dir/log.txt
fi

如上代码主要含义如下:

1.首先设置各项参数,例如number最多需要备份的数目,备份路径,用户名,密码等。

2.执行mysqldump命令保存备份文件,并将操作打印至同目录下的log.txt中标记操作日志。

3.定义需要删除的文件:通过ls命令获取第九列,即文件名列,再通过实现定义操作时间最晚的那个需要删除的文件。

4.定义备份数量:通过ls命令加上

统计以sql结尾的文件的行数。

5.如果文件超出限制大小,就删除最早创建的sql文件

使用crontab定期执行备份脚本

在 Linux 中,周期执行的任务一般由cron这个守护进程来处理[ps -ef|grep cron]。cron读取一个或多个配置文件,这些配置文件中包含了命令行及其调用时间。cron的配置文件称为“crontab”,是“cron table”的简写。

cron服务

cron是一个 Liunx 下 的定时执行工具,可以在无需人工干预的情况下运行作业。

service crond start //启动服务 service crond stop //关闭服务 service crond restart //重启服务 service crond reload //重新载入配置 service crond status //查看服务状态

crontab语法

crontab命令用于安装、删除或者列出用于驱动cron后台进程的表格。用户把需要执行的命令序列放到crontab文件中以获得执行。每个用户都可以有自己的crontab文件。/var/spool/cron下的crontab文件不可以直接创建或者直接修改。该crontab文件是通过crontab命令创建的。

在crontab文件中如何输入需要执行的命令和时间。该文件中每行都包括六个域,其中前五个域是指定命令被执行的时间,最后一个域是要被执行的命令。每个域之间使用空格或者制表符分隔。

格式如下:minute hour day-of-month month-of-year day-of-week commands 合法值 00-59 00-23 01-31 01-12 0-6 (0 is sunday)

除了数字还有几个个特殊的符号就是”*“、”/“和”-“、”,”,*代表所有的取值范围内的数字,”/“代表每的意思,”/5”表示每5个单位,”-“代表从某个数字到某个数字,”,”分开几个离散的数字。

-l 在标准输出上显示当前的crontab。-r 删除当前的crontab文件。-e 使用VISUAL或者EDITOR环境变量所指的编辑器编辑当前的crontab文件。当结束编辑离开时,编辑后的文件将自动安装。

创建cron脚本

第一步:写cron脚本文件,命名为mysqlRollBack.cron。15,30,45,59 * * * * echo “xgmtest…..” >> xgmtest.txt 表示,每隔15分钟,执行打印一次命令 第二步:添加定时任务。执行命令 “crontab crontest.cron”。搞定 第三步:”crontab -l” 查看定时任务是否成功或者检测/var/spool/cron下是否生成对应cron脚本

注意:这操作是直接替换该用户下的crontab,而不是新增

定期执行编写的定时任务脚本(记得先给shell脚本执行权限)

1
javascript复制代码0 2 * * * /root/mysql_backup_script.sh

随后使用crontab命令定期指令编写的定时脚本

1
复制代码crontab mysqlRollback.cron

再通过命令检查定时任务是否已创建:

附 crontab 的使用示例:

  1. 每天早上6点
1
bash复制代码0 6 * * * echo "Good morning." >> /tmp/test.txt //注意单纯echo,从屏幕上看不到任何输出,因为cron把任何输出都email到root的信箱了。
  1. 每两个小时
1
bash复制代码0 */2 * * * echo "Have a break now." >> /tmp/test.txt
  1. 晚上11点到早上8点之间每两个小时和早上八点
1
bash复制代码0 23-7/2,8 * * * echo "Have a good dream" >> /tmp/test.txt
  1. 每个月的4号和每个礼拜的礼拜一到礼拜三的早上11点
1
arduino复制代码0 11 4 * 1-3 command line

5.1 月 1 日早上 4 点

1
ruby复制代码0 4 1 1 * command line SHELL=/bin/bash PATH=/sbin:/bin:/usr/sbin:/usr/bin MAILTO=root //如果出现错误,或者有数据输出,数据作为邮件发给这个帐号 HOME=/
  1. 每小时执行/etc/cron.hourly内的脚本
1
arduino复制代码01 * * * * root run-parts /etc/cron.hourly
  1. 每天执行/etc/cron.daily内的脚本
1
arduino复制代码02 4 * * * root run-parts /etc/cron.daily
  1. 每星期执行/etc/cron.weekly内的脚本
1
arduino复制代码22 4 * * 0 root run-parts /etc/cron.weekly
  1. 每月去执行/etc/cron.monthly内的脚本
1
arduino复制代码42 4 1 * * root run-parts /etc/cron.monthly

注意: “run-parts” 这个参数了,如果去掉这个参数的话,后面就可以写要运行的某个脚本名,而不是文件夹名。

  1. 每天的下午4点、5点、6点的5 min、15 min、25 min、35 min、45 min、55 min时执行命令。
1
bash复制代码5,15,25,35,45,55 16,17,18 * * * command
  1. 每周一,三,五的下午3:00系统进入维护状态,重新启动系统。
1
arduino复制代码00 15 * * 1,3,5 shutdown -r +5
  1. 每小时的10分,40分执行用户目录下的innd/bbslin这个指令:
1
bash复制代码10,40 * * * * innd/bbslink
  1. 每小时的1分执行用户目录下的bin/account这个指令:

以下是我的测试每分钟的截图效果,其对应代码如下:

1
javascript复制代码* * * * * /root/mysql_backup_script.sh

效果截图:

图片

其中的log.txt记录备份的操作详细日志:

图片

本文参考:

1
2
3
4
5
6
7
8
9
10
11
bash复制代码1.MySQLdump常用命令

www.cnblogs.com/smail-bao/p/6402265.html

2.利用Shell脚本实现对mysql数据库的备份:

www.cnblogs.com/mracale/p/7251292.html

3.Linux下的Crontab定时执行任务命令详解:

www.cnblogs.com/longjshz/p/5779215.html

图片

点击下方卡片/微信搜索,关注公众号“天宇文创意乐派”(ID:gh_cc865e4c536b)

天宇文创意乐派

天宇文创意乐派

个人订阅号 主要提供:分享最新资讯 IT教程 免费小说的平台

15篇原创内容

公众号

本文使用 文章同步助手 同步

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

❤️ SQL 零基础入门图文教程! 📚 前言 🌴 SQL 介

发表于 2021-10-11

本文已参与「掘力星计划」,赢取创作大礼包,挑战创作激励金。

📚 前言

SQL语言有40多年的历史,从它被应用至今几乎无处不在。我们消费的每一笔支付记录,收集的每一条用户信息,发出去的每一条消息,都会使用数据库或与其相关的产品来存储,而操纵数据库的语言正是 SQL !

SQL 对于现在的互联网公司生产研发等岗位几乎是一个必备技能,如果不会 SQL 的话,可能什么都做不了。你可以把 SQL 当做是一种工具,利用它可以帮助你完成你的工作,创造价值。


文章结尾有 SQL 小测验哦!看看你能得几分?

👉🏻 点我跳转到 SQL 测验!

🌴 SQL 介绍

🌼 什么是 SQL

SQL 是用于访问和处理数据库的标准的计算机语言。

  • SQL 指结构化查询语言
  • SQL 使我们有能力访问数据库
  • SQL 是一种 ANSI 的标准计算机语言


SQL 可与数据库程序协同工作,比如 MS Access、DB2、Informix、MS SQL Server、Oracle、Sybase 以及其他数据库系统。但是由于各种各样的数据库出现,导致很多不同版本的 SQL 语言,为了与 ANSI 标准相兼容,它们必须以相似的方式共同地来支持一些主要的关键词(比如 SELECT、UPDATE、DELETE、INSERT、WHERE 等等),这些就是我们要学习的SQL基础。

🌀 SQL 的类型

可以把 SQL 分为两个部分:数据操作语言 (DML) 和 数据定义语言 (DDL)。

  • 数据查询语言(DQL: Data Query Language)
  • 数据操纵语言(DML:Data Manipulation Language)

🌵 学习 SQL 的作用

SQL 是一门 ANSI 的标准计算机语言,用来访问和操作数据库系统。SQL 语句用于取回和更新数据库中的数据。

  • SQL 面向数据库执行查询
  • SQL 可从数据库取回数据
  • SQL 可在数据库中插入新的记录
  • SQL 可更新数据库中的数据
  • SQL 可从数据库删除记录
  • SQL 可创建新数据库
  • SQL 可在数据库中创建新表
  • SQL 可在数据库中创建存储过程
  • SQL 可在数据库中创建视图
  • SQL 可以设置表、存储过程和视图的权限

🍄 数据库是什么

顾名思义,你可以理解为数据库是用来存放数据的一个容器。

打个比方,每个人家里都会有冰箱,冰箱是用来干什么的?冰箱是用来存放食物的地方。

同样的,数据库是存放数据的地方。正是因为有了数据库后,我们可以直接查找数据。例如你每天使用余额宝查看自己的账户收益,就是从数据库读取数据后给你的。

最常见的数据库类型是关系型数据库管理系统(RDBMS):

RDBMS 是 SQL 的基础,同样也是所有现代数据库系统的基础,比如 MS SQL Server, IBM DB2, Oracle, MySQL 以及 Microsoft Access等等。

RDBMS 中的数据存储在被称为表(tables)的数据库对象中。表 是相关的数据项的集合,它由列和行组成。

由于本文主要讲解 SQL 基础,因此对数据库不做过多解释,只需要大概了解即可。咱们直接开始学习SQL!

🐥 SQL 基础语言学习

在了解 SQL 基础语句使用之前,我们先讲一下 表 是什么?

一个数据库通常包含一个或多个表。每个表由一个名字标识(例如“客户”或者“订单”)。表包含带有数据的记录(行)。

下面的例子是一个名为 “Persons” 的表:

Id LastName FirstName Address City
1 Adams John Oxford Street London
2 Bush George Fifth Avenue New York
3 Carter Thomas Changan Street Beijing

上面的表包含三条记录(每一条对应一个人)和五个列(Id、姓、名、地址和城市)。

有表才能查询,那么如何创建这样一个表?

🐤 CREATE TABLE – 创建表

CREATE TABLE 语句用于创建数据库中的表。

语法:

1
2
3
4
5
6
7
sql复制代码CREATE TABLE 表名称
(
列名称1 数据类型,
列名称2 数据类型,
列名称3 数据类型,
....
);

数据类型(data_type)规定了列可容纳何种数据类型。下面的表格包含了SQL中最常用的数据类型:

数据类型 描述
integer(size),int(size),smallint(size),tinyint(size) 仅容纳整数、在括号内规定数字的最大位数
decimal(size,d),numeric(size,d) 容纳带有小数的数字、”size” 规定数字的最大位数、”d” 规定小数点右侧的最大位数
char(size) 容纳固定长度的字符串(可容纳字母、数字以及特殊字符)、在括号中规定字符串的长度
varchar(size) 容纳可变长度的字符串(可容纳字母、数字以及特殊的字符)、在括号中规定字符串的最大长度
date(yyyymmdd) 容纳日期

实例:

本例演示如何创建名为 “Persons” 的表。

该表包含 5 个列,列名分别是:”Id_P”、”LastName”、”FirstName”、”Address” 以及 “City”:

1
2
3
4
5
6
7
8
sql复制代码CREATE TABLE Persons
(
Id_P int,
LastName varchar(255),
FirstName varchar(255),
Address varchar(255),
City varchar(255)
);

Id_P 列的数据类型是 int,包含整数。其余 4 列的数据类型是 varchar,最大长度为 255 个字符。


空的 “Persons” 表类似这样:

可使用 INSERT INTO 语句向空表写入数据。

🐑 INSERT – 插入数据

INSERT INTO 语句用于向表格中插入新的行。

语法:

1
sql复制代码INSERT INTO 表名称 VALUES (值1, 值2,....);

我们也可以指定所要插入数据的列:

1
sql复制代码INSERT INTO table_name (列1, 列2,...) VALUES (值1, 值2,....);

实例:

本例演示 “Persons” 表插入记录的两种方式:

1、插入新的行

1
sql复制代码INSERT INTO Persons VALUES (1, 'Gates', 'Bill', 'Xuanwumen 10', 'Beijing');

2、在指定的列中插入数据

1
sql复制代码INSERT INTO Persons (LastName, Address) VALUES ('Wilson', 'Champs-Elysees');

插入成功后,数据如下:

这个数据插入之后,是通过 SELECT 语句进行查询出来的,别急马上讲!

🐼 SELECT – 查询数据

SELECT 语句用于从表中选取数据,结果被存储在一个结果表中(称为结果集)。

语法:

1
sql复制代码SELECT * FROM 表名称;

我们也可以指定所要查询数据的列:

1
sql复制代码SELECT 列名称 FROM 表名称;

📢 注意: SQL 语句对大小写不敏感,SELECT 等效于 select。

实例:

SQL SELECT * 实例:

1
sql复制代码SELECT * FROM Persons;

📢 注意: 星号(*)是选取所有列的快捷方式。

如需获取名为 “LastName” 和 “FirstName” 的列的内容(从名为 “Persons” 的数据库表),请使用类似这样的 SELECT 语句:

1
sql复制代码SELECT LastName,FirstName FROM Persons;

🐫 DISTINCT – 去除重复值

如果一张表中有多行重复数据,如何去重显示呢?可以了解下 DISTINCT 。

语法:

1
sql复制代码SELECT DISTINCT 列名称 FROM 表名称;

实例:

如果要从 “LASTNAME” 列中选取所有的值,我们需要使用 SELECT 语句:

1
sql复制代码SELECT LASTNAME FROM Persons;

可以发现,在结果集中,Wilson 被列出了多次。

如需从 “LASTNAME” 列中仅选取唯一不同的值,我们需要使用 SELECT DISTINCT 语句:

1
sql复制代码SELECT DISTINCT LASTNAME FROM Persons;

通过上述查询,结果集中只显示了一列 Wilson,显然已经去除了重复列。

🐸 WHERE – 条件过滤

如果需要从表中选取指定的数据,可将 WHERE 子句添加到 SELECT 语句。

语法:

1
sql复制代码SELECT 列名称 FROM 表名称 WHERE 列 运算符 值;

下面的运算符可在 WHERE 子句中使用:

操作符 描述
= 等于
<> 不等于
大于
< 小于
>= 大于等于
<= 小于等于
BETWEEN 在某个范围内
LIKE 搜索某种模式

📢 注意: 在某些版本的 SQL 中,操作符 <> 可以写为 !=。

实例:

如果只希望选取居住在城市 “Beijing” 中的人,我们需要向 SELECT 语句添加 WHERE 子句:

1
sql复制代码SELECT * FROM Persons WHERE City='Beijing';

📢 注意: SQL 使用单引号来环绕文本值(大部分数据库系统也接受双引号)。如果是数值,请不要使用引号。

🐹 AND & OR – 运算符

AND 和 OR 可在 WHERE 子语句中把两个或多个条件结合起来。

  • 如果第一个条件和第二个条件都成立,则 AND 运算符显示一条记录。
  • 如果第一个条件和第二个条件中只要有一个成立,则 OR 运算符显示一条记录。

语法:

AND 运算符实例:

1
sql复制代码SELECT * FROM 表名称 WHERE 列 运算符 值 AND 列 运算符 值;

OR 运算符实例:

1
sql复制代码SELECT * FROM 表名称 WHERE 列 运算符 值 OR 列 运算符 值;

实例:

由于 Persons 表数据太少,因此增加几条记录:

1
2
3
4
5
sql复制代码INSERT INTO Persons VALUES (2, 'Adams', 'John', 'Oxford Street', 'London');
INSERT INTO Persons VALUES (3, 'Bush', 'George', 'Fifth Avenue', 'New York');
INSERT INTO Persons VALUES (4, 'Carter', 'Thomas', 'Changan Street', 'Beijing');
INSERT INTO Persons VALUES (5, 'Carter', 'William', 'Xuanwumen 10', 'Beijing');
SELECT * FROM Persons;

AND 运算符实例:

使用 AND 来显示所有姓为 “Carter” 并且名为 “Thomas” 的人:

1
sql复制代码SELECT * FROM Persons WHERE FirstName='Thomas' AND LastName='Carter';

OR 运算符实例:

使用 OR 来显示所有姓为 “Carter” 或者名为 “Thomas” 的人:

1
sql复制代码SELECT * FROM Persons WHERE firstname='Thomas' OR lastname='Carter';

结合 AND 和 OR 运算符:

我们也可以把 AND 和 OR 结合起来(使用圆括号来组成复杂的表达式):

1
sql复制代码SELECT * FROM Persons WHERE (FirstName='Thomas' OR FirstName='William') AND LastName='Carter';

🐰 ORDER BY – 排序

ORDER BY 语句用于根据指定的列对结果集进行排序,默认按照升序对记录进行排序,如果您希望按照降序对记录进行排序,可以使用 DESC 关键字。

语法:

1
sql复制代码SELECT * FROM 表名称 ORDER BY 列1,列2 DESC;

默认排序为 ASC 升序,DESC 代表降序。

实例:

以字母顺序显示 LASTNAME 名称:

1
sql复制代码SELECT * FROM Persons ORDER BY LASTNAME;

空值(NULL)默认排序在有值行之后。

以数字顺序显示ID_P,并以字母顺序显示 LASTNAME 名称:

1
sql复制代码SELECT * FROM Persons ORDER BY ID_P,LASTNAME;

以数字降序显示ID_P:

1
sql复制代码SELECT * FROM Persons ORDER BY ID_P DESC;

📢 注意: 在第一列中有相同的值时,第二列是以升序排列的。如果第一列中有些值为 null 时,情况也是这样的。

🐱 UPDATE – 更新数据

Update 语句用于修改表中的数据。

语法:

1
sql复制代码UPDATE 表名称 SET 列名称 = 新值 WHERE 列名称 = 某值;

实例:

更新某一行中的一个列:

目前 Persons 表有很多字段为 null 的数据,可以通过 UPDATE 为 LASTNAME 是 “Wilson” 的人添加FIRSTNAME:

1
sql复制代码UPDATE Persons SET FirstName = 'Fred' WHERE LastName = 'Wilson';

更新某一行中的若干列:

1
sql复制代码UPDATE Persons SET ID_P = 6,city= 'London' WHERE LastName = 'Wilson';

🐨 DELETE – 删除数据

DELETE 语句用于删除表中的行。

语法:

1
sql复制代码DELETE FROM 表名称 WHERE 列名称 = 值;

实例:

删除某行:

删除 Persons 表中 LastName 为 “Fred Wilson” 的行:

1
sql复制代码DELETE FROM Persons WHERE LastName = 'Wilson';

删除所有行:

可以在不删除表的情况下删除所有的行。这意味着表的结构、属性和索引都是完整的:

1
sql复制代码DELETE FROM table_name;

🐵 TRUNCATE TABLE – 清除表数据

如果我们仅仅需要除去表内的数据,但并不删除表本身,那么我们该如何做呢?

可以使用 TRUNCATE TABLE 命令(仅仅删除表格中的数据):

语法:

1
sql复制代码TRUNCATE TABLE 表名称;

实例:

本例演示如何删除名为 “Persons” 的表。

1
sql复制代码TRUNCATE TABLE persons;

🐯 DROP TABLE – 删除表

DROP TABLE 语句用于删除表(表的结构、属性以及索引也会被删除)。

语法:

1
sql复制代码DROP TABLE 表名称;

实例:

本例演示如何删除名为 “Persons” 的表。

1
sql复制代码drop table persons;

从上图可以看出,第一次执行删除时,成功删除了表 persons,第二次执行删除时,报错找不到表 persons,说明表已经被删除了。

🚀 SQL 高级言语学习

🚢 LIKE – 查找类似值

LIKE 操作符用于在 WHERE 子句中搜索列中的指定模式。

语法:

1
sql复制代码SELECT 列名/(*) FROM 表名称 WHERE 列名称 LIKE 值;

实例:

Persons 表插入数据:

1
2
3
4
5
6
sql复制代码INSERT INTO Persons VALUES (1, 'Gates', 'Bill', 'Xuanwumen 10', 'Beijing');
INSERT INTO Persons VALUES (2, 'Adams', 'John', 'Oxford Street', 'London');
INSERT INTO Persons VALUES (3, 'Bush', 'George', 'Fifth Avenue', 'New York');
INSERT INTO Persons VALUES (4, 'Carter', 'Thomas', 'Changan Street', 'Beijing');
INSERT INTO Persons VALUES (5, 'Carter', 'William', 'Xuanwumen 10', 'Beijing');
select * from persons;

1、现在,我们希望从上面的 “Persons” 表中选取居住在以 “N” 开头的城市里的人:

1
sql复制代码SELECT * FROM Persons WHERE City LIKE 'N%';

2、接下来,我们希望从 “Persons” 表中选取居住在以 “g” 结尾的城市里的人:

1
sql复制代码SELECT * FROM Persons WHERE City LIKE '%g';

3、接下来,我们希望从 “Persons” 表中选取居住在包含 “lon” 的城市里的人:

1
sql复制代码SELECT * FROM Persons WHERE City LIKE '%on%';

4、通过使用 NOT 关键字,我们可以从 “Persons” 表中选取居住在不包含 “lon” 的城市里的人:

1
sql复制代码SELECT * FROM Persons WHERE City NOT LIKE '%on%';

📢注意: “%” 可用于定义通配符(模式中缺少的字母)。

🚤 IN – 锁定多个值

IN 操作符允许我们在 WHERE 子句中规定多个值。

语法:

1
sql复制代码SELECT 列名/(*) FROM 表名称 WHERE 列名称 IN (值1,值2,值3);

实例:

现在,我们希望从 Persons 表中选取姓氏为 Adams 和 Carter 的人:

1
sql复制代码SELECT * FROM Persons WHERE LastName IN ('Adams','Carter');

⛵️ BETWEEN – 选取区间数据

操作符 BETWEEN … AND 会选取介于两个值之间的数据范围。这些值可以是数值、文本或者日期。

语法:

1
sql复制代码SELECT 列名/(*) FROM 表名称 WHERE 列名称 BETWEEN 值1 AND 值2;

实例:

1、查询以字母顺序显示介于 “Adams”(包括)和 “Carter”(不包括)之间的人:

1
sql复制代码SELECT * FROM Persons WHERE LastName BETWEEN 'Adams' AND 'Carter';

2、查询上述结果相反的结果,可以使用 NOT:

1
sql复制代码SELECT * FROM Persons WHERE LastName NOT BETWEEN 'Adams' AND 'Carter';

📢 注意: 不同的数据库对 BETWEEN…AND 操作符的处理方式是有差异的。

某些数据库会列出介于 “Adams” 和 “Carter” 之间的人,但不包括 “Adams” 和 “Carter” ;某些数据库会列出介于 “Adams” 和 “Carter” 之间并包括 “Adams” 和 “Carter” 的人;而另一些数据库会列出介于 “Adams” 和 “Carter” 之间的人,包括 “Adams” ,但不包括 “Carter” 。

所以,请检查你的数据库是如何处理 BETWEEN….AND 操作符的!

🚂 AS – 别名

通过使用 SQL,可以为列名称和表名称指定别名(Alias),别名使查询程序更易阅读和书写。

语法:

表别名:

1
sql复制代码SELECT 列名称/(*) FROM 表名称 AS 别名;

列别名:

1
sql复制代码SELECT 列名称 as 别名 FROM 表名称;

实例:

使用表名称别名:

1
2
3
sql复制代码SELECT p.LastName, p.FirstName
FROM Persons p
WHERE p.LastName='Adams' AND p.FirstName='John';

使用列名别名:

1
sql复制代码SELECT LastName "Family", FirstName "Name" FROM Persons;

📢 注意: 实际应用时,这个 AS 可以省略,但是列别名需要加上 " "。

🚁 JOIN – 多表关联

JOIN 用于根据两个或多个表中的列之间的关系,从这些表中查询数据。

有时为了得到完整的结果,我们需要从两个或更多的表中获取结果。我们就需要执行 join。

数据库中的表可通过键将彼此联系起来。主键(Primary Key)是一个列,在这个列中的每一行的值都是唯一的。在表中,每个主键的值都是唯一的。这样做的目的是在不重复每个表中的所有数据的情况下,把表间的数据交叉捆绑在一起。

如图,”Id_P” 列是 Persons 表中的的主键。这意味着没有两行能够拥有相同的 Id_P。即使两个人的姓名完全相同,Id_P 也可以区分他们。

❤️ 为了下面实验的继续,我们需要再创建一个表:Orders。

1
2
3
4
5
6
7
sql复制代码create table orders (id_o number,orderno number,id_p number);
insert into orders values(1,11111,1);
insert into orders values(2,22222,2);
insert into orders values(3,33333,3);
insert into orders values(4,44444,4);
insert into orders values(6,66666,6);
select * from orders;

如图,”Id_O” 列是 Orders 表中的的主键,同时,”Orders” 表中的 “Id_P” 列用于引用 “Persons” 表中的人,而无需使用他们的确切姓名。

1
sql复制代码select * from persons p,orders o where p.id_p=o.id_p;

可以看到,”Id_P” 列把上面的两个表联系了起来。

语法:

1
2
3
4
sql复制代码select 列名
from 表A
INNER|LEFT|RIGHT|FULL JOIN 表B
ON 表A主键列 = 表B外键列;

不同的 SQL JOIN:

下面列出了您可以使用的 JOIN 类型,以及它们之间的差异。

  • JOIN: 如果表中有至少一个匹配,则返回行
  • INNER JOIN: 内部连接,返回两表中匹配的行
  • LEFT JOIN: 即使右表中没有匹配,也从左表返回所有的行
  • RIGHT JOIN: 即使左表中没有匹配,也从右表返回所有的行
  • FULL JOIN: 只要其中一个表中存在匹配,就返回行

实例:

如果我们希望列出所有人的定购,可以使用下面的 SELECT 语句:

1
2
3
4
5
sql复制代码SELECT p.LastName, p.FirstName, o.OrderNo
FROM Persons p
INNER JOIN Orders o
ON p.Id_P = o.Id_P
ORDER BY p.LastName DESC;

🚜 UNION – 合并结果集

UNION 操作符用于合并两个或多个 SELECT 语句的结果集。

UNION 语法:

1
2
3
sql复制代码SELECT 列名 FROM 表A
UNION
SELECT 列名 FROM 表B;

📢注意: UNION 操作符默认为选取不同的值。如果查询结果需要显示重复的值,请使用 UNION ALL。

UNION ALL 语法:

1
2
3
sql复制代码SELECT 列名 FROM 表A
UNION ALL
SELECT 列名 FROM 表B;

另外,UNION 结果集中的列名总是等于 UNION 中第一个 SELECT 语句中的列名。

为了实验所需,创建 Person_b 表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
sql复制代码CREATE TABLE Persons_b
(
Id_P int,
LastName varchar(255),
FirstName varchar(255),
Address varchar(255),
City varchar(255)
);
INSERT INTO Persons_b VALUES (1, 'Bill', 'Gates', 'Xuanwumen 10', 'Londo');
INSERT INTO Persons_b VALUES (2, 'John', 'Adams', 'Oxford Street', 'nBeijing');
INSERT INTO Persons_b VALUES (3, 'George', 'Bush', 'Fifth Avenue', 'Beijing');
INSERT INTO Persons_b VALUES (4, 'Thomas', 'Carter', 'Changan Street', 'New York');
INSERT INTO Persons_b VALUES (5, 'William', 'Carter', 'Xuanwumen 10', 'Beijing');
select * from persons_b;

实例:

使用 UNION 命令:

列出 persons 和 persons_b 中不同的人:

1
2
3
sql复制代码select * from persons
UNION
select * from persons_b;

📢注意: UNION 内部的 SELECT 语句必须拥有相同数量的列。列也必须拥有相似的数据类型。同时,每条 SELECT 语句中的列的顺序必须相同。

🚌 NOT NULL – 非空

NOT NULL 约束强制列不接受 NULL 值。

NOT NULL 约束强制字段始终包含值。这意味着,如果不向字段添加值,就无法插入新记录或者更新记录。

语法:

1
2
3
4
sql复制代码CREATE TABLE 表
(
列 int NOT NULL
);

如上,创建一个表,设置列值不能为空。

实例:

1
2
sql复制代码create table lucifer (id number not null);
insert into lucifer values (NULL);

📢 注意: 如果插入 NULL 值,则会报错 ORA-01400 提示无法插入!

⭐️ 拓展小知识:NOT NULL 也可以用于查询条件:

1
sql复制代码select * from persons where FirstName is not null;

同理,NULL 也可:

1
sql复制代码select * from persons where FirstName is null;

感兴趣的朋友,可以自己尝试一下!

🚐 VIEW – 视图

在 SQL 中,视图是基于 SQL 语句的结果集的可视化的表。

视图包含行和列,就像一个真实的表。视图中的字段就是来自一个或多个数据库中的真实的表中的字段。我们可以向视图添加 SQL 函数、WHERE 以及 JOIN 语句,我们也可以提交数据,就像这些来自于某个单一的表。

语法:

1
2
3
4
sql复制代码CREATE VIEW 视图名 AS
SELECT 列名
FROM 表名
WHERE 查询条件;

📢 注意: 视图总是显示最近的数据。每当用户查询视图时,数据库引擎通过使用 SQL 语句来重建数据。

实例:

下面,我们将 Persons 表中住在 Beijing 的人筛选出来创建视图:

1
2
sql复制代码create view persons_beijing as
select * from persons where city='Beijing';

查询上面这个视图:

如果需要更新视图中的列或者其他信息,无需删除,使用 CREATE OR REPLACE VIEW 选项:

1
2
3
4
sql复制代码CREATE OR REPLACE VIEW 视图名 AS
SELECT 列名
FROM 表名
WHERE 查询条件;

实例:

现在需要筛选出,LASTNAME 为 Gates 的记录:

1
2
sql复制代码create or replace view persons_beijing as
select * from persons where lastname='Gates';


删除视图就比较简单,跟表差不多,使用 DROP 即可:

1
sql复制代码drop view persons_beijing;

❤️ 本章要讲的高级语言就先到此为止,不宜一次性介绍太多~

🎯 SQL 常用函数学习

SQL 拥有很多可用于计数和计算的内建函数。

函数的使用语法:

1
sql复制代码SELECT function(列) FROM 表;

❤️ 下面就来看看有哪些常用的函数!

🍔 AVG – 平均值

AVG 函数返回数值列的平均值。NULL 值不包括在计算中。

语法:

1
sql复制代码SELECT AVG(列名) FROM 表名;

实例:

计算 “orderno” 字段的平均值。

1
sql复制代码select avg(orderno) from orders;

当然,也可以用在查询条件中,例如查询低于平均值的记录:

1
sql复制代码select * from orders where orderno < (select avg(orderno) from orders);

🍕 COUNT – 汇总行数

COUNT() 函数返回匹配指定条件的行数。

语法:

count() 中可以有不同的语法:

  • COUNT(*) :返回表中的记录数。
  • COUNT(DISTINCT 列名) :返回指定列的不同值的数目。
  • COUNT(列名) :返回指定列的值的数目(NULL 不计入)。
1
2
3
sql复制代码SELECT COUNT(*) FROM 表名;
SELECT COUNT(DISTINCT 列名) FROM 表名;
SELECT COUNT(列名) FROM 表名;

实例:

COUNT(*) :

1
sql复制代码select count(*) from persons;

COUNT(DISTINCT 列名) :

1
sql复制代码select count(distinct city) from persons;

COUNT(列名) :

1
sql复制代码select count(city) from persons;

🍘 MAX – 最大值

MAX 函数返回一列中的最大值。NULL 值不包括在计算中。

语法:

1
sql复制代码SELECT MAX(列名) FROM 表名;

MIN 和 MAX 也可用于文本列,以获得按字母顺序排列的最高或最低值。

实例:

1
sql复制代码select max(orderno) from orders;

🍢 MIN – 最小值

MIN 函数返回一列中的最小值。NULL 值不包括在计算中。

语法:

1
sql复制代码SELECT MIN(列名) FROM 表名;

实例:

1
sql复制代码select min(orderno) from orders;

🍰 SUM – 求和

SUM 函数返回数值列的总数(总额)。

语法:

1
sql复制代码SELECT SUM(列名) FROM 表名;

实例:

1
sql复制代码select sum(orderno) from orders;

🍪 GROUP BY – 分组

GROUP BY 语句用于结合合计函数,根据一个或多个列对结果集进行分组。

语法:

1
2
3
4
sql复制代码SELECT 列名A, 统计函数(列名B)
FROM 表名
WHERE 查询条件
GROUP BY 列名A;

实例:

获取 Persons 表中住在北京的总人数,根据 LASTNAME 分组:

1
2
3
sql复制代码select lastname,count(city) from persons 
where city='Beijing'
group by lastname;

如果不加 GROUP BY 则会报错:

也就是常见的 ORA-00937 不是单组分组函数的错误。

🍭 HAVING – 句尾连接

在 SQL 中增加 HAVING 子句原因是,WHERE 关键字无法与合计函数一起使用。

语法:

1
2
3
4
5
sql复制代码SELECT 列名A, 统计函数(列名B)
FROM table_name
WHERE 查询条件
GROUP BY 列名A
HAVING 统计函数(列名B) 查询条件;

实例:

获取 Persons 表中住在北京的总人数大于1的 LASTNAME,根据 LASTNAME 分组:

1
2
3
4
sql复制代码select lastname,count(city) from persons 
where city='Beijing'
group by lastname
having count(city) > 1;

🍷 UCASE/UPPER – 大写

UCASE/UPPER 函数把字段的值转换为大写。

语法:

1
sql复制代码select upper(列名) from 表名;

实例:

选取 “LastName” 和 “FirstName” 列的内容,然后把 “LastName” 列转换为大写:

1
sql复制代码select upper(lastname),firstname from persons;

🍶 LCASE/LOWER – 小写

LCASE/LOWER 函数把字段的值转换为小写。

语法:

1
sql复制代码select lower(列名) from 表名;

实例:

选取 “LastName” 和 “FirstName” 列的内容,然后把 “LastName” 列转换为小写:

1
sql复制代码select lower(lastname),firstname from persons;

👛 LEN/LENGTH – 获取长度

LEN/LENGTH 函数返回文本字段中值的长度。

语法:

1
sql复制代码select length(列名) from 表名;

实例:

获取 LASTNAME 的值字符长度:

1
sql复制代码select length(lastname),lastname from persons;

🍗 ROUND – 数值取舍

ROUND 函数用于把数值字段舍入为指定的小数位数。

语法:

1
sql复制代码select round(列名,精度) from 表名;

实例:

保留2位:

1
2
sql复制代码select round(1.1314,2) from dual;
select round(1.1351,2) from dual;


📢 注意:ROUND 取舍是 四舍五入 的!

取整:

1
2
sql复制代码select round(1.1351,0) from dual;
select round(1.56,0) from dual;

🍞 NOW/SYSDATE – 当前时间

NOW/SYSDATE 函数返回当前的日期和时间。

语法:

1
sql复制代码select sysdate from 表名;

实例:

获取当前时间:

1
sql复制代码select sysdate from dual;

📢 注意: 如果您在使用 Sql Server 数据库,请使用 getdate() 函数来获得当前的日期时间。

🍺 写在最后

上述如果都学完了的话,可以来做个小测验:SQL 测验,看看掌握的怎么样!

❤️ 测验会被记分:

每道题的分值是 1 分。在您完成全部的20道题之后,系统会为您的测验打分,并提供您做错的题目的正确答案。其中,绿色为正确答案,而红色为错误答案。

☞ 现在就开始测验! 祝您好运。

⭐️ 将你的得分写在下方的 评论区 中吧,让我看看大家的水平如何?⭐️

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot 集成 Nacos实现统一配置管理

发表于 2021-10-11

小知识,大挑战!本文正在参与“程序员必备小知识”创作活动。

从三个方面深入源码:

  • 服务注册
  • 服务地址的获取
  • 服务地址变化的感知

在Spring-Cloud-Common包中有一个类org.springframework.cloud.client.serviceregistry.ServiceRegistry,它是SpringCloud提供的服务注册的标准,集成到SpringCloud实现服务注册的组件,都会实现该接口。

1
2
3
4
5
6
7
java复制代码public interface ServiceRegistry<R extends Registration>{
void registry(R registration);
void deregistry(R registration);
void close();
void setStatus(R registration,String status);
<T> T getStatus(R registration);
}

而这个接口的实现类是com.alibaba.cloud.nacos.registry.NacosServiceRegistry,它什么时候触发服务注册动作?

Spring Cloud集成Nacos的实现过程

在Spring-cloud-commons包的META-INF/spring.factories中包含自动装配信息如下:

1
2
3
properties复制代码org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.cloud.client.CommonsClientAutoConfiguration=\
org.springframework.cloud.client.ReactiveCommonsClientAutoConfiguration=\

SpringBoot集成Nacos实现统一配置管理

  • 创建一个SpringBoot项目,并集成Nacos配置中心
  • 添加Nacos Config maven依赖
1
2
3
4
5
xml复制代码<dependency>
<groupId>com.alibaba.boot</groupId>
<artifactId>nacos-config-spring-boot-starter</artifactId>
<version>0.2.4</version>
</dependency>

在application.yml中添加Nacos Server的地址

1
2
3
yml复制代码 nacos:
config:
server-addr: 127.0.0.1:8848

创建NacosConfigController类,用于从Nacos Server动态读取配置

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@NacosPropertySource(dataId = "server1",autoRefreshed = true)
@RestController
public class NacosConfigController {

@NacosValue(value = "${info:local}",autoRefreshed = true)
private String info;

@GetMapping("config")
public String getConfig(){
return info;
}
}

Nacos注解说明:

  • @NacosPropertySource,用于加载dataId为server1的配置源,autoRefreshed标识开启自动更新
  • @NacosValue:设置属性的值,其中info表示key,而Local代表默认值,如果key不存在,则使用默认值,是一种高可用的策略,在实际应用中,需要考虑在配置中心不可用下如何保证服务的可用性。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…499500501…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%