#!/usr/bin/env python
"""
Common functions used in this add-on
"""

from configparser import ConfigParser
from csv import QUOTE_ALL, DictReader
from os import environ, path
from urllib.parse import urlparse

import requests


class Utility:
    """Provide common functions"""

    def __get_proxy(self, url):
        """
        Determine the HTTP proxy settings for a URL according to the Splunk server configuration.

        Return a {dict} of http/https proxy values if the URL should be proxied, otherwise an empty {dict}.
        """
        hostname = urlparse(url).hostname

        server_conf_path = path.join(
            environ.get("SPLUNK_HOME", path.join("opt", "splunk")),
            "etc",
            "system",
            "local",
            "server.conf",
        )
        server_conf = ConfigParser()
        server_conf.read(server_conf_path)
        proxy_config = (
            server_conf["proxyConfig"]
            if "proxyConfig" in server_conf.sections()
            else {}
        )
        proxy_rules = proxy_config.get("proxy_rules", "")
        no_proxy_rules = proxy_config.get("no_proxy", "")
        http_proxy = proxy_config.get("http_proxy", "")
        https_proxy = proxy_config.get("https_proxy", "")

        # https://docs.splunk.com/Documentation/Splunk/9.0.3/Admin/Serverconf#Splunkd_http_proxy_configuration
        # Note: proxy_rules and no_proxy are matched as plain substrings of the raw config values.
        # pylint: disable=too-many-boolean-expressions
        if (
            # at least one of the proxy settings must be non-empty
            (len(http_proxy) >= 1 or len(https_proxy) >= 1)
            # the hostname must not be excluded by no_proxy
            and hostname not in no_proxy_rules
            # if proxy_rules is set, it must include the hostname
            and (
                len(proxy_rules) == 0
                or (len(proxy_rules) >= 1 and hostname in proxy_rules)
            )
        ):
            return {"proxies": {"http": http_proxy, "https": https_proxy}}

        return {}
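
    # For illustration only, a server.conf [proxyConfig] stanza that __get_proxy() would
    # pick up could look like the following (the proxy host and no_proxy values below are
    # hypothetical, not taken from this add-on):
    #
    #   [proxyConfig]
    #   http_proxy = http://10.1.0.1:8080
    #   https_proxy = http://10.1.0.1:8080
    #   no_proxy = localhost, 127.0.0.1
    #
    # With that stanza in place, a lookup for "https://example.net/list.csv" would return
    # {"proxies": {"http": "http://10.1.0.1:8080", "https": "http://10.1.0.1:8080"}},
    # because proxy_rules is unset (empty) and "example.net" is not listed in no_proxy.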

    def download(self, url):
        """Send a GET request to the URL and return the content of the response."""
        proxy_config = self.__get_proxy(url)
        # Request errors (HTTPError, ConnectionError, Timeout, other RequestException)
        # propagate to the caller unchanged.
        res = requests.get(url, timeout=5, **proxy_config)
        res.raise_for_status()
        return res.text
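
    # For example (hypothetical URL), Utility().download("https://example.com/feed.csv")
    # returns the response body as text, or raises a requests exception on failure.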

    def __split_column(self, input_str=None):
        """Split a comma-separated {string} into a {list} of trimmed values; return a {list} input unchanged and anything else as an empty {list}."""
        if isinstance(input_str, str):
            return [x.strip() for x in input_str.split(",")]
        if isinstance(input_str, list):
            return input_str
        return []
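
    # For example, __split_column("domain, url") returns ["domain", "url"],
    # __split_column(["domain"]) is returned unchanged, and __split_column(None) returns [].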

    def insert_affix(self, row, prefix_opt=None, suffix_opt=None, affix_opt=None):
        """
        Affix the wildcard "*" character to existing values.

        Arguments:
            row {dict} -- A row of a parsed CSV
            prefix_opt {string/list} -- A column name or a comma-separated list of column names whose non-empty values have a wildcard prefixed.
            suffix_opt {string/list} -- Same as prefix_opt, but with the wildcard suffixed instead.
            affix_opt {string/list} -- Same as prefix_opt, but with the wildcard both prefixed and suffixed.

        Return:
            A new row with the prefix/suffix columns appended
        """
        prefix_opt_list = self.__split_column(prefix_opt)
        suffix_opt_list = self.__split_column(suffix_opt)
        affix_opt_list = self.__split_column(affix_opt)
        new_column = {}
        for column in prefix_opt_list:
            if column in row and len(row[column]) >= 1:
                new_column = {
                    **new_column,
                    **{f"{column}_wildcard_prefix": f"*{row[column]}"},
                }
        for column in suffix_opt_list:
            if column in row and len(row[column]) >= 1:
                new_column = {
                    **new_column,
                    **{f"{column}_wildcard_suffix": f"{row[column]}*"},
                }
        for column in affix_opt_list:
            if column in row and len(row[column]) >= 1:
                new_column = {
                    **new_column,
                    **{f"{column}_wildcard_affix": f"*{row[column]}*"},
                }

        return {**row, **new_column}
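
    # For example (hypothetical row), insert_affix({"domain": "example.com", "ip": ""}, prefix_opt="domain,ip")
    # returns {"domain": "example.com", "ip": "", "domain_wildcard_prefix": "*example.com"};
    # the empty "ip" value gains no wildcard column.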

    def csv_reader(self, csv_str):
        """Parse a CSV input string into an iterable of {dict} rows whose keys correspond to column names."""
        # Skip comment lines that start with "#" before handing the rest to DictReader.
        return DictReader(
            filter(lambda row: row[0] != "#", csv_str.splitlines()), quoting=QUOTE_ALL
        )
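

# Illustrative usage sketch (not part of the original add-on): shows how csv_reader()
# and insert_affix() fit together on an inline CSV string, without any network access.
# The column names and values below are hypothetical.
if __name__ == "__main__":
    SAMPLE_CSV = (
        "# comment lines are skipped by csv_reader\n"
        '"domain","path"\n'
        '"example.com","/malware.exe"\n'
    )
    utility = Utility()
    for parsed_row in utility.csv_reader(SAMPLE_CSV):
        # Append "*"-prefixed/suffixed variants of the selected columns.
        print(utility.insert_affix(parsed_row, prefix_opt="domain", suffix_opt="path"))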