聊聊jesque的event机制

本文主要介绍一下jesque的event机制

WorkerEvent

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEvent.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
复制代码/**
* The possible WorkerEvents that a WorkerListener may register for.
*/
public enum WorkerEvent {

/**
* The Worker just finished starting up and is about to start running.
*/
WORKER_START,
/**
* The Worker is polling the queue.
*/
WORKER_POLL,
/**
* The Worker is processing a Job.
*/
JOB_PROCESS,
/**
* The Worker is about to execute a materialized Job.
*/
JOB_EXECUTE,
/**
* The Worker successfully executed a materialized Job.
*/
JOB_SUCCESS,
/**
* The Worker caught an Exception during the execution of a materialized Job.
*/
JOB_FAILURE,
/**
* The Worker caught an Exception during normal operation.
*/
WORKER_ERROR,
/**
* The Worker just finished running and is about to shutdown.
*/
WORKER_STOP;
}

JOB_PROCESS与JOB_EXECUTE可能让人有点迷糊。二者之间有个去redis更新状态以及实例化job的操作,而JOB_EXECUTE则是before execute的意思
JOB_SUCCESS以及JOB_FAILURE则是after execute的意思

WorkerEventEmitter

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerEventEmitter.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
复制代码/**
* A WorkerEventEmitter allows WorkerListeners to register for WorkerEvents.
*/
public interface WorkerEventEmitter {

/**
* Register a WorkerListener for all WorkerEvents.
* @param listener the WorkerListener to register
*/
void addListener(WorkerListener listener);

/**
* Register a WorkerListener for the specified WorkerEvents.
* @param listener the WorkerListener to register
* @param events the WorkerEvents to be notified of
*/
void addListener(WorkerListener listener, WorkerEvent... events);

/**
* Unregister a WorkerListener for all WorkerEvents.
* @param listener the WorkerListener to unregister
*/
void removeListener(WorkerListener listener);

/**
* Unregister a WorkerListener for the specified WorkerEvents.
* @param listener the WorkerListener to unregister
* @param events the WorkerEvents to no longer be notified of
*/
void removeListener(WorkerListener listener, WorkerEvent... events);

/**
* Unregister all WorkerListeners for all WorkerEvents.
*/
void removeAllListeners();

/**
* Unregister all WorkerListeners for the specified WorkerEvents.
* @param events the WorkerEvents to no longer be notified of
*/
void removeAllListeners(WorkerEvent... events);
}

定义了event emitter的接口

WorkerListenerDelegate

jesque-2.1.0-sources.jar!/net/greghaines/jesque/worker/WorkerListenerDelegate.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
复制代码/**
* WorkerListenerDelegate keeps track of WorkerListeners and notifies each listener when fireEvent() is invoked.
*/
public class WorkerListenerDelegate implements WorkerEventEmitter {

private static final Logger log = LoggerFactory.getLogger(WorkerListenerDelegate.class);

private final Map<WorkerEvent, ConcurrentSet<WorkerListener>> eventListenerMap;

/**
* Constructor.
*/
public WorkerListenerDelegate() {
final Map<WorkerEvent, ConcurrentSet<WorkerListener>> elp =
new EnumMap<WorkerEvent, ConcurrentSet<WorkerListener>>(WorkerEvent.class);
for (final WorkerEvent event : WorkerEvent.values()) {
elp.put(event, new ConcurrentHashSet<WorkerListener>());
}
this.eventListenerMap = Collections.unmodifiableMap(elp);
}

/**
* {@inheritDoc}
*/
@Override
public void addListener(final WorkerListener listener) {
addListener(listener, WorkerEvent.values());
}

/**
* {@inheritDoc}
*/
@Override
public void addListener(final WorkerListener listener, final WorkerEvent... events) {
if (listener != null) {
for (final WorkerEvent event : events) {
final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
if (listeners != null) {
listeners.add(listener);
}
}
}
}

/**
* {@inheritDoc}
*/
@Override
public void removeListener(final WorkerListener listener) {
removeListener(listener, WorkerEvent.values());
}

/**
* {@inheritDoc}
*/
@Override
public void removeListener(final WorkerListener listener, final WorkerEvent... events) {
if (listener != null) {
for (final WorkerEvent event : events) {
final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
if (listeners != null) {
listeners.remove(listener);
}
}
}
}

/**
* {@inheritDoc}
*/
@Override
public void removeAllListeners() {
removeAllListeners(WorkerEvent.values());
}

/**
* {@inheritDoc}
*/
@Override
public void removeAllListeners(final WorkerEvent... events) {
for (final WorkerEvent event : events) {
final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
if (listeners != null) {
listeners.clear();
}
}
}

/**
* Notify all WorkerListeners currently registered for the given WorkerEvent.
* @param event the WorkerEvent that occurred
* @param worker the Worker that the event occurred in
* @param queue the queue the Worker is processing
* @param job the Job related to the event (only supply for JOB_PROCESS, JOB_EXECUTE, JOB_SUCCESS, and
* JOB_FAILURE events)
* @param runner the materialized object that the Job specified (only supply for JOB_EXECUTE and
* JOB_SUCCESS events)
* @param result the result of the successful execution of the Job (only set for JOB_SUCCESS and if the Job was
* a Callable that returned a value)
* @param t the Throwable that caused the event (only supply for JOB_FAILURE and ERROR events)
*/
public void fireEvent(final WorkerEvent event, final Worker worker, final String queue, final Job job,
final Object runner, final Object result, final Throwable t) {
final ConcurrentSet<WorkerListener> listeners = this.eventListenerMap.get(event);
if (listeners != null) {
for (final WorkerListener listener : listeners) {
if (listener != null) {
try {
listener.onEvent(event, worker, queue, job, runner, result, t);
} catch (Exception e) {
log.error("Failure executing listener " + listener + " for event " + event
+ " from queue " + queue + " on worker " + worker, e);
}
}
}
}
}
}

event emitter的实现类,使用EnumMap来存放listener,key是WorkerEvent枚举,而value则是listener的ConcurrentSet,即同一个event可以有多个listener。

事件的触发

jesque-2.1.2-sources.jar!/net/greghaines/jesque/worker/WorkerImpl.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码protected final WorkerListenerDelegate listenerDelegate = new WorkerListenerDelegate();

//......
protected void process(final Job job, final String curQueue) {
try {
this.processingJob.set(true);
if (threadNameChangingEnabled) {
renameThread("Processing " + curQueue + " since " + System.currentTimeMillis());
}
this.listenerDelegate.fireEvent(JOB_PROCESS, this, curQueue, job, null, null, null);
this.jedis.set(key(WORKER, this.name), statusMsg(curQueue, job));
final Object instance = this.jobFactory.materializeJob(job);
final Object result = execute(job, curQueue, instance);
success(job, instance, result, curQueue);
} catch (Throwable thrwbl) {
failure(thrwbl, job, curQueue);
} finally {
removeInFlight(curQueue);
this.jedis.del(key(WORKER, this.name));
this.processingJob.set(false);
}
}

在wokerImpl类里头,组合了WorkerEventEmitter的实现类,然后在相应的方法里头去触发/通知相应的listener(默认是同步执行)

小结

其实本质就是观察者模式,workerImpl是被观察者,listener是观察者,wokerImpl在有相应执行点会触发相应事件,同步通知listner执行相关逻辑。

doc

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%