#!/usr/bin/env python

"""
Common functions used in this add-on
"""

from __future__ import annotations

from configparser import ConfigParser
from csv import QUOTE_ALL, DictReader
from os import environ, path
from urllib.parse import urlparse

import requests


class Utility:
    """Provide common functions"""

    @staticmethod
    def __host_in_rules(hostname: str, rules: str) -> bool:
        """
        Check whether a hostname is covered by a comma-separated rule list.

        :param hostname: Lower-case hostname or IP address of the request
        :param rules: Raw value of "proxy_rules" or "no_proxy"

        Return True when an entry is "*", equals the hostname, or is a
        dot-prefixed domain (".example.com" / "*.example.com") that the
        hostname belongs to.
        """
        # NOTE(review): matching semantics follow a conservative reading of
        # the server.conf spec (lone "*" = everything, otherwise listed
        # hosts); confirm against the Splunk documentation.
        for raw_entry in rules.split(","):
            entry = raw_entry.strip().lower()
            if not entry:
                continue
            if entry == "*":
                return True
            if entry.startswith("*."):
                entry = entry[1:]
            if entry.startswith("."):
                if hostname == entry[1:] or hostname.endswith(entry):
                    return True
            elif hostname == entry:
                return True
        return False

    def __get_proxy(self, url: str) -> dict[str, dict[str, str]]:
        """
        Determine http proxy setting of a URL according to Splunk server configuration.

        :param url: URL that is about to be requested

        Return {dict} of http/https proxy value if a URL should be proxied,
        in the form {"proxies": {"http": ..., "https": ...}} so that it can be
        unpacked into requests.get(); otherwise return an empty {dict}.
        """
        # urlparse() yields None when the URL has no network location
        hostname = urlparse(url).hostname or ""

        # Fall back to the absolute default install location; a relative
        # "opt/splunk" would depend on the current working directory.
        splunk_home = environ.get("SPLUNK_HOME", path.join(path.sep, "opt", "splunk"))
        server_conf_path = path.join(
            splunk_home, "etc", "system", "local", "server.conf"
        )

        # interpolation=None: "%" is literal in Splunk conf values (e.g. a
        # percent-encoded password inside a proxy URL).
        # strict=False: tolerate repeated stanzas/keys instead of raising.
        server_conf = ConfigParser(interpolation=None, strict=False)
        # read() silently ignores a missing file, leaving the parser empty
        server_conf.read(server_conf_path)

        proxy_config = (
            server_conf["proxyConfig"] if server_conf.has_section("proxyConfig") else {}
        )
        proxy_rules = proxy_config.get("proxy_rules", "")
        no_proxy_rules = proxy_config.get("no_proxy", "")
        http_proxy = proxy_config.get("http_proxy", "")
        https_proxy = proxy_config.get("https_proxy", "")

        # https://docs.splunk.com/Documentation/Splunk/9.0.3/Admin/Serverconf#Splunkd_http_proxy_configuration
        # at least one proxy must be configured
        if not (http_proxy or https_proxy):
            return {}
        # hostname should not be excluded by no_proxy
        if self.__host_in_rules(hostname, no_proxy_rules):
            return {}
        # if proxy_rules is set, it should include hostname
        if proxy_rules and not self.__host_in_rules(hostname, proxy_rules):
            return {}

        return {"proxies": {"http": http_proxy, "https": https_proxy}}

    def download(self, urls: list | tuple | str, index: int = 0) -> str:
        """
        Send a GET request to the URL and return content of the response.

        :param urls: A list of URLs to try in sequence
        :param index: List's index to start

        Each URL is tried in turn until one answers with HTTP 200. A failed
        attempt (non-200 status, connection error, timeout, ...) moves on to
        the next URL; the error of the last URL is raised when all fail.

        :raises IndexError: if `index` does not refer to an entry of `urls`
        :raises requests.exceptions.RequestException: if every URL failed
        """
        if isinstance(urls, str):
            urls = (urls,)

        position = index
        while True:
            # raises IndexError for an empty list or out-of-range index
            url = urls[position]
            proxy_config = self.__get_proxy(url)
            try:
                res = requests.get(url, timeout=5, **proxy_config)
                if res.status_code == requests.codes.ok:
                    return res.text
                # 4xx/5xx raise HTTPError here ...
                res.raise_for_status()
                # ... any other non-200 status is still a failed download
                raise requests.exceptions.HTTPError(
                    f"Unexpected status code {res.status_code} for url: {url}",
                    response=res,
                )
            except requests.exceptions.RequestException:
                # no more fallbacks left: surface the last failure
                if position >= len(urls) - 1:
                    raise
                position += 1

    def __split_column(self, input_str: str | list | None = None) -> list[str]:
        """Split {string} into {list} using comma separator"""
        if isinstance(input_str, list):
            return input_str
        if isinstance(input_str, str):
            return [name.strip() for name in input_str.split(",")]
        return []

    def insert_affix(
        self,
        row: dict,
        prefix_opt: str | list | None = None,
        suffix_opt: str | list | None = None,
        affix_opt: str | list | None = None,
    ) -> dict:
        """
        Affix wildcard "*" character to existing values

        :param row: A row of an array-parsed CSV
        :param prefix_opt: A column name or a comma-separated list of column names
        to have wildcard prefixed to their non-empty value.
        :param suffix_opt: Same as prefix_opt but have the wildcard suffixed instead.
        :param affix_opt: Same as prefix_opt but have the wildcard prefixed and suffixed.

        Return a new row with prefix/suffix columns appended.
        The input row is not modified; columns that are missing or whose
        value is empty/None are skipped.
        """
        # (option, new column suffix, leading wildcard, trailing wildcard)
        affix_kinds = (
            (prefix_opt, "prefix", "*", ""),
            (suffix_opt, "suffix", "", "*"),
            (affix_opt, "affix", "*", "*"),
        )

        new_columns = {}
        for option, kind, lead, trail in affix_kinds:
            for column in self.__split_column(option):
                value = row.get(column)
                # truthiness also covers None, which DictReader uses for
                # fields missing from a short row
                if value:
                    new_columns[f"{column}_wildcard_{kind}"] = f"{lead}{value}{trail}"

        return {**row, **new_columns}

    def csv_reader(self, csv_str: str) -> DictReader:
        """
        Parse a CSV input string into an iterable of {dict} rows
        whose keys correspond to column names.

        Lines starting with "#" are treated as comments and dropped before
        parsing; the first remaining line provides the column names.
        """
        # startswith() is safe on blank lines, unlike indexing the first char
        data_lines = (
            line for line in csv_str.splitlines() if not line.startswith("#")
        )
        return DictReader(data_lines, quoting=QUOTE_ALL)