开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

阿里云 EventBridge 事件驱动架构实践 消息与事件

发表于 2021-11-26

简介:我们认为 EventBridge 是云原生时代新的计算驱动力,这些数据可以驱动云的计算能力,创造更多业务价值。

作者:周新宇

本文内容整理自 中国开源年会 演讲

首先做一个自我介绍,我是 RocketMQ 的 PMC member 周新宇,目前负责阿里云 RocketMQ 以及 EventBridge 的产品研发。今天我的分享主要包括以下几部分:

  • 消息与事件、微服务与事件驱动架构
  • 阿里云 EventBridge:事件驱动架构实践
  • 基于 RocketMQ 内核构建阿里云统一的事件枢纽
  • 云原生时代的新趋势:Serverless+ 事件驱动
  • 事件驱动架构的未来展望

消息与事件、微服务与事件驱动架构

首先,我们先讲一下消息跟事件的区别:大家都知道 RocketMQ 里面的消息,它是非常泛化的概念,是一个比事件更加抽象的概念。因为消息的内容体就是 Byte 数组,没有任何一个定义,是个弱 Data,所以它是非常通用的抽象。

与之相反的,事件可能是更加具象化的。一般情况下,它有一个 Schema 来精准描述事件有哪些字段,比如 CloudEvents 就对事件有一个明确的 Schema 定义。事件也往往代表了某个事情的发生、某个状态的变化,所以非常具象化。

从用途来讲,消息往往用于微服务的异步解耦的架构。但这一块的话,事件驱动跟消息是稍微类似的。消息的应用场景往往发生在一个组织内部,消息的生产方知道这个消息要将被如何处理。比如说在一个团队里,消息的生产者跟发送者可能是同一个团队同一块业务,对这个消息内容有一个非常强的约定。相比之下,事件更加松耦合,比如说事件发送方也不知道这个事件将被投递到什么地方,将被谁消费,谁对他感兴趣,对事件被如何处理是没有任何预期的。所以说,基于事件的架构是更加解耦的。消息的应用往往还是脱离不了同一个业务部门,即使一些大公司里最多涉及到跨部门合作。消息的使用通过文档进行约束,事件通过 Schema 进行约束,所以我们认为事件是比消息更加彻底解耦的方式。

接下来,微服务架构跟 EDA 架构有什么区别?

首先是微服务架构,微服务作为从单体应用演进而来的架构,比如说把一个单体应用拆成了很多微服务,微服务之间通过 RPC 进行组织和串联。过去一个业务可能是在本地编排了一堆 function,现在通过一堆 RPC 将之串起来。比如说用户去做一个前端的下单操作,可能后台就是好几个微服务进行订单操作,一个微服务去新建订单,一个微服务去对订单进行处理,处理完再调另一个微服务去把订单已完成的消息通知出去,这是一个典型的 RPC 架构。

但纯粹的 RPC 架构有很多问题,比如所有业务逻辑是耦合在一起的,只是把本地方法调用换成了远程调用。当业务增速达到一定阶段,会发现各个微服务之间的容量可能是不对等的,比如说短信通知可以通过异步化完成,却同步完成。这就导致前端有多大流量,短信通知也需要准备同样规模的流量。当准备资源不充足,上下游流量不对等时,就有可能导致某个微服被打挂,从而影响到上游,进而产生雪崩效应。

在这种情况下,大家一般就会引入消息队列进行异步解耦。这个架构已非常接近于事件驱动架构了,还是以用户前端创建一个订单举例,订单创建的事件就会就发到事件总线、event broker、 event bus 上,下游各个不同订阅方去对这个事件做监听处理。

不同之处在于消息订阅者基于消息中间件厂商提供 SDK 的去做消息处理,业务往往需要进行改造,也会被厂商提供的技术栈绑定;事件驱动架构中订阅者属于泛化订阅,即不要求订阅方基于什么样的技术栈去开发,可以是一个 HTTP 网关,也可以是一个function,甚至可以是历史遗留的存量系统。只要 event broker 兼容业务的协议,就可以把事件推送到不同订阅方。可以看到,泛化订阅的用途更加广泛,更加解耦,改造成本也最低。

阿里云 EventBridge:事件驱动架构实践

Gartner 曾预测, EDA 架构将来会成为微服务主流。在 2022 年它将会成为 60% 的新型数字化商业解决方案,也会有 50% 的商业组织参与其中。

同时, CNCF 基金会也提出了 CloudEvents 规范,旨在利用统一的规范格式来声明事件通信。EventBridge也是遵循这一标准。CloudEvents作为社区标准,解除了大家对于厂商锁定的担忧,提高了各个系统之间的互操作性,相当于说对各个系统约定了统一的语言,这个是非常关键的一步。

事件在开源社区有了统一的规范,但在云上,很多用户购买了云厂商很多云产品,这些云产品每天可能有数以亿计的事件在不停产生,这些事件躺在不同云服务的日志、内部实现里。用户也看不着,也不知道云产品实例在云上发生什么事情。各个厂商对事件的定义也不一样,整体是没有同一类标准。各个云服务之间的事件是孤立的,就是说没有打通,这不利于挖掘事件的价值。在使用开源产品时也有类似问题,用户往往也没有统一标准进行数据互通,想去把这些生态打通时需要付出二次开发成本。

最后,事件驱动在很多场景应用的现状是偏离线的,现在比较少的人把 EDA 架构用于在线场景。一方面是因为没有事件型中间件基础设施,很难做到一个事件被实时获取,被实时推送的同时,能被业务方把整个链路给追踪起来。所以,以上也是阿里云为什么要做这款产品的背景。

因此,我们对 EventBridge 做了定义,它有几个核心价值:

**一、统一事件枢纽:**统一事件界面,定义事件标准,打破云产品事件孤岛。

**二、事件驱动引擎:**海量事件源,毫秒级触发能力,加速 EDA/Serverless 架构升级。

**三、开放与集成:**提供丰富的跨产品、跨平台连接能力,促进云产品、应用程序、SaaS服务相互集成。

首先讲一下,EventBridge 基本模型,EventBridge 有四大部分。第一部分是事件源,这其中包括云服务的事件、自定义应用、SaaS应用、自建数据平台。

第二个部分就是事件总线,这是存储实体,事件过来,它要存在某个地方进行异步解耦。类似于说 RocketMQ 里面 topic 的概念,具备一定存储的同时,提供了异步能力。事件总线涵盖两种,一种默认事件总线,用于收集所有云产品的事件,另一种自定义事件总线就是用户自己去管理、去定义、去收发事件,用来实践 EDA 架构概念。第三部分就是规则,规则与 RocketMQ 的消费者、订阅比较类似,但我们赋予规则包括过滤跟转换在内的更多计算能力。第四部分就是事件目标即订阅方,对某事件感兴趣就创建规则关联这个事件,这其中包括函数计算、消息服务、HTTP 网关等等。

这里具体讲一下这个事件规则,虽然类似于订阅,但事件规则拥有事件轻量级处理能力。比如在使用消息时可能需要把这个消息拿到本地,再决定是否消费掉。但基于规则,可以在服务端就把这个消息处理掉。

事件规则支持非常复杂的事件模式过滤,包括对指定值的匹配,比如前缀匹配、后缀匹配、数值匹配、数组匹配,甚至把这些规则组合起来形成复杂的逻辑匹配能力。

另一个,就是转换器能力,事件目标泛化定义,其接受的事件格式可能有很多种,但下游服务不一定。比如说你要把事件推到钉钉,钉钉 API 已经写好了并只接受固定格式。那么,把事件推过去,就需要对事件进行转换。我们提供了包括:

  • **完整事件:**不做转换,直接投递原生 CloudEvents。
  • **部分事件:**通过 JsonPath 语法从 CloudEvents 中提取部分内容投递至事件目标。
  • **常量:**事件只起到触发器的作用,投递内容为常量。
  • **模板转换器:**通过定义模板,灵活地渲染自定义的内容投递至事件模板。
  • **函数:**通过指定处理函数,对事件进行自定义函数处理,将返回值投递至事件目标。

目前,EventBridge 集成了 80 多种云产品,约 800 多种事件类型,第一时间打通了消息生态,比如说 RocketMQ 作为一个微服务生态,我们去实践消息事件理念,就可以把 RocketMQ 的事件直接投递到 EventBridge,通过事件驱动架构去对这些消息进行处理,甚至 MQTT、KafKa 等消息生态,都进行打通集成。除了阿里云消息产品的打通,下一步也会把一些开源自建的消息系统进行打通。另一个生态就是 ISV 生态,为什么 ISV 需要 EventBridge?以钉钉 6.0 举例,其最近发布了连接器能力。钉钉里面要安装很多软件,这些软件可能是官方提供,也可能是 ISV、第三方开发者提供,这就造成数据的互通性差。因此,我们提供这个能力让 ISV 的数据流通起来。最后就是事件驱动生态,我们当前能够触达到大概 10 多种事件目标,目前也在持续丰富当中。

事件因相对消息更加解耦、离散,所以事件治理也更加困难。所以,我们制作了事件中心并提供三块能力:

  • **事件追踪:**对每一个事件能有完整的追踪,它从在哪里产生,什么时候被投递,什么时候被过滤掉了,什么时候被投递到某个目标,什么时候被处理成功了。使整个生命周期完全追踪起来。
  • **事件洞察&分析:**让用户从 EDA 编程视角变成用户视角,让用户更加迅速的了解 EventBridge 里面到底有哪些事件,并进行可视化分析。通过 EB 做到就近计算分析,直接把业务消息导入到事件总线中,对消息进行及时分析。
  • **事件大盘:**针对云产品,引导云产品对业务事件进行定义,让云产品更加开放,从而提供大盘能力。

基于 RocketMQ 内核构建阿里云统一的事件枢纽

EventBridge 一开始就构建在云原生的容器服务之上。在这之上首先是 RocketMQ 内核,内核在这个产品里扮演的角色有两种,一种就是事件存储,当成存储来用;另一方面是利用订阅能力,把订阅转化成泛化订阅。在 RocketMQ 内核之上就是 connect 集群。EventBridge 比较重要的能力是连接,所以 EventBridge 首先要具备 Source 的能力,把事件 Source 过来,然后再存下来;其核心是 Connect 集群,每个 Connect 集群有很多 Worker。每个 Worker 要负责很多事情,包括事件的摄入,事件过滤,事件转换,事件回放,事件追踪等,同时在 Connect 集群之上有 Connect 控制面,来完成集群的治理,Worker 的调度等。

在更上面一层是 API Server,一个事件的入口网关,EventBridge 的世界里,摄入事件有两种方式,一种是通过 Connect 的 Source Connector,把事件主动的 Source 过来,另一种用户或者云产品可以通过 API server,通过我们的 SDK 把事件给投递过来。投递的方式有很多种,包括有 OpenAPI,有多语言的官方 SDK,同时考虑 CloudEvents 有社区的标准,EventBridge 也完全兼容社区开源的 SDK,用户也可以通过 Webhook 将事件投递过来。

这个架构优点非常明显:

(1)减少用户开发成本

  • 用户无需额外开发进行事件处理
  • 编写规则对事件过滤、转换

(2)原生 CloudEvents 支持

  • 拥抱 CNCF 社区,无缝对接社区 SDK
  • 标准协议统一阿里云事件规范

(3)事件 Schema 支持

  • 支持事件 Schema 自动探测和校验
  • Source 和 Target 的 Schema 绑定

(4)全球事件任意互通

  • 组建了跨地域、跨账户的事件网络
  • 支持跨云、跨数据中心事件路由

云原生时代的新趋势:Serverless+ 事件驱动

我们认为 Serverless 加事件驱动是新的研发方式,各个厂商对 Serverless 理解各有侧重,但是落地方式大道趋同。

首先,Serverless 基础设施把底层 IaaS 屏蔽掉,上层 Serverless 运行时即计算托管,托管的不仅仅是微服务应用、K8s 容器,不仅仅是函数。

EventBridge 首先把这种驱动的事件源连接起来,能够触发这些运行时。因为 Serverless 最需要的就是驱动方,事件驱动带给他这样的能力,即计算入口。EventBridge 驱动 Serverless 运行时,再去连接与后端服务。目前,EventBridge 与 Serverless 结合的场景主要是松耦合场景,比如前端应用、SaaS 服务商小程序,以及音视频编解码等落地场景。

那么,Serverless 的 EDA 架构开发模式到底是怎样的呢?以函数计算为例,首先开发者从应用视角需要转换为函数视角,将各个业务逻辑在一个个函数中进行实现;一个函数代表了一个代码片段,代表了一个具体的业务,当这段代码上传后就变成了一个函数资源,然后 EventBridge 可以通过事件来驱动函数,将函数通过事件编排起来组成一个具体的应用。

这里面 function 还需要做很多事情,大家也知道 function 有很多弊端,它最受诟病的就是冷启动。因为 Serverless 需要 scale to zero 按量付费,在没有请求没有事件去触发时,应该是直接收到 0 的,从 0~1 就是一个冷启动。这个冷启动有些时候可能要秒级等待,因为它可能涉及到下载代码、下载镜像,涉及到 namespace 的构建,存储挂载,root 挂载,这里面很多事情,各个云厂商投入很大精力优化这一块。Serverless 价格优势很明显,它资源利用率特别高,因按量付费的,所以能做到接近百分百的资源利用率,也不需要去做容量规划。

举一个简单的例子,就是基于 Serverless 加 EDA 的极简编程范式,再举一个具体的例子,新零售场景下 EDA 架构对这个业务进行改造。首先来讲,业务中有几个关键资源,可能有 API 网关、函数计算,首先可以去打通一些数据,打通 rds 并把 rds 数据同步过来,兼容一些历史架构,同时去触发计算资源、function、网关。整个架构优势非常明显,所以具备极致弹性能力,不需要去预留资源。

事件驱动的未来展望

我们认为事件驱动的未来有两部分,一是要做好连接,做好云内、跨云的集成,让用户的多元架构更加高效。二是开源生态的集成,我们可以看到开源生态愈发蓬勃,所以也需要把这些开源生态中的数据集成好。此外,还有传统 IDC 计算能力、边缘计算能力这些生态都需要有连接性软件把它连接起来。

EventBridge 是云原生时代新的计算驱动力,这些数据可以去驱动云的计算能力,创造更多业务价值。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Go(五)Go不知道怎么用Gorm?

发表于 2021-11-26

这是我参与11月更文挑战的第10天,活动详情查看:2021最后一次更文挑战

作者:lomtom

个人网站:lomtom.top,

个人公众号:博思奥园

你的支持就是我最大的动力。

前言

所有的后端应用都离不开数据库的操作,在Go中也有一些好用的数据库操作组件,例如Gorm就是一个很不错的选择。

这里是Gorm自己例举的优点:

  • 全功能 ORM
  • 关联 (Has One,Has Many,Belongs To,Many To Many,多态,单表继承)
  • Create,Save,Update,Delete,Find 中钩子方法
  • 支持 Preload、Joins 的预加载
  • 事务,嵌套事务,Save Point,Rollback To Saved Point
  • Context、预编译模式、DryRun 模式
  • 批量插入,FindInBatches,Find/Create with Map,使用 SQL 表达式、Context Valuer 进行 CRUD
  • SQL 构建器,Upsert,数据库锁,Optimizer/Index/Comment Hint,命名参数,子查询
  • 复合主键,索引,约束
  • Auto Migration
  • 自定义 Logger
  • 灵活的可扩展插件 API:Database Resolver(多数据库,读写分离)、Prometheus…
  • 每个特性都经过了测试的重重考验
  • 开发者友好

当然,你可能用不到gorm这么多特性,但是也不阻碍Gorm是Go中一个非常优秀的ORM框架。

本文也不探究Gorm和其他框架的优劣比较,而是从使用者出发,一起来探讨Gorm在实际开发中的使用。

当然Gorm本身的官方文档已经非常详细了,如果对本文中的部分Gorm使用有稍许疑惑的话,请移步官方文档:gorm.io/zh_CN/docs/…

安装

在控制台执行go get命令进行安装依赖,驱动根据自己的实际使用进行安装,这里以MySQL为例。

Gorm 官方支持的数据库类型有: MySQL, PostgreSQL, SQlite, SQL Server

1
2
go复制代码go get -u gorm.io/gorm
go get -u gorm.io/driver/mysql

在使用时引入依赖即可

1
2
3
4
go复制代码import (
"gorm.io/driver/mysql"
"gorm.io/gorm"
)

建立连接

使用Gorm建立数据库的连接其实很简单,但是要做到好用,那就需要花点心思,在这里,将带领大家怎么从最简单的连接到好用的连接设置。

最基本的连接

1
2
3
4
5
6
7
8
9
go复制代码func GetDb() *gorm.DB {
// 参考 https://github.com/go-sql-driver/mysql#dsn-data-source-name 获取详情
dsn := "user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{})
if err!= nil {
return nil
}
return db
}

注意:

  1. 想要正确的处理 time.Time ,您需要带上 parseTime 参数,
  2. 使用charset指定编码,要支持完整的 UTF-8 编码,您需要将 charset=utf8 更改为 charset=utf8mb4

更多参数设置:github.com/go-sql-driv…

设置连接池

Gorm同样支持连接池,Gorm使用 database/sql 维护连接池

分别使用SetMaxIdleConns,SetMaxOpenConns,SetConnMaxLifetime来设置最大空闲连接数、最大连接数和设置连接空闲超时参数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
go复制代码func GetDb() *gorm.DB {
// 参考 https://github.com/go-sql-driver/mysql#dsn-data-source-name 获取详情
dsn := "user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local"
db, err := gorm.Open(mysql.Open(dsn), &gorm.Config{

})
if err != nil {
return nil
}
sqlDB, err := db.DB()
if err != nil {
log.Printf("database setup error %v", err)
}
sqlDB.SetMaxIdleConns(10) //最大空闲连接数
sqlDB.SetMaxOpenConns(100) //最大连接数
sqlDB.SetConnMaxLifetime(time.Hour) //设置连接空闲超时
return db
}

全局连接

为了方便使用,我们可以在一开始就使用一个全局变量来保存数据库的连接,在使用时直接调用即可,而不需要再次进行数据库的初始化。

1
2
3
4
5
6
go复制代码var db *gorm.DB

// GetDb 获取连接
func GetDb() *gorm.DB {
return db
}

将之前的函数改为给db进行初始化并赋值,在使用的时候直接调用GetDb函数即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
go复制代码func DbInit(){
// 参考 https://github.com/go-sql-driver/mysql#dsn-data-source-name 获取详情
dsn := "user:pass@tcp(127.0.0.1:3306)/dbname?charset=utf8mb4&parseTime=True&loc=Local"
tempDb, err := gorm.Open(mysql.Open(dsn), &gorm.Config{

})
if err != nil {
return nil
}
sqlDB, err := tempDb.DB()
if err != nil {
log.Printf("database setup error %v", err)
}
sqlDB.SetMaxIdleConns(10) //最大空闲连接数
sqlDB.SetMaxOpenConns(100) //最大连接数
sqlDB.SetConnMaxLifetime(time.Hour) //设置连接空闲超时
db = tempDb
}

利用配置文件

到这里,你其实发现已经能够很好的使用Gorm去建立数据库连接了,但是有没有什么办法像Spring Boot一样从配置文件中获取连接参数呢,恰好第三章中讲到了怎么使用读取配置文件的方法,那何不利用起来呢?

戳 -> Go(三)Go配置文件

在配置文件中定义数据库连接参数

1
2
3
4
5
6
7
8
9
10
bash复制代码database:
type: mysql
host: localhost
port: 3306
username: root
password: 123456
dbname: test
max_idle_conn: 10
max_open_conn: 30
conn_max_lifetime: 300

定义相应的结构体

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
bash复制代码var Database *database

type conf struct {
DB database `yaml:"database"`
}

type database struct {
Type string `yaml:"type"`
Host string `yaml:"host"`
Port string `yaml:"port"`
UserName string `yaml:"username"`
Password string `yaml:"password"`
DbName string `yaml:"dbname"`
MaxIdleConn int `yaml:"max_idle_conn"`
MaxOpenConn int `yaml:"max_open_conn"`
ConnMaxLifetime int `yaml:"conn_max_lifetime"`
}

具体怎么绑定参数,请戳 -> Go(三)Go配置文件

为了更直观的感受,将URI抽取出来

1
2
3
4
5
6
7
8
9
go复制代码//获取链接URI
func mySQLUri() string {
return fmt.Sprintf("%s:%s@tcp(%s:%s)/%s?charset=utf8&parseTime=true",
Database.UserName,
Database.Password,
Database.Host,
Database.Port,
Database.DbName)
}

那么最终呈现的就是这样。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
go复制代码var db *gorm.DB

// GetDb 获取连接
func GetDb() *gorm.DB {
return db
}

// DbInit 数据库连接池初始化
func DbInit() {
fmt.Println(mySQLUri())
conn, err1 := gorm.Open(mysql.Open(mySQLUri()), &gorm.Config{})
if err1 != nil {
log.Printf("connect get failed.")
return
}
sqlDB, err := conn.DB()
if err != nil {
log.Printf("database setup error %v", err)
}
sqlDB.SetMaxIdleConns(Database.MaxIdleConn) //最大空闲连接数
sqlDB.SetMaxOpenConns(Database.MaxOpenConn) //最大连接数
sqlDB.SetConnMaxLifetime(time.Duration(Database.ConnMaxLifetime) * time.Second) //设置连接空闲超时
db = conn
}

如果想要在项目启动时自动初始化,将DbInit方法名改为init即可,否则,需要在main方法中自行调用执行初始化。

为了更好的开发,我们可以自定义Gorm的日志

1
2
3
4
5
6
7
8
9
10
go复制代码//初始化数据库日志
newLogger := logger.New(
log.New(os.Stdout, "\r\n", log.LstdFlags), // io writer
logger.Config{
SlowThreshold: time.Second, // Slow SQL threshold
LogLevel: logger.Info, // Log level
IgnoreRecordNotFoundError: true, // Ignore ErrRecordNotFound error for logger
Colorful: true, // Disable color
},
)

将其作为参数放置在Gorm参数上gorm.Open(mysql.Open(mySQLUri()), &gorm.Config{})

1
2
3
go复制代码conn, err1 := gorm.Open(mysql.Open(mySQLUri()), &gorm.Config{
Logger: newLogger,
})

使用

Gorm的CURD相对来说叶比较简单。

定义一个结构体User,除开记录的字段,有编号、姓名、密码三个字段

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码type User struct {
Id int64 `gorm:"primaryKey;column:id;"`
Username string `gorm:"column:user_name;type:varchar(255);default:(-)" `
Password string `gorm:"column:password;type:varchar(255);default:(-)"`
Deleted gorm.DeletedAt `gorm:"column:deleted;type:timestamp;default:(-)"`
CreateTime time.Time `gorm:"column:create_time;type:timestamp;default:(-)"`
UpdateTime time.Time `gorm:"column:update_time;type:timestamp;default:(-)"`
}

// TableName 自定义表名
func (*User) TableName() string {
return "users"
}

说明:

  1. 使用primaryKey指定主键
  2. 使用column:id指定在数据库中的列名
  3. 使用gorm.DeletedAt标明该字段为删除标志,如果使用了gorm.DeletedAt,数据库列类型必须为时间格式。
  4. 使用type:varchar(255)标明字段类型
  5. 使用default:(-)设置默认值,-表示为无默认值。
  6. 使用User.TableName表名数据库名,当使用Model绑定结构体时,Gorm会默认调用该方法,除此之外,还可以使用db.Table("user")显式的标明表名。

查询

  1. 获取第一个,默认查询第一个
1
2
3
4
5
6
go复制代码// GetFirst SELECT * FROM users ORDER BY id LIMIT 1;
func GetFirst() (user *User) {
db := config.GetDb()
db.Model(&user).First(&user)
return
}
  1. 获取最后一个
1
2
3
4
5
6
go复制代码// GetLast SELECT * FROM users ORDER BY id DESC LIMIT 1;
func GetLast() (user *User) {
db := config.GetDb()
db.Model(&user).Last(&user)
return
}
  1. 通过主键获取
1
2
3
4
5
6
7
8
9
10
11
12
13
14
go复制代码// GetById SELECT * FROM users WHERE id = 1;
func GetById(id int64) (user *User) {
db := config.GetDb()
db.Model(&user).Find(&user,id)
return
}

等同于

func GetById(id int64) (user *User) {
db := config.GetDb()
db.Model(&user).Where("id = ?",id).Find(&user)
return
}
  1. 通过主键批量查询
1
2
3
4
5
6
7
8
9
10
11
12
go复制代码// GetByIds SELECT * FROM users WHERE id IN (1,2,3);
func GetByIds(ids []int64) (user []*User) {
db := config.GetDb()
db.Model(&user).Find(&user,ids)
return
}
等同于
func GetByIds(s []int64) (user []*User) {
db := config.GetDb()
db.Model(&user).Where("id in ?",ids).Find(&user)
return
}
  1. 获取部分参数,例如只获取名字和密码
1
2
3
4
5
6
go复制代码// GetSomeParam SELECT username,password FROM users WHERE id = 1;
func GetSomeParam(id int64) (user *User) {
db := config.GetDb()
db.Model(&user).Select("username", "password").Find(&user,id)
return
}
  1. 分页查询,可以使用Limit & Offset进行分页查询
1
2
3
4
5
6
go复制代码// GetPage SELECT * FROM users OFFSET 5 LIMIT 10;
func GetPage(limit int,offset int) (user []*User) {
db := config.GetDb()
db.Model(&user).Limit(limit).Offset(offset).Find(&user)
return
}
  1. order
1
2
3
4
5
6
7
8
9
10
11
12
go复制代码// GetByOrder SELECT * FROM users ORDER BY id desc, username;
func GetByOrder() (user []*User) {
db := config.GetDb()
db.Model(&user).Order("id desc,username").Find(&user)
return
}
等同于
func GetByOrder() (user []*User) {
db := config.GetDb()
db.Model(&user).Order("id desc").Order("username").Find(&user)
return
}

更多请移步:gorm.io/zh_CN/docs/…

新增

  1. 创建单个(Create)
1
2
3
4
5
go复制代码func Create(user *User)  {
db := config.GetDb()
db.Model(&user).Create(&user)
return
}
  1. 保存单个(Save)
1
2
3
4
5
go复制代码func Save(user *User)  {
db := config.GetDb()
db.Model(&user).Save(&user)
return
}

Create和Save的区别:Save需要插入的数据存在则不进行插入,Create无论什么情况都执行插入

  1. 创建多个
1
2
3
4
5
go复制代码func CreateBatch(user []*User)  {
db := config.GetDb()
db.Model(&user).Create(&user)
return
}

更多请移步:gorm.io/zh_CN/docs/…

修改

  1. 更新单个字段
1
2
3
4
5
6
go复制代码// UpdateUsername UPDATE users SET username = "lomtom" where id = 1
func UpdateUsername(id int64,username string) {
db := config.GetDb()
db.Model(&User{}).Where("id = ?",id).Update("username",username)
return
}
  1. 全量/多列更新(根据结构体)
1
2
3
4
5
6
go复制代码// UpdateByUser UPDATE `user` SET `id`=14,`user_name`='lomtom',`password`='123456',`create_time`='2021-09-26 14:22:21.271',`update_time`='2021-09-26 14:22:21.271' WHERE id = 14 AND `user`.`deleted` IS NULL
func UpdateByUser(user *User) {
db := config.GetDb()
db.Model(&User{}).Where("id = ?",user.Id).Updates(&user)
return
}

更多请移步:gorm.io/zh_CN/docs/…

删除

  1. 简单删除(根据user里的id进行删除)
1
2
3
4
5
6
7
go复制代码// DeleteByUser DELETE from users where id = 28;
// DeleteByUser UPDATE `user` SET `deleted`='2021-09-26 14:25:33.368' WHERE `user`.`id` = 28 AND `user`.`deleted` IS NULL
func DeleteByUser(user *User) {
db := config.GetDb()
db.Model(&User{}).Delete(&user)
return
}

说明: 结构体未加gorm.DeletedAt标记的字段,直接删除,加了将更新deleted字段,即实现软删除

  1. 根据id进行删除
1
2
3
4
5
6
go复制代码// DeleteById UPDATE `user` SET `deleted`='2021-09-26 14:29:55.15' WHERE `user`.`id` = 28 AND `user`.`deleted` IS NULL
func DeleteById(id int64) {
db := config.GetDb()
db.Model(&User{}).Delete(&User{},id)
return
}

事务

同样,Gorm也有丰富的事务支持。

匿名事务

可使用db.Transaction匿名方法来表明多个操作在一个事务里面,返回err将回滚,返回nil将提交事务

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
go复制代码func Transaction() error {
db := config.GetDb()
err := db.Transaction(func(tx *gorm.DB) error {
// 在事务中执行一些 db 操作(从这里开始,您应该使用 'tx' 而不是 'db')
if err := tx.Create(&User{Username: "lomtom"}).Error; err != nil {
// 返回任何错误都会回滚事务
return err
}
if err := tx.Delete(&User{}, 28).Error; err != nil {
return err
}
// 返回 nil 提交事务
return nil
})
if err != nil {
return err
}
return nil
}

手动事务

