开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

老友(研发岗)被裁后,想加盟小吃店,我用Python采集了一

发表于 2021-11-21

「这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战」

朋友公司不景气,被裁员了,为了安慰他,我让擦哥爬取了一些小吃加盟项目。

阅读本篇博客你将收获

  • Python 技术的提升;
  • requests 库和 lxml 库的熟悉度增加;
  • 小吃加盟数据,没准用得上。

3158 网加盟小吃数据抓取

目标数据源分析

本次的目标数据源为 3158 加盟网,没写文章前,橡皮擦就没想到还真有专门宣传加盟的网站!

目标数据源地址如下:

1
txt复制代码https://www.3158.cn/xiangmu/canyin/

程序员被裁后,想加盟一个小吃店,我用Python采集了一点数据,多少给他一丝安慰

本篇博客将要涉及的知识点

  1. requests 抓取网页数据;
  2. xpath 熟悉度练习,lxml 格式化提取
  3. csv 文件存储,3 行代码版

数据来源分析

目标数据分页格式如下,简单的规则,并且总页数可直接在网页看到,省去了判断数据是否爬取完毕的逻辑。

1
2
3
4
5
txt复制代码https://www.3158.cn/xiangmu/canyin/?pt=all&page=1
https://www.3158.cn/xiangmu/canyin/?pt=all&page=2
https://www.3158.cn/xiangmu/canyin/?pt=all&page=3

https://www.3158.cn/xiangmu/canyin/?pt=all&page=n

静态网页直接使用 requests 抓取即可,本案例的重点依旧在 lxml 提取,为了防止爬取过程中断,可以先将网页数据批量存储到本地。

抓取代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
python复制代码import requests
from lxml import etree
import time
import re
import random

USER_AGENTS = [
"Mozilla/4.0 (compatible; MSIE 6.0; Windows NT 5.1; SV1; AcooBrowser; .NET CLR 1.1.4322; .NET CLR 2.0.50727)",
"其它 UserAgent 自己选择"
]


def run(url, index):
try:
headers = {"User-Agent": random.choice(USER_AGENTS)}
res = requests.get(url=url, headers=headers)
res.encoding = "utf-8"
html = res.text
with open(f"./html/{index}.html", "w+", encoding="utf-8") as f:
f.write(html)
except Exception as e:
print(e)


if __name__ == '__main__':

for i in range(1, 130):
print(f"正在爬取第{i}页数据")
run(f"https://www.3158.cn/xiangmu/canyin/?pt=all&page={i}", i)

print("全部爬取完毕")

运行上述代码前,需要在代码目录创建 html 文件夹,用于存储静态网页,最终该文件夹下形成下图内容,表示页面抓取完毕。

程序员被裁后,想加盟小吃店,我用Python采集了一点数据,多少有个帮助

数据提取时间

本次提取依旧使用 lxml 库,更多内容可在官方站点持续学习,地址为 https://lxml.de/tutorial.html 。

在上一步骤中,已经将网页保存到本地,后续围绕这 129 个HTML文件处理即可。

打开上图中任意 HTML 页面,出现如下效果,对数据对比之后发现,页面布局存在下图所示的差异。

数据提取完毕,才发现,其实可以忽略这个差异。

老友(程序员)被裁后,想加盟小吃店,我用Python采集了一点数据,多少是个心意
基于上图对数据规则进行设置,目标数据包含:

  • 加盟店名称
  • 投资金额
  • 所在地
  • 行业
  • 标签(如数据存在)
  • 详情页地址

为了后续爬取方面,将详情页地址也进行提取,以上内容就是目标数据最终的格式。

在数据源存在差异的情况下,直接检查 HTML 源代码,寻找页面出现差异性的原因。

li 标签存在差异化,对其进行特殊处理即可。

老友(程序员)被裁后,想加盟小吃店,我用Python采集了一点数据,多少是个心意
提取代码如下所示,相关说明已经添加在注释

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
python复制代码import requests
from lxml import etree
import time
import re
import random

# 列表转换为字符串
def list_str(my_list):
return ",".join(my_list)


def get_data():
for i in range(1, 130):
with open(f"./html/{i}.html", "r", encoding="utf-8") as f:
html = f.read()
element = etree.HTML(html)
# contains 函数 获取包含 xxx 的元素,类似的还有 starts-with,ends-with,last
origin_li = element.xpath("//ul[contains(@class,'xm-list')]/li")
# 循环抓取 li 内部数据
for item in origin_li:

# 提取加盟名称
# title = item.xpath(".//div[@class='r']/h4/text()")[0]
title = item.xpath("./div[@class='top']/a/@title")[0]
# 提取超链接
detail_link = "http://" + item.xpath("./div[@class='top']/a/@href")[0]

# 提取特殊标签
special_tag = list_str(item.xpath("./@class"))
# 当包含特殊标签 xm-list2 时,使用不同的提取规则

if special_tag != "xm-list2":
# 提取标签
tags = list_str(item.xpath(".//div[@class='bot']/span[@class='label']/text()"))
# 提取投资价格
price = list_str(item.xpath(".//div[@class='bot']/span[@class='money']/b/text()"))
# 地址和行业
city_industry = list_str(item.xpath("./div[@class='bot']/p/span/text()"))

long_str = f"{title},{detail_link}, {tags}, {price}, {city_industry}"
save(long_str)
else:
# 地址和行业
city_industry = list_str(item.xpath(
"./div[@class='top']/a/div/p[2]/span/text()"))
long_str = f"{title},{detail_link}, {0}, {0}, {city_industry}"
save(long_str)


def save(long_str):
try:
with open(f"./jiameng.csv", "a+", encoding="utf-8") as f:
f.write("\n"+long_str)
except Exception as e:
print(e)


if __name__ == '__main__':

# for i in range(1, 130):
# print(f"正在爬取第{i}页数据")
# run(f"https://www.3158.cn/xiangmu/canyin/?pt=all&page={i}", i)

get_data()

print("全部提取完毕")

上述代码首先通过 element.xpath("//ul[contains(@class,'xm-list')]/li") 提取 HTML 中的 li 标签,然后遍历提取的 li 数据,进行内部提取。

提取过程中,发现 title 与 detail_link,即标题与详情页的提取代码一致,其它数据通过判断 li 标签的 class 否含有 xm-list2 进行判断,整体代码如上所示。

在使用 lxml 的过程中,最常用的为路径表达式,即 // 从根目录提取, .// 从当前节点的根目录提取,./ 从当前节点提取。

代码中还用到了 contains 函数,在本案例中该函数用于判断属性值中是否包含某个字符串,例如上文代码中提取 ul 标签,正是此函数的应用,在提取所有 li的过程中,需要提前匹配 ul,该标签在 HTML 中属性如下所示,使用 contains 可以进行部分匹配。

老友(研发岗)被裁后,想加盟小吃店,我用Python采集了一点数据,多少是个心意
在代码后半部分,存在一个 XPath 匹配规则 city_industry = list_str(item.xpath("./div[@class='top']/a/div/p[2]/span/text()")) ,注意其中出现的 p[2] ,该代码表示选取第二个 p 标签。

评论时间

==来都来了,评论区不发了评论吗?==

今天是持续写作的第 184 / 200 天。
可以关注我,点赞我、评论我、收藏我啦。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ADG单实例系列搭建之(Data Guard Broker)

发表于 2021-11-21

「这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战」

环境准备

主机名 ip DB Version db_name db_unique_name
主库 orcl 192.168.56.120 11.2.0.4 orcl orcl
备库 orcl_stby 192.168.56.121 11.2.0.4 orcl orcl_stby

Notes:

1、db_unique_name主备库不能相同。

2、db_name主备库需保持一致。

3、主备库DB版本需保持一致。

提前搭建好ADG,请参考:

ADG单实例系列搭建之(RMAN备份恢复)

ADG单实例搭建系列之(Active Database Duplicate Using Image Copies)

ADG单实例搭建系列之 (DBCA)

一、Enable Broker

1
2
3
sql复制代码--both databases (primary and standby)

ALTER SYSTEM SET dg_broker_start=true;

二、Register server with the broker

1
2
3
4
5
6
7
8
9
10
11
12
bash复制代码dgmgrl sys/oracle@orcl

##pri是configuration名称,随意
##第一个orcl是db_unique_name
##第二个orcl是TNSNAME
CREATE CONFIGURATION pri AS PRIMARY DATABASE IS orcl CONNECT IDENTIFIER IS orcl;

##第一个orcl_stby是db_unique_name
##第二个orcl_stby是TNSNAME
ADD DATABASE orcl_stby AS CONNECT IDENTIFIER IS orcl_stby MAINTAINED AS PHYSICAL;

ENABLE CONFIGURATION;

三、Check the configuration and database

1
2
3
4
5
bash复制代码SHOW CONFIGURATION;

SHOW DATABASE orcl;

SHOW DATABASE orcl_stby;

四、Database Switchover

Configure Listener Both Primary and Standby

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
bash复制代码vi $TNS_AMDIN/listener.ora

##GLOBAL_DBNAME是db_unique_name+DGMGRL
##primary add

(SID_DESC =
(GLOBAL_DBNAME = orcl_DGMGRL)
(ORACLE_HOME = /u01/app/oracle/product/11.2.0/db)
(SID_NAME = orcl)
)

##standby add

(SID_DESC =
(GLOBAL_DBNAME = orcl_stby_DGMGRL)
(ORACLE_HOME = /u01/app/oracle/product/11.2.0/db)
(SID_NAME = orcl)
)

Notes:如果listener.ora不配置DGMGRL静态监听,SWITCHOVER将报错:ORA-12514。

1
bash复制代码SWITCHOVER TO orcl_stby;

主切备:

备切主:

五、Database Failover

主库提前开启FLASHBACK,Failover后还可以切回备库,防止主库废掉。

1
2
3
sql复制代码ALTER DATABASE FLASHBACK ON;

ALTER SYSTEM SET db_recovery_file_dest_size=5G;

1
2
bash复制代码##备库DGMGRL操作
FAILOVER TO orcl_stby IMMEDIATE;

原主库切换为备库:

1
2
bash复制代码##备库DGMGRL操作
REINSTATE DATABASE orcl;

六、Snapshot Standby

Snapshot standby database是ORACLE 11g的新特性。允许Physical standby短时间的使用read write模式。***必须是ADG才支持。***

**注意:**一旦snapshot standby被激活的时间超出了primary 的最大负载时间,再次的本地更新操作将会产生额外的异常。

1
2
3
bash复制代码##主库DGMGRL操作

CONVERT DATABASE orcl_stby TO SNAPSHOT STANDBY;

切回物理备库:

1
2
3
bash复制代码##主库DGMGRL执行

CONVERT DATABASE orcl_stby TO PHYSICAL STANDBY;

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

吐血推荐-详解分布式锁(上)

发表于 2021-11-21

「这是我参与11月更文挑战的第6天,活动详情查看:2021最后一次更文挑战」

分布式锁的简介

锁 是一种用来解决多个执行线程 访问共享资源 错误或数据不一致问题的工具。

锁的本质:同一时间只允许一个用户操作共享数据。

为什么需要分布式锁

一般情况下,我们使用分布式锁主要有两个场景:

  1. 避免不同节点重复相同的工作:比如用户执行了某个操作需要输入验证码不同节点没有相互通信会发送多条验证码;
  2. 避免破坏数据的正确性:如果两个节点在同一条数据上同时进行操作,可能会造成数据错误或不一致的情况出现;

Java 中实现分布式锁的常见方式

  1. 基于 MySQL 中的锁:MySQL 本身有自带的悲观锁 for update 关键字,也可以自己实现悲观/乐观锁来达到目的;
  2. 基于 Zookeeper 有序节点:Zookeeper 允许临时创建有序的子节点,这样客户端获取节点列表时,就能够根据当前子节点列表中的序号来判断是否能够获得锁;
  3. 基于 Redis 的单线程:由于 Redis 是单线程,所以命令会以串行的方式执行,并且本身提供了像 SETNX(set if not exists) 这样的指令,并且还扩展了 SET 命令;

每个方案都有各自的优缺点,例如 MySQL 虽然直观理解容易,但是实现起来却需要额外考虑 锁超时、加事务 等问题,并且性能局限于数据库;这边我们以 Redis 为主进行分析。

分布式锁应该具备的特性

  1. 原子性
  2. 互斥性
  3. 独占性:自己的锁只能自己解开
  4. 可重入性
  5. 超时与续期

分布式锁特性场景解析

  1. 超时场景说明:

假如有两个服务 A 和 B,其中服务 A 在 获取锁之后 由于不可抗力因素宕机了(例如:机房停电),因为锁被服务 A 持有,就会导致 B 服务就永远无法获取到锁了,这样显然是不合理的,所以我们需要额外设置一个超时时间,来保证避免这种情况的发生。
2. 独占性场景的说明

延续上面的场景,我们在考虑这种场景,如果在加锁和释放锁之间的逻辑比较复杂,执行时间较长,以至于超出了锁的超时限制,也会出现问题。这时候线程 A 持有锁过期了,而临界区的逻辑还没有执行完,因为锁过期了,所以 Redis 会自动将这个锁对应 key 给删除掉;这个时候线程 B 就可以获得这个分布式锁,当线程 B 刚获取到自己的锁,原本超时的 A 执行到释放自己锁的代码,A 的锁其实已经过期了现在的 key 是 B 的锁,A 现在就会把 B 的锁给释放掉,其实 B 才刚刚获取到锁还没有执行自己的逻辑,所以锁应该有独占性,自己的锁应该只能自己解开。

实现方式:将锁的 value 值设置为一个随机串,释放锁时先匹配随机串是否一致,然后再删除 key。
3. 续期场景说明

为了避免线程没有处理完自己业务就过期的问题,加锁时,先设置一个过期时间,然后开启一个守护线程,定时去检测这个锁的失效时间,如果锁快要过期了,操作共享资源处理业务逻辑还没有完成,那么就自动对锁进行 续期操作,重新设置过期时间。 Redisson 使用的就是这种机制,它将其成为看门狗。
4. 原子性方面

匹配 value 和删除 key 在 Redis 中并不是一个原子性的操作,也没有类似保证原子性的指令,所以可能需要使用像 Lua 这样的脚本来处理了,因为 Lua 脚本可以 保证多个指令的原子性执行。
5. 互斥性方面

一个 key 被一个实例获取之后,其他的实例就不能再次获取了。
6. 可重入性方面

同一个线程方法 A 调用方法 B ,A B 都需要获得锁,A 获得锁之后,执行自己逻辑调用 B ,如果不是重入锁的话,就会发生死锁。

在 Java 编程中 synchronized 和 ReentrantLock 都是可重入锁

使用 Redis 实现分布式锁

Redis 相关命令解析

SETEX:

1
2
3
vbnet复制代码语法:SETEX KEY_NAME TIMEOUT VALUE
版本:redis 版本 >= 2.0.0
作用:setex 命令为指定的 key 设置值及其过期时间。如果 key 已经存在, SETEX 命令将会替换旧的值。

SETNX: (SET if Not eXists)

1
2
3
4
vbnet复制代码语法:SETNX KEY_NAME VALUE
版本:redis 版本 >= 2.0.0
作用:Setnx 命令在指定的 key 不存在时,为 key 设置指定的值。
返回值:设置成功,返回 1 ;设置失败,返回 0 。

我们注意到 setnx 是可以满足我们没有值时候设置成功,有值的时候设置失败的需求的,但是了解完上面章节中介绍的分布式锁应该考虑的问题这边应该需要设置一个超时时间,所以在 Redis 2.6.12 之后扩展了 set 命令:

SET:

1
2
3
4
5
6
7
8
9
10
11
12
scss复制代码# 一条命令保证原子性执行 在 setnx 的基础上 设置超时时间

set key value [EX seconds|PX milliseconds] [NX|XX]
- [EX seconds] 设置过期时间单位为 秒
- [PX milliseconds] 设置过期时间单位为 毫秒
- [NX] key 不存在时设置value, 成功返回OK,失败返回 (nil)
- [XX] key 存在时设置value, 成功返回OK,失败返回 (nil)

set key value EX 过期时间 NX

127.0.0.1:6379> SET lock 1 EX 10 NX
OK

自定义 Redis 分布式锁

上文中提到 SETNX 是原子性操作,但是没有办法同时完成 EXPIRE 操作,不能保证 SETNX 和 EXPIRE 的原子性。这个可以使用 SET 命令来实现并且保证原子性。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
java复制代码package com.aha.train.test.lock.distributed;


import lombok.extern.slf4j.Slf4j;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.data.redis.core.script.DefaultRedisScript;
import org.springframework.stereotype.Service;

import java.util.Arrays;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

/**
* 使用 redis 自定义分布式锁
*
* @author WT
* @date 2021/10/25
*/
@Slf4j
@Service
public class MyRedisLock {

// 释放锁 执行的 LUA 脚本
public static final String RELEASE_LUA_SCRIPT = "if redis.call('get', KEYS[1]) == KEYS[2] then return redis.call('del', KEYS[1]) else return 0 end";

//默认的 lock key 的过期时间 10s
private static final int DEFAULT_LOCK_EXPIRE_TIME_MILLIS = 100 * 1000;

//节点客户端
private RedisTemplate<Object,Object> redisTemplate;


public MyRedisLock(RedisTemplate<Object,Object> redisTemplate) {

this.redisTemplate = redisTemplate;

}

/**
* 尝试加锁
*
* @param acquireTimeout 尝试加锁等待时间
* @return 是否加锁成功
* @throws InterruptedException 线程中断异常
*/
public String acquire(Integer acquireTimeout, String lockKey) throws InterruptedException {

if (acquireTimeout <= 0) {
throw new IllegalArgumentException("加锁的超时时间必须大于0");
}

try {

// 获取锁的超时时间,超过这个时间则放弃获取锁
long end = System.currentTimeMillis() + acquireTimeout;
// 随机生成一个 value
String value = UUID.randomUUID().toString();

while (System.currentTimeMillis() < end) {

// 加锁并设置超时时间 redis 底层使用的应该是 SET 命令 setNX 是没有办法保证 set 和 expire 同时的原子性的
Boolean result = redisTemplate.opsForValue().setIfAbsent(lockKey, value , DEFAULT_LOCK_EXPIRE_TIME_MILLIS, TimeUnit.MILLISECONDS);
if (result != null && result) {
log.info("加锁成功:{}",value);
return value;
}

// 延时 100ms 继续尝试加锁,直到到达加锁的超时时间
Thread.sleep(100);
log.info("获取锁失败,再次尝试获取锁");
}

} catch (Exception e) {
log.error("acquire lock due to error", e);
}

return null;
}


/**
* 主动释放锁
* @param lockValue 主动释放锁的 value
* @return 是否成功释放锁
*/
public boolean release(String lockKey, String lockValue) {

// 这边得使用 long 类型 来接收 LUA 脚本执行的数据 因为 redis 中的 int 类型对应 java 中的 long 类型
DefaultRedisScript<Long> redisScript = new DefaultRedisScript<>(RELEASE_LUA_SCRIPT, Long.class);
/*
* 第一个参数是 执行的脚本内容,第二个参数是 lua 脚本中 key 的集合,第三个参数是 lua 脚本中 args 的集合
* result = jedis.eval(script, Collections.singletonList(lockKey),Collections.singletonList(identify)); 如果使用 jedis 客户端是这种形式的执行脚本
*/
Long result = redisTemplate.execute(redisScript, Arrays.asList(lockKey, lockValue));
return result != null && result > 0L;

}

}

测试自定义分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
java复制代码package com.aha.train.test.lock.distributed;

import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.*;


@RestController
@Slf4j
public class TestLock {

@Autowired
private MyRedisLock myRedisLock;

@GetMapping("/acquire")
public void acquire () throws InterruptedException {

String lockValue = myRedisLock.acquire(200, "TEST_LOCK");
Thread.sleep(1000);
boolean release = myRedisLock.release("TEST_LOCK", lockValue);
log.info("申请锁成功:{},解锁:{}",lockValue,release);

}

@GetMapping("/multiple/thread")
public void testMultipleThread () {

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor
(100, 200, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
for (int i = 0; i < 100; i++) {

threadPoolExecutor.execute(() -> {

try {
String key = "aha_key";
String value = myRedisLock.acquire(20000, key);
if (value != null) {
Thread.sleep(10);
log.info("执行业务逻辑");
if (myRedisLock.release(key, value))
log.info("解锁成功");
}
} catch (InterruptedException e) {
e.printStackTrace();
}

});

}
}

}

Redisson 实现分布式锁

导入 Redisson 的依赖

1
2
3
4
5
6
xml复制代码<!-- Redisson -->
<dependency>
<groupId>org.redisson</groupId>
<artifactId>redisson-spring-boot-starter</artifactId>
<version>3.16.1</version>
</dependency>

编写配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
yaml复制代码server:
port: 8002

spring:
# redis 缓存
redis:
# 使用 redisson 配置
redisson:
# 新版本的 redisson 配置文件没有办法使用这种形式了 可以使用下面这种形式
# config: classpath:config/redisson-single.yaml
config: |
# 单机模式
singleServerConfig:
# 连接空闲超时,单位:毫秒
idleConnectionTimeout: 10000
# 连接超时,单位:毫秒
connectTimeout: 10000
# 命令等待超时,单位:毫秒
timeout: 3000
# 命令失败重试次数,如果尝试达到 retryAttempts(命令失败重试次数) 仍然不能将命令发送至某个指定的节点时,将抛出错误。
# 如果尝试在此限制之内发送成功,则开始启用 timeout(命令等待超时) 计时。
retryAttempts: 3
# 命令重试发送时间间隔,单位:毫秒
retryInterval: 1000
# 密码
password: Aha@3166
# 单个连接最大订阅数量
subscriptionsPerConnection: 5
# 客户端名称
clientName: null
# 节点地址
address: redis://10.8.18.115:30379
# 发布和订阅连接的最小空闲连接数
subscriptionConnectionMinimumIdleSize: 1
# 发布和订阅连接池大小
subscriptionConnectionPoolSize: 50
# 最小空闲连接数
connectionMinimumIdleSize: 32
# 连接池大小
connectionPoolSize: 64
# 数据库编号
database: 4
# DNS监测时间间隔,单位:毫秒
dnsMonitoringInterval: 5000
# 线程池数量,默认值: 当前处理核数量 * 2
# threads: 0
# Netty线程池数量,默认值: 当前处理核数量 * 2
# nettyThreads: 0
# 编码
codec: !<org.redisson.codec.JsonJacksonCodec> {}
# 传输模式
transportMode : "NIO"

测试分布式锁

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
Java复制代码package com.aha.distributedlock.redisson;

import lombok.extern.slf4j.Slf4j;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

/**
* 测试使用 redisson 创建分布式锁
*
* @author WT
* date 2021/11/6
*/
@Slf4j
@RestController
@RequestMapping("/redisson")
public class TestMyRedisLock {

private final RedissonClient redissonClient;

public TestMyRedisLock(RedissonClient redissonClient) {
this.redissonClient = redissonClient;
}

@GetMapping("/acquire")
public void acquire () {

// RLock 可重入锁
// 获取锁对象(可以为"可重入锁"、"公平锁",如果redis是集群模式,还可以使用"红锁")
// 公平锁
// RLock redissonLock = redissonClient.getFairLock("TEST_REDISSON_LOCK");
// 非公平锁
RLock redissonLock = redissonClient.getLock("TEST_REDISSON_LOCK");

try {
// 尝试加锁,最多等待 100 秒,上锁之后 10 秒之后会自动解锁
// 我们会发现这边在 redis 中设置的值是一个 hash 类型 key 为上面指定的 key
boolean res = redissonLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
log.info("申请锁成功");
log.info("执行对应的业务逻辑..");
redissonLock.unlock();
}
} catch (InterruptedException e) {
e.printStackTrace();
}


}

@GetMapping("/multiple/thread")
public void testMultipleThread () {

ThreadPoolExecutor threadPoolExecutor = new ThreadPoolExecutor
(100, 200, 10, TimeUnit.SECONDS, new ArrayBlockingQueue<>(100));
for (int i = 0; i < 100; i++) {

threadPoolExecutor.execute(() -> {

RLock ahaLock = redissonClient.getLock("aha_key");
try {
boolean res = ahaLock.tryLock(100, 10, TimeUnit.SECONDS);
if (res) {
Thread.sleep(10);
log.info("执行业务逻辑");
ahaLock.unlock();
}
} catch (InterruptedException e) {
e.printStackTrace();
}
});

}
}



}

思考:Redis 实现的分布式锁当主从切换的时候依旧安全吗

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SpringCloud升级之路20200x版-39 改

发表于 2021-11-21

这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战

本系列代码地址:github.com/JoJoTec/spr…

要想实现我们上一节中提到的:

  • 需要在重试以及断路中加一些日志,便于日后的优化
  • 需要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异常
  • 需要在断路器相关的 Operator 中增加类似于 FeignClient 中的负载均衡的数据更新,使得负载均衡更加智能

我们需要将 resilience4j 本身提供的粘合库做一些改造,其实主要就是对 resilience4j 实现的 project reactor 的 Operator 进行改造。

关于断路器的改造

首先,WebClient 的返回对象只可能是 ClientResponse 类型,所以我们这里改造出来的 Operator 不必带上形参,只需要针对 ClientResponse 即可,即:

1
2
3
csharp复制代码public class ClientResponseCircuitBreakerOperator implements UnaryOperator<Publisher<ClientResponse>> {
...
}

在原有的断路器逻辑中,我们需要加入针对 GET 方法以及之前定义的可以重试的路径匹配配置可以重试的逻辑,这需要我们拿到原有请求的 URL 信息。但是 ClientResponse 中并没有暴露这些信息的接口,其默认实现 DefaultClientResponse(我们只要没有自己给 WebClient 加入特殊的改造逻辑,实现都是 DefaultClientResponse) 中的 request() 方法可以获取请求 HttpRequest,其中包含 url 信息。但是这个类还有方法都是 package-private 的,我们需要反射出来:

ClientResponseCircuitBreakerSubscriber

1
2
3
4
5
6
7
8
9
10
11
12
php复制代码private static final Class<?> aClass;
private static final Method request;

static {
try {
aClass = Class.forName("org.springframework.web.reactive.function.client.DefaultClientResponse");
request = ReflectionUtils.findMethod(aClass, "request");
request.setAccessible(true);
} catch (Exception e) {
throw new RuntimeException(e);
}
}

之后,在获取到 ClientResponse 之后记录断路器的逻辑中,需要加入上面提到的关于重试的改造,以及负载均衡器的记录:

ClientResponseCircuitBreakerSubscriber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
scss复制代码protected void hookOnNext(ClientResponse clientResponse) {
if (!isDisposed()) {
if (singleProducer && successSignaled.compareAndSet(false, true)) {
int rawStatusCode = clientResponse.rawStatusCode();
HttpStatus httpStatus = HttpStatus.resolve(rawStatusCode);
try {
HttpRequest httpRequest = (HttpRequest) request.invoke(clientResponse);
//判断方法是否为 GET,以及是否在可重试路径配置中,从而得出是否可以重试
if (httpRequest.getMethod() != HttpMethod.GET && !webClientProperties.retryablePathsMatch(httpRequest.getURI().getPath())) {
//如果不能重试,则直接返回结果
circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
} else {
if (httpStatus != null && httpStatus.is2xxSuccessful()) {
//如果成功,则直接返回结果
circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
} else {
/**
* 如果异常,参考 DefaultClientResponse 的代码进行异常封装
* @see org.springframework.web.reactive.function.client.DefaultClientResponse#createException
*/
Exception exception;
if (httpStatus != null) {
exception = WebClientResponseException.create(rawStatusCode, httpStatus.getReasonPhrase(), clientResponse.headers().asHttpHeaders(), EMPTY, null, null);
} else {
exception = new UnknownHttpStatusCodeException(rawStatusCode, clientResponse.headers().asHttpHeaders(), EMPTY, null, null);
}
circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), exception);
downstreamSubscriber.onError(exception);
return;
}
}
} catch (Exception e) {
log.fatal("judge request method in circuit breaker error! the resilience4j feature would not be enabled: {}", e.getMessage(), e);
circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);
}
}
eventWasEmitted.set(true);
downstreamSubscriber.onNext(clientResponse);
}
}

同样的,在原有的完成,取消还有失败的记录逻辑中,也加上记录负载均衡数据:

ClientResponseCircuitBreakerSubscriber

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
scss复制代码@Override
protected void hookOnComplete() {
if (successSignaled.compareAndSet(false, true)) {
serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);
circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
}

downstreamSubscriber.onComplete();
}

@Override
public void hookOnCancel() {
if (!successSignaled.get()) {
serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);
if (eventWasEmitted.get()) {
circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());
} else {
circuitBreaker.releasePermission();
}
}
}

@Override
protected void hookOnError(Throwable e) {
serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);
circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e);
downstreamSubscriber.onError(e);
}

粘合 WebClient 与 resilience4j 的同时覆盖重试逻辑

由于前面的断路器中,我们针对可以重试的非 2XX 响应封装成为 WebClientResponseException。所以在重试器中,我们需要加上针对这个异常的重试。

同时,需要将重试器放在负载均衡器之前,因为每次重试,都要从负载均衡器中获取一个新的实例。同时,断路器需要放在负载均衡器之后,因为只有在这个之后,才能获取到本次调用的实例,我们的的断路器是针对实例方法级别的:

WebClientDefaultConfiguration.java

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
ini复制代码@Bean
public WebClient getWebClient(
ReactorLoadBalancerExchangeFilterFunction lbFunction,
WebClientConfigurationProperties webClientConfigurationProperties,
Environment environment,
RetryRegistry retryRegistry,
CircuitBreakerRegistry circuitBreakerRegistry,
ServiceInstanceMetrics serviceInstanceMetrics
) {
String name = environment.getProperty(WebClientNamedContextFactory.PROPERTY_NAME);
Map<String, WebClientConfigurationProperties.WebClientProperties> configs = webClientConfigurationProperties.getConfigs();
if (configs == null || configs.size() == 0) {
throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs");
}
WebClientConfigurationProperties.WebClientProperties webClientProperties = configs.get(name);
if (webClientProperties == null) {
throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs." + name);
}
String serviceName = webClientProperties.getServiceName();
//如果没填写微服务名称,就使用配置 key 作为微服务名称
if (StringUtils.isBlank(serviceName)) {
serviceName = name;
}
String baseUrl = webClientProperties.getBaseUrl();
//如果没填写 baseUrl,就使用微服务名称填充
if (StringUtils.isBlank(baseUrl)) {
baseUrl = "http://" + serviceName;
}

Retry retry = null;
try {
retry = retryRegistry.retry(serviceName, serviceName);
} catch (ConfigurationNotFoundException e) {
retry = retryRegistry.retry(serviceName);
}
//覆盖其中的异常判断
retry = Retry.of(serviceName, RetryConfig.from(retry.getRetryConfig()).retryOnException(throwable -> {
//WebClientResponseException 会重试,因为在这里能 catch 的 WebClientResponseException 只对可以重试的请求封装了 WebClientResponseException
//参考 ClientResponseCircuitBreakerSubscriber 的代码
if (throwable instanceof WebClientResponseException) {
log.info("should retry on {}", throwable.toString());
return true;
}
//断路器异常重试,因为请求没有发出去
if (throwable instanceof CallNotPermittedException) {
log.info("should retry on {}", throwable.toString());
return true;
}
if (throwable instanceof WebClientRequestException) {
WebClientRequestException webClientRequestException = (WebClientRequestException) throwable;
HttpMethod method = webClientRequestException.getMethod();
URI uri = webClientRequestException.getUri();
//判断是否为响应超时,响应超时代表请求已经发出去了,对于非 GET 并且没有标注可以重试的请求则不能重试
boolean isResponseTimeout = false;
Throwable cause = throwable.getCause();
//netty 的读取超时一般是 ReadTimeoutException
if (cause instanceof ReadTimeoutException) {
log.info("Cause is a ReadTimeoutException which indicates it is a response time out");
isResponseTimeout = true;
} else {
//对于其他一些框架,使用了 java 底层 nio 的一般是 SocketTimeoutException,message 为 read time out
//还有一些其他异常,但是 message 都会有 read time out 字段,所以通过 message 判断
String message = throwable.getMessage();
if (StringUtils.isNotBlank(message) && StringUtils.containsIgnoreCase(message.replace(" ", ""), "readtimeout")) {
log.info("Throwable message contains readtimeout which indicates it is a response time out");
isResponseTimeout = true;
}
}
//如果请求是 GET 或者标注了重试,则直接判断可以重试
if (method == HttpMethod.GET || webClientProperties.retryablePathsMatch(uri.getPath())) {
log.info("should retry on {}-{}, {}", method, uri, throwable.toString());
return true;
} else {
//否则,只针对请求还没有发出去的异常进行重试
if (isResponseTimeout) {
log.info("should not retry on {}-{}, {}", method, uri, throwable.toString());
} else {
log.info("should retry on {}-{}, {}", method, uri, throwable.toString());
return true;
}
}
}
return false;
}).build());


HttpClient httpClient = HttpClient
.create()
.option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) webClientProperties.getConnectTimeout().toMillis())
.doOnConnected(connection ->
connection
.addHandlerLast(new ReadTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds()))
.addHandlerLast(new WriteTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds()))
);

Retry finalRetry = retry;
String finalServiceName = serviceName;
return WebClient.builder()
.exchangeStrategies(ExchangeStrategies.builder()
.codecs(configurer -> configurer
.defaultCodecs()
//最大 body 占用 16m 内存
.maxInMemorySize(16 * 1024 * 1024))
.build())
.clientConnector(new ReactorClientHttpConnector(httpClient))
//Retry在负载均衡前
.filter((clientRequest, exchangeFunction) -> {
return exchangeFunction
.exchange(clientRequest)
.transform(ClientResponseRetryOperator.of(finalRetry));
})
//负载均衡器,改写url
.filter(lbFunction)
//实例级别的断路器需要在负载均衡获取真正地址之后
.filter((clientRequest, exchangeFunction) -> {
ServiceInstance serviceInstance = getServiceInstance(clientRequest);
serviceInstanceMetrics.recordServiceInstanceCall(serviceInstance);
CircuitBreaker circuitBreaker;
//这时候的url是经过负载均衡器的,是实例的url
//需要注意的一点是,使用异步 client 的时候,最好不要带路径参数,否则这里的断路器效果不好
//断路器是每个实例每个路径一个断路器
String instancId = clientRequest.url().getHost() + ":" + clientRequest.url().getPort() + clientRequest.url().getPath();
try {
//使用实例id新建或者获取现有的CircuitBreaker,使用serviceName获取配置
circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, finalServiceName);
} catch (ConfigurationNotFoundException e) {
circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);
}
log.info("webclient circuit breaker [{}-{}] status: {}, data: {}", finalServiceName, instancId, circuitBreaker.getState(), JSON.toJSONString(circuitBreaker.getMetrics()));
return exchangeFunction.exchange(clientRequest).transform(ClientResponseCircuitBreakerOperator.of(circuitBreaker, serviceInstance, serviceInstanceMetrics, webClientProperties));
}).baseUrl(baseUrl)
.build();
}

