# energyDB/srv/db.py
#
# 139 lines
# 5.5 KiB
# Python

import datetime
import os
from typing import List, Dict
from databases import Database
from sqlalchemy import (Column, DateTime, Integer, Float, String,
MetaData, Table, UniqueConstraint, create_engine)
import sqlalchemy
from sqlalchemy.sql import func
class EnergyDB:
    """Thin wrapper around a SQLAlchemy engine holding energy readings.

    Two tables are declared and created on construction:

    * ``energy``   -- ``timestamp`` (primary key), ``channel_id``, ``value``
    * ``channels`` -- ``id`` and ``name`` (primary key)

    The raw SQL used by the methods below (``_ROWID_``, ``==``) and the
    ``check_same_thread`` connect argument are SQLite-specific.
    """

    def __init__(self, db_url: str):
        """Open the database at *db_url* and create any missing tables."""
        # check_same_thread is handed straight to the DBAPI driver so the
        # single connection opened here may be used from other threads.
        self._engine = create_engine(
            db_url, connect_args={"check_same_thread": False}
        )
        self._conn = self._engine.connect()
        self._metadata = MetaData(self._engine)
        self._tables = {
            # NOTE(review): the primary key (and the additional unique
            # constraint) is on ``timestamp`` alone, so two rows can never
            # share a timestamp -- not even for different channels.  A
            # composite key (timestamp, channel_id) may be what is wanted;
            # left unchanged here because it would alter the schema of
            # existing databases.
            "energy": Table(
                "energy", self._metadata,
                Column("timestamp", DateTime, primary_key=True),
                Column("channel_id", Integer(), nullable=False),
                Column("value", Float),
                UniqueConstraint("timestamp"),
            ),
            # ``id`` is declared but never written by this class; channel
            # ids handed out by getChannelId() are SQLite row ids.
            "channels": Table(
                "channels", self._metadata,
                Column("id", Integer()),
                Column("name", String(), primary_key=True),
            ),
        }
        self._metadata.create_all(self._engine)

    def metadata(self):
        """Return the ``MetaData`` object the tables are registered on."""
        return self._metadata

    def url(self) -> str:
        """Return the engine's database URL as a string."""
        return str(self._engine.url)

    def engine(self):
        """Return the underlying SQLAlchemy engine."""
        return self._engine

    def table(self, name: str) -> "sqlalchemy.Table":
        """Return the table registered under *name*.

        Raises:
            KeyError: if no table with that name was declared.
        """
        return self._tables[name]

    def execute(self, cmd, values=None, **args):
        """Execute *cmd* on the shared connection and return its result.

        Args:
            cmd: statement to execute (e.g. a ``text()`` or ``select()``).
            values: optional bind parameters -- either one dict (a single
                parameter set) or a sequence of dicts (one execution per
                parameter set).
            **args: bind parameters given as keywords.  They are merged
                into ``values``; for a sequence they are merged into every
                parameter set.  Keywords win on key collisions.
        """
        if values is None:
            params = args
        elif isinstance(values, dict):
            params = {**values, **args}
        elif args:
            # Passing ``values`` and ``args`` as two positional arguments
            # would make the driver treat ``args`` as one more parameter
            # set, so fold the keywords into each row instead.
            params = [{**row, **args} for row in values]
        else:
            params = values
        return self._conn.execute(cmd, params)

    def addChannels(self, channelNames: List[str]) -> dict:
        """Insert one row into ``channels`` for each name in *channelNames*.

        Returns:
            A dict with the SQL text under ``"query"`` and, under
            ``"result"``, one ``{"n": name}`` entry per name processed.

        Raises:
            Exception: wrapping any error raised while inserting.
        """
        result = []
        try:
            query = sqlalchemy.sql.text(
                "INSERT INTO channels (name) VALUES(:name)"
            )
            for n in channelNames:
                result.append({"n": n})
                self.execute(query, name=n)
        except Exception as e:
            raise Exception(
                f"Database error in addChannels(): {type(e)} - {str(e)}"
            ) from e
        return {
            "query": str(query),
            "result": result,
        }

    def getChannels(self) -> dict:
        """Return all channel names.

        Returns:
            A dict with ``"channels"`` (a list of ``{"name": ...}`` dicts)
            and ``"query"`` (the SQL text that was run).

        Raises:
            Exception: wrapping any error raised while querying.
        """
        try:
            table_channels = self.table("channels")
            query = sqlalchemy.select(
                [table_channels.c.name]
            ).select_from(table_channels)
            channels = [
                dict(r.items()) for r in self.execute(query).fetchall()
            ]
        except Exception as e:
            raise Exception(
                f"Database error in getChannels(): {type(e)} - {str(e)}"
            ) from e
        return {
            "channels": channels,
            "query": str(query),
        }

    def getChannelId(self, channelName: str) -> int:
        """Return the row id of the channel called *channelName*.

        Raises:
            Exception: if the query fails or no such channel exists.
        """
        try:
            # scalar() yields the first column of the first row: the row id.
            query = sqlalchemy.sql.text(
                "SELECT _ROWID_, name FROM channels WHERE name == :name"
            )
            chId = self.execute(query, name=channelName).scalar()
        except Exception as e:
            raise Exception(
                f"Database error in getChannelId(): {type(e)} - {str(e)}"
            ) from e
        if chId is None:
            raise Exception(
                f"Database error in getChannelId(): channel '{channelName}' not found"
            )
        return chId

    def getChannelData(self, channelIds: List[int], fromTime: datetime.datetime, tillTime: datetime.datetime) -> Dict[int, list]:
        """Fetch the readings of several channels within a time window.

        Args:
            channelIds: channel ids to query, one SELECT per id.
            fromTime: inclusive lower bound on ``timestamp``.
            tillTime: inclusive upper bound on ``timestamp``.

        Returns:
            A dict mapping each channel id to a list of
            ``{"timestamp": datetime, "value": ...}`` dicts ordered by
            timestamp.

        Raises:
            Exception: wrapping any error raised while querying or while
                parsing a stored timestamp.
        """
        try:
            chData = {}
            query = sqlalchemy.sql.text(
                """SELECT timestamp, value FROM energy
WHERE channel_id == :channel_id
AND timestamp >= :fromTime
AND timestamp <= :tillTime
ORDER BY timestamp"""
            )
            for ch in channelIds:
                result = self.execute(
                    query, channel_id=ch, fromTime=fromTime, tillTime=tillTime
                )
                # Rows come back from the raw query with the timestamp as
                # an ISO-format string; convert it back to a datetime.
                chData[ch] = [
                    {
                        "timestamp": datetime.datetime.fromisoformat(row[0]),
                        "value": row[1],
                    }
                    for row in result.fetchall()
                ]
            return chData
        except Exception as e:
            raise Exception(
                f"Database error in getChannelData(): {type(e)} - {str(e)}"
            ) from e

    def addChannelData(self, channel_id: int, data: List[dict]):
        """Insert readings for one channel.

        Args:
            channel_id: id stored in the ``channel_id`` column of each row.
            data: list of dicts, each with a ``"timestamp"`` and a
                ``"value"`` entry.  An empty list is a no-op.

        Raises:
            Exception: wrapping any error raised while building or
                executing the insert (including a missing dict key).
        """
        if not data:
            # Nothing to insert; an INSERT without rows is not valid SQL.
            return
        try:
            # Bound parameters instead of string-built SQL: values are
            # never interpreted as SQL, and quoting is left to the driver.
            query = sqlalchemy.sql.text(
                "INSERT INTO energy (channel_id, timestamp, value) "
                "VALUES (:channel_id, :timestamp, :value)"
            )
            rows = [
                {
                    "channel_id": channel_id,
                    # Stored as text in str() form ("YYYY-MM-DD HH:MM:SS"
                    # for a datetime), which getChannelData() parses back.
                    "timestamp": str(d["timestamp"]),
                    "value": d["value"],
                }
                for d in data
            ]
            self.execute(query, rows)
        except Exception as e:
            raise Exception(
                f"Database error in addChannelData(): {type(e)} - {str(e)}"
            ) from e