开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

FutureTask源码分析

发表于 2021-11-29

1.本文默认,都是已使用过FutreTask类以及线程池

1.FutureTask 是什么?

Future类图.png

我们可以看到Futuetask 是继承了RunnableFuture接口,然后RunnableFuture继承了Future和Runnable接口,Runnable接口就不多说了,而这个Future接口是一个异步计算类接口,它提供了获取结果、判断是否执行完成、取消任务等接口,所以FutureTask其实就是一个支持异步任务获取结果的一个异步任务类,其实内部还使用一个callable接口这个接口是Runnable接口的进阶版它也是创建线程的一种方式只是他支持返回结果以及抛出异常,FutureTask它支持callable接口以及Runnable接口提交任务,内部使用了一个RunnableAdapter接口做适配.

2.FutureTask如何使用?

  1. 提交的是Runnable 的任务
1
2
3
4
5
6
typescript复制代码FutureTask<String> task = new FutureTask<String>(new Runnable() {
@Override
public void run() {
//处理逻辑
}
},"");
  1. 提交的是Callable的任务
1
2
3
4
5
6
7
typescript复制代码FutureTask<String> callableTask = new FutureTask<String>(new Callable<String>() {

@Override
public String call() throws Exception {
return "callable";
}
});
  1. 基于函数式编程提交任务(其实就是callable)
1
2
3
4
arduino复制代码FutureTask<String> functionTask = new FutureTask<String>(()->{
//处理逻辑
return "demo";
});

最后把任务提交给线程池处理即可

3.FutureTask源码分析

3.1 FutureTask构建函数以及相关变量

FutureTask类是一个一个stat变量用来记录当前任务执行的状态:新建、正在计算(完成的中间)、完成、取消、正在中断、中断等几个状态。
构造函数有两个,一个是基于Callable接口提交任务,一个基于Runnable,但是FutureTask的任务是callable所以,它需要通过适配器类进行适配。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
java复制代码全局状态,用来判断当前任务处于那个阶段
private volatile int state;
处于新建状态
private static final int NEW = 0;
任务处理完成中间状态,马上就可以获得结果
private static final int COMPLETING = 1;
任务完成时的状态
private static final int NORMAL = 2;
任务异常后的状态
private static final int EXCEPTIONAL = 3;
任务被取消状态
private static final int CANCELLED = 4;
中断时的中间状态
private static final int INTERRUPTING = 5;
中断状态
private static final int INTERRUPTED = 6;

(具体执行任务的 接口带返回值的Callable接口
private Callable<V> callable;
任务结果:可能是正常结果,也可能是异常信息
private Object outcome; // non-volatile, protected by state reads/writes
执行callable的线程
private volatile Thread runner;
在等待的线程,是一个单链表
private volatile WaitNode waiters;

callable的构造器函数
public FutureTask(Callable<V> callable) {
if (callable == null)
throw new NullPointerException();
this.callable = callable;
this.state = NEW; // ensure visibility of callable
}

//基于runnable接口的构造函数,runnable没有返回值,所以需要传入一个结果类型。
public FutureTask(Runnable runnable, V result) {
// 调用适配器类Executors去返回一个callable接口的子类。
this.callable = Executors.callable(runnable, result);
this.state = NEW; // ensure visibility of callable
}

//Executors.callable,这里用到了适配器模式
public static <T> Callable<T> callable(Runnable task, T result) {
if (task == null)
throw new NullPointerException();
//创建了一个适配器类返回
return new RunnableAdapter<T>(task, result); //适配器只需要实现callable接口即可
}

3.2 获取任务结果的get方法

FutureTask的get()获取结果的方法,有两个一个是不指定时间获取,一直阻塞到任务执行完成,第二个是可以指定时间获取结果,如果获取不到就会抛出异常信息和返回异常信息。

get方法做的操作主要有几个:

  • 调用awaitDone方法进行获取任务状态,该方法主要进行几个判断:
  1. 判断线程是否已经中断,如果在一九被中断,则从等待线程链表节点中移除,然后抛出中断移除
  2. 判断当前任务状态,如果状态是已经完成状态,则把当前线程节点的线程置空,然后返回任务状态
  3. 如果当前结果正在计算中(完成前的中间状态),则让当前线程进行礼让(让出cpu)
  4. 如果当前节点是null ,则构建一个waiterNode节点(默认是当前线程)
  5. 如果节点没入队,则把waiteNode节点入队
  6. 如果等待有时间限制,则判断是否超过时间,如果超过则移除节点,返回状态,否则通过LockSupport挂起线程(分有时间限制和没时间限制)
  • 调用report方法处理结果,该方法通过判断当前任务状态处理什么状态返回返回不同的结果。

下面源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
java复制代码/**
* 获取结果,如果任务没完成则调用awaitDone()方法等待任务完成,返回一个状态然后让report处理结果
*/
public V get() throws InterruptedException, ExecutionException {
int s = state;
if (s <= COMPLETING)
s = awaitDone(false, 0L);
//最终调用report返回结果
return report(s);
}

/**
* 指点时间返回
*/
public V get(long timeout, TimeUnit unit)
throws InterruptedException, ExecutionException, TimeoutException {
if (unit == null)
throw new NullPointerException();
int s = state;
if (s <= COMPLETING &&
(s = awaitDone(true, unit.toNanos(timeout))) <= COMPLETING)
throw new TimeoutException();
return report(s);
}

//主要是通过中断和超时等待结果或者正常完成或者终止。
private int awaitDone(boolean timed, long nanos)
throws InterruptedException {
//获取处理时间
final long deadline = timed ? System.nanoTime() + nanos : 0L;

WaitNode q = null;
//判断节点是否在链表中
boolean queued = false;
//自旋
for (;;) {
//1.线程已经中断判断是否已经被中断,如果中断则移除该任务
if (Thread.interrupted()) {
removeWaiter(q);
throw new InterruptedException();
}

//2.如果线程已经完成,则直接把当前waitNode置空,并且返回当前状态
int s = state;
if (s > COMPLETING) {
if (q != null)
q.thread = null;
return s;
}
//如果当前状态处于计算机结果中,则线程进行礼让(让出cpu时间片)
else if (s == COMPLETING) // cannot time out yet
Thread.yield();
//如果节点等null,则创建节点
else if (q == null)
q = new WaitNode();
//如果发现不在链表中则添加到链表头部
else if (!queued)
queued = UNSAFE.compareAndSwapObject(this, waitersOffset,q.next = waiters, q);
//如果等待有时间限制,则判断是否已经超时,如果超时则移除节点,并返回状态
else if (timed) {
nanos = deadline - System.nanoTime();
if (nanos <= 0L) {
removeWaiter(q);
return state;
}
//如果没超时则,设置一个有效时间,通过LockSupport把当前线程挂起.
LockSupport.parkNanos(this, nanos);
}
else
//挂起线程。
LockSupport.park(this);
}
}


//内部方法主要用来返回结果,它会判断当前状态是否是正常,如果正常直接返回,否则抛出一个异常
private V report(int s) throws ExecutionException {
Object x = outcome;
if (s == NORMAL)
return (V)x;
if (s >= CANCELLED)
throw new CancellationException();
throw new ExecutionException((Throwable)x);
}

3.3 任务取消方法Cancel

FutureTask 取消任务方法主要是做了几个操作:

  • 如果任务状态新建并且通过cas修改任务状态为中断或者取消状态失败则直接返回,如果修改成功,则进行一个中断处理,然后通过Unsafe方法 需要任务状态设为中断状态
  • 最后调用finishCompletion方法进行收尾工作,其实就是把线程等待节点处理。
    • finishCompletion方法主要是通过Unsafe方法把waiteNode节点轮流置空和把线程唤醒, 处理完成后,会回调一个done方法,这个方法是个钩子方法,让子类去做扩展操作的,在guava中ListenableFuture中就通过这个done方法执行监听器执行链的

下面是源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
typescript复制代码//用于取消任务执行,是否支持中断取消
public boolean cancel(boolean mayInterruptIfRunning) {
//如果表示新建状态,或者通过CAS修改状态为中断或者取消状态失败,则直接返回取消失败
if (!(state == NEW &&
UNSAFE.compareAndSwapInt(this, stateOffset, NEW,
mayInterruptIfRunning ? INTERRUPTING : CANCELLED)))
return false;
try { //如果是可中断处理的则获取执行callable的线程执行中断处理
if (mayInterruptIfRunning) {
try {
Thread t = runner;
if (t != null)
t.interrupt();
} finally { // 调用UNSAFE修改stat状态的偏移量为中断中断状态。
UNSAFE.putOrderedInt(this, stateOffset, INTERRUPTED);
}
}
} finally {
//最后调用finishCompletion 处理结果。
finishCompletion();
}
return true;
}

//判断是否以及取消
public boolean isCancelled() {
return state >= CANCELLED;
}
//判断是否以及完成任务,会发现其他几种除了中间状态其实都代表着任务已经完成了(可能是异常完成或者中断)
public boolean isDone() {
return state != NEW;
}

//任务执行完成或者取消后调用该方法收尾。
private void finishCompletion() {
// 这里主要是移除所以等待线程,等待GC回收
for (WaitNode q; (q = waiters) != null;) {
if (UNSAFE.compareAndSwapObject(this, waitersOffset, q, null)) {
for (;;) {
Thread t = q.thread;
if (t != null) {
q.thread = null;
//把等待线程唤醒。
LockSupport.unpark(t);
}
WaitNode next = q.next;
if (next == null)
break;
q.next = null; // unlink to help gc
q = next;
}
break;
}
}
//这里调用了一个done方法这个方法是抽象方法主要是用于扩展用的,如果你在任务执行完成后还需要做一些其他操作则可以实现该接口处理,
//guava中ListenablFuture就是基于这个接口任务完成后回调done方法去执行监听器执行处理。
done();

callable = null; // 置空等待回收
}


//抽象方法留给子类扩展使用,如果完成任务后需要做额外操作可以通过该接口扩展。
protected void done() { }

3.4 FutureTask的run方法以及Set方法

FutureTask的run方法是重新了接口中的run方法,线程池中创建线程出来执行任务就是调用run方法去执行,run方法的主要操作有几个

  • 判断执行任务的线程状态是否是当前线程以及当前任务的状态,通过cas把当前线程设置为执行任务线程,如果设置失败证明线程已经被其他线程处理了,或者当前任务状态表示新建也是表示已经有其他线程处理了。
  • 如果是为执行的任务,则再次判断状态已经任务callable是否合法,如果校验通过,则调用callable的call方法执行热为奴,如果出现异常则调用setException方法把异常处理
  • 如果正常执行完成,则通过set方法设置结构到全局结构中。
  • 最后把执行任务线程置空,避免重复执行任务(因为是并发执行),,然后会调用一个handlePOossibleCancellationinterrupt来判断是否被其他线程取消任务修改状态
    set方法主要是用来处理结果的,一个是通过判断当前任务状态,如果状态是完成的中间状态的着把结果赋值给全局任务结果outCome ,然后把状态修改为完成,最后调用finishCompletion方法处理收尾工作。
  • runAndReset方法和run方法类似,只这个方法是处理定时任务线程池执行任务才会调用这个,这个方法执行完成后会把任务状态设置为新建状态,为下一次定时执行做准备。

下面是源码分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
typescript复制代码 //该方法是当线程执行完run方法之后会调用set方法使用cas修改状态、设置结果,处理善后工作.
protected void set(V v) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = v;
UNSAFE.putOrderedInt(this, stateOffset, NORMAL); // final state
finishCompletion();
}
}

//处理异常结果的方法
protected void setException(Throwable t) {
if (UNSAFE.compareAndSwapInt(this, stateOffset, NEW, COMPLETING)) {
outcome = t;
UNSAFE.putOrderedInt(this, stateOffset, EXCEPTIONAL); // final state
finishCompletion();
}
}

//实现了RunnableFuture接口的run方法,线程池内部执行是会调用该接口
//这个接口干了啥?
//1. 先判断当前线程的状态,如果表示new则证明已经被调用过
//
//2. 如果没执行过,尝试把当前线程设置为执行这个任务的线程,并且 通过cas把执行线程修改成功则,继续执行,否则cas修改失败证明,这个执行任务线程已经被其他线程设置(已经有线程执行这个操作了)
//修改成功后,执行 callable的call方法执行任务。
public void run() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return;
try {
Callable<V> c = callable;
if (c != null && state == NEW) {
V result;
boolean ran;
try {
result = c.call();
ran = true;
} catch (Throwable ex) {//如果异常则把异常信息处理到结果中
result = null;
ran = false;
setException(ex);
}
//如果正常完成着调用set方法把结果设置到全局outCome
if (ran)
set(result);
}
} finally {
//最后执行完后要置空,防止并非调用call或者run方法
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
int s = state;//重新获取,可能有其他线程修改了,然后进行中断中间状态或者中断状态
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
}

//处理来只取消的中断处理,如果处于中断中则调用线程礼让(方法)方法让出cpu
private void handlePossibleCancellationInterrupt(int s) {

if (s == INTERRUPTING)
while (state == INTERRUPTING)
Thread.yield(); // wait out pending interrupt
}

//该方法是定时线程池中执行run方法会调用这个方法,定时任务中执行一次完成后需要把状态重新设置为new,为下一轮执行做准备。
protected boolean runAndReset() {
if (state != NEW ||
!UNSAFE.compareAndSwapObject(this, runnerOffset,
null, Thread.currentThread()))
return false;
boolean ran = false;
int s = state;
try {
Callable<V> c = callable;
if (c != null && s == NEW) {
try {
c.call(); // don't set result
ran = true;
} catch (Throwable ex) {
setException(ex);
}
}
} finally {
// runner must be non-null until state is settled to
// prevent concurrent calls to run()
runner = null;
// state must be re-read after nulling runner to prevent
// leaked interrupts
s = state;
if (s >= INTERRUPTING)
handlePossibleCancellationInterrupt(s);
}
return ran && s == NEW;
}

3.5 WaitNode内部类及remodeWaitNode方法

waitNode类是一个简单的单链表,主要是用来存储一个堆栈中的等待线程,而waitNode方法主要是进行节点移除操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
ini复制代码  /**
* 单链表结构,用来记录一个堆栈中的等待线程
*/
static final class WaitNode {
volatile Thread thread;
volatile WaitNode next;
WaitNode() { thread = Thread.currentThread(); }
}



//移除等待的线程节点
private void removeWaiter(WaitNode node) {
if (node != null) {
node.thread = null;
retry:
for (;;) { // restart on removeWaiter race
for (WaitNode pred = null, q = waiters, s; q != null; q = s) {
s = q.next;
if (q.thread != null)
pred = q;
else if (pred != null) {
pred.next = s;
if (pred.thread == null) // check for race
continue retry;
}
else if (!UNSAFE.compareAndSwapObject(this, waitersOffset,
q, s))
continue retry;
}
break;
}
}
}

// Unsafe 获取 任务状态stat ,已经当前任务执行的线程和等待的线程
private static final sun.misc.Unsafe UNSAFE;
private static final long stateOffset;
private static final long runnerOffset;
private static final long waitersOffset;
static {
try {
UNSAFE = sun.misc.Unsafe.getUnsafe();
Class<?> k = FutureTask.class;
stateOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("state"));
runnerOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("runner"));
waitersOffset = UNSAFE.objectFieldOffset
(k.getDeclaredField("waiters"));
} catch (Exception e) {
throw new Error(e);
}
}
  1. 总结
    FuturTask 是一个可以获取任务结果和处理移除的异步任务类,它需要结合线程池来使用,任务的执行入口其实是在线程池中,线程池通过调用FutureTask实现的run方法,然后FutureTask同Unsafe类处理任务状态以及任务线程来保证线程只有一次,同时还预留了扩展接口done,让我们可以对FutureTask类进行扩展,而guava中的ListenableFutre就是基于FutureTask的done接口进行了扩展实现了future的监听器。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

CLR析构列表是如何添加析构函数类的

发表于 2021-11-29

比如说,有一个类,包含了析构函数

1
2
3
4
5
6
7
8
9
10
11
typescript复制代码  class A
{
public A()
{
Console.WriteLine("Create A Class");
}
~A()
{
Console.WriteLine("Kill A Class");
}
}

当我们实例化这个类的时候

1
css复制代码    A a = new A()

CLR在分配a这个实例的时候,会检测它是否包含了析构函数~A。

1
2
3
4
5
6
7
8
9
10
scss复制代码CHECK_ALLOC_AND_POSSIBLY_REGISTER_FOR_FINALIZATION(newAlloc, size, flags & GC_ALLOC_FINALIZE);


#define CHECK_ALLOC_AND_POSSIBLY_REGISTER_FOR_FINALIZATION(_object, _size, _register) do { \
if ((_object) == NULL || ((_register) && !REGISTER_FOR_FINALIZATION(_object, _size))) \
{ \
STRESS_LOG_OOM_STACK(_size); \
return NULL; \
} \
} while (false)

注意看这个宏的if 判断语句,它首先是个do-while循环,这个很有意思,wihile包含的条件就是个false,表示它只循环一次。其次会判断_object也就是传递进来的参数newAlloc。是否为NULL,如果是直接返回NULL,因为一个NULL就没必要进行后续动作了。

当它不等于NULL,后续需要_register和 !REGISTER_FOR_FINALIZATION(_object, _size))这两个条件。_register实际上就是判断当前的实例化分配的类A是否包含析构函数操作为:flags & GC_ALLOC_FINALIZE。

而!REGISTER_FOR_FINALIZATION(_object, _size))是重点,它包含了如何把析构函数的对象添加到析构列表。

实际上因为 ((_register) && !REGISTER_FOR_FINALIZATION(_object, _size))这两个是&& ,所以_register就算是包含了析构函数,但是REGISTER_FOR+FINALIZATION返回True。它还是不进入If语句里面去。

重点关注REGISTER_FOR_FINALIZATION

1
2
scss复制代码#define REGISTER_FOR_FINALIZATION(_object, _size) \
hp->finalize_queue->RegisterForFinalization (0, (_object), (_size))

它实际上也是个宏,调用了RegisterForFinalization 函数,这个finalize_queue是在hp(hp 就是gc_heap)初始化的时候被Init的。

finallize_queue实例化代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
ini复制代码bool CFinalize::Initialize()
{
CONTRACTL {
NOTHROW;
GC_NOTRIGGER;
} CONTRACTL_END;


m_Array = new (nothrow)(Object*[100]);


if (!m_Array)
{
ASSERT (m_Array);
STRESS_LOG_OOM_STACK(sizeof(Object*[100]));
if (GCConfig::GetBreakOnOOM())
{
GCToOSInterface::DebugBreak();
}
return false;
}
m_EndArray = &m_Array[100];


for (int i =0; i < FreeList; i++)
{
SegQueueLimit (i) = m_Array;
}
m_PromotedCount = 0;
lock = -1;
#ifdef _DEBUG
lockowner_threadid.Clear();
#endif // _DEBUG


return true;
}

实际上就做了一件事情,就是让SegQueueLimit的每一个元素指向析构列表

我们来看RegisterForFinalization

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
scss复制代码CFinalize::RegisterForFinalization (int gen, Object* obj, size_t size)
{
CONTRACTL {
NOTHROW;
GC_NOTRIGGER;
} CONTRACTL_END;


EnterFinalizeLock();


// Adjust gen
unsigned int dest = gen_segment (gen);


// Adjust boundary for segments so that GC will keep objects alive.
Object*** s_i = &SegQueue (FreeList);
if ((*s_i) == m_EndArray)
{
if (!GrowArray())
{
LeaveFinalizeLock();
if (method_table(obj) == NULL)
{
// If the object is uninitialized, a valid size should have been passed.
assert (size >= Align (min_obj_size));
dprintf (3, (ThreadStressLog::gcMakeUnusedArrayMsg(), (size_t)obj, (size_t)(obj+size)));
((CObjectHeader*)obj)->SetFree(size);
}
STRESS_LOG_OOM_STACK(0);
if (GCConfig::GetBreakOnOOM())
{
GCToOSInterface::DebugBreak();
}
return false;
}
}
Object*** end_si = &SegQueueLimit (dest);
do
{
//is the segment empty?
if (!(*s_i == *(s_i-1)))
{
//no, swap the end elements.
*(*s_i) = *(*(s_i-1));
}
//increment the fill pointer
(*s_i)++;
//go to the next segment.
s_i--;
} while (s_i > end_si);


// We have reached the destination segment
// store the object
**s_i = obj;
// increment the fill pointer
(*s_i)++;


LeaveFinalizeLock();


return true;
}

这个函数做的主要功能

  1. 获取到当前传递进来的代在m_FillPointers数组的索引4(因为传递进来的代只能是0,所以total_generation_count - gen - 1=4,调用unsigned int dest = gen_segment (gen);)
  2. 获取到m_FillPointers的最末尾元素的地址Object*** s_i = &SegQueue (FreeList),FreeList=7
  3. 获取m_FillPointers索引为N的元素地址 Object*** end_si = &SegQueueLimit (dest)
  4. 判断前一个m_FillPointers的元素是否与当前元素相等。
  5. m_FillPointers元素的值取值,实际上就是指向Object的指针。然后++,实际上就是m_array+8.指向析构队列的下一个元素。
  6. 如此往复循环,找到当前析构对象需要存放的位置。

当GC的时候

  1. 先判断对象是否存在
  2. 执行析构对象里面的方法
  3. 回收掉它

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ReentrantLock原理分析

发表于 2021-11-29

1.ReetrantLock是什么?

ReetrantLock是基于Lock接口实现的一种可重入锁,说到重入锁,其实synchronized也是一种可重入锁,
但是它与synchronized有本质的区别,ReetrantLock底层是依靠AQS来实现的,而Synchronized是基于monitor监视器来实现的;

ReetrantLock是有两种模式的,一种是公平锁模式,一种是非公平锁模式,其实就是获取锁的时候是否按照顺序获得。注:源码只分析非公平锁模式。

2.AQS在ReetrantLock中怎么使用的?

ReetrantLock并没有直接继承AQS这个抽象类,而是通过内部定义一个抽象Sync类来继承AQS,然后实现一部分通用的锁需要的方法比如tryRelease方法,然后再通过定义公平类和非公平类来继承这个抽象类Syn,然后实现具体的Lock方法,其实这里是用到了典型的模板方法模式,AQS中定义了加锁的整个算法执行逻辑结构,具体加锁实现留给子类去实现。

下面是它的AQS在ReetrantLock的实现类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
java复制代码//ReetrantLock的全局AQS子类.
private final Sync sync;



//我们看看Syn这个静态抽象类干了那几件事:
//1.首先是继承了AQS这个抽象同步器类
//2.实现了非公平锁尝试获取锁操作的nonfairTryAcquire方法,
//3.实现了tryRelease尝试释放锁方法 (ps:获取锁和释放锁操作都是模板方法的体现)
abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = -5179523762034025860L;

abstract void lock();

// 首先获取当前线程,然后通过getState()方法获取全局state变量(ps:该变量是AQS的一个全局遍历,它可以判断当前锁被某个线程冲入几次)
//然后通过cas修改这个state从0改为1,如果成功则该线程获取锁成功,把他设置为当前锁持有者线程
//如果不成功,则判断当前锁持有者线程是否是自己,如果是者重入,修改state数量。
final boolean nonfairTryAcquire(int acquires) {
//获取当前线程,已经当前锁的状态state
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
//如果0则还没有线程获得锁,尝试修改状态,如果成功把自己设置为锁持有者线程
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
//判断当前锁持有者是否是自己,如果是进行重入操作
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
//尝试释放锁方法,
//1.计算需要释放释放个数与当前状态的差值,然后判断当前线程释放是锁持有者如果不是抛异常
//2.如果差值等于0者释放成功把锁持有者线程置空,让出位置给下一个线程
//3.最后修改state状态。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
...省略相关判断线程以及状态的简单方法
}

//非公平锁类,Sync的子类,实现了非公平锁加锁方法Lock的业务逻辑
//1.先通过cas修改状态修改成功者把当前线程设置为锁持有者线程
//2.调用acquire获取锁,该方法是AQS实现的
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

final void lock() {
//cas 修改state从0-1如果成功,则获得锁,否则调用acquire方法进行后续尝试获取锁操作。
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//尝试获取锁方法,是由AQS的acquire方法调用,(ps:你可以发现模板方法者这里用得很多。)
//然后调用父类Sync的nonfairTryAcquire非公平锁尝试获取方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

/**
* 公平锁,公平锁不会先尝试修改state状态而是通过acquire去获取。
*/
static final class FairSync extends Sync {
private static final long serialVersionUID = -3000897897090466540L;

final void lock() {
acquire(1);
}
//这个方法是公平锁尝试获取锁的方法
//1.获取当前线程和锁状态state,判断队列里是否有等待线程,如果没有并且cas修改成功则获得锁,修改为持有者线程
//2.判断当前锁持有者是否是自己,如果是进行重入操作
protected final boolean tryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (!hasQueuedPredecessors() &&
compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0)
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
}

3.ReetrantLock的Lock执行流程以及源码分析

reentrantlock-lock.png
上图是Lock的大概流程,下面是部分源码解读:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
java复制代码//1.第一步是调用ReetrantLock的lock方法
public void lock() {
sync.lock();
}
//第二步是调用Sync子类的lock方法
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//第三步是调用AQS的acquire方法,先尝试获取锁,如果获取不到则生成一个waiter节点,然后
public final void acquire(int arg) {
//这个if,tryAcquire失败后,进行acquireqQueued去队列来从头节点的下一个节点开始重新tryAcquire获取如果成功则把当前节点中断,获得锁的线程执行。
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}




//第四步是调用NonfairSync的tryAcquire方法
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}


//第五步是调用Sync的nonfairTryAcquire方法,该部分源码前面已经介绍了。
final boolean nonfairTryAcquire(int acquires){}




//第六步如果还是失败则调用addWaiter
private Node addWaiter(Node mode) {
//创建当前线程创建成一个waiter节点
Node node = new Node(Thread.currentThread(), mode);
// 获取AQS队列的尾节点,然后把当前node 节点在尾部添加并且修改为节点为当前新node
Node pred = tail;
if (pred != null) {
node.prev = pred;
//这里存在并发操作,可能为节点变为null,就修改失败
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
//cas修改为尾节点可能会失败,enq失败后初始化操作
enq(node);
return node;
}
//自旋,并且判断尾节点是否是null,如果是null,则当前没有等待节点则需要初始化一个节点作为头节点,该节点类似于哨兵作用。
//头节点是不干活的
private Node enq(final Node node) {
for (;;) {
Node t = tail;
if (t == null) { // 创建一个node作为头节点,进行初始化
if (compareAndSetHead(new Node()))
tail = head;
} else {
//重新插入前面的node节点。
node.prev = t;
if (compareAndSetTail(t, node)) {
t.next = node;
return t;
}
}
}
}
//第七步,调用AQS的acquireQueued,
//1. 节点添加到队列后,自旋,获取当前node节点的前一个节点的,
//2.如果当前的node的前一个节点是头节点,则证明队列里只有一个线程在等待,进行调用tryAcquire再次尝试获取锁,如果成功则把队列节点置空并且返回重新设置头节点
//3.获取失败,准备挂起,在挂起之前则调用shouldParkAfterFailedAcquire,主要是处理有一些等待节点已经被唤醒者需要移出队列的操作
//4.处理完后调用parkAndCheckInterrupt 进行挂起操作
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
//处理被唤醒的节点,和挂起当前线程
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
//
if (failed)
cancelAcquire(node);
}
}
//处理已经被唤醒的节点,
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
return true;
if (ws > 0) {
//一直while找到等待状态低于的节点然后把当前节点插入为下一个节点。
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
//调用LockSupport.park挂起当前线程
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

简单小结:首先线程尝试获取锁,如果失败,创建一个Node节点,该节点包含当前线程,然后添加到AQS的双向等待队列中,最后再一次进行尝试获取锁,如果失败,则准备挂起当前线程,但是挂起之前会把等待队列中已经被Signal唤醒的节点移除调用再添加到等待队列中,最后挂起。

4.ReetrantLock的unLock执行流程以及源码分析

reentrantLock的unlock.png

上图是解锁的大概流程,下面看一些具体的源码解读:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码第一步:先调用unlock
public void unlock() {
sync.release(1);
}
第二步: 调用AQS的release方法,模板方法
public final boolean release(int arg) {
//调用Syn的tryRelease尝试获取锁。
if (tryRelease(arg)) {
Node h = head;
//如果头节点不是空而且状态不是0则尝试唤醒队列的后续节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
第三步,唤醒后续节点
private void unparkSuccessor(Node node) {

int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
//一直遍历找到可以唤醒的jied
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//通过unpark唤醒
if (s != null)
LockSupport.unpark(s.thread);
}

小结: 解锁比较简单,就是判断是否是当前线程,如果是则通过cas修改state,并且判断是否是只重入了一次,如果是则已经释放锁成功,则唤醒队列后续可以唤醒的节点,否则不是重入,则锁持有者线程不变。

其他一些比如Condition条件分析,待续….

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

RDS 类数据库设计规范

发表于 2021-11-29

这是我参与11月更文挑战的第5天,活动详情查看:2021最后一次更文挑战

说明:本规范是自己基于多家公司经验,有自己开发经验也有行业规范要求,列举一些常用的场景而成文,提供给初创团队参考使用,可根据实际开发参考

一. 核心理念

制定规范的直接目的是约束设计行为,最终目的是确保设计的合理统一。规
范虽然是有丰富项目经验的人制定的,但维护的却不是某个人的意志,而是项目*的意志,因为遵守此规范对项目是好的有利的,此规范才有意义
。

对于数据库核心理念可以归纳为以下几点:

◆不在数据库做计算,cpu 计算务必移至业务层

◆控制单表数据量,单表记录控制在千万级

◆控制列数量,字段数控制在 30 以内

◆平衡范式与冗余,为提高效率可以牺牲范式设计,冗余数据

◆拒绝 3B(big),大 sql,大事务,大批量

二. 表结构设计

2.1 禁用保留字作为字段名或表名

  • desc、range、match、delayed、comment 等加上引号也不行

2.3 标志类字段优先使用 tinyint

  • 启用标记、状态标记、纳税人状态等统一使用 tinyint 存储
  • 标记类字段必须为非空且带有默认值
  • 对于标记特殊类型建议采用负值记录
  • 启用标记一般情况默认都是启用,启用使用 1 表示

2.4 避免使用 NULL 字段
*NULL 的列使索引/索引统计/值比较都更加复杂,对 MySQL 来说更难优化

  • NULL 字段的索引需要额外空间
  • NULL 字段的复合索引无效
  • 可以为空时要额外详细说明原因
  • NOT NULL 之后能有默认最好有默认值
  • 对 null 的处理时候,只能采用 is null 或 is not null,而不能采用=、in、<、<>、!=、not in 这些操作符号。如:where name!=’zhuyungao’,如果存在 name 为null 值的记录,查询结果就不会包含 name 为 null 值的记录

2.7 禁止在主表中使用 TEXT、BLOB 类型

  • 会浪费更多的磁盘和内存空间,非必要的大量的大字段查询会淘汰掉热数据,导致内存命中率急剧降低,影响数据库性能
  • 可以在主表中存储索引,利用索引到明细表具体查询
  • 对于大的字符串内容,可以在主表中存储 MD5 摘要进行匹配
  • 不允许对大字段进行列表查询
  • 对于图片、音频等,可以存放在 OSS 上,数据库中存 URI

2.8 原则上禁止使用浮点类型

  • 浮点数和机器精度相关,容易导致偏差
  • 一定要使用小数必须使用 decimal 类型

2.9 必须使用 varchar(20 或 50)存储手机号

  • 涉及到区号或者国家代号,可能出现+-()
  • 手机号并不会做数学运算
  • varchar 可以支持模糊查询

2.11 必须包含显式主键

  • 主键推荐为整数,且必须设置自增, 自增从 10000 开始
  • 禁止使用 varchar 作为主键
  • 禁止使用复合列作为主键

2.12 禁止使用外键

  • 外键逻辑在程序里处理

2.13 建议使用 InnoDB 存储引擎

  • 当前业务来看不需要使用其它引擎
  • InnoDB 引擎支持事务、行级锁、并发性能更好、CPU 及内存缓存页优化使得资源利用率更高
  • 我们的 RDS 默认就是 InnoDB 引擎,不指定即可

2.14 必须使用 UTF-8 字符集

  • 我们的 RDS 默认就是 UTF-8 字符集
  • 如果需要存储表情字符,则需要显式指定为 UTF-8-mb4

2.15 数据表、数据字段必须加入中文注释

  • 表结构是需要给其他人或后来维护者看的
  • 特别注意修改字段之后要同时修改注释

2.16 禁止使用存储过程、触发器、Event

  • 高并发大数据的互联网业务,架构设计思路是“解放数据库 CPU,将计算转移到服务层”,并发量大的情况下,这些功能很可能将数据库拖死,业务逻辑放到服务层具备更好的扩展性,能够轻易实现“增机器就加性能”。数据库擅长存储与索引,CPU 计算还是上移吧
  • RDS 本身对这些功能的支持有一定的局限性
  • 可以一定程度使用视图

2.18 表名和字段名命名规范

  • 都统一使用大写,下划线风格,不超过 30 个字,根据业务命名,不得中英混用
  • 英文尽量不要使用缩写
  • 表名如果是英文命名请使用单数

2.19 原则上表字段必须小于 30 个 l 过多字段应该考虑使用扩展表

  • 该规则根本目的是为了防止表过大
  • 如果每个字段都很轻,可以允许超出
  • 从其他数据库同步、其他方式导入等场景例外

2.20 固定长度字符列尽量使用定长 char

  • 对于数据大多在固定长度范围的 varchar 可转换为 char
  • 使用 char 的时候必须是 not null

2.21 字段要适当冗余

  • 打破范式设计,适当冗余避免大量查询,ᨀ高查询性能
  • 冗余的字段应该更新不会过于频繁,或者有手段能让其更新不频繁

2.22 对于单表数据超过 300 万行才允许分库分表

  • 设计时应评估三年内的数据量情况,能达到 300 万行数据的才推荐使用分库分表
  • 对于本身数据量不大的表严禁分库分表

2.23 选择适当的存储长度

  • 深入了解业务,根据业务场景选择合适的长度,能省则省
  • 对于 varchar 等可扩展格式,不宜在最初就设定一个超长值
  • 在选择长度时应考虑各系统标准规范,如文件路径在 windows 下标准为 255
  • 也要考虑横向第三方系统兼容性,老旧系统兼容性,特别是金三系统

2.24 字段命名与取长尽力保持已知习惯

  • 业内知名的必须使用,如 qymc、nsrsbh、shxydm 等 l 大家已经习惯的强烈建议保持,比如 xxsx(显式顺序)、node_id 等 l 目前系统记录作者字段采用了 varchar(30),那么后续应尽量使用该长度

三. SQL 语句规范

3.1 SQL 语句要尽可能简单

  • 一条 SQL 只能在一个 CPU 中运算,一条大的 SQL 可以长期占有 CPU 最终会堵死整个数据库,且没有任何中断处理措施.
  • 把大语句拆成小语句,通过代码逻辑多次执行,减少锁时间
  • 如果涉及到事务,更加要注意事务要尽可能的小,执行时间短,比如上传图片、加工表数据就决不能使用事务,带事务的处理必须在 100ms 以内
  • 在 DRDS 模式下,禁止使用触发器和函数,在 SQL 里也不允许出现自定义函数,官方函数除聚合函数之外也应该尽量少使用
  • 所有 SQL 必须在 1000ms 内返回,原则上在 100ms 内返回

3.2 禁止使用 SELECT *

  • SELECT 会额外消耗 CPU、IO、网络资源
  • SELECT * 也不利于程序扩展性,一旦字段变动程序就会出现问题
  • 不能有效的利用覆盖索引

3.3 limit 高效分页

  • limit 越大,效率越低,所以当存在很大的 limit 情况时应考虑通过线性 ID来取巧设计.select id from t limit 1000000, 10; 可以改为 =>select id from t where id > 1000000 limit 10;
  • 当 limit 起始为很大数时,应考虑业务上给予限制,比如分页结果应该限制最多翻到 1000 页.

3.4 建议使用 union all 替代 union

  • union 有去重开销,首先业务上查询时就不应该存在重复,确有重复可以在代码中去除
  • 大多数业务场景都可以在业务端保证不重复
  • 要正确认识和在正确场景下分别使用 union all 和 union

3.5 统一使用 count(*)

  • count(*)是 SQL 标准写法
  • count(*)会统计值为 NULL 的行,而 count(列名)不会统计,但是出现统计非 NULL 行这种业务本身就不合理
  • 一定要统计指定列非空场景则至少应该使用 count(distinct 列名)

3.6 JOIN 使用规范

  • 尽量不用连接 JOIN,保持单表查询
  • JOIN 连接字段的数据类型保持一致,避免类型转换,甚至全表扫᧿
  • JOIN 连接字段原则建议都带有索引,主表侧必须带有索引
  • 表很小的情况下可以没有索引,保持索引区分度
  • 原则上禁止 3 张自建业务表以上的 JOIN
  • 禁止大表使用 JOIN,会产生大量临时表,极大消耗数据库内存

3.7 禁止使用 OR

  • RDS 对 OR 子句的支持很差,虽然 MySQL 后续版本已经优化,但是当前 RDS还是存在严重 OR 子句问题
  • 可以把 OR 改写为 IN 子句或者 UNION 子句

3.8 禁止使用 INSERT INTO 表名 VALUES(xxx)

  • 必须显示指定插入的列清单,容易在增加或者删除字段后出现程序 BUG

3.9 禁止使用属性隐式转换

  • 禁止查询时类型不匹配,如数据库是字符串类型,查询时给了 int 类型,会导致全表扫比如: SELECT xxx FROM 表名 WHERE 字符串类=123456

3.10 禁止在 WHERE 条件的属性上使用函数或者表达式

  • 应该对条件值使用函数而不是对条件列使用函数,禁止对条件列使用函数 如:SELECT xxx FROM 表名 WHERE from_unixtime(day)>=’2017-02-15’正确的写法是:SELECT xxx FROM 表名 WHERE day>= unix_timestamp(‘2017-02-15’)

3.11 禁止负向查询

  • 负向查询条件:NOT、!=、<>、!<、!>、NOT IN、NOT LIKE 等,会导致全表扫

3.12 禁止左模糊匹配

  • %开头的左模糊查询无法利用索引,会导致全表扫

3.13 使用 ISNULL()来判断是否 NULL 值 l NULL = NULL 的结果返回值是 NULL 而不是 true

  • NULL=1 的结果返回值是 NULL 而不是 true
  • 对 null 的处理时候,只能采用 is null 或 is not null,而不能采用=、in、<、<>、!=、not in 这些操作符号。如:WHERE name != ’zhuyungao’,如果存在 name 为null 值的记录,查询结果就不会包含 name 为 null 值的记录

3.14 查询尽量利用索引

  • 使用索引需注意 where 条件以及排序、分组,比如有联合索引 idx_a_b_c
    where a=1 and b=2 and c=3 可以完全利用索引
    where a=1 and c=3 and b=2 也可以利用索引,但是需要一层内存转换消耗
    where a=1 and b>2 and c=3 仅可以利用 a、b 列索引
    where a=1 and b like ‘hello%’ order by 3 仅可以利用 a、b 列索引
    where a=1 and b=2 group by c 可以利用 a、b、c 列索引
  • 可以查看执行计划确认索引使用情况
    *至少需要达到 range 级别以上

3.15 禁止使用 DELETE 语句
*业务表数据都通过软删除来处理

  • DELETE 语句一律运维操作执行

3.16 数据库一次查询行数必须小于 1000 行 l 超过 1000 行一定要分页查询
3.17 禁止 IN 配合 SQL 子句
*IN (params),必须保证 params 为常量值,而不能是 SQL 子句

  • 尽量应转换为 JOIN 查询

3.18 SQL 中不允许出现密钥或密码

  • 可以先使用占位符代替在私聊给数据库执行人

四. 索引规范

4.1 唯一特性组合字段必须有索引

  • 业务上唯一的单个字段必须加上唯一索引
  • 业务上多个字段组合逻辑唯一必须加上联合唯一索引,不能仅在程序里逻辑保证

4.2 JOIN 列必须有索引

  • JOIN 列上务必有索引且类型相同,防止全表扫᧿

4.3 尽量不要在 varchar 上建立索引

  • 在 varchar 字段上需要索引说明表设计存在一定的缺陷或业务流程为能理清
  • 一定要在 varchar 上建立索引则必须指定索引长度

4.4 联合索引顺序规范

  • 联合索引首先按照常使用的业务查询顺序排列,适当调整业务查询顺序
  • 再者按照列的区分程度高低顺序排列,区分程度高的一定要在前
  • ORDER BY 的字段放在联合索引最后,防止 file_sort

4.5 单表索引不得超过 5 个 l 可以考虑使用联合索引合并多个索引,MySQL 中联合索引支持左匹配
4.6 一个索引的字段不得超过 6 个 l 一个业务特性需要索引 6 个列来区分,本身就存在问题
4.7 建立索引的区分度必须小于千分之一

  • 索引能定位到的行占总行的千分之一以下
  • 区分度低的索引不仅不能ᨀ高查询效率,反而大幅降低插入效率
  • 覆盖记录条数过多的列绝不能建索引,例如“性别”

4.8 索引命名规范

  • 唯一索引使用 uk_xxxx,根据业务取名,不得中英混合,不得超过 32 个字
  • 其它索引使用 idx_xxxx,根据业务取名,不得中英混合,不得超过 32 个字

五. 业务类规范

5.1 业务表前缀指定

  • A业务表统一使用前缀 A_ l

5.2 业务表必须包含字段

  • 必须包含 creator 字段,用于记录行创建人,不用于业务记录
  • 必须包含 creator_date 字段,用于记录行创建时间,不用于业务记录
  • 必须包含 modifier 字段,用于记录行修改人
  • 必须包含 modification_date 字段,用于记录行修改时间
  • 拼音模式下为 cjr、cjsj、xgr、xgsj

5.3 业务字段命名规范

  • 手机号码统一英文使用 mobile、拼音统一使用 sjhm
  • 启用标记统一英文使用 enabled、拼音统一使用 qybj

5.4 加密类规范

  • 反射 Bean 和 Mapper 文件时必须使用公司的反射工具
  • 公司的反射工具通过字段注释来区分采用何种方式加密并自动生成加解密 TypeHandler
  • #CryptBase36#代表加密结果集在 26 个字母+10 个数字范围内
  • #CryptBase52#代表加密结果集在 52 个字母+10 个数字范围内
  • #CryptSimple#加密结果在字符串范围内

六. 程序使用规范

6.1 禁止使用 Mybatis 的部分用法

  • 禁止使用 association 关键字
  • 禁止使用 collection 作为映射中关键字
  • 禁止使用 select 作为结果映射中关键字

七. 其他规范

7.1 数据库审批流程附件必须以.sql 结尾

  • SQL 必须以附件形式出现
  • 附件必须以.sql 结尾

7.2 表结构修改必须说明目的

  • 涉及到结构修改应说明修改目的、设计思维
  • 另需对比说明原来为何如此设计、现在这样改带来的好处

7.3 审批流程仅指执行生产库

  • 开发库可以由开发自行执行
  • 测试库在事业部负责人同意情况下可自行执行

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Monotone Increasing Digits

发表于 2021-11-28

「这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战」。

LeetCode题目链接

Given a non-negative integer N, find the largest number that is less than or equal to N with monotone increasing digits.
(Recall that an integer has monotone increasing digits if and only if each pair of adjacent digits x and y satisfy x <= y.)
Note: N is an integer in the range [0, 10^9].

给一个非负整数N,找出最大的、不大于N的monotone increasing digits X。所谓monotone increasing digits 指的是从左往右看,每位数字都是单调非减,如12345, 12233。

X要满足两个条件,①X<=N且X要尽可能大 ②X的各位数字dandianfeijian。该怎么找这样的数字呢?从哲学的角度想,解铃还须系铃人,肯定要从N入手。

首先如果N本身就是monotone increasing digits ,那么X=N;如果N不是,那么就需要依据N构造出X。要满足条件①,从高位开始X的数字要和N的数字尽可能相等;当不能相等时(假设是第i位),也就是为了满足条件②,我们只能第i-1位数字减小,i-1后的数字变为9。

1
2
3
ini复制代码	以N=1234532为例:
N = 1 2 3 4 5 3 2
X = 1 2 3 4 5 _ _

第i=6位时,X的第6位如果取3则不满足条件②,又不能取一个更大的数字,这样会违反条件①,这种情况下只能让第i-1=5位数字减一:

1
2
ini复制代码	N = 1 2 3 4 5 3 2
X = 1 2 3 4 4 _ _

此时X已经小于N,故后面的数字全取9也不会大于N,同时也满足条件②。
看起来这样已经解决这个问题了,其实我们忽略了一个地方,那就是第i-1位减一后,可能会出现Xi−2>Xi−1X_{i-2} \gt X_{i-1}Xi−2​>Xi−1​ 的情况,这样就违反了条件②。如:33321

1
2
ini复制代码	N = 3 3 3 2 1
X = 3 3 2 9 9(之前做法得到的结果)

所以我们还需要向前检查,当发生Xi−1>XiX_{i-1} \gt X_{i}Xi−1​>Xi​ 的情况时,Xi−1X_{i-1}Xi−1​ -= 1, 继续向前检查,直到满足Xi−1≤XiX_{i-1} \le X_{i}Xi−1​≤Xi​ , 然后从i开始所有的数字取9。

1
2
ini复制代码	N = 3 3 3 2 1
X = 2 9 9 9 9
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
ini复制代码class Solution {
public:
int monotoneIncreasingDigits(int N) {
string str = to_string(N);
int len = str.size();
if (len == 1) return N;
for (int i = 1;i < len;++i){
if (str[i] < str[i-1]){//找到违反单调不减的下标
int t = i;
while (t>=1 && str[t-1]>str[t]){//向前检查
--str[t-1];
--t;
}
for (int k =t+1;k < len;++k)//t之后的数字取9
str[k] = '9';
break;
}
}
cout << str << endl;
return atoi(str.c_str());
}
};

看到还有人的做法如下:

解题思路:从高位向低位遍历整数N的各位数,找到第一个违反单调不减的数的下标x将x及x后的所有数替换为0,记得到的新数为M,则M - 1即为答案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
python复制代码class Solution(object):
def monotoneIncreasingDigits(self, N):
"""
:type N: int
:rtype: int
"""
sn = str(N)
size = len(sn)
flag = False
for x in range(size - 1):
if sn[x] > sn[x + 1]:
flag = True
break
if not flag: return N
while x > 0 and sn[x - 1] == sn[x]: x -= 1
y = len(sn) - x - 1
return (N / (10 ** y)) * (10 ** y) - 1

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【Redis】重学主从模式

发表于 2021-11-28

这是我参与11月更文挑战的第27天,活动详情查看:2021最后一次更文挑战

一、概述

即使用了 AOF 和 RDB,单机 Redis 也依然存在服务不可用的问题。
例如:一个 Redis 实例宕机了,之后在恢复期间,是无法服务新来的数据。

Redis 具有高可靠性,意味着:

  1. 数据尽量少丢失:AOF 和 RDB 保证了。
  2. 服务尽量少中断:增加副本冗余量。

即将一份数据同时保存在多个实例上。
即使有一个实例出现了故障,需要过一段时间才能恢复,其他实例也可以对外提供服务,不会影响业务使用。

单机的 redis 几乎不太可能 QPS 超过 10w。

要提高并发,一般的方案是 读写分离。

多实例保存同一份数据,就会存在多实例(副本)之间的数据如何保持一致性问题?

Redis 提供了主从模式,以保证数据副本的一致,主从库之间采用读写分离方式:

  1. 读操作:主库、从库读可以接收。(主要是从库)
  2. 写操作:首先到主库执行,之后主库同步给从库
    2021-11-2716-32-37.png

那么问题来了:主从库同步是如何完成的呢?

主从库同步

先来了解下,Redis 主从库同步三种模式:

  • 全量复制
  • 基于长连接的命令传播
  • 增量复制

启动多个 Redis 实例,可以通过 replicaof (Redis 5.0 之前使用 slaveof)命令来形成主库和从库关系。

举个栗子:

  • 实例1 (ip: 172.16.19.3)
  • 实例2 (ip:172.16.19.5)

在实例2上执行 replicaof 命令,使实例2变为实例1的从库:

1
shell复制代码replicaof  172.16.19.3  6379

主从库数据同步的三个阶段,如下:

  • 第一阶段:建立连接,协商同步
  • 第二阶段:主库同步数据给从库
  • 第三阶段:主库发送新写命令给从库
    2021-11-2716-46-21.png

第一阶段:建立连接,协商同步

第一阶段:是主从库间建立连接、协商同步的过程,主要是为全量复制做准备。

  1. 从库发送命令 psync:
    从库会发送 psync 命令(表示需要进行数据同步)给主库,主库会根据命令的参数来进行复制。
    psync 命令包含了主库的 runID 和 复制进度 offset:
  • runID:是每个 Redis 实例启动时都会自动生成的一个随机 ID,用来唯一标记这个实例。

当从库和主库第一次复制时,因为不知道主库的 runID,所以将 runID 设为“?”。

  • offset:此时设为 -1,表示第一次复制。
  1. 主库接收 psync 命令后:会用 FULLRESYNC 命令进行响应
  • 主库会执行 bgsave 命令,生成对应的 RDB 文件。

FULLRESYNC 响应表示第一次复制采用的全量复制。
参数:主库 runID 和 主库目前的复制进度 offset

第二阶段:主库同步数据给从库

  1. 从库:
    从库接收到 RDB 文件后,会先清空当前数据库,然后加载 RDB 文件。

因为是第一次全量复制,所以从库里之前的数据都是脏数据。

  1. 主库:
    主库会在内存中用专门的 replication buffer,记录 RDB 文件生成后收到的所有写操作。

全量复制只是当前的时刻主库的全量,之后会有增量同步,所以需要记录入每个从库的进度。

第三阶段:主库发送新写命令给从库

这个阶段是增量同步。

当主从库完成了全量复制,它们之间就会一直维护一个网络连接:避免频繁建立连接。

  1. 主库:
    同步之后的写命令给从库。
  • 发送写命令
  • 同时会在 replication buffer 中修改对应操作
  1. 从库:
    从库只需接收命令,在本地内存中执行。

二、问题

(1)集群过大:主从级联模式

当集群过大后,影响主库主要操作有:

  1. fork 操作:如果从库数量过多,且都要和主库进行全量复制,就会导致主库忙于 fork 子进程进行 RDB 文件。

fork 操作会阻塞主线程处理正常请求,从而导致主库响应应用程序的请求速度变慢。

  1. 传输 RDB 文件:占用主库的网络带宽

为了分担主库压力:可以采用 主 - 从 - 从 模式。

说白了,把从库假装成主库,在其下在进行关联子从库。

1
2
shell复制代码# 命令操作类似:
replicatof 所选从库IP 6379

2021-11-2809-23-15.png

(2)主从库断连会怎样?断连重连又是如何?

1)主从断连:可能会造成数据不一致

当主从库断连后:

  1. 主库会把断连期间收到的写操作命令,写入 replication buffer:存储写命令
  2. 同时也会把这些操作命令写入 repl_backlog_buffer 缓冲区:存储记录

repl_backlog_buffer 是一个环形缓存区:

  • 主库会记录自己写道的位置
  • 从库则会记录自己已经读到的位置

repl_backlog_buffer 是一个环形缓冲区,当写满之后,会覆盖之前写入的操作。

如果从库的读取速度慢,就可能导致从库还未读取,主库已经开始覆盖写入了。这就会造成主从库间的数据不一致。

为避免这一情况,需要根据实际环境计算出合理的缓存区大小:

  • 缓冲空间的计算公式是:缓冲空间大小 = 主库写入命令速度 * 操作大小 - 主从库间网络传输命令速度 * 操作大小。
  • 在实际应用中,需要考虑到可能存在一些突发的请求压力,通常需要把这个缓冲空间扩大一倍,即 repl_backlog_size = 缓冲空间大小 * 2。

举个栗子:

  • 主库每秒写入 2000 个操作
  • 每个操作大小为 2KB
  • 网络每秒能传输 1000 个操作

这时候就有 1000 个操作需要缓冲起来,就至少需要 2MB 的缓冲区(repl_backlog_size)。

为了应对可能的突发压力,可以把 repl_backlog_size 设置为 4MB

2)断连后又重连

大致流程如下:
2021-11-2809-56-00.png

断开重连,需要知道主从双方需要知道对方情况:

  • 从库:我的状况是这样的
  • 主库:我看你需要这样的

所以,主从库的连接恢复之后:

  • 从库首先会给主库发送 psync 命令,并把自己当前的 slave_repl_offset 发给主库
  • 主库会判断自己的 master_repl_offset 和 slave_repl_offset 之间的差距
    • master_repl_offset:主库写入的位置,偏移量
    • slave_repl_offset:从库已复制的偏移量

在网络断连阶段,主库可能会收到新的写操作命令,所以,一般来说,master_repl_offset 会大于 slave_repl_offset。

此时,主库只用把 master_repl_offset 和 slave_repl_offset 之间的命令操作同步给从库就行。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring AOP之AspectJ注解

发表于 2021-11-28

这是我参与11月更文挑战的第26天,活动详情查看:2021最后一次更文挑战

Spring AOP之@AspectJ注解

Spring2.0之后发布的新的特性:

  • @AspectJ注解到POJO来定义Aspect以及Advice。
  • xml配置的简化,引入aop独有的命名空间方式,来配置aop。

Spring会搜索注解了@AspectJ的类,然后将他们织入系统中。

使用

定义一个Aspect如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码@Aspect
public class PerformanceTraceAspect {
private final Log logger = LogFactory.getLog(PerformanceTraceAspect.class);

@Pointcut("execution(public void *.method1()) || execution(public void *.method2())")
public void pt() {}

@Around("pt()")
public Object performanceTrace(ProceedingJoinPoint joinPoint) throws Throwable {
StopWatch watch = new StopWatch();
try {
watch.start();
return joinPoint.proceed();
} finally {
watch.stop();
if (logger.isInfoEnabled()) {
logger.info("PT in method[" + joinPoint.getSignature().getName() + "]>>>" + watch.toString());
}
}

}
}

如何将这个Aspect织入到我们的目标对象中?有2种方式:

编程织入方式

使用AspectJProxyFactory:

1
2
3
4
5
java复制代码AspectJProxyFactory weaver = new AspectJProxyFactory();
weaver.setProxyTargetClass(true);
weaver.setTarget(new Foo());
weaver.addAspect(PerformanceTraceAspect.class); //将切面添加进去
Object proxy = weaver.getProxy();
自动代理织入方式

使用AutoProxyCreator:AnnotationAwareAspectJAutoProxyCreator。它会自动收集IOC容器中注册的Aspect,并作用到目标对象上。

只需要在配置文件中注入AnnotationAwareAspectJAutoProxyCreator,spring2.x之后提供了更简单的配置:

1
2
xml复制代码<aop:aspectj-autoproxy proxy-target-clas="true">
</aop:aspectj-autoproxy>

@AspectJ形式的Pointcut

1
2
java复制代码@Pointcut("execution(public void *.method1()) || execution(public void *.method2())")
public void pt() {}

SpringAOP只集成了AspectJ的Pointcut的部分功能,其中包括Pointcut的语言支持。

@Aspect的pointcut申明方式主要包含2个部分:

  • Pointcut Expression
    • 真正指定Pointcut的地方。
    • 表达式中可以&& || !这些逻辑运算符
  • Pointcut Signature
    • 需要一个定义的方法做为载体,这个方法必须是void类型
    • 如果该方法是public的,那么这个pointcut可以被其他的Aspect引用,如果是private那么只能被当前Aspect类引用。

Aspectj的pointcut表述语言中有很多标志符,但是SpringAOP只能是用少数的几种,因为Spring只对方法级别的pointcut。

  • execution
    • 规定格式:execution(<修饰符模式>?<返回类型模式><方法名模式>(<参数模式>)<异常模式>?)
    • 只有返回类型,方法名,参数模式是必须的,其他的可以省略。
    • 这里面我们可以使用2种通配符
      • * 匹配任意的意思
      • ..当前包以及子包里面所有的类
  • within
    • 只接受类型的声明,会匹配指定类型下面所有的Jointpoint。对SpringAOP来说及,匹配这个类下面所有的方法。
  • this和target
    • this指代方法调用方,target指被调用方。
    • this(o1) && this(o2) 即表示当o1类型对象,调用o2类型对象的方法的时候,才会匹配。
  • args
    • 指定参数的类型,当调用方法的参数类型匹配就会捕捉到。
  • @within
    • 指定某些注解,如果某些类上面有指定的注解,那么这个类里面所有的方法都将被匹配。
  • @target
    • 目标类是指定注解的时候,就会被匹配,SpringAOP中和@within没什么区别,只不过@within是静态匹配,@target是运行时动态匹配。
  • @args
    • 如果传入的参数的类型 有其指定的注解类型,那么就被匹配。
  • @annotation
    • 系统中所有的对象的类方法中,有注解了指定注解的方法,都会被匹配。

这些注解的pointcut在spring内部最终都会转为具体的pointcut对象。

@AspectJ形式的Advice

主要就是一些Advice的注解:

  • @Before
    • 想要获取方法的参数等信息:可以2种方法
      • 第一个参数设置为JoinPoint,这个参数必须要放在第一个位置,并且除了Around Advice和Introduction不可以用它,其他的Advice都可以使用。
      • args标志符绑定(不常用)
  • @AfterReturning
    • 有一个独特属性:returning,可以获取到方法返回值。
  • @AfterThrowing
    • 有一个独特属性:throwing 可以接受抛出的异常对象。
  • @After(也叫finally)
    • 一般做资源的释放工作的
  • @Around
    • 它的第一个参数必须是ProceedingJoinPoint类型,且必须指定。通过ProceedingJoinPoint的proceed()方法执行原方法的调用。
    • proceed()方法需要传入参数,则传入一个Object[]数组。
  • @DeclareParents
    • 处理Introduction的,不多描述了。

其他

advice的执行顺序
  • 如果这些advice都在一个aspect类里面:

相同的advice按照申明顺序做优先级,但是注意一点,BeforeAdvice先申明优先级高,则先执行。而AfterReturningAdvice则是先申明优先级高,但是优先级高的越后执行。

1
2
3
4
5
erlang复制代码before1
before2
task
after returning 2
after returning 1
  • 如果这些Advice在不同的Aspect里面:

借助于Ordered接口,否则Advice的执行顺序是无法确定的。

Aspect的实例化模式

有3种:singleton,perthis,pertarget。(不太重要)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

定义Redis工具类

发表于 2021-11-28

「这是我参与11月更文挑战的第26天,活动详情查看:2021最后一次更文挑战」

通过上一篇文章的学习:SpringBoot中集成Redis,已经对Redis在SpringBoot中的使用有了初步的认识,可以使用RedisTemplate对象进行缓存数据的处理。

但是,在实际项目开发中使用Redis的地方很多,每次使用都要注入RedisTemplate对象的话属实麻烦,且SpringBoot提供的RedisTemplate类中功能较多,不应该暴露在太多地方,可以根据项目开发需要自定义合适的Redis工具类进行使用。

  1. RedisTemplate操作Redis

SpringBoot中集成Redis后,使用了如下的方式来测试Redis是否可用:

1
2
3
4
5
6
java复制代码@Test
public void testRedis(){
   ValueOperations<String, String> operations = redisTemplate.opsForValue();
   operations.set("name", "tom");
   System.out.println(operations.get("name"));
}

其中,ValueOperations<K, V>类作SpringBoot为Redis数据定义的类,在RestTemplate中还定义了其他相似的类来满足Redis中不同的数据类型的需要。

image-20211128111033226

其中包含有:

  • ValueOperations:操作简单的字符串K-V
  • ListOperations:处理list类型的数据
  • HashOperations:处理hash类型的数据
  • SetOperations:处理set类型数据
  • ZSetOperations:处理zset类型数据

除了使用opsForValue()方法获取对应的ValueOperations<K, V>对象并设置对应的键和值外,还可以使用链式方法restTemplate.boundValueOps("name").append("tom")来设置缓存数据。

对RedisTemplate类对象中的数据处理方法简单了解后,就可以在此基础上进行Redis工具类的封装了。

  1. Redis工具类基本功能

根据项目中的功能需求程度,定义一个基本的Redis工具类需要提供功能有:

  • 数据的放入和获取
  • 数据的存在判断逻辑
  • 数据有效期设置
  • 数据失效处理
  • 数据的删除
  • 其他如缓存长度、所有key值等方法

其中,数据的放入、获取等需要根据Redis中不同的数据结构类型实现不同的逻辑,而针对缓存key的操作可以进行统一处理。

  1. 工具类实现

工具类定义主要就是在RedisTemplate类对象的基础上进行封装,只需要在此处注入对象,其他地方使用时直接调用工具类中对应方法即可。

3.1 工具类的结构实现

新增一个RedisUtil工具类,其中定义Redis操作相关方法,并使用@Component作为Spring容器组件,保证可以注入并使用RedisTemplate对象。

1
2
3
4
5
6
7
8
9
java复制代码//Redis工具类
@Component
public final class RedisUtil{
   @Autowired
   private RedisTemplate<String, Object> redisTemplate;
   
   //使用redisTemplate实现数据操作
  ...
}

3.2 缓存数据的放入和获取

处理Redis数据时,最基本的功能莫过于在缓存中存放和获取数据,并根据缓存需要设置有效时间,针对Redis中不同的数据结构需要实现不同的逻辑。

  • String数据结构,最简单的键值对
  • list数据结构,字符串的集合,存放多个字符串
  • hash数据结构,key-value结构的集合信息,需要额外的对key的操作
  • set数据结构,不重复的字符串的集合结构,或者有序的zset结构

对于最简单的键值对,数据作为字符串被放入Redis中,只需要调用redisTemplate对象中的opsForValue()获取到字符串操作对象,并讲字符串数据放入Redis中,获取时根据指定key返回字符串即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
java复制代码//String类型数据的放入和获取
public boolean set(String key, Object value) {
   try {
       redisTemplate.opsForValue().set(key, value);
       return true;
  } catch (Exception e) {
       e.printStackTrace();
       return false;
  }
}
public boolean set(String key, Object value, long time) {
   try {
       if (time > 0) {
           redisTemplate.opsForValue().set(key, value, time, TimeUnit.SECONDS);
      } else {
           set(key, value);
      }
       return true;
  } catch (Exception e) {
       e.printStackTrace();
       return false;
  }
}
public Object get(String key) {
   return key == null ? null : redisTemplate.opsForValue().get(key);
}

而对于hash类型的数据,因为数据类型是一个包含多组key-value的集合,因此不仅需要针对Redis中的key进行处理,还需要对hash集合中的key提供查询、增加、删除等功能逻辑。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码//hash数据的放入、获取等操作
public boolean hset(String key, Map<String, Object> map) {
   try {
       redisTemplate.opsForHash().putAll(key, map);
       return true;
  } catch (Exception e) {
       e.printStackTrace();
       return false;
  }
}
public boolean hset(String key, Map<String, Object> map, long time) {
   try {
       redisTemplate.opsForHash().putAll(key, map);
       if (time > 0) {
           expire(key, time);
      }
       return true;
  } catch (Exception e) {
       e.printStackTrace();
       return false;
  }
}
public Map<Object, Object> hget(String key) {
   return redisTemplate.opsForHash().entries(key);
}
//针对hash中的指定key的操作
public boolean hItemSet(String key, String item, Object value) {
   try {
       redisTemplate.opsForHash().put(key, item, value);
       return true;
  } catch (Exception e) {
       e.printStackTrace();
       return false;
  }
}
public Object hItemGet(String key, String item) {
   return redisTemplate.opsForHash().get(key, item);
}
​
public boolean hItemHas(String key, String item) {
   return redisTemplate.opsForHash().hasKey(key, item);
}
public void hItemDel(String key, Object... item) {
   redisTemplate.opsForHash().delete(key, item);
}

3.3 缓存key的操作处理

无论是哪种数据,在Redis中存放是都需要指定一个最外层的redis_key,并以此作为存放和获取的标志,Redis中的key是唯一存在的,对于一些过期、删除、查找等操作可以直接操作key完成。

  • 设置key过期时间
  • 查看指定key过期时间
  • 判断缓存key是否存在
  • 删除指定key
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
java复制代码//设置指定key缓存有效期
public boolean expire(String key, long time) {
   try {
       if (time > 0) {
           redisTemplate.expire(key, time, TimeUnit.SECONDS);
      }
       return true;
  } catch (Exception e) {
       e.printStackTrace();
       return false;
  }
}
​
//获取指定key的有效期
public long getExpire(String key) {
   return redisTemplate.getExpire(key, TimeUnit.SECONDS);
}
​
//判断指定key在缓存中是否存在
public boolean hasKey(String key) {
   try {
       return redisTemplate.hasKey(key);
  } catch (Exception e) {
       e.printStackTrace();
       return false;
  }
}
​
//删除指定key
@SuppressWarnings("unchecked")
public void del(String... key) {
   if (key != null && key.length > 0) {
       if (key.length == 1) {
           redisTemplate.delete(key[0]);
      } else {
           redisTemplate.delete(Arrays.asList(key));
      }
  }
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

starlette源码分析

发表于 2021-11-28

「这是我参与11月更文挑战的第6天,活动详情查看:2021最后一次更文挑战」

前记

上一篇分析了uvicorn, 但是uvicorn只是一个ASGI容器, 真正处理请求的是ASGI应用程序,而starlette是最出名也是最标准的ASGI应用程序, 通过了解starlette, 可以了解到每个组件都是ASGI APP的设计理念, 了解ASGI APP的兼容性, 能更完整的理解ASGI生态。

NOTE: 使用了几年的starlette以来, 简单了翻过了几次源码, 觉得starlette堪称工艺品, 设计很完美, 各种逻辑实现起来很简单(也可能是我一开始就使用了sanic框架), 从使用至今, 除了初始化中间件, 在中间件读取body以及官方示例文档比较少这些缺点外, 感觉不出有其他的槽点。

最新修订见原文, 关注公众号<博海拾贝diary>可以及时收到新推文通知

NOTE: 本文偏代码+注释比较多

1.starlette的应用

在之前的文章了解过, Uvicron通过一个通用的协定接口与ASGI应用程序交互, 应用程序只要实现如下代码, 即可通过Uvicorn发送和接收信息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
Python复制代码async def app(scope, receive, send):
# 一个最简单的ASGI应用程序
assert scope['type'] == 'http'
await send({
'type': 'http.response.start',
'status': 200,
'headers': [
[b'content-type', b'text/plain'],
]
})
await send({
'type': 'http.response.body',
'body': b'Hello, world!',
})


if __name__ == "__main__":
# uvicorn服务
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=5000, log_level="info")

而使用uvicorn启动starlette的方式是:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Python复制代码from starlette.applications import Starlette
from starlette.middleware.gzip import GZipMiddleware


app: Starlette = Starlette()


@app.route("/")
def demo_route() -> None: pass


@app.websocket_route("/")
def demo_websocket_route() -> None: pass


@app.add_exception_handlers(404)
def not_found_route() -> None: pass


@app.on_event("startup")
def startup_event_demo() -> None: pass


@app.on_event("shutdown")
def shutdown_event_demo() -> None: pass


app.add_middleware(GZipMiddleware)


if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="127.0.0.1", port=5000)

