开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

服务网格 Istio 全系列之五 —— Istio 今生 1

发表于 2021-09-11

1 前言

上章介绍了 Istio 前世,讲述了微服务架构模型的演变史,也讲了微服务今天所遭遇的问题和面临的尴尬窘境,这里我们再复习一下。


目前微服务治理遭遇的问题:

服务治理方式不统一:不同服务治理的方式会引入不同的中间件,而这些中间件的技术标准和维护标准都不同。因此运维人员或者架构人员必须掌握每种中间件的使用方法,很多时候这对一个人力资源有限的科技公司并不现实。

重复造轮子:微服务架构是允许多语言栈、多技术栈的,但不同的技术栈针对通信、支撑服务、服务安全、服务监控、熔断/降级/限流等通用技术问题却需要各自的解决方案,实在是成本的浪费。

服务治理缺乏标准化:由于服务治理缺乏标准化,因此微服务治理的好坏全依靠技术人员个人的能力、经验和水平,这就有点像手工作坊时代,器物优质全靠工匠。但是无标准化显然不符合科技发展的轨迹。

为了解决上面微服务治理中的痛点,大家普遍的诉求在于能不能有这么一个平台,既可以无侵入、透明、用户无感知的插入到现有的分布式微服务架构中,同时又可以解决一些通信所必须考虑的普遍问题(服务发现、负载均衡、超时重试、熔断/限流、监控、访问控制、认证授权等),将这些问题的解决方案统一下沉到平台层,而不再依靠引入第三方中间件(zookeeper、nginx、sentinal、hystrix、pinpoint/zipkin、spring security),并且所有的维护方式统一且标准化。


于是服务网格出现了,Istio 也出现了,而且一切出现得都是这么自然。

《圣经》旧约-创世纪篇:

原始太初,上帝创造了天地。地面一片空虚混沌,渊面黑暗,只有上帝的灵运行在水面上。上帝说:“要有光!”于是,就有了光。上帝把光和暗分开,把光称为白昼,把暗称为黑夜。夜晚过去后,清晨接着来临,这是第一天。

上帝说:“诸水之间要有穹苍,将水分为上下。”于是创造了穹苍,把水上下分开。他称穹苍为“天空”。夜晚过去,清晨接着来临,这是第二天。

……

那么服务网格是什么?Istio 又是什么呢?

2 何为服务网格(service mesh)

下面看看 Istio 是怎么描述服务网格的:

The term service mesh is used to describe the network of microservices that make up such applications and the interactions between them. As a service mesh grows in size and complexity, it can become harder to understand and manage. Its requirements can include discovery, load balancing, failure recovery, metrics, and monitoring. A service mesh also often has more complex operational requirements, like A/B testing, canary rollouts, rate limiting, access control, and end-to-end authentication.

个人翻译如下:

服务网格是对微服务组成的一个可以互相通信的网络进行治理的规范。随着微服务的增长,服务网格也会变得越来越复杂和难以理解。服务网格治理的内容除了服务发现、负载均衡、失败恢复、指标收集、监控之外,还应该具有更复杂的运维要求,比如 A/B 测试、金丝雀发布、流量限制、访问控制和端到端认证。

3 何为 Istio

上面介绍了服务网格,下面再来介绍一下 Istio,仍然引用 Istio 官网的定义:

Cloud platforms provide a wealth of benefits for the organizations that use them. However, there’s no denying that adopting the cloud can put strains on DevOps teams. Developers must use microservices to architect for portability, meanwhile operators are managing extremely large hybrid and multi-cloud deployments. Istio lets you connect, secure, control, and observe services.

At a high level, Istio helps reduce the complexity of these deployments, and eases the strain on your development teams. It is a completely open source service mesh that layers transparently onto existing distributed applications. It is also a platform, including APIs that let it integrate into any logging platform, or telemetry or policy system. Istio’s diverse feature set lets you successfully, and efficiently, run a distributed microservice architecture, and provides a uniform way to secure, connect, and monitor microservices.

个人感觉介绍很啰嗦,建议你也别看了,我给你简单列举一下重点即可:

Istio 是 service mesh 的具体解决方案。她就像一个尤物,不仅能满足服务网格规定的一切苛刻要求之外,还贴心地为你准备了一整套标准化、规格化的豪华级国际服务,等待着您的临幸。更难能可贵地是,拥有这么多优秀品质的她,竟然还是免费的!爽不爽!

4 为什么使用 Istio

下面是你选择 Istio 的一些理由:

1 对 HTTP、gRPC、WebSocket 和 TCP 网络流量的自动负载均衡

2 通过丰富多样的路由规则、重试、故障转移和故障注入机制实现对流量行为进行细粒度控制

3 通过可插拔的策略层(联想成过滤器)和 API 实现对访问的控制、流量以及资源配额的限制

4 集群入口、集群内部、集群出口所有网络流量的全方位跟踪、记录和度量

5 保证服务之间通信的安全性

5 istio 核心特征

Istio 官方宣扬的特性是 traffic management(流控)、secure(安全)、polices(策略)、observability(可观察)。个人感觉这样的叙述太佶屈聱牙,一点都不口语化。

5.1 traffic management

这个好理解,本质就是网络流量的管理。就像早晚高峰车辆限行,以及交警在发生交通事故疏导新路,这些都是在做流量的控制和路由。


其实流量管理并不是服务网格化才出现的,早期的微服务时代就已经有流量控制了,比如负载均衡、熔断、限流、降级等,只不过早期这些功能的实现依赖中间件(比如 nginx、hystrix),如今服务网格时代,这些功能统一下沉到基础平台 Istio 去实现了。


Istio 的流控主要是通过 Envoy 组件实现。有关技术细节,哥以后会专门告诉你。

5.2 secure

说到 Istio 的 secure 其实是有个范围的。这里的 secure 并不是没有边界,它主要是指微服务之间通信的 secure,即 pod 对 pod、service 对 service 层面通信的 secure。众所周知,istio 是 google、ibm 以及 lyft 公司 3p 后的产物。而 istio 的 secure 正是脱胎于 google 的 ALTS(应用层传输安全)这项技术,该项技术用于验证 google 服务之间的通信,保证传输中数据的安全,即应用层服务到服务通信的防护方式。这些功能早先在微服务时代对标就是 jwt、oauth2 等技术规范。


Istio secure 主要的功能有 ACLS(访问控制)、authentication(认证,即证明你是谁)、authorization(授权,即允许你干啥)。


Istio secure 功能通过 Citadel 这个组件实现。有关技术细节,哥以后会专门告诉你。

5.3 policies

Istio policies 职责如下:
  • 动态限制服务通信的网络速率
  • 限制访问服务、设置黑、白名单
  • 网络包头信息的重写或者重定向
不仅如此,Istio 也允许添加自定义策略,通过 Istio 提供的 policy adapter 跟 Istio 集成在一起。


注意不要将 policies 跟 secure 进行混淆,policies 更多是人为进行干预控制,而 secure 重点在于安全。


Istio policies 功能实现是通过 mixer 组件实现的。有关技术细节,哥以后会专门告诉你。

5.4 observability

observability 特性是指提供给你多种工具实现全方位、立体式对集群入口、集群内部、集群出口的流量进行监控、跟踪和度量。微服务早期时代监控方式是 agent 或者中间件,比如:zabbix、pinpoint、zipkin 等。


Istio observability 功能实现是通过 mixer 组件实现的。还是老样子,有关技术细节,哥以后会专门告诉你。

6 多平台支持

Istio 可以支持多平台,比如 K8S、Consul、Mesos 以及独立虚拟机。


以后哥都会实际为你展示,你唯一做的就是耐心等待和持续尖叫。


自此,笔者带你轻松完爆 Istio 的介绍,后续更多精彩,敬请期待。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【优化技术专题】「线程间的高性能消息框架」深入浅出Disru

发表于 2021-09-11

我正在参加中秋创意投稿大赛,详情请看:中秋创意投稿大赛

前提概要

简单回顾 jdk 里的队列:

阻塞队列:

ArrayBlockingQueue主要通过:数组(Object[])+ 计数器(count)+ ReetrantLock的Condition (notEmpty:非空、notFull:非饱和)进行阻塞。

入队操作:
  • 操作不阻塞:
    • add:添加失败,则会直接进行返回。
    • offer:添加失败后(满了)直接抛出异常,注意:offer(E o, long timeout, TimeUnit unit):可以设定等待的时间,如果在指定的时间内,还不能往队列中加入BlockingQueue,则返回失败。
  • 操作阻塞:
    • put:满了,通过Condition:notFull.await()阻塞当前数据信息,当出队和删除元素时唤醒 put 操作。
出队操作:
  • 操作不阻塞:
    • poll:当空时直接返回 null。poll(long timeout, TimeUnit unit):从BlockingQueue取出一个队首的对象,如果在指定时间内,队列一旦有数据可取,则立即返回队列中的数据。否则知道时间,超时还没有数据可取,返回失败。
    • remove:删除元素情况相关元素信息控制,InterruptException异常
  • 操作阻塞:
    • take:当空时,notEmpty.await()(当有元素入队时唤醒)。
  • drainTo():一次性从BlockingQueue获取所有可用的数据对象(还可以指定获取数据的个数),通过该方法,可以提升获取数据效率;不需要多次分批加锁或释放锁。

与ArrayBlockingQueue相对的是LinkedBlockingQueue:Node 实现、加锁(读锁、写锁分离)、可选的有界队列。需要考虑实际使用中的内存问题,防止溢出。

实际应用

线程池队列

Excutors 默认是使用 LinkedBlockingQueue,但是在实际应用中,更应该手动创建线程池使用有界队列,防止生产者生产过快,导致内存溢出。

延迟队列(ScheduleService也是采用了延时队列哦!):

DelayQueue : PriorityQueue (优先级队列) + Lock.condition (延迟等待) + leader (避免不必要的空等待)。

主要方法:
  • getDelay() 延迟时间。
  • compareTo() 通过该方法比较从PriorityQueue里取值。
入队:

与BlockingQueue很相似,add、put、offer:入队时会将换唤醒等待中的线程,进行一次出队处理。

出队:
  • 如果队列里无数据,元素入队时会被唤醒。
  • 如果队列里有数据,会阻塞至时间满足。
+ take-阻塞:
+ poll-**满足队列有数据并且 delay 时间小于0时候会取出元素,否则立即返回 null 可能会抢占成为 leader**。
应用场景:
  • 延时任务:设置任务延迟多久执行;需要设置过期值的处理,例如缓存过期。
  • 实现方式:每次 getDelay() 方法提供一个缓存创建时间与当前时间的差值,出队时 compareTo() 方法取差值最小的。每次入队时都会重新取出队列里差值最小的值进行处理。
  • 使用队列更多的是像生产者、消费者这种场景,这种场景大多数情况又对处理速度有着要求,所以我们会使用多线程技术。
  • 使用多线程就可能会出现并发,为了避免出错,我们会选择线程安全的队列。
+ ArrayBlockingQueue、LinkedBlockingQueue 或者是 ConcurrentLinkedQueue。前俩者是通过加锁取实现,后面一种是通过 cas 去实现线程安全。
+ 要考虑到生产者过快可能造出的内存溢出的问题,所以看起来 ArrayBlockingQueue 是最符合要求的。

但是因为加锁效率又会变慢,所以就引出了:Disruptor服务框架 !


Disruptor简介介绍

  • Disruptor的源码Git仓库地址:github.com/LMAX-Exchan…
  • Disruptor的概念定义:异步体系的线程间的高性能消息框架
  • Disruptor的核心思想:把多线程并发写的线程安全问题转化为线程本地写,即:不需要做同步,不许要进行加锁操作。

Disruptor优点介绍

  • 非常轻量,但性能却非常强悍,得益于其优秀的设计和对计算机底层原理的运用
    • 单线程每秒能处理超600W的数据(Disruptor能在1秒内将600W数据发送给消费者,现在的硬件水平会远远在这个水平之上了!)
  • 基于事件驱动模型,不用消费者主动拉取消息
  • 比JDK的ArrayBlockingQueue性能高一个数量级
为什么这么快
  • 无锁序号栅栏
  • 缓存行填充,消除伪共享
  • 内存预分配
  • 环形队列RingBuffer

Disruptor核心概念

  • RingBuffer(环形队列):基于数组的内存级别缓存,是创建sequencer(序号)与定义WaitStrategy(拒绝策略)的入口。
  • Disruptor(总体执行入口):对RingBuffer的封装,持有RingBuffer、消费者线程池Executor、消费之集合ConsumerRepository等引用。
  • Sequence(序号分配器):对RingBuffer中的元素进行序号标记,通过顺序递增的方式来管理进行交换的数据(事件/Event),一个Sequence可以跟踪标识某个事件的处理进度,同时还能消除伪共享。
  • Sequencer(数据传输器):Sequencer里面包含了Sequence,是Disruptor的核心,Sequencer有两个实现类:SingleProducerSequencer(单生产者实现)、MultiProducerSequencer(多生产者实现),Sequencer主要作用是实现生产者和消费者之间快速、正确传递数据的并发算法
  • SequenceBarrier(消费者屏障):用于控制RingBuffer的Producer和Consumer之间的平衡关系,并且决定了Consumer是否还有可处理的事件的逻辑。
  • WaitStrategy(消费者等待策略):决定了消费者如何等待生产者将Event生产进Disruptor,WaitStrategy有多种实现策略,分别是:
1. BlockingWaitStrategy(最稳定的策略):阻塞方式,效率较低,但对cpu消耗小,内部使用的是典型的锁和条件变量机制(java的ReentrantLock),来处理线程的唤醒,这个策略是disruptor等待策略中最慢的一种,但是是最保守使用消耗cpu的一种用法,并且在不同的部署环境下最能保持性能一致。 但是,随着我们可以根据部署的服务环境优化出额外的性能。
2. BusySpinWaitStrategy(性能最好的策略):自旋方式,无锁,BusySpinWaitStrategy是性能最高的等待策略,但是受部署环境的制约依赖也越强。 仅当event处理线程数少于物理核心数的时候才应该采用这种等待策略。 例如,超线程不可开启。
3. LiteBlockingWaitStrategy(几乎不用,最接近原生的策略机制):BlockingWaitStrategy的变体版本,目前感觉不建议使用
4. LiteTimeoutBlockingWaitStrategy:LiteBlockingWaitStrategy的超时版本
5. PhasedBackoffWaitStrategy(最低CPU配置的策略):自旋 + yield + 自定义策略,当吞吐量和低延迟不如CPU资源重要,CPU资源紧缺,可以使用此策略。
6. SleepingWaitStrategy:**自旋休眠方式(无锁),性能和BlockingWaitStrategy差不多,但是这个对生产者线程影响最小,它使用一个简单的loop繁忙等待循环,但是在循环体中间它调用了LockSupport.parkNanos(1)**。
    + 一般情况在linux系统这样会使得线程停顿大约60微秒。不过这样做的好处是,生产者线程不需要额外的累加计数器,也不需要产生条件变量信号量开销。
    + 负面影响是,在生产者线程与消费者线程之间传递event数据的延迟变高。所以SleepingWaitStrategy适合在不需要低延迟, 但需要很低的生产者线程影响的情形。一个典型的案例是异步日志记录功能。
7. TimeoutBlockingWaitStrategy:BlockingWaitStrategy的超时阻塞方式
8. YieldingWaitStrategy(充分进行实现CPU吞吐性能策略):自旋线程切换竞争方式(Thread.yield()),最快的方式,适用于低延时的系统,在要求极高性能且事件处理线数小于CPU逻辑核心数的场景中推荐使用此策略,它会充分使用压榨cpu来达到降低延迟的目标。
    + 通过不断的循环等待sequence去递增到合适的值。 在循环体内,调用Thread.yield()来允许其他的排队线程执行。 这是一种在需要极高性能并且event handler线程数少于cpu逻辑内核数的时候推荐使用的策略。
    + 这里说一下YieldingWaitStrategy使用要小心,不是特别要求性能的情况下,要谨慎使用,否则会引起服务起cpu飙升的情况,因为他的内部实现是在线程做100次递减然后Thread.yield(),可能会压榨cpu性能来换取速度。

