TransWikia.com

Python REST API and Mongo - Aggregation Pipeline/Stage classes

Code Review Asked by Rfroes87 on November 8, 2021

This has been my approach for "simplifying" MongoDB aggregation queries in a pythonic syntax – was my intent, at the very least:

from datetime import datetime, timedelta

class MatchStage(list):
    def _get_day_range(self):
        current_time = datetime.now()
        return (current_time - timedelta(days=1)), current_time
    def diferent(self, **kwargs):
        for field, value in kwargs.items():
            self.append({field: {"$ne": value}})
    def equals(self, **kwargs):
        for field, value in kwargs.items():
            self.append({field: {"$eq": value}})
    def set_interval(self, field, data):
        self.starttime, self.endtime = map(datetime.fromtimestamp, ( int(data["starttime"]), int(data["endtime"]) )) 
            if {"starttime", "endtime"} <= data.keys() else self._get_day_range()
        self.append({
            field: {
                "$gte": self.starttime,
                "$lt": self.endtime
            }
        })
    def set_devices(self, devices):
        self.devices = devices
        self.append({"device": {"$in": [device.id for device in devices]}})

class SortStage(dict):
    @staticmethod
    def ascending_order(*fields):
        return {"$sort": {field: 1 for field in fields} }
    @staticmethod
    def descending_order(*fields):
        return {"$sort": {field: -1 for field in fields} }

class ReRootStage(dict):
    @staticmethod
    def reset_root(*args):
        return {"$replaceRoot": {"newRoot": {"$mergeObjects": [{key: '$' + key for key in args}, "$_id"]}}}

class GroupStage(dict):
    def sum_metrics(self, **kwargs):
        self.update({k: {"$sum": v if isinstance(v, int) else {"$toLong": v if v.startswith('$') else '$' + v}} for k, v in kwargs.items()})
    def group_fields(self, *fields):
        self["_id"].update({field: '$' + field for field in fields})
    def _nest_field(self, key, field):
        self[key] = {"$push": field}
    def nest_field(self, key, field):
        if not field.startswith('$'): field = '$' + field
        self._nest_field(key, field)
    def nest_fields(self, *fields, key, **kfields):
        if not fields and not kfields: raise Exception("No field specified")
        self._nest_field(key, {field: '$' + field for field in fields} or {field: '$' + value for field, value in kfields.items()})
    
class BasePipeline(list):
    _match = MatchStage
    _group = GroupStage
    _sort = SortStage
    _reroot = ReRootStage
    def __init__(self, data, devices):
        self._set_match().set_interval("time", data)
        if "devid" in data:
            devices = devices.filter(id__in=data["devid"].split(','))
            assert devices.exists(), "Device not found"
        self.match.set_devices(devices)
    def _set_group(self):
        self.group = self._group()
        self.append({"$group": self.group})
        return self.group
    def _set_match(self):
        self.match = self._match()
        self.append({"$match": {"$and": self.match}})
        return self.match
    def set_simple_group(self, field):
        self._set_group().update({"_id": field if field.startswith('$') else '$' + field})
        self.extend([{"$addFields":{"id": "$_id"}}, {"$project": {"_id": 0}}])
    def set_composite_group(self, *fields, **kfields):
        if not fields and not kfields:
            raise Exception("No fields specified")
        self._set_group().update({"_id": {field: '$' + field for field in fields} or {field: (value if not isinstance(value, str) or value.startswith('$') else '$' + value) for field, value in kfields.items()}})
    def sort_documents(self, *fields, descending=False):
        self.append(self._sort.descending_order(*fields) if descending else self._sort.ascending_order(*fields))

class WebFilterBlockedPipeline(BasePipeline):
    def aggregate_root(self, *args):
        self.append(self._reroot.reset_root(*args))
    def aggregate_data(self, **kfields):
        self.group.sum_metrics(**kfields)
        self.aggregate_root(*kfields)
    @classmethod
    def create(cls, data, devices):
        pipeline = cls(data, devices)
        pipeline.match.equals(action="blocked")
        pipeline.set_composite_group("url", "source_ip", "profile")
        pipeline.aggregate_data(count=1)
        pipeline.sort_documents("count", descending=True)
        pipeline.set_simple_group(field="url")
        pipeline.group.sum_metrics(count="count")
        pipeline.group.nest_field("source_ip", "profile", "count", key="users")
        pipeline.sort_documents("count", descending=True)
        return pipeline

Example output of WebFilterBlockedPipeline.create() in JSON-format:

[{
    "$match": {
        "$and": [{
            "time": {
                "$gte": "2020-07-15T16:04:19",
                "$lt": "2020-07-16T16:04:19"
            }
        },
        {
            "device": {
                "$in": ["FG100ETK18035573"]
            }
        },
        {
            "action": {
                "$eq": "blocked"
            }
        }]
    }
},
{
    "$group": {
        "_id": {
            "url": "$url",
            "source_ip": "$source_ip",
            "profile": "$profile"
        },
        "count": {
            "$sum": 1
        }
    }
},
{
    "$replaceRoot": {
        "newRoot": {
            "$mergeObjects": [{
                "count": "$count"
            },
            "$_id"]
        }
    }
},
{
    "$sort": {
        "count": -1
    }
},
{
    "$group": {
        "_id": "$url",
        "count": {
            "$sum": {
                "$toLong": "$count"
            }
        },
        "users": {
            "$push": {
                "source_ip": "$source_ip",
                "profile": "$profile",
                "count": "$count"
            }
        }
    }
},
{
    "$addFields": {
        "id": "$_id"
    }
},
{
    "$project": {
        "_id": 0
    }
},
{
    "$sort": {
        "count": -1
    }
}]

