开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

后端服务性能压测实践 作者:王清培本文原创,转载请注明作者及

发表于 2017-12-07

作者:王清培本文原创,转载请注明作者及出处

目录

  • 背景
  • 环境检测
  • 压力机及压力工具检测
  • Linux openfiles limit 设置
  • 排查周边依赖
  • 空接口压测检测
  • 聚合报告中 throughput 计算
  • 压测及性能排查方法
  • 关注各纬度 log
  • Linux 常规命令
  • 性能排查两种方式(从上往下、从下往上)
  • 总结

背景

最近大半年内有过两次负责性能压测的一些工作。一件事情做了一次可能还无法总结出一些东西,两次过后还是能发现一些共性问题,所以总结下性能压测的一般性实践。但是问题肯定不止这些,还有更多深层次的问题等着发现,等我们遇到了在逐个解决再来总结分享。

做性能压测的原因就不多说了,一般两个时间点是必须要做的,大促前、新系统上线。压测都是为了系统在线上的处理能力和稳定性维持在一个标准范围内,做到心中有数。

从整个行业来看,抛开一些大厂不说,全自动化的性能压测环境还是比较少的,要想建设好一套全自动化的性能压测环境起码涉及到几个问题,CI\CD、独立、隔离的压测环境,自动化压测工具、日常压测性能报警、性能报表分析、排查/解决性能问题流程等等。这样才能将性能压测常规化,一旦不是常规化性能压测,就会有代码、中间件配置滞后于生产环境的问题。时间一长,就等于要重新开始搭建、排查压测环境。

如果性能压测的环境是全自动化的,那么就可以把性能压测工作常规化变成研发过程中的一个例行的事项,执行起来效率就会非常高,压测的时候也会比较轻松,好处也是比较明显的。

但是大多数的时候我们还是需要从零开始进行性能压测的工作。毕竟搭建这样一套环境给企业带来的成本也是巨大的。性能压测对环境敏感,必须划分独立的部署、隔离单元,才能在后续的常规压测流程中直观的阅读压测报告。

题外话,如果有了自动化的压测环境,也还是需要去了解下整个压测环境的基本架构,毕竟压测环境不是真实的生产环境,有些问题我们需要知道是正常的还是不正常的。

环境检测

当我们需要进行性能压测时首先要面对的问题就是环境问题,环境问题包含了常见的几个点:

1.机器问题(实体机还是虚拟机、CPU、内存、网络适配器进出口带宽、硬盘大小,硬盘是否 SSD、内核基本参数配置)

2.网络问题(是否有跨网段问题、网段是否隔离、如果有跨网段机器,是否能访问、跨网段是否有带宽限速)

3.中间件问题(程序里所有依赖的中间件是否有部署,中间件的配置是否初始化、中间件 cluster 结构什么样、这些中间件是否都进行过性能压测、压测的纬度是什么,是 benchmark 还是针对特定业务场景的压测)

这些环境问题第一次排查的时候会有点累,但是掌握了一些方法、工具、流程之后剩下的也就是例行的事情,只不过人工参与的工作多点。

上面的问题里,有些问题查看是比较简单的,这里就不介绍了,比如机器的基本配置等。有些配置只需要你推动下,走下相关流程回头验收下,比如网段隔离等,也还是比较简单的。

比较说不清楚的是中间件问题,看上去都是能用的,但是就是压不上去,这时候就需要你自己去进行简单的压测,比如 db 的单表插入、cache 的并发读取、mq 的落地写入等。这时候就涉及到一个问题,你需要对这些中间件都有一定深度的了解,要知道内在的运行机制,要不然出现异常情况排查起来确实很困难。

其实没有人能熟悉市面上所有的中间件,每一个中间件都很复杂,我们也不可能掌握一个中间件的所有点,但是常用的一些我们是需要掌握的,至少知道个大概的内部结构,可以顺藤摸瓜的排查问题。

但是事实上总有你不熟悉的,这个时候求助下大家的力量互相探讨再自己摸索找点资料,我们没遇到过也许别人遇到过,学技术其实就是这么个过程。

压力机及压力工具检测

既然做性能压测就需要先对压测机、压力工具先进行了解,压测工具我们主要有 locust、jmeter、ab,前两者主要是压测同事进行准出验收测试使用的。

后两者主要是用来提交压测前的自检使用,就是开发自己用来检查和排错使用的。这里需要强调下 ab 其实是做基准测试的,不同于 jmeter 的作用。

需要知道压力机是否和被压测机器服务器在一个网段,且网段之间没有任何带宽限制。压力机的压测工具配置是否有瓶颈,一般如果是 jmeter 的话需要检查 java 的一些基本配置。

但是一般如果压力机是固定不变的,一直在使用的,那么基本不会有什么问题,因为压力机压测同事一直维护者,反而是自己使用的压测工具的参数要做好配置和检测。

用 jmeter 压测的时候,如果压测时间过长,记得关掉 监听器->图形结果 面板,因为那个渲染如果时间太长基本会假死,误以为会是内存的问题,其实是渲染问题。

在开发做基准压测的时候有一个问题就是办公网络与压测服务器的网络之间的带宽问题,压力过大会导致办公网络出现问题。所以需要错开时间段。

大致梳理好后,我们需要通过一些工具来查看下基本配置是否正常。比如,ethtool 网络适配器信息、nload 流量情况等等,当然还有很多其他优秀的工具用来查看各项配置,这里就不罗列了。

使用 ethtool 查看网络适配器信息前需要先确定当前机器有几个网络适配器,最好的办法是使用 ifconfig找到你正在使用的网络适配器。

排除 127.0.0.1 的适配器外,还有三个适配器信息,只有第一个 bond0 才是我们正在使用的,然后使用 ethtool 查看当前 bond0 的详细适配器信息。重点关注下 speed 域,它表示当前网络适配器的带宽。

虽然网络适配器可能配置的没有问题,但是整个网络是否没问题还需要咨询相关的运维同事进行排查下,中间还可能存在限速问题。

要确定网络带宽确实没有问题,我们还需要一个实时的监控网络流量工具,这里我们使用nload来监控下进出口流量问题。

这个工具还是很不错的,尤其是在压测的过程中可以观察流量的进出口情况,尤其是排查一些间隙抖动情况。

如果发现进口流量一直很正常,出口流量下来了有可能系统对外调用再放慢,有可能是下游调用 block,但是 request 线程池还未跑满,也有可能内部是纯 async ,request 线程根本不会跑满,也有可能是压测工具本身的压力问题等等。但是我们至少知道是自己的系统对外调用这个边界出了问题。

Linux openfiles limit 设置

工作环境中,一般情况下 linux 打开文件句柄数上限是不需要我们设置的,这些初始化的值运维同事一般是设置过的,而且是符合运维统一标准的。但是有时候关于最大连接数设置还要根据后端系统的使用场景来决定。

以防万一我们还是需要自己检查下是否符合当前系统的压测要求。

在 Linux 中一切都是文件,socket 也是文件,所以需要查看下当前机器对于文件句柄打开的限制,查看 ulimit -a的 open files 域,也可以直接查看ulimit -n 。

如果觉得配置的参数需要调整,可以通过编辑 /etc/security/limits.conf 配置文件。

排查周边依赖

要想对一个服务进行压测,就需要对这个服务周边依赖进行一个排查,有可能你所依赖的服务不一定具备压测条件。并不是每个系统的压测都在一个时间段内,所以你在压测的时候别人的服务也许并不需要压测等等。

还有类似中间件的问题,比如,如果我们依赖中间件 cache ,那么是否有本地一级 cache ,如果有的话也许对压测环境的中间件 cache 依赖不是太大。如果我们依赖中间件 mq ,是不是在业务上可以断开对 mq的依赖,因为我们毕竟不是对 mq 进行压测。还有我们所依赖服务也不关心我们的压测波动。

整理出来之后最好能画个草图,再重新 git branch -b 重新拉一个性能压测的 branch 出来根据草图进行调整代码依赖。然后压测的时候观察流量和数据的走向,是否符合我们梳理之后的路线。

空接口压测检测

为了快速验证压测服务一个简单的办法,就是通过压测一个空接口,查看下整个网络是否通畅,各个参数是否大体上正常。

一般在任何一个后端服务中,都有类似 health_check 的 endpoint,方便起见可以直接找一个没有任何下游依赖的接口进行压测,这类接口主要是为了验证服务器的 online、offline 状态。

如果当前服务没有类似 health_check 新建一个空接口也可以,而且实践证明,一个服务在生产环境非常需要这么一个接口,必要情况下可以帮助来排查调用链路问题。

《发布!软件的设计与部署》Jolt 大奖图书 第17章 透明性 介绍了架构的透明性设计作用。

聚合报告中 throughput 计算

我们在用 jmeter 进行压测的时候关于 聚合报告 中的 throughput 理解需要统一下。

正常情况下在使用 jmeter 压测的时候会仔细观察 throughput 这一列的变化情况,但是没有搞清楚 thourghput 的计算原理的时候就会误以为是 tps/qps 下来了,其实有时候是整个远程服务器根本就没有 response 了。

throughput=samples/压测时间

throughput(吞吐量) 是单位时间内的请求处理数,一般是按 second 计算,如果是压测 write 类型的接口,那么就是 tps 指标。如果压测 read 类型的接口,那么就是 qps 指标。这两种类型的指标是完全不一样的,我们不能搞混淆了。

200(throughput) tps=1000(write)/5(s)**1000(throughput) qps=2000(read)/2(s)**

当我们发现 throughput 逐渐下来的时候要考虑一个时间的纬度。

也就是说我们的服务有可能已经不响应了,但是随着压测时间的积累,整个吞吐量的计算自然就在缓慢下滑,像这种刺尖问题是发现不了的。

这一点用ui版本的 jmeter 尤其明显,因为它的表现方式就是在欢欢放慢。用 Linux 版本的 jmeter 还好点,因为它的输出打印是隔断时间才打印。

关于这个点没有搞清楚非常影响我们对性能压测的结果判断。所以我们在压测的时候一定要有监控报表,才能知道在整个压测过程中服务器的各项指标是否出现过异常情况。

大多数的时候我们还会使用 apache ab 做下基本的压测,主要是用来与 jmeter 对比下,两个工具压测的结果是否相差不大,主要用来纠偏一些性能虚高问题。

apache ab 与 jmeter 各有侧重,ab 可以按固定请求数来压,jmeter 可以按时间来压,最后计算的时候需要注意两者区别。ab 好像是没有请求错误提示和中断的,jmeter 是有错误提示,还有各个纬度断言设置。

我们在使用压测工具的时候,大致了解下工具的一些原理有助于准确的使用这款工具。

压测及性能排查方法

在文章的前面部分讲到了 排查周边依赖 的环境检查步骤。其实要想顺利的进行压测,这一步是必须要有的。经过这一步分析我们会有一个基本的 系统依赖 roadmap 。

基于这份 系统依赖 roadmap 我们将进行性能压测和问题定位及性能优化。

合理的系统架构应该是上层依赖下层,在没有确定下游系统性能的情况下,是没办法确定上游系统性能的瓶颈在哪里。

所以压测的顺序应该尽可能的从下往上依次进行,这样可以避免无意义的排查由于下游吞吐量不够带来的性能问题。越是下游系统性能要求越高,因为上游系统的性能瓶颈直接依赖下游系统。

比如,商品系统的 v1/product/{productid} 前台接口,吞吐量为 qps 8000,那么所有依赖这个接口的上游服务在这个代码路径上最高吞吐量瓶颈就是 8000 ,代码路径不管是 tps 还是 qps 都是一样的瓶颈。

上层服务可以使用 async方式来提高 request 并发量,但是无法提高代码路径在 v1/product/{productid} 业务上的吞吐量。

我们不能将并发和吞吐量搞混淆了,系统能扛住多少并发不代表吞吐量就很高。可以有很多方式来提高并发量,threadpool 提高线程池大小 、socket 类c10k 、nio事件驱动,诸如此类方法。

关注各纬度 log

当在压测的过程中定位性能问题的性价比较高的方法就是请求处理的log,请求处理时长log,对外接口调用时长log,这一般能定位大部分比较明显的问题。当我们用到了一些中间件的时候都会输出相应的执行log。

如下所示,在我们所使用的开发框架中支持了很多纬度的执行log,这在排查问题的时候就会非常方便。

slow.log 类型的慢日志还是非常有必要记录下来的,这不仅在压测的时候需要,在生产上我们也是非常需要。

如果我们使用了各种中间件,那就需要输出各种中间件的处理日志,mq.log、cache.log、 search.log 诸如此类。