以db.Begin()表明一个事务的开始,出现错误使用tx.Rollback(),事务提交使用tx.Commit()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
go复制代码func Transaction1() error {
db := config.GetDb()
tx := db.Begin()
defer func() {
if r := recover(); r != nil {
tx.Rollback()
}
}()
// 在事务中执行一些 db 操作(从这里开始,您应该使用 'tx' 而不是 'db')
if err := tx.Create(&User{Username: "lomtom"}).Error; err != nil {
// 回滚事务
tx.Rollback()
return err
}
if err := tx.Delete(&User{}, 28).Error; err != nil {
tx.Rollback()
return err
}
// 提交事务
return tx.Commit().Error
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

HTTP客户端工具该选哪个?进来看

发表于 2021-11-26

这是我参与11月更文挑战的第26天,活动详情查看:2021最后一次更文挑战

前言

HTTP(超文本传输协议)是一种应用层协议,用于客户端和服务端进行通信,按照标准格式如JSON、XML等进行网络数据的传输,通常也作为应用程序之间以RESTAPI形式进行通信的常用协议。

在Java应用中需要调用其他应用提供的HTTP服务API时,通常需要使用一些HTTP客户端组件。

而可选择的HTTP客户端有很多,本期内容主要介绍在Java应用程序中可以使用的HTTP客户端工具。

概述

本文主要介绍的HTTP客户端包括:

  • Java 11+版本中提供的HttpClient
  • Apache HttpComponents项目中的HttpClient
  • OkHttpClient
  • Spring Boot中的WebClient

为了更好的进行对比,我们在示例中将分别使用不同的客户端完成异步GET请求和同步POST请求。

GET请求

对于Get请求,我们通过请求以下接口查询北京未来3天的天气预报。

请求地址为http://api.weatherdt.com/common/?area=101010100&type=forecast&key=645cc4f76d011bbd8717b1607d6cb9d7。

POST请求

对于POST请求,我们通过访问https://getman.cn/echoAPI测试接口,该接口接收一个JSON参数。

对于所有的HTTP客户端工具,发送一个请求的过程基本一致,包含如下步骤:

  • 创建一个HTTP Client实例
  • 创建用于发送请求的Request对象
  • 设置使用同步或异步方式并调用
  • 处理HTTP响应数据

接下来使用不同的HTTP客户端来完成功能。

JDK原生HttpClient

原生HttpClient是在Java 9中作为孵化模块引入的,然后在Java11中作为JEP 321的一部分正式可用,HTTPClient取代了JDK更早期的HttpUrlConnection类。

HttpClient支持以下功能:

  • 支持HTTP1.1、HTTP2.0协议
  • 支持同步和异步编程模型
  • 支持请求和响应的流式处理
  • 支持Cookie

异步GET请求

使用HttpClient进行异步GET请求的代码如下所示:

在上面代码中使用构建器模式创建了HttpClient和HttpRequest的实例,然后对REST API进行异步调用。

在创建请求时,我们通过调用get()方法将HTTP方法设置为GET,并在设置10秒的超时时间。

同步POST请求

对于POST请求,可以在构建器上调用POST(BodyPublisher Body)方法,可以使用HttpRequest.BodyPublishers.ofString(String requestBody)将JSON字符串转换为BodyPublisher作为需要发送的数据参数。

在以上代码中,通过prepareRequest()方法模拟出作为请求数据的JSON字符串,同样使用构建器模式,通过POST()将请求方式设置为POST,对于返回结果,可以通过HttpResponse.BodyHandlers.ofString()转换为字符串。

如果你的项目中使用的JDK版本11+,则原生的HTTP Client可以作为首选。

Apache HttpComponents

HttpComponents是Apache软件基金会的一个开源项目,该项目中包含了可用于HTTP协议的Java工具集。

该项目下的组件分为以下两部分:

HttpCore:一组低级HTTP传输组件,可用于构建自定义客户端和服务器端HTTP服务;

HttpClient:基于HttpCore的符合HTTP协议的HTTP代理实现。

同时,它还为提供了客户端身份验证、HTTP状态管理和HTTP连接管理等组件。

首先,使用该组件需要添加Maven依赖:

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.apache.httpcomponents.client5</groupId>
<artifactId>httpclient5</artifactId>
<version>5.1.1</version>
</dependency>

异步GET请求

使用Apache HttpClient进行异步REST API调用的常见方法如下所示:

主要步骤如下:

  • 首先创建CloseableHttpAsyncClient作为HTTP客户端;
  • 然后调用start()方法启动客户端;
  • 使用SimpleHttpRequest创建请求;
  • 并通过调用Execute()方法发送请求,并设置FutureCallback对象来异步处理不同响应结果。

同步POST请求

使用Apache HttpClient发送同步POST请求代码如下:

发送同步POST请求的步骤如下:

  • 首先通过prepareRequest()方法创建出一个JSON字符串,作为请求数据;
  • 创建HttpPost对象作为POST请求实例,并将请求数据和请求头数据设置到HttpPost对象中;
  • 通过HttpClients.createDefault()创建HTTP客户端实例;
  • 调用execute()方法发送请求;
  • 从返回值CloseableHttpResponse中获得响应数据。

当使用的JDK版本为11以下的版本,或者需要将功能作为插件提供给别的系统使用时,Apache HttpClient是一个比较好的选择。

OkHttpClient

OkHttpClient也是一个开源库,由美国的Square公司提供。

同样需要添加对应的Maven依赖:

1
2
3
4
5
xml复制代码<dependency>
<groupId>com.squareup.okhttp3</groupId>
<artifactId>okhttp</artifactId>
<version>4.9.2</version>
</dependency>

异步GET请求

使用OkHttpClient发送异步GET请求的代码如下:

使用OKHttpClient创建GET请求步骤如下:

  • 通过构建器模式设置读写超时时间,创建出HTTP客户端实例;
  • 使用Request.Builder()创建请求实例;
  • 使用OkHttpClient.newCall()方法发送异步get请求;
  • 通过enqueue()方法设置对异步请求响应的处理。

同步POST请求

OKHttpClient发送同步POST请求代码如下所示:

通过OKHttpClient创建POST请求步骤如下:

  • 调用prepaareRequest()方法生成请求JSON数据;
  • 通过构建器模式创建出OkHttpClient客户端实例;
  • 调用RequestBody.create()创建出请求数据体;
  • 调用Request.Builder()构建出POST请求示例;
  • 调用execute()发送请求,并同步得到返回结果Response。

OKHttpClient的客户端和请求示例都不需要手动关闭,我们创建单个OkHttpClient实例可以将进行重复使用,并且OkHttp的性能最佳。

Spring WebClient

Spring WebClient是在Spring 5中引入的异步、反应式HTTP客户端,用于取代较旧的RestTemplate,以便在使用Spring Boot框架构建的应用程序中进行REST API调用,它支持同步、异步和流式处理。

同样使用Spring WebClient需要添加Maven依赖:

1
2
3
4
5
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-webflux</artifactId>
<version>2.3.5.RELEASE</version>
</dependency>

Spring WebClient在Spring-boot-starter-webFlux包中,Spring WebFlux是Spring5的一部分,用于为Web应用程序中的反应式编程提供支持。

异步GET请求

使用Spring WebClient发送异步GET请求代码示例如下:

  • 在此代码片段中,我们首先使用默认设置创建客户端;
  • 接下来,调用client的get()方法,并调用uri()方法设置请求API地址;
  • 调用链中的retrieve()方法用于进行API调用,也就是发送请求;
  • 然后并通过bodyToMono()方法获取响应体,该响应体通过bodyToMono()方法转换为Mono对象;
  • 最后,使用subscribe()方法以非阻塞方式订阅bodyToMono()方法转换返回的Mono对象。

同步POST请求

虽然Spring WebClient是异步的,但我们仍然可以通过调用block()方法进行同步调用,该方法会阻塞线程,直到执行结束;在方法执行后返回结果。

使用WebClient发出的同步POST请求示例如下:

  • 首先通过WebClient.create()创建HTTP客户端;
  • 通过client.post()设置请求方式为POST;
  • 通过body()方法将prepaareRequest()方法中返回JSON字符串作为请求数据;
  • 调用exchange()方法发送请求,并且会将响应数据封装到Mono对象中,比retrieve()方法提供更多的操作方法;
  • 调用block()设置请求为同步阻塞的。

如何选择

在本文上述内容中我们通过不同的客户端工具实现了发送同步的GET请求和异步的POST请求。

各个工具的特点可以总结为以下几点:

  • 如果不想添加任何外部库并且应用程序的JDK版本是11+,那么原生HTTPClient是首选;
  • 如果是Spring Boot应用并且是反应式API,则使用Spring WebClient;
  • Apache HttpClient的灵活性更高,相比其他库有更多的参考文档;
  • OkHttpClient的性能最佳,客户端对象可重复使用,功能丰富,高度可配置。

所以在实际开发中,按自己的实际需求和场景进行选择,俗话说没有最好的,只有最适合的。


我是小黑,一个在互联网“苟且”的程序员。

流水不争先,贵在滔滔不绝

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

大数据Hive学习之旅第五篇 一、函数 二、友情链接

发表于 2021-11-26

「这是我参与11月更文挑战的第26天,活动详情查看:2021最后一次更文挑战」。

一、函数

1、系统内置函数

1.1、查看系统自带的函数

1
hive复制代码hive (default)> show functions;

1.2、显示自带的函数的用法

1
hive复制代码hive (default)> desc function upper;

1.3、详细显示自带的函数的用法

1
hive复制代码hive (default)> desc function extended upper;

2、常用内置函数

2.1、空字段赋值

  1. 函数说明

NVL:给值为 NULL 的数据赋值,它的格式是 NVL( value,default_value)。它的功能是如果 value 为 NULL,则 NVL 函数返回 default_value 的值,否则返回 value 的值,如果两个参数都为 NULL ,则返回 NULL。

  1. 数据准备:采用员工表
  2. 查询:如果员工的 comm 为 NULL,则用-1 代替
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
hive复制代码hive (default)> select comm,nvl(comm,-1) from emp;
OK
comm _c1
NULL -1.0
300.0 300.0
500.0 500.0
NULL -1.0
1400.0 1400.0
NULL -1.0
NULL -1.0
NULL -1.0
NULL -1.0
0.0 0.0
NULL -1.0
NULL -1.0
NULL -1.0
NULL -1.0
  1. 查询:如果员工的 comm 为 NULL,则用领导 id 代替
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
hive复制代码hive (default)> select comm,nvl(comm,mgr) from emp;
OK
comm _c1
NULL 7902.0
300.0 300.0
500.0 500.0
NULL 7839.0
1400.0 1400.0
NULL 7839.0
NULL 7839.0
NULL 7566.0
NULL NULL
0.0 0.0
NULL 7788.0
NULL 7698.0
NULL 7566.0
NULL 7782.0

2.2、CASE WHEN THEN ELSE END

  1. 数据准备

image.png
2. 需求

求出不同部门男女各多少人。结果如下:

image.png
3. 创建本地 emp_sex.txt,导入数据

1
2
3
4
5
6
7
shell复制代码[moe@hadoop102 datas]$ vi emp_sex.txt
悟空 A 男
大海 A 男
宋宋 B 男
凤姐 A 女
婷姐 B 女
婷婷 B 女
  1. 创建 hive 表并导入数据
1
2
3
4
5
hive复制代码hive (default)> create table emp_sex
> (name string, dept_id string, sex string)
> row format delimited fields terminated by '\t';

hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/emp_sex.txt' into table emp_sex;
  1. 按需求查询数据
1
2
3
4
5
6
hive复制代码hive (default)> select
> dept_id,
> sum(case sex when '男' then 1 else 0 end) male_count,
> sum(case sex when '女' then 1 else 0 end) female_count
> from emp_sex
> group by dept_id;

2.3、行转列

  1. 相关函数说明

CONCAT(string A/col, string B/col…):返回输入字符串连接后的结果,支持任意个输入字符串;

CONCAT_WS(separator, str1, str2,…):它是一个特殊形式的 CONCAT()。第一个参数剩余参数间的分隔符。分隔符可以是与剩余参数一样的字符串。如果分隔符是 NULL,返回值也将为 NULL。这个函数会跳过分隔符参数后的任何 NULL 和空字符串。分隔符将被加到被连接的字符串之间;

注意: CONCAT_WS must be "string or array<string>

COLLECT_SET(col):函数只接受基本数据类型,它的主要作用是将某字段的值进行去重汇总,产生 Array 类型字段。
2. 数据准备

image.png
3. 需求

把星座和血型一样的人归类到一起。结果如下:

image.png
4. 创建本地 constellation.txt,导入数据

创建本地 constellation.txt,导入数据

1
2
3
4
5
6
7
shell复制代码[moe@hadoop102 datas]$ vim person_info.txt
孙悟空 白羊座 A
大海 射手座 A
宋宋 白羊座 B
猪八戒 白羊座 A
凤姐 射手座 A
苍老师 白羊座 B
  1. 创建 hive 表并导入数据
1
2
3
4
5
hive复制代码hive (default)> create table person_info
> (name string, constellation string, blood_type string)
> row format delimited fields terminated by '\t';

hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/person_info.txt' into table person_info;
  1. 按需求查询数据
1
2
3
4
5
6
7
8
9
10
hive复制代码hive (default)> SELECT
> t1.c_b,
> CONCAT_WS("|",collect_set(t1.name))
> FROM (
> SELECT
> NAME,
> CONCAT_WS(',',constellation,blood_type) c_b
> FROM person_info
> )t1
> GROUP BY t1.c_b;

image.png

2.4、列转行

  1. 函数说明

EXPLODE(col):将 hive 一列中复杂的 Array 或者 Map 结构拆分成多行。

LATERAL VIEW

用法:LATERAL VIEW udtf(expression) tableAlias AS columnAlias

解释:用于和 split, explode 等 UDTF 一起使用,它能够将一列数据拆成多行数据,在此基础上可以对拆分后的数据进行聚合。
2. 数据准备

image.png
3. 需求

将电影分类中的数组数据展开。结果如下:

image.png
4. 创建本地 movie.txt,导入数据

1
2
3
4
shell复制代码[moe@hadoop102 datas]$ vi movie_info.txt
《疑犯追踪》 悬疑,动作,科幻,剧情
《Lie to me》 悬疑,警匪,心理,剧情
《战狼2》 战争,动作,灾难
  1. 创建 hive 表并导入数据
1
2
3
4
5
hive复制代码hive (default)> create table movie_info
> (movie string, category string)
> row format delimited fields terminated by '\t';

hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/movie_info.txt' into table movie_info;
  1. 按需求查询数据
1
2
3
4
5
6
7
hive复制代码hive (default)> SELECT
> movie,
> category_name
> FROM
> movie_info
> lateral VIEW
> explode(split(category,",")) movie_info_tmp AS category_name;

image.png

2.5、窗口函数(开窗函数)

  1. 相关函数说明

OVER():指定分析函数工作的数据窗口大小,这个数据窗口大小可能会随着行的变而变化。
CURRENT ROW:当前行

n PRECEDING:往前 n 行数据

n FOLLOWING:往后 n 行数据

UNBOUNDED:起点,

UNBOUNDED PRECEDING 表示从前面的起点,UNBOUNDED FOLLOWING 表示到后面的终点

LAG(col,n,default_val):往前第 n 行数据

LEAD(col,n, default_val):往后第 n 行数据

NTILE(n):把有序窗口的行分发到指定数据的组中,各个组有编号,编号从 1 开始,对于每一行,NTILE 返回此行所属的组的编号。注意:n 必须为 int 类型。
2. 数据准备:name,orderdate,cost

1
2
3
4
5
6
7
8
9
10
11
12
13
14
txt复制代码jack,2017-01-01,10
tony,2017-01-02,15
jack,2017-02-03,23
tony,2017-01-04,29
jack,2017-01-05,46
jack,2017-04-06,42
tony,2017-01-07,50
jack,2017-01-08,55
mart,2017-04-08,62
mart,2017-04-09,68
neil,2017-05-10,12
mart,2017-04-11,75
neil,2017-06-12,80
mart,2017-04-13,94
  1. 需求
* 查询在 2017 年 4 月份购买过的顾客及总人数
* 查询顾客的购买明细及月购买总额
* 上述的场景, 将每个顾客的 cost 按照日期进行累加
* 查询每个顾客上次的购买时间
* 查询前 20%时间的订单信息
  1. 创建本地 business.txt,导入数据
1
shell复制代码[moe@hadoop102 datas]$ vi business.txt
  1. 创建 hive 表并导入数据
1
2
3
4
5
6
7
hive复制代码hive (default)> create table business(
> name string,
> orderdate string,
> cost int)
> row format delimited fields terminated by ',';

hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/business.txt' into table business;
  1. 按需求查询数据
* 查询在 2017 年 4 月份购买过的顾客及总人数


未使用over()函数



1
2
3
4
5
hive复制代码hive (default)> select
> name,count(*)
> from business
> where substring(orderdate,1,7) = '2017-04'
> group by name;
![image.png](https://gitee.com/songjianzaina/juejin_p8/raw/master/img/88f17e33d94cf803de0aece411aaecac801cab1c445d3799cb45743a592bc923) 使用over()函数
1
2
3
4
5
hive复制代码hive (default)> select
> name,count(*) over()
> from business
> where substring(orderdate,1,7) = '2017-04'
> group by name;
![image.png](https://gitee.com/songjianzaina/juejin_p8/raw/master/img/7c04e0cbb6a6cba7afad9eaee7d5ca649f0251d151d2e8489146bfdb07f7c06e) * 查询顾客的购买明细及月购买总额
1
2
3
hive复制代码hive (default)> select
> name,orderdate,cost,sum(cost) over(partition by month(orderdate))
> from business;
![image.png](https://gitee.com/songjianzaina/juejin_p8/raw/master/img/c7871d7ab4bce96a03a8c0e72bbfe56114075d012ca101e8390063944b537c54) * 将每个顾客的 cost 按照日期进行累加
1
2
3
4
5
6
7
8
9
hive复制代码select name,orderdate,cost,
sum(cost) over() as sample1,--所有行相加
sum(cost) over(partition by name) as sample2,--按 name 分组,组内数据相加
sum(cost) over(partition by name order by orderdate) as sample3,--按 name分组,组内数据累加
sum(cost) over(partition by name order by orderdate rows between UNBOUNDED PRECEDING and current row ) as sample4 ,--和 sample3 一样,由起点到当前行的聚合
sum(cost) over(partition by name order by orderdate rows between 1 PRECEDING and current row) as sample5, --当前行和前面一行做聚合
sum(cost) over(partition by name order by orderdate rows between 1 PRECEDING AND 1 FOLLOWING ) as sample6,--当前行和前边一行及后面一行
sum(cost) over(partition by name order by orderdate rows between current row and UNBOUNDED FOLLOWING ) as sample7 --当前行及后面所有行
from business;
![image.png](https://gitee.com/songjianzaina/juejin_p8/raw/master/img/8e88a26f953aeea8b6a20199ca4092512a94ce27776dc5ccef15e31c38835a28) rows 必须跟在 order by 子句之后,对排序的结果进行限制,使用固定的行数来限制分区中的数据行数量 ![154E646C.gif](https://gitee.com/songjianzaina/juejin_p8/raw/master/img/72c633265ad4dcfbb62b91d224e47e2d177b7431ae01ec2b8a57f53b78d7bda9) * 查看顾客上次的购买时间
1
2
3
4
5
hive复制代码select 
name,orderdate,cost,
lag(orderdate,1,'1900-01-01') over(partition by name order by orderdate ) as time1,
lag(orderdate,2) over (partition by name order by orderdate) as time2
from business;
![image.png](https://gitee.com/songjianzaina/juejin_p8/raw/master/img/201541085e19bc6909b8efb1b2ba7939ad3cc9e28fbf2cc403a56aeaedc97f3d) * 查询前 20%时间的订单信息
1
2
3
4
5
hive复制代码select * from (
select name,orderdate,cost, ntile(5) over(order by orderdate) sorted
from business
) t
where sorted = 1;
![image.png](https://gitee.com/songjianzaina/juejin_p8/raw/master/img/4eb3dc069cb621448033f0d19c9f812a7c10731db5ec56eb57a4c84d3772ef48)

2.6、Rank

  1. 函数说明

RANK() 排序相同时会重复,总数不会变。

DENSE_RANK() 排序相同时会重复,总数会减少。

ROW_NUMBER() 会根据顺序计算
2. 数据准备

image.png
3. 需求

计算每门学科成绩排名。
4. 创建本地 score.txt,导入数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
shell复制代码[moe@hadoop102 datas]$ vi score.txt

孙悟空 语文 87
孙悟空 数学 95
孙悟空 英语 68
大海 语文 94
大海 数学 56
大海 英语 84
宋宋 语文 64
宋宋 数学 86
宋宋 英语 84
婷婷 语文 65
婷婷 数学 85
婷婷 英语 78
  1. 创建 hive 表并导入数据
1
2
3
4
5
6
7
hive复制代码hive (default)> create table score(
> name string,
> subject string,
> score int)
> row format delimited fields terminated by '\t';

hive (default)> load data local inpath '/opt/module/hive-3.1.2/datas/score.txt' into table score;
  1. 按需求查询数据
1
2
3
4
5
6
7
hive复制代码select name,
subject,
score,
rank() over(partition by subject order by score desc) rp,
dense_rank() over(partition by subject order by score desc) drp,
row_number() over(partition by subject order by score desc) rmp
from score;

image.png

3、自定义函数

  1. Hive 自带了一些函数,比如:max/min 等,但是数量有限,自己可以通过自定义 UDF 来方便的扩展。
  2. 当 Hive 提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。
  3. 根据用户自定义函数类别分为以下三种
* UDF(User-Defined-Function)


一进一出
* UDAF(User-Defined Aggregation Function)聚集函数


多进一出


类似于:count/max/min
* UDTF(User-Defined Table-Generating Functions)


一进多出


如 lateral view explode()
  1. 官方文档地址

cwiki.apache.org/confluence/…
5. 编程步骤

* 继承 Hive 提供的类


org.apache.hadoop.hive.ql.udf.generic.GenericUDF


org.apache.hadoop.hive.ql.udf.generic.GenericUDTF
* 实现类中的抽象方法
* 在 hive 的命令行窗口创建函数


添加 jar



1
shell复制代码add jar linux_jar_path
创建 function
1
hive复制代码create [temporary] function [dbname.]function_name AS class_name;
* 在 hive 的命令行窗口删除函数
1
hive复制代码drop [temporary] function [if exists] [dbname.]function_name;

4、自定义 UDF 函数

  1. 需求

自定义一个 UDF 实现计算给定字符串的长度(当然系统默认有这个函数,仅仅是案例模拟!),例如:

1
2
hive复制代码hive(default)> select my_len("abcd");
4
  1. 创建一个 Maven 工程 Hive
  2. 导入依赖
1
2
3
4
5
6
7
pom复制代码<dependencies>
<dependency>
<groupId>org.apache.hive</groupId>
<artifactId>hive-exec</artifactId>
<version>3.1.2</version>
</dependency>
</dependencies>
  1. 创建一个类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码public class MyUDF extends GenericUDF {

//校验数据参数个数
@Override
public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {

if (arguments.length != 1) {
throw new UDFArgumentException("参数个数不为1");
}

return PrimitiveObjectInspectorFactory.javaIntObjectInspector;
}

//处理数据
@Override
public Object evaluate(DeferredObject[] arguments) throws HiveException {

//1.取出输入数据
String input = arguments[0].get().toString();

//2.判断输入数据是否为null
if (input == null) {
return 0;
}

//3.返回输入数据的长度
return input.length();
}

@Override
public String getDisplayString(String[] children) {
return "";
}
}
  1. 打成 jar 包上传到服务器/opt/module/hive-3.1.2/datas/myudf.jar
  2. 将 jar 包添加到 hive 的 classpath
1
hive复制代码hive (default)> add jar /opt/module/hive-3.1.2/datas/myudf.jar;
  1. 创建临时函数与开发好的 java class 关联
1
hive复制代码hive (default)> create temporary function my_len as "com.moe.hive.udf.MyUDF";
  1. 即可在 hql 中使用自定义的函数
1
2
3
4
5
6
7
8
9
hive复制代码hive (default)> select my_len('moe');
OK
_c0
3

hive (default)> select length('moe');
OK
_c0
3

5、自定义 UDTF 函数

  1. 需求

自定义一个 UDTF 实现将一个任意分割符的字符串切割成独立的单词,例如:

1
2
3
4
5
hive复制代码hive(default)> select myudtf("hello,world,hadoop,hive", ",");
hello
world
hadoop
hive
  1. 代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
java复制代码public class MyUDTF extends GenericUDTF {

//输出数据的集合
private ArrayList<String> outPutList = new ArrayList<>();

@Override
public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {

//输出数据的默认列名,可以用别名覆盖
List<String> fieldNames = new ArrayList<>();
fieldNames.add("word");

//输出数据的类型
List<ObjectInspector> fieldOIs = new ArrayList<>();
fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

//最终返回值
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
}

//处理输入数据:hello,moe,hive
@Override
public void process(Object[] args) throws HiveException {

//1.取出输入数据
String input = args[0].toString();

//2.按照","分割字符串
String[] words = input.split(",");

//3.遍历数据写出
for (String word : words) {

//清空集合
outPutList.clear();

//将数据放入集合
outPutList.add(word);

//输出数据
forward(outPutList);

}

}

//收尾方法
@Override
public void close() throws HiveException {

}
}
  1. 打成 jar 包上传到服务器/opt/module/hive-3.1.2/datas/myudtf.jar
  2. 将 jar 包添加到 hive 的 classpath 下
1
hive复制代码hive (default)> add jar /opt/module/hive-3.1.2/datas/myudtf.jar;
  1. 创建临时函数与开发好的 java class 关联
1
hive复制代码hive (default)> create temporary function myudtf as "com.moe.hive.udtf.MyUDTF";
  1. 使用自定义的函数
1
2
3
4
5
6
hive复制代码hive (default)> select myudtf('hello,moe,hive',',');
OK
word
hello
moe
hive

二、友情链接

大数据Hive学习之旅第四篇

大数据Hive学习之旅第三篇

大数据Hive学习之旅第二篇

大数据Hive学习之旅第一篇

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

prometheus+grafana替代Zabbix监控sp

发表于 2021-11-26

这是我参与11月更文挑战的第26天,活动详情查看:2021最后一次更文挑战

1.下载可视化模板

通过上文可知grafana为显示页面,所以本文提供一份监控springboot的json页面供大家下载。

链接:pan.baidu.com/s/1h5yrTsqU…
提取码:ehbv

2.配置SpringBoot

1.修改pom

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
js复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.1.3.RELEASE</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>
<groupId>com.dalaoyang</groupId>
<artifactId>springboot2_prometheus</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>springboot2_prometheus</name>
<description>springboot2_prometheus</description>

<properties>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>io.micrometer</groupId>
<artifactId>micrometer-registry-prometheus</artifactId>
<version>1.1.3</version>
</dependency>
</dependencies>


<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

</project>

2.修改application.yml

1
2
3
4
5
6
7
8
9
10
11
js复制代码management:
endpoints:
web:
exposure:
include: '*'
metrics:
tags:
application: ${spring.application.name}
health:
redis:
enabled: false

3.设置application

在启动问价中加入以下代码。
在这里插入图片描述

1
2
3
4
js复制代码    @Bean
MeterRegistryCustomizer<MeterRegistry> configurer(@Value("${spring.application.name}") String applicationName) {
return (registry) -> registry.config().commonTags("application", applicationName);
}

SpringBoot项目到这里就配置完成了,启动项目,访问http://ip:8080/actuator/prometheus即可看到返回值,这些值就是spring boot返回在前台页面的信息。
在这里插入图片描述

3.Prometheus配置

在prometheus配置监控我们的SpringBoot应用,完整配置如下所示。找到配置文件替换即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
js复制代码# my global config
global:
scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
# scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
alertmanagers:
- static_configs:
- targets:
# - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
# - "first_rules.yml"
# - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'
static_configs:
- targets: ['localhost:9090']
- job_name: 'springboot_system' #监控任务名称
scrape_interval: 5s
metrics_path: '/actuator/prometheus'
static_configs:
- targets: ['localhost:8088'] #springboot端口号

启动Prometheus ,访问 ip:9090,然后按照下图操作,即可看到已经注册到Prometheus 的监控任务。
在这里插入图片描述

4.Grafana配置

输入ip:3000,可以进入grafana的可视化界面。

1.配置prometheus数据源 (ip:9090)

在这里插入图片描述
在这里插入图片描述

上图填写你prometheus地址,端口切记填写9090,点击save后,如果失败会有提示 。

2.导入可视化模板

然后导入上文下载好的可视化界面模板。
在这里插入图片描述

选择上文配置好的数据源,prometheus选项就是上文配置的数据源。
在这里插入图片描述

3.验证

在这里插入图片描述

出现以上画面 配置完成!

​

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

APScheduler不完全踩坑指南

发表于 2021-11-26

这是我参与11月更文挑战的第26天,活动详情查看:2021最后一次更文挑战

踩坑指南

这是一篇排错指南,源自博主近期用到APScheduler较多,特此总结了一些常见的问题,希望对大家有帮助。

问题1: Can’t Connect to MySQL的问题

如果用的是SQLAlchemy作为job的存储介质,那如果参数没配置好的话,就会导致这个问题。

  • 问题描述

在涉及到job数据读取/更新的时候,我们会调用SQLAlchemy的Session去完成相关操作。但偶尔会弹出Can't Connect to MySQL或者Lose Connection的错误。

  • 问题分析

MySQL对每个数据库连接是有个有效期的,默认应该是8小时,但这个是可以配置的。而SQLAlchemy的连接都是在连接池里面,而且有一个默认的回收时间。

假如SQLAlchemy开辟的连接回收时间是2小时,但是MySQL配置的连接失效时间是1小时,那么就会出现一个问题:

SQLAlchemy认为连接仍然有效,但MySQL认为没效了,所以连接到了MySQL那边就不认了。就会出现以上错误了。

  • 解决方法

在jobstore处配置pool_recycle参数,这个参数要确保小于数据库的连接超时时间。

比如我配置25分钟,那么数据库超时时间就算只有30分钟,那我也能正常工作。

1
2
3
4
python复制代码# 连接回收时间为1500秒
job_store = {
'default': SQLAlchemyJobStore(url="数据库jdbc连接地址", engine_options={"pool_recycle": 1500})
}

问题2 job莫名其妙被删除的问题

不得不说,这个设定很变态,问题表象是: 你的服务跑着跑着,你的定时任务全不见了。这个非常非常蛋疼。

  • 问题描述

我这边的情况是,当job无法从数据库中反序列化到内存的时候,APScheduler会自动删除这样的job。

  • 问题分析

注意看具体的日志,经过我的排查,发现APScheduler用的序列化方式是pickle,与JSON相比,是个咱们不太熟悉但又听过的序列化类库。

但这个类库有个很严重的问题,比如我的用的python3.7,里面有个pickle.DEFAULT_PROTOCOL参数,在3.7的时候是3,而到Python3.9以后,这个值变成了4.

APScheduler里面这个参数默认用的pickle.DEFAULT_PROTOCOL参数。

想象一下,你本地是Python3.9, 但服务器是3.7,那你添加job的时候可能是用的DEFAULT_PROTOCOL=4,但是服务器反序列化job的时候,拿到的就是3了,导致无法反序列化,最终删除job。

  • 解决方法

指定序列化类型,最好知道你即将部署的机器的python版本,目前可以指定为2或者3。

1
2
python复制代码job_store = {
'default': SQLAlchemyJobStore(url="数据库连接地址", pickle_protocol=3, engine_options={"pool_recycle": 1500})

问题3 用gunicorn或者uvicorn重复执行的问题

可以看一下我上一篇文章,用socket/分布式锁都可以解决。

测试平台系列(82) 解决APScheduler重复执行的问题

问题4 分布式部署的问题

如上所说,不管是分布式部署,还是多worker模式。APScheduler的支持都不是很友好。

因为它旨在给单个Python项目完成定时任务方案,而没有考虑集群等分布式问题。在单个python服务里面,他可以运行良好。我们来看一个例子:

  • 服务器A 部署了web服务
  • 服务器B 部署了web服务

2个服务一模一样,之所以部署2台机器,是为了负载均衡考虑。那么我此时调用新增job的方法,请求打到了服务器A,那会是什么过程呢:

  1. 服务器A接收数据,与本地的func联动并加载job到内存
  2. 讲加载后的内存持久化到数据库

但你要说服务器B的jobstore有这个任务吗?那当然是没有的,因为它没有任何集群的概念。不存在服务器A接受命令,送达到服务器B的机制。

这最直观地导致,你在gunicorn/uvicorn开启N个worker,并调用add接口添加job,不会感知到job重复执行的问题。

而一旦你重启了服务,8个worker都从数据库加载了job到内存中,那你会很快遇到重复执行的任务。

本文仅限于我对APScheduler的浅薄理解而写,如有不周之处还望指正。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Hadoop企业级生产调优手册(一)

发表于 2021-11-26

「这是我参与11月更文挑战的第26天,活动详情查看:2021最后一次更文挑战」。

一、HDFS核心参数

1.1 NameNode内存生产配置

1. NameNode 内存计算

每个文件块大概占用 150byte,一台服务器 128G 内存为例,能存储多少文件块呢?

128*128*1024*1024/150Byte≈9.1亿

2. Hadoop2.x 系列, 配置 NameNode 内存

NameNode 内存默认 2000m,如果服务器内存 4G, NameNode 内存可以配置 3G。在 hadoop-env.sh 文件中配置如下。

1
powershell复制代码HADOOP_NAMENODE_OPTS= Xmx 3072 m

3. Hadoop3.x 系列,配置 NameNode 内存

hadoop-env.sh 中描述 Hadoop 的内存是动态分配的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
powershell复制代码# The maximum amount of heap to use (Java -Xmx).  If no unit
# is provided, it will be converted to MB. Daemons will
# prefer any Xmx setting in their respective _OPT variable.
# There is no default; the JVM will autoscale based upon machine
# memory size.
# export HADOOP_HEAPSIZE_MAX=

# The minimum amount of heap to use (Java -Xms). If no unit
# is provided, it will be converted to MB. Daemons will
# prefer any Xms setting in their respective _OPT variable.
# There is no default; the JVM will autoscale based upon machine
# memory size.
# export HADOOP_HEAPSIZE_MIN=
HADOOP_NAMENODE_OPTS= Xmx102400m

查看 NameNode 占用内存

1
2
3
4
5
6
7
8
9
10
11
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ jps
3136 JobHistoryServer
3200 Jps
2947 NodeManager
2486 NameNode
2622 DataNode
[Tom@hadoop102 hadoop-3.1.3]$ jps -heap 2486
Heap Configuration:
MinHeapFreeRatio = 40
MaxHeapFreeRatio = 70
MaxHeapSize = 478150656 (456.0MB)

查看 DataNode 占用内存

1
2
3
4
5
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ jmap -heap 2622
Heap Configuration:
MinHeapFreeRatio = 40
MaxHeapFreeRatio = 70
MaxHeapSize = 478150656 (456.0MB)

查看发现 hadoop102 上的 NameNode 和 DataNode 占用内存都是自动分配的,且相等。不是很合理。

经验参考:

docs.cloudera.com/documentati…

具体修改:hadoop-env.sh

1
2
3
powershell复制代码export HDFS_NAMENODE_OPTS="-Dhadoop.security.logger=INFO,RFAS-Xmx1024m"

export HDFS_DATANODE_OPTS="-Dhadoop.security.logger=ERROR,RFAS-Xmx1024m"

1.2 NameNode心跳并发配置

NameNode 有一个工作线程池,用来处理不同 DataNode 的并发心跳以及客户端并发的元数据操作 。对于大集群或者有大量客户端的集群来说,通常需要增大该参数 。 默认值是 10 。

1
2
3
4
xml复制代码<property>
<name>dfs.namenode.handler.count</name>
<value>21</value>
</property>

企业经验:dfs.namenode.handler.count=20 × logeClustersize ,比如集群规模 (DataNode 台数)为 3 台时,此参数设置为 21。可通过简单的 python 代码计算该值,代码如下。

1
2
3
4
5
6
7
8
python复制代码[Tom@hadoop102 hadoop-3.1.3]$ python
Python 2.7.5 (default, Oct 14 2020, 14:45:30)
[GCC 4.8.5 20150623 (Red Hat 4.8.5-44)] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import math
>>> print int(20*math.log(3))
21
>>> quit()

1.3 开启回收站配置

开启回收站功能,可以将删除的文件在不超时的情况下,恢复原数据,起到防止误删除、备份等作用。

1. 回收站工作机制

2. 开启回收站功能参数说明

默认值 fs.trash.interval = 0,0 表示禁用回收站,其他值表示设置文件的存活时间。

默认值 fs.trash.checkpoint.interval = 0,检查回收站的间隔时间。如果该值为 0,则该值设置和 fs.trash.interval 的参数值相等。

要求 fs.trash.checkpoint.interval <= fs.trash.interval。

3. 启用回收站

修改core-site.xml配置,垃圾回收时间为 1 分钟。

1
2
3
4
xml复制代码<property>
<name>fs.trash.interval</name>
<value>1</value>
</property>

4. 查看回收站

回收站目录在 HDFS 集群中的路径:/user/Tom/.Trash/….

5. 注意:通过网页上直接删除的文件也不会走回收站。

6. 通过程序删除的文件不会经过回收站,需要调用 moveToTrash() 才进入回收站

1
2
powershell复制代码Trash trash = N ew Trash(conf);
trash.moveToTrash(path);

7. 只有在命令行利用 hadoop fs -rm 命令删除的文件才会走回收站。

1
2
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hadoop fs -rm -r /input
2021-06-24 18:20:36,515 INFO fs.TrashPolicyDefault: Moved: 'hdfs://hadoop102:8020/input' to trash at: hdfs://hadoop102:8020/user/Tom/.Trash/Current/input

(8)恢复回收站数据

1
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hadoop fs -mv /user/Tom/.Trash/Current/input /input

二、HDFS集群压测

在企业中非常关心每天从 Java 后台拉取过来的数据,需要多久能上传到集群?消费者关心多久能从 HDFS 上拉取需要的数据?

为了搞清楚 HDFS 的读写性能,生产环境上非常需要对集群进行压测。

HDFS 的读写性能主要受网络和磁盘影响比较大。为了方便测试,将 hadoop102、hadoop103、 hadoop104 虚拟机网络都设置为 100mbps。

100Mbps 单位是 bit;10M/s 单位是 byte;1byte=8bit;100Mbps/8=12.5M/s。

测试网速:来到 hadoop102 的 /opt/software 目录, 创建一个

1
2
powershell复制代码[Tom@hadoop102 software]$ python -m SimpleHTTPServer
Serving HTTP on 0.0.0.0 port 8000 ...

2.1 测试HDFS写性能

写测试底层原理

测试内容: 向 HDFS 集群写 5 个 128M 的文件

注意: nrFiles n 为生成 mapTask 的数量,生产环境一般可通过 hadoop103:8088 查看 CPU 核数,设置为(CPU 核数 - 1)

1
2
3
4
5
6
7
8
9
10
11
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -write -nrFiles 5 -fileSize 128MB
2021-06-24 21:58:25,548 INFO sasl.SaslDataTransferClient: SASL encryption trust check: localHostTrusted = false, remoteHostTrusted = false
2021-06-24 21:58:25,568 INFO fs.TestDFSIO: ----- TestDFSIO ----- : write
2021-06-24 21:58:25,568 INFO fs.TestDFSIO: Date & time: Thu Jun 24 21:58:25 CST 2021
2021-06-24 21:58:25,568 INFO fs.TestDFSIO: Number of files: 5
2021-06-24 21:58:25,568 INFO fs.TestDFSIO: Total MBytes processed: 640
2021-06-24 21:58:25,568 INFO fs.TestDFSIO: Throughput mb/sec: 0.88
2021-06-24 21:58:25,568 INFO fs.TestDFSIO: Average IO rate mb/sec: 0.88
2021-06-24 21:58:25,568 INFO fs.TestDFSIO: IO rate std deviation: 0.04
2021-06-24 21:58:25,568 INFO fs.TestDFSIO: Test exec time sec: 246.54
2021-06-24 21:58:25,568 INFO fs.TestDFSIO:
  • Number of files:生成 mapTask 数量,一般是集群中 CPU 核数 -1,我们测试虚拟机就按照实际的物理内存 -1 分配即可
  • Total MBytes processed:单个 map 处理的文件大小
  • Throughput mb/sec:单个 mapTak 的吞吐量

计算方式:处理的总文件大小 / 每一个 mapTask 写数据的时间累加

集群整体吞吐量:生成 mapTask 数量 * 单个 mapTak 的吞吐量

  • Average IO rate mb/sec:平均 mapTak 的吞吐量

计算方式:每个 mapTask 处理文件大小 / 每一个 mapTask 写数据的时间全部相加除以 task 数量

  • IO rate std deviation:方差、反映各个 mapTask 处理的差值,越小越均衡

注意:如果测试过程中,出现异常,可以在yarn-site.xml中设置虚拟内存检测为 false。然后分发配置并重启集群。

1
2
3
4
5
xml复制代码<!--是否启动一个线程检查每个任务正使用的虚拟内存量,如果任务超出分配值,则直接将其杀掉,默认是true -->
<property>
<name>yarn.nodemanager.vmem-check-enabled</name>
<value>false</value>
</property>

测试结果分析

由于副本 1 就在本地,所以该副本不参与测试

一共参与测试的文件:

5 个文件 * 2 个副本 = 10 个

压测后的速度: 0.88

实测速度:0.88M/s * 10 个文件 ≈ 8.8M/s

三台服务器的带宽:12.5 + 12.5 + 12.5 ≈ 30m/s

所有网络资源没有用满。

如果实测速度远远小于网络,并且实测速度不能满足工作需求,可以考虑采用固态硬盘或者增加磁盘个数。

如果客户端不在集群节点,那就三个副本都参与计算

2.2 测试HDFS读性能

测试内容:读取 HDFS 集群 5 个 128M 的文件

1
2
3
4
5
6
7
8
9
10
11
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -read -nrFiles 5 -fileSize 128MB

2021-06-25 17:34:41,179 INFO fs.TestDFSIO: ----- TestDFSIO ----- : read
2021-06-25 17:34:41,181 INFO fs.TestDFSIO: Date & time: Fri Jun 25 17:34:41 CST 2021
2021-06-25 17:34:41,182 INFO fs.TestDFSIO: Number of files: 5
2021-06-25 17:34:41,182 INFO fs.TestDFSIO: Total MBytes processed: 640
2021-06-25 17:34:41,182 INFO fs.TestDFSIO: Throughput mb/sec: 4.6
2021-06-25 17:34:41,182 INFO fs.TestDFSIO: Average IO rate mb/sec: 4.74
2021-06-25 17:34:41,182 INFO fs.TestDFSIO: IO rate std deviation: 0.93
2021-06-25 17:34:41,182 INFO fs.TestDFSIO: Test exec time sec: 82.47
2021-06-25 17:34:41,182 INFO fs.TestDFSIO:

删除测试生成数据

1
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hadoop jar share/hadoop/mapreduce/hadoop-mapreduce-client-jobclient-3.1.3-tests.jar TestDFSIO -clean

三、HDFS多目录

3.1 NameNode多目录配置

NameNode 的本地目录可以配置成多个,且每个目录存放内容相同,增加了可靠性。

具体配置如下

(1)在 hdfs-site.xml文件中添加如下内容

1
2
3
4
xml复制代码<property>
<name>dfs.namenode.name.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/name1,file://${hadoop.tmp.dir}/dfs/name2</value>
</property>

注意:因为每台服务器节点的磁盘情况不同,所以这个配置配完之后,可以选择不分发。

(2)停止集群,删除三台节点的 data 和 logs 中所有数据。

1
2
3
xml复制代码[Tom@hadoop102 hadoop 3.1.3 ]$ rm rf data/ logs/
[Tom@hadoop103 hadoop 3.1.3 ]$ rm rf data/ logs/
[Tom@hadoop104 hadoop 3.1.3 ]$ rm rf data/ logs/

(3)格式化集群并启动。

1
2
powershell复制代码[Tom@hadoop102 hadoop 3.1.3 ]$ bin/hdfs namenode format
[Tom@hadoop102 hadoop 3.1.3 ]$ sbin/start dfs.sh

查看结果

1
2
3
4
5
powershell复制代码[Tom@hadoop102 dfs]$ ll
总用量12
drwx------. 3 Tom Tom 4096 12月11 08:03 data
drwxrwxr-x. 3 Tom Tom 4096 12月11 08:03 name1
drwxrwxr-x. 3 Tom Tom 4096 12月11 08:03 name2

检查 name1 和 name2 里面的内容,发现一模一样。

3.2 DataNode多目录配置

DataNode 可以配置成多个目录,每个目录存储的数据不一样(数据不是副本)

具体配置如下

(1)在 hdfs-site.xml文件中添加如下内容

1
2
3
4
xml复制代码<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/data1,file://${hadoop.tmp.dir}/dfs/data2</value>
</property>

查看结果

1
2
3
4
5
6
powershell复制代码[Tom@hadoop102 dfs]$ ll
总用量12
drwx------. 3 Tom Tom 4096 4月4 14:22 data1
drwx------. 3 Tom Tom 4096 4月4 14:22 data2
drwxrwxr-x. 3 Tom Tom 4096 12月11 08:03 name1
drwxrwxr-x. 3 Tom Tom 4096 12月11 08:03 name2

向集群上传一个文件,再次观察两个文件夹里面的内容发现不一致 (一个有数一个没有)

1
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hadoop fs -put wcinput/word.txt /

3.3 集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。( Hadoop3.x 新特性)

(1)生成均衡计划(我只有一块磁盘,不会生成计划)

1
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hdfs diskbalancer -plan hadoop102

(2)执行均衡计划

1
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hdfs diskbalancer -execute hadoop102.plan.json

(3)查看当前均衡任务的执行情况

1
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hdfs diskbalancer -query hadoop102

(4)取消均衡任务

1
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hdfs diskbalancer -cancel hadoop102.plan.json

四、HDFS集群扩容及缩容

4.1 添加白名单

白名单:表示在白名单的主机 IP 地址可以用来存储数据。

企业中:配置白名单,可以尽量防止黑客恶意访问攻击。

配置白名单步骤如下:

在 NameNode 节点的 /opt/module/hadoop-3.1.3/etc/hadoop 目录下分别创建 whitelist 和 blacklist 文件

(1)创建白名单

1
powershell复制代码[Tom@hadoop102 hadoop]$ vim whitelist

在 whitelist 中添加下主机名称,假如集群正常工作的节点为 102 103

1
2
powershell复制代码hadoop102
hadoop103

(2)创建黑名单,保持空的就可以

1
powershell复制代码[Tom@hadoop102 hadoop]$ touch blacklist

在 hdfs-site.xml 配置文件中增加 dfs.hosts 配置参数

1
2
3
4
5
6
7
8
9
10
11
xml复制代码<!--白名单-->
<property>
<name>dfs.hosts</name>
<value>/opt/module/hadoop-3.1.3/etc/hadoop/whitelist</value>
</property>

<!--黑名单-->
<property>
<name>dfs.hosts.exclude</name>
<value>/opt/module/hadoop-3.1.3/etc/hadoop/blacklist</value>
</property>

分发配置文件 whitelist hdfs-site.xml

1
powershell复制代码[Tom@hadoop104 hadoop]$ xsync hdfs site.xml whitelist

第一次添加白名单必须重启集群,不是第一次,只需要刷新 NameNode 节点即可

1
2
powershell复制代码[Tom@hadoop102 hadoop 3.1.3]$ myhadoop.sh stop
[Tom@hadoop102 hadoop 3.1.3]$ myhadoop.sh start

在 web 浏览器上查看 DN,http://hadoop102:9870/dfshealth.html#tab-datanode

在 hadoop104 上执行上传数据数据失败

1
powershell复制代码[Tom@hadoop104 hadoop-3.1.3]$ hadoop fs -put NOTICE.txt /

二次修改白名单,增加 hadoop104

1
2
3
4
5
powershell复制代码[Tom@hadoop102 hadoop]$ vim whitelist
修改为如下内容
hadoop102
hadoop103
hadoo p104

刷新 NameNode

1
2
powershell复制代码[Tom@hadoop102 hadoop 3.1.3]$ hdfs dfsadmin refreshNodes
Refresh nodes successful

在 web 浏览器上查看 DN http://hadoop102:9870/dfshealth.html#tab-datanode

4.2 服役新服务器

需求

随着公司业务的增长,数据量越来越大,原有的数据节点的容量已经不能满足存储数据的需求,需要在原有集群基础上动态添加新的数据节点。

环境准备

(1)在 hadoop100 主机上再克隆一台 hadoop105 主机

(2)修改 IP 地址和主机名称

1
2
powershell复制代码[root@hadoop105 ~]# vim /etc/sysconfig/network-scripts/ifcfg-ens33
[root@hadoop105 ~]# vim /etc/hostname

(3)拷贝 hadoop102 的 /opt/module 目录和 /etc/profile.d/my_env.sh 到 hadoop105

1
2
3
powershell复制代码[Tom@hadoop102 opt]$ scp-r module/* Tom@hadoop105:/opt/module/
[Tom@hadoop102 opt]$ sudo scp/etc/profile.d/my_env.sh root@hadoop105:/etc/profile.d/my_env.sh
[Tom@hadoop105 hadoop-3.1.3]$ source /etc/profile

(4)删除 hadoop105 上 Hadoop 的历史数据, data 和 logs 数据

1
powershell复制代码[Tom@hadoop105 hadoop 3.1.3]$ rm rf data/ logs/

(5)配置 hadoop102 和 hadoop103 到 hadoop105 的 ssh 无密登录

1
2
powershell复制代码[ hadoop102 .ssh]$ ssh copy id hadoop105
[ hadoop103 .ssh]$ ssh copy id hadoop105

服役新节点具体步骤

(1)直接启动 DataNode 即可关联到集群

1
2
powershell复制代码[Tom@hadoop105 hadoop-3.1.3]$ hdfs --daemon start datanode
[Tom@hadoop105 hadoop-3.1.3]$ yarn --daemon start nodemanager

在白名单中增加新服役的服务器

(1)在白名单 whitelist 中增加 hadoop104、 hadoop105,并重启集群

1
2
3
4
5
6
powershell复制代码[Tom@hadoop102 hadoop]$ vim whitelist
修改为如下内容
hadoop102
hadoop103
hadoop104
hadoop105

(2)分发

1
powershell复制代码[Tom@hadoop102 hadoop]$ xsync whitelist

(3)刷新 NameNode

1
2
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hdfs dfsadmin -refreshNodes
Refresh nodes successful

在 hadoop105 上上传文件

1
powershell复制代码[Tom@hadoop105 hadoop-3.1.3]$ hadoop fs -put /opt/module/hadoop-3.1.3/LICENSE.txt /

4.3 服务器间数据均衡

企业经验

在企业开发中,如果经常在 hadoop102 和 hadoop104 上提交任务,且副本数为 2,由于数据本地性原则,就会导致 hadoop102 和 hadoop104 数据过多, hadoop103 存储的数据量小。

另一种情况,就是新服役的服务器数据量比较少,需要执行集群均衡命令。

开启数据均衡命令

1
2
powershell复制代码[Tom@hadoop105 hadoop 3.1.3]$ sbin/start balancer.sh
threshold 10

对于参数 10,代表的是集群中各个节点的磁盘空间利用率相差不超过 10%,可根据实际情况进行调整。

停止数据均衡命令

1
powershell复制代码[Tom@hadoop105 hadoop-3.1.3]$ sbin/stop-balancer.sh

注意 :由于 HDFS 需要启动单独的 Rebalance Server 来执行 Rebalance 操作, 所以尽量不要在 NameNode 上执行 start-balancer.sh,而是找一台比较空闲的机器。

4.4 黑名单退役服务器

黑名单:表示在黑名单的主机 IP 地址不可以用来存储数据。

企业中:配置黑名单,用来退役服务器。

黑名单配置步骤如下:

编辑 /opt/module/hadoop-3.1.3/etc/hadoop 目录下的 blacklist 文件

1
powershell复制代码[Tom@hadoop102 hadoop vim blacklist

添加如下主机名称(要退役的节点)

hadoop105

注意:如果白名单中没有配置,需要在 hdfs-site.xml 配置文件中增加 dfs.hosts 配置参数

1
2
3
4
5
xml复制代码<!--黑名单-->
<property>
<name>dfs.hosts.exclude</name>
<value>/opt/module/hadoop-3.1.3/etc/hadoop/blacklist</value>
</property>

分发配置文件 blacklist,hdfs-site.xml

1
powershell复制代码[Tom@hadoop104 hadoop]$ xsync hdfs-site.xml blacklist

第一次添加黑名单必须重启集群,不是第一次,只需要刷新 NameNode 节点即可

1
2
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ hdfs dfsadmin -refreshNodes
Refresh nodes successful

检查 Web 浏览器 ,退役节点的状态为 decommission in progress(退役中,说明数据节点正在复制块到其他节点)

等待退役节点状态为 decommissioned(所有块已经复制完成 ),停止该节点及节点资源管理器。 注意 :如果副本数是 3 服役的节点小于等于 3,是不能退役成功的,需要修改副本数后才能退役。

1
2
3
powershell复制代码[Tom@hadoop105 hadoop-3.1.3]$ hdfs --daemon stop datanode

[Tom@hadoop105 hadoop-3.1.3]$ yarn--daemon stop nodemanager

如果数据不均衡,可以用命令实现集群的再平衡

1
powershell复制代码[Tom@hadoop102 hadoop-3.1.3]$ sbin/start-balancer.sh -threshold 10

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

慢查询与MySQL语句优化 1 慢查询 2 Mysql语句优

发表于 2021-11-26

如果我们了解了Mysql中的索引原理之后,(详见探秘数据库 —— 事务 + InnoDB存储引擎),如何利用索引并对一些执行较慢的sql进行优化也是必要的,所以我们可以结合索引的原理来探究一下慢查询与优化的知识。

1 慢查询

MySQL的慢查询,全名慢查询日志,

是MySQL提供的一种日志记录,用来记录在MySQL中应时间超过阈值的语句。

默认情况下,MySQL数据库并不启动慢查询,需要手动来设置这个参数。

如果不是调优需要的话,一般不建议启动该参数,开启慢查询日志会或多或少带来一定的性能影响。

慢查询日志可用于查找需要很长时间才能执行的查询,因此是优化的候选者。

  • 查看“慢查询”的配置信息:
1
mysql复制代码SHOW VARIABLES LIKE "%slow%";

)​

  • 查看“慢查询”的时间定义
1
mysql复制代码SHOW VARIABLES LIKE "long_query_time";

)​

  • 设置“慢查询”的时间定义
1
mysql复制代码SET long_query_time = 2;

)​

  • 开启慢日志
1
mysql复制代码SET GLOBAL slow_query_log = "ON";

2 Mysql语句优化

2.1 数据准备

为了做实验,我们需要现在表中插入很多很多条数据,以观察查询时候的性能差异,这里我们插入一千万条数据。

  • 创建表:
1
2
3
4
5
6
7
8
9
mysql复制代码CREATE TABLE `users` (
`id` int(11) NOT NULL AUTO_INCREMENT,
`name` varchar(30) NOT NULL,
`email` varchar(30) DEFAULT NULL,
`phone` char(11) DEFAULT NULL,
`age` int(11) DEFAULT NULL,
`sex` char(1) DEFAULT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

  • 创建插入1000万条数据的存储过程:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
mysql复制代码\d //
create procedure p1()
begin
set @i=1;
while @i<=10000000 do
insert into users values(
null,
concat('user:',@i),
concat('user:',@i,'@qq.com'),
concat('13701',FLOOR(RAND()*500000 + 500000)),
floor(rand()*100),
if(floor(rand() * 2) = 1 , '男' , '女')
);
set @i=@i+1;
end while;
end;
//
\d ;

  • 调用存储过程,完成数据插入
1
mysql复制代码call p1();

运行时间会比较久,在我的电脑上是168分钟左右,大家耐心等待哦~~

)​

我们再插入一条特殊的数据:

1
mysql复制代码insert into users values(null,"zhangsan","zhangsan@qq.com",13701383017,25,'女');

  • 查询刚刚插入的数据
1
mysql复制代码select * from users where name = "zhangsan";

)​

可以看出想要查询出这条数据需要的时间非常久,相应的也存储到了慢查询的日志里面了,对应的日志内容如下:

1
2
3
4
5
mysql复制代码# Time: 200916 15:19:54
# User@Host: root[root] @ localhost [127.0.0.1]
# Query_time: 6.708004 Lock_time: 0.000000 Rows_sent: 1 Rows_examined: 10000001
SET timestamp=1600240794;
select * from users where name = "zhangsan";

2.2 EXPLAIN语句

一条查询语句在经过MySQL查询优化器的各种基于成本和规则的优化会后生成一个所谓的执行计划

这个执行计划展示了接下来具体执行查询方式,比如多表连接的顺序是什么,对于每个表采用什么访问方法来具体执行查询等等。

MySQL为我们提供了EXPLAIN语句来帮助我们查看某个语句的具体执行计划。

  • 使用EXPLAIN分析SQL语句

)​

对输出结果的参数解释如下,其中重要的已经在上图标明:

  • id 在一个大的查询语句中每个SELECT关键字都对应一个唯一的id
  • select type SELECT 关键字对应的那个查询的类型
  • table 表名
  • partitions 匹配的分区信息
  • type 针对单表的访问方法
  • possible_keys 可能用到的索引
  • key 实际上使用的索引
  • key_len 实际使用到的索引长度
  • ref 当使用索引列等值查询时,与索引列进行等值匹配的对象信息
  • rows 预估的需要读取的记录条数
  • filtered 某个表经过搜索条件过滤后剩余记录条数的百分比
  • Extra 一些额外的信息

当我们换一种方法来查找这一条数据时,比如使用id来查询,由于id默认为主键索引,所以查询速度较快:

)​

只用了0.02秒,explain一下的结果如下,也验证了该理论:

)​

2.3 添加索引

  • 尝试给name字段加普通索引
1
mysql复制代码 alter table users add index index_name(name);

之后再使用name字段来查询,发现速度提升了不少,原因就在于我们将name字段设置成了索引项:

)​

使用explain查看一下:

)​

大家看到,索引能给数据检索提高的效率非常明显

那么是否意味着我们只要尽可能多的去建立索引就可以了呢?

每建立一个索引都会建立一棵B+树,并且需要维护,这是很费性能和存储空间的。

2.4 适当建立索引

1.创建并使用自增数字来建立主键索引

2.经常作为where条件的字段建立索引

3.添加索引的字段尽可能的保持唯一性

4.可考虑使用联合索引并进行索引覆盖

2.5 合理使用索引

  1. MySQL索引通常是被用于提高 WHERE 条件的数据行匹配时的搜索速度,
  2. 在索引的使用过程中,存在一些使用细节和注意事项。
  3. 因为不合理的使用可能会导致建立了索引之后,不一定就使用上了索引

2.5.1 不要在列上使用函数和进行运算

  • 不要在列上使用函数,将导致索引失效
1
mysql复制代码select * from news where year(publish_time) = 2017;

  • 改造为使用索引
1
mysql复制代码select * from news where publish_time = '2017-01-01';

  • 不要在列上进行运算,也会导致索引失效
1
mysql复制代码select * from news where id / 100 = 1;

  • 改造为使用索引
1
mysql复制代码select * from news where id = 100;

)​

)​

2.5.2 类型要匹配

当查询条件左右两侧类型不匹配的时候会发生隐式转换,隐式转换带来的影响就是可能导致索引失效而进行全表扫描。

  • 修改一下表中的数据
1
mysql复制代码 update users set name = '123456' where id = 10086;

  • 正常查询与含有隐式转换的对比

)​

发现如果出现隐式数据类型转换时,查询时间会非常长。

2.5.3 首部不出现通配符

  • 当在尾部使用通配符时可以使用索引

)​

  • 当在头部使用通配符时,会导致索引失效

)​

2.5.4 多个单列索引并不是最佳选择

Mysql只能使用一个索引,会从多个索引中选择一个(限制最严格的)索引,因此,为多个列创建单列索引并不能提高Mysql的查询性能。

使用多个单列索引的情况:

1
mysql复制代码alter table users add index index_name(name);

)​

看上去很美好,但文件可能会非常的大。

)​

事实上,MySQL只能使用一个单列索引。这样既浪费了空间,又没有提高性能(因为需要回行)为了提高性能,可以使用复合索引保证列都被索引覆盖。

复合索引:

1
mysql复制代码alter table users add index in_x(email,phone,name);

)​

)​

2.5.5 小心复合索引的最左侧原则

查询条件中使用了复合索引的第一个字段,索引才会被使用。因此,在复合索引中索引列的顺序至关重要。如果不是按照索引的最左列开始查找,则无法使用索引。

)​

发现如果只按照email查询时索引失效了。

2.5.6 尽可能达成索引覆盖

如果一个索引包含所有需要的查询的字段的值,直接根据索引的查询结果返回数据,而无需读表,能够极大的提高性能。因此,可以定义一个让索引包含的额外的列,即使这个列对于索引而言是无用的。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot和 Spring Cloud 的关系详

发表于 2021-11-26

要了解 Spring Boot 的发展背景,还得从 2004 年 Spring Framework1.0 版本发布开始说起,不过大家都是从开始学习 Java 就使用 Spring Framework 了,所以就不做过多展开。

随着使用 Spring Framework 进行开发的企业和个人越来越多,Spring 也慢慢从一个单一简洁的小框架编程了一个大而全的开源软件,Spring Framework 的边界不断进行扩张,到了现在 Spring 几乎可以做任何事情。目前市面上绝大部分的开源组件和中间件,都有 Spring 对应组件的支持,

你们如果去关注 Spring 目前的官网,你会发现他的 slogan 是:Spring makes Java Simple。它让 Java 的开发变得更加简单。

虽然 Spring 的组件代码是轻量级的,但是它的配置却是重量级的,Spring 每集成一个开源软件,就需要增加一些基础配置,慢慢的随着我们开发的项目越来越庞大,往往需要集成很多开源软件,因此后期使用 Spirng 开发大型项目需要引入很多配置文件,太多的配置非常难以理解,并容易配置出错,这个给开发人员带来了不少的负担。

大家想象一个场景,就是假如你需要用 spring 开发一个简单的 Hello World Web 应用程序,应该要做哪些动作呢?

  • 创建一个项目结构,必然包含依赖 Maven 或者 Gradle 的构建文件。
  • 至少需要添加 spring mvc 和 servlet api 的依赖
  • 一个 web.xml,声明 spring 的 DispatcherServlet
  • 一个启用了 Spring MVC 的 spring 配置
  • 一个控制器类,“以 HelloWord”为响应的 http 请求
  • 一个用于部署应用程序的 web 应用服务器,比如 Tomcat

在整个过程中,我们发现只有一个东西和 Hello Word 功能相关,那就是控制器(controller),剩下的都是 Spring 开发的 Web 应用程序必须要的通用模版,既然所有 Spring Web 应用程序都要用到他们,那为什么还要你来提供这些东西呢?

所以,直到 2012 年 10 月份,一个叫 Mike Youngstrom(扬斯特罗姆)在 Spring Jira 中创建了一个功能请求,要求在 Spring Framework 中支持无容器 Web 应用程序体系结构,他谈到了在主容器引导 spring 容器内配置 Web 容器服务。

1
css复制代码I think that Spring's web application architecture can be significantly simplified if it were to provided tools and a reference architecture that leveraged the Spring component and configuration model from top to bottom. Embedding and unifying the configuration of those common web container services within a Spring Container bootstrapped from a simple main() method.我认为,如果要提供从上到下充分利用Spring组件和配置模型的工具和参考体系结构,则可以大大简化Spring的Web应用程序体系结构。在通过简单main()方法引导的Spring容器中嵌入和统一那些通用Web容器服务的配置。

而且 Spring 开发团队也意识到了这些问题,急需要一套软件来解决这个问题,而这个时候微服务的概念也慢慢的起来,快速开发微小独立的应用也变得很急迫。

而 Spring 恰好处在这样一个交叉点上,所以顺势而为在 2013 年初的时候,开始投入 Spring Boot 项目的研发,直到 2014 年 4 月,Spring Boot1.0 版本发布。从那以后,Spring Boot 开启了一些列的迭代和升级的过程。

经过 7 年时间的发展,到目前为止,Spring Boot 最新稳定版为 2.6.0 版本。

Spring Boot 的发展

Spring Boot 刚出生的时候,引起了很多开源社区的关注,并且也有个人和企业开始尝试使用 Spring Boot。其实直到 2016 年,Spring Boot 才真正在国内被使用起来。我之前在挖财的时候,2015 年公司就开始采用 Spring Boot 来构建基于 Dubbo 的微服务架构。到现在,Spring Boot 几乎是所有公司的第一选择。

Build Anything

Spring Boot 被官方定位为“BUILD ANYTHING”,Spring Boot 官方的概述是这么描述 Spring Boot 的。

1
arduino复制代码Spring Boot makes it easy to create stand-alone, production-grade Spring based Applications that you can "just run".// 通过Spring Boot可以轻松的创建独立的、生产级别的基于Spring 生态下的应用,你只需要运行即可。We take an opinionated view of the Spring platform and third-party libraries so you can get started with minimum fuss. Most Spring Boot applications need minimal Spring configuration.//对于Spring平台和第三方库,我们提供了一个固化的视图,这个视图可以让我们在构建应用是减少很多麻烦。大部分spring boot应用只需要最小的Spring 配置即可。

如果大家在java培训学习过程中不习惯看英文文档,可能理解起来比较复杂,翻译成人话就是:Spring Boot 能够帮助使用 Spring Framework 生态的开发者快速高效的构建一个基于 Spring 以及 spring 生态体系的应用。

为了让大家对这句话的理解更加深刻,我们来做两个小实验,一个是基于传统的 Spring MVC 框架构建一个项目、另一种是使用 Spring Boot。

Spring MVC With Spring Boot

通过 Spring MVC 项目搭建过程来对比 Spring Boot 的差异和优势。

Spring MVC 项目搭建过程

  • 创建一个 maven-webapp 项目
  • 添加 jar 包依赖
1
xml复制代码  <dependency>      <groupId>org.springframework</groupId>      <artifactId>spring-beans</artifactId>      <version>5.2.5.RELEASE</version>  </dependency>  <dependency>      <groupId>commons-logging</groupId>      <artifactId>commons-logging</artifactId>      <version>1.2</version>  </dependency>
1
r复制代码  spring-context  spring-context-support  spring-core  spring-expression  spring-web  spring-webmvc

修改 web.xml 文件

1
xml复制代码  <context-param><!--配置上下文配置路径-->      <param-name>contextConfigLocation</param-name>      <param-value>classpath:applicationContext.xml</param-value>  </context-param>  <!--配置监听器-->  <listener>      <listener-class>org.springframework.web.context.ContextLoaderListener</listener-class>  </listener>  <listener>      <listener-class>org.springframework.web.util.IntrospectorCleanupListener</listener-class>  </listener>  <!--配置Spring MVC的请求拦截-->  <servlet>      <servlet-name>springmvc</servlet-name>      <servlet-class>org.springframework.web.servlet.DispatcherServlet</servlet-class>      <init-param>          <param-name>contextConfigLocation</param-name>          <param-value>classpath:dispatcher-servlet.xml</param-value>      </init-param>  </servlet>  <servlet-mapping>      <servlet-name>springmvc</servlet-name>      <url-pattern>/</url-patter>  </servlet-mapping>

在 resources 目录下添加 dispatcher-servlet.xml 文件

1
xml复制代码  <?xml version="1.0" encoding="UTF-8"?>  <beans xmlns="http://www.springframework.org/schema/beans"         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"         xmlns:context="http://www.springframework.org/schema/context"         xmlns:mvc="http://www.springframework.org/schema/mvc"         xmlns:aop="http://www.springframework.org/schema/aop"         xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd         http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd         http://www.springframework.org/schema/mvc http://www.springframework.org/schema/mvc/spring-mvc.xsd         http://www.springframework.org/schema/aop http://www.springframework.org/schema/aop/spring-aop.xsd">    <!-- 扫描 controller -->  <context:component-scan base-package="com.gupaoedu.controller" />  <!--开启注解驱动-->  <mvc:annotation-driven/>  <!-- 定义视图解析器 -->  <bean class="org.springframework.web.servlet.view.InternalResourceViewResolver">      <property name="prefix" value="/"/>      <property name="suffix" value=".jsp"/>  </bean>

创建一个 Controller

1
kotlin复制代码  @Controller  public class HelloController {        @RequestMapping(method = RequestMethod.GET,path = "/index")      public String index(Model model){          model.addAttribute("key","Hello Gupao");          return "index";      }  }

修改默认的 index.jsp,设置 el 表达式的解析

1
ini复制代码  <%@ page language="java" contentType="text/html; charset=utf-8"           pageEncoding="utf-8" isELIgnored="false" %>  ${key}

运行项目

Spring Boot 搭建过程

直接基于 start.spring.io 这个脚手架搭建即可。

思考和总结

咱们再回到最开始 Spring Boot 的定义部分,Spring Boot 能够帮助使用 Spring Framework 生态的开发者快速高效的构建一个基于 Spring 以及 spring 生态体系的应用。

再对比两种构建过程,似乎也能够理解 Spring Boot 的作用了吧。当然它的作用不仅于此,后续会逐步揭开它的真实面目。

通过上面这个案例我们发现,如果没有 spring boot,要去构建一个 Spring MVC 的 web 应用,需要做的事情很多

  • 引入 jar 包
  • 修改 web.xml,添加监听和拦截
  • 创建 spring mvc 核心配置文件 dispatcher-servlet.xml
  • 创建 controller
  • 部署到 tomcat

这个过程如果不熟悉,很可能需要 1~2 个小时,如果是新手,可能需要更长时间。但是 spring boot,不管是新手还是老手,都能够分分钟解决问题。

理解约定优于配置

我们知道,Spring Boot 是约定由于配置理念下的产物,那么什么是约定由于配置呢?

约定优于配置是一种软件设计的范式,主要是为了减少软件开发人员需做决定的数量,获得简单的好处,而又不失灵活性。

简单来说,就是你所使用的工具默认会提供一种约定,如果这个约定和你的期待相符合,就可以省略那些基础的配置,否则,你就需要通过相关配置来达到你所期待的方式。

约定优于配置有很多地方体现,举个例子,比如交通信号灯,红灯停、绿灯行,这个是一个交通规范。你可以在红灯的时候不停,因为此时没有一个障碍物阻碍你。但是如果大家都按照这个约定来执行,那么不管是交通的顺畅度还是安全性都比较好。

而相对于技术层面来说,约定有很多地方体现,比如一个公司,会有专门的文档格式、代码提交规范、接口命名规范、数据库规范等等。这些规定的意义都是让整个项目的可读性和可维护性更强。

Spring Boot Web 应用中约定优于配置的体现

那么在前面的案例中,我们可以思考一下,Spring Boot 为什么能够把原本繁琐又麻烦的工作省略掉呢?实际上这些工作并不是真正意义上省略了,只是 Spring Boot 帮我们默认实现了。

而这个时候我们反过来思考一下,Spring Boot Web 应用中,相对 Spring MVC 框架的构建而言,它的约定由于配置体现在哪些方面呢?

  • Spring Boot 的项目结构约定,Spring Boot 默认采用 Maven 的目录结构,其中
  • src.main.java 存放源代码文件
  • src.main.resource 存放资源文件
  • src.test.java 测试代码
  • src.test.resource 测试资源文件
  • target 编译后的 class 文件和 jar 文件
  • 内置了嵌入式的 Web 容器,在 Spring 2.2.6 版本的官方文档中 3.9 章节中,有说明 Spring Boot 支持四种嵌入式的 Web 容器
  • Tomcat
  • Jetty
  • Undertow
  • Reactor
  • Spring Boot 默认提供了两种配置文件,一种是 application.properties、另一种是 application.yml。Spring Boot 默认会从该配置文件中去解析配置进行加载。
  • Spring Boot 通过 starter 依赖,来减少第三方 jar 的依赖。

这些就是 Spring Boot 能够方便快捷的构建一个 Web 应用的秘密。当然 Spring Boot 的约定优于配置还不仅体现在这些地方,在后续的分析中还会看到 Spring Boot 中约定优于配置的体现。

Spring Boot 整合 Mybatis

实际上 Spring Boot 的本质就是 Spring,如果一定要从技术发展的过程中找到一些相似的对比的话,你们可以对比一下 Jsp/Servlet 和 Spring MVC, 两者都可以用来开发 Web 项目,但是在使用上,Spring MVC 的使用会更加简单。

而 Spring Boot 和 Spring 就相当于当年的 JSP/Servlet 和 Spring MVC 的关系。所以它本身并没有所谓新的技术,接下来,我带着大家来通过 Spring Boot 整合 Mybatis 实现数据的基本操作的案例,来继续认识一下 Spring Boot。

创建 Spring Boot 应用

创建一个 Web 项目

引入项目中需要的 starter 依赖

1
xml复制代码<dependency>    <groupId>mysql</groupId>    <artifactId>mysql-connector-java</artifactId></dependency><dependency>    <groupId>org.mybatis.spring.boot</groupId>    <artifactId>mybatis-spring-boot-starter</artifactId>    <version>2.1.2</version></dependency><dependency>    <groupId>org.springframework.boot</groupId>    <artifactId>spring-boot-starter-thymeleaf</artifactId></dependency>

创建数据库表

1
sql复制代码DROP TABLE IF EXISTS `t_user`;CREATE TABLE `t_user` (  `id` int(10) unsigned NOT NULL AUTO_INCREMENT,  `name` varchar(20) DEFAULT NULL,  `address` varchar(80) DEFAULT NULL,  PRIMARY KEY (`id`)) ENGINE=InnoDB DEFAULT CHARSET=latin1;

配置数据库连接

1
ruby复制代码spring:  datasource:    url: jdbc:mysql://192.168.13.106:3306/test_springboot    username: root    password: root    driver-class-name: com.mysql.cj.jdbc.Driver

开发数据库访问层

创建实体对象

1
arduino复制代码public class User {    private int id;    private String name;    private String address;}

创建 Mapper

1
2
sql复制代码//@Repository可以支持在你的持久层作为一个标记,可以去自动处理数据库操作产生的异常@Repository@Mapperpublic interface UserMapper {
User findById(int id); List<User> list(); int insert(User user); int delete(int id); int update(User user);}

编写 mapper 文件

在 resource 文件目录下创建 UserMapper.xml 文件,内容如下

1
2
xml复制代码<?xml version="1.0" encoding="utf-8" ?><!DOCTYPE mapper PUBLIC        "-//mybatis.org//DTD com.example.Mapper 3.0//EN"        "http://mybatis.org/dtd/mybatis-3-mapper.dtd"><mapper namespace="com.example.demo.mapper.UserMapper">    <resultMap id="resultMap" type="com.example.demo.entity.User">        <id property="id" column="id"/>        <result property="name" column="name"/>        <result property="address" column="address"/>    </resultMap>
<select id="findById" resultMap="resultMap" parameterType="java.lang.Integer"> select * from t_user where id=#{id} </select> <select id="list" resultMap="resultMap"> select * from t_user </select> <insert id="insert" parameterType="com.example.demo.entity.User" keyProperty="id" useGeneratedKeys="true"> insert into t_user(name,address) values(#{name,jdbcType=VARCHAR},#{address,jdbcType=VARCHAR}) </insert> <delete id="delete" parameterType="java.lang.Integer"> delete from t_user where id=#{id} </delete> <update id="update" parameterType="com.example.demo.entity.User"> update t_user set name=#{name,jdbcType=VARCHAR},address=#{address,jdbcType=VARCHAR} where id=#{id,jdbcType=INTEGER} </update></mapper

定义 service 及实现

1
2
3
java复制代码public interface IUserService {
User findById(int id); List<User> list(); int insert(User user); int delete(int id); int update(User user);}
@Servicepublic class UserServiceImpl implements IUserService { @Autowired private UserMapper userMapper;}

创建 Controller

1
2
3
4
5
6
typescript复制代码@RestControllerpublic class Controller {
@Autowired private IUserService userService;
@GetMapping("/user/{id}") public User user(@PathVariable("id") int id){ return userService.findById(id); }
@GetMapping("/users") public List<User> users(){ return userService.list(); }
@PostMapping("/user") public String insertUser(User user){ int row=userService.insert(user); return row>0?"SUCCESS":"FAILED"; }
@PutMapping("/user") public String updateUser(User user){ int row=userService.update(user); return row>0?"SUCCESS":"FAILED"; } @DeleteMapping("/user/{id}") public String deleteUser(@PathVariable("id") int id){ return userService.delete(id)>0?"SUCCESS":"FAILED"; }}

修改配置

在 Spring 的 Main 方法上增加以下注解,用来扫描 Mybatis 的 Mapper 文件

1
kotlin复制代码  @MapperScan("com.example.demo.mapper")

配置 Mapper 配置文件的地址,在 application.yml 中

1
yaml复制代码  mybatis:    mapper-locations: classpath:*Mapper.xml

id int,

name varchar(20),

address varchar(20)

)

项目打包

  • mvn -Dmaven.test.skip -U clean install
  • java -jar xxx.jar

简单总结

这个代码,我想,大家应该写过无数遍了,而在基于 Spring Boot 集成 Mybatis 这个案例中,核心的业务逻辑并没有减少,它只减少了一些繁琐的配置,使得我们更聚焦在业务开发层面。

简单来说,基于 Spring Boot 的项目中,我们只需要写 Controlelr、Service、Dao 即可。甚至很多情况下我们 dao 都不需要管,比如使用 mybatis-plus 这个插件,就可以省去很多固定的 dao 层逻辑。

所以实际上,Spring Boot 并没有新鲜的东西,因此你看到市面上大部分讲 spring boot 的书,这些书我几乎都看过,基本上都是讲解 Spring Boot 的应用,以及 Spring Boot 的一些特性分析。因为一旦你想讲 Spring Boot 的原理,就必然会回归到 Spring 这块的内容上。比如《Spring Boot 编程思想》这本书,大篇幅的都是在讲 Spring Framework。因为 Spring Boot 的内核还是 Spring Framework。

Spring Boot 与微服务

接下来,给大家讲讲 spring boot 与微服务这块的内容。

什么是 Spring Cloud

首先,我们要简单了解一下什么是微服务,按照我的理解来说,微服务就是微粒度的服务,它是面向服务架构(SOA)的进一步优化。如果大家不是很好理解,翻译成白话就是

一个业务系统,原本是在一个独立的 war 包中。现在为了更好的维护和提高性能,把这个 war 包按照业务纬度拆分成了一个个独立的业务子系统,每个子系统提供该业务领域相关的功能,并暴露 API 接口。

这些服务彼此之间进行数据交换和通信来实现整个产品的功能。

而这些业务子系统,实际上代表的就是一个服务,那么所谓的微服务,说的是这个服务的粒度。至于服务的粒度什么样才叫微,其实没有一个固定的衡量标准。更多的还是在每个公司具体的业务粒度的把控上。

微服务化遇到的问题

在为服务化之后,会面临很多的问题,比如服务注册、服务路由、负载均衡、服务监控等等。这些问题都需要有相应的技术来解决,这个时候,Spring Cloud 就出现了。

简单来说,Spring Cloud 提供了一些可以让开发者快速构建微服务应用的工具,比如配置管理、服务发现、熔断、智能路由等,这些服务可以在任何分布式环境下很好地工作。Spring Cloud 主要 致力于解决如下问题:

  • Distributed/versioned configuration,分布式及版本化配置。
  • Service registration and discovery,服务注册与发现。
  • Routing,服务路由。
  • Service-to-service calls,服务调用。
  • Load balancing,负载均衡。
  • Circuit Breakers,断路器。
  • Global locks,全局锁。
  • Leadership election and cluster state,Leader 选举及集群状态。
  • Distributed messaging,分布式消息。

需要注意的是,Spring Cloud 并不是 Spring 团队全新研发的框架,它只是把一些比较优秀的解决微服务架构中常见问题的开源框架基于 Spring Cloud 规范进行了整合,通过 Spring Boot 这个 框架进行再次封装后屏蔽掉了复杂的配置,给开发者提供良好的开箱即用的微服务开发体验。不难看出,Spring Cloud 其实就是一套规范,而 Spring Cloud Netflix、Spring Cloud Consul、Spring CloudAlibaba 才是 Spring Cloud 规范的实现。

为什么 Spring Cloud 是基于 Spring Boot

那为什么 Spring Cloud 会采用 Spring Boot 来作为基础框架呢?原因很简单

  1. Spring Cloud 它是关注服务治理领域的解决方案,而服务治理是依托于服务架构之上,所以它仍然需要一个承载框架
  2. Spring Boot 可以简单认为它是一套快速配置 Spring 应用的脚手架,它可以快速开发单个微服务

在微服务架构下,微服务节点越来越多,需要一套成熟高效的脚手架,而 Spring Boot 正好可以满足这样的需求,如下图所示。

​

Spring Boot 的四大核心机制

如果一定要基于 Spring Boot 的特性去说,那么只能去说 Spring Boot 的四大核心机制,分别是 @EnableAutoConfiguration 、 Starter 开箱即用组件、Actuator 应用监控、Spring Boot CLI 命令行工具。

EnableAutoConfiguration

Starter

告诉 Spring Boot 需要什么功能,它就能引入需要的库。

Actuator

让你能够深入运行中的 Spring Boot 应用程序

Spring Boot CLI

Spring Boot CLI 为 Spring Cloud 提供了 Spring Boot 命令行功能。我们可以通过编写 groovy 脚本来运行 Spring Cloud 组件应用程序。步骤如下

  • 下载 spring-boot-cli
  • Spring Boot CLI:repo.spring.io/release/org…
  • 配置环境变量
  • 在控制台spring --version查看 CLI 版本
  • 使用 CLI 运行应用。我们可以使用 run 命令编译和运行 Groovy 源代码。Spring Boot CLI 中包含所有运行 Groovy 所需要的依赖。
  • 创建一个hello.groovy文件
1
kotlin复制代码  @RestController  class HelloController {        @GetMapping("/hello")      String hello(){          return "Hello World";      }  }

在控制台执行spring run hello.groovy,如果需要传递参数,比如端口,和 JVM 参数类似

1
ini复制代码  spring run hello.groovy -- --server.port=9000

Spring Boot 的四大核心特性

  • EnableAutoConfiguration
  • Starter
  • Actuator
  • Spring Boot CLI
  • Spring Boot CLI 为 Spring Cloud 提供了 Spring Boot 命令行功能。我们可以通过编写 groovy 脚本来运行 Spring Cloud 组件应用程序。步骤如下、
  • 下载 spring-boot-cli
  • Spring Boot CLI:repo.spring.io/release/org…
  • 配置环境变量
  • 在控制台spring --version查看 CLI 版本
  • 使用 CLI 运行应用。我们可以使用 run 命令编译和运行 Groovy 源代码。Spring Boot CLI 中包含所有运行 Groovy 所需要的依赖。
  • 创建一个hello.groovy文件
1
kotlin复制代码    @RestController    class HelloController {            @GetMapping("/hello")        String hello(){            return "Hello World";        }    }

在控制台执行spring run hello.groovy,如果需要传递参数,比如端口,和 JVM 参数类似

1
ini复制代码    spring run hello.groovy -- --server.port=9000

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

微信ClickHouse实时数仓的最佳实践 一、微信遇到的挑

发表于 2021-11-26

作者:微信WeOLAP团队&腾讯云数据仓库Clickhouse团队

微信作为一款国民级应用,已经覆盖了社交、支付、出行等人们生活的方方面面。海量多样化的业务形态,对数据分析提出了新的挑战。为了满足业务数据分析的需求,微信WeOLAP团队联手腾讯云,共建千台规模、数据PB级、批流一体的ClickHouse数据仓库,实现了10倍以上的性能提升。下文将由浅入深,为大家揭晓微信在ClickHouse实时数仓实践中积累的经验及方法。

一、微信遇到的挑战

一般来说,微信主要的数据分析场景包含以下几个方面:

1.科学探索:服务于数据科学家,通过即席查询做业务上的归因推断;

2.看板:服务于运营和管理层,展示所关注的核心指标;

3.A/B 实验平台:服务于算法工程师,把新的模型,放在A/B实验平台上做假设检验,看模型是否符合预期。

除此以外,还有实时监控、日志系统明细查询等场景。

图片

在所有的场景当中,使用者都有非常重要的诉求——快:希望查询响应更快,指标开发更快完成,看板更新更及时。与此同时,微信面临的是海量的数据,业务场景中“单表日增万亿”很常见,这就对下一代“数据分析系统”提出新的挑战。

在使用ClickHouse之前,微信使用的是Hadoop生态为主的数仓,存在以下这些问题:

1.响应慢,基本上是分钟级,可能到小时,导致决策过程长;

2.开发慢,由于传统的数仓理念的多层架构,使得更新一个指标的成本很高;

3.架构臃肿,在微信业务体量规模的数据下,传统架构很难做到流批一体。进而导致,代码需要写多套、数据结果难以对齐、存储冗余。经过十几年的发展之后,传统的Hadoop生态的架构变得非常臃肿,维护难度和成本都很大。

所以,微信一直在寻求更轻量、简单敏捷的方案来解决这些问题。经过一番调研,在百花齐放的OLAP产品中,最终选定了ClickHouse作为微信OLAP的主要核心引擎。主要有两个原因:

1.效率:在真实数据的实验场景下,ClickHouse要比Hadoop生态快10倍以上(2020年底测试);

2.开源:微信的A/B实验、线上特征等场景会有些个性化需求,需要对引擎内核做较多改动;

因此,微信尝试在OLAP场景下,构建基于ClickHouse计算存储为核心的“批流一体”数仓。

但是,使用原生的ClickHouse,在真正放量阶段出现了很多问题:

1.稳定性:ClickHouse的原始稳定性并不好,比如说:在高频写入的场景下经常会出现too many part等问题,整个集群被一个慢查询拖死,节点OOM、DDL请求卡死都比较常见。另外,由于ClickHouse原始设计缺陷,随数据增长的依赖的zookeeper瓶颈一直存在,无法很好解决;微信后期进行多次内核改动,才使得它在海量数据下逐步稳定下来,部分issue也贡献给了社区。

2.使用门槛较高:会用ClickHouse的,跟不会用ClickHouse的,其搭建的系统业务性能可能要差3倍甚至10倍,有些场景更需要针对性对内核优化。

二、微信和腾讯云数据仓库共建

此时,腾讯云数据仓库Clickhouse团队积极深入业务,主动与微信团队合作,双方开始共同解决上述问题。腾讯云数据仓库Clickhouse提供全托管一站式的全面服务,使得微信团队不需要过多关注稳定性问题。另外,双方团队积累了丰富查询优化经验,共享经验更有利于Clickhouse性能极致提升。

图片

微信跟腾讯云数据仓库Clickhouse的合作,从今年3月份开始,在验证期小规模试用ClickHouse后,业务一直在快速增长,双方开始共建进行稳定性和性能上的优化。主要做了两件事:一个是建立了整个ClickHouse OLAP的生态,另外一个是做了的探索出贴近业务的查询优化方法。

图片

三、共建 ClickHouse OLAP 的生态

要想比较好地解决ClickHouse易用性和稳定性,需要生态支撑,整体的生态方案有以下几个重要的部分:

1.QueryServer:数据网关,负责智能缓存,大查询拦截,限流;

2.Sinker:离线/在线高性能接入层,负责削峰、hash路由,流量优先级,写入控频;

3.OP-Manager:负责集群管理、数据均衡,容灾切换、数据迁移;

4.Monitor:负责监控报警,亚健康检测,查询健康度分析,可与Manager联动;

图片

微信WeOLAPt团队和腾讯云重点在以下方面进行了合作攻坚:

1.高性能接入:微信的吞吐达到了十亿级别,实时接入方面,通过令牌、反压的方案,比较好地解决了流量洪峰的问题。另外通过Hash路由接入,使数据落地了之后可直接做Join,无需shuffle实现的更快Join查询,在接入上也实现了精确一次。离线同步方案上,微信跟大多数业界的做法基本上一致,在通过预构Merge成建成Part,再送到线上的服务节点,这其实是种读写分离的思想,更便于满足高一致性、高吞吐的场景要求。

图片

2.极致的查询优化:ClickHouse整个的设计哲学,要求在特定的场景下,采用特定的语法,才能得到最极致的性能。为解决ClickHouse 使用门槛高的问题,微信把相应的优化经验落地到内部BI平台上,沉淀到平台后,使得小白用户都可以方便使用ClickHouse。通过一系列优化手段,在直播、视频号等多个Case实现10倍以上性能提升。

图片

基于共建的ClickHouse生态,在微信有以下的典型应用场景:

1.BI 分析/看板:由于科学探索是随机的,很难通过预构建的方式来解决,之前用Hadoop的生态只能实现小时到分钟的级别。目前ClickHouse优化完之后,在单表万亿的数据量下,大多数的查询,P95在5秒以内。数据科学家现在想做一个验证,非常快就可以实现。

图片

2.A/B 实验平台:早期做A/B实验的时候,前一天晚上要把所有的实验统计结果,预先聚合好,第二天才能查询实验结果。在单表数据量级千亿/天、大表实时Join的场景下,微信前后经历了几个方案,实现了近50倍的性能提升。从离线到实时分析的飞跃,使得P95响应<3S,A/B实验结论更加准确,实验周期更短,模型验证更快。

图片

3.实时特征计算:虽然大家普遍认为ClickHouse不太擅长解决实时相关的问题,但最终通过优化,可以做到扫描量数十亿,全链路时延<3秒,P95响应近1秒。

图片

四、性能的显著提升

目前,微信当前规模千台,数据量PB级,每天的查询量上百万,单集群TPS达到了亿级,而查询耗时均值仅需秒级返回。ClickHouse OLAP的生态相对于之前的Hadoop生态,性能提升了10倍以上,通过流批一体提供更稳定可靠的服务,使得业务决策更迅速,实验结论更准确。

五、共建存算分离的云原生数仓

ClickHouse原始的设计和Shard-Nothing的架构,无法很好地实现秒级伸缩与Join的场景;因此下一个微信和腾讯云数据仓库ClickHouse的共建目标,是实现存算分离的云原生数仓:

1.弹性扩容:秒级弹性能力,用户只为使用付费,实现高峰查询更快,低峰成本更省;

2.稳定性:无 ZK 瓶颈,读写易分离,异地容灾;

3.易运维:数据容易均衡,存储无状态;

4.功能全:专注于查询优化与Cache策略、支持高效多表Join;

存算分离的云原生数仓能力,明年将会在腾讯云官网上线,敬请期待!

图片

本文章由微信技术架构部-WeOLAP团队出品,「WeOLAP」专注于用前沿大数据技术解决微信海量数据高性能查询问题。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…177178179…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%