开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

重新认识 Java 中的队列

发表于 2021-11-19

「这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战」

本文被《从小工到专家的 Java 进阶之旅》收录。

你好,我是看山。

书接上文,上次聊了聊 在多线程中使用 ArrayList 会发生什么,这次我们说说平时常用的列表:Vector、ArrayList、CopyOnWriteArrayList、SynchronizedList。

Vector

Vector是在 JDK 1.0 提供的,虽然没有被标记Deprecated,但是事实上已经没人使用了。主要原因是性能差,且不符合需求。

从源码可以看出(这里不贴源码了),Vector是基于数组实现,几乎在所有操作方法上,都用synchronized关键字实现方法同步,这种同步方式可以对单一操作进行加锁,比如多个线程同时执行add会同步阻塞执行,但是多线程执行add和remove时,就不会阻塞了。

但是,大部分需要对队列加锁的场景,是想对整个队列加锁,而不仅仅是对单一操作加锁。也就是说,Vector和我们的期望不同,但是又额外增加了同步操作带来的性能开销。所以,不是必须使用的场景,都可以使用ArrayList代替,即使是多线程情况下需要同步队列,也可以使用CopyOnWriteArrayList和SynchronizedList代替。

ArrayList

ArrayList是在 JDK 1.1 提供的,作为Vector的继任者(ArrayList实现方式与Vector几乎完全相同),ArrayList把方法上的synchronized全部去掉了,完全没有实现同步,是非线程安全的。

它的非线程安全,还体现在迭代器的快速失败上。在使用方法iterator和listIterator创建迭代器之后,如果还对原来的ArrayList队列进行修改(add 或 remove),迭代器迭代的时候就会报ConcurrentModificationException异常。从源码可以看出,迭代器在迭代过程中,会检查队列中修改次数modCount与创建迭代器时落下的修改次数快照expectedModCount是否相等,相等表示没有修改过,代码如下:

1
2
3
4
5
6
7
8
java复制代码private class Itr implements Iterator<E> {
    // 这段代码是从 ArrayList 中摘取的
    // 只留下检查方法,略过其他代码,有兴趣的可以从源码中查看
    final void checkForComodification() {
        if (modCount != expectedModCount)
            throw new ConcurrentModificationException();
    }
}

第三点是在多线程场景中,添加元素可能会丢失数据,或者发生数组越界异常,在多线程中使用 ArrayList 会发生什么 有详细描述,这里就不赘述了。

SynchronizedList

SynchronizedList是Collections的静态内部类,使用Collections.synchronizedList()静态方法创建,是一个通过组合List类实现的封装实现。它的大多数方法通过synchronized (mutex){...}代码块同步方式,因为加锁对象mutex是队列对象中定义的相同对象,所以对mutex加锁时,就实现对整个队列加锁,也就解决了Vector不能对整个队列加锁的问题。所以如果有多个线程同时操作add和remove方法,会阻塞同步执行。

ArrayList中存在的迭代器快速失败情况,依然存在,正如下面源码中的注释:想要使用迭代器,需要用户手动实现同步。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
java复制代码static class SynchronizedList<E>
    extends SynchronizedCollection<E>
    implements List<E> {
    
    // 代码摘自 Collections,省略很多代码

    public void add(int index, E element) {
        synchronized (mutex) {list.add(index, element);}
    }

    public ListIterator<E> listIterator() {
        return list.listIterator(); // Must be manually synched by user
    }

    public ListIterator<E> listIterator(int index) {
        return list.listIterator(index); // Must be manually synched by user
    }
}

手动同步的时候需要注意,既然我们关注的全局同步,在迭代器设置同步的时候,要保证锁定对象与add等方法中对象相同。这个在后续补充说明,这里就不展开了。

CopyOnWriteArrayList

CopyOnWriteArrayList是从 JDK 1.5 开始提供的,先看看add方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
java复制代码public class CopyOnWriteArrayList<E>
    implements List<E>, RandomAccess, Cloneable, java.io.Serializable {

    /** The lock protecting all mutators */
    final transient ReentrantLock lock = new ReentrantLock();

    /** The array, accessed only via getArray/setArray. */
    private transient volatile Object[] array;

    // 代码摘自 CopyOnWriteArrayList,省略很多代码

    public boolean add(E e) {
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            Object[] newElements = Arrays.copyOf(elements, len + 1);
            newElements[len] = e;
            setArray(newElements);
            return true;
        } finally {
            lock.unlock();
        }
    }

    public boolean addAll(Collection<? extends E> c) {
        Object[] cs = (c.getClass() == CopyOnWriteArrayList.class) ?
            ((CopyOnWriteArrayList<?>)c).getArray() : c.toArray();
        if (cs.length == 0)
            return false;
        final ReentrantLock lock = this.lock;
        lock.lock();
        try {
            Object[] elements = getArray();
            int len = elements.length;
            if (len == 0 && cs.getClass() == Object[].class)
                setArray(cs);
            else {
                Object[] newElements = Arrays.copyOf(elements, len + cs.length);
                System.arraycopy(cs, 0, newElements, len, cs.length);
                setArray(newElements);
            }
            return true;
        } finally {
            lock.unlock();
        }
    }

    private E get(Object[] a, int index) {
        return (E) a[index];
    }

    /**
     * {@inheritDoc}
     *
     * @throws IndexOutOfBoundsException {@inheritDoc}
     */
    public E get(int index) {
        return get(getArray(), index);
    }
}

可以看到,CopyOnWriteArrayList借助ReentrantLock实现同步,在synchronized优化之前,ReentrantLock性能高于synchronized。CopyOnWriteArrayList也是通过数组实现的,但是在数组前面增加了volatile关键字,实现了多线程情况下数组的可见性,更加安全。更重要的一点是,CopyOnWriteArrayList在add添加元素的时候,实现方式是重建数组对象,替换原来的数组引用。与ArrayList的扩容方式相比,减少了空间,但是也增加了赋值数组的性能开销。在get获取元素的时候,没有任何锁,直接数据返回。

CopyOnWriteArrayList的迭代器时通过COWIterator实现的,调用iterator方法时,将当前队列中数组的快照赋值到迭代器中的数组引用上。如果原来的队列发生修改,队列中数组会指向别的引用,而迭代器中的数组不会发生变化,所以在多线程执行过程中,通过迭代器遍历数组,也可以修改队列中的数据。这种方式保障线程安全的同时,也可能会出现数据不一致的情况,只能是使用的使用多注意了。

1
2
3
4
5
6
7
8
9
10
11
java复制代码static final class COWIterator<E> implements ListIterator<E> {
    /** Snapshot of the array */
    private final Object[] snapshot;
    /** Index of element to be returned by subsequent call to next.  */
    private int cursor;

    private COWIterator(Object[] elements, int initialCursor) {
        cursor = initialCursor;
        snapshot = elements;
    }
}

对比 CopyOnWriteArrayList 和 SynchronizedList

CopyOnWriteArrayList和SynchronizedList都实现了同步,实现方式上采用的是不同策略,各自的侧重点不同。

CopyOnWriteArrayList侧重于读写分离,发生数据写操作(add或remove)时,会加锁,各个线程阻塞执行,执行过程会创建数据副本,替换对象引用;如果同时有读操作(get或iterator),读操作读取的是老数据,或者成为历史数据快照,或者成为缓存数据。这会造成读写同时发生时,数据不一致的情况,但是数据最终会一致。这种方式与数据库读写分离模式几乎相同,很多特性可以类比。

SynchronizedList侧重数据强一致,也就是说当发生数据写操作(add或remove)时,会加锁,各个线程阻塞执行,而且也会通过相同的锁阻塞get操作。

从CopyOnWriteArrayList和SynchronizedList两种不同事项方式,可以推断CopyOnWriteArrayList在写少读多的场景中执行效率高,SynchronizedList的读写操作效率很均衡,所以在写多读多、写多读少的场景执行效率都会高于CopyOnWriteArrayList。借用网上的测试结果:

图片

对比 CopyOnWriteArrayList 和 SynchronizedList

文末总结

  1. synchronized关键字在 JDK 8 之前性能比较差,可以看到 JDK1.5 之后实现的同步代码,很多是通过ReentrantLock实现的。
  2. 多线程场景中除了需要考虑同步外,还需要考虑数据可见性,可以通过volatile关键字实现。
  3. ArrayList完全没有同步操作,是非线程安全的
  4. CopyOnWriteArrayList和SynchronizedList属于线程安全队列
  5. CopyOnWriteArrayList实现读写分离,适合场景是写少读多的场景
  6. SynchronizedList要求数据强一致,是队列全局加锁方式,读操作也会加锁
  7. Vector只是在迭代器遍历性能很差,如果不考虑全局锁定队列,单纯读操作和单独写操作性能与SynchronizedList相差不大。

推荐阅读

  • JDK中居然也有反模式接口常量
  • java import 导入包时,我们需要注意什么呢?
  • Java 并发基础(一):synchronized 锁同步
  • Java 并发基础(二):主线程等待子线程结束
  • Java 并发基础(三):再谈 CountDownLatch
  • Java 并发基础(四):再谈 CyclicBarrier
  • Java 并发基础(五):面试实战之多线程顺序打印
  • 如果非要在多线程中使用ArrayList会发生什么?
  • 如果非要在多线程中使用 ArrayList 会发生什么?(第二篇)
  • 重新认识 Java 中的队列
  • Java 中 Vector 和 SynchronizedList 的区别
  • 一文掌握 Java8 Stream 中 Collectors 的 24 个操作
  • 一文掌握 Java8 的 Optional 的 6 种操作

你好,我是看山。游于码界,戏享人生。如果文章对您有帮助,请点赞、收藏、关注。欢迎关注公众号「看山的小屋」,发现不一样的世界。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

很酷的自定义博客网址

发表于 2021-11-19

「这是我参与11月更文挑战的第17天,活动详情查看:2021最后一次更文挑战」

写在前面:
大家好,我是 花狗Fdog ,来自内蒙古的一个小城市,目前在泰州读书。
很感谢能有这样一个平台让我能够在这里分享所学所感。
我喜欢编程,喜欢代码,喜欢去做一个程序员。
努力学习,争取多年后,给亲人更好的生活。
QQ / WX:2506897252 欢迎交流。


