开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

企业级数据湖最佳实践 一、统一数据存储,多引擎对接,运存分离

发表于 2021-11-24

简介:2021云栖大会云原生企业级数据湖专场,阿里云智能高级解决方案架构师周皓为我们带来《企业级数据湖最佳实践》的分享。

本文主要分享了数据湖的核心能力及几个最佳实践案例。

以下是精彩视频内容整理:

一、统一数据存储,多引擎对接,运存分离

在这一节开始之前,先回顾一下数据湖几个核心的能力:

  • 集中存储、多种引擎对接

各种类型的数据用集中方式统一存储在OSS,无缝对接如 EMR 等各类计算引擎,支持开源计算生态

  • 数据无需处理、直接存储

对接多种数据输入源,提供便捷的数据接入和数据消费通道,多种类型数据都可以按照原始产生的形态直接存储,随需再进行处理,对比传统数仓 schema 限制结构,更适配业务快速发展的应用场景

  • 更灵活架构、运存分离

存算分离架构带来非常好的灵活性,通过计算与存储解耦合提供更灵活的系统架构设计空间,让计算、存储资源具备更好的扩展性,充分提高资源利用率,极大降低运维管理难度、优化 TCO。这也是本文介绍案例中,客户选择数据湖方案的一个重要原因。

二、最佳实践案例

Yeahmobi- 移动互联网广告实践案例

Yeahmobi,作为一家技术驱动发展的企业国际化智能营销服务公司,主要涉及到的就是智能营销业务,每天的业务波动非常大。如果采用传统的架构,势必要按照业务的峰值准备资源,就会造成很多 CPU 资源无法得到最大化的利用。这也是许多智能营销互联网公司的痛点。基于此,大多企业选择了数据湖方案。

  • 存储与计算解耦合,让计算资源使用量可以按照线上业务量的变化动态增减,减少常驻资源量
  • 多种不同类型计算引擎,轻松对接在线广告各种场景所需要的分析要求
  • 通过数据湖方案,整体TCO优化达到30%,让业务形态更具竞争力

只要将数据存储在数据湖中,计算资源可以按照业务的变化实现动态的伸缩与创建,只需维护一个最小的常驻计算资源就足够了。在这种情况下,再结合兼具半托管和全托管模式的 EMR 动态伸缩计算及分析的能力,可以极大地降低运维的难度。这也是许多智能营销公司选择这套数据湖方案的原因。Yeahmobi 选择这套数据湖方案后,TCO 降低了30%。

数禾科技-互联网金融实践案例

数禾科技是一家互联网金融科技公司,因其所在行业的特性及本身的业务场景需求,对数据的安全可靠性,以及数据访问控制的细粒度都有很高的要求。数禾服务了大量内部和外部用户,数据安全敏感,要求严格的数据权限隔离。其次,整个业务变化也需要非常强劲的吞吐能力来支撑计算及存储。

其实在数禾的发展过程中,最早采用的是最常见最通用的大数据集群构建方式既通过服务器搭建,然而很快就发现这种方式无法跟上业务的快速发展:一是存储成本显著增长,一个标准的 HDFS 集群是三个冗余备份,在考虑到水位,整个文件系统的开销等因素后,存储成本是显著增加的。二是因为业务的快速扩容, 如果频繁增加 HDFS 集群节点,会影响业务的可用性。

基于以上原因,数禾选择了阿里云数据湖方案。数据湖采用 对象存储 OSS 作为底座,无需担心容量的扩展或是小文件的增加。文件数量的快速增加会对 HDFS 集群的 NameNode 造成比较大的压力,但是对象存储结构无需担心文件数量增加,哪怕是到万亿级的 object 的数量,也没有任何压力。采用了数据湖方式以后,多 bucket 切分搭配阿里云的 RAM 体系,可以做到非常细腻度的访问控制。再通过例如 OSS 与 EMR 在软件层合作优化的 JindoFS 方案,能够输出超过 TBS 的吞吐能力,以支撑整个业务的需求,实际操作体验超越自建HDFS。此外,通过云上弹性资源能力,任务可以随需进行千台规模节点的弹性伸缩,达到降本增效的效果。

数据湖经典使用场景-冷热数据分层

模式特点

  • 应用与业务系统的长期运行积累了大量的冷数据,不断增长的冷数据对现有集群的存储空间压力大
  • 需要解决冷数据存储空间,同时为访问频繁的热数据留出性能优化空间
  • 优化冷数据的长期存储成本,要能远低于热数据存储成本,并且冷数据要能便于读取

冷热数据分层是数据湖的一个经典使用方式。应用与业务系统的长期运行会产生大量冷数据,对整个集群的运维产生非常大的压力。一方面是规模上的压力,通用的大数据集群里面的服务器架构比较同构导致冷数据的优化的空间不大,如果通过增加高密度的或者异购的机型,在实践中会导致集群运维管理难度的大幅度提升。另外一方面是在 IDC 环境中,要想快速扩容,物理集群会受限于很多因素。这也是很多数据湖客户从传统的大数据集群架构往数据湖迁移的原因。目前已经有很多客户拥抱了数据湖,全面使用 OSS。无法一步到位的时候,客户会先将温数据和冷数据先沉降到 OSS。早在2016年,OSS 就已经与 Hadoop 生态充分融合,Hadoop 3.0可以直接访问 OSS,写好的任务不用做任何修改就可以直接运行,大幅减少了迁移的难度。迁移过来之后,OSS 上的智能生命周期管理简单地配置一个生命周期策略,就可以按照规则把冷数据进一步沉降到归档和冷归档类型中,进一步降低成本。

教育科技平台实践案例

客户价值

  • 通过 OSS 多存储类型和数据生命周期管理,实现冷数据长期存储的成本优化,通过云上承载冷数据,IDC 自建集群无需扩容,解决机房空间难题
  • 通过 OSS 数据湖的高可扩展性,有效的帮助客户解决了大数据存储的性能吞吐问题,规避了自建 HDFS 文件系统在元数据节点的性能瓶颈
  • 客户已经在规划进一步通过云上弹性资源去扩容计算资源,减小一次性资源投入

这是一个冷热分层的实际案例。教育平台涉及到的业务场景包含各种日志的采集,通过使用的日志帮助学生改进学习。这个客户也面临一个问题,大量日志采集以后对于空间占用会造成非常大的压力。客户又是自建 IDC,很难在一段时间内就完成物理空间的扩容,所以最后选择了数据湖方案。通过专线打通了 IDC 和阿里云的连接,使用阿里云的资源扩容现有 IDC ,再通过专线将线下的冷数据迁移到了数据湖中,为线下的业务腾出了空间,压力释放以后,空间就非常灵活了。然后进一步将很多应用日志直接入湖,通过对象存储及多版本能力,为数据可靠性提供更多的保障,同时也使用了冷归档能力将温数据进一步沉降以降低成本。入湖的数据则通过专线拉到本地计算,但是在数据湖的使用过程中,客户想进一步使用云上的计算资源以扩容计算能力,就无需一次性采购线下的计算服务器,进一步降低成本。

全球化在线游戏实践案例

客户价值

  • 通过日志服务,打通应用日志的采集、投递,贯通实时计算引擎,为后续的用户热力图 ,用户轨迹,用户登陆,在线人数统计提供数据支撑
  • 通过 OSS 数据湖承载所有日志数据的长期存储,与离线分析引擎结合,能够更深入对日志数据进行分析
  • 全球统一化架构部署,对于一款面向全球化的游戏,可以保证在全球任何一个区域都可以使用相同的部署方式,简化运维部署难度

一款全球化的游戏一般要服务全球玩家,这就要求全球统一化架构部署以降低运维难度,而阿里云数据湖在全球任何一个区域都可以使用相同的部署方式,可以完全匹配客户的需求。此外,游戏行业日志的采集非常关键,比如游戏在线人数的大屏展示,就是通过采集应用服务器的日志去做分析。针对这个客户,我们采用了阿里云的日志服务,从千台万台机器规模的应用服务器中实时采集日志,推送到 Flink 中做实时计算,并将结果实时写入到 ClickHouse,提供实时性的查询。OSS 在这个场景中,作为了日志的的永久存储。SLS 把采集的日志定期投递到 OSS ,并且通过 OSS SDK 和一些命令行工具,直接将一些应用日志传到 OSS ,存储到 OSS 的日志可以进一步通过离线分析,如通过 Spark、Hive 做更大规模的分析,并将深度分析的结果再写入到 ClickHouse,提供更多的分析查询。

小鹏汽车-自动驾驶实践案例

数据湖与各种存储产品之间是无缝打通的。在这个自动驾驶案例中,我们提供了一套完整的从采集到存储到分析的方案。闪电立方提供了车载的部署能力,解决了自动驾驶场景下每天采集的大量路面数据的存储问题,采集完以后通过就近的接入点快速地存储到 OSS 数据湖中去,解决了最后一公里的问题,把数据上传到 OSS 以后,可以直接使用阿里云的各项计算引擎,包括 EMR、MaxCompute 等对数据进行各种各样的清洗标注和分析。CPFS 是阿里云上的一款支持大规模并行计算的存储产品,它有非常高的吞吐能力,并且是 posix 语义的。OSS 数据湖与 CPFS 无缝的数据流动,使得训练数据也能传递到 CPFS中,对数据 GPU 进行分析,并将最终的结果写回到 OSS 做长期的存储。

不光在互联网领域,包括自动驾驶、高性能计算等都已经广泛的应用数据湖。希望有更多的用户可以在生产业务中引入阿里云数据湖。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

性能监控之初识 Prometheus

发表于 2021-11-24

「这是我参与11月更文挑战的第24天,活动详情查看:2021最后一次更文挑战」

一、概述

Prometheus 是一套开源的监控、报警、时间序列数据库的组合,起始是由 SoundCloud 公司开发的,源于谷歌 borgmon。从 2016 年加入CNCF,2016 年 6 月正式发布 1.0 版本,2017 年底发布了基于全新存储层的 2.0 版本,能更好地与容器平台、云平台配合,到 2018 年8月毕业,现在已经成为 Kubernetes 的官方监控方案,社区活跃,第三方集成非常丰富。

官网地址:prometheus.io/
在这里插入图片描述

二、监控的目标

在《SRE: Google运维解密》一书中指出,监控系统需要能够有效的支持白盒监控和黑盒监控。通过白盒能够了解其内部的实际运行状态,通过对监控指标的观察能够预判可能出现的问题,从而对潜在的不确定因素进行优化。而黑盒监控,常见的如 HTTP探针,TCP 探针等,可以在系统或者服务在发生故障时能够快速通知相关的人员进行处理。通过建立完善的监控体系,从而达到以下目的:

  • 长期趋势分析:通过对监控样本数据的持续收集和统计,对监控指标进行长期趋势分析。例如,通过对磁盘空间增长率的判断,我们可以提前预测在未来什么时间节点上需要对资源进行扩容。
  • 对照分析:两个版本的系统运行资源使用情况的差异如何?在不同容量情况下系统的并发和负载变化如何?通过监控能够方便的对系统进行跟踪和比较。
  • 告警:当系统出现或者即将出现故障时,监控系统需要迅速反应并通知管理员,从而能够对问题进行快速的处理或者提前预防问题的发生,避免出现对业务的影响。
  • 故障分析与定位:当问题发生后,需要对问题进行调查和处理。通过对不同监控监控以及历史数据的分析,能够找到并解决根源问题。
  • 数据可视化:通过可视化仪表盘能够直接获取系统的运行状态、资源使用情况、以及服务运行状态等直观的信息。

三、Prometheus 的优势

Prometheus是一个开源的完整监控解决方案,其对传统监控系统的测试和告警模型进行了彻底的颠覆,形成了基于中央化的规则计算、统一分析和告警的新模型。 相比于传统监控系统 Prometheus 具有以下优点:

  • 易于管理:只有一个单独的二进制文件,不存在任何的第三方依赖,采用 Pull-based 的方式拉取数据,Prometheus基于Pull模型的架构方式,可以在任何地方(本地电脑,开发环境,测试环境)搭建我们的监控系统。对于一些复杂的情况,还可以使用Prometheus 服务发现(Service Discovery)的能力动态管理监控目标。
  • 监控服务的内部运行状态:Pometheus 鼓励用户监控服务的内部状态,基于 Prometheus 丰富的 Client 库,用户可以轻松的在应用程序中添加对 Prometheus 的支持,从而让用户可以获取服务和应用内部真正的运行状态。
    在这里插入图片描述
  • 强大的数据模型:时间序列数据库 TSDB,golang,实现每一条时间序列由指标名称(Metrics Name)以及一组标签(Labels)唯一标识,表示维度的标签可能来源于你的监控对象的状态,比如 code=404 或者 content_path=/api/path 。也可能来源于的你的环境定义,比如 environment=produment。基于这些 Labels 我们可以方便地对监控数据进行聚合,过滤,裁剪。
1
2
3
bash复制代码http_request_status{code='200',content_path='/api/path', environment='produment'} => [value1@timestamp1,value2@timestamp2...]

http_request_status{code='200',content_path='/api/path2', environment='produment'} => [value1@timestamp1,value2@timestamp2...]
  • 强大的查询语言 PromQL:内置了一个强大的数据查询语言 PromQL,可以实现多种查询、聚合,同时 PromQL 也被应用于数据可视化(如Grafana)以及告警当中。通过 PromQL 可以轻松回答类似于以下问题:
    • 在过去一段时间中 95% 应用延迟时间的分布范围?
    • 预测在 4 小时后,磁盘空间占用大致会是什么情况?
    • CPU 占用率前 5 位的服务有哪些?(过滤)
  • 高性能:对于监控系统而言,大量的监控任务必然导致有大量的数据产生。而 Prometheus 可以高效地处理这些数据,对于单一Prometheus Server 实例而言它可以处理:
    • 数以百万的监控指标
    • 每秒处理数十万的数据点
    • 上千个 targets
  • 易扩展:Prometheus 是如此简单,因此你可以在每个数据中心、每个团队运行独立的 Prometheus Sevrer。Prometheus 对于联邦集群的支持,可以让多个 Prometheus 实例产生一个逻辑集群,当单实例 Prometheus Server 处理的任务量过大时,通过使用功能分区(sharding)+联邦集群(federation)可以对其进行扩展,实现多数据中心。
  • 易集成:使用 Prometheus 可以快速搭建监控服务,并且可以非常方便地在应用程序中进行集成。目前支持: Java, JMX, Python, Go,Ruby, .Net, Node.js 等等语言的客户端SDK,基于这些 SDK 可以快速让应用程序纳入到 Prometheus 的监控当中,或者开发自己的监控数据收集程序。同时这些客户端收集的监控数据,不仅仅支持 Prometheus,还能支持 Graphite 这些其他的监控工具。同时 Prometheus 还支持与其他的监控系统进行集成:Graphite, Statsd, Collected, Scollector, muini, Nagios等。Prometheus社区还提供了大量第三方实现的监控数据采集支持:JMX, CloudWatch, EC2, MySQL, PostgresSQL, Haskell, Bash, SNMP, Consul, Haproxy, Mesos, Bind, CouchDB, Django, Memcached, RabbitMQ, Redis, RethinkDB, Rsyslog等等。总之,支持多种语言的 SDK 进行应用程序数据埋点,社区有丰富插件,白盒&黑盒监控都支持,DevOps友好。
  • 可视化:Prometheus Server 中自带了一个 Prometheus UI,通过这个 UI 可以方便地直接对数据进行查询,并且支持直接以图形化的形式展示数据。同时 Prometheus 还提供了一个独立的基 于Ruby On Rails的 Dashboard 解决方案 Promdash。最新的Grafana 可视化工具也已经提供了完整的 Prometheus 支持,基于 Grafana 可以创建更加精美的监控图标。基于 Prometheus 提供的 API 还可以实现自己的监控可视化 UI。
  • 开放性:通常来说当我们需要监控一个应用程序时,一般需要该应用程序提供对相应监控系统协议的支持。因此应用程序会与所选择的监控系统进行绑定。为了减少这种绑定所带来的限制。对于决策者而言要么你就直接在应用中集成该监控系统的支持,要么就在外部创建单独的服务来适配不同的监控系统。
    而对于 Prometheus 来说,使用 Prometheus 的 client library 的输出格式不止支持 Prometheus 的格式化数据,也可以输出支持其它监控系统的格式化数据,比如 Graphite。
    因此你甚至可以在不使用 Prometheus 的情况下,采用 Prometheus 的 client library 来让你的应用程序支持监控数据采集。也就是说使用 sdk 采集的数据可以被其他监控系统使用,不一定非要用 Prometheus。

四、Prometheus 的基本架构

作为一个监控系统,Prometheus 项目的作用和工作方式,其实可以用如下所示的一张官方示意图来解释:

image.png

Prometheus 是使用 Pull (抓取)的方式去搜集被监控对象的 Metrics 数据(监控指标数据),比如从 exporter 拉取数据,或者间接地通过网关 gateway 拉取数据(如果在 k8s 内部署,可以使用服务发现的方式),它默认本地存储抓取的所有数据,并通过一定规则进行清理和整理数据,然后再把这些结果保存在一个 TSDB (时间序列数据库,比如 OpenTSDB、InfluxDB 等)当中,以便后续可以按照时间进行检索。有了这套核心监控机制, Prometheus 剩下的组件就是用来配合这套机制的运行。比如Pushgateway ,可以允许被监控对象以 Push 的方式向 Prometheus 推送 Metrics 数据。而 Alertmanager,则可以根据 Metrics 信息灵活地设置报警。当然, Prometheus 最受用户欢迎的功能,还是通过 Grafana 对外暴露出的、可以灵活配置的监控数据可视化界面。

五、组件内容

  • Prometheus Server:Prometheus Server 是 Prometheus ,组件中的核心部分,负责实现对监控数据的获取,存储以及查询。 Prometheus Server 可以通过静态配置管理监控目标,也可以配合使用 Service Discovery 的方式动态管理监控目标,并从这些监控目标中获取数据。其次 Prometheus Server 需要对采集到的监控数据进行存储,Prometheus Server 本身就是一个时序数据库,将采集到的监控数据按照时间序列的方式存储在本地磁盘当中。最后 Prometheus Server 对外提供了自定义的 PromQL 语言,实现对数据的查询以及分析。
+ Retrieval: 采样模块
+ TSDB: 存储模块默认本地存储为tsdb
+ HTTP Server: 提供http接口查询和面板,默认端口为9090
  • Exporters/Jobs:负责收集目标对象(host, container…)的性能数据,并通过 HTTP 接口供 Prometheus Server 获取,Prometheus Server通过访问该 Exporter 提供的Endpoint端点,即可获取到需要采集的监控数据。支持数据库、硬件、消息中间件、存储系统、http服务器、jmx等。只要符合接口格式,就可以被采集。
+ 直接采集:这一类 Exporter 直接内置了对 Prometheus 监控的支持,比如 cAdvisor,Kubernetes,Etcd,Gokit等,都直接内置了用于向 Prometheus 暴露监控数据的端点。
+ 间接采集:间接采集,原有监控目标并不直接支持 Prometheus,因此我们需要通过 Prometheus 提供的 Client Library 编写该监控目标的监控采集程序。例如: Mysql Exporter,JMX Exporter,Consul Exporter等。
  • Short-lived jobs:瞬时任务的场景,无法通过pull方式拉取,需要使用push方式,与PushGateway搭配使用
  • PushGateway:可选组件,主要用于短期的 jobs。由于 Prometheus 数据采集基于 Pull 模型进行设计,因此在网络环境的配置上必须要让 Prometheus Server 能够直接与 Exporter 进行通信。 当这种网络需求无法直接满足时,就可以利用 PushGateway 来进行中转。可以通过 PushGateway 将内部网络的监控数据主动 Push 到 Gateway 当中。而Prometheus Server则可以采用同样Pull 的方式从 PushGateway 中获取到监控数据。由于这类 jobs 存在时间较短,可能在 Prometheus 来 pull 之前就消失了。为此,这次 jobs 可以直接向 Prometheus server 端推送它们的 metrics。这种方式主要用于服务层面的 metrics,对于机器层面的 metrices,需要使用 node exporter。
  • 客户端sdk:官方提供的客户端类库有go、java、scala、python、ruby,其他还有很多第三方开发的类库,支持nodejs、php、erlang等
  • PromDash:使用 rails 开发的 dashboard,用于可视化指标数据,已废弃
  • Alertmanager:在 Prometheus Server 中支持基于 PromQL 创建告警规则,如果满足 PromQL 定义的规则,则会产生一条告警,而告警的后续处理流程则由 AlertManager 进行管理。在 AlertManager 中我们可以与邮件,Slack等等内置的通知方式进行集成,也可以通过 Webhook 自定义告警处理方式。AlertManager 即 Prometheus 体系中的告警处理中心。
  • Service Discovery:服务发现,Prometheus 支持多种服务发现机制:文件,DNS,Consul,Kubernetes,OpenStack,EC2等等。基于服务发现的过程并不复杂,通过第三方提供的接口,Prometheus查询到需要监控的Target列表,然后轮训这些Target获取监控数据。

其大概的工作流程是:

  • Prometheus server 定期从配置好的 jobs 或者 exporters 中拉 metrics,或者接收来自 Pushgateway 发过来的 metrics,或者从其他的 Prometheus server 中拉 metrics。
  • Prometheus server 在本地存储收集到的 metrics,并运行已定义好的 alert.rules,记录新的时间序列或者向 Alertmanager 推送警报。
  • Alertmanager 根据配置文件,对接收到的警报进行处理,发出告警。
  • 在图形界面中,可视化采集数据。

六、安装Prometheus Server

Prometheus基于Golang编写,编译后的软件包,不依赖于任何的第三方依赖。用户只需要下载对应平台的二进制包,解压并且添加基本的配置即可正常启动 Prometheus Server。

1、从二进制包安装

对于非 Docker 用户,可以从 prometheus.io/download/ 找到最新版本的 Prometheus Sevrer软件包:

1
2
bash复制代码export VERSION=2.25.2
curl -LO https://github.com/prometheus/prometheus/releases/download/v$VERSION/prometheus-$VERSION.darwin-amd64.tar.gz

解压,并将Prometheus相关的命令,添加到系统环境变量路径即可:

1
2
bash复制代码tar -xzf prometheus-${VERSION}.darwin-amd64.tar.gz
cd prometheus-${VERSION}.darwin-amd64

解压后当前目录会包含默认的 Prometheus 配置文 件promethes.yml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
bash复制代码# my global config
global:
scrape_interval: 15s # Set the scrape interval to every 15 seconds. Default is every 1 minute.
evaluation_interval: 15s # Evaluate rules every 15 seconds. The default is every 1 minute.
# scrape_timeout is set to the global default (10s).

# Alertmanager configuration
alerting:
alertmanagers:
- static_configs:
- targets:
# - alertmanager:9093

# Load rules once and periodically evaluate them according to the global 'evaluation_interval'.
rule_files:
# - "first_rules.yml"
# - "second_rules.yml"

# A scrape configuration containing exactly one endpoint to scrape:
# Here it's Prometheus itself.
scrape_configs:
# The job name is added as a label `job=<job_name>` to any timeseries scraped from this config.
- job_name: 'prometheus'

# metrics_path defaults to '/metrics'
# scheme defaults to 'http'.

static_configs:
- targets: ['localhost:9090']

Promtheus 作为一个时间序列数据库,其采集的数据会以文件的形式存储在本地中,默认的存储路径为 data/,因此我们需要先手动创建该目录:

1
bash复制代码mkdir -p data

用户也可以通过参数 --storage.tsdb.path="data/" 修改本地数据存储的路径。

启动 prometheus 服务,其会默认加载当前路径下的 prometheus.yaml 文件:

1
bash复制代码./prometheus

正常的情况下,你可以看到以下输出内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
bash复制代码level=info ts=2021-03-21T04:40:31.632Z caller=main.go:722 msg="Starting TSDB ..."
level=info ts=2021-03-21T04:40:31.632Z caller=web.go:528 component=web msg="Start listening for connections" address=0.0.0.0:9090
level=info ts=2021-03-21T04:40:31.632Z caller=repair.go:57 component=tsdb msg="Found healthy block" mint=1615788000000 maxt=1615852800000 ulid=01F0WFKTA8JPYE3GTA5RP9XMB5
level=info ts=2021-03-21T04:40:31.633Z caller=repair.go:57 component=tsdb msg="Found healthy block" mint=1615852800000 maxt=1615917600000 ulid=01F0YDDARF4MHECC8PZ603CZR1
level=info ts=2021-03-21T04:40:31.633Z caller=repair.go:57 component=tsdb msg="Found healthy block" mint=1615917600000 maxt=1615982400000 ulid=01F10B6WRZNYGJXZW90R2BQENK
level=info ts=2021-03-21T04:40:31.633Z caller=repair.go:57 component=tsdb msg="Found healthy block" mint=1615982400000 maxt=1616047200000 ulid=01F1290D3KN9JNGVMJXHP57EPR
level=info ts=2021-03-21T04:40:31.633Z caller=repair.go:57 component=tsdb msg="Found healthy block" mint=1616047200000 maxt=1616112000000 ulid=01F146SZ7Q4MB39SRVBRSDQKMM
level=info ts=2021-03-21T04:40:31.633Z caller=repair.go:57 component=tsdb msg="Found healthy block" mint=1616112000000 maxt=1616176800000 ulid=01F164KFHDRKCNXR7Y9H6Y8GE2
level=info ts=2021-03-21T04:40:31.634Z caller=repair.go:57 component=tsdb msg="Found healthy block" mint=1616176800000 maxt=1616241600000 ulid=01F182D1JXT487YV58WYFZMJZS
level=info ts=2021-03-21T04:40:31.634Z caller=repair.go:57 component=tsdb msg="Found healthy block" mint=1616241600000 maxt=1616263200000 ulid=01F18Q04A506Q2942TQAPKNVYE
level=info ts=2021-03-21T04:40:31.634Z caller=repair.go:57 component=tsdb msg="Found healthy block" mint=1616284800000 maxt=1616292000000 ulid=01F19BK905YGNE1WV8CVX9RZQA
level=info ts=2021-03-21T04:40:31.634Z caller=repair.go:57 component=tsdb msg="Found healthy block" mint=1616263200000 maxt=1616284800000 ulid=01F19BKB09BVDWBE7QZPXNETX0
level=info ts=2021-03-21T04:40:31.741Z caller=head.go:645 component=tsdb msg="Replaying on-disk memory mappable chunks if any"
level=info ts=2021-03-21T04:40:31.799Z caller=head.go:659 component=tsdb msg="On-disk memory mappable chunks replay completed" duration=57.298066ms
level=info ts=2021-03-21T04:40:31.799Z caller=head.go:665 component=tsdb msg="Replaying WAL, this may take a while"
level=info ts=2021-03-21T04:40:32.030Z caller=head.go:691 component=tsdb msg="WAL checkpoint loaded"
level=info ts=2021-03-21T04:40:32.475Z caller=head.go:717 component=tsdb msg="WAL segment loaded" segment=1134 maxSegment=1137

2、使用容器安装

对于 Docker 用户,直接使用 Prometheus 的镜像即可启动 Prometheus Server:

1
2
bash复制代码docker run -p 9090:9090 -v 
/etc/prometheus/prometheus.yml:/etc/prometheus/prometheus.yml prom/prometheus

启动完成后,可以通过 http://localhost:9090 访问 Prometheus 的UI界面:

在这里插入图片描述

3、使用 Kubernetes Operator 安装

参考:github.com/coreos/kube…

七、小结

我们初步了解了 Prometheus 以及相比于其他相似方案的优缺点,可以为大家在选择监控解决方案时,提供一定的参考。同时我们介绍了 Prometheus 的生态以及核心能力,相信大家通过本文能够对 Prometheus 有一个直观的认识。

参考资料:

  • [1]:www.xuyasong.com/?p=1550
  • [2]:prometheus.io/docs/introd…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

性能监控之Telegraf+InfluxDB+Grafana

发表于 2021-11-24

「这是我参与11月更文挑战的第24天,活动详情查看:2021最后一次更文挑战」

一、背景

由于我们的自研客户端压测工具的测试结果是结构化日志文件,而考虑到目前性能监控需要做到实时化和集中化,那么需要一种定时和批量采集结构化日志文件的采集 agent,而刚好 Telegraf Logparser插件可以满足这个需求。

二、Telegraf logparser

Logparser插件流式传输并解析给定的日志文件,目前支持解析 “grok” 模式和正则表达式模式。

1、Grok 解析器

熟悉 grok 解析器的最佳途径是参考 logstash文档:
www.elastic.co/guide/en/lo…
Telegraf 解析器使用经过稍微修改的 logstash “grok” 模式版本,其格式为

1
bash复制代码%{<capture_syntax>[:<semantic_name>][:<modifier>]}
  • capture_syntax :定义解析输入行的 grok 模式
  • semantic_name:用于命名字段或标记
  • modifier:扩展被解析项转换为的数据类型或其他特殊处理

默认情况下,所有命名的捕获都转换为字符串字段。如果模式没有语义名称,则不会捕获它。时间戳修饰符可用于将捕获转换为已解析度量的时间戳。如果未解析任何时间戳,则将使用当前时间创建度量。

注意:每行必须捕获至少一个字段。将所有捕获转换为标记的模式将导致无法写入到时序数据库的点。

  • Available modifiers:
    • string (default if nothing is specified)
    • int
    • float
    • duration (ie, 5.23ms gets converted to int nanoseconds)
    • tag (converts the field into a tag)
    • drop (drops the field completely)
  • Timestamp modifiers:
    • ts (This will auto-learn the timestamp format)
    • ts-ansic (“Mon Jan _2 15:04:05 2006”)
    • ts-unix (“Mon Jan _2 15:04:05 MST 2006”)
    • ts-ruby (“Mon Jan 02 15:04:05 -0700 2006”)
    • ts-rfc822 (“02 Jan 06 15:04 MST”)
    • ts-rfc822z (“02 Jan 06 15:04 -0700”)
    • ts-rfc850 (“Monday, 02-Jan-06 15:04:05 MST”)
    • ts-rfc1123 (“Mon, 02 Jan 2006 15:04:05 MST”)
    • ts-rfc1123z (“Mon, 02 Jan 2006 15:04:05 -0700”)
    • ts-rfc3339 (“2006-01-02T15:04:05Z07:00”)
    • ts-rfc3339nano (“2006-01-02T15:04:05.999999999Z07:00”)
    • ts-httpd (“02/Jan/2006:15:04:05 -0700”)
    • ts-epoch (seconds since unix epoch, may contain decimal)
    • ts-epochmilli (milliseconds since unix epoch, may contain decimal)
    • ts-epochnano (nanoseconds since unix epoch)
    • ts-syslog (“Jan 02 15:04:05”, parsed time is set to the current year)
    • ts-“CUSTOM”

自定义时间格式必须在引号内,并且必须是 “参考时间” 的表示形式 on Jan 2 15:04:05 -0700 MST 2006。
要匹配逗号小数点,可以使用句点。例如,%{TIMESTAMP:timestamp:ts-"2006-01-02 15:04:05.000"} 可以用来匹配 "2018-01-02 15:04:05,000" 要匹配逗号小数点,可以在模式字符串中使用句点。

有关更多详细信息,请参考:
golang.org/pkg/time/#P…

Telegraf 具有许多自己的内置模式,并支持大多数 logstash 的内置模式。 Golang 正则表达式不支持向前或向后查找。不支持依赖于这些的logstash 模式。

如果需要构建模式以匹配日志的调试,使用 grokdebug.herokuapp.com 调试非常有用!

2、示例

我们可以使用 logparser 将 Telegraf 生成的日志行转换为指标。

为此,我们需要配置 Telegraf 以将日志写入文件。可以使用 agent.logfile 参数或配置 syslog 来完成。

1
2
bash复制代码[agent]
logfile = "/var/log/telegraf/telegraf.log"

Logparser配置:

1
2
3
4
5
6
7
8
9
bash复制代码[[inputs.logparser]]
files = ["/var/log/telegraf/telegraf.log"]

[inputs.logparser.grok]
measurement = "telegraf_log"
patterns = ['^%{TIMESTAMP_ISO8601:timestamp:ts-rfc3339} %{TELEGRAF_LOG_LEVEL:level:tag}! %{GREEDYDATA:msg}']
custom_patterns = '''
TELEGRAF_LOG_LEVEL (?:[DIWE]+)
'''

log 内容:

1
2
3
4
5
bash复制代码2018-06-14T06:41:35Z I! Starting Telegraf v1.6.4
2018-06-14T06:41:35Z I! Agent Config: Interval:3s, Quiet:false, Hostname:"archer", Flush Interval:3s
2018-02-20T22:39:20Z E! Error in plugin [inputs.docker]: took longer to collect than collection interval (10s)
2018-06-01T10:34:05Z W! Skipping a scheduled flush because there is already a flush ongoing.
2018-06-14T07:33:33Z D! Output [file] buffer fullness: 0 / 10000 metrics.

InfluxDB 采集的数据:

1
2
3
4
5
bash复制代码telegraf_log,host=somehostname,level=I msg="Starting Telegraf v1.6.4" 1528958495000000000
telegraf_log,host=somehostname,level=I msg="Agent Config: Interval:3s, Quiet:false, Hostname:\"somehostname\", Flush Interval:3s" 1528958495001000000
telegraf_log,host=somehostname,level=E msg="Error in plugin [inputs.docker]: took longer to collect than collection interval (10s)" 1519166360000000000
telegraf_log,host=somehostname,level=W msg="Skipping a scheduled flush because there is already a flush ongoing." 1527849245000000000
telegraf_log,host=somehostname,level=D msg="Output [file] buffer fullness: 0 / 10000 metrics." 1528961613000000000

三、具体实践

1、日志格式

需要采集的结构化日志示例如下:

1
2
3
4
5
6
7
8
bash复制代码TestConfig1,5.0,2019/3/6 17:48:23,2019/3/6 17:48:30,demo_1,open,3,1,6.8270219,openscreen>validatestage
TestConfig2,5.0,2019/3/6 17:48:33,2019/3/6 17:48:40,demo_2,open,3,2,6.9179322,openscreen>validatestage
TestConfig3,5.0,2019/3/6 17:48:43,2019/3/6 17:50:23,demo_1,open,3,3,100.1237885,switchscreen>validatestag
TestConfig3,5.0,2019/3/6 17:48:43,2019/3/6 17:50:23,demo_1,open,3,3,100.1237885,switchscreen>validatestag
TestConfig3,5.0,2019/3/6 17:48:43,2019/3/6 17:50:23,demo_1,open,3,3,100.1237885,switchscreen>validatestag
TestConfig3,5.0,2019/3/6 17:48:43,2019/3/6 17:50:23,demo_1,open,3,3,100.1237885,switchscreen>validatestag
TestConfig3,5.0,2019/3/6 17:48:43,2019/3/6 17:50:23,demo_1,open,3,3,100.1237885,switchscreen>validatestag
TestConfig3,5.0,2019/3/6 17:48:43,2019/3/6 17:50:23,demo_1,open,3,3,100.1237885,switchscreen>validatestag

注意:这个日志是批量生成的,每一次客户端压测当前目录都会生成一个 *.log 的文件。数据采集的时候需要为对应列指定列名。

2、Telegraf 配置

配置 Telegraf.conf

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
bash复制代码[[inputs.logparser]]
## Log files to parse.
## These accept standard unix glob matching rules, but with the addition of
## ** as a "super asterisk". ie:
## /var/log/**.log -> recursively find all .log files in /var/log
## /var/log/*/*.log -> find all .log files with a parent dir in /var/log
## /var/log/apache.log -> only tail the apache log file
files = ["C:\\Release\\TestConfigLog\\*.log"]

## Read files that currently exist from the beginning. Files that are created
## while telegraf is running (and that match the "files" globs) will always
## be read from the beginning.
from_beginning = false

## Method used to watch for file updates. Can be either "inotify" or "poll".
watch_method = "poll"

## Parse logstash-style "grok" patterns:
## Telegraf built-in parsing patterns: https://goo.gl/dkay10
[inputs.logparser.grok]
## This is a list of patterns to check the given log file(s) for.
## Note that adding patterns here increases processing time. The most
## efficient configuration is to have one pattern per logparser.
## Other common built-in patterns are:
## %{COMMON_LOG_FORMAT} (plain apache & nginx access logs)
## %{COMBINED_LOG_FORMAT} (access logs + referrer & agent)
patterns = ['%{WORD:scene},%{NUMBER:version:float},%{TS_WIN:begtime},%{TS_WIN:endtime},%{WORD:canvasName},%{WORD:canvasCase},%{NUMBER:totaltimes:int},%{NUMBER:current:int},%{NUMBER:time_consuming:float}']

## Name of the outputted measurement name.
measurement = "bigscreen"

## Full path(s) to custom pattern files.
## custom_pattern_files = []

## Custom patterns can also be defined here. Put one pattern per line.
custom_patterns = 'TS_WIN %{YEAR}/%{MONTHNUM}/%{MONTHDAY}[T ]%{HOUR}:?%{MINUTE}(?::?%{SECOND})?'

## Timezone allows you to provide an override for timestamps that
## don't already include an offset
## e.g. 04/06/2016 12:41:45 data one two 5.43µs
##
## Default: "" which renders UTC
## Options are as follows:
## 1. Local -- interpret based on machine localtime
## 2. "Canada/Eastern" -- Unix TZ values like those found in https://en.wikipedia.org/wiki/List_of_tz_database_time_zones
## 3. UTC -- or blank/unspecified, will return timestamp in UTC
timezone = "Local"

注意:

  • files = [" *.log"],解决了当前目录多文件对象匹配的需求
  • watch_method = "poll",设置轮训获取文件更新
  • custom_patterns,自定义一个时间格式化模式匹配

InfluxDB 生成的指标数据如下:

1
2
3
4
5
6
7
8
9
sql复制代码> select * from bigscreen limit 5
name: bigscreen
time begtime canvasCase canvasName current endtime host path scene time_consuming totaltimes version
---- ------- ---------- ---------- ------- ------- ---- ---- ----- -------------- ---------- -------
1552296231630588200 2019/3/6 17:48:43 open demo_1 3 2019/3/6 17:50:23 DESKTOP-MLD0KTS C:\Users\htsd\Desktop\VBI5\Release\TestConfigLog\1.log TestConfig3 100.1237885 3 5
1552296231630588201 2019/3/6 17:48:43 open demo_1 3 2019/3/6 17:50:23 DESKTOP-MLD0KTS C:\Users\htsd\Desktop\VBI5\Release\TestConfigLog\1.log TestConfig3 100.1237885 3 5
1552296231630588202 2019/3/6 17:48:43 open demo_1 3 2019/3/6 17:50:23 DESKTOP-MLD0KTS C:\Users\htsd\Desktop\VBI5\Release\TestConfigLog\1.log TestConfig3 100.1237885 3 5
1552296231631587700 2019/3/6 17:48:43 open demo_1 3 2019/3/6 17:50:23 DESKTOP-MLD0KTS C:\Users\htsd\Desktop\VBI5\Release\TestConfigLog\1.log TestConfig3 100.1237885 3 5
1552297570005076300 2019/3/6 17:48:23 open demo_1 1 2019/3/6 17:48:30 DESKTOP-MLD0KTS C:\Users\htsd\Desktop\VBI5\Release\TestConfigLog\12.log TestConfig1 6.8270219 3 5

列名都是我们自定义的。

四、Grafana设置

整体的考虑是使用一个表格进行数据展示,支持按个别字段筛选。

