数据治理的王者Apache Atlas：如何构建自己的API

Apache Atlas是一款优秀的数据治理组件，用于企业Hadoop集群上的数据治理和元数据管理。接下来我们将讨论如何构建自己的Java API：这些Java API借助Apache Atlas客户端与Apache Atlas交互，在其中创建新的实体和类型。

一、Atlas客户端Maven依赖关系

在pom.xml文件中添加以下依赖项：

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<!-- Atlas libraries for the client examples below; all four are pinned to the same 0.7-incubating release. -->
<!-- Presumably provides org.apache.atlas.AtlasClient used in the Java examples - confirm. -->
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-client</artifactId>
<version>0.7-incubating</version>
</dependency>
<!-- Presumably provides the type-system classes used below (TypesUtil, Referenceable, *Serialization) - confirm. -->
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-typesystem</artifactId>
<version>0.7-incubating</version>
</dependency>
<!-- NOTE(review): none of the example code visibly uses notification classes; confirm this module is needed. -->
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-notification</artifactId>
<version>0.7-incubating</version>
</dependency>
<!-- NOTE(review): atlas-repository looks like a server-side module; confirm whether a client-only project needs it. -->
<dependency>
<groupId>org.apache.atlas</groupId>
<artifactId>atlas-repository</artifactId>
<version>0.7-incubating</version>
</dependency>

二、设置atlas-application.properties

Apache Atlas客户端通过atlas-application.properties中的属性，在我们的API与Apache Atlas服务器之间建立连接。这些属性应放置在resources/atlas-application.properties文件中：

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
#########  Security Properties  #########

# SSL config
# TLS is off, consistent with the plain http:// URL in atlas.rest.address below.
atlas.enableTLS=false

######### Server Properties #########
# Base URL of the Atlas REST server; keep it in sync with the URL passed to the AtlasClient constructor.
atlas.rest.address=http://192.168.5.95:21000

# NOTE(review): the Kafka/ZooKeeper settings below are presumably used for Atlas notification (hook)
# messaging rather than by the REST calls in the examples - confirm against the Atlas configuration docs.
atlas.hook.demo.kafka.retries=1
atlas.kafka.zookeeper.connect=192.168.5.93:2181,192.168.5.94:2181,192.168.5.95:2181
atlas.kafka.bootstrap.servers=192.168.5.93:9092,192.168.5.94:9092,192.168.5.95:9092
atlas.kafka.zookeeper.session.timeout.ms=4000
atlas.kafka.zookeeper.connection.timeout.ms=2000
atlas.kafka.zookeeper.sync.time.ms=20
atlas.kafka.auto.commit.interval.ms=1000
atlas.kafka.hook.group.id=atlas

三、创建与Atlas服务器的连接

要与Apache Atlas服务器建立连接，必须在AtlasClient构造函数中传入服务器的baseUrl以及用户名和密码：

1
2
3
4
// Atlas server base URL(s), then the {user name, password} pair used to log in.
final String[] atlasBaseUrls = {"http://192.168.5.95:21000"};
final String[] atlasUserAndPassword = {"admin", "admin"};
final AtlasClient atlasClient = new AtlasClient(atlasBaseUrls, atlasUserAndPassword);

四、关于Type相关的测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
public class AtlasTypesTest {

    /** Client for the Atlas server: base URL(s) plus a {user name, password} pair. */
    final AtlasClient atlasClient = new AtlasClient(
            new String[]{"http://192.168.5.95:21000"},
            new String[]{"admin", "admin"});

    static final String DATABASE_TYPE = "DB_Sync";
    static final String COLUMN_TYPE = "Column_Sync";
    static final String TABLE_TYPE = "Table_Sync";
    static final String VIEW_TYPE = "View_Sync";
    public static final String DB_ATTRIBUTE = "db";
    // NOTE(review): no class definition for this type is created in this test, yet TABLE_TYPE's
    // "sd" attribute refers to it, so it must already exist on the server (presumably created
    // beforehand, e.g. by the Atlas QuickStart model) - confirm.
    static final String STORAGE_DESC_TYPE = "StorageDesc";
    public static final String COLUMNS_ATTRIBUTE = "columns";
    public static final String INPUT_TABLES_ATTRIBUTE = "inputTables";

    /**
     * Type names that must be registered once {@link #createTypes()} has submitted the
     * definitions: the four class types built by {@link #createTypeDefinitions()} plus the
     * storage-descriptor type the table type references. Trait names are deliberately not
     * listed: this test never defines any traits.
     */
    private static final String[] TYPES =
            {DATABASE_TYPE, TABLE_TYPE, STORAGE_DESC_TYPE, COLUMN_TYPE, VIEW_TYPE};

    /**
     * Builds the type definitions for the sync model: database, column, table and view class
     * types. The table and view types declare the built-in {@code DataSet} type as supertype.
     *
     * @return a {@link TypesDef} containing only class types (no enum, struct or trait types)
     */
    TypesDef createTypeDefinitions() {
        HierarchicalTypeDefinition<ClassType> dbClsDef = TypesUtil
                .createClassTypeDef(DATABASE_TYPE, DATABASE_TYPE, null,
                        TypesUtil.createUniqueRequiredAttrDef("name", DataTypes.STRING_TYPE),
                        attrDef("description", DataTypes.STRING_TYPE.getName()),
                        attrDef("locationUri", DataTypes.STRING_TYPE.getName()),
                        attrDef("owner", DataTypes.STRING_TYPE.getName()),
                        attrDef("createTime", DataTypes.LONG_TYPE.getName()));

        HierarchicalTypeDefinition<ClassType> columnClsDef = TypesUtil
                .createClassTypeDef(COLUMN_TYPE, COLUMN_TYPE, null,
                        attrDef("name", DataTypes.STRING_TYPE.getName()),
                        attrDef("dataType", DataTypes.STRING_TYPE.getName()),
                        attrDef("comment", DataTypes.STRING_TYPE.getName()));

        HierarchicalTypeDefinition<ClassType> tblClsDef = TypesUtil
                .createClassTypeDef(TABLE_TYPE, TABLE_TYPE, ImmutableSet.of("DataSet"),
                        // Required reference to the database; not composite.
                        new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
                        // Required storage descriptor; composite (isComposite = true).
                        new AttributeDefinition("sd", STORAGE_DESC_TYPE, Multiplicity.REQUIRED, true, null),
                        attrDef("owner", DataTypes.STRING_TYPE.getName()),
                        attrDef("createTime", DataTypes.LONG_TYPE.getName()),
                        attrDef("lastAccessTime", DataTypes.LONG_TYPE.getName()),
                        attrDef("retention", DataTypes.LONG_TYPE.getName()),
                        attrDef("viewOriginalText", DataTypes.STRING_TYPE.getName()),
                        attrDef("viewExpandedText", DataTypes.STRING_TYPE.getName()),
                        attrDef("tableType", DataTypes.STRING_TYPE.getName()),
                        attrDef("temporary", DataTypes.BOOLEAN_TYPE.getName()),
                        // Collection of columns; composite (isComposite = true).
                        new AttributeDefinition(COLUMNS_ATTRIBUTE, DataTypes.arrayTypeName(COLUMN_TYPE),
                                Multiplicity.COLLECTION, true, null));

        HierarchicalTypeDefinition<ClassType> viewClsDef = TypesUtil
                .createClassTypeDef(VIEW_TYPE, VIEW_TYPE, ImmutableSet.of("DataSet"),
                        new AttributeDefinition(DB_ATTRIBUTE, DATABASE_TYPE, Multiplicity.REQUIRED, false, null),
                        // Non-composite references to the tables the view reads from.
                        new AttributeDefinition(INPUT_TABLES_ATTRIBUTE, DataTypes.arrayTypeName(TABLE_TYPE),
                                Multiplicity.COLLECTION, false, null));

        return TypesUtil.getTypesDef(ImmutableList.<EnumTypeDefinition>of(), ImmutableList.<StructTypeDefinition>of(),
                ImmutableList.of(),
                ImmutableList.of(dbClsDef, columnClsDef, tblClsDef, viewClsDef));
    }

    /**
     * Serializes the type definitions to JSON, submits them to Atlas and checks that the
     * expected types are registered afterwards.
     *
     * @throws Exception if the Atlas call fails or a type is missing afterwards
     */
    private void createTypes() throws Exception {
        TypesDef typesDef = createTypeDefinitions();
        String typesAsJSON = TypesSerialization.toJson(typesDef);
        System.out.println("typesAsJSON = " + typesAsJSON);
        atlasClient.createType(typesAsJSON);
        verifyTypesCreated();
    }

    /**
     * Checks that every name in {@link #TYPES} is reported by {@code atlasClient.listTypes()}.
     * Throws explicitly, rather than using a Java {@code assert}, so the check also runs without
     * {@code -ea} and reports which type is missing.
     *
     * @throws AssertionError if a type is not registered
     * @throws Exception      if listing the types fails
     */
    private void verifyTypesCreated() throws Exception {
        List<String> registeredTypes = atlasClient.listTypes();
        for (String type : TYPES) {
            if (!registeredTypes.contains(type)) {
                throw new AssertionError("Type not registered in Atlas: " + type);
            }
        }
    }

    /** Optional, non-composite attribute with no reverse attribute. */
    AttributeDefinition attrDef(String name, String dT) {
        return attrDef(name, dT, Multiplicity.OPTIONAL, false, null);
    }

    /** Thin factory over the {@link AttributeDefinition} constructor. */
    AttributeDefinition attrDef(String name, String dT, Multiplicity m, boolean isComposite,
                                String reverseAttributeName) {
        return new AttributeDefinition(name, dT, m, isComposite, reverseAttributeName);
    }

    @Test
    public void createNewTypes() throws Exception {
        createTypes();
    }
}

五、关于Entities相关的测试类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
public class AtlasEntitiesTest {


final AtlasClient atlasClient = new AtlasClient
(new String[]{"http://192.168.5.95:21000"},
new String[]{"admin",
"admin"});

/**
* 创建实例并返创建的Id对象
* @param referenceable
* @return
* @throws Exception
*/
private Id createInstance(Referenceable referenceable) throws Exception {
String typeName = referenceable.getTypeName();
String entityJSON = InstanceSerialization.toJson(referenceable, true);
System.out.println("Submitting new entity= " + entityJSON);
List<String> guids = atlasClient.createEntity(entityJSON);
System.out.println("created instance for type " + typeName + ", guid: " + guids);
return new Id(guids.get(guids.size() - 1), referenceable.getId().getVersion(),
referenceable.getTypeName());
}

/**
* 创建数据库实例并返创建的数据库Id对象
* @param name
* @param description
* @param owner
* @param locationUri
* @param traitNames
* @return
* @throws Exception
*/
Id database(String name, String description, String owner, String locationUri, String... traitNames)
throws Exception {
Referenceable referenceable = new Referenceable(DATABASE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("locationUri", locationUri);
referenceable.set("createTime", System.currentTimeMillis());

return createInstance(referenceable);
}

/**
* 创建列的实例并返创建的列的实例对象
* @param name
* @param dataType
* @param comment
* @param traitNames
* @return
* @throws Exception
*/
Referenceable column(String name, String dataType, String comment, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(COLUMN_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set("dataType", dataType);
referenceable.set("comment", comment);

return referenceable;
}

/**
* 创建表的实例并返创建的表的Id对象
* @param name
* @param description
* @param dbId
* @param sd
* @param owner
* @param tableType
* @param columns
* @param traitNames
* @return
* @throws Exception
*/
Id table(String name, String description, Id dbId, Referenceable sd, String owner, String tableType,
List<Referenceable> columns, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(TABLE_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("description", description);
referenceable.set("owner", owner);
referenceable.set("tableType", tableType);
referenceable.set("createTime", System.currentTimeMillis());
referenceable.set("lastAccessTime", System.currentTimeMillis());
referenceable.set("retention", System.currentTimeMillis());
referenceable.set("db", dbId);
referenceable.set("sd", sd);
referenceable.set("columns", columns);

return createInstance(referenceable);
}

/**
* 创建视图的实例并返创建的视图的Id对象
* @param name
* @param dbId
* @param inputTables
* @param traitNames
* @return
* @throws Exception
*/
Id view(String name, Id dbId, List<Id> inputTables, String... traitNames) throws Exception {
Referenceable referenceable = new Referenceable(VIEW_TYPE, traitNames);
referenceable.set("name", name);
referenceable.set(AtlasClient.REFERENCEABLE_ATTRIBUTE_NAME, name);
referenceable.set("db", dbId);

referenceable.set(INPUT_TABLES_ATTRIBUTE, inputTables);

return createInstance(referenceable);
}

/**
* 原始存储描述符
* @param location
* @param inputFormat
* @param outputFormat
* @param compressed
* @return
* @throws Exception
*/
Referenceable storageDescriptor(String location, String inputFormat, String outputFormat, boolean compressed)
throws Exception {
Referenceable referenceable = new Referenceable(STORAGE_DESC_TYPE);
referenceable.set("location", location);
referenceable.set("inputFormat", inputFormat);
referenceable.set("outputFormat", outputFormat);
referenceable.set("compressed", compressed);

return referenceable;
}


@Test
public void createEntities() throws Exception {
//创建数据库实例
Id syncDB = database("sy_sync", "Sync Database", "root", "");
//存储描述符
Referenceable sd =
storageDescriptor("", "TextInputFormat", "TextOutputFormat",
true);
//创建列实例
//1、数据源
List<Referenceable> databaseColumns = ImmutableList
.of(column("id", "long", "id"),
column("name", "string", "name"),
column("type", "string", "type"),
column("url", "string", "url"),
column("database_name", "string", "database name"),
column("username", "string", "username"),
column("password","string","password"),
column("description", "string", "description"),
column("create_time", "string", "create time"),
column("update_time", "string", "update time"),
column("create_id", "long", "user id"),
column("update_id", "long", "user id"));
//2、同步文件夹
List<Referenceable> syncFolderColumns = ImmutableList
.of(column("id", "long", "id"),
column("name", "string", "name"),
column("description", "string", "description"),
column("create_time", "string", "create time"),
column("update_time", "string", "update time"),
column("create_id", "long", "user id"),
column("update_id", "long", "user id"));
//创建表实例
Id database = table("datasource", "database table", syncDB, sd, "root", "External", databaseColumns);
Id syncFolder = table("folder", "sync folder table", syncDB, sd, "root", "External", syncFolderColumns);
//创建视图实例

}


@Test
public void getEntity() throws AtlasServiceException {
Referenceable referenceable = atlasClient.getEntity("1406ddd0-5d51-41d4-b174-859bd4f34a5b");
System.out.println(InstanceSerialization.toJson(referenceable, true));
}

}