开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

基于 Flink 搭建实时平台

发表于 2021-11-24

「这是我参与11月更文挑战的第24天,活动详情查看:2021最后一次更文挑战」。

一、前言

在大数据时代,金融科技公司通常借助消费数据来综合评估用户的信用和还款能力。这个过程中,某些中介机构会搜集大量的号并进行“养号”工作,即在一年周期里让这些号形成正常的消费、通讯记录,目的是将这些号“培养”得非常健康,然后卖给有欺诈意向的用户。这类用户通过网上信息提交审核,骗到贷款后就“销声匿迹”了。

那么,如何更快速地预防或甄别可能的欺诈行为?如何从超大规模、高并发、多维度的数据中实现在线实时反欺诈?这些都是金融科技公司当下面临的主要难题。

针对这些问题,InfoQ 专访玖富集团,揭秘基于 Flink 的超大规模在线实时反欺诈技术是如何快速处理海量数据并打造良好的用户体验。

二、在线实时反欺诈的难点和痛点

有三类常见的金融欺诈场景:

  • 一是材料伪造。这是早年需要提交纸质材料时期常见的欺诈;
  • 二是“养号”,常见于中介机构,通过收取服务费来维护大量号的健康状态,卖给有欺诈意向的用户进行贷款申请;
  • 三是来自于专业黑客的威胁,他们通过寻找系统、流程的漏洞等方式,对账号安全构成攻击。

金融科技因其虚拟性特征,主要风险集中在两方面:一是欺诈风险,二是信用风险,因此核心的风险评估流程就是反欺诈和信用评估。对于反欺诈而言,信息核实、高危人群拦截和实时计算、识别、决策是其核心风控手段。而对于信用风险的评估,需要内外兼修。

玖富集团对用户的信用评级主要由玖富集团自主研发的火眼评分 - 彩虹评级系统动态评估用户信用情况,覆盖玖富集团 C 端全线借贷服务,自上线以来表现稳定,区分效果明显。外部也参考了腾讯、阿里等评分作为参考。

目前,在线实时反欺诈会面临各类痛点,在玖富集团业务场景中,主要痛点集中在以下三方面:

  1. 低延时要求。越是大量数据需要计算,所需时间越长。在网贷盛行的年代,经常流传的一句口号是“三分钟授信,一分钟放款”,甚至有的公司打出“一分钟授信,半分钟放款”。但是在大数据场景中,数据分析与处理对低延时的需求越来越高。
  2. 超大规模实时计算要求。大数据场景中,需要对大规模数据做到实时计算,玖富集团内部代号为“伏羲”的 Flink 计算平台每天要在接近 510TB 的数据集上做快速的检索和计算,用户的行为改变会导致数据发生变化,进而影响决策。因此对超大规模数据的实时计算需求越来越高,确保用户在出现欺诈行为时能够及时中止交易。
  3. 多维度、高并发要求。随着同一业务场景下用户规模的扩增,用户产生的数据也形成爆发性增长。在金融场景下,亟需一套完整系统可以实现按照数据各个维度分析得出风险评估报告,根据用户特性挖掘用户潜在需求等;系统获取用户产生数据最简单有效的方法就是流水式数据,单个数据包里包含了发生时间点的各个维度的所有信息量,这种场景的特性之一就是数据高并发,因此对时效要求比较高的数据分析来说是一个非常巨大的挑战。

针对目前在线实时反欺诈的痛点,玖富集团采用基于 Flink 的超大规模在线实时反欺诈系统,在提升用户体验的同时,也降低了商业损失。

三、基于 Flink 的超大规模在线实时反欺诈系统

1、为什么选择 Flink?

Flink 开源项目是近一两年大数据处理领域冉冉升起的一颗新星。虽然是后起之秀,但在国内许多大型互联网企业的工程实践中均有被应用,如阿里、美团、京东等。那么,在玖富的大数据技术体系迭代中,为何会选用 Flink 这套流数据处理引擎呢?

从技术语言角度:Spark 的技术语言主要是 JAVA 和 Scala,尤其是对 Scala 语言有一定要求。而 Flink 主要是基于 JAVA,编程语言更成熟,通用度更高,修改代码也更容易。所以从语言层面综合来看,Flink 相对较好。Spark、Storm、Flink 技术选型对比如下:

图片

从时延和吞吐量的角度:Flink 是纯粹的流式设计,流式大数据技术的计算是逻辑先行,即先定义计算逻辑,当数据流过时,实时计算并保留计算结果;当需要使用数据时,直接调用计算结果即可,无需再次计算。

流式大数据技术可广泛应用于对数据处理时效性要求较高的场景,如实时交易反欺诈。Flink 的时延和吞吐量方面的性能表现较好,能满足玖富集团对超大规模数据流在线实时计算的要求。

相比之下,Spark 主要是小批量处理模式,无法满足反欺诈系统实时处理大规模、多维度、高并发的数据流的要求。Storm 虽然是基于流处理,但与 Flink 的性能数据相比,Flink 吞吐量约为 Storm 的 3~5 倍,Flink 在满吞吐时的延迟约为 Storm 的一半。综合来看,Flink 框架本身性能优于 Storm。

从与现有生态体系结合的角度:Flink 与超大型计算和存储(HBase)的结合比 Spark 和 Storm 要好很多,同时接口也更友好。HBase 是整个系统预查功能的缓存基础,预查功能是降低系统 p99 延迟最重要的技术优化。

总的来说,Flink 是一个设计良好的框架,它不但功能强大,而且性能出色。此外它还有一些比较好的设计,比如的内存管理和流控。但是,由于 Flink 目前成熟度较低,还存在不少问题,比如 SQL 支持比较初级,无法像 Storm 一样在不停止任务的情况下动态调整资源;不能像 Spark 一样提供很好的 Streaming 和 Static Data 的交互操作等。

2、超大规模在线实时反欺诈系统架构

线上信贷的基本流程是:由用户通过 App 发起需求,App 会要求用户填写与授权相关的信息,主要目的是评估用户的信用额度。之后用户数据会进入后台数据系统进行反欺诈和信用的评估,审核通过,用户会收到信息,账户额度开通。基于 Flink 的超大规模在线实时反欺诈系统架构如下:

图片

对于该套在线实时反欺诈系统的未来规划,玖富第一步会针对 Flink 技术本身,结合玖富在技术、场景等方面的积累,把基于 Flink 的超大规模在线实时反欺诈系统打造成一款数据产品,使其具备向外输出数据资产和数据处理的能力。

其次,玖富技术团队也会持续投入人力在系统的功能优化上,并把它做成一个开源的产品推向社区,让更多开发人员可以直接使用这个系统。

最后,通过技术的优化进一步提升整个系统的性能,目前该系统的 p99 延迟是 100ms,未来玖富的下一项目标是实现 p99 延迟是 50ms。)

玖富基于 Flink 的超大规模在线实时反欺诈系统的架构分为两部分:数据部分和决策部分。整个系统的运作相当于一个工作流,用户的数据信息以流的形式由一个节点传到下一个节点,在流转过程中会产生大量的决策信息,根据条件做出筛选和判断,并把判断结果快速推行到下一个节点,从而实时判断用户的数据情况,进而决定是否放款给用户。

数据部分需要最快速度的加工处理,整个数据处理由四部分完成。

第一部分是把数据从前端最快速地传递到后端。基于 Flink 的超大规模在线实时反欺诈系统首先要把数据通路加宽,允许更多信息同时涌入数据处理中。

第二部分是大型的列式存储集群,主要由 HBase 实现。HBase 是运行在 Hadoop 上的 NoSQL 数据库,它是一个分布式和可扩展的大数据仓库,能够利用 HDFS 的分布式处理模式,并从 Hadoop 的 MapReduce 程序模型中获益,最关键的是可以提供高并发读写操作的支持。HBase 是整个架构最基础的保障,当大量数据涌入时能实现快速存储,降低写入和读取数据过程对系统架构的过度依赖。

HBase 里有大量的索引,如一级索引、二级索引等,对 HBase 的读写缓存进行定制化改造,保证预查功能的实现。通过 App 或其他渠道获取用户的行为数据信息,进而推测用户的意愿,然后系统开始做预查询,把用户的相关信息放到缓存里,这样当用户在前端触发操作时,后端直接从缓存里调用数据开展计算,极大地提升了数据处理速度。在 HBase 缓存里,基本能够实现 99% 的数据信息被命中,这依赖于系统强大的用户感知能力。

第三部分就是计算引擎,主要由 Flink 完成。计算引擎分为两部分,一个是过滤引擎,主要是在大规模、高并发数据流中对用户信息做不同维度的定制化过滤,目的是降低整个数据计算的量级。另一个是函数引擎,通过高度抽象的方法,定制化一些性能非常好的函数,并把这些函数加载到引擎中去,可以避免开发人员自行修改代码。过滤引擎和函数引擎的结合,使整个用户的数据量级大幅降低,再结合一些高效的代码,进一步降低延迟。

Flink 的核心是基于流执行引擎,Flink 提供了诸多更高抽象层的 API 以方便用户编写分布式任务,常用的三类 API 如下:

DataSet API,对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便的采用 Flink 提供的各种操作符对分布式数据集进行各种操作。

DataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便的采用 Flink 提供的各种操作符对分布式数据流进行各种操作。

Table API,对结构化数据进行查询操作,将结构化数据抽象成关系表,并通过 Flink 提供的类 SQL 的 DSL 对关系表进行各种查询操作。

玖富根据自身业务特点,需要对超大规模在线实时数据流进行快速处理,因此采用 DataStream API,追求更低的延迟。

第四部分是算力。算力依赖于 Hadoop 集群,通过 YARN 实现对整个资源的管理,横向来说具有很好的可扩展性。YARN 的基本思想是将资源管理和作业调度 / 监控的功能分解为单独的守护进程,包括两个部分,一个是全局的资源调度(RM),另一个是针对每个应用程序的调度(AM)。YARN 使得 Hadoop 不再局限于仅支持 MapReduce 一种计算模型,而是可无限融入多种计算框架,且对这些框架进行统一管理和调度。YARN 架构如下:

图片

3、系统架构迭代

基于 Flink 的超大规模在线实时反欺诈系统,在玖富集团内部经历过一次比较重大的架构迭代。玖富集团最初是以 1s 内快速得出风控结果为目标,但是用户体验不够快,于是整个系统进行了一次技术升级,增加了预查技术。预查技术包括检索和计算两部分,其核心依赖于 Flink 强大的并发能力。

在大量数据中做快速预查,利用 Flink 并发能力进行数据覆盖,最后在缓存里命中结果,从而不必重新进行网络 I/O 查询、等待返回的过程。经过部分计算框架升级,最终系统实现了 p99 延迟由 1s 降为 100ms 的优化。

4、AI 技术的应用

在大数据时代,数据的质量直接影响大数据分析处理方法的效果,也影响决策过程。通过分析海量数据,可以从中发现数据集中隐含的模式和规律。但异常数据会对分析过程产生重大干扰。在基于 Flink 的超大规模在线实时反欺诈系统中,利用机器学习进行异常点检测。异常点检测(又称离群点检测)是找出其行为不同于预期对象的一个检测过程。这些对象被称为异常点或者离群点。

在大数据中的异常数据存在如下特点:与正常数据的表现有明显的差异;其产生机制与正常数据不同,可能为未知方式;数据维度较高。异常点检测在信用卡欺诈检测中应用较多,当用户数量非常多时,其中一些低信用值的用户需要被识别出来,利用机器学习进行异常值检测,把信用值低的用户筛选出来,再进行人工确认。

在基于 Flink 的超大规模在线实时反欺诈系统中也应用了 AI 知识图谱技术。社会是由大大小小的群体组成,同理用户也有这样的群体特点,用数据来构建这些群体的关系,通过图的分割与检索这两大类算法深入挖掘数据价值。在实际应用中,如果一个用户的信用非常差,已经被列入黑名单,那么与他有关系的用户都需要重点排查。根据用户的行为将用户进行分类,即聚类。各式各样的聚类算法很多,然后根据用户的信息进行图的分割,确定每个人的风险系数,也可以通过一些手段打通优质圈层的通路,引导优质圈层进行信息交互。

四、基于 Flink 的超大规模在线实时反欺诈系统的未来规划

对于该套在线实时反欺诈系统的未来规划,玖富第一步会针对 Flink 技术本身,结合玖富在技术、场景等方面的积累,把基于 Flink 的超大规模在线实时反欺诈系统打造成一款数据产品,使其具备向外输出数据资产和数据处理的能力。

其次,玖富技术团队也会持续投入人力在系统的功能优化上,并把它做成一个开源的产品推向社区,让更多开发人员可以直接使用这个系统。

最后,通过技术的优化进一步提升整个系统的性能,目前该系统的 p99 延迟是 100ms,未来的下一项目标是实现 p99 延迟是 50ms。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

解决 Serverless 落地困难的关键,是给开发者足够的

发表于 2021-11-24

简介: 越来越多的云产品都会向全托管、Serverless 形态演进。当云的产品体系 Serverless 化达到一个临界值,通过函数计算这样的 Serverless 计算服务结合其他 Serverless 形态的云服务,能够完整的实现整个应用时,Serverless 就会变成了一个确定的技术趋势,并越来越流行。

作者 | 不瞋

刚刚过去的 2021 年天猫双 11,阿里云函数计算与阿里巴巴运维体系全面实现标准化对接,打通研发的最后一公里,首次实现了业务全链路“ FaaS + BaaS ”的 Serverless 体系化研发,覆盖淘特、淘系、阿里妈妈、1688、高德、飞猪等业务场景,支撑场景数量同比增加 2 倍,峰值流量总数同比增加 3 倍,实现了百万 QPS 的突破,人效提升 40%。

前段时间,我与 InfoQ 大咖说合作了一期直播,跟开发者们聊了聊我眼中的 Serverless。大家对于 Serverless 热情很高,但是顾虑仍然存在,这也是我写作本文的原因。作为这一技术浪潮的见证者,我想跟大家一起思考 Serverless 诞生的原因,阿里云 Serverless 技术和产品的演进历程,以及我对 Serverless 未来趋势的判断。

01 云产品体系的 Serverless 化推动技术演进

虽然 Serverless 对很多人来说,仍然比较新鲜,但其实 Serverless 这种形态早已有之。

2010 年我刚加入阿里云,参与飞天操作系统研发,飞天操作系统最初是通过管理数千台的机器来执行大数据处理的。用户的编程界面是 MapReduce 任务,通过 SQL 语句等来处理海量数据,这就是早期的 Serverless 形态。

阿里云的第一个云服务对象存储 OSS,亚马逊云科技的第一个云服务 S3,它们其实也都是 Serverless 形态的存储服务。用户不需要关心数据如何被分片存储到不同的服务器上来实现负载均衡,也不需要考虑如何做到在服务器宕机或者交换机故障时,保证数据的高可靠性和高可用性,他们只需要用简单的 API 就可以实现海量数据的可靠存储。他们都屏蔽了 Server 的复杂度,让用户有一个非常简洁的 Serverless 体验,这些都是 Serverless 形态。

2012 年,Serverless 概念被首次提出,到亚马逊云科技正式商用 Lambda,Serverless 开始流行并逐渐走红。近 10 年时间,这样的演进过程并不偶然、也非一蹴而就,反而是带着宿命般的必然性,其背后原因是云的产品体系一直都在向 Serverless 化演进。

无论是阿里云、Azure,还是亚马逊云科技,绝大多数新产品都是全托管的 Serverless 形态。时至今日,公有云的用户越来越习惯使用全托管的服务,除了省力以外,对很多用户来说,最重要的是能更高效的解决业务问题。如果全托管的服务能带来更好的性能、更好的稳定性、更少的运维代价,为什么不用呢?

按照这些逻辑,越来越多的云产品都会向全托管、Serverless 形态演进。当云的产品体系 Serverless 化达到一个临界值,通过函数计算这样的 Serverless 计算服务结合其他 Serverless 形态的云服务,能够完整的实现整个应用时,Serverless 就会变成了一个确定的技术趋势,并越来越流行。

02 Serverless 走出幻想破灭的低谷

2017 到 2018 年,我们都有体感 Serverless 热度达到了一个高峰,但和很多新兴技术一样,从概念大讨论到企业落地应用,都会经历幻想破灭的低谷。从 Serverless 这十年的发展来看,无论是学术界还是工业界,都认为这是一项颠覆式的技术,在提升研发效率、资源效率上有着巨大的潜力。但作为一个新概念和新的计算形态,Serverless 最主要的挑战是对开发者心智的改变,在工具链、编程模型、应用架构上,都需要开发者转换思路。

今天,这些问题正在被快速的、持续的解决。

Serverless 正处于稳步上升期,我们能看到业界最主要的云服务商在不断推出不同形态的 Serverless 计算服务,比如 Google Cloud Run,亚马逊云科技的 App Runner,阿里云的 Serverless 应用引擎 SAE。另外,阿里云的函数计算这类最经典的 Serverless 计算服务,也正变得越来越通用,对应用的侵入越来越少。

无论在阿里巴巴上还是在阿里云上,开发者对 Serverless 的认识越来越客观、务实,并在越来越多的场景中引入 Serverless 技术和相关的工具链,驱动 Serverless 生态愈加成熟。

03 给开发者安全感,是最重要的事

我们经历了一个从 Serverless 非常受关注到落地困难,再到 Serverless 被广泛使用的全过程。这个过程中也确实遇到了不少挑战,解决 Serverless 落地困难的关键,在于给开发者安全感。对开发者来说,Serverless 把更多的技术层面的东西交给了云厂商去做,所以怎么给他们安全感,让他们无负担使用是非常关键的,也是他们做技术选型时最关注的点。

开发者这种安全感的担忧主要来自于两方面:

  • 云厂商锁定问题:Serverless 让应用更深度的依赖于云服务商的能力,如何避免 vendor lock-in,从一个云迁移到另一个云,会有哪些障碍?
  • 控制黑盒问题:云厂商接管了应用的运行平台,怎么能提供给用户控制力?比如用户怎么能看到足够丰富的指标来优化应用或者掌控应用运行的情况?云平台出问题了怎么办?出现问题时,用户有什么手段能快速查明问题,恢复服务?

对于供应商锁定的担忧。阿里云是以公有云、阿里集团、开源三位一体的方式打造 Serverless 产品,坚定的拥抱开源开放。阿里云函数计算的 Runtime 运行时采用无侵入的标准的 http-server 协议,用户使用 Golang 或者 PHP 写的 Web server 放上来就可以跟 Serverless 平台去交互。

另外函数计算的可观测能力基于开源开放的 OpenTelemetry、OpenTracing 等标准。阿里云推出的 Serverless Devs 工具链也是开源开放的,提供了多个云厂商的 Serverless 应用部署的能力。承载阿里云事件生态的 EventBridge 也是采用 CNCF CloudEvents 开放标准。这些都是希望开发者能够通过开源开放的方式来使用产品,未来,我们会积极推进 Serverless 领域的标准。

对于控制黑盒问题,最主要的是要做好产品设计的平衡,既能给开发者控制力,又能减小开发者的复杂度。阿里云函数计算把给开发者安全感看作最重要的事情,我们在可观测性上是业界首个,也是目前唯一一个透出了实例级别的指标,让用户能更容易调优 Serverless 应用。我们透出了非常细粒度的资源计量数据,让用户能更容易判断费用是否符合预期。

在未来,我们会将系统事件和状态以合适的方式透出给开发者,让他们能更容易预期系统的行为。我们也会在问题诊断等方面开放更多的能力,去贴合开发者已有的开发习惯,让他们能更平滑的使用 Serverless。

04 正在全面落地的 Serverless

在应用场景上来看,Serverless 不再仅仅是小程序,还有电商大促、音视频转码、AI 算法服务、游戏应用包分发、文件实时处理、物联网数据处理、微服务等场景。Serverless 正持续与容器、微服务等生态融合,降低开发者使用 Serverless 技术的门槛,反过来也将促进传统应用的云原生化。

在企业赋能方面,尤其是疫情之后,能够看到用户对 Serverless 的认知变深,在很多场景下,切换到 Serverless 架构确实能够为用户带来明显的收益,用户逐渐认可这项技术。

1、Serverless 全链路、全场景覆盖天猫双 11

2020 年天猫双 11,阿里云实现了国内首例 Serverless 在核心业务场景下的大规模落地,扛住了全球最大规模的流量洪峰,创造了 Serverless 落地应用的里程碑。

今年天猫双 11,阿里云 Serverless 支撑业务场景更多,范围更广,阿里云函数计算与集团内的运维体系全面实现标准化对接,打通研发的最后一公里,首次实现了业务全链路“ FaaS + BaaS ”的 Serverless 体系化研发,覆盖淘特、淘系、阿里妈妈、1688、高德、飞猪等业务场景,支撑场景数量同比增加 2 倍,峰值流量总数同比增加 3 倍,实现了百万 QPS 的突破,人效提升 40%。

2、网易云音乐音视频算法的 Serverless 探索

网易云音乐产品背后,实际有非常多的算法服务支撑,比如多种码率的音频转码、听歌识曲中应用的音频指纹生成和识别、副歌检测、小语种音译歌词等等。这些任务的资源需求和执行时间变化很大,需要使用 C++、Python 等多种语言实现,对算力的弹性要求非常大。

原先网易是在自己的数据中心搭建这样一个算法服务平台,落地了 60+ 音视频算法,对接 100+ 的业务场景。但随着业务增长,基础设施管理的负担越来越大。虽然通过了很多方式去简化了内部业务场景、算法等的对接,但越来越多夹杂存量、增量处理的算法;不同流量的业务场景规模,以及不同业务场景可能会复用同一类算法的,导致在业务上的时间越来越少。

