director added

This commit is contained in:
Sviatoslav Tsariov Yurievich 2023-09-15 13:16:14 +03:00
parent 3b9a576290
commit 3eb99ae7ae
14 changed files with 898 additions and 2 deletions

52
director/.dockerignore Normal file
View File

@ -0,0 +1,52 @@
# Ignore common Python-related files and directories
__pycache__
*.pyc
*.pyo
*.pyd
# Ignore development and version control files
*.log
*.swp
.git
.vscode
.idea
__init__.py
# Ignore virtual environment and dependencies
venv/
env/
*.egg-info/
*.egg
*.log
# Ignore other temporary and build files
*.bak
*.bak1
*.bak2
*.swp
*.swo
*.swn
*.un~
*.swm
*.swl
*.pyc
*.pyo
*.bak
*.bak1
*.bak2
*.orig
*.rej
*.log
*.backup
*.tmp
*.tmp.*
*.diff
*.cache
*.css.map
*.css.map.*
# Ignore any specific files or directories your project generates
# Add lines for any other project-specific files or directories to ignore
.env
.env*
*.env

3
director/.env.example Normal file
View File

@ -0,0 +1,3 @@
DOWNLOADER_LINK=
PARSER_ENDPOINT_LIST=
SCHEDULE_TIME=

12
director/Dockerfile Normal file
View File