第一步,先去百度云,腾讯云,或者阿里云的旗下,去购买一个域名,他们都有新用户体验活动,第一次购买.com域名只需要20元,选择一个自己心怡的域名,购买好以后,进行实名验证,这个阶段只需要1个小时就OK了,这期间不耽误我们继续,接着注册我们的github,如果不会注册,自己百度,假设你已经注册好了自己的github,登陆账号,进入到这个页面:
在这里插入图片描述点击左上面的New,新建一个仓库:
在这里插入图片描述

仓库的名字格式是:用户名.github.io 一定要这样,这很关键,比如我的用户名是FdogMain,仓库名就应该是FdogMain.github.io。建好后,创建两个文件,红色箭头创建文件。

在这里插入图片描述 =600x)
创建第一个名叫CNAME的文件,内容为你购买的域名名:

1
c复制代码fdogcsdn.com  //这是我购买的域名

然后保存,建第二个叫index.html的文件,内容如下:

1
2
3
c复制代码<script>  
window.location.href="https://blog.csdn.net/fdog_" //这里写你的CSDN博客地址。
</script>

然后保存,我们进入购买域名的网站进行域名解析,
在这里插入图片描述
我购买的是百度的域名,如图,点击域名管理。
在这里插入图片描述
点击解析,我们这里不需要备案。
在这里插入图片描述

在这里插入图片描述这里的主机记录就是你的仓库名。
在这里插入图片描述
到这里就大功告成了,效果如下,喜欢的话,关注一波。

在这里插入图片描述


若有错误,欢迎指正批评,欢迎讨论。
==闪光的未必都是金子,而沉默的也不一定就是石头。世界上那些最轻易的事情中,拖延时间最不费力。每一天的努力,只是为了让远方变得更近一些。==

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

ADG单实例搭建系列之 (DBCA) 一、介绍 二、环境准备

发表于 2021-11-19

「这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战」

一、介绍

The Database Configuration Assistant (DBCA) can also be used as a simple command-line method to create an Oracle Data Guard physical standby database.

The DBCA command qualifier used to create the physical standby database is createDuplicateDB.

1
2
3
4
5
6
7
bash复制代码dbca -createDuplicateDB 
-gdbName global_database_name
-primaryDBConnectionString easy_connect_string_to_primary
-sid database_system_identifier
[-createAsStandby
[-dbUniqueName db_unique_name_for_standby]]
[-customScripts scripts_list]

更详细参数可参考:The createDuplicateDB command creates a duplicate of an Oracle database.

Notes:

1、12.2.0.1开始支持DBCA创建物理备库

1
2
3
4
5
bash复制代码##限制:
1.主库必须是单机环境,非RAC数据库。
2.主库必须是非CDB环境。

DBCA can only be used to create standby databases for non-multitenant primary databases. In addition, this capability creates only single instance standby databases, not Oracle Real Application Clusters (Oracle RAC) databases. If required, the standby can then be converted to an Oracle RAC standby database, either manually or using Oracle Enterprise Manager Cloud Control.

2、18c之后,以上限制已经取消,支持主库是CDB或者RAC环境。

二、环境准备

主机名 ip DB Version db_name db_unique_name
主库 orcl 192.168.1.172 19.3.0.0 orcl orcl
备库 orcl_stby 192.168.1.180 19.3.0.0 orcl orcl_stby

Notes:

1、db_unique_name主备库不能相同。

2、db_name主备库需保持一致。

3、主备库DB版本需保持一致。

三、搭建过程

1、Oracle软件安装

主库一键安装:

1
bash复制代码./AllOracleSilent.sh -i 192.168.1.172 -d 19c -n orcl -o orcl -b /u01/app -s AL32UTF8

备库一键安装:(备库仅安装ORACLE软件,不建库)

1
bash复制代码./AllOracleSilent.sh -i 192.168.1.180 -d 19c -w Y -n orcl_stby -o orcl -b /u01/app -s AL32UTF8

一键安装脚本可参考:ORACLE一键安装单机11G/12C/18C/19C并建库脚本

2、环境配置

a.配置hosts文件

主库:

1
2
3
4
5
bash复制代码cat <<EOF >> /etc/hosts
##FOR DG BEGIN
192.168.1.180 orcl_stby
##FOR DG END
EOF

备库:

1
2
3
4
5
bash复制代码cat <<EOF >> /etc/hosts
##FOR DG BEGIN
192.168.1.172 orcl
##FOR DG END
EOF

b.配置静态监听和TNS

主库+备库:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
bash复制代码##listener.ora
vi /u01/app/oracle/product/19.3.0/db/network/admin/listener.ora
##添加
SID_LIST_LISTENER =
(SID_LIST =
(SID_DESC =
(GLOBAL_DBNAME = orcl)
(ORACLE_HOME = /u01/app/oracle/product/19.3.0/db)
(SID_NAME = orcl)
)
)

##重启监听
su - oracle -c "lsnrctl stop"
su - oracle -c "lsnrctl start"

##tnsnames.ora
su - oracle -c "cat <<EOF >> /u01/app/oracle/product/19.3.0/db/network/admin/tnsnames.ora
##FOR DG BEGIN
ORCL =
(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = TCP)(HOST = orcl)(PORT = 1521))
)
(CONNECT_DATA =
(SERVICE_NAME = orcl)
)
)
ORCL_STBY =
(DESCRIPTION =
(ADDRESS_LIST =
(ADDRESS = (PROTOCOL = TCP)(HOST = orcl_stby)(PORT = 1521))
)
(CONNECT_DATA =
(SERVICE_NAME = orcl_stby)
)
)
##FOR DG BEGIN
EOF"

c.主库配置参数

1
2
3
4
5
6
7
8
9
10
11
12
13
sql复制代码SQL> select force_logging,log_mode,cdb from gv$database;

FORCE_LOGGING LOG_MODE CDB
--------------------------------------- ------------ ---
YES ARCHIVELOG YES

##开启方式
alter database force logging;
shutdown immediate
startup mount
alter database archivelog;
alter database open;
alter pluggable database all open;

d.主库添加stanby log文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
sql复制代码set line222
col member for a60
select t2.thread#,t1.group#,t1.member,t2.bytes/1024/1024 from gv$logfile t1,gv$log t2 where t1.group#=t2.group#;

THREAD# GROUP# MEMBER T2.BYTES/1024/1024
---------- ---------- ------------------------------------------------------------ ------------------
1 3 /oradata/ORCL/redo03.log 120
1 2 /oradata/ORCL/redo02.log 120
1 1 /oradata/ORCL/redo01.log 120

--需要注意:
--1.stanby log日志大小与redo log日志保持一致
--2.stanby log数量:
standby logfile=(1+logfile组数)*thread=(1+3)*1=4组,需要加4组standby logfile.
--3.thread要与redo log保持一致,如果是rac,需要增加多个thread对应的standby log

ALTER DATABASE ADD STANDBY LOGFILE thread 1
group 4 ('/oradata/ORCL/standby_redo04.log') SIZE 120M,
group 5 ('/oradata/ORCL/standby_redo05.log') SIZE 120M,
group 6 ('/oradata/ORCL/standby_redo06.log') SIZE 120M,
group 7 ('/oradata/ORCL/standby_redo07.log') SIZE 120M;

SQL> select t2.thread#,t1.group#,t1.member,t2.bytes/1024/1024 from gv$logfile t1,gv$standby_log t2 where t1.group#=t2.group#;

THREAD# GROUP# MEMBER T2.BYTES/1024/1024
---------- ---------- ------------------------------------------------------------ ------------------
1 4 /oradata/ORCL/standby_redo04.log 120
1 5 /oradata/ORCL/standby_redo05.log 120
1 6 /oradata/ORCL/standby_redo06.log 120
1 7 /oradata/ORCL/standby_redo07.log 120

3、 DBCA AsStandby

1
2
3
4
5
6
7
8
9
bash复制代码dbca -silent -createDuplicateDB \
-gdbName orcl \
-sid orcl \
-sysPassword oracle \
-primaryDBConnectionString 192.168.1.172:1521/orcl \
-nodelist orcl_stby \
-databaseConfigType SINGLE \
-createAsStandby -dbUniqueName orcl_stby \
-datafileDestination '/oradata'

4、设置主库+备库DG参数

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
sql复制代码--主库设置DG参数
ALTER SYSTEM SET LOG_ARCHIVE_CONFIG='DG_CONFIG=(ORCL,ORCL_STBY)';
ALTER SYSTEM SET LOG_ARCHIVE_DEST_1='LOCATION=/archivelog VALID_FOR=(ALL_LOGFILES,ALL_ROLES) DB_UNIQUE_NAME=ORCL';
ALTER SYSTEM SET LOG_ARCHIVE_DEST_2='SERVICE=orcl_stby ASYNC VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE) DB_UNIQUE_NAME=ORCL_STBY';
ALTER SYSTEM SET LOG_ARCHIVE_DEST_STATE_2=ENABLE;
ALTER SYSTEM SET LOG_ARCHIVE_FORMAT='%t_%s_%r.arc' SCOPE=SPFILE;
ALTER SYSTEM SET LOG_ARCHIVE_MAX_PROCESSES=4;
ALTER SYSTEM SET REMOTE_LOGIN_PASSWORDFILE=EXCLUSIVE SCOPE=SPFILE;
ALTER SYSTEM SET FAL_SERVER=ORCL_STBY;
ALTER SYSTEM SET FAL_CLIENT=ORCL;
ALTER SYSTEM SET DB_FILE_NAME_CONVERT='/oradata/ORCL','/oradata/ORCL_STBY' SCOPE=SPFILE;
ALTER SYSTEM SET LOG_FILE_NAME_CONVERT='/oradata/ORCL','/oradata/ORCL_STBY' SCOPE=SPFILE;
ALTER SYSTEM SET STANDBY_FILE_MANAGEMENT=AUTO;

