Flink 从0-1实现 电商实时数仓 - 日志数据采集

这是我参与8月更文挑战的第2天,活动详情查看:8月更文挑战

日志数据采集

流程

image.png

  1. 前端埋点
  2. 通过nginx到日志服务器
  3. 将 Event 打印到日志文件
  4. 将 Event 发送至Kafka

实现

1. 新建日志项目
  1. 新建 springboot 项目 tmall-logger
  2. 添加 Maven 依赖
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
xml复制代码<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<optional>true</optional>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
  1. 新建 LoggerController
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码@Slf4j
@RestController
public class LoggerController {

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
@Value("${kafka.topic}")
private String kafkaTopic;

@RequestMapping("/applog")
public String logger(String param) {
//打印日志
log.info(param);
//发送至kafka
kafkaTemplate.send(kafkaTopic, param);
return "success";
}

}
  1. 配置 logback
* 给 LoggerController 单独配置打印到一个文件,每行一个 Event
* 在 `resources` 新建 `logback-spring.xml`
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<configuration>

<springProperty scope="context" name="LOG_PATH" source="logging.file.path" defaultValue="./logs"/>
<springProperty scope="context" name="LOG_LEVEL" source="logging.level.root" defaultValue="INFO"/>

<appender name="console" class="ch.qos.logback.core.ConsoleAppender">
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>

<appender name="rollingFile" class="ch.qos.logback.core.rolling.RollingFileAppender">
<file>${LOG_PATH}/app.log</file>
<rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
<fileNamePattern>${LOG_HOME}/app.%d{yyyy-MM-dd}.log</fileNamePattern>
</rollingPolicy>
<encoder>
<pattern>%msg%n</pattern>
</encoder>
</appender>


<!-- 将某一个包下日志单独打印日志 -->
<logger name="wang.yeting.tmall.logger.controller.LoggerController"
level="INFO" additivity="false">
<appender-ref ref="rollingFile" />
<appender-ref ref="console" />
</logger>

<root level="error" additivity="false">
<appender-ref ref="console" />
</root>
</configuration>
  1. 修改配置文件
1
2
3
4
5
6
7
8
9
ini复制代码server.port=8081
//kafka
spring.kafka.bootstrap-servers=hd1:9092,hd2:9092,hd3:9092
spring.kafka.producer.key-serializer=org.apache.kafka.common.serialization.StringSerializer
spring.kafka.producer.value-serializer=org.apache.kafka.common.serialization.StringSerializer
kafka.topic=ods_base_log
//日志
#logging.file.path=/opt/logs
#logging.level.root=info
2. 打包部署
  1. maven 打包
  2. 上传到服务器
  3. 启动

我这里启动了三台,分别是 hd1:8081   hd2:8081   hd3:8081

3. 配置 Nginx
  1. 在server内部配置
1
2
3
bash复制代码location /applog{
proxy_pass http://logserver.com;
}
  1. 在server外部配置反向代理
1
2
3
4
5
ini复制代码upstream logserver.com{
server hd1:8081 weight=1;
server hd2:8081 weight=2;
server hd3:8081 weight=3;
}
4. 群启 群停 脚本

提前配置好java环境变量

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
shell复制代码#!/bin/bash

APPNAME=tmall-logger-0.0.1-SNAPSHOT.jar
case $1 in
"start")
{
for i in hd1 hd2 hd3
do
echo "启动 logger 服务: $i"
ssh $i  "java -Xms32m -Xmx64m  -jar /opt/module/tmall/$APPNAME >/dev/null 2>&1  &"
done

echo "启动 hd1 nginx"
ssh hd1 "/opt/module/nginx/sbin/nginx"
};;

"stop")
{
echo "停止 hd1 nginx"
sh hd1 "/opt/module/nginx/sbin/nginx -s stop"

for i in hd1 hd2 hd3
do
echo "停止 logger 服务: $i"
ssh $i "ps -ef|grep $APPNAME |grep -v grep|awk '{print \$2}'|xargs kill" >/dev/null 2>&1
done
};;
esac

明日预告:Flink 从0-1实现 电商实时数仓 - 业务数据采集

关注专栏持续更新 👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻👇🏻

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

0%