开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

痛入爽出 HTTP/2:入门简述 视频 参考 文章

发表于 2017-11-16

导语

之前在B站做过一段时间的编程视频,但是最近半年一直没有稳定产出。因为我发现视频涉及到的内容越来越进阶了,所以仅仅靠视频,观众的吸收程度不会高。现在我觉得要将文字教程和视频教程结合起来。我会在掘金放教程的文字部分,往B站和油腻的管子里塞视频部分。

警告 Warning!

  • 适合进阶级的开发者和天生的极客
  • 可能会引起编程经验不足或者没有 Web 经验的观众心理不适、呕吐感、等不良反应。
  • 在继续阅读之前,请咨询您的医生。不能放弃治疗!
  • 18岁以下请回避
  • 本文非常适合妹子入门编程!

教程大纲

  1. HTTP/2 简述
  2. 实现一个 HTTP/2 Python 库(你没有看错,我们要实现一个 Library)
  3. 用这个库实现一个支持 HTTP/2 的 Web 框架(一个字,就是干!)
  4. Production Ready 的 HTTP/2 开发工具

本教程之所以叫《痛入爽出》,是因为我们从实现开始讲起,再讲应用,难度由难到易。

但是教程走完了你会很爽。(或者我会很爽?)


我感觉我好脏

HTTP/2 简述

HTTP/2 简称 h2,是 HTTP 协议的正式二代。2015年5月14日,h2 被正式通过.到今年为止,h2 已被广泛使用。比如:Google,Facebook,Amazon,阿里等科技巨头。各大 CDN 也几乎都使用了 h2。这里我就不一一举例了,你打开浏览器的开发者模式一看便知。

那么,在 2017 的尾声,作为一名 Web 开发者,我认为 h2 是 2017 年最值得学习的技术。比人工智能、ES7都更加有直接的帮助。

h2 主要解决的是效率问题。这一点我会在视频中讲到。

我关注 4 点:

  1. Multiplexing (Stream)
  2. Binary Encoded
  3. Header Compression
  4. Server Push

小二,上代码!

下一期我们直接进入代码。你需要准备 Python3.5+。别问我为什么。都 2017 了,Python3 发布那年出生的孩子都谈念爱了。

用 Python 做演示是因为 Python 语法简单,即使不会 Python 的同学也可以看懂。完全不是因为我个人擅长 Python 什么的。(无形装逼,最为致命)

如果你等不及了,可以直接看 github.com/CreatCodeBu…


视频

B站

1小时写个后端框架 2:HTTP/2 的简述

油腻的管子

watch?v=NB9ExwvvSVk

参考

http2.github.io
RFC 7540

《Learning HTTP/2》 by Stephen Ludin & Javier Garza
这本书讲到了很多基本的知识点,是绝佳的入门书籍。

文章

下文

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【翻译】ElasticSearch官方文档-执行查询和过滤操

发表于 2017-11-16

本文翻译自:www.elastic.co/guide/en/el…

本文是Elasticsearch的入门文档,将会介绍ElasticSearch中的查询操作和过滤操作。

执行查询

现在我们已经看到了一些基本的搜索参数,让我们再深入查询DSL。我们先来看看返回的文档字段。默认情况下,完整的JSON文档作为所有搜索的一部分返回。这被称为源(搜索匹配中的_source字段)。如果我们不希望整个源文档返回,我们有能力只需要返回源内的几个字段。

此示例显示如何从搜索中返回两个字段account_number和balance(在_source之内):

1
2
3
4
5
复制代码GET /bank/_search
{
"query": { "match_all": {} },
"_source": ["account_number", "balance"]
}

请注意,上面的例子简单地减少了_source字段。它仍然会返回一个名为_source的字段,但在其中只包含字段account_number和balance。

如果你会一些SQL语句,则容易看出上述内容在概念上与SQL SELECT FROM字段列表有些相似。

现在我们来看看查询部分。以前,我们已经看过如何使用match_all查询来匹配所有文档。现在我们来介绍一个叫做匹配查询(match query)的新查询,这个查询可以被看作是基本的搜索查询(即针对特定字段或者字段集合进行的搜索)。

1
2
3
4
复制代码GET /bank/_search
{
"query": { "match": { "account_number": 20 } }
}

此示例返回地址中包含术语“mill”的所有帐户:

1
2
3
4
复制代码GET /bank/_search
{
"query": { "match": { "address": "mill" } }
}

此示例返回地址中包含术语“mill”或“lane”的所有帐户:

1
2
3
4
复制代码GET /bank/_search
{
"query": { "match": { "address": "mill lane" } }
}

这个例子是match(match_phrase)的一个变体,返回在地址中包含短语“mill lane”的所有账号:

1
2
3
4
复制代码GET /bank/_search
{
"query": { "match_phrase": { "address": "mill lane" } }
}

现在我们来介绍一下bool query。 bool查询允许我们使用布尔逻辑将更小的查询组合成更大的查询。

此示例组成两个match查询,并返回地址中包含“mill”和“lane”的所有帐户:

1
2
3
4
5
6
7
8
9
10
11
复制代码GET /bank/_search
{
"query": {
"bool": {
"must": [
{ "match": { "address": "mill" } },
{ "match": { "address": "lane" } }
]
}
}
}

在上面的例子中,bool must子句指定了一个文档被认为是匹配的所有查询。

相反,这个例子组成两个match查询,并返回地址中包含“mill”或“lane”的所有帐户:

1
2
3
4
5
6
7
8
9
10
11
复制代码GET /bank/_search
{
"query": {
"bool": {
"should": [
{ "match": { "address": "mill" } },
{ "match": { "address": "lane" } }
]
}
}
}

在上面的例子中,bool should子句指定了一个查询列表,其中任何一个查询都必须是true,才能被视为匹配的文档。

本示例组成两个match查询,并返回地址中既不包含“mill”也不包含“lane”的所有帐户:

1
2
3
4
5
6
7
8
9
10
11
复制代码GET /bank/_search
{
"query": {
"bool": {
"must_not": [
{ "match": { "address": "mill" } },
{ "match": { "address": "lane" } }
]
}
}
}

在上面的例子中,bool must_not子句指定了一个查询列表,其中任何一个查询都不能被匹配。

我们可以在一个bool查询中同时结合must,should和must_not子句。此外,我们可以在任何这些bool子句中编写bool查询来模拟任何复杂的多级布尔逻辑。

这个例子返回所有40岁但ID不为(aho)的人的账号:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码GET /bank/_search
{
"query": {
"bool": {
"must": [
{ "match": { "age": "40" } }
],
"must_not": [
{ "match": { "state": "ID" } }
]
}
}
}

执行过滤

在上一节中,我们跳过了一个称为文档分数(搜索结果中的_score字段)的细节。分数是一个数字值,它是文档与我们指定的搜索查询匹配度的相对度量。分数越高,文档越相关,分数越低,文档就越不相关。

但查询并不总是需要产生分数,特别是当它们仅用于“过滤”文档集时。 Elasticsearch检测这些情况并自动优化查询执行,以便不计算无用分数。

我们在前一节介绍的bool查询也支持过滤子句,它允许使用查询来限制将被其他子句匹配的文档,而不改变计算得分的方式。作为一个例子,我们来介绍一下范围查询(range query),它允许我们通过一系列值来过滤文档。这通常用于数字或日期过滤。

本示例使用bool查询返回余额在20000和30000之间的所有帐户。换句话说,我们要查找大于或等于20000且小于等于30000的帐户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码GET /bank/_search
{
"query": {
"bool": {
"must": { "match_all": {} },
"filter": {
"range": {
"balance": {
"gte": 20000,
"lte": 30000
}
}
}
}
}
}

解析上述内容,bool查询包含一个match_all查询(查询部分)和一个range查询(过滤器部分)。我们可以将其他查询替换为查询和过滤器部分。在上述情况下,范围查询是非常有意义的,因为落入该范围的文档全部匹配“相等”,即没有文档比另一个文档更加匹配。

除了match_all,match,bool和range查询之外,还有很多其他查询类型可用,我们不在这里介绍。由于我们已经对其工作原理有了一个基本的了解,所以将这些知识应用于其他查询类型的学习和实验并不难。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Nginx 源代码笔记 - 知识点

发表于 2017-11-16

Nginx 源代码笔记 - Prerequisite

本文对 Nginx 源代码中 HTTP 协议处理部分的零碎知识点进行汇总,以便日后查阅。

路很长,高人很多,要坚持,要抱着持续学习的态度。

本文目录

  • 调试工具
  • 只言片语
  • 请求阶段
    • POST_READ
    • SERVER_REWRITE
    • FIND_CONFIG
    • REWRITE
    • POST_REWRITE
    • PREACCESS
    • ACCESS
    • POST_ACCESS
    • TRY_FILES (PRECONTENT)
    • CONTENT
    • LOG
  • 模块顺序
  • 数据字段
    • ngx_connection_t::data
    • ngx_connection_t::buffered
    • ngx_http_request_t::postponed
    • ngx_http_request_t::posted_requests
    • ngx_http_request_t::valid_location
    • ngx_http_request_t::uri_changed
    • ngx_http_request_t::count
    • ngx_http_requeste_t::out
    • ngx_event_t::active
    • ngx_event_t::ready
  • 函数调用
    • ngx_http_close_connection
    • ngx_http_finalize_connection
    • ngx_http_termniate_request
    • ngx_http_free_request
    • ngx_http_close_request
    • ngx_http_finalize_request
    • ngx_event_t::handler
    • ngx_request_t::read_event_handler 和 write_event_handler
    • ngx_http_read_client_request_body
    • ngx_http_send_header 和 ngx_http_output_filter
    • ngx_handle_read_event 和 ngx_handle_write_event
  • 请求变量
    • $request_body

调试工具

  • 测试用例 - Test::Nginx (perl)
  • 检查内存泄漏 - valgrind (worker_processes 1; daemon off; master_process off;), no-pool-patch
1
2
3
4
复制代码valgrind --log-file=valgrid.log --leak-check=full --leak-resolution=high \
--track-origins=yes --show-reachable=yes

root@laptop:/usr/local/src/nginx-1.4.3# patch -p1 <../no-pool-nginx/nginx-1.4.3-no_pool.patch
  • valgrind stress file - 自动化 valgrind 检测。
  • 单步调试 - gdb, cgdb
  • 调用栈 - pstack, strace, lstrace, systemtap, -finstrument-functions, valgrind –tool=callgrind
  • 压力测试 - ab, httperf, wrk
  • 真实流量压测 - tcpcopy, goreplay, ngx-http-mirror-module

只言片语

  • 指令解析回调函数,可以返回字符串描述错误信息(”is duplicate”)。此错误信息会由 Nginx 拼接成完整信息 后,在 终端打印出来:
1
复制代码nginx: [emerg] "spent_prefix" directive is duplicate in /usr/local/nginx/conf/nginx.conf:101
  • 配置结构创建函数,返回 NULL 或者实际结构体。返回 NULL 时,Nginx 不会打印任何出错信息。所以 ,最好 使用 ngx_conf_log_error 函数手动打印出错信息。
  • Nginx 提供的类型转换符 %V 对应的类型是 ngx_str_t *,而不是 ngx_str_t 。
  • ngx_cpystrn(u_char *dst, u_char *src, size_t n) 函数的第三个参数 n 指示 dst 对应内存块 的最大长度 (including the terminating null byte,参考函数 snprintf 函数定义)。
  • ngx_str_t 类型只有两个成员,其中 ngx_str_t::data 指针指向字符串起始地址, ngx_str_t::len 表示字符串的有效长度。ngx_str_t::data 指向的可能并不是普通的字符串,未必会 以 \0 结尾,所以使用 ngx_str_t 时必须根据长度 ngx_str_t::len 的值确定字符串长度。
  • ngx_buf_t 和 ngx_chain_t
+ ngx\_buf\_t 可以表示的对象有:内存块、磁盘文件和特殊标志;
+ ngx\_buf\_t 通常和 ngx\_chain\_t 配对使用:ngx\_chain\_t 将 ngx\_buf\_t 包装成单链表结 点 然后 Nginx 使用由 ngx\_chain\_t 组成单链表来表示逻辑上连续的数据;
  • 分清以下四个回调函数的功能,它们是理解请求处理流程的关键:
1
2
3
4
复制代码c->read->handler = ngx_http_request_handler;
c->write->handler = ngx_http_request_handler;
r->read_event_handler = ngx_http_block_reading;
r->write_event_handler = ngx_http_block_reading;

请求阶段

Nginx 将 HTTP 请求处理流程分为几个阶段(PHASE)进行,每个阶段对应的 phase checker 按注册顺序逐个 调用 各模块提供的回调函数,也就是 phase handler 。针对所有请求的 phase checker 及 phase
handler
执行顺序在 Nginx 进程启动时,在 ngx_http_init_phase_handlers 函数中定义。

下面列出 Nginx 定义的阶段,以及该阶段中 phase checker 可以处理的 phase handler 返回值。

POST_READ

该阶段的 phase checker 函数是 ngx_http_core_generic_phase 。各模块注册到该阶段的 phase
handler
可以使用如下返回值控制请求处理流程:

错误码 处理方式
NGX_OK 将请求转入下一处理阶段(跳过本阶段还未调用的 phase handler)
NGX_DECLINED 为请求调用下一个 phase handler(可能是本阶段的,也可能是下一个阶段的)
NGX_AGAIN/NGX_DONE 需对该请求再次调用当前 phase handler
其它 错误码是 NGX_ERROR 或者 NGX_HTTP_* 等等时,提前结束当前请求

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_realip_module - Change the client address to the one sent in the specificed header field.

SERVER_REWRITE

该阶段的 phase checker 函数为 ngx_http_core_rewrite_phase 。各模块注册到该阶段的 phase
handler
可以使用如下返回值控制请求处理流程:

错误码 处理方式
NGX_DECLINED 为请求调用下一个 phase handler(可能是本阶段的,也可能是下一个阶段的)
NGX_DONE 需对该请求再次调用当前 phase handler
其它 错误码是 NGX_OK, NGX_AGAIN, NGX_ERROR 或者 NGX_HTTP_xx 等等 时,提前结束当前请求

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_rewrite_module - 执行定义于 server {} 的 rewrite, set, if 等指令。

FIND_CONFIG

该阶段的 phase checker 函数为 ngx_http_core_find_config_phase 。该阶段属于 Nginx 内部流程, 不允 许模块注册 phase handler 。

ngx_http_core_find_config_phase 函数根据请求 uri 匹配合适的 location {} 配置块。

REWRITE

该阶段的 phase checker 函数为 ngx_http_core_rewrite_phase 。各模块注册到该阶段的 phase
handler
可以使用的返回值和 Nginx 对这些返回值的处理方式和 SERVER_REWRITE 阶段一致。

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_rewrite_module - 执行定义于 location {} 的 rewrite, set, if 等指令。

POST_REWRITE

该阶段的 phase checker 函数为 ngx_http_core_post_rewrite_phase 。该阶段属于 Nginx 内部流程, 不 允许模块注册 phase handler 。

该阶段检查 REWRITE 阶段的执行结果并执行不同逻辑:如果请求 uri 被上个阶段修改过的话 ( r->uri_changed = 1),将此请求转到 FIND_CONFIG 阶段,重新进行 location {} 查找和匹配;如 果请求 uri 未被上个阶段修改的话,继续为请求调用 PREACCESS 阶段的 phase handler 。

PREACCESS

该阶段的 phase checker 函数为 ngx_http_core_generic_phase 。各模块注册到该阶段的 phase
handler
可以使用的返回值和 Nginx 对这些返回值的处理方式和 POST_READ 阶段一致。

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_limit_conn_module - limits the number of connections per the defined key, in particular, the number of of connections from a single IP address.
  • ngx_http_limit_req_module - limits the request processing rate per the defined key, in particular, the processing rate of requests coming from a single IP address.
  • ngx_http_degradation_module - returns 204 or 444 code for some locations on low memory condition.
  • ngx_http_realip_module - Change the client address to the one sent in the specificed header field.

ACCESS

该阶段的 phase checker 函数为 ngx_http_core_access_phase 。各模块注册到该阶段的 phase
handler
可以使用如下返回值控制请求处理流程:

错误码 处理方式
r != r->main 当前请求是子请求,直接将其转入下一处理阶段
NGX_DECLINED 为请求调用下一个 phase handler(可能是本阶段的,也可能是下一个阶段的)
NGX_AGAIN/NGX_DONE 需对该请求再次调用当前 phase handler
NGX_OK * SATISFY_ALL: 为请求调用下一个 phase handler(可能是本阶段的,也可能是 下一个阶段的) * SATISFY_ANY: r->access_code 赋 0 值,并将请求转到下一处理阶段
NGX_HTTP_FORBIDDEN * SATISFY_ALL: 提前结束当前请求 * SATISFY_ANY: 将该错误码赋值给 r->access_code 变量,并为请求调用下一个 phase handler。
NGX_HTTP_UNAUTHORIZED 处理逻辑和 NGX_HTTP_FORBIDDEN 相同
其它 错误码是 NGX_ERROR 或者 NGX_HTTP_* 等等时,提前结束当前请求

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_auth_basic_module - limits access to resources by validating the user name and password using the “HTTP Basic Authentication” protocol.
  • ngx_http_access_module - limits access to certain client addresses.

POST_ACCESS

该阶段的 phase checker 函数为 ngx_http_core_post_access_phase 。该阶段属于 Nginx 内部流程, 不允 许模块注册 phase handler 。

该阶段检查 ACCESS 阶段的处理结果并执行不同逻辑:如果 r->access_code == NGX_HTTP_FORBIDDEN 则 提 前结束该请求处理(使用 NGX_HTTP_FORBIDDEN 作为 HTTP 响应码);如果 r->access_code 为其它 非 0 值,则提前结束该请求处理;如果 r->access_code == 0 值,为请求调用下一下
phase
handler
。

TRY_FILES (PRECONTENT)

NOTES:从 Nginx 1.13.4 开始,此阶段更名为 PRECONTENT ,并使用 ngx_http_core_generic_phase 作 为 phase checker 。try_files 指令功能由模块 ngx_http_try_files_module 提供。

1
2
3
4
5
6
7
8
9
10
复制代码commit 129b06dc5dfab7b4513a4f274b3778cd9b8a6a22
Author: Roman Arutyunyan <arut@nginx.com>
Date: Thu Jul 20 15:51:11 2017 +0300

Precontent phase.

The phase is added instead of the try_files phase. Unlike the old phase, the
new one supports registering multiple handlers. The try_files implementation is
moved to a separate ngx_http_try_files_module, which now registers a precontent
phase handler.

而在 Nginx 1.13.4 之前的版本,该阶段的 phase checker 函数为 ngx_http_core_try_files_phase 。 该阶段属于 Nginx 内部流程,不允许模块注册 phase handler 。

如果请求使用的 location {} 块未配置 try_files 指令,将该请求转入下一个 phase handler 。

如果请求使用的 location {} 中使用了 try_files 指令,那么继续检查该指令的参数:如果参数(最后 一个 参数除外)对应的磁盘静态文件存在,将静态文件内容返回给客户端;如果参数对应的磁盘静态文件都不存 在,使用函数 ngx_http_internal_redirect 将该请求重定向到 try_files 指令最后一个参数指定的 location 后, 重新处理该请求。

CONTENT

该阶段的 phase checker 函数为 ngx_http_core_content_phase 。各模块注册到该阶段的 phase
handler
可以使用如下返回值控制请求处理流程:

错误码 处理方式
r->content_handler 如果 location {} 配置有 content handler ,使用它处理请求,并忽略其它 phase handler
NGX_DECLINED 为请求调用下一个 phase handler。如果所有 phase handler 都已经被调用过后,则结束该 请求处理过程(使用响应码:NGX_HTTP_FORBIDDEN (请求 uri 以 ‘/‘ 结束) 或者 NGX_HTTP_NOT_FOUND (请求 uri 不以 ‘/‘ 结束))
其它 结束该请求处理流程(错误码作为 ngx_http_finalize_request 函数参数使用)

从上面的分析可以看到,请求的响应数据可以由 content handler 函数或者 phase handler 函数提供。 content handler 优先级比 phase handler 高,并且具有排他性。phase handler 可以看作是 CONTENT 阶段为请求提供的通用处理逻辑,而
content handler 是某个 location {} 块为请求提供 的 特殊处理逻辑。

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_random_index_module - processes requests ending with the slash character (‘/‘) and picks a random file in a directory to serve as an index file.
  • ngx_http_index_module - processes requests ending with the slash character (‘/‘).
  • ngx_http_autoindex_module - processes requests ending with the slash character (‘/‘) and produces a directory listings.
  • ngx_http_dav_module - intended for file management automation via the WebDAV protocol. The module processes HTTP and WebDAV methods PUT, DELETE, MKCOL, COPY, and MOVE.
  • ngx_http_gzip_static_module - allows sending precompressed files with the “.gz” filename extension instead of regular files.
  • ngx_http_static_module - 静态文件响应模块

Nginx 自带模块中提供了 content handler 的有:

  • ngx_http_fastcgi_module
  • ngx_http_scgi_module
  • ngx_http_memcached_module
  • ngx_http_proxy_module
  • ngx_http_stub_status_module
  • ngx_http_flv_module
  • ngx_http_mp4_module
  • ngx_http_empty_gif_module
  • ngx_http_perl_module
  • ngx_http_uwsgi_module

LOG

该阶段比较特殊,它并没有对应 phase checker ,该阶段的 phase handler 在请求处理结束时,由 ngx_http_log_request 函数直接调用。

Nginx 自带模块中在此阶段注册 phase handler 的模块有:

  • ngx_http_log_module

模块顺序

标准配置下,filter 模块的调用顺序(和模块初始化顺序相反)如下:

调用顺序 模块名 提供的 filter
1 ngx_http_not_modified_filter_module header
2 ngx_http_range_body_filter_module body
3 ngx_http_copy_filter_module body
4 ngx_http_headers_filter_module header
third party filter goes here
6 ngx_http_userid_filter_module header
7 ngx_http_ssi_filter_module header and body
8 ngx_http_charset_filter_module header and body
9 ngx_http_postpone_filter_module body
10 ngx_http_gzip_filter_module header and body
11 ngx_http_range_header_filter_module header
12 ngx_http_chunked_filter_module header and body
13 ngx_http_header_filter_module header
14 ngx_http_write_filter_module body

数据字段

ngx_connection_t::data

这个字段有多种用途,它得值随着 ngx_connection_t 的状态变化而代表不同含义:

  • 当该连接被回收放入 Nginx 的空闲连接池(单链表)时,该指针字段充当用于单链表节点的 next 指针:
  • 客户端和 Nginx 建立 TCP 连接之后,Nginx 开始读取并处理该 TCP 连接上的 HTTP 请求数据之前,该指针字 段指向 ngx_http_connection_t 类型的变量。该变量中保存该 HTTP 请求对应的虚拟主机、虚拟主机的配 置结构体、 SSL 主机名称等 「HTTP 连接」相关的信息。
  • 在随后的 HTTP 请求处理过程(请求数据接收,响应数据生成等)中,该字段始终指向该 TCP 连接上的当前活 跃请求 (由于HTTP Pipeline 技术、Nginx 子请求机制等等,当前 TCP 连接上可能会对应多个请求实例)。该 请求的响应数据可以立即发送给客户端。

ngx_connection_t::buffered

TODO: More explanation needed.

ngx_http_request_t::postponed

当该指针字段值不是 NULL 时,指向一个节点类型是 ngx_http_postponed_request_t 的单链表。这个单 链表的节点中包含该请求创建的子请求( ngx_http_postponed_request_t::request ),或者该请求产生的 暂时还不能发送的响应数据( ngx_http_postponed_request_t::out )。

当该指针字段值是 NULL 时,该请求无待处理子请求。

ngx_http_request_t::posted_requests

该字段指向一个节点类型是 ngx_http_posted_request_t 的单链表。这个单链表中保存着该请求(主请求) 上的「就绪」请求,这些请求可以随时被 Nginx 调度处理。

Nginx 只在 “主请求” 上维护着「就绪」请求链表,「就緒」请求可以是主请求、子请求、子子请求等等。函数 ngx_http_post_request 用于向该链表中添加「就緒」请求 ,函数 ngx_http_run_posted_requests 用 于调度「就緒」请求。

ngx_http_request_t::valid_location

TODO: More explanation needed.

当 rewrite 指令使用 break 修改了 r->uri 后,此标志位变量被置为 0;如果此请求被 ngx_http_internal_redirect 函数在 Nginx 内部重定向的话,这个标志位被重置为 1。

ngx_http_request_t::uri_changed

TODO: More explanation needed.

当 rewrite 指令使用了非 break 修改了 r->uri 后,此标志位变量被置为 1;

ngx_http_request_t::count

请求引用计数。关于它的作用,在《深入理解 Nginx:模块开发与架构解析》第 1 版的 11.8 节有过介绍:

1
2
3
4
5
6
7
8
复制代码> 在 HTTP 模块中每进行一类新的(异步)操作,包括为一个请求添加新的事件,或者把一些已经
> 由定时器、epoll 中移除事件重新加放其中,都需要把这个请求的引用计数加 1.这是因为需要让
> HTTP 框架知道,HTTP 模块对于该请求有独立的异步处理机制,将由该 HTTP 模块决定这个操作
> 什么时候结束,防止在这个操作还未结束时 HTTP 框架却把这个请求销毁了(如其它 HTTP 模块通
> 过调用 ngx_http_finalize_request 方法要求 HTTP 框架结束请求),异致请求出现不可知的
> 严重错误。这就要求每个操作在 “认为” 自身的动作结束时,都得最终调用
> ngx_http_close_request 方法,该方法会自动检查引用计数,当引用计数为 0 时才真正地销销
> 毁请求。

总结:要在当前请求执行流程中新增异步执行分支时,就需要为主请求增加一次引用计数。例如,需要通过「可读 事件」读取请求包体时;创建子请求时;

ngx_http_requeste_t::out

使用 ngx_chain_t 串起来的 ngx_buf_t 链表,用于保存待发送响应数据。

另外,Nginx 模块通过调用函数 ngx_http_output_filter 经过 output filter chain 发送响应数据。如果 响应数据已经被 output filter 处理过,但是因网络原因,没有完全发出,可以在连接的「可写事件」发生后, 通过 ngx_http_output_filter(r, NULL) 的调用形式继续发送。

