In a recent role that involved production support of an in-house framework for IBM WAS, I took the opportunity to venture into the world of testing automation. This post highlights the features and design decisions of the implementation.
The inhouse developed IBM WAS framework (IWF) had been developed over 5 years. It was sophisticated and featured a CLI (command line interface) to assist with the many settings required to configure WebSphere. Unfortunately, the years of development started to show in the spaghetti BASH wrappers upon wrappers, Jython, sprinkled with Java here and there.
IWF did not facilitate release automation and there was an urgent need to start automating the last mile problem. The immediate pain-point was that the actual installation was a manual step. Among other annoyances, installation entailed :
The idea was that implementation of a continous testing strategy would also provide the building blocks that could fill in the gaps mentioned above.
The plan was to integrate the Behave python library for test automation and Ansible for runbook automation. The organisation was starting to roll out Jenkins as the enterprise build tool so this was leveraged to implement push-button distribution and trusted delegation for self-servicing. I code named this initiative as the SWAT (S**** Websphere Automated Testing) framework.
This design approach facilitates reusable code as documentation that can be used for day to day administration to replace manual tasks; such as procuring the PFX certificate archive and passphrase, and the act of renewing certificates on behalf of the application team.
The intent is that runbook automation allows these tasks to be delegated to the applications team as a self service with a proper security architecture for controlled executions.
While SWAT is not a true alignment to the principles and philosophies behind behaviour driven development (because as Ops we are not the team developing the product and features), it is an acceptable appropriation of the concept to support our testing methodologies, whether for a release or for continous testing. As noted, automated testing is a by-product of test and behaviour driven development.
A runbook abstracts low level manual tasks into readable high level code that is descriptive and detailed enough to also serve as documentation and that can be easily consumed by users, including an automation framework for our purposes, to execute repeatable tasks. These are called "playbooks" in Ansible.
The below points attempts to align the features of the inhouse framework with the principles of behaviour driven testing:
Behave is an open source project for implementing behaviour driven development (BDD) practices. Central to all BDD tools, it also uses the Gherkin language to encapsulate business functionality testing in natural language, enabling business owner engagement at the earliest in the development process.
Here is an example of a 'feature' that was tested.
Feature: configure-ssl-security command
Background: Ensure IWF setup and we are operating the desired server
Given we have IWF installed
And we are operating "server00"
@fixture.emergency_shell.iwf_admin
Scenario: execute the configure-ssl-security command with --pfxFile
When we invoke the configure-ssl-security command with "--pfxFile" and values
|pfx_file |cert_alias |
|/tmp/iwf-dev.pfx |iwf development certificate alias |
Then security.properties is updated
And the new certificate works
A step implementation is required to turn the above natural language specifications into actionable code. Behave step implmentations are written in Python.
We use pexpect
to execute an Ansible playbook with the ansible-playbook command. pexpect
captures the playbook output for further processing.
@when(u'we execute the configure-ssl-security command with {option} and values')
def step_impl(context, option):
responses = []
for row in context.table:
response = {}
pfx_file' = row['pfx_file']
cert_alias = row['cert_alias']
cmd = 'ansible-playbook runbooks/configure_ssl_security.yml --extra-vars "download_file={} friendly_name={}"'
.format(pfx_file, cert_alias)
(output,status) = pexpect.run(cmd, withexitstatus=True, timeout=300)
# record the command execution log
m = re.search(r'logfile=([/\-\d\w.]+)', output)
response['execution_log_file'] = m.group(1)
response['execution_status'] = status
resonses.append(response)
context.responses = responses
Ansible is my automation framework of choice. It competes with Puppet and Chef for mindshare in the Devops world. The factors differentiating Ansible from the others for me include:
Here is the playbook referenced in the above step implementation. This playbook downloads and install the specified certificates. It includes tasks from a custom role to do all the heavy lifting.
This playbook is resuable as a standalone task when the operator needs to perform these operations on an ad-hoc basis.
---
- hosts: localhost
roles:
- iwf-role
tasks:
- name: install was dev certificate
include_role:
name: iwf_role
tasks_from: install_certificates
vars:
url: '{{ url }}'
username: "{{ username | default(lookup('env', 'username')) }}"
password: "{{ password | default(lookup('env', 'password')) }}"
download_format: '{{ download_format }}'
download_file: '{{ download_file }}'
certificate_id: '{{ certificate_id }}'
validate_certs: '{{ validate_certs | default(True) }}'
friendly_name: '{{ friendly_name | default() }}'
register: output
An Ansible Role was developed to manage an deployment. The role encapsulates logic to interact with the IWF through its CLI as well as auxillary functions such as downloading and installing certificates.
This is an example role task to download certiifcates from the Venafi TPP service using the venafi_tpp
module. It then executes the IWF command to install the certificate using the configure_ssl_security
module.
These modules are also custom developed and included in the IWF role.
# runbooks/roles/iwf_role/task/install_certificates.yml
---
- name: generate random passphrase
set_fact: passphrase={{ lookup('password', '/dev/null length=15 chars=ascii_letters') }}
- name: 'download certificate {{ certificate_id }}'
venafi_tpp:
url: '{{ url }}'
download_format: '{{ download_format }}'
download_file: '{{ download_file }}'
certificate_id: '{{ certificate_id }}'
passphrase: '{{ passphrase }}'
validate_certs: '{{ validate_certs }}'
username: '{{ username }}'
password: '{{ password }}'
friendly_name: '{{ friendly_name | default() }}'
- name: execute configure ssl security
configure_ssl_security:
validate_keystore_only: false
no_health_check: true
passphrase: '{{ passphrase }}'
pfx_file: '{{ download_file }}'
register: output
- name: log file
debug:
msg: "logfile={{output['log_file']}}"
This is the code for the configure_ssl_security
module.
#!/usr/bin/python
ANSIBLE_METADATA = {
'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'
}
DOCUMENTATION = '''
---
module: configure_ssl_security
short_description: executes the configure-ssl-security command
options:
timeout:
description:
- command timeout
required: false
timeout:
description:
- Amount of time in seconds to wait for the expected strings
default: 300
required: false
requirements:
- python >= 2.6
- pexpect >= 3.3
'''
import os
import re
import subprocess
import sys
from hamcrest import assert_that, contains_string, matches_regexp, is_not
import pexpect
import datetime
import traceback
import logging
import json
try:
import pexpect
HAS_PEXPECT = True
except ImportError:
HAS_PEXPECT = False
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils._text import to_native, to_text
def append_output(list, cmd, before=None, matched=None):
item = dict()
item['before'] = before
item['matched_condition'] = matched
item['response'] = cmd
list.append(item)
def parse_arguments(module):
'''
converts the module arguments to canonical format expected by the CLI
'''
cmd = []
aliases = module.aliases
for (k, v) in module.params.items():
try:
# get the alias corresponding to the module argument
alias = aliases.keys()[aliases.values().index(k)]
# boolean args are always flags in the CLI
if v is True:
cmd.append(alias)
elif v:
cmd.append(alias)
cmd.append(v)
except ValueError:
continue
return cmd
'''
helper function to handle different responses from the configure-ssl-security command
'''
class expectSwitch(object):
def __init__(self, passphrase=''):
self.switch = {
'pexpect_EOF': {
'action': None,
'pattern': pexpect.EOF,
'sendline': None
},
'pexpect_TIMEOUT': {
'action': None,
'pattern': pexpect.TIMEOUT,
'sendline': None
},
'command_complete': {
'action': None,
'pattern': r'The configure-ssl-security command has completed successfully\r\n\-+',
'sendline': None
},
'iwf_server_start_successful': {
'action': None,
'pattern': r'ADMU3000I: Server iwfServer open for e-business; process id is',
'sendline': None
},
'enter passphrase for certificate': {
'action': None,
'pattern': r"\r\nEnter the password for the keyStore '.*':\r\n",
'sendline': passphrase
},
'warn_alias_not_match_previous_entry': {
'action': None,
'pattern': r"WARNING:.*Alias name .* from .* does not match previous alias name .* from .*Would you like to continue.*You must enter the number, rather than the word.\r\n1\) yes\r\n2\) no\r\n",
'sendline': '1'
},
'command_output_log_file': {
'action': None,
'pattern': r"The script [-\w]+ is logging to: ([/\-\d\w.]+)\r\n",
'sendline': None
},
}
def expects(self):
return [v['pattern'] for v in self.switch.values()]
def responses(self):
return [v['sendline'] for v in self.switch.values()]
def messages(self):
return self.switch.keys()
def main():
module = AnsibleModule(
argument_spec=dict(
trust_environment=dict(
choices=['dev', 'test', 'prod'],
aliases=['--trustEnvironment']),
passphrase=dict(required=True, no_log=True),
timeout=dict(type='int', required=False, default=300),
jks_file=dict(type='path', aliases=['--jksFile']),
pfx_file=dict(type='path', aliases=['--pfxFile']),
update_trust_only=dict(type='bool', aliases=['--updateTrustOnly']),
dont_start=dict(type='bool', aliases=['--dontStart']),
no_health_check=dict(type='bool', aliases=['--noHealthCheck']),
validate_keystore_only=dict(
type='bool', aliases=['--validateKeystoreOnly']),
reapply_iwf_keystore_only=dict(
type='bool', aliases=['--reapplyKeyStoreOnly']),
),
)
if not HAS_PEXPECT:
module.fail_json(msg='The pexpect python module is required')
debug_outputs = []
append_output(debug_outputs, 'module.params', module.params)
append_output(debug_outputs, 'module.aliases', module.aliases)
# module.exit_json(debug=debug_outputs)
trust_environment = module.params['trust_environment']
passphrase = module.params['passphrase']
jks_file = module.params['jks_file']
pfx_file = module.params['pfx_file']
update_trust_only = module.params['update_trust_only']
dont_start = module.params['dont_start']
no_health_check = module.params['no_health_check']
validate_keystore_only = module.params['validate_keystore_only']
reapply_keystore_only = module.params['reapply_keystore_only']
timeout = module.params['timeout']
startd = datetime.datetime.now()
# construct the command + arguments based on the module arguments
cmd = 'configure-ssl-security'
cmd_args = parse_arguments(module)
outputs = []
rc = None
changed = False
log_file = None
# get the list of prompts and reponses that are expected
expect_switch = expectSwitch(passphrase=passphrase)
expects = expect_switch.expects()
responses = expect_switch.responses()
messages = expect_switch.messages()
p = pexpect.spawn(cmd, cmd_args, timeout=timeout)
while True:
# execute the command and wait for the expected conditions
idx = p.expect(expects)
# exit the loop if we get these exit signals
if messages[idx] in ('pexpect_EOF', 'pexpect_TIMEOUT', 'command_complete'):
# must close the shell to get the correct exitstatus
p.sendline('exit')
append_output(outputs, 'exiting shell', p.before, messages[idx])
p.close()
# only the keystore validation op won't make a system change
if not validate_keystore_only and messages[idx] == 'command_complete':
changed=True
if rc is None:
rc = p.exitstatus
break
# capture the IWF log file path from stdout
elif messages[idx] == 'command_output_log_file':
log_file = p.match.group(1)
else:
# get the response to the expected condition
response = responses[idx]
before = 'use -vvv to see text stream before the matched condition'
if module._verbosity >= 3:
before = p.before
# log the response, matched condition (p.after), and output stream before the match.
# stringify the match pattern, else TypeError exception when EOF/TIMEOUT matched
# (these are special matches and are not string)
append_output(outputs, response, before, to_text(p.after))
# send the response if any to the child process
if response:
p.sendline(response)
endd = datetime.datetime.now()
delta = endd - startd
result = dict(
cmd=' '.join([cmd] + cmd_args),
rc=rc,
start=str(startd),
end=str(endd),
delta=str(delta),
changed=changed,
command_outputs=outputs,
debug=debug_outputs,
log_file=log_file,
)
if rc == 0:
module.exit_json(**result)
elif rc == 1:
module.fail_json(
msg='timed out waiting for expected pattern', **result)
elif rc == 2:
module.fail_json(msg='command exited unexpectedly', **result)
else:
module.fail_json(msg='unexpected exit status', **result)
if __name__ == '__main__':
main()
This is the code for the venafi_tpp
module. It understands how to interact with the Venafi TPP product to login, and download certificates.
This implementation uses the Venafi's private APIs to download certificates instead of the published APIs. This is due to the organisation not exposing Venafi's APIs. Hence the module also needs to cater for password authentication.
#!/usr/bin/python
# -*- coding: utf-8 -*-
from __future__ import absolute_import, division, print_function
__metaclass__ = type
ANSIBLE_METADATA = {'metadata_version': '1.1',
'status': ['preview'],
'supported_by': 'community'}
DOCUMENTATION = '''
---
module: venafi
requirements:
- "python >= 2.6"
short_description: manage certificates using the Venafi Trust Protection Platform
options:
username:
description: Username for authenticating to the TPP
. Required if not using API token
password:
description: Password for authenticating to the TPP
. Required if not using API token
api_token:
description:
Account API token.
Required if not using password
uri:
description: TPP URI
required: true
port:
description: Service port
timeout:
description:
- Timeout for API calls
default: 30
'''
import json
import os
import shutil
import tempfile
import traceback
from ansible.module_utils.basic import AnsibleModule
from ansible.module_utils.six.moves.urllib.parse import quote, urlencode
from ansible.module_utils._text import to_native, to_text
from ansible.module_utils.urls import fetch_url
def write_file(module, dest, content):
''' helper function to write to file. Returns true if changed, false otherwise'''
# create a tempfile with some test content
fd, tmpsrc = tempfile.mkstemp()
f = open(tmpsrc, 'wb')
try:
f.write(content)
except Exception as e:
os.remove(tmpsrc)
module.fail_json(msg="failed to create temporary content file: %s" % to_native(e),
exception=traceback.format_exc())
f.close()
checksum_src = None
checksum_dest = None
# raise an error if there is no tmpsrc file
if not os.path.exists(tmpsrc):
os.remove(tmpsrc)
module.fail_json(msg="Source '%s' does not exist" % tmpsrc)
if not os.access(tmpsrc, os.R_OK):
os.remove(tmpsrc)
module.fail_json(msg="Source '%s' not readable" % tmpsrc)
checksum_src = module.sha1(tmpsrc)
# check if there is no dest file
if os.path.exists(dest):
# raise an error if copy has no permission on dest
if not os.access(dest, os.W_OK):
os.remove(tmpsrc)
module.fail_json(msg="Destination '%s' not writable" % dest)
if not os.access(dest, os.R_OK):
os.remove(tmpsrc)
module.fail_json(msg="Destination '%s' not readable" % dest)
checksum_dest = module.sha1(dest)
else:
if not os.access(os.path.dirname(dest), os.W_OK):
os.remove(tmpsrc)
module.fail_json(msg="Destination dir '%s' not writable" % os.path.dirname(dest))
if checksum_src != checksum_dest:
try:
shutil.copyfile(tmpsrc, dest)
return True
except Exception as e:
os.remove(tmpsrc)
module.fail_json(msg="failed to copy %s to %s: %s" % (tmpsrc, dest, to_native(e)),
exception=traceback.format_exc())
os.remove(tmpsrc)
return False
class VenafiTpp(object):
changed = False
formats_map = {
'der': {
'fileFormat': 'DER',
'isFriendlyNameAvailable': False,
'isPfxChainOrderAllowed': False,
'isPrivateKeyAvailable': False,
'isPrivateKeyRequired': False,
'isRootChainAvailable': False,
'pfxChainOrder': False,
'isRootChainAvailable': False},
'pem':{
'fileFormat': 'PEM (OpenSSL)',
'isFriendlyNameAvailable': False,
'isPfxChainOrderAllowed': True,
'isPrivateKeyAvailable': True,
'isPrivateKeyRequired': False,
'isRootChainAvailable': True,
'pfxChainOrder': False
},
'pkcs7': {
'fileFormat':'PKCS #7',
'isFriendlyNameAvailable': False,
'isPfxChainOrderAllowed': True,
'isPrivateKeyAvailable': False,
'isPrivateKeyRequired': False,
'isRootChainAvailable': True,
'pfxChainOrder': False},
'pkcs8': {
'fileFormat':'PEM (PKCS #8)',
'isFriendlyNameAvailable': False,
'isPfxChainOrderAllowed': True,
'isPrivateKeyAvailable': True,
'isPrivateKeyRequired': False,
'isRootChainAvailable': True,
'pfxChainOrder': False},
'pkcs12': {
'fileFormat':'PKCS #12',
'isFriendlyNameAvailable': True,
'isPfxChainOrderAllowed': False,
'isPrivateKeyAvailable': True,
'isPrivateKeyRequired': True,
'isRootChainAvailable': True,
'pfxChainOrder': False},
}
def __init__(self, module):
self.module = module
self.api_key = None
try:
self.download_format = self.formats_map.get(module.params['download_format'], None).get('fileFormat', None)
except KeyError:
self.module.fail_json(msg='VenafiTpp object init failed, {} is an invalid certificate file format'.format(module.params['download_format']))
def auth(self):
'''
posts username and password to authenticate to the TPP aperture portal
'''
url = self.module.params['url'] + '/users/authorize'
password = self.module.params['password']
username = self.module.params['username']
payload = {"username": username, "password": password}
headers = {
'Content-Type': 'application/json; charset=UTF-8',
'X-ApplicationUrl': '/aperture/',
'X-Requested-With': 'XMLHttpRequest',
}
try:
resp, info = fetch_url(
module=self.module,
url=url,
data=self.module.jsonify(payload),
# data=self.module.jsonify(payload),
headers=headers,
method='POST')
# catch exceptions handled by fetch_url
if info['status'] == -1:
self.module.fail_json(msg=info['msg'])
json_resp = json.loads(resp.read())
self.api_key = json_resp['apiKey']
return json_resp, info
except AttributeError:
# there was no content, but the error read()
# may have been stored in the info as 'body'
return resp, info
except Exception as err:
self.module.fail_json(msg='unhandled exception: {}'.format(self.module.jsonify(err)))
def _request(self, module, url, data, headers, method):
'''
a helper function to send http requests and parse the results
'''
resp, info = fetch_url(
module=module,
url=url,
data=self.module.jsonify(data),
headers=headers,
method=method)
try:
if 'application/json' in info['content-type']:
return json.loads(resp.read()), info
return resp.read(), info
except AttributeError:
# there was no content, but the error read()
# may have been stored in the info as 'body'
return resp, info
def retrieve_certificate(self):
'''
downloads a certificate identified by the certificate_id property.
Return true if download_file is specified and was updated based on
checksum diff and http header If-Modified-Since is newer than download_file
modified timestamp
Returns false otherwise
'''
if self.api_key is None:
self.module.fail_json(msg='API key is missing, unable to download certificate')
download_file = self.module.params['download_file']
certificate_id = self.module.params['certificate_id']
passphrase = self.module.params['passphrase']
friendly_name = self.module.params['friendly_name']
url_base = self.module.params['url']
# wrap certificate GUID in curly braces and url-encode
certificate_id = quote('{{{}}}'.format(certificate_id))
url = '{}/certificates/{}/downloadUrl'.format(url_base, certificate_id)
download_options = dict(
fileFormat = self.download_format,
includePrivateKey = False,
includeChain = True,
pfxRootFirst = False
)
if passphrase:
download_options['password'] = passphrase
download_options['includePrivateKey'] = True
if friendly_name:
download_options['friendlyName'] = friendly_name
headers = {
'Content-Type': 'application/json; charset=UTF-8',
'X-ApplicationUrl': '/aperture/',
'X-Requested-With': 'XMLHttpRequest',
'Authorization': 'VENAFI ' + self.api_key
}
# POST the options for the certificate retrieval
# and get the download URL
try:
resp, info = self._request(
module=self.module,
url=url,
data=download_options,
headers=headers,
method='POST')
except Exception as e:
self.module.fail_json(msg='set download options failed', status=e)
# validate the status info
if info['status'] > 400:
self.module.fail_json(msg='set download options failed', status=info)
# certificate retrieval using the download URL
try:
url = '{}/{}'.format(url_base, resp['downloadUrl'])
resp, info = self._request(
module=self.module,
url=url,
data=None,
headers=headers,
method='GET')
except Exception as e:
self.module.fail_json(msg='retrieve failed', status=e)
# validate the status info
if info['status'] > 400:
self.module.fail_json(msg='retrieve failed', status=info, response=resp)
# return the contents and status info
try:
# write to file if needed and update the changed attribute
if download_file is not None:
self.changed = write_file(self.module, download_file, resp)
# if there is a return response then return that as the content
content = resp.read()
except AttributeError:
# if there is no response body then just return the response object
content = resp
except Exception as e:
# add exception details to the info object
info['exception'] = self.module.jsonify(e)
info['changed'] = self.changed
return content, info
def main():
module = AnsibleModule(
argument_spec=dict(
state=dict(type='str', choices=['present', 'absent'], default='present'),
url=dict(required=True, type='str'),
username=dict(required=True, no_log=True, type='str'),
password=dict(required=True, no_log=True, type='str'),
passphrase=dict(no_log=True, type='str'),
certificate_id=dict(default=None, type='str'),
download_format=dict(type='str', choices=['json', 'pem', 'pkcs12'], default='pem'),
download_file=dict(type='str'),
friendly_name=dict(type='str'),
timeout=dict(type='int', default=300),
validate_certs=dict(type='bool', default=True),
),
supports_check_mode=True,
mutually_exclusive=[['api_token', 'username'],['api_token', 'password']],
required_together=[['username', 'password']],
required_if=([
('state', 'present', ['username', 'password']),
('download_format', 'pkcs12', ['download_file', 'passphrase']),
]
),
)
# exit if download file exists
if os.path.exists(module.params['download_file']):
module.exit_json(changed=False,msg="Skipped because download file exists")
# parse and validate params
tpp_api = VenafiTpp(module)
resp, info = tpp_api.auth()
resp, info = tpp_api.retrieve_certificate()
if not info.get('changed', None):
info['changed'] = False
module.exit_json(changed=info['changed'], result={'info': info})
if __name__ == '__main__':
main()
All test features and scenarios are authored outside JIRA Xray and kept alongside the test implementation code. The benefits of doing this include:
Some thoughts on the JIRA Xray issue status in context of release and testing lifecycles:
The GIT, Jenkins, JIRA Xray workflow:
This ensures the Test repository is evolving in line with new features released and is alway up to date for continual regression testing
Here is the Jenkins pipeline for deploying and executing the SWAT tests.
pipeline {
agent { label 'swatNode'}
parameters {
string(name: 'projectKey', defaultValue: 'SWAT')
string(name: 'jiraSite', defaultValue: 'Ent Jira')
string(name: 'jiraInstanceId', defaultValue: '7e023757-c0e6-4eca-9168-9b3e522f8fd7')
string(name: 'gitUrl', defaultValue: 'https:itbucket.company.com/scm/swat/test-automation.git')
}
stages {
stage('install SWAT framework') {
steps{
sh 'make install'
}
}
stage('Synch Tests with JIRA Xray'){
steps {
git branch: "${env.GIT_BRANCH}", changelog: false, credentialsId: '170c8996-4bca-45ec-88d9-4374bdc90c1f', poll: false, url: "${params.gitUrl}"
step([$class: 'XrayImportFeatureBuilder', folderPath: 'features', lastModified: '', projectKey: params.projectKey, serverInstance: params.jiraInstanceId])
}
}
stage('Export tests from Xray'){
steps {
sh "rm features/*.feature"
step([$class: 'XrayExportBuilder', filePath: 'features', serverInstance: params.jiraInstanceId, filter: '86314'])
}
}
stage('Execute Behave tests'){
steps {
sh '''
. venv/bin/activate
behave -f json.pretty -o outputs/results.json features
'''
}
}
}
post {
always {
step([$class: 'XrayImportBuilder', endpointName: '/behave', importFilePath: 'outputs/results.json', serverInstance: params.jiraInstanceId])
}
}
}
The pipeline is fairly self explanatory: