import re
import time
import warnings
from collections import defaultdict
from functools import reduce
from itertools import chain
from typing import Mapping, Union, Dict
from . import pd, np
if pd is None:
import ciso8601
# Special characters documentation:
# https://docs.influxdata.com/influxdb/v1.4/write_protocols/line_protocol_reference/#special-characters
# Although not in the official docs, new line characters are removed in order to avoid issues.
# Translation tables (str.maketrans) applied with str.translate: one C-level
# pass per string instead of chained .replace() calls.
# Tag/field keys: backslash, comma, space and '=' must be escaped.
key_escape = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '=': r'\=', '\n': ''})
# Tag values: currently identical to key_escape; kept separate so the two can
# diverge without touching call sites.
tag_escape = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '=': r'\=', '\n': ''})
# String field values are double-quoted, so only backslash and '"' need escaping.
str_escape = str.maketrans({'\\': '\\\\', '"': r'\"', '\n': ''})
# Measurement names: like keys, but '=' is allowed unescaped.
measurement_escape = str.maketrans({'\\': '\\\\', ',': r'\,', ' ': r'\ ', '\n': ''})
def escape(string, escape_pattern):
    """Escape line-protocol special characters in *string*.

    Args:
        string: The value to escape; non-string input is converted with
            ``str()`` (a warning is emitted in that case).
        escape_pattern: A ``str.maketrans`` table selecting which
            characters to escape.

    Returns:
        The escaped string.
    """
    try:
        return string.translate(escape_pattern)
    except AttributeError:
        warnings.warn("Non-string-like data passed. "
                      "Attempting to convert to 'str'.")
        # Bug fix: the fallback previously translated with the module-level
        # ``tag_escape`` table regardless of the table the caller asked for.
        return str(string).translate(escape_pattern)
def parse_data(data, measurement=None, tag_columns=None, **extra_tags):
"""Converts input data into line protocol format"""
if isinstance(data, bytes):
return data
elif isinstance(data, str):
return data.encode('utf-8')
elif pd is not None and isinstance(data, pd.DataFrame):
if measurement is None:
raise ValueError("Missing 'measurement'")
return parse_df(data, measurement, tag_columns, **extra_tags)
elif isinstance(data, dict):
return make_line(data, measurement, **extra_tags).encode('utf-8')
elif hasattr(data, '__iter__'):
return b'\n'.join([parse_data(i, measurement, tag_columns, **extra_tags) for i in data])
else:
raise ValueError('Invalid input', data)
def make_line(point: Mapping, measurement=None, **extra_tags) -> str:
    """Converts dictionary-like data into a single line protocol line (point)"""
    # Tags are parsed first to preserve the original evaluation order
    # (tag escaping may emit warnings before a measurement error is raised).
    tag_str = _parse_tags(point, extra_tags)
    prefix = _parse_measurement(point, measurement)
    if tag_str:
        prefix = f'{prefix},{tag_str}'
    return f'{prefix} {_parse_fields(point)} {_parse_timestamp(point)}'
def _parse_measurement(point, measurement):
    """Return the escaped measurement name for *point*.

    The point's own 'measurement' key wins over the *measurement*
    argument; if neither is available a ValueError is raised.
    """
    if 'measurement' in point:
        return escape(point['measurement'], measurement_escape)
    if measurement is None:
        raise ValueError("'measurement' missing")
    return escape(measurement, measurement_escape)
def _parse_tags(point, extra_tags):
    """Return the comma-joined, escaped ``key=value`` tag pairs for *point*.

    Args:
        point: Mapping that may carry a 'tags' sub-mapping.
        extra_tags: Additional tags; these override the point's own tags
            on key collision.

    Returns:
        A ``'k=v,k2=v2'`` string, or ``''`` when there are no tags.
    """
    # Bug fix: the original wrapped the whole loop in ``except KeyError: pass``
    # to tolerate a missing 'tags' key, which (a) silently dropped extra_tags
    # whenever the point had no 'tags', and (b) swallowed unrelated KeyErrors.
    combined = {**point.get('tags', {}), **extra_tags}
    output = []
    for k, v in combined.items():
        k = escape(k, key_escape)
        v = escape(v, tag_escape)
        if not v:
            continue  # ignore blank/null string tags
        output.append(f'{k}={v}')
    return ','.join(output)
def _parse_timestamp(point):
    """Return the point's timestamp as integer nanoseconds since the epoch.

    Returns an empty string when the point has no 'time' key, which lets
    InfluxDB assign a server-side timestamp instead.
    """
    if 'time' not in point:
        return ''
    dt = point['time']
    if pd is not None:
        # Fast path: pd.Timestamp accepts str/datetime/np.datetime64 and
        # exposes the value directly in nanoseconds.
        return pd.Timestamp(dt).value
    # Without pandas, strings are parsed with ciso8601 (imported at module
    # top when pd is None); anything else is assumed datetime-like.
    if isinstance(dt, (str, bytes)):
        dt = ciso8601.parse_datetime(dt)
        if not dt:
            raise ValueError('Invalid datetime string')
    if not dt.tzinfo:
        # Assume tz-naive input to be in UTC, not local time.
        # NOTE(review): time.timezone is the non-DST offset; naive datetimes
        # falling in DST may be shifted by an hour — confirm expected inputs.
        return int(dt.timestamp() - time.timezone) * 10 ** 9 + dt.microsecond * 1000
    # int(...) truncates sub-second precision; microseconds are re-added below.
    return int(dt.timestamp()) * 10 ** 9 + dt.microsecond * 1000
def _parse_fields(point):
    """Field values can be floats, integers, strings, or Booleans."""
    parts = []
    for key, value in point['fields'].items():
        key = escape(key, key_escape)
        if value is None:
            # Null values are simply omitted from the line.
            continue
        # bool must be tested before int (bool is an int subclass).
        if isinstance(value, bool):
            parts.append(f'{key}={value}')
        elif isinstance(value, int):
            # Integer fields carry an 'i' suffix in line protocol.
            parts.append(f'{key}={value}i')
        elif isinstance(value, str):
            parts.append(f'{key}="{value.translate(str_escape)}"')
        else:
            # Floats (and anything else that formats like a number).
            parts.append(f'{key}={value}')
    return ','.join(parts)
# Return type of make_df: a single DataFrame or a dict of DataFrames keyed by
# (name, tags). Collapses to None when pandas is unavailable.
DataFrameType = None if pd is None else Union[bool, pd.DataFrame, Dict[str, pd.DataFrame]]
def make_df(resp) -> DataFrameType:
    """Makes a dictionary of DataFrames from a response object.

    Fix: removed the stray ``[docs]`` prefix (a docs-site copy artifact)
    that made this line a syntax error.

    Args:
        resp: Decoded InfluxDB JSON response (``{'results': [...]}``).

    Returns:
        A single DataFrame when the response holds one (name, tags) group,
        otherwise a dict mapping (name, tags) keys to DataFrames.
    """
    def maker(series) -> pd.DataFrame:
        """Build one UTC-indexed DataFrame from a single series dict."""
        df = pd.DataFrame(series.get('values', []), columns=series['columns'])
        if 'time' not in df.columns:
            return df
        df: pd.DataFrame = df.set_index(pd.to_datetime(df['time'])).drop('time', axis=1)
        df.index = df.index.tz_localize('UTC')
        df.index.name = None
        if 'tags' in series:
            # Broadcast each tag as a constant column.
            for k, v in series['tags'].items():
                df[k] = v
        if 'name' in series:
            df.name = series['name']
        return df

    def drop_zero_index(df):
        # Aggregation responses can return an all-zero epoch index;
        # replace it with a plain RangeIndex in that case.
        if isinstance(df.index, pd.DatetimeIndex):
            if all(i.value == 0 for i in df.index):
                df.reset_index(drop=True, inplace=True)

    # Parsing: one (key, DataFrame) pair per series in the response.
    df_list = [((series.get('name'), tuple(series.get('tags', {}).items())), maker(series))
               for statement in resp['results'] if 'series' in statement
               for series in statement['series']]
    # Concatenation: group frames that share the same (name, tags) key.
    d = defaultdict(list)
    for k, df in sorted(df_list, key=lambda x: x[0]):
        d[k].append(df)
    dfs = {k: pd.concat(v, axis=0) for k, v in d.items()}
    # Post-processing
    for df in dfs.values():
        drop_zero_index(df)
    # Return a bare DataFrame when there is only one group.
    if len(dfs) == 1:
        return dfs[list(dfs.keys())[0]]
    return dfs
def itertuples(df):
    """Custom implementation of ``DataFrame.itertuples`` that
    returns plain tuples instead of namedtuples. About 50% faster.
    """
    # Zip the index together with every column, yielding one plain
    # tuple per row: (index, col0, col1, ...).
    columns = (df.iloc[:, pos] for pos in range(len(df.columns)))
    return zip(df.index, *columns)
def make_replacements(df):
    """Build regex (pattern, replacement) pairs that strip NaN fields
    from already-formatted line protocol strings.

    Object-dtype columns render NaN as ``col="nan"``; all other columns
    render it as ``col=nan``. The trailing pairs clean up the commas and
    spaces left behind by the removals.
    """
    dtypes = dict(df.dtypes)
    object_cols = {name for name, dtype in dtypes.items() if dtype is np.dtype('O')}
    numeric_cols = set(df.columns) - object_cols
    quoted = (f'{name}="nan"' for name in object_cols)
    bare = (f'{name}=nan' for name in numeric_cols)
    return [
        ('|'.join(chain(quoted, bare)), ''),
        (',{2,}', ','),
        ('|'.join([', ,', ', ', ' ,']), ' '),
    ]
def parse_df(df, measurement, tag_columns=None, **extra_tags):
    """Converts a Pandas DataFrame into line protocol format.

    Fixes: removed the stray ``[docs]`` prefix (docs-site copy artifact,
    a syntax error) and replaced ``np.float`` — removed in NumPy 1.20+
    (AttributeError since 1.24) — with ``np.floating``.

    Args:
        df: DataFrame with a DatetimeIndex.
        measurement: Measurement name for every point.
        tag_columns: Column names to serialize as tags; categorical
            columns are added automatically.
        extra_tags: Constant tags appended to every point.

    Returns:
        UTF-8 encoded line protocol bytes.

    Raises:
        ValueError: If the DataFrame index is not a DatetimeIndex.
    """
    # Pre-processing
    if not isinstance(df.index, pd.DatetimeIndex):
        raise ValueError('DataFrame index is not DatetimeIndex')
    tag_columns = set(tag_columns or [])
    tag_columns.update(k for k, v in dict(df.dtypes).items()
                       if isinstance(v, pd.api.types.CategoricalDtype))
    # Rows with any NaN need regex post-processing (see make_replacements).
    isnull = df.isnull().any(axis=1)
    # Make parser function: build one f-string template over the row tuple
    # p, where p[0] is the timestamp and p[i+1] the i-th column value.
    tags = []
    fields = []
    for k, v in extra_tags.items():
        tags.append(f"{k}={escape(v, key_escape)}")
    for i, (k, v) in enumerate(df.dtypes.items()):
        k = k.translate(key_escape)
        if k in tag_columns:
            tags.append(f"{k}={{p[{i+1}]}}")
        elif issubclass(v.type, np.integer):
            fields.append(f"{k}={{p[{i+1}]}}i")
        elif issubclass(v.type, (np.floating, np.bool_)):
            fields.append(f"{k}={{p[{i+1}]}}")
        else:
            # String escaping is skipped for performance reasons
            # Strings containing double-quotes can cause strange write errors
            # and should be sanitized by the user.
            # e.g., df[k] = df[k].astype('str').str.translate(str_escape)
            fields.append(f"{k}=\"{{p[{i+1}]}}\"")
    fmt = (f'{measurement}', f'{"," if tags else ""}', ','.join(tags),
           ' ', ','.join(fields), ' {p[0].value}')
    # NOTE(review): eval() compiles the template into a fast row formatter.
    # Inputs are column names/measurement from the caller, not external data,
    # but malicious column names could inject code — sanitize upstream.
    f = eval("lambda p: f'{}'".format(''.join(fmt)))
    # Map/concat
    if isnull.any():
        lp = map(f, itertuples(df[~isnull]))
        rep = make_replacements(df)
        # Apply every (pattern, replacement) pair in turn to NaN rows.
        lp_nan = (reduce(lambda a, b: re.sub(*b, a), rep, f(p))
                  for p in itertuples(df[isnull]))
        return '\n'.join(chain(lp, lp_nan)).encode('utf-8')
    else:
        return '\n'.join(map(f, itertuples(df))).encode('utf-8')