Source code for aioinflux.iterutils

from typing import Optional, Iterator, Callable


[docs]class InfluxDBResult: """Wrapper around an InfluxDB response""" __slots__ = ('_data', 'parser', 'query') def __init__(self, data, parser=None, query=None): self._data = data self.parser = parser self.query = query @property def data(self): return self._data @property def series_count(self): return len(self._count()) def __len__(self): """Returns number of total points in data""" return sum(self._count()) def __repr__(self): q = self.query[:80] + '...' if len(self.query) > 80 else self.query return f'<{type(self).__name__} [q="{q}"]>' def __iter__(self): return iterpoints(self.data, parser=self.parser) def show(self): return list(self) def _count(self): return [len(series['values']) for statement in self._data['results'] if 'series' in statement for series in statement['series']]
[docs]class InfluxDBChunkedResult: """Wrapper around an InfluxDB chunked response""" __slots__ = ('_gen', 'parser', 'query') def __init__(self, gen, parser=None, query=None): self._gen = gen self.parser = parser self.query = query @property def gen(self): return self._gen def __repr__(self): q = self.query[:80] + '...' if len(self.query) > 80 else self.query return f'<{type(self).__name__} [q="{q}"]>' def __aiter__(self): return self.iterpoints() async def iterpoints(self): async for chunk in self._gen: for i in iterpoints(chunk, parser=self.parser): yield i async def iterchunks(self, wrap=False): async for chunk in self._gen: if wrap: yield InfluxDBResult(chunk, parser=self.parser, query=self.query) else: yield chunk
[docs]def iterpoints(resp: dict, parser: Optional[Callable] = None) -> Iterator: """Iterates a response JSON yielding data point by point. Can be used with both regular and chunked responses. By default, returns just a plain list of values representing each point, without column names, or other metadata. In case a specific format is needed, an optional ``parser`` argument can be passed. ``parser`` is a function that takes a list of values for each data point and a metadata dictionary containing all or a subset of the following: ``{'columns', 'name', 'tags', 'statement_id'}``. Sample parser function: .. code:: python def parser(x, meta): return dict(zip(meta['columns'], x)) :param resp: Dictionary containing parsed JSON (output from InfluxDBClient.query) :param parser: Optional parser function :return: Generator object """ for statement in resp['results']: if 'series' not in statement: continue for series in statement['series']: meta = {k: series[k] for k in series if k != 'values'} meta['statement_id'] = statement['statement_id'] if parser is None: return (x for x in series['values']) else: return (parser(x, meta) for x in series['values']) return iter([])