开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

对 Kafka 和 Pulsar 进行性能测试后,拉卡拉将消

发表于 2021-05-07

拉卡拉支付成立于 2005 年,是国内领先的第三方支付企业,致力于整合信息科技,服务线下实体,从支付切入,全维度为中小微商户的经营赋能。2011 年成为首批获得《支付业务许可证》企业的一员,2019 年上半年服务商户超过 2100 万家。2019 年 4 月 25 日,登陆创业板。

功能需求

由于拉卡拉的项目组数量较多,各个项目在建设时,分别根据需要选择了自己的消息系统。这就导致一方面很多系统的业务逻辑和具体的消息系统之间存在耦合,为后续系统维护和升级带来麻烦;另一方面业务团队成员对消息系统的管理和使用水平存在差异,从而使得整体系统服务质量和性能不稳定;此外,同时维护多套系统,物理资源利用率和管理成本都比较高。因此,我们计划建设一套分布式基础消息平台,同时为各个团队提供服务。该平台需要具备以下特性:高可靠、低耦合、租户隔离、易于水平扩展、易于运营维护、统一管理、按需申请使用,同时支持传统的消息队列和流式队列。表 1 展示了这两类服务应该具备的特性。

为什么选择 Apache Pulsar

现在可供用户选择的大厂开源消息平台有很多,架构设计大多类似,比如 Kafka 和 RocketMQ 都采用存储与计算一体的架构,只有 Pulsar 采用存储与计算分离的多层架构。我们比较选型的消息系统有三个:Kafka、RocketMQ 和 Pulsar。测试之前,我们通过网上的公开数据,对三者的性能和功能进行了简单的对比,表 2 为对比结果。从中可以看出 Pulsar 更符合我们的需求。

Pulsar 的架构优势

Pulsar 是云原生的分布式消息流平台,源于 Yahoo!,支持 Yahoo! 应用,服务 140 万个 topic,日处理超过 1000 亿条消息。2016 年 Yahoo! 开源 Pulsar 并将其捐赠给 Apache 软件基金会,2018 年 Pulsar 成为 Apache 软件基金会的顶级项目。

作为一种高性能解决方案,Pulsar 具有以下特性:支持多租户,通过多租户可为每个租户单独设置认证机制、存储配额、隔离策略等;高吞吐、低延迟、高容错;原生支持多集群部署,集群间支持无缝数据复制;高可扩展,能够支撑上百万个 topic;支持多语言客户端,如 Java、Go、Python、C++ 等;支持多种消息订阅模式(独占、共享、灾备、Key_Shared)。

架构合理 Kafka 采用计算与存储一体的架构,当 topic 数量较多时,Kafka 的存储机制会导致缓存污染,降低性能。Pulsar 采用计算与存储分离的架构(如图 1)。无状态计算层由一组接收和投递消息的 broker 组成,broker 负责与业务系统进行通信,承担协议转换,序列化和反序列化、选主等功能。有状态存储层由一组 bookie 存储节点组成,可以持久存储消息。

Broker 架构

Broker 主要由四个模块组成。我们可以根据实际需求对相应的功能进行二次开发。

Dispatcher:调度分发模块,承担协议转换、序列化反序列化等。

Load balancer:负载均衡模块,对访问流量进行控制管理。

Global replicator:跨集群复制模块,承担异步的跨集群消息同步功能。

Service discovery:服务发现模块,为每个 topic 选择无状态的主节点。

持久层(BookKeeper)架构

图 3 为 Pulsar 中持久层的架构图。Bookie 是 BookKeeper 的存储节点,提供独立的存储服务。ZooKeeper 为元数据存储系统,提供服务发现以及元数据管理服务。BookKeeper 架构属于典型的 slave-slave 架构,所有 bookie 节点的角色都是 slave,负责持久化数据,每个节点的处理逻辑都相同;BookKeeper 客户端为 leader 角色,承担协调工作,由于其本身无状态,所以可以快速实现故障转移。

隔离架构

保证了 Pulsar 的优良性能,主要体现在以下几个方面:

IO 隔离:写入、追尾读和追赶读隔离。

利用网络流入带宽和磁盘顺序写入的特性实现高吞吐写:传统磁盘在顺序写入时,带宽很高,零散读写导致磁盘带宽降低,采取顺序写入方式可以提升性能。

利用网络流出带宽和多个磁盘共同提供的 IOPS 处理能力实现高吞吐读:收到数据后,写到性能较好的 SSD 盘里,进行一级缓存,然后再使用异步线程,将数据写入到传统的 HDD 硬盘中,降低存储成本。

利用各级缓存机制实现低延迟投递:生产者发送消息时,将消息写入 broker 缓存中;实时消费时(追尾读),首先从 broker 缓存中读取数据,避免从持久层 bookie 中读取,从而降低投递延迟。读取历史消息(追赶读)场景中,bookie 会将磁盘消息读入 bookie 读缓存中,从而避免每次都读取磁盘数据,降低读取延时。

对比总结

左侧为 Kafka、RabbitMQ 等消息系统采用的架构设计,broker 节点同时负责计算与存储,在某些场景中使用这种架构,可以实现高吞吐;但当 topic 数量增加时,缓存会受到污染,影响性能。

右侧为 Pulsar 的架构,Pulsar 对 broker 进行了拆分,增加了 BookKeeper 持久层,虽然这样会增加系统的设计复杂性,但可以降低系统的耦合性,更易实现扩缩容、故障转移等功能。表 3 总结了分区架构和分片架构的主要特性。

基于对 Pulsar 的架构和功能特点,我们对 Pulsar 进行了测试。在操作系统层面使用 NetData 工具进行监控,使用不同大小的数据包和频率进行压测,测试的几个重要指标是磁盘、网络带宽等的波动情况。

测试结论如下:

**部署方式:**混合部署优于分开部署。broker 和 bookie 可以部署在同一个节点上,也可以分开部署。节点数量较多时,分开部署较好;节点数量较少或对性能要求较高时,将二者部署在同一个节点上较好,可以节省网络带宽,降低延迟。

**负载大小:**随着测试负载的增大,tps 降低,吞吐量稳定。

**刷盘方式:**异步刷盘优于同步刷盘。

压缩算法:压缩算法推荐使用 LZ4 方式。我们分别测试了 Pulsar 自带的几种压缩方式,使用 LZ4 压缩算法时,CPU 使用率最低。使用压缩算法可以降低网络带宽使用率,压缩比率为 82%。

**分区数量:**如果单 topic 未达到单节点物理资源上限,建议使用单分区;由于 Pulsar 存储未与分区耦合,可以根据业务发展情况,随时调整分区数量。

**主题数量:**压测过程中,增加 topic 数量,性能不受影响。

**资源约束:**如果网络带宽为千兆,网络会成为性能瓶颈,网络 IO 可以达到 880 MB/s;在网络带宽为万兆时,磁盘会成为瓶颈,磁盘 IO 使用率为 85% 左右。

**内存与线程:**如果使用物理主机,需注意内存与线程数目的比例。默认配置参数为 IO 线程数等于 CPU 核数的 2 倍。这种情况下,实体机核数为 48 核,如果内存设置得较小,比较容易出现 OOM 的问题。

除了上述测试以外,我们还复测了 Jack Vanlightly(RabbitMQ 的测试工程师)的破坏性测试用例,得到如下结论:

所有测试场景中,没有出现消息丢失与消息乱序;

开启消息去重的场景中,没有出现消息重复。

另外,我们与 Apache Pulsar 项目的核心开发人员交流沟通时间较早,他们在 Yahoo! 和推特有过丰富的实践经验,预备成立公司在全世界范围内推广使用 Pulsar,并且会将中国作为最重要的基地,这为我们的使用提供了强有力的保障。现在大家也都知道,他们成立了 StreamNative 公司,并且已获得多轮融资,队伍也在不断壮大。

Pulsar 在基础消息平台的实践

我们基于 Pulsar 构建的基础消息平台架构如下图,图中绿色部分为基于 Pulsar 实现的功能或开发的组件。本节将结合实际使用场景,详细介绍我们如何在实际使用场景中应用 Pulsar 及基于 Pulsar 开发的组件。

场景 1:流式队列

1. OGG For Pulsar 适配器

源数据存储在 Oracle 中,我们希望实时抓取 Oracle 的变更数据,进行实时计算、数据分析、提供给下游业务系统查询等场景。

我们使用 Oracle 的 OGG(Oracle Golden Gate) 工具进行实时抓取,它包含两个模块:源端 OGG 和目标 OGG。由于 OGG 官方没有提供 Sink 到 Pulsar 的组件,我们根据需要开发了 OGG For Pulsar 组件。下图为数据处理过程图,OGG 会抓取到表中每条记录的增删改操作,并且把每次操作作为一条消息推送给 OGG For Pulsar 组件。OGG For Pulsar 组件会调用 Pulsar 客户端的 producer 接口,进行消息投递。投递过程中,需要严格保证消息顺序。我们使用数据库表的主键作为消息的 key,数据量大时,可以根据 key 对 topic 进行分区,将相同的 key 投递到同一分区,从而保证对数据库表中主键相同的记录所进行的增删改操作有序。

2. Pulsar To TiDB 组件

我们通过 Pulsar To TiDB 组件将抓取到的变更消息存储到 TiDB 中,对下游系统提供查询服务。这一组件的处理逻辑为:

使用灾备订阅方式,消费 Pulsar 消息。

根据消息的 key 进行哈希运算,将相同的 key 散列到同一持久化线程中。

启用 Pulsar 的消息去重功能,避免消息重复投递。假设 MessageID2 重复投递,那么数据一致性将被破坏。

3. Pulsar 的消息持久化过程分析

Pulsar 的消息持久化过程包括以下四步:

OGG For Pulsar 组件调用 Pulsar 客户端的 producer 接口,投递消息。

Pulsar 客户端根据配置文件中的 broker 地址列表,获取其中一个 broker 的地址,然后发送 topic 归属查询服务,获取服务该 topic 的 broker 地址(下图示例中为 broker2)。

Pulsar 客户端将消息投递给 Broker2。

Broker2 调用 BookKeeper 的客户端做持久化存储,存储策略包括本次存储可选择的 bookie 总数、副本数、成功存储确认回复数。

4. 数据库表结构动态传递

OGG 使用 AVRO 方式进行序列化操作时,如果将多个表投递到同一个 topic 中,AVRO Schema 为二级结构:wrapper schema 和 table schema。wrapper schema 结构始终不变,包含 table_name、schema_fingerprint、payload 三部分信息;OGG 在抓取数据时,会感知数据库表结构的变化并通知给 OGG For Pulsar,即表结构决定其 table schema,再由 table schema 生成对应的 schema_fingerprint。

我们将获取到的 table schema 发送并存储在指定的 Schema topic 中。Data topic 中的消息只包含 schema_fingerprint 信息,这样可以降低序列化后消息包的大小。Pulsar To TiDB 启动时,从 Schema topic 消费数据,使用 schema_fingerprint 为 Key 将 table schema 缓存在内存中。反序列化 Data Topic 中的消息时,从缓存中根据 schema_fingerprint 提取 table schema,对 payload 进行反序列化操作。

5. 一致性保障

要保证消息有序和去重,需要从 broker、producer、consumer 三方面进行设置。

Broker

在 namespace 级别开启去重功能:bin/pulsar-admin namespaces set-deduplication namespace –enable

修复 / 优化 Pulsar 客户端死锁问题。2.7.1 版本已修复,详细信息可参考 PR 9552。

Producer

pulsar.producer.batchingEnabled=false

在 producer 设置中,关闭批量发送。如果开启批量发送消息,则消息可能会乱序。

pulsar.producer.blocklfQueueFull=true

为了提高效率,我们采用异步发送消息,需要开启阻塞队列处理,否则可能会出现消息丢失。调用异步发送超时,发送至异常 topic。如果在异步超时重发消息时,出现消息重复,可以通过开启自动去重功能进行处理;其它情况下出现的消息发送超时,需要单独处理,我们将这些消息存储在异常 topic 中,后续通过对账程序从源库直接获取终态数据。

Consumer

实现拦截器:ConsumerInterceptorlmpl implements ConsumerInterceptor

配置确认超时:pulsarClient.ackTimeout(3000, TimeUnit.MILLISECONDS).ackTimeoutTickTime(500, TimeUnit.MILLISECONDS)

使用累积确认:consumer.acknowledgeCumulative(sendMessageID)

**备注:**配置确认超时参数,如果没有在 ackTimeout 时间内进行消费确认的话,消息将重新投递。为了严格保证一致性,我们需要使用累计确认方式进行确认。

6. 消息消费的确认方式

假如在 MessageID 为 1 的消息已确认消费成功,开始采用累积确认方式,此时正在确认 MessageID 为 3 的消息,则已消费但未确认的 MessageID 为 2 的消息也会被确认成功。假如在“确认超时”时间内一直未收到确认,则会按照原顺序重新投递 MessageID 为 2、3、4、5 的消息。

假如采用单条确认方式,图中 MessageID 为 1、3、4 的消息确认消费成功,而 MessageID 为 2 的消息“确认超时”。在这种情况下,如果应用程序处理不当,未按照消费顺序逐条确认,则出现消息“确认超时”时,只有发生超时的消息(即 MessageID 为 2 的消息)会被重新投递,导致消费顺序发生错乱。

**总结:**队列消费模式建议使用单条确认方式,流式消费模式建议使用累积确认方式。

7. 消息确认超时(客户端)检测机制

确认超时机制中有两个参数,超时时间和轮询间隔。超时检测机制通过一个双向队列 + 多个 HashSet 实现。HashSet 的个数为(超时时间)除以(轮询间隔)后取整,因此每次轮询处理一个 HashSet,从而有效规避全局锁带来的性能损耗。

