import enum
import ciso8601
import time
# noinspection PyUnresolvedReferences
import re # noqa
from collections import Counter
# noinspection PyUnresolvedReferences
from .common import * # noqa
from ..compat import pd
class DataPoint:
    """Base class for dynamically generated datapoint class"""
    # Empty slots so that generated subclasses, which declare one slot per schema
    # key, do not get a per-instance ``__dict__``.
    __slots__ = ()

    # The methods below are documentation placeholders: they have no body and
    # return ``None`` when called on the base class. The ``datapoint`` factory
    # supplies working implementations on every generated subclass.

    def items(self):
        """Returns an iterator over pair of keys and values"""

    def to_dict(self) -> dict:
        """Converts datapoint to a regular dictionary"""

    # NOTE: this docstring is copied onto the generated serializers, so it is
    # visible at runtime via ``to_lineprotocol.__doc__``.
    def to_lineprotocol(self) -> bytes:
        """Returns InfluxDB line protocol representation of datapoint"""
class InfluxType(enum.Enum):
    """Kinds of schema entries understood by the datapoint serializer.

    The numeric values are significant: schemas are sorted by value, values
    strictly between 0 and 20 are counted as timestamps and values of 25 and
    above are counted as fields.
    """
    # Measurement name taken from the datapoint itself
    MEASUREMENT = 0

    # Timestamps (0 < value < 20): integer, string and datetime flavours
    TIMEINT = 10
    TIMESTR = 11
    TIMEDT = 12

    # Tags: plain (escaped string) and enum-like (serialized by ``name``)
    TAG = 20
    TAGENUM = 21

    # Fields (>=25)
    PLACEHOLDER = 25  # always serialized as ``<key>=true``
    BOOL = 30
    INT = 40
    FLOAT = 50
    STR = 60
    ENUM = 61
def str_to_dt(s):
    """Parse a datetime string with ``ciso8601``.

    :param s: Datetime string to parse
    :return: The object returned by ``ciso8601.parse_datetime``
    :raises ValueError: If the parser returns a falsy result for ``s``
    """
    dt = ciso8601.parse_datetime(s)
    if dt:
        return dt
    # ``dt`` is falsy at this point and carries no information, so report the
    # input that failed to parse instead.
    raise ValueError(f'Invalid datetime string: {s!r}')
def dt_to_int(dt):
    """Convert a datetime to integer nanoseconds since the Unix epoch.

    Timezone-naive input is assumed to be in UTC, not local time. The result
    does not depend on the local timezone or DST rules of the host, and is
    computed with integer arithmetic only, so instants before 1970 and
    sub-second parts are handled exactly.

    :param dt: ``datetime.datetime`` instance, naive or aware
    :return: Nanoseconds elapsed since ``1970-01-01T00:00:00`` UTC
    """
    # Function-scope import keeps this helper independent of whatever names the
    # module-level imports happen to provide.
    from datetime import datetime

    offset = dt.utcoffset()
    if offset is not None:
        # Move aware datetimes onto the UTC wall clock so they can be compared
        # with the naive epoch below.
        dt = dt.replace(tzinfo=None) - offset
    delta = dt - datetime(1970, 1, 1)
    # timedelta is normalized so that only ``days`` may be negative; summing the
    # parts therefore yields the right sign without float truncation.
    return (delta.days * 86400 + delta.seconds) * 10 ** 9 + delta.microseconds * 1000
def td_to_int(td):
    """Convert a timedelta to an integer number of nanoseconds.

    :param td: ``datetime.timedelta`` instance (may be negative)
    :return: Duration in nanoseconds as ``int``
    """
    # A timedelta is stored normalized: ``seconds`` and ``microseconds`` are
    # always non-negative and only ``days`` carries the sign. Summing the parts
    # keeps negative durations correct, which truncating ``total_seconds()``
    # and adding the (positive) microseconds does not.
    return (td.days * 86400 + td.seconds) * 10 ** 9 + td.microseconds * 1000