除了这些 log 之外,我们还需要重点关注运行时的 gc log。

我们主要使用 Java 平台,在压测的时候关注 gc log 是正常的事。哪怕不是 Java 程序,类似基于 vm 的语言都需要关注 gc log 。根据 jvm gcer 配置的不同,输出的日志也不太一样。

一般电商类的业务,以响应为优先时 gc 主要是使用 cms+prenew ,关注 full gc 频次,关注 cms 初始标记、 并发标记、重新标记、并发清除 各个阶段执行时间, gc 执行的
real time ,pernew 执行时的内存回收大小等 。

java gc 比较复杂涉及到的东西也非常多,对 gc log 的解读也需要配合当前的内存各个代的大小及一系列 gc 的相关配置不同而不同。

《Java性能优化权威指南》 java之父gosling推荐,可以长期研究和学习。

Linux 常规命令

在压测的过程中为了能观察到系统的各项资源消耗情况我们需要借助各种工具来查看,主要包括网络、内存、处理器、流量。

netstat

主要是用来查看各种网络相关信息。

比如,在压测的过程中,通过 netstat wc 看下 tcp 连接数是否和服务器 threadpool 设置的匹配。

netstat -tnlp | grep ip | wc -l

如果我们服务器的 threadpool 设置的是50,那么可以看到 tcp 连接数应该是50才对。然后再通过统计 jstack 服务器的 request runing 状态的线程数是不是>=50。

request 线程数的描述信息可能根据使用的 nio 框架的不同而不同。

还有使用频率最高的查看系统启动的端口状态、tcp 连接状态是 establelished 还是 listen 状态。

netstat -tnlp

再配合 ps 命令查看系统启动的状态。这一般用来确定程序是否真的启动了,如果启动了是不是 listen 的端口与配置中指定的端口不一致。

ps aux | grep ecm-placeorder

netstat 命令很强大有很多功能,如果我们需要查看命令的其他功能,可以使用man netstat 翻看帮助文档。

vmstat

主要用来监控虚拟处理器的运行队列统计信息。

vmstat 1

在压测的时候可以每隔 1s 或 2s 打印一次,可以查看处理器负载是不是过高。 procs 列 r 子列就是当前处理器的处理队列,如果这个值超高当前 cpu core 数那么处理器负载将过高。可以和下面将介绍的 top 命令搭配着监控。

同时此命令可以在处理器过高的时候,查看内存是否够用是否出现大量的内存交换,换入换出的量多少 swap si 换入 swap so 换出。是否有非常高的上下文切换 system cs 每秒切换的次数,system us 用户态运行时间是否很少。是否有非常高的 io wait 等等。

关于这个命令网上已经有很多优秀的文章讲解,这里就不浪费时间重复了。同样可以使用 man vmstat 命令查看各种用法。

mpstat

主要用来监控多处理器统计信息

mpstat -P ALL 1

我这是一个 32 core 的压测服务器,通过 mpstat 可以监控每一个虚拟处理器的负载情况。也可以查看总的处理器负载情况。

mpstat 1

可以看到 %idle 处于闲置状态的 cpu 百分比, %user 用户态任务占用的 cpu 百分比,%sys 系统态内核占用 cpu 百分比,%soft 软中断占用 cpu 百分比,%nice 调整任务优先级占用的 cpu 百分比等等。

iostat

主要用于监控io统计信息

iostat 1

如果我们有大量的 io 操作的话通过 iostat 监控 io 的写入和读取的数据量,同时也能看到在 io 负载特别大的情况下 cpu 的平均负载情况。

top

监控整个系统的整体性能情况top 命令是我们在日常情况下使用频率最高的,可以对当前系统环境了如指掌。处理器 load 率情况,memory 消耗情况,哪个 task 消耗 cpu 、memory 最高。

top

top 命令功能非常丰富,可以分别根据 %MEM、 %CPU 排序。

load average 域表示 cpu load 率情况,后面三段分别表示最近1分钟、5分钟、15分钟的平均 load 率。这个值不能大于当前 cpu core 数,如果大于说明 cpu load 已经严重过高。就要去查看是不是线程数设置的过高,还要考虑这些任务是不是处理时间太长。设置的线程数与任务所处理的时长有直接关系。

Tasks 域表示任务数情况,total 总的任务数,running 运行中的任务数,sleeping 休眠中的任务数,stopped 暂停中的任务数,zombie 僵尸状态任务数。

Swap 域表示系统的交换区,压测的时候关注 used 是否会持续升高,如果持续升高说明物理内存已经用完开始进行内存页的交换。

free

查看当前系统的内存使用情况

free -m

total 总内存大小,used 已经分配的内存大小,free 当前可用的内存大小,shared 任务之间的共享内存大小,buffers 系统已经分配但是还未使用的,用来存放文件 matedata 元数据内存大小,cached 系统已经分配但是还未使用的,用来存放文件的内容数据的内存大小。

-/+buffer/cache

used 要减去 buffers/cached ,也就是说并没有用掉这么多内存,而是有一部分内存用在了 buffers/cached 里。

free 要加上 buffers/cached ,也就是说还有 buffers/cached 空余内存需要加上。

Swap 交换区统计,total 交换区总大小,used 已经使用的交换区大小,free 交换区可用大小。只需要关注 used 已经使用的交换区大小,如果这里有占用说明内存已经到瓶颈。

《深入理解LINUX内核》、《LINUX内核设计与实现》可以放在手边作为参考手册遇到问题翻翻。

性能排查两种方式(从上往下、从下往上)

当系统出现性能问题的时候可以从两个层面来排查问题,从上往下、从下网上,也可以综合运用这两种方法,压测的时候可以同时查看这两个纬度的信息。

一边打开 top 、free 观察 cpu 、memory 的系统级别的消耗情况,同时一边在通过 jstack 、jstat 之类的工具查看应用程序运行时的内部状态来综合定位。

总结

本篇文章主要还是从抛砖引玉的角度出发,整理下我们在做一般性能压测的时候出现的常规问题及排查方法和处理流程,并没有多么高深的技术点。

性能问题一旦出现也不会是个简单的问题,都需要花费很多精力来排查问题,运用各种工具、命令来逐步排查,而这些工具和命令所输出的信息都是系统底层原理,需要逐一去理解和实验的,并没有一个银弹能解决所有问题。

从错误码406说起

如何优雅的设计 React 组件

安卓推送“有救”了?从工信部统一推送联盟说起

Callback 与 Promise 间的桥梁 —— promisify

使用合适的设计模式一步步优化前端代码

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

zookeeper原理及应用

发表于 2017-12-07

ZooKeeper是一个分布式的,开放源码的分布式应用程序协调服务,是Google的Chubby一个开源的实现。

Apache ZooKeeper is an effort to develop and maintain an open-source server which enables highly reliable distributed coordination.

原理介绍和说明

  • 一致性算法

ZooKeeper以Fast Paxos(帕克索斯)算法为基础,让集群中的每个zk实例数据保持一致。一般部署集群,机器数设置为奇数个,更容易满足>N/2的投票条件。

  • 存储模型
1
2
3
4
5
6
复制代码/
├── app1
│ ├── p_1
│ ├── p_2
│ └── p_3
└── app2

类似操作系统文件夹的树型模型,与文件夹的区别在于,每个节点上可存储数据(不超过1M),每个节点上的数据可带版本号。

  • zNode节点类型

2个划分维度:按节点是否可持久化存储,分为持久节点与临时节点;按节点序号是否可顺序递增(类似mysql的auto_increment),分为顺序节点及非顺序节点,

注:3.6.0版本以后,还新增了Container Nodes(容器节点),该节点的特点是,如果其下的所有子节点都被删除,该节点也会在将来某个时间被删除。

ZooKeeper has the notion of container nodes. Container nodes are special purpose nodes useful for recipes such as leader, lock, etc. When the last child of a container is deleted, the container becomes a candidate to be deleted by the server at some point in the future.

持久节点 (client创建持久节点后,就算与zk断开,节点仍然保存在zk中)

持久顺序节点 (eg: /order/quartz/wm000001 , /order/quartz/wm000002, /order/quartz/wm000003...)

临时节点 (client与zk断开连接后,节点自动删除)

临时顺序节点

  • 事件监听
    • Created event (节点创建事件)
    • Deleted event (节点删除事件)
    • Changed event (节点的数据变化事件)
    -zkClient.subscribeChildChanges(); //订阅子节点的变化
    • zkClient.subscribeDataChanges();//订阅某节点的数据变化(包括数据被删除)事件
    • zkClient.subscribeStateChanges();//订单状态变化(状态包括:连接,断开,认证失败等等)

以上为常用事件,其它事件请参考官方文档。实际使用中,很少用原生写法来监听事件,而是借助一些第三方的开源zk客户端,比如zkClient来监听事件。

  • ACL(Access Control List)权限控制
+ **每个节点有5种操作权限:`Create、Read、Write、Delete、Admin` 简称`crwda`。其中:`Delete`是指对子节点是否具有删除权限,其它4种权限指对自身节点的操作权限。**
+ **身份认证方式:`world:`默认方式,无限制,全世界均能访问。`auth:` 在上下文中添加授权用户。`digest:` 用户名/密码认证`ip: ip`地址认证**

应用场景

  • 应用场景1:分布式配置

要点:配置信息保存在db与zk中(保险起见,数据安全性更高),弄一个后台管理界面,对配置修改后,先保存到db,然后同步写入zk的节点中。应用启动时,先连到zk上读取节点中的配置,同时监听节点的数据变化,当配置变化时,得到实时通知。

  • 应用场景2:消除单点故障(Single Point of Failure,SPOF)
1
2
3
4
5
复制代码└── ./OrderNoService
├── A0000001
10.0.0.1:8001
└── A0000002
10.0.0.2:8001

多个服务实例,启动时在zk上临时顺序节点,服务的调用方约定取最小节点为Master,当master挂掉后,节点自动删除,调用方得到事件通知,取新的最小节点来调用(相当于slave提升为master)

  • 应用场景3:去中心化

上图中,左边为传统中心化的架构,缺点是每次有新的服务实例加入或下线,都要调整nginx中心节点的配置(不管是人工,还是借助工具自动),不利于云时代的动态弹性调整,而且整体的可用性强依赖于中心节点,一旦中心节点(中心集群)全挂掉,系统就不可用了。

  • 应用场景4:分布式锁
1
2
3
4
5
复制代码Order
└── 3456890
├── 000001
├── 000002
└── 000003

原理:以多个程序运行实例同时在处理订单3456890为例,每个程序运行实例处理前,创建一个临时顺序节点,然后检查自己创建的节点是否为最小,如果不是,表明没抢到锁,如果是,表示抢到了锁,抢到锁的程序处理完以后,删除该节点表示释放锁。同时,其它处于等候状态的程序,为了实时得到锁的释放通知,均监听父节点Order/3456890的子节点变化,发现子节点变化时,重复刚才的检测过程,直到自己创建的节点变成最小为止。
与redis SETNX之类的分布式锁相比,zk的分布式锁,还能实现解决Top N之类的有限资源竞争问题(类似并发中的信号量)。比如:一堆程序要打印,但是只有2台打印机(或者打印队列的长度只有2,最多同时只能允许2个程序提交打印任务), 类似刚才的思路 ,可以检测最小的前2个节点,只有创建最小前2个节点的程序,才认为是拿到了信号,允许提交打印任务。

  • 应用场景5:分布式队列
1
2
3
4
5
复制代码Queue
└── Queue1
├── 000001
├── 000002
└── 000003

如上图,创建Queue/Queue1做为一个队列(或Topic),然后每创建一个顺序节点,视为一条消息(节点存储的数据即为消息内容),生产者每次创建一个新节点,做为消息发送,消费者监听Queue1的子节点变化(或定时轮询),每次取最小节点当做消费消息,处理完后,删除该节点。相当于实现了一个FIFO(先进先出)的队列。
注:zk框架强制的是CP(一致性),而非专为高并发、高性能场景设计的,如果在高并发,qps很高的情况下,分布式队列需酌情考虑。

  • 应用场景7:生成分布式唯一id
1
2
3
4
5
复制代码Order
└── OrderId
├── 000001
├── 000002
└── 000003

思路:每次要生成一个新Id时,创建一个持久顺序节点,创建操作返回的节点序号,即为新Id,然后把比自己节点小的删除即可。

终篇

目前很多开源项目,几乎都是或多或少依赖zookeeper,比如:dubbo,disconf,kafka,...

分布式环境中,zk能用于什么场景,基本上取决于开发人员的想象力!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一个简单的导弹自动追踪以及实时图片旋转算法,Python-p

发表于 2017-12-07

自动追踪算法,在我们设计2D射击类游戏时经常会用到,这个听起来很高大上的东西,其实也并不是军事学的专利,在数学上解决的话需要去解微分方程,

这个没有点数学基础是很难算出来的。但是我们有了计算机就不一样了,依靠计算机极快速的运算速度,我们利用微分的思想,加上一点简单的三角学知识,就可以实现它。

好,话不多说,我们来看看它的算法原理,看图:

        初始状态

由于待会要用pygame演示,他的坐标系是y轴向下,所以这里我们也用y向下的坐标系。

算法总的思想就是根据上图,把时间t分割成足够小的片段(比如1/1000,这个时间片越小越精确),每一个片段分别构造如上三角形,计算出导弹下一个时间片走的方向(即∠a)和走的路程(即vt=|AC|),这时候目标再在第二个时间片移动了位置,这时刚才计算的C点又变成了第二个时间片的初始点,这时再在第二个时间片上在C点和新的目标点构造三角形计算新的vt,然后进入第三个时间片,如此反复即可。

假定导弹和目标的初始状态下坐标分别是(x1,y1),(x,y),构造出直角三角形ABE,这个三角形用来求∠a的正弦和余弦值,因为vt是自己设置的,我们需要计算A到C点x和y坐标分别移动了多少,移动的值就是AD和CD的长度,这两个分别用vt乘cosa和sina即可。

计算sina和cosa,正弦对比斜,余弦邻比斜,斜边可以利用两点距离公式计算出,即:

      

于是

      

AC的长度就是导弹的速度乘以时间即 |AC|=vt,然后即可计算出AD和CD的长度,于是这一个时间片过去后,导弹应该出现在新的位置C点,他的坐标就是老的点A的x增加AD和y减去CD。

于是,新的C点坐标就是:

      

只要一直反复循环执行这个操作即可,好吧,为了更形象,把第一个时间片和第二个时间片放在一起看看:

        

第一个是时间片构造出的三角形是ABE,经过一个时间片后,目标从B点走到了D点,导弹此时在C点,于是构造新的三角形CDF,重复刚才的计算过程即可,图中的角∠b就是导弹需要旋转的角度,现实中只需要每个时间片修正导弹的方向就可以了,具体怎么让导弹改变方向,这就不是我们需要研究的问题了

好,由于最近在用Python的pygame库制作小游戏玩,接下来我们就用pygame来演示一下这个效果,效果如下图:

          

很简单的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
routeros复制代码 1 import pygame,sys
2 from math import *
3 pygame.init()
4 screen=pygame.display.set_mode((800,700),0,32)
5 missile=pygame.image.load('element/red_pointer.png').convert_alpha()
6 x1,y1=100,600 #导弹的初始发射位置
7 velocity=800 #导弹速度
8 time=1/1000 #每个时间片的长度
9 clock=pygame.time.Clock()
10 old_angle=0
11 while True:
12 for event in pygame.event.get():
13 if event.type==pygame.QUIT:
14 sys.exit()
15 clock.tick(300)
16 x,y=pygame.mouse.get_pos() #获取鼠标位置,鼠标就是需要打击的目标
17 distance=sqrt(pow(x1-x,2)+pow(y1-y,2)) #两点距离公式
18 section=velocity*time #每个时间片需要移动的距离
19 sina=(y1-y)/distance
20 cosa=(x-x1)/distance
21 angle=atan2(y-y1,x-x1) #两点线段的弧度值
22 x1,y1=(x1+section*cosa,y1-section*sina)
23 d_angle = degrees(angle) #弧度转角度
24 screen.blit(missile, (x1-missile.get_width(), y1-missile.get_height()/2))
25 dis_angle=d_angle-old_angle #dis_angle就是到下一个位置需要改变的角度
26 old_angle=d_angle #更新初始角度
27 pygame.display.update()

如果仅把导弹考虑为一个质点的话,那么以上算法就已经足矣,我没有做导弹的旋转,因为一个质点也不分头尾不需要旋转,当然这前提得是你加载的导弹图片很小的时候不旋转看起来也没什么问题。但是在pygame里面做旋转并不是一件容易的事情(也可能是我无知),好吧我们先把图片替换成一张矩形的,再加入旋转函数看看效果如何

1
2
stylus复制代码missiled = pygame.transform.rotate(missile, -(d_angle))
screen.blit(missiled, (x1-missile.get_width(), y1-missile.get_height()/2))

因为图片的坐标点是它的左上角的点,所以如果我们想让图片的坐标固定在箭头尖点,那么把图片实际打印位置x减少图片长度,y减少一半宽度就行。

但是实际运行效果并不好:

大致方向相同,但是图片箭头的尖点并没有一直跟随鼠标,这是为什么呢。经过我的研究(就因为这个问题没解决一直没发布),

我发现原来是这个图旋转的机制问题,我们看看旋转后的图片变成什么样了:

旋转后的图片变成了蓝色的那个范围,根据旋转角度的不同,所变成的图片大小也不一样,我们看旋转90的情况

我们发现,旋转后的图片不仅面积变大了,导弹头的位置也变了。那应该怎么解决这个问题呢?思路是,每一次旋转图片以后,求出旋转图的头位置(图中的绿色箭头点),然后把绿图的打印位置移动一下,下,x,y分别移动两个头的距离,就可以让旋转后的导弹头对准实际我们参与运算的那个导弹头的位置,移动后应该是这样的:

这样,两个导弹头的点就一致了。接下来我们分析求旋转后的导弹头的算法。根据旋转角度的不同,旋转角在不同象限参数不一样,所以我们分为这四种情况

1,2象限

3,4象限,它的旋转只有正负0—180,所以3,4象限就是负角

显示图片的时候我们将他移动

1
css复制代码screen.blit(missiled, (x1-width+(x1-C[0]),y1-height/2+(y1-C[1])))

这里的(x1-width,y1-height/2)其实才是上图中的(x1,y1)

所以最后我们加入相关算法代码,效果就比较完美了

大功告成,最后附上全部的算法代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
apache复制代码 1 import pygame,sys
2 from math import *
3 pygame.init()
4 font1=pygame.font.SysFont('microsoftyaheimicrosoftyaheiui',23)
5 textc=font1.render('*',True,(250,0,0))
6 screen=pygame.display.set_mode((800,700),0,32)
7 missile=pygame.image.load('element/rect1.png').convert_alpha()
8 height=missile.get_height()
9 width=missile.get_width()
10 pygame.mouse.set_visible(0)
11 x1,y1=100,600 #导弹的初始发射位置
12 velocity=800 #导弹速度
13 time=1/1000 #每个时间片的长度
14 clock=pygame.time.Clock()
15 A=()
16 B=()
17 C=()
18 while True:
19 for event in pygame.event.get():
20 if event.type==pygame.QUIT:
21 sys.exit()
22 clock.tick(300)
23 x,y=pygame.mouse.get_pos() #获取鼠标位置,鼠标就是需要打击的目标
24 distance=sqrt(pow(x1-x,2)+pow(y1-y,2)) #两点距离公式
25 section=velocity*time #每个时间片需要移动的距离
26 sina=(y1-y)/distance
27 cosa=(x-x1)/distance
28 angle=atan2(y-y1,x-x1) #两点间线段的弧度值
29 fangle=degrees(angle) #弧度转角度
30 x1,y1=(x1+section*cosa,y1-section*sina)
31 missiled=pygame.transform.rotate(missile,-(fangle))
32 if 0<=-fangle<=90:
33 A=(width*cosa+x1-width,y1-height/2)
34 B=(A[0]+height*sina,A[1]+height*cosa)
35
36 if 90<-fangle<=180:
37 A = (x1 - width, y1 - height/2+height*(-cosa))
38 B = (x1 - width+height*sina, y1 - height/2)
39
40 if -90<=-fangle<0:
41 A = (x1 - width+missiled.get_width(), y1 - height/2+missiled.get_height()-height*cosa)
42 B = (A[0]+height*sina, y1 - height/2+missiled.get_height())
43
44 if -180<-fangle<-90:
45 A = (x1-width-height*sina, y1 - height/2+missiled.get_height())
46 B = (x1 - width,A[1]+height*cosa )
47
48 C = ((A[0] + B[0]) / 2, (A[1] + B[1]) / 2)
49
50 screen.fill((0,0,0))
51 screen.blit(missiled, (x1-width+(x1-C[0]),y1-height/2+(y1-C[1])))
52 screen.blit(textc, (x,y)) #鼠标用一个红色*代替
53 pygame.display.update()

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Kubernetes网络插件CNI学习整理 概要 CNI工作

发表于 2017-12-07

概要

  • 项目背景(XX银行客户):私有云上要在K8S上跑像mysql这类状态的数据库服务,对性能和延时都比较敏感,并不像web偏应用的无状态延时性能差点可接受。而基于overlay方式等网络性能和延时比较差,网络架构又比较复杂。并且银行对于IP网络管理需要简单可控。SR-IOV是基于硬件实现虚拟网卡,性能损失少,接近宿主机,此外有支持QOS,vlan等特性也是客户需要的。即要根据用户定制基于SR-IOV网络插件。
  • 问题: 目前kubernetes(1.8)(以后版本可能支持)中,POD并没有网络相关的配置,kubelete 调用 CNI plugin 默认只会以CNI_ARGS传入pod_name基本信息。如果要固定分配IP地址,以及配置QOS,vlan等网络特性,没法通过CNI_ARGS方式传入,不能像volume一样在pod SPEC中配置options可选的网络参数来传入cni plugin。
  • 一种可行解决方案:声明一个POD前,先根据pod_name来在外部configMap或其他地方存放网络配置信息,定制的CNI,IPAM的网络插件根据pod_name来从外部获取配置信息。

CNI工作原理

Kubernetes指南 cni

CNI:容器网络接口

  • 网络插件是独立的可执行文件,被上层的容器管理平台调用。网络插件只有两件事情要做:把容器加入到网络以及把容器从网络中删除。
  • 调用插件的数据通过两种方式传递:环境变量和标准输入。
  • kubernetes 使用了 CNI 网络插件之后 工作流程:
    • kubernetes 先创建 pause 容器生成对应的 network namespace
    • 调用网络 driver(因为配置的是 CNI,所以会调用 CNI 相关代码
    • CNI driver 根据配置调用具体的 cni 插件
    • cni 插件给 pause 容器配置正确的网络,pod 中其他的容器都是用 pause 的网络.

Kubernetes的网络接口CNI及灵雀云的实践

  • 运维人员视角,在传统运维工作强调对IP要有很强的管控(银行等),POD需要固定IP:
    • 于运维来说,网络方面是很重要的资源,要对IP进行强管控,服务来回飘,会让他的安全感下降很多。
    • 运维服务有很多基于IP的东西,有流量和突发的监控,如果你服务的IP一直变化,通过这个IP它很难用到这个服务,相当于IP的监控就没有意义,因为根本不知道IP流量上去了是哪个服务的,很难对应到这个事。
    • 还有是对于IP安全策略没有办法做。
    • kubelet 与 CNI plugin调用逻辑图:
      cni
  • hackers-guide-kubernetes-networking
    • Kubernetes unfortunately still supports only one CNI interface per POD with one cluster-wide configuration. This is very limiting since we may want to configure multiple network interfaces per POD, potentially using different overlay solutions
      with different policies (subnet, security, QoS).
    • Kubelet will pass the POD name and namespace as part of the CNI_ARGS variable (for example “K8S_POD_NAMESPACE=default;K8S_POD_NAME=mytests-1227152546-vq7kw;” ). We can use this to customize the network configuration per POD or POD namespace
      (e.g. put every namespace in a different subnet).
    • Future Kubernetes versions will treat networks as equal citizens and include network configuration as part of the POD or namespace spec just like memory, CPUs and volumes. For the time being, we can use annotations to store configuration
      or record POD networking data/state.
    • multus-cni
  • 从源码看kubernetes与CNI Plugin的集成
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
复制代码  //kubernetes/pkg/kubelet/network/cni/cni.go
func (plugin *cniNetworkPlugin) buildCNIRuntimeConf(podName string, podNs string, podSandboxID kubecontainer.ContainerID, podNetnsPath string) (*libcni.RuntimeConf, error) {
rt := &libcni.RuntimeConf{
ContainerID: podSandboxID.ID,
NetNS: podNetnsPath,
IfName: network.DefaultInterfaceName,
Args: [][2]string{
{"IgnoreUnknown", "1"},
{"K8S_POD_NAMESPACE", podNs},
{"K8S_POD_NAME", podName},
{"K8S_POD_INFRA_CONTAINER_ID", podSandboxID.ID},
},
}

//libcni
func (c *CNIConfig) AddNetwork(net *NetworkConfig, rt *RuntimeConf)
invoke.ExecPluginWithResult(pluginPath, net.Bytes, c.args("ADD", rt))
//将RuntimeConf.Args以环境变量方式传入。
stdoutBytes, err := e.RawExec.ExecPlugin(pluginPath, netconf, args.AsEnv())


type CNI interface {
AddNetworkList(net *NetworkConfigList, rt *RuntimeConf) (types.Result, error)
DelNetworkList(net *NetworkConfigList, rt *RuntimeConf) error

AddNetwork(net *NetworkConfig, rt *RuntimeConf) (types.Result, error)
DelNetwork(net *NetworkConfig, rt *RuntimeConf) error
}

type RuntimeConf struct {
ContainerID string
NetNS string
IfName string
Args [][2]string
// A dictionary of capability-specific data passed by the runtime
// to plugins as top-level keys in the 'runtimeConfig' dictionary
// of the plugin's stdin data. libcni will ensure that only keys
// in this map which match the capabilities of the plugin are passed
// to the plugin
CapabilityArgs map[string]interface{}
}

CNI开发参考资源

  • 深入理解CNI
  • CNI spec 文档
  • 官方维护的plugin插件: plugins
  • 用cnitool 调试自己编写的plugin:cnitool
  • 用脚本运行容器测试自己plugin:docker-run.sh
  • 官方plugin sample:sample

kubelet原理

  • Kubernetes指南 kubelet
  • kubernetes 简介:kubelet 和 pod
    • 它的核心工作是监听 apiserver,配置目录(默认/etc/kubernetes/manifests/)等清单,一旦发现当前节点的 pod 配置发生变化,就根据最新的配置执行响应的动作,保证运行的 pod 状态和期望的一致。
      • 如果发现本地的Pod被修改,则Kubelet会做出相应的修改,比如删除Pod中某个容器时,则通过Docker Client删除该容器。 如果发现删除本节点的Pod,则删除相应的Pod,并通过Docker Client删除Pod中的容器。
    • 定时汇报当前节点的状态给 apiserver,以供调度的时候使用,通过cAdvisor监控节点和容器的资源。
    • 用“kubernetes/pause”镜像为每个Pod创建一个容器。Pause容器用于接管Pod中所有其他容器的网络。每创建一个新的Pod,Kubelet都会先创建一个Pause容器,然后创建其他容器。

kubelet源码分析

kubelet 源码分析:启动流程 (v1.5.0版本)

  • 解析参数配置信息等初始化准备后, 创建kubeDeps这个对象:
    • 其实它内部保存了 kubelet 各个重要组件的对象,之所以要把它作为参数传递,是为了实现 dependency injection。简单地说,就是把 kubelet 依赖的组件对象作为参数传进来,这样可以控制 kubelet 的行为。比如在测试的时候,只要构建 fake 的组件实现,就能很轻松进行测试。KubeDeps 包含的组件很多,下面列出一些:
    • CAdvisorInterface:提供 cAdvisor 接口功能的组件,负责收集容器的监控信息
    • DockerClient:docker 客户端,用来和 docker 交互
    • KubeClient:apiserver 客户端,用来和 api server 通信
    • Mounter:执行 mount 相关操作
    • NetworkPlugins:网络插件,执行网络设置工作
    • VolumePlugins:volume 插件,执行 volume 设置工作
  • RunKubelet 函数:
    • 初始化各个对象,比如 eventBroadcaster,这样就能给 apiserver 发送 kubelet 的事件
    • 通过 builder 创建出来 Kubelet对象(pkg/kubelet/kubelet.go#NewMainKubelet)
    • 根据运行模式,运行 Kubelet对象,各种组件以 goroutine运行启动
      • 异步事件驱动:syncLoop 是 kubelet 的主循环方法,它从不同的管道(文件、URL 和 apiserver)监听变化,并把它们汇聚起来。当有新的变化发生时,它会调用对应的处理函数,保证 pod 处于期望的状态。
  • kubelet对象 中包含的重要对象:
    • podConfig:这个对象里面会从文件、网络和 apiserver 三个来源中汇聚节点要运行的 pod 信息,并通过管道发送出来,读取这个管道就能获取实时的 pod 最新配置
    • runtime:容器运行时,对容器引擎(docker 或者 rkt)的一层封装,负责调用容器引擎接口管理容器的状态,比如启动、暂停、杀死容器等
    • probeManager:如果 pod 配置了状态监测,那么 probeManager 会定时检查 pod 是否正常工作,并通过 statusManager 向 apiserver 更新 pod 的状态
    • volumeManager:负责容器需要的 volume 管理。检测某个 volume 是否已经 mount、获取 pod 使用的 volume 等
    • podWorkers:具体的执行者,每次有 pod 需要更新的时候都会发送给它
    • podManager:缓存了 pod 的信息,是所有需要该信息都会去访问的地方
    • nodeLister:能够读取 apiserver 中节点的信息

kubelet源码分析-启动运行与信息处理
kubelet 源码分析:pod 新建流程(v1.5.0版本):

  • pod的创建顺序留意点:
    • 创建 pod 的数据目录,存放 volume 和 plugin 信息
    • 如果定义了 PV,等待所有的 volume mount 完成(volumeManager 会在后台做这些事情)
    • 如果有 image secrets,去 apiserver 获取对应的 secrets 数据
    • 调用 container runtime 的 SyncPod 方法,去实现真正的容器创建逻辑
      • 通过 docker 创建出来一个运行的 pause 容器(Pause容器用于接管Pod中所 有其他容器的网络)。
      • 网络配置:如果 pod 是主机模式,容器也是;其他情况下,容器会使用 None 网络模式,让 kubelet 的网络插件自己进行网络配置。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

go语言死循环分析

发表于 2017-12-07

最近看了一篇文章,如何定位 golang 进程 hang 死的 bug,里面有这样一段代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
复制代码package main
import (
"fmt"
"io"
"log"
"net/http"
"runtime"
"time"
)
func main() {
runtime.GOMAXPROCS(runtime.NumCPU())
go server()
go printNum()
var i = 1
for {
// will block here, and never go out
i++
}
fmt.Println("for loop end")
time.Sleep(time.Second * 3600)
}
func printNum() {
i := 0
for {
fmt.Println(i)
i++
}
}
func HelloServer(w http.ResponseWriter, req *http.Request) {
fmt.Println("hello world")
io.WriteString(w, "hello, world!\n")
}
func server() {
http.HandleFunc("/", HelloServer)
err := http.ListenAndServe(":12345", nil)
if err != nil {
log.Fatal("ListenAndServe: ", err)
}
}

运行,会发现打印一会儿数字后停了,我们执行

1
复制代码curl localhost:12345

程序卡死。关于程序挂在哪里借助dlv是很好定位的:

1
复制代码dlv debug hang.go

进去之后运行程序,打印停止进入卡死状态,我们执行ctrl C,dlv会显示断开的地方:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码received SIGINT, stopping process (will not forward signal)> main.main() ./hang.go:17 (PC: 0x12dd7c8)
12: func main() {
13: runtime.GOMAXPROCS(runtime.NumCPU())
14: go server()
15: go printNum()
16: var i = 1
=> 17: for {
18: // will block here, and never go out
19: i++
20: }
21: fmt.Println("for loop end")
22: time.Sleep(time.Second * 3600)
(dlv)

但是我还是不明白,不明白的地方主要是因为:

  • 我又看了两篇文章Goroutine调度实例简要分析和也谈goroutine调度器,是同一位作者Tony Bai写的,写得非常好。第二篇文章解释了goroutine的调度和cpu数量的关系(不多加解释,建议大家看看),我的mac是双核四线程(这里不明白的同学自行google
    cpu 超线程),go版本是1.9,理论上讲可以跑4个goroutine而不用考虑死循环,一个死循环最多把一个cpu打死,上面的代码中只有3个goroutine,而且他们看上去都挂住了。
  • 上面说的理论上讲,不是我主观臆测的,我跑了1中第一篇文章中的一个例子:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码package main
import (
"fmt"
"time"
)
func deadloop() {
for {
}
}
func main() {
go deadloop()
for {
time.Sleep(time.Second * 1)
fmt.Println("I got scheduled!")
}
}

上面代码有两个goroutine,一个是main goroutine,一个是deadloop goroutine,跑得时候deadloop gouroutine不会对main goroutine造成影响,打印一直在持续,作者的文章解释了原因。

  • 如何定位 golang 进程 hang 死的 bug这篇文章提到了gcwaiting,然而没有解释。

在如何定位 golang 进程 hang 死的 bug有这样一段话:

因为在 for 循环中没有函数调用的话,编译器不会插入调度代码,所以这个执行 for 循环的 goroutine 没有办法被调出,而在循环期间碰到 gc,那么就会卡在 gcwaiting 阶段,并且整个进程永远 hang 死在这个循环上。并不再对外响应。

这个其实就是我们的第一段代码卡死的原因,也是我们第二段代码没有卡死的原因,就是在gc上!

我们再看一篇文章,golang的垃圾回收(GC)机制,这篇文章很短,但每句话都很重要:

  1. 设置gcwaiting=1,这个在每一个G任务之前会检查一次这个状态,如是,则会将当前M 休眠;
  2. 如果这个M里面正在运行一个长时间的G任务,咋办呢,难道会等待这个G任务自己切换吗?这样的话可要等10ms啊,不能等!坚决不能等!
    所以会主动发出抢占标记(类似于上一篇),让当前G任务中断,再运行下一个G任务的时候,就会走到第1步

那么如果这时候运行的是没有函数调用的死循环呢,gc也发出了抢占标记,但是如果死循环没有函数调用,就没有地方被标记,无法被抢占,那就只能设置gcwaiting=1,而M没有休眠,stop the world卡住了(死锁),gcwaiting一直是1,整个程序都卡住了!

这里其实已经解释了第一份代码的现象,第二份代码为什么没有hang住相信大家也能猜到了:代码里没有触发gc!我们来手动触发一下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
复制代码package main
import (
"fmt"
"log"
"net/http"
_ "net/http/pprof"
// "runtime"
"time"
)
func deadloop() {
for {
}
}
func main() {
go func() {
log.Println(http.ListenAndServe("localhost:6060", nil))
}()
go deadloop()
i := 3
for {
time.Sleep(time.Second * 1)
i--
fmt.Println("I got scheduled!")
if i == 0 {
runtime.GC()
}
}
}

会发现打印了3行之后,程序也卡死了,bingo🎉

我们来看看gcwaiting是不是等于1:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码$ go build hang2.go
$ GODEBUG="schedtrace=300,scheddetail=1" ./hang2
SCHED 2443ms: gomaxprocs=4 idleprocs=3 threads=7 spinningthreads=0 idlethreads=2 runqueue=0 gcwaiting=0 nmidlelocked=0 stopwait=0 sysmonwait=0
P0: status=1 schedtick=4 syscalltick=5 m=5 runqsize=0 gfreecnt=1
P1: status=0 schedtick=14 syscalltick=0 m=-1 runqsize=0 gfreecnt=0
P2: status=0 schedtick=3 syscalltick=4 m=-1 runqsize=0 gfreecnt=0
......
SCHED 2751ms: gomaxprocs=4 idleprocs=0 threads=7 spinningthreads=0 idlethreads=2 runqueue=0 gcwaiting=1 nmidlelocked=0 stopwait=1 sysmonwait=0
P0: status=1 schedtick=4 syscalltick=5 m=5 runqsize=0 gfreecnt=1
P1: status=3 schedtick=14 syscalltick=0 m=-1 runqsize=0 gfreecnt=0
P2: status=3 schedtick=3 syscalltick=10 m=-1 runqsize=0 gfreecnt=0
P3: status=3 schedtick=1 syscalltick=26 m=0 runqsize=0 gfreecnt=0
M6: p=-1 curg=-1 mallocing=0 throwing=0 preemptoff= locks=0 dying=0 helpgc=0 spinning=false blocked=false lockedg=-1
M5: p=0 curg=19 mallocing=0 throwing=0 preemptoff= locks=0 dying=0 helpg

代码诚不欺我也!

参考资料

  • 如何定位 golang 进程 hang 死的 bug
  • Goroutine调度实例简要分析
  • 也谈goroutine调度器
  • golang的垃圾回收(GC)机制

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

源码 从串行线程封闭到对象池、线程池 线程封闭与串行线程封闭

发表于 2017-12-07

今天讲一个牛逼而实用的概念,串行线程封闭。对象池是串行线程封闭的典型应用场景;线程池糅合了对象池技术,但核心实现不依赖于对象池,很容易产生误会。本文从串行线程封闭和对象池入手,最后通过源码分析线程池的核心原理,厘清对象池与线程池之间的误会。

JDK版本:oracle java 1.8.0_102

线程封闭与串行线程封闭

线程封闭

线程封闭是一种常见的线程安全设计策略:仅在固定的一个线程内访问对象,不对其他线程共享。

使用线程封闭技术,对象O始终只对一个线程T1可见,“单线程”中自然不存在线程安全的问题。

ThreadLocal是常用的线程安全工具,见源码|ThreadLocal的实现原理。线程封闭在Servlet及高层的web框架Spring等中应用不少。

串行线程封闭

线程封闭虽然好用,却限制了对象的共享。串行线程封闭改进了这一点:对象O只能由单个线程T1拥有,但可以通过安全的发布对象O来转移O的所有权;在转移所有权后,也只有另一个线程T2能获得这个O的所有权,并且发布O的T1不会再访问O。

所谓“所有权”,指修改对象的权利。

相对于线程封闭,串行线程封闭使得任意时刻,最多仅有一个线程拥有对象的所有权。当然,这不是绝对的,只要线程T1事实不会再修改对象O,那么就相当于仅有T2拥有对象的所有权。串行线层封闭让对象变得可以共享(虽然只能串行的拥有所有权),灵活性得到大大提高;相对的,要共享对象就涉及安全发布的问题,依靠BlockingQueue等同步工具很容易实现这一点。

对象池是串行线程封闭的经典应用场景,如数据库连接池等。

对象池

对象池利用了串行封闭:将对象O“借给”一个请求线程T1,T1使用完再交还给对象池,并保证“未擅自发布该对象”且“以后不再使用”;对象池收回O后,等T2来借的时候再把它借给T2,完成对象所有权的传递。

猴子撸了一个简化版的线程池,用户只需要覆写newObject()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
复制代码public abstract class AbstractObjectPool<T> {
protected final int min;
protected final int max;
protected final List<T> usings = new LinkedList<>();
protected final List<T> buffer = new LinkedList<>();
private volatile boolean inited = false;

public AbstractObjectPool(int min, int max) {
this.min = min;
this.max = max;
if (this.min < 0 || this.min > this.max) {
throw new IllegalArgumentException(String.format(
"need 0 <= min <= max <= Integer.MAX_VALUE, given min: %s, max: %s", this.min, this.max));
}
}

public void init() {
for (int i = 0; i < min; i++) {
buffer.add(newObject());
}
inited = true;
}

protected void checkInited() {
if (!inited) {
throw new IllegalStateException("not inited");
}
}

abstract protected T newObject();

public synchronized T getObject() {
checkInited();

if (usings.size() == max) {
return null;
}
if (buffer.size() == 0) {
T newObj = newObject();
usings.add(newObj);
return newObj;
}
T oldObj = buffer.remove(0);
usings.add(oldObj);
return oldObj;
}

public synchronized void freeObject(T obj) {
checkInited();
if (!usings.contains(obj)) {
throw new IllegalArgumentException(String.format("obj not in using queue: %s", obj));
}

usings.remove(usings.indexOf(obj));
buffer.add(obj);
}
}

AbstractObjectPool具有以下特性:

  • 支持设置最小、最大容量
  • 对象一旦申请就不再释放,避免了GC

虽然很简单,但大可以用于一些时间敏感、资源充裕的场景。如果时间进一步敏感,可将getObject()、freeObject()改写为并发程度更高的版本,但记得保证安全发布安全回收;如果资源不那么充裕,可以适当增加对象回收策略。

可以看到,一个对象池的基本行为包括:

  • 创建对象newObject()
  • 借取对象getObject()
  • 归还对象freeObject()

典型的对象池有各种连接池、常量池等,应用非常多,模型也大同小异,不做解析。令人迷惑的是线程池,很容易让人误以为线程池的核心原理也是对象池,下面来追一遍源码。

线程池

首先摆出结论:线程池糅合了对象池模型,但核心原理是生产者-消费者模型。

继承结构如下:

image.png

用户可以将Runnable(或Callables)实例提交给线程池,线程池会异步执行该任务,返回响应的结果(完成/返回值)。

猴子最喜欢的是submit(Callable<T> task)方法。我们从该方法入手,逐步深入函数栈,探究线程池的实现原理。

submit()

submit()方法在ExecutorService接口中定义,AbstractExecutorService实现,ThreadPoolExecutor直接继承。

1
2
3
4
5
6
7
8
9
10
复制代码public abstract class AbstractExecutorService implements ExecutorService {
...
public <T> Future<T> submit(Callable<T> task) {
if (task == null) throw new NullPointerException();
RunnableFuture<T> ftask = newTaskFor(task);
execute(ftask);
return ftask;
}
...
}

AbstractExecutorService#newTaskFor()创建一个RunnableFuture类型的FutureTask。

核心是execute()方法。

execute()

execute()方法在Executor接口中定义,ThreadPoolExecutor实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
...
}

我们暂且忽略线程池的池化策略。关注一个最简单的场景,看能不能先回答一个问题:线程池中的任务如何执行?

核心是addWorker()方法。以8行的参数为例,此时,线程池中的线程数未达到最小线程池大小corePoolSize,通常可以直接在9行返回。

addWorker()

简化如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
复制代码public class ThreadPoolExecutor extends AbstractExecutorService {
...
private boolean addWorker(Runnable firstTask, boolean core) {
boolean workerStarted = false;
boolean workerAdded = false;
Worker w = null;
try {
w = new Worker(firstTask);
final Thread t = w.thread;
if (t != null) {
final ReentrantLock mainLock = this.mainLock;
mainLock.lock();
try {
int rs = runStateOf(ctl.get());
if (rs < SHUTDOWN) {
workers.add(w);
workerAdded = true;
}
} finally {
mainLock.unlock();
}
if (workerAdded) {
t.start();
workerStarted = true;
}
}
} finally {
if (! workerStarted)
addWorkerFailed(w);
}
return workerStarted;
}
...
}

我去掉了很多用于管理线程池、维护线程安全的代码。假设线程池未关闭,worker(即w,下同)添加成功,则必然能够将worker添加至workers中。workers是一个HashSet:

1
复制代码private final HashSet<Worker> workers = new HashSet<Worker>();

哪里是对象池?

如果说与对象池有关,那么workers即相当于示例代码中的using,应用了对象池模型;只不过这里的using是一直增长的,直到达到最大线程池大小maximumPoolSize。

但是很明显,线程池并没有将线程发布出去,workers也仅仅完成using“保存线程”的功能。那么,线程池中的任务如何执行呢?跟线程池有没有关系?

哪里又不是?

注意9、17、24行:

  • 9行将我们提交到线程池的firstTask封装入一个worker。
  • 17行将worker加入workers,维护起来
  • 24行则启动了worker中的线程t

核心在与这三行,但线程池并没有直接在addWorker()中启动任务firstTask,代之以启动一个worker。最终任务必然被启动,那么我们继续看Worker如何启动这个任务。

Worker

Worker实现了Runnable接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码private final class Worker
extends AbstractQueuedSynchronizer
implements Runnable
{
...
Worker(Runnable firstTask) {
setState(-1); // inhibit interrupts until runWorker
this.firstTask = firstTask;
this.thread = getThreadFactory().newThread(this);
}

/** Delegates main run loop to outer runWorker */
public void run() {
runWorker(this);
}
...
}

为什么要将构造Worker时的参数命名为firstTask?因为当且仅当需要建立新的Worker以执行任务task时,才会调用构造函数。因此,任务task对于新Worker而言,是第一个任务firstTask。

Worker的实现非常简单:将自己作为Runable实例,构造时在内部创建并持有一个线程thread。Thread和Runable的使用大家很熟悉了,核心是Worker的run方法,它直接调用了runWorker()方法。

runWorker()

敲黑板!!!

重头戏来了。简化如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
复制代码public class ThreadPoolExecutor extends AbstractExecutorService {
...
final void runWorker(Worker w) {
Thread wt = Thread.currentThread();
Runnable task = w.firstTask;
w.firstTask = null;
w.unlock(); // allow interrupts
boolean completedAbruptly = true;
try {
while (task != null || (task = getTask()) != null) {
w.lock();
try {
beforeExecute(wt, task);
Throwable thrown = null;
try {
task.run();
} catch (RuntimeException x) {
thrown = x; throw x;
} catch (Error x) {
thrown = x; throw x;
} catch (Throwable x) {
thrown = x; throw new Error(x);
} finally {
afterExecute(task, thrown);
}
} finally {
task = null;
w.completedTasks++;
w.unlock();
}
}
completedAbruptly = false;
} finally {
processWorkerExit(w, completedAbruptly);
}
}
...
}

我们在前面将要执行的任务赋值给firstTask,5-6行首先取出任务task,并将firstTask置为null。因为接下来要执行task,firstTask字段就没有用了。

重点是10-31行的while循环。下面分情况讨论。

case1:第一次进入循环,task不为null

case1对应前面作出的诸多假设。

第一次进入循环时,task==firstTask,不为null,使10行布尔短路直接进入循环;从而16行执行的是firstTask的run()方法;异常处理不表;最后,finally代码块中,task会被置为null,导致下一轮循环会进入case2。

case2:非第一次进入循环,task为null

case2是更普遍的情况,也就是线程池的核心。

case1中,task被置为了null,使10行布尔表达式执行第二部分(task = getTask()) != null(getTask()稍后再讲,它返回一个用户已提交的任务)。假设task得到了一个已提交的任务,从而16行执行的是新获得的任务task的run()方法。后同case1,最后task仍然会被置为null,以后循环都将进入case2。

getTask()

任务从哪来呢?简化如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
复制代码public class ThreadPoolExecutor extends AbstractExecutorService {
...
private Runnable getTask() {
boolean timedOut = false;

for (;;) {
try {
Runnable r = timed ?
workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
workQueue.take();
if (r != null)
return r;
timedOut = true;
} catch (InterruptedException retry) {
timedOut = false;
}
}
}
...
}

我们先看最简单的,19-28行。

首先,workQueue是一个线程安全的BlockingQueue,大部分时候使用的实现类是LinkedBlockingQueue,见源码|并发一枝花之BlockingQueue:

1
复制代码private final BlockingQueue<Runnable> workQueue;

假设timed为false,则调用阻塞的take()方法,返回的r一定不是null,从而12行退出,将任务交给了某个worker线程。

一个小细节有点意思:前面每个worker线程runWorker()方法时,在循环中加锁粒度在worker级别,直接使用的lock同步;但因为每一个woker都会调用getTask(),考虑到性能因素,源码中getTask()中使用乐观的CAS+SPIN实现无锁同步。关于乐观锁和CAS,可以参考我的另一篇文章源码|并发一枝花之ConcurrentLinkedQueue【伪】。

workQueue中的元素从哪来呢?这就要回顾execute()方法了。

execute()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码public class ThreadPoolExecutor extends AbstractExecutorService {
...
public void execute(Runnable command) {
if (command == null)
throw new NullPointerException();
int c = ctl.get();
if (workerCountOf(c) < corePoolSize) {
if (addWorker(command, true))
return;
c = ctl.get();
}
if (isRunning(c) && workQueue.offer(command)) {
int recheck = ctl.get();
if (! isRunning(recheck) && remove(command))
reject(command);
else if (workerCountOf(recheck) == 0)
addWorker(null, false);
}
else if (!addWorker(command, false))
reject(command);
}
...
}

前面以8行的参数为例,此时,线程池中的线程数未达到最小线程池大小corePoolSize,通常可以直接在9行返回。进入8行的条件是“当前worker数小于最小线程池大小corePoolSize”。

如果不满足,会继续执行到12行。isRunning(c)判断线程池是否未关闭,我们关注未关闭的情况;则会继续执行布尔表达式的第二部分workQueue.offer(command),尝试将任务command放入队列workQueue。

workQueue.offer()的行为取决于线程池持有的BlockingQueue实例。Executors.newFixedThreadPool()、Executors.newSingleThreadExecutor()创建的线程池使用LinkedBlockingQueue,而Executors.newCachedThreadPool()创建的线程池则使用SynchronousQueue。以LinkedBlockingQueue为例,创建时不配置容量,即创建为无界队列,则LinkedBlockingQueue#offer()永远返回true,从而进入12-18行。

更细节的内容不必关心了,当workQueue.offer()返回true时,已经将任务command放入了队列workQueue。当未来的某个时刻,某worker执行完某一个任务之后,会从workQueue中再取出一个任务继续执行,直到线程池关闭,直到海枯石烂。

CachedThreadPool是一种无界线程池,使用SynchronousQueue能进一步提升性能,简化代码结构。留给读者分析。

case2小结

可以看到,实际上,线程池的核心原理与对象池模型无关,而是生产者-消费者模型:

image.png

  • 生产者(调用submit()或execute()方法)将任务task放入队列
  • 消费者(worker线程)循环从队列中取出任务处理任务(执行task.run())。

钩子方法

回到runWorker()方法,在执行任务的过程中,线程池保留了一些钩子方法,如beforeExecute()、afterExecute()。用户可以在实现自己的线程池时,可以通过覆写钩子方法为线程池添加功能。

但猴子不认为钩子方法是一种好的设计。因为钩子方法大多依赖于源码实现,那么除非了解源码或API声明绝对的严谨正确,否则很难正确使用钩子方法。等发生错误时再去了解实现,可能就太晚了。说到底,还是不要使用类似extends这种表达“扩展”语义的语法来实现继承,详见Java中如何恰当的表达“继承”与“扩展”的语义?。

当然,钩子方法也是极其方便的。权衡看待。

总结

相对于线程封闭,串行线程封闭离用户的距离更近一些,简单灵活,实用性强,很容易掌握。而线程封闭更多沦为单纯的设计策略,单纯使用线程封闭的场景不多。

线程池与串行线程封闭、对象池的关系不大,但经常被混为一谈;没看过源码的很难想到其实现方案,面试时也能立分高下。

线程池的实现很有意思。在追源码之前,猴子一直以为线程池就是把线程存起来,用的时候取出来执行任务;看了源码才知道实现如此之妙,简洁优雅效率高。源码才是最好的老师。


本文链接:源码|从串行线程封闭到对象池、线程池

作者:猴子007

出处:monkeysayhi.github.io

本文基于知识共享署名-相同方式共享 4.0国际许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名及链接。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java Executor 框架最佳实践 开发者秘籍_开

发表于 2017-12-07

在 JDK 5 中发布了 Executors framework(java.util.concurrent.Executor)来运行 Runnable 对象,不用像之前那样每次都创建新的线程,大多数情况下会复用已经创建的线程。

我们都知道在java中创建线程有两种方法。如果你想更多地阅读他们的比较,阅读这篇文章。在java中创建线程是一个非常昂贵的过程,它还包括内存开销。因此,如果我们可以重复使用这些线程,来运行 Runnable 可以更节省资源。

在本文中,我将编写一些示例程序,演示如何使用 Executor 框架,然后我们将讨论在设计多线程程序时需要记住的一些最佳实践。

如果你想了解更多关于其他多线程方面的信息,请点击此链接。

基本使用示例

在我们的示例程序中,有两个任务运行,预计都不会停止运行,并且两个任务都在固定的环境下运行。 我将尝试编写一个包装类,以便于:

  1. 如果任何任务引发异常,程序将捕获它并重新启动任务。
  2. 如果任何任务运行完成,程序将通知并重新启动任务。

以下是程序的代码示例:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
复制代码package com.devcheats.multithreading.executors;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;

public class DemoExecutorUsage {

private static ExecutorService executor = null;
private static volatile Future taskOneResults = null;
private static volatile Future taskTwoResults = null;

public static void main(String[] args) {
executor = Executors.newFixedThreadPool(2);
while (true) {
try {
checkTasks();
Thread.sleep(1000);
} catch (Exception e) {
System.err.println("Caught exception: " + e.getMessage());
}
}
}

private static void checkTasks() throws Exception {
if (taskOneResults == null
|| taskOneResults.isDone()
|| taskOneResults.isCancelled()) {
taskOneResults = executor.submit(new TestOne());
}

if (taskTwoResults == null
|| taskTwoResults.isDone()
|| taskTwoResults.isCancelled()) {
taskTwoResults = executor.submit(new TestTwo());
}
}
}

class TestOne implements Runnable {
public void run() {
while (true) {
System.out.println("Executing task one");
try {
Thread.sleep(1000);
} catch (Throwable e) {
e.printStackTrace();
}
}

}
}

class TestTwo implements Runnable {
public void run() {
while (true) {
System.out.println("Executing task two");
try {
Thread.sleep(1000);
} catch (Throwable e) {
e.printStackTrace();
}
}
}
}

请不要忘记在文章末尾阅读最佳实践。

在单个线程中执行多个任务

每个Runnable都不必在单独的线程中执行。有时,我们需要在单个线程中执行多个任务,每个任务都是Runnable的实例。要设计这种类型的解决方案,应该使用多运行。这个多可运行只是一个需要执行的可运行集合。除此之外,这个多runnable也是一个Runnable本身。

以下是需要在单个线程中执行的任务列表。

不需要在单独的线程中执行每一个 Runnable。有时候我们需要在一个线程中执行多个任务,每个任务都是一个 Runnable 实例。 要设计这种类型的程序,应该创建多个 Runnable,这些 Runnable 是需要被执行的任务集合。 另外,这个 Runnable 集合本身也是 Runnable 实例。

下面是需要在单个线程中执行多个任务的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
复制代码package com.devcheats.multithreading.executors;

public class TaskOne implements Runnable {
@Override
public void run() {
System.out.println("Executing Task One");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public class TaskTwo implements Runnable {
@Override
public void run() {
System.out.println("Executing Task Two");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public class TaskThree implements Runnable {
@Override
public void run() {
System.out.println("Executing Task Three");
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

我们创建一个多任务运行的包装器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码package com.devcheats.demo.multithreading;

import java.util.List;

public class MultiRunnable implements Runnable {

private final List<Runnable> runnables;

public MultiRunnable(List<Runnable> runnables) {
this.runnables = runnables;
}

@Override
public void run() {
for (Runnable runnable : runnables) {
new Thread(runnable).start();
}
}
}

现在上面的多线程可以像下面的程序这样执行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
复制代码package com.devcheats.demo.multithreading;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class MultiTaskExecutor {

public static void main(String[] args) {

BlockingQueue<Runnable> worksQueue = new ArrayBlockingQueue<Runnable>(10);
RejectedExecutionHandler rejectionHandler = new RejectedExecutionHandelerImpl();
ThreadPoolExecutor executor = new ThreadPoolExecutor(3, 3, 10, TimeUnit.SECONDS, worksQueue, rejectionHandler);

executor.prestartAllCoreThreads();

List<Runnable> taskGroup = new ArrayList<Runnable>();
taskGroup.add(new TestOne());
taskGroup.add(new TestTwo());
taskGroup.add(new TestThree());

worksQueue.add(new MultiRunnable(taskGroup));
}
}

class RejectedExecutionHandelerImpl implements RejectedExecutionHandler {
@Override
public void rejectedExecution(Runnable runnable, ThreadPoolExecutor executor) {
System.out.println(runnable.toString() + " : I've been rejected ! ");
}
}

必须遵守的最佳实践

  1. 总是使用一些静态分析工具来运行你的Java代码,比如 PMD 和 FindBugs 来发现深层的问题,它们对决定未来可能出现的复杂场景更有帮助。
  2. 总是和高级开发人员相互进行代码审查,以便于在代码中发现死锁或活锁,在程序中添加一个健康监视器来检查运行任务的状态,大多数场景中这样做是好的方式。
  3. 编写多线程应用时要养成捕获错误的习惯,不仅仅是异常。有时会发生意想不到的事情,可能Java会向你抛出一个Error,它不是一个异常。
  4. 使用回滚开关,如果程序出错的时候可能是无法恢复的,你不必急于启动另一个循环来升级这种情况,相反应该等待程序恢复后再重新开始。
  5. 请注意,Executors 的核心是抽象出执行的细节,因此除非明确说明,否则不保证排序。

祝你学习愉快!


  • 上一篇:使用内部类进行异常处理
  • 下一篇:考虑迁移旧系统的5个理由
  • 系列目录:Java编程最佳实践

原文出处:https://howtodoinjava.com/core-java/multi-threading/java-executor-framework-tutorial-and-best-practices

由 王爵nice 翻译

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL入门学习笔记——七周数据分析师实战作业

发表于 2017-12-07

本篇推送主要涉及SQL语言中较为复杂的子查询与函数嵌套。

虽然这个MySQL系列取名为MySQL基础入门,但是个人不打算做单个函数的用法总结,或者说简单罗列,(这些内容你可以通过很多途径了解)因为一方面以前有过SQL基础方面的学习经历(本科的计算机必修课以及计算机等级考试)现在应该更加深入一些,另一方面SQL是一门数据分析语言,单纯的一个两个函数基本很少能解决问题。

SQL语言不像R语言和Python那种面向对象的语言,提供了各种灵活多变的的可用方法以及成千上万的高效解决工具,更没有提供像管道函数那样的参数传递工具,所以多重任务想要一次性解决大多数时候需要借助子查询和函数嵌套。

(如果你是第一次接触SQL语言,最好能够通过浏览一两本入门书或者系统了解一下SQL的查询语法之后再来看此文)

本文的练习数据素材取自天善智能大数据模块的畅销课程——“七周成为数据分析师”,主讲老师是在职场混迹多年的数据大咖,老司机秦路老师。

秦老师的课程针对数据分析师所需要的业务知识、分析技能、编程技能等各个模块都做了非常精彩的总结和案例分享,推荐喜欢或者感兴趣的小伙伴儿入手。

edu.hellobi.com/course/205

由于是付费课程,这里不便提供原始数据,还请各位见谅,但是文中所有的代码输出均会在适当的地方提供数据预览和字段描述,此文仅是我学习其中的MySQL模块的课程大作业,用自己的思路实现一遍,同时又按照老师的思路整理出代码,通过思路的对比查漏补缺、提升sql的代码实践能力。

同时我会把这份大作业使用R语言和Python中的常用分析工具实现,这样读者可以对比三种工具之间实现相同需求的过程差异以及各自优缺点,加深数据处理过程的理解。

首先大致介绍这两份数据:

1
2
3
4
5
6
7
8
9
10
11
12
复制代码userinfo  客户信息表
userId 客户id
gender 性别
brithday 出生日期


orderinfo 订单信息表
orderId 订单序号(虚拟主键)
userId 客户id
isPaid 是否支付
price 商品价格
paidTime 支付时间

以上两个表格是本次分析的主要对象,其中匹配字段是userId。

本次分析的五个问题:

1、统计不同月份的下单人数;
2、统计用户三月份回购率和复购率
3、统计男女用户消费频次是否有差异
4、统计多次消费的用户,第一次和最后一次消费间隔是多少?
5、统计不同年龄段用户消费金额是否有差异
6、统计消费的二八法则,消费的top20%用户,贡献了多少额度?

1、统计不同月份的下单人数;

第一道题目比较简单,仅需将日期字段通过日期函数转换为月份标签,然后根据月份标签聚合出单月下单的人数即可!

我的思路是使用DATE_FORMAT函数输出购买记录的月度标签,然后使用聚合函数group by函数对月度标签进行聚合(计数),使用count计数时要考虑重复购买的情况,进行客户去重,获取真实人数。

1
2
3
4
5
6
7
8
9
复制代码SELECT
DATE_FORMAT(paidTIme, '%Y-%m') AS MT,
count(DISTINCT userId) AS scale
FROM
orderinfo
WHERE
isPaid = '已支付'
GROUP BY
MT

因为购买日期字段都是同一个年份的,所有老师直接使用MONTH函数,这样更加简便!

1
2
3
4
5
6
7
8
9
复制代码SELECT
MONTH (paidTIme) AS MT,
count(DISTINCT userId) AS scale
FROM
orderinfo
WHERE
isPaid = '已支付'
GROUP BY
MT


2、统计用户三月份回购率和复购率

第二道题目需要理解回购率和复购率的业务含义(我之前都搞混了,后来去百度查的),复购率等于当月消费者中消费次数多于一次的人数占比,回购率则是上一个月消费者中在当月再次消费的占比。

计算复购率(复购率的计算思路,自己的与老师的差不多):

先计算三月份购买人数,并作为一个子查询返回,外层查询使用count+if函数计算大于一次消费的购买者人数,将其与总人数相除,即可得到复购率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
复制代码SELECT
COUNT(if (thr.scale!=1,1,null)) as useful,
COUNT(*) as Fulln,
COUNT(if( thr.scale!=1,1,null))/count(*) as ratio
FROM
(
SELECT
userId,
count(userId) AS scale
FROM
db1.orderinfo
WHERE
MONTH (paidTime) = 3
AND isPaid = '已支付'
GROUP BY
userId
) AS thr


计算回购率(自己的思路):

对三月份购买者进行去重,使用count计算三月份购买者中有多少出现在四月份购买者中(通过在where中使用子查询作为过滤条件),将返回结果记录数与三月份购买者总人数相除即可得到回购率。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
复制代码SELECT 
count(DISTINCT userId) AS allfunn,
count(DISTINCT userId) / (
SELECT
count(DISTINCT userId)
FROM
orderinfo
WHERE
MONTH (paidTime) = 3
AND isPaid = '已支付'
) AS ratio
FROM
db1.orderinfo
WHERE
MONTH (paidTime) = 3
AND isPaid = '已支付'
AND userId IN (
SELECT DISTINCT
userId
FROM
db1.orderinfo
WHERE
MONTH (paidTime) = 4
AND isPaid = '已支付'
)


关于回购率,老师使用了一个自连接,勉强能理解大致思路,通过对比两个月份的月度标签是否相差一个月,相差一个月则为老客户重复购买,这样在月份多时具有更好地适用性。

1
2
3
4
5
6
7
8
9
10
11
复制代码select t1.m,count(t1.m),count(t2.m) from (
select userId,DATE_FORMAT(paidTime,'%Y-%m-01') as m from db1.orderinfo
where isPaid = '已支付'
group by userId,date_format(paidTime,'%Y-%m-01')) t1
left join (
select userId,date_format(paidTime,'%Y-%m-01') as m from db1.orderinfo
where isPaid = '已支付'
group by userId,date_format(paidTime,'%Y-%m-01')) t2
on t1.userId = t2.userId
and t1.m = date_sub(t2.m,interval 1 month)
group by t1.m


3、统计男女用户消费频次是否有差异

这个问题被我给复杂化了,我分别求了一次男性消费频次和女性消费频次!思路就是先将用户表和订单表做联结,然后过滤性别为男的记录并通过分组返回单一消费者记录。(女性的计算类比男性)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
复制代码-- 男性消费频次
SELECT
SUM(mmg.mean) / count(*) AS mam_m
FROM
(
SELECT
orderinfo.userId,
ROUND(COUNT(orderinfo.userId), 1) AS mean
FROM
orderinfo
INNER JOIN userinfo ON orderinfo.userId = userinfo.userId
WHERE
userinfo.gender = '男'
GROUP BY
orderinfo.userId
) AS mmg
-- 女性消费频次:
SELECT
SUM(mmg.mean) / count(*) AS mam_m
FROM
(
SELECT
orderinfo.userId,
ROUND(COUNT(orderinfo.userId), 1) AS mean
FROM
orderinfo
INNER JOIN userinfo ON orderinfo.userId = userinfo.userId
WHERE
userinfo.gender = '女'
GROUP BY
orderinfo.userId
) AS mmg

男女消费频次(老师版):

老师首先在连接两个表的基础上,剔除了无效记录,通过count做了单个购买者的购买数量统计,
然后使用了基于性别的分组均值聚合,输出男女性平均消费频次。这个思路太棒了,我特么的就是想不到~_~

SELECT
gender,
AVG(ct)
FROM
(
SELECT
o.userId,
gender,
count(1) AS ct
FROM
orderinfo o
INNER JOIN (
SELECT
*
FROM
db1.userinfo
WHERE
gender != ''
) t ON o.userId = t.userId
GROUP BY
userId,
gender
) t2
GROUP BY
gender



4、统计多次消费的用户,第一次和最后一次消费间隔是多少?

这个题目是我耗费我时间最长的一道题目,其实逻辑上肯定大家都知道需要筛选出那些消费次数大于1次的记录,然后通过单个购买者所有消费记录中最远的消费时间与最近的消费时间做时间差即可。说起来简单可以做起来并不简单。于是我把代码写成了下面这个样子!

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
复制代码SELECT
* FROM
(
SELECT
userId,
DATEDIFF(
myresult.uptime,
myresult.odtime
) AS difftime
FROM
(
SELECT
lowf.userId,
lowf.odtime,
UPf.uptime
FROM
(
SELECT
userId,
min(ldd.ltime) AS odtime
FROM
(
SELECT
userId,
paidTime AS ltime
FROM
orderinfo
WHERE
isPaid = '已支付'
ORDER BY
userId,
Ltime
) AS ldd
GROUP BY
userId
) AS lowf
INNER JOIN (
SELECT
userId,
max(pdd.ptime) AS uptime
FROM
(
SELECT
userId,
paidTime AS ptime
FROM
orderinfo
WHERE
isPaid = '已支付'
ORDER BY
userId,
ptime DESC
) AS pdd
GROUP BY
userId
) AS UPf
ON lowf.userId = UPf.userId
) AS myresult
) AS myresult1
WHERE
difftime != 0


我的大体思路是,最内层的逻辑是先筛选出来消费者距今最远消费记录,最近消费记录,并将两次输出做内连接。在输出的表基础上,做时间差,如果时间为0则说明只有一次消费,直接使用difftime != 0过滤掉即可。

以下是老师给出的思路,看完之后大呼自愧不如,可以看到我上面的那个内连接是多此一举,使用max、min两个函数并列字段就可以解决,但是我写的太复杂了!居然也能跑出来。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
复制代码SELECT
userId,
min(paidTime) as mintime,
max(paidTime) as maxtime,
datediff(
max(paidTime),
min(paidTime)
) as difftime
FROM
db1.orderinfo
WHERE
isPaid = '已支付'
GROUP BY
userId
HAVING
count(1) > 1


5、统计不同年龄段用户消费金额是否有差异

这个问题乍一看,我不太理解,最初想着这个年龄段怎么定义(没有给出精确的定义),然后我就想着平时一说到年龄代购就说什么70后、80后、90后什么的,就以为这种就可以做年龄段依据。

我个人的大体思路就是,最内层首先做两个表的联结(联结的同时过滤掉缺失值和未支付记录),然后中间层对出生日期进行分类编码(1970~1979为70后,以此类推)。

最后最外层通过对年龄段进行分组聚合,求不同年龄段下的支付价格的均值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
复制代码SELECT
trend,
round(avg(price), 2) AS means
FROM
(
SELECT
userId,
btdate,
price,
CASE
WHEN btdate BETWEEN '1970-01-01' AND '1979-12-31' THEN '70后'
WHEN btdate BETWEEN '1980-01-01' AND '1989-12-31' THEN '80后'
WHEN btdate BETWEEN '1990-01-01' AND '1999-12-31' THEN '90后'
WHEN btdate BETWEEN '2001-01-01' AND '2009-12-31' THEN '00后'
ELSE '10后'
END
AS 'trend'
FROM
(
SELECT
o.userId,
price,
date(brith) AS btdate
FROM
orderinfo o
LEFT JOIN (
SELECT * FROM
db1.userinfo
WHERE
gender != ''
) t ON o.userId = t.userId
WHERE
isPaid = '已支付'
AND
date(brith) != '0000-00-00'
) AS mt
ORDER BY
userId
) AS outtable
GROUP BY
trend
ORDER BY
means

关于年龄段消费金额差异,老师给出的思路:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码SELECT
age,
AVG(ct)FROM
(
SELECT
o.userId,
age,
count(o.userId) AS ct
FROM
db1.orderinfo o
INNER JOIN (
SELECT
userId,
CEIL((YEAR(now()) - YEAR(brith)) / 10) AS age
FROM
db1.userinfo
WHERE
brith > '1901-00-00'
) AS t
ON o.userId = t.userId
GROUP BY
o.userId,
age
) t2
GROUP BY
age


以上老师计算了各年龄段购买者消费消费频次的平均值。这里老师使用日期函数替代了分别编码工作,使得整体代码看起来很简洁易懂。(自己需要学的还有很多!)

6、统计消费的二八法则,消费的top20%用户,贡献了多少额度?

其实这个二八法则的问题逻辑很简单,就是按照单个消费者总消费金额排序,计算出那些前20%的的购买者消费金额占总体消费金额的比例。虽然逻辑很简单,但是在MySQL中想要写出次逻辑却并不是一件容易的事情,因为MySQL不支持 top n 这种函数,想要过滤前n个记录只能通过 追加 limit参数才可以。

所以我自己写了两段代码才解决:

首先按照单个消费者总购买金额排序,计算出前总支出排在前top20的消费者数量。(一共是17130)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码SELECT
ceil(count(*) / 5)
FROM
(
SELECT
userId,
sum(price) AS allsp
FROM
orderinfo
WHERE
date(paidTime) != '0000-00-00'
AND isPaid = '已支付'
GROUP BY
userId
ORDER BY
allsp DESC
) AS spend
-- 17130


然后再次运行次查询,使用limit参数限制输出前17130 个记录并计算其总金额占所有消费金额的比例即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
复制代码SELECT
(
SELECT
sum(allsp) AS top20
FROM
(
SELECT
userId,
sum(price) AS allsp
FROM
orderinfo
WHERE
date(paidTime) != '0000-00-00'
AND isPaid = '已支付'
GROUP BY
userId
ORDER BY
allsp DESC
LIMIT 17130
) AS spend
) / (
SELECT
sum(allsp) AS entry
FROM
(
SELECT
userId,
sum(price) AS allsp
FROM
orderinfo
WHERE
date(paidTime) != '0000-00-00'
AND isPaid = '已支付'
GROUP BY
userId
ORDER BY
allsp DESC
) AS spend
) as top20ratio


计算结果是85.46%左右。

由于篇幅所限,关于这五个问题的R语言版、Python版,期待下一篇推送吧!

说几点个人感想:

1、因为之前关于数据清洗和数据处理技能,全部都是在R语言中练习的,突然使用SQL来做,即便很简单的需求逻辑,写起来都感觉磕磕碰碰,总之就是无法灵活运用,简单问题往往被复杂化。

2、SQL中查询语句有固定的模式,所有的输出都要严格依赖select …… from…… where group by语句,甚至连各种函数都无法单独使用,这一点儿导致很多需要多步完成需求无法分割成多个中间步骤,必须借助子查询。

3、SQL没有像R语言一样的管道操作符或者Python中的方法调用,多任务步骤在一个句子中只能依赖子查询进行嵌套,稍微复杂些的需求,如果基础函数使用不够灵活的话,可能会写的很繁杂。

SQL查询语法需要在深刻理解表关系的基础上,尽量使用自带函数解决,这样既高效、又可以节省代码,以上自己写的代码中,有特别多的地方有冗余,以后还需要勤加练习,加强各种场景下的实践,灵活运用才能写出来简洁、高效、可复用性高的任务代码。

以下链接是秦路老师在天善学院所主讲的七周成为数据分析师系列课程!

edu.hellobi.com/course/205

七周课程,七种数据分析师必备技能,循序渐进、逐个击破,推荐给对数据分析、商业分析感兴趣的小伙伴儿学习!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot 入门之缓存和 NoSQL 篇(四)

发表于 2017-12-07

一、前言

当系统的访问量增大时,相应的数据库的性能就逐渐下降。但是,大多数请求都是在重复的获取相同的数据,如果使用缓存,将结果数据放入其中可以很大程度上减轻数据库的负担,提升系统的响应速度。

本篇将介绍 Spring Boot 中缓存和 NoSQL 的使用。上篇文章《Spring Boot 入门之持久层篇(三)》。

二、整合缓存

Spring Boot 针对不同的缓存技术实现了不同的封装,本篇主要介绍 EhCache 和 Redis 缓存。

Spring Boot 提供了以下几个注解实现声明式缓存:

注解 说明
@EnableCaching 开启缓存功能,放在配置类或启动类上
@CacheConfig 缓存配置,设置缓存名称
@Cacheable 执行方法前先查询缓存是否有数据。有则直接返回缓存数据;否则查询数据再将数据放入缓存
@CachePut 执行新增或更新方法后,将数据放入缓存中
@CacheEvict 清除缓存
@Caching 将多个缓存操作重新组合到一个方法中

2.1 EhCache 缓存

2.1.1 添加依赖

1
2
3
4
5
6
7
8
复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-cache</artifactId>
</dependency>
<dependency>
<groupId>net.sf.ehcache</groupId>
<artifactId>ehcache</artifactId>
</dependency>

2.1.2 添加配置

在 src/main/resources 目录下创建 ehcache.xml 文件,内容如下:

1
2
3
4
5
6
7
8
复制代码<ehcache xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:noNamespaceSchemaLocation="ehcache.xsd">
<cache name="department"
eternal="false"
maxEntriesLocalHeap="0"
timeToIdleSeconds="50">
</cache>
</ehcache>

application.properties :

1
2
3
4
复制代码spring.cache.type=ehcache
spring.cache.ehcache.config=classpath:ehcache.xml
# 打印日志,查看 sql
logging.level.com.light.springboot=DEBUG

2.1.3 编码

在持久层篇的基础上,结合 Mybatis 测试:

Service 层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
复制代码@CacheConfig(cacheNames = "department")
@Service
public class DepartmentService {
@Autowired
private DepartmentMapper departmentMapper;
@CachePut(key = "#department.id")
public Department save(Department department) {
System.out.println("保存 id=" + department.getId() + " 的数据");
this.departmentMapper.insert(department);
return department;
}
@CachePut(key = "#department.id")
public Department update(Department department) {
System.out.println("修改 id=" + department.getId() + " 的数据");
this.departmentMapper.update(department);
return department;
}
@Cacheable(key = "#id")
public Department getDepartmentById(Integer id) {
System.out.println("获取 id=" + id + " 的数据");
Department department = this.departmentMapper.getById(id);
return department;
}
@CacheEvict(key = "#id")
public void delete(Integer id) {
System.out.println("删除 id=" + id + " 的数据");
this.departmentMapper.deleteById(id);
}
}

控制层:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
复制代码@Controller
@RequestMapping("department")
@ResponseBody
public class DepartmentController {
@Autowired
private DepartmentService departmentService;

@RequestMapping("save")
public Map<String,Object> save(Department department) {
this.departmentService.save(department);

Map<String,Object> map = new HashMap<String,Object>();
map.put("code", "200");
map.put("msg", "保存成功");
return map;
}

@RequestMapping("get/{id}")
public Map<String,Object> get(@PathVariable("id") Integer id) {
Department department = this.departmentService.getDepartmentById(id);

Map<String,Object> map = new HashMap<String,Object>();
map.put("code", "200");
map.put("msg", "获取成功");
map.put("data", department);
return map;
}

@RequestMapping("update")
public Map<String,Object> update(Department department) {
this.departmentService.update(department);

Map<String,Object> map = new HashMap<String,Object>();
map.put("code", "200");
map.put("msg", "修改成功");
return map;
}

@RequestMapping("delete/{id}")
public Map<String,Object> delete(@PathVariable("id") Integer id) {
this.departmentService.delete(id);

Map<String,Object> map = new HashMap<String,Object>();
map.put("code", "200");
map.put("msg", "删除成功");
return map;
}
}

启动类:

添加 @EnableCaching 注解,开启缓存功能。

1
2
3
4
5
6
7
复制代码@EnableCaching
@SpringBootApplication
public class SpringbootNosqlApplication {
public static void main(String[] args) {
SpringApplication.run(SpringbootNosqlApplication.class, args);
}
}

2.1.4 测试说明

1) 发起保存请求:

1
2
3
4
复制代码保存 id=2 的数据
2017-12-06 14:50:48.800 DEBUG 680 --- [nio-8081-exec-7] c.l.s.dao.DepartmentMapper.insert : ==> Preparing: insert into department(id,name,descr) values(?,?,?)
2017-12-06 14:50:48.801 DEBUG 680 --- [nio-8081-exec-7] c.l.s.dao.DepartmentMapper.insert : ==> Parameters: 2(Integer), Ehcache 部门(String), Ehcache(String)
2017-12-06 14:50:48.868 DEBUG 680 --- [nio-8081-exec-7] c.l.s.dao.DepartmentMapper.insert : <== Updates: 1

2) 保存成功后,立刻发起查询请求,没有日志打印,但返回对象数据,说明数据是从缓存中获取。

3) 发起修改请求:

1
2
3
4
复制代码修改 id=2 的数据
2017-12-06 14:51:16.588 DEBUG 680 --- [nio-8081-exec-8] c.l.s.dao.DepartmentMapper.update : ==> Preparing: update department set name = ? , descr = ? where id = ?
2017-12-06 14:51:16.589 DEBUG 680 --- [nio-8081-exec-8] c.l.s.dao.DepartmentMapper.update : ==> Parameters: Ehcache 部门2(String), Ehcache2(String), 2(Integer)
2017-12-06 14:51:16.657 DEBUG 680 --- [nio-8081-exec-8] c.l.s.dao.DepartmentMapper.update : <== Updates: 1

4) 修改成功后,立刻发起查询请求,没有日志打印,但返回修改后的对象数据,说明缓存中的数据已经同步。

5) 发起删除请求:

1
2
3
4
复制代码删除 id=2 的数据
2017-12-06 14:52:07.572 DEBUG 680 --- [nio-8081-exec-1] c.l.s.dao.DepartmentMapper.deleteById : ==> Preparing: delete from department where id = ?
2017-12-06 14:52:07.572 DEBUG 680 --- [nio-8081-exec-1] c.l.s.dao.DepartmentMapper.deleteById : ==> Parameters: 2(Integer)
2017-12-06 14:52:07.613 DEBUG 680 --- [nio-8081-exec-1] c.l.s.dao.DepartmentMapper.deleteById : <== Updates: 1

6) 删除成功后,立刻发起查询请求,控制台打印 sql 语句,说明缓存数据被删除,需要查询数据库。

1
2
3
4
复制代码获取 id=2 的数据
2017-12-06 14:52:40.324 DEBUG 680 --- [nio-8081-exec-3] c.l.s.dao.DepartmentMapper.getById : ==> Preparing: select id,name,descr from department where id = ?
2017-12-06 14:52:40.325 DEBUG 680 --- [nio-8081-exec-3] c.l.s.dao.DepartmentMapper.getById : ==> Parameters: 2(Integer)
2017-12-06 14:52:40.328 DEBUG 680 --- [nio-8081-exec-3] c.l.s.dao.DepartmentMapper.getById : <== Total: 0

2.2 Redis 缓存

2.2.1 添加依赖

1
2
3
4
复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

2.2.2 添加配置

application.properties :

1
2
3
4
5
6
7
8
9
10
11
12
复制代码spring.redis.host=192.168.2.11
spring.redis.port=6379
spring.redis.password=
spring.redis.database=0
spring.redis.pool.max-active=8
spring.redis.pool.max-idle=8
spring.redis.pool.max-wait=-1
spring.redis.pool.min-idle=0
spring.redis.timeout=0
spring.cache.type=redis
# 打印日志,查看 sql
logging.level.com.light.springboot=DEBUG

注意:spring.cache.type=redis,缓存类型设置成 redis。

完成上边 2 个步骤后,其他步骤与测试 Ehcache 时的步骤一致。

测试结果也一致,此处省略。

三、整合 Redis

上一个小节其实已经介绍了 Spring Boot 整合 Redis 的内容。

在添加 redis 依赖包启动项目后,Spring Boot 会自动配置 RedisCacheManger 和 RedisTemplate 的 Bean。如果开发者不想使用 Spring Boot 写好的 Redis 缓存,而是想使用其 API 自己实现缓存功能、消息队列或分布式锁之类的需求时,可以继续往下浏览。

Spring Data Redis 为我们提供 RedisTemplate 和 StringRedisTemplate 两个模板进行数据操作,它们主要 的访问方法如下:

方法 说明
opsForValue() 操作简单属性的数据
opsForList() 操作含有 list 的数据
opsForSet() 操作含有 set 的数据
opsForZSet() 操作含有 zset 的数据
opsForHash() 操作含有 hash 的数据

3.1 添加依赖

1
2
3
4
复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-redis</artifactId>
</dependency>

3.2 配置连接

1
2
3
4
5
6
7
8
9
复制代码spring.redis.host=192.168.2.11
spring.redis.port=6379
spring.redis.password=
spring.redis.database=0
spring.redis.pool.max-active=8
spring.redis.pool.max-idle=8
spring.redis.pool.max-wait=-1
spring.redis.pool.min-idle=0
spring.redis.timeout=0

3.3 编码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
复制代码@Component
public class RedisDao {

@Autowired
private StringRedisTemplate stringRedisTemplate;
public void set(String key, String value) {
this.stringRedisTemplate.opsForValue().set(key, value);
}
public String get(String key) {
return this.stringRedisTemplate.opsForValue().get(key);
}
public void delete(String key) {
this.stringRedisTemplate.delete(key);
}
}

3.4 测试

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码@RunWith(SpringRunner.class)
@SpringBootTest
public class RedisDaoTest {
@Autowired
private RedisDao redisDao;

@Test
public void testSet() {
String key = "name";
String value = "zhangsan";

this.redisDao.set(key, value);
}

@Test
public void testGet() {
String key = "name";
String value = this.redisDao.get(key);
System.out.println(value);
}

@Test
public void testDelete() {
String key = "name";
this.redisDao.delete(key);
}
}

测试结果省略…

四、整合 MongoDB

Spring Data MongoDB 提供了 MongoTemplate 模板 和 Repository 让开发者进行数据访问。

4.1 添加依赖

1
2
3
4
复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-mongodb</artifactId>
</dependency>

4.2 配置连接

1
2
3
复制代码spring.data.mongodb.host=192.168.2.25
spring.data.mongodb.port=27017
spring.data.mongodb.database=test

4.3 编码

4.3.1 使用 MongoTemplate

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
复制代码@Component
public class MongodbDao {

@Autowired
private MongoTemplate mongoTemplate;
public void insert(User user) {
this.mongoTemplate.insert(user);
}
public void deleteById(int id) {
Criteria criteria = Criteria.where("id").is(id);
Query query = new Query(criteria);
this.mongoTemplate.remove(query, User.class);
}
public void update(User User) {
Criteria criteria = Criteria.where("id").is(User.getId());
Query query = new Query(criteria);
Update update = new Update();
update.set("password", User.getPassword());
this.mongoTemplate.updateMulti(query, update, User.class);
}
public User getById(int id) {
Criteria criteria = Criteria.where("id").is(id);
Query query = new Query(criteria);
return this.mongoTemplate.findOne(query, User.class);
}

public List<User> getAll() {
List<User> userList = this.mongoTemplate.findAll(User.class);
return userList;
}

}

4.3.2 使用 Repository

1
2
3
复制代码public interface UserRepository extends MongoRepository<User, Integer> {

}

测试方式与 Redis 测试大同小异,测试结果省略…

五、参考资料

  • docs.spring.io/spring/docs… 官方文档

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

零拷贝读取文件成go对象

发表于 2017-12-06

我们观察到从文件读取到go对象,需要两次拷贝:

  1. 从文件拷贝到内存,成为[]byte
  2. 从[]byte,按照格式进行读取,拷贝到go对象上

怎么样优化这个读取速度呢?

  1. 利用mmap,把文件直接映射到内存,go允许把这片内存已经转化成[]byte来使用
  2. 直接在这个[]byte上“展开”go对象

所谓”展开“就是一个reinterpret cast,对一个指针的类型重新解读。

1
2
3
4
复制代码var bytes = []byte{
16, 0, 0, 0, 0, 0, 0, 0,
5, 0, 0, 0, 0, 0, 0, 0,
'h', 'e', 'l', 'l', 'o'}

假设有这样一个[]byte数组。这个是直接用mmap读取出来的。

1
复制代码var ptr = &bytes[0]

这个ptr就是这片内存区域的指针,指向了开头的第一个元素

1
2
3
4
5
复制代码type stringHeader struct {
Data uintptr
Len int
}
header := (*stringHeader)(unsafe.Pointer(ptr))

这样我们就把这个内存重新解读为了一个stringHeader了。利用stringHeader就可以构造出string来。

1
复制代码header.Data = uintptr(unsafe.Pointer(&bytes[16]))

把stringHeader的指针指向实际的hello数据部分。

1
2
复制代码str := (*string)(unsafe.Pointer(ptr))
fmt.Println(str) // "hello"

最后再把同一片内存区域解读为string类型,就得到了”hello”字符串了。整个解码过程只做了一次header.Data的更新,没有做任何内存分配。

相比Java来说,go允许我们使用go自己的heap外的内存。甚至允许把go的对象直接在这片内存上构造出来。这使得我们的应用可以和文件系统的缓存共享一片内存,达到内存利用率的最大化。同时相比protobuf/thrift来说,gocodec就是把cpu对值的内存表示(little endian的integer等),以及go语言对象的内存表示(stringHeader,sliceHeader)直接拷贝了,减少了编解码的计算成本。

完整的代码,欢迎star:bloomfilter_test.go

设计了一个编解码格式叫 github.com/esdb/gocode…

和protobuf的对比还没有测,和json相比,毫无悬念地不在一个量级上。

gocodec 200000 10893 ns/op 288 B/op 2 allocs/op

json 300 3746169 ns/op 910434 B/op 27 allocs/op

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…914915916…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%