ngx_event_t::active

Nginx 使用的事件模块是否正在管理该事件结构体。

ngx_event_t::ready

该事件结构体是否就绪(有未处理事件)。

函数调用

ngx_http_close_connection

该函数用于关闭连接。

函数签名:

1
2
复制代码void
ngx_http_close_connection(ngx_connect_t *c);

主要功能:

函数 ngx_close_connection 从事件模块中将该连接注册的事件(网络事件、超时事件)全部清理掉后, 关闭 底层 socket 描述符。同时,调用 ngx_free_connection 函数将连接的结构体存入单链表 ngx_cycle->free_connections 以便下次使用。

该函数对连接对应的 SSL 相关结构进行清理,关闭连接对应的内存池,然后调用 ngx_close_connection 函数 完成其它清理工作。

ngx_http_finalize_connection

该函数在请求正常处理完成后,调用 ngx_http_close_request 关闭请求(请求引用计数减一),并判断需要 关闭连 接或是使其保持连通(keepalive 连接)。

函数签名:

1
2
复制代码static void
ngx_http_finalize_connection(ngx_http_request_t *r);

ngx_http_termniate_request

强制清理并销毁请求。

函数签名:

1
2
复制代码static void
ngx_http_terminate_request(ngx_http_request_t *r, ngx_int_t rc);

主要功能:
该函数调用为在请求上注册的清理函数(cleanup handler),并强制将引用计数置 1,然后使用 nxx_http_close_request 函数销毁该请求。

ngx_http_free_request

该函数对请求进行清理和销毁。

函数签名:

1
2
复制代码void
ngx_http_free_request(ngx_http_request_t *r, ngx_int_t rc);

主要功能:
该函数调用为在请求上注册的清理函数(cleanup handler)、为该请求调用 LOG 阶段的 phase
handler
, 然后销毁请求内存池。

ngx_http_close_request

该函数关闭对请求的一次引用。

函数签名:

1
2
复制代码static void
ngx_http_close_request(ngx_http_request_t *r, ngx_int_t rc);

主要功能:
该函数将请求 r 所属的主请求( r->main )的引用计数( r->main->count )减 1。如果主请 求引用计数等于 0,调用 ngx_http_free_request 函数清理并销毁主请求,调用 ngx_http_close_connection 函数清理并销毁请求所用的连接。

ngx_http_finalize_request

该函数根据请求处理结果,决定请求接下来的处理流程。

函数签名:

1
2
复制代码void
ngx_http_finalize_request(ngx_http_request_t *r, ngx_int_t rc);

主要功能:
该函数功能比较复杂,逻辑分支较多。接下来,我们根据参数 rc 值和请求当前的状态,总结一下该函 数的主要分支流程和处理方式(使用源代码版本号:1.13.6):

顺序 分支条件 处理方式
函数开始
1 rc == NGX_DONE 请求当次动作已经处理完成,但是该请求上还有其它进行中的异步动作。调用 ngx_http_finalize_connection 函数将请求引用计数减一。 RETURN
2 rc == DECLINED content handler 无法处理该请求,将其交由 CONTENT 阶段的 phase handler 处理。 RETURN
3 rc 值是 NGX_ERROR , NGX_HTTP_REQUEST_TIMEOUT , NGX_HTTP_CLIENT_CLOSE_REQUEST 错误码之一,或者 c->error == 1 该请求所在的连接上发生了错误,调用 ngx_http_terminate_request 强制清理 销毁该请求。 RETURN
4 rc >= NGX_HTTP_SPECIAL_RESPONSE 或者 rc 值是 NGX_HTTP_CREATED , NGX_HTTP_NO_CONTENT 中的一个 如果 rc 值是 NGX_HTTP_CLOSE (大于 NGX_HTTP_SPECIAL_RESPONSE ),立即调 用 ngx_http_terminate_request 销毁请求;其它情况调用函数 ngx_http_special_response_handler 重新生成此请求的响应数据。这时需要再次 调用函数 ngx_http_finalize_request 处理 ngx_http_special_response_handler 的返回值。RETURN
请求响应发送完毕、请求响应数据未发送完毕、请求是子请求 等等正常业务流程继续向下进行分支处理
5 r != r->main 当前请求是「子请求」时,函数继续以下处理流程:1. 如果该子请求是「后台子请求」,Nginx 认为此时该子请求已经处理完成。 置其 r->done 值为 1,随后调用函数 ngx_http_finalize_connection 将其占 用的引用计数清除。RETURN * 「后台子请求」并不参与响应生成流程,也就不会影响主请求的响应时长。但它 会保持对主请求的引用。从 Nginx 目前实现来看,只能对这类请求调用一次 ngx_http_finalize_request 。 2. 如果该子请求非「后台子请求」,并且响应数据未完全发送( r->buffered == 1 ) 或者该子请求创建的子请求未全部完成( r->postponed != NULL ),使用函数 ngx_http_set_write_handler 为其注册写事件处理函数 ngx_http_writer 待有事件发生时继续处理该请求。RETURN * 如果因为 socket 发送缓存区满,数据未能完全发送,也就是 r->buffered == 1 时,那么缓存区一旦空闲,那么「可写事件」会再次触发该子请求的处理逻辑; * 如果因为该子请求非「连接活跃请求」,也就是 r->postponed != NULL 时,事 件循环并不会重复抛出「可写事件」造成该处逻辑重复执行。 3. 对于己处理完成的普通子请求,如果它是「连接活跃请求」( c->data == r ), 那么将其标记为己完成( r->done = 1 ),并将其父请求( r->parent )设为 「连接活跃请求」。随后调用函数 ngx_http_post_request 将父请求添加到主 请求的 posted_request 链表中,以便由函数 ngx_http_run_posted_requests 调用其写回调函数 write_event_handler ,启动其处理流程。RETURN
6 r == r->main 当前请求是「主请求」,函数继续以下处理流程:1. 如果请求数据未发送完毕( r->buffered
函数结束

ngx_event_t::handler

事件循环直接调用的事件回调函数。

函数签名:

1
复制代码typedef void (*ngx_event_handler_pt)(ngx_event_t *ev);

针对 Nginx HTTP 模块来说,Nginx 接入客户端连接后,为此连接分配 ngx_connection_t 结构体。该结构体 含有 对应「可读事件」和「可写事件」的两个 ngx_event_t 类型成员 read 和 write ,这两个成 员的回调函数 会在事件发生时被调用。下面我们简要列出随着请求状态改变,这两个回调函数的值变化过程:

  1. Nginx 调用函数 ngx_event_accept 接入连接,并为其分配 ngx_connection_t 结构:
1
2
3
4
5
6
7
8
9
复制代码void
ngx_event_accept(ngx_event_t *ev)
{
...
c = ngx_get_connection(s, ev->log);
...
ls->handler(c);
...
}
  1. 随后,Nginx 调用函数 ngx_http_init_connection 设置「读事件」回调函数,为接收请求数据做准备:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码void
ngx_http_init_connection(ngx_connection_t *c)
{
...
rev = c->read;
rev->handler = ngx_http_wait_request_handler;
c->write->handler = ngx_http_empty_handler;
...
if (rev->ready) {
...
rev->handler(rev);
return;
}
...
if (ngx_handle_read_event(rev, 0) != NGX_OK) {
...
}
}
  1. 函数 ngx_http_wait_request_handler 从连接读取部分数据,然后创建请求结构体 ngx_request_t 。然后 调整「可读事件」回调函数为 ngx_http_process_request_line ,用于接收和分析请求 status line。这期间 如果已接收数据不足构成完整 status line 的话,Nginx 会为该连接注册「可读事件」,等新 数据到达后,继续向下 执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码void
ngx_http_wait_request_handler(ngx_event_t *rev)
{
...
c->data = ngx_http_create_request(c);
rev->handler = ngx_http_process_request_line;
ngx_http_process_request_line(rev);
}

void
ngx_http_process_request_line(ngx_event_t *rev)
{
...
for (;;) {
if (rc == NGX_AGAIN) {
n = ngx_http_read_request_header(r);
if (n == NGX_AGAIN || n == NGX_ERROR) {
return;
}
...
}
...
rev->handler = ngx_http_process_request_headers;
ngx_http_process_request_headers(rev);
}

}
  1. 请求 status line 处理完毕后,Nginx 调整「读事件」函数 ngx_http_process_request_headers ,用于 接收 和分析请求 headers。这期间如果已接收数据不足构成完整 header 的话,Nginx 会为该连接注册「可读 事件」,等新数据到达后,继续向下执行。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码void
ngx_http_process_request_headers(ngx_event_t *ev)
{
...
n = ngx_http_read_request_header(r);
if (n == NGX_AGAIN || n == NGX_ERROR) {
return;
}
...
if (rc == NGX_HTTP_PARSE_HEADER_DONE) {
...
ngx_http_process_request(r);
return;
}
...
}
  1. 请求包头数据接收并解析完成后,调用函数 ngx_http_process_request 开启处理请求,生成响应数据的 流程。 此时,Nginx 才为连接设定有效的「可写事件」回调函数。
* 接下来,「可读事件」和「可写事件」均由回调函数 ngx\_http\_request\_handler 处理,它会根据事件 类型 调用请求结构体 ngx\_request\_t 中 read\_event\_handler 和 write\_event\_handler 成员 指向的函数。
* 由于 Nginx 已经获取了所需要的请求数据,所以再有「可读事件」发生时,它使用函数 ngx\_http\_blocking\_reading 暂时屏蔽该事件。
* 在本步之前,并不需要处理连接上的「可写事件」,而此后,我们就需要使用此事件驱动请求完成 PHASE 处 理流程了( ngx\_http\_core\_run\_phases )。
* 同时,每次事件发生时,Nginx 还会调用函数 ngx\_http\_run\_posted\_requests 触发「就绪」 (*posted* , 有 “张贴”、“发布” 的意思,也就是说这类请求已经处于就绪状态,等待被调度处理)请求 的处理流程。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
复制代码void
ngx_http_process_request(ngx_event_t *ev)
{
...
c->read->handler = ngx_http_request_handler
c->write->handler = ngx_http_request_handler;
r->read_event_handler = ngx_http_block_reading;

ngx_http_handler(r);
ngx_http_run_posted_requests(c);
}

void
ngx_http_handler(ngx_http_request_t *r)
{
...
r->write_event_handler = ngx_http_core_run_phases;
ngx_http_core_run_phases(r);
}

void
ngx_http_request_handler(ngx_event_t *ev)
{
...
if (ev->write) {
r->write_event_handler(r);
} else {
r->read_event_handler(r);
}

ngx_http_run_posted_requests(c);
}
  1. 从此以后,该请求的「可读事件」和「可写事件」回调函数基本就不再变化了。请求在不同阶段的处理流程由 ngx_request_t::read_event_handler 和 write_event_handler 这两个函数决定。

ngx_request_t::read_event_handler 和 write_event_handler

请求数据接收完毕后,请求处理进入响应生成和响应发送阶段。这两个函数用于在不同处理阶段驱动请求的处理流 程。

1
2
3
4
5
6
7
8
9
10
11
复制代码write_event_handler:
ngx_http_core_run_phases
ngx_http_writer
ngx_http_request_finalizer
ngx_http_terminate_handler

read_event_handler:
ngx_http_block_reading
ngx_http_read_client_request_body_handler
ngx_http_discarded_request_body_handler
ngx_http_upstream_process_header

ngx_http_read_client_request_body

读取请求包体,并将其存到 ngx_http_request_t::request_body 成员中。

该函数会增加主请求引用计数,原因如下:

Nginx 调用函数 ngx_http_read_client_request_body 读取请求包体时,可能需要通过事件处理机制监听连 接上的「可读事件」,使用读回调函数继续读取数据。而请求的其它流程处理完「可写事件」过程中,如果因为 正常流程或者处理异常需要销毁请求时,在没有引用计数的保护下,会造成「可读事件」流程使用的请求失效, 给整个进程带来严重后果。

ngx_http_send_header 和 ngx_http_output_filter

用于开始发送响应包头和包体。 ngx_http_send_header 只能调用一次,而 ngx_http_output_filter 可 以多次调用。但是每次 ngx_http_output_filter 需要发送响应包体的不同部分,上次未发出的响应数据会暂 存到 ngx_http_request_t 中。调用 ngx_http_output_filter(r, NULL) 可再次尝试发送响应数据。

ngx_handle_read_event 和 ngx_handle_write_event

向事件循环注册 「可读事件」和「可写事件」,并确保不会重复注册事件(如果 ngx_event_t 对应的事件正 在被事件循环监听的话,该函数直接返回)。

请求变量

$request_body

It contains the body of the request. 这个变量当且仅当 Nginx 读取了请求包体,并且请求包体没有被写入临 时文件时,才能从它得到包体数据。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Elasticsearch Java API 索引的增删改查

发表于 2017-11-16

Elasticsearch Java API - 客户端连接(TransportClient,PreBuiltXPackTransportClient)(一)

本节介绍以下 CRUD API:

单文档 APIs

  • Index API
  • Get API
  • Delete API
  • Delete By Query API
  • Update API

多文档 APIs

  • Multi Get API
  • Bulk API
  • Using Bulk Processor

Multi Get API Bulk API

注意:所有的单文档的CRUD API,index参数只能接受单一的索引库名称,或者是一个指向单一索引库的alias。

Index API

Index API 允许我们存储一个JSON格式的文档,使数据可以被搜索。文档通过index、type、id唯一确定。我们可以自己提供一个id,或者也使用Index API 为我们自动生成一个。

这里有几种不同的方式来产生JSON格式的文档(document):

  • 手动方式,使用原生的byte[]或者String
  • 使用Map方式,会自动转换成与之等价的JSON
  • 使用第三方库来序列化beans,如Jackson
  • 使用内置的帮助类 XContentFactory.jsonBuilder()

手动方式

数据格式

1
2
3
4
5
复制代码String json = "{" +
"\"user\":\"kimchy\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"trying out Elasticsearch\"" +
"}";
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码/**  
* 手动生成JSON
*/
@Test
public void CreateJSON(){

String json = "{" +
"\"user\":\"fendo\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"Hell word\"" +
"}";

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());

}

Map方式

Map是key:value数据类型,可以代表json结构.

1
2
3
4
复制代码Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate",new Date());
json.put("message","trying out Elasticsearch");
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码 /**  
* 使用集合
*/
@Test
public void CreateList(){

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate","2013-01-30");
json.put("message","trying out Elasticsearch");

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());

}

序列化方式

ElasticSearch已经使用了jackson,可以直接使用它把javabean转为json.

1
2
3
4
5
6
7
复制代码import com.fasterxml.jackson.databind.*;

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(yourbeaninstance);
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码/**  
* 使用JACKSON序列化
* @throws Exception
*/
@Test
public void CreateJACKSON() throws Exception{

CsdnBlog csdn=new CsdnBlog();
csdn.setAuthor("fendo");
csdn.setContent("这是JAVA书籍");
csdn.setTag("C");
csdn.setView("100");
csdn.setTitile("编程");
csdn.setDate(new Date().toString());

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(csdn);

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}

XContentBuilder帮助类方式

ElasticSearch提供了一个内置的帮助类XContentBuilder来产生JSON文档

1
2
3
4
5
6
7
8
9
10
复制代码// Index name
String _index = response.getIndex();
// Type name
String _type = response.getType();
// Document ID (generated or not)
String _id = response.getId();
// Version (if it's the first time you index this document, you will get: 1)
long _version = response.getVersion();
// status has stored current instance statement.
RestStatus status = response.status();
实例
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码/**  
* 使用ElasticSearch 帮助类
* @throws IOException
*/
@Test
public void CreateXContentBuilder() throws IOException{

XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "ccse")
.field("postDate", new Date())
.field("message", "this is Elasticsearch")
.endObject();

IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
System.out.println("创建成功!");


}

综合实例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
复制代码 
import java.io.IOException;
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.transport.TransportClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.transport.InetSocketTransportAddress;
import org.elasticsearch.common.xcontent.XContentBuilder;
import org.elasticsearch.common.xcontent.XContentFactory;
import org.elasticsearch.transport.client.PreBuiltTransportClient;
import org.junit.Before;
import org.junit.Test;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class CreateIndex {

private TransportClient client;

@Before
public void getClient() throws Exception{
//设置集群名称
Settings settings = Settings.builder().put("cluster.name", "my-application").build();// 集群名
//创建client
client = new PreBuiltTransportClient(settings)
.addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName("127.0.0.1"), 9300));
}

/**
* 手动生成JSON
*/
@Test
public void CreateJSON(){

String json = "{" +
"\"user\":\"fendo\"," +
"\"postDate\":\"2013-01-30\"," +
"\"message\":\"Hell word\"" +
"}";

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());

}


/**
* 使用集合
*/
@Test
public void CreateList(){

Map<String, Object> json = new HashMap<String, Object>();
json.put("user","kimchy");
json.put("postDate","2013-01-30");
json.put("message","trying out Elasticsearch");

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());

}

/**
* 使用JACKSON序列化
* @throws Exception
*/
@Test
public void CreateJACKSON() throws Exception{

CsdnBlog csdn=new CsdnBlog();
csdn.setAuthor("fendo");
csdn.setContent("这是JAVA书籍");
csdn.setTag("C");
csdn.setView("100");
csdn.setTitile("编程");
csdn.setDate(new Date().toString());

// instance a json mapper
ObjectMapper mapper = new ObjectMapper(); // create once, reuse

// generate json
byte[] json = mapper.writeValueAsBytes(csdn);

IndexResponse response = client.prepareIndex("fendo", "fendodate")
.setSource(json)
.get();
System.out.println(response.getResult());
}

/**
* 使用ElasticSearch 帮助类
* @throws IOException
*/
@Test
public void CreateXContentBuilder() throws IOException{

XContentBuilder builder = XContentFactory.jsonBuilder()
.startObject()
.field("user", "ccse")
.field("postDate", new Date())
.field("message", "this is Elasticsearch")
.endObject();

IndexResponse response = client.prepareIndex("fendo", "fendodata").setSource(builder).get();
System.out.println("创建成功!");


}

}

你还可以通过startArray(string)和endArray()方法添加数组。.field()方法可以接受多种对象类型。你可以给它传递数字、日期、甚至其他XContentBuilder对象。

Get API

根据id查看文档:

1
复制代码GetResponse response = client.prepareGet("twitter", "tweet", "1").get();

更多请查看 rest get API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false :

1
2
3
复制代码GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();

Delete API

根据ID删除:

1
复制代码DeleteResponse response = client.prepareDelete("twitter", "tweet", "1").get();

更多请查看 delete API 文档

配置线程

operationThreaded 设置为 true 是在不同的线程里执行此次操作

下面的例子是operationThreaded 设置为 false :

1
2
3
复制代码GetResponse response = client.prepareGet("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();
1
2
3
复制代码DeleteResponse response = client.prepareDelete("twitter", "tweet", "1")
.setOperationThreaded(false)
.get();

Delete By Query API

通过查询条件删除

1
2
3
4
5
6
7
复制代码BulkByScrollResponse response =
DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //查询条件
.source("persons") //index(索引名)
.get(); //执行

long deleted = response.getDeleted(); //删除文档的数量

如果需要执行的时间比较长,可以使用异步的方式处理,结果在回调里面获取

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码DeleteByQueryAction.INSTANCE.newRequestBuilder(client)
.filter(QueryBuilders.matchQuery("gender", "male")) //查询
.source("persons") //index(索引名)
.execute(new ActionListener<BulkByScrollResponse>() { //回调监听
@Override
public void onResponse(BulkByScrollResponse response) {
long deleted = response.getDeleted(); //删除文档的数量
}
@Override
public void onFailure(Exception e) {
// Handle the exception
}
});

Update API

有两种方式更新索引:

  • 创建 UpdateRequest,通过client发送;
  • 使用 prepareUpdate() 方法;

使用UpdateRequest

1
2
3
4
5
6
7
8
9
复制代码UpdateRequest updateRequest = new UpdateRequest();
updateRequest.index("index");
updateRequest.type("type");
updateRequest.id("1");
updateRequest.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();

使用 prepareUpdate() 方法

这里官方的示例有问题,new Script()参数错误,所以一下代码是我自己写的(2017/11/10)

1
2
3
4
5
6
7
8
9
10
复制代码client.prepareUpdate("ttl", "doc", "1")
.setScript(new Script("ctx._source.gender = \"male\"" ,ScriptService.ScriptType.INLINE, null, null))//脚本可以是本地文件存储的,如果使用文件存储的脚本,需要设置 ScriptService.ScriptType.FILE
.get();

client.prepareUpdate("ttl", "doc", "1")
.setDoc(jsonBuilder() //合并到现有文档
.startObject()
.field("gender", "male")
.endObject())
.get();

Update by script

使用脚本更新文档

1
2
3
复制代码UpdateRequest updateRequest = new UpdateRequest("ttl", "doc", "1")
.script(new Script("ctx._source.gender = \"male\""));
client.update(updateRequest).get();

Update by merging documents

合并文档

1
2
3
4
5
6
复制代码UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject());
client.update(updateRequest).get();

Upsert

更新插入,如果存在文档就更新,如果不存在就插入

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码IndexRequest indexRequest = new IndexRequest("index", "type", "1")
.source(jsonBuilder()
.startObject()
.field("name", "Joe Smith")
.field("gender", "male")
.endObject());
UpdateRequest updateRequest = new UpdateRequest("index", "type", "1")
.doc(jsonBuilder()
.startObject()
.field("gender", "male")
.endObject())
.upsert(indexRequest); //如果不存在此文档 ,就增加 `indexRequest`
client.update(updateRequest).get();

如果 index/type/1 存在,类似下面的文档:

1
2
3
4
复制代码{
"name" : "Joe Dalton",
"gender": "male"
}

如果不存在,会插入新的文档:

1
2
3
4
复制代码{
"name" : "Joe Smith",
"gender": "male"
}

Multi Get API

一次获取多个文档

1
2
3
4
5
6
7
8
9
10
11
12
复制代码MultiGetResponse multiGetItemResponses = client.prepareMultiGet()
.add("twitter", "tweet", "1") //一个id的方式
.add("twitter", "tweet", "2", "3", "4") //多个id的方式
.add("another", "type", "foo") //可以从另外一个索引获取
.get();

for (MultiGetItemResponse itemResponse : multiGetItemResponses) { //迭代返回值
GetResponse response = itemResponse.getResponse();
if (response.isExists()) { //判断是否存在
String json = response.getSourceAsString(); //_source 字段
}
}

更多请浏览REST multi get 文档

Bulk API

Bulk API,批量插入:

1
复制代码import static org.elasticsearch.common.xcontent.XContentFactory.*;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码BulkRequestBuilder bulkRequest = client.prepareBulk();

// either use client#prepare, or use Requests# to directly build index/delete requests
bulkRequest.add(client.prepareIndex("twitter", "tweet", "1")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "trying out Elasticsearch")
.endObject()
)
);

bulkRequest.add(client.prepareIndex("twitter", "tweet", "2")
.setSource(jsonBuilder()
.startObject()
.field("user", "kimchy")
.field("postDate", new Date())
.field("message", "another post")
.endObject()
)
);

BulkResponse bulkResponse = bulkRequest.get();
if (bulkResponse.hasFailures()) {
// process failures by iterating through each bulk response item
//处理失败
}

使用 Bulk Processor

BulkProcessor 提供了一个简单的接口,在给定的大小数量上定时批量自动请求

创建BulkProcessor实例

首先创建BulkProcessor实例

1
2
3
4
5
复制代码import org.elasticsearch.action.bulk.BackoffPolicy;
import org.elasticsearch.action.bulk.BulkProcessor;
import org.elasticsearch.common.unit.ByteSizeUnit;
import org.elasticsearch.common.unit.ByteSizeValue;
import org.elasticsearch.common.unit.TimeValue;
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码BulkProcessor bulkProcessor = BulkProcessor.builder(
client, //增加elasticsearch客户端
new BulkProcessor.Listener() {
@Override
public void beforeBulk(long executionId,
BulkRequest request) { ... } //调用bulk之前执行 ,例如你可以通过request.numberOfActions()方法知道numberOfActions

@Override
public void afterBulk(long executionId,
BulkRequest request,
BulkResponse response) { ... } //调用bulk之后执行 ,例如你可以通过request.hasFailures()方法知道是否执行失败

@Override
public void afterBulk(long executionId,
BulkRequest request,
Throwable failure) { ... } //调用失败抛 Throwable
})
.setBulkActions(10000) //每次10000请求
.setBulkSize(new ByteSizeValue(5, ByteSizeUnit.MB)) //拆成5mb一块
.setFlushInterval(TimeValue.timeValueSeconds(5)) //无论请求数量多少,每5秒钟请求一次。
.setConcurrentRequests(1) //设置并发请求的数量。值为0意味着只允许执行一个请求。值为1意味着允许1并发请求。
.setBackoffPolicy(
BackoffPolicy.exponentialBackoff(TimeValue.timeValueMillis(100), 3))//设置自定义重复请求机制,最开始等待100毫秒,之后成倍更加,重试3次,当一次或多次重复请求失败后因为计算资源不够抛出 EsRejectedExecutionException 异常,可以通过BackoffPolicy.noBackoff()方法关闭重试机制
.build();

BulkProcessor 默认设置

  • bulkActions 1000
  • bulkSize 5mb
  • 不设置flushInterval
  • concurrentRequests 为 1 ,异步执行
  • backoffPolicy 重试 8次,等待50毫秒

增加requests

然后增加requests到BulkProcessor

1
2
复制代码bulkProcessor.add(new IndexRequest("twitter", "tweet", "1").source(/* your doc here */));
bulkProcessor.add(new DeleteRequest("twitter", "tweet", "2"));

关闭 Bulk Processor

当所有文档都处理完成,使用awaitClose 或 close 方法关闭BulkProcessor:

1
复制代码bulkProcessor.awaitClose(10, TimeUnit.MINUTES);

或

1
复制代码bulkProcessor.close();

在测试中使用Bulk Processor

如果你在测试种使用Bulk Processor可以执行同步方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码BulkProcessor bulkProcessor = BulkProcessor.builder(client, new BulkProcessor.Listener() { /* Listener methods */ })
.setBulkActions(10000)
.setConcurrentRequests(0)
.build();

