Compare commits

...
Sign in to create a new pull request.

4 commits

8 changed files with 809 additions and 240 deletions

9
.gitignore vendored
View file

@ -1,4 +1,13 @@
# Ignore the configuration and the database
energyDB.conf energyDB.conf
energyDB.sqlite
# Ignore Python caches and compiles modules
*.pyd *.pyd
__pycache__ __pycache__
# Ignore Pythons venv stuff
bin/
lib/
LICENSE.md
pyvenv.cfg

View file

@ -5,7 +5,8 @@
while true; do while true; do
NOTIFY=$(inotifywait -rq -e modify . | grep '\.py') NOTIFY=$(inotifywait -rq -e modify . | grep '\.py')
if [ _$? == _0 ]; then if [ _$? == _0 ]; then
pytest --capture=no test echo -e "\n\n\n"
pytest --capture=no #test
echo ">>>>>>>> Test finished at: $(date)" echo ">>>>>>>> Test finished at: $(date)"
fi fi
done done

4
pytest.ini Normal file
View file

@ -0,0 +1,4 @@
[pytest]
testpaths =
test
log_cli = True

139
srv/db.py Normal file
View file

@ -0,0 +1,139 @@
import datetime
import os
from typing import List, Dict
from databases import Database
from sqlalchemy import (Column, DateTime, Integer, Float, String,
MetaData, Table, UniqueConstraint, create_engine)
import sqlalchemy
from sqlalchemy.sql import func
class EnergyDB():
def __init__(self, db_url : str):
self._engine = create_engine(db_url, connect_args={"check_same_thread": False})
# self.database = Database(db_url)
self._conn = self._engine.connect()
self._metadata = MetaData(self._engine)
self._tables = {
"energy": Table(
"energy", self._metadata,
Column("timestamp", DateTime, primary_key=True),
Column("channel_id", Integer(), nullable=False),
Column("value", Float),
UniqueConstraint("timestamp"),
),
"channels": Table(
"channels", self._metadata,
# Column("id", Integer(), autoincrement = True),
Column("id", Integer()), #, primary_key = True),
Column("name", String(), primary_key = True),
),
}
self._metadata.create_all(self._engine)
def metadata(self):
return self._metadata
def url(self) -> str:
return str(self._engine.url)
def engine(self):
return self._engine
def table(self, name : str) -> sqlalchemy.Table:
return self._tables[name]
def execute(self, cmd, values = None, **args):
if values is None:
return self._conn.execute(cmd, args)
else:
return self._conn.execute(cmd, values, args)
return self._conn.execute(cmd, args)
def addChannels(self, channelNames: List[str]):
result = []
try:
# query = self.table("channels").insert()
query = sqlalchemy.sql.text("INSERT INTO channels (name) VALUES(:name)")
# nameDicts = [ {"name": name } for name in channelNames]
# result = self.execute(query, name=channelNames)
for n in channelNames:
result.append({"n": n})
self.execute(query, name=n)
except Exception as e:
raise Exception(f"Database error in addChannels(): {type(e)} - {str(e)}")
return {
"query": str(query),
"result": result,
# "result": [str(r) for r in result.fetchall()]
}
def getChannels(self) -> dict:
try:
table_channels = self.table("channels")
query = sqlalchemy.select([table_channels.c.name]).select_from(table_channels)
channels = [dict(r.items()) for r in self.execute(query).fetchall()]
except Exception as e:
raise Exception(f"Database error in getChannels(): {type(e)} - {str(e)}")
return {
"channels": channels,
"query": str(query),
}
def getChannelId(self, channelName : str) -> int:
try:
query = sqlalchemy.sql.text(f"SELECT _ROWID_, name FROM channels WHERE name == :name")
chId = self.execute(query, name=channelName).scalar()
except Exception as e:
raise Exception(f"Database error in getChannelId(): {type(e)} - {str(e)}")
if chId is None:
raise Exception(f"Database error in getChannelId(): channel '{channelName}' not found")
return chId
def getChannelData(self, channelIds : List[int], fromTime : datetime.datetime, tillTime : datetime.datetime) -> Dict[int, list]:
try:
chData = {}
query = sqlalchemy.sql.text(
"""SELECT timestamp, value FROM energy
WHERE channel_id == :channel_id
AND timestamp >= :fromTime
AND timestamp <= :tillTime
ORDER BY timestamp"""
)
for ch in channelIds:
result = self.execute(query, channel_id = ch, fromTime = fromTime, tillTime = tillTime)
data = [{"timestamp": datetime.datetime.fromisoformat(row[0]), "value": row[1]} for row in result.fetchall() ]
chData[ch] = data
return chData
except Exception as e:
raise Exception(f"Database error in getChannelData(): {type(e)} - {str(e)}")
def addChannelData(self, channel_id : int, data : List[Dict[datetime.datetime, float]]):
try:
# query = sqlalchemy.sql.text(
# "INSERT INTO energy (channel_id, timestamp, value) VALUES "
# )
queryStr = "INSERT INTO energy (channel_id, timestamp, value) VALUES "
valueStr = ""
for d in data:
timestamp = d["timestamp"]
value = d["value"]
# self.execute(
# # query,
# # channel_id=channel_id,
# # timestamp = d["timestamp"],
# # value=d["value"]
# sqlalchemy.sql.text(
# f"""INSERT INTO energy (channel_id, timestamp, value)
# VALUES ('{channel_id}', '{timestamp}', '{value}')"""
# )
# )
if valueStr != "":
valueStr += ",\n"
valueStr += f"({channel_id}, '{timestamp}', {value})"
self.execute(sqlalchemy.sql.text(queryStr + valueStr + ";"))
except Exception as e:
raise Exception(f"Database error in addChannelData(): {type(e)} - {str(e)}")

View file

@ -1,42 +1,34 @@
from typing import Optional, List from typing import Optional, List, Dict
from fastapi import FastAPI, Header, HTTPException from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
import datetime import datetime
import databases
import os import os
import sqlalchemy
from sqlite3 import OperationalError from .db import EnergyDB
import logging
API_VERSION_MAJOR = 1 API_VERSION_MAJOR = 1
API_VERSION_MINOR = 0 API_VERSION_MINOR = 0
REST_API_ROOT = f"/energy/v{API_VERSION_MAJOR}/" DB_URL = os.getenv("DATABASE_URL") #, default="sqlite://")
if DB_URL is None or len(DB_URL) == 0:
raise Exception("Environment variable DATABASE_URL missed!")
# print(f"DB URL: {DB_URL}")
db = EnergyDB(DB_URL)
metadata = sqlalchemy.MetaData() app = FastAPI(debug=True)
energy = sqlalchemy.Table(
"energy",
metadata,
sqlalchemy.Column("timestamp", sqlalchemy.DateTime, primary_key=True),
sqlalchemy.Column("channel_id", sqlalchemy.Integer(), nullable=False),
sqlalchemy.Column("value", sqlalchemy.Float),
sqlalchemy.UniqueConstraint("timestamp"),
)
# DATABASE_URL = "sqlite:///./energyDB.sqlite" class InfoData(BaseModel):
DATABASE_URL = os.getenv("DATABASE_URL", default="sqlite://") info: Dict
print(f"DB URL: {DATABASE_URL}")
db = databases.Database(DATABASE_URL)
engine = sqlalchemy.create_engine(
DATABASE_URL,
connect_args={"check_same_thread": False})
metadata.create_all(engine)
class EnergyValue(BaseModel): class EnergyValue(BaseModel):
timestamp: datetime.datetime timestamp: datetime.datetime
value: float value: float
class ChannelData(BaseModel): class ChannelData(BaseModel):
channel_id: int channel_id: Optional[int]
channel: Optional[str]
data: List[EnergyValue] data: List[EnergyValue]
msg: Optional[str] msg: Optional[str]
@ -49,80 +41,206 @@ class BulkDataRequest(BaseModel):
fromTime: datetime.datetime fromTime: datetime.datetime
tillTime: datetime.datetime = datetime.datetime.now() tillTime: datetime.datetime = datetime.datetime.now()
app = FastAPI(debug=True) class ChannelInfo(BaseModel):
name: str
@app.on_event("startup") class Channels(BaseModel):
async def startup(): channels: List[ChannelInfo]
"""On startup connect to the database"""
await db.connect()
@app.on_event("shutdown") def _restApiPath(subPath : str) -> str:
async def shutdown(): """Return the full REST API path
"""On shutdow diconnect from the database"""
await db.disconnect()
# def _raiseHttpExceptionOnWrongToken(token : str): Arguments:
# if token != fake_secret_token: subPath {str} -- The sub-URL
# raise HTTPException(status_code=400, detail="Invalid X-Token header")
@app.put(REST_API_ROOT + "bulkData") Returns:
async def putBulkEnergyData(bulkData: BulkData): str -- The full REST API URL
valuesToInsert = [] """
for channelData in bulkData.bulk: REST_API_ROOT = f"/energy/v{API_VERSION_MAJOR}"
for measurement in channelData.data: if (subPath.startswith("/")):
valuesToInsert.append({ return REST_API_ROOT + subPath
"channel_id": channelData.channel_id, else:
"timestamp": measurement.timestamp, return REST_API_ROOT + "/" + subPath
"value": measurement.value})
query = energy.insert().values(valuesToInsert) @app.get(_restApiPath("/version"))
result = await db.execute(query) async def restApiGetVersion() -> dict:
"""Return the version information
Returns:
dict -- The version information of energyDB and SQLAlchemy
"""
return { return {
"valuesToInsert": valuesToInsert "version": f"{API_VERSION_MAJOR}.{API_VERSION_MINOR}",
} }
@app.get(REST_API_ROOT + "bulkData", response_model = BulkData) @app.get(_restApiPath("/info"), response_model = InfoData)
async def apiGetInfo():
info = {}
info["db_url"] = db.url()
e = sqlalchemy.Table("energy", db.metadata(), autoload=True, autoload_with=db.engine())
info["db_fields"] = [c.name for c in e.columns]
result = db.execute("select * from energy")
info["db_dir"] = str(dir(db))
info["result_type"] = str(type(result))
info["result_dir"] = str(dir(result))
info["result_count"] = str(result.rowcount)
info["db_contents"] = [str(row) for row in result.fetchall()]
result = db.execute("SELECT name FROM sqlite_master WHERE type='table'")
info["tables"] = [str(row) for row in result.fetchall()]
result = db.execute("SELECT sql FROM sqlite_master WHERE type='table'")
info["energy.sql"] = str(result.fetchone()[0]).replace("\n", "").replace("\t","")
result = db.execute("SELECT COUNT(*) FROM energy")
info["rows"] = str(result.fetchone())
# info["rows"] = [str(row) for row in result.fetchall()]
info["tables"] = [t.name for t in db.metadata().sorted_tables]
for t in db.metadata().sorted_tables:
# info[f"columns_{t}"] = str(dir(t))
info[f"columns_{t}"] = t.schema
return {
"info": info
}
@app.get(_restApiPath("/bulkData"), response_model = BulkData)
async def getBulkEnergyData(bulkDataRequest: BulkDataRequest): async def getBulkEnergyData(bulkDataRequest: BulkDataRequest):
bulkData = [] bulkData = []
for ch in bulkDataRequest.channel_ids:
query = sqlalchemy.select([energy.c.timestamp, energy.c.value]) \
.where(energy.c.channel_id == ch) \
.where(energy.c.timestamp >= bulkDataRequest.fromTime) \
.where(energy.c.timestamp <= bulkDataRequest.tillTime)
try: try:
data = await db.fetch_all(query) chData = db.getChannelData(bulkDataRequest.channel_ids, bulkDataRequest.fromTime, bulkDataRequest.tillTime)
bulkData.append({"channel_id": ch, "data": data}) for chId in chData:
except OperationalError as e: bulkData.append({
raise HTTPException(status_code=500, detail="Database error") "channel_id": chId,
"channel": None,
"data": chData[chId],
"msg": str(chData),
})
except Exception as e:
raise HTTPException(
status_code=404,
detail=f"Database error: {type(e)} - {str(e)}"
)
return { return {
"bulk": bulkData, "bulk": bulkData,
"msg": __name__ + " - " +str(query.compile()) "msg": str(chData.keys())
} }
@app.put(_restApiPath("/bulkData"))
@app.get("/energy/{channel_id}", response_model = ChannelData) async def putBulkEnergyData(bulkData: BulkData):
async def getChannelData(channel_id: int):
try: try:
query = sqlalchemy.select([energy.c.timestamp, energy.c.value]).where(energy.c.channel_id == channel_id) valuesToInsert = []
result = await db.fetch_all(query) for channelData in bulkData.bulk:
return { if channelData.channel_id is None:
"channel_id": channel_id, channel_id = db.getChannelId(channelData.channel)
"data": result else:
} channel_id = channelData.channel_id
except Exception as ex: values = []
raise HTTPException(status_code=500, detail=f"Internal error: {type(ex)} - \"{ex}\"") for v in channelData.data:
values.append({"timestamp": v.timestamp, "value": v.value})
db.addChannelData(channel_id, values)
return
except Exception as e:
raise HTTPException(status_code=500, detail=f"Internal error: \"{str(e)}\"")
@app.put("/energy/{channel_id}") #, response_model = EnergyValue) @app.put(_restApiPath("/channels"))
async def putChannelData(channel_id: int, data: EnergyValue): async def putChannels(channel_info: Channels):
query = energy.insert().values( result = "ok"
timestamp=data.timestamp, query = "???"
channel_id=channel_id, rows_before = {}
value=data.value) rows_after = {}
result = await db.execute(query)
# # return await db.fetch_all(query) try:
r = db.getChannels()
rows_before = r["channels"]
channelNames = [c.name for c in channel_info.channels]
res = db.addChannels(channelNames)
result = res["result"]
query = res["query"]
r = db.getChannels()
rows_after = r["channels"]
except Exception as e:
result = f"Exception \"{str(e)}\""
return { return {
"timestamp": datetime.datetime.now(), "result": result,
# "channel" : data.channel, "channels": str(channel_info.channels),
"value": data.value, "channelNames": channelNames,
"msg": str(result) "query": query,
"rows_before": rows_before,
"rows_after": rows_after,
} }
@app.get(_restApiPath("/channels"))
async def getChannels() -> dict:
"""Return a list of all channels
Raises:
HTTPException: with status 500 on a database error
Returns:
dict -- the list of all channels
"""
try:
result = db.getChannels()
return {"channels": [ch["name"] for ch in result["channels"]]}
except Exception as e:
raise HTTPException(status_code = 500, detail = str(e))
@app.get(_restApiPath("/channels/{channel}/id"))
async def getChannelId(channel: str): # -> dict:
try:
chId = db.getChannelId(channel)
return {
"channel": channel,
"id": chId,
# "query": str(r["query"])
}
except Exception as e:
raise HTTPException(status_code=500,detail=f"Database error: {type(e)} - {str(e)}")
# @app.get("/energy/{channel_id}", response_model = ChannelData)
# async def getChannelData(channel_id: int):
# try:
# query = sqlalchemy.select([energy.c.timestamp, energy.c.value]).where(energy.c.channel_id == channel_id)
# result = await db.fetch_all(query)
# return {
# "channel_id": channel_id,
# "data": result
# }
# except Exception as ex:
# raise HTTPException(status_code=500, detail=f"Internal error: {type(ex)} - \"{ex}\"")
# @app.put("/energy/{channel_id}") #, response_model = EnergyValue)
# async def putChannelData(channel_id: int, data: EnergyValue):
# query = energy.insert().values(
# timestamp=data.timestamp,
# channel_id=channel_id,
# value=data.value)
# result = await db.execute(query)
# # # return await db.fetch_all(query)
# return {
# "timestamp": datetime.datetime.now(),
# # "channel" : data.channel,
# "value": data.value,
# "msg": str(result)
# }
@app.get(_restApiPath("/{channel_id}/count")) # response_model = ChannelData)
async def getChannelRowCount(channel_id: int):
info = {}
info["columns"] = str( db.table("energy").columns)
# countQuery = sqlalchemy.select([sqlalchemy.func.count()]).select_from(db.tables["energy"])
# info["stmt"] = str(countQuery)
# countResult = db.execute( countQuery )
# info["count"] = countResult.fetchone()[0]
# info["tables"] = [t.name for t in metadata.sorted_tables]
# for t in metadata.tables:
# info[f"columns_{t}"] = str(type(t))
# # info[f"columns_{t}"] = list(sqlalchemy.inspect(t).columns)
return { "info": info }

View file

@ -1,151 +0,0 @@
from fastapi.testclient import TestClient
import pytest
from datetime import datetime
import json
import os
import urllib.parse
#TODO Use in-memory DB to test the case that there is no table
#TODO Add helper function to fill the in-memory DB before test
os.environ["DATABASE_URL"] = "sqlite:///./energyDB.sqlite"
from srv import energyDB
class Test_energyDb:
restApiRoot = "/energy/v1/"
bulkTestData = [
{
"channel_id": 1,
"data": (
{ "timestamp": "2020-12-11T12:00:22", "value": 1100.1 },
{ "timestamp": "2020-12-11T12:10:15", "value": 1109.2 },
{ "timestamp": "2020-12-11T12:20:13", "value": 1119.3 },
{ "timestamp": "2020-12-11T12:30:21", "value": 1131.4 },
{ "timestamp": "2020-12-11T12:40:08", "value": 1143.5 },
{ "timestamp": "2020-12-11T12:50:13", "value": 1152.6 },
{ "timestamp": "2020-12-11T13:00:11", "value": 1160.7 },
{ "timestamp": "2020-12-11T13:10:09", "value": 1169.8 },
{ "timestamp": "2020-12-11T13:20:10", "value": 1181.9 },
{ "timestamp": "2020-12-11T13:30:17", "value": 1190.0 },
)
},
{
"channel_id": 2,
"data": [
{ "timestamp": "2020-12-11T12:01:15", "value": 1200.1 },
{ "timestamp": "2020-12-11T12:21:28", "value": 1219.2 },
{ "timestamp": "2020-12-11T12:41:21", "value": 1243.3 },
{ "timestamp": "2020-12-11T13:01:16", "value": 1260.4 },
{ "timestamp": "2020-12-11T13:21:18", "value": 1281.5 },
]
}
]
client = TestClient(energyDB)
# def setup(self):
# self.client = TestClient(energyDB)
# def teardown(self):
# self.client = None
def _test_bulkData_put(self):
# response = self.client.put("/energy/bulkData", json=self.bulkTestData);
response = self.client.put(
self.restApiRoot + "bulkData",
json={"bulk": self.bulkTestData})
# print(f"dir(response): {dir(response)}")
# print(f"dir(response.request): {dir(response.request)}")
# print("---- request")
# print(f"response.request.method: {response.request.method}")
# print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# print(f"response.request.headers: {response.request.headers}")
# requestJson = json.loads(response.request.body)
# print(f"response.request.body: {json.dumps(requestJson, indent=2)}")
print("---- response")
print(f"response.reason: {response.reason}")
responseJson = json.loads(response.text)
print(f"response.text: {json.dumps(responseJson, indent=2)}")
# print(f"response.text: {json.dumps(response.text, indent=2)}")
assert response.status_code == 200
def test_bulkData_get(self):
print(f"DB_URL: {os.getenv('DATABASE_URL')}")
# response = self.client.put("/energy/bulkData", json=self.bulkTestData);
fromTimestamp = datetime.fromisoformat("2020-12-11T12:30:00")
tillTimestamp = datetime.fromisoformat("2020-12-11T12:30:59")
response = self.client.get(
self.restApiRoot + "bulkData",
json = {
"channel_ids": [1, 2, 3],
"fromTime": fromTimestamp.isoformat(),
# "tillTime": tillTimestamp.isoformat()
})
# print(f"dir(response): {dir(response)}")
# print(f"dir(response.request): {dir(response.request)}")
print("---- request")
print(f"response.request.method: {response.request.method}")
print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
print(f"response.request.headers: {response.request.headers}")
print(f"dir(response.request): {dir(response.request)}")
print(f"response.request.body: {response.request.body}")
# requestJson = json.loads(response.request.body)
# print(f"response.request.body: {json.dumps(requestJson, indent=2)}")
print("---- response")
print(f"response.reason: {response.reason}")
responseJson = json.loads(response.text)
print(f"response.text: {json.dumps(responseJson, indent=2)}")
# print(f"response.text: {json.dumps(response.text, indent=2)}")
assert response.status_code == 200
@pytest.mark.skip(reason="Ignore me temporarily")
def test_insert_energy(self):
energyData = {
"timestamp": datetime.now().isoformat(),
"value": 234.5,
}
print(f"energyData: {energyData}")
# response = self.client.put("/energies/1", json=energyData) #, headers={"X-Token": "coneofsilence"})
response = self.client.put(
self.restApiRoot + "2",
# params=energyData,
json=energyData) #, headers={"X-Token": "coneofsilence"})
# print(f"dir(response): {dir(response)}")
# print(f"dir(response.request): {dir(response.request)}")
# print("---- request")
# print(f"response.request.method: {response.request.method}")
# print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# print(f"response.request.headers: {response.request.headers}")
# print(f"response.request.body: {json.loads(response.request.body)}")
# print("---- response")
# print(f"response.reason: {response.reason}")
# print(f"response.text: {json.loads(response.text)}")
# print(f"request.header: \"{response.request.header}\"")
# print(f"request.body: \"{response.request.body}\"")
assert response.status_code == 200
# assert response.json()["msg"] == ""
@pytest.mark.skip(reason="Ignore me temporarily")
def test_get_energy(self):
response = self.client.get(self.restApiRoot + "1")
# print(f"dir(response): {dir(response)}")
# print(f"dir(response.request): {dir(response.request)}")
# print("---- request")
# print(f"response.request.method: {response.request.method}")
# print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# print(f"response.request.headers: {response.request.headers}")
# print("---- response")
# print(f"response.reason: {response.reason}")
responseJson = json.loads(response.text)
print(f"response.text: {json.dumps(responseJson, indent=2)}")
data = response.json()
# print(f"data: {type(data)}")
# for k,v in data.items():
# print(f"key: {k} -> value: {v}")
print(f"data of channel: {data['channel_id']}")
for r in data["data"]:
print(f"r: {r}")
assert response.status_code == 200
# assert response.json()["msg"] == ""

86
test/test_db.py Normal file
View file

@ -0,0 +1,86 @@
import pytest
from datetime import datetime
import os
# os.environ["DATABASE_URL"] = "sqlite:///./testDB.sqlite"
os.environ["DATABASE_URL"] = "sqlite://"
from srv import db
class Test_db:
_DB_URL = "sqlite://"
def setup(self):
self._db = db.EnergyDB(self._DB_URL)
def teardown(self):
pass
# --- helper functions
def _clearTable(self, tableName : str):
self._db.execute(f"DELETE FROM {tableName}")
# --- test functions
def test_url(self):
assert self._db.url() == self._DB_URL
def test_table(self):
assert self._db.table("energy") == self._db._tables["energy"]
assert self._db.table("channels") == self._db._tables["channels"]
def _test_table_unknownTable(self):
pass
def test_getChannels_emptyDatabase(self):
channels = self._db.getChannels()
assert channels["channels"] == []
def test_addChannels(self):
self._db.addChannels(["abc", "def", "ghi"])
result = self._db.getChannels()
assert type(result) == dict
channels = result["channels"]
assert channels[0] == {"name": "abc"}
assert channels[1] == {"name": "def"}
assert channels[2] == {"name": "ghi"}
def test_getChannelId(self):
self._db.addChannels(["abc", "def", "ghi"])
assert self._db.getChannelId("abc") == 1
assert self._db.getChannelId("def") == 2
assert self._db.getChannelId("ghi") == 3
def test_getChannelId_ExceptionIfChannelIsUnknown(self):
with pytest.raises(Exception):
assert self._db.getChannelId("jkl") == 0
def test_getChannelData_EmptyDatabase(self):
fromTime = datetime.now()
tillTime = datetime.now()
result = self._db.getChannelData([1], fromTime, tillTime)
assert list(result.keys()) == [1]
assert result[1] == []
def test_addChannelData(self):
data = [
{"timestamp": datetime.fromisoformat("2020-12-12T09:00:01"), "value": 900.01},
{"timestamp": datetime.fromisoformat("2020-12-12T09:05:02"), "value": 905.02},
{"timestamp": datetime.fromisoformat("2020-12-12T09:10:03"), "value": 910.03},
]
self._db.addChannelData(8, data)
result = self._db.getChannelData(
[8],
datetime.fromisoformat("2020-12-12T09:00:00"),
datetime.now()
)
assert isinstance(result, dict)
assert len(result) == 1
assert 8 in result
channelData = result[8]
assert len(channelData) == 3
assert channelData[0] == data[0]
assert channelData[1] == data[1]
assert channelData[2] == data[2]

363
test/test_energyDB.py Normal file
View file

@ -0,0 +1,363 @@
from fastapi.testclient import TestClient
import pytest
from datetime import datetime
import json
import os
import urllib.parse
#TODO Use in-memory DB to test the case that there is no table
#TODO Add helper function to fill the in-memory DB before test
# os.environ["DATABASE_URL"] = "sqlite:///./testDB.sqlite"
os.environ["DATABASE_URL"] = "sqlite://"
from srv import energyDB
class Test_energyDB:
restApiRoot = "/energy/v1"
testData = {
"channels": (
{"name": "dc_power1"},
{"name": "daily_yield"},
{"name": "total_yield"},
),
"bulkdata": (
{
"channel_id": 1,
"data": (
{ "timestamp": "2020-12-11T12:00:22", "value": 1100.1 },
{ "timestamp": "2020-12-11T12:10:15", "value": 1109.2 },
{ "timestamp": "2020-12-11T12:20:13", "value": 1119.3 },
{ "timestamp": "2020-12-11T12:30:21", "value": 1131.4 },
{ "timestamp": "2020-12-11T12:40:08", "value": 1143.5 },
{ "timestamp": "2020-12-11T12:50:13", "value": 1152.6 },
{ "timestamp": "2020-12-11T13:00:11", "value": 1160.7 },
{ "timestamp": "2020-12-11T13:10:09", "value": 1169.8 },
{ "timestamp": "2020-12-11T13:20:10", "value": 1181.9 },
{ "timestamp": "2020-12-11T13:30:17", "value": 1190.0 },
)
},
{
"channel_id": 2,
"data": [
{ "timestamp": "2020-12-11T12:01:15", "value": 1200.1 },
{ "timestamp": "2020-12-11T12:21:28", "value": 1219.2 },
{ "timestamp": "2020-12-11T12:41:21", "value": 1243.3 },
{ "timestamp": "2020-12-11T13:01:16", "value": 1260.4 },
{ "timestamp": "2020-12-11T13:21:18", "value": 1281.5 },
]
}
)
}
# client = TestClient(energyDB)
def setup(self):
self.client = TestClient(energyDB)
def teardown(self):
self.client = None
# --- helper functions
def _apiUrl(self, sub_url : str):
if sub_url.startswith("/"):
return self.restApiRoot + sub_url
else:
return self.restApiRoot + "/" + sub_url
def _fillDatabase(self):
response = self.client.put(
self._apiUrl("/channels"),
json = {"channels": self.testData["channels"]}
)
# self._dumpRequestAndResponse("_fillDatabase(channels)", response)
assert response.status_code == 200
response = self.client.put(
self._apiUrl("/bulkData"),
json = {"bulk": self.testData["bulkdata"]}
)
# self._dumpRequestAndResponse("_fillDatabase(data)", response)
assert response.status_code == 200
def _dumpRequestAndResponse(self, context : str, response):
print("\n")
print(f"---- request ({context})")
print(f"response.request.method: {response.request.method}")
print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
print(f"response.request.headers: {response.request.headers}")
try:
requestJson = json.loads(response.request.body)
print(f"response.request.body(json): {json.dumps(requestJson, indent=2)}")
except:
print(f"response.request.body(plain): {response.request.body}")
print(f"---- response ({context})")
print(f"response.status_code: {response.status_code}")
print(f"response.reason: {response.reason}")
try:
responseJson = json.loads(response.text)
print(f"response.text(json): {json.dumps(responseJson, indent=2)}")
except:
print(f"response.text(plain): {response.text}")
# --- test functions
def test_invalidRoute(self):
response = self.client.get("/")
assert response.status_code == 404
def test_getVersion(self):
response = self.client.get(self._apiUrl("/version"))
# self._dumpRequestAndResponse("test_getVersion", response)
assert response.status_code == 200
assert response.json()["version"] == "1.0"
def _test_getInfo(self):
response = self.client.get( self._apiUrl("/info" ))
# self._dumpRequestAndResponse("test_getInfo", response)
assert response.status_code == 200
def test_getChannelsOfEmptyTable(self):
response = self.client.get(self._apiUrl("/channels"))
# self._dumpRequestAndResponse("test_getChannelsOfEmptyTable", response)
assert response.status_code == 200
assert response.json()["channels"] == []
def test_getBulkDataOfEmptyTable(self):
response = self.client.get(
self._apiUrl("/bulkData"),
json = {
"channel_ids": [1],
"fromTime": "0001-01-01T00:00:00"
}
)
# self._dumpRequestAndResponse("test_getBulkDataOfEmptyTable", response)
assert response.status_code == 200
assert "bulk" in response.json()
bulkData = response.json()["bulk"]
assert len(bulkData) == 1
assert "channel_id" in bulkData[0]
assert "data" in bulkData[0]
assert bulkData[0]["channel_id"] == 1
channelData = bulkData[0]["data"]
assert len(channelData) == 0
def test_fillDatabase(self):
self._fillDatabase()
response = self.client.get(self._apiUrl("/1/count"))
# self._dumpRequestAndResponse("test_fillDatabase", response)
assert response.status_code == 200
def test_getChannels(self):
response = self.client.get(self._apiUrl("/channels"))
# self._dumpRequestAndResponse("test_getChannels", response)
assert response.status_code == 200
def test_getChannelId(self):
response = self.client.get(self._apiUrl("/channels/total_yield/id"))
# self._dumpRequestAndResponse("test_getChannelId", response)
assert response.status_code == 200
response = self.client.get(self._apiUrl("/channels/dc_power1/id"))
# self._dumpRequestAndResponse("test_getChannelId", response)
assert response.status_code == 200
response = self.client.get(self._apiUrl("/channels/daily_yield/id"))
# self._dumpRequestAndResponse("test_getChannelId", response)
assert response.status_code == 200
def test_putChannels(self):
response = self.client.get(self._apiUrl("/channels/frequency/id"))
# self._dumpRequestAndResponse("test_getChannelId", response)
assert response.status_code == 500
response = self.client.put(
self._apiUrl("/channels"),
json = {"channels": [{"name": "frequency"}]}
)
# self._dumpRequestAndResponse("test_fillDatabase", response)
assert response.status_code == 200
response = self.client.get(self._apiUrl("/channels/frequency/id"))
# self._dumpRequestAndResponse("test_getChannelId", response)
assert response.status_code == 200
def test_getBulkData(self):
response = self.client.get(
self._apiUrl("/bulkData"),
json = {
"channel_ids": [1],
"fromTime": "0001-01-01T00:00:00"
}
)
# self._dumpRequestAndResponse("test_getBulkData", response)
assert response.status_code == 200
assert "bulk" in response.json()
bulkData = response.json()["bulk"]
assert len(bulkData) == 1
channelData = bulkData[0]
assert channelData["channel_id"] == 1
referenceData = self.testData["bulkdata"][0]
assert len(channelData["data"]) == len(referenceData["data"])
assert channelData["data"][0] == referenceData["data"][0]
assert channelData["data"][1] == referenceData["data"][1]
assert channelData["data"][2] == referenceData["data"][2]
assert channelData["data"][3] == referenceData["data"][3]
assert channelData["data"][4] == referenceData["data"][4]
assert channelData["data"][5] == referenceData["data"][5]
assert channelData["data"][6] == referenceData["data"][6]
assert channelData["data"][7] == referenceData["data"][7]
assert channelData["data"][8] == referenceData["data"][8]
assert channelData["data"][9] == referenceData["data"][9]
def test_putBulkData(self):
newData = [{
"channel_id": None,
"channel": "total_yield",
"data": [
{ "timestamp": "2020-12-11T12:01:20", "value": 120120.1 },
{ "timestamp": "2020-12-11T12:30:25", "value": 123025.2 },
]
}]
response = self.client.put(
self._apiUrl("/bulkData"),
json = {"bulk": newData}
)
# self._dumpRequestAndResponse("test_putBulkData", response)
assert response.status_code == 200
response = self.client.get(
self._apiUrl("/bulkData"),
json = {
"channel_ids": [3],
"fromTime": "2020-12-11T12:00:00",
"tillTime": "2020-12-11T12:59:59"
}
)
channelData = response.json()["bulk"][0]
assert channelData["channel_id"] == 3
assert len(channelData["data"]) == 2
assert channelData["data"][0] == newData[0]["data"][0]
assert channelData["data"][1] == newData[0]["data"][1]
# def test_getRecordCount(self):
# response = self.client.get(self._apiUrl("/1/count"))
# self._dumpRequestAndResponse("test_getRecordCount", response)
# assert response.status_code == 200
# assert response.json()["info"] == {"columns"}
# def _test_bulkData_put(self):
# # response = self.client.put("/energy/bulkData", json=self.bulkTestData);
# response = self.client.put(
# self.restApiRoot + "bulkData",
# json={"bulk": self.bulkTestData})
# # print(f"dir(response): {dir(response)}")
# # print(f"dir(response.request): {dir(response.request)}")
# # print("---- request")
# # print(f"response.request.method: {response.request.method}")
# # print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# # print(f"response.request.headers: {response.request.headers}")
# # requestJson = json.loads(response.request.body)
# # print(f"response.request.body: {json.dumps(requestJson, indent=2)}")
# print("---- response")
# print(f"response.reason: {response.reason}")
# responseJson = json.loads(response.text)
# print(f"response.text: {json.dumps(responseJson, indent=2)}")
# # print(f"response.text: {json.dumps(response.text, indent=2)}")
# assert response.status_code == 200
# def _test_getInfo2(self):
# response = self.client.get( self.restApiRoot + "info" )
# # print(f"dir(response): {dir(response)}")
# # print(f"dir(response.request): {dir(response.request)}")
# print("---- request")
# print(f"response.request.method: {response.request.method}")
# print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# print(f"response.request.headers: {response.request.headers}")
# # print(f"dir(response.request): {dir(response.request)}")
# print("---- response")
# print(f"response.reason: {response.reason}")
# # print(f"response.text: {response.text}")
# responseJson = json.loads(response.text)
# print(f"response.text: {json.dumps(responseJson, indent=2)}")
# assert False #response.status_code == 404
# def _test_bulkData_get(self):
# print(f"DB_URL: {os.getenv('DATABASE_URL')}")
# # response = self.client.put("/energy/bulkData", json=self.bulkTestData);
# fromTimestamp = datetime.fromisoformat("2020-12-11T12:30:00")
# tillTimestamp = datetime.fromisoformat("2020-12-11T12:30:59")
# response = self.client.get(
# self.restApiRoot + "bulkData",
# json = {
# "channel_ids": [1, 2, 3],
# "fromTime": fromTimestamp.isoformat(),
# # "tillTime": tillTimestamp.isoformat()
# })
# # print(f"dir(response): {dir(response)}")
# # print(f"dir(response.request): {dir(response.request)}")
# print("---- request")
# print(f"response.request.method: {response.request.method}")
# print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# print(f"response.request.headers: {response.request.headers}")
# print(f"dir(response.request): {dir(response.request)}")
# print(f"response.request.body: {response.request.body}")
# # requestJson = json.loads(response.request.body)
# # print(f"response.request.body: {json.dumps(requestJson, indent=2)}")
# print("---- response")
# print(f"response.reason: {response.reason}")
# responseJson = json.loads(response.text)
# print(f"response.text: {json.dumps(responseJson, indent=2)}")
# # print(f"response.text: {json.dumps(response.text, indent=2)}")
# assert response.status_code == 200
# @pytest.mark.skip(reason="Ignore me temporarily")
# def test_insert_energy(self):
# energyData = {
# "timestamp": datetime.now().isoformat(),
# "value": 234.5,
# }
# print(f"energyData: {energyData}")
# # response = self.client.put("/energies/1", json=energyData) #, headers={"X-Token": "coneofsilence"})
# response = self.client.put(
# self.restApiRoot + "2",
# # params=energyData,
# json=energyData) #, headers={"X-Token": "coneofsilence"})
# # print(f"dir(response): {dir(response)}")
# # print(f"dir(response.request): {dir(response.request)}")
# # print("---- request")
# # print(f"response.request.method: {response.request.method}")
# # print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# # print(f"response.request.headers: {response.request.headers}")
# # print(f"response.request.body: {json.loads(response.request.body)}")
# # print("---- response")
# # print(f"response.reason: {response.reason}")
# # print(f"response.text: {json.loads(response.text)}")
# # print(f"request.header: \"{response.request.header}\"")
# # print(f"request.body: \"{response.request.body}\"")
# assert response.status_code == 200
# # assert response.json()["msg"] == ""
# @pytest.mark.skip(reason="Ignore me temporarily")
# def test_get_energy(self):
# response = self.client.get(self.restApiRoot + "1")
# # print(f"dir(response): {dir(response)}")
# # print(f"dir(response.request): {dir(response.request)}")
# # print("---- request")
# # print(f"response.request.method: {response.request.method}")
# # print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# # print(f"response.request.headers: {response.request.headers}")
# # print("---- response")
# # print(f"response.reason: {response.reason}")
# responseJson = json.loads(response.text)
# print(f"response.text: {json.dumps(responseJson, indent=2)}")
# data = response.json()
# # print(f"data: {type(data)}")
# # for k,v in data.items():
# # print(f"key: {k} -> value: {v}")
# print(f"data of channel: {data['channel_id']}")
# for r in data["data"]:
# print(f"r: {r}")
# assert response.status_code == 200
# # assert response.json()["msg"] == ""