开发者博客 – IT技术 尽在开发者博客

开发者博客 – 科技是第一生产力


  • 首页

  • 归档

  • 搜索

腾讯工程师教你玩转 RocksDB

发表于 2018-02-02

欢迎大家前往云+社区,获取更多腾讯海量技术实践干货哦~

作者:腾讯云数据库内核团队

原文标题:【腾讯云CDB】教你玩转MyRocks/RocksDB—STATISTICS与后台线程篇

  1. Intro

在facebook的MySQL版本(以下称为MyRocks)中,RocksDB是可选的存储引擎。相比于InnoDB引擎,RocksDB的一个重要的优势是它使用更少的磁盘空间。在生产系统中,特别是用户数在亿级以上的互联网应用,磁盘空间是其中比较大的成本之一,而能够使用更少的磁盘空间的RocksDB无疑是具有吸引力的。然而在生产系统中使用新的存储引擎自然有它的潜在风险,除了通过外部的各种benchmark工具测试得到各种性能数据,全方位的内部指标可以帮助我们真正了解数据库内部正在发生的事情,对于性能调优和开发都具有指导意义。而MyRocks通过SHOW ENGINE ROCKSDB STATUS和多个INFORMATION_SCHEMA表等方式提供了较为全面的内部指标。

本文将介绍SHOW ENGINE ROCKSDB STATUS中关于STATISTICS统计值与后台线程的实现原理。在了解实现原理的基础上,便可以较容易地通过扩展功能使它更好地为我们服务。

调用SHOW ENGINE ROCKSDB STATUS指令会返回多行数据,其中包括:

  • STATISTICS:RocksDB引擎所有线程的所有操作的各类count/time的累加,比如rocksdb.block.cache.hit和rocksdb.db.write.micros。
  • BG_THREADS: 后台线程的状态。
  • DBSTATS: 数据库操作的统计。
  • CF_COMPACTION: 各个Column family进行compaction的相关指标统计。
  • MEMORY_STATS: 内存使用情况。

调用SHOW ENGINE ROCKSDB STATUS会返回若干行数据,然而这些数据并非事先存储于某个表格中,而是通过调用位于rocksdb/ha_rocksdb.cc文件中的rocksdb_show_status函数将内存中对应的数值进行规整返回给用户。

  1. STATISTICS

根据RocksDB官方相关文档介绍STATISTICS,开启STATISTICS会增加增加5%-10%额外开销。

STATISTICS统计值记录着RocksDB引擎所有线程的所有操作的各类count/time的累加。RocksDB引擎在它的各类操作如Put/Get/Delete中的代码都设立了很多埋点。

以函数GetEntryFromCache为例,它的作用是返回可用的block cache。特别地,可以看到statistics是GetEntryFromCache和block_cache->Lookup的一个参数。没错,就是靠着statistics这个参数它到处收集数据。 当有可用的block cache时,调用了三次RecordTick为其中三个统计值增加计数;没有可用的block cache,同样也为BLOCK_CACHE_MISS和block_cache_miss_ticker增加计数。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
复制代码Cache::Handle* GetEntryFromCache(Cache* block_cache, const Slice& key,
Tickers block_cache_miss_ticker,
Tickers block_cache_hit_ticker,
Statistics* statistics) {
auto cache_handle = block_cache->Lookup(key, statistics);
if (cache_handle != nullptr) {
PERF_COUNTER_ADD(block_cache_hit_count, 1);
// overall cache hit
RecordTick(statistics, BLOCK_CACHE_HIT);
// total bytes read from cache
RecordTick(statistics, BLOCK_CACHE_BYTES_READ,
block_cache->GetUsage(cache_handle));
// block-type specific cache hit
RecordTick(statistics, block_cache_hit_ticker);
} else {
// overall cache miss
RecordTick(statistics, BLOCK_CACHE_MISS);
// block-type specific cache miss
RecordTick(statistics, block_cache_miss_ticker);
}

return cache_handle;
}

1.1 RocksDB的STATISTICS接口

使用STATISTICS的方法也很简单。

它的头文件位于:

1
2
复制代码include/rocksdb/statistics.h
monitoring/statistics.h

使用方法:

1
2
复制代码Options options;
options.statistics = rocksdb::CreateDBStatistics();

可选统计级别:

  • kExceptDetailedTimers: 除去mutex等待和压缩的计时
  • kExceptTimeForMutex: 除去mutex等待的计时
  • kAll: 所有

数据统计类型分成两种:

  • ticker:计数,类型是64位无符号整型。用于度量counters (e.g. “rocksdb.block.cache.hit”), cumulative bytes (e.g. “rocksdb.bytes.written”) 或者 time (e.g. “rocksdb.l0.slowdown.micros”)。
  • histogram:统计数据的统计分布,包括最大值、最小值、平均值、中位数、标准差。

统计函数的接口:

  • MeasureTime:函数名有歧义。实际上是把value记录到histogram中。
  • RecordTick:累加ticker。

获取结果的接口:

  • Statistics::getTickerCount:指定ticker type获得count。
  • Statistics::histogramData:指定Histograms type,返回一个HistogramData结构体,成员是统计值,包括最大值、最小值、平均值、中位数、标准差。
  • Statistics::getHistogramString:指定Histograms type,返回直方图可读的字符串。
  • Statistics::ToString():返回可读的字符串,包括所有的ticker和histogram。

1.2 RocksDB的STATISTICS实现

RocksDB实现了StatisticsImpl类,继承了Statistics的接口。

主要接口:

  • getTickerCount
  • histogramData
  • getHistogramString
  • getAndResetTickerCount
  • recordTick
  • measureTime
  • ToString

成员变量:

  • TickerInfo tickers_[INTERNAL_TICKER_ENUM_MAX];
  • HistogramInfo histograms_[INTERNAL_HISTOGRAM_ENUM_MAX];

这里的TickerInfo和HistogramInfo类型的数据结构是相似的:一个线程局部的counter或者time;加上一个非线程局部的统计值用来累加counter或者time。

TickerInfo类型包含两个参数:

ThreadLocalPtr类型(真实类型ThreadTickerInfo)的thread_value,包含:

  • 整型类型的value
  • 指向merged_sum的指针
  • 整型类型的merged_sum
  • HistogreamInfo类型包含两个参数:

ThreadLocalPtr类型(真实类型ThreadHistogramInfo)的thread_value,包含:

  • HistogramImpl类型的value
  • 指向merged_hist的指针
  • 指向merge_lock的指针
  • HistogramImpl类型的merged_hist
  • Mutex类型的merge_lock

事实上,STATISTICS相关实现是比较巧妙的,也是使用STATISTICS仅增加5%-10%的关键。为了避免线程间共享数据导致CPU的cache频繁失效,merged_sum和merged_hist初始化时都是空的,而且当且仅当线程退出时,才调用mergeThreadValue函数将TickerInfo和HistogreamInfo中的线程局部变量累加到merged_sum和merged_hist。

1.3 MyRocks的使用

MyRocks使用了RocksDB提供的接口进行数据统计。通过声明了变量rocksdb_stats,并且随着RocksDB引擎启动时通过rocksdb_init_func函数进行初始化。

1
2
复制代码rocksdb_stats = rocksdb::CreateDBStatistics();
rocksdb_db_options->statistics = rocksdb_stats;

除了使用所有RocksDB引擎层的统计,MyRocks还通过定义了

1
复制代码commit_latency_stats = new rocksdb::HistogramImpl();

在rocksdb_commit_by_xid和rocksdb_commit两个函数中通过计时的方式,统计了每一次commit所花费的时间。

1
2
3
复制代码rocksdb::StopWatchNano timer(rocksdb::Env::Default(), true);
...
commit_latency_stats->Add(timer.ElapsedNanos() / 1000);

在rocksdb_show_status函数中,输出Statistics统计的过程如下:

  1. 如果定义rocksdb_stats,则调用rocksdb_stats->ToString()将统计值转化为可读的字符串;
  2. commit_latency_stats是直方图的类型,输出对应的50%, 95%, 99%, 100%四个位点的对应的值。
  3. 假如定义了is-write-stopped或者actual-delayed-write-rate等Property变量,同样会将它们输出。

2 后台线程

通过调用SHOW ENGINE ROCKSDB STATUS可以得到与BG_THREADS相关结果,它的输出结果类似于:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码Type: BG_THREADS
Name: 140173379593984
Status:
thread_type: Low Pri##
cf_name: default
operation_type: Compaction
operation_stage: CompactionJob::ProcessKeyValueCompaction
elapsed_time_ms: 6172.244 ms
BaseInputLevel: 0
BytesRead: 992806363
BytesWritten: 992071408
IsDeletion: 0
IsManual: 0
IsTrivialMove: 0
JobID: 1936
OutputLevel: 5
TotalInputBytes: 1586832446
state_type:

可以看到较多的信息量:这个线程正在进行Compaction,处于CompactionJob::ProcessKeyValueCompaction阶段,已经耗时6172.244 ms,读取的字节数为992806363,写出的字节数为992071408。然而并不包括可能感兴趣的正在进行Compaction的源文件和目标文件等信息。正如文章开头提到的,了解实现原理能够使我们更好地进行扩展。

2.1 thread status的接口与实现

MyRocks中的SHOW ENGINE ROCKSDB STATUS指令展示BG_THREAD的机制使用了RocksDB中关于thread status的接口。

它的头文件位于:

1
2
3
4
5
复制代码include/rocksdb/env.h
include/rocksdb/thread_status.h
util/thread_operation.h
monitoring/thread_status_updater.h
monitoring/thread_status_util.h

关键类:

ThreadStatusUpdater:存储了各自后台线程的状态和所有后台线程状态的指针。 ThreadStatusUtil:该类只有静态变量和静态方法,推荐通过该类的方法去更新ThreadStatusUpdater中的状态。

使用方法:

  • 将该线程的统计加入ThreadStatusUpdater:调用ThreadStatusUtil::RegisterThread
  • 将该线程的统计从ThreadStatusUpdater删除:调用ThreadStatusUtil::UnregisterThread
  • 其他修改thread status的函数:见monitoring/thread_status_util.h

通过调用env的GetThreadList()函数可以获得当前后台线程的状态,状态的状态值存放于一个vector中。将其中的内容展现出来,类似于下图:

从代码中可以看到,实现thread status的目的展示flush和compaction的运行状态。当然,我们也可以将用户线程的状态存储到thread status,通过调用SHOW ENGINE ROCKSDB STATUS指令展示。

特别地,可以看到compaction特有的状态值有:

1
2
3
4
5
6
7
8
9
复制代码enum CompactionPropertyType : int {
COMPACTION_JOB_ID = 0,
COMPACTION_INPUT_OUTPUT_LEVEL,
COMPACTION_PROP_FLAGS,
COMPACTION_TOTAL_INPUT_BYTES,
COMPACTION_BYTES_READ,
COMPACTION_BYTES_WRITTEN,
NUM_COMPACTION_PROPERTIES
};

flush特有的状态值有:

1
2
3
4
5
6
复制代码  enum FlushPropertyType : int {
FLUSH_JOB_ID = 0,
FLUSH_BYTES_MEMTABLES,
FLUSH_BYTES_WRITTEN,
NUM_FLUSH_PROPERTIES
};

2.2 MyRocks/RocksDB的使用

在RocksDB的线程池实现中,每一个启动的后台线程都会通过调用ThreadStatusUtil::RegisterThread加入被观测的后台线程的集合中。

1
复制代码ThreadPoolImpl::Impl::StartBGThreads-->BGThreadWrapper-->ThreadStatusUtil::RegisterThread

在rocksdb_show_status函数中,输出BG_THREAD的过程如下:

  1. 通过调用GetThreadList(&thread_list)获得所有后台线程的ThreadStatus的集合。

  2. 通过遍历ThreadStatus的集合将每一个后台线程的状态依次输出。

  3. 小结


本文章介绍了SHOW ENGINE ROCKSDB STATUS指令中关于STATISTICS与BG_THREAD的相关内容。

相关阅读

MySQL 内核深度优化

【腾讯云CDB】深入解析MySQL binlog

【腾讯云CDB】源码分析 · MySQL binlog组提交和Multi-Threaded-Slave

此文已由作者授权云加社区发布,转载请注明文章出处;

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Linux Socket编程(不限Linux) 吴秦(Tyl

发表于 2018-02-01

Linux Socket编程(不限Linux) - 吴秦 - 博客园

代码改变世界

  • Posts - 107, Articles - 0, Comments - 1736
  • Cnblogs
  • Dashboard
  • Login
  • Home
  • Contact
  • Gallery
  • RSS

吴秦(Tyler)

Linux Socket编程(不限Linux)

2010-12-12 21:58 by 吴秦, 264619 阅读, 73 评论, 收藏, 编辑
“一切皆Socket!”

话虽些许夸张,但是事实也是,现在的网络编程几乎都是用的socket。

——有感于实际编程和开源项目研究。

我们深谙信息交流的价值,那网络中进程之间如何通信,如我们每天打开浏览器浏览网页时,浏览器的进程怎么与web服务器通信的?当你用QQ聊天时,QQ进程怎么与服务器或你好友所在的QQ进程通信?这些都得靠socket?那什么是socket?socket的类型有哪些?还有socket的基本函数,这些都是本文想介绍的。本文的主要内容如下:

  • 1、网络中进程之间如何通信?
  • 2、Socket是什么?
  • 3、socket的基本操作
    • 3.1、socket()函数
    • 3.2、bind()函数
    • 3.3、listen()、connect()函数
    • 3.4、accept()函数
    • 3.5、read()、write()函数等
    • 3.6、close()函数
  • 4、socket中TCP的三次握手建立连接详解
  • 5、socket中TCP的四次握手释放连接详解
  • 6、一个例子(实践一下)
  • 7、留下一个问题,欢迎大家回帖回答!!!

1、网络中进程之间如何通信?

本地的进程间通信(IPC)有很多种方式,但可以总结为下面4类:

  • 消息传递(管道、FIFO、消息队列)
  • 同步(互斥量、条件变量、读写锁、文件和写记录锁、信号量)
  • 共享内存(匿名的和具名的)
  • 远程过程调用(Solaris门和Sun RPC)

但这些都不是本文的主题!我们要讨论的是网络中进程之间如何通信?首要解决的问题是如何唯一标识一个进程,否则通信无从谈起!在本地可以通过进程PID来唯一标识一个进程,但是在网络中这是行不通的。其实TCP/IP协议族已经帮我们解决了这个问题,网络层的“ip地址”可以唯一标识网络中的主机,而 传输层的“协议+端口”可以唯一标识主机中的应用程序(进程)。这样利用三元组(ip地址,协议,端口)就可以标识网络的进程了,网络中的进程通信就可以利用这个标志与其它进程进行交互。

使用TCP/IP协议的应用程序通常采用应用编程接口:UNIX BSD的套接字(socket)和UNIX System V的TLI(已经被淘汰),来实现网络进程之间的通信。就目前而言,几乎所有的应用程序都是采用socket,而现在又是网络时代,网络中进程通信是无处不在,这就是我为什么说“一切皆socket”。

2、什么是Socket?

上面我们已经知道网络中的进程是通过socket来通信的,那什么是socket呢?socket起源于Unix,而Unix/Linux基本哲学之一就是“一切皆文件”,都可以用“打开open –> 读写write/read –> 关闭close”模式来操作。我的理解就是Socket就是该模式的一个实现,socket即是一种特殊的文件,一些socket函数就是对其进行的操作(读/写IO、打开、关闭),这些函数我们在后面进行介绍。

socket一词的起源

在组网领域的首次使用是在1970年2月12日发布的文献IETF RFC33中发现的,撰写者为Stephen Carr、Steve Crocker和Vint Cerf。根据美国计算机历史博物馆的记载,Croker写道:“命名空间的元素都可称为套接字接口。一个套接字接口构成一个连接的一端,而一个连接可完全由一对套接字接口规定。”计算机历史博物馆补充道:“这比BSD的套接字接口定义早了大约12年。”

3、socket的基本操作

既然socket是“open—write/read—close”模式的一种实现,那么socket就提供了这些操作对应的函数接口。下面以TCP为例,介绍几个基本的socket接口函数。

3.1、socket()函数

1
复制代码int **socket**(int domain, int type, int protocol);

socket函数对应于普通文件的打开操作。普通文件的打开操作返回一个文件描述字,而socket()用于创建一个socket描述符(socket descriptor),它唯一标识一个socket。这个socket描述字跟文件描述字一样,后续的操作都有用到它,把它作为参数,通过它来进行一些读写操作。

正如可以给fopen的传入不同参数值,以打开不同的文件。创建socket的时候,也可以指定不同的参数创建不同的socket描述符,socket函数的三个参数分别为:

  • domain:即协议域,又称为协议族(family)。常用的协议族有,AF_INET、AF_INET6、AF_LOCAL(或称 AF_UNIX,Unix域socket)、AF_ROUTE等等。协议族决定了socket的地址类型,在通信中必须采用对应的地址,如AF_INET决定了要用ipv4地址(32位的)与端口号(16位的)的组合、AF_UNIX决定了要用一个绝对路径名作为地址。
  • type:指定socket类型。常用的socket类型有,SOCK_STREAM、SOCK_DGRAM、 SOCK_RAW、SOCK_PACKET、SOCK_SEQPACKET等等(socket的类型有哪些?)。
  • protocol:故名思意,就是指定协议。常用的协议有,IPPROTO_TCP、IPPTOTO_UDP、IPPROTO_SCTP、 IPPROTO_TIPC等,它们分别对应TCP传输协议、UDP传输协议、STCP传输协议、TIPC传输协议(这个协议我将会单独开篇讨论!)。

注意:并不是上面的type和protocol可以随意组合的,如SOCK_STREAM不可以跟IPPROTO_UDP组合。当protocol为0时,会自动选择type类型对应的默认协议。

当我们调用socket创建一个socket时,返回的socket描述字它存在于协议族(address family,AF_XXX)空间中,但没有一个具体的地址。如果想要给它赋值一个地址,就必须调用bind()函数,否则就当调用connect()、listen()时系统会自动随机分配一个端口。

3.2、bind()函数

正如上面所说bind()函数把一个地址族中的特定地址赋给socket。例如对应AF_INET、AF_INET6就是把一个ipv4或ipv6地址和端口号组合赋给socket。

1
cpp复制代码int bind(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

函数的三个参数分别为:

  • sockfd:即socket描述字,它是通过socket()函数创建了,唯一标识一个socket。bind()函数就是将给这个描述字绑定一个名字。
  • addr:一个const struct sockaddr *指针,指向要绑定给sockfd的协议地址。这个地址结构根据地址创建socket时的地址协议族的不同而不同,如ipv4对应的是:
1
2
3
4
5
6
7
8
9
10
cpp复制代码struct sockaddr_in {
sa_family_t sin_family; /* address family: AF_INET */
in_port_t sin_port; /* port in network byte order */
struct in_addr sin_addr; /* internet address */
};

/* Internet address. */
struct in_addr {
uint32_t s_addr; /* address in network byte order */
};

ipv6对应的是:

1
2
3
4
5
6
7
8
9
10
11
cpp复制代码struct sockaddr_in6 { 
sa_family_t sin6_family; /* AF_INET6 */
in_port_t sin6_port; /* port number */
uint32_t sin6_flowinfo; /* IPv6 flow information */
struct in6_addr sin6_addr; /* IPv6 address */
uint32_t sin6_scope_id; /* Scope ID (new in 2.4) */
};

struct in6_addr {
unsigned char s6_addr[16]; /* IPv6 address */
};

Unix域对应的是:

1
2
3
4
5
6
cpp复制代码#define UNIX_PATH_MAX    108

struct sockaddr_un {
sa_family_t sun_family; /* AF_UNIX */
char sun_path[UNIX_PATH_MAX]; /* pathname */
};
  • addrlen:对应的是地址的长度。

通常服务器在启动的时候都会绑定一个众所周知的地址(如ip地址+端口号),用于提供服务,客户就可以通过它来接连服务器;而客户端就不用指定,有系统自动分配一个端口号和自身的ip地址组合。这就是为什么通常服务器端在listen之前会调用bind(),而客户端就不会调用,而是在 connect()时由系统随机生成一个。

网络字节序与主机字节序

主机字节序就是我们平常说的大端和小端模式:不同的CPU有不同的字节序类型,这些字节序是指整数在内存中保存的顺序,这个叫做主机序。引用标准的Big-Endian和Little-Endian的定义如下:

  a) Little-Endian就是低位字节排放在内存的低地址端,高位字节排放在内存的高地址端。

  b) Big-Endian就是高位字节排放在内存的低地址端,低位字节排放在内存的高地址端。