// Add your requests
bulkProcessor.add(/* Your requests */);

// Flush any remaining requests
bulkProcessor.flush();

// Or close the bulkProcessor if you don't need it anymore
bulkProcessor.close();

// Refresh your indices
client.admin().indices().prepareRefresh().get();

// Now you can start searching!
client.prepareSearch().get();

所有实例 已经上传到Git

更多请浏览 spring-boot-starter-es 开源项目

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot的自动配置、Command-line

发表于 2017-11-16

接下来关于SpringBoot的一系列文章和例子,都来自《Spring Boot Cookbook》这本书,本文的主要内容是start.spring.io的使用、Spring Boot的自动配置以及CommandRunner的角色和应用场景。

start.spring.io的使用

首先带你浏览start.spring.io/,在这个网址中有一些Spring Boot提供的组件,然后会给你展示如何让你的Spring工程变得“Bootiful”,我称之为“Boot化”。

在网站Spring Initializr上填对应的表单,描述Spring Boot项目的主要信息,例如Project Metadata、Dependency等。在Project Dependencies区域,你可以根据应用程序的功能需要选择相应的starter。

Spring Boot starters可以简化Spring项目的库依赖管理,将某一特定功能所需要的依赖库都整合在一起,就形成一个starter,例如:连接数据库、springmvc、spring测试框架等等。简单来说,spring boot使得你的pom文件从此变得很清爽且易于管理。

常用的starter以及用处可以列举如下:

  • spring-boot-starter: 这是核心Spring Boot starter,提供了大部分基础功能,其他starter都依赖于它,因此没有必要显式定义它。
  • spring-boot-starter-actuator:主要提供监控、管理和审查应用程序的功能。
  • spring-boot-starter-jdbc:该starter提供对JDBC操作的支持,包括连接数据库、操作数据库,以及管理数据库连接等等。
  • spring-boot-starter-data-jpa:JPA starter提供使用Java Persistence API(例如Hibernate等)的依赖库。
  • spring-boot-starter-data-*:提供对MongoDB、Data-Rest或者Solr的支持。
  • spring-boot-starter-security:提供所有Spring-security的依赖库。
  • spring-boot-starter-test:这个starter包括了spring-test依赖以及其他测试框架,例如JUnit和Mockito等等。
  • spring-boot-starter-web:该starter包括web应用程序的依赖库。

实践

首先我们要通过start.spring.io创建一个图书目录管理程序,它会记录出版图书的记录,包括作者、审阅人、出版社等等。我们将这个项目命名为BookPub,具体的操作步骤如下:

点击“Switch to the full version.”,展示完整页面;

  • Group设置为:site.javadu;
  • Artifact设置为:bookpub;
  • Name设置为:BookPub;
  • Package Name设置为:site.javadu.bookpub;
  • Packaging代表打包方式,我们选jar;
  • Spring Boot Version选择最新的1.3.0;
  • 创建Maven工程,当然,对Gradle比较熟悉的同学可以选择Gradle工程。
  • 点击“Generate Project”下载工程包。

利用IDEA导入下载的工程,可以看到pom文件的主体如下如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
复制代码<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>site.javadu</groupId>
<artifactId>bookpub</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<name>BookPub</name>
<description>Demo project for Spring Boot</description>

<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.0.0.M6</version>
<relativePath/> <!-- lookup parent from repository -->
</parent>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
<java.version>1.8</java.version>
</properties>

<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-jpa</artifactId>
</dependency>

<dependency>
<groupId>com.h2database</groupId>
<artifactId>h2</artifactId>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>

<repositories>
<repository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</repository>
<repository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</repository>
</repositories>

<pluginRepositories>
<pluginRepository>
<id>spring-snapshots</id>
<name>Spring Snapshots</name>
<url>https://repo.spring.io/snapshot</url>
<snapshots>
<enabled>true</enabled>
</snapshots>
</pluginRepository>
<pluginRepository>
<id>spring-milestones</id>
<name>Spring Milestones</name>
<url>https://repo.spring.io/milestone</url>
<snapshots>
<enabled>false</enabled>
</snapshots>
</pluginRepository>
</pluginRepositories>
</project>

Spring Boot的自动配置

在Spring Boot项目中,xxxApplication.java会作为应用程序的入口,负责程序启动以及一些基础性的工作。@SpringBootApplication是这个注解是该应用程序入口的标志,然后有熟悉的main函数,通过SpringApplication.run(xxxApplication.class, args)来运行Spring Boot应用。打开SpringBootApplication注解可以发现,它是由其他几个类组合而成的:@Configuration(等同于spring中的xml配置文件,使用Java文件做配置可以检查类型安全)、@EnableAutoConfiguration(自动配置,稍后细讲)、@ComponentScan(组件扫描,大家非常熟悉的,可以自动发现和装配一些Bean)。

我们在pom文件里可以看到,com.h2database这个库起作用的范围是runtime,也就是说,当应用程序启动时,如果Spring Boot在classpath下检测到org.h2.Driver的存在,会自动配置H2数据库连接。现在启动应用程序来观察,以验证我们的想法。打开shell,进入项目文件夹,利用mvn spring-boot:run启动应用程序,如下图所示。

2017-11-1520.59.01.png

可以看到类似No active profile set, falling back to default profiles: default、Building JPA container EntityManagerFactory for persistence unit ‘default、HHH000412: Hibernate Core {5.2.12.Final}、HHH000400: Using dialect: org.hibernate.dialect.H2Dialect这些信息;我们在pom文件里选择了jdbc和jpa等starters,Spring
Boot将自动创建JPA容器,并使用Hibernate4.3.11,使用H2Dialect管理H2数据库(内存数据库)。

使用Command-line runners

有时候需要在应用启动后立马执行一些任务,进行上下文的初始化、测试数据的灌入等等。这时候我们可以使用SpringBoot提供的CommandLineRunner接口。我们新建一个StartupRunner类,该类实现CommandLineRunner接口,这个接口只有一个函数:public void run(String... args),而且这个方法会在应用程序启动后首先被调用。

实践

  • 在src/main/java/site/javadu/bookpub/下建立StartRunner类,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码package site.javadu.bookpub;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.CommandLineRunner;

/**
* 作用: 启动器
* User: duqi
* Date: 2017/11/15
* Time: 21:09
*/
public class StarterRunner implements CommandLineRunner {

private static final Logger logger = LoggerFactory.getLogger(StarterRunner.class);

@Override
public void run(String... args) throws Exception {
logger.info("hello word");
}
}
  • 在BookPubApplication类中创建bean对象,代码如下:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码package site.javadu.bookpub;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Bean;

@SpringBootApplication
public class BookPubApplication {

public static void main(String[] args) {
SpringApplication.run(BookPubApplication.class, args);
}

/**
* 在spring容器中定义StarterRunner对应的bean
* @return
*/
@Bean
public StarterRunner starterRunner() {
return new StarterRunner();
}
}

还是用mvn spring-boot:run命令启动程序,可以看到hello的输出,如下图所示。

StartRunnerHelloWord.png

对于那种只需要在应用程序启动时执行一次的任务,非常适合利用Command line runners来完成。Spring Boot应用程序在启动后,会遍历CommandLineRunner接口的实例并运行它们的run方法。也可以利用@Order注解(或者实现Order接口)来规定所有CommandLineRunner实例的运行顺序。

利用command-line runner的这个特性,再配合依赖注入,可以在应用程序启动时后首先引入一些依赖bean,例如data source、rpc服务或者其他模块等等,这些对象的初始化可以放在run方法中。不过,需要注意的是,在run方法中执行初始化动作的时候一旦遇到任何异常,都会使得应用程序停止运行,因此最好利用try/catch语句处理可能遇到的异常。

参考资料 & 源码

  1. 《Spring Boot Cookbook》
  2. bookpub

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

自己实现蝇量级Java MVC框架

发表于 2017-11-16

Tiny Java MVC Framework.

Requirements

  • Java8+

Dependency

Apache Maven

1
2
3
4
5
复制代码`<dependency>`
<groupId>com.nosuchfield</groupId>
<artifactId>geisha</artifactId>
<version>1.0.0-RELEASE</version>
</dependency>

Apache Buildr

1
复制代码`'com.nosuchfield:geisha:jar:1.0.0-RELEASE'`

Apache Ivy

1
复制代码`<dependency org="com.nosuchfield" name="geisha" rev="1.0.0-RELEASE" />`

Groovy Grape

1
2
3
复制代码`@Grapes(` 
@Grab(group='com.nosuchfield', module='geisha', version='1.0.0-RELEASE')
)

Gradle/Grails

1
复制代码`compile 'com.nosuchfield:geisha:1.0.0-RELEASE'`

Scala SBT

1
复制代码`libraryDependencies += "com.nosuchfield" % "geisha" % "1.0.0-RELEASE"`

Leiningen

1
复制代码`[com.nosuchfield/geisha "1.0.0-RELEASE"]`

Example

1
2
3
4
5
6
7
8
9
10
复制代码@Component
@RequestMapping("/person")
public class Hello {

@RequestMapping("/info")
public String hello(@Param("name") String name, @Param("age") String age) {
return "hello " + name + ", your age is " + Integer.valueOf(age);
}

}
1
2
3
4
5
6
7
复制代码public class Application {

public static void main(String[] args) {
Geisha.run();
}

}

Run Application and visit http://127.0.0.1:5200/person/info?name=张三&age=18

Result:

1
复制代码`hello 张三, your age is 18`

License GPL

Project License can be found here.

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java 设计模式之装饰模式(八)

发表于 2017-11-16

一、前言

本篇主题为结构型模式中的第三个模式–装饰模式。上篇 Java 设计模式主题为《Java 设计模式之桥接模式(七)》。

二、简单介绍

# 2.1 定义

装饰(Decorator)模式又叫做包装模式,其功能是动态地给一个对象添加一些额外的职责。就增加功能来说,装饰模式相比生成子类更为灵活,是继承关系的一个替换方案。

# 2.2 参与角色

  1. Component:定义一个对象接口,可以给这些对象动态地添加职责。
  2. ConcreteComponent:定义一个对象,可以给这个对象添加一些职责。
  3. Decorator:维持一个指向 Component 对象的指针,并定义一个与 Component 接口一致的接口。
  4. ConcreteDecorator:向组件添加职责。

# 2.3 应用场景

  1. 在不影响其他对象的情况下,以动态、透明的方式给单个对象添加职责。
  2. 当不能采用生成子类的方法进行扩充时。

三、实现方式

我们以人的打扮为例。人打扮需要穿衣,穿裤,穿鞋子。代码表示如下:

Person 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
复制代码public class Person {



private String name;



public Person(String name) {

this.name = name;

}



public void putOnClothes() {

System.out.println(this.name + "穿衣服");

}



public void putOnTrousers() {

System.out.println(this.name + "穿裤子");

}



public void putOnShoes() {

System.out.println(this.name + "穿鞋子");

}

}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码public class Client {



public static void main(String[] args) {



Person person = new Person("小白");



person.putOnClothes();

person.putOnTrousers();

person.putOnShoes();

}

}

打印:

1
2
3
4
5
复制代码小白穿衣服

小白穿裤子

小白穿鞋子

上述代码很简单,但是扩展性不好。当我们需要添加打领带、戴手表的行为时,需要修改 Person 类,违背了开放封闭原则。

因此,我们需要将人和打扮的行为抽离出来:

Person 类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码public class Person {



private String name;



public Person(String name) {

this.name = name;

}



public String getName() {

return name;

}



}

打扮类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
复制代码public abstract class DressUp {



public abstract void dressup(Person person);

}





class ClothesDressUp extends DressUp {



@Override

public void dressup(Person person) {

System.out.println(person.getName() + "穿衣服");

}



}



class TrousersDressUp extends DressUp {



@Override

public void dressup(Person person) {

System.out.println(person.getName() + "穿裤子");

}



}



class ShoesDressUp extends DressUp {



@Override

public void dressup(Person person) {

System.out.println(person.getName() + "穿鞋子");

}



}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
复制代码public class Client {



public static void main(String[] args) {

Person person = new Person("小白");



DressUp du1 = new ClothesDressUp();

du1.dressup(person);



DressUp du2 = new TrousersDressUp();

du2.dressup(person);



DressUp du3 = new ShoesDressUp();

du3.dressup(person);

}

}

执行结果与上文的一致。现在,当我们添加新的打扮行为时,只需新增 DressUp 的子类即可。

但是,上边的代码没有封装性,每打扮一次都要调用 dressup 方法一次,就感觉人是光着身在公共场合进行打扮穿衣、穿鞋。因此,我们需要一种模式将这些打扮的细节封装起来,就像建造者模式一样。

不过,此次的需求不能使用建造者模式。因为建造者模式封装过程/细节是一个固定的顺序/模式,而当前需求是人的打扮,打扮的行为是多种多样的,如:穿衣穿裤、穿衣打领带、穿鞋戴手表等。

这样就引出了本章的主题–装饰模式:

Person 接口与实现类(Component 和 ConcreteComponent):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码public interface Person {



public void decorate();

}



class Man implements Person {



@Override

public void decorate() {

System.out.println("男人打扮");

}



}