注意:超线程是intel研发的一种cpu技术,可以使得一个核心提供两个逻辑线程,比如4核心超线程后有8个线程。


  • Event:从生产者到消费者过程中所处理的数据单元,Event由使用者自定义。
  • EventHandler:由用户自定义实现,就是我们写消费者逻辑的地方,代表了Disruptor中的一个消费者的接口。
  • EventProcessor:这是个事件处理器接口,实现了Runnable,处理主要事件循环,处理Event,拥有消费者的Sequence,这个接口有2个重要实现:
    • WorkProcessor:多线程处理实现,在多生产者多消费者模式下,确保每个sequence只被一个processor消费,在同一个WorkPool中,确保多个WorkProcessor不会消费同样的sequence
    • BatchEventProcessor:单线程批量处理实现,包含了Event loop有效的实现,并回调到了一个EventHandler接口的实现对象,这接口实际上是通过重写run方法,轮询获取数据对象,并把数据经过等待策略交给消费者去处理。

Disruptor整体架构

接下来我们来看一下 Disruptor 是如何做到无阻塞、多生产、多消费的。

  • 构建 Disruptor 的各个参数以及 ringBuffer 的构造:
    • EventFactory:创建事件(任务)的工厂类。
    • ringBufferSize:容器的长度。
    • Executor:消费者线程池,执行任务的线程。
    • ProductType:生产者类型:单生产者、多生产者。
    • WaitStrategy:等待策略。
    • RingBuffer:存放数据的容器。
    • EventHandler:事件处理器。

Disruptor使用方式

maven依赖:
1
2
3
4
5
xml复制代码<dependency>
<groupId>com.lmax</groupId>
<artifactId>disruptor</artifactId>
<version>3.4.2</version>
</dependency>
生产单消费简单模式案例:
Event数据模型
1
2
3
4
5
java复制代码import lombok.Data;
@Data
public class SampleEventModel {
private String data;
}
Event事件模型Factory工厂类
1
2
3
4
5
6
7
8
9
10
11
java复制代码import com.lmax.disruptor.EventFactory;
/**
* 消息对象生产工厂
*/
public class SampleEventModelFactory implements EventFactory<SampleEventModel> {
@Override
public SampleEventModel newInstance() {
//返回空的消息对象数据Event
return new SampleEventModel();
}
}
EventHandler处理器操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码import com.lmax.disruptor.EventHandler;
/**
* 消息事件处理器
*/
public class SampleEventHandler implements EventHandler<SampleEventModel> {
/**
* 事件驱动模式
*/
@Override
public void onEvent(SampleEventModel event, long sequence, boolean endOfBatch) throws Exception {
// do ...
System.out.println("消费者消费处理数据:" + event.getData());
}
}
EventProducer工厂生产者服务处理器操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码import com.lmax.disruptor.RingBuffer;
/**
* 消息发送
*/
public class SampleEventProducer {
private RingBuffer<SampleEventModel> ringBuffer;
public SampleEventProducer(RingBuffer<SampleEventModel> ringBuffer) {
this.ringBuffer = ringBuffer;
}
/**
* 发布数据信息
* @param data
*/
public void publish(String data){
//从ringBuffer获取可用sequence序号
long sequence = ringBuffer.next();
try {
//根据sequence获取sequence对应的Event
//这个Event是一个没有赋值具体数据的对象
TestEvent testEvent = ringBuffer.get(sequence);
testEvent.setData(data);
} finally {
//提交发布
ringBuffer.publish(sequence);
}
}
}
EventProducer工厂生产者服务处理器操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public class TestMain {
public static void main(String[] args) {
SampleEventModelFactory eventFactory = new SampleEventModelFactory();
int ringBufferSize = 1024 * 1024;
//这个线程池最好自定义
ExecutorService executor = Executors.newCachedThreadPool();
//实例化disruptor
Disruptor<SampleEventModel> disruptor = new Disruptor<SampleEventModel>(
eventFactory, //消息工厂
ringBufferSize, //ringBuffer容器最大容量长度
executor, //线程池,最好自定义一个
ProducerType.SINGLE, //单生产者模式
new BlockingWaitStrategy() //等待策略
);
//添加消费者监听 把TestEventHandler绑定到disruptor
disruptor.handleEventsWith(new SampleEventHandler());
//启动disruptor
disruptor.start();
//获取实际存储数据的容器RingBuffer
RingBuffer<SampleEventModel> ringBuffer = disruptor.getRingBuffer();
//生产发送数据
SampleEventProducer producer = new SampleEventProducer(ringBuffer);
for(long i = 0; i < 100; i ++){
producer.publish(i);
}
disruptor.shutdown();
executor.shutdown();
}
}

Disruptor的原理分析

  • 使用循环数组代替队列生产者消费者模型自然是离不开队列的,使用预先填充数据的方式来避免 GC;
  • 使用CPU缓存行填充的方式来避免极端情况下的数据争用导致的性能下降;
  • 多线程编程中尽量避免锁争用的编码技巧。

Disruptor为我们提供了一个思路和实践,基本的循环数组实现,定义一个数组,长度为2的次方幂。

循环数组

  • 设定一个数字标志表示当前的可用的位置(可以从0开始)。
+ 头标志位表示下一个可以插入的位置。
    - **头标志位不能大于尾标志位一个数组长度(因为这样就插入的位置和读取的位置就重叠了会导致数据丢失)。**
+ 尾标志位表示下一个可以读取的位置。
    - **尾标志位不能等于头标志位(因为这样读取的数据实际上是上一轮的旧数据) 预先填充提高性能,我们知道在java中如果创造大量的对象使用后弃用,JVM 会在适当的时候进行GC操作。**
  • 当这个数字标志不断增长到大于数组长度时进行与数组长度的并运算,得到的新数字依然在数组的长度范围内,就又可以插入。
  • 这样就好像一直插入直到数组末尾又再次从头开始,故而称之为循环数组。 一般的循环数组有头尾两个标志位。这点和队列很像。

循环数组(初始化数据信息)

在循环数组中,可以事先在数组中填充好数据。一旦有新数据的产生,要做的就是修改数组中某一位中的一些属性值。这样可以避免频繁创建数据和弃用数据导致的 GC。

这点比起队列是要好的。 只保留一个标志位,多线程在队列也好,循环数组也好,必然存在对标志位的竞争。无论是使用锁来避免竞争,还是使用 CAS 来进行无锁算法。

只要争用的情况存在,并且线程较多,都会出现对资源的不断消耗。争用的对象越多,争用中消耗掉的资源也就越多。

为了避免这样的情况,减少争用的资源就是一个手段。比如在循环数组中只保留一个标志位,也就是下一个可以写入数据位置的标志位。而尾部标志位则在各个消费者线程中保存(具体的编程手法后续细讲)。

循环数组在单线程

  • 循环数组在单线程中的使用,如果确定只有一个生产者,也就是说只有一个写线程。则在循环数组中的使用会更加简化。
  • 具体来说单线程更新数组上的标志位,那这种情况,标志位就无需采用CAS写的方式来确定下一个可写入的位置,直接就是在单线程内进行普通的更新即可。

循环数组在多线程

  • 循环数组在多线程中的使用,如果存在多个生产者,则可写入的标志位需要用CAS 算法来进行争夺,避免锁的使用。
  • 多个线程通过CAS得到唯一的不冲突的下一个可写序号,由于需要获得序号后才能进行写入,而写入完成才可以让消费者线程进行消费。
  • 所以才获得序号后,完成写入前,必须有一种方式让消费者检测是否完成。
  • 以避免消费者拿到还未填入输入的数组位。 为了达到这个目标,存在简单—效率低和复杂—效率高两种方式。

简单但是可能效率低的方式使用两个标志位。

  • prePut:表示下一个可以供生产者放入的位置;
+ **多个生产者通过 CAS 获得 prePut 的不同的值,在获得的序号并且完成数据写入后,将 put 的值以 CAS 方式递增(比如获得的序号是7,只有 put 是6的时候才允许设置成功),称之为发布。**
+ **这种方式存在一个缺点,如果多个线程并发写入,获取 prePut 的值不会堵塞,假设其中一个生产者在写入数据的时候稍慢,则其他的线程写入完毕也无法完成发布。就会导致循环等待,浪费了 CPU 性能。**
  • put:表示最后一个生产者已经放入的位置。
  • 复杂但是可能效率高的方式,在上面的方式中,主要的争夺环节集中在多线程发布中,序号大的线程发布需要等到序号小的线程发布完成后才能发布。那我们的优化的点也在这个地方。
  • 这样就可以避免发布的争夺。 但是又来带来一个问题,用什么数字来表示是否已经发布完成?如果只是0和1,那么写过1轮以后,标志数组位上就都是1了。又无法区分。
  • 所以标志数组上的数字应该在循环数组的每一轮循环的值都不同。

比如一开始都是-1,第一轮中是0的表示已发布,第二轮中是0表示没发布,是1的表示已发布。

缓存行填充

要了解缓存行填充消除伪共享,首先要了解什么是系统缓存行:

  • CPU 为了更快的执行代码。于是当从内存中读取数据时,并不是只读自己想要的部分。而是读取足够的字节来填入高速缓存行。根据不同的 CPU ,高速缓存行大小不同。如 X86 是 32BYTES ,而 ALPHA 是 64BYTES 。并且始终在第 32 个字节或第 64 个字节处对齐。这样,当 CPU 访问相邻的数据时,就不必每次都从内存中读取,提高了速度。 因为访问内存要比访问高速缓存用的时间多得多。
  • 这个缓存是CPU内部自己的缓存,内部的缓存单位是行,叫做缓存行。在多核环境下会出现CPU之间的内存同步问题(比如一个核加载了一份缓存,另外一个核也要用到同一份数据),如果每个核每次需要时都往内存中存取(一个在读缓存,一个在写缓存时,造成数据不一致),这会带来比较大的性能损耗。
  • 数据在缓存中不是以独立的项来存储的,如不是一个单独的变量,也不是一个单独的指针。缓存是由缓存行组成的,通常是64字节(译注:这篇文章发表时常用处理器的缓存行是64字节的,比较旧的处理器缓存行是32字节),并且它有效地引用主内存中的一块地址。一个Java的long类型是8字节,因此在一个缓存行中可以存8个long类型的变量。
  • 当数组中的一个值被加载到缓存中,它会额外加载另外7个。因此你能非常快地遍历这个数组。事实上,你可以非常快速的遍历在连续的内存块中分配的任意数据结构。
  • 因此如果你数据结构中的项在内存中不是彼此相邻的,你将得不到免费缓存加载所带来的优势。并且在这些数据结构中的每一个项都可能会出现缓存未命中。
  • 设想你的long类型的数据不是数组的一部分。设想它只是一个单独的变量。让我们称它为head,这么称呼它其实没有什么原因。然后再设想在你的类中有另一个变量紧挨着它。让我们直接称它为tail。现在,当你加载head到缓存的时候,你也免费加载了tail

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

JVM 阅读GC log,以及各种VM参数的使用

发表于 2021-09-10

前言

  • 要会阅读GC log,来理解各种垃圾集器。
  • 理解内存区域的作用以及调整JVM 内存大小。
  • 要会操作创建对象时使用哪个区域。

使用VM参数,阅读GC log

代码及VM参数,触发Minor GC

  • VM参数
1
2
3
4
5
java复制代码-Xms20M  // 堆的最大值
-Xmx20M // 堆的最小值
-Xmn10M // 年轻代大小 即 Eden 和 两个Servivor的总大小,那么20-10=10M就是老年代的大小
-XX:+PrintGCDetails // 打印GC 日志
-XX:SurvivorRatio=8 // 设置Eden所占年轻代的比率

image.png

  • 回顾一下内存划分
    image.png
  • 代码
1
2
3
4
5
6
7
8
9
10
11
java复制代码public class GCLogTest {
//什么都不写,年轻代耗费2032k内存,不同机器应该会不一样
public static void main(String[] args) throws ClassNotFoundException {
int size = 1024*1024;//M
byte[] bytes = new byte[4*size];//4M
byte[] bytes1 = new byte[3*size];//3M
byte[] bytes2 = new byte[3*size];//3M
System.out.println("GC log");
//内存中不止有创建的byte[],还有运行Java程序所必须使用的对象。
}
}

image.png

  • 解释日志含义
    image.png

前面明明设置了年轻代为 10M 为什么日志却是9216k?

  • 年轻代有一个Eden区和两个Serviver,即s0和s1。
  • 前面设置Eden区占8 ,那么s0和s1各占1。
  • 在使用时只会用s0和s1中的其中一个。另外一个一定是不使用的。所以就少了1024k。
  • 即9216+1024 = 10240k = 10M,就是堆的设置大小和实际大小会差1024k。被其中一个Serviver区浪费了。

image.png

观察到老年代使用了4104k,如何得来?

  • 可以通过GC前后的内存变化来计算。
  • 5976 - 840 = 5136k,表示年轻代释放了5136k容量。
  • 年轻代释放的内容可能去了老年代或者已经被抹除了。
  • 5976 - 4944 = 1032k,表示整个堆释放的空间为1032k。
  • 5136 - 1032 = 4104k,表示年轻代释放的内容中抹除了1032k,剩下的4104k去了老年代。

代码,触发Full GC

  • 代码
1
2
3
4
5
6
7
8
9
10
java复制代码public class GCLogTest {
//什么都不写年轻代,耗费2032k内存,不同机器应该会不一样
public static void main(String[] args) throws ClassNotFoundException {
int size = 1024*1024;//M
byte[] bytes = new byte[4*size];//4M
byte[] bytes1 = new byte[4*size];//4M
byte[] bytes2 = new byte[3*size];//3M
System.out.println("GC log");
}
}

image.png

  • 解释一下Full GC

image.png

创建对象比上面触发时Full GC 容量大,却没有触发Full GC

上面的代码是11M byte[],触发了Full GC

  • 创建13M byte[]
1
2
3
4
5
6
7
8
9
10
11
java复制代码public class GCLogTest {
//什么都不写年轻代,耗费2032k内存,不同机器应该会不一样
public static void main(String[] args) throws ClassNotFoundException {
int size = 1024*1024;//M
byte[] bytes = new byte[4*size];//4M
byte[] bytes1 = new byte[3*size];//3M
byte[] bytes2 = new byte[size];//1M
byte[] bytes3 = new byte[5*size];//5M
System.out.println("GC log");
}
}

image.png

  • 明明创建的内容比较大,为什么没有触发Full GC
    • 对象无法在新生代创建时,就直接在老年代创建。
      • 不要想着将对象拆分了,一个部分在新生代,另一部分在老年代,这是不可能的。
  • 如下的情况也是会直接在老年代创建
1
2
3
4
5
6
7
8
Java复制代码public class GCLogTest {
//什么都不写年轻代,耗费2032k内存,不同机器应该会不一样
public static void main(String[] args) throws ClassNotFoundException {
int size = 1024*1024;//M
byte[] bytes = new byte[8*size];//8M ,加上其它的,会超过新生代大小
System.out.println("GC log");
}
}

image.png

1
2
3
4
5
6
7
8
Java复制代码public class GCLogTest {
//什么都不写年轻代,耗费2032k内存,不同机器应该会不一样
public static void main(String[] args) throws ClassNotFoundException {
int size = 1024*1024;//M
byte[] bytes = new byte[11*size];//11M ,对象超过了新生代和老年代的大小
System.out.println("GC log");
}
}

image.png

阈值

在终端使用java -XX:+PrintCommandLineFlags,查看JVM启动参数。

简单聊一聊UseCompressedOops 和 UseCompressedClassPointers这两个JVM参数。

通过一个VM 参数设置会在老年代创建对象的阈值

  • 就是设置一个阈值,当创建的对象大于这个阈值时,不管新生代的空间够不够都去老年代创建。
  • 以下为本次VM参数
1
2
3
4
5
6
java复制代码-Xms20M
-Xmx20M
-Xmn10M
-XX:+PrintGCDetails
-XX:SurvivorRatio=8
-XX:PretenureSizeThreshold=4194304 //阈值为4M
  • 代码,创建的字节数组为5M
1
2
3
4
5
6
java复制代码public class ThresholdTest {
public static void main(String[] args) {
int size = 1024*1024;
byte[] bytes = new byte[5*size];
}
}