这段代码Starlette先执行了初始化, 然后注册路由,异常处理, 事件,中间件到自身, 然后传给uvicorn.run, uvicorn.run通过调用starlette的__call__的方法传递请求数据。

简单的了解完启动后, 先从starlette初始化看是分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
Python复制代码class Starlette:
def __init__(
self,
debug: bool = False,
routes: typing.Sequence[BaseRoute] = None,
middleware: typing.Sequence[Middleware] = None,
exception_handlers: typing.Dict[
typing.Union[int, typing.Type[Exception]], typing.Callable
] = None,
on_startup: typing.Sequence[typing.Callable] = None,
on_shutdown: typing.Sequence[typing.Callable] = None,
lifespan: typing.Callable[["Starlette"], typing.AsyncGenerator] = None,
) -> None:
"""
:param debug: 决定是否启用debug功能
:param route: 一个路由列表, 提供HTTP和WebSocket服务.
:param middleware: 中间件列表, 应用于每个请求
:param exception_handler: 存放异常回调的字典, 键为HTTP状态码, 值为回调函数
:on_startup: 启动时调用的回调函数
:on_shutdown: 关闭时的回调函数
:lifespan: ASGI中的lifespan功能
"""

# 这里表示如果有传入lifespan, 则不可传入on_startup以及on_shutdown
# 因为本质上starlette的通过把on_start_up和on_shutdown转为一个lifespan来接收uvicorn调用的
assert lifespan is None or (
on_startup is None and on_shutdown is None
), "Use either 'lifespan' or 'on_startup'/'on_shutdown', not both."

