迈出spring5源码阅读第一步,如何编译源码并使用IDEA打开

一、下载源码

这里使用的是5.2.8.RELEASE版本!

1
git clone --branch v5.2.8.RELEASE https://gitee.com/Z201/spring-framework.git

二、安装gradle

这里使用的是6.0.1版本!

三、修改源码下的文件内容

3.1 修改settings.gradle文件

加入阿里云的地址

1
2
3
4
5
repositories {
gradlePluginPortal()
maven { url 'https://maven.aliyun.com/repository/public' }
maven { url 'https://repo.spring.io/plugins-release' }
}

3.2 修改gradle.properties文件

1
2
3
4
5
6
version=5.2.8.RELEASE
org.gradle.jvmargs=-Xmx2048M
org.gradle.caching=true
org.gradle.parallel=true
org.gradle.configureondemand=true
org.gradle.daemon=true

3.3 修改build.gradle文件

1
2
maven { url 'https://maven.aliyun.com/nexus/content/groups/public/' }
maven { url 'https://maven.aliyun.com/nexus/content/repositories/jcenter'}

3.4 编译源码

编译spring-oxm模块,编译命令:

1
gradlew :spring-oxm:compileTestJava

编译成功后的截图

四、IDEA导入源码

4.1 下载Idea kotlin插件安装

4.2 IDEA导入Spring源码

1
File -> New -> Project from Existing Souces...


最终编译成功后的截图:

Ambari大数据平台集群利器的探索与实践

一、简介

Ambari是 Apache Software Foundation 中的一个顶级项目,它是用来创建、管理、监视Hadoop整个生态圈的工具。 Ambari是分布式架构,主要由Ambari Server和Ambari Agent组成。

HDP是hortonworks的软件栈,里面包含了hadoop生态系统的所有软件项目,其实就是软件包合集。

HDP-UTILS是工具类库。各软件版本对照表:

https://supportmatrix.hortonworks.com/

hadoop生态圈示图:

二、环境准备

2.1 资源要求

2.2 软件要求

2.3 软件下载地址

1
2
3
[root@ambari-server01 ~]# wget http://public-repo-1.hortonworks.com/ambari/centos7/2.x/updates/2.7.1.0/ambari-2.7.1.0-centos7.tar.gz
[root@ambari-server01 ~]# wget http://public-repo-1.hortonworks.com/HDP/centos7/3.x/updates/3.0.1.0/HDP-3.0.1.0-centos7-rpm.tar.gz
[root@ambari-server01 ~]# wget http://public-repo-1.hortonworks.com/HDP-UTILS-1.1.0.22/repos/centos7/HDP-UTILS-1.1.0.22-centos7.tar.gz

2.4 系统初始化,执行以下脚本(所有节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
#!/bin/bash
#
#服务器初始化脚本

# 关闭防火墙
systemctl stop firewalld
systemctl disable firewalld

# 关闭 SeLinux
setenforce 0
sed -i "s/SELINUX=enforcing/SELINUX=disabled/g" /etc/selinux/config

# 关闭 swap
swapoff -a
sed -i 's/.*swap.*/#&/' /etc/fstab
# echo "1" >/proc/sys/net/bridge/bridge-nf-call-iptables

# 安装必要的调试程序
yum install -y vim lrzsz telnet net-tools tar wget
yum install -y java-1.8.0-openjdk*

# 安装配置时间同步服务
yum install -y ntp
systemctl start ntpd.service
systemctl enable ntpd.service

# 设置进程文件最大打开数量
echo "* soft nofile 65535" >> /etc/security/limits.conf
echo "* hard nofile 65535" >> /etc/security/limits.conf

# 设置系统总限制文件最大打开数量
echo 6553560 > /proc/sys/fs/file-max
echo "fs.file-max = 6553560" >> /etc/sysctl.conf

2.5 设置主机名(所有节点)

1
2
3
4
5
6
7
8
9
10
11
# 三台主机分别设置hostname
[root@ambari-server01 ~]# hostnamectl set-hostname ambari-server01.test.com
[root@ambari-node01 ~]# hostnamectl set-hostname ambari-node01.test.com
[root@ambari-node02 ~]# hostnamectl set-hostname ambari-node02.test.com

# 添加hosts解析(所有节点)
[root@ambari-server01 ~]# cat >> /etc/hosts << EOF
192.168.2.111 ambari-server01.test.com
192.168.2.112 ambari-node01.test.com
192.168.2.113 ambari-node02.test.com
EOF

2.6 设置免密登陆(ambari-server01节点)

1
2
3
4
5
6
7
# 在192.168.2.111节点执行:
[root@ambari-server01 ~]# ssh-keygen
[root@ambari-server01 ~]# ssh-copy-id ambari-server01.test.com
[root@ambari-server01 ~]# ssh-copy-id ambari-node01.test.com
[root@ambari-server01 ~]# ssh-copy-id ambari-node02.test.com
[root@ambari-server01 ~]# scp -r .ssh/ ambari-node01.test.com:/root
[root@ambari-server01 ~]# scp -r .ssh/ ambari-node02.test.com:/root

三、安装mysql相关(ambari-server01节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
# 下载并安装mysql
[root@ambari-server01 ~]# wget -i -c http://dev.mysql.com/get/mysql-community-release-el7-5.noarch.rpm
[root@ambari-server01 ~]# yum -y install mysql-community-release-el7-5.noarch.rpm
[root@ambari-server01 ~]# yum -y install mysql-community-server

# 启动mysql
[root@ambari-server01 ~]# systemctl start mysqld.service # 启动mysql
[root@ambari-server01 ~]# systemctl status mysqld.service # 查看mysql状态
[root@ambari-server01 ~]# systemctl enable mysqld.service # 开机自启

# 设置mysql密码
> set password for root@localhost = password('root123'); # 方法一,需要登陆mysql执行

# 创建ambari数据库及用户名和密码
mysql> create database ambari character set utf8;
mysql> CREATE USER 'ambari'@'%'IDENTIFIED BY 'Ambari123';
mysql> GRANT ALL PRIVILEGES ON ambari.* TO 'ambari'@'%';
mysql> GRANT ALL PRIVILEGES ON ambari.* TO 'ambari'@'ambari-server01.test.com'IDENTIFIED BY 'Ambari123';
mysql> FLUSH PRIVILEGES;

# 创建hive数据库及用户名和密码(如果不需要安装hive服务,则不需要配置以下数据库)
mysql> create database hive character set utf8;
mysql> CREATE USER 'hive'@'%'IDENTIFIED BY 'Hive123';
mysql> GRANT ALL PRIVILEGES ON hive.* TO 'hive'@'%';
mysql> GRANT ALL PRIVILEGES ON hive.* TO 'ambari'@'ambari-server01.test.com'IDENTIFIED BY 'Hive123';
mysql> FLUSH PRIVILEGES;

# 如果需要安装oozie、ranger服务,创建数据库方法同上。

# 下载 mysql-connector-java-5.1.46-bin.jar包
# 复制到/var/lib/ambari-server/resources/mysql-connector-java-5.1.46-bin.jar目录;
# 再复制到/usr/share/java/mysql-connector-java-5.1.46-bin.jar到这个目录一份。

四、利用httpd创建本地yum的repo源(ambari-server01节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
# 安装软件
[root@ambari-server01 ~]# yum -y install yum-utils createrepo httpd
# 创建目录
[root@ambari-server01 ~]# mkdir -p /var/www/html/ambari/
[root@ambari-server01 ~]# mkdir -p /var/www/html/hdp/
[root@ambari-server01 ~]# mkdir -p /var/www/html/hdp/HDP-UTILS/
# 解压tar包
[root@ambari-server01 ~]# tar -zxvf ambari-2.7.1.0-centos7.tar.gz -C /var/www/html/ambari/
[root@ambari-server01 ~]# tar -zxvf HDP-3.0.1.0-centos7-rpm.tar.gz -C /var/www/html/hdp/
[root@ambari-server01 ~]# tar -zxvf HDP-UTILS-1.1.0.22-centos7.tar.gz -C /var/www/html/hdp/HDP-UTILS/
# 启动httpd
[root@ambari-server01 ~]# systemctl start httpd
[root@ambari-server01 ~]# systemctl enable httpd
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
**浏览器查看,如下即为正常**

![](https://p1-juejin.byteimg.com/tos-cn-i-k3u1fbpfcp/6eda665d14c04ee9bb69a5dbbe28b23a~tplv-k3u1fbpfcp-zoom-1.image)

# 配置本地repo
[root@ambari-server01 ~]# wget -O /etc/yum.repos.d/ambari.repo http://public-repo-1.hortonworks.com/ambari/centos7/2.x/updates/2.7.1.0/ambari.repo

# 将下载的repo修改内容为如下
[root@ambari-server01 ~]# vim /etc/yum.repos.d/ambari.repo
#VERSION_NUMBER=2.7.1.0-139
[ambari-2.7.1.0]
#json.url = http://public-repo-1.hortonworks.com/HDP/hdp_urlinfo.json
name=ambari Version - ambari-2.7.1.0
baseurl=http://192.168.2.111/ambari/ambari/centos7/2.7.1.0-169
gpgcheck=1
gpgkey=http://192.168.2.111/ambari/ambari/centos7/2.7.1.0-169/RPM-GPG-KEY/RPM-GPG-KEY-Jenkins
enabled=1
priority=1

# 配置HDP和HDP-UTILS的repo
[root@ambari-server01 ~]# vim /etc/yum.repos.d/HDP.repo
#VERSION_NUMBER=3.0.1.0-187
[HDP-3.0.1.0]
name=HDP Version - HDP-3.0.1.0
baseurl=http://192.168.2.111/hdp/HDP/centos7
gpgcheck=1
gpgkey=http://192.168.2.111/hdp/HDP/centos7/3.0.1.0-187/RPM-GPG-KEY/RPM-GPG-KEY-Jenkins
enabled=1
priority=1
[HDP-UTILS-1.1.0.22]
name=HDP-UTILS Version - HDP-UTILS-1.1.0.22
baseurl=http://192.168.2.111/hdp/HDP-UTILS/centos7/1.1.0.22
gpgcheck=1
gpgkey=http://192.168.2.111/hdp/HDP-UTILS/centos7/1.1.0.22/RPM-GPG-KEY/RPM-GPG-KEY-Jenkins
enabled=1
priority=1

# 将repo拷贝到其他节点
[root@ambari-server01 ~]# cd /etc/yum.repos.d/
[root@ambari-server01 ~]# scp ambari.repo HDP.repo root@192.168.2.112:/etc/yum.repos.d/
[root@ambari-server01 ~]# scp ambari.repo HDP.repo root@192.168.2.113:/etc/yum.repos.d/

# 生成本地源
[root@ambari-server01 ~]# createrepo /var/www/html/hdp/HDP/centos7/
[root@ambari-server01 ~]# createrepo /var/www/html/hdp/HDP-UTILS/

五、安装ambari集群

5.1 安装ambari-server(ambari-server01节点)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58

[root@ambari-server01 ~]# yum -y install ambari-server
[root@ambari-server01 ~]# ambari-server setup
Using python /usr/bin/python
Setup ambari-server
Checking SELinux...
SELinux status is 'disabled'
Customize user account for ambari-server daemon [y/n] (n)? y
Enter user account for ambari-server daemon (root):root # 用户
Adjusting ambari-server permissions and ownership...
Checking firewall status...
Checking JDK...
[1] Oracle JDK 1.8 + Java Cryptography Extension (JCE) Policy Files 8
[2] Custom JDK
==============================================================================
Enter choice (1): 2 # 选择自定义jdk
WARNING: JDK must be installed on all hosts and JAVA_HOME must be valid on all hosts.
WARNING: JCE Policy files are required for configuring Kerberos security. If you plan to use Kerberos,please make sure JCE Unlimited Strength Jurisdiction Policy Files are valid on all hosts.
Path to JAVA_HOME: /usr/lib/jvm/java-1.8.0-openjdk-1.8.0.262.b10-0.el7_8.x86_64/ #jdk安装路径
Validating JDK on Ambari Server...done.
Check JDK version for Ambari Server...
JDK version found: 8
Minimum JDK version is 8 for Ambari. Skipping to setup different JDK for Ambari Server.
Checking GPL software agreement...
GPL License for LZO: https://www.gnu.org/licenses/old-licenses/gpl-2.0.en.html
Enable Ambari Server to download and install GPL Licensed LZO packages [y/n] (n)? y
Completing setup...
Configuring database...
Enter advanced database configuration [y/n] (n)? y
Configuring database...
==============================================================================
Choose one of the following options:
[1] - PostgreSQL (Embedded)
[2] - Oracle
[3] - MySQL / MariaDB
[4] - PostgreSQL
[5] - Microsoft SQL Server (Tech Preview)
[6] - SQL Anywhere
[7] - BDB
==============================================================================
Enter choice (1): 3 # 选择安装的mysql
Hostname (localhost): ambari-server01.test.com # 配置hostname
Port (3306): # 默认
Database name (ambari): # 默认
Username (ambari): # 默认
Enter Database Password (bigdata): # 输入密码
Re-enter password:
Configuring ambari database...
Should ambari use existing default jdbc /usr/share/java/mysql-connector-java.jar [y/n] (y)? y
Configuring remote database connection properties...
WARNING: Before starting Ambari Server, you must run the following DDL directly from the database shell to create the schema: /var/lib/ambari-server/resources/Ambari-DDL-MySQL-CREATE.sql # 此处需注意,启动ambari之前需要执行此句
Proceed with configuring remote database connection properties [y/n] (y)? y
Extracting system views...
ambari-admin-2.7.3.0.139.jar
....
Ambari repo file contains latest json url http://public-repo-1.hortonworks.com/HDP/hdp_urlinfo.json, updating stacks repoinfos with it...
Adjusting ambari-server permissions and ownership...
Ambari Server 'setup' completed successfully. # 安装成功

5.2 使用ambari用户登陆mysql

1
2
3
[root@ambari-server01 ~]# mysql -u ambari -pAmbari123 -h ambari-server01.test.com
mysql> source /var/lib/ambari-server/resources/Ambari-DDL-MySQL-CREATE.sql;
mysql> show tables;

5.3 启动ambari-Server

1
[root@ambari-server01 ~]# ambari-server start

5.4 安装ambari-agent(ambari-node01、ambari-node02节点)

1
[root@ambari-node01 ~]# yum -y install ambari-agent

六、Ambari web页面配置

1
2
3
访问地址:http://192.168.2.111:8080/
默认账号:admin
默认密码:admin

进行web页面配置时没有截图,以下截图为网上下载,仅供参考

6.1 选择版本

6.2 配置节点、密钥

6.3 主机确认

6.4 选择要安装的组件

6.5 节点分配

6.6 分配从属和客户端

6.7 定制服务


1
2
# 执行以下命令(ambari-server01节点)
[root@ambari-server01 ~]# ambari-server setup --jdbc-db=mysql --jdbc-driver=/usr/share/java/mysql-connector-java-5.1.46-bin.jar

6.8 设置存储路径等



6.9 集群情况预览并部署




等待启动完毕,不用担心警告,后期可以调整,搭建完成,可以在展示页面进行查看集群状态

可以查看监控界面,可以看到大数据组件中出现错误,单个组件点开处理

6.10 监控面板

6.10.1 监控面板-主机状态

6.10.2 监控面板-配置历史(可针对单个服务调整配置)

6.11 新部署其他服务

6.12 添加node节点

6.13 配置邮件报警

6.14 ambari会自动安装grafana作为监控面板


至此,搭建完成!

时间序列数据库InfluxDB集群方案高级实践经验,请关注

如果监控全部接入单节点influxdb显然不能满足需求,Influxdb免费版不支持集群,商业版按照节点数收费不合理,在查询Influxdb监控数据的时候天然不会跨表(Measurement)查询。

一、数据处理

连接kafka/RocketMQ服务从其中获取数据,处理数据之后调用代理网关将数据插入数据库中。

二、查询代理

预处理查询语句,过滤危险查询代码,调用查询InfluxDB数据库。

三、网关层

网关用proxy模式来做数据分片,它的功能包括:

  • 按 db 和 measurement 分片写入不同节点
  • 聚合查询请求
  • 保存节点配置、转发规则到nacos中
  • 动态配置和更新网关中的配置
  • 在某个节点宕机后,下线或者替换某个节点
  • 基于raft分布式一致性协议实现AP

四、简易架构图

数据治理的王者Apache-Atlas如何构建自己的API

Apache Atlas是一个优秀的服务治理组件,用于企业Hadoop集群上的数据治理和元数据管理的数据治理工具。接下来我们将讨论构建自己的Java API,这些Java API可使用Apache atlas客户端与Apache Atlas交互以在其中创建新的实体和类型。

一、Atlas客户端Maven依赖关系

以下依赖项可用于pom.xml文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client</artifactId>
<version>0.7-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
<version>0.7-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
<version>0.7-incubating</version>
</dependency>
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-repository</artifactId>
<version>0.7-incubating</version>
</dependency>

二、设置atlas-application.properties

Apache Atlas客户端使用atlas-application属性在我们的API和Apache Atlas服务器之间建立连接。这些属性应放置在resources/atlas-application.properties中

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#########  Security Properties  #########

# SSL config
atlas.enableTLS=false

######### Server Properties #########
atlas.rest.address=http://192.168.5.95:21000

atlas.hook.demo.kafka.retries=1
atlas.kafka.zookeeper.connect=192.168.5.93:2181,192.168.5.94:2181,192.168.5.95:2181
atlas.kafka.bootstrap.servers=192.168.5.93:9092,192.168.5.94:9092,192.168.5.95:9092
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.connection.timeout.ms=2000
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.hook.group.id=atlas

三、创建与Atlas服务器的连接

要与Apache atlas Server,baseUrl和用户名创建连接,必须在AtlasClient构造函数中传递密码

1
2
3
4
final AtlasClient atlasClient = new AtlasClient
(new String[]{"http://192.168.5.95:21000"},
new String[]{"admin",
"admin"});

四、关于Type相关的测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class AtlasTypesTest {

final AtlasClient atlasClient = new AtlasClient
(new String[]{"http://192.168.5.95:21000"},
new String[]{"admin",
"admin"});

static final String DATABASE_TYPE = "DB_Sync";
static final String COLUMN_TYPE = "Column_Sync";
static final String TABLE_TYPE = "Table_Sync";
static final String VIEW_TYPE = "View_Sync";
public static final String DB_ATTRIBUTE = "db";
static final String STORAGE_DESC_TYPE = "StorageDesc";
public static final String COLUMNS_ATTRIBUTE = "columns";
public static final String INPUT_TABLES_ATTRIBUTE = "inputTables";
private static final String[] TYPES =
{DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, VIEW_TYPE, "JdbcAccess",
"ETL", "Metric", "PII", "Fact", "Dimension", "Log Data"};

/**
* 组织定义types
* @return
*/
TypesDef createTypeDefinitions() {
HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
.createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE, null,
TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
attrDef("description", DataTypes.STRING_TYPE.getName()), attrDef("locationUri", DataTypes.STRING_TYPE.getName()),
attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()));

HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
.createClassTypeDef(COLUMN_TYPE, COLUMN_TYPE, null, attrDef("name", DataTypes.STRING_TYPE.getName()),
attrDef("dataType", DataTypes.STRING_TYPE.getName()), attrDef("comment", DataTypes.STRING_TYPE.getName()));

HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
.createClassTypeDef(TABLE_TYPE, TABLE_TYPE, ImmutableSet.of("DataSet"),
new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),
attrDef("owner", DataTypes.STRING_TYPE.getName()), attrDef("createTime", DataTypes.LONG_TYPE.getName()),
attrDef("lastAccessTime", DataTypes.LONG_TYPE.getName()), attrDef("retention", DataTypes.LONG_TYPE.getName()),
attrDef("viewOriginalText", DataTypes.STRING_TYPE.getName()),
attrDef("viewExpandedText", DataTypes.STRING_TYPE.getName()), attrDef("tableType", DataTypes.STRING_TYPE.getName()),
attrDef("temporary", DataTypes.BOOLEAN_TYPE.getName()),
new AttributeDefinition(COLUMNS_ATTRIBUTE, DataTypes.arrayTypeName(COLUMN_TYPE),
Multiplicity.COLLECTION, true, null));

HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
.createClassTypeDef(VIEW_TYPE, VIEW_TYPE, ImmutableSet.of("DataSet"),
new AttributeDefinition("db", DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
new AttributeDefinition("inputTables", DataTypes.arrayTypeName(TABLE_TYPE),
Multiplicity.COLLECTION, false, null));

return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
ImmutableList.of(),
ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, viewClsDef));
}

private void createTypes() throws Exception {
TypesDef typesDef = createTypeDefinitions();
String typesAsJSON = TypesSerialization.toJson(typesDef);
System.out.println("typesAsJSON = " + typesAsJSON);
atlasClient.createType(typesAsJSON);
verifyTypesCreated();
}

private void verifyTypesCreated() throws Exception {
List<String> types = atlasClient.listTypes();
for (String type : TYPES) {
assert types.contains(type);
}
}

AttributeDefinition attrDef(String name, String dT) {
return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
}

AttributeDefinition attrDef(String name, String dT, Multiplicity m, boolean isComposite,
String reverseAttributeName) {
return new AttributeDefinition(name, dT, m, isComposite, reverseAttributeName);
}

@Test
public void createNewTypes() throws Exception {
createTypes();
}
}

五、关于Entities相关的测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
public class AtlasEntitiesTest {


final AtlasClient atlasClient = new AtlasClient
(new String[]{"http://192.168.5.95:21000"},
new String[]{"admin",
"admin"});

/**
* 创建实例并返创建的Id对象
* @param referenceable
* @return
* @throws Exception
*/
private Id createInstance(Referenceable referenceable) throws Exception {
String typeName = referenceable.getTypeName();
String entityJSON = InstanceSerialization.toJson(referenceable, true);
System.out.println("Submitting new entity= " + entityJSON);
List<String> guids = atlasClient.createEntity(entityJSON);
System.out.println("created instance for type " + typeName + ", guid: " + guids);
return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(),
referenceable.getTypeName());
}

/**
* 创建数据库实例并返创建的数据库Id对象
* @param name
* @param description
* @param owner
* @param locationUri
* @param traitNames
* @return
* @throws Exception
*/
Id database(String name, String description, String owner, String locationUri, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("locationUri", locationUri);
referenceable.set("createTime", System.currentTimeMillis());

return createInstance(referenceable);
}

/**
* 创建列的实例并返创建的列的实例对象
* @param name
* @param dataType
* @param comment
* @param traitNames
* @return
* @throws Exception
*/
Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("dataType", dataType);
referenceable.set("comment", comment);

return referenceable;
}

/**
* 创建表的实例并返创建的表的Id对象
* @param name
* @param description
* @param dbId
* @param sd
* @param owner
* @param tableType
* @param columns
* @param traitNames
* @return
* @throws Exception
*/
Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
List<Referenceable> columns, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(TABLE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("tableType", tableType);
referenceable.set("createTime", System.currentTimeMillis());
referenceable.set("lastAccessTime", System.currentTimeMillis());
referenceable.set("retention", System.currentTimeMillis());
referenceable.set("db", dbId);
referenceable.set("sd", sd);
referenceable.set("columns", columns);

return createInstance(referenceable);
}

/**
* 创建视图的实例并返创建的视图的Id对象
* @param name
* @param dbId
* @param inputTables
* @param traitNames
* @return
* @throws Exception
*/
Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("db", dbId);

referenceable.set(INPUT_TABLES_ATTRIBUTE, inputTables);

return createInstance(referenceable);
}

/**
* 原始存储描述符
* @param location
* @param inputFormat
* @param outputFormat
* @param compressed
* @return
* @throws Exception
*/
Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed)
throws Exception {
Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
referenceable.set("location", location);
referenceable.set("inputFormat", inputFormat);
referenceable.set("outputFormat", outputFormat);
referenceable.set("compressed", compressed);

return referenceable;
}


@Test
public void createEntities() throws Exception {
//创建数据库实例
Id syncDB = database("sy_sync", "Sync Database", "root", "");
//存储描述符
Referenceable sd =
storageDescriptor("", "TextInputFormat", "TextOutputFormat",
true);
//创建列实例
//1、数据源
List<Referenceable> databaseColumns = ImmutableList
.of(column("id", "long", "id"),
column("name", "string", "name"),
column("type", "string", "type"),
column("url", "string", "url"),
column("database_name", "string", "database name"),
column("username", "string", "username"),
column("password","string","password"),
column("description", "string", "description"),
column("create_time", "string", "create time"),
column("update_time", "string", "update time"),
column("create_id", "long", "user id"),
column("update_id", "long", "user id"));
//2、同步文件夹
List<Referenceable> syncFolderColumns = ImmutableList
.of(column("id", "long", "id"),
column("name", "string", "name"),
column("description", "string", "description"),
column("create_time", "string", "create time"),
column("update_time", "string", "update time"),
column("create_id", "long", "user id"),
column("update_id", "long", "user id"));
//创建表实例
Id database = table("datasource", "database table", syncDB, sd, "root", "External", databaseColumns);
Id syncFolder = table("folder", "sync folder table", syncDB, sd, "root", "External", syncFolderColumns);
//创建视图实例

}


@Test
public void getEntity() throws AtlasServiceException {
Referenceable referenceable = atlasClient.getEntity("1406ddd0-5d51-41d4-b174-859bd4f34a5b");
System.out.println(InstanceSerialization.toJson(referenceable, true));
}

}

CountDownLatch、Semaphore、CyclicBarrier、Condition源码分析

一、CountDownLatch

1.1 定义

它是一个同步辅助类,允许一个或多个线程等待,直到在其他线程中执行的一组操作完成。一个倒数计算的概念。
初始化给定一定的整数参数值,然后通过countDown()来实现倒数功能,在这个整数倒数到 0 之前,调用了 await() 方法的程序都必须要等待,当到达0后, 释放所有等待线程。

1.2 源码分析

对于 CountDownLatch,我们仅仅需要关心两个方法,一个是 countDown() 方法,另一个是 await() 方法。

1.2.1 countDown()

CountDownLatch有一个同步内部类Sync
它使用AQS状态表示计数,实现同步控制。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* Synchronization control For CountDownLatch.
* Uses AQS state to represent count.
*/
private static final class Sync extends AbstractQueuedSynchronizer {
private static final long serialVersionUID = 4982264981922014374L;

Sync(int count) {
setState(count);
}

int getCount() {
return getState();
}

protected int tryAcquireShared(int acquires) {
return (getState() == 0) ? 1 : -1;
}

protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}
}

