next.js frontend w/ mongodb local backend
This commit is contained in:
parent
e24e17e42b
commit
f333d9ba00
18 changed files with 3144 additions and 337 deletions
69
tests/test_app.py
Normal file
69
tests/test_app.py
Normal file
|
|
@ -0,0 +1,69 @@
|
|||
import pytest
|
||||
from pymongo import MongoClient
|
||||
from pymongo.errors import ConnectionFailure # Import ConnectionFailure
|
||||
from datetime import datetime
|
||||
from bson.objectid import ObjectId
|
||||
from pyxr.app import app
|
||||
|
||||
@pytest.fixture
|
||||
def client():
|
||||
app.config['TESTING'] = True
|
||||
with app.test_client() as client:
|
||||
yield client
|
||||
|
||||
@pytest.fixture
|
||||
def mongo_client():
|
||||
# 使用测试数据库
|
||||
client = MongoClient('localhost', 27017)
|
||||
db = client.test_flask_db
|
||||
todos = db.todos
|
||||
yield todos
|
||||
# 清理测试数据
|
||||
todos.delete_many({})
|
||||
client.close()
|
||||
|
||||
def test_get_todos_empty(client, mongo_client):
|
||||
# 测试空数据库情况
|
||||
response = client.get('/api/todos')
|
||||
assert response.status_code == 200
|
||||
assert response.json == []
|
||||
|
||||
def test_create_todo(client, mongo_client):
|
||||
# 测试创建待办事项
|
||||
test_todo = {
|
||||
'content': '测试待办事项',
|
||||
'degree': 1
|
||||
}
|
||||
response = client.post('/api/todos', json=test_todo)
|
||||
assert response.status_code == 201
|
||||
assert '_id' in response.json
|
||||
|
||||
# 验证数据是否正确保存
|
||||
saved_todo = mongo_client.find_one({'_id': ObjectId(response.json['_id'])})
|
||||
assert saved_todo is not None
|
||||
assert saved_todo['content'] == test_todo['content']
|
||||
assert saved_todo['degree'] == test_todo['degree']
|
||||
assert 'timestamp' in saved_todo
|
||||
|
||||
def test_delete_todo(client, mongo_client):
|
||||
# 先创建一个待办事项
|
||||
todo = {
|
||||
'content': '要删除的待办事项',
|
||||
'degree': 1,
|
||||
'timestamp': datetime.now()
|
||||
}
|
||||
result = mongo_client.insert_one(todo)
|
||||
todo_id = str(result.inserted_id)
|
||||
|
||||
# 测试删除
|
||||
response = client.delete(f'/api/todos/{todo_id}')
|
||||
assert response.status_code == 204
|
||||
|
||||
# 验证是否已删除
|
||||
assert mongo_client.find_one({'_id': ObjectId(todo_id)}) is None
|
||||
|
||||
def test_delete_nonexistent_todo(client):
|
||||
# 测试删除不存在的待办事项
|
||||
fake_id = str(ObjectId())
|
||||
response = client.delete(f'/api/todos/{fake_id}')
|
||||
assert response.status_code == 204 # 即使不存在也返回204
|
||||
42
tests/test_mongodb_health.py
Normal file
42
tests/test_mongodb_health.py
Normal file
|
|
@ -0,0 +1,42 @@
|
|||
import pytest
|
||||
from pymongo import MongoClient
|
||||
from pymongo.errors import ConnectionFailure
|
||||
|
||||
@pytest.fixture
|
||||
def test_mongo_client():
|
||||
client = MongoClient('localhost', 27017)
|
||||
try:
|
||||
# 测试连接是否成功
|
||||
client.admin.command('ping')
|
||||
db = client.test_health_check_db
|
||||
collection = db.test_collection
|
||||
yield collection
|
||||
except ConnectionError:
|
||||
pytest.fail("MongoDB连接失败 - 请确保MongoDB服务正在运行")
|
||||
finally:
|
||||
# 清理测试数据
|
||||
if 'test_health_check_db' in client.list_database_names():
|
||||
client.drop_database('test_health_check_db')
|
||||
client.close()
|
||||
|
||||
def test_mongodb_connection_and_operations(test_mongo_client):
|
||||
# 测试数据插入
|
||||
test_doc = {"test_key": "test_value"}
|
||||
insert_result = test_mongo_client.insert_one(test_doc)
|
||||
assert insert_result.inserted_id is not None
|
||||
|
||||
# 测试数据查询
|
||||
found_doc = test_mongo_client.find_one({"test_key": "test_value"})
|
||||
assert found_doc is not None
|
||||
assert found_doc["test_key"] == "test_value"
|
||||
|
||||
# 测试数据更新
|
||||
update_result = test_mongo_client.update_one(
|
||||
{"test_key": "test_value"},
|
||||
{"$set": {"test_key": "updated_value"}}
|
||||
)
|
||||
assert update_result.modified_count == 1
|
||||
|
||||
# 测试数据删除
|
||||
delete_result = test_mongo_client.delete_one({"test_key": "updated_value"})
|
||||
assert delete_result.deleted_count == 1
|
||||
Loading…
Add table
Reference in a new issue