When developing complex Splunk dashboards, the native Splunk workflow can be painfully slow. Among other annoyances, the Splunk dashboard editor preview does not render tokens, which leaves some visualisations unrendered because their underlying searches depend on those tokens and are therefore incomplete. Here is an automation setup that helped me speed up the development loop and that throws in some other benefits, such as version control.
Initialise the environment variables once before starting the development session (source the script so that the exports persist in the current shell):
#!/bin/bash
# Prompt once for the Splunk password and export the connection settings that
# the Git hook and the upload script read from the environment.
# IFS= and -r keep leading/trailing whitespace and backslashes in the password
# intact; -s suppresses terminal echo.
IFS= read -r -s -p 'Password: ' password
# read -s swallows the newline typed by the user, so emit one explicitly.
echo
export splunk_password="$password"
export splunk_url='https://splunk-test.company.com.au'
export splunk_user='m123456'
The Git post-commit hook:
#!/bin/sh
# Git post-commit hook: upload the dashboard XML to Splunk, then reload the
# browser so the change is visible immediately.
# Resolve the working directory once; every expansion below is quoted so that
# paths or values containing spaces are passed as a single argument.
repo_dir=$(pwd)
# Upload the XML dashboard to Splunk
python "$repo_dir/scripts/iis-dashboard.py" --dashboard scorecard_nonprod_ibm_managed --appname MWOPS --xmlfilepath "$repo_dir/dashboards/scorecard-environment.xml" --dashboarduser="$splunk_user"
# Splunk needs time to process the dashboard after upload completes
sleep 2
# Refresh our browser to show the new changes
python "$repo_dir/scripts/webdriver.py" --url="$splunk_url" --debug
Python script to upload the dashboard XML to our Splunk App:
import sys
import os
import io
import requests
import argparse
import xml.etree.ElementTree as etree
from http.client import responses
import urllib3
urllib3.disable_warnings(urllib3.exceptions.InsecureRequestWarning)
'''
module docstring: module to upload xml dashboard to be used on a git push
'''
## authenticate and get the session key
def authenticate(splunkurl, user, password):
    '''
    authenticate to get the auth cookie

    Performs the two-step Splunk Web login on the module-level HTTPSESSION:
    a GET of the login page to obtain the cval cookie, then a POST of the
    credentials together with that cval value. The resulting cookies stay on
    HTTPSESSION for later requests.

    splunkurl -- base URL of Splunk Web (no trailing slash)
    user      -- username posted to the login form
    password  -- password posted to the login form

    Returns 0 when the login POST did not report an HTTP error.
    Raises RuntimeError when the login page sets no cval cookie, and
    requests.HTTPError when the login POST returns a 4xx/5xx status.
    '''
    url = splunkurl + '/en-US/account/login'

    # first retrieve the cval cookie value by GET to login page
    response = HTTPSESSION.get(url, verify=False)
    # .get() on the status table avoids a KeyError for non-standard codes
    print('Authentication - get cval cookie: {} {}'.format(
        response.status_code, responses.get(response.status_code, 'Unknown')))

    cval = HTTPSESSION.cookies.get('cval')
    if cval is None:
        # fail with a readable message instead of a bare KeyError('cval')
        raise RuntimeError('login page did not return a cval cookie')

    auth_payload = {
        'username': user,
        'password': password,
        'cval': cval,
    }
    response = HTTPSESSION.post(url, data=auth_payload, verify=False)
    print('Authentication - post creds: {} {}'.format(
        response.status_code, responses.get(response.status_code, 'Unknown')))
    # stop here on a rejected login rather than failing later on a missing cookie
    response.raise_for_status()
    return 0
## get dashboard
def get_dashboard_xml(splunkurl, dashboard_name, appname, xmlfilepath, download_flag):
    '''
    desc: downloads the xml definition for the provided dashboard name

    splunkurl      -- base URL of Splunk Web (no trailing slash)
    dashboard_name -- name of the view to fetch
    appname        -- Splunk app that contains the view
    xmlfilepath    -- file the raw response body is written to
    download_flag  -- when truthy, write the response body to xmlfilepath

    Uses the module-level HTTPSESSION. Always returns 0.
    '''
    # NOTE: the owner segment of the URL comes from the module-level
    # splunk_user variable, not from a parameter of this function.
    url = splunkurl + '/en-US/splunkd/__raw/servicesNS/' + splunk_user + '/' + appname + '/data/ui/views/' + dashboard_name
    response = HTTPSESSION.get(url, verify=False)
    print("Dashboard - downloading: {} {}".format(
        str(response.status_code), responses.get(response.status_code, 'Unknown')))

    if response.status_code != 200:
        # an error body is not guaranteed to be XML, so there is nothing
        # to parse or to save
        return 0

    # parse the XML response and show the direct 'entry' children of the root
    xmltree = etree.parse(io.StringIO(response.text))
    xmlroot = xmltree.getroot()
    xmlentry = xmlroot.findall('entry')
    print(xmlentry)

    if download_flag:
        with open(xmlfilepath, 'w') as f:
            f.write(response.text)
    return 0
## save dashboard
def save_dashboard_xml(splunkurl, dashboard_name, appname, xmlfilepath, dashboard_user):
    '''
    desc: saves the xml definition for the provided dashboard name

    splunkurl      -- base URL of Splunk Web (no trailing slash)
    dashboard_name -- name of the view to overwrite
    appname        -- Splunk app that contains the view
    xmlfilepath    -- local file holding the dashboard XML to upload
    dashboard_user -- owner segment of the servicesNS URL

    Uses the module-level HTTPSESSION, which must already hold the
    splunkweb_csrf_token_8000 cookie (KeyError otherwise). Exits the process
    with the error text when the XML file cannot be read. Returns 0 after
    the POST, whatever status code it reports; the status is only printed.
    '''
    url = splunkurl + '/en-US/splunkd/__raw/servicesNS/' + dashboard_user + '/' + appname + '/data/ui/views/' + dashboard_name

    # required headers
    headers = {
        'X-Splunk-Form-Key': HTTPSESSION.cookies['splunkweb_csrf_token_8000'],
        'X-Requested-With': 'XMLHttpRequest',
    }

    try:
        # the context manager closes the file even when read() fails
        with open(xmlfilepath, 'r') as xmlfile:
            payload = {
                'eai:data': xmlfile.read(),
            }
    except (OSError, ValueError) as e:
        # ValueError covers undecodable file content (UnicodeDecodeError)
        sys.exit(str(e))

    response = HTTPSESSION.post(url, verify=False, data=payload, headers=headers)
    print("Dashboard - saving {} {}".format(
        str(response.status_code), responses.get(response.status_code, 'Unknown')))
    return 0
def get_dashboard_metadata(splunkurl, appname):
    '''
    desc: download the json metadata for the dashboard

    splunkurl -- base URL of Splunk Web (no trailing slash)
    appname   -- Splunk app whose views are listed

    Requests the visible dashboards owned by the module-level dashboard_user
    (it is not a parameter of this function) through the module-level
    HTTPSESSION, then prints the final URL, the status and the raw body.
    Returns None.
    '''
    # The filter is embedded in the URL already percent-/plus-encoded, so no
    # separate query dict is passed to requests.
    url = '{}/en-US/splunkd/__raw/servicesNS/-/{}/data/ui/views?output_mode=json&search=((isDashboard=1+AND+isVisible=1)+AND+(eai:acl.owner=%22{}%22))'.format(splunkurl, appname, dashboard_user)
    response = HTTPSESSION.get(url, verify=False)
    print('get metadata url: {}'.format(response.url))
    print("metadata - results: {} {}".format(
        str(response.status_code), responses.get(response.status_code, 'Unknown')))
    print("metadata - response: {}".format(response.content))
def api_auth():
    '''
    alternate login

    Placeholder only: no login logic is implemented yet and the function
    returns None.
    '''