网络字节序:4个字节的32 bit值以下面的次序传输:首先是0~7bit,其次8~15bit,然后16~23bit,最后是24~31bit。这种传输次序称作大端字节序。由于TCP/IP首部中所有的二进制整数在网络中传输时都要求以这种次序,因此它又称作网络字节序。字节序,顾名思义字节的顺序,就是大于一个字节类型的数据在内存中的存放顺序,一个字节的数据没有顺序的问题了。

所以:在将一个地址绑定到socket的时候,请先将主机字节序转换成为网络字节序,而不要假定主机字节序跟网络字节序一样使用的是Big-Endian。由于这个问题曾引发过血案!公司项目代码中由于存在这个问题,导致了很多莫名其妙的问题,所以请谨记对主机字节序不要做任何假定,务必将其转化为网络字节序再赋给socket。

3.3、listen()、connect()函数

如果作为一个服务器,在调用socket()、bind()之后就会调用listen()来监听这个socket,如果客户端这时调用connect()发出连接请求,服务器端就会接收到这个请求。

1
2
cpp复制代码int listen(int sockfd, int backlog);
int connect(int sockfd, const struct sockaddr *addr, socklen_t addrlen);

listen函数的第一个参数即为要监听的socket描述字,第二个参数为相应socket可以排队的最大连接个数。socket()函数创建的socket默认是一个主动类型的,listen函数将socket变为被动类型的,等待客户的连接请求。

connect函数的第一个参数即为客户端的socket描述字,第二参数为服务器的socket地址,第三个参数为socket地址的长度。客户端通过调用connect函数来建立与TCP服务器的连接。

3.4、accept()函数

TCP服务器端依次调用socket()、bind()、listen()之后,就会监听指定的socket地址了。TCP客户端依次调用socket()、 connect()之后就想TCP服务器发送了一个连接请求。TCP服务器监听到这个请求之后,就会调用accept () 函数取接收请求,这样连接就建立好了。之后就可以开始网络I/O操作了,即类同于普通文件的读写I/O操作。

1
cpp复制代码int accept(int sockfd, struct sockaddr *addr, socklen_t *addrlen);

accept函数的第一个参数为服务器的socket描述字,第二个参数为指向struct sockaddr *的指针,用于返回客户端的协议地址,第三个参数为协议地址的长度。如果accpet成功,那么其返回值是由内核自动生成的一个全新的描述字,代表与返回客户的TCP连接。

注意:accept的第一个参数为服务器的socket描述字,是服务器开始调用socket()函数生成的,称为监听socket描述字;而accept函数返回的是 已连接的socket描述字。一个服务器通常通常仅仅只创建一个监听socket描述字,它在该服务器的生命周期内一直存在。内核为每个由服务器进程接受的客户连接创建了一个已连接socket描述字,当服务器完成了对某个客户的服务,相应的已连接socket描述字就被关闭。

3.5、read()、write()等函数

万事具备只欠东风,至此服务器与客户已经建立好连接了。可以调用网络I/O进行读写操作了,即实现了网咯中不同进程之间的通信!网络I/O操作有下面几组:

  • read()/write()
  • recv()/send()
  • readv()/writev()
  • recvmsg()/sendmsg()
  • recvfrom()/sendto()

我推荐使用recvmsg()/sendmsg()函数,这两个函数是最通用的I/O函数,实际上可以把上面的其它函数都替换成这两个函数。它们的声明如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
cpp复制代码       #include <unistd.h>

ssize_t read(int fd, void *buf, size_t count);
ssize_t write(int fd, const void *buf, size_t count);

#include <sys/types.h>
#include <sys/socket.h>

ssize_t send(int sockfd, const void *buf, size_t len, int flags);
ssize_t recv(int sockfd, void *buf, size_t len, int flags);

ssize_t sendto(int sockfd, const void *buf, size_t len, int flags,
const struct sockaddr *dest_addr, socklen_t addrlen);
ssize_t recvfrom(int sockfd, void *buf, size_t len, int flags,
struct sockaddr *src_addr, socklen_t *addrlen);

ssize_t sendmsg(int sockfd, const struct msghdr *msg, int flags);
ssize_t recvmsg(int sockfd, struct msghdr *msg, int flags);

read函数是负责从fd中读取内容.当读成功时,read返回实际所读的字节数,如果返回的值是0表示已经读到文件的结束了,小于0表示出现了错误。如果错误为EINTR说明读是由中断引起的,如果是ECONNREST表示网络连接出了问题。

write函数将buf中的nbytes字节内容写入文件描述符fd.成功时返回写的字节数。失败时返回-1,并设置errno变量。 在网络程序中,当我们向套接字文件描述符写时有俩种可能。1)write的返回值大于0,表示写了部分或者是全部的数据。2)返回的值小于0,此时出现了错误。我们要根据错误类型来处理。如果错误为EINTR表示在写的时候出现了中断错误。如果为EPIPE表示网络连接出现了问题(对方已经关闭了连接)。

其它的我就不一一介绍这几对I/O函数了,具体参见man文档或者baidu、Google,下面的例子中将使用到send/recv。

3.6、close()函数

在服务器与客户端建立连接之后,会进行一些读写操作,完成了读写操作就要关闭相应的socket描述字,好比操作完打开的文件要调用fclose关闭打开的文件。

1
2
cpp复制代码#include <unistd.h>
int close(int fd);

close一个TCP socket的缺省行为时把该socket标记为以关闭,然后立即返回到调用进程。该描述字不能再由调用进程使用,也就是说不能再作为read或write的第一个参数。

注意:close操作只是使相应socket描述字的引用计数-1,只有当引用计数为0的时候,才会触发TCP客户端向服务器发送终止连接请求。

4、socket中TCP的三次握手建立连接详解

我们知道tcp建立连接要进行“三次握手”,即交换三个分组。大致流程如下:

  • 客户端向服务器发送一个SYN J
  • 服务器向客户端响应一个SYN K,并对SYN J进行确认ACK J+1
  • 客户端再想服务器发一个确认ACK K+1

只有就完了三次握手,但是这个三次握手发生在socket的那几个函数中呢?请看下图:

image

图1、socket中发送的TCP三次握手

从图中可以看出,当客户端调用connect时,触发了连接请求,向服务器发送了SYN J包,这时connect进入阻塞状态;服务器监听到连接请求,即收到SYN J包,调用accept函数接收请求向客户端发送SYN K ,ACK J+1,这时accept进入阻塞状态;客户端收到服务器的SYN K ,ACK J+1之后,这时connect返回,并对SYN K进行确认;服务器收到ACK K+1时,accept返回,至此三次握手完毕,连接建立。

总结:客户端的connect在三次握手的第二个次返回,而服务器端的accept在三次握手的第三次返回。

5、socket中TCP的四次握手释放连接详解

上面介绍了socket中TCP的三次握手建立过程,及其涉及的socket函数。现在我们介绍socket中的四次握手释放连接的过程,请看下图:

image

图2、socket中发送的TCP四次握手

图示过程如下:

  • 某个应用进程首先调用close主动关闭连接,这时TCP发送一个FIN M;
  • 另一端接收到FIN M之后,执行被动关闭,对这个FIN进行确认。它的接收也作为文件结束符传递给应用进程,因为FIN的接收意味着应用进程在相应的连接上再也接收不到额外数据;
  • 一段时间之后,接收到文件结束符的应用进程调用close关闭它的socket。这导致它的TCP也发送一个FIN N;
  • 接收到这个FIN的源发送端TCP对它进行确认。

这样每个方向上都有一个FIN和ACK。

6、一个例子(实践一下)

说了这么多了,动手实践一下。下面编写一个简单的服务器、客户端(使用TCP)——服务器端一直监听本机的6666号端口,如果收到连接请求,将接收请求并接收客户端发来的消息;客户端与服务器端建立连接并发送一条消息。

服务器端代码:

)服务器端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
复制代码#include<stdio.h>  
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>

#define MAXLINE 4096

int main(int argc, char** argv)
{
int listenfd, connfd;
struct sockaddr_in servaddr;
char buff[4096];
int n;

if( (listenfd = socket(AF_INET, SOCK_STREAM, 0)) == -1 ){
printf("create socket error: %s(errno: %d)\n",strerror(errno),errno);
exit(0);
}

memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_addr.s_addr = htonl(INADDR_ANY);
servaddr.sin_port = htons(6666);

if( bind(listenfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) == -1){
printf("bind socket error: %s(errno: %d)\n",strerror(errno),errno);
exit(0);
}

if( listen(listenfd, 10) == -1){
printf("listen socket error: %s(errno: %d)\n",strerror(errno),errno);
exit(0);
}

printf("======waiting for client's request======\n");
while(1){
if( (connfd = accept(listenfd, (struct sockaddr*)NULL, NULL)) == -1){
printf("accept socket error: %s(errno: %d)",strerror(errno),errno);
continue;
}
n = recv(connfd, buff, MAXLINE, 0);
buff[n] = '\0';
printf("recv msg from client: %s\n", buff);
close(connfd);
}

close(listenfd);
}

客户端代码:

)客户端

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
复制代码#include<stdio.h>  
#include<stdlib.h>
#include<string.h>
#include<errno.h>
#include<sys/types.h>
#include<sys/socket.h>
#include<netinet/in.h>

#define MAXLINE 4096

int main(int argc, char** argv)
{
int sockfd, n;
char recvline[4096], sendline[4096];
struct sockaddr_in servaddr;

if( argc != 2){
printf("usage: ./client <ipaddress>\n");
exit(0);
}

if( (sockfd = socket(AF_INET, SOCK_STREAM, 0)) < 0){
printf("create socket error: %s(errno: %d)\n", strerror(errno),errno);
exit(0);
}

memset(&servaddr, 0, sizeof(servaddr));
servaddr.sin_family = AF_INET;
servaddr.sin_port = htons(6666);
if( inet_pton(AF_INET, argv[1], &servaddr.sin_addr) <= 0){
printf("inet_pton error for %s\n",argv[1]);
exit(0);
}

if( connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0){
printf("connect error: %s(errno: %d)\n",strerror(errno),errno);
exit(0);
}

printf("send msg to server: \n");
fgets(sendline, 4096, stdin);
if( send(sockfd, sendline, strlen(sendline), 0) < 0)
{
printf("send msg error: %s(errno: %d)\n", strerror(errno), errno);
exit(0);
}

close(sockfd);
exit(0);
}

当然上面的代码很简单,也有很多缺点,这就只是简单的演示socket的基本函数使用。其实不管有多复杂的网络程序,都使用的这些基本函数。上面的服务器使用的是迭代模式的,即只有处理完一个客户端请求才会去处理下一个客户端的请求,这样的服务器处理能力是很弱的,现实中的服务器都需要有并发处理能力!为了需要并发处理,服务器需要fork()一个新的进程或者线程去处理请求等。

7、动动手

留下一个问题,欢迎大家回帖回答!!!是否熟悉Linux下网络编程?如熟悉,编写如下程序完成如下功能:

服务器端:

接收地址192.168.100.2的客户端信息,如信息为“Client Query”,则打印“Receive Query”

客户端:

向地址192.168.100.168的服务器端顺序发送信息“Client Query test”,“Cleint Query”,“Client Query Quit”,然后退出。

题目中出现的ip地址可以根据实际情况定。

——本文只是介绍了简单的socket编程。

更为复杂的需要自己继续深入。

(unix domain socket)使用udp发送>=128K的消息会报ENOBUFS的错误(一个实际socket编程中遇到的问题,希望对你有帮助)

作者:吴秦
出处:http://www.cnblogs.com/skynet/
本文基于署名 2.5 中国大陆许可协议发布,欢迎转载,演绎或用于商业目的,但是必须保留本文的署名吴秦(包含链接).

  • 分类: C/C++ Internals,Unix/Linux下编程

  • < Prev12 Add your comment

  1. #51楼 u011603302 2014-07-01 19:11

    socket base, mark 支持(0)反对(0)

  2. #52楼 sjtufighter 2014-07-28 10:43

    非常精彩,谢谢分享 支持(0)反对(0)

  3. #53楼 涂老 2014-10-09 15:22

    if( connect(sockfd, (struct sockaddr*)&servaddr, sizeof(servaddr)) < 0)这里始终是-1
    代码只改了:
    char IPdotdec[20]; //存放十进制的IP地址
    printf(“please input ip address”);
    scanf(“%s”,IPdotdec);
    inet_pton(AF_INET,IPdotdec,&servaddr.sin_addr)<=0,这一步没有问题,是在xcode5中运行的,始终连不上服务端
    支持(0)反对(0)

  4. #54楼 涂老 2014-10-09 15:24

    这个sizeof(servaddr)是16 支持(0)反对(0)

  5. #55楼 zzf-bingo 2014-11-26 09:50

    很好的一篇文章!不过,我找了一下,没有找到你说的后续会单独开篇的socket()中的protocol字段的单独说明啊?(话说,为了回复你,我专门注册了博客园账号。。。) 支持(0)反对(0)

  6. #56楼 jueying_8888 2015-01-28 08:50

    好文章,mark 支持(0)反对(0)

  7. #57楼 没出没 2015-02-27 16:27

    请教一下,如果我想重启server端,但是整个过程中保持socket不被关闭,也就是仍然可以接收client的请求,只是将其缓存,待server重启成功后继续处理,该怎么办?? 支持(0)反对(0)http://pic.cnblogs.com/face/u303875.gif?id=26210113

  8. #58楼[楼主] 吴秦 2015-02-27 16:30

    @ 没出没

引用请教一下,如果我想重启server端,但是整个过程中保持socket不被关闭,也就是仍然可以接收client的请求,只是将其缓存,待server重启成功后继续处理,该怎么办??
可以参考“nginx平滑重启与平滑升级”的做法 支持(0) 反对(0)http://pic.cnblogs.com/face/u92071.jpg
9. ##### #59楼 roger9567 2015-04-14 00:30

nice! 支持(0)反对(0)
10. ##### #60楼 乐安居 2015-06-03 15:44

真的写的不错。以前“几次握手”只是有个概念,这图片解释的很详细! 支持(0)反对(0)
11. ##### #61楼 Sawyer Ford 2015-06-09 21:18

讲解的很好 支持(0)反对(0)http://pic.cnblogs.com/face/565026/20141119212220.png
12. ##### #62楼 myg 2015-06-21 17:25

楼主 :
服务器监听到连接请求,即收到SYN J包,调用accept函数接收请求向客户端发送SYN K ,ACK J+1,这时accept进入阻塞状态;这句话有问题!!
服务端不调用accept函数 ,也照样会向客户端发送SYN+ACK报文!!! 支持(0)反对(0)
13. ##### #63楼 无他唯手熟尔 2015-07-23 22:22

先感谢楼主的干货,我觉得我还得在看一次。 支持(0)反对(0)
14. ##### #64楼 岁月山庄 2015-11-17 11:07

楼主你好,“服务器监听到连接请求,即收到SYN J包,调用accept函数接收请求向客户端发送SYN K ,ACK J+1,这时accept进入阻塞状态”这一句感觉有问题,服务器向客户端发送SYN K,ACK J+1,不需要调用accept函数,调用accept作用只是已建立连接的tcp队列中取出一个。 支持(0) 反对(0)
15. ##### #65楼 吴老二 2015-12-28 10:40

好文章 支持(0)反对(0)http://pic.cnblogs.com/face/828194/20151027113020.png
16. ##### #66楼 Tomasmule 2016-04-19 11:58

刚开通博客园,已粉 支持(0)反对(0)
17. ##### #67楼 程序猿–少停 2017-04-17 14:03

赞一下 支持(0)反对(0)http://pic.cnblogs.com/face/781239/20170930105542.png
18. ##### #68楼 无畏者无惧 2017-06-19 11:51

不错,赞一个 支持(0)反对(0)http://pic.cnblogs.com/face/726451/20150302175807.png
19. ##### #69楼 Apprentice_saber 2017-07-24 19:19

多谢楼主的干货。赞一个!!! 支持(0)反对(0)
20. ##### #70楼 快跑啊兔兔 2017-09-11 18:04

很精彩,感谢。 支持(0)反对(0)http://pic.cnblogs.com/face/990360/20170523180602.png
21. ##### #71楼 KRCheung 2017-12-05 21:57

难得楼主有心整理!感谢! 支持(0)反对(0)http://pic.cnblogs.com/face/1066524/20170227130418.png
22. ##### #72楼 笔耕不辍 2017-12-06 10:34

真的是很赞。。markmark。。 支持(0)反对(0)
23. ##### #73楼38616902017/12/9 13:54:10 雨落忧伤 2017-12-09 13:54

非常好 支持(0)反对(0)http://pic.cnblogs.com/face/856389/20171120165836.png

< Prev12 刷新评论刷新页面返回顶部 注册用户登录后才能发表评论,请 登录 或 注册,访问网站首页。 【推荐】超50万VC++源码: 大型工控、组态\仿真、建模CAD源码2018!
【推荐】怎样购买腾讯云服务器更划算?
SpreadJS2_1206 最新IT新闻:
· 来给乐视网巨亏算笔账 一次亏掉300家创业板公司利润
· 火爆朋友圈的众筹民宿,为什么我不看好它?
· Facebook用户增长放缓 高管:这只是暂时现象
· 假如iPhone X“遇害”,真凶是谁?
· 有多少中产已经陷入“中等收入陷阱”?
» 更多新闻… 阿里云C2-1208 最新知识库文章:
· 领域驱动设计在互联网业务开发中的实践
· 步入云计算
· 以操作系统的角度述说线程与进程
· 软件测试转型之路
· 门内门外看招聘
» 更多知识库文章…
About


昵称:吴秦
园龄:8年3个月
荣誉:推荐博客
粉丝:3504
关注:
18 +加关注
最新随笔


  • Unity3D手游开发实践
  • Unity3D shader简介
  • PyQt5应用与实践
  • Nginx + CGI/FastCGI + C/Cpp
  • Nginx安装与使用
  • 优雅的使用Python之软件管理
  • 优雅的使用python之环境管理
  • SpriteSheet精灵动画引擎
  • 【译】AS3利用CPU缓存
  • 走在网页游戏开发的路上(十一)
  • 自定义路径创建Cocos2d-x项目
  • C++静态库与动态库
  • C++对象模型
  • Python应用与实践
  • PureMVC(AS3)剖析:设计模式(二)

最新评论

  • Re:C++静态库与动态库
    [@viola

你好,你在引用第三方库的时候有没有编译错误,我现在就是有cannot open fiel ‘C:\Users\Administrator\Documents\Visual.obj,
– Shun_Voice](http://www.cnblogs.com/skynet/p/3372855.html#3888143)

  • Re:C++静态库与动态库
    那个我引用第三方静态库的时候出现错误,错误如下
    cannot open fiel ‘C:\Users\Administrator\Documents\Visual.obj,
    – Shun_Voice
  • Re:C++静态库与动态库
    2017看多最好的文章了!!!棒!
    – 凤梨酥007
  • Re:Linux Socket编程(不限Linux)
    非常好
    – 雨落忧伤
  • Re:Linux Socket编程(不限Linux)
    真的是很赞。。markmark。。
    – 笔耕不辍

随笔档案

  • 2016年4月(1)
  • 2015年8月(1)
  • 2015年1月(1)
  • 2014年12月(3)
  • 2014年11月(1)
  • 2014年2月(3)
  • 2013年11月(1)
  • 2013年10月(1)
  • 2013年9月(1)
  • 2013年5月(1)
  • 2013年3月(2)
  • 2013年2月(2)
  • 2013年1月(2)
  • 2012年12月(4)
  • 2012年11月(1)
  • 2012年8月(1)
  • 2012年4月(1)
  • 2012年3月(2)
  • 2012年1月(1)
  • 2011年7月(1)
  • 2011年6月(5)
  • 2011年5月(3)
  • 2011年3月(2)
  • 2011年2月(1)
  • 2011年1月(2)
  • 2010年12月(6)
  • 2010年10月(1)
  • 2010年9月(4)
  • 2010年7月(12)
  • 2010年6月(4)
  • 2010年5月(14)
  • 2010年4月(12)
  • 2010年3月(10)

日历

| | < | 2010年12月 | > | | — | — | — | |
| — | — | — | — |
| 日 | 一 | 二 | 三 | 四 | 五 | 六 |
| 28 | 29 | 30 | 1 | 2 | 3 | 4 |
| 5 | 6 | 7 | 8 | 9 | 10 | 11 |
| 12 | 13 | 14 | 15 | 16 | 17 | 18 |
| 19 | 20 | 21 | 22 | 23 | 24 | 25 |
| 26 | 27 | 28 | 29 | 30 | 31 | 1 |
| 2 | 3 | 4 | 5 | 6 | 7 | 8 |

随笔分类

  • .NET 2.0配置解谜系列(9)
  • .NET(C#) Internals (10)
  • 【日常小记】(3)
  • 【转载】(2)
  • Android开发之旅(18)
  • as3(1)
  • C/C++ Internals(15)
  • cocos2d-x(1)
  • JavaScript(1)
  • nginx(2)
  • PureMVC(AS3)剖析(5)
  • Python(5)
  • Unity3D(2)
  • Unix/Linux下编程(8)
  • 服务器开发(3)
  • 基于AIR Android应用开发(1)
  • 客户端开发(1)
  • 数据库(4)
  • 网页游戏开发(23)
  • 源码剖析:DotText源码学习(2)
  • 源码剖析:Mongoose(5)

推荐排行榜

  • 1. 字符集和字符编码(Charset & Encoding)(170)
  • 2. HTTP协议及其POST与GET操作差异 & C#中如何使用POST、GET等(140)
  • 3. Android开发之旅:环境搭建及HelloWorld(138)
  • 4. Linux Socket编程(不限Linux)(129)
  • 5. 浏览器缓存机制(87)
  • 6. Unity3D手游开发实践(83)
  • 7. C++静态库与动态库(75)
  • 8. HTTP Keep-Alive模式(60)
  • 9. Linux多线程编程(不限Linux)(58)
  • 10. C++项目中的extern “C” {}(51)

阅读排行榜

  • 1. Android开发之旅:环境搭建及HelloWorld(1065239)
  • 2. Nginx安装与使用(266235)
  • 3. Linux Socket编程(不限Linux)(264619)
  • 4. 字符集和字符编码(Charset & Encoding)(226355)
  • 5. 【日常小记】linux中强大且常用命令:find、grep(125537)
  • 6. C++的函数重载(114479)
  • 7. Android开发之旅:android架构(106403)
  • 8. Android 开发之旅:view的几种布局方式及实践(100615)
  • 9. C++静态库与动态库(99245)
  • 10. C/C++内存泄漏及检测(93857)

系列索引帖

  • .NET 2.0配置解谜系列索引(完结)

www.spiga.com.mx

Copyright ©2018 吴秦

博客园

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Spring Boot中使用LDAP来统一管理用户信息

发表于 2018-01-30

很多时候,我们在构建系统的时候都会自己创建用户管理体系,这对于开发人员来说并不是什么难事,但是当我们需要维护多个不同系统并且相同用户跨系统使用的情况下,如果每个系统维护自己的用户信息,那么此时用户信息的同步就会变的比较麻烦,对于用户自身来说也会非常困扰,很容易出现不同系统密码不一致啊等情况出现。如果此时我们引入LDAP来集中存储用户的基本信息并提供统一的读写接口和校验机制,那么这样的问题就比较容易解决了。下面就来说说当我们使用Spring Boot开发的时候,如何来访问LDAP服务端。

LDAP简介

LDAP(轻量级目录访问协议,Lightweight Directory Access Protocol)是实现提供被称为目录服务的信息服务。目录服务是一种特殊的数据库系统,其专门针对读取,浏览和搜索操作进行了特定的优化。目录一般用来包含描述性的,基于属性的信息并支持精细复杂的过滤能力。目录一般不支持通用数据库针对大量更新操作操作需要的复杂的事务管理或回卷策略。而目录服务的更新则一般都非常简单。这种目录可以存储包括个人信息、web链结、jpeg图像等各种信息。为了访问存储在目录中的信息,就需要使用运行在TCP/IP
之上的访问协议—LDAP。

LDAP目录中的信息是是按照树型结构组织,具体信息存储在条目(entry)的数据结构中。条目相当于关系数据库中表的记录;条目是具有区别名DN (Distinguished Name)的属性(Attribute),DN是用来引用条目的,DN相当于关系数据库表中的关键字(Primary Key)。属性由类型(Type)和一个或多个值(Values)组成,相当于关系数据库中的字段(Field)由字段名和数据类型组成,只是为了方便检索的需要,LDAP中的Type可以有多个Value,而不是关系数据库中为降低数据的冗余性要求实现的各个域必须是不相关的。LDAP中条目的组织一般按照地理位置和组织关系进行组织,非常的直观。LDAP把数据存放在文件中,为提高效率可以使用基于索引的文件数据库,而不是关系数据库。类型的一个例子就是mail,其值将是一个电子邮件地址。

LDAP的信息是以树型结构存储的,在树根一般定义国家(c=CN)或域名(dc=com),在其下则往往定义一个或多个组织 (organization)(o=Acme)或组织单元(organizational units) (ou=People)。一个组织单元可能包含诸如所有雇员、大楼内的所有打印机等信息。此外,LDAP支持对条目能够和必须支持哪些属性进行控制,这是有一个特殊的称为对象类别(objectClass)的属性来实现的。该属性的值决定了该条目必须遵循的一些规则,其规定了该条目能够及至少应该包含哪些属性。例如:inetorgPerson对象类需要支持sn(surname)和cn(common
name)属性,但也可以包含可选的如邮件,电话号码等属性。

LDAP简称对应

  • o:organization(组织-公司)
  • ou:organization unit(组织单元-部门)
  • c:countryName(国家)
  • dc:domainComponent(域名)
  • sn:suer name(真实名称)
  • cn:common name(常用名称)

以上内容参考自:LDAP快速入门

入门示例

在了解了LDAP的基础概念之后,我们通过一个简单例子进一步理解!

  • 创建一个基础的Spring Boot项目(如果您还不会,可以参考这两篇文章:入门1或入门2)
  • 在pom.xml中引入两个重要依赖
1
2
3
4
5
6
7
8
9
10
复制代码<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-data-ldap</artifactId>
</dependency>

<dependency>
<groupId>com.unboundid</groupId>
<artifactId>unboundid-ldapsdk</artifactId>
<scope>test</scope>
</dependency>

其中,spring-boot-starter-data-ldap是Spring Boot封装的对LDAP自动化配置的实现,它是基于spring-data-ldap来对LDAP服务端进行具体操作的。

而unboundid-ldapsdk主要是为了在这里使用嵌入式的LDAP服务端来进行测试操作,所以scope设置为了test,实际应用中,我们通常会连接真实的、独立部署的LDAP服务器,所以不需要此项依赖。

  • 在src/test/resources目录下创建ldap-server.ldif文件,用来存储LDAP服务端的基础数据,以备后面的程序访问之用。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码dn: dc=didispace,dc=com
objectClass: top
objectClass: domain

dn: ou=people,dc=didispace,dc=com
objectclass: top
objectclass: organizationalUnit
ou: people

dn: uid=ben,ou=people,dc=didispace,dc=com
objectclass: top
objectclass: person
objectclass: organizationalPerson
objectclass: inetOrgPerson
cn: didi
sn: zhaiyongchao
uid: didi
userPassword: {SHA}nFCebWjxfaLbHHG1Qk5UU4trbvQ=

这里创建了一个基础用户,真实姓名为zhaiyongchao,常用名didi,在后面的程序中,我们会来读取这些信息。更多内容解释大家可以深入学习LDAP来理解,这里不做过多的讲解。

  • 在application.properties中添加嵌入式LDAP的配置
1
2
复制代码spring.ldap.embedded.ldif=ldap-server.ldif
spring.ldap.embedded.base-dn=dc=didispace,dc=com
  • 使用spring-data-ldap的基础用法,定义LDAP中属性与我们Java中定义实体的关系映射以及对应的Repository
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码@Data
@Entry(base = "ou=people,dc=didispace,dc=com", objectClasses = "inetOrgPerson")
public class Person {

@Id
private Name id;
@DnAttribute(value = "uid", index = 3)
private String uid;
@Attribute(name = "cn")
private String commonName;
@Attribute(name = "sn")
private String suerName;
private String userPassword;

}

public interface PersonRepository extends CrudRepository<Person, Name> {

}

通过上面的定义之后,已经将Person对象与LDAP存储内容实现了映射,我们只需要使用PersonRepository就可以轻松的对LDAP内容实现读写。

  • 创建单元测试用例读取所有用户信息:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

@Autowired
private PersonRepository personRepository;

@Test
public void findAll() throws Exception {
personRepository.findAll().forEach(p -> {
System.out.println(p);
});
}
}

启动该测试用例之后,我们可以看到控制台中输出了刚才维护在ldap-server.ldif中的用户信息:

1
2
复制代码2018-01-27 14:25:06.283  WARN 73630 --- [           main] o.s.ldap.odm.core.impl.ObjectMetaData    : The Entry class Person should be declared final
Person(id=uid=ben,ou=people,dc=didispace,dc=com, uid=ben, commonName=didi, suerName=zhaiyongchao, userPassword=123,83,72,65,125,110,70,67,101,98,87,106,120,102,97,76,98,72,72,71,49,81,107,53,85,85,52,116,114,98,118,81,61)

添加用户

通过上面的入门示例,如果您能够独立完成,那么在Spring Boot中操作LDAP的基础目标已经完成了。

如果您足够了解Spring Data,其实不难想到,这个在其下的子项目必然也遵守Repsitory的抽象。所以,我们可以使用上面定义的PersonRepository来轻松实现操作,比如下面的代码就可以方便的往LDAP中添加用户:

1
2
3
4
5
6
复制代码Person person = new Person();
person.setUid("uid:1");
person.setSuerName("AAA");
person.setCommonName("aaa");
person.setUserPassword("123456");
personRepository.save(person);

如果还想实现更多操作,您可以参考spring-data-ldap的文档来进行使用。

连接LDAP服务端

在本文的例子中都采用了嵌入式的LDAP服务器,事实上这种方式也仅限于我们本地测试开发使用,真实环境下LDAP服务端必然是独立部署的。

在Spring Boot的封装下,我们只需要配置下面这些参数就能将上面的例子连接到远端的LDAP而不是嵌入式的LDAP。

1
2
3
4
复制代码spring.ldap.urls=ldap://localhost:1235
spring.ldap.base=dc=didispace,dc=com
spring.ldap.username=didispace
spring.ldap.password=123456

本文代码

可以通过下面两个仓库中查阅chapter3-2-10目录:

  • Github:github.com/dyc87112/Sp…
  • Gitee:gitee.com/didispace/S…

本文首发:blog.didispace.com/spring-boot…

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

【docker实操】使用docker部署一个laravel应

发表于 2018-01-29

最近部署上线一个项目,新的服务器,在生产环境安装配置nginx、php、mysql、git、composer等各种东西一大堆很麻烦。docker已经火了一段时间,已经成了后端和运维的必备技能,之前老大也说要尝试下,于是就来体验一下docker的魅力。本文是我学习并使用docker部署项目的一个记录。

docker优势

docker的主要优势可以总结为:一次构建,到处部署。当我们使用docker构建好项目后,在开发环境、测试环境、生产环境都可以使用统一的镜像来部署,保证了环境的一致。并且docker支持linux、mac os、windows三大平台,代码运行在容器中,不受外部操作系统的影响,只和你的镜像有依赖关系。

另外我们可以部署不同版本的应用,例如mysql5.5、mysql5.7,并且互不干扰。

操作思路

关于docker的安装和相关概念在此都不赘述了,网上有很多资料,说下具体实施的思路。

正确的使用方式不是项目完成后才开始构建docker镜像,而是项目架构、依赖、环境确定了之后就构建和配置好docker镜像,之后代码的开发和运行都在容器之中。之后部署只需要移植镜像生成容器,就能保证环境的一致。

我这个项目很简单,只用到nginx、php、mysql这三件套,nginx使用php-fpm作为cgi解析器。需要使用三个镜像:nginx、php-fpm、mysql。为什么要用三个镜像而不是把这三个集成到一个大镜像中?这是docker推荐的做法,镜像之间尽量解耦,方便复用,避免臃肿的镜像,容器之件通过网络连接起来。

官方提供了很多优质镜像,我们只需要挑选并基于官方镜像来配置就好。

构建镜像

完整文件详见我的github

nginx镜像

DockerFile:

1
2
3
4
5
6
7
复制代码FROM nginx:1.10

ADD nginx.conf /etc/nginx/nginx.conf
ADD site.conf /etc/nginx/conf.d/
ADD mkdir.sh /opt/

RUN chmod +x /opt/mkdir.sh && /opt/mkdir.sh

将基本配置和站点配置复制进去,用shell脚步来创建好log文件夹

php-fpm镜像

DockerFile:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码FROM php:7.0-fpm

ADD apt-list /opt/

RUN cp /etc/apt/sources.list /etc/apt/sources.list.bak \
&& cat /opt/apt-list >/etc/apt/sources.list \
&& apt-get update \
&& apt-get install -y \
libfreetype6-dev \
libjpeg62-turbo-dev \
libmcrypt-dev \
libpng-dev

RUN docker-php-ext-install -j$(nproc) iconv mcrypt \
&& docker-php-ext-configure gd --with-freetype-dir=/usr/include/ --with-jpeg-dir=/usr/include/ \
&& docker-php-ext-install -j$(nproc) gd \
&& docker-php-ext-install pdo_mysql \
&& docker-php-ext-install zip

RUN curl -sS https://getcomposer.org/installer | php -- --install-dir=/usr/local/bin --filename=composer \
&& composer config -g repo.packagist composer https://packagist.phpcomposer.com \

php镜像的是最麻烦的,要装系统依赖,php扩展,composer,还要设置环境变量。php扩展那块参考官方镜像dockr-hub-php

使用了apt,所以我们要修改apt源,官方镜像是基于debian的,不是ubuntu的源。

dockerfile一个run建一层,这里建议apt install作为一层,安装php扩展作为一层,而不是都放在一层。这样的好处就是如果第二层php扩展装失败了,不用再跑一遍apt install。最后一层我们装上composer。

为了让php版本一致,我们应该调用php容器内的php作为cli:docker run -it -v $PWD:/opt/code -w /opt/code my-php php -v

命令太长了,我们可以做个alias,在~/.bash_aliases中加入一行(注意最后空格):alias myphp=’docker run -it -v $PWD:/opt/code -w /opt/code –rm my-php ‘

我们在yourpath/opt/code下创建laravel应用,修改权限,注意在宿主机将你加入www-data group:

1
2
3
复制代码cd ~/docker-php/opt/code
myphp composer create-project --prefer-dist laravel/laravel .
sudo chown -R www-data:www-data . && sudo chmod -R 775 .

mysql镜像

DockerFile:

1
复制代码FROM mysql:5.7

mysql最简单,直接用官方的就行了

docker-compose启动容器

我们现在有三个镜像,如果一个个启动、连接、挂载很麻烦,官方提供了docker-compose工具,将配置写在一起,一键启动。

docker-compose.yml:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
复制代码nginx:
build: ./nginx
ports:
- "80:80"
links:
- "php"
volumes:
- ~/docker-php/opt:/opt

php:
build: ./php
ports:
- "9000:9000"
links:
- "mysql"
volumes:
- ~/docker-php/opt:/opt

mysql:
build: ./mysql
ports:
- "3306:3306"
volumes:
- ~/docker-php/opt/data/mysql:/var/lib/mysql
environment:
MYSQL_ROOT_PASSWORD: 123456

使用docker-compose up命令启动容器组:

启动访问 yourhost:80 可以看到熟悉的laravel欢迎页面:

进入mysql容器建个数据库建个表测试一下数据库连接:docker exec -it dockerphp_mysql_1 bash

修改yourpath/opt/code/routes:

测试了一下报错:SQLSTATE[HY000] [2002] Connection refused

google了一下,修改一下.env,把db_host改为你的ip:DB_HOST=127.0.0.1 -> DB_HOST=yourhost

访问/mysql,页面输出,大功告成:

Reference:

  • docker入门到实践
  • Docker在PHP项目开发环境中的应用

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

Android开源框架源码鉴赏:LruCache与DiskL

发表于 2018-01-28

关于作者

郭孝星,程序员,吉他手,主要从事Android平台基础架构方面的工作,欢迎交流技术方面的问题,可以去我的Github提issue或者发邮件至guoxiaoxingse@163.com与我交流。

文章目录

  • 一 Lru算法
  • 二 LruCache原理分析
    • 2.1 写入缓存
    • 2.2 读取缓存
    • 2.3 删除缓存
  • 三 DiskLruCache原理分析
    • 3.1 写入缓存
    • 3.2 读取缓存
    • 3.3 删除缓存

更多Android开源框架源码分析文章请参见Android open framework analysis。

一 Lru算法

在分析LruCache与DiskLruCache之前,我们先来简单的了解下LRU算法的核心原理。

LRU算法可以用一句话来描述,如下所示:

LRU是Least Recently Used的缩写,最近最久未使用算法,从它的名字就可以看出,它的核心原则是如果一个数据在最近一段时间没有使用到,那么它在将来被
访问到的可能性也很小,则这类数据项会被优先淘汰掉。

LRU算法流程图如下所示:


了解了算法原理,我们来思考一下如果是我们来做,应该如何实现这个算法。从上图可以看出,双向链表是一个好主意。

假设我们从表尾访问数据,在表头删除数据,当访问的数据项在链表中存在时,则将该数据项移动到表尾,否则在表尾新建一个数据项。当链表容量超过一定阈值,则移除表头的数据。

好,以上便是整个Lru算法的原理,我们接着来分析LruCache与DiskLruCache的实现。

二 LruCache原理分析

理解了Lru算法的原理,我们接着从LruCache的使用入手,逐步分析LruCache的源码实现。

👉 LruCache.java

在分析LruCache的源码实现之前,我们先来看看LruCache的简单使用,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码int maxMemorySize = (int) (Runtime.getRuntime().totalMemory() / 1024);
int cacheMemorySize = maxMemorySize / 8;
LruCache<String, Bitmap> lrucache = new LruCache<String, Bitmap>(cacheMemorySize) {

@Override
protected int sizeOf(String key, Bitmap value) {
return getBitmapSize(value);
}

@Override
protected void entryRemoved(boolean evicted, String key, Bitmap oldValue, Bitmap newValue) {
super.entryRemoved(evicted, key, oldValue, newValue);
}

@Override
protected Bitmap create(String key) {
return super.create(key);
}
};

注:getBitmapSize()用来计算图片占内存的大小,具体方法参见附录。

可以发现,在使用LruCache的过程中,需要我们关注的主要有三个方法:

  • sizeOf():覆写此方法实现自己的一套定义计算entry大小的规则。
  • V create(K key):如果key对象缓存被移除了,则调用次方法重建缓存。
  • entryRemoved(boolean evicted, K key, V oldValue, V newValue) :当key对应的缓存被删除时回调该方法。

我们来看看这三个方法的默认实现,如下所示:

1
2
3
4
5
6
7
8
9
10
11
复制代码public class LruCache<K, V> {

//该方法默认返回1,也就是以entry的数量来计算entry的大小,这通常不符合我们的需求,所以我们一般会覆写此方法。
protected int sizeOf(K key, V value) {
return 1;
}
protected void entryRemoved(boolean evicted, K key, V oldValue, V newValue) {}
protected V create(K key) {
return null;
}
}

可以发现entryRemoved()方法为空实现,create()方法也默认返回null。sizeOf()方法默认返回1,也就是以entry的数量来计算entry的大小,这通常不符合我们的需求,所以我们一般会覆写此方法。

我们前面提到,要实现Lru算法,可以利用双向链表。

假设我们从表尾访问数据,在表头删除数据,当访问的数据项在链表中存在时,则将该数据项移动到表尾,否则在表尾新建一个数据项。当链表容量超过一定阈值,则移除表头的数据。

LruCache使用的是LinkedHashMap,为什么会选择LinkedHashMap呢?🤔

这跟LinkedHashMap的特性有关,LinkedHashMap的构造函数里有个布尔参数accessOrder,当它为true时,LinkedHashMap会以访问顺序为序排列元素,否则以插入顺序为序排序元素。

1
2
3
4
5
6
7
8
复制代码public class LruCache<K, V> {
public LinkedHashMap(int initialCapacity,
float loadFactor,
boolean accessOrder) {
super(initialCapacity, loadFactor);
this.accessOrder = accessOrder;
}
}

我们来写个小例子验证一下。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
复制代码Map<Integer, Integer> map = new LinkedHashMap<>(5, 0.75F, true);
map.put(1, 1);
map.put(2, 2);
map.put(3, 3);
map.put(4, 4);
map.put(5, 5);

Log.d(TAG, "before visit");

for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
Log.d(TAG, String.valueOf(entry.getValue()));
}

