#!/usr/bin/env python

"""
Common functions used in this add-on
"""

from configparser import ConfigParser
from csv import QUOTE_ALL, DictReader
from fnmatch import fnmatchcase
from os import environ, path
from urllib.parse import urlparse

import requests


class Utility:
    """Provide common functions"""

    @staticmethod
    def __host_matches(hostname, rules):
        """
        Check a hostname against a comma-separated list of host rules.

        Arguments:
            hostname {string} -- Lower-case hostname to test
            rules {string} -- Comma-separated host names/IPs; "*" wildcards
                              are honoured (e.g. "*" or "*.example.com")

        Return:
            True if any non-empty rule matches the whole hostname.
            Negated ("!") and CIDR rules are not interpreted.
        """
        patterns = (rule.strip().lower() for rule in rules.split(","))
        return any(fnmatchcase(hostname, pattern) for pattern in patterns if pattern)

    def __get_proxy(self, url):
        """
        Determine http proxy setting of a URL according to Splunk server configuration.

        Reads the [proxyConfig] stanza of
        $SPLUNK_HOME/etc/system/local/server.conf (SPLUNK_HOME defaults to
        /opt/splunk when the environment variable is unset).

        Arguments:
            url {string} -- URL about to be requested

        Return:
            {dict} of the form {"proxies": {"http": ..., "https": ...}} if the
            URL should be proxied, otherwise an empty {dict}. The result is
            meant to be expanded as keyword arguments of requests.get().
        """
        # hostname is None for URLs without a network location
        hostname = (urlparse(url).hostname or "").lower()

        server_conf_path = path.join(
            # absolute default; a relative "opt/splunk" would depend on the cwd
            environ.get("SPLUNK_HOME", path.join(path.sep, "opt", "splunk")),
            "etc",
            "system",
            "local",
            "server.conf",
        )
        # interpolation=None: Splunk values are literal, a "%" in a proxy URL
        # (e.g. percent-encoded credentials) must not be treated as a reference.
        # strict=False: tolerate repeated stanzas/keys instead of raising.
        server_conf = ConfigParser(interpolation=None, strict=False)
        server_conf.read(server_conf_path)
        proxy_config = (
            server_conf["proxyConfig"] if server_conf.has_section("proxyConfig") else {}
        )
        proxy_rules = proxy_config.get("proxy_rules", "")
        no_proxy_rules = proxy_config.get("no_proxy", "")
        http_proxy = proxy_config.get("http_proxy", "")
        https_proxy = proxy_config.get("https_proxy", "")

        # https://docs.splunk.com/Documentation/Splunk/9.0.3/Admin/Serverconf#Splunkd_http_proxy_configuration
        # at least one proxy must be configured
        if not (http_proxy or https_proxy):
            return {}
        # hostname should not be excluded by no_proxy
        if self.__host_matches(hostname, no_proxy_rules):
            return {}
        # if proxy_rules is set, it should include hostname
        if proxy_rules and not self.__host_matches(hostname, proxy_rules):
            return {}

        return {"proxies": {"http": http_proxy, "https": https_proxy}}

    # pylint: disable=inconsistent-return-statements
    def download(self, urls, index=0):
        """
        Send a GET request to the URL and return content of the response.

        Each URL is tried in turn, starting at `index`. A URL that fails,
        either with a non-OK status or with a request error (connection
        failure, timeout, ...), falls through to the next one; only the
        failure of the last URL is reported to the caller.

        Arguments:
            urls {list/tuple/string} -- A list of URLs to try in sequence
            index -- List's index to start

        Return:
            Body text of the first response with status 200. None if the last
            URL answers with a non-200 status that is not an HTTP error.

        Raise:
            IndexError -- `index` is outside of `urls`
            requests.exceptions.RequestException -- the last URL failed
        """
        if isinstance(urls, str):
            urls = (urls,)

        while True:
            url = urls[index]
            is_last = index >= len(urls) - 1
            proxy_config = self.__get_proxy(url)

            try:
                res = requests.get(url, timeout=5, **proxy_config)
            except requests.exceptions.RequestException:
                if is_last:
                    raise
                # unreachable mirror, try the next one
                index += 1
                continue

            # pylint: disable=no-member
            if res.status_code == requests.codes.ok:
                return res.text

            if is_last:
                # raises HTTPError for 4xx/5xx, otherwise falls through
                res.raise_for_status()
                return None

            index += 1

    def __split_column(self, input_str=None):
        """
        Split {string} into {list} using comma separator

        Arguments:
            input_str {string/list/tuple} -- Comma-separated column names, or
                                             an existing sequence of names

        Return:
            {list} of column names with surrounding whitespace removed and
            empty names dropped; an empty {list} for any other input.
        """
        if isinstance(input_str, str):
            names = (name.strip() for name in input_str.split(","))
            return [name for name in names if name]
        if isinstance(input_str, list):
            return input_str
        if isinstance(input_str, tuple):
            return list(input_str)

        return []

    def insert_affix(self, row, prefix_opt=None, suffix_opt=None, affix_opt=None):
        """
        Affix wildcard "*" character to existing values

        Arguments:
            row {dict} -- A row of an array-parsed CSV
            prefix_opt {string/list} -- A column name or a comma-separated list of
                                        column names to have wildcard prefixed to
                                        their non-empty value.
            suffix_opt {string/list} -- Same as prefix_opt but have the wildcard
                                        suffixed instead.
            affix_opt {string/list} -- Same as prefix_opt but have the wildcard
                                       prefixed and suffixed.

        Return:
            A new row with prefix/suffix columns appended. Columns that are
            missing from the row or hold an empty/None value are skipped.
        """
        # (option, name of the generated column, how the value is wrapped)
        wildcard_specs = (
            (prefix_opt, "prefix", "*{}"),
            (suffix_opt, "suffix", "{}*"),
            (affix_opt, "affix", "*{}*"),
        )

        new_columns = {}
        for option, kind, template in wildcard_specs:
            for column in self.__split_column(option):
                value = row.get(column)
                # DictReader fills missing trailing fields with None
                if value:
                    new_columns[f"{column}_wildcard_{kind}"] = template.format(value)

        return {**row, **new_columns}

    def csv_reader(self, csv_str):
        """
        Parse a CSV input string into an iterable of {dict} rows
        whose keys correspond to column names

        Lines starting with "#" are treated as comments and skipped.
        """
        # startswith() is safe on blank lines, unlike indexing line[0]
        data_lines = (line for line in csv_str.splitlines() if not line.startswith("#"))
        return DictReader(data_lines, quoting=QUOTE_ALL)