开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

火爆!GitHub 标星 144k 的前后端学习路线

发表于 2021-03-11

正在光顾掘金的你,好呀,我是沉默王二(掘金编辑器改版了呀,用起来舒服了,感谢感谢)。

另外,有幸入围掘金 2 月份后端榜单第二名,敲开心!

上周在欣赏阮一峰老师的《科技周刊》时,发现了一个牛逼的学习路线,在 GitHub 上已经标星 144k 了,简直火爆。里面不仅涵盖了前端和后端的学习路线,还有运维的学习路线。作为一名程序员,如果你还不知道这个学习路线的话,那可就亏大发了。

这个学习路线原本是作者为他的大学教授绘制的,给学生们看的,后来就开源到了 GitHub,希望能够借助社区的力量帮助到更多的编程爱好者。

这个学习路线还有配套的视频课程,包含 TCP/UDP、TCP/IP、HTTP 缓存、CDN、DNS 等等,虽然是英文版的,但制作精良,观影体验还是非常不错的。地址如下所示:

roadmap.sh/watch

还有图文版的教程,包括 OAuth、字符编码、SSL/SSH、设计模式、代理服务器等等,文章内容不长,在翻译软件的帮助下,很快就能掌握了。地址如下所示:

roadmap.sh/guides

这份学习路线原本只有英文版的,后来就有雷锋把它翻译成了中文版。我们先来看后端的学习路线,主线是互联网→操作系统→编程语言→版本控制系统→关系型数据库→缓存→网络安全知识→测试→设计和开发原则→消息代理→容器→应用服务器→学无止境。

这幅图做得可真棒,一眼看上去,就爱上了,仿佛我的女神李孝利就站在面前。紫色勾住的是作者认为的重点,你比如说什么是HTTP、DNS是如何工作的、操作系统是如何工作的、进程管理、线程与并发、内存管理、IO 管理、Git 的基本用法、MongoDB、数据库的 ACID 原则、索引及其工作机制、OAuth、Token 验证、REST、CDN、Redis、单元测试、HTTPS、驱动测试开发、ElasticSearch、RabbitMQ、Docker、Nginx 等等,确实都是一个后端工程师需要掌握的知识点。

再来看一下前端的学习路线,主线是互联网→HTML→CSS→JavaScript→版本控制系统→Web 安全知识→包管理工具→构建工具→前端框架→CSS 框架→测试→移动端应用开发→学无止境。

运维方向的学习路线图也来欣赏一下吧。

有了这样清晰的学习路线,是不是就不用再为学什么而发愁了?我相信你的答案是肯定的。当然了,你也不用完全按照上面的学习路线走,因为作者是国外的,国外的开发环境和国内的还是有一些差异的。

比如说编程语言方面,作者推荐的是 JavaScript,国内显然 Java 的应用场景更广泛一些;再比如关系型数据库方面,作者推荐的是 PostgreSQL,国内显然是 MySQL。

你也不用担心,“这么多知识点,我学到头秃也学不完啊!”是滴,任谁都学不完。你需要的是路线,然后结合自己的实际情况,以及未来的职业方向,从中做出选择。

我之所以推荐这个学习路线,并不是因为这个学习路线有多详细,这个路线图画得有多漂亮。而是,大多数初学者在进入编程这个领域后,往往很迷茫,不知道自己该学习什么,东学一点西学一点,等到时间一点一点流逝后,却发现好像什么都没学会。

有了这个学习路线图就完全不一样了,你能很清楚地知道自己要学什么,不用学什么,学习效率就会大幅度提升。

顺带呢,你还可以按照作者的方式给自己整一个学习路线。既然这个学习路线有 144k 的星标,就能说明大家很认可,很值得效仿一下。

想知道作者的路线图是怎么绘制的?作者在 CONTRIBUTING.md 文件中透露了,参照下图。

用的是 Balsamiq 这个工具,有 Windows 和 macOS 两个版本,下载链接如下所示:

balsamiq.com/wireframes/…

顺带贴一下这个学习路线图的中文版 GitHub 地址:

github.com/kamranahmed…

可以趁机把图片保存下来,顺藤摸瓜的评估一下,看看自己的学习路线是否有走偏。

文末,给你推荐两个牛逼的资源:

第一个:GitHub 上星标 1.9k 的开源 Java 电子书,我已经同步到了开源广场:

JavaBooks

第二个:JavaGuide 面试突击最新版 V4.0(GitHub 星标 98k,帮助了无数面试者成功上岸),下载链接:

JavaGuide 面试突击最新版,无套路,不需要解压密码

PS:哇,上期分享的狂补计算机基础知识,让我上了瘾深受掘金的小伙伴们喜欢呀,已经 607 个赞了!开心。

好了,我是沉默王二,希望这篇文章能够帮到你!下期见。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

mongodb聚合操作之Aggregation Pipeli

发表于 2021-03-11
  1. Aggregation Pipeline简介

聚合功能可以把数据像 放入传送带一样,先把原始数据按照一定的规则进行筛选处理,然后通过多个不同的数据处理阶处理数据,最终输出一个汇总的结果

Pipeline 是什么呢?英文直译为“管道”,可以形象的比喻为,将原始数据经过多条“管道”进行聚合处理,然后经过最后一道管道进行聚合处理以后,将聚合结果返回;其中每一个“管道”的处理流程,在官方文档中有一个术语,叫做Pipeline Stage,既是每一个 pipeline 阶段;MongoDB 为每一个 pipeline stage 提供了一系列的 operators 操作来完成聚合处理操作;

使用 MongoDB Pipeline 的优势是,MongoDB 提供了许多内置的( native )的方法( 就是指这些 operators )来帮助高效的进行数据的聚合处理;

Aggregation Pipeline 可以在分片集群上进行处理;

以下是官网的一个演示Aggregation Pipline功能的过程图

启动 Aggregation Pipeline,包含两个部分

通过 db.collection.aggregate() 方法启动

通过 aggregate 命令启动,例如:

db.runCommand( {

aggregate: “articles”,

pipeline: [

{ $project: { tags: 1 } },

{ unwind:”unwind: “unwind:”tags” },

{ group: { \_id: “tags”, count: { $sum : 1 } } }

],

cursor: { }

} )

Pipeline 的执行过程由一系列的 Aggregation Stages 组成

每一个 Stages 又可以包含多个 Pipeline Expressions 来为每个文档进行更深入的操作;该部分又分为两个部分,

常规的 expressions

accumulator expressions

  1. mongodb aggregate操作

说明:

计算集合中的数据的聚合值。

语法:

db.collection.aggregate(pipeline, options)

参数讲解:

pipeline:数组(Array)数据聚合操作或阶段的序列。有关详细信息,请参阅聚合管道操作符

options:可选的。aggregate()传递给aggregate命令的其他选项。仅当您将管道指定为数组时可用。选项可以包含以下字段和值:

2.1. aggregate options参数

2.1.1. explain

boolean类型,可选的。指定返回关于管道处理的信息。有关示例,请参见聚合管道操作的返回信息,在多文档事务中不可用。示例:db.orders.explain().aggregate([

{ $match: { status: “A” } },

{ group: { \_id: “cust_id”, total: { sum:”sum: “sum:”amount” } } },

{ $sort: { total: -1 } }

])

db.orders.aggregate(

[

{ $match: { status: “A” } },

{ group: { \_id: “cust_id”, total: { sum:”sum: “sum:”amount” } } },

{ $sort: { total: -1 } }

],

{

explain: true

}

)

2.1.2. allowDiskUse

boolean类型,可选的。允许写入临时文件。当设置为true时,聚合操作可以将数据写入dbPath目录中的_tmp子目录。有关示例,请参见使用外部排序执行大型排序操作。从MongoDB 4.2开始,如果任何聚合阶段由于内存限制而将数据写到临时文件,则分析器日志消息和诊断日志消息包括一个usedDisk指示器。示例:

var results = db.stocks.aggregate(

[

{ $project : { cusip: 1, date: 1, price: 1, _id: 0 } },

{ $sort : { cusip : 1, date: 1 } }

],

{

allowDiskUse: true

}

)

2.1.3. cursor

可选的。指定游标的初始批处理大小。游标字段的值是一个字段batchSize的文档。有关语法和示例,请参见指定初始批处理大小。示例:cursor: { batchSize: }

2.1.4. maxTimeMs

可选的。指定处理游标操作的时间限制(以毫秒为单位)。如果没有为maxTimeMS指定值,操作将不会超时。值0显式指定默认的无限制行为。MongoDB使用与db.killOp()相同的机制终止超过分配时间限制的操作。MongoDB只在一个指定的中断点终止一个操作。

2.1.5. bypassDocumentValidation

可选的。仅当指定out或out或out或merge聚合阶段时才适用。使db.collection.aggregate以绕过操作期间的文档验证。这允许插入不满足验证要求的文档。

2.1.6. readConcern

可选的。指定读取关系。

2.1.7. collation

可选的。指定操作要使用的排序规则。

2.1.8. hint

可选的。用于聚合的索引。索引位于对其运行聚合的初始集合/视图上。通过索引名称或索引规范文档指定索引。

2.1.9. comment

可选的。用户可以指定任意字符串来帮助通过数据库分析器、currentOp和日志跟踪操作。通过索引名称或索引规范文档指定索引。

2.1.10. writeConcern

可选的。表示要与out或out或out或merge阶段一起使用的写关注点的文档。通过索引名称或索引规范文档指定索引。忽略对out或out或out或merge阶段使用默认的写入关注。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java实现解压缩文件和文件夹 一 前言 二 压缩文件 三

发表于 2021-03-10

一 前言

项目开发中,总会遇到解压缩文件的时候。比如,用户下载多个文件时,服务端可以将多个文件压缩成一个文件(例如xx.zip或xx.rar)。用户上传资料时,允许上传压缩文件,服务端进行解压读取每一个文件。

基于通用性,以下介绍几种解压缩文件的方式,包装成工具类,供平时开发使用。

二 压缩文件

压缩文件,顾名思义,即把一个或多个文件压缩成一个文件。压缩也有2种形式,一种是将所有文件压缩到同一目录下,此种方式要注意文件重名覆盖的问题。另一种是按原有文件树结构进行压缩,即压缩后的文件树结构保持不变。

压缩文件操作,会使用到一个类,即ZipOutputStream。

2.1 压缩多个文件

此方法将所有文件压缩到同一个目录下。方法传入多个文件列表,和一个最终压缩到的文件路径名。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
java复制代码    /**
* 压缩多个文件,压缩后的所有文件在同一目录下
*
* @param zipFileName 压缩后的文件名
* @param files 需要压缩的文件列表
* @throws IOException IO异常
*/
public static void zipMultipleFiles(String zipFileName, File... files) throws IOException {
ZipOutputStream zipOutputStream = null;
try {
// 输出流
zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFileName));
// 遍历每一个文件,进行输出
for (File file : files) {
zipOutputStream.putNextEntry(new ZipEntry(file.getName()));
FileInputStream fileInputStream = new FileInputStream(file);
int readLen;
byte[] buffer = new byte[1024];
while ((readLen = fileInputStream.read(buffer)) != -1) {
zipOutputStream.write(buffer, 0, readLen);
}
// 关闭流
fileInputStream.close();
zipOutputStream.closeEntry();
}
} finally {
if (null != zipOutputStream) {
try {
zipOutputStream.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}

测试,将D盘下的infp.txt和infp1.txt文件压缩到D盘下,压缩文件名为my.zip。

1
2
3
java复制代码    public static void main(String[] args) throws Exception {
zipMultipleFiles("D:/my.zip", new File("D:/infp.txt"), new File("D:/infp1.txt"));
}

2.2 压缩文件或文件树

此方法将文件夹下的所有文件按原有的树形结构压缩到文件中,也支持压缩单个文件。原理也简单,无非就是递归遍历文件树中的每一个文件,进行压缩。有个注意的点每一个文件的写入路径是基于压缩文件位置的相对路径。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
java复制代码package com.nobody.zip;

import java.io.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

public class ZipUtils {

/**
* 压缩文件或文件夹(包括所有子目录文件)
*
* @param sourceFile 源文件
* @param format 格式(zip或rar)
* @throws IOException 异常信息
*/
public static void zipFileTree(File sourceFile, String format) throws IOException {
ZipOutputStream zipOutputStream = null;
try {
String zipFileName;
if (sourceFile.isDirectory()) { // 目录
zipFileName = sourceFile.getParent() + File.separator + sourceFile.getName() + "."
+ format;
} else { // 单个文件
zipFileName = sourceFile.getParent()
+ sourceFile.getName().substring(0, sourceFile.getName().lastIndexOf("."))
+ "." + format;
}
// 压缩输出流
zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFileName));
zip(sourceFile, zipOutputStream, "");
} finally {
if (null != zipOutputStream) {
// 关闭流
try {
zipOutputStream.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}

/**
* 递归压缩文件
*
* @param file 当前文件
* @param zipOutputStream 压缩输出流
* @param relativePath 相对路径
* @throws IOException IO异常
*/
private static void zip(File file, ZipOutputStream zipOutputStream, String relativePath)
throws IOException {

FileInputStream fileInputStream = null;
try {
if (file.isDirectory()) { // 当前为文件夹
// 当前文件夹下的所有文件
File[] list = file.listFiles();
if (null != list) {
// 计算当前的相对路径
relativePath += (relativePath.length() == 0 ? "" : "/") + file.getName();
// 递归压缩每个文件
for (File f : list) {
zip(f, zipOutputStream, relativePath);
}
}
} else { // 压缩文件
// 计算文件的相对路径
relativePath += (relativePath.length() == 0 ? "" : "/") + file.getName();
// 写入单个文件
zipOutputStream.putNextEntry(new ZipEntry(relativePath));
fileInputStream = new FileInputStream(file);
int readLen;
byte[] buffer = new byte[1024];
while ((readLen = fileInputStream.read(buffer)) != -1) {
zipOutputStream.write(buffer, 0, readLen);
}
zipOutputStream.closeEntry();
}
} finally {
// 关闭流
if (fileInputStream != null) {
try {
fileInputStream.close();
} catch (IOException ex) {
ex.printStackTrace();
}
}
}
}

public static void main(String[] args) throws Exception {
String path = "D:/test";
String format = "zip";
zipFileTree(new File(path), format);
}
}

上例将test目录下的所有文件压缩到同一目录下的test.zip文件中。

2.3 借助文件访问器压缩

还有一种更简单的方式,我们不自己写递归遍历。借助Java原生类,SimpleFileVisitor,它提供了几个访问文件的方法,其中有个方法visitFile,对于文件树中的每一个文件(文件夹除外),都会调用这个方法。我们只要写一个类继承SimpleFileVisitor,然后重写visitFile方法,实现将每一个文件写入到压缩文件中即可。

当然,除了visitFile方法,它里面还有preVisitDirectory,postVisitDirectory,visitFileFailed等方法,通过方法名大家也猜出什么意思了。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
java复制代码package com.nobody.zip;

import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.*;
import java.nio.file.attribute.BasicFileAttributes;
import java.util.zip.ZipEntry;
import java.util.zip.ZipOutputStream;

/**
* @Description
* @Author Mr.nobody
* @Date 2021/3/8
* @Version 1.0.0
*/
public class ZipFileTree extends SimpleFileVisitor<Path> {

// zip输出流
private ZipOutputStream zipOutputStream;
// 源目录
private Path sourcePath;

public ZipFileTree() {}

/**
* 压缩目录以及所有子目录文件
*
* @param sourceDir 源目录
*/
public void zipFile(String sourceDir) throws IOException {
try {
// 压缩后的文件和源目录在同一目录下
String zipFileName = sourceDir + ".zip";
this.zipOutputStream = new ZipOutputStream(new FileOutputStream(zipFileName));
this.sourcePath = Paths.get(sourceDir);

// 开始遍历文件树
Files.walkFileTree(sourcePath, this);
} finally {
// 关闭流
if (null != zipOutputStream) {
zipOutputStream.close();
}
}
}

// 遍历到的每一个文件都会执行此方法
@Override
public FileVisitResult visitFile(Path file, BasicFileAttributes attributes) throws IOException {
// 取相对路径
Path targetFile = sourcePath.relativize(file);
// 写入单个文件
zipOutputStream.putNextEntry(new ZipEntry(targetFile.toString()));
byte[] bytes = Files.readAllBytes(file);
zipOutputStream.write(bytes, 0, bytes.length);
zipOutputStream.closeEntry();
// 继续遍历
return FileVisitResult.CONTINUE;
}

// 遍历每一个目录时都会调用的方法
@Override
public FileVisitResult preVisitDirectory(Path dir, BasicFileAttributes attrs)
throws IOException {
return super.preVisitDirectory(dir, attrs);
}

// 遍历完一个目录下的所有文件后,再调用这个目录的方法
@Override
public FileVisitResult postVisitDirectory(Path dir, IOException exc) throws IOException {
return super.postVisitDirectory(dir, exc);
}

// 遍历文件失败后调用的方法
@Override
public FileVisitResult visitFileFailed(Path file, IOException exc) throws IOException {
return super.visitFileFailed(file, exc);
}

public static void main(String[] args) throws IOException {
// 需要压缩源目录
String sourceDir = "D:/test";
// 压缩
new ZipFileTree().zipFile(sourceDir);
}
}

三 解压文件

解压压缩包,借助ZipInputStream类,可以读取到压缩包中的每一个文件,然后根据读取到的文件属性,写入到相应路径下即可。对于解压压缩包中是文件树的结构,每读取到一个文件后,如果是多层路径下的文件,需要先创建父目录,再写入文件流。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
java复制代码package com.nobody.zip;

import java.io.*;
import java.util.zip.ZipEntry;
import java.util.zip.ZipInputStream;
import java.util.zip.ZipOutputStream;

/**
* @Description 解压缩文件工具类
* @Author Mr.nobody
* @Date 2021/3/8
* @Version 1.0.0
*/
public class ZipUtils {

/**
* 解压
*
* @param zipFilePath 带解压文件
* @param desDirectory 解压到的目录
* @throws Exception
*/
public static void unzip(String zipFilePath, String desDirectory) throws Exception {

File desDir = new File(desDirectory);
if (!desDir.exists()) {
boolean mkdirSuccess = desDir.mkdir();
if (!mkdirSuccess) {
throw new Exception("创建解压目标文件夹失败");
}
}
// 读入流
ZipInputStream zipInputStream = new ZipInputStream(new FileInputStream(zipFilePath));
// 遍历每一个文件
ZipEntry zipEntry = zipInputStream.getNextEntry();
while (zipEntry != null) {
if (zipEntry.isDirectory()) { // 文件夹
String unzipFilePath = desDirectory + File.separator + zipEntry.getName();
// 直接创建
mkdir(new File(unzipFilePath));
} else { // 文件
String unzipFilePath = desDirectory + File.separator + zipEntry.getName();
File file = new File(unzipFilePath);
// 创建父目录
mkdir(file.getParentFile());
// 写出文件流
BufferedOutputStream bufferedOutputStream =
new BufferedOutputStream(new FileOutputStream(unzipFilePath));
byte[] bytes = new byte[1024];
int readLen;
while ((readLen = zipInputStream.read(bytes)) != -1) {
bufferedOutputStream.write(bytes, 0, readLen);
}
bufferedOutputStream.close();
}
zipInputStream.closeEntry();
zipEntry = zipInputStream.getNextEntry();
}
zipInputStream.close();
}

// 如果父目录不存在则创建
private static void mkdir(File file) {
if (null == file || file.exists()) {
return;
}
mkdir(file.getParentFile());
file.mkdir();
}

public static void main(String[] args) throws Exception {
String zipFilePath = "D:/test.zip";
String desDirectory = "D:/a";
unzip(zipFilePath, desDirectory);
}
}

四 总结

  • 在解压缩文件过程中,主要是对流的读取操作,注意进行异常处理,以及关闭流。
  • web应用中,通过接口可以实现文件上传下载,对应的我们只要把压缩后的文件,写入到response.getOutputStream()输出流即可。
  • 解压缩文件时,注意空文件夹的处理。

此演示项目已上传到Github,如有需要可自行下载,欢迎 Star 。 github.com/LucioChn/co…

在这里插入图片描述

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Flink学习01-快速上手(Kafka+Flink+Mys

发表于 2021-03-10

背景介绍

近期项目中需要对数据进行转存,简单学习了一下Flink相关的知识点,通过一个简单的集成,对近期的学习成果做一个记录和梳理,帮助初学的同学窥探Flink流式数据处理的完整过程。Demo的设计思路是:应用系统定期向Kafka中发送数据,Flink获取Kafka中的流式数据后存储到MySQL数据库中。

1
2
复制代码【说明】由于本人在写笔记的时候也是一个Flink的初学者,难免会有思虑不全或者不对的地方,希望大家能够见谅,
如果本文有幸对您的工作或学习有所帮助,笔者在此感到无限荣幸。

一、环境需求

1、开发环境操作系统是windows,需要安装kafka(2.13-2.7.0),kafka启动依赖zookeeper(3.5.5),jdk(1.8.61),mysql(8.0.15)

2、zookeeper环境:由于kafka启动时对zookeeper有依赖,首先配置zookeeper环境,解压从官网下载的zookeeper安装文件后(切记目录不要放得太深,否则windows下启动会保错),进入conf文件加下,修改zoo.cfg文件(如果没有可以将zoo_sample.cfg重命名),增加或修改内容如下

1
2
3
4
ini复制代码# 可随意指定数据存放目录
dataDir=D:\software\bigData\zookeeper\3.5.5\data
# 文件末尾增加一行日志存放目录
dataLogDir=D:\software\bigData\zookeeper\3.5.5\data\log

从命令行进入zookeeper安装目录下,使用下面命令启动windows下的zookeeper服务,默认服务端口为2181

1
python复制代码.\bin\zkServer.cmd

命令行启动日志没有error信息说明启动成功,继续下面步骤。

3、kafka环境:解压下载的kafka安装包至合适的位置后,进入config目录,修改server.properties文件,修改内容如下

1
2
3
4
ini复制代码# kafka日志文件位置
log.dirs=./logs
# 步骤2中配置的zookeeper的地址和端口
zookeeper.connect=localhost:2181

由于当前示例仅仅是测试开发,kafka、zookeeper没有做集群配置,其他配置均使用默认,使用下面命令启动kafka服务,如果控制台没有错误信息,说明启动成功。

1
vbscript复制代码.\bin\windows\kafka-server-start.bat .\config\server.properties

二、Demo开发

1、创建mysql数据库表

1
2
3
4
5
6
7
sql复制代码CREATE TABLE `student`  (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(20) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
`age` bigint(20) NULL DEFAULT NULL,
`createDate` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL,
PRIMARY KEY (`id`) USING BTREE
)

2、开发项目中的maven依赖

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
pom复制代码<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>8</maven.compiler.source>
<maven.compiler.target>8</maven.compiler.target>
<flink.version>1.10.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka-0.11_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-jdbc_2.11</artifactId>
<version>${flink.version}</version>
</dependency>

<dependency>
<groupId>org.influxdb</groupId>
<artifactId>influxdb-java</artifactId>
<version>2.17</version>
</dependency>

<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.15</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>druid</artifactId>
<version>1.1.22</version>
</dependency>

<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.12</version>
</dependency>

<dependency>
<groupId>com.google.code.gson</groupId>
<artifactId>gson</artifactId>
<version>2.8.5</version>
</dependency>

<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.71</version>
</dependency>

<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-collections4</artifactId>
<version>4.2</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>1.7.25</version>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-simple</artifactId>
<version>1.7.25</version>
<scope>compile</scope>
</dependency>

<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-nop</artifactId>
<version>1.7.25</version>
<scope>test</scope>
</dependency>

3、使用druid创建数据库连接池

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
java复制代码package com.nandy.influxdb.common;

import com.alibaba.druid.pool.DruidDataSourceFactory;
import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.io.IOException;
import java.io.InputStream;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Properties;

/**
* @author nandy
* @create 2021/3/2 16:32
*/
@Slf4j
public class ConnectionPoolUtils {

private static final Properties properties;
private static Connection conn;

static {
properties = new Properties();
ClassLoader classLoader = ConnectionPoolUtils.class.getClassLoader();
InputStream resourceAsStream = classLoader.getResourceAsStream("jdbc.properties");
// 加载配置文件
try {
properties.load(resourceAsStream);
} catch (IOException e) {
log.error("Load jdbc properties exception.");
}
}

private ConnectionPoolUtils() {
}

public static Connection getConnection() throws SQLException {

try {
DataSource dataSource = DruidDataSourceFactory.createDataSource(properties);
conn = dataSource.getConnection();
} catch (SQLException e) {
log.error("error occurred :" + e);
conn = null;
} catch (Exception e) {
log.error("error occurred while creating connection pool!");
}
return conn;
}
}

jdbc.properties

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
properties复制代码driverClassName=com.mysql.cj.jdbc.Driver
url=jdbc:mysql://localhost:3306/flink_study?serverTimezone=UTC
username=root
password=xxx
filters=stat
initialSize=5
maxActive=50
maxWait=60000
timeBetweenEvictionRunsMillis=60000
minEvictableIdleTimeMillis=300000
validationQuery=SELECT 1
testWhileIdle=true
testOnBorrow=false
testOnReturn=false
poolPreparedStatements=false
maxPoolPreparedStatementPerConnectionSize=200

log4j.properties

1
2
3
4
5
js复制代码log4j.rootLogger=info,stdout
log4j.appender.stdout = org.apache.log4j.ConsoleAppender
log4j.appender.stdout.Target = System.out
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS} [%t] [%c] [%p] - %m%n

4、数据库持久化对象Student

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
java复制代码package com.nandy.models;

import lombok.Getter;
import lombok.Setter;

import java.io.Serializable;

/**
* @author nandy
* @create 2021/3/2 18:54
*/
@Setter
@Getter
public class Student implements Serializable {

private static final long serialVersionUID = -3247106837870523911L;

private int id;

private String name;

private int age;

private String createDate;
}

5、Flink中的sink对象

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
java复制代码package com.nandy.mysql;

import com.nandy.models.Student;
import com.nandy.mysql.utils.ConnectionPoolUtils;
import lombok.extern.slf4j.Slf4j;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.util.List;

/**
* @author nandy
* @create 2021/3/2 17:13
*/
@Slf4j
public class Flink2JdbcWriter extends RichSinkFunction<List<Student>> {
private static final long serialVersionUID = -5072869539213229634L;


private transient Connection connection = null;
private transient PreparedStatement ps = null;
private volatile boolean isRunning = true;

@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);

connection = ConnectionPoolUtils.getConnection();
if (null != connection) {
ps = connection.prepareStatement("insert into student (name, age, createDate) values (?, ?, ?);");
}
}

@Override
public void invoke(List<Student> list, Context context) throws Exception {

if (isRunning && null != ps) {
for (Student one : list) {
ps.setString(1, one.getName());
ps.setInt(2, one.getAge());
ps.setString(3, one.getCreateDate());
ps.addBatch();
}
int [] count = ps.executeBatch();
log.info("成功写入Mysql数量:" + count.length);
}
}

@Override
public void close() throws Exception {
try {
super.close();
if (connection != null) {
connection.close();
}
if (ps != null) {
ps.close();
}
} catch (Exception e) {
e.printStackTrace();
}
isRunning = false;
}
}

6、主函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
java复制代码/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nandy;

import com.alibaba.fastjson.JSON;
import com.nandy.models.Student;
import com.nandy.mysql.Flink2JdbcWriter;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.shaded.guava18.com.google.common.collect.Lists;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.AllWindowFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010;
import org.apache.flink.util.Collector;

import java.util.List;
import java.util.Properties;

/**
* Skeleton for a Flink Streaming Job.
*
* <p>For a tutorial how to write a Flink streaming application, check the
* tutorials and examples on the <a href="http://flink.apache.org/docs/stable/">Flink Website</a>.
*
* <p>To package your application into a JAR file for execution, run
* 'mvn clean package' on the command line.
*
* <p>If you change the name of the main class (with the public static void main(String[] args))
* method, change the respective entry in the POM.xml file (simply search for 'mainClass').
*/
@Slf4j
public class FlinkReadDbWriterDb {

public static void main(String[] args) throws Exception {
// 构建流执行环境
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

// kafka 配置
Properties props = new Properties();
props.put("bootstrap.servers", "192.168.10.42:9092");
props.put("zookeeper.connect", "192.168.10.42:2181");
props.put("group.id", "metric-group");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "latest");

DataStreamSource<String> dataStreamSource = env.addSource(
new FlinkKafkaConsumer010<String>(
//这个 kafka topic 需要和上面的工具类的 topic 一致
"student",
new SimpleStringSchema(),
props))
//单线程打印,控制台不乱序,不影响结果
.setParallelism(1);
//从kafka里读取数据,转换成Person对象
DataStream<Student> dataStream = dataStreamSource.map(string -> JSON.parseObject(string, Student.class));

//收集5秒钟的总数
dataStream.timeWindowAll(Time.seconds(5L)).apply(new AllWindowFunction<Student, List<Student>, TimeWindow>() {
@Override
public void apply(TimeWindow timeWindow, Iterable<Student> iterable, Collector<List<Student>> collector) throws Exception {
List<Student> students = Lists.newArrayList(iterable);
if(CollectionUtils.isNotEmpty(students)){
log.info("5秒的总共收到的条数:" + students.size());
collector.collect(students);
}
}
//sink 到数据库
}).addSink(new Flink2JdbcWriter());

env.execute("Flink Streaming Java API Skeleton");
}
}

三、测试及结果

1、创建测试类,向kafka创建的主题发送数据

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
java复制代码package com.nandy.kafka;

import com.alibaba.fastjson.JSON;
import com.nandy.models.Student;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.RandomUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.time.LocalDateTime;
import java.util.Properties;
import java.util.concurrent.TimeUnit;

/**
* @author nandy
* @create 2021/3/4 9:50
*/
@Slf4j
public class KafkaWriter {
//本地的kafka机器列表
public static final String BROKER_LIST = "192.168.10.42:9092";
//kafka的topic
public static final String TOPIC_PERSON = "student";
//key序列化的方式,采用字符串的形式
public static final String KEY_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
//value的序列化的方式
public static final String VALUE_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";

public static void writeToKafka() {
Properties props = new Properties();
props.put("bootstrap.servers", BROKER_LIST);
props.put("key.serializer", KEY_SERIALIZER);
props.put("value.serializer", VALUE_SERIALIZER);

try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
//构建Person对象,在name为hqs后边加个随机数
int randomInt = RandomUtils.nextInt(1, 100000);
Student student = new Student();
student.setName("nandy" + randomInt);
student.setAge(randomInt);
student.setCreateDate(LocalDateTime.now().toString());
//转换成JSON
String personJson = JSON.toJSONString(student);

//包装成kafka发送的记录
ProducerRecord<String, String> record = new ProducerRecord<>(TOPIC_PERSON, null,
null, personJson);
//发送到缓存
producer.send(record);
log.info("向kafka发送数据:" + personJson);
//立即发送
producer.flush();
}
}

