energyDB/srv/energyDB.py

246 lines
7.5 KiB
Python

import datetime
import logging
import os
from typing import Optional, List, Dict

from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel, Field

from .db import EnergyDB
# Version of the REST API. The major number is embedded in every route
# prefix (see _restApiPath); major and minor are reported by /version.
API_VERSION_MAJOR = 1
API_VERSION_MINOR = 0

# The database connection string is mandatory: the module refuses to load
# when DATABASE_URL is unset or empty.
DB_URL = os.environ.get("DATABASE_URL")
if not DB_URL:
    raise Exception("Environment variable DATABASE_URL missed!")

# Module-wide singletons shared by all endpoint handlers below.
db = EnergyDB(DB_URL)
# NOTE(review): debug=True is hard-coded here -- confirm this is intended
# for production deployments.
app = FastAPI(debug=True)
class InfoData(BaseModel):
    """Response model of the ``/info`` endpoint."""
    # Free-form diagnostic key/value pairs assembled by the endpoint.
    info: Dict
class EnergyValue(BaseModel):
    """One timestamped value of a channel's data series."""
    # Point in time the value belongs to.
    timestamp: datetime.datetime
    # The numeric value itself (unit is not specified in this module).
    value: float
class ChannelData(BaseModel):
    """Data series of a single channel.

    The channel is addressed either by ``channel_id`` or, when that is
    ``None``, by its name in ``channel`` (the PUT /bulkData handler resolves
    the name to an ID in that case).

    The optional fields carry an explicit ``None`` default. Under pydantic v1
    this is identical to the implicit behaviour of a bare ``Optional[...]``
    annotation; under pydantic v2 a bare ``Optional[...]`` field would become
    *required*, which would break clients that omit these keys.
    """
    # Numeric ID of the channel; None when the channel is given by name.
    channel_id: Optional[int] = None
    # Name of the channel; only consulted when channel_id is None.
    channel: Optional[str] = None
    # The values belonging to this channel.
    data: List[EnergyValue]
    # Optional free-text message (used for diagnostic output in responses).
    msg: Optional[str] = None
class BulkData(BaseModel):
    """Data series of several channels in one request or response body.

    ``msg`` has an explicit ``None`` default so that it stays optional on
    pydantic v2 as well; on pydantic v1 this matches the implicit behaviour
    of a bare ``Optional[str]`` annotation.
    """
    # One entry per channel.
    bulk: List[ChannelData]
    # Optional free-text message (used for diagnostic output in responses).
    msg: Optional[str] = None
class BulkDataRequest(BaseModel):
    """Request body for GET /bulkData: which channels and which time span."""
    # IDs of the channels whose data is requested.
    channel_ids: List[int]
    # Start of the requested time span.
    fromTime: datetime.datetime
    # End of the requested time span; defaults to "now".
    # A default_factory is required here: a plain ``= datetime.datetime.now()``
    # is evaluated once at import time, which would freeze the default at the
    # moment the server process started instead of the time of the request.
    tillTime: datetime.datetime = Field(default_factory=datetime.datetime.now)
class ChannelInfo(BaseModel):
    """Description of one channel; currently only its name."""
    # Name of the channel.
    name: str
class Channels(BaseModel):
    """Request body of PUT ``/channels``: a list of channel descriptions."""
    # The channels contained in the request.
    channels: List[ChannelInfo]
def _restApiPath(subPath : str) -> str:
    """Build the versioned REST API path for a sub-path.

    The result is ``/energy/v<API_VERSION_MAJOR>`` followed by ``subPath``;
    a ``/`` separator is inserted only when ``subPath`` does not already
    start with one.

    Arguments:
        subPath {str} -- The sub-URL, with or without a leading slash

    Returns:
        str -- The full REST API URL
    """
    apiRoot = f"/energy/v{API_VERSION_MAJOR}"
    separator = "" if subPath.startswith("/") else "/"
    return apiRoot + separator + subPath
@app.get(_restApiPath("/version"))
async def restApiGetVersion() -> dict:
    """Report the version of the REST API.

    Returns:
        dict -- a single key "version" holding "<major>.<minor>"
    """
    apiVersion = f"{API_VERSION_MAJOR}.{API_VERSION_MINOR}"
    return {"version": apiVersion}