I’d like to ask if this is an acceptable implementation and, if not, if I should just scratch it completely, follow another design pattern or any constructive criticism.

One Answer

Breathe!

Make use of line spacing to make the code easier for humans to read. Using a formatter on the code expands it from 100 lines to nearly 150. It should be even longer, there are multiple lines which are far too dense to read in one go. Avoid leaving out the new line in code like if not field.startswith('$'): field = '$' + field or if not fields and not kfields: raise Exception("No field specified").


self.starttime, self.endtime = map(datetime.fromtimestamp, (int(data["starttime"]), int(data["endtime"]))) 
        if {"starttime", "endtime"} <= data.keys() else self._get_day_range()

This line is way too packed. I can see how you would get to it, but it needs to be split back up.

Mapping over two values seems a bit overkill. Unless there will be more timestamps I would stick with the simpler code.

Putting the desired keys into a set and using subset (implicitly converting the dictionary keys to a set) is a nice trick. However, it is not a very common pattern, and a quick benchmark says it has worse performance than the more naive "starttime" in data and "endtime" in data.

The simple benchmark in Ipython (with small dictionaries)

def f(data): 
    return {"starttime", "endtime"} <= data.keys()

def g(data): 
    return "starttime" in data and "endtime" in data

data1 = {"starttime": 50, "endtime": 100}                                                                                                                                    
data2 = {"starttime": 50, "end": 100}                                                                                                                                       
data3 = {"endtime": 100, "sus": 50}                                                                                                                                         
data4 = {"start": 50, "end": 50}                                                                                                                                            
data5 = {}

%timeit f(data1)
%timeit g(data1)
%timeit f(data2)
%timeit g(data2)
%timeit f(data3)
%timeit g(data3)
%timeit f(data4)
%timeit g(data4)
%timeit f(data5)
%timeit g(data5)

highlights a clear 3 to 4x win for the simple check.

table of benchmark results for the two potential implementations of 'check multiple keys are in a dictionary'

I would prefer code more in line with

def set_interval(self, field, data):
    if "starttime" in data and "endtime" in data:
        self.starttime = datetime.fromtimestamp(int(data["starttime"]))
        self.endtime = datetime.fromtimestamp(int(data["endtime"]))
    else:
        self.starttime, self.endtime = self._get_day_range()

    self.append({field: {"$gte": self.starttime, "$lt": self.endtime}})

def sum_metrics(self, **kwargs):
    self.update({k: {"$sum": v if isinstance(v, int) else {"$toLong": v if v.startswith('$') else '$' + v}} for k, v in kwargs.items()})

This is another place with too much on one line.

You have a repeated pattern of prepending a '$' to a string if it doesn't have one already. I would make a little helper function to capture this logic, and question any code which doesn't use it. The other logic in the dictionary comprehension could also use a helper function.

def dollar(v):
    """Prepend a dollar sign to indicate this is a XXX."""
    if v.startswith("$"):
        return v
    return "$" + v

def sum_metrics(self, **kwargs):
    def encode_metric(v):
        if isinstance(v, int):
            return v

        return {"$toLong": dollar(v)}

    metrics = {k: {"$sum": encode_metric(v)} for k, v in kwargs.items()}
    self.update(metrics)

def nest_fields(self, *fields, key, **kfields):
    if not fields and not kfields:
        raise Exception("No field specified")
    self._nest_field(key, {field: '$' + field for field in fields} or {field: '$' + value for field, value in kfields.items()})

Using a broad/generic Exception is a bad habit, it might only rarely be a problem, but when it is a problem it is painful to debug. Consider a more specific exception like ValueError or a custom exception.

class EmptyFieldsError(ValueError):
    pass

There are two branches in this code (after the initial check). Is that obvious from the line presented? If you flip the conditions to be positive you can make the logic a lot easier to follow at a glance.

I would have expected the parameter order to be nest_fields(self, key, *fields, **kfields) since then the key is first, followed by a list of parameters, rather than the key looking like the last parameter in a list of them.

def nest_fields(self, *fields, key, **kfields):
    nested_fields = None
    if fields:
        nested_fields = {field: dollar(field) for field in fields}
    elif kfields:
        nested_fields = {
            field: dollar(value)
            for field, value in kfields.items()
        }

    if nested_fields is None:
        raise EmptyFieldsError("No field specified")

    self._nest_field(key, nested_fields)

def sort_documents(self, *fields, descending=False):
    self.append(self._sort.descending_order(*fields) if descending else self._sort.ascending_order(*fields))

This is a difficult API to read. Why not copy Python's sorted and give _sort a reverse positional arg?

class SortStage(dict):
    @staticmethod
    def order(*fields, reverse=False):
        return {"$sort": {field: -1 if reverse else 1 for field in fields}}

def sort_documents(self, *fields, descending=False):
    sorted_documents = self._sort.order(*fields, reverse=descending)
    self.append(sorted_documents)

Answered by spyr03 on November 8, 2021

Add your own answers!

Ask a Question

Get help from others!

© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP