|
- import io
- import os
- import random
- import re
- import sys
- import threading
- import time
- import warnings
- import zlib
- from abc import ABC, abstractmethod
- from contextlib import contextmanager
- from datetime import datetime, timezone
- from functools import wraps, partial
-
- import sentry_sdk
- from sentry_sdk.utils import (
- ContextVar,
- now,
- nanosecond_time,
- to_timestamp,
- serialize_frame,
- json_dumps,
- )
- from sentry_sdk.envelope import Envelope, Item
- from sentry_sdk.tracing import TransactionSource
-
- from typing import TYPE_CHECKING
-
- if TYPE_CHECKING:
- from typing import Any
- from typing import Callable
- from typing import Dict
- from typing import Generator
- from typing import Iterable
- from typing import List
- from typing import Optional
- from typing import Set
- from typing import Tuple
- from typing import Union
-
- from sentry_sdk._types import BucketKey
- from sentry_sdk._types import DurationUnit
- from sentry_sdk._types import FlushedMetricValue
- from sentry_sdk._types import MeasurementUnit
- from sentry_sdk._types import MetricMetaKey
- from sentry_sdk._types import MetricTagValue
- from sentry_sdk._types import MetricTags
- from sentry_sdk._types import MetricTagsInternal
- from sentry_sdk._types import MetricType
- from sentry_sdk._types import MetricValue
-
-
- warnings.warn(
- "The sentry_sdk.metrics module is deprecated and will be removed in the next major release. "
- "Sentry will reject all metrics sent after October 7, 2024. "
- "Learn more: https://sentry.zendesk.com/hc/en-us/articles/26369339769883-Upcoming-API-Changes-to-Metrics",
- DeprecationWarning,
- stacklevel=2,
- )
-
- _in_metrics = ContextVar("in_metrics", default=False)
- _set = set # set is shadowed below
-
- GOOD_TRANSACTION_SOURCES = frozenset(
- [
- TransactionSource.ROUTE,
- TransactionSource.VIEW,
- TransactionSource.COMPONENT,
- TransactionSource.TASK,
- ]
- )
-
- _sanitize_unit = partial(re.compile(r"[^a-zA-Z0-9_]+").sub, "")
- _sanitize_metric_key = partial(re.compile(r"[^a-zA-Z0-9_\-.]+").sub, "_")
- _sanitize_tag_key = partial(re.compile(r"[^a-zA-Z0-9_\-.\/]+").sub, "")
-
-
- def _sanitize_tag_value(value):
- # type: (str) -> str
- table = str.maketrans(
- {
- "\n": "\\n",
- "\r": "\\r",
- "\t": "\\t",
- "\\": "\\\\",
- "|": "\\u{7c}",
- ",": "\\u{2c}",
- }
- )
- return value.translate(table)
-
-
- def get_code_location(stacklevel):
- # type: (int) -> Optional[Dict[str, Any]]
- try:
- frm = sys._getframe(stacklevel)
- except Exception:
- return None
-
- return serialize_frame(
- frm, include_local_variables=False, include_source_context=True
- )
-
-
- @contextmanager
- def recursion_protection():
- # type: () -> Generator[bool, None, None]
- """Enters recursion protection and returns the old flag."""
- old_in_metrics = _in_metrics.get()
- _in_metrics.set(True)
- try:
- yield old_in_metrics
- finally:
- _in_metrics.set(old_in_metrics)
-
-
- def metrics_noop(func):
- # type: (Any) -> Any
- """Convenient decorator that uses `recursion_protection` to
- make a function a noop.
- """
-
- @wraps(func)
- def new_func(*args, **kwargs):
- # type: (*Any, **Any) -> Any
- with recursion_protection() as in_metrics:
- if not in_metrics:
- return func(*args, **kwargs)
-
- return new_func
-
-
- class Metric(ABC):
- __slots__ = ()
-
- @abstractmethod
- def __init__(self, first):
- # type: (MetricValue) -> None
- pass
-
- @property
- @abstractmethod
- def weight(self):
- # type: () -> int
- pass
-
- @abstractmethod
- def add(self, value):
- # type: (MetricValue) -> None
- pass
-
- @abstractmethod
- def serialize_value(self):
- # type: () -> Iterable[FlushedMetricValue]
- pass
-
-
- class CounterMetric(Metric):
- __slots__ = ("value",)
-
- def __init__(
- self, first # type: MetricValue
- ):
- # type: (...) -> None
- self.value = float(first)
-
- @property
- def weight(self):
- # type: (...) -> int
- return 1
-
- def add(
- self, value # type: MetricValue
- ):
- # type: (...) -> None
- self.value += float(value)
-
- def serialize_value(self):
- # type: (...) -> Iterable[FlushedMetricValue]
- return (self.value,)
-
-
- class GaugeMetric(Metric):
- __slots__ = (
- "last",
- "min",
- "max",
- "sum",
- "count",
- )
-
- def __init__(
- self, first # type: MetricValue
- ):
- # type: (...) -> None
- first = float(first)
- self.last = first
- self.min = first
- self.max = first
- self.sum = first
- self.count = 1
-
- @property
- def weight(self):
- # type: (...) -> int
- # Number of elements.
- return 5
-
- def add(
- self, value # type: MetricValue
- ):
- # type: (...) -> None
- value = float(value)
- self.last = value
- self.min = min(self.min, value)
- self.max = max(self.max, value)
- self.sum += value
- self.count += 1
-
- def serialize_value(self):
- # type: (...) -> Iterable[FlushedMetricValue]
- return (
- self.last,
- self.min,
- self.max,
- self.sum,
- self.count,
- )
-
-
- class DistributionMetric(Metric):
- __slots__ = ("value",)
-
- def __init__(
- self, first # type: MetricValue
- ):
- # type(...) -> None
- self.value = [float(first)]
-
- @property
- def weight(self):
- # type: (...) -> int
- return len(self.value)
-
- def add(
- self, value # type: MetricValue
- ):
- # type: (...) -> None
- self.value.append(float(value))
-
- def serialize_value(self):
- # type: (...) -> Iterable[FlushedMetricValue]
- return self.value
-
-
- class SetMetric(Metric):
- __slots__ = ("value",)
-
- def __init__(
- self, first # type: MetricValue
- ):
- # type: (...) -> None
- self.value = {first}
-
- @property
- def weight(self):
- # type: (...) -> int
- return len(self.value)
-
- def add(
- self, value # type: MetricValue
- ):
- # type: (...) -> None
- self.value.add(value)
-
- def serialize_value(self):
- # type: (...) -> Iterable[FlushedMetricValue]
- def _hash(x):
- # type: (MetricValue) -> int
- if isinstance(x, str):
- return zlib.crc32(x.encode("utf-8")) & 0xFFFFFFFF
- return int(x)
-
- return (_hash(value) for value in self.value)
-
-
- def _encode_metrics(flushable_buckets):
- # type: (Iterable[Tuple[int, Dict[BucketKey, Metric]]]) -> bytes
- out = io.BytesIO()
- _write = out.write
-
- # Note on sanitization: we intentionally sanitize in emission (serialization)
- # and not during aggregation for performance reasons. This means that the
- # envelope can in fact have duplicate buckets stored. This is acceptable for
- # relay side emission and should not happen commonly.
-
- for timestamp, buckets in flushable_buckets:
- for bucket_key, metric in buckets.items():
- metric_type, metric_name, metric_unit, metric_tags = bucket_key
- metric_name = _sanitize_metric_key(metric_name)
- metric_unit = _sanitize_unit(metric_unit)
- _write(metric_name.encode("utf-8"))
- _write(b"@")
- _write(metric_unit.encode("utf-8"))
-
- for serialized_value in metric.serialize_value():
- _write(b":")
- _write(str(serialized_value).encode("utf-8"))
-
- _write(b"|")
- _write(metric_type.encode("ascii"))
-
- if metric_tags:
- _write(b"|#")
- first = True
- for tag_key, tag_value in metric_tags:
- tag_key = _sanitize_tag_key(tag_key)
- if not tag_key:
- continue
- if first:
- first = False
- else:
- _write(b",")
- _write(tag_key.encode("utf-8"))
- _write(b":")
- _write(_sanitize_tag_value(tag_value).encode("utf-8"))
-
- _write(b"|T")
- _write(str(timestamp).encode("ascii"))
- _write(b"\n")
-
- return out.getvalue()
-
-
- def _encode_locations(timestamp, code_locations):
- # type: (int, Iterable[Tuple[MetricMetaKey, Dict[str, Any]]]) -> bytes
- mapping = {} # type: Dict[str, List[Any]]
-
- for key, loc in code_locations:
- metric_type, name, unit = key
- mri = "{}:{}@{}".format(
- metric_type, _sanitize_metric_key(name), _sanitize_unit(unit)
- )
-
- loc["type"] = "location"
- mapping.setdefault(mri, []).append(loc)
-
- return json_dumps({"timestamp": timestamp, "mapping": mapping})
-
-
- METRIC_TYPES = {
- "c": CounterMetric,
- "g": GaugeMetric,
- "d": DistributionMetric,
- "s": SetMetric,
- } # type: dict[MetricType, type[Metric]]
-
- # some of these are dumb
- TIMING_FUNCTIONS = {
- "nanosecond": nanosecond_time,
- "microsecond": lambda: nanosecond_time() / 1000.0,
- "millisecond": lambda: nanosecond_time() / 1000000.0,
- "second": now,
- "minute": lambda: now() / 60.0,
- "hour": lambda: now() / 3600.0,
- "day": lambda: now() / 3600.0 / 24.0,
- "week": lambda: now() / 3600.0 / 24.0 / 7.0,
- }
-
-
- class LocalAggregator:
- __slots__ = ("_measurements",)
-
- def __init__(self):
- # type: (...) -> None
- self._measurements = (
- {}
- ) # type: Dict[Tuple[str, MetricTagsInternal], Tuple[float, float, int, float]]
-
- def add(
- self,
- ty, # type: MetricType
- key, # type: str
- value, # type: float
- unit, # type: MeasurementUnit
- tags, # type: MetricTagsInternal
- ):
- # type: (...) -> None
- export_key = "%s:%s@%s" % (ty, key, unit)
- bucket_key = (export_key, tags)
-
- old = self._measurements.get(bucket_key)
- if old is not None:
- v_min, v_max, v_count, v_sum = old
- v_min = min(v_min, value)
- v_max = max(v_max, value)
- v_count += 1
- v_sum += value
- else:
- v_min = v_max = v_sum = value
- v_count = 1
- self._measurements[bucket_key] = (v_min, v_max, v_count, v_sum)
-
- def to_json(self):
- # type: (...) -> Dict[str, Any]
- rv = {} # type: Any
- for (export_key, tags), (
- v_min,
- v_max,
- v_count,
- v_sum,
- ) in self._measurements.items():
- rv.setdefault(export_key, []).append(
- {
- "tags": _tags_to_dict(tags),
- "min": v_min,
- "max": v_max,
- "count": v_count,
- "sum": v_sum,
- }
- )
- return rv
-
-
- class MetricsAggregator:
- ROLLUP_IN_SECONDS = 10.0
- MAX_WEIGHT = 100000
- FLUSHER_SLEEP_TIME = 5.0
-
- def __init__(
- self,
- capture_func, # type: Callable[[Envelope], None]
- enable_code_locations=False, # type: bool
- ):
- # type: (...) -> None
- self.buckets = {} # type: Dict[int, Any]
- self._enable_code_locations = enable_code_locations
- self._seen_locations = _set() # type: Set[Tuple[int, MetricMetaKey]]
- self._pending_locations = {} # type: Dict[int, List[Tuple[MetricMetaKey, Any]]]
- self._buckets_total_weight = 0
- self._capture_func = capture_func
- self._running = True
- self._lock = threading.Lock()
-
- self._flush_event = threading.Event() # type: threading.Event
- self._force_flush = False
-
- # The aggregator shifts its flushing by up to an entire rollup window to
- # avoid multiple clients trampling on end of a 10 second window as all the
- # buckets are anchored to multiples of ROLLUP seconds. We randomize this
- # number once per aggregator boot to achieve some level of offsetting
- # across a fleet of deployed SDKs. Relay itself will also apply independent
- # jittering.
- self._flush_shift = random.random() * self.ROLLUP_IN_SECONDS
-
- self._flusher = None # type: Optional[threading.Thread]
- self._flusher_pid = None # type: Optional[int]
-
- def _ensure_thread(self):
- # type: (...) -> bool
- """For forking processes we might need to restart this thread.
- This ensures that our process actually has that thread running.
- """
- if not self._running:
- return False
-
- pid = os.getpid()
- if self._flusher_pid == pid:
- return True
-
- with self._lock:
- # Recheck to make sure another thread didn't get here and start the
- # the flusher in the meantime
- if self._flusher_pid == pid:
- return True
-
- self._flusher_pid = pid
-
- self._flusher = threading.Thread(target=self._flush_loop)
- self._flusher.daemon = True
-
- try:
- self._flusher.start()
- except RuntimeError:
- # Unfortunately at this point the interpreter is in a state that no
- # longer allows us to spawn a thread and we have to bail.
- self._running = False
- return False
-
- return True
-
- def _flush_loop(self):
- # type: (...) -> None
- _in_metrics.set(True)
- while self._running or self._force_flush:
- if self._running:
- self._flush_event.wait(self.FLUSHER_SLEEP_TIME)
- self._flush()
-
- def _flush(self):
- # type: (...) -> None
- self._emit(self._flushable_buckets(), self._flushable_locations())
-
- def _flushable_buckets(self):
- # type: (...) -> (Iterable[Tuple[int, Dict[BucketKey, Metric]]])
- with self._lock:
- force_flush = self._force_flush
- cutoff = time.time() - self.ROLLUP_IN_SECONDS - self._flush_shift
- flushable_buckets = () # type: Iterable[Tuple[int, Dict[BucketKey, Metric]]]
- weight_to_remove = 0
-
- if force_flush:
- flushable_buckets = self.buckets.items()
- self.buckets = {}
- self._buckets_total_weight = 0
- self._force_flush = False
- else:
- flushable_buckets = []
- for buckets_timestamp, buckets in self.buckets.items():
- # If the timestamp of the bucket is newer that the rollup we want to skip it.
- if buckets_timestamp <= cutoff:
- flushable_buckets.append((buckets_timestamp, buckets))
-
- # We will clear the elements while holding the lock, in order to avoid requesting it downstream again.
- for buckets_timestamp, buckets in flushable_buckets:
- for metric in buckets.values():
- weight_to_remove += metric.weight
- del self.buckets[buckets_timestamp]
-
- self._buckets_total_weight -= weight_to_remove
-
- return flushable_buckets
-
- def _flushable_locations(self):
- # type: (...) -> Dict[int, List[Tuple[MetricMetaKey, Dict[str, Any]]]]
- with self._lock:
- locations = self._pending_locations
- self._pending_locations = {}
- return locations
-
- @metrics_noop
- def add(
- self,
- ty, # type: MetricType
- key, # type: str
- value, # type: MetricValue
- unit, # type: MeasurementUnit
- tags, # type: Optional[MetricTags]
- timestamp=None, # type: Optional[Union[float, datetime]]
- local_aggregator=None, # type: Optional[LocalAggregator]
- stacklevel=0, # type: Optional[int]
- ):
- # type: (...) -> None
- if not self._ensure_thread() or self._flusher is None:
- return None
-
- if timestamp is None:
- timestamp = time.time()
- elif isinstance(timestamp, datetime):
- timestamp = to_timestamp(timestamp)
-
- bucket_timestamp = int(
- (timestamp // self.ROLLUP_IN_SECONDS) * self.ROLLUP_IN_SECONDS
- )
- serialized_tags = _serialize_tags(tags)
- bucket_key = (
- ty,
- key,
- unit,
- serialized_tags,
- )
-
- with self._lock:
- local_buckets = self.buckets.setdefault(bucket_timestamp, {})
- metric = local_buckets.get(bucket_key)
- if metric is not None:
- previous_weight = metric.weight
- metric.add(value)
- else:
- metric = local_buckets[bucket_key] = METRIC_TYPES[ty](value)
- previous_weight = 0
-
- added = metric.weight - previous_weight
-
- if stacklevel is not None:
- self.record_code_location(ty, key, unit, stacklevel + 2, timestamp)
-
- # Given the new weight we consider whether we want to force flush.
- self._consider_force_flush()
-
- # For sets, we only record that a value has been added to the set but not which one.
- # See develop docs: https://develop.sentry.dev/sdk/metrics/#sets
- if local_aggregator is not None:
- local_value = float(added if ty == "s" else value)
- local_aggregator.add(ty, key, local_value, unit, serialized_tags)
-
- def record_code_location(
- self,
- ty, # type: MetricType
- key, # type: str
- unit, # type: MeasurementUnit
- stacklevel, # type: int
- timestamp=None, # type: Optional[float]
- ):
- # type: (...) -> None
- if not self._enable_code_locations:
- return
- if timestamp is None:
- timestamp = time.time()
- meta_key = (ty, key, unit)
- start_of_day = datetime.fromtimestamp(timestamp, timezone.utc).replace(
- hour=0, minute=0, second=0, microsecond=0, tzinfo=None
- )
- start_of_day = int(to_timestamp(start_of_day))
-
- if (start_of_day, meta_key) not in self._seen_locations:
- self._seen_locations.add((start_of_day, meta_key))
- loc = get_code_location(stacklevel + 3)
- if loc is not None:
- # Group metadata by day to make flushing more efficient.
- # There needs to be one envelope item per timestamp.
- self._pending_locations.setdefault(start_of_day, []).append(
- (meta_key, loc)
- )
-
- @metrics_noop
- def need_code_location(
- self,
- ty, # type: MetricType
- key, # type: str
- unit, # type: MeasurementUnit
- timestamp, # type: float
- ):
- # type: (...) -> bool
- if self._enable_code_locations:
- return False
- meta_key = (ty, key, unit)
- start_of_day = datetime.fromtimestamp(timestamp, timezone.utc).replace(
- hour=0, minute=0, second=0, microsecond=0, tzinfo=None
- )
- start_of_day = int(to_timestamp(start_of_day))
- return (start_of_day, meta_key) not in self._seen_locations
-
- def kill(self):
- # type: (...) -> None
- if self._flusher is None:
- return
-
- self._running = False
- self._flush_event.set()
- self._flusher = None
-
- @metrics_noop
- def flush(self):
- # type: (...) -> None
- self._force_flush = True
- self._flush()
-
- def _consider_force_flush(self):
- # type: (...) -> None
- # It's important to acquire a lock around this method, since it will touch shared data structures.
- total_weight = len(self.buckets) + self._buckets_total_weight
- if total_weight >= self.MAX_WEIGHT:
- self._force_flush = True
- self._flush_event.set()
-
- def _emit(
- self,
- flushable_buckets, # type: (Iterable[Tuple[int, Dict[BucketKey, Metric]]])
- code_locations, # type: Dict[int, List[Tuple[MetricMetaKey, Dict[str, Any]]]]
- ):
- # type: (...) -> Optional[Envelope]
- envelope = Envelope()
-
- if flushable_buckets:
- encoded_metrics = _encode_metrics(flushable_buckets)
- envelope.add_item(Item(payload=encoded_metrics, type="statsd"))
-
- for timestamp, locations in code_locations.items():
- encoded_locations = _encode_locations(timestamp, locations)
- envelope.add_item(Item(payload=encoded_locations, type="metric_meta"))
-
- if envelope.items:
- self._capture_func(envelope)
- return envelope
- return None
-
-
- def _serialize_tags(
- tags, # type: Optional[MetricTags]
- ):
- # type: (...) -> MetricTagsInternal
- if not tags:
- return ()
-
- rv = []
- for key, value in tags.items():
- # If the value is a collection, we want to flatten it.
- if isinstance(value, (list, tuple)):
- for inner_value in value:
- if inner_value is not None:
- rv.append((key, str(inner_value)))
- elif value is not None:
- rv.append((key, str(value)))
-
- # It's very important to sort the tags in order to obtain the
- # same bucket key.
- return tuple(sorted(rv))
-
-
- def _tags_to_dict(tags):
- # type: (MetricTagsInternal) -> Dict[str, Any]
- rv = {} # type: Dict[str, Any]
- for tag_name, tag_value in tags:
- old_value = rv.get(tag_name)
- if old_value is not None:
- if isinstance(old_value, list):
- old_value.append(tag_value)
- else:
- rv[tag_name] = [old_value, tag_value]
- else:
- rv[tag_name] = tag_value
- return rv
-
-
- def _get_aggregator():
- # type: () -> Optional[MetricsAggregator]
- client = sentry_sdk.get_client()
- return (
- client.metrics_aggregator
- if client.is_active() and client.metrics_aggregator is not None
- else None
- )
-
-
- def _get_aggregator_and_update_tags(key, value, unit, tags):
- # type: (str, Optional[MetricValue], MeasurementUnit, Optional[MetricTags]) -> Tuple[Optional[MetricsAggregator], Optional[LocalAggregator], Optional[MetricTags]]
- client = sentry_sdk.get_client()
- if not client.is_active() or client.metrics_aggregator is None:
- return None, None, tags
-
- updated_tags = dict(tags or ()) # type: Dict[str, MetricTagValue]
- updated_tags.setdefault("release", client.options["release"])
- updated_tags.setdefault("environment", client.options["environment"])
-
- scope = sentry_sdk.get_current_scope()
- local_aggregator = None
-
- # We go with the low-level API here to access transaction information as
- # this one is the same between just errors and errors + performance
- transaction_source = scope._transaction_info.get("source")
- if transaction_source in GOOD_TRANSACTION_SOURCES:
- transaction_name = scope._transaction
- if transaction_name:
- updated_tags.setdefault("transaction", transaction_name)
- if scope._span is not None:
- local_aggregator = scope._span._get_local_aggregator()
-
- experiments = client.options.get("_experiments", {})
- before_emit_callback = experiments.get("before_emit_metric")
- if before_emit_callback is not None:
- with recursion_protection() as in_metrics:
- if not in_metrics:
- if not before_emit_callback(key, value, unit, updated_tags):
- return None, None, updated_tags
-
- return client.metrics_aggregator, local_aggregator, updated_tags
-
-
- def increment(
- key, # type: str
- value=1.0, # type: float
- unit="none", # type: MeasurementUnit
- tags=None, # type: Optional[MetricTags]
- timestamp=None, # type: Optional[Union[float, datetime]]
- stacklevel=0, # type: int
- ):
- # type: (...) -> None
- """Increments a counter."""
- aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
- key, value, unit, tags
- )
- if aggregator is not None:
- aggregator.add(
- "c", key, value, unit, tags, timestamp, local_aggregator, stacklevel
- )
-
-
- # alias as incr is relatively common in python
- incr = increment
-
-
- class _Timing:
- def __init__(
- self,
- key, # type: str
- tags, # type: Optional[MetricTags]
- timestamp, # type: Optional[Union[float, datetime]]
- value, # type: Optional[float]
- unit, # type: DurationUnit
- stacklevel, # type: int
- ):
- # type: (...) -> None
- self.key = key
- self.tags = tags
- self.timestamp = timestamp
- self.value = value
- self.unit = unit
- self.entered = None # type: Optional[float]
- self._span = None # type: Optional[sentry_sdk.tracing.Span]
- self.stacklevel = stacklevel
-
- def _validate_invocation(self, context):
- # type: (str) -> None
- if self.value is not None:
- raise TypeError(
- "cannot use timing as %s when a value is provided" % context
- )
-
- def __enter__(self):
- # type: (...) -> _Timing
- self.entered = TIMING_FUNCTIONS[self.unit]()
- self._validate_invocation("context-manager")
- self._span = sentry_sdk.start_span(op="metric.timing", name=self.key)
- if self.tags:
- for key, value in self.tags.items():
- if isinstance(value, (tuple, list)):
- value = ",".join(sorted(map(str, value)))
- self._span.set_tag(key, value)
- self._span.__enter__()
-
- # report code locations here for better accuracy
- aggregator = _get_aggregator()
- if aggregator is not None:
- aggregator.record_code_location("d", self.key, self.unit, self.stacklevel)
-
- return self
-
- def __exit__(self, exc_type, exc_value, tb):
- # type: (Any, Any, Any) -> None
- assert self._span, "did not enter"
- aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
- self.key,
- self.value,
- self.unit,
- self.tags,
- )
- if aggregator is not None:
- elapsed = TIMING_FUNCTIONS[self.unit]() - self.entered # type: ignore
- aggregator.add(
- "d",
- self.key,
- elapsed,
- self.unit,
- tags,
- self.timestamp,
- local_aggregator,
- None, # code locations are reported in __enter__
- )
-
- self._span.__exit__(exc_type, exc_value, tb)
- self._span = None
-
- def __call__(self, f):
- # type: (Any) -> Any
- self._validate_invocation("decorator")
-
- @wraps(f)
- def timed_func(*args, **kwargs):
- # type: (*Any, **Any) -> Any
- with timing(
- key=self.key,
- tags=self.tags,
- timestamp=self.timestamp,
- unit=self.unit,
- stacklevel=self.stacklevel + 1,
- ):
- return f(*args, **kwargs)
-
- return timed_func
-
-
- def timing(
- key, # type: str
- value=None, # type: Optional[float]
- unit="second", # type: DurationUnit
- tags=None, # type: Optional[MetricTags]
- timestamp=None, # type: Optional[Union[float, datetime]]
- stacklevel=0, # type: int
- ):
- # type: (...) -> _Timing
- """Emits a distribution with the time it takes to run the given code block.
-
- This method supports three forms of invocation:
-
- - when a `value` is provided, it functions similar to `distribution` but with
- - it can be used as a context manager
- - it can be used as a decorator
- """
- if value is not None:
- aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
- key, value, unit, tags
- )
- if aggregator is not None:
- aggregator.add(
- "d", key, value, unit, tags, timestamp, local_aggregator, stacklevel
- )
- return _Timing(key, tags, timestamp, value, unit, stacklevel)
-
-
- def distribution(
- key, # type: str
- value, # type: float
- unit="none", # type: MeasurementUnit
- tags=None, # type: Optional[MetricTags]
- timestamp=None, # type: Optional[Union[float, datetime]]
- stacklevel=0, # type: int
- ):
- # type: (...) -> None
- """Emits a distribution."""
- aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
- key, value, unit, tags
- )
- if aggregator is not None:
- aggregator.add(
- "d", key, value, unit, tags, timestamp, local_aggregator, stacklevel
- )
-
-
- def set(
- key, # type: str
- value, # type: Union[int, str]
- unit="none", # type: MeasurementUnit
- tags=None, # type: Optional[MetricTags]
- timestamp=None, # type: Optional[Union[float, datetime]]
- stacklevel=0, # type: int
- ):
- # type: (...) -> None
- """Emits a set."""
- aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
- key, value, unit, tags
- )
- if aggregator is not None:
- aggregator.add(
- "s", key, value, unit, tags, timestamp, local_aggregator, stacklevel
- )
-
-
- def gauge(
- key, # type: str
- value, # type: float
- unit="none", # type: MeasurementUnit
- tags=None, # type: Optional[MetricTags]
- timestamp=None, # type: Optional[Union[float, datetime]]
- stacklevel=0, # type: int
- ):
- # type: (...) -> None
- """Emits a gauge."""
- aggregator, local_aggregator, tags = _get_aggregator_and_update_tags(
- key, value, unit, tags
- )
- if aggregator is not None:
- aggregator.add(
- "g", key, value, unit, tags, timestamp, local_aggregator, stacklevel
- )
|