场景 2:消息队列:OpenMessaging 协议实现(透明层协议)

我们过去使用的很多业务系统都和消息系统强耦合,导致后续升级和维护很麻烦,因此我们决定使用 OpenMessaging 协议作为中间层进行解耦。

通过 Pulsar 实现 OpenMessaging 协议。

开发框架(基于 spring boot)调用 OpenMessaging 协议接口,发送和接收消息。

场景 3:流式队列:自定义 Kafka 0.8-Source(Source 开发)

Pulsar IO 可以轻松对接到各种数据平台。我们的部分业务系统使用的是 Kafka 0.8,官方没有提供对应的 Source,因此我们根据 Pulsar IO 的接口定义,开发了 Kafka 0.8 Source 组件。

场景 4:流式队列:Function 消息过滤(消息过滤)

我们通过 Pulsar Functions 把 Pulsar IDC 集群消息中的敏感字段(比如身份证号,手机号)脱敏后实时同步到云集群中,供云上应用消费。

场景 5:流式队列:Pulsar Flink Connector 流式计算(流式计算)

商户经营分析场景中,Flink 通过 Pulsar Flink Connector 连接到 Pulsar,对流水数据根据不同维度,进行实时计算,并且将计算结果再通过 Pulsar 持久化到 TiDB 中。从目前的使用情况来看,Pulsar Flink Connector 的性能和稳定性均表现良好。

场景 6:流式队列:TiDB CDC 适配(TiDB 适配)

我们需要基于 TiDB 数据变更进行实时抓取,但 TiDB CDC For Pulsar 序列化方式不支持 AVRO 方式,因此我们针对这一使用场景进行了定制化开发,即先封装从 TiDB 发出的数据,再投递到 Pulsar 中。TiDB CDC For Pulsar 组件的开发语言为 Go 语言。

未来规划

我们基于 Pulsar 构建的基础消息平台有效提高了物理资源的使用效率;使用一套消息平台简化了系统维护和升级等操作,整体服务质量也得以提升。我们对 Pulsar 的未来使用规划主要包括以下两点:

陆续下线其它消息系统,最终全部接入到 Pulsar 基础消息平台;

深度使用 Pulsar 的资源隔离和流控机制。

在实践过程中,借助 Pulsar 诸多原生特性和基于 Pulsar 开发的组件,新消息平台完美实现了我们预期的功能需求。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

搞定Tomcat服务配置和服务器优化只需看这一篇文章! 前言

发表于 2021-05-07

前言

Tomcat优化主要有两个方面:内存优化和并发优化。
关于Tomcat的知识点总结了一份文档,分享给大家:

一、Tomcat内存优化

① tomcat启动脚本

Tomcat内存优化其实也就是JVM优化,启动时告诉JVM需要多大内存(调优内存是最直接的方式)。配置文件为Windows 下的 catalina.bat,Linux 下的 catalina.sh。

在linux启动Tomcat通常我们会执行bin/startup.sh文件,查看该文件源码会发现,该文件最后会执行catalina.sh脚本文件。

在catalina.sh脚本文件中,会看到如下注释变量释义:

image.png

该脚本注释说明了一些CATALINA Server的环境配置,如CATALINA_HOME、CATALINA_BASE、CATALINA_OUT、CATALINA_OPTS以及CATALINA_TMPDIR等。值得注意的是,通常建议自定义环境配置文件放在setenv.sh in CATALINA_BASE/bin。

② catalina.sh中变量说明

CATALINA_HOME&&CATALINA_BASE

前者是Tomcat安装目录,后者是Tomcat工作目录。CATALINA_BASE该变量是可选的,如果没有配置该变量,则默认使用CATALINA_HOME。那么什么时候下同时配置CATALINA_HOME&&CATALINA_BASE呢?如果我们想要运行Tomcat的 多个实例,但是不想安装多个Tomcat软件副本。那么我们可以配置多个工作目录,每个运行实例独占一个工作目录,但是共享同一个安装目录。

CATALINA_OUT

同样是可选的配置,指向将重定向stdout和stderr的文件的完整路径。默认是$CATALINA_BASE/logs/catalina.out。

CATALINA_OPTS

可选配置。执行“开始”、“运行”或“调试”命令时使用的Java运行时选项。在此处定义的选项(不在JAVA_OPTS中),只能被Tomcat自身使用,不能被stop线程、version命令等运行。使用实例如配置head size,GC logging, JMX ports等。

CATALINA_TMPDIR

可选配置。JVM使用的临时路径(java.io.tmpdir该系统环境变量对应),默认为$CATALINA_BASE/temp。

JAVA_HOME&&JRE_HOME

JAVA_HOME–Java开发环境,JRE_HOME–Java运行环境。jdk1.7后,jdk内嵌了jre。如果JRE_HOME未显示设置,则取JAVA_HOME值。

JAVA_OPTS

可选配置。执行任何命令时使用的Java运行时选项。此处的选项(不在CATALINA_OPTS中)可以被Tomcat自身以及stop进程程、version命令等使用。注释中还有这样一句话-Most options should go into CATALINA_OPTS,也就是说你在JAVA_OPTS配置的参数大多数对CATALINA_OPTS实用,并且被CATALINA_OPTS使用。

CATALINA_PID

可选配置。包含catalina启动java进程的pid文件。这里注释有个彩蛋–when start (fork) is used。

LOGGING_CONFIG

可选配置。覆盖Tomcat默认日志配置文件,默认实例为LOGGING_CONFIG=”-Djava.util.logging.config.file=$CATALINA_BASE/conf/logging.properties”

LOGGING_MANAGER

可选配置。覆盖Tomcat默认的日志管理器,默认实例为LOGGING_MANAGER=”-Djava.util.logging.manager=org.apache.juli.ClassLoaderLogManager”

这些参数可以在Tomcat启动日志里面看到:

image.png

③ catalina.sh配置jvm参数实例

在catalina.sh文件中配置jvm的内存空间,如:

JAVA_OPTS="-server -Xms256m -Xmx2048m -XX:PermSize=256m -XX:MaxNewSize=1024m -XX:MaxPermSize=1024M"

-Xms JVM初始化堆的大小,-Xmx JVM堆的最大值,实际参数大小根据服务器配置或者项目具体设置。需要注意的是jdk1.8中持久代转换成了元空间,故不再支持PermSize与MaxPermSize参数,替换为了-XX:MetaspaceSize与-XX:MaxMetaspaceSize。

二、Tomcat 线程优化 在server.xml中配置

实例如下:

# 优化参数 <Connector port="8080" protocol="HTTP/1.1" maxThreads="1000" minSpareThreads="100" acceptCount="1000" maxConnections="1000" connectionTimeout="20000" maxHttpHeaderSize="8192" tcpNoDelay="true" compression="on" compressionMinSize="2048" disableUploadTimeout="true" redirectPort="8443" enableLookups="false" URIEncoding="UTF-8" />

① protocol

协议类型,可选类型有4种,BIO(阻塞型IO),NIO,NIO2和APR。

BIO

BIO(Blocking I/O) 阻塞式I/O操作,传统的Java I/O操作(即java.io包及其子包)。Tomcat在默认情况下,是以bio模式运行的,bio模式是三种运行模式中性能最低的一种。BIO配置采用默认即可。
BIO更适合处理简单流程,如程序处理较快可以立即返回结果。简单项目及应用可以采用BIO。

NIO

NIO(New I/O)是Java SE 1.4及后续版本提供的一种新的I/O操作方式(即java.nio包及其子包)。Java nio是一个基于缓冲区、非阻塞I/O操作的Java API它拥有比传统I/O操作(bio)更好的并发运行性能。
NIO更适合后台需要耗时完成请求的操作,如程序接到了请求后需要比较耗时的处理这已请求,所以无法立即返回结果,这样如果采用BIO就会占用一个连接,而使用NIO后就可以将此连接转让给其他请求,直至程序处理完成返回为止。

APR

APR(Apache Portable Runtime/Apache可移植运行时),是Apache HTTP服务器的支持库。你可以简单地理解为:Tomcat将以JNI的形式调用 Apache HTTP服务器的核心动态链接库来处理文件读取或网络传输操作,从而大大地提高 Tomcat对静态文件的处理性能。
APR可以大大提升Tomcat对静态文件的处理性能,同时如果你使用了HTTPS方式传输的话,也可以提升SSL的处理性能。

修改方式

//BIO protocol="HTTP/1.1" //NIO protocol="org.apache.coyote.http11.Http11NioProtocol" //NIO2 protocol="org.apache.coyote.http11.Http11Nio2Protocol" //APR protocol="org.apache.coyote.http11.Http11AprProtocol"

② maxThreads

连接器创建处理请求线程的最大数目,处理同时请求的最大数目,默认值为200。

如果一个执行器与此连接器关联,则忽略此属性,因为该属性将被忽略,所以该连接器将使用执行器而不是一个内部线程池来执行任务。maxThreads是一个重要的配置属性,maxThreads配置的合理直接影响了Tomcat的相关性能。maxThreads并不是配置的越大越好,事实上你即使配置成999999也是没有用的,因为这个最大值是受操作系统及相关硬件所制约的,并且最大值并不一定是最优值,所以我们追寻的应该是最优值而不是最大值。

③ minSpareThreads

线程的最小运行数目,这些始终保持运行(空闲线程)。如果未指定,默认值为10。

④ acceptCount

最大队列长度,一般与maxThreads相同,默认为100。

当所有可能的请求处理线程都在使用时,传入连接请求的最大队列长度。如果未指定,默认值为100。一般是设置的跟 maxThreads一样或一半,此值设置的过大会导致排队的请求超时而未被处理。所以这个值应该是主要根据应用的访问峰值与平均值来权衡配置。

⑤ maxConnections

在任何给定的时间内,服务器将接受和处理的最大连接数。当这个数字已经达到时,服务器将接受但不处理,等待进一步连接(进入队列排队)。NIO与NIO2的默认值为10000,APR默认值为8192。

⑥ connectionTimeout

当请求已经被接受,但未被处理,也就是等待中的超时时间。单位为毫秒,默认值为60000。通常情况下设置为30000。

⑦ maxHttpHeaderSize

请求和响应的HTTP头的最大大小,以字节为单位指定。如果没有指定,这个属性被设置为8192(8 KB)。

⑧ tcpNoDelay

如果为true,服务器socket会设置TCP_NO_DELAY选项,在大多数情况下可以提高性能。缺省情况下设为true。

⑨ compression

是否启用gzip压缩,默认为关闭状态。这个参数的可接受值为“off”(不使用压缩),“on”(压缩文本数据),“force”(在所有的情况下强制压缩)。

⑩ compressionMinSize

如果compression=“on”,则启用此项。被压缩前数据的最小值,也就是超过这个值后才被压缩。如果没有指定,这个属性默认为“2048”(2K),单位为byte。

①① disableUploadTimeout

这个标志允许servlet Container在一个servlet执行的时候,使用一个不同的,更长的连接超时。最终的结果是给servlet更长的时间以便完成其执行,或者在数据上传的时候更长的超时时间。如果没有指定,设为false。

①② enableLookups

关闭DNS反向查询。

三、Tomcat IO优化

① 同步阻塞IO(JAVA BIO)

同步并阻塞,服务器实现模式为一个连接一个线程(one connection one thread 想想都觉得恐怖,线程可是非常宝贵的资源),当然可以通过线程池机制改善。

② JAVA NIO 又分为同步非阻塞IO,异步阻塞IO

与BIO最大的区别one request one thread。可以复用同一个线程处理多个connection(多路复用)。

③ 异步非阻塞IO(Java NIO2又叫AIO)

主要与NIO的区别主要是操作系统的底层区别,可以做个比喻:比作快递,NIO就是网购后要自己到官网查下快递是否已经到了(可能是多次),然后自己去取快递。AIO就是快递员送货上门了(不用关注快递进度)。

BIO方式适用于连接数目比较小且固定的架构,这种方式对服务器资源要求比较高,并发局限于应用中,JDK1.4以前的唯一选择,但程序直观简单易理解。

NIO方式适用于连接数目多且连接比较短(轻操作)的架构,比如聊天服务器,并发局限于应用中,编程比较复杂,JDK1.4开始支持。

AIO方式使用于连接数目多且连接比较长(重操作)的架构,比如相册服务器,充分调用OS参与并发操作,编程比较复杂,JDK7开始支持。

在server.xml中:

<Connector port="80" protocol="org.apache.coyote.http11.Http11NioProtocol" connectionTimeout="20000" URIEncoding="UTF-8" useBodyEncodingForURI="true" enableLookups="false" redirectPort="8443" />

实现对Tomcat的IO切换。

四、大杀器APR

APR是从操作系统级别来解决异步的IO问题,大幅度的提高性能. (apr.apache.org/)。%E3%80%82)

APR(Apache Portable Runtime)是一个高可移植库,它是Apache HTTP Server 2.x 的核心,能更好地和其它本地web技术集成,总体上让Java更有效率作为一个高性能web服务器平台而不是简单作为后台容器;

在产品环境中,特别是直接使用Tomcat做WEB服务器的时候,应该使用Tomcat Native来提高其性能,如果不配APR,基本上300个线程狠快就会用满,以后的请求就只好等待.但是配上APR之后,并发的线程数量明显下降,从原来的300可能会马上下降到只有几十,新的请求会毫无阻塞的进来;

在局域网环境测,就算是400个并发,也是一瞬间就处理/传输完毕,但是在真实的Internet环境下,页面处理时间只占0.1%都不到,绝大部分时间都用来页面传输,如果不用APR,一个线程同一时间只能处理一个用户,势必会造成阻塞,所以生产环境下用apr是非常必要的。

