From 0303a30ed54b2c8442c3475db5c0912222de59a0 Mon Sep 17 00:00:00 2001 From: JietaoXiao Date: Wed, 25 Oct 2023 09:59:01 +0800 Subject: [PATCH] sdk/metric_reader: support instant query and custom promql --- .../1_sdk/metric_reader/metric_reader.py | 25 +++- .../metric_reader/opentsdb_metric_reader.py | 3 + .../metric_reader/prometheus_metric_reader.py | 141 ++++++++++++++---- environment/1_sdk/metric_reader/result.py | 20 ++- environment/1_sdk/metric_reader/task.py | 19 ++- 5 files changed, 172 insertions(+), 36 deletions(-) diff --git a/environment/1_sdk/metric_reader/metric_reader.py b/environment/1_sdk/metric_reader/metric_reader.py index c0528e44..31cfd42a 100644 --- a/environment/1_sdk/metric_reader/metric_reader.py +++ b/environment/1_sdk/metric_reader/metric_reader.py @@ -13,7 +13,7 @@ from .result import MetricResult from .url import MetricReaderUrl from .exceptions import MetricReaderProtoAlreadyExistsException, \ MetricReaderProtoNotExistsException, MetricReaderException -from .task import RangeQueryTask +from .task import RangeQueryTask, InstantQueryTask from .common import StaticConst @@ -60,6 +60,29 @@ class MetricReader(metaclass=ABCMeta): Returns: """ + + @abstractmethod + def get_label_values(self, label_name) -> MetricResult: + """Get values list of specific label_name + + Args: + label_name: + + Returns: + + """ + + @abstractmethod + def instant_query(self, queries: List[InstantQueryTask]) -> MetricResult: + """Query data using query API for a specified metric with promql aggregation function + + Args: + queries([InstantQueryTask]): Query tasks + + Returns: + RangeVectorResult + """ + pass @abstractmethod def range_query(self, queries: List[RangeQueryTask]) -> MetricResult: diff --git a/environment/1_sdk/metric_reader/opentsdb_metric_reader.py b/environment/1_sdk/metric_reader/opentsdb_metric_reader.py index dca92f0b..aea284b3 100644 --- a/environment/1_sdk/metric_reader/opentsdb_metric_reader.py +++ b/environment/1_sdk/metric_reader/opentsdb_metric_reader.py @@ -70,6 +70,9 @@ class OpentsdbMetricReader(MetricReader): else: mr.data = res.json() return mr + + def get_label_values(self, label_name: str) -> List[str]: + pass def range_query(self, queries: List[RangeQueryTask]) -> MetricResult: params = { diff --git a/environment/1_sdk/metric_reader/prometheus_metric_reader.py b/environment/1_sdk/metric_reader/prometheus_metric_reader.py index 31f8e2dc..ff2d83f9 100644 --- a/environment/1_sdk/metric_reader/prometheus_metric_reader.py +++ b/environment/1_sdk/metric_reader/prometheus_metric_reader.py @@ -8,18 +8,24 @@ Description: """ import requests from typing import List +from enum import Enum from urllib.parse import urljoin -from .result import MetricResult, RangeVectorResult +from .result import * from .metric_reader import MetricReader from .url import MetricReaderUrl -from .task import RangeQueryTask +from .task import RangeQueryTask, QueryTask, InstantQueryTask from .filter import FilterType from .common import StaticConst GET_LABEL_NAMES = "/api/v1/series" RANGE_QUERY_API = "/api/v1/query_range" +QUERY_API = "/api/v1/query" METRIC_METADATA = "/api/v1/metadata" +GET_LABEL_VALUE = "/api/v1/label" +class QueryType(Enum): + instant = 0 + range = 1 class PrometheusMetricReader(MetricReader): def __init__(self, url: MetricReaderUrl, **kwargs): @@ -31,6 +37,50 @@ class PrometheusMetricReader(MetricReader): def _get_url(self, api: str): return urljoin(self.base_url, api) + + def _get_basic_promql_query(self, task: QueryTask): + promql_str = task.metric_name + rules = [] + if task.filters is not None and len(task.filters) > 0: + for flt in task.filters: + if flt.filter_type == FilterType.Equal: + rules.append(f'{flt.label_name}="{flt.value}"') + elif flt.filter_type == FilterType.Wildcard: + rules.append(f'{flt.label_name}=~' + f'"{flt.value.replace("*", "(.*?)")}"') + promql_str = promql_str + "{" + ",".join(rules) + "}" + return promql_str + + def _parse_res(self, query_type, res) -> MetricResult: + mr = MetricResult(0, []) + if res.status_code != 200: + mr.code = 1 + mr.err_msg = "Request failed, status_code != 200" + else: + json_res = res.json() + if json_res["status"] != "success": + mr.code = 1 + mr.err_msg = f"Prometheus API error: {json_res['error']}" + else: + if query_type == QueryType.range: + mr.data = [ + RangeVectorResult(item["metric"]["__name__"], + item["metric"], + values=item["values"]) + for item in json_res["data"]["result"] + ] + elif query_type == QueryType.instant: + for item in json_res["data"]["result"]: + metric_name = "" + if "__name__" in item["metric"]: + metric_name = item["metric"]["__name__"] + mr.data.append( + InstantVectorResult(metric_name, + item["metric"], + value=item["value"]) + ) + + return mr def get_metric_names(self, limit: int = -1) -> MetricResult: params = {} @@ -70,45 +120,70 @@ class PrometheusMetricReader(MetricReader): if len(series) > 0: mr.data = list(series[0].keys()) return mr + + def get_label_values(self, label_name) -> MetricResult: + url = GET_LABEL_VALUE + "/" + label_name + "/values" + res = requests.get(self._get_url(url)) + mr = MetricResult(0, []) + if res.status_code != 200: + mr.code = 1 + mr.err_msg = f"Get values for {label_name} failed!" + else: + json_res = res.json() + if json_res["status"] != "success": + mr.code = 1 + mr.err_msg = (f"Get label values for" + f" {label_name} failed => {json_res['error']}") + else: + values = json_res["data"] + if len(values) > 0: + mr.data = values + return mr - def range_query(self, queries: List[RangeQueryTask]) -> MetricResult: - def get_promql_query(task: RangeQueryTask): - promql_str = task.metric_name - rules = [] - if task.filters is not None and len(task.filters) > 0: - for flt in task.filters: - if flt.filter_type == FilterType.Equal: - rules.append(f'{flt.label_name}="{flt.value}"') - elif flt.filter_type == FilterType.Wildcard: - rules.append(f'{flt.label_name}=~' - f'"{flt.value.replace("*", "(.*?)")}"') - promql_str = promql_str + "{" + ",".join(rules) + "}" - return promql_str + def instant_query(self, queries: List[InstantQueryTask]) -> MetricResult: + def query_one(task: InstantQueryTask): + basic_query = self._get_basic_promql_query(task) + """ + range vector aggregation function also need to use query api + example: curl -g 'http://localhost:9090/api/v1/query? + query=avg_over_time(sysom_cgroups[5m])&time=1696679657.796' + """ + if task.aggregation is not None: + if task.interval is not None: + interval = f"[{task.interval}]" + basic_query = basic_query + interval + basic_query = task.aggregation + "(" + basic_query + ")" + if task.clause_label is not None: + basic_query = basic_query + task.clause \ + + "(" + ",".join(task.clause_label) + ")" + + res = requests.get(self._get_url(QUERY_API), { + "query": basic_query, + "time": task.time, + }) + + return self._parse_res(QueryType.instant, res) + + merged_result = MetricResult(0, []) + for query in queries: + mr_result = query_one(query) + if mr_result.code != 0: + merged_result.code = mr_result.code + merged_result.err_msg = mr_result.err_msg + break + merged_result.data.extend(mr_result.data) + return merged_result + def range_query(self, queries: List[RangeQueryTask]) -> MetricResult: def query_one(task: RangeQueryTask): res = requests.get(self._get_url(RANGE_QUERY_API), { - "query": get_promql_query(task), + "query": self._get_basic_promql_query(task), "start": task.start_time, "end": task.end_time, "step": f"{task.step}s" }) - mr = MetricResult(0, []) - if res.status_code != 200: - mr.code = 1 - mr.err_msg = "Request failed, status_code != 200" - else: - json_res = res.json() - if json_res["status"] != "success": - mr.code = 1 - mr.err_msg = f"Prometheus API error: {json_res['error']}" - else: - mr.data = [ - RangeVectorResult(item["metric"]["__name__"], - item["metric"], - values=item["values"]) - for item in json_res["data"]["result"] - ] - return mr + + return self._parse_res(QueryType.range, res) merged_result = MetricResult(0, []) for query in queries: diff --git a/environment/1_sdk/metric_reader/result.py b/environment/1_sdk/metric_reader/result.py index ed3a8c1b..3fedebca 100644 --- a/environment/1_sdk/metric_reader/result.py +++ b/environment/1_sdk/metric_reader/result.py @@ -25,10 +25,26 @@ class RangeVectorResult: def to_dict(self): return dict(self) +class InstantVectorResult: + def __init__(self, metric_name: str, labels: Dict[str, str], + value: Tuple[float, float]): + self.metric_name = metric_name + self.labels = labels + self.value = value + + def keys(self): + return ('metric_name', 'labels', 'value') + + def __getitem__(self, item): + return getattr(self, item) + + def to_dict(self): + return dict(self) + class MetricResult: def __init__(self, code: int, - data: Union[List[RangeVectorResult], List[str]], + data: Union[List[RangeVectorResult], List[InstantVectorResult], List[str]], err_msg: str = ""): self.code = code self.err_msg = err_msg @@ -43,6 +59,8 @@ class MetricResult: if isinstance(obj, List) and len(obj) > 0: if isinstance(obj[0], RangeVectorResult): return [item.to_dict() for item in obj] + if isinstance(obj[0], InstantVectorResult): + return [item.to_dict() for item in obj] else: return obj else: diff --git a/environment/1_sdk/metric_reader/task.py b/environment/1_sdk/metric_reader/task.py index d178ad23..228569a3 100644 --- a/environment/1_sdk/metric_reader/task.py +++ b/environment/1_sdk/metric_reader/task.py @@ -44,7 +44,6 @@ class QueryTask: self.filters.append(EqualFilter(label_name, value)) return self - class RangeQueryTask(QueryTask): """ Args: @@ -60,3 +59,21 @@ class RangeQueryTask(QueryTask): self.start_time = start_time self.end_time = end_time self.step = step + +class InstantQueryTask(QueryTask): + """ + Args: + time(float): Instant time (Unix timestamp) + aggregation(str):Built-in aggregation function of promql + (e.g. avg_over_time, rate, sum) + interval(str): Range of time use to get a range vector(e.g. 5m) + """ + def __init__(self, metric_name: str, time: float, + aggregation: str = None, interval: str = None, + filters: List[Filter] = None, clause_label: List[str] = None): + super().__init__(metric_name, filters) + self.time = time + self.aggregation = aggregation + self.interval = interval + self.clause = "by" + self.clause_label = clause_label \ No newline at end of file -- Gitee