开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Freemarker 教程(一)-模板开发手册

发表于 2021-02-04

本文是Freemarker系列的第一篇,面向模板开发人员,主要介绍 FreeMarker 所使用的 FTL(FreeMarker Template Language) 语法,了解 Freemarker 的基本概念,介绍基本的 FTL 术语 及内置函数,内置指令,方便作为开发手册速查(文中演示所用版本为 2.3.30,实际使用中请根据自己项目版本自查官网)。

本文不会罗列官网API,只在必要时演示其语法,代码工程中有课代表整理的 freemarker api 思维导图,配合此文食用可使功力大增!请到 课代表的 github自取。

1.FreeMarker 是什么

Freemarker是一款纯 Java编写的模板引擎软件,可以用来生成各种文本,包括但不限于:HTML,E-Mail以及各种源代码等等。

它的主要任务就是:把模板和数据组装在一起,生成文档,这个过程又叫渲染(Render)。流程如图:

freemarker-overview.png

由于大部分模板开发人员都是用它来生成HTML页面,所以本文将基于 SpringBoot(2.4.1)+Freemarker(2.3.30)+SpringWeb演示 HTML 页面的渲染

2.最简单的模板

假设我想要一个简单页面用来欢迎当前用户,模板代码:

1
2
3
4
5
6
7
8
html复制代码<html>
<head>
<title>index</title>
</head>
<body>
<p>你好,${userName}</p>
</body>
</html>

${userName}是 FTL 的插值语法,他会把userName的值替换到生成的 HTML 中,从而根据当前登录者,显示不同的用户名,这个值由后端代码放到Model中,对应的 Controlelr 代码:

1
2
3
4
5
6
7
8
9
java复制代码@Controller
public class HelloWorld {
@GetMapping("hello")
public String hello(Model model) {
model.addAttribute("userName","Java 课代表");
// 返回模板名称
return "index";
}
}

访问页面:
freemarker-hello.png

数据由后端代码通过数据模型(Model)传递,模板只关心数据如何展示(View),二者的关联关系由 Controller 来控制,这就是 MVC。

3.数据模型(data-model)

Controller中添加到 model 中的数据是如何组织的呢?这就需要了解一下FTL的数据模型(data-model)。

FTL 的数据模型在结构上是一个树形:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
json复制代码(root)
|
+- animals
| |
| +- mouse
| | |
| | +- size = "small"
| | |
| | +- price = 50
| |
| +- elephant
| | |
| | +- size = "large"
| | |
| | +- price = 5000
| |
| +- python
| |
| +- size = "medium"
| |
| +- price = 4999
|
+- message = "It is a test"
|
+- misc
|
+- foo = "Something"

其中的root 可以理解为 Controller 中的 model ,通过 model.addAttribute("userName","Java 课代表");就可以往数据模型中添加数据。

数据模型中可以像目录一样展开的变量,如:root, animals, mouse, elephant, python, misc称之为哈希(hash)。哈希的 key 就是变量名,value 就是变量存储的值,通过.分隔的路径可以访问变量值,比如访问 mouse 的 price :animals.mouse.price.

像animals.mouse.price这样存储单个值的变量叫做标量(scalar),标量有四种具体类型:string,boolean,date-like,number;

还有一种变量叫做序列(sequence),可以类比为 Java 中的数组,序列中的每个项没有名字,可以通过遍历,或者下标的方式访问(后面会演示序列的访问),它的数据结构看起来是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码(root)
|
+- animals
| |
| +- (1st)
| | |
| | +- name = "mouse"
| | |
| | +- size = "small"
| | |
| | +- price = 50
| |
| +- (2nd)
| | |
| | +- name = "elephant"
| | |
| | +- size = "large"
| | |
| | +- price = 5000
| |
| +- (3rd)
| |
| +- name = "python"
| |
| +- size = "medium"
| |
| +- price = 4999
|
+- misc
|
+- fruits
|
+- (1st) = "orange"
|
+- (2nd) = "banana"

FTL 里常用的数据类型就这三类:哈希(hashe),标量(scalar),序列(sequence)。

有了数据,还要有语法来组织这些数据,下面介绍 FTL 中的常用语法。

4.FTL 语法

FreeMarker 只认如下三种语法:

  1. 插值:${…} ,Freemarker 会将里面的变量替换为实际值
  2. FTL 标签(tags):结构上类似HTML的标签,都是用<>包裹起来,普通标签以<#开头,用户自定义标签以<@开头,如<#if true>true thing<#/if>,<@myDirect></@myDirect>

你会看到两种叫法,1:标签(tags),2:指令(directive)。举个例子:<#if></#if>叫标签; 标签里面的if是指令,可以类比于html中的标签(如:<table></table>)和元素(如:table)。不过,把标签和指令认为是同义词也没有问题。

  1. 注释(Comments):FTL 中的注释是这样的:<#-- 被注释掉的内容 -->,对于注释,FTL会自动跳过,所以不会显示在生成的文本中(这点有别于 HTML 的注释)

注意:除以上三种语法之外的所有内容,皆被 FreeMarker 视为普通文本,普通文本会被原样输出

插值就是单纯的替换变量的值,注释更没啥好说的,下面主要介绍几个最常用的 FTL 标签(指令)并结合代码演示其用法。

if 指令

if 可以根据条件跳过模板中的某块代码,以前文为例,当 userName 值为 “Java课代表” 或zhengxl5566时,用特殊样式展示,相关模板代码如下:

1
2
3
4
5
6
7
8
9
html复制代码<p>你好,
<#if userName == "Java 课代表">
<strong>${userName}</strong>
<#elseif userName == "zhengxl5566">
<h1>${userName}</h1>
<#else>
${userName}
</#if>
</p>

list 指令

list用来遍历序列,其语法为:

1
2
3
html复制代码<#list sequence as loopVariable>
repeatThis
</#list>

比如后台往 model 里放入一个 allUsers 的集合

1
java复制代码model.addAttribute("allUsers",userService.getAllUser());

可以直接使用下标访问集合中的某个元素:${allUsers[0].name}

也可以在模板中直接遍历展示:

1
2
3
4
5
6
7
html复制代码<ol>
<#list allUsers as user>
<li>
姓名:${user.name},年龄:${user.age}
</li>
</#list>
</ol>

实际渲染出来的 HTML:

1
2
3
4
5
6
7
8
9
10
11
html复制代码<ol>
<li>
姓名:zxl,年龄:18
</li>
<li>
姓名:ls,年龄:19
</li>
<li>
姓名:zs,年龄:16
</li>
</ol>

注意:假设 allUsers 是空的,渲染出来的页面会是<ol></ol>,如果需要规避这个情况,可以使用 items 标签

1
2
3
4
5
6
7
8
9
html复制代码<#list allUsers>
<ol>
<#items as user>
<li>
姓名:${user.name},年龄:${user.age}
</li>
</#items>
</ol>
</#list>

此时,假设 allUsers是空的,list 标签中的 html 内容就不会被渲染出来。

include 指令

include 指令可以把一个模板的内容插入到另一个模板中(官方建议使用 import 代替,参见下文的最佳实践)。
假设我们每个页面都需要一个 footer,可以写一个公共的footer.ftlh模板,其余需要footer的页面只需要引用footer.ftlh模板即可:

1
html复制代码<#include "footer.ftlh">

import 指令

import 可以将模板中定义的变量引入当前模板,并在当前模板中使用。它和 include 的主要区别就是 import 可以将变量封装到新的命名空间中(后文会介绍 import 和 include 的对比)。

例如:模板 /libs/commons.ftl 里面写了很多公共方法,想在其他模板里引用,只需要在其他模板的开头写上:

1
javascript复制代码<#import "/libs/commons.ftl" as com>

后续想使用/libs/commons.ftl 中的 copyright 方法,可以直接使用:

1
html复制代码<@com.copyright date="1999-2002"/>

assign 指令

assign 可以用来创建新的变量并为其赋值,语法如下:

1
2
3
4
5
6
7
8
9
10
11
html复制代码<#assign name1=value1 name2=value2 ... nameN=valueN>
or
<#assign name1=value1 name2=value2 ... nameN=valueN in namespacehash>
or
<#assign name>
capture this
</#assign>
or
<#assign name in namespacehash>
capture this
</#assign>

举例:

1
2
3
4
html复制代码<#--创建字符串-->
<#assign myStr = "Java 课代表">
<#--使用插值语法显示字符串-->
myStr:${myStr}

macro 指令

macro 用来从模板上创建用户自定义指令(Java后端可以通过实现TemplateDirectiveModel接口自定义指令,将在下一篇:《Freemarker 教程(二)-后端开发指南》中介绍)

macro 创建的也是变量,该变量可以做为用户自定义指令使用,比如下面的模板定义了 greet指令:

1
2
3
html复制代码<#macro greet>
<h1>hello 课代表</h1>
</#macro>

使用 greet 指令

1
2
3
html复制代码<@greet></@greet>
或者
<@greet/>

指令还可以附带参数:

1
2
3
html复制代码<#macro greet person>
<h1>hello ${person}</h1>
</#macro>

使用时传入 person 变量:

1
html复制代码<@greet person="Java课代表"/> and <@greet person="zhengxl5566"/>

5.内置函数(Built-ins)

所谓内置函数,就是 FreeMarker 针对不同数 据类型为我们提供的一些内置方法,有了这些方法,可以让数据在模板中的展示更加方便。使用内置函数时,只需要在变量后面使用?加相应函数名即可。篇幅有限,这里不打算罗列所有内置函数,只挑几个简单例子展示其语法。

例子一:字符串的内置函数

1
2
3
4
5
6
7
8
html复制代码<#--创建字符串-->
<#assign myStr = "Java 课代表">
<#--首字母小写-->
${myStr?uncap_first}
<#--保留指定字符后面的字符串-->
${myStr?keep_after("Java")}
<#--替换指定字符-->
${myStr?replace("Java","Freemarker")}

例子二:时间类型内置函数

1
2
3
4
5
6
7
8
9
10
html复制代码<#--获取当前时间(如果是后端将时间传入data-model,只需要传Date类型即可)-->
<#assign currentDateTime = .now>
<#--展示日期部分-->
${currentDateTime?date}<br>
<#--展示时间部分-->
${currentDateTime?time}<br>
<#--展示日期和时间部分-->
${currentDateTime?datetime}<br>
<#--按指定格式展示时间日期-->
${currentDateTime?string("yyyy-MM-dd HH:mm a")}<br>

例子三:序列的内置函数

1
2
3
4
5
6
7
8
html复制代码<#--序列类型内置函数样例-->
<#assign mySequence = ["Java 课代表","张三","李四","王五"]>
<#--将所有元素以指定分隔符分割,输出字符串-->
${mySequence?join(",")}<br>
<#--取序列中的第一个元素-->
${mySequence?first}<br>
<#--将序列排序后使用逗号分割输出为字符串-->
${mySequence?sort?join(",")}<br>

通过以上三个例子的简单演示,相信你已经能掌握内置函数的使用技巧了,就是在变量后面用 ?加变量数据类型所支持的函数。FTL 的内置函数极其丰富,官网按数据类型详细罗列了各自支持的内置函数及其用法,可自行查看官网的内置函数参考。

为了方便大家快速查阅相关内置函数(built-ins)和指令(directives),课代表从官网翻译,并使用 xmind 做了个思维导图,每个函数(指令)都可以点进去查看功能描述和样例,可以极大提高模板开发效率:

freemarker-xmind.png

原始 xmind 文件放在 课代表的github上,读者可以按需自取。

课代表划重点!这个思维导图是全文精华,一定要下载下来看看!

6.命名空间(Namespaces)

所谓命名空间,就是在同一个模板里,所有使用 assign,macro,function 指令所创建的变量集合,它的主要作用就是唯一标识一个变量。

有两种方式可以创建命名空间:

1、同一个模板中的变量在同一个命名空间中。

以如下的index.ftlh为例,这里面创建的变量都在一个命名空间下,同名变量的值会相符覆盖

1
2
3
4
html复制代码<#assign myName = "Java 课代表">
<#assign myName = "课代表">
<#--实际输出的是“课代表”-->
${myName}

2、不同模板中的变量可以通过 import 指令来区分不同的命名空间变量

模板A中想使用模板B中的变量,可以使用 import 指令,给引入的模板定义一个新的命名空间,通过 as 后面指定的 key 访问该新命名空间。

比如模板 lib/example.ftlh中定义了copyright:

1
2
3
4
5
html复制代码<#macro copyright date>
<p>Copyright (C) ${date} Someone. All rights reserved.</p>
</#macro>

<#assign mail = "user@example.com">

在另一个模板index.ftlh中使用copyright:

1
2
3
html复制代码<#import "lib/example.ftlh" as e>
<@e.copyright date="1999-2002"/>
${e.mail}

命名空间的生命周期(The life-cycle of namespaces)

命名空间由 import 指令中的 path 确定(绝对路径),如果相同的 path 引入了多次,只有第一次调用 import 的时候才会触发相应命名空间的创建。后面对于相同模板路径的 import,指代的都是同一个命名空间,举例:

1
2
3
4
5
6
html复制代码<#import "/lib/example.ftl" as e>
<#import "/lib/example.ftl" as e2>
<#import "/lib/example.ftl" as e3>
${e.mail}, ${e2.mail}, ${e3.mail}
<#assign mail="other@example.com" in e>
${e.mail}, ${e2.mail}, ${e3.mail}

将/lib/example.ftl import 后赋给 e,e2,e3三个命名空间,修改e中的 mail, 对e2,e3同样生效

输出:

1
2
html复制代码user@example.com, user@example.com, user@example.com
other@example.com, other@example.com, other@example.com

import 和 include 的区别

<#import "lib/example.ftlh" as e> 会创建一个全新的命名空间,并把lib/example.ftlh中定义的变量封装到新命名空间中,提供访问,如:<@e.copyright date="1999-2002"/>。

<#include "lib/example.ftlh"> 只是单纯把example.ftlh的内容插入到当前模板,并不会对命名空间产生影响

Freemarker官方建议:

所有使用 include 的地方都应该被 import 替代

使用 import 好处如下:

  • import 引入的模板只会被执行一次,重复引用多次并不会重复执行模板。而对于 include 而言,每次 include 都会执行一次模板内容;
  • import 可以创建该模板的命名空间,引用变量时可以清晰表达出变量来源,减少命名冲突概率;
  • 如果定义了很多通用方法,可以将auto-import配置为懒加载,用到哪个加载哪个。而auto-include无法实现懒加载,必须全量加载;
  • import 指令不会有任何输出,而 include 可能会根据模板内容输出相应字符。

7.最佳实践

1.空值处理

变量空值的处理

对于不存在的变量和值为 null 的变量,freemarker 统一认为是不存在的值,对其调用将会报错。为了避免这种情况,可以设置默认值,示例:<h1>Welcome ${user!"visitor"}!</h1>,当user不存在时,将显示visitor字符串。

另一种方式是在变量后面使用 ??表达式,如:user??如果user存在,则返回 true,否则返回 false,示例:<#if user??><h1>Welcome ${user}!</h1></#if>,当user不存在时,就不展示欢迎标语了。

list中的空值处理

遍历序列的时候,假设序列中有空值,freemarker并不会直接报错或显示空值,而是像更上一级作用域去搜索同名变量值,这种行为可能会导致错误的输出,举例:

1
2
3
4
html复制代码<#assign x = 20>
<#list xs as x>
${x!'Missing'}
</#list>

本意是当遇到序列 xs 中的空元素是显示“missing” 字符串,但由于 freemarker 向上查找的特性, 这里的空值将会显示为 20。要关闭这个特性,可以在服务端配置:configuration.setFallbackOnNullLoopVariable(false);

2.使用 import 代替 include

前文在介绍到 include 的时候提到过,官方建议应该将所有用到 include 的地方都用 import 实现

我们平时用到 include 指令,主要就是用来把一段内容插入到当前模板,那如何用 import 实现 include 的功能呢?

很简单,把需要插入的内容封装成 自定义指令就好了。

比如我们从common.ftlh 里定义一个自定义指令 myFooter

1
2
3
4
html复制代码<#macro myFooter>
<hr>
<p>这里是 footer</p>
</#macro>

在需要使用的地方,引入 common.ftlh 并调用 myFooter 指令

1
2
3
4
html复制代码<#import "lib/common.ftlh" as common>
<#--将include使用import代替-->
<#--<#include "footer.ftlh">-->
<@common.myFooter/>

3.隔行变色

数据展示的时候经常遇到使用表格展示数据的情况,为了增加辨识度,一般会让奇数行和偶数行颜色不同以区分,这就是隔行变色

下面以遍历序列为例:

1
2
3
4
html复制代码<#assign mySequence = ["Java 课代表","张三","李四","王五"]>
<#list mySequence as name>
<span style="color: ${name?item_cycle("red","blue")}"> ${name}<br/> </span>
</#list>

这里的 item_cycle 是循环变量的内置函数,至于他的详细用法,再次推荐你去看一下课代表整理的思维导图。

8.总结

本文介绍了 Freemarker 的基本概念和基础语法,意在让刚接触的萌新能对 Freemarker 有个全局性认识,了解Freemarker的数据模型、内置函数、指令。只要能区分这几个概念,实际开发中现用现查即可,不要一开始就迷失在海量的API中。

总体来说,Freemarker 是一个比较简单,容易上手的模板引擎,只要掌握了本文所提及的基本概念,直接上手开发是完全没问题的。

在本文写作过程中,课代表意识到纯文字表达的局限性,又不能全文罗列和翻译API,于是整理了一个思维导图,将Freemarker 的各类指令,内置函数及其官网示例全都翻译整理了进去。在平时开发过程中极大提升了开发效率,需要的同学请到 课代表的 github自取。

最后附上思维导图镇楼:

freemarker-api-xmind.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

深入理解nodejs的HTTP处理流程 简介 使用nodej

发表于 2021-02-03

简介

我们已经知道如何使用nodejs搭建一个HTTP服务,今天我们会详细的介绍nodejs中的HTTP处理流程,从而对nodejs的HTTP进行深入的理解。

使用nodejs创建HTTP服务

使用nodejs创建HTTP服务很简单,nodejs提供了专门的HTTP模块,我们可以使用其中的createServer方法来轻松创建HTTP服务:

1
2
3
4
5
js复制代码const http = require('http');

const server = http.createServer((request, response) => {
// magic happens here!
});

首先createServer方法传入的是一个callback函数,这个callback函数将会在每次服务端接收到客户端的请求时调用。所以这个callback函数,也叫做 request handler.

再看看createServer的返回值,createServer返回的是一个EventEmitter对象。

之前我们也介绍过了EventEmitter,它可以发送和接收事件,所以我们可以使用on来监听客户端的事件。

上面的代码相当于:

1
2
3
4
js复制代码const server = http.createServer();
server.on('request', (request, response) => {
// the same kind of magic happens here!
});

当发送request事件的时候,就会触发后面的handler method,并传入request和response参数。我们可以在这个handler中编写业务逻辑。

当然,为了让http server正常运行,我们还需要加上listen方法,来绑定ip和端口,以最终启动服务。

1
2
3
4
5
6
js复制代码const hostname = '127.0.0.1'
const port = 3000

server.listen(port, hostname, () => {
console.log(`please visit http://${hostname}:${port}/`)
})

解构request

上面的request参数实际上是一个http.IncomingMessage对象,我们看下这个对象的定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
js复制代码    class IncomingMessage extends stream.Readable {
constructor(socket: Socket);

aborted: boolean;
httpVersion: string;
httpVersionMajor: number;
httpVersionMinor: number;
complete: boolean;
/**
* @deprecate Use `socket` instead.
*/
connection: Socket;
socket: Socket;
headers: IncomingHttpHeaders;
rawHeaders: string[];
trailers: NodeJS.Dict<string>;
rawTrailers: string[];
setTimeout(msecs: number, callback?: () => void): this;
/**
* Only valid for request obtained from http.Server.
*/
method?: string;
/**
* Only valid for request obtained from http.Server.
*/
url?: string;
/**
* Only valid for response obtained from http.ClientRequest.
*/
statusCode?: number;
/**
* Only valid for response obtained from http.ClientRequest.
*/
statusMessage?: string;
destroy(error?: Error): void;
}

通常我们需要用到request中的method,url和headers属性。

怎么从request中拿到这些属性呢?对的,我们可以使用ES6中解构赋值:

1
2
3
4
js复制代码const { method, url } = request;

const { headers } = request;
const userAgent = headers['user-agent'];

其中request的headers是一个IncomingHttpHeaders,它继承自NodeJS.Dict。

处理Request Body

从源码可以看出request是一个Stream对象,对于stream对象来说,我们如果想要获取其请求body的话,就不像获取静态的method和url那么简单了。

我们通过监听Request的data和end事件来处理body。

1
2
3
4
5
6
7
js复制代码let body = [];
request.on('data', (chunk) => {
body.push(chunk);
}).on('end', () => {
body = Buffer.concat(body).toString();
// at this point, `body` has the entire request body stored in it as a string
});

因为每次data事件,接收到的chunk实际上是一个Buffer对象。我们将这些buffer对象保存起来,最后使用Buffer.concat来对其进行合并,最终得到最后的结果。

直接使用nodejs来处理body看起来有点复杂,幸运的是大部分的nodejs web框架,比如koa和express都简化了body的处理。

处理异常

异常处理是通过监听request的error事件来实现的。

如果你在程序中并没有捕获error的处理事件,那么error将会抛出并终止你的nodejs程序,所以我们一定要捕获这个error事件。

1
2
3
4
js复制代码request.on('error', (err) => {
// This prints the error message and stack trace to `stderr`.
console.error(err.stack);
});

解构response

response是一个http.ServerResponse类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
js复制代码    class ServerResponse extends OutgoingMessage {
statusCode: number;
statusMessage: string;

constructor(req: IncomingMessage);

assignSocket(socket: Socket): void;
detachSocket(socket: Socket): void;
// https://github.com/nodejs/node/blob/master/test/parallel/test-http-write-callbacks.js#L53
// no args in writeContinue callback
writeContinue(callback?: () => void): void;
writeHead(statusCode: number, reasonPhrase?: string, headers?: OutgoingHttpHeaders): this;
writeHead(statusCode: number, headers?: OutgoingHttpHeaders): this;
writeProcessing(): void;
}

对于response来说,我们主要关注的是statusCode:

1
js复制代码response.statusCode = 404;

Response Headers:

response提供了setHeader方法来设置相应的header值。

1
2
js复制代码response.setHeader('Content-Type', 'application/json');
response.setHeader('X-Powered-By', 'bacon');

还有一个更加直接的同时写入head和status code:

1
2
3
4
js复制代码response.writeHead(200, {
'Content-Type': 'application/json',
'X-Powered-By': 'bacon'
});

最后,我们需要写入response body,因为response是一个WritableStream,所以我们可以多次写入,最后以end方法结束:

1
2
3
4
5
6
js复制代码response.write('<html>');
response.write('<body>');
response.write('<h1>Hello, World!</h1>');
response.write('</body>');
response.write('</html>');
response.end();

或者我们可以用一个end来替换:

1
js复制代码response.end('<html><body><h1>Hello, World!</h1></body></html>');

综上,我们的代码是这样的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
js复制代码const http = require('http');

http.createServer((request, response) => {
const { headers, method, url } = request;
let body = [];
request.on('error', (err) => {
console.error(err);
}).on('data', (chunk) => {
body.push(chunk);
}).on('end', () => {
body = Buffer.concat(body).toString();
// BEGINNING OF NEW STUFF

response.on('error', (err) => {
console.error(err);
});

response.statusCode = 200;
response.setHeader('Content-Type', 'application/json');
// Note: the 2 lines above could be replaced with this next one:
// response.writeHead(200, {'Content-Type': 'application/json'})

const responseBody = { headers, method, url, body };

response.write(JSON.stringify(responseBody));
response.end();
// Note: the 2 lines above could be replaced with this next one:
// response.end(JSON.stringify(responseBody))

// END OF NEW STUFF
});
}).listen(8080);

本文作者:flydean程序那些事

本文链接:www.flydean.com/nodejs-http…

本文来源:flydean的博客

欢迎关注我的公众号:「程序那些事」最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

产品经理的流程总是变,所以我搬出了大杀器状态机模式

发表于 2021-02-03

大家好,今天给大家介绍一个新的设计模式,这个设计模式非常重要,在我们日常的开发工作当中经常使用。它就是大名鼎鼎的状态机模式。

状态机模式非常适合用在复杂的流程或者是系统当中,可以方便我们对系统的某一个状态进行抽象,这会让我们编码具有更强的可读性以及延展性。

有向图与DAG

首先和大家解释一下状态机当中这个状态的概念,这里的状态指的是我们系统或者是流程当中的某一个状态。我用我之前做过的一个活动系统来给大家举一个例子。

比如我们现在要在网上举办一些活动,然后吸引用户来参与。但是在用户来参与活动的过程当中其实有很多的状态需要判断,比如说我们首先要判断用户是否已经登录了。如果登录了,还需要判断用户之前是否报名过,如果已经报名了,还需要判断活动是否开始了等等。

那么,我们就可以抽象出很多的状态。比如是否登录、是否报名、未登录等等这些都是状态。这些状态之间可以通过一些条件进行转移,比如在初始状态当中,通过判断用户是否登录选择转移到未登录状态或者是报名判断的状态上。我们把这其中的逻辑抽象出来,可以得到这么一张有向无环图。

几乎所有的固定流程都可以抽象出这么一张图来,这种图一般被缩写成DAG(Directed Acyclic Graph)。如果不用状态机的话,那么我们需要编写大量的代码来进行判断。就拿上述的这个逻辑举例,我们需要至少4层if嵌套的逻辑判断来实现这么一个流程。

如果通过if判断来实现的话,那么面临的一个问题就是这个流程是固定的。如果临时需要改动,那么必须要修改代码,而我们知道不管大小公司,发布代码都是有严格的规范的,是不能随意发布的。而使用状态机主要解决的就是这个问题,可以把流程做成可配置的,如果需要临时修改,只需要修改状态机的对应配置即可,可以规避掉代码层面的修改。

状态与状态机

理解了DAG之后,我们再来看看状态机的定义和解释。

状态机的官方定义是:

The intent of the STATE pattern is to distribute state-specific logic across classes that represent an object’s state.
状态模式会将与状态有关的逻辑分布写在代表对象状态的类中

这句话英文读起来还是挺好理解的,中文相对更绕一些。简而言之,machine是一种抽象的概念,代表某一个流程或者是原理,并不是我们理解的机器。所以状态机也不是一个机器,它是由多个代表状态的类组合而成的流程或者说模式。也就是说我们会把DAG当中的每一个节点(状态)单独实现成一个类,那么整个DAG就是一系列状态类构成的图。

对于每个状态类而言,它们的操作应该都是类似的,就是初始化、执行以及转移。在一些系统当中,甚至可以没有执行只有转移。既然所有状态的操作都是类似的,那么我们可以对所有的状态抽象出统一的接口。这里我们多了一个is_end方法,代表某一个状态是否是整个流程的结束,如果是的话,我们就不需要继续转移了,直接退出即可。

1
2
3
4
5
6
7
8
9
10
11
12
python复制代码class State:
def __init__(states):
pass

def determine(param):
pass

def operate():
pass

def is_end():
pass

同样,我们可以实现状态机的类。

1
2
3
4
5
6
7
8
9
10
11
python复制代码class StateMachine:
def __init__():
self.node = StartState()

def init():
self.node = StartState()

def run(param):
while not self.node.is_end():
self.node = self.node.determine(param)
self.node.operate()

由于状态之间转移以及执行的逻辑都被封装在不同的类当中了,所以对于状态机而言,里面的逻辑非常简单,一般也不需要太大的修改。即使整个流程或者是某一个状态的条件发生变动, 我们也只需要修改对应节点的代码即可,并不会影响整体,非常适合用在那些流程经常发生变动的场景。

最后,我们来看一个状态机的使用案例。这个案例源于github,是一个将状态机应用在收音机上的case,具体的细节查看代码即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
python复制代码class State:

def scan(self):
# 模拟收音机的仪表盘,只能一个方向转动
self.pos += 1
if self.pos == len(self.stations):
self.pos = 0
print('Scanning... Station is {} {}'.format(self.stations[self.pos], self.name))


class AmState(State):
# Am 音频的类
def __init__(self, radio):
self.radio = radio
self.stations = ['1250', '1380', '1510']
self.pos = 0
self.name = 'AM'

def toggle_amfm(self):
# 转移到Fm
print('Switching to FM')
self.radio.state = self.radio.fmstate


class FmState(State):
# Fm 音频类
def __init__(self, radio):
self.radio = radio
self.stations = ['81.3', '89.1', '103.9']
self.pos = 0
self.name = 'FM'

def toggle_amfm(self):
# 转移到Am
print('Switching to AM')
self.radio.state = self.radio.amstate


class Radio:
# 收音机的整体类,也就是状态机类
def __init__(self):
self.amstate = AmState(self)
self.fmstate = FmState(self)
self.state = self.amstate

def toggle_amfm(self):
self.state.toggle_amfm()

def scan(self):
self.state.scan()


if __name__ == '__main__':
radio = Radio()
actions = [radio.scan] * 2 + [radio.toggle_amfm] + [radio.scan] * 2
actions *= 2
for action in actions:
action()

整个状态机的设计模式本身并不复杂,更多的是对这个设计理念和思想的理解,代码和形式都是表象。

好了,今天的文章就到这里,衷心祝愿大家每天都有所收获。如果还喜欢今天的内容的话,请来一个三连支持吧~(点赞、关注、转发)

原文链接,求个关注

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot 2x基础教程:使用JTA实现多数据

发表于 2021-02-03

在一个Spring Boot项目中,连接多个数据源还是比较常见的。之前也介绍了如何在几种常用框架的场景下配置多数据源,具体可见:

  • Spring Boot 2.x基础教程:JdbcTemplate的多数据源配置
  • Spring Boot 2.x基础教程:Spring Data JPA的多数据源配置
  • Spring Boot 2.x基础教程:MyBatis的多数据源配置