--备库设置DG参数
ALTER SYSTEM SET LOG_ARCHIVE_CONFIG='DG_CONFIG=(ORCL_STBY,ORCL)';
ALTER SYSTEM SET LOG_ARCHIVE_DEST_1='LOCATION=/archivelog VALID_FOR=(ALL_LOGFILES,ALL_ROLES) DB_UNIQUE_NAME=ORCL_STBY';
ALTER SYSTEM SET LOG_ARCHIVE_DEST_2='SERVICE=orcl ASYNC VALID_FOR=(ONLINE_LOGFILES,PRIMARY_ROLE) DB_UNIQUE_NAME=ORCL';
ALTER SYSTEM SET LOG_ARCHIVE_DEST_STATE_2=ENABLE;
ALTER SYSTEM SET LOG_ARCHIVE_FORMAT='%t_%s_%r.arc' SCOPE=SPFILE;
ALTER SYSTEM SET LOG_ARCHIVE_MAX_PROCESSES=4;
ALTER SYSTEM SET REMOTE_LOGIN_PASSWORDFILE=EXCLUSIVE SCOPE=SPFILE;
ALTER SYSTEM SET FAL_SERVER=ORCL;
ALTER SYSTEM SET FAL_CLIENT=ORCL_STBY;
ALTER SYSTEM SET DB_FILE_NAME_CONVERT='/oradata/ORCL_STBY','/oradata/ORCL' SCOPE=SPFILE;
ALTER SYSTEM SET LOG_FILE_NAME_CONVERT='/oradata/ORCL_STBY','/oradata/ORCL' SCOPE=SPFILE;
ALTER SYSTEM SET STANDBY_FILE_MANAGEMENT=AUTO;

5、开启日志应用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
sql复制代码SQL> select open_mode,database_role from v$database;

OPEN_MODE DATABASE_ROLE
-------------------- ----------------
READ ONLY PHYSICAL STANDBY

SQL> show pdbs

CON_ID CON_NAME OPEN MODE RESTRICTED
---------- ------------------------------ ---------- ----------
2 PDB$SEED READ ONLY NO
3 PDB01 READ ONLY NO


--开启日志应用
alter database recover managed standby database using current logfile disconnect;


SQL> select database_role,open_mode from v$database;

DATABASE_ROLE OPEN_MODE
---------------- --------------------
PHYSICAL STANDBY READ ONLY WITH APPLY

SQL> SELECT protection_mode FROM v$database;

PROTECTION_MODE
--------------------
MAXIMUM PERFORMANCE

6、测试同步情况

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
sql复制代码set line222
col member for a60

--查看是否存在RFS和MRP进程
select process,group#,thread#,sequence# from gv$managed_standby;
SQL>
--查看standby日志status是否存在active
select t1.group#,t1.thread#,t1.bytes/1024/1024,t1.status,t2.member from gv$standby_log t1,gv$logfile t2 where t1.group#=t2.group#;

PROCESS GROUP# THREAD# SEQUENCE#
--------- ---------------------------------------- ---------- ----------
DGRD N/A 0 0
DGRD N/A 0 0
ARCH N/A 0 0
ARCH N/A 0 0
ARCH N/A 0 0
ARCH N/A 0 0
RFS N/A 1 0
RFS 1 1 28
MRP0 N/A 1 28
DGRD N/A 0 0

10 rows selected.

SQL> SQL> SQL>
GROUP# THREAD# T1.BYTES/1024/1024 STATUS MEMBER
---------- ---------- ------------------ ---------- ------------------------------------------------------------
4 1 120 ACTIVE /oradata/ORCL_STBY/standby_redo04.log
5 1 120 UNASSIGNED /oradata/ORCL_STBY/standby_redo05.log
6 1 120 UNASSIGNED /oradata/ORCL_STBY/standby_redo06.log
7 1 120 UNASSIGNED /oradata/ORCL_STBY/standby_redo07.log

SQL> show pdbs

CON_ID CON_NAME OPEN MODE RESTRICTED
---------- ------------------------------ ---------- ----------
2 PDB$SEED READ ONLY NO
3 PDB01 READ ONLY NO

--主库插入数据
sqlplus test/test@pdb01
insert into test values (999);
commit;

--备库查询
SQL> alter session set container=pdb01;
SQL> select * from test.test;

ID
----------
1
2
999

--备库已同步

四、Database Switchover

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
bash复制代码--是否存在GAP
select thread#,low_sequence#,high_sequence# from v$archive_gap;

--主库确认可切换角色
select open_mode,database_role,protection_mode,protection_level,switchover_status from v$database;

OPEN_MODE DATABASE_ROLE PROTECTION_MODE PROTECTION_LEVEL SWITCHOVER_STATUS
-------------------- ---------------- -------------------- -------------------- --------------------
READ WRITE PRIMARY MAXIMUM PERFORMANCE MAXIMUM PERFORMANCE TO STANDBY

--备库确认可切换角色
select open_mode,database_role,protection_mode,protection_level,switchover_status from v$database;
OPEN_MODE DATABASE_ROLE PROTECTION_MODE PROTECTION_LEVEL SWITCHOVER_STATUS
-------------------- ---------------- -------------------- -------------------- --------------------
READ ONLY WITH APPLY PHYSICAL STANDBY MAXIMUM PERFORMANCE MAXIMUM PERFORMANCE NOT ALLOWED

NOTES:
A 如果switchover_status为TO_STANDBY说明可以直接转换
alter database commit to switchover to physical standby;

B 如果switchover_status为SESSIONS ACTIVE 则关闭会话
alter database commit to switchover to physical standby with session shutdown;

主库:

1
2
3
4
5
6
7
8
9
10
11
12
sql复制代码ALTER DATABASE COMMIT TO SWITCHOVER TO PHYSICAL STANDBY;
SHUTDOWN IMMEDIATE;
STARTUP NOMOUNT;
ALTER DATABASE MOUNT STANDBY DATABASE;
ALTER DATABASE OPEN READ ONLY;
ALTER DATABASE RECOVER MANAGED STANDBY DATABASE USING CURRENT LOGFILE DISCONNECT FROM SESSION;

SQL> select open_mode,database_role,protection_mode,protection_level,switchover_status from v$database;

OPEN_MODE DATABASE_ROLE PROTECTION_MODE PROTECTION_LEVEL SWITCHOVER_STATUS
-------------------- ---------------- -------------------- -------------------- --------------------
READ ONLY WITH APPLY PHYSICAL STANDBY MAXIMUM PERFORMANCE MAXIMUM PERFORMANCE TO PRIMARY

备库:

1
2
3
4
5
6
7
8
9
10
11
sql复制代码alter system set log_archive_dest_state_2=DEFER;
ALTER DATABASE COMMIT TO SWITCHOVER TO PRIMARY;
SHUTDOWN IMMEDIATE;
STARTUP;
alter system set log_archive_dest_state_2=ENABLE;

SQL> select open_mode,database_role,protection_mode,protection_level,switchover_status from v$database;

OPEN_MODE DATABASE_ROLE PROTECTION_MODE PROTECTION_LEVEL SWITCHOVER_STATUS
-------------------- ---------------- -------------------- -------------------- --------------------
READ WRITE PRIMARY MAXIMUM PERFORMANCE MAXIMUM PERFORMANCE TO STANDBY

五、Failover

Failover后,原主库将从DG配置中删除,如果原主库启用了Flashback,则在修复故障后,故障的数据库可恢复为新的standby数据库。

1
2
sql复制代码ALTER DATABASE RECOVER MANAGED STANDBY DATABASE FINISH;
ALTER DATABASE ACTIVATE STANDBY DATABASE;

测试Failover

1、主库开启闪回,防止Failover后无法再次切回备库

1
2
3
4
5
sql复制代码alter database flashback on;
alter system set db_recovery_file_dest='/u01/app/oracle/fast_recovery_area' scope=spfile;
alter system set db_recovery_file_dest_size=5G scope=spfile;
shutdown immediate
startup

2、备库进行Failover

1
2
3
4
5
6
7
8
9
10
sql复制代码ALTER DATABASE RECOVER MANAGED STANDBY DATABASE FINISH;
ALTER DATABASE ACTIVATE STANDBY DATABASE;
ALTER DATABASE OPEN;

--切换成功
select database_role,open_mode from v$database;

DATABASE_ROLE OPEN_MODE
---------------- --------------------
PRIMARY READ WRITE

3、将原主库闪回,切换回备库

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sql复制代码--查询新主库的scn号
SQL> select to_char(standby_became_primary_scn) from v$database;

TO_CHAR(STANDBY_BECAME_PRIMARY_SCN)
----------------------------------------
1022650

--原主库闪回到scn 1022650
SQL> flashback database to scn 1022650;

Flashback complete.

--切换到physical standby
SQL> alter database convert to physical standby;

Database altered.

shutdown immediate
startup

--开启日志应用
alter database recover managed standby database using current logfile disconnect from session;

--原主库已恢复为备库
SQL> select open_mode,database_role from v$database;

OPEN_MODE DATABASE_ROLE
-------------------- ----------------
READ ONLY WITH APPLY PHYSICAL STANDBY

SQL> select flashback_on from v$database;

FLASHBACK_ON
------------------
YES

六、Snapshot Standby

Snapshot standby database是ORACLE 11g的新特性。允许Physical standby短时间的使用read write模式。

1、切换为Snapshot Standby

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
sql复制代码--记录表test状态
SQL> select * from test.test;

ID
----------
1
2
3

--关闭备库
SHUTDOWN IMMEDIATE;
STARTUP MOUNT;

--取消日志应用
ALTER DATABASE RECOVER MANAGED STANDBY DATABASE CANCEL;

--查看闪回状态
SELECT flashback_on FROM v$database;
FLASHBACK_ON
------------------
NO

--切换为snapshot standby
ALTER DATABASE CONVERT TO SNAPSHOT STANDBY;

--打开数据库到读写状态
ALTER DATABASE OPEN;

SELECT flashback_on FROM v$database;
FLASHBACK_ON
------------------
RESTORE POINT ONLY

--You can now do treat the standby like any read-write database.
--主库插入数据,测试之后切换回physical standby是否能同步
insert into test.test values (9999);
commit;

--备库尝试插入数据,drop表
SQL> insert into test.test values (777);

1 row created.

SQL> commit;

Commit complete.

SQL> select * from test.test;

ID
----------
1
2
3
777

SQL> drop table test.test;

Table dropped.

2、切换回PHYSICAL STANDBY

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
sql复制代码--关闭数据库
SHUTDOWN IMMEDIATE;

--开启到mount
STARTUP MOUNT;

--切换回PHYSICAL STANDBY
ALTER DATABASE CONVERT TO PHYSICAL STANDBY;

--关闭数据库
SHUTDOWN IMMEDIATE;

--开启数据库,并开启到read only
STARTUP NOMOUNT;
ALTER DATABASE MOUNT STANDBY DATABASE;
ALTER DATABASE OPEN READ ONLY;

--开启日志应用
ALTER DATABASE RECOVER MANAGED STANDBY DATABASE USING CURRENT LOGFILE DISCONNECT FROM SESSION;

SELECT flashback_on FROM v$database;

FLASHBACK_ON
------------------
NO

--已恢复到PHYSICAL STANDBY,查看表是否正常,期间主库的操作是否同步
SQL> select * from test.test;

ID
----------
1
2
3
9999

**注意:**一旦snapshot standby被激活的时间超出了primary 的最大负载时间,再次的本地更新操作将会产生额外的异常。

七、开启FLASHBACK(备库)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
sql复制代码--取消日志应用
ALTER DATABASE RECOVER MANAGED STANDBY DATABASE CANCEL;

--开启闪回
ALTER DATABASE FLASHBACK ON;

--配置闪回参数
alter system set db_recovery_file_dest='/u01/app/oracle/fast_recovery_area' scope=spfile;
alter system set db_recovery_file_dest_size=5G scope=spfile;

--开启数据库到read only
shutdown immediate
startup mount
alter database open read only;

--开启日志应用
ALTER DATABASE RECOVER MANAGED STANDBY DATABASE USING CURRENT LOGFILE DISCONNECT FROM SESSION;

参考官方文档12c:Using DBCA to Create a Data Guard Standby 12C

参考官方文档19c:Using DBCA to Create a Data Guard Standby 19C

参考官方文档21c:Using DBCA to Create a Data Guard Standby 21C

参考MOS文档:Creating a Physical Standby database using DBCA duplicate (Doc ID 2283697.1)

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

MySQL调优指南(持续更新)

发表于 2021-11-19

这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战

MySQL数据库优化通常有如下四个维度:硬件、系统配置、数据库表结构、SQL及索引。

image.png

  • 从优化成本来看:硬件>系统配置>数据库表结构>SQL及索引。
  • 从优化效果来看:硬件<系统配置<数据库表结构<SQL及索引。

要想对MySQL进行优化,我们需要知道MySQL各组件之间如何协同工作以及MySQL是如何优化和执行查询的。

MySQL逻辑架构

image.png

MySQL逻辑架构整体分为三层,最上层为客户端层,并非MySQL所独有,诸如:连接处理、授权认证、安全等功能均在这一层处理。

MySQL大多数核心服务均在中间这一层,包括查询解析、分析、优化、缓存、内置函数(比如:时间、数学、加密等函数)。所有的跨存储引擎的功能也在这一层实现:存储过程、触发器、视图等。

最下层为存储引擎,其负责MySQL中的数据存储和提取。和Linux下的文件系统类似,每种存储引擎都有其优势和劣势。中间的服务层通过API与存储引擎通信,这些API接口屏蔽了不同存储引擎间的差异。

image.png

MySQL查询过程

img

  1. 客户端发送一条查询给服务器。
  2. 服务器先检查查询缓存,如果命中了缓存,则立刻返回存储在缓存中的结果.否则,进入下一个阶段。
  3. 服务器进行SQL解析.预处理,再由优化器生成对应的执行计划。
  4. MySQL根据优化器生成的执行计划,调用存储引擎的API来执行查询。
  5. 将结果返回给客户端。

关于查询缓存的说明:

在解析一个查询语句前,如果缓存是打开的,那么MySQL会优先检查这个查询是否命中查询缓存中的数据。

如果没有命中,则进入下一阶段的处理。

如果命中查询缓存,则会检查用户的权限。

如果权限没有问题,MySQL会跳过其他阶段,直接拿数据返回给客户端。

下面我们将从硬件、系统配置、表设计、sql语句等方面来优化MySQL。

硬件层面优化

  • CPU:选择多核的CPU,主频高的CPU。
  • 内存:选择更大的内存。
  • 磁盘:选择更快的转速、RAID、阵列卡,或者在条件允许的情况下,使用SSD。
  • 网络环境:尽量部署在局域网,使用光缆、千/万兆网等提供网络,通过双网线提供冗余、使用多端口绑定监听。

系统配置优化

操作系统配置优化

  • 使用64位操作系统,更好的使用大内存。
  • 优化内核参数。
  • 加大文件描述符限制。
  • 文件系统选择:XFS,JFS,EXT3/EXT4(文件系统的选择对确保数据的安全性很重要)。

Mysql软件优化

  • 开启mysql复制,实现读写分离、负载均衡,将读的负载分摊到多个从服务器上,提高服务器的处理能力。
  • 使用推荐的GA版本(正式发布的版本),提升性能。
  • 利用分区新功能进行大数据的数据拆分。

Mysql配置优化

注:全局参数一经设置,随服务器启动预占用资源。

  • wait_time_out参数:线程连接的超时时间,尽量不要设置很大,推荐10s。
  • thread_concurrency参数:线程并发利用数量( 在5.7.2版本的mysql中被移除)。在InnoDB中,我们可以通过设置参数innodb_thread_concurrency参数限制线程的数量。

innodb_thread_concurrency的使用建议

在官方文档上,对于innodb_thread_concurrency的使用,也给出了一些建议,如下:

如果一个工作负载中,并发用户线程的数量小于64,建议设置innodb_thread_concurrency=0;

如果工作负载一直较为严重甚至偶尔达到顶峰,建议先设置innodb_thread_concurrency=128,并通过不断的降低这个参数,96, 80, 64等等,直到发现能够提供最佳性能的线程数,例如,假设系统通常有40到50个用户,但定期的数量增加至60,70,甚至200。你会发现,性能在80个并发用户设置时表现稳定,如果高于这个数,性能反而下降。在这种情况下,建议设置innodb_thread_concurrency参数为80,以避免影响性能。

  • read_buffer_size参数:全表扫描时为查询预留的缓冲大小,根据select_scan判断。
  • tmp_table_size参数:临时内存表的设置,如果超过设置就会转化成磁盘表, 根据参数(created_tmp_disk_tables)判断。

Mysql表设计优化

存储引擎的选择

  • Myisam:适合并发量不大,读多写少,而且都能很好的用到索引,且sql语句比较简单的应用,比如,数据仓库。
  • Innodb:适合并发访问大,写操作比较多,有外键、事务等需求的应用,系统内存较大。

命名规则

  • 采用多数开发语言命名规则,比如MyAdress(驼峰原则)
  • 采用多数开源思想命名规则,比如my_address,通常采用下划线这种命名规则。
  • 避免随便命名,最好能够见名知意。

字段类型选择

根据需求选择合适的字段类型,在满足需求的情况下字段类型尽可能小。只分配满足需求的最小字符数,不要太慷慨。

原因:更小的字段类型和更小的字符数将占用更少的内存,占用更少的磁盘空间,占用更少的磁盘IO,以及占用更少的带宽。

编码选择

  • 单字节-latin1
  • 多字节-utf8(汉字占3个字节,英文字母占用一个字节)
  • 如果含有中文字符的话最好都统一采用utf8类型,避免乱码的情况发生。

主键选择

注:这里说的主键设计主要是针对INNODB引擎。

  • 能唯一的表示行。
  • 显式的定义一个数值类型自增字段的主键,这个字段可以仅用于做主键,不做其他用途。
  • MySQL主键应该是单列的,以便提高连接和筛选操作的效率。
  • 主键字段类型尽可能小,能用SMALLINT就不用INT,能用INT就不用BIGINT。
  • 尽量保证不对主键字段进行更新修改,防止主键字段发生变化,引发数据存储碎片,降低IO性能。
  • MySQL主键不应包含动态变化的数据,如时间戳、创建时间列、修改时间列等。
  • MySQL主键应当由自动生成。
  • 主键字段放在数据表的第一顺序。

通常,我们推荐采用数值类型做主键并采用auto_increment属性让其自动增长。

Mysql语句层面优化

  • 性能差的读语句,在innodb中统计行数,建议另外弄一张统计表,采用myisam,定期做统计。一般对统计的数据不会要求太精准的情况下适用。
  • 尽量不要在数据库中做运算。
  • 避免 负向查询 和 %前缀模糊查询。
  • 不在索引列做运算或者使用函数。
  • 不要在生产环境程序中使用select * from的形式查询数据。只查询需要使用的列。
  • 查询时,尽可能使用limit减少返回的行数,减少数据传输时间和带宽浪费。
  • where子句尽可能避免对查询列使用函数,因为对查询列使用函数用不到索引。
  • 避免隐式类型转换,例如字符型一定要用’’,数字型一定不要使用’’。
  • 所有的SQL关键词用大写,养成良好的习惯,避免SQL语句重复编译造成系统资源的浪费。
  • 联表查询的时候,记得把小结果集放在前面,遵循小结果集驱动大结果集的原则。
  • 开启慢查询,定期用explain优化慢查询中的SQL语句。
  • 拆分大的delete或insert语句。

总结

从上面看出,MYSQL主要从以下几方面进行优化:

  • 表设计:合理的存储引擎,字段类型,范式与逆范式
  • 功能:合适的索引,缓存,分区分表。
  • 架构:采用主从复制,读写分离,负载均衡。
  • 合理SQL:测试及对比同一功能不同sql的查询效率,根据过往的经验编写高效的sql。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

springboot多数据源配置实例

发表于 2021-11-19

多数据源配置, 大多数情况下是用在读写库或者主从库之间的自动切换. 但是在有些情况下, 业务上也需要切换数据源.

而现在主流的多数据源方案, 无论是使用现有的框架还是自定义, 使用的核心原理都是类似的. 这个实例, 就是模拟业务上多数据源的使用场景, 自定义切换规则, 以实现多数据源的切换.

实例的实现逻辑是:

在有注解标注的地方, 按照优先使用注解标注的数据源;

如果注解没有指定数据源, 将解析URL来自动匹配数据源;

如果没有使用注解, 将使用默认数据源

使用的原理是线程内部维护数据源变量, 当请求进来时, 利用切面, 指定数据源, 并在线程内部传递

因为使用的变量都是维护在线程内部, 故而当有线程切换时, 会丢失数据源, 此时需要手动传递数据源, 或者依据对应技术使用可行的技术实现动态的切换, 但是原理是一致的

实例里的多处核心逻辑都是可以扩展或者自定义的, 以满足不同业务的需求.

使用的框架和技术:

springboot mybatis mysql

ThreadLocal AOP DynamicDataSource

代码展示

先展示一下实例的文件结构:

image.png

下面对主要的几个文件进行说明:

application.yml

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
yml复制代码# 应用名称
spring:
profiles:
# 引入application-jdbc.properties配置文件
active: jdbc
application:
name: database
# 数据库驱动
datasource:
db1:
driver-class-name: com.mysql.cj.jdbc.Driver
username: ${db1.username}
password: ${db1.password}
jdbc-url: ${db1.url}
db2:
driver-class-name: com.mysql.cj.jdbc.Driver
username: ${db2.username}
password: ${db2.password}
jdbc-url: ${db2.url}

server:
port: 80
servlet:
context-path: /db
# 指定Mybatis的Mapper文件
mybatis:
mapper-locations: classpath:mappers/*xml
type-aliases-package: jin.panpan.database.entity

#日志配置
logging:
config: classpath:log4j2.xml

配置文件, 除了基本的应用名称、端口、路径、日志等配置以外, 主要看数据库驱动的配置, 这里我们不像平时的单数据源的配置, 我们需要配置多个数据源, 上面示例的配置文件里, 由两个注意点:

  • db1和db2是我们自定义的数据源名称, 这个我们在后面的MybatisConfig文件里会用到
  • 数据源里的配置jdbc-url对应单数据源配置里的url, 这里使用url会注入失败
  • ${db1.username}等是导入的application-jdbc.properties文件里的配置值

application-jdbc.properties

1
2
3
4
5
6
7
properties复制代码db1.username = user
db1.password =
db1.url = jdbc:mysql://xx.xx.xx.xx:3306/db1

db2.username = user
db2.password =
db2.url = jdbc:mysql://xx.xx.xx.xx:3306/db2

key-value结构, key可以在引入此文件的配置里使用${key}引用

这里我们使用两个数据库, 但是数据库的表结构一致(至少多数据源涉及到的表需要结构一致), 以避免不能共用一套代码

DataSourceType

1
2
3
4
5
6
7
8
java复制代码public enum DataSourceType {
//NONE用来返回默认
NONE(""),
DB1("db1"),
DB2("db2"),
;
//省略部分代码
}

此文件是数据源的枚举, 这里的枚举我们只配置里一个参数, 可以依据业务扩展. 本枚举的值代表数据源的名称, 是和配置文件里的配置对应上的

另外说明一下, NONE是用来代表默认数据源的, 是为了方便编程添加的, 不是必须的

DataSourceUtil

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
java复制代码public class DataSourceUtil {

private static final ThreadLocal<DataSourceType> localDataSource = new ThreadLocal<>();

private DataSourceUtil(){
}

public static DataSourceType get() {
return localDataSource.get();
}

public static void set(DataSourceType type){
localDataSource.set(type);
}

public static void remove() {
localDataSource.remove();
}

}

数据源切换工具类, 这里的核心是ThreadLocal<DataSourceType>变量, ThreadLocal以前有过文档分析, 主要是用来维护线程内部变量的, 其中:

  • get()方法用来获取数据源
  • set()方法用来设置数据源
  • remove()用来清除数据源

MybatisConfig*

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
Java复制代码@Configuration
@MapperScan("jin.panpan.database.dao")
public class MybatisConfig {
//默认数据源
@Primary
@Bean("db1")
@ConfigurationProperties(prefix = "spring.datasource.db1")
public DataSource dataSource1(){
return DataSourceBuilder.create().build();
}
@Bean("db2")
@ConfigurationProperties(prefix = "spring.datasource.db2")
public DataSource dataSource2(){
return DataSourceBuilder.create().build();
}
//动态数据源选择
@Bean
public DynamicDataSource dynamicDataSource(@Qualifier("db1") DataSource db1,
@Qualifier("db2") DataSource db2){
Map<Object, Object> map = new HashMap<>();
//此处的key 要和DynamicDataSource类的determineCurrentLookupKey方法返回值一致
map.put(DataSourceType.DB1.getDatabase(), db1);
map.put(DataSourceType.DB2.getDatabase(), db2);

DynamicDataSource dynamicDataSource = new DynamicDataSource();
dynamicDataSource.setTargetDataSources(map);
//设置默认数据库, 选择的数据库Bean注入时要带@Primary注释
dynamicDataSource.setDefaultTargetDataSource(db1);

return dynamicDataSource;
}
//会话工厂配置
@Bean
public SqlSessionFactory sqlSessionFactory(DynamicDataSource dynamicDataSource) throws Exception {
SqlSessionFactoryBean factoryBean = new SqlSessionFactoryBean();
factoryBean.setDataSource(dynamicDataSource);
Resource[] resources = new PathMatchingResourcePatternResolver()
.getResources("classpath:mappers/*xml");
factoryBean.setMapperLocations(resources);
return factoryBean.getObject();
}
//事务管理配置
@Bean
public PlatformTransactionManager transactionManager(DynamicDataSource dynamicDataSource){
return new DataSourceTransactionManager(dynamicDataSource);
}

}

本类主要用来配置和注入mybatis相关配置bean, 主要有下面几步:

  • 自动扫描mapper接口配置

@MapperScan(“jin.panpan.database.dao”)

一般常规配置, 如果不配置多数据源, 一般是在入口类上注解

  • 数据源注入

@ConfigurationProperties 导入配置文件里的配置

@Bean 指定数据源名称

@Primary 标记默认数据源

  • 数据源动态选择Bean注入

加载多个数据源, 如果需要的话, 指定默认数据源

需要注意数据源容器Map的key要和DataSourceType里的字段值一致, 否则无法切换

  • 会话工厂Bean配置

单数据源时, 一般无需手动配置; 多数据源时, 需要手动指定会话工厂, 但是配置是模式化的

  • 事务管理Bean配置

事务管理的配置也是模式化的, 一般无特殊处理之处

DynamicDataSource*

1
2
3
4
5
6
7
8
9
10
Java复制代码public class DynamicDataSource extends AbstractRoutingDataSource {
private static final Logger logger = LoggerFactory.getLogger(DynamicDataSource.class);

@Override
protected Object determineCurrentLookupKey() {
String database = DataSourceUtil.get() == null ? null : DataSourceUtil.get().getDatabase();
logger.info("DynamicDataSource, 动态数据源返回={}", database);
return database;
}
}

此类其实是动态路由数据源的核心类, 扩展AbstractRoutingDataSource类, 重写determineCurrentLookupKey方法, determineCurrentLookupKey方法的返回值就是动态数据源, 返回值需要和配置文件及MybatisConfig里的一一对应.

DataSource和DataSourceAspect*

1
2
3
4
5
6
7
8
Java复制代码@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@Documented
public @interface DataSource {

DataSourceType value() default DataSourceType.NONE;

}

DataSource是一个方法注解, 用来作为数据源切换切入点, 此处可以使用类注解或者直接正则织入想监控的点

DataSourceAspect是注解的实现, 是核心实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
Java复制代码@Aspect
@Component
public class DataSourceAspect {
private static final Logger logger = LoggerFactory.getLogger(DataSourceAspect.class);

@Pointcut("@annotation(jin.panpan.database.dynamic.datasource.annotate.DataSource)")
public void pointCut() {
}

@SneakyThrows
@Around("pointCut()")
public Object around(ProceedingJoinPoint pjp) {
//数据源
DataSourceType dataSourceType = null;

//方法对象
MethodSignature signature = (MethodSignature) pjp.getSignature();
Method method = signature.getMethod();
//注释
DataSource dynamicDataSource = method.getAnnotation(DataSource.class);
String path = null;
//注释中指定数据源, 则使用数据库中的数据源
if(dynamicDataSource.value()!=null && dynamicDataSource.value() != DataSourceType.NONE){
dataSourceType = dynamicDataSource.value();
}

//否则从请求路径中获取
if(dataSourceType == null){
HttpServletRequest request = ((ServletRequestAttributes) Objects.requireNonNull(RequestContextHolder.getRequestAttributes())).getRequest();
path = request.getRequestURL().toString();
//TODO 待优化 需截取域名进行检查
if(path.contains(DataSourceType.DB1.getDatabase())){
dataSourceType = DataSourceType.DB1;
}
if(path.contains(DataSourceType.DB2.getDatabase())){
dataSourceType = DataSourceType.DB2;
}
}

logger.info("DataSource, 方法={}, 数据源={}, 路径={}, 注释自带数据源={}",
method.getName(), dataSourceType==null ? null : dataSourceType.getDatabase(),
path, dynamicDataSource.value());

if(dataSourceType == null){
throw new Exception("非法数据源");
}

//指定数据库
DataSourceUtil.set(dataSourceType);

Object result = pjp.proceed();

//清除数据库
logger.info("DataSource, 清除数据源, 方法={}, 数据源={}, 路径={}, 注释自带数据源={}",
method.getName(), dataSourceType.getDatabase(),
path, dynamicDataSource.value());
DataSourceUtil.remove();

return result;
}

}

重要的步骤已经加了注解, 完成的工作是:

  • 获取数据源 从注解/从路径
  • 指定数据源
  • 调用结束后清除数据源

核心步骤是通过DataSourceUtil切换数据源

此方法是实现切换逻辑的核心方法, 具体的切换逻辑, 应该由业务逻辑决定, 这里只是示范

逻辑验证

这里说明一下测试数据:

两个数据库: db1 db2

db1中有一条数据 id=1

db2中没有数据

核心代码也有日志, 可以依据日志观察数据源

为了区分路径, 在hosts里加了如下配置, 以区分不同环境路径:

1
2
3
复制代码127.0.0.1       db1.db.com
127.0.0.1       db2.db.com
127.0.0.1       db.com

不指定数据源, 也不启用数据源注解

1
2
3
4
5
6
7
8
9
10
11
Java复制代码@GetMapping("queryById/{id}")
public Result<BasTableEntity> selectById(@PathVariable("id") Long id){
BasTableEntity entity;
try {
entity = basTableService.queryById(id);
}catch (Exception e){
logger.error("查询异常, 错误={}", e.getMessage(), e);
return Result.fail(null, "查询异常");
}
return Result.success(entity);
}

按预期使用默认数据源db1,可以查询到一条数据

image.png

启用注解, 不指定数据源

1
2
3
4
5
6
7
8
9
10
11
12
Java复制代码@DataSource
@GetMapping("queryByIdj/{id}")
public Result<BasTableEntity> selectByIdj(@PathVariable("id") Long id){
BasTableEntity entity;
try {
entity = basTableService.queryById(id);
}catch (Exception e){
logger.error("查询异常, 错误={}", e.getMessage(), e);
return Result.fail(null, "查询异常");
}
return Result.success(entity);
}

按预期将启用地址匹配数据源

image.png

image.png

启用注解, 指定数据源

1
2
3
4
5
6
7
8
9
10
11
12
Java复制代码@DataSource(DataSourceType.DB2)
@GetMapping("queryByIdp/{id}")
public Result<BasTableEntity> selectByIdp(@PathVariable("id") Long id){
BasTableEntity entity;
try {
entity = basTableService.queryById(id);
}catch (Exception e){
logger.error("查询异常, 错误={}", e.getMessage(), e);
return Result.fail(null, "查询异常");
}
return Result.success(entity);
}

按预期将使用指定的数据源2

image.png

测试在线程间切换时, 导致的数据源丢失

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
Java复制代码@SneakyThrows
@DataSource(DataSourceType.DB2)
@GetMapping("queryByIda/{id}")
public Result<BasTableEntity> selectByIda(@PathVariable("id") Long id){
logger.info("1.basTable={}", DataSourceUtil.get());
//线程池, 模拟要在多线程环境下进行的业务动作
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
Future<BasTableEntity> future;
BasTableEntity entity;
try {
future = newCachedThreadPool.submit(() -> {
logger.info("2.basTable={}", DataSourceUtil.get());
return basTableService.queryById(id);
});
}catch (Exception e){
logger.error("查询异常, 错误={}", e.getMessage(), e);
return Result.fail(null, "查询异常");
}
logger.info("3.basTable={}", DataSourceUtil.get());
entity = future.get();
newCachedThreadPool.shutdown();
return Result.success(entity);
}

image.png

可以看出, 虽然指定了db2, 但是还是查询出了数据, 说明走了默认数据源, 有日志也可以看出

image.png
在线程里(2.basTable=)数据源丢失了, 因为线程已经不是原先的线程了

在线程间切换时, 导致的数据源丢失问题解决

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
Java复制代码@SneakyThrows
@DataSource(DataSourceType.DB2)
@GetMapping("queryByIdt/{id}")
public Result<BasTableEntity> selectByIdt(@PathVariable("id") Long id){
//获取数据源
DataSourceType type = DataSourceUtil.get();//****核心代码1****
logger.info("1.basTable={}", type);
//线程池, 模拟要在多线程环境下进行的业务动作
ExecutorService newCachedThreadPool = Executors.newCachedThreadPool();
Future<BasTableEntity> future;
BasTableEntity entity;
try {
future = newCachedThreadPool.submit(() -> {
//进入新线程, 传递数据源
DataSourceUtil.set(type);//****核心代码2****
logger.info("2.basTable={}", DataSourceUtil.get());
return basTableService.queryById(id);
});
}catch (Exception e){
logger.error("查询异常, 错误={}", e.getMessage(), e);
return Result.fail(null, "查询异常");
}
logger.info("3.basTable={}", DataSourceUtil.get());
entity = future.get();
newCachedThreadPool.shutdown();
return Result.success(entity);
}

image.png
看日志:

image.png

数据源传递成功

在其他的数据源可能丢失的情况, 都可以使用此方法实现


资源清单:
database-dev.zip

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

大数据Hadoop-生产调优学习之旅第二篇 一、HDFS 多

发表于 2021-11-19

「这是我参与11月更文挑战的第19天,活动详情查看:2021最后一次更文挑战」。

一、HDFS 多目录

1、NameNode 多目录配置

  • NameNode的本地目录可以配置成多个,且每个目录存放内容相同,增加了可靠性

image.png

  • 具体配置如下
+ 在hdfs-site.xml文件中添加如下内容。



1
2
3
4
xml复制代码<property>
<name>dfs.namenode.name.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/name1,file://${hadoop.tmp.dir}/dfs/name2</value>
</property>
注意:因为每台服务器节点的磁盘情况不同,所以这个配置配完之后,可以选择不分发 + 停止集群,删除三台节点的data和logs中所有数据。
1
2
3
shell复制代码[moe@hadoop102 hadoop-3.1.3]$ rm -rf data/ logs/
[moe@hadoop103 hadoop-3.1.3]$ rm -rf data/ logs/
[moe@hadoop104 hadoop-3.1.3]$ rm -rf data/ logs/
+ 格式化集群并启动。
1
2
shell复制代码[moe@hadoop102 hadoop-3.1.3]$ bin/hdfs namenode -format
[moe@hadoop102 hadoop-3.1.3]$ sbin/start-dfs.sh
  • 查看结果
1
2
3
4
5
shell复制代码[moe@hadoop102 dfs]$ ll
总用量 12
drwx------. 3 moe moe 4096 12月 11 08:03 data
drwxrwxr-x. 3 moe moe 4096 12月 11 08:03 name1
drwxrwxr-x. 3 moe moe 4096 12月 11 08:03 name2

检查name1和name2里面的内容,发现一模一样。

2、DataNode 多目录配置

  • DataNode可以配置成多个目录,每个目录存储的数据不一样(数据不是副本)

image.png

  • 具体配置如下

在hdfs-site.xml文件中添加如下内容

1
2
3
4
xml复制代码<property>
<name>dfs.datanode.data.dir</name>
<value>file://${hadoop.tmp.dir}/dfs/data1,file://${hadoop.tmp.dir}/dfs/data2</value>
</property>
  • 查看结果
1
2
3
4
5
6
shell复制代码[moe@hadoop102 dfs]$ ll
总用量 12
drwx------. 3 moe moe 4096 4月 4 14:22 data1
drwx------. 3 moe moe 4096 4月 4 14:22 data2
drwxrwxr-x. 3 moe moe 4096 12月 11 08:03 name1
drwxrwxr-x. 3 moe moe 4096 12月 11 08:03 name2
  • 向集群上传一个文件,再次观察两个文件夹里面的内容发现不一致(一个有数一个没有)
1
shell复制代码[moe@hadoop102 hadoop-3.1.3]$ hadoop fs -put wcinput/word.txt /

3、集群数据均衡之磁盘间数据均衡

生产环境,由于硬盘空间不足,往往需要增加一块硬盘。刚加载的硬盘没有数据时,可以执行磁盘数据均衡命令。(Hadoop3.x新特性)

image.png

  • 生成均衡计划(我们只有一块磁盘,不会生成计划)
1
shell复制代码hdfs diskbalancer -plan hadoop103
  • 执行均衡计划
1
shell复制代码hdfs diskbalancer -execute hadoop103.plan.json
  • 查看当前均衡任务的执行情况
1
shell复制代码hdfs diskbalancer -query hadoop103
  • 取消均衡任务
1
shell复制代码hdfs diskbalancer -cancel hadoop103.plan.json

二、HDFS 集群扩容及缩容

1、添加白名单

白名单:表示在白名单的主机IP地址可以,用来存储数据。

企业中:配置白名单,可以尽量防止黑客恶意访问攻击。

image.png

配置白名单步骤如下:

  • 在NameNode节点的/opt/module/hadoop-3.1.3/etc/hadoop目录下分别创建whitelist 和blacklist文件
+ 创建白名单



1
txt复制代码[moe@hadoop102 hadoop]$ vim whitelist
在whitelist中添加如下主机名称,假如集群正常工作的节点为102 103
1
2
txt复制代码hadoop102
hadoop103
+ 创建黑名单
1
shell复制代码[moe@hadoop102 hadoop]$ touch blacklist
保持空的就可以
  • 在hdfs-site.xml配置文件中增加dfs.hosts配置参数
1
2
3
4
5
6
7
8
9
10
11
xml复制代码<!-- 白名单 -->
<property>
<name>dfs.hosts</name>
<value>/opt/module/hadoop-3.1.3/etc/hadoop/whitelist</value>
</property>

<!-- 黑名单 -->
<property>
<name>dfs.hosts.exclude</name>
<value>/opt/module/hadoop-3.1.3/etc/hadoop/blacklist</value>
</property>
  • 分发配置文件whitelist,hdfs-site.xml
1
shell复制代码[moe@hadoop102 hadoop]$ xsync hdfs-site.xml whitelist
  • 第一次添加白名单必须重启集群,不是第一次,只需要刷新NameNode节点即可
1
2
shell复制代码[moe@hadoop102 hadoop-3.1.3]$ myhadoop.sh stop
[moe@hadoop102 hadoop-3.1.3]$ myhadoop.sh start
  • 在web浏览器上查看DN,http://hadoop102:9870/dfshealth.html#tab-datanode

image.png

  • 在hadoop104上执行上传数据数据失败
1
shell复制代码[moe@hadoop104 hadoop-3.1.3]$ hadoop fs -put NOTICE.txt /
  • 二次修改白名单,增加hadoop104
1
2
3
4
5
shell复制代码[moe@hadoop102 hadoop]$ vim whitelist
修改为如下内容
hadoop102
hadoop103
hadoop104
  • 刷新NameNode
1
2
shell复制代码[moe@hadoop102 hadoop-3.1.3]$ hdfs dfsadmin -refreshNodes
Refresh nodes successful
  • 在web浏览器上查看DN,http://hadoop102:9870/dfshealth.html#tab-datanode

image.png

2、服役新服务器

  • 需求

随着公司业务的增长,数据量越来越大,原有的数据节点的容量已经不能满足存储数据的需求,需要在原有集群基础上动态添加新的数据节点。

  • 环境准备
+ 在hadoop100主机上再克隆一台hadoop105主机
+ 修改IP地址和主机名称
+ 拷贝hadoop102的/opt/module目录和/etc/profile.d/my\_env.sh到hadoop105
+ 删除hadoop105上Hadoop的历史数据,data和log数据
+ 配置hadoop102和hadoop103到hadoop105的ssh无密登录
  • 服役新节点具体步骤

直接启动DataNode,即可关联到集群

1
2
shell复制代码[moe@hadoop105 hadoop-3.1.3]$ hdfs --daemon start datanode
[moe@hadoop105 hadoop-3.1.3]$ yarn --daemon start nodemanager

image.png

  • 在白名单中增加新服役的服务器
+ 在白名单whitelist中增加hadoop105,并重启集群



1
2
3
4
5
6
txt复制代码[moe@hadoop102 hadoop]$ vim whitelist
修改为如下内容
hadoop102
hadoop103
hadoop104
hadoop105
+ 分发
1
shell复制代码[moe@hadoop102 hadoop]$ xsync whitelist
+ 刷新NameNode
1
2
shell复制代码[moe@hadoop102 hadoop-3.1.3]$ hdfs dfsadmin -refreshNodes
Refresh nodes successful
  • 在hadoop105上上传文件
1
shell复制代码[moe@hadoop105 hadoop-3.1.3]$ hadoop fs -put /opt/module/hadoop-3.1.3/LICENSE.txt /

image.png

3、服务器间数据均衡

  • 企业经验

在企业开发中,如果经常在hadoop102和hadoop104上提交任务,且副本数为2,由于数据本地性原则,就会导致hadoop102和hadoop104数据过多,hadoop103存储的数据量小。

另一种情况,就是新服役的服务器数据量比较少,需要执行集群均衡命令。

image.png

  • 开启数据均衡命令
1
shell复制代码[moe@hadoop105 hadoop-3.1.3]$ sbin/start-balancer.sh -threshold 10

对于参数10,代表的是集群中各个节点的磁盘空间利用率相差不超过10%,可根据实际情况进行调整。

  • 停止数据均衡命令
1
shell复制代码[moe@hadoop105 hadoop-3.1.3]$ sbin/stop-balancer.sh

注意:由于HDFS需要启动单独的Rebalance Server来执行Rebalance操作,所以尽量不要在NameNode上执行start-balancer.sh,而是找一台比较空闲的机器。

4、黑名单退役服务器

黑名单:表示在黑名单的主机IP地址不可以。

企业中:配置黑名单,用来退役服务器。

image.png

黑名单配置步骤如下:

  • 编辑/opt/module/hadoop-3.1.3/etc/hadoop目录下的blacklist文件
1
txt复制代码[moe@hadoop102 hadoop] vim blacklist

添加如下主机名称(要退役的节点)

1
txt复制代码hadoop105

注意:如果白名单中没有配置,需要在hdfs-site.xml配置文件中增加dfs.hosts 配置参数

1
2
3
4
5
xml复制代码<!-- 黑名单 -->
<property>
<name>dfs.hosts.exclude</name>
<value>/opt/module/hadoop-3.1.3/etc/hadoop/blacklist</value>
</property>
  • 分发配置文件blacklist,hdfs-site.xml
1
shell复制代码[moe@hadoop102 hadoop]$ xsync hdfs-site.xml blacklist
  • 第一次添加黑名单必须重启集群,不是第一次,只需要刷新NameNode节点即可
1
2
shell复制代码[moe@hadoop102 hadoop-3.1.3]$ hdfs dfsadmin -refreshNodes
Refresh nodes successful
  • 检查Web浏览器,退役节点的状态为decommission in progress(退役中),说明数据节点正在复制块到其他节点

image.png

  • 等待退役节点状态为decommissioned(所有块已经复制完成),停止该节点及节点资源管理器。注意:如果副本数是3,服役的节点小于等于3,是不能退役成功的,需要修改副本数后才能退役

image.png

1
shell复制代码[moe@hadoop105 hadoop-3.1.3]$ hdfs --daemon stop datanode

stopping datanode

1
shell复制代码[moe@hadoop105 hadoop-3.1.3]$ yarn --daemon stop nodemanager

stopping nodemanager

  • 如果数据不均衡,可以用命令实现集群的再平衡
1
shell复制代码[moe@hadoop102 hadoop-3.1.3]$ sbin/start-balancer.sh -threshold 10

三、友情链接

大数据Hadoop-生产调优学习之旅第一篇

大数据Hadoop-Yarn学习之旅第二篇

大数据Hadoop-Yarn学习之旅第一篇

大数据Hadoop-MapReduce学习之旅第六篇

大数据Hadoop-MapReduce学习之旅第五篇

大数据Hadoop-MapReduce学习之旅第四篇

大数据Hadoop-MapReduce学习之旅第三篇

大数据Hadoop-MapReduce学习之旅第二篇

大数据Hadoop-MapReduce学习之旅第一篇

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【算法攻坚】回溯电话号码

发表于 2021-11-18

这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战

今日题目

给定一个仅包含数字 2-9 的字符串,返回所有它能表示的字母组合。答案可以按 任意顺序 返回。

给出数字到字母的映射如下(与电话按键相同)。注意 1 不对应任何字母。

img

示例 1:

输入:digits = “23”
输出:[“ad”,”ae”,”af”,”bd”,”be”,”bf”,”cd”,”ce”,”cf”]

示例 2:

输入:digits = “”
输出:[]

示例 3:

输入:digits = “2”
输出:[“a”,”b”,”c”]

提示:

0 <= digits.length <= 4
digits[i] 是范围 [‘2’, ‘9’] 的一个数字。

思路

从示例上来说,输入”23”,最直接的想法就是两层for循环遍历了,正好把组合的情况都输出了。

如果输入”234”呢,那么就三层for循环,如果”2345”呢,就四层for循环…….

所以这种暴力破解办法的算法复杂度是至少是O(3^len(digist)),所以要想别的方案

仔细观察题目,要完成这道题需要解决如下三个问题:

  • 数字和字母如何映射
  • 两个字母就两个for循环,三个字符我就三个for循环,以此类推,然后发现代码根本写不出来
  • 输入1 * #按键等等异常情况

对于这三个问题,相应的处理方式:

  • 只要是映射关系直接考虑map,这道题key是数字且连续的,所以同时可以用数组索引当做key
  • 这种情况可以利用回溯思想解决,回溯思想其实还是需要利用递归方程才能实现,把满足条件的场景存起来,然后回退,直到遍历完所有场景
  • 把主要的异常场景都要考虑到,不然算法题还是很难AC,可能需要多次调试

从上面分析可以看出最难点就是在回溯思想的实现:

  • 确定回溯函数参数
  • 确定终止条件
  • 确定单层遍历逻辑

解决方式:

  • 用StringBuilder存储一次遍历的结果
  • 什么时候算是一遍呢?其实是长度为输入数字的个数
  • 单层循环逻辑即为,单个数字对应的字符循环

对应实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
java复制代码public class Solution {
// 数字到号码的映射
private final String[] map = {"abc", "def", "ghi", "jkl", "mno", "pqrs", "tuv", "wxyz"};
// 路径
private final StringBuilder path = new StringBuilder();

public List<String> letterCombinations(String digits) {
if (digits == null || "".equals(digits)) {
return Collections.emptyList();
}

List<String> result = new ArrayList<>();
backtrack(result, digits, 0);
return result;
}

// 回溯函数
private void backtrack(List<String> result, String digits, int index) {
if (path.length() == digits.length()) {
result.add(path.toString());
return;
}
String val = map[digits.charAt(index) - '2'];
for (char c : val.toCharArray()) {
path.append(c);
backtrack(result, digits, index + 1);
path.deleteCharAt(path.length() - 1);
}
}

执行用时:0 ms, 在所有 Java 提交中击败了100.00%的用户

内存消耗:37.2 MB, 在所有 Java 提交中击败了37.71%的用户

小结

回溯算法还是还好用的,除了占用内存比较大外,可能还有更好的解题方式,以后再分享吧,我们下道题见

今天多学一点。明天就少说一句求人的话

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【分布式技术专题】分布式缓存优化之初探布隆过滤器的使用指南

发表于 2021-11-18

这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战

布隆过滤器的思想

如果想要判断一个元素是不是在一个集合里,一般想到的是将所有元素保存起来,然后通过比较确定。链表,树等等数据结构都是这种思路. 但是随着集合中元素的增加,我们需要的存储空间越来越大,检索速度也越来越慢(O(n),O(logn))。

Hash表的数据结构

不过世界上还有一种叫作散列表(又叫哈希表,Hash table)的数据结构。它可以通过一个Hash函数将一个元素映射成一个位阵列(Bit array)中的一个点。这样一来,我们只要看看这个点是不是1就可以知道集合中有没有它了。这就是布隆过滤器的基本思想。

布隆过滤器的Hash算法

Hash面临的问题就是冲突。假设Hash函数是良好的,如果我们的位阵列长度为m个点,那么如果我们想将冲突率降低到例如 1%, 这个散列表就只能容纳m / 100个元素。显然这就不叫空间效率了(Space-efficient)了。解决方法也简单,就是使用多个Hash,如果它们有一个说元素不在集合中,那肯定就不在。如果它们都说在,虽然也有一定可能性它们在说谎,不过直觉上判断这种事情的概率是比较低的。

纯Java实现的方案

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
java复制代码 public class MyBloomFilter {

/**
* 一个长度为10 亿的比特位
*/
private static final int DEFAULT_SIZE = 256 << 22;

/**
* 为了降低错误率,使用加法hash算法,所以定义一个8个元素的质数数组
*/
private static final int[] seeds = {3, 5, 7, 11, 13, 31, 37, 61};

/**
* 相当于构建 8 个不同的hash算法
*/
private static HashFunction[] functions = new HashFunction[seeds.length];

/**
* 初始化布隆过滤器的 bitmap
*/
private static BitSet bitset = new BitSet(DEFAULT_SIZE);

/**
* 添加数据
*
* @param value 需要加入的值
*/
public static void add(String value) {
if (value != null) {
for (HashFunction f : functions) {
//计算 hash 值并修改 bitmap 中相应位置为 true
bitset.set(f.hash(value), true);
}
}
}

/**
* 判断相应元素是否存在
* @param value 需要判断的元素
* @return 结果
*/
public static boolean contains(String value) {
if (value == null) {
return false;
}
boolean ret = true;
for (HashFunction f : functions) {
ret = bitset.get(f.hash(value));
//一个 hash 函数返回 false 则跳出循环
if (!ret) {
break;
}
}
return ret;
}

/**
* 测试。。。
*/
public static void main(String[] args) {

for (int i = 0; i < seeds.length; i++) {
functions[i] = new HashFunction(DEFAULT_SIZE, seeds[i]);
}

// 添加1亿数据
for (int i = 0; i < 100000000; i++) {
add(String.valueOf(i));
}
String id = "123456789";
add(id);

System.out.println(contains(id)); // true
System.out.println("" + contains("234567890")); //false
}
}

class HashFunction {

private int size;
private int seed;

public HashFunction(int size, int seed) {
this.size = size;
this.seed = seed;
}

public int hash(String value) {
int result = 0;
int len = value.length();
for (int i = 0; i < len; i++) {
result = seed * result + value.charAt(i);
}
int r = (size - 1) & result;
return (size - 1) & result;
}
}

Redis实现布隆过滤器

布隆过滤器介绍

布隆过滤器是一个很长的二进制向量和一系列随机映射函数,适用于判断某个数据在集合中是否存在,会存在误识别。

  • 优点是空间效率和查询时间都比一般的算法要好的多
  • 缺点是有一定的误识别率和删除困难。

img

img

布隆过滤器使用场景

客户端–布隆过滤器(hashmap)-redis缓存–DB数据库

  1. 在程序启动时,将redis的所有key先缓存预热(加载)到布隆过滤器中,也可以用hashmap,但布隆过滤器的性能会比hashmap快很多。
  2. 客户端请求的时候,先经过布隆过滤器,判断key是否存在,不存在的话,直接返回,可以解决redis的穿透和击穿。
  3. 布隆过滤器误判经过redis里,也不会造成原先大批量的涌入,这是可以接受的
  4. 如果redis的key有所变更,让布隆过滤器重新缓存预热,可解决删除问题

布隆过滤器存在的问题

img

误判

  • Jarye2本身在二进制向量表中不存在,由于hash值和其他碰撞,导致以为存在。
  • 解决方式: 把误判概率设置的足够小,但会导致向量表会大很多。

删除困难

加入把Jarye2删了,会把向量地址8 13的值设置为0,导致原先应该命中的Jarye1无法命中

**解决方式:**缓存重新预热。

布隆过滤器demo示例

1
2
3
4
5
xml复制代码<dependency>
<groupId>com.google.guava</groupId>
<artifactId>guava</artifactId>
<version>22.0</version>
</dependency>
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
csharp复制代码/**
* 测试demo
*/
public class BlongTest {
/**
* 在布隆中存放100万条数据
*/
private static Integer size = 1000000;

public static void main(String[] args) {
/**
* 最后参数为误判率,必须要>0.0
* 误判率是3% 100W的数据,二巷数组长度为730W
* 误判率是1% 100W的数据,二巷数组长度为960W
* 综合效率和准确率,建议值是1%
*
*/
BloomFilter<Integer> integerBloomFilter =
BloomFilter.create(Funnels.integerFunnel(), size, 0.01);
for (int i = 0; i < size; i++) {
integerBloomFilter.put(i);
}
// 从布隆中查询数据是否存在
ArrayList<Integer> strings = new ArrayList<>();
for (int j = size; j < size + 10000; j++) {
if (integerBloomFilter.mightContain(j)) {
strings.add(j);
}
}
System.out.println("误判数量:" + strings.size());
}
}

基于布隆过滤器解决Redis击穿问题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
kotlin复制代码@RequestMapping("/getOrder")
public OrderEntity getOrder(Integer orderId) {
if (integerBloomFilter != null) {
if (!integerBloomFilter.mightContain(orderId)) {
System.out.println("从布隆过滤器中检测到该key不存在");
return null;
}
}
// 1.先查询Redis中数据是否存在
OrderEntity orderRedisEntity = (OrderEntity) redisTemplateUtils.getObject(orderId + "");
if (orderRedisEntity != null) {
System.out.println("直接从Redis中返回数据");
return orderRedisEntity;
}
// 2. 查询数据库的内容
System.out.println("从DB查询数据");
OrderEntity orderDBEntity = orderMapper.getOrderById(orderId);
if (orderDBEntity != null) {
System.out.println("将Db数据放入到Redis中");
redisTemplateUtils.setObject(orderId + "", orderDBEntity);
}
return orderDBEntity;
}

@RequestMapping("/dbToBulong")
public String dbToBulong() {
List<Integer> orderIds = orderMapper.getOrderIds();
integerBloomFilter = BloomFilter.create(Funnels.integerFunnel(), orderIds.size(), 0.01);
for (int i = 0; i < orderIds.size(); i++) {
integerBloomFilter.put(orderIds.get(i));
}
return "success";
}

资料参考

  • www.cnblogs.com/wuwuyong/p/…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

同事告诉我应该这样判空

发表于 2021-11-18

这是我参与11月更文挑战的第16天,活动详情查看:2021最后一次更文挑战

写在前面

我们都知道java8 Optional的诞生,解决了 if NULL 的问题,大家用的也游刃有余。但是这样就真的无后顾之忧了吗,是否还会出现 java.lang.NullPointerException,我们今天来看看

我们先看个测试类

1
2
3
4
5
6
7
8
9
java复制代码public class ListTest {
static List<User> list =null;
public static void main(String[]args) {
//01
System.out.println(list.isEmpty());//
//02
System.out.println(CollectionUtils.isEmpty(list));
}
}

抛出问题:上面01和02 哪个会出现java.lang.NullPointerException,结尾给答案

空指针梦魇

1
java复制代码java.lang.NullPointerException

这个画面的出现是每个程序员碰到的,比较low且不可避免的,之所以是比较low,是因为不该出现这么低级的错误,不可避免是因为下面两种原因

一种是由于业务逻辑没有判空,另外一种是没有一些坑没有考虑到(今天我们主要讨论这种)

我们先来看一段代码,通过id查询用户实体

java8之前是这样的

1
2
3
4
5
6
7
java复制代码public User getById(Integer userId) {
List<User> list = mapper.selectById(userId);
if(list != null){
return list.get(0);
}
return;
}

java8操作

1
2
3
4
java复制代码public User getById(Integer userId) {
List<User> list = mapper.selectById(userId);
return Optional.ofNullable(list).map(user -> user.get(0)).orElse(null);
}

写到这里 我想问,这里会不会报错,答案:会的,当list 查出来不是null,而是0 首先我们要知道这是list,有伙伴就说了,为什么非要弄个list,不能是查出个对象吗,其实这里是根据业务来的,有些业务就是必须查出个list。

回归正题,当list为0的时候 ofNullable 可能会校验不出来,但是对象不会报错,为什么 因为对象只有null 和 非null 两种情况, 但是list不是。

同事看到后,一顿操作,我们看下解决方案:

CollectionUtils

是的,没错 利用Spring中的集合工具类。 看下代码

1
2
3
4
5
6
java复制代码public User getById(Integer userId) {
List<User> list = mapper.selectById(userId);
return Optional.ofNullable(list)
.filter(u -> !CollectionUtils.isEmpty(list))
.map(user -> user.get(0)).orElse(null);
}

利用filter 再筛选一层,利用CollectionUtils.isEmpty函数,不管是null还是0,都不会到map那里get的时候出现NullPointerException。这样便可保证万无一失。

那这个你在平常代码中也可以运用

1
2
3
4
java复制代码List<User> list = mapper.selectById(userId);
if(!CollectionUtils.isEmpty(list)){
//执行逻辑
}

我们看下他的源码

1
2
3
4
5
6
7
8
9
10
11
12
java复制代码public abstract class CollectionUtils {

/**
* Return {@code true} if the supplied Collection is {@code null} or empty.
* Otherwise, return {@code false}.
* @param collection the Collection to check
* @return whether the given Collection is empty
*/
public static boolean isEmpty(@Nullable Collection<?> collection) {
return (collection == null || collection.isEmpty());
}
}

可以看到 他无非也是自己判断了,所以只是个工具类。帮你做了一些事情。

总结

那文章开头01和02的答案自然就出来了。

01会报空,02则安全。

好了 今天的分享就到这里,我们要善于发现这些工具类 来帮我们解决一些问题。

值得深究的是,java为什么不把更好的工具方式合进去呢?或者你们知道的一些工具类和其他好的方案,欢迎评论区聊聊

弦外之音

感谢你的阅读,如果你感觉学到了东西,您可以点赞,关注。也欢迎有问题我们下面评论交流

加油! 我们下期再见!

给大家分享几个我前面写的几篇骚操作

copy对象,这个操作有点骚!

干货!SpringBoot利用监听事件,实现异步操作

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

养成日志统计分析和数据可视化的习惯 日志统计分析的重要性 下

发表于 2021-11-18

「这是我参与11月更文挑战的第18天,活动详情查看:2021最后一次更文挑战」

一定要对数据敏感,养成日志统计分析的习惯,项目推进要对数据负责。

日志统计分析的重要性

今天收到了一个需求,让我统计审核通过的用户换头像的次数,用以评估复审的工作量。

收到需求之后没有马上动手,而是考虑是用日志统计,还是保持到DB中;因为只是查询数据评估工作量,没必要保存到db。

于是我准备保存到Log中,准备撸代码…

值得骄傲的事情发生了,我发现之前竟然打了Log。

之前在没人提需求的时候,我预判到了这里以后可能有用,于是打了Log。(我自己也忘记了,查了代码才发现。)

于是我就对Log数据进行统计分析,很快把数据给了运营同学,用以评估工作量。

说句题外话:如果我没有提前打好Log,运营同学如果需要以周围时间范围评估的话,那还要等一周之后再能拿到数据。

因为我提前打了Log,记录好了数据,把最近2个月的数据情况都导出了,大大节省了时间成本。

下面介绍 Laravel Log 的使用技巧

基本使用就不介绍了,这类文档太多了,只介绍一些我认为一般人可能不知道的比较优雅的用法。

日志记录参数

我们可以使用 Log::info(),或使用更短的 info() 额外参数信息,来记录更详细的信息

1
bash复制代码Log::info('User failed to login.', ['id' => $user->id]);

更方便的 DD

我们可以在 Eloquent 句子或者任何集合结尾添加 ->dd(),而不是使用 dd($result)。

1
2
3
4
5
6
shell复制代码// 以前
$users = User::where('name', 'Taylor')->get();
dd($users);

// 现在
$users = User::where('name', 'Taylor')->get()->dd();

合理集成三方工具

我们可以尝试使用三方的统计工具,对日志进行聚合查询和可视化分析。

比如我们使用了阿里云的日志服务

使用 Superset 这个轻量级BI工具,搭建了数据可视化后台。Superset 预定义了多种可视化图表,满足大部分的数据展示功能。

通过合理的集成三方工具能够极大的提高我们的开发速度和项目效率。

Last but not least

技术交流群请到 这里来。 或者添加我的微信 wangzhongyang0601 ,一起学习一起进步。

感谢大家的点赞、评论、关注;谢谢大佬们的支持,感谢 ღ( ´・ᴗ・` )比心

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…285286287…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%