Test Automation with Python Behave and Ansible

In a recent role that involved production support of an in-house framework for IBM WAS, I took the opportunity to venture into the world of testing automation. This post highlights the features and design decisions of the implementation.

The in-house IBM WAS framework (IWF) had been developed over five years. It was sophisticated and featured a CLI (command line interface) to assist with the many settings required to configure WebSphere. Unfortunately, the years of development showed in the spaghetti of BASH wrappers upon wrappers and Jython, sprinkled with Java here and there.

IWF did not facilitate release automation, and there was an urgent need to start automating the last-mile problem. The immediate pain point was that the actual installation was a manual step. Among other annoyances, installation entailed:

  • downloading the framework packages and pre-requisite packages
  • running the installer interactively and eyeballing the process
  • no certificate management

The idea was that implementing a continuous testing strategy would also provide the building blocks to fill in the gaps mentioned above.

The plan was to integrate the Behave Python library for test automation and Ansible for runbook automation. The organisation was starting to roll out Jenkins as the enterprise build tool, so this was leveraged to implement push-button distribution and trusted delegation for self-servicing. I code-named this initiative SWAT (S**** Websphere Automated Testing).

This design approach facilitates reusable code-as-documentation that can replace manual day-to-day administration tasks, such as procuring the PFX certificate archive and passphrase, and renewing certificates on behalf of the application team.

The intent is that runbook automation allows these tasks to be delegated to the application teams as self-service, with a proper security architecture for controlled execution.

While SWAT is not a true alignment with the principles and philosophies behind behaviour-driven development (because as Ops we are not the team developing the product and its features), it is an acceptable appropriation of the concept to support our testing methodologies, whether for a release or for continuous testing. As noted, automated testing is a by-product of test- and behaviour-driven development.

A runbook abstracts low level manual tasks into readable high level code that is descriptive and detailed enough to also serve as documentation and that can be easily consumed by users, including an automation framework for our purposes, to execute repeatable tasks. These are called "playbooks" in Ansible.
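As an illustration, here is a minimal, hypothetical playbook (the host group, service name and port are invented, not part of the IWF role) showing how such a task reads as self-documenting code:

```yaml
# restart_app_server.yml -- hypothetical runbook-style playbook
- name: restart the application server and verify it is listening
  hosts: was_servers
  tasks:
    - name: restart the server process
      service:
        name: appserver
        state: restarted

    - name: wait until the HTTPS port is accepting connections
      wait_for:
        port: 9443
        timeout: 60
```

Each task name doubles as the documentation of the procedure, which is what lets the same file serve operators and the automation framework alike.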

The points below attempt to align the features of the in-house framework with the principles of behaviour-driven testing:

  • Each CLI command is treated as a feature that needs to be tested.
  • Each CLI command has multiple options that, depending on the command, need to be tested individually and/or in every combination. The different combinations are written as test scenarios.
  • A feature file is created for each command to be tested. Within the feature file, a test scenario is created for each command option combination to be tested. The Gherkin language is used to organise and describe the test scenarios.
  • The code logic that executes the tests is implemented as Ansible playbooks.

Behave

Behave is an open source project for implementing behaviour-driven development (BDD) practices. Like all BDD tools, it uses the Gherkin language to encapsulate business functionality testing in natural language, enabling business owner engagement at the earliest point in the development process.

A Gherkin Feature file for testing the SSL Security command

Here is an example of a 'feature' that was tested.

Feature: configure-ssl-security command

  Background: Ensure IWF is installed and we are operating the desired server
    Given we have IWF installed
    And we are operating "server00"


  @fixture.emergency_shell.iwf_admin
  Scenario: execute the configure-ssl-security command with --pfxFile
    When we invoke the configure-ssl-security command with "--pfxFile" and values
      |pfx_file           |cert_alias                        |
      |/tmp/iwf-dev.pfx   |iwf development certificate alias |

    Then security.properties is updated
    And the new certificate works

The Step implementation

A step implementation is required to turn the above natural language specification into actionable code. Behave step implementations are written in Python.

We use pexpect to execute an Ansible playbook with the ansible-playbook command. pexpect captures the playbook output for further processing.

import re

import pexpect
from behave import when


@when(u'we invoke the configure-ssl-security command with {option} and values')
def step_impl(context, option):

    responses = []

    for row in context.table:
        response = {}
        pfx_file = row['pfx_file']
        cert_alias = row['cert_alias']
        cmd = ('ansible-playbook runbooks/configure_ssl_security.yml '
               '--extra-vars "download_file={} friendly_name={}"').format(pfx_file, cert_alias)
        (output, status) = pexpect.run(cmd, withexitstatus=True, timeout=300, encoding='utf-8')

        # record the command execution log
        m = re.search(r'logfile=([/\-\d\w.]+)', output)
        response['execution_log_file'] = m.group(1)
        response['execution_status'] = status
        responses.append(response)

    context.responses = responses
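The regular expression that recovers the playbook's log file path can be pulled into a small helper, which also makes it easy to test in isolation (a sketch; the sample output line below is fabricated):

```python
import re

# matches the "logfile=<path>" marker printed by the playbook's debug task
LOGFILE_RE = re.compile(r'logfile=([/\-\d\w.]+)')

def extract_log_file(output):
    '''return the execution log path from ansible-playbook output, or None'''
    m = LOGFILE_RE.search(output)
    return m.group(1) if m else None

# fabricated sample of the debug task's output
sample = 'ok: [localhost] => { "msg": "logfile=/var/log/iwf/configure-ssl-security.2020-01-01.log" }'
print(extract_log_file(sample))  # -> /var/log/iwf/configure-ssl-security.2020-01-01.log
```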

Ansible

Ansible is my automation framework of choice. It competes with Puppet and Chef for mindshare in the DevOps world. The factors that differentiate Ansible from the others for me include:

  • agentless and based on Python 2.7 and 3+, so it is supported out of the box on Linux
  • playbooks are written in YAML and highly readable
  • playbooks run tasks imperatively, top to bottom, leading to predictable execution of resources
  • easily extensible with custom facts, modules, plugins and roles
  • just like Puppet Forge and Chef Supermarket, Ansible Galaxy has all categories of functionality written by the community
  • modules are easy to write
  • orchestration comes out of the box
  • simple integration with scheduling and runbook automation tools like Rundeck
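The "modules are easy to write" point follows from the module contract itself: at bottom, a module is just a program that receives its arguments and answers with a single JSON document. A stdlib-only sketch (the path_exists module and its path parameter are invented for illustration, not part of the IWF role):

```python
import json
import os

def run_module(params):
    '''hypothetical 'path_exists' module body: reports a fact, never changes state'''
    path = params.get('path', '')
    return {'changed': False, 'exists': os.path.exists(path), 'path': path}

# an invocation boils down to: parse params, compute, emit one JSON result
result = run_module({'path': '/tmp'})
print(json.dumps(result))
```

In practice the AnsibleModule helper used in the modules below takes care of argument parsing, aliases, no_log masking and exit handling, but the underlying contract stays this simple.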

An Ansible playbook to automate certificate installs

Here is the playbook referenced in the above step implementation. This playbook downloads and installs the specified certificate. It includes tasks from a custom role that does all the heavy lifting.

This playbook is reusable as a standalone task when the operator needs to perform these operations on an ad-hoc basis.

---
- hosts: localhost
  roles:
    - iwf_role
  tasks:
    - name: install was dev certificate
      include_role:
        name: iwf_role
        tasks_from: install_certificates
      vars:
        url: '{{ url }}'
        username: "{{ username | default(lookup('env', 'username')) }}"
        password: "{{ password | default(lookup('env', 'password')) }}"
        download_format: '{{ download_format }}'
        download_file: '{{ download_file }}'
        certificate_id: '{{ certificate_id }}'
        validate_certs: '{{ validate_certs | default(True) }}'
        friendly_name: '{{ friendly_name | default() }}'
      register: output

The custom IWF role

An Ansible role was developed to manage an IWF deployment. The role encapsulates the logic to interact with the IWF through its CLI as well as auxiliary functions such as downloading and installing certificates.

This is an example role task to download certificates from the Venafi TPP service using the venafi_tpp module. It then executes the IWF command to install the certificate using the configure_ssl_security module.

These modules are also custom developed and included in the IWF role.

# runbooks/roles/iwf_role/tasks/install_certificates.yml
---
- name: generate random passphrase
  set_fact: passphrase={{ lookup('password', '/dev/null length=15 chars=ascii_letters') }}

- name: 'download certificate {{ certificate_id }}'
  venafi_tpp:
    url: '{{ url }}'
    download_format: '{{ download_format }}'
    download_file: '{{ download_file }}'
    certificate_id: '{{ certificate_id }}'
    passphrase: '{{ passphrase }}'
    validate_certs: '{{ validate_certs }}'
    username: '{{ username }}'
    password: '{{ password }}'
    friendly_name: '{{ friendly_name | default() }}'

- name: execute configure ssl security
  configure_ssl_security:
    validate_keystore_only: false
    no_health_check: true
    passphrase: '{{ passphrase }}'
    pfx_file: '{{ download_file }}'
  register: output

- name: log file
  debug:
    msg: "logfile={{ output['log_file'] }}"

IWF Role modules

The 'configure_ssl_security' module

This is the code for the configure_ssl_security module.

#!/usr/bin/python

ANSIBLE_METADATA = {
    'metadata_version': '1.1',
    'status': ['preview'],
    'supported_by': 'community'
}

DOCUMENTATION = '''
---
module: configure_ssl_security
short_description: executes the configure-ssl-security command
options:
    timeout:
        description:
            - Amount of time in seconds to wait for the expected strings
        default: 300
        required: false
requirements:
    - python >= 2.6
    - pexpect >= 3.3
'''
import datetime


try:
    import pexpect
    HAS_PEXPECT = True
except ImportError:
    HAS_PEXPECT = False

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_text


def append_output(outputs, cmd, before=None, matched=None):
    item = dict()
    item['before'] = before
    item['matched_condition'] = matched
    item['response'] = cmd
    outputs.append(item)

def parse_arguments(module):
    '''
    converts the module arguments to the canonical format expected by the CLI
    '''
    cmd = []
    aliases = module.aliases

    for (k, v) in module.params.items():

        # get the CLI alias corresponding to the module argument
        alias = next((a for a, name in aliases.items() if name == k), None)
        if alias is None:
            continue

        # boolean args are always flags in the CLI
        if v is True:
            cmd.append(alias)
        elif v:
            cmd.append(alias)
            cmd.append(v)

    return cmd

class expectSwitch(object):
    '''
    helper class to handle the different responses from the configure-ssl-security command
    '''
    def __init__(self, passphrase=''):
        self.switch = {
            'pexpect_EOF': {
                'action': None,
                'pattern': pexpect.EOF,
                'sendline': None
            },
            'pexpect_TIMEOUT': {
                'action': None,
                'pattern': pexpect.TIMEOUT,
                'sendline': None
            },
            'command_complete': {
                'action': None,
                'pattern': r'The configure-ssl-security command has completed successfully\r\n\-+',
                'sendline': None
            },
            'iwf_server_start_successful': {
                'action': None,
                'pattern': r'ADMU3000I: Server iwfServer open for e-business; process id is',
                'sendline': None
            },
            'enter passphrase for certificate': {
                'action': None,
                'pattern': r"\r\nEnter the password for the keyStore '.*':\r\n",
                'sendline': passphrase
            },
            'warn_alias_not_match_previous_entry': {
                'action': None,
                'pattern': r"WARNING:.*Alias name .* from .* does not match previous alias name .* from .*Would you like to continue.*You must enter the number, rather than the word.\r\n1\) yes\r\n2\) no\r\n",
                'sendline': '1'
            },
            'command_output_log_file': {
                'action': None,
                'pattern': r"The script [-\w]+ is logging to: ([/\-\d\w.]+)\r\n",
                'sendline': None
            },
        }

    def expects(self):
        return [v['pattern'] for v in self.switch.values()]

    def responses(self):
        return [v['sendline'] for v in self.switch.values()]

    def messages(self):
        return list(self.switch.keys())


def main():
    module = AnsibleModule(
        argument_spec=dict(
            trust_environment=dict(
                choices=['dev', 'test', 'prod'],
                aliases=['--trustEnvironment']),
            passphrase=dict(required=True, no_log=True),
            timeout=dict(type='int', required=False, default=300),
            jks_file=dict(type='path', aliases=['--jksFile']),
            pfx_file=dict(type='path', aliases=['--pfxFile']),
            update_trust_only=dict(type='bool', aliases=['--updateTrustOnly']),
            dont_start=dict(type='bool', aliases=['--dontStart']),
            no_health_check=dict(type='bool', aliases=['--noHealthCheck']),
            validate_keystore_only=dict(
                type='bool', aliases=['--validateKeystoreOnly']),
            reapply_iwf_keystore_only=dict(
                type='bool', aliases=['--reapplyKeyStoreOnly']),
        ),
    )
    if not HAS_PEXPECT:
        module.fail_json(msg='The pexpect python module is required')
    debug_outputs = []
    append_output(debug_outputs, 'module.params', module.params)
    append_output(debug_outputs, 'module.aliases', module.aliases)
    # module.exit_json(debug=debug_outputs)

    trust_environment = module.params['trust_environment']
    passphrase = module.params['passphrase']
    jks_file = module.params['jks_file']
    pfx_file = module.params['pfx_file']
    update_trust_only = module.params['update_trust_only']
    dont_start = module.params['dont_start']
    no_health_check = module.params['no_health_check']
    validate_keystore_only = module.params['validate_keystore_only']
    reapply_keystore_only = module.params['reapply_iwf_keystore_only']
    timeout = module.params['timeout']


    startd = datetime.datetime.now()

    # construct the command + arguments based on the module arguments
    cmd = 'configure-ssl-security'
    cmd_args = parse_arguments(module)

    outputs = []
    rc = None
    changed = False
    log_file = None

    # get the list of prompts and reponses that are expected
    expect_switch = expectSwitch(passphrase=passphrase)
    expects = expect_switch.expects()
    responses = expect_switch.responses()
    messages = expect_switch.messages()

    p = pexpect.spawn(cmd, cmd_args, timeout=timeout)
    while True:

        # execute the command and wait for the expected conditions
        idx = p.expect(expects)

        # exit the loop if we get these exit signals
        if messages[idx] in ('pexpect_EOF', 'pexpect_TIMEOUT', 'command_complete'):

            # must close the shell to get the correct exitstatus
            p.sendline('exit')
            append_output(outputs, 'exiting shell', p.before, messages[idx])
            p.close()

            # only the keystore validation op won't make a system change
            if not validate_keystore_only and messages[idx] == 'command_complete':
                changed = True

            if rc is None:
                rc = p.exitstatus

            break

        # capture the IWF log file path from stdout
        elif messages[idx] == 'command_output_log_file':
            log_file = p.match.group(1)

        else:
            # get the response to the expected condition
            response = responses[idx]

            before = 'use -vvv to see text stream before the matched condition'
            if module._verbosity >= 3:
                before = p.before

            # log the response, matched condition (p.after), and output stream before the match.
            # stringify the match pattern, else TypeError exception when EOF/TIMEOUT matched
            # (these are special matches and are not string)
            append_output(outputs, response, before, to_text(p.after))

            # send the response if any to the child process
            if response:
                p.sendline(response)

    endd = datetime.datetime.now()
    delta = endd - startd

    result = dict(
        cmd=' '.join([cmd] + cmd_args),
        rc=rc,
        start=str(startd),
        end=str(endd),
        delta=str(delta),
        changed=changed,
        command_outputs=outputs,
        debug=debug_outputs,
        log_file=log_file,
    )

    if rc == 0:
        module.exit_json(**result)
    elif rc == 1:
        module.fail_json(
            msg='timed out waiting for expected pattern', **result)
    elif rc == 2:
        module.fail_json(msg='command exited unexpectedly', **result)
    else:
        module.fail_json(msg='unexpected exit status', **result)


if __name__ == '__main__':
    main()

The venafi_tpp module

This is the code for the venafi_tpp module. It understands how to interact with the Venafi TPP product to log in and download certificates.

This implementation uses Venafi's private APIs to download certificates instead of the published APIs, because the organisation does not expose the published APIs. Hence the module also needs to cater for password authentication.

#!/usr/bin/python
# -*- coding: utf-8 -*-

from __future__ import absolute_import, division, print_function
__metaclass__ = type


ANSIBLE_METADATA = {'metadata_version': '1.1',
                    'status': ['preview'],
                    'supported_by': 'community'}


DOCUMENTATION = '''
---
module: venafi_tpp
requirements:
  - "python >= 2.6"
short_description: manage certificates using the Venafi Trust Protection Platform
options:
  username:
    description: Username for authenticating to the TPP. Required if not using an API token
  password:
    description: Password for authenticating to the TPP. Required if not using an API token
  api_token:
    description: Account API token. Required if not using a password
  url:
    description: TPP URL
    required: true
  port:
    description: Service port
  timeout:
    description:
      - Timeout for API calls
    default: 30
'''

import json
import os
import shutil
import tempfile
import traceback

from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six.moves.urllib.parse import quote, urlencode
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.urls import fetch_url

def write_file(module, dest, content):
    ''' helper function to write content to a file. Returns True if changed, False otherwise '''
    # write the content to a temporary file first
    fd, tmpsrc = tempfile.mkstemp()
    f = os.fdopen(fd, 'wb')
    try:
        f.write(content)
    except Exception as e:
        os.remove(tmpsrc)
        module.fail_json(msg="failed to create temporary content file: %s" % to_native(e),
                         exception=traceback.format_exc())
    f.close()

    checksum_src = None
    checksum_dest = None

    # raise an error if there is no tmpsrc file
    if not os.path.exists(tmpsrc):
        os.remove(tmpsrc)
        module.fail_json(msg="Source '%s' does not exist" % tmpsrc)
    if not os.access(tmpsrc, os.R_OK):
        os.remove(tmpsrc)
        module.fail_json(msg="Source '%s' not readable" % tmpsrc)
    checksum_src = module.sha1(tmpsrc)

    # check if there is no dest file
    if os.path.exists(dest):
        # raise an error if copy has no permission on dest
        if not os.access(dest, os.W_OK):
            os.remove(tmpsrc)
            module.fail_json(msg="Destination '%s' not writable" % dest)
        if not os.access(dest, os.R_OK):
            os.remove(tmpsrc)
            module.fail_json(msg="Destination '%s' not readable" % dest)
        checksum_dest = module.sha1(dest)
    else:
        if not os.access(os.path.dirname(dest), os.W_OK):
            os.remove(tmpsrc)
            module.fail_json(msg="Destination dir '%s' not writable" % os.path.dirname(dest))

    if checksum_src != checksum_dest:
        try:
            shutil.copyfile(tmpsrc, dest)
            return True
        except Exception as e:
            os.remove(tmpsrc)
            module.fail_json(msg="failed to copy %s to %s: %s" % (tmpsrc, dest, to_native(e)),
                             exception=traceback.format_exc())

    os.remove(tmpsrc)
    return False

class VenafiTpp(object):

    changed = False

    formats_map = {
        'der': {
            'fileFormat': 'DER',
            'isFriendlyNameAvailable': False,
            'isPfxChainOrderAllowed': False,
            'isPrivateKeyAvailable': False,
            'isPrivateKeyRequired': False,
            'isRootChainAvailable': False,
            'pfxChainOrder': False},
        'pem':{
            'fileFormat': 'PEM (OpenSSL)',
            'isFriendlyNameAvailable': False,
            'isPfxChainOrderAllowed': True,
            'isPrivateKeyAvailable': True,
            'isPrivateKeyRequired': False,
            'isRootChainAvailable': True,
            'pfxChainOrder': False
            },
        'pkcs7': {
            'fileFormat':'PKCS #7',
            'isFriendlyNameAvailable': False,
            'isPfxChainOrderAllowed': True,
            'isPrivateKeyAvailable': False,
            'isPrivateKeyRequired': False,
            'isRootChainAvailable': True,
            'pfxChainOrder': False},
        'pkcs8': {
            'fileFormat':'PEM (PKCS #8)',
            'isFriendlyNameAvailable': False,
            'isPfxChainOrderAllowed': True,
            'isPrivateKeyAvailable': True,
            'isPrivateKeyRequired': False,
            'isRootChainAvailable': True,
            'pfxChainOrder': False},
        'pkcs12': {
            'fileFormat':'PKCS #12',
            'isFriendlyNameAvailable': True,
            'isPfxChainOrderAllowed': False,
            'isPrivateKeyAvailable': True,
            'isPrivateKeyRequired': True,
            'isRootChainAvailable': True,
            'pfxChainOrder': False},
    }

    def __init__(self, module):
        self.module = module
        self.api_key = None

        try:
            self.download_format = self.formats_map[module.params['download_format']]['fileFormat']
        except KeyError:
            self.module.fail_json(msg='VenafiTpp object init failed, {} is an invalid certificate file format'.format(module.params['download_format']))

    def auth(self):
        '''
        posts username and password to authenticate to the TPP aperture portal
        '''

        url = self.module.params['url'] + '/users/authorize'
        password = self.module.params['password']
        username = self.module.params['username']

        payload = {"username": username, "password": password}
        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'X-ApplicationUrl': '/aperture/',
            'X-Requested-With': 'XMLHttpRequest',
        }

        try:
            resp, info = fetch_url(
                module=self.module,
                url=url,
                data=self.module.jsonify(payload),
                headers=headers,
                method='POST')

            # catch exceptions handled by fetch_url
            if info['status'] == -1:
                self.module.fail_json(msg=info['msg'])

            json_resp = json.loads(resp.read())
            self.api_key = json_resp['apiKey']
            return json_resp, info

        except AttributeError:
            # there was no content, but the error read()
            # may have been stored in the info as 'body'
            return resp, info

        except Exception as err:
            self.module.fail_json(msg='unhandled exception: {}'.format(to_native(err)))

    def _request(self, module, url, data, headers, method):
        '''
        a helper function to send http requests and parse the results
        '''

        resp, info = fetch_url(
            module=module,
            url=url,
            data=self.module.jsonify(data),
            headers=headers,
            method=method)

        try:
            if 'application/json' in info['content-type']:
                return json.loads(resp.read()), info
            return resp.read(), info
        except AttributeError:
            # there was no content, but the error read()
            # may have been stored in the info as 'body'
            return resp, info

    def retrieve_certificate(self):
        '''
        downloads a certificate identified by the certificate_id property.
        Return true if download_file is specified and was updated based on
        checksum diff and http header If-Modified-Since is newer than download_file
        modified timestamp
        Returns false otherwise
        '''

        if self.api_key is None:
            self.module.fail_json(msg='API key is missing, unable to download certificate')

        download_file = self.module.params['download_file']
        certificate_id = self.module.params['certificate_id']
        passphrase = self.module.params['passphrase']
        friendly_name = self.module.params['friendly_name']
        url_base = self.module.params['url']
        # wrap certificate GUID in curly braces and url-encode
        certificate_id = quote('{{{}}}'.format(certificate_id))

        url = '{}/certificates/{}/downloadUrl'.format(url_base, certificate_id)

        download_options = dict(
            fileFormat = self.download_format,
            includePrivateKey = False,
            includeChain = True,
            pfxRootFirst = False
        )

        if passphrase:
            download_options['password'] = passphrase
            download_options['includePrivateKey'] = True

        if friendly_name:
            download_options['friendlyName'] = friendly_name

        headers = {
            'Content-Type': 'application/json; charset=UTF-8',
            'X-ApplicationUrl': '/aperture/',
            'X-Requested-With': 'XMLHttpRequest',
            'Authorization': 'VENAFI ' + self.api_key
        }

        # POST the options for the certificate retrieval
        # and get the download URL
        try:
            resp, info = self._request(
                module=self.module,
                url=url,
                data=download_options,
                headers=headers,
                method='POST')
        except Exception as e:
            self.module.fail_json(msg='set download options failed', status=e)

        # validate the status info
        if info['status'] >= 400:
            self.module.fail_json(msg='set download options failed', status=info)

        # certificate retrieval using the download URL
        try:
            url = '{}/{}'.format(url_base, resp['downloadUrl'])
            resp, info = self._request(
                module=self.module,
                url=url,
                data=None,
                headers=headers,
                method='GET')

        except Exception as e:
            self.module.fail_json(msg='retrieve failed', status=e)

        # validate the status info
        if info['status'] >= 400:
            self.module.fail_json(msg='retrieve failed', status=info, response=resp)

        # return the contents and status info
        try:
            # write to file if needed and update the changed attribute
            if download_file is not None:
                self.changed = write_file(self.module, download_file, resp)

            # if there is a return response then return that as the content
            content = resp.read()
        except AttributeError:
            # if there is no response body then just return the response object
            content = resp
        except Exception as e:
            # add exception details to the info object
            info['exception'] = self.module.jsonify(e)

        info['changed'] = self.changed
        return content, info

def main():
    module = AnsibleModule(
        argument_spec=dict(
            state=dict(type='str', choices=['present', 'absent'], default='present'),
            url=dict(required=True, type='str'),
            username=dict(no_log=True, type='str'),
            password=dict(no_log=True, type='str'),
            passphrase=dict(no_log=True, type='str'),
            api_token=dict(no_log=True, type='str'),
            certificate_id=dict(default=None, type='str'),
            download_format=dict(type='str', choices=['der', 'pem', 'pkcs7', 'pkcs8', 'pkcs12'], default='pem'),
            download_file=dict(type='str'),
            friendly_name=dict(type='str'),
            timeout=dict(type='int', default=300),
            validate_certs=dict(type='bool', default=True),
        ),
        supports_check_mode=True,
        mutually_exclusive=[['api_token', 'username'],['api_token', 'password']],
        required_together=[['username', 'password']],
        required_if=([
            ('state', 'present', ['username', 'password']),
            ('download_format', 'pkcs12', ['download_file', 'passphrase']),
        ]
        ),
    )

    # exit early if the download file already exists
    if module.params['download_file'] and os.path.exists(module.params['download_file']):
        module.exit_json(changed=False, msg="Skipped because download file exists")

    # authenticate to the TPP and download the certificate
    tpp_api = VenafiTpp(module)
    resp, info = tpp_api.auth()
    resp, info = tpp_api.retrieve_certificate()


    info.setdefault('changed', False)

    module.exit_json(changed=info['changed'], result={'info': info})

if __name__ == '__main__':
    main()

Jira Xray

All test features and scenarios are authored outside JIRA Xray and kept alongside the test implementation code. The benefits of doing this include:

  1. Language, syntax and formatting support in an IDE such as VSCode for more efficient editing
  2. Version control
  3. Synchronising tests between our code and Xray test issues is easier via a Jenkins Pipeline stage, compared to manually creating Xray test issues in JIRA
  4. Test execution results can also be uploaded easily within the same Jenkins pipeline
  5. New test scenarios can still be composed within JIRA Xray, exported by the pipeline, executed in the testing stage, and the results uploaded to the correct Xray test issues.

Some thoughts on the JIRA Xray issue status in context of release and testing lifecycles:

  1. the status of the Test issue itself is outside the concerns of the testing lifecycle
  2. the Xray Test issue can be moved to done and still be reverted to 'in-progress' to indicate that development of that test is in progress
  3. the true indicator of test status, as far as the testing lifecycle is concerned, is the test execution results. Each execution has its own status.

Jenkins Pipeline

The Git, Jenkins, JIRA Xray workflow:

  1. Create a new branch on the test Git repo for the new release
  2. Author any new Gherkin tests to cover new features
  3. Implement the test code to enable the above new tests
  4. Test to ensure the test code is working
  5. Reiterate the above steps until all tests are working as expected
  6. Once the new test code is working and signed off, git tag the commit
  7. Execute the Jenkins testing pipeline
  8. If the testing pipeline is successful, merge the release branch back to master and delete the branch

This ensures the test repository evolves in line with newly released features and is always up to date for continual regression testing.
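The branching, tagging and merging steps of the workflow can be sketched in plain git commands (run here against a throwaway repository; the branch and tag names are illustrative):

```shell
set -e
repo=$(mktemp -d)
cd "$repo"
git init -q
git -c user.name=swat -c user.email=swat@example.com commit -q --allow-empty -m "baseline"
main=$(git symbolic-ref --short HEAD)      # master or main, depending on git config

git checkout -q -b release-2.4             # 1. branch for the new release
# ... author features, implement step code, iterate until tests pass ...
git -c user.name=swat -c user.email=swat@example.com commit -q --allow-empty -m "tests for 2.4"
git tag v2.4                               # 6. tag the signed-off commit

git checkout -q "$main"                    # 8. after a green pipeline run:
git merge -q --no-edit release-2.4         #    merge back and delete the branch
git branch -q -d release-2.4
git tag -l                                 # the tag survives for regression runs
```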

Here is the Jenkins pipeline for deploying and executing the SWAT tests.

pipeline {
    agent { label 'swatNode'}
    parameters {
        string(name: 'projectKey', defaultValue: 'SWAT')
        string(name: 'jiraSite', defaultValue: 'Ent Jira')
        string(name: 'jiraInstanceId', defaultValue: '7e023757-c0e6-4eca-9168-9b3e522f8fd7')
        string(name: 'gitUrl', defaultValue: 'https://bitbucket.company.com/scm/swat/test-automation.git')

    }
    stages {
        stage('install SWAT framework')  {
            steps{
                sh 'make install'
            }
        }

        stage('Synch Tests with JIRA Xray'){
            steps {
                git branch: "${env.GIT_BRANCH}", changelog: false, credentialsId: '170c8996-4bca-45ec-88d9-4374bdc90c1f', poll: false, url: "${params.gitUrl}"
                step([$class: 'XrayImportFeatureBuilder', folderPath: 'features', lastModified: '', projectKey: params.projectKey, serverInstance: params.jiraInstanceId])
            }
        }

        stage('Export tests from Xray'){
            steps {
                sh "rm features/*.feature"
                step([$class: 'XrayExportBuilder', filePath: 'features', serverInstance: params.jiraInstanceId, filter: '86314'])
            }
        }

        stage('Execute Behave tests'){
            steps {
                sh '''
                    . venv/bin/activate
                    behave -f json.pretty -o outputs/results.json features
                '''
            }
        }
    }

    post {
        always {
            step([$class: 'XrayImportBuilder', endpointName: '/behave', importFilePath: 'outputs/results.json', serverInstance: params.jiraInstanceId])
        }
    }
}

The pipeline is fairly self-explanatory:

  1. Running on the SWAT testing nodes where the IWF is already installed, the SWAT framework is installed
  2. Synchronise our Gherkin features with Jira Xray
  3. Now that all tests are in Xray with their own issue keys, download them from Xray so that all test scenarios are properly tagged with Jira issue keys
  4. To execute the tests, activate the Python virtual environment and run Behave. The virtual environment is created as part of the SWAT installation in step 1 and is a great way to manage software dependencies
  5. Finally, the test execution results are uploaded back into Xray as unique issues and are automatically linked to the test issues. A post section is required because if any of the test executions fail, the Jenkins declarative pipeline automatically skips subsequent stages.