def _make_serializer(schema, meas, rm_none=False, extra_tags=None):
    """Factory of datapoint -> line protocol parsers

    The schema is translated once into the source text of an f-string, which is
    then compiled with ``eval`` into a single-expression lambda. Calling the
    returned function on a datapoint formats that f-string and encodes it.

    NOTE: schema keys, the measurement name and ``extra_tags`` are interpolated
    verbatim into the evaluated source, so they must come from trusted code and
    never from external input.

    :param schema: Mapping of attribute name to :class:`InfluxType`
    :param meas: Measurement name used unless the schema contains a
        ``MEASUREMENT`` entry
    :param rm_none: Whether to strip ``None`` valued tags/fields with a regex
    :param extra_tags: Optional mapping of hard coded tags added to every point
    :return: Function taking a datapoint and returning its line protocol ``bytes``
    :raises TypeError: If the schema contains an unknown type
    """
    tags = []
    fields = []
    ts = None
    # Each branch appends a fragment of f-string source; ``i`` is the datapoint
    # instance the generated lambda receives.
    for k, t in schema.items():
        if t is InfluxType.MEASUREMENT:
            meas = f"{{i.{k}}}"
        elif t is InfluxType.TIMEINT:
            ts = f"{{i.{k}}}"
        elif t is InfluxType.TIMESTR:
            if pd:
                ts = f"{{pd.Timestamp(i.{k} or 0).value}}"
            else:
                ts = f"{{dt_to_int(str_to_dt(i.{k}))}}"
        elif t is InfluxType.TIMEDT:
            if pd:
                ts = f"{{pd.Timestamp(i.{k} or 0).value}}"
            else:
                ts = f"{{dt_to_int(i.{k})}}"
        elif t is InfluxType.TAG:
            tags.append(f"{k}={{str(i.{k}).translate(tag_escape)}}")
        elif t is InfluxType.TAGENUM:
            # Enum-like values are written by ``name``; anything else as-is
            tags.append(f"{k}={{getattr(i.{k}, 'name', i.{k} or None)}}")
        elif t in (InfluxType.FLOAT, InfluxType.BOOL):
            fields.append(f"{k}={{i.{k}}}")
        elif t is InfluxType.INT:
            # Trailing ``i`` marks the value as an integer
            fields.append(f"{k}={{i.{k}}}i")
        elif t is InfluxType.PLACEHOLDER:
            fields.append(f"{k}=true")
        elif t is InfluxType.STR:
            # The escaped quotes survive into the generated f-string source
            fields.append(f"{k}=\\\"{{str(i.{k}).translate(str_escape)}}\\\"")
        elif t is InfluxType.ENUM:
            fields.append(f"{k}=\\\"{{getattr(i.{k}, 'name', i.{k} or None)}}\\\"")
        else:
            raise TypeError(f"Unknown type: {t!r}")
    extra_tags = extra_tags or {}
    for k, v in extra_tags.items():
        tags.append(f"{k}={v}")

    # Layout: <measurement>[,<tags>] <fields>[ <timestamp>]
    sep = ',' if tags else ''
    ts = f' {ts}' if ts else ''
    fmt = f"{meas}{sep}{','.join(tags)} {','.join(fields)}{ts}"
    if rm_none:
        # Has substantial runtime impact. Best avoided if performance is critical.
        # First field can't be removed.
        pat = r',\w+="?None"?i?'
        f = eval('lambda i: re.sub(r\'{}\', "", f"{}").encode()'.format(pat, fmt))
    else:
        f = eval('lambda i: f"{}".encode()'.format(fmt))
    f.__doc__ = DataPoint.to_lineprotocol.__doc__
    return f
