开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

网络基础-字节序 网络和主机字节序 htons() ntoh

发表于 2017-11-16

英文版发表在:holmeshe.me

我在读一些网络系统的源码时,htons()和 ntohs() 是两个最开始困扰我的函数。所以我决定要重新学一下大学里的知识-字节序。

网络和主机字节序

字节序控制了一个字(word)是怎么存储在内存里,以及怎么在网络上传输的。在big-endian中,最高位的byte被存储在最低的地址,而在little-endian中,最高位则存在最高的地址。

正着

正着

反着

在物理媒介上放字节和铺地板一样,正着铺或者反着铺都可以。但是一个系统设计师就需要做个决定,来保证风格一致了。RFC 1700里面规定,网络协议里面都使用big-endian 字节序,但是一些主机的设计师明显不同意。X86使用little-endian;而ARM则两种都有可能。这就意味着同一个数据,在不同的系统中存放的实际值是不一样的。比如说,A1B2C3D4 (写成十进制是 2712847316)就会有两种存放方式,

在上图中,每一个框,比如那个A1 的框,代表一个字节。这里要注意,这个字“节序”是以“字节” (byte)而不是位(bit)为单位的。

其实计算机处理两种字节序都可以,但是人类就总会抱怨little-endian反了。为啥呢?难道把低位字节设置给低地址,高位字节给高地址不是最合乎逻辑的做法吗?

其实,这个是因为我们在纸上(另一种物理媒介)写这些数字时会在潜意识里用big-endian。还是那上面的数字举例(A1B2C3D4),我们的潜意识会在这个数字周围画上横纵坐标,

如果我们指挥潜意识从右往左画这个横坐标,也许我们就可以解决这个直觉和理性之间的矛盾了。

我觉得这么做也没啥不对的,毕竟在实际开发中本来就存在各种各样的坐标系,比如:

你觉得呢?

好了,下面我们来看看为啥这些理论这么重要,并且在实际中都是咋使用的。

htons() ntohs()

这两个函数就是用来解决网络和主机字节序不一致的问题。技术上来说,当主机在网络上通信时,这两个函数负责转换主机和网络字节序。如果主机和网络字节序是一样的(这代表主机和网络都用big-endian),这两个函数啥都不做。而当主机和网络字节序不一致的话,htons() 把little-endian转换成big-endian,而ntohs()把big-endian转换回为little-endian。

类似的还有一对函数,htonl()和ntohl()。这两个函数除了处理的数字比较大以外,其他的都和htons(),ntohs()一样,就不赘述了。

验证一下

[cpp] view plain copy print?

  1. #include <stdio.h>
  2. #define BITS_PER_BYTE 8
  3. int main() {
  4. unsigned int anint = 0xa1b2c3d4;
  5. unsigned char *truth = &anint;
  6. printf(“value in decimal: %u\n”, anint);
  7. printf(“0x”);
  8. for (int i = 0; i < sizeof(anint); i++) {
  9. printf(“%2X”, truth[i]);
  10. }
  11. printf(“\n”);
  12. unsigned int anint_net = htons(anint);
  13. truth = &anint_net;
  14. printf(“value in decimal after hton: %u\n”, anint_net);
  15. printf(“0x”);
  16. for (int i = 0; i < sizeof(anint_net); i++) {
  17. printf(“%02X”, truth[i]);
  18. }
  19. printf(“\n”);
  20. }
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
cpp复制代码#include <stdio.h>
#define BITS_PER_BYTE 8
int main() {
unsigned int anint = 0xa1b2c3d4;
unsigned char *truth = &anint;
printf("value in decimal: %u\n", anint);
printf("0x");
for (int i = 0; i < sizeof(anint); i++) {
printf("%2X", truth[i]);
}
printf("\n");
unsigned int anint_net = htons(anint);
truth = &anint_net;
printf("value in decimal after hton: %u\n", anint_net);
printf("0x");
for (int i = 0; i < sizeof(anint_net); i++) {
printf("%02X", truth[i]);
}
printf("\n");
}

结果是(在我的测试机上): [html] view plain copy print?

  1. value in decimal: 2712847316
  2. 0xD4C3B2A1
  3. value in decimal after hton: 54467
  4. 0xC3D40000
1
2
3
4
dns复制代码value in decimal: 2712847316
0xD4C3B2A1
value in decimal after hton: 54467
0xC3D40000

正如上述��,htons()把anint的原始的值转换成big-endian,用于准备网络传输。这个值会在接收方用ntohs()恢复成原始值。虽然这个接受方的部分没有展示,但是我相信你可以懂的。

咋判断字节序

我们可以复用上面��中的代码来实现一个函数来判断主机的字节序:

[cpp] view plain copy print?

  1. int isLittle() {
  2. unsigned int anint = 0xa1b2c3d4;
  3. unsigned char *truth = &anint;
  4. return truth[0] == 0xd4;
  5. }
1
2
3
4
5
cpp复制代码int isLittle() {
unsigned int anint = 0xa1b2c3d4;
unsigned char *truth = &anint;
return truth[0] == 0xd4;
}

实际上还有一个更简单的办法, [plain] view plain copy print?

  1. cpu | grep “Byte Order”
1
1c复制代码cpu | grep "Byte Order"

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何向6岁孩子解释编程

发表于 2017-11-16

原文:Explaining Programming to 6 Years Old Kids
作者:Tomek Kaczanowski
翻译:雁惊寒

摘要:本文作者图文并茂地介绍了自己给幼儿园的小朋友教授电脑基本知识的整个过程。如果你家里也有一个年龄相仿的小孩的话,你也试一下吧。以下是译文。

任务:向一群六岁的孩子解释你所从事的工作。
等级:高(我自己的女儿就是六岁)。

许多人分享了他们对这个“演讲”的想法:你可以在Stack Overflow上找到一些非常有趣的例子(例如,这里)。很多人认为,首先要让孩子们知道计算机是非常愚蠢的,然后准确地告诉他们要做什么以及如何做才能达到自己想要的结果。但是,我很怀疑孩子们听了这些之后能够了解到有关编程的什么东西。

关于这个成功的“演讲”,我的想法是这样的:

通过提问和绘图的方式让他们参与进来。原始的图纸非常大(是活动挂图),我一边讲解一边画画。这是让年幼的观众们参与进来的一个很好的方法(他们会很开心的笑话程序员稻草人,或指出小猪佩奇的样子并不是这样的,等等)。我现在手头并没有原始的图纸,当时忘了拍照了,所以我又画了一遍。

立足于他们已经知道的东西(例如:YouTube、笔记本电脑、平板电脑、DVD)。现在每个孩子都会在YouTube上看动画片,在平板电脑上玩游戏,看着粑粑麻麻在笔记本电脑上工作。他们也知道电影是保存在VCD/DVD上的,智能手机和平板电脑非常的相似(涉及到游戏和电影)。

我首先从最基本的问题开始,“谁知道程序员是做什么的?”我知道他们昨天已经讨论过这个了,所以,对于他们的一些想法我并没有感到惊讶。我告诉了他们我自己的定义:“程序员是编写计算机程序的人,他们知道如何告诉计算机应该做什么。程序员非常了解电脑。”

然后,我画了一个正在工作的程序员。

下一步是谈论电脑。所有的孩子都开始谈论起他们父母的笔记本电脑(他们反复谈论同一样事情的时候真的是非常的可爱)。等一小段时间之后,我打断了他们,告诉他们:在很多不同的东西里面都有电脑,例如智能手机和手表,还有汽车和电梯。我把提到的每个东西都画了出来,并用几个字来概括在什么情况下该用哪种电脑。我又问他们天气预报是如何实现的(一个聪明的孩子回答道:“卫星知道第二天的天气”),然后告诉他们是电脑根据数千个测量设备提供的数据计算出预测结果。

我打算把话题转移到电脑程序上来,所以,我提到,电脑之所以会干活都要归功于程序,是程序命令电脑去做某些事情。

我又把话题转到了YouTube,因为我知道孩子们很了解它。我画出了网站的界面,他们都很熟悉这个界面。

我问,YouTube上有多少部电影。一些小朋友开始迷糊了,但也有人喊道:“几百万个”或“无限个”。所以,我又问,是否有可能把所有的电影都放在一张光盘上呢。然后我画了很多碟片,并问道:“如果你告诉YouTube要观看小猪佩奇的某一集,它就能找到对应的光盘,这是如何做到的呢?”他们都沉默了,所以我画了一个数据库,并迅速描述了它的角色。

我要给他们灌输这样一个观念:他们看到的只是一个冰山一角,而程序员负责了更多的东西。我向他们解释了我们是如何处理各种各样的事情的:

  • 保护电影免受坏人的破坏(“*如果有人修改了数据库会怎么样,你会看到足球赛,而不是小猪佩奇?” 孩子们用沉默回答了我)
  • 制作备份(“*如果光盘坏了会怎么样?那么你永远都无法再看到小猪佩奇里猪爸爸挂照片那一集了?”孩子们再一次沉默了)
  • 让YouTube可以在不同的设备上播放(智能手机,平板电脑,笔记本电脑)
  • 展示广告
  • 计算观看每部电影的人数
  • 显示类似电影的缩略图等

这时是告诉他们编程是团队合作的最好的时机了。程序很庞大,需要具备不同领域的知识,需要很多人参与进来创造。在这一点上,我更新了第一幅画,添加了另一个程序员,让两个人坐在一起。

下一个主题是计算机的工作原理,当然是非常粗略的。我是这样告诉他们的:

  • 鼠标和键盘是人们用来给计算机下命令的
  • 电脑屏幕用于显示电脑的响应
  • 每台电脑里面都有一个处理器(你可以把它看成是电脑的大脑)
  • 还有一些磁盘供电脑存储信息

接着,我把重点放在处理器上,告诉他们它使用一种只有0和1组成的非常有趣的语言。然后,当我假装用一种只有A和B的语言说话时,大家都笑得很开心。接着,我解释说,程序员使用编程语言是为了能够更容易地与处理器进行交流。

随后,我给他们展示了最最简单的程序语句的例子(使用一些伪代码)。我想给他们展示更多的东西,但我觉得他们的注意力开始不集中了,所以我甚至没有讲for循环。

为了结束我的“演讲”,我再一次画了一些符号来总结我们讨论的东西。我一边画一边问问题“这是什么?”和“你还记得有关这个的一些什么吗?”诸如此类。一切进行得很顺利。

附加内容,要做的和不要做的事情,提示和技巧:

这比我预期的要更加好玩!我很认真地对待这个任务,做了充足的准备,并取得了一定的成果。孩子们对此很感兴趣,我想他们也许会记住一些内容的。

画图是正确的方法。它抓住了孩子们的注意力,通过对图上的东西进行指点,让我可以提醒他们几分钟前大家讨论的内容。

