Compare commits

..

No commits in common. "master" and "release" have entirely different histories.

8 changed files with 237 additions and 806 deletions

9
.gitignore vendored
View file

@ -1,13 +1,4 @@
# Ignore the configuration and the database
energyDB.conf energyDB.conf
energyDB.sqlite
# Ignore Python caches and compiles modules
*.pyd *.pyd
__pycache__ __pycache__
# Ignore Pythons venv stuff
bin/
lib/
LICENSE.md
pyvenv.cfg

View file

@ -5,8 +5,7 @@
while true; do while true; do
NOTIFY=$(inotifywait -rq -e modify . | grep '\.py') NOTIFY=$(inotifywait -rq -e modify . | grep '\.py')
if [ _$? == _0 ]; then if [ _$? == _0 ]; then
echo -e "\n\n\n" pytest --capture=no test
pytest --capture=no #test
echo ">>>>>>>> Test finished at: $(date)" echo ">>>>>>>> Test finished at: $(date)"
fi fi
done done

View file

@ -1,4 +0,0 @@
[pytest]
testpaths =
test
log_cli = True

139
srv/db.py
View file

@ -1,139 +0,0 @@
import datetime
import os
from typing import List, Dict
from databases import Database
from sqlalchemy import (Column, DateTime, Integer, Float, String,
MetaData, Table, UniqueConstraint, create_engine)
import sqlalchemy
from sqlalchemy.sql import func
class EnergyDB():
def __init__(self, db_url : str):
self._engine = create_engine(db_url, connect_args={"check_same_thread": False})
# self.database = Database(db_url)
self._conn = self._engine.connect()
self._metadata = MetaData(self._engine)
self._tables = {
"energy": Table(
"energy", self._metadata,
Column("timestamp", DateTime, primary_key=True),
Column("channel_id", Integer(), nullable=False),
Column("value", Float),
UniqueConstraint("timestamp"),
),
"channels": Table(
"channels", self._metadata,
# Column("id", Integer(), autoincrement = True),
Column("id", Integer()), #, primary_key = True),
Column("name", String(), primary_key = True),
),
}
self._metadata.create_all(self._engine)
def metadata(self):
return self._metadata
def url(self) -> str:
return str(self._engine.url)
def engine(self):
return self._engine
def table(self, name : str) -> sqlalchemy.Table:
return self._tables[name]
def execute(self, cmd, values = None, **args):
if values is None:
return self._conn.execute(cmd, args)
else:
return self._conn.execute(cmd, values, args)
return self._conn.execute(cmd, args)
def addChannels(self, channelNames: List[str]):
result = []
try:
# query = self.table("channels").insert()
query = sqlalchemy.sql.text("INSERT INTO channels (name) VALUES(:name)")
# nameDicts = [ {"name": name } for name in channelNames]
# result = self.execute(query, name=channelNames)
for n in channelNames:
result.append({"n": n})
self.execute(query, name=n)
except Exception as e:
raise Exception(f"Database error in addChannels(): {type(e)} - {str(e)}")
return {
"query": str(query),
"result": result,
# "result": [str(r) for r in result.fetchall()]
}
def getChannels(self) -> dict:
try:
table_channels = self.table("channels")
query = sqlalchemy.select([table_channels.c.name]).select_from(table_channels)
channels = [dict(r.items()) for r in self.execute(query).fetchall()]
except Exception as e:
raise Exception(f"Database error in getChannels(): {type(e)} - {str(e)}")
return {
"channels": channels,
"query": str(query),
}
def getChannelId(self, channelName : str) -> int:
try:
query = sqlalchemy.sql.text(f"SELECT _ROWID_, name FROM channels WHERE name == :name")
chId = self.execute(query, name=channelName).scalar()
except Exception as e:
raise Exception(f"Database error in getChannelId(): {type(e)} - {str(e)}")
if chId is None:
raise Exception(f"Database error in getChannelId(): channel '{channelName}' not found")
return chId
def getChannelData(self, channelIds : List[int], fromTime : datetime.datetime, tillTime : datetime.datetime) -> Dict[int, list]:
try:
chData = {}
query = sqlalchemy.sql.text(
"""SELECT timestamp, value FROM energy
WHERE channel_id == :channel_id
AND timestamp >= :fromTime
AND timestamp <= :tillTime
ORDER BY timestamp"""
)
for ch in channelIds:
result = self.execute(query, channel_id = ch, fromTime = fromTime, tillTime = tillTime)
data = [{"timestamp": datetime.datetime.fromisoformat(row[0]), "value": row[1]} for row in result.fetchall() ]
chData[ch] = data
return chData
except Exception as e:
raise Exception(f"Database error in getChannelData(): {type(e)} - {str(e)}")
def addChannelData(self, channel_id : int, data : List[Dict[datetime.datetime, float]]):
try:
# query = sqlalchemy.sql.text(
# "INSERT INTO energy (channel_id, timestamp, value) VALUES "
# )
queryStr = "INSERT INTO energy (channel_id, timestamp, value) VALUES "
valueStr = ""
for d in data:
timestamp = d["timestamp"]
value = d["value"]
# self.execute(
# # query,
# # channel_id=channel_id,
# # timestamp = d["timestamp"],
# # value=d["value"]
# sqlalchemy.sql.text(
# f"""INSERT INTO energy (channel_id, timestamp, value)
# VALUES ('{channel_id}', '{timestamp}', '{value}')"""
# )
# )
if valueStr != "":
valueStr += ",\n"
valueStr += f"({channel_id}, '{timestamp}', {value})"
self.execute(sqlalchemy.sql.text(queryStr + valueStr + ";"))
except Exception as e:
raise Exception(f"Database error in addChannelData(): {type(e)} - {str(e)}")

View file

@ -1,34 +1,42 @@
from typing import Optional, List, Dict from typing import Optional, List
from fastapi import FastAPI, Header, HTTPException from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel from pydantic import BaseModel
import datetime import datetime
import databases
import os import os
import sqlalchemy
from .db import EnergyDB from sqlite3 import OperationalError
import logging
API_VERSION_MAJOR = 1 API_VERSION_MAJOR = 1
API_VERSION_MINOR = 0 API_VERSION_MINOR = 0
DB_URL = os.getenv("DATABASE_URL") #, default="sqlite://") REST_API_ROOT = f"/energy/v{API_VERSION_MAJOR}/"
if DB_URL is None or len(DB_URL) == 0:
raise Exception("Environment variable DATABASE_URL missed!")
# print(f"DB URL: {DB_URL}")
db = EnergyDB(DB_URL)
app = FastAPI(debug=True) metadata = sqlalchemy.MetaData()
energy = sqlalchemy.Table(
"energy",
metadata,
sqlalchemy.Column("timestamp", sqlalchemy.DateTime, primary_key=True),
sqlalchemy.Column("channel_id", sqlalchemy.Integer(), nullable=False),
sqlalchemy.Column("value", sqlalchemy.Float),
sqlalchemy.UniqueConstraint("timestamp"),
)
class InfoData(BaseModel): # DATABASE_URL = "sqlite:///./energyDB.sqlite"
info: Dict DATABASE_URL = os.getenv("DATABASE_URL", default="sqlite://")
print(f"DB URL: {DATABASE_URL}")
db = databases.Database(DATABASE_URL)
engine = sqlalchemy.create_engine(
DATABASE_URL,
connect_args={"check_same_thread": False})
metadata.create_all(engine)
class EnergyValue(BaseModel): class EnergyValue(BaseModel):
timestamp: datetime.datetime timestamp: datetime.datetime
value: float value: float
class ChannelData(BaseModel): class ChannelData(BaseModel):
channel_id: Optional[int] channel_id: int
channel: Optional[str]
data: List[EnergyValue] data: List[EnergyValue]
msg: Optional[str] msg: Optional[str]
@ -41,206 +49,80 @@ class BulkDataRequest(BaseModel):
fromTime: datetime.datetime fromTime: datetime.datetime
tillTime: datetime.datetime = datetime.datetime.now() tillTime: datetime.datetime = datetime.datetime.now()
class ChannelInfo(BaseModel): app = FastAPI(debug=True)
name: str
class Channels(BaseModel): @app.on_event("startup")
channels: List[ChannelInfo] async def startup():
"""On startup connect to the database"""
await db.connect()
def _restApiPath(subPath : str) -> str: @app.on_event("shutdown")
"""Return the full REST API path async def shutdown():
"""On shutdow diconnect from the database"""
await db.disconnect()
Arguments: # def _raiseHttpExceptionOnWrongToken(token : str):
subPath {str} -- The sub-URL # if token != fake_secret_token:
# raise HTTPException(status_code=400, detail="Invalid X-Token header")
Returns: @app.put(REST_API_ROOT + "bulkData")
str -- The full REST API URL
"""
REST_API_ROOT = f"/energy/v{API_VERSION_MAJOR}"
if (subPath.startswith("/")):
return REST_API_ROOT + subPath
else:
return REST_API_ROOT + "/" + subPath
@app.get(_restApiPath("/version"))
async def restApiGetVersion() -> dict:
"""Return the version information
Returns:
dict -- The version information of energyDB and SQLAlchemy
"""
return {
"version": f"{API_VERSION_MAJOR}.{API_VERSION_MINOR}",
}
@app.get(_restApiPath("/info"), response_model = InfoData)
async def apiGetInfo():
info = {}
info["db_url"] = db.url()
e = sqlalchemy.Table("energy", db.metadata(), autoload=True, autoload_with=db.engine())
info["db_fields"] = [c.name for c in e.columns]
result = db.execute("select * from energy")
info["db_dir"] = str(dir(db))
info["result_type"] = str(type(result))
info["result_dir"] = str(dir(result))
info["result_count"] = str(result.rowcount)
info["db_contents"] = [str(row) for row in result.fetchall()]
result = db.execute("SELECT name FROM sqlite_master WHERE type='table'")
info["tables"] = [str(row) for row in result.fetchall()]
result = db.execute("SELECT sql FROM sqlite_master WHERE type='table'")
info["energy.sql"] = str(result.fetchone()[0]).replace("\n", "").replace("\t","")
result = db.execute("SELECT COUNT(*) FROM energy")
info["rows"] = str(result.fetchone())
# info["rows"] = [str(row) for row in result.fetchall()]
info["tables"] = [t.name for t in db.metadata().sorted_tables]
for t in db.metadata().sorted_tables:
# info[f"columns_{t}"] = str(dir(t))
info[f"columns_{t}"] = t.schema
return {
"info": info
}
@app.get(_restApiPath("/bulkData"), response_model = BulkData)
async def getBulkEnergyData(bulkDataRequest: BulkDataRequest):
bulkData = []
try:
chData = db.getChannelData(bulkDataRequest.channel_ids, bulkDataRequest.fromTime, bulkDataRequest.tillTime)
for chId in chData:
bulkData.append({
"channel_id": chId,
"channel": None,
"data": chData[chId],
"msg": str(chData),
})
except Exception as e:
raise HTTPException(
status_code=404,
detail=f"Database error: {type(e)} - {str(e)}"
)
return {
"bulk": bulkData,
"msg": str(chData.keys())
}
@app.put(_restApiPath("/bulkData"))
async def putBulkEnergyData(bulkData: BulkData): async def putBulkEnergyData(bulkData: BulkData):
try:
valuesToInsert = [] valuesToInsert = []
for channelData in bulkData.bulk: for channelData in bulkData.bulk:
if channelData.channel_id is None: for measurement in channelData.data:
channel_id = db.getChannelId(channelData.channel) valuesToInsert.append({
else: "channel_id": channelData.channel_id,
channel_id = channelData.channel_id "timestamp": measurement.timestamp,
values = [] "value": measurement.value})
for v in channelData.data: query = energy.insert().values(valuesToInsert)
values.append({"timestamp": v.timestamp, "value": v.value}) result = await db.execute(query)
db.addChannelData(channel_id, values)
return
except Exception as e:
raise HTTPException(status_code=500, detail=f"Internal error: \"{str(e)}\"")
@app.put(_restApiPath("/channels"))
async def putChannels(channel_info: Channels):
result = "ok"
query = "???"
rows_before = {}
rows_after = {}
try:
r = db.getChannels()
rows_before = r["channels"]
channelNames = [c.name for c in channel_info.channels]
res = db.addChannels(channelNames)
result = res["result"]
query = res["query"]
r = db.getChannels()
rows_after = r["channels"]
except Exception as e:
result = f"Exception \"{str(e)}\""
return { return {
"result": result, "valuesToInsert": valuesToInsert
"channels": str(channel_info.channels),
"channelNames": channelNames,
"query": query,
"rows_before": rows_before,
"rows_after": rows_after,
} }
@app.get(_restApiPath("/channels")) @app.get(REST_API_ROOT + "bulkData", response_model = BulkData)
async def getChannels() -> dict: async def getBulkEnergyData(bulkDataRequest: BulkDataRequest):
"""Return a list of all channels bulkData = []
for ch in bulkDataRequest.channel_ids:
Raises: query = sqlalchemy.select([energy.c.timestamp, energy.c.value]) \
HTTPException: with status 500 on a database error .where(energy.c.channel_id == ch) \
.where(energy.c.timestamp >= bulkDataRequest.fromTime) \
Returns: .where(energy.c.timestamp <= bulkDataRequest.tillTime)
dict -- the list of all channels
"""
try: try:
result = db.getChannels() data = await db.fetch_all(query)
return {"channels": [ch["name"] for ch in result["channels"]]} bulkData.append({"channel_id": ch, "data": data})
except Exception as e: except OperationalError as e:
raise HTTPException(status_code = 500, detail = str(e)) raise HTTPException(status_code=500, detail="Database error")
@app.get(_restApiPath("/channels/{channel}/id"))
async def getChannelId(channel: str): # -> dict:
try:
chId = db.getChannelId(channel)
return { return {
"channel": channel, "bulk": bulkData,
"id": chId, "msg": __name__ + " - " +str(query.compile())
# "query": str(r["query"])
} }
except Exception as e:
raise HTTPException(status_code=500,detail=f"Database error: {type(e)} - {str(e)}")
# @app.get("/energy/{channel_id}", response_model = ChannelData) @app.get("/energy/{channel_id}", response_model = ChannelData)
# async def getChannelData(channel_id: int): async def getChannelData(channel_id: int):
# try: try:
# query = sqlalchemy.select([energy.c.timestamp, energy.c.value]).where(energy.c.channel_id == channel_id) query = sqlalchemy.select([energy.c.timestamp, energy.c.value]).where(energy.c.channel_id == channel_id)
# result = await db.fetch_all(query) result = await db.fetch_all(query)
# return { return {
# "channel_id": channel_id, "channel_id": channel_id,
# "data": result "data": result
# } }
# except Exception as ex: except Exception as ex:
# raise HTTPException(status_code=500, detail=f"Internal error: {type(ex)} - \"{ex}\"") raise HTTPException(status_code=500, detail=f"Internal error: {type(ex)} - \"{ex}\"")
# @app.put("/energy/{channel_id}") #, response_model = EnergyValue) @app.put("/energy/{channel_id}") #, response_model = EnergyValue)
# async def putChannelData(channel_id: int, data: EnergyValue): async def putChannelData(channel_id: int, data: EnergyValue):
# query = energy.insert().values( query = energy.insert().values(
# timestamp=data.timestamp, timestamp=data.timestamp,
# channel_id=channel_id, channel_id=channel_id,
# value=data.value) value=data.value)
# result = await db.execute(query) result = await db.execute(query)
# # # return await db.fetch_all(query) # # return await db.fetch_all(query)
# return { return {
# "timestamp": datetime.datetime.now(), "timestamp": datetime.datetime.now(),
# # "channel" : data.channel, # "channel" : data.channel,
# "value": data.value, "value": data.value,
# "msg": str(result) "msg": str(result)
# } }
@app.get(_restApiPath("/{channel_id}/count")) # response_model = ChannelData)
async def getChannelRowCount(channel_id: int):
info = {}
info["columns"] = str( db.table("energy").columns)
# countQuery = sqlalchemy.select([sqlalchemy.func.count()]).select_from(db.tables["energy"])
# info["stmt"] = str(countQuery)
# countResult = db.execute( countQuery )
# info["count"] = countResult.fetchone()[0]
# info["tables"] = [t.name for t in metadata.sorted_tables]
# for t in metadata.tables:
# info[f"columns_{t}"] = str(type(t))
# # info[f"columns_{t}"] = list(sqlalchemy.inspect(t).columns)
return { "info": info }

151
test/test_EnergyDB.py Normal file
View file

@ -0,0 +1,151 @@
from fastapi.testclient import TestClient
import pytest
from datetime import datetime
import json
import os
import urllib.parse
#TODO Use in-memory DB to test the case that there is no table
#TODO Add helper function to fill the in-memory DB before test
os.environ["DATABASE_URL"] = "sqlite:///./energyDB.sqlite"
from srv import energyDB
class Test_energyDb:
restApiRoot = "/energy/v1/"
bulkTestData = [
{
"channel_id": 1,
"data": (
{ "timestamp": "2020-12-11T12:00:22", "value": 1100.1 },
{ "timestamp": "2020-12-11T12:10:15", "value": 1109.2 },
{ "timestamp": "2020-12-11T12:20:13", "value": 1119.3 },
{ "timestamp": "2020-12-11T12:30:21", "value": 1131.4 },
{ "timestamp": "2020-12-11T12:40:08", "value": 1143.5 },
{ "timestamp": "2020-12-11T12:50:13", "value": 1152.6 },
{ "timestamp": "2020-12-11T13:00:11", "value": 1160.7 },
{ "timestamp": "2020-12-11T13:10:09", "value": 1169.8 },
{ "timestamp": "2020-12-11T13:20:10", "value": 1181.9 },
{ "timestamp": "2020-12-11T13:30:17", "value": 1190.0 },
)
},
{
"channel_id": 2,
"data": [
{ "timestamp": "2020-12-11T12:01:15", "value": 1200.1 },
{ "timestamp": "2020-12-11T12:21:28", "value": 1219.2 },
{ "timestamp": "2020-12-11T12:41:21", "value": 1243.3 },
{ "timestamp": "2020-12-11T13:01:16", "value": 1260.4 },
{ "timestamp": "2020-12-11T13:21:18", "value": 1281.5 },
]
}
]
client = TestClient(energyDB)
# def setup(self):
# self.client = TestClient(energyDB)
# def teardown(self):
# self.client = None
def _test_bulkData_put(self):
# response = self.client.put("/energy/bulkData", json=self.bulkTestData);
response = self.client.put(
self.restApiRoot + "bulkData",
json={"bulk": self.bulkTestData})
# print(f"dir(response): {dir(response)}")
# print(f"dir(response.request): {dir(response.request)}")
# print("---- request")
# print(f"response.request.method: {response.request.method}")
# print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# print(f"response.request.headers: {response.request.headers}")
# requestJson = json.loads(response.request.body)
# print(f"response.request.body: {json.dumps(requestJson, indent=2)}")
print("---- response")
print(f"response.reason: {response.reason}")
responseJson = json.loads(response.text)
print(f"response.text: {json.dumps(responseJson, indent=2)}")
# print(f"response.text: {json.dumps(response.text, indent=2)}")
assert response.status_code == 200
def test_bulkData_get(self):
print(f"DB_URL: {os.getenv('DATABASE_URL')}")
# response = self.client.put("/energy/bulkData", json=self.bulkTestData);
fromTimestamp = datetime.fromisoformat("2020-12-11T12:30:00")
tillTimestamp = datetime.fromisoformat("2020-12-11T12:30:59")
response = self.client.get(
self.restApiRoot + "bulkData",
json = {
"channel_ids": [1, 2, 3],
"fromTime": fromTimestamp.isoformat(),
# "tillTime": tillTimestamp.isoformat()
})
# print(f"dir(response): {dir(response)}")
# print(f"dir(response.request): {dir(response.request)}")
print("---- request")
print(f"response.request.method: {response.request.method}")
print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
print(f"response.request.headers: {response.request.headers}")
print(f"dir(response.request): {dir(response.request)}")
print(f"response.request.body: {response.request.body}")
# requestJson = json.loads(response.request.body)
# print(f"response.request.body: {json.dumps(requestJson, indent=2)}")
print("---- response")
print(f"response.reason: {response.reason}")
responseJson = json.loads(response.text)
print(f"response.text: {json.dumps(responseJson, indent=2)}")
# print(f"response.text: {json.dumps(response.text, indent=2)}")
assert response.status_code == 200
@pytest.mark.skip(reason="Ignore me temporarily")
def test_insert_energy(self):
energyData = {
"timestamp": datetime.now().isoformat(),
"value": 234.5,
}
print(f"energyData: {energyData}")
# response = self.client.put("/energies/1", json=energyData) #, headers={"X-Token": "coneofsilence"})
response = self.client.put(
self.restApiRoot + "2",
# params=energyData,
json=energyData) #, headers={"X-Token": "coneofsilence"})
# print(f"dir(response): {dir(response)}")
# print(f"dir(response.request): {dir(response.request)}")
# print("---- request")
# print(f"response.request.method: {response.request.method}")
# print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# print(f"response.request.headers: {response.request.headers}")
# print(f"response.request.body: {json.loads(response.request.body)}")
# print("---- response")
# print(f"response.reason: {response.reason}")
# print(f"response.text: {json.loads(response.text)}")
# print(f"request.header: \"{response.request.header}\"")
# print(f"request.body: \"{response.request.body}\"")
assert response.status_code == 200
# assert response.json()["msg"] == ""
@pytest.mark.skip(reason="Ignore me temporarily")
def test_get_energy(self):
response = self.client.get(self.restApiRoot + "1")
# print(f"dir(response): {dir(response)}")
# print(f"dir(response.request): {dir(response.request)}")
# print("---- request")
# print(f"response.request.method: {response.request.method}")
# print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# print(f"response.request.headers: {response.request.headers}")
# print("---- response")
# print(f"response.reason: {response.reason}")
responseJson = json.loads(response.text)
print(f"response.text: {json.dumps(responseJson, indent=2)}")
data = response.json()
# print(f"data: {type(data)}")
# for k,v in data.items():
# print(f"key: {k} -> value: {v}")
print(f"data of channel: {data['channel_id']}")
for r in data["data"]:
print(f"r: {r}")
assert response.status_code == 200
# assert response.json()["msg"] == ""