def get_args():
    '''
    add arguments

    Parses the command line of the upload script. Username, password and URL
    default to the splunk_user, splunk_password and splunk_url environment
    variables when the matching option is not given.

    Returns the argparse namespace once every required value is present.
    On a missing value, prints the usage plus an error message to stderr and
    exits with status 2, so that calling scripts can detect the failure.
    '''
    parser = argparse.ArgumentParser(description='gets and updates a dashboard')
    # saveas xml filename
    parser.add_argument('--xmlfilepath', help='file path to save XML')
    # username for the app context
    parser.add_argument('--dashboarduser', default='nobody', help='username for the dashboard context. Default nobody')
    # dashboard name
    parser.add_argument('--dashboard', help='name of the dashboard to update')
    # app name
    parser.add_argument('--appname', help='splunk app name where dashboard lives')
    # splunk user name
    parser.add_argument('--username', default=os.environ.get('splunk_user', None), help='splunk username for authentication. Either set it here or in an environment variable called splunk_user')
    # splunk password
    parser.add_argument('--password', default=os.environ.get('splunk_password', None), help='splunk password authentication. Either set it here or in an environment variable called splunk_password')
    # splunk url
    parser.add_argument('--url', default=os.environ.get('splunk_url', None), help='splunk full URL such as https://splunk.hostname.com:8080. Either set it here or in an environment variable called splunk_url')

    args = parser.parse_args()

    # sanity checks, in the order they are reported
    required = (
        ('dashboard', 'dashboard missing'),
        ('username', 'username missing'),
        ('password', 'password missing'),
        ('url', 'url missing'),
        ('appname', 'appname missing'),
        ('xmlfilepath', 'xmlfilepath missing'),
    )
    for attribute, message in required:
        if not getattr(args, attribute):
            # parser.error() exits with status 2; the previous
            # exit(parser.print_usage()) exited with status 0
            parser.error(message)
    return args
##
#### Main
if __name__ == '__main__':
    # get arguments
    args = get_args()

    # initialise variables
    # Take the credentials and URL from the validated arguments: they already
    # default to the splunk_* environment variables, and reading os.environ
    # directly would ignore --username/--password/--url and raise KeyError
    # when the variables are not exported.
    splunk_user = args.username
    splunk_password = args.password
    splunk_url = args.url
    dashboard_name = args.dashboard
    appname = args.appname
    xmlfilepath = args.xmlfilepath
    dashboard_user = args.dashboarduser

    try:
        # one session shared by all helpers so the login cookies are reused
        HTTPSESSION = requests.Session()
        authenticate(splunk_url, splunk_user, splunk_password)
        save_dashboard_xml(splunk_url, dashboard_name, appname, xmlfilepath, dashboard_user)
    except Exception as e:
        # top-level boundary: report any failure and exit non-zero
        sys.exit('Error: ' + str(e))
The Selenium wrapper to refresh the browser after uploading the dashboard XML:
import sys
import os
import io
import argparse, logging
from selenium import webdriver
from selenium.common import exceptions as seleniumException
def get_args():
    '''
    add arguments

    Parses the command line of the browser-refresh script.

    --debug  switches the root logger to DEBUG
    --url    page to load, defaults to https://splunk-test.company.com.au

    Returns the argparse namespace.
    '''
    parser = argparse.ArgumentParser(description='browser automation')
    parser.add_argument('--debug', help='enable debug logging', action='store_true')
    parser.add_argument('--url', default='https://splunk-test.company.com.au', help='url to retrieve')
    args = parser.parse_args()
    if args.debug:
        # basicConfig only installs a handler when the root logger has none
        # and is a no-op otherwise, so the level is also set explicitly to
        # make --debug work when logging was configured before this call.
        logging.basicConfig(level=logging.DEBUG)
        logging.getLogger().setLevel(logging.DEBUG)
    return args
def create_new_driver():
    '''
    starts the selenium webdriver to start the browser

    Opens a new Firefox session on the remote WebDriver server at
    http://127.0.0.1:4444/wd/hub with a direct (no proxy) connection, then
    records the session id and executor URL in the .webdriver_session file
    in the current directory so that a later run can find them.

    Returns the new driver.
    '''
    capabilities = webdriver.DesiredCapabilities.FIREFOX.copy()
    capabilities['proxy'] = {'proxyType': 'DIRECT'}
    driver = webdriver.Remote(desired_capabilities=capabilities, command_executor='http://127.0.0.1:4444/wd/hub')

    session_id = driver.session_id
    executor_url = driver.command_executor._url

    # persist the session details as key=value lines; the context manager
    # closes the file even if a write fails
    with open(".webdriver_session", "w") as session_file:
        session_file.write("{}={}\n".format('session_id', session_id))
        session_file.write("{}={}\n".format('executor_url', executor_url))

    # Mirror the details in the module-level SPECS dict. The previous code
    # wrote to an undefined name `specs`, which raised NameError; setdefault
    # also covers the case where SPECS has not been created yet.
    specs = globals().setdefault('SPECS', {})
    specs['session_id'] = session_id
    specs['url'] = executor_url
    return driver
##
def create_driver_session(session_id, executor_url):
    '''
    patched the gecko driver to reuse existing sessions

    Builds a Remote driver object bound to an existing session instead of
    opening a new browser: RemoteWebDriver.execute is replaced temporarily
    so that the "newSession" command issued during construction returns a
    canned response carrying session_id.

    session_id   -- id of the session to attach to
    executor_url -- URL of the WebDriver server that owns the session

    Returns the driver bound to session_id.
    '''
    from selenium.webdriver.remote.webdriver import WebDriver as RemoteWebDriver

    # Save the original function, so we can revert our patch
    org_command_execute = RemoteWebDriver.execute

    def new_command_execute(self, command, params=None):
        if command == "newSession":
            # Mock the response
            return {'success': 0, 'value': None, 'sessionId': session_id}
        return org_command_execute(self, command, params)

    # Patch the function before creating the driver object
    RemoteWebDriver.execute = new_command_execute
    try:
        capabilities = webdriver.DesiredCapabilities.FIREFOX.copy()
        capabilities['proxy'] = {'proxyType': 'DIRECT'}
        new_driver = webdriver.Remote(command_executor=executor_url, desired_capabilities=capabilities)
        new_driver.session_id = session_id
    finally:
        # Restore the original function even when construction fails, so the
        # class is never left patched for other drivers
        RemoteWebDriver.execute = org_command_execute
    return new_driver
##
#### Main
if __name__ == '__main__':
    logging.basicConfig(filename='webdriver.log', level=logging.INFO, format='%(asctime)s %(message)s')
    ARGS = get_args()
    URL = ARGS.url
    SPECS = {}

    # get session_id and executor_url saved by a previous run
    try:
        ids = {}
        with open(".webdriver_session") as f:
            for line in f:
                # split on the first '=' only so values may contain '=';
                # lines without '=' (for example blank lines) are skipped
                key, separator, value = line.partition("=")
                if not separator:
                    continue
                ids[key] = value.rstrip()
        logging.debug('ids: {}'.format(ids))
        logging.debug('session id found: {}'.format(ids['session_id']))
        logging.debug('executor_url found {}'.format(ids['executor_url']))
        driver = create_driver_session(ids['session_id'], ids['executor_url'])
    except (OSError, KeyError, seleniumException.NoSuchWindowException, seleniumException.WebDriverException) as err:
        # KeyError: the file exists but lacks one of the two expected keys;
        # treat it like a missing file and start a fresh browser session
        logging.exception('starting new session due to error {}'.format(err))
        driver = create_new_driver()

    try:
        # loading the URL is what refreshes the page in the browser
        driver.get(URL)
    except Exception as err:
        logging.exception('Unhandled exception: {}'.format(err))