@ -0,0 +1,12 @@
# syntax=docker/dockerfile:1
FROM python:3.8-slim-buster
WORKDIR /app
RUN apt-get update && \
rm -rf /var/lib/apt/lists/*
COPY . .
ENTRYPOINT ["/app/wrapper.sh"]

31
director/app.py Normal file
View File

@ -0,0 +1,31 @@
import asyncio
import aiohttp
import schedule
import time
import random
import threading
from config import App
def main():
print("Script is running...")
def run_scheduler():
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == '__main__':
# Schedule the script to run every day at a specific time
print(f'Repos downloading is scheduled on {App.ScheduleTime}')
schedule.every().day.at(App.ScheduleTime).do(lambda: main())
# Run the scheduler in a separate thread
scheduler_thread = threading.Thread(target=run_scheduler)
scheduler_thread.start()
# Keep the main thread running
try:
scheduler_thread.join()
except KeyboardInterrupt:
pass

18
director/config.py Normal file
View File

@ -0,0 +1,18 @@
from utils.libs.decouple import config, UndefinedValueError
from utils.utils import str_to_bool
from utils.logger import logger
def set_conf_value(key):
try:
value = config(key)
if value == '':
logger.warn(f'WARNING: The variable {key} is an empty string.')
return value
except (UndefinedValueError):
logger.warn(f'WARNING: Please set the variable {key} in the .env file based on .env.example.')
return None
class App:
DownloaderLink=set_conf_value('DOWNLOADER_LINK')
ParserEndpointList=set_conf_value('PARSER_ENDPOINT_LIST')
ScheduleTime=set_conf_value('SCHEDULE_TIME')

66
director/director.py Normal file
View File

@ -0,0 +1,66 @@
import asyncio
import aiohttp
import schedule
import time
import random
import threading
from config import App
async def fetch_endpoint(session, url):
async with session.get(url) as response:
return await response.text()
async def fetch_endpoint_with_retry(session, url):
timeout = aiohttp.ClientTimeout(total=None) # No timeout
async with session.get(url, timeout=timeout) as response:
if response.status == 200:
return await response.text()
elif response.status == 500:
retry_delay = random.randint(30, 90)
print(f"Received a 500 response from {url}. Retrying in {retry_delay} seconds.")
await asyncio.sleep(retry_delay)
return await fetch_endpoint_with_retry(session, url)
else:
print(f"Received an unexpected response code {response.status} from {url}.")
return None
async def main():
print("Script is running...")
async with aiohttp.ClientSession() as session:
# Fetch all endpoints concurrently
endpoints = App.ParserEndpointList.split()
# Create tasks for fetching all endpoints concurrently
tasks = [fetch_endpoint_with_retry(session, url) for url in endpoints]
# Wait for all endpoint tasks to complete
endpoint_responses = await asyncio.gather(*tasks)
print(endpoint_responses)
# Check if all endpoints returned 200 status codes
if all(response is not None for response in endpoint_responses):
# Call another_endpoint only if all endpoints returned 200
another_endpoint = App.DownloaderLink
response = await fetch_endpoint(session, another_endpoint)
# Process the response from the final endpoint
print(response)
else:
print("Not all endpoints returned 200 status codes. Skipping another_endpoint.")
def run_scheduler():
while True:
schedule.run_pending()
time.sleep(1)
if __name__ == '__main__':
# Schedule the script to run every day at a specific time
print(f'Repos downloading is scheduled on {App.ScheduleTime}')
schedule.every().day.at(App.ScheduleTime).do(lambda: asyncio.run(main()))
# Run the scheduler in a separate thread
scheduler_thread = threading.Thread(target=run_scheduler)
scheduler_thread.start()

View File

@ -0,0 +1,11 @@
version: '2.1'
services:
director:
container_name: director
image: director
build: .
network_mode: host
environment:
DOWNLOADER_LINK: ${DOWNLOADER_LINK}
PARSER_ENDPOINT_LIST: ${PARSER_ENDPOINT_LIST}
SCHEDULE_TIME: ${SCHEDULE_TIME}

10
director/requirements.txt Normal file
View File

@ -0,0 +1,10 @@
aiohttp==3.8.5
aiosignal==1.3.1
async-timeout==4.0.3
attrs==23.1.0
charset-normalizer==3.2.0
frozenlist==1.4.0
idna==3.4
multidict==6.0.4
schedule==1.2.0
yarl==1.9.2

View File

@ -0,0 +1,314 @@
# coding: utf-8
import os
import sys
import string
from shlex import shlex
from io import open
from collections import OrderedDict
# Useful for very coarse version differentiation.
PYVERSION = sys.version_info
if PYVERSION >= (3, 0, 0):
from configparser import ConfigParser
text_type = str
else:
from ConfigParser import SafeConfigParser as ConfigParser
text_type = unicode
if PYVERSION >= (3, 2, 0):
def read_config(parser, file): return parser.read_file(file)
else:
def read_config(parser, file): return parser.readfp(file)
DEFAULT_ENCODING = 'UTF-8'
# Python 3.10 don't have strtobool anymore. So we move it here.
TRUE_VALUES = {"y", "yes", "t", "true", "on", "1"}
FALSE_VALUES = {"n", "no", "f", "false", "off", "0"}
def strtobool(value):
if isinstance(value, bool):
return value
value = value.lower()
if value in TRUE_VALUES:
return True
elif value in FALSE_VALUES:
return False
raise ValueError("Invalid truth value: " + value)
class UndefinedValueError(Exception):
pass
class Undefined(object):
"""
Class to represent undefined type.
"""
pass
# Reference instance to represent undefined values
undefined = Undefined()
class Config(object):
"""
Handle .env file format used by Foreman.
"""
def __init__(self, repository):
self.repository = repository
def _cast_boolean(self, value):
"""
Helper to convert config values to boolean as ConfigParser do.
"""
value = str(value)
return bool(value) if value == '' else bool(strtobool(value))
@staticmethod
def _cast_do_nothing(value):
return value
def get(self, option, default=undefined, cast=undefined):
"""
Return the value for option or default if defined.
"""
# We can't avoid __contains__ because value may be empty.
if option in os.environ:
value = os.environ[option]
elif option in self.repository:
value = self.repository[option]
else:
if isinstance(default, Undefined):
raise UndefinedValueError(
'{} not found. Declare it as envvar or define a default value.'.format(option))
value = default
if isinstance(cast, Undefined):
cast = self._cast_do_nothing
elif cast is bool:
cast = self._cast_boolean
return cast(value)
def __call__(self, *args, **kwargs):
"""
Convenient shortcut to get.
"""
return self.get(*args, **kwargs)
class RepositoryEmpty(object):
def __init__(self, source='', encoding=DEFAULT_ENCODING):
pass
def __contains__(self, key):
return False
def __getitem__(self, key):
return None
class RepositoryIni(RepositoryEmpty):
"""
Retrieves option keys from .ini files.
"""
SECTION = 'settings'
def __init__(self, source, encoding=DEFAULT_ENCODING):
self.parser = ConfigParser()
with open(source, encoding=encoding) as file_:
read_config(self.parser, file_)
def __contains__(self, key):
return (key in os.environ or
self.parser.has_option(self.SECTION, key))
def __getitem__(self, key):
return self.parser.get(self.SECTION, key)
class RepositoryEnv(RepositoryEmpty):
"""
Retrieves option keys from .env files with fall back to os.environ.
"""
def __init__(self, source, encoding=DEFAULT_ENCODING):
self.data = {}
with open(source, encoding=encoding) as file_:
for line in file_:
line = line.strip()
if not line or line.startswith('#') or '=' not in line:
continue
k, v = line.split('=', 1)
k = k.strip()
v = v.strip()
if len(v) >= 2 and ((v[0] == "'" and v[-1] == "'") or (v[0] == '"' and v[-1] == '"')):
v = v[1:-1]
self.data[k] = v
def __contains__(self, key):
return key in os.environ or key in self.data
def __getitem__(self, key):
return self.data[key]
class RepositorySecret(RepositoryEmpty):
"""
Retrieves option keys from files,
where title of file is a key, content of file is a value
e.g. Docker swarm secrets
"""
def __init__(self, source='/run/secrets/'):
self.data = {}
ls = os.listdir(source)
for file in ls:
with open(os.path.join(source, file), 'r') as f:
self.data[file] = f.read()
def __contains__(self, key):
return key in os.environ or key in self.data
def __getitem__(self, key):
return self.data[key]
class AutoConfig(object):
"""
Autodetects the config file and type.
Parameters
----------
search_path : str, optional
Initial search path. If empty, the default search path is the
caller's path.
"""
SUPPORTED = OrderedDict([
('settings.ini', RepositoryIni),
('.env', RepositoryEnv),
])
encoding = DEFAULT_ENCODING
def __init__(self, search_path=None):
self.search_path = search_path
self.config = None
def _find_file(self, path):
# look for all files in the current path
for configfile in self.SUPPORTED:
filename = os.path.join(path, configfile)
if os.path.isfile(filename):
return filename
# search the parent
parent = os.path.dirname(path)
if parent and parent != os.path.abspath(os.sep):
return self._find_file(parent)
# reached root without finding any files.
return ''
def _load(self, path):
# Avoid unintended permission errors
try:
filename = self._find_file(os.path.abspath(path))
except Exception:
filename = ''
Repository = self.SUPPORTED.get(os.path.basename(filename), RepositoryEmpty)
self.config = Config(Repository(filename, encoding=self.encoding))
def _caller_path(self):
# MAGIC! Get the caller's module path.
frame = sys._getframe()
path = os.path.dirname(frame.f_back.f_back.f_code.co_filename)
return path
def __call__(self, *args, **kwargs):
if not self.config:
self._load(self.search_path or self._caller_path())
return self.config(*args, **kwargs)
# A pré-instantiated AutoConfig to improve decouple's usability
# now just import config and start using with no configuration.
config = AutoConfig()
# Helpers
class Csv(object):
"""
Produces a csv parser that return a list of transformed elements.
"""
def __init__(self, cast=text_type, delimiter=',', strip=string.whitespace, post_process=list):
"""
Parameters:
cast -- callable that transforms the item just before it's added to the list.
delimiter -- string of delimiters chars passed to shlex.
strip -- string of non-relevant characters to be passed to str.strip after the split.
post_process -- callable to post process all casted values. Default is `list`.
"""
self.cast = cast
self.delimiter = delimiter
self.strip = strip
self.post_process = post_process
def __call__(self, value):
"""The actual transformation"""
if value is None:
return self.post_process()
def transform(s): return self.cast(s.strip(self.strip))
splitter = shlex(value, posix=True)
splitter.whitespace = self.delimiter
splitter.whitespace_split = True
return self.post_process(transform(s) for s in splitter)
class Choices(object):
"""
Allows for cast and validation based on a list of choices.
"""
def __init__(self, flat=None, cast=text_type, choices=None):
"""
Parameters:
flat -- a flat list of valid choices.
cast -- callable that transforms value before validation.
choices -- tuple of Django-like choices.
"""
self.flat = flat or []
self.cast = cast
self.choices = choices or []
self._valid_values = []
self._valid_values.extend(self.flat)
self._valid_values.extend([value for value, _ in self.choices])
def __call__(self, value):
transform = self.cast(value)
if transform not in self._valid_values:
raise ValueError((
'Value not in list: {!r}; valid values are {!r}'
).format(value, self._valid_values))
else:
return transform

View File

@ -0,0 +1,264 @@
import re
import requests
from http import HTTPStatus
import json as JSON
from infra.config import BaseUri, Settings
from utils.logger import logger, bcolors
from utils.singleton import SingletonMeta
swagger_api_json_endpoint = '/api-json'
api_info_urls = {
BaseUri.Iguana: 'Iguana',
BaseUri.Pyrador: 'Pyrador',
BaseUri.Zoo: 'Zoo'
}
excluded_endpoints = [
('POST', BaseUri.Iguana + '/api/v1/test/controller'),
('GET', BaseUri.Iguana + '/api/v1/test/controller'),
('DELETE', BaseUri.Iguana + '/api/v1/test/controller'),
('GET', BaseUri.Iguana + '/api/v1/health'),
('GET', BaseUri.Iguana + '/api/v1/metrics'),
('GET', BaseUri.Iguana + '/api/v1/settings'),
('POST', BaseUri.Iguana + '/api/v1/settings'),
('PUT', BaseUri.Iguana + '/api/v1/settings'),
('GET', BaseUri.Iguana + '/api/v1/activity'),
('GET', BaseUri.Iguana + '/api/v1/activity/{activity_id}'),
('POST', BaseUri.Iguana + '/api/v1/doorlock'),
('PUT', BaseUri.Iguana + '/api/v1/profile/set-account-number'),
('PUT', BaseUri.Iguana + '/api/v1/profile/address'),
('PUT', BaseUri.Iguana + '/api/v1/profile/contact'),
('POST', BaseUri.Iguana + '/api/v1/profile/set-firebase-token'),
('PUT', BaseUri.Iguana + '/api/v1/profile/balance'),
('GET', BaseUri.Iguana + '/api/v1/providable-service/{place_id}'),
('POST', BaseUri.Iguana + '/api/v1/light-device/toggle'),
('GET', BaseUri.Iguana + '/api/v1/light-device/state/{device_id}'),
('PUT', BaseUri.Iguana + '/api/v1/user-place/{place_id}'),
('GET', BaseUri.Iguana + '/api/v1/user-place/{place_id}/services'),
('PUT', BaseUri.Iguana + '/api/v1/user-place/set/status'),
('POST', BaseUri.Iguana + '/api/v1/profile/device/to/service'),
('DELETE', BaseUri.Iguana + '/api/v1/profile/device/from/service'),
('GET', BaseUri.Iguana + '/api/v1/profile/place/{place_id}/service/devices/{device_category}'),
('POST', BaseUri.Iguana + '/api/v1/room'),
('GET', BaseUri.Iguana + '/api/v1/room/by/place/{parent_id}'),
('PUT', BaseUri.Iguana + '/api/v1/room/{id}'),
('DELETE', BaseUri.Iguana + '/api/v1/room/{id}'),
('GET', BaseUri.Iguana + '/api/v1/device/list/{type}'),
('DELETE', BaseUri.Iguana + '/api/v1/user-place/qrcode'),
('POST', BaseUri.Iguana + '/api/v1/billing'),
('POST', BaseUri.Iguana + '/api/v1/intercom/acceptCall'), # TODO: test it with notifications
('POST', BaseUri.Iguana + '/api/v1/upload/avatar'), # TODO: unable to test
('POST', BaseUri.Zoo + '/api/v1/notifications/send-notification'),
('POST', BaseUri.Zoo + '/api/v1/notifications/send-sms'),
('DELETE', BaseUri.Zoo + '/api/v1/place/available_services')
]
class APICoverageTracker(metaclass=SingletonMeta):
def __init__(self):
self.called_endpoints = {}
self.api_info = self.request_api_info(api_info_urls)
self.build_called_endpoints()
def request_api_info(self, urls):
api_info = {}
for url in urls:
res = requests.get(url + swagger_api_json_endpoint)
api_info[url] = res.json()
return api_info
def build_called_endpoints(self):
for url, info in self.api_info.items():
try:
paths = info.get('paths')
if not url in self.called_endpoints:
self.called_endpoints[url] = {}
for path, methods in paths.items():
endpoint = url + path
self.called_endpoints[url][path] = {}
for method, method_info in methods.items():
if (method.upper(), endpoint) in excluded_endpoints:
continue
self.called_endpoints[url][path][method] = 0
except Exception as e:
logger.error('Error happened while getting api info:', e)
def endpoint_is_called(self, called_endpoint, method):
if not Settings.EnableCoverageStatistics:
return
for url, paths in self.called_endpoints.items():
for path, methods in paths.items():
endpoint = url + path
pattern = re.sub(r'{.+?}', r'[^/]+', endpoint) + '$'
if re.match(pattern, called_endpoint) and method.lower() in methods:
self.called_endpoints[url][path][method.lower()] += 1
return
def print_coverage(self):
def calculate_coverage_statistics(total_urls, covered_urls):
if total_urls == 0:
return 0
coverage_percentage = int(covered_urls / total_urls * 100)
if coverage_percentage < 50:
color = bcolors.FAIL
elif coverage_percentage < 75:
color = bcolors.WARNING
else:
color = bcolors.OKGREEN
statistics = f'{coverage_percentage}% ({covered_urls} / {total_urls})'
return f'{color}{statistics}{bcolors.ENDC}'
def count_urls(gateway_url):
urls_num = 0
covered_urls_num = 0
for url, paths in self.called_endpoints.items():
for path, methods in paths.items():
endpoint = url + path
if gateway_url in endpoint:
for method, num_of_calls in methods.items():
urls_num += 1
if num_of_calls > 0:
covered_urls_num += 1
else:
logger.warn(f'{method.upper()} {endpoint} is not covered')
return urls_num, covered_urls_num
if not Settings.EnableCoverageStatistics:
return
urls_num_sum = 0
covered_urls_num_sum = 0
urls_info = \
[(gateway_name, count_urls(gateway_url)) \
for gateway_url, gateway_name in api_info_urls.items()]
logger.info('Coverage statistics:')
logger.info()
for gateway_name, (urls_num, covered_urls_num) in urls_info:
coverage_statistics = calculate_coverage_statistics(urls_num, covered_urls_num)
message = f' {gateway_name}: {coverage_statistics}'
logger.info(message)
urls_num_sum += urls_num
covered_urls_num_sum += covered_urls_num
coverage_statistics = \
calculate_coverage_statistics(urls_num_sum, covered_urls_num_sum)
logger.info()
logger.info(f' Total: {coverage_statistics}\n')
class Response(requests.Response):
def __init__(self, status_code=HTTPStatus.OK):
super().__init__()
self.status_code = status_code
def log_req(method, url, params=None, data=None, json=None, headers=None):
logger.verbose(f'============================================================')
logger.verbose(f'[REQUEST] {method} {url}')
if params:
logger.verbose(f'params: {params}')
if data:
data = JSON.dumps(data, sort_keys=True, indent=4)
logger.verbose(f'data: {data}')
if json:
json = JSON.dumps(json, sort_keys=True, indent=4)
logger.verbose(f'json: {json}')
if headers:
headers = JSON.dumps(headers, sort_keys=True, indent=4)
logger.verbose(f'headers: {headers}')
def log_res(res: requests.Response):
req = res.request
logger.verbose(f'[RESPONSE] {req.method} {req.url} {res.status_code}')
try:
json = JSON.dumps(res.json(), sort_keys=True, indent=4).replace('\\"', '"')
lines_num = json.count('\n')
max_lines_num = Settings.LoggingResponseMaxLinesNum
if lines_num <= max_lines_num:
logger.verbose(f'json: {json}')
else:
stats = f'{lines_num}/{max_lines_num}'
logger.verbose(f'Maximum number of lines for response exceeded:', stats)
except ValueError:
logger.verbose('response:', res.content)
except Exception as e:
logger.verbose(e)
def request(method, url, headers=None, **kwargs):
APICoverageTracker().endpoint_is_called(url, method)
log_req(method, url, params=kwargs.get('params'), \
data=kwargs.get('data'), json=kwargs.get('json'), headers=headers)
res = requests.request(method, url, **kwargs)
log_res(res)
return res
def get(url, params=None, headers=None, **kwargs):
method = 'GET'
APICoverageTracker().endpoint_is_called(url, method)
log_req(method, url, params=params, \
data=kwargs.get('data'), json=kwargs.get('json'), headers=headers)
res = requests.get(url, params=params, headers=headers, **kwargs)
log_res(res)
return res
def options(url, headers=None, **kwargs):
method = 'OPTIONS'
APICoverageTracker().endpoint_is_called(url, method)
log_req(method, url, params=kwargs.get('params'), \
data=kwargs.get('data'), json=kwargs.get('json'), headers=headers)
res = requests.options(url, headers=headers, **kwargs)
log_res(res)
return res
def head(url, headers=None, **kwargs):
method = 'HEAD'
APICoverageTracker().endpoint_is_called(url, method)
log_req(method, url, params=kwargs.get('params'), \
data=kwargs.get('data'), json=kwargs.get('json'), headers=headers)
res = requests.head(url, headers=headers, **kwargs)
log_res(res)
return res
def post(url, data=None, json=None, headers=None, **kwargs):
method = 'POST'
APICoverageTracker().endpoint_is_called(url, method)
log_req(method, url, params=kwargs.get('params'), \
data=data, json=json, headers=headers)
res = requests.post(url, data=data, json=json, headers=headers, **kwargs)
log_res(res)
return res
def put(url, data=None, headers=None, **kwargs):
method = 'PUT'
APICoverageTracker().endpoint_is_called(url, method)
log_req(method, url, params=kwargs.get('params'), \
data=data, json=kwargs.get('json'), headers=headers),
res = requests.put(url, data=data, headers=headers, **kwargs)
log_res(res)
return res
def patch(url, data=None, headers=None, **kwargs):
method = 'PATCH'
APICoverageTracker().endpoint_is_called(url, method)
log_req(method, url, params=kwargs.get('params'), \
data=data, json=kwargs.get('json'), headers=headers)
res = requests.patch(url, data=data, headers=headers, **kwargs)
log_res(res)
return res
def delete(url, headers=None, **kwargs):
method = 'DELETE'
APICoverageTracker().endpoint_is_called(url, method)
log_req(method, url, params=kwargs.get('params'), \
data=kwargs.get('data'), json=kwargs.get('json'), headers=headers)
res = requests.delete(url, headers=headers, **kwargs)
log_res(res)
return res

69
director/utils/logger.py Normal file
View File

@ -0,0 +1,69 @@
import inspect
import traceback
from utils.utils import trace_origin
class bcolors:
HEADER = '\033[95m'
OKBLUE = '\033[94m'
OKCYAN = '\033[96m'
OKGREEN = '\033[92m'
WARNING = '\033[93m'
FAIL = '\033[91m'
ENDC = '\033[0m'
BOLD = '\033[1m'
UNDERLINE = '\033[4m'
class logger:
def debug(*args, **kwargs):
from infra.config import Settings
if not Settings.EnableDebugMessages:
return
print(bcolors.HEADER, end='[DEBUG] ')
print(*args, **kwargs)
print(bcolors.ENDC, end='')
def error(*args, **kwargs):
print(bcolors.FAIL, end='[ERROR] ')
print(*args, **kwargs)
print(bcolors.ENDC, end='')
def warn(*args, trace_origin_flag=False, stacklevel=0, \
print_stack=False, **kwargs):
print(bcolors.WARNING, end='[WARNING] ')
if trace_origin_flag:
trace_origin(inspect.currentframe(), stacklevel)
if print_stack:
print(traceback.format_exc())
print(*args, **kwargs)
print(bcolors.ENDC, end='')
def verbose(*args, trace_origin_flag=False, stacklevel=0, **kwargs):
from infra.config import Settings
if not Settings.EnableVerboseMessages:
return
if trace_origin_flag:
trace_origin(inspect.currentframe(), stacklevel)
print(bcolors.OKCYAN, end='[VERBOSE] ')
print(*args, **kwargs)
print(bcolors.ENDC, end='')
def log(*args, **kwargs):
print(bcolors.OKGREEN, end='[LOG] ')
print(*args, **kwargs)
print(bcolors.ENDC, end='')
def info(*args, **kwargs):
print(bcolors.OKBLUE, end='[INFO] ')
print(*args, **kwargs)
print(bcolors.ENDC, end='')
class DisableVerbose(object):
def __enter__(self):
from infra.config import Settings
self.verbose_flag = Settings.EnableVerboseMessages
Settings.EnableVerboseMessages = False
def __exit__(self, exc_type, exc_value, traceback):
from infra.config import Settings
Settings.EnableVerboseMessages = self.verbose_flag

19
director/utils/utils.py Normal file
View File

@ -0,0 +1,19 @@
def str_to_bool(val: str):
if not val:
return False
val = val.lower()
if val in ('y', 'yes', 't', 'true', 'on', '1'):
return True
elif val in ('', 'n', 'no', 'f', 'false', 'off', '0'):
return False
else:
raise ValueError('invalid truth value %r' % (val,))
def trace_origin(initial_frame, stacklevel=0):
frame = initial_frame.f_back
for _ in range(stacklevel + 1):
frame = frame.f_back
file_name = frame.f_code.co_filename
line_number = frame.f_lineno
func_name = frame.f_code.co_name
print(file_name, ":", line_number, ": ", func_name, ": ")

27
director/wrapper.sh Executable file
View File

@ -0,0 +1,27 @@
#!/bin/bash
#
# Simple wrapper for executing behave within Docker.
#
# ENVIRONMENT VARIABLES:
#
# - REQUIREMENTS_PATH: requirements fullpath;
# default = "requirements.txt"
#
#
# install Python packages for testing purpose, if any.
#
if [ -z "$REQUIREMENTS_PATH" ]; then
REQUIREMENTS_PATH=requirements.txt
fi
if [ -f "$REQUIREMENTS_PATH" ]; then
pip3 install --no-cache-dir -r $REQUIREMENTS_PATH
fi
#
# execute behave
#
exec python -u director.py

View File

@ -10,7 +10,7 @@ from datetime import datetime, timedelta
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from config import Parser
from config import Parser, Mongo
token = Parser.GithubToken
@ -111,7 +111,7 @@ def scrape(q, out_file):
break
#print(all_repos)
from pymongo import MongoClient
client = MongoClient("mongodb://admin:admin@localhost:27017")
client = MongoClient(f'mongodb://{Mongo.Username}:{Mongo.Password}@{Mongo.Host}:{Mongo.Port}')
db = client['git']
collection = db['repos']
for repo in all_repos: