traefik-helper-kubernetes/utils/utils.py

117 lines
2.6 KiB
Python
Raw Normal View History

2024-09-21 04:38:15 +00:00
import subprocess
import logging
import os
# Module-level logger named after this module.
log = logging.getLogger(__name__)

# Manifest template rendered with str.format() in createService. It holds two
# YAML documents separated by `---`:
#   1. an ExternalName Service called `{name}-external`
#   2. a Traefik IngressRoute called `{name}-external-ingress` that matches
#      Host(`{host}`) on the `websecure` entry point and forwards to that
#      Service, using `{tls_secret}` as the TLS secret.
# Placeholders: name, host, service, protocol, port, tls_secret.
# Values are substituted verbatim (no YAML quoting or escaping is applied).
# The nesting below is significant: YAML derives structure from indentation,
# so every child key must be indented under its parent.
baseServices = """kind: Service
apiVersion: v1
metadata:
  name: {name}-external
  namespace: traefik
spec:
  type: ExternalName
  externalName: {service}
  ports:
  - name: {protocol}
    port: {port}
---
apiVersion: traefik.io/v1alpha1
kind: IngressRoute
metadata:
  name: {name}-external-ingress
  namespace: traefik
  annotations:
    kubernetes.io/ingress.class: traefik-external
spec:
  entryPoints:
    - websecure
  routes:
    - match: Host(`{host}`)
      kind: Rule
      services:
        - name: {name}-external
          port: {port}
  tls:
    secretName: {tls_secret}
"""
def getServicesPath(default="C:/Users/Liam/Nextcloud/Kubernetes/traefik/external"):
    """
    Get the path to the services directory

    Args:
        default: Path returned when the SERVICES_PATH environment variable is
            unset or empty. Defaults to the original author's local folder,
            so existing zero-argument callers see no change.

    Returns:
        The value of SERVICES_PATH when it is set to a non-empty string,
        otherwise ``default``.
    """
    # Read the variable once; `or` treats an empty value like an unset one.
    return os.getenv("SERVICES_PATH") or default
def checkIfKubectlInstalled():
    """
    Check if kubectl is installed

    Runs ``kubectl version --client``. The ``--client`` flag restricts the
    check to the local binary, so an unreachable cluster does not make an
    installed kubectl look missing.

    Returns:
        True if the command could be started and exited with status 0,
        False otherwise.
    """
    try:
        subprocess.check_output(['kubectl', 'version', '--client'])
    except (subprocess.CalledProcessError, OSError):
        # CalledProcessError: kubectl ran but exited non-zero.
        # OSError (e.g. FileNotFoundError): the executable could not be
        # started at all, which is the "not installed" case.
        return False
    return True
def runKubectl(command):
    """
    Run kubectl with the given command

    Args:
        command: Arguments for kubectl, either as a single string that is
            split on whitespace (e.g. ``"apply -f service.yaml"``) or as a
            list/tuple of individual arguments. Use the list form when an
            argument itself contains spaces, such as a file path.

    Returns:
        True if kubectl exited with status 0, False if it exited non-zero or
        could not be started. stdout (on success) or the error text (on
        failure) is printed.
    """
    if isinstance(command, str):
        # split() without a separator collapses runs of whitespace, so extra
        # spaces do not turn into empty-string arguments.
        args = command.split()
    else:
        args = [str(arg) for arg in command]

    try:
        result = subprocess.run(['kubectl'] + args, capture_output=True, text=True)
    except OSError as e:
        # Raised when the kubectl executable cannot be started (e.g. it is
        # not on PATH); report it the same way as a failed command.
        print(f"Error running kubectl: \n{e}")
        return False

    if result.returncode != 0:
        print(f"Error running kubectl: \n{result.stderr}")
        return False

    print(f"kubectl output: \n{result.stdout}")
    return True
def createService(filePath, name, host, service, protocol, port, tls_secret):
    """
    Create a new service and apply it using kubectl

    Renders the baseServices template, writes it to ``filePath`` and runs
    ``kubectl apply -f`` on that file.

    Args:
        filePath: Destination of the generated manifest; overwritten if it
            already exists.
        name: Base name; the manifest uses ``{name}-external`` and
            ``{name}-external-ingress``.
        host: Host name matched by the ingress route.
        service: Value written to the Service's ``externalName``.
        protocol: Name given to the Service port.
        port: Port used by both the Service and the ingress route.
        tls_secret: Name of the TLS secret referenced by the ingress route.

    Returns:
        None. Failures are printed rather than raised, except for non-I/O
        errors while writing the file.
    """
    # Generate the YAML content for the service and ingress route
    serviceContent = baseServices.format(
        name=name,
        host=host,
        service=service,
        protocol=protocol,
        port=port,
        tls_secret=tls_secret,
    )

    # Write the manifest with an explicit encoding so the result does not
    # depend on the platform's default code page.
    try:
        with open(filePath, 'w', encoding='utf-8') as f:
            f.write(serviceContent)
    except OSError as e:
        print(f"Error writing service file: {e}")
        return
    print(f"Service definition written to {filePath}")

    # Apply the service using kubectl
    applied = runKubectl(f'apply -f {filePath}')
    # Only an explicit False counts as failure; a None result (no status
    # reported) is treated as success so the logging below still happens.
    if applied is False:
        return

    log.info("Service %s created", name)
    # The route is bound to the `websecure` entry point with a TLS secret,
    # so the service is reached over HTTPS.
    log.info("Domain: https://%s", host)
def deleteService(filePath, name):
    """
    Delete a service from the cluster and remove its definition file

    Runs ``kubectl delete -f`` on ``filePath`` and then removes the file
    itself. The removal is attempted regardless of how the kubectl call went.

    Args:
        filePath: Path of the manifest to delete from the cluster and disk.
        name: Service name, used only in the final log message.
    """
    # Ask kubectl to delete everything defined in the manifest
    runKubectl(f'delete -f {filePath}')

    # Then drop the manifest from disk; the deletion is logged only when
    # the file was removed without error
    try:
        os.remove(filePath)
        print(f"Service definition deleted from {filePath}")
    except Exception as err:
        print(f"Error deleting service file: {err}")
    else:
        log.info(f"Service {name} deleted")