当我们采用多数据源的时候,同时也会出现一个这样的特殊场景:我们希望对A数据源的更新和B数据源的更新具备事务性。这样的例子很常见,比如:在订单库中创建一条订单记录,同时还需要在商品库中扣减商品库存。如果库存扣减失败,那么我们希望订单创建也能够回滚。

如果这两条数据在一个数据库中,那么通过之前介绍的事务管理就能轻松解决了。但是,当这两个操作位于不同的数据库中,那么就无法实现了。

本文就来介绍一种解决这类问题的方法:JTA事务。

什么是JTA

JTA,全称:Java Transaction API。JTA事务比JDBC事务更强大。一个JTA事务可以有多个参与者,而一个JDBC事务则被限定在一个单一的数据库连接。所以,当我们在同时操作多个数据库的时候,使用JTA事务就可以弥补JDBC事务的不足。

在Spring Boot 2.x中,整合了这两个JTA的实现:

  • Atomikos:可以通过引入spring-boot-starter-jta-atomikos依赖来使用
  • Bitronix:可以通过引入spring-boot-starter-jta-bitronix依赖来使用

由于Bitronix自Spring Boot 2.3.0开始不推荐使用,所以在下面的动手环节中,我们将使用Atomikos作为例子来介绍JTA的使用。

动手试试

下面我们就来实操一下,如何在Spring Boot中使用JTA来实现多数据源下的事务管理。

准备工作

  1. 这里我们将使用最基础的JdbcTemplate来实现数据访问,所以如果你还不会使用JdbcTemplate配置多数据源,建议先看一下JdbcTemplate的多数据源配置。
  2. 场景设定:
  • 假设我们有两个库,分别为:test1和test2
  • 这两个库中都有一张User表,我们希望这两张表中的数据是一致的
  • 假设这两张表中都已经有一条数据:name=aaa,age=30;因为这两张表中数据是一致的,所以要update的时候,就必须两个库中的User表更新时候,要么都成功,要么都失败。

本文首发:blog.didispace.com/spring-boot… ,后期修改更新主要以原文为主。

操作详细

  1. 在pom.xml中加入JTA的实现Atomikos的Starter
1
2
3
4
xml复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
  1. 在application.properties配置文件中配置两个test1和test2数据源
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
properties复制代码spring.jta.enabled=true

spring.jta.atomikos.datasource.primary.xa-properties.url=jdbc:mysql://localhost:3306/test1
spring.jta.atomikos.datasource.primary.xa-properties.user=root
spring.jta.atomikos.datasource.primary.xa-properties.password=12345678
spring.jta.atomikos.datasource.primary.xa-data-source-class-name=com.mysql.cj.jdbc.MysqlXADataSource
spring.jta.atomikos.datasource.primary.unique-resource-name=test1
spring.jta.atomikos.datasource.primary.max-pool-size=25
spring.jta.atomikos.datasource.primary.min-pool-size=3
spring.jta.atomikos.datasource.primary.max-lifetime=20000
spring.jta.atomikos.datasource.primary.borrow-connection-timeout=10000

spring.jta.atomikos.datasource.secondary.xa-properties.url=jdbc:mysql://localhost:3306/test2
spring.jta.atomikos.datasource.secondary.xa-properties.user=root
spring.jta.atomikos.datasource.secondary.xa-properties.password=12345678
spring.jta.atomikos.datasource.secondary.xa-data-source-class-name=com.mysql.cj.jdbc.MysqlXADataSource
spring.jta.atomikos.datasource.secondary.unique-resource-name=test2
spring.jta.atomikos.datasource.secondary.max-pool-size=25
spring.jta.atomikos.datasource.secondary.min-pool-size=3
spring.jta.atomikos.datasource.secondary.max-lifetime=20000
spring.jta.atomikos.datasource.secondary.borrow-connection-timeout=10000
  1. 创建多数据源配置类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码@Configuration
public class DataSourceConfiguration {

@Primary
@Bean
@ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.primary")
public DataSource primaryDataSource() {
return new AtomikosDataSourceBean();
}

@Bean
@ConfigurationProperties(prefix = "spring.jta.atomikos.datasource.secondary")
public DataSource secondaryDataSource() {
return new AtomikosDataSourceBean();
}

@Bean
public JdbcTemplate primaryJdbcTemplate(@Qualifier("primaryDataSource") DataSource primaryDataSource) {
return new JdbcTemplate(primaryDataSource);
}

@Bean
public JdbcTemplate secondaryJdbcTemplate(@Qualifier("secondaryDataSource") DataSource secondaryDataSource) {
return new JdbcTemplate(secondaryDataSource);
}

}

注意,这里除了家在的配置不同之外,DataSource也采用了AtomikosDataSourceBean注意与之前配置多数据源使用的配置和实现类的区别。

  1. 创建一个Service实现,模拟两种不同的情况。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码@Service
public class TestService {

private JdbcTemplate primaryJdbcTemplate;
private JdbcTemplate secondaryJdbcTemplate;

public TestService(JdbcTemplate primaryJdbcTemplate, JdbcTemplate secondaryJdbcTemplate) {
this.primaryJdbcTemplate = primaryJdbcTemplate;
this.secondaryJdbcTemplate = secondaryJdbcTemplate;
}

@Transactional
public void tx() {
// 修改test1库中的数据
primaryJdbcTemplate.update("update user set age = ? where name = ?", 30, "aaa");
// 修改test2库中的数据
secondaryJdbcTemplate.update("update user set age = ? where name = ?", 30, "aaa");
}

@Transactional
public void tx2() {
// 修改test1库中的数据
primaryJdbcTemplate.update("update user set age = ? where name = ?", 40, "aaa");
// 模拟:修改test2库之前抛出异常
throw new RuntimeException();
}

}

这里tx函数,是两句update操作,一般都会成功;而tx2函数中,我们人为的制造了一个异常,这个异常是在test1库中的数据更新后才产生的,这样就可以测试一下test1更新成功,之后是否还能在JTA的帮助下实现回滚。

  1. 创建测试类,编写测试用例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码@SpringBootTest(classes = Chapter312Application.class)
public class Chapter312ApplicationTests {

@Autowired
protected JdbcTemplate primaryJdbcTemplate;
@Autowired
protected JdbcTemplate secondaryJdbcTemplate;

@Autowired
private TestService testService;

@Test
public void test1() throws Exception {
// 正确更新的情况
testService.tx();
Assertions.assertEquals(30, primaryJdbcTemplate.queryForObject("select age from user where name=?", Integer.class, "aaa"));
Assertions.assertEquals(30, secondaryJdbcTemplate.queryForObject("select age from user where name=?", Integer.class, "aaa"));
}

@Test
public void test2() throws Exception {
// 更新失败的情况
try {
testService.tx2();
} catch (Exception e) {
e.printStackTrace();
} finally {
// 部分更新失败,test1中的更新应该回滚
Assertions.assertEquals(30, primaryJdbcTemplate.queryForObject("select age from user where name=?", Integer.class, "aaa"));
Assertions.assertEquals(30, secondaryJdbcTemplate.queryForObject("select age from user where name=?", Integer.class, "aaa"));
}
}

}

这里有两个测试用例:

  • test1:因为没有故意制造的异常,不出意外两个库的update都会成功,所以根据name=aaa去把两个数据查出来,看age是否都被更新到了30。
  • test2:tx2函数会把test1中name=aaa的用户age更新为40,然后抛出异常,JTA事务生效的话,会把age回滚回30,所以这里的检查也是两个库的aaa用户的age应该都为30,这样就意味着JTA事务生效,保证了test1和test2两个库中的User表数据更新一致,没有制造出脏数据。

测试验证

将上面编写的单元测试运行起来:

观察一下启动阶段的日志,可以看到这些Atomikos初始化日志输出:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
yaml复制代码2021-02-02 19:00:36.145  INFO 8868 --- [           main] c.a.icatch.provider.imp.AssemblerImp     : USING: com.atomikos.icatch.default_max_wait_time_on_shutdown = 9223372036854775807
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.allow_subtransactions = true
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.recovery_delay = 10000
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.automatic_resource_registration = true
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.oltp_max_retries = 5
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.client_demarcation = false
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.threaded_2pc = false
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.serial_jta_transactions = true
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.log_base_dir = /Users/didi/Documents/GitHub/SpringBoot-Learning/2.x/chapter3-12/transaction-logs
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.rmi_export_class = none
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.max_actives = 50
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.checkpoint_interval = 500
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.enable_logging = true
2021-02-02 19:00:36.145 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.log_base_name = tmlog
2021-02-02 19:00:36.146 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.max_timeout = 300000
2021-02-02 19:00:36.146 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.trust_client_tm = false
2021-02-02 19:00:36.146 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: java.naming.factory.initial = com.sun.jndi.rmi.registry.RegistryContextFactory
2021-02-02 19:00:36.146 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.tm_unique_name = 127.0.0.1.tm
2021-02-02 19:00:36.146 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.forget_orphaned_log_entries_delay = 86400000
2021-02-02 19:00:36.146 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.oltp_retry_interval = 10000
2021-02-02 19:00:36.146 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: java.naming.provider.url = rmi://localhost:1099
2021-02-02 19:00:36.146 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.force_shutdown_on_vm_exit = false
2021-02-02 19:00:36.146 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : USING: com.atomikos.icatch.default_jta_timeout = 10000
2021-02-02 19:00:36.147 INFO 8868 --- [ main] c.a.icatch.provider.imp.AssemblerImp : Using default (local) logging and recovery...
2021-02-02 19:00:36.184 INFO 8868 --- [ main] c.a.d.xa.XATransactionalResource : test1: refreshed XAResource
2021-02-02 19:00:36.203 INFO 8868 --- [ main] c.a.d.xa.XATransactionalResource : test2: refreshed XAResource

同时,我们在transaction-logs目录下,还能找到关于事务的日志信息:

1
2
3
json复制代码{"id":"127.0.0.1.tm161226409083100001","wasCommitted":true,"participants":[{"uri":"127.0.0.1.tm1","state":"COMMITTING","expires":1612264100801,"resourceName":"test1"},{"uri":"127.0.0.1.tm2","state":"COMMITTING","expires":1612264100801,"resourceName":"test2"}]}
{"id":"127.0.0.1.tm161226409083100001","wasCommitted":true,"participants":[{"uri":"127.0.0.1.tm1","state":"TERMINATED","expires":1612264100804,"resourceName":"test1"},{"uri":"127.0.0.1.tm2","state":"TERMINATED","expires":1612264100804,"resourceName":"test2"}]}
{"id":"127.0.0.1.tm161226409092800002","wasCommitted":false,"participants":[{"uri":"127.0.0.1.tm3","state":"TERMINATED","expires":1612264100832,"resourceName":"test1"}]}

更多本系列免费教程连载「点击进入汇总目录」

代码示例

本文的相关例子可以查看下面仓库中的chapter3-12目录:

  • Github:github.com/dyc87112/Sp…
  • Gitee:gitee.com/didispace/S…

如果您觉得本文不错,欢迎Star支持,您的关注是我坚持的动力!

欢迎关注我的公众号:程序猿DD,获得独家整理的学习资源、日常干货及福利赠送。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot整合kafka

发表于 2021-02-03

人生有涯,学海无涯

Kafka简介

Kafka 是由 Linkedin 公司开发的,它是一个分布式的,支持多分区、多副本,基于 Zookeeper 的分布式消息流平台,它同时也是一款开源的基于发布订阅模式的消息引擎系统。

Kafka 的基本术语

  • 消息:Kafka 中的数据单元被称为消息,也被称为记录,可以把它看作数据库表中某一行的记录。
  • 批次:为了提高效率, 消息会分批次写入 Kafka,批次就代指的是一组消息。
  • 主题:消息的种类称为 主题(Topic),可以说一个主题代表了一类消息。相当于是对消息进行分类。主题就像是数据库中的表。
  • 分区:主题可以被分为若干个分区(partition),同一个主题中的分区可以不在一个机器上,有可能会部署在多个机器上,由此来实现 kafka 的伸缩性,单一主题中的分区有序,但是无法保证主题中所有的分区有序

在这里插入图片描述

  • 生产者: 向主题发布消息的客户端应用程序称为生产者(Producer),生产者用于持续不断的向某个主题发送消息。
  • 消费者:订阅主题消息的客户端程序称为消费者(Consumer),消费者用于处理生产者产生的消息。
  • 消费者群组:生产者与消费者的关系就如同餐厅中的厨师和顾客之间的关系一样,一个厨师对应多个顾客,也就是一个生产者对应多个消费者,消费者群组(Consumer Group)指的就是由一个或多个消费者组成的群体。

在这里插入图片描述

  • 偏移量:偏移量(Consumer Offset)是一种元数据,它是一个不断递增的整数值,用来记录消费者发生重平衡时的位置,以便用来恢复数据。

broker: 一个独立的 Kafka 服务器就被称为 broker,broker 接收来自生产者的消息,为消息设置偏移量,并提交消息到磁盘保存。

broker 集群:broker 是集群 的组成部分,broker 集群由一个或多个 broker 组成,每个集群都有一个 broker 同时充当了集群控制器的角色(自动从集群的活跃成员中选举出来)。

  • 副本:Kafka 中消息的备份又叫做 副本(Replica),副本的数量是可以配置的,Kafka 定义了两类副本:领导者副本(Leader Replica) 和 追随者副本(Follower Replica),前者对外提供服务,后者只是被动跟随。
  • 重平衡:Rebalance。消费者组内某个消费者实例挂掉后,其他消费者实例自动重新分配订阅主题分区的过程。Rebalance 是 Kafka 消费者端实现高可用的重要手段。

Kafka 的特性(设计原则)

  • 高吞吐、低延迟:kakfa 最大的特点就是收发消息非常快,kafka 每秒可以处理几十万条消息,它的最低延迟只有几毫秒。
  • 高伸缩性: 每个主题(topic) 包含多个分区(partition),主题中的分区可以分布在不同的主机(broker)中。
  • 持久性、可靠性: Kafka 能够允许数据的持久化存储,消息被持久化到磁盘,并支持数据备份防止数据丢失,Kafka 底层的数据存储是基于 Zookeeper 存储的,Zookeeper 我们知道它的数据能够持久存储。
  • 容错性: 允许集群中的节点失败,某个节点宕机,Kafka 集群能够正常工作
  • 高并发: 支持数千个客户端同时读写

Kafka 的使用场景

  • 活动跟踪:Kafka 可以用来跟踪用户行为,比如我们经常回去淘宝购物,你打开淘宝的那一刻,你的登陆信息,登陆次数都会作为消息传输到 Kafka ,当你浏览购物的时候,你的浏览信息,你的搜索指数,你的购物爱好都会作为一个个消息传递给 Kafka ,这样就可以生成报告,可以做智能推荐,购买喜好等。
  • 传递消息:Kafka 另外一个基本用途是传递消息,应用程序向用户发送通知就是通过传递消息来实现的,这些应用组件可以生成消息,而不需要关心消息的格式,也不需要关心消息是如何发送的。
  • 度量指标:Kafka也经常用来记录运营监控数据。包括收集各种分布式应用的数据,生产各种操作的集中反馈,比如报警和报告。
  • 日志记录:Kafka 的基本概念来源于提交日志,比如我们可以把数据库的更新发送到 Kafka 上,用来记录数据库的更新时间,通过kafka以统一接口服务的方式开放给各种consumer,例如hadoop、Hbase、Solr等。
  • 流式处理:流式处理是有一个能够提供多种应用程序的领域。
  • 限流削峰:Kafka 多用于互联网领域某一时刻请求特别多的情况下,可以把请求写入Kafka 中,避免直接请求后端程序导致服务崩溃。

Kafka 的消息队列

  • Kafka 是支持消费者群组的,也就是说 Kafka 中会有一个或者多个消费者,如果一个生产者生产的消息由一个消费者进行消费的话,那么这种模式就是点对点模式

在这里插入图片描述

  • 如果一个生产者或者多个生产者产生的消息能够被多个消费者同时消费的情况,这样的消息队列成为发布订阅模式的消息队列

在这里插入图片描述

Kafka 系统架构

在这里插入图片描述

如上图所示,一个典型的 Kafka 集群中包含若干Producer(可以是web前端产生的Page View,或者是服务器日志,系统CPU、Memory等),若干broker(Kafka支持水平扩展,一般broker数量越多,集群吞吐率越高),若干Consumer Group,以及一个Zookeeper集群。Kafka通过Zookeeper管理集群配置,选举leader,以及在Consumer Group发生变化时进行rebalance。Producer使用push模式将消息发布到broker,Consumer使用pull模式从broker订阅并消费消息。

核心 API

Kafka 有四个核心API,它们分别是:

  • Producer API,它允许应用程序向一个或多个 topics 上发送消息记录
  • Consumer API,允许应用程序订阅一个或多个 topics 并处理为其生成的记录流
  • Streams API,它允许应用程序作为流处理器,从一个或多个主题中消费输入流并为其生成输出流,有效的将输入流转换为输出流。
  • Connector API,它允许构建和运行将 Kafka 主题连接到现有应用程序或数据系统的可用生产者和消费者。例如,关系数据库的连接器可能会捕获对表的所有更改

在这里插入图片描述

springboot集成kafka

本地kafka和zk环境搭建

我们需要在本地启动一个单机版的kafka和zookeeper环境。kafka的安装包自带zookeeper,直接启动即可。

我的本地环境配置如下:

  • win10系统
  • kafka_2.12-2.5.0
  • zookeeper-3.4.12
  • spring boot 2.2.10.RELEASE

zookeeper的安装不做说明了,直接启动zk,端口是2181

启动kafka,首先修改配置文件config文件下的server.properties文件

listeners=PLAINTEXT://127.0.0.1:9092

log.dirs=/kafka-logs 日志存储的文件夹

到kafka根目录下,输入命令 .\bin\windows\kafka-server-start.bat .\config\server.properties ,回车;启动成功的窗口。

依赖:

1
2
3
4
java复制代码<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码server:
port: 8999
contextPath : /kafka
spring:
application:
name: kafka
kafka:
bootstrapServers: 127.0.0.1:9092
consumer:
groupId: myGroup
keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
valueDserializer: org.apache.kafka.common.serialization.StringDeserializer
producer:
groupId: myGroup
keyDeserializer: org.apache.kafka.common.serialization.StringSerializer
valueDserializer: org.apache.kafka.common.serialization.StringSerializer

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码package com.cn.producer;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Service;
import org.springframework.util.concurrent.ListenableFutureCallback;

/**
* 类的功能描述:
* 消息生产者用于发送消息
*/
@Service
public class MessageProducer {

private final Logger log = LoggerFactory.getLogger(MessageProducer.class);

@Autowired
KafkaTemplate kafkaTemplate;

public void send(String payMessage) {
kafkaTemplate.send("payTopic", payMessage).addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
@Override
public void onFailure(Throwable ex) {
log.info("发送消息失败:"+ex.getMessage());
}

@Override
public void onSuccess(SendResult<String, Object> result) {
log.info("发送消息成功:" + result.getRecordMetadata().topic() + "-"
+ result.getRecordMetadata().partition() + "-" + result.getRecordMetadata().offset());
}
});
}
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码package com.cn.consumer;

import com.cn.constant.TopicConst;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;

/**
* 类的功能描述:
* 消息消费者者用于处理消息
*/
@Service
public class MessageConsumer {

private final Logger logger = LoggerFactory.getLogger(MessageConsumer.class);

@KafkaListener(topics = "payTopic")
public void onMessage(ConsumerRecord<?, ?> record) {
logger.info("消费者接收到消息为:"+record.topic()+"-"+record.partition()+"-"+record.value());
}
}
  • 只需要在监听的方法上通过注解配置一个监听器即可,另外就是指定需要监听的topic。
  • kafka的消息再接收端会被封装成ConsumerRecord对象返回,它内部的value属性就是实际的消息。

发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码package com.cn.controller;

import com.cn.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class SendController {

@Autowired
private MessageProducer producer;

@GetMapping(value = "/sendMsg/{message}")
public void sendMsg(@PathVariable("message") String msg){
producer.send(msg);
}
}

测试


控制台内容如下:

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何在业务逻辑当中优雅引入重试机制 为什么要引入重试机制 重

发表于 2021-02-03

作者|田航(马刺)

为什么要引入重试机制

我们首先看看正常的业务系统交互流程,就像下面图中所示一样,我们自己开发的系统通过HTTP接口或者通过RPC去访问其他业务系统,其他系统在没出现任何问题的情况下会返回给我们需要的数据,状态为success。

但大家在日常的开发工作当中应该碰到过不少这样的问题:自己应用因为业务需求需要调其他关联应用的接口或二方包,而其他应用的接口稳定性不敢过分恭维,老是出一些莫名奇妙的幺蛾子,比如由于接口暂时升级维护导致的短暂不可用,又或者网络抖动因素导致的单次接口请求失败。

诸如此类的麻烦问题会因为业务强依赖致使我们自己维护的系统也跟着陷入一种不稳定的状态(当然这个强依赖是没有办法的事情,毕竟业务之间需要解耦独立开发维护)。

所以也就是说重试的使用场景大多是因为我们的系统依赖了其他的业务,或者是由于我们自己的业务需要通过网络请求去获取数据这样的场景。既然一次请求结果的状态非常不可控、不稳定,那么一个非常自然的想法就是多试几次,就能很好的避开网络抖动或其他关联应用暂时down机维护带来的系统不可用问题。

当然,这里也有几个引入重试机制以后需要考虑的问题。

  • 我们应该重试几次?
  • 每次重试的间隔设置为多少合适?
  • 如果所有重试机会都用完了还是不成功怎么办?下面我们就这几个问题展开分析一下。

重试几次合适

通常来说我们单次重试所面临的情况就如上面我们分析的一样,有很大的不可确定性,那到底多少次是比较合理的次数呢?这个就要“具体业务具体分析”了,但一般来说3次重试就差不多可以满足大多数业务需求了,当然,这是需要结合后面要说的重试间隔一起讨论的。为什么说3次就基本够了呢,因为如果被请求系统实在处于长时间不可用状态。我们重试多次是没有什么意义的。

重试间隔设置为多少合适

如果重试间隔设置得太小,可能被调用系统还没来得及恢复过来我们就又发起调用,得到的结果肯定还是Fail;如果设置的太大,我们自己的系统就会牺牲掉不少数据时效性。所以,重试间隔也要根据被调用的系统平均恢复时间去正确估量,通常而言这个平均恢复时间很难统计到,所以一般的经验值是3至5分钟。

重试机会用完以后依旧Fail怎么办

这种情况也是需要认真考虑的,因为不排除被调用系统真的起不来的情况,这时候就需要采取一定的补偿措施了。首先要做的就是在我们自己的系统增加错误报警机制,这样我们才能即时感知到应用发生了不可自恢复的调用异常。其次就是在我们的代码逻辑中加入触发手动重试的开关,这样在发生异常情况以后我们就可以方便的修改触发开关然后手动重试。

在这里还有一个非常重要的问题需要考虑,那就是接口调用的幂等性问题,如果接口不是幂等的,那我们手动重试的时候就很容易发生数据错乱相关的问题。

Spring重试工具包

Spring为我们提供了原生的重试类库,我们可以方便地引入到工程当中,利用它提供的重试注解,没有太多的业务逻辑侵入性。如下,我们先引入依赖包。

1
2
3
4
5
6
7
8
xml复制代码<dependency>
<groupId>org.springframework.retry</groupId>
<artifactId>spring-retry</artifactId>
</dependency>
<dependency>
<groupId>org.aspectj</groupId>
<artifactId>aspectjweaver</artifactId>
</dependency>

然后在启动类或者配置类上添加@EnableRetry注解,并在需要重试的方法上添加@Retryable注解。

1
2
3
4
5
6
7
8
9
10
typescript复制代码@Retryable
public String hello(){
long times = helloTimes.incrementAndGet();
log.info("hello times:{}", times);
if (times % 4 != 0){
log.warn("发生异常,time:{}", LocalTime.now() );
throw new HelloRetryException("发生Hello异常");
}
return "hello " + nameService.getName();
}

更详细的用法和属性大家参阅Spring Retry的文档就好了,解释的非常清楚。这里要说的一点是,Spring的重试机制也还是存在一定的不足,只支持对异常进行捕获,而无法对返回值进行校验。

Guava Retry

相比Spring Retry,Guava Retry具有更强的灵活性,可以根据返回值校验来判断是否需要进行重试。我们依然需要先引入它的依赖包。

1
2
3
4
5
xml复制代码<dependency>
<groupId>com.github.rholder</groupId>
<artifactId>guava-retrying</artifactId>
<version>2.0.0</version>
</dependency>

在用的时候也很简单,先创建一个Retryer实例,然后使用这个实例对需要重试的方法进行调用,可以通过很多方法来设置重试机制,比如使用retryIfException来对所有异常进行重试,使用retryIfExceptionOfType方法来设置对指定异常进行重试,使用retryIfResult来对不符合预期的返回结果进行重试,使用retryIfRuntimeException方法来对所有RuntimeException进行重试。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
scss复制代码@Test
public void guavaRetry() {
Retryer<String> retryer = RetryerBuilder.<String>newBuilder()
.retryIfExceptionOfType(HelloRetryException.class)
.retryIfResult(StringUtils::isEmpty)
.withWaitStrategy(WaitStrategies.fixedWait(3, TimeUnit.SECONDS))
.withStopStrategy(StopStrategies.stopAfterAttempt(3))
.build();

try {
retryer.call(() -> helloService.hello());
} catch (Exception e){
e.printStackTrace();
}
}

相比Spring,Guava Retry提供了几个核心特性。

  • 可以设置任务单次执行的时间限制,如果超时则抛出异常。
  • 可以设置重试监听器,用来执行额外的处理工作。
  • 可以设置任务阻塞策略,即可以设置当前重试完成,下次重试开始前的这段时间做什么事情。
  • 可以通过停止重试策略和等待策略结合使用来设置更加灵活的策略,比如指数等待时长并最多10次调用,随机等待时长并永不停止等等。

小结

上面针对我们为什么要引入重试机制,引入重试机制需要思考的几个核心问题,以及为重试机制提供良好支持的工具类库都分别作了简单介绍,相信大家在今后的开发工作中遇到类似场景也能驾轻就熟地使用思考了。

我们日常工作中有很多“大”的业务场景需要我们集中精力去突破、去思考,但也有很多类似的“小”点需要我们去打穿、吃透,大家共勉。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

给老孙做了个排班表! 碎碎念 背景 产品经理 大战开始

发表于 2021-02-03

碎碎念

不是吧,阿sir,996还不让人摸鱼,BOOS果断让人事部做出计划整改,改就改吧.为什么让我来写这个那,我自己制裁我自己可还行,看了看自己不争气的花呗!整啊,加油啊打工人.

背景

某公司软件开发工程师孙工,作息规律为上三天班,休息一天,经常不确定休
息日是否周末,为此,请你开发一个程序,当孙工输入年及月,以日历方式显示对
应月份的休息日,用中括号进行标记.同时,统计出本月有几天休息,轮到周末休
息有几天.(注:首次休息日是 2020 年 2 月 2 日)

产品经理

嘀嘀嘀嘀 , 产品经理将界面和需求一股脑发了过来.

产品界面:
在这里插入图片描述
需求概述:

  1. 获得输入数据:年,月
  2. 通过计算进行展示
  3. 同步输出统计信息
  4. 程序回到下一次的输入数据状态

大战开始

话不多说! 开撸开撸,刚开始撸,bug漫天飞,我摸摸了本来头发就不多的头皮,自己开撸.终于在下班前撸了出来.想了想mvc,还是算了吧,老老实实写写得了.

MVC的概念,将数据操作层,流程控制层和视图层尽可能分开,这样每个模块之间的关联尽可能地变小(低耦合),在编写大项目时可以使得程序整体逻辑清晰可见,易于维护和修改,体现出java的核心优势之一

需求分析(三步走战略)

第一,最明显的需要打印出一个日历来

第二,需要标记出孙工的休息日用[ ]标记出来

第三,需要算出来,孙工在他查询的那个月有几天的休息日,并且休息日有几天在周末

具体实现

接收用户输入的年份和月份

有人要问,为啥你接收个数据还要另外写个函数那?我说年轻人,年份和月份老讲究的,一不小心就有一些夸张的事情了.

  • 年份:老张2020才营业,你写前面就不合适了吧,还有老张不会在公司干的寿终正寝吧!
  • 月份:一年也就12个月,你在创造点可还行!
1
2
3
4
5
6
java复制代码  		int year = -1 ;
int month = -1;
System.out.println("请输入年份:");
year = input(2020,2080);
System.out.println("请输入月份:");
month = input(1,12);
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码    /**
* 1.接收数据 2.排除非法数据
* @param min
* @param max
* @return
*/
public static int input(int min,int max) {
String input = scanner.nextLine();
int number = -1;
try {
number = Integer.parseInt(input);
} catch (NumberFormatException e) {
System.out.println("输入有误,请重新输入!");
return input(min, max);
}
if (number < min || number > max) {
System.out.println("输入有误,请重新输入!");
return input(min,max);
}
return number;
}

Calendar类的使用

将获取到的数据进行处理:

  1. 月份: 这里有个细节,月份是从0开始的,就是0月对应的是1月,所以输入的值要减一.
  2. 获得每个月的一号是星期几
  3. 算算每个月的天数
  4. 打印一下表头和那个第一行的空格
  5. 空格数等于 = 一号对应星期数- 1
    jdk11中常量的描述: 在这里插入图片描述
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码       //创建日历类对象
Calendar calendar = Calendar.getInstance();
/**
* 设置年 月(0-11) 日
*/
calendar.set(Calendar.YEAR,year);
calendar.set(Calendar.MONTH,month-1);
calendar.set(Calendar.DATE,1);

//判断日历的初始值是星期几
int dayOfWeek = calendar.get(Calendar.DAY_OF_WEEK);
//System.out.println(dayOfWeek);

//获取本月的最大天数
int daysMaxOfMonth = calendar.getActualMaximum(Calendar.DATE);
//System.out.println(daysMaxOfMonth);

//打印日历表头
System.out.println("星期日\t星期一\t星期二\t星期三\t星期四\t星期五\t星期六");
for(int i = 1;i <= dayOfWeek - 1; i++){
System.out.printf("%-8s","");
}

这个时候,我看了看关于几个变量的描述:
在这里插入图片描述

jdk11中关于使用getActualMaximum方法的描述

public int getActualMaximum​(int field)
给定此Calendar的时间值,返回指定日历字段可能具有的Calendar 。 例如, MONTH字段的实际最大值在某些年份为12,在希伯来日历系统中为其他年份的13。
此方法的默认实现使用迭代算法来确定日历字段的实际最大值。 如果可能,子类应该使用更有效的实现来覆盖它。

计算(抠脚丫子功夫)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码        //本月休假天数
int relaxDays = 0;
//本月休假的周末天数
int relaxWeekends = 0;

for (int i = 1;i <= daysMaxOfMonth;i++) {
//设置日
calendar.set(Calendar.DATE, i);
//得到当地日期与初始日期:2020.2.2 差的天数
int intervalDays = intervalDays(calendar);
//判断该天是不是休息日
if (intervalDays % 4 == 0) {
System.out.printf("%-8s","["+i+"]");
relaxDays++;
if (calendar.get(Calendar.DAY_OF_WEEK) == 1 ||calendar.get(Calendar.DAY_OF_WEEK) == 7) {
relaxWeekends++;
}
} else {
System.out.printf("%-8s",i);
}
if(calendar.get(Calendar.DAY_OF_WEEK) == 7){
System.out.println();
}
}
System.out.println();
System.out.println("本月休息的天数:"+relaxDays+"天");
System.out.println("本月轮到周末休息的天数:"+relaxWeekends+"天");

}

计算距离初始日期(2020.02.02)的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码public static  int  intervalDays(Calendar c) throws ParseException {
//创建date对象
Date date = c.getTime();
//得到格林威治时间
long to = date.getTime();

//创建日期格式,用于记录初始日期
SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd");
//将初始日期转化为Date对象
Date initialDate = format.parse("2020-02-02");
//得到初始日期格林威治时间
long from = initialDate.getTime();
//计算两个格林威治时间的差,得到两个日期的相隔天数
int intervalalDays = (int) ((to - from) / (24 * 60 * 60 * 1000));

//初始日期是第一个休息日,在此之前不算
if(to - from < 0){
return -1;
}
return intervalalDays;

}

客官看到这了就给个赞吧!!!(需要源码的,私我呦!)
在这里插入图片描述

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

深入理解 ProtoBuf 原理与工程实践(概述) 一、什么

发表于 2021-02-03

ProtoBuf 作为一种跨平台、语言无关、可扩展的序列化结构数据的方法,已广泛应用于网络数据交换及存储。随着互联网的发展,系统的异构性会愈发突出,跨语言的需求会愈加明显,同时 gRPC 也大有取代Restful之势,而 ProtoBuf 作为g RPC 跨语言、高性能的法宝,我们技术人有必要

深入理解 ProtoBuf 原理,为以后的技术更新和选型打下基础。

我将过去的学习过程以及实践经验,总结成系列文章,与大家一起探讨学习,希望大家能有所收获,当然其中有不正确的地方也欢迎大家批评指正。

本系列文章主要包含:

  1. 深入理解 ProtoBuf 原理与工程实践(概述)
  2. 深入理解 ProtoBuf 原理与工程实践(编码)
  3. 深入理解 ProtoBuf 原理与工程实践(序列化)
  4. 深入理解 ProtoBuf 原理与工程实践(工程实践)

一、什么是ProtoBuf

ProtoBuf(Protocol Buffers)是一种跨平台、语言无关、可扩展的序列化结构数据的方法,可用于网络数据交换及存储。

在序列化结构化数据的机制中,ProtoBuf是灵活、高效、自动化的,相对常见的XML、JSON,描述同样的信息,ProtoBuf序列化后数据量更小、序列化/反序列化速度更快、更简单。