image.png

要想阈值起作用,垃圾收集器得是单线程的

  • 在原来的VM 参基础上启动单线程垃圾收集器
1
java复制代码-XX:+UseSerialGC

image.png

设置对象在Servivor区年龄的阈值

  • Minor GC 是发生在新生代中的垃圾收集动作,所采用的是复制算法。
  • 新生代几乎是所有 Java 对象出生的地方,新生代是 GC 收集垃圾的频繁区域。
  • 当对象在 Eden ( 包括一个 Survivor 区域,这里假设是 from 区域 ) 出生后,在经过一次 Minor GC 后,如果对象还存活,并且能够被另外一块 Survivor 区域所容纳( 上面已经假设为 from 区域,这里应为 to 区域,即 to 区域有足够的内存空间来存储 Eden 和 from 区域中存活的对象 ),则使用复制算法将这些 仍然还存活的对象复制到另外一块 Survivor 区域 ( 即 to 区域 ) 中,然后清理所使用过的 Eden 以及 Survivor 区域 ( 即 from 区域 ),并且将这些对象的年龄设置为1,以后对象在 Survivor 区每熬过一次 Minor GC,就将对象的年龄 + 1。
  • 当年龄到达一定值时对象如果还在Servivor区复制来复制去的,就会被移到老年代。
  • -XX:MaxTenuringThreshold:表示当年龄到达设置的最大值时会被移到老年代。
    • 该参数的默认值为15,CMS中默认值为6,G1中默认为15(在Jvw中,该数值是由4个bit来表示的,所以最大值 1111,即15)。
    • 是设置的最大值,对象有可能在年龄到达最大值时被移到老年代,但是年龄到达设置的最大阈值时一定会被移到老年代。
    • 使用 -XX:TargetSurvivorRatio=value JVM 会根据情况动态设置它的大小,但不会超过手动给它设置的值。
  • 经历了多次GC后,存活的对象会在From Survivor与To Survivor之间来回存放,而这里面的一个前提则是这两个空间有足够的大小来存放这些数据,
  • 在GC算法中,会计算每个对象年龄的大小,如果达到某个年龄后发现总大小已经大于了Survivor(其中一个)空间的50%,那么这时就需要调整阔值,不能再继续等到默认的15次GC后才完成普升(移到老年代),因为这样会导致Survivor空间不足,所以需要调整阈值,让这些存活对象尽快完成晋升。

演示一下,对象从年轻代以到老年代的过程

  • 以下为本次代码 VM 参数
1
2
3
4
5
6
7
8
9
10
Java复制代码-Xmx200M // 堆大小 200M
-Xmn50M // 新生代大小
-XX:TargetSurvivorRatio=60 // 当一个Servivor区中的对象大于 Servivor总大小的 60 % 时,
// 重新估计TenuringThreshold,即更新年龄为多少会进入老年代
-XX:+PrintTenuringDistribution // 打印对象年龄
-XX:+PrintGCDetails // 打印GC 日志
-XX:+PrintGCDateStamps // 打印 GC 时的时间
-XX:+UseConcMarkSweepGC // 老年代使用 CMS 垃圾收集器
-XX:+UseParNewGC // 新生代使用 ParNew收集器
-XX:MaxTenuringThreshold=3 // 年龄阈值,最大的

本次并没有指定Serivivor占多少

  • 代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
java复制代码public class TenuringTest {
public static void main(String[] args) throws InterruptedException {
byte[] bytes = new byte[512 * 1024];
byte[] bytes1 = new byte[512 * 1024];//main方法没有结束,就不会被抹除
for (int i = 1; i < 7; i++) {
willGC();
Thread.sleep(1000);
System.out.println("loop: 00" + i);
}
byte[] bytes2 = new byte[1024 * 1024];
byte[] bytes3 = new byte[1024 * 1024];
byte[] bytes4 = new byte[1024 * 1024];
byte[] bytes5 = new byte[1024 * 1024];
willGC();
System.out.println("loop: 007");
Thread.sleep(1000);
willGC();
System.out.println("loop: 008");
Thread.sleep(1000);
System.out.println("end...");
}

private static void willGC() {//调用完,创建的对象就可以消除了
for (int i = 0; i < 40; i++) {
byte[] bytes = new byte[1024 * 1024];// 1 M
}
}
}

image.png

(max 3) 表示动态判断确定的阈值最大值为3,由-XX:MaxTenuringThreshold=3决定。

Desired survivor size 3145728 bytes 怎么来的?

  • 因为没有指定Servivor的比例 所以默认8:1:1,-Xmn50M指定新生代大小为50M ,所以 40:5:5。
  • 又 -XX:TargetSurvivorRatio=60,表示其中一个Serivior区超过的60%重新判断 阈值(threshold), 即 5 * 60% = 3M = 3145728 bytes,
  • 也说在其中一个Serivior区对象大小到3145728 bytes 时,重新判断阈值。

小结

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码-Xms20M  // 堆的最大值
-Xmx20M // 堆的最小值
-Xmn10M // 年轻代大小 即 Eden 和 两个Servivor的总大小,那么20-10=10M就是老年代的大小
-XX:+PrintGCDetails // 打印GC 日志
-XX:SurvivorRatio=8 // 设置Eden所占年轻代的比率
-XX:+UseSerialGC // 使用单线程垃圾收集器,包括新生代与老年代
-XX:PretenureSizeThreshold=4194304 //创建对象的大小阈值为4M,超过就直接在老年代创建
-XX:TargetSurvivorRatio=60 // 当一个Servivor区中的对象大于 Servivor总大小的 60 % 时,
// 重新估计TenuringThreshold,即更新年龄为多少会进入老年代,
-XX:+PrintTenuringDistribution // 打印各年龄对象的大小
-XX:+PrintGCDetails // 打印GC 日志
-XX:+PrintGCDateStamps // 打印 GC 时的时间
-XX:+UseConcMarkSweepGC // 老年代使用 CMS 垃圾收集器
-XX:+UseParNewGC // 新生代使用 ParNew收集器
-XX:MaxTenuringThreshold=3 // 年龄阈值,最大的

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

大型互联网架构演化简史

发表于 2021-09-10

对于一个大型网站,主要有以下几个特征:

  1. 支撑海量数据
  2. 非常高的访问量

我们常见的大型网站,如百度、淘宝、京东等,都是一个分布式系统。这么复杂的系统也不是一天建成的,每个系统都经历了漫长的演变过程。

架构演变

在大型网站中,其最核心的功能就是 计算 和 存储。因此系统演变过程也主要围绕这两点进行。

1 单机系统

在网站刚刚起步时,数据量、访问量都非常小,通常情况下,只需一台应用服务器就可以了。

1.1 单机部署方案

起步时,我们把所有资源全部打包到部署文件中(如 XXX.war),其中包括

  1. class 文件、依赖 jar等;
  2. js、css、图片等静态资源;
  3. 对于用户上传文件的场景,直接在服务器上新建一个目录,将上传的文件放置在目录即可。

然后,将打好的发布包放到 Web 容器中,比如 Tomcat,最后启动容器,让其直接对外提供服务。

图片

该部署策略有以下几个特征:

  1. 用户通过浏览器直接与 Java 应用程序进行交互(通常是 Tomcat);
  2. Java 应用程序通过 JDBC 与本机的数据库进行交互(如 MySQL);
  3. 如果存在文件读写的需求,Java 应用程序通过文件接口直接对文件进行操作。

这时,有人会问,Java 应用程序直接对外,会不会存在一些安全或性能方面的问题呢?

是的,Tomcat 这种 Web 容器对链接的保持能力比较弱,当存在大量链接时,性能下降很快。同时,Tomcat 并不擅长静态资源的处理,对此,我们可以引入 Nginx,以缓解 Tomcat 的压力。

1.2 单机部署方案进阶

我们在单机部署基础上,添加 Nginx,也就有了进阶方案。

图片

该方案存在以下特征:

  1. 用户不在直接与 Java 应用程序进行交互,而是与 Nginx 进行交互;
  2. Tomcat 挂在 Nginx 后,对动态请求进行处理;
  3. 对于静态资源的访问,通过 Nginx 直接访问文件系统;
  4. 当有文件写需求时,通过 Java 应用程序直接写入磁盘。

此时,架构显得清晰很多,但我们发现一个问题,就是系统对静态资源和动态资源的处理是完全不同的。

对于静态资源的处理,相对简单,只是简单的文件读写。而,动态请求(也就是我们的业务承载者)会随着业务的发展越来越复杂。

2 动静分离部署方案

由于 静态请求 与 动态请求 采用不同的处理策略,我们可以将其进行分离。

图片

该部署方案存在以下特性:

  1. 通过不同的域名对 动态请求 和 静态请求 进行分离;
  2. 新增 静态资源服务器,专门处理静态请求,并在服务器上部署 Java 应用程序,处理文件写需求;Nginx 只负责文件的读操作;
  3. 对 动态请求 进行独立部署,应用程序将文件的写请求转发到静态服务器进行处理;

静态资源服务器功能单一,部署繁琐,有没有一种更好的策略呢?

答案就是云服务,比如阿里云的 OSS 提供静态资源存储服务。CDN 提供访问加速服务,两者结合使用,就得到了一个海量容量并且性能超强的静态资源服务器(集群)。

结合 OSS 和 CDN,静态请求不会成为系统的瓶颈,因此,接下来只对动态请求进行讨论。

随着系统访问量的增加,动态请求出现了明显的瓶颈。

3 应用集群化部署

由于所有的动态请求全部由一台应用服务器进行处理,当访问量上升时,这台服务就成了系统的瓶颈。此时,我们需要将系统中的多个组件部署到不同的服务器上。

图片

新部署有以下特征:

  1. 对 Nginx 进行独立部署,形成 Web 集群;
  2. 对 Java 应用程序进行独立部署,形成 应用集群;
  3. 对 数据库 进行独立部署;
  4. Web 集群 与 应用集群 间通过 HTTP 协议进行交互;
  5. 应用集群 与 数据库 间通过 JDBC 协议进行交互。

应用集群化,会面临很多挑战,主要的焦点是如何有效的分配用户请求。

3.1 DNS 轮询

首先要解决的问题便是,用户如何将请求发送到不同的 Nginx 中,最常见的方式便是 DNS 轮询。

大多域名注册商都支持多条 A 记录的解析,其实这就是 DNS 轮询,DNS 服务器将解析请求按照 A 记录的顺序,逐一分配到不同的 IP 上,这样就完成了简单的负载均衡。

3.2 负载均衡器

这里的负载均衡器主要指的是 Nginx 的反向代理功能。当用户请求发送到 Nginx 后,Nginx 需要决定将请求转发到哪台应用服务器上。

反向代理(Reverse Proxy)是指以代理服务器来接受 internet 上的连接请求,然后将请求转发给内部网络上的服务器,并将从服务器上得到的结果返回给 internet 上请求连接的客户端,此时代理服务器对外就表现为一个反向代理服务器。

Nginx 对于后台服务器配置比较灵活,可以同时配置多台服务器,并根据负载策略将请求分发给后台服务器。

3.3 会话问题

在单机时代,我们的请求只会发送到同一台机器上,不存在会话问题。当将应用集群部署时,用户的多次请求会发送到不同的应用服务器上。此时,如何对会话进行同步便是棘手问题。

3.3.1 Session Sticky

这种方案主要由 Nginx 处理,让同样 session 请求每次都发送到同一台服务器进行处理。

图片

Nginx 会将相同用户的请求发送到同一台应用服务器中。

这是最简单的策略,但存在一定的问题:

  1. Web 服务器重启 Session 丢失;
  2. 负载均衡需要进行应用层解析(第7层),性能损耗较大;
  3. 负载均衡器变为一个有状态的点,不易容灾;
3.3.2 Session Replication

会话问题的根源在于 Session 由多个应用维护,我们可以使用某种机制,在多台 Web 服务间进行 Session 的数据同步。

图片

由 Session 同步器在各个 Java 应用程序间完成 Session 的同步,最终使每个服务器中都存在所有用户的 Session 数据。

这个方案的问题:

  1. 造成网络开销;
  2. 每台 Web 服务器都保存所有的 Session,内存开销大;
3.3.3 集中式Session

我们可以将 Session 从 Web 服务中抽取出来,并对其进行集中存储。

图片

将 Session 信息保存到 Session 存储集群中,Java 应用程序不在负责 Session 的存储。

这个方案的问题:

  1. 读取Session引入了网络开销;
  2. 存储设施问题影响应用;
3.3.4 Cookie Based Session

还可以将 session 数据放在 cookie 中,然后在 Web 服务器上从 cookie 中生成对应的 Session 数据。

图片

将 Session 数据编码到 Cookie 中,每次 Java 应用程序使用 Session 时,都从 Cookie 中重建 Session。

该方案的问题:

  1. 受到 Cookie 大小的限制;
  2. 存在安全性问题;
  3. 每次都携带巨大的 Cookie,带宽消耗严重;
  4. 每次都进行 Session 数据恢复,加大应用服务器的负担;

随着系统访问量的持续增加,面对大量的数据读取请求,数据库有些不堪重负。

此时,我们需要对数据库进行优化。

4 数据库读写分离

通常情况下,数据库的读会先成为系统的瓶颈。对此,我们可以使用数据库主从机制,通过添加多个从库来减缓读压力。

图片

与之前部署相比,该架构只是为数据库增加了若干个从库:

  1. 对数据库实施 主从 部署策略;
  2. 对于数据的写请求,只能在 主库 上进行;
  3. 对于数据的读请求,可以在任意的 从库 上进行;
  4. 主库与从库间,通过 数据库同步策略 进行数据同步。

由于主库与从库间的数据同步需要时间,会出现数据不一致的情况,这块是业务上需要慎重考虑的一点。

随着业务越来越复杂,对功能和性能的要求也越来越高,最常见的便是数据库 like 语句性能已经无法满足需求;对于某些热点数据的访问,其性能也下降很快。

此时,我们需要引入其他组件来有针对性的解决问题。

5 引入搜索和缓存

针对数据库的 like 语句,通常情况下,是通过引入搜索引擎来解决;而热点数据的访问加速,是通过引入缓存服务来解决。

图片

该架构的特征如下:

  1. 添加 搜索集群,用以提升数据检索性能;
  2. 添加 缓存集群,用以提升热点数据访问性能。

在对数据查询进行优化后,慢慢的系统的写性能成为了瓶颈。

此时,需要对数据的写性能进行扩展。

6 数据库分库分表

随着数据量的增长,写请求量的增加,数据库的写入逐渐成为了瓶颈。常规的写性能优化便是对数据库进行分库分表。

图片

6.1 垂直拆分

将不同的业务数据放到不同的数据库实例中。

6.2 水平切分

把同一个表中的数据拆分到多的数据库中。

随着研发团队的规模越来越多,大家同时在一个项目中进行开发,导致频繁的冲突和相互影响。

此时,会将整个应用程序根据功能模块进行拆分,从而形成多个子网站或子频道。

7 应用垂直拆分

面对一个巨无霸式的应用,就像面对一团毛线团,总有一种无法下手的感觉。对此,可以将其进行拆分,将其拆分为多个应用,每个应用独立开发、独立部署、独立维护。

图片

该部署方案更加灵活,大大降低维护成本。

  1. 通过不同的域名或 URL 将整个系统分解为多个子系统;
  2. 用户通过浏览器将各子系统拼接成一个完整的系统;
  3. 各系统间存在少量交互,甚至没有交互;

问题慢慢展现出来,系统间公共部分没有统一维护点,同样的功能、同样的代码分布在各个系统中。

当然,我们可以通过发布 jar 包的方式,共享功能代码;但当 jar 升级时,就需要所有的子系统同步升级,运维开销巨大。此时,我们需要引入服务化架构。

8 服务化架构

我们可以将通用功能封装成一个服务,独立开发、独立部署、独立维护。

图片

在该方案中,我们将业务逻辑进行了进一步拆分:

  1. 整理各个系统间通用业务功能,将其封装为服务,以承载核心业务逻辑,构建成 服务集群;
  2. 原来的子系统或子频道,变成薄薄的一层,不承载核心业务,只是根据业务流程对业务服务进行编排;
  3. 应用服务与业务服务间通过 HTTP 或 其他协议进行通信,常见的包括 Dubbo、Thrift等。