//访问3,4两个元素
map.get(3);
map.get(4);

Log.d(TAG, "after visit");

for (Map.Entry<Integer, Integer> entry : map.entrySet()) {
Log.d(TAG, String.valueOf(entry.getValue()));
}

程序输入Log:


注:在LinkedHashMap中最近被方位的元素会被移动到表尾,LruCache也是从从表尾访问数据,在表头删除数据,

可以发现,最后访问的数据就会被移动最尾端,这是符合我们的预期的。所有在LruCache的构造方法中构造了一个这样的LinkedHashMap。

1
2
3
4
5
6
7
复制代码public LruCache(int maxSize) {
if (maxSize <= 0) {
throw new IllegalArgumentException("maxSize <= 0");
}
this.maxSize = maxSize;
this.map = new LinkedHashMap<K, V>(0, 0.75f, true);
}

我们再来看看LruCache是如何进行缓存的写入、获取和删除的。

2.1 写入缓存

写入缓存是通过LruCache的put()方法实现的,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
复制代码public class LruCache<K, V> {

public final V put(K key, V value) {
if (key == null || value == null) {
throw new NullPointerException("key == null || value == null");
}

V previous;
//加锁,线程安全
synchronized (this) {
//插入的数量自增
putCount++;
//利用我们提供的sizeOf()方法计算当前项的大小,并增加已有缓存size的大小
size += safeSizeOf(key, value);
//插入当前项、
previous = map.put(key, value);
//previous如果不为空,则说明该项在原来的链表中以及存在,已有缓存大小size恢复到
//以前的大小
if (previous != null) {
size -= safeSizeOf(key, previous);
}
}

//回调entryRemoved()方法
if (previous != null) {
entryRemoved(false, key, previous, value);
}

//调整缓存大小,如果缓存满了,则按照Lru算法删除对应的项。
trimToSize(maxSize);
return previous;
}

public void trimToSize(int maxSize) {
//开启死循环,知道缓存不满为止
while (true) {
K key;
V value;
synchronized (this) {
//参数检查
if (size < 0 || (map.isEmpty() && size != 0)) {
throw new IllegalStateException(getClass().getName()
+ ".sizeOf() is reporting inconsistent results!");
}

//如果缓存为满,直接返回
if (size <= maxSize) {
break;
}

//返回最近最久未使用的元素,也就是链表的表头元素
Map.Entry<K, V> toEvict = map.eldest();
if (toEvict == null) {
break;
}

key = toEvict.getKey();
value = toEvict.getValue();
//删除该表头元素
map.remove(key);
//减少总缓存大小
size -= safeSizeOf(key, value);
//被删除的项的数量自增
evictionCount++;
}
//回到entryRemoved()方法
entryRemoved(true, key, value, null);
}
}
}

整个插入元素的方法put()实现逻辑是很简单的,如下所示:

  1. 插入元素,并相应增加当前缓存的容量。
  2. 调用trimToSize()开启一个死循环,不断的从表头删除元素,直到当前缓存的容量小于最大容量为止。

2.2 读取缓存

读取缓存是通过LruCache的get()方法实现的,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
复制代码public class LruCache<K, V> {

public final V get(K key) {
if (key == null) {
throw new NullPointerException("key == null");
}

V mapValue;
synchronized (this) {
//调用LinkedHashMap的get()方法,注意如果该元素存在,且accessOrder为true,这个方法会
//将该元素移动到表尾
mapValue = map.get(key);
if (mapValue != null) {
hitCount++;
return mapValue;
}
//
missCount++;
}

//前面我们就提到过,可以覆写create()方法,当获取不到和key对应的元素时,尝试调用create()方法
//创建建元素,以下就是创建的过程,和put()方法流程相同。
V createdValue = create(key);
if (createdValue == null) {
return null;
}

synchronized (this) {
createCount++;
mapValue = map.put(key, createdValue);

if (mapValue != null) {
// There was a conflict so undo that last put
map.put(key, mapValue);
} else {
size += safeSizeOf(key, createdValue);
}
}

if (mapValue != null) {
entryRemoved(false, key, createdValue, mapValue);
return mapValue;
} else {
trimToSize(maxSize);
return createdValue;
}
}
}

获取元素的逻辑如下所示:

  1. 调用LinkedHashMap的get()方法,注意如果该元素存在,且accessOrder为true,这个方法会将该元素移动到表尾.
  2. 当获取不到和key对应的元素时,尝试调用create()方法创建建元素,以下就是创建的过程,和put()方法流程相同。

2.3 删除缓存

删除缓存是通过LruCache的remove()方法实现的,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
复制代码public class LruCache<K, V> {

public final V remove(K key) {
if (key == null) {
throw new NullPointerException("key == null");
}

V previous;
synchronized (this) {
//调用对应LinkedHashMap的remove()方法删除对应元素
previous = map.remove(key);
if (previous != null) {
size -= safeSizeOf(key, previous);
}
}

if (previous != null) {
entryRemoved(false, key, previous, null);
}

return previous;
}

}

删除元素的逻辑就比较简单了,调用对应LinkedHashMap的remove()方法删除对应元素。

三 DiskLruCache原理分析

👉 DiskLruCache.java

在分析DiskLruCache的实现原理之前,我们先来写个简单的小例子,从例子出发去分析DiskLruCache的实现原理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
复制代码File directory = getCacheDir();
int appVersion = 1;
int valueCount = 1;
long maxSize = 10 * 1024;
DiskLruCache diskLruCache = DiskLruCache.open(directory, appVersion, valueCount, maxSize);

DiskLruCache.Editor editor = diskLruCache.edit(String.valueOf(System.currentTimeMillis()));
BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(editor.newOutputStream(0));
Bitmap bitmap = BitmapFactory.decodeResource(getResources(), R.drawable.scenery);
bitmap.compress(Bitmap.CompressFormat.JPEG, 100, bufferedOutputStream);

editor.commit();
diskLruCache.flush();
diskLruCache.close();

这个就是DiskLruCache的大致使用流程,我们来看看这个入口方法的实现,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
复制代码public final class DiskLruCache implements Closeable {

public static DiskLruCache open(File directory, int appVersion, int valueCount, long maxSize)
throws IOException {
if (maxSize <= 0) {
throw new IllegalArgumentException("maxSize <= 0");
}
if (valueCount <= 0) {
throw new IllegalArgumentException("valueCount <= 0");
}

File backupFile = new File(directory, JOURNAL_FILE_BACKUP);
//如果备份文件存在
if (backupFile.exists()) {
File journalFile = new File(directory, JOURNAL_FILE);
// 如果journal文件存在,则把备份文件journal.bkp是删了
if (journalFile.exists()) {
backupFile.delete();
} else {
//如果journal文件不存在,则将备份文件命名为journal
renameTo(backupFile, journalFile, false);
}
}

DiskLruCache cache = new DiskLruCache(directory, appVersion, valueCount, maxSize);

//判断journal文件是否存在
if (cache.journalFile.exists()) {
//如果日志文件以及存在
try {
//读取journal文件,根据记录中不同的操作类型进行相应的处理。
cache.readJournal();
//计算当前缓存容量的大小
cache.processJournal();
cache.journalWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(cache.journalFile, true), Util.US_ASCII));
return cache;
} catch (IOException journalIsCorrupt) {
System.out
.println("DiskLruCache "
+ directory
+ " is corrupt: "
+ journalIsCorrupt.getMessage()
+ ", removing");
cache.delete();
}
}

// Create a new empty cache.
//创建新的缓存目录
directory.mkdirs();
cache = new DiskLruCache(directory, appVersion, valueCount, maxSize);
//调用新的方法建立新的journal文件
cache.rebuildJournal();
return cache;
}
}

先来说一下这个入口方法的四个参数的含义:

  • File directory:缓存目录。
  • int appVersion:应用版本号。
  • int valueCount:一个key对应的缓存文件的数目,如果我们传入的参数大于1,那么缓存文件后缀就是.0,.1等。
  • long maxSize:缓存容量上限。

DiskLruCache的构造方法并没有做别的事情,只是简单的将对应成员变量进行初始化,open()方法主要围绕着journal文件的创建与读写而展开的,如下所示:

  • readJournal():读取journal文件,主要是读取文件头里的信息进行检验,然后调用readJournalLine()逐行去读取,根据读取的内容,执行相应的缓存
    添加、移除等操作。
  • rebuildJournal():重建journal文件,重建journal文件主要是写入文件头(上面提到的journal文件都有的前面五行的内容)。
  • rocessJournal():计算当前缓存容量的大小。

我们接着来分析什么是journal文件,以及它的创建与读写流程。

3.1 journal文件的创建

在前面分析的open()方法中,主要围绕着journal文件的创建和读写来展开的,那么journal文件是什么呢?🤔

我们如果去打开缓存目录,就会发现除了缓存文件,还会发现一个journal文件,journal文件用来记录缓存的操作记录的,如下所示:

1
2
3
4
5
6
7
8
复制代码libcore.io.DiskLruCache
1
1
1

DIRTY 1517126350519
CLEAN 1517126350519 5325928
REMOVE 1517126350519

注:这里的缓存目录是应用的缓存目录/data/data/pckagename/cache,未root的手机可以通过以下命令进入到该目录中或者将该目录整体拷贝出来:

1
2
3
4
5
6
7
8
复制代码
//进入/data/data/pckagename/cache目录
adb shell
run-as com.your.packagename
cp /data/data/com.your.packagename/

//将/data/data/pckagename目录拷贝出来
adb backup -noapk com.your.packagename

我们来分析下这个文件的内容:

  • 第一行:libcore.io.DiskLruCache,固定字符串。
  • 第二行:1,DiskLruCache源码版本号。
  • 第三行:1,App的版本号,通过open()方法传入进去的。
  • 第四行:1,每个key对应几个文件,一般为1.
  • 第五行:空行
  • 第六行及后续行:缓存操作记录。

第六行及后续行表示缓存操作记录,关于操作记录,我们需要了解以下三点:

  1. DIRTY 表示一个entry正在被写入。写入分两种情况,如果成功会紧接着写入一行CLEAN的记录;如果失败,会增加一行REMOVE记录。注意单独只有DIRTY状态的记录是非法的。
  2. 当手动调用remove(key)方法的时候也会写入一条REMOVE记录。
  3. READ就是说明有一次读取的记录。
  4. CLEAN的后面还记录了文件的长度,注意可能会一个key对应多个文件,那么就会有多个数字。