# 初始化变量
self._debug = debug
self.state = State()
self.router = Router(
routes, on_startup=on_startup, on_shutdown=on_shutdown, lifespan=lifespan
)
self.exception_handlers = (
{} if exception_handlers is None else dict(exception_handlers)
)
self.user_middleware = [] if middleware is None else list(middleware)
# 构建中间件
self.middleware_stack = self.build_middleware_stack()

通过代码可以看到初始化这里已经满足了大多数功能了, 不过还有一个构建中间件的函数, 需要进一步分析:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
Python复制代码class Starlette:
def build_middleware_stack(self) -> ASGIApp:
debug = self.debug
error_handler = None
exception_handlers = {}

# 解析异常处理的回调, 分别存放在error_handler和exception_handlers
# 只有HTTP状态码为500的才会存入到error_handler
for key, value in self.exception_handlers.items():
if key in (500, Exception):
error_handler = value
else:
exception_handlers[key] = value

# 为不同种类的中间件排好顺序
# 第一层为ServerErrorMiddleware, 它能在发现异常的时候打印错误堆栈, 或者在debug模式的时候展示错误页面, 方便调试
# 第二层是用户中间件层, 用户自己注册的所有中间件都会存放在这里
# 第三层是ExceptionMiddleware, 它是异常处理层, 它会处理路由执行时抛出的所有异常
middleware = (
[Middleware(ServerErrorMiddleware, handler=error_handler, debug=debug)]
+ self.user_middleware
+ [
Middleware(
ExceptionMiddleware, handlers=exception_handlers, debug=debug
)
]
)

# 最后把中间件装填到app中
app = self.router
for cls, options in reversed(middleware):
# cls是中间件类本身, options也就是我们传的参数
# 可以看出中间件本身也是一个ASGIAPP, 装填中间件就是一个ASGI APP套上另外一个ASGI APP, 一直套娃。
app = cls(app=app, **options)

# 由于中间件的装填方式是不断的套娃, 而调用是不断的通过`call_next`调用装填它的上级ASGI APP, 所以要采用逆序的方法
return app

构建完中间件后, 初始化就算完成了, 接着就会通过uvicorn.run方法从而调用到__call__方法:

1
2
3
4
Python复制代码class Starlette:
async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
scope["app"] = self
await self.middleware_stack(scope, receive, send)

这个方法很简单, 就是通过scope, 把app设置到请求流程中, 方便后续调用, 然后通过调用middleware_stack开始请求的处理。 通过这个方法和中间件的初始化可以看出, starlette中的中间件本身也是一个ASGI APP(也可以看出route是一个ASGI APP, 处于调用栈的最后一层), 同时starlette也把异常的处理也交给了中间件处理, 这在其他的Web应用框架很少见到, 可以看出starlette的设计是每个组件都尽量是ASGI APP。

虽然starlette中间件的设计是非常不错的, 但是它的这种初始化方式我不太喜欢, 因为在编写的时候IDE无法帮你传入的参数做校验, 比如上面示例的GZip中间件, 你知道需要传minimum_size参数, 但是你有可能打错, 只是没到运行时的时候, 压根不知道它是否有异常:

1
2
3
> Python复制代码app.add_middleware(GZipMiddleware, minimum_size = 500)
>
>

我在设计我的rpc框架rap时也参考了startlette的中间件设计, 但是在这一块进行了优化, 不过与本篇文章关系不大, 有兴趣可以参考:github.com/so1n/rap/bl…

2.中间件

上面说到, 在startlette中, 中间件是一个ASGI APP, 所以在startlette的所有中间件都必定是一个满足如下形式的类:

1
2
3
4
5
6
Python复制代码class BaseMiddleware:
def __init__(self, app: ASGIApp) -> None:
pass

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
pass

在starlette.middleware中, 有很多的中间件实现, 他们都满足这一点, 不过本章节不会讲所有的中间件, 只会挑选几个有代表性的中间件从最靠近Route到远进行分析。

2.1.异常处理中间件-ExceptionMiddleware

第一个就是ExceptionMiddleware中间件, 这个中间件用户是不会直接接触到的(所以没有放在starlette.middleware里面), 而是通过下面的这个方法间接的接触到:

1
2
Python复制代码@app.app_exception_handlers(404)
def not_found_route() -> None: pass

当用户使用这个方法时, startlette会把回调函数挂在对应的字典里面, 这个字典以HTTP状态码为key, 回调函数为value。
当ExceptionMiddleware发现Route请求处理异常时, 可以通过异常的响应HTTP状态码找到对应的回调函数, 并把请求和异常传给用户挂载的对应的回调函数, 最后把用户的回调函数结果抛回上一个ASGI APP。
此外ExceptionMiddleware还支持异常注册, 当Route抛出的异常与注册的异常匹配时, 调用该异常注册的对应的回调函数。
该类的源码和注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
Python复制代码class ExceptionMiddleware:
def __init__(
self, app: ASGIApp, handlers: dict = None, debug: bool = False
) -> None:
self.app = app
self.debug = debug # TODO: We ought to handle 404 cases if debug is set.
# startletter是支持HTTP状态码和Exception两种类型
self._status_handlers = {} # type: typing.Dict[int, typing.Callable]
self._exception_handlers = {
HTTPException: self.http_exception
} # type: typing.Dict[typing.Type[Exception], typing.Callable]
if handlers is not None:
for key, value in handlers.items():
self.add_exception_handler(key, value)

def add_exception_handler(
self,
exc_class_or_status_code: typing.Union[int, typing.Type[Exception]],
handler: typing.Callable,
) -> None:
# 用户通过调用startlette app的方法挂载的异常回调最后都是通过该方法挂载到类里面的_status_handlers或者是_exception_handler里面
if isinstance(exc_class_or_status_code, int):
self._status_handlers[exc_class_or_status_code] = handler
else:
assert issubclass(exc_class_or_status_code, Exception)
self._exception_handlers[exc_class_or_status_code] = handler

def _lookup_exception_handler(
self, exc: Exception
) -> typing.Optional[typing.Callable]:
# 查找注册异常相关的回调函数, 通过mro发现异常的对应回调函数
#
# 用户挂载的可能是一个基类, 后续在遇到挂载异常的子类时, 也会调用基类注册的回调
# 比如用户注册了一个基类, 然后会有用户异常和系统异常两个异常都继承于这个基类
# 后续函数抛出用户异常或系统异常时, 都会执行到基类注册的对应回调
for cls in type(exc).__mro__:
if cls in self._exception_handlers:
return self._exception_handlers[cls]
return None

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
# 熟悉的ASGI 调用方法
if scope["type"] != "http":
# 不支持websocket请求
await self.app(scope, receive, send)
return

# 防止同一个响应产生多个异常
response_started = False

async def sender(message: Message) -> None:
nonlocal response_started

if message["type"] == "http.response.start":
response_started = True
await send(message)

try:
# 调用下一个ASGI APP
await self.app(scope, receive, sender)
except Exception as exc:
handler = None

if isinstance(exc, HTTPException):
# 如果是HTTPException异常, 则从注册HTTP回调字典中寻找
handler = self._status_handlers.get(exc.status_code)

if handler is None:
# 如果是普通的异常, 则从异常回调字典去寻找
handler = self._lookup_exception_handler(exc)

if handler is None:
# 找不到对应的异常, 则往上面抛
raise exc from None

# 一个响应只接收一次异常处理
if response_started:
msg = "Caught handled exception, but response already started."
raise RuntimeError(msg) from exc

request = Request(scope, receive=receive)
if asyncio.iscoroutinefunction(handler):
response = await handler(request, exc)
else:
response = await run_in_threadpool(handler, request, exc)
# 通过回调函数生成的response处理请求
await response(scope, receive, sender)

2.2.用户中间件

接着就是用户中间件了, 这个也是我们接触最多的中间件, 在使用starlette.middleware时, 我们都会继承于一个叫BaseHTTPMiddleware的中间件, 然后基于如下代码进行拓展:

1
2
3
4
5
6
7
8
9
10
11
12
Python复制代码class DemoMiddleware(BaseHTTPMiddleware):
def __init__(
self,
app: ASGIApp,
) -> None:
super(DemoMiddleware, self).__init__(app)

async def dispatch(self, request: Request, call_next: RequestResponseEndpoint) -> Response:
# before
response: Response = await call_next(request)
# after
return response

如果在请求之前进行预处理, 就在before块编写相关代码,如果要在请求之后进行处理的, 就在after块编写代码, 使用非常简单, 而且他们是处于同一个作用域的, 这就意味着该方法里面的变量不用通过上下文或者动态变量来传播(如果你接触了Django或者Flask的类中间件实现, 也就懂得了starlette这种实现的优雅)。

接下来就来看看它是怎么实现的, 代码非常简单, 大概60行左右, 不过我注释写了很多:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
Python复制代码class BaseHTTPMiddleware:
def __init__(self, app: ASGIApp, dispatch: DispatchFunction = None) -> None:
# 赋值下一级的ASGI app
self.app = app
# 如果用户有传dispatch, 就使用用户传的函数, 否则使用自身的dispatch
# 一般用户都是继承于BaseHTTPMiddleware, 然后复写dispatch方法
self.dispatch_func = self.dispatch if dispatch is None else dispatch

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
ASGI 标准的函数签名函数, 代表着ASGI的请求会从这里进来
"""
if scope["type"] != "http":
# 如果类型不是http的, 则不会走中间件(也就是websocket的不支持)
# 要支持websocket的话, 中间件就很难这样实现了, 我在实现rap框架时, 为了支持类websocket的流量中间件处理, 牺牲了一些功能才可以实现
await self.app(scope, receive, send)
return

# 通过scope生成request对象
request = Request(scope, receive=receive)
# 进入dispatch逻辑, 也就是用户的处理逻辑
# 这个逻辑得到的respone实际上是call_next函数生成的, dispatch函数只做了传递的作用
response = await self.dispatch_func(request, self.call_next)
# 根据生成的response, 返回数据到上一层
await response(scope, receive, send)

async def call_next(self, request: Request) -> Response:
loop = asyncio.get_event_loop()
# 通过queue生产消费模式来获取下一级的消息
queue: "asyncio.Queue[typing.Optional[Message]]" = asyncio.Queue()

scope = request.scope
# 通过request.receive对象把uvicorn的receive对象传过来
# 这里用到的receive对象还是uvicorn初始化时的receive对象
receive = request.receive
send = queue.put

async def coro() -> None:
try:
await self.app(scope, receive, send)
finally:
# 这个put操作能确保get那边不会被卡死
await queue.put(None)

# 通过loop.create_task, 在另一个协程跑下一个ASGI APP
task = loop.create_task(coro())
# 等待下一个ASGI APP的返回
message = await queue.get()
if message is None:
# 如果拿到是空的, 则代表下一个ASGI APP没有返回响应, 这时可能出错,
# 通过调用task.result(), 如果该协程出现异常, 则会把该协程的错误抛出来
task.result()
# 如果没有异常抛出来, 就可能是用户写错等原因, 返回了一个空响应,
# 这时候是没办法返回响应给客户端的, 需要自己制造一个异常, 方便后续生成一个500的响应
raise RuntimeError("No response returned.")

# ASGI处理响应的时候会分多步走, 正常情况下, 上面的queue.get, 是获取响应的第一步
assert message["type"] == "http.response.start"

async def body_stream() -> typing.AsyncGenerator[bytes, None]:
# 其他的处理会交给body_stream函数处理
# 这个方法所做的就是一直返回数据流
while True:
message = await queue.get()
if message is None:
break
assert message["type"] == "http.response.body"
yield message.get("body", b"")
task.result()

# 将body_stream函数放到Response方法中
# response本身也是一个类ASGI APP的类, 用户根据教程, 在dispatch方法中通过call_next获得response对象,
# 并在最后返回, 所以这个reponse对象将会交给__call__方法中进行处理。
response = StreamingResponse(
status_code=message["status"], content=body_stream()
)
response.raw_headers = message["headers"]
return response

async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
raise NotImplementedError() # pragma: no cover

2.3.ServerErrorMiddleware

ServerErrorMiddleware跟ExceptionMiddleware很像(所以这一part也不做更多的说明), 整个逻辑基本上都是一致的, 不过ExceptionMiddleware负责的是把用户的路由异常进行捕获处理, ServerErrorMiddleware主要负责是做兜底, 确保返回的一定是合法的HTTP响应。

ServerErrorMiddleware的间接调用函数也跟ExceptionMiddleware一样, 不过只有注册的HTTP状态码为500时, 才会把回调注册到ServerErrorMiddleware中:

1
2
Python复制代码@app.exception_handlers(500)
def not_found_route() -> None: pass

ServerErrorMiddleware是处于ASGI APP中的最顶级, 它负责异常兜底的工作, 它要做的事情很简单, 如果下一级ASGI APP处理发生异常, 就进入兜底逻辑:

  • 1.如果启用debug, 则返回debug页面
  • 2.如果有注册回调, 则执行注册回调
  • 3.如果都没则返回500响应

3.Route

在starlette中, Route分为两部分, 一部分我把它称为Real App的Router, 它处于中间件的下一层级, 但它负责的是Starlette除中间件外的所有事情, 主要包括路由查找匹配, APP启动关闭处理等, 另外一部分则是注册到Router的路由。

3.1.Router

Router很简单, 他的主要责任就是装载路由和匹配路由, 以下是除装载路由外的源码和注释:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
Python复制代码class Router:
def __init__(
self,
routes: typing.Sequence[BaseRoute] = None,
redirect_slashes: bool = True,
default: ASGIApp = None,
on_startup: typing.Sequence[typing.Callable] = None,
on_shutdown: typing.Sequence[typing.Callable] = None,
lifespan: typing.Callable[[typing.Any], typing.AsyncGenerator] = None,
) -> None:
# 装填starlette初始化时的信息
self.routes = [] if routes is None else list(routes)
self.redirect_slashes = redirect_slashes
self.default = self.not_found if default is None else default
self.on_startup = [] if on_startup is None else list(on_startup)
self.on_shutdown = [] if on_shutdown is None else list(on_shutdown)

async def default_lifespan(app: typing.Any) -> typing.AsyncGenerator:
await self.startup()
yield
await self.shutdown()

# 如果初始化lifespan为空, 则把on_startup和on_shuatdown转化为lifespan
self.lifespan_context = default_lifespan if lifespan is None else lifespan

async def not_found(self, scope: Scope, receive: Receive, send: Send) -> None:
"""匹配不到路由执行的逻辑"""
if scope["type"] == "websocket":
# websocket匹配失败
websocket_close = WebSocketClose()
await websocket_close(scope, receive, send)
return

# If we're running inside a starlette application then raise an
# exception, so that the configurable exception handler can deal with
# returning the response. For plain ASGI apps, just return the response.
if "app" in scope:
# 在starlette.applications的__call__方法可以看到starlette把自身存入scope中
# 这里抛出异常后, 可以被ServerErrorMiddleware捕获
raise HTTPException(status_code=404)
else:
# 对于不是starlette调用的, 直接返回错误
response = PlainTextResponse("Not Found", status_code=404)
await response(scope, receive, send)

async def lifespan(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
Handle ASGI lifespan messages, which allows us to manage application
startup and shutdown events.
"""
# lifespan执行的逻辑, 在执行的时候starlette会与ASGI服务器进行通信, 但目前这样的代码估计还有一些待开发的功能
first = True
app = scope.get("app")
await receive()
try:
if inspect.isasyncgenfunction(self.lifespan_context):
async for item in self.lifespan_context(app):
assert first, "Lifespan context yielded multiple times."
first = False
await send({"type": "lifespan.startup.complete"})
await receive()
else:
for item in self.lifespan_context(app): # type: ignore
assert first, "Lifespan context yielded multiple times."
first = False
await send({"type": "lifespan.startup.complete"})
await receive()
except BaseException:
if first:
exc_text = traceback.format_exc()
await send({"type": "lifespan.startup.failed", "message": exc_text})
raise
else:
await send({"type": "lifespan.shutdown.complete"})

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
The main entry point to the Router class.
"""
# 匹配以及执行路由的主要函数
# 目前只支持http, websocket, lifespan三种类型
assert scope["type"] in ("http", "websocket", "lifespan")

# 初始化router到scope中
if "router" not in scope:
scope["router"] = self

if scope["type"] == "lifespan":
# 执行lifespan逻辑
await self.lifespan(scope, receive, send)
return

partial = None

# 进行路由匹配
for route in self.routes:
match, child_scope = route.matches(scope)
if match == Match.FULL:
# 如果是完整匹配(url匹配, method匹配)
# 则进行路由正常处理
scope.update(child_scope)
await route.handle(scope, receive, send)
return
elif match == Match.PARTIAL and partial is None:
# 如果是不完整匹配(url匹配, method不匹配)
# 则保留值, 继续匹配
partial = route
partial_scope = child_scope

if partial is not None:
# 如果存在不完整匹配的路由, 也继续执行, 但这时路由会响应HTTP 方法不对的错误
scope.update(partial_scope)
await partial.handle(scope, receive, send)
return

if scope["type"] == "http" and self.redirect_slashes and scope["path"] != "/":
# 未匹配的情况, 判断重定向
redirect_scope = dict(scope)
if scope["path"].endswith("/"):
redirect_scope["path"] = redirect_scope["path"].rstrip("/")
else:
redirect_scope["path"] = redirect_scope["path"] + "/"

for route in self.routes:
match, child_scope = route.matches(redirect_scope)
if match != Match.NONE:
# 再次进行匹配, 如果结果不为空, 则发送重定向response
redirect_url = URL(scope=redirect_scope)
response = RedirectResponse(url=str(redirect_url))
await response(scope, receive, send)
return

# 上面流程都没命中时, 则代表没有找到任务路由, 这时候会执行默认路由, 默认的默认路由是404 not found
await self.default(scope, receive, send)

可以看出Router的代码非常的简单, 主要的代码都集中在__call__中, 但是在这里出现了多次遍历查询路由且每个路由都是执行一遍正则表达式来判断是否匹配。可能会有人觉得这样的执行速度会很慢,
我曾经也觉得这样的路由查找很慢, 然后就实现了一个路由树来代替它详见route_trie.py, 然而在我实现后做了一次性能测试, 发现在路由没超过50个的情况下, 循环匹配性能是优于路由树的, 在没超过100条的情况下, 两者是相当的, 而在正常情况下, 我们指定的路由都不会超过100个, 所以不用去担心这部分路由的匹配性能, 如果还是很担心, 那么可以使用Mount来对路由进行分组, 使匹配的次数减少。

3.2.其他Route

Moute是继承于BaseRoute, 其它的Route, HostRoute, WebsocketRoute也是一样继承于BaseRoute, 它们提供的方法都差不多, 只是具体实现略有差别而已(主要是初始化,路由匹配和反向查找略有区别), 我们先来看看BaseRoute:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
Python复制代码class BaseRoute:
def matches(self, scope: Scope) -> typing.Tuple[Match, Scope]:
# 一个标准的匹配函数签名, 每个Route都要返回一个(Match, Scope)的元祖
# Match有3种, 分别是
# NONE: 没有匹配到
# PARTIAL: 部分匹配(url匹配了, method匹配失败)
# FULL: 完全匹配(url和method都匹配成功)
# Scope基本上都会返回如下格式, 不过Mount返回的内容更多:
# {"endpoint": self.endpoint, "path_params": path_params}
raise NotImplementedError() # pragma: no cover

def url_path_for(self, name: str, **path_params: str) -> URLPath:
# 根据名字生成反向查找
raise NotImplementedError() # pragma: no cover

async def handle(self, scope: Scope, receive: Receive, send: Send) -> None:
# 被Router匹配后可以调用的函数
raise NotImplementedError() # pragma: no cover

async def __call__(self, scope: Scope, receive: Receive, send: Send) -> None:
"""
A route may be used in isolation as a stand-alone ASGI app.
This is a somewhat contrived case, as they'll almost always be used
within a Router, but could be useful for some tooling and minimal apps.
"""
# 如果该路由被当做ASGI APP单独调用, 则自己进行匹配并响应结果
match, child_scope = self.matches(scope)
if match == Match.NONE:
if scope["type"] == "http":
response = PlainTextResponse("Not Found", status_code=404)
await response(scope, receive, send)
elif scope["type"] == "websocket":
websocket_close = WebSocketClose()
await websocket_close(scope, receive, send)
return

scope.update(child_scope)
await self.handle(scope, receive, send)

可以看到BaseRoute提供的功能不多, 其他的路由则是基于BaseRoute进行拓展:

  • Route: 标准的HTTP路由, 负责通过HTTP URL和HTTP Method进行路由匹配, 然后提供调用HTTP路由的方法
  • WebSocketRoute: 标准的WebSocketRoute, 根据HTTP URL进行路由匹配, 然后通过starlette.websocket的WebSocket生成session再传入对应的函数
  • Mount: 一个路由的套娃封装, 他的匹配方法是URL的前缀匹配, 把请求转发给符合规则的下一级ASGI APP, 当他的下一级ASGI APP是Router时, 他的调用链可能会像这样Router->Mount->Router->Mount->Router->Route, 通过使用Mount可以对路由进行分组, 同时也能加快匹配速度, 推荐使用。 不过, 它还支持把请求分发给其他ASGI APP, 也可以做到如Starlette->ASGI Middleware->Mount->Other Starlette->...
  • Host: 它会根据用户请求的Host分发到对应的ASGI APP, 可以选择Route, Mount, 中间件等等ASGI APP

4.其它组件

从上面可以看到, starlette中的组件基本上都设计成ASGI APP, 可以任意的兼容, 这种设计是非常棒的, 虽然会牺牲一点点性能, 但是它的兼容性非常的强, 而其他的组件也都或多或少的设计得像ASGI APP一样, 在介绍其他组件之前, 先看看整个starlette的整个项目结构:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
Python复制代码├── middleware                       # 中间件
├── applications.py # 启动的应用程序
├── authentication.py # 验证相关
├── background.py # 封装后台任务, 会在返回响应后执行
├── concurrency.py # 一些小的asyncio相关的封装, 在新版本中, 直接使用了anyio库来代替
├── config.py # 配置
├── convertors.py # 一些类型的转换方法
├── datastructures.py # 一些数据结构, 比如Url, Header, Form, QueryParam, State等等
├── endpoints.py # 支持cbv的路由以及一个稍微高级点的Websocket封装
├── exceptions.py # 异常处理
├── formparsers.py # Form,File之类的解析
├── graphql.py # 负责处理graphql相关的
├── __init__.py
├── py.typed # starlette需要用到的TypeHints
├── requests.py # 请求, 供用户获取数据
├── responses.py # 响应, 负责初始化Header和Cookies, 同时根据不同的Respnose类生成响应数据, 然后有个类ASGI调用接口, 该接口会发送ASGI协议到uvicorn服务, 发送完后如果有backgroud task, 则执行backgroud task, 直到执行完成, 该响应流程才结束。
├── routing.py # 路由
├── schemas.py # OpenApi相关的Schemas
├── staticfiles.py # 静态文件
├── status.py # HTTP状态码
├── templating.py # 基于jinja的模板响应
├── testclient.py # 测试客户端
├── types.py # 类型
└── websockets.py # websocket

上面的文件有很多, 有些比较简单就直接略过。

4.1.Request

Request非常的简单, 它继承于HttpConnection, 这个类主要是通过ASGI协议传过来的Scope进行解析, 提取如url, method等信息, 而Request增加了读取请求数据和返回数据(HTTP1.1支持服务端push数据给客户端)的功能, 其中, 读取数据都依赖于一个核心函数–stram,它的源码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
Python复制代码async def stream(self) -> typing.AsyncGenerator[bytes, None]:
# 如果已经读取过的, 则从缓存中获取数据
if hasattr(self, "_body"):
yield self._body
yield b""
return

if self._stream_consumed:
raise RuntimeError("Stream consumed")

self._stream_consumed = True
while True:
# 从ASGI容器的receive循环获取数据
message = await self._receive()
if message["type"] == "http.request":
body = message.get("body", b"")
if body:
# 获取的数据不为空就返回数据
yield body
if not message.get("more_body", False):
# 代表body数据已经被获取完
break
elif message["type"] == "http.disconnect":
# 代表与客户端的连接已经关闭了
self._is_disconnected = True
# 抛出异常, 用户调用await request.body() await request.json()之类的会抛出异常
raise ClientDisconnect()
# 返回空字节,标记结束
yield b""

这个实现非常简单, 但是却有一个小bug, 如果有了解Nginx或者其他Web服务的都会知道, 一般的中间服务器是不会处理body数据的, 只做传递。ASGI也是如此, uvicorn在处理完url和header后就开始调用ASGI APP, 并把send和receive对象传递下去, 这两个对象会在经过多个ASGI APP后,抵达路由这个ASGI APP, 并在函数里供用户使用,, 所以Request接收的receive对象是uvicorn生成的。 而receive的数据源是源自于是一个asyncio.Queue队列, 从中间件的分析可以知道, 每个ASGI APP都依据scope, receive来生成一个Request对象, 意味着每层ASGI APP的Request对象是不一致的, 如果在中间件调用Request对象读取Body的话, 就会提前消费通过receive消费了队列的数据, 导致后续的ASGI APP无法通过Request对象读取Body数据, 该问题示例代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
Python复制代码import asyncio
from starlette.applications import Starlette
from starlette.middleware.base import BaseHTTPMiddleware, RequestResponseEndpoint
from starlette.requests import Request
from starlette.responses import JSONResponse, Response

app: Starlette = Starlette()


class DemoMiddleware(BaseHTTPMiddleware):
async def dispatch(
self, request: Request, call_next: RequestResponseEndpoint
) -> Response:
print(request, await request.body())
return await call_next(request)


app.add_middleware(DemoMiddleware)


@app.route("/")
async def demo(request: Request) -> JSONResponse:
try:
await asyncio.wait_for(request.body(), 1)
return JSONResponse({"result": True})
except asyncio.TimeoutError:
return JSONResponse({"result": False})


if __name__ == "__main__":
import uvicorn # type: ignore

uvicorn.run(app)

运行后执行请求查看结果:

1
2
Bash复制代码-> curl http://127.0.0.1:8000
{"result":false}

可以看到执行的结果是false, 意味着执行request.body超时了, 因为此时receive队列已经空了, 是拿不到数据的, 如果不加超时的话这个请求就会一直卡主。
那么要怎么去解决问题呢, 先看看Request获取是如何获取body的, 因为用户可以同时获取多次body, 但一直都是相同的数据, 它的实现思路是获取数据后, 把数据缓存到一个变量里面, 我们也可以采取这个思路, 由于数据都是通过receive获取的, 那么可以在在读取数据后, 构造一个receive函数, 该函数返回类似于ASGI的通信协议的数据, 并且有完整的body数据(满足Request.stream获取body的构造), 代码如下:

1
2
3
4
5
6
7
Python复制代码async def proxy_get_body(request: Request) -> bytes:
async def receive() -> Message:
return {"type": "http.request", "body": body}

body = await request.body()
request._receive = receive
return body

之后任意层级的ASGI APP如果需要获取Body数据的话, 就可以调用该函数来获取Body数据, 同时又不影响后续的ASGI APP获取Body数据。

4.2.TestClient

在基于TestCLient的测试用例运行时, 没有流量转发, 而是通过请求调用到路由函数, 并根据返回数据转化为一个响应对象。
同时, 它还能会自动运行on_startup和on_shutdown挂载的函数以及挂载的中间件, 我在一开始接触时, 我很好奇它是怎么实现的, 因为大多数的测试用例框架都很难做到直接调用到路由函数, 同时又满足于框架的其他中间件, on_startup和on_shutdown的功能(特别是Python的gRPC自带的测试用例封装…)。

在了解TestClient的运行原理之前, 先看看TestClient的使用用例如下:

1
2
3
4
5
6
7
Python复制代码from starlette.testclient import TestClient
from requests import Response # type: ignore


app: Starlette = Starlette()
with TestClient(app) as client:
response: Response = client.get("/")

这段代码中, 分为几步走:

  • 1:初始化一个app对象
  • 2:把app对象传入TestClient中, 并通过with语法启动一个上下文
  • 3:通过返回的client进行调用, 最后返回一个requests.Response对象。

其中第一点非常简单, 我们也分析过了, 对于第二点, 很难明白为什么要with上下文, 在官方文档说明是可以这样直接运行:

1
Python复制代码response: Response = TestClient(app).get("/")

但是没办法执行on_startup和on_shutdown这两个事件挂载的函数, 所以初步判定with语法与它们有关, 而至于第三步则很难猜透starlette是怎么实现的, 但是返回的是requests.Respnose的对象, 那么一定跟requests这个框架有一些关联, 具体需要分析源码才能知道。

接下来就开始带着问题分析源码, 首先是类和__init__:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
Python复制代码class TestClient(requests.Session):
__test__ = False # For pytest to not discover this up.

def __init__(
self,
app: typing.Union[ASGI2App, ASGI3App],
base_url: str = "http://testserver",
raise_server_exceptions: bool = True,
root_path: str = "",
) -> None:
super(TestClient, self).__init__()
if _is_asgi3(app):
app = typing.cast(ASGI3App, app)
asgi_app = app
else:
app = typing.cast(ASGI2App, app)
asgi_app = _WrapASGI2(app) #  type: ignore
# 使用了request的Adapter功能,
adapter = _ASGIAdapter(
asgi_app,
raise_server_exceptions=raise_server_exceptions,
root_path=root_path,
)
self.mount("http://", adapter)
self.mount("https://", adapter)
self.mount("ws://", adapter)
self.mount("wss://", adapter)
self.headers.update({"user-agent": "testclient"})
self.app = asgi_app
self.base_url = base_url

从这个可以看出, TestClient继承于requests.Session的方法, 证明可以在编写测试用例时, 直接调用到requests.Session的相关的方法。然后在__init__方法中实例化了一个adapter, 这里是使用了requests的adapter机制, 通过adpater机制, 可以拦截请求的数据和响应的数据。
_ASGIdapter的代码比较多, 但是它的实现逻辑很简单, 它重载了Adapter的send方法, 当执行到send方法时, 它会变成执行app(scope, receive, send), 其中receive是负责把请求的数据转换为ASGI协议,供下一级ASGI APP调用。而send(位于Adapter.send里面的闭包函数)则获取ASGI APP返回的数据并存放到字典中, 当ASGI APP执行完毕的时候, Adapter的send方法会根据执行是否异常以及存放数据的字典转化为一个request.Response的实例返回给用户。

通过_ASGIdapter了解了starlette是如何解决第三个问题的, 接下来是with语法相关的__enter__, __exit__:

1
2
3
4
5
6
7
8
9
10
11
12
Python复制代码class TestClient(requests.Session):
def __enter__(self) -> "TestClient":
loop = asyncio.get_event_loop()
self.send_queue = asyncio.Queue() # type: asyncio.Queue
self.receive_queue = asyncio.Queue() # type: asyncio.Queue
self.task = loop.create_task(self.lifespan())
loop.run_until_complete(self.wait_startup())
return self

def __exit__(self, *args: typing.Any) -> None:
loop = asyncio.get_event_loop()
loop.run_until_complete(self.wait_shutdown())

可以看出, 在使用进入上下文和退出上下文时, 自动调用了lifespan方法, 然后通过lifespan机制来实现on_startup和on_shutdown功能, 具体的源码和注释如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
Python复制代码class TestClient(requests.Session):
async def lifespan(self) -> None:
# 构造lifespan的scope
scope = {"type": "lifespan"}
try:
# 发送到starlette, 然后starlette就会根据receive执行对应的事件
await self.app(scope, self.receive_queue.get, self.send_queue.put)
finally:
await self.send_queue.put(None)

async def wait_startup(self) -> None:
# 发送lifespan开始信息
await self.receive_queue.put({"type": "lifespan.startup"})
# 监听starlette返回的lifespan信息, 并判断信息是否正确
message = await self.send_queue.get()
if message is None:
self.task.result()
assert message["type"] in (
"lifespan.startup.complete",
"lifespan.startup.failed",
)
# 如果错误, 则消费task.result
if message["type"] == "lifespan.startup.failed":
message = await self.send_queue.get()
if message is None:
self.task.result()

async def wait_shutdown(self) -> None:
# 发送lifespan关闭信息
await self.receive_queue.put({"type": "lifespan.shutdown"})
message = await self.send_queue.get()
if message is None:
self.task.result()
assert message["type"] == "lifespan.shutdown.complete"
# 等待starlette的lifespan执行结束
await self.task

5.总结

至此, starlette的几个重要的功能代码都分析完了, starlette是一个非常棒的库, 它的设计思路也是非常的棒, 建议大家自己读一遍starlette的源代码, 对以后自己写框架是有帮助的。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SSIS学习使用十七:多个灵活的来源位置 翻译参考 介绍 加

发表于 2021-11-28

这是我参与11月更文挑战的第27天,活动详情查看:2021最后一次更文挑战

翻译参考

本文主要参考翻译自 The Stairway to Integration Services 系列文章的原文 Multiple Flexible Source Locations – Level 17 of the Stairway to Integration Services,目的在于对 SSIS 有一个全面清晰的认识,所有内容在原文的基础上进行实操,由于版本差异、个人疑问等多种原因,未采用完全翻译的原则,同时也会对文中内容进行适当修改,希望最终可以更有利于学习和了解 SSIS,

感谢支持!

介绍

在本文中,我们通过将数据从多个文件加载到不同的主题区域来实现对SSIS参数、变量的综合运用。

在开始之前,请单击此链接(andyweather.com/data/Weathe…

压缩文件中的数据以“MMMYY”格式存储在每个MonthYear文件夹中。天气数据存储在MonthYear文件夹下的子文件夹中。 Dec08和Feb09的MonthYear文件夹包含一个名为TH的子文件夹。 Apr09、Jun09和Aug09的MonthYear文件夹包含两个子文件夹:TH和WIND。

每个TH文件夹都包含一个名为sensor1-all.csv的文件。这些文件表示在2008年12月至2009年8月之间收集的温度和湿度数据。这些文件是累积的——2009年2月文件包括2008年12月中的所有数据,以及2008年12月至2009年2月之间添加的记录,而2009年4月文件包括所有来自2008年12月中的数据以及2008年12月至2009年4月之间添加的记录。所有三个文件的记录都类似于如下所示:

1
2
3
4
5
6
7
ini复制代码Date,Time,Min T,Max T,Average T,Min H,Max H,Average H,Comfort zone,Min DP,Max DP,Average DP,Min HI,Max HI,Average HI,Low Batt

2008-12-25,19:00,8.5,10.9,9.71,32,36,33,2,-6.0,-5.0,-5.71,--,--,--

2008-12-25,20:00,6.3,8.5,7.21,36,40,38,2,-6.0,-5.0,-5.95,--,--,--

2008-12-25,21:00,5.3,6.7,6.37,39,43,40,0,-6.0,-6.0,-6.00,--,--,--

每个WIND子文件夹包含两个名为1day.csv和all.csv的文件,

我们将添加一个新的SSIS包,以从每个MonthYear\WIND文件夹中的all.csv文件加载数据。

但首先,让我们看一下LoadWeatherData.dtsx包。我怀疑新数据在LoadWeatherData.dtsx包中不能很好地运行。我们首先要确保名为SourceFolder的项目参数指向正确的文件夹。

I am building my version of the SSIS project in separate folders so I can save the state of the project at the completion of each article. You are probably not doing this, and that’s ok.

打开项目参数Project.params,确保SourceFolder参数值指向正确的路径:

如果你一直从一开始跟随演练,现在LoadWeatherData.dtsx包的”控制面板”类似如下:

“DFT Stage Temperature and Humidity”数据流任务的棕色点表示断点。

加载温湿度数据时,由于WIND文件数据导致出错

如果你解压了”WeatherData_Dec08_Aug09.zip”文件到之前的数据文件中(即SourceFolder项目参数指定的文件夹),”FOREACH Temperature File”Foreach循环容器会发现新的WIND文件,并且这会导致”DFT Stage Temperature and Humidity”数据流任务失败。

为什么?”FOREACH Temperature File”Foreach循环容器使用SourceFolder项目参数动态设置Foreach文件枚举器的Directory属性(可以在Foreach循环容器编辑器的“收集”——Collection页上找到)。文件规格(File Specification)设置为检索在SourceFolder项目参数指定的文件夹及任何子文件夹中,查找满足所有逗号分隔值的文件(”* .csv”文件)的完全限定文件名(the fully qualified file name),如图所示:

按照当前配置,”FOREACH Temperature File”Foreach循环容器将返回WIND子文件夹中的文件。这将导致”DFT Stage Temperature and Humidity”数据流任务产生问题,因为它被配置为仅加载温度和湿度(temperature and humidity)数据。

让我们看看它是如何破裂的,然后逐步解释它破裂的原因。

在SSIS调试器中执行LoadWeatherData.dtsx。当达到断点时,就像在”SSIS学习使用十:灵活的来源位置”中所做的那样,查看“局部变量”窗口(“Debug”->“Windows”->“Locals”——需要在debug模式下)。展开“变量”节点并滚动,直到可以查看User::SourceFileName变量的值。

注意,当“FOREACH Temperature File”Foreach循环容器遇到WIND子文件夹中的第一个csv文件时,包执行失败——文件的完整路径会推入User::SourceFileName SSIS变量–如图所示:

WIND文件名中的文件只有一个条目,与温度和湿度文件的布局不匹配。在LoadWeatherData.dtsxSSIS包中,错误由”FFSrc Temperature and Humidity”平面文件源适配器产生。错误文本是:”跳过数据行时出错”(An error occurred while skipping data rows)。

WIND数据不能被”FFSrc Temperature and Humidity”平面文件源适配器读取,在当前状态运行SSIS包会生成一个错误。

为什么加载失败?

在设计时创建平面文件连接管理器时,平面文件的格式(其架构)将被读取到平面文件连接管理器中。这仅在设计时或在开发期间发生。

在运行时,可以使用面向对象的开发人员熟悉的术语“不可变”(immutable,即Immutable_object,不可变对象)来描述平面文件连接管理器。声明一个对象是不可变的,意味着它不能更改(在大多数情况下…允许某些属性更改,但不要分散注意力……)。

平面文件连接管理器不是数据流中唯一的不可变对象。数据流管道(data flow pipeline)也是不可变的。作为SSIS开发人员,我们(通常)使用数据流路径与数据流管道进行交互以连接数据流对象。

因此,有一个问题:如果另一个文件具有相同的格式(模式),是否可以由相同的平面文件连接管理器和数据流任务加载该文件,就像连接管理器和数据流为该文件设计一样?

答案是“是”。我可以听到你在想,“安迪,定义等效项”。define equivalent:

  • 字段数必须相同。
  • 数据类型必须与平面文件连接管理器中定义的数据类型匹配,或者必须隐式强制于平面文件连接管理器中定义的数据类型。
  • 数据长度必须等于或小于平面文件连接管理器中定义的长度。

请记住,除非每一列中的数据在所有平面文件中都具有相同含义,否则使用相同的连接管理器和数据流加载另一个文件将以非常聪明的方式污染数据库。

修正错误

你可能会想,”如何修正错误呢,安迪?”,这是一个优秀的问题。我们必须修改文件规格,排除WIND逗号分隔值的文件,仅仅包含温度和湿度的逗号分隔值文件。

仅加载温湿度数据

如果LoadWeatherData.dtsxSSIS包仍在调试器中运行,请停止调试器。打开“FOREACH Temperature File”Foreach循环容器编辑器,然后导航到“收集”页。将“Files”属性从“*.csv”更改为“sensor1-all.csv”,如8所示:

此更改将“FOREACH Temperature File”Foreach循环容器配置为仅返回名为“sensor1-all.csv”的文件,而不是返回任何具有“csv”扩展名的文件。温度和湿度文件的名称为“sensor1-all.csv”,其他天气数据文件的名称则不同。

关闭编辑器,然后在SSIS调试器中重新执行LoadWeatherData.dtsx SSIS程序包。这次包成功。它还将来自两个新的温度和湿度文件的数据加载到数据库中。

在执行过程中,出现如下报错:

错误: 变量“User::SourceFileName”已在写入列表中。一个变量只能向读取锁定列表或写入锁定列表中添加一次。
错误: 变量“(null)”已在写入列表中。一个变量只能向读取锁定列表或写入锁定列表中添加一次。

开始有些不知所措。后面查找”User::SourceFileName”变量,打开Foreach循环编辑器,在”变量映射”页中,发现User::SourceFileName指定了多次。删除多余的,只保留一个即可。(出现的原因不知)

Error: The variable “User::SourceFileName” is already in the write list. A variable can only be added once to the read lock list or write lock list.

LoadWeatherData.dtsxSSIS程序包是可以再次使用的功能,但是现在名称不正确。将其重命名为LoadTemperatureData.dtsx

加载WIND数据

设计

在添加包以加载WIND数据之前,让我们考虑一下设计。我是从右到左设计(right-to-left design)的忠实拥护者——从输出开始,然后一直回到输入(一个或多个),因为它可以帮助我确定设计中的主要步骤(或模块)。从新包的输出开始,找到目标数据库WeatherData,如图所示

我将WIND数据”暂存”在WeatherData数据库的一个表中。向后进行,暂存将通过新SSIS包中的“仅向前”(forward-only)增量加载数据流任务来完成,如图所示。

再往后一步,此数据流任务将在Foreach循环容器内,就像LoadTemperatureData.dtsx SSIS包中的Foreach循环容器一样,如图所示。

以上3个图中的构造图在功能上不准确(大多数框图(block diagrams)在功能上都不准确)。

提升项目连接管理器

LoadTemperatureData.dtsx包有一个用于WeatherData数据库的包连接管理器(package connection manager)。在项目部署模型中开发包时,SSIS 2012提供项目连接管理器(project connection managers)。并且,将包连接管理器转换为项目连接管理器非常容易。

要将包连接管理器升级为项目连接管理器,只需右键单击包连接管理器,然后单击“转换为项目连接”(Convert to Project Connection),如图所示。

点击”转换到项目连接”后,连接管理器会提升到项目连接管理器,连接管理器的名字会添加一个前缀”(project)”——“(项目)”

同时,新提升的项目连接管理器也会在解决方案资源管理器下的”连接管理器”(Connection Managers)节点列出:

这样可以节省新的SSIS包中的步骤。哪一步?让我们创建该包,看看吧!

构建LoadWindData.dtsx

在解决方案资源管理器中右键单击“ SSIS包”节点,然后单击“新建SSIS包”,然后重命名新包为LoadWindData.dtsx。

新包将会在SQL Server Data Tools集成开发环境中打开。

注意,LoadWindData.dtsxSSIS包中可以使用名为“(项目)WIN-FR5GRQSCDPO.WeatherData”的项目连接管理器,而我们无需执行任何操作即可实现此目的。为什么?是因为项目中所有SSIS包都可以使用全部的项目连接管理器。多么酷啊!

将一个Foreach循环容器添加到LoadWindData.dtsxSSIS包的控制流中,并将其重命名为“FOREACH Wind File”。

打开”Foreach循环容器”的编辑器,然后导航到“集合”页。单击Enumerator Expressions属性。

点击Expressions属性集合的值文本框的省略号,打开属性表达式编辑器,从”Property”下拉列表中选择”Directory”,

然后,点击”Expression”文本框的省略号,显示表达式构建器,展开”变量和参数”节点,点击$Project::SourceFolder并拖拽到”表达式”文本框

如果点击”计算表达式”按钮,将会看到项目参数$Project::SourceFolder当前设计时的值。

这是SSIS 2012的项目部署模型中项目级对象的另一个好处。与项目连接管理器类似,SSIS项目中的每个包都可以使用项目参数。同样很酷!

单击”确定”按钮关闭表达式生成器,然后单击”确定”按钮关闭属性表达式编辑器,返回到Foreach循环编辑器。我们希望使用此Foreach循环容器检索的是包含多天的Wind数据的文件。这些文件名为“all.csv”,可以在我们的天气数据文件的MonthYear文件夹的\WIND子文件夹中找到。

在Foreach循环编辑器的“文件”属性中,输入“all.csv”,还要选中“遍历子文件夹”(Traverse subfolders)复选框。如图所示

我们配置了Foreach循环容器,在名为$Project::SourceFolder的项目参数包含的路径(或其子文件夹)中搜索名为all.csv的文件。 Foreach循环每次查找并返回其中的一个文件。

当找到名为“all.csv”的文件时,我们需要将这些文件的完全限定路径发送到某个地方。让我们配置一个SSIS变量,使其包含Foreach循环容器每次迭代的源文件路径。

点击”变量映射”页,点击”Variable”下拉框,然后点击”新建变量”。

在”添加变量”窗口,确保”容器”属性设置为”包容器”——LoadWindData.dtsx

在”名称”(Name)属性中输入变量名:SourceFileName。

单击“确定”按钮关闭“添加变量”窗口,返回到Foreach循环编辑器。

点击”确定”按钮关闭”Foreach Loop Editor”。

从控制流SSIS工具箱中拖拽一个数据流任务到Foreach循环容器,重命名数据流任务”DFT Load Wind Data”:

双击”DFT Load Wind Data”数据流任务打开编辑器,添加一个平面文件源适配器并重命名为”FFSrc Wind Data”。

双击”FFSrc Wind Data”平面文件源适配器打开编辑器:

点击”Flat file connection manager”下拉列表旁边的”新建”按钮,创建一个新的平面文件连接管理器,并打开其编辑器。点击”File name”属性文本框旁边的”浏览”按钮,显示打开对话框,修改文件类型过滤为”*.csv”,导航到\data\Apr09\WIND文件夹,选择”all.csv”文件

修改平面文件连接管理器的名字为”FFCM Wind Data”

点击”列”页,可以预览文件中数据。

点击”确定”按钮关闭平面文件连接管理器编辑器,在”平面文件源编辑器”的”列”页中,显示从Flat File Connection Manager到Flat File Source Adapter返回的列。

在右下方的网格中,找到“外部列”(External Columns)和“输出列”(Output Columns)。外部列从平面文件连接管理器提供给平面文件源适配器。我们可以更改这些列的名称,但不是此处,必须在平面文件连接管理器中才能更改。但是,我们可以通过编辑输出列名来更改列的别名。

我想从列名称中删除空格和“(m s)”文本。可以通过更改Output Columns列中的列名来做到这一点,修改完成后,列表格显示如下:

单击确定按钮关闭平面文件源编辑器。

从SSIS工具箱中拖动一个Lookup Transformation,并将数据流路径从“FFSrc Wind Data”平面文件源适配器连接到”查找转换”。

打开查找编辑器,在“常规”页面上,将“指定如何处理无匹配项的行”下拉列表更改为“将行重定向到无匹配输出”,如图所示。

点击”Connection”页

确认”OLE DB connection manager”下拉列表设置为”WIN-FR5GRQSCDPO.WeatherData”

我们将使用“查找转换”的一些功能来创建目标表。单击“使用表或视图”下拉菜单旁边的“新建”按钮。

与从OLE DB目标适配器创建表时一样,“创建表”窗口中显示的默认表名是“查找转换”的名称——此时为“查找”。

注意,列名反映了对平面文件源适配器中更改的输出列名(没有空格,并且已删除”(m s)”文本),修改语句中的表名为”StageWind”,如图所示。

单击“确定”按钮以创建表,返回到“查找转换编辑器”。

我不确定为什么会发生这种情况,当按照刚才概述的步骤进行操作时,会去选择“使用SQL查询的结果”(Use results of an SQL query)选项,且该查询为select * from 及刚刚使用”新建”按钮创建的表的名称。

很好,因为我确实要选择该选项。但是让我们编辑查询,使其读取:

1
2
sql复制代码Select [Date], [Time]
From [dbo].[StageWind]

切记:我们的数据每小时创建一次,并且永远不会更新。如果目标表中存在相同日期/时间组合的行,则表明我们先前已加载该行。

如果单击“预览”(Preview)按钮,在返回的“日期和时间”列中看不到任何数据(我们刚刚创建了该表,该表为空),但是这样可以测试查询,查看其构造是否足够返回一个空的数据集。

由于我们的数据永远不会更新,因此只需要检查新行或现有行。如果在目标表中找到日期/时间,则它是现有行。因此,我们仅在“Date”和“Time”列上检查匹配。

单击“列”页,然后将[Available Input Columns].[Date]列连接到[Available Lookup Columns].[Date]列,[Available Input Columns].[Time]列连接到[Available Lookup Columns].[Time]列,如图:

我们配置date/time不匹配的行从“查找转换”的”无匹配输出”中流出。简单地丢弃匹配行——匹配的行流到Lookup Transformation的”匹配输出”中,但是我们不会将”匹配输出”连接到数据流中。

单击“确定”按钮关闭“查找转换编辑器”。将”OLE DB目标适配器”拖到数据流上,并重命名为“OLEDBDest Stage Wind”。

将数据流路径从”查找转换”连接到“OLEDBDest Stage Wind” OLE DB目标适配器。出现提示时,从”输入输出选择”中,选择来自查找转换的“查找无匹配输出”(Lookup No Match Output),如图所示。

打开“OLEDBDest Stage Wind” OLE DB目标适配器的编辑器,确认”OLE DB connection manager”下拉列表设置为”WIN-FR5GRQSCDPO.WeatherData”,并设置”Name of the table or the view”下拉列表为StageWind表,如下:

单击Mappings页完成将Available Input Columns自动映射到Available Destination Columns,如图所示:

单击确定按钮完成OLE DB目标适配器的配置。数据流应类似如图所示。

测试时间

在SSDT-BI调试器中执行LoadWindData.dtsx SSIS包之前,打开SSMS,连接到WeatherData数据库,然后执行以下查询:

1
sql复制代码Select * From dbo.StageWind

查询结果应该为空。

在SSDT-BI调试器中执行LoadWindData.dtsx SSIS包,执行完成后,数据流应该类似如下图:

当包完成执行但调试器仍在运行时,单击SSDT-BI中的“Progress”选项卡。如果滚动到标有“Task DFT Load Wind Data”的节点,将发现数据流执行了(“Start”)3次,如图所示。

如果返回SSMS并重新执行测试查询,将在dbo.StageWind表中找到数据,如图所示。

总结

在本文中,我们利用SSIS参数,变量和Foreach循环容器从多个不同的源中加载数据。我们重新配置(并重命名)了原始程序包,以仅加载温度和湿度数据。

新的SSIS包(LoadWindData.dtsx)中的Foreach循环使我们可以将来自三个来源的风力数据归零,但我们可以轻松地从30个来源(或未知数目,或可变数目的来源)中加载数据。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…130131132…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%