splunk-malware-filter/bin/utils.py

150 lines
5.1 KiB
Python
Raw Permalink Normal View History

2023-01-27 09:47:59 +00:00
#!/usr/bin/env python
"""
Common functions used in this add-on
"""
2024-07-30 08:18:43 +00:00
from __future__ import annotations
2023-01-27 09:47:59 +00:00
from configparser import ConfigParser
from csv import QUOTE_ALL, DictReader
from os import environ, path
from urllib.parse import urlparse
import requests
class Utility:
"""Provide common functions"""
2024-07-30 08:18:43 +00:00
def __get_proxy(self, url: str) -> dict[str, dict[str, str]] | str:
2023-01-27 09:47:59 +00:00
"""
Determine http proxy setting of a URL according to Splunk server configuration.
Return {dict} of http/https proxy value if a URL should be proxied.
"""
hostname = urlparse(url).hostname
server_conf_path = path.join(
environ.get("SPLUNK_HOME", path.join("opt", "splunk")),
"etc",
"system",
"local",
"server.conf",
)
server_conf = ConfigParser()
server_conf.read(server_conf_path)
proxy_config = (
server_conf["proxyConfig"]
if "proxyConfig" in server_conf.sections()
else {}
)
proxy_rules = proxy_config.get("proxy_rules", "")
no_proxy_rules = proxy_config.get("no_proxy", "")
http_proxy = proxy_config.get("http_proxy", "")
https_proxy = proxy_config.get("https_proxy", "")
# https://docs.splunk.com/Documentation/Splunk/9.0.3/Admin/Serverconf#Splunkd_http_proxy_configuration
if (
# either configs should not be empty
(len(http_proxy) >= 1 or len(https_proxy) >= 1)
# hostname should not be excluded by no_proxy
and hostname not in no_proxy_rules
# if proxy_rules is set, should include hostname
and (
len(proxy_rules) == 0
or (len(proxy_rules) >= 1 and hostname in proxy_rules)
)
):
return {"proxies": {"http": http_proxy, "https": https_proxy}}
return {}
2024-07-30 08:18:43 +00:00
def download(self, urls: list | tuple | str, index: int = 0) -> str:
2023-02-10 20:24:03 +00:00
"""
Send a GET request to the URL and return content of the response.
2024-07-30 08:18:43 +00:00
:param urls: A list of URLs to try in sequence
:param index: List's index to start
2023-02-10 20:24:03 +00:00
"""
if isinstance(urls, str):
urls = (urls,)
url = urls[index]
2023-01-27 09:47:59 +00:00
proxy_config = self.__get_proxy(url)
try:
res = requests.get(url, timeout=5, **proxy_config)
2023-02-10 20:24:03 +00:00
if res.status_code == requests.codes.ok:
return res.text
if index < len(urls) - 1:
return self.download(urls, index + 1)
2023-01-27 09:47:59 +00:00
res.raise_for_status()
except requests.exceptions.HTTPError as errh:
raise errh
except requests.exceptions.ConnectionError as errc:
raise errc
except requests.exceptions.Timeout as errt:
raise errt
except requests.exceptions.RequestException as err:
raise err
2024-07-30 08:18:43 +00:00
def __split_column(self, input_str: str | list | None = None) -> list[str] | list:
2023-01-27 09:47:59 +00:00
"""Split {string} into {list} using comma separator"""
if isinstance(input_str, str):
return [x.strip() for x in input_str.split(",")]
if isinstance(input_str, list):
return input_str
return []
2024-07-30 08:18:43 +00:00
def insert_affix(
self,
row: dict,
prefix_opt: str | list | None = None,
suffix_opt: str | list | None = None,
affix_opt: str | list | None = None,
) -> dict:
2023-01-27 09:47:59 +00:00
"""
Affix wildcard "*" character to existing values
2024-07-30 08:18:43 +00:00
:param row: A row of an array-parsed CSV
:param prefix_opt: A column name or a comma-separated list of column names to have
wildcard prefixed to their non-empty value.
:param suffix_opt: Same as prefix_opt but have the wildcard suffixed instead.
:param affix_opt: Same as prefix_opt but have the wildcard prefixed and suffixed.
2023-01-27 09:47:59 +00:00
2024-07-30 08:18:43 +00:00
Return a new row with prefix/suffix columns appended
2023-01-27 09:47:59 +00:00
"""
2024-07-30 08:18:43 +00:00
2023-01-27 09:47:59 +00:00
prefix_opt_list = self.__split_column(prefix_opt)
suffix_opt_list = self.__split_column(suffix_opt)
affix_opt_list = self.__split_column(affix_opt)
new_column = {}
2024-07-30 08:18:43 +00:00
2023-01-27 09:47:59 +00:00
for column in prefix_opt_list:
if column in row and len(row[column]) >= 1:
new_column = {
**new_column,
**{f"{column}_wildcard_prefix": f"*{row[column]}"},
}
for column in suffix_opt_list:
if column in row and len(row[column]) >= 1:
new_column = {
**new_column,
**{f"{column}_wildcard_suffix": f"{row[column]}*"},
}
for column in affix_opt_list:
if column in row and len(row[column]) >= 1:
new_column = {
**new_column,
**{f"{column}_wildcard_affix": f"*{row[column]}*"},
}
return {**row, **new_column}
2024-07-30 08:18:43 +00:00
def csv_reader(self, csv_str: str) -> DictReader:
2023-01-27 09:47:59 +00:00
"""Parse an CSV input string into an interable of {dict} rows whose keys correspond to column names"""
return DictReader(
filter(lambda row: row[0] != "#", csv_str.splitlines()), quoting=QUOTE_ALL
)