1+ from datetime import date
12from typing import Optional
23
3- from django .db .models import Count , Exists , IntegerField , OuterRef , Q , Subquery
4+ from django .db .models import Count , Exists , IntegerField , OuterRef , Q , Subquery , Sum
45from django .db .models .functions import Coalesce
56from django .db .models .query import QuerySet
67
1516from application .core .types import Severity , Status
1617from application .licenses .models import License_Component
1718from application .licenses .types import License_Policy_Evaluation_Result
19+ from application .metrics .models import Product_License_Metrics , Product_Metrics
20+
21+ SEVERITY_MAPPING = {
22+ Severity .SEVERITY_CRITICAL : "active_critical" ,
23+ Severity .SEVERITY_HIGH : "active_high" ,
24+ Severity .SEVERITY_MEDIUM : "active_medium" ,
25+ Severity .SEVERITY_LOW : "active_low" ,
26+ Severity .SEVERITY_NONE : "active_none" ,
27+ Severity .SEVERITY_UNKNOWN : "active_unknown" ,
28+ }
29+
30+ EVALUATION_RESULT_MAPPING = {
31+ License_Policy_Evaluation_Result .RESULT_ALLOWED : "allowed" ,
32+ License_Policy_Evaluation_Result .RESULT_FORBIDDEN : "forbidden" ,
33+ License_Policy_Evaluation_Result .RESULT_IGNORED : "ignored" ,
34+ License_Policy_Evaluation_Result .RESULT_REVIEW_REQUIRED : "review_required" ,
35+ License_Policy_Evaluation_Result .RESULT_UNKNOWN : "unknown" ,
36+ }
1837
1938
2039def get_product_by_id (
21- product_id : int , is_product_group : bool = None , with_annotations : bool = False
40+ product_id : int ,
41+ is_product_group : bool = None ,
42+ with_observation_annotations : bool = False ,
43+ with_metrics_annotations : bool = False ,
2244) -> Optional [Product ]:
2345 try :
2446 if is_product_group is None :
25- return _add_annotations (Product .objects .all (), False , False ).get (id = product_id )
26- return _add_annotations (Product . objects . all (), is_product_group , with_annotations ). get (
27- id = product_id , is_product_group = is_product_group
28- )
47+ return _add_annotations (Product .objects .all (), False , False , False ).get (id = product_id )
48+ return _add_annotations (
49+ Product . objects . all () , is_product_group , with_observation_annotations , with_metrics_annotations
50+ ). get ( id = product_id , is_product_group = is_product_group )
2951 except Product .DoesNotExist :
3052 return None
3153
@@ -39,7 +61,9 @@ def get_product_by_name(name: str, is_product_group: bool = None) -> Optional[Pr
3961 return None
4062
4163
42- def get_products (is_product_group : bool = None , with_annotations : bool = False ) -> QuerySet [Product ]:
64+ def get_products (
65+ is_product_group : bool = None , with_observation_annotations : bool = False , with_metrics_annotations : bool = False
66+ ) -> QuerySet [Product ]:
4367 user = get_current_user ()
4468
4569 if user is None :
@@ -48,7 +72,7 @@ def get_products(is_product_group: bool = None, with_annotations: bool = False)
4872 products = Product .objects .all ()
4973
5074 if is_product_group is not None :
51- products = _add_annotations (products , is_product_group , with_annotations )
75+ products = _add_annotations (products , is_product_group , with_observation_annotations , with_metrics_annotations )
5276
5377 if not user .is_superuser :
5478 product_members = Product_Member .objects .filter (product = OuterRef ("pk" ), user = user )
@@ -83,12 +107,18 @@ def get_products(is_product_group: bool = None, with_annotations: bool = False)
83107 return products
84108
85109
86- def _add_annotations (queryset : QuerySet , is_product_group : bool , with_annotations : bool ) -> QuerySet :
87- if not with_annotations :
110+ def _add_annotations (
111+ queryset : QuerySet , is_product_group : bool , with_observation_annotations : bool , with_metrics_annotations : bool
112+ ) -> QuerySet :
113+ if not with_observation_annotations and not with_metrics_annotations :
88114 return queryset
89115
90- queryset = _add_observation_annotations (queryset , is_product_group )
91- queryset = _add_license_annotations (queryset , is_product_group )
116+ if with_observation_annotations :
117+ queryset = _add_observation_annotations (queryset , is_product_group )
118+ queryset = _add_license_annotations (queryset , is_product_group )
119+ elif with_metrics_annotations :
120+ queryset = _add_observation_metrics_annotations (queryset , is_product_group )
121+ queryset = _add_license_metrics_annotations (queryset , is_product_group )
92122 return queryset
93123
94124
@@ -136,6 +166,50 @@ def _add_observation_annotations(queryset: QuerySet, is_product_group: bool) ->
136166 return queryset
137167
138168
169+ def _add_observation_metrics_annotations (queryset : QuerySet , is_product_group : bool ) -> QuerySet :
170+ subquery_active_critical = (
171+ _get_product_group_metrics_subquery (Severity .SEVERITY_CRITICAL )
172+ if is_product_group
173+ else _get_product_metrics_subquery (Severity .SEVERITY_CRITICAL )
174+ )
175+ subquery_active_high = (
176+ _get_product_group_metrics_subquery (Severity .SEVERITY_HIGH )
177+ if is_product_group
178+ else _get_product_metrics_subquery (Severity .SEVERITY_HIGH )
179+ )
180+ subquery_active_medium = (
181+ _get_product_group_metrics_subquery (Severity .SEVERITY_MEDIUM )
182+ if is_product_group
183+ else _get_product_metrics_subquery (Severity .SEVERITY_MEDIUM )
184+ )
185+ subquery_active_low = (
186+ _get_product_group_metrics_subquery (Severity .SEVERITY_LOW )
187+ if is_product_group
188+ else _get_product_metrics_subquery (Severity .SEVERITY_LOW )
189+ )
190+ subquery_active_none = (
191+ _get_product_group_metrics_subquery (Severity .SEVERITY_NONE )
192+ if is_product_group
193+ else _get_product_metrics_subquery (Severity .SEVERITY_NONE )
194+ )
195+ subquery_active_unknown = (
196+ _get_product_group_metrics_subquery (Severity .SEVERITY_UNKNOWN )
197+ if is_product_group
198+ else _get_product_metrics_subquery (Severity .SEVERITY_UNKNOWN )
199+ )
200+
201+ queryset = queryset .annotate (
202+ active_critical_observation_count = Coalesce (subquery_active_critical , 0 ),
203+ active_high_observation_count = Coalesce (subquery_active_high , 0 ),
204+ active_medium_observation_count = Coalesce (subquery_active_medium , 0 ),
205+ active_low_observation_count = Coalesce (subquery_active_low , 0 ),
206+ active_none_observation_count = Coalesce (subquery_active_none , 0 ),
207+ active_unknown_observation_count = Coalesce (subquery_active_unknown , 0 ),
208+ )
209+
210+ return queryset
211+
212+
139213def _add_license_annotations (queryset : QuerySet , is_product_group : bool ) -> QuerySet :
140214 settings = Settings .load ()
141215 if settings .feature_license_management :
@@ -176,6 +250,46 @@ def _add_license_annotations(queryset: QuerySet, is_product_group: bool) -> Quer
176250 return queryset
177251
178252
253+ def _add_license_metrics_annotations (queryset : QuerySet , is_product_group : bool ) -> QuerySet :
254+ settings = Settings .load ()
255+ if settings .feature_license_management :
256+ subquery_license_forbidden = (
257+ _get_product_group_license_metrics_subquery (License_Policy_Evaluation_Result .RESULT_FORBIDDEN )
258+ if is_product_group
259+ else _get_product_license_metrics_subquery (License_Policy_Evaluation_Result .RESULT_FORBIDDEN )
260+ )
261+ subquery_license_review_required = (
262+ _get_product_group_license_metrics_subquery (License_Policy_Evaluation_Result .RESULT_REVIEW_REQUIRED )
263+ if is_product_group
264+ else _get_product_license_metrics_subquery (License_Policy_Evaluation_Result .RESULT_REVIEW_REQUIRED )
265+ )
266+ subquery_license_unknown = (
267+ _get_product_group_license_metrics_subquery (License_Policy_Evaluation_Result .RESULT_UNKNOWN )
268+ if is_product_group
269+ else _get_product_license_metrics_subquery (License_Policy_Evaluation_Result .RESULT_UNKNOWN )
270+ )
271+ subquery_license_allowed = (
272+ _get_product_group_license_metrics_subquery (License_Policy_Evaluation_Result .RESULT_ALLOWED )
273+ if is_product_group
274+ else _get_product_license_metrics_subquery (License_Policy_Evaluation_Result .RESULT_ALLOWED )
275+ )
276+ subquery_license_ignored = (
277+ _get_product_group_license_metrics_subquery (License_Policy_Evaluation_Result .RESULT_IGNORED )
278+ if is_product_group
279+ else _get_product_license_metrics_subquery (License_Policy_Evaluation_Result .RESULT_IGNORED )
280+ )
281+
282+ queryset = queryset .annotate (
283+ forbidden_licenses_count = Coalesce (subquery_license_forbidden , 0 ),
284+ review_required_licenses_count = Coalesce (subquery_license_review_required , 0 ),
285+ unknown_licenses_count = Coalesce (subquery_license_unknown , 0 ),
286+ allowed_licenses_count = Coalesce (subquery_license_allowed , 0 ),
287+ ignored_licenses_count = Coalesce (subquery_license_ignored , 0 ),
288+ )
289+
290+ return queryset
291+
292+
179293def _get_product_observation_subquery (severity : str ) -> Subquery :
180294 branch_filter = Q (branch__is_default_branch = True ) | (
181295 Q (branch__isnull = True ) & Q (product__repository_default_branch__isnull = True )
@@ -216,6 +330,23 @@ def _get_product_group_observation_subquery(severity: str) -> Subquery:
216330 )
217331
218332
333+ def _get_product_metrics_subquery (severity : str ) -> Subquery :
334+ return Subquery (
335+ Product_Metrics .objects .filter (product = OuterRef ("pk" ), date = date .today ()).values (SEVERITY_MAPPING [severity ]),
336+ output_field = IntegerField (),
337+ )
338+
339+
340+ def _get_product_group_metrics_subquery (severity : str ) -> Subquery :
341+ return Subquery (
342+ Product_Metrics .objects .filter (product__product_group = OuterRef ("pk" ), date = date .today ())
343+ .values ("product__product_group" )
344+ .annotate (total = Sum (SEVERITY_MAPPING [severity ]))
345+ .values ("total" ),
346+ output_field = IntegerField (),
347+ )
348+
349+
219350def _get_product_license_subquery (evaluation_result : str ) -> Subquery :
220351 branch_filter = Q (branch__is_default_branch = True ) | (
221352 Q (branch__isnull = True ) & Q (product__repository_default_branch__isnull = True )
@@ -252,3 +383,22 @@ def _get_product_group_license_subquery(evaluation_result: str) -> Subquery:
252383 .values ("count" ),
253384 output_field = IntegerField (),
254385 )
386+
387+
388+ def _get_product_license_metrics_subquery (evaluation_result : str ) -> Subquery :
389+ return Subquery (
390+ Product_License_Metrics .objects .filter (product = OuterRef ("pk" ), date = date .today ()).values (
391+ EVALUATION_RESULT_MAPPING [evaluation_result ]
392+ ),
393+ output_field = IntegerField (),
394+ )
395+
396+
397+ def _get_product_group_license_metrics_subquery (evaluation_result : str ) -> Subquery :
398+ return Subquery (
399+ Product_License_Metrics .objects .filter (product__product_group = OuterRef ("pk" ), date = date .today ())
400+ .values ("product__product_group" )
401+ .annotate (total = Sum (EVALUATION_RESULT_MAPPING [evaluation_result ]))
402+ .values ("total" ),
403+ output_field = IntegerField (),
404+ )
0 commit comments