安装Apache Tomcat Native Library,直接启动就支持apr(tomcat.apache.org/native-doc/)

它本身是基于APR的,排除代码问题Tomcat优化到这个层次,可以应对大部分性能需求。

最后,优化的前提条件是良好的代码质量和设计。

总结

我这边整理了一份SpringBoot相关资料文档、Spring系列全家桶、Java的系统化资料:(包括Java核心知识点、面试专题和20年最新的互联网真题、电子书等)有需要的朋友可以关注公众号【程序媛小琬】即可获取。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java性能优化(二)——JFR(Java Flight R

发表于 2021-05-07

概念

JFR(Java Flight Recorder,下文简称JFR)数据是JVM的历史事件,用来诊断JVM的历史性能和操作。是一种监控工具,可以在Java应用程序执行期间收集有关JVM事件的信息。JFR会开启一组事件,当有对应的事件发生时,就会保留相应的数据到文件中或者内存中(假如有开启缓存池),JMC可以显示这些事件——实时从JVM获取或者从文件中获取,JMC可以展示详细的JFR记录的数据。JFR对于被监控的应用程序来说,默认设置的性能开销很低:程序性能的1%以下,但是随着开始的事件或者记录线程增多,性能开销也会随之增多。

JFR有两种主要的概念:事件和数据流

事件

JFR在Java应用运行时收集对应发生的事件,主要有三种类型的事件提供给JFR收集:

  • 即时事件:一旦事件发生会立即进行数据记录
  • 持续事件:如果持续时间超过指定阈值则进行数据记录
  • 简单事件:用于记录应用所在系统的活跃指标(例如CPU,内存等)

数据流

JFR收集的事件包含大量数据,将这些数据保存在filename.jfr中,众所周知,磁盘I/O操作非常昂贵。因此,在将数据块刷新到磁盘之前,JFR使用各种缓存来存储收集的数据。因为加入缓存的原因,在某些情况下,JFR的数据存在丢失的可能性。如果发生丢失数据的情况,JFR会尝试通知输出文件,丢失了一部分的信息。

前期准备

需要下载安装对应的Java Mission Control,自行下载即可,我本地安装了Java Mission Control 8(文章发布时的最新版),JMC可以同JFR结合使用,JMC可以将JFR生成的XXX.jfr文件可视化展示出来。数据展示非常丰富。在文章实战后面会详细解释。

使用

使用配置参数开启:

1
2
shell复制代码# 加上以下的这两个参数即可开启对应的JFR功能
java -XX:+UnlockCommercialFeatures -XX:+FlightRecorder

在Tomcat中配置对应的参数不生效,也不知道是什么原因,如果有解决的小伙伴可以在评论区

使用命令行开启

使用jcmd命令行解锁JFR功能权限。

1
shell复制代码jcmd process_id VM.unlock_commercial_features 解锁JFR记录功能权限

使用jcmd命令行开启一个记录线程,duration记录的时间段,默认为0s,代表无限制,以下代码使用200s,表示记录200s结束。filename表示保存的文件名。

1
shell复制代码jcmd激活 jcmd process_id JFR.start duration=100s filename=flight.jfr (JDK 11版本前需要先激活对应的功能)

命令行详解

jcmd命令中包含了操作JFR的所有操作,现在对操作以及参数进行详细解释。

首先,假如你不了解一些命令行的使用规则,你可以使用jcmd help命令去了解对应命令行的使用解释。例如,查看一个JFR.check的命令行使用方式,指定对应的进程ID(本文中使用5361),使用如下命令行:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
shell复制代码jcmd 5361 help JFR.check

# 命令行返回值
5361:
JFR.check
Checks running JFR recording(s)

Impact: Low

Permission: java.lang.management.ManagementPermission(monitor)

Syntax : JFR.check [options]

Options: (options must be specified using the <key> or <key>=<value> syntax)
name : [optional] Recording name, e.g. \"My Recording\" or omit to see all recordings (STRING, no default value)
recording : [optional] Recording number, or omit to see all recordings (JLONG, -1)
verbose : [optional] Print event settings for the recording(s) (BOOLEAN, false)

通过英文的意思,我们可以了解到各个参数的使用方式,以及对应含义和注意事项。

下面我们进入正题,与JFR关联的jcmd命令有以下四种:

  • JFR.start——启动一个新的JFR记录线程
  • JFR.check——检查正在运行的JFR记录线程
  • JFR.stop——停止一个指定的JFR记录线程
  • JFR.dump——拷贝一个指定的JFR记录线程的内容进入文件中
    每个命令行都有对应的参数,现在一一介绍对应的参数详解

JFR.start

参数 说明 值类型 默认值
name 记录线程的名字 String 无
settings 服务端模版 String 无
defaultrecording 开始默认记录 Boolean False
delay 开始记录的延迟时间 Time 0s
duration 记录的时长 Time 0s(表示永远,不中断)
filename 记录的名称 String
compress 使用GZip压缩记录的结果文件 Boolean False
maxage 缓冲区数据的最长使用期限 Time 0s代表没有时间期限制
maxsize 缓存容量的最大数量 Long 0代表没有最大大小

JFR.check

参数 说明 值类型 默认值
name 记录线程的名字 String 无
recording 记录线程的ID值 Long 1
verbose 是否打印详细数据信息 Boolean False

JFR.stop

参数 说明 值类型 默认值
name 记录线程的名字 String 无
recording 记录线程的ID值 Long 1
discard 抛弃记录数据 Boolean 无
copy_to_file 拷贝记录数据到文件 String 无
compress_copy GZip压缩“copy_to_file”的文件 Boolean False

JFR.dump

参数 说明 值类型 默认值
name 记录线程的名字 String 无
recording 记录线程的ID值 Long 1
copy_to_file 拷贝记录数据到文件 String 无
compress_copy GZip压缩“copy_to_file”的文件 Boolean False

实战

介绍完命令行的使用方式以及参数详解,现在我们就来进行实战使用JFR。这里有一个注意点就是,虽然JFR被设计为在JVM和应用程序的性能的影响会小,但是最好设置持续时间(duration),缓冲区数据的最长使用期限(maxage),限制收集的最大数据量(maxsize)。

首先定义个一个内存泄露的主程序,具体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public static void main(String[] args) {
List<Object> items = new ArrayList<>(1);
try {
while (true){
items.add(new Object());
}
} catch (OutOfMemoryError e){
System.out.println(e.getMessage());
}
assert items.size() > 0;
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
System.out.println(e.getMessage());
}
}

使用启动命令:

1
2
java复制代码java -XX:+UnlockCommercialFeatures -XX:+FlightRecorder 
-XX:StartFlightRecording=duration=200s,filename=flight.jfr

当项目启动完成,并发生 OutOfMemoryError 异常后,可以在目录下会发现一个名为flight.jfr的文件,将该文件拖至JDK Mission Control中,会自动进行分析。下图展示JDK Mission Control分析结果:
image.png
现在我们来具体分析这张图的结果:

  • 图片右侧红色的分数表示,该次分析的重点关注的部分,展开后都会进行详细的描述造成原因,例如第一点Application Halts表示由于GC造成应用程序停顿时间过长,比例偏高。如果实在不想看,复制贴贴放进翻译软件中,也会明白大致的意思。
  • 图片的左侧的侧边栏表示每个分析块的详细记录,有内存,有GC,有I/O等等,有红色小感叹号的是重点需要查看的部分。例如我们查看内存这一块

image.png
从图中可以得知,应用程序内存的使用,在5s内迅速跑满,然后触发GC操作,是什么方法导致内存使用如此迅速呢?根据Method Profiling的分析结果,可以看出时ArrayList的拷贝操作导致,如下图:

image.png

由以上操作我们可以快速定位到内存溢出的方法块,其他块的使用大家可以自行进行分析的时候查看,都是可视化的界面非常方便。使用JFR的时候最后不要定义太大的时间块,或者需要切分小块的时间块进行分析,因为大的时间块,会导致JFR文件十分巨大,本地进行分析的时候,也会产生卡顿或者电脑内存不够,导致本地机子卡死,无法分析。

参考资料

Monitoring Java Applications with Flight Recorder

Java性能权威指南

JDK Mission Control

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

据说 99% 的人不知道 vue-devtools 还能直接

发表于 2021-05-07
  1. 前言

大家好,我是若川。最近组织了源码共读活动,感兴趣的可以加我微信 ruochuan12 参与,或者关注我的公众号若川视野,回复“源码”参与。已进行三个月,大家一起交流学习,共同进步,很多人都表示收获颇丰。

想学源码,极力推荐之前我写的《学习源码整体架构系列》 包含jQuery、underscore、lodash、vuex、sentry、axios、redux、koa、vue-devtools、vuex4、koa-compose、vue 3.2 发布、vue-this、create-vue、玩具vite等10余篇源码文章。

本文仓库地址:git clone https://github.com/lxchuan12/open-in-editor.git,本文最佳阅读方式,克隆仓库自己动手调试,容易吸收消化。

要是有人说到怎么读源码,正在读文章的你能推荐我的源码系列文章,那真是无以为报啊。

我的文章尽量写得让想看源码又不知道怎么看的读者能看懂。我都是推荐使用搭建环境断点调试源码学习,哪里不会点哪里,边调试边看,而不是硬看。正所谓:授人与鱼不如授人予渔。

阅读本文后你将学到:

  1. 如何解决该功能报错问题
  2. 如何调试学习源码
  3. launch-editor-middleware、launch-editor 等实现原理

1.1 短时间找不到页面对应源文件的场景

不知道你们有没有碰到这样的场景,打开你自己(或者你同事)开发的页面,却短时间难以找到对应的源文件。

这时你可能会想要是能有点击页面按钮自动用编辑器打开对应文件的功能,那该多好啊。

而vue-devtools提供了这样的功能,也许你不知道。我觉得很大一部分人都不知道,因为感觉很多人都不常用vue-devtools。

open-in-editor

你也许会问,我不用vue,我用react有没有类似功能啊,有啊,请看react-dev-inspector。你可能还会问,支持哪些编辑器呀,主流的 vscode、webstorm、atom、sublime 等都支持,更多可以看这个列表 Supported editors。

本文就是根据学习尤大写的 launch-editor 源码,本着知其然,知其所以然的宗旨,探究 vue-devtools「在编辑器中打开组件」功能实现原理。

1.2 一句话简述其原理

1
sh复制代码code path/to/file

一句话简述原理:利用nodejs中的child_process,执行了类似code path/to/file命令,于是对应编辑器就打开了相应的文件,而对应的编辑器则是通过在进程中执行ps x(Window则用Get-Process)命令来查找的,当然也可以自己指定编辑器。

1.3 打开编辑器无法打开组件的报错解决方法

而你真正用这个功能时,你可能碰到报错,说不能打开这个文件。

1
2
3
sh复制代码Could not open App.vue in the editor.

To specify an editor, specify the EDITOR env variable or add "editor" field to your Vue project config.

控制台不能打开编辑器的错误提示

这里说明下写这篇文章时用的是 Windows 电脑,VSCode 编辑器,在Ubuntu子系统下使用的终端工具。同时推荐我的文章使用 ohmyzsh 打造 windows、ubuntu、mac 系统高效终端命令行工具,用过的都说好。

解决办法也简单,就是这句英文的意思。

1.3.1 方法一:先确保在终端能用命令打开你使用的编辑器,文中以VSCode为例

如果你的命令行本身就不能运行code等命令打开编辑器,那肯定是报错的。这时需要把VSCode注入到命令行终端中。
注入方法也简单。我的交流群里有小伙伴提供了mac电脑的截图。Windows 用户安装VSCode后,终端默认支持 code 命令无需此操作。

mac 电脑在 VSCode command + shift + p,Windows 则是 ctrl + shift + p。然后输入shell,选择安装code。如下图:

Install 'code' command in PATH

这样就能在终端中打开VSCode的了。

如果能在终端打开使用命令编辑器能打开,但实际上还是报错,那么大概率是没有识别到你的编辑器。
那么可以通过方法二设置指定编辑器。

1.3.2 方法二:具体说明编辑器,在环境变量中说明指定编辑器

在vue项目的根目录下,对应本文则是:vue3-project,添加.env.development文件,其内容是EDITOR=code。这里重点说明下,我的 vue-cli 版本是4.5.12,好像在vue-cli 3.5及以上版本才支持自定义EDITOR这样的环境变量。

1
2
3
sh复制代码# .env.development
# 当然,我的命令行终端已经有了code这个命令。
EDITOR=code

不用指定编辑器的对应路径(c/Users/lxchu/AppData/Local/Programs/Microsoft VS Code/bin/code),因为会报错。为什么会报错,因为我看了源码且试过。因为会被根据空格截断,变成c/Users/lxchu/AppData/Local/Programs/Microsoft,当然就报错了。

也有可能你的编辑器路径有中文路径导致报错,可以在环境变量中添加你的编辑器路径。

如果你通过以上方法,还没解决报错问题。欢迎留言,或者加我微信 ruochuan12 交流。毕竟电脑环境不一,很难保证所有人都能正常执行,但我们知道了其原理,就很容易解决问题。

接下来我们从源码角度探究「在编辑器中打开组件」功能的实现原理。

  1. vue-devtools Open component in editor 文档

探究原理之前,先来看看vue-devtools官方文档。

vuejs/vue-devtools
文档

Open component in editor

To enable this feature, follow this guide.

这篇指南中写了在Vue CLI 3中是开箱即用。

1
sh复制代码Vue CLI 3 supports this feature out-of-the-box when running vue-cli-service serve.