在这里插入图片描述

在这里插入图片描述

设置筛选变量,满足字段过滤筛选要求:

在这里插入图片描述

创建Dashboard,并选择表格组件:

在这里插入图片描述

定义数据源:
在这里插入图片描述

设置表格字段样式,对时间字段进行格式化

在这里插入图片描述

对响应时间字段进行不同级别高亮设置(绿,黄,红三个颜色)

在这里插入图片描述

实际的动态效果如下:

在这里插入图片描述

五、小结

本文通过一个简单的示例展示了 Telegraf+InfluxDB+Grafana 如何对结构化日志进行实时监控,当然也支持非结构化日志采集,大家有兴趣的话也可以自己动手实践。

相关资料:

  • github.com/zuozewei/bl…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何快速过滤出一次请求的所有日志?

发表于 2021-11-24

大家好,我是不才陈某~

出现故障时,我们经常需要获取一次请求流程里的所有日志进行定位。

如果请求只在一个线程里处理,则我们可以通过线程ID来过滤日志,但如果请求包含异步线程的处理,那么光靠线程ID就显得捉襟见肘了。

华为IoT平台,提供了接收设备上报数据的能力, 当数据到达平台后,平台会进行一些复杂的业务逻辑处理,如数据存储,规则引擎,数据推送,命令下发等等。由于这个逻辑之间没有强耦合的关系,所以通常是异步处理。如何将一次数据上报请求中包含的所有业务日志快速过滤出来,就是本文要介绍的。

1、正文

SLF4J日志框架提供了一个MDC(Mapped Diagnostic Contexts)工具类,谷歌翻译为映射的诊断上下文 ,从字面上很难理解,我们可以先实战一把。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码public class Main {

private static final String KEY = "requestId";
private static final Logger logger = LoggerFactory.getLogger(Main.class);

public static void main(String[] args) {

// 入口传入请求ID
MDC.put(KEY, UUID.randomUUID().toString());

// 打印日志
logger.debug("log in main thread 1");
logger.debug("log in main thread 2");
logger.debug("log in main thread 3");

// 出口移除请求ID
MDC.remove(KEY);

}

}

我们在main函数的入口调用MDC.put()方法传入请求ID,在出口调用MDC.remove()方法移除请求ID。配置好log4j2.xml 文件后,运行main函数,可以在控制台看到以下日志输出:

1
2
3
java复制代码2018-02-17 13:19:52.606 {requestId=f97ea0fb-2a43-40f4-a3e8-711f776857d0} [main] DEBUG cn.wudashan.Main - log in main thread 1
2018-02-17 13:19:52.609 {requestId=f97ea0fb-2a43-40f4-a3e8-711f776857d0} [main] DEBUG cn.wudashan.Main - log in main thread 2
2018-02-17 13:19:52.609 {requestId=f97ea0fb-2a43-40f4-a3e8-711f776857d0} [main] DEBUG cn.wudashan.Main - log in main thread 3

从日志中可以明显地看到花括号中包含了 (映射的)请求ID(requestId),这其实就是我们定位(诊断)问题的关键字(上下文) 。有了MDC工具,只要在接口或切面植入put()和remove()代码,在现网定位问题时,我们就可以通过grep requestId=xxx *.log快速的过滤出某次请求的所有日志。

2、进阶

然而,MDC工具真的有我们所想的这么方便吗?回到我们开头,一次请求可能涉及多线程异步处理,那么在多线程异步的场景下,它是否还能正常运作呢?Talk is cheap, show me the code。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class Main {

private static final String KEY = "requestId";
private static final Logger logger = LoggerFactory.getLogger(Main.class);

public static void main(String[] args) {

// 入口传入请求ID
MDC.put(KEY, UUID.randomUUID().toString());

// 主线程打印<font style="color: #1e6bb8;word-wrap: break-word;font-weight: bold;border-bottom: 1px solid">日志</font>
logger.debug("log in main thread");

// 异步线程打印<font style="color: #1e6bb8;word-wrap: break-word;font-weight: bold;border-bottom: 1px solid">日志</font>
new Thread(new Runnable() {
@Override
public void run() {
logger.debug("log in other thread");
}
}).start();

// 出口移除请求ID
MDC.remove(KEY);

}

}

代码里我们新起了一个异步线程,并在匿名对象Runnable的run()方法打印日志。运行main函数,可以在控制台看到以下日志输出:

1
2
java复制代码2018-02-17 14:05:43.487 {requestId=e6099c85-72be-4986-8a28-de6bb2e52b01} [main] DEBUG cn.wudashan.Main - log in main thread
2018-02-17 14:05:43.490 {} [Thread-1] DEBUG cn.wudashan.Main - log in other thread

不幸的是,请求ID在异步线程里不打印了。这是怎么回事呢?

要解决这个问题,我们就得知道MDC的实现原理。由于篇幅有限,这里就暂不详细介绍,MDC之所以在异步线程中不生效是因为底层采用ThreadLocal 作为数据结构,我们调用MDC.put()方法传入的请求ID只在当前线程有效。

感兴趣的小伙伴可以自己深入一下代码细节。

知道了原理那么解决这个问题就轻而易举了,我们可以使用装饰器模式 ,新写一个MDCRunnable类对Runnable接口进行一层装饰。

在创建MDCRunnable类时保存当前线程的MDC值,在执行run()方法时再将保存的MDC值拷贝到异步线程中去。

代码实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public class MDCRunnable implements Runnable {

private final Runnable runnable;

private final Map<String, String> map;

public MDCRunnable(Runnable runnable) {
this.runnable = runnable;
// 保存当前线程的MDC值
this.map = MDC.getCopyOfContextMap();
}

@Override
public void run() {
// 传入已保存的MDC值
for (Map.Entry<String, String> entry : map.entrySet()) {
MDC.put(entry.getKey(), entry.getValue());
}
// 装饰器模式,执行run方法
runnable.run();
// 移除已保存的MDC值
for (Map.Entry<String, String> entry : map.entrySet()) {
MDC.remove(entry.getKey());
}
}

}

接着,我们需要对main函数里创建的Runnable实现类进行装饰:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码public class Main {

private static final String KEY = "requestId";
private static final Logger logger = LoggerFactory.getLogger(Main.class);
private static final ExecutorService EXECUTOR = Executors.newSingleThreadExecutor();

public static void main(String[] args) {

// 入口传入请求ID
MDC.put(KEY, UUID.randomUUID().toString());

// 主线程打印<font style="color: #1e6bb8;word-wrap: break-word;font-weight: bold;border-bottom: 1px solid">日志</font>
logger.debug("log in main thread");

// 异步线程打印<font style="color: #1e6bb8;word-wrap: break-word;font-weight: bold;border-bottom: 1px solid">日志</font>,用MDCRunnable装饰Runnable
new Thread(new MDCRunnable(new Runnable() {
@Override
public void run() {
logger.debug("log in other thread");
}
})).start();

// 异步线程池打印日志,用MDCRunnable装饰Runnable
EXECUTOR.execute(new MDCRunnable(new Runnable() {
@Override
public void run() {
logger.debug("log in other thread pool");
}
}));
EXECUTOR.shutdown();

// 出口移除请求ID
MDC.remove(KEY);

}

}

执行main函数,将会输出以下日志:

1
2
3
java复制代码2018-03-04 23:44:05.343 {requestId=5ee2a117-e090-41d8-977b-cef5dea09d34} [main] DEBUG cn.wudashan.Main - log in main thread
2018-03-04 23:44:05.346 {requestId=5ee2a117-e090-41d8-977b-cef5dea09d34} [Thread-1] DEBUG cn.wudashan.Main - log in other thread
2018-03-04 23:44:05.347 {requestId=5ee2a117-e090-41d8-977b-cef5dea09d34} [pool-2-thread-1] DEBUG cn.wudashan.Main - log in other thread pool

Congratulations! 经过我们的努力,最终在异步线程和线程池中都有requestId打印了!

3、总结

本文讲述了如何使用MDC工具来快速过滤一次请求的所有日志,并通过装饰器模式使得MDC工具在异步线程里也能生效。

有了MDC,再通过AOP技术对所有的切面植入requestId,就可以将整个系统的任意流程的日志过滤出来。

使用MDC工具,在开发自测阶段,可以极大地节省定位问题的时间,提升开发效率;在运维维护阶段,可以快速地收集相关日志信息,加快分析速度。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

leetcode 1606 Find Servers Th

发表于 2021-11-24

「这是我参与11月更文挑战的第24天,活动详情查看:2021最后一次更文挑战」

描述

You have k servers numbered from 0 to k-1 that are being used to handle multiple requests simultaneously. Each server has infinite computational capacity but cannot handle more than one request at a time. The requests are assigned to servers according to a specific algorithm:

  • The ith (0-indexed) request arrives.
  • If all servers are busy, the request is dropped (not handled at all).
  • If the (i % k)th server is available, assign the request to that server.
  • Otherwise, assign the request to the next available server (wrapping around the list of servers and starting from 0 if necessary). For example, if the ith server is busy, try to assign the request to the (i+1)th server, then the (i+2)th server, and so on.

You are given a strictly increasing array arrival of positive integers, where arrival[i] represents the arrival time of the ith request, and another array load, where load[i] represents the load of the ith request (the time it takes to complete). Your goal is to find the busiest server(s). A server is considered busiest if it handled the most number of requests successfully among all the servers.

Return a list containing the IDs (0-indexed) of the busiest server(s). You may return the IDs in any order.

Example 1:

1
2
3
4
5
6
7
8
vbscript复制代码Input: k = 3, arrival = [1,2,3,4,5], load = [5,2,3,3,3] 
Output: [1]
Explanation:
All of the servers start out available.
The first 3 requests are handled by the first 3 servers in order.
Request 3 comes in. Server 0 is busy, so it's assigned to the next available server, which is 1.
Request 4 comes in. It cannot be handled since all servers are busy, so it is dropped.
Servers 0 and 2 handled one request each, while server 1 handled two requests. Hence server 1 is the busiest server.

Example 2:

1
2
3
4
5
6
vbscript复制代码Input: k = 3, arrival = [1,2,3,4], load = [1,2,1,2]
Output: [0]
Explanation:
The first 3 requests are handled by first 3 servers.
Request 3 comes in. It is handled by server 0 since the server is available.
Server 0 handled two requests, while servers 1 and 2 handled one request each. Hence server 0 is the busiest server.

Example 3:

1
2
3
vbnet复制代码Input: k = 3, arrival = [1,2,3], load = [10,12,11]
Output: [0,1,2]
Explanation: Each server handles a single request, so they are all considered the busiest.

Example 4:

1
2
ini复制代码Input: k = 3, arrival = [1,2,3,4,8,9,10], load = [5,2,10,3,1,2,2]
Output: [1]

Example 5:

1
2
ini复制代码Input: k = 1, arrival = [1], load = [1]
Output: [0]

Note:

1
2
3
4
5
ini复制代码1 <= k <= 10^5
1 <= arrival.length, load.length <= 10^5
arrival.length == load.length
1 <= arrival[i], load[i] <= 10^9
arrival is strictly increasing.

解析

根据题意,有 k 个服务器,编号从 0 到 k-1 ,用于同时处理多个请求。每个服务器都有无限的计算能力,但一次不能处理多个请求。根据特定算法将请求分配给服务器:

  • 第 i 个( 从 0 开始索引)请求到达
  • 如果所有服务器都忙,则请求将被丢弃(根本不处理)
  • 如果第 (i % k) 个服务器可用,则将请求分配给该服务器
  • 否则,将请求分配给下一个可用的服务器(环绕服务器列表并在必要时从 0 开始),例如,如果第 i 个服务器繁忙,则尝试将请求分配给第 (i+1) 个服务器,然后是第 (i+2) 个服务器,依此类推。

给定一个严格递增的正整数数组 arrival ,其中 arrival[i] 表示第 i 个请求的到达时间,以及另一个数组 load ,其中 load[i] 表示第 i 个请求的负载(完成所需的时间)。目标是找到最繁忙的服务器。如果服务器在所有服务器中成功处理的请求数量最多,则该服务器被认为是最繁忙的。返回包含最繁忙服务器的 ID 的列表。可以按任何顺序返回 ID。

题目很繁杂,但是理解之后也比较简单,解法的主要思想是维护两个列表,一个是空闲服务器列表 free ,另一个是正在执行请求的服务器列表 buzy ,每次得到一个新的请求的时候,如果 free 为空,丢弃该请求即可,如果不为空从第 (i % k) 个服务器中向后找空闲服务器,如果没有则从第 0 个开始找空闲服务器。

有两个关键的地方需要注意,一个是初始化 free 和 buzy 的时候选用能自动排序的类,另一个是在 free 中找空闲服务器时最好有内置函数,如果没有则用二分法进行找,如果这两个步骤处理不好很容易超时,我的代码也是刚好过,耗时还是很严重。

解答

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
python复制代码from sortedcontainers import SortedList
class Solution(object):
def busiestServers(self, k, arrival, load):
"""
:type k: int
:type arrival: List[int]
:type load: List[int]
:rtype: List[int]
"""
free = SortedList([i for i in range(k)])
buzy = SortedList([],key=lambda x:-x[1])
count = {i:0 for i in range(k)}
for i,start in enumerate(arrival):
while(buzy and buzy[-1][1]<=start):
pair = buzy.pop()
free.add(pair[0])
if not free: continue
id = self.findServer(free, i%k)
count[id] += 1
free.remove(id)
buzy.add([id, start+load[i]])
result = []
MAX = max(count.values())
for k,v in count.items():
if v == MAX:
result.append(k)
return result
def findServer(self, free, id):
idx = bisect.bisect_right(free, id-1)
if idx!=len(free):
return free[idx]
return free[0]