View file

@ -1,86 +0,0 @@
import pytest
from datetime import datetime
import os
# os.environ["DATABASE_URL"] = "sqlite:///./testDB.sqlite"
os.environ["DATABASE_URL"] = "sqlite://"
from srv import db
class Test_db:
_DB_URL = "sqlite://"
def setup(self):
self._db = db.EnergyDB(self._DB_URL)
def teardown(self):
pass
# --- helper functions
def _clearTable(self, tableName : str):
self._db.execute(f"DELETE FROM {tableName}")
# --- test functions
def test_url(self):
assert self._db.url() == self._DB_URL
def test_table(self):
assert self._db.table("energy") == self._db._tables["energy"]
assert self._db.table("channels") == self._db._tables["channels"]
def _test_table_unknownTable(self):
pass
def test_getChannels_emptyDatabase(self):
channels = self._db.getChannels()
assert channels["channels"] == []
def test_addChannels(self):
self._db.addChannels(["abc", "def", "ghi"])
result = self._db.getChannels()
assert type(result) == dict
channels = result["channels"]
assert channels[0] == {"name": "abc"}
assert channels[1] == {"name": "def"}
assert channels[2] == {"name": "ghi"}
def test_getChannelId(self):
self._db.addChannels(["abc", "def", "ghi"])
assert self._db.getChannelId("abc") == 1
assert self._db.getChannelId("def") == 2
assert self._db.getChannelId("ghi") == 3
def test_getChannelId_ExceptionIfChannelIsUnknown(self):
with pytest.raises(Exception):
assert self._db.getChannelId("jkl") == 0
def test_getChannelData_EmptyDatabase(self):
fromTime = datetime.now()
tillTime = datetime.now()
result = self._db.getChannelData([1], fromTime, tillTime)
assert list(result.keys()) == [1]
assert result[1] == []
def test_addChannelData(self):
data = [
{"timestamp": datetime.fromisoformat("2020-12-12T09:00:01"), "value": 900.01},
{"timestamp": datetime.fromisoformat("2020-12-12T09:05:02"), "value": 905.02},
{"timestamp": datetime.fromisoformat("2020-12-12T09:10:03"), "value": 910.03},
]
self._db.addChannelData(8, data)
result = self._db.getChannelData(
[8],
datetime.fromisoformat("2020-12-12T09:00:00"),
datetime.now()
)
assert isinstance(result, dict)
assert len(result) == 1
assert 8 in result
channelData = result[8]
assert len(channelData) == 3
assert channelData[0] == data[0]
assert channelData[1] == data[1]
assert channelData[2] == data[2]

