unified test reporter.
TestRail & Jenkins & Launchpad & Proboscis and Pytest - dish
diff --git a/__init__.py b/__init__.py
new file mode 100755
index 0000000..e69de29
--- /dev/null
+++ b/__init__.py
diff --git a/setup.py b/setup.py
new file mode 100755
index 0000000..a5b4264
--- /dev/null
+++ b/setup.py
@@ -0,0 +1,30 @@
+# Copyright 2013 - 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from setuptools import find_packages
+from setuptools import setup
+
+
+setup(
+    name='unified_test_reporter',
+    version='1.0.0',
+    description='Library for creating and publishing reports',
+    keywords='fuel universal unified_test_reporter',
+    author='Mirantis, Inc.',
+    author_email='product@mirantis.com',
+    url='http://mirantis.com',
+    packages=find_packages(),
+    include_package_data=True,
+    zip_safe=False,
+)
diff --git a/unified_test_reporter/.gitignore b/unified_test_reporter/.gitignore
new file mode 100755
index 0000000..492f621
--- /dev/null
+++ b/unified_test_reporter/.gitignore
@@ -0,0 +1,40 @@
+*.pyc
+*.sqlite
+
+*.gem
+
+# vim swap files
+.*.swp
+
+# services' runtime files
+*.log
+*.pid
+
+# Vagrant housekeeping file
+.vagrant
+
+build
+dist
+/local_mirror
+/virtualbox/iso/*.iso
+nosetests.xml
+nailgun.log
+lock
+
+node_modules
+nailgun/static/build
+
+*.egg
+.testrepository
+.tox
+.venv
+.idea
+.DS_Store
+test_run/*
+.cache
+
+*.egg-info
+
+fuel-web-venv
+
+.bashrc
diff --git a/unified_test_reporter/__init__.py b/unified_test_reporter/__init__.py
new file mode 100755
index 0000000..ebb074b
--- /dev/null
+++ b/unified_test_reporter/__init__.py
@@ -0,0 +1,13 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
diff --git a/unified_test_reporter/providers/__init__.py b/unified_test_reporter/providers/__init__.py
new file mode 100755
index 0000000..e69de29
--- /dev/null
+++ b/unified_test_reporter/providers/__init__.py
diff --git a/unified_test_reporter/providers/jenkins_client.py b/unified_test_reporter/providers/jenkins_client.py
new file mode 100755
index 0000000..878288e
--- /dev/null
+++ b/unified_test_reporter/providers/jenkins_client.py
@@ -0,0 +1,461 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from __future__ import unicode_literals
+
+import re
+import time
+
+import requests
+import xmltodict
+from cached_property import cached_property
+from requests.packages.urllib3 import disable_warnings
+
+from unified_test_reporter.providers.providers import TestResultProvider
+from unified_test_reporter.settings import JENKINS
+from unified_test_reporter.settings import TestRailSettings
+from unified_test_reporter.settings import logger
+from unified_test_reporter.pylib.pylib import make_cleanup
+
+disable_warnings()  # runs at import time; presumably silences urllib3 warnings
+
+
+class Build(TestResultProvider):
+    def __init__(self, name, number='latest'):
+        """Get build info via Jenkins API, get test info via direct HTTP
+        request.
+
+        :param name: str - Jenkins job name
+        :param number: build number; 'latest' or None selects the last
+            completed build, 'latest_started' the last started build
+        """
+
+        self.name = name
+        self.requested_number = number
+        self.job_info = self.get_job_info(depth=0)
+        self.latest_number = self.job_info["lastCompletedBuild"]["number"]
+        self.latest_started = self.job_info["lastBuild"]["number"]
+        if number is None or number == 'latest':
+            self.number = int(self.latest_number)
+        elif number == 'latest_started':
+            self.number = int(self.latest_started)
+        else:
+            self.number = int(number)
+        self.build_data = self.get_build_data(depth=0)
+        self.url = self.build_data["url"]
+        self.results = self.get_results()
+        self.failures = self.get_build_failure_reasons()
+
+    def get_job_info(self, depth=1):
+        """Return the job's Jenkins JSON API payload at the given depth."""
+        job_url = "/".join([JENKINS["url"], 'job', self.name,
+                            'api/json?depth={depth}'.format(depth=depth)])
+        logger.debug("Request job info from {}".format(job_url))
+        return requests.get(job_url).json()
+
+    def get_build_data(self, depth=1):
+        """Return this build's Jenkins JSON API payload at the given depth."""
+        build_url = "/".join([JENKINS["url"], 'job', self.name,
+                              str(self.number),
+                              'api/json?depth={depth}'.format(depth=depth)])
+        logger.debug("Request build data from {}".format(build_url))
+        return requests.get(build_url).json()
+
+    def get_job_console(self):
+        job_url = "/".join([JENKINS["url"], 'job', self.name,
+                            str(self.number), 'consoleText'])
+        logger.debug("Request job console from {}".format(job_url))
+        return requests.get(job_url).text.split('\n')
+
+    def get_environment_variables(self):
+        """Return the injectedEnvVars JSON API payload of this build."""
+        build_url = "/".join([JENKINS["url"], 'job', self.name,
+                              str(self.number), 'injectedEnvVars',
+                              'api/json'])
+        logger.debug(
+            "Request Environment variables from {}".format(build_url))
+        return requests.get(build_url).json()
+
+    @staticmethod
+    def get_build_artifact(url, artifact):
+        """Return the text content of artifact (a path below the build's
+        'artifact' directory) for the build located at url."""
+        url = "/".join([url, 'artifact', artifact])
+        logger.debug("Request artifact content from {}".format(url))
+        return requests.get(url).text
+
+    @staticmethod
+    def get_downstream_builds_from_html(url):
+        """Return list of downstream jobs builds from specified job
+
+        :param url: str - build URL whose downstream build view is scraped
+        :return: list of dicts with 'name', 'number' and 'result' keys
+        """
+        url = "/".join([url, 'downstreambuildview/'])
+        logger.debug("Request downstream builds data from {}".format(url))
+        response = requests.get(url).text
+        jobs = []
+        # Raw string keeps the regex escape \S intact
+        raw_downstream_builds = re.findall(
+            r'.*downstream-buildview.*href="(/job/\S+/[0-9]+/).*', response)
+        for raw_build in raw_downstream_builds:
+            # raw_build looks like '/job/<name>/<number>/'
+            _, _, sub_job_name, sub_job_build = raw_build.split('/')[:4]
+            build = Build(name=sub_job_name, number=sub_job_build)
+            jobs.append({'name': build.name,
+                         'number': build.number,
+                         'result': build.build_data['result']})
+        return jobs
+
+    @staticmethod
+    def get_jobs_for_view(view):
+        """Return list of job names from specified Jenkins view
+        """
+        view_url = "/".join([JENKINS["url"], 'view', view, 'api/json'])
+        logger.debug("Request view data from {}".format(view_url))
+        view_data = requests.get(view_url).json()
+        jobs = [job["name"] for job in view_data["jobs"]]
+        return jobs
+
+    @staticmethod
+    def get_test_data(url, result_path=None):
+        """ Get build test data from Jenkins from testReport api
+
+        :param url: str - build URL
+        :param result_path: list of str - optional path segments placed
+            between 'testReport' and 'api/json'
+        :return: test_data: dict - parsed JSON of the testReport response
+        """
+        # A falsy result_path contributes no extra path segments
+        test_url = "/".join(
+            [url.rstrip("/"), 'testReport'] + (result_path or []) +
+            ['api/json'])
+        logger.debug("Request test data from {}".format(test_url))
+        return requests.get(test_url).json()
+
+    def get_groups(self, distros):
+        """ Get build test groups from Jenkins
+
+        :param: distros - list of os which shall be included
+        :return: groups: dict - {job suffix: TEST_GROUP value}
+        """
+        res = {}
+
+        def _get_suffix(distros, job_name):
+            # Part of the job name after '.<distro>.', or after the
+            # last dot when no distro matches
+            for distro in distros:
+                if distro in job_name:
+                    return job_name.split('.' + distro + '.')[-1]
+            return job_name.split('.')[-1]
+
+        # A missing 'subBuilds' key is handled like an empty list
+        # instead of raising KeyError
+        sub_builds = self.build_data.get('subBuilds')
+        if not sub_builds:
+            # get_environment_variables is a method: call it
+            env = self.get_environment_variables()
+            res[_get_suffix(distros, self.name)] = env.get('TEST_GROUP')
+            return res
+        for b in sub_builds:
+            if b['result'] is None:
+                logger.debug("Skipping '{0}' job (build #{1}) because it's "
+                             "still running...".format(b['jobName'],
+                                                       b['buildNumber']))
+                continue
+            # Get the test group from the Environment variables
+            env = Build(b['jobName'],
+                        b['buildNumber']).get_environment_variables()
+            res[_get_suffix(distros, b['jobName'])] = env.get('TEST_GROUP')
+        return res
+
+    def get_results(self):
+        """ Get build test data from Jenkins from nosetests.xml
+
+        :param: None
+        :return: test_data: dict - build info or None otherwise
+        """
+        logger.info('Request results from {} {}'.format(self.name,
+                                                        self.number))
+        if not self.build_data:
+            logger.error('Getting subbuilds info is failed. '
+                         'Job={} Build={}'.format(self.name, self.number))
+            return None
+        try:
+            # Best effort: any failure here (e.g. no artifact) yields None
+            artifact_paths = [v for i in self.build_data.get('artifacts')
+                              for k, v in i.items() if k == 'relativePath' and
+                              v == JENKINS.get('xml_testresult_file_name')][0]
+            artifact_url = "/".join([JENKINS['url'], 'job', self.name,
+                                     str(self.number)])
+            xunit_data = self.get_build_artifact(artifact_url, artifact_paths)
+            test_data = xmltodict.parse(xunit_data, xml_attribs=True)
+            test_data.update({
+                'build_number': self.number,
+                'job_name': self.name,
+                'job_url': self.build_data.get('url'),
+                'job_description': self.build_data.get('description'),
+                'job_status': self.build_data.get('result')})
+        except Exception as e:
+            logger.debug('No xunit results: {0}'.format(e))
+            return None
+        return test_data
+
+    def test_data(self, result_path=None):
+        """Return testReport data of the build.
+
+        :param result_path: list of str - optional testReport sub-path
+        :return: dict - testReport JSON; when it cannot be fetched, a stub
+            with the single meta test case 'jenkins' in status 'failed'
+        """
+        try:
+            return self.get_test_data(self.url, result_path)
+        except Exception as e:
+            logger.warning(
+                "No test data for {0}: {1}".format(self.url, e))
+        # If we failed to get any tests for the build, return
+        # meta test case 'jenkins' with status 'failed'.
+        return {
+            "suites": [
+                {
+                    "cases": [{
+                        "name": "jenkins",
+                        "className": "jenkins",
+                        "status": "failed",
+                        "duration": 0
+                    }]
+                }
+            ]
+        }
+
+    def get_downstream_builds(self, status=None):  # status is unused
+        if 'subBuilds' not in self.build_data:
+            return self.get_downstream_builds_from_html(self.build_data['url'])
+        return [{'name': b['jobName'], 'number': b['buildNumber'],
+                 'result': b['result']} for b in self.build_data['subBuilds']]
+
+    def generate_test_plan_name(self):
+        """ Generate name of TestPlan basing on iso image name
+        taken from Jenkins job build parameters"""
+        milestone, iso_number, prefix = self.get_version()
+        return ' '.join(filter(
+            None, (milestone, prefix, 'iso', '#' + str(iso_number))))
+
+    def generate_test_run_name(self):
+        """ Generate name of TestRun basing on iso image name
+        taken from Jenkins job build parameters"""
+        milestone = self.get_version()[0]
+        # A falsy milestone is dropped, giving '[] Swarm'
+        return ''.join(filter(None, ('[', milestone, ']', ' Swarm')))
+
+    def get_job_parameter(self, parameter):
+        """Return the build parameter value (case-insensitive) or None."""
+        actions = [a['parameters'] for a in self.build_data['actions']
+                   if 'parameters' in a]
+        for p in (actions[0] if actions else []):
+            if p['name'].lower() == str(parameter).lower():
+                return p['value']
+
+    def get_version(self):
+        """Return version tuple from parameters, artifacts or upstream
+        job (first hit wins); raise Exception when none provides it."""
+        version = (self.get_version_from_parameters() or
+                   self.get_version_from_artifacts() or
+                   self.get_version_from_upstream_job())
+        if not version:
+            raise Exception('Failed to get iso version from Jenkins jobs '
+                            'parameters/artifacts!')
+        return version
+
+    @staticmethod
+    def get_version_from_iso_name(iso_link):
+        """Parse (version, build number, prefix) out of an iso name/link."""
+        match = re.search(r'.*\bfuel-(?P<prefix1>[a-zA-Z]*)-?(?P<version>\d+'
+                          r'(?P<version2>\.\d+)+)-(?P<prefix2>[a-zA-Z]*)-?'
+                          r'(?P<buildnum>\d+)-.*', iso_link)
+        if match:
+            return (match.group('version'), int(match.group('buildnum')),
+                    match.group('prefix1') or match.group('prefix2'))
+
+    def get_version_from_parameters(self):
+        custom_version = self.get_job_parameter('CUSTOM_VERSION')
+        if custom_version:
+            swarm_timestamp = self.build_data['timestamp'] // 1000 \
+                if 'timestamp' in self.build_data else None
+            return (TestRailSettings.milestone,
+                    time.strftime("%D %H:%M", time.localtime(swarm_timestamp)),
+                    custom_version)
+        iso_link = self.get_job_parameter('magnet_link')
+        if iso_link:
+            return self.get_version_from_iso_name(iso_link)
+
+    def get_version_from_upstream_job(self):
+        """Return version tuple of the triggering upstream build, if any."""
+        if not self.get_job_parameter('UPSTREAM_JOB_URL'):
+            return
+        causes = [a['causes'] for a in self.build_data['actions']
+                  if 'causes' in a][0]
+        if causes:
+            # Only the first cause is considered
+            upstream_build = Build(causes[0]['upstreamProject'],
+                                   causes[0]['upstreamBuild'])
+            return (upstream_build.get_version_from_artifacts() or
+                    upstream_build.get_version_from_parameters())
+
+    def get_version_from_artifacts(self):
+        """Return version tuple parsed from the magnet link artifact."""
+        name = JENKINS['magnet_link_artifact']
+        if not any(a['fileName'] == name
+                   for a in self.build_data['artifacts']):
+            return
+        iso_link = self.get_build_artifact(self.build_data['url'], name)
+        if iso_link:
+            return self.get_version_from_iso_name(iso_link)
+
+    def get_test_build(self, check_rebuild=False):
+        """Get test data from Jenkins job build
+
+        :param check_rebuild: bool, if True then look for newer job rebuild(s)
+        :return: Build - self, or a newer rebuild with the same MAGNET_LINK
+        """
+        last_case = self.test_data()['suites'][0]['cases'].pop()['name']
+        if last_case != 'jenkins' or not check_rebuild:
+            return self
+        # get_job_parameter takes only the parameter name
+        iso_magnet = self.get_job_parameter('MAGNET_LINK')
+        if not iso_magnet:
+            return self
+        # build_data is a dict, so the latest build is looked up via Build
+        latest_build_number = Build(self.name, 'latest').number
+        for n in range(self.number, latest_build_number):
+            test_rebuild = Build(self.name, n + 1)
+            if test_rebuild.get_job_parameter('MAGNET_LINK') == iso_magnet:
+                logger.debug("Found test job rebuild: "
+                             "{0}".format(test_rebuild.url))
+                return test_rebuild
+        return self
+
+    def get_sub_builds(self):
+        """ Gather all sub build info into subbuild list
+
+        :return: sub_builds: list of dicts or None otherwise
+            {build_info, test_data, failure_reasons}
+            where:
+            build_info(sub build specific info got from Jenkins)-dict
+            test_data(test data per one sub build)-dict
+            failure_reasons(failures per one sub build)-list
+        """
+
+        if not self.build_data:
+            return None
+        sub_builds = self.build_data.get('subBuilds')
+        # The dicts inside build_data['subBuilds'] are extended in place
+        # with the results of each sub build
+        for i in sub_builds or []:
+            sub_build = Build(i.get('jobName'), i.get('buildNumber'))
+            if not sub_build.results:
+                continue
+            i.update({
+                'test_data': sub_build.results,
+                'description': sub_build.results.get('job_description'),
+                'failure_reasons': sub_build.failures,
+            })
+        return sub_builds
+
+    def get_build_failure_reasons(self):
+        """ Gather all failure reasons across all tests
+
+        :param: None
+        :return: test_data: list of dicts
+            {failure, failure_origin, test, build_number, job_name,
+             job_url, job_status, test_fail_url}
+            where:
+            failure(failure_origin passed through make_cleanup)-str
+            failure_origin(type and message from nosetests.xml)-str
+            test(@classname was extracted from nosetests.xml)-str
+            build_number(number which extracted from build_info early)-int
+            job_name(Jenkins job name extracted from build_info early)-str
+            job_url(Jenkins job name full URL) - str
+            job_status(Jenkins build result) - str
+            test_fail_url(Jenkins test result URL) - str
+            [] otherwise
+        """
+        failure_reasons = []
+        if not (self.results and self.results.get('testsuite')):
+            return failure_reasons
+        tests = self.results.get('testsuite').get('testcase') or []
+        if isinstance(tests, dict):
+            # xmltodict returns a single dict, not a list, when the suite
+            # holds exactly one <testcase>
+            tests = [tests]
+        for test in tests:
+            failure_reason = None
+            # First outcome present wins, in the order error > failure >
+            # skipped
+            for kind in ('error', 'failure', 'skipped'):
+                if test.get(kind):
+                    failure_reason = "___".join([
+                        kind,
+                        'type',
+                        test.get(kind, {}).get('@type', ''),
+                        'message',
+                        test.get(kind, {}).get('@message', '')])
+                    break
+            if not failure_reason:
+                continue
+            failure_reasons.append({
+                'failure': make_cleanup(failure_reason),
+                'failure_origin': failure_reason,
+                'test': test.get('@classname'),
+                'build_number': self.results.get('build_number'),
+                'job_name': self.results.get('job_name'),
+                'job_url': self.results.get('job_url'),
+                'job_status': self.results.get('job_status'),
+                'test_fail_url': "".join([self.results.get('job_url'),
+                                          'testReport/(root)/',
+                                          test.get('@classname'),
+                                          '/', test.get('@name')])
+            })
+        return failure_reasons
+
+    def __str__(self):
+        """Render build_record() as 'key: value' lines."""
+        return "\n".join(
+            "{0}: {1}".format(*item) for item in self.build_record()
+        )
+
+    def build_record(self):
+        """Return list of pairs.
+
+        We cannot use dictionary, because columns are ordered.
+        """
+
+        data = [
+            ('number', str(self.number)),
+            ('name', self.name),
+            ('requested_number', self.requested_number),
+            ('latest_started', self.latest_started),
+            ('latest_number', self.latest_number),
+            ('id', self.build_data["id"]),
+            ('description', self.build_data["description"]),
+            ('url', self.build_data["url"]),
+        ]
+
+        test_data = self.test_data()
+        for suite in test_data['suites']:
+            for case in suite['cases']:
+                column_id = case['className'].lower().replace("_", "-")
+                data.append((column_id, case['status'].lower()))
+
+        return data
diff --git a/unified_test_reporter/providers/launchpad_client.py b/unified_test_reporter/providers/launchpad_client.py
new file mode 100755
index 0000000..7684578
--- /dev/null
+++ b/unified_test_reporter/providers/launchpad_client.py
@@ -0,0 +1,59 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from __future__ import unicode_literals
+
+from launchpadlib.launchpad import Launchpad
+from unified_test_reporter.providers.providers import BugProvider
+
+
+class LaunchpadBug(BugProvider):
+    """One Launchpad bug, fetched through an anonymous login."""
+
+    def __init__(self, bug_id):
+        self.launchpad = Launchpad.login_anonymously('just testing',
+                                                     'production',
+                                                     '.cache')
+        self.bug = self.launchpad.bugs[int(bug_id)]
+
+    @property
+    def targets(self):
+        return [{
+            'project': task.bug_target_name.split('/')[0],
+            'milestone': str(task.milestone).split('/')[-1],
+            'status': task.status,
+            'importance': task.importance,
+            'title': task.title,
+        } for task in self.bug_tasks]
+
+    def get_bug_title(self):
+        """Return the title of the first bug task, '' if none - str."""
+        targets = self.targets
+        return targets[0].get('title', '') if targets else ''
+
+    def get_duplicate_of(self):
+        bug = self.bug
+        duplicates = []
+        while bug.duplicate_of and bug.id not in duplicates:
+            duplicates.append(bug.id)
+            bug = self.launchpad.load(str(bug.duplicate_of))
+        return LaunchpadBug(bug.id)
+
+    def __getattr__(self, item):
+        # Called only for attributes missing on this object: proxy them
+        # to the wrapped bug. 'bug' itself must not be proxied, otherwise
+        # a missing self.bug would recurse endlessly.
+        if item == 'bug':
+            raise AttributeError(item)
+        return getattr(self.bug, item)
diff --git a/unified_test_reporter/providers/proboscis_provider.py b/unified_test_reporter/providers/proboscis_provider.py
new file mode 100644
index 0000000..43c93d7
--- /dev/null
+++ b/unified_test_reporter/providers/proboscis_provider.py
@@ -0,0 +1,45 @@
+from cached_property import cached_property
+from proboscis import TestPlan
+from proboscis.decorators import DEFAULT_REGISTRY
+from system_test import define_custom_groups
+from system_test import discover_import_tests
+from system_test import get_basepath
+from system_test import register_system_test_cases
+from system_test import tests_directory
+from system_test.tests.base import ActionTest
+
+from unified_test_reporter.providers.providers import TestCaseProvider
+from unified_test_reporter.settings import logger
+
+
+GROUP_FIELD = 'custom_test_group'  # key read from each entry of 'tests'
+
+
+class ProbockisTestCaseProvider(TestCaseProvider):
+    """Test case provider backed by the proboscis registry."""
+
+    def get_cases(self, groups):
+        """Register the given groups and return the resulting TestPlan."""
+        discover_import_tests(get_basepath(), tests_directory)
+        define_custom_groups()
+        for one in groups:
+            register_system_test_cases(one)
+        return TestPlan.create_from_registry(DEFAULT_REGISTRY)
+
+    @staticmethod
+    def _is_case_processable(case, tests):
+        """Tell whether a proboscis case should be processed."""
+        if not case.entry.info.enabled or not hasattr(case.entry, 'parent'):
+            return False
+        parent_home = case.entry.parent.home
+        if issubclass(parent_home, ActionTest) and any(
+                test[GROUP_FIELD] == parent_home.__name__ for test in tests):
+            return False
+        # Skip @before_class methods without doc strings:
+        # they are just pre-checks, not separate tests cases
+        if case.entry.info.before_class:
+            if case.entry.home.func_doc is None:
+                logger.debug('Skipping method "{0}", because it is not a '
+                             'test case'.format(case.entry.home.func_name))
+                return False
+        return True
\ No newline at end of file
diff --git a/unified_test_reporter/providers/providers.py b/unified_test_reporter/providers/providers.py
new file mode 100755
index 0000000..caf2553
--- /dev/null
+++ b/unified_test_reporter/providers/providers.py
@@ -0,0 +1,252 @@
+from cached_property import cached_property
+import re
+import string
+
+from unified_test_reporter.settings import logger
+
+"""
+TESTS_RUNNER=10.0.swarm.runner
+TESTRAIL_TEST_SUITE=[10.0] Swarm
+TESTRAIL_MILESTONE=10.0
+LAUNCHPAD_MILESTONE=10.0
+TESTRAIL_USER=****
+TESTRAIL_PASSWORD=****
+TESTRAIL_PROJECT='Mirantis OpenStack'
+TESTRAIL_URL=https://mirantis.testrail.com
+
+python fuelweb_test/testrail/upload_cases_description.py -v -l -j 10.0.swarm.runner
+get_test_group: https://product-ci.infra.mirantis.net/view/10.0_swarm/job/10.0.system_test.ubuntu.thread_1/76/injectedEnvVars/api/json
+
+
+"""
+class BugProvider(object):
+    @cached_property
+    def get_bugs(self):
+        raise NotImplementedError  # NotImplemented is not an exception
+
+class TestResultProvider(object):
+    @cached_property
+    def get_results(self):
+        raise NotImplementedError  # NotImplemented is not an exception
+
+
+class TestCaseProvider(object):
+    """Base class of test case sources; subclasses supply the cases."""
+    @cached_property
+    def get_groups(self):
+        raise NotImplementedError
+
+    @cached_property
+    def get_cases(self):
+        raise NotImplementedError
+
+    def _is_included(self, case_name, include):
+        # NOTE(review): the log message suggests 'include not in
+        # case_name' was intended - confirm operand order with callers
+        if include and case_name not in include:
+            logger.debug("Skipping '{0}' test because it doesn't contain "
+                         "'{1}' in method name".format(case_name, include))
+            return False
+        return True
+
+    def _is_excluded(self, case_name, exclude):
+        if exclude and case_name in exclude:
+            logger.debug("Skipping '{0}' test because it contains"
+                         " '{1}' in method name".format(case_name, exclude))
+            return True
+        return False
+
+class DocStringProvider(object):
+    """Parse test docstrings into title, steps and duration."""
+
+    STEP_NUM_PATTERN = re.compile(r'^(\d{1,3})[.].+')
+    DURATION_PATTERN = re.compile(r'Duration:?\s+(\d+(?:[sm]|\s?m))(?:in)?\b')
+
+    @staticmethod
+    def _get_docstring(parent_home, case_state, home):
+        """Return the docstring describing a test case."""
+        from system_test.tests.base import ActionTest  # not imported above
+        if not issubclass(parent_home, ActionTest):
+            return home.func_doc or ''
+        docstring = parent_home.__doc__.split('\n')
+        case_state.instance._load_config()
+        docstring[0] = '{0} on {1}'.format(docstring[0],
+                                           case_state.instance.config_name)
+        return '\n'.join(docstring)
+
+    @classmethod
+    def _parse_docstring(cls, s, case):
+        """Split docstring s into (title, steps, duration)."""
+        # Paragraphs are separated by blank lines
+        t, st, d = cls._unpack_docstring(s.strip().split('\n\n'))
+        title = cls._parse_title(t, case) if t else ''
+        steps = cls._parse_steps(st) if st else ''
+        return title, steps, cls._parse_duration(d)
+
+    @staticmethod
+    def _unpack_docstring(items):
+        """Return (title, steps, duration) from docstring paragraphs.
+
+        Missing parts default to ''; paragraphs after the third are ignored.
+        """
+        title, steps, duration = (list(items[:3]) + ['', '', ''])[:3]
+        return title, steps, duration
+
+    @staticmethod
+    def _unpack_list(title, steps, duration, *other):
+        """Return the first three arguments plus the rest as a tuple."""
+        return title, steps, duration, other
+
+    @staticmethod
+    def _parse_title(s, case):
+        """Collapse s to a one-line title; default to the function name."""
+        title = ' '.join(line.strip() for line in s.split('\n'))
+        return title if title else case.entry.home.func_name
+
+    @classmethod
+    def _parse_steps(cls, strings):
+        """Return steps as a list of {'content', 'expected'} dicts.
+
+        A line matching STEP_NUM_PATTERN opens a step; other lines are
+        appended to the current step, or dropped before the first one.
+        """
+        steps = []
+        for s_raw in strings.strip().split('\n'):
+            s = s_raw.strip()
+            _match = cls.STEP_NUM_PATTERN.search(s)
+            if _match:
+                steps.append({'content': _match.group(), 'expected': 'pass'})
+            elif steps:
+                steps[-1]['content'] = ' '.join([steps[-1]['content'], s])
+        return steps
+
+    @classmethod
+    def _parse_duration(cls, s):
+        """Return the duration found in s (e.g. '30m'), or '3m' if none."""
+        match = cls.DURATION_PATTERN.search(s)
+        return match.group(1).replace(' ', '') if match else '3m'
+
+
+class TestPublisher(object):
+    def add_descriptions(self):
+        """ Publish test case description
+        example:
+        {"descriptions": [{
+            "test_name": "Check VirtLib",
+            "test_id": "101",
+            "steps":[
+                {"step_id": "1",
+                 "content": "Step 1",
+                 "expected": "Expected Result 1",
+                 "actual": "Actual Result 1"},
+                {"step_id": "2",
+                 "content": "Step 2",
+                 "expected": "Expected Result 2",
+                 "actual": "Actual Result 2",
+                 "status_id": "2"}]
+        }]}
+        :return: 1/0
+        """
+        raise NotImplementedError  # to be overridden by subclasses
+
+    def add_results(self):
+        """ Publish test case results
+        status_id:
+        1 Passed
+        2 Blocked
+        3 Untested
+        4 Retest
+        5 Failed
+        {"results": [
+            {"test_name": "Check VirtLib",
+             "test_id": 101,
+             "status_id": 5,
+             "comment": "This test failed",
+             "elapsed": "15s",
+             "defects": ["TR-7", "LP-1010"],
+             "steps": [
+                {"step_id": 1,
+                 "status_id": 1},
+                {"step_id": 2,
+                 "expected": "2",
+                 "actual": "3",
+                 "status_id": 5}]
+            },
+            {"test_name": "Check IPMILib",
+             "test_id": 102,
+             "status_id": 1,
+             "comment": "This test passed",
+             "elapsed": "5m"
+            }]}
+        :return: 1/0
+        """
+        raise NotImplementedError  # to be overridden by subclasses
+
+
+class NoseTestTestResultProvider(TestResultProvider):
+    pass  # placeholder: adds nothing to TestResultProvider yet
+
+
+class TestResult(object):
+    """One test result; 'status' maps raw statuses to canonical names."""
+
+    def __init__(self, name, group, status, duration, url=None,
+                 version=None, description=None, comments=None,
+                 launchpad_bug=None, steps=None):
+        self.name = name
+        self.group = group
+        self._status = status
+        self.duration = duration
+        self.url = url
+        self._version = version
+        self.description = description
+        self.comments = comments
+        self.launchpad_bug = launchpad_bug
+        self.available_statuses = {
+            'passed': ['passed', 'fixed'],
+            'failed': ['failed', 'regression'],
+            'skipped': ['skipped'],
+            'blocked': ['blocked'],
+            'custom_status2': ['in_progress']
+        }
+        self._steps = steps
+
+    @property
+    def version(self):
+        # Version string length is limited by 250 symbols because field in
+        # TestRail has type 'String'. This limitation can be removed by
+        # changing field type to 'Text'
+        return (self._version or '')[:250]
+
+    @version.setter
+    def version(self, value):
+        # None is accepted here, as in __init__ and the getter
+        self._version = (value or '')[:250]
+
+    @property
+    def status(self):
+        for s, aliases in self.available_statuses.items():
+            if self._status in aliases:
+                return s
+        logger.error('Unsupported result status: "{0}"!'.format(self._status))
+        return self._status
+
+    @status.setter
+    def status(self, value):
+        self._status = value
+
+    @property
+    def steps(self):
+        return self._steps
+
+    def __str__(self):
+        return str({
+            'name': self.name,
+            'group': self.group,
+            'status': self.status,
+            'duration': self.duration,
+            'url': self.url,
+            'version': self.version,
+            'description': self.description,
+            'comments': self.comments
+        })
diff --git a/unified_test_reporter/providers/pytest_provider.py b/unified_test_reporter/providers/pytest_provider.py
new file mode 100644
index 0000000..34fde7f
--- /dev/null
+++ b/unified_test_reporter/providers/pytest_provider.py
@@ -0,0 +1,30 @@
+# pylint: disable=no-name-in-module
+# noinspection PyUnresolvedReferences
+from pytest import Session
+# pylint: enable=no-name-in-module
+from _pytest.config import _prepareconfig
+from _pytest.python import FixtureManager
+from _pytest.mark import MarkMapping
+
+from unified_test_reporter.providers.providers import TestCaseProvider
+
+
+class PyTestTestCaseProvider(TestCaseProvider):
+    """Look up pytest items by the marks (groups) set on them."""
+
+    @staticmethod
+    def _collect_marks():
+        """Collect the pytest session; return (item, marks) pairs."""
+        config = _prepareconfig(args=str(""))
+        session = Session(config)
+        session._fixturemanager = FixtureManager(session)
+        return [(i, list(MarkMapping(i.keywords)._mymarks))
+                for i in session.perform_collect()]
+
+    def get_cases(self, group):
+        return [i for i, marks in self._collect_marks() if group in marks]
+
+    @staticmethod
+    def group_in(group):
+        return any(group in marks
+                   for _, marks in PyTestTestCaseProvider._collect_marks())
\ No newline at end of file
diff --git a/unified_test_reporter/providers/testrail_client.py b/unified_test_reporter/providers/testrail_client.py
new file mode 100755
index 0000000..c6cf2af
--- /dev/null
+++ b/unified_test_reporter/providers/testrail_client.py
@@ -0,0 +1,618 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from __future__ import unicode_literals
+
+import re
+
+from unified_test_reporter.providers.jenkins_client import Build
+from unified_test_reporter.providers.launchpad_client import LaunchpadBug
+from unified_test_reporter.providers.providers import BugProvider
+from unified_test_reporter.providers.providers import TestCaseProvider
+from unified_test_reporter.providers.providers import TestPublisher
+from unified_test_reporter.providers.providers import TestResultProvider
+from unified_test_reporter.pylib.testrail import APIClient
+from unified_test_reporter.pylib.testrail import APIError
+from unified_test_reporter.settings import TestRailSettings
+from unified_test_reporter.settings import logger
+
+class TestRailProject(TestPublisher, TestCaseProvider,
+ TestResultProvider, BugProvider):
+ """TestRailProject.""" # TODO documentation
+
+ def __init__(self, url, user, password, project):
+     """Create the API client for ``url`` and resolve the project by name.
+
+     :param url: base URL handed to ``APIClient``
+     :param user: stored on the client as ``user``
+     :param password: stored on the client as ``password``
+     :param project: project name; ``self.project`` becomes the matching
+         project dict, or None when no project has that name
+     """
+     api_client = APIClient(base_url=url)
+     api_client.user = user
+     api_client.password = password
+     self.client = api_client
+     self.project = self._get_project(project)
+
+ def _get_project(self, project_name):
+     """Return the first project dict named ``project_name``, else None."""
+     candidates = (project
+                   for project in self.client.send_get(uri='get_projects')
+                   if project['name'] == project_name)
+     return next(candidates, None)
+
+ def test_run_struct(self, name, suite_id, milestone_id, description,
+                     config_ids, include_all=True, assignedto=None,
+                     case_ids=None):
+     """Build the payload dict that describes a test run.
+
+     :param case_ids: when given, ``include_all`` is forced to False and
+         the ids are added under ``case_ids``
+     :param assignedto: when given, it is passed to ``get_user`` and the
+         ``id`` of the returned user is stored as ``assignedto_id``
+     :return: dict with the run fields
+     """
+     run = {
+         'name': name,
+         'suite_id': suite_id,
+         'milestone_id': milestone_id,
+         'description': description,
+         'include_all': False if case_ids else include_all,
+         'config_ids': config_ids
+     }
+     if case_ids:
+         run['case_ids'] = case_ids
+     if assignedto:
+         run['assignedto_id'] = self.get_user(assignedto)['id']
+     return run
+
+ def get_users(self):
+     """Return the response of the ``get_users`` endpoint."""
+     return self.client.send_get(uri='get_users')
+
+ def get_user(self, user_id):
+     """Return the response of ``get_user/<user_id>``."""
+     return self.client.send_get(
+         uri='get_user/{user_id}'.format(user_id=user_id))
+
+ def get_user_by_name(self, name):
+     """Fetch the first user whose ``name`` equals ``name``; None if absent."""
+     found = next((user for user in self.get_users()
+                   if user['name'] == name), None)
+     if found is None:
+         return None
+     return self.get_user(user_id=found['id'])
+
+ def get_configs(self):
+     """Return the response of ``get_configs/<project id>``."""
+     project_id = self.project['id']
+     return self.client.send_get(
+         'get_configs/{project_id}'.format(project_id=project_id))
+
+ def get_config(self, config_id):
+     """Return the config whose ``id`` equals ``int(config_id)``, else None.
+
+     Every entry returned by ``get_configs`` is searched through its
+     ``configs`` list.
+     """
+     matching = (config
+                 for group in self.get_configs()
+                 for config in group['configs']
+                 if config['id'] == int(config_id))
+     return next(matching, None)
+
+ def get_config_by_name(self, name):
+     """Return the first ``get_configs`` entry named ``name``, else None."""
+     named = (entry for entry in self.get_configs()
+              if entry['name'] == name)
+     return next(named, None)
+
+ def get_priorities(self):
+     """Return the response of the ``get_priorities`` endpoint."""
+     return self.client.send_get(uri='get_priorities')
+
+ def get_milestones(self):
+     """Return the response of ``get_milestones/<project id>``."""
+     project_id = self.project['id']
+     return self.client.send_get(
+         uri='get_milestones/{project_id}'.format(project_id=project_id))
+
+ def get_milestone(self, milestone_id):
+     """Return the response of ``get_milestone/<milestone_id>``."""
+     return self.client.send_get(
+         uri='get_milestone/{milestone_id}'.format(
+             milestone_id=milestone_id))
+
+ def get_milestone_by_name(self, name):
+     """Fetch the first milestone named ``name``; None when there is none."""
+     for candidate in self.get_milestones():
+         if candidate['name'] != name:
+             continue
+         return self.get_milestone(milestone_id=candidate['id'])
+     return None
+
+ def get_suites(self):
+     """Return the response of ``get_suites/<project id>``."""
+     project_id = self.project['id']
+     return self.client.send_get(
+         uri='get_suites/{project_id}'.format(project_id=project_id))
+
+ def get_suite(self, suite_id):
+     """Return the response of ``get_suite/<suite_id>``."""
+     return self.client.send_get(
+         uri='get_suite/{suite_id}'.format(suite_id=suite_id))
+
+ def get_suite_by_name(self, name):
+     """Fetch the first suite named ``name``; None when there is none."""
+     found = next((suite for suite in self.get_suites()
+                   if suite['name'] == name), None)
+     if found is None:
+         return None
+     return self.get_suite(suite_id=found['id'])
+
+ def get_sections(self, suite_id):
+     """Return the sections of ``suite_id`` within the current project."""
+     uri_template = 'get_sections/{project_id}&suite_id={suite_id}'
+     return self.client.send_get(uri_template.format(
+         project_id=self.project['id'], suite_id=suite_id))
+
+ def get_section(self, section_id):
+     """Return the response of ``get_section/<section_id>``."""
+     return self.client.send_get(
+         'get_section/{section_id}'.format(section_id=section_id))
+
+ def get_section_by_name(self, suite_id, section_name):
+     """Fetch the first section of the suite named ``section_name``.
+
+     :return: the response of ``get_section`` or None when nothing matches
+     """
+     for candidate in self.get_sections(suite_id=suite_id):
+         if candidate['name'] != section_name:
+             continue
+         return self.get_section(section_id=candidate['id'])
+     return None
+
+ def create_section(self, suite_id, name, parent_id=None):
+     """POST a new section to ``add_section/<project id>``.
+
+     :param suite_id: sent as ``suite_id``
+     :param name: sent as ``name``
+     :param parent_id: sent as ``parent_id`` (None by default)
+     """
+     section_uri = 'add_section/' + str(self.project['id'])
+     payload = dict(suite_id=suite_id, name=name, parent_id=parent_id)
+     return self.client.send_post(section_uri, payload)
+
+ def delete_section(self, section_id):
+     """POST an empty payload to ``delete_section/<section_id>``."""
+     delete_uri = 'delete_section/' + str(section_id)
+     return self.client.send_post(delete_uri, {})
+
+ def create_suite(self, name, description=None):
+     """POST a new suite (name, description) to ``add_suite/<project id>``."""
+     suite_uri = 'add_suite/' + str(self.project['id'])
+     payload = dict(name=name, description=description)
+     return self.client.send_post(suite_uri, payload)
+
+ def get_cases(self, suite_id, section_id=None):
+     """Return the cases of a suite, optionally limited to one section.
+
+     :param suite_id: suite whose cases are requested
+     :param section_id: when truthy, added to the request as the
+         ``section_id`` filter
+     :return: the response of the ``get_cases`` endpoint
+     """
+     cases_uri = 'get_cases/{project_id}&suite_id={suite_id}'.format(
+         project_id=self.project['id'],
+         suite_id=suite_id
+     )
+     if section_id:
+         # The parameter name is passed as an argument so that the ampersand
+         # never sits directly in front of "sect": that sequence had been
+         # decoded as an HTML entity, which corrupted the query string.
+         cases_uri = '{0}&{1}={2}'.format(
+             cases_uri, 'section_id', section_id
+         )
+     return self.client.send_get(cases_uri)
+
+ def get_case(self, case_id):
+     """Return the response of ``get_case/<case_id>``."""
+     return self.client.send_get(
+         'get_case/{case_id}'.format(case_id=case_id))
+
+ def get_case_by_name(self, suite_id, name, cases=None):
+     """Fetch the first case whose ``title`` equals ``name``.
+
+     :param cases: cases to search; when empty or None the cases are
+         loaded with ``get_cases(suite_id)``
+     :return: the response of ``get_case`` or None when nothing matches
+     """
+     pool = cases or self.get_cases(suite_id)
+     found = next((case for case in pool if case['title'] == name), None)
+     if found is None:
+         return None
+     return self.get_case(case_id=found['id'])
+
+ def get_case_by_group(self, suite_id, group, cases=None):
+     """Fetch the first case whose ``custom_test_group`` equals ``group``.
+
+     :param cases: cases to search; when empty or None the cases are
+         loaded with ``get_cases(suite_id)``
+     :return: the response of ``get_case`` or None when nothing matches
+     """
+     pool = cases or self.get_cases(suite_id)
+     for candidate in pool:
+         if candidate['custom_test_group'] != group:
+             continue
+         return self.get_case(case_id=candidate['id'])
+     return None
+
+ def add_case(self, section_id, case):
+     """POST ``case`` to ``add_case/<section_id>``."""
+     return self.client.send_post(
+         'add_case/{section_id}'.format(section_id=section_id), case)
+
+ def update_case(self, case_id, fields):
+     """POST ``fields`` to ``update_case/<case_id>``."""
+     update_uri = 'update_case/{0}'.format(case_id)
+     return self.client.send_post(update_uri, fields)
+
+ def delete_case(self, case_id):
+     """POST to ``delete_case/<case_id>`` with ``None`` as the payload."""
+     delete_uri = 'delete_case/' + str(case_id)
+     return self.client.send_post(delete_uri, None)
+
+ def get_case_fields(self):
+     """Return the response of the ``get_case_fields`` endpoint."""
+     fields_uri = 'get_case_fields'
+     return self.client.send_get(fields_uri)
+
+ def get_plans(self, milestone_ids=None, limit=None, offset=None):
+     """Return the project's test plans, optionally filtered and paged.
+
+     :param milestone_ids: iterable of ids, sent comma-separated as
+         ``milestone_id`` when truthy
+     :param limit: sent as ``limit`` when truthy
+     :param offset: sent as ``offset`` when truthy
+     """
+     uri_parts = ['get_plans/{project_id}'.format(
+         project_id=self.project['id'])]
+     if milestone_ids:
+         uri_parts.append('&milestone_id=' + ','.join(
+             [str(milestone) for milestone in milestone_ids]))
+     if limit:
+         uri_parts.append('&limit={0}'.format(limit))
+     if offset:
+         uri_parts.append('&offset={0}'.format(offset))
+     return self.client.send_get(''.join(uri_parts))
+
+ def get_plan(self, plan_id):
+     """Return the response of ``get_plan/<plan_id>``."""
+     return self.client.send_get(
+         'get_plan/{plan_id}'.format(plan_id=plan_id))
+
+ def get_plan_by_name(self, name):
+     """Fetch the first plan named ``name``; None when there is none."""
+     found = next((plan for plan in self.get_plans()
+                   if plan['name'] == name), None)
+     if found is None:
+         return None
+     return self.get_plan(found['id'])
+
+ def add_plan(self, name, description, milestone_id, entries):
+     """POST a new test plan to ``add_plan/<project id>``.
+
+     :return: the response of the POST request
+     """
+     return self.client.send_post(
+         'add_plan/{project_id}'.format(project_id=self.project['id']),
+         {
+             'name': name,
+             'description': description,
+             'milestone_id': milestone_id,
+             'entries': entries
+         })
+
+ def update_plan(self, plan_id, name='', description='',
+                 milestone_id=None, entries=None):
+     """POST the truthy fields to ``update_plan/<plan_id>``.
+
+     Falsy arguments (empty string, None, empty list) are left out of the
+     payload.
+     """
+     candidates = (
+         ('name', name),
+         ('description', description),
+         ('milestone_id', milestone_id),
+         ('entries', entries),
+     )
+     updated_plan = {}
+     for field, value in candidates:
+         if value:
+             updated_plan[field] = value
+     return self.client.send_post(
+         'update_plan/{plan_id}'.format(plan_id=plan_id), updated_plan)
+
+ def add_plan_entry(self, plan_id, suite_id, config_ids, runs, name=None):
+     """POST a new entry to ``add_plan_entry/<plan_id>``.
+
+     :param name: added to the payload only when truthy
+     """
+     entry_uri = 'add_plan_entry/{plan_id}'.format(plan_id=plan_id)
+     entry = {
+         'suite_id': suite_id,
+         'config_ids': config_ids,
+         'runs': runs
+     }
+     if name:
+         entry['name'] = name
+     return self.client.send_post(entry_uri, entry)
+
+ def delete_plan(self, plan_id):
+     """POST an empty payload to ``delete_plan/<plan_id>``; returns None."""
+     self.client.send_post(
+         'delete_plan/{plan_id}'.format(plan_id=plan_id), {})
+
+ def get_runs(self):
+     """Return the response of ``get_runs/<project id>``."""
+     project_id = self.project['id']
+     return self.client.send_get(
+         uri='get_runs/{project_id}'.format(project_id=project_id))
+
+ def get_run(self, run_id):
+     """Return the response of ``get_run/<run_id>``."""
+     return self.client.send_get(
+         uri='get_run/{run_id}'.format(run_id=run_id))
+
+ def get_run_by_name(self, name):
+     """Fetch the first run from ``get_runs`` named ``name``, else None."""
+     for candidate in self.get_runs():
+         if candidate['name'] != name:
+             continue
+         return self.get_run(run_id=candidate['id'])
+     return None
+
+ def get_previous_runs(self, milestone_id, suite_id, config_id, limit=None):
+     """Collect the runs of a suite/config from the milestone's test plans.
+
+     Plans are fetched page by page through ``get_plans``; every plan entry
+     for ``suite_id`` contributes those of its runs whose ``config_ids``
+     contain ``config_id``.
+
+     :param milestone_id: milestone whose plans are scanned
+     :param suite_id: only plan entries with this suite id are considered
+     :param config_id: only runs listing this config id are kept
+     :param limit: page size and the number of collected runs after which
+         no further page is requested; None scans every plan. The result
+         is not truncated, so it may hold more than ``limit`` runs.
+     :return: list of run dicts
+     """
+     previous_runs = []
+     offset = 0
+
+     # ``limit=None`` used to be compared with and added to integers (a
+     # TypeError on Python 3, a never-entered loop on Python 2); it now
+     # means "no cap".
+     while limit is None or len(previous_runs) < limit:
+         existing_plans = self.get_plans(milestone_ids=[milestone_id],
+                                         limit=limit,
+                                         offset=offset)
+         if not existing_plans:
+             break
+
+         for plan in existing_plans:
+             for entry in self.get_plan(plan['id'])['entries']:
+                 if entry['suite_id'] == suite_id:
+                     previous_runs.extend(
+                         run for run in entry['runs']
+                         if config_id in run['config_ids'])
+
+         # Without a page size, advance by what the server returned.
+         offset += limit if limit is not None else len(existing_plans)
+
+     return previous_runs
+
+ def get_runid_by_planid(self, plan_id, run_name):
+     """Look up the id of a run inside a test plan by the run's name.
+
+     :param plan_id: TestRail test plan id
+     :param run_name: name of the run to find among the plan's entries
+     :return: ``id`` of the first run with that name, or None
+     """
+     testplan = self.get_plan(plan_id)
+     run_ids = (run.get('id')
+                for entry in testplan.get('entries')
+                for run in entry.get('runs')
+                if run.get('name') == run_name)
+     return next(run_ids, None)
+
+ def add_run(self, new_run):
+     """POST ``new_run`` to ``add_run/<project id>``."""
+     project_id = self.project['id']
+     return self.client.send_post(
+         'add_run/{project_id}'.format(project_id=project_id), new_run)
+
+ def update_run(self, name, milestone_id=None, description=None,
+                config_ids=None, include_all=None, case_ids=None):
+     """Update the test run called ``name``.
+
+     Only truthy arguments are sent, except ``include_all`` which is sent
+     whenever it is not None.
+
+     :param name: name of the run to update
+     :return: the response of the ``update_run`` POST request
+     :raises APIError: when no run with that name is found
+     """
+     # ``get_run`` expects a run id, so the run has to be resolved by name.
+     tests_run = self.get_run_by_name(name)
+     if not tests_run:
+         raise APIError('Test run "{0}" not found'.format(name))
+     update_run_uri = 'update_run/{run_id}'.format(run_id=tests_run['id'])
+     fields = {}
+     if milestone_id:
+         fields['milestone_id'] = milestone_id
+     if description:
+         fields['description'] = description
+     if include_all is not None:
+         fields['include_all'] = include_all is True
+     if case_ids:
+         fields['case_ids'] = case_ids
+     if config_ids:
+         fields['config_ids'] = config_ids
+     return self.client.send_post(update_run_uri, fields)
+
+ def create_or_update_run(self, name, suite, milestone_id, description,
+                          config_ids, include_all=True, assignedto=None,
+                          case_ids=None):
+     """Update the run called ``name`` if it exists, otherwise create it.
+
+     :param name: run name used for the lookup and for a new run
+     :param suite: suite id passed to ``test_run_struct`` for a new run
+     :param assignedto: only used when a new run is created
+     """
+     # Look the run up by name: ``get_run`` takes a run id and raises for
+     # anything else, which made the "create" branch unreachable.
+     if self.get_run_by_name(name):
+         self.update_run(name=name,
+                         milestone_id=milestone_id,
+                         description=description,
+                         config_ids=config_ids,
+                         include_all=include_all,
+                         case_ids=case_ids)
+     else:
+         self.add_run(self.test_run_struct(name, suite, milestone_id,
+                                           description, config_ids,
+                                           include_all=include_all,
+                                           assignedto=assignedto,
+                                           case_ids=case_ids))
+
+ def get_statuses(self):
+     """Return the response of the ``get_statuses`` endpoint."""
+     return self.client.send_get('get_statuses')
+
+ def get_status(self, name):
+     """Return the first status dict named ``name``, or None."""
+     named = (status for status in self.get_statuses()
+              if status['name'] == name)
+     return next(named, None)
+
+ def get_tests(self, run_id, status_id=None):
+     """Return the tests of a run, optionally filtered by status.
+
+     :param run_id: run whose tests are requested
+     :param status_id: iterable of status ids (ints or strings); sent
+         comma-separated as ``status_id`` when truthy
+     :return: the response of ``get_tests/<run_id>``
+     """
+     tests_uri = 'get_tests/{run_id}'.format(run_id=run_id)
+     if status_id:
+         # Stringify each id: joining raw integers raises TypeError.
+         tests_uri = '{0}&status_id={1}'.format(
+             tests_uri, ','.join(str(status) for status in status_id))
+     return self.client.send_get(tests_uri)
+
+ def get_test(self, test_id):
+     """Return the response of ``get_test/<test_id>``."""
+     return self.client.send_get(
+         'get_test/{test_id}'.format(test_id=test_id))
+
+ def get_test_by_name(self, run_id, name):
+     """Fetch the first test of the run whose ``title`` equals ``name``."""
+     found = next((test for test in self.get_tests(run_id)
+                   if test['title'] == name), None)
+     if found is None:
+         return None
+     return self.get_test(test_id=found['id'])
+
+ def get_test_by_group(self, run_id, group, tests=None):
+     """Fetch the first test whose ``custom_test_group`` equals ``group``.
+
+     :param tests: tests to search; when empty or None they are loaded
+         with ``get_tests(run_id)``
+     :return: the response of ``get_test`` or None when nothing matches
+     """
+     pool = tests or self.get_tests(run_id)
+     for candidate in pool:
+         if candidate['custom_test_group'] != group:
+             continue
+         return self.get_test(test_id=candidate['id'])
+     return None
+
+ def get_test_by_name_and_group(self, run_id, name, group):
+     """Fetch the first test matching both ``title`` and test group."""
+     matching = (test for test in self.get_tests(run_id)
+                 if test['title'] == name
+                 and test['custom_test_group'] == group)
+     found = next(matching, None)
+     if found is None:
+         return None
+     return self.get_test(test_id=found['id'])
+
+ def get_tests_by_group(self, run_id, group, tests=None):
+     """Fetch every test whose ``custom_test_group`` equals ``group``.
+
+     :param tests: tests to search; when empty or None they are loaded
+         with ``get_tests(run_id)``
+     :return: list of ``get_test`` responses, one per matching test
+     """
+     return [self.get_test(test_id=test['id'])
+             for test in tests or self.get_tests(run_id)
+             if test['custom_test_group'] == group]
+
+ def get_results_for_test(self, test_id, run_results=None):
+     """Return the results of one test.
+
+     :param run_results: when truthy, the results are filtered out of this
+         list by ``test_id`` and no request is made; otherwise they are
+         requested from ``get_results/<test_id>``
+     """
+     if not run_results:
+         return self.client.send_get(
+             'get_results/{test_id}'.format(test_id=test_id))
+     return [result for result in run_results
+             if result['test_id'] == test_id]
+
+ def get_results_for_run(self, run_id, created_after=None,
+                         created_before=None, created_by=None, limit=None,
+                         offset=None, status_id=None):
+     """Return the results of a run, with optional filters.
+
+     Every filter argument that is truthy is appended to the request as a
+     query parameter of the same name.
+     """
+     filters = (
+         ('&created_after={}', created_after),
+         ('&created_before={}', created_before),
+         ('&created_by={}', created_by),
+         ('&limit={}', limit),
+         ('&offset={}', offset),
+         ('&status_id={}', status_id),
+     )
+     results_run_uri = 'get_results_for_run/{run_id}'.format(run_id=run_id)
+     for template, value in filters:
+         if value:
+             results_run_uri += template.format(value)
+     return self.client.send_get(results_run_uri)
+
+ def get_results_for_case(self, run_id, case_id):
+     """Return the response of ``get_results_for_case/<run_id>/<case_id>``."""
+     uri_template = 'get_results_for_case/{run_id}/{case_id}'
+     return self.client.send_get(
+         uri_template.format(run_id=run_id, case_id=case_id))
+
+ def get_all_results_for_case(self, run_ids, case_id):
+     """Gather the results of one case across several runs.
+
+     A run whose request raises ``APIError`` is logged and skipped.
+
+     :return: flat list with the results of all remaining runs
+     """
+     collected = []
+     for run_id in run_ids:
+         try:
+             run_results = self.get_results_for_case(run_id=run_id,
+                                                     case_id=case_id)
+         except APIError as e:
+             logger.error("[{0}], run_id={1}, case_id={2}"
+                          .format(e, run_id, case_id))
+         else:
+             collected.extend(run_results)
+     return collected
+
+ def add_results_for_test(self, test_id, test_results):
+     """Convert a result object to a payload and post it for ``test_id``.
+
+     The comment is built from the description, url and comments of the
+     result (None parts are skipped), joined with newlines.
+
+     :return: the response of ``add_raw_results_for_test``
+     """
+     comment_parts = [part for part in (test_results.description,
+                                        test_results.url,
+                                        test_results.comments)
+                      if part is not None]
+     payload = {
+         'status_id': self.get_status(test_results.status)['id'],
+         'comment': '\n'.join(comment_parts),
+         'elapsed': test_results.duration,
+         'version': test_results.version
+     }
+     if test_results.steps:
+         payload['custom_step_results'] = test_results.steps
+     return self.add_raw_results_for_test(test_id, payload)
+
+ def add_raw_results_for_test(self, test_id, test_raw_results):
+     """POST ``test_raw_results`` to ``add_result/<test_id>``."""
+     return self.client.send_post(
+         'add_result/{test_id}'.format(test_id=test_id), test_raw_results)
+
+ def add_results_for_cases(self, run_id, suite_id, tests_results):
+     """Post the results of several cases to a run in one request.
+
+     Each result is matched to a case of ``suite_id`` through its test
+     group. Step results are attached when the result has steps: if the
+     case defines the same number of steps, their content and expectation
+     are reused, otherwise the step name and ``'pass'`` are sent.
+
+     :param run_id: run that receives the results
+     :param suite_id: suite whose cases are matched by group
+     :param tests_results: iterable of result objects
+     :return: the response of ``add_results_for_cases/<run_id>``
+     """
+     add_results_test_uri = 'add_results_for_cases/{run_id}'.format(
+         run_id=run_id)
+     new_results = {'results': []}
+     tests_cases = self.get_cases(suite_id)
+     known_status_ids = {}
+
+     def status_id_of(status_name):
+         # ``get_status`` costs one HTTP request, so each status name is
+         # resolved only once instead of once per result and per step.
+         if status_name not in known_status_ids:
+             known_status_ids[status_name] = \
+                 self.get_status(status_name)['id']
+         return known_status_ids[status_name]
+
+     for results in tests_results:
+         case = self.get_case_by_group(suite_id=suite_id,
+                                       group=results.group,
+                                       cases=tests_cases)
+         case_id = case['id']
+         new_result = {
+             'case_id': case_id,
+             'status_id': status_id_of(results.status),
+             'comment': '\n'.join(filter(lambda x: x is not None,
+                                         [results.description,
+                                          results.url,
+                                          results.comments])),
+             'elapsed': results.duration,
+             'version': results.version,
+             'custom_launchpad_bug': results.launchpad_bug
+         }
+         if results.steps:
+             custom_step_results = []
+             steps = case.get('custom_test_case_steps', None)
+             if steps and len(steps) == len(results.steps):
+                 for s in zip(steps, results.steps):
+                     custom_step_results.append({
+                         "content": s[0]["content"],
+                         "expected": s[0]["expected"],
+                         "actual": s[1]['actual'],
+                         "status_id": status_id_of(s[1]['status'])
+                     })
+             else:
+                 for s in results.steps:
+                     custom_step_results.append({
+                         "content": s['name'],
+                         "expected": 'pass',
+                         "actual": s['actual'],
+                         "status_id": status_id_of(s['status'])
+                     })
+             new_result['custom_test_case_steps_results'] = \
+                 custom_step_results
+         new_results['results'].append(new_result)
+     return self.client.send_post(add_results_test_uri, new_results)
+
+ def add_results_for_tempest_cases(self, run_id, tests_results):
+     """POST ``tests_results`` as-is to ``add_results_for_cases/<run_id>``."""
+     return self.client.send_post(
+         'add_results_for_cases/{run_id}'.format(run_id=run_id),
+         {'results': tests_results})
+
+ def add_results(self, xunitresults):
+     """Placeholder: accepts ``xunitresults`` and does nothing."""
+
+ def add_descriptions(self, descriptions):
+     """Placeholder: accepts ``descriptions`` and does nothing."""
+
+ @staticmethod
+ def get_color(stat_id, statuses):
+     """Get the colour of a test result status.
+
+     :param stat_id: int - status id
+     :param statuses: list - status dicts extracted from TestRail
+     :return: str - ``#rrggbb`` built from the status' ``color_dark``
+         value (0 when the key is missing), or None when no status has
+         the given id
+     """
+     for stat in statuses:
+         if stat_id == stat.get('id'):
+             # Zero-pad to six digits: a bare hex() drops leading zeros
+             # and yields invalid colours such as "#ff00".
+             return '#{0:06x}'.format(stat.get('color_dark', 0))
+
+ @staticmethod
+ def get_label(stat_id, statuses):
+     """Get the label of a test result status.
+
+     :param stat_id: int - status id
+     :param statuses: list - status dicts extracted from TestRail
+     :return: str - the ``label`` of the first status with that id (the
+         string 'None' when it has no label), or None when no status
+         has the given id
+     """
+     matching = (stat for stat in statuses if stat_id == stat.get('id'))
+     found = next(matching, None)
+     if found is None:
+         return None
+     return found.get('label', 'None')
+
+ def get_testrail_test_urls(self, run_id, test_name):
+     """Get the case url, result url and status of a test in a run.
+
+     :param run_id: TestRail run whose tests are searched
+     :param test_name: string - value of the ``custom_test_group`` field
+     :return: dict with ``testcase_url``, ``testresult_url``,
+         ``testresult_status`` and ``testresult_status_color`` for the
+         first matching test, or {} when no test matches
+     """
+
+     for test in self.get_tests(run_id):
+         if test.get('custom_test_group') != test_name:
+             continue
+         # Fetched once; label and colour come from the same status list.
+         statuses = self.get_statuses()
+         status_id = test.get('status_id')
+         testcase_url = "".join([TestRailSettings.url,
+                                 '/index.php?/cases/view/',
+                                 str(test.get('case_id'))])
+         testresult_url = "".join([TestRailSettings.url,
+                                   '/index.php?/tests/view/',
+                                   str(test.get('id'))])
+         return {'testcase_url': testcase_url,
+                 'testresult_url': testresult_url,
+                 'testresult_status': self.get_label(status_id, statuses),
+                 'testresult_status_color': self.get_color(status_id,
+                                                           statuses)}
+     return {}
+
+ def get_bugs(self, run_id):
+     """Get the Launchpad bugs linked to the results of a run.
+
+     :param run_id: TestRail run
+     :return: dict mapping a test id to the list of its bugs; each bug is
+         a dict with ``id``, ``url``, ``title``, ``label`` and ``color``.
+         Results without a Launchpad bug url are ignored.
+     """
+
+     total_bugs = {}
+     tests = self.get_tests(run_id)
+     results = self.get_results_for_run(run_id)
+     linked = [(t.get('id'),
+                rid.get('custom_launchpad_bug'),
+                rid.get('status_id'))
+               for t in tests
+               for rid in results
+               if t.get('id') == rid.get('test_id')
+               and rid.get('custom_launchpad_bug')]
+     statuses = None
+     for test_id, bug_url, status_id in linked:
+         if 'bugs.launchpad.net' not in bug_url:
+             continue
+         bug_number = re.search(r'.*bugs?/(\d+)/?', bug_url)
+         if bug_number is None:
+             # A Launchpad link without a bug number cannot be resolved.
+             logger.error('Cannot find a bug id in "{0}"'.format(bug_url))
+             continue
+         if statuses is None:
+             # One request serves every bug of the run.
+             statuses = self.get_statuses()
+         iid = int(bug_number.group(1))
+         bug = {'id': iid,
+                'url': bug_url,
+                'title': LaunchpadBug(iid).get_bug_title() or str(iid),
+                'label': self.get_label(status_id, statuses),
+                'color': self.get_color(status_id, statuses)}
+         # The former if/else created the list but never appended to it
+         # (an empty list is falsy), so every bug was dropped.
+         total_bugs.setdefault(test_id, []).append(bug)
+     return total_bugs
+
+ def get_testrail_data_by_jenkinsjob(self, job_name, build_number):
+     """Gather the TestRail data that belongs to a Jenkins build.
+
+     The plan and run names are generated by ``Build`` from the job name
+     and build number.
+
+     :param job_name: Jenkins job name passed to ``Build``
+     :param build_number: Jenkins build number passed to ``Build``
+     :return: dict with ``project``, ``plan``, ``run``, ``milestone``,
+         ``statuses``, ``tests`` and ``results``
+     """
+
+     jenkins_build = Build(job_name, build_number)
+     plan_name = jenkins_build.generate_test_plan_name()
+     run_name = jenkins_build.generate_test_run_name()
+     plan = self.get_plan_by_name(plan_name)
+     run = self.get_run(self.get_runid_by_planid(plan.get('id'), run_name))
+     data = {'project': self.project,
+             'plan': plan,
+             'run': run}
+     data['milestone'] = self.get_milestone_by_name(
+         TestRailSettings.milestone)
+     data['statuses'] = self.get_statuses()
+     data['tests'] = self.get_tests(run.get('id'))
+     data['results'] = self.get_results_for_run(run.get('id'))
+     return data
\ No newline at end of file
diff --git a/unified_test_reporter/pylib/__init__.py b/unified_test_reporter/pylib/__init__.py
new file mode 100755
index 0000000..e69de29
--- /dev/null
+++ b/unified_test_reporter/pylib/__init__.py
diff --git a/unified_test_reporter/pylib/pylib.py b/unified_test_reporter/pylib/pylib.py
new file mode 100755
index 0000000..fdf0471
--- /dev/null
+++ b/unified_test_reporter/pylib/pylib.py
@@ -0,0 +1,114 @@
+from __future__ import division
+
+import functools
+import hashlib
+import re
+
+# Durations in seconds used to build TestRail estimates.
+# A DAY is 8 hours and a WEEK is 5 such days -- presumably working time
+# rather than calendar time; confirm against TestRail's estimate format.
+MINUTE = 60
+HOUR = MINUTE ** 2
+DAY = HOUR * 8
+WEEK = DAY * 5
+
+
+def retry(count=3):
+    """Build a decorator that calls a function up to ``count`` times.
+
+    The wrapped function is called again whenever it raises an
+    ``Exception``; the exception of the last allowed attempt propagates.
+    ``KeyboardInterrupt`` and ``SystemExit`` are never retried.
+
+    :param count: maximum number of attempts (at least one call is made)
+    :return: decorator
+    """
+    def wrapped(func):
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            failures = 0
+            while True:
+                try:
+                    return func(*args, **kwargs)
+                # A bare ``except:`` would also swallow Ctrl-C and
+                # interpreter shutdown and retry them.
+                except Exception:
+                    failures += 1
+                    if failures >= count:
+                        raise
+        return wrapper
+    return wrapped
+
+
+def duration_to_testrail_estimate(duration):
+    """Convert a duration in minutes to the TestRail estimate format.
+
+    The result lists weeks, days, hours and minutes (``w``, ``d``, ``h``,
+    ``m``) separated by spaces; units whose amount is zero are omitted, so
+    a zero duration gives an empty string.
+
+    :param duration: number of minutes; fractions of a second are dropped
+    :return: str, e.g. ``'1h 30m'``
+    """
+    # Work on whole seconds so that a float duration cannot leak into the
+    # output as "1.0m".
+    remainder = int(duration * MINUTE)
+    parts = []
+    for unit, char in ((WEEK, 'w'), (DAY, 'd'), (HOUR, 'h'), (MINUTE, 'm')):
+        val, remainder = divmod(remainder, unit)
+        if val:
+            parts.append('{0}{1}'.format(val, char))
+    return ' '.join(parts)
+
+
+def get_sha(input_string):
+    """Return the SHA-256 hex digest of a string.
+
+    :param input_string: bytes, or text which is encoded as UTF-8 first
+    :return: str - hexadecimal digest
+    """
+
+    # hashlib only accepts bytes; text raises TypeError on Python 3 and
+    # fails on Python 2 as soon as it holds non-ASCII characters.
+    if not isinstance(input_string, bytes):
+        input_string = input_string.encode('utf-8')
+    return hashlib.sha256(input_string).hexdigest()
+
+
+def make_cleanup(input_string):
+    """Strip volatile tokens (IPs, MACs, numbers, hex words) from a string.
+
+    IPv4, IPv6 and MAC addresses are replaced by a single space, every
+    number by ``x`` and every hexadecimal word by a run of ``x``.
+
+    :param input_string: str - input string
+    :return: str - the cleaned-up string
+    """
+
+    ip4re = re.compile(r'\b(?:[0-9]{1,3}\.){3}[0-9]{1,3}\b')
+    ip6re = re.compile(r'\b(?:[a-fA-F0-9]{4}[:|\-]?){8}\b')
+    macre = re.compile(r'\b[a-fA-F0-9]{2}[:][a-fA-F0-9]{2}[:]'
+                       r'[a-fA-F0-9]{2}[:][a-fA-F0-9]{2}[:]'
+                       r'[a-fA-F0-9]{2}[:][a-fA-F0-9]{2}\b')
+    # Flat quantifiers matching exactly what the former nested ones
+    # ((?:[0-9]{1,3}){1,50} and (?:[0-9a-fA-F]{1,8}){1,50}) matched. The
+    # nested form backtracks exponentially on a long digit/hex word that
+    # is followed by another word character (e.g. an md5 plus "_suffix").
+    digitre = re.compile(r'\b[0-9]{1,150}\b')
+    hexre = re.compile(r'\b[0-9a-fA-F]{1,400}\b')
+
+    stmp = input_string
+    for address_re in (ip4re, ip6re, macre):
+        stmp = address_re.sub(' ', stmp)
+    stmp = digitre.sub('x', stmp)
+    listhex = hexre.findall(stmp)
+    if listhex:
+        # Every hex word gets as many "x" as the FIRST hex word is long.
+        # This is what the former loop did as well: its first pass replaced
+        # all matches, leaving nothing for the later passes.
+        # NOTE(review): per-word length may have been intended -- confirm.
+        stmp = hexre.sub('x' * len(listhex[0]), stmp)
+    return stmp
+
+
+def distance(astr, bstr):
+    """Compute the Levenshtein distance between two strings.
+
+    :param astr: str - first string
+    :param bstr: str - second string
+    :return: int - number of single-character insertions, deletions and
+        substitutions that turn one string into the other
+    """
+
+    # Iterate over the longer string so the rows have the shorter length.
+    if len(astr) > len(bstr):
+        astr, bstr = bstr, astr
+    width = len(astr)
+    row = list(range(width + 1))
+    for row_no, bchar in enumerate(bstr, 1):
+        above, row = row, [row_no] + [0] * width
+        for col, achar in enumerate(astr, 1):
+            substitution = above[col - 1] + (0 if achar == bchar else 1)
+            row[col] = min(above[col] + 1, row[col - 1] + 1, substitution)
+    return row[width]
\ No newline at end of file
diff --git a/unified_test_reporter/pylib/testrail.py b/unified_test_reporter/pylib/testrail.py
new file mode 100755
index 0000000..52d6024
--- /dev/null
+++ b/unified_test_reporter/pylib/testrail.py
@@ -0,0 +1,116 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+#
+# TestRail API binding for Python 2.x (API v2, available since
+# TestRail 3.0)
+#
+# Learn more:
+#
+# http://docs.gurock.com/testrail-api2/start
+# http://docs.gurock.com/testrail-api2/accessing
+#
+# Copyright Gurock Software GmbH. See license.md for details.
+#
+
+from __future__ import unicode_literals
+
+import base64
+import time
+
+import requests
+from requests.exceptions import HTTPError
+from requests.packages.urllib3 import disable_warnings
+
+from unified_test_reporter.settings import logger
+
+
+disable_warnings()
+
+
+def request_retry(codes):
+    """Build a decorator that retries a request on selected HTTP errors.
+
+    :param codes: dict mapping an HTTP status code to the maximum number
+        of retries allowed for that code
+    :return: decorator; the wrapped callable must return a response
+        offering ``raise_for_status()`` and ``json()``, and the wrapper
+        returns the decoded JSON body
+    :raises HTTPError: re-raised when the status code is not listed in
+        ``codes`` or the retry budget for it is used up
+    """
+    # Imported here because this module has no top-level functools import.
+    import functools
+
+    log_msg = "Got {0} Error! Waiting {1} seconds and trying again..."
+    default_wait = 60
+
+    def retry_request(func):
+        @functools.wraps(func)
+        def wrapper(*args, **kwargs):
+            iter_number = 0
+            while True:
+                try:
+                    response = func(*args, **kwargs)
+                    response.raise_for_status()
+                except HTTPError as e:
+                    error_code = e.response.status_code
+                    if iter_number >= codes.get(error_code, 0):
+                        raise
+                    try:
+                        wait = int(e.response.headers['Retry-After'])
+                    except (KeyError, ValueError):
+                        # Header missing, or an HTTP-date instead of a
+                        # number of seconds: use the default delay.
+                        wait = default_wait
+                    logger.debug(log_msg.format(error_code, wait))
+                    time.sleep(wait)
+                    iter_number += 1
+                else:
+                    return response.json()
+        return wrapper
+    return retry_request
+
+
+class APIClient(object):
+    """Small TestRail API v2 client using HTTP basic authentication.
+
+    ``user`` and ``password`` are empty after construction and are meant
+    to be assigned by the caller before the first request.
+    """
+
+    def __init__(self, base_url):
+        """Remember the API root derived from ``base_url``.
+
+        :param base_url: TestRail base URL, with or without trailing slash
+        """
+        self.user = ''
+        self.password = ''
+        if not base_url.endswith('/'):
+            base_url += '/'
+        self.__url = base_url + 'index.php?/api/v2/'
+
+    def send_get(self, uri):
+        """Send a GET request for ``uri``; return the decoded JSON body."""
+        return self.__send_request('GET', uri, None)
+
+    def send_post(self, uri, data):
+        """POST ``data`` as JSON to ``uri``; return the decoded JSON body."""
+        return self.__send_request('POST', uri, data)
+
+    def __send_request(self, method, uri, data):
+        """Perform one request, retrying on HTTP 429 and 503.
+
+        :raises APIError: when the request finally fails with an HTTP error
+        """
+        retry_codes = {429: 3,
+                       503: 10}
+
+        @request_retry(codes=retry_codes)
+        def get_response(_url, _headers, _data):
+            if method == 'POST':
+                return requests.post(_url, json=_data, headers=_headers)
+            return requests.get(_url, headers=_headers)
+
+        url = self.__url + uri
+
+        # b64encode instead of encodestring: the latter was removed in
+        # Python 3.9 and wraps its output every 76 characters, which
+        # would put a newline inside the header for long credentials.
+        credentials = '{0}:{1}'.format(self.user, self.password)
+        auth = base64.b64encode(credentials.encode('utf-8')).decode('ascii')
+
+        headers = {'Authorization': 'Basic {}'.format(auth),
+                   'Content-Type': 'application/json'}
+
+        try:
+            return get_response(url, headers, data)
+        except HTTPError as e:
+            # ``message`` only exists on Python 2 exceptions.
+            error = (getattr(e, 'message', None) or str(e) or
+                     'No additional error message received')
+            raise APIError('TestRail API returned HTTP {0}: "{1}"'.format(
+                e.response.status_code, error))
+
+
+class APIError(Exception):
+    """Error raised by ``APIClient`` when a request fails with an HTTP error."""
diff --git a/unified_test_reporter/reports/__init__.py b/unified_test_reporter/reports/__init__.py
new file mode 100755
index 0000000..d5a9f6b
--- /dev/null
+++ b/unified_test_reporter/reports/__init__.py
@@ -0,0 +1,20 @@
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+import os
+
+from unified_test_reporter.settings import LOGS_DIR
+
+# Create the log directory on import. Attempting the creation directly
+# avoids the race between an existence check and ``makedirs`` when several
+# processes import the package at the same time.
+try:
+    os.makedirs(LOGS_DIR)
+except OSError:
+    # An already existing directory is fine; anything else is a real error.
+    if not os.path.isdir(LOGS_DIR):
+        raise
diff --git a/unified_test_reporter/reports/generate_failure_group_statistics.py b/unified_test_reporter/reports/generate_failure_group_statistics.py
new file mode 100755
index 0000000..5a18033
--- /dev/null
+++ b/unified_test_reporter/reports/generate_failure_group_statistics.py
@@ -0,0 +1,515 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+
+from __future__ import division
+
+import argparse
+import json
+import re
+import sys
+from logging import CRITICAL
+from logging import DEBUG
+
+import tablib
+
+from unified_test_reporter.providers.jenkins_client import Build
+from unified_test_reporter.providers.testrail_client import TestRailProject
+from unified_test_reporter.pylib.pylib import distance
+from unified_test_reporter.settings import FAILURE_GROUPING
+from unified_test_reporter.settings import TestRailSettings
+from unified_test_reporter.settings import logger
+
+
def get_bugs(subbuilds, testraildata):
    """Get Launchpad bugs linked to the failed tests

    :param subbuilds: list of dict per each subbuild; failed tests are
           read from the 'failure_reasons' list of every subbuild
    :param testraildata: dict with 'tests' and 'results' lists
           of the TestRail run
    :return: dict - failed test name -> list of bug descriptions
             ({'id', 'url', 'title', 'label', 'color'});
             empty dict if testraildata has no tests
    """

    testrail_tests = testraildata.get('tests')
    if not testrail_tests:
        return {}
    # A run that has no results yet simply has no bugs linked.
    testrail_results = testraildata.get('results') or []

    total_bugs = {str(reason.get('test')): []
                  for build in subbuilds
                  for reason in build.get('failure_reasons') or []}
    # (failed test name, TestRail test id) for every failed test known
    # to TestRail; matching is done by the 'custom_test_group' field.
    matched_tests = [(name, test.get('id'))
                     for name in total_bugs
                     for test in testrail_tests
                     if name == test.get('custom_test_group')]

    for name, test_id in matched_tests:
        for result in testrail_results:
            if test_id != result.get('test_id'):
                continue
            link = result.get('custom_launchpad_bug')
            if not link or 'bugs.launchpad.net' not in link:
                continue
            match = re.search(r'.*bugs?/(\d+)/?', link)
            if match is None:
                # Launchpad link without a numeric bug id - nothing to show
                logger.warning('Can not extract bug id from link "{}" '
                               'of test {}'.format(link, name))
                continue
            bug_id = int(match.group(1))
            status_id = result.get('status_id')
            statuses = Build.get_statuses()
            total_bugs[name].append({
                'id': bug_id,
                'url': link,
                'title': Build.get_bug_title(bug_id) or str(bug_id),
                'label': Build.get_label(status_id, statuses),
                'color': Build.get_color(status_id, statuses)})
    return total_bugs
+
+
def get_global_failure_group_list(
        sub_builds, threshold=FAILURE_GROUPING.get('threshold')):
    """ Filter out and grouping of all failure reasons across all tests

    Failures are grouped by their exact 'failure' text first; then groups
    whose texts are close enough (normalized Levenshtein distance below
    threshold) are merged into one group.

    :param sub_builds: list of dict per each subbuild
    :param threshold: float - upper bound of normalized Levenshtein
           distance for two failure texts to be put into one group
    :return: (failure_group_dict, failure_reasons): tuple
        where:
        failure_group_dict(all failure groups and
        associated failed test info per each failure group) - dict
        failure_reasons(all failures across all subbuild) - list
    """
    # let's find all failures in all builds
    failure_reasons = [failure
                       for build in sub_builds
                       for failure in build.get('failure_reasons') or []]
    # let's group failures by the exact failure text
    failure_group_dict = {}
    for failure in failure_reasons:
        failure_group_dict.setdefault(
            failure.get('failure'), []).append(failure)

    # let's find Levenshtein distance and update failure_group_dict.
    # The key list is a snapshot: groups are removed from the dict while
    # merging, so already merged keys are tracked separately and skipped.
    max_len_diff = FAILURE_GROUPING.get('max_len_diff')
    keys = list(failure_group_dict)
    merged = set()
    for num1, key1 in enumerate(keys):
        # empty/missing failure text can not be compared by distance
        if key1 in merged or not key1:
            continue
        for key2 in keys[num1 + 1:]:
            if key2 in merged or not key2:
                continue
            # let's skip grouping if lengths differ too much
            if float(len(key1)) / len(key2) > max_len_diff:
                continue
            # let's find other failures which can be grouped
            # if normalized Levenshtein distance less threshold
            llen = distance(key1, key2)
            cal_threshold = float(llen) / max(len(key1), len(key2))
            if cal_threshold < threshold:
                # seems we shall combine those groups to one
                failure_group_dict[key1].extend(failure_group_dict[key2])
                logger.info("Those groups are going to be combined"
                            " due to Levenshtein distance\n"
                            " {}\n{}".format(key1, key2))
                del failure_group_dict[key2]
                merged.add(key2)
    return failure_group_dict, failure_reasons
+
+
def update_subbuilds_failuregroup(sub_builds, failure_group_dict,
                                  testrail_testdata, testrail_project, bugs):
    """ update subbuilds by TestRail and Launchpad info

    Every failure reason dict is updated in place with the TestRail urls
    of its test, the list of linked bugs and the name of its failure group.

    :param sub_builds: list of dict per each subbuild
    :param failure_group_dict: dict of failures
    :param testrail_testdata: dict - data extracted from TestRail,
           the run id is read from its 'run' item
    :param testrail_project: object providing get_testrail_test_urls()
    :param bugs: dict - data extracted from launchpad
    :return: None
    """

    failure_reasons_builds = [reason
                              for build in sub_builds
                              for reason in build.get('failure_reasons', [])]
    if not failure_reasons_builds:
        return

    run_id = testrail_testdata.get('run').get('id')
    for fail in failure_reasons_builds:
        fail.update(
            testrail_project.get_testrail_test_urls(run_id,
                                                    fail.get('test')))
        # keep 'bugs' iterable even if nothing is known about the test
        fail['bugs'] = bugs.get(fail.get('test')) or []

    for fgroup, flist in failure_group_dict.items():
        for fail in failure_reasons_builds:
            for ffail in flist:
                # the first matching group wins
                if not fail.get('failure_group') \
                        and fail.get('failure') == ffail.get('failure'):
                    fail['failure_group'] = fgroup
                if fail.get('test') == ffail.get('test'):
                    ffail.update({
                        'testresult_status':
                            fail.get('testresult_status'),
                        'testresult_status_color':
                            fail.get('testresult_status_color'),
                        'testcase_url':
                            fail.get('testcase_url'),
                        'testresult_url':
                            fail.get('testresult_url'),
                        'bugs': fail.get('bugs')})
+
+
def get_statistics(failure_group_dict, format_out=None):
    """ Generate statistics for all failure reasons across all tests

    Note: non html format is going to be flat, i.e. failure_group_dict
    is returned as is
    :param failure_group_dict: dict of failures
           (failure group -> list of failed tests)
    :param format_out: html, json, xls, xlsx, csv, yaml
    :return: statistics; for html it is a dict with keys
             'html_statistics' (failure type -> failure group -> tests),
             'failure_type_count', 'failure_group_count',
             'test_count' and 'bug_count'
    """

    if format_out != 'html':
        return failure_group_dict

    html_statistics = {}
    test_names = set()
    bug_ids = set()
    for failure, tests in failure_group_dict.items():
        # failure type is everything before the message part of the key
        ftype = failure.split('___message___')[0]
        if ftype.startswith('skipped___type___'):
            # skipped tests are not failures
            continue
        html_statistics.setdefault(ftype, {})[failure] = list(tests)
        for test in tests:
            test_names.add(test.get('test'))
            # 'bugs' may be absent or None for a test without bugs
            for bug in test.get('bugs') or []:
                bug_ids.add(bug.get('id'))

    return {'html_statistics': html_statistics,
            'failure_type_count': len(html_statistics),
            'failure_group_count': sum(len(groups) for groups
                                       in html_statistics.values()),
            'test_count': len(test_names),
            'bug_count': len(bug_ids)}
+
+
def _format_failure_message(failure_group):
    """Convert a failure group key into an html 'type: message' text"""
    parts = failure_group.split('___type___')
    # fall back to the whole key if the type separator is missing
    described = parts[1] if len(parts) > 1 else parts[0]
    message = ": ".join(described.split('___message___'))
    message = re.sub('\t', ' ', message)
    return '<br>'.join(message.splitlines())


def _format_bug_links(test):
    """Render bugs of the test as html links separated by line breaks"""
    return "<br>".join(
        '<a href={}>#{}</a>: {}'.format(bug.get('url'),
                                        bug.get('id'),
                                        bug.get('title'))
        for bug in test.get('bugs') or [])


def _format_test_cells(test):
    """Render 'Test' and 'Bug' table cells for one failed test"""
    return ('<td><font color={}>{}</font>'
            '<br><a href={}>{}</a>'
            '<br><a href={}>[job]</a></td>'
            '<td>{}</td>').format(test.get('testresult_status_color'),
                                  test.get('testresult_status'),
                                  test.get('testresult_url'),
                                  test.get('test'),
                                  test.get('test_fail_url'),
                                  _format_bug_links(test))


def _format_failure_rows(html_statistics):
    """Render table rows for all failure types, groups and tests"""
    rows = []
    for failure_type, groups in html_statistics.items():
        # groups without tests have nothing to render
        group_names = sorted(name for name in groups if groups[name])
        test_total = sum(len(groups[name]) for name in group_names)
        for group_num, group_name in enumerate(group_names):
            tests = groups[group_name]
            message = _format_failure_message(group_name)
            if group_num == 0:
                # the first row of a failure type carries the type cell
                # which spans over all tests of all its groups
                leading = ('<td rowspan="{}">count groups:{} / '
                           'count tests:{}<br>{}</td>'
                           '<td rowspan="{}">count tests: {}<br>{}</td>'
                           ).format(test_total, len(group_names),
                                    test_total, failure_type,
                                    len(tests), len(tests), message)
            else:
                leading = '<td rowspan="{}">{}<br>{}</td>'.format(
                    len(tests), len(tests), message)
            for test_num, test in enumerate(tests):
                cells = _format_test_cells(test)
                if test_num == 0:
                    # the group cell spans over all tests of the group
                    cells = leading + cells
                rows.append('<tr>' + cells + '</tr>')
    return "".join(rows)


def dump_statistics(statistics, build_number, job_name,
                    format_output=None, file_output=None):
    """ Save statistics info to file according to requested format
    Note: Please, follow tablib python lib supported formats
    http://docs.python-tablib.org/en/latest/

    non html format is going to be flat
    html format shall use rowspan for tests under one failure group

    :param statistics: dict - statistics; html output reads its
           'html_statistics' item and the counters
    :param build_number: Jenkins build number shown in the report
    :param job_name: Jenkins job name shown in the report
    :param format_output: html, json, xls, xlsx, csv, yaml
    :param file_output: output file path (without extension);
           nothing is written if it or format_output is not set
    :return: None
    """

    filename = None
    if format_output and file_output:
        filename = ".".join([file_output, format_output])

    html_statistics = statistics.get('html_statistics')
    if format_output != 'html':
        # NOTE(review): statistics prepared for non html formats has no
        # 'html_statistics' item as far as this module shows, so 'null'
        # is loaded here - confirm what tablib is expected to get.
        data = tablib.Dataset()
        data.json = json.dumps(html_statistics)
    else:
        # NOTE(review): values are put to html as is, without escaping.
        html_total_count = "<table border=1><tr>" \
                           "<th>Build</th>" \
                           "<th>Job</th>" \
                           "<th>FailureTypeCount</th>" \
                           "<th>FailureGroupCount</th>" \
                           "<th>TestCount</th>" \
                           "<th>BugCount</th></tr>"\
                           "<tr><td><font color='#ff0000'>{}</font>" \
                           "</td><td>{}</td>" \
                           "<td>{}</td>" \
                           "<td><font color='#00ff00'>{}</font></td>" \
                           "<td>{}</td>" \
                           "<td><font color='#0000ff'>{}</font></td>" \
                           "</tr></table>".\
            format(build_number,
                   job_name,
                   statistics.get('failure_type_count'),
                   statistics.get('failure_group_count'),
                   statistics.get('test_count'),
                   statistics.get('bug_count'))
        html = "".join([
            "<html><body>",
            html_total_count,
            "<table border=1><tr><th>FailureType</th>"
            "<th>FailureGroup</th>"
            "<th>Test</th><th>Bug</th></tr>",
            _format_failure_rows(html_statistics or {}),
            "</table></body></html>"])

    if filename:
        with open(filename, 'w') as fileoutput:
            if format_output != 'html':
                fileoutput.write(getattr(data, format_output))
            else:
                fileoutput.write(html)
+
+
def publish_statistics(stat, build_number, job_name):
    """ Publish statistics info to TestRail

    For now the statistics is only rendered as html to
    /tmp/failure_groups_statistics.html; the upload itself is not done.

    :param stat: statistics to be rendered
    :param build_number: Jenkins build number shown in the report
    :param job_name: Jenkins job name shown in the report
    :return: True
    """

    report_base_path = '/tmp/failure_groups_statistics'
    dump_statistics(stat, build_number, job_name,
                    file_output=report_base_path,
                    format_output='html')
    # We've got file and it shall be uploaded to TestRail to custom field
    # but TestRail shall be extended at first. Waiting...
    return True
+
+
def main():
    """Generate failure group statistics for a Jenkins runner build

    Command line arguments are parsed by argparse.

    :return: None on success, otherwise non zero exit code:
             2 - not supported output format
             3 - no subbuilds, 4 - no failure groups,
             5 - no TestRail data, 6 - no TestRail bugs,
             7 - no statistics
    """

    supported_formats = ['json', 'html', 'xls', 'xlsx', 'yaml', 'csv']
    parser = argparse.ArgumentParser(description='Get downstream build info'
                                     ' for Jenkins swarm.runner build.'
                                     ' Generate matrix statistics:'
                                     ' (failure group -> builds & tests).'
                                     ' Publish matrix to Testrail'
                                     ' if necessary.')
    parser.add_argument('-n', '--build-number', type=int, required=False,
                        dest='build_number', help='Jenkins job build number')
    parser.add_argument('-j', '--job-name', type=str,
                        dest='job_name', default='10.0.swarm.runner',
                        help='Name of Jenkins job which runs tests (runner)')
    parser.add_argument('-f', '--format', type=str, dest='formatfile',
                        default='html',
                        help='format statistics: ' +
                             ','.join(supported_formats))
    parser.add_argument('-o', '--out', type=str, dest="fileoutput",
                        default='failure_groups_statistics',
                        help='Save statistics to file')
    parser.add_argument('-t', '--track', action="store_true",
                        help='Publish statistics to TestPlan description')
    parser.add_argument('-q', '--quiet', action="store_true",
                        help='Be quiet (disable logging except critical) '
                             'Overrides "--verbose" option.')
    parser.add_argument("-v", "--verbose", action="store_true",
                        help="Enable debug logging.")
    args = parser.parse_args()

    if args.verbose:
        logger.setLevel(DEBUG)
    # checked after "--verbose" on purpose: "--quiet" wins
    if args.quiet:
        logger.setLevel(CRITICAL)
    if args.formatfile and args.formatfile not in supported_formats:
        logger.info('Not supported format output. Exit')
        return 2

    runner_build = Build(args.job_name, args.build_number)
    testrail_project = TestRailProject(url=TestRailSettings.url,
                                       user=TestRailSettings.user,
                                       password=TestRailSettings.password,
                                       project=TestRailSettings.project)

    if not args.build_number:
        logger.info('Latest build number is {}. Job is {}'.
                    format(runner_build.number, args.job_name))
        args.build_number = runner_build.number

    logger.info('Getting subbuilds for {} {}'.format(args.job_name,
                                                     args.build_number))
    subbuilds = runner_build.get_sub_builds()
    if not subbuilds:
        logger.error('Necessary subbuilds info are absent. Exit')
        return 3
    logger.info('{} Subbuilds have been found'.format(len(subbuilds)))

    logger.info('Calculating failure groups')
    failure_gd = get_global_failure_group_list(subbuilds)[0]
    if not failure_gd:
        logger.error('Necessary failure group info are absent. Exit')
        return 4
    logger.info('{} Failure groups have been found'.format(len(failure_gd)))

    logger.info('Getting TestRail data')
    testrail_testdata = testrail_project.get_testrail_data_by_jenkinsjob(
        args.job_name, args.build_number)
    if not testrail_testdata:
        logger.error('Necessary testrail info are absent. Exit')
        return 5
    logger.info('TestRail data have been downloaded')

    logger.info('Getting TestRail bugs')
    testrail_bugs = get_bugs(subbuilds, testrail_testdata)
    if not testrail_bugs:
        logger.error('Necessary testrail bugs info are absent. Exit')
        return 6
    logger.info('TestRail bugs have been got')

    logger.info('Update subbuilds data')
    update_subbuilds_failuregroup(subbuilds, failure_gd,
                                  testrail_testdata,
                                  testrail_project,
                                  testrail_bugs)
    logger.info('Subbuilds data have been updated')

    logger.info('Generating statistics across all failure groups')
    statistics = get_statistics(failure_gd, format_out=args.formatfile)
    if not statistics:
        logger.error('Necessary statistics info are absent. Exit')
        return 7
    logger.info('Statistics have been generated')

    if args.fileoutput and args.formatfile:
        logger.info('Save statistics')
        dump_statistics(statistics, args.build_number, args.job_name,
                        args.formatfile, args.fileoutput)
        logger.info('Statistics have been saved')
    if args.track:
        logger.info('Publish statistics to TestRail')
        if publish_statistics(statistics, args.build_number, args.job_name):
            logger.info('Statistics have been published')
        else:
            logger.info('Statistics have not been published '
                        'due to internal issue')


if __name__ == '__main__':
    sys.exit(main())
diff --git a/unified_test_reporter/reports/generate_statistics.py b/unified_test_reporter/reports/generate_statistics.py
new file mode 100755
index 0000000..45c0eb4
--- /dev/null
+++ b/unified_test_reporter/reports/generate_statistics.py
@@ -0,0 +1,523 @@
+#!/usr/bin/env python
+#
+# Copyright 2016 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from __future__ import unicode_literals
+
+import json
+import os
+import re
+import sys
+import time
+
+import argparse
+from collections import OrderedDict
+from logging import CRITICAL
+from logging import DEBUG
+
+from unified_test_reporter.providers.jenkins_client import Build
+from unified_test_reporter.providers.launchpad_client import LaunchpadBug
+from unified_test_reporter.reports.report import get_version
+from unified_test_reporter.settings import GROUPS_TO_EXPAND
+from unified_test_reporter.settings import LaunchpadSettings
+from unified_test_reporter.settings import logger
+from unified_test_reporter.settings import TestRailSettings
+from unified_test_reporter.providers.testrail_client import TestRailProject
+
+
def inspect_bug(bug):
    """Choose the target of the bug to be shown in statistics

    Return the first target which matches the project/milestone defined
    in settings and whose status is not a closed one. If there are no
    such targets, then just return the first available target.
    """
    matching_targets = (
        candidate for candidate in bug.targets
        if candidate['project'] == LaunchpadSettings.project
        and LaunchpadSettings.milestone in candidate['milestone']
        and candidate['status'] not in LaunchpadSettings.closed_statuses)
    preferred = next(matching_targets, None)
    if preferred is not None:
        return preferred
    return bug.targets[0]
+
+
def generate_test_plan_name(job_name, build_number):
    """Generate name of TestPlan basing on iso image name

    The version info is taken from Jenkins job build parameters;
    empty parts of the name are omitted.
    """
    build = Build(job_name, build_number)
    milestone, iso_number, prefix = get_version(build.build_data)
    name_parts = (milestone, prefix, 'iso', '#' + str(iso_number))
    return ' '.join(part for part in name_parts if part)
+
+
def get_testrail():
    """Create TestRailProject using connection data from TestRailSettings"""
    logger.info('Initializing TestRail Project configuration...')
    connection = dict(url=TestRailSettings.url,
                      user=TestRailSettings.user,
                      password=TestRailSettings.password,
                      project=TestRailSettings.project)
    return TestRailProject(**connection)
+
+
class TestRunStatistics(object):
    """Statistics for attached bugs in TestRun

    The run, its tests and all its results are downloaded from TestRail
    on creation; bugs statistics is collected on the first access to
    the 'bugs_statistics' property and cached afterwards.
    """

    def __init__(self, project, run_id, check_blocked=False):
        self.project = project
        self.run = self.project.get_run(run_id)
        self.tests = self.project.get_tests(run_id)
        self.results = self.get_results()
        logger.info('Found TestRun "{0}" on "{1}" with {2} tests and {3} '
                    'results'.format(self.run['name'],
                                     self.run['config'] or 'default config',
                                     len(self.tests), len(self.results)))
        # NOTE(review): 'stauses' is the attribute name as it is used
        # here; presumably it is spelled this way in settings - verify.
        self.blocked_statuses = [self.project.get_status(s)['id']
                                 for s in TestRailSettings.stauses['blocked']]
        self.failed_statuses = [self.project.get_status(s)['id']
                                for s in TestRailSettings.stauses['failed']]
        self.check_blocked = check_blocked
        # None means "not collected yet"; an empty dict is a valid
        # (cached) result for a run without linked bugs.
        self._bugs_statistics = None

    def __getitem__(self, item):
        # give access to TestRun fields, e.g. statistics['id']
        return self.run.__getitem__(item)

    def get_results(self):
        """Download all results of the run page by page"""
        results = []
        offset = 0
        while True:
            page_size = TestRailSettings.max_results_per_request
            new_results = self.project.get_results_for_run(
                self.run['id'],
                limit=page_size,
                offset=offset)
            results += new_results
            offset += len(new_results)
            # a page which is not exactly full is the last one
            if len(new_results) != page_size:
                break
        return results

    def get_test_by_group(self, group, version):
        """Find test of the run by its group

        Groups listed in GROUPS_TO_EXPAND are extended with the tests
        thread name extracted from version.

        :return: test dict or None if the test is not found
        """
        if group in GROUPS_TO_EXPAND:
            m = re.search(r'^\d+_(\S+)_on_[\d\.]+', version)
            if m:
                tests_thread = m.group(1)
                group = '{0}_{1}'.format(group, tests_thread)
        for test in self.tests:
            if test['custom_test_group'] == group:
                return test
        logger.error('Test with group "{0}" not found!'.format(group))
        return None

    def handle_blocked(self, test, result):
        """Copy bug link from the upstream failed test to blocked result

        :param test: blocked test
        :param result: blocked result of the test
        :return: bug link added to the result or False if there is
                 nothing to add
        """
        if result['custom_launchpad_bug']:
            return False
        m = re.search(r'Blocked by "(\S+)" test.', result['comment'] or '')
        if m:
            blocked_test_group = m.group(1)
        else:
            logger.warning('Blocked result #{0} for test {1} does '
                           'not have upstream test name in its '
                           'comments!'.format(result['id'],
                                              test['custom_test_group']))
            return False

        if not result['version']:
            logger.debug('Blocked result #{0} for test {1} does '
                         'not have version, can\'t find upstream '
                         'test case!'.format(result['id'],
                                             test['custom_test_group']))
            return False

        bug_link = None
        blocked_test = self.get_test_by_group(blocked_test_group,
                                              result['version'])
        if not blocked_test:
            return False
        logger.debug('Test {0} was blocked by failed test {1}'.format(
            test['custom_test_group'], blocked_test_group))

        blocked_results = self.project.get_results_for_test(
            blocked_test['id'])

        # Since we manually add results to failed tests with statuses
        # ProdFailed, TestFailed, etc. and attach bugs links to them,
        # we could skip original version copying. So look for test
        # results with target version, but allow to copy links to bugs
        # from other results of the same test (newer are checked first)
        if not any(br['version'] == result['version'] and
                   br['status_id'] in self.failed_statuses
                   for br in blocked_results):
            logger.debug('Did not find result for test {0} with version '
                         '{1}!'.format(blocked_test_group, result['version']))
            return False

        for blocked_result in sorted(blocked_results,
                                     key=lambda x: x['id'],
                                     reverse=True):
            if blocked_result['status_id'] not in self.failed_statuses:
                continue

            if blocked_result['custom_launchpad_bug']:
                bug_link = blocked_result['custom_launchpad_bug']
                break

        if bug_link is not None:
            result['custom_launchpad_bug'] = bug_link
            self.project.add_raw_results_for_test(test['id'], result)
            logger.info('Added bug {0} to blocked result of {1} test.'.format(
                bug_link, test['custom_test_group']))
            return bug_link
        return False

    @property
    def bugs_statistics(self):
        """Bugs linked to results of the run

        :return: dict - bug id (str) -> test id ->
                 {'group', 'config', 'blocked'}
        """
        if self._bugs_statistics is not None:
            return self._bugs_statistics
        logger.info('Collecting stats for TestRun "{0}" on "{1}"...'.format(
            self.run['name'], self.run['config'] or 'default config'))

        # collected in a local dict and published at the very end, so
        # a failure in the middle does not leave partial data cached
        statistics = {}
        for test in self.tests:
            logger.debug('Checking "{0}" test...'.format(test['title']))
            # newer results are checked first
            test_results = sorted(
                self.project.get_results_for_test(test['id'], self.results),
                key=lambda x: x['id'], reverse=True)

            linked_bugs = []
            is_blocked = False

            for result in test_results:
                if result['status_id'] in self.blocked_statuses:
                    if self.check_blocked:
                        new_bug_link = self.handle_blocked(test, result)
                        if new_bug_link:
                            linked_bugs.append(new_bug_link)
                            is_blocked = True
                            break
                    if result['custom_launchpad_bug']:
                        linked_bugs.append(result['custom_launchpad_bug'])
                        is_blocked = True
                        break
                if result['status_id'] in self.failed_statuses \
                        and result['custom_launchpad_bug']:
                    linked_bugs.append(result['custom_launchpad_bug'])

            bug_ids = set()
            for link in linked_bugs:
                match = re.search(r'.*bugs?/(\d+)/?', link)
                if match:
                    bug_ids.add(match.group(1))

            for bug_id in bug_ids:
                statistics.setdefault(bug_id, {})[test['id']] = {
                    'group': test['custom_test_group'] or 'manual',
                    'config': self.run['config'] or 'default',
                    'blocked': is_blocked
                }
        self._bugs_statistics = statistics
        return self._bugs_statistics
+
+
class StatisticsGenerator(object):
    """Generate statistics for bugs attached to TestRuns in TestPlan

    Usage: create, call generate() to collect per run statistics,
    then dump()/dump_html()/publish() to get or publish the result.
    """

    # bug status (lower case) -> color of the status in html report
    _STATUS_COLORS = {
        'invalid': 'gray',
        'new': 'red',
        'confirmed': 'red',
        'triaged': 'red',
        'in progress': 'blue',
        'fix committed': 'goldenrod',
        'fix released': 'green',
    }

    def __init__(self, project, plan_id, run_ids=(), handle_blocked=False):
        self.project = project
        self.test_plan = self.project.get_plan(plan_id)
        logger.info('Found TestPlan "{0}"'.format(self.test_plan['name']))

        # all runs of the plan are processed if run_ids is empty
        self.test_runs_stats = [
            TestRunStatistics(self.project, r['id'], handle_blocked)
            for e in self.test_plan['entries'] for r in e['runs']
            if r['id'] in run_ids or len(run_ids) == 0
        ]

        self.bugs_statistics = {}

    @staticmethod
    def _short_title(title):
        """Drop 'in <project>: ' part of bug title and cut it to 100 chars"""
        title = re.sub(r'(Bug\s+#\d+\s+)(in\s+[^:]+:\s+)', r'\g<1>', title)
        return re.sub(r'(.{100}).*', r'\g<1>...', title)

    @staticmethod
    def _test_link_text(index, params):
        """Full text for the first test link, just a number for others"""
        if index > 1:
            return '{}'.format(index)
        return '{0} on {1}'.format(params['group'], params['config'])

    def generate(self):
        """Collect bugs statistics of every TestRun (run id -> bugs)"""
        for test_run in self.test_runs_stats:
            # copies are stored, so merging done later never changes
            # data cached by the TestRun statistics object
            run_bugs = {bug: dict(tests)
                        for bug, tests in test_run.bugs_statistics.items()}
            self.bugs_statistics[test_run['id']] = run_bugs
            logger.info('Found {0} linked bug(s)'.format(len(run_bugs)))

    def update_desription(self, stats):
        """Replace statistics in TestPlan description with stats

        Everything starting from 'Bugs Statistics (generated on ...)'
        line of the current description is dropped and stats is added.

        :return: result of TestPlan update
        """
        # TestPlan may have no description at all
        old_description = self.test_plan['description'] or ''
        new_description = ''
        for line in old_description.split('\n'):
            if not re.match(r'^Bugs Statistics \(generated on .*\)$', line):
                new_description += line + '\n'
            else:
                break
        new_description += '\n' + stats
        return self.project.update_plan(plan_id=self.test_plan['id'],
                                        description=new_description)

    def dump(self, run_id=None):
        """Get statistics per Launchpad bug

        :param run_id: id of TestRun to get statistics for;
               statistics of all runs is joined if it is not set
        :return: OrderedDict - bug id -> bug info and affected tests,
                 bugs with more affected tests go first
        """
        stats = dict()

        # Per run dicts are copied: joining runs and merging duplicates
        # below must not change self.bugs_statistics.
        joint_bugs_statistics = dict()
        if not run_id:
            for run_stats in self.bugs_statistics.values():
                for bug, tests in run_stats.items():
                    joint_bugs_statistics.setdefault(bug, {}).update(tests)
        else:
            # unknown run simply has no bugs
            for bug, tests in self.bugs_statistics.get(run_id, {}).items():
                joint_bugs_statistics[bug] = dict(tests)

        for bug_id in joint_bugs_statistics:
            try:
                lp_bug = LaunchpadBug(bug_id).get_duplicate_of()
            except KeyError:
                logger.warning("Bug with ID {0} not found! Most probably it's "
                               "private or private security.".format(bug_id))
                continue
            bug_target = inspect_bug(lp_bug)

            if lp_bug.bug.id in stats:
                # several linked bugs are duplicates of the same bug
                stats[lp_bug.bug.id]['tests'].update(
                    joint_bugs_statistics[bug_id])
            else:
                stats[lp_bug.bug.id] = {
                    'title': bug_target['title'],
                    'importance': bug_target['importance'],
                    'status': bug_target['status'],
                    'project': bug_target['project'],
                    'link': lp_bug.bug.web_link,
                    'tests': joint_bugs_statistics[bug_id]
                }
            stats[lp_bug.bug.id]['failed_num'] = len(
                [t for t, v in stats[lp_bug.bug.id]['tests'].items()
                 if not v['blocked']])
            stats[lp_bug.bug.id]['blocked_num'] = len(
                [t for t, v in stats[lp_bug.bug.id]['tests'].items()
                 if v['blocked']])

        return OrderedDict(sorted(stats.items(),
                                  key=lambda x: (x[1]['failed_num'] +
                                                 x[1]['blocked_num']),
                                  reverse=True))

    def dump_html(self, stats=None, run_id=None):
        """Render statistics as html

        :param stats: result of dump(); it is generated if not set
        :param run_id: id of TestRun the statistics belongs to
        :return: html text
        """
        if stats is None:
            stats = self.dump(run_id=run_id)

        parts = ['<html xmlns="http://www.w3.org/1999/xhtml" lang="en">\n',
                 '<h2>Bugs Statistics (generated on {0})</h2>\n'.format(
                     time.strftime("%c")),
                 '<h3>TestPlan: "{0}"</h3>\n'.format(self.test_plan['name'])]
        if run_id:
            test_run = [r for r in self.test_runs_stats if r['id'] == run_id]
            if test_run:
                parts.append(
                    '<h4>TestRun: "{0}"</h4>\n'.format(test_run[0]['name']))

        for values in stats.values():
            color = self._STATUS_COLORS.get(values['status'].lower(),
                                            'orange')
            title = self._short_title(values['title'])
            parts.append(
                '[{0:<3} failed TC(s)]'.format(values['failed_num']))
            parts.append(
                '[{0:<3} blocked TC(s)]'.format(values['blocked_num']))
            parts.append(('[{0:^4}][{1:^9}]'
                          '[<b><font color={3}>{2:^13}</font></b>]').format(
                values['project'], values['importance'], values['status'],
                color))
            parts.append(
                '[<a href="{0}">{1}</a>]'.format(values['link'], title))
            for index, (tid, params) in enumerate(values['tests'].items(),
                                                  1):
                parts.append(
                    '[<a href="{0}/index.php?/tests/view/{1}">{2}</a>]'.
                    format(TestRailSettings.url, tid,
                           self._test_link_text(index, params)))
            parts.append('<br>\n')
        parts.append('</html>\n')
        return ''.join(parts)

    def publish(self, stats=None):
        """Publish statistics as a table to TestPlan description

        :param stats: result of dump(); it is generated if not set
        :return: result of TestPlan update
        """
        if stats is None:
            stats = self.dump()

        header = 'Bugs Statistics (generated on {0})\n'.format(
            time.strftime("%c"))
        header += '==================================\n'

        bugs_table = ('|||:Failed|:Blocked|:Project|:Priority'
                      '|:Status|:Bug link|:Tests\n')

        for values in stats.values():
            # square brackets would break the link markup
            title = self._short_title(values['title'])
            title = title.replace('[', '{')
            title = title.replace(']', '}')
            bugs_table += (
                '||{failed}|{blocked}|{project}|{priority}|{status}|').format(
                failed=values['failed_num'], blocked=values['blocked_num'],
                project=values['project'].upper(),
                priority=values['importance'], status=values['status'])
            bugs_table += '[{0}]({1})|'.format(title, values['link'])
            for index, (tid, params) in enumerate(values['tests'].items(),
                                                  1):
                bugs_table += '[{{{0}}}]({1}/index.php?/tests/view/{2}) '.\
                    format(self._test_link_text(index, params),
                           TestRailSettings.url, tid)
            bugs_table += '\n'

        return self.update_desription(header + bugs_table)
+
+
def save_stats_to_file(stats, file_name, html=''):
    """Save statistics to <file_name>.json and html to <file_name>.html

    Existing files are overwritten (with a warning).

    :param stats: statistics, shall be JSON serializable
    :param file_name: path to output files without extension
    :param html: html text; html file is not written if it is empty
    :return: None
    """
    import io

    def warn_file_exists(file_path):
        if os.path.exists(file_path):
            logger.warning('File {0} exists and will be '
                           'overwritten!'.format(file_path))

    json_file_path = '{}.json'.format(file_name)
    warn_file_exists(json_file_path)

    with open(json_file_path, 'w') as f:
        json.dump(stats, f)

    if html:
        html_file_path = '{}.html'.format(file_name)
        warn_file_exists(html_file_path)
        if isinstance(html, bytes):
            html = html.decode('utf-8')
        # html may contain non ASCII characters (e.g. in bug titles),
        # so the encoding is set explicitly instead of relying on
        # the default one
        with io.open(html_file_path, 'w', encoding='utf-8') as f:
            f.write(html)
+
+
def _parse_statistics_args():
    """Describe and parse command line arguments of the script"""
    parser = argparse.ArgumentParser(
        description="Generate statistics for bugs linked to TestRun. Publish "
                    "statistics to testrail if necessary."
    )
    add = parser.add_argument
    add('plan_id', type=int, nargs='?', default=None,
        help='Test plan ID in TestRail')
    add('-j', '--job-name',
        dest='job_name', type=str, default=None,
        help='Name of Jenkins job which runs tests (runner). '
             'It will be used for TestPlan search instead ID')
    add('-n', '--build-number', dest='build_number',
        default='latest', help='Jenkins job build number')
    add('-r', '--run-id',
        dest='run_ids', type=str, default=None,
        help='(optional) IDs of TestRun to check (skip other)')
    add('-b', '--handle-blocked', action="store_true",
        dest='handle_blocked', default=False,
        help='Copy bugs links to downstream blocked results')
    add('-s', '--separate-runs', action="store_true",
        dest='separate_runs', default=False,
        help='Create separate statistics for each test run')
    add('-p', '--publish', action="store_true",
        help='Publish statistics to TestPlan description')
    add('-o', '--out-file', dest='output_file',
        default=None, type=str,
        help='Path to file to save statistics as JSON and/or '
             'HTML. Filename extension is added automatically')
    add('-H', '--html', action="store_true",
        help='Save statistics in HTML format to file '
             '(used with --out-file option)')
    add('-q', '--quiet', action="store_true",
        help='Be quiet (disable logging except critical) '
             'Overrides "--verbose" option.')
    add("-v", "--verbose", action="store_true",
        help="Enable debug logging.")
    return parser.parse_args()


def main():
    """Generate bugs statistics for TestPlan, publish and/or save it

    TestPlan is taken by its ID or found by the name generated from
    the Jenkins job build.

    :return: 1 if there is no TestPlan to process, None otherwise
    """
    args = _parse_statistics_args()

    if args.verbose:
        logger.setLevel(DEBUG)

    # checked after "--verbose" on purpose: "--quiet" wins
    if args.quiet:
        logger.setLevel(CRITICAL)

    testrail_project = get_testrail()

    if args.job_name:
        logger.info('Inspecting {0} build of {1} Jenkins job for TestPlan '
                    'details...'.format(args.build_number, args.job_name))
        test_plan_name = generate_test_plan_name(args.job_name,
                                                 args.build_number)
        test_plan = testrail_project.get_plan_by_name(test_plan_name)
        if not test_plan:
            logger.warning('TestPlan "{0}" not found!'.format(test_plan_name))
        else:
            args.plan_id = test_plan['id']

    if not args.plan_id:
        logger.error('There is no TestPlan to process, exiting...')
        return 1

    if args.run_ids:
        run_ids = tuple(int(run_id) for run_id in args.run_ids.split(','))
    else:
        run_ids = ()

    generator = StatisticsGenerator(testrail_project,
                                    args.plan_id,
                                    run_ids,
                                    args.handle_blocked)
    generator.generate()
    plan_stats = generator.dump()

    if args.publish:
        logger.debug('Publishing bugs statistics to TestRail..')
        generator.publish(plan_stats)

    if args.output_file:
        # args.html itself (False) is passed if html is not requested
        plan_html = args.html
        if plan_html:
            plan_html = generator.dump_html(plan_stats)
        save_stats_to_file(plan_stats, args.output_file, plan_html)

        if args.separate_runs:
            for run in generator.test_runs_stats:
                run_file_name = '{0}_{1}'.format(args.output_file, run['id'])
                run_stats = generator.dump(run_id=run['id'])
                run_html = args.html
                if run_html:
                    run_html = generator.dump_html(run_stats, run['id'])
                save_stats_to_file(run_stats, run_file_name, run_html)

    logger.info('Statistics generation complete!')


if __name__ == "__main__":
    sys.exit(main())
diff --git a/unified_test_reporter/reports/report.py b/unified_test_reporter/reports/report.py
new file mode 100755
index 0000000..e81c01d
--- /dev/null
+++ b/unified_test_reporter/reports/report.py
@@ -0,0 +1,487 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from __future__ import division
+from __future__ import unicode_literals
+
+import re
+from logging import DEBUG
+from optparse import OptionParser
+
+from unified_test_reporter.providers.jenkins_client import Build
+from unified_test_reporter.providers.launchpad_client import LaunchpadBug
+from unified_test_reporter.providers.providers import TestResult
+from unified_test_reporter.providers.testrail_client import TestRailProject
+from unified_test_reporter.pylib.pylib import retry
+from unified_test_reporter.settings import GROUPS_TO_EXPAND
+from unified_test_reporter.settings import JENKINS
+from unified_test_reporter.settings import LaunchpadSettings
+from unified_test_reporter.settings import TestRailSettings
+from unified_test_reporter.settings import logger
+
+
+def expand_test_group(group, systest_build_name, os):
+    """Append the job specific part of a build name to an expandable group.
+
+    Groups which are not listed in GROUPS_TO_EXPAND are returned as is.
+    For the listed ones the tail of the build name (everything after the
+    last '.<os>.' separator, or after the last '.' when the release name
+    is not a part of the build name) is joined to the group with '_'.
+    Example:
+      group: 'setup_master'
+      systest_build_name: '7.0.system_test.ubuntu.bonding_ha_one_controller'
+      os: 'ubuntu'
+      return: 'setup_master_bonding_ha_one_controller'
+
+    :param group: str, test group name
+    :param systest_build_name: str, name of the Jenkins build
+    :param os: str, release name in lower case, for example: 'ubuntu'
+    :return: str, expanded (or untouched) group name
+    """
+    if group not in GROUPS_TO_EXPAND:
+        return group
+
+    delimiter = '.' + os + '.' if os in systest_build_name else '.'
+    suffix = systest_build_name.split(delimiter)[-1]
+    if not suffix:
+        # Nothing to append, e.g. the build name ends with the separator
+        return group
+    return '_'.join((group, suffix))
+
+
+def check_blocked(test):
+    """Change test result status to 'blocked' if it was
+    skipped due to failure of another dependent test.
+
+    Only results with the 'skipped' status are inspected. When the skip
+    message starts with 'Failure in <function NAME at 0x...>' and NAME
+    differs from the name of the test itself, the dict is updated in
+    place: 'status' becomes 'blocked' and 'skippedMessage' names the
+    blocking test.
+
+    :param test: dict, test result info (modified in place)
+    :return: None
+    """
+    if test['status'].lower() != 'skipped':
+        return
+    # The message may be missing or null for a skipped test; re.match()
+    # would raise TypeError on None, so fall back to an empty string.
+    message = test.get('skippedMessage') or ''
+    match = re.match(r'^Failure in <function\s+(\w+)\s+at\s0x[a-f0-9]+>',
+                     message)
+    if not match:
+        return
+    failed_func_name = match.group(1)
+    if test['name'] != failed_func_name:
+        test['status'] = 'blocked'
+        test['skippedMessage'] = 'Blocked by "{0}" test.'.format(
+            failed_func_name)
+
+
+def check_untested(test):
+    """Tell whether a test result is a fake one.
+
+    A result is treated as fake when it is named 'jenkins' and carries no
+    'skippedMessage' key at all.
+
+    :param test: dict
+    :return: bool
+    """
+    is_placeholder = test['name'] == 'jenkins'
+    return is_placeholder and 'skippedMessage' not in test
+
+
+@retry(count=3)
+def get_tests_results(systest_build, os):
+    """Collect TestResult objects for one Jenkins system test build.
+
+    The cases of the first suite of the build test report are grouped by
+    their 'className'; every class is then converted depending on its kind:
+      * a class from the 'fuel_tests.' package gives one result per case;
+      * a class with a single case gives one result (fake results are
+        dropped, results skipped because of another failed test are
+        marked as blocked);
+      * any other class is treated as a single test whose cases named
+        'Step...' are its steps; a class without such cases is skipped.
+
+    :param systest_build: dict, 'name' and 'number' of the Jenkins build
+    :param os: str, release name in lower case, for example: 'ubuntu'
+    :return: list of TestResult
+    """
+    tests_results = []
+    test_build = Build(systest_build['name'],
+                       systest_build['number']).\
+        get_test_build(check_rebuild=True)
+    run_test_data = test_build.test_data()
+
+    # Group the reported cases by the class they belong to
+    test_classes = {}
+    for one in run_test_data['suites'][0]['cases']:
+        class_name = one['className']
+        if class_name not in test_classes:
+            test_classes[class_name] = {
+                'child': [],
+                'duration': 0,
+                "failCount": 0,
+                "passCount": 0,
+                "skipCount": 0,
+            }
+        elif one['className'] == one['name']:
+            # The same "class == test" case was already registered
+            logger.warning("Found duplicate test in run - {}".format(
+                one['className']))
+            continue
+
+        test_class = test_classes[class_name]
+        test_class['child'].append(one)
+        test_class['duration'] += float(one['duration'])
+        case_status = one['status'].lower()
+        if case_status in ('failed', 'error'):
+            test_class["failCount"] += 1
+        elif case_status == 'passed':
+            test_class["passCount"] += 1
+        elif case_status == 'skipped':
+            test_class["skipCount"] += 1
+
+    for klass, klass_result in test_classes.items():
+        fuel_tests_results = []
+        if klass.startswith('fuel_tests.'):
+            # Every case of the class is an independent test
+            for one in klass_result['child']:
+                test_name = one['name']
+                test_package, _, test_class = one['className'].rpartition('.')
+                test_result = TestResult(
+                    name=test_name,
+                    group=expand_test_group(one['name'],
+                                            systest_build['name'],
+                                            os),
+                    status=one['status'].lower(),
+                    duration='{0}s'.format(int(one['duration']) + 1),
+                    url='{0}testReport/{1}/{2}/{3}'.format(
+                        test_build.url,
+                        test_package,
+                        test_class,
+                        test_name),
+                    version='_'.join(
+                        [test_build.build_data["id"]] + (
+                            test_build.build_data["description"] or
+                            test_name).split()),
+                    description=(test_build.build_data["description"] or
+                                 test_name),
+                    comments=one['skippedMessage'],
+                )
+                fuel_tests_results.append(test_result)
+        elif len(klass_result['child']) == 1:
+            test = klass_result['child'][0]
+            if check_untested(test):
+                continue
+            check_blocked(test)
+            test_result = TestResult(
+                name=test['name'],
+                group=expand_test_group(test['className'],
+                                        systest_build['name'],
+                                        os),
+                status=test['status'].lower(),
+                duration='{0}s'.format(int(test['duration']) + 1),
+                url='{0}testReport/(root)/{1}/'.format(test_build.url,
+                                                       test['name']),
+                version='_'.join(
+                    [test_build.build_data["id"]] + (
+                        test_build.build_data["description"] or
+                        test['name']).split()),
+                description=test_build.build_data["description"] or
+                test['name'],
+                comments=test['skippedMessage']
+            )
+        else:
+            # Several cases of one class: the 'Step...' cases are the steps
+            # of a single test
+            case_steps = []
+            test_duration = sum(
+                [float(c['duration']) for c in klass_result['child']])
+            steps = [c for c in klass_result['child']
+                     if c['name'].startswith('Step')]
+            if not steps:
+                # There is nothing to build the result from; steps[0] below
+                # would raise IndexError, so skip the class explicitly
+                logger.warning("No steps found for '{0}' test class, "
+                               "skipping it".format(klass))
+                continue
+            steps = sorted(steps, key=lambda k: k['name'])
+            test_name = steps[0]['className']
+            test_group = steps[0]['className']
+            test_comments = None
+            is_test_failed = any(s['status'].lower() in ('failed', 'error')
+                                 for s in steps)
+
+            for step in steps:
+                step_status = step['status'].lower()
+                if step_status in ('failed', 'error'):
+                    case_steps.append({
+                        "content": step['name'],
+                        "actual": step['errorStackTrace'] or
+                        step['errorDetails'],
+                        "status": step_status})
+                    # The comment of the last failed step wins
+                    test_comments = "{err}\n\n\n{stack}".format(
+                        err=step['errorDetails'],
+                        stack=step['errorStackTrace'])
+                else:
+                    case_steps.append({
+                        "content": step['name'],
+                        "actual": "pass",
+                        "status": step_status
+                    })
+            test_result = TestResult(
+                name=test_name,
+                group=expand_test_group(test_group,
+                                        systest_build['name'],
+                                        os),
+                status='failed' if is_test_failed else 'passed',
+                duration='{0}s'.format(int(test_duration) + 1),
+                url='{0}testReport/(root)/{1}/'.format(test_build.url,
+                                                       test_name),
+                version='_'.join(
+                    [test_build.build_data["id"]] + (
+                        test_build.build_data["description"] or
+                        test_name).split()),
+                description=test_build.build_data["description"] or
+                test_name,
+                comments=test_comments,
+                steps=case_steps,
+            )
+        if fuel_tests_results:
+            tests_results.extend(fuel_tests_results)
+        else:
+            tests_results.append(test_result)
+    return tests_results
+
+
+def publish_results(project, milestone_id, test_plan,
+                    suite_id, config_id, results):
+    """Publish new tests results to the matching test run of a TestPlan.
+
+    The first run of the plan which uses the given suite and configuration
+    receives the results. A result is skipped when its test is not found
+    in the run or when a result with the same version already exists for
+    the test. For results other than 'passed' and 'blocked' a bug link is
+    looked up in the previous tests runs and attached when found.
+
+    :param project: TestRail project client
+    :param milestone_id: ID of the milestone used to find previous runs
+    :param test_plan: dict, TestPlan with its 'entries'
+    :param suite_id: ID of the tests suite
+    :param config_id: ID of the configuration (operation system)
+    :param results: iterable of tests results to publish
+    :return: list of results which were sent to TestRail
+    :raises: any error of 'add_results_for_cases' is logged and re-raised
+    """
+    test_run_ids = [run['id'] for entry in test_plan['entries']
+                    for run in entry['runs'] if suite_id == run['suite_id'] and
+                    config_id in run['config_ids']]
+    if not test_run_ids:
+        # Indexing test_run_ids[0] would raise IndexError; report the
+        # actual problem instead
+        logger.error('No test run found in the TestPlan for suite "{0}" and '
+                     'configuration "{1}", nothing is published.'.format(
+                         suite_id, config_id))
+        return []
+    run_id = test_run_ids[0]
+
+    logger.debug('Looking for previous tests runs on "{0}" using tests suite '
+                 '"{1}"...'.format(project.get_config(config_id)['name'],
+                                   project.get_suite(suite_id)['name']))
+    previous_tests_runs = project.get_previous_runs(
+        milestone_id=milestone_id,
+        suite_id=suite_id,
+        config_id=config_id,
+        limit=TestRailSettings.previous_results_depth)
+    logger.debug('Found next test runs: {0}'.format(
+        [test_run['description'] for test_run in previous_tests_runs]))
+    # The same previous runs are inspected for every result, so compute
+    # their IDs once
+    previous_run_ids = [run['id'] for run in previous_tests_runs[
+        0:int(TestRailSettings.previous_results_depth)]]
+    cases = project.get_cases(suite_id=suite_id)
+    tests = project.get_tests(run_id=run_id)
+    results_to_publish = []
+
+    for result in results:
+        test = project.get_test_by_group(run_id=run_id,
+                                         group=result.group,
+                                         tests=tests)
+        if not test:
+            logger.error("Test for '{0}' group not found: {1}".format(
+                result.group, result.url))
+            continue
+        existing_results_versions = [r['version'] for r in
+                                     project.get_results_for_test(test['id'])]
+        if result.version in existing_results_versions:
+            # This result was already published
+            continue
+        if result.status not in ('passed', 'blocked'):
+            case_id = project.get_case_by_group(suite_id=suite_id,
+                                                group=result.group,
+                                                cases=cases)['id']
+            previous_results = project.get_all_results_for_case(
+                run_ids=previous_run_ids,
+                case_id=case_id)
+            lp_bug = get_existing_bug_link(previous_results)
+            if lp_bug:
+                result.launchpad_bug = lp_bug['bug_link']
+        results_to_publish.append(result)
+
+    if not results_to_publish:
+        return results_to_publish
+
+    try:
+        project.add_results_for_cases(run_id=run_id,
+                                      suite_id=suite_id,
+                                      tests_results=results_to_publish)
+    except Exception:
+        logger.error('Failed to add new results for tests: {0}'.format(
+            [r.group for r in results_to_publish]
+        ))
+        raise
+    return results_to_publish
+
+
+@retry(count=3)
+def get_existing_bug_link(previous_results):
+    """Find a still open bug among the bugs linked to previous results.
+
+    Results with a 'custom_launchpad_bug' link are checked starting from
+    the most recently created one. The bug ID is the last path component
+    of the link; the bug is resolved through
+    LaunchpadBug(bug_id).get_duplicate_of(). The first target of the
+    resolved bug which belongs to the configured Launchpad project and
+    milestone and whose status is not a closed one is returned.
+
+    :param previous_results: list of dicts, results of previous tests runs
+    :return: dict, matching bug target with the extra 'bug_link' key, or
+        None when nothing suitable is found
+    """
+    results_with_bug = [result for result in previous_results if
+                        result["custom_launchpad_bug"] is not None]
+    if not results_with_bug:
+        return
+    for result in sorted(results_with_bug,
+                         key=lambda k: k['created_on'],
+                         reverse=True):
+        try:
+            bug_id = int(result["custom_launchpad_bug"].strip('/').split(
+                '/')[-1])
+        except ValueError:
+            logger.warning('Link "{0}" doesn\'t contain bug id.'.format(
+                result["custom_launchpad_bug"]))
+            continue
+        # Adjacent literals are used instead of a backslash inside the
+        # string: the latter embeds the source indentation in the message
+        try:
+            bug = LaunchpadBug(bug_id).get_duplicate_of()
+        except KeyError:
+            logger.warning("Bug with id '{bug_id}' is private or "
+                           "doesn't exist.".format(bug_id=bug_id))
+            continue
+        except Exception:
+            logger.exception("Strange situation with '{bug_id}' "
+                             "issue".format(bug_id=bug_id))
+            continue
+
+        for target in bug.targets:
+            if target['project'] == LaunchpadSettings.project and\
+                    target['milestone'] == LaunchpadSettings.milestone and\
+                    target['status'] not in LaunchpadSettings.closed_statuses:
+                target['bug_link'] = result["custom_launchpad_bug"]
+                return target
+
+
+def main():
+    """Publish results of system tests from a Jenkins build to TestRail.
+
+    Steps:
+      1. initialize the TestRail project and the list of configurations;
+      2. collect tests results from the downstream builds of the runner
+         job (-j) or from the 'system_test' jobs of a Jenkins view (-w);
+      3. create the TestPlan (or get the existing one) and add the test
+         runs for every configuration;
+      4. upload the collected results.
+
+    With --create-plan-only no results are collected and the function
+    returns right after step 3 has created or found the TestPlan.
+
+    :return: None
+    """
+
+    parser = OptionParser(
+        description="Publish results of system tests from Jenkins build to "
+                    "TestRail. See settings.py for configuration."
+    )
+    parser.add_option('-j', '--job-name', dest='job_name', default=None,
+                      help='Jenkins swarm runner job name')
+    parser.add_option('-N', '--build-number', dest='build_number',
+                      default='latest',
+                      help='Jenkins swarm runner build number')
+    parser.add_option('-o', '--one-job', dest='one_job_name',
+                      default=None,
+                      help=('Process only one job name from the specified '
+                            'parent job or view'))
+    parser.add_option("-w", "--view", dest="jenkins_view", default=False,
+                      help="Get system tests jobs from Jenkins view")
+    parser.add_option("-l", "--live", dest="live_report", action="store_true",
+                      help="Get tests results from running swarm")
+    parser.add_option("-m", "--manual", dest="manual_run", action="store_true",
+                      help="Manually add tests cases to TestRun (tested only)")
+    # The help text used to be a copy of the '--job-name' one
+    parser.add_option('-c', '--create-plan-only', action="store_true",
+                      dest="create_plan_only", default=False,
+                      help='Only create the TestPlan (or find the existing '
+                           'one), do not publish tests results')
+    parser.add_option("-v", "--verbose",
+                      action="store_true", dest="verbose", default=False,
+                      help="Enable debug output")
+
+    (options, _) = parser.parse_args()
+
+    if options.verbose:
+        logger.setLevel(DEBUG)
+
+    if options.live_report and options.build_number == 'latest':
+        options.build_number = 'latest_started'
+
+    # STEP #1
+    # Initialize TestRail Project and define configuration
+    logger.info('Initializing TestRail Project configuration...')
+    project = TestRailProject(url=TestRailSettings.url,
+                              user=TestRailSettings.user,
+                              password=TestRailSettings.password,
+                              project=TestRailSettings.project)
+
+    tests_suite = project.get_suite_by_name(TestRailSettings.tests_suite)
+    # 'distro' is the first word of the configuration name in lower case
+    operation_systems = [{'name': config['name'], 'id': config['id'],
+                          'distro': config['name'].split()[0].lower()}
+                         for config in project.get_config_by_name(
+                             'Operation System')['configs'] if
+                         config['name'] in TestRailSettings.operation_systems]
+    tests_results = {os['distro']: [] for os in operation_systems}
+
+    # STEP #2
+    # Get tests results from Jenkins
+    logger.info('Getting tests results from Jenkins...')
+    if options.jenkins_view:
+        jobs = Build.get_jobs_for_view(options.jenkins_view)
+        tests_jobs = [{'name': j, 'number': 'latest'}
+                      for j in jobs if 'system_test' in j] if \
+            not options.create_plan_only else []
+        runner_jobs = [j for j in jobs if 'runner' in j]
+        if not runner_jobs:
+            # Taking the first element of an empty list would raise
+            # IndexError
+            logger.error("No runner job found in '{0}' Jenkins view. "
+                         "Exiting..".format(options.jenkins_view))
+            return
+        runner_build = Build(runner_jobs[0], 'latest')
+    elif options.job_name:
+        runner_build = Build(options.job_name, options.build_number)
+        tests_jobs = runner_build.get_downstream_builds() if \
+            not options.create_plan_only else []
+    else:
+        logger.error("Please specify either Jenkins swarm runner job name (-j)"
+                     " or Jenkins view with system tests jobs (-w). Exiting..")
+        return
+
+    for systest_build in tests_jobs:
+        if (options.one_job_name and
+                options.one_job_name != systest_build['name']):
+            logger.debug("Skipping '{0}' because --one-job is specified"
+                         .format(systest_build['name']))
+            continue
+        if options.job_name:
+            if 'result' not in systest_build:
+                logger.debug("Skipping '{0}' job because it does't run tests "
+                             "(build #{1} contains no results)".format(
+                                 systest_build['name'],
+                                 systest_build['number']))
+                continue
+            if systest_build['result'] is None:
+                logger.debug("Skipping '{0}' job (build #{1}) because it's sti"
+                             "ll running...".format(systest_build['name'],
+                                                    systest_build['number'],))
+                continue
+        # The results go to every distro mentioned in the build name
+        for os in tests_results:
+            if os in systest_build['name'].lower():
+                tests_results[os].extend(get_tests_results(systest_build, os))
+
+    # STEP #3
+    # Create new TestPlan in TestRail (or get existing) and add TestRuns
+    # NOTE(review): 'get_version' is neither defined nor imported in this
+    # module, so this call looks like a NameError at run time - confirm
+    # where it has to be imported from.
+    milestone, iso_number, prefix = get_version(runner_build.build_data)
+    milestone = project.get_milestone_by_name(name=milestone)
+
+    # Empty parts (e.g. an empty prefix) are dropped from the plan name
+    test_plan_name = ' '.join(
+        filter(None,
+               (milestone['name'], prefix, 'iso', '#' + str(iso_number))))
+
+    test_plan = project.get_plan_by_name(test_plan_name)
+    iso_job_name = '{0}{1}.all'.format(milestone['name'],
+                                       '-{0}'.format(prefix) if prefix
+                                       else '')
+    iso_link = '/'.join([JENKINS['url'], 'job', iso_job_name, str(iso_number)])
+    if not test_plan:
+        test_plan = project.add_plan(test_plan_name,
+                                     description=iso_link,
+                                     milestone_id=milestone['id'],
+                                     entries=[]
+                                     )
+        logger.info('Created new TestPlan "{0}".'.format(test_plan_name))
+    else:
+        logger.info('Found existing TestPlan "{0}".'.format(test_plan_name))
+
+    if options.create_plan_only:
+        return
+
+    plan_entries = []
+    all_cases = project.get_cases(suite_id=tests_suite['id'])
+    for os in operation_systems:
+        cases_ids = []
+        if options.manual_run:
+            # Only the cases which have results are listed for the run
+            all_results_groups = [r.group for r in tests_results[os['distro']]]
+            for case in all_cases:
+                if case['custom_test_group'] in all_results_groups:
+                    cases_ids.append(case['id'])
+        plan_entries.append(
+            project.test_run_struct(
+                name='{suite_name}'.format(suite_name=tests_suite['name']),
+                suite_id=tests_suite['id'],
+                milestone_id=milestone['id'],
+                description='Results of system tests ({tests_suite}) on is'
+                'o #"{iso_number}"'.format(tests_suite=tests_suite['name'],
+                                           iso_number=iso_number),
+                config_ids=[os['id']],
+                include_all=True,
+                case_ids=cases_ids
+            )
+        )
+
+    # Add the entry only when the plan has no entry for the suite yet
+    if not any(entry['suite_id'] == tests_suite['id']
+               for entry in test_plan['entries']):
+        if project.add_plan_entry(plan_id=test_plan['id'],
+                                  suite_id=tests_suite['id'],
+                                  config_ids=[os['id'] for os
+                                              in operation_systems],
+                                  runs=plan_entries):
+            test_plan = project.get_plan(test_plan['id'])
+
+    # STEP #4
+    # Upload tests results to TestRail
+    logger.info('Uploading tests results to TestRail...')
+    for os in operation_systems:
+        logger.info('Checking tests results for "{0}"...'.format(os['name']))
+        results_to_publish = publish_results(
+            project=project,
+            milestone_id=milestone['id'],
+            test_plan=test_plan,
+            suite_id=tests_suite['id'],
+            config_id=os['id'],
+            results=tests_results[os['distro']]
+        )
+        logger.debug('Added new results for tests ({os}): {tests}'.format(
+            os=os['name'], tests=[r.group for r in results_to_publish]
+        ))
+
+    logger.info('Report URL: {0}'.format(test_plan['url']))
+
+
+# Run the report only when the module is executed as a script; the value
+# returned by main() is not used as the exit status here.
+if __name__ == "__main__":
+    main()
diff --git a/unified_test_reporter/reports/report_pi.py b/unified_test_reporter/reports/report_pi.py
new file mode 100755
index 0000000..a87e88b
--- /dev/null
+++ b/unified_test_reporter/reports/report_pi.py
@@ -0,0 +1,209 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from __future__ import unicode_literals
+
+from logging import DEBUG
+from optparse import OptionParser
+
+import requests
+
+from unified_test_reporter.providers.jenkins_client import Build
+from unified_test_reporter.reports.report import get_tests_results
+from unified_test_reporter.reports.report import publish_results
+from unified_test_reporter.settings import JENKINS
+from unified_test_reporter.settings import logger
+from unified_test_reporter.settings import TestRailSettings
+from unified_test_reporter.providers.testrail_client import TestRailProject
+
+
+def find_run_by_name(test_plan, run_name):
+    """Return the first test run of the test plan which has the given name.
+
+    The runs are looked through entry by entry; None is returned when no
+    run of the plan is named 'run_name'.
+    """
+    plan_runs = (run
+                 for entry in test_plan['entries']
+                 for run in entry['runs'])
+    return next((run for run in plan_runs if run['name'] == run_name), None)
+
+
+def get_job_info(url):
+    """Request '<url>/api/json' and return the decoded JSON of the response.
+
+    :param url: str, URL which 'api/json' is appended to
+    :return: the result of 'json()' of the HTTP response
+    """
+    api_url = "/".join((url, 'api/json'))
+    logger.debug("Request job info from %s", api_url)
+    response = requests.get(api_url)
+    return response.json()
+
+
+def main():
+    """Publish results of the runs of a Jenkins build to TestRail.
+
+    The runs listed in the build data of the runner job are grouped by the
+    milestone and the ISO number taken from the description of every run,
+    then by the ID of the matching configuration. For every milestone and
+    ISO a TestPlan is created (or found) and the results are published per
+    configuration.
+    """
+    parser = OptionParser(
+        description="Publish results of system tests from Jenkins build to "
+                    "TestRail. See settings.py for configuration."
+    )
+    parser.add_option('-j', '--job-name', dest='job_name', default=None,
+                      help='Jenkins swarm runner job name')
+    parser.add_option('-N', '--build-number', dest='build_number',
+                      default='latest',
+                      help='Jenkins swarm runner build number')
+    parser.add_option("-l", "--live", dest="live_report", action="store_true",
+                      help="Get tests results from running swarm")
+    parser.add_option("-v", "--verbose",
+                      action="store_true", dest="verbose", default=False,
+                      help="Enable debug output")
+
+    (options, _) = parser.parse_args()
+
+    if options.verbose:
+        logger.setLevel(DEBUG)
+
+    # For a live report the default build is the latest started one
+    if options.live_report and options.build_number == 'latest':
+        build_number = 'latest_started'
+    else:
+        build_number = options.build_number
+
+    # STEP #1
+    # Initialize TestRail Project and define configuration
+    logger.info('Initializing TestRail Project configuration...')
+    project = TestRailProject(url=TestRailSettings.url,
+                              user=TestRailSettings.user,
+                              password=TestRailSettings.password,
+                              project=TestRailSettings.project)
+    logger.info('Initializing TestRail Project configuration... done')
+
+    # 'distro' is the first word of the configuration name in lower case
+    operation_systems = [{'name': config['name'], 'id': config['id'],
+                          'distro': config['name'].split()[0].lower()}
+                         for config in project.get_config_by_name(
+                             'Operation System')['configs']]
+    # Configurations per milestone; the lookups below use os_mile[<mile>]
+    # directly, so any other milestone raises KeyError
+    os_mile = {'6.1': ['Centos 6.5', 'Ubuntu 14.04'],
+               '6.0.1': ['Centos 6.5', 'Ubuntu 12.04']}
+
+    # {milestone: {iso number: {configuration ID: [tests results]}}}
+    tests_results = {}
+
+    # STEP #2
+    # Get tests results from Jenkins
+    # NOTE(review): options.job_name is not validated; without '-j' it is
+    # None and the name concatenation below fails - confirm it is required.
+    runner_build = Build(options.job_name, build_number)
+    runs = runner_build.build_data['runs']
+
+    # Analyze each test individually
+    for run_one in runs:
+        if '5.1' in run_one['url']:
+            continue  # Release 5.1 to skip
+        tests_result = get_job_info(run_one['url'])
+        if not tests_result['description']:
+            continue  # Not completed results to skip
+        if 'skipping' in tests_result['description']:
+            continue  # Not performed tests to skip
+        # The job name and the build number are the two last components
+        # of the URL path; the milestone and the ISO number are the parts
+        # of the first word of the description split by '-'
+        tests_job = {'result': tests_result['result'],
+                     'name': (options.job_name + '/' +
+                              tests_result['url'].split('/')[-3]),
+                     'number': int(tests_result['url'].split('/')[-2]),
+                     'mile': (tests_result['description'].
+                              split()[0].split('-')[0]),
+                     'iso': (int(tests_result['description'].
+                             split()[0].split('-')[1]))}
+        if tests_job['mile'] not in tests_results:
+            tests_results[tests_job['mile']] = {}
+        test_mile = tests_results[tests_job['mile']]
+        if tests_job['iso'] not in test_mile:
+            test_mile[tests_job['iso']] = {}
+        test_iso = test_mile[tests_job['iso']]
+        for os in operation_systems:
+            # The distro has to be a part of the job name and the
+            # configuration has to be listed for the milestone
+            if os['distro'] in tests_job['name'].lower() and\
+                    os['name'] in os_mile[tests_job['mile']]:
+                if os['id'] not in test_iso:
+                    test_iso[os['id']] = []
+                test_os_id = test_iso[os['id']]
+                test_os_id.extend(get_tests_results(tests_job, os['distro']))
+
+    # STEP #3
+    # Create new TestPlan in TestRail (or get existing) and add TestRuns
+    for mile in tests_results:
+        # The suite name is the configured one followed by the milestone
+        mile_tests_suite = '{0}{1}'.format(TestRailSettings.tests_suite, mile)
+        logger.info(mile_tests_suite)
+        tests_suite = project.get_suite_by_name(mile_tests_suite)
+        milestone = project.get_milestone_by_name(name=mile)
+        for iso_number in tests_results.get(mile, {}):
+            # Create new TestPlan name check the same name in testrail
+            test_plan_name = '{milestone} iso #{iso_number}'.format(
+                milestone=milestone['name'],
+                iso_number=iso_number)
+            test_plan = project.get_plan_by_name(test_plan_name)
+            if not test_plan:
+                test_plan = project.add_plan(
+                    test_plan_name,
+                    description='/'.join([JENKINS['url'],
+                                          'job',
+                                          '{0}.all'.format(milestone['name']),
+                                          str(iso_number)]),
+                    milestone_id=milestone['id'],
+                    entries=[])
+                logger.info('Created new TestPlan "{0}".'
+                            .format(test_plan_name))
+            else:
+                logger.info('Found existing TestPlan "{0}".'
+                            .format(test_plan_name))
+            plan_entries = []
+            # Create a test plan entry
+            # NOTE(review): the nesting of the run structure under the
+            # 'if' below was recovered from a source with lost
+            # indentation - confirm against the original file.
+            config_ids = []
+            for os in operation_systems:
+                if os['name'] in os_mile[mile]:
+                    config_ids.append(os['id'])
+                    cases_ids = []
+                    plan_entries.append(
+                        project.test_run_struct(
+                            name=tests_suite['name'],
+                            suite_id=tests_suite['id'],
+                            milestone_id=milestone['id'],
+                            description=('Results of system tests ({t_suite})'
+                                         ' on iso #"{iso_number}"'
+                                         .format(t_suite=tests_suite['name'],
+                                                 iso_number=iso_number)),
+                            config_ids=[os['id']],
+                            include_all=True,
+                            case_ids=cases_ids))
+            # Create a test plan entry with the test run
+            run = find_run_by_name(test_plan, tests_suite['name'])
+            if not run:
+                logger.info('Adding a test plan entry with test run %s ...',
+                            tests_suite['name'])
+                entry = project.add_plan_entry(plan_id=test_plan['id'],
+                                               suite_id=tests_suite['id'],
+                                               config_ids=config_ids,
+                                               runs=plan_entries)
+                logger.info('The test plan entry has been added.')
+                run = entry['runs'][0]
+            # Re-read the plan so that it contains the current entries
+            test_plan = project.get_plan(test_plan['id'])
+
+            # STEP #4
+            # Upload tests results to TestRail
+            logger.info('Uploading tests results to TestRail...')
+            for os_id in tests_results.get(mile, {})\
+                    .get(iso_number, {}):
+                logger.info('Checking tests results for %s...',
+                            project.get_config(os_id)['name'])
+                tests_added = publish_results(
+                    project=project,
+                    milestone_id=milestone['id'],
+                    test_plan=test_plan,
+                    suite_id=tests_suite['id'],
+                    config_id=os_id,
+                    results=tests_results[mile][iso_number][os_id])
+                logger.debug('Added new results for tests (%s): %s',
+                             project.get_config(os_id)['name'],
+                             [r.group for r in tests_added])
+
+            logger.info('Report URL: %s', test_plan['url'])
+
+
+# Run the report only when the module is executed as a script
+if __name__ == "__main__":
+    main()
diff --git a/unified_test_reporter/reports/report_tempest_results.py b/unified_test_reporter/reports/report_tempest_results.py
new file mode 100755
index 0000000..50ca994
--- /dev/null
+++ b/unified_test_reporter/reports/report_tempest_results.py
@@ -0,0 +1,281 @@
+#!/usr/bin/env python
+#
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from __future__ import unicode_literals
+
+import optparse
+from xml.etree import ElementTree
+
+# pylint: disable=import-error
+# noinspection PyUnresolvedReferences
+from six.moves import urllib
+# pylint: enable=import-error
+
+from unified_test_reporter.reports import report
+from unified_test_reporter.settings import JENKINS
+from unified_test_reporter.settings import logger
+from unified_test_reporter.settings import TestRailSettings
+from unified_test_reporter.providers.testrail_client import TestRailProject
+
+
+# Module-level alias of the logger imported from the settings
+LOG = logger
+
+
+def parse_xml_report(path_to_report):
+    """This function parses the Tempest XML report and returns the list with
+    TestResult objects. Each TestResult object corresponds to one of the tests
+    and contains all the result information for the respective test.
+
+    Only the 'testcase' elements which are direct children of the root
+    element are processed. A test case without child elements is 'passed';
+    otherwise the tag of its first child is the status ('failure' is
+    reported as 'failed') and the text of that child is the description.
+
+    :param path_to_report: path to the XML report (or a file object)
+    :return: list of TestResult
+    """
+
+    tree = ElementTree.parse(path_to_report)
+    test_results = []
+    for elem in tree.findall('testcase'):
+        status = 'passed'
+        description = None
+        # Element.getchildren() was removed in Python 3.9; iterating an
+        # element yields its direct children
+        child_elem = list(elem)
+        if child_elem:
+            status = child_elem[0].tag
+            description = child_elem[0].text
+
+        test_result = report.TestResult(name=elem.get('name'),
+                                        group=elem.get('classname'),
+                                        status='failed'
+                                        if status == 'failure' else status,
+                                        description=description,
+                                        duration=1)
+        test_results.append(test_result)
+
+    return test_results
+
+
+def _mark_all_tests(client, tests_suite, status):
+    """Build one TestResult with the given status per case of the suite."""
+    return [report.TestResult(name=case['title'],
+                              group=case['custom_test_group'],
+                              status=status,
+                              description=None,
+                              duration=1)
+            for case in client.get_cases(tests_suite['id'])]
+
+
+def mark_all_tests_as_blocked(client, tests_suite):
+    """This function marks all Tempest tests as blocked and returns the list
+    with TestResult objects. Each TestResult object corresponds to one of
+    the tests and contains the information that the test is blocked.
+
+    :param client: TestRail client which provides 'get_cases'
+    :param tests_suite: dict, tests suite with its 'id'
+    :return: list of TestResult with the 'blocked' status
+    """
+
+    return _mark_all_tests(client, tests_suite, 'blocked')
+
+
+def mark_all_tests_as_in_progress(client, tests_suite):
+    """This function marks all Tempest tests as "in progress" and returns
+    the list with TestResult objects. Each TestResult object corresponds
+    to one of the tests and contains the information that the test is
+    "in progress" status.
+
+    :param client: TestRail client which provides 'get_cases'
+    :param tests_suite: dict, tests suite with its 'id'
+    :return: list of TestResult with the 'in_progress' status
+    """
+
+    return _mark_all_tests(client, tests_suite, 'in_progress')
+
+
+def find_run_by_name_and_config_in_test_plan(test_plan, run_name, config):
+    """Return the first run of the test plan which has both the given name
+    and the given configuration (for example, Centos 6.5), or None when
+    the plan contains no such run.
+    """
+
+    plan_runs = (run
+                 for entry in test_plan['entries']
+                 for run in entry['runs'])
+    matching = (run for run in plan_runs
+                if run['name'] == run_name and run['config'] == config)
+    return next(matching, None)
+
+
+def find_run_by_config_in_test_plan_entry(test_plan_entry, config):
+    """Return the first run of the test plan entry which has the given
+    configuration (for example, Ubuntu 14.04), or None when the entry
+    contains no such run.
+    """
+
+    matching = (run for run in test_plan_entry['runs']
+                if run['config'] == config)
+    return next(matching, None)
+
+
+def upload_test_results(client, test_run, suite_id, test_results):
+    """This function allows to upload large number of test results
+    with the minimum number of API requests to TestRail.
+
+    The cases of the suite are requested once and the ID of every status
+    is requested once per distinct status. A result named with
+    'setUpClass' is applied to every case whose 'custom_test_group'
+    contains the group taken from the result name; any other result is
+    applied to every case whose title contains the result name. All the
+    collected pairs are sent with a single request.
+
+    :param client: TestRail client
+    :param test_run: dict, test run with its 'id'
+    :param suite_id: ID of the tests suite
+    :param test_results: iterable of results with 'name' and 'status'
+    :return: None
+    """
+
+    test_cases = client.get_cases(suite_id)
+    results = []
+    statuses = {}
+
+    for test_result in test_results:
+        if test_result.status not in statuses:
+            statuses[test_result.status] = client.get_status(
+                test_result.status)['id']
+        status_id = statuses[test_result.status]
+
+        if 'setUpClass' in test_result.name:
+            i = test_result.name.find('tempest')
+            if i == -1:
+                # Without this check the group would be an empty string,
+                # which is contained in every group, so every case of the
+                # suite would receive this status
+                LOG.warning('Cannot find the test group in "{0}", the '
+                            'result is skipped.'.format(test_result.name))
+                continue
+            # The group starts at 'tempest'; the last character of the
+            # name is dropped (presumably a closing bracket - TODO confirm)
+            group = test_result.name[i:-1]
+            for test in test_cases:
+                # The field may be absent or null for a case
+                if group in (test.get("custom_test_group") or ''):
+                    results.append({"case_id": test['id'],
+                                    "status_id": status_id})
+        else:
+            for test in test_cases:
+                if test_result.name in (test.get("title") or ''):
+                    results.append({"case_id": test['id'],
+                                    "status_id": status_id})
+
+    client.add_results_for_tempest_cases(test_run['id'], results)
+
+
+def main():
+    """Publish the results of Tempest tests in TestRail.
+
+    Steps:
+      1. initialize the TestRail project client;
+      2. get the test results: parse the Tempest XML report, or mark all
+         the cases of the suite as 'blocked' / 'in progress';
+      3. find the test plan '<milestone> [<prefix> ]iso #<iso>' or create
+         it, and add a plan entry with one test run per configured
+         operation system when the requested run does not exist yet;
+      4. upload the results to the run of the requested configuration.
+
+    :raises optparse.OptionValueError: when the run name, the ISO number
+        or (unless -b / -t is used) the path to the report is missing
+    """
+    parser = optparse.OptionParser(
+        description='Publish the results of Tempest tests in TestRail')
+    parser.add_option('-r', '--run-name', dest='run_name',
+                      help='The name of a test run. '
+                           'The name should describe the configuration '
+                           'of the environment where Tempest tests were run')
+    parser.add_option('-i', '--iso', dest='iso_number', help='ISO number')
+    parser.add_option('-p', '--path-to-report', dest='path',
+                      help='The path to the Tempest XML report')
+    parser.add_option('-c', '--conf', dest='config', default='Ubuntu 14.04',
+                      help='The name of one of the configurations')
+    parser.add_option('-m', '--multithreading', dest='threads_count',
+                      default=100, help='The count of threads '
+                                        'for uploading the test results')
+    parser.add_option('-b', '--block-all-tests',
+                      dest='all_tests_blocked', action='store_true',
+                      help='Mark all Tempest tests as "blocked"')
+    parser.add_option('-t', '--tests-in-progress',
+                      dest='tests_in_progress', action='store_true',
+                      help='Mark all Tempest tests as "in progress"')
+    # The prefix is a string value: with the former 'store_true' action
+    # the option was set to True and appending ' ' to it raised TypeError
+    parser.add_option('--prefix',
+                      dest='prefix', default='',
+                      help='Add some prefix to test run')
+
+    (options, _) = parser.parse_args()
+
+    if options.run_name is None:
+        raise optparse.OptionValueError('No run name was specified!')
+    if options.iso_number is None:
+        raise optparse.OptionValueError('No ISO number was specified!')
+    if (options.path is None and
+            not options.all_tests_blocked and not options.tests_in_progress):
+        raise optparse.OptionValueError('No path to the Tempest '
+                                        'XML report was specified!')
+
+    # STEP #1
+    # Initialize TestRail project client
+    LOG.info('Initializing TestRail project client...')
+    client = TestRailProject(url=TestRailSettings.url,
+                             user=TestRailSettings.user,
+                             password=TestRailSettings.password,
+                             project=TestRailSettings.project)
+    LOG.info('TestRail project client has been initialized.')
+
+    tests_suite = client.get_suite_by_name(TestRailSettings.tests_suite)
+    LOG.info('Tests suite is "{0}".'.format(tests_suite['name']))
+
+    # STEP #2
+    # Parse the test results
+    if options.all_tests_blocked:
+        test_results = mark_all_tests_as_blocked(client, tests_suite)
+    elif options.tests_in_progress:
+        test_results = mark_all_tests_as_in_progress(client, tests_suite)
+    else:
+        LOG.info('Parsing the test results...')
+        test_results = parse_xml_report(options.path)
+        LOG.info('The test results have been parsed.')
+
+    # STEP #3
+    # Create new test plan (or find existing)
+    name = '{0} {1}iso #{2}'
+    # A non-empty prefix is separated from 'iso' with a space (strings are
+    # compared by value here, not by identity)
+    if options.prefix:
+        options.prefix += ' '
+
+    milestone = client.get_milestone_by_name(TestRailSettings.milestone)
+    test_plan_name = name.format(milestone['name'], options.prefix,
+                                 options.iso_number)
+    LOG.info('Test plan name is "{0}".'.format(test_plan_name))
+
+    LOG.info('Trying to find test plan "{0}"...'.format(test_plan_name))
+    test_plan = client.get_plan_by_name(test_plan_name)
+    if not test_plan:
+        LOG.info('The test plan not found. Creating one...')
+        url = '/job/{0}.all/{1}'.format(milestone['name'], options.iso_number)
+        description = urllib.parse.urljoin(JENKINS['url'], url)
+        test_plan = client.add_plan(test_plan_name,
+                                    description=description,
+                                    milestone_id=milestone['id'],
+                                    entries=[])
+        LOG.info('The test plan has been created.')
+    else:
+        LOG.info('The test plan found.')
+
+    # Get ID of each OS from list "TestRailSettings.operation_systems"
+    config_ids = []
+    for os_name in TestRailSettings.operation_systems:
+        for conf in client.get_config_by_name('Operation System')['configs']:
+            if conf['name'] == os_name:
+                config_ids.append(conf['id'])
+                break
+
+    # Define test runs for CentOS and Ubuntu
+    run_name = 'Tempest - ' + options.run_name
+    runs = []
+    for conf_id in config_ids:
+        run = client.test_run_struct(name=run_name,
+                                     suite_id=tests_suite['id'],
+                                     milestone_id=milestone['id'],
+                                     description='Tempest results',
+                                     config_ids=[conf_id])
+        runs.append(run)
+
+    # Create a test plan entry with the test runs
+    run = find_run_by_name_and_config_in_test_plan(test_plan,
+                                                   run_name, options.config)
+    if not run:
+        LOG.info('Adding a test plan entry with test run '
+                 '"{0} ({1})" ...'.format(run_name, options.config))
+        entry = client.add_plan_entry(plan_id=test_plan['id'],
+                                      suite_id=tests_suite['id'],
+                                      config_ids=config_ids,
+                                      runs=runs,
+                                      name=run_name)
+        LOG.info('The test plan entry has been added.')
+        run = find_run_by_config_in_test_plan_entry(entry, options.config)
+
+    # STEP #4
+    # Upload the test results to TestRail for the specified test run
+    LOG.info('Uploading the test results to TestRail...')
+
+    upload_test_results(client, run, tests_suite['id'], test_results)
+
+    LOG.info('The results of Tempest tests have been uploaded.')
+    LOG.info('Report URL: {0}'.format(test_plan['url']))
+
+# Run the publisher only when the module is executed as a script
+if __name__ == "__main__":
+    main()
diff --git a/unified_test_reporter/reports/upload_cases_description.py b/unified_test_reporter/reports/upload_cases_description.py
new file mode 100755
index 0000000..18fdb5d
--- /dev/null
+++ b/unified_test_reporter/reports/upload_cases_description.py
@@ -0,0 +1,300 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from __future__ import unicode_literals
+
+
+from logging import DEBUG
+from optparse import OptionParser
+
+from unified_test_reporter.settings import GROUPS_TO_EXPAND
+from unified_test_reporter.settings import logger
+from unified_test_reporter.settings import TestRailSettings
+from unified_test_reporter.providers.testrail_client import TestRailProject
+from unified_test_reporter.pylib.pylib import duration_to_testrail_estimate
+
+
+def get_tests_descriptions(milestone_id,
+ tests_include, tests_exclude,
+ groups,
+ default_test_priority):
+ """Build TestRail test case dictionaries for the given test groups.
+
+ :param milestone_id: value stored under "milestone_id" in every case
+ :param tests_include: filter passed to _is_not_included() with the case name
+ :param tests_exclude: filter passed to _is_excluded() with the case name
+ :param groups: mapping of Jenkins job suffix to test group name
+ :param default_test_priority: value stored under "priority_id" in every case
+ :return: list of test case dicts (title, estimate, steps, group, ...)
+
+ NOTE(review): _create_test_plan_from_registry, group_in_pytest,
+ get_cases_from_pytest, _parse_docstring, _is_case_processable,
+ _is_not_included, _is_excluded, _get_docstring and GROUP_FIELD are
+ neither defined nor imported in this module - confirm where they
+ are supposed to come from.
+ """
+ plan = _create_test_plan_from_registry(groups=groups)
+ # Keep a copy of the unfiltered test list; plan.tests is restored from it
+ # further below.
+ all_plan_tests = plan.tests[:]
+
+ tests = []
+
+ for jenkins_suffix in groups:
+ group = groups[jenkins_suffix]
+ # Groups recognised by group_in_pytest() are described from pytest cases.
+ if group_in_pytest(group):
+ for case in get_cases_from_pytest(group):
+ docstring = case.obj.__doc__ or ''
+
+ title, steps, duration = _parse_docstring(docstring, case)
+
+ test_group = case.obj.__name__
+
+ test_case = {
+ "title": title,
+ "type_id": 1,
+ "milestone_id": milestone_id,
+ "priority_id": default_test_priority,
+ "estimate": duration,
+ "refs": "",
+ "custom_test_group": test_group,
+ "custom_test_case_description": docstring or " ",
+ "custom_test_case_steps": steps
+ }
+ tests.append(test_case)
+ else:
+ # Any other group is resolved by filtering the registry test plan.
+ plan.filter(group_names=[group])
+ for case in plan.tests:
+ if not _is_case_processable(case=case, tests=tests):
+ continue
+
+ case_name = test_group = _get_test_case_name(case)
+
+ if _is_not_included(case_name, tests_include) or \
+ _is_excluded(case_name, tests_exclude):
+ continue
+
+ docstring = _get_docstring(parent_home=case.entry.parent.home,
+ case_state=case.state,
+ home=case.entry.home)
+
+ title, steps, duration = _parse_docstring(docstring, case)
+
+ # NOTE(review): func_name exists only on Python 2 function objects.
+ if case.entry.home.func_name in GROUPS_TO_EXPAND:
+ """Expand specified test names with the group names that are
+ used in jenkins jobs where this test is started.
+ """
+ title = ' - '.join([title, jenkins_suffix])
+ test_group = '_'.join([case.entry.home.func_name,
+ jenkins_suffix])
+
+ test_case = {
+ "title": title,
+ "type_id": 1,
+ "milestone_id": milestone_id,
+ "priority_id": default_test_priority,
+ "estimate": duration,
+ "refs": "",
+ "custom_test_group": test_group,
+ "custom_test_case_description": docstring or " ",
+ "custom_test_case_steps": steps
+ }
+
+ # Add the case only if no collected case already has the same group.
+ if not any([x[GROUP_FIELD] == test_group for x in tests]):
+ tests.append(test_case)
+ else:
+ logger.warning("Testcase '{0}' run in multiple "
+ "Jenkins jobs!".format(test_group))
+
+ # Restore the unfiltered test list saved at the top of the function.
+ plan.tests = all_plan_tests[:]
+
+ return tests
+
+
+def upload_tests_descriptions(testrail_project, section_id,
+                              tests, check_all_sections):
+    """Create or update TestRail cases so that they match ``tests``.
+
+    A test whose GROUP_FIELD value already exists among the fetched cases is
+    updated (only the fields reported by _get_fields_to_update()); any other
+    test is added to ``section_id`` after required custom fields have been
+    filled with defaults.
+
+    :param testrail_project: TestRail client object
+    :param section_id: section that receives new cases; it also limits the
+        lookup of existing cases unless ``check_all_sections`` is true
+    :param tests: list of test case dicts
+    :param check_all_sections: look for existing cases in the whole suite
+
+    NOTE(review): GROUP_FIELD is neither defined nor imported in this
+    module - confirm where it is supposed to come from.
+    """
+    tests_suite = testrail_project.get_suite_by_name(
+        TestRailSettings.tests_suite)
+    check_section = None if check_all_sections else section_id
+    cases = testrail_project.get_cases(suite_id=tests_suite['id'],
+                                       section_id=check_section)
+
+    # Index the existing cases by group once instead of scanning the whole
+    # list for every test; setdefault() keeps the first case of a group,
+    # which is the one a linear search would have returned.
+    cases_by_group = {}
+    for case in cases:
+        cases_by_group.setdefault(case[GROUP_FIELD], case)
+
+    custom_cases_fields = _get_custom_cases_fields(
+        case_fields=testrail_project.get_case_fields(),
+        project_id=testrail_project.project['id'])
+
+    for test_case in tests:
+        testrail_case = cases_by_group.get(test_case[GROUP_FIELD])
+
+        if testrail_case is None:
+            # New case: fill required custom fields that the test lacks.
+            for case_field, default_value in custom_cases_fields.items():
+                if case_field not in test_case:
+                    test_case[case_field] = default_value
+
+            logger.debug('Uploading test "{0}" to TestRail project "{1}", '
+                         'suite "{2}", section "{3}"'.format(
+                             test_case[GROUP_FIELD],
+                             TestRailSettings.project,
+                             TestRailSettings.tests_suite,
+                             TestRailSettings.tests_section))
+            testrail_project.add_case(section_id=section_id, case=test_case)
+            continue
+
+        fields_to_update = _get_fields_to_update(test_case, testrail_case)
+        if fields_to_update:
+            logger.debug('Updating test "{0}" in TestRail project "{1}", '
+                         'suite "{2}", section "{3}". Updated fields: {4}'
+                         .format(
+                             test_case[GROUP_FIELD],
+                             TestRailSettings.project,
+                             TestRailSettings.tests_suite,
+                             TestRailSettings.tests_section,
+                             ', '.join(fields_to_update.keys())))
+            testrail_project.update_case(case_id=testrail_case['id'],
+                                         fields=fields_to_update)
+        else:
+            logger.debug('Skipping "{0}" test case uploading because '
+                         'it is up-to-date in "{1}" suite'
+                         .format(test_case[GROUP_FIELD],
+                                 TestRailSettings.tests_suite))
+
+
+def _get_test_case_name(case):
+    """Return the name used to identify a test case.
+
+    When the parent home of the case entry is a subclass of ActionTest the
+    name of that class is returned, otherwise the name of the case's own
+    home object.
+
+    NOTE(review): ActionTest is neither defined nor imported in this
+    module - confirm where it is supposed to come from.
+    """
+    parent_home = case.entry.parent.home
+    if issubclass(parent_home, ActionTest):
+        return parent_home.__name__
+    # __name__ is available on Python 2 and 3, func_name only on Python 2.
+    return case.entry.home.__name__
+
+
+def _get_custom_cases_fields(case_fields, project_id):
+    """Return default values for the required custom case fields.
+
+    :param case_fields: list of case field descriptions, each with a
+        'system_name' and a list of 'configs'
+    :param project_id: only configs that list this project, or that list no
+        project at all, are taken into account
+    :return: dict mapping the field system name to the integer that starts
+        the config's 'items' option, or to 1 when it cannot be parsed
+    """
+    custom_cases_fields = {}
+    for field in case_fields:
+        for config in field['configs']:
+            project_ids = config['context']['project_ids']
+            applies_to_project = project_id in project_ids or not project_ids
+            if not (applies_to_project and config['options']['is_required']):
+                continue
+            try:
+                default_value = int(
+                    config['options']['items'].split(',')[0])
+            except (KeyError, AttributeError, TypeError, ValueError):
+                # Missing, empty or non-numeric 'items': fall back to the
+                # first index instead of hiding every possible error.
+                logger.error("Couldn't find default value for required "
+                             "field '{0}', setting '1' (index)!".format(
+                                 field['system_name']))
+                default_value = 1
+            custom_cases_fields[field['system_name']] = default_value
+    return custom_cases_fields
+
+
+def _get_fields_to_update(test_case, testrail_case):
+    """Return the fields of ``testrail_case`` that differ from ``test_case``.
+
+    Only 'title', 'estimate', 'custom_test_case_description' and
+    'custom_test_case_steps' are compared. Empty values in ``test_case``
+    never overwrite what is stored in TestRail.
+
+    :param test_case: test case dict built from the tests repository
+    :param testrail_case: the matching case dict fetched from TestRail
+    :return: dict of field name to new value
+    """
+    fields_to_update = {}
+    for field in ('title', 'estimate', 'custom_test_case_description',
+                  'custom_test_case_steps'):
+        new_value = test_case[field]
+        if not new_value:
+            continue
+        current_value = testrail_case[field]
+        if new_value == current_value:
+            continue
+
+        if field == 'estimate':
+            # The estimate carries a one-character unit suffix; compare it
+            # in the form produced by duration_to_testrail_estimate().
+            estimate_raw = int(new_value[:-1])
+            if current_value == duration_to_testrail_estimate(estimate_raw):
+                continue
+        elif field == 'custom_test_case_description':
+            # Ignore differences caused only by carriage returns. The
+            # stored description may be None, which must not crash here.
+            if new_value == (current_value or '').replace('\r', ''):
+                continue
+
+        fields_to_update[field] = new_value
+    return fields_to_update
+
+
+def _get_testrail_case(testrail_cases, test_case, group_field):
+    """Find the first TestRail case that shares the test's group value.
+
+    :param testrail_cases: iterable of case dicts fetched from TestRail
+    :param test_case: test case dict built from the tests repository
+    :param group_field: key whose value must be equal in both dicts
+    :raises StopIteration: when no case matches
+    """
+    for candidate in testrail_cases:
+        if candidate[group_field] == test_case[group_field]:
+            return candidate
+    raise StopIteration
+
+
+def main():
+    """Collect tests descriptions for a Jenkins job and dump them as JSON.
+
+    Command line options select the Jenkins swarm runner job and build;
+    the TestRail connection parameters come from TestRailSettings.
+
+    NOTE(review): uploading to TestRail is currently disabled (see the TODO
+    at the end of the function); only a JSON dump of the collected
+    descriptions is written.
+    """
+    parser = OptionParser(
+        description="Upload tests cases to TestRail. "
+                    "See settings.py for configuration."
+    )
+    parser.add_option("-v", "--verbose",
+                      action="store_true", dest="verbose", default=False,
+                      help="Enable debug output")
+    parser.add_option('-j', '--job-name', dest='job_name', default=None,
+                      help='Jenkins swarm runner job name')
+    parser.add_option('-N', '--build-number', dest='build_number',
+                      default='latest',
+                      help='Jenkins swarm runner build number')
+    parser.add_option('-o', '--check_one_section', action="store_true",
+                      dest='check_one_section', default=False,
+                      help='Look for existing test case only in specified '
+                           'section of test suite.')
+    parser.add_option("-l", "--live", dest="live_upload", action="store_true",
+                      help="Get tests results from running swarm")
+
+    (options, _) = parser.parse_args()
+
+    if options.verbose:
+        logger.setLevel(DEBUG)
+
+    if options.live_upload and options.build_number == 'latest':
+        options.build_number = 'latest_started'
+
+    project = TestRailProject(
+        url=TestRailSettings.url,
+        user=TestRailSettings.user,
+        password=TestRailSettings.password,
+        project=TestRailSettings.project
+    )
+
+    # Only needed by the disabled upload step below; kept so that the
+    # section lookup is still performed as before.
+    testrail_section = project.get_section_by_name(
+        suite_id=project.get_suite_by_name(TestRailSettings.tests_suite)['id'],
+        section_name=TestRailSettings.tests_section
+    )
+
+    testrail_milestone = project.get_milestone_by_name(
+        name=TestRailSettings.milestone)
+
+    testrail_default_test_priority = [priority['id'] for priority in
+                                      project.get_priorities() if
+                                      priority['is_default'] is True][0]
+
+    distros = [config['name'].split()[0].lower()
+               for config in project.get_config_by_name(
+                   'Operation System')['configs']
+               if config['name'] in TestRailSettings.operation_systems]
+
+    # NOTE(review): get_groups is neither defined nor imported in this
+    # module (an earlier revision called get_tests_groups_from_jenkins
+    # with the same arguments) - confirm where it should come from.
+    tests_groups = get_groups(
+        options.job_name,
+        options.build_number,
+        distros) if options.job_name else []
+
+    # If a Jenkins job build is specified but it has no downstream builds
+    # with tests groups in their job names, stop here: otherwise ALL
+    # existing tests cases would be processed.
+    if options.job_name and not tests_groups:
+        return
+
+    tests_descriptions = get_tests_descriptions(
+        milestone_id=testrail_milestone['id'],
+        tests_include=TestRailSettings.tests_include,
+        tests_exclude=TestRailSettings.tests_exclude,
+        groups=tests_groups,
+        default_test_priority=testrail_default_test_priority
+    )
+
+    import json
+    import os
+
+    # Same rule as settings.LOGS_DIR: $LOGS_DIR or the working directory,
+    # instead of a path that exists only on one developer's machine.
+    dump_dir = os.environ.get('LOGS_DIR', os.getcwd())
+    dump_path = os.path.join(
+        dump_dir,
+        "{}_{}_tests_descriptions.json".format(options.job_name,
+                                               options.build_number))
+    with open(dump_path, 'w') as dump_file:
+        json.dump(tests_descriptions, dump_file)
+    logger.info('Tests descriptions saved to {0}'.format(dump_path))
+
+    # TODO: re-enable the upload once the dumped descriptions are verified:
+    # upload_tests_descriptions(
+    #     testrail_project=project,
+    #     section_id=testrail_section['id'],
+    #     tests=tests_descriptions,
+    #     check_all_sections=not options.check_one_section)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/unified_test_reporter/reports/upload_tempest_test_suite.py b/unified_test_reporter/reports/upload_tempest_test_suite.py
new file mode 100755
index 0000000..3f7b228
--- /dev/null
+++ b/unified_test_reporter/reports/upload_tempest_test_suite.py
@@ -0,0 +1,153 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from __future__ import unicode_literals
+
+import subprocess
+
+from joblib import Parallel, delayed
+
+from unified_test_reporter.settings import TestRailSettings
+from unified_test_reporter.providers.testrail_client import TestRailProject
+
+
+TEST_GROUPS = ["API", "CLI", "Scenario", "ThirdParty"]
+TEST_SECTIONS = ["Ceilometer", "Cinder", "Glance", "Heat", "Ironic",
+ "Keystone", "Network", "Nova", "Sahara", "Swift", "Other"]
+
+
+def generate_groups(line):
+    """Pick the TestRail section name for one line of the tests listing.
+
+    Known dotted package markers are checked first; when several of them
+    occur in the line, the one listed last wins. If none matches, the first
+    entry of TEST_SECTIONS whose lower-cased name occurs in the line is
+    used, and "Other" is returned when nothing matches at all.
+    """
+    marker_to_section = (
+        (".telemetry.", "Ceilometer"),
+        (".volume.", "Cinder"),
+        (".image.", "Glance"),
+        (".orchestration.", "Heat"),
+        (".baremetal.", "Ironic"),
+        (".identity.", "Keystone"),
+        (".network.", "Network"),
+        (".compute.", "Nova"),
+        (".data_processing.", "Sahara"),
+        (".object_storage.", "Swift"),
+    )
+
+    section = "Other"
+    for marker, tag in marker_to_section:
+        if marker in line:
+            section = tag
+
+    if section == "Other":
+        for candidate in TEST_SECTIONS:
+            if candidate.lower() in line:
+                section = candidate
+                break
+
+    return section
+
+
+def get_tests_descriptions(milestone_id, tests_include, tests_exclude):
+    """Collect Tempest test cases by listing them with nosetests.
+
+    For every entry of TEST_GROUPS the tests below ``tempest/<group>`` are
+    listed with ``nosetests --collect-only`` inside a local ``tempest``
+    checkout, and every output line containing "id-" becomes one test case.
+
+    :param milestone_id: value stored under "milestone_id" in every case
+    :param tests_include: accepted for interface compatibility, not used
+    :param tests_exclude: accepted for interface compatibility, not used
+    :return: list of test case dicts, each with an extra "section" key
+    """
+    # To get the Tempest tests list, need to execute the following commands:
+    # git clone https://github.com/openstack/tempest & cd tempest & tox -venv
+    get_tempest_tests = """cd tempest && .tox/venv/bin/nosetests \\
+    --collect-only tempest/{0} -v 2>&1 | grep 'id-.*'"""
+
+    tests = []
+
+    for group in TEST_GROUPS:
+        p = subprocess.Popen(get_tempest_tests.format(group.lower()),
+                             shell=True, stdout=subprocess.PIPE)
+        try:
+            for raw_line in iter(p.stdout.readline, b''):
+                # The pipe yields bytes; decode once so that the text
+                # comparisons below work on Python 2 and Python 3.
+                line = raw_line.decode('utf-8', 'replace')
+                if "id-" not in line:
+                    continue
+
+                section = generate_groups(line) if group == "API" else group
+
+                # Dotted parts before the one carrying the id form the
+                # test group; the part with the id is the title.
+                test_class = []
+                for r in line.split("."):
+                    if "id-" in r:
+                        title = r.strip()
+                        break
+                    test_class.append(r)
+
+                steps = [{"run this tempest test": "passed"}, ]
+                tests.append({
+                    "title": title,
+                    "type_id": 1,
+                    "priority_id": 5,
+                    "estimate": "1m",
+                    "refs": "",
+                    "milestone_id": milestone_id,
+                    "custom_test_group": ".".join(test_class),
+                    "custom_test_case_description": title,
+                    "custom_test_case_steps": steps,
+                    "section": section
+                })
+        finally:
+            # Release the pipe and reap the child so that no zombie
+            # process is left behind for each test group.
+            p.stdout.close()
+            p.wait()
+
+    return tests
+
+
+def delete_case(testrail_project, test_id):
+    """Delete the case ``test_id`` through the given TestRail project."""
+    testrail_project.delete_case(test_id)
+
+
+def add_case(testrail_project, test_suite, test_case):
+    """Add ``test_case`` to the section named by its "section" key.
+
+    The suite is looked up by the name ``test_suite`` and the section by
+    name within that suite.
+    """
+    suite_id = testrail_project.get_suite_by_name(test_suite)['id']
+    target_section = testrail_project.get_section_by_name(
+        suite_id=suite_id, section_name=test_case["section"])
+    testrail_project.add_case(section_id=target_section["id"],
+                              case=test_case)
+
+
+def upload_tests_descriptions(testrail_project, tests):
+    """Rebuild the sections of the configured suite and upload ``tests``.
+
+    Every section of the suite without a parent is deleted first. Then one
+    section per TEST_GROUPS entry is created, one sub-section per
+    TEST_SECTIONS entry is created below "API", and finally every test case
+    is added with add_case().
+    """
+    suite_name = TestRailSettings.tests_suite
+    suite = testrail_project.get_suite_by_name(suite_name)
+
+    # remove old sections and test cases
+    for old_section in testrail_project.get_sections(suite_id=suite['id']):
+        if old_section["parent_id"] is not None:
+            continue
+        testrail_project.delete_section(old_section["id"])
+
+    # create new groups
+    for group_name in TEST_GROUPS:
+        testrail_project.create_section(suite["id"], group_name)
+
+    api_section = testrail_project.get_section_by_name(suite["id"], "API")
+    for section_name in TEST_SECTIONS:
+        testrail_project.create_section(suite["id"], section_name,
+                                        api_section["id"])
+
+    # add test cases to test suite with 100 parallel joblib workers
+    upload_jobs = (delayed(add_case)(testrail_project, suite_name, test_case)
+                   for test_case in tests)
+    Parallel(n_jobs=100)(upload_jobs)
+
+
+def main():
+    """Collect the Tempest test cases and upload them to TestRail.
+
+    Connection parameters, milestone and filters are taken from
+    TestRailSettings.
+    """
+    settings = TestRailSettings
+    testrail_project = TestRailProject(url=settings.url,
+                                       user=settings.user,
+                                       password=settings.password,
+                                       project=settings.project)
+
+    milestone = testrail_project.get_milestone_by_name(
+        name=settings.milestone)
+
+    collected_tests = get_tests_descriptions(
+        milestone_id=milestone['id'],
+        tests_include=settings.tests_include,
+        tests_exclude=settings.tests_exclude)
+
+    upload_tests_descriptions(testrail_project=testrail_project,
+                              tests=collected_tests)
+
+
+if __name__ == '__main__':
+    main()
diff --git a/unified_test_reporter/requirements.txt b/unified_test_reporter/requirements.txt
new file mode 100755
index 0000000..2e287f0
--- /dev/null
+++ b/unified_test_reporter/requirements.txt
@@ -0,0 +1,33 @@
+#git+git://github.com/openstack/fuel-devops.git@2.9.20
+#python-dev
+py
+anyjson==0.3.1
+beautifulsoup4>=4.2.0
+proboscis==1.2.6.0
+netaddr>=0.7.12,!=0.7.16
+cryptography
+multi_key_dict
+enum
+pyasn1
+Jinja2
+babel
+imagesize
+cffi
+pyopenssl
+pytest>=2.9
+pyyaml>=3.1.0 # MIT
+requests>=2.8.1,!=2.9.0 # Apache-2.0
+tablib>=0.11.2
+xmltodict>=0.10.1 # MIT
+
+launchpadlib
+python-jenkins>=0.4.12
+simplejson
+
+python-glanceclient==0.17.1
+python-keystoneclient>=0.3.2
+python-novaclient>=2.15.0
+python-cinderclient>=1.0.5
+python-neutronclient>=2.0
+python-ironicclient>=0.8.0
+
diff --git a/unified_test_reporter/set.sh b/unified_test_reporter/set.sh
new file mode 100755
index 0000000..a1c781d
--- /dev/null
+++ b/unified_test_reporter/set.sh
@@ -0,0 +1,42 @@
+#!/bin/bash
+#
+# Create a virtualenv, export the reporter configuration and install
+# unified_test_reporter into the virtualenv in development mode.
+#
+# Required environment:
+#   TESTRAIL_PASSWORD   TestRail password (never store it in this file)
+# Optional environment:
+#   FUELQA_DIR          path to a fuel-qa checkout
+#   TESTRAIL_USER       TestRail login
+
+# Fail before touching anything when the password is not provided.
+: "${TESTRAIL_PASSWORD:?TESTRAIL_PASSWORD must be set in the environment}"
+
+VENV_DIR=/tmp/venv
+CODE_DIR=$(pwd)/../
+export PYTHONPATH="$CODE_DIR:$PYTHONPATH"
+
+virtualenv "$VENV_DIR"
+echo
+pushd . > /dev/null
+cd "$VENV_DIR" > /dev/null
+source bin/activate
+
+pwd
+
+CUR_DIR=$(pwd)
+FUELQA_DIR=${FUELQA_DIR:-/home/krozin/@Git/MIRANTIS/fuel-qa}
+export PYTHONPATH="${PYTHONPATH}:$CUR_DIR:$FUELQA_DIR"
+export JENKINS_URL=https://product-ci.infra.mirantis.net
+export TESTRAIL_URL=https://mirantis.testrail.com
+export TESTRAIL_PROJECT="Mirantis OpenStack"
+export TESTRAIL_USER=${TESTRAIL_USER:-all@mirantis.com}
+export TESTRAIL_PASSWORD
+export TESTS_RUNNER=10.0.swarm.runner
+export TEST_RUNNER_JOB_NAME=10.0.swarm.runner
+export TESTRAIL_TEST_SUITE='[10.0] Swarm'
+export TESTRAIL_MILESTONE=10.0
+export LAUNCHPAD_MILESTONE=10.0
+
+# Temporary link so that the package is reachable from inside the venv.
+ln -s "$CODE_DIR/unified_test_reporter" reporter
+pip install -r reporter/requirements.txt > /dev/null
+python reporter/../setup.py develop
+
+# -------------- EXAMPLES -----------------
+# NOTE(review): upload_cases_description.py lives in reports/; the other
+# example paths still point to testrail/ - confirm their location.
+#python reporter/testrail/generate_failure_group_statistics.py -o /tmp/report
+#python reporter/reports/upload_cases_description.py -v -j ${TESTS_RUNNER}
+#python reporter/testrail/report.py -v -j 9.0.test_all -N 195
+#python reporter/testrail/generate_statistics.py --verbose --handle-blocked --out-file bugs_link_stat --job-name 9.0.swarm.runner --html
+#python reporter/reports/upload_cases_description.py -v -l -j 10.0.swarm.runner
+rm reporter
+deactivate
+popd
+
diff --git a/unified_test_reporter/settings.py b/unified_test_reporter/settings.py
new file mode 100755
index 0000000..e2e88cf
--- /dev/null
+++ b/unified_test_reporter/settings.py
@@ -0,0 +1,91 @@
+# Copyright 2015 Mirantis, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License"); you may
+# not use this file except in compliance with the License. You may obtain
+# a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+# License for the specific language governing permissions and limitations
+# under the License.
+
+from __future__ import unicode_literals
+
+import logging
+import os
+
+# Package logger: messages go to a stream handler with a timestamped format;
+# the default level is INFO.
+logger = logging.getLogger(__package__)
+ch = logging.StreamHandler()
+formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
+ch.setFormatter(formatter)
+logger.addHandler(ch)
+logger.setLevel(logging.INFO)
+
+# Directory taken from $LOGS_DIR, falling back to the current directory.
+LOGS_DIR = os.environ.get('LOGS_DIR', os.getcwd())
+
+# These variables are overwritten unconditionally when this module is
+# imported.
+# NOTE(review): presumably placeholders needed by modules imported later
+# that read them at import time - confirm.
+os.environ["ENV_NAME"] = "some_environment"
+os.environ["ISO_PATH"] = "./fuel.iso"
+os.environ["CENTOS_CLOUD_IMAGE_PATH"] = "./centos-cloud-image.img"
+
+# Jenkins settings; every value can be overridden through the environment
+# variable named in the corresponding os.environ.get() call.
+JENKINS = {
+ 'url': os.environ.get('JENKINS_URL', 'http://localhost/'),
+ 'magnet_link_artifact': os.environ.get('JENKINS_MAGNET_LINK_ARTIFACT',
+ 'magnet_link.txt'),
+ 'username': os.environ.get('JENKINS_USER', None),
+ 'password': os.environ.get('JENKINS_PASS', None),
+ 'job_name': os.environ.get('TEST_RUNNER_JOB_NAME', '10.0.swarm.runner'),
+ 'test_runner': os.environ.get('TESTS_RUNNER', '10.0.swarm.runner'),
+ 'xml_testresult_file_name': os.environ.get('TEST_XML_RESULTS',
+ 'nosetests.xml')
+}
+
+# Test function names whose TestRail title and test group get the Jenkins
+# job suffix appended (see get_tests_descriptions() in
+# reports/upload_cases_description.py).
+GROUPS_TO_EXPAND = [
+ 'setup_master', 'prepare_release', 'prepare_slaves_1', 'prepare_slaves_3',
+ 'prepare_slaves_5', 'prepare_slaves_9']
+
+# NOTE(review): the consumer of these thresholds is not visible here.
+FAILURE_GROUPING = {'threshold': 0.04, 'max_len_diff': 0.1}
+
+
+class LaunchpadSettings(object):
+    """Launchpad settings, each overridable through the environment."""
+
+    # LAUNCHPAD_PROJECT / LAUNCHPAD_MILESTONE select the project and milestone.
+    project = os.getenv('LAUNCHPAD_PROJECT', 'fuel')
+    milestone = os.getenv('LAUNCHPAD_MILESTONE', '10.0')
+    # Status names listed as closed: the "released" one first, then the
+    # "invalid" one.
+    closed_statuses = [
+        os.getenv('LAUNCHPAD_RELEASED_STATUS', 'Fix Released'),
+        os.getenv('LAUNCHPAD_INVALID_STATUS', 'Invalid'),
+    ]
+
+
+class TestRailSettings(object):
+    """TestRail settings, each overridable through the environment.
+
+    All values are computed once, when the class body is executed at
+    import time.
+    """
+
+    # No default: url is None unless TESTRAIL_URL is set.
+    url = os.environ.get('TESTRAIL_URL')
+    user = os.environ.get('TESTRAIL_USER', 'user@example.com')
+    password = os.environ.get('TESTRAIL_PASSWORD', 'password')
+    project = os.environ.get('TESTRAIL_PROJECT', 'Fuel')
+    milestone = os.environ.get('TESTRAIL_MILESTONE', '10.0')
+    tests_suite = os.environ.get('TESTRAIL_TEST_SUITE',
+                                 '[{0}] Swarm'.format(milestone))
+    tests_section = os.environ.get('TESTRAIL_TEST_SECTION', 'All')
+    tests_include = os.environ.get('TESTRAIL_TEST_INCLUDE', None)
+    tests_exclude = os.environ.get('TESTRAIL_TEST_EXCLUDE', None)
+    # Environment values are strings; convert so that the depth is an int
+    # no matter whether it comes from the environment or from the default.
+    previous_results_depth = int(os.environ.get('TESTRAIL_TESTS_DEPTH', 5))
+    # Names of the enabled operating system configurations.
+    operation_systems = []
+    centos_enabled = os.environ.get('USE_CENTOS', 'false') == 'true'
+    ubuntu_enabled = os.environ.get('USE_UBUNTU', 'true') == 'true'
+    if centos_enabled:
+        operation_systems.append(os.environ.get(
+            'TESTRAIL_CENTOS_RELEASE', 'Centos 6.5'))
+    if ubuntu_enabled:
+        operation_systems.append(os.environ.get(
+            'TESTRAIL_UBUNTU_RELEASE', 'Ubuntu 14.04'))
+    # Result status names grouped into the categories passed, failed and
+    # blocked.
+    stauses = {
+        'passed': ['passed'],
+        'failed': ['failed', 'product_failed', 'test_failed', 'infra_failed'],
+        'blocked': ['blocked']
+    }
+    # Correctly spelled alias; the misspelled name stays for existing users.
+    statuses = stauses
+    max_results_per_request = 250
diff --git a/unified_test_reporter/tests/testrail_test.py b/unified_test_reporter/tests/testrail_test.py
new file mode 100644
index 0000000..5da6237
--- /dev/null
+++ b/unified_test_reporter/tests/testrail_test.py
@@ -0,0 +1,43 @@
+"""Manual check: dump the TestRail data of one test run to JSON files.
+
+The script talks to a real TestRail server. Credentials are read by
+unified_test_reporter.settings from TESTRAIL_USER / TESTRAIL_PASSWORD and
+must be provided through the environment; they are not stored here.
+"""
+import json
+import os
+
+# unified_test_reporter.settings reads os.environ when it is imported, so
+# the defaults have to be in place before the imports further below.
+# Running "export ..." through subprocess.call() only changed the
+# environment of short-lived child shells and never reached this process.
+ENVIRONMENT_DEFAULTS = {
+    'JENKINS_URL': 'https://product-ci.infra.mirantis.net',
+    'TESTRAIL_URL': 'https://mirantis.testrail.com',
+    'TESTRAIL_PROJECT': 'Mirantis OpenStack',
+    'TESTRAIL_USER': 'all@mirantis.com',
+    'TESTS_RUNNER': '10.0.swarm.runner',
+    'TEST_RUNNER_JOB_NAME': '10.0.swarm.runner',
+    'TESTRAIL_TEST_SUITE': '[10.0] Swarm',
+    'TESTRAIL_MILESTONE': '10.0',
+    'LAUNCHPAD_MILESTONE': '10.0',
+}
+for variable, default in ENVIRONMENT_DEFAULTS.items():
+    os.environ.setdefault(variable, default)
+
+# Run to inspect (15122 was used for an earlier check).
+RUN_ID = 14965
+
+from unified_test_reporter.providers.testrail_client import TestRailProject  # noqa: E402
+from unified_test_reporter.settings import TestRailSettings  # noqa: E402
+
+# Same rule as settings.LOGS_DIR: $LOGS_DIR or the working directory.
+OUTPUT_DIR = os.environ.get('LOGS_DIR', os.getcwd())
+
+
+def _dump(data, suffix):
+    """Write ``data`` as JSON to '<RUN_ID>_<suffix>.json' in OUTPUT_DIR."""
+    path = os.path.join(OUTPUT_DIR, '{}_{}.json'.format(RUN_ID, suffix))
+    with open(path, 'w') as output:
+        json.dump(data, output)
+
+
+testRailPlan = TestRailProject(url=TestRailSettings.url,
+                               user=TestRailSettings.user,
+                               password=TestRailSettings.password,
+                               project=TestRailSettings.project)
+
+tests = testRailPlan.get_tests(RUN_ID)
+results = testRailPlan.get_results_for_run(RUN_ID)
+bugs = testRailPlan.get_bugs(RUN_ID)
+urls = testRailPlan.get_testrail_test_urls(RUN_ID, 'setup_master_multiracks_2')
+
+_dump(tests, 'tests')
+_dump(results, 'result')
+_dump(bugs, 'bugs')
+_dump(urls, 'urls')