# energyDB/srv/energyDB.py
# 2020-12-17 01:01:27 +01:00
#
# 117 lines
# 3.7 KiB
# Python

import datetime
from sqlite3 import OperationalError
from typing import Optional, List

import databases
import sqlalchemy
from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel, Field
# Module-level database setup: connection URL, async handle, table definition
# and schema creation. Everything below runs once, when the module is imported.

# SQLite database file, given as a path relative to the working directory.
DATABASE_URL = "sqlite:///./energyDB.sqlite"
# Alternative URL without a file path, kept for reference
# (presumably an in-memory database for testing - confirm before enabling).
# DATABASE_URL = "sqlite://"
# Async database handle; the request handlers in this file run their queries through it.
db = databases.Database(DATABASE_URL)
metadata = sqlalchemy.MetaData()
# Table "energy": one row per stored value.
# NOTE(review): "timestamp" alone is the primary key (and is additionally
# covered by a unique constraint), so two channels cannot both store a row
# for the same timestamp - confirm whether the key should include channel_id.
energy = sqlalchemy.Table(
"energy",
metadata,
sqlalchemy.Column("timestamp", sqlalchemy.DateTime, primary_key=True),
sqlalchemy.Column("channel_id", sqlalchemy.Integer(), nullable=False),
sqlalchemy.Column("value", sqlalchemy.Float),
sqlalchemy.UniqueConstraint("timestamp"),
)
# Synchronous SQLAlchemy engine; in the code shown here it is used only to
# create the schema. check_same_thread=False is handed through to the sqlite3
# driver so the connection is not tied to the thread that created it.
engine = sqlalchemy.create_engine(
DATABASE_URL, connect_args={"check_same_thread": False}
)
# Create every table registered on `metadata`.
metadata.create_all(engine)
class EnergyValue(BaseModel):
    """One timestamped float value, as exchanged with API clients."""

    # Point in time the value is recorded under.
    timestamp: datetime.datetime
    # The measured value itself.
    value: float
class ChannelData(BaseModel):
    """All values of a single channel, plus an optional free-text message."""

    # Identifier of the channel the values belong to.
    channel_id: int
    # The channel's timestamped values.
    data: List[EnergyValue]
    # Explicit default: handlers in this file build responses without a "msg"
    # key, so the field must be optional regardless of how the installed
    # pydantic version treats a bare Optional[...] annotation.
    msg: Optional[str] = None
class BulkData(BaseModel):
    """Values of several channels bundled into one payload."""

    # One entry per channel.
    bulk: List[ChannelData]
    # Explicit default so clients may omit the message regardless of how the
    # installed pydantic version treats a bare Optional[...] annotation.
    msg: Optional[str] = None
class BulkDataRequest(BaseModel):
    """Selection of channels and a time window for a bulk read.

    `tillTime` is optional; when omitted it defaults to the moment the
    request object is created.
    """

    # Channels to read.
    channel_ids: List[int]
    # Start of the time window.
    fromTime: datetime.datetime
    # End of the time window. A default_factory is used because a plain
    # `= datetime.datetime.now()` default is evaluated only once, when the
    # class is defined, which would freeze the default at server start.
    tillTime: datetime.datetime = Field(default_factory=datetime.datetime.now)
# FastAPI application instance; the lifecycle hooks and routes below are registered on it.
# NOTE(review): debug=True is set unconditionally - confirm this is intended outside development.
app = FastAPI(debug=True)
@app.on_event("startup")
async def startup():
    """Open the database connection when the application starts."""
    await db.connect()
@app.on_event("shutdown")
async def shutdown():
    """Close the database connection when the application shuts down."""
    await db.disconnect()
# def _raiseHttpExceptionOnWrongToken(token : str):
# if token != fake_secret_token:
# raise HTTPException(status_code=400, detail="Invalid X-Token header")
@app.put("/energy/bulkData")
async def putBulkEnergyData(bulkData: BulkData):
    """Store the values of several channels with a single INSERT statement.

    Args:
        bulkData: Channels and their timestamped values, taken from the
            request body.

    Returns:
        A dict with the key "valuesToInsert" holding the flattened rows
        (channel_id, timestamp, value) built from the payload. The list is
        empty when the payload contained no values.
    """
    # Flatten the per-channel lists into one row dict per value.
    valuesToInsert = [
        {
            "channel_id": channelData.channel_id,
            "timestamp": measurement.timestamp,
            "value": measurement.value,
        }
        for channelData in bulkData.bulk
        for measurement in channelData.data
    ]
    # With no rows there is nothing to write. An empty list passed to
    # values() does not describe a multi-row insert, so skip the statement
    # instead of sending it to the database.
    if valuesToInsert:
        await db.execute(energy.insert().values(valuesToInsert))
    return {"valuesToInsert": valuesToInsert}
@app.get("/energy/bulkData", response_model = BulkData)
async def getBulkEnergyData(bulkDataRequest: BulkDataRequest):
    """Read the values of several channels within a time window.

    One SELECT is issued per requested channel, bounded inclusively by
    `fromTime` and `tillTime`.

    Args:
        bulkDataRequest: Channel ids and time window to read.

    Returns:
        A dict with "bulk" (one {"channel_id", "data"} entry per requested
        channel, in request order) and "msg" (the module name, followed by
        the text of the last executed query when at least one channel was
        requested).
    """
    bulkData = []
    # Stays None when no channel is requested, so the message below can be
    # built without referring to an unbound name.
    query = None
    for ch in bulkDataRequest.channel_ids:
        query = (
            sqlalchemy.select([energy.c.timestamp, energy.c.value])
            .where(energy.c.channel_id == ch)
            .where(energy.c.timestamp >= bulkDataRequest.fromTime)
            .where(energy.c.timestamp <= bulkDataRequest.tillTime)
        )
        data = await db.fetch_all(query)
        bulkData.append({"channel_id": ch, "data": data})
    msg = __name__
    if query is not None:
        msg += " - " + str(query.compile())
    return {
        "bulk": bulkData,
        "msg": msg,
    }
@app.get("/energy/{channel_id}", response_model = ChannelData)
async def getChannelData(channel_id: int):
    """Return every stored (timestamp, value) pair of one channel.

    Args:
        channel_id: Channel to read, taken from the URL path.

    Returns:
        A dict with "channel_id" and "data" (the fetched rows).

    Raises:
        HTTPException: Status 500 when building or running the query fails;
            the detail text carries the exception type and message.
    """
    try:
        wanted_columns = [energy.c.timestamp, energy.c.value]
        stmt = sqlalchemy.select(wanted_columns).where(
            energy.c.channel_id == channel_id
        )
        rows = await db.fetch_all(stmt)
    except Exception as ex:
        raise HTTPException(status_code=500, detail=f"Internal error: {type(ex)} - \"{ex}\"")
    return {"channel_id": channel_id, "data": rows}
@app.put("/energy/{channel_id}") #, response_model = EnergyValue)
async def putChannelData(channel_id: int, data: EnergyValue):
    """Insert a single value for one channel.

    Args:
        channel_id: Channel the value belongs to, taken from the URL path.
        data: Timestamp and value to store, taken from the request body.

    Returns:
        A dict with "timestamp" (current server time), "value" (the stored
        value) and "msg" (string form of the result of the insert).
    """
    row = {
        "timestamp": data.timestamp,
        "channel_id": channel_id,
        "value": data.value,
    }
    insert_stmt = energy.insert().values(**row)
    outcome = await db.execute(insert_stmt)
    # NOTE(review): the reply carries the current server time, not
    # data.timestamp - confirm this is intended.
    return {
        "timestamp": datetime.datetime.now(),
        "value": data.value,
        "msg": str(outcome),
    }