一旦定义了要处理的数据的数据结构之后,就可以利用ProtoBuf的代码生成工具生成相关的代码。只需使用 Protobuf 对数据结构进行一次描述,即可利用各种不同语言(proto3支持C++, Java, Python, Go, Ruby, Objective-C, C#)或从各种不同流中对你的结构化数据轻松读写。

二、为什么是 ProtoBuf

大家可能会觉得 Google 发明 ProtoBuf 是为了解决序列化速度的,其实真实的原因并不是这样的。

ProtoBuf最先开始是 Google用来解决索引服务器 request/response 协议的。没有ProtoBuf之前,Google 已经存在了一种 request/response 格式,用于手动处理 request/response 的编解码。它也能支持多版本协议,不过代码不够优雅:

1
2
3
4
5
java复制代码if (protocolVersion=1) {
doSomething();
} else if (protocolVersion=2) {
doOtherThing();
} ...

如果是非常明确的格式化协议,会使新协议变得非常复杂。因为开发人员必须确保请求发起者与处理请求的实际服务器之间的所有服务器都能理解新协议,然后才能切换开关以开始使用新协议。

这也就是每个服务器开发人员都遇到过的低版本兼容、新旧协议兼容相关的问题。

为了解决这些问题,于是ProtoBuf就诞生了。

ProtoBuf 最初被寄予以下 2 个特点:

  • 更容易引入新的字段,并且不需要检查数据的中间服务器可以简单地解析并传递数据,而无需了解所有字段。
  • 数据格式更加具有自我描述性,可以用各种语言来处理(C++, Java 等各种语言)。

这个版本的 ProtoBuf 仍需要自己手写解析的代码。

不过随着系统慢慢发展,演进,ProtoBuf具有了更多的特性:

  • 自动生成的序列化和反序列化代码避免了手动解析的需要。(官方提供自动生成代码工具,各个语言平台的基本都有)。
  • 除了用于数据交换之外,ProtoBuf被用作持久化数据的便捷自描述格式。

ProtoBuf 现在是 Google 用于数据交换和存储的通用语言。谷歌代码树中定义了 48162 种不同的消息类型,包括 12183 个 .proto 文件。它们既用于 RPC 系统,也用于在各种存储系统中持久存储数据。

ProtoBuf 诞生之初是为了解决服务器端新旧协议(高低版本)兼容性问题,名字也很体贴,“协议缓冲区”。只不过后期慢慢发展成用于传输数据。

Protocol Buffers 命名由来:

Why the name “Protocol Buffers”?

The name originates from the early days of the format, before we had the protocol buffer compiler to generate classes for us. At the time, there was a class called ProtocolBuffer which actually acted as a buffer for an individual method. Users would add tag/value pairs to this buffer individually by calling methods like AddValue(tag, value). The raw bytes were stored in a buffer which could then be written out once the message had been constructed.

Since that time, the “buffers” part of the name has lost its meaning, but it is still the name we use. Today, people usually use the term “protocol message” to refer to a message in an abstract sense, “protocol buffer” to refer to a serialized copy of a message, and “protocol message object” to refer to an in-memory object representing the parsed message.

三、如何使用 ProtoBuf

3.1 ProtoBuf 协议的工作流程

可以看到,对于序列化协议来说,使用方只需要关注业务对象本身,即 idl 定义,序列化和反序列化的代码只需要通过工具生成即可。

3.2 ProtoBuf 消息定义

ProtoBuf 的消息是在idl文件(.proto)中描述的。下面是本次样例中使用到的消息描述符customer.proto:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码syntax = "proto3";

package domain;

option java_package = "com.protobuf.generated.domain";
option java_outer_classname = "CustomerProtos";

message Customers {
repeated Customer customer = 1;
}

message Customer {
int32 id = 1;
string firstName = 2;
string lastName = 3;

enum EmailType {
PRIVATE = 0;
PROFESSIONAL = 1;
}

message EmailAddress {
string email = 1;
EmailType type = 2;
}

repeated EmailAddress email = 5;
}

上面的消息比较简单,Customers包含多个Customer,Customer包含一个id字段,一个firstName字段,一个lastName字段以及一个email的集合。

除了这些定义外,文件顶部还有三行可帮助代码生成器:

  1. 首先,syntax = “proto3”用于idl语法版本,目前有两个版本proto2和proto3,两个版本语法不兼容,如果不指定,默认语法是proto2。由于proto3比proto2支持的语言更多,语法更简洁,本文使用的是proto3。
  2. 其次有一个package domain;定义。此配置用于嵌套生成的类/对象。
  3. 有一个option java_package定义。生成器还使用此配置来嵌套生成的源。此处的区别在于这仅适用于Java。在使用Java创建代码和使用JavaScript创建代码时,使用了两种配置来使生成器的行为有所不同。也就是说,Java类是在包com.protobuf.generated.domain下创建的,而JavaScript对象是在包domain下创建的。

ProtoBuf 提供了更多选项和数据类型,本文不做详细介绍,感兴趣可以参考这里。

3.3 代码生成

首先安装 ProtoBuf 编译器 protoc,这里有详细的安装教程,安装完成后,可以使用以下命令生成 Java 源代码:

1
java复制代码protoc --java_out=./src/main/java ./src/main/idl/customer.proto

从项目的根路径执行该命令,并添加了两个参数:java_out,定义./src/main/java/为Java代码的输出目录;而./src/main/idl/customer.proto是.proto文件所在目录。

生成的代码非常复杂,但是幸运的是它的用法却非常简单。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码        CustomerProtos.Customer.EmailAddress email = CustomerProtos.Customer.EmailAddress.newBuilder()
.setType(CustomerProtos.Customer.EmailType.PROFESSIONAL)
.setEmail("crichardson@email.com").build();

CustomerProtos.Customer customer = CustomerProtos.Customer.newBuilder()
.setId(1)
.setFirstName("Lee")
.setLastName("Richardson")
.addEmail(email)
.build();
// 序列化
byte[] binaryInfo = customer.toByteArray();
System.out.println(bytes_String16(binaryInfo));
System.out.println(customer.toByteArray().length);
// 反序列化
CustomerProtos.Customer anotherCustomer = CustomerProtos.Customer.parseFrom(binaryInfo);
System.out.println(anotherCustomer.toString());

3.4 性能数据

我们简单地以Customers为模型,分别构造、选取小对象、普通对象、大对象进行性能对比。

序列化耗时以及序列化后数据大小对比

反序列化耗时

更多性能数据可以参考官方 Benchmark

四、总结

上面介绍了 ProtoBuf 是什么、产生的背景、基本用法,我们再总结下。

优点:

1. 效率高

从序列化后的数据体积角度,与XML、JSON这类文本协议相比,ProtoBuf通过T-(L)-V(TAG-LENGTH-VALUE)方式编码,不需要”, {, }, :等分隔符来结构化信息,同时在编码层面使用varint压缩,所以描述同样的信息,ProtoBuf序列化后的体积要小很多,在网络中传输消耗的网络流量更少,进而对于网络资源紧张、性能要求非常高的场景,ProtoBuf协议是不错的选择。

1
2
3
4
5
6
7
java复制代码// 我们简单做个对比
// 要描述如下JSON数据
{"id":1,"firstName":"Chris","lastName":"Richardson","email":[{"type":"PROFESSIONAL","email":"crichardson@email.com"}]}
# 使用JSON序列化后的数据大小为118byte
7b226964223a312c2266697273744e616d65223a224368726973222c226c6173744e616d65223a2252696368617264736f6e222c22656d61696c223a5b7b2274797065223a2250524f46455353494f4e414c222c22656d61696c223a226372696368617264736f6e40656d61696c2e636f6d227d5d7d
# 而使用ProtoBuf序列化后的数据大小为48byte
0801120543687269731a0a52696368617264736f6e2a190a156372696368617264736f6e40656d61696c2e636f6d1001

从序列化/反序列化速度角度,与XML、JSON相比,ProtoBuf序列化/反序列化的速度更快,比XML要快20-100倍。

2. 支持跨平台、多语言

ProtoBuf是平台无关的,无论是Android与PC,还是C#与Java都可以利用ProtoBuf进行无障碍通讯。

proto3支持C++, Java, Python, Go, Ruby, Objective-C, C#。

3. 扩展性、兼容性好

具有向后兼容的特性,更新数据结构以后,老版本依旧可以兼容,这也是ProtoBuf诞生之初被寄予解决的问题。因为编译器对不识别的新增字段会跳过不处理。

4. 使用简单

ProtoBuf 提供了一套编译工具,可以自动生成序列化、反序列化的样板代码,这样开发者只要关注业务数据idl,简化了编码解码工作以及多语言交互的复杂度。

缺点:

可读性差,缺乏自描述

XML,JSON是自描述的,而ProtoBuf则不是。

ProtoBuf是二进制协议,编码后的数据可读性差,如果没有idl文件,就无法理解二进制数据流,对调试不友好。

不过Charles已经支持ProtoBuf协议,导入数据的描述文件即可,详情可参考Charles Protocol Buffers

此外,由于没有idl文件无法解析二进制数据流,ProtoBuf在一定程度上可以保护数据,提升核心数据被破解的门槛,降低核心数据被盗爬的风险。

五、参考

  1. 维基百科
  2. 序列化与反序列化
  3. 官方Benchmark
  4. Charles Protocol Buffers
  5. choose-protocol-buffers

作者: Li Guanyun

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

熬夜肝了个IDEA插件整合程序员常用的工具,总有你能用上的

发表于 2021-02-03

本文已被Github仓库收录 github.com/silently952…

微信公众号:贝塔学Java

前言

自己在开发的过程中经常会使用一些在线的工具,比如:时间戳转日期,JSON格式化等等;前几天思考了下想把这些常用的功能都做成IDEA插件,在使用的时候就不用去网上寻找工具,在IDEA中就可以快速完成提升开发人员开发效率,所以就熬夜肝了这个插件,欢迎大家都来使用。

Github地址: github.com/silently952…

Gitee地址: gitee.com/silently952…

觉得好用的小伙伴记得小手一抖 star 哟

已实现功能

  • SQL 转换成 ElasticSearch 查询语句
  • 正则表达式
  • Base64编码/解码
  • JSON格式化
  • URL编码/解码
  • 手机号归属地
  • IP地址
  • 日期时间戳互转

计划中的功能

  • Cron表达式
  • MD5
  • 图片base64编码
  • 文件下载
  • js/css混淆压缩

插件安装步骤

  1. 从release中下载最新版本的代码
  2. 在IDEA中通过本地安装插件

功能介绍:

SQL转换成ElasticSearch查询语句

手写ElasticSearch的查询语句,语法记不住的可以使用下这个工具,常用的都能正常转换,如果遇到复杂的可能会转换出错,需要在再转换的结果中自行修改

Base64编码/解码

JSON格式化

IP地址

手机号归属地

URL编码/解码

日期时间戳互转

正则表达式

提供了常用的正则表达式匹配,当然自己也可以自己写表达式


写到最后(点关注,不迷路)

白嫖不好,创作不易,希望朋友们可以点赞评论关注三连,因为这些就是我分享的全部动力来源🙏

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python操作Kafka原理及使用详解

发表于 2021-02-02

Python操作Kafka原理及使用详解

一、什么是Kafka

Kafka是一个分布式流处理系统,流处理系统使它可以像消息队列一样publish或者subscribe消息,分布式提供了容错性,并发处理消息的机制

二、Kafka的基本概念

kafka运行在集群上,集群包含一个或多个服务器。kafka把消息存在topic中,每一条消息包含键值(key),值(value)和时间戳(timestamp)。

kafka有以下一些基本概念:

Producer - 消息生产者,就是向kafka broker发消息的客户端。

Consumer - 消息消费者,是消息的使用方,负责消费Kafka服务器上的消息。

Topic - 主题,由用户定义并配置在Kafka服务器,用于建立Producer和Consumer之间的订阅关系。生产者发送消息到指定的Topic下,消息者从这个Topic下消费消息。

Partition - 消息分区,一个topic可以分为多个 partition,每个

partition是一个有序的队列。partition中的每条消息都会被分配一个有序的id(offset)。

Broker - 一台kafka服务器就是一个broker。一个集群由多个broker组成。一个broker可以容纳多个topic。

Consumer Group - 消费者分组,用于归组同类消费者。每个consumer属于一个特定的consumer group,多个消费者可以共同消息一个Topic下的消息,每个消费者消费其中的部分消息,这些消费者就组成了一个分组,拥有同一个分组名称,通常也被称为消费者集群。

Offset - 消息在partition中的偏移量。每一条消息在partition都有唯一的偏移量,消息者可以指定偏移量来指定要消费的消息。

三、Kafka分布式架构

如上图所示,kafka将topic中的消息存在不同的partition中。如果存在键值(key),消息按照键值(key)做分类存在不同的partiition中,如果不存在键值(key),消息按照轮询(Round Robin)机制存在不同的partition中。默认情况下,键值(key)决定了一条消息会被存在哪个partition中。

partition中的消息序列是有序的消息序列。kafka在partition使用偏移量(offset)来指定消息的位置。一个topic的一个partition只能被一个consumer group中的一个consumer消费,多个consumer消费同一个partition中的数据是不允许的,但是一个consumer可以消费多个partition中的数据。

kafka将partition的数据复制到不同的broker,提供了partition数据的备份。每一个partition都有一个broker作为leader,若干个broker作为follower。所有的数据读写都通过leader所在的服务器进行,并且leader在不同broker之间复制数据。

上图中,对于Partition 0,broker 1是它的leader,broker 2和broker 3是follower。对于Partition 1,broker 2是它的leader,broker 1和broker 3是follower。

在上图中,当有Client(也就是Producer)要写入数据到Partition 0时,会写入到leader Broker 1,Broker 1再将数据复制到follower Broker 2和Broker 3。

在上图中,Client向Partition 1中写入数据时,会写入到Broker 2,因为Broker 2是Partition 1的Leader,然后Broker 2再将数据复制到follower Broker 1和Broker 3中。

上图中的topic一共有3个partition,对每个partition的读写都由不同的broker处理,因此总的吞吐量得到了提升。

四、kafka-python实现生产者消费者

kafka-python是一个python的Kafka客户端,可以用来向kafka的topic发送消息、消费消息。

这个实验会实现一个producer和一个consumer,producer向kafka发送消息,consumer从topic中消费消息。结构如下图

producer代码

consumer代码

接下来创建test topic

打开两个窗口中,我们在window1中运行producer,如下

在window2中运行consumer,如下

可以看到window2中的consumer成功的读到了producer写入的数据

五、消费组实现容错性机制

这个实验将展示消费组的容错性的特点。这个实验中将创建一个有2个partition的topic,和2个consumer,这2个consumer共同消费同一个topic中的数据。结构如下所示

producer部分代码和实验一相同,这里不再重复。consumer需要指定所属的consumer group,代码如下

接下来我们创建topic,名字test,设置partition数量为2

打开三个窗口,一个窗口运行producer,还有两个窗口运行consumer。

运行consumer的两个窗口的输出如下:

可以看到两个consumer同时运行的情况下,它们分别消费不同partition中的数据。window1中的consumer消费partition 0中的数据,window2中的consumer消费parition 1中的数据。

我们尝试关闭window1中的consumer,可以看到如下结果

刚开始window2中的consumer只消费partition1中的数据,当window1中的consumer退出后,window2中的consumer中也开始消费partition 0中的数据了。

六、offset管理

kafka允许consumer将当前消费的消息的offset提交到kafka中,这样如果consumer因异常退出后,下次启动仍然可以从上次记录的offset开始向后继续消费消息。

这个实验的结构和实验一的结构是一样的,使用一个producer,一个consumer,test topic的partition数量设为1。

producer的代码和实验一中的一样,这里不再重复。consumer的代码稍作修改,这里consumer中打印出下一个要被消费的消息的offset。consumer代码如下

在一个窗口中启动producer,在另一个窗口并且启动consumer。consumer的输出如下

可以尝试退出consumer,再启动consumer。每一次重新启动,consumer都是从offset=98的消息开始消费的。

修改consumer的代码如下,在consumer消费每一条消息后将offset提交回kafka

启动consumer

可以看到consumer从offset=98的消息开始消费,到offset=829时,我们Ctrl+C退出consumer。

我们再次启动consumer

可以看到重新启动后,consumer从上一次记录的offset开始继续消费消息。之后每一次consumer重新启动,consumer都会从上一次停止的地方继续开始消费。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…724725726…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%