9
9
from werkzeug .exceptions import (InternalServerError , NotFound )
10
10
11
11
from foca .utils .logging import log_traffic
12
+ from foca .errors .exceptions import BadRequest
12
13
13
14
logger = logging .getLogger (__name__ )
14
15
@@ -20,27 +21,31 @@ def postPermission() -> str:
20
21
Returns:
21
22
Identifier of the new permission added.
22
23
"""
23
- try :
24
- access_control_adapter = current_app .config ["casbin_adapter" ]
25
- request_json = request .json
26
- rule = request_json .get ("rule" , {})
27
- permission_data = [
28
- rule .get ("v0" , None ),
29
- rule .get ("v1" , None ),
30
- rule .get ("v2" , None ),
31
- rule .get ("v3" , None ),
32
- rule .get ("v4" , None ),
33
- rule .get ("v5" , None )
34
- ]
35
- permission_id = access_control_adapter .save_policy_line (
36
- ptype = request_json .get ("policy_type" , None ),
37
- rule = permission_data
38
- )
39
- logger .info ("New policy added." )
40
- return permission_id
41
- except Exception as e :
42
- logger .error (f"{ type (e ).__name__ } : { e } " )
43
- raise InternalServerError
24
+ request_json = request .json
25
+ if isinstance (request_json , dict ):
26
+ try :
27
+ access_control_adapter = current_app .config ["casbin_adapter" ]
28
+ rule = request_json .get ("rule" , {})
29
+ permission_data = [
30
+ rule .get ("v0" , None ),
31
+ rule .get ("v1" , None ),
32
+ rule .get ("v2" , None ),
33
+ rule .get ("v3" , None ),
34
+ rule .get ("v4" , None ),
35
+ rule .get ("v5" , None )
36
+ ]
37
+ permission_id = access_control_adapter .save_policy_line (
38
+ ptype = request_json .get ("policy_type" , None ),
39
+ rule = permission_data
40
+ )
41
+ logger .info ("New policy added." )
42
+ return permission_id
43
+ except Exception as e :
44
+ logger .error (f"{ type (e ).__name__ } : { e } " )
45
+ raise InternalServerError
46
+ else :
47
+ logger .error ("Invalid request payload." )
48
+ raise BadRequest
44
49
45
50
46
51
@log_traffic
@@ -55,27 +60,34 @@ def putPermission(
55
60
Returns:
56
61
Identifier of updated permission.
57
62
"""
58
- try :
59
- request_json = request .json
60
- access_control_config = current_app .config .foca .access_control
61
- db_coll_permission : Collection = (
62
- current_app .config .foca .db .dbs [access_control_config .db_name ]
63
- .collections [access_control_config .collection_name ].client
64
- )
65
-
66
- permission_data = request_json .get ("rule" , {})
67
- permission_data ["id" ] = id
68
- permission_data ["ptype" ] = request_json .get ("policy_type" , None )
69
- db_coll_permission .replace_one (
70
- filter = {"id" : id },
71
- replacement = permission_data ,
72
- upsert = True
73
- )
74
- logger .info ("Policy updated." )
75
- return id
76
- except Exception as e :
77
- logger .error (f"{ type (e ).__name__ } : { e } " )
78
- raise InternalServerError
63
+ request_json = request .json
64
+ if isinstance (request_json , dict ):
65
+ app_config = current_app .config
66
+ try :
67
+ access_control_config = \
68
+ app_config .foca .access_control # type: ignore[attr-defined]
69
+ db_coll_permission : Collection = (
70
+ app_config .foca .db .dbs [ # type: ignore[attr-defined]
71
+ access_control_config .db_name ]
72
+ .collections [access_control_config .collection_name ].client
73
+ )
74
+
75
+ permission_data = request_json .get ("rule" , {})
76
+ permission_data ["id" ] = id
77
+ permission_data ["ptype" ] = request_json .get ("policy_type" , None )
78
+ db_coll_permission .replace_one (
79
+ filter = {"id" : id },
80
+ replacement = permission_data ,
81
+ upsert = True
82
+ )
83
+ logger .info ("Policy updated." )
84
+ return id
85
+ except Exception as e :
86
+ logger .error (f"{ type (e ).__name__ } : { e } " )
87
+ raise InternalServerError
88
+ else :
89
+ logger .error ("Invalid request payload." )
90
+ raise BadRequest
79
91
80
92
81
93
@log_traffic
@@ -88,11 +100,13 @@ def getAllPermissions(limit=None) -> List[Dict]:
88
100
Returns:
89
101
List of permission dicts.
90
102
"""
91
- logger .info (f"test { current_app .config } " )
92
- access_control_config = current_app .config .foca .access_control
103
+ app_config = current_app .config
104
+ access_control_config = \
105
+ app_config .foca .access_control # type: ignore[attr-defined]
93
106
db_coll_permission : Collection = (
94
- current_app .config .foca .db .dbs [access_control_config .db_name ]
95
- .collections [access_control_config .collection_name ].client
107
+ app_config .foca .db .dbs [ # type: ignore[attr-defined]
108
+ access_control_config .db_name
109
+ ].collections [access_control_config .collection_name ].client
96
110
)
97
111
98
112
if not limit :
@@ -129,10 +143,13 @@ def getPermission(
129
143
Returns:
130
144
Permission data for the given id.
131
145
"""
132
- access_control_config = current_app .config .foca .access_control
146
+ app_config = current_app .config
147
+ access_control_config = \
148
+ app_config .foca .access_control # type: ignore[attr-defined]
133
149
db_coll_permission : Collection = (
134
- current_app .config .foca .db .dbs [access_control_config .db_name ]
135
- .collections [access_control_config .collection_name ].client
150
+ app_config .foca .db .dbs [ # type: ignore[attr-defined]
151
+ access_control_config .db_name
152
+ ].collections [access_control_config .collection_name ].client
136
153
)
137
154
138
155
permission = db_coll_permission .find_one (filter = {"id" : id })
@@ -162,10 +179,13 @@ def deletePermission(
162
179
Returns:
163
180
Delete permission identifier.
164
181
"""
165
- access_control_config = current_app .config .foca .access_control
182
+ app_config = current_app .config
183
+ access_control_config = \
184
+ app_config .foca .access_control # type: ignore[attr-defined]
166
185
db_coll_permission : Collection = (
167
- current_app .config .foca .db .dbs [access_control_config .db_name ]
168
- .collections [access_control_config .collection_name ].client
186
+ app_config .foca .db .dbs [ # type: ignore[attr-defined]
187
+ access_control_config .db_name
188
+ ].collections [access_control_config .collection_name ].client
169
189
)
170
190
171
191
del_obj_permission = db_coll_permission .delete_one ({'id' : id })
0 commit comments