site_css_reliability/server-snpshtr.py

467 lines
15 KiB
Python

#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import hashlib
import json
import mimetypes
import shutil
import time
import zipfile
from collections import defaultdict
from io import BytesIO
from pathlib import Path
from typing import Any
from uuid import uuid4
import werkzeug.exceptions
from flask import (Flask, jsonify, make_response, redirect, request, send_file,
send_from_directory)
from flask_cors import CORS
# Shared secret that API callers must present; read once at start-up from
# apikey.txt in the working directory (surrounding whitespace ignored).
APIKEY = (
    Path('apikey.txt')
    .read_text(encoding='utf-8')
    .strip()
)
# The Flask application object; CORS is enabled for the whole app.
app = Flask(import_name=__name__, instance_relative_config=True)
CORS(app)
# Flat JSON files used as the server's persistent state.
UPTIME_DB = Path('uptime.json')    # JSON object, seeded as {}
UPTIME2_DB = Path('uptime2.json')  # JSON object, seeded as {}
ID_DB = Path('ids.json')           # JSON object, seeded as {}
CRON_DB = Path('crons.json')       # JSON array, seeded as []
JOB_DB = Path('jobs.json')         # JSON array, seeded as []
ANAL_DB = Path('analysis.json')    # JSON array, seeded as []
# Create every missing state file with an empty document of the right shape.
for _db_file, _empty_doc in (
    (UPTIME_DB, b'{}'),
    (UPTIME2_DB, b'{}'),
    (ID_DB, b'{}'),
    (CRON_DB, b'[]'),
    (JOB_DB, b'[]'),
    (ANAL_DB, b'[]'),
):
    if not _db_file.exists():
        _db_file.write_bytes(_empty_doc)
del _db_file, _empty_doc
# Directory that holds one sub-directory of uploaded archives per job.
JOBS_PATH = Path('jobs')
# Remove leftover *.temp files from the working directory at start-up.
for _stale in Path('.').glob('*.temp'):
    _stale.unlink()
    del _stale
# Default option values merged underneath every cron definition and every job
# handed to a worker; explicit values in the record override these.
JOB_DEFAULTS = {
    'hideScrollbar': 1,
    'wait': 0,
    'scrolltoJs': '',
    'scrolltox': 0,
    'scrolltoy': 0,
    'preRunJs': '',
    'waitJs': 0,
    'checkReadyJs': '',
}
class TempFile:
    """Context manager that yields a uniquely named scratch file.

    The file is called ``<place>/<random hex>.temp``. It is created on entry
    and removed on exit if it still exists at that point.

    The ``save_*`` helpers use it to write a file in two steps: the payload
    is written to a scratch file in the current directory and then moved
    over the destination, so readers never observe a half-written file.
    """

    def __init__(self, place='/tmp') -> None:
        """Pick a fresh scratch file name inside directory *place*."""
        self.file = Path(f'{place}/{uuid4().hex}.temp')

    def __enter__(self) -> Path:
        """Create the (empty) scratch file and return its path."""
        self.file.touch()
        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        """Delete the scratch file unless it has already been moved away."""
        self.file.unlink(missing_ok=True)

    @classmethod
    def save_bytes(cls, dest: Path, b: bytes):
        """Write bytes *b* to *dest*, creating parent directories as needed."""
        with cls('.') as f:
            f.write_bytes(b)
            dest.parent.mkdir(parents=True, exist_ok=True)
            # replace() overwrites an existing destination on every platform;
            # rename() raises FileExistsError on Windows in that case.
            f.replace(dest)

    @classmethod
    def save_utf8(cls, dest: Path, s: str):
        """Write text *s* to *dest* as UTF-8, creating parent directories."""
        with cls('.') as f:
            f.write_text(s, encoding='utf-8')
            dest.parent.mkdir(parents=True, exist_ok=True)
            # See save_bytes: replace() is the portable overwriting move.
            f.replace(dest)
def next_id(name: str) -> int:
    """Return the next integer id for counter *name* and persist it.

    Counters live in ``ID_DB``; a counter that does not exist yet starts
    at 0, so the first id handed out is 1.
    """
    counters = json.loads(ID_DB.read_text(encoding='utf-8'))
    issued = counters.get(name, 0) + 1
    counters[name] = issued
    TempFile.save_utf8(ID_DB, json.dumps(counters, indent=2))
    return issued
@app.route('/', methods=['HEAD', 'OPTIONS', 'GET'])
def index():
    """Serve a fixed placeholder body at the site root."""
    placeholder = 'Nothing to see here'
    return placeholder
def get_updated_job_list() -> list[dict[str, Any]]:
    """Schedule due crons as new jobs, prune old history, return all jobs.

    A cron is due when more than ``hours`` hours have passed since its
    ``lastScheduledSec``. Each due cron gets a new job (a copy of the cron
    record plus a fresh ``jobId``) and its ``lastScheduledSec`` is reset.

    When at least one cron was due, each cron's jobs are trimmed to the
    newest ``round(historySize)`` entries. Trimmed jobs lose their analysis
    records and their upload directory under ``JOBS_PATH``, and the three
    state files are rewritten. A non-positive ``historySize`` disables
    trimming for that cron. Jobs whose cron no longer exists are left alone.

    Returns:
        The current job list, oldest first.
    """
    tm = time.time()
    anals: list[dict[str, Any]] = json.loads(
        ANAL_DB.read_text(encoding='utf-8'))
    crons: list[dict[str, Any]] = json.loads(
        CRON_DB.read_text(encoding='utf-8'))
    jobs: list[dict[str, Any]] = json.loads(JOB_DB.read_text(encoding='utf-8'))

    pending_crons = [
        c for c in crons if (tm - c['hours'] * 3600) > c['lastScheduledSec']]
    if not pending_crons:
        # Nothing was scheduled, so nothing changed on disk.
        return jobs

    for pending in pending_crons:
        pending['lastScheduledSec'] = tm
        jobs.append(dict(jobId=next_id('job'), **pending))

    history_size: dict[int, float] = {
        c['cronId']: c['historySize'] for c in crons}
    positions_by_cron: dict[int, list[int]] = defaultdict(list)
    for pos, job in enumerate(jobs):
        positions_by_cron[job['cronId']].append(pos)

    discard_positions: list[int] = []
    for cron_id, positions in positions_by_cron.items():
        size = history_size.get(cron_id)
        if size is None:
            # The cron was deleted; its remaining jobs have no retention
            # setting, so keep them instead of failing with KeyError.
            continue
        keep = round(size)
        if keep < 1:
            # positions[:-0] is empty, so a zero size never pruned anything;
            # make that explicit and apply it to negative sizes as well.
            continue
        discard_positions += positions[:-keep]

    discarded_ids = {jobs[pos]['jobId'] for pos in discard_positions}
    anals = [a for a in anals if a['jobId'] not in discarded_ids]

    # Pop from the end so the remaining positions stay valid.
    for pos in sorted(discard_positions, reverse=True):
        job = jobs.pop(pos)
        job_path = JOBS_PATH.joinpath(f'{job["jobId"]:020d}')
        shutil.rmtree(job_path, ignore_errors=True)

    TempFile.save_utf8(ANAL_DB, json.dumps(anals, indent=2))
    TempFile.save_utf8(CRON_DB, json.dumps(crons, indent=2))
    TempFile.save_utf8(JOB_DB, json.dumps(jobs, indent=2))
    return jobs
def worker_lastseen_update(name: str):
    """Record the current time as the last-seen time of worker *name*.

    The name is stripped first; a blank name is ignored. The mapping is
    stored in ``UPTIME_DB``.
    """
    worker_name = name.strip()
    if not worker_name:
        return
    last_seen: dict[str, float] = json.loads(
        UPTIME_DB.read_text(encoding='utf-8'))
    last_seen[worker_name] = time.time()
    TempFile.save_utf8(UPTIME_DB, json.dumps(last_seen, indent=2))
def analizer_lastseen_update(name: str):
    """Record the current time as the last-seen time of analyzer *name*.

    The name is stripped first; a blank name is ignored. The mapping is
    stored in ``UPTIME2_DB``.
    """
    analyzer_name = name.strip()
    if not analyzer_name:
        return
    last_seen: dict[str, float] = json.loads(
        UPTIME2_DB.read_text(encoding='utf-8'))
    last_seen[analyzer_name] = time.time()
    TempFile.save_utf8(UPTIME2_DB, json.dumps(last_seen, indent=2))