public static void main(String[] args) {
int count = 0;
while (count < 20) {
try {
//每三秒写一条数据
TimeUnit.SECONDS.sleep(3);
writeToKafka();
count++;
} catch (Exception e) {
e.printStackTrace();
}
}
}
}

2、数据库student表中的数据

image.png

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

18 个 Java8 日期处理的实践,太有用了! Java

发表于 2021-03-10

Java 8 日期处理

Java 8 推出了全新的日期时间API,在教程中我们将通过一些简单的实例来学习如何使用新API。

Java处理日期、日历和时间的方式一直为社区所诟病,将 java.util.Date设定为可变类型,以及SimpleDateFormat的非线程安全使其应用非常受限。

新API基于ISO标准日历系统,java.time包下的所有类都是不可变类型而且线程安全。

示例1:Java 8中获取今天的日期

Java 8 中的 LocalDate 用于表示当天日期。和java.util.Date不同,它只有日期,不包含时间。当你仅需要表示日期时就用这个类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码package com.shxt.demo02;

import java.time.LocalDate;

public class Demo01 {
public static void main(String[] args) {
LocalDate today = LocalDate.now();
System.out.println("今天的日期:"+today);
}
}
/*
运行结果:
今天的日期:2018-02-05
*/

示例2:Java 8中获取年、月、日信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
ini复制代码package com.shxt.demo02;

import java.time.LocalDate;

public class Demo02 {
public static void main(String[] args) {
LocalDate today = LocalDate.now();
int year = today.getYear();
int month = today.getMonthValue();
int day = today.getDayOfMonth();

System.out.println("year:"+year);
System.out.println("month:"+month);
System.out.println("day:"+day);

}
}

示例3:Java 8中处理特定日期

我们通过静态工厂方法now()非常容易地创建了当天日期,你还可以调用另一个有用的工厂方法LocalDate.of()创建任意日期, 该方法需要传入年、月、日做参数,返回对应的LocalDate实例。这个方法的好处是没再犯老API的设计错误,比如年度起始于1900,月份是从0开 始等等。

1
2
3
4
5
6
7
8
9
10
java复制代码package com.shxt.demo02;

import java.time.LocalDate;

public class Demo03 {
public static void main(String[] args) {
LocalDate date = LocalDate.of(2018,2,6);
System.out.println("自定义日期:"+date);
}
}

示例4:Java 8中判断两个日期是否相等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码package com.shxt.demo02;

import java.time.LocalDate;

public class Demo04 {
public static void main(String[] args) {
LocalDate date1 = LocalDate.now();

LocalDate date2 = LocalDate.of(2018,2,5);

if(date1.equals(date2)){
System.out.println("时间相等");
}else{
System.out.println("时间不等");
}

}
}

示例5:Java 8中检查像生日这种周期性事件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码package com.shxt.demo02;

import java.time.LocalDate;
import java.time.MonthDay;

public class Demo05 {
public static void main(String[] args) {
LocalDate date1 = LocalDate.now();

LocalDate date2 = LocalDate.of(2018,2,6);
MonthDay birthday = MonthDay.of(date2.getMonth(),date2.getDayOfMonth());
MonthDay currentMonthDay = MonthDay.from(date1);

if(currentMonthDay.equals(birthday)){
System.out.println("是你的生日");
}else{
System.out.println("你的生日还没有到");
}

}
}

只要当天的日期和生日匹配,无论是哪一年都会打印出祝贺信息。你可以把程序整合进系统时钟,看看生日时是否会受到提醒,或者写一个单元测试来检测代码是否运行正确。

示例6:Java 8中获取当前时间

1
2
3
4
5
6
7
8
9
10
11
java复制代码package com.shxt.demo02;

import java.time.LocalTime;

public class Demo06 {
public static void main(String[] args) {
LocalTime time = LocalTime.now();
System.out.println("获取当前的时间,不含有日期:"+time);

}
}

可以看到当前时间就只包含时间信息,没有日期

示例7:Java 8中获取当前时间

通过增加小时、分、秒来计算将来的时间很常见。Java 8除了不变类型和线程安全的好处之外,还提供了更好的plusHours()方法替换add(),并且是兼容的。注意,这些方法返回一个全新的LocalTime实例,由于其不可变性,返回后一定要用变量赋值。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码package com.shxt.demo02;

import java.time.LocalTime;

public class Demo07 {
public static void main(String[] args) {
LocalTime time = LocalTime.now();
LocalTime newTime = time.plusHours(3);
System.out.println("三个小时后的时间为:"+newTime);

}
}

示例8:Java 8如何计算一周后的日期

和上个例子计算3小时以后的时间类似,这个例子会计算一周后的日期。LocalDate日期不包含时间信息,它的plus()方法用来增加天、周、月,ChronoUnit类声明了这些时间单位。由于LocalDate也是不变类型,返回后一定要用变量赋值。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
java复制代码package com.shxt.demo02;

import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class Demo08 {
public static void main(String[] args) {
LocalDate today = LocalDate.now();
System.out.println("今天的日期为:"+today);
LocalDate nextWeek = today.plus(1, ChronoUnit.WEEKS);
System.out.println("一周后的日期为:"+nextWeek);

}
}

可以看到新日期离当天日期是7天,也就是一周。你可以用同样的方法增加1个月、1年、1小时、1分钟甚至一个世纪,更多选项可以查看Java 8 API中的ChronoUnit类

示例9:Java 8计算一年前或一年后的日期

利用minus()方法计算一年前的日期

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码package com.shxt.demo02;

import java.time.LocalDate;
import java.time.temporal.ChronoUnit;

public class Demo09 {
public static void main(String[] args) {
LocalDate today = LocalDate.now();

LocalDate previousYear = today.minus(1, ChronoUnit.YEARS);
System.out.println("一年前的日期 : " + previousYear);

LocalDate nextYear = today.plus(1, ChronoUnit.YEARS);
System.out.println("一年后的日期:"+nextYear);

}
}

示例10:Java 8的Clock时钟类

Java 8增加了一个Clock时钟类用于获取当时的时间戳,或当前时区下的日期时间信息。以前用到System.currentTimeInMillis()和TimeZone.getDefault()的地方都可用Clock替换。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码package com.shxt.demo02;

import java.time.Clock;

public class Demo10 {
public static void main(String[] args) {
// Returns the current time based on your system clock and set to UTC.
Clock clock = Clock.systemUTC();
System.out.println("Clock : " + clock.millis());

// Returns time based on system clock zone
Clock defaultClock = Clock.systemDefaultZone();
System.out.println("Clock : " + defaultClock.millis());

}
}

示例11:如何用Java判断日期是早于还是晚于另一个日期

另一个工作中常见的操作就是如何判断给定的一个日期是大于某天还是小于某天?在Java 8中,LocalDate类有两类方法isBefore()和isAfter()用于比较日期。调用isBefore()方法时,如果给定日期小于当前日期则返回true。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
java复制代码package com.shxt.demo02;

import java.time.LocalDate;
import java.time.temporal.ChronoUnit;


public class Demo11 {
public static void main(String[] args) {
LocalDate today = LocalDate.now();

LocalDate tomorrow = LocalDate.of(2018,2,6);
if(tomorrow.isAfter(today)){
System.out.println("之后的日期:"+tomorrow);
}

LocalDate yesterday = today.minus(1, ChronoUnit.DAYS);
if(yesterday.isBefore(today)){
System.out.println("之前的日期:"+yesterday);
}
}
}

示例12:Java 8中处理时区

Java 8不仅分离了日期和时间,也把时区分离出来了。现在有一系列单独的类如ZoneId来处理特定时区,ZoneDateTime类来表示某时区下的时间。这在Java 8以前都是 GregorianCalendar类来做的。下面这个例子展示了如何把本时区的时间转换成另一个时区的时间。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码package com.shxt.demo02;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.time.ZonedDateTime;

public class Demo12 {
public static void main(String[] args) {
// Date and time with timezone in Java 8
ZoneId america = ZoneId.of("America/New_York");
LocalDateTime localtDateAndTime = LocalDateTime.now();
ZonedDateTime dateAndTimeInNewYork = ZonedDateTime.of(localtDateAndTime, america );
System.out.println("Current date and time in a particular timezone : " + dateAndTimeInNewYork);
}
}

示例13:如何表示信用卡到期这类固定日期,答案就在YearMonth

与 MonthDay检查重复事件的例子相似,YearMonth是另一个组合类,用于表示信用卡到期日、FD到期日、期货期权到期日等。还可以用这个类得到 当月共有多少天,YearMonth实例的lengthOfMonth()方法可以返回当月的天数,在判断2月有28天还是29天时非常有用。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码package com.shxt.demo02;

import java.time.*;

public class Demo13 {
public static void main(String[] args) {
YearMonth currentYearMonth = YearMonth.now();
System.out.printf("Days in month year %s: %d%n", currentYearMonth, currentYearMonth.lengthOfMonth());
YearMonth creditCardExpiry = YearMonth.of(2019, Month.FEBRUARY);
System.out.printf("Your credit card expires on %s %n", creditCardExpiry);
}
}

示例14:如何在Java 8中检查闰年

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码package com.shxt.demo02;

import java.time.LocalDate;

public class Demo14 {
public static void main(String[] args) {
LocalDate today = LocalDate.now();
if(today.isLeapYear()){
System.out.println("This year is Leap year");
}else {
System.out.println("2018 is not a Leap year");
}

}
}

示例15:计算两个日期之间的天数和月数

有一个常见日期操作是计算两个日期之间的天数、周数或月数。在Java 8中可以用java.time.Period类来做计算。下面这个例子中,我们计算了当天和将来某一天之间的月数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码package com.shxt.demo02;

import java.time.LocalDate;
import java.time.Period;

public class Demo15 {
public static void main(String[] args) {
LocalDate today = LocalDate.now();

LocalDate java8Release = LocalDate.of(2018, 12, 14);

Period periodToNextJavaRelease = Period.between(today, java8Release);
System.out.println("Months left between today and Java 8 release : "
+ periodToNextJavaRelease.getMonths() );


}
}

示例16:在Java 8中获取当前的时间戳

Instant类有一个静态工厂方法now()会返回当前的时间戳,如下所示:

1
2
3
4
5
6
7
8
9
10
java复制代码package com.shxt.demo02;

import java.time.Instant;

public class Demo16 {
public static void main(String[] args) {
Instant timestamp = Instant.now();
System.out.println("What is value of this instant " + timestamp.toEpochMilli());
}
}

时间戳信息里同时包含了日期和时间,这和java.util.Date很像。实际上Instant类确实等同于 Java 8之前的Date类,你可以使用Date类和Instant类各自的转换方法互相转换,例如:Date.from(Instant) 将Instant转换成java.util.Date,Date.toInstant()则是将Date类转换成Instant类。

示例17:Java 8中如何使用预定义的格式化工具去解析或格式化日期

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码package com.shxt.demo02;

import java.time.LocalDate;
import java.time.format.DateTimeFormatter;

public class Demo17 {
public static void main(String[] args) {
String dayAfterTommorrow = "20180205";
LocalDate formatted = LocalDate.parse(dayAfterTommorrow,
DateTimeFormatter.BASIC_ISO_DATE);
System.out.println(dayAfterTommorrow+" 格式化后的日期为: "+formatted);
}
}

示例18:字符串互转日期类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
java复制代码package com.shxt.demo02;

import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class Demo18 {
public static void main(String[] args) {
LocalDateTime date = LocalDateTime.now();

DateTimeFormatter format1 = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
//日期转字符串
String str = date.format(format1);

System.out.println("日期转换为字符串:"+str);

DateTimeFormatter format2 = DateTimeFormatter.ofPattern("yyyy/MM/dd HH:mm:ss");
//字符串转日期
LocalDate date2 = LocalDate.parse(str,format2);
System.out.println("日期类型:"+date2);

}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

服务器推送消息到前端实现页面数据实时刷新-分布式Websoc

发表于 2021-03-09

背景

项目上有个新的需求,需要在系统数据发生改变时,前端页面要实时刷新页面数据。

简单的方案一:

  最简单的方式就是直接在前端页面使用定时器定时刷新数据。

  这个方案除非是定时的时间设置很短,否则还是会存在页面刷新不及时的情况。但是如果定时时间设置得过短,一旦客户端使用量变多,整个系统的请求数量会变的非常多,需要消耗许多服务器资源。故放弃这个方案。

方案二:

  服务端推送的方式,通过使用Websocket,进入页面时,前端就与服务端建立起socket通道,当系统数据发生改变时,在服务端选中需要刷新的页面的socket会话,主动发送消息到前端,通知前端重新请求数据。

  这个方案能达到实时刷新的需求,但考虑到的是客户端数量增长上来,建立的socket太多,会不会对占用太多的服务器资源。然后经过自己开发环境的简单测试,建立几千个socket对服务器资源消耗不大,就暂时决定用这个方案了。最终效果如何还是要 看生产环境的表现的。

实现

前端

前端使用的是vue框架

在methods中添加websocket相关函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
javascript复制代码    data(){
return {
ws:null
}
},

methods: {
websocket() {
//建立socket通道
//process.env.VUE_APP_URL为服务端地址
//code为自定义的参数,主要是用于服务端标识当前会话是哪个用户
this.ws = new WebSocket(
'ws:' +
process.env.VUE_APP_URL +
'/websocket?identity=identity'
);
//socket连接成功后的回调函数
this.ws.onopen = () => {
console.log('websocket连接成功!');
//若项目中没有使用nginx转发请求则忽略这步
//设置定时器,每过55秒传一次数据
//以防长时间不通信被nginx自动关闭会话
//也可以通过修改nginx配置文件延长关闭时间
setInterval(() => {
this.keepAlive(ws);
}, 55000);
};
//接收来自服务端消息的回调函数
//fluseData() 为自定义的数据刷新函数
this.ws.onmessage = evt => {
console.log('已接收来自后台的消息:', evt);
// 刷新数据
this.fluseData();
};
//关闭socket的回调函数
this.ws.onclose = function() {
// 关闭 websocket
console.log('连接已关闭...');
};
// 路由跳转时结束websocket链接
this.$router.afterEach(function() {
this.ws.close();
});
},
//持续向后台发送消息,用于维护socket通道不被nginx关闭
keepAlive(webSocket) {
if (webSocket) {
if (webSocket.readyState == webSocket.OPEN) {
webSocket.send('');
}
}
}
}

在页面加载时调用函数建立socket连接

1
2
3
javascript复制代码mounted() {
this.websocket();
}

在页面关闭或销毁时关闭socket

1
2
3
javascript复制代码beforeDestroy() {
this.ws.close();
},

后端

引入jar包

1
2
3
4
5
java复制代码    <dependency>
<groupId>org.apache.tomcat.embed</groupId>
<artifactId>tomcat-embed-websocket</artifactId>
<version>9.0.38</version>
</dependency>

设置websocket配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
java复制代码
@Configuration
public class MyWebsocketConfig implements ServerApplicationConfig {
@Override
public Set<ServerEndpointConfig> getEndpointConfigs(Set<Class<? extends Endpoint>> set) {
return null;
}

@Override
public Set<Class<?>> getAnnotatedEndpointClasses(Set<Class<?>> set) {
return set;
}


}

设置MyWebsocket

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
java复制代码
@ServerEndpoint(value = "/websocket")
@Component
@Slf4j
public class MyWebsocket {

/**
* 存放每个客户端对应的Session对象。
*/
private static Map<String,Session> socketMap = new ConcurrentHashMap<>();


public MyWebsocket() {
}


/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
String key = getKey(session);
if(key != null){
// 存放 session
socketMap.put(key, session);
sendMessageToUser(key, "后台建立成功!key:"+ key);
log.info("连接成功");
}else{
log.error("socket链接session保存失败!当前key为空");
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 根据session获取key,自定义,用于标示每个session
* @param session
* @return
*/
public String getKey(Session session){
String key = null;
try {
String identity = String.valueOf(session.getRequestParameterMap().get("identity").get(0));
key = identity
} catch (Exception e) {
log.error("根据session获取key失败:",e.getMessage());
e.printStackTrace();
}
return key;
}

/**
* 根据identity获取key
* @return
*/
public String getKey(String identity) {
return identity;
}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
try {
String key = getKey(session);
if(key != null){
socketMap.remove(key);
}
log.info("有一连接关闭 ");
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
String key = getKey(session);
log.info("收到客户端:" + key + "的消息:" + message + " ,当前socket数量:" + socketMap.size());
}


/**
* 发生错误时调用
*/
@OnError
public void onError(Session session, Throwable error) {
onClose(session);
log.error("websocket连接发生错误",e.getMessage());
}

/**
* 发送信息给指定用户
* @param key
* @param message
* @return
*/
public boolean sendMessageToUser(String key) {
if(key == null){
return false;
}
Session session = socketMap.get(key);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.info("发送消息失败----->{}",e.getMessage());
}
return true;
}
}

  由于socket的session是不支持序列化的,所以不能将session存在redis,在线上有多台服务器的情况下就无法共享session。

  所以这里采用redis的消息订阅功能,当有一台服务器监听到系统数据发生变更,需要向前端发送消息时,会向redis发送消息,然后每台服务器的websocket那边会收到redis的 消息,检查自己拥有的session是否有满足相关条件的,若满足条件则向前端发送消息。

redis配置

redis消息监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码@Configuration
public class RedisPublishConfig {
/**
* redis消息监听器容器
* @param connectionFactory
* @param listenerAdapter
* @return
*/
@Bean
RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter) {
RedisMessageListenerContainer container = new RedisMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.addMessageListener(listenerAdapter, new PatternTopic("testChannel"));
return container;
}

/**
* 消息监听器适配器
* @param redisMsg
* @return
*/
@Bean
MessageListenerAdapter listenerAdapter(RedisMsg redisMsg) {
return new MessageListenerAdapter(redisMsg, "receiveMessage");
}
}

redis消息接收接口

1
2
3
4
5
6
7
8
9
java复制代码
@Component
public interface RedisMsg {
/**
* 接收信息
* @param message
*/
public void receiveMessage(String message);
}

在MyWebsocket中继承RedisMsg接口

1
java复制代码public class MyWebsocket implements RedisMsg

在MyWebsocket中实习RedisMsg接口的receiveMessage函数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
java复制代码
/**
* 广播接收信息
* @param message
*/
@Override
public void receiveMessage(String message) {
JSONObject jsonObject = JSONObject.parseObject(message);
String identity = jsonObject.getString("identity");
String news = jsonObject.getString("message");
if(StringUtils.isNotBlank(identity)){
String key = getKey(identity);
sendMessageToUser(key, news);
}else {
socketMap.forEach((k, v) -> {
sendMessageToUser(k, news);
});
}
}

MyWebsocket类的完整代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
java复制代码
@ServerEndpoint(value = "/websocket")
@Component
@Slf4j
public class MyWebsocket implements RedisMsg {
private static CopyOnWriteArraySet<Session> sessionSet=new CopyOnWriteArraySet<>();
// 这里使用静态,让 service 属于类
private static RedisTemplate redisTemplate;

// 注入的时候,给类的 service 注入
@Autowired
public void setRedisTemplate(RedisTemplate redisTemplate) {
MyWebsocket.redisTemplate = redisTemplate;
}


/**
* concurrent包的线程安全Set,用来存放每个客户端对应的Session对象。
*/
private static Map<String,Session> socketMap = new ConcurrentHashMap<>();


public MyWebsocket() {
}


/**
* 连接建立成功调用的方法
*/
@OnOpen
public void onOpen(Session session) {
try {
String key = getKey(session);
if(key != null){
// 存放 session
socketMap.put(key, session);
sendMessageToUser(key, "后台建立成功!key:"+ key + " ,当前socket数量:" + socketMap.size());
log.info("连接成功");
}else{
log.error("socket链接session保存失败!当前key为空");
}
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 根据session获取key
* @param session
* @return
*/
public String getKey(Session session){
String key = null;
try {
String token = String.valueOf(session.getRequestParameterMap().get("token").get(0));
String pageType = String.valueOf(session.getRequestParameterMap().get("pageType").get(0));
Map<String, String> loginContext = SsoStoreManageUtil.getInstance().get(token);
if(loginContext != null){
String userId = loginContext.get("userId");
key = pageType + '_' + userId;
}
} catch (Exception e) {
log.error("根据session获取key失败:",e.getMessage());
e.printStackTrace();
}
return key;
}

/**
* 根据页面类型和用户id获取key
* @param pageType
* @param userId
* @return
*/
public String getKey(String pageType,String userId) {
return pageType + '_' + userId;
}

/**
* 连接关闭调用的方法
*/
@OnClose
public void onClose(Session session) {
try {
String key = getKey(session);
if(key != null){
socketMap.remove(key);
}
log.info("有一连接关闭 ,当前socket数量:" + socketMap.size());
} catch (Exception e) {
e.printStackTrace();
}
}

/**
* 收到客户端消息后调用的方法
*
* @param message 客户端发送过来的消息
*/
@OnMessage
public void onMessage(String message, Session session) {
String key = getKey(session);
log.info("收到客户端:" + key + "的消息:" + message + " ,当前socket数量:" + socketMap.size());
}


/**
* 发生错误时调用
*/
@OnError
public void onError(Session session, Throwable error) {
log.error("发生错误",error.getMessage());
onClose(session);
error.printStackTrace();
}

/**
* 发送信息给指定用户
* @param key
* @param message
* @return
*/
public boolean sendMessageToUser(String key, String message) {
if(key == null){
return false;
}
Session session = socketMap.get(key);
if(session==null) {
return false;
}
if (!session.isOpen()) {
return false;
}
try {
session.getBasicRemote().sendText(message);
} catch (IOException e) {
log.info("发送消息失败----->{}",e.getMessage());
}
return true;
}

/**
* 广播接收信息
* @param message
*/
@Override
public void receiveMessage(String message) {
JSONObject jsonObject = JSONObject.parseObject(message);
String identity = jsonObject.getString("identity");
String news = jsonObject.getString("message");
if(StringUtils.isNotBlank(identity)){
String key = getKey(identity);
sendMessageToUser(key, news);
}else {
socketMap.forEach((k, v) -> {
sendMessageToUser(k, news);
});
}
}
}

业务场景下的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
java复制代码public class Test{

@Autowired
RedisTemplate redisTemplate;


public void webSocketTest() {
JSONObject jsonObject = new JSONObject();
jsonObject.put("identity","");
jsonObject.put("message","");
// 业务逻辑
//广播消息到各个订阅者 testChannel
redisTemplate.convertAndSend("testChannel", jsonObject.toJSONString());

}
}

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

TP6+Swoole4 配置详解

发表于 2021-03-09

配置详解

按照上一篇文章中得步骤安装后会在config目录下增加config\swoole.php配置文档

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
php复制代码use app\webscoket\Manager;
use Swoole\Table;
use think\swoole\websocket\socketio\Parser;

return [
'server' => [
// 默认配置为127.0.0.1 如果不需要用IP+端口访问得话可以不用改
'host' => env('SWOOLE_HOST', '0.0.0.0'), // 监听地址
'port' => env('SWOOLE_PORT', 29999), // 监听端口
'mode' => SWOOLE_PROCESS, // 运行模式 默认为SWOOLE_PROCESS
'sock_type' => SWOOLE_SOCK_TCP, // sock type 默认为SWOOLE_SOCK_TCP
'options' => [
// swoole进程得pid默认配置是在\runtime\swoole.pid
'pid_file' => root_path() . 'swoole.pid',
// swoole运行得日志目录
'log_file' => runtime_path() . 'swoole.log',
// 这个配置会影响swoole启动命令后是否进程守护,关闭命令行后还能继续运行
'daemonize' => true,//是否守护进程
// Normally this value should be 1~4 times larger according to your cpu cores.
'reactor_num' => swoole_cpu_num(),
'worker_num' => swoole_cpu_num(),
'task_worker_num' => swoole_cpu_num(),
'task_enable_coroutine' => true,
'task_max_request' => 2000,//设置 task 进程的最大任务数
'enable_static_handler' => true,
'document_root' => root_path('public'),
'package_max_length' => 20 * 1024 * 1024,
'buffer_output_size' => 10 * 1024 * 1024,
'socket_buffer_size' => 128 * 1024 * 1024,
],
],
//websocket配置区域
'websocket' => [
//是否开启websocket
'enable' => true,
//处理事件类名,这是是根据项目自行写得类,下面也会列出类中得方法和处理机制
'handler' => Manager::class,
//解析类可直接使用TP6内置得类就可以了
'parser' => Parser::class,
'ping_interval' => 25000,//ping频率
'ping_timeout' => 60000,//没有ping后退出毫秒数
//下面是一些房间得配置这里会自动创建一个高性能内存数据库
'room' => [
//房间类型 可切换为redis
'type' => 'table',
'table' => [
'room_rows' => 4096,
'room_size' => 2048,
'client_rows' => 8192,
'client_size' => 2048,
],
'redis' => [
'host' => '127.0.0.1',
'port' => 6379,
'max_active' => 3,
'max_wait_time' => 5,
],
],
//socket监听得事件也可以在这里配置,也可以在app\event.php内配置
'listen' => [],
'subscribe' => [],
],
//远程过程调用,它是一种通过网络从远程计算机程序上请求服务,而不需要了解底层网络技术的思想
//做微服务使用项目中没有使用就不过多说
'rpc' => [
'server' => [
'enable' => false,
'port' => 9000,
'services' => [ ],
],
'client' => [ ],
],
//热更新配置
'hot_update' => [
//是否开启热更新
'enable' => env('APP_DEBUG', false),
//监听文件得类型 例如:*.html / *.js 都是可以得,但这个配置已经够用了不需要再调整
'name' => ['*.php'],
//监听的目录 目前监听得目录有:app\ crmeb\
'include' => [app_path(), root_path('crmeb')],
//排除的目录
'exclude' => [],
], //连接池
'pool' => [
//数据库连接池默认是开启的,在使用Db或者Model中不需要配置什么就自带连接池
'db' => [
'enable' => true,
'max_active' => 3,
'max_wait_time' => 5,
],
//缓存连接池 使用cache方式和之前一模一样没有任何的区别
'cache' => [
'enable' => true,
'max_active' => 3,
'max_wait_time' => 5,
],
//自定义连接池
],
//内存数据库 字段可自行创建 数据库会在swoole启动后自行创建
'tables' => [
//高性能内存数据库
'user' => [
'size' => 2048,
'columns' => [
['name' => 'fd', 'type' => Table::TYPE_INT],
['name' => 'type', 'type' => Table::TYPE_INT],
['name' => 'uid', 'type' => Table::TYPE_INT],
['name' => 'to_uid', 'type' => Table::TYPE_INT],
['name' => 'tourist', 'type' => Table::TYPE_INT]
]
]
], //还有其他配置不做解释,个人用的较少,有需求查阅swoole4的开发文档
];

重要配置讲解

里面有诸多得配置下面会把需要注意得几个地方详细讲解

端口和监听地址

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
dart复制代码return [
'server' => [
// 默认配置为127.0.0.1 如果不需要用IP+端口访问得话可以不用改
'host' => env('SWOOLE_HOST', '0.0.0.0'), // 监听地址
'port' => env('SWOOLE_PORT', 29999), // 监听端口
'mode' => SWOOLE_PROCESS, // 运行模式 默认为SWOOLE_PROCESS
'sock_type' => SWOOLE_SOCK_TCP, // sock type 默认为SWOOLE_SOCK_TCP
'options' => [
// swoole进程得pid默认配置是在\runtime\swoole.pid
'pid_file' => root_path() . 'swoole.pid',
// swoole运行得日志目录
'log_file' => runtime_path() . 'swoole.log',
// 这个配置会影响swoole启动命令后是否进程守护,关闭命令行后还能继续运行
'daemonize' => true,//是否守护进程
],
],
];

server.host默认配置为127.0.0.1,需要外网访问调试的使用这里要监听0.0.0.0

可以看到需要我们使用服务器ip+端口号进行访问,注意这样访问需要开启端口

热更新

1
2
3
4
5
6
7
8
9
10
11
12
dart复制代码return [
//热更新配置 'hot_update' => [
//是否开启热更新
'enable' => env('APP_DEBUG', false),
//监听文件得类型 例如:*.html / *.js 都是可以得,但这个配置已经够用了不需要再调整
'name' => ['*.php'],
//监听的目录 目前监听得目录有:app\ crmeb\
'include' => [app_path(), root_path('crmeb')],
//排除的目录
'exclude' => [],
],
]

主要使用再开发阶段时使用,不用频繁的手动执行重启命令,建议再生产模式下关闭debug运行

内存数据库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
dart复制代码return [
//内存数据库 字段可自行创建 数据库会在swoole启动后自行创建 'tables' => [
//高性能内存数据库
'user' => [
'size' => 2048,
'columns' => [
['name' => 'fd', 'type' => Table::TYPE_INT],
['name' => 'type', 'type' => Table::TYPE_INT],
['name' => 'uid', 'type' => Table::TYPE_INT],
['name' => 'to_uid', 'type' => Table::TYPE_INT],
['name' => 'tourist', 'type' => Table::TYPE_INT]
]
]
],
];

