Source code for aioinflux.serialization.mapping

import time
from typing import Mapping

import ciso8601

from .common import *


[docs]def serialize(point: Mapping, measurement=None, **extra_tags) -> bytes: """Converts dictionary-like data into a single line protocol line (point)""" tags = _serialize_tags(point, extra_tags) return ( f'{_serialize_measurement(point, measurement)}' f'{"," if tags else ""}{tags} ' f'{_serialize_fields(point)} ' f'{_serialize_timestamp(point)}' ).encode()
def _serialize_measurement(point, measurement): try: return escape(point['measurement'], measurement_escape) except KeyError: if measurement is None: raise ValueError("'measurement' missing") return escape(measurement, measurement_escape) def _serialize_tags(point, extra_tags): output = [] for k, v in {**point.get('tags', {}), **extra_tags}.items(): k = escape(k, key_escape) v = escape(v, tag_escape) if not v: continue # ignore blank/null string tags output.append(f'{k}={v}') return ','.join(output) def _serialize_timestamp(point): dt = point.get('time') if not dt: return '' elif isinstance(dt, int): return dt elif isinstance(dt, (str, bytes)): dt = ciso8601.parse_datetime(dt) if not dt: raise ValueError(f'Invalid datetime string: {dt!r}') if not dt.tzinfo: # Assume tz-naive input to be in UTC, not local time return int(dt.timestamp() - time.timezone) * 10 ** 9 + dt.microsecond * 1000 return int(dt.timestamp()) * 10 ** 9 + dt.microsecond * 1000 def _serialize_fields(point): """Field values can be floats, integers, strings, or Booleans.""" output = [] for k, v in point['fields'].items(): k = escape(k, key_escape) if isinstance(v, bool): output.append(f'{k}={v}') elif isinstance(v, int): output.append(f'{k}={v}i') elif isinstance(v, str): output.append(f'{k}="{v.translate(str_escape)}"') elif v is None: # Empty values continue else: # Floats output.append(f'{k}={v}') return ','.join(output)