服务化解决了系统之间的直接调用问题,也就是常说的 RPC,整个系统的协调点全部由应用服务完成。这种架构适用于多种场景,但在一些需要异步处理的极端场景就显得有心无力了。

此时,我们需要引入消息中间件。

9 引入消息队列

服务化解决了直接调用问题,对于异步调用,最常见的便是消息中间件。

图片

相比之前的架构,变化很小,只是在各个业务服务间添加了另外的一种调用方式。

10 小结

冰冻三尺非一日之寒,一个大型系统的构建也不是一朝一夕的事情。我们需要根据业务情况、数据量情况、请求量情况对系统进行合理规划。

切记,架构不是越复杂越好,而是“适合自己的便是最好的”。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

安全系列之 跨域资源共享CORS 简介 CORS举例 COR

发表于 2021-09-10

简介

什么是跨域资源共享呢? 我们知道一个域是由scheme、domain和port三部分来组成的,这三个部分可以唯一标记一个域,或者一个服务器请求的地址。跨域资源共享的意思就是服务器允许其他的域来访问它自己域的资源。

CORS是一个基于HTTP-header检测的机制,本文将会详细对其进行说明。

CORS举例

为了安全起见,一般一个域发起的请求只能获取该域自己的资源,因为域资源内部的互相调用被认为是安全的。

但是随着现代浏览器技术和ajax技术的发展,渐渐的出现了从javascript中去请求其他域资源的需求,我们把这样的需求叫做跨域请求。

比如说客户端从域www.flydean.com向域http://www.abc.co…

那么客户端是怎么知道服务器是否支持CORS的呢?

这里会使用到一个叫做preflight的请求,这个请求只是向服务器确认是否支持要访问资源的跨域请求,当客户端得到响应之后,才会真正的去请求服务器中的跨域资源。

虽然是客户端去设置HTTP请求的header来进行CORS请求,但是服务端也需要进行一些设置来保证能够响应客户端的请求。所以本文同时适合前端开发者和后端开发者。

CORS protocol

没错,任意一种请求要想标准化,那么必须制定标准的协议,CORS也一样,CORS protocol主要定义了HTTP中的请求头和响应头。我们分别来详细了解。

HTTP request headers

首先是HTTP的请求头。请求头是客户端请求资源时所带的数据。CORS请求头主要包含三部分。

第一部分是Origin,表示发起跨域资源请求的request或者preflight request源:

1
makefile复制代码Origin: <origin>

Origin只包含server name信息,并不包含任何PATH信息。

注意,Origin的值可能为null

第二部分是Access-Control-Request-Method,这是一个preflight request,告诉服务器下一次真正会使用的HTTP资源请求方法:

1
sql复制代码Access-Control-Request-Method: <method>

第三部分是Access-Control-Request-Headers,同样也是一个preflight request,告诉服务器下一次真正使用的HTTP请求中要带的header数据。header中的数据是和server端的Access-Control-Allow-Headers相对应的。

1
xml复制代码Access-Control-Request-Headers: <field-name>[, <field-name>]*

HTTP response headers

有了客户端的请求,还需要服务器端的响应,我们看下服务器端都需要设置那些HTTP header数据。

  1. Access-Control-Allow-Origin

Access-Control-Allow-Origin表示服务器允许的CORS的域,可以指定特定的域,也可以使用*表示接收所有的域。

1
makefile复制代码Access-Control-Allow-Origin: <origin> | *

要注意的是,如果请求带有认证信息,则不能使用*。

我们看一个例子:

1
2
makefile复制代码Access-Control-Allow-Origin: http://www.flydean.com
Vary: Origin

上面例子表示服务器允许接收来自www.flydean.com的请求,这里指定了具体的某一个域,而不是使用*。因为服务器端可以设置一个允许的域列表,所以这里返回的只是其中的一个域地址,所以还需要在下面加上一个Vary:Origin头信息,表示Access-Control-Allow-Origin会随客户端请求头中的Origin信息自动发送变化。

  1. Access-Control-Expose-Headers

Access-Control-Expose-Headers表示服务器端允许客户端或者CORS资源的同时能够访问到的header信息。其格式如下:

1
xml复制代码Access-Control-Expose-Headers: <header-name>[, <header-name>]*

例如:

1
vbnet复制代码Access-Control-Expose-Headers: Custom-Header1, Custom-Header2

上面的例子将向客户端暴露Custom-Header1, Custom-Header2两个header,客户端可以获取到这两个header的值。

  1. Access-Control-Max-Age

Access-Control-Max-Age表示preflight request的请求结果将会被缓存多久,其格式如下:

1
makefile复制代码Access-Control-Max-Age: <delta-seconds>

delta-seconds是以秒为单位。

  1. Access-Control-Allow-Credentials

这个字段用来表示服务器端是否接受客户端带有credentials字段的请求。如果用在preflight请求中,则表示后续的真实请求是否支持credentials,其格式如下:

1
yaml复制代码Access-Control-Allow-Credentials: true
  1. Access-Control-Allow-Methods

这个字段表示访问资源允许的方法,主要用在preflight request中。其格式如下:

1
sql复制代码Access-Control-Allow-Methods: <method>[, <method>]*
  1. Access-Control-Allow-Headers

用在preflight request中,表示真正能够被用来做请求的header字段,其格式如下:

1
xml复制代码Access-Control-Allow-Headers: <header-name>[, <header-name>]*

有了CORS协议的基本概念之后,我们就可以开始使用CORS来构建跨域资源访问了。

基本CORS

先来看一个最基本的CORS请求,比如现在我们的网站是www.flydean.com,在该网站中的某个页面中,我们希望获取到https://google.com…

1
2
3
4
5
6
ini复制代码const xhr = new XMLHttpRequest();
const url = 'https://google.com/data/dataA';

xhr.open('GET', url);
xhr.onreadystatechange = someHandler;
xhr.send();

该请求是一个最基本的CORS请求,我们看下客户端发送的请求包含哪些数据:

1
2
3
4
5
6
7
8
makefile复制代码GET /data/dataA HTTP/1.1
Host: google.com
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:71.0) Gecko/20100101 Firefox/71.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: en-us,en;q=0.5
Accept-Encoding: gzip,deflate
Connection: keep-alive
Origin: http://www.flydean.com

这个请求跟CORS有关的就是Origin,表示请求的源域是www.flydean.com。

可能的返回结果如下:

1
2
3
4
5
6
7
8
9
10
yaml复制代码HTTP/1.1 200 OK
Date: Mon, 01 May 2021 00:23:53 GMT
Server: Apache/2
Access-Control-Allow-Origin: *
Keep-Alive: timeout=2, max=100
Connection: Keep-Alive
Transfer-Encoding: chunked
Content-Type: application/xml

[…Data…]

上面的返回结果要注意的是Access-Control-Allow-Origin: *,表示服务器允许所有的Origin请求。

Preflighted requests

上面的例子是一个最基本的请求,客户端直接向服务器端请求资源。接下来我们看一个Preflighted requests的例子,Preflighted requests的请求分两部分,第一部分是请求判断,第二部分才是真正的请求。

注意,GET请求是不会发送preflighted的。

什么时候会发送Preflighted requests呢?

当客户端发送OPTIONS方法给服务器的时候,为了安全起见,因为服务器并不一定能够接受这些OPTIONS的方法,所以客户端需要首先发送一个
preflighted requests,等待服务器响应,等服务器确认之后,再发送真实的请求。我们举一个例子。

1
2
3
4
5
6
ini复制代码const xhr = new XMLHttpRequest();
xhr.open('POST', 'https://google.com/data/dataA');flydean
xhr.setRequestHeader('cust-head', 'www.flydean.com');
xhr.setRequestHeader('Content-Type', 'application/xml');
xhr.onreadystatechange = handler;
xhr.send('<site>www.flydean.com</site>');

上例中,我们向服务器端发送了一个POST请求,在这个请求中我们添加了一个自定义的header:cust-head。因为这个header并不是HTTP1.1中标准的header,所以需要发送一个Preflighted requests先。

1
2
3
4
5
6
7
8
9
10
makefile复制代码OPTIONS /data/dataA HTTP/1.1
Host: google.com
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:71.0) Gecko/20100101 Firefox/71.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: en-us,en;q=0.5
Accept-Encoding: gzip,deflate
Connection: keep-alive
Origin: http://www.flydean.com
Access-Control-Request-Method: POST
Access-Control-Request-Headers: cust-head, Content-Type

请求中添加了Access-Control-Request-Method和Access-Control-Request-Headers这两个多出来的字段。

得到的服务器响应如下:

1
2
3
4
5
6
7
8
9
10
yaml复制代码HTTP/1.1 204 No Content
Date: Mon, 01 May 2021 01:15:39 GMT
Server: Apache/2
Access-Control-Allow-Origin: http://www.flydean.com
Access-Control-Allow-Methods: POST, GET, OPTIONS
Access-Control-Allow-Headers: cust-head, Content-Type
Access-Control-Max-Age: 86400
Vary: Accept-Encoding, Origin
Keep-Alive: timeout=2, max=100
Connection: Keep-Alive

响应中返回了Access-Control-Allow-Origin,Access-Control-Allow-Methods和Access-Control-Allow-Headers。

当客户端收到服务器的响应之后,发现配后续的请求,就可以继续发送真实的请求了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
yaml复制代码POST /data/dataA HTTP/1.1
Host: google.com
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:71.0) Gecko/20100101 Firefox/71.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: en-us,en;q=0.5
Accept-Encoding: gzip,deflate
Connection: keep-alive
cust-head: www.flydean.com
Content-Type: text/xml; charset=UTF-8
Referer: http://www.flydean.com/index.html
Content-Length: 55
Origin: http://www.flydean.com
Pragma: no-cache
Cache-Control: no-cache

<site>www.flydean.com</site>

在真实的请求中,我们不需要再发送Access-Control-Request*头标记了,只需要发送真实的请求数据即可。

最后,我们得到server端的响应:

1
2
3
4
5
6
7
8
9
10
11
12
yaml复制代码HTTP/1.1 200 OK
Date: Mon, 01 May 2021 01:15:40 GMT
Server: Apache/2
Access-Control-Allow-Origin: http://www.flydean.com
Vary: Accept-Encoding, Origin
Content-Encoding: gzip
Content-Length: 235
Keep-Alive: timeout=2, max=99
Connection: Keep-Alive
Content-Type: text/plain

[Some data]

带认证的请求

有时候,我们需要访问的资源需要带认证信息,这些认证信息是通过HTTP cookies来进行传输的,但是对于浏览器来说,默认情况下是不会进行认证的。要想进行认证,必须设置特定的标记:

1
2
3
4
5
6
7
8
9
10
11
ini复制代码const invocation = new XMLHttpRequest();
const url = 'https://google.com/data/dataA';

function corscall() {
if (invocation) {
invocation.open('GET', url, true);
invocation.withCredentials = true;
invocation.onreadystatechange = handler;
invocation.send();
}
}

上面的例子中,我们设置了withCredentials flag,表示这是一个带认证的请求。

其对应的请求如下:

1
2
3
4
5
6
7
8
9
10
makefile复制代码GET data/dataA HTTP/1.1
Host: google.com
User-Agent: Mozilla/5.0 (Macintosh; Intel Mac OS X 10.14; rv:71.0) Gecko/20100101 Firefox/71.0
Accept: text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8
Accept-Language: en-us,en;q=0.5
Accept-Encoding: gzip,deflate
Connection: keep-alive
Referer: http://www.flydean.com/index.html
Origin: http://www.flydean.com
Cookie: name=flydean

请求中我们带上了Cookie,服务器对应的响应如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
yaml复制代码HTTP/1.1 200 OK
Date: Mon, 01 May 2021 01:34:52 GMT
Server: Apache/2
Access-Control-Allow-Origin: http://www.flydean.com
Access-Control-Allow-Credentials: true
Cache-Control: no-cache
Pragma: no-cache
Set-Cookie: name=flydean; expires=Wed, 31-May-2021 01:34:53 GMT
Vary: Accept-Encoding, Origin
Content-Encoding: gzip
Content-Length: 106
Keep-Alive: timeout=2, max=100
Connection: Keep-Alive
Content-Type: text/plain

[text/plain payload]

服务器返回了Access-Control-Allow-Credentials: true,表示服务器接收credentials认证,并且返回了Set-Cookie选项对客户端的cookie进行更新。

要注意的是如果服务器支持credentials,那么返回的Access-Control-Allow-Origin,Access-Control-Allow-Headers和Access-Control-Allow-Methods的值都不能是*。

总结

本文简单介绍了HTTP协议中的CORS协议,要注意的是CORS实际上是HTTP请求头和响应头之间的交互。

本文已收录于 www.flydean.com/cors/

最通俗的解读,最深刻的干货,最简洁的教程,众多你不知道的小技巧等你来发现!

欢迎关注我的公众号:「程序那些事」,懂技术,更懂你!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

火山引擎流批数据质量解决方案和最佳实践

发表于 2021-09-10

火山引擎的数据质量平台是在多年服务字节跳动今日头条、抖音等业务的过程中打磨出来的。面对今日头条、抖音等不同产品线的复杂数据质量场景,数据质量平台如何满足多样的需求?

本文将介绍火山引擎数据质量平台是如何弥合大数据场景下数据质量校验与计算消耗资源大、校验计算时间长的冲突,并介绍数据质量平台是如何用一套架构框架来满足流批方面的数据质量监控。

什么是数据质量

广义上来说,数据质量的定义是数据满足一组固有特性(质量维度)要求的程度。业界通常有 6 个维度:

  • 完整性:指数据的记录和信息是否完整,是否存在缺失的情况。数据缺失主要包括记录的缺失和记录中某个字段信息的缺失,两者都会造成统计结果不准确,所以说完整性是数据质量最基础的保障。在做监控时,需要考虑两个方面:数据条数是否少了;某些字段的取值是否缺失。完整性的监控,多出现在日志级别的监控上,一般会在数据接入的时候来做数据完整性校验。
  • 准确性:指数据中记录的信息和数据是否准确,是否存在异常或者错误。一般准确性的监控多集中在对业务结果数据的监控,比如每日的活跃、收入等数据是否正常。
  • 一致性:指同一指标在不同地方的结果是否一致。数据不一致的情况,多出现在数据系统达到一定的复杂度后,同一指标会在多处进行计算,由于计算口径或者开发人员的不同,容易造成同一指标出现不同的结果。
  • 及时性:在确保数据的完整性、准确性和一致性后,接下来就要保障数据能够及时产出,这样才能体现数据的价值。及时性很容易理解,主要就是数据计算出来的速度是否够快,这点在数据质量监控中可以体现在监控结果数据是否在指定时间点前计算完成。
  • 规范性:指数据是否按照要求的规则进行存储,如邮箱校验、IP 地址校验、电话格式校验等,具有一定的语义意义。
  • 唯一性:指数据是否有重复,如字段的唯一值、字段的重复值等。

我们对数据质量有一些流程和规范,并针对上述一些维度开发了一套数据质量平台,主要关注数据质量及其生产链路。

上图展示了在数据开发的流程中,数据质量平台可以提供哪些功能:

  • 数据探查:可以根据各种维度来查看数据明细和分布情况。
  • 数据对比:开发同学可能经常会发现线上表和测试表不一致,所以我们在任务上线的环节提供了数据对比的功能。
  • 任务监控:监控线上数据,提供报警和熔断功能。

数据质量平台最有代表性的功能是:对数据开发平台产出的 Hive 表数据进行主键重复检测,如果存在重复则进行报警。

数据质量监控最有用的场景是防止数据问题蔓延到下游。举个例子:数据任务产出一张 Hive 表,该表可能会同步一些信息到 Hive metastore(HMS)。HMS 的主从架构可能存在一定的延迟,假设 HMS 出现问题,下游任务可能会读到脏数据,这时如果我们使用数据质量监控,就能及时发现问题,阻止下游任务运行。

数据质量挑战

目前我们的数据质量挑战有哪些?可以通过几个用户 case 了解一下。

User Story 1

某流量级产品商业化系统,M 级日志条数/秒;希望秒级监控日志延迟、关键字段空值,T+1 检测日志波动率。

User Story 2

某内部业务系统,日志存储 ES;希望每 5 分钟检测上一周期日志波动情况。

User Story 3

某内部指标平台,业务数据由 Hive 定期同步到 ClickHouse;希望每次同步任务后检查 Hive 与 ClickHouse 中的指标是否一致。

通过上面的介绍,大家应该也大致清楚了当前数据质量需要解决的问题。可能有些同学会说,数据质量平台我也做过,问题归总起来也不复杂,总而言之就是对数据进行各种计算,对比计算来的阈值即可,一般直接依赖于 Spark 引擎或者 Hive 引擎计算即可。确实,其实这也是我们数据质量最开始的样子。那为什么会演化到目前这样,我们面临了一些什么问题?

首先是场景需求非常复杂:

  1. 离线监控不再多说了,大家都熟悉,主要是不同存储的数据质量监控,比如 Hive 或者 ClickHouse 。
  2. 字节跳动内部的广告系统对时效性和准确性要求很高,用广告同学的话说,如果用微批系统 10 min 才做一次检测,可能线上损失就上百万了甚至千万了。所以广告系统同学对实时性要求相对较高。
  3. 另外一个是复杂拓扑情况下的流式延迟监控。
  4. 最后是微批,指一段时间内的定时调度,有些 Kafka 导入 ES 的流式场景,需要每隔几分钟对比下前一周期。

此外,字节跳动各种产品会产出海量的日志数据,我们需要用有限的资源来满足大家对质量监控的需求。

面临这些挑战,我们的解决方案是什么?

流批数据质量解决方案

产品功能架构

火山引擎流批数据质量解决方案有 4 个大的功能:

  • 离线数据质量监控:解决批和微批监控场景,支持 Hive、ClickHouse、ES 等多种数据源,并有字段、唯一性等多种监控维度,允许通过 SQL 自定义维度聚合进行监控。
  • 流式数据质量监控:解决流式监控场景,支持 Kafka/BMQ 等数据源。
  • 数据探查:解决数据开发之前对数据内容存疑问题,支持 Hive 数据源。
  • 数据对比:解决新旧表数据一致性问题,支持 Hive/Hive SQL 数据源。

系统架构

上图是数据质量平台的系统架构图,主要分为 5 个部分:

  • Scheduler:外部调度器,触发离线监控。主要分两种类型:
    • 对外提供 API 调用任务;
    • 定时调度,通过 calljob 调用数据。
  • Backend:后端服务,偏服务层,处理业务逻辑。主要负责:
    • 质量平台和外部的交互,所有 API 响应都是通过这一层进行;
    • 任务提交:用户在质量平台配置的规则会放到业务存储,Scheduler 被调用后,Backend 会将任务相关的参数配置进行任务提交;
    • 获取质量监控的结果并进行判断,然后和外部系统进行交互,在需要时发送警报通知用户。
  • Executor:平台核心的任务执行模块,集成了一些引擎,例如数据探查使用 OLAP 引擎。质量监控部分使用 Griffin 的 Measure 进行数据统计。
  • Monitor:是一个相对独立的模块,主要进行状态服务的流转,提供重复报警等功能。
  • Alert Center:质量平台强依赖于该平台。它是外部报警服务,接收各种报警事件。

离线数据检测流程

下面看一下离线数据的检测流程。

离线数据的监控、探查、对比的执行流程一致,主要分为 4 步:

  1. 监控触发:调度系统调用质量模块 Backend API;
  2. 作业提交:Backend 以 Cluster 模式提交 Spark 作业至 Yarn;
  3. 结果回传:作业结束 (成功、失败),Driver 将结果 sync 至 Backend;
  4. 消息触发:Backend 根据结果触发相应动作 (例如:报警、消息提示)。

我们总结了一下数据质量平台的优势:

  • 调度系统低耦合:数据质量平台没有和调度系统强绑定,一般可以用业务系统的 API 实现互相调用。
  • 事件触发高效,Backend 水平扩展能力强:Backend 是无状态的实例服务,如果质量监控的业务系统较多,Backend 可以采用水平扩展的方式部署,接收请求并提交作业。
  • 没有 Quota 限制:平台本身没有维护数据质量监控单独需要的资源队列,而是把这个权限开放给用户,用他们自身的资源做资源监控。这样就把 Quota 问题转换成了用户资源问题。

当然任何一个工具都不可能是完美的,数据质量平台暂时还有一些待提升的地方:

  • 非 CPU 密集型查询较重:整个平台的设计是以任务提交的方式完成离线场景的需求。但是后来我们发现其实不需要启动 Spark 的作业仍然会启动一个 Spark 作业,如 ES SQL 查询,这个查询是很重的。
  • 依赖 Yarn 做调度稳定性不高:平台上的任务在资源不充足或被挤占的情况下,会出现任务运行或调用很慢。

流式监控执行

对于流式数据的监控,我们选择了 Flink 引擎,因为流式数据不同于离线数据,不能用快照的方式低成本拿到过程。所以我们要依赖一些外部的时序数据库再加规则引擎来展示对数据的监控。

平台上流式数据监控的流程为:

  1. 根据规则定义,创建 Flink 作业;
  2. 根据报警条件,注册 Bosun 报警事件;
  3. Flink 作业消费 Kafka 数据,计算监控指标写 Metrics;
  4. Bosun 基于 Metrics 的时序数据,定时检测,触发报警;
  5. Backend 接收报警回调,处理报警发送逻辑。

下面着重介绍两个模块的实现。

Executor 实现

Executor 是基于 Apache Griffin 的 Measure 模块改造的一个 Spark Application。功能包括:

  • 适配数据源
  • 数据转化为 DataFrame
  • 规则转化为 SQL 操作
  • 计算结果

Executor 的选型有以下几方面的考虑:

  • 扩展性要足够强,能够适配不同的数据源,如 Hive,MySQL 等等
  • 计算性能要较强
  • 支持的监控类型种类需要足够多

考虑到以上方面的信息,我们选用了 Apache Griffin 的 Measure 模块作为 Executor。它基于 Spark 开发,能够适配不同的数据源,并且对于 DSL 做了一系列拓展。基于平台的设计,我们需要和 Backend 进行较多的互动,并把数据进行回传。其实 Griffin Measure 本身就支持了一些基本的数据质量监控,比如重复值检测、自定义 SQL 等等,这里重点说明一下我们对 Measure 模块的改造:

  • 改造数据源、Sink 使其能够通过 HTTP 访问远程 API;
  • 部分功能增强、修改,例如:支持正则表达式;
  • 流式监控从 Spark Engine 切换为 Flink Engine,优化整体流式监控方案。Measure 本身是 Spark 生态的一部分,只能用 Spark Engine 做理线或者用微批模拟流式做监控。字节跳动内部本身有一定的 Flink 的能力,并且 Flink 对流式数据的处理能力比微批要好很多,所以我们就进行了这样的改造。

Monitor 实现

Monitor 模块主要是为了实现失败报警重试和重复报警功能,根据事件类型触发相应事件(重复报警、失败重试等)。因为业务数据全部存储在 MySQL,平台之前的 Monitor 重复报警做的也比较简单,即直接通过轮询的方式从 MySQL 中轮询拉起已报警实例,然后通过重复提交的方式进行报警。

随着监控的规则越来越多,库的压力会非常大,Monitor 的扫描也遇到了一些瓶颈,因此我们对 Monitor 进行了技术架构升级,具体改造内容包括:

  • 有状态服务,主节点对外提供服务;主备保证 HA
  • 接收 Backend 事件:监控失败、报警
  • 内存定时队列,事件性触发机制

最佳实践

前面介绍了数据质量平台的一些实现方式,下面为大家介绍一些我们在数据量和资源这两个方面的最佳实践。

表行数信息-优先 HMS 获取

内部的离线监控中,表行数的监控占比非常大,可能至少 50% 以上的离线规则都是表行数的监控。对于表行数,之前我们是通过 Spark,Select Count* 提交作业,对资源的消耗非常大。

后来我们对其做了一些优化。在任务提交的过程中,底层引擎在产出表的过程中将表行数记录写入相应分区信息中,我们就可以直接从 HMS 分区里直接获取表行数信息,从而避免了 Spark 任务的提交。

优化后的效果非常明显,目前对于表行数的监控,HMS 获取行数占比约 90 %,HMS 行数监控平均运行时长在秒级别。

注:这个功能需要推动底层服务配合支持,比如 Spark 需要把保存在本地 metric 里面的信息写入到 HMS 中,其他数据传输系统也需要支持。

离线监控优化

这一块是基于 Griffin 的 Measure 来进行,Measure 本身有丰富的功能,我们对其进行了裁剪以节约耗时。主要的裁剪和优化包括:

  • 裁剪掉部分异常数据收集功能;
  • 优化非必要的 join 流程。

另外,我们也对离线监控的执行参数进行了优化,主要包括:

  • 根据不同的监控类型,添加不同的参数 (shuffle to hdfs 等);
  • 根据监控特性,默认参数优化(上调 vcore 等)。

举个例子:用户写了 SQL 进行数据的 join,执行引擎可以分析出执行计划。对于 join 类的操作,shuffle 可能非常大,这种情况下我们默认会开一些 Spark 参数。

根据表行数来预判数据表的大小,如果判断数据表比较大,会默认微调 vcore 和 memory。以上这些优化都能在一定程度上提升性能,目前平台上各类监控的平均运行时长缩短了 10% 以上。

引入 OLAP 引擎

平台上很多数据表和业务表(除了日志表以外),在数仓上层的表监控数据量不是很大,这种情况很适合进行 OLAP 的查询。

这种情况下我们在数据探查场景引入了 presto。之前在该场景下通过 Spark 做探查,引入 presto 之后通过快速 fail 机制,大数据量、计算复杂的探查任务 fallback 到提交 Spark 作业,探查时间中位数从之前的 7min 缩短到目前的不到 40s,效果非常显著。

流式监控支持抽样 & 单 Topic 多 Rule 优化

Kafka 数据抽样

一般流式数据的问题都是通用性问题,可以通过数据采样发现问题。因此我们开发了数据采样的功能,减少数据资源的占比消耗。Flink Kafka Connector 支持抽样,可直接操作 kafka topic 的 offset 来达到抽样的目的。比如,我们按照 1% 的比例进行抽样,原来上 W 个 partition 的 Topic,我们只需要 ** 个机器就可以支撑。

单 Topic 多 Rule 优化

最早的时候我们是对一个 Topic 定义一个 Rule,然后开启一个 Flink 任务进行消费,执行 Rule。后来我们发现一些关键的数据需要对多个维度进行监控,也就是要定义多个维度的 Rule,对每一条 Rule 都开任务去消费是非常耗资源的,所以我们利用监控不是 CPU 密集型作业的特性,复用读取部分,单 slot 中执行多个 Rule,对 Topic 级别进行单一消费,在一个任务中把相关 Rule 都执行完。

未来演进方向

本文介绍了数据质量平台的实现和最佳实践,最后谈谈平台未来的演进方向。

  • 底层引擎统一,流批一体:目前平台的离线任务大部分是基于 Spark 完成的,流式数据采用了 Flink 处理,OLAP 引擎又引进了 presto,导致这套系统架构的运维成本比较高。我们看到 Flink 目前的 presto 能力和 Flinkbatch 的能力也在不断发展,因此我们后续会尝试切一些任务,做到真正意义上的统一引擎。
  • 智能:引入算法进行数据驱动。考虑引入 ML 方法辅助阈值选取或者智能报警,根据数据等级自动推荐质量规则。举几个例子,比如我们可以基于时序算法智能的波动率监控来解决节假日流量高峰和平常的硬规则阈值的提升。
  • 便捷:OLAP 对性能提升比较显著,但是目前我们只用在了数据探查功能上。后续可以将 OLAP 引擎应用于质量检测、数据据探查、数据对比应用与数据开发流程。
  • 优化:比如通过单一 Job,同时运行多个监控,将监控和数据探查结合。我们现在在尝试将数据质量的规则生成和数据探查做结合,做到所见即所得的数据和规则的对应关系。

Q&A

Q:数据质量问题的排查很多时候时间成本非常高,你们在数据质量问题的归因分析上有做什么工作吗?

A:这个问题是非常核心的痛点。这里可以介绍下目前我们的思路:联合字节跳动算法的同学做数据下钻,也就是对数据链路的每一张表都进行数据探查。如果发现质量问题,通过一些类似于血缘和字段的关系找到数据上游的字段。目前我们在做的还是这样偏探查+流程的方式去尽快了解上游数据,归因分析这部分暂时还没有什么进展。

Q:数据质量闭环是如何做的:比如数据质量问题由谁来解决?数据质量如何衡量?

A:数据质量问题谁来解决?谁在关注数据质量,谁去 push 推进,谁开发了数据,谁去解决数据质量问题。这是一个协作上的问题。

如何衡量数据质量?我们内部有一些可治理的指标,比如报警量、核心任何的报警率等。

Q:如何保证端到端数据一致性?

A:端到端数据一致性不是一个单一的工具能解决的,可能需要一些方案,比如:从端上上报的数据,结合埋点系统做数据校验,在发版的时候确定数据是准确的。但是我认为端到端数据一致性目前整个行业都还做的比较欠缺,业务端如果出现了问题,是很难排查的。如果对数据链路的每一层都做监控,可能问题排查起来会相对简单一些,但这种做法代价又比较大。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Golang入门学习之方法(method)--练气九层

发表于 2021-09-10

写在前面

在上一篇文章《Golang入门学习之结构体(struct)》当中,我们学习了Golang当中结构体(struct)的知识点,接下来我们将学习Golang当中的方法(method)。

方法的定义

在Golang当中,方法是作用在接收者(receiver)上的一个函数,接收者是某种类型的变量。因此,方法是一种特殊的函数。这里的接收者可以(几乎,接收者类型不能是一个接口类型或指针类型)任何类型,不仅仅是结构体类型,也就意味着,几乎任何类型都可以方法,甚至是函数类型,或者是int、bool等的别名类型。

我们可以这样理解:一个类型(比如说是结构体)加上它的方法就等价于面向对象语言当中的一个类。

方法的定义格式

1
go复制代码func (recv receiver_type) methodName(parameter_list) (return_value_list) { ... }

recv 就像是面向对象语言中的 this 或 self,但是 Golang 中并没有这两个关键字。随个人喜好,你可以使用 this 或 self 作为 receiver 的名字。

注意点

在 Golang 中,类型的代码和绑定在它上面的方法的代码可以不放置在一起,它们可以存在在不同的源文件,唯一的要求是:它们必须是同一个包的。请看下面这个例子:

我们在src/go_code/method/model/immortal.go 当中定义了一个修仙者类型

1
2
3
4
5
6
7
8
go复制代码package model

// 修仙者
type immortal struct {
name string
age int
gender string
}

然后,我们在src/go_code/method/model/immortal_method.go 当中定义immortal 类型的方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
go复制代码package model

// 工厂函数
func NewImmortal(age int, name, gender string) *immortal {

return &immortal{
name: name,
age: age,
gender: gender,
}
}

// Getter
func (recv *immortal) GetName() string {
return recv.name
}
...

再然后,我们再main包当中使用它

1
2
3
4
5
6
7
8
9
10
11
12
go复制代码package main

import (
"fmt"
"go_code/method/model"
)

func main() {
i := model.NewImmortal(18,"韩立","男")
name :=i.GetName()
fmt.Println(name)
}

输出:

1
复制代码韩立

函数与方法的区别

函数和方法都是一段可复用的代码段。他们的区别在于函数是面向过程的,方法是面向对象。从调用上来看,函数通过函数名进行调用,而方法则通过与实例关联的变量进行调用。

1
2
3
4
5
go复制代码// 函数调用
println("Hello World")
// 方法调用
immortal := model.NewImmortal(18,"韩立","男")
immortal.GetName()

再看Golang当中,方法由接收者类型、方法名、形参列表、返回值列表和方法体五部分构成,并且接收者必须有一个显式的名字,这个名字必须在方法中被使用。而且,接收者类型(receiver_type)必须在和方法同样的包中被声明。

Golang中方法的其他特性

在Golang当中,接收者类型关联的方法不写在类型结构里面(面向对象语言Java的方法是在类当中进行定义的)。因此,在Golang当中方法与接收者类型的耦合更加地宽松,也就是说,数据(字段)与其对应的行为是相互独立。

接收者类型可以是一个值而不是类型的指针吗?

接收者类型可以是一个值而不是类型的指针吗?答案是可以的。但是,基于性能方面的考虑,我并不建议大家这样做。因为接收者是作为值传递给对应的方法的,这相当于将实例的值拷贝传递给方法,这并不是一件划算的买卖。请看下面的例子,接收者完全可以是实例的值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码// 修仙者等级
type Level struct {
level string
levelValue int
}

// 获取等级描述
func (recv Level) GetLevel() string{
return recv.level
}

func main{
level := model.Level{"练气九层",9200}
fmt.Println(level.GetLevel())
}

输出:

1
go复制代码练气九层

注意:

指针方法和值方法都可以在指针或非指针上被调用。如下面程序所示,类型 Level 在值上有一个方法 GetLevel(),在指针上有一个方法 SetLevel(),但是可以看到两个方法都可以在两种类型的变量上被调用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
go复制代码package model


// 修仙者等级
type Level struct {
level string
levelValue int
}

func NewLevel(level string, levelValue int) Level {
return Level{
level: level,
levelValue: levelValue,
}
}


// 获取等级描述
func (recv Level) Level() string{
return recv.level
}

func (recv *Level) SetLevel(level string) {
recv.level = level
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码package main

import (
"fmt"
"go_code/method/model"
)

func main() {

level := model.NewLevel("练气九层",9200)
levelPointer := & level
fmt.Println("晋级之前:",level.Level())
levelPointer.SetLevel("炼气大圆满")
fmt.Println("晋级之后:",level.Level())
}

输出:

1
2
复制代码晋级之前: 练气九层
晋级之后: 炼气大圆满

方法和未导出字段

在上面的例子当中,level类型的字段对包外部而言是不可见的(可以理解为面向对象语言当中的private属性)。因此如果在main包当中直接通过选择器进行访问的话,将会报错。这是,我们可以通过面向对象语言一个众所周知的技术来完成:提供 getter 和 setter 方法。在Golang当中,对于 setter 方法使用 Set 前缀,对于 getter 方法只使用成员名。

关于并发访问对象

对象的字段(属性)不应该由 2 个或 2 个以上的不同线程在同一时间去改变。如果在程序发生这种情况,为了安全并发访问,可以使用包 sync中的方法(比如加个互斥锁)。但是这并不是一个推荐的选项(之后我们将会学习通过 goroutines 和 channels 去探索一种新的方式)。请看下面的例子

src/go_code/method/model/level_lock.go

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
go复制代码package model

import "sync"

// 修仙者等级
type levelLock struct {
Lock sync.Mutex
level string
levelValue int
}

func NewLevelLock(level string, levelValue int) *levelLock {
return &levelLock{
level: level,
levelValue: levelValue,
}
}

func (recv *levelLock) SetLevel(level string) {

recv.level = level

}

src/go_code/struct/main/level_lock.go

1
2
3
4
5
6
7
8
9
10
11
12
13
go复制代码package main

import "go_code/method/model"

func main() {
level := model.NewLevelLock("练气九层",9200)
// 获取锁
level.Lock.Lock()
//修改值
level.SetLevel("练气圆满")
// 释放锁
defer level.Lock.Unlock()
}

内嵌类型的方法和继承

当一个匿名类型被内嵌在结构体中时,匿名类型的可见方法也同样被内嵌,这在效果上等同于外层类型 继承 了这些方法:将父类型放在子类型中来实现亚型。这个机制提供了一种简单的方式来模拟经典面向对象语言中的子类和继承相关的效果。因为一个结构体可以嵌入多个匿名类型,所以实际上我们可以有一个简单版本的多重继承。

在model包当中定义一个immortal2 类型,并让其内嵌一个匿名类型level

src/go_code/method/model/anonymous_type.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
go复制代码package model



// 修仙者
type immortal2 struct {
name string
age int
gender string
Level
lingGen
}

func NewImmortal2(age int, name, gender string,levelName string,levelValue int,lingGenNames...string) *immortal2 {
return &immortal2{
name: name,
age: age,
gender: gender,
Level: Level{levelName,levelValue},
lingGen: lingGen{linGenNames: lingGenNames},
}
}

src/go_code/method/model/level.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
go复制代码package model


// 修仙者等级
type Level struct {
level string
levelValue int
}

func NewLevel(level string, levelValue int) Level {
return Level{
level: level,
levelValue: levelValue,
}
}


// 获取等级描述
func (recv Level) Level() string{
return recv.level
}

func (recv *Level) SetLevel(level string) {
recv.level = level
}

func (recv *Level) LevelName() string{
return recv.level
}

src/go_code/method/model/lingen.go:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
go复制代码package model

// 修士的灵根
type lingGen struct {
linGenNames[] string
}

func NewLinggen(name ...string) *lingGen {
return &lingGen{linGenNames: name}
}

func (recv *lingGen) LingGenNames() []string {
return recv.linGenNames
}

在main包当中导入并使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
go复制代码package main

import (
"fmt"
"go_code/method/model"
)

func main() {

im := model.NewImmortal2(18,"韩立","男",
"练气九层",9200,"木灵根","水灵根","火灵根","土灵根")
im.SetLevel("练气大圆满")
fmt.Println("境界:",im.LevelName())
fmt.Println("灵根:",im.LingGenNames())
}

输出:

1
2
go复制代码境界: 练气大圆满
灵根: [木灵根 水灵根 火灵根 土灵根]

Go 的类型和方法和其他面向对象语言对比

在如 C++、Java、C# 和 Python这样的面向对象语言中,方法在类的上下文中被定义和继承:在一个对象上调用方法时,运行时会检测类以及它的超类中是否有此方法的定义,如果没有会导致异常发生。

在 Golang 中,这样的继承层次是完全没必要的:如果方法在此类型定义了,就可以调用它,和其他类型上是否存在这个方法没有关系。在这个意义上,Golang具有更大的灵活性。

Golang不需要一个显式的类定义,如同 Java和C++等那样,相反地,“类”是通过提供一组作用于一个共同类型的方法集加类型本身来隐式定义的。类型可以是结构体或者任何用户自定义类型。

总结

在Golang中,类=类型+与之关联的方法集。

在 Golang 中,代码复用通过组合和委托实现,多态通过接口的使用来实现:有时这也叫 组件编程(Component Programming)。

相比于类继承,Go 的接口(后面将会详细讲解)提供了更强大、却更简单的多态行为。

写在后面

关于Golang中方法的学习就写到这了。本文当中涉及到的例子可以点击此处下载。如果我的学习笔记能够给你带来帮助,还请多多点赞鼓励。文章如有错漏之处还请各位小伙伴帮忙斧正。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

prometheus告警规则管理 什么是Rule

发表于 2021-09-09

微型公众号:运维开发故事,作者:夏老师

什么是Rule

Prometheus支持用户自定义Rule规则。 Rule分为两类,一类是Recording Rule,另一类是Alerting Rule。Recording Rule的主要目的是通过PromQL可以实时对Prometheus中采集到的样本数据进行查询,聚合以及其它各种运算操作。而在某些PromQL较为复杂且计算量较大时,直接使用PromQL可能会导致Prometheus响应超时的情况。这时需要一种能够类似于后台批处理的机制能够在后台完成这些复杂运算的计算,对于使用者而言只需要查询这些运算结果即可。Prometheus通过Recoding Rule规则支持这种后台计算的方式,可以实现对复杂查询的性能优化,提高查询效率。今天主要带来告警的分析。Prometheus中的告警规则允许你基于PromQL表达式定义告警触发条件,Prometheus后端对这些触发规则进行周期性计算,当满足触发条件后则会触发告警通知。

什么是告警Rule

告警是prometheus的一个重要功能,接下来从源码的角度来分析下告警的执行流程。

怎么定义告警Rule

一条典型的告警规则如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
yaml复制代码groups:
- name: example
rules:
- alert: HighErrorRate
#指标需要在触发告警之前的10分钟内大于0.5。
expr: job:request_latency_seconds:mean5m{job="myjob"} > 0.5
for: 10m
labels:
severity: page
annotations:
summary: High request latency
description: description info

在告警规则文件中,我们可以将一组相关的规则设置定义在一个group下。在每一个group中我们可以定义多个告警规则(rule)。一条告警规则主要由以下几部分组成:

  • alert:告警规则的名称。
  • expr:基于PromQL表达式告警触发条件,用于计算是否有时间序列满足该条件。
  • for:评估等待时间,可选参数。用于表示只有当触发条件持续一段时间后才发送告警。在等待期间新产生告警的状态为pending。
  • labels:自定义标签,允许用户指定要附加到告警上的一组附加标签。
  • annotations:用于指定一组附加信息,比如用于描述告警详细信息的文字等,annotations的内容在告警产生时会一同作为参数发送到Alertmanager。

Rule管理器

规则管理器会根据配置的规则,基于规则PromQL表达式告警的触发条件,用于计算是否有时间序列满足该条件。在满足改条件时,将告警信息发送给告警服务。

1
2
3
4
5
6
7
8
9
10
go复制代码type Manager struct {
opts *ManagerOptions //外部的依赖
groups map[string]*Group //当前的规则组
mtx sync.RWMutex //规则管理器读写锁
block chan struct{}
done chan struct{}
restored bool

logger log.Logger
}
  • opts(*ManagerOptions类型):记录了Manager实例使用到的其他模块,例如storage模块、notify模块等。
  • groups(map[string]*Group类型):记录了所有的rules.Group实例,其中key由rules.Group的名称及其所在的配置文件构成。
  • mtx(sync.RWMutex类型):在读写groups字段时都需要获取该锁进行同步。

读取Rule组配置

在Prometheus Server启动的过程中,首先会调用Manager.Update()方法加载Rule配置文件并进行解析,其大致流程如下。

  • 调用Manager.LoadGroups()方法加载并解析Rule配置文件,最终得到rules.Group实例集合。
  • 停止原有的rules.Group实例,启动新的rules.Group实例。其中会为每个rules.Group实例启动一个goroutine,它会关联rules.Group实例下的全部PromQL查询。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
go复制代码func (m *Manager) Update(interval time.Duration, files []string, externalLabels labels.Labels, externalURL string) error {
m.mtx.Lock()
defer m.mtx.Unlock()
// 从当前文件中加载规则
groups, errs := m.LoadGroups(interval, externalLabels, externalURL, files...)
if errs != nil {
for _, e := range errs {
level.Error(m.logger).Log("msg", "loading groups failed", "err", e)
}
return errors.New("error loading rules, previous rule set restored")
}
m.restored = true

var wg sync.WaitGroup
//循环遍历规则组
for _, newg := range groups {
// If there is an old group with the same identifier,
// check if new group equals with the old group, if yes then skip it.
// If not equals, stop it and wait for it to finish the current iteration.
// Then copy it into the new group.
//根据新的rules.Group的信息获取规则组名
gn := GroupKey(newg.file, newg.name)
//根据规则组名获取到老的规则组并删除原有的rules.Group实例
oldg, ok := m.groups[gn]
delete(m.groups, gn)

if ok && oldg.Equals(newg) {
groups[gn] = oldg
continue
}

wg.Add(1)
//为每一个rules.Group实例启动一个goroutine
go func(newg *Group) {
if ok {
oldg.stop()
//将老的规则组中的状态信息复制到新的规则组
newg.CopyState(oldg)
}
wg.Done()
// Wait with starting evaluation until the rule manager
// is told to run. This is necessary to avoid running
// queries against a bootstrapping storage.
<-m.block
//调用rules.Group.run()方法,开始周期性的执行PromQl语句
newg.run(m.opts.Context)
}(newg)
}

// Stop remaining old groups.
//停止所有老规则组的服务
wg.Add(len(m.groups))
for n, oldg := range m.groups {
go func(n string, g *Group) {
g.markStale = true
g.stop()
if m := g.metrics; m != nil {
m.IterationsMissed.DeleteLabelValues(n)
m.IterationsScheduled.DeleteLabelValues(n)
m.EvalTotal.DeleteLabelValues(n)
m.EvalFailures.DeleteLabelValues(n)
m.GroupInterval.DeleteLabelValues(n)
m.GroupLastEvalTime.DeleteLabelValues(n)
m.GroupLastDuration.DeleteLabelValues(n)
m.GroupRules.DeleteLabelValues(n)
m.GroupSamples.DeleteLabelValues((n))
}
wg.Done()
}(n, oldg)
}

wg.Wait()
//更新规则管理器中的规则组
m.groups = groups

return nil
}

运行Rule组调度方法

规则组启动流程(Group.run):进入Group.run方法后先进行初始化等待,以使规则的运算时间在同一时刻,周期为g.interval;然后定义规则运算调度方法:iter,调度周期为g.interval;在iter方法中调用g.Eval方法执行下一层次的规则运算调度。
规则运算的调度周期g.interval有prometheus.yml配置文件中global中的 [ evaluation_interval: | default = 1m ]指定。
实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
go复制代码func (g *Group) run(ctx context.Context) {
defer close(g.terminated)

// Wait an initial amount to have consistently slotted intervals.
evalTimestamp := g.EvalTimestamp(time.Now().UnixNano()).Add(g.interval)
select {
case <-time.After(time.Until(evalTimestamp))://初始化等待
case <-g.done:
return
}

ctx = promql.NewOriginContext(ctx, map[string]interface{}{
"ruleGroup": map[string]string{
"file": g.File(),
"name": g.Name(),
},
})
//定义规则组规则运算调度算法
iter := func() {
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Inc()

start := time.Now()
//规则运算的入口
g.Eval(ctx, evalTimestamp)
timeSinceStart := time.Since(start)

g.metrics.IterationDuration.Observe(timeSinceStart.Seconds())
g.setEvaluationTime(timeSinceStart)
g.setLastEvaluation(start)
}

// The assumption here is that since the ticker was started after having
// waited for `evalTimestamp` to pass, the ticks will trigger soon
// after each `evalTimestamp + N * g.interval` occurrence.
tick := time.NewTicker(g.interval) //设置规则运算定时器
defer tick.Stop()

defer func() {
if !g.markStale {
return
}
go func(now time.Time) {
for _, rule := range g.seriesInPreviousEval {
for _, r := range rule {
g.staleSeries = append(g.staleSeries, r)
}
}
// That can be garbage collected at this point.
g.seriesInPreviousEval = nil
// Wait for 2 intervals to give the opportunity to renamed rules
// to insert new series in the tsdb. At this point if there is a
// renamed rule, it should already be started.
select {
case <-g.managerDone:
case <-time.After(2 * g.interval):
g.cleanupStaleSeries(ctx, now)
}
}(time.Now())
}()
//调用规则组规则运算的调度方法
iter()
if g.shouldRestore {
// If we have to restore, we wait for another Eval to finish.
// The reason behind this is, during first eval (or before it)
// we might not have enough data scraped, and recording rules would not
// have updated the latest values, on which some alerts might depend.
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
iter()
}

g.RestoreForState(time.Now())
g.shouldRestore = false
}

for {
select {
case <-g.done:
return
default:
select {
case <-g.done:
return
case <-tick.C:
missed := (time.Since(evalTimestamp) / g.interval) - 1
if missed > 0 {
g.metrics.IterationsMissed.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
g.metrics.IterationsScheduled.WithLabelValues(GroupKey(g.file, g.name)).Add(float64(missed))
}
evalTimestamp = evalTimestamp.Add((missed + 1) * g.interval)
//调用规则组规则运算的调度方法
iter()
}
}
}
}

运行Rule调度方法