比如上线一种新算法,首先要对超过 6000 万首存量歌曲进行处理,这要求平台在短时间内弹出大量算力,可靠的执行任务,同时提供完善的应用、实例等多维度的监控信息。这些需求是非常匹配函数计算的。网易在函数计算上高峰期一天处理超过 2000 万个任务,算法应用到业务 10 倍速的提升,稀疏调用的算法成本大幅缩减。

网易这个案例最有意思的点,在于他们在应用层融合了自有机房和公有云上的服务。以往大家谈到 Serverless,觉得它很难在混合云的场景下应用。网易的案例证明了专有云和公有云融合不是只有资源纳管这一种方式,在应用层考虑融合方案,有时候效果会更好。

网易云音乐音视频算法的 Serverless 探索之路

3、南瓜电影 7 天全面 Serverless 化

另一个比较有意思的案例是南瓜视频使用 SAE 实现传统微服务应用的零迁移改造,只用了一周就完整迁移到 SAE 平台。

南瓜原有的微服务平台面临几个挑战:

  • 运维成本高。要管理基础设施,要规划网络,要升级系统等等,大量的时间花在这些低价值的工作上,而不是专注于业务的发展;
  • 机器难以规划容量。热点电影经常造成访问热点,临时扩容操作复杂、慢。南瓜经历了业务的爆发式增长,因为一部热映电影,1 小时新增 80 万注册用户,比正常流量高了 80 倍,系统很快就崩了。

这次经历促使南瓜进行了技术升级。用户也对比了 K8s 和 SAE,最后认为要玩转 K8s ,需要组建好专业团队,代价不小。SAE 的产品形态非常有亲和力,南瓜只花了很短的时间就迁移到 SAE,现在所有的应用都运行在 SAE 上。

南瓜电影 7 天内全面 Serverless 化实践

05 Serverless 不是未来,是现在

**云的发展一定是往更高的抽象层面发展,让用户研发效率更高更敏捷,资源使用更高效。**因此云的产品体系一定是 Serverless 化,也就是越来越多的云服务是全托管、Serverless 的形态。如果我们把云看作一台计算机,那么 IaaS 层是硬件,以 K8s 为代表的容器编排系统是操作系统,而 Serverless 计算则是应用的运行时。所以 Serverless 是云的未来,这实际上不算是对未来的预测,而是正在发生的事实。

接下来,Serverless 的产品形态会变得多样,早些年大家都把 Lambda 这样形态的产品等同于 Serverless 计算,这几年我们看到 Google Cloud Run,亚马逊云科技 App Runner 等针对 Web 应用场景的 Serverless 服务,阿里云函数计算也在不断演进,比如支持容器镜像、更少的运行限制等等。而且针对传统微服务等存量市场,我们还推出了 SAE 这样形态的服务,让用户能够非常方便的把存量应用迁移上来,享受 Serverless 的红利。

Serverless 底层技术发展上也有一些值得关注的趋势。包括在资源调度上更加智能,因为 Serverless 的计算模式给平台提供了更多的负载信息,使得平台有机会通过数据驱动的方式在资源调度、流量路由等方面做得更加精准。另外,Serverless 有望支持更多类型的硬件,包括 ARM 类型的 CPU、GPU 或者 FPGA 等异构硬件,给用户提供更有性价比的计算类型。

谈未来,就不免说到对 Serverless 终点的判断,我想云就像一台计算机,在过去的 10 年,云主要是通过 Cloud Hosting 的模式,在兼容原有编程模式的同时,为开发者提供了海量的算力。但这种模式有点像使用汇编语言编程,开发者需要处理相当多的细节。微软预测未来 5 年将新增 5 亿个应用,超过过去 40 年的总和,这是传统的开发模式难以支撑的。

所以我们看到现代应用、低代码等理念开始流行。下一个 10 年,云的编程模型将迎来巨大的创新。过去 PC、移动互联网,都从一开始的硬件创新,发展到形成自己的原生编程模型,形成完整的、繁荣的产业生态,云也正在经历这样的过程。最终,云会有属于自己的、原生的、高效的编程模型和应用研发模式。而 Serverless 在云的生态中,扮演应用运行时的角色,是承载应用运行的基础设施。

作者简介:

不瞋:阿里云 Serverless 产品研发负责人,致力于构建下一代弹性、高可用的无服务器计算平台。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ADG搭建系列之 11G RAC to Single DAT

发表于 2021-11-24

「这是我参与11月更文挑战的第24天,活动详情查看:2021最后一次更文挑战」

一、环境准备

主机名 ip DB Version db_name db_unique_name
主库节点一 lucifer01 10.211.55.100 11.2.0.4 orcl orcl
主库节点二 lucifer02 10.211.55.101 11.2.0.4 orcl orcl
备库 luciferdg 10.211.55.110 11.2.0.4 orcl orcldg

Notes:

1、db_unique_name主备库不能相同。

2、db_name主备库需保持一致。

3、主备库DB版本需保持一致。

二、搭建过程

1、Oracle软件安装

主库一键安装:

1
2
3
bash复制代码./AllRacOracleSilent.sh -i 10.211.55.101 -n lucifer02 -o orcl -d 11g -rac Y -n1 lucifer01 -n2 lucifer02 -pb1 10.211.55.100 -pb2 10.211.55.101 -vi1 10.211.55.102 -vi2 10.211.55.103 -pi1 10.10.1.1 -pi2 10.10.1.2 -si 10.211.55.105 -sn lucifer-scan -cn lucifer-cluster -dd /dev/asm_data -od /dev/asm_ocr -puf eth0 -prf eth1

./AllRacOracleSilent.sh -i 10.211.55.100 -n lucifer01 -o orcl -d 11g -rac Y -n1 lucifer01 -n2 lucifer02 -pb1 10.211.55.100 -pb2 10.211.55.101 -vi1 10.211.55.102 -vi2 10.211.55.103 -pi1 10.10.1.1 -pi2 10.10.1.2 -si 10.211.55.105 -sn lucifer-scan -cn lucifer-cluster -dd /dev/asm_data -od /dev/asm_ocr -puf eth0 -prf eth1

备库一键安装:(备库仅安装ORACLE软件,不建库)

1
bash复制代码./AllRacOracleSilent.sh -i 10.211.55.110 -n luciferdg -o orcl -d 11g -w Y

一键安装脚本可参考:ORACLE一键安装单机11G/12C/18C/19C并建库脚本

2、环境配置

a.配置hosts文件

主库:

1
2
3
4
5
6
7
bash复制代码##节点一
#dg
10.211.55.110 luciferdg

##节点二
#dg
10.211.55.110 luciferdg

备库:

1
2
3
4
bash复制代码##dg
10.211.55.100 lucifer01
10.211.55.101 lucifer02
10.211.55.105 lucifer-scan

b.配置静态监听和TNS

主库+备库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
bash复制代码##listener.ora
##节点一
su - oracle -c "cat <<EOF >> /u01/app/oracle/product/11.2.0/db/network/admin/listener.ora
##FOR DG BEGIN
SID_LIST_LISTENER =
(SID_LIST =
(SID_DESC =
(GLOBAL_DBNAME = orcl)
(ORACLE_HOME = /u01/app/oracle/product/11.2.0/db)
(SID_NAME = orcl01)
)
)
##FOR DG END
EOF"

##节点二
su - oracle -c "cat <<EOF >> /u01/app/oracle/product/11.2.0/db/network/admin/listener.ora
##FOR DG BEGIN
SID_LIST_LISTENER =
(SID_LIST =
(SID_DESC =
(GLOBAL_DBNAME = orcl)
(ORACLE_HOME = /u01/app/oracle/product/11.2.0/db)
(SID_NAME = orcl02)
)
)
##FOR DG END
EOF"

##DG
su - oracle -c "cat <<EOF >> /u01/app/oracle/product/11.2.0/db/network/admin/listener.ora
##FOR DG BEGIN
SID_LIST_LISTENER =
(SID_LIST =
(SID_DESC =
(GLOBAL_DBNAME = orcl)
(ORACLE_HOME = /u01/app/oracle/product/11.2.0/db)
(SID_NAME = orcl)
)
)
##FOR DG END
EOF"

##重启监听
##rac
su - grid -c "lsnrctl stop"
su - grid -c "lsnrctl start"

##dg
su - oracle -c "lsnrctl stop"
su - oracle -c "lsnrctl start"

##tnsnames.ora
su - oracle -c "cat <<EOF >> /u01/app/oracle/product/11.2.0/db/network/admin/tnsnames.ora
##FOR DG BEGIN
ORCL =
(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = TCP)(HOST = lucifer-scan)(PORT = 1521))
)
(CONNECT_DATA =
(SERVICE_NAME = orcl)
)
)

ORCL1 =
(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = TCP)(HOST = lucifer01)(PORT = 1521))
)
(CONNECT_DATA =
(SERVICE_NAME = orcl)
)
)

ORCL2 =
(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = TCP)(HOST = lucifer02)(PORT = 1521))
)
(CONNECT_DATA =
(SERVICE_NAME = orcl)
)
)
ORCLDG =
(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = TCP)(HOST = luciferdg)(PORT = 1521))
)
(CONNECT_DATA =
(SERVICE_NAME = orcl)
)
)
##FOR DG BEGIN
EOF"

c.主库配置参数

1
2
3
4
5
6
7
8
9
sql复制代码--开启归档模式
srvctl stop database -d orcl -o immediate
srvctl start instance -d orcl -i orcl1 -o mount
alter database archivelog;
alter database open;
srvctl start instance -d orcl -i oorcl2

--开启强制日志模式
alter database force logging;

d.复制参数文件和密码文件至备库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
bash复制代码##复制参数文件至备库(备库执行),要在oracle用户下复制
su - oracle
scp oracle@lucifer01:/tmp/initorcldg.ora /tmp

##备库创建目录
mkdir -p /u01/app/oracle/admin/orcl/adump
su - oracle -c "mkdir -p /oradata/orcl/datafile"
su - oracle -c "mkdir -p /oradata/orcl/onlinelog"
su - oracle -c "mkdir -p /oradata/orcl/tempfile"
mkdir -p /u01/app/oracle/fast_recovery_area/orcl
chown -R oracle:oinstall /u01/app/oracle/admin/orcl/adump
chown -R oracle:oinstall /u01/app/oracle/fast_recovery_area

##备库修改参数文件
*._optimizer_cartesian_enabled=FALSE
*.audit_file_dest='/u01/app/oracle/admin/orcl/adump'
*.audit_trail='NONE'
*.compatible='11.2.0.4.0'
*.db_block_size=8192
*.control_files='/oradata/orcl/control01.ctl','/u01/app/oracle/fast_recovery_area/orcl/control02.ctl'
*.db_create_file_dest='/oradata/orcl'
*.db_domain=''
*.db_name='orcl'
*.db_recovery_file_dest='/u01/app/oracle/fast_recovery_area'
*.db_recovery_file_dest_size=5501878272
*.deferred_segment_creation=FALSE
*.diagnostic_dest='/u01/app/oracle'
*.dispatchers='(PROTOCOL=TCP) (SERVICE=orclXDB)'
*.event='10949 trace name context forever:28401 trace name context forever,level 1:10849 trace name context forever, level 1:19823 trace name context forever, level 90'
*.open_cursors=300
*.pga_aggregate_target=196083712
*.processes=150
*.result_cache_max_size=0
*.sga_target=784334848
*.db_unique_name='orcldg'
*.log_archive_config='dg_config=(ORCLDG,ORCL)'
*.log_archive_dest_1='location=/archivelog valid_for=(all_logfiles,all_roles) db_unique_name=ORCLDG'
*.log_archive_dest_2='service=orcl async valid_for=(online_logfiles,primary_role) db_unique_name=ORCL'
*.log_archive_dest_state_2='ENABLE'
*.log_archive_format='%t_%s_%r.arc'
*.log_archive_max_processes=4
*.remote_login_passwordfile='exclusive'
*.fal_server='ORCL'
*.fal_client='ORCLDG'
*.db_file_name_convert='+DATA','/oradata'
*.log_file_name_convert='+DATA','/oradata'
*.standby_file_management='AUTO'
*.undo_tablespace='UNDOTBS1'

##复制密码文件至备库(备库执行),要在oracle用户下复制
su - oracle
scp oracle@lucifer01:/u01/app/oracle/product/11.2.0/db/dbs/orapworcl1 /u01/app/oracle/product/11.2.0/db/dbs/orapworcl

e.主库添加stanby log文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
sql复制代码set pagesize100
set line222
col member for a60
select * from v$logfile;
select * from v$log;

--需要注意:
--1.stanby log日志大小与redo log日志保持一致
--2.stanby log数量:
standby logfile=(1+logfile组数)*thread=(1+3)*1=4组,需要加4组standby logfile.
--3.thread要与redo log保持一致,如果是rac,需要增加多个thread对应的standby log

ALTER DATABASE ADD STANDBY LOGFILE thread 1
group 5 ('+DATA') SIZE 120M,
group 6 ('+DATA') SIZE 120M,
group 7 ('+DATA') SIZE 120M;

ALTER DATABASE ADD STANDBY LOGFILE thread 2
group 8 ('+DATA') SIZE 120M,
group 9 ('+DATA') SIZE 120M,
group 10 ('+DATA') SIZE 120M;

select * from v$standby_log;

f.备库开启到nomount状态

1
sql复制代码startup nomount pfile='/tmp/initorcldg.ora';

3、 RMAN DUPLICATE

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
bash复制代码##rman恢复备库
rman target sys/oracle@orcl1 AUXILIARY sys/oracle@orcldg

run {
allocate channel prmy1 type disk;
allocate channel prmy2 type disk;
allocate auxiliary channel aux1 type disk;
allocate auxiliary channel aux2 type disk;
DUPLICATE TARGET DATABASE FOR STANDBY FROM ACTIVE DATABASE DORECOVER NOFILENAMECHECK;
}

##主库设置DG参数
ALTER SYSTEM SET LOG_ARCHIVE_CONFIG='DG_CONFIG=(ORCL,ORCLDG)';
ALTER SYSTEM SET LOG_ARCHIVE_DEST_1='LOCATION=+DATA VALID_FOR=(ALL_LOGFILES,ALL_ROLES) DB_UNIQUE_NAME=ORCL';
ALTER SYSTEM SET LOG_ARCHIVE_DEST_2='SERVICE=orcldg LGWR ASYNC VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE) DB_UNIQUE_NAME=ORCLDG';
ALTER SYSTEM SET LOG_ARCHIVE_DEST_STATE_2=ENABLE;
ALTER SYSTEM SET LOG_ARCHIVE_FORMAT='%t_%s_%r.arc' SCOPE=SPFILE;
ALTER SYSTEM SET LOG_ARCHIVE_MAX_PROCESSES=4;
ALTER SYSTEM SET REMOTE_LOGIN_PASSWORDFILE=EXCLUSIVE SCOPE=SPFILE;
ALTER SYSTEM SET FAL_SERVER=ORCLDG;
ALTER SYSTEM SET FAL_CLIENT=ORCL;
ALTER SYSTEM SET DB_FILE_NAME_CONVERT='/oradata','+DATA' SCOPE=SPFILE;
ALTER SYSTEM SET LOG_FILE_NAME_CONVERT='/oradata','+DATA' SCOPE=SPFILE;
ALTER SYSTEM SET STANDBY_FILE_MANAGEMENT=AUTO;

4、备库开启日志应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sql复制代码alter database open read only;

ALTER DATABASE RECOVER MANAGED STANDBY DATABASE USING CURRENT LOGFILE DISCONNECT FROM SESSION;

select database_role,open_mode from v$database;

DATABASE_ROLE OPEN_MODE
---------------- --------------------
PHYSICAL STANDBY READ ONLY WITH APPLY

SQL> SELECT protection_mode FROM v$database;

PROTECTION_MODE
--------------------
MAXIMUM PERFORMANCE

5、主库开启LOG_ARCHIVE_DEST_STATE_2

1
ini复制代码ALTER SYSTEM SET LOG_ARCHIVE_DEST_STATE_2=ENABLE;

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Cloudera CDP 企业数据云测试开通指导 一、申请云

发表于 2021-11-24

简介:基于阿里云部署的 Cloudera CDP 企业数据云平台已经进入公测阶段,本文详细介绍了相关试用/试用流程。

基于阿里云部署的 Cloudera CDP 企业数据云平台已经进入公测阶段,如对该平台感兴趣,可以使用下面的流程进行试用。

如需要直接使用,请参考下面的流程:

每家阿里云企业认证的企业可以申请一次云服务器代金券和30天 CDP 平台的试用机会。

每步详细的流程参考下面章节。

一、申请云服务器代金券

  • 点击填写 阿里云*Cloudera CDP产品免费测试申请表,申请云服务器代金券和基于阿里云部署的 CDP 产品试用。
  • 等待系统进行审批,然后发放云服务器代金券到贵司的企业账户中。

二、企业账号准备

云服务器代金券申请的同时,如果贵司还没有阿里云的企业账号,需要进行阿里云的企业账号注册和认证。

  • 准备企业认证的阿里云账号并记录UID;

账号通过企业认证,如果没有企业认证,需要先按照向导进行企业认证。

注意:云服务器代金券的申请和CDP平台的使用都需要企业账户。

三、CDP 平台购买和交付流程

1、产品购买页购买 CDP 平台产品

进入到 Cloudera CDP企业数据云平台 产品购买页面,套餐选择 CDP企业数据云(测试套餐),购买时长选择试用30天,然后点击立即购买。

2、接受开通协议进行软件开通

3、进行软件支付

对于试用版,这里支付金额为0。对于非试用的用户,则直接支付对应产品套餐的价格。

4、进入软件控制台,点击免登

5、进行用户角色授权

6、配置集群

阿里云平台 CDP 集群配置,用户在此页面配置集群 ECS 实例。用户 CDP 首页购买完CDP 软件后,选择集群配置进入到集群配置页签

7、支付云服务器资源

配置完成后,点击确认并跳转到支付页面。在支付页面可以选择使用前面申请的云服务器代金券,进行云服务器的支付。

8、创建 CDP 集群

支付完成以后,在集群管理页面刷新页面可以看到集群启动状态。一般拉起时间20-30分钟。集群搭建完成后会发送通知信息,然后贵司就可以使用 CDP 集群了。

9、交付 CDP 集群给用户

用户集群创建完成以后,在首次登录 CM 控制台需要确认安全组中为指定 IP 开通 7180端口,用户可以按照如下操作进行开通。

www.yuque.com/aliyunbigda…

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MyBatis批量插入几千条数据,请慎用foreach

发表于 2021-11-24

近日,项目中有一个耗时较长的Job存在CPU占用过高的问题,经排查发现,主要时间消耗在往MyBatis中批量插入数据。mapper configuration是用foreach循环做的,差不多是这样。(由于项目保密,以下代码均为自己手写的demo代码)

1
2
3
4
5
6
sql复制代码<insert id="batchInsert" parameterType="java.util.List">
insert into USER (id, name) values
<foreach collection="list" item="model" index="index" separator=",">
(#{model.id}, #{model.name})
</foreach>
</insert>

这个方法提升批量插入速度的原理是,将传统的:

1
2
3
4
5
go复制代码INSERT INTO `table1` (`field1`, `field2`) VALUES ("data1", "data2");
INSERT INTO `table1` (`field1`, `field2`) VALUES ("data1", "data2");
INSERT INTO `table1` (`field1`, `field2`) VALUES ("data1", "data2");
INSERT INTO `table1` (`field1`, `field2`) VALUES ("data1", "data2");
INSERT INTO `table1` (`field1`, `field2`) VALUES ("data1", "data2");

转化为:

1
2
3
4
5
6
go复制代码INSERT INTO `table1` (`field1`, `field2`) 
VALUES ("data1", "data2"),
("data1", "data2"),
("data1", "data2"),
("data1", "data2"),
("data1", "data2");

在MySql Docs中也提到过这个trick,如果要优化插入速度时,可以将许多小型操作组合到一个大型操作中。理想情况下,这样可以在单个连接中一次性发送许多新行的数据,并将所有索引更新和一致性检查延迟到最后才进行。

乍看上去这个foreach没有问题,但是经过项目实践发现,当表的列数较多(20+),以及一次性插入的行数较多(5000+)时,整个插入的耗时十分漫长,达到了14分钟,这是不能忍的。在资料中也提到了一句话:

1
vbnet复制代码Of course don't combine ALL of them, if the amount is HUGE. Say you have 1000 rows you need to insert, then don't do it one at a time. You shouldn't equally try to have all 1000 rows in a single query. Instead break it into smaller sizes.

它强调,当插入数量很多时,不能一次性全放在一条语句里。可是为什么不能放在同一条语句里呢?这条语句为什么会耗时这么久呢?我查阅了资料发现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
vbnet复制代码Insert inside Mybatis foreach is not batch, this is a single (could become giant) SQL statement and that brings drawbacks:

some database such as Oracle here does not support.

in relevant cases: there will be a large number of records to insert and the database configured limit (by default around 2000 parameters per statement) will be hit, and eventually possibly DB stack error if the statement itself become too large.

Iteration over the collection must not be done in the mybatis XML. Just execute a simple Insertstatement in a Java Foreach loop. The most important thing is the session Executor type.

SqlSession session = sessionFactory.openSession(ExecutorType.BATCH);
for (Model model : list) {
session.insert("insertStatement", model);
}
session.flushStatements();
Unlike default ExecutorType.SIMPLE, the statement will be prepared once and executed for each record to insert.

从资料中可知,默认执行器类型为Simple,会为每个语句创建一个新的预处理语句,也就是创建一个PreparedStatement对象。在我们的项目中,会不停地使用批量插入这个方法,而因为MyBatis对于含有的语句,无法采用缓存,那么在每次调用方法时,都会重新解析sql语句。

1
2
3
4
csharp复制代码Internally, it still generates the same single insert statement with many placeholders as the JDBC code above.
MyBatis has an ability to cache PreparedStatement, but this statement cannot be cached because it contains <foreach /> element and the statement varies depending on the parameters. As a result, MyBatis has to 1) evaluate the foreach part and 2) parse the statement string to build parameter mapping [1] on every execution of this statement.
And these steps are relatively costly process when the statement string is big and contains many placeholders.
[1] simply put, it is a mapping between placeholders and the parameters.

从上述资料可知,耗时就耗在,由于我foreach后有5000+个values,所以这个PreparedStatement特别长,包含了很多占位符,对于占位符和参数的映射尤其耗时。并且,查阅相关资料可知,values的增长与所需的解析时间,是呈指数型增长的。

)

所以,如果非要使用 foreach 的方式来进行批量插入的话,可以考虑减少一条 insert 语句中 values 的个数,最好能达到上面曲线的最底部的值,使速度最快。一般按经验来说,一次性插20~50行数量是比较合适的,时间消耗也能接受。

重点来了。上面讲的是,如果非要用的方式来插入,可以提升性能的方式。而实际上,MyBatis文档中写批量插入的时候,是推荐使用另外一种方法。(可以看 www.mybatis.org/mybatis-dyn… 中 Batch Insert Support 标题里的内容)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
scss复制代码SqlSession session = sqlSessionFactory.openSession(ExecutorType.BATCH);
try {
SimpleTableMapper mapper = session.getMapper(SimpleTableMapper.class);
List<SimpleTableRecord> records = getRecordsToInsert(); // not shown

BatchInsert<SimpleTableRecord> batchInsert = insert(records)
.into(simpleTable)
.map(id).toProperty("id")
.map(firstName).toProperty("firstName")
.map(lastName).toProperty("lastName")
.map(birthDate).toProperty("birthDate")
.map(employed).toProperty("employed")
.map(occupation).toProperty("occupation")
.build()
.render(RenderingStrategy.MYBATIS3);

batchInsert.insertStatements().stream().forEach(mapper::insert);

session.commit();
} finally {
session.close();
}

即基本思想是将 MyBatis session 的 executor type 设为 Batch ,然后多次执行插入语句。就类似于JDBC的下面语句一样。

1
2
3
4
5
6
7
8
9
10
11
ini复制代码Connection connection = DriverManager.getConnection("jdbc:mysql://127.0.0.1:3306/mydb?useUnicode=true&characterEncoding=UTF-8&useServerPrepStmts=false&rewriteBatchedStatements=true","root","root");
connection.setAutoCommit(false);
PreparedStatement ps = connection.prepareStatement(
"insert into tb_user (name) values(?)");
for (int i = 0; i < stuNum; i++) {
ps.setString(1,name);
ps.addBatch();
}
ps.executeBatch();
connection.commit();
connection.close();

经过试验,使用了 ExecutorType.BATCH 的插入方式,性能显著提升,不到 2s 便能全部插入完成。

总结一下,如果MyBatis需要进行批量插入,推荐使用 ExecutorType.BATCH 的插入方式,如果非要使用 的插入的话,需要将每次插入的记录控制在 20~50 左右。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【领域驱动系列3】DDD实践 前言 项目介绍 项目目录 项目

发表于 2021-11-24

往期精选(欢迎转发~~)

  • Java全套学习资料(14W字),耗时半年整理
  • 2年经验总结,告诉你如何做好项目管理
  • 消息队列:从选型到原理,一文带你全部掌握
  • 我肝了三个月,为你写出了GO核心手册
  • RPC框架:从原理到选型,一文带你搞懂RPC
  • 如何看待程序员35岁职业危机?
  • 更多…

实现一个简易版的DDD脚手架,并给出落地的示例。

前言

在前面的《一文带你学习DDD,全是干货!》文章中,里面讲述了一个Demo,虽然有DDD的思想,但是感觉整体很乱,每一层都没有做好隔离,所以我参考小米内部的DDD脚手架,对这个Demo进行了重构,也就诞生了我这个版本,代码已经上传到GitHub中,大家可以自取:github.com/lml20070115…

1
bash复制代码git clone git@github.com:lml200701158/ddd-framework.git

项目介绍

  • 主要是围绕用户、角色和两者的关系,构建权限分配领域模型。
  • 采用DDD 4层架构,包括用户接口层、应用层、领域层和基础服务层。
  • 数据通过VO、DTO、DO、PO转换,进行分层隔离。
  • 采用SpringBoot + MyBatis Plus框架,存储用MySQL。

项目目录

项目划分为用户接口层、应用层、领域层和基础服务层,每一层的代码结构都非常清晰,包括每一层VO、DTO、DO、PO的数据定义。对于每一层的公共代码,比如常量、接口等,都抽离到ddd-common中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
less复制代码./ddd-application  // 应用层
├── pom.xml
└── src
└── main
└── java
└── com
└── ddd
└── applicaiton
├── converter
│ └── UserApplicationConverter.java // 类型转换器
└── impl
└── AuthrizeApplicationServiceImpl.java // 业务逻辑
./ddd-common
├── ddd-common // 通用类库
│ ├── pom.xml
│ └── src
│ └── main
│ └── java
│ └── com
│ └── ddd
│ └── common
│ ├── exception // 异常
│ │ ├── ServiceException.java
│ │ └── ValidationException.java
│ ├── result // 返回结果集
│ │ ├── BaseResult.javar
│ │ ├── Page.java
│ │ ├── PageResult.java
│ │ └── Result.java
│ └── util // 通用工具
│ ├── GsonUtil.java
│ └── ValidationUtil.java
├── ddd-common-application // 业务层通用模块
│ ├── pom.xml
│ └── src
│ └── main
│ └── java
│ └── com
│ └── ddd
│ └── applicaiton
│ ├── dto // DTO
│ │ ├── RoleInfoDTO.java
│ │ └── UserRoleDTO.java
│ └── servic // 业务接口
│ └── AuthrizeApplicationService.java
├── ddd-common-domain
│ ├── pom.xml
│ └── src
│ └── main
│ └── java
│ └── com
│ └── ddd
│ └── domain
│ ├── event // 领域事件
│ │ ├── BaseDomainEvent.java
│ │ └── DomainEventPublisher.java
│ └── service // 领域接口
│ └── AuthorizeDomainService.java
└── ddd-common-infra
├── pom.xml
└── src
└── main
└── java
└── com
└── ddd
└── infra
├── domain // DO
│ └── AuthorizeDO.java
├── dto
│ ├── AddressDTO.java
│ ├── RoleDTO.java
│ ├── UnitDTO.java
│ └── UserRoleDTO.java
└── repository
├── UserRepository.java // 领域仓库
└── mybatis
└── entity // PO
├── BaseUuidEntity.java
├── RolePO.java
├── UserPO.java
└── UserRolePO.java
./ddd-domian // 领域层
├── pom.xml
└── src
└── main
└── java
└── com
└── ddd
└── domain
├── event // 领域事件
│ ├── DomainEventPublisherImpl.java
│ ├── UserCreateEvent.java
│ ├── UserDeleteEvent.java
│ └── UserUpdateEvent.java
└── impl // 领域逻辑
└── AuthorizeDomainServiceImpl.java
./ddd-infra // 基础服务层
├── pom.xml
└── src
└── main
└── java
└── com
└── ddd
└── infra
├── config
│ └── InfraCoreConfig.java // 扫描Mapper文件
└── repository
├── converter
│ └── UserConverter.java // 类型转换器
├── impl
│ └── UserRepositoryImpl.java
└── mapper
├── RoleMapper.java
├── UserMapper.java
└── UserRoleMapper.java
./ddd-interface
├── ddd-api // 用户接口层
│ ├── pom.xml
│ └── src
│ └── main
│ ├── java
│ │ └── com
│ │ └── ddd
│ │ └── api
│ │ ├── DDDFrameworkApiApplication.java // 启动入口
│ │ ├── converter
│ │ │ └── AuthorizeConverter.java // 类型转换器
│ │ ├── model
│ │ │ ├── req // 入参 req
│ │ │ │ ├── AuthorizeCreateReq.java
│ │ │ │ └── AuthorizeUpdateReq.java
│ │ │ └── vo // 输出 VO
│ │ │ └── UserAuthorizeVO.java
│ │ └── web // API
│ │ └── AuthorizeController.java
│ └── resources // 系统配置
│ ├── application.yml
│ └── resources // Sql文件
│ └── init.sql
└── ddd-task
└── pom.xml
./pom.xml

项目解读

数据库

包括3张表,分别为用户、角色和用户角色表,一个用户可以拥有多个角色,一个角色可以分配给多个用户。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
SQL复制代码create table t_user
(
id bigint auto_increment comment '主键' primary key,
user_name varchar(64) null comment '用户名',
password varchar(255) null comment '密码',
real_name varchar(64) null comment '真实姓名',
phone bigint null comment '手机号',
province varchar(64) null comment '用户名',
city varchar(64) null comment '用户名',
county varchar(64) null comment '用户名',
unit_id bigint null comment '单位id',
unit_name varchar(64) null comment '单位名称',
gmt_create datetime default CURRENT_TIMESTAMP not null comment '创建时间',
gmt_modified datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间',
deleted bigint default 0 not null comment '是否删除,非0为已删除'
)comment '用户表' collate = utf8_bin;

create table t_role
(
id bigint auto_increment comment '主键' primary key,
name varchar(256) not null comment '名称',
code varchar(64) null comment '角色code',
gmt_create datetime default CURRENT_TIMESTAMP not null comment '创建时间',
gmt_modified datetime default CURRENT_TIMESTAMP not null on update CURRENT_TIMESTAMP comment '修改时间',
deleted bigint default 0 not null comment '是否已删除'
)comment '角色表' charset = utf8;

create table t_user_role (
id bigint auto_increment comment '主键id' primary key,
user_id bigint not null comment '用户id',
role_id bigint not null comment '角色id',
gmt_create datetime default CURRENT_TIMESTAMP not null comment '创建时间',
gmt_modified datetime default CURRENT_TIMESTAMP not null comment '修改时间',
deleted bigint default 0 not null comment '是否已删除'
)comment '用户角色关联表' charset = utf8;

基础服务层

仓储(资源库)介于领域模型和数据模型之间,主要用于聚合的持久化和检索。它隔离了领域模型和数据模型,以便我们关注于领域模型而不需要考虑如何进行持久化。

比如保存用户,需要将用户和角色一起保存,也就是创建用户的同时,需要新建用户的角色权限,这个可以直接全部放到仓储中:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码public AuthorizeDO save(AuthorizeDO user) {
UserPO userPo = userConverter.toUserPo(user);
if(Objects.isNull(user.getUserId())){
userMapper.insert(userPo);
user.setUserId(userPo.getId());
} else {
userMapper.updateById(userPo);
userRoleMapper.delete(Wrappers.<UserRolePO>lambdaQuery()
.eq(UserRolePO::getUserId, user.getUserId()));
}
List<UserRolePO> userRolePos = userConverter.toUserRolePo(user);
userRolePos.forEach(userRoleMapper::insert);
return this.query(user.getUserId());
}

仓储对外暴露的接口如下:

1
2
3
4
5
6
7
8
9
java复制代码// 用户领域仓储
public interface UserRepository {
// 删除
void delete(Long userId);
// 查询
AuthorizeDO query(Long userId);
// 保存
AuthorizeDO save(AuthorizeDO user);
}

基础服务层不仅仅包括资源库,与第三方的调用,都需要放到该层,Demo中没有该示例,我们可以看一个小米内部具体的实际项目,他把第三方的调用放到了remote目录中:

领域层

聚合&聚合根

我们有用户和角色两个实体,可以将用户、角色和两者关系进行聚合,然后用户就是聚合根,聚合之后的属性,我们称之为“权限”。

对于地址Address,目前是作为字段属性存储到DB中,如果对地址无需进行检索,可以把地址作为“值对象”进行存储,即把地址序列化为Json存,存储到DB的一个字段中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码public class AuthorizeDO {
// 用户ID
private Long userId;
// 用户名
private String userName;
// 真实姓名
private String realName;
// 手机号
private String phone;
// 密码
private String password;
// 用户单位
private UnitDTO unit;
// 用户地址
private AddressDTO address;
// 用户角色
private List<RoleDTO> roles;
}

领域服务

Demo中的领域服务比较薄,通过单位ID后去获取单位名称,构建单位信息:

1
2
3
4
5
6
7
8
9
java复制代码@Service
public class AuthorizeDomainServiceImpl implements AuthorizeDomainService {
@Override
// 设置单位信息
public void associatedUnit(AuthorizeDO authorizeDO) {
String unitName = "武汉小米";// TODO: 通过第三方获取
authorizeDO.getUnit().setUnitName(unitName);
}
}

我们其实可以把领域服务再进一步抽象,可以抽象出领域能力,通过这些领域能力去构建应用层逻辑,比如账号相关的领域能力可以包括授权领域能力、身份认证领域能力等,这样每个领域能力相对独立,就不会全部揉到一个文件中,下面是实际项目的领域层截图:

领域事件

领域事件 = 事件发布 + 事件存储 + 事件分发 + 事件处理。

这个Demo中,对领域事件的处理非常简单,还是一个应用内部的领域事件,就是每次执行一次具体的操作时,把行为记录下来。Demo中没有记录事件的库表,事件的分发还是同步的方式,所以Demo中的领域事件还不完善,后面我会再继续完善Demo中的领域事件,通过Java消息机制实现解耦,甚至可以借助消息队列,实现异步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
java复制代码/**
* 领域事件基类
*
* @author louzai
* @since 2021/11/22
*/
@Getter
@Setter
@NoArgsConstructor
public abstract class BaseDomainEvent<T> implements Serializable {
private static final long serialVersionUID = 1465328245048581896L;
/**
* 发生时间
*/
private LocalDateTime occurredOn;
/**
* 领域事件数据
*/
private T data;
public BaseDomainEvent(T data) {
this.data = data;
this.occurredOn = LocalDateTime.now();
}
}

/**
* 用户新增领域事件
*
* @author louzai
* @since 2021/11/20
*/
public class UserCreateEvent extends BaseDomainEvent<AuthorizeDO> {
public UserCreateEvent(AuthorizeDO user) {
super(user);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码/**
* 领域事件发布实现类
*
* @author louzai
* @since 2021/11/20
*/
@Component
@Slf4j
public class DomainEventPublisherImpl implements DomainEventPublisher {

@Autowired
private ApplicationEventPublisher applicationEventPublisher;

@Override
public void publishEvent(BaseDomainEvent event) {
log.debug("发布事件,event:{}", GsonUtil.gsonToString(event));
applicationEventPublisher.publishEvent(event);
}
}

应用层

应用层就非常好理解了,只负责简单的逻辑编排,比如创建用户授权:

1
2
3
4
5
6
7
8
9
10
11
java复制代码@Transactional(rollbackFor = Exception.class)
public void createUserAuthorize(UserRoleDTO userRoleDTO){
// DTO转为DO
AuthorizeDO authorizeDO = userApplicationConverter.toAuthorizeDo(userRoleDTO);
// 关联单位单位信息
authorizeDomainService.associatedUnit(authorizeDO);
// 存储用户
AuthorizeDO saveAuthorizeDO = userRepository.save(authorizeDO);
// 发布用户新建的领域事件
domainEventPublisher.publishEvent(new UserCreateEvent(saveAuthorizeDO));
}

查询用户授权信息:

1
2
3
4
5
6
7
8
9
10
java复制代码@Override
public UserRoleDTO queryUserAuthorize(Long userId) {
// 查询用户授权领域数据
AuthorizeDO authorizeDO = userRepository.query(userId);
if (Objects.isNull(authorizeDO)) {
throw ValidationException.of("UserId is not exist.", null);
}
// DO转DTO
return userApplicationConverter.toAuthorizeDTO(authorizeDO);
}

细心的同学可以发现,我们应用层和领域层,通过DTO和DO进行数据转换。

用户接口层

最后就是提供API接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码@GetMapping("/query")
public Result<UserAuthorizeVO> query(@RequestParam("userId") Long userId){
UserRoleDTO userRoleDTO = authrizeApplicationService.queryUserAuthorize(userId);
Result<UserAuthorizeVO> result = new Result<>();
result.setData(authorizeConverter.toVO(userRoleDTO));
result.setCode(BaseResult.CODE_SUCCESS);
return result;
}

@PostMapping("/save")
public Result<Object> create(@RequestBody AuthorizeCreateReq authorizeCreateReq){
authrizeApplicationService.createUserAuthorize(authorizeConverter.toDTO(authorizeCreateReq));
return Result.ok(BaseResult.INSERT_SUCCESS);
}

数据的交互,包括入参、DTO和VO,都需要对数据进行转换。

项目运行

  • 新建库表:通过文件”ddd-interface/ddd-api/src/main/resources/init.sql”新建库表。
  • 修改SQL配置:修改”ddd-interface/ddd-api/src/main/resources/application.yml”的数据库配置。
  • 启动服务:直接启动服务即可。
  • 测试用例:
    • 请求URL:http://127.0.0.1:8087/api/user/save
    • Post body:{“userName”:”louzai”,”realName”:”楼”,”phone”:13123676844,”password”:”***“,”unitId”:2,”province”:”湖北省”,”city”:”鄂州市”,”county”:”葛店开发区”,”roles”:[{“roleId”:2}]}

结语

这段时间主要是学习DDD如何落地,也一直想写个DDD的Demo,感觉这次学习周期稍微有点长。下一篇文章会将之前文章的重点内容,包括近期对DDD的学习,以及一些自己的理解,再出一篇“理论到实践”相关的文章,算是对自己近一个多月学习的总结。

欢迎大家多多点赞,更多文章,请关注微信公众号“楼仔进阶之路”,点关注,不迷路~~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一文读懂k8s中的QoS机制 导读 QoS介绍 Qos In

发表于 2021-11-24

这是我参与11月更文挑战的第9天,活动详情查看:2021最后一次更文挑战

pexels-miguel-constantin-montes-2623968.jpg

导读

QoS(Quality of Servcie)对于大家并不陌生,本义是对外承诺的服务质量保证,越重要的服务承诺的要求就会越高,关于QoS更详细的说明可以参考参考文档[1]。

在Kubernetes语义下,QoS是针对POD来定义的,描述了POD在被调度和驱逐时的优先顺序,它对POD的生命周期有直接的影响。POD的QoS类别有三种,分别是Guaranteed、Burstable和BestEffort。具体含义请继续往下读~!

QoS介绍

QoS起源于网络的服务质量,它提供了针对不同用户或者不同数据流采用相应不同的优先级,或者是根据应用程序的要求,保证数据流的性能达到一定的水准。QoS的保证对于容量有限的网络来说是十分重要的,特别是对于流多媒体应用,例如VoIP和IPTV等,因为这些应用常常需要固定的传输率,对延时也比较敏感。[2]

Qos In Kubernetes

在Kubernetes系统中,QoS(Quality of Servcie)级别决定了POD的调度和驱逐优先级,从而可以为集群之上的用户提供可靠的符合预期的服务保障。QoS级别越高,获得的保障支持优先级就越高。

在Kubernetes中,QoS并不是由用户直接设置的,而是由Kubernetes自己根据用户对资源的请求设置计算得到的,总共有三种级别,由高到低分别是:Guaranteed、Burstable和BestEffort。下面分别介绍三种等级如何设置的。

Guaranteed

资源设置

当POD内所有容器的的资源申请requests的cpu和memory与limits的cpu和memory想等时,其QoS级别将会被设置为Guaranteed。一个快捷的方式是只设置limits的cpu和memory,Kubernetes会自动设置requests与limits相等的值。

1
2
3
4
5
6
7
8
9
10
11
yaml复制代码Kind: Pod
spec:
containers:
- image: busybox
resource:
requests:
cpu: 200m
memory: 10Mi
limits:
cpu: 200m
memory: 10Mi

对调度的影响

Kubernetes调度器只会讲Guaranteed类型的POD调度到完全满足资源请求的节点上。

如果kubelet上报了一个节点状态DiskPressure,Guaranteed POD将不会被调度到这个节点上。当节点的根文件系统或者镜像文件系统的可用磁盘空间和inodes数达到驱逐的阈值时,DiskPressure节点状态将会被触发。当一个节点上报DiskPressure状态时,调度器将停止将新的Guaranteed POD调度到其上。

独占CPU

在Kubernetes的默认CPU管理策略”None”下,Guaranteed POD只能使用节点上的共享CPU池资源,该共享CPU池资源包含节点上所有CPU资源减去kubelet使用–kube-reserved或–system-reserved预留的资源。

在static CPU管理策略下,Guaranteed POD可以申请到独占的CPU资源。在这个策略下,Guaranteed POD的cpu requests值必须是整数值才可以真正获得独占CPU,否则仍然是获得共享CPU池资源。

Burstable

资源设置

如果一个POD中至少一个容器只设置了requests的cpu或memory,或者都设置但是与limits的设置不同,则该POD的QoS将被设置为Burstable。

1
2
3
4
5
6
7
8
yaml复制代码Kind: Pod
spec:
containers:
- image: busybox
resource:
requests:
cpu: 200m
memory: 10Mi

对调度的影响

调度器不能确保Burstable POD可以被调度到完全满足其资源需求的节点。

Burstable POD同样不能被调度到已经上报了DiskPressure状态的节点。调度器不会将任何新的Burstable POD调度到DiskPressure状态的节点。

独占CPU

在”None” CPU管理策略下,Burstable POD必须与BestEffort、Guaranteed POD一起共享资源池,不能被分配独占CPU资源。

BestEffort

资源设置

如果POD中所有的容器都没有设置requests和limits的cpu和memory,则该POD的QoS将被设置为BestEffort。

1
2
3
4
yaml复制代码Kind: Pod
spec:
containers:
- image: busybox

对调度的影响

调度器同样不保证BestEffort POD被调度到有充足资源的节点。然而,它们可以使用节点上任意数量的可用的CPU和memory资源。有时候当BestEffort POD贪婪的占用资源,给其他POD申请预留的资源不足时,将会导致与其他POD的资源竞争。

BestEffort POD不能被调度到状态是DiskPressure和MemoryPressure的节点。当节点可用内存低于预定义的阈值时,将会上报MemoryPressure状态。调度器就会停止调度任何新的BestEffort POD到这些节点上。

独占CPU

与Burstable POD相同,BestEffort POD只能使用共享资源池,不能独占CPU资源。

驱逐

接下来让我们一起看一下,在节点资源不足,例如内存耗尽时,QoS是如何影响kubelet驱逐POD的。

kubelet如何驱逐Guaranteed、Burstable、BestEffort POD

当节点可用计算资源较低时,由kubelet触发POD驱逐。这些驱逐意味着要回收资源以免系统出现OOM。DevOPs可用设置资源的阈值,当这个阈值达到时就由kubelet触发pod驱逐。

POD的QoS级别确实会影响kubelet挑选驱逐对象的顺序。kubelet首先驱逐超出资源请求的BestEffort和Burstable级别的POD。驱逐的顺序依赖于被分配的驱逐优先级以及所消耗资源超出资源请求的量。

Guaranteed和Burstable Pod的资源使用量低于请求值时会有最低的驱逐优先级,因而不会被驱逐。

在发生DiskPressure状态的节点上,kubelet首先驱逐的是BestEffort POD,然后是Burstable POD。只有当前节点不再有BestEffort和Burstable POD之后,才开始驱逐Guaranteed POD。

Guaranteed、Burstable、BestEffort POD的节点OOM行为是什么

节点在kubelet回收资源之前发生OOM时,oom_killer将会基于容器的oom_score值直接杀掉容器。oom_killer会为每一个容器计算出来一个oom_score值,计算方式是基于容器使用的资源占其所申请资源的比例再加上oom_score_adj值。

每个容器的oom_score_adj由其所属的POD的QoS级别所确定。

Quality of Service oom_score_adj
Guaranteed -998
BestEffort 1000
Burstable min(max(2, 1000 - (1000 * memoryRequestBytes) / machineMemoryCapacityBytes), 999)

oom_killer首先会结束QoS级别最低的POD并且使用资源超出请求量最多的容器。这意味着较高QoS级别POD比低级别POD的容器有更低的被杀掉的可能性。但是如果较高级别POD的容器内存使用量超出请求量更大时也可能会被优先清除掉,因为oom_score不仅仅不与QoS有关系,还取决于容器实际内存的使用量。

深入思考

为什么会有这样的三个不同级别,只使用一个级别不可以吗?

我们先回到现实的需求,差不多是这样的

  1. 资源利用率最大化,资源超卖越多越好
  2. 关键业务要有一定的稳定性保障

为了同时满足这个几乎是相反方向的目标,我们必须要提供更灵活的服务级别,于是只使用一个级别不能同时满足这两个目标,需要有不同的级别来保障有不同要求的服务。对于关键性较低,对稳定性保障要求也较低的服务我们可以适当的超卖资源,从而获得资源更大程度的利用率;而对于关键的服务,则允许服务资源存在一定的浪费,从而实现了业务稳定优先。

为什么分三个呢?个人理解主要是因为Kubernetes中QoS主要取决于requests和limits的CPU和memory的值,这两组四个值不同的配置可以分为三种场景,正好可以对应到当前的三种界别,因而分为三种级别是最自然的一种方式。

在实际使用中,Burstable和BestEffort差别并不是很大,而且从业务稳定性角度考虑,系统工程师会提前规划系统的容量,在出现集群资源不足之前就应该提前介入,而不是依赖kubelet或者oom_killer的行为。kubelet和oom_killer更多的是在业务出现异常资源申请时可以及时制止以维护整体集群的稳定。

实战

在实际使用中应该怎样在保证业务稳定的同时尽量提高资源利用率呢?

我推荐的核心原则是依据业务的重要程度以及集群的特性。例如在生产环境集群,推荐所有POD设置为Guaranteed级别,因为我认为生产环境的业务是最重要的,资源超卖相对来说不重要。当然怎么最大可能的降低资源浪费就是一个很重要的问题了。而在线下的测试开发集群则不同,对业务的稳定性要求不高,而对资源超卖相对变得更高,此时POD都可以设置为Burstable或者BestEffort,我们当前测试开发集群都是Burstable。

生产环境如何降低资源的浪费?主要出发点是基于历史的资源使用率设置一个稍微超出一点的请求值,为了应对流量的突增或者抖动,可以附以HPA来保证在请求量变大时可以动态扩容,不需要太多服务实例时再自动缩容。

QoS总结

  • 通过requests和limits来定义QoS等级

  • 内存不足时,POD被驱逐的级别顺序是BestEffort、Burstable、Guaranteed
  • oom_killer杀掉容器时不仅参考容器所属POD的QoS级别,还会参考容器实际资源使用量

参考文档

  1. baike.baidu.com/item/QoS
  2. zh.wikipedia.org/wiki/服务质量

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

35 Codis VS Redis Cluster:我该

发表于 2021-11-24

Codis 的整体架构和基本流程

  • 基本概念:codis和redis cluster都是切片集群方案
  • codis架构
+ codis server:二次开发的**Redis实例**,增加**额外的数据结构**,支持**数据迁移**操作,处理数据的**读写请求**\
+ codis proxy:接收客户端请求,并把请求转发给 **codis server  client与server间的代理**\
+ Zookeeper:保存集群元数据(**数据位置信息**和**codis proxy信息**)\
+ codis dashboard 和 codis fe:共同组成了集群管理工具\


    - codis dashboard 负责执行集群管理工作,包括增删 codis server、codis proxy 和进行数据迁移\
    - codis fe 负责提供 dashboard 的 Web 操作界面,便于我们直接在 Web 界面上进行集群管理\
  • Codis怎么处理请求\
+ 使用codis dashboard 设置 codis server 和 codis proxy 的访问地址,让proxy和server建立连接
+ **客户端**(**负责进行网络请求**的三方包 实现**操作接口**)直接和 codis proxy(是支持 Redis 的 RESP 交互协议,可以从单例切换到codis) 建立连接\
+ codis proxy 接收到请求,就会查询**zookeeper**请求数据和 codis server的**映射关系**,并把请求转发给相应的 **codis server** 进行处理\
+ 最后结果返回给**proxy**,proxy再返回**客户端**
  • 切片集群的关注点:
+ 数据分布
+ 集群扩容和数据迁移
+ 客户端兼容性
+ 可靠性保证

\

Codis 的关键技术原理

数据分布

  • 是通过逻辑槽slot完成映射
+ Codis 集群一共有 1024 个 Slot,编号依次是 0 到 1023
+ Slot 手动分配给 codis server,每个 server 上包含一部分 Slot(可以选择自动平均分配)\
+ 会使用 **CRC32** 算法计算数据 key 的哈希值(64位int),并把这个哈希值对 1024 取模(得到Slot的编号)\
+ 查询zookeeper根据slot获取server编号
  • 路由
+ Slot 和 codis server 的映射关系称为数据路由表(简称路由表)
+ 在 codis dashboard 上分配好路由表后,dashboard 会把路由表发送给 **codis proxy**,同时,**dashboard** 也会把路由表保存在 **Zookeeper** 中\
+ codis-proxy 会把**路由表缓存在本地**,当它接收到客户端请求后,直接查询本地的路由表,就可以完成正确的请求转发了\
  • Codis 和 Redis Cluster区别\
+ Codis 中的路由表是我们通过 codis dashboard 分配和修改的,并被保存在 Zookeeper 集群中
+ 修改的路由发送给proxy(负责转发的中间层)\
+ 在 Redis Cluster 中,数据路由表是通过每个实例相互间的通信传递的,最后会在每个实例上保存一份(没有中间层,需要依靠节点互相传递)\
+ Redis Cluster会消耗较多的集群网络资源\

集群扩容和数据迁移

  • 扩容的方向:增加 codis server 和增加 codis proxy
  • 增加 codis server\
+ 启动新的 codis server,将它加入集群\
+ 把部分数据迁移到新的 server\
  • 增加 codis proxy\
+ 当新增了 proxy 后,Zookeeper 上会有最新的访问列表\
+ 客户端也就可以从 Zookeeper 上读取 proxy 访问列表,把请求发送给新增的 proxy\
  • 数据迁移流程(渐进式的迁移)
+ 在源 server 上,Codis 从要迁移的 Slot 中随机选择一个数据,发送给目的 server
+ 源server接收到目标ack后,删除本地数据
+ 不断重复上面的迁移过程,直到要迁移的 Slot 中的数据全部迁移完成
  • 可选择迁移方式
+ 同步:在数据从源 server 发送给目的 server 的过程中,**源 server 是阻塞的**(有潜在的风险 **)** ,无法处理新的请求操作\
+ 异步:当源 server 把数据发送给目的 server 后,就可以处理其他请求操作了,不用等到目的 server 的命令执行完,都到ack消息后,删除本地数据


    - **迁移过程中,数据只读,保证数据一致性**)
    - 对于 bigkey,异步迁移采用了**拆分指令**的方式进行迁移,避免大数据的迁移\
    - 如果迁移过程中失败,会破坏数据一致性,在目标server上设置了过期时间,迁移失败则过期删除
    - SLOTSMGRTTAGSLOT-ASYNC 批量迁移key的数量

客户端兼容性

  • 使用 Redis 单实例时,客户端只要符合 RESP 协议,就可以和实例进行交互和读写数据
  • codis proxy 直接和客户端连接,codis proxy 是和单实例客户端兼容的(保证兼容性)\

可靠性保证

  • 组件越多,潜在的风险点也就越多
  • codis server\
+ 本质上还是Redis 实例
+ Redis 的主从复制机制和哨兵机制在 codis server 上都是可以使用的\
+ Codis 给每个 server 配置从库,并使用哨兵机制进行监控,当发生故障时,主从库可以进行切换,从而保证了 server 的可靠性\
  • Zookeeper\
+ 使用多个实例来保存数据
  • codis proxy\
+ 稳定性由Zookeeper保证
  • codis dashboard 和 codis fe
+ 它们主要提供配置管理和管理员手工操作,负载压力不大,所以,它们的可靠性可以不用额外进行保证了\

\

集群扩容和数据迁移如何进行?

  • 从稳定性和成熟度来看,Codis 应用得比较早,在业界已经有了成熟的生产部署
  • 从业务应用客户端兼容性来看,连接单实例的客户端可以直接连接 codis proxy,而原本连接单实例的客户端要想连接 Redis Cluster 的话,就需要开发新功能\
  • Codis server 是基于开源的 Redis 3.2.8 开发的,所以,Codis 并不支持 Redis 后续的开源版本中的新增命令和数据类型,并且不支持部分命令\
  • 从数据迁移性能维度来看,Codis 能支持异步迁移,异步迁移对集群处理正常请求的性能影响要比使用同步迁移的小\

\

总结