也详细写了如何在Webpack下使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
sh复制代码# 1. Import the package:
var openInEditor = require('launch-editor-middleware')
# 2. In the devServer option, register the /__open-in-editor HTTP route:
devServer: {
before (app) {
app.use('/__open-in-editor', openInEditor())
}
}
# 3. The editor to launch is guessed. You can also specify the editor app with the editor option. See the supported editors list.
# 用哪个编辑器打开会自动猜测。你也可以具体指明编辑器。这里显示更多的支持编辑器列表
openInEditor('code')
# 4. You can now click on the name of the component in the Component inspector pane (if the devtools knows about its file source, a tooltip will appear).
# 如果`vue-devtools`开发者工具有提示点击的组件的显示具体路径,那么你可以在编辑器打开。

同时也写了如何在Node.js中使用等。

Node.js

You can use the launch-editor package to setup an HTTP route with the /__open-in-editor path. It will receive file as an URL variable.

查看更多可以看这篇指南。

  1. 环境准备工作

熟悉我的读者,都知道我都是推荐调试看源码的,正所谓:哪里不会点哪里。而且调试一般都写得很详细,是希望能帮助到一部分人知道如何看源码。于是我特意新建一个仓库open-in-editor git clone https://github.com/lxchuan12/open-in-editor.git,便于大家克隆学习。

安装vue-cli

1
2
3
sh复制代码npm install -g @vue/cli
# OR
yarn global add @vue/cli
1
2
3
4
5
6
7
8
9
10
sh复制代码node -V
# v14.16.0
vue -V
# @vue/cli 4.5.12
vue create vue3-project
# 这里选择的是vue3、vue2也是一样的。
# Please pick a preset: Default (Vue 3 Preview) ([Vue 3] babel, eslint)
npm install
# OR
yarn install

这里同时说明下我的vscode版本。

1
2
sh复制代码code -v
1.55.2

前文提到的Vue CLI 3中开箱即用和Webpack使用方法。

vue3-project/package.json中有一个debug按钮。

debug示意图

选择第一项,serve vue-cli-service serve。

我们来搜索下'launch-editor-middleware'这个中间件,一般来说搜索不到node_modules下的文件,需要设置下。当然也有个简单做法。就是「排除的文件」右侧旁边有个设置图标「使用“排查设置”与“忽略文件”」,点击下。

其他的就不赘述了。可以看这篇知乎回答:vscode怎么设置可以搜索包含node_modules中的文件?

这时就搜到了vue3-project/node_modules/@vue/cli-service/lib/commands/serve.js中有使用这个中间件。

如下图所示:

search.png

  1. vue-devtools 开箱即用具体源码实现

接着我们来看Vue CLI 3中开箱即用具体源码实现。

1
2
3
4
5
6
7
8
9
10
11
12
13
js复制代码// vue3-project/node_modules/@vue/cli-service/lib/commands/serve.js
// 46行
const launchEditorMiddleware = require('launch-editor-middleware')
// 192行
before (app, server) {
// launch editor support.
// this works with vue-devtools & @vue/cli-overlay
app.use('/__open-in-editor', launchEditorMiddleware(() => console.log(
`To specify an editor, specify the EDITOR env variable or ` +
`add "editor" field to your Vue project config.\n`
)))
// 省略若干代码...
}

本文项目使用的是Vue3,如果无法访问谷歌应用商店,如果未安装 vue3 对应的 vue-devtools,可以点此下载安装Vue3对应版本的vue-devtools

若平时项目开发使用的是vue2,可以点此下载安装vue2版本的vue-devtools

点击vue-devtools中的图中 open in editor 按钮时,会有一个请求,http://localhost:8080/__open-in-editor?file=src/App.vue,不出意外就会打开该组件啦。

open src/App.vue in editor

接着我们在launchEditorMiddleware的具体实现。

  1. launch-editor-middleware

看源码时,先看调试截图。

debug-launch

在launch-editor-middleware中间件中作用在于最终是调用 launch-editor 打开文件。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
js复制代码// vue3-project/node_modules/launch-editor-middleware/index.js
const url = require('url')
const path = require('path')
const launch = require('launch-editor')

module.exports = (specifiedEditor, srcRoot, onErrorCallback) => {
// specifiedEditor => 这里传递过来的则是 () => console.log() 函数
// 所以和 onErrorCallback 切换下,把它赋值给错误回调函数
if (typeof specifiedEditor === 'function') {
onErrorCallback = specifiedEditor
specifiedEditor = undefined
}

// 如果第二个参数是函数,同样把它赋值给错误回调函数
// 这里传递过来的是undefined
if (typeof srcRoot === 'function') {
onErrorCallback = srcRoot
srcRoot = undefined
}

// srcRoot 是传递过来的参数,或者当前node进程的目录
srcRoot = srcRoot || process.cwd()

// 最后返回一个函数, express 中间件
return function launchEditorMiddleware (req, res, next) {
// 省略 ...
}
}

上一段中,这种切换参数的写法,在很多源码中都很常见。为的是方便用户调用时传参。虽然是多个参数,但可以传一个或者两个。

可以根据情况打上断点。比如这里我会在launch(path.resolve(srcRoot, file), specifiedEditor, onErrorCallback)打断点。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
js复制代码// vue3-project/node_modules/launch-editor-middleware/index.js
module.exports = (specifiedEditor, srcRoot, onErrorCallback) => {
// 省略上半部分
return function launchEditorMiddleware (req, res, next) {
// 根据请求解析出file路径
const { file } = url.parse(req.url, true).query || {}
// 如果没有文件路径,则报错
if (!file) {
res.statusCode = 500
res.end(`launch-editor-middleware: required query param "file" is missing.`)
} else {
// 否则拼接路径,用launch打开。
launch(path.resolve(srcRoot, file), specifiedEditor, onErrorCallback)
res.end()
}
}
}
  1. launch-editor

跟着断点来看,走到了launchEditor函数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
js复制代码// vue3-project/node_modules/launch-editor/index.js
function launchEditor (file, specifiedEditor, onErrorCallback) {
// 解析出文件路径和行号列号等信息
const parsed = parseFile(file)
let { fileName } = parsed
const { lineNumber, columnNumber } = parsed

// 判断文件是否存在,不存在,直接返回。
if (!fs.existsSync(fileName)) {
return
}
// 所以和 onErrorCallback 切换下,把它赋值给错误回调函数
if (typeof specifiedEditor === 'function') {
onErrorCallback = specifiedEditor
specifiedEditor = undefined
}
// 包裹一层函数
onErrorCallback = wrapErrorCallback(onErrorCallback)

// 猜测当前进程运行的是哪个编辑器
const [editor, ...args] = guessEditor(specifiedEditor)
if (!editor) {
onErrorCallback(fileName, null)
return
}
// 省略剩余部分,后文再讲述...
}

6.1 wrapErrorCallback 包裹错误函数回调

1
js复制代码onErrorCallback = wrapErrorCallback(onErrorCallback)

这段的代码,就是传递错误回调函数,wrapErrorCallback 返回给一个新的函数,wrapErrorCallback 执行时,再去执行 onErrorCallback(cb)。

我相信读者朋友能看懂,我单独拿出来讲述,主要是因为这种包裹函数的形式在很多源码里都很常见。

这里也就是文章开头终端错误图Could not open App.vue in the editor.输出的代码位置。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
js复制代码// vue3-project/node_modules/launch-editor/index.js
function wrapErrorCallback (cb) {
return (fileName, errorMessage) => {
console.log()
console.log(
chalk.red('Could not open ' + path.basename(fileName) + ' in the editor.')
)
if (errorMessage) {
if (errorMessage[errorMessage.length - 1] !== '.') {
errorMessage += '.'
}
console.log(
chalk.red('The editor process exited with an error: ' + errorMessage)
)
}
console.log()
if (cb) cb(fileName, errorMessage)
}
}

6.2 guessEditor 猜测当前正在使用的编辑器

这个函数主要做了如下四件事情:

  1. 如果具体指明了编辑器,则解析下返回。
  2. 找出当前进程中哪一个编辑器正在运行。macOS 和 Linux 用 ps x 命令
    windows 则用 Get-Process 命令
  3. 如果都没找到就用 process.env.VISUAL或者process.env.EDITOR。这就是为啥开头错误提示可以使用环境变量指定编辑器的原因。
  4. 最后还是没有找到就返回[null],则会报错。
1
2
3
4
5
js复制代码const [editor, ...args] = guessEditor(specifiedEditor)
if (!editor) {
onErrorCallback(fileName, null)
return
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
js复制代码// vue3-project/node_modules/launch-editor/guess.js
const shellQuote = require('shell-quote')
const childProcess = require('child_process')

module.exports = function guessEditor (specifiedEditor) {
// 如果指定了编辑器,则解析一下,这里没有传入。如果自己指定了路径。
// 比如 c/Users/lxchu/AppData/Local/Programs/Microsoft VS Code/bin/code
// 会根据空格切割成 c/Users/lxchu/AppData/Local/Programs/Microsoft
if (specifiedEditor) {
return shellQuote.parse(specifiedEditor)
}
// We can find out which editor is currently running by:
// `ps x` on macOS and Linux
// `Get-Process` on Windows
try {
// 代码有删减
if (process.platform === 'darwin') {
const output = childProcess.execSync('ps x').toString()
// 省略
} else if (process.platform === 'win32') {
const output = childProcess
.execSync('powershell -Command "Get-Process | Select-Object Path"', {
stdio: ['pipe', 'pipe', 'ignore']
})
.toString()
// 省略
} else if (process.platform === 'linux') {
const output = childProcess
.execSync('ps x --no-heading -o comm --sort=comm')
.toString()
}
} catch (error) {
// Ignore...
}

// Last resort, use old skool env vars
if (process.env.VISUAL) {
return [process.env.VISUAL]
} else if (process.env.EDITOR) {
return [process.env.EDITOR]
}

return [null]
}

看完了 guessEditor 函数,我们接着来看 launch-editor 剩余部分。

6.3 launch-editor 剩余部分

以下这段代码不用细看,调试的时候细看就行。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
js复制代码// vue3-project/node_modules/launch-editor/index.js
function launchEditor(){
// 省略上部分...
if (
process.platform === 'linux' &&
fileName.startsWith('/mnt/') &&
/Microsoft/i.test(os.release())
) {
// Assume WSL / "Bash on Ubuntu on Windows" is being used, and
// that the file exists on the Windows file system.
// `os.release()` is "4.4.0-43-Microsoft" in the current release
// build of WSL, see: https://github.com/Microsoft/BashOnWindows/issues/423#issuecomment-221627364
// When a Windows editor is specified, interop functionality can
// handle the path translation, but only if a relative path is used.
fileName = path.relative('', fileName)
}

if (lineNumber) {
const extraArgs = getArgumentsForPosition(editor, fileName, lineNumber, columnNumber)
args.push.apply(args, extraArgs)
} else {
args.push(fileName)
}

if (_childProcess && isTerminalEditor(editor)) {
// There's an existing editor process already and it's attached
// to the terminal, so go kill it. Otherwise two separate editor
// instances attach to the stdin/stdout which gets confusing.
_childProcess.kill('SIGKILL')
}

if (process.platform === 'win32') {
// On Windows, launch the editor in a shell because spawn can only
// launch .exe files.
_childProcess = childProcess.spawn(
'cmd.exe',
['/C', editor].concat(args),
{ stdio: 'inherit' }
)
} else {
_childProcess = childProcess.spawn(editor, args, { stdio: 'inherit' })
}
_childProcess.on('exit', function (errorCode) {
_childProcess = null

if (errorCode) {
onErrorCallback(fileName, '(code ' + errorCode + ')')
}
})

_childProcess.on('error', function (error) {
onErrorCallback(fileName, error.message)
})
}

这一大段中,主要的就是以下代码,用子进程模块。简单来说子进程模块有着执行命令的能力。

1
2
3
4
5
6
7
8
9
10
11
12
13
js复制代码const childProcess = require('child_process')

if (process.platform === 'win32') {
// On Windows, launch the editor in a shell because spawn can only
// launch .exe files.
_childProcess = childProcess.spawn(
'cmd.exe',
['/C', editor].concat(args),
{ stdio: 'inherit' }
)
} else {
_childProcess = childProcess.spawn(editor, args, { stdio: 'inherit' })
}

行文至此,就基本接近尾声了。原理其实就是利用nodejs中的child_process,执行了类似code path/to/file命令。

  1. 总结

这里总结一下:首先文章开头通过提出「短时间找不到页面对应源文件的场景」,并针对容易碰到的报错情况给出了解决方案。
其次,配置了环境跟着调试学习了vue-devtools中使用的尤大写的 yyx990803/launch-editor。

7.1 一句话简述其原理

我们回顾下开头的原理内容。

1
sh复制代码code path/to/file

一句话简述原理:利用nodejs中的child_process,执行了类似code path/to/file命令,于是对应编辑器就打开了相应的文件,而对应的编辑器则是通过在进程中执行ps x(Window则用Get-Process)命令来查找的,当然也可以自己指定编辑器。

最后还能做什么呢。

可以再看看 umijs/launch-editor 和 react-dev-utils/launchEditor.js 。他们的代码几乎类似。

也可以利用Node.js做一些提高开发效率等工作,同时可以学习child_process等模块。

也不要禁锢自己的思维,把前端禁锢在页面中,应该把视野拓宽。

Node.js是我们前端人探索操作文件、操作网络等的好工具。

如果读者朋友发现有不妥或可改善之处,再或者哪里没写明白的地方,欢迎评论指出。另外觉得写得不错,对您有些许帮助,可以点赞、评论、转发分享,也是对我的一种支持,万分感谢。如果能关注我的前端公众号:「若川视野」,就更好啦。

最后可以持续关注我@若川。欢迎加我微信 ruochuan12 交流,参与 源码共读 活动,大家一起学习源码,共同进步。

参考链接

yyx990803/launch-editor

umijs/launch-editor

vuejs/vue-devtools

vue-devtools open-in-editor.md

“Open in editor” button doesn’t work in Win 10 with VSCode if installation path contains spaces

react-dev-utils/launchEditor.js


关于 && 交流群

最近组织了源码共读活动,感兴趣的可以加我微信 ruochuan12 参与,长期交流学习。

作者:常以若川为名混迹于江湖。欢迎加我微信ruochuan12。前端路上 | 所知甚少,唯善学。

关注公众号若川视野,每周一起学源码,学会看源码,进阶高级前端。

若川的博客

segmentfault若川视野专栏,开通了若川视野专栏,欢迎关注~

掘金专栏,欢迎关注~

知乎若川视野专栏,开通了若川视野专栏,欢迎关注~

github blog,求个star^_^~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ThreadLocal使用与原理

发表于 2021-05-07

在处理多线程并发安全的方法中,最常用的方法,就是使用锁,通过锁来控制多个不同线程对临界区的访问。

但是,无论是什么样的锁,乐观锁或者悲观锁,都会在并发冲突的时候对性能产生一定的影响。

那有没有一种方法,可以彻底避免竞争呢?

答案是肯定的,这就是ThreadLocal。

从字面意思上看,ThreadLocal可以解释成线程的局部变量,也就是说一个ThreadLocal的变量只有当前自身线程可以访问,别的线程都访问不了,那么自然就避免了线程竞争。

因此,ThreadLocal提供了一种与众不同的线程安全方式,它不是在发生线程冲突时想办法解决冲突,而是彻底的避免了冲突的发生。

ThreadLocal的基本使用

创建一个ThreadLocal对象:

1
java复制代码private ThreadLocal<Integer> localInt = new ThreadLocal<>();

上述代码创建一个localInt变量,由于ThreadLocal是一个泛型类,这里指定了localInt的类型为整数。

下面展示了如果设置和获取这个变量的值:

1
2
3
4
java复制代码public int setAndGet(){
localInt.set(8);
return localInt.get();
}

上述代码设置变量的值为8,接着取得这个值。

由于ThreadLocal里设置的值,只有当前线程自己看得见,这意味着你不可能通过其他线程为它初始化值。为了弥补这一点,ThreadLocal提供了一个withInitial()方法统一初始化所有线程的ThreadLocal的值:

1
java复制代码private ThreadLocal<Integer> localInt = ThreadLocal.withInitial(() -> 6);

上述代码将ThreadLocal的初始值设置为6,这对全体线程都是可见的。

ThreadLocal的实现原理

ThreadLocal变量只在单个线程内可见,那它是如何做到的呢?我们先从最基本的get()方法说起:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public T get() {
//获得当前线程
Thread t = Thread.currentThread();
//每个线程 都有一个自己的ThreadLocalMap,
//ThreadLocalMap里就保存着所有的ThreadLocal变量
ThreadLocalMap map = getMap(t);
if (map != null) {
//ThreadLocalMap的key就是当前ThreadLocal对象实例,
//多个ThreadLocal变量都是放在这个map中的
ThreadLocalMap.Entry e = map.getEntry(this);
if (e != null) {
@SuppressWarnings("unchecked")
//从map里取出来的值就是我们需要的这个ThreadLocal变量
T result = (T)e.value;
return result;
}
}
// 如果map没有初始化,那么在这里初始化一下
return setInitialValue();
}

可以看到,所谓的ThreadLocal变量就是保存在每个线程的map中的。这个map就是Thread对象中的threadLocals字段。如下:

1
java复制代码ThreadLocal.ThreadLocalMap threadLocals = null;

ThreadLocal.ThreadLocalMap是一个比较特殊的Map,它的每个Entry的key都是一个弱引用:

1
2
3
4
5
6
7
8
9
java复制代码static class Entry extends WeakReference<ThreadLocal<?>> {
/** The value associated with this ThreadLocal. */
Object value;
//key就是一个弱引用
Entry(ThreadLocal<?> k, Object v) {
super(k);
value = v;
}
}

这样设计的好处是,如果这个变量不再被其他对象使用时,可以自动回收这个ThreadLocal对象,避免可能的内存泄露(注意,Entry中的value,依然是强引用,如何回收,见下文分解)。

理解ThreadLocal中的内存泄漏问题

虽然ThreadLocalMap中的key是弱引用,当不存在外部强引用的时候,就会自动被回收,但是Entry中的value依然是强引用。这个value的引用链条如下:

可以看到,只有当Thread被回收时,这个value才有被回收的机会,否则,只要线程不退出,value总是会存在一个强引用。但是,要求每个Thread都会退出,是一个极其苛刻的要求,对于线程池来说,大部分线程会一直存在在系统的整个生命周期内,那样的话,就会造成value对象出现泄漏的可能。处理的方法是,在ThreadLocalMap进行set(),get(),remove()的时候,都会进行清理:

以getEntry()为例:

1
2
3
4
5
6
7
8
9
10
java复制代码private Entry getEntry(ThreadLocal<?> key) {
int i = key.threadLocalHashCode & (table.length - 1);
Entry e = table[i];
if (e != null && e.get() == key)
//如果找到key,直接返回
return e;
else
//如果找不到,就会尝试清理,如果你总是访问存在的key,那么这个清理永远不会进来
return getEntryAfterMiss(key, i, e);
}

下面是getEntryAfterMiss()的实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码private Entry getEntryAfterMiss(ThreadLocal<?> key, int i, Entry e) {
Entry[] tab = table;
int len = tab.length;

while (e != null) {
// 整个e是entry ,也就是一个弱引用
ThreadLocal<?> k = e.get();
//如果找到了,就返回
if (k == key)
return e;
if (k == null)
//如果key为null,说明弱引用已经被回收了
//那么就要在这里回收里面的value了
expungeStaleEntry(i);
else
//如果key不是要找的那个,那说明有hash冲突,这里是处理冲突,找下一个entry
i = nextIndex(i, len);
e = tab[i];
}
return null;
}

真正用来回收value的是expungeStaleEntry()方法,在remove()和set()方法中,都会直接或者间接调用到这个方法进行value的清理:

从这里可以看到,ThreadLocal为了避免内存泄露,也算是花了一番大心思。不仅使用了弱引用维护key,还会在每个操作上检查key是否被回收,进而再回收value。

但是从中也可以看到,ThreadLocal并不能100%保证不发生内存泄漏。

比如,很不幸的,你的get()方法总是访问固定几个一直存在的ThreadLocal,那么清理动作就不会执行,如果你没有机会调用set()和remove(),那么这个内存泄漏依然会发生。

因此,一个良好的习惯依然是:当你不需要这个ThreadLocal变量时,主动调用remove(),这样对整个系统是有好处的。

ThreadLocalMap中的Hash冲突处理

ThreadLocalMap作为一个HashMap和java.util.HashMap的实现是不同的。对于java.util.HashMap使用的是链表法来处理冲突:

但是,对于ThreadLocalMap,它使用的是简单的线性探测法,如果发生了元素冲突,那么就使用下一个槽位存放:

具体来说,整个set()的过程如下:

可以被继承的ThreadLocal——InheritableThreadLocal

在实际开发过程中,我们可能会遇到这么一种场景。主线程开了一个子线程,但是我们希望在子线程中可以访问主线程中的ThreadLocal对象,也就是说有些数据需要进行父子线程间的传递。比如像这样:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public static void main(String[] args) {
ThreadLocal threadLocal = new ThreadLocal();
IntStream.range(0,10).forEach(i -> {
//每个线程的序列号,希望在子线程中能够拿到
threadLocal.set(i);
//这里来了一个子线程,我们希望可以访问上面的threadLocal
new Thread(() -> {
System.out.println(Thread.currentThread().getName() + ":" + threadLocal.get());
}).start();
try {
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}

执行上述代码,你会看到:

1
2
3
4
java复制代码Thread-0:null
Thread-1:null
Thread-2:null
Thread-3:null

因为在子线程中,是没有threadLocal的。如果我们希望子线可以看到父线程的ThreadLocal,那么就可以使用InheritableThreadLocal。顾名思义,这就是一个支持线程间父子继承的ThreadLocal,将上述代码中的threadLocal使用InheritableThreadLocal:

1
java复制代码InheritableThreadLocal threadLocal = new InheritableThreadLocal();

再执行,就能看到:

1
2
3
4
5
java复制代码Thread-0:0
Thread-1:1
Thread-2:2
Thread-3:3
Thread-4:4

可以看到,每个线程都可以访问到从父进程传递过来的一个数据。虽然InheritableThreadLocal看起来挺方便的,但是依然要注意以下几点:

  1. 变量的传递是发生在线程创建的时候,如果不是新建线程,而是用了线程池里的线程,就不灵了
  2. 变量的赋值就是从主线程的map复制到子线程,它们的value是同一个对象,如果这个对象本身不是线程安全的,那么就会有线程安全问题

写在最后的话

今天,我们介绍了ThreadLocal,ThreadLocal在Java的多线程开发中有着十分重要的作用。

在这里,我们介绍了ThreadLocal的基本使用和实现原理,尤其重点介绍了基于当前实现原理下可能存在的内存泄漏问题。

最后,还介绍了一个用于在父子线程间传递数据的特殊的ThreadLocal实现,希望对大家有所帮助。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

mq的那些破事儿,你不好奇吗?

发表于 2021-05-06

大家好,我是苏三,又和大家见面了。

前言

最近mq越来越火,很多公司在用,很多人在用,其重要性不言而喻。但是如果我让你回答下面的这些问题:

  1. 我们为什么要用mq?
  2. 引入mq会多哪些问题?
  3. 如何解决这些问题?

你心中是否有答案了呢?本文将会一一为你解答,这些看似平常却很有意义的问题。

1 传统模式有哪些痛点?

1.1 痛点1

有些复杂的业务系统,一次用户请求可能会同步调用N个系统的接口,需要等待所有的接口都返回了,才能真正的获取执行结果。

这种同步接口调用的方式总耗时比较长,非常影响用户的体验,特别是在网络不稳定的情况下,极容易出现接口超时问题。

1.2 痛点2

很多复杂的业务系统,一般都会拆分成多个子系统。我们在这里以用户下单为例,请求会先通过订单系统,然后分别调用:支付系统、库存系统、积分系统 和 物流系统。系统之间耦合性太高,如果调用的任何一个子系统出现异常,整个请求都会异常,对系统的稳定性非常不利。

1.3 痛点3

有时候为了吸引用户,我们会搞一些活动,比如秒杀等。

如果用户少还好,不会影响系统的稳定性。但如果用户突增,一时间所有的请求都到数据库,可能会导致数据库无法承受这么大的压力,响应变慢或者直接挂掉。

对于这种突然出现的请求峰值,无法保证系统的稳定性。

2 为什么要用mq?

对于上面传统模式的三类问题,我们使用mq就能轻松解决。

2.1 异步

对于痛点1:同步接口调用导致响应时间长的问题,使用mq之后,将同步调用改成异步,能够显著减少系统响应时间。

系统A作为消息的生产者,在完成本职工作后,就能直接返回结果了。而无需等待消息消费者的返回,它们最终会独立完成所有的业务功能。

这样能避免总耗时比较长,从而影响用户的体验的问题。

2.2 解耦

对于痛点2:子系统间耦合性太大的问题,使用mq之后,我们只需要依赖于mq,避免了各个子系统间的强依赖问题。订单系统作为消息生产者,保证它自己没有异常即可,不会受到支付系统等业务子系统的异常影响,并且各个消费者业务子系统之间,也互不影响。

这样就把之前复杂的业务子系统的依赖关系,转换为只依赖于mq的简单依赖,从而显著的降低了系统间的耦合度。

2.3 消峰

对于痛点3:由于突然出现的请求峰值,导致系统不稳定的问题。使用mq后,能够起到消峰的作用。

订单系统接收到用户请求之后,将请求直接发送到mq,然后订单消费者从mq中消费消息,做写库操作。如果出现请求峰值的情况,由于消费者的消费能力有限,会按照自己的节奏来消费消息,多的请求不处理,保留在mq的队列中,不会对系统的稳定性造成影响。

3 引入mq会多哪些问题?

引入mq后让我们子系统间耦合性降低了,异步处理机制减少了系统的响应时间,同时能够有效的应对请求峰值问题,提升系统的稳定性。

但是,引入mq同时也会带来一些问题。

3.1 重复消息问题

重复消费问题可以说是mq中普遍存在的问题,不管你用哪种mq都无法避免。

有哪些场景会出现重复的消息呢?

  1. 消息生产者产生了重复的消息
  2. kafka和rocketmq的offset被回调了
  3. 消息消费者确认失败
  4. 消息消费者确认时超时了
  5. 业务系统主动发起重试

如果重复消息不做正确的处理,会对业务造成很大的影响,产生重复的数据,或者导致数据异常,比如会员系统多开通了一个月的会员。

3.2 数据一致性问题

很多时候,如果mq的消费者业务处理异常的话,就会出现数据一致性问题。比如:一个完整的业务流程是,下单成功之后,送100个积分。下单写库了,但是消息消费者在送积分的时候失败了,就会造成数据不一致的情况,即该业务流程的部分数据写库了,另外一部分没有写库。

如果下单和送积分在同一个事务中,要么同时成功,要么同时失败,是不会出现数据一致性问题的。

但由于跨系统调用,为了性能考虑,一般不会使用强一致性的方案,而改成达成最终一致性即可。

3.3 消息丢失问题

同样消息丢失问题,也是mq中普遍存在的问题,不管你用哪种mq都无法避免。

有哪些场景会出现消息丢失问题呢?

  1. 消息生产者发生消息时,由于网络原因,发生到mq失败了。
  2. mq服务器持久化时,磁盘出现异常
  3. kafka和rocketmq的offset被回调时,略过了很多消息。
  4. 消息消费者刚读取消息,已经ack确认了,但业务还没处理完,服务就被重启了。

导致消息丢失问题的原因挺多的,生产者、mq服务器、消费者 都有可能产生问题,我在这里就不一一列举了。最终的结果会导致消费者无法正确的处理消息,而导致数据不一致的情况。

3.4 消息顺序问题

有些业务数据是有状态的,比如订单有:下单、支付、完成、退货等状态,如果订单数据作为消息体,就会涉及顺序问题了。

如果消费者收到同一个订单的两条消息,第一条消息的状态是下单,第二条消息的状态是支付,这是没问题的。但如果第一条消息的状态是支付,第二条消息的状态是下单就会有问题了,没有下单就先支付了?

消息顺序问题是一个非常棘手的问题,比如:

  • kafka同一个partition中能保证顺序,但是不同的partition无法保证顺序。
  • rabbitmq的同一个queue能够保证顺序,但是如果多个消费者同一个queue也会有顺序问题。

如果消费者使用多线程消费消息,也无法保证顺序。

如果消费消息时同一个订单的多条消息中,中间的一条消息出现异常情况,顺序将会被打乱。

还有如果生产者发送到mq中的路由规则,跟消费者不一样,也无法保证顺序。

3.5 消息堆积

如果消息消费者读取消息的速度,能够跟上消息生产者的节奏,那么整套mq机制就能发挥最大作用。但是很多时候,由于某些批处理,或者其他原因,导致消息消费的速度小于生产的速度。这样会直接导致消息堆积问题,从而影响业务功能。

这里以下单开通会员为例,如果消息出现堆积,会导致用户下单之后,很久之后才能变成会员,这种情况肯定会引起大量用户投诉。

3.6 系统复杂度提升

这里说的系统复杂度和系统耦合性是不一样的,比如以前只有:系统A、系统B和系统C 这三个系统,现在引入mq之后,你除了需要关注前面三个系统之外,还需要关注mq服务,需要关注的点越多,系统的复杂度越高。mq的机制需要:生产者、mq服务器、消费者。

有一定的学习成本,需要额外部署mq服务器,而且有些mq比如:rocketmq,功能非常强大,用法有点复杂,如果使用不好,会出现很多问题。有些问题,不像接口调用那么容易排查,从而导致系统的复杂度提升了。

4 如何解决这些问题?

mq是一种趋势,总体来说对我们的系统是利大于弊的,难道因为它会出现一些问题,我们就不用它了?

那么我们要如何解决这些问题呢?

4.1 重复消息问题

不管是由于生产者产生的重复消息,还是由于消费者导致的重复消息,我们都可以在消费者中这个问题。

这就要求消费者在做业务处理时,要做幂等设计,如果有不知道如何设计的朋友,可以参考《高并发下如何保证接口的幂等性?》,里面介绍得非常详情。

在这里我推荐增加一张消费消息表,来解决mq的这类问题。消费消息表中,使用messageId做唯一索引,在处理业务逻辑之前,先根据messageId查询一下该消息有没有处理过,如果已经处理过了则直接返回成功,如果没有处理过,则继续做业务处理。

4.2 数据一致性问题

我们都知道数据一致性分为:

  • 强一致性
  • 弱一致性
  • 最终一致性

而mq为了性能考虑使用的是最终一致性,那么必定会出现数据不一致的问题。这类问题大概率是因为消费者读取消息后,业务逻辑处理失败导致的,这时候可以增加重试机制。

重试分为:同步重试 和 异步重试。

有些消息量比较小的业务场景,可以采用同步重试,在消费消息时如果处理失败,立刻重试3-5次,如何还是失败,则写入到记录表中。但如果消息量比较大,则不建议使用这种方式,因为如果出现网络异常,可能会导致大量的消息不断重试,影响消息读取速度,造成消息堆积。

而消息量比较大的业务场景,建议采用异步重试,在消费者处理失败之后,立刻写入重试表,有个job专门定时重试。

还有一种做法是,如果消费失败,自己给同一个topic发一条消息,在后面的某个时间点,自己又会消费到那条消息,起到了重试的效果。如果对消息顺序要求不高的场景,可以使用这种方式。

4.3 消息丢失问题

不管你是否承认有时候消息真的会丢,即使这种概率非常小,也会对业务有影响。生产者、mq服务器、消费者都有可能会导致消息丢失的问题。

为了解决这个问题,我们可以增加一张消息发送表,当生产者发完消息之后,会往该表中写入一条数据,状态status标记为待确认。如果消费者读取消息之后,调用生产者的api更新该消息的status为已确认。有个job,每隔一段时间检查一次消息发送表,如果5分钟(这个时间可以根据实际情况来定)后还有状态是待确认的消息,则认为该消息已经丢失了,重新发条消息。

这样不管是由于生产者、mq服务器、还是消费者导致的消息丢失问题,job都会重新发消息。

4.4 消息顺序问题

消息顺序问题是我们非常常见的问题,我们以kafka消费订单消息为例。订单有:下单、支付、完成、退货等状态,这些状态是有先后顺序的,如果顺序错了会导致业务异常。

解决这类问题之前,我们先确认一下,消费者是否真的需要知道中间状态,只知道最终状态行不行?

其实很多时候,我真的需要知道的是最终状态,这时可以把流程优化一下:

这种方式可以解决大部分的消息顺序问题。

但如果真的有需要保证消息顺序的需求。订单号路由到不同的partition,同一个订单号的消息,每次到发到同一个partition。

4.5 消息堆积

如果消费者消费消息的速度小于生产者生产消息的速度,将会出现消息堆积问题。其实这类问题产生的原因很多,如果你想进一步了解,可以看看我的另一篇文章《我用kafka两年踩过的一些非比寻常的坑》。

那么消息堆积问题该如何解决呢?

这个要看消息是否需要保证顺序。

如果不需要保证顺序,可以读取消息之后用多线程处理业务逻辑。

这样就能增加业务逻辑处理速度,解决消息堆积问题。但是线程池的核心线程数和最大线程数需要合理配置,不然可能会浪费系统资源。

如果需要保证顺序,可以读取消息之后,将消息按照一定的规则分发到多个队列中,然后在队列中用单线程处理。

好了,今天先分享到这来,下期再见。我在这里只是抛砖引玉,其实mq相关的内容还有很多,比如:定时发送、延迟发送、私信队列、事务问题等等,有兴趣的朋友可以找我私聊。

最后说一句(求关注,别白嫖我)

如果这篇文章对您有所帮助,或者有所启发的话,帮忙扫描下发二维码关注一下,您的支持是我坚持写作最大的动力。

求一键三连:点赞、转发、在看。

关注公众号:【苏三说技术】,在公众号中回复:面试、代码神器、开发手册、时间管理有超赞的粉丝福利,另外回复:加群,可以跟很多BAT大厂的前辈交流和学习。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

这样设置,让你的 IDEA 好看到爆炸

发表于 2021-05-06

Hello,大家好,我是楼下小黑哥。

今天这篇文章是次条视频的文案,这里推荐大家直接看视频学习。

IDEA 这样设置,好看到爆炸!!!#01

今天这期我们来分享几个美化 IDEA 设置技巧,让你的 IDEA 与众不同。

首先我们来看下 IDEA 默认设置,虽然不丑,但就是太单调,千篇一律。

默认主题

接着,我们来看下美化以后的界面,总体看起来是不是比默认好看了很多。

好了,废话不多话,我们进入设置环节。

那今天 IDEA 所有设置,小黑哥这里都总结好,大家可以看下这张思维导图。

主题

首先我们来设置 IDEA 主题的。

那 IDEA 自带了两款主题,一款是浅色主题,一款是深色主题。

那显然这两款满足不了了我们需求,所以我们需要下载其他主题。

那 IDEA 的插件仓库其实提供很多主题插件,那有些小伙伴可能还不知道怎么去查找这些主题插件。

那这里教大家一个小技巧。

打开 IDEA Setting-Plugins,然后在搜索框输入 /tag:Theme

此时,这里搜索结果就会显示所有主题的插件,那大家根据自己的喜欢安装即可。

那这里小黑哥推荐两款主题插件:

  • Gradianto
  • Material Theme UI

Gradianto

那这款插件主要提供暗黑模式相关主题,这个主题有个好处就是比较护眼,看久了眼镜也不是很酸。

这款插件主要四种主题,分别是:

Nature Green

Nature Green

Deep OceanDeep Ocean

Dark Fuchsia

Dark Fuchsia

Midnight Blue

Midnight Blue

Material Theme UI

上面这个插件它仅仅提供几个主题,下面这款插件呢,它的功能的就很齐全,不仅能设置主题,还能设置 IDEA 其他 UI。

那安装好之后,这个插件他会弹出一个设置窗口:

这个设置比较简单,这里就不具体介绍,大家挑自己喜欢设置即可。

那这里,如果你不小心关掉了,那也没关系,我们可以从 Setting 进去,搜索 Material,就可以找到。

这个设置地方,首先我们挑选自己喜欢的主题。

这里我个人比较细喜欢 Atom One Dark,推荐大家可以试试。

那第二点,设置 Tab(标签页) 的大小,高度这些。

这里的 Tab,指的就是上方窗口的标签页

第二个就是设置标签页的高亮颜色,当这个标签页被选中的时候,下方就会显示这个高亮颜色。

第三点,设置 Compact 这个选项。

那这个选项,主要是减少 UI 的高度,设置之后看起来就可以更紧凑一点。

第三个设置选项,Components 选项。

图标

IDEA 默认自带了一下文件图标,比如这个 pom 就会显示下面这个一样。

不过很多其他类型文件就没有这个图标。

那我们可以通过下载安装 Atom Material Icons 这个插件,扩展这些文件图标。

这个插件支持很多不同类型的文件。

另外它还支持设置文件夹图标。

插件安装完成之后,进入 Setting-Atom Material Icons Setting 选项,可以看到好几个选项。

Enable File Icons 与 Enable Directory Icons 这两个选项主要设置打开上面说的文件图标。

那 Enable UI Icons ,这个设置选项是下面这一排 UI 图标。

最后,不推荐大家打开 Enable PSI Icons 这个选项,因为打开之后,Java 文件图标都会变成这样,反而不容易区分了。

字体

这里推荐两款程序员专用字体:

  • Fira Code
  • JetBrains Mono

Fira Code 下载地址为:

github.com/tonsky/Fira…

JetBrains Mono 这款字体就需要这么麻烦,新版 IDEA 自带这款字体。

为什么说是程序员专用字体呢?

那是因为这两款字体都加入一个重要功能,编程连字特性(ligatures)。

这连个字体利用这个特性对编程中的常用符号进行优化,比如把输入的「!=」直接显示成「≠」或者把「>=」变成「≥ 」等等,以此来提高代码的可读性。

以 Fira Code 为例,左边是打开编程连字特性,右边则是没有打开。

字体安装完成之后,打开 IDEA Setting-Font 设置选项,选择 Fira Code 字体或者 JetBrains Mono 字体,选中 Enable ligatures。

除此之外,上面 Material Theme UI 自带的主题,默认配置字体都偏小,这里我们可以适当调大。

背景图

IDEA 默认情况背景是纯色,比如黑色,比较单调,这里我们可以设置一下背景图。

打开 Setting-Appearance,点击 Background Image 按钮。

背景图设置窗口如下:

其他

最后,介绍最后一个插件- Rainbow Brackets。

这款插件主要作用就是用各种鲜明的颜色显示括号。如果我们有很多括号,这样就可以很容易分清楚括号配对问题。

最后

今天文章给大家介绍美化 IDEA 相关设置技巧,那如果你还有其他可以美化 IDEA 技巧,欢迎在留言区评论一下。

如果大家觉得还不错,点赞,在看,分享,一键三连支持我一下。
那如果你还没关注我的公号的话,那请点击下面小卡片快速关注一波。

另外,也可以关注一下我的 B 站账号-楼下小黑哥520。

我的频道是专注于程序员干货知识分享,
ok,那我们下期视频再见。

本篇文章由一文多发平台ArtiPub自动发布

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试官:请用五种方法实现多线程交替打印问题

发表于 2021-05-06

三个线程T1、T2、T3,如何让他们按顺序执行?

这是一道面试中常考的并发编程的代码题,与它相似的问题有:

  • 三个线程T1、T2、T3轮流打印ABC,打印n次,如ABCABCABCABC…….
  • 两个线程交替打印1-100的奇偶数
  • N个线程循环打印1-100
  • ……

其实这类问题本质上都是线程通信问题,思路基本上都是一个线程执行完毕,阻塞该线程,唤醒其他线程,按顺序执行下一个线程。下面先来看最简单的,如何按顺序执行三个线程。

synchronized+wait/notify

基本思路就是线程A、线程B、线程C三个线程同时启动,因为变量num的初始值为0,所以线程B或线程C拿到锁后,进入while()循环,然后执行wait()方法,线程线程阻塞,释放锁。只有线程A拿到锁后,不进入while()循环,执行num++,打印字符A,最后唤醒线程B和线程C。此时num值为1,只有线程B拿到锁后,不被阻塞,执行num++,打印字符B,最后唤醒线程A和线程C,后面以此类推。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
java复制代码class Wait_Notify_ACB {

private int num;
private static final Object LOCK = new Object();

private void printABC(int targetNum) {
synchronized (LOCK) {
while (num % 3 != targetNum) { //想想这里为什么不能用if代替while,想不起来可以看公众号上一篇文章
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num++;
System.out.print(Thread.currentThread().getName());
LOCK.notifyAll();
}
}

public static void main(String[] args) {
Wait_Notify_ACB wait_notify_acb = new Wait_Notify_ACB ();
new Thread(() -> {
wait_notify_acb.printABC(0);
}, "A").start();
new Thread(() -> {
wait_notify_acb.printABC(1);
}, "B").start();
new Thread(() -> {
wait_notify_acb.printABC(2);
}, "C").start();
}
}

输入结果:

1
2
java复制代码ABC
Process finished with exit code 0

接下来看看第一个问题,三个线程T1、T2、T3轮流打印ABC,打印n次。其实只需要将上述代码加一个循环即可,这里假设n=10。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码class Wait_Notify_ACB {

private int num;
private static final Object LOCK = new Object();

private void printABC(int targetNum) {
for (int i = 0; i < 10; i++) {
synchronized (LOCK) {
while (num % 3 != targetNum) { //想想这里为什么不能用if代替,想不起来可以看公众号上一篇文章
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
num++;
System.out.print(Thread.currentThread().getName());
LOCK.notifyAll();
}
}
}
public static void main(String[] args) {
Wait_Notify_ACB wait_notify_acb = new Wait_Notify_ACB ();
new Thread(() -> {
wait_notify_acb.printABC(0);
}, "A").start();
new Thread(() -> {
wait_notify_acb.printABC(1);
}, "B").start();
new Thread(() -> {
wait_notify_acb.printABC(2);
}, "C").start();
}
}

输出结果:

1
2
java复制代码ABCABCABCABCABCABCABCABCABCABC
Process finished with exit code 0

下面看第二个问题,两个线程交替打印1-100的奇偶数,为了减少输入所占篇幅,这里将100 改成了10。基本思路上面类似,线程odd先拿到锁——打印数字——唤醒线程even——阻塞线程odd,以此循环。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码class  Wait_Notify_Odd_Even{

private Object monitor = new Object();
private volatile int count;

Wait_Notify_Odd_Even(int initCount) {
this.count = initCount;
}

private void printOddEven() {
synchronized (monitor) {
while (count < 10) {
try {
System.out.print( Thread.currentThread().getName() + ":");
System.out.println(++count);
monitor.notifyAll();
monitor.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
//防止count=10后,while()循环不再执行,有子线程被阻塞未被唤醒,导致主线程不能退出
monitor.notifyAll();
}
}

public static void main(String[] args) throws InterruptedException {

Wait_Notify_Odd_Even waitNotifyOddEven = new Wait_Notify_Odd_Even(0);
new Thread(waitNotifyOddEven::printOddEven, "odd").start();
Thread.sleep(10); //为了保证线程odd先拿到锁
new Thread(waitNotifyOddEven::printOddEven, "even").start();
}
}

运行结果:

1
2
3
4
5
6
7
8
9
10
java复制代码odd:1
even:2
odd:3
even:4
odd:5
even:6
odd:7
even:8
odd:9
even:10

再看第三个问题,N个线程循环打印1-100,其实仔细想想这个和三个线程循环打印ABC并没有什么本质区别,只需要加上判断是否到了打印数字的最大值的语句即可。假设N=3,为了能把输出结果完全显示,打印1-10,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
java复制代码class Wait_Notify_100 {

private int num;
private static final Object LOCK = new Object();
private int maxnum = 10;

private void printABC(int targetNum) {
while (true) {
synchronized (LOCK) {
while (num % 3 != targetNum) { //想想这里为什么不能用if代替,想不起来可以看公众号上一篇文章
if(num >= maxnum){
break;
}
try {
LOCK.wait();
} catch (InterruptedException e) {
e.printStackTrace();
}
}
if(num >= maxnum){
break;
}
num++;
System.out.println(Thread.currentThread().getName() + ": " + num);
LOCK.notifyAll();
}
}

}

public static void main(String[] args) {
Wait_Notify_100 wait_notify_100 = new Wait_Notify_100 ();
new Thread(() -> {
wait_notify_100.printABC(0);
}, "thread1").start();
new Thread(() -> {
wait_notify_100.printABC(1);
}, "thread2").start();
new Thread(() -> {
wait_notify_100.printABC(2);
}, "thread3").start();
}
}

输出结果:

1
2
3
4
5
6
7
8
9
10
java复制代码thread1: 1
thread2: 2
thread3: 3
thread1: 4
thread2: 5
thread3: 6
thread1: 7
thread2: 8
thread3: 9
thread1: 10

面试官: 大家都是用的synchronized+wait/notify,你能不能换个方法解决该问题?

我: 好的,我还会用join()方法

下面介绍的方法只给出第一道题的代码了,否则太长了,相信大家可以举一反三

join()

join()方法:在A线程中调用了B线程的join()方法时,表示只有当B线程执行完毕时,A线程才能继续执行。基于这个原理,我们使得三个线程按顺序执行,然后循环多次即可。无论线程1、线程2、线程3哪个先执行,最后执行的顺序都是线程1——>线程2——>线程3。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码class Join_ABC {

public static void main(String[] args) throws InterruptedException {
for (int i = 0; i < 10; i++) {
Thread t1 = new Thread(new printABC(null),"A");
Thread t2 = new Thread(new printABC(t1),"B");
Thread t3 = new Thread(new printABC(t2),"C");
t0.start();
t1.start();
t2.start();
Thread.sleep(10); //这里是要保证只有t1、t2、t3为一组,进行执行才能保证t1->t2->t3的执行顺序。
}

}

static class printABC implements Runnable{
private Thread beforeThread;
public printABC(Thread beforeThread) {
this.beforeThread = beforeThread;
}
@Override
public void run() {
if(beforeThread!=null) {
try {
beforeThread.join();
System.out.print(Thread.currentThread().getName());
}catch(Exception e){
e.printStackTrace();
}
}else {
System.out.print(Thread.currentThread().getName());
}

}
}
}

输出结果:

1
java复制代码ABCABCABCABCABCABCABCABCABCABC

面试官: 还会其他方法吗?

我: 还会使用Lock解决该问题。

Lock

该方法很容易理解,不管哪个线程拿到锁,只有符合条件的才能打印。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码 class Lock_ABC {

private int num; // 当前状态值:保证三个线程之间交替打印
private Lock lock = new ReentrantLock();


private void printABC(int targetNum) {
for (int i = 0; i < 10; ) {
lock.lock();
if (num % 3 == targetNum) {
num++;
i++;
System.out.print(Thread.currentThread().getName());
}
lock.unlock();
}
}

public static void main(String[] args) {
Lock_ABC lockABC = new Lock_ABC();

new Thread(() -> {
lockABC.printABC(0);
}, "A").start();

new Thread(() -> {
lockABC.printABC(1);
}, "B").start();

new Thread(() -> {
lockABC.printABC(2);
}, "C").start();
}
}

输出结果:

1
java复制代码ABCABCABCABCABCABCABCABCABCABC

面试官: 该方法存在什么问题,可以进一步优化吗

我: 可以使用Lock+Condition实现对线程的精准唤醒,减少对同步锁的无意义竞争,浪费资源。

Lock+Condition

该思路和synchronized+wait/notify方法的很像,synchronized对应lock,await/signal方法对应wait/notify方法。下面的代码为了能精准地唤醒下一个线程,创建了多个Condition对象。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
java复制代码class LockConditionABC {

private int num;
private static Lock lock = new ReentrantLock();
private static Condition c1 = lock.newCondition();
private static Condition c2 = lock.newCondition();
private static Condition c3 = lock.newCondition();

private void printABC(int targetNum, Condition currentThread, Condition nextThread) {
for (int i = 0; i < 10; ) {
lock.lock();
try {
while (num % 3 != targetNum) {
currentThread.await(); //阻塞当前线程
}
num++;
i++;
System.out.print(Thread.currentThread().getName());
nextThread.signal(); //唤醒下一个线程,而不是唤醒所有线程
} catch (Exception e) {
e.printStackTrace();
} finally {
lock.unlock();
}
}
}

public static void main(String[] args) {
LockConditionABC print = new LockConditionABC();
new Thread(() -> {
print.printABC(0, c1, c2);
}, "A").start();
new Thread(() -> {
print.printABC(1, c2, c3);
}, "B").start();
new Thread(() -> {
print.printABC(2, c3, c1);
}, "C").start();
}
}

输出结果:

1
java复制代码ABCABCABCABCABCABCABCABCABCABC

面试官: 除了该方法,还有什么方法可以避免唤醒其他无意义的线程避免资源浪费?

我: 可以通过使用信号量来实现。

Semaphore

Semaphore:用来控制同时访问某个特定资源的操作数量,或者同时执行某个制定操作的数量。Semaphore内部维护了一个计数器,其值为可以访问的共享资源的个数。

一个线程要访问共享资源,先使用acquire()方法获得信号量,如果信号量的计数器值大于等于1,意味着有共享资源可以访问,则使其计数器值减去1,再访问共享资源。如果计数器值为0,线程进入休眠。

当某个线程使用完共享资源后,使用release()释放信号量,并将信号量内部的计数器加1,之前进入休眠的线程将被唤醒并再次试图获得信号量。

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码class SemaphoreABC {

private static Semaphore s1 = new Semaphore(1); //因为先执行线程A,所以这里设s1的计数器为1
private static Semaphore s2 = new Semaphore(0);
private static Semaphore s3 = new Semaphore(0);


private void printABC(Semaphore currentThread, Semaphore nextThread) {
for (int i = 0; i < 10; i++) {
try {
currentThread.acquire(); //阻塞当前线程,即信号量的计数器减1为0
System.out.print(Thread.currentThread().getName());
nextThread.release(); //唤醒下一个线程,即信号量的计数器加1

} catch (InterruptedException e) {
e.printStackTrace();
}
}
}

public static void main(String[] args) throws InterruptedException {
SemaphoreABC printer = new SemaphoreABC();
new Thread(() -> {
printer.printABC(s1, s2);
}, "A").start();
Thread.sleep(10);
new Thread(() -> {
printer.printABC(s2, s3);
}, "B").start();
Thread.sleep(10);
new Thread(() -> {
printer.printABC(s3, s1);
}, "C").start();
}
}

输出结果:

1
java复制代码ABCABCABCABCABCABCABCABCABCABC

面试官: 除了上述五种方法,还有其他方法吗

我: 还有LockSupport、CountDownLatch、AtomicInteger等等。

面试官: 那如何实现三个线程循环打印ACB,其中A打印两次,B打印三次,C打印四次呢?

我: ……

面试官: 如何用两个线程交叉打印数字和字符呢?例如A1B2C3……Z26

我: ……

真正的面试过程中,肯定不会让大家用这么多方法实现多线程交替打印问题,记住一两种即可,大家可以思考下后面两个升级版的问题,原理都是相通的。

微信搜索公众号路人zhang,回复面试手册,领取更多高频面试题PDF版及更多面试资料。

推荐阅读:

  • MySQL数据库高频面试题
  • 计算机网络高频面试题最新版
  • Java集合高频面试题最新版
  • 并发编程高频面试题
  • Java基础高频面试题

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ElasticSearch的JavaAPI简单使用

发表于 2021-05-05

ES是一个开源的高扩展的分布式全文搜索引擎,是整个Elastic Stack技术栈的核心。它可以近乎实时的存储、检索数据;本身扩展性很好,可以扩展到上百台服务器,处理PB级别的数据。Elasticsearch软件是由Java语言开发的,所以也可以通过Java API的方式对Elasticsearch服务进行访问,并且操作的方式基本和Http操作一致

本文已经收录到Github: github.com/chenliang15…

Maven依赖

  • pom.xml中加入ES需要的依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
xml复制代码<dependencies>
<dependency>
<groupId>org.elasticsearch</groupId>
<artifactId>elasticsearch</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch的客户端 -->
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.8.0</version>
</dependency>
<!-- elasticsearch依赖2.x的log4j -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-api</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>org.apache.logging.log4j</groupId>
<artifactId>log4j-core</artifactId>
<version>2.8.2</version>
</dependency>
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
<version>2.9.9</version>
</dependency>
<!-- junit单元测试 -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.12</version>
</dependency>
</dependencies>
  • 如果在运行时报错
1
makefile复制代码java.lang.NoSuchMethodError: org.elasticsearch.client.Request.addParameters

那么应该是发生了依赖冲突,新建一个Maven项目,最好不要直接创建springboot项目,可能会有依赖冲突,将上面的依赖加入到maven项目的pom中

链接ES客户端

因为使用的highLevel的ES客户端,所以可以很方便的连接到ES,只需要通过host和port就可以直接连接

1
2
3
4
5
6
7
java复制代码public static void main(String[] args) throws IOException {
RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);

esClient.close();
}

如果代码执行正常,没有报错那么就表示正常连接到了ES节点,可以通过获取到的RestHighLevelClient对象进行操作

索引操作

加入了maven依赖,那么就可以通过Java客户端链接到ES进行操作,首先进行索引的基本操作

  • 创建索引
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);
@Test
public void index_create() throws IOException {
// 创建索引 - 请求对象
CreateIndexRequest request = new CreateIndexRequest("test");
// 发送请求,获取响应
CreateIndexResponse response = esClient.indices().create(request, RequestOptions.DEFAULT);
boolean acknowledged = response.isAcknowledged();
// 响应状态
System.out.println("操作状态 = " + acknowledged);
}
  • 查询索引
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);

@Test
public void index_get() throws IOException {

GetIndexRequest request = new GetIndexRequest("test");

GetIndexResponse response = esClient.indices().get(request, RequestOptions.DEFAULT);

System.out.println(Arrays.toString(response.getIndices()));
System.out.println(response.getMappings());
System.out.println(response.getSettings());
}
  • 删除索引
1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码RestHighLevelClient esClient = new RestHighLevelClient(
RestClient.builder(new HttpHost("localhost", 9200, "http"))
);

@Test
public void index_delete() throws IOException {

DeleteIndexRequest request = new DeleteIndexRequest("test");

AcknowledgedResponse response = esClient.indices().delete(request, RequestOptions.DEFAULT);

System.out.println(response.isAcknowledged());
}

文档操作

文档作为数据的对象,可以通过ES的客户端对象直接操作

RestHighLevelClient对象和上面的创建方式相同,此处省略

文档基本操作
  • 创建文档
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Test
public void doc_create() throws IOException {
IndexRequest request = new IndexRequest();
// 设置操作的索引和当前插入的文档的id
request.index("people").id("1001");

User user = new User();
user.setName("张三");
user.setAge(10);
user.setSex("男");

ObjectMapper mapper = new ObjectMapper();
String userJson = mapper.writeValueAsString(user);

request.source(userJson, XContentType.JSON);

IndexResponse response = esClient.index(request, RequestOptions.DEFAULT);

System.out.println(response);

System.out.println("_index" + response.getIndex());
System.out.println("_id" + response.getId());
System.out.println("_result" + response.getResult());
}
  • 更新文档
1
2
3
4
5
6
7
8
9
10
11
java复制代码@Test
public void doc_update() throws IOException {
UpdateRequest request = new UpdateRequest();
request.index("people").id("1001");

request.doc(XContentType.JSON, "sex", "女");

UpdateResponse response = esClient.update(request, RequestOptions.DEFAULT);

System.out.println(response);
}
  • 查询文档
1
2
3
4
5
6
7
8
9
10
11
java复制代码@Test
public void doc_get() throws IOException {

GetRequest request = new GetRequest();
request.index("people").id("1001");

GetResponse response = esClient.get(request, RequestOptions.DEFAULT);

System.out.println(response);
System.out.println(response.getSource());
}
  • 删除文档
1
2
3
4
5
6
7
8
java复制代码@Test
public void doc_delete() throws IOException {
DeleteRequest request = new DeleteRequest();
request.index("people").id("1001");

DeleteResponse response = esClient.delete(request, RequestOptions.DEFAULT);
System.out.println(response);
}
文档批量操作

一般操作ES的数据都是批量保存或者批量删除,如果通过一条一条的操作,那么效率非常的低,可以通过批量操作API提高效率

  • 批量保存
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码@Test
public void doc_insert_batch() throws IOException {
BulkRequest request = new BulkRequest();
request.add(new IndexRequest().index("people").id("1001").source(XContentType.JSON, "name", "张三", "sex", "男", "age", 10));
request.add(new IndexRequest().index("people").id("1002").source(XContentType.JSON, "name", "李四", "sex", "男", "age", 11));
request.add(new IndexRequest().index("people").id("1003").source(XContentType.JSON, "name", "王五", "sex", "男", "age", 12));
request.add(new IndexRequest().index("people").id("1004").source(XContentType.JSON, "name", "王五1", "sex", "男", "age", 8));
request.add(new IndexRequest().index("people").id("1005").source(XContentType.JSON, "name", "王五2", "sex", "男", "age", 9));
request.add(new IndexRequest().index("people").id("1006").source(XContentType.JSON, "name", "王五33", "sex", "男", "age", 7));

BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);

System.out.println(response.getTook());
System.out.println(Arrays.toString(response.getItems()));
}
  • 批量删除
1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Test
public void doc_delete_batch() throws IOException {
BulkRequest request = new BulkRequest();
request.add(new DeleteRequest().index("people").id("1001"));
request.add(new DeleteRequest().index("people").id("1002"));
request.add(new DeleteRequest().index("people").id("1003"));

BulkResponse response = esClient.bulk(request, RequestOptions.DEFAULT);

System.out.println(response.getTook());
System.out.println(response);
}

文档高级查询

高级查询、分组查询、分页、排序、聚合操作等,作为ES常用的查询操作,ES的Java客户端也提供了好用的API进行操作

  • 查询指定索引的所有数据
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Test
public void match_all() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());

request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);

SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");
}
  • term查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码@Test
public void query_term() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.termQuery("age", 10));