def worker_get_next_job(worker: str) -> dict | None:
    """Return the first job *worker* has not uploaded a result for.

    A job counts as done for a worker when
    ``JOBS_PATH/<jobId, 20 digits>/<worker>.zip`` exists. Returns ``None``
    when every job already has this worker's archive.
    """
    for candidate in get_updated_job_list():
        submission = JOBS_PATH / f'{candidate["jobId"]:020d}/{worker}.zip'
        if submission.exists():
            continue
        return candidate
    return None
def get_updated_analysis_list() -> list[dict[str, Any]]:
    """Bring the analysis records in line with the uploaded submissions.

    For every job, "completeness" is the number of workers that have
    uploaded an archive. A job with no analysis record gets a fresh one.
    A record whose stored completeness differs from the current value is
    reset to the same fresh state. ``ANAL_DB`` is rewritten only when
    something changed.

    Returns:
        The (possibly updated) list of analysis records.
    """
    submissions = raw_job_submission_get()
    records: list[dict[str, Any]] = json.loads(
        ANAL_DB.read_text(encoding='utf-8'))
    dirty: bool = False
    for job in submissions:
        cron_id = job['cronId']
        job_id = job['jobId']
        # First record for this job, if any.
        existing = None
        for record in records:
            if record['jobId'] == job_id:
                existing = record
                break
        uploaded = sum(
            1 for archive in job['workers'].values() if archive is not None)
        if existing is not None and existing['completeness'] == uploaded:
            continue
        dirty = True
        fresh = {
            'cronId': cron_id,
            'jobId': job_id,
            'finished': False,
            'assignee': None,
            'assigneeTime': .0,
            'completeness': uploaded,
            'workers': job['workers'],
            'analysisFile': None,
            'analysis': None,
        }
        if existing is None:
            records.append(fresh)
        else:
            existing.update(fresh)
    if dirty:
        TempFile.save_utf8(ANAL_DB, json.dumps(records, indent=2))
    return records
def analyzer_get_next_job(worker: str, lease_seconds: float = 300) -> dict | None:
    """Return the analysis record that analyzer *worker* should work on.

    Only unfinished records with at least one uploaded submission are
    considered, in list order. An assignment acts as a lease that stays
    valid for *lease_seconds* after ``assigneeTime``:

    * held by *worker* with a valid lease: returned as is;
    * unassigned, or lease expired (whoever held it): assigned to *worker*
      with a fresh ``assigneeTime``, persisted to ``ANAL_DB`` and returned;
    * held by another analyzer with a valid lease: skipped.

    Args:
        worker: Name of the requesting analyzer.
        lease_seconds: How long an assignment blocks other analyzers.

    Returns:
        The selected analysis record, or ``None`` when nothing is available.
    """
    tm = time.time()
    anals = get_updated_analysis_list()
    for anal in anals:
        if anal['finished'] or anal['completeness'] <= 0:
            continue
        lease_active = anal['assigneeTime'] + lease_seconds >= tm
        if anal['assignee'] == worker and lease_active:
            return anal
        if not anal['assignee'] or not lease_active:
            anal['assignee'] = worker
            anal['assigneeTime'] = tm
            TempFile.save_utf8(ANAL_DB, json.dumps(anals, indent=2))
            return anal
        # This unfinished record is assigned to another analyzer and is
        # considered to be still running, so try the next one.
    # There is no analysis to be run.
    return None
@app.route('/job/next', methods=['HEAD', 'OPTIONS', 'GET'])
def job_next_get():
    """Hand the calling worker its next pending job as JSON.

    Query parameters: ``key`` (must equal ``APIKEY``) and ``worker``.
    Responds 404 on a wrong key or when the worker has no pending job.
    The job is returned with ``JOB_DEFAULTS`` filled in underneath.
    """
    supplied_key = request.args.get('key', '').strip()
    if supplied_key != APIKEY:
        return make_response('wrong value for GET parameter: key', 404)
    worker = request.args.get('worker', '')
    worker_lastseen_update(worker)
    pending = worker_get_next_job(worker)
    if pending is not None:
        return jsonify({**JOB_DEFAULTS, **pending})
    return make_response('no new job', 404)
@app.route('/job', methods=['POST'])
def job_post():
    """Accept a worker's result archive for its current job.

    Query parameters: ``key``, ``worker``, ``jobId`` and ``sha256`` (hex
    digest of the request body). The body is stored as
    ``JOBS_PATH/<jobId, 20 digits>/<worker>.zip``.

    Raises:
        Exception: the worker name is blank or contains a path separator.
        ValueError: the worker has no pending job, ``jobId`` is not the
            worker's next pending job, or the digest does not match.
    """
    if APIKEY != request.args.get('key', '').strip():
        resp = make_response('wrong value for GET parameter: key')
        resp.status_code = 404
        return resp
    worker = request.args.get('worker', '')
    if worker.strip() == '':
        raise Exception('Unknown worker')
    # The name becomes part of the upload path; a separator would let the
    # archive be written outside the job's own directory.
    if '/' in worker or '\\' in worker:
        raise Exception('Unknown worker')
    worker_lastseen_update(worker)
    next_job = worker_get_next_job(worker)
    jobId = int(request.args.get('jobId', '0'))
    # next_job is None when the worker has nothing pending.
    if next_job is None or next_job['jobId'] != jobId:
        raise ValueError('Wrong job')
    hashed = request.args.get('sha256', '')
    zfb = request.data
    if hashlib.sha256(zfb).hexdigest() != hashed:
        raise ValueError('Sent data was not received right')
    TempFile.save_bytes(JOBS_PATH.joinpath(f'{jobId:020d}/{worker}.zip'), zfb)
    return jsonify('OK')
@app.route('/job', methods=['HEAD', 'OPTIONS', 'GET'])
def job_get():
    """Serve the raw job database file."""
    jobs_file = JOB_DB
    return send_file(jobs_file)
def raw_job_submission_get() -> list[dict[str, Any]]:
    """Return all jobs, each annotated with per-worker submission status.

    Every job gains a ``workers`` dict keyed by the worker names found in
    ``UPTIME_DB``. The value is the archive path as a string when
    ``JOBS_PATH/<jobId, 20 digits>/<worker>.zip`` exists, else ``None``.
    """
    jobs = json.loads(JOB_DB.read_text(encoding='utf-8'))
    known_workers = json.loads(UPTIME_DB.read_text(encoding='utf-8'))
    for job in jobs:
        status: dict = {}
        job['workers'] = status
        for worker in known_workers:
            archive = JOBS_PATH / f'{job["jobId"]:020d}/{worker}.zip'
            status[worker] = str(archive) if archive.exists() else None
    return jobs
@app.route('/job/submission', methods=['HEAD', 'OPTIONS', 'GET'])
def job_submission_get():
    """Serve the job list with per-worker submission status as JSON."""
    submissions = raw_job_submission_get()
    return jsonify(submissions)
@app.route('/analysis', methods=['HEAD', 'OPTIONS', 'GET'])
def analysis_get():
    """Serve the raw analysis database file."""
    analysis_file = ANAL_DB
    return send_file(analysis_file)
@app.route('/analysis/next', methods=['HEAD', 'OPTIONS', 'GET'])
def analysis_next_get():
    """Hand the calling analyzer its next analysis record as JSON.

    Query parameters: ``key`` (must equal ``APIKEY``) and ``worker``.
    Responds 404 on a wrong key or when nothing is available; raises
    ``Exception`` when the worker name is blank.
    """
    supplied_key = request.args.get('key', '').strip()
    if supplied_key != APIKEY:
        return make_response('wrong value for GET parameter: key', 404)
    worker = request.args.get('worker', '')
    if not worker.strip():
        raise Exception('Unknown worker')
    analizer_lastseen_update(worker)
    assignment = analyzer_get_next_job(worker)
    if assignment is not None:
        return jsonify(assignment)
    return make_response('no new job', 404)
@app.route('/analysis', methods=['POST'])
def analysis_post():
    """Accept an analyzer's result archive for its assigned analysis.

    Query parameters: ``key``, ``worker``, ``jobId``, ``completeness`` and
    ``sha256`` (hex digest of the request body). The body must be a zip
    archive containing ``analysis.json``. It is stored as
    ``JOBS_PATH/<jobId, 20 digits>/analysis.zip``, and the parsed JSON is
    embedded in the matching record, which is then marked finished.

    Raises:
        Exception: the worker name is blank.
        ValueError: no analysis is available for this worker, the posted
            ``jobId``/``completeness`` do not match the assignment, or the
            digest does not match.
    """
    if APIKEY != request.args.get('key', '').strip():
        resp = make_response('wrong value for GET parameter: key')
        resp.status_code = 404
        return resp
    worker = request.args.get('worker', '')
    if worker.strip() == '':
        raise Exception('Unknown worker')
    analizer_lastseen_update(worker)
    next_anal = analyzer_get_next_job(worker)
    jobId = int(request.args.get('jobId', '0'))
    completeness = int(request.args.get('completeness', '0'))
    # next_anal is None when there is nothing to analyze.
    if next_anal is None:
        raise ValueError('Wrong job')
    if (next_anal['assignee'] != worker
            or next_anal['jobId'] != jobId
            or next_anal['completeness'] != completeness):
        raise ValueError('Wrong job')
    hashed = request.args.get('sha256', '')
    zfb = request.data
    if hashlib.sha256(zfb).hexdigest() != hashed:
        raise ValueError('Sent data was not received right')
    # Parse the archive before storing anything, so a malformed upload does
    # not leave an analysis.zip on disk without a finished record.
    with zipfile.ZipFile(BytesIO(zfb)) as zf:
        analysis = json.loads(
            zf.read('analysis.json').decode(encoding='utf-8'))
    analysisFile = JOBS_PATH.joinpath(f'{jobId:020d}/analysis.zip')
    TempFile.save_bytes(analysisFile, zfb)
    anals: list[dict[str, Any]] = json.loads(
        ANAL_DB.read_text(encoding='utf-8'))
    for anal in anals:
        if anal['jobId'] == next_anal['jobId']:
            anal['finished'] = True
            anal['analysisFile'] = str(analysisFile)
            anal['analysis'] = analysis
            break
    TempFile.save_utf8(ANAL_DB, json.dumps(anals, indent=2))
    return jsonify('OK')
@app.route('/uptime', methods=['HEAD', 'OPTIONS', 'GET'])
def uptime_get():
    """Serve the raw ``UPTIME_DB`` file."""
    uptime_file = UPTIME_DB
    return send_file(uptime_file)
@app.route('/uptime2', methods=['HEAD', 'OPTIONS', 'GET'])
def uptime2_get():
    """Serve the raw ``UPTIME2_DB`` file."""
    uptime_file = UPTIME2_DB
    return send_file(uptime_file)
@app.route('/cron', methods=['HEAD', 'OPTIONS', 'GET'])
def cron():
    """Serve the raw cron database file."""
    crons_file = CRON_DB
    return send_file(crons_file)
@app.route('/cron/form', methods=['HEAD', 'OPTIONS', 'GET'])
def cron_form_get():
    """Serve the cron management form.

    A non-blank ``apikey`` query parameter that does not match ``APIKEY``
    redirects to the bare form URL, which drops the parameter.
    """
    supplied = request.args.get('apikey', '').strip()
    if supplied and supplied != APIKEY:
        return redirect('/cron/form')
    form_page = Path('cronform.html')
    return send_file(form_page)
@app.route('/cron/form', methods=['POST'])
def cron_form_post():
    """Add or delete a cron definition from the management form.

    Form fields: ``apikey`` and ``action`` (``add`` or ``delete``), plus
    the cron settings for ``add`` or ``cronId`` for ``delete``. A wrong api
    key redirects back to the bare form. Success redirects to the form
    with a status message and the api key in the query string.

    Raises:
        werkzeug.exceptions.BadRequest: ``action`` is neither ``add`` nor
            ``delete``.
        ValueError: a numeric field cannot be parsed.
    """
    from urllib.parse import quote

    if APIKEY != request.form.get('apikey', '').strip():
        return redirect('/cron/form')
    action = request.form['action']
    # The key is echoed into the redirect URL, so it must be percent-encoded.
    apikey_param = quote(request.form.get('apikey', ''), safe='')
    if action == 'add':
        # Parse every field before allocating an id, so a malformed form
        # does not consume one.
        hours = float(request.form['hours'].strip())
        settings = dict(
            url=request.form['url'].strip(),
            hours=hours,
            historySize=float(request.form['historySize'].strip()),
            # Backdate by one full period so the cron is due right away.
            lastScheduledSec=time.time() - 3600 * hours,
            preRunJs=request.form['preRunJs'].strip(),
            wait=float(request.form['wait'].strip()),
            scrolltoJs=request.form['scrolltoJs'].strip(),
            scrolltox=float(request.form['scrolltox'].strip()),
            scrolltoy=float(request.form['scrolltoy'].strip()),
            checkReadyJs=request.form['checkReadyJs'].strip(),
            waitJs=float(request.form['waitJs'].strip()),
        )
        cron = {
            **JOB_DEFAULTS,
            'cronId': next_id('cron'),
            **settings,
        }
        crons = json.loads(CRON_DB.read_text(encoding='utf-8'))
        crons.append(cron)
        TempFile.save_utf8(CRON_DB, json.dumps(crons, indent=2))
        return redirect(
            '/cron/form?message=added%20successfully&apikey=' + apikey_param)
    if action == 'delete':
        cronId = int(request.form['cronId'].strip())
        crons = json.loads(CRON_DB.read_text(encoding='utf-8'))
        crons = [c for c in crons if c['cronId'] != cronId]
        TempFile.save_utf8(CRON_DB, json.dumps(crons, indent=2))
        return redirect(
            '/cron/form?message=deleted%20successfully&apikey=' + apikey_param)
    # Falling through would return None, which Flask reports as a 500.
    raise werkzeug.exceptions.BadRequest()
