pyxr/tests/test_mongodb_health.py
2025-02-07 10:31:32 +08:00

42 lines
No EOL
1.4 KiB
Python

import pytest
from pymongo import MongoClient
from pymongo.errors import ConnectionFailure
@pytest.fixture
def test_mongo_client():
    """Yield a scratch MongoDB collection for health-check tests.

    Connects to the MongoDB server at ``localhost:27017``, pings the ``admin``
    database to confirm the server is reachable, and yields
    ``test_health_check_db.test_collection``.  Note that, despite the
    fixture's name, the yielded object is the collection, not the client.

    On teardown the scratch database is dropped and the client is closed.
    If the server cannot be reached, the client is closed and the requesting
    test fails with an explanatory message.
    """
    client = MongoClient('localhost', 27017)
    try:
        # Verify that the connection works before handing anything to the test.
        try:
            client.admin.command('ping')
        except ConnectionFailure:
            # PyMongo signals an unreachable server with ConnectionFailure (or
            # a subclass); the builtin ConnectionError would never match here.
            pytest.fail("MongoDB连接失败 - 请确保MongoDB服务正在运行")

        try:
            yield client.test_health_check_db.test_collection
        finally:
            # Clean up test data.  This is only reached when the ping
            # succeeded, so teardown never talks to an unreachable server and
            # cannot mask the connection failure above.  Dropping a database
            # that does not exist is a no-op, so no existence check is needed.
            client.drop_database('test_health_check_db')
    finally:
        client.close()
def test_mongodb_connection_and_operations(test_mongo_client):
    """Run an insert / find / update / delete round trip on the fixture collection."""
    collection = test_mongo_client
    original_filter = {"test_key": "test_value"}
    updated_filter = {"test_key": "updated_value"}

    # Insert: pass a copy so the "_id" added by insert_one does not leak into
    # the filter reused by the steps below.
    inserted = collection.insert_one(dict(original_filter))
    assert inserted.inserted_id is not None

    # Query: the document just written must be retrievable by its value.
    fetched = collection.find_one(original_filter)
    assert fetched is not None
    assert fetched["test_key"] == "test_value"

    # Update: exactly one document should be modified.
    updated = collection.update_one(original_filter, {"$set": updated_filter})
    assert updated.modified_count == 1

    # Delete: the updated document should be removed.
    deleted = collection.delete_one(updated_filter)
    assert deleted.deleted_count == 1