forked from amundsen-io/amundsendatabuilder
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathtable_owner.py
More file actions
98 lines (84 loc) · 3.21 KB
/
table_owner.py
File metadata and controls
98 lines (84 loc) · 3.21 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
# Copyright Contributors to the Amundsen project.
# SPDX-License-Identifier: Apache-2.0
from typing import (
List, Optional, Union,
)
from databuilder.models.graph_node import GraphNode
from databuilder.models.graph_relationship import GraphRelationship
from databuilder.models.graph_serializable import GraphSerializable
from databuilder.models.owner_constants import OWNER_OF_OBJECT_RELATION_TYPE, OWNER_RELATION_TYPE
from databuilder.models.user import User
class TableOwner(GraphSerializable):
    """
    Hive table owner model.

    Emits one User node per non-empty owner and one relationship from each of
    those users to the owned table. Nodes and relationships are computed once
    at construction time and handed out one at a time through
    ``create_next_node`` / ``create_next_relation``.
    """
    OWNER_TABLE_RELATION_TYPE = OWNER_OF_OBJECT_RELATION_TYPE
    TABLE_OWNER_RELATION_TYPE = OWNER_RELATION_TYPE

    def __init__(self,
                 db_name: str,
                 schema: str,
                 table_name: str,
                 owners: Union[List, str],
                 cluster: str = 'gold',
                 ) -> None:
        """
        :param db_name: database name, used as the scheme part of the table key
        :param schema: schema the table belongs to
        :param table_name: name of the owned table
        :param owners: either a list of owner identifiers or a single
            comma-separated string of them; each entry is whitespace-stripped
        :param cluster: cluster name used in the table key
        """
        self.db = db_name
        self.schema = schema
        self.table = table_name
        if isinstance(owners, str):
            owners = owners.split(',')
        self.owners = [owner.strip() for owner in owners]
        self.cluster = cluster

        self._node_iter = iter(self.create_nodes())
        self._relation_iter = iter(self.create_relation())

    def create_next_node(self) -> Optional[GraphNode]:
        """
        Return the next pending owner node, or None once all have been emitted.
        """
        try:
            return next(self._node_iter)
        except StopIteration:
            return None

    def create_next_relation(self) -> Optional[GraphRelationship]:
        """
        Return the next pending owner relationship, or None once all have been
        emitted.
        """
        try:
            return next(self._relation_iter)
        except StopIteration:
            return None

    def get_owner_model_key(self, owner: str) -> str:
        """
        Build the User node key for ``owner`` (the owner is used as the email).
        """
        return User.USER_NODE_KEY_FORMAT.format(email=owner)

    def get_metadata_model_key(self) -> str:
        """
        Build the key of the owned table: ``{db}://{cluster}.{schema}/{table}``.
        """
        return f'{self.db}://{self.cluster}.{self.schema}/{self.table}'

    def _valid_owners(self) -> List[str]:
        """
        Return the owners that are non-empty after stripping.

        Blank entries come from inputs such as ``'a,,b'``, a trailing comma or
        an empty string; they must yield neither a node nor a relationship.
        """
        return [owner for owner in self.owners if owner]

    def create_nodes(self) -> List[GraphNode]:
        """
        Create a list of Neo4j node records, one User node per non-empty owner
        :return:
        """
        return [
            GraphNode(
                key=self.get_owner_model_key(owner),
                label=User.USER_NODE_LABEL,
                attributes={
                    User.USER_NODE_EMAIL: owner
                }
            )
            for owner in self._valid_owners()
        ]

    def create_relation(self) -> List[GraphRelationship]:
        """
        Create a list of relation map between owner record with original hive table

        Blank owners are skipped, exactly as in ``create_nodes``, so that every
        relationship starts at a User node that is actually emitted.
        :return:
        """
        # The table key does not depend on the owner; compute it once.
        table_key = self.get_metadata_model_key()
        return [
            GraphRelationship(
                start_key=self.get_owner_model_key(owner),
                start_label=User.USER_NODE_LABEL,
                end_key=table_key,
                end_label='Table',
                type=TableOwner.OWNER_TABLE_RELATION_TYPE,
                reverse_type=TableOwner.TABLE_OWNER_RELATION_TYPE,
                attributes={}
            )
            for owner in self._valid_owners()
        ]

    def __repr__(self) -> str:
        return f'TableOwner({self.db!r}, {self.cluster!r}, {self.schema!r}, {self.table!r}, {self.owners!r})'