Skip to content

Commit 5b8d9a6

Browse files
committed
feat(securitycenter): Add Resource SCC Org Mgt API SHA Cust Modules (GetEff, ListEff, ListDesc, Simulate)
1 parent c4e0292 commit 5b8d9a6

File tree

2 files changed

+221
-5
lines changed

2 files changed

+221
-5
lines changed

securitycenter/snippets_management_api/security_health_analytics_custom_modules.py

Lines changed: 154 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,12 @@
1414
# See the License for the specific language governing permissions and
1515
# limitations under the License.
1616

17-
import random
18-
import time
19-
from typing import Dict
20-
21-
# [START securitycenter_get_effective_security_health_analytics_custom_module]
2217
from google.api_core.exceptions import NotFound
2318
from google.cloud import securitycentermanagement_v1
2419

20+
# [START securitycenter_get_effective_security_health_analytics_custom_module]
21+
22+
2523
def get_effective_security_health_analytics_custom_module(parent: str, module_id: str):
2624
"""
2725
Retrieves a Security Health Analytics custom module.
@@ -49,3 +47,154 @@ def get_effective_security_health_analytics_custom_module(parent: str, module_id
4947
print(f"Custom Module not found: {response.name}")
5048
raise e
5149
# [END securitycenter_get_effective_security_health_analytics_custom_module]
50+
51+
# [START securitycenter_list_descendant_security_health_analytics_custom_module]
52+
53+
54+
def list_descendant_security_health_analytics_custom_module(parent: str):
55+
"""
56+
Retrieves list of all resident Security Health Analytics custom modules and all of its descendants.
57+
Args:
58+
parent: Use any one of the following options:
59+
- organizations/{organization_id}/locations/{location_id}
60+
- folders/{folder_id}/locations/{location_id}
61+
- projects/{project_id}/locations/{location_id}
62+
Returns:
63+
List of retrieved all resident Security Health Analytics custom modules and all of its descendants.
64+
Raises:
65+
NotFound: If the specified custom module does not exist.
66+
"""
67+
68+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
69+
70+
try:
71+
request = securitycentermanagement_v1.ListDescendantSecurityHealthAnalyticsCustomModulesRequest(
72+
parent=parent,
73+
)
74+
75+
response = client.list_descendant_security_health_analytics_custom_modules(request=request)
76+
77+
custom_modules = []
78+
for custom_module in response:
79+
print(f"Custom Module: {custom_module.name}")
80+
custom_modules.append(custom_module)
81+
return custom_modules
82+
except NotFound as e:
83+
print(f"Parent resource not found: {parent}")
84+
raise e
85+
except Exception as e:
86+
print(f"An error occurred while listing custom modules: {e}")
87+
raise e
88+
# [END securitycenter_list_descendant_security_health_analytics_custom_module]
89+
90+
# [START securitycenter_list_effective_security_health_analytics_custom_module]
91+
92+
93+
def list_effective_security_health_analytics_custom_module(parent: str):
94+
"""
95+
Retrieves list of all Security Health Analytics custom modules.
96+
This includes resident modules defined at the scope of the parent,
97+
and inherited modules, inherited from ancestor organizations, folders, and projects (no descendants).
98+
99+
Args:
100+
parent: Use any one of the following options:
101+
- organizations/{organization_id}/locations/{location_id}
102+
- folders/{folder_id}/locations/{location_id}
103+
- projects/{project_id}/locations/{location_id}
104+
Returns:
105+
List of retrieved all Security Health Analytics custom modules.
106+
Raises:
107+
NotFound: If the specified custom module does not exist.
108+
"""
109+
110+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
111+
112+
try:
113+
request = securitycentermanagement_v1.ListEffectiveSecurityHealthAnalyticsCustomModulesRequest(
114+
parent=parent,
115+
)
116+
117+
response = client.list_effective_security_health_analytics_custom_modules(request=request)
118+
119+
custom_modules = []
120+
for custom_module in response:
121+
print(f"Custom Module: {custom_module.name}")
122+
custom_modules.append(custom_module)
123+
return custom_modules
124+
except NotFound as e:
125+
print(f"Parent resource not found: {parent}")
126+
raise e
127+
except Exception as e:
128+
print(f"An error occurred while listing custom modules: {e}")
129+
raise e
130+
# [END securitycenter_list_effective_security_health_analytics_custom_module]
131+
132+
# [START securitycenter_simulate_security_health_analytics_custom_module]
133+
134+
135+
def simulate_security_health_analytics_custom_module(parent: str):
136+
"""
137+
Simulates the result of using a SecurityHealthAnalyticsCustomModule to check a resource.
138+
139+
Args:
140+
parent: Use any one of the following options:
141+
- organizations/{organization_id}/locations/{location_id}
142+
- folders/{folder_id}/locations/{location_id}
143+
- projects/{project_id}/locations/{location_id}
144+
Returns:
145+
Simulated Security Health Analytics custom module.
146+
"""
147+
148+
client = securitycentermanagement_v1.SecurityCenterManagementClient()
149+
150+
# Define the custom config configuration
151+
custom_config = {
152+
"description": (
153+
"Sample custom module for testing purposes. This custom module evaluates "
154+
"Cloud KMS CryptoKeys to ensure their rotation period exceeds 30 days (2592000 seconds)."
155+
),
156+
"predicate": {
157+
"expression": "has(resource.rotationPeriod) && (resource.rotationPeriod > duration('2592000s'))",
158+
"title": "Cloud KMS CryptoKey Rotation Period",
159+
"description": (
160+
"Evaluates whether the rotation period of a Cloud KMS CryptoKey exceeds 30 days. "
161+
"A longer rotation period might increase the risk of exposure."
162+
),
163+
},
164+
"recommendation": (
165+
"Review and adjust the rotation period for Cloud KMS CryptoKeys to align with your security policies. "
166+
"Consider setting a shorter rotation period if possible."
167+
),
168+
"resource_selector": {"resource_types": ["cloudkms.googleapis.com/CryptoKey"]},
169+
"severity": "CRITICAL",
170+
"custom_output": {
171+
"properties": [
172+
{
173+
"name": "example_property",
174+
"value_expression": {
175+
"description": "The resource name of the CryptoKey being evaluated.",
176+
"expression": "resource.name",
177+
"location": "global",
178+
"title": "CryptoKey Resource Name",
179+
},
180+
}
181+
]
182+
},
183+
}
184+
185+
# Initialize request argument(s)
186+
resource = securitycentermanagement_v1.types.SimulateSecurityHealthAnalyticsCustomModuleRequest.SimulatedResource()
187+
resource.resource_type = "cloudkms.googleapis.com/CryptoKey" # Replace with the correct resource type
188+
189+
request = securitycentermanagement_v1.SimulateSecurityHealthAnalyticsCustomModuleRequest(
190+
parent=parent,
191+
custom_config=custom_config,
192+
resource=resource,
193+
)
194+
195+
response = client.simulate_security_health_analytics_custom_module(request=request)
196+
197+
print(f"Simulated Security Health Analytics Custom Module: {response}")
198+
return response
199+
200+
# [END securitycenter_simulate_security_health_analytics_custom_module]

securitycenter/snippets_management_api/security_health_analytics_custom_modules_test.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -121,6 +121,7 @@ def add_custom_module(org_id: str):
121121
module_id = module_name.split("/")[-1]
122122
return module_name, module_id
123123

124+
124125
@backoff.on_exception(
125126
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
126127
)
@@ -137,3 +138,69 @@ def test_get_effective_security_health_analytics_custom_module():
137138
assert response.display_name.startswith(PREFIX)
138139
assert response.enablement_state == securitycentermanagement_v1.EffectiveSecurityHealthAnalyticsCustomModule.EnablementState.ENABLED
139140
print(f"Retrieved Custom Module: {response.name}")
141+
142+
143+
@backoff.on_exception(
144+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
145+
)
146+
def test_list_descendant_security_health_analytics_custom_module():
147+
148+
module_name, module_id = add_custom_module(ORGANIZATION_ID)
149+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
150+
# Retrieve the list descendant custom modules
151+
custom_modules = security_health_analytics_custom_modules.list_descendant_security_health_analytics_custom_module(parent)
152+
153+
assert custom_modules is not None, "Failed to retrieve the custom modules."
154+
assert len(custom_modules) > 0, "No custom modules were retrieved."
155+
156+
# Verify the created module is in the list
157+
created_module = next(
158+
(module for module in custom_modules if module.name == module_name), None
159+
)
160+
assert created_module is not None, "Created custom module not found in the list."
161+
assert created_module.display_name.startswith(PREFIX)
162+
assert (
163+
created_module.enablement_state
164+
== securitycentermanagement_v1.SecurityHealthAnalyticsCustomModule.EnablementState.ENABLED
165+
)
166+
167+
168+
@backoff.on_exception(
169+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
170+
)
171+
def test_list_effective_security_health_analytics_custom_module():
172+
173+
module_name, module_id = add_custom_module(ORGANIZATION_ID)
174+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
175+
# Retrieve the list of custom modules
176+
custom_modules = security_health_analytics_custom_modules.list_effective_security_health_analytics_custom_module(parent)
177+
178+
assert custom_modules is not None, "Failed to retrieve the custom modules."
179+
assert len(custom_modules) > 0, "No custom modules were retrieved."
180+
181+
# Verify the created module is in the list
182+
created_module = next(
183+
(module for module in custom_modules if (module.name.split("/")[-1]) == module_id), None
184+
)
185+
assert created_module is not None, "Created custom module not found in the list."
186+
assert created_module.display_name.startswith(PREFIX)
187+
assert (
188+
created_module.enablement_state
189+
== securitycentermanagement_v1.EffectiveSecurityHealthAnalyticsCustomModule.EnablementState.ENABLED
190+
)
191+
192+
193+
@backoff.on_exception(
194+
backoff.expo, (InternalServerError, ServiceUnavailable, NotFound), max_tries=3
195+
)
196+
def test_simulate_security_health_analytics_custom_module():
197+
198+
module_name, module_id = add_custom_module(ORGANIZATION_ID)
199+
parent = f"organizations/{ORGANIZATION_ID}/locations/{LOCATION}"
200+
201+
simulated_custom_module = security_health_analytics_custom_modules.simulate_security_health_analytics_custom_module(parent)
202+
203+
assert simulated_custom_module is not None, "Failed to retrieve the simulated custom module."
204+
assert simulated_custom_module.result.no_violation is not None, (
205+
f"Expected no_violation to be present, got {simulated_custom_module.result}."
206+
)

0 commit comments

Comments
 (0)