request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);

SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");
}
  • 分页查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码@Test
public void query_page() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());

// from指定的是页数开始的偏移量, (当前页数 - 1)* size
builder.from(0);
builder.size(3);

request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);

SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");
}
  • 排序查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码@Test
public void query_sort() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());

builder.sort("age", SortOrder.DESC);
request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);

SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");
}
  • 查询并过滤字段
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码@Test
public void query_filter() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();
builder.query(QueryBuilders.matchAllQuery());

String[] excludes = {};
String[] includes = {"name"};
// 过滤字段,指定只包含某些字段,或者需要排除某些字段
builder.fetchSource(includes, excludes);

request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);

SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");
}
  • bool条件查询,多条件Must
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码@Test
public void query_bool_must() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();

BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

// 多条件查询,must -> 一定需要匹配到
boolQueryBuilder.must(QueryBuilders.matchQuery("age", 10));
boolQueryBuilder.must(QueryBuilders.matchQuery("sex", "女"));


builder.query(boolQueryBuilder);

request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);

SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");
}
  • bool条件查询,多条件should
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
java复制代码@Test
public void query_bool_should() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();

BoolQueryBuilder boolQueryBuilder = QueryBuilders.boolQuery();

// 多条件查询,should -> 如果当前这条数据满足两个should条件中的任意一个,则表示满足条件
boolQueryBuilder.should(QueryBuilders.matchQuery("age", 10));
boolQueryBuilder.should(QueryBuilders.matchQuery("sex", "女"));

builder.query(boolQueryBuilder);

request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);

SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");
}
  • 范围查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
java复制代码@Test
public void query_range() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();

// 根据某个字段构建范围查询
RangeQueryBuilder rangeQueryBuilder = QueryBuilders.rangeQuery("age");

// 指定范围查询的条件数据
rangeQueryBuilder.gte(9);
rangeQueryBuilder.lte(12);

builder.query(rangeQueryBuilder);

request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);

SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");
}
  • 模糊查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码@Test
public void query_fuzzy() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();

// 模糊查询,指定字段名称进行模糊查询,后面的指定模糊匹配的字符个数
builder.query(QueryBuilders.fuzzyQuery("name", "王五").fuzziness(Fuzziness.ONE));

request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);

SearchHits hits = response.getHits();
System.out.println("took:" + response.getTook());
System.out.println("timeout:" + response.isTimedOut());
System.out.println("total:" + hits.getTotalHits());
System.out.println("MaxScore:" + hits.getMaxScore());
System.out.println("hits========>>");
for (SearchHit hit : hits) {
//输出每条查询的结果信息
System.out.println(hit.getSourceAsString());
}
System.out.println("<<========");
}
  • 高亮查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码@Test
public void query_highlight() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();
TermsQueryBuilder termsQueryBuilder = QueryBuilders.termsQuery("sex", "女");

