Code Review Asked by Rfroes87 on November 8, 2021
This has been my approach for "simplifying" MongoDB aggregation queries in a pythonic syntax – was my intent, at the very least:
from datetime import datetime, timedelta
class MatchStage(list):
def _get_day_range(self):
current_time = datetime.now()
return (current_time - timedelta(days=1)), current_time
def diferent(self, **kwargs):
for field, value in kwargs.items():
self.append({field: {"$ne": value}})
def equals(self, **kwargs):
for field, value in kwargs.items():
self.append({field: {"$eq": value}})
def set_interval(self, field, data):
self.starttime, self.endtime = map(datetime.fromtimestamp, ( int(data["starttime"]), int(data["endtime"]) ))
if {"starttime", "endtime"} <= data.keys() else self._get_day_range()
self.append({
field: {
"$gte": self.starttime,
"$lt": self.endtime
}
})
def set_devices(self, devices):
self.devices = devices
self.append({"device": {"$in": [device.id for device in devices]}})
class SortStage(dict):
@staticmethod
def ascending_order(*fields):
return {"$sort": {field: 1 for field in fields} }
@staticmethod
def descending_order(*fields):
return {"$sort": {field: -1 for field in fields} }
class ReRootStage(dict):
@staticmethod
def reset_root(*args):
return {"$replaceRoot": {"newRoot": {"$mergeObjects": [{key: '$' + key for key in args}, "$_id"]}}}
class GroupStage(dict):
def sum_metrics(self, **kwargs):
self.update({k: {"$sum": v if isinstance(v, int) else {"$toLong": v if v.startswith('$') else '$' + v}} for k, v in kwargs.items()})
def group_fields(self, *fields):
self["_id"].update({field: '$' + field for field in fields})
def _nest_field(self, key, field):
self[key] = {"$push": field}
def nest_field(self, key, field):
if not field.startswith('$'): field = '$' + field
self._nest_field(key, field)
def nest_fields(self, *fields, key, **kfields):
if not fields and not kfields: raise Exception("No field specified")
self._nest_field(key, {field: '$' + field for field in fields} or {field: '$' + value for field, value in kfields.items()})
class BasePipeline(list):
_match = MatchStage
_group = GroupStage
_sort = SortStage
_reroot = ReRootStage
def __init__(self, data, devices):
self._set_match().set_interval("time", data)
if "devid" in data:
devices = devices.filter(id__in=data["devid"].split(','))
assert devices.exists(), "Device not found"
self.match.set_devices(devices)
def _set_group(self):
self.group = self._group()
self.append({"$group": self.group})
return self.group
def _set_match(self):
self.match = self._match()
self.append({"$match": {"$and": self.match}})
return self.match
def set_simple_group(self, field):
self._set_group().update({"_id": field if field.startswith('$') else '$' + field})
self.extend([{"$addFields":{"id": "$_id"}}, {"$project": {"_id": 0}}])
def set_composite_group(self, *fields, **kfields):
if not fields and not kfields:
raise Exception("No fields specified")
self._set_group().update({"_id": {field: '$' + field for field in fields} or {field: (value if not isinstance(value, str) or value.startswith('$') else '$' + value) for field, value in kfields.items()}})
def sort_documents(self, *fields, descending=False):
self.append(self._sort.descending_order(*fields) if descending else self._sort.ascending_order(*fields))
class WebFilterBlockedPipeline(BasePipeline):
def aggregate_root(self, *args):
self.append(self._reroot.reset_root(*args))
def aggregate_data(self, **kfields):
self.group.sum_metrics(**kfields)
self.aggregate_root(*kfields)
@classmethod
def create(cls, data, devices):
pipeline = cls(data, devices)
pipeline.match.equals(action="blocked")
pipeline.set_composite_group("url", "source_ip", "profile")
pipeline.aggregate_data(count=1)
pipeline.sort_documents("count", descending=True)
pipeline.set_simple_group(field="url")
pipeline.group.sum_metrics(count="count")
pipeline.group.nest_field("source_ip", "profile", "count", key="users")
pipeline.sort_documents("count", descending=True)
return pipeline
Example output of WebFilterBlockedPipeline.create()
in JSON-format:
[{
"$match": {
"$and": [{
"time": {
"$gte": "2020-07-15T16:04:19",
"$lt": "2020-07-16T16:04:19"
}
},
{
"device": {
"$in": ["FG100ETK18035573"]
}
},
{
"action": {
"$eq": "blocked"
}
}]
}
},
{
"$group": {
"_id": {
"url": "$url",
"source_ip": "$source_ip",
"profile": "$profile"
},
"count": {
"$sum": 1
}
}
},
{
"$replaceRoot": {
"newRoot": {
"$mergeObjects": [{
"count": "$count"
},
"$_id"]
}
}
},
{
"$sort": {
"count": -1
}
},
{
"$group": {
"_id": "$url",
"count": {
"$sum": {
"$toLong": "$count"
}
},
"users": {
"$push": {
"source_ip": "$source_ip",
"profile": "$profile",
"count": "$count"
}
}
}
},
{
"$addFields": {
"id": "$_id"
}
},
{
"$project": {
"_id": 0
}
},
{
"$sort": {
"count": -1
}
}]
I’d like to ask if this is an acceptable implementation and, if not, if I should just scratch it completely, follow another design pattern or any constructive criticism.
Make use of line spacing to make the code easier for humans to read. Using a formatter on the code expands it from 100 lines to nearly 150. It should be even longer, there are multiple lines which are far too dense to read in one go. Avoid leaving out the new line in code like if not field.startswith('$'): field = '$' + field
or if not fields and not kfields: raise Exception("No field specified")
.
self.starttime, self.endtime = map(datetime.fromtimestamp, (int(data["starttime"]), int(data["endtime"])))
if {"starttime", "endtime"} <= data.keys() else self._get_day_range()
This line is way too packed. I can see how you would get to it, but it needs to be split back up.
Mapping over two values seems a bit overkill. Unless there will be more timestamps I would stick with the simpler code.
Putting the desired keys into a set and using subset (implicitly converting the dictionary keys to a set) is a nice trick. However, it is not a very common pattern, and a quick benchmark says it has worse performance than the more naive "starttime" in data and "endtime" in data
.
The simple benchmark in Ipython (with small dictionaries)
def f(data):
return {"starttime", "endtime"} <= data.keys()
def g(data):
return "starttime" in data and "endtime" in data
data1 = {"starttime": 50, "endtime": 100}
data2 = {"starttime": 50, "end": 100}
data3 = {"endtime": 100, "sus": 50}
data4 = {"start": 50, "end": 50}
data5 = {}
%timeit f(data1)
%timeit g(data1)
%timeit f(data2)
%timeit g(data2)
%timeit f(data3)
%timeit g(data3)
%timeit f(data4)
%timeit g(data4)
%timeit f(data5)
%timeit g(data5)
highlights a clear 3 to 4x win for the simple check.
I would prefer code more in line with
def set_interval(self, field, data):
if "starttime" in data and "endtime" in data:
self.starttime = datetime.fromtimestamp(int(data["starttime"]))
self.endtime = datetime.fromtimestamp(int(data["endtime"]))
else:
self.starttime, self.endtime = self._get_day_range()
self.append({field: {"$gte": self.starttime, "$lt": self.endtime}})
def sum_metrics(self, **kwargs):
self.update({k: {"$sum": v if isinstance(v, int) else {"$toLong": v if v.startswith('$') else '$' + v}} for k, v in kwargs.items()})
This is another place with too much on one line.
You have a repeated pattern of prepending a '$'
to a string if it doesn't have one already. I would make a little helper function to capture this logic, and question any code which doesn't use it. The other logic in the dictionary comprehension could also use a helper function.
def dollar(v):
"""Prepend a dollar sign to indicate this is a XXX."""
if v.startswith("$"):
return v
return "$" + v
def sum_metrics(self, **kwargs):
def encode_metric(v):
if isinstance(v, int):
return v
return {"$toLong": dollar(v)}
metrics = {k: {"$sum": encode_metric(v)} for k, v in kwargs.items()}
self.update(metrics)
def nest_fields(self, *fields, key, **kfields):
if not fields and not kfields:
raise Exception("No field specified")
self._nest_field(key, {field: '$' + field for field in fields} or {field: '$' + value for field, value in kfields.items()})
Using a broad/generic Exception is a bad habit, it might only rarely be a problem, but when it is a problem it is painful to debug. Consider a more specific exception like ValueError or a custom exception.
class EmptyFieldsError(ValueError):
pass
There are two branches in this code (after the initial check). Is that obvious from the line presented? If you flip the conditions to be positive you can make the logic a lot easier to follow at a glance.
I would have expected the parameter order to be nest_fields(self, key, *fields, **kfields)
since then the key is first, followed by a list of parameters, rather than the key looking like the last parameter in a list of them.
def nest_fields(self, *fields, key, **kfields):
nested_fields = None
if fields:
nested_fields = {field: dollar(field) for field in fields}
elif kfields:
nested_fields = {
field: dollar(value)
for field, value in kfields.items()
}
if nested_fields is None:
raise EmptyFieldsError("No field specified")
self._nest_field(key, nested_fields)
def sort_documents(self, *fields, descending=False):
self.append(self._sort.descending_order(*fields) if descending else self._sort.ascending_order(*fields))
This is a difficult API to read. Why not copy Python's sorted
and give _sort
a reverse
positional arg?
class SortStage(dict):
@staticmethod
def order(*fields, reverse=False):
return {"$sort": {field: -1 if reverse else 1 for field in fields}}
def sort_documents(self, *fields, descending=False):
sorted_documents = self._sort.order(*fields, reverse=descending)
self.append(sorted_documents)
Answered by spyr03 on November 8, 2021
Get help from others!
Recent Answers
Recent Questions
© 2024 TransWikia.com. All rights reserved. Sites we Love: PCI Database, UKBizDB, Menu Kuliner, Sharing RPP