View file

@ -1,363 +0,0 @@
from fastapi.testclient import TestClient
import pytest
from datetime import datetime
import json
import os
import urllib.parse
#TODO Use in-memory DB to test the case that there is no table
#TODO Add helper function to fill the in-memory DB before test
# os.environ["DATABASE_URL"] = "sqlite:///./testDB.sqlite"
os.environ["DATABASE_URL"] = "sqlite://"
from srv import energyDB
class Test_energyDB:
restApiRoot = "/energy/v1"
testData = {
"channels": (
{"name": "dc_power1"},
{"name": "daily_yield"},
{"name": "total_yield"},
),
"bulkdata": (
{
"channel_id": 1,
"data": (
{ "timestamp": "2020-12-11T12:00:22", "value": 1100.1 },
{ "timestamp": "2020-12-11T12:10:15", "value": 1109.2 },
{ "timestamp": "2020-12-11T12:20:13", "value": 1119.3 },
{ "timestamp": "2020-12-11T12:30:21", "value": 1131.4 },
{ "timestamp": "2020-12-11T12:40:08", "value": 1143.5 },
{ "timestamp": "2020-12-11T12:50:13", "value": 1152.6 },
{ "timestamp": "2020-12-11T13:00:11", "value": 1160.7 },
{ "timestamp": "2020-12-11T13:10:09", "value": 1169.8 },
{ "timestamp": "2020-12-11T13:20:10", "value": 1181.9 },
{ "timestamp": "2020-12-11T13:30:17", "value": 1190.0 },
)
},
{
"channel_id": 2,
"data": [
{ "timestamp": "2020-12-11T12:01:15", "value": 1200.1 },
{ "timestamp": "2020-12-11T12:21:28", "value": 1219.2 },
{ "timestamp": "2020-12-11T12:41:21", "value": 1243.3 },
{ "timestamp": "2020-12-11T13:01:16", "value": 1260.4 },
{ "timestamp": "2020-12-11T13:21:18", "value": 1281.5 },
]
}
)
}
# client = TestClient(energyDB)
def setup(self):
self.client = TestClient(energyDB)
def teardown(self):
self.client = None
# --- helper functions
def _apiUrl(self, sub_url : str):
if sub_url.startswith("/"):
return self.restApiRoot + sub_url
else:
return self.restApiRoot + "/" + sub_url
def _fillDatabase(self):
response = self.client.put(
self._apiUrl("/channels"),
json = {"channels": self.testData["channels"]}
)
# self._dumpRequestAndResponse("_fillDatabase(channels)", response)
assert response.status_code == 200
response = self.client.put(
self._apiUrl("/bulkData"),
json = {"bulk": self.testData["bulkdata"]}
)
# self._dumpRequestAndResponse("_fillDatabase(data)", response)
assert response.status_code == 200
def _dumpRequestAndResponse(self, context : str, response):
print("\n")
print(f"---- request ({context})")
print(f"response.request.method: {response.request.method}")
print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
print(f"response.request.headers: {response.request.headers}")
try:
requestJson = json.loads(response.request.body)
print(f"response.request.body(json): {json.dumps(requestJson, indent=2)}")
except:
print(f"response.request.body(plain): {response.request.body}")
print(f"---- response ({context})")
print(f"response.status_code: {response.status_code}")
print(f"response.reason: {response.reason}")
try:
responseJson = json.loads(response.text)
print(f"response.text(json): {json.dumps(responseJson, indent=2)}")
except:
print(f"response.text(plain): {response.text}")
# --- test functions
def test_invalidRoute(self):
response = self.client.get("/")
assert response.status_code == 404
def test_getVersion(self):
response = self.client.get(self._apiUrl("/version"))
# self._dumpRequestAndResponse("test_getVersion", response)
assert response.status_code == 200
assert response.json()["version"] == "1.0"
def _test_getInfo(self):
response = self.client.get( self._apiUrl("/info" ))
# self._dumpRequestAndResponse("test_getInfo", response)
assert response.status_code == 200
def test_getChannelsOfEmptyTable(self):
response = self.client.get(self._apiUrl("/channels"))
# self._dumpRequestAndResponse("test_getChannelsOfEmptyTable", response)
assert response.status_code == 200
assert response.json()["channels"] == []
def test_getBulkDataOfEmptyTable(self):
response = self.client.get(
self._apiUrl("/bulkData"),
json = {
"channel_ids": [1],
"fromTime": "0001-01-01T00:00:00"
}
)
# self._dumpRequestAndResponse("test_getBulkDataOfEmptyTable", response)
assert response.status_code == 200
assert "bulk" in response.json()
bulkData = response.json()["bulk"]
assert len(bulkData) == 1
assert "channel_id" in bulkData[0]
assert "data" in bulkData[0]
assert bulkData[0]["channel_id"] == 1
channelData = bulkData[0]["data"]
assert len(channelData) == 0
def test_fillDatabase(self):
self._fillDatabase()
response = self.client.get(self._apiUrl("/1/count"))
# self._dumpRequestAndResponse("test_fillDatabase", response)
assert response.status_code == 200
def test_getChannels(self):
response = self.client.get(self._apiUrl("/channels"))
# self._dumpRequestAndResponse("test_getChannels", response)
assert response.status_code == 200
def test_getChannelId(self):
response = self.client.get(self._apiUrl("/channels/total_yield/id"))
# self._dumpRequestAndResponse("test_getChannelId", response)
assert response.status_code == 200
response = self.client.get(self._apiUrl("/channels/dc_power1/id"))
# self._dumpRequestAndResponse("test_getChannelId", response)
assert response.status_code == 200
response = self.client.get(self._apiUrl("/channels/daily_yield/id"))
# self._dumpRequestAndResponse("test_getChannelId", response)
assert response.status_code == 200
def test_putChannels(self):
response = self.client.get(self._apiUrl("/channels/frequency/id"))
# self._dumpRequestAndResponse("test_getChannelId", response)
assert response.status_code == 500
response = self.client.put(
self._apiUrl("/channels"),
json = {"channels": [{"name": "frequency"}]}
)
# self._dumpRequestAndResponse("test_fillDatabase", response)
assert response.status_code == 200
response = self.client.get(self._apiUrl("/channels/frequency/id"))
# self._dumpRequestAndResponse("test_getChannelId", response)
assert response.status_code == 200
def test_getBulkData(self):
response = self.client.get(
self._apiUrl("/bulkData"),
json = {
"channel_ids": [1],
"fromTime": "0001-01-01T00:00:00"
}
)
# self._dumpRequestAndResponse("test_getBulkData", response)
assert response.status_code == 200
assert "bulk" in response.json()
bulkData = response.json()["bulk"]
assert len(bulkData) == 1
channelData = bulkData[0]
assert channelData["channel_id"] == 1
referenceData = self.testData["bulkdata"][0]
assert len(channelData["data"]) == len(referenceData["data"])
assert channelData["data"][0] == referenceData["data"][0]
assert channelData["data"][1] == referenceData["data"][1]
assert channelData["data"][2] == referenceData["data"][2]
assert channelData["data"][3] == referenceData["data"][3]
assert channelData["data"][4] == referenceData["data"][4]
assert channelData["data"][5] == referenceData["data"][5]
assert channelData["data"][6] == referenceData["data"][6]
assert channelData["data"][7] == referenceData["data"][7]
assert channelData["data"][8] == referenceData["data"][8]
assert channelData["data"][9] == referenceData["data"][9]
def test_putBulkData(self):
newData = [{
"channel_id": None,
"channel": "total_yield",
"data": [
{ "timestamp": "2020-12-11T12:01:20", "value": 120120.1 },
{ "timestamp": "2020-12-11T12:30:25", "value": 123025.2 },
]
}]
response = self.client.put(
self._apiUrl("/bulkData"),
json = {"bulk": newData}
)
# self._dumpRequestAndResponse("test_putBulkData", response)
assert response.status_code == 200
response = self.client.get(
self._apiUrl("/bulkData"),
json = {
"channel_ids": [3],
"fromTime": "2020-12-11T12:00:00",
"tillTime": "2020-12-11T12:59:59"
}
)
channelData = response.json()["bulk"][0]
assert channelData["channel_id"] == 3
assert len(channelData["data"]) == 2
assert channelData["data"][0] == newData[0]["data"][0]
assert channelData["data"][1] == newData[0]["data"][1]
# def test_getRecordCount(self):
# response = self.client.get(self._apiUrl("/1/count"))
# self._dumpRequestAndResponse("test_getRecordCount", response)
# assert response.status_code == 200
# assert response.json()["info"] == {"columns"}
# def _test_bulkData_put(self):
# # response = self.client.put("/energy/bulkData", json=self.bulkTestData);
# response = self.client.put(
# self.restApiRoot + "bulkData",
# json={"bulk": self.bulkTestData})
# # print(f"dir(response): {dir(response)}")
# # print(f"dir(response.request): {dir(response.request)}")
# # print("---- request")
# # print(f"response.request.method: {response.request.method}")
# # print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# # print(f"response.request.headers: {response.request.headers}")
# # requestJson = json.loads(response.request.body)
# # print(f"response.request.body: {json.dumps(requestJson, indent=2)}")
# print("---- response")
# print(f"response.reason: {response.reason}")
# responseJson = json.loads(response.text)
# print(f"response.text: {json.dumps(responseJson, indent=2)}")
# # print(f"response.text: {json.dumps(response.text, indent=2)}")
# assert response.status_code == 200
# def _test_getInfo2(self):
# response = self.client.get( self.restApiRoot + "info" )
# # print(f"dir(response): {dir(response)}")
# # print(f"dir(response.request): {dir(response.request)}")
# print("---- request")
# print(f"response.request.method: {response.request.method}")
# print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# print(f"response.request.headers: {response.request.headers}")
# # print(f"dir(response.request): {dir(response.request)}")
# print("---- response")
# print(f"response.reason: {response.reason}")
# # print(f"response.text: {response.text}")
# responseJson = json.loads(response.text)
# print(f"response.text: {json.dumps(responseJson, indent=2)}")
# assert False #response.status_code == 404
# def _test_bulkData_get(self):
# print(f"DB_URL: {os.getenv('DATABASE_URL')}")
# # response = self.client.put("/energy/bulkData", json=self.bulkTestData);
# fromTimestamp = datetime.fromisoformat("2020-12-11T12:30:00")
# tillTimestamp = datetime.fromisoformat("2020-12-11T12:30:59")
# response = self.client.get(
# self.restApiRoot + "bulkData",
# json = {
# "channel_ids": [1, 2, 3],
# "fromTime": fromTimestamp.isoformat(),
# # "tillTime": tillTimestamp.isoformat()
# })
# # print(f"dir(response): {dir(response)}")
# # print(f"dir(response.request): {dir(response.request)}")
# print("---- request")
# print(f"response.request.method: {response.request.method}")
# print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# print(f"response.request.headers: {response.request.headers}")
# print(f"dir(response.request): {dir(response.request)}")
# print(f"response.request.body: {response.request.body}")
# # requestJson = json.loads(response.request.body)
# # print(f"response.request.body: {json.dumps(requestJson, indent=2)}")
# print("---- response")
# print(f"response.reason: {response.reason}")
# responseJson = json.loads(response.text)
# print(f"response.text: {json.dumps(responseJson, indent=2)}")
# # print(f"response.text: {json.dumps(response.text, indent=2)}")
# assert response.status_code == 200
# @pytest.mark.skip(reason="Ignore me temporarily")
# def test_insert_energy(self):
# energyData = {
# "timestamp": datetime.now().isoformat(),
# "value": 234.5,
# }
# print(f"energyData: {energyData}")
# # response = self.client.put("/energies/1", json=energyData) #, headers={"X-Token": "coneofsilence"})
# response = self.client.put(
# self.restApiRoot + "2",
# # params=energyData,
# json=energyData) #, headers={"X-Token": "coneofsilence"})
# # print(f"dir(response): {dir(response)}")
# # print(f"dir(response.request): {dir(response.request)}")
# # print("---- request")
# # print(f"response.request.method: {response.request.method}")
# # print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# # print(f"response.request.headers: {response.request.headers}")
# # print(f"response.request.body: {json.loads(response.request.body)}")
# # print("---- response")
# # print(f"response.reason: {response.reason}")
# # print(f"response.text: {json.loads(response.text)}")
# # print(f"request.header: \"{response.request.header}\"")
# # print(f"request.body: \"{response.request.body}\"")
# assert response.status_code == 200
# # assert response.json()["msg"] == ""
# @pytest.mark.skip(reason="Ignore me temporarily")
# def test_get_energy(self):
# response = self.client.get(self.restApiRoot + "1")
# # print(f"dir(response): {dir(response)}")
# # print(f"dir(response.request): {dir(response.request)}")
# # print("---- request")
# # print(f"response.request.method: {response.request.method}")
# # print(f"response.request.url: {urllib.parse.unquote(response.request.url)}")
# # print(f"response.request.headers: {response.request.headers}")
# # print("---- response")
# # print(f"response.reason: {response.reason}")
# responseJson = json.loads(response.text)
# print(f"response.text: {json.dumps(responseJson, indent=2)}")
# data = response.json()
# # print(f"data: {type(data)}")
# # for k,v in data.items():
# # print(f"key: {k} -> value: {v}")
# print(f"data of channel: {data['channel_id']}")
# for r in data["data"]:
# print(f"r: {r}")
# assert response.status_code == 200
# # assert response.json()["msg"] == ""