先来看下官方的讲解:

由于 PHP 语言不支持多线程,因此 Swoole 使用多进程模式,在多进程模式下存在进程内存隔离,在工作进程内修改 global 全局变量和超全局变量时,在其他进程是无效的。

优势:

    • 性能强悍,单线程每秒可读写 200 万次;
      • 应用代码无需加锁,Table 内置行锁自旋锁,所有操作均是多线程 / 多进程安全。用户层完全不需要考虑数据同步问题;
      • 支持多进程,Table 可以用于多进程之间共享数据;
      • 使用行锁,而不是全局锁,仅当 2 个进程在同一 CPU 时间,并发读取同一条数据才会进行发生抢锁。

单看第一条就觉得牛*

配置可参考上述配置

使用:

1
2
3
4
rust复制代码use use think\swoole\Table;
use Swoole\Table as SwooleTable;
/** @var SwooleTable $table */
$table = app()->make(Table::class)->get('user');

返回的$table就可以使用swoole\Table的方法了,详细使用文档可参考:wiki.swoole.com/#/memory/ta…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

如何优雅的实现 Spring Boot 接口参数加密解密?

发表于 2021-03-09

因为有小伙伴刚好问到这个问题,松哥就抽空撸一篇文章和大家聊聊这个话题。

加密解密本身并不是难事,问题是在何时去处理?定义一个过滤器,将请求和响应分别拦截下来进行处理也是一个办法,这种方式虽然粗暴,但是灵活,因为可以拿到一手的请求参数和响应数据。不过 SpringMVC 中给我们提供了 ResponseBodyAdvice 和 RequestBodyAdvice,利用这两个工具可以对请求和响应进行预处理,非常方便。

所以今天这篇文章有两个目的:

  • 分享参数/响应加解密的思路。
  • 分享 ResponseBodyAdvice 和 RequestBodyAdvice 的用法。

好了,那么接下来就不废话了,我们一起来看下。

1.开发加解密 starter

为了让我们开发的这个工具更加通用,也为了复习一下自定义 Spring Boot Starter,这里我们就将这个工具做成一个 stater,以后在 Spring Boot 项目中直接引用就可以。

首先我们创建一个 Spring Boot 项目,引入 spring-boot-starter-web 依赖:

1
2
3
4
5
6
java复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
<scope>provided</scope>
<version>2.4.3</version>
</dependency>

因为我们这个工具是为 Web 项目开发的,以后必然使用在 Web 环境中,所以这里添加依赖时 scope 设置为 provided。

依赖添加完成后,我们先来定义一个加密工具类备用,加密这块有多种方案可以选择,对称加密、非对称加密,其中对称加密又可以使用 AES、DES、3DES 等不同算法,这里我们使用 Java 自带的 Cipher 来实现对称加密,使用 AES 算法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
java复制代码public class AESUtils {

private static final String AES_ALGORITHM = "AES/ECB/PKCS5Padding";

// 获取 cipher
private static Cipher getCipher(byte[] key, int model) throws Exception {
SecretKeySpec secretKeySpec = new SecretKeySpec(key, "AES");
Cipher cipher = Cipher.getInstance(AES_ALGORITHM);
cipher.init(model, secretKeySpec);
return cipher;
}

// AES加密
public static String encrypt(byte[] data, byte[] key) throws Exception {
Cipher cipher = getCipher(key, Cipher.ENCRYPT_MODE);
return Base64.getEncoder().encodeToString(cipher.doFinal(data));
}

// AES解密
public static byte[] decrypt(byte[] data, byte[] key) throws Exception {
Cipher cipher = getCipher(key, Cipher.DECRYPT_MODE);
return cipher.doFinal(Base64.getDecoder().decode(data));
}
}

这个工具类比较简单,不需要多解释。需要说明的是,加密后的数据可能不具备可读性,因此我们一般需要对加密后的数据再使用 Base64 算法进行编码,获取可读字符串。换言之,上面的 AES 加密方法的返回值是一个 Base64 编码之后的字符串,AES 解密方法的参数也是一个 Base64 编码之后的字符串,先对该字符串进行解码,然后再解密。

接下来我们封装一个响应工具类备用,这个大家如果经常看松哥视频已经很了解了:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
java复制代码public class RespBean {
private Integer status;
private String msg;
private Object obj;

public static RespBean build() {
return new RespBean();
}

public static RespBean ok(String msg) {
return new RespBean(200, msg, null);
}

public static RespBean ok(String msg, Object obj) {
return new RespBean(200, msg, obj);
}

public static RespBean error(String msg) {
return new RespBean(500, msg, null);
}

public static RespBean error(String msg, Object obj) {
return new RespBean(500, msg, obj);
}

private RespBean() {
}

private RespBean(Integer status, String msg, Object obj) {
this.status = status;
this.msg = msg;
this.obj = obj;
}

public Integer getStatus() {
return status;
}

public RespBean setStatus(Integer status) {
this.status = status;
return this;
}

public String getMsg() {
return msg;
}

public RespBean setMsg(String msg) {
this.msg = msg;
return this;
}

public Object getObj() {
return obj;
}

public RespBean setObj(Object obj) {
this.obj = obj;
return this;
}
}

接下来我们定义两个注解 @Decrypt 和 @Encrypt:

1
2
3
4
5
6
7
8
java复制代码@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD,ElementType.PARAMETER})
public @interface Decrypt {
}
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
public @interface Encrypt {
}

这两个注解就是两个标记,在以后使用的过程中,哪个接口方法添加了 @Encrypt 注解就对哪个接口的数据加密返回,哪个接口/参数添加了 @Decrypt 注解就对哪个接口/参数进行解密。这个定义也比较简单,没啥好说的,需要注意的是 @Decrypt 比 @Encrypt 多了一个使用场景就是 @Decrypt 可以用在参数上。

考虑到用户可能会自己配置加密的 key,因此我们再来定义一个 EncryptProperties 类来读取用户配置的 key:

1
2
3
4
5
6
7
8
9
10
11
12
13
java复制代码@ConfigurationProperties(prefix = "spring.encrypt")
public class EncryptProperties {
private final static String DEFAULT_KEY = "www.itboyhub.com";
private String key = DEFAULT_KEY;

public String getKey() {
return key;
}

public void setKey(String key) {
this.key = key;
}
}

这里我设置了默认的 key 是 www.itboyhub.com,key 是 16 位字符串,松哥这个网站地址刚好满足。以后如果用户想自己配置 key,只需要在 application.properties 中配置 spring.encrypt.key=xxx 即可。

所有准备工作做完了,接下来就该正式加解密了。

因为松哥这篇文章一个很重要的目的是想和大家分享 ResponseBodyAdvice 和 RequestBodyAdvice 的用法,RequestBodyAdvice 在做解密的时候倒是没啥问题,而 ResponseBodyAdvice 在做加密的时候则会有一些局限,不过影响不大,还是我前面说的,如果想非常灵活的掌控一切,那还是自定义过滤器吧。这里我就先用这两个工具来实现了。

另外还有一点需要注意,ResponseBodyAdvice 在你使用了 @ResponseBody 注解的时候才会生效,RequestBodyAdvice 在你使用了 @RequestBody 注解的时候才会生效,换言之,前后端都是 JSON 交互的时候,这两个才有用。不过一般来说接口加解密的场景也都是前后端分离的时候才可能有的事。

先来看接口加密:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
java复制代码@EnableConfigurationProperties(EncryptProperties.class)
@ControllerAdvice
public class EncryptResponse implements ResponseBodyAdvice<RespBean> {
private ObjectMapper om = new ObjectMapper();
@Autowired
EncryptProperties encryptProperties;
@Override
public boolean supports(MethodParameter returnType, Class<? extends HttpMessageConverter<?>> converterType) {
return returnType.hasMethodAnnotation(Encrypt.class);
}

@Override
public RespBean beforeBodyWrite(RespBean body, MethodParameter returnType, MediaType selectedContentType, Class<? extends HttpMessageConverter<?>> selectedConverterType, ServerHttpRequest request, ServerHttpResponse response) {
byte[] keyBytes = encryptProperties.getKey().getBytes();
try {
if (body.getMsg()!=null) {
body.setMsg(AESUtils.encrypt(body.getMsg().getBytes(),keyBytes));
}
if (body.getObj() != null) {
body.setObj(AESUtils.encrypt(om.writeValueAsBytes(body.getObj()), keyBytes));
}
} catch (Exception e) {
e.printStackTrace();
}
return body;
}
}

我们自定义 EncryptResponse 类实现 ResponseBodyAdvice 接口,泛型表示接口的返回类型,这里一共要实现两个方法:

  1. supports:这个方法用来判断什么样的接口需要加密,参数 returnType 表示返回类型,我们这里的判断逻辑就是方法是否含有 @Encrypt 注解,如果有,表示该接口需要加密处理,如果没有,表示该接口不需要加密处理。
  2. beforeBodyWrite:这个方法会在数据响应之前执行,也就是我们先对响应数据进行二次处理,处理完成后,才会转成 json 返回。我们这里的处理方式很简单,RespBean 中的 status 是状态码就不用加密了,另外两个字段重新加密后重新设置值即可。
  3. 另外需要注意,自定义的 ResponseBodyAdvice 需要用 @ControllerAdvice 注解来标记。

