initial commit

This commit is contained in:
stepan TeSt 2026-04-06 14:06:27 +03:00
commit 676d044df2
16 changed files with 546 additions and 0 deletions

View File

@ -0,0 +1,11 @@
Feature: Place info (REST/GraphQL/WebSocket)
Scenario: Authorize as employer
When get access token
Then access token is valid
Scenario: Get place info
When get place info
Then place info is valid for query data

View File

@ -0,0 +1,10 @@
Feature: Place info KVS(REST/GraphQL/WebSocket)
Scenario: Authorize as employer
When get access token
Then access token is valid
Scenario: Get place info
When get place info kvs
Then place info is valid

View File

@ -0,0 +1,40 @@
# pyright: reportCallIssue=false
import os
from typing import Any, Final
from behave import given, when, then
from worklib.QueryData import kvs_query_data, kvs_query_data_place_id
from worklib import admin_data
from worklib.auth_as_employer import get_access_token
from worklib.findplaceinfo.find_place_data import fetch_place_members
expected_result = {
"members": [
{
"id": "bb368ee9-c15f-40ef-acb0-c466df47d096",
"parent_id": None,
"user": {
"username": "+79999956657"
}
}
]
}
@when("get place info kvs") # pyright: ignore[reportGeneralTypeIssues]
def step_get_place_info(context):
token = getattr(context, "access_token", None) or admin_data.get_or_create_user("tester").access_token
data = fetch_place_members(access_token=token, query=kvs_query_data()["query"], variables=kvs_query_data_place_id()["variables"])
context.place_info = data
@then("place info is valid") # pyright: ignore[reportGeneralTypeIssues]
def step_place_info_valid(context):
data = getattr(context, "place_info", None)
assert isinstance(data, dict), "Ответ GraphQL не dict"
assert "data" in data or "place" in str(data), f"Не похоже на успешный GraphQL ответ: {data}"
assert data["data"]["place"]["results"][0]["members"][0]["id"] == expected_result["members"][0]["id"]
assert data["data"]["place"]["results"][0]["members"][0]["parent_id"] == expected_result["members"][0]["parent_id"]
assert data["data"]["place"]["results"][0]["members"][0]["user"]["username"] == expected_result["members"][0]["user"]["username"]

View File

@ -0,0 +1,31 @@
# pyright: reportCallIssue=false
import os
from typing import Any, Final
from behave import given, when, then
from worklib.QueryData import query_data, query_data_place_id_variables
from worklib import admin_data
from worklib.auth_as_employer import get_access_token
from worklib.findplaceinfo.find_place_data import fetch_place_members
# pyright: ignore[reportGeneralTypeIssues]
@when("get access token") # pyright: ignore[reportGeneralTypeIssues]
def step_get_access_token(context):
if not getattr(context, "access_token", None):
token = admin_data.get_access_token_from_env()
context.access_token = token
admin_data.get_or_create_user("tester").access_token = token
@then("access token is valid") # pyright: ignore[reportGeneralTypeIssues]
def step_token_is_valid(context):
token = getattr(context, "access_token", None)
assert isinstance(token, str) and token.strip(), f"access_token пустой/не строка: {token}"

View File

@ -0,0 +1,26 @@
from behave import given, when, then
from typing import Final
from worklib import admin_data
from worklib.findplaceinfo.find_place_data import fetch_place_members
from worklib.QueryData import query_data, query_data_place_id_variables
# pyright: ignore[reportGeneralTypeIssues]
_EXPECTED_RESULT: Final[dict[str, str]] = {
"id": "682b071a163ac2a0995355be",
"place_type": "street",
"name": "ул. Мебельная",
}
@when("get place info") # pyright: ignore[reportGeneralTypeIssues]
def step_get_place_info(context):
token = getattr(context, "access_token", None) or admin_data.get_or_create_user("tester").access_token
data = fetch_place_members(access_token=token, query=query_data()["query"], variables=query_data_place_id_variables()["variables"])
context.place_info = data
@then("place info is valid for query data") # pyright: ignore[reportGeneralTypeIssues]
def step_place_info_valid(context):
data = getattr(context, "place_info", None)
assert isinstance(data, dict), "Ответ GraphQL не dict"
assert "data" in data or "place" in str(data), f"Не похоже на успешный GraphQL ответ: {data}"
assert data["data"]["place"]["results"][0]["id"] == _EXPECTED_RESULT["id"]
assert data["data"]["place"]["results"][0]["place_type"] == _EXPECTED_RESULT["place_type"]
assert data["data"]["place"]["results"][0]["name"] == _EXPECTED_RESULT["name"]

