Skip to content

Commit

Permalink
Add doc
Browse files Browse the repository at this point in the history
  • Loading branch information
vuilleumierc committed Sep 20, 2024
1 parent d768d79 commit 69373d5
Showing 1 changed file with 67 additions and 0 deletions.
67 changes: 67 additions & 0 deletions geoservercloud/geoservercloud.py
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,9 @@ def create_workspace(
return response

def delete_workspace(self, workspace: str) -> Response:
"""
Delete a GeoServer workspace (recursively)
"""
path: str = f"/rest/workspaces/{workspace}.json?recurse=true"
response: Response = self.delete_request(path)
if self.default_workspace == workspace:
Expand All @@ -113,6 +116,9 @@ def recreate_workspace(
)

def publish_workspace(self, workspace) -> Response:
"""
Publish the WMS service for a given workspace
"""
path: str = f"{self.workspace_wms_settings_path(workspace)}"

data: dict[str, dict[str, Any]] = Templates.workspace_wms(workspace)
Expand All @@ -121,6 +127,9 @@ def publish_workspace(self, workspace) -> Response:
def set_default_locale_for_service(
self, workspace: str, locale: str | None
) -> Response:
"""
Set a default language for localized WMS requests
"""
path: str = self.workspace_wms_settings_path(workspace)
data: dict[str, dict[str, Any]] = {
"wms": {
Expand All @@ -130,6 +139,9 @@ def set_default_locale_for_service(
return self.put_request(path, json=data)

def unset_default_locale_for_service(self, workspace) -> None:
"""
Remove the default language for localized WMS requests
"""
self.set_default_locale_for_service(workspace, None)

def create_pg_datastore(
Expand Down Expand Up @@ -448,6 +460,9 @@ def get_map(
styles: list[str] | None = None,
language: str | None = None,
) -> ResponseWrapper | None:
"""
WMS GetMap request
"""
if not self.wms:
self.create_wms()
params: dict[str, Any] = {
Expand Down Expand Up @@ -479,6 +494,9 @@ def get_feature_info(
styles: list[str] | None = None,
xy: list[float] = [0, 0],
) -> ResponseWrapper | None:
"""
WMS GetFeatureInfo request
"""
if not self.wms:
self.create_wms()
params = {
Expand All @@ -505,6 +523,9 @@ def get_legend_graphic(
style: str | None = None,
workspace: str | None = None,
) -> Response:
"""
WMS GetLegendGraphic request
"""
path: str
if not workspace:
path = "/wms"
Expand All @@ -527,6 +548,9 @@ def get_legend_graphic(
def get_tile(
self, layer, format, tile_matrix_set, tile_matrix, row, column
) -> ResponseWrapper | None:
"""
WMTS GetTile request
"""
if not self.wmts:
self.create_wmts()
if self.wmts:
Expand Down Expand Up @@ -619,17 +643,29 @@ def get_property_value(
return value_collection.get("wfs:member", {})

def create_role(self, role_name: str) -> Response:
"""
Create a GeoServer role
"""
return self.post_request(f"/rest/security/roles/role/{role_name}")

def delete_role(self, role_name: str) -> Response:
"""
Delete a GeoServer role
"""
return self.delete_request(f"/rest/security/roles/role/{role_name}")

def create_role_if_not_exists(self, role_name: str) -> Response | None:
"""
Create a GeoServer role if it does not yet exist
"""
if self.role_exists(role_name):
return None
return self.create_role(role_name)

def role_exists(self, role_name: str) -> bool:
"""
Check if a GeoServer role exists
"""
response = self.get_request(
f"/rest/security/roles", headers={"Accept": "application/json"}
)
Expand All @@ -644,6 +680,9 @@ def create_acl_admin_rule(
user: str | None = None,
workspace: str | None = None,
) -> Response:
"""
Create a GeoServer ACL admin rule
"""
path = "/acl/api/adminrules"
return self.post_request(
path,
Expand All @@ -657,14 +696,23 @@ def create_acl_admin_rule(
)

def delete_acl_admin_rule(self, id: int) -> Response:
"""
Delete a GeoServer ACL admin rule by id
"""
path = f"/acl/api/adminrules/id/{id}"
return self.delete_request(path)

def delete_all_acl_admin_rules(self) -> Response:
"""
Delete all existing GeoServer ACL admin rules
"""
path = "/acl/api/adminrules"
return self.delete_request(path)

def get_acl_rules(self) -> dict[str, Any]:
"""
Return all GeoServer ACL data rules
"""
path = "/acl/api/rules"
response = self.get_request(path)
return response.json()
Expand All @@ -678,6 +726,9 @@ def create_acl_rules_for_requests(
service: str | None = None,
workspace: str | None = None,
) -> list[Response]:
"""
Create ACL rules for multiple type of OGC requests
"""
responses = []
for request in requests:
responses.append(
Expand All @@ -702,6 +753,9 @@ def create_acl_rule(
request: str | None = None,
workspace: str | None = None,
) -> Response:
"""
Create a GeoServer ACL data rule
"""
path = "/acl/api/rules"
json = {"priority": priority, "access": access}
if role:
Expand All @@ -717,16 +771,26 @@ def create_acl_rule(
return self.post_request(path, json=json)

def delete_all_acl_rules(self) -> Response:
"""
Delete all existing GeoServer ACL data rules
"""
path = "/acl/api/rules"
return self.delete_request(path)

def create_or_update_resource(self, path, resource_path, payload) -> Response:
"""
Create a GeoServer resource or update it if it already exists
"""
if not self.resource_exists(resource_path):
return self.post_request(path, json=payload)
else:
return self.put_request(resource_path, json=payload)

def create_gridset(self, epsg: int) -> Response | None:
"""
Create a gridset for GeoWebCache for a given projection
Supported EPSG codes are 2056, 21781 and 3857
"""
resource_path: str = f"/gwc/rest/gridsets/EPSG:{epsg}.xml"
if self.resource_exists(resource_path):
return None
Expand All @@ -739,6 +803,9 @@ def create_gridset(self, epsg: int) -> Response | None:
return self.put_request(resource_path, data=data, headers=headers)

def resource_exists(self, path: str) -> bool:
"""
Check if a resource (given its path) exists in GeoServer
"""
# GeoServer raises a 500 when posting to a datastore or feature type that already exists, so first do
# a get request
response = self.get_request(path)
Expand Down

0 comments on commit 69373d5

Please sign in to comment.