import os
import sys
import json
import logging
import hvac
import slixmpp
......@@ -19,7 +18,6 @@ XMPP_SENDER = os.getenv("HH_XMPP_SENDER", None)
VAULT_ADDR = os.getenv("VAULT_ADDR", None)
VAULT_ENTITY = os.getenv("VAULT_ENTITY", None)
VAULT_TOKEN = os.getenv("VAULT_TOKEN", None)
VAULT_CACHE_FILE = "/tmp/vault_cache.json"
# Send a message (and optional file) via XMPP to a room
......@@ -74,47 +72,6 @@ class XmppMessage(slixmpp.ClientXMPP):
self.disconnect()
# Attempt to read the cache file and return it's contents
# If it doesn't exist, we create it and populate it with an empty JSON array
# Returns: config in JSON format, False on failure
def get_vault_cache():
if os.path.isfile(VAULT_CACHE_FILE) and os.stat(VAULT_CACHE_FILE).st_size > 0:
with open(VAULT_CACHE_FILE, "r") as cache_file:
try:
cache = json.load(cache_file)
return cache
except json.decoder.JSONDecodeError as error:
LOG.error(str(error))
return False
else:
# Init file with dummy JSON data so json.load() doesn't complain
with open(VAULT_CACHE_FILE, "w") as new_cache_file:
new_cache_file.write("{}")
return "{}"
# Attempt to update the cache file with the given key/value
# Returns: True if we update the cache file, False on failure
def add_to_vault_cache(secret, value):
with open(VAULT_CACHE_FILE, "r") as cache_file:
try:
cache = json.load(cache_file)
except json.decoder.JSONDecodeError as error:
LOG.error(str(error))
return False
cache.update({secret: value})
with open(VAULT_CACHE_FILE, "w") as cache_file:
try:
json.dump(cache, cache_file)
except json.decoder.JSONDecodeError as error:
LOG.error(str(error))
return False
return True
# Checks the current token TTL and renews if necessary
# Returns: False if renewing the token fails, True otherwise (renewed/no action needed)
def check_and_renew_token(vault):
......@@ -232,7 +189,7 @@ def get_vault():
# Get a secret, with caching support
# Returns: secret's value, False on failure
def get_vault_secret(secret, cache=True):
def get_vault_secret(secret):
# Get vault
vault = get_vault()
......@@ -253,38 +210,21 @@ def get_vault_secret(secret, cache=True):
LOG.error("Unrecognized secret format: %s", secret)
return False
# Check if cache file exists, and pull from there if possible
if cache:
cache_file = get_vault_cache()
# Grab secret from vault
vault = get_vault()
if vault is False:
return False
if path not in cache_file:
return get_vault_secret(secret, cache=False)
# Read secret
secret_lookup = vault.secrets.kv.v2.read_secret_version(
mount_point=mount_point, path=path
)
secret_value = secret_lookup.get("data").get("data").get("secret")
LOG.info("Using cached value for: %s", path)
secret = cache_file.get(path)
if not secret_value or secret_value is None:
LOG.error("Failed to get secret: %s", str(path))
secret = False
else:
# Grab secret from vault
vault = get_vault()
if vault is False:
return False
# Read secret
secret_lookup = vault.secrets.kv.v2.read_secret_version(
mount_point=mount_point, path=path
)
secret_value = secret_lookup.get("data").get("data").get("secret")
if not secret_value or secret_value is None:
LOG.error("Failed to get secret: %s", str(path))
secret = False
# Add to cache before returning it
if add_to_vault_cache(path, secret_value):
LOG.info("Added: %s to the cache file", str(path))
else:
LOG.warning("Failed to add: %s to the cache file", str(path))
secret = secret_value
return secret
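Review bot: The new body of get_vault_secret does not honor the `# Returns: secret's value, False on failure` comment above it: `vault.secrets.kv.v2.read_secret_version(...)` raises (hvac's InvalidPath / Forbidden) for a missing or unauthorized path, and `secret_lookup.get("data").get("data")` raises AttributeError if either level is absent, so callers get an exception where the contract promises False. Wrap the read in a `try` that catches `hvac.exceptions.VaultError`, logs it and returns False, and make the nested lookups tolerant of a missing key, as below. `if not secret_value or secret_value is None:` is redundant since `not secret_value` already covers None, and the header comment in the earlier hunk, `# Get a secret, with caching support`, is stale now that the `cache` parameter is gone (callers still passing `cache=` will also hit a TypeError, which matters if the package is consumed from PyPI as the setup.py hunk suggests). With indentation lost in the rendering I cannot tell whether `secret = secret_value` on the last lines is still unconditional; if it is, it overwrites the `secret = False` set in the error branch and the function can return None.
```python
    # … unchanged: secret format parsing, get_vault() …
    # Read secret
    try:
        secret_lookup = vault.secrets.kv.v2.read_secret_version(
            mount_point=mount_point, path=path
        )
    except hvac.exceptions.VaultError as error:
        LOG.error("Failed to read secret %s: %s", str(path), error)
        return False
    secret_value = (secret_lookup.get("data") or {}).get("data", {}).get("secret")
    # … unchanged: empty-value check and return …
```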
......@@ -5,7 +5,7 @@ with open("README.md", "r") as fh:
setuptools.setup(
name="hhelper",
version="1.0.9",
version="1.0.10",
author="Tyler Page",
author_email="hhelper-pypi@tpage.io",
description="A homelab helper module",