private ServiceInstance getServiceInstance(ClientRequest clientRequest) {
URI url = clientRequest.url();
DefaultServiceInstance defaultServiceInstance = new DefaultServiceInstance();
defaultServiceInstance.setHost(url.getHost());
defaultServiceInstance.setPort(url.getPort());
return defaultServiceInstance;
}

这样,我们就实现了我们封装的基于配置的 WebClient

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer:

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

SQL注入问题

发表于 2021-11-21

这是我参与11月更文挑战的第6天,活动详情查看:2021最后一次更文挑战

前言

sql注入是一种通过在输入中注入sql语句,来达到攻击数据库的效果。今天使用Java语言,来分析一下sql注入的相关问题。

一、什么是SQL注入

SQL注入即是指web应用程序对用户输入数据的合法性没有判断或过滤不严,攻击者可以在web应用程序中事先定义好的查询语句的结尾上添加额外的SQL语句,在管理员不知情的情况下实现非法操作,以此来实现欺骗数据库服务器执行非授权的任意查询,从而进一步得到相应的数据信息。

二、模拟SQL注入

我们先创建一个简单的数据库和一个user表:

1
2
3
sql复制代码create database test;
use database test;
create table user(username varchar(20), password(20));

我们在表中插入两个数据:

1
2
sql复制代码insert into user values('zack', '123456');
insert into user values('rudy', '123456');

我们再看一个简单的Java程序:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
java复制代码package com.zack.sql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Scanner;

public class Sql {
//链接的url
private static String url = "jdbc:mysql:///all_test?serverTimezone=GMT";
//用户名
private static String user = "root";
//密码
private static String password = "123456";
//用于输入
Scanner sc = new Scanner(System.in);

public static void main(String[] args) throws Exception {
//加载驱动
Class.forName("com.mysql.jdbc.Driver");
//获取连接
Connection conn = DriverManager.getConnection(url, user, password);
//获取statment对象
Statement stat = conn.createStatement();

//要求用户输入用户名和密码
System.out.println("请输入你的用户名:");
String name = sc.next();
System.out.println("请输入密码:");
String pwd = sc.next();

//code1、通过用户输入的用户名和密码来查询数据库中是否存在该用户
ResultSet set = stat.executeQuery("select * from user where username = '" + name + "' and password = '" + pwd + "'");

//将匹配到的数据打印出来
while(set.next()) {
String username = set.getString("username");
String password = set.getString("password");
System.out.println("name:" + username + ", pwd:" + password);
}
}

}

我们看到code1,假设我们输入的如下:

1
2
bash复制代码zack
123456

这个正好是与我们数据库中匹配的,那么code1中执行的sql语句如下:

1
sql复制代码select * from user where username = 'zack' and password = '123456';

我们原本设想的是,如果输入不匹配的数据,将无法在数据库中查找到相应的东西,但是我们进行如喜爱输入:

1
2
sql复制代码zack' or '1' = '1
111

这个时候,code1中执行的语句如下:

1
sql复制代码select * from user where username = 'zack' or '1'='1' and password = '123456';

其中’1’ = ‘1’是恒为真的,所以这个sql语句不会再去判断密码是否正确,这样就完成了SQL注入攻击的效果。

三、如何防止SQL注入

防止sql注入的方法也非常简单,在jdbc中有一个sql语句预编译的对象,我们可以通过PrepareStatement类来实现。假设我们还是要执行查询操作,执行语句如下:

1
sql复制代码String sql = "select * from user where username = ? and password = ?";

这里我们使用“?”来表示字段的值。然后我们来创建一个PrepareStatement对象,这里和Statement有些不一样:

1
2
java复制代码//在创建PrepareStatement对象时,就传入了sql语句
PrepareStatement preStat = conn.prepareStatement(sql);

这里是在创建PrepareStatement对象时就传入了sql语句,而Statement是在执行查询操作时才传入sql语句。
因为我们已经传入了sql语句,所以在执行查询时不需要传入sql语句,但是要多一步匹配参数的操作:

1
2
3
4
java复制代码//将name的值替换到sql语句中第一个?
preStet.setString(1, name);
//将name的值替换到sql语句中第二个?
preStat.setString(2, pwd);

其中name和pwd是我们输入的字符串变量。接下来我们就可以进行查询操作了:

1
2
3
4
5
6
java复制代码ResultSet set = preStat.executeQuery();
while(set.next()) {
String username = set.getString("username");
String password = set.getString("password");
System.out.println("name:" + username + ", pwd:" + password);
}

这里操作和之前类似,只是不需要传入sql语句。完整代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码package com.zack.sql;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.Scanner;

public class Sql {

private static String url = "jdbc:mysql:///all_test?serverTimezone=GMT";
private static String user = "root";
private static String password = "123456";
private static Scanner sc = new Scanner(System.in);
private static String sql = "select * from user where username = ? and password = ?";

public static void main(String[] args) throws Exception {
//数据库操作
Class.forName("com.mysql.jdbc.Driver");
Connection conn = DriverManager.getConnection(url, user, password);
PreparedStatement stat = conn.prepareStatement(sql);

//要求用户输入用户名和密码
System.out.println("请输入你的用户名:");
String name = sc.next();
System.out.println("请输入密码:");
String pwd = sc.next();

//通过用户输入的用户名和密码来查询数据库中是否存在改用户
stat.setString(1, name);
stat.setString(2, pwd);
ResultSet set = stat.executeQuery();
while(set.next()) {
String username = set.getString("username");
String password = set.getString("password");
System.out.println("name:" + username + ", pwd:" + password);
}
}

}

四、总结

SQL注入是早期比较流行的一种攻击数据库的方式,但现在很少会直接使用jdbc进行数据库操作,更不会直接使用sql语句拼接的方式进行操作。所以大多数情况SQL注入都是无效的,可能在一比较老的网址还是有效的,大家可以尝试以下。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

postgresql之用户认证

发表于 2021-11-21

PG通过用$PGDATA目录下的pg_hba.conf文件来设置用户认证的方式。

pg_hba.conf文件的格式是一组纪录,每条一行。每条记录指定一个连接类型、一个数据库名称、一个用户名、一个客户端地址范围、认证方法。 如下所示:

1
2
3
4
5
6
7
8
9
10
11
css复制代码local         database  user  auth-method [auth-options]
host          database  user  address     auth-method  [auth-options]
hostssl       database  user  address     auth-method  [auth-options]
hostnossl     database  user  address     auth-method  [auth-options]
hostgssenc    database  user  address     auth-method  [auth-options]
hostnogssenc  database  user  address     auth-method  [auth-options]
host          database  user  IP-address  IP-mask      auth-method  [auth-options]
hostssl       database  user  IP-address  IP-mask      auth-method  [auth-options]
hostnossl     database  user  IP-address  IP-mask      auth-method  [auth-options]
hostgssenc    database  user  IP-address  IP-mask      auth-method  [auth-options]
hostnogssenc  database  user  IP-address  IP-mask      auth-method  [auth-options]

注意:当建立连接时,服务器将按顺序处理文件中的行,并使用与连接属性匹配的第一行来确定将使用的身份认证方法。如果验证失败,则不会考虑后续记录。

此篇,我们只关注pg它支持的auth method(身份认证方法)。

auth method 身份认证方法

Auth-method:指定连接与此条记录匹配时使用的身份认证方法。 可选项有: trust、reject、scram-sha-256、md5、password、gss、sspi、ident、peer、ldap、radius、cert、pam、bsd。

接下来我们来看几种比较常用的认证方式。

  • trust认证:即trust。只要知道数据库用户名就不需要密码或ident就能登录,建议不要在生产环境中使用。
  • 基于密码的身份验证。有:password、md5、scram-sha-256。
