Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion api/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -19,5 +19,6 @@ features/workflows/logic/
/tests/ldap_integration_tests/
/tests/saml_unit_tests/

# Unit test coverage
# Code maintenance
.coverage
.mypy_cache
1 change: 1 addition & 0 deletions api/api/urls/v1.py
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,7 @@
name="environment-document",
),
re_path("", include("features.versioning.urls", namespace="versioning")),
path("", include("features.feature_lifecycle.urls", namespace="feature-lifecycle")),
# API documentation
path(
"swagger.json",
Expand Down
71 changes: 43 additions & 28 deletions api/app_analytics/influxdb_wrapper.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
import functools
import json
import logging
import typing
Expand All @@ -21,21 +22,10 @@
map_flux_tables_to_usage_data,
map_labels_to_influx_record_values,
)
from app_analytics.types import Labels
from app_analytics.types import DownsampleSize, Labels

logger = logging.getLogger(__name__)

url = settings.INFLUXDB_URL
token = settings.INFLUXDB_TOKEN
influx_org = settings.INFLUXDB_ORG
read_bucket = settings.INFLUXDB_BUCKET + "_downsampled_15m"

retries = Retry(connect=3, read=3, redirect=3)
# Set a timeout to prevent threads being potentially stuck open due to network weirdness
influxdb_client = InfluxDBClient(
url=url, token=token, org=influx_org, retries=retries, timeout=30000
)

DEFAULT_DROP_COLUMNS = (
"organisation",
"organisation_id",
Expand All @@ -52,18 +42,35 @@
)


def get_range_bucket_mappings(date_start: datetime) -> str:
now = timezone.now()
if (now - date_start).days > 10:
return settings.INFLUXDB_BUCKET + "_downsampled_1h"
return settings.INFLUXDB_BUCKET + "_downsampled_15m"


class InfluxDBWrapper:
client = None

def __init__(self, name): # type: ignore[no-untyped-def]
self.name = name
self.records = []
self.write_api = influxdb_client.write_api(write_options=SYNCHRONOUS)

@classmethod
@functools.cache
def get_client(cls) -> InfluxDBClient:
"""A singleton InfluxDB client instance"""
retries = Retry(connect=3, read=3, redirect=3)
return InfluxDBClient(
url=settings.INFLUXDB_URL,
token=settings.INFLUXDB_TOKEN,
org=settings.INFLUXDB_ORG,
retries=retries,
timeout=30000, # Hard stop to prevent hanging requests
)

@classmethod
def get_bucket(cls, size: DownsampleSize) -> str:
return f"{settings.INFLUXDB_BUCKET}_downsampled_{size}"

@classmethod
def get_downsampled_bucket(cls, date_start: datetime) -> str:
if (timezone.now() - date_start).days > 10:
return cls.get_bucket(DownsampleSize.LONG_TERM)
return cls.get_bucket(DownsampleSize.SHORT_TERM)

def add_data_point(
self,
Expand All @@ -85,8 +92,12 @@ def add_data_point(
self.records.append(point)

def write(self) -> None:
"""Persist collected data points to InfluxDB"""
try:
self.write_api.write(bucket=settings.INFLUXDB_BUCKET, record=self.records)
self.get_client().write_api(write_options=SYNCHRONOUS).write(
bucket=settings.INFLUXDB_BUCKET,
record=self.records,
)
except (HTTPError, InfluxDBError) as e:
logger.warning(
"Failed to write records to Influx: %s",
Expand All @@ -99,15 +110,19 @@ def write(self) -> None:
settings.INFLUXDB_BUCKET,
)

@staticmethod
@classmethod
def influx_query_manager(
cls,
date_start: datetime | None = None,
date_stop: datetime | None = None,
drop_columns: tuple[str, ...] = DEFAULT_DROP_COLUMNS,
filters: str = "|> filter(fn:(r) => r._measurement == 'api_call')",
extra: str = "",
bucket: str = read_bucket,
bucket: str | None = None,
) -> list[FluxTable]:
if bucket is None:
bucket = cls.get_bucket(DownsampleSize.SHORT_TERM) # NOTE: Legacy default

now = timezone.now()
if date_start is None:
date_start = now - timedelta(days=30)
Expand All @@ -119,7 +134,7 @@ def influx_query_manager(
if date_start == date_stop:
return []

query_api = influxdb_client.query_api()
query_api = cls.get_client().query_api()
drop_columns_input = str(list(drop_columns)).replace("'", '"')

query = (
Expand All @@ -132,7 +147,7 @@ def influx_query_manager(
logger.debug("Running query in influx: \n\n %s", query)

try:
return query_api.query(org=influx_org, query=query)
return query_api.query(org=settings.INFLUXDB_ORG, query=query)
except HTTPError as e:
capture_exception(e)
return []
Expand Down Expand Up @@ -390,7 +405,7 @@ def get_top_organisations(
if limit:
limit = f"|> limit(n:{limit})"

bucket = get_range_bucket_mappings(date_start)
bucket = InfluxDBWrapper.get_downsampled_bucket(date_start)
results = InfluxDBWrapper.influx_query_manager(
date_start=date_start,
bucket=bucket,
Expand Down Expand Up @@ -432,7 +447,7 @@ def get_current_api_usage(
:return: number of current api calls
"""
bucket = read_bucket
bucket = InfluxDBWrapper.get_bucket(DownsampleSize.SHORT_TERM)
results = InfluxDBWrapper.influx_query_manager(
date_start=date_start,
bucket=bucket,
Expand Down Expand Up @@ -474,7 +489,7 @@ def get_platform_usage_trends(

org_id_set = ", ".join(f'"{oid}"' for oid in organisation_ids)

bucket = get_range_bucket_mappings(date_start)
bucket = InfluxDBWrapper.get_downsampled_bucket(date_start)
results = InfluxDBWrapper.influx_query_manager(
date_start=date_start,
date_stop=date_stop,
Expand Down
6 changes: 4 additions & 2 deletions api/app_analytics/migrate_to_pg.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
from app_analytics.constants import ANALYTICS_READ_BUCKET_SIZE
from app_analytics.influxdb_wrapper import influxdb_client, read_bucket
from app_analytics.influxdb_wrapper import InfluxDBWrapper
from app_analytics.models import FeatureEvaluationBucket
from app_analytics.types import DownsampleSize


def migrate_feature_evaluations(migrate_till: int = 30) -> None:
query_api = influxdb_client.query_api()
query_api = InfluxDBWrapper.get_client().query_api()
read_bucket = InfluxDBWrapper.get_bucket(DownsampleSize.SHORT_TERM)

for i in range(migrate_till):
range_start = f"-{i + 1}d"
Expand Down
69 changes: 68 additions & 1 deletion api/app_analytics/services.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,16 @@
from datetime import datetime

from django.conf import settings
from django.db.models import QuerySet

from app_analytics import constants
from app_analytics.cache import APIUsageCache
from app_analytics.models import Resource
from app_analytics.influxdb_wrapper import InfluxDBWrapper, build_filter_string
from app_analytics.models import FeatureEvaluationBucket, Resource
from app_analytics.tasks import track_request
from app_analytics.types import Labels
from environments.models import Environment
from features.models import Feature

api_usage_cache = APIUsageCache()

Expand Down Expand Up @@ -31,3 +38,63 @@ def track_usage_by_resource_host_and_environment(
"labels": labels,
}
)


def get_features_in_use(
environment: Environment,
since: datetime | None = None,
) -> QuerySet[Feature] | None:
"""Obtain features found in recent analytics data, i.e. in use"""
if settings.USE_POSTGRES_FOR_ANALYTICS:
feature_names = _get_feature_names_in_use_from_analytics_db(environment, since)
elif settings.INFLUXDB_TOKEN:
feature_names = _get_feature_names_in_use_from_influxdb(environment, since)
else:
return None
features_in_use: QuerySet[Feature] = Feature.objects.filter(
name__in=feature_names,
project__environments=environment,
)
return features_in_use


def _get_feature_names_in_use_from_analytics_db(
environment: Environment,
since: datetime | None = None,
) -> list[str]:
# NOTE: Neighbour buckets may bleed depending on `since`
buckets = FeatureEvaluationBucket.objects.filter(
environment_id=environment.pk,
bucket_size=constants.ANALYTICS_READ_BUCKET_SIZE,
created_at__gte=since,
total_count__gt=0,
)
feature_names = buckets.values_list("feature_name", flat=True).distinct()
return list(feature_names)


def _get_feature_names_in_use_from_influxdb(
environment: Environment,
since: datetime | None = None,
) -> list[str]:
results = InfluxDBWrapper.influx_query_manager(
date_start=since,
filters=build_filter_string(
[
'r._measurement == "feature_evaluation"',
'r["_field"] == "request_count"',
f'r["environment_id"] == "{environment.pk}"',
]
),
extra=(
'|> keep(columns: ["feature_id"]) '
'|> distinct(column: "feature_id") '
'|> yield(name: "distinct")'
),
)
return [
feature_name
for table in results
for record in table.records
if (feature_name := record.get_value()) is not None
]
6 changes: 6 additions & 0 deletions api/app_analytics/types.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from datetime import date
from enum import StrEnum
from typing import TYPE_CHECKING, Literal, NamedTuple, TypeAlias, TypedDict

if TYPE_CHECKING:
Expand Down Expand Up @@ -27,6 +28,11 @@
]


class DownsampleSize(StrEnum):
SHORT_TERM = "15m"
LONG_TERM = "1h"


class APIUsageCacheKey(NamedTuple):
resource: "Resource"
host: str
Expand Down
12 changes: 12 additions & 0 deletions api/features/feature_lifecycle/serializers.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
from rest_framework import serializers


class FeatureLifecycleCountsSerializer(serializers.Serializer[dict[str, int]]):
"""Number of features in each lifecycle stage for an environment"""

new = serializers.IntegerField()
live = serializers.IntegerField()
stale = serializers.IntegerField()
permanent = serializers.IntegerField()
needs_monitoring = serializers.IntegerField()
to_remove = serializers.IntegerField()
92 changes: 92 additions & 0 deletions api/features/feature_lifecycle/services.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,92 @@
from datetime import timedelta

from django.db.models.expressions import Case, Exists, OuterRef, Value, When
from django.db.models.fields import BooleanField
from django.db.models.functions import Cast
from django.db.models.query import QuerySet
from django.utils import timezone

from app_analytics.services import get_features_in_use
from environments.models import Environment
from features.feature_lifecycle.types import LifecycleStage
from features.models import Feature
from integrations.flagsmith.client import get_openfeature_client
from organisations.models import Organisation
from projects.code_references.services import get_feature_flags_in_latest_scan
from projects.tags.models import Tag, TagType


def is_feature_lifecycle_enabled(organisation: Organisation) -> bool:
return get_openfeature_client().get_boolean_value(
"feature_lifecycle",
default_value=False,
evaluation_context=organisation.openfeature_evaluation_context,
)


def annotate_feature_queryset_with_lifecycle_stage(
queryset: QuerySet[Feature],
environment: Environment,
) -> QuerySet[Feature]:
"""Annotate `queryset` with `lifecycle_stage: LifecycleStage`."""
days_until_stale = environment.project.stale_flags_limit_days
usage_window = timezone.now() - timedelta(days=days_until_stale)

features_in_code = get_feature_flags_in_latest_scan(environment.project)
features_in_use = get_features_in_use(environment, since=usage_window)

return queryset.alias(
has_code_references=Exists(
features_in_code.filter(pk=OuterRef("pk")),
),
has_recent_usage=(
Exists(features_in_use.filter(pk=OuterRef("pk")))
if features_in_use is not None
else Cast(Value(None), output_field=BooleanField())
),
has_permanent_tag=Exists(
Tag.objects.filter(feature=OuterRef("pk"), is_permanent=True),
),
has_stale_tag=Exists(
Tag.objects.filter(feature=OuterRef("pk"), type=TagType.STALE),
),
).annotate(
lifecycle_stage=Case(
When(
has_code_references=False,
has_permanent_tag=False,
has_stale_tag=False,
then=Value(LifecycleStage.NEW),
),
When(
has_code_references=True,
has_permanent_tag=False,
has_stale_tag=False,
then=Value(LifecycleStage.LIVE),
),
When(
has_code_references=True,
has_permanent_tag=False,
has_stale_tag=True,
then=Value(LifecycleStage.STALE),
),
When(
has_permanent_tag=True,
then=Value(LifecycleStage.PERMANENT),
),
When(
has_code_references=False,
has_permanent_tag=False,
has_stale_tag=True,
has_recent_usage=True,
then=Value(LifecycleStage.NEEDS_MONITORING),
),
When(
has_code_references=False,
has_permanent_tag=False,
has_stale_tag=True,
has_recent_usage=False,
then=Value(LifecycleStage.TO_REMOVE),
),
),
)
Loading
Loading