开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

漫画:如何判断一个数是否在40亿个整数中?

发表于 2021-09-09

文章来源于:www.iamshuaidi.com,一个专注于校招,面试,面经的编程网站

题目:我有40亿个整数,再给一个新的整数,我需要判断新的整数是否在40亿个整数中,你会怎么做?

【请教大神】

小史回到学校,把面试的情况和计算机学院的吕老师说了一下。

小史忙拉着吕老师问,为什么我说分8次加载数据,面试官会说太慢了呢?

吕老师:哈哈,从磁盘加载数据是磁盘io操作,是非常慢的,你每次都要加载这么大的数据,还要8次,我估计你找一个数的时间可以达到分钟甚至小时级了。

小史:那如果是你,你会怎么办呢?

吕老师:其实面试官已经提示得比较明显了,他说给你一批机器,就是暗示你可以用分布式算法。你把数据分散在8台机器上,然后来一个新的数据,8台机器一起找,最后再汇总结果就行了。

小史:这样的话能快多少?

吕老师:这样应该能达到秒级。小史,你可以自己分析分析。

小史:我想想……哦,这样做的话,因为每台机器都可以一次性把数据读入内存,在比较的时候不用来回加载数据了,所以可以节省加载数据的开销!这真是个好办法。

【更好方案】

吕老师:其实这并不是最好方法,我这还有一种毫秒级的方法,想不想知道啊?

小史:当然想啊,快教教我。

小史:哦,对哦,这样我就申请40亿个位就好了,新的数转换成一个位,然后判断一下这个位是0还是1就行了。

吕老师:小史啊,考虑问题要考虑清楚啊,如果是40亿个位,那么这40亿个位哪些是0,哪些是1呢?来了一个新的数,怎么判断是否在40亿个位之中?

小史:我想想,对啊,40亿个位,40亿个数,那么每个位都是1,这。。。

吕老师:其实你可以想想,32位int的范围,总共就是2的32次方,大概42亿多点。所以你可以申请2的32次方个位。

小史:意思是我把整个整数范围都覆盖了,哦,对哦。这样一来,就可以做了,1代表第一个位,2代表第二个位,2的32次方代表最后一个位。40亿个数中,存在的数就在相应的位置1,其他位就是0。

吕老师:没错,那来了一个新的数呢?

小史:新的数就去找相应的位,比如来了一个1234,就找一下第1234位,如果是1就存在,是0就不存在啦。

吕老师:没错,那么这样的话,需要多大内存呢?

小史:我想想啊,2的32次方个位,相当于2的29次方个字节,哇,才500MB,真是节省了不少内存呢。

小史:这么厉害的算法,你是怎么想到的?

吕老师:其实这是一种非常有名的大数据算法,叫位图法,英文名叫bitmap。顾名思义,就是用位来表示状态,从而节省空间。明天正好我有一节课,就讲位图法,你可以来听一听。

【吕老师的课】

第二天,吕老师开始上课,他一开始就抛出了小史遇到的面试题。

吕老师:同学们,这道题是BAT公司的一道面试题,大家有什么思路吗?

话音刚落,蛋哥就站起来回答。蛋哥是吕老师最得意的门生,以思维活跃著称。

蛋哥:我觉得可以这样。首先,32位int的范围是42亿,40亿整数中肯定有一些是连续的,我们可以先对数据进行一个外部排序,然后用一个初始的数和一个长度构成一个数据结构,来表示一段连续的数,举个例子。

如果数据是1 2 3 4 6 7……这种的,那么可以用(1,4)和(6,2)来表示,这样一来,连续的数都变成了2个数表示。 来了一个新数之后,就用二分法进行查找了。

这样一来,最差情况就是2亿多的断点,也就是2亿多的结构体,每个结构体8个字节,大概16亿字节,1.6GB,在内存中可以放下。

吕老师:嗯,非常好,不仅给出了方案,还能主动分析空间和可行性。

小史听完后深感佩服,问题的解决方法绝对不止一种,只要肯动脑筋,即使没有学过bitmap算法,也能有别的方法来解决问题。

【课后】

下课后,小史又找到吕老师。

吕老师:但是你的理解能力还是很强的,很多东西一听就懂,这可不是谁都能做到的。

大家好,我是帅地,目前也正在更面试,面经,算法 等硬核文章,点击我的头像,你会发现相见恨晚,如果觉得文章不过,也别吝啬你的赞哦,嘻嘻

更多面试现场文章:

1. 如何判断一个数是否在40亿个整数中?

2. 如何实现可以获取最小值的栈?

3. 记一次 shopee 面试:最小栈的最优解

4. 为什么要分稳定排序和非稳定排序?

5. 如何编程解决华容道问题?

6. 如何找到字符串中的最长回文子串?

7. 如何在500w个单词中统计特定前缀的单词有多少个?

8. 如何在10亿数中找出前1000大的数?

9. 如何编程获得最多的年终红包奖?

10. 如何编程解决朋友圈个数问题?

11. 如何设计可自学习的五子棋AI?

12. 为什么MySQL数据库要用B+树存储索引?

13. 记一道字节跳动面试:变形的链表反转

14. 记一次手撕算法面试:字节跳动的面试官把我四连击了

15. 记一次阿里笔试:一行代码解决约瑟夫环问题的

16. 记一次阿里面试:面试挂在了 LRU 缓存算法设计上

17. 记一次网易笔试:前缀和的应用

18. 游戏中的敏感词过滤是如何实现的?

19. 如何只用2GB内存从20/40/80亿个整数中找到出现次数最多的数

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Fastapi框架实战系列-纯个人使用之脚手架搭建篇 1:开

发表于 2021-09-09

1:开篇

1.在实践中检验真理 在实践中发展真理

陆陆续续的整理整个学习Fastapi框架一些小笔记的过程发现,自己其实对某些知识点掌握到实际应用的时候还是容易忘记,而且部分知识点也是难以理解和使用,熬了一段时间之后,也算是完整了公司内部的后台从flask迁移到Fastapi框架上。

再迁移的过程中深知,对于python异步的使用,还是需要多多实践,才能深入去理解和应用asyncio异步IO的使用,也累积了自己一点使用这个框架过程中一些使用小经验。

为了检验自己的知识掌握程度和系统化的进行学习和使用这个框架,另外不辜负关注我的公众号的粉丝们(虽然没多少个,但是仍然感激他们的不离不弃,毕竟我写的东西感觉太凌乱,太烂了~哈哈)之前说有没有实践,所以不管怎么样,还是把自己这些时间整理的脚手架搭建的一些知识点再进行系统的补充一下。

2.内容概要

相关的文章叙述主要通过实战叙述我们日常一些 API 构建过程一些流程,其实无非而已是开发-测试-部署三个范畴:

  • 开发环境的搭建(docker环境搭建)
  • API设计一些规范描述
  • 对脚手架个功能点的整理
+ 项目配置文件处理
+ API日志记录处理
+ 异步redis缓存处理
+ 同步数据库整合使用
+ 异步数据库的整合和使用
+ 全局错误异常处理
+ 全局的Http请求响应报文的处理
+ 扩展第三方插件-限流器
+ 扩展第三方的插件-错误统计处理
+ 扩展-第三方插件-全局的认证JWT
+ 扩展-第三方插件-消息队列的整合
+ API版本的规划和处理
+ Api相关的单元测试和引入相关性能分析
+ 可能的话(还是带出K8S的应用(我还在学习中~))
  • 然后就是完整一个API服务的部署,这里主要是结合Docker和Drone进行相关部署

内容上,可能也会随时改变,但是整体可能还是会围绕上面几个概要点展开。

以上几点是大概接下来我的文章我自己会去整理的,因为我的表达能力有限,可能有些地方诉述会词不达意,希望海涵,也希望各位大佬批评指出。

PS:脚手架纯属个人经验的不断优化累积的沉淀结果,属于个人经验之谈,仅供参考!如有错误,还请各位大佬指正!

3.关于脚手架

3.1 序言

首先,经过之前一系列的基础知识的学习,大致上如何把这个框架应用到我们的自己的业务环境中的,就需要适当进行整合和封装一下,让它更加快捷方便自己的业务开放。

如何的去封装整合一个属于合适自己的脚手架,这个要看自己喜欢是没有的风格了吧,另外如果公司中也有相关的规范要求的话,那你只能根据公司的规范要求来设计。

但是总体上,我个人觉得一个脚手架其实是离不开几点要素,我自己的话,则封装一些常用几个功能(其实也是上面提到几个要点):

  • 支持 Swagger 接口文档生成(fastapi纯天然自带基因)
  • 需要支持对应的日志收集-我这里使用loguru进行日志记录
  • 扩展支持相关的JWT的接口鉴权验证
  • 支持相关的接口的cors跨域支持
  • 对中间件扩展的支持(但是fastapi中间件扩展如何涉及读取reques中的body等信息会有问题,需要考虑自己的需求)
  • 如果你喜欢配置文件类型的读取的方式,还可以进行相关配置文件解析
  • 全局的统一响应返回体(无论是错误还是正常的)
  • 内部的日志的自定义traceid链路追踪(包含第三方的请求处理)
  • 支持相关的rate 接口限流(基于redis)
  • 支持全局的异常捕获,可以进行相关通知
  • sentry异常的捕获上传处理

3.2 脚手架整体结构说明

因为如果项目是针对单一的项目,不是一个大型的项目的话,可以不需提取公共的模块出来,如果有有必要的公共的模块想共享使用的话,就可以提取公共的出来!我这里暂时不提炼出来。

以下是我的自己脚手架的整体的一个结构:

PS:纯属个人的组织方式,仅供参考
图片.png

3.3计划基于脚手架之上会案例结构小示例:

  • 使用异步的方式进行对接三地方接口(天气预报接口)
  • 基于Vue制作一个简单的用户管理系统(使用别人的Vue模板是最快滴)
  • 长远计划,弄一个数据自己的后台系统

2.搭建开始

一个脚手架的搭建首先从规划我们的项目结构开始(当然需要根据自己企业自身的而定咯)。
首先就是如下截图,是我自己规划的结构图:

图片.png

规划好的项目结构之后,那接下里我们第一步就是开始先定义一个我们自己的一个fastapi的对象以及初始化这个基础的对象的时候一些配置信息读取。

因为之前使用flask,习惯了那种工厂模式方式来初始化我们的对象,所以这里也沿袭了那种风格来定义:

图片.png

从上面的可以大概理解出我们的整个的应用启动的时候,需要哪些配件了!

2.1 全局配置文件说明

因为启动一个APP对象的时候,涉及到一些配置信息的时候,我们的需要统一去管理,为了更细致区分,甚至我自己划分了更明晰的配置功能:

图片.png

比如上图所示的,有:

  • 全局认证的配置信息
  • 文档配置信息
  • 数据库的配置信息
  • redis的配置信息

PS:通常生产环境,根据个人需求吧,为了安全性,肯定是把一些重要信息通过写入环境变量来读取!可以结合一下读取环境变量的方式来解析对应的配置信息!

dotenv 这个库可以了解一下!

主要配置文件信息有:

auth_conf.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
python复制代码#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : auth_conf
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/6/9
-------------------------------------------------
修改描述-2021/6/9:
-------------------------------------------------
"""
from functools import lru_cache
from pydantic import BaseSettings
import pprint
import secrets
from typing import List
pp = pprint.PrettyPrinter(indent=4)


class AuthUrlSettings(BaseSettings):
# token相关-加密算法
JWT_ALGORITHM: str = "HS256" #
# 秘钥生成
# JWT_SECRET_KEY: str = secrets.token_urlsafe(32) # 随机生成的base64位字符串
JWT_SECRET_KEY: str = '09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3e7'
# token配置的有效期
JWT_ACCESS_TOKEN_EXPIRE_MINUTES: int = 60 * 24 * 3 # token的时效 3 天 = 60 * 24 * 3
JWT_REFRESH_EXPIRES_DAYS: int = 1

# 跨域设置
ORIGINS: List[str] = ["*"]

class Config:
env_file = ".env"
case_sensitive = True
env_file_encoding = 'utf-8'

ADMIN_WHILE_ROUTE = [
# '/sys/user/logout',
'/5gmsg/sys/user/login',
'/nw/sys/user/login',
'/',
'/check',
'/check23',
'/jcg_admin/api/v1/login',
'/websocket/1',
'/openapi_url',
'/nw/sys/user/login',
'/nw/sys/user/loginceshi'
]


@lru_cache()
def get_auth_settings():
return AuthUrlSettings()


auth = get_auth_settings()

docs_conf.py(初始化我们的Fastapi对象的时候使用到):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
ini复制代码from functools import lru_cache
from pydantic import BaseSettings
import pprint

pp = pprint.PrettyPrinter(indent=4)


class DocsSettings(BaseSettings):
"""配置类"""
API_V1_STR: str = ""
# 文档接口描述相关的配置
DOCS_URL = API_V1_STR + '/docs'
REDOC_URL = API_V1_STR + '/redocs'
# OPENAPI_URL配置我们的openapi,json的地址
OPENAPI_URL = API_V1_STR + '/openapi_url'
# 接口描述
TITLE = "5G消息管理系统后台"
# 首页描述文档的详细介绍信息
DESC = """
`xxxxxx消息管理系统后台用`
- 前端:使用 ANT VBEN的框架进行搭建
- 后端: 同步模式的多线程模式+ 单线程模式的协程模式
- 技术栈 :FastAPI+ POSTGRESQL+自制ORM

**具体项目描述**
- [1] [系统管理后台模块]
- [2] [商户管理模块]
- [3] [消息模板模块]
"""
TAGS_METADATA = [
{
"name": "后台管理系统",
"description": "后台所有的公司的相关的权限管理",
},
# {
# "name": "xxxxxx消息管理模块",
# "description": "后台所有的公司的相关的权限管理",
# },
# {
# "name": "xxxxx",
# "description": "xxxxx后台所有的公司的相关的权限管理",
# "externalDocs": {
# "description": "子文档信息",
# "url": "https://fastapi.tiangolo.com/",
# },
# },

]
# 配置代理相关的参数信息
SERVERS = [
{"url": "/", "description": "本地调试环境"},
{"url": "https://xx.xx.com", "description": "线上测试环境"},
{"url": "https://xx2.xx2.com", "description": "线上生产环境"},
]


@lru_cache()
def get_settings():
return DocsSettings()


# 配置实例的对象的创建
docs = get_settings()

pgdb_conf.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
ini复制代码from functools import lru_cache
from pydantic import BaseSettings
import pprint
pp = pprint.PrettyPrinter(indent=4)

class DatabaseSettings(BaseSettings):
DEPLOY_HOST: str = '0.0.0.0'
DEPLOY_PORT: int = 8888
DEPLOY_DEBUG: bool = False
DEPLOY_RELOAD: bool = False
DEPLOY_ACCESS_LOG: bool = False


@lru_cache()
def get_settings():
return DatabaseSettings()

# 配置实例的对象的创建
pgconf = get_settings()

redis_conf.py:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
python复制代码from functools import lru_cache
from pydantic import BaseSettings
import pprint
from pydantic import AnyUrl, BaseSettings
import os
pp = pprint.PrettyPrinter(indent=4)


class RedisSettings(BaseSettings):
DEPLOY_HOST: str = '0.0.0.0'
DEPLOY_PORT: int = 8888
DEPLOY_DEBUG: bool = False
DEPLOY_RELOAD: bool = False
DEPLOY_ACCESS_LOG: bool = False

# redis://:root12345@127.0.0.1:6379/0?encoding=utf-8
redis_url: AnyUrl = os.environ.get("REDIS_URL", "redis://127.0.0.1:6379/0?encoding=utf-8")
redis_password: str = os.getenv("REDIS_PASSWORD", "")
redis_db: int = int(os.getenv("REDIS_DB", "0"))

# 哨兵机制的链接的配置
use_redis_sentinel: bool = (
True if os.getenv("REDIS_USE_SENTINEL", "0") == "1" else False
)
redis_sentinel_port: int = int(os.getenv("REDIS_SENTINEL_PORT", "26379"))
redis_sentinel_url: str = os.getenv("REDIS_SENTINEL_URL", "")
redis_sentinel_password: str = os.getenv("REDIS_SENTINEL_PASSWORD", "")
redis_sentinel_master_name: str = os.getenv(
"REDIS_SENTINEL_MASTER_NAME", "molmaster"
)


@lru_cache()
def get_settings():
return RedisSettings()


# 配置实例的对象的创建
redisconf = get_settings()

以上是是所以关于配置信息说明:

2.2 App示例对象创建

一个Fastapi的实例就是一个应用服务,这个服务需要哪些配件,再启动的时候就需要配置好。所以有了以下规划:

图片.png

主要内容就是:

创建一个APP对应的时候,顺便实例化我们的一些插件或导入的API服务等,一下是详细的方法的说明:

图片.png

所以我们的展开的时候也会按上面的相关注册功能模块展开。

2.2.1 注册日志模块的处理

在一个应用中日志对我们的后续的异常定位是必不可缺的一部分,所以我们的需要对我们的相关的请求日志做好写入本地,便于后续回溯问题定位。

这里的日志模块处理,我使用的是loguru,这个库其实是挺不错的日志库,也支持异步的写入,所以再异步里的,感觉使用上应该是很理想的。

再定义日志模块之前,需要考虑的问题点:

  • 日志存贮目录
  • 日志记录格式
  • 日志切割处理

那基于上述几个问题,我们把我们的日志记录,也进行一个插件化的方式来引入。

图片.png

所以再我们的ext模块下,就有对应日志插件的处理:

图片.png

  • 定义日志:

loger_config.py 文件内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
python复制代码import time
# 封装一下关于记录序号的日志记录用于全链路的日志请求的日志
from datetime import datetime
from loguru import logger


def creat_customize_log_loguru(pro_path=None):
'''
:param pro_path: 当前需要生产的日志文件的存在路径
:return:
'''
import os
if not pro_path:
# BASE_DIR = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
pro_path = os.path.split(os.path.realpath(__file__))[0]
# 定义info_log文件名称
log_file_path = os.path.join(pro_path, 'log/info_{time:YYYYMMDD}.log')
# 定义err_log文件名称
err_log_file_path = os.path.join(pro_path, 'log/error_{time:YYYYMMDD}.log')

from sys import stdout
LOGURU_FORMAT: str = '<green>{time:YYYY-MM-DD HH:mm:ss.SSS}</green> | <level>{level: <16}</level> | <bold>{message}</bold>'
# 这句话很关键避免多次的写入我们的日志
logger.configure(handlers=[{'sink': stdout, 'format': LOGURU_FORMAT}])
# 这个也可以启动避免多次的写入的作用,但是我们的 app:register_logger:40 -无法输出
# logger.remove()
# 错误日志不需要压缩
format = " {time:YYYY-MM-DD HH:mm:ss:SSS} | process_id:{process.id} process_name:{process.name} | thread_id:{thread.id} thread_name:{thread.name} | {level} |\n {message}"
# enqueue=True表示 开启异步写入
# 使用 rotation 参数实现定时创建 log 文件,可以实现每天 0 点新创建一个 log 文件输出了
logger.add(err_log_file_path, format=format, rotation='00:00', encoding='utf-8', level='ERROR', enqueue=True) # Automatically rotate too big file
# 对应不同的格式
format2 = " {time:YYYY-MM-DD HH:mm:ss:SSS} | process_id:{process.id} process_name:{process.name} | thread_id:{thread.id} thread_name:{thread.name} | {level} | {message}"

# enqueue=True表示 开启异步写入
# 使用 rotation 参数实现定时创建 log 文件,可以实现每天 0 点新创建一个 log 文件输出了
logger.add(log_file_path, format=format2, rotation='00:00', compression="zip", encoding='utf-8', level='INFO', enqueue=True) # Automatically rotate too big file

PS:注意点,再使用这个日志处理器的时候,下面句很关键避免多次的写入我们的日志,没有这个的话,会在记录日志的时候重复写入多次!

logger.configure(handlers=[{‘sink’: stdout, ‘format’: LOGURU_FORMAT}])

  • 定义日志记录对象:

为什么需要这样的处理,因为之前几篇文章,我也有叙说过,关于fastapi在中间件处理日志的时候问题,使用中间件的方式记录日志的话,我们的无法对
request: Request 二次使用,而我的自己对日志需求就是如下示例:

1
2
json复制代码2021-08-03 10:21:14:718 | process_id:24088 process_name:MainProcess | thread_id:2684 thread_name:MainThread | INFO | {"traceid": "DKVNm2JsxA2wVhY2hvFSQa", "trace_index": 1, "event_type": "request", "msg": {"useragent": {"os": "Windows 10", "browser": "QQ Browser 10.8.4405", "device": {"family": "Other", "brand": null, "model": null}}, "url": "/check", "method": "GET", "ip": "127.0.0.1", "params": {}, "ts": "2021-08-03 10:21:14"}}
2021-08-03 10:21:14:719 | process_id:24088 process_name:MainProcess | thread_id:2684 thread_name:MainThread | INFO | {"traceid": "DKVNm2JsxA2wVhY2hvFSQa", "trace_index": 2, "event_type": "response", "msg": {"status_code": 200, "cost_time": "0.00", "rsp": "ok", "ts": "2021-08-03 10:21:14"}}

因为我的请求日志和记录响应的日志是分开记录,所以我上面处理方式和某大佬提出的是有点出路的。
基于我自己上面那种需求,所以我就分开记录,分开记录就需要一个 thread_id:所有有下面使用自定义ContextLogerRoute的实现:

文件名称:

1
复制代码contexr_logger_route.py

文件内容:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
python复制代码from time import perf_counter
from loguru import logger
from fastapi import APIRouter, FastAPI, Request, Response, Body
from fastapi.routing import APIRoute
from typing import Callable, List
from fastapi.responses import Response
from apps.utils import json_helper
import uuid
import shortuuid
from datetime import datetime
from user_agents import parse
from urllib.parse import parse_qs


# 因为Fastapi无法再中间二次消费请求的问题,只能通过自定义的路由的方式来进行日志的记录

class ContextLogerRoute(APIRoute):
pass
# 配置需要特殊记录的请求的头的值的信息
nesss_access_heads_keys = []

# 封装一下关于记录序号的日志记录用于全链路的日志请求的日志
@staticmethod
async def async_trace_add_log_record(request: Request, event_type='', msg={}, remarks=''):
'''

:param event_type: 日志记录事件描述
:param msg: 日志记录信息字典
:param remarks: 日志备注信息
:return:
'''

# print("我当前的请求ID:",request.app.state.curr_request,id(request.app.state.curr_request))
# print("我当前的请求ID:", request,id(request))
#
# print("我当前的请求ID:", request.app.state.curr_request.state.ssss)
# print("我当前的请求ID:", request.state.ssss)



# 如果没有这个标记的属性的,说明这个接口的不需要记录啦!
if hasattr(request.state, 'traceid'):
# 自增编号索引序
trace_links_index = request.state.trace_links_index = getattr(request.state, 'trace_links_index') + 1
log = {
# 自定义一个新的参数复制到我们的请求上下文的对象中
'traceid': getattr(request.state, 'traceid'),
# 定义链路所以序号
'trace_index': trace_links_index,
# 时间类型描述描述
'event_type': event_type,
# 日志内容详情
'msg': msg,
# 日志备注信息
'remarks': remarks,

}
# 为少少相关记录,删除不必要的为空的日志内容信息,
if not remarks:
log.pop('remarks')
if not msg:
log.pop('msg')
try:
log_msg = json_helper.dict_to_json_ensure_ascii(log) # 返回文本
logger.info(log_msg)
except:
logger.info(getattr(request.state, 'traceid') + ':索引:' + str(getattr(request.state, 'trace_links_index')) + ':日志信息写入异常')

async def _init_trace_start_log_record(self, request: Request):
'''
请求记录初始化
:return:
'''

# 配置当前的清除的上下文对象
# request.app.


path_info = request.url.path
if path_info not in ['/favicon.ico'] and 'websocket' not in path_info:
if request.method != 'OPTIONS':
# 追踪索引
request.state.trace_links_index = 0
# 追踪ID
# request.traceid = str(uuid.uuid4()).replace('-', '')
request.state.traceid = shortuuid.uuid()
# 计算时间
request.state.start_time = perf_counter()
# 获取请求来源的IP,请求的方法
ip, method, url = request.client.host, request.method, request.url.path
# print('scope', request.scope)
# 先看表单有没有数据:
try:
body_form = await request.form()
except:
body_form = None

body = None
try:
body_bytes = await request.body()
if body_bytes:
try:
body = await request.json()
except:
pass
if body_bytes:
try:
body = body_bytes.decode('utf-8')
except:
body = body_bytes.decode('gb2312')
except:
pass

# 从头部里面获取出对应的请求头信息,用户用户机型等信息获取
user_agent = parse(request.headers["user-agent"])
browser = user_agent.browser.version
if len(browser) >= 2:
browser_major, browser_minor = browser[0], browser[1]
else:
browser_major, browser_minor = 0, 0

user_os = user_agent.os.version
if len(user_os) >= 2:
os_major, os_minor = user_os[0], user_os[1]
else:
os_major, os_minor = 0, 0

log_msg = {
# 'headers': str(request.headers),
# 'user_agent': str(request.user_agent),
# 记录请求头信息----如果需要特殊的获取某些请求的记录则做相关的配置即可
'headers': [request.headers.get(i, '') for i in self.nesss_access_heads_keys] if self.nesss_access_heads_keys else None,
# 记录请求URL信息
"useragent":
{
"os": "{} {}".format(user_agent.os.family, user_agent.os.version_string),
'browser': "{} {}".format(user_agent.browser.family, user_agent.browser.version_string),
"device": {
"family": user_agent.device.family,
"brand": user_agent.device.brand,
"model": user_agent.device.model,
}
},
'url': url,
# 记录请求方法
'method': method,
# 记录请求来源IP
'ip': ip,
# 'path': request.path,
# 记录请求提交的参数信息
'params': {
'query_params': parse_qs(str(request.query_params)),
'from': body_form,
'body': body
},
# 记录请求的开始时间
"ts": f'{datetime.now():%Y-%m-%d %H:%M:%S%z}'
# 'start_time': f'{(start_time)}',
}
if not log_msg['headers']:
log_msg.pop('headers')

if not log_msg['params']['query_params']:
log_msg['params'].pop('query_params')
if not log_msg['params']['from']:
log_msg['params'].pop('from')
if not log_msg['params']['body']:
log_msg['params'].pop('body')
# 执行写入--日志具体的内容信息
await self.async_trace_add_log_record(request, event_type='request', msg=log_msg)

async def _init_trace_end_log_record(self, request: Request, response: Response):

# https://stackoverflow.com/questions/64115628/get-starlette-request-body-in-the-middleware-context
# 如果响应图的类型,仅仅记录字符串类型的结尾的日志信息
if 'image' not in response.media_type and hasattr(request.state, 'traceid'):
start_time = getattr(request.state, 'start_time')
end_time = f'{(perf_counter() - start_time):.2f}'
# 获取响应报文信息内容
rsp = None
if isinstance(response, Response):
rsp = str(response.body, encoding='utf-8')
log_msg = {
# 记录请求耗时
"status_code": response.status_code,
'cost_time': end_time,
# 记录请求响应的最终报文信息--eval的作用是去除相关的 转义符号 ""ok""===》ok
'rsp': json_helper.json_to_dict(rsp),
"ts": f'{datetime.now():%Y-%m-%d %H:%M:%S%z}'
}
await self.async_trace_add_log_record(request, event_type='response', msg=log_msg)

def get_route_handler(self) -> Callable:
original_route_handler = super().get_route_handler()

# 自定义路由的方式内容
async def custom_route_handler(request: Request) -> Response:
# 请求前的处理-日志的初始化操作
await self._init_trace_start_log_record(request)

response: Response = await original_route_handler(request)

# 一个API请求处理完成后的-日志收尾记录
await self._init_trace_end_log_record(request, response)

return response

return custom_route_handler

有了上面的插件之后,那接下里就可以直接再app对象里面注册了.

图片.png

有了上面仅仅是对我们的日志对象和我们日志信息的配置的初始化,下一步我们还需要对应我们的路由的日志也进行初始化:

image.png

给我们的路由添加自定义的路由实现,用于请求日志的记录:
image.png

有了上面的注册时候,如果在有需要记录日志的地方只需要使用:

1
2
ini复制代码
await ContextLogerRoute.async_trace_add_log_record(self.app.state.curr_request, event_type='Third party interface', msg=info_interface)

验证示例如下:

image.png

从上面看我们可以把整个请求的链路日志给记录下来了!

2.2.2 注册全局异常捕获信息

全局异常处理器,对于统一异常的拦截捕获和统一响应报文,是非常关键滴!

2.2.2.1 异常类插件-扩展位置:

image.png

2.2.2.2 异常类插件- 注册方式:

image.png

思考,其实我们的所有的插件扩展思路无法就是把我们app对象注册到我们的扩展中,让他们可以在我们的扩展里面引用app对象相关调用,所以顺着这个思路其实我们扩展我们所谓的插件,这样就可以很方面定义自己需要东西了!

如我们的全局异常的对象主要使用的我们的app的异常拦截的,所以我们的可以自定义自己的一个异常的捕获类,来处理相关的所有的异常。

2.2.2.3 异常类插件- 类实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
python复制代码#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : __init__.py
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/7/15
-------------------------------------------------
修改描述-2021/7/15:
-------------------------------------------------
"""

from fastapi import FastAPI, Request
from apps.response.json_response import *
from starlette.exceptions import HTTPException as StarletteHTTPException
from fastapi.exceptions import HTTPException as FastapiHTTPException
from fastapi.exceptions import RequestValidationError
from pydantic.errors import *
from apps.ext.logger import logger
import traceback
from apps.utils.singleton_helper import Singleton


@Singleton
class ApiExceptionHandler():
def __init__(self, app=None, *args, **kwargs):
super().__init__(*args, **kwargs)
if app is not None:
self.init_app(app)

def init_app(self, app: FastAPI):

# @app.exception_handler(StarletteHTTPException)
# @app.exception_handler(RequestValidationError)
# @app.exception_handler(Exception)
app.add_exception_handler(Exception, handler=self.all_exception_handler)
# 捕获StarletteHTTPException返回的错误异常,如返回405的异常的时候,走的是这个地方
app.add_exception_handler(StarletteHTTPException, handler=self.http_exception_handler)
app.add_exception_handler(RequestValidationError, handler=self.validation_exception_handler)

async def validation_exception_handler(self, request: Request, exc: RequestValidationError):
# print("参数提交异常错误selfself", exc.errors()[0].get('loc'))
# 路径参数错误
# 判断错误类型
if isinstance(exc.raw_errors[0].exc, IntegerError):
pass
elif isinstance(exc.raw_errors[0].exc, MissingError):
pass
return ParameterException(http_status_code=400, api_code=400, message='参数校验错误', result={
"detail": exc.errors(),
"body": exc.body
})

async def all_exception_handler(self, request: Request, exc: Exception):
'''
全局的捕获抛出的HTTPException异常,注意这里需要使用StarletteHTTPException的才可以
:param request:
:param exc:
:return:
'''
# log_msg = f"捕获到系统错误:请求路径:{request.url.path}\n错误信息:{traceback.format_exc()}"
if isinstance(exc, StarletteHTTPException) or isinstance(exc, FastapiHTTPException):
if exc.status_code == 405:
return MethodnotallowedException()
if exc.status_code == 404:
return NotfoundException()
elif exc.status_code == 429:
return LimiterResException()
elif exc.status_code == 500:
return InternalErrorException()
elif exc.status_code == 400:
# 有部分的地方直接的选择使用raise的方式抛出了异常,这里也需要进程处理
# raise HTTPException(HTTP_400_BAD_REQUEST, 'Invalid token')
return BadrequestException(msg=exc.detail)

return BadrequestException()
else:
# 其他内部的异常的错误拦截处理
logger.exception(exc)
traceback.print_exc()
return InternalErrorException()

async def http_exception_handler(self, request: Request, exc: StarletteHTTPException):
'''
全局的捕获抛出的HTTPException异常,注意这里需要使用StarletteHTTPException的才可以
:param request:
:param exc:
:return:
'''
# 这里全局监听了我们的所有的HTTP响应,包括了200 的也会尽到这里来!
# log_msg = f"捕获到系统错误:请求路径:{request.url.path}\n错误信息:{traceback.format_exc()}"

if exc.status_code == 405:
return MethodnotallowedException()
if exc.status_code == 404:
return NotfoundException()
elif exc.status_code == 429:
return LimiterResException()
elif exc.status_code == 500:
return InternalErrorException()
elif exc.status_code == 400:
# 有部分的地方直接的选择使用raise的方式抛出了异常,这里也需要进程处理
# raise HTTPException(HTTP_400_BAD_REQUEST, 'Invalid token')
return BadrequestException(msg=exc.detail)

PS:上面实现其实可以使用两种方式,一个是定义函数的方式,然后通过add_exception_handler处理我们的异常;另一种其实就是直接装饰器的方式,为了清晰点我们的采取的第一种方案!

2.2.2.4 异常类插件- 验证:

验证函数故意引发异常:

image.png
结果:

image.png

异常引发捕获位置:

image.png

对应的引发异常响应报文定义:

image.png

2.2.2.5 补充全局响应报文定义):

  • 定义的位置:
    image.png
    默认其实是对JSON解析的有三种,我们为方便分开了三个,这个只显示一个!
  • json_response.py
1
ini复制代码

#!/usr/bin/evn python

-- coding: utf-8 --

“””

文件名称 : json_response
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/7/15


修改描述-2021/7/15:

“””

from typing import Any, Dict, Optional

自定义返回的错误的响应体信息

ORJSONResponse一依赖于:orjson

from fastapi.responses import JSONResponse

import time
from fastapi.encoders import jsonable_encoder

class ApiResponse(JSONResponse):
# 定义返回响应码–如果不指定的话则默认都是返回200
http_status_code = 200
# 默认成功
api_code = 0
# 默认Node.如果是必选的,去掉默认值即可
result: Optional[Dict[str, Any]] = None # 结果可以是{} 或 []
message = ‘成功’
success = True
timestamp = int(time.time() * 1000)

def __init__(self, success= None, http_status_code=None, api_code=None, result=None, message=None, **options):

    if result:
        self.result = result
    if message:
        self.message = message

    if api_code:
        self.api_code = api_code

    if success != None:
        self.success = success

    if http_status_code:
        self.http_status_code = http_status_code

    # 返回内容体
    body = dict(
        message=self.message,
        code=self.api_code,
        success=self.success,
        result=self.result,
        timestamp=self.timestamp,
        # 形如request="POST v1/client/register"
        # request=request.method + ' ' + self.get_url_no_param()
    )

    # customize_headers = {
    #     # 'Access-Control-Allow-Origin': '*',
    #     # access-control-allow-methods: DELETE, GET, OPTIONS, PATCH, POST, PUT
    #     'access-control-allow-methods': 'DELETE, GET, OPTIONS, PATCH, POST, PUT',
    #     'Access-Control-Allow-Origin': '*',
    #     'Access-Control-Allow-Headers': '*,X-Access-Token,School-Teacher-Token,school-teacher-token,T-Access-Token,x-access-token,Referer, Accept, Origin, User-Agent,X-Requested-With, Content-Type, X-File-Name',
    #     # 'Access-Control-Request-Headers': 'Content-Type,Access-Token',
    #     'Content-Type': 'application/json;charset=UTF-8'
    # }

    # jsonable_encoder 处理不同字符串返回  比如时间戳 datatime类型的处理
    super(ApiResponse, self).__init__(status_code=self.http_status_code, content=jsonable_encoder(body), **options)

