import os from typing import List from databases import Database from sqlalchemy import (Column, DateTime, Integer, Float, String, MetaData, Table, UniqueConstraint, create_engine) import sqlalchemy from sqlalchemy.sql import func class EnergyDB(): def __init__(self, db_url : str): self._engine = create_engine(db_url, connect_args={"check_same_thread": False}) # self.database = Database(db_url) self._conn = self._engine.connect() self._metadata = MetaData(self._engine) self._tables = { "energy": Table( "energy", self._metadata, Column("timestamp", DateTime, primary_key=True), Column("channel_id", Integer(), nullable=False), Column("value", Float), UniqueConstraint("timestamp"), ), "channels": Table( "channels", self._metadata, # Column("id", Integer(), autoincrement = True), Column("id", Integer()), #, primary_key = True), Column("name", String(), primary_key = True), ), } self._metadata.create_all(self._engine) def metadata(self): return self._metadata def url(self): return self._engine.url def engine(self): return self._engine def table(self, name : str) -> sqlalchemy.Table: return self._tables[name] def execute(self, cmd, values = None, **args): if values is None: return self._conn.execute(cmd, args) else: return self._conn.execute(cmd, values, args) return self._conn.execute(cmd, args) def addChannels(self, channelNames: List[str]): result = [] try: # query = self.table("channels").insert() query = sqlalchemy.sql.text("INSERT INTO channels (name) VALUES(:name)") # nameDicts = [ {"name": name } for name in channelNames] # result = self.execute(query, name=channelNames) for n in channelNames: result.append({"n": n}) self.execute(query, name=n) except Exception as e: raise Exception(f"Database error in addChannels(): {type(e)} - {str(e)}") return { "query": str(query), "result": result, # "result": [str(r) for r in result.fetchall()] } def getChannels(self) -> dict: try: table_channels = self.table("channels") query = sqlalchemy.select([table_channels.c.name, table_channels.c.id]).select_from(table_channels) channels = [dict(r.items()) for r in self.execute(query).fetchall()] except Exception as e: raise Exception(f"Database error in getChannels(): {type(e)} - {str(e)}") return { "channels": channels, "query": str(query), } def getChannelId(self, channelName : str) -> int: try: query = sqlalchemy.sql.text(f"SELECT _ROWID_, name FROM channels WHERE name == :name") chId = self.execute(query, name=channelName).scalar() except Exception as e: raise Exception(f"Database error in getChannelId(): {type(e)} - {str(e)}") if chId is None: raise Exception(f"Database error in getChannelId(): channel '{channelName}' not found") return chId