准备好一遍又一遍地重复内容。当我问他们知道有关语言的哪些东西时,一个人举手说:“我知道一点点英文”,然后他的伙伴也是这样。然后还有一个,接着是另一个,其余的人一个接一个地这么说。所有人关于英语的说法都完全一样。当你提到他们喜欢的主题时,例如动画片,场面会变得更加混乱。

有些孩子不管你做什么都会觉得无聊。他们集中注意力的时间太短了。

认真书写。例如,当你写的1看起来像7的时候,他们会抗议。

在演讲中,我明白没有任何必要去纠正他们。例如,我认为他们明白YouTube是将动画片放在DVD上的,而当你要求播放一部动画片时,电脑会选择正确的光盘并播放它。我认为这是非常好的。我希望他们了解这个概念(搜索是如何实现的,数据库的作用是什么),而不是技术细节。我觉得这是正确的教育方法。

我花了25分钟的时间来做这次演讲。已经够了,没必要再长了。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

痛入爽出 HTTP/2:入门简述 视频 参考 文章

发表于 2017-11-16

导语

之前在B站做过一段时间的编程视频,但是最近半年一直没有稳定产出。因为我发现视频涉及到的内容越来越进阶了,所以仅仅靠视频,观众的吸收程度不会高。现在我觉得要将文字教程和视频教程结合起来。我会在掘金放教程的文字部分,往B站和油腻的管子里塞视频部分。

警告 Warning!

  • 适合进阶级的开发者和天生的极客
  • 可能会引起编程经验不足或者没有 Web 经验的观众心理不适、呕吐感、等不良反应。
  • 在继续阅读之前,请咨询您的医生。不能放弃治疗!
  • 18岁以下请回避
  • 本文非常适合妹子入门编程!

教程大纲

  1. HTTP/2 简述
  2. 实现一个 HTTP/2 Python 库(你没有看错,我们要实现一个 Library)
  3. 用这个库实现一个支持 HTTP/2 的 Web 框架(一个字,就是干!)
  4. Production Ready 的 HTTP/2 开发工具

本教程之所以叫《痛入爽出》,是因为我们从实现开始讲起,再讲应用,难度由难到易。

但是教程走完了你会很爽。(或者我会很爽?)


我感觉我好脏

HTTP/2 简述

HTTP/2 简称 h2,是 HTTP 协议的正式二代。2015年5月14日,h2 被正式通过.到今年为止,h2 已被广泛使用。比如:Google,Facebook,Amazon,阿里等科技巨头。各大 CDN 也几乎都使用了 h2。这里我就不一一举例了,你打开浏览器的开发者模式一看便知。

那么,在 2017 的尾声,作为一名 Web 开发者,我认为 h2 是 2017 年最值得学习的技术。比人工智能、ES7都更加有直接的帮助。

h2 主要解决的是效率问题。这一点我会在视频中讲到。

我关注 4 点:

  1. Multiplexing (Stream)
  2. Binary Encoded
  3. Header Compression
  4. Server Push

小二,上代码!

下一期我们直接进入代码。你需要准备 Python3.5+。别问我为什么。都 2017 了,Python3 发布那年出生的孩子都谈念爱了。

用 Python 做演示是因为 Python 语法简单,即使不会 Python 的同学也可以看懂。完全不是因为我个人擅长 Python 什么的。(无形装逼,最为致命)

如果你等不及了,可以直接看 github.com/CreatCodeBu…


视频

B站

1小时写个后端框架 2:HTTP/2 的简述

油腻的管子

watch?v=NB9ExwvvSVk

参考

http2.github.io
RFC 7540

《Learning HTTP/2》 by Stephen Ludin & Javier Garza
这本书讲到了很多基本的知识点,是绝佳的入门书籍。

文章

下文

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【翻译】ElasticSearch官方文档-执行查询和过滤操

发表于 2017-11-16

本文翻译自:www.elastic.co/guide/en/el…

本文是Elasticsearch的入门文档,将会介绍ElasticSearch中的查询操作和过滤操作。

执行查询

现在我们已经看到了一些基本的搜索参数,让我们再深入查询DSL。我们先来看看返回的文档字段。默认情况下,完整的JSON文档作为所有搜索的一部分返回。这被称为源(搜索匹配中的_source字段)。如果我们不希望整个源文档返回,我们有能力只需要返回源内的几个字段。

此示例显示如何从搜索中返回两个字段account_number和balance(在_source之内):

1
2
3
4
5
复制代码GET /bank/_search
{
"query": { "match_all": {} },
"_source": ["account_number", "balance"]
}

请注意,上面的例子简单地减少了_source字段。它仍然会返回一个名为_source的字段,但在其中只包含字段account_number和balance。

如果你会一些SQL语句,则容易看出上述内容在概念上与SQL SELECT FROM字段列表有些相似。

现在我们来看看查询部分。以前,我们已经看过如何使用match_all查询来匹配所有文档。现在我们来介绍一个叫做匹配查询(match query)的新查询,这个查询可以被看作是基本的搜索查询(即针对特定字段或者字段集合进行的搜索)。

1
2
3
4
复制代码GET /bank/_search
{
"query": { "match": { "account_number": 20 } }
}

此示例返回地址中包含术语“mill”的所有帐户:

1
2
3
4
复制代码GET /bank/_search
{
"query": { "match": { "address": "mill" } }
}

此示例返回地址中包含术语“mill”或“lane”的所有帐户:

1
2
3
4
复制代码GET /bank/_search
{
"query": { "match": { "address": "mill lane" } }
}

这个例子是match(match_phrase)的一个变体,返回在地址中包含短语“mill lane”的所有账号:

1
2
3
4
复制代码GET /bank/_search
{
"query": { "match_phrase": { "address": "mill lane" } }
}

现在我们来介绍一下bool query。 bool查询允许我们使用布尔逻辑将更小的查询组合成更大的查询。

此示例组成两个match查询,并返回地址中包含“mill”和“lane”的所有帐户:

1
2
3
4
5
6
7
8
9
10
11
复制代码GET /bank/_search
{
"query": {
"bool": {
"must": [
{ "match": { "address": "mill" } },
{ "match": { "address": "lane" } }
]
}
}
}

在上面的例子中,bool must子句指定了一个文档被认为是匹配的所有查询。

相反,这个例子组成两个match查询,并返回地址中包含“mill”或“lane”的所有帐户:

1
2
3
4
5
6
7
8
9
10
11
复制代码GET /bank/_search
{
"query": {
"bool": {
"should": [
{ "match": { "address": "mill" } },
{ "match": { "address": "lane" } }
]
}
}
}

在上面的例子中,bool should子句指定了一个查询列表,其中任何一个查询都必须是true,才能被视为匹配的文档。

本示例组成两个match查询,并返回地址中既不包含“mill”也不包含“lane”的所有帐户:

1
2
3
4
5
6
7
8
9
10
11
复制代码GET /bank/_search
{
"query": {
"bool": {
"must_not": [
{ "match": { "address": "mill" } },
{ "match": { "address": "lane" } }
]
}
}
}

在上面的例子中,bool must_not子句指定了一个查询列表,其中任何一个查询都不能被匹配。

我们可以在一个bool查询中同时结合must,should和must_not子句。此外,我们可以在任何这些bool子句中编写bool查询来模拟任何复杂的多级布尔逻辑。

这个例子返回所有40岁但ID不为(aho)的人的账号:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码GET /bank/_search
{
"query": {
"bool": {
"must": [
{ "match": { "age": "40" } }
],
"must_not": [
{ "match": { "state": "ID" } }
]
}
}
}

执行过滤

在上一节中,我们跳过了一个称为文档分数(搜索结果中的_score字段)的细节。分数是一个数字值,它是文档与我们指定的搜索查询匹配度的相对度量。分数越高,文档越相关,分数越低,文档就越不相关。

但查询并不总是需要产生分数,特别是当它们仅用于“过滤”文档集时。 Elasticsearch检测这些情况并自动优化查询执行,以便不计算无用分数。

我们在前一节介绍的bool查询也支持过滤子句,它允许使用查询来限制将被其他子句匹配的文档,而不改变计算得分的方式。作为一个例子,我们来介绍一下范围查询(range query),它允许我们通过一系列值来过滤文档。这通常用于数字或日期过滤。

本示例使用bool查询返回余额在20000和30000之间的所有帐户。换句话说,我们要查找大于或等于20000且小于等于30000的帐户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码GET /bank/_search
{
"query": {
"bool": {
"must": { "match_all": {} },
"filter": {
"range": {
"balance": {
"gte": 20000,
"lte": 30000
}
}
}
}
}
}

解析上述内容,bool查询包含一个match_all查询(查询部分)和一个range查询(过滤器部分)。我们可以将其他查询替换为查询和过滤器部分。在上述情况下,范围查询是非常有意义的,因为落入该范围的文档全部匹配“相等”,即没有文档比另一个文档更加匹配。

除了match_all,match,bool和range查询之外,还有很多其他查询类型可用,我们不在这里介绍。由于我们已经对其工作原理有了一个基本的了解,所以将这些知识应用于其他查询类型的学习和实验并不难。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Nginx 源代码笔记 - 知识点

发表于 2017-11-16

Nginx 源代码笔记 - Prerequisite

本文对 Nginx 源代码中 HTTP 协议处理部分的零碎知识点进行汇总,以便日后查阅。

路很长,高人很多,要坚持,要抱着持续学习的态度。

本文目录

  • 调试工具
  • 只言片语
  • 请求阶段
    • POST_READ
    • SERVER_REWRITE
    • FIND_CONFIG
    • REWRITE
    • POST_REWRITE
    • PREACCESS
    • ACCESS
    • POST_ACCESS
    • TRY_FILES (PRECONTENT)
    • CONTENT
    • LOG
  • 模块顺序
  • 数据字段
    • ngx_connection_t::data
    • ngx_connection_t::buffered
    • ngx_http_request_t::postponed
    • ngx_http_request_t::posted_requests
    • ngx_http_request_t::valid_location
    • ngx_http_request_t::uri_changed
    • ngx_http_request_t::count
    • ngx_http_requeste_t::out
    • ngx_event_t::active
    • ngx_event_t::ready
  • 函数调用
    • ngx_http_close_connection
    • ngx_http_finalize_connection
    • ngx_http_termniate_request
    • ngx_http_free_request
    • ngx_http_close_request
    • ngx_http_finalize_request
    • ngx_event_t::handler
    • ngx_request_t::read_event_handler 和 write_event_handler
    • ngx_http_read_client_request_body
    • ngx_http_send_header 和 ngx_http_output_filter
    • ngx_handle_read_event 和 ngx_handle_write_event
  • 请求变量
    • $request_body

