energyDB/srv/db.py

93 lines
3.4 KiB
Python

import os
from typing import List
from databases import Database
from sqlalchemy import (Column, DateTime, Integer, Float, String,
MetaData, Table, UniqueConstraint, create_engine)
import sqlalchemy
from sqlalchemy.sql import func
class EnergyDB:
    """Small wrapper around a SQLAlchemy engine and one open connection.

    Constructing an instance declares the ``energy`` and ``channels`` tables
    on a fresh ``MetaData`` object and calls ``create_all`` for them.  All
    statements issued through :meth:`execute` share the single connection
    opened in ``__init__``.
    """

    def __init__(self, db_url: str):
        """Open the database at *db_url* and make sure the tables exist.

        Args:
            db_url: SQLAlchemy database URL.
        """
        # ``check_same_thread`` is an option of the sqlite3 driver, so
        # db_url is expected to point at a SQLite database.
        self._engine = create_engine(
            db_url, connect_args={"check_same_thread": False}
        )
        self._conn = self._engine.connect()
        self._metadata = MetaData(self._engine)
        self._tables = {
            "energy": Table(
                "energy", self._metadata,
                Column("timestamp", DateTime, primary_key=True),
                Column("channel_id", Integer(), nullable=False),
                Column("value", Float),
                # NOTE(review): timestamp is already the sole primary key,
                # so this constraint is redundant; it also means two
                # channels cannot store a value for the same timestamp --
                # confirm this is intended before changing the schema.
                UniqueConstraint("timestamp"),
            ),
            "channels": Table(
                "channels", self._metadata,
                # ``id`` is not a key and addChannels() never fills it;
                # getChannelId() relies on SQLite's implicit rowid instead.
                Column("id", Integer()),
                Column("name", String(), primary_key=True),
            ),
        }
        self._metadata.create_all(self._engine)

    def metadata(self):
        """Return the ``MetaData`` object the tables are registered on."""
        return self._metadata

    def url(self):
        """Return the URL of the underlying engine."""
        return self._engine.url

    def engine(self):
        """Return the underlying SQLAlchemy engine."""
        return self._engine

    def table(self, name: str) -> "sqlalchemy.Table":
        """Return the table object registered under *name*.

        Raises:
            KeyError: if *name* is neither ``"energy"`` nor ``"channels"``.
        """
        return self._tables[name]

    def execute(self, cmd, values=None, **args):
        """Execute *cmd* on the shared connection and return its result.

        Args:
            cmd: statement to pass to the connection's ``execute``.
            values: optional parameter object, forwarded positionally.
            **args: bind parameters given as keywords; forwarded as one dict.

        Returns:
            Whatever the connection's ``execute`` returns.
        """
        if values is None:
            return self._conn.execute(cmd, args)
        if not args:
            # Forward ``values`` alone: an additional empty dict would be
            # handed to the connection as a second (empty) parameter set.
            return self._conn.execute(cmd, values)
        # NOTE(review): kept as in the original; both objects are passed
        # positionally -- confirm the connection combines them as intended.
        return self._conn.execute(cmd, values, args)

    def addChannels(self, channelNames: List[str]) -> dict:
        """Insert one row into ``channels`` for every name in *channelNames*.

        Args:
            channelNames: channel names to insert, one INSERT per name.

        Returns:
            A dict with the statement text under ``"query"`` and, under
            ``"result"``, a list of ``{"n": name}`` entries for the
            inserted names.

        Raises:
            Exception: if building or executing a statement fails; the
                original error is attached as ``__cause__``.
        """
        result = []
        try:
            query = sqlalchemy.sql.text(
                "INSERT INTO channels (name) VALUES(:name)"
            )
            for name in channelNames:
                self.execute(query, name=name)
                result.append({"n": name})
        except Exception as e:
            raise Exception(
                f"Database error in addChannels(): {type(e)} - {str(e)}"
            ) from e
        return {
            "query": str(query),
            "result": result,
        }

    def getChannels(self) -> dict:
        """Return all rows of the ``channels`` table.

        Returns:
            A dict with a list of row dicts (columns ``name`` and ``id``)
            under ``"channels"`` and the statement text under ``"query"``.

        Raises:
            Exception: if building or executing the query fails; the
                original error is attached as ``__cause__``.
        """
        try:
            table_channels = self.table("channels")
            columns = [table_channels.c.name, table_channels.c.id]
            query = sqlalchemy.select(columns).select_from(table_channels)
            rows = self.execute(query).fetchall()
            channels = [dict(r.items()) for r in rows]
        except Exception as e:
            raise Exception(
                f"Database error in getChannels(): {type(e)} - {str(e)}"
            ) from e
        return {
            "channels": channels,
            "query": str(query),
        }

    def getChannelId(self, channelName: str) -> int:
        """Return the row id of the channel called *channelName*.

        The id is the ``_ROWID_`` of the matching ``channels`` row, not the
        ``id`` column of that table.

        Raises:
            Exception: if the query fails (original error attached as
                ``__cause__``) or if no row matches, i.e. the scalar
                result is ``None``.
        """
        try:
            query = sqlalchemy.sql.text(
                "SELECT _ROWID_, name FROM channels WHERE name == :name"
            )
            # scalar() yields the first column of the first row: _ROWID_.
            chId = self.execute(query, name=channelName).scalar()
        except Exception as e:
            raise Exception(
                f"Database error in getChannelId(): {type(e)} - {str(e)}"
            ) from e
        if chId is None:
            raise Exception(
                f"Database error in getChannelId(): channel '{channelName}' not found"
            )
        return chId