运行结果

1
2
erlang复制代码Runtime: 5120 ms, faster than 10.00% of Python online submissions for Find Servers That Handled Most Number of Requests.
Memory Usage: 38.6 MB, less than 50.00% of Python online submissions for Find Servers That Handled Most Number of Requests.

原题链接:leetcode.com/problems/fi…

您的支持是我最大的动力

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring的BeanUtils的copyPropertie

发表于 2021-11-24

这是我参与11月更文挑战的第24天,活动详情查看:2021最后一次更文挑战

背景
最近项目中在和第三方进行联调一个接口,我们这边发送http请求给对方,然后接收对方的回应,代码都是老代码。根据注释,对方的SDK中写好的Request类有一个无法序列化的bug,所以这边重新写了一个Request类,基本属性都是相同的,但是重点是有一个属性是静态内部类,还有两个是list属性,类似于下面这样:

1
2
3
java复制代码private List<Order> orders;
private AddRequest.Ticket ticket;
private List<Payment> payments;

AddRequest就是我们自己重写的请求类,他们SDK中的请求类是MixAddRequest,我们组装好请求参数后利用Spring的BeanUtils的copyProperties方法将AddRequest中的属性拷贝到MixAddRequest,然后发送请求。到此为止,照理说一切完美

结果请求失败,纳尼?对方说缺少一个必要的字段,参数校验不通过,一查字段名称,是Ticket这个类里面的某个字段,赶紧看代码,心里充满对老代码的自信,想着一定是哪里搞错了,或者是他们那边偷偷动了代码,把字段从可选改为了必选,嘿嘿

果然在代码里找到了设置的地方,这下应该是他们的问题确信无疑了,再开一把调试,准备宣判他们的死刑。结果发现发给他们的请求就是没有这个字段。。。中间只有一个Spring的copy属性的方法,当时觉得很诡异

由于中间只有这么一行代码,玄机肯定在这里面,初步怀疑是两个静态内部类不同导致,所以自己写Demo,准备搞一把这个BeanUtils的copyProperties方法,写了两个类和一个Main,@Data和@ToString是lombok插件的注解,这里用来自动生成getter和setter方法以及toString方法
原始类:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@ToString
@Data
public class CopyTest1 {
public String outerName;
public CopyTest1.InnerClass innerClass;
public List<CopyTest1.InnerClass> clazz;

@ToString
@Data
public static class InnerClass {
public String InnerName;
}
}

目标类:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@ToString
@Data
public class CopyTest2 {
public String outerName;
public CopyTest2.InnerClass innerClass;
public List<CopyTest2.InnerClass> clazz;

@ToString
@Data
public static class InnerClass {
public String InnerName;
}
}

测试代码:

1
2
3
4
5
6
7
8
9
10
11
java复制代码        CopyTest1 test1 = new CopyTest1();
test1.outerName = "hahaha";
CopyTest1.InnerClass innerClass = new CopyTest1.InnerClass();
innerClass.InnerName = "hohoho";
test1.innerClass = innerClass;

System.out.println(test1.toString());
CopyTest2 test2 = new CopyTest2();
BeanUtils.copyProperties(test1, test2);

System.out.println(test2.toString());

这里遇到了第一个坑,一开始图省事,属性写为public,想着省掉了getter和setter方法,没加@Data注解,结果运行完test2所有属性都为null,一个都没copy过去,加上@Data继续跑,果然,基本属性(String)复制过去了,但是内部类在test2中还是null。那就验证了真的是内部类的问题,有点不敢相信自己的眼睛,毕竟线上跑了这么久的代码。。。

知道了问题,总要想着怎么解决吧,所以需要单独设置一下内部类,单独copy,如果内部类的bean属性较多或者递归的bean属性很多,那可以自己封装一个方法,用于递归拷贝,我这里只有一层,所以直接额外copy一次

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码        CopyTest1 test1 = new CopyTest1();
test1.outerName = "hahaha";
CopyTest1.InnerClass innerClass = new CopyTest1.InnerClass();
innerClass.InnerName = "hohoho";
test1.innerClass = innerClass;

System.out.println(test1.toString());
CopyTest2 test2 = new CopyTest2();
test2.innerClass = new CopyTest2.InnerClass();
BeanUtils.copyProperties(test1, test2);
BeanUtils.copyProperties(test1.innerClass, test2.innerClass);

System.out.println(test2.toString());

记得内部类的属性也是要有setter方法的,不然也会导致copy失败,大家还记得我开头说到还有两个List属性的吧,为什么要提到这个呢?你猜

其实list里面的两个类也都是重写的内部类,他们也是不同的,当时他们却顺利copy过去了,为什么呢?因为java的泛型只在编译期起作用,在运行期,list属性就是一个存放Object的集合,在copy后,MixAddRequest的orders属性其实是一个Order类的集合,但却不是自己内部类的集合,是AddRequest的内部类Order的集合,但因为对方是解析json的,所以没有发生错误。。。

总结

  1. Spring的BeanUtils的CopyProperties方法需要对应的属性有getter和setter方法;
  2. 如果存在属性完全相同的内部类,但是不是同一个内部类,即分别属于各自的内部类,则spring会认为属性不同,不会copy;
  3. 泛型只在编译期起作用,不能依靠泛型来做运行期的限制;
  4. 最后,spring和apache的copy属性的方法源和目的参数的位置正好相反,所以导包和调用的时候都要注意一下。

最后的最后

附上spring的源码,getWriteMethod是jdk的方法,会去取set开头的方法,所以没有setter方法是不行滴。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码private static void copyProperties(Object source, Object target, @Nullable Class<?> editable, @Nullable String... ignoreProperties) throws BeansException {
Assert.notNull(source, "Source must not be null");
Assert.notNull(target, "Target must not be null");
Class<?> actualEditable = target.getClass();
if (editable != null) {
if (!editable.isInstance(target)) {
throw new IllegalArgumentException("Target class [" + target.getClass().getName() + "] not assignable to Editable class [" + editable.getName() + "]");
}

actualEditable = editable;
}

PropertyDescriptor[] targetPds = getPropertyDescriptors(actualEditable);
List<String> ignoreList = ignoreProperties != null ? Arrays.asList(ignoreProperties) : null;
PropertyDescriptor[] var7 = targetPds;
int var8 = targetPds.length;

for(int var9 = 0; var9 < var8; ++var9) {
PropertyDescriptor targetPd = var7[var9];
Method writeMethod = targetPd.getWriteMethod();
if (writeMethod != null && (ignoreList == null || !ignoreList.contains(targetPd.getName()))) {
PropertyDescriptor sourcePd = getPropertyDescriptor(source.getClass(), targetPd.getName());
if (sourcePd != null) {
Method readMethod = sourcePd.getReadMethod();
if (readMethod != null && ClassUtils.isAssignable(writeMethod.getParameterTypes()[0], readMethod.getReturnType())) {
try {
if (!Modifier.isPublic(readMethod.getDeclaringClass().getModifiers())) {
readMethod.setAccessible(true);
}

Object value = readMethod.invoke(source);
if (!Modifier.isPublic(writeMethod.getDeclaringClass().getModifiers())) {
writeMethod.setAccessible(true);
}

writeMethod.invoke(target, value);
} catch (Throwable var15) {
throw new FatalBeanException("Could not copy property '" + targetPd.getName() + "' from source to target", var15);
}
}
}
}
}

}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringBoot 实战:通过 BeanPostProce

发表于 2021-11-24

「这是我参与11月更文挑战的第23天,活动详情查看:2021最后一次更文挑战」

本文被《Spring Boot 实战》专栏收录。

你好,我是看山。

在分布式系统中,我们会需要 ID 生成器的组件,这个组件可以实现帮助我们生成顺序的或者带业务含义的 ID。

目前有很多经典的 ID 生成方式,比如数据库自增列(自增主键或序列)、Snowflake 算法、美团 Leaf 算法等等,所以,会有一些公司级或者业务级的 ID 生成器组件的诞生。本文就是通过 BeanPostProcessor 实现动态注入 ID 生成器的实战。

在 Spring 中,实现注入的方式很多,比如 springboot 的 starter,在自定义的 Configuration 中初始化 ID 生成器的 Bean,业务代码中通过@AutoWired或者@Resource注入即可,开箱即用。这种方式简单直接,但是缺点也是过于简单,缺少了使用方自定义的入口。

考虑一下实际场景,在同一个业务单据中,要保持 ID 的唯一,但是在不同单据中,可以重复。而且,这些算法在生成 ID 的时候,为了保持多线程返回结果唯一,都会锁定共享资源。如果不同业务,并发情景不同,可能低并发的业务被高并发的业务阻塞获取 ID,造成一些性能的损失。所以,我们要考虑将 ID 生成器,根据业务隔离开,这样 springboot 的 starter 就会显得不够灵活了。

实现

根据上面的需求,我们可以分几步实现我们的逻辑:

  1. 自定义属性注解,用于判断是否需要注入属性对象
  2. 定义 ID 生成器接口、实现类,以及工厂类,工厂类是为了根据定义创建不同的 ID 生成器实现对象
  3. 定义 BeanPostProcessor,查找使用自定义注解定义的属性,实现注入

自定义注解

首先自定义一个注解,可以定义一个value属性,作为隔离业务的标识:

1
2
3
4
5
6
7
8
9
10
java复制代码@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.FIELD, ElementType.METHOD})
public @interface IdGeneratorClient {
    /**
     * ID 生成器名称
     *
     * @return
     */
    String value() default "DEFAULT";
}

定义 ID 生成器

定义 ID 生成器的接口:

1
2
3
4
5
java复制代码public interface IdGenerator {
    String groupName();

    long nextId();
}

实现 ID 生成器接口,偷懒使用AtomicLong实现自增,同时考虑 ID 生成器是分组的,通过ConcurrentHashMap实现 ID 生成器的持有:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码class DefaultIdGenerator implements IdGenerator {
    private static final Map<String, AtomicLong> ID_CACHE = new ConcurrentHashMap<>(new HashMap<>());
    private final String groupName;

    DefaultIdGenerator(final String groupName) {
        this.groupName = groupName;
        synchronized (ID_CACHE) {
            ID_CACHE.computeIfAbsent(groupName, key -> new AtomicLong(1));
        }
    }

    @Override
    public String groupName() {
        return this.groupName;
    }

    @Override
    public long nextId() {
        return ID_CACHE.get(this.groupName).getAndIncrement();
    }
}

如前面设计的,我们需要一个工厂类来创建 ID 生成器,示例中使用最简单的实现,我们真正使用的时候,还可以通过更加灵活的 SPI 实现(关于 SPI 的实现,这里挖个坑,后面专门写一篇填坑):

1
2
3
4
5
6
7
8
9
java复制代码public enum IdGeneratorFactory {
    INSTANCE;

    private static final Map<String, IdGenerator> ID_GENERATOR_MAP = new ConcurrentHashMap<>(new HashMap<>());

    public synchronized IdGenerator create(final String groupName) {
        return ID_GENERATOR_MAP.computeIfAbsent(groupName, key -> new DefaultIdGenerator(groupName));
    }
}

定义 BeanPostProcessor

前面都是属于基本操作,这里才是扩展的核心。我们的实现逻辑是:

  1. 扫描 bean 的所有属性,然后找到定义了IdGeneratorClient注解的属性
  2. 获取注解的value值,作为 ID 生成器的分组标识
  3. 使用IdGeneratorFactory这个工厂类生成 ID 生成器实例,这里会返回新建的或已经定义的实例
  4. 通过反射将 ID 生成器实例写入 bean
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码public class IdGeneratorBeanPostProcessor implements BeanPostProcessor {

    @Override
    public Object postProcessBeforeInitialization(final Object bean, final String beanName) throws BeansException {
        return bean;
    }

    @Override
    public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
        parseFields(bean);
        return bean;
    }

    private void parseFields(final Object bean) {
        if (bean == null) {
            return;
        }
        Class<?> clazz = bean.getClass();
        parseFields(bean, clazz);

        while (clazz.getSuperclass() != null && !clazz.getSuperclass().equals(Object.class)) {
            clazz = clazz.getSuperclass();
            parseFields(bean, clazz);
        }
    }

    private void parseFields(final Object bean, Class<?> clazz) {
        if (bean == null || clazz == null) {
            return;
        }

        for (final Field field : clazz.getDeclaredFields()) {
            try {
                final IdGeneratorClient annotation = AnnotationUtils.getAnnotation(field, IdGeneratorClient.class);
                if (annotation == null) {
                    continue;
                }

                final String groupName = annotation.value();

                final Class<?> fieldType = field.getType();
                if (fieldType.equals(IdGenerator.class)) {
                    final IdGenerator idGenerator = IdGeneratorFactory.INSTANCE.create(groupName);
                    invokeSetField(bean, field, idGenerator);
                    continue;
                }

                throw new RuntimeException("未知字段类型无法初始化,bean: " + bean + ",field: " + field);
            } catch (Throwable t) {
                throw new RuntimeException("初始化字段失败,bean=" + bean + ",field=" + field, t);
            }
        }
    }

    private void invokeSetField(final Object bean, final Field field, final Object param) {
        ReflectionUtils.makeAccessible(field);
        ReflectionUtils.setField(field, bean, param);
    }
}

实现BeanPostProcessor接口需要完成postProcessBeforeInitialization和postProcessAfterInitialization两个方法的定义。下图是 Spring 中 Bean 的实例化过程:

图片

Spring 中 Bean 的实例化过程图示

从图中可以知道,Spring 调用BeanPostProcessor的这两个方法时,bean 已经被实例化,所有能注入的属性都已经被注入了,是一个完整的 bean。而且两个方法的返回值,可以是原来的 bean 实例,也可以是包装后的实例,这就要看我们的定义了。

测试我们的代码

写一个测试用例,验证我们的实现是否生效:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码@SpringBootTest
class SpringBeanPostProcessorApplicationTests {
    @IdGeneratorClient
    private IdGenerator defaultIdGenerator;
    @IdGeneratorClient("group1")
    private IdGenerator group1IdGenerator;

    @Test
    void contextLoads() {
        Assert.notNull(defaultIdGenerator, "注入失败");
        System.out.println(defaultIdGenerator.groupName() + " => " + defaultIdGenerator.nextId());

        Assert.notNull(group1IdGenerator, "注入失败");
        for (int i = 0; i < 5; i++) {
            System.out.println(defaultIdGenerator.groupName() + " => " + defaultIdGenerator.nextId());
            System.out.println(group1IdGenerator.groupName() + " => " + group1IdGenerator.nextId());
        }
    }

}

运行结果为:

1
2
3
4
5
6
7
8
9
10
11
ini复制代码DEFAULT => 1
DEFAULT => 2
group1 => 1
DEFAULT => 3
group1 => 2
DEFAULT => 4
group1 => 3
DEFAULT => 5
group1 => 4
DEFAULT => 6
group1 => 5

可以看到,默认的 ID 生成器与定义名称为 group1 的 ID 生成器是分别生成的,符合预期。

文末思考

我们实现了通过BeanPostProcessor实现自动注入自定义的业务对象,上面的实现还比较简单,有很多可以扩展的地方,比如工厂方法实现,可以借助 SPI 的方式更加灵活的创建 ID 生成器对象。同时,考虑到分布式场景,我们还可以在 ID 生成器实现类中,通过注入 rpc 实例,实现远程 ID 生成逻辑。

玩法无限,就看我们的想象了。可以关注公众号「看山的小屋」,回复“spring”获取源码。

推荐阅读

  • SpringBoot 实战:一招实现结果的优雅响应
  • SpringBoot 实战:如何优雅的处理异常
  • SpringBoot 实战:通过 BeanPostProcessor 动态注入 ID 生成器
  • SpringBoot 实战:自定义 Filter 优雅获取请求参数和响应结果
  • SpringBoot 实战:优雅的使用枚举参数
  • SpringBoot 实战:优雅的使用枚举参数(原理篇)
  • SpringBoot 实战:在 RequestBody 中优雅的使用枚举参数
  • SpringBoot 实战:在 RequestBody 中优雅的使用枚举参数(原理篇)
  • SpringBoot 实战:JUnit5+MockMvc+Mockito 做好单元测试
  • SpringBoot 实战:加载和读取资源文件内容

你好,我是看山。游于码界,戏享人生。如果文章对您有帮助,请点赞、收藏、关注。欢迎关注公众号「看山的小屋」,发现不一样的世界。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Cloud Gateway修改请求和响应bod

发表于 2021-11-24

欢迎访问我的GitHub

github.com/zq2599/blog…

内容:所有原创文章分类汇总及配套源码,涉及Java、Docker、Kubernetes、DevOPS等;

本篇概览

  • 作为《Spring Cloud Gateway实战》系列的第九篇,咱们聊聊如何用Spring Cloud Gateway修改原始请求和响应内容,以及修改过程中遇到的问题
  • 首先是修改请求body,如下图,浏览器是请求发起方,真实参数只有user-id,经过网关时被塞入字段user-name,于是,后台服务收到的请求就带有user-name字段了

在这里插入图片描述

  • 其次是修改响应,如下图,服务提供方provider-hello的原始响应只有response-tag字段,经过网关时被塞入了gateway-response-tag字段,最终浏览器收到的响应就是response-tag和gateway-response-tag两个字段:

在这里插入图片描述

  • 总的来说,今天要做具体事情如下:
  1. 准备工作:在服务提供者的代码中新增一个web接口,用于验证Gateway的操作是否有效
  2. 介绍修改请求body和响应body的套路
  3. 按套路开发一个过滤器(filter),用于修改请求的body
  4. 按套路开发一个过滤器(filter),用于修改响应的body
  5. 思考和尝试:如何从Gateway返回错误?
  • 在实战过程中,咱们顺便搞清楚两个问题:
  1. 代码配置路由时,如何给一个路由添加多个filter?
  2. 代码配置路由和yml配置是否可以混搭,两者有冲突吗?

源码下载

  • 本篇实战中的完整源码可在GitHub下载到,地址和链接信息如下表所示(github.com/zq2599/blog…%EF%BC%9A)
名称 链接 备注
项目主页 github.com/zq2599/blog… 该项目在GitHub上的主页
git仓库地址(https) github.com/zq2599/blog… 该项目源码的仓库地址,https协议
git仓库地址(ssh) git@github.com:zq2599/blog_demos.git 该项目源码的仓库地址,ssh协议
  • 这个git项目中有多个文件夹,本篇的源码在spring-cloud-tutorials文件夹下,如下图红框所示:

在这里插入图片描述

  • spring-cloud-tutorials文件夹下有多个子工程,本篇的代码是gateway-change-body,如下图红框所示:

在这里插入图片描述

准备工作

  • 为了观察Gateway能否按预期去修改请求和响应的body,咱们给服务提供者provider-hello增加一个接口,代码在Hello.java中,如下:
1
2
3
4
5
java复制代码    @PostMapping("/change")
public Map<String, Object> change(@RequestBody Map<String, Object> map) {
map.put("response-tag", dateStr());
return map;
}
  • 可见新增的web接口很简单:将收到的请求数据作为返回值,在里面添加了一个键值对,然后返回给请求方,有了这个接口,咱们就能通过观察返回值来判断Gateway对请求和响应的操作是否生效
  • 来试一下,先启动nacos(provider-hello需要的)
  • 再运行provider-hello应用,用Postman向其发请求试试,如下图,符合预期:

在这里插入图片描述

  • 准备工作已完成,开始开发吧

修改请求body的套路

  • 如何用Spring Cloud Gateway修改请求的body?来看看其中的套路:
  1. 修改请求body是通过自定义filter实现的
  2. 配置路由及其filter的时候,有yml配置文件和代码配置两种方式可以配置路由,官方文档给出的demo是代码配置的,因此今天咱们也参考官方做法,通过代码来配置路由和过滤器
  3. 在代码配置路由的时候,调用filters方法,该方法的入参是个lambda表达式
  4. 此lambda表达式固定调用modifyRequestBody方法,咱们只要定义好modifyRequestBody方法的三个入参即可
  5. modifyRequestBody方法的第一个入参是输入类型
  6. 第二个入参是返回类型
  7. 第三个是RewriteFunction接口的实现,这个代码需要您自己写,内容是将输入数据转换为返回类型数据具体逻辑,咱们来看官方Demo,也就是上述套路了:
1
2
3
4
5
6
7
8
9
java复制代码@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
return builder.routes()
.route("rewrite_request_obj", r -> r.host("*.rewriterequestobj.org")
.filters(f -> f.prefixPath("/httpbin")
.modifyRequestBody(String.class, Hello.class, MediaType.APPLICATION_JSON_VALUE,
(exchange, s) -> return Mono.just(new Hello(s.toUpperCase())))).uri(uri))
.build();
}

修改响应body的套路

  • 用Spring Cloud Gateway修改响应body的套路和前面的请求body如出一辙
  1. 通过代码来配置路由和过滤器
  2. 在代码配置路由的时候,调用filters方法,该方法的入参是个lambda表达式
  3. 此lambda表达式固定调用modifyResponseBody方法,咱们只要定义好modifyResponseBody方法的三个入参即可
  4. modifyRequestBody方法的第一个入参是输入类型
  5. 第二个入参是返回类型
  6. 第三个是RewriteFunction接口的实现,这个代码要您自己写,内容是将输入数据转换为返回类型数据具体逻辑,咱们来看官方Demo,其实就是上述套路:
1
2
3
4
5
6
7
8
9
java复制代码@Bean
public RouteLocator routes(RouteLocatorBuilder builder) {
return builder.routes()
.route("rewrite_response_upper", r -> r.host("*.rewriteresponseupper.org")
.filters(f -> f.prefixPath("/httpbin")
.modifyResponseBody(String.class, String.class,
(exchange, s) -> Mono.just(s.toUpperCase()))).uri(uri))
.build();
}
  • 套路总结出来了,接下来,咱们一起撸代码?

按套路开发一个修改请求body的过滤器(filter)

  • 废话不说,在父工程spring-cloud-tutorials下新建子工程gateway-change-body,pom.xml无任何特殊之处,注意依赖spring-cloud-starter-gateway即可
  • 启动类毫无新意:
1
2
3
4
5
6
7
8
9
10
11
java复制代码package com.bolingcavalry.changebody;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class ChangeBodyApplication {
public static void main(String[] args) {
SpringApplication.run(ChangeBodyApplication.class,args);
}
}
  • 配置文件千篇一律:
1
2
3
4
5
6
yml复制代码server:
#服务端口
port: 8081
spring:
application:
name: gateway-change-body
  • 然后是核心逻辑:修改请求body的代码,既RewriteFunction的实现类,代码很简单,将原始的请求body解析成Map对象,取出user-id字段,生成user-name字段放回map,apply方法返回的是个Mono:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码package com.bolingcavalry.changebody.function;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.factory.rewrite.RewriteFunction;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.Map;


@Slf4j
public class RequestBodyRewrite implements RewriteFunction<String, String> {

private ObjectMapper objectMapper;

public RequestBodyRewrite(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

/**
* 根据用户ID获取用户名称的方法,可以按实际情况来内部实现,例如查库或缓存,或者远程调用
* @param userId
* @return
*/
private String mockUserName(int userId) {
return "user-" + userId;
}

@Override
public Publisher<String> apply(ServerWebExchange exchange, String body) {
try {
Map<String, Object> map = objectMapper.readValue(body, Map.class);

// 取得id
int userId = (Integer)map.get("user-id");

// 得到nanme后写入map
map.put("user-name", mockUserName(userId));

// 添加一个key/value
map.put("gateway-request-tag", userId + "-" + System.currentTimeMillis());

return Mono.just(objectMapper.writeValueAsString(map));
} catch (Exception ex) {
log.error("1. json process fail", ex);
// json操作出现异常时的处理
return Mono.error(new Exception("1. json process fail", ex));
}
}
}
  • 然后是按部就班的基于代码实现路由配置,重点是lambda表达式执行modifyRequestBody方法,并且将RequestBodyRewrite作为参数传入:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码package com.bolingcavalry.changebody.config;

import com.bolingcavalry.changebody.function.RequestBodyRewrite;
import com.bolingcavalry.changebody.function.ResponseBodyRewrite;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import reactor.core.publisher.Mono;

@Configuration
public class FilterConfig {
@Bean
public RouteLocator routes(RouteLocatorBuilder builder, ObjectMapper objectMapper) {
return builder
.routes()
.route("path_route_change",
r -> r.path("/hello/change")
.filters(f -> f
.modifyRequestBody(String.class,String.class,new RequestBodyRewrite(objectMapper))
)
.uri("http://127.0.0.1:8082"))
.build();
}
}
  • 代码写完了,运行工程gateway-change-body,在postman发起请求,得到响应如下图,红框中可见Gateway添加的内容已成功:

在这里插入图片描述

  • 现在修改请求body已经成功,接下来再来修改服务提供者响应的body

修改响应body

  • 接下来开发修改响应body的代码
  • 新增RewriteFunction接口的实现类ResponseBodyRewrite.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
java复制代码package com.bolingcavalry.changebody.function;

import com.fasterxml.jackson.databind.ObjectMapper;
import lombok.extern.slf4j.Slf4j;
import org.reactivestreams.Publisher;
import org.springframework.cloud.gateway.filter.factory.rewrite.RewriteFunction;
import org.springframework.web.server.ServerWebExchange;
import reactor.core.publisher.Mono;
import java.util.Map;

@Slf4j
public class ResponseBodyRewrite implements RewriteFunction<String, String> {

private ObjectMapper objectMapper;

public ResponseBodyRewrite(ObjectMapper objectMapper) {
this.objectMapper = objectMapper;
}

@Override
public Publisher<String> apply(ServerWebExchange exchange, String body) {
try {
Map<String, Object> map = objectMapper.readValue(body, Map.class);

// 取得id
int userId = (Integer)map.get("user-id");

// 添加一个key/value
map.put("gateway-response-tag", userId + "-" + System.currentTimeMillis());

return Mono.just(objectMapper.writeValueAsString(map));
} catch (Exception ex) {
log.error("2. json process fail", ex);
return Mono.error(new Exception("2. json process fail", ex));
}
}
}
  • 路由配置代码中,lambda表达式里面,filters方法内部调用modifyResponseBody,第三个入参是ResponseBodyRewrite:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码package com.bolingcavalry.changebody.config;

import com.bolingcavalry.changebody.function.RequestBodyRewrite;
import com.bolingcavalry.changebody.function.ResponseBodyRewrite;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.cloud.gateway.route.RouteLocator;
import org.springframework.cloud.gateway.route.builder.RouteLocatorBuilder;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.http.MediaType;
import reactor.core.publisher.Mono;

@Configuration
public class FilterConfig {

@Bean
public RouteLocator routes(RouteLocatorBuilder builder, ObjectMapper objectMapper) {
return builder
.routes()
.route("path_route_change",
r -> r.path("/hello/change")
.filters(f -> f
.modifyRequestBody(String.class,String.class,new RequestBodyRewrite(objectMapper))
.modifyResponseBody(String.class, String.class, new ResponseBodyRewrite(objectMapper))
)
.uri("http://127.0.0.1:8082"))
.build();
}
}
  • 还记得咱们的第一个问题吗?通过上面的代码,您应该已经看到了答案:用代码配置路由时,多个过滤器的配置方法就是在filters方法中反复调用内置的过滤器相关API,下图红框中的都可以:

在这里插入图片描述

  • 运行服务,用Postman验证效果,如下图红框,Gateway在响应body中成功添加了一个key&value:

在这里插入图片描述

代码配置路由和yml配置是否可以混搭?

  • 前面有两个问题,接下来回答第二个,咱们在application.yml中增加一个路由配置:
1
2
3
4
5
6
7
8
9
10
11
12
13
yml复制代码server:
#服务端口
port: 8081
spring:
application:
name: gateway-change-body
cloud:
gateway:
routes:
- id: path_route_str
uri: http://127.0.0.1:8082
predicates:
- Path=/hello/str
  • 把gateway-change-body服务启动起来,此时已经有了两个路由配置,一个在代码中,一个在yml中,先试试yml中的这个,如下图没问题:

在这里插入图片描述

  • 再试试代码配置的路由,如下图,结论是代码配置路由和yml配置可以混搭

在这里插入图片描述

如何处理异常

  • 还有个问题必须要面对:修改请求或者响应body的过程中,如果发现问题需要提前返回错误(例如必要的字段不存在),代码该怎么写?
  • 咱们修改请求body的代码集中在RequestBodyRewrite.java,增加下图红框内容:

在这里插入图片描述

  • 再来试试,这次请求参数中不包含user-id,收到Gateway返回的错误信息如下图:

在这里插入图片描述

  • 看看控制台,能看到代码中抛出的异常信息:

在这里插入图片描述

  • 此时,聪明的您应该发现问题所在了:咱们想告诉客户端具体的错误,但实际上客户端收到的是被Gateway框架处理后的内容
  • 篇幅所限,上述问题从分析到解决的过程,就留给下一篇文章吧
  • 本篇的最后,请容许欣宸唠叨两句,聊聊为何要网关来修改请求和响应body的内容,如果您没兴趣还请忽略

网关(Gateway)为什么要做这些?

  • 看过开篇的两个图,聪明的您一定发现了问题:为什么要破坏原始数据,一旦系统出了问题如何定位是服务提供方还是网关?
  • 按照欣宸之前的经验,尽管网关会破坏原始数据,但只做一些简单固定的处理,一般以添加数据为主,网关不了解业务,最常见的就是鉴权、添加身份或标签等操作
  • 前面的图中确实感受不到网关的作用,但如果网关后面有多个服务提供者,如下图,这时候诸如鉴权、获取账号信息等操作由网关统一完成,比每个后台分别实现一次更有效率,后台可以更加专注于自身业务:

在这里插入图片描述

  • 经验丰富的您可能会对我的狡辩不屑一顾:网关统一鉴权、获取身份,一般会把身份信息放入请求的header中,也不会修改请求和响应的内容啊,欣宸前面的一堆解释还是没说清楚为啥要在网关位置修改请求和响应的内容!
  • 好吧,面对聪明的您,我摊牌了:本篇只是从技术上演示Spring Cloud Gateway如何修改请求和响应内容,请不要将此技术与实际后台业务耦合;

你不孤单,欣宸原创一路相伴

  1. Java系列
  2. Spring系列
  3. Docker系列
  4. kubernetes系列
  5. 数据库+中间件系列
  6. DevOps系列

欢迎关注公众号:程序员欣宸

微信搜索「程序员欣宸」,我是欣宸,期待与您一同畅游Java世界…
github.com/zq2599/blog…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

自己动手在阿里云部署 K8S 集群 买机器 第一步 第二步

发表于 2021-11-24

一步一步实现搭建 K8S 集群。

买机器

两台机器,内存不小于2G,CPU不少于2核。
建议购买按使用量计费的机器,一小时不到一块钱的成本。
我是在阿里云上购买的两台张家口的机器,操作系统为 CentOS 7.9,关闭了安全加固,具体配置如下:

image.png

购买成功之后,需要知道两台机器的内网 IP,假设分别为 IP 1.1.1.1 和 IP 2.2.2.2,我们将前者看成 k8s-master,后者看成 k8s-worker。

第一步

创建 /path/to/step_1_on_master.sh,内容如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
ini复制代码hostnamectl set-hostname k8s-master
cat >> /etc/hosts << EOF
1.1.1.1 k8s-master ---------------------------注意改 IP 变成你的 k8s-master 的 IP
2.2.2.2 k8s-worker ---------------------------注意改 IP 变成你的 k8s-worker 的 IP
EOF
swapoff -a
sed -i '/ swap / s/^\(.*\)$/#\1/g' /etc/fstab
cat > /etc/sysctl.d/k8s.conf << EOF
net.bridge.bridge-nf-call-ip6tables = 1
net.bridge.bridge-nf-call-iptables = 1
EOF
yum install -y ntpdate
wget https://mirrors.aliyun.com/docker-ce/linux/centos/docker-ce.repo -O /etc/yum.repos.d/docker-ce.repo
yum -y install docker-ce-18.06.1.ce-3.el7
cat > /etc/yum.repos.d/kubernetes.repo << EOF
[kubernetes]
name=Kubernetes
baseurl=https://mirrors.aliyun.com/kubernetes/yum/repos/kubernetes-el7-x86_64
enabled=1
gpgcheck=0
repo_gpgcheck=0
gpgkey=https://mirrors.aliyun.com/kubernetes/yum/doc/yum-key.gpg https://mirrors.aliyun.com/kubernetes/yum/doc/rpm-package-key.gpg
EOF
yum install -y kubelet kubeadm kubectl --disableexcludes=kubernetes

使用 ssh root@k8s-master-public-ip 'bash -s' < step_1_on_master.sh 将此脚本放到机器上执行,执行完了之后,重启机器。

然后复制一份,改名为 step_1_on_worker.sh,只需要把第一行改成

1
arduino复制代码hostnamectl set-hostname k8s-worker

即可,一样执行,一样重启。

两台机器都执行了脚本,并重启之后,开始第二步。

第二步

请先用 docker info |grep -i cgroup 确认 docker 的 cgroup driver 是 cgroupfs 还是 systemd,下文中的 cgroup-driver 要与之保持一致,假设为 cgroupfs。

在 k8s-master 上启动集群:(你可以用 ssh 远程执行,而且一定要把 1.1.1.1 改成你的 IP)

1
2
3
4
5
6
7
8
9
10
11
12
css复制代码systemctl stop firewalld
systemctl disable firewalld
sysctl --system
ntpdate time.windows.com
systemctl enable docker
systemctl start docker

echo "KUBELET_EXTRA_ARGS=--cgroup-driver=cgroupfs" > /etc/sysconfig/kubelet

systemctl enable kubelet

kubeadm init --apiserver-advertise-address=1.1.1.1 --image-repository registry.aliyuncs.com/google_containers --kubernetes-version v1.22.4 --service-cidr=10.1.0.0/16 --pod-network-cidr=10.244.0.0/16

如果没有报错,你会看到如下提示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
bash复制代码Your Kubernetes control-plane has initialized successfully!

To start using your cluster, you need to run the following as a regular user:

mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config

Alternatively, if you are the root user, you can run:

export KUBECONFIG=/etc/kubernetes/admin.conf

You should now deploy a pod network to the cluster.
Run "kubectl apply -f [podnetwork].yaml" with one of the options listed at:


Then you can join any number of worker nodes by running the following on each as root:

kubeadm join 1.1.1.1:6443 --token v2djav.rfpat4j8g1uf7uoy \
--discovery-token-ca-cert-hash sha256:65d640b3798fde3c1e9ef8f1abbf26c01b512d5d28947a6c7bc921e3dcb8f88b

复制提示中的代码执行,我执行的代码如下:

1
2
3
4
5
6
7
8
bash复制代码mkdir -p $HOME/.kube
sudo cp -i /etc/kubernetes/admin.conf $HOME/.kube/config
sudo chown $(id -u):$(id -g) $HOME/.kube/config


# Alternatively, if you are the root user, you can run:
# 下面一行可选,不清楚就不要执行
export KUBECONFIG=/etc/kubernetes/admin.conf

然后点开提示中的网址:kubernetes.io/docs/concep… 。
选择一个网络插件进行安装,我选择点开其中的 Flannel 链接,刚好它的 README.md 中有一句

For Kubernetes v1.17+ kubectl apply -f https://raw.githubusercontent.com/coreos/flannel/master/Documentation/kube-flannel.yml

我就复制执行了,执行结果如下:

1
2
3
4
5
6
7
bash复制代码Warning: policy/v1beta1 PodSecurityPolicy is deprecated in v1.21+, unavailable in v1.25+
podsecuritypolicy.policy/psp.flannel.unprivileged created
clusterrole.rbac.authorization.k8s.io/flannel created
clusterrolebinding.rbac.authorization.k8s.io/flannel created
serviceaccount/flannel created
configmap/kube-flannel-cfg created
daemonset.apps/kube-flannel-ds created

说明成功了!

提示中最后一部分有很关键的代码:

1
2
sql复制代码kubeadm join 1.1.1.1:6443 --token v2djav.rfpat4j8g1uf7uoy \
--discovery-token-ca-cert-hash sha256:65d640b3798fde3c1e9ef8f1abbf26c01b512d5d28947a6c7bc921e3dcb8f88b

我将这段代码叫做「join代码」,会在第三步中用到

接下来是第三步。

第三步

在 k8s-worker 上执行以下代码:

1
2
3
4
5
6
7
8
9
10
bash复制代码systemctl stop firewalld
systemctl disable firewalld
sysctl --system
ntpdate time.windows.com
systemctl enable docker
systemctl start docker

echo "KUBELET_EXTRA_ARGS=--cgroup-driver=cgroupfs" > /etc/sysconfig/kubelet

systemctl enable kubelet

然后执行「join代码」,会看到如下提示:

1
2
3
4
5
vbscript复制代码This node has joined the cluster:
* Certificate signing request was sent to apiserver and a response was received.
* The Kubelet was informed of the new secure connection details.

Run 'kubectl get nodes' on the control-plane to see this node join the cluster.

这说明 k8s-worker 已经加入了 k8s-master 创建的集群!

第四步

来到 k8s-master,运行 kubectl get nodes 查看节点:

1
2
3
css复制代码NAME         STATUS   ROLES                  AGE   VERSION
k8s-master Ready control-plane,master 19m v1.22.4
k8s-worker Ready <none> 12m v1.22.4

最终,两个机器都会处于 Ready 状态才对。继续执行 kubectl get pod -n kube-system,结果如下:

1
2
3
4
5
6
7
8
9
10
11
sql复制代码NAME                                 READY   STATUS    RESTARTS   AGE
coredns-7f6cbbb7b8-4hzvm 1/1 Running 0 20m
coredns-7f6cbbb7b8-hh5rm 1/1 Running 0 20m
etcd-k8s-master 1/1 Running 2 20m
kube-apiserver-k8s-master 1/1 Running 2 20m
kube-controller-manager-k8s-master 1/1 Running 2 20m
kube-flannel-ds-bz29s 1/1 Running 0 13m
kube-flannel-ds-ddk84 1/1 Running 0 16m
kube-proxy-sxcrt 1/1 Running 0 20m
kube-proxy-x5fvc 1/1 Running 0 13m
kube-scheduler-k8s-master 1/1 Running 2 20m

所有 pod 的 Ready 字段都是 1/1 才对。

第五步

我们尝试往机器中放入一个 Nginx:

1
2
3
ini复制代码docker pull nginx
kubectl create deployment nginx --image=nginx
kubectl expose deployment nginx --port=80 --type=NodePort

然后使用 kubectl get pods,svc 查看 service/nginx 的状态:

1
2
3
4
5
6
scss复制代码NAME                         READY   STATUS    RESTARTS   AGE
pod/nginx-6799fc88d8-56852 1/1 Running 0 7m24s

NAME TYPE CLUSTER-IP EXTERNAL-IP PORT(S) AGE
service/kubernetes ClusterIP 10.1.0.1 <none> 443/TCP 30m
service/nginx NodePort 10.1.254.34 <none> 80:30999/TCP 7m15s

其中 80:30999 表示 Nginx 的 80 端口被映射到 Node 的 30999 端口(你的端口应该跟我不同),我们可以使用 NodeIP:30999 来访问这个服务,还记得 1.1.1.1 和 2.2.2.2 这两个内网 IP 吗,试试访问看看:

1
2
arduino复制代码curl http://k8s-master:30999
curl http://k8s-worker:30999

用 curl 请求两个 IP ,都会得到 Nginx 的欢迎页面:

1
2
3
4
5
6
7
8
9
10
11
12
13
xml复制代码<body>
<h1>Welcome to nginx!</h1>
<p>If you see this page, the nginx web server is successfully installed and
working. Further configuration is required.</p>

<p>For online documentation and support please refer to
<a href="http://nginx.org/">nginx.org</a>.<br/>
Commercial support is available at
<a href="http://nginx.com/">nginx.com</a>.</p>

<p><em>Thank you for using nginx.</em></p>
</body>
</html>

那么,能不能用公网 IP 访问这个页面呢?可以,前提是要在阿里云/腾讯云的安全策略里把 30999 端口加入白名单才行,比如我就做到了:

image.png

结语

今天,我们学会了使用两台机器创建 K8S 集群,并提供 Nginx 服务。

赶紧写一篇博客记录一下吧,别忘了删掉阿里云的按使用量计费的服务器。


参考文章:

  • 使用kubeadm快速部署一套K8S集群
  • kubeadm安装k8s完整教程

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Mysql事务实现及锁的关系&实践分析一条sql加锁流程

发表于 2021-11-24

本篇文章主要简单描述一下Mysql事务的实现方式,MVCC机制,以及分析在不同事务隔离级别下,一条sql会加什么样的锁,如表锁,行锁,共享锁,排他锁等

一、事务的四大特性(ACID)

1
2
3
4
css复制代码1.原子性(Atomicity)原子性是指事务包含的所有操作要么全部成功,要么全部失败回滚
2.一致性(Consistency)一致性是指事务必须使数据库从一个一致性状态变换到另一个一致性状态
3.隔离性(Isolation)多个并发事务之间要相互隔离。
4.持久性(Durability)持久性是指一个事务一旦被提交了,那么对数据库中的数据的改变就是永久性的

二、事务的隔离级别

SQL标准定义了以下几种事务隔离级别

1
2
3
4
5
6
7
复制代码READ_UNCOMMITTED 读未提交:最低级别,一个事务可以读取另一个未提交事务的数据。幻想读、不可重复读和脏读都允许。

READ_COMMITTED 读已提交:一个事务要等另一个事务提交后才能读取数据。允许幻想读、不可重复读,不允许脏读。

REPEATABLE_READ 可重复读:在开始读取数据(事务开启)时,不再允许修改操作。允许幻想读,不允许不可重复读和脏读。

SERIALIZABLE 可串行化:最高级别,在该级别下,事务串行化顺序执行。幻想读、不可重复读和脏读都不允许。

事务不同隔离级别引发的问题

1
2
3
复制代码1  脏读,脏读是指在一个事务处理过程里读取了另一个未提交的事务中的数据。
2 不可重复读,不可重复读是指在对于数据库中的某个数据,一个事务范围内多次查询却返回了不同的数据值,这是由于在查询间隔,被另一个事务修改并提交了。
3 幻读,幻读和不可重复读都是读取了另一条已经提交的事务(这点就脏读不同),所不同的是不可重复读查询的都是同一个数据项,而幻读针对的是一批数据整体。

事务隔离级别.jpg

MySQL数据库中,默认的隔离级别为Repeatable read (可重复读);

三、事务的实现方式

事务的实现可通过无锁的读和加锁两种方式。

首先要了解一下MVCC机制:

MVCC (Multiversion Concurrency Control),即多版本并发控制技术,它使得大部分支持行锁的事务引擎,不再单纯的使用行锁来进行数据库的并发控制,取而代之的是把数据库的行锁与行的多个版本结合起来,只需要很小的开销,就可以实现非锁定读,从而大大提高数据库系统的并发性能

在InnoDB中,会在每行数据后添加两个额外的隐藏的值来实现MVCC,这两个值一个记录这行数据何时被创建,另外一个记录这行数据何时过期(或者被删除)。 在实际操作中,存储的并不是时间,而是事务的版本号,每开启一个新事务,事务的版本号就会递增

行记录.jpg

我们来看看不同操作时MVCC机制的体现:

Insert:InnoDB在MVCC机制中,会在创建时间隐藏列更新为当前的事务ID,删除时间列是undefined

Select:MVCC中只会返回创建时间小于或等于当前事务ID且删除时间要么是undefined要么大于当前事务ID的记录,两个条件都满足的记录才是正确的

Delete:MVCC会在删除时间列更新当前事务ID,然后真正删除是由MySQL后台运行的独立线程去定时清理的

Update:MVCC中的Update被拆分成了Insert和Delete操作,显示插入一个更新后的记录(主键可能重复),然后标记原始记录,等待其被删除

关于MySQL对于MVCC的实现想了解可以参考这篇文章,这里不做过多的赘述

通过MVCC机制,虽然让数据变得可重复读,但我们读到的数据可能是历史数据,是不及时的数据,不是数据库当前的数据!这在一些对于数据的时效特别敏感的业务中,就很可能出问题。

对于这种读取历史数据的方式,我们叫它快照读 (snapshot read),而读取数据库当前版本数据的方式,叫当前读 (current read)

快照读和当前读

  • 快照读:简单的Select操作,不加锁(在隔离级别为序列化时,MySQL会取消快照读,所有的读操作也会加读锁),也不会被其他加锁阻塞
1
2
3
4
5
6
sql复制代码例:select * from table where ?;

其中:
Read Committed(读已提交)隔离级别:每次select都生成一个快照读。
Read Repeatable(可重复读)隔离级别:开启事务后第一个select语句才是快照读的地方,
而不是一开启事务就快照读,之后每次读取为同一个快照。
  • 当前读:特殊的读操作,Insert、Update、Delete操作,处理的都是当前的数据,需要加锁
1
2
3
4
5
6
sql复制代码例:
select * from table where ? lock in share mode;(共享读锁)
select * from table where ? for update(排他锁);
insert into table values (…)(插入意向锁);
update table set ? where ?(排他锁);
delete from table where ?(排他锁);

为什么将插入/更新/删除操作,都归为当前读?我们看下一个Update操作的具体流程。

当Update SQL被发给MySQL后,MySQL Server会根据where条件,读取第一条满足条件的记录,然后InnoDB引擎会将第一条记录返回,并加锁 (共享锁)。待MySQL Server收到这条加锁的记录之后,会再发起一个Update请求,申请升级为排他锁,更新这条记录。一条记录操作完成,再读取下一条记录,直至没有满足条件的记录为止。因此,Update操作内部,就包含了一个当前读。同理,Delete操作也一样。Insert操作会稍微有些不同,简单来说,就是Insert操作可能会触发Unique Key的冲突检查(插入意向锁),也会进行一个当前读。

四、MySQL中的锁

MySQL 锁分成两类:锁类型(lock_type)和锁模式(lock_mode),锁模式和锁类型通常结合使用

1.锁类型

描述的锁的粒度,也可以说是把锁具体加在什么地方,如行锁,表锁,间隙锁;

表锁: 操作对象是数据表。Mysql大多数锁策略都支持(常见mysql innodb),是系统开销最低但并发性最低的一个锁策略。事务对整个表加读锁,则其他事务可读不可写,若加写锁,则其他事务增删改都不行。

行级锁: 操作对象是数据表中的一行。是MVCC技术用的比较多的,但在MYISAM用不了,行级锁用mysql的储存引擎实现而不是mysql服务器。但行级锁对系统开销较大,处理高并发较好。

其中行锁细分为间隙锁(锁住两条记录的间隙,用于防止幻读),记录锁(普通的锁一条记录),Next-Key锁(间隙锁和记录锁的结合),插入意向GAP锁(插入时的锁,只与间隙锁冲突)

InnoDB存储引擎支持行锁和表锁,MyISAM只支持表锁

2.锁模式

描述的是到底加的是什么锁,譬如读锁或写锁。

读锁: 也叫共享锁、S锁,若事务T对数据对象A加上读锁,则事务T可以读A但不能修改A(修改需尝试升级为写锁),其他事务只能再对A加读锁,而不能加写锁,直到事务T释放A上的读锁。这保证了其他事务可以读A,但在T释放A上的S锁之前不能对A做任何修改。

写锁: 又称排他锁、X锁。若事务T对数据对象A加上写锁,事务T可以读A也可以修改A,其他事务不能再对A加任何锁(共享锁和排他锁都不行,快照读可以),直到事务T释放A上的锁。这保证了其他事务在T释放A上的锁之前不能再修改A。

各类锁之间的互斥关系:

  1. 行锁与表锁冲突
  2. 行锁与同一记录行锁冲突
  3. 行锁与间隙锁不冲突
  4. 间隙锁只与插入意向间隙锁冲突
  5. 间隙锁与间隙锁不冲突
  6. 共享锁与共享锁不冲突
  7. 共享锁与排他锁冲突
  8. 排他锁与排他锁冲突

这里是锁冲突指的是一个事务获取一种锁,另一个事务尝试获取锁时的冲突,同一个事务内部的锁不会冲突

五、实践分析任一条sql加锁类型

首先MyISAM存储引擎不支持事务,且只支持表锁,故一般用于主从集群中的从库(读库),此文不做分析,InnoDB存储引擎支持事务,且支持表锁,行锁,间隙锁,意向锁等

在InnoDB存储引擎下, RC(读已提交)隔离级别不支持间隙锁,RR(可重复读)隔离级别支持间隙锁,Next-key 锁,我们这里分析RR隔离级别,同样的SQL在RC隔离级别只需去除间隙锁即可。

前提:所有的锁都是在关闭自动提交事务并设置手动提交事务的情况下(@Transactional),没有手动事务每次更新是独立一个事务,几乎不会发生冲突

在具体分析之前,先说一下结论:在MySQL中,InnoDB存储引擎下,RR(可重复读)隔离级别情况下,加锁种类最多,也是安装MySQL后默认的配置,故以此场景下分析,其次加锁的粒度需要找到对应的记录才能加行锁,故需要使用索引,在没有索引或有索引但是没有生效的情况下会锁住整张表(即表锁),引发线上灾难,严禁UPDATE或DELETE数据时WHERE条件中未使用索引。

关于查看一条sql是否使用索引可以使用Explain关键字查看,使用教程参考链接

场景一:select * from table where id = 1;

不加锁,简单的查询语句是快照读,自身不加锁,也不会被其他加锁的事务阻塞。

场景二:select * from table where id = 1 lock in share mode;id是主键

对id为1的这条记录加共享锁,其他事务可也加共享锁,不可加排他锁。

场景三:select * from table where id = 1 for update;id是主键

先对id为1的这条记录加共享锁,然后尝试升级为排他锁,若升级为排他锁时被其他事务加了共享锁则被阻塞,加锁成功后其他事务不可加共享锁,不可加排他锁。

场景四:update user set username = 2 where id = 1;ID是主键

与场景三相同,先加共享锁,后升级为排他锁,delete from 同理。

场景五:update user set username = 2 where name = 1;name不是主键,是唯一索引

先通过name=1在唯一索引上加锁,后找到name=1的主键 ID,再去聚簇索引(即主键索引)上加锁。

关于MySQL聚族索引,二级索引的区别可参考此文档

场景二到场景五中:若通过主键或唯一索引查找这条记录不存在时,则对此位置加间隙锁,此时插入对应条件的的数据会尝试加插入意向GAP锁会被阻塞,这防止了幻读的发生

场景六:update user set name = 2 where username = 5;username为普通索引且只有一条记录

先在普通索引上找到对应的记录,加排他锁,并在其索引附近间隙加GAP锁(间隙锁),然后找到对应的主键索引加排他锁,其中间隙锁的范围需实际分析,若数据库数据为,username: 2,3,5, 5, 8,9,则间隙锁范围为[3,8], 左闭右闭。

场景七:update user set name = 2 where username > 5;username为普通索引且扫描出多条记录

与场景六的区别在于,范围扫描,当范围扫描结果很多时,mysql索引优化器会放弃使用索引进行全表扫描,若数量较少会使用索引,需用Explain关键字实测,若使用了索引,则会扫描符合条件的记录,扫描到第一条后执行场景六的操作,之后继续扫描下一条记录执行相同操作直到扫描到不满足条件的记录停止加锁。

场景八:update user set name = 2 where username = 5;username没有索引

若username没有索引或执行计划中未使用索引,则进行全表扫描,加表锁以及全表间隙锁,其他事务不能更新数据,不能插入数据,不能删除数据,只能进行快照读(无锁操作),直至此事务执行完毕释放锁。

总结:本篇文章描述了Mysql事务基本概念,MVCC机制,各种类型锁,以及实战分析一条sql会加什么样的锁,可供使用者实际使用过程对自己写的代码有更深入的理解,并根据加锁的方式解决相应的问题,求点赞关注

版权声明:本文为博主原创文章,转载请附上原文出处链接

原文链接:juejin.cn/post/703379…

参考文章:
www.cnblogs.com/zhaoyl/p/41…

blog.csdn.net/Sugar_Rainb…

tech.meituan.com/2014/08/20/…

www.cnblogs.com/chenpingzha…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…213214215…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%