countDown方法调用Sync中releaseShared()方法

1
2
3
public void countDown() {
sync.releaseShared(1);
}

所有的线程由于调用了await()方法阻塞了,只能等到countDown()使得state=0的时候才会被全部唤醒。

1
2
3
4
5
6
7
8
9
10
11
12
protected boolean tryReleaseShared(int releases) {
// Decrement count; signal when transition to zero
//使用自旋的方式实现state-1
for (;;) {
int c = getState();
if (c == 0)
return false;
int nextc = c-1;
if (compareAndSetState(c, nextc))
return nextc == 0;
}
}

当state递减为0的时候,tryReleaseShared才返回true;否则只是返回state-1的值;
如果state=0,调用doReleaseShared()方法,唤醒等待的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
private void doReleaseShared() {

for (;;) {
Node h = head;
if (h != null && h != tail) {
int ws = h.waitStatus;
if (ws == Node.SIGNAL) {
if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))
continue; // loop to recheck cases
unparkSuccessor(h);
}
//PROPAGATE的节点状态,表示处于共享模式,会对线程的唤醒进行传播
else if (ws == 0 &&
!compareAndSetWaitStatus(h, 0, Node.PROPAGATE))
continue; // loop on failed CAS
}
//如果前面唤醒的线程占领了head,那么再进行循环,通过头节点检查是否改变了,如果改变了就继续循环
if (h == head) // loop if head changed
break;
}
}

在线程被唤醒的执行顺序中:

  • h == head 表示头节点还没有被使用;
  • unparkSuccessor(h) 表示唤醒的线程;
  • h != head 表示头节点被刚刚唤醒的线程占用。

1.2.2 await()

1
2
3
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}

如果state<0,那么当前线程需要加入到共享锁队列中,执行doAcquireSharedInterruptibly()方法。

1
2
3
4
5
6
7
public final void acquireSharedInterruptibly(int arg)
throws InterruptedException {
if (Thread.interrupted())
throw new InterruptedException();
if (tryAcquireShared(arg) < 0)
doAcquireSharedInterruptibly(arg);
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
private void doAcquireSharedInterruptibly(int arg)
throws InterruptedException {
//SHARED为共享模式,创建一个共享模式的节点到队列中
final Node node = addWaiter(Node.SHARED);
boolean failed = true;
try {
for (;;) {
final Node p = node.predecessor();
if (p == head) {
//尝试获取锁
int r = tryAcquireShared(arg);
//获得了锁并且state!=0,下面的代码则不会执行
if (r >= 0) {
//把唤醒的节点,设置成head节点
setHeadAndPropagate(node, r);
p.next = null; // help GC
failed = false;
return;
}
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
throw new InterruptedException();
}
} finally {
if (failed)
cancelAcquire(node);
}
}

把唤醒的节点,设置成head节点,当第一个线程被唤醒后,并设置为head节点,依次会唤醒第二个线程……

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
private void setHeadAndPropagate(Node node, int propagate) {
Node h = head; // Record old head for check below
setHead(node);
/*
* Try to signal next queued node if:
* Propagation was indicated by caller,
* or was recorded (as h.waitStatus either before
* or after setHead) by a previous operation
* (note: this uses sign-check of waitStatus because
* PROPAGATE status may transition to SIGNAL.)
* and
* The next node is waiting in shared mode,
* or we don't know, because it appears null
*
* The conservatism in both of these checks may cause
* unnecessary wake-ups, but only when there are multiple
* racing acquires/releases, so most need signals now or soon
* anyway.
*/
if (propagate > 0 || h == null || h.waitStatus < 0 ||
(h = head) == null || h.waitStatus < 0) {
Node s = node.next;
if (s == null || s.isShared())
doReleaseShared();
}
}

二、Semaphore

2.1 定义

从单词的意思理解为信号灯,它可以控制同时访问程序的线程个数,比如停车场总共有100个车位,那么这时一下子来了150辆车需要停放在此停车场,必须要等到停车场有空余的位置才能让停满剩下的车进入此停车场。使用场景可用于限流。

两个重要的方法,acquire()获取一个许可,release()释放一个许可。

2.2 源码分析

2.2.1 FairSync 公平策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class FairSync extends Sync {
private static final long serialVersionUID = 2014338818796000944L;

FairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
for (;;) {
//判断是否有线程在排队,然后再进行CAS操作
if (hasQueuedPredecessors())
return -1;
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}
}

2.2.2 NonFairSync 非公平策略

公平与非公平策略只是多了个hasQueuedPredecessors()判断。

1
2
3
4
5
6
7
8
9
10
11
static final class NonfairSync extends Sync {
private static final long serialVersionUID = -2694183684443567898L;

NonfairSync(int permits) {
super(permits);
}

protected int tryAcquireShared(int acquires) {
return nonfairTryAcquireShared(acquires);
}
}

nonfairTryAcquireShared()方法源码:

1
2
3
4
5
6
7
8
9
final int nonfairTryAcquireShared(int acquires) {
for (;;) {
int available = getState();
int remaining = available - acquires;
if (remaining < 0 ||
compareAndSetState(available, remaining))
return remaining;
}
}

其它源码和 CountDownLatch 的是完全一样,都是基于共享锁的实现的。

三、CyclicBarrier

3.1 定义

从单词组成的意思理解为循环屏障。所谓屏障就是一个同步点,当一组线程到达这个同步点的时候被阻塞了,只有最后一个线程到达这个同步点的时候,屏障(也就是同步点)的大门才会打开,所有被拦截在大门之外的线程才会进入大门而继续工作。可以适用的场景于所有的子线程完成任务后,再执行主线程。

3.2 源码分析

1
2
3
4
5
6
public CyclicBarrier(int parties, Runnable barrierAction) {
if (parties <= 0) throw new IllegalArgumentException();
this.parties = parties;
this.count = parties;
this.barrierCommand = barrierAction;
}

构造方法参数parties,表示参与线程的个数;
每一个线程调用await()方法后,parties递减1,穿过屏障的大门(栅栏)后重置。
第二个参数barrierAction为Runnable实例,由最后一个到达的线程进行执行,如果没有需要执行的,设置为null。

四、Condition

4.1 定义

它是一个用来多线程的协调通信的工具类,当某个线程阻塞等待某个条件时,当满足条件才会被唤醒。

4.2 源码分析

两个重要方法,await()和signal()。

4.2.1 await()

调用此方法会使得线程进入等待队列并释放锁,线程的状态变成等待状态。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public final void await() throws InterruptedException {
//允许线程中断
if (Thread.interrupted())
throw new InterruptedException();
//创建一个状态为condition的节点,采用链表的形式存放数据
Node node = addConditionWaiter();
//释放当前的锁,得到锁的状态,释放等待队列中的一个线程
int savedState = fullyRelease(node);
int interruptMode = 0;
//判断当前节点是或否在队列上
while (!isOnSyncQueue(node)) {
//挂起当前线程
LockSupport.park(this);
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
//acquireQueued为false就拿到了锁
//interruptMode != THROW_IE表示这个线程没有成功将 node 入队,但 signal 执行了 enq 方法让其入队了
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
//将这个变量设置成 REINTERRUPT
interruptMode = REINTERRUPT;
//如果node节点的下一个等待者不为空,则开始进行清理,清理condition节点
if (node.nextWaiter != null) // clean up if cancelled
unlinkCancelledWaiters();
//如果线程中断了,需要抛出异常
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
  • addConditionWaiter()源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
private Node addConditionWaiter() {
Node t = lastWaiter;
// If lastWaiter is cancelled, clean out.
//如果lastWaiter不等于空并且waitStatus不为condition,把这个节点从链表中移除
if (t != null && t.waitStatus != Node.CONDITION) {
unlinkCancelledWaiters();
t = lastWaiter;
}
//创建一个状态为condition的单向列表
Node node = new Node(Thread.currentThread(), Node.CONDITION);
if (t == null)
firstWaiter = node;
else
t.nextWaiter = node;
lastWaiter = node;
return node;
}
  • fullyRelease()方法源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final int fullyRelease(Node node) {
boolean failed = true;
try {
//获得重入的次数
int savedState = getState();
//释放并唤醒同步队列中的线程
if (release(savedState)) {
failed = false;
return savedState;
} else {
throw new IllegalMonitorStateException();
}
} finally {
if (failed)
node.waitStatus = Node.CANCELLED;
}
}
  • isOnSyncQueue()方法源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
final boolean isOnSyncQueue(Node node) {
//判断当前节点是否在队列中,false表示不在,true表示在
if (node.waitStatus == Node.CONDITION || node.prev == null)
return false;
if (node.next != null) // If has successor, it must be on queue
return true;
/*
* node.prev can be non-null, but not yet on queue because
* the CAS to place it on queue can fail. So we have to
* traverse from tail to make sure it actually made it. It
* will always be near the tail in calls to this method, and
* unless the CAS failed (which is unlikely), it will be
* there, so we hardly ever traverse much.
*/
/从tail节点往前扫描AQS队列,如果发现AQS队列中的节点与当前节点相等,则说明节点一定存在与队列中
return findNodeFromTail(node);
}

4.2.2 signal()

调用此方法,将会唤醒在AQS队列中的节点

1
2
3
4
5
6
7
8
9
public final void signal() {
//判断当前线程是否获得了锁
if (!isHeldExclusively())
throw new IllegalMonitorStateException();
//AQS队列的第一个节点
Node first = firstWaiter;
if (first != null)
doSignal(first);
}
  • doSignal()方法的源码:
1
2
3
4
5
6
7
8
9
private void doSignal(Node first) {
do {
//从condition队列中移除first节点
if ( (firstWaiter = first.nextWaiter) == null)
lastWaiter = null;
first.nextWaiter = null;
} while (!transferForSignal(first) &&
(first = firstWaiter) != null);
}
  • transferForSignal()方法的源码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
final boolean transferForSignal(Node node) {
/*
* If cannot change waitStatus, the node has been cancelled.
*/
//更新节点状态为0
if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
return false;

/*
* Splice onto queue and try to set waitStatus of predecessor to
* indicate that thread is (probably) waiting. If cancelled or
* attempt to set waitStatus fails, wake up to resync (in which
* case the waitStatus can be transiently and harmlessly wrong).
*/
//调用 enq,把当前节点添加到AQS队列。并且返回返回按当前节点的上一个节点,也就是原tail 节点
Node p = enq(node);
int ws = p.waitStatus;
//如果上一个节点被取消了,尝试设置上一节点状态为SIGNAL
if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))
//唤醒节点上的线程
LockSupport.unpark(node.thread);
return true;
}
  • 阻塞:await()方法中,在线程释放锁资源之后,如果节点不在AQS等待队列,则阻塞当前线程,如果在等待队列,则自旋等待尝试获取锁;
  • 释放:signal()后,节点会从condition队列移动到AQS等待队列,则进入正常锁的获取流程。

微服务核心重新认识SpringBoot,掌握核心特性及设计思想

Spring Boot makes it easy to create stand-alone, production-grade Spring based Applications that you can “just run”.

We take an opinionated view of the Spring platform and third-party libraries so you can get started with minimum fuss. Most Spring Boot applications need minimal Spring configuration.

这是Spring官网对SpringBoot的定义和评价。

一、SpringBoot的前世今生

对于Spring框架而言,我们接触的比较多的是Spring framework中的SpringMVC、IOC、AOP、DI等。而这些框架在使用过程中需要进行大量的配置文件的编写,或者需要进行很多繁琐的配置才能完成项目的初始化搭建工作。Spring可以说它是万能胶,这样一点没错。下面我们来使用SpringMVC去构建一个Web项目,看看其步骤有多么的繁琐吧。

  • 1、创建一个项目结构(maven/gradle)
  • 2、spring的依赖,spring mvc 、servlet api的依赖
  • 3、web.xml, DispatcherServlet
  • 4、启动一个Spring mVC的配置,Dispatcher-servlet.xml
  • 5、创建一个Controller 发布一个http请求
  • 6、发布到jsp/servlet容器

1.1 SpringBoot的产生过程

2012年10月份,一个叫Mike Youngstrom(扬斯特罗姆)在Spring Jira中创建了一个功能请求,要求在Spring Framework中支持无容器Web应用程序体系结构,他谈到了在主容器引导 spring 容器
内配置 Web 容器服务。

SpringBoot刚出生的时候,引起了很多开源社区的关注,并且也有个人和企业开始尝试使用SpringBoot。 其实直到2016年,SpringBoot才真正在国内被使用起来。

1.2 到底什么是SpringBoot

SpringBoot 框架是为了能够帮助使用Spring框架的开发者快速高效的构建一个基于Spirng框架以及Spring生态体系的应用解决方案。它是对“约定优于配置”这个理念下的一个最佳实践。因此它是一个服务于框架的框架,服务的范围是简化配置文件。

什么才是约定优于配置呢?

  • 只要依赖的spring-boot-starter-web的jar,就会自动内置一个tomcat容器(替换)
    项目结构

  • 默认提供了配置文件application.properties/yml

  • starter启动依赖 - 如果是一个webstarter ,默认认为你是去构建一个spring mvc的应用.

  • EnableAutoConfiguration 默认对于依赖的 starter 进行自动装载

二、SpringBoot与微服务

那为什么Spring Cloud会采用Spring Boot来作为基础框架呢?原因很简单

  1. Spring Cloud它是关注服务治理领域的解决方案,而服务治理是依托于服务架构之上,所以它仍然需要一个承载框架;
  2. Spring Boot 可以简单认为它是一套快速配置Spring应用的脚手架,它可以快速开发单个微服务,所以Spring Cloud的版本和Spring Boot版本的兼容性有很大关联。

三、Spring注解驱动的发展过程

3.1 Spring 1.x

在SpringFramework1.x时代,其中在1.2.0是这个时代的分水岭,当时Java5刚刚发布,业界正兴起了使用Annotation的技术风,SpringFramework自然也提供了支持,比如当时已经支持了@Transactional等注解,但是这个时候,XML配置方式还是唯一选择。

3.2 Spring 2.x

Spring Framework2.x时代,2.0版本在Annotation中添加了@Required、@Repository以及AOP相关的@Aspect等注解,同时也提升了XML配置能力,也就是可扩展的XML,比如Dubbo这样的开源框架就是基于SpringXML的扩展来完美的集成Spring,从而降低了Dubbo使用的门槛。

在2.x时代,2.5版本也是这个时代的分水岭, 它引入了一些很核心的Annotation

  • @Autowired 依赖注入
  • @Qualifier 依赖查找
  • @Component、@Service 组件声明
  • @Controller、@RequestMappring等spring mvc的注解

尽管Spring 2.x时代提供了不少的注解,但是仍然没有脱离XML配置驱动,比如context:annotation-config context:componet-scan,前者的职责是注册Annotation处理器,后者是负责扫描classpath下指定包路径下被Spring模式注解标注的类,将他们注册成为Spring Bean

  • @Required
  • @Repository(Dao)
  • @Aspect

spring 2.5

  • @Component (组件)
  • @Service service(服务接口)
  • @Controller(控制器)
  • @RequetsMapping(请求映射器)

3.3 Spring 3.x

Spring Framework3.0是一个里程碑式的时代,他的功能特性开始出现了非常大的扩展,比如全面拥抱Java5、以及Spring Annotation。更重要的是,它提供了配置类注解@Configuration,它出现的首要任务就是取代XML配置方式。

实现无配置化的方式实现Bean的装配。

  • @Configuraion (去xml化)

把Bean的对象如何以便捷的方式加载到Spring IOC容器中

  • ComponentScan(扫描@Service、@Controller、@Repository)
  • Import(把多个容器配置合并在一个配置中)

Enable模块驱动
在Spring 3.1中,提供了很多以@Enable开头的注解,比如:

  • @EnableWebMvc(引入MVC框架在Spring应用中需要用到的所有的Bean)
  • @EnableScheduling(开启任务计划)
  • @EnableAutoConfiguration
  • @Bean(来声明一个bean)

3.4 Spring 4.x

@Conditional(选择性的对加载的bean进行条件过滤)

3.5 Spring 5.x

四、SpringBoot的特性

首先分析特性的时候,我们不妨从SrpingBootApplication的注解入手,看看它做了什么,首先打开注解的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@SpringBootConfiguration
@EnableAutoConfiguration
@ComponentScan(excludeFilters = {
@Filter(type = FilterType.CUSTOM, classes = TypeExcludeFilter.class),
@Filter(type = FilterType.CUSTOM,
classes = AutoConfigurationExcludeFilter.class) })
public @interface SpringBootApplication {
......

}

SpringBootApplication 本质上是由 3 个注解组成,分别是:

  • @Configuration;
  • @EnableAutoConfiguration;
  • @ComponentScan。
  • 可以直接用这三个注解也可以启动SpringBoot应用,只是每次配置三个注解比较繁琐,所以直接用一个复合注解更方便些。后面会逐一详尽分析这些注解的,这里先简单介绍一下。

4.1 EnableAutoConfiguration自动装配

打开EnableAutoConfigration注解的源码,不难发现会带有一个@Import的注解。其实所有以Enable开头的注解都会有一个@Import注解。下面来看下源码吧。

4.1.1 @Import注解

  • @EnableAutoConfiguration
1
2
3
4
5
6
7
8
9
10
11
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Inherited
@AutoConfigurationPackage
@Import({EnableAutoConfigurationImportSelector.class})
public @interface EnableAutoConfiguration {
Class<?>[] exclude() default {};

String[] excludeName() default {};
}
  • @EnableScheduling
1
2
3
4
5
6
@Target({ElementType.TYPE})
@Retention(RetentionPolicy.RUNTIME)
@Import({SchedulingConfiguration.class})
@Documented
public @interface EnableScheduling {
}
  • @EnableWebMvc
1
2
3
4
5
6
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.TYPE})
@Documented
@Import({DelegatingWebMvcConfiguration.class})
public @interface EnableWebMvc {
}

类似于<import resource/> 形式的注解,就是把多个容器配置合并在一个配置中。可以配置三种不同的class:

  • 普通的bean或者带有@Configuration注解的bean;
  • 实现ImportSelector接口进行动态注入:
  • 实现ImportBeanDefinitionRegistror接口进行动态注入。

4.1.2 EnableAutoConfiguration分析

EnableAutoConfiguration的主要作用就是把SpringBoot中所有符合条件的@Configuration配置都加载到创建并使用的IoC容器中。

在注解源码中我们看到@Import注解中配置了EnableAutoConfigurationImportSelector这个类。

EnableAutoConfigurationImportSelector又是什么呢?

从名字上看一定是实现了ImportSelector接口,所以是基于动态bean的加载功能。来看下selectImports方法的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
public String[] selectImports(AnnotationMetadata metadata) {
try {
AnnotationAttributes attributes = this.getAttributes(metadata);
List<String> configurations = this.getCandidateConfigurations(metadata, attributes);
configurations = this.removeDuplicates(configurations);
Set<String> exclusions = this.getExclusions(metadata, attributes);
configurations.removeAll(exclusions);
configurations = this.sort(configurations);
this.recordWithConditionEvaluationReport(configurations, exclusions);
return (String[])configurations.toArray(new String[configurations.size()]);
} catch (IOException var5) {
throw new IllegalStateException(var5);
}
}

此处返回的String数组,是所有类的全类名,它们都会被纳入到Spring的IoC容器中。

其实 EnableAutoConfiguration会帮助SpringBoot应用把所有符合@Configuration 配置都加载到当前SpringBoot创建的IoC容器,而这里面借助了Spring框架提供的一个工具类 SpringFactoriesLoader的支持。以及用到了Spring提供的条件注解 @Conditional,选择性的针对需要加载的 bean 进行条件过滤。

4.1.3 SpringFactoriesLoader

SpringFactoriesLoader其实和java中的SPI机制是一样的。但是不会像SPI一样一次性加载所有的类,而是根据key进行加载。其key是配置在META-INF/spring.factories配置文件中,根据key来加载对于的bean到Ioc容器中。

1
2
3
4
5
6
7
public abstract class SpringFactoriesLoader {
private static final Log logger = LogFactory.getLog(SpringFactoriesLoader.class);
public static final String FACTORIES_RESOURCE_LOCATION = "META-INF/spring.factories";

public SpringFactoriesLoader() {
}
}

SPI机制

Service provider interface
满足以下条件

  • 需要在classpath目录下创建一个 META-INF/services;

  • 在该目录下创建一个扩展点的全路径名:

    1、文件中填写这个扩展点的实现

    2、文件编码格式UTF-8

    3、ServiceLoader去进行加载

4.1.4 条件过滤Conditional的分析

通过条件过滤减少带有@Configuration注解类的数量,从而减少SpringBoot的启动时间。

@Conditional中的其它注解

  • @ConditionalOnBean(在存在某个bean的时候);
  • @ConditionalOnMissingBean(不存在某个bean的时候);
  • @ConditionalOnClass(当classpath可以找到某个类型的类时);
  • @ConditionalOnMissingClass(当classpath不能找到某个类型的类时);
  • @ConditionalOnResource(当前classpath是否存在某个资源文件);
  • @ConditionalOnProperty(当前jvm是否包含某个系统属性的某个值);
  • @ConditionalOnWebApplication(当前spring context是否是web应用程序)。

4.2 Starter

Starter相当于模块,能将模块所需要的依赖整合起来并对模块内的bean根据环境来进行自动配置。使用者只需要依赖相应功能的Starter,无需做过多的配置和依赖,SpringBoot 就能自动扫描并加载相应的模块。

  • 官方包 spring-boot-starter-xxx;
  • 第三方包 xxx-spring-boot-starter

4.3 Actuator

SpringBoot提供了spring-boot-start-actuator支持对SpringBoot应用的监控。

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-actuator</artifactId>
</dependency>

4.3.1 endpoint

通过访问地址:http://localhost:8080/actuator

可以看到非常多的 Endpoint。 有一些 Endpoint 是不能访问的,涉及到安全问题。

开启所有的endpoint:(management.endpoints.web.exposure.include=* *)

  • health(健康检查):management.endpoint.health.show-details= always
  • Loggers(日志配置信息,针对每个package 对应的日志级别)
  • beans(IoC 容器中所有的 bean)
  • Dump(获取活动线程的快照)
  • Mappings(全部的 uri 路径,以及和控制器的映射关系)
  • conditions(当前所有的条件注解)
  • shutdown(关闭应用):management.endpoint .shutdown.enabled= true,注意不要开启,比较危险
  • Env(获取全部的环境信息)

4.3.2 Health的原理分析

应用健康状态的检查应该是监控系统中最基本的需求,所以我们基于 health 来分析一下它是如何实现的。
org.springframework.boot.actuate.autoconfigure.HealthIndicatorAutoConfiguration类自动装配载入的,打开对应包下的spring.foctories文件:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
org.springframework.boot.autoconfigure.EnableAutoConfiguration=\
org.springframework.boot.actuate.autoconfigure.AuditAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.CacheStatisticsAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.CrshAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.EndpointAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.EndpointMBeanExportAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.EndpointWebMvcAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.HealthIndicatorAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.JolokiaAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.ManagementServerPropertiesAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.ManagementWebSecurityAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.MetricFilterAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.MetricRepositoryAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.MetricsDropwizardAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.MetricsChannelAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.MetricExportAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.PublicMetricsAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.TraceRepositoryAutoConfiguration,\
org.springframework.boot.actuate.autoconfigure.TraceWebFilterAutoConfiguration

org.springframework.boot.actuate.autoconfigure.ManagementContextConfiguration=\
org.springframework.boot.actuate.autoconfigure.EndpointWebMvcManagementContextConfiguration,\
org.springframework.boot.actuate.autoconfigure.EndpointWebMvcHypermediaManagementContextConfiguration

Actuator 中提供了非常多的扩展点,默认情况下提供了一些常见的服务的监控检查的支持。

  • DataSourceHealthIndicator
  • DiskSpaceHealthIndicator
  • RedisHealthIndicator

JAVA并发编程关于锁的那些事,ReentantLock的底层设计深入浅出

一、介绍JUC

java.util.concurrent是在并发编程中比较常用的工具类,里面包含很多用来在并发场景中使用的组件。比如线程池、阻塞队列、计时器、同步器、并发集合等等。

二、介绍Lock

Lock最为重要的特性就是解决并发程序的安全性问题。 在JUC大部分组件都使用了Lock,所以了解和使用Lock显得尤为重要。Lock在JUC中本质上是以一个接口的形势表现的。

我们可以从上面的图中可以看出关于锁有很多不同的实现类。下面来简单介绍一翻吧。

2.1 ReentrantLock(重入锁)

ReentrantLock实现了Lock接口,表示重入锁。是线程在获得锁之后,再次获取锁不需要阻塞,而是直接关联一次计数器增加重入次数。后面我们重点分析ReentrantLock的原理。

2.2 ReentrantReadWriteLock(重入读写锁)

ReentrantReadWriteLock实现了ReadWriteLock接口,其中有两把锁,一个ReadLock,一个WriteLock,它们分别实现了Lock接口。适合读多写少的场景。
基本原则:

  • 读和读不互斥;
  • 读和写互斥;
  • 写和写互斥。

    2.3 StampedLock(改进版读写锁)

    StampedLock是JDK1.8引进的新的锁机制,它是读写锁的一个改进版。一种乐观的读策略,使得乐观锁完全不阻塞写线程。

三、ReentrantLock设计

说到重入锁ReentrantLock,就是再次获取锁的同时,只是对重入次数进行计数,而不需要阻塞来获取锁。先来看一个案例代码,这样容易理解重入锁的概念。

我们在测试代码中调用test()方法获得了当前对象的锁,然后在这个方法中去调用test1()方法,test2()中也存在一个实例锁,这个时候当前线程无法获取test1()中的对象锁而阻塞, 这样就会产生死锁。ReentrantLock重入锁的目的就是为了避免线程产生死锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
public class ReentrantLockDemo {

public synchronized void test() {
System.out.println("Begin test...");
test1();
}

public void test1() {
System.out.println("Begin test1...");
synchronized (this) {

}
}

public static void main(String[] args) {
ReentrantLockDemo reentrantLockDemo = new ReentrantLockDemo();
new Thread(reentrantLockDemo::test).start();
}

}

3.1 ReentrantLock重入锁使用案例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
public class ReentrantLockDemo {

private static int count = 0;

static Lock lock = new ReentrantLock(true);

public static void incr() {
//线程A获取锁,计数state = 1
lock.lock();
try {
//退出线程 中断的过程往下传递. true
// sleep/ join/ wait
//while()
// ...
Thread.sleep(1);
count++;
// decr();
}catch (InterruptedException e) {
e.printStackTrace();
}
finally {
//线程A释放锁,state=1-1=0
lock.unlock();
}
}


public static void decr() {
//线程A再次获取锁,计数加1,state = 2
lock.lock();
try{
count--;
}finally {
lock.unlock();
}
}

public static void main(String[] args) throws InterruptedException {

Thread t1 = new Thread(()->{
ReentrantLockDemo.incr();
});
t1.start();
t1.interrupt();//线程中断


for(int i = 0 ; i < 1000; i++) {
new Thread(()->{
ReentrantLockDemo.incr();
}).start();
}
Thread.sleep(3000);
System.out.println("result = " + count);
}
}

3.2 ReentrantReadWriteLock重入读写锁案例

读写锁维护了一个读锁,一个写锁。一般情况下读写锁比排它锁的性能要好一些,因为大多数的场景是读多写少的。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
public class ReentrantReadWriteLockDemo {

static Map<String, Object> map = new HashMap<>();

static ReentrantReadWriteLock rrwl = new ReentrantReadWriteLock();
static Lock read = rrwl.readLock();
static Lock write = rrwl.writeLock();

public static Object get(String key) {
System.out.println("Begin reading data...");
read.lock();
try{
return map.get(key);
}finally {
read.unlock();
}
}

public static Object put(String key, Object obj) {
System.out.println("Begin writing data...");
write.lock();
try{
return map.put(key, obj);
}finally {
write.unlock();
}
}


}
  • 读锁与读锁可以共享;
  • 读锁与写锁不可以共享(排他);
  • 写锁与写锁不可以共享(排他。

3.3 ReentrantLock的实现原理

我们在Synchronized中分析了偏向锁、轻量级锁、重量级锁。它们是基于乐观锁以及自旋锁来优化synchronized加锁的开销,在重量级锁阶段是通过线程的阻塞以及唤醒来达到线程竞争和同步的目的。

那么在ReentrantLock也一定存在这样的问题,那么它是怎么去解决的呢?这里我们需要引入AQS(AbstractQueueSynchronizer)。

3.3.1 什么是AQS

在Lock中,AQS是一个同步队列,它是一个同步工具,也是Lock用来实现线程同步的核心组件。

3.3.2 AQS的独占锁和共享锁

  • 独占锁:每次只有一个线程持有锁,ReentrantLock的独占锁方式;
  • 共享锁:允许多个线程同时获得锁,并访问共享资源,ReentrantReadWriteLock的共享锁方式。

3.3.3 AQS的内部实现

AQS内部维护的是一个FIFO的双向链表,这种数据结构的特点就是有两个指针,分别指向直接的后继节点next和直接的前驱节点prev。当线程抢占锁失败后,会封装成一个Node直接放入到AQS阻塞队列中。

3.3.4 AQS中的Node

先上AQS中的源码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
static final class Node {
/** Marker to indicate a node is waiting in shared mode */
static final Node SHARED = new Node();
/** Marker to indicate a node is waiting in exclusive mode */
static final Node EXCLUSIVE = null;

/** waitStatus value to indicate thread has cancelled */
static final int CANCELLED = 1;
/** waitStatus value to indicate successor's thread needs unparking */
static final int SIGNAL = -1;
/** waitStatus value to indicate thread is waiting on condition */
static final int CONDITION = -2;
/**
* waitStatus value to indicate the next acquireShared should
* unconditionally propagate
*/
static final int PROPAGATE = -3;

/**
* Status field, taking on only the values:
* SIGNAL: The successor of this node is (or will soon be)
* blocked (via park), so the current node must
* unpark its successor when it releases or
* cancels. To avoid races, acquire methods must
* first indicate they need a signal,
* then retry the atomic acquire, and then,
* on failure, block.
* CANCELLED: This node is cancelled due to timeout or interrupt.
* Nodes never leave this state. In particular,
* a thread with cancelled node never again blocks.
* CONDITION: This node is currently on a condition queue.
* It will not be used as a sync queue node
* until transferred, at which time the status
* will be set to 0. (Use of this value here has
* nothing to do with the other uses of the
* field, but simplifies mechanics.)
* PROPAGATE: A releaseShared should be propagated to other
* nodes. This is set (for head node only) in
* doReleaseShared to ensure propagation
* continues, even if other operations have
* since intervened.
* 0: None of the above
*
* The values are arranged numerically to simplify use.
* Non-negative values mean that a node doesn't need to
* signal. So, most code doesn't need to check for particular
* values, just for sign.
*
* The field is initialized to 0 for normal sync nodes, and
* CONDITION for condition nodes. It is modified using CAS
* (or when possible, unconditional volatile writes).
*/
volatile int waitStatus;

/**
* Link to predecessor node that current node/thread relies on
* for checking waitStatus. Assigned during enqueuing, and nulled
* out (for sake of GC) only upon dequeuing. Also, upon
* cancellation of a predecessor, we short-circuit while
* finding a non-cancelled one, which will always exist
* because the head node is never cancelled: A node becomes
* head only as a result of successful acquire. A
* cancelled thread never succeeds in acquiring, and a thread only
* cancels itself, not any other node.
*/
volatile Node prev;//前驱节点

/**
* Link to the successor node that the current node/thread
* unparks upon release. Assigned during enqueuing, adjusted
* when bypassing cancelled predecessors, and nulled out (for
* sake of GC) when dequeued. The enq operation does not
* assign next field of a predecessor until after attachment,
* so seeing a null next field does not necessarily mean that
* node is at end of queue. However, if a next field appears
* to be null, we can scan prev's from the tail to
* double-check. The next field of cancelled nodes is set to
* point to the node itself instead of null, to make life
* easier for isOnSyncQueue.
*/
volatile Node next;//后继节点

/**
* The thread that enqueued this node. Initialized on
* construction and nulled out after use.
*/
volatile Thread thread;//当前线程

/**
* Link to next node waiting on condition, or the special
* value SHARED. Because condition queues are accessed only
* when holding in exclusive mode, we just need a simple
* linked queue to hold nodes while they are waiting on
* conditions. They are then transferred to the queue to
* re-acquire. And because conditions can only be exclusive,
* we save a field by using special value to indicate shared
* mode.
*/
Node nextWaiter;//存储在condition队列中的后继节点

/**
* Returns true if node is waiting in shared mode.
*/
//是否为共享锁
final boolean isShared() {
return nextWaiter == SHARED;
}

/**
* Returns previous node, or throws NullPointerException if null.
* Use when predecessor cannot be null. The null check could
* be elided, but is present to help the VM.
*
* @return the predecessor of this node
*/
final Node predecessor() throws NullPointerException {
Node p = prev;
if (p == null)
throw new NullPointerException();
else
return p;
}

Node() { // Used to establish initial head or SHARED marker
}
//将线程组装成一个Node,添加到队列中
Node(Thread thread, Node mode) { // Used by addWaiter
this.nextWaiter = mode;
this.thread = thread;
}
//在condition队列中进行使用
Node(Thread thread, int waitStatus) { // Used by Condition
this.waitStatus = waitStatus;
this.thread = thread;
}
}

四、ReentrantLock的源码分析

4.1 画出ReentrantLock时序图

4.2 ReentrantLock.lock()

源码如下:

1
2
3
public void lock() {
sync.lock();
}


根据源码可以看到具体的实现,分别是FairSync(公平)和NonFairSync(非公平)两个类。

  • FairSync:所有线程严格按照FIFO规则获取锁;
  • NonFairSync:可以存在抢占锁的功能,不管队列上是否存在其他线程等待,新线程都有机会抢占锁。

4.3 NonFairSync.lock()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
static final class NonfairSync extends Sync {
private static final long serialVersionUID = 7316153563782823691L;

/**
* Performs lock. Try immediate barge, backing up to normal
* acquire on failure.
*/
final void lock() {
//对于非公平锁,一开始就CAS抢占一下
//如果CAS成功了,就表示获得了锁
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else//如果CAS失败了,调用acquire()方法走竞争逻辑
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
}

4.3.1 CAS的实现原理

1
2
3
4
protected final boolean compareAndSetState(int expect, int update) {
// See below for intrinsics setup to support this
return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

CAS 就是 Unsafe 类中提供的一个原子操作。

  • var1:需要改变的对象;
  • var2:偏移量(headOffset的值);
  • var4:期待的值;
  • var5:更新后的值。

整个方法更新成功返回true,失败则返回false。

state是AQS中的一个属性,对于重入锁(ReentrantLock)而言,它表示一个同步状态。有两层含义:

  • 当state=0时,表示无锁状态;
  • 当state>0时,表示线程获得了锁,state+1,重入多少次数,state会递增;而当锁释放的时候,state次数递减,直到state=0其它线程才有资格抢占锁

接下来我们来看unsafe.cpp文件中最终执行的源码方法吧。

1
2
3
4
5
6
7
8
9
UNSAFE_ENTRY(jboolean, Unsafe_CompareAndSwapInt(JNIEnv *env, jobject unsafe, jobject obj, jlong offset, jint e, jint x))
UnsafeWrapper("Unsafe_CompareAndSwapInt");
//将Java对象解析成JVM的oop
oop p = JNIHandles::resolve(obj);
//根据对象p和地址偏移量找到地址
jint* addr = (jint *) index_oop_from_field_offset_long(p, offset);
//基于 cas 比较并替换, x 表示需要更新的值,addr 表示 state 在内存中的地址,e 表示预期值
return (jint)(Atomic::cmpxchg(x, addr, e)) == e;
UNSAFE_END

4.3.2 Unsafe类

属于sun.misc包,不属于Java标准。但是很多 Java 的基础类库,包
括一些被广泛使用的高性能开发库都是基于 Unsafe 类开发的,比如Netty、
Hadoop、Kafka
等;

Unsafe 可认为是 Java 中留下的后门,提供了一些低层次操作,如直接内存访问、
线程的挂起和恢复、CAS、线程同步、内存屏障等。

4.4 AQS.acquire()

从下面源码分析来看,如果CAS未能操作成功,说明state已经不等于0了,此时需要执行acquire(1)方法。

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

接着看acquire(1)方法的源码吧。

1
2
3
4
5
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
  • 尝试使用tryAcquire(arg)获得独占锁,如果成功返回true,失败返回false;
  • 如果tryAcquire失败,则通过addWaiter方法将当前线程封装成Node对象加入到AQS队列尾部;
  • acquireQueued,将Node作为参数,通过自旋的方式获得锁,下面是对于的源码。
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {//自旋方式获得锁
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}

4.4.1 NonfairSync.tryAcquire()

这个方法的作用是尝试获取锁,如果成功返回 true,不成功返回 false。

1
2
3
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}

下面来看此方法的具体实现:

  • 获得当前线程,判断当前锁的状态;
  • 如果state=0表示无锁状态,通过CAS更新state状态的值;
  • 如果当前线程属于重入,则增加重入次数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}

4.5 AQS.addWaiter()

当tryAcquire()获取锁失败时,则会调用此方法来将当前线程封装成Node对象加入到AQS队列尾部。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
//tail表示AQS队列的尾部,默认为null
Node pred = tail;
if (pred != null) {
//当前线程的prev执行tail
node.prev = pred;
//通过CAS把node加入到队列中,并设置为tail
if (compareAndSetTail(pred, node)) {
//设置成功后,把tail节点的next指向当前node
pred.next = node;
return node;
}
}
//tail为null时,把node加入到同步队列
enq(node);
return node;
}

enq(node)方法通过自旋的方式,把当前节点node加入到同步队列中去,下面看一下enq源码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
private Node enq(final Node node) {
for (;;) {
//将新的节点prev指向tail
Node t = tail;
if (t == null) { // Must initialize
//通过CAS将tail设置为新的节点
if (compareAndSetHead(new Node()))
tail = head;
} else {
node.prev = t;
if (compareAndSetTail(t, node)) {
//将原来的tail的next节点指向新的节点
t.next = node;
return t;
}
}
}
}

4.6 AQS.acquireQueued()

通过 addWaiter 方法把线程添加到链表后,会接着把 Node 作为参数传递给
acquireQueued 方法,去竞争锁。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
//线程中断标记
boolean interrupted = false;
for (;;) {
//获得当前节点的prev节点
final Node p = node.predecessor();
//如果是head节点,说明有资格去抢占锁
if (p == head && tryAcquire(arg)) {
//获取锁成功,线程A已经释放了锁,然后设置head为线程B获得执行权限
setHead(node);
//把原来的head节点从链表中移除,弱引用
p.next = null; // help GC
failed = false;
return interrupted;
}
//线程A可能还没释放锁,使得线程B在执行tryAcquire时返回false
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
//当前线程在等待过程中有没有中断
interrupted = true;
}
} finally {
//取消锁的操作
if (failed)
cancelAcquire(node);
}
}

4.6.1 shouldParkAfterFailedAcquire()

线程A的锁可能还没释放,那么此时线程B来抢占锁肯定失败,就会调用此方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
//前置节点
int ws = pred.waitStatus;
//如果前置节点为 SIGNAL,意味着只需要等待其他前置节点的线程被释放
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
//返回true,可以放心挂起了
return true;
//ws 大于 0,意味着 prev 节点取消了排队,直接移除这个节点就行
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
//相当于: pred=pred.prev;node.prev=pred;
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);//这里采用循环,从双向列表中移除 CANCELLED 的节点
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
//利用 cas 设置 prev 节点的状态为 SIGNAL(-1)
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}

Node的状态有5种,默认状态是0,以下是其它四种状态:

1
2
3
4
5
6
7
8
//在同步队列中等待的线程等待超时或被中断,需要从同步队列中取消该 Node 的结点, 其结点的 waitStatus为CANCELLED,即结束状态,进入该状态后的结点将不会再变化
static final int CANCELLED = 1;
//只要前置节点释放锁,就会通知标识为 SIGNAL 状态的后续节点的线程
static final int SIGNAL = -1;
//表示该线程在condition队列中阻塞
static final int CONDITION = -2;
//共享模式下,PROPAGATE 状态的线程处于可运行状态
static final int PROPAGATE = -3;

4.6.2 parkAndCheckInterrupt()

使用LockSupport.park(this)挂起当前线程为WAITING状态

1
2
3
4
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}

Thread.interrupted,返回当前线程是否被其他线程触发过中断请求,也就是
thread.interrupt(); 如果有触发过中断请求,那么这个方法会返回当前的中断标识
true,并且对中断标识进行复位标识已经响应过了中断请求。如果返回 true,意味
着在 acquire 方法中会执行 selfInterrupt()。

4.6.3 selfInterrupt()

当前线程在acquireQueued中被中断过,则需要产生一个中断请求,原因是线程在调用acquireQueued方法的时候不会响应中断请求。

1
2
3
static void selfInterrupt() {
Thread.currentThread().interrupt();
}

4.6.4 LockSupport

从Java6开始引用的一个提供了基本的线程同步原语的类,LockSupport本质还是调用了Unsafe中的方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
UNSAFE_ENTRY(void, Unsafe_Unpark(JNIEnv *env, jobject unsafe, jobject jthread))
UnsafeWrapper("Unsafe_Unpark");
Parker* p = NULL;
if (jthread != NULL) {
oop java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
jlong lp = java_lang_Thread::park_event(java_thread);
if (lp != 0) {
// This cast is OK even though the jlong might have been read
// non-atomically on 32bit systems, since there, one word will
// always be zero anyway and the value set is always the same
p = (Parker*)addr_from_java(lp);
} else {
// Grab lock if apparently null or using older version of library
MutexLocker mu(Threads_lock);
java_thread = JNIHandles::resolve_non_null(jthread);
if (java_thread != NULL) {
JavaThread* thr = java_lang_Thread::thread(java_thread);
if (thr != NULL) {
p = thr->parker();
if (p != NULL) { // Bind to Java thread for next time.
java_lang_Thread::set_park_event(java_thread, addr_to_java(p));
}
}
}
}
}
}
if (p != NULL) {
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__unpark, p);
#else /* USDT2 */
HOTSPOT_THREAD_UNPARK(
(uintptr_t) p);
#endif /* USDT2 */
p->unpark();
}
UNSAFE_END
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
UNSAFE_ENTRY(void, Unsafe_Park(JNIEnv *env, jobject unsafe, jboolean isAbsolute, jlong time))
UnsafeWrapper("Unsafe_Park");
EventThreadPark event;
#ifndef USDT2
HS_DTRACE_PROBE3(hotspot, thread__park__begin, thread->parker(), (int) isAbsolute, time);
#else /* USDT2 */
HOTSPOT_THREAD_PARK_BEGIN(
(uintptr_t) thread->parker(), (int) isAbsolute, time);
#endif /* USDT2 */
JavaThreadParkedState jtps(thread, time != 0);
thread->parker()->park(isAbsolute != 0, time);
#ifndef USDT2
HS_DTRACE_PROBE1(hotspot, thread__park__end, thread->parker());
#else /* USDT2 */
HOTSPOT_THREAD_PARK_END(
(uintptr_t) thread->parker());
#endif /* USDT2 */
if (event.should_commit()) {
oop obj = thread->current_park_blocker();
event.set_klass((obj != NULL) ? obj->klass() : NULL);
event.set_timeout(time);
event.set_address((obj != NULL) ? (TYPE_ADDRESS) cast_from_oop<uintptr_t>(obj) : 0);
event.commit();
}
UNSAFE_END

4.7 ReentrantLock.unlock()


在unlock()方法中,会调用release()方法来释放锁:

1
2
3
public void unlock() {
sync.release(1);
}
1
2
3
4
5
6
7
8
9
10
11
12
public final boolean release(int arg) {
//释放锁成功
if (tryRelease(arg)) {
//得到AQS队列中的head节点
Node h = head;
//如果head不为空并且状态不等于0,调用unpark唤醒后续节点
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}

4.7.1 tryRelease()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
protected final boolean tryRelease(int releases) {
//state状态减掉传入的参数1
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
//如果结果为0,将排它锁的Owner设置为null
//解锁的时候减掉 1,同一个锁,在可以重入后,可能会被叠加为 2、3、4 这些值,只有 unlock()的次数与 lock()的次数对应才会将 Owner 线程设置为空,而且也只有这种情况下才会返回 true
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}

4.7.2 unparkSuccessor()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
//获得head节点的状态
int ws = node.waitStatus;
if (ws < 0)
//设置head节点的状态为0
compareAndSetWaitStatus(node, ws, 0);

/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
//得到head节点的下一个节点
Node s = node.next;
//如果下一个节点为 null 或者 status>0 表示 cancelled 状态
if (s == null || s.waitStatus > 0) {
s = null;
//通过从尾部节点开始扫描,找到距离 head 最近的一个waitStatus<=0 的节点
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
//next 节点不为空,直接唤醒这个线程即可
if (s != null)
LockSupport.unpark(s.thread);
}

4.7.3 为什么释放锁的时候是从tail节点开始扫描的?

我们在加锁的enq()方法中,在 cas 操作之后,t.next=node 操作之前。 存在其他线程调用 unlock 方法从 head开始往后遍历,由于 t.next=node 还没执行意味着链表的关系还没有建立完整。就会导致遍历到 t 节点的时候被中断。所以从后往前遍历,一定不会存在这个问题。

4.8 原本挂起的线程如何执行呢?

通过ReentrantLock.unlock()将原本挂起的线程换唤醒后继续执行,原来被挂起的线程是在 acquireQueued() 方法中,所以被唤醒以后继续从这个方法开始执行.

五、公平锁与非公平锁的区别

锁的公平性是相对于获取锁的顺序而言的,如果是一个公平锁,那么锁的获取顺序
就应该符合请求的绝对时间顺序,也就是 FIFO。 只要CAS设置同步状态成功,则表示当前线程获取了锁,而公平锁则不一样,差异点
有两个:

1、FairSync.lock()方法

1
2
3
final void lock() {
acquire(1);
}

2、NonfairSync.lock()方法

非公平锁在获取锁的时候,会先通过 CAS 进行抢占,而公平锁则不会。

1
2
3
4
5
6
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

JAVA并发编程阶段总结篇,解决死锁问题以及ThreadLocal原理分析

一、线程的死锁问题

首先来看下死锁的概念吧:一组相互竞争的线程因为相互等待,造成“永久”阻塞的现象,我们称之为死锁;那么有死锁必然就有活锁了,什么是活锁呢?即任务或者执行者都没有被阻塞,由于某些条件未被满足,一直处于重试->尝试执行->执行失败的过程被成为活锁。

1.1 死锁发生的条件

只要满足以下四个条件,就必然会产生死锁:

  • 线程互斥,共享资源只能被一个线程占用,要么线程A要么线程B(有一个坑位,谁抢到就是谁的);
  • 占有且等待,线程T已经获得资源A,在同时等待资源B的时候,不释放资源A(占着茅坑等送纸);
  • 不可抢占,其它线程不能强制抢占线程T占有的资源(有且仅有的坑位被占,不能马上赶走别人);
  • 循环等待,线程T1等待线程T2占有的资源,线程T2等待线程T2占有的资源(我惦记着你的,你惦记着我的)。

1.2 如何解决死锁问题

针对上面的发生死锁的四个条件,只需要破坏其中的一个条件,就不会发生死锁。

  • 互斥条件无法破坏,因为使用锁(lock或synchronized)就是互斥的;
  • 占有且等待,一次性申请所有的资源,就不存在等待了;
  • 不可抢占,占有资源的线程如果需要申请其它资源的时候,可以主动释放占有的资源;
  • 循环等待,可以有线性顺序的方式来申请资源。从序号小的开始,然后再接着申请序号大的资源。

1.3 Thread.join

它的作用就是让线程的执行结果对后续线程的访问可见。

二、 ThreadLocal原理分析

ThreadLocal实际上是一种线程的隔离机制,就是为了保证在多线程环境下对于共享变量的访问的安全性。

2.1 set()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
 /**
* Sets the current thread's copy of this thread-local variable
* to the specified value. Most subclasses will have no need to
* override this method, relying solely on the {@link #initialValue}
* method to set the values of thread-locals.
*
* @param value the value to be stored in the current thread's copy of
* this thread-local.
*/
public void set(T value) {
Thread t = Thread.currentThread();
ThreadLocalMap map = getMap(t);
if (map != null)
map.set(this, value);
else
createMap(t, value);
}

当map不为空时,执行map.set(this, value)方法,代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
private void set(ThreadLocal<?> key, Object value) {

// We don't use a fast path as with get() because it is at
// least as common to use set() to create new entries as
// it is to replace existing ones, in which case, a fast
// path would fail more often than not.

Entry[] tab = table;
int len = tab.length;
//根据哈希码和数组长度求得元素的放置位置,即Entry数组的下标
int i = key.threadLocalHashCode & (len-1);
//从i开始遍历到数组的最后一个Entry(进行线性探索)
for (Entry e = tab[i];
e != null;
e = tab[i = nextIndex(i, len)]) {
ThreadLocal<?> k = e.get();
//如果key相等,就覆盖value
if (k == key) {
e.value = value;
return;
}
//如果key为空,用新的key,value,同时清理历史key=null(弱引用)的旧数据
if (k == null) {
replaceStaleEntry(key, value, i);
return;
}
}

tab[i] = new Entry(key, value);
int sz = ++size;
//如果超过设置的閥值,则需要进行扩容
if (!cleanSomeSlots(i, sz) && sz >= threshold)
rehash();
}

2.2 线性探测

在上面的源码中使用了线性探测的方式来解决hash冲突问题。

那么什么是线性探测呢?

线性探测是一种开放的寻址策略。hash表是直接通过key访问数据结构的,通过hash函数来把key映射到hash表中的一个位置的访问记录,从而加速查找的速度。存储记录的就叫hash表(也成为散列表)。

由两种方式情况解决这个冲突问题:

  • 写入:找到发生冲突的最近单元
  • 查找:从发生冲突的位置开始,往后查找

通俗的解释是这样子的:我们去蹲坑的时候发现坑位被占,就找后面的一个坑,如果后面的这个坑空着,那么就占用;如果后面的坑被占用,则一直往后面的坑位遍历,直到找到空闲的坑位为止,否则就一直憋着。

2.3 repalceStaleEntry()方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
private void replaceStaleEntry(ThreadLocal<?> key, Object value,
int staleSlot) {
Entry[] tab = table;
int len = tab.length;
Entry e;

// Back up to check for prior stale entry in current run.
// We clean out whole runs at a time to avoid continual
// incremental rehashing due to garbage collector freeing
// up refs in bunches (i.e., whenever the collector runs).
//向前扫描,查找最前一个无效的slot
int slotToExpunge = staleSlot;
for (int i = prevIndex(staleSlot, len);
(e = tab[i]) != null;
i = prevIndex(i, len))
if (e.get() == null)
//通过循环遍历,可以定位到最前面的一个无效的slot
slotToExpunge = i;

// Find either the key or trailing null slot of run, whichever
// occurs first
//从i开始遍历到数组的最后一个Entry(进行线性探索)
for (int i = nextIndex(staleSlot, len);
(e = tab[i]) != null;
i = nextIndex(i, len)) {
ThreadLocal<?> k = e.get();

// If we find key, then we need to swap it
// with the stale entry to maintain hash table order.
// The newly stale slot, or any other stale slot
// encountered above it, can then be sent to expungeStaleEntry
// to remove or rehash all of the other entries in run.
//找到匹配的key
if (k == key) {
//更新对应的slot对应的value
e.value = value;
//与无效的slot进行替换
tab[i] = tab[staleSlot];
tab[staleSlot] = e;

// Start expunge at preceding stale entry if it exists
////如果最早的一个无效的slot和当前的staleSlot相等,则从i作为清理的起点
if (slotToExpunge == staleSlot)
slotToExpunge = i;
//从slotToExpunge开始做一次连续的清理
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
return;
}

// If we didn't find stale entry on backward scan, the
// first stale entry seen while scanning for key is the
// first still present in the run.
//如果当前的slot已经无效,并且向前扫描过程中没有无效slot,则更新slotToExpunge为当前位置
if (k == null && slotToExpunge == staleSlot)
slotToExpunge = i;
}

// If key not found, put new entry in stale slot
//如果key对应的value在entry中不存在,则直接放一个新的entry
tab[staleSlot].value = null;
tab[staleSlot] = new Entry(key, value);

// If there are any other stale entries in run, expunge them
//如果有任何一个无效的slot,则做一次清理
if (slotToExpunge != staleSlot)
cleanSomeSlots(expungeStaleEntry(slotToExpunge), len);
}

2.4 斐波那契额散列(Fibonacci散列算法)

下面还是给一段ThreadLocal的源码:

1
2
3
4
5
6
7
8
9
//HASH_INCREMENT是为了让哈希码能均匀的分布在2的N次方的数组里
private static final int HASH_INCREMENT = 0x61c88647;

/**
* Returns the next hash code.
*/
private static int nextHashCode() {
return nextHashCode.getAndAdd(HASH_INCREMENT);
}

其中定义了一个魔法值 HASH_INCREMENT = 0x61c88647,对于实例变量threadLocalHashCode,每当创建ThreadLocal实例时这个值都会getAndAdd(0x61c88647)

HASH_INCREMENT 再借助一定的算法,就可以将哈希码能均匀的分布在 2 的 N 次方的数组里,保证了散列表的离散度,从而降低了冲突几率。我们不妨来写段测试代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public class FibonacciHash {

private static final int HASH_INCREMENT = 0x61c88647;

public static void main(String[] args) {
magicHash(16);
magicHash(32);
}

private static void magicHash(int size) {
int hashCode = 0;
for (int i = 0; i < size; i++) {
hashCode = i * HASH_INCREMENT + HASH_INCREMENT;
System.out.print((hashCode & (size - 1)) + " ");
}
System.out.println("");
}

}

执行main()方法的结果:

1
2
7 14 5 12 3 10 1 8 15 6 13 4 11 2 9 0 
7 14 21 28 3 10 17 24 31 6 13 20 27 2 9 16 23 30 5 12 19 26 1 8 15 22 29 4 11 18 25 0

产生的哈希码分布确实是很均匀,而且没有任何冲突。

JAVA并发编程递进篇,探索线程安全性volatile关键字如何保证可见性

一开始就直接上代码,直接来看一段木有使用volatile关键字的线程调用代码吧:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class VolatileDemo {
public static boolean stop = false;
public static void main(String[] args) throws InterruptedException {
Thread t = new Thread(()->{
int i = 0;
while(!stop) {
i++;
//System.out.println("result:" + i);
/*
try {
Thread.sleep(0);
} catch (InterruptedException e) {
e.printStackTrace();
}
*/
}
},"myThread");
t.start();
Thread.sleep(1000);
stop=true;
}

}

很显然运行main()方法后,循环并没有结束,程序一直处于运行状态。

如果我们要使得循环结束该怎么做呢?

一、Volatile关键字的使用递进

1.1 System.out.println

使用print打印i的值,发现循环就被终止了。这是为什么呢?我们不妨来看下println()方法的源码吧。

1
2
3
4
5
6
public void println(String x) {
synchronized (this) {
print(x);
newLine();
}
}

底层方法使用synchronized关键字,这个同步会防止循环期间对变量stop的值缓存。

从IO角度来说,print本质上是一个IO的操作,我们知道磁盘IO的效率一定要比CPU的计算效率慢得多,所以IO可以使得CPU有时间去做内存刷新的事情,从而导致这个现象。比如我们可以在里面定义一个new File()。同样会达到效果。

1.2 Thread.sleep(0)

增加Thread.sleep(0)也能生效,是和cpu、以及jvm、操作系统等因素有关系。

官方文档上是说,Thread.sleep没有任何同步语义,编译器不需要在调用Thread.sleep之前把缓存在寄存器中的写刷新到给共享内存、也不需要在Thread.sleep之后重新加载缓存在寄存器中的值。

编译器可以自由选择读取stop的值一次或者多次,这个是由编译器自己来决定的。
Thread.sleep(0)导致线程切换,线程切换会导致缓存失效从而读取到了新的值。

1.3 Volatile关键字

1
public volatile static boolean stop = false;

我们在stop变量加上volatile关键字进行修饰,可以查看汇编指令,使用HSDIS工具进行查看。

  • 在IDEA中加入VM options:
    1
    -server -Xcomp -XX:+UnlockDiagnosticVMOptions -XX:+PrintAssembly -XX:CompileCommand=compileonly,*VolatileDemo.*
    运行程序后,在输出的结果中,查找下 lock 指令,会发现,在修改带有volatile 修饰的成员变量时,会多一个 lock 指令。
1
2
3
4
0x00000000034e49f3: lock add dword ptr [rsp],0h  ;*putstatic stop
; - com.sy.sa.thread.VolatileDemo::<clinit>@1 (line 5)
0x00000000034e4643: lock add dword ptr [rsp],0h ;*putstatic stop
; - com.sy.sa.thread.VolatileDemo::<clinit>@1 (line 5)

运行加了volatile关键字的代码,发现中多了lock汇编指令。那么lock指令是怎么保证可见性的呢?

1.3.1 什么是可见性?

在单线程的环境下,如果向一个变量先写入一个值,然后在没有写干涉的情况下读取这个变量的值,那这个时候读取到的这个变量的值应该是之前写入的那个值。这本来是一个很正常的事情。但是在多线程环境下,读和写发生在不同的线程中的时候,可能会出现:读线程不能及时的读取到其他线程写入的最新的值。这就是所谓的可见性

1.3.2 硬件方面了解可见性本质

硬件方面将从CPU、内存、磁盘I/O 三方面着手。

1.3.2.1 CPU的高速缓存

因为高速缓存的存在,会导致一个缓存一致性问题。

1.3.2.2 总线锁和缓存锁

总线锁,简单来说就是,在多cpu下,当其中一个处理器要对共享内存进行操作的时候,在总线上发出一个LOCK#信号,这个信号使得其他处理器无法通过总线来访问到共享内存中的数据,总线锁定把CPU和内存之间的通信锁住了,这使得锁定期间,其他处理器不能操作其他内存地址的数据,所以总线锁定的开销比较大,这种机制显然是不合适的 。

如何优化呢?最好的方法就是控制锁的保护粒度,我们只需要保证对于被多个CPU缓存的同一份数据是一致的就行。在P6架构的CPU后,引入了缓存锁,如果当前数据已经被CPU缓存了,并且是要协会到主内存中的,就可以采用缓存锁来解决问题。

所谓的缓存锁,就是指内存区域如果被缓存在处理器的缓存行中,并且在Lock期间被锁定,那么当它执行锁操作回写到内存时,不再总线上加锁,而是修改内部的内存地址,基于缓存一致性协议来保证操作的原子性。

总线锁和缓存锁怎么选择,取决于很多因素,比如CPU是否支持、以及存在无法缓存的数据时(比较大或者快约多个缓存行的数据),必然还是会使用总线锁。

1.3.2.3 缓存一致性

MSI ,MESI 、MOSI …
为了达到数据访问的一致,需要各个处理器在访问缓存时遵循一些协议,在读写时根据协议来操作,常见的协议有MSI,MESI,MOSI等。最常见的就是MESI协议。接下来给大家简单讲解一下MESIMESI表示缓存行的四种状态,分别是:

  • M(Modify): 表示共享数据只缓存在当前CPU缓存中,并且是被修改状态,也就是缓存的数据和主内存中的数据不一致;
  • E(Exclusive): 表示缓存的独占状态,数据只缓存在当前CPU缓存中,并且没有被修改;
  • S(Shared): 表示数据可能被多个CPU缓存,并且各个缓存中的数据和主内存数据一致;
  • I(Invalid): 表示缓存已经失效。

1.3.2.4 MESI带来的优化

各CPU通过消息传递来更新各个缓存行的状态。在CPU中引入了Store Bufferes。

CPU0 只需要在写入共享数据时,直接把数据写入到 store bufferes 中,同时发送 invalidate 消息,然后继续去处理其他指令。
当收到其他所有CPU发送了invalidate acknowledge消息时,再将 store bufferes 中的数据数据存储至 cache line中。最后再从缓存行同步到主内存。

指令重排序

来关注下面这段代码,假设分别有两个线程,分别执行executeToCPU0和executeToCPU1,分别由两个不同的CPU来执行。引入Store Bufferes之后,就可能出现 b==1返回true ,但是assert(a==1)返回false。很多肯定会表示不理解,这种情况怎么可能成立?那接下来我们去分析一下,写一段伪代码吧。

1
2
3
4
5
6
7
8
9
executeToCPU0(){
  a=1;
  b=1;
}
executeToCPU1(){
  while(b==1){
    assert(a==1);
 }
}

通过内存屏障禁止了指令重排序

X86的memory barrier指令包括lfence(读屏障) sfence(写屏障) mfence(全屏障)

  • Store Memory Barrier(写屏障):告诉处理器在写屏障之前的所有已经存储在存储缓存(store bufferes)中的数据同步到主内存,简单来说就是使得写屏障之前的指令的结果对屏障之后的读或者写是可见的
  • Load Memory Barrier(读屏障):处理器在读屏障之后的读操作,都在读屏障之后执行。配合写屏障,使得写屏障之前的内存更新对于读屏障之后的读操作是可见的
  • Full Memory Barrier(全屏障):确保屏障前的内存读写操作的结果提交到内存之后,再执行屏障后的读写操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
volatile int a=0;
executeToCpu0(){
  a=1;
  //storeMemoryBarrier()写屏障,写入到内存
  b=1;
 
 // CPU层面的重排序
  //b=1;
  //a=1;
}
executeToCpu1(){
  while(b==1){  //true
    loadMemoryBarrier(); //读屏障
    assert(a==1) //false
 }
}

1.3.3 软件方面了解可见性本质

1.3.3.1 JMM(Java内存模型)

简单来说,JMM定义了共享内存中多线程程序读写操作的行为规范:在虚拟机中把共享变量存储到内存以及从内存中取出共享变量的底层实现细节。通过这些规则来规范对内存的读写操作从而保证指令的正确性,解决了CPU多级缓存、处理器优化、指令重排序导致的内存访问问题,保证了并发场景下的可见性

需要注意的是,JMM并没有主动限制执行引擎使用处理器的寄存器和高速缓存来提升指令执行速度,也没主动限制编译器对于指令的重排序,也就是说在JMM这个模型之上,仍然会存在缓存一致性问题和指令重排序问题。JMM是一个抽象模型,它是建立在不同的操作系统和硬件层面之上对问题进行了统一的抽象,然后再Java层面提供了一些高级指令,让用户选择在合适的时候去引入这些高级指令来解决可见性问题。

1.3.3.2 JMM解决可见性有序性

其实通过前面的内容分析我们发现,导致可见性问题有两个因素,一个是高速缓存导致的可见性问题,另一个是指令重排序。那JMM是如何解决可见性和有序性问题的呢?其实前面在分析硬件层面的内容时,已经提到过了,对于缓存一致性问题,有总线锁和缓存锁,缓存锁是基于MESI协议。而对于指令重排序,硬件层面提供了内存屏障指令。

而JMM在这个基础上提供了volatile、final等关键字,使得开发者可以在合适的时候增加相应相应的关键字来禁止高速缓存禁止指令重排序来解决可见性和有序性问题。

1.3.3.3 Volatile底层的原理

通过javap -v VolatileDemo.class 分析汇编指令。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public static volatile boolean stop;
 descriptor: Z
 flags: ACC_PUBLIC, ACC_STATIC, ACC_VOLATILE
int field_offset = cache->f2_as_index();
     if (cache->is_volatile()) {
      if (tos_type == itos) {
       obj->release_int_field_put(field_offset, STACK_INT(-1));
     } else if (tos_type == atos) {
       VERIFY_OOP(STACK_OBJECT(-1));
       obj->release_obj_field_put(field_offset, STACK_OBJECT(-1));
       OrderAccess::release_store(&BYTE_MAP_BASE[(uintptr_t)obj >>
CardTableModRefBS::card_shift], 0);
     } else if (tos_type == btos) {
       obj->release_byte_field_put(field_offset, STACK_INT(-1));
     } else if (tos_type == ltos) {
       obj->release_long_field_put(field_offset, STACK_LONG(-1));
     } else if (tos_type == ctos) {
       obj->release_char_field_put(field_offset, STACK_INT(-1));
     } else if (tos_type == stos) {
       obj->release_short_field_put(field_offset, STACK_INT(-1));
     } else if (tos_type == ftos) {
       obj->release_float_field_put(field_offset, STACK_FLOAT(-1));
     } else {
       obj->release_double_field_put(field_offset, STACK_DOUBLE(-1));
     }
      OrderAccess::storeload();
    }

1.3.4 Happens-Before模型

除了显示引用volatile关键字能够保证可见性以外,在Java中,还有很多的可见性保障的规则。

从JDK1.5开始,引入了一个happens-before的概念来阐述多个线程操作共享变量的可见性问题。所以我们可以认为在JMM中,如果一个操作执行的结果需要对另一个操作可见,那么这两个操作必须要存在happens-before关系。这两个操作可以是同一个线程,也可以是不同的线程。

1.3.4.1 程序顺序规则

可以认为是as-if-serial语义。

  • 不能改变程序的执行结果(在单线程环境下,执行的结果不变)
  • 依赖问题, 如果两个指令存在依赖关系,是不允许重排序
1
2
3
4
5
6
7
8
9
int a=0;
int b=0;
void test(){
  int a=1;   a
  int b=1;   b
  //int b=1;
  //int a=1;
  int c=a*b;  c
}

a happens -before b ; b happens before c

1.3.4.2 传递性规则

a happens-before b , b happens- before c, a happens-before c

1.3.4.3 volatile变量规则

  • volatile 修饰的变量的写操作,一定happens-before后续对于volatile变量的读操作.
  • 内存屏障机制来防止指令重排.
1
2
3
4
5
6
7
8
9
10
11
12
13
public class VolatileExample{
  int a=0;
  volatile boolean flag=false;
  public void writer(){
    a=1;             1
    flag=true; //修改       2
 }
  public void reader(){
    if(flag){ //true       3
      int i=a;  //1      4
   }
 }
}
  • 1 happens-before 2 是否成立? 是 -> ?
  • 3 happens-before 4 是否成立? 是
  • 2 happens -before 3 ->volatile规则
  • 1 happens-before 4 ; i=1成立.

1.3.4.4 监视器锁规则

对一个锁的解锁,happens-before 于随后对这个锁的加锁

1
2
3
4
5
6
7
8
int x=10;
synchronized(this){
  //后续线程读取到的x的值一定12
  if(x<12){
    x=12;
 }
}
x=12;

1.3.4.5 start规则

如果线程 A 执行操作 ThreadB.start(),那么线程 A 的 ThreadB.start()操作 happens-before 线程 B 中的任意操作

1
2
3
4
5
6
7
8
9
10
11
12
public class StartDemo{
  int x=0;
  Thread t1=new Thread(()->{
    //读取x的值 一定是20
    if(x==20){
     
   }
 });
  x=20;
  t1.start();
 
}

1.3.4.6 Join规则

如果线程 A 执行操作 ThreadB.join()并成功返回,那么线程 B 中的任意操作 happens-before 于线程A 从 ThreadB.join()操作成功返回

1
2
3
4
5
6
7
8
9
public class Test{
  int x=0;
  Thread t1=new Thread(()->{
    x=200;
 });
  t1.start();
  t1.join(); //保证结果的可见性。
  //在此处读取到的x的值一定是200.
}

JAVA并发编程入门篇,思考同步锁Synchronized背后的实现哲学

多线程在概念上类似抢占式多任务处理,线程的合理使用能够提升程序的处理能力,但是使用的同时也带来了弊端,对于共享变量访问就会产生安全性的问题。下面来看一个多线程访问共享变量的例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
public class ThreadSafty {

private static int count = 0;

public static void incr() {
try {
Thread.sleep(1);
} catch (InterruptedException e) {
e.printStackTrace();
}
count ++;
}

public static void main(String[] args) throws InterruptedException {
for (int i = 0 ; i < 1000; i++) {
new Thread(()->{
ThreadSafty.incr();
},"threadSafty" + i).start();
}
TimeUnit.SECONDS.sleep(3);
System.out.println("运行结果是:" + count);
}

}

变量count的运行结果始终是小于等于1000的随机数,因为线程的可见性和原子性。

一、多线程访问的数据安全性

如何保证线程并行运行的数据安全性问题,这里首先能够想到的是加锁吧。关系型数据库中有乐观锁、悲观锁,那么什么是锁呢?它是处理并发的一种手段,实现互斥的特性。

在Java语言中实现锁的关键字是Synchronized

二、Synchronized的基本应用

2.1 Synchronized的三种加锁方式

  • 静态方法:作用于当前类对象加锁,进入同步代码前要获得当前类对象的锁
1
synchronized  static void method(){}
  • 修饰代码块:指定加锁对象,进入同步代码前要获得指定对象的锁
1
2
3
void method(){
synchronized (SynchronizedDemo.class){}
}
  • 修改实例方法:作用于当前实例加锁,进入同步代码前要获得当前实例的锁
    1
    2
    3
    4
    5
    6
    7
    8
    Object lock = new Object();
    //只针对于当前对象实例有效.
    public SynchronizedDemo(Object lock){
    this.lock = lock;
    }
    void method(){
    synchronized(lock){}
    }

2.2 Synchronized锁是如何存储数据的呢?

以对象在jvm内存中是如何存储作为切入点,去看看对象里面有什么特性能够实现锁的

2.2.1 对象在Heap内存中的布局

在Hotspot虚拟机中,对象在堆内存中的布局,可以分为三个部分:

  • 对象头:包括对象标记、类元信息
  • 实例数据
  • 对齐填充

Hotspot 采用instanceOopDescarrayOopDesc 来描述对象头,arrayOopDesc 对象用来描述数组类型的。
instanceOopDesc的定义在Hotspot源码中的instanceOop.hpp文件中,另外,arrayOopDesc的定义对应arrayOop.hpp

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
class instanceOopDesc : public oopDesc {
public:
// aligned header size.
static int header_size() { return sizeof(instanceOopDesc)/HeapWordSize; }

// If compressed, the offset of the fields of the instance may not be aligned.
static int base_offset_in_bytes() {
// offset computation code breaks if UseCompressedClassPointers
// only is true
return (UseCompressedOops && UseCompressedClassPointers) ?
klass_gap_offset_in_bytes() :
sizeof(instanceOopDesc);
}

static bool contains_field_offset(int offset, int nonstatic_field_size) {
int base_in_bytes = base_offset_in_bytes();
return (offset >= base_in_bytes &&
(offset-base_in_bytes) < nonstatic_field_size * heapOopSize);
}
};

#endif // SHARE_VM_OOPS_INSTANCEOOP_HPP

看源码instanceOopDesc继承自oopDesc,oopDesc定义在oop.hpp文件中:

1
2
3
4
5
6
7
8
9
10
11
12
class oopDesc {
friend class VMStructs;
private:
volatile markOop _mark;
union _metadata {
Klass* _klass;//普通指针
narrowKlass _compressed_klass;//压缩类指针
} _metadata;

// Fast access to barrier set. Must be initialized.
static BarrierSet* _bs;
......

在oopDesc类中有两个重要的成员变量,_mark:记录对象和锁有关的信息,属于markOop类型,_metadata:记录类元信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
class markOopDesc: public oopDesc {
private:
// Conversion
uintptr_t value() const { return (uintptr_t) this; }

public:
// Constants
enum {
age_bits = 4,//分代年龄
lock_bits = 2,//锁标识
biased_lock_bits = 1,//是否为偏向锁
max_hash_bits = BitsPerWord - age_bits - lock_bits - biased_lock_bits,
hash_bits = max_hash_bits > 31 ? 31 : max_hash_bits,//对象的hashCode
cms_bits = LP64_ONLY(1) NOT_LP64(0),
epoch_bits = 2//偏向锁的时间戳
};
......

markOopDesc记录了对象和锁有关的信息,也就是我们常说的Mark Word,当某个对象加上Synchronized关键字时,那么和锁有关的一系列操作都与它有关。
32位系统Mark Word的长度是32bit64位系统则是64bit

Mark Word里面的数据会随着锁的标志位的变化而变化的。

2.2.2 Java中打印对象的布局

pom依赖

1
2
3
4
5
<dependency>
<groupId>org.openjdk.jol</groupId>
<artifactId>jol-core</artifactId>
<version>0.10</version>
</dependency>
1
System.out.println(ClassLayout.parseInstance(synchronizedDemo).toPrintable());
1
2
3
4
5
6
7
8
com.sy.sa.thread.SynchronizedDemo object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 31 00 00 00 (00110001 00000000 00000000 00000000) (49)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) 05 c1 00 f8 (00000101 11000001 00000000 11111000) (-134168315)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

大端存储和小端存储

1
2
0     4        (object header)                           31 00 00 00 (00110001 00000000 00000000 00000000) (49)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
1
2
16进制: 0x 00 00 00 00 00 00 00 01
(64位)2进制: 00000000 00000000 00000000 00000000 00000000 00000000 00000000 00000 0

0 01 (无锁状态)

  • 通过最后三位来看锁的状态和标记。
1
2
3
4
5
6
7
 OFFSET  SIZE   TYPE DESCRIPTION                               VALUE
0 4 (object header) a8 f7 76 02 (10101000 11110111 01110110 00000010) (41351080)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) 05 c1 00 f8 (00000101 11000001 00000000 11111000) (-134168315)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

000表示为轻量级锁

2.2.3 为什么什么对象都能实现锁?

Java 中的每个对象都派生自 Object 类,而每个Java ObjectJVM 内部都有一个 native 的 C++对象oop/oopDesc 进行对应。

线程在获取锁的时候,实际上就是获得一个监视器对象(monitor) ,monitor 可以认为是一个同步对象,所有的Java 对象是天生携带 monitor。在 hotspot 源码的markOop.hpp 文件中,可以看到下面这段代码:

1
2
3
4
5
ObjectMonitor* monitor() const {
assert(has_monitor(), "check");
// Use xor instead of &~ to provide one extra tag-bit check.
return (ObjectMonitor*) (value() ^ monitor_value);
}

多个线程访问同步代码块时,相当于去争抢对象监视器修改对象中的锁标识,上面的代码中ObjectMonitor这个对象和线程争抢锁的逻辑有密切的关系。

2.3 Synchronized的锁升级

锁的状态有:无锁、偏向锁、轻量级锁、重量级锁。 锁的状态根据竞争激烈程度从低到高不断升级。

2.3.1 偏向锁

1
2
3
4
5
存储(以32位为例):线程ID(23bit)
Epoch(2bit)
age(4bit)
是否偏向锁(1bit)
锁标志位(2bit)

当一个线程加入了Synchronized同步锁之后,会在对象头(Object Header)存储线程ID,后续这个线程进入或者退出这个同步代码块的代码时,不需要再次加入和释放锁,而是直接比较对象头里面是否存储了指向当前线程的偏向锁。如果线程ID相等,就表示偏向锁偏向于当前线程,就不需要再重新获得锁了。

1
2
3
4
5
6
7
8
9
10
11
com.sy.sa.thread.ClassLayoutDemo object internals:
OFFSET SIZE  TYPE DESCRIPTION                VALUE
  0   4    (object header)              05 e8 45 03
(00000101 11101000 01000101 00000011) (54913029)
  4   4    (object header)              00 00 00 00
(00000000 00000000 00000000 00000000) (0)
  8   4    (object header)              05 c1 00 f8
(00000101 11000001 00000000 11111000) (-134168315)
  12   4    (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

2.3.2 轻量级锁

1
2
存储(以32位为例):指向栈中锁记录的指针(30bit)
锁标志位(2bit)

如果偏向锁关闭或者当前偏向锁指向其它的线程,那么这个时候有线程去抢占锁,那么将升级为轻量级锁。

轻量级锁在加锁的过程中使用了自旋锁,JDK1.6之后使用了自适应的自旋锁。

2.3.3 重量级锁

1
2
存储(以32位为例):指向互斥量(重量级锁)的指针(30bit)
锁标志位(2bit)

当轻量级锁膨胀为重量级锁后,线程只能被挂起阻塞等待被唤醒了。先来看一个重量级锁的代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class HeavyweightLock {

public static void main(String[] args) {
HeavyweightLock heavyweightLock = new HeavyweightLock();
Thread t1 = new Thread(()->{
synchronized (heavyweightLock) {
System.out.println("tl lock");
System.out.println(ClassLayout.parseInstance(heavyweightLock).toPrintable());
}
},"heavyheightLock");
t1.start();
synchronized (heavyweightLock) {
System.out.println("main lock");
System.out.println(ClassLayout.parseInstance(heavyweightLock).toPrintable());
}
}

}

运行后的结果:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
com.sy.sa.thread.HeavyweightLock object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 2a cc e9 02 (00101010 11001100 11101001 00000010) (48876586)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) 05 c1 00 f8 (00000101 11000001 00000000 11111000) (-134168315)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

tl lock
com.sy.sa.thread.HeavyweightLock object internals:
OFFSET SIZE TYPE DESCRIPTION VALUE
0 4 (object header) 2a cc e9 02 (00101010 11001100 11101001 00000010) (48876586)
4 4 (object header) 00 00 00 00 (00000000 00000000 00000000 00000000) (0)
8 4 (object header) 05 c1 00 f8 (00000101 11000001 00000000 11111000) (-134168315)
12 4 (loss due to the next object alignment)
Instance size: 16 bytes
Space losses: 0 bytes internal + 4 bytes external = 4 bytes total

每一个Java对象都会与一个监视器monitor关联,可以把它理解成一把锁,当一个线程要执行用Synchronized修改的代码块或者对象时,该线程最先获取到的是Synchronized修饰对象的monitor
重量级加锁的基本流程:

monitorenter表示去获得一个对象监视器。monitorexit表示释放monitor监视器的所有权,使得其他被阻塞的线程可以尝试去获得这个监视器。

2.3.4 锁升级总结

  • 偏向锁只有在第一次请求时采用CAS在锁对象的标记中记录当前线程的地址,在之后该线程再次进入同步代码块时,不需要抢占锁,直接判断线程ID即可,这种适用于锁会被同一个线程多次抢占的情况。
  • 轻量级锁才用CAS操作,把锁对象的标记字段替换为一个指针指向当前线程栈帧中的LockRecord,该工件存储锁对象原本的标记字段,它针对的是多个线程在不同时间段内申请通一把锁的情况。
  • 重量级锁会阻塞、和唤醒加锁的线程,它适用于多个线程同时竞争同一把锁的情况。