  • codis组成
+ codis server
+ codis proxy\
+ Zookeeper codis也支持etcd 或本地文件系统保存元数据信息\
+ codis dashboard 和 codis fe\
  • 可以使用多套codis隔离业务

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

频繁操作zookeeper节点,客户端收不到监听通知 故障现

发表于 2021-11-24

这是我参与11月更文挑战的第20天,活动详情查看:2021最后一次更文挑战

故障现象

生产上某个应用有45个节点,每个节点集成有一个zk客户端,即这45个zk客户端监听同一条zk路径。当该zk路径节点的值被更新,这45个节点会收到节点变动的通知,进行相关业务处理。

某次并行更新6次该zk节点的值(虽然是并行,但是每次setData的时候还是有个时间差,可能比较小)。发现有部分客户端没有通知(因为有些应用节点执行结果不预期)。

因为是生产上,当时运维重启这45个应用实例节点 获取到监听路径的最新值恢复了。

后来是查看日志发现,每个节点监听到通知的时候会打印一条日志,则45个节点,监听到6次变动,应当有45*6=270条日志,实际上可能只100多条,或者200条左右,总有些客户端在某一次监听路径的值变动的时候,没有收到通知。但是最后一两次的通知,全部收到了。

本地复现

因为是在生产环境,没法直接进行故障复盘。

zk客户端使用的curator的API封装的,监听器用的DataCache接口。

我只能本地模拟,在我本机上启动50个zk客户端。zk服务器用的是测试环境,考虑到可能存在网络因素,就不能在本机启动zk服务器来模拟。

问题是复现了,时间有点久了,过程就不再描述。

问题定位

通过curator的相关源码跟踪及其它场景的一些模拟,我把问题范围缩小在zookeeper原生客户端、网络、zookeeper服务器这3个方面。

使用原生的zookeeper客户接口测试,代码很简单,也没什么逻辑:
在这里插入图片描述

就是监听到节点变动,获取节点数据、重新注册监听器,打印一行信息,完全没业务逻辑。并且该客户端只启动一个。
然后再启动一个客户端,循环更新该节点的值5次:
在这里插入图片描述

问题出现了,监听客户端偶尔会打印日志不足5条,比如这次:
在这里插入图片描述
只监听到了3次。

因为问题定位时间有点紧,还有其它事情要忙,没有其它原因,是真的不想跟源码的。但是目前还不确定是zk客户端代码的原因还是服务器这里有原因。

没有办法,为了时间原因,这时候我能想到的只有抓包试下了,看下是服务器没发送通知,还是客户端没收到或者是收到了,处理的时候出问题了。

因为服务器我这边不方便登录,先抓客户端(我本机),本机好抓,用wireshark看起来也方便。
然后,几次尝试终于出现了一次问题:
在这里插入图片描述

客户端收到通知不足5条,只收到了0、1、2、4。另一端设置数据的时候,是正常设置的:
在这里插入图片描述

下面是抓包的一次分析内容我就说下分析结果:

  1. 服务器端实际发来了4条通知
  2. 客户端每次收到通知的时候,下次tcp请求是拉取数据(此时请求中会注册监听器的)
  3. tcp拉取数据的请求已发送,但这个时候服务器返回的下一条数据变更的通知
  4. 服务器返回上次请求的zk路径的数据。
    (注意这个顺序)
    服务器发送通知-》客户端收到通知,请求获取路径节点的值-》服务器返回下一条数据变更通知-》服务器返回上次客户端请求数据的值-》客户端收到了这次的通知,请求这次通知的数据…….

p.s. 设置节点的数据客户端也在我本机,上面分析的时候,我是把这个客户端的数据设置的请求报文过滤了。
问题出现在上面这个顺序,前几条报文都很正常,当客户端收到节点值变更为3的时候,客户端发出了拉取数据报文的请求,然后服务器返回了节点4的数据值。。。最终也没有返回节点值为4的变更通知,客户端本来是在节点值变更3通知的时候,发出的拉取请求却收到的值为4,值3丢失了,并且节点变为4的时候,最终也没有通知过来。

在这里插入图片描述
(IP不方便展示。虽然是内网)

上面是抓取的报文:按顺序:

  1. 设置数据的zk客户端设置path的值为4
  2. 监听变动的zk客户端请求获取path的值(这是节点变更为3的通知,此时它想获取的值是3呀!)
  3. 忽略
  4. zk服务器返回监听的zk客户端,Path的值为4。
    后面的ACK可以忽略了,之后的两条传输报文与此无关,没有其它相关通知和请求了。最终连接断开。

这个时候没有办法了,看来问题在zk服务器这里,只等翻源码了。

最终从启动类一开始,追踪到WatcheManager类,下面有段核心代码是触发监听的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
java复制代码    public Set<Watcher> triggerWatch(String path, EventType type, Set<Watcher> supress) {
WatchedEvent e = new WatchedEvent(type,
KeeperState.SyncConnected, path);
HashSet<Watcher> watchers;
synchronized (this) {
// 这是watchTable变量在前面的声明
/* private final HashMap<String, HashSet<Watcher>> watchTable =
new HashMap<String, HashSet<Watcher>>();

private final HashMap<Watcher, HashSet<String>> watch2Paths =
new HashMap<Watcher, HashSet<String>>();*/

// 每个路径注册的监听器在watchTable这个map里放着
// 这段代码是在同步块内,增加监听器的方法也存在同步操作,并且是和这段代码竞争同把锁
// 此时监听器先被移除了。
// 这意味着,新的监听器未注册进来的时候,此时发生节点变动,未注册监听器
// 的客户端自然不会收到通知
watchers = watchTable.remove(path);
if (watchers == null || watchers.isEmpty()) {
if (LOG.isTraceEnabled()) {
ZooTrace.logTraceMessage(LOG,
ZooTrace.EVENT_DELIVERY_TRACE_MASK,
"No watchers for " + path);
}
return null;
}
for (Watcher w : watchers) {
HashSet<String> paths = watch2Paths.get(w);
if (paths != null) {
paths.remove(path);
}
}
}
for (Watcher w : watchers) {
if (supress != null && supress.contains(w)) {
continue;
}
// 通知客户端
w.process(e);
}
return watchers;
}

看下我上面加的中文注释。

再加上上面抓的包,可以分析到,服务器端节点值变更为3的通知发给客户端的时候,该客户端的监听器已经被移除了,然后的另个设置数据的客户端设置节点值为4的请求已经发出去了,大约0.05ms后,监听客户端发现获取数据的请求(此时包括注册监听器),但是可能已经晚了,服务器数据变为4,向客户端发送通知的时候,监听器还没注册上(触发监听的增加监听器也存在同步操作的)。

我计算了下时间,从收到节点变更通知,到注册新监听请求发出,我本机上大概经过了30ms左右。

从设置节点的值到服务器发送到节点变动耗时在33ms左右,因为没有在服务端抓包还是不好估计传输耗时。

但是实际生产环境节点压力更大,业务侧的处理能力也不能保证。所以如果是在旧的监听器失效,新的尚未注册的时候,节点变动,自然是不收到通知的。

解决办法

既然操作频繁导致,数据更新的时候,新的监听器还没注册过来,或许在传输过程中也可能是服务器收到请求了还没注册上。

那操作避免过于频繁不就好了,给它点时间 。就像我,动态获取应用实例监听点(zk客户端的数目),并行改为串行,并且适当的计算一个合理的延时(如果有必要,就在两点变动中加上延时,避免客户端监听器来不及注册)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

PHP 基金会,是个好事 (PHP Foundation)

发表于 2021-11-24

事情原委

以下这一段来自 PHP/Laravel 社区帖子:博客:PHP 主力开发因生活压力不再参与维护,PHP 基金会加速成立

2021 年 11 月 23 日,PHP 主力开发 Nikita Popov 发文宣布 将不再参与 PHP 的主要维护,从月底开始逐渐淡出。据说 Nikita Popov 从他还在上高中时就开始参与 PHP 的开发,彼时为 2011 年,截至目前已有 10 年之久,他为 PHP 贡献诸多,可谓失之一臂。

这一决定背后的原因之一是主要贡献者之一 Nikita Popov 决定将他的重点从 PHP 转移到 LLVM。Nikita Popov 于 2011 年开始从事 PHP 工作,并在 JetBrains 从事 PHP 工作近三年的时间,为那里的三个主要版本做出了重大贡献 —— PHP 7.4、PHP 8.0 和 PHP 8.1。

Nikita Popov 虽然没有明说离开的原因,但是从相关信息可以了解,这应该是迫于生活压力,因为谁都知道维护 PHP 开源项目没啥收入。

不过因此,PHP 社区宣布成立 PHP 基金会,详细说明见 The New Life of PHP – The PHP Foundation。基金会力在寻求能够持续维护 PHP 的维护人员,从而使其良性发展。

其实,PHP 基金会很早就有被讨论,但不知为何,没能向前发展,Nikita Popov 的离开促使这一几乎被遗忘的议程匆匆落地。为了能够尽快找到接班人,也是不再让悲剧继续发生,基金会目前已经在 opencollective.com/phpfoundati… 发起赞助。

个人想法

这两天在社区里 『PHP 基金会』 的事情刷屏了。

  • 宣布成立 PHP 基金会
  • PHP 主力开发因生活压力不再参与维护,PHP 基金会加速成立

不管 Nikita 离开原因的真假,PHP 项目开发者都应该有一些财务回馈。开源不易啊,开发工作十个年头走过来,一路上看到太多的优秀项目到最后放弃维护,都是因为现实生活中的窘迫。梦想很美好,生活很残酷。

PHP 作为开源软件,目前不归属于任何一家商业机构,作为一个公益项目在运行。长此以往,还是有很多弊端的。最大的弊端就是会出现没有稳定开发者的问题。

所以 『PHP 基金会』是个大好事。

小小的请求

LearnKu 拥有国内最大 PHP 论坛(Google 搜索排第一)和 Laravel 论坛(Google 搜索排第一),作为 LearnKu.com 站长,觉得推广这件事情是我的使命。另一方面,作为 PHP 从业者,多年来一直受 PHP 的恩泽,也算是靠着 PHP 在养家糊口,一直对 PHP 开发者心存感激。

在这里我有一个小小的请求,希望看到这个信息的朋友,把这件事情传播出去。

让更多人知道,最重要的,让更多的受惠于 PHP 的公司和商业机构知道。个人的捐赠毕竟比较有限,且多是一次性捐赠,商业机构长期稳定的捐赠(使用每月捐赠的渠道)才是长久之计:

PHP 基金会是个好事 (PHP Foundation)

捐赠的链接

捐赠请前往: opencollective.com/phpfoundati…

OpenCollective 平台目前支持信用卡和 Paypal ,很遗憾暂不支持国内流行的支付渠道。

无法捐赠的朋友,不用沮丧,为我点个赞,让更多人能看到。或者将此链接分享到群、朋友圈,或者自己写博文或者订阅号文章来帮忙宣传这件事,有力出力嘛。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…212213214…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%