规则组对具体规则的调度在Group.Eval中实现,在Group.Eval方法中会将规则组下的每条规则通过QueryFunc将(promQL)放到查询引擎(queryEngine)中执行,如果被执行的是AlertingRule类型,那么执行结果指标会被NotifyFunc组件发送给告警服务;如果是RecordingRule类型,最后将改结果指标存储到Prometheus的储存管理器中,并对过期指标进行存储标记处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
go复制代码// Eval runs a single evaluation cycle in which all rules are evaluated sequentially.
func (g *Group) Eval(ctx context.Context, ts time.Time) {
var samplesTotal float64
遍历当前规则组下的所有规则
for i, rule := range g.rules {
select {
case <-g.done:
return
default:
}

func(i int, rule Rule) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "rule")
sp.SetTag("name", rule.Name())
defer func(t time.Time) {
sp.Finish()
//更新服务指标-规则的执行时间
since := time.Since(t)
g.metrics.EvalDuration.Observe(since.Seconds())
rule.SetEvaluationDuration(since)
//记录本次规则执行的耗时
rule.SetEvaluationTimestamp(t)
}(time.Now())
//记录规则运算的次数
g.metrics.EvalTotal.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()
//运算规则
vector, err := rule.Eval(ctx, ts, g.opts.QueryFunc, g.opts.ExternalURL)
if err != nil {
//规则出现错误后,终止查询
rule.SetHealth(HealthBad)
rule.SetLastError(err)
//记录查询失败的次数
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

// Canceled queries are intentional termination of queries. This normally
// happens on shutdown and thus we skip logging of any errors here.
if _, ok := err.(promql.ErrQueryCanceled); !ok {
level.Warn(g.logger).Log("msg", "Evaluating rule failed", "rule", rule, "err", err)
}
return
}
samplesTotal += float64(len(vector))
//判断是否是告警类型规则
if ar, ok := rule.(*AlertingRule); ok {
发送告警
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
var (
numOutOfOrder = 0
numDuplicates = 0
)
//此处为Recording获取存储器指标
app := g.opts.Appendable.Appender(ctx)
seriesReturned := make(map[string]labels.Labels, len(g.seriesInPreviousEval[i]))
defer func() {
if err := app.Commit(); err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)
g.metrics.EvalFailures.WithLabelValues(GroupKey(g.File(), g.Name())).Inc()

level.Warn(g.logger).Log("msg", "Rule sample appending failed", "err", err)
return
}
g.seriesInPreviousEval[i] = seriesReturned
}()

for _, s := range vector {
if _, err := app.Append(0, s.Metric, s.T, s.V); err != nil {
rule.SetHealth(HealthBad)
rule.SetLastError(err)

switch errors.Cause(err) {
储存指标返回的各种错误码处理
case storage.ErrOutOfOrderSample:
numOutOfOrder++
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
case storage.ErrDuplicateSampleForTimestamp:
numDuplicates++
level.Debug(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
default:
level.Warn(g.logger).Log("msg", "Rule evaluation result discarded", "err", err, "sample", s)
}
} else {
缓存规则运算后的结果指标
seriesReturned[s.Metric.String()] = s.Metric
}
}
if numOutOfOrder > 0 {
level.Warn(g.logger).Log("msg", "Error on ingesting out-of-order result from rule evaluation", "numDropped", numOutOfOrder)
}
if numDuplicates > 0 {
level.Warn(g.logger).Log("msg", "Error on ingesting results from rule evaluation with different value but same timestamp", "numDropped", numDuplicates)
}

for metric, lset := range g.seriesInPreviousEval[i] {
if _, ok := seriesReturned[metric]; !ok {
//设置过期指标的指标值
// Series no longer exposed, mark it stale.
_, err = app.Append(0, lset, timestamp.FromTime(ts), math.Float64frombits(value.StaleNaN))
switch errors.Cause(err) {
case nil:
case storage.ErrOutOfOrderSample, storage.ErrDuplicateSampleForTimestamp:
// Do not count these in logging, as this is expected if series
// is exposed from a different rule.
default:
level.Warn(g.logger).Log("msg", "Adding stale sample failed", "sample", metric, "err", err)
}
}
}
}(i, rule)
}
if g.metrics != nil {
g.metrics.GroupSamples.WithLabelValues(GroupKey(g.File(), g.Name())).Set(samplesTotal)
}
g.cleanupStaleSeries(ctx, ts)
}

然后就是规则的具体执行了,我们这里先只看AlertingRule的流程。首先看下AlertingRule的结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
go复制代码// An AlertingRule generates alerts from its vector expression.
type AlertingRule struct {
// The name of the alert.
name string
// The vector expression from which to generate alerts.
vector parser.Expr
// The duration for which a labelset needs to persist in the expression
// output vector before an alert transitions from Pending to Firing state.
holdDuration time.Duration
// Extra labels to attach to the resulting alert sample vectors.
labels labels.Labels
// Non-identifying key/value pairs.
annotations labels.Labels
// External labels from the global config.
externalLabels map[string]string
// true if old state has been restored. We start persisting samples for ALERT_FOR_STATE
// only after the restoration.
restored bool
// Protects the below.
mtx sync.Mutex
// Time in seconds taken to evaluate rule.
evaluationDuration time.Duration
// Timestamp of last evaluation of rule.
evaluationTimestamp time.Time
// The health of the alerting rule.
health RuleHealth
// The last error seen by the alerting rule.
lastError error
// A map of alerts which are currently active (Pending or Firing), keyed by
// the fingerprint of the labelset they correspond to.
active map[uint64]*Alert
logger log.Logger
}

这里比较重要的就是active字段了,它保存了执行规则后需要进行告警的资源,具体是否告警还要执行一系列的逻辑来判断是否满足告警条件。具体执行的逻辑如下:

1
2
3
4
5
6
7
8
9
go复制代码func (r *AlertingRule) Eval(ctx context.Context, ts time.Time, query QueryFunc, externalURL *url.URL) (promql.Vector, error) {
res, err := query(ctx, r.vector.String(), ts)
if err != nil {
r.SetHealth(HealthBad)
r.SetLastError(err)
return nil, err
}
// ......
}

这一步通过创建Manager时传入的QueryFunc函数执行规则配置中的expr表达式,然后得到返回的结果,这里的结果是满足表达式的指标的集合。
比如配置的规则为:

1
go复制代码cpu_usage > 90

那么查出来的结果可能是

1
2
go复制代码cpu_usage{instance="192.168.0.11"} 91
cpu_usage{instance="192.168.0.12"} 92

然后遍历查询到的结果,根据指标的标签生成一个hash值,然后判断这个hash值是否之前已经存在(即之前是否已经有相同的指标数据返回),如果是,则更新上次的value及annotations,如果不是,则创建一个新的alert并保存至该规则下的active alert列表中。
然后遍历规则的active alert列表,根据规则的持续时长配置、alert的上次触发时间、alert的当前状态、本次查询alert是否依然存在等信息来修改alert的状态。具体规则如下:

  1. 如果alert之前存在,但本次执行时不存在
    1. 状态是StatePending或者本次检查时间距离上次触发时间超过15分钟(15分钟为写死的常量),则将该alert从active列表中删除
    2. 状态不为StateInactive的alert修改为StateInactive
  2. 如果alert之前存在并且本次执行仍然存在
    1. alert的状态是StatePending并且本次检查距离上次触发时间超过配置的for持续时长,那么状态修改为StateFiring
  3. 其余情况修改alert的状态为StatePending

上面那一步只是修改了alert的状态,但是并没有真正执行发送告警操作。下面才是真正要执行告警操作:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
go复制代码// 判断规则是否是alert规则,如果是则发送告警信息(具体是否真正发送由ar.sendAlerts中的逻辑判断)
if ar, ok := rule.(*AlertingRule); ok {
ar.sendAlerts(ctx, ts, g.opts.ResendDelay, g.interval, g.opts.NotifyFunc)
}
// .......
func (r *AlertingRule) sendAlerts(ctx context.Context, ts time.Time, resendDelay time.Duration, interval time.Duration, notifyFunc NotifyFunc) {
alerts := []*Alert{}
r.ForEachActiveAlert(func(alert *Alert) {
if alert.needsSending(ts, resendDelay) {
alert.LastSentAt = ts
// Allow for two Eval or Alertmanager send failures.
delta := resendDelay
if interval > resendDelay {
delta = interval
}
alert.ValidUntil = ts.Add(4 * delta)
anew := *alert
alerts = append(alerts, &anew)
}
})
notifyFunc(ctx, r.vector.String(), alerts...)
}
func (a *Alert) needsSending(ts time.Time, resendDelay time.Duration) bool {
if a.State == StatePending {
return false
}
// if an alert has been resolved since the last send, resend it
if a.ResolvedAt.After(a.LastSentAt) {
return true
}
return a.LastSentAt.Add(resendDelay).Before(ts)
}

概括一下以上逻辑就是:

  1. 如果alert的状态是StatePending,则不发送告警
  2. 如果alert的已经被解决,那么再次发送告警标识该条信息已经被解决
  3. 如果当前时间距离上次发送告警的时间大于配置的重新发送延时时间(ResendDelay),则发送告警,否则不发送

以上就是prometheus的告警流程。学习这个流程主要是问了能够对prometheus的rules相关的做二次开发。我们可以修改LoadGroups()方法,让其可以动态侧加载定义在mysql中定义的规则,动态实现告警规则更新。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

技术人员的一点产品思维思考 一 产品思维是什么? 二 为什么

发表于 2021-09-09

简介: 作为一线的开发人员,大家是不是都经历过和产品吵得不可开焦,甚至最后谁也无法说服谁,最后只能由老板出面解决的经历。而大多数情况老板还真能以某种方法去解决,并且是一个双方都能接受的方案。然而这不全是因为老板的权威,地位所决定的,更多的是各个老板们有比一线开发更强的产品力,能够听懂对方的诉求和抓住矛盾点并且给出解决方案,这其实就是一种产品思维的方式。

作者 | 行溪

来源 | 阿里技术公众号

一 产品思维是什么?

作为一线的开发人员,大家是不是都经历过和产品吵得不可开焦的经历,甚至最后谁也无法说服谁,只能将问题上升。最后由老板出面解决,而大多数情况下老板还真能够以某种方法去解决,并且是一个双方都能接受的方案。这个时候可能大部分同学会认为是老板的权威,地位导致了这一结果。其实这很不准确(可能有一部分原因但绝对不是主要原因)其实更多的是各个老板们有比一线开发更强的产品力,能够听懂对方的诉求和抓住矛盾点并且给出解决方案。同时其中的表达方式更容易让彼此接受,才导致了最终你看到的老板出马,问题解决,好像自己的观点继续保持了,同时对方也留有余地。那这里这项重要的能力来源于什么呢?其实我认为更是一种产品思维的方式。

从这里可以看到产品思维是通过科学方法论来持续获取最大化价值的思维方式、但这样说或许有点空洞、在基于日常产品技术的产品迭代、更想说产品思维以下这几种形式。

二 为什么技术人员要具备产品思维?

1 技术视角的局限性

  1. 觉得产品提的这个需求没有意义、对业务没有任何帮助、是一种鸡肋需求;
  2. 疑问产品的需求为什么每天改来改去?十分降低工作效率;
  3. 觉得产品的想法天马行空、不专业。完全没有考虑系统的可行性、基本无法落地实施;
  4. 觉得产品的方案一点都不周全、这么明显的逻辑漏洞都没有考虑到;
  5. …….

在日常的工作当中、作为程序员的你是不是经常听过如上的吐槽、其实抛开有一小部分产品可能确实由于经验导致方案不成熟、但更多的有没有想过是由于产品思维和工程师思维的碰撞、导致了大家对同一件事情的认知不一样、从各自的角度出发的时候会觉得难以理解。首先我们来看一组产品思维和技术思维的对比。

举个例子:之前盒马仓储产品评审过一个活鲜仓按箱出库的需求、大致意识呢就是针对活鲜类商品(例如鱼、螃蟹之类)直接以箱为单位进行出入库管理。但这里如果从工程师思维出发会出现几个不可避免的问题:

1、箱入箱出这种场景在之前盒马的仓储系统中是不存在的、具不具备可实施性未知(HOW、技术至上);

2、全链路要适配这种改造、改造点会非常大、难以实现(关注细节、解决方案);

3、针对少货或者多货的场景、涉及上下游链路库存对账会非常麻烦、且极端场景下压根无法对齐(完美情节);

针对这几个问题、研发侧进行了需求的打回、并协议产品业务对齐方案和风险后再次进行评审。但从产品的角度来看角度完全是另一种场景:

1、箱入箱出以前不存在、不代表现在以及将来不会有。这是线下真实需要的业务场景、盒马的仓储需要扩展这类能力(WHY、用户价值);

2、全链路改造工期大、可以梳理工期进行正常排期迭代(迭代思维);

3、异常场景是小概率事件、不能因为小概率事件影响整个项目的超前推进。真正少量异常数据由业务自己兜底。(全局观、完成比完美重要);

在这里由于产品和技术出发的角度不一样、则会带来天然的冲突、在情绪带入之后会很难理解对方的诉求和问题点在哪里、以及是否有综合2者之后相对靠谱的解决方案。在这里技术人员如果转换一下视角用产品思维去应对这个需求、这样去沟通是不是会更合适一些呢?

1、认可箱入箱出是一种新的能力、让产品确定业务价值之后去扩展此类能力。也是对现有仓储系统库存能力的一种补充。

2、告知产品具备心理预期、全链路改造方案成本会比较高、耗时会比较长。看是否能够接受、如果不能接受去尝试中间方案。

3、技术侧如果投入大量成本去改造链路、则需要产品和业务认可此事的技术价值和投入产出比。

当采用这种方式去沟通之后会明确告诉产品技术侧的问题点以及担忧的问题、这样当双方都认可此需求的价值和必要性之后再一起同步携手往前走。回到这个例子、其实就是程序员如果具备产品思维,你就能站在产品角度去思考问题、就容易和产品团队沟通协作培养更加融洽的工作关系、更有利于提升工作效率。

2 技术人员提升产品力后的优势

我们再来看如果技术具备产品思维之后除了更加便于沟通还能给技术人员带来哪些工作上的优势呢?

抽象能力相信大家做技术的都或多或少有一些、平时写代码中也经常用到。但是根据已有的内容去抽象、和面向未来去抽象是2种完全不一样的能力。当技术人员具备产品视角之后,会更容易发现抽象的角度、也更容易表达出来抽象的概念。例如最近接到一个针对大仓加工智能秤称重的产品诉求(将加工过程中的原料重量通过系统记录下来)如果简单的抽象可能是称重任务可以称原料、称成品。如果和产品交流过后具备产品思维就会问这个称重主要解决什么问题?是在实操作业哪一步的动作?这个时候可能结合MES系统你想到的是工序、会进行一个工序任务的抽象。但是这里再考虑表达、是否要更加让大家明白工序是什么意识?这里就会自然的类比精修、贴膜、贴标等等实操环节是否都可以通过工序去表达、再结合他应用的场景这样一个面向未来的工序模型就可以逐步沉淀下来、也便于未来扩展(有兴趣的同学也可以去了解下仓内的精华装载单元是如何面对过去进行统一归纳沉淀、面向未来是如何抽象扩展的、这是一个十分经典的抽象例子)。

修炼思考力,提升角度,更容易看问题本质。大家想一下具备产品思维去形象表达事物的根本属性的时候是不是大多从这三个角度出发:

1、给出清晰的定义;

2、做出准确的简单类比;

3、打出精妙的比方;

你在工作中一直具备如此的习惯、在你遇见困难的时候、会把思考攥在自己手中、而不是交给别人。这是一件当下比较累、但对未来很爽的事情、保持一定的好奇心和想象力去思考、会让自己收获更多的成长。电影《教父》里有一句经典的台词:“花半秒钟就看透事物本质的人,和花一辈子都看不清事物本质的人,注定是截然不同的命运”。

更好的全局视角、这里针对技术人员更好的全局视角意味着什么呢?

1、首先当然是提高系统熟练度、不仅仅是针对当前你所负责的模块、更是你所负责系统的上下游链路也具备相当的了解。这样会给你更多的机会去承担更大的职责。

2、明确的知道做这个需求、这个项目的价值、知道为什么去做、而不是简单的执行机器。会去从需求合理性、投入产出比等问题上去思考需求的必要性。

3、更容易知道如何去体现价值、知道这个项目的重点是什么?知道如何去沉淀数据、从系统的角度来阐述和达到目标。

4、更好的通过技术创造业务增值、技术同学如果具备产品力会更容易发现产品当中的优化点、并创造不小的业务价值。举个例子:之前优选网格仓针对中心仓发货容器进行二次分拨到站点、技术侧发现这里可以针对容器内货品进行分配关系的打乱(总体分配关系不变)从而减少分拨次数。仅这一个技术小点、就减少了网格仓现场12%的分拨次数。再举个例子:之前B2C大仓边拣边播、是拣完一个SKU进入一次巷道,如果巷道中有多个SKU需要折返多次;技术侧具备产品思考之后通过细微的改动给了更好的产品体验、对总拣和分拨这俩个动作进行抽象归类。统一进行总拣,再进行分拨的方式,避免总拣分拨交叉使用的方式下拣货人员往返多次的问题,提升现场14%的拣货人效。

三 如何提升产品力

1 思维的转变

在不同的思考阶段、我们看待问题的角度一定要有进步。能够针对具体表现层的变化、去抽象底层的概念和能力来以不变应万变(举个例子、仓储系统复杂演进的过程更多的就是围绕产能 & 人效 & 成本 & 数字化的不断演进)要不断锻炼自己的思维习惯、这样才能去提升思考力的边界。最近笔者在看一本关于产品法的书并做了部分笔记、这里面关于思维方式我认为以下几点是值得我们技术人员去注意并且不断学习和提升的

  • 本质思维:第一性原理从头算起,只采用最基本的事实作为依据,然后再层层推导,得出结论。抛开别人怎么做,过去怎么做得到不一样的视角(拒绝被同类产品的设计影响和压根不懂同类产品的设计是俩回事 )连环追问法是一种手段,理清过往思路和关键环节,帮助快速判断并且产生新的idea。
  • 相对思维:日光与阴影,让东西明亮不一定是加强它的亮度,可以通过调低周边的环境。这是一种逆向思维。成功与失败,优势与劣势都是暂时,相对的概念。看问题很重要的俩个角度:关系和时间
  • 抽象思维:白痴与上帝,高级抽象视角看问题和用户本能层级看问题会有冲突。如果看不同局部可以切换是比较重要的能力。具体与抽象像飞机腾空时一个个点被不断缩小的过程。多考虑新元素(能力)而不是新功能,元素可以搭建功能。
  • 系统思维:反馈的地位。反馈系统模型是基础的抽象模型,本质上都是在设计反馈。思维误区所有极端和异常的路径是小概率现象。
  • 演化思维:自下而上的设计。极简是演化的基础,好的框架重点突出并且能够收放自如。

2 现实中的一小步

看到这里大家可能会问、上面说了这么多软思维、方法论相关的观点。如果从落地的角度看、在平时的工作中怎么去提升呢?怎么去潜移默化的改变自己的思维呢?

  • 普适的套路:多看书籍、培养自己的知识储备;多做总结、将自己所学到的尽量系统化的表达出来、这也是进一步巩固自己的知识成果;多做分享、如果一个知识点你不光能够自己懂、还能够让大家都听懂你讲了什么你的思考是什么这样会进一步提升你的结构化思考 & 表达能力;
  • 保持好奇心:这一点我更想表达在平时的工作中不要局限于自己的边界、不要仅仅满足于分配给你的工作、要多探寻分配你工作之外的部分扩大自己的领域能力。基础的要求:列如一个项目你负责其中的某一个模块之后、你是否能cover住你上下游的问题、线上出现问题时你是否能及时定位到原因并且协助解决;另外就是保持对周边领域的探寻,对比下周边同学的工作内容和思考看看自己还欠缺哪部分能力、能做哪些针对性的提高。例如工作时可能某个同学负责仓储的拣货实操部分:那是否有了解过拣货单生成部分的主流程?是否了解打包部分的设计?是否知道装笼发运的几种主要方式和约束?又是否知道仓储系统之外单据的交互节点和主要数据?

再放大一些到除开工作之外、是否平时会关心他人的生活经验?是否对于大到国际新闻小到周边内网八卦一概充耳不闻?这样会让自己处于信息的闭塞状态、久而久之会导致思维的僵化。所以一定要保持自己的好奇心、保证自己知识储备的宽度、让自己的思维处于活跃的状态。

  • 多去思考产品需求的本质、至少先做到在PRD评审上多换位思考、多理解产品设计背后的原因。举个例子如果有个用户减肥的需求你会联想到什么?普通人可能想到的就是减肥、但产品思维下的思考应该想到的是、可能是他想要更优美的外貌?可能他需要寻找伴侣?可能想要提升社交地位?

  • 多保持联想、锻炼想象力、平时接到产品的需求之后是否能够联想到系统的现有能力、是否能够结合现有系统来达到需求的最优解?在这里举个仓储系统拣货的例子:之前盒马加工中心有过一个按商品拣货的需求、大意是当拣货员看到库位的商品之后可以自己主动选择商品去拣货、按商品拣货这个需求从产品侧来说是一个比较简单的诉求。这里一般会怎样联想呢?

a. 第一反应接到这个需求一般是直接根据商品选择拣货单返回给用户、再让用户做选择即可(同时需要更改拣货单相关索引);

b. 联想到全局、如果深层次想一下结合B2C仓内拣货任务作业的实时调度、就可以想到这里用任务调度实现是否更符合仓内实操全局调度的规划?

c. 再联想到现有系统的瓶颈和优化点、现在的调度根据分区&任务能力选择队列拉取任务、本质是一种“实时排序”、只能够基于配置优先拉取。其实我们也应该扩宽任务调度的”选择能力”、例如这次的按商品索取、其实是一种队列内部的选择能力。在获取任务时除了L1级别的选择不同队列的能力、我们在队列内部应该要具有L2级别的选择能力、来丰富我们的任务调度中心在实操侧的另一种调度方式(和排序平级的能力);

d. 最后结合过去和可能的未来、除了按商品拣货、之前的DPS拣货 & 标签拣货走任务队列时通过工具去拉取完临时过滤的方式不应该是一种长期的方式。包括后续可能存在的按位置拣货(分区内部的巷道库位、根据人力位置实时获取最优拣货单)、按销售订单拣货(哪个订单要超时了紧急提高优先级)等等可能存在的场景、本质都是根据实时的实操动作去具备L2维度的选择能力、是否可以借本次需求来搭建基础实现能力;

e. 最终给出一些抽象归纳的建议、拣货实操时的变动、具备人的因素、去动态选择。生成任务的时候照正常生产、实操的时候具备动态能力(插入的选择能力根据人的因素去抉择)如果不配置那就默认使用原有队列的排序能力。

当然这里只是一个联想的例子、可能最终决策还要考虑投入产出比等各方面的因素。保持想象力无论是对程序员还是产品经理都是一个提升思考深度比较不错的方式。我们要做的就是培养习惯、让自己的思考伴随着自己的想象力去得到成长。

原文链接

本文为阿里云原创内容,未经允许不得转载。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

记一次JAVA使用Ldap操作AD域

发表于 2021-09-09

早期项目上遇到的需要集成windows域用户的信息的功能,第一次接触ad域,因为不了解而且网上其他介绍不明确,比较费时,这里记录下。

说明:

(1). 特别注意:Java操作查询域用户信息获取到的数据和域管理员在电脑上操作查询的数据可能会存在差异(同一个意思的表示字段,两者可能不同)。

(2). 连接ad域有两个地址: ldap://XXXXX.com:389 和 ldap://XXXXX.com:636(SSL)。

(3). 端口389用于一般的连接,例如登录,查询等非密码操作,端口636安全性较高,用户密码相关操作,例如修改密码等。

(4). 域控可能有多台服务器,之间数据同步不及时,可能会导致已经修改的数据被覆盖掉,这个要么域控缩短同步的时间差,要么同时修改每一台服务器的数据。

1. 389登录

只要不抛出异常就是验证通过

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码public LdapContext adLogin(JSONObject json) {
       String username = json.getString("username");
       String password = json.getString("password");
       String server = "ldap://XXXXXXX.com:389";
       try {
           Hashtable<String, String> env = new Hashtable<String, String>();
           //用户名称,cn,ou,dc 分别:用户,组,域
           env.put(Context.SECURITY_PRINCIPAL, username);
           //用户密码 cn 的密码
           env.put(Context.SECURITY_CREDENTIALS, password);
           //url 格式:协议://ip:端口/组,域   ,直接连接到域或者组上面
           env.put(Context.PROVIDER_URL, server);
           //LDAP 工厂
           env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
           //验证的类型     "none", "simple", "strong"
           env.put(Context.SECURITY_AUTHENTICATION, "simple");
           LdapContext ldapContext = new InitialLdapContext(env, null);
           log.info("ldapContext:" + ldapContext);
           log.info("用户" + username + "登录验证成功");
           return ldapContext;
​
      } catch (NamingException e) {
           log.info("用户" + username + "登录验证失败");
           log.info("错误信息:"+e.getExplanation());
           return null;
      }
  }

2. 636登录验证

证书提前导入的Java库中 参考:www.cnblogs.com/moonson/p/4…

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
java复制代码public LdapContext adLoginSSL(JSONObject json) {
String username = json.getString("username");
String password = json.getString("password");
Hashtable env = new Hashtable();
​
String javaHome = System.getProperty("java.home");
        String keystore = javaHome+"/lib/security/cacerts";
        log.info("java.home,{}",keystore);
    // 加载导入jdk的域证书
        System.setProperty("javax.net.ssl.trustStore", keystore);
        System.setProperty("javax.net.ssl.trustStorePassword", "changeit");
        String LDAP_URL = "ldap://XXXXXX.com:636"; // LDAP访问地址
​
        env.put(Context.INITIAL_CONTEXT_FACTORY, "com.sun.jndi.ldap.LdapCtxFactory");
        env.put(Context.SECURITY_PROTOCOL, "ssl");//链接认证服务器
        env.put(Context.PROVIDER_URL, LDAP_URL);
        env.put(Context.SECURITY_AUTHENTICATION, "simple");
        env.put(Context.SECURITY_PRINCIPAL, username);
        env.put(Context.SECURITY_CREDENTIALS, password);
        try {
            LdapContext ldapContext = new InitialLdapContext(env, null);
            log.info("认证成功");// 这里可以改成异常抛出。
            return ldapContext;
        } catch (javax.naming.AuthenticationException e) {
            log.info("认证失败:{}",e.getMessage());
        } catch (Exception e) {
            log.info("认证出错:{}",e.getMessage());
        }
       return null;
  }

3. 查询域用户信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
java复制代码public List getUserKey(JSONObject json){
​
       JSONObject admin = new JSONObject();
       admin.put("username","Aaaaa");
       admin.put("password", "bbbbbbbb");
       String name = json.getString("name");
       log.info("需要查询的ad信息:{}",name);
       List<JSONObject> resultList = new JSONArray();
       LdapContext ldapContext = adLogin(admin); //连接到域控
       if (ldapContext!=null){
​
           String company = "";
           String result = "";
           try {
               // 域节点
               String searchBase = "DC=XXXXXXX,DC=com";
               // LDAP搜索过滤器类
               //cn=*name*模糊查询          //cn=name 精确查询
          // String searchFilter = "(objectClass="+type+")";
              String searchFilter = "(sAMAccountName="+name+")";    //查询域帐号
​
               // 创建搜索控制器
               SearchControls searchCtls = new SearchControls();
               String  returnedAtts[]={"description","sAMAccountName","userAccountControl"};                        searchCtls.setReturningAttributes(returnedAtts); //设置指定返回的字段,不设置则返回全部
               // 设置搜索范围 深度
               searchCtls.setSearchScope(SearchControls.SUBTREE_SCOPE);
               // 根据设置的域节点、过滤器类和搜索控制器搜索LDAP得到结果
               NamingEnumeration answer = ldapContext.search(searchBase, searchFilter,searchCtls); 
               // 初始化搜索结果数为0
               int totalResults = 0; 
               int rows = 0;
               while (answer.hasMoreElements()) {// 遍历结果集
                   SearchResult sr = (SearchResult) answer.next();// 得到符合搜索条件的DN
                   ++rows;
                   String dn = sr.getName();
                   log.info(dn);
                   Attributes Attrs = sr.getAttributes();// 得到符合条件的属性集
                   if (Attrs != null) {
                       try {
                           for (NamingEnumeration ne = Attrs.getAll(); ne.hasMore();) {
                               Attribute Attr = (Attribute) ne.next();// 得到下一个属性
                               // 读取属性值
                               for (NamingEnumeration e = Attr.getAll(); e.hasMore(); totalResults++) {
                                   company = e.next().toString();
                                   JSONObject tempJson = new JSONObject();
​
                                   tempJson.put(Attr.getID(), company.toString());
                                   resultList.add(tempJson);
                              }
                          }
                      } catch (NamingException e) {
                           log.info("Throw Exception : " + e.getMessage());
                      }
                  } 
              } 
                              log.info("总共用户数:" + rows);
          } catch (NamingException e) {
               log.info("Throw Exception : " + e.getMessage());
          }finally {
               try{
                   ldapContext.close();
              }catch (Exception e){
                   e.printStackTrace();
              }
          }
      }
       return resultList;
  }

4. 重置用户密码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码// 管理员重置用户密码,后强制用户首次登录修改密码
public Map<String, String> updateAdPwd(JSONObject json) {
//要修改的帐号(这个dn是查询的用户信息里的dn的值,而不是域账号)
       String dn = json.getString("dn");
       String password = json.getString("password");//新密码
​
       JSONObject admin = new JSONObject();
       admin.put("username","aaaaaaa");
       admin.put("password", "bbbbbbb");
       Map<String,String> map = new HashMap<String,String>();
       LdapContext ldapContext = adLoginSSL(admin); //连接636端口域
       ModificationItem[] mods = new ModificationItem[2];
       if (ldapContext!=null){
           try {
               String newQuotedPassword = """ + password + """;
               byte[] newUnicodePassword = newQuotedPassword.getBytes("UTF-16LE");
// unicodePwd:修改的字段,newUnicodePassword:修改的值
               mods[0] = new ModificationItem(DirContext.REPLACE_ATTRIBUTE,
                       new BasicAttribute("unicodePwd", newUnicodePassword));
mods[1] = new ModificationItem(DirContext.REPLACE_ATTRIBUTE,
                       new BasicAttribute("pwdLastSet", "0"));  // 首次登录必须修改密码
​
               // 修改密码
               ldapContext.modifyAttributes(dn, mods); 
               map.put("result", "S");
               map.put("message","成功");
          }catch (Exception e){ 
               map.put("result","E");
               map.put("message", "无法重置密码");
          }finally {
               try{
                   ldapContext.close();
              }catch (Exception e){
                   e.printStackTrace();
              }
​
          }
​
      }else {
           log.info("");
           map.put("result","E");
           map.put("message", "验证失败");
      }
       return map;
  }

5. 域账号解锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码// 表示锁定的字段需要测试,不一定这个lockoutTime
public Map<String, String> deblocking(JSONObject json) {
   JSONObject admin = new JSONObject();
   String dn = json.getString("dn"); //被解锁的帐号(这个dn指的是查询用户信息里的dn的值,不是域账号)
   admin.put("username","aaaaaa");
   admin.put("password","bbbbbb");
   Map<String,String> map = new HashMap<String,String>();
   LdapContext ldapContext = adLogin(admin);
   ModificationItem[] mods = new ModificationItem[1];
   if (ldapContext!=null){
       try {
      // "0" 表示未锁定,不为0表示锁定
           mods[0] = new ModificationItem(DirContext.REPLACE_ATTRIBUTE,
                   new BasicAttribute("lockoutTime","0"));
           // 解锁域帐号
           ldapContext.modifyAttributes(dn, mods); 
           map.put("result", "S");
           map.put("message","成功");
      }catch (Exception e){ 
           map.put("result","E");
           map.put("message", "解锁失败");
      }finally {
           try{
               ldapContext.close();
          }catch (Exception e){
               e.printStackTrace();
          }
      }
  }else {
       map.put("result","E");
       map.put("message", "验证失败");
  }
   return map;
}

欢迎关注公众号:纪先生笔记

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…534535536…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%