# 这个render会自动调用,如果这里需要特殊的处理的话,可以重写这个地方
# def render(self, content: Any) -> bytes:
#
#     return dict_to_json_ensure_ascii_indent(content)

class BadrequestException(ApiResponse):
http_status_code = 400
api_code = 10031
result = None # 结果可以是{} 或 []
message = ‘错误的请求’
success = False

class LimiterResException(ApiResponse):
http_status_code = 429
api_code = 429
result = None # 结果可以是{} 或 []
message = ‘访问的速度过快’
success = False

class ParameterException(ApiResponse):
http_status_code = 400
result = {}
message = ‘参数校验错误,请检查提交的参数信息’
api_code = 10031
success = False

class UnauthorizedException(ApiResponse):
http_status_code = 401
result = {}
message = ‘未经许可授权’
api_code = 10032
success = False

class ForbiddenException(ApiResponse):
http_status_code = 403
result = {}
message = ‘失败!当前访问没有权限,或操作的数据没权限!’
api_code = 10033
success = False

class NotfoundException(ApiResponse):
http_status_code = 404
result = {}
message = ‘访问地址不存在’
api_code = 10034
success = False

class MethodnotallowedException(ApiResponse):
http_status_code = 405
result = {}
message = ‘不允许使用此方法提交访问’
api_code = 10034
success = False

class OtherException(ApiResponse):
http_status_code = 800
result = {}
message = ‘未知的其他HTTPEOOER异常’
api_code = 10034
success = False

class InternalErrorException(ApiResponse):
http_status_code = 500
result = {}
message = ‘程序员哥哥睡眠不足,系统崩溃了!’
api_code = 500
success = False

class InvalidTokenException(ApiResponse):
http_status_code = 401
api_code = 401
message = ‘很久没操作,令牌失效’
success = False

class ExpiredTokenException(ApiResponse):
http_status_code = 422
message = ‘很久没操作,令牌过期’
api_code = 10050
success = False

class FileTooLargeException(ApiResponse):
http_status_code = 413
api_code = 413
result = None # 结果可以是{} 或 []
message = ‘文件体积过大’

class FileTooManyException(ApiResponse):
http_status_code = 413
message = ‘文件数量过多’
api_code = 10120
result = None # 结果可以是{} 或 []

class FileExtensionException(ApiResponse):
http_status_code = 401
message = ‘文件扩展名不符合规范’
api_code = 10121
result = None # 结果可以是{} 或 []

class Success(ApiResponse):
http_status_code = 200
api_code = 200
result = None # 结果可以是{} 或 []
message = ‘自定义成功返回’
success = True

class Fail(ApiResponse):
http_status_code = 200
api_code = 200
result = None # 结果可以是{} 或 []
message = ‘自定义成功返回’
success = False

1
2


注意点:

上面的一个地方是引入jsonable_encoder
解决的事是部分的json数据类型的的问题!

2.2.3 全局配置跨域设置

这个跨域比较简单,这里不展开叙述,这里提一点就是可以针对某个地址ijnx跨域设置!通过设置:allow_origins来设置自持跨域的白名单。

2.2.4 注册全局中间件的注册

通常全局中间件处理一般对处理认证的,因为大部分的认证都是所有的接口的,所以为了方便,使用中间件的方式进行认证处理是最简单的方式。

image.png

这里我们使用全局认证的插件来说明:

2.2.4.1 全局中间件-认证中间件位置:

image.png

2.2.4.2 全局中间件-注册方式:

image.png

2.2.4.3 全局中间件-实现类:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
python复制代码#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : auth
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/6/7
-------------------------------------------------
修改描述-2021/6/7:
-------------------------------------------------
"""
from time import perf_counter

from loguru import logger
from starlette.middleware.base import BaseHTTPMiddleware
from starlette.requests import Request
from apps.config.auth_conf import auth as auth_conf
from apps.response.json_response import ForbiddenException, InvalidTokenException, ExpiredTokenException, JSONResponse
from apps.ext.jwt.simple_auth import SimpleAuth as Auth
from fastapi import HTTPException
from starlette.status import HTTP_400_BAD_REQUEST


class AuthMiddleware(BaseHTTPMiddleware):

def __init__(self, *args, **kwargs):
super().__init__(*args, **kwargs)
# 认证的形式 1:使用默认的,2 使用自定义的
self.auth_type = 2

def check_auth_token(self, request):
'''
第一步:检测URL地址和携带认证请求头字段信息
:param request:
:return:
'''
# 如果是使用系统自带的认证的虎,它的需要的认证请求头是必须是Authorization的这个的,当然也可以自定义,不过还不知道咋玩
while_auth_ulr = auth_conf.ADMIN_WHILE_ROUTE

# 只有不在白名单的地址需要进行认证的授权的校验
if request.url.path not in while_auth_ulr and 'sys/randomImag' not in request.url.path and 'docs' not in request.url.path:
if self.auth_type == 1:
token = request.headers.get('Authorization', None)
if not token:
return ForbiddenException()
else:
# 从头部提取关键的授权码信息
token = request.headers.get('X-Access-Token', None)
if not token:
# 从get里面进行提取
return ForbiddenException()
# 下面这种方式,会到全局异常捕获那进行处理
# raise HTTPException(HTTP_400_BAD_REQUEST, 'Invalid token')

return token

def authenticate_credentials(self, token):
'''
第2步:检测URL地址和携带认证请求头字段信息
:param token:
:return:
'''
isok, state, token_userinfo_result = Auth.verify_bearer_token_state(token=token)
if not isok and state == 1:
return InvalidTokenException()
if not isok and state == 2:
return ExpiredTokenException()

return token_userinfo_result

async def authenticate_credentials_user_info(self, token_userinfo_result):
'''
进行TOken内部的包含的用户信息的验证
:param token:
:return:
'''
isok, isstatus = False, 2

if not isok:
return ForbiddenException(msg='该用户已经不存在,请联系管理员!')
# 用户状态(1-正常,2-冻结)
# if isstatus.get('status') == 2:
# return ForbiddenException(msg='该用户已经被冻结,请联系管理员!')

async def dispatch(self, request: Request, call_next):
# # ---协程对象的返回-使用方法封装后---返回值的处理需要使用这样方式进行---注意返回的时候处理
# if isinstance(token_result, JSONResponse):
#
# return token_result
# 1:检测是否协调认证信息,没有则返回错误提示,有则返回对应的Token的值


# 如果是使用系统自带的认证的虎,它的需要的认证请求头是必须是Authorization的这个的,当然也可以自定义,不过还不知道咋玩
while_auth_ulr = auth_conf.ADMIN_WHILE_ROUTE

# print('while_auth_ulr',while_auth_ulr)

# 只有不在白名单的地址需要进行认证的授权的校验
# print("鉴权出来11111111111111111")
# print('aaaaaaaaawhile_auth_ulr', while_auth_ulr)

if request.scope["method"]!='OPTIONS' and request.url.path not in while_auth_ulr and 'sys/randomImag' not in request.url.path and 'docs' not in request.url.path:
if self.auth_type == 1:
token = request.headers.get('Authorization', None)
if not token:
return ForbiddenException()
else:
# 从头部提取关键的授权码信息
# print("鉴权出来11111111111111111")
token = request.headers.get('X-Access-Token', None)
# print("鉴权出来11111111111111111",token)
if not token:
# 从get里面进行提取
return ForbiddenException()
isok, state, token_userinfo_result = Auth.verify_bearer_token_state(token=token)
if not isok and state == 1:
return InvalidTokenException()
if not isok and state == 2:
return ExpiredTokenException()

# 写入当前请求上下的当前对象
request.state.token_userinfo_result = token_userinfo_result


response = await call_next(request)

return response

2.2.4.3 全局中间件-认证验证:

新增一个接口,没加入白名单:

image.png

开启我们的中间件热的认证:

image.png

访问没加白的地址:

image.png

2.2.5 注册全局的启动和关闭事件

这个地方其实没什么需要特殊的全局处理,所以没什么内容,当然如果后续的如数据库的地方,就需要处理下,但是不是使用这种方式来处理了!而是也是插件的方式来处理!
所以这里没什么内容需要展开的,或者可以忽略!

image.png

2.2.6 注册全局第三方扩展插件实例

这里第三方的扩展的示例,可以根据自身的需要是否需要再这里进行注册实例化。

我这里的话其实可以选择实例化,比如实例化一个自定义实现了async_client,这样可以就可以全局再其他地址直接使用这个对象实例了!

2.2.6.1 插件 AsynClientSession 实例-位置:

image.png

2.2.6.2 插件 AsynClientSession 实例-注册:

image.png

2.2.6.3 插件 AsynClientSession 实例-类实现:

这个自定义实现,主要是增加第三方接口请求的时候相关的日志,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
python复制代码#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : __init__.py
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/7/16
-------------------------------------------------
修改描述-2021/7/16: Http异步的客户端的请求日志封装
-------------------------------------------------
"""
from dataclasses import dataclass
from aiohttp import ClientSession
from dotmap import DotMap
import traceback
import aiohttp
# 加入日志记录
from apps.ext.logger.contexr_logger_route import ContextLogerRoute
from fastapi import Request,FastAPI
from apps.utils.singleton_helper import Singleton
from urllib.parse import parse_qs
from fastapi import FastAPI
@Singleton
@dataclass
class AsynClientSession():
pass

def __init__(self, aiohttp_session: ClientSession = None,app: FastAPI=None):
self.session = aiohttp_session

# 如果有APPC传入则直接的进行初始化的操作即可
if app is not None:
self.init_app(app)

def init_app( self,app: FastAPI):
self.app = app

async def request(self,api_url, method='GET', headers={},islogrecord=False, params=None):
try:
if islogrecord and not getattr(self.app.state,'curr_request'):
raise Exception('需传入FastapiApp对象,并需要注册全局设置请求体对象的上下文中间件')

if not self.session:
# 使用with会自动的关闭链接-Unclosed client session
async with aiohttp.ClientSession() as session:
async with session.request(url=api_url, method=method, headers=headers, params=params) as resp:
# 处理抛出异常状态又
resp.raise_for_status()
if resp.status in (401, 403):
raise Exception("接口请求异常!401或403错误")
# print('resp.content_type',resp.content_type)


try:
response = await resp.json()
except:
response = await resp.text()


# 日志记录
if islogrecord and self.app:
info_interface = {
'url': api_url,
'method': method,
'headers': str(headers) if headers else '',
'params': parse_qs(str(params)),
'state_code': str(resp.status),
'result': response,
}
await ContextLogerRoute.async_trace_add_log_record(self.app.state.curr_request, event_type='Third party interface', msg=info_interface)

else:
async with self.session.request(url=api_url, method=method, headers=headers, params=params) as resp:
# 处理抛出异常状态又
resp.raise_for_status()
if resp.status in (401, 403):
raise Exception("接口请求异常!401或403错误")
response = await resp.json()
# 需要手动的进行关闭
await self.session.close()
return response
except Exception:
traceback.print_exc()



async_client= AsynClientSession()

if __name__ == '__main__':
from asyncio import run

async def main():
results = await async_client.request(api_url='http://127.0.0.1:8080/check',islogrecord=False)
print(results)


run(main())

2.2.6.4 插件 AsynClientSession 实例-验证:

image.png

查看请求记录日志信息:
image.png

2.2.7 批量导入注册路由

因为我们的一个项目里面可能包含的路由比较多,如果一个一个的去

1
scss复制代码app.include_router(router)

我个人是不太喜欢这种方式!所以我自己参考了以前flask的模式,进行批量的导入注册,
也就是说,寻找某个模块下某个实例对象属性,进行统一的

1
scss复制代码app.include_router(router)

具体的实现方式如下步骤:

2.2.7.1 指定导入模块:

image.png

image.png

2.2.7.2 批量导入工具类-指定导入项目目录:

image.png

知道了要查询的目录,然后遍历下面有没有关于

1
复制代码bp

实例属性,然后动态进行挂载:

1
2
3
4
ini复制代码router = getattr(module, key_attribute)
# 已经全局挂载还需要吗?
# router.route_class = ContextLogerRoute
app.include_router(router)

2.2.7.3 批量导入工具类-实现:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
python复制代码from fastapi import FastAPI, FastAPI
from fastapi import APIRouter
from ZtjDirImport import DirImport
from apps.utils.modules_helper import find_modules, import_string


def print_all_routes_info(app: FastAPI):
for ro in app.routes:
print('name:', ro.name, '=====>', 'path:', ro.path)


# 通过模块的属性值来导入---注意是模块 不是__init
def register_nestable_blueprint(app=None, project_name=None, api_name='api', key_attribute='bp', hongtu='hongtu'):
'''
自动的导入的蓝图模块
:param app:
:return:
'''
if not app:
import warnings
warnings.warn('路由注册失败,需要传入Flask对象实例')
return None
if project_name:
# include_packages 这个设置为True很关键,它包含了 检测 对于的_init__内的属性,这个对于外层的遍历的来说很关键
modules = find_modules(f'{project_name}.{api_name}', include_packages=True, recursive=True)
for name in modules:
module = import_string(name)
if hasattr(module, key_attribute):
# app.register_blueprint(module.mmpbp)
# lantu = getattr(module,key_attribute)
# print('sdasda',getattr(module,key_attribute).__dict__)
app.include_router(getattr(module, key_attribute))
# app.register_blueprint(getattr(module,key_attribute))
if hasattr(module, hongtu): pass
# print('符合紅土', name)
# getattr(module, hongtu).register(lantu)

else:
import warnings
warnings.warn('路由注册失败,外部项目名称还没定义')


def register_nestable_blueprint_for_log(app=None, project_name=None, api_name='api',scan_name='api', key_attribute='bp', hongtu='hongtu'):
'''
自动的导入的蓝图模块
:param app:
:return:
'''
if not app:
import warnings
warnings.warn('路由注册失败,需要传入Fastapi对象实例')
return None
if project_name:
# include_packages 这个设置为True很关键,它包含了 检测 对于的_init__内的属性,这个对于外层的遍历的来说很关键
modules = find_modules(f'{project_name}.{api_name}', include_packages=True, recursive=True)
from apps.ext.logger.contexr_logger_route import ContextLogerRoute
for name in modules:
module = import_string(name)
# 只找某个模块开始的,避免无意义的其他扫描
if not name.endswith(scan_name):
continue

if hasattr(module, key_attribute):
# app.register_blueprint(module.mmpbp)
# lantu = getattr(module,key_attribute)
router = getattr(module, key_attribute)
# 已经全局挂载还需要吗?
# router.route_class = ContextLogerRoute
app.include_router(router)
# app.register_blueprint(getattr(module,key_attribute))
if hasattr(module, hongtu): pass
# print('符合紅土', name)
# getattr(module, hongtu).register(lantu)

else:
import warnings
warnings.warn('路由注册失败,外部项目名称还没定义')

关于里面使用到的

1
javascript复制代码from apps.utils.modules_helper import find_modules, import_string

主要是使用flask的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
python复制代码import sys
# from werkzeug.utils import find_modules, import_string
import pkgutil


def import_string(import_name, silent=False):
"""Imports an object based on a string. This is useful if you want to
use import paths as endpoints or something similar. An import path can
be specified either in dotted notation (``xml.sax.saxutils.escape``)
or with a colon as object delimiter (``xml.sax.saxutils:escape``).

If `silent` is True the return value will be `None` if the import fails.

:param import_name: the dotted name for the object to import.
:param silent: if set to `True` import errors are ignored and
`None` is returned instead.
:return: imported object
"""
# force the import name to automatically convert to strings
# __import__ is not able to handle unicode strings in the fromlist
# if the module is a package
import_name = str(import_name).replace(":", ".")
try:
try:
__import__(import_name)
except ImportError:
if "." not in import_name:
raise
else:
return sys.modules[import_name]

module_name, obj_name = import_name.rsplit(".", 1)
module = __import__(module_name, globals(), locals(), [obj_name])
try:
return getattr(module, obj_name)
except AttributeError as e:
raise ImportError(e)

except ImportError as e:

print("导入异常", e)


def find_modules(import_path, include_packages=False, recursive=False):
"""Finds all the modules below a package. This can be useful to
automatically import all views / controllers so that their metaclasses /
function decorators have a chance to register themselves on the
application.

Packages are not returned unless `include_packages` is `True`. This can
also recursively list modules but in that case it will import all the
packages to get the correct load path of that module.

:param import_path: the dotted name for the package to find child modules.
:param include_packages: set to `True` if packages should be returned, too.
:param recursive: set to `True` if recursion should happen.
:return: generator
"""
module = import_string(import_path)
path = getattr(module, "__path__", None)
if path is None:
raise ValueError("%r is not a package" % import_path)
basename = module.__name__ + "."
for _importer, modname, ispkg in pkgutil.iter_modules(path):
modname = basename + modname
if ispkg:
if include_packages:
yield modname
if recursive:
for item in find_modules(modname, include_packages, True):
yield item
else:
yield modname


def get_modules(package="."):
"""
获取包名下所有非__init__的模块名
"""
import os
modules = []
files = os.listdir(package)
for file in files:
if not file.startswith("__"):
name, ext = os.path.splitext(file)
modules.append(name)
print("名称", name)

return modules

2.2.7.4 批量导入工具类-最终效果:

项目接口的定义:

image.png

接口定义示例:

image.png

不需要太多的干预处理直接的批量导入相关API接口定义:

image.png

以上就是关于路由批量导入的一些简要说明,后续有机会再针对这个定义API再展开一下!

2.3 插件异步redis示例扩展

我们的异步redis也是已插件的方式注册和实例化,主要的是以

image.png
上面这种方式引入的我们的app对象来接管注册对应的钩子函数,并在钩子函数处理相关事件处理。

2.3.1 插件定义位置:

image.png

2.3.1 插件类实现定义:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
407
408
409
410
411
412
413
414
415
416
417
418
419
420
421
422
423
424
425
426
427
428
429
430
431
432
433
434
435
436
437
438
439
440
441
442
443
444
445
446
447
448
449
450
451
452
453
454
455
456
457
458
459
460
461
462
463
464
465
466
467
468
469
470
471
472
473
474
475
476
477
478
479
480
481
482
483
484
485
486
487
488
489
490
491
492
493
494
python复制代码#!/usr/bin/evn python
# -*- coding: utf-8 -*-
"""
-------------------------------------------------
文件名称 : __init__.py
文件功能描述 : 功能描述
创建人 : 小钟同学
创建时间 : 2021/7/27
-------------------------------------------------
修改描述-2021/7/27:
-------------------------------------------------
# await app.state.redis.set("my-key", "valueaaaaaaaaaaaa")
# value = await app.state.redis.get("my-key")
# print(value)
# print("HASH字典的操作")
# await self.hmset_dict("hash", key1="value1", key2="value2", key3=123)
# result = await self.hgetall("hash")
# print("HASH字典的操作",result)

# result = await self.add_str_ex('sdsds','sssssssssssssss')
# print(result)
# value = await app.state.redis.get("sdsds")
# print(value)
"""
from aioredis import Redis, create_redis_pool, create_sentinel
from apps.config.redis_conf import redisconf
from typing import Tuple, Any
from fastapi import FastAPI
from apps.utils.singleton_helper import Singleton
from contextlib import asynccontextmanager
import asyncio
import json
import datetime
from typing import Set, Any, Optional

# from functools import cached_property, lru_cache
# Python 3.8的cached_property

@Singleton
class AsyncRedisClient():

def __init__(self, app: FastAPI = None):
# 如果有APPC传入则直接的进行初始化的操作即可
self.redis = None
if app is not None:
self.init_app(app)

def init_app(self, app: FastAPI):
self.app = app

@app.on_event("startup")
async def startup_event():
app.state.redis = await self.init_redis_pool()

# 初始化缓冲器对象
from apps.ext.cache import FastAPICache
from apps.ext.cache.backends.redis import RedisBackend
FastAPICache.init(RedisBackend(self.redis_db), prefix="xxx-cache")

# 登入状态集
await self.setbit('login_status', 100010, 1)
await self.setbit('login_status', 100011, 1)

erer = await self.getbit('login_status', 100011)
print('100011的在线状态', erer)
erer = await self.getbit('login_status', 100012)
print('100012的在线状态', erer)

erer = await self.getbit('login_status', 100010)
print('100010的在线状态', erer)
await self.setbit('login_status', 100010, 0)

erer = await self.getbit('login_status', 100010)
print('100010的在线状态', erer)

# 签到处理
# key 可以设计成 uid:sign:{userId}:{yyyyMM},月份的每一天的值 - 1 可以作为 offset(因为 offset 从 0 开始,所以 offset = 日期 - 1)。key 可以设计成 uid:sign:{userId}:{yyyyMM},月份的每一天的值 - 1 可以作为 offset(因为 offset 从 0 开始,所以 offset = 日期 - 1)。
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 15, 1)
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 14, 1)
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 13, 1)
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 12, 1)
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 11, 1)
erer = await self.getbit('uid:sign:{0}:{1}'.format(100010, 202105), 15)
print('100010在202105的16号的签到情况', erer)
erer = await self.getbit('uid:sign:{0}:{1}'.format(100010, 202105), 11)
print('100010在202105的16号的签到情况', erer)
erer = await self.bitcount('uid:sign:{0}:{1}'.format(100010, 202105))
print('100010在202105签到总次数', erer)

erer = await self.bitpos('uid:sign:{0}:{1}'.format(100010, 202105), 1)
print('100010在202105首次打开的日期,也就是第一次标记为1的bit的位置应该是11》测试一下看看', erer)
await self.setbit('uid:sign:{0}:{1}'.format(100010, 202105), 11, 0)
erer = await self.bitpos('uid:sign:{0}:{1}'.format(100010, 202105), 1)
print('100010在202105首次打开的日期,也就是第一次标记为1的bit的位置应该是11》测试xiugaiwehou一下看看', erer)

erer = await self.set_sign_status(100010)
print('100010在202105签到:', erer)
erer = await self.get_sign_status(100010)
print('100010在202105签到:', erer)

erer = await self.get_user_week_sign_status(100010)
print('100010这周的签到的情况:', erer)

erer = await self.get_user_month_sign_status(100010)
print('100010这月的签到的情况:', erer)


@app.on_event("shutdown")
async def shutdown_event():
app.state.redis.close()
await app.state.redis.wait_closed()

async def get_redis(self):
return self.redis

# @cached_property
async def init_redis_pool(self) -> Redis:

if redisconf.use_redis_sentinel:
# 创建哨兵机制模型的下的链接对象
sentinel = await create_sentinel(
[(redisconf.redis_sentinel_url, redisconf.redis_sentinel_port)],
db=redisconf.redis_db,
password=redisconf.redis_password,
encoding="utf-8",
)
self.redis_db = sentinel.master_for(redisconf.redis_sentinel_master_name)
else:
# 创建连接池的方式
self.redis_db = await create_redis_pool(
redisconf.redis_url,
# password=redisconf.redis_password,
# encoding="utf-8",
# db=redisconf.redis_db,
)

# result = await self.set_json('sdsds',{
# 'sdas':2323,
# 'sdas222': {
# '你':'唉是就是基22地'
# }
# })
# print(result)
# result = await self.get_json('sdsds')
# print(result)

return self.redis

async def get_with_ttl(self, key: str) -> Tuple[int, str]:
async with self.redis_db.pipeline(transaction=True) as pipe:
return await (pipe.ttl(key).get(key).execute())

async def get(self, key) -> str:
return await self.redis_db.get(key)

async def set(self, key: str, value: str, expire: int = None):
return await self.redis_db.set(key, value, ex=expire)

async def setex(self, key, seconds, value):
print("ssssssssss")
return await self.redis_db.setex(key, seconds, value)

async def pttl(self, key: str) -> int:
"""Get PTTL from a Key"""
return int(await self.redis_db.pttl(key))

async def ttl(self, key: str) -> int:
"""Get TTL from a Key"""
return int(await self.redis_db.ttl(key))

async def pexpire(self, key: str, pexpire: int) -> bool:

return bool(await self.redis_db.pexpire(key, pexpire))

async def expire(self, key: str, expire: int) -> bool:

return bool(await self.redis_db.expire(key, expire))

async def incr(self, key: str) -> int:
"""Increases an Int Key"""
return int(await self.redis_db.incr(key))

async def decr(self, key: str) -> int:
"""Decreases an Int Key"""
return int(await self.redis_db.decr(key))

async def hmset_dict(self, key, **val) -> str:
return await self.redis_db.hmset_dict(key, **val)

async def hgetall(self, key, ):
return await self.redis_db.hgetall(key, encoding="utf-8")

# 不存在则加入,否则不变
async def add_str_nx(self, key, values): # value可以为复杂的json
return await self.redis_db.setnx(key, values)

# 加入缓存,存在会替换,并加入过期时间
async def add_str_ex(self, key, values, time=10): # value可以为复杂的json
return await self.redis_db.setex(key, time, values)

async def clear(self, namespace: str = None, key: str = None) -> int:
if namespace:
lua = f"for i, name in ipairs(redis.call('KEYS', '{namespace}:*')) do redis.call('DEL', name); end"
return await self.redis_db.eval(lua, numkeys=0)
elif key:
return await self.redis_db.delete(key)

async def check_lock(self, key):
"""
检查当前KEY是否有锁
"""
key = 'lock:%s' % key
status = await self.redis_db.get(key)
if status:
return True
else:
return False

async def acquire_lock(self, key, expire=30, step=0.03):
"""
为当前KEY加锁, 默认30秒自动解锁
"""
key = 'lock:%s' % key
while 1:
get_stored = await self.redis_db.get(key)
if get_stored:
await asyncio.sleep(step)
else:
lock = await self.redis_db.setnx(key, 1)
if lock:
await self.redis_db.expire(key, expire)
return True

async def release_lock(self, key):
"""
释放当前KEY的锁
"""
key = 'lock:%s' % key
await self.safe_delete(key)

@asynccontextmanager
async def with_lock(self, key, expire=30, step=0.03):
"""
@desc redis分布式锁封装
:param key: 缓存key
:param expire: 锁失效时间
:param step: 每次尝试获取锁的间隔
:return:
for example:
with RedisCacheProxy().with_lock("key_name") as lock:
"do something"
"""
try:
t = await self.acquire_lock(key, expire, step)
yield t
finally:
await self.release_lock(key)

async def get_many(self, keys: list) -> list:
"""
@desc 批量获取字符串
:params keys: [chan1, char2]
"""
data = await self.redis_db.mget(*keys, encoding="utf-8")
return data

async def set_many(self, data: dict):
"""批量设置字符串缓存"""
data = await self.redis_db.mset(data)
return data

async def get_data(self, key: str) -> str:
"""获取字符串数据并尝试转换json"""
value = await self.redis_db.get(key)
if value:
try:
value = json.loads(value.decode("utf-8"))
except:
pass
return value

async def set_data(self, key: str, value, ex: int = None):
"""尝试转正json字符串存储"""
try:
value = json.dumps(value)
except:
pass
return self.redis_db.set(key, value, ex=ex)

async def delete(self, key):
"""直接删除一个key"""
await self.redis_db.delete(key)

async def safe_delete(self, key: str):
"""失效一个key"""
await self.redis_db.expire(key, -1)

async def delete_many(self, keys: list) -> None:
"""批量key失效"""
await self.redis_db.delete(*keys)

async def exists(self, key: str) -> bool:
"""查询key是否存在"""
data = await self.redis_db.exists(key)
return data

def hget(self, key: str, field: str):
"""获取hash类型一个键值"""
return self.redis_db.hget(key, field)

def hmget(self, key: str, fields: list):
"""
批量获取hash类型键值
:param key:
:param fields:
:return:
"""
return self.redis_db.hmget(key, fields)

async def hget_data(self, key: str, field: str) -> Any:
"""获取hash的单个key"""
data = await self.redis_db.hget(key, field)
return json.loads(data) if data else None

async def hmget_data(self, key: str, fields: list) -> list:
"""
@desc hash类型获取缓存返回一个list
"""
data = await self.redis_db.hmget(key, *fields)
return [json.loads(i) if i is not None else None for i in data]

async def hmget2dict_data(self, key: str, fields: list) -> dict:
"""
@desc hash类型获取缓存返回一个dict,尝试转换json格式
"""
cache_list = await self.redis_db.hmget(key, fields)
return dict(zip(fields, [json.loads(i) if i is not None else None for i in cache_list]))

async def get_json(self, key: str) -> dict:
"""
@desc 获取json格式的字典数据
"""
data = await self.redis_db.hgetall(key)
if data:
return {k: json.loads(v) for k, v in dict(data).items()}
return {}

async def set_json(self, key: str, value: dict, ex: int = None):
"""
@desc 使用hash存贮json结构的数据
:return:
"""
cache_data = []
for k, v in value.items():
cache_data.extend([k, json.dumps(v)])
if not cache_data:
return True
pipe = self.redis_db.pipeline()
pipe.hmset(key, *cache_data)
if ex:
pipe.expire(key, int(ex))
res = await pipe.execute()
return res

async def sadd(self, key: str, values: list) -> int:
"""添加元素"""
if not values:
return 0
count = await self.redis_db.sadd(key, *values)
return count

async def spop(self, key: str, count: int = None) -> list:
"""从集合弹出元素"""
count = 1 if not count else count
values = await self.redis_db.spop(key, count=count)
return values if values else []

async def smembers(self, key: str) -> list:
"""返回一个集合所有元素"""
values = await self.redis_db.smembers(key)
return values if values else []

async def smembers_back_set(self, key: str) -> Set:
"""Gets Set Members"""
return set(await self.redis_connection.smembers(key))

async def scard(self, key: str) -> int:
"""获取一个集合的元素个数"""
count = await self.redis_db.scard(key)
return count

async def zadd(self,key, *args, **kwargs):
# redis zadd操作(批量设置值至args有序集合中)
if not (args or kwargs):
return False
count = await self.redis_db.zadd(key, *args, **kwargs)
return count

async def zrem(self,key, member, *members):
# redis zrem操作(删除name有序集合中的特定元素)
if not key:
return False
count = await self.redis_db.zrem(key,member, *members)
return count

async def zincrby(self,key, name, value, amount=1):
# 如果在key为name的zset中已经存在元素value,则该元素的score增加amount,否则向该集合中添加该元素,其score的值为amount
if not (name or value):
return False
return await self.redis_db.zincrby(key, value, amount)

async def zrevrank(self,key, value):
if not value:
return False
return await self.redis_db.zrevrank(key, value)

async def zscore(self, key,member):
if not member:
return False
return self.redis_db.zscore(key, member)


async def setbit(self, key: str, offset: int, value: int) -> int:
"""
1:设置或者清空 key 的 value 在 offset 处的 bit 值(只能是 0 或者 1)
2:只需要一个 key = login_status 表示存储用户登陆状态集合数据, 将用户 ID 作为 offset,在线就设置为 1,下线设置 0。
3:需要注意的是 offset 从 0 开始
"""
count = await self.redis_db.setbit(key, offset, value)
return count

async def getbit(self, key: str, offset: int) -> int:
"""
1:获取 key 的 value 在 offset 处的 bit 位的值,当 key 不存在时,返回 0。
"""
count = await self.redis_db.getbit(key, offset)
return count

async def bitcount(self, key: Any) -> int:
"""
该指令用于统计给定的 bit 数组中,值 = 1 的 bit 位的数量。
"""
count = await self.redis_db.bitcount(key)
return count

async def bitpos(self, key: Any, bit: Any, start=None, end=None) -> int:
"""
1:返回数据表示 Bitmap 中第一个值为 bitValue 的 offset 位置。
2:在默认情况下, 命令将检测整个位图, 用户可以通过可选的 start 参数和 end 参数指定要检测的范围。
"""
count = await self.redis_db.bitpos(key, bit, start=start, end=end)
return count

# 签到功能的处理
async def set_sign_status(self, user_id: int, _singe_key='sign_in:', day=None, statue=1) -> int:
# 用户签到: 使用日期的来做key
if not day:
day = str(datetime.datetime.now())[:10]
return await self.setbit('{}:{}'.format(_singe_key, day), user_id, statue)

# 获取用户签到的状-当前日志用户今日签到状态,默认是当前的日期
async def get_sign_status(self, user_id: int, _singe_key='sign_in:', day=None) -> int:
if not day:
day = str(datetime.datetime.now())[:10]
return await self.getbit('{}:{}'.format(_singe_key, day), user_id)

# 查询用户求出这个周的签到状况,和总数
async def get_user_week_sign_status(self, user_id: int, _singe_key='sign_in:') -> tuple:
now = datetime.datetime.now()
# 周一是1 周日是7 now.weekday()则是周一是0,周日是6
weekday = now.isoweekday()
pipe = self.redis_db.pipeline()
for d in range(weekday):
check_day = str(now - datetime.timedelta(days=1) * d)[:10]
pipe.getbit('{}:{}'.format(_singe_key, check_day), user_id)
res = await pipe.execute()
return res[::-1],sum(res[::-1])

# 查询用户求出这个月的签到状和总数
async def get_user_month_sign_status(self, user_id: int, _singe_key='sign_in:') -> tuple:
now = datetime.datetime.now()
# 周一是1 周日是7 now.weekday()则是周一是0,周日是6
day = now.day
pipe = self.redis_db.pipeline()
for d in range(day):
check_day = str(now - datetime.timedelta(days=1) * d)[:10]
pipe.getbit('{}:{}'.format(_singe_key, check_day), user_id)
res = await pipe.execute()
return res[::-1],sum(res[::-1])




async_redis_client = AsyncRedisClient()

2.3.1 插件类初始化和验证:

我们再app启动的时候就示例这个redis异步客户端实例,并且进行相关一些验证测试:

image.png

3.总结

关于脚手架内容概要提到几个点,上面讲述的几乎已全部包含了!鉴于文章太长!后续如果可以的话,再继续展开!!!


以上仅仅是个人结合自己的实际需求,做学习的实践笔记!如有笔误!欢迎批评指正!感谢各位大佬!

结尾

END

简书:www.jianshu.com/u/d6960089b…

掘金:juejin.cn/user/296393…

公众号:微信搜【小儿来一壶枸杞酒泡茶】

小钟同学 | 文 【欢迎一起学习交流】| QQ:308711822

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【评论抽奖咯】分布式事务的5种解决方案!

发表于 2021-09-09

花哥很荣幸参加了掘金送周边活动,感谢大家一直对我和掘金的支持,在下方评论区留言,随机送好礼~

抽奖软件:抽奖软件,按评论先后顺序进行录入

抽奖奖品:随机抽选两位小伙伴,送掘金徽章1枚

前言

距离上次发文已经有将近一个月的时间了,最近各种事情堆在一起也是比较忙,月初有幸获得掘金官方的【免费申请掘金周边】活动资格,这不今天给小伙伴们送福利了,毕竟你们的幸福就是我的动力,啊哈哈哈哈。

20210824774702_BpasYc.gif

思绪良久,最终决定今天和大家分享一些分布式相关的知识,用浅显的例子唠一唠分布式中事务的相关解决方案,这也是面试中的高频问题,学会本文你会对2PC、TCC、本地消息表、最终一致性、最大努力通知以及对应的场景有一个更好的认识。

单系统事务

这里先撇开分布式不说,单说事务,什么是事务呢?我想每个人都是能够说出来一些的,比如最常听到的原子性、一致性、隔离性、持久性,即ACID特性,那用官方的话怎么解释呢,维基百科中用转账的例子来说明,A账户转账给B账户100元,此业务包含A账户减100元、B账户加100元这两个操作,对支持事务的系统来说,无论什么情况,都要保证这两个操作都能完成,不能A账户扣减成功,但是B账户没有增加钱。

这里带着掘金等级一级及以下的小伙伴,回顾一下ACID:

  • 原子性(Atomicity): 事务作为一个整体被执行,事务中包含的操作要么不执行,要么全部执行。即转账场景中,A账户扣减、B账户增加这两个动作必须全部成功或者全部失败。
  • 一致性(Consistency): 数据必须满足完整性约束,从一个一致性状态转变到另一个一致性状态。即转账场景中不存在A账户扣减100元,但是B账户没有增加的现象。
  • 隔离性(Isolation): 多个事务并发执行时是互不干扰的。
  • 持久性(Durability): 一旦事务完成之后,对数据库的修改会永久保存下来,之后的其他操作不会对已提交的事务结果产生影响。

接下来用一个常见的支付场景来讲一下单机系统中的事务是怎样的。

image.png

现在的小伙伴,我相信大家都是经常网购的,那大家有没有想过,在我们完成支付后,商城系统内部是怎样运作,来完成订单修改、库存扣减、优惠券扣减、消息通知、日志记录等等这一堆操作的。上面这幅图就是在单机系统中我们常见的写法,在一个事务中完成如下几个操作:

  • 用户发起并完成支付
  • 接收到支付回调
  • 开启事务
  • 订单状态修改
  • 优惠券扣减
  • 日志记录
  • 消息通知
  • 提交事务

在这个过程中我们就用到了事务,一旦其中某个环节失败,系统会将已经完成的操作进行回滚,比如日志记录失败后,会将前两步(订单状态修改、优惠券扣减)操作回滚到修改前的状态,从而保证系统业务上的正确性。

上述的写法虽然逻辑清晰、实现简单,但是同样是缺点非常明显,代码耦合性高,比如后续新增一个积分系统,就必须在原有逻辑中继续追加;除此之外该模式也不适用高并发业务场景,那要怎样解决这一问题呢,也就是我们今天要讲的分布式事务了。

分布式事务

说到分布式系统,给人的第一印象就是:哇,好高大上啊,再看看自己**天天写的都是什么辣鸡需求。那对于分布式中重要的环节,分布式事务我们该怎么理解呢?对于单机系统我们都知道了,各个模块按顺序操作本地数据库,来完成数据的新增修改,在出现异常时对所有操作进行回滚。分布式事务其实也是类似的,先看下面这张图:

image.png

分布式系统中,会将原单机系统中各个模块拆解成一个个独立的系统,分别部署到不同的服务器中,每个子系统都可以单独运行,不依赖其他子系统。

那当用户完成支付后,其实整个流程和单机系统基本是一致的,订单系统首先收到支付成功通知,在对数据库操作的同时也会通知其他所有子系统开始工作,其他子系统收到消息后,会完成自己业务处理并最终写入到数据库中。如果大家所有子系统都顺利完成自身的工作,那整个流程也是完美收官,但是如果某个子系统由于缺陷或者网络超时,比如消息模块因为宕机,没有及时通知到用户,这就导致整个业务没有完整闭环,那这时我们该怎么办呢?其实这里就用到了分布式事务了。

接下来分析几种常用的分布式事务解决方案

2PC

2PC 是Two-phase commit protocol的缩写,翻译过来就是两阶段提交,那什么是两阶段提交呢(一脸懵逼)。顾名思义,就是分两个阶段来控制事务的提交(准备阶段、提交阶段),在2PC中,还引入了两个重要的角色,一个是事务协调者,另一个则是参与者。举个简单的例子,我们在业务中,应该会遇到为了完成某个需求,需要增加两条数据到两个数据库(主库、子库)中,这时候我们就要在同一个事务中完成这两次操作。

没有理解也不用着急,接下来花哥用图示来逐步拆解这个过程。

在准备阶段中,事务协调者会向所有参与者发出准备请求,询问是否能够执行提交操作,参与者收到请求并将准备结果反馈给协调者,完整流程图如下图。

  • 准备阶段

image.png

  • 提交阶段

当协调者收到所有参与者获得的反馈消息都是【准备成功】时,协调者就会通知各个参与者进入提交阶段;此时参与者节点完成操作,并释放整个事务期间占用的资源,并向协调者发起【提交成功】的反馈;最终由协调者完成事务。

image.png

这里还会出现另一种情况: 在准备阶段中,有一个参与者返回准备失败,协调者就会通知所有参与者,要求每个参与者进行回滚操作,参与者在回滚成功后,会告知协调者【回滚成功】。

  • 准备阶段

image.png

  • 提交阶段

image.png

看完上面几幅图后,我们知道,2PC能够保证第一阶段所有参与者都准备成功(失败)时,通过协调者完成对各个数据库(参与者)进行事务提交(回滚)的通知,最终各个角色合作完成整个分布式事务的提交。

认真的小伙伴会有一个疑问,如果在提交阶段有参与者提交失败了呢? 因为在准备阶段会出现两种情况,所以在提交阶段,就会分为两种情况讨论,也要分别讨论:

  • 如果第二阶段为提交事务:通过不断重试,直到所有参与者全部完成提交,如果最终还是不能成功执行,那只能通过人工主动干预……
  • 如果第二阶段为回滚事务:同样也会不断重试,直到所有参与者全部完成回滚,否则第一阶段中的参与者会一直处于阻塞状态。

TCC

TCC 的全程分为三个阶段,分别是 Try、Confirm、Cancel,

  • Try阶段:这个阶段说的是对各个服务的资源做检测以及对资源进行锁定或者预留
  • Confirm阶段:这个阶段是指确认操作,实际已经真正执行了
  • Cancel阶段:如果某个服务的业务方法执行出错,就会将已经执行成功的业务逻辑进行回滚操作

以转账的例子为例,在跨银行进行转账的时候,需要涉及到两个银行的分布式事务,从A 银行向 B 银行转100元,整个流程如下:

  • Try阶段:冻结A银行账户100元,B银行账户预增加100元;
  • Confirm阶段:执行实际的转账操作,A银行账户的资金扣减,B银行账户的资金增加;
  • Cancel阶段:如果任何一个银行的操作失败,那么就需要回滚进行补偿,就是比如A银行账户如果已经扣减了,但是B银行账户资金增加失败了,那么就得把A银行账户资金给加回去。

image.png

TCC 对业务的侵入较大并且是业务紧耦合,这种方案说实话几乎很少用人使用,但是也有它适用的场景。

比较适合的场景:对一致性要求极高,比如常见的就是资金类的场景,可以用TCC方案,通过自己编写大量的业务逻辑,自己判断一个事务中的各个环节是否正常执行,出现异常情况就执行回滚操作。

但是一般来说,这个方案中事务回滚严重依赖于手写代码来进行回滚和补偿,会造成补偿代码巨大,不建议轻易使用。

本地消息表

该机制是在每个系统的数据库中,增加一个消息表,在操作完本系统业务表(如步骤1/5)后,会新增一条与该业务相关的消息记录并保存到消息表中(如步骤2、步骤6),通过整个链路最终保证整个分布式事务的完整性。

如下图,为了保证A系统、B系统的全部操作在一个事务中,我们会在A系统插入完业务数据后,将该数据唯一识别信息(如ID)保存至消息表,注意此时消息表中记录为待确认状态,随后A系统通知MQ;然后B系统会获取MQ中的消息,开始自身的业务逻辑处理,即首先插入业务表数据,再插入消息表数据。

有一点需要注意:B系统插入消息表数据时,需要注意MQ的一些特点,即重复消费的问题,因此B系统在插入消息表时,要保证此次操作为首次执行,可以通过B系统业务表中唯一ID进行确定。

随后B系统回调A系统,告知其本系统操作成功,然后A系统收到消息后,将A系统消息表状态修改为已完成状态,整个分布式至此结束。

为了保证B系统能够正常接收消息,A系统可以增加轮询操作,对所有待确认的消息每隔1s轮询一次,查看是否超过指定时间(如1分钟)还未响应,根据自身业务可以选择重发或者回滚。

image.png

该方案的缺陷是严重依赖数据库的消息表,在并发场景中会有瓶颈也比较明显,而且需要系统容忍一定时间的数据不一致。

消息事务

该模式是通过消息中间件,如RocketMQ 来完成分布式事务。

首先A系统会发送一条prepared状态的消息到MQ中,该类型的消息对订阅者是不可见,因此也不会被消费;一旦发送成功,A系统会继续完成本地事务的执行,若执行正常,就会再发送一条确认消息,告知mq本地事务执行完成,可以通知B系统完成消费了。RocketMQ会轮询prepared状态的消息,一定时间内还未收到确认消息,就会主动进行反查,确认消息是否成功。

当步骤3执行完成后,B系统此时会收到MQ的消息,开始执行本地事务,执行完成后,就会将该消息消费;若B系统在执行本地事务时出现异常,可以通过几种方案进行解决,如协调MQ重发、告知A系统重发、增加其他中间件(如zk)等。

此外,还要注意B系统消费消息时,幂等性的问题,和本地消息表中类似,此处不再赘述。

image.png

最大努力通知

该方案用最直观的说法就是:我已经尽最大的努力去通知其他系统了,如果这样还是不能完成,那我也是没有办法了,此时只能通过人工进行干预。这种方案适用于对分布式事务要求不严格的情况,比如日志记录、购买成功短信通知这类。

这种方案对于A系统来说,压力是比较小的,它只要完成本地事务并向MQ中发送消息,就算是结束本次事务,其他的都会交给【最大努力通知服务】去协调,如果最大努力通知服务一直收不到B系统的反馈,可以进行一定阈值(如20次)的重试,当超过阈值后,可以通知人工进行干预或直接放弃。

流程如下:

  • 系统A执行完本地事务,发送个消息到MQ;
  • 最大努力通知服务消费MQ然后写入数据库中记录下来,或者是放入个内存队列中,接着调用系统B的接口;
  • 如果系统B执行成功,此事务正常结束;但如果系统B执行失败,最大努力通知服务会尝试重新调用系统B,反复N次,最后还是未能成功就直接放弃或通知人工。

image.png

总结

本小节对分布式事务处理中常见的几种方案进行了讲解,在实际应用中,要结合自身的业务来合理选用。

比如2PC适用于数据库层面,TCC属于补偿性事务思想,是在业务层面来完成事务,但是代码侵入性较大,慎重选用,而最后三种的本地消息、事务消息、最大努力通知,共同思想是想保证最终一致性,适用于对时间不敏感以及分布式要求不严格的场景中。

最后给小伙伴们留一个问题,你们在日常开发中用到了哪些方案呢,可以在评论区和大家分享一下其中的坑,最后会随机抽出两位小伙伴送出今日的掘金礼品哦。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring bean的作用域 Spring bean的作用

发表于 2021-09-08

Spring bean的作用域

在默认情况下,应用上下文中的bean都是单例的形式创建的。 也就是不管给定的一个bean注入其他bean多少次,每次注入的都是同一个实例。

在大多数情况下,单例 bean是很理想的,可以反复重用。但是有时候我们所需要的bean是易变的,他会存储一些属于自己的状态,如果使用单例模式,每次操作同一个bean,会对bean造成污染,可能会出现预料不到的问题。

在Spring中定义了多种作用域,可以基于这些作用域创建 bean。

  • 单例(Singleton):在整个应用中,只创建bean的一个实例。
  • 原型(Prototype):每次注入或者通过Spring上下文获取的时候都会创建一个新的 bean实例。
  • 会话(Session):在Web应用中,为每个会话创建一个bean实例。
  • 请求(Request):在Web应用中,为每个请求创建一个bean实例。

配置bean的作用域

怎样指定bean的作用域,我们可以通过@Scope注解。如果是通过组件扫描方式发现和声明 bean,那么它可以和@Component注解搭配使用,将其声明为所配置作用域Bean

1
2
3
java复制代码@Component
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)//将其声明为原型bean
public class Demo { ... }

若是通过java配置类声明bean,它可以与@Bean搭配使用

1
2
3
4
5
java复制代码@Bean
@Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)
public Demo demo(){
return new Demo();
}

若是使用XML来配置bean,我们可以使用<bean>元素的scope属性,配置bean的作用域

1
xml复制代码<bean id="demo" class="com.abiao.Demo" scope="prototype" />

我列举的这几个例子都是声明的prototype(原型)作用域,每次注入或从Spring应用上下文检索该bean时,都会创建新的实例,这样我们每次操作都会得到我们自己的bean实例。


会话作用域和请求作用域

以上我们介绍了,单例与原型作用域的声明。但是在有的场景我们使用前两种作用域,并不合适。例如,在Web应用中实现购物车。

若是使用单例作用域,我们每次往购物车添加商品都是往一个购物车添加,这并不合理。若是使用原型,我们每次添加商品都会创建一个购物车,下次获取购物车也不是同一个,也不合理。按照购物车bean来说,会话作用域是最适合的,为每个用户会话创建一个购物车。

声明会话作用域bean

指定会话作用域bean,我们也是使用@Scope注解。除了给@Scope注解配置value属性,我们还配置了 proxyMode 属性(代理模式),ScopedProxyMode.INTERFACES表示基于接口的动态代理模式(jdk)。配置 proxyMode(代理模式)可以帮我们解决会话作用域bean注入到单例bean所遇到的问题。

1
2
3
4
java复制代码@Component
@Scope(value = WebApplicationContext.SCOPE_SESSION,//常量值为session
proxyMode = ScopedProxyMode.INTERFACES)//基于接口实现的代理模式
public class Demo { ... }

我们以商店与购物车为例,商店 StoreService(单例bean),购物车 ShoppingCart(会话bean)。在 StoreService的setShoppingCart()方法注入 ShoppingCart bean。

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码@Component
public class StoreService {

/** 购物车 */
private ShoppingCart shoppingCart;

@Autowired
public void setShoppingCart(ShoppingCart shoppingCart){
this.shoppingCart = shoppingCart;
}

}

因为 StoreService 是一个单例bean,会在Spring应用上下文加载时创建。在创建时,Spring会试图将 ShoppingCart 注入到 setShoppingCart()方法。但是 ShoppingCart 是会话作用域的bean,此时并不存在。只有当用户进入系统,创建会话之后,才会出现 ShoppingCart 实例。

那么问题来了,系统中会存在多个 ShoppingCart,每个用户进入系统,创建会话后,都会产生一个 ShoppingCart。但是 StoreService 是单例的,只能注入一个,我们并不想注入一个固定的 ShoppingCart 实例到 StoreService 中。我们所希望的是,当 StoreService 处理购物车时,注入的 ShoppingCart 恰好是当前会话所对应的那一个。

其实Spring并不会将 ShoppingCart bean 注入到 StoreService 中,而是注入一个 ShoppingCart bean 的代理。这个代理会暴露与 ShoppingCart 同样的方法,所以 StoreService 会将他看做一个购物车。当 StoreService 调用 ShoppingCart 的方法时,代理会对其进行懒解析并将调用委托给会话作用域内真正的 ShoppingCart bean。

proxyMode属性设置

  • 若 ShoppingCart 是个接口,其实现类 @Scope的 proxyMode 属性设置为 ScopedProxyMode.INTERFACES ,表示这个代理要实现 ShoppingCart 接口,并将调用委托给实现bean。该代理模式也是最为理想的。
  • 若 ShoppingCart 是个类,Spring就没办法创建基于接口的代理,这时,他必须使用CGLib来生成基于类的代理。将proxyMode属性设置为ScopedProxyMode.TARGET_CLASS,Spring会生成基于目标类扩展类的方式创建代理。

以上我们了解了会话作用域可能产生这类问题,同样的请求作用域bean也会面临相同的装配问题。因此,请求作用域的bean也应该使用作用域代理的方式注入。

如图:作用域代理延迟注入请求和会话作用域的bean
未命名文件.png

XML中声明作用域代理

如果不使用@Scope注解声明作用域代理,我们还可以使用XML文件来声明。
在XML中作用域可通过scope属性直接声明,但是声明作用域代理需要使用Spring aop 命名空间的元素<aop:scoped-proxy />。 <aop:scoped-proxy />默认采用CGLib方式创建目标类的代理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
xml复制代码<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns:aop="http://www.springframework.org/schema/aop"
xsi:schemaLocation="
http://www.springframework.org/schema/aop
http://www.springframework.org/schema/beans/spring-aop.xsd
http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans.xsd">

<bean id="cart"
class="com.abiao._03_scope.ShoppingCart"
scope="session">
<aop:scoped-proxy />
</bean>

</beans>

我们也可以通过设置 <aop:scoped-proxy /> 标签的 proxy-target-class 属性值为 false,让它基于接口创建作用域代理。

1
2
3
4
5
xml复制代码<bean id="cart"
class="com.abiao._03_scope.ShoppingCart"
scope="session">
<aop:scoped-proxy proxy-target-class="false"/>
</bean>

以上就是对Spring bean作用域的理解总结。

每天进步一点点,越慢才能越快!


本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

一个命名不规范引发的事故(文末有抽奖源码)

发表于 2021-09-08

本文已申请到掘金周边礼物-两枚掘金徽章,用于评论区抽奖。通过评论的方式参与 详情

写在前面

事情是这样的今天早上看到公司的 Gitlab 有这样的一则关于 Tag 命名的规范:

不能与本项目的已有分支名称重名

好奇心的驱使之下就点开了更多关于故事的背景。

主要的过程就不赘述了。总结来说的事情的过程大致为:

  • 问题:在测试环境构建发布代码的时候,新的逻辑始终不生效。
  • 分析:使用 Gitlab 网页、API 等获取到 master 的代码均不是最新代码。
  • 原因:开发在某一次上线的节点上创建了一个名称为 master 的 tag;导致网页、API 基于 master 的操作默认选择了 tag master 而不是我们期望的 branch master 。
  • 结果:新增 Gitlab Server Hook 对约束 tag 命名。禁止tag命名为 master、禁止 tag 与分支重名等。

重要的事情说三遍:

分支(Branch)名称与 标签(Tag)名称不可相同

分支(Branch)名称与 标签(Tag)名称不可相同

分支(Branch)名称与 标签(Tag)名称不可相同

一些资料

  • Git 内部原理 - Git 引用
  • Git checkout behavior when you have a tag and a branch with the same name
  • Fixing GIT Branch and Tag Name Collisions 一个关于 tag branch 同名情况下关于 git 的操作分析。

问题分析

上述的问题在开发其实很容易就会发生。虽然建议、约束都有但是该有的骚操作一个也不会拉下。下面就结合一个 demo 看看究竟发生了什么?

本地代码演示

尝试在本地创建名称相同的 tag 与 branch。如下代码可以看到在切换分支的时候会有 warning 的提示。提示索引的名称是不清楚模棱两可的。

1
2
3
4
5
6
7
bash复制代码~/github/demo on  master ⌚ 18:30:42
$ git tag master

~/github/demo on  master ⌚ 18:30:48
$ git checkout master
warning: refname 'master' is ambiguous.
Already on 'master'

那么在本地的 git 命令默认切换的是 branch 名称,如何切换到对应的 tag 名称上呢?如下可以指定具体的索引类型,指定切换到具体的 branch/tag 。

1
2
3
4
5
bash复制代码# 切换到指定的 tag
git checkout refs/tags/master

# 切换到指定的 branch
git checkout refs/heads/master

如何删除同名分 branch/tag,可以使用 branch/tag 全称。

1
2
bash复制代码git branch -D refs/heads/dev
git tag -d refs/tags/dev

总结:可以看到本地在使用 checkout 命令遇到同名的 branch tag 会出现的警告,默认会切换到分支上。如果想切换到 tag 上,可以通过指定切换的类型达到目的。

这里很容易就会想到两个问题:

  • Branch 与 Tag 的区别,为什么要有区分?
  • Branch/Tag 名称相同的情况下,Checkout 默认切换到 Tag 上?

Branch 与 Tag 的区别

可以查看 StackOverflow 上关于这方便的讨论 How is a tag different from a branch in Git

答:简单的来说不同点有以下两点:

  • branch 的特点是该分支的指针的位置随着提交不断更新,一般是存储在 refs/heads/。
  • tag 的特点与分支恰恰相反,指向的 commit 不会随着新的提交去更新。一般是存储在 refs/tags/。

引用 StackOverflow 上的例子来解释一下:

image.png

分支(Branch)的概念很像在阅读时候使用的书签,实时记录我们读书时刻的最新位置。而标签(Tag)很像我们在书中贴的便签,方便我们快回到某一页。

Checkout 的优先级

存在 branch 与 tag 同名的情况下使用 git checkout something 的时候为什么会优先使用 tag ?

在 Git Checkout 文档中 git-scm.com/docs/git-ch… 说明 Checkout 有两个主要用途一是用于丢弃某些提交的文件;而是切换 git 索引的位置。如果存在 branch 与 tag 名称相同的情况下,默认切换到分支并给出警告。

创建 commitId 为名称的 tag

如果我创建一个以 commit id 为名称的 branch/tag 会怎样?

这个问题就作为一个课后作业,交给各位感兴趣的小伙伴自行思考了。结合上面的分析答案应该比较明显了。

思考总结

出现这类问题的时候是我们日常中的使用习惯认为 master 应该就是默认的分支。但是忽略了 GitLab CI 部署的时候可能会优先使用 Tag 名称作为部署名称。

如下是一个关于 tag branch 同名情况下 Gitlab CI 的问题讨论。 Branch and tag with same name cause multiple problems 这里很详细的说明 Gitlab branch/tag 同名情况下会出现的问题。

所以在日常使用 Git 的时候一定要养成良好的习惯。

  • 无论是分支的命名还是标签的命名都应该避免冲突。
  • 在分支/标签命名的时候要有规范,见名知义(这一点写代码的同学应该不陌生)。
  • 一定要注意 Warning,有时候解决 Warning 就会解决一个 Bug。
  • 充分的使用工具对一些可预期的行为进行干预。如 git hook 对提交的信息做校验。

抽奖活动

本次活动抽奖规则如下:

首先非常欢迎各位大佬积极评论,也感谢大家对文章的支持。下面是抽奖规则

所有的评论需要符合掘金社区的规范。否则无法参加活动。

评论区随机抽奖,单人最多一个徽章。

若联系不到中奖者,则重新抽取。

抽奖公布时间为:9月12日。

抽奖代码实现

本次抽奖本着公平公正的原则进行抽奖,所以这里贴出JS的抽奖代码。欢迎大家纠错,详细逻辑可以查看下面的代码。

抽奖代码逻辑

    1. 调用掘金文章评论接口获取所有评论用户
    1. Math.random 实现随机获取数组元素
    1. 抽奖并确保两次获奖用户不同
    1. 控制台输出获奖用户

如何使用

打开 Chrome 的控制台,复制粘贴以下代码。控制台输出的用户即为中奖用户。

注意事项:如果你也想使用此抽奖的代码,那么你需要修改 data 中对应的数据。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
js复制代码function choose(choices) {
var index = Math.floor(Math.random() * choices.length);
return choices[index];
}

async function getUserApi(cursor) {

data = {
"item_id": "7005444889106186270",
"item_type": 2,
"cursor": cursor.toString(),
"limit": 20,
"sort": 0,
"client_type": 2608
}

obj = await fetch('https://api.juejin.cn/interact_api/v1/comment/list', {
method: 'POST', body: JSON.stringify(data),
credentials: "include",
headers: {
'Content-type': 'application/json; charset=UTF-8'
}
})
.then(res => {
obj = res.json()
return obj

})
return obj
}

async function getAllCommentInfo() {
cursor = 0
has_more = true
result = []
while (has_more) {
response = await getUserApi(cursor)
cursor = response['cursor']
has_more = response['has_more']
data = response['data']
result = [...result, ...data]
}
return result
}

async function lottery() {
commentInfo = await getAllCommentInfo()
userInfo = commentInfo.map(item => item.user_info.user_id + "-" + item.user_info.user_name)

userInfoDistinct = Array.from(new Set(userInfo))
user1 = choose(userInfoDistinct)
user2 = choose(userInfoDistinct)

while (user1 == user2) {
user2 = choose(userInfoDistinct)
}

console.log("抽奖时间为:" + new Date())
console.log("去重评论数:" + userInfoDistinct.length)
console.log("恭喜您中奖了 " + user1)
console.log("恭喜您中奖了 " + user2)
}

lottery()

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Kafka(五)springboot集成kafka

发表于 2021-09-08

我正在参加中秋创意投稿大赛,详情请看:中秋创意投稿大赛 。

一、添加kafka依赖

在mvnrepository.com/中搜索kafka,我们…

这里面需要注意点版本的问题,参考下图做对比,不同版本的springboot支持不同版本的kafka:

image.png

也可以自己去官方看这个对应关系:spring.io/projects/sp…

对比上图去maven仓库找相应版本就可以了:mvnrepository.com/

image.png

我的springboot版本是2.2.5的,所以使用kafka的版本如下所示:

1
2
3
4
5
6
xml复制代码<!-- https://mvnrepository.com/artifact/org.springframework.kafka/spring-kafka -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
<version>2.3.5.RELEASE</version>
</dependency>

如果版本不对应会有很多的问题,通常kafka的版本要比springboot高一个版本就行了,比如2.2.5则kafka是2.3.5这样。

二、添加配置文件

我这里使用一个共工程模拟生产者和消费者。
我的springboot工程使用的配置文件是yaml类型的,所以按照如下方式添加:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
yaml复制代码spring:
kafka:
bootstrap-servers: 192.168.184.134:9092,192.168.184.135:9092,192.168.184.136:9092
producer:
# 发生错误后,消息重发的次数。
retries: 0
# 键的序列化方式
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# 值的序列化方式
value-serializer: org.apache.kafka.common.serialization.StringSerializer
consumer:
group-id: test
# 键的反序列化方式
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# 值的反序列化方式
value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
listener:
# 但消费者监听的topic不存在时,保证能够是项目启动
missing-topics-fatal: false

配置还有很多,我们这里先集成,后面会逐渐增加配置。

三、代码实现

3.1生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
typescript复制代码package com.cloud.bssp.message.kafka.producer;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Component;

/**
* kafka生产者
*
* @author weirx
* @date 2021/02/03 14:22
**/
@Component
public class KafkaProducer {

@Autowired
private KafkaTemplate kafkaTemplate;

/**
* kafka消息发送
* @param
* @author weirx
* @return void
* @date: 2021/2/3
*/
public void send(String topic){
kafkaTemplate.send(topic,"hello kafka");
}
}

3.2消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码package com.cloud.bssp.message.kafka.consumer;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

import java.util.Optional;

/**
* kafka消费者
*
* @author weirx
* @date 2021/02/03 15:01
**/
@Slf4j
@Component
public class KafkaConsumer {

@KafkaListener(topics = {"test-kafka"})
public void consumer(ConsumerRecord<?, ?> record) {
Optional<?> kafkaMessage = Optional.ofNullable(record.value());
if (kafkaMessage.isPresent()) {
Object message = kafkaMessage.get();
log.info("----------------- record =" + record);
log.info("------------------ message =" + message);
}
}
}

3.3测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
kotlin复制代码package com.cloud.bssp.message.kafka;

import com.cloud.bssp.message.kafka.producer.KafkaProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

/**
* kafka测试类
*
* @author weirx
* @date 2021/02/03 15:03
**/
@RestController
@RequestMapping("test/kafka")
public class TestKafka {

@Autowired
private KafkaProducer producer;

@RequestMapping("/send/{topic}")
public void send(@PathVariable String topic){
producer.send(topic);
}
}

四、测试

这里我将topic作为了参数进行传递,方便我们观察。
我们执行以下测试接口:

1
bash复制代码http://localhost:8085/test/kafka/send/test-kafka

看下控制台,消费者输出的结果:

1
2
ini复制代码2021-02-03 17:12:44.654  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 5, leaderEpoch = 0, offset = 0, CreateTime = 1612343564622, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:12:44.654 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka

从上结果看到:
topic:test-kafka
partition:5
offset:0

我们从kafkatool中看看结果,在分区5中增加了一条消息:

image.png

这里为什么有10个分区呢?在上一章节我们搭建kafka集群的时候,默认给了10个partition,关于partition的作用,在我们学会使用后,后面的章节会详细介绍。

offset是当前partition中的数据个数的偏移量,从0开始。

下面我们再次发送10条消息,使消息再次发送到partition5中。结果如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
ini复制代码2021-02-03 17:15:17.129  INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer     : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 9, leaderEpoch = 0, offset = 0, CreateTime = 1612343717117, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:15:17.129 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:20:45.456 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 0, leaderEpoch = 0, offset = 0, CreateTime = 1612344045453, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:20:45.456 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:12.999 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 4, leaderEpoch = 0, offset = 0, CreateTime = 1612344072988, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:12.999 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:15.983 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 3, leaderEpoch = 0, offset = 0, CreateTime = 1612344075980, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:15.983 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:16.799 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 2, leaderEpoch = 0, offset = 0, CreateTime = 1612344076796, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:16.799 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:17.614 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 1, leaderEpoch = 0, offset = 0, CreateTime = 1612344077612, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:17.614 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:18.487 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 8, leaderEpoch = 0, offset = 0, CreateTime = 1612344078484, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:18.487 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:19.527 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 7, leaderEpoch = 0, offset = 0, CreateTime = 1612344079524, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:19.527 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:20.505 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 6, leaderEpoch = 0, offset = 0, CreateTime = 1612344080500, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:20.505 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka
2021-02-03 17:21:21.440 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ----------------- record =ConsumerRecord(topic = test-kafka, partition = 5, leaderEpoch = 0, offset = 1, CreateTime = 1612344081437, serialized key size = -1, serialized value size = 11, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = hello kafka)
2021-02-03 17:21:21.440 INFO 5080 --- [ntainer#0-0-C-1] c.c.b.m.kafka.consumer.KafkaConsumer : ------------------ message =hello kafka

如上结果,又发送了10条消息,消息随机的负载到每个partition中,第10条再次记录到了partition 5中,其offset加1了。

image.png


到此为止,已经完成了springboot和kafka的集成了,后面会逐渐深入到每个特性和组件。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Java Springboot+Vue+MyBatis音乐播

发表于 2021-09-08

摘要:

**网络技术以及计算机的发展,网友们对网络的要求也日益长高,平常在网上听话用一大堆下载软件下载下来也要管理,又占空间,比如那流行歌曲,下载了听了又要删很不方便·而网络音乐库的实现改变了这一状况。它本身就是一个数字音乐交互,用户通过它可是方便快捷、安全地实现国最大的音乐搜索查找歌曲,并能实时试听,将自己喜爱的歌曲加入收藏,为用户建立一个自由、自主、安全的世界局域网。音乐正是在这样的需求前提下应运而生。给人们的日常生活带来了极大的乐趣,让人们在繁忙疲惫的工作之后可以进行休闲·基于此种现状,在充分分析了该行业的市场前景,调研了用户需求之后,设计了该音乐。**

**流行音乐之所以被称为“流行”,原因之一,是她有着传播的时效性·绝大部分流行歌曲可以一夜成名·但是从人们脑子里消失得也很快,从前极力抢购的唱片可能不久之后就被束之高阁,人们追逐的永远是不同于以往的“新”星。但是互联网的出现,一方而因为传播速度提高而加剧了这种时效性,另一方而却又利用其无限的网络胸怀使这些流行音乐具有了一定的持久性。如果这两方面正是人们所需要的,那么,这些都应当归功于音乐·作为音乐的网络载体,音乐在创作、传播、欣赏方式等方而对流行音乐的发展都产生了前所未有的影响:

1)电脑网络技术的发展使人们通过音乐接触到了更多的流行音乐。

2)网民数量的激增使更多的人们通过音乐接触到了流行音乐。

3)音乐为流行音乐创作提供了更多的便利。

4)音乐刺激了流行音乐的传播。

5)音乐使流行音乐的欣赏方式发生了改变。

6)音乐不但刺激了流行音乐的传播,且也刺激了电子数码产品的频繁更新换代。**

主要设计:

功能设计:

用户端:登录注册功能、首页歌曲歌单信息查看、搜索、听歌、歌曲的各项设置、评论、以及我的音乐等。

管理员端: 登录、图形树状图数据查看、用户管理、歌单管理、歌手管理、歌曲编辑、评价等。

主要技术:

Springboot+SpringMvc+mybatis+lombok+cache+拦截器+Jquery+html+VUE+Node.js等

功能截图:

用户端首页:

)​

登录注册:

)​

歌单信息:用户首页可以根据歌单信息进行搜索歌曲

)​

歌手信息:用户首页可以根据歌手信息进行搜索歌曲

)​

)​

我的音乐:

​

评论点赞:

)​

管理员端:

)​

首页:

)​

用户管理:

)​

歌手管理:

)​

歌单管理:

)​

部分代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
ini复制代码@RestController
@Controller
public class ConsumerController {

@Autowired
private ConsumerServiceImpl consumerService;

@Configuration
public class MyPicConfig implements WebMvcConfigurer {
@Override
public void addResourceHandlers(ResourceHandlerRegistry registry) {
String os = System.getProperty("os.name");
if (os.toLowerCase().startsWith("win")) { // windos系统
registry.addResourceHandler("/img/avatorImages/**")
.addResourceLocations("file:" + Constants.RESOURCE_WIN_PATH + "\img\avatorImages\");
} else { // MAC、Linux系统
registry.addResourceHandler("/img/avatorImages/**")
.addResourceLocations("file:" + Constants.RESOURCE_MAC_PATH + "/img/avatorImages/");
}
}
}

// 添加用户
@ResponseBody
@RequestMapping(value = "/user/add", method = RequestMethod.POST)
public Object addUser(HttpServletRequest req){
JSONObject jsonObject = new JSONObject();
String username = req.getParameter("username").trim();
String password = req.getParameter("password").trim();
String sex = req.getParameter("sex").trim();
String phone_num = req.getParameter("phone_num").trim();
String email = req.getParameter("email").trim();
String birth = req.getParameter("birth").trim();
String introduction = req.getParameter("introduction").trim();
String location = req.getParameter("location").trim();
String avator = req.getParameter("avator").trim();

if (username.equals("") || username == null){
jsonObject.put("code", 0);
jsonObject.put("msg", "用户名或密码错误");
return jsonObject;
}
Consumer consumer = new Consumer();
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
Date myBirth = new Date();
try {
myBirth = dateFormat.parse(birth);
} catch (Exception e){
e.printStackTrace();
}
consumer.setUsername(username);
consumer.setPassword(password);
consumer.setSex(new Byte(sex));
if (phone_num == "") {
consumer.setPhoneNum(null);
} else{
consumer.setPhoneNum(phone_num);
}

if (email == "") {
consumer.setEmail(null);
} else{
consumer.setEmail(email);
}
consumer.setBirth(myBirth);
consumer.setIntroduction(introduction);
consumer.setLocation(location);
consumer.setAvator(avator);
consumer.setCreateTime(new Date());
consumer.setUpdateTime(new Date());

boolean res = consumerService.addUser(consumer);
if (res) {
jsonObject.put("code", 1);
jsonObject.put("msg", "注册成功");
return jsonObject;
} else {
jsonObject.put("code", 0);
jsonObject.put("msg", "注册失败");
return jsonObject;
}
}

// 判断是否登录成功
@ResponseBody
@RequestMapping(value = "/user/login/status", method = RequestMethod.POST)
public Object loginStatus(HttpServletRequest req, HttpSession session){

JSONObject jsonObject = new JSONObject();
String username = req.getParameter("username");
String password = req.getParameter("password");
// System.out.println(username+" "+password);
boolean res = consumerService.veritypasswd(username, password);

if (res){
jsonObject.put("code", 1);
jsonObject.put("msg", "登录成功");
jsonObject.put("userMsg", consumerService.loginStatus(username));
session.setAttribute("username", username);
return jsonObject;
}else {
jsonObject.put("code", 0);
jsonObject.put("msg", "用户名或密码错误");
return jsonObject;
}

}

// 返回所有用户
@RequestMapping(value = "/user", method = RequestMethod.GET)
public Object allUser(){
return consumerService.allUser();
}

// 返回指定ID的用户
@RequestMapping(value = "/user/detail", method = RequestMethod.GET)
public Object userOfId(HttpServletRequest req){
String id = req.getParameter("id");
return consumerService.userOfId(Integer.parseInt(id));
}

// 删除用户
@RequestMapping(value = "/user/delete", method = RequestMethod.GET)
public Object deleteUser(HttpServletRequest req){
String id = req.getParameter("id");
return consumerService.deleteUser(Integer.parseInt(id));
}

// 更新用户信息
@ResponseBody
@RequestMapping(value = "/user/update", method = RequestMethod.POST)
public Object updateUserMsg(HttpServletRequest req){
JSONObject jsonObject = new JSONObject();
String id = req.getParameter("id").trim();
String username = req.getParameter("username").trim();
String password = req.getParameter("password").trim();
String sex = req.getParameter("sex").trim();
String phone_num = req.getParameter("phone_num").trim();
String email = req.getParameter("email").trim();
String birth = req.getParameter("birth").trim();
String introduction = req.getParameter("introduction").trim();
String location = req.getParameter("location").trim();
// String avator = req.getParameter("avator").trim();
// System.out.println(username+" "+password+" "+sex+" "+phone_num+" "+email+" "+birth+" "+introduction+" "+location);

if (username.equals("") || username == null){
jsonObject.put("code", 0);
jsonObject.put("msg", "用户名或密码错误");
return jsonObject;
}
Consumer consumer = new Consumer();
DateFormat dateFormat = new SimpleDateFormat("yyyy-MM-dd");
Date myBirth = new Date();
try {
myBirth = dateFormat.parse(birth);
}catch (Exception e){
e.printStackTrace();
}
consumer.setId(Integer.parseInt(id));
consumer.setUsername(username);
consumer.setPassword(password);
consumer.setSex(new Byte(sex));
consumer.setPhoneNum(phone_num);
consumer.setEmail(email);
consumer.setBirth(myBirth);
consumer.setIntroduction(introduction);
consumer.setLocation(location);
// consumer.setAvator(avator);
consumer.setUpdateTime(new Date());

boolean res = consumerService.updateUserMsg(consumer);
if (res){
jsonObject.put("code", 1);
jsonObject.put("msg", "修改成功");
return jsonObject;
}else {
jsonObject.put("code", 0);
jsonObject.put("msg", "修改失败");
return jsonObject;
}
}

// 更新用户头像
@ResponseBody
@RequestMapping(value = "/user/avatar/update", method = RequestMethod.POST)
public Object updateUserPic(@RequestParam("file") MultipartFile avatorFile, @RequestParam("id")int id){
JSONObject jsonObject = new JSONObject();

if (avatorFile.isEmpty()) {
jsonObject.put("code", 0);
jsonObject.put("msg", "文件上传失败!");
return jsonObject;
}
String fileName = System.currentTimeMillis()+avatorFile.getOriginalFilename();
String filePath = System.getProperty("user.dir") + System.getProperty("file.separator") + "img" + System.getProperty("file.separator") + "avatorImages" ;
File file1 = new File(filePath);
if (!file1.exists()){
file1.mkdir();
}

File dest = new File(filePath + System.getProperty("file.separator") + fileName);
String storeAvatorPath = "/img/avatorImages/"+fileName;
try {
avatorFile.transferTo(dest);
Consumer consumer = new Consumer();
consumer.setId(id);
consumer.setAvator(storeAvatorPath);
boolean res = consumerService.updateUserAvator(consumer);
if (res){
jsonObject.put("code", 1);
jsonObject.put("avator", storeAvatorPath);
jsonObject.put("msg", "上传成功");
return jsonObject;
}else {
jsonObject.put("code", 0);
jsonObject.put("msg", "上传失败");
return jsonObject;
}
}catch (IOException e){
jsonObject.put("code", 0);
jsonObject.put("msg", "上传失败"+e.getMessage());
return jsonObject;
}finally {
return jsonObject;
}
}

数据库设计:

数据库采用mysql5版本、满足数据库设计三范式。编码采用utf8 – UTF-8 Unicode、排序规则采用utf8_general_ci

用户表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
sql复制代码CREATE TABLE `consumer` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT ,
`username` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL ,
`password` varchar(100) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL ,
`sex` tinyint(4) NULL DEFAULT NULL ,
`phone_num` char(15) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`email` char(30) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`birth` datetime NULL DEFAULT NULL ,
`introduction` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`location` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`avator` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`create_time` datetime NOT NULL ,
`update_time` datetime NOT NULL ,
PRIMARY KEY (`id`),
UNIQUE INDEX `username_UNIQUE` (`username`) USING BTREE ,
UNIQUE INDEX `phone_num_UNIQUE` (`phone_num`) USING BTREE ,
UNIQUE INDEX `email_UNIQUE` (`email`) USING BTREE
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
AUTO_INCREMENT=30
ROW_FORMAT=COMPACT
;

评论表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
sql复制代码CREATE TABLE `comment` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT ,
`user_id` int(10) UNSIGNED NOT NULL ,
`song_id` int(10) UNSIGNED NULL DEFAULT NULL ,
`song_list_id` int(10) UNSIGNED NULL DEFAULT NULL ,
`content` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`create_time` datetime NULL DEFAULT NULL ,
`type` tinyint(4) NOT NULL ,
`up` int(10) UNSIGNED NOT NULL DEFAULT 0 ,
PRIMARY KEY (`id`)
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
AUTO_INCREMENT=59
ROW_FORMAT=COMPACT
;

收藏表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
sql复制代码CREATE TABLE `collect` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT ,
`user_id` int(10) UNSIGNED NOT NULL ,
`type` tinyint(4) NOT NULL ,
`song_id` int(10) UNSIGNED NULL DEFAULT NULL ,
`song_list_id` int(10) UNSIGNED NULL DEFAULT NULL ,
`create_time` datetime NOT NULL ,
PRIMARY KEY (`id`)
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
AUTO_INCREMENT=54
ROW_FORMAT=COMPACT
;

歌手歌曲表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
sql复制代码CREATE TABLE `singer` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT ,
`name` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL ,
`sex` tinyint(4) NULL DEFAULT NULL ,
`pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`birth` datetime NULL DEFAULT NULL ,
`location` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`introduction` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
PRIMARY KEY (`id`)
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
AUTO_INCREMENT=46
ROW_FORMAT=COMPACT
;

歌手表:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sql复制代码CREATE TABLE `song` (
`id` int(10) UNSIGNED NOT NULL AUTO_INCREMENT ,
`singer_id` int(10) UNSIGNED NOT NULL ,
`name` varchar(45) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL ,
`introduction` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`create_time` datetime NOT NULL COMMENT '发行时间' ,
`update_time` datetime NOT NULL ,
`pic` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NULL DEFAULT NULL ,
`lyric` text CHARACTER SET utf8 COLLATE utf8_general_ci NULL ,
`url` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL ,
PRIMARY KEY (`id`)
)
ENGINE=InnoDB
DEFAULT CHARACTER SET=utf8 COLLATE=utf8_general_ci
AUTO_INCREMENT=115
ROW_FORMAT=COMPACT
;

评论区抽奖

现在只需要在评论区留言说一下对这个springboot音乐播放系统的想法,或者也可以提一下还有什么想学的小案例,我会根据评论的热度送出两个掘金的勋章哦(掘金官方承担)~来评论区一起说一下你的想法吧!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

分布式消息流平台:不要只想着Kafka,还有Pulsar

发表于 2021-09-08

摘要: Pulsar作为一个云原生的分布式消息流平台,越来越频繁地出现在人们的视野中,大有替代Kafka江湖地位的趋势。

本文分享自华为云社区《MRS Pulsar:下一代分布式消息流平台全新发布!》,作者: Lothar。

Pulsar的前世今生

Apache Pulsar是一个发布-订阅消息系统,使用计算与存储分离的云原生架构。Pulsar 2018年9月成为ASF顶级项目,近两年,随着社区不断发展和诸多企业的应用和贡献,Pulsar作为一个云原生的分布式消息流平台,越来越频繁地出现在人们的视野中,大有替代Kafka江湖地位的趋势。

Pulsar和Kafka的对比

Pulsar和Kafka架构上最大的不同是,Kafka由Broker进行消息的收发和持久化,数据存储在本地文件系统,由Broker统一管理。这也意味着数据和消息处理是耦合的。

Kafka官网描述道:Kafka重度依赖文件系统,用于存储或缓存消息。当Broker接收到消息时,会将消息追加写到本地磁盘上。这一架构决定了Partition和Broker的对应关系是相对固定的,只有在partitionreassign时才会发生数据迁移。Partition的Leader在数据副本分布节点上产生,用于处理生产消费请求。

而Pulsar采用了计算存储分离架构,这也是Pulsar被称作云原生平台的主要原因。Pulsar依赖Apache BookKeeper管理持久化数据,Apache BookKeeper是可扩展、可容错、低延迟的日志存储服务,能够保证在强持久性下的低延迟读写。

*引自Pulsar官网介绍:pulsar.apache.org/docs/en/con…

Broker接收请求后,数据实际分布式存储在BookKeeper服务中。在数据的物理存储模型中某个Topic或Partition的数据并不固定存储在某个Bookie实例上。

Pulsar将分布式日志划分为多个Segment,每个Segment对应BookKeeper中的一个Ledger。与Kafka将某一Partition的数据日志保存在某一固定目录下不同,Pulsar通过划分Segment的方式,可以将同一topic或partition分布到不同的Bookie上。

Pulsar的优势特性

灵活扩展

相信很多使用Kafka的客户都有类似的经历:

  • 磁盘空间不足,只能调整数据TTL,或扩容机器后向新的Broker中迁移Partition
  • Topic或Partition间数据分配不均匀,节点之间或磁盘之间使用不均衡,有的磁盘已经满了,而有的磁盘还有很多空间
  • Broker机器故障,需要将数据迁移到其他节点后下电维修

Pulsar的存算分离架构天然地避免了这些问题。PulsarBroker本身是无状态的,当某个Broker故障时,另一个Broker可以立即接管对应的Topic而不需要迁移数据。BookKeeper分布式日志保证了存储节点间的数据均衡,不会因某一个Partitoin或Topic数据过多而导致IO集中在某一节点上。

当集群需要扩容时,Broker可以立即感知到新加入集群的Bookie,并将新写入的数据存储到新添加的Bookie中。

多租户

Kafka社区在KIP-37正在讨论加入NameSpace以实现多租户特性,而Pulsar已实现这一功能。在企业中,消息队列服务通常会被多个团队使用,在使用Kafka时,有时需要为每个团队维护一个Kafka集群。Pulsar可以配置多个租户,每个租户可以有多个NameSpace,管理员可以对NameSpace进行访问控制、配额管理。

更灵活的订阅模式

Kafka对消息的划分分为两层:对于属于同一个Group的KafkaConsumer,其获取到的消息是互斥的,即某一条消息只能被Group中的一个Consumer处理;对于不同的Group,某一条消息将同时被两个Group处理,消息是共享的。

而Pulsar提供了更灵活的订阅模式:

  • 独占式:

在任意时间,Topic中的数据只能被Group中的一个Consumer消费,不允许其他Consumer获取消息

  • 主备式:

多个Consumer同时消费同一个Topic时,只有一个Consumer被选为主Consumer,其他Consumer则成为备Consumer。当主Consumer故障时,发生主备倒换,备Consumer中的一个将升主,并继续消息的消费。

  • 共享式:

与Kafka类似,使用共享模式,消息将循环分发给不同的Consumer,当某个Consumer故障时,消息将被重新分配给其他Consumer。

分层存储

Pulsar另一个很有吸引里的特性是,流式数据可以转冷并存储在更廉价的存储介质上。通常为了保证性能,流式处理系统配备高性能的SSD。对于Kafka来说,所有需要保留的消息都必须驻留在昂贵的SSD上。有些时候,数据写入一段时间后已不在会被使用,但仍需保留一段时间存档。Pulsar支持将这种冷数据转储到离线存储系统中,BookKeeper只需要保留一部分热数据,可以节省很多存储成本。该特性无疑是很有价值的,Kafka社区同样在进行设计(KIP-405),但目前还没有实现。

Pulsar的性能指标

Kafka和Pulsar社区都针对性能进行了对比测试。综合来看,由于Pulsar数据落盘时,会进行同步fsync,持久性要比Kafka更高,Pulsar社区对此作出修改后进行对比测试,部分测试结果如下:

*引自Pulsar社区性能测试报告

在100Partition时,默认配置下pulsar的吞吐量距离Kafka差距明显,但当本地持久化等级设置为与Kafka相同时,吞吐量与Kafka基本持平。

*引自Pulsar社区性能测试报告

当Partition数增加到2000个时,Pulsar默认本地持久度的吞吐量基本与Kafka持平。

更多细节请移步SreamNative的benckmarking测试报告:benchmarking pulsar kafka a more accurate perspective onpulsar performance.pdf

MRS上的Pulsar

MRS已发布Pulsar的POC版本,客户可以一键式部署Pulsar服务,包括Broker和Bookie角色。支持在Web UI上修改Pulsar配置、启停、监控。

此外MRS还默认集成了KoP。KoP是Pulsar社区开源的一个插件,运行在Pulsar上,用以兼容Kafka协议。使用时,Kafka客户端可以修改连接地址后直接切换到Pulsar集群上,而不需要修改业务对Kafka客户端的依赖。

在MRS Pulsar的商用版本正在规划中,我们将探索Pulsar在云上使用的更多可能,进一步发挥Pulsar存算分离的优势,降低成本,提升资源利用率,为客户创造更多价值,敬请期待。

点击关注,第一时间了解华为云新鲜技术~

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

谈谈Semaphore工具类 前言 Semaphore简介

发表于 2021-09-08

本文已参与【请查收|你有一次免费申请掘金周边礼物的机会】活动。

前言

有幸申请到掘金送福利活动名额,参与评论就有机会获得掘金官方提供的新版徽章,具体的抽奖细节在文末。

Semaphore简介

Semaphore(信号量)是JUC包下的一个并发工具类,用来控制并发访问临界资源(共享资源)的线程数,确保访问临界资源的线程能够正确、合理的使用公共资源。Semaphore和ReetrantLock一样,都是通过直接或间接的调用AQS框架的方法实现。

Semaphore内部维护了一组虚拟的许可,许可的数量可以通过构造函数的参数指定。访问特定资源前,必须使用acquire方法获得许可,如果许可数量为0,该线程则一直阻塞,直到有可用许可。访问资源后,使用release释放许可。

Semaphore的实现

Semaphore类中所提供的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
java复制代码// 调用该方法后线程会从许可集中尝试获取一个许可
public void acquire()

// 线程调用该方法时会释放已获取的许可
public void release()

// Semaphore构造方法:permits→许可集数量
Semaphore(int permits)

// Semaphore构造方法:permits→许可集数量,fair→公平与非公平
Semaphore(int permits, boolean fair)

// 从信号量中获取许可,该方法不响应中断
void acquireUninterruptibly()

// 返回当前信号量中未被获取的许可数
int availablePermits()

// 获取并返回当前信号量中立即未被获取的所有许可
int drainPermits()

// 返回等待获取许可的所有线程Collection集合
protected Collection<Thread> getQueuedThreads();

// 返回等待获取许可的线程估计数量
int getQueueLength()

// 查询是否有线程正在等待获取当前信号量中的许可
boolean hasQueuedThreads()

// 返回当前信号量的公平类型,如为公平锁返回true,非公平锁为false
boolean isFair()

// 获取当前信号量中一个许可,当没有许可可用时直接返回false不阻塞线程
boolean tryAcquire()

// 在给定时间内获取当前信号量中一个许可,超时还未获取成功则返回false
boolean tryAcquire(long timeout, TimeUnit unit)

Semaphore的实现是基于AQS的共享锁,分为公平和非公平两种模式。

Sync实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
java复制代码abstract static class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 1192457210091910933L;

//初始的许可数量,同步锁的state保存许可的数量
Sync(int permits) {
setState(permits);
}

//获取许可数量
final int getPermits() {
return getState();
}

//非公平的方式获取共享锁
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
//可用许可数减去需求的许可数
int remaining = available - acquires;

//许可数大于0时,CAS获取许可
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

//释放许可
protected final boolean tryReleaseShared(int releases) {
for (;;) {
int current = getState();
//许可数加锁释放的许可数
int next = current + releases;
if (next < current) // overflow
throw new Error("Maximum permit count exceeded");
//CAS更新许可,直到成功
if (compareAndSetState(current, next))
return true;
}
}

//减少许可
final void reducePermits(int reductions) {
for (;;) {
int current = getState();
int next = current - reductions;
if (next > current) // underflow
throw new Error("Permit count underflow");
//
if (compareAndSetState(current, next))
return;
}
}

//将许可消耗完
final int drainPermits() {
for (;;) {
int current = getState();
if (current == 0 || compareAndSetState(current, 0))
return current;
}
}
}

公平的FairSync实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

//公平方式获取许可
protected int tryAcquireShared(int acquires) {
for (;;) {
//当前节点有前驱节点,则获取失败
if (hasQueuedPredecessors())
return -1;
//无前驱节点时,CAS方式获取许可
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

公平锁的实现与非公平锁的不同点在于:公平锁的模式下获取锁,会先调用hasQueuedPredecessors()方法判断同步队列中是否存在节点,如果存在则直接返回-1回到acquireSharedInterruptibly()方法if(tryAcquireShared(arg)<0)判断,调用doAcquireSharedInterruptibly(arg)方法将当前线程封装成Node.SHARED共享节点加入同步队列等待。如果队列中不存在节点则尝试直接获取锁或许可。

非公平的Sync实现

1
2
3
4
5
6
7
8
9
10
11
scala复制代码static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}
//直接使用Sync的nonfairTryAcquireShared()实现,非公平方式获取许可
protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

Semaphore中的非公平锁NonfairSync类的构造函数是基于调用父类Sync构造函数完成的,而在创建Semaphore对象时传入的许可数permits最终则会传递给AQS同步器的同步状态标识state,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
java复制代码// 父类 - Sync类构造函数
Sync(int permits) {
setState(permits); // 调用AQS内部的set方法
}

// AQS(AbstractQueuedSynchronizer)同步器
public abstract class AbstractQueuedSynchronizer
extends AbstractOwnableSynchronizer {
// 同步状态标识
private volatile int state;

protected final int getState() {
return state;
}
protected final void setState(int newState) {
state = newState;
}
// 对state变量进行CAS操作
protected final boolean compareAndSetState(int expect, int update) {
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}
}

Semaphore对象创建时传入的许可数permits,其实最终是在对AQS内部的state进行初始化。初始化完成后,state代表着当前信号量对象的可用许可数。

非公平锁NonfairSync获取许可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
java复制代码// Semaphore类 → acquire()方法
public void acquire() throws InterruptedException {
// Sync类继承AQS,此处直接调用AQS内部的acquireSharedInterruptibly()方法
sync.acquireSharedInterruptibly(1);
}

// AbstractQueuedSynchronizer类 → acquireSharedInterruptibly()方法
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
// 判断是否出现线程中断信号(标志)
if (Thread.interrupted())
throw new InterruptedException();
// 如果tryAcquireShared(arg)执行结果不小于0,则线程获取同步状态成功
if (tryAcquireShared(arg) < 0)
// 未获取成功加入同步队列阻塞等待
doAcquireSharedInterruptibly(arg);
}

信号量获取许可的方法acquire()最终是通过Sync对象调用AQS内部的acquireSharedInterruptibly()方法完成的,而acquireSharedInterruptibly()在获取同步状态标识的过程中是可以响应线程中断操作的,如果该操作没有没中断,则首先调用tryAcquireShared(arg)尝试获取一个许可数,获取成功则返回执行业务,方法结束。如果获取失败,则调用doAcquireSharedInterruptibly(arg)将当前线程加入同步队列阻塞等待。tryAcquireShared(arg)方法是AQS提供的方法,没有具体实现,在NonfairSync类中的实现如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
arduino复制代码    // Semaphore类 → NofairSync内部类 → tryAcquireShared()方法
protected int tryAcquireShared(int acquires) {
// 调用了父类Sync中的实现方法
return nonfairTryAcquireShared(acquires);
}

// Syn类 → nonfairTryAcquireShared()方法
abstract static class Sync extends AbstractQueuedSynchronizer {
final int nonfairTryAcquireShared(int acquires) {
// 开启自旋死循环
for (;;) {
int available = getState();
int remaining = available - acquires;
// 判断信号量中可用许可数是否已<0或者CAS执行是否成功
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

首先获取到state值后,减去一得到remaining值,若不小于0则代表着当前信号量中还有可用许可,当前线程开始尝试cas更新state值,cas成功则代表获取同步状态成功,返回remaining值。反之,如果remaining值小于0则代表着信号量中的许可数已被其他线程获取,目前不存在可用许可数,直接返回小于0的remaining值,nonfairTryAcquireShared(acquires)方法执行结束,回到AQS的acquireSharedInterruptibly()方法。当返回的remaining值小于0时,if(tryAcquireShared(arg)<0)条件成立,进入if执行doAcquireSharedInterruptibly(arg)方法将当前线程加入同步队列阻塞,等待其他线程释放同步状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
java复制代码// AbstractQueuedSynchronizer类 → doAcquireSharedInterruptibly()方法
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
// 创建节点状态为Node.SHARED共享模式的节点并将其加入同步队列
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
// 开启自旋操作
for (;;) {
final Node p = node.predecessor();
// 判断前驱节点是否为head
if (p == head) {
// 尝试获取同步状态state
int r = tryAcquireShared(arg);
// 如果r不小于0说明获取同步状态成功
if (r >= 0) {
// 将当前线程结点设置为头节点并唤醒后继节点线程
setHeadAndPropagate(node, r);
p.next = null; // 置空方便GC
failed = false;
return;
}
}
// 调整同步队列中node节点的状态并判断是否应该被挂起
// 并判断是否存在中断信号,如果需要中断直接抛出异常结束执行
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
// 结束该节点线程的请求
cancelAcquire(node);
}
}

// AbstractQueuedSynchronizer类 → setHeadAndPropagate()方法
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // 获取同步队列中原本的head头节点
setHead(node); // 将传入的node节点设置为头节点
/*
* propagate=剩余可用许可数,h=旧的head节点
* h==null,(h=head)==null:
* 非空判断的标准写法,避免原本head以及新的头节点node为空
* 如果当前信号量对象中剩余可用许可数大于0或者
* 原本头节点h或者新的头节点node不是结束状态则唤醒后继节点线程
*
* 写两个if的原因在于避免造成不必要的唤醒,因为很有可能唤醒了后续
* 节点的线程之后,还没有线程释放许可/锁,从而导致再次陷入阻塞
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
// 避免传入的node为同步队列的唯一节点,
// 因为队列中如果只存在node一个节点,那么后驱节点s必然为空
if (s == null || s.isShared())
doReleaseShared(); // 唤醒后继节点
}
}

释放许可

公平锁释放许可的逻辑与非公平锁的实现是一致的,因为都是Sync类的子类,而释放锁的逻辑都是对state减一更新后,唤醒后继节点的线程。所以关于释放锁的具体实现则是交由Sync类实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
arduino复制代码// Semaphore类 → release()方法
public void release() {
sync.releaseShared(1);
}

// AbstractQueuedSynchronizer类 → releaseShared(arg)方法
public final boolean releaseShared(int arg) {
// 调用子类Semaphore中tryReleaseShared()方法实现
if (tryReleaseShared(arg)) {
doReleaseShared();
return true;
}
return false;
}

释放锁则调用的是Semaphore.release()方法,调用该方法之后线程持有的许可会被释放,同时permits/state加一

与之前获取许可的方法一样,Semaphore释放许可的方法release()也是通过间接调用AQS内部的releaseShared(arg)完成。因为AQS的releaseShared(arg)是魔法方法,所以最终的逻辑实现由Semaphore的子类Sync完成,如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
arduino复制代码// Semaphore类 → Sync子类 → tryReleaseShared()方法
protected final boolean tryReleaseShared(int releases) {
for (;;) {
// 获取AQS中当前同步状态state值
int current = getState();
// 对当前的state值进行增加操作
int next = current + releases;
// 不可能出现,除非传入的releases为负数
if (next < current)
throw new Error("Maximum permit count exceeded");
// CAS更新state值为增加之后的next值
if (compareAndSetState(current, next))
return true;
}
}

对比获取许可的逻辑要简单许多,只需更新state值后调用doReleaseShared()方法唤醒后继节点线程即可。但是调用doReleaseShared()方法的线程会存在两种:

一是释放共享锁/许可数的线程。调用release()方法释放许可时必然调用它唤醒后继线程

二是刚获取到共享锁/许可数的线程。

总结

在初始化时传递的许可数/计数器最终都会间接的传递给AQS的同步状态标识state。当一条线程尝试获取共享锁时,会对state减一,当state为0时代表没有可用共享锁了,其他后续请求的线程会被封装成共享节点加入同步队列等待,直至其他持有共享锁的线程释放(state加一)。与独占模式不同的是:共享模式中,除开释放锁时会唤醒后继节点的线程外,获取共享锁成功的线程也会在满足一定条件下唤醒后继节点。关于AQS具体可参见之前的文章:java锁:AQS详解(一)
java锁:AQS详解(二)

抽奖说明

1.参与评论即可(讨论技术相关内容);

2.抽奖规则:如果本文评论达到掘金活动的要求,热评区前两名分别获赠掘金新版徽章一枚(若无热评将从评论区抽取两位幸运用户);

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

字节跳动开源内部微服务中间件 CloudWeGo

发表于 2021-09-08

今天,字节跳动正式宣布开源 CloudWeGo。这是一套以 Golang 语言为核心、专注于微服务通信与治理的中间件集合,具有高性能、可扩展、高可靠的特点。

项目地址:github.com/cloudwego

作为项目维护方,字节跳动基础架构团队已推进主要业务项目以 CloudWeGo 开源库为主进行迭代,未来,团队将坚持内外维护一套代码,统一迭代演进,并逐步开放更多内部微服务最佳实践。

CloudWeGo 概述

多年来,在亿级流量背后,字节跳动基础架构团队开发的技术底座支撑着庞大的微服务生态系统,从 2018 年至今,团队维护的在线微服务数量增长了近 600%,超过 5 万。面对这样的规模和增速,效率、高可扩展和稳定性成了构建 CloudWeGo 的核心。

CloudWeGo 第一批开源了四个项目,以 RPC 框架 Kitex 和网络库 Netpoll 为主,关于它们的技术实践,可以参考此前发布的两篇文章:《字节跳动 Go RPC 框架 Kitex 性能优化实践》、《字节跳动在 Go 网络库上的实践》。

Kitex

Kitex 是一个 Golang 微服务 RPC 框架,具有高性能、强可扩展的特点。

字节跳动使用 Golang 作为主要业务开发语言。早在 2016 年,基础架构团队就已经开始基于 Golang 开发内部框架 Kite,并在 2019 年围绕性能和可扩展启动重构,使其能更好地满足业务发展和技术体系演进需求。

2020 年 10 月,Kitex 正式上线。虽然是一个全新的框架,但经过一年的线上应用,目前字节跳动内部已有超过 50% 的 Golang 微服务迁移到了 Kitex。

下面是 Kitex 的一些特性:

  • 高性能:Kitex 默认集成自研网络库 Netpoll 作为网络传输模块,性能相较使用 go net 有显著优势;除了网络库带来的性能收益,Kitex 对 Thrift 编解码也做了优化,这在之前的性能优化实践文章中有介绍。关于性能数据,可参考 github.com/cloudwego/kitex-benchmark。
  • 扩展性:Kitex 设计上做了模块划分,提供了较多的扩展接口以及默认的扩展实现,使用者也可以根据需要自行定制扩展。Kitex 未耦合 Netpoll,开发者可选择其他网络库扩展使用。
  • 消息协议:RPC 消息协议默认支持 Thrift、Kitex Protobuf、gRPC。Thrift 支持 Buffered 和 Framed 二进制协议;Kitex Protobuf 是 Kitex 自定义的 Protobuf 消息协议,协议格式类似 Thrift;gRPC 是对 gRPC 消息协议的支持,可以与 gRPC 互通。除此之外,使用者也可以扩展自己的消息协议。
  • 传输协议:传输协议封装消息协议进行 RPC 互通,传输协议可以额外透传元信息,用于服务治理,Kitex 支持的传输协议有 TTHeader、HTTP2。TTHeader 可以和 Thrift、Kitex Protobuf 结合使用;HTTP2 目前主要是结合 gRPC 协议使用,后续也会支持 Thrift。
  • 多消息类型:支持 PingPong、Oneway、双向 Streaming。其中 Oneway 目前只对 Thrift 协议支持,双向 Streaming 只对 gRPC 支持,后续会考虑支持 Thrift 的双向 Streaming。
  • 服务治理:支持服务注册/发现、负载均衡、熔断、限流、重试、监控、链路跟踪、日志、诊断等服务治理模块,大部分均已提供默认扩展,使用者可选择集成。
  • 代码生成工具:Kitex 内置代码生成工具,可支持生成 Thrift、Protobuf 以及脚手架代码。原生的 Thrift 代码由本次一起开源的 Thriftgo 生成,Kitex 对 Thrift 优化代码由 Kitex Tool 作为插件生成。Kitex 目前暂未对 Protobuf 单独支持 IDL 解析和代码生成,作为官方 protoc 插件生成 Protobuf 代码。

Netpoll

Netpoll 是字节跳动内部的 Golang 高性能、I/O 非阻塞的网络库,专注于 RPC 场景。

RPC 通常有较重的处理逻辑(业务逻辑、编解码),耗时长,不能像 Redis 一样采用串行处理(必须异步)。而 Go 的标准库 net 设计了 BIO(Blocking I/O) 模式的 API,为了保证异步处理,RPC 框架设计上需要为每个连接都分配一个 goroutine,这在空闲连接较多时,产生大量的空闲 goroutine,增加调度开销。此外,net.Conn 没有提供检查连接活性的 API,很难设计出高效的连接池,池中的失效连接无法及时清理,复用低效。
开源社区目前缺少专注于 RPC 方案的 Go 网络库。类似的项目如:evio 、gnet 等,均面向 Redis、Haproxy 这样的场景。

因此 Netpoll 应运而生,它借鉴了 evio 和 Netty 的优秀设计,具有出色的性能,更适用于微服务架构。

Thriftgo

Thriftgo 是 Go 语言实现的 Thrift IDL 解析和代码生成器,支持完善的 Thrift IDL 语法和语义检查,相较 Apache Thrift 官方的 Golang 生成代码,Thriftgo 做了一些问题修复且支持插件机制,用户可根据需求自定义生成代码。

Kitex 的代码生成工具就是 Thriftgo 的插件,CloudWeGo 近期也会开源另一个 Thriftgo 的插件 thrift-gen-validator,支持 Thrift IDL Validator 用于字段值校验,解决开发者需要自行实现代码校验逻辑的负担。

Thriftgo 目前虽然仅支持生成 Golang Thrift 代码,但其定位是可支持各语言的 Thrift 代码生成,未来如果有需求,我们会考虑生成其他语言的代码。同时我们也会尝试将其回馈至 Apache Thrift 社区。

Netpoll-http2

Netpoll-http2 基于 Golang 标准库 golang.org/x/net/http2 的源码替换 go net 为 Netpoll,目前用于 Kitex 对 gRPC 协议的支持。

内外版本维护

完整的微服务体系离不开底层云计算生态,无论是公有云、私有云还是本地基础设施等环境,开发者要搭建微服务,离不开配套的微服务治理,如治理平台、监控、链路跟踪、注册/发现、配置中心、服务网格等,此外还存在一些定制的规范。

字节跳动自然也有完善的内部服务支持微服务体系,但这些服务短期内无法开源。为了遵守长期投入承诺,内外维护一套代码、统一迭代,基础架构团队已经将与内部生态没有耦合的项目,如 Netpoll,直接迁移到 CloudWeGo 开源库,并将内部依赖调整为开源库。

而对于需要集成治理能力融入微服务体系的 Kitex,我们基于其扩展性,对内外部代码做了拆分,把 Kitex 的核心代码迁移到开源库,内部库封装一层壳保证内部用户无感知升级,而集成内部治理特性的模块则作为 Kitex 的扩展保留在内部库。未来,我们也会持续把已经在字节跳动内部经过稳定性验证的新特性,迁移到开源库。

对于使用 CloudWeGo 的开发者,大家同样可以对 Kitex 进行扩展,将 Kitex 融入自己所在组织的微服务体系中。我们也诚挚欢迎开发者能贡献自己的扩展到 kitex-contrib,为更多用户提供便利。

RoadMap

对于基础架构团队,CloudWeGo 不仅仅是一个开源项目,它也是一个真实的超大规模企业级实践项目。通过开源,我们希望 CloudWeGo 能丰富云原生社区的 Golang 工具体系,为更多开发者和企业搭建云原生化的大规模分布式系统,提供一种现代的、资源高效的的技术方案。

如前文所述,目前 CloudWeGo 只开源了第一批项目,未来,我们会进一步推动其走向完善:

  • 继续开源其他内部项目。我们会开源更多字节跳动常用 Golang 项目,如 HTTP 框架 Hertz、基于共享内存的 IPC 通信库 ShmIPC 等,为开发者提供更多场景的微服务需求支持。此前,我们已将部分 Golang 基础工具库开源,统一在 bytedance/gopkg 维护,感兴趣的开发者也可以关注。
  • 逐步开源经验证的、稳定的特性。CloudWeGo 的主要项目均为字节内部微服务提供支持,许多新特性仍在内部验证,相对成熟后我们会逐步开源,如对 ShmIPC 的集成、无序列化、无生成代码的支持等。
  • 结合内外部用户需求,持续迭代。项目开源后,我们也会根据开发者需求开展迭代。例如近一个月来,我们收到了来自开发者的大量关于 Protobuf 的诉求,为了提供良好的支持,帮助大家快速搭建自己的微服务体系,我们已经在筹备开展 Kitex 对 Protobuf 支持的性能优化。

欢迎大家向 CloudWeGo 提交 issue,也欢迎大家提交 PR 共建 CloudWeGo。我们诚心期待更多的开发者加入,也期待 CloudWeGo 助力越来越多的企业快速构建云原生架构。

相关链接

  • 项目地址:github.com/cloudwego
  • 项目官网:www.cloudwego.io

加入飞书社群:

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…535536537…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%