Source code for aioinflux.serialization.usertype

import enum
import ciso8601
import time
# noinspection PyUnresolvedReferences
import re  # noqa
from collections import Counter
from typing import TypeVar
from datetime import datetime

# noinspection PyUnresolvedReferences
from .common import *  # noqa
from ..compat import pd

__all__ = [
    'lineprotocol', 'SchemaError',
    'MEASUREMENT', 'TIMEINT', 'TIMESTR', 'TIMEDT',
    'TAG', 'TAGENUM',
    'BOOL', 'INT', 'FLOAT', 'STR', 'ENUM',
]

MEASUREMENT = TypeVar('MEASUREMENT', bound=str)
TIMEINT = TypeVar('TIMEINT', bound=int)
TIMESTR = TypeVar('TIMESTR', bound=str)
TIMEDT = TypeVar('TIMEDT', bound=datetime)
TAG = TypeVar('TAG', bound=str)
TAGENUM = TypeVar('TAGENUM', enum.Enum, str)
BOOL = TypeVar('BOOL', bound=bool)
INT = TypeVar('INT', bound=int)
FLOAT = TypeVar('FLOAT', bound=float)
STR = TypeVar('STR', bound=str)
ENUM = TypeVar('ENUM', enum.Enum, str)

time_types = [TIMEINT, TIMEDT, TIMESTR]
field_types = [BOOL, INT, FLOAT, STR, ENUM]


[docs]class SchemaError(TypeError): """Raised when invalid schema is passed to :func:`lineprotocol`"""
def str_to_dt(s): dt = ciso8601.parse_datetime(s) if dt: return dt raise ValueError(f'Invalid datetime string: {dt!r}') def dt_to_int(dt): if not dt.tzinfo: # Assume tz-naive input to be in UTC, not local time return int(dt.timestamp() - time.timezone) * 10 ** 9 + dt.microsecond * 1000 return int(dt.timestamp()) * 10 ** 9 + dt.microsecond * 1000 def _validate_schema(schema, placeholder): c = Counter(schema.values()) if not c: raise SchemaError("Schema/type annotations missing") if c[MEASUREMENT] > 1: raise SchemaError("Class can't have more than one 'MEASUREMENT' attribute") if sum(c[e] for e in time_types) > 1: raise SchemaError(f"Can't have more than one timestamp-type attribute {time_types}") if sum(c[e] for e in field_types) < 1 and not placeholder: raise SchemaError(f"Must have one or more field-type attributes {field_types}") def _make_serializer(meas, schema, rm_none, extra_tags, placeholder): """Factory of line protocol parsers""" _validate_schema(schema, placeholder) tags = [] fields = [] ts = None meas = meas for k, t in schema.items(): if t is MEASUREMENT: meas = f"{{i.{k}}}" elif t is TIMEINT: ts = f"{{i.{k}}}" elif t is TIMESTR: if pd: ts = f"{{pd.Timestamp(i.{k} or 0).value}}" else: ts = f"{{dt_to_int(str_to_dt(i.{k}))}}" elif t is TIMEDT: if pd: ts = f"{{pd.Timestamp(i.{k} or 0).value}}" else: ts = f"{{dt_to_int(i.{k})}}" elif t is TAG: tags.append(f"{k}={{str(i.{k}).translate(tag_escape)}}") elif t is TAGENUM: tags.append(f"{k}={{getattr(i.{k}, 'name', i.{k} or None)}}") elif t in (FLOAT, BOOL): fields.append(f"{k}={{i.{k}}}") elif t is INT: fields.append(f"{k}={{i.{k}}}i") elif t is STR: fields.append(f"{k}=\\\"{{str(i.{k}).translate(str_escape)}}\\\"") elif t is ENUM: fields.append(f"{k}=\\\"{{getattr(i.{k}, 'name', i.{k} or None)}}\\\"") else: raise SchemaError(f"Invalid attribute type {k!r}: {t!r}") extra_tags = extra_tags or {} for k, v in extra_tags.items(): tags.append(f"{k}={v}") if placeholder: fields.insert(0, f"_=true") sep = ',' if tags else '' ts = f' {ts}' if ts else '' fmt = f"{meas}{sep}{','.join(tags)} {','.join(fields)}{ts}" if rm_none: # Has substantial runtime impact. Best avoided if performance is critical. # First field can't be removed. pat = r',\w+="?None"?i?' f = eval('lambda i: re.sub(r\'{}\', "", f"{}").encode()'.format(pat, fmt)) else: f = eval('lambda i: f"{}".encode()'.format(fmt)) f.__doc__ = "Returns InfluxDB line protocol representation of user-defined class" f._args = dict(meas=meas, schema=schema, rm_none=rm_none, extra_tags=extra_tags, placeholder=placeholder) return f
[docs]def lineprotocol(cls=None, *, schema=None, rm_none=False, extra_tags=None, placeholder=False): """Adds ``to_lineprotocol`` method to arbitrary user-defined classes :param schema: Schema dictionary (attr/type pairs). :param rm_none: Whether apply a regex to remove ``None`` values. If ``False``, passing ``None`` values to boolean, integer or float or time fields will result in write errors. Setting to ``True`` is "safer" but impacts performance. :param extra_tags: Hard coded tags to be added to every point generated. :param placeholder: If no field attributes are present, add a placeholder attribute (``_``) which is always equal to ``True``. This is a workaround for creating field-less points (which is not supported natively by InfluxDB) """ def _lineprotocol(cls): _schema = schema or getattr(cls, '__annotations__', {}) f = _make_serializer(cls.__name__, _schema, rm_none, extra_tags, placeholder) cls.to_lineprotocol = f return cls return _lineprotocol(cls) if cls else _lineprotocol