+ password:password以明文形式发送密码,因此很容易受到密码嗅探攻击。
+ md5:采用了一种challege-response机制,并在服务器上存储散列密码,多年来也一直是pg首选的密码散列机制。但是如果攻击者设法从服务器窃取了密码散列,则无法提供保护。md5现在被设为是不安全的认证方法。
+ scram-sha-256 :从10.0开始被引入,可以避免攻击者获得对散列的访问权时出现问题。是目前被认为安全的加密散列机制。
  • peer认证:Peer 认证方法通过从内核获得客户端的操作系统用户名并把它用作被允许的数据库用户名(和可选的用户名映射)来工作。这种方法只在本地连接上支持。
  • indent认证:从身份验证服务器获取客户端操作系统用户名,并将其用作允许的数据库用户名(具有可选的用户名映射)。这只在TCP/IP连接上支持。用法与peer认证类似。使用ident认证方式,需要具备同名用户或建立映射用户。建立映射用户需要配置pg_ident.conf文件。

从md5升级到SCRAM

上面我们提到md5是不安全的,scram-sha-256是安全的。出于安全考虑,我们也许会需要将服务器的认证方式从md5升级为scram-sha-256。

对于新上的实例我们可以使用scram-sha-256认证方法。但是存量实例应该怎么升级呢?scram-sha-256是从10.0引入的,如果想支持,需要确保服务器版本高于10.0,同时确保连接PostgreSQL数据库的客户端兼容了SCRAM。

1、修改password_encryption参数

在pg中,每个数据库用户的密码存储在pg_authid系统表中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
sql复制代码postgres=# select * from pg_authid where  rolname='test_user';
-[ RECORD 1 ]--+------------------------------------
oid            | 18732
rolname        | test_user
rolsuper       | f
rolinherit     | t
rolcreaterole  | f
rolcreatedb    | f
rolcanlogin    | t
rolreplication | f
rolbypassrls   | f
rolconnlimit   | -1
rolpassword    | md5c203df9125a021758e4693720bd5e8a7
rolvaliduntil  |

我们看到test_user的rolpassword字段的前缀是md5,说明test_user的密码是采用md5方式加密的。用户的密码采用什么方式存储,取决于配置参数password_encryption。

查看下服务器password_encryption的值。

1
2
3
4
5
sql复制代码postgres=# show  password_encryption;
 password_encryption
---------------------
 md5
(1 row)

可以看到此时password_encryption的值是md5,那么升级第一步,就是要先把这个参数修改为scram-sha-256。

postgresql.conf

1
ini复制代码password_encryption = scram-sha-256       # scram-sha-256 or md5

reload

1
bash复制代码pg_ctl reload -D $PGDATA

用户重新设置密码

我们将password_encryption设置为scram-she-256之后,需要将用户的密码重置,这样用户的密码将会使用scram-sha-256进行加密存储。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sql复制代码postgres=# select * from pg_authid where  rolname='test_user';
-[ RECORD 1 ]--+------------------------------------
oid            | 18732
rolname        | test_user
rolsuper       | f
rolinherit     | t
rolcreaterole  | f
rolcreatedb    | f
rolcanlogin    | t
rolreplication | f
rolbypassrls   | f
rolconnlimit   | -1
rolpassword    | md5c203df9125a021758e4693720bd5e8a7
rolvaliduntil  |

postgres=# show password_encryption;
-[ RECORD 1 ]-------+--------------
password_encryption | scram-sha-256

postgres=# alter user test_user password '123456';
ALTER ROLE
postgres=# select * from pg_authid where  rolname='test_user';
-[ RECORD 1 ]--+--------------------------------------------------------------------------------------------------------------------------------------
oid            | 18732
rolname        | test_user
rolsuper       | f
rolinherit     | t
rolcreaterole  | f
rolcreatedb    | f
rolcanlogin    | t
rolreplication | f
rolbypassrls   | f
rolconnlimit   | -1
rolpassword    | SCRAM-SHA-256$4096:5r2cxn7R7pBAdwuW5vmBbQ==$zTphVvxaEiIktUFESJF9mERxARhc4xg2IIGYdHs9deQ=:9qUoKlrJ/fKKM9bYIazKlcCTY6RR/mJkhaGsoXODN8k=
rolvaliduntil  |

3、修改pg_hba.conf

修改pg_hba.conf 本地连接的method方法为scram-sha-256

1
sql复制代码local   all       all         scram-sha-256

注意:PG为了简化从md5方法到新的SCRAM方法的转换,如果在pg_hba.conf中将认证方法设置为md5,但服务器上用户的密码是SCRAM加密的,PG会自动选择基于SCRAM的认证。

参考:

www.cybertec-postgresql.com/en/from-md5…
www.modb.pro/db/40292

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

叮~IDEA2021激活教程,请查收!

发表于 2021-11-21

图片

做一个单纯的程序汪,分享干货,聊聊人生。

微信公众号:后端架构进阶

关注我发现更多的干货,微服务、Spring源码、JVM、SpringCloud Alibaba、K8S等。

如果你觉得本文对你有帮助,麻烦给我点个赞,感谢支持!

几个小伙伴后台说IDEA又过期了,所以特地上来更新一波。下载IDEA就不多说了,我的直接更新到了2021.2.3版本。大部分更新还是和之前的一样,只不过换了更新的方式。

这次我分享的是无限重置的方式,不用获取激活码,一个月后自动重新续期一个月。非常的简单方便。

一、准备工作

公号后台回复 IDEA2021 即可得到我们的破解的插件,目录如下所示:

图片

接下来,就是升级或者重新安装我们的 IDEA2021, 我安装的时候版本是2021.2.3, 所以你的版本如果更高可能不支持。

二、安装步骤

打开我们的插件安装。File - settings - plugins 具体如下图所示

图片

然后找到我们的安装包的位置,我习惯于放在我的IDEA安装目录的同级。图片

安装完成后,会出现如下的提示,这个时候,按提示操作即可\

图片

然后如果自动重启发现有这个提示的时候,那么恭喜你,重置的插件就安装成功了。

图片

每个月自动重置,就是这么牛逼。如果发现哪天过期了,那么怎么操作呢?别慌,1234,再来一次。哈哈哈!

喜欢的老铁记得关注我,每次有新的更新我都会第一时间去同步。很多小伙伴遇到问题也是第一时间就后台问我了,所以上来更新一波,感谢大家的支持和关注。

历史文章汇总

总结

以上就是之前的内容的总结和分享,感谢各位大佬的 关注、点赞和 收藏 !

微信公众号:后端架构进阶

更多文章正在赶来,喜欢记得给我点个 👍 ,感谢支持!

公众号文章同步更新!关注我,不迷路!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

数据优化-多层索引 数据优化-多层索引

发表于 2021-11-21

这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战

数据优化-多层索引

多层索引

创建

环境:Jupyter

1
2
3
4
5
6
python复制代码import numpy as np
import pandas as pd
a=pd.DataFrame(np.random.random(size=(4,4)),index=[['上半年','上半年','下半年','下半年'],
['一季度','二季度','三季度','四季度']],
columns=[['蔬菜','蔬菜','肉类','肉类'],['胡萝卜','白菜','牛肉','猪肉']])
display(a)

image-20211119143824001

设置索引的名称

1
2
3
4
5
6
7
8
python复制代码import numpy as np
import pandas as pd
a=pd.DataFrame(np.random.random(size=(4,4)),index=[['上半年','上半年','下半年','下半年'],
['一季度','二季度','三季度','四季度']],
columns=[['蔬菜','蔬菜','肉类','肉类'],['胡萝卜','白菜','牛肉','猪肉']])
a.index.names=['年度','季度']
a.columns.names=['大类','小类']
display(a)

image-20211119151038703

from_arrays( )-from_tuples()

1
2
3
4
5
6
python复制代码import numpy as np
import pandas as pd
index=pd.MultiIndex.from_arrays([['上半年','上半年','下半年','下半年'],['一季度','二季度','三季度','四季度']])
columns=pd.MultiIndex.from_tuples([('蔬菜','胡萝卜'),('蔬菜','白菜'),('肉类','牛肉'),('肉类','猪肉')])
a=pd.DataFrame(np.random.random(size=(4,4)),index=index,columns=columns)
display(a)

image-20211119151807886

笛卡儿积方式

from_product() 局限性较大

1
2
3
4
python复制代码import pandas as pd
index = pd.MultiIndex.from_product([['上半年','下半年'],['蔬菜','肉类']])
a=pd.DataFrame(np.random.random(size=(4,4)),index=index)
display(a)

image-20211119152719775

多层索引操作

Series

1
2
3
4
5
6
7
python复制代码import pandas as pd
a=pd.Series([1,2,3,4],index=[['a','a','b','b'],['c','d','e','f']])
print(a)
print('---------------------')
print(a.loc['a'])
print('---------------------')
print(a.loc['a','c'])

image-20211119153405471

1
2
3
4
5
6
7
8
9
python复制代码import pandas as pd
a=pd.Series([1,2,3,4],index=[['a','a','b','b'],['c','d','e','f']])
print(a)
print('---------------------')
print(a.iloc[0])
print('---------------------')
print(a.loc['a':'b'])
print('---------------------')
print(a.iloc[0:2])

image-20211119153644981

DataFrame

1
2
3
4
5
6
7
8
9
10
python复制代码import numpy as np
import pandas as pd
a=pd.DataFrame(np.random.random(size=(4,4)),index=[['上半年','上半年','下半年','下半年'],
['一季度','二季度','三季度','四季度']],
columns=[['蔬菜','蔬菜','肉类','肉类'],['胡萝卜','白菜','牛肉','猪肉']])
print(a)
print('--------------------')
print(a.loc['上半年','二季度'])
print('--------------------')
print(a.iloc[0])

image-20211119154634116

交换索引

swaplevel( )

1
2
3
4
5
6
7
8
9
python复制代码import numpy as np
import pandas as pd
a=pd.DataFrame(np.random.random(size=(4,4)),index=[['2021','2021','2022','2022'],
['一季度','二季度','三季度','四季度']],
columns=[['蔬菜','蔬菜','肉类','肉类'],['胡萝卜','白菜','牛肉','猪肉']])
a.index.names=['年度','季度']
print(a)
print('--------------------')
print(a.swaplevel('年度','季度'))

image-20211119160130816

索引排序

sort_index( )

level:指定根据哪一层进行排序,默认为最层

inplace:是否修改原数据。默认为False

1
2
3
4
5
6
7
8
9
10
11
python复制代码import numpy as np
import pandas as pd
a=pd.DataFrame(np.random.random(size=(4,4)),index=[['2021','2021','2022','2022'],
[1,3,2,4]],
columns=[['蔬菜','蔬菜','肉类','肉类'],['胡萝卜','白菜','牛肉','猪肉']])
a.index.names=['年度','季度']
print(a)
print('--------------------')
print(a.sort_index())
print('--------------------')
print(a.sort_index(level=1))

image-20211119160714806

索引堆叠

stack( )

将指定层级的列转换成行

1
2
3
4
5
6
7
8
9
10
python复制代码import numpy as np
import pandas as pd
a=pd.DataFrame(np.random.random(size=(4,4)),index=[['2021','2021','2022','2022'],
[1,3,2,4]],
columns=[['蔬菜','蔬菜','肉类','肉类'],['胡萝卜','胡萝卜','牛肉','牛肉']])
print(a)
print('--------------------')
print(a.stack(0))
print('--------------------')
print(a.stack(-1))

image-20211119161612152

取消堆叠

unstack( )

将指定层级的行转换成列

fill_value:指定填充值。

1
2
3
4
5
6
7
8
9
10
11
python复制代码import numpy as np
import pandas as pd
a=pd.DataFrame(np.random.random(size=(4,4)),index=[['2021','2021','2022','2022'],
[1,3,2,4]],
columns=[['蔬菜','蔬菜','肉类','肉类'],['胡萝卜','胡萝卜','牛肉','牛肉']])
print(a)
print('--------------------')
a=a.stack(0)
print(a)
print('--------------------')
print(a.unstack(-1))

image-20211119162659896

1
2
3
4
5
6
7
8
9
10
11
python复制代码import numpy as np
import pandas as pd
a=pd.DataFrame(np.random.random(size=(4,4)),index=[['2021','2021','2022','2022'],
[1,3,2,4]],
columns=[['蔬菜','蔬菜','肉类','肉类'],['胡萝卜','胡萝卜','牛肉','牛肉']])
print(a)
print('--------------------')
a=a.stack(0)
print(a)
print('--------------------')
print(a.unstack(0,fill_value='0'))

image-20211119162928207

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

interrupted()和isInterrupted()还

发表于 2021-11-21

「这是我参与11月更文挑战的第21天,活动详情查看:2021最后一次更文挑战」

前言

当提及如何终止一个线程时,部分读者通常立马想到的方法肯定是stop(),但是stop()方法并不被推荐使用(很多规范中是禁止使用的),其原因是强制终止一个线程,会导致程序不正常的结束,会出现资源未正确释放、程序结果不正确等等问题。而是否终止一个线程应该把这个控制权转交给当前被终止的线程本身,此时采用的办法就是 ****interrupt()方法来终止,该方法相当于修改一个共享变量的值,当运行中的线程判断当前值为false则继续运行,如果有地方调用当前thread的interrupt()方法,那么这个值将变为true,此时当前线程可以根据这个值的修改来正确的终止线程的运行。

API

在java.lang.Thread中主要提供了如下与线程中断相关的方法,其具体方法名与主要作用如下表所示。

方法名 方法作用
public void interrupt() 中断此线程
public static boolean interrupted() 测试当前线程是否被中断,该方法会恢复(清除)中断标志
public boolean isInterrupted() 测试当前线程是否被中断,该方法只会获取中断标志,不会恢复(清除)中断标志
private native boolean isInterrupted(boolean ClearInterrupted); interrupted()和isInterrupted()最终调用,该方法是native本地方法,在jvm中具体实现,也是获取线程中断标志真正调用的方法,参数ClearInterrupted意思是是否恢复(清除)中断标志

源码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
scss复制代码/**
* 中断此线程
*/
public void interrupt() {
if (this != Thread.currentThread())
checkAccess();

synchronized (blockerLock) {
Interruptible b = blocker;
if (b != null) {
interrupt0(); // Just to set the interrupt flag
b.interrupt(this);
return;
}
}
interrupt0();
}

/**
* 测试当前线程是否被中断,返回中断标志
*/
public static boolean interrupted() {
return currentThread().isInterrupted(true);
}

/**
* 测试当前线程是否被中断,返回中断标志
*/
public boolean isInterrupted() {
return isInterrupted(false);
}

/**
* 线程是否被中断native方法,ClearInterrupted为是否清除中断标志参数
*/
private native boolean isInterrupted(boolean ClearInterrupted);

/**
* 中断当前线程的native方法
*/
private native void interrupt0();

interrupted()和isInterrupted()区别

看了上述API讲述和Thread中的源码,已经清楚interrupted()和isInterrupted()的主要区别了

  1. interrupted()为静态方法,isInterrupted()为普通方法
  2. interrupted() 返回中断标志且清除(恢复)中断标志,isInterrupted()仅返回中断标志

使用方法

我们先验证中断异常响应,通过如下两种方法的使用示例来介绍,注意Runner中的run方法的部分区别

方法一

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
java复制代码package com.liziba.p7;

import java.util.concurrent.TimeUnit;

/**
* <p>
*
* </p>
*
* @Author: Liziba
* @Date: 2021/6/24 21:05
*/
public class ThreadInterruptedDemo {

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runner(), "Thread-01");
t1.start();
// 主线程睡眠1秒,保证t1的充分执行
TimeUnit.SECONDS.sleep(1);
// 发起中断
t1.interrupt();
}

static class Runner implements Runnable {

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread().getName() + " is running .");
}
}
}

}

输出结果

可以看到线程在执行数次后终止运行

方法二

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
java复制代码package com.liziba.p7;

import java.util.concurrent.TimeUnit;

/**
* <p>
*
* </p>
*
* @Author: Liziba
* @Date: 2021/6/24 21:18
*/
public class ThreadInterruptedDemo {

public static void main(String[] args) throws InterruptedException {
Thread t1 = new Thread(new Runner(), "Thread-01");
t1.start();
// 主线程睡眠2秒,保证t1的充分执行
TimeUnit.SECONDS.sleep(1);
// 发起中断
t1.interrupt();
}

static class Runner implements Runnable {

@Override
public void run() {
while (!Thread.currentThread().isInterrupted()) {
System.out.println(Thread.currentThread().getName() + " is running .");
try {
// 睡眠2秒,保证主线程发起的中断能被捕获
TimeUnit.SECONDS.sleep(2);
} catch (InterruptedException e) {
// 不对中断做任何处理,try住异常,打印
e.printStackTrace();
}
}
}
}

}

输出结果

可以看到main线程中发起的t1线程中断,被捕获住异常后,未做任何处理,线程继续持续不断的运行

总结上述两种方式

方法一和方法二,均通过判断Thread.currentThread().isInterrupted()的值来运行run方法中的逻辑,Thread.currentThread().isInterrupted()在线程未中断时返回false,当main线程中执行 t1.interrupt()时,线程t1被中断,Thread.currentThread().isInterrupted()的值变为false;在方法一中,获取到这个变化后直接结束运行;在方法二中,由于sleep()使得线程阻塞会响应中断,但是此时我仅仅catch住异常,并没有对中断做任何处理,这里有个知识点是,线程响应中断抛出异常时,会恢复(清除)中断标志,所以t1.interrupt()对中断标志的修改又被恢复了,程序仍然不断的运行。

\

接下来我们来验证interrupted()对于中断的标志的清除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
java复制代码package com.liziba.p7;

import java.util.concurrent.TimeUnit;