// 设置查询方式
builder.query(termsQueryBuilder);

// 构建高亮字段
HighlightBuilder highlightBuilder = new HighlightBuilder();
highlightBuilder.preTags("<font color='red'>"); // 设置标签前缀
highlightBuilder.postTags("</font>"); // 设置标签后缀
highlightBuilder.field("sex"); // 设置高亮字段

// 设置高亮查询
builder.highlighter(highlightBuilder);

// 设置请求体
request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
System.out.println(response);
SearchHits hits = response.getHits();
System.out.println("took::"+response.getTook());
System.out.println("time_out::"+response.isTimedOut());
System.out.println("total::"+hits.getTotalHits());
System.out.println("max_score::"+hits.getMaxScore());
System.out.println("hits::::>>");
for (SearchHit hit : hits) {
String sourceAsString = hit.getSourceAsString();
System.out.println(sourceAsString);
//打印高亮结果
Map<String, HighlightField> highlightFields = hit.getHighlightFields();
System.out.println(highlightFields);
}
}
  • 聚合查询—最大值
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@Test
public void query_aggs_max() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();

// 聚合查询,获取最大的age字段的数
builder.aggregation(AggregationBuilders.max("maxAge").field("age"));

// 设置请求体
request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
System.out.println(response);
}
  • 聚合查询—分组查询
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码@Test
public void query_aggs_group() throws IOException {
SearchRequest request = new SearchRequest();
request.indices("people");

SearchSourceBuilder builder = new SearchSourceBuilder();

// 聚合查询,获取最大的age字段的数
builder.aggregation(AggregationBuilders.terms("age_groupby").field("age"));

// 设置请求体
request.source(builder);

SearchResponse response = esClient.search(request, RequestOptions.DEFAULT);
System.out.printl n(response);
}

文档的高级查询,JavaAPI的操作方式和语法基本和Http操作的一致,只不过将对应的Http方式替换为了API,可以更好的在程序中进行ES操作,下一篇一起看看如何在SpringBoot和ElasticSearch进行集成吧,更加方便的操作ES与索引数据

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

php 有哪些杀手级超厉害框架或库或应用?

发表于 2021-05-05

1:[phpvirtualbox/phpvirtualbox]PHP版的VirtualBox,Web上的虚拟机

用过virtualbox吧,很好用对不对,他还有个php版的。就是这么好玩。

有什么用呢,假如你一台用不到的主机,不给他配屏幕了,想把他当做一个实验机器,装一个虚拟机, 随时测玩,能怎么办,顶多顶多也就windows远程桌面了吧,稍微厉害点装linux用命令行,但是命令行 操作虚拟机就麻烦点了吧,装远程桌面就更麻烦了。那就不如装个php版的vbox,浏览器输上ip端口直接怼。

2:[ph-7/obfuscator]PHP代码混淆加密器

php加密扩展一大堆,基本都是收费的,原理还需要额外扩展。这个加密库直接把php的代码翻译成不可 逆的文本编码形式,不依赖扩展,不降低性能,适合加密类代码文件。

3:[kiddyu/beanbun]一个简单可扩展的爬虫框架

这个就很简单了,基于workwerman做的一个爬虫框架,封装了一系列dom接口,还不错。不过我做爬虫用 的都是定时任务 。

4:[walkor/workerman-todpole]小蝌蚪聊天室,十分有趣

打开页面,进入一个无垠的空间,好多小蝌蚪在空间里游荡,互发消息。有趣有趣。

5:[symfony/process]好用的异步调用系统命令组件

还在用exec调用命令?唉,麻烦啊,这么好的库竟然不知道。

6:[symfony/finder]简单易用又高效,操作目录从未如此简单!

还在自己写函数递归遍历目录?low啊,这么好的库,用起来比“我的电脑”还好用。

7:[kosinix/grafika]简单好用智能,处理图片就是这么方便

谁再直接用gd库合成图片,谁就是嫌阳寿太多了,这个库做剪裁那都是智能的。(不过合成海报现在都 用canvas了 ,做压缩图水印还是很好的)

8: [linfo/linfo]让你认清自己,全面获取系统状态

优雅地获取系统状态( 网络/torrents/cpu/内存/usb/pci/声卡/文件系统/raid阵列/ipmi/等 ).

9:[guzzlehttp/guzzle]还在上网搜curl的函数?用这个优雅易用的HTTP客户端

这个都不用多介绍了.凡是原来用到curl的都用他. [guzzlehttp/guzzle]是PHP的一个HTTP客户端,他干的就是curl干的那些事,不过比curl的用法要更简单, 适用,代码更清晰明了.

10:[clue/redis-server]纯PHP实现的Redis服务器

什么是Redis呢?redis是一个开源的、使用C语言编写的、支持网络交互的、可基于内存也可持久化的 Key-Value数据库.

当你遇到队列,秒杀,缓存等提高性能的场景时,使用Redis是最广泛的解决方案,在宝塔面板中还可以 一键安装. 首先我们介绍了,Redis是用C语言开发的,最近也有Go的爱好者开发了好多个Go的版本,那你有没有见过 PHP版本开发的Redis呢?

今天就给大家介绍一个纯PHP开发的Redis服务端项目:clue/php-redis-server. 这个项目的计划是实现如下功能:

  • 使用Redis的协议
  • 实现在内存中存储数据
  • 与常见的Redis客户端兼容
  • Redis命令行
  • Redis标准
  • 健壮和现代化的设计,可测试的模块化组件
  • 实现所有的标准的操作(还未全部实现)

11:ReactPHP

我还没开始写这个项目的介绍,但是很值得介绍.简单来说就是php版的nodejs.这样说就已经很有趣了是不是.

12:php-pm

我们都知道php-fpm,鸟哥说了,这是php黑科技. 那什么是php-pm呢?这个是用ReactPHP做的一个服务框架,用来运行你的业务框架,比如 Symfony,Laravel,Zend,Wordpress.声称比php-fpm快.

如果还不理解,那你可以了解一下webman,这个是最近才开始推广的技术,workerman系下的服务框架,简单 说就是不用php-fpm了,用webman来跑你的项目,一般跟tp结合得很好.都是国产的嘛.声称比php-fpm快 还有就是swoole系的服务框架,都是抛弃php-fpm,

基于swoole来运行你的业务代码,所有的swoole系的框 架都声称比php-fpm快. 国内一般都了解swoole系的服务框架是干什么的,你可以照此理解php-pm,他是reactphp系的服务框架.

13: skydiablo/ SkyRadius

据所知是第一个RADIUS server ,拨号认证服务,至于是干什么的,自己了解吧,我也不了解,但是感觉好厉害.

点关注,不迷路

好了各位,以上就是这篇文章的全部内容了,能看到这里的人呀,都是人才。之前说过,PHP方面的技术 点很多,也是因为太多了,实在是写不过来,写过来了大家也不会看的太多,所以我这里把它整理成了 PDF和文档,如果有需要的可以点这里shimo.im/docs/rjJttd… 《进阶PHP月薪30k>>>架构师成长路线【视频、面试文档免费获取】》

)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…674675676…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%