139 lines
5.5 KiB
Python
139 lines
5.5 KiB
Python
import datetime
|
|
import os
|
|
from typing import List, Dict
|
|
|
|
from databases import Database
|
|
from sqlalchemy import (Column, DateTime, Integer, Float, String,
|
|
MetaData, Table, UniqueConstraint, create_engine)
|
|
|
|
import sqlalchemy
|
|
from sqlalchemy.sql import func
|
|
|
|
class EnergyDB():
|
|
def __init__(self, db_url : str):
|
|
self._engine = create_engine(db_url, connect_args={"check_same_thread": False})
|
|
|
|
# self.database = Database(db_url)
|
|
self._conn = self._engine.connect()
|
|
|
|
self._metadata = MetaData(self._engine)
|
|
self._tables = {
|
|
"energy": Table(
|
|
"energy", self._metadata,
|
|
Column("timestamp", DateTime, primary_key=True),
|
|
Column("channel_id", Integer(), nullable=False),
|
|
Column("value", Float),
|
|
UniqueConstraint("timestamp"),
|
|
),
|
|
"channels": Table(
|
|
"channels", self._metadata,
|
|
# Column("id", Integer(), autoincrement = True),
|
|
Column("id", Integer()), #, primary_key = True),
|
|
Column("name", String(), primary_key = True),
|
|
),
|
|
}
|
|
self._metadata.create_all(self._engine)
|
|
|
|
def metadata(self):
|
|
return self._metadata
|
|
|
|
def url(self) -> str:
|
|
return str(self._engine.url)
|
|
|
|
def engine(self):
|
|
return self._engine
|
|
|
|
def table(self, name : str) -> sqlalchemy.Table:
|
|
return self._tables[name]
|
|
|
|
def execute(self, cmd, values = None, **args):
|
|
if values is None:
|
|
return self._conn.execute(cmd, args)
|
|
else:
|
|
return self._conn.execute(cmd, values, args)
|
|
return self._conn.execute(cmd, args)
|
|
|
|
def addChannels(self, channelNames: List[str]):
|
|
result = []
|
|
try:
|
|
# query = self.table("channels").insert()
|
|
query = sqlalchemy.sql.text("INSERT INTO channels (name) VALUES(:name)")
|
|
# nameDicts = [ {"name": name } for name in channelNames]
|
|
# result = self.execute(query, name=channelNames)
|
|
for n in channelNames:
|
|
result.append({"n": n})
|
|
self.execute(query, name=n)
|
|
except Exception as e:
|
|
raise Exception(f"Database error in addChannels(): {type(e)} - {str(e)}")
|
|
return {
|
|
"query": str(query),
|
|
"result": result,
|
|
# "result": [str(r) for r in result.fetchall()]
|
|
}
|
|
|
|
def getChannels(self) -> dict:
|
|
try:
|
|
table_channels = self.table("channels")
|
|
query = sqlalchemy.select([table_channels.c.name]).select_from(table_channels)
|
|
channels = [dict(r.items()) for r in self.execute(query).fetchall()]
|
|
except Exception as e:
|
|
raise Exception(f"Database error in getChannels(): {type(e)} - {str(e)}")
|
|
return {
|
|
"channels": channels,
|
|
"query": str(query),
|
|
}
|
|
|
|
def getChannelId(self, channelName : str) -> int:
|
|
try:
|
|
query = sqlalchemy.sql.text(f"SELECT _ROWID_, name FROM channels WHERE name == :name")
|
|
chId = self.execute(query, name=channelName).scalar()
|
|
except Exception as e:
|
|
raise Exception(f"Database error in getChannelId(): {type(e)} - {str(e)}")
|
|
if chId is None:
|
|
raise Exception(f"Database error in getChannelId(): channel '{channelName}' not found")
|
|
return chId
|
|
|
|
def getChannelData(self, channelIds : List[int], fromTime : datetime.datetime, tillTime : datetime.datetime) -> Dict[int, list]:
|
|
try:
|
|
chData = {}
|
|
query = sqlalchemy.sql.text(
|
|
"""SELECT timestamp, value FROM energy
|
|
WHERE channel_id == :channel_id
|
|
AND timestamp >= :fromTime
|
|
AND timestamp <= :tillTime
|
|
ORDER BY timestamp"""
|
|
)
|
|
for ch in channelIds:
|
|
result = self.execute(query, channel_id = ch, fromTime = fromTime, tillTime = tillTime)
|
|
data = [{"timestamp": datetime.datetime.fromisoformat(row[0]), "value": row[1]} for row in result.fetchall() ]
|
|
chData[ch] = data
|
|
return chData
|
|
except Exception as e:
|
|
raise Exception(f"Database error in getChannelData(): {type(e)} - {str(e)}")
|
|
|
|
def addChannelData(self, channel_id : int, data : List[Dict[datetime.datetime, float]]):
|
|
try:
|
|
# query = sqlalchemy.sql.text(
|
|
# "INSERT INTO energy (channel_id, timestamp, value) VALUES "
|
|
# )
|
|
queryStr = "INSERT INTO energy (channel_id, timestamp, value) VALUES "
|
|
valueStr = ""
|
|
for d in data:
|
|
timestamp = d["timestamp"]
|
|
value = d["value"]
|
|
# self.execute(
|
|
# # query,
|
|
# # channel_id=channel_id,
|
|
# # timestamp = d["timestamp"],
|
|
# # value=d["value"]
|
|
# sqlalchemy.sql.text(
|
|
# f"""INSERT INTO energy (channel_id, timestamp, value)
|
|
# VALUES ('{channel_id}', '{timestamp}', '{value}')"""
|
|
# )
|
|
# )
|
|
if valueStr != "":
|
|
valueStr += ",\n"
|
|
valueStr += f"({channel_id}, '{timestamp}', {value})"
|
|
self.execute(sqlalchemy.sql.text(queryStr + valueStr + ";"))
|
|
except Exception as e:
|
|
raise Exception(f"Database error in addChannelData(): {type(e)} - {str(e)}")
|