@app.get(_restApiPath("/info"), response_model = InfoData)
async def apiGetInfo():
    """Return diagnostic information about the database.

    Collects the database URL, the columns and full contents of the
    "energy" table, introspection dumps of the db wrapper and of a query
    result, and the table list known to the metadata.

    The raw queries against ``sqlite_master`` only work on an SQLite
    database.

    NOTE(review): this endpoint exposes the database URL and the complete
    table contents -- confirm it is meant for debugging only.

    Returns:
        dict -- {"info": <dict of diagnostic entries>}
    """
    info = {}
    info["db_url"] = db.url()
    # Column names of the "energy" table. The table is obtained through
    # db.table() rather than by reflecting it via the sqlalchemy module,
    # because this file does not import sqlalchemy (referencing it here
    # raised a NameError on every request).
    energyTable = db.table("energy")
    info["db_fields"] = [c.name for c in energyTable.columns]
    result = db.execute("select * from energy")
    info["db_dir"] = str(dir(db))
    info["result_type"] = str(type(result))
    info["result_dir"] = str(dir(result))
    info["result_count"] = str(result.rowcount)
    info["db_contents"] = [str(row) for row in result.fetchall()]
    # NOTE: this "tables" entry is replaced further down by the table names
    # taken from the metadata.
    result = db.execute("SELECT name FROM sqlite_master WHERE type='table'")
    info["tables"] = [str(row) for row in result.fetchall()]
    # CREATE statement of the first table listed in sqlite_master, flattened
    # to a single line.
    result = db.execute("SELECT sql FROM sqlite_master WHERE type='table'")
    info["energy.sql"] = str(result.fetchone()[0]).replace("\n", "").replace("\t","")
    result = db.execute("SELECT COUNT(*) FROM energy")
    info["rows"] = str(result.fetchone())
    sortedTables = db.metadata().sorted_tables
    info["tables"] = [t.name for t in sortedTables]
    for t in sortedTables:
        # NOTE(review): the key says "columns" but the value stored is the
        # table's schema attribute -- confirm which one is wanted.
        info[f"columns_{t}"] = t.schema
    return {
        "info": info
    }
@app.get(_restApiPath("/bulkData"), response_model = BulkData)
async def getBulkEnergyData(bulkDataRequest: BulkDataRequest):
    """Return the stored data of several channels for a time span.

    Arguments:
        bulkDataRequest {BulkDataRequest} -- channel IDs and time span

    Raises:
        HTTPException: with status 404 when fetching or assembling the data
            raises any exception

    Returns:
        dict -- "bulk" holds one entry per channel ID returned by the
            database; the "msg" fields carry string dumps of the raw result
    """
    # NOTE(review): database failures are reported as 404 here, whereas the
    # other endpoints in this file use 500 -- confirm whether clients rely
    # on the 404.
    try:
        perChannel = db.getChannelData(
            bulkDataRequest.channel_ids,
            bulkDataRequest.fromTime,
            bulkDataRequest.tillTime,
        )
        entries = [
            {
                "channel_id": channelId,
                "channel": None,
                "data": perChannel[channelId],
                "msg": str(perChannel),
            }
            for channelId in perChannel
        ]
    except Exception as e:
        raise HTTPException(
            status_code=404,
            detail=f"Database error: {type(e)} - {str(e)}"
        )
    return {"bulk": entries, "msg": str(perChannel.keys())}
@app.put(_restApiPath("/bulkData"))
async def putBulkEnergyData(bulkData: BulkData):
    """Store the data series of several channels.

    For every entry the channel ID is taken from ``channel_id``; when that
    is None it is looked up from the channel name. The entry's values are
    then handed to the database in one call per channel.

    Arguments:
        bulkData {BulkData} -- the per-channel data to store

    Raises:
        HTTPException: with status 500 when any step raises an exception

    Returns:
        None
    """
    try:
        for entry in bulkData.bulk:
            targetId = entry.channel_id
            if targetId is None:
                # Channel addressed by name: resolve it to its ID first.
                targetId = db.getChannelId(entry.channel)
            rows = [
                {"timestamp": sample.timestamp, "value": sample.value}
                for sample in entry.data
            ]
            db.addChannelData(targetId, rows)
    except Exception as e:
        raise HTTPException(status_code=500, detail=f"Internal error: \"{str(e)}\"")