10
main.py Normal file
View File

@ -0,0 +1,10 @@
import worklib.auth_as_employer
from tests.TesFindPlaceInfo import TestFindPlaceInfo
test = TestFindPlaceInfo()

7
pyrightconfig.json Normal file
View File

@ -0,0 +1,7 @@
{
"venvPath": ".",
"venv": ".venv",
"typeCheckingMode": "basic",
"reportMissingImports": true,
"reportMissingTypeStubs": "none"
}

10
pytest.ini Normal file
View File

@ -0,0 +1,10 @@
[pytest]
testpaths = tests
python_files = *.py
python_classes = Test*
python_functions = test_*
addopts =
--alluredir=allure-results
--clean-alluredir
markers =
integration: тесты с реальными HTTP-запросами (auth + GraphQL)

3
requirements.txt Normal file
View File

@ -0,0 +1,3 @@
behave
pytest>=7.0
allure-pytest>=2.13.0

View File

@ -0,0 +1,27 @@
# Запуск тестов с записью результатов Allure и генерация HTML-отчёта (нужен Allure CLI в PATH).
# Установка Allure 2: https://github.com/allure-framework/allure2/releases
# Добавьте bin каталог разархивированного allure в переменную PATH.
$ErrorActionPreference = "Stop"
$Root = Split-Path -Parent $PSScriptRoot
Set-Location $Root
$venvPy = Join-Path $Root ".venv\Scripts\python.exe"
if (-not (Test-Path $venvPy)) {
Write-Error "Нет виртуального окружения .venv. Выполните: python -m venv .venv && .\.venv\Scripts\python -m pip install -r requirements.txt"
}
& $venvPy -m pytest tests\TesFindPlaceInfo.py -v
$allure = Get-Command allure -ErrorAction SilentlyContinue
if (-not $allure) {
Write-Host ""
Write-Host "Allure CLI не найден в PATH. Установите Allure 2 и добавьте bin в PATH."
Write-Host "Сырые результаты уже в папке: allure-results"
Write-Host "После установки CLI выполните: allure generate allure-results -o allure-report --clean"
exit 0
}
& allure generate allure-results -o allure-report --clean
Write-Host ""
Write-Host "Отчёт: file:///$((Join-Path $Root 'allure-report\index.html') -replace '\\', '/')"

113
tests/TesFindPlaceInfo.py Normal file
View File

@ -0,0 +1,113 @@
"""Проверка ответа GraphQL place: структура и значения полей results[ ] + Allure-отчёт."""
from __future__ import annotations
import json
from typing import Any, Final
import allure # pyright: ignore[reportMissingImports]
import pytest
from allure_commons.types import AttachmentType # pyright: ignore[reportMissingImports]
from worklib.findplaceinfo.find_place_data import fetch_place_members
# Ожидаемый фрагмент ответа (ключи и значения должны совпадать хотя бы в одной записи results).
_EXPECTED_RESULT: Final[dict[str, str]] = {
"id": "682b071a163ac2a0995355be",
"place_type": "street",
"name": "ул. Мебельная",
}
ALLURE_EPIC = "Place API"
ALLURE_FEATURE = "GraphQL place"
ALLURE_STORY = "Формат ответа и значения полей"
@allure.epic(ALLURE_EPIC)
@allure.feature(ALLURE_FEATURE)
@allure.story(ALLURE_STORY)
@pytest.mark.integration
class TestFindPlaceInfo:
"""Интеграционный тест: реальный запрос и сверка формата/значений."""
@pytest.fixture(scope="class")
def place_response(self) -> dict[str, Any]:
with allure.step("Запрос place через GraphQL (fetch_place_members)"):
payload = fetch_place_members()
allure.attach(
json.dumps(payload, ensure_ascii=False, indent=2),
name="Ответ GraphQL (raw JSON)",
attachment_type=AttachmentType.JSON,
)
return payload
@allure.title("Структура: data → place → results[] с полями id, place_type, name")
@allure.severity(allure.severity_level.NORMAL)
def test_response_has_place_shape(self, place_response: dict[str, Any]) -> None:
with allure.step("Проверка вложенности data.place.results"):
results = _extract_results(place_response)
allure.attach(
json.dumps(results, ensure_ascii=False, indent=2),
name="results (извлечено)",
attachment_type=AttachmentType.JSON,
)
assert len(results) >= 1, "results должен содержать хотя бы один элемент"
for key in ("id", "place_type", "name"):
assert key in results[0], f'В элементе results[0] должен быть ключ "{key}"'
@allure.title("Значения id, place_type, name совпадают с эталоном")
@allure.severity(allure.severity_level.CRITICAL)
def test_expected_id_place_type_name_match(self, place_response: dict[str, Any]) -> None:
allure.attach(
json.dumps(_EXPECTED_RESULT, ensure_ascii=False, indent=2),
name="Ожидаемый объект в results",
attachment_type=AttachmentType.JSON,
)
with allure.step("Поиск записи с полным совпадением id, place_type, name"):
results = _extract_results(place_response)
match = _find_matching_result(results, _EXPECTED_RESULT)
if match is None:
allure.attach(
json.dumps(results, ensure_ascii=False, indent=2),
name="Фактический results (для отладки)",
attachment_type=AttachmentType.JSON,
)
assert match is not None, (
f"Ни одна запись в results не совпадает с ожидаемым набором {_EXPECTED_RESULT}. "
f"Получено: {json.dumps(results, ensure_ascii=False, indent=2)[:2000]}"
)
allure.attach(
json.dumps(match, ensure_ascii=False, indent=2),
name="Найденная запись",
attachment_type=AttachmentType.JSON,
)
assert match["id"] == _EXPECTED_RESULT["id"]
assert match["place_type"] == _EXPECTED_RESULT["place_type"]
assert match["name"] == _EXPECTED_RESULT["name"]
def _extract_results(payload: dict[str, Any]) -> list[dict[str, Any]]:
assert "data" in payload, f'В ответе нет ключа "data": {json.dumps(payload, ensure_ascii=False)[:500]}'
data = payload["data"]
assert isinstance(data, dict), '"data" должен быть объектом'
assert "place" in data, f'В data нет "place": keys={list(data.keys())}'
place = data["place"]
assert place is not None, '"place" не должен быть null'
assert isinstance(place, dict), '"place" должен быть объектом'
assert "results" in place, f'В place нет "results": keys={list(place.keys())}'
results = place["results"]
assert isinstance(results, list), '"results" должен быть массивом'
for i, item in enumerate(results):
assert isinstance(item, dict), f'results[{i}] должен быть объектом, получено {type(item)}'
return results
def _find_matching_result(results: list[dict[str, Any]], expected: dict[str, str]) -> dict[str, Any] | None:
for item in results:
if all(item.get(k) == v for k, v in expected.items()):
return item
return None

58
worklib/QueryData.py Normal file
View File

@ -0,0 +1,58 @@
import re
from typing import Optional
class QuaryData:
def __init__(self, tag):
self.tag = tag
self.username: Optional[str] = None
def kvs_query_data():
return {
"query": """
query ($id: String!){
place(id: $id) {
results {
members {
id
parent_id
user {
username
}
}
}
}
}
""".strip(),
}
def kvs_query_data_place_id():
return {
"variables": {
"id": "6915dc03462d5aea0adc8cbd"
}
}
def query_data():
return {
"query": """
query ($id: String!) {
place(id: $id) {
results {
id
name
place_type
}
}
}
""".strip()
}
def query_data_place_id_variables():
return {
"variables": {
"id": "682b071a163ac2a0995355be"
}
}

6
worklib/__init__.py Normal file
View File

@ -0,0 +1,6 @@
__all__ = [
"admin_data",
"auth_as_employer",
"find_place_data",
]

76
worklib/admin_data.py Normal file
View File