这几种操作对应到DiskLruCache源码中,如下所示:

1
2
3
4
复制代码private static final String CLEAN = "CLEAN";
private static final String DIRTY = "DIRTY";
private static final String REMOVE = "REMOVE";
private static final String READ = "READ";

那么构建一个新的journal文件呢?上面我们也说过这是调用rebuildJournal()方法来完成的。

rebuildJournal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
复制代码public final class DiskLruCache implements Closeable {

static final String MAGIC = "libcore.io.DiskLruCache";

private synchronized void rebuildJournal() throws IOException {
if (journalWriter != null) {
journalWriter.close();
}

Writer writer = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(journalFileTmp), Util.US_ASCII));
try {
//写入文件头
writer.write(MAGIC);
writer.write("\n");
writer.write(VERSION_1);
writer.write("\n");
writer.write(Integer.toString(appVersion));
writer.write("\n");
writer.write(Integer.toString(valueCount));
writer.write("\n");
writer.write("\n");

for (Entry entry : lruEntries.values()) {
if (entry.currentEditor != null) {
writer.write(DIRTY + ' ' + entry.key + '\n');
} else {
writer.write(CLEAN + ' ' + entry.key + entry.getLengths() + '\n');
}
}
} finally {
writer.close();
}

if (journalFile.exists()) {
renameTo(journalFile, journalFileBackup, true);
}
renameTo(journalFileTmp, journalFile, false);
journalFileBackup.delete();

journalWriter = new BufferedWriter(
new OutputStreamWriter(new FileOutputStream(journalFile, true), Util.US_ASCII));
}
}

你可以发现,构建一个新的journal文件过程就是写入文件头的过程,文件头内容包含前面我们说的appVersion、valueCount、空行等五行内容。

我们再来看看如何读取journal文件里的内容。

readJournal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
复制代码public final class DiskLruCache implements Closeable {
private void readJournal() throws IOException {
StrictLineReader reader = new StrictLineReader(new FileInputStream(journalFile), Util.US_ASCII);
try {
//读取文件头,并进行校验。
String magic = reader.readLine();
String version = reader.readLine();
String appVersionString = reader.readLine();
String valueCountString = reader.readLine();
String blank = reader.readLine();
//检查前五行的内容是否合法
if (!MAGIC.equals(magic)
|| !VERSION_1.equals(version)
|| !Integer.toString(appVersion).equals(appVersionString)
|| !Integer.toString(valueCount).equals(valueCountString)
|| !"".equals(blank)) {
throw new IOException("unexpected journal header: [" + magic + ", " + version + ", "
+ valueCountString + ", " + blank + "]");
}

int lineCount = 0;
while (true) {
try {
//开启死循环,逐行读取journal内容
readJournalLine(reader.readLine());
//文件以及读取的行数
lineCount++;
} catch (EOFException endOfJournal) {
break;
}
}
//lineCount表示文件总行数,lruEntries.size()表示最终缓存的个数,redundantOpCount
//就表示非法缓存记录的个数,这些非法缓存记录会被移除掉。
redundantOpCount = lineCount - lruEntries.size();
} finally {
Util.closeQuietly(reader);
}
}

private void readJournalLine(String line) throws IOException {
//每行记录都是用空格开分隔的,这里取第一个空格出现的位置
int firstSpace = line.indexOf(' ');
//如果没有空格,则说明是非法的记录
if (firstSpace == -1) {
throw new IOException("unexpected journal line: " + line);
}

//第一个空格前面就是CLEAN、READ这些操作类型,接下来针对不同的操作类型进行
//相应的处理
int keyBegin = firstSpace + 1;
int secondSpace = line.indexOf(' ', keyBegin);
final String key;
if (secondSpace == -1) {
key = line.substring(keyBegin);
//1. 如果该条记录以REMOVE为开头,则执行删除操作。
if (firstSpace == REMOVE.length() && line.startsWith(REMOVE)) {
lruEntries.remove(key);
return;
}
} else {
key = line.substring(keyBegin, secondSpace);
}

//2. 如果该key不存在,则新建Entry并加入lruEntries。
Entry entry = lruEntries.get(key);
if (entry == null) {
entry = new Entry(key);
lruEntries.put(key, entry);
}

//3. 如果该条记录以CLEAN为开头,则初始化entry,并设置entry.readable为true、设置entry.currentEditor为
//null,初始化entry长度。
//CLEAN 3400330d1dfc7f3f7f4b8d4d803dfcf6 832 21054
if (secondSpace != -1 && firstSpace == CLEAN.length() && line.startsWith(CLEAN)) {
//数组中其实是数字,其实就是文件的大小。因为可以通过valueCount来设置一个key对应的value的个数,
//所以文件大小也是有valueCount个
String[] parts = line.substring(secondSpace + 1).split(" ");
entry.readable = true;
entry.currentEditor = null;
entry.setLengths(parts);
}
//4. 如果该条记录以DIRTY为开头。则设置currentEditor对象。
//DIRTY 335c4c6028171cfddfbaae1a9c313c52
else if (secondSpace == -1 && firstSpace == DIRTY.length() && line.startsWith(DIRTY)) {
entry.currentEditor = new Editor(entry);
}
//5. 如果该条记录以READ为开头,则什么也不做。
else if (secondSpace == -1 && firstSpace == READ.length() && line.startsWith(READ)) {
// This work was already done by calling lruEntries.get().
} else {
throw new IOException("unexpected journal line: " + line);
}
}
}

新来说一下这个lruEntries是什么,如下所示:

1
2
复制代码private final LinkedHashMap<String, Entry> lruEntries =
new LinkedHashMap<String, Entry>(0, 0.75f, true);

就跟上面的LruCache一样,它也是一个以访问顺序为序的LinkedHashMap,可以用它来实现Lru算法。

该方法的逻辑就是根据记录中不同的操作类型进行相应的处理,如下所示:

  1. 如果该条记录以REMOVE为开头,则执行删除操作。
  2. 如果该key不存在,则新建Entry并加入lruEntries。
  3. 如果该条记录以CLEAN为开头,则初始化entry,并设置entry.readable为true、设置entry.currentEditor为null,初始化entry长度。
  4. 如果该条记录以DIRTY为开头。则设置currentEditor对象。
  5. 如果该条记录以READ为开头,则什么也不做。

说了这么多,readJournalLine()方法主要是通过读取journal文件的每一行,然后封装成entry对象,放到了LinkedHashMap集合中。并且根据每一行不同的开头,设置entry的值。也就是说通过读取这
个文件,我们把所有的在本地缓存的文件的key都保存到了集合中,这样我们用的时候就可以通过集合来操作了。

processJournal()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
复制代码public final class DiskLruCache implements Closeable {

private void processJournal() throws IOException {
//删除journal.tmp临时文件
deleteIfExists(journalFileTmp);
//变量缓存集合里的所有元素
for (Iterator<Entry> i = lruEntries.values().iterator(); i.hasNext(); ) {
Entry entry = i.next();
//如果当前元素entry的currentEditor不为空,则计算该元素的总大小,并添加到总缓存容量size中去
if (entry.currentEditor == null) {
for (int t = 0; t < valueCount; t++) {
size += entry.lengths[t];
}
}
//如果当前元素entry的currentEditor不为空,代表该元素时非法缓存记录,该记录以及对应的缓存文件
//都会被删除掉。
else {
entry.currentEditor = null;
for (int t = 0; t < valueCount; t++) {
deleteIfExists(entry.getCleanFile(t));
deleteIfExists(entry.getDirtyFile(t));
}
i.remove();
}
}
}
}

这里提到了一个非常缓存记录,那么什么是非法缓存记录呢?🤔

DIRTY 表示一个entry正在被写入。写入分两种情况,如果成功会紧接着写入一行CLEAN的记录;如果失败,会增加一行REMOVE记录。注意单独只有DIRTY状态的记录是非法的。

该方法主要用来计算当前的缓存总容量,并删除非法缓存记录以及该记录对应的文件。

理解了journal文件的创建以及读写流程,我们来看看硬盘缓存的写入、读取和删除的过程。

3.2 写入缓存

DiskLruCache缓存的写入是通过edit()方法来完成的,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
复制代码public final class DiskLruCache implements Closeable {

private synchronized Editor edit(String key, long expectedSequenceNumber) throws IOException {
checkNotClosed();
validateKey(key);
//从之前的缓存中读取对应的entry
Entry entry = lruEntries.get(key);
//当前无法写入磁盘缓存
if (expectedSequenceNumber != ANY_SEQUENCE_NUMBER && (entry == null
|| entry.sequenceNumber != expectedSequenceNumber)) {
return null; // Snapshot is stale.
}

//如果entry为空,则新建一个entry对象加入到缓存集合中
if (entry == null) {
entry = new Entry(key);
lruEntries.put(key, entry);
}
//currentEditor不为空,表示当前有别的插入操作在执行
else if (entry.currentEditor != null) {
return null; // Another edit is in progress.
}

//为当前创建的entry知道新创建的editor
Editor editor = new Editor(entry);
entry.currentEditor = editor;

//向journal写入一行DIRTY + 空格 + key的记录,表示这个key对应的缓存正在处于被编辑的状态。
journalWriter.write(DIRTY + ' ' + key + '\n');
//刷新文件里的记录
journalWriter.flush();
return editor;
}
}

这个方法构建了一个Editor对象,它主要做了两件事情:

  1. 从集合中找到对应的实例(如果没有创建一个放到集合中),然后创建一个editor,将editor和entry关联起来。
  2. 向journal中写入一行操作数据(DITTY 空格 和key拼接的文字),表示这个key当前正处于编辑状态。

我们在前面的DiskLruCache的使用例子中,调用了Editor的newOutputStream()方法创建了一个OutputStream来写入缓存文件。如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
复制代码public final class DiskLruCache implements Closeable {

public InputStream newInputStream(int index) throws IOException {
synchronized (DiskLruCache.this) {
if (entry.currentEditor != this) {
throw new IllegalStateException();
}
if (!entry.readable) {
return null;
}
try {
return new FileInputStream(entry.getCleanFile(index));
} catch (FileNotFoundException e) {
return null;
}
}
}
}

这个方法的形参index就是我们开始在open()方法里传入的valueCount,这个valueCount表示了一个key对应几个value,也就是说一个key对应几个缓存文件。那么现在传入的这个index就表示
要缓存的文件时对应的第几个value。

有了输出流,我们在接着调用Editor的commit()方法就可以完成缓存文件的写入了,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码public final class DiskLruCache implements Closeable {
public void commit() throws IOException {
//如果通过输出流写入缓存文件出错了就把集合中的缓存移除掉
if (hasErrors) {
completeEdit(this, false);
remove(entry.key); // The previous entry is stale.
} else {
//调用completeEdit()方法完成缓存写入。
completeEdit(this, true);
}
committed = true;
}
}

可以看到该方法调用DiskLruCache的completeEdit()方法来完成缓存写入,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
复制代码public final class DiskLruCache implements Closeable {

private synchronized void completeEdit(Editor editor, boolean success) throws IOException {
Entry entry = editor.entry;
if (entry.currentEditor != editor) {
throw new IllegalStateException();
}

// If this edit is creating the entry for the first time, every index must have a value.
if (success && !entry.readable) {
for (int i = 0; i < valueCount; i++) {
if (!editor.written[i]) {
editor.abort();
throw new IllegalStateException("Newly created entry didn't create value for index " + i);
}
if (!entry.getDirtyFile(i).exists()) {
editor.abort();
return;
}
}
}

for (int i = 0; i < valueCount; i++) {
//获取对象缓存的临时文件
File dirty = entry.getDirtyFile(i);
if (success) {
//如果临时文件存在,则将其重名为正式的缓存文件
if (dirty.exists()) {
File clean = entry.getCleanFile(i);
dirty.renameTo(clean);
long oldLength = entry.lengths[i];
long newLength = clean.length();
entry.lengths[i] = newLength;
//重新计算缓存的大小
size = size - oldLength + newLength;
}
} else {
//如果写入不成功,则删除掉临时文件。
deleteIfExists(dirty);
}
}

//操作次数自增
redundantOpCount++;
//将当前缓存的编辑器置为空
entry.currentEditor = null;
if (entry.readable | success) {
//缓存已经写入,设置为可读。
entry.readable = true;
//向journal写入一行CLEAN开头的记录,表示缓存成功写入到磁盘。
journalWriter.write(CLEAN + ' ' + entry.key + entry.getLengths() + '\n');
if (success) {
entry.sequenceNumber = nextSequenceNumber++;
}
} else {
//如果不成功,则从集合中删除掉这个缓存
lruEntries.remove(entry.key);
//向journal文件写入一行REMOVE开头的记录,表示删除了缓存
journalWriter.write(REMOVE + ' ' + entry.key + '\n');
}
journalWriter.flush();

//如果缓存总大小已经超过了设定的最大缓存大小或者操作次数超过了2000次,
// 就开一个线程将集合中的数据删除到小于最大缓存大小为止并重新写journal文件
if (size > maxSize || journalRebuildRequired()) {
executorService.submit(cleanupCallable);
}
}
}

这个方法一共做了以下几件事情:

  1. 如果输出流写入数据成功,就把写入的临时文件重命名为正式的缓存文件
  2. 重新设置当前总缓存的大小
  3. 向journal文件写入一行CLEAN开头的字符(包括key和文件的大小,文件大小可能存在多个 使用空格分开的)
  4. 如果输出流写入失败,就删除掉写入的临时文件,并且把集合中的缓存也删除
  5. 向journal文件写入一行REMOVE开头的字符
  6. 重新比较当前缓存和最大缓存的大小,如果超过最大缓存或者journal文件的操作大于2000条,就把集合中的缓存删除一部分,直到小于最大缓存,重新建立新的journal文件

到这里,缓存的插入流程就完成了。

3.3 读取缓存

读取缓存是由DiskLruCache的get()方法来完成的,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
复制代码public final class DiskLruCache implements Closeable {

public synchronized Snapshot get(String key) throws IOException {
checkNotClosed();
validateKey(key);
//获取对应的entry
Entry entry = lruEntries.get(key);
if (entry == null) {
return null;
}

//如果entry不可读,说明可能在编辑,则返回空。
if (!entry.readable) {
return null;
}

//打开所有缓存文件的输入流,等待被读取。
InputStream[] ins = new InputStream[valueCount];
try {
for (int i = 0; i < valueCount; i++) {
ins[i] = new FileInputStream(entry.getCleanFile(i));
}
} catch (FileNotFoundException e) {
// A file must have been deleted manually!
for (int i = 0; i < valueCount; i++) {
if (ins[i] != null) {
Util.closeQuietly(ins[i]);
} else {
break;
}
}
return null;
}

redundantOpCount++;
//向journal写入一行READ开头的记录,表示执行了一次读取操作
journalWriter.append(READ + ' ' + key + '\n');


//如果缓存总大小已经超过了设定的最大缓存大小或者操作次数超过了2000次,
// 就开一个线程将集合中的数据删除到小于最大缓存大小为止并重新写journal文件
if (journalRebuildRequired()) {
executorService.submit(cleanupCallable);
}

//返回一个缓存文件快照,包含缓存文件大小,输入流等信息。
return new Snapshot(key, entry.sequenceNumber, ins, entry.lengths);
}
}

读取操作主要完成了以下几件事情:

  1. 获取对应的entry。
  2. 打开所有缓存文件的输入流,等待被读取。
  3. 向journal写入一行READ开头的记录,表示执行了一次读取操作。
  4. 如果缓存总大小已经超过了设定的最大缓存大小或者操作次数超过了2000次,就开一个线程将集合中的数据删除到小于最大缓存大小为止并重新写journal文件。
  5. 返回一个缓存文件快照,包含缓存文件大小,输入流等信息。

该方法最终返回一个缓存文件快照,包含缓存文件大小,输入流等信息。利用这个快照我们就可以读取缓存文件了。

3.4 删除缓存

删除缓存是由DiskLruCache的remove()方法来完成的,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
复制代码public final class DiskLruCache implements Closeable {

public synchronized boolean remove(String key) throws IOException {
checkNotClosed();
validateKey(key);
//获取对应的entry
Entry entry = lruEntries.get(key);
if (entry == null || entry.currentEditor != null) {
return false;
}

//删除对应的缓存文件,并将缓存大小置为0.
for (int i = 0; i < valueCount; i++) {
File file = entry.getCleanFile(i);
if (file.exists() && !file.delete()) {
throw new IOException("failed to delete " + file);
}
size -= entry.lengths[i];
entry.lengths[i] = 0;
}

redundantOpCount++;
//向journal文件添加一行REMOVE开头的记录,表示执行了一次删除操作。
journalWriter.append(REMOVE + ' ' + key + '\n');
lruEntries.remove(key);


//如果缓存总大小已经超过了设定的最大缓存大小或者操作次数超过了2000次,
// 就开一个线程将集合中的数据删除到小于最大缓存大小为止并重新写journal文件
if (journalRebuildRequired()) {
executorService.submit(cleanupCallable);
}

return true;
}
}

删除操作主要做了以下几件事情:

  1. 获取对应的entry。
  2. 删除对应的缓存文件,并将缓存大小置为0.
  3. 向journal文件添加一行REMOVE开头的记录,表示执行了一次删除操作。
  4. 如果缓存总大小已经超过了设定的最大缓存大小或者操作次数超过了2000次,就开一个线程将集合中的数据删除到小于最大缓存大小为止并重新写journal文件。

好,到这里LrcCache和DiskLruCache的实现原理都讲完了,这两个类在主流的图片框架Fresco、Glide和网络框架Okhttp等都有着广泛的应用,后续的文章后继续分析LrcCache和DiskLruCache
在这些框架里的应用。

附录

图片占用内存大小的计算

Android里面缓存应用最多的场景就是图片缓存了,谁让图片在内存里是个大胖子呢,在做缓存的时候我们经常会去计算图片展内存的大小。

那么如何去获取一张图片占用内存的大小呢?🤔

1
2
3
4
5
6
7
8
9
10
11
12
复制代码private int getBitmapSize(Bitmap bitmap) {
//API 19
if (Build.VERSION.SDK_INT == Build.VERSION_CODES.KITKAT) {
return bitmap.getAllocationByteCount();
}
//API 12
if (Build.VERSION.SDK_INT == Build.VERSION_CODES.HONEYCOMB_MR1) {
return bitmap.getByteCount();
}
// Earlier Version
return bitmap.getRowBytes() * bitmap.getHeight();
}

那么这三个方法处了版本上的差异,具体有什么区别呢?

getRowBytes()返回的是每行的像素值,乘以高度就是总的像素数,也就是占用内存的大小。 getAllocationByteCount()与getByteCount()的返回值一般情况下都是相等的。只是在图片
复用的时候,getAllocationByteCount()返回的是复用图像所占内存的大小,getByteCount()返回的是新解码图片占用内存的大小。

我们来写一个小例子验证一下,如下所示:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
复制代码BitmapFactory.Options options = new BitmapFactory.Options();
options.inDensity = 320;
options.inTargetDensity = 320;
//要实现复用,图像必须是可变的,也就是inMutable为true。
options.inMutable = true;
Bitmap bitmap = BitmapFactory.decodeResource(getResources(), R.drawable.scenery, options);
Log.d(TAG, "bitmap.getAllocationByteCount(): " + String.valueOf(bitmap.getAllocationByteCount()));
Log.d(TAG, "bitmap.getByteCount(): " + String.valueOf(bitmap.getByteCount()));
Log.d(TAG, "bitmap.getRowBytes() * bitmap.getHeight(): " + String.valueOf(bitmap.getRowBytes() * bitmap.getHeight()));

BitmapFactory.Options reuseOptions = new BitmapFactory.Options();
reuseOptions.inDensity = 320;
reuseOptions.inTargetDensity = 320;
//要复用的Bitmap
reuseOptions.inBitmap = bitmap;
//要实现复用,图像必须是可变的,也就是inMutable为true。
reuseOptions.inMutable = true;
Bitmap reuseBitmap = BitmapFactory.decodeResource(getResources(), R.drawable.scenery_reuse, reuseOptions);
Log.d(TAG, "reuseBitmap.getAllocationByteCount(): " + String.valueOf(reuseBitmap.getAllocationByteCount()));
Log.d(TAG, "reuseBitmap.getByteCount(): " + String.valueOf(reuseBitmap.getByteCount()));
Log.d(TAG, "reuseBitmap.getRowBytes() * reuseBitmap.getHeight(): " + String.valueOf(reuseBitmap.getRowBytes() * reuseBitmap.getHeight()));

运行的log如下所示:


可以发现reuseBitmap的getAllocationByteCount()和getByteCount()返回不一样,getAllocationByteCount()返回的是复用bitmap占用内存的大小,
getByteCount()返回的是新的reuseOptions实际解码占用的内存大小。

注意在复用图片的时候,options.inMutable必须设置为true,否则无法进行复用,如下所示:

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

深入浅出NIO之Selector实现原理

发表于 2018-01-26

前言

Java NIO 由以下几个核心部分组成:
1、Buffer
2、Channel
3、Selector
Buffer和Channel在深入浅出NIO之Channel、Buffer一文中已经介绍过,本文主要讲解NIO的Selector实现原理。
之前进行socket编程时,accept方法会一直阻塞,直到有客户端请求的到来,并返回socket进行相应的处理。整个过程是流水线的,处理完一个请求,才能去获取并处理后面的请求,当然也可以把获取socket和处理socket的过程分开,一个线程负责accept,一个线程池负责处理请求。
但NIO提供了更好的解决方案,采用选择器(Selector)返回已经准备好的socket,并按顺序处理,基于通道(Channel)和缓冲区(Buffer)来进行数据的传输。

如果你也想在IT行业拿高薪,可以参加我们的训练营课程,选择最适合自己的课程学习,技术大牛亲授,7个月后,进入名企拿高薪。我们的课程内容有:Java工程化、高性能及分布式、高性能、深入浅出。高架构。性能调优、Spring,MyBatis,Netty源码分析和大数据等多个知识点。如果你想拿高薪的,想学习的,想就业前景好的,想跟别人竞争能取得优势的,想进阿里面试但担心面试不过的,你都可以来,群号为:575745314

Selector

这里出来一个新概念,selector,具体是一个什么样的东西?
想想一个场景:在一个养鸡场,有这么一个人,每天的工作就是不停检查几个特殊的鸡笼,如果有鸡进来,有鸡出去,有鸡生蛋,有鸡生病等等,就把相应的情况记录下来,如果鸡场的负责人想知道情况,只需要询问那个人即可。
在这里,这个人就相当Selector,每个鸡笼相当于一个SocketChannel,每个线程通过一个Selector可以管理多个SocketChannel。

为了实现Selector管理多个SocketChannel,必须将具体的SocketChannel对象注册到Selector,并声明需要监听的事件(这样Selector才知道需要记录什么数据),一共有4种事件:

1、connect:客户端连接服务端事件,对应值为SelectionKey.OP_CONNECT(8)
2、accept:服务端接收客户端连接事件,对应值为SelectionKey.OP_ACCEPT(16)
3、read:读事件,对应值为SelectionKey.OP_READ(1)
4、write:写事件,对应值为SelectionKey.OP_WRITE(4)
这个很好理解,每次请求到达服务器,都是从connect开始,connect成功后,服务端开始准备accept,准备就绪,开始读数据,并处理,最后写回数据返回。
所以,当SocketChannel有对应的事件发生时,Selector都可以观察到,并进行相应的处理。

服务端代码

为了更好的理解,先看一段服务端的示例代码

服务端操作过程

1、创建ServerSocketChannel实例,并绑定指定端口;
2、创建Selector实例;
3、将serverSocketChannel注册到selector,并指定事件OP_ACCEPT,最底层的socket通过channel和selector建立关联;
4、如果没有准备好的socket,select方法会被阻塞一段时间并返回0;
5、如果底层有socket已经准备好,selector的select方法会返回socket的个数,而且selectedKeys方法会返回socket对应的事件(connect、accept、read
or write);
6、根据事件类型,进行不同的处理逻辑;

在步骤3中,selector只注册了serverSocketChannel的OP_ACCEPT事件
1、如果有客户端A连接服务,执行select方法时,可以通过serverSocketChannel获取客户端A的socketChannel,并在selector上注册socketChannel的OP_READ事件。
2、如果客户端A发送数据,会触发read事件,这样下次轮询调用select方法时,就能通过socketChannel读取数据,同时在selector上注册该socketChannel的OP_WRITE事件,实现服务器往客户端写数据。

Selector实现原理

SocketChannel、ServerSocketChannel和Selector的实例初始化都通过SelectorProvider类实现,其中Selector是整个NIO Socket的核心实现。

SelectorProvider在windows和linux下有不同的实现,provider方法会返回对应的实现。
这里不禁要问,Selector是如何做到同时管理多个socket?
下面我们看看Selector的具体实现,Selector初始化时,会实例化PollWrapper、SelectionKeyImpl数组和Pipe。

pollWrapper用Unsafe类申请一块物理内存pollfd,存放socket句柄fdVal和events,其中pollfd共8位,0-3位保存socket句柄,4-7位保存events。

pollWrapper提供了fdVal和event数据的相应操作,如添加操作通过Unsafe的putInt和putShort实现。

先看看serverChannel.register(selector, SelectionKey.OP_ACCEPT)是如何实现的

1:如果该channel和selector已经注册过,则直接添加事件和附件。
2:否则通过selector实现注册过程。

1、以当前channel和selector为参数,初始化SelectionKeyImpl 对象selectionKeyImpl ,并添加附件attachment。
2、如果当前channel的数量totalChannels等于SelectionKeyImpl数组大小,对SelectionKeyImpl数组和pollWrapper进行扩容操作。
3、如果totalChannels % MAX_SELECTABLE_FDS == 0,则多开一个线程处理selector。
4、pollWrapper.addEntry将把selectionKeyImpl中的socket句柄添加到对应的pollfd。
5、k.interestOps(ops)方法最终也会把event添加到对应的pollfd。

所以,不管serverSocketChannel,还是socketChannel,在selector注册的事件,最终都保存在pollArray中。

接着,再来看看selector中的select是如何实现一次获取多个有事件发生的channel的,底层由selector实现类的doSelect方法实现,如下:

其中 subSelector.poll() 是select的核心,由native函数poll0实现,readFds、writeFds 和exceptFds数组用来保存底层select的结果,数组的第一个位置都是存放发生事件的socket的总数,其余位置存放发生事件的socket句柄fd。

执行 selector.select() ,poll0函数把指向socket句柄和事件的内存地址传给底层函数。
1、如果之前没有发生事件,程序就阻塞在select处,当然不会一直阻塞,因为epoll在timeout时间内如果没有事件,也会返回;
2、一旦有对应的事件发生,poll0方法就会返回;
3、processDeregisterQueue方法会清理那些已经cancelled的SelectionKey;
4、updateSelectedKeys方法统计有事件发生的SelectionKey数量,并把符合条件发生事件的SelectionKey添加到selectedKeys哈希表中,提供给后续使用。

在早期的JDK1.4和1.5 update10版本之前,Selector基于select/poll模型实现,是基于IO复用技术的非阻塞IO,不是异步IO。在JDK1.5 update10和linux core2.6以上版本,sun优化了Selctor的实现,底层使用epoll替换了select/poll。

read实现

通过遍历selector中的SelectionKeyImpl数组,获取发生事件的socketChannel对象,其中保存了对应的socket,实现如下

最终通过Buffer的方式读取socket的数据。

wakeup实现

看来wakeupSinkFd这个变量是为wakeup方法使用的。其中interruptTriggered为中断已触发标志,当pollWrapper.interrupt()之后,该标志即为true了;因为这个标志,连续两次wakeup,只会有一次效果。

epoll原理

epoll是Linux下的一种IO多路复用技术,可以非常高效的处理数以百万计的socket句柄。

三个epoll相关的系统调用:

1: int epoll_create(int size)
epoll_create建立一个epoll对象。参数size是内核保证能够正确处理的最大句柄数,多于这个最大数时内核可不保证效果。
2: int epoll_ctl(int epfd, int op, int fd, struct epoll_event event)
epoll_ctl可以操作epoll_create创建的epoll,如将socket句柄加入到epoll中让其监控,或把epoll正在监控的某个socket句柄移出epoll。
3: int epoll_wait(int epfd, struct epoll_event events,int maxevents, int timeout)
epoll_wait在调用时,在给定的timeout时间内,所监控的句柄中有事件发生时,就返回用户态的进程。

epoll内部实现大概如下:

1:epoll初始化时,会向内核注册一个文件系统,用于存储被监控的句柄文件,调用epoll_create时,会在这个文件系统中创建一个file节点。同时epoll会开辟自己的内核高速缓存区,以红黑树的结构保存句柄,以支持快速的查找、插入、删除。还会再建立一个list链表,用于存储准备就绪的事件。

2:当执行epoll_ctl时,除了把socket句柄放到epoll文件系统里file对象对应的红黑树上之外,还会给内核中断处理程序注册一个回调函数,告诉内核,如果这个句柄的中断到了,就把它放到准备就绪list链表里。所以,当一个socket上有数据到了,内核在把网卡上的数据copy到内核中后,就把socket插入到就绪链表里。

3:当epoll_wait调用时,仅仅观察就绪链表里有没有数据,如果有数据就返回,否则就sleep,超时时立刻返回。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

开源一个企业官网

发表于 2018-01-26

对于PHP开发中使用框架还好。如果不是使用MVC框架的话,你如果通过代码来操作插入一条记录,你说说需要几行?? 本人在平时的积累的基础上基于PDO封装了数据库操作类。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
复制代码<?php
//header('content-type:text/html;charset=utf-8');
class PdoMySQL{
public static $config=array();//设置连接参数,配置信息
public static $link=null;//保存连接标识符
public static $pconnect=false;//是否开启长连接
public static $dbVersion=null;//保存数据库版本
public static $connected=false;//是否连接成功
public static $PDOStatement=null;//保存PDOStatement对象
public static $queryStr=null;//保存最后执行的操作
public static $error=null;//报错错误信息
public static $lastInsertId=null;//保存上一步插入操作产生AUTO_INCREMENT
public static $numRows=0;//上一步操作产生受影响的记录的条数
/**
* 连接PDO
* @param string $dbConfig
* @return boolean
*/
public function __construct($dbConfig=''){
if(!class_exists("PDO")){
self::throw_exception('不支持PDO,请先开启');
}
if(!is_array($dbConfig)){
$dbConfig=array(
'hostname'=>DB_HOST,
'username'=>DB_USER,
'password'=>DB_PWD,
'database'=>DB_NAME,
'hostport'=>DB_PORT,
'dbms'=>DB_TYPE,
'dsn'=>DB_TYPE.":host=".DB_HOST.";dbname=".DB_NAME
);
}
if(empty($dbConfig['hostname']))self::throw_exception('没有定义数据库配置,请先定义');
self::$config=$dbConfig;
if(empty(self::$config['params']))self::$config['params']=array();
if(!isset(self::$link)){
$configs=self::$config;
if(self::$pconnect){
//开启长连接,添加到配置数组中
$configs['params'][constant("PDO::ATTR_PERSISTENT")]=true;
}
try{
self::$link=new PDO($configs['dsn'],$configs['username'],$configs['password'],$configs['params']);
}catch(PDOException $e){
self::throw_exception($e->getMessage());
}
if(!self::$link){
self::throw_exception('PDO连接错误');
return false;
}
self::$link->exec('SET NAMES '.DB_CHARSET);
self::$dbVersion=self::$link->getAttribute(constant("PDO::ATTR_SERVER_VERSION"));
self::$connected=true;
unset($configs);
}
}
/**
* 得到所有记录
* @param string $sql
* @return unknown
*/
public static function getAll($sql=null){
if($sql!=null){
self::query($sql);
}
$result=self::$PDOStatement->fetchAll(constant("PDO::FETCH_ASSOC"));
return $result;
}
/**
* 得到结果集中的一条记录
* @param string $sql
* @return mixed
*/
public static function getRow($sql=null){
if($sql!=null){
self::query($sql);
}
$result=self::$PDOStatement->fetch(constant("PDO::FETCH_ASSOC"));
return $result;
}
/**
* 根据主键查找记录
* @param string $tabName
* @param int $priId
* @param string $fields
* @return mixed
*/
public static function findById($tabName,$priId,$fields='*'){
$sql='SELECT %s FROM %s WHERE id=%d';
return self::getRow(sprintf($sql,self::parseFields($fields),$tabName,$priId));
}
/**
* 执行普通查询
* @param unknown $tables
* @param string $where
* @param string $fields
* @param string $group
* @param string $having
* @param string $order
* @param string $limit
* @return Ambigous <unknown, unknown, multitype:>
*/
public static function find($tables,$where=null,$fields='*',$group=null,$having=null,$order=null,$limit=null){
$sql='SELECT '.self::parseFields($fields).' FROM '.'`'.$tables.'`'
.self::parseWhere($where)
.self::parseGroup($group)
.self::parseHaving($having)
.self::parseOrder($order)
.self::parseLimit($limit);
// echo $sql;
$dataAll=self::getAll($sql);
// return count($dataAll)==1?$dataAll[0]:$dataAll;
return $dataAll;
}
/**
* 添加记录的操作
* @param array $data
* @param string $table
* @return Ambigous <boolean, unknown, number>
*/
public static function add($data,$table){
$keys=array_keys($data);
array_walk($keys,array('PdoMySQL','addSpecialChar'));
$fieldsStr=join(',',$keys);
$values="'".join("','",array_values($data))."'";
$sql="INSERT INTO `{$table}`({$fieldsStr}) VALUES({$values})";
// echo $sql;
return self::execute($sql);
}
/**
* 更新记录
* @param array $data
* @param string $table
* @param string $where
* @param string $order
* @param string $limit
* @return Ambigous <boolean, unknown, number>
*/
public static function update($data,$table,$where=null,$order=null,$limit=0){
foreach($data as $key=>$val){
$sets.="`".$key."`='".$val."',";
}
$sets=rtrim($sets,',');
$sql="UPDATE {$table} SET {$sets} ".self::parseWhere($where).self::parseOrder($order).self::parseLimit($limit);
// echo $sql;
return self::execute($sql);
}
/**
* 删除记录的操作
* @param string $table
* @param string $where
* @param string $order
* @param number $limit
* @return Ambigous <boolean, unknown, number>
*/
public static function delete($table,$where=null,$order=null,$limit=0){
$sql="DELETE FROM `{$table}` ".self::parseWhere($where).self::parseOrder($order).self::parseLimit($limit);
return self::execute($sql);
}
/**
*
*/
public static function truncate($table){
$sql="TRUNCATE TABLE `{$table}` ";
return self::execute($sql);
}
/**
* 得到最后执行的SQL语句
* @return boolean|Ambigous <string, string>
*/
public static function getLastSql(){
$link=self::$link;
if(!$link)return false;
return self::$queryStr;
}
/**
* 得到上一步插入操作产生AUTO_INCREMENT
* @return boolean|string
*/
public static function getLastInsertId(){
$link=self::$link;
if(!$link)return false;
return self::$lastInsertId;
}
/**
* 得到数据库的版本
* @return boolean|mixed
*/
public static function getDbVerion(){
$link=self::$link;
if(!$link)return false;
return self::$dbVersion;
}
/**
* 得到数据库中数据表
* @return multitype:mixed
*/
public static function showTables(){
$tables=array();
if(self::query("SHOW TABLES")){
$result=self::getAll();
foreach($result as $key=>$val){
$tables[$key]=current($val);
}
}
return $tables;
}
/**
* 解析Where条件
* @param unknown $where
* @return string
*/
public static function parseWhere($where){
$whereStr='';
if(is_string($where)&&!empty($where)){
$whereStr=$where;
}
return empty($whereStr)?'':' WHERE '.$whereStr;
}
/**
* 解析group by
* @param unknown $group
* @return string
*/
public static function parseGroup($group){
$groupStr='';
if(is_array($group)){
$groupStr.=' GROUP BY '.implode(',',$group);
}elseif(is_string($group)&&!empty($group)){
$groupStr.=' GROUP BY '.$group;
}
return empty($groupStr)?'':$groupStr;
}
/**
* 对分组结果通过Having子句进行二次删选
* @param unknown $having
* @return string
*/
public static function parseHaving($having){
$havingStr='';
if(is_string($having)&&!empty($having)){
$havingStr.=' HAVING '.$having;
}
return $havingStr;
}
/**
* 解析Order by
* @param unknown $order
* @return string
*/
public static function parseOrder($order){
$orderStr='';
if(is_array($order)){
$orderStr.=' ORDER BY '.join(',',$order);
}elseif(is_string($order)&&!empty($order)){
$orderStr.=' ORDER BY '.$order;
}
return $orderStr;
}
/**
* 解析限制显示条数limit
* limit 3
* limit 0,3
* @param unknown $limit
* @return unknown
*/
public static function parseLimit($limit){
$limitStr='';
if(is_array($limit)){
if(count($limit)>1){
$limitStr.=' LIMIT '.$limit[0].','.$limit[1];
}else{
$limitStr.=' LIMIT '.$limit[0];
}
}elseif(is_string($limit)&&!empty($limit)){
$limitStr.=' LIMIT '.$limit;
}
return $limitStr;
}
/**
* 解析字段
* @param unknown $fields
* @return string
*/
public static function parseFields($fields){
if(is_array($fields)){
array_walk($fields,array('PdoMySQL','addSpecialChar'));
$fieldsStr=implode(',',$fields);
}elseif(is_string($fields)&&!empty($fields)){
if(strpos($fields,'`')===false){
$fields=explode(',',$fields);
array_walk($fields,array('PdoMySQL','addSpecialChar'));
$fieldsStr=implode(',',$fields);
}else{
$fieldsStr=$fields;
}
}else{
$fieldsStr='*';
}
return $fieldsStr;
}
/**
* 通过反引号引用字段,
* @param unknown $value
* @return string
*/
public static function addSpecialChar(&$value){
if($value==='*'||strpos($value,'.')!==false||strpos($value,'`')!==false){
//不用做处理
}elseif(strpos($value,'`')===false){
$value='`'.trim($value).'`';
}
return $value;
}
/**
* 执行增删改操作,返回受影响的记录的条数
* @param string $sql
* @return boolean|unknown
*/
public static function execute($sql=null){
$link=self::$link;
if(!$link) return false;
self::$queryStr=$sql;
if(!empty(self::$PDOStatement))self::free();
$result=$link->exec(self::$queryStr);
self::haveErrorThrowException();
if($result || $result==0){
self::$lastInsertId=$link->lastInsertId();
self::$numRows=$result;
return self::$numRows;
}else{
return false;
}
}
/**
释放结果集
*/
public static function free(){
self::$PDOStatement=null;
}
public static function query($sql=''){
$link=self::$link;
if(!$link) return false;
//判断之前是否有结果集,如果有的话,释放结果集
if(!empty(self::$PDOStatement))self::free();
self::$queryStr=$sql;
self::$PDOStatement=$link->prepare(self::$queryStr);
$res=self::$PDOStatement->execute();
self::haveErrorThrowException();
return $res;
}
public static function haveErrorThrowException(){
$obj=empty(self::$PDOStatement)?self::$link: self::$PDOStatement;
$arrError=$obj->errorInfo();
//print_r($arrError);
if($arrError[0]!='00000'){
self::$error='SQLSTATE: '.$arrError[0].' <br/>SQL Error: '.$arrError[2].'<br/>Error SQL:'.self::$queryStr;
self::throw_exception(self::$error);
return false;
}
if(self::$queryStr==''){
self::throw_exception('没有执行SQL语句');
return false;
}
}
/**
* 自定义错误处理
* @param unknown $errMsg
*/
public static function throw_exception($errMsg){
echo '<div style="width:80%;background-color:#ABCDEF;color:black;font-size:20px;padding:20px 0px;">
'.$errMsg.'
</div>';
}
/**
* 销毁连接对象,关闭数据库
*/
public static function close(){
self::$link=null;
}
public static function search($table1,$table2,$table3){
$PdoMySQL=new PdoMySQL();
$sql1="select * from ".$table1." where status='0'";
$sql2="select * from ".$table2." where status='0'";
$sql3="select * from ".$table3." where status='0'";
$stmt1=$PdoMySQL->prepare($sql1);
$stmt2=$PdoMySQL->prepare($sql2);
$stmt3=$PdoMySQL->prepare($sql3);
$stmt1->execute();
$stmt2->execute();
$stmt3->execute();
//echo $username.'<hr/>'.$password;
$row1=$stmt1->fetch();
echo $row1[0];
}
}
?>