@app.put(_restApiPath("/channels"))
async def putChannels(channel_info: Channels):
    """Add channels to the database and report the outcome.

    Database errors do not raise; they are reported in the "result" entry
    of the response as ``Exception "<message>"``.

    Arguments:
        channel_info {Channels} -- the channels to add

    Returns:
        dict -- "result" and "query" as returned by the database layer (or
            the exception text / "???" on failure), the submitted channels
            and their names, and the channel list before and after the
            insert
    """
    # Extracted before the try block: the response below always contains
    # the names, so they must be bound even when the very first database
    # call fails (otherwise building the response raised UnboundLocalError
    # and masked the real error).
    channelNames = [c.name for c in channel_info.channels]
    result = "ok"
    query = "???"
    rows_before = {}
    rows_after = {}
    try:
        rows_before = db.getChannels()["channels"]
        res = db.addChannels(channelNames)
        result = res["result"]
        query = res["query"]
        rows_after = db.getChannels()["channels"]
    except Exception as e:
        result = f"Exception \"{str(e)}\""
    return {
        "result": result,
        "channels": str(channel_info.channels),
        "channelNames": channelNames,
        "query": query,
        "rows_before": rows_before,
        "rows_after": rows_after,
    }
@app.get(_restApiPath("/channels"))
async def getChannels() -> dict:
    """List the names of all channels.

    Raises:
        HTTPException: with status 500 when querying or reading the channel
            list raises an exception

    Returns:
        dict -- {"channels": [<name>, ...]}
    """
    try:
        names = []
        for entry in db.getChannels()["channels"]:
            names.append(entry["name"])
    except Exception as e:
        raise HTTPException(status_code = 500, detail = str(e))
    return {"channels": names}
@app.get(_restApiPath("/channels/{channel}/id"))
async def getChannelId(channel: str): # -> dict:
    """Look up the ID of a channel by its name.

    Arguments:
        channel {str} -- name of the channel (taken from the URL path)

    Raises:
        HTTPException: with status 500 when the lookup raises an exception

    Returns:
        dict -- {"channel": <name>, "id": <ID returned by the database>}
    """
    try:
        response = {"channel": channel, "id": db.getChannelId(channel)}
    except Exception as e:
        raise HTTPException(status_code=500,detail=f"Database error: {type(e)} - {str(e)}")
    return response
# @app.get("/energy/{channel_id}", response_model = ChannelData)
# async def getChannelData(channel_id: int):
# try:
# query = sqlalchemy.select([energy.c.timestamp, energy.c.value]).where(energy.c.channel_id == channel_id)
# result = await db.fetch_all(query)
# return {
# "channel_id": channel_id,
# "data": result
# }
# except Exception as ex:
# raise HTTPException(status_code=500, detail=f"Internal error: {type(ex)} - \"{ex}\"")
# @app.put("/energy/{channel_id}") #, response_model = EnergyValue)
# async def putChannelData(channel_id: int, data: EnergyValue):
# query = energy.insert().values(
# timestamp=data.timestamp,
# channel_id=channel_id,
# value=data.value)
# result = await db.execute(query)
# # # return await db.fetch_all(query)
# return {
# "timestamp": datetime.datetime.now(),
# # "channel" : data.channel,
# "value": data.value,
# "msg": str(result)
# }
@app.get(_restApiPath("/{channel_id}/count")) # response_model = ChannelData)
async def getChannelRowCount(channel_id: int):
    """Return diagnostic column information of the "energy" table.

    Despite its name, this endpoint does not count rows yet, and
    ``channel_id`` is currently unused.

    Arguments:
        channel_id {int} -- ID of the channel (taken from the URL path)

    Returns:
        dict -- {"info": {"columns": <string dump of the table columns>}}
    """
    # TODO: implement the actual count; an earlier draft used
    #   sqlalchemy.select([sqlalchemy.func.count()]).select_from(db.tables["energy"])
    # and returned countResult.fetchone()[0] as info["count"].
    energyColumns = db.table("energy").columns
    return {"info": {"columns": str(energyColumns)}}