@ -0,0 +1,76 @@
import re
from typing import Optional
from worklib.auth_as_employer import get_access_token
import os
class NoTokenException(Exception):
def __init__(self, tag):
super().__init__(f"User {tag} does not have access token")
class AdminData:
def __init__(self, tag):
self.tag = tag
self.username: Optional[str] = None
self.access_token: Optional[str] = None
self.refresh_token: Optional[str] = None
self.profile_id: Optional[str] = None
self.phone_number: Optional[str] = None
self.account_id: Optional[str] = None
class UserManager:
def __init__(self):
self._users = {}
self.last_user_tag = 0
def get_or_create_user(self, user_tag) -> AdminData:
user_id = self.str_to_user_tag(user_tag)
if user_id not in self._users:
self._users[user_id] = AdminData(user_id)
return self._users[user_id]
def str_to_user_tag(self, user):
if user == "tester":
return 0
if user == "he":
return self.last_user_tag or 0
user_id = int(re.sub("[^0-9]", "", str(user)) or 0)
self.last_user_tag = user_id
return user_id
def get_user_tags(self):
return [user.tag for user in self._users.values()]
def delete(self, user_tag):
user_id = self.str_to_user_tag(user_tag)
del self._users[user_id]
def delete_user_by_username(self, username):
for user_tag in list(self._users.keys()):
if self._users[user_tag].username == username:
del self._users[user_tag]
return
@property
def users(self):
return self._users.copy()
user_manager = UserManager()
def get_or_create_user(user_tag) -> AdminData:
return user_manager.get_or_create_user(user_tag)
def get_access_token_from_env():
# Берём значения из env, чтобы не хардкодить в тестах.
username = os.getenv("AUTH_USERNAME", "+79214400842")
password = os.getenv("AUTH_PASSWORD", "stepan")
grant_type = os.getenv("AUTH_GRANT_TYPE", "password")
token = get_access_token(username=username, password=password, grant_type=grant_type)
access_token = token
return access_token
# Храним в простом менеджере, чтобы можно было расширять сценарии.
get_or_create_user("tester").access_token = access_token

View File

@ -0,0 +1,56 @@
import json
import os
import urllib.error
import urllib.request
from typing import Optional
DEFAULT_USERNAME = "+79214400842"
DEFAULT_PASSWORD = "stepan"
DEFAULT_GRANT_TYPE = "password"
def get_access_token(
auth_url: Optional[str] = None,
*,
username: str = DEFAULT_USERNAME,
password: str = DEFAULT_PASSWORD,
grant_type: str = DEFAULT_GRANT_TYPE,
timeout_s: int = 30,
) -> str:
"""
POST авторизация, возвращает access_token.
Можно переопределить AUTH_URL через env.
"""
auth_url = auth_url or os.getenv("AUTH_URL") or "https://auth.dev.dipal.ru/api/v1/auth/login"
if not auth_url:
raise ValueError("Не задан AUTH_URL (передайте auth_url или установите env AUTH_URL)")
payload = {"username": username, "password": password, "grant_type": grant_type}
req = urllib.request.Request(
auth_url,
data=json.dumps(payload).encode("utf-8"),
headers={"content-type": "application/json"},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
raw = resp.read().decode("utf-8")
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"Auth HTTP {e.code}: {body}") from e
data = json.loads(raw) if raw else {}
if "data" in data and isinstance(data["data"], dict):
token = data["data"].get("access_token")
else:
token = data.get("access_token")
if not token:
raise ValueError(f"В ответе нет access_token. Ключи ответа: {list(data.keys())}")
return token
if __name__ == "__main__":
print(get_access_token())

View File

@ -0,0 +1,62 @@
import json
import os
import urllib.error
import urllib.request
from typing import Any, Dict, Optional
from worklib.QueryData import kvs_query_data, kvs_query_data_place_id
from worklib.auth_as_employer import get_access_token
DEFAULT_COMPANY_ID = "65437401ae3af6f8ffcdbaf8"
def build_headers(access_token: str, *, company_id: str = DEFAULT_COMPANY_ID) -> Dict[str, str]:
return {
"authorization": f"bearer {access_token}",
"x-company-id": company_id,
}
def fetch_place_members(
*,
query: str,
variables: dict[str, str],
graphql_url: Optional[str] = None,
company_id: str = DEFAULT_COMPANY_ID,
access_token: Optional[str] = None,
timeout_s: int = 30,
) -> Dict[str, Any]:
graphql_url = graphql_url or os.getenv("GRAPHQL_URL") or "https://admin.dev.dipal.ru/graphql"
if not graphql_url:
raise ValueError("Не задан GRAPHQL_URL (передайте graphql_url или установите env GRAPHQL_URL)")
token = access_token or get_access_token()
headers = build_headers(token, company_id=company_id)
payload = {"query": query, "variables": variables}
req = urllib.request.Request(
graphql_url,
data=json.dumps(payload).encode("utf-8"),
headers={"content-type": "application/json", **headers},
method="POST",
)
try:
with urllib.request.urlopen(req, timeout=timeout_s) as resp:
raw = resp.read().decode("utf-8")
except urllib.error.HTTPError as e:
body = e.read().decode("utf-8", errors="replace")
raise RuntimeError(f"GraphQL HTTP {e.code}: {body}") from e
data = json.loads(raw) if raw else {}
if isinstance(data, dict) and data.get("errors"):
raise RuntimeError(f"GraphQL errors: {data['errors']}")
return data
if __name__ == "__main__":
print()