# splunk-malware-filter/bin/utils.py (web-view header residue removed)
#!/usr/bin/env python
"""
Common functions used in this add-on
"""
from configparser import ConfigParser
from csv import QUOTE_ALL, DictReader
from os import environ, path
from urllib.parse import urlparse
import requests
class Utility:
    """Provide common functions used across this add-on."""

    def __get_proxy(self, url):
        """
        Determine the HTTP proxy setting for a URL according to the Splunk
        server configuration ($SPLUNK_HOME/etc/system/local/server.conf).

        Arguments:
            url {string} -- URL whose hostname is matched against the proxy rules.

        Return:
            {dict} -- {"proxies": {"http": ..., "https": ...}} if the URL should
            be proxied, otherwise an empty dict. Suitable for expansion into
            requests.get(**proxy_config).
        """
        hostname = urlparse(url).hostname
        server_conf_path = path.join(
            # Fall back to the conventional absolute install location when
            # SPLUNK_HOME is unset (the previous fallback "opt/splunk" was a
            # relative path and never resolved to a real install directory).
            environ.get("SPLUNK_HOME", path.join(path.sep, "opt", "splunk")),
            "etc",
            "system",
            "local",
            "server.conf",
        )
        server_conf = ConfigParser()
        # ConfigParser.read() silently ignores a missing file, leaving the
        # parser empty, so an absent server.conf simply means "no proxy".
        server_conf.read(server_conf_path)
        proxy_config = (
            server_conf["proxyConfig"]
            if "proxyConfig" in server_conf.sections()
            else {}
        )
        proxy_rules = proxy_config.get("proxy_rules", "")
        no_proxy_rules = proxy_config.get("no_proxy", "")
        http_proxy = proxy_config.get("http_proxy", "")
        https_proxy = proxy_config.get("https_proxy", "")
        # https://docs.splunk.com/Documentation/Splunk/9.0.3/Admin/Serverconf#Splunkd_http_proxy_configuration
        # NOTE(review): "in" below is a plain substring test, so a hostname
        # that merely appears inside a longer no_proxy/proxy_rules entry also
        # matches -- presumably acceptable for Splunk's rule format, but
        # confirm against the documented proxy_rules syntax.
        # pylint: disable=too-many-boolean-expressions
        if (
            # at least one proxy endpoint must be configured
            (len(http_proxy) >= 1 or len(https_proxy) >= 1)
            # hostname must not be excluded by no_proxy
            and hostname not in no_proxy_rules
            # if proxy_rules is set, it must include the hostname
            and (
                len(proxy_rules) == 0
                or (len(proxy_rules) >= 1 and hostname in proxy_rules)
            )
        ):
            return {"proxies": {"http": http_proxy, "https": https_proxy}}
        return {}

    # pylint: disable=inconsistent-return-statements
    def download(self, urls, index=0):
        """
        Send a GET request to a URL and return the body of the response.

        Falls through to the next URL in the sequence when the current one
        responds with a non-OK status; on the last URL a non-OK status raises
        instead.

        Arguments:
            urls {list/tuple/string} -- A list of URLs to try in sequence.
            index {int} -- List index to start from (used by the recursion).

        Raises:
            requests.exceptions.RequestException -- Propagated unchanged from
            requests (HTTPError / ConnectionError / Timeout / ...). The
            original wrapped these in `except X as e: raise e` clauses, which
            were no-ops and have been removed.
        """
        if isinstance(urls, str):
            urls = (urls,)
        url = urls[index]
        proxy_config = self.__get_proxy(url)
        # NOTE(review): connection errors/timeouts abort immediately instead
        # of failing over to the next URL -- kept as-is to preserve the
        # caller-visible exception behavior of the original.
        res = requests.get(url, timeout=5, **proxy_config)
        # pylint: disable=no-member
        if res.status_code == requests.codes.ok:
            return res.text
        if index < len(urls) - 1:
            return self.download(urls, index + 1)
        # Last URL and non-OK status: surface the HTTP error to the caller.
        res.raise_for_status()

    def __split_column(self, input_str=None):
        """Split a comma-separated {string} into a {list} of trimmed names.

        A {list} passes through unchanged; any other type yields an empty list.
        """
        if isinstance(input_str, str):
            return [column.strip() for column in input_str.split(",")]
        if isinstance(input_str, list):
            return input_str
        return []

    def insert_affix(self, row, prefix_opt=None, suffix_opt=None, affix_opt=None):
        """
        Affix wildcard "*" character to existing values.

        Arguments:
            row {dict} -- A row of an array-parsed CSV.
            prefix_opt {string/list} -- A column name or a comma-separated list
                of column names to have a wildcard prefixed to their non-empty
                value.
            suffix_opt {string/list} -- Same as prefix_opt but with the
                wildcard suffixed instead.
            affix_opt {string/list} -- Same as prefix_opt but with the wildcard
                both prefixed and suffixed.

        Return:
            A new row {dict} with the wildcard columns appended; the input row
            is not modified.
        """
        new_column = {}
        # (columns, label, template) per affix mode; one data-driven loop
        # replaces the three near-identical loops of the original. Truthiness
        # (instead of len() >= 1) also skips None values safely.
        for columns, label, template in (
            (self.__split_column(prefix_opt), "prefix", "*{}"),
            (self.__split_column(suffix_opt), "suffix", "{}*"),
            (self.__split_column(affix_opt), "affix", "*{}*"),
        ):
            for column in columns:
                value = row.get(column)
                if value:  # only affix non-empty existing values
                    new_column[f"{column}_wildcard_{label}"] = template.format(value)
        return {**row, **new_column}

    def csv_reader(self, csv_str):
        """
        Parse a CSV string into an iterable of {dict} rows keyed by column names.

        Lines starting with "#" are treated as comments and skipped. The
        original predicate `row[0] != "#"` raised IndexError on blank lines;
        str.startswith handles them (DictReader then skips empty rows).
        """
        return DictReader(
            filter(lambda line: not line.startswith("#"), csv_str.splitlines()),
            quoting=QUOTE_ALL,
        )