diff --git a/specific_task/HyperCoreBalancerv2/.gitignore b/specific_task/HyperCoreBalancerv2/.gitignore
new file mode 100644
index 0000000..2eea525
--- /dev/null
+++ b/specific_task/HyperCoreBalancerv2/.gitignore
@@ -0,0 +1 @@
+.env
\ No newline at end of file
diff --git a/specific_task/HyperCoreBalancerv2/.env.template b/specific_task/HyperCoreBalancerv2/.env.template
new file mode 100644
index 0000000..7b3925d
--- /dev/null
+++ b/specific_task/HyperCoreBalancerv2/.env.template
@@ -0,0 +1,61 @@
+# =============================================================================
+# HyperCore Load Balancer - Environment Configuration
+# =============================================================================
+# Copy this file to .env and fill in your real values.
+# NEVER commit the .env file to version control.
+# =============================================================================
+
+# --- HyperCore Cluster Credentials ---
+SC_HOST=https://nodeip/rest/v1
+SC_USERNAME=admin
+SC_PASSWORD=password
+SC_VERIFY_SSL=false
+
+# --- InfluxDB ---
+DOCKER_INFLUXDB_INIT_USERNAME=admin
+DOCKER_INFLUXDB_INIT_PASSWORD=CHANGE_ME
+INFLUX_TOKEN=CHANGE_ME
+INFLUX_ORG=hypercore
+INFLUX_BUCKET=metrics
+
+# Retention & Downsampling
+INFLUX_PRIMARY_RETENTION=365
+INFLUX_LONGTERM_BUCKET=metrics_longterm
+INFLUX_LONGTERM_RETENTION=1825
+
+# --- Grafana ---
+GF_SECURITY_ADMIN_PASSWORD=CHANGE_ME
+
+# --- Collector ---
+SC_POLL_INTERVAL=30
+SC_SLOW_POLL_INTERVAL=300
+
+# --- Balancer: Master Toggles ---
+SC_DRY_RUN=true
+SC_VERBOSITY=3
+
+# --- Balancer: Reactive Engine ---
+SC_AVG_WINDOW_MINUTES=5
+SC_SAMPLE_INTERVAL_SECONDS=30
+SC_RAM_LIMIT_PERCENT=85.0
+SC_RAM_UPPER_THRESHOLD_PERCENT=65.0
+SC_CPU_UPPER_THRESHOLD_PERCENT=50.0
+SC_MAX_VCPU_RATIO=2.0
+SC_VM_MOVE_COOLDOWN_MINUTES=30
+SC_MIGRATION_COOLDOWN_MINUTES=5
+SC_EXCLUDE_NODE_IPS=
+
+# --- Balancer: Predictive Engine (The Oracle) ---
+SC_PREDICTIVE_BALANCING_ENABLED=true
+SC_PREDICTIVE_INTERVAL_SECONDS=43200 
+SC_PREDICTIVE_THRESHOLD=80.0 +SC_PREDICTIVE_MIN_HISTORY_HOURS=336 +SC_PREDICTIVE_LOOKBACK_DAYS=90 +SC_PREDICTIVE_MAX_WORKERS=8 +SC_PREDICTIVE_LEAD_TIME_HOURS=1 +SC_PREDICTIVE_RESERVATION_MINUTES=5 + +# --- Dashboard --- +SC_DASHBOARD_STALE_SECONDS=120 +SC_DASHBOARD_USER=admin +SC_DASHBOARD_PASSWORD=CHANGE_ME diff --git a/specific_task/HyperCoreBalancerv2/Balancer/Dockerfile b/specific_task/HyperCoreBalancerv2/Balancer/Dockerfile new file mode 100644 index 0000000..cbfaf91 --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/Balancer/Dockerfile @@ -0,0 +1,17 @@ +FROM python:3.11-slim + +WORKDIR /app + +# Install build dependencies required by Prophet +RUN apt-get update && apt-get install -y build-essential && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt + +# Copy your scripts into the container +COPY HyperCore_balancer.py . +COPY predictive_engine.py . +COPY config_db.py . + +# Run the balancer unbuffered so logs show up immediately in Docker +CMD ["python", "-u", "HyperCore_balancer.py"] \ No newline at end of file diff --git a/specific_task/HyperCoreBalancerv2/Balancer/HyperCore_balancer.py b/specific_task/HyperCoreBalancerv2/Balancer/HyperCore_balancer.py new file mode 100644 index 0000000..9d96e13 --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/Balancer/HyperCore_balancer.py @@ -0,0 +1,733 @@ +#!/usr/bin/env python3 + +import os +import sys +import gc +import time +import signal +import random +import threading +import warnings +import traceback +import requests +from collections import deque +from statistics import mean +from typing import Dict, List, Optional, Tuple, Any +from dotenv import load_dotenv +from influxdb_client import InfluxDBClient, Point +from influxdb_client.client.write_api import SYNCHRONOUS + +# Load environment variables +load_dotenv() + +import config_db + +# ============================================================================= +# Configuration & Defaults +# 
============================================================================= + +def get_config(key: str, default: Any, cast_type: type = str) -> Any: + """Helper to fetch and cast environment variables (used for configurables only).""" + value = os.getenv(key) + if value is None: + return default + try: + if cast_type == bool: + return value.lower() in ('true', '1', 'yes', 'y') + if cast_type == list: + return [ip.strip() for ip in value.split(',') if ip.strip()] + return cast_type(value) + except (ValueError, TypeError): + return default + +# Cluster Credentials & Endpoint (configurables — stay in .env) +SC_HOST = get_config('SC_HOST', "https://cluster-ip/rest/v1") +SC_USERNAME = get_config('SC_USERNAME', "admin") +SC_PASSWORD = get_config('SC_PASSWORD', "password") +SC_VERIFY_SSL = get_config('SC_VERIFY_SSL', False, bool) + +# InfluxDB (for writing migration events — configurables) +INFLUX_URL = get_config('INFLUX_URL', "http://influxdb:8086") +INFLUX_TOKEN = get_config('INFLUX_TOKEN', "super-secret-auth-token") +INFLUX_ORG = get_config('INFLUX_ORG', "hypercore") +INFLUX_BUCKET = get_config('INFLUX_BUCKET', "metrics") + +# Tunables are loaded from config_db at startup (seeded from env vars on first run) + +# ============================================================================= +# API Client (with automatic re-authentication on 401) +# ============================================================================= + +class HyperCoreApiClient: + def __init__(self, base_url: str, user: str, pw: str, verify: bool): + self.base_url = base_url.rstrip('/') + self.user = user + self.pw = pw + self.session = requests.Session() + self.session.verify = verify + + if not verify: + from urllib3.exceptions import InsecureRequestWarning + warnings.simplefilter('ignore', InsecureRequestWarning) + + def login(self) -> bool: + url = f"{self.base_url}/login" + payload = {"username": self.user, "password": self.pw} + try: + self.session.cookies.clear() + response = 
self.session.post(url, json=payload, timeout=10) + response.raise_for_status() + return True + except requests.exceptions.RequestException as e: + print(f"Login failed: {e}") + return False + + def close(self): + try: + self.session.post(f"{self.base_url}/logout", timeout=5) + except: + pass + self.session.close() + + def fetch(self, endpoint: str) -> Any: + response = self.session.get(f"{self.base_url}{endpoint}", timeout=15) + if response.status_code == 401: + ts = time.strftime("%Y-%m-%d %H:%M:%S") + print(f"[{ts}] Session expired (401). Re-authenticating...") + if self.login(): + response = self.session.get(f"{self.base_url}{endpoint}", timeout=15) + else: + raise requests.exceptions.HTTPError("Re-authentication failed after 401") + response.raise_for_status() + return response.json() + + def get_nodes(self): return self.fetch("/Node") + def get_vms(self): return self.fetch("/VirDomain") + def get_vm_stats(self): return self.fetch("/VirDomainStats") + + def get_task_status(self, task_tag: str) -> str: + try: + res = self.fetch(f"/TaskTag/{task_tag}") + return res[0]['state'] if res else "UNKNOWN" + except: + return "UNKNOWN" + + def migrate(self, vm_uuid: str, target_node: str) -> Dict: + action = [{ + "virDomainUUID": vm_uuid, + "actionType": "LIVEMIGRATE", + "nodeUUID": target_node + }] + resp = self.session.post(f"{self.base_url}/VirDomain/action", json=action, timeout=15) + if resp.status_code == 401: + ts = time.strftime("%Y-%m-%d %H:%M:%S") + print(f"[{ts}] Session expired (401) during migration. 
Re-authenticating...") + if self.login(): + resp = self.session.post(f"{self.base_url}/VirDomain/action", json=action, timeout=15) + else: + raise requests.exceptions.HTTPError("Re-authentication failed during migration") + resp.raise_for_status() + return resp.json() + +# ============================================================================= +# Migration Event Logger (writes structured events to InfluxDB) +# ============================================================================= + +class MigrationLogger: + """Writes migration events to InfluxDB so the dashboard can display them. + + Measurement: migration_events + Tags: engine, mode, dry_run, vm_name, source_ip, target_ip + Fields: vm_uuid, reason + """ + + def __init__(self, url: str, token: str, org: str, bucket: str): + self.bucket = bucket + try: + self.client = InfluxDBClient(url=url, token=token, org=org) + self.write_api = self.client.write_api(write_options=SYNCHRONOUS) + self.available = True + except Exception as e: + print(f"[MIGRATION LOGGER] Could not connect to InfluxDB: {e}") + self.available = False + + def log_event(self, vm_name: str, vm_uuid: str, source_ip: str, target_ip: str, + engine: str, mode: str, dry_run: bool, reason: str = ""): + if not self.available: + return + try: + point = Point("migration_events") \ + .tag("engine", engine) \ + .tag("mode", mode) \ + .tag("dry_run", str(dry_run).lower()) \ + .tag("vm_name", vm_name) \ + .tag("source_ip", source_ip) \ + .tag("target_ip", target_ip) \ + .field("vm_uuid", vm_uuid) \ + .field("reason", reason) + self.write_api.write(bucket=self.bucket, record=point) + except Exception as e: + print(f"[MIGRATION LOGGER] Failed to write event: {e}") + + def close(self): + if self.available: + try: + self.client.close() + except: + pass + +# ============================================================================= +# Balancer Engine +# ============================================================================= + +class LoadBalancer: + 
def __init__(self, client: HyperCoreApiClient, config: Dict, migration_logger: MigrationLogger): + self.client = client + self.config = config + self.migration_logger = migration_logger + self.max_history = int((config['AVG_WINDOW_MINUTES'] * 60) / config['SAMPLE_INTERVAL_SECONDS']) + + self.node_cpu_history = {} + self.node_ram_history = {} + self.vm_cpu_history = {} + + self.vm_cooldowns = {} + self.last_migration_time = 0 + self.active_task = None + + self.last_predictive_run = 0 + self.reserved_nodes = {} + + self._predictive_thread = None + self._predictive_results = None + self._predictive_overrun_logged = False + self._pending_predictive_recs = [] + + self._last_config_check = 0 + + def reload_config(self): + """Hot-reload tunables from SQLite. Logs any changed values.""" + new = config_db.get_all() + for key in list(self.config.keys()): + if key in new and new[key] != self.config[key]: + self.log(1, f"[CONFIG] {key}: {self.config[key]} → {new[key]}") + self.config[key] = new[key] + # Recalculate derived window size; clear histories if it changed + new_max = int((self.config['AVG_WINDOW_MINUTES'] * 60) / self.config['SAMPLE_INTERVAL_SECONDS']) + if new_max != self.max_history: + self.max_history = new_max + self.node_cpu_history.clear() + self.node_ram_history.clear() + self.vm_cpu_history.clear() + self.log(0, f"[CONFIG] History window resized to {self.max_history} samples — buffers cleared.") + + def log(self, level: int, message: str): + if level == 0 or self.config['VERBOSITY'] >= level: + ts = time.strftime("%Y-%m-%d %H:%M:%S") + print(f"[{ts}] {message}") + + def update_metrics(self, nodes, vm_stats): + for n in nodes: + uid = n['uuid'] + if uid not in self.node_cpu_history: + self.node_cpu_history[uid] = deque(maxlen=self.max_history) + self.node_ram_history[uid] = deque(maxlen=self.max_history) + self.node_cpu_history[uid].append(n.get('cpuUsage', 0.0)) + mem_pct = (n.get('totalMemUsageBytes', 0) / n.get('memSize', 1)) * 100 + 
self.node_ram_history[uid].append(mem_pct) + + for s in vm_stats: + uid = s['uuid'] + if uid not in self.vm_cpu_history: + self.vm_cpu_history[uid] = deque(maxlen=self.max_history) + self.vm_cpu_history[uid].append(s.get('cpuUsage', 0.0)) + + def evaluate_cluster(self, nodes, vms) -> Dict: + state = {} + for n in nodes: + uid = n['uuid'] + lan_ip = n.get('lanIP', uid) + is_excluded = lan_ip in self.config['EXCLUDE_NODE_IPS'] + vms_on_node = [] + + for vm in vms: + if vm.get('nodeUUID') == uid and vm.get('state') == 'RUNNING': + avg_cpu = mean(self.vm_cpu_history.get(vm['uuid'], [0])) + vms_on_node.append({ + "uuid": vm['uuid'], + "name": vm.get('name', 'Unknown'), + "mem": vm.get('mem', 0), + "vcpus": vm.get('numVCPU', 1), + "avg_cpu": avg_cpu, + "tags": vm.get('tags', "") + }) + + threads = n.get('numThreads', 1) + allocated_vcpus = sum(vm['vcpus'] for vm in vms_on_node) + is_usable = n.get('networkStatus') == 'ONLINE' and n.get('allowRunningVMs', True) + + state[uid] = { + "uuid": uid, + "name": lan_ip, + "avg_cpu": mean(self.node_cpu_history.get(uid, [0])), + "avg_ram": mean(self.node_ram_history.get(uid, [0])), + "used_ram": n.get('totalMemUsageBytes', 0), + "total_ram": n.get('memSize', 1), + "threads": threads, + "v_ratio": allocated_vcpus / threads, + "is_usable": is_usable and not is_excluded, + "is_excluded": is_excluded, + "vms": sorted(vms_on_node, key=lambda x: x['avg_cpu'], reverse=True) + } + return state + + def select_candidate(self, state: Dict) -> Tuple: + usable = [u for u in state.values() if u['is_usable']] + if not usable: + self.log(2, "No usable nodes found (all nodes may be excluded or offline).") + return None, None, None, "NONE" + + peak_cpu_node = max(usable, key=lambda x: x['avg_cpu']) + cpu_threshold = self.config['CPU_UPPER_THRESHOLD_PERCENT'] + + primary = "CPU_FOCUS" if peak_cpu_node['avg_cpu'] >= cpu_threshold else "RAM_FOCUS" + modes = [primary] + if primary == "CPU_FOCUS": + modes.append("RAM_FOCUS") + + for mode in modes: + 
self.log(2, f"Evaluating {mode} (Peak Cluster CPU: {peak_cpu_node['avg_cpu']:.1f}%)") + sources = sorted(usable, key=lambda x: x['avg_cpu'] if mode == "CPU_FOCUS" else (x['avg_ram'], x['v_ratio']), reverse=True) + + for src in sources: + if mode == "CPU_FOCUS" and src['avg_cpu'] < cpu_threshold: continue + if mode == "RAM_FOCUS" and src['avg_ram'] < self.config['RAM_UPPER_THRESHOLD_PERCENT']: continue + + for vm in src['vms']: + if f"node_{src['name'].split('.')[-1]}" in vm['tags']: + self.log(3, f" - Skipping {vm['name']} (pinned to {src['name']})") + continue + + cd_remaining = (self.config['VM_MOVE_COOLDOWN_MINUTES'] * 60) - (time.time() - self.vm_cooldowns.get(vm['uuid'], 0)) + if cd_remaining > 0: + self.log(3, f" - Skipping {vm['name']} (cooldown active)") + continue + + # Anti-affinity sets for the candidate VM + vm_anti = {t.strip()[5:] for t in vm['tags'].split(',') if t.strip().startswith('anti_')} + + targets = sorted(usable, key=lambda x: x['avg_ram'] if mode == "RAM_FOCUS" else x['avg_cpu']) + for tgt in targets: + if tgt['name'] == src['name']: continue + + if tgt['uuid'] in self.reserved_nodes: + if time.time() < self.reserved_nodes[tgt['uuid']]: + self.log(3, f" - Skipping target node {tgt['name']} (Reserved for impending predicted spike)") + continue + else: + del self.reserved_nodes[tgt['uuid']] + + # Anti-affinity: skip if candidate's anti_X tag matches a VM name on target, + # or if any VM on target has anti_ pointing back at us + tgt_names = {v['name'] for v in tgt['vms']} + tgt_anti = {t.strip()[5:] for v in tgt['vms'] for t in v['tags'].split(',') if t.strip().startswith('anti_')} + if vm_anti & tgt_names: + self.log(2, f" - Skipping target {tgt['name']}: anti-affinity ({vm_anti & tgt_names})") + continue + if vm['name'] in tgt_anti: + self.log(2, f" - Skipping target {tgt['name']}: anti-affinity (reverse block on {vm['name']})") + continue + + p_ram = ((tgt['used_ram'] + vm['mem']) / tgt['total_ram']) * 100 + p_ratio = (sum(v['vcpus'] for v 
in tgt['vms']) + vm['vcpus']) / tgt['threads'] + + if p_ram > self.config['RAM_LIMIT_PERCENT'] or p_ratio > self.config['MAX_VCPU_RATIO']: + continue + + if mode == "CPU_FOCUS" and tgt['avg_cpu'] >= src['avg_cpu']: continue + + self.log(2, f"Candidate found: {vm['name']} via {mode}") + return vm, src, tgt, mode + return None, None, None, primary + + def find_affinity_violation(self, usable): + """ + Highest-priority pass: detect and return one remediation move for any affinity violation. + Bypasses migration cooldown and per-VM cooldown — affinity is always fixed first. + + Checks (in priority order): + 1. Anti-affinity: VM with anti_X tag sharing a node with VM named X + 2. Node-pin: VM with node_X tag residing on the wrong node + + If the ideal target is at capacity, returns a 'make room' move instead — + the lightest evictable VM on the target is moved out so the real fix + can execute next cycle. + + Returns (vm, src_node, tgt_node, reason) or (None, None, None, None). + """ + + def _can_accept(tgt, vm): + p_ram = ((tgt['used_ram'] + vm['mem']) / tgt['total_ram']) * 100 + p_ratio = (sum(v['vcpus'] for v in tgt['vms']) + vm['vcpus']) / tgt['threads'] + return (p_ram <= self.config['RAM_LIMIT_PERCENT'] and + p_ratio <= self.config['MAX_VCPU_RATIO']) + + def _anti_ok(vm_name, vm_anti, tgt): + tgt_names = {v['name'] for v in tgt['vms']} + tgt_anti = {t.strip()[5:] for v in tgt['vms'] + for t in v['tags'].split(',') if t.strip().startswith('anti_')} + return not (vm_anti & tgt_names) and vm_name not in tgt_anti + + def _pinned_suffix(vm): + for t in vm['tags'].split(','): + t = t.strip() + if t.startswith('node_'): + return t[5:] + return None + + def _find_room_move(target_node): + """Evict the lightest non-pinned VM from target_node to any valid destination.""" + for blocker in sorted(target_node['vms'], key=lambda v: v['mem']): + if _pinned_suffix(blocker) == target_node['name'].split('.')[-1]: + continue # Cannot evict a VM pinned to this node + blocker_anti = 
{t.strip()[5:] for t in blocker['tags'].split(',') + if t.strip().startswith('anti_')} + for dest in sorted(usable, key=lambda n: n['avg_cpu']): + if dest['uuid'] == target_node['uuid']: + continue + if not _anti_ok(blocker['name'], blocker_anti, dest): + continue + if not _can_accept(dest, blocker): + continue + return blocker, dest + return None, None + + # ── 1. Anti-affinity violations ───────────────────────────────────── + for src in usable: + names_here = {v['name'] for v in src['vms']} + for vm in src['vms']: + vm_anti = {t.strip()[5:] for t in vm['tags'].split(',') + if t.strip().startswith('anti_')} + conflicts = vm_anti & names_here + if not conflicts: + continue + + self.log(1, f"[AFFINITY] VIOLATION: {vm['name']} shares {src['name']} " + f"with {conflicts} — remediating.") + + for tgt in sorted(usable, key=lambda n: n['avg_cpu']): + if tgt['uuid'] == src['uuid']: + continue + pin = _pinned_suffix(vm) + if pin and pin != tgt['name'].split('.')[-1]: + continue # VM is also node-pinned — respect that constraint + if not _anti_ok(vm['name'], vm_anti, tgt): + continue + if _can_accept(tgt, vm): + return (vm, src, tgt, + f"Anti-affinity: {vm['name']} must not share node with {conflicts}") + blocker, room_tgt = _find_room_move(tgt) + if blocker: + return (blocker, tgt, room_tgt, + f"Making space for {vm['name']}") + + self.log(1, f"[AFFINITY] Cannot fix anti-affinity for {vm['name']} — " + f"no valid target available. Cluster may be physically full.") + + # ── 2. 
Node-pin violations ─────────────────────────────────────────── + for src in usable: + src_suffix = src['name'].split('.')[-1] + for vm in src['vms']: + pin = _pinned_suffix(vm) + if not pin or pin == src_suffix: + continue # Not pinned, or already on correct node + + tgt = next((n for n in usable if n['name'].split('.')[-1] == pin), None) + if not tgt: + self.log(1, f"[AFFINITY] Cannot fix pin for {vm['name']}: " + f"node_{pin} is not in the usable set.") + continue + + self.log(1, f"[AFFINITY] VIOLATION: {vm['name']} is on {src['name']} " + f"but pinned to {tgt['name']} — remediating.") + + vm_anti = {t.strip()[5:] for t in vm['tags'].split(',') + if t.strip().startswith('anti_')} + if not _anti_ok(vm['name'], vm_anti, tgt): + self.log(1, f"[AFFINITY] Cannot fix pin for {vm['name']}: " + f"anti-affinity conflict on {tgt['name']}.") + continue + if _can_accept(tgt, vm): + return (vm, src, tgt, + f"Pin violation: {vm['name']} belongs on node_{pin} ({tgt['name']})") + blocker, room_tgt = _find_room_move(tgt) + if blocker: + return (blocker, tgt, room_tgt, + f"Making space for {vm['name']}") + self.log(1, f"[AFFINITY] Cannot make room on {tgt['name']} for {vm['name']}.") + + return None, None, None, None + + def _run_predictive_background(self, vms_snapshot, nodes_snapshot): + """Runs the predictive engine in a background thread, then discards the snapshot.""" + try: + from predictive_engine import get_proactive_migrations + self._predictive_results = get_proactive_migrations(vms_snapshot, nodes_snapshot) + except ImportError: + self.log(0, "[PREDICTIVE ERROR] Could not load predictive_engine. 
Are 'pandas' and 'prophet' installed?") + self._predictive_results = [] + except Exception as e: + self.log(0, f"[PREDICTIVE ERROR] Engine failed: {e}") + traceback.print_exc() + self._predictive_results = [] + finally: + del vms_snapshot, nodes_snapshot + gc.collect() + + def start(self): + mode_str = "DRY" if self.config['DRY_RUN'] else "LIVE" + self.log(0, f"Balancer Start (Mode: {mode_str})") + + if self.config['PREDICTIVE_BALANCING_ENABLED']: + self.log(0, "🔮 Predictive Balancing Module is ENABLED.") + + try: + while True: + # Hot-reload tunables every 60 seconds + now_ts = time.time() + if now_ts - self._last_config_check > 60: + self.reload_config() + self._last_config_check = now_ts + + if self.active_task: + status = self.client.get_task_status(self.active_task['tag']) + if status in ["COMPLETE", "ERROR"]: + self.log(0, f"Migration {status} for {self.active_task['vm']}") + self.active_task = None + self.last_migration_time = time.time() + time.sleep(10); continue + + try: + nodes, vms, stats = self.client.get_nodes(), self.client.get_vms(), self.client.get_vm_stats() + self.update_metrics(nodes, stats) + except Exception as e: + self.log(0, f"API Fetch Error: {e}"); time.sleep(30); continue + + # Evaluate cluster state once — shared by affinity, predictive, and reactive engines + state = self.evaluate_cluster(nodes, vms) + usable = [n for n in state.values() if n['is_usable']] + + # ================================================================= + # AFFINITY ENFORCEMENT — highest priority, bypasses all cooldowns + # ================================================================= + aff_vm, aff_src, aff_tgt, aff_reason = self.find_affinity_violation(usable) + if aff_vm: + info = f"{aff_vm['name']} | {aff_src['name']} -> {aff_tgt['name']} (Affinity: {aff_reason})" + if self.config['DRY_RUN']: + self.log(0, f"[AFFINITY DRY RUN] Would move {info}") + self.vm_cooldowns[aff_vm['uuid']] = time.time() + self.last_migration_time = time.time() + else: + 
self.log(0, f"[AFFINITY ACTION] {info}") + try: + task = self.client.migrate(aff_vm['uuid'], aff_tgt['uuid']) + self.active_task = {"tag": task['taskTag'], "vm": aff_vm['name']} + self.vm_cooldowns[aff_vm['uuid']] = time.time() + self.last_migration_time = time.time() + except Exception as e: + self.log(0, f"[AFFINITY ERROR] Migration rejected for {aff_vm['name']}: {e}") + self.vm_cooldowns[aff_vm['uuid']] = time.time() + aff_mode = "anti-affinity" if "anti-affinity" in aff_reason.lower() else "node-affinity" + self.migration_logger.log_event( + vm_name=aff_vm['name'], vm_uuid=aff_vm['uuid'], + source_ip=aff_src['name'], target_ip=aff_tgt['name'], + engine="Affinity", mode=aff_mode, + dry_run=self.config['DRY_RUN'], reason="affinity-violation" + ) + time.sleep(self.config['SAMPLE_INTERVAL_SECONDS']) + continue + + # Non-affinity migration cooldown + elapsed = time.time() - self.last_migration_time + cluster_cd = self.config['MIGRATION_COOLDOWN_MINUTES'] * 60 + if elapsed < cluster_cd: + time.sleep(self.config['SAMPLE_INTERVAL_SECONDS']); continue + + # ================================================================= + # OPTIONAL: PREDICTIVE BALANCING + # ================================================================= + if self.config['PREDICTIVE_BALANCING_ENABLED']: + applied_predictive = False + now = time.time() + lead_time_seconds = self.config['PREDICTIVE_LEAD_TIME_HOURS'] * 3600 + + # --- Store results from a completed forecast thread --- + if self._predictive_thread is not None and not self._predictive_thread.is_alive(): + recommendations = self._predictive_results or [] + self._predictive_results = None + self._predictive_thread = None + self._predictive_overrun_logged = False + + if recommendations: + current_vms = {vm['uuid']: vm for vm in vms} + current_node_uuids = {n['uuid'] for n in nodes} + valid_recs = [] + for rec in recommendations: + vm_current = current_vms.get(rec.vm_uuid) + if not vm_current: + self.log(0, f"[PREDICTIVE] Skipping stale 
recommendation: VM '{rec.vm_name}' no longer exists in cluster.") + continue + if vm_current.get('nodeUUID') != rec.source_node: + self.log(0, f"[PREDICTIVE] Skipping stale recommendation: VM '{rec.vm_name}' has already moved to a different node.") + continue + if rec.target_node not in current_node_uuids: + self.log(0, f"[PREDICTIVE] Skipping stale recommendation: target node for '{rec.vm_name}' is no longer available.") + continue + valid_recs.append(rec) + + self._pending_predictive_recs = valid_recs + if valid_recs: + earliest = min(valid_recs, key=lambda r: r.peak_time) + earliest_str = time.strftime('%Y-%m-%d %H:%M', time.localtime(earliest.peak_time)) + self.log(0, f"[PREDICTIVE] {len(valid_recs)} recommendation(s) queued. Earliest peak at {earliest_str}.") + + # Reserve all source nodes immediately — don't wait for each migration + # to execute. This prevents the reactive engine from routing VMs back + # onto a node the Oracle just decided is going to be overloaded. + reservation_secs = self.config.get('PREDICTIVE_RESERVATION_MINUTES', 5) * 60 + seen_sources = set() + for rec in valid_recs: + if rec.source_node in seen_sources: + continue + seen_sources.add(rec.source_node) + release_time = rec.peak_time + reservation_secs + existing = self.reserved_nodes.get(rec.source_node, 0) + if release_time > existing: + self.reserved_nodes[rec.source_node] = release_time + fmt = time.strftime('%Y-%m-%d %H:%M', time.localtime(release_time)) + self.log(0, f"[RESERVATION] Node {rec.source_ip} locked as target until {fmt} (Oracle: predicted overload)") + + # --- Discard recommendations whose peak has already passed --- + self._pending_predictive_recs = [r for r in self._pending_predictive_recs if r.peak_time > now] + + # --- Act on any recommendation whose peak is within the lead time window --- + ready_recs = [r for r in self._pending_predictive_recs if r.peak_time - now <= lead_time_seconds] + if ready_recs: + rec = ready_recs[0] + 
self._pending_predictive_recs.remove(rec) + info = f"{rec.vm_name} | {rec.source_ip} -> {rec.target_ip} (Predictive: {rec.reason})" + + if self.config['DRY_RUN']: + self.log(0, f"[PREDICTIVE DRY RUN] Would move {info}") + self.vm_cooldowns[rec.vm_uuid] = time.time() + self.last_migration_time = time.time() + else: + self.log(0, f"[PREDICTIVE ACTION] Migrating {info}") + try: + task = self.client.migrate(rec.vm_uuid, rec.target_node) + self.active_task = {"tag": task['taskTag'], "vm": rec.vm_name} + self.vm_cooldowns[rec.vm_uuid] = time.time() + self.last_migration_time = time.time() + except Exception as e: + self.log(0, f"[PREDICTIVE ERROR] Migration request rejected for {rec.vm_name}: {e}") + self.vm_cooldowns[rec.vm_uuid] = time.time() + applied_predictive = False + continue + + self.migration_logger.log_event( + vm_name=rec.vm_name, vm_uuid=rec.vm_uuid, + source_ip=rec.source_ip, target_ip=rec.target_ip, + engine="predictive", mode="PREDICTIVE", + dry_run=self.config['DRY_RUN'], reason=rec.reason + ) + + reservation_secs = self.config.get('PREDICTIVE_RESERVATION_MINUTES', 5) * 60 + release_time = rec.peak_time + reservation_secs + existing = self.reserved_nodes.get(rec.source_node, 0) + if release_time > existing: + self.reserved_nodes[rec.source_node] = release_time + formatted_time = time.strftime('%Y-%m-%d %H:%M', time.localtime(release_time)) + self.log(0, f"[RESERVATION] Node {rec.source_ip} locked from receiving VMs until {formatted_time}") + applied_predictive = True + + # --- Start a new forecast thread if interval has elapsed --- + jitter = random.uniform(0, self.config['PREDICTIVE_INTERVAL_SECONDS'] * 0.1) + if time.time() - self.last_predictive_run > (self.config['PREDICTIVE_INTERVAL_SECONDS'] + jitter): + if self._predictive_thread is not None and self._predictive_thread.is_alive(): + if not self._predictive_overrun_logged: + self.log(0, "[PREDICTIVE] WARNING: Previous forecast is still running — skipping this interval.") + 
self._predictive_overrun_logged = True + else: + self.log(0, "[PREDICTIVE] Starting background forecast thread.") + vms_snapshot = list(vms) + nodes_snapshot = list(nodes) + self._predictive_thread = threading.Thread( + target=self._run_predictive_background, + args=(vms_snapshot, nodes_snapshot), + daemon=True, + name="PredictiveEngine" + ) + self._predictive_thread.start() + self.last_predictive_run = time.time() + + if self.active_task or (self.config['DRY_RUN'] and applied_predictive): + time.sleep(self.config['SAMPLE_INTERVAL_SECONDS']) + continue + + # ================================================================= + # REACTIVE BALANCING + # ================================================================= + first_uid = nodes[0]['uuid'] if nodes else None + if first_uid and len(self.node_cpu_history.get(first_uid, [])) < self.max_history: + self.log(2, f"Buffer: {len(self.node_cpu_history[first_uid])}/{self.max_history}") + time.sleep(self.config['SAMPLE_INTERVAL_SECONDS']); continue + + vm, src, tgt, mode = self.select_candidate(state) + + if vm: + info = f"{vm['name']} | {src['name']} -> {tgt['name']} ({mode})" + if self.config['DRY_RUN']: + self.log(0, f"[REACTIVE DRY RUN] Would move {info}") + self.last_migration_time = time.time() + self.vm_cooldowns[vm['uuid']] = time.time() + self.node_cpu_history.clear() + else: + self.log(0, f"[REACTIVE ACTION] Migrating {info}") + try: + task = self.client.migrate(vm['uuid'], tgt['uuid']) + self.active_task = {"tag": task['taskTag'], "vm": vm['name']} + self.vm_cooldowns[vm['uuid']] = time.time() + except Exception as e: + self.log(0, f"[REACTIVE ERROR] Migration request rejected for {vm['name']}: {e}") + self.vm_cooldowns[vm['uuid']] = time.time() + time.sleep(self.config['SAMPLE_INTERVAL_SECONDS']) + continue + + # Log the event to InfluxDB for the dashboard + self.migration_logger.log_event( + vm_name=vm['name'], vm_uuid=vm['uuid'], + source_ip=src['name'], target_ip=tgt['name'], + engine="reactive", mode=mode, 
+ dry_run=self.config['DRY_RUN'], + reason=f"Reactive {mode} balancing" + ) + + time.sleep(self.config['SAMPLE_INTERVAL_SECONDS']) + except KeyboardInterrupt: self.log(0, "Shutdown requested.") + finally: + self.client.close() + self.migration_logger.close() + +def _handle_sigterm(*_): + raise KeyboardInterrupt() + +if __name__ == "__main__": + signal.signal(signal.SIGTERM, _handle_sigterm) + config_db.seed_from_env() + CONFIG = config_db.get_all() + client = HyperCoreApiClient(SC_HOST, SC_USERNAME, SC_PASSWORD, SC_VERIFY_SSL) + migration_logger = MigrationLogger(INFLUX_URL, INFLUX_TOKEN, INFLUX_ORG, INFLUX_BUCKET) + if client.login(): + LoadBalancer(client, CONFIG, migration_logger).start() + else: + sys.exit("Login failed. Check credentials.") \ No newline at end of file diff --git a/specific_task/HyperCoreBalancerv2/Balancer/config_db.py b/specific_task/HyperCoreBalancerv2/Balancer/config_db.py new file mode 100644 index 0000000..7b9072a --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/Balancer/config_db.py @@ -0,0 +1,214 @@ +""" +config_db.py — Live-tunable configuration store +================================================ +SQLite-backed key/value store for runtime-tunable settings. +The Dashboard seeds and owns the database; Balancer and Collector +poll it each loop iteration for live changes without restarting. + +Shared Docker volume: sc-config → /config (all three containers) +Database file: /config/tunables.db + +On first run, values are seeded from environment variables (falling +back to hardcoded defaults). After that, env vars are ignored — +the database wins. 
+""" + +import os +import sqlite3 +import threading +from typing import Any + +DB_PATH = os.getenv('SC_CONFIG_DB_PATH', '/config/tunables.db') + +_lock = threading.Lock() + +# Registry: key → (env_var_name, hardcoded_default, type_hint) +TUNABLES = { + # ── Balancer: Reactive ────────────────────────────────────────────────── + 'DRY_RUN': ('SC_DRY_RUN', True, 'bool'), + 'VERBOSITY': ('SC_VERBOSITY', 3, 'int'), + 'AVG_WINDOW_MINUTES': ('SC_AVG_WINDOW_MINUTES', 5, 'int'), + 'SAMPLE_INTERVAL_SECONDS': ('SC_SAMPLE_INTERVAL_SECONDS', 30, 'int'), + 'RAM_LIMIT_PERCENT': ('SC_RAM_LIMIT_PERCENT', 85.0, 'float'), + 'RAM_UPPER_THRESHOLD_PERCENT': ('SC_RAM_UPPER_THRESHOLD_PERCENT', 80.0, 'float'), + 'CPU_UPPER_THRESHOLD_PERCENT': ('SC_CPU_UPPER_THRESHOLD_PERCENT', 50.0, 'float'), + 'MAX_VCPU_RATIO': ('SC_MAX_VCPU_RATIO', 2.0, 'float'), + 'VM_MOVE_COOLDOWN_MINUTES': ('SC_VM_MOVE_COOLDOWN_MINUTES', 30, 'int'), + 'MIGRATION_COOLDOWN_MINUTES': ('SC_MIGRATION_COOLDOWN_MINUTES', 5, 'int'), + 'EXCLUDE_NODE_IPS': ('SC_EXCLUDE_NODE_IPS', '', 'list'), + # ── Balancer: Predictive ──────────────────────────────────────────────── + 'PREDICTIVE_BALANCING_ENABLED': ('SC_PREDICTIVE_BALANCING_ENABLED', False, 'bool'), + 'PREDICTIVE_INTERVAL_SECONDS': ('SC_PREDICTIVE_INTERVAL_SECONDS', 43200, 'int'), + 'PREDICTIVE_LEAD_TIME_HOURS': ('SC_PREDICTIVE_LEAD_TIME_HOURS', 1, 'int'), + 'PREDICTIVE_THRESHOLD': ('SC_PREDICTIVE_THRESHOLD', 80.0, 'float'), + 'PREDICTIVE_MIN_HISTORY_HOURS': ('SC_PREDICTIVE_MIN_HISTORY_HOURS', 336, 'int'), + 'PREDICTIVE_LOOKBACK_DAYS': ('SC_PREDICTIVE_LOOKBACK_DAYS', 90, 'int'), + 'PREDICTIVE_MAX_WORKERS': ('SC_PREDICTIVE_MAX_WORKERS', 8, 'int'), + 'PREDICTIVE_RESERVATION_MINUTES': ('SC_PREDICTIVE_RESERVATION_MINUTES', 5, 'int'), + # ── Collector ─────────────────────────────────────────────────────────── + 'POLL_INTERVAL': ('SC_POLL_INTERVAL', 30, 'int'), + 'SLOW_POLL_INTERVAL': ('SC_SLOW_POLL_INTERVAL', 300, 'int'), + 'INFLUX_WRITE_RETRIES': ('INFLUX_WRITE_RETRIES', 3, 
'int'), + 'INFLUX_RETRY_DELAY': ('INFLUX_RETRY_DELAY', 5, 'int'), + # ── Dashboard ─────────────────────────────────────────────────────────── + 'STALE_SECONDS': ('SC_DASHBOARD_STALE_SECONDS', 120, 'int'), +} + + +def _cast(value: str, type_hint: str) -> Any: + if type_hint == 'bool': + return str(value).lower() in ('true', '1', 'yes', 'y') + if type_hint == 'int': + return int(float(value)) # float() first handles "30.0" edge case + if type_hint == 'float': + return float(value) + if type_hint == 'list': + return [ip.strip() for ip in str(value).split(',') if ip.strip()] + return str(value) + + +def _serialize(value: Any, type_hint: str) -> str: + if type_hint == 'list': + return ','.join(str(v) for v in value) if isinstance(value, list) else str(value) + if type_hint == 'bool': + return 'true' if value else 'false' + return str(value) + + +def _connect() -> sqlite3.Connection: + dirpath = os.path.dirname(DB_PATH) + if dirpath: + os.makedirs(dirpath, exist_ok=True) + conn = sqlite3.connect(DB_PATH, check_same_thread=False) + conn.execute(''' + CREATE TABLE IF NOT EXISTS tunables ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + type TEXT NOT NULL, + updated_at TEXT NOT NULL DEFAULT (datetime('now')) + ) + ''') + conn.commit() + return conn + + +def seed_from_env() -> None: + """ + Called once at service startup. + - Missing keys are inserted from env var (or hardcoded default). + - Existing keys are updated from env var only if their stored value still + matches the hardcoded default (i.e. never touched by the user via the UI). + This lets env vars fill in values that were seeded empty on a previous run + before the env var was available, without clobbering user changes. 
+ """ + with _lock: + conn = _connect() + try: + for key, (env_var, default, type_hint) in TUNABLES.items(): + raw_env = os.getenv(env_var) or None # treat empty string same as unset + row = conn.execute( + 'SELECT value FROM tunables WHERE key = ?', (key,) + ).fetchone() + if not row: + value = _cast(raw_env, type_hint) if raw_env is not None else default + conn.execute( + 'INSERT INTO tunables (key, value, type) VALUES (?, ?, ?)', + (key, _serialize(value, type_hint), type_hint) + ) + elif raw_env is not None and row[0] == _serialize(default, type_hint): + # Still at hardcoded default — env var now available, apply it + conn.execute( + 'UPDATE tunables SET value = ?, updated_at = datetime(\'now\') WHERE key = ?', + (_serialize(_cast(raw_env, type_hint), type_hint), key) + ) + conn.commit() + finally: + conn.close() + + +def get(key: str) -> Any: + """Return a single tunable cast to its Python type.""" + with _lock: + conn = _connect() + try: + row = conn.execute( + 'SELECT value, type FROM tunables WHERE key = ?', (key,) + ).fetchone() + finally: + conn.close() + if row is None: + entry = TUNABLES.get(key) + return entry[1] if entry else None + return _cast(row[0], row[1]) + + +def get_all() -> dict: + """Return all tunables as {key: typed_value}.""" + with _lock: + conn = _connect() + try: + rows = conn.execute( + 'SELECT key, value, type FROM tunables' + ).fetchall() + finally: + conn.close() + return {r[0]: _cast(r[1], r[2]) for r in rows} + + +def save(key: str, value: Any) -> None: + """Upsert a single tunable.""" + if key not in TUNABLES: + raise ValueError(f'Unknown tunable: {key}') + _, _, type_hint = TUNABLES[key] + with _lock: + conn = _connect() + try: + conn.execute( + '''INSERT INTO tunables (key, value, type, updated_at) + VALUES (?, ?, ?, datetime('now')) + ON CONFLICT(key) DO UPDATE SET + value = excluded.value, + updated_at = excluded.updated_at''', + (key, _serialize(value, type_hint), type_hint) + ) + conn.commit() + finally: + conn.close() + + 
+def save_many(updates: dict) -> None: + """Upsert multiple tunables in one transaction. Unknown keys are silently skipped.""" + with _lock: + conn = _connect() + try: + for key, value in updates.items(): + if key not in TUNABLES: + continue + _, _, type_hint = TUNABLES[key] + conn.execute( + '''INSERT INTO tunables (key, value, type, updated_at) + VALUES (?, ?, ?, datetime('now')) + ON CONFLICT(key) DO UPDATE SET + value = excluded.value, + updated_at = excluded.updated_at''', + (key, _serialize(value, type_hint), type_hint) + ) + conn.commit() + finally: + conn.close() + + +def get_with_meta() -> list: + """Return all tunables with type and timestamp — for the settings API.""" + with _lock: + conn = _connect() + try: + rows = conn.execute( + 'SELECT key, value, type, updated_at FROM tunables ORDER BY key' + ).fetchall() + finally: + conn.close() + return [ + {'key': r[0], 'value': _cast(r[1], r[2]), 'type': r[2], 'updated_at': r[3]} + for r in rows + ] diff --git a/specific_task/HyperCoreBalancerv2/Balancer/predictive_engine.py b/specific_task/HyperCoreBalancerv2/Balancer/predictive_engine.py new file mode 100644 index 0000000..ac9380d --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/Balancer/predictive_engine.py @@ -0,0 +1,262 @@ +import os +import re +import gc +import warnings +import concurrent.futures +import pandas as pd +from prophet import Prophet +from influxdb_client import InfluxDBClient +import logging + +# Suppress Prophet's chatty terminal output +warnings.filterwarnings("ignore") +logging.getLogger('cmdstanpy').setLevel(logging.WARNING) + +import config_db + +# InfluxDB Configuration (configurables — stay in .env) +INFLUX_URL = os.getenv('INFLUX_URL', "http://localhost:8086") +INFLUX_TOKEN = os.getenv('INFLUX_TOKEN', "super-secret-auth-token") +INFLUX_ORG = os.getenv('INFLUX_ORG', "hypercore") +INFLUX_BUCKET = os.getenv('INFLUX_BUCKET', "metrics") + +# Tunables are read live from config_db at forecast time (see get_proactive_migrations) + + 
+class ProactiveMigration:
+    """A simple data object to hold a migration recommendation."""
+    def __init__(self, vm_uuid, vm_name, source_node, target_node, reason, peak_time, source_ip, target_ip):
+        self.vm_uuid = vm_uuid
+        self.vm_name = vm_name
+        self.source_node = source_node
+        self.target_node = target_node
+        self.reason = reason
+        self.peak_time = peak_time
+        self.source_ip = source_ip
+        self.target_ip = target_ip
+
+
+def _sanitize_uuid(value: str) -> str:
+    """Validates that a value looks like a UUID to prevent Flux query injection."""
+    if not re.match(r'^[a-fA-F0-9\-]+$', value):
+        raise ValueError(f"Invalid UUID format: {value}")
+    return value
+
+
+def get_proactive_migrations(vms, nodes, forecast_hours=24):
+    """Entry point: forecast per-VM CPU and return ProactiveMigration recommendations."""
+    # Read tunables live so each Oracle run picks up any dashboard changes
+    # NOTE(review): the `or default` pattern treats a stored falsy value
+    # (e.g. threshold set to 0) as "unset" and silently substitutes the
+    # default — confirm that is acceptable for these keys.
+    threshold = config_db.get('PREDICTIVE_THRESHOLD') or 80.0
+    min_history = config_db.get('PREDICTIVE_MIN_HISTORY_HOURS') or 336
+    lookback_days = config_db.get('PREDICTIVE_LOOKBACK_DAYS') or 90
+    max_workers = config_db.get('PREDICTIVE_MAX_WORKERS') or 8
+    print(f"[ORACLE] Waking up. Forecasting the next {forecast_hours} hours across {len(vms)} VMs "
+          f"using up to {max_workers} workers (lookback={lookback_days}d, threshold={threshold}%)...")
+    return _run_forecast(vms, nodes, forecast_hours, lookback_days, min_history, max_workers, threshold)
+
+
+def _forecast_single_vm(vm, forecast_hours, lookback_days, min_history_hours):
+    """Forecast CPU load for a single VM. Creates its own DB connection.
+
+    Returns a dict (uuid, name, current_node, raw_expected_cpu, vcpus,
+    peak_timestamp) or None when the VM is unplaced, has a bad UUID, the
+    query fails, or history is too short.
+    """
+    vm_uuid = vm.get('uuid')
+    vm_name = vm.get('name', 'Unknown')
+    current_node = vm.get('nodeUUID')
+    vcpus = vm.get('numVCPU', 1)
+
+    if not current_node:
+        return None
+
+    try:
+        safe_uuid = _sanitize_uuid(vm_uuid)
+    except ValueError as e:
+        print(f"[ORACLE] Skipping VM with bad UUID: {e}")
+        return None
+
+    client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)
+    try:
+        # Hourly means over the lookback window; safe_uuid was validated above
+        # so the f-string interpolation cannot inject Flux.
+        query = f"""
+            from(bucket: "{INFLUX_BUCKET}")
+              |> range(start: -{lookback_days}d)
+              |> filter(fn: (r) => r["_measurement"] == "vm_metrics")
+              |> filter(fn: (r) => r["vm_uuid"] == "{safe_uuid}")
+              |> filter(fn: (r) => r["_field"] == "cpu_usage")
+              |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
+              |> yield(name: "mean")
+        """
+        result = client.query_api().query(org=INFLUX_ORG, query=query)
+    except Exception as e:
+        print(f"[ORACLE] Failed to fetch data for {vm_name}: {e}")
+        return None
+    finally:
+        client.close()
+
+    records = []
+    for table in result:
+        for record in table.records:
+            # Prophet requires tz-naive timestamps in the 'ds' column.
+            dt = record.get_time().replace(tzinfo=None)
+            records.append({'ds': dt, 'y': record.get_value()})
+
+    # min_history_hours (default 336 = 2 weeks) gives Prophet enough data for weekly patterns.
+    if len(records) < min_history_hours:
+        print(f"[ORACLE] Skipping {vm_name}: Not enough historical data ({len(records)} hours). Needs at least {min_history_hours}.")
+        return None
+
+    df = pd.DataFrame(records)
+    del records
+    last_date = df['ds'].max()
+
+    # NOTE(review): integer values for daily/weekly_seasonality set the Fourier
+    # order (not on/off) — presumably intentional; confirm against Prophet docs.
+    m = Prophet(daily_seasonality=15, weekly_seasonality=20, yearly_seasonality=False, seasonality_prior_scale=15.0)
+    m.fit(df)
+    del df
+
+    future = m.make_future_dataframe(periods=forecast_hours, freq='h')
+    forecast = m.predict(future)
+    del m, future
+
+    # Use the conservative upper confidence bound (yhat_upper), capped at 100%.
+    future_forecast = forecast[forecast['ds'] > last_date]
+    peak_pred = future_forecast.loc[future_forecast['yhat_upper'].idxmax()]
+    max_expected_cpu = min(peak_pred['yhat_upper'], 100.0)
+    peak_timestamp = peak_pred['ds'].timestamp()
+    del forecast, future_forecast
+
+    return {
+        'uuid': vm_uuid,
+        'name': vm_name,
+        'current_node': current_node,
+        'raw_expected_cpu': max_expected_cpu,
+        'vcpus': vcpus,
+        'peak_timestamp': peak_timestamp
+    }
+
+
+def _run_forecast(vms, nodes, forecast_hours, lookback_days, min_history_hours, max_workers, threshold):
+    """Core forecasting logic using a thread pool for parallel VM forecasting."""
+
+    node_threads = {n['uuid']: n.get('numThreads', 1) for n in nodes}
+    node_ips = {n['uuid']: n.get('lanIP', n['uuid'][:8]) for n in nodes}
+
+    predicted_node_loads = {n['uuid']: 0.0 for n in nodes}
+    vm_forecasts = []
+    recommendations = []
+
+    # Anti-affinity lookups — built from current placement, updated as moves are planned
+    # vm_anti_by_name: vm_name → set of VM names it must not share a node with
+    # NOTE(review): keyed by display name, so duplicate VM names overwrite
+    # each other's anti sets — confirm names are unique in this cluster.
+    vm_anti_by_name = {}
+    # planned_node_vm_names: node_uuid → set of VM names planned to be there
+    planned_node_vm_names = {n['uuid']: set() for n in nodes}
+    for vm in vms:
+        name = vm.get('name', '')
+        tags_str = vm.get('tags', '')
+        # "anti_<X>" tags: strip the 5-char "anti_" prefix to get the peer name
+        anti = {t.strip()[5:] for t in tags_str.split(',') if t.strip().startswith('anti_')}
+        vm_anti_by_name[name] = anti
+        node_uuid = vm.get('nodeUUID')
+        if node_uuid and node_uuid in planned_node_vm_names:
+            planned_node_vm_names[node_uuid].add(name)
+
+    # Phase 1: Parallel VM forecasting
+    with concurrent.futures.ThreadPoolExecutor(max_workers=max_workers) as executor:
+        futures = {
+            executor.submit(_forecast_single_vm, vm, forecast_hours, lookback_days, min_history_hours): vm.get('name', 'Unknown')
+            for vm in vms
+        }
+        for future in concurrent.futures.as_completed(futures):
+            try:
+                result = future.result()
+            except Exception as e:
+                print(f"[ORACLE] Unexpected error forecasting {futures[future]}: {e}")
+                continue
+            if result:
+                vm_forecasts.append(result)
+                # Normalize a VM's predicted CPU% by its vCPU share of the
+                # host's hardware threads to approximate node-level impact.
+                current_node_threads = node_threads.get(result['current_node'], 1)
+                normalized_impact = result['raw_expected_cpu'] * (result['vcpus'] / current_node_threads)
+                if result['current_node'] in predicted_node_loads:
+                    predicted_node_loads[result['current_node']] += normalized_impact
+
+    # Phase 2: Analyze future node states and generate migration recommendations
+    print("[ORACLE] Forecast complete. Analyzing future node capacities...")
+    for node_uuid, future_load in predicted_node_loads.items():
+        source_ip = node_ips.get(node_uuid, node_uuid[:8])
+
+        if future_load <= threshold:
+            continue
+
+        print(f"[ORACLE] WARNING: Node {source_ip} predicted to hit {future_load:.1f}% CPU. "
+              f"Queuing as many moves as needed to bring it below {threshold}%.")
+
+        # Sort lightest-first: each candidate is easy to place, making it more likely
+        # we find a safe target and can iterate through multiple VMs if needed.
+        vms_on_node = sorted(
+            [v for v in vm_forecasts if v['current_node'] == node_uuid],
+            key=lambda x: x['raw_expected_cpu']
+        )
+
+        for candidate_vm in vms_on_node:
+            if predicted_node_loads[node_uuid] <= threshold:
+                break  # Node is now predicted safe — stop evicting
+
+            # Re-sort targets each iteration so we always pick the least-loaded one
+            target_nodes = sorted(
+                [n for n in nodes if n['uuid'] != node_uuid],
+                key=lambda n: predicted_node_loads.get(n['uuid'], 100.0)
+            )
+
+            placed = False
+            candidate_name = candidate_vm['name']
+            candidate_anti = vm_anti_by_name.get(candidate_name, set())
+
+            for best_target in target_nodes:
+                target_ip = node_ips.get(best_target['uuid'], best_target['uuid'][:8])
+                target_future_load = predicted_node_loads.get(best_target['uuid'], 100.0)
+                target_threads = node_threads.get(best_target['uuid'], 1)
+                target_impact = candidate_vm['raw_expected_cpu'] * (candidate_vm['vcpus'] / target_threads)
+
+                # Anti-affinity: skip target if candidate's anti_X tag matches a VM there,
+                # or if any VM on the target has anti_ pointing back at us
+                tgt_names = planned_node_vm_names.get(best_target['uuid'], set())
+                tgt_anti = {a for n in tgt_names for a in vm_anti_by_name.get(n, set())}
+                if candidate_anti & tgt_names:
+                    print(f"[ORACLE] Skipping {target_ip} for {candidate_name}: anti-affinity conflict")
+                    continue
+                if candidate_name in tgt_anti:
+                    print(f"[ORACLE] Skipping {target_ip} for {candidate_name}: anti-affinity conflict (reverse)")
+                    continue
+
+                if (target_future_load + target_impact) < threshold:
+                    reason = (f"Preventing future overload "
+                              f"({predicted_node_loads[node_uuid]:.1f}%) on source node {source_ip}.")
+                    rec = ProactiveMigration(
+                        vm_uuid=candidate_vm['uuid'],
+                        vm_name=candidate_name,
+                        source_node=node_uuid,
+                        target_node=best_target['uuid'],
+                        reason=reason,
+                        peak_time=candidate_vm['peak_timestamp'],
+                        source_ip=source_ip,
+                        target_ip=target_ip
+                    )
+                    recommendations.append(rec)
+
+                    # Re-normalize the move against each node's own thread count.
+                    source_impact = candidate_vm['raw_expected_cpu'] * (candidate_vm['vcpus'] / node_threads.get(node_uuid, 1))
+                    predicted_node_loads[node_uuid] -= source_impact
+                    predicted_node_loads[best_target['uuid']] += target_impact
+                    # Update planned placement so subsequent iterations respect this move
+                    planned_node_vm_names[node_uuid].discard(candidate_name)
+                    planned_node_vm_names[best_target['uuid']].add(candidate_name)
+                    print(f"[ORACLE] Queuing: {candidate_name} -> {target_ip} "
+                          f"(node load now {predicted_node_loads[node_uuid]:.1f}%)")
+                    placed = True
+                    break
+
+            if not placed:
+                # If we couldn't place the lightest remaining VM, heavier ones won't fit either
+                print(f"[ORACLE] No safe target for {candidate_vm['name']} — cluster headroom exhausted. "
+                      f"Node {source_ip} remains at {predicted_node_loads[node_uuid]:.1f}% predicted load. "
+                      f"Reactive engine will handle the rest.")
+                break
+
+        if predicted_node_loads[node_uuid] > threshold:
+            print(f"[ORACLE] Node {source_ip}: predicted load still {predicted_node_loads[node_uuid]:.1f}% "
+                  f"after all possible moves.")
+
+    del vm_forecasts
+    gc.collect()
+
+    return recommendations
\ No newline at end of file
diff --git a/specific_task/HyperCoreBalancerv2/Balancer/requirements.txt b/specific_task/HyperCoreBalancerv2/Balancer/requirements.txt
new file mode 100644
index 0000000..d72f5bf
--- /dev/null
+++ b/specific_task/HyperCoreBalancerv2/Balancer/requirements.txt
@@ -0,0 +1,6 @@
+requests
+python-dotenv
+pandas
+prophet
+influxdb-client
+urllib3
\ No newline at end of file
diff --git a/specific_task/HyperCoreBalancerv2/Collector/Dockerfile b/specific_task/HyperCoreBalancerv2/Collector/Dockerfile
new file mode 100644
index 0000000..9b54fc9
--- /dev/null
+++ b/specific_task/HyperCoreBalancerv2/Collector/Dockerfile
@@ -0,0 +1,7 @@
+FROM python:3.11-slim
+WORKDIR /app
+COPY requirements.txt .
+RUN pip install --no-cache-dir -r requirements.txt
+COPY collector.py .
+COPY config_db.py .
+CMD ["python", "-u", "collector.py"] \ No newline at end of file diff --git a/specific_task/HyperCoreBalancerv2/Collector/collector.py b/specific_task/HyperCoreBalancerv2/Collector/collector.py new file mode 100644 index 0000000..e469400 --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/Collector/collector.py @@ -0,0 +1,434 @@ +import os +import time +import requests +import warnings +from urllib3.exceptions import InsecureRequestWarning +from influxdb_client import InfluxDBClient, Point +from influxdb_client.client.write_api import SYNCHRONOUS +import config_db + +# Configurables — stay in .env, require restart to change +SC_HOST = os.getenv('SC_HOST') +SC_USER = os.getenv('SC_USERNAME') +SC_PASS = os.getenv('SC_PASSWORD') +SC_VERIFY_SSL = os.getenv('SC_VERIFY_SSL', 'false').lower() in ('true', '1', 'yes', 'y') +if not SC_VERIFY_SSL: + warnings.simplefilter('ignore', InsecureRequestWarning) +INFLUX_URL = os.getenv('INFLUX_URL') +INFLUX_TOKEN = os.getenv('INFLUX_TOKEN') +INFLUX_ORG = os.getenv('INFLUX_ORG') +INFLUX_BUCKET = os.getenv('INFLUX_BUCKET') + +# Tunables are seeded on startup and re-read from config_db each loop iteration + + +def fetch_data(session, endpoint): + res = session.get(f"{SC_HOST}{endpoint}", timeout=15) + res.raise_for_status() + return res.json() + + +def fetch_safe(session, endpoint, default=None): + """Fetch with graceful fallback — returns default on failure instead of crashing.""" + try: + return fetch_data(session, endpoint) + except Exception as e: + print(f"[{time.strftime('%H:%M:%S')}] Warning: Failed to fetch {endpoint}: {e}") + return default if default is not None else [] + + +def do_login(session): + print(f"[{time.strftime('%H:%M:%S')}] Authenticating with HyperCore API...") + session.cookies.clear() + res = session.post(f"{SC_HOST}/login", json={"username": SC_USER, "password": SC_PASS}, timeout=10) + res.raise_for_status() + + +def do_logout(session): + print(f"[{time.strftime('%H:%M:%S')}] Logging out from HyperCore 
API...") + try: + session.post(f"{SC_HOST}/logout", timeout=5) + except Exception as e: + print(f"Logout failed or already disconnected: {e}") + + +def write_with_retry(write_api, bucket, points, max_retries=3, retry_delay=5): + """Attempts to write points to InfluxDB with retries on transient failures.""" + for attempt in range(1, max_retries + 1): + try: + write_api.write(bucket=bucket, record=points) + return True + except Exception as e: + if attempt < max_retries: + print(f"[{time.strftime('%H:%M:%S')}] InfluxDB write failed (attempt {attempt}/{max_retries}): {e}. Retrying in {retry_delay}s...") + time.sleep(retry_delay) + else: + print(f"[{time.strftime('%H:%M:%S')}] InfluxDB write failed after {max_retries} attempts: {e}. Dropping {len(points)} points.") + return False + + +def collect_fast(session, nodes, vms, vm_stats, node_info_map): + """Collects high-frequency metrics: node performance, VM performance, drive health, VSD I/O.""" + points = [] + + # ========================================================================= + # NODES — expanded with system memory, capacity, network status + # ========================================================================= + for node in nodes: + node_uuid = node.get('uuid', 'unknown') + peer_id = str(node.get('peerID', 'unknown')) + lan_ip = node.get('lanIP', 'unknown') + node_info_map[node_uuid] = {'peer_id': peer_id, 'lan_ip': lan_ip} + + p = Point("node_metrics") \ + .tag("node_uuid", node_uuid) \ + .tag("node_peer_id", peer_id) \ + .tag("node_lan_ip", lan_ip) \ + .field("cpu_usage", float(node.get('cpuUsage', 0))) \ + .field("total_mem_usage_bytes", int(node.get('totalMemUsageBytes', 0))) \ + .field("system_mem_usage_bytes", int(node.get('systemMemUsageBytes', 0))) \ + .field("mem_usage_percentage", float(node.get('memUsagePercentage', 0))) \ + .field("mem_size", int(node.get('memSize', 0))) \ + .field("capacity_bytes", int(node.get('capacity', 0))) \ + .field("num_threads", int(node.get('numThreads', 0))) \ + 
.field("num_cores", int(node.get('numCores', 0))) \ + .field("num_sockets", int(node.get('numSockets', 0))) \ + .field("network_online", node.get('networkStatus') == 'ONLINE') \ + .field("virtualization_online", bool(node.get('virtualizationOnline', False))) \ + .field("allow_running_vms", bool(node.get('allowRunningVMs', True))) + points.append(p) + + # --- Physical Drives — expanded --- + for drive in node.get('drives', []): + p_drive = Point("drive_metrics") \ + .tag("node_uuid", node_uuid) \ + .tag("node_peer_id", peer_id) \ + .tag("drive_uuid", drive.get('uuid', 'unknown')) \ + .tag("drive_slot", str(drive.get('slot', 'unknown'))) \ + .tag("drive_type", str(drive.get('type', 'unknown'))) \ + .tag("drive_serial", drive.get('serialNumber', 'unknown')) \ + .field("capacity_bytes", int(drive.get('capacityBytes', 0))) \ + .field("used_bytes", int(drive.get('usedBytes', 0))) \ + .field("error_count", int(drive.get('errorCount', 0))) \ + .field("reallocated_sectors", int(drive.get('reallocatedSectors', 0))) \ + .field("is_healthy", bool(drive.get('isHealthy', False))) + + if drive.get('hasTemperature'): + p_drive.field("temperature", int(drive.get('temperature', 0))) + if drive.get('hasMaxTemperature'): + p_drive.field("max_temperature", int(drive.get('maxTemperature', 0))) + if drive.get('hasTemperatureThreshold'): + p_drive.field("temperature_threshold", int(drive.get('temperatureThreshold', 0))) + + points.append(p_drive) + + # ========================================================================= + # VM INFO MAP — used by stats and block/net device sections + # ========================================================================= + vm_info_map = {} + vsd_info_map = {} + + for vm in vms: + vm_uuid = vm.get('uuid') + current_node_uuid = vm.get('nodeUUID', 'unknown') + node_info = node_info_map.get(current_node_uuid, {'peer_id': 'unknown', 'lan_ip': 'unknown'}) + + raw_tags = vm.get('tags', '') + first_tag = raw_tags.split(',')[0].strip() if raw_tags else '' 
+ + net_devs = vm.get('netDevs', []) + primary_vlan = str(net_devs[0].get('vlan', 0)) if net_devs else '0' + + vm_info_map[vm_uuid] = { + "name": vm.get('name', 'Unknown'), + "node_uuid": current_node_uuid, + "node_peer_id": node_info['peer_id'], + "first_tag": first_tag, + "primary_vlan": primary_vlan, + "state": vm.get('state', 'UNKNOWN'), + "num_vcpu": vm.get('numVCPU', 0), + "mem": vm.get('mem', 0), + "os": vm.get('operatingSystem', ''), + "description": vm.get('description', ''), + "snapshot_count": len(vm.get('snapUUIDs', [])), + } + + for block_dev in vm.get('blockDevs', []): + bd_uuid = block_dev.get('uuid') + bd_name = block_dev.get('name', '') + bd_type = block_dev.get('type', 'unknown') + vsd_info_map[bd_uuid] = { + "name": bd_name if bd_name else bd_type, + "type": bd_type, + "capacity": block_dev.get('capacity', 0), + "allocation": block_dev.get('allocation', 0), + "physical": block_dev.get('physical', 0), + "slot": block_dev.get('slot', -1), + "cache_mode": block_dev.get('cacheMode', 'unknown'), + } + + # ========================================================================= + # VM METRICS — performance (from VirDomainStats) + # ========================================================================= + for stat in vm_stats: + vm_uuid = stat.get('uuid') + vm_info = vm_info_map.get(vm_uuid, { + "name": "Unknown", "node_uuid": "unknown", "node_peer_id": "unknown", + "first_tag": "", "primary_vlan": "0" + }) + + p_vm = Point("vm_metrics") \ + .tag("vm_uuid", vm_uuid) \ + .tag("vm_name", vm_info['name']) \ + .tag("node_uuid", vm_info['node_uuid']) \ + .tag("node_peer_id", vm_info['node_peer_id']) \ + .tag("vm_tag", vm_info['first_tag']) \ + .tag("vm_vlan", vm_info['primary_vlan']) \ + .field("cpu_usage", float(stat.get('cpuUsage', 0))) \ + .field("rx_bit_rate", int(stat.get('rxBitRate', 0))) \ + .field("tx_bit_rate", int(stat.get('txBitRate', 0))) + points.append(p_vm) + + # --- VSD I/O Metrics --- + for vsd in stat.get('vsdStats', []): + vsd_uuid = 
vsd.get('uuid') + rates = vsd.get('rates', []) + vsd_info = vsd_info_map.get(vsd_uuid, {"name": "unknown", "type": "unknown"}) + + if rates: + r = rates[0] + p_vsd = Point("vsd_metrics") \ + .tag("vm_uuid", vm_uuid) \ + .tag("vm_name", vm_info['name']) \ + .tag("node_uuid", vm_info['node_uuid']) \ + .tag("node_peer_id", vm_info['node_peer_id']) \ + .tag("vsd_uuid", vsd_uuid) \ + .tag("vsd_name", vsd_info['name']) \ + .tag("vsd_type", vsd_info['type']) \ + .field("read_kbps", int(r.get('readKibibytesPerSecond', 0))) \ + .field("write_kbps", int(r.get('writeKibibytesPerSecond', 0))) \ + .field("read_latency_us", int(r.get('meanReadLatencyMicroseconds', 0))) \ + .field("write_latency_us", int(r.get('meanWriteLatencyMicroseconds', 0))) \ + .field("read_iops", int(r.get('millireadsPerSecond', 0)) / 1000) \ + .field("write_iops", int(r.get('milliwritesPerSecond', 0)) / 1000) \ + .field("mean_read_size_bytes", int(r.get('meanReadSizeBytes', 0))) \ + .field("mean_write_size_bytes", int(r.get('meanWriteSizeBytes', 0))) + points.append(p_vsd) + + # ========================================================================= + # VM INVENTORY — state snapshot per cycle (for tracking power state changes, + # resource allocation over time, snapshot counts, etc.) 
+ # ========================================================================= + for vm_uuid, info in vm_info_map.items(): + vsd_info = vsd_info_map # for block device capacity lookup + p_inv = Point("vm_inventory") \ + .tag("vm_uuid", vm_uuid) \ + .tag("vm_name", info['name']) \ + .tag("node_uuid", info['node_uuid']) \ + .tag("node_peer_id", info['node_peer_id']) \ + .tag("vm_tag", info['first_tag']) \ + .tag("vm_vlan", info['primary_vlan']) \ + .tag("vm_state", info.get('state', 'UNKNOWN')) \ + .tag("vm_os", info.get('os', '')) \ + .field("num_vcpu", int(info.get('num_vcpu', 0))) \ + .field("mem_bytes", int(info.get('mem', 0))) \ + .field("snapshot_count", int(info.get('snapshot_count', 0))) + points.append(p_inv) + + # ========================================================================= + # BLOCK DEVICE INVENTORY — capacity and allocation tracking + # ========================================================================= + for bd_uuid, bd_info in vsd_info_map.items(): + p_bd = Point("block_device_metrics") \ + .tag("vsd_uuid", bd_uuid) \ + .tag("vsd_name", bd_info['name']) \ + .tag("vsd_type", bd_info['type']) \ + .tag("cache_mode", bd_info.get('cache_mode', 'unknown')) \ + .field("capacity_bytes", int(bd_info.get('capacity', 0))) \ + .field("allocation_bytes", int(bd_info.get('allocation', 0))) \ + .field("physical_bytes", int(bd_info.get('physical', 0))) \ + .field("slot", int(bd_info.get('slot', -1))) + points.append(p_bd) + + return points, vm_info_map + + +def collect_slow(session, node_info_map): + """Collects low-frequency data: snapshots, replication, cluster info, conditions.""" + points = [] + + # ========================================================================= + # CLUSTER INFO — version tracking, name + # ========================================================================= + cluster = fetch_safe(session, "/Cluster") + for c in cluster: + p = Point("cluster_metrics") \ + .tag("cluster_uuid", c.get('uuid', 'unknown')) \ + 
.tag("cluster_name", c.get('clusterName', 'unknown')) \ + .field("icos_version", c.get('icosVersion', 'unknown')) + points.append(p) + + # ========================================================================= + # SNAPSHOTS — per-VM snapshot tracking + # ========================================================================= + snapshots = fetch_safe(session, "/VirDomainSnapshot") + snap_by_vm = {} + for snap in snapshots: + dom_uuid = snap.get('domainUUID', 'unknown') + if dom_uuid not in snap_by_vm: + snap_by_vm[dom_uuid] = {'count': 0, 'total_block_diff': 0, 'latest_timestamp': 0} + snap_by_vm[dom_uuid]['count'] += 1 + snap_by_vm[dom_uuid]['total_block_diff'] += snap.get('blockCountDiff', 0) + ts = snap.get('timestamp', 0) + if ts > snap_by_vm[dom_uuid]['latest_timestamp']: + snap_by_vm[dom_uuid]['latest_timestamp'] = ts + + for vm_uuid, snap_info in snap_by_vm.items(): + p = Point("snapshot_metrics") \ + .tag("vm_uuid", vm_uuid) \ + .field("snapshot_count", snap_info['count']) \ + .field("total_block_diff", snap_info['total_block_diff']) \ + .field("latest_snapshot_timestamp", snap_info['latest_timestamp']) + points.append(p) + + # ========================================================================= + # REPLICATION — progress tracking + # ========================================================================= + replications = fetch_safe(session, "/VirDomainReplication") + for rep in replications: + progress = rep.get('progress', {}) + p = Point("replication_metrics") \ + .tag("replication_uuid", rep.get('uuid', 'unknown')) \ + .tag("source_vm_uuid", rep.get('sourceDomainUUID', 'unknown')) \ + .tag("enabled", str(rep.get('enable', False)).lower()) \ + .field("label", rep.get('label', '')) \ + .field("percent_complete", int(progress.get('percentComplete', 0))) \ + .field("block_count_diff", int(progress.get('blockCountDiff', 0))) \ + .field("block_count_sent", int(progress.get('blockCountSent', 0))) + points.append(p) + + # 
========================================================================= + # CONDITIONS — cluster health alerts + # ========================================================================= + conditions = fetch_safe(session, "/Condition") + for cond in conditions: + # Only collect user-visible, active conditions to avoid noise + if not cond.get('userVisible', False): + continue + p = Point("condition_metrics") \ + .tag("condition_name", cond.get('name', 'unknown')) \ + .tag("node_uuid", cond.get('nodeUUID', '')) \ + .tag("node_lan_ip", cond.get('nodeLANIP', '')) \ + .tag("severity", cond.get('severity', 'INFO')) \ + .field("value", bool(cond.get('value', False))) \ + .field("is_ok", bool(cond.get('isOK', True))) \ + .field("is_expired", bool(cond.get('isExpired', False))) \ + .field("description", cond.get('description', '')) + points.append(p) + + return points + + +INFLUX_PRIMARY_RETENTION = int(os.getenv('INFLUX_PRIMARY_RETENTION', 365)) + + +def set_bucket_retention(influx_client): + """Sets retention on the primary bucket once at startup via the InfluxDB API.""" + # Wait for InfluxDB to be ready + print(f"[{time.strftime('%H:%M:%S')}] Waiting for InfluxDB to be ready...") + for attempt in range(1, 61): + try: + influx_client.ping() + print(f"[{time.strftime('%H:%M:%S')}] InfluxDB is ready.") + break + except Exception: + if attempt % 10 == 0: + print(f"[{time.strftime('%H:%M:%S')}] Still waiting for InfluxDB... 
({attempt}s)") + time.sleep(1) + else: + print(f"[{time.strftime('%H:%M:%S')}] WARNING: InfluxDB not reachable after 60s, skipping retention config.") + return + + retention_seconds = INFLUX_PRIMARY_RETENTION * 86400 + try: + buckets_api = influx_client.buckets_api() + bucket = buckets_api.find_bucket_by_name(INFLUX_BUCKET) + if bucket: + bucket.retention_rules = [{"type": "expire", "everySeconds": retention_seconds}] + buckets_api.update_bucket(bucket=bucket) + print(f"[{time.strftime('%H:%M:%S')}] Bucket '{INFLUX_BUCKET}' retention set to {INFLUX_PRIMARY_RETENTION} days.") + else: + print(f"[{time.strftime('%H:%M:%S')}] WARNING: Bucket '{INFLUX_BUCKET}' not found, skipping retention config.") + except Exception as e: + print(f"[{time.strftime('%H:%M:%S')}] WARNING: Could not set bucket retention: {e}") + + +def main(): + print("Start HyperCore Collector (Extended)...") + + config_db.seed_from_env() + + influx_client = InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG) + write_api = influx_client.write_api(write_options=SYNCHRONOUS) + + # Set retention policy on startup + set_bucket_retention(influx_client) + + session = requests.Session() + session.verify = SC_VERIFY_SSL + + try: + do_login(session) + except Exception as e: + print(f"Initial login failed: {e}") + + last_slow_poll = 0 + node_info_map = {} + + try: + while True: + # Re-read tunables each iteration — picks up live dashboard changes + poll_interval = config_db.get('POLL_INTERVAL') or 30 + slow_poll_interval = config_db.get('SLOW_POLL_INTERVAL') or 300 + write_retries = config_db.get('INFLUX_WRITE_RETRIES') or 3 + retry_delay = config_db.get('INFLUX_RETRY_DELAY') or 5 + + try: + # --- Fast poll: performance metrics --- + nodes = fetch_data(session, "/Node") + vms = fetch_data(session, "/VirDomain") + vm_stats = fetch_data(session, "/VirDomainStats") + + fast_points, vm_info_map = collect_fast(session, nodes, vms, vm_stats, node_info_map) + + # --- Slow poll: inventory & health --- + 
slow_points = [] + if time.time() - last_slow_poll >= slow_poll_interval: + slow_points = collect_slow(session, node_info_map) + last_slow_poll = time.time() + + all_points = fast_points + slow_points + if write_with_retry(write_api, INFLUX_BUCKET, all_points, write_retries, retry_delay): + slow_note = f" + {len(slow_points)} slow" if slow_points else "" + print(f"[{time.strftime('%H:%M:%S')}] Wrote {len(all_points)} datapoints ({len(fast_points)} fast{slow_note}).") + + except requests.exceptions.HTTPError as e: + if e.response is not None and e.response.status_code == 401: + print(f"[{time.strftime('%H:%M:%S')}] Session expired (401). Re-authenticating...") + try: + do_login(session) + except Exception as login_err: + print(f"Re-authentication failed: {login_err}") + else: + print(f"[{time.strftime('%H:%M:%S')}] HTTP Error: {e}") + except Exception as e: + print(f"[{time.strftime('%H:%M:%S')}] Unexpected Error: {e}") + + time.sleep(poll_interval) + + finally: + do_logout(session) + + +if __name__ == "__main__": + main() \ No newline at end of file diff --git a/specific_task/HyperCoreBalancerv2/Collector/config_db.py b/specific_task/HyperCoreBalancerv2/Collector/config_db.py new file mode 100644 index 0000000..7b9072a --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/Collector/config_db.py @@ -0,0 +1,214 @@ +""" +config_db.py — Live-tunable configuration store +================================================ +SQLite-backed key/value store for runtime-tunable settings. +The Dashboard seeds and owns the database; Balancer and Collector +poll it each loop iteration for live changes without restarting. + +Shared Docker volume: sc-config → /config (all three containers) +Database file: /config/tunables.db + +On first run, values are seeded from environment variables (falling +back to hardcoded defaults). After that, env vars are ignored — +the database wins. 
+""" + +import os +import sqlite3 +import threading +from typing import Any + +DB_PATH = os.getenv('SC_CONFIG_DB_PATH', '/config/tunables.db') + +_lock = threading.Lock() + +# Registry: key → (env_var_name, hardcoded_default, type_hint) +TUNABLES = { + # ── Balancer: Reactive ────────────────────────────────────────────────── + 'DRY_RUN': ('SC_DRY_RUN', True, 'bool'), + 'VERBOSITY': ('SC_VERBOSITY', 3, 'int'), + 'AVG_WINDOW_MINUTES': ('SC_AVG_WINDOW_MINUTES', 5, 'int'), + 'SAMPLE_INTERVAL_SECONDS': ('SC_SAMPLE_INTERVAL_SECONDS', 30, 'int'), + 'RAM_LIMIT_PERCENT': ('SC_RAM_LIMIT_PERCENT', 85.0, 'float'), + 'RAM_UPPER_THRESHOLD_PERCENT': ('SC_RAM_UPPER_THRESHOLD_PERCENT', 80.0, 'float'), + 'CPU_UPPER_THRESHOLD_PERCENT': ('SC_CPU_UPPER_THRESHOLD_PERCENT', 50.0, 'float'), + 'MAX_VCPU_RATIO': ('SC_MAX_VCPU_RATIO', 2.0, 'float'), + 'VM_MOVE_COOLDOWN_MINUTES': ('SC_VM_MOVE_COOLDOWN_MINUTES', 30, 'int'), + 'MIGRATION_COOLDOWN_MINUTES': ('SC_MIGRATION_COOLDOWN_MINUTES', 5, 'int'), + 'EXCLUDE_NODE_IPS': ('SC_EXCLUDE_NODE_IPS', '', 'list'), + # ── Balancer: Predictive ──────────────────────────────────────────────── + 'PREDICTIVE_BALANCING_ENABLED': ('SC_PREDICTIVE_BALANCING_ENABLED', False, 'bool'), + 'PREDICTIVE_INTERVAL_SECONDS': ('SC_PREDICTIVE_INTERVAL_SECONDS', 43200, 'int'), + 'PREDICTIVE_LEAD_TIME_HOURS': ('SC_PREDICTIVE_LEAD_TIME_HOURS', 1, 'int'), + 'PREDICTIVE_THRESHOLD': ('SC_PREDICTIVE_THRESHOLD', 80.0, 'float'), + 'PREDICTIVE_MIN_HISTORY_HOURS': ('SC_PREDICTIVE_MIN_HISTORY_HOURS', 336, 'int'), + 'PREDICTIVE_LOOKBACK_DAYS': ('SC_PREDICTIVE_LOOKBACK_DAYS', 90, 'int'), + 'PREDICTIVE_MAX_WORKERS': ('SC_PREDICTIVE_MAX_WORKERS', 8, 'int'), + 'PREDICTIVE_RESERVATION_MINUTES': ('SC_PREDICTIVE_RESERVATION_MINUTES', 5, 'int'), + # ── Collector ─────────────────────────────────────────────────────────── + 'POLL_INTERVAL': ('SC_POLL_INTERVAL', 30, 'int'), + 'SLOW_POLL_INTERVAL': ('SC_SLOW_POLL_INTERVAL', 300, 'int'), + 'INFLUX_WRITE_RETRIES': ('INFLUX_WRITE_RETRIES', 3, 
'int'), + 'INFLUX_RETRY_DELAY': ('INFLUX_RETRY_DELAY', 5, 'int'), + # ── Dashboard ─────────────────────────────────────────────────────────── + 'STALE_SECONDS': ('SC_DASHBOARD_STALE_SECONDS', 120, 'int'), +} + + +def _cast(value: str, type_hint: str) -> Any: + if type_hint == 'bool': + return str(value).lower() in ('true', '1', 'yes', 'y') + if type_hint == 'int': + return int(float(value)) # float() first handles "30.0" edge case + if type_hint == 'float': + return float(value) + if type_hint == 'list': + return [ip.strip() for ip in str(value).split(',') if ip.strip()] + return str(value) + + +def _serialize(value: Any, type_hint: str) -> str: + if type_hint == 'list': + return ','.join(str(v) for v in value) if isinstance(value, list) else str(value) + if type_hint == 'bool': + return 'true' if value else 'false' + return str(value) + + +def _connect() -> sqlite3.Connection: + dirpath = os.path.dirname(DB_PATH) + if dirpath: + os.makedirs(dirpath, exist_ok=True) + conn = sqlite3.connect(DB_PATH, check_same_thread=False) + conn.execute(''' + CREATE TABLE IF NOT EXISTS tunables ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + type TEXT NOT NULL, + updated_at TEXT NOT NULL DEFAULT (datetime('now')) + ) + ''') + conn.commit() + return conn + + +def seed_from_env() -> None: + """ + Called once at service startup. + - Missing keys are inserted from env var (or hardcoded default). + - Existing keys are updated from env var only if their stored value still + matches the hardcoded default (i.e. never touched by the user via the UI). + This lets env vars fill in values that were seeded empty on a previous run + before the env var was available, without clobbering user changes. 
+ """ + with _lock: + conn = _connect() + try: + for key, (env_var, default, type_hint) in TUNABLES.items(): + raw_env = os.getenv(env_var) or None # treat empty string same as unset + row = conn.execute( + 'SELECT value FROM tunables WHERE key = ?', (key,) + ).fetchone() + if not row: + value = _cast(raw_env, type_hint) if raw_env is not None else default + conn.execute( + 'INSERT INTO tunables (key, value, type) VALUES (?, ?, ?)', + (key, _serialize(value, type_hint), type_hint) + ) + elif raw_env is not None and row[0] == _serialize(default, type_hint): + # Still at hardcoded default — env var now available, apply it + conn.execute( + 'UPDATE tunables SET value = ?, updated_at = datetime(\'now\') WHERE key = ?', + (_serialize(_cast(raw_env, type_hint), type_hint), key) + ) + conn.commit() + finally: + conn.close() + + +def get(key: str) -> Any: + """Return a single tunable cast to its Python type.""" + with _lock: + conn = _connect() + try: + row = conn.execute( + 'SELECT value, type FROM tunables WHERE key = ?', (key,) + ).fetchone() + finally: + conn.close() + if row is None: + entry = TUNABLES.get(key) + return entry[1] if entry else None + return _cast(row[0], row[1]) + + +def get_all() -> dict: + """Return all tunables as {key: typed_value}.""" + with _lock: + conn = _connect() + try: + rows = conn.execute( + 'SELECT key, value, type FROM tunables' + ).fetchall() + finally: + conn.close() + return {r[0]: _cast(r[1], r[2]) for r in rows} + + +def save(key: str, value: Any) -> None: + """Upsert a single tunable.""" + if key not in TUNABLES: + raise ValueError(f'Unknown tunable: {key}') + _, _, type_hint = TUNABLES[key] + with _lock: + conn = _connect() + try: + conn.execute( + '''INSERT INTO tunables (key, value, type, updated_at) + VALUES (?, ?, ?, datetime('now')) + ON CONFLICT(key) DO UPDATE SET + value = excluded.value, + updated_at = excluded.updated_at''', + (key, _serialize(value, type_hint), type_hint) + ) + conn.commit() + finally: + conn.close() + + 
def save_many(updates: dict) -> None:
    """Upsert several tunables in one transaction.

    Keys not present in the TUNABLES registry are silently ignored, so the
    settings UI can post its whole form without pre-filtering.
    """
    with _lock:
        conn = _connect()
        try:
            for key, value in updates.items():
                meta = TUNABLES.get(key)
                if meta is None:
                    continue
                type_hint = meta[2]
                conn.execute(
                    '''INSERT INTO tunables (key, value, type, updated_at)
                       VALUES (?, ?, ?, datetime('now'))
                       ON CONFLICT(key) DO UPDATE SET
                           value = excluded.value,
                           updated_at = excluded.updated_at''',
                    (key, _serialize(value, type_hint), type_hint)
                )
            conn.commit()
        finally:
            conn.close()


def get_with_meta() -> list:
    """Return every tunable plus its type and last-update time.

    Shaped for the dashboard settings API: a list of dicts sorted by key.
    """
    with _lock:
        conn = _connect()
        try:
            rows = conn.execute(
                'SELECT key, value, type, updated_at FROM tunables ORDER BY key'
            ).fetchall()
        finally:
            conn.close()
    return [
        {'key': key, 'value': _cast(raw, hint), 'type': hint, 'updated_at': ts}
        for key, raw, hint, ts in rows
    ]
**Credentials use env-var fallback defaults** — should log a startup warning if defaults are in use +2. **SSL verification disabled by default** — should warn loudly if `SC_VERIFY_SSL=False` at startup +3. **Open CORS policy on dashboard** — only exploitable if dashboard is internet-facing +4. **Line 124 bare `except` swallows task status errors silently** — should log before returning `"UNKNOWN"` +5. **No version pinning in requirements.txt** — unpinned deps can break on library updates +6. **Magic numbers scattered through code** — hardcoded multipliers with no named constants +7. **Emoji characters in production logs** — can break log parsers +8. **Inconsistent logging** — custom numeric levels instead of standard `logging` module +9. **No log rotation configured** — Docker logs can grow unbounded +10. **No SLA/success metrics tracked** — migration duration, downtime, and failure rates aren't recorded + +## RESOLVED + +- ~~**No graceful shutdown on SIGTERM**~~ — fixed: `_handle_sigterm` registered via `signal.signal` in `HyperCore_balancer.py` +- ~~**Inefficient 90-day InfluxDB query on every predictive run**~~ — fixed: parallel `ThreadPoolExecutor` per VM, background thread, configurable lookback via `SC_PREDICTIVE_LOOKBACK_DAYS`, `SC_PREDICTIVE_MAX_WORKERS` +- ~~**Active migration state not persisted**~~ — crash risk fixed: both `migrate()` calls wrapped with error handling; failed migrations set VM cooldown to prevent repeated attempts +- ~~**No health checks in docker-compose**~~ — fixed: InfluxDB health check added; all services now wait for `service_healthy` before starting; collector ping loop retained as additional safeguard +- ~~**No resource limits in docker-compose**~~ — fixed: hard limits added to all containers with explanatory comments warning against over-provisioning; balancer note links CPU limit to `SC_PREDICTIVE_MAX_WORKERS` +- ~~**Env var naming inconsistency**~~ — all application-specific vars now use `SC_` prefix across all scripts, 
docker-compose.yaml, and .env; new vars `SC_PREDICTIVE_LOOKBACK_DAYS`, `SC_PREDICTIVE_MAX_WORKERS`, `SC_DASHBOARD_STALE_SECONDS` added +- ~~**Synchronous InfluxDB writes in collector**~~ — not an issue in practice; data volume is ~72GB/year at 500 VMs; synchronous mode also enables the existing retry logic +- ~~**No authentication on the Dashboard API**~~ — fixed: HTTP Basic Auth via `require_auth` decorator on all routes; credentials set via `SC_DASHBOARD_USER` / `SC_DASHBOARD_PASSWORD`; auth self-disables if vars are unset +- ~~**No connection pooling in dashboard**~~ — not an issue in practice; dashboard is a personal tool used on-demand by a single user +- ~~**Dry-run mutates state**~~ — intentional; prevents the same VM being recommended repeatedly during dry-run testing +- ~~**No timeout on migration task polling**~~ — not an issue; cluster manages task timeouts and returns `ERROR` state +- ~~**Bare `except:` clauses on logout/close**~~ — intentional; logout must be attempted even during shutdown +- ~~**Unsafe dictionary/list access**~~ — false positive; guards are in place throughout +- ~~**Hardcoded credentials**~~ — fallback defaults only; `get_config()` always checks env vars first +- ~~**SSL verification disabled**~~ — moved to low; default is for dev use only +- ~~**Open CORS policy**~~ — moved to low; only relevant if internet-facing diff --git a/specific_task/HyperCoreBalancerv2/README.md b/specific_task/HyperCoreBalancerv2/README.md new file mode 100644 index 0000000..87d7f38 --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/README.md @@ -0,0 +1,188 @@ +# HyperCore Cluster Balancer + +**Reactive, Predictive & Affinity-Based Load Balancing for Scale Computing HyperCore** + +The HyperCore Cluster Balancer is a containerized solution that automatically distributes virtual machine workloads across Scale Computing HyperCore cluster nodes. 
It monitors resource usage in real time, enforces VM placement rules, and — optionally — uses machine learning to predict and prevent future overloads before they happen. + +--- + +## Features + +- **Reactive Balancing** — Continuously monitors node CPU and RAM usage via sliding-window averages. When a node exceeds configurable thresholds, the lightest suitable VM is live-migrated to the least-loaded node. +- **Predictive Balancing (The Oracle)** — Uses [Facebook Prophet](https://facebook.github.io/prophet/) to forecast per-VM CPU load 24 hours ahead. Proactively migrates VMs before predicted spikes arrive. Runs in a background thread with parallel workers so the reactive engine is never blocked. +- **Affinity Enforcement** — Highest-priority engine that enforces VM placement rules via HyperCore tags: + - `node_` — Pin a VM to a specific node (e.g. `node_241` pins to the node whose IP ends in `.241`) + - `anti_` — Prevent two VMs from sharing the same node (e.g. `anti_dc02` on `dc01`) +- **Live Configuration** — All tunable settings are stored in a shared SQLite database. Change any parameter through the dashboard UI at runtime — no container restarts required. +- **Built-in Dashboard** — Web-based UI (port 5000) showing node/VM performance, drive health, VSD I/O, migration event log, and a live settings editor. Fully offline-capable for dark-site deployments. +- **Metrics Collection** — Comprehensive HyperCore telemetry (node performance, VM stats, drive health, snapshots, replication, cluster conditions) stored in InfluxDB with automatic downsampling. +- **Grafana Integration** — Optional advanced dashboarding via Grafana, available on-demand through Docker profiles. 
+ +## Architecture + +``` +┌────────────┐ ┌────────────┐ ┌─────────────┐ +│ Collector │────▶│ InfluxDB │◀────│ Dashboard │ +│ (30s poll)│ │ (metrics) │ │ (port 5000)│ +└────────────┘ └─────┬──────┘ └─────────────┘ + │ + ┌─────▼───────┐ + │ Balancer │──────▶ HyperCore REST API + │ (reactive + │ (live migrations) + │ predictive │ + │ + affinity)│ + └─────────────┘ +``` + +| Container | Role | +|-------------|-------------------------------------------------------------| +| **Collector** | Polls HyperCore API, writes metrics to InfluxDB | +| **Balancer** | Decision engine — affinity, reactive, and predictive modes | +| **InfluxDB** | Time-series storage with 1-year detailed + 5-year long-term retention | +| **Dashboard** | Web UI for monitoring and live configuration | +| **Grafana** | *(optional)* Advanced data exploration | + +## Quick Start + +```bash +git clone HyperCoreBalancer +cd HyperCoreBalancer + +cp .env.template .env +# Edit .env — set SC_HOST, SC_USERNAME, SC_PASSWORD, and change all CHANGE_ME values + +docker compose up --build -d +``` + +The dashboard is available at **http://localhost:5000**. + +> **Note:** On first startup, the balancer needs ~2.5 minutes to fill its averaging buffers before it begins evaluating the cluster. The dashboard will show data after the first collector cycle (~30 seconds). + +### Reactive-Only Mode (No InfluxDB / Dashboard) + +If you only need reactive load balancing without metrics storage or the web UI: + +```bash +# Create a minimal .env with only cluster credentials and reactive settings +# Set SC_PREDICTIVE_BALANCING_ENABLED=false + +docker compose up --build -d balancer +``` + +See the [manual](HyperCore_Cluster_Balancer_Manual_v2.docx) section 1.2.1 for the full minimal `.env` template. + +## Configuration + +All settings are defined in `.env` and seeded into a shared SQLite config database on first startup. After that, settings can be changed live through the dashboard UI. 
+ +### Key Parameters + +| Parameter | Default | Description | +|-----------|---------|-------------| +| `SC_DRY_RUN` | `true` | **Start here.** Logs decisions without executing migrations. | +| `SC_CPU_UPPER_THRESHOLD_PERCENT` | `50.0` | Node CPU % that triggers reactive balancing | +| `SC_RAM_UPPER_THRESHOLD_PERCENT` | `65.0` | Node RAM % that triggers reactive balancing | +| `SC_RAM_LIMIT_PERCENT` | `85.0` | Hard ceiling — never overload a target node beyond this | +| `SC_MAX_VCPU_RATIO` | `2.0` | Max vCPU-to-thread overcommit ratio | +| `SC_PREDICTIVE_BALANCING_ENABLED` | `true` | Enable/disable the Oracle | +| `SC_PREDICTIVE_INTERVAL_SECONDS` | `43200` | How often the Oracle runs (default: 12 hours) | +| `SC_PREDICTIVE_MAX_WORKERS` | `8` | Parallel threads for VM forecasting | +| `SC_EXCLUDE_NODE_IPS` | *(empty)* | Comma-separated node IPs to exclude from balancing | + +See the [full configuration reference](HyperCore_Cluster_Balancer_Manual_v2.docx) in Part 2 of the manual. + +## VM Placement Tags + +Placement rules are defined as tags on VMs in HyperCore — no balancer configuration needed. + +| Tag Format | Effect | Example | +|------------|--------|---------| +| `node_` | Pin VM to the node whose LAN IP ends with `` | `node_241` | +| `anti_` | Never place this VM on the same node as `` | `anti_dc02` | + +## Dark-Site / Air-Gapped Deployment + +```bash +# On an internet-connected machine: +docker compose build +docker save $(docker compose config --images) -o hypercore-balancer-images.tar + +# On the air-gapped machine: +docker load -i hypercore-balancer-images.tar +docker compose up -d +``` + +The entire stack runs fully offline once images are built. + +## Resource Requirements + +| VMs | Min. vCPUs | Recommended RAM | Disk (1yr) | +|------|-----------|-----------------|------------| +| 5 | 2 | 1 GB | 2 GB | +| 50 | 2 | 1 GB | 12 GB | +| 100 | 2 | 2 GB | 23 GB | +| 500 | 4 | 4 GB | 112 GB | + +## Grafana + +Grafana is not started by default. 
To enable it: + +```bash +docker compose --profile tools up -d grafana +# Available at http://localhost:3000 +``` + +Connect it to InfluxDB using Flux query language at `http://influxdb:8086`. + +## Project Structure + +``` +├── docker-compose.yaml +├── .env.template +├── .gitignore +├── influxdb/ +│ ├── Dockerfile +│ └── init.sh # Creates long-term bucket & downsampling tasks +├── Collector/ +│ ├── Dockerfile +│ ├── requirements.txt +│ ├── config_db.py # Shared live-config module +│ └── collector.py +├── Balancer/ +│ ├── Dockerfile +│ ├── requirements.txt +│ ├── config_db.py +│ ├── HyperCore_balancer.py # Main decision engine +│ └── predictive_engine.py # The Oracle (Prophet forecasting) +└── dashboard/ + ├── Dockerfile + ├── requirements.txt + ├── config_db.py + ├── app.py # Flask API + └── static/ + └── index.html # Single-page dashboard +``` + +## Documentation + +The complete installation, configuration, and operations manual is available as a Word document: + +[HyperCore Cluster Balancer Manual v2.0](HyperCore_Cluster_Balancer_Manual_v2.docx) + +--- + +## Disclaimer + +**USE AT YOUR OWN RISK.** While the HyperCore Cluster Balancer has been carefully developed and tested, it is provided "as is" without warranty of any kind, express or implied. Scale Computing accepts no responsibility or liability for any damage, data loss, downtime, or other consequences resulting from the use of this software. Always run in dry-run mode (`SC_DRY_RUN=true`) for an extended observation period before enabling live migrations. + +## License + +MIT License + +Copyright (c) 2026 Scale Computing, Inc. 
+ +Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the "Software"), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE. diff --git a/specific_task/HyperCoreBalancerv2/dashboard/Dockerfile b/specific_task/HyperCoreBalancerv2/dashboard/Dockerfile new file mode 100644 index 0000000..4c9ba78 --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/dashboard/Dockerfile @@ -0,0 +1,26 @@ +FROM python:3.11-slim +WORKDIR /app + +# Install curl for downloading static assets +RUN apt-get update && apt-get install -y curl && rm -rf /var/lib/apt/lists/* + +COPY requirements.txt . +RUN pip install --no-cache-dir -r requirements.txt +COPY app.py . +COPY config_db.py . 
+COPY static/ static/ + +# Download external dependencies for offline/dark-site support +RUN mkdir -p static/vendor && \ + curl -sL "https://cdnjs.cloudflare.com/ajax/libs/Chart.js/4.4.1/chart.umd.min.js" -o static/vendor/chart.umd.min.js && \ + curl -sL "https://cdn.jsdelivr.net/npm/chartjs-adapter-date-fns@3.0.0/dist/chartjs-adapter-date-fns.bundle.min.js" -o static/vendor/chartjs-adapter-date-fns.bundle.min.js && \ + curl -sL "https://fonts.googleapis.com/css2?family=JetBrains+Mono:wght@300;400;500;600;700&family=Space+Grotesk:wght@300;400;500;600;700&display=swap" -o static/vendor/fonts.css && \ + # Download the actual font files referenced in the CSS + grep -oP 'url\(\K[^)]+' static/vendor/fonts.css | while read url; do \ + FILENAME=$(echo "$url" | sed 's/.*\///;s/?.*//' ); \ + curl -sL "$url" -o "static/vendor/${FILENAME}"; \ + sed -i "s|${url}|/static/vendor/${FILENAME}|g" static/vendor/fonts.css; \ + done + +EXPOSE 5000 +CMD ["gunicorn", "-w", "2", "-b", "0.0.0.0:5000", "app:app"] \ No newline at end of file diff --git a/specific_task/HyperCoreBalancerv2/dashboard/app.py b/specific_task/HyperCoreBalancerv2/dashboard/app.py new file mode 100644 index 0000000..81206cc --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/dashboard/app.py @@ -0,0 +1,444 @@ +""" +HyperCore Dashboard API +======================== +A lightweight Flask API that queries InfluxDB and serves data to the frontend. + +KEY DESIGN: Module Discovery +----------------------------- +Any new container in the stack just needs to write data to InfluxDB with a +distinct measurement name. The /api/modules endpoint auto-discovers all +measurements in the bucket, so the frontend can dynamically list and +display data from new modules without code changes. 
+ +Endpoints: + GET /api/nodes - Current node CPU/RAM metrics (last 1h) + GET /api/drives - Physical drive health & usage, grouped by drive (last 1h) + GET /api/vms - Current VM CPU metrics (last 1h) + GET /api/vsds - Virtual storage device I/O, grouped by VSD (last 1h) + GET /api/migrations - Migration event log (last 7d, configurable) + GET /api/modules - Auto-discovered measurement names in the bucket + GET /api/query/ - Generic query for any measurement + GET / - Serves the dashboard HTML +""" + +import os +import json +import re +from functools import wraps +from datetime import datetime, timezone, timedelta +from flask import Flask, jsonify, request, send_from_directory, Response +from flask_cors import CORS +from influxdb_client import InfluxDBClient +import config_db + +app = Flask(__name__, static_folder='static') +CORS(app) + +# Seed tunables DB from env vars on first run (no-op if already seeded) +config_db.seed_from_env() + +# InfluxDB Configuration (configurables — stay in .env, require restart to change) +INFLUX_URL = os.getenv('INFLUX_URL', 'http://influxdb:8086') +INFLUX_TOKEN = os.getenv('INFLUX_TOKEN', 'super-secret-auth-token') +INFLUX_ORG = os.getenv('INFLUX_ORG', 'hypercore') +INFLUX_BUCKET = os.getenv('INFLUX_BUCKET', 'metrics') + +# Fallback stale threshold — live value is read from config_db per request +STALE_SECONDS = int(os.getenv('SC_DASHBOARD_STALE_SECONDS', 120)) + +# Basic auth credentials — if either is unset, auth is disabled (dev mode) +DASHBOARD_USER = os.getenv('SC_DASHBOARD_USER', '') +DASHBOARD_PASSWORD = os.getenv('SC_DASHBOARD_PASSWORD', '') + + +def require_auth(f): + @wraps(f) + def decorated(*args, **kwargs): + if not DASHBOARD_USER or not DASHBOARD_PASSWORD: + return f(*args, **kwargs) + auth = request.authorization + if not auth or auth.username != DASHBOARD_USER or auth.password != DASHBOARD_PASSWORD: + return Response( + 'Authentication required.', + 401, + {'WWW-Authenticate': 'Basic realm="HyperCore Dashboard"'} + ) 
def get_influx_client():
    """Build a fresh InfluxDB client; callers are responsible for closing it."""
    return InfluxDBClient(url=INFLUX_URL, token=INFLUX_TOKEN, org=INFLUX_ORG)


def is_stale(history_list):
    """Return True when the newest sample is older than the stale window.

    The threshold is read live from the shared config DB on every call so
    dashboard changes take effect immediately; the env-seeded constant is
    only a fallback for an unseeded database.
    """
    if not history_list:
        return True
    threshold = config_db.get('STALE_SECONDS') or STALE_SECONDS
    newest = max(entry['time'] for entry in history_list)
    # Timestamps are ISO-8601; normalize a trailing 'Z' for fromisoformat.
    newest_dt = datetime.fromisoformat(newest.replace('Z', '+00:00'))
    age = (datetime.now(timezone.utc) - newest_dt).total_seconds()
    return age > threshold
@app.route('/api/vms')
@require_auth
def get_vms():
    """Return active VMs with 1-minute-averaged CPU/network history.

    Query params:
        hours: lookback window in whole hours (default 1, minimum 1).
            Malformed values fall back to 1.

    VMs whose newest sample is older than the stale window are dropped,
    which filters out deleted or stopped VMs.
    """
    # SECURITY FIX: the raw query-string value used to be interpolated
    # directly into the Flux query, allowing Flux injection. Coerce to a
    # bounded integer before it ever touches the query text.
    try:
        range_hours = max(1, int(float(request.args.get('hours', '1'))))
    except (TypeError, ValueError):
        range_hours = 1

    client = get_influx_client()
    try:
        query = f'''
        from(bucket: "{INFLUX_BUCKET}")
          |> range(start: -{range_hours}h)
          |> filter(fn: (r) => r["_measurement"] == "vm_metrics")
          |> aggregateWindow(every: 1m, fn: mean, createEmpty: false)
          |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value")
        '''
        result = client.query_api().query(org=INFLUX_ORG, query=query)

        vms = {}
        for table in result:
            for record in table.records:
                vm_uuid = record.values.get('vm_uuid', 'unknown')
                if vm_uuid not in vms:
                    vms[vm_uuid] = {
                        'uuid': vm_uuid,
                        'name': record.values.get('vm_name', 'Unknown'),
                        'node_uuid': record.values.get('node_uuid', 'unknown'),
                        'node_peer_id': record.values.get('node_peer_id', 'unknown'),
                        'tag': record.values.get('vm_tag', ''),
                        'vlan': record.values.get('vm_vlan', '0'),
                        'history': []
                    }
                vms[vm_uuid]['history'].append({
                    'time': record.get_time().isoformat(),
                    'cpu_usage': record.values.get('cpu_usage'),
                    'rx_bit_rate': record.values.get('rx_bit_rate'),
                    'tx_bit_rate': record.values.get('tx_bit_rate')
                })

        # Filter out VMs that haven't reported recently (deleted/stopped)
        active = [v for v in vms.values() if not is_stale(v['history'])]
        return jsonify(active)
    except Exception as e:
        return jsonify({'error': str(e)}), 500
    finally:
        client.close()
under nodes) +# ============================================================================= +@app.route('/api/drives') +@require_auth +def get_drives(): + range_hours = request.args.get('hours', '1') + client = get_influx_client() + try: + query = f''' + from(bucket: "{INFLUX_BUCKET}") + |> range(start: -{range_hours}h) + |> filter(fn: (r) => r["_measurement"] == "drive_metrics") + |> aggregateWindow(every: 1m, fn: last, createEmpty: false) + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + ''' + result = client.query_api().query(org=INFLUX_ORG, query=query) + + drives = {} + for table in result: + for record in table.records: + drive_uuid = record.values.get('drive_uuid', 'unknown') + + if drive_uuid not in drives: + drives[drive_uuid] = { + 'uuid': drive_uuid, + 'node_uuid': record.values.get('node_uuid', 'unknown'), + 'node_peer_id': record.values.get('node_peer_id', 'unknown'), + 'slot': record.values.get('drive_slot', '?'), + 'history': [] + } + + entry = { + 'time': record.get_time().isoformat(), + 'used_bytes': record.values.get('used_bytes'), + 'error_count': record.values.get('error_count'), + 'is_healthy': record.values.get('is_healthy'), + } + temp = record.values.get('temperature') + if temp is not None: + entry['temperature'] = temp + drives[drive_uuid]['history'].append(entry) + + # Filter out drives that haven't reported recently + active = [d for d in drives.values() if not is_stale(d['history'])] + return jsonify(active) + except Exception as e: + return jsonify({'error': str(e)}), 500 + finally: + client.close() + + +# ============================================================================= +# API: VSD Metrics (virtual storage devices, nested under VMs) +# ============================================================================= +@app.route('/api/vsds') +@require_auth +def get_vsds(): + range_hours = request.args.get('hours', '1') + client = get_influx_client() + try: + query = f''' + from(bucket: 
"{INFLUX_BUCKET}") + |> range(start: -{range_hours}h) + |> filter(fn: (r) => r["_measurement"] == "vsd_metrics") + |> aggregateWindow(every: 1m, fn: mean, createEmpty: false) + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + ''' + result = client.query_api().query(org=INFLUX_ORG, query=query) + + vsds = {} + for table in result: + for record in table.records: + vsd_uuid = record.values.get('vsd_uuid', 'unknown') + + if vsd_uuid not in vsds: + vsds[vsd_uuid] = { + 'uuid': vsd_uuid, + 'name': record.values.get('vsd_name', 'unknown'), + 'type': record.values.get('vsd_type', 'unknown'), + 'vm_uuid': record.values.get('vm_uuid', 'unknown'), + 'vm_name': record.values.get('vm_name', 'Unknown'), + 'node_uuid': record.values.get('node_uuid', 'unknown'), + 'history': [] + } + + vsds[vsd_uuid]['history'].append({ + 'time': record.get_time().isoformat(), + 'read_kbps': record.values.get('read_kbps'), + 'write_kbps': record.values.get('write_kbps'), + 'read_latency_us': record.values.get('read_latency_us'), + 'write_latency_us': record.values.get('write_latency_us') + }) + + # Filter out VSDs that haven't reported recently + active = [v for v in vsds.values() if not is_stale(v['history'])] + return jsonify(active) + except Exception as e: + return jsonify({'error': str(e)}), 500 + finally: + client.close() + + +# ============================================================================= +# API: Migration Events (last 7 days by default) +# ============================================================================= +@app.route('/api/migrations') +@require_auth +def get_migrations(): + range_days = request.args.get('days', '7') + client = get_influx_client() + try: + query = f''' + from(bucket: "{INFLUX_BUCKET}") + |> range(start: -{range_days}d) + |> filter(fn: (r) => r["_measurement"] == "migration_events") + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + ''' + result = client.query_api().query(org=INFLUX_ORG, 
query=query) + + events = [] + for table in result: + for record in table.records: + events.append({ + 'time': record.get_time().isoformat(), + 'vm_name': record.values.get('vm_name', 'Unknown'), + 'vm_uuid': record.values.get('vm_uuid', ''), + 'source_ip': record.values.get('source_ip', ''), + 'target_ip': record.values.get('target_ip', ''), + 'engine': record.values.get('engine', ''), + 'mode': record.values.get('mode', ''), + 'dry_run': record.values.get('dry_run', 'true'), + 'reason': record.values.get('reason', '') + }) + + events.sort(key=lambda e: e['time'], reverse=True) + return jsonify(events[:50]) + except Exception as e: + return jsonify({'error': str(e)}), 500 + finally: + client.close() + + +# ============================================================================= +# API: Module Discovery +# ============================================================================= +# This is the key endpoint for extensibility. It queries InfluxDB for all +# measurement names in the bucket. When you add a new container that writes +# to a new measurement (e.g. "backup_events", "snapshot_metrics"), it shows +# up here automatically — no frontend code changes needed. +# ============================================================================= +@app.route('/api/modules') +@require_auth +def get_modules(): + client = get_influx_client() + try: + query = f''' + import "influxdata/influxdb/schema" + schema.measurements(bucket: "{INFLUX_BUCKET}") + ''' + result = client.query_api().query(org=INFLUX_ORG, query=query) + + measurements = [] + for table in result: + for record in table.records: + measurements.append(record.get_value()) + + return jsonify({ + 'measurements': sorted(measurements), + 'description': 'All data sources writing to InfluxDB. Add a new container that writes to a new measurement and it appears here automatically.' 
+ }) + except Exception as e: + return jsonify({'error': str(e)}), 500 + finally: + client.close() + + +# ============================================================================= +# API: Generic Measurement Query +# ============================================================================= +# This lets the frontend query ANY measurement dynamically. When a new module +# appears in /api/modules, the frontend can immediately query its data without +# any backend changes. +# ============================================================================= +@app.route('/api/query/') +@require_auth +def query_measurement(measurement): + try: + range_hours = max(1, min(int(request.args.get('hours', '1')), 8760)) + limit = max(1, min(int(request.args.get('limit', '100')), 10000)) + except (ValueError, TypeError): + return jsonify({'error': 'Invalid hours or limit parameter'}), 400 + client = get_influx_client() + try: + # Sanitize measurement name (alphanumeric + underscores only) + if not re.match(r'^[a-zA-Z0-9_]+$', measurement): + return jsonify({'error': 'Invalid measurement name'}), 400 + + query = f''' + from(bucket: "{INFLUX_BUCKET}") + |> range(start: -{range_hours}h) + |> filter(fn: (r) => r["_measurement"] == "{measurement}") + |> pivot(rowKey: ["_time"], columnKey: ["_field"], valueColumn: "_value") + |> sort(columns: ["_time"], desc: true) + |> limit(n: {limit}) + ''' + result = client.query_api().query(org=INFLUX_ORG, query=query) + + records = [] + for table in result: + for record in table.records: + row = {'time': record.get_time().isoformat()} + for key, value in record.values.items(): + if key not in ('result', 'table', '_start', '_stop', '_time', '_measurement'): + row[key] = value + records.append(row) + + return jsonify(records) + except Exception as e: + return jsonify({'error': str(e)}), 500 + finally: + client.close() + + +# ============================================================================= +# API: Tunables Config (live read/write) 
+# ============================================================================= +@app.route('/api/config', methods=['GET']) +@require_auth +def get_config(): + """Return all tunables with metadata for the settings UI.""" + try: + return jsonify(config_db.get_with_meta()) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +@app.route('/api/config', methods=['POST']) +@require_auth +def post_config(): + """Update one or more tunables. Accepts {key: value, ...} JSON body.""" + try: + updates = request.get_json(force=True) + if not isinstance(updates, dict): + return jsonify({'error': 'Expected a JSON object'}), 400 + config_db.save_many(updates) + return jsonify({'ok': True}) + except Exception as e: + return jsonify({'error': str(e)}), 500 + + +# ============================================================================= +# Serve the frontend +# ============================================================================= +@app.route('/') +@require_auth +def index(): + return send_from_directory('static', 'index.html') + + +if __name__ == '__main__': + app.run(host='0.0.0.0', port=5000, debug=False) \ No newline at end of file diff --git a/specific_task/HyperCoreBalancerv2/dashboard/config_db.py b/specific_task/HyperCoreBalancerv2/dashboard/config_db.py new file mode 100644 index 0000000..7b9072a --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/dashboard/config_db.py @@ -0,0 +1,214 @@ +""" +config_db.py — Live-tunable configuration store +================================================ +SQLite-backed key/value store for runtime-tunable settings. +The Dashboard seeds and owns the database; Balancer and Collector +poll it each loop iteration for live changes without restarting. + +Shared Docker volume: sc-config → /config (all three containers) +Database file: /config/tunables.db + +On first run, values are seeded from environment variables (falling +back to hardcoded defaults). After that, env vars are ignored — +the database wins. 
+""" + +import os +import sqlite3 +import threading +from typing import Any + +DB_PATH = os.getenv('SC_CONFIG_DB_PATH', '/config/tunables.db') + +_lock = threading.Lock() + +# Registry: key → (env_var_name, hardcoded_default, type_hint) +TUNABLES = { + # ── Balancer: Reactive ────────────────────────────────────────────────── + 'DRY_RUN': ('SC_DRY_RUN', True, 'bool'), + 'VERBOSITY': ('SC_VERBOSITY', 3, 'int'), + 'AVG_WINDOW_MINUTES': ('SC_AVG_WINDOW_MINUTES', 5, 'int'), + 'SAMPLE_INTERVAL_SECONDS': ('SC_SAMPLE_INTERVAL_SECONDS', 30, 'int'), + 'RAM_LIMIT_PERCENT': ('SC_RAM_LIMIT_PERCENT', 85.0, 'float'), + 'RAM_UPPER_THRESHOLD_PERCENT': ('SC_RAM_UPPER_THRESHOLD_PERCENT', 80.0, 'float'), + 'CPU_UPPER_THRESHOLD_PERCENT': ('SC_CPU_UPPER_THRESHOLD_PERCENT', 50.0, 'float'), + 'MAX_VCPU_RATIO': ('SC_MAX_VCPU_RATIO', 2.0, 'float'), + 'VM_MOVE_COOLDOWN_MINUTES': ('SC_VM_MOVE_COOLDOWN_MINUTES', 30, 'int'), + 'MIGRATION_COOLDOWN_MINUTES': ('SC_MIGRATION_COOLDOWN_MINUTES', 5, 'int'), + 'EXCLUDE_NODE_IPS': ('SC_EXCLUDE_NODE_IPS', '', 'list'), + # ── Balancer: Predictive ──────────────────────────────────────────────── + 'PREDICTIVE_BALANCING_ENABLED': ('SC_PREDICTIVE_BALANCING_ENABLED', False, 'bool'), + 'PREDICTIVE_INTERVAL_SECONDS': ('SC_PREDICTIVE_INTERVAL_SECONDS', 43200, 'int'), + 'PREDICTIVE_LEAD_TIME_HOURS': ('SC_PREDICTIVE_LEAD_TIME_HOURS', 1, 'int'), + 'PREDICTIVE_THRESHOLD': ('SC_PREDICTIVE_THRESHOLD', 80.0, 'float'), + 'PREDICTIVE_MIN_HISTORY_HOURS': ('SC_PREDICTIVE_MIN_HISTORY_HOURS', 336, 'int'), + 'PREDICTIVE_LOOKBACK_DAYS': ('SC_PREDICTIVE_LOOKBACK_DAYS', 90, 'int'), + 'PREDICTIVE_MAX_WORKERS': ('SC_PREDICTIVE_MAX_WORKERS', 8, 'int'), + 'PREDICTIVE_RESERVATION_MINUTES': ('SC_PREDICTIVE_RESERVATION_MINUTES', 5, 'int'), + # ── Collector ─────────────────────────────────────────────────────────── + 'POLL_INTERVAL': ('SC_POLL_INTERVAL', 30, 'int'), + 'SLOW_POLL_INTERVAL': ('SC_SLOW_POLL_INTERVAL', 300, 'int'), + 'INFLUX_WRITE_RETRIES': ('INFLUX_WRITE_RETRIES', 3, 
'int'), + 'INFLUX_RETRY_DELAY': ('INFLUX_RETRY_DELAY', 5, 'int'), + # ── Dashboard ─────────────────────────────────────────────────────────── + 'STALE_SECONDS': ('SC_DASHBOARD_STALE_SECONDS', 120, 'int'), +} + + +def _cast(value: str, type_hint: str) -> Any: + if type_hint == 'bool': + return str(value).lower() in ('true', '1', 'yes', 'y') + if type_hint == 'int': + return int(float(value)) # float() first handles "30.0" edge case + if type_hint == 'float': + return float(value) + if type_hint == 'list': + return [ip.strip() for ip in str(value).split(',') if ip.strip()] + return str(value) + + +def _serialize(value: Any, type_hint: str) -> str: + if type_hint == 'list': + return ','.join(str(v) for v in value) if isinstance(value, list) else str(value) + if type_hint == 'bool': + return 'true' if value else 'false' + return str(value) + + +def _connect() -> sqlite3.Connection: + dirpath = os.path.dirname(DB_PATH) + if dirpath: + os.makedirs(dirpath, exist_ok=True) + conn = sqlite3.connect(DB_PATH, check_same_thread=False) + conn.execute(''' + CREATE TABLE IF NOT EXISTS tunables ( + key TEXT PRIMARY KEY, + value TEXT NOT NULL, + type TEXT NOT NULL, + updated_at TEXT NOT NULL DEFAULT (datetime('now')) + ) + ''') + conn.commit() + return conn + + +def seed_from_env() -> None: + """ + Called once at service startup. + - Missing keys are inserted from env var (or hardcoded default). + - Existing keys are updated from env var only if their stored value still + matches the hardcoded default (i.e. never touched by the user via the UI). + This lets env vars fill in values that were seeded empty on a previous run + before the env var was available, without clobbering user changes. 
+ """ + with _lock: + conn = _connect() + try: + for key, (env_var, default, type_hint) in TUNABLES.items(): + raw_env = os.getenv(env_var) or None # treat empty string same as unset + row = conn.execute( + 'SELECT value FROM tunables WHERE key = ?', (key,) + ).fetchone() + if not row: + value = _cast(raw_env, type_hint) if raw_env is not None else default + conn.execute( + 'INSERT INTO tunables (key, value, type) VALUES (?, ?, ?)', + (key, _serialize(value, type_hint), type_hint) + ) + elif raw_env is not None and row[0] == _serialize(default, type_hint): + # Still at hardcoded default — env var now available, apply it + conn.execute( + 'UPDATE tunables SET value = ?, updated_at = datetime(\'now\') WHERE key = ?', + (_serialize(_cast(raw_env, type_hint), type_hint), key) + ) + conn.commit() + finally: + conn.close() + + +def get(key: str) -> Any: + """Return a single tunable cast to its Python type.""" + with _lock: + conn = _connect() + try: + row = conn.execute( + 'SELECT value, type FROM tunables WHERE key = ?', (key,) + ).fetchone() + finally: + conn.close() + if row is None: + entry = TUNABLES.get(key) + return entry[1] if entry else None + return _cast(row[0], row[1]) + + +def get_all() -> dict: + """Return all tunables as {key: typed_value}.""" + with _lock: + conn = _connect() + try: + rows = conn.execute( + 'SELECT key, value, type FROM tunables' + ).fetchall() + finally: + conn.close() + return {r[0]: _cast(r[1], r[2]) for r in rows} + + +def save(key: str, value: Any) -> None: + """Upsert a single tunable.""" + if key not in TUNABLES: + raise ValueError(f'Unknown tunable: {key}') + _, _, type_hint = TUNABLES[key] + with _lock: + conn = _connect() + try: + conn.execute( + '''INSERT INTO tunables (key, value, type, updated_at) + VALUES (?, ?, ?, datetime('now')) + ON CONFLICT(key) DO UPDATE SET + value = excluded.value, + updated_at = excluded.updated_at''', + (key, _serialize(value, type_hint), type_hint) + ) + conn.commit() + finally: + conn.close() + + 
+def save_many(updates: dict) -> None: + """Upsert multiple tunables in one transaction. Unknown keys are silently skipped.""" + with _lock: + conn = _connect() + try: + for key, value in updates.items(): + if key not in TUNABLES: + continue + _, _, type_hint = TUNABLES[key] + conn.execute( + '''INSERT INTO tunables (key, value, type, updated_at) + VALUES (?, ?, ?, datetime('now')) + ON CONFLICT(key) DO UPDATE SET + value = excluded.value, + updated_at = excluded.updated_at''', + (key, _serialize(value, type_hint), type_hint) + ) + conn.commit() + finally: + conn.close() + + +def get_with_meta() -> list: + """Return all tunables with type and timestamp — for the settings API.""" + with _lock: + conn = _connect() + try: + rows = conn.execute( + 'SELECT key, value, type, updated_at FROM tunables ORDER BY key' + ).fetchall() + finally: + conn.close() + return [ + {'key': r[0], 'value': _cast(r[1], r[2]), 'type': r[2], 'updated_at': r[3]} + for r in rows + ] diff --git a/specific_task/HyperCoreBalancerv2/dashboard/requirements.txt b/specific_task/HyperCoreBalancerv2/dashboard/requirements.txt new file mode 100644 index 0000000..9595d89 --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/dashboard/requirements.txt @@ -0,0 +1,4 @@ +flask +flask-cors +influxdb-client +gunicorn \ No newline at end of file diff --git a/specific_task/HyperCoreBalancerv2/dashboard/static/index.html b/specific_task/HyperCoreBalancerv2/dashboard/static/index.html new file mode 100644 index 0000000..93830ce --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/dashboard/static/index.html @@ -0,0 +1,1413 @@ + + + + + +HyperCore Cluster Dashboard + + + + + + +
+
+ +

HyperCore Dashboard

+
+
+
+
+ Connecting... +
+ +
+
+ + + +
+ +
+
+
Node CPU Usage Over Time
+
+
+
+
Loading node metrics...
+
+
+ + +
+
+
+
VM CPU Usage (top 10)
+
+
+
+
VSD Latency (top 10 by write latency)
+
+
+
+
+ Group by: + + + + +
+
+
Loading VM metrics...
+
+
+ + +
+
+
Migration Event Log
+
+
Loading migration events...
+
+
+
+ + +
+
+
Loading settings...
+
+
+
+ + +
+
+
+ +

+
+
+ +

+
+
+ +

+
+
+ + +
+
+
Tools
+
+
📊
+
+
Grafana
+
Advanced data exploration and custom dashboards
+
Checking...
+
+ Open Grafana +
+
+ Grafana is not started by default. To launch it, run: + docker compose --profile tools up -d grafana +
+
+
+
Discovered Data Modules
+

+ Every container that writes to InfluxDB appears here automatically. +

+
+
Discovering modules...
+
+
+
+
+ + + + \ No newline at end of file diff --git a/specific_task/HyperCoreBalancerv2/docker-compose.yaml b/specific_task/HyperCoreBalancerv2/docker-compose.yaml new file mode 100644 index 0000000..f534ddf --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/docker-compose.yaml @@ -0,0 +1,192 @@ +version: '3.8' + +# ============================================================================= +# RESOURCE LIMITS — READ BEFORE CHANGING +# ============================================================================= +# Each service has a 'deploy.resources.limits' block defining its maximum CPU +# and memory allocation. These are hard limits enforced by the kernel via +# cgroups — a container cannot exceed them regardless of host availability. +# +# IMPORTANT: The sum of all CPU limits must not exceed the number of physical +# CPU cores available to Docker on the host. Exceeding this causes containers +# to compete for the same cores, increasing latency across the entire stack. +# +# Tuned for a 2-CPU host. Total limits: 2.75 CPU / 2.1 GB RAM +# influxdb: 0.75 CPU 768MB +# balancer: 1.50 CPU 1024MB <- dominant consumer during predictive runs +# collector: 0.25 CPU 128MB +# dashboard: 0.25 CPU 256MB +# grafana: 0.25 CPU 128MB (optional, tools profile only) +# +# Total limits intentionally exceed 2.0 cores because the Oracle (balancer spike) +# and InfluxDB/dashboard/collector never hit their ceilings simultaneously. +# Outside Oracle runs the entire stack uses well under 0.5 cores combined. +# +# The balancer's CPU limit should match or exceed SC_PREDICTIVE_MAX_WORKERS +# in your .env file. If you reduce the CPU limit below the worker count, +# Prophet workers will compete for cores and predictive runs will take longer. 
+# ============================================================================= + +services: + influxdb: + build: ./influxdb + container_name: sc_influxdb + ports: + - "8086:8086" + environment: + - DOCKER_INFLUXDB_INIT_MODE=setup + - DOCKER_INFLUXDB_INIT_USERNAME=${DOCKER_INFLUXDB_INIT_USERNAME} + - DOCKER_INFLUXDB_INIT_PASSWORD=${DOCKER_INFLUXDB_INIT_PASSWORD} + - DOCKER_INFLUXDB_INIT_ORG=${INFLUX_ORG} + - DOCKER_INFLUXDB_INIT_BUCKET=${INFLUX_BUCKET} + - DOCKER_INFLUXDB_INIT_ADMIN_TOKEN=${INFLUX_TOKEN} + - INFLUX_LONGTERM_BUCKET=${INFLUX_LONGTERM_BUCKET} + - INFLUX_PRIMARY_RETENTION=${INFLUX_PRIMARY_RETENTION} + - INFLUX_LONGTERM_RETENTION=${INFLUX_LONGTERM_RETENTION} + volumes: + - influxdb-data:/var/lib/influxdb2 + healthcheck: + test: ["CMD", "curl", "-f", "http://localhost:8086/health"] + interval: 10s + timeout: 5s + retries: 10 + start_period: 30s + deploy: + resources: + limits: + cpus: "0.75" + memory: 768M + restart: unless-stopped + + grafana: + image: grafana/grafana:latest + container_name: sc_grafana + profiles: + - tools + ports: + - "3000:3000" + environment: + - GF_SECURITY_ADMIN_PASSWORD=${GF_SECURITY_ADMIN_PASSWORD} + depends_on: + influxdb: + condition: service_healthy + volumes: + - grafana-data:/var/lib/grafana + deploy: + resources: + limits: + cpus: "0.25" + memory: 128M + restart: unless-stopped + + collector: + build: ./Collector + container_name: sc_collector + volumes: + - sc-config:/config + environment: + # HyperCore Credentials + - SC_HOST=${SC_HOST} + - SC_USERNAME=${SC_USERNAME} + - SC_PASSWORD=${SC_PASSWORD} + - SC_VERIFY_SSL=${SC_VERIFY_SSL} + # InfluxDB Connection + - INFLUX_URL=http://influxdb:8086 + - INFLUX_TOKEN=${INFLUX_TOKEN} + - INFLUX_ORG=${INFLUX_ORG} + - INFLUX_BUCKET=${INFLUX_BUCKET} + - SC_POLL_INTERVAL=${SC_POLL_INTERVAL} + - SC_SLOW_POLL_INTERVAL=${SC_SLOW_POLL_INTERVAL} + - INFLUX_PRIMARY_RETENTION=${INFLUX_PRIMARY_RETENTION} + depends_on: + influxdb: + condition: service_healthy + deploy: + resources: + limits: 
+ cpus: "0.25" + memory: 128M + restart: unless-stopped + + balancer: + build: ./Balancer + container_name: sc_balancer + volumes: + - sc-config:/config + environment: + # HyperCore Credentials + - SC_HOST=${SC_HOST} + - SC_USERNAME=${SC_USERNAME} + - SC_PASSWORD=${SC_PASSWORD} + - SC_VERIFY_SSL=${SC_VERIFY_SSL} + + # Master Toggles + - SC_DRY_RUN=${SC_DRY_RUN} + - SC_VERBOSITY=${SC_VERBOSITY} + + # --- REACTIVE ENGINE TUNING --- + - SC_AVG_WINDOW_MINUTES=${SC_AVG_WINDOW_MINUTES} + - SC_SAMPLE_INTERVAL_SECONDS=${SC_SAMPLE_INTERVAL_SECONDS} + - SC_RAM_LIMIT_PERCENT=${SC_RAM_LIMIT_PERCENT} + - SC_RAM_UPPER_THRESHOLD_PERCENT=${SC_RAM_UPPER_THRESHOLD_PERCENT} + - SC_CPU_UPPER_THRESHOLD_PERCENT=${SC_CPU_UPPER_THRESHOLD_PERCENT} + - SC_MAX_VCPU_RATIO=${SC_MAX_VCPU_RATIO} + - SC_VM_MOVE_COOLDOWN_MINUTES=${SC_VM_MOVE_COOLDOWN_MINUTES} + - SC_MIGRATION_COOLDOWN_MINUTES=${SC_MIGRATION_COOLDOWN_MINUTES} + - SC_EXCLUDE_NODE_IPS=${SC_EXCLUDE_NODE_IPS} + + # --- PREDICTIVE ENGINE TUNING (The Oracle) --- + - SC_PREDICTIVE_BALANCING_ENABLED=${SC_PREDICTIVE_BALANCING_ENABLED} + - SC_PREDICTIVE_INTERVAL_SECONDS=${SC_PREDICTIVE_INTERVAL_SECONDS} + - SC_PREDICTIVE_THRESHOLD=${SC_PREDICTIVE_THRESHOLD} + - SC_PREDICTIVE_MIN_HISTORY_HOURS=${SC_PREDICTIVE_MIN_HISTORY_HOURS} + - SC_PREDICTIVE_LEAD_TIME_HOURS=${SC_PREDICTIVE_LEAD_TIME_HOURS} + - SC_PREDICTIVE_LOOKBACK_DAYS=${SC_PREDICTIVE_LOOKBACK_DAYS} + - SC_PREDICTIVE_MAX_WORKERS=${SC_PREDICTIVE_MAX_WORKERS} + - SC_PREDICTIVE_RESERVATION_MINUTES=${SC_PREDICTIVE_RESERVATION_MINUTES} + + # Oracle InfluxDB Connection (Talks internally to the docker network) + - INFLUX_URL=http://influxdb:8086 + - INFLUX_TOKEN=${INFLUX_TOKEN} + - INFLUX_ORG=${INFLUX_ORG} + - INFLUX_BUCKET=${INFLUX_BUCKET} + depends_on: + influxdb: + condition: service_healthy + deploy: + resources: + limits: + cpus: "1.5" + memory: 1024M + restart: unless-stopped + + dashboard: + build: ./dashboard + container_name: sc_dashboard + ports: + - "5000:5000" + volumes: + - 
sc-config:/config + environment: + - INFLUX_URL=http://influxdb:8086 + - INFLUX_TOKEN=${INFLUX_TOKEN} + - INFLUX_ORG=${INFLUX_ORG} + - INFLUX_BUCKET=${INFLUX_BUCKET} + - SC_DASHBOARD_STALE_SECONDS=${SC_DASHBOARD_STALE_SECONDS} + - SC_DASHBOARD_USER=${SC_DASHBOARD_USER} + - SC_DASHBOARD_PASSWORD=${SC_DASHBOARD_PASSWORD} + - SC_EXCLUDE_NODE_IPS=${SC_EXCLUDE_NODE_IPS} + depends_on: + influxdb: + condition: service_healthy + deploy: + resources: + limits: + cpus: "0.25" + memory: 256M + restart: unless-stopped + +volumes: + influxdb-data: + grafana-data: + sc-config: diff --git a/specific_task/HyperCoreBalancerv2/influxdb/Dockerfile b/specific_task/HyperCoreBalancerv2/influxdb/Dockerfile new file mode 100644 index 0000000..c7f409a --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/influxdb/Dockerfile @@ -0,0 +1,3 @@ +FROM influxdb:2.7 +COPY init.sh /docker-entrypoint-initdb.d/init.sh +RUN chmod +x /docker-entrypoint-initdb.d/init.sh \ No newline at end of file diff --git a/specific_task/HyperCoreBalancerv2/influxdb/init.sh b/specific_task/HyperCoreBalancerv2/influxdb/init.sh new file mode 100644 index 0000000..ca32137 --- /dev/null +++ b/specific_task/HyperCoreBalancerv2/influxdb/init.sh @@ -0,0 +1,120 @@ +#!/bin/bash +# ============================================================================= +# InfluxDB Initialization Script +# ============================================================================= +# Runs during InfluxDB first-time setup on port 9999. +# Creates the long-term bucket and downsampling tasks. +# +# NOTE: The primary 'metrics' bucket doesn't exist yet at this stage — +# it gets created AFTER init scripts complete. Its retention is set +# by the influxdb-init sidecar container which waits for full startup. 
# =============================================================================

INIT_PORT="${INFLUXD_INIT_PORT:-9999}"
INFLUX_HOST="http://localhost:${INIT_PORT}"
TOKEN="${DOCKER_INFLUXDB_INIT_ADMIN_TOKEN}"
ORG="${DOCKER_INFLUXDB_INIT_ORG}"
PRIMARY="${DOCKER_INFLUXDB_INIT_BUCKET}"
LONGTERM="${INFLUX_LONGTERM_BUCKET:-metrics_longterm}"
LONGTERM_RET="${INFLUX_LONGTERM_RETENTION:-1825}"
# Bucket retention flag takes hours, env var is in days.
LONGTERM_RET_H=$(( LONGTERM_RET * 24 ))

echo "[INIT] Starting configuration (init port: ${INIT_PORT})..."

# ─── 1. Create long-term bucket ───
echo "[INIT] Creating long-term bucket '${LONGTERM}'..."
influx bucket create \
    --host "$INFLUX_HOST" \
    -t "$TOKEN" \
    -o "$ORG" \
    -n "$LONGTERM" \
    --retention "${LONGTERM_RET_H}h" 2>&1 || \
echo "[INIT] Long-term bucket already exists."

# ─── 2. Create downsampling tasks ───
# NOTE(review): the original heredocs were garbled in transit. They are
# reconstructed below with an explicit `option task = {...}` header, which
# `influx task create -f` requires; task names match the echo strings.
# Confirm names/periods against the deployed tasks.
echo "[INIT] Creating downsampling tasks..."
TMPDIR=$(mktemp -d)

# --- Boolean fields per measurement (these get downsampled with last()) ---
declare -A BOOL_FIELDS
BOOL_FIELDS[node_metrics]="network_online|virtualization_online|allow_running_vms"
BOOL_FIELDS[drive_metrics]="is_healthy"
# vm_metrics and vsd_metrics have no boolean fields — pure numeric
BOOL_FIELDS[vm_metrics]=""
BOOL_FIELDS[vsd_metrics]=""

for M in node_metrics vm_metrics vsd_metrics drive_metrics; do
    BOOLS="${BOOL_FIELDS[$M]}"

    if [ -z "$BOOLS" ]; then
        # No booleans — simple mean over everything
        cat > "${TMPDIR}/${M}.flux" <<FLUX
option task = {name: "downsample_${M}_hourly", every: 1h}

from(bucket: "${PRIMARY}")
  |> range(start: -2h, stop: -1h)
  |> filter(fn: (r) => r["_measurement"] == "${M}")
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)
  |> to(bucket: "${LONGTERM}", org: "${ORG}")
FLUX
    else
        # Split: mean() for numeric fields, last() for boolean fields
        cat > "${TMPDIR}/${M}.flux" <<FLUX
option task = {name: "downsample_${M}_hourly", every: 1h}

base = from(bucket: "${PRIMARY}")
  |> range(start: -2h, stop: -1h)
  |> filter(fn: (r) => r["_measurement"] == "${M}")

numeric = base
  |> filter(fn: (r) => r["_field"] !~ /^(${BOOLS})$/)
  |> aggregateWindow(every: 1h, fn: mean, createEmpty: false)

boolean = base
  |> filter(fn: (r) => r["_field"] =~ /^(${BOOLS})$/)
  |> aggregateWindow(every: 1h, fn: last, createEmpty: false)

union(tables: [numeric, boolean])
  |> to(bucket: "${LONGTERM}", org: "${ORG}")
FLUX
    fi

    influx task create --host "$INFLUX_HOST" -t "$TOKEN" -o "$ORG" -f "${TMPDIR}/${M}.flux" 2>&1 && \
        echo "[INIT] Created: downsample_${M}_hourly" || \
        echo "[INIT] Skipped: downsample_${M}_hourly (may already exist)"
done

# Slow-moving inventory measurements: one daily snapshot via last()
for M in vm_inventory block_device_metrics; do
    cat > "${TMPDIR}/${M}.flux" <<FLUX
option task = {name: "downsample_${M}_daily", every: 1d}

from(bucket: "${PRIMARY}")
  |> range(start: -2d, stop: -1d)
  |> filter(fn: (r) => r["_measurement"] == "${M}")
  |> aggregateWindow(every: 1d, fn: last, createEmpty: false)
  |> to(bucket: "${LONGTERM}", org: "${ORG}")
FLUX
    influx task create --host "$INFLUX_HOST" -t "$TOKEN" -o "$ORG" -f "${TMPDIR}/${M}.flux" 2>&1 && \
        echo "[INIT] Created: downsample_${M}_daily" || \
        echo "[INIT] Skipped: downsample_${M}_daily (may already exist)"
done

# Event-style measurements: archive raw rows daily, no aggregation
for M in migration_events snapshot_metrics replication_metrics condition_metrics cluster_metrics; do
    cat > "${TMPDIR}/${M}.flux" <<FLUX
option task = {name: "archive_${M}_daily", every: 1d}

from(bucket: "${PRIMARY}")
  |> range(start: -2d, stop: -1d)
  |> filter(fn: (r) => r["_measurement"] == "${M}")
  |> to(bucket: "${LONGTERM}", org: "${ORG}")
FLUX
    influx task create --host "$INFLUX_HOST" -t "$TOKEN" -o "$ORG" -f "${TMPDIR}/${M}.flux" 2>&1 && \
        echo "[INIT] Created: archive_${M}_daily" || \
        echo "[INIT] Skipped: archive_${M}_daily (may already exist)"
done

rm -rf "$TMPDIR"

echo "[INIT] ✓ Init script complete."
echo "[INIT] Long-term: '${LONGTERM}' (${LONGTERM_RET}d)"
echo "[INIT] Primary bucket retention will be set by the influxdb-init sidecar."