@app.route('/jobs/<path:path>', methods=['HEAD', 'OPTIONS', 'GET'])
def jobs_static(path):
    """Serve the file at *path* from the ``jobs`` directory."""
    jobs_dir = 'jobs'
    return send_from_directory(jobs_dir, path)
@app.route('/unzip/jobs', methods=['HEAD', 'OPTIONS', 'GET'])
def unzip_jobs():
    """List the entry names directly inside the ``jobs`` directory as JSON."""
    entry_names = [entry.name for entry in Path('jobs').iterdir()]
    return jsonify(entry_names)
@app.route('/unzip/jobs/<path:path>', methods=['HEAD', 'OPTIONS', 'GET'])
def unzip_jobs_path(path):
    """List a directory or a zip archive below ``jobs`` as JSON.

    For a directory the response is the list of entry names; for any other
    existing path it is opened as a zip archive and the member file names
    are returned.

    Raises:
        werkzeug.exceptions.NotAcceptable: *path* resolves outside ``jobs``.
        werkzeug.exceptions.NotFound: the target does not exist.
    """
    jobs_root = Path('jobs').resolve()
    target_zip = Path('jobs').joinpath(path)
    # Compare path components, not string prefixes: a prefix test would
    # accept siblings such as "jobs_backup/..." reached through "..".
    if not target_zip.resolve().is_relative_to(jobs_root):
        raise werkzeug.exceptions.NotAcceptable()
    if not target_zip.exists():
        raise werkzeug.exceptions.NotFound()
    if target_zip.is_dir():
        return jsonify([entry.name for entry in target_zip.iterdir()])
    with zipfile.ZipFile(target_zip, mode='r') as zf:
        member_names = [info.filename for info in zf.infolist()]
    return send_file(
        BytesIO(json.dumps(member_names, indent=2).encode('utf-8')),
        last_modified=target_zip.stat().st_mtime,
        mimetype='application/json'
    )
@app.route('/unzip/jobs/<path:path>.zip/<path:zippath>', methods=['HEAD', 'OPTIONS', 'GET'])
def unzip_jobs_path_inner(path, zippath):
    """Serve one member of the zip archive ``jobs/<path>.zip``.

    The member *zippath* is read into memory and sent with a MIME type
    guessed from its name. An empty *zippath* falls back to listing the
    archive's members.

    Raises:
        werkzeug.exceptions.NotAcceptable: the archive path resolves
            outside ``jobs``.
        werkzeug.exceptions.NotFound: the archive or the member does not
            exist.
    """
    archive_rel = path + '.zip'
    jobs_root = Path('jobs').resolve()
    target_zip = Path('jobs').joinpath(archive_rel)
    # Compare path components, not string prefixes: a prefix test would
    # accept siblings such as "jobs_backup/..." reached through "..".
    if not target_zip.resolve().is_relative_to(jobs_root):
        raise werkzeug.exceptions.NotAcceptable()
    if not target_zip.exists():
        raise werkzeug.exceptions.NotFound()
    if zippath in ('', '/'):
        # List the archive that was just validated; passing the name
        # without ".zip" would look up a different path.
        return unzip_jobs_path(archive_rel)
    with zipfile.ZipFile(target_zip, mode='r') as zf:
        try:
            zd = zf.read(zippath)
        except KeyError:
            raise werkzeug.exceptions.NotFound()
    return send_file(
        BytesIO(zd),
        last_modified=target_zip.stat().st_mtime,
        mimetype=mimetypes.guess_type(zippath)[0]
    )