再来看接口解密:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码@EnableConfigurationProperties(EncryptProperties.class)
@ControllerAdvice
public class DecryptRequest extends RequestBodyAdviceAdapter {
@Autowired
EncryptProperties encryptProperties;
@Override
public boolean supports(MethodParameter methodParameter, Type targetType, Class<? extends HttpMessageConverter<?>> converterType) {
return methodParameter.hasMethodAnnotation(Decrypt.class) || methodParameter.hasParameterAnnotation(Decrypt.class);
}

@Override
public HttpInputMessage beforeBodyRead(final HttpInputMessage inputMessage, MethodParameter parameter, Type targetType, Class<? extends HttpMessageConverter<?>> converterType) throws IOException {
byte[] body = new byte[inputMessage.getBody().available()];
inputMessage.getBody().read(body);
try {
byte[] decrypt = AESUtils.decrypt(body, encryptProperties.getKey().getBytes());
final ByteArrayInputStream bais = new ByteArrayInputStream(decrypt);
return new HttpInputMessage() {
@Override
public InputStream getBody() throws IOException {
return bais;
}

@Override
public HttpHeaders getHeaders() {
return inputMessage.getHeaders();
}
};
} catch (Exception e) {
e.printStackTrace();
}
return super.beforeBodyRead(inputMessage, parameter, targetType, converterType);
}
}
  1. 首先大家注意,DecryptRequest 类我们没有直接实现 RequestBodyAdvice 接口,而是继承自 RequestBodyAdviceAdapter 类,该类是 RequestBodyAdvice 接口的子类,并且实现了接口中的一些方法,这样当我们继承自 RequestBodyAdviceAdapter 时,就只需要根据自己实际需求实现某几个方法即可。
  2. supports:该方法用来判断哪些接口需要处理接口解密,我们这里的判断逻辑是方法上或者参数上含有 @Decrypt 注解的接口,处理解密问题。
  3. beforeBodyRead:这个方法会在参数转换成具体的对象之前执行,我们先从流中加载到数据,然后对数据进行解密,解密完成后再重新构造 HttpInputMessage 对象返回。

接下来,我们再来定义一个自动化配置类,如下:

1
2
3
4
5
java复制代码@Configuration
@ComponentScan("org.javaboy.encrypt.starter")
public class EncryptAutoConfiguration {

}

这个也没啥好说的,比较简单。

最后,resources 目录下定义 META-INF,然后再定义 spring.factories 文件,内容如下:

1
xml复制代码org.springframework.boot.autoconfigure.EnableAutoConfiguration=org.javaboy.encrypt.starter.autoconfig.EncryptAutoConfiguration

这样当项目启动时,就会自动加载该配置类。

至此,我们的 starter 就开发完成啦。

2.打包发布

我们可以将项目安装到本地仓库,也可以发布到线上供他人使用。

2.1 安装到本地仓库

安装到本地仓库比较简单,直接 mvn install,或者在 IDEA 中,点击右边的 Maven,然后双击 install,如下:

2.2 发布到线上

发不到线上我们可以使用 JitPack 来做。

首先我们在 GitHub 上创建一个仓库,将我们的代码上传上去,这个过程应该不用我多说吧。

上传成功后,点击右边的 Create a new release 按钮,发布一个正式版,如下:

发布成功后,打开 jitpack,输入仓库的完整路径,点击 lookup 按钮,查找到之后,再点击 Get it 按钮完成构建,如下:

构建成功后,JitPack 上会给出项目引用方式:

注意引用时将 tag 改成你具体的版本号。

至此,我们的工具就已经成功发布了!小伙伴们可以通过如下方式引用这个 starter:

1
2
3
4
5
6
7
8
9
10
11
12
13
xml复制代码<dependencies>
<dependency>
<groupId>com.github.lenve</groupId>
<artifactId>encrypt-spring-boot-starter</artifactId>
<version>0.0.3</version>
</dependency>
</dependencies>
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>

3.应用

我们创建一个普通的 Spring Boot 项目,引入 web 依赖,再引入我们刚刚的 starter 依赖,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.github.lenve</groupId>
<artifactId>encrypt-spring-boot-starter</artifactId>
<version>0.0.3</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<repositories>
<repository>
<id>jitpack.io</id>
<url>https://jitpack.io</url>
</repository>
</repositories>

然后再创建一个实体类备用:

1
2
3
4
5
java复制代码public class User {
private Long id;
private String username;
//省略 getter/setter
}

创建两个测试接口:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码@RestController
public class HelloController {
@GetMapping("/user")
@Encrypt
public RespBean getUser() {
User user = new User();
user.setId((long) 99);
user.setUsername("javaboy");
return RespBean.ok("ok", user);
}

@PostMapping("/user")
public RespBean addUser(@RequestBody @Decrypt User user) {
System.out.println("user = " + user);
return RespBean.ok("ok", user);
}
}

第一个接口使用了 @Encrypt 注解,所以会对该接口的数据进行加密(如果不使用该注解就不加密),第二个接口使用了 @Decrypt 所以会对上传的参数进行解密,注意 @Decrypt 注解既可以放在方法上也可以放在参数上。

接下来启动项目进行测试。

首先测试 get 请求接口:

可以看到,返回的数据已经加密。

再来测试 post 请求:

可以看到,参数中的加密数据已经被还原了。

如果用户想要修改加密密钥,可以在 application.properties 中添加如下配置:

1
properties复制代码spring.encrypt.key=1234567890123456

加密数据到了前端,前端也有一些 js 工具来处理加密数据,这个松哥后面有空再和大家说说 js 的加解密。

4.小结

好啦,今天这篇文章主要是想和大家聊聊 ResponseBodyAdvice 和 RequestBodyAdvice 的用法,一些加密思路,当然 ResponseBodyAdvice 和 RequestBodyAdvice 还有很多其他的使用场景,小伙伴们可以自行探索~本文使用了对称加密中的 AES 算法,大家也可以尝试改成非对称加密。

好啦,今天就聊这么多,小伙伴们可以去试试啦~公号后台回复 20210309 可以下载本文案例~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【PyCharm中文教程 11】开启护眼模式

发表于 2021-03-09

眼睛对着电子产品,容易干涩,对于程序员这种长期以电脑为伍的人群,应该更加注重眼睛的保护。

有些小白领为了保护自己的眼睛,他们通常会将一些办公软件(比如 word、excel,还有资源管理器)的背景色都设置为 护眼色,跟我们所说的原谅色差不多 hhh。

那在 PyCharm 中有没有办法也这样子设置呢?

当然有啦

我用一张图,就能给你说清楚,设置方法如下:

设置了护眼色,会牺牲 PyCharm 的顔值,不再是那个酷炫的极客风了,这就需要你从中取一个取舍。


文章最后给大家介绍三个我自己写的在线文档:

第一个文档:PyCharm 中文指南 1.0 文档

花了两个多月的时间,整理了 100 个 PyCharm 的使用技巧,为了让新手能够直接上手,我花了很多的时间录制了上百张 GIF 动图,有兴趣的前往在线文档阅读。

第二个文档:PyCharm 黑魔法指南 1.0 文档

系统收录各种 Python 冷门知识,Python Shell 的多样玩法,令人疯狂的 Python 炫技操作,Python 的超详细进阶知识解读,非常实用的 Python 开发技巧等。

第三个文档:Python 中文指南 1.0 文档

花了三个月时间写的一本 适合零基础入门 Python 的全中文教程,搭配大量的代码案例,让初学者对 代码的运作效果有一个直观感受,教程既有深度又有广度,每篇文章都会标内容的难度,是基础还是进阶的,可供读者进行选择,是一本难得的 Python 中文电子教程。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

java 递归详解

发表于 2021-03-09

刚学java的时候通常难以理解递归程序设计的概念。递归思想之所以困难,原因在于它非常像是循环推理(circular reasoning)。它也不是一个直观的过程;当我们指挥别人做事的时候,我们极少会递归地指挥他们。

对刚开始接触计算机编程的人而言,这里有递归的一个简单定义:当函数直接或者间接调用自己时,则发生了递归。

递归是一种常见的解决问题的方法,寄把问题逐渐简单化。递归的基本思想就是

“自己调用自己”,一个使用递归技术的方法会直接或间接的调用自己

递归构造包括两个部分:

定义递归头。什么时候不调用自身方法,如果没有头,将陷入死循环

递归体。什么时候需要调用自身方法

其实递归算法很简单,简单点就是自己调用自己的方法,有条件判断什么时候停止!

递归的经典示例

计算阶乘是递归程序设计的一个经典示例。计算某个数的阶乘就是用那个数去乘包括 1 在内的所有比它小的数。例如,factorial(5) 等价于5*4*3*2*1,而 factorial(3) 等价于 3*2*1。

阶乘的一个有趣特性是,某个数的阶乘等于起始数(starting number)乘以比它小一的数的阶乘。例如,factorial(5) 与 5 * factorial(4) 相同。您很可能会像这样编写阶乘函数:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
html复制代码package cn.itcast.heima2;

/**
* 测试方法
* @author lyy
*/
public class TraditionalThread {

public static long factorial(int n){
if(n == 1){//递归头
return 1;
}else{//递归体
return n*factorial(n-1);
}
}

public static void main(String[] args) {
long num = factorial(10);
System.out.println(num);
}
}

示例二:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
html复制代码package cn.itcast.heima2;

/**
* 测试方法
* @author lyy
*/
public class TraditionalThread {

private static int a = 0;

public static void test01(){
a++;
System.out.println("test——"+a);
if(a <= 10){//递归体
test01();
}else{//递归头
System.out.println("over!");
}
}

public static void test02(){
System.out.println("test02");
}



public static void main(String[] args) {
test01();
}
}

好了基本上递归算法就是以上的两个方法,不喜勿喷!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…708709710…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%