Code Review Asked on October 27, 2021
As an exercise, I’ve decided to write a lightweight, dictionary type database. Below are some of the features I’ve implemented:
Questions:
Fernet in the past, and the absolute requirement of a 32 character long password ensures it will take a considerable amount of time to break the encryption. I’m also fairly concerned about the time between each encrypt and decrypt. Should I only decrypt when the user wants to insert or query the database?
I let Fernet decide if the password is correct, instead of implementing something myself. Is this a good way of going about this?
the double underscore ones, such as __encrypt_db. I’m familiar with the purpose of hiding functions that are meant to be internal. Am I using this convention correctly?
lindb.py
"""
LinDB

@author Ben Antonellis.
@date 07-17-2020.
"""

import os
import json
import base64
from cryptography.fernet import Fernet
from cryptography.fernet import InvalidToken
from typing import Any, Union, List, Dict


class LinDB():

    def __init__(self, name, pw=None):
        self.name = name
        self.__pw = pw
        self.db = {}
        self.file_name = f"{self.name}.json"
        self.connected = False
        self.new_db = False
        self.encrypt = self.__pw is not None
        if self.encrypt:
            if len(self.__pw) < 32:
                raise PasswordLengthError("Password must be at least 32 characters long!")
            self.__pw = base64.urlsafe_b64encode(self.__pw.encode())
            self.fernet = Fernet(self.__pw)
        self.__create_db_file()

    def insert(self, pair: Dict, overwrite:bool=False) -> None:
        """
        Allows the user to insert a dictionary into the database.
        """
        if not self.connected:
            quit("Please call .connect() to connect to database!")
        for key in pair:
            value = pair[key]
            if overwrite:
                for pair_key, db_key in zip(pair, self.db):
                    if pair_key == db_key:
                        self.db[db_key] = value
                        break
        self.db.update(pair)

    def query(self, key:Any=None, value:Any=None) -> Union[None, List[Any], bool]:
        """
        Queries the database for either the key or value.

        If both key and value:
            Return position in database the first pair was found.
        If just key:
            Return value associated with key.
        If just value:
            Return all keys with associated value.
        """
        if not self.connected:
            quit("Please call .connect() to connect to database!")
        try:
            if key and value:
                index = 0
                for k, v in self.db.items():
                    if k == key and v == value:
                        return index
                    index += 1
            if key and not value:
                return self.db[key]
            if value and not key:
                return [k for k, v in self.db.items() if v == value]
        except KeyError:
            return

    def save(self) -> None:
        """
        Saves the current database to the file.
        """
        if not self.connected:
            quit("Please call .connect() to connect to database!")
        with open(self.file_name, "w") as db_file:
            json.dump(self.db, db_file, ensure_ascii=False)

    def connect(self) -> None:
        """
        Indicates to the database that it should start decrypting now.
        """
        if self.__db_empty():
            self.connected = True
            return
        if self.encrypt:
            try:
                if not self.new_db:
                    self.__decrypt_db()
                self.connected = True
                self.__load_db_file()
            except InvalidToken:
                quit("Wrong password for database!")

    def done(self) -> None:
        """
        Indicates to the database that it should start encrypting now.
        """
        if not self.connected:
            quit("Please call .connect() to connect to database!")
        if self.encrypt:
            self.__encrypt_db()
        self.connected = False

    def __create_db_file(self) -> None:
        """
        Creates a database file with the name of the database as the filename.
        """
        if not os.path.exists(self.file_name):
            _ = open(self.file_name, "w").close()
            self.new_db = True

    def __load_db_file(self) -> None:
        """
        Load the database into the current database dictionary.
        """
        with open(self.file_name, "r") as db_file:
            try:
                # Keep the parsed contents; discarding them leaves self.db empty.
                self.db = json.load(db_file)
            except json.decoder.JSONDecodeError:
                print("Previous database not found. Creating new database.")
                self.db = {}

    def __encrypt_db(self) -> None:
        """
        Encrypts the database with Fernet.
        """
        with open(self.file_name, 'rb') as db_file:
            db = db_file.readline()
            encrypted = self.fernet.encrypt(db)
        with open(self.file_name, 'wb') as db_file:
            db_file.write(encrypted)

    def __decrypt_db(self) -> None:
        """
        Decrypts the database with Fernet.
        """
        with open(self.file_name, 'rb') as db_file:
            db = db_file.readline()
            decrypted = self.fernet.decrypt(db)
        with open(self.file_name, 'wb') as db_file:
            db_file.write(decrypted)

    def __db_empty(self) -> bool:
        """
        Determines if the database is empty.
        """
        with open(self.file_name, "r") as db_file:
            return not db_file.readlines()

    def __repr__(self):
        return f"DB: {self.name}"


class PasswordLengthError(Exception):
    """
    Raised when the user enters a password less than 32 characters long.
    """

    def __init__(self, message):
        super().__init__(message)
Below is an example file of how an average user would work with this database:
test_db.py
from lindb import LinDB
# Example password 32 characters long #
pw = "zSLfLhAvjhmX6CrzCbxSE2dzXEZaiOfO"
db = LinDB("DB_TEST", pw=pw)
# Decrypts the file if the password is correct #
db.connect()
# Start inserting pairs #
db.insert({"Ben": 16})
db.insert({"Hannah": 17})
db.insert({"Will": 18})
# Query database and display results #
results = [
    db.query(value=16),
    db.query(key="Hannah"),
    db.query(key="Will", value=18),
    db.query(key="Test")
]
for result in results:
    print(result)
# Demonstrating the ability to use assignment expressions #
# Should the key and/or value not exist, None is returned #
if result := db.query(key="Be"):
    print(result)
# This writes the current database to the file #
db.save()
# Encrypts the file #
db.done()
Both impressive and ambitious!
the absolute requirement of a 32 character long password ensures it will take a considerable amount of time to break the encryption
It will also ensure that some users will be writing that password down or saving it to a text file, defeating the entire purpose of a password. A softer approach would be, during the password saving procedure, do an entropy check with a library that provides this. Issue a warning if the entropy is below a predetermined value.
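A warning of this kind might look like the sketch below. It uses a crude character-pool estimate built from the standard library as a stand-in for a dedicated strength-estimation library. The names estimate_entropy_bits and warn_if_weak and the 60-bit threshold are assumptions made for illustration; none of them are part of LinDB.

```python
import math
import string
import warnings

# Hypothetical threshold; choose a value that suits your threat model.
MIN_ENTROPY_BITS = 60.0


def estimate_entropy_bits(password: str) -> float:
    """Crude estimate: length * log2(size of the character pool in use).

    This overestimates human-chosen passwords, which are far from random.
    """
    pool = 0
    if any(c in string.ascii_lowercase for c in password):
        pool += 26
    if any(c in string.ascii_uppercase for c in password):
        pool += 26
    if any(c in string.digits for c in password):
        pool += 10
    if any(c in string.punctuation for c in password):
        pool += len(string.punctuation)
    # Characters outside these classes are not counted in this sketch.
    return len(password) * math.log2(pool) if pool else 0.0


def warn_if_weak(password: str) -> bool:
    """Warn, rather than refuse, when the estimate is below the threshold."""
    bits = estimate_entropy_bits(password)
    if bits < MIN_ENTROPY_BITS:
        warnings.warn(f"Password looks weak (about {bits:.0f} bits estimated).")
        return True
    return False
```

Because the check only warns, a user with a shorter but memorable password is not forced to write a long one down.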
I let Fernet decide if the password is correct, instead of implementing something myself. Is this a good way of going about this?
Yes!
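A minimal sketch of why this works, assuming the cryptography package from the question is installed. Fernet authenticates a token before decrypting it, so a wrong key surfaces as InvalidToken. The helper decrypt_or_raise and the exception WrongPasswordError are hypothetical names, and raising instead of calling quit() is an assumption about what callers would prefer.

```python
from cryptography.fernet import Fernet, InvalidToken


class WrongPasswordError(Exception):
    """Hypothetical error: the token could not be authenticated with this key."""


def decrypt_or_raise(token: bytes, key: bytes) -> bytes:
    """Decrypt token with key, translating Fernet's failure into our own error."""
    try:
        return Fernet(key).decrypt(token)
    except InvalidToken as exc:
        # InvalidToken is also raised for corrupted or truncated tokens,
        # so a damaged file is indistinguishable from a wrong password here.
        raise WrongPasswordError("Wrong password for database!") from exc


right_key = Fernet.generate_key()
wrong_key = Fernet.generate_key()
token = Fernet(right_key).encrypt(b'{"Ben": 16}')

print(decrypt_or_raise(token, right_key))  # b'{"Ben": 16}'
try:
    decrypt_or_raise(token, wrong_key)
except WrongPasswordError as err:
    print(err)  # Wrong password for database!
```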
I'm also fairly concerned about the time between each encrypt and decrypt. Should I only decrypt when the user wants to insert or query the database?
That's a loaded question. If you expect your database to be potentially massive (over the size of RAM), then some of it will need to stay on disc, and it might as well stay encrypted there.
The bigger question is: how do you cache your data? If the cache is aggressively memory-resident, it might be considered a security weakness to hold onto unencrypted contents in RAM for long periods of time. Another factor is the maximum acceptable latency between receiving a query, decrypting the contents on-the-fly if necessary and returning the result. Yet another factor is convenience of use: is authentication per-query, or per-session? I've never seen any databases authenticate per-query, but it's not entirely out of the question.
I don't have good answers to these, so I suggest that you do some testing at scale.
the double underscore ones, such as __encrypt_db. I'm familiar with the purpose of hiding functions that are meant to be internal. Am I using this convention correctly?
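Not really. It should just be _encrypt_db. A double leading underscore triggers name mangling, which exists to avoid attribute clashes in subclasses rather than to mark a method as internal. Read more here.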
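The difference between the two conventions can be seen in a small sketch. Base and Child are hypothetical classes, not part of LinDB.

```python
class Base:
    def run(self) -> str:
        # Inside the class body, self.__step is compiled as self._Base__step.
        return self.__step()

    def __step(self) -> str:
        return "base"

    def run_hook(self) -> str:
        return self._hook()

    def _hook(self) -> str:
        return "base hook"


class Child(Base):
    def __step(self) -> str:  # stored as _Child__step, so Base.run never sees it
        return "child"

    def _hook(self) -> str:  # an ordinary override
        return "child hook"


child = Child()
print(child.run())               # base
print(child.run_hook())          # child hook
print(hasattr(child, "__step"))  # False
print(child._Base__step())       # base
```

A single underscore signals "internal" by convention and still lets subclasses override the method. The double underscore renames the attribute, which is rarely what a helper such as an encrypt routine needs.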
You're in Python 3, so these parens are not necessary:
class LinDB():
pair: Dict
A dictionary of what? Dict[str, str]? Also,
name, pw=None
is probably
name: str, pw: Optional[str] = None
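Put together, the annotated signatures could look like the sketch below. TypedDB is a hypothetical stand-in that shows only the signatures, and Dict[str, int] is an assumption taken from the example usage in test_db.py, where names map to ages.

```python
from typing import Dict, Optional


class TypedDB:
    """Hypothetical stand-in for LinDB showing only the annotated signatures."""

    def __init__(self, name: str, pw: Optional[str] = None) -> None:
        self.name = name
        self._pw = pw
        self.db: Dict[str, int] = {}

    def insert(self, pair: Dict[str, int]) -> None:
        """Merge pair into the in-memory dictionary."""
        self.db.update(pair)


db = TypedDB("DB_TEST")
db.insert({"Ben": 16})
print(db.db)  # {'Ben': 16}
```

With the key and value types spelled out, a static checker such as mypy should be able to flag a call like db.insert({"Ben": "16"}) before the code runs.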
This return type:
Union[None, List[Any], bool]
is a huge red flag that your query method is not specific enough, and is trying to do too many things at once. I think your callers will not find the merging of all of these invocations convenient, and would benefit instead from you separating this out into query_for_key, query_for_value, etc.
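One possible shape for that split is sketched below. SplitQueryDB and index_of_pair are hypothetical names, and the int value type is an assumption taken from the ages used in test_db.py.

```python
from typing import Dict, List, Optional


class SplitQueryDB:
    """Hypothetical stand-in for LinDB with one method per kind of query."""

    def __init__(self) -> None:
        self.db: Dict[str, int] = {}

    def query_for_key(self, key: str) -> Optional[int]:
        """Return the value stored under key, or None if the key is absent."""
        return self.db.get(key)

    def query_for_value(self, value: int) -> List[str]:
        """Return every key whose value equals value (possibly an empty list)."""
        return [k for k, v in self.db.items() if v == value]

    def index_of_pair(self, key: str, value: int) -> Optional[int]:
        """Return the insertion-order position of the pair, or None."""
        for index, (k, v) in enumerate(self.db.items()):
            if k == key and v == value:
                return index
        return None


db = SplitQueryDB()
db.db.update({"Ben": 16, "Hannah": 17, "Will": 18})
print(db.query_for_value(16))        # ['Ben']
print(db.query_for_key("Hannah"))    # 17
print(db.index_of_pair("Will", 18))  # 2
print(db.query_for_key("Test"))      # None
```

Each method now has a single, narrow return type. Splitting also sidesteps the truthiness checks in the original query: with "if key and value", a stored value of 0 is treated as if no value had been passed, and a pair found at position 0 comes back as a falsy result.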
Answered by Reinderien on October 27, 2021