调试工具

  • 测试用例 - Test::Nginx (perl)
  • 检查内存泄漏 - valgrind (worker_processes 1; daemon off; master_process off;), no-pool-patch
1
2
3
4
复制代码valgrind --log-file=valgrid.log --leak-check=full --leak-resolution=high \
--track-origins=yes --show-reachable=yes

root@laptop:/usr/local/src/nginx-1.4.3# patch -p1 <../no-pool-nginx/nginx-1.4.3-no_pool.patch
  • valgrind stress file - 自动化 valgrind 检测。
  • 单步调试 - gdb, cgdb
  • 调用栈 - pstack, strace, lstrace, systemtap, -finstrument-functions, valgrind –tool=callgrind
  • 压力测试 - ab, httperf, wrk
  • 真实流量压测 - tcpcopy, goreplay, ngx-http-mirror-module

只言片语

  • 指令解析回调函数,可以返回字符串描述错误信息(”is duplicate”)。此错误信息会由 Nginx 拼接成完整信息 后,在 终端打印出来:
1
复制代码nginx: [emerg] "spent_prefix" directive is duplicate in /usr/local/nginx/conf/nginx.conf:101
  • 配置结构创建函数,返回 NULL 或者实际结构体。返回 NULL 时,Nginx 不会打印任何出错信息。所以 ,最好 使用 ngx_conf_log_error 函数手动打印出错信息。
  • Nginx 提供的类型转换符 %V 对应的类型是 ngx_str_t *,而不是 ngx_str_t 。
  • ngx_cpystrn(u_char *dst, u_char *src, size_t n) 函数的第三个参数 n 指示 dst 对应内存块 的最大长度 (including the terminating null byte,参考函数 snprintf 函数定义)。
  • ngx_str_t 类型只有两个成员,其中 ngx_str_t::data 指针指向字符串起始地址, ngx_str_t::len 表示字符串的有效长度。ngx_str_t::data 指向的可能并不是普通的字符串,未必会 以 \0 结尾,所以使用 ngx_str_t 时必须根据长度 ngx_str_t::len 的值确定字符串长度。
  • ngx_buf_t 和 ngx_chain_t
+ ngx\_buf\_t 可以表示的对象有:内存块、磁盘文件和特殊标志;
+ ngx\_buf\_t 通常和 ngx\_chain\_t 配对使用:ngx\_chain\_t 将 ngx\_buf\_t 包装成单链表结 点 然后 Nginx 使用由 ngx\_chain\_t 组成单链表来表示逻辑上连续的数据;
  • 分清以下四个回调函数的功能,它们是理解请求处理流程的关键:
1
2
3
4
复制代码c->read->handler = ngx_http_request_handler;
c->write->handler = ngx_http_request_handler;
r->read_event_handler = ngx_http_block_reading;
r->write_event_handler = ngx_http_block_reading;

请求阶段

Nginx 将 HTTP 请求处理流程分为几个阶段(PHASE)进行,每个阶段对应的 phase checker 按注册顺序逐个 调用 各模块提供的回调函数,也就是 phase handler 。针对所有请求的 phase checker 及 phase
handler
执行顺序在 Nginx 进程启动时,在 ngx_http_init_phase_handlers 函数中定义。

下面列出 Nginx 定义的阶段,以及该阶段中 phase checker 可以处理的 phase handler 返回值。

POST_READ

该阶段的 phase checker 函数是 ngx_http_core_generic_phase 。各模块注册到该阶段的 phase
handler
可以使用如下返回值控制请求处理流程:

错误码 处理方式
NGX_OK 将请求转入下一处理阶段(跳过本阶段还未调用的 phase handler)
NGX_DECLINED 为请求调用下一个 phase handler(可能是本阶段的,也可能是下一个阶段的)
NGX_AGAIN/NGX_DONE 需对该请求再次调用当前 phase handler
其它 错误码是 NGX_ERROR 或者 NGX_HTTP_* 等等时,提前结束当前请求

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_realip_module - Change the client address to the one sent in the specificed header field.

SERVER_REWRITE

该阶段的 phase checker 函数为 ngx_http_core_rewrite_phase 。各模块注册到该阶段的 phase
handler
可以使用如下返回值控制请求处理流程:

错误码 处理方式
NGX_DECLINED 为请求调用下一个 phase handler(可能是本阶段的,也可能是下一个阶段的)
NGX_DONE 需对该请求再次调用当前 phase handler
其它 错误码是 NGX_OK, NGX_AGAIN, NGX_ERROR 或者 NGX_HTTP_xx 等等 时,提前结束当前请求

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_rewrite_module - 执行定义于 server {} 的 rewrite, set, if 等指令。

FIND_CONFIG

该阶段的 phase checker 函数为 ngx_http_core_find_config_phase 。该阶段属于 Nginx 内部流程, 不允 许模块注册 phase handler 。

ngx_http_core_find_config_phase 函数根据请求 uri 匹配合适的 location {} 配置块。

REWRITE

该阶段的 phase checker 函数为 ngx_http_core_rewrite_phase 。各模块注册到该阶段的 phase
handler
可以使用的返回值和 Nginx 对这些返回值的处理方式和 SERVER_REWRITE 阶段一致。

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_rewrite_module - 执行定义于 location {} 的 rewrite, set, if 等指令。

POST_REWRITE

该阶段的 phase checker 函数为 ngx_http_core_post_rewrite_phase 。该阶段属于 Nginx 内部流程, 不 允许模块注册 phase handler 。

该阶段检查 REWRITE 阶段的执行结果并执行不同逻辑:如果请求 uri 被上个阶段修改过的话 ( r->uri_changed = 1),将此请求转到 FIND_CONFIG 阶段,重新进行 location {} 查找和匹配;如 果请求 uri 未被上个阶段修改的话,继续为请求调用 PREACCESS 阶段的 phase handler 。

PREACCESS

该阶段的 phase checker 函数为 ngx_http_core_generic_phase 。各模块注册到该阶段的 phase
handler
可以使用的返回值和 Nginx 对这些返回值的处理方式和 POST_READ 阶段一致。

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_limit_conn_module - limits the number of connections per the defined key, in particular, the number of of connections from a single IP address.
  • ngx_http_limit_req_module - limits the request processing rate per the defined key, in particular, the processing rate of requests coming from a single IP address.
  • ngx_http_degradation_module - returns 204 or 444 code for some locations on low memory condition.
  • ngx_http_realip_module - Change the client address to the one sent in the specificed header field.

ACCESS

该阶段的 phase checker 函数为 ngx_http_core_access_phase 。各模块注册到该阶段的 phase
handler
可以使用如下返回值控制请求处理流程:

错误码 处理方式
r != r->main 当前请求是子请求,直接将其转入下一处理阶段
NGX_DECLINED 为请求调用下一个 phase handler(可能是本阶段的,也可能是下一个阶段的)
NGX_AGAIN/NGX_DONE 需对该请求再次调用当前 phase handler
NGX_OK * SATISFY_ALL: 为请求调用下一个 phase handler(可能是本阶段的,也可能是 下一个阶段的) * SATISFY_ANY: r->access_code 赋 0 值,并将请求转到下一处理阶段
NGX_HTTP_FORBIDDEN * SATISFY_ALL: 提前结束当前请求 * SATISFY_ANY: 将该错误码赋值给 r->access_code 变量,并为请求调用下一个 phase handler。
NGX_HTTP_UNAUTHORIZED 处理逻辑和 NGX_HTTP_FORBIDDEN 相同
其它 错误码是 NGX_ERROR 或者 NGX_HTTP_* 等等时,提前结束当前请求

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_auth_basic_module - limits access to resources by validating the user name and password using the “HTTP Basic Authentication” protocol.
  • ngx_http_access_module - limits access to certain client addresses.

POST_ACCESS

该阶段的 phase checker 函数为 ngx_http_core_post_access_phase 。该阶段属于 Nginx 内部流程, 不允 许模块注册 phase handler 。

该阶段检查 ACCESS 阶段的处理结果并执行不同逻辑:如果 r->access_code == NGX_HTTP_FORBIDDEN 则 提 前结束该请求处理(使用 NGX_HTTP_FORBIDDEN 作为 HTTP 响应码);如果 r->access_code 为其它 非 0 值,则提前结束该请求处理;如果 r->access_code == 0 值,为请求调用下一下
phase
handler
。

TRY_FILES (PRECONTENT)

NOTES:从 Nginx 1.13.4 开始,此阶段更名为 PRECONTENT ,并使用 ngx_http_core_generic_phase 作 为 phase checker 。try_files 指令功能由模块 ngx_http_try_files_module 提供。

1
2
3
4
5
6
7
8
9
10
复制代码commit 129b06dc5dfab7b4513a4f274b3778cd9b8a6a22
Author: Roman Arutyunyan <arut@nginx.com>
Date: Thu Jul 20 15:51:11 2017 +0300

Precontent phase.

The phase is added instead of the try_files phase. Unlike the old phase, the
new one supports registering multiple handlers. The try_files implementation is
moved to a separate ngx_http_try_files_module, which now registers a precontent
phase handler.

而在 Nginx 1.13.4 之前的版本,该阶段的 phase checker 函数为 ngx_http_core_try_files_phase 。 该阶段属于 Nginx 内部流程,不允许模块注册 phase handler 。

如果请求使用的 location {} 块未配置 try_files 指令,将该请求转入下一个 phase handler 。

如果请求使用的 location {} 中使用了 try_files 指令,那么继续检查该指令的参数:如果参数(最后 一个 参数除外)对应的磁盘静态文件存在,将静态文件内容返回给客户端;如果参数对应的磁盘静态文件都不存 在,使用函数 ngx_http_internal_redirect 将该请求重定向到 try_files 指令最后一个参数指定的 location 后, 重新处理该请求。

CONTENT

该阶段的 phase checker 函数为 ngx_http_core_content_phase 。各模块注册到该阶段的 phase
handler
可以使用如下返回值控制请求处理流程:

错误码 处理方式
r->content_handler 如果 location {} 配置有 content handler ,使用它处理请求,并忽略其它 phase handler
NGX_DECLINED 为请求调用下一个 phase handler。如果所有 phase handler 都已经被调用过后,则结束该 请求处理过程(使用响应码:NGX_HTTP_FORBIDDEN (请求 uri 以 ‘/‘ 结束) 或者 NGX_HTTP_NOT_FOUND (请求 uri 不以 ‘/‘ 结束))
其它 结束该请求处理流程(错误码作为 ngx_http_finalize_request 函数参数使用)

从上面的分析可以看到,请求的响应数据可以由 content handler 函数或者 phase handler 函数提供。 content handler 优先级比 phase handler 高,并且具有排他性。phase handler 可以看作是 CONTENT 阶段为请求提供的通用处理逻辑,而
content handler 是某个 location {} 块为请求提供 的 特殊处理逻辑。

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_random_index_module - processes requests ending with the slash character (‘/‘) and picks a random file in a directory to serve as an index file.
  • ngx_http_index_module - processes requests ending with the slash character (‘/‘).
  • ngx_http_autoindex_module - processes requests ending with the slash character (‘/‘) and produces a directory listings.
  • ngx_http_dav_module - intended for file management automation via the WebDAV protocol. The module processes HTTP and WebDAV methods PUT, DELETE, MKCOL, COPY, and MOVE.
  • ngx_http_gzip_static_module - allows sending precompressed files with the “.gz” filename extension instead of regular files.
  • ngx_http_static_module - 静态文件响应模块

Nginx 自带模块中提供了 content handler 的有:

  • ngx_http_fastcgi_module
  • ngx_http_scgi_module
  • ngx_http_memcached_module
  • ngx_http_proxy_module
  • ngx_http_stub_status_module
  • ngx_http_flv_module
  • ngx_http_mp4_module
  • ngx_http_empty_gif_module
  • ngx_http_perl_module
  • ngx_http_uwsgi_module

LOG

该阶段比较特殊,它并没有对应 phase checker ,该阶段的 phase handler 在请求处理结束时,由 ngx_http_log_request 函数直接调用。

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_log_module

模块顺序

标准配置下,filter 模块的调用顺序(和模块初始化顺序相反)如下:

调用顺序 模块名 提供的 filter
1 ngx_http_not_modified_filter_module header
2 ngx_http_range_body_filter_module body
3 ngx_http_copy_filter_module body
4 ngx_http_headers_filter_module header
third party filter goes here
6 ngx_http_userid_filter_module header
7 ngx_http_ssi_filter_module header and body
8 ngx_http_charset_filter_module header and body
9 ngx_http_postpone_filter_module body
10 ngx_http_gzip_filter_module header and body
11 ngx_http_range_header_filter_module header
12 ngx_http_chunked_filter_module header and body
13 ngx_http_header_filter_module header
14 ngx_http_write_filter_module body

数据字段

ngx_connection_t::data

这个字段有多种用途,它得值随着 ngx_connection_t 的状态变化而代表不同含义:

  • 当该连接被回收放入 Nginx 的空闲连接池(单链表)时,该指针字段充当用于单链表节点的 next 指针:
  • 客户端和 Nginx 建立 TCP 连接之后,Nginx 开始读取并处理该 TCP 连接上的 HTTP 请求数据之前,该指针字 段指向 ngx_http_connection_t 类型的变量。该变量中保存该 HTTP 请求对应的虚拟主机、虚拟主机的配 置结构体、 SSL 主机名称等 「HTTP 连接」相关的信息。
  • 在随后的 HTTP 请求处理过程(请求数据接收,响应数据生成等)中,该字段始终指向该 TCP 连接上的当前活 跃请求 (由于HTTP Pipeline 技术、Nginx 子请求机制等等,当前 TCP 连接上可能会对应多个请求实例)。该 请求的响应数据可以立即发送给客户端。

ngx_connection_t::buffered

TODO: More explanation needed.

ngx_http_request_t::postponed

当该指针字段值不是 NULL 时,指向一个节点类型是 ngx_http_postponed_request_t 的单链表。这个单 链表的节点中包含该请求创建的子请求( ngx_http_postponed_request_t::request ),或者该请求产生的 暂时还不能发送的响应数据( ngx_http_postponed_request_t::out )。

当该指针字段值是 NULL 时,该请求无待处理子请求。

ngx_http_request_t::posted_requests

该字段指向一个节点类型是 ngx_http_posted_request_t 的单链表。这个单链表中保存着该请求(主请求) 上的「就绪」请求,这些请求可以随时被 Nginx 调度处理。

Nginx 只在 “主请求” 上维护着「就绪」请求链表,「就緒」请求可以是主请求、子请求、子子请求等等。函数 ngx_http_post_request 用于向该链表中添加「就緒」请求 ,函数 ngx_http_run_posted_requests 用 于调度「就緒」请求。

ngx_http_request_t::valid_location

TODO: More explanation needed.

当 rewrite 指令使用 break 修改了 r->uri 后,此标志位变量被置为 0;如果此请求被 ngx_http_internal_redirect 函数在 Nginx 内部重定向的话,这个标志位被重置为 1。

ngx_http_request_t::uri_changed

TODO: More explanation needed.

当 rewrite 指令使用了非 break 修改了 r->uri 后,此标志位变量被置为 1;

ngx_http_request_t::count

请求引用计数。关于它的作用,在《深入理解 Nginx:模块开发与架构解析》第 1 版的 11.8 节有过介绍:

1
2
3
4
5
6
7
8
复制代码> 在 HTTP 模块中每进行一类新的(异步)操作,包括为一个请求添加新的事件,或者把一些已经
> 由定时器、epoll 中移除事件重新加放其中,都需要把这个请求的引用计数加 1.这是因为需要让
> HTTP 框架知道,HTTP 模块对于该请求有独立的异步处理机制,将由该 HTTP 模块决定这个操作
> 什么时候结束,防止在这个操作还未结束时 HTTP 框架却把这个请求销毁了(如其它 HTTP 模块通
> 过调用 ngx_http_finalize_request 方法要求 HTTP 框架结束请求),异致请求出现不可知的
> 严重错误。这就要求每个操作在 “认为” 自身的动作结束时,都得最终调用
> ngx_http_close_request 方法,该方法会自动检查引用计数,当引用计数为 0 时才真正地销销
> 毁请求。

总结:要在当前请求执行流程中新增异步执行分支时,就需要为主请求增加一次引用计数。例如,需要通过「可读 事件」读取请求包体时;创建子请求时;

ngx_http_requeste_t::out

使用 ngx_chain_t 串起来的 ngx_buf_t 链表,用于保存待发送响应数据。

另外,Nginx 模块通过调用函数 ngx_http_output_filter 经过 output filter chain 发送响应数据。如果 响应数据已经被 output filter 处理过,但是因网络原因,没有完全发出,可以在连接的「可写事件」发生后, 通过 ngx_http_output_filter(r, NULL) 的调用形式继续发送。

ngx_event_t::active

Nginx 使用的事件模块是否正在管理该事件结构体。

ngx_event_t::ready

该事件结构体是否就绪(有未处理事件)。

函数调用

ngx_http_close_connection

该函数用于关闭连接。

函数签名:

1
2
复制代码void
ngx_http_close_connection(ngx_connect_t *c);

主要功能:

函数 ngx_close_connection 从事件模块中将该连接注册的事件(网络事件、超时事件)全部清理掉后, 关闭 底层 socket 描述符。同时,调用 ngx_free_connection 函数将连接的结构体存入单链表 ngx_cycle->free_connections 以便下次使用。

该函数对连接对应的 SSL 相关结构进行清理,关闭连接对应的内存池,然后调用 ngx_close_connection 函数 完成其它清理工作。

ngx_http_finalize_connection

该函数在请求正常处理完成后,调用 ngx_http_close_request 关闭请求(请求引用计数减一),并判断需要 关闭连 接或是使其保持连通(keepalive 连接)。

函数签名:

1
2
复制代码static void
ngx_http_finalize_connection(ngx_http_request_t *r);

ngx_http_termniate_request

强制清理并销毁请求。

函数签名:

1
2
复制代码static void
ngx_http_terminate_request(ngx_http_request_t *r, ngx_int_t rc);

主要功能:
该函数调用为在请求上注册的清理函数(cleanup handler),并强制将引用计数置 1,然后使用 nxx_http_close_request 函数销毁该请求。

ngx_http_free_request

该函数对请求进行清理和销毁。

函数签名:

1
2
复制代码void
ngx_http_free_request(ngx_http_request_t *r, ngx_int_t rc);

主要功能:
该函数调用为在请求上注册的清理函数(cleanup handler)、为该请求调用 LOG 阶段的 phase
handler
, 然后销毁请求内存池。

ngx_http_close_request

该函数关闭对请求的一次引用。

函数签名:

1
2
复制代码static void
ngx_http_close_request(ngx_http_request_t *r, ngx_int_t rc);

主要功能:
该函数将请求 r 所属的主请求( r->main )的引用计数( r->main->count )减 1。如果主请 求引用计数等于 0,调用 ngx_http_free_request 函数清理并销毁主请求,调用 ngx_http_close_connection 函数清理并销毁请求所用的连接。

ngx_http_finalize_request

该函数根据请求处理结果,决定请求接下来的处理流程。

函数签名:

1
2
复制代码void
ngx_http_finalize_request(ngx_http_request_t *r, ngx_int_t rc);

主要功能:
该函数功能比较复杂,逻辑分支较多。接下来,我们根据参数 rc 值和请求当前的状态,总结一下该函 数的主要分支流程和处理方式(使用源代码版本号:1.13.6):

顺序 分支条件 处理方式
函数开始
1 rc == NGX_DONE 请求当次动作已经处理完成,但是该请求上还有其它进行中的异步动作。调用 ngx_http_finalize_connection 函数将请求引用计数减一。 RETURN
2 rc == DECLINED content handler 无法处理该请求,将其交由 CONTENT 阶段的 phase handler 处理。 RETURN
3 rc 值是 NGX_ERROR , NGX_HTTP_REQUEST_TIMEOUT , NGX_HTTP_CLIENT_CLOSE_REQUEST 错误码之一,或者 c->error == 1 该请求所在的连接上发生了错误,调用 ngx_http_terminate_request 强制清理 销毁该请求。 RETURN
4 rc >= NGX_HTTP_SPECIAL_RESPONSE 或者 rc 值是 NGX_HTTP_CREATED , NGX_HTTP_NO_CONTENT 中的一个 如果 rc 值是 NGX_HTTP_CLOSE (大于 NGX_HTTP_SPECIAL_RESPONSE ),立即调 用 ngx_http_terminate_request 销毁请求;其它情况调用函数 ngx_http_special_response_handler 重新生成此请求的响应数据。这时需要再次 调用函数 ngx_http_finalize_request 处理 ngx_http_special_response_handler 的返回值。RETURN
请求响应发送完毕、请求响应数据未发送完毕、请求是子请求 等等正常业务流程继续向下进行分支处理
5 r != r->main 当前请求是「子请求」时,函数继续以下处理流程:1. 如果该子请求是「后台子请求」,Nginx 认为此时该子请求已经处理完成。 置其 r->done 值为 1,随后调用函数 ngx_http_finalize_connection 将其占 用的引用计数清除。RETURN * 「后台子请求」并不参与响应生成流程,也就不会影响主请求的响应时长。但它 会保持对主请求的引用。从 Nginx 目前实现来看,只能对这类请求调用一次 ngx_http_finalize_request 。 2. 如果该子请求非「后台子请求」,并且响应数据未完全发送( r->buffered == 1 ) 或者该子请求创建的子请求未全部完成( r->postponed != NULL ),使用函数 ngx_http_set_write_handler 为其注册写事件处理函数 ngx_http_writer 待有事件发生时继续处理该请求。RETURN * 如果因为 socket 发送缓存区满,数据未能完全发送,也就是 r->buffered == 1 时,那么缓存区一旦空闲,那么「可写事件」会再次触发该子请求的处理逻辑; * 如果因为该子请求非「连接活跃请求」,也就是 r->postponed != NULL 时,事 件循环并不会重复抛出「可写事件」造成该处逻辑重复执行。 3. 对于己处理完成的普通子请求,如果它是「连接活跃请求」( c->data == r ), 那么将其标记为己完成( r->done = 1 ),并将其父请求( r->parent )设为 「连接活跃请求」。随后调用函数 ngx_http_post_request 将父请求添加到主 请求的 posted_request 链表中,以便由函数 ngx_http_run_posted_requests 调用其写回调函数 write_event_handler ,启动其处理流程。RETURN
6 r == r->main 当前请求是「主请求」,函数继续以下处理流程:1. 如果请求数据未发送完毕( r->buffered
函数结束

ngx_event_t::handler

事件循环直接调用的事件回调函数。

函数签名:

1
复制代码typedef void (*ngx_event_handler_pt)(ngx_event_t *ev);

针对 Nginx HTTP 模块来说,Nginx 接入客户端连接后,为此连接分配 ngx_connection_t 结构体。该结构体 含有 对应「可读事件」和「可写事件」的两个 ngx_event_t 类型成员 read 和 write ,这两个成 员的回调函数 会在事件发生时被调用。下面我们简要列出随着请求状态改变,这两个回调函数的值变化过程:

  1. Nginx 调用函数 ngx_event_accept 接入连接,并为其分配 ngx_connection_t 结构:
1
2
3
4
5
6
7
8
9
复制代码void
ngx_event_accept(ngx_event_t *ev)
{
...
c = ngx_get_connection(s, ev->log);
...
ls->handler(c);
...
}
  1. 随后,Nginx 调用函数 ngx_http_init_connection 设置「读事件」回调函数,为接收请求数据做准备:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码void
ngx_http_init_connection(ngx_connection_t *c)
{
...
rev = c->read;
rev->handler = ngx_http_wait_request_handler;
c->write->handler = ngx_http_empty_handler;
...
if (rev->ready) {
...
rev->handler(rev);
return;
}
...
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
...
}
}
  1. 函数 ngx_http_wait_request_handler 从连接读取部分数据,然后创建请求结构体 ngx_request_t 。然后 调整「可读事件」回调函数为 ngx_http_process_request_line ,用于接收和分析请求 status line。这期间 如果已接收数据不足构成完整 status line 的话,Nginx 会为该连接注册「可读事件」,等新 数据到达后,继续向下 执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码void
ngx_http_wait_request_handler(ngx_event_t *rev)
{
...
c->data = ngx_http_create_request(c);
rev->handler = ngx_http_process_request_line;
ngx_http_process_request_line(rev);
}

void
ngx_http_process_request_line(ngx_event_t *rev)
{
...
for (;;) {
if (rc == NGX_AGAIN) {
n = ngx_http_read_request_header(r);
if (n == NGX_AGAIN || n == NGX_ERROR) {
return;
}
...
}
...
rev->handler = ngx_http_process_request_headers;
ngx_http_process_request_headers(rev);
}

}
  1. 请求 status line 处理完毕后,Nginx 调整「读事件」函数 ngx_http_process_request_headers ,用于 接收 和分析请求 headers。这期间如果已接收数据不足构成完整 header 的话,Nginx 会为该连接注册「可读 事件」,等新数据到达后,继续向下执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码void
ngx_http_process_request_headers(ngx_event_t *ev)
{
...
n = ngx_http_read_request_header(r);
if (n == NGX_AGAIN || n == NGX_ERROR) {
return;
}
...
if (rc == NGX_HTTP_PARSE_HEADER_DONE) {
...
ngx_http_process_request(r);
return;
}
...
}
  1. 请求包头数据接收并解析完成后,调用函数 ngx_http_process_request 开启处理请求,生成响应数据的 流程。 此时,Nginx 才为连接设定有效的「可写事件」回调函数。
* 接下来,「可读事件」和「可写事件」均由回调函数 ngx\_http\_request\_handler 处理,它会根据事件 类型 调用请求结构体 ngx\_request\_t 中 read\_event\_handler 和 write\_event\_handler 成员 指向的函数。
* 由于 Nginx 已经获取了所需要的请求数据,所以再有「可读事件」发生时,它使用函数 ngx\_http\_blocking\_reading 暂时屏蔽该事件。
* 在本步之前,并不需要处理连接上的「可写事件」,而此后,我们就需要使用此事件驱动请求完成 PHASE 处 理流程了( ngx\_http\_core\_run\_phases )。
* 同时,每次事件发生时,Nginx 还会调用函数 ngx\_http\_run\_posted\_requests 触发「就绪」 (*posted* , 有 “张贴”、“发布” 的意思,也就是说这类请求已经处于就绪状态,等待被调度处理)请求 的处理流程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
复制代码void
ngx_http_process_request(ngx_event_t *ev)
{
...
c->read->handler = ngx_http_request_handler
c->write->handler = ngx_http_request_handler;
r->read_event_handler = ngx_http_block_reading;

ngx_http_handler(r);
ngx_http_run_posted_requests(c);
}

void
ngx_http_handler(ngx_http_request_t *r)
{
...
r->write_event_handler = ngx_http_core_run_phases;
ngx_http_core_run_phases(r);
}

void
ngx_http_request_handler(ngx_event_t *ev)
{
...
if (ev->write) {
r->write_event_handler(r);
} else {
r->read_event_handler(r);
}

ngx_http_run_posted_requests(c);
}
  1. 从此以后,该请求的「可读事件」和「可写事件」回调函数基本就不再变化了。请求在不同阶段的处理流程由 ngx_request_t::read_event_handler 和 write_event_handler 这两个函数决定。

ngx_request_t::read_event_handler 和 write_event_handler

请求数据接收完毕后,请求处理进入响应生成和响应发送阶段。这两个函数用于在不同处理阶段驱动请求的处理流 程。

1
2
3
4
5
6
7
8
9
10
11
复制代码write_event_handler:
ngx_http_core_run_phases
ngx_http_writer
ngx_http_request_finalizer
ngx_http_terminate_handler

read_event_handler:
ngx_http_block_reading
ngx_http_read_client_request_body_handler
ngx_http_discarded_request_body_handler
ngx_http_upstream_process_header

ngx_http_read_client_request_body

读取请求包体,并将其存到 ngx_http_request_t::request_body 成员中。

该函数会增加主请求引用计数,原因如下:

Nginx 调用函数 ngx_http_read_client_request_body 读取请求包体时,可能需要通过事件处理机制监听连 接上的「可读事件」,使用读回调函数继续读取数据。而请求的其它流程处理完「可写事件」过程中,如果因为 正常流程或者处理异常需要销毁请求时,在没有引用计数的保护下,会造成「可读事件」流程使用的请求失效, 给整个进程带来严重后果。

ngx_http_send_header 和 ngx_http_output_filter

用于开始发送响应包头和包体。 ngx_http_send_header 只能调用一次,而 ngx_http_output_filter 可 以多次调用。但是每次 ngx_http_output_filter 需要发送响应包体的不同部分,上次未发出的响应数据会暂 存到 ngx_http_request_t 中。调用 ngx_http_output_filter(r, NULL) 可再次尝试发送响应数据。

ngx_handle_read_event 和 ngx_handle_write_event

向事件循环注册 「可读事件」和「可写事件」,并确保不会重复注册事件(如果 ngx_event_t 对应的事件正 在被事件循环监听的话,该函数直接返回)。

请求变量

$request_body

It contains the body of the request. 这个变量当且仅当 Nginx 读取了请求包体,并且请求包体没有被写入临 时文件时,才能从它得到包体数据。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Elasticsearch Java API 索引的增删改查

发表于 2017-11-16

Elasticsearch Java API - 客户端连接(TransportClient,PreBuiltXPackTransportClient)(一)

本节介绍以下 CRUD API:

单文档 APIs

  • Index API
  • Get API
  • Delete API
  • Delete By Query API
  • Update API

多文档 APIs

  • Multi Get API
  • Bulk API
  • Using Bulk Processor

Multi Get API Bulk API

注意:所有的单文档的CRUD API,index参数只能接受单一的索引库名称,或者是一个指向单一索引库的alias。

Index API

Index API 允许我们存储一个JSON格式的文档,使数据可以被搜索。文档通过index、type、id唯一确定。我们可以自己提供一个id,或者也使用Index API 为我们自动生成一个。

这里有几种不同的方式来产生JSON格式的文档(document):

  • 手动方式,使用原生的byte[]或者String
  • 使用Map方式,会自动转换成与之等价的JSON
  • 使用第三方库来序列化beans,如Jackson
  • 使用内置的帮助类 XContentFactory.jsonBuilder()

手动方式

数据格式

1
2
3
4
5
复制代码String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码/**  
* 手动生成JSON
*/
@Test
public void CreateJSON(){

String json = "{" +
"\"user\":\"fendo\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"Hell word\"" +
"}";

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());

}

Map方式

Map是key:value数据类型,可以代表json结构.

1
2
3
4
复制代码Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码 /**  
* 使用集合
*/
@Test
public void CreateList(){

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate","2013-01-30");
json.put("message","trying out Elasticsearch");

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());

}

序列化方式

ElasticSearch已经使用了jackson,可以直接使用它把javabean转为json.

1
2
3
4
5
6
7
复制代码import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码/**  
* 使用JACKSON序列化
* @throws Exception
*/
@Test
public void CreateJACKSON() throws Exception{

CsdnBlog csdn=new CsdnBlog();
csdn.setAuthor("fendo");
csdn.setContent("这是JAVA书籍");
csdn.setTag("C");
csdn.setView("100");
csdn.setTitile("编程");
csdn.setDate(new Date().toString());

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(csdn);

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}

XContentBuilder帮助类方式

ElasticSearch提供了一个内置的帮助类XContentBuilder来产生JSON文档

1
2
3
4
5
6
7
8
9
10
复制代码// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码/**  
* 使用ElasticSearch 帮助类
* @throws IOException
*/
@Test
public void CreateXContentBuilder() throws IOException{

XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "ccse")
.field("postDate", new Date())
.field("message", "this is Elasticsearch")
.endObject();

IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
System.out.println("创建成功!");


}

综合实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
复制代码 
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CreateIndex {

private TransportClient client;

@Before
public void getClient() throws Exception{
//设置集群名称
Settings settings = Settings.builder().put("cluster.name", "my-application").build();// 集群名
//创建client
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
}

/**
* 手动生成JSON
*/
@Test
public void CreateJSON(){

String json = "{" +
"\"user\":\"fendo\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"Hell word\"" +
"}";

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());

}


/**
* 使用集合
*/
@Test
public void CreateList(){

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate","2013-01-30");
json.put("message","trying out Elasticsearch");

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());

}

/**
* 使用JACKSON序列化
* @throws Exception
*/
@Test
public void CreateJACKSON() throws Exception{

CsdnBlog csdn=new CsdnBlog();
csdn.setAuthor("fendo");
csdn.setContent("这是JAVA书籍");
csdn.setTag("C");
csdn.setView("100");
csdn.setTitile("编程");
csdn.setDate(new Date().toString());

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(csdn);

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}

/**
* 使用ElasticSearch 帮助类
* @throws IOException
*/
@Test
public void CreateXContentBuilder() throws IOException{

XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "ccse")
.field("postDate", new Date())
.field("message", "this is Elasticsearch")
.endObject();

IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
System.out.println("创建成功!");


}

}

你还可以通过startArray(string)和endArray()方法添加数组。.field()方法可以接受多种对象类型。你可以给它传递数字、日期、甚至其他XContentBuilder对象。

Get API

根据id查看文档:

1
复制代码GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

更多请查看 rest get API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false :

1
2
3
复制代码GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();

Delete API

根据ID删除:

1
复制代码DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

更多请查看 delete API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false :

1
2
3
复制代码GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
1
2
3
复制代码DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();

Delete By Query API

通过查询条件删除

1
2
3
4
5
6
7
复制代码BulkByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //查询条件
.source("persons") //index(索引名)
.get(); //执行

long deleted = response.getDeleted(); //删除文档的数量

如果需要执行的时间比较长,可以使用异步的方式处理,结果在回调里面获取

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //查询
.source("persons") //index(索引名)
.execute(new ActionListener<BulkByScrollResponse>() { //回调监听
@Override
public void onResponse(BulkByScrollResponse response) {
long deleted = response.getDeleted(); //删除文档的数量
}
@Override
public void onFailure(Exception e) {
// Handle the exception
}
});

Update API

有两种方式更新索引:

  • 创建 UpdateRequest,通过client发送;
  • 使用 prepareUpdate() 方法;

使用UpdateRequest

1
2
3
4
5
6
7
8
9
复制代码UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();

使用 prepareUpdate() 方法

这里官方的示例有问题,new Script()参数错误,所以一下代码是我自己写的(2017/11/10)

1
2
3
4
5
6
7
8
9
10
复制代码client.prepareUpdate("ttl", "doc", "1")
.setScript(new Script("ctx._source.gender = \"male\"" ,ScriptService.ScriptType.INLINE, null, null))//脚本可以是本地文件存储的,如果使用文件存储的脚本,需要设置 ScriptService.ScriptType.FILE
.get();

client.prepareUpdate("ttl", "doc", "1")
.setDoc(jsonBuilder() //合并到现有文档
.startObject()
.field("gender", "male")
.endObject())
.get();

Update by script

使用脚本更新文档

1
2
3
复制代码UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
.script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

Update by merging documents

合并文档

1
2
3
4
5
6
复制代码UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();

Upsert

更新插入,如果存在文档就更新,如果不存在就插入

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码IndexRequest indexRequest = new IndexRequest("index", "type", "1")
.source(jsonBuilder()
.startObject()
.field("name", "Joe Smith")
.field("gender", "male")
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.upsert(indexRequest); //如果不存在此文档 ,就增加 `indexRequest`
client.update(updateRequest).get();

如果 index/type/1 存在,类似下面的文档:

1
2
3
4
复制代码{
"name" : "Joe Dalton",
"gender": "male"
}

如果不存在,会插入新的文档:

1
2
3
4
复制代码{
"name" : "Joe Smith",
"gender": "male"
}

Multi Get API

一次获取多个文档

1
2
3
4
5
6
7
8
9
10
11
12
复制代码MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("twitter", "tweet", "1") //一个id的方式
.add("twitter", "tweet", "2", "3", "4") //多个id的方式
.add("another", "type", "foo") //可以从另外一个索引获取
.get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { //迭代返回值
GetResponse response = itemResponse.getResponse();
if (response.isExists()) { //判断是否存在
String json = response.getSourceAsString(); //_source 字段
}
}

更多请浏览REST multi get 文档

Bulk API

Bulk API,批量插入:

1
复制代码import static org.elasticsearch.common.xcontent.XContentFactory.*;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
//处理失败
}

使用 Bulk Processor

BulkProcessor 提供了一个简单的接口,在给定的大小数量上定时批量自动请求

创建BulkProcessor实例

首先创建BulkProcessor实例

1
2
3
4
5
复制代码import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码BulkProcessor bulkProcessor = BulkProcessor.builder(
client, //增加elasticsearch客户端
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... } //调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions

@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... } //调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败

@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... } //调用失败抛 Throwable
})
.setBulkActions(10000) //每次10000请求
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) //拆成5mb一块
.setFlushInterval(TimeValue.timeValueSeconds(5)) //无论请求数量多少,每5秒钟请求一次。
.setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制
.build();

BulkProcessor 默认设置

  • bulkActions 1000
  • bulkSize 5mb
  • 不设置flushInterval
  • concurrentRequests 为 1 ,异步执行
  • backoffPolicy 重试 8次,等待50毫秒

增加requests

然后增加requests到BulkProcessor

1
2
复制代码bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

关闭 Bulk Processor

当所有文档都处理完成,使用awaitClose 或 close 方法关闭BulkProcessor:

1
复制代码bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

或

1
复制代码bulkProcessor.close();

在测试中使用Bulk Processor

如果你在测试种使用Bulk Processor可以执行同步方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();

所有实例 已经上传到Git

更多请浏览 spring-boot-starter-es 开源项目

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot的自动配置、Command-line

发表于 2017-11-16

接下来关于SpringBoot的一系列文章和例子,都来自《Spring Boot Cookbook》这本书,本文的主要内容是start.spring.io的使用、Spring Boot的自动配置以及CommandRunner的角色和应用场景。

start.spring.io的使用

首先带你浏览start.spring.io/,在这个网址中有一些Spring Boot提供的组件,然后会给你展示如何让你的Spring工程变得“Bootiful”,我称之为“Boot化”。

在网站Spring Initializr上填对应的表单,描述Spring Boot项目的主要信息,例如Project Metadata、Dependency等。在Project Dependencies区域,你可以根据应用程序的功能需要选择相应的starter。

Spring Boot starters可以简化Spring项目的库依赖管理,将某一特定功能所需要的依赖库都整合在一起,就形成一个starter,例如:连接数据库、springmvc、spring测试框架等等。简单来说,spring boot使得你的pom文件从此变得很清爽且易于管理。

常用的starter以及用处可以列举如下:

  • spring-boot-starter: 这是核心Spring Boot starter,提供了大部分基础功能,其他starter都依赖于它,因此没有必要显式定义它。
  • spring-boot-starter-actuator:主要提供监控、管理和审查应用程序的功能。
  • spring-boot-starter-jdbc:该starter提供对JDBC操作的支持,包括连接数据库、操作数据库,以及管理数据库连接等等。
  • spring-boot-starter-data-jpa:JPA starter提供使用Java Persistence API(例如Hibernate等)的依赖库。
  • spring-boot-starter-data-*:提供对MongoDB、Data-Rest或者Solr的支持。
  • spring-boot-starter-security:提供所有Spring-security的依赖库。
  • spring-boot-starter-test:这个starter包括了spring-test依赖以及其他测试框架,例如JUnit和Mockito等等。
  • spring-boot-starter-web:该starter包括web应用程序的依赖库。

实践

首先我们要通过start.spring.io创建一个图书目录管理程序,它会记录出版图书的记录,包括作者、审阅人、出版社等等。我们将这个项目命名为BookPub,具体的操作步骤如下:

点击“Switch to the full version.”,展示完整页面;

  • Group设置为:site.javadu;
  • Artifact设置为:bookpub;
  • Name设置为:BookPub;
  • Package Name设置为:site.javadu.bookpub;
  • Packaging代表打包方式,我们选jar;
  • Spring Boot Version选择最新的1.3.0;
  • 创建Maven工程,当然,对Gradle比较熟悉的同学可以选择Gradle工程。
  • 点击“Generate Project”下载工程包。

利用IDEA导入下载的工程,可以看到pom文件的主体如下如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>site.javadu</groupId>
<artifactId>bookpub</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>BookPub</name>
<description>Demo project for Spring Boot</description>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.M6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>

Spring Boot的自动配置

在Spring Boot项目中,xxxApplication.java会作为应用程序的入口,负责程序启动以及一些基础性的工作。@SpringBootApplication是这个注解是该应用程序入口的标志,然后有熟悉的main函数,通过SpringApplication.run(xxxApplication.class, args)来运行Spring Boot应用。打开SpringBootApplication注解可以发现,它是由其他几个类组合而成的:@Configuration(等同于spring中的xml配置文件,使用Java文件做配置可以检查类型安全)、@EnableAutoConfiguration(自动配置,稍后细讲)、@ComponentScan(组件扫描,大家非常熟悉的,可以自动发现和装配一些Bean)。

我们在pom文件里可以看到,com.h2database这个库起作用的范围是runtime,也就是说,当应用程序启动时,如果Spring Boot在classpath下检测到org.h2.Driver的存在,会自动配置H2数据库连接。现在启动应用程序来观察,以验证我们的想法。打开shell,进入项目文件夹,利用mvn spring-boot:run启动应用程序,如下图所示。

2017-11-1520.59.01.png

可以看到类似No active profile set, falling back to default profiles: default、Building JPA container EntityManagerFactory for persistence unit ‘default、HHH000412: Hibernate Core {5.2.12.Final}、HHH000400: Using dialect: org.hibernate.dialect.H2Dialect这些信息;我们在pom文件里选择了jdbc和jpa等starters,Spring
Boot将自动创建JPA容器,并使用Hibernate4.3.11,使用H2Dialect管理H2数据库(内存数据库)。

使用Command-line runners

有时候需要在应用启动后立马执行一些任务,进行上下文的初始化、测试数据的灌入等等。这时候我们可以使用SpringBoot提供的CommandLineRunner接口。我们新建一个StartupRunner类,该类实现CommandLineRunner接口,这个接口只有一个函数:public void run(String... args),而且这个方法会在应用程序启动后首先被调用。

实践

  • 在src/main/java/site/javadu/bookpub/下建立StartRunner类,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码package site.javadu.bookpub;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;

/**
* 作用: 启动器
* User: duqi
* Date: 2017/11/15
* Time: 21:09
*/
public class StarterRunner implements CommandLineRunner {

private static final Logger logger = LoggerFactory.getLogger(StarterRunner.class);

@Override
public void run(String... args) throws Exception {
logger.info("hello word");
}
}
  • 在BookPubApplication类中创建bean对象,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码package site.javadu.bookpub;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class BookPubApplication {

public static void main(String[] args) {
SpringApplication.run(BookPubApplication.class, args);
}

/**
* 在spring容器中定义StarterRunner对应的bean
* @return
*/
@Bean
public StarterRunner starterRunner() {
return new StarterRunner();
}
}

还是用mvn spring-boot:run命令启动程序,可以看到hello的输出,如下图所示。

StartRunnerHelloWord.png

对于那种只需要在应用程序启动时执行一次的任务,非常适合利用Command line runners来完成。Spring Boot应用程序在启动后,会遍历CommandLineRunner接口的实例并运行它们的run方法。也可以利用@Order注解(或者实现Order接口)来规定所有CommandLineRunner实例的运行顺序。

利用command-line runner的这个特性,再配合依赖注入,可以在应用程序启动时后首先引入一些依赖bean,例如data source、rpc服务或者其他模块等等,这些对象的初始化可以放在run方法中。不过,需要注意的是,在run方法中执行初始化动作的时候一旦遇到任何异常,都会使得应用程序停止运行,因此最好利用try/catch语句处理可能遇到的异常。

参考资料 & 源码

  1. 《Spring Boot Cookbook》
  2. bookpub

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

自己实现蝇量级Java MVC框架

发表于 2017-11-16

Tiny Java MVC Framework.

Requirements

  • Java8+

Dependency

Apache Maven

1
2
3
4
5
复制代码`<dependency>`
<groupId>com.nosuchfield</groupId>
<artifactId>geisha</artifactId>
<version>1.0.0-RELEASE</version>
</dependency>

Apache Buildr

1
复制代码`'com.nosuchfield:geisha:jar:1.0.0-RELEASE'`

Apache Ivy

1
复制代码`<dependency org="com.nosuchfield" name="geisha" rev="1.0.0-RELEASE" />`

Groovy Grape

1
2
3
复制代码`@Grapes(` 
@Grab(group='com.nosuchfield', module='geisha', version='1.0.0-RELEASE')
)

Gradle/Grails

1
复制代码`compile 'com.nosuchfield:geisha:1.0.0-RELEASE'`

Scala SBT

1
复制代码`libraryDependencies += "com.nosuchfield" % "geisha" % "1.0.0-RELEASE"`

Leiningen

1
复制代码`[com.nosuchfield/geisha "1.0.0-RELEASE"]`

Example

1
2
3
4
5
6
7
8
9
10
复制代码@Component
@RequestMapping("/person")
public class Hello {

@RequestMapping("/info")
public String hello(@Param("name") String name, @Param("age") String age) {
return "hello " + name + ", your age is " + Integer.valueOf(age);
}

}
1
2
3
4
5
6
7
复制代码public class Application {

public static void main(String[] args) {
Geisha.run();
}

}

Run Application and visit http://127.0.0.1:5200/person/info?name=张三&age=18

Result:

1
复制代码`hello 张三, your age is 18`

License GPL

Project License can be found here.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 设计模式之装饰模式(八)

发表于 2017-11-16

一、前言

本篇主题为结构型模式中的第三个模式–装饰模式。上篇 Java 设计模式主题为《Java 设计模式之桥接模式(七)》。

二、简单介绍

# 2.1 定义

装饰(Decorator)模式又叫做包装模式,其功能是动态地给一个对象添加一些额外的职责。就增加功能来说,装饰模式相比生成子类更为灵活,是继承关系的一个替换方案。

# 2.2 参与角色

  1. Component:定义一个对象接口,可以给这些对象动态地添加职责。
  2. ConcreteComponent:定义一个对象,可以给这个对象添加一些职责。
  3. Decorator:维持一个指向 Component 对象的指针,并定义一个与 Component 接口一致的接口。
  4. ConcreteDecorator:向组件添加职责。

# 2.3 应用场景

  1. 在不影响其他对象的情况下,以动态、透明的方式给单个对象添加职责。
  2. 当不能采用生成子类的方法进行扩充时。

三、实现方式

我们以人的打扮为例。人打扮需要穿衣,穿裤,穿鞋子。代码表示如下:

Person 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
复制代码public class Person {



private String name;



public Person(String name) {

this.name = name;

}



public void putOnClothes() {

System.out.println(this.name + "穿衣服");

}



public void putOnTrousers() {

System.out.println(this.name + "穿裤子");

}



public void putOnShoes() {

System.out.println(this.name + "穿鞋子");

}

}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码public class Client {



public static void main(String[] args) {



Person person = new Person("小白");



person.putOnClothes();

person.putOnTrousers();

person.putOnShoes();

}

}

打印:

1
2
3
4
5
复制代码小白穿衣服

小白穿裤子

小白穿鞋子

上述代码很简单,但是扩展性不好。当我们需要添加打领带、戴手表的行为时,需要修改 Person 类,违背了开放封闭原则。

因此,我们需要将人和打扮的行为抽离出来:

Person 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码public class Person {



private String name;



public Person(String name) {

this.name = name;

}



public String getName() {

return name;

}



}

打扮类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
复制代码public abstract class DressUp {



public abstract void dressup(Person person);

}





class ClothesDressUp extends DressUp {



@Override

public void dressup(Person person) {

System.out.println(person.getName() + "穿衣服");

}



}



class TrousersDressUp extends DressUp {



@Override

public void dressup(Person person) {

System.out.println(person.getName() + "穿裤子");

}



}



class ShoesDressUp extends DressUp {



@Override

public void dressup(Person person) {

System.out.println(person.getName() + "穿鞋子");

}



}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
复制代码public class Client {



public static void main(String[] args) {

Person person = new Person("小白");



DressUp du1 = new ClothesDressUp();

du1.dressup(person);



DressUp du2 = new TrousersDressUp();

du2.dressup(person);



DressUp du3 = new ShoesDressUp();

du3.dressup(person);

}

}

执行结果与上文的一致。现在,当我们添加新的打扮行为时,只需新增 DressUp 的子类即可。

但是,上边的代码没有封装性,每打扮一次都要调用 dressup 方法一次,就感觉人是光着身在公共场合进行打扮穿衣、穿鞋。因此,我们需要一种模式将这些打扮的细节封装起来,就像建造者模式一样。

不过,此次的需求不能使用建造者模式。因为建造者模式封装过程/细节是一个固定的顺序/模式,而当前需求是人的打扮,打扮的行为是多种多样的,如:穿衣穿裤、穿衣打领带、穿鞋戴手表等。

这样就引出了本章的主题–装饰模式:

Person 接口与实现类(Component 和 ConcreteComponent):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码public interface Person {



public void decorate();

}



class Man implements Person {



@Override

public void decorate() {

System.out.println("男人打扮");

}



}

装饰类(Decorator 和 ConcreteDecorator):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
复制代码public class Decorator implements Person {



private Person person;



public Decorator(Person person) {

this.person = person;

}



@Override

public void decorate() {

this.person.decorate();

}



}



class ClothesDecorator extends Decorator {



public ClothesDecorator(Person person) {

super(person);

}



public void decorate() {

super.decorate();

System.out.println("穿衣服");

}

}



class TrousersDecorator extends Decorator {



public TrousersDecorator(Person person) {

super(person);

}



public void decorate() {

super.decorate();

System.out.println("穿裤子");

}

}



class ShoesDecorator extends Decorator {



public ShoesDecorator(Person person) {

super(person);

}



public void decorate() {

super.decorate();

System.out.println("穿鞋子");

}

}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
复制代码public class Client {



public static void main(String[] args) {



Person person = new Man();



Person decorator = new Decorator(person);



System.out.println("======第一种打扮=======");



ClothesDecorator cd = new ClothesDecorator(decorator);



TrousersDecorator td = new TrousersDecorator(cd);



td.decorate();



System.out.println("======第二种打扮=======");



ShoesDecorator sd = new ShoesDecorator(person);



sd.decorate();

}

}

打印:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码======第一种打扮=======

男人打扮

穿衣服

穿裤子

======第二种打扮=======

男人打扮

穿鞋子

总结:装饰模式有效地把类的核心职责和装饰功能区分开来,而且去除了相关类的重复的装饰逻辑。

UML 类图表示如下:

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

微服务MySQL分库分表数据到MongoDB同步方案

发表于 2017-11-15

在现有的架构中做大数据分析,第一步面临的问题就是数据如何从关系型数据库到非关系数据库,网上有很多的解决方案,我们也经过了很多的摸索,经历了三套方案的实践,最终使用了canal。这是我们大数据部门的一个同事张同睿写的文章,分享给大家,如果感兴趣后面可以进一步的介绍。

需求背景

近年来,微服务概念持续火热,网络上针对微服务和单体架构的讨论也是越来越多,面对日益增长的业务需求是,很多公司做技术架构升级时优先选用微服务方式。我所在公司也是选的这个方向来升级技术架构,以支撑更大访问量和更方便的业务扩展。

发现问题

微服务拆分主要分两种方式:拆分业务系统不拆分数据库,拆分业务系统拆分库。如果数据规模小的话大可不必拆分数据库,因为拆分数据看必将面对多维度数据查询,跨进程之间的事务等问题。而我所在公司随着业务发展单数据库实例已经不能满足业务需要,所以选择了拆分业务系统同时拆分数据库的模式,所以也面临着以上的问题。本文主要介绍多维度数据实时查询解决方案。当前系统架构和存储结构如下:

解决思路

  • 要对多数据库数据进行查询,首先就需要将数据库同步到一起以方便查询
  • 为了满足大数据量数据需求,所以优先选择NOSQL数据库做同步库
  • NOSQL数据库基本无法进行关联查询,所以需要将关系数据进行拼接操作,转换成非关系型数据
  • 业务多维度查询需要实时性,所以需要选择NOSQL中实时性相对比较好的数据库:MongoDB

根据以上思路,总结数据整合架构如下图所示:

解决方案

目前网上一些数据同步案例分两种:MQ消息同步和binlog数据读取同步

先说MQ消息同步,该同步方式我所在公司试用过一段时间,发现以下问题:

  • 数据围绕业务进行,对业务关键性数据操作发送MQ消息,对业务系统依赖性比较高
  • 对于数据库中存量数据需要单独处理
  • 对于工具表还需要单独维护同步
  • 每次新增数据表都需要重新添加MQ逻辑

考虑到以上问题,用MQ方式同步数据最优解决办法

使用binlog 数据读取方式目前有一些成熟方案,比如tungsten replicator,但这些同步工具只能实现数据1:1复制,数据复制过程自定义逻辑添加比较麻烦,不支持分库分表数据归集操作。综上所述,最优方案应该是读取后binlog后自行处理后续数据逻辑。目前binlog读取binlog工具中最成熟的方案应该就是alibaba开源的canal了。

canal

canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 。阿里云DRDS、阿里巴巴TDDL 二级索引、小表复制. 都是基于canal做的,应用广泛。canal原理相对比较简单:

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流)

canal介绍: https://github.com/alibaba/canal/wiki

我使用的是canal的HA模式,由zookeeper选举可用实例,每个数据库一个instance,服务端配置如下:

目录:

1
2
3
4
5
6
复制代码`conf`
   database1
       -instance.properties
   database2
       -instance.properties
   canal.properties

instance.properties

1
2
3
4
5
6
7
8
9
10
11
复制代码`canal.instance.mysql.slaveId = 1001`
canal.instance.master.address = X.X.X.X:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .*\\..*
canal.instance.filter.black.regex =

canal.properties

1
2
3
4
5
6
7
8
9
10
11
复制代码`canal.id= 1`
canal.ip=X.X.X.X
canal.port= 11111
canal.zkServers=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181
canal.zookeeper.flush.period = 1000
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024 ...
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

部署数据流如下:

tip:*虽然canal同时支持mixed和row类型的binlog日志,但是获取行数据时如果是mixed类型的日志则获取不到表名,所以本方案暂只支持row格式的binlog*

数据同步

创建canal client应用订阅canal读取的binlog数据

1.开启多instance 订阅,订阅多个instance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
> 复制代码`public void initCanalStart() {`
>    List<String> destinations = canalProperties.getDestination();
>    final List<CanalClient> canalClientList = new ArrayList<>();
>    if (destinations != null && destinations.size() > 0) {
>     `for (String destination : destinations) {`
>            // 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover
>            CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", "");
>            CanalClient client = new CanalClient(destination, connector);
>            canalClientList.add(client);
>            client.start();
>        }
>    }
>    Runtime.getRuntime().addShutdownHook(new Thread() {
>        public void run() {
>            try {
>                logger.info("## stop the canal client");
>                for (CanalClient canalClient : canalClientList) {
>                    canalClient.stop();
>                }
>            } catch (Throwable e) {
>                logger.warn("##something goes wrong when stopping canal:", e);
>            } finally {
>                logger.info("## canal client is down.");
>            }
>        }
>    });
> }
>
>

订阅消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
> 复制代码`private void process() {`
>    int batchSize = 5 * 1024;
>    while (running) {
>        try {
>            MDC.put("destination", destination);
>            connector.connect();
>            connector.subscribe();
>            while (running) {
>                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
>                long batchId = message.getId();
>                int size = message.getEntries().size();
>                if (batchId != -1 && size > 0) {
>                    saveEntry(message.getEntries());
>                }
>             `connector.ack(batchId); // 提交确认`
>                // connector.rollback(batchId); // 处理失败, 回滚数据
>            }
>        } catch (Exception e) {
>            logger.error("process error!", e);
>        } finally {
>            connector.disconnect();
>            MDC.remove("destination");
>        }
>    }}
>
>

根据数据库事件处理消息,过滤消息列表,对数据变动进行处理,用到信息为:

  • insert :schemaName,tableName,beforeColumnsList
  • update :schemaName,tableName,afterColumnsList
  • delete :schemaName,tableName,afterColumnsList
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
> 复制代码`RowChange rowChage = null;`
>    try {
>        rowChage = RowChange.parseFrom(entry.getStoreValue());
>    } catch (Exception e) {
>        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
>    }
>    EventType eventType = rowChage.getEventType();
>    logger.info(row_format,
>            entry.getHeader().getLogfileName(),
>            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
>            entry.getHeader().getTableName(), eventType,
>            String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime));
>    if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
>        logger.info(" sql ----> " + rowChage.getSql());
>        continue;
>    }
>   `DataService dataService = SpringUtil.getBean(DataService.class);`
>    for (RowData rowData : rowChage.getRowDatasList()) {
>        if (eventType == EventType.DELETE) {
>            dataService.delete(rowData.getBeforeColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
>        } else if (eventType == EventType.INSERT) {
>            dataService.insert(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
>        } else if (eventType == EventType.UPDATE) {
>            dataService.update(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
>        } else {
>            logger.info("未知数据变动类型:{}", eventType);
>        }
>    }
> }
>
>

ColumnsList转换成MongoTemplate 可用的数据类:DBObject,顺便做下数据类型转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
> 复制代码`public static DBObject columnToJson(List<CanalEntry.Column> columns) {`
>    DBObject obj = new BasicDBObject();
>    try {
>        for (CanalEntry.Column column : columns) {
>            String mysqlType = column.getMysqlType();
>            //int类型,长度11以下为Integer,以上为long
>            if (mysqlType.startsWith("int")) {
>                int lenBegin = mysqlType.indexOf('(');
>                int lenEnd = mysqlType.indexOf(')');
>                if (lenBegin > 0 && lenEnd > 0) {
>                    int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd));
>                    if (length > 10) {
>                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
>                        continue;
>                    }
>                }
>             `obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue()));`
>            } else if (mysqlType.startsWith("bigint")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
>            } else if (mysqlType.startsWith("decimal")) {
>                int lenBegin = mysqlType.indexOf('(');
>                int lenCenter = mysqlType.indexOf(',');
>                int lenEnd = mysqlType.indexOf(')');
>                if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) {
>                    int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd));
>                    if (length == 0) {
>                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
>                        continue;
>                    }
>                }
>             `obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue()));`
>            } else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue()));
>            } else if (mysqlType.equals("date")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue()));
>            } else if (mysqlType.equals("time")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue()));
>            } else {
>                obj.put(column.getName(), column.getValue());
>            }
>        }
>    } catch (ParseException e) {
>        e.printStackTrace();        }
>   `return obj;`
> }
>
>

tip:*DBObject对象如果同时用于保存原始数据和组合数据或其他数据,使用时应该深度拷贝对象生成副本,然后使用副本*

数据拼接

我们获取了数据库数据后做拼接操作,比如两张用户表:

1
2
复制代码`user_info:{id,user_no,user_name,user_password}`
user_other_info:{id,user_no,idcard,realname}

拼接后mongo数据为:

1
复制代码user:{_id,user_no,userInfo:{id,user_no,user_name,user_password},userOtherInfo:{id,user_no,idcard,realname})

接收到的数据信息很多,如何才能简单的触发数据拼接操作呢?

先看我们能获取的信息:schemaName,tableName,DBObject,Event(insert,update,delete)

将这些信息标识拼接起来看看:/schemaName/tableName/Event(DBObject),没错,就是一个标准的restful链接。只要我们实现一个简单的springMVC 就能自动获取需要的数据信息进行拼接操作。

先实现@Controller,定义名称为Schema,value对应schemaName

1
2
3
4
5
6
7
8
9
> 复制代码`@Target({ElementType.TYPE})`
> @Retention(RetentionPolicy.RUNTIME)
> @Documented
> @Component
> public  @interface Schema {
> String value() default "";
> }
>
>

然后实现@RequestMapping,定义名称为Table,直接使用Canal中的EventType 对应RequestMethod

1
2
3
4
5
6
7
8
9
> 复制代码`@Target({ElementType.METHOD, ElementType.TYPE})`
> @Retention(RetentionPolicy.RUNTIME)
> @Documented
> public  @interface Table {
>    String value() default "";
>    CanalEntry.EventType[] event() default {};
> }
>
>

然后创建springUtil,实现接口ApplicationContextAware,应用启动 加载的时候初始化两个Map:intanceMap,handlerMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
> 复制代码`@Override`
> public void setApplicationContext(ApplicationContext applicationContext) {
>    if (SpringUtil.applicationContext == null) {
>        SpringUtil.applicationContext = applicationContext;
>        //初始化instanceMap数据
>        instanceMap();
>        //初始化handlerMap数据
>        handlerMap();
>    }
> }private void instanceMap() {
>    Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Schema.class);
>    for (Object bean : beans.values()) {
>        Class<?> clazz = bean.getClass();
>        Object instance = applicationContext.getBean(clazz);
>        Schema schema = clazz.getAnnotation(Schema.class);
>        String key = schema.value();
>        instanceMap.put(key, instance);
>        logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName());
>    }
> }private void handlerMap(){  ...}
>
>

调用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
> 复制代码`public static void doEvent(String path, DBObject obj) throws Exception {`
>    String[] pathArray = path.split("/");
>    if (pathArray.length != 4) {
>        logger.info("path 格式不正确:{}", path);
>        return;
>    }
>    Method method = handlerMap.get(path);
>    Object schema = instanceMap.get(pathArray[1]);
>    //查找不到映射Bean和Method不做处理
>    if (method == null || schema == null) {
>        return;    }
>    try {
>        long begin = System.currentTimeMillis();
>        logger.info("integrate data:{},{}", path, obj);
>        method.invoke(schema, new Object[]{obj});
>        logger.info("integrate data consume: {}ms:", System.currentTimeMillis() - begin);
>    } catch (Exception e) {
>        logger.error("调用组合逻辑异常", e);
>        throw new Exception(e.getCause());
>    }
> }
>
>

数据拼接消息处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
> 复制代码`@Schema("demo_user")``public class UserService {`
>    @Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE})
>    public void saveUser_UserInfo(DBObject userInfo) {
>        String userNo = userInfo.get("user_no") == null ? null : userInfo.get("user_no").toString();
>        DBCollection collection = completeMongoTemplate.getCollection("user");
>        DBObject queryObject = new BasicDBObject("user_no", userNo);
>        DBObject user = collection.findOne(queryObject);
>        if (user == null) {
>            user = new BasicDBObject();
>            user.put("user_no", userNo);
>            user.put("userInfo", userInfo);
>            collection.insert(user);
>        } else {                        DBObject updateObj = new BasicDBObject("userInfo", userInfo);
>            DBObject update = new BasicDBObject("$set", updateObj);
>            collection.update(queryObject, update);
>        }
>    }
> }
>
>

示例源码

https://github.com/zhangtr/canal-mongo

原文出处:http://www.torry.top/2017/10/22/canal-mongodb/

推荐阅读

  • 微服务(Microservice)那点事
  • 微服务架构的理论基础 - 康威定律
  • 从架构演进的角度聊聊Spring Cloud都做了些什么?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…389390391…399

开发者博客

3990 日志
1304 标签
RSS
© 2024 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%