最后还有一个超级大BOSS,一个公司企业官网,功能强大,有接口封装类、图片上传(单图、多图)、前端 Bootstrap、REST风格的接口。

企业官方网站Pro版

传送门

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

面试必问的volatile,你了解多少?

发表于 2018-01-25

占小狼 转载请注明原创出处,谢谢!

前言

Java中volatile这个热门的关键字,在面试中经常会被提及,在各种技术交流群中也经常被讨论,但似乎讨论不出一个完美的结果,带着种种疑惑,准备从JVM、C++、汇编的角度重新梳理一遍。

volatile的两大特性:禁止重排序、内存可见性,这两个概念,不太清楚的同学可以看这篇文章 -> java volatile关键字解惑

概念是知道了,但还是很迷糊,它们到底是如何实现的?

本文会涉及到一些汇编方面的内容,如果多看几遍,应该能看懂。

重排序

为了理解重排序,先看一段简单的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
复制代码public class VolatileTest {

int a = 0;
int b = 0;

public void set() {
a = 1;
b = 1;
}

public void loop() {
while (b == 0) continue;
if (a == 1) {
System.out.println("i'm here");
} else {
System.out.println("what's wrong");
}
}
}

VolatileTest类有两个方法,分别是set()和loop(),假设线程B执行loop方法,线程A执行set方法,会得到什么结果?

答案是不确定,因为这里涉及到了编译器的重排序和CPU指令的重排序。

编译器重排序

编译器在不改变单线程语义的前提下,为了提高程序的运行速度,可以对字节码指令进行重新排序,所以代码中a、b的赋值顺序,被编译之后可能就变成了先设置b,再设置a。

因为对于线程A来说,先设置哪个,都不影响自身的结果。

CPU指令重排序

CPU指令重排序又是怎么回事?
在深入理解之前,先看看x86的cpu缓存结构。

1、各种寄存器,用来存储本地变量和函数参数,访问一次需要1cycle,耗时小于1ns;
2、L1 Cache,一级缓存,本地core的缓存,分成32K的数据缓存L1d和32k指令缓存L1i,访问L1需要3cycles,耗时大约1ns;
3、L2 Cache,二级缓存,本地core的缓存,被设计为L1缓存与共享的L3缓存之间的缓冲,大小为256K,访问L2需要12cycles,耗时大约3ns;
4、L3 Cache,三级缓存,在同插槽的所有core共享L3缓存,分为多个2M的段,访问L3需要38cycles,耗时大约12ns;

当然了,还有平时熟知的DRAM,访问内存一般需要65ns,所以CPU访问一次内存和缓存比较起来显得很慢。

对于不同插槽的CPU,L1和L2的数据并不共享,一般通过MESI协议保证Cache的一致性,但需要付出代价。

在MESI协议中,每个Cache line有4种状态,分别是:

1、M(Modified)
这行数据有效,但是被修改了,和内存中的数据不一致,数据只存在于本Cache中

2、E(Exclusive)
这行数据有效,和内存中的数据一致,数据只存在于本Cache中

3、S(Shared)
这行数据有效,和内存中的数据一致,数据分布在很多Cache中

4、I(Invalid)
这行数据无效

每个Core的Cache控制器不仅知道自己的读写操作,也监听其它Cache的读写操作,假如有4个Core:
1、Core1从内存中加载了变量X,值为10,这时Core1中缓存变量X的cache line的状态是E;
2、Core2也从内存中加载了变量X,这时Core1和Core2缓存变量X的cache line状态转化成S;
3、Core3也从内存中加载了变量X,然后把X设置成了20,这时Core3中缓存变量X的cache line状态转化成M,其它Core对应的cache line变成I(无效)

当然了,不同的处理器内部细节也是不一样的,比如Intel的core i7处理器使用从MESI中演化出的MESIF协议,F(Forward)从Share中演化而来,一个cache line如果是F状态,可以把数据直接传给其它内核,这里就不纠结了。

CPU在cache line状态的转化期间是阻塞的,经过长时间的优化,在寄存器和L1缓存之间添加了LoadBuffer、StoreBuffer来降低阻塞时间,LoadBuffer、StoreBuffer,合称排序缓冲(Memoryordering Buffers (MOB)),Load缓冲64长度,store缓冲36长度,Buffer与L1进行数据传输时,CPU无须等待。

1、CPU执行load读数据时,把读请求放到LoadBuffer,这样就不用等待其它CPU响应,先进行下面操作,稍后再处理这个读请求的结果。
2、CPU执行store写数据时,把数据写到StoreBuffer中,待到某个适合的时间点,把StoreBuffer的数据刷到主存中。

因为StoreBuffer的存在,CPU在写数据时,真实数据并不会立即表现到内存中,所以对于其它CPU是不可见的;同样的道理,LoadBuffer中的请求也无法拿到其它CPU设置的最新数据;

由于StoreBuffer和LoadBuffer是异步执行的,所以在外面看来,先写后读,还是先读后写,没有严格的固定顺序。

内存可见性如何实现

从上面的分析可以看出,其实是CPU执行load、store数据时的异步性,造成了不同CPU之间的内存不可见,那么如何做到CPU在load的时候可以拿到最新数据呢?

设置volatile变量

写一段简单的java代码,声明一个volatile变量,并赋值

1
2
3
4
5
6
7
8
复制代码public class VolatileTest {

static volatile int i;

public static void main(String[] args){
i = 10;
}
}

这段代码本身没什么意义,只是想看看加了volatile之后,编译出来的字节码有什么不同,执行 javap -verbose VolatileTest 之后,结果如下:

让人很失望,没有找类似关键字synchronize编译之后的字节码指令(monitorenter、monitorexit),volatile编译之后的赋值指令putstatic没有什么不同,唯一不同是变量i的修饰flags多了一个ACC_VOLATILE标识。

不过,我觉得可以从这个标识入手,先全局搜下ACC_VOLATILE,无从下手的时候,先看看关键字在哪里被使用了,果然在accessFlags.hpp文件中找到类似的名字。

通过is_volatile()可以判断一个变量是否被volatile修饰,然后再全局搜”is_volatile”被使用的地方,最后在bytecodeInterpreter.cpp文件中,找到putstatic字节码指令的解释器实现,里面有is_volatile()方法。

当然了,在正常执行时,并不会走这段逻辑,都是直接执行字节码对应的机器码指令,这段代码可以在debug的时候使用,不过最终逻辑是一样的。

其中cache变量是java代码中变量i在常量池缓存中的一个实例,因为变量i被volatile修饰,所以cache->is_volatile()为真,给变量i的赋值操作由release_int_field_put方法实现。

再来看看release_int_field_put方法

内部的赋值动作被包了一层,OrderAccess::release_store究竟做了魔法,可以让其它线程读到变量i的最新值。

奇怪,在OrderAccess::release_store的实现中,第一个参数强制加了一个volatile,很明显,这是c/c++的关键字。

c/c++中的volatile关键字,用来修饰变量,通常用于语言级别的 memory barrier,在”The C++ Programming Language”中,对volatile的描述如下:

A volatile specifier is a hint to a compiler that an object may change its value in ways not specified by the language so that aggressive optimizations must be avoided.

volatile是一种类型修饰符,被volatile声明的变量表示随时可能发生变化,每次使用时,都必须从变量i对应的内存地址读取,编译器对操作该变量的代码不再进行优化,下面写两段简单的c/c++代码验证一下

1
2
3
4
5
6
7
8
9
10
11
复制代码#include <iostream>

int foo = 10;
int a = 1;
int main(int argc, const char * argv[]) {
// insert code here...
a = 2;
a = foo + 10;
int b = a + 20;
return b;
}

代码中的变量i其实是无效的,执行g++ -S -O2 main.cpp得到编译之后的汇编代码如下:

可以发现,在生成的汇编代码中,对变量a的一些无效负责操作果然都被优化掉了,如果在声明变量a时加上volatile

1
2
3
4
5
6
7
8
9
10
11
复制代码#include <iostream>

int foo = 10;
volatile int a = 1;
int main(int argc, const char * argv[]) {
// insert code here...
a = 2;
a = foo + 10;
int b = a + 20;
return b;
}

再次生成汇编代码如下:

和第一次比较,有以下不同:

1、对变量a赋值2的语句,也保留了下来,虽然是无效的动作,所以volatile关键字可以禁止指令优化,其实这里发挥了编译器屏障的作用;

编译器屏障可以避免编译器优化带来的内存乱序访问的问题,也可以手动在代码中插入编译器屏障,比如下面的代码和加volatile关键字之后的效果是一样

1
2
3
4
5
6
7
8
9
10
11
12
13
复制代码#include <iostream>

int foo = 10;
int a = 1;
int main(int argc, const char * argv[]) {
// insert code here...
a = 2;
__asm__ volatile ("" : : : "memory"); //编译器屏障
a = foo + 10;
__asm__ volatile ("" : : : "memory");
int b = a + 20;
return b;
}

编译之后,和上面类似

2、其中_a(%rip)是变量a的每次地址,通过movl $2, _a(%rip)可以把变量a所在的内存设置成2,关于RIP,可以查看 x64下PIC的新寻址方式:RIP相对寻址

所以,每次对变量a的赋值,都会写入到内存中;每次对变量的读取,都会从内存中重新加载。

感觉有点跑偏了,让我们回到JVM的代码中来。

执行完赋值操作后,紧接着执行OrderAccess::storeload(),这又是啥?

其实这就是经常会念叨的内存屏障,之前只知道念,却不知道是如何实现的。从CPU缓存结构分析中已经知道:一个load操作需要进入LoadBuffer,然后再去内存加载;一个store操作需要进入StoreBuffer,然后再写入缓存,这两个操作都是异步的,会导致不正确的指令重排序,所以在JVM中定义了一系列的内存屏障来指定指令的执行顺序。

JVM中定义的内存屏障如下,JDK1.7的实现

1、loadload屏障(load1,loadload, load2)
2、loadstore屏障(load,loadstore, store)

这两个屏障都通过acquire()方法实现

其中__asm__,表示汇编代码的开始。
volatile,之前分析过了,禁止编译器对代码进行优化。
把这段指令编译之后,发现没有看懂….最后的”memory”是编译器屏障的作用。

在LoadBuffer中插入该屏障,清空屏障之前的load操作,然后才能执行屏障之后的操作,可以保证load操作的数据在下个store指令之前准备好

3、storestore屏障(store1,storestore, store2)
通过”release()”方法实现:

在StoreBuffer中插入该屏障,清空屏障之前的store操作,然后才能执行屏障之后的store操作,保证store1写入的数据在执行store2时对其它CPU可见。

4、storeload屏障(store,storeload, load)
对java中的volatile变量进行赋值之后,插入的就是这个屏障,通过”fence()”方法实现:

看到这个有没有很兴奋?

通过os::is_MP()先判断是不是多核,如果只有一个CPU的话,就不存在这些问题了。

storeload屏障,完全由下面这些指令实现

1
复制代码__asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");

为了试验这些指令到底有什么用,我们再写点c++代码编译一下

1
2
3
4
5
6
7
8
9
10
11
12
复制代码#include <iostream>

int foo = 10;

int main(int argc, const char * argv[]) {
// insert code here...
volatile int a = foo + 10;
// __asm__ volatile ("lock; addl $0,0(%%rsp)" : : : "cc", "memory");
volatile int b = foo + 20;

return 0;
}

为了变量a和b不被编译器优化掉,这里使用了volatile进行修饰,编译后的汇编指令如下:

从编译后的代码可以发现,第二次使用foo变量时,没有从内存重新加载,使用了寄存器的值。

把__asm__ volatile ***指令加上之后重新编译

相比之前,这里多了两个指令,一个lock,一个addl。
lock指令的作用是:在执行lock后面指令时,会设置处理器的LOCK#信号(这个信号会锁定总线,阻止其它CPU通过总线访问内存,直到这些指令执行结束),这条指令的执行变成原子操作,之前的读写请求都不能越过lock指令进行重排,相当于一个内存屏障。

还有一个:第二次使用foo变量时,从内存中重新加载,保证可以拿到foo变量的最新值,这是由如下指令实现

1
复制代码__asm__ volatile ( : : : "cc", "memory");

同样是编译器屏障,通知编译器重新生成加载指令(不可以从缓存寄存器中取)。

读取volatile变量

同样在bytecodeInterpreter.cpp文件中,找到getstatic字节码指令的解释器实现。

通过obj->obj_field_acquire(field_offset)获取变量值

最终通过OrderAccess::load_acquire实现

1
复制代码inline jint OrderAccess::load_acquire(volatile jint* p) { return *p; }

底层基于C++的volatile实现,因为volatile自带了编译器屏障的功能,总能拿到内存中的最新值。

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

妖娆的代理工具shadowProxy – 神出鬼没的切换IP

发表于 2018-01-24

前言

在渗透测试过程中,往往会遇到特别“小气”的目标,稍微碰一下就封IP。这种情况下,我们很自然的想到通过网上大量的免费代理进行IP隐匿。

那么问题来了,难道拿到哪些个代理,每用一次手动换下一个代理? 这太像火铳的工作方式了,想想就心累了。

so,小弟就来造一台机关枪,突突突突突… 想想就挺带感。

github.com/odboy/shado…

功能实现

代理功能

主要使用python内建的http.server和http.client库实现。

http.server相关的代码解读可参考我前一篇文章 Python源码分析之从SocketServer到SimpleHTTPServer

主要代理功能代码:

def do_GET(self):

if self.path == 'http://shadow.proxy/':  
    self.send\_cacert()  
    # print("%s download %s" % (self.client\_address, self.cacert))  

  return  

req = self  
content\_length = int(req.headers.get('Content-Length', 0))  
req\_body = self.rfile.read(content\_length) if content\_length else None  

if req.path[0] == ‘/‘:
if isinstance(self.connection, ssl.SSLSocket): # ssl.SSLSocket or ssl.SSLContext
req.path = “https://%s%s” % (req.headers[‘Host’], req.path)

  else:  
        req.path = "http://%s%s" % (req.headers['Host'], req.path)  

u = urlparse(req.path)  
scheme, netloc= u.scheme, u.netloc  

assert scheme in (“http”, “https”)
if netloc:
req.headers[‘Host’] = netloc
setattr(req, ‘headers’, self.filter_headers(req.headers))

retryFlag = 0  

while retryFlag < 10 :
try:
target = (scheme, netloc)
# 输入URL的协议和主机,返回可用的连接HTTP(S)Connection

proxy = proxyCoor.dispatchProxy(target)  
      if proxy is None:  
          print("未能获取到可用Proxy...(可能是Proxy耗尽...)")  

    self.send\_error(502,"proxy resource RUN OUT!!!")  
          return  
      print("%s --> [ %d ] %s" % (proxy, retryFlag +

1, req.path))

        if proxy.split("://")[0] == "http":  
            conn = http.client.HTTPConnection(proxy.split("://")[1], timeout=self.timeout)  

      elif proxy.split("://")[0] == "https":  
            conn = http.client.HTTPSConnection(proxy.split("://")[1], timeout=self.timeout)  


conn.request(self.command, req.path, req\_body, dict(req.headers))  
        res = conn.getresponse()  
        # res.response\_version = 'HTTP/1.1' if res.version

== 11 else ‘HTTP/1.0’
res_body = res.read() # Transfer-Encoding并不需要特殊处理(除了Content-Length外)

    except Exception as e:  

retryFlag += 1  
        # self.send\_error(502)  
        # return  
    else:  
        try:  

          if 'Content-Length' not in res.headers:  
                res.headers['Content-Length'] = str(len(res\_body))  

  setattr(res, 'headers', self.filter\_headers(res.headers))  
            self.send\_response\_only(res.status, res.reason)  

for keyword in res.headers:
self.send_header(keyword, res.headers.get(keyword, “”))
self.end_headers()

        self.wfile.write(res\_body)  
          self.wfile.flush()  
      except:  

pass  
      finally:  
          retryFlag = 9999  # 极大值,结束重试。  
          conn.close()  

其他方法重用GET的方法。

do_HEAD = do_GET
do_POST = do_GET
do_PUT = do_GET
do_DELETE = do_GET
do_OPTIONS = do_GET
do_TRACE = do_GET

代理协调者

主要实现:

导入代理列表

验证代理的可用性和匿名性

维护目标站点、代理列表二维表

根据维护的二维表,反馈可用的代理地址。

另外,我用的代理列表是从kuaidaili.com上爬取的,但代理的质量比较差,很头大。之前还用过xicidaili,情况也差不多。

验证公网IP的网站有如下几个:

ip.chinaz.com/getip.aspx

ifconfig.me/ip

api.ipify.org

ip.seeip.org

ifconfig.co/ip

myexternalip.com/raw

wtfismyip.com/text

icanhazip.com/

ipv4bot.whatismyipaddress.com/

ip4.seeip.org

测试验证

验证代码

透过shadowProxy访问http://ip.chinaz.com/getip.aspx,从而直观查看代理效果。

import requests
import time

i = 0
while True:
try:
i += 1
r =requests.get(“http://ip.chinaz.com/getip.aspx",proxies={"http":"http://127.0.0.1:8088"},timeout=10)
if r.status_code == 200:
msg = “第 %d 次请求 ✅)\t%s”%(i,r.text)
else:
msg = “第 %d 次请求 ⭕\t%d”% (i, r.status_code)

    time.sleep(2)  
except KeyboardInterrupt:  
    print('\r\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\n\t用户中断\t\n\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*\*')  
    break  
except

Exception as e:
msg = “第 %d 次请求 ❗\t%s” % (i, e )
time.sleep(2)

finally:
print(msg)

效果展示

动图展示 : test_shadowProxy.gif


枪是好枪,但还是存在一些问题的。

缺弹少药 - 通过工具爬取到的代理很多重复,很多不可用,只有百八十个。

弹药质量差 - 获取到的代理,很多无法传输大数据包(中断),小包也不稳定。

机枪卡壳 - 由于上述问题,所以工具容错能力/重试功能有待提升。(后续考虑提升的点)

PS:后续代码完善后,可以考虑开源发布。

2018-01-10 Update

目前代理加入了自动重试功能,使其能更稳定的进行查询。

同时,找了个还算不错的proxylist。 github.com/fate0/proxy…

目前便可以比较顺畅的使用了:

开源发布

github.com/odboy/shado…

生成&安装证书

生成证书

shadowProxy git:(master) ✗ ll
total 112
-rw-r–r– 1 bingo staff 573B Jan 10 16:42 PCtest.py
-rw-r–r– 1 bingo staff 5.9K Jan 10 16:42 ProxyCoordinator.py
-rw-r–r– 1 bingo staff 14B Jan 10 16:42 README.md
drwxr-xr-x 3 bingo staff 96B Jan 10 16:42 __pycache__
-rw-r–r– 1 bingo staff
100B Jan 10 16:42 proxylist-4.txt
-rw-r–r– 1 bingo staff 19K Jan 10 16:42 proxylist.txt
-rwxr-xr-x 1 bingo staff 302B Jan 10 16:42 setup_https_intercept.sh

-rw-r–r– 1 bingo staff 11K Jan 10 16:42 shadowProxy.py

shadowProxy git:(master) ✗ ./setup\_https\_intercept.sh   # 直接运行脚本,生成根证书  
Generating RSA private key, 2048 bit long

modulus
……………………………………………………………………………………………………………………….+++
…………………………………………………………+++
e is 65537 (0x10001)
Generating RSA private key, 2048 bit long modulus
…………………………………………………………………………………………………………………….+++
……………….+++
e is 65537 (0x10001)
安装证书

在浏览器设置代理,指向 http://127.0.0.1:8088 , 然后访问 shadow.proxy/,即可弹出证书安装。(…)


安装后,可以访问https网站。

使用测试

➜ shadowProxy git:(master) ✗ python shadowProxy.py -h
.–.
|o_o | ——————
|:_/ | < Author: Mr.Bingo >
// \ \ ——————
(| | ) < oddboy.cn >
/‘\_ _/`\ ——————
\\_)=(___/

usage:
shadowProxy.py [-h] [–bind BIND] [–port PORT]
[–log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}]

[–proxyListFile PROXYLISTFILE]

optional arguments:
-h, –help show this help message and exit
–bind BIND Default: 0.0.0.0

–port PORT Default: 8088
–log-level {DEBUG,INFO,WARNING,ERROR,CRITICAL}
Default: WARNING
–proxyListFile
PROXYLISTFILE
代理列表文件

➜ shadowProxy git:(master) ✗ python shadowProxy.py
.–.
|o_o | ——————
|:_/ | < Author: Mr.Bingo >
// \ \ ——————

(|     | ) <    oddboy.cn     >  
 /'\\_   \_/`\  ------------------  
 \\_\_\_)=(\_\_\_/  

初始化代理池 本地IP :: 36.110.16.74
导入代理池::: proxylist.txt
成功导入 110 个代理
Serving HTTP on 0.0.0.0 port 8088 (http://0.0.0.0:8088/) …
直接访问站点进行测试。


由于该工具主要基于网上免费的代理进行IP隐匿的,所以稳定性仍然不够好,所以只建议用于特定的请求包测试。 在使用过程中遇到什么问题,欢迎给我邮件,我会进行修复完善,如果可以,给我的GitHub点颗星星,谢谢!

本站文章均属原创,转载请注明出处!

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

问题备忘 httpclient连接池异常引发的惨案

发表于 2018-01-24
  1. 问题描述

客户端A –> Ngnix –> 服务B Ngnix做服务B的负载,客户端访问服务B时,客户端偶尔会有抛出TimeoutException异常。

举个例子:如A在09:59:48访问B,则服务B在09:59:53收到请求,并成功执行业务并返回。但是A会在10:00:05左右抛出TimeoutException。此时客户端A认为本次调用失败,然后走失败的业务逻辑。但是查找服务端的日志,发现实际业务在服务B上正常执行了,并正常返回。这样出现客户端和服务端两边数据不一致的问题。

这个问题是难点:

  1. 两边是通过公网调用,公网网络的复杂性会导致问题更复杂
  2. 无法直接定位是服务端还是客户端的问题
  3. 两边的程序不是同一个所写,由不同人维护。自己维护服务端代码,客户端代码是其它人维护
  4. 需要理解TCP/IP的通信协议
  5. 需要理解httplclent和ngnix的超时配置相关的知识

下面我按照以下顺序一一排查问题:

  1. 网络问题
  2. 超时配置参数的问题
  3. GC的问题
  4. 使用tcpdump抓包,分析网络包
  5. 修正代码问题
  6. 上线验证
  1. 问题分析处理的过程

2.1. 网络问题

由于客户端A和服务B是使用公网访问,最开始认为是网络抖动引起,并没有马上处理。但是运行一段时间后且这段时间服务器的流量不是很大,这个问题仍然每天不定时出现,所以猜测可能不是网络的问题。

2.2. 超时配置参数的问题

又猜测可能是超时参数配置的问题。整理服务端和客户端配置如下:
客户端httpclient
客户端httpclient的关于连接的配置相关的参数和意义如下:

  • SocketTimeout 是 5s

    • 连接建立时间,即三次握手完成时间
  • ConnectTimeout 是 3s

    • 连接建立后,数据传输过程中数据包之间间隔的最大时间
  • ConnectionRequestTimeout 是默认值

    • httpclient使用连接池来管理连接,这个时间就是从连接池获取连接的超时时间

这3个属性的关系如下:下图来自网络
这里写图片描述
虽然报文(“abc”)返回总共用了6秒,如果SocketTimeout设置成5秒,实际程序执行的时候是不会抛出java.net.SocketTimeoutException:
Read timed out异常的。
因为SocketTimeout的值表示的是“a”、”b”、”c”这三个报文,每两个相邻的报文的间隔时间没有能超过SocketTimeout。

ngnix端
ngnix端的超时相关的配置如下

  1. keepalive_timeout 10;
  2. send_timeout 60;

经过对以上配置参数的深入理解,再加上即使异常的请求服务端也有收到请求并正常执行业务,且每个请求处理完毕的时间都在100ms左右。总之即使以上的配置参数异常导致超时也跟这个问题的现象不同,所以初步排除配置异常的问题

2.3. GC的问题

请教组内的同事,可能是GC的问题,可能是GC的关系使服务暂停运行没有及时处理业务
在服务端B执行如下命令,发现没有执行过FGC,YGC的时间也是在合理范围内
jstat -gc pid 2000 20
这里写图片描述

在客户端A执行如下命令,发现其频繁执行FCG,YGC
jstat -gc pid 2000 20
使用”jmap -heap pid” 查看客户端A的堆的分配情况,发现其内存分配非常小,老年期的已用空间在96%以上。
综上,我们认为是在httpclient建立连接后,堆内存不足引发频繁的FGC,使得httpclient的无法在及时将数据发送请求到服务端,偶尔出现在5s临界点才发送数据到服务端成功
修改堆内存后,GC的问题解决了,但是很遗憾,这个问题没有被修正。

2.4. 使用tcpdump抓包,分析网络包

最后使出大招,在服务端172.23.4.33端使用tcpdump抓包,内容如下:
这里写图片描述

蓝色框的部分:之前有一个正常的请求,通信完毕后10s,ngnix由于keep-alive=10s时间到了,发送请求通知关闭连接 close notify信令。但是这里有个问题,这里服务端发送FIN信令,而客户端没有发送FIN信令,不符合标准的TCP的四次挥手协议。即连接在服务端已经关闭,而客户端没有关闭。

这里的Alert (level warning description close notify):表示发送方会关闭这个连接,不会在这个连接上发送任何数据

红色框的部分
红色的部分和黄色的部分正好相隔5s:

红色的部分的包的时间,和我们定位的异常请求开始的时间相同,这里的操作可以理解为客户端和服务端相互关闭链接。加上这里的端口和上面蓝色部分的包的端口组相同,所有这两部分是对相同的连接执行操作,这里就比较奇怪。为什么数据传输完毕后,要经过19s才执行连接关闭推行。貌似这个关闭操作是由下一个请求触发的。

后面阅读httpclient的源码发现,httpclient连接池在执行新的请求,如果发现连接异常时,会调用releaseconnection操作,会先执行释放之前连接操作

黄色框的部分
和红色框的部分正好相隔5s
这里是通过3次握手建立连接,然后再执行https进行加密传输。对https的后面的Application Data的解密我们发现这个的确是客户端发送到服务端的数据。现象好像是第一次请求执行失败,httpclient重新发起新的请求

所以以上的包和异常现象正好吻合,我们猜测如下: 客户端A在09:59:48想重用上一次使用的TCP连接,但是发现连接已经关闭。服务端发送RST信令,通话双方重置连接。但是不知道什么原因客户端没有马上重置连接,而在等待5s超时后,然后才向服务端重新建立新的连接,并发送数据,服务端收到数据并执行请求。但是此时客户端发现整个连接时间已经超过5s,抛出TimeoutException。

2.5. 检查代码问题

查看客户端代码,发现有两个问题 一是:httpclient设置(.setRetryHandler(new HttpRetryHandler(3)))重试3次,已知httpclient的connectiontime是最多是5s,但是future.get(5000, TimeUnit.MILLISECONDS)只等待5s,所有如果发生第一次请求失败,则这个肯定会失败。这里客户端的请求服务端的操作和处理返回结果是异步,整个完成请求最多需要20s,而对返回结果的处理是5s之内,如果5s内没有返回,则抛出TimeoutException。但是httpclient的请求并没有被中断,继续执行。这也解释了为什么请求在服务端正常执行,而客户端抛出TimeoutException异常。

1
2
3
4
5
6
7
8
9
10
11
复制代码// 定义httpClient 
httpClient = HttpClients.custom().setConnectionManager(connectionManager)
.setDefaultRequestConfig(defaultRequestConfig)
// 设置重试3次
.setRetryHandler(new HttpRetryHandler(3))
.setMaxConnPerRoute(500).build();
//利用future管理回调
Future<Object> future = executorService.submit(new CmdTask(sId, manageCenter.getService(provide), cmd, command.entry, provide, manageCenter.queryProvide(provide)));
//等待5秒超时
Object resp = future.get(5000, TimeUnit.MILLISECONDS);
Pair<Integer, String> pairResp = null;

二是:httpclient使用了PoolingHttpClientConnectionManager连接池,使用PoolingHttpClientConnectionManager需要注意,详细见官方文档:简单地说PoolingHttpClientConnectionManager里存储的连接,如果连接被服务器端关闭了,客户端监测不到连接的状态变化。在httpclient中,当连接空闲超过10s后,服务端会关闭本端连接。但是客户端的连接一直保持连接,即使服务端关闭连接,客户端也不会关闭连接。所以下次使用连接,程序从连接中获取一个连接(即使这个连接已经被服务端),也需要进行确认,如果发现连接异常,则服务端会发送RST信令,双方重新建立新的连接。

为了解决这个问题HttpClient会在使用某个连接前,监测这个连接是否已经过时,如果服务器端关闭了连接,那么会重现建立一个连接。但是这种过时检查并不是100%有效。所以建立创建一个监控进程来专门回收由于长时间不活动而被判定为失效的连接:

1
2
3
4
5
6
7
8
9
复制代码Timer timer = new Timer();
timer.schedule(new TimerTask() {
@Override
public void run() {
System.out.println("=====closeIdleConnections===");
connectionManager.closeExpiredConnections();
connectionManager.closeIdleConnections(5, TimeUnit.SECONDS);
}
}, 0, 5 * 1000);

PoolingHttpClientConnectionManager里异常连接没有及时释放,这也解释了之前的抓包中为什么每次请求会先使用旧的连接,发现连接关闭后,又重新建立新的连接的现象。

修改代码:

  1. 创建定时任务关闭PoolingHttpClientConnectionManager的异常连接,释放连接和连接相关的资源
  2. 关闭重试操作

2.6 修改代码后上线验证

修改代码,上线,观察4天,问题没有再出现,捕获新的包查看新的双交互包
这里写图片描述

红框部分是:客户端和服务端有正常的请求
黄框部分是:之前的连接使用后,服务端过10s释放连接,是因为ngnix的keepalive的时间是10s,172.23.4.43通知106.2.33.40关闭请求,106.2.33.40发送FIN,ACK信令到172.23.4.43,表示自己已经连接,并通知对方关闭连接. 172.23.4.43收到信令后,关闭连接,并发送FIN,ACK信令,
以上是完整的连接断开四次挥手,整个会话连接完全关闭。

绿框部分:客户端和服务端发启新的请求,此时第一步是客户端和服务端是三次握手建立建立

本文转载自: 掘金

开发者博客 – 和开发相关的 这里全都有

1…899900901…956

开发者博客

9558 日志
1953 标签
RSS
© 2025 开发者博客
本站总访问量次
由 Hexo 强力驱动
|
主题 — NexT.Muse v5.1.4
0%