from typing import Optional, List from fastapi import FastAPI, Header, HTTPException from pydantic import BaseModel import datetime import databases import sqlalchemy from sqlite3 import OperationalError API_VERSION_MAJOR = 1 API_VERSION_MINOR = 0 REST_API_ROOT = f"/energy/v{API_VERSION_MAJOR}/" REST_API_VERSION = REST_API_ROOT + ".0" DATABASE_URL = "sqlite:///./energyDB.sqlite" # DATABASE_URL = "sqlite://" db = databases.Database(DATABASE_URL) metadata = sqlalchemy.MetaData() energy = sqlalchemy.Table( "energy", metadata, sqlalchemy.Column("timestamp", sqlalchemy.DateTime, primary_key=True), sqlalchemy.Column("channel_id", sqlalchemy.Integer(), nullable=False), sqlalchemy.Column("value", sqlalchemy.Float), sqlalchemy.UniqueConstraint("timestamp"), ) engine = sqlalchemy.create_engine( DATABASE_URL, connect_args={"check_same_thread": False} ) metadata.create_all(engine) class EnergyValue(BaseModel): timestamp: datetime.datetime value: float class ChannelData(BaseModel): channel_id: int data: List[EnergyValue] msg: Optional[str] class BulkData(BaseModel): bulk: List[ChannelData] msg: Optional[str] class BulkDataRequest(BaseModel): channel_ids: List[int] fromTime: datetime.datetime tillTime: datetime.datetime = datetime.datetime.now() app = FastAPI(debug=True) @app.on_event("startup") async def startup(): """On startup connect to the database""" await db.connect() @app.on_event("shutdown") async def shutdown(): """On shutdow diconnect from the database""" await db.disconnect() # def _raiseHttpExceptionOnWrongToken(token : str): # if token != fake_secret_token: # raise HTTPException(status_code=400, detail="Invalid X-Token header") @app.put(REST_API_ROOT + "bulkData") async def putBulkEnergyData(bulkData: BulkData): valuesToInsert = [] for channelData in bulkData.bulk: for measurement in channelData.data: valuesToInsert.append({ "channel_id": channelData.channel_id, "timestamp": measurement.timestamp, "value": measurement.value}) query = energy.insert().values(valuesToInsert) result = await db.execute(query) return { "valuesToInsert": valuesToInsert } @app.get(REST_API_ROOT + "bulkData", response_model = BulkData) async def getBulkEnergyData(bulkDataRequest: BulkDataRequest): bulkData = [] for ch in bulkDataRequest.channel_ids: query = sqlalchemy.select([energy.c.timestamp, energy.c.value]) \ .where(energy.c.channel_id == ch) \ .where(energy.c.timestamp >= bulkDataRequest.fromTime) \ .where(energy.c.timestamp <= bulkDataRequest.tillTime) data = await db.fetch_all(query) bulkData.append({"channel_id": ch, "data": data}) return { "bulk": bulkData, "msg": __name__ + " - " +str(query.compile()) } @app.get("/energy/{channel_id}", response_model = ChannelData) async def getChannelData(channel_id: int): try: query = sqlalchemy.select([energy.c.timestamp, energy.c.value]).where(energy.c.channel_id == channel_id) result = await db.fetch_all(query) return { "channel_id": channel_id, "data": result } except Exception as ex: raise HTTPException(status_code=500, detail=f"Internal error: {type(ex)} - \"{ex}\"") @app.put("/energy/{channel_id}") #, response_model = EnergyValue) async def putChannelData(channel_id: int, data: EnergyValue): query = energy.insert().values( timestamp=data.timestamp, channel_id=channel_id, value=data.value) result = await db.execute(query) # # return await db.fetch_all(query) return { "timestamp": datetime.datetime.now(), # "channel" : data.channel, "value": data.value, "msg": str(result) }