Source code for aioinflux.serialization.usertype

import enum
import ciso8601
import time
import decimal
import typing
from collections import Counter
from typing import TypeVar, Optional, Mapping, Union
from datetime import datetime

# noinspection PyUnresolvedReferences
from .common import *  # noqa
from ..compat import pd

__all__ = [
    'lineprotocol', 'SchemaError',
    'MEASUREMENT', 'TIMEINT', 'TIMESTR', 'TIMEDT',
    'TAG', 'TAGENUM',
    'BOOL', 'INT', 'DECIMAL', 'FLOAT', 'STR', 'ENUM',
]

MEASUREMENT = TypeVar('MEASUREMENT', bound=str)
TIMEINT = TypeVar('TIMEINT', bound=int)
TIMESTR = TypeVar('TIMESTR', bound=str)
TIMEDT = TypeVar('TIMEDT', bound=datetime)
TAG = TypeVar('TAG', bound=str)
TAGENUM = TypeVar('TAGENUM', bound=enum.Enum)
BOOL = TypeVar('BOOL', bound=bool)
INT = TypeVar('INT', bound=int)
DECIMAL = TypeVar('DECIMAL', bound=decimal.Decimal)
FLOAT = TypeVar('FLOAT', bound=float)
STR = TypeVar('STR', bound=str)
ENUM = TypeVar('ENUM', bound=enum.Enum)

time_types = [TIMEINT, TIMEDT, TIMESTR]
tag_types = [TAG, TAGENUM]
field_types = [BOOL, INT, DECIMAL, FLOAT, STR, ENUM]
optional_field_types = [Optional[f] for f in field_types]


[docs]class SchemaError(TypeError): """Raised when invalid schema is passed to :func:`lineprotocol`"""
def str_to_dt(s): dt = ciso8601.parse_datetime(s) if dt: return dt raise ValueError(f'Invalid datetime string: {dt!r}') def dt_to_int(dt): if not dt.tzinfo: # Assume tz-naive input to be in UTC, not local time return int(dt.timestamp() - time.timezone) * 10 ** 9 + dt.microsecond * 1000 return int(dt.timestamp()) * 10 ** 9 + dt.microsecond * 1000 def _validate_schema(schema, placeholder): c = Counter(schema.values()) if not c: raise SchemaError("Schema/type annotations missing") if c[MEASUREMENT] > 1: raise SchemaError("Class can't have more than one 'MEASUREMENT' attribute") if sum(c[e] for e in time_types) > 1: raise SchemaError(f"Can't have more than one timestamp-type attribute {time_types}") if sum(c[e] for e in field_types + optional_field_types) < 1 and not placeholder: raise SchemaError(f"Must have one or more non-empty " f"field-type attributes {field_types}") def is_optional(t, base_type): """Checks if type hint is Optional[base_type]""" # NOTE: The 'typing' module is still "provisional" and documentation sub-optimal, # which requires these kinds instrospection into undocumented implementation details # NOTE: May break in Python 3.8 # TODO: Check if works on Python 3.6 try: cond1 = getattr(t, '__origin__') is Union cond2 = {type(None), base_type} == set(getattr(t, '__args__', [])) if cond1 and cond2: return True except AttributeError: return False return False def _make_serializer(meas, schema, extra_tags, placeholder): # noqa: C901 """Factory of line protocol parsers""" _validate_schema(schema, placeholder) tags = [] fields = [] ts = None meas = meas for k, t in schema.items(): if t is MEASUREMENT: meas = f"{{i.{k}}}" elif t is TIMEINT: ts = f"{{i.{k}}}" elif t is TIMESTR: if pd: ts = f"{{pd.Timestamp(i.{k} or 0).value}}" else: ts = f"{{dt_to_int(str_to_dt(i.{k}))}}" elif t is TIMEDT: if pd: ts = f"{{pd.Timestamp(i.{k} or 0).value}}" else: ts = f"{{dt_to_int(i.{k})}}" elif t is TAG or is_optional(t, TAG): tags.append(f"{k}={{str(i.{k}).translate(tag_escape)}}") elif t is TAGENUM or is_optional(t, TAGENUM): tags.append(f"{k}={{getattr(i.{k}, 'name', i.{k} or None)}}") elif t is FLOAT or is_optional(t, FLOAT): fields.append(f"{k}={{i.{k}}}") elif t is DECIMAL or is_optional(t, DECIMAL): fields.append(f"{k}={{i.{k}}}") elif t is BOOL or is_optional(t, BOOL): fields.append(f"{k}={{i.{k}}}") elif t is INT or is_optional(t, INT): fields.append(f"{k}={{i.{k}}}i") elif t is STR or is_optional(t, STR): fields.append(f"{k}=\\\"{{str(i.{k}).translate(str_escape)}}\\\"") elif t is ENUM or is_optional(t, ENUM): fields.append(f"{k}=\\\"{{getattr(i.{k}, 'name', i.{k} or None)}}\\\"") else: raise SchemaError(f"Invalid attribute type {k!r}: {t!r}") extra_tags = extra_tags or {} for k, v in extra_tags.items(): tags.append(f"{k}={v.translate(tag_escape)}") if placeholder: fields.insert(0, "_=true") sep = ',' if tags else '' ts = f' {ts}' if ts else '' fmt = f"{meas}{sep}{','.join(tags)} {','.join(fields)}{ts}" f = eval(f'lambda i: f"{fmt}".encode()') f.__doc__ = "Returns InfluxDB line protocol representation of user-defined class" return f
[docs]def lineprotocol( cls=None, *, schema: Optional[Mapping[str, type]] = None, rm_none: bool = False, extra_tags: Optional[Mapping[str, str]] = None, placeholder: bool = False ): """Adds ``to_lineprotocol`` method to arbitrary user-defined classes :param cls: Class to monkey-patch :param schema: Schema dictionary (attr/type pairs). :param rm_none: Whether apply a regex to remove ``None`` values. If ``False``, passing ``None`` values to boolean, integer or float or time fields will result in write errors. Setting to ``True`` is "safer" but impacts performance. :param extra_tags: Hard coded tags to be added to every point generated. :param placeholder: If no field attributes are present, add a placeholder attribute (``_``) which is always equal to ``True``. This is a workaround for creating field-less points (which is not supported natively by InfluxDB) """ opts = dict( schema=schema, rm_none=rm_none, extra_tags=extra_tags or {}, placeholder=placeholder, ) def _lineprotocol(cls): _schema = schema or typing.get_type_hints(cls) # TODO: Raise warning or exception if schema has optionals but rm_none is False # for t in _schema.values(): # for bt in field_types + tag_types: # if is_optional(t, bt): # warnings.warn("") f = _make_serializer(cls.__name__, _schema, extra_tags, placeholder) cls.to_lineprotocol = f cls.to_lineprotocol.opts = opts return cls def _rm_none_lineprotocol(cls): def _parser_selector(i): if not hasattr(i, '_asdict'): raise ValueError("'rm_none' can only be used with namedtuples") key = tuple([k for k, v in i._asdict().items() if v != '' and v is not None]) if key not in parsers: _schema = schema or typing.get_type_hints(cls) or {} _schema = {k: v for k, v in _schema.items() if k in key} parsers[key] = _make_serializer(cls.__name__, _schema, extra_tags, placeholder) return parsers[key](i) parsers = {} cls.to_lineprotocol = _parser_selector cls.to_lineprotocol.opts = opts return cls if cls: if rm_none: # Using rm_none has substantial runtime impact. # Best avoided if performance is critical. return _rm_none_lineprotocol(cls) # No options return _lineprotocol(cls) else: if rm_none: return _rm_none_lineprotocol return _lineprotocol