装饰类(Decorator 和 ConcreteDecorator):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
复制代码public class Decorator implements Person {



private Person person;



public Decorator(Person person) {

this.person = person;

}



@Override

public void decorate() {

this.person.decorate();

}



}



class ClothesDecorator extends Decorator {



public ClothesDecorator(Person person) {

super(person);

}



public void decorate() {

super.decorate();

System.out.println("穿衣服");

}

}



class TrousersDecorator extends Decorator {



public TrousersDecorator(Person person) {

super(person);

}



public void decorate() {

super.decorate();

System.out.println("穿裤子");

}

}



class ShoesDecorator extends Decorator {



public ShoesDecorator(Person person) {

super(person);

}



public void decorate() {

super.decorate();

System.out.println("穿鞋子");

}

}

客户端:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
复制代码public class Client {



public static void main(String[] args) {



Person person = new Man();



Person decorator = new Decorator(person);



System.out.println("======第一种打扮=======");



ClothesDecorator cd = new ClothesDecorator(decorator);



TrousersDecorator td = new TrousersDecorator(cd);



td.decorate();



System.out.println("======第二种打扮=======");



ShoesDecorator sd = new ShoesDecorator(person);



sd.decorate();

}

}

打印:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码======第一种打扮=======

男人打扮

穿衣服

穿裤子

======第二种打扮=======

男人打扮

穿鞋子

总结:装饰模式有效地把类的核心职责和装饰功能区分开来,而且去除了相关类的重复的装饰逻辑。

UML 类图表示如下:

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

微服务MySQL分库分表数据到MongoDB同步方案

发表于 2017-11-15

在现有的架构中做大数据分析,第一步面临的问题就是数据如何从关系型数据库到非关系数据库,网上有很多的解决方案,我们也经过了很多的摸索,经历了三套方案的实践,最终使用了canal。这是我们大数据部门的一个同事张同睿写的文章,分享给大家,如果感兴趣后面可以进一步的介绍。

需求背景

近年来,微服务概念持续火热,网络上针对微服务和单体架构的讨论也是越来越多,面对日益增长的业务需求是,很多公司做技术架构升级时优先选用微服务方式。我所在公司也是选的这个方向来升级技术架构,以支撑更大访问量和更方便的业务扩展。

发现问题

微服务拆分主要分两种方式:拆分业务系统不拆分数据库,拆分业务系统拆分库。如果数据规模小的话大可不必拆分数据库,因为拆分数据看必将面对多维度数据查询,跨进程之间的事务等问题。而我所在公司随着业务发展单数据库实例已经不能满足业务需要,所以选择了拆分业务系统同时拆分数据库的模式,所以也面临着以上的问题。本文主要介绍多维度数据实时查询解决方案。当前系统架构和存储结构如下:

解决思路

  • 要对多数据库数据进行查询,首先就需要将数据库同步到一起以方便查询
  • 为了满足大数据量数据需求,所以优先选择NOSQL数据库做同步库
  • NOSQL数据库基本无法进行关联查询,所以需要将关系数据进行拼接操作,转换成非关系型数据
  • 业务多维度查询需要实时性,所以需要选择NOSQL中实时性相对比较好的数据库:MongoDB

根据以上思路,总结数据整合架构如下图所示:

解决方案

目前网上一些数据同步案例分两种:MQ消息同步和binlog数据读取同步

先说MQ消息同步,该同步方式我所在公司试用过一段时间,发现以下问题:

  • 数据围绕业务进行,对业务关键性数据操作发送MQ消息,对业务系统依赖性比较高
  • 对于数据库中存量数据需要单独处理
  • 对于工具表还需要单独维护同步
  • 每次新增数据表都需要重新添加MQ逻辑

考虑到以上问题,用MQ方式同步数据最优解决办法

使用binlog 数据读取方式目前有一些成熟方案,比如tungsten replicator,但这些同步工具只能实现数据1:1复制,数据复制过程自定义逻辑添加比较麻烦,不支持分库分表数据归集操作。综上所述,最优方案应该是读取后binlog后自行处理后续数据逻辑。目前binlog读取binlog工具中最成熟的方案应该就是alibaba开源的canal了。

canal

canal是阿里巴巴mysql数据库binlog的增量订阅&消费组件 。阿里云DRDS、阿里巴巴TDDL 二级索引、小表复制. 都是基于canal做的,应用广泛。canal原理相对比较简单:

  • canal模拟mysql slave的交互协议,伪装自己为mysql slave,向mysql master发送dump协议
  • mysql master收到dump请求,开始推送binary log给slave(也就是canal)
  • canal解析binary log对象(原始为byte流)

canal介绍: https://github.com/alibaba/canal/wiki

我使用的是canal的HA模式,由zookeeper选举可用实例,每个数据库一个instance,服务端配置如下:

目录:

1
2
3
4
5
6
复制代码`conf`
   database1
       -instance.properties
   database2
       -instance.properties
   canal.properties

instance.properties

1
2
3
4
5
6
7
8
9
10
11
复制代码`canal.instance.mysql.slaveId = 1001`
canal.instance.master.address = X.X.X.X:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = .*\\..*
canal.instance.filter.black.regex =

canal.properties

1
2
3
4
5
6
7
8
9
10
11
复制代码`canal.id= 1`
canal.ip=X.X.X.X
canal.port= 11111
canal.zkServers=X.X.X.X:2181,X.X.X.X:2181,X.X.X.X:2181
canal.zookeeper.flush.period = 1000
canal.file.data.dir = ${canal.conf.dir}
canal.file.flush.period = 1000
canal.instance.memory.buffer.size = 16384
canal.instance.memory.buffer.memunit = 1024 ...
canal.instance.global.lazy = false
canal.instance.global.spring.xml = classpath:spring/default-instance.xml

部署数据流如下:

tip:*虽然canal同时支持mixed和row类型的binlog日志,但是获取行数据时如果是mixed类型的日志则获取不到表名,所以本方案暂只支持row格式的binlog*

数据同步

创建canal client应用订阅canal读取的binlog数据

1.开启多instance 订阅,订阅多个instance

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
> 复制代码`public void initCanalStart() {`
>    List<String> destinations = canalProperties.getDestination();
>    final List<CanalClient> canalClientList = new ArrayList<>();
>    if (destinations != null && destinations.size() > 0) {
>     `for (String destination : destinations) {`
>            // 基于zookeeper动态获取canal server的地址,建立链接,其中一台server发生crash,可以支持failover
>            CanalConnector connector = CanalConnectors.newClusterConnector(canalProperties.getZkServers(), destination, "", "");
>            CanalClient client = new CanalClient(destination, connector);
>            canalClientList.add(client);
>            client.start();
>        }
>    }
>    Runtime.getRuntime().addShutdownHook(new Thread() {
>        public void run() {
>            try {
>                logger.info("## stop the canal client");
>                for (CanalClient canalClient : canalClientList) {
>                    canalClient.stop();
>                }
>            } catch (Throwable e) {
>                logger.warn("##something goes wrong when stopping canal:", e);
>            } finally {
>                logger.info("## canal client is down.");
>            }
>        }
>    });
> }
>
>

订阅消息处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
> 复制代码`private void process() {`
>    int batchSize = 5 * 1024;
>    while (running) {
>        try {
>            MDC.put("destination", destination);
>            connector.connect();
>            connector.subscribe();
>            while (running) {
>                Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
>                long batchId = message.getId();
>                int size = message.getEntries().size();
>                if (batchId != -1 && size > 0) {
>                    saveEntry(message.getEntries());
>                }
>             `connector.ack(batchId); // 提交确认`
>                // connector.rollback(batchId); // 处理失败, 回滚数据
>            }
>        } catch (Exception e) {
>            logger.error("process error!", e);
>        } finally {
>            connector.disconnect();
>            MDC.remove("destination");
>        }
>    }}
>
>

根据数据库事件处理消息,过滤消息列表,对数据变动进行处理,用到信息为:

  • insert :schemaName,tableName,beforeColumnsList
  • update :schemaName,tableName,afterColumnsList
  • delete :schemaName,tableName,afterColumnsList
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
> 复制代码`RowChange rowChage = null;`
>    try {
>        rowChage = RowChange.parseFrom(entry.getStoreValue());
>    } catch (Exception e) {
>        throw new RuntimeException("parse event has an error , data:" + entry.toString(), e);
>    }
>    EventType eventType = rowChage.getEventType();
>    logger.info(row_format,
>            entry.getHeader().getLogfileName(),
>            String.valueOf(entry.getHeader().getLogfileOffset()), entry.getHeader().getSchemaName(),
>            entry.getHeader().getTableName(), eventType,
>            String.valueOf(entry.getHeader().getExecuteTime()), String.valueOf(delayTime));
>    if (eventType == EventType.QUERY || rowChage.getIsDdl()) {
>        logger.info(" sql ----> " + rowChage.getSql());
>        continue;
>    }
>   `DataService dataService = SpringUtil.getBean(DataService.class);`
>    for (RowData rowData : rowChage.getRowDatasList()) {
>        if (eventType == EventType.DELETE) {
>            dataService.delete(rowData.getBeforeColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
>        } else if (eventType == EventType.INSERT) {
>            dataService.insert(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
>        } else if (eventType == EventType.UPDATE) {
>            dataService.update(rowData.getAfterColumnsList(), entry.getHeader().getSchemaName(), entry.getHeader().getTableName());
>        } else {
>            logger.info("未知数据变动类型:{}", eventType);
>        }
>    }
> }
>
>

ColumnsList转换成MongoTemplate 可用的数据类:DBObject,顺便做下数据类型转换

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
> 复制代码`public static DBObject columnToJson(List<CanalEntry.Column> columns) {`
>    DBObject obj = new BasicDBObject();
>    try {
>        for (CanalEntry.Column column : columns) {
>            String mysqlType = column.getMysqlType();
>            //int类型,长度11以下为Integer,以上为long
>            if (mysqlType.startsWith("int")) {
>                int lenBegin = mysqlType.indexOf('(');
>                int lenEnd = mysqlType.indexOf(')');
>                if (lenBegin > 0 && lenEnd > 0) {
>                    int length = Integer.parseInt(mysqlType.substring(lenBegin + 1, lenEnd));
>                    if (length > 10) {
>                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
>                        continue;
>                    }
>                }
>             `obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Integer.parseInt(column.getValue()));`
>            } else if (mysqlType.startsWith("bigint")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
>            } else if (mysqlType.startsWith("decimal")) {
>                int lenBegin = mysqlType.indexOf('(');
>                int lenCenter = mysqlType.indexOf(',');
>                int lenEnd = mysqlType.indexOf(')');
>                if (lenBegin > 0 && lenEnd > 0 && lenCenter > 0) {
>                    int length = Integer.parseInt(mysqlType.substring(lenCenter + 1, lenEnd));
>                    if (length == 0) {
>                        obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Long.parseLong(column.getValue()));
>                        continue;
>                    }
>                }
>             `obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : Double.parseDouble(column.getValue()));`
>            } else if (mysqlType.equals("datetime") || mysqlType.equals("timestamp")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_TIME_FORMAT.parse(column.getValue()));
>            } else if (mysqlType.equals("date")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : DATE_FORMAT.parse(column.getValue()));
>            } else if (mysqlType.equals("time")) {
>                obj.put(column.getName(), StringUtils.isBlank(column.getValue()) ? null : TIME_FORMAT.parse(column.getValue()));
>            } else {
>                obj.put(column.getName(), column.getValue());
>            }
>        }
>    } catch (ParseException e) {
>        e.printStackTrace();        }
>   `return obj;`
> }
>
>

tip:*DBObject对象如果同时用于保存原始数据和组合数据或其他数据,使用时应该深度拷贝对象生成副本,然后使用副本*

数据拼接

我们获取了数据库数据后做拼接操作,比如两张用户表:

1
2
复制代码`user_info:{id,user_no,user_name,user_password}`
user_other_info:{id,user_no,idcard,realname}

拼接后mongo数据为:

1
复制代码user:{_id,user_no,userInfo:{id,user_no,user_name,user_password},userOtherInfo:{id,user_no,idcard,realname})

接收到的数据信息很多,如何才能简单的触发数据拼接操作呢?

先看我们能获取的信息:schemaName,tableName,DBObject,Event(insert,update,delete)

将这些信息标识拼接起来看看:/schemaName/tableName/Event(DBObject),没错,就是一个标准的restful链接。只要我们实现一个简单的springMVC 就能自动获取需要的数据信息进行拼接操作。

先实现@Controller,定义名称为Schema,value对应schemaName

1
2
3
4
5
6
7
8
9
> 复制代码`@Target({ElementType.TYPE})`
> @Retention(RetentionPolicy.RUNTIME)
> @Documented
> @Component
> public  @interface Schema {
> String value() default "";
> }
>
>

然后实现@RequestMapping,定义名称为Table,直接使用Canal中的EventType 对应RequestMethod

1
2
3
4
5
6
7
8
9
> 复制代码`@Target({ElementType.METHOD, ElementType.TYPE})`
> @Retention(RetentionPolicy.RUNTIME)
> @Documented
> public  @interface Table {
>    String value() default "";
>    CanalEntry.EventType[] event() default {};
> }
>
>

然后创建springUtil,实现接口ApplicationContextAware,应用启动 加载的时候初始化两个Map:intanceMap,handlerMap

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
> 复制代码`@Override`
> public void setApplicationContext(ApplicationContext applicationContext) {
>    if (SpringUtil.applicationContext == null) {
>        SpringUtil.applicationContext = applicationContext;
>        //初始化instanceMap数据
>        instanceMap();
>        //初始化handlerMap数据
>        handlerMap();
>    }
> }private void instanceMap() {
>    Map<String, Object> beans = applicationContext.getBeansWithAnnotation(Schema.class);
>    for (Object bean : beans.values()) {
>        Class<?> clazz = bean.getClass();
>        Object instance = applicationContext.getBean(clazz);
>        Schema schema = clazz.getAnnotation(Schema.class);
>        String key = schema.value();
>        instanceMap.put(key, instance);
>        logger.info("instanceMap [{}:{}]", key, bean == null ? "null" : clazz.getName());
>    }
> }private void handlerMap(){  ...}
>
>

调用方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
> 复制代码`public static void doEvent(String path, DBObject obj) throws Exception {`
>    String[] pathArray = path.split("/");
>    if (pathArray.length != 4) {
>        logger.info("path 格式不正确:{}", path);
>        return;
>    }
>    Method method = handlerMap.get(path);
>    Object schema = instanceMap.get(pathArray[1]);
>    //查找不到映射Bean和Method不做处理
>    if (method == null || schema == null) {
>        return;    }
>    try {
>        long begin = System.currentTimeMillis();
>        logger.info("integrate data:{},{}", path, obj);
>        method.invoke(schema, new Object[]{obj});
>        logger.info("integrate data consume: {}ms:", System.currentTimeMillis() - begin);
>    } catch (Exception e) {
>        logger.error("调用组合逻辑异常", e);
>        throw new Exception(e.getCause());
>    }
> }
>
>

数据拼接消息处理:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
> 复制代码`@Schema("demo_user")``public class UserService {`
>    @Table(value = "user_info", event = {CanalEntry.EventType.INSERT, CanalEntry.EventType.UPDATE})
>    public void saveUser_UserInfo(DBObject userInfo) {
>        String userNo = userInfo.get("user_no") == null ? null : userInfo.get("user_no").toString();
>        DBCollection collection = completeMongoTemplate.getCollection("user");
>        DBObject queryObject = new BasicDBObject("user_no", userNo);
>        DBObject user = collection.findOne(queryObject);
>        if (user == null) {
>            user = new BasicDBObject();
>            user.put("user_no", userNo);
>            user.put("userInfo", userInfo);
>            collection.insert(user);
>        } else {                        DBObject updateObj = new BasicDBObject("userInfo", userInfo);
>            DBObject update = new BasicDBObject("$set", updateObj);
>            collection.update(queryObject, update);
>        }
>    }
> }
>
>

示例源码

https://github.com/zhangtr/canal-mongo

原文出处:http://www.torry.top/2017/10/22/canal-mongodb/

推荐阅读

  • 微服务(Microservice)那点事
  • 微服务架构的理论基础 - 康威定律
  • 从架构演进的角度聊聊Spring Cloud都做了些什么?

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Jupyter 常见可视化框架选择 选择标准 框架罗列 bq

发表于 2017-11-15

对于以Python作为技术栈的数据科学工作者,Jupyter是不得不提的数据报告工具。可能对于R社区而言,鼎鼎大名的ggplot2是常见的可视化框架,而大家对于Python,以及Jupyter为核心的交互式报告的可个视化方案就并没有那么熟悉。本文试图比较几个常用的解决方案,方便大家选择。

选择标准

称述式还是命令式

数据工作者使用的图的类别,常见的就三类:GIS可视化、网络可视化和统计图。因此,大多数场景下,我们并不想接触非常底层的基于点、线、面的命令,所以,选择一个好的封装的框架相当重要。

当然,公认较好的封装是基于《The Grammar of Graphics (Statistics and Computing)》一书,R中的ggplot2基本上就是一个很好的实现。我们基本上可以像用「自然语言」(Natural Language)一样使用这些绘图命令。我们姑且采用计算机科学领域的「陈述式」来表达这种绘图方式。

相反,有时候,以下情形时,我们可能对于这种绘图命令可能并不在意:

  1. 出图相当简单,要求绘制速度,一般大的框架较重(当然只是相对而言);
  2. 想要对细节做非常详尽的微调,一般大框架在微调方面会相对复杂或者退缩成一句句命令;
  3. 是统计作图可视化的创新者,想要尝试做出新的可视化实践。

这些情况下,显然,简单操作式并提供底层绘制命令的框架更让人愉快,与上面类似,我们借用「命令式」描述这类框架。

是否交互

与传统的交付静态图标不同,基于Web端的Jupter的一大好处就是可以绘制交互的图标(最近的RNotebook也有实现),因此,是否选择交互式,也是一个需要权衡的地方。

交互图的优势:

  1. 可以提供更多的数据维度和信息;
  2. 用户端可以做更多诸如放大、选取、转存的操作;
  3. 可以交付BI工程师相应的JavaScript代码用以工程化;
  4. 效果上比较炫酷,考虑到报告接受者的特征可以选择。

非交互图的优势:

  1. 报告文件直接导出成静态文件时相对问题,不会因为转换而损失信息;
  2. 图片可以与报告分离,必要时作为其他工作的成果;
  3. 不需要在运行Notebook时花很多世界载入各类前端框架。

是非内核交互

Jupyter上大多数命令通过以下方式获取数据,而大多数绘图方式事实上只是通过Notebook内的代码在Notebook与内核交互后展示出输出结果。但ipywidgets框架则可以实现Code Cell中的代码与Notebook中的前端控件(比如按钮等)绑定来进行操作内核,提供不同的绘图结果,甚至某些绘图框架的每个元素都可以直接和内核进行交互。

3262887070-59fae2a6967b6_articlex

用这些框架,可以搭建更复杂的Notebook的可视化应用,但缺点是因为基于内核,所以在呈递、展示报告时如果使用离线文件时,这些交互就会无效。

框架罗列

matplotlib

最家喻户晓的绘图框架是matplotlib,它提供了几乎所有python内静态绘图框架的底层命令。如果按照上面对可视化框架的分法,matplotlib属于非交互式的的「命令式」作图框架。

1
2
3
4
5
6
7
8
9
10
复制代码## matplotlib代码示例
from pylab import *
 
X = np.linspace(-np.pi, np.pi, 256,endpoint=True)
C,S = np.cos(X), np.sin(X)
 
plot(X,C)
plot(X,S)
 
show()

3262887070-59fae2a6967b6_articlex

优点是相对较快,底层操作较多。缺点是语言繁琐,内置默认风格不够美观。

matplotlib在jupyter中需要一些配置,可以展现更好的效果,详情参见这篇文章.

ggplot和plotnine

值得一说,对于R迁移过来的人来说,ggplot和plotnine简直是福音,基本克隆了ggplot2所有语法。横向比较的话,plotnine的效果更好。这两个绘图包的底层依旧是matplotlib,因此,在引用时别忘了使用%matplotlib inline语句。值得一说的是plotnine也移植了ggplot2中良好的配置语法和逻辑。

1
2
3
4
5
复制代码## plotnine示例
(ggplot(mtcars, aes('wt', 'mpg', color='factor(gear)'))
+ geom_point()
+ stat_smooth(method='lm')
+ facet_wrap('~gear'))

3262887070-59fae2a6967b6_articlex

Seaborn

seaborn准确上说属于matplotlib的扩展包,在其上做了许多非常有用的封装,基本上可以满足大部分统计作图的需求,以matplotlib+seaborn基本可以满足大部分业务场景,语法也更加「陈述式」。

缺点是封装较高,基本上API不提供的图就完全不可绘制,对于各类图的拼合也不适合;此外配置语句语法又回归「命令式」,相对复杂且不一致。

1
2
3
4
5
复制代码## seaborn示例
import seaborn as sns; sns.set(color_codes=True)
iris = sns.load_dataset("iris")
species = iris.pop("species")
g = sns.clustermap(iris)

3262887070-59fae2a6967b6_articlex

plotly

plotly是跨平台JavaScript交互式绘图包,由于开发者的核心是javascript,所以整个语法类似于写json配置,语法特质也介于「陈述式」和「命令式」之间,无服务版本是免费的。

有点是学习成本不高,可以很快将语句移植到javascript版本;缺点是语言相对繁琐。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
复制代码##plotly示例
import plotly.plotly as py
import plotly.graph_objs as go
 
# Add data
month = ['January', 'February', 'March', 'April', 'May', 'June', 'July',
         'August', 'September', 'October', 'November', 'December']
high_2000 = [32.5, 37.6, 49.9, 53.0, 69.1, 75.4, 76.5, 76.6, 70.7, 60.6, 45.1, 29.3]
low_2000 = [13.8, 22.3, 32.5, 37.2, 49.9, 56.1, 57.7, 58.3, 51.2, 42.8, 31.6, 15.9]
high_2007 = [36.5, 26.6, 43.6, 52.3, 71.5, 81.4, 80.5, 82.2, 76.0, 67.3, 46.1, 35.0]
low_2007 = [23.6, 14.0, 27.0, 36.8, 47.6, 57.7, 58.9, 61.2, 53.3, 48.5, 31.0, 23.6]
high_2014 = [28.8, 28.5, 37.0, 56.8, 69.7, 79.7, 78.5, 77.8, 74.1, 62.6, 45.3, 39.9]
low_2014 = [12.7, 14.3, 18.6, 35.5, 49.9, 58.0, 60.0, 58.6, 51.7, 45.2, 32.2, 29.1]
 
# Create and style traces
trace0 = go.Scatter(
    x = month,
    y = high_2014,
    name = 'High 2014',
    line = dict(
        color = ('rgb(205, 12, 24)'),
        width = 4)
)
trace1 = go.Scatter(
    x = month,
    y = low_2014,
    name = 'Low 2014',
    line = dict(
        color = ('rgb(22, 96, 167)'),
        width = 4,)
)
trace2 = go.Scatter(
    x = month,
    y = high_2007,
    name = 'High 2007',
    line = dict(
        color = ('rgb(205, 12, 24)'),
        width = 4,
        dash = 'dash') # dash options include 'dash', 'dot', and 'dashdot'
)
trace3 = go.Scatter(
    x = month,
    y = low_2007,
    name = 'Low 2007',
    line = dict(
        color = ('rgb(22, 96, 167)'),
        width = 4,
        dash = 'dash')
)
trace4 = go.Scatter(
    x = month,
    y = high_2000,
    name = 'High 2000',
    line = dict(
        color = ('rgb(205, 12, 24)'),
        width = 4,
        dash = 'dot')
)
trace5 = go.Scatter(
    x = month,
    y = low_2000,
    name = 'Low 2000',
    line = dict(
        color = ('rgb(22, 96, 167)'),
        width = 4,
        dash = 'dot')
)
data = [trace0, trace1, trace2, trace3, trace4, trace5]
 
# Edit the layout
layout = dict(title = 'Average High and Low Temperatures in New York',
              xaxis = dict(title = 'Month'),
              yaxis = dict(title = 'Temperature (degrees F)'),
              )
 
fig = dict(data=data, layout=layout)
py.iplot(fig, filename='styled-line')

3262887070-59fae2a6967b6_articlex

注意:此框架在jupyter中使用需要使用init_notebook_mode()加载JavaScript框架。

bokeh

bokeh是pydata维护的比较具有潜力的开源交互可视化框架。

值得一说的是,该框架同时提供底层语句和「陈述式」绘图命令。相对来说语法也比较清楚,但其配置语句依旧有很多可视化框架的问题,就是与「陈述式」命令不符,没有合理的结构。此外,一些常见的交互效果都是以底层命令的方式使用的,因此如果要快速实现Dashboard或者作图时就显得较为不便了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
复制代码## Bokeh示例
import numpy as np
import scipy.special
 
from bokeh.layouts import gridplot
from bokeh.plotting import figure, show, output_file
 
p1 = figure(title="Normal Distribution (μ=0, σ=0.5)",tools="save",
            background_fill_color="#E8DDCB")
 
mu, sigma = 0, 0.5
 
measured = np.random.normal(mu, sigma, 1000)
hist, edges = np.histogram(measured, density=True, bins=50)
 
x = np.linspace(-2, 2, 1000)
pdf = 1/(sigma * np.sqrt(2*np.pi)) * np.exp(-(x-mu)**2 / (2*sigma**2))
cdf = (1+scipy.special.erf((x-mu)/np.sqrt(2*sigma**2)))/2
 
p1.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:],
        fill_color="#036564", line_color="#033649")
p1.line(x, pdf, line_color="#D95B43", line_width=8, alpha=0.7, legend="PDF")
p1.line(x, cdf, line_color="white", line_width=2, alpha=0.7, legend="CDF")
 
p1.legend.location = "center_right"
p1.legend.background_fill_color = "darkgrey"
p1.xaxis.axis_label = 'x'
p1.yaxis.axis_label = 'Pr(x)'
 
 
 
p2 = figure(title="Log Normal Distribution (μ=0, σ=0.5)", tools="save",
            background_fill_color="#E8DDCB")
 
mu, sigma = 0, 0.5
 
measured = np.random.lognormal(mu, sigma, 1000)
hist, edges = np.histogram(measured, density=True, bins=50)
 
x = np.linspace(0.0001, 8.0, 1000)
pdf = 1/(x* sigma * np.sqrt(2*np.pi)) * np.exp(-(np.log(x)-mu)**2 / (2*sigma**2))
cdf = (1+scipy.special.erf((np.log(x)-mu)/(np.sqrt(2)*sigma)))/2
 
p2.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:],
        fill_color="#036564", line_color="#033649")
p2.line(x, pdf, line_color="#D95B43", line_width=8, alpha=0.7, legend="PDF")
p2.line(x, cdf, line_color="white", line_width=2, alpha=0.7, legend="CDF")
 
p2.legend.location = "center_right"
p2.legend.background_fill_color = "darkgrey"
p2.xaxis.axis_label = 'x'
p2.yaxis.axis_label = 'Pr(x)'
 
 
 
p3 = figure(title="Gamma Distribution (k=1, θ=2)", tools="save",
            background_fill_color="#E8DDCB")
 
k, theta = 1.0, 2.0
 
measured = np.random.gamma(k, theta, 1000)
hist, edges = np.histogram(measured, density=True, bins=50)
 
x = np.linspace(0.0001, 20.0, 1000)
pdf = x**(k-1) * np.exp(-x/theta) / (theta**k * scipy.special.gamma(k))
cdf = scipy.special.gammainc(k, x/theta) / scipy.special.gamma(k)
 
p3.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:],
        fill_color="#036564", line_color="#033649")
p3.line(x, pdf, line_color="#D95B43", line_width=8, alpha=0.7, legend="PDF")
p3.line(x, cdf, line_color="white", line_width=2, alpha=0.7, legend="CDF")
 
p3.legend.location = "center_right"
p3.legend.background_fill_color = "darkgrey"
p3.xaxis.axis_label = 'x'
p3.yaxis.axis_label = 'Pr(x)'
 
 
 
p4 = figure(title="Weibull Distribution (λ=1, k=1.25)", tools="save",
            background_fill_color="#E8DDCB")
 
lam, k = 1, 1.25
 
measured = lam*(-np.log(np.random.uniform(0, 1, 1000)))**(1/k)
hist, edges = np.histogram(measured, density=True, bins=50)
 
x = np.linspace(0.0001, 8, 1000)
pdf = (k/lam)*(x/lam)**(k-1) * np.exp(-(x/lam)**k)
cdf = 1 - np.exp(-(x/lam)**k)
 
p4.quad(top=hist, bottom=0, left=edges[:-1], right=edges[1:],
       fill_color="#036564", line_color="#033649")
p4.line(x, pdf, line_color="#D95B43", line_width=8, alpha=0.7, legend="PDF")
p4.line(x, cdf, line_color="white", line_width=2, alpha=0.7, legend="CDF")
 
p4.legend.location = "center_right"
p4.legend.background_fill_color = "darkgrey"
p4.xaxis.axis_label = 'x'
p4.yaxis.axis_label = 'Pr(x)'
 
 
 
output_file('histogram.html', title="histogram.py example")
 
show(gridplot(p1,p2,p3,p4, ncols=2, plot_width=400, plot_height=400, toolbar_location=None))

3262887070-59fae2a6967b6_articlex

bqplot

bqplot是基于ipywidgets和d3.js组合发展的内核交互式的可视化框架。语法上采用了和matplotlib大致一致的语法已经相对封装较高的「陈述式语法」。优点是直接和内核交互,可以使用大量控件来实现更多的图像处理,缺点也是直接的,离线文档则不会显示任何图案、控件也都失效。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码## bqplot示例
import numpy as np
from IPython.display import display
from bqplot import (
    OrdinalScale, LinearScale, Bars, Lines, Axis, Figure
)
 
size = 20
np.random.seed(0)
 
x_data = np.arange(size)
 
x_ord = OrdinalScale()
y_sc = LinearScale()
 
bar = Bars(x=x_data, y=np.random.randn(2, size), scales={'x': x_ord, 'y':
y_sc}, type='stacked')
line = Lines(x=x_data, y=np.random.randn(size), scales={'x': x_ord, 'y': y_sc},
             stroke_width=3, colors=['red'], display_legend=True, labels=['Line chart'])
 
ax_x = Axis(scale=x_ord, grid_lines='solid', label='X')
ax_y = Axis(scale=y_sc, orientation='vertical', tick_format='0.2f',
            grid_lines='solid', label='Y')
 
Figure(marks=[bar, line], axes=[ax_x, ax_y], title='API Example',
       legend_location='bottom-right')

3262887070-59fae2a6967b6_articlex

其他特殊需求的作图

除了统计作图,网络可视化和GIS可视化也是很常用的,在此只做一个简单的罗列:

GIS类:

  • gmap:交互,使用google maps接口
  • ipyleaflet:交互,使用leaflet接口

网络类:

  • networkx:底层为matplotlib
  • plotly

总结

底层实现 交互方式 语法 语言结构 备注 推荐程度
matplotlib – 无 命令式 底层语言 可以实现复杂底层操作 ★★★
gglot matplotlib 无 陈述式 类ggplot2 建议选择plotnine ★★
plotnine matplotlib 无 陈述式 类ggplot2 完全移植ggplot2 ★★★★★
seaborn matplotlib 无 陈述式 高级语言 有很多有用的统计图类的封装;但不适合做图拼装 ★★★★★
plotly plotly.js 前端交互 介于命令式和陈述式之间 类似JavaScript 语法类似于json配置 ★★★★
bokeh – 前端交互 命令、陈述式 同时有底层语言和高级语言 社区具有潜力 ★★★
bqplot d3.js 内核交互 命令、陈述式 有类似matplotlib底层语言,已经封装好的高级语言 内核交互 ★★★★

1 赞 收藏 评论

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何实现飞跃:在架构内构建云就绪应用

发表于 2017-11-15

本文要点

  • 云就绪(Cloud-Ready)!=云原生(Cloud-Native),后者比前者要进行更多的重建。
  • 云就绪的工作主要是修改部署过程,也可能要转变思维方式。
  • 云管理平台(CMP)是一个辅助工具。
  • 好处主要是可移植性、合理大小的云部署以及应对未来变化的灵活性,这样在处理这些老旧的应用的时候,你可以节省更多的时间。
  • 大处着眼,小处着手。不必一下子做完所有的事,那就从可控的开始吧。

在企业中,应用组合可能很复杂。 应用程序包含多样的需求和多种架构并不罕见,这些架构涉及的范围非常广,从电子邮件、维基和博客等协作项目到金融交易资产,再到人力资源的员工门户网站以及面向客户的营销网站。为了给这种多样化的需求寻找一种恰当的托管方式,我们有一种现代化的方法,那就是,在数据中心中运行一些应用程序,而其他应用程序在公有云中运行。研究表明,这样的混合云方法受到多达73%的组织的欢迎。

但这具体是什么意思呢?这些截然不同的应用直接在计算机上运行多年,而编写这些应用的开发人员早已离开公司,如何将这些应用变得云就绪(公有云或私有云)?

云就绪与云原生

云原生应用编写之初就是为在公有云上运行,通常意味着基于容器进行部署。这种架构在其初始设计中内置了水平自动扩展能力,并且通常依赖于云端衍生服务,如负载均衡器、对象存储、托管数据库和队列系统,这样它可以专注于具体任务的业务逻辑。通常,持续集成/持续交付工具链与这些应用相关联,因此敏捷软件开发方法可以快速地推出新的迭代。

也就是说,这不是这里讨论的那种应用。

经典的企业应用程序有多个组件,如Web服务器、应用服务器和数据库服务器。许多应用程序最初是在客户端-服务器时代编写的,要直接在硬件上运行它们。这些类型的应用程序尽管年代久远,但仍然可以变成云就绪的。基本上,组件通过TCP连接使用IP地址和端口号进行通信,通常由DNS辅助。这些应用程序的结构丝毫不会阻碍它们在虚拟机或甚至是容器上运行,它们如果可以在两者中任意一个上运行,那么也就可以部署到任何公有云或私有云上。

虽然这样的应用程序无法像云原生应用程序一样充分利用公有云提供的服务,但有时候,经典的企业应用程序可以变成云就绪的,并从中受益,而无需完全重写。这种转变的限制因素通常不是代码,因为如上所述,代码在公有云上运行时运行的上下文不会改变。相反,限制因素往往是部署机制,而检查应用程序生命周期的这个方面就能将经典的多层企业应用程序转变为云就绪的应用程序。

你如今如何部署?

在考虑现有的客户端-服务器时代应用程序的云就绪时,请问自己以下问题:应用程序如今如何部署?

最终要考虑生产环境,再三琢磨部署细节能了解使应用程序云就绪的难易程度。应用程序是五年前手动部署的吗,部署人员之后就退休了?这很可能表明,应用程序要变成云就绪还有很多问题。

应用程序有一套脚本来准备其Linux或Windows环境,然后自动安装自定义设置和依赖吗?离现在较近的应用程序有一组脚本,可以自动执行诸如准备操作系统内核和将IP地址注入到配置文件中的任务。这些过程和(或)脚本是过渡到云部署的关键,为云就绪指明了路线。在准备这些经典应用程序时,最重要的背景知识是,上述过程如何运行。

这两种极端的中间情况是,应用程序没有自动化脚本,但是有一本运行手册,详细说明了为应用程序准备特定环境所需的步骤。尽管没有完全使用脚本的场景那么容易转变成云就绪,也没有手动案例那么难,但这种情况描绘了一个需要根据具体情况仔细审查的灰色区域。

宠物与家畜

开始云就绪之前,另一个要考虑的部署方面涉及到思维方式的改变。对于这些较老的应用程序,根据其发布的年代,物理硬件当时是一种稀缺资源。获取新硬件需要几个月的时间,这影响了应用程序架构,这种架构考量物理服务器的养护,将软件发布视为风险。因此,我们将服务器看做宠物,给它们取名字,并尽我们所能,保持它们一直运行。如果在这些宠物般的服务器上所发布的新软件引入了难以恢复的风险,从零开始重新安装一台物理机器的成本是非常高的。

相比之下,采用虚拟机的方案能够控制额外的生命周期,带来了改进应用程序部署的可能性。在虚拟机世界中,计算资源可以被视为一次性实体,而不是稀缺的资源。例如,与其花费时间来升级VM的操作系统,不如使用新的操作系统创建一个新的VM,将其插入到负载均衡器池中,然后删除旧的VM。将VM看做家畜,带来了诸如水平自动扩展和更快的发布周期等优点,因为计算资源的稀缺性被最小化了。

即便不是全部,但有一些遗留企业应用程序适合这种云就绪的现代化方式。具体地说,不同程度地使用负载均衡器的应用或者允许节点插入的应用都是非常适合采用这种方式进行改善的。无论如何,在云就绪过程中,了解这两种方法之间的差异非常重要。

工具

云管理平台(CMP)专门用于辅助云就绪相关的准备工作。它提供了一种机制,了解特定应用程序部署的人员可以建立一个描述每个应用程序组件以及它们如何彼此交互的蓝图或资料。

CMP通常提供常用组件的实现,如上所示的HAProxy、Apache和MySQL。想要自己实现这些层的组织可以创建自己的组件,也可以在每一层上注入脚本来自定义安装。

例如,在上面的屏幕截图中,可能需要一些自定义的MySQL配置,这超出了CMP提供的基本安装的功能范围,比如我们想在操作系统上安装特定监控或安全软件。这里就能很好地用到部署过程的知识,以确定对目前正在使用的基本组件的补充之处。

CMP的核心优势在于,它将具体的公有云和私有云的细节抽象化,从而简化了经典企业应用程序向云就绪过渡的过程。例如,IT人员不必成为Google Cloud Platform、Amazon Web Services和VMware API的专家,而是要努力将应用程序抽象为CMP表示形式。虽然可以直接使用新的云API编写部署脚本将应用程序变得云就绪,但如果将来选择了不同的云,那么部署脚本也得修改。通过使用CMP的抽象来掩盖这些API细节,这可以一次完成,通常比直接使用API的方法更省力。

好处与如何开始

将经典企业应用程序转变成云就绪,使用CMP最大的好处是,可移植性和易用性。更进一步,可以将水平自动扩展和蓝绿部署注入到新的部署脚本中,从而为一些应用程序提供额外的好处。一些CMP甚至能够针对云部署运行基准测试,以便可以正确选择每个应用程序层的VM大小,甚至可以使用价格和性能标准彼此进行比较。这能够确保,应用程序满足业务需求的情况下,应用程序运行在最高效的云上,同时可以使用与初次云就绪时相同的CMP抽象轻松地迁移云平台。

大多数公司最开始所采用的部署机制都是移植那些低需求、无复杂高可用性需求且最初的部署专家仍然在职的应用程序。这降低了首次尝试云就绪转变的风险。早期成功之后,就可以评估应用程序组合的其余部分,并排定云就绪的优先级。通常,这意味着推后需求程度高、可用性要求高或专家早已不在的应用程序。

关于作者

作为思科全球合作伙伴组织的云技术解决方案架构师,Pete Johnson是一名有着20多年技术行业经验的资深人士,在Twitter上关注@nerdguru可以联系到他。

查看英文原文:www.infoq.com/articles/cl…

感谢张卫滨对本文的审校。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…944945946…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%