/**
* <p>
* isInterrupted()
* </p>
*
* @Author: Liziba
* @Date: 2021/6/24 21:20
*/
public class ThreadInterruptDemo2 {

public static void main(String[] args) throws InterruptedException {

Thread thread = new Thread(new Runner(), "Thread-1");
thread.start();
TimeUnit.SECONDS.sleep(2);
thread.interrupt();
}


static class Runner implements Runnable {

@Override
public void run() {
System.out.println(Thread.currentThread().getName() +" interrupted flag is " + Thread.currentThread().isInterrupted());

while (!Thread.currentThread().isInterrupted()) {
try {
System.out.println(Thread.currentThread().getName() + " is running .");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// 响应中断,抛出异常后中断位置会被复位,自己中断自己
Thread.currentThread().interrupt();
// 这里调用isInterrupted()获取当前的中断标志
System.out.println(Thread.currentThread().getName()
+" interrupted flag is " + Thread.currentThread().isInterrupted());
}
}
}
}

}

输出结果

这里证明interrupted()不清楚中断标志,线程在获取到 thread.interrupt()发起中断后,执行结束。

将上述catch中的Thread.currentThread().isInterrupted()修改为Thread.interrupted()再次运行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
java复制代码package com.liziba.p7;

import java.util.concurrent.TimeUnit;

/**
* <p>
*
* </p>
*
* @Author: Liziba
* @Date: 2021/6/24 21:23
*/
public class ThreadInterruptDemo2 {


public static void main(String[] args) throws InterruptedException {

Thread thread = new Thread(new Runner(), "Thread-1");
thread.start();
TimeUnit.SECONDS.sleep(2);
thread.interrupt();
}

// 区别在catch中
static class Runner implements Runnable {

@Override
public void run() {
System.out.println(Thread.currentThread().getName() +" interrupted flag is " + Thread.currentThread().isInterrupted());

while (!Thread.currentThread().isInterrupted()) {
try {
System.out.println(Thread.currentThread().getName() + " is running .");
TimeUnit.SECONDS.sleep(1);
} catch (InterruptedException e) {
// 响应中断,抛出异常后中断位置会被复位,自己中断自己
Thread.currentThread().interrupt();
// 注意区别在这里
System.out.println(Thread.currentThread().getName()
+" interrupted flag is " + Thread.interrupted());
}
}
}
}

}

输出结果

线程也响应到了 thread.interrupt()的中断,但是由于catch中调用了Thread.interrupted(),对中断标志进行了清除,所以!Thread.currentThread().isInterrupted()判断仍然等于true,线程继续不断的运行

看到这里,应该已经理解了这两个方法的主要区别和其使用,最后我们来看下一个源码中的使用案例。我们通过观看AbstractQueuedSynchronizer(AQS)中的await()方法,来看其在源码中的使用。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
scss复制代码public final void await() throws InterruptedException {
// 判断当前线程是否被中断,如果被中断则恢复中断标志
if (Thread.interrupted())
throw new InterruptedException();
Node node = addConditionWaiter();
int savedState = fullyRelease(node);
int interruptMode = 0;
while (!isOnSyncQueue(node)) {
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}

AbstractQueuedSynchronizer(AQS)源码中使用静态Thread.interrupted(),判断当前线程是否被中断,并恢复中断标志,如果线程已被中断则抛出InterruptedException中断异常。清除标志位的作用就是为了当前线程响应过中断后,再次进入的时候可以进行后续操作。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【k8s 系列】k8s 学习五,k8s 集群部署尝试

发表于 2021-11-21

「这是我参与11月更文挑战的第 20 天,活动详情查看:2021最后一次更文挑战」

K8S 部署方式有很多,有的方式不太友好,需要注意很多关键点,有的方式对小白比较友好,部署简单方便且高效

二进制源码包的部署方式

使用 二进制源码包的方式部署会比较麻烦,大概分为如下几步:

  • 获取源码包
  • 部署在 master 节点和 worker 节点上
  • 启动相应节点的关键服务

master 节点上

api-server ,分布式存储 例如 etcd,scheduler,controller manager

woker 节点上

kubelet,kube-proxy,docker

在 K8S 中,各个组件之间互相访问都是建议通过证书进行访问的,所以我们也需要生成相应的证书

  • 生成证书

例如 HTTP 的,HTTPS 的

涉及的组件和工具比较多,我们可以后续对 K8S 比较熟之后,再来尝试源码包的方式

使用 kubeadm 方式部署

官方是推荐使用 minikube 来部署,其实他也是通过 kubeadm 来部署的,使用 kubeadm 的部署方式就相对简单,不过步骤也是不少,有:

  • 需要安装 kube 对应的软件,kubelet,kubeadm,kubectl
  • 初始化整个 K8S 集群
  • 添加节点到集群中

执行上述 3 步,我们就可以将 K8S 集群搭建部署起来

  • 生成证书是自动的,不需要我们去干预
  • master 节点是用来管理集群容器的,所有的容器都是运行在 master 节点中,worker 节点是用来执行任务的
  • 关于容器镜像的获取

我们可以直接通过谷歌下载镜像,或者是通过国内阿里云来下载镜像

点我查看 kubeadm

基础环境准备

准备 3 台虚拟机,买云服务器也是可以的,定义好主机名

  • master
  • node1
  • node2

可以这么设置主机名

hostnamectl set-hostname master

hostnamectl set-hostname node1

hostnamectl set-hostname node2

可以通过 hostname 查看主机名

1
2
shell复制代码# hostname
master

安装 docker

可以查看我之前的 docker 文章 【Docker 系列】docker 学习 一,Docker的安装使用及Docker的基

需保证 master 节点和 worker 节点的 docker 版本一致,如:

需要关闭防火墙

1
2
3
4
5
6
7
8
9
shell复制代码// 查看防火墙的当前状态
sudo ufw status

// 启用和禁用防火墙
sudo ufw enable
sudo ufw disable

// 禁用防火墙并删除所有防火墙规则
sudo ufw reset

关闭 selinux

1
2
3
4
shell复制代码sudo setenforce 0 
或者
vim /etc/selinux/config
将 SELINUX 设置为 permissive

禁用 swap

1
shell复制代码sudo swapoff -a

安装 kubeadm 等工具

点我查看参考地址

开始安装 kubeadm、kubelet、kubectl 工具

  • 先安装 k8s 的源文件
1
2
3
4
5
6
7
shell复制代码sudo apt-get update && sudo apt-get install -y apt-transport-https curl

sudo curl -s https://mirrors.aliyun.com/kubernetes/apt/doc/apt-key.gpg | sudo apt-key add -

sudo tee /etc/apt/sources.list.d/kubernetes.list <<-'EOF'
deb https://mirrors.aliyun.com/kubernetes/apt kubernetes-xenial main
EOF
  • 查看 kubeadm 可用版本,选一个合适的

# apt-cache madison kubeadm

这里我们就安装最新的版本吧

1
2
3
4
5
6
7
8
php复制代码// 安装最新版本
# shell sudo apt-get install -y kubelet=1.22.4-00 kubeadm=1.22.4-00 kubectl=1.22.4-00

// 设置标签
# sudo apt-mark hold kubelet=1.22.4-00 kubeadm=1.22.4-00 kubectl=1.22.4-00

// 设置开启启动 kubelet 并 运行 kubelet
# sudo systemctl enable kubelet && sudo systemctl start kubelet

我们简单的运行一下 kubelet 可以看到版本安装是正确的

image-20211120222647806

可是 kubelet 本应该是在后台就会启动的 ,为什么会启动异常呢 , 我们可以查看官方文档,得知需要配置 cgroup 驱动程序

在本地创建一个 yaml 文件,kubeadm-config.yaml

配置上自己的 kubelet 的版本 v1.22.4

1
2
3
4
5
6
7
yaml复制代码kind: ClusterConfiguration
apiVersion: kubeadm.k8s.io/v1beta3
kubernetesVersion: v1.22.4
---
kind: KubeletConfiguration
apiVersion: kubelet.config.k8s.io/v1beta1
cgroupDriver: systemd

执行命令 kubeadm init --config kubeadm-config.yaml

若发现有如下报错,则是因为我们的机器 CPU 的核数小于 2 ,kubeadm 要求 我们的环境 CPU 需要是 2 个 ,赶紧换一个配置高一点的机器吧

image-20211120223723183

若配置没有异常,CPU 核数够,那么将会执行成功

今天就到这里,学习所得,若有偏差,还请斧正

欢迎点赞,关注,收藏

朋友们,你的支持和鼓励,是我坚持分享,提高质量的动力

好了,本次就到这里

技术是开放的,我们的心态,更应是开放的。拥抱变化,向阳而生,努力向前行。

我是小魔童哪吒,欢迎点赞关注收藏,下次见~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…255256257…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%