def datapoint(schema=None, name="DataPoint", *, rm_none=False, fill_none=False, extra_tags=None):
    """Dynamic datapoint class factory

    Can be used as a decorator (similar to Python 3.7 :py:mod:`dataclasses`)
    or as a function (similar to :py:func:`~collections.namedtuple`, but mutable).

    Main characteristics:

    - Supports accessing field values by attribute or subscription
    - Support dict-like iteration via ``items`` method
    - Built-in serialization to InfluxDB line protocol through the ``to_lineprotocol`` method.
    - About 2-3x faster serialization than the ``serialization.mapping`` module.

        - Difference gets smaller (1x-1.5x) when ``rm_none=True`` or when the number of
          fields/tags is very large (20+).

    :param schema: Dictionary-based (functional namedtuple style)
        or @dataclass decorator-based (dataclass style) measurement schema
    :param name: Class name (used when passing schema dictionaries only)
    :param rm_none: Whether to apply a regex to remove ``None`` values from the
        serialized output.
        If ``False``, passing ``None`` values to boolean, integer or float or time fields
        will result in write errors. Setting to ``True`` is "safer" but impacts performance.
    :param fill_none: Whether or not to set missing fields to ``None``.
        Likely best used together with ``rm_none=True``.
    :param extra_tags: Hard coded tags to be added to every point generated.
    :return: The generated :class:`DataPoint` subclass when ``schema`` is given,
        otherwise a decorator that builds one from the class it is applied to.
    """
    def _datapoint(schema):
        cls_name = getattr(schema, "__name__", name)
        # Only an annotated class carries a docstring of its own. A plain dict
        # would otherwise contribute ``dict.__doc__`` and the default would never
        # be used.
        has_annotations = hasattr(schema, "__annotations__")
        docstring = (schema.__doc__ if has_annotations else None) or DataPoint.__doc__
        schema = getattr(schema, "__annotations__", schema)
        # Order entries by type: measurement, timestamp, tags, then fields
        schema = {k: schema[k] for k in sorted(schema, key=lambda x: schema[x].value)}

        # Sanity check
        c = Counter(schema.values())
        assert c[InfluxType.MEASUREMENT] <= 1, "at most one measurement entry is allowed"
        assert sum([c[e] for e in InfluxType if 0 < e.value < 20]) <= 1, \
            "at most one timestamp entry is allowed"  # 0 or 1 timestamp
        assert sum([c[e] for e in InfluxType if e.value >= 25]) > 0, \
            "at least one field entry is required"  # 1 or more fields

        # Generate __init__
        if fill_none:
            args = ', '.join([f"{k}=None" for k in schema])
        else:
            args = ', '.join(schema)
        # Execute into an explicit namespace: since Python 3.13 names created by
        # ``exec`` inside a function are no longer visible through ``locals()``.
        namespace = {}
        exec(
            f"def __init__(self, {args}):\n"
            + "\n".join([f'    self.{k} = {k}' for k in schema]),
            globals(),
            namespace,
        )

        def __repr__(self):
            items = [f'{k}={repr(v)}' for k, v in self.items()]
            return f'{cls_name}({", ".join(items)})'

        def items(self):
            # Schema values first, followed by the hard coded extra tags
            for k in self._schema:
                yield k, getattr(self, k)
            yield from self._extra_tags.items()

        cls_attrs = {
            '_schema': schema,
            '_opts': (rm_none, fill_none),
            '_extra_tags': extra_tags or {},
            '__slots__': tuple(schema),
            '__init__': namespace['__init__'],
            '__repr__': __repr__,
            '__getitem__': lambda self, item: getattr(self, item),
            '__len__': lambda self: len(self._schema),
            '__iter__': lambda self: iter(self._schema),
            '__eq__': lambda self, other: all(self[k] == other[k] for k in self),
            '__doc__': docstring,
            'items': items,
            'to_dict': lambda self: dict(self.items()),
            'to_lineprotocol': _make_serializer(schema, cls_name, rm_none, extra_tags)
        }
        return type(cls_name, (DataPoint,), cls_attrs)

    # Called without a schema (``@datapoint(...)``): hand back the decorator
    return _datapoint(schema) if schema else _datapoint