467 lines
15 KiB
Python
467 lines
15 KiB
Python
#!/usr/bin/env python3
|
|
# -*- encoding: utf-8 -*-
|
|
|
|
import hashlib
|
|
import json
|
|
import mimetypes
|
|
import shutil
|
|
import time
|
|
import zipfile
|
|
from collections import defaultdict
|
|
from io import BytesIO
|
|
from pathlib import Path
|
|
from typing import Any
|
|
from uuid import uuid4
|
|
|
|
import werkzeug.exceptions
|
|
from flask import (Flask, jsonify, make_response, redirect, request, send_file,
|
|
send_from_directory)
|
|
from flask_cors import CORS
|
|
|
|
# Shared secret compared against the `key` / `apikey` request parameters.
APIKEY = Path('apikey.txt').read_text(encoding='utf-8').strip()

app = Flask(__name__, instance_relative_config=True)
CORS(app)

# Flat-file JSON stores, all relative to the working directory.
UPTIME_DB = Path('uptime.json')
UPTIME2_DB = Path('uptime2.json')
ID_DB = Path('ids.json')
CRON_DB = Path('crons.json')
JOB_DB = Path('jobs.json')
ANAL_DB = Path('analysis.json')

# Seed every store that does not exist yet with its empty JSON document.
for _store, _empty_doc in (
    (UPTIME_DB, b'{}'),
    (UPTIME2_DB, b'{}'),
    (ID_DB, b'{}'),
    (CRON_DB, b'[]'),
    (JOB_DB, b'[]'),
    (ANAL_DB, b'[]'),
):
    if not _store.exists():
        _store.write_bytes(_empty_doc)
del _store, _empty_doc

JOBS_PATH = Path('jobs')

# Remove `*.temp` files left in the working directory by an earlier run.
# NOTE(review): the original indentation of `del x` was lost; it is kept
# inside the loop so a start-up with no leftovers cannot hit an unbound name.
for x in Path('.').glob('*.temp'):
    x.unlink()
    del x
|
|
|
|
|
|
# Baseline job options; job and cron records are merged on top of these.
JOB_DEFAULTS = {
    'hideScrollbar': 1,
    'wait': 0,
    'scrolltoJs': '',
    'scrolltox': 0,
    'scrolltoy': 0,
    'preRunJs': '',
    'waitJs': 0,
    'checkReadyJs': '',
}
|
|
|
|
|
|
class TempFile:
    """Context manager around a uniquely named ``<uuid>.temp`` file.

    Entering creates the (empty) file and yields its path; leaving removes
    the file if it is still there.  The ``save_*`` helpers use this to write
    a complete payload to a scratch file in the working directory and then
    move it onto the destination in one step.
    """

    def __init__(self, place='/tmp') -> None:
        """Pick a fresh file name inside directory *place* (not created yet)."""
        self.file = Path(f'{place}/{uuid4().hex}.temp')

    def __enter__(self) -> Path:
        self.file.touch()
        return self.file

    def __exit__(self, exc_type, exc_val, exc_tb) -> None:
        # After a successful move the scratch file is already gone.
        self.file.unlink(missing_ok=True)

    @classmethod
    def _save(cls, dest: Path, write) -> None:
        """Fill a scratch file via ``write(path)`` and move it onto *dest*."""
        with cls('.') as scratch:
            write(scratch)
            dest.parent.mkdir(parents=True, exist_ok=True)
            # replace() overwrites an existing destination on every platform;
            # rename() raises on Windows when the target already exists, and
            # the JSON stores are rewritten over themselves on each save.
            scratch.replace(dest)

    @classmethod
    def save_bytes(cls, dest: Path, b: bytes):
        """Write the bytes *b* to *dest*, creating parent directories."""
        cls._save(dest, lambda scratch: scratch.write_bytes(b))

    @classmethod
    def save_utf8(cls, dest: Path, s: str):
        """Write the text *s* to *dest* as UTF-8, creating parent directories."""
        cls._save(dest, lambda scratch: scratch.write_text(s, encoding='utf-8'))
|
|
|
|
|
|
def next_id(name: str) -> int:
    """Advance the persistent counter called *name* and return its new value.

    Counters are stored in the ``ID_DB`` JSON file.  A name that has not been
    used before starts from 0, so the first value returned for it is 1.
    """
    counters = json.loads(ID_DB.read_text(encoding='utf-8'))
    allocated = counters.get(name, 0) + 1
    counters[name] = allocated
    TempFile.save_utf8(ID_DB, json.dumps(counters, indent=2))
    return allocated
|
|
|
|
|
|
@app.route('/', methods=['HEAD', 'OPTIONS', 'GET'])
def index():
    """Answer requests for the site root with a fixed placeholder text."""
    placeholder = 'Nothing to see here'
    return placeholder
|
|
|
|
|
|
def get_updated_job_list() -> list[dict[str, Any]]:
    """Schedule due crons as new jobs, prune old history, return all jobs.

    A cron is due when more than ``hours`` hours have passed since its
    ``lastScheduledSec``.  Each due cron is stamped with the current time and
    copied into a new job record with a fresh ``jobId``.  When at least one
    cron was due, jobs beyond each cron's ``historySize`` (oldest first) are
    dropped together with their analysis records and their directory under
    ``JOBS_PATH``, and the three JSON stores are rewritten.

    Returns:
        The job list after scheduling and pruning.
    """
    now = time.time()
    anals: list[dict[str, Any]] = json.loads(
        ANAL_DB.read_text(encoding='utf-8'))
    crons: list[dict[str, Any]] = json.loads(
        CRON_DB.read_text(encoding='utf-8'))
    jobs: list[dict[str, Any]] = json.loads(JOB_DB.read_text(encoding='utf-8'))

    due_crons = [
        c for c in crons
        if (now - c['hours'] * 3600) > c['lastScheduledSec']
    ]
    if not due_crons:
        # Nothing was scheduled, so no store changed; skip the rewrite.
        return jobs

    for due in due_crons:
        due['lastScheduledSec'] = now
        jobs.append(dict(jobId=next_id('job'), **due))

    history_size: dict[int, Any] = {c['cronId']: c['historySize'] for c in crons}
    positions_by_cron: dict[int, list[int]] = defaultdict(list)
    for pos, job in enumerate(jobs):
        positions_by_cron[job['cronId']].append(pos)

    discard_positions: list[int] = []
    for cron_id, positions in positions_by_cron.items():
        if cron_id not in history_size:
            # The cron was deleted but its jobs are still listed.  There is
            # no history size to apply, so leave those jobs alone instead of
            # failing the whole scheduling pass with a KeyError.
            continue
        # NOTE: a size that rounds to 0 makes this slice empty, i.e. nothing
        # is pruned for that cron.
        discard_positions += positions[:-round(history_size[cron_id])]

    discarded_job_ids = {jobs[pos]['jobId'] for pos in discard_positions}
    anals = [a for a in anals if a['jobId'] not in discarded_job_ids]

    # Pop from the back so earlier positions stay valid.
    for pos in sorted(discard_positions, reverse=True):
        dropped = jobs.pop(pos)
        shutil.rmtree(
            JOBS_PATH.joinpath(f'{dropped["jobId"]:020d}'),
            ignore_errors=True)

    TempFile.save_utf8(ANAL_DB, json.dumps(anals, indent=2))
    TempFile.save_utf8(CRON_DB, json.dumps(crons, indent=2))
    TempFile.save_utf8(JOB_DB, json.dumps(jobs, indent=2))
    return jobs
|
|
|
|
|
|
def worker_lastseen_update(name: str):
    """Store the current time as the last-seen moment of worker *name*.

    The name is stripped of surrounding whitespace first; a name that is
    empty after stripping is ignored.  Timestamps live in ``UPTIME_DB``.
    """
    key = name.strip()
    if not key:
        return
    last_seen: dict[str, float] = json.loads(
        UPTIME_DB.read_text(encoding='utf-8'))
    last_seen[key] = time.time()
    TempFile.save_utf8(UPTIME_DB, json.dumps(last_seen, indent=2))
|
|
|
|
|
|
def analizer_lastseen_update(name: str):
    """Store the current time as the last-seen moment of analyzer *name*.

    The name is stripped of surrounding whitespace first; a name that is
    empty after stripping is ignored.  Timestamps live in ``UPTIME2_DB``.
    """
    key = name.strip()
    if not key:
        return
    last_seen: dict[str, float] = json.loads(
        UPTIME2_DB.read_text(encoding='utf-8'))
    last_seen[key] = time.time()
    TempFile.save_utf8(UPTIME2_DB, json.dumps(last_seen, indent=2))
|
|
|
|
|
|
def worker_get_next_job(worker: str) -> dict | None:
    """Return the first job for which *worker* has not uploaded a zip yet.

    The job list is refreshed through ``get_updated_job_list`` first.
    ``None`` is returned when ``<jobId>/<worker>.zip`` exists for every job.
    """
    outstanding = (
        job for job in get_updated_job_list()
        if not JOBS_PATH.joinpath(f'{job["jobId"]:020d}/{worker}.zip').exists()
    )
    return next(outstanding, None)
|
|
|
|
|
|
def get_updated_analysis_list() -> list[dict[str, Any]]:
    """Bring the analysis records in line with the current job submissions.

    For every job, ``completeness`` is the number of workers that have
    delivered a zip.  A job without a record gets a new one; a record whose
    stored completeness differs is reset to an unassigned, unfinished state
    with the new completeness.  ``ANAL_DB`` is rewritten only when at least
    one record was added or reset.

    Returns:
        The (possibly updated) list of analysis records.
    """
    jobs = raw_job_submission_get()
    anals: list[dict[str, Any]] = json.loads(
        ANAL_DB.read_text(encoding='utf-8'))
    dirty = False
    for job in jobs:
        cron_id = job['cronId']
        job_id = job['jobId']
        existing = next((a for a in anals if a['jobId'] == job_id), None)
        delivered = sum(
            1 for archive in job['workers'].values() if archive is not None)
        if existing is not None and existing['completeness'] == delivered:
            continue
        dirty = True
        fresh = {
            'cronId': cron_id,
            'jobId': job_id,
            'finished': False,
            'assignee': None,
            'assigneeTime': .0,
            'completeness': delivered,
            'workers': job['workers'],
            'analysisFile': None,
            'analysis': None,
        }
        if existing is None:
            anals.append(fresh)
        else:
            existing.update(fresh)
    if dirty:
        TempFile.save_utf8(ANAL_DB, json.dumps(anals, indent=2))
    return anals
|
|
|
|
|
|
def analyzer_get_next_job(worker: str, lease_seconds: float = 300) -> dict | None:
    """Return the analysis record *worker* should process next, if any.

    Only unfinished records with at least one delivered worker zip are
    considered.  An assignment acts as a lease lasting *lease_seconds*:

    * a record leased to *worker* whose lease is still running is returned
      unchanged;
    * a record with no assignee, or whose lease has run out, is assigned to
      *worker* (the change is persisted to ``ANAL_DB``) and returned;
    * a record leased to a different analyzer whose lease is still running
      is skipped.

    The original comparisons were inverted: they took over leases that were
    still running and skipped expired leases held by other analyzers, which
    contradicts the "assigned and still running" intent of the skip branch.

    Args:
        worker: Name of the analyzer asking for work.
        lease_seconds: How long an assignment stays reserved.

    Returns:
        The selected analysis record, or ``None`` when nothing is available.
    """
    now = time.time()
    anals = get_updated_analysis_list()
    for anal in anals:
        if anal['finished'] or not anal['completeness'] > 0:
            continue
        lease_running = (
            bool(anal['assignee'])
            and anal['assigneeTime'] + lease_seconds >= now
        )
        if lease_running:
            if anal['assignee'] == worker:
                return anal
            # Reserved by another analyzer that is presumably still working
            # on it; try the next record.
            continue
        anal['assignee'] = worker
        anal['assigneeTime'] = now
        TempFile.save_utf8(ANAL_DB, json.dumps(anals, indent=2))
        return anal
    # There is no analysis to be run.
    return None
|
|
|
|
|
|
@app.route('/job/next', methods=['HEAD', 'OPTIONS', 'GET'])
def job_next_get():
    """Hand the calling worker its next job as JSON.

    Query parameters: ``key`` (must equal ``APIKEY``) and ``worker``.
    Responds 404 with a short text when the key is wrong or no job is open.
    The job is returned with ``JOB_DEFAULTS`` filled in underneath it.
    """
    supplied_key = request.args.get('key', '').strip()
    if supplied_key != APIKEY:
        denied = make_response('wrong value for GET parameter: key')
        denied.status_code = 404
        return denied
    worker = request.args.get('worker', '')
    worker_lastseen_update(worker)
    job = worker_get_next_job(worker)
    if job is not None:
        return jsonify({**JOB_DEFAULTS, **job})
    nothing = make_response('no new job')
    nothing.status_code = 404
    return nothing
|
|
|
|
|
|
@app.route('/job', methods=['POST'])
def job_post():
    """Accept a worker's result zip for the job it is currently due.

    Query parameters: ``key`` (must equal ``APIKEY``), ``worker``, ``jobId``
    and ``sha256`` (hex digest of the request body).  The body is stored as
    ``<jobId>/<worker>.zip`` under ``JOBS_PATH``.

    Raises:
        Exception: If the worker name is blank or contains a path separator.
        ValueError: If the worker has no open job, ``jobId`` is not the
            worker's next job, or the body does not match ``sha256``.
    """
    if APIKEY != request.args.get('key', '').strip():
        resp = make_response('wrong value for GET parameter: key')
        resp.status_code = 404
        return resp
    worker = request.args.get('worker', '')
    if worker.strip() == '':
        raise Exception('Unknown worker')
    if '/' in worker or '\\' in worker:
        # The name becomes a file name inside the job directory; a separator
        # would let the upload land outside of it.
        raise Exception('Unknown worker')
    worker_lastseen_update(worker)
    next_job = worker_get_next_job(worker)
    jobId = int(request.args.get('jobId', '0'))
    # next_job is None when the worker has nothing open; treat that like a
    # mismatching id rather than failing on the subscript.
    if next_job is None or next_job['jobId'] != jobId:
        raise ValueError('Wrong job')
    hashed = request.args.get('sha256', '')
    zfb = request.data
    if hashlib.sha256(zfb).hexdigest() != hashed:
        raise ValueError('Sent data was not received right')
    TempFile.save_bytes(JOBS_PATH.joinpath(f'{jobId:020d}/{worker}.zip'), zfb)
    return jsonify('OK')
|
|
|
|
|
|
@app.route('/job', methods=['HEAD', 'OPTIONS', 'GET'])
def job_get():
    """Send the jobs store file (``JOB_DB``) as the response."""
    response = send_file(JOB_DB)
    return response
|
|
|
|
|
|
def raw_job_submission_get() -> list[dict[str, Any]]:
    """Return all jobs, each annotated with per-worker submission paths.

    Every job gets a ``workers`` dict keyed by the worker names found in
    ``UPTIME_DB``.  The value is the path of that worker's zip for the job
    (as a string) when the file exists, otherwise ``None``.
    """
    jobs = json.loads(JOB_DB.read_text(encoding='utf-8'))
    known_workers = json.loads(UPTIME_DB.read_text(encoding='utf-8'))
    for job in jobs:
        submissions = {}
        for worker in known_workers:
            archive = JOBS_PATH.joinpath(f'{job["jobId"]:020d}/{worker}.zip')
            submissions[worker] = str(archive) if archive.exists() else None
        job['workers'] = submissions
    return jobs
|
|
|
|
|
|
@app.route('/job/submission', methods=['HEAD', 'OPTIONS', 'GET'])
def job_submission_get():
    """Return the job list with per-worker submission paths as JSON."""
    submissions = raw_job_submission_get()
    return jsonify(submissions)
|
|
|
|
|
|
@app.route('/analysis', methods=['HEAD', 'OPTIONS', 'GET'])
def analysis_get():
    """Send the analysis store file (``ANAL_DB``) as the response."""
    response = send_file(ANAL_DB)
    return response
|
|
|
|
|
|
@app.route('/analysis/next', methods=['HEAD', 'OPTIONS', 'GET'])
def analysis_next_get():
    """Hand the calling analyzer its next analysis record as JSON.

    Query parameters: ``key`` (must equal ``APIKEY``) and ``worker``.
    Responds 404 with a short text when the key is wrong or nothing is
    available.

    Raises:
        Exception: If the worker name is blank.
    """
    supplied_key = request.args.get('key', '').strip()
    if supplied_key != APIKEY:
        denied = make_response('wrong value for GET parameter: key')
        denied.status_code = 404
        return denied
    worker = request.args.get('worker', '')
    if not worker.strip():
        raise Exception('Unknown worker')
    analizer_lastseen_update(worker)
    record = analyzer_get_next_job(worker)
    if record is not None:
        return jsonify(record)
    nothing = make_response('no new job')
    nothing.status_code = 404
    return nothing
|
|
|
|
|
|
@app.route('/analysis', methods=['POST'])
def analysis_post():
    """Accept an analyzer's result zip and mark the analysis as finished.

    Query parameters: ``key`` (must equal ``APIKEY``), ``worker``, ``jobId``,
    ``completeness`` and ``sha256`` (hex digest of the request body).  The
    body must be a zip containing ``analysis.json``.  It is stored as
    ``<jobId>/analysis.zip`` under ``JOBS_PATH`` and the parsed JSON is
    copied into the matching record of ``ANAL_DB``.

    Raises:
        Exception: If the worker name is blank.
        ValueError: If the analyzer has no open analysis, the assignee,
            ``jobId`` or ``completeness`` do not match it, or the body does
            not match ``sha256``.
    """
    if APIKEY != request.args.get('key', '').strip():
        resp = make_response('wrong value for GET parameter: key')
        resp.status_code = 404
        return resp
    worker = request.args.get('worker', '')
    if worker.strip() == '':
        raise Exception('Unknown worker')
    analizer_lastseen_update(worker)
    next_anal = analyzer_get_next_job(worker)
    jobId = int(request.args.get('jobId', '0'))
    completeness = int(request.args.get('completeness', '0'))
    # None means nothing is open for this analyzer; report it like any other
    # mismatch instead of failing on the subscript.
    if next_anal is None:
        raise ValueError('Wrong job')
    if next_anal['assignee'] != worker:
        raise ValueError('Wrong job')
    if next_anal['jobId'] != jobId:
        raise ValueError('Wrong job')
    if next_anal['completeness'] != completeness:
        raise ValueError('Wrong job')
    hashed = request.args.get('sha256', '')
    zfb = request.data
    if hashlib.sha256(zfb).hexdigest() != hashed:
        raise ValueError('Sent data was not received right')
    # Parse the archive before anything is written, so a broken upload does
    # not leave an analysis.zip behind next to an unfinished record.
    with zipfile.ZipFile(BytesIO(zfb)) as zf:
        analysis = json.loads(
            zf.read('analysis.json').decode(encoding='utf-8'))
    analysisFile = JOBS_PATH.joinpath(f'{jobId:020d}/analysis.zip')
    TempFile.save_bytes(analysisFile, zfb)
    anals: list[dict[str, Any]] = json.loads(
        ANAL_DB.read_text(encoding='utf-8'))
    for anal in anals:
        if anal['jobId'] == next_anal['jobId']:
            anal['finished'] = True
            anal['analysisFile'] = str(analysisFile)
            anal['analysis'] = analysis
            break
    TempFile.save_utf8(ANAL_DB, json.dumps(anals, indent=2))
    return jsonify('OK')
|
|
|
|
|
|
@app.route('/uptime', methods=['HEAD', 'OPTIONS', 'GET'])
def uptime_get():
    """Send the worker last-seen store file (``UPTIME_DB``)."""
    response = send_file(UPTIME_DB)
    return response
|
|
|
|
|
|
@app.route('/uptime2', methods=['HEAD', 'OPTIONS', 'GET'])
def uptime2_get():
    """Send the analyzer last-seen store file (``UPTIME2_DB``)."""
    response = send_file(UPTIME2_DB)
    return response
|
|
|
|
|
|
@app.route('/cron', methods=['HEAD', 'OPTIONS', 'GET'])
def cron():
    """Send the cron store file (``CRON_DB``) as the response."""
    response = send_file(CRON_DB)
    return response
|
|
|
|
|
|
@app.route('/cron/form', methods=['HEAD', 'OPTIONS', 'GET'])
def cron_form_get():
    """Serve ``cronform.html``.

    A non-empty ``apikey`` query parameter that does not equal ``APIKEY``
    redirects back to the bare form URL; a missing or blank one is accepted.
    """
    supplied_key = request.args.get('apikey', '').strip()
    if supplied_key and supplied_key != APIKEY:
        return redirect('/cron/form')
    form_page = Path('cronform.html')
    return send_file(form_page)
|
|
|
|
|
|
@app.route('/cron/form', methods=['POST'])
def cron_form_post():
    """Add or delete a cron from the submitted form.

    Form fields: ``apikey`` (must equal ``APIKEY``, otherwise the request is
    redirected to the form) and ``action``.

    * ``action=add`` builds a cron from ``url``, ``hours``, ``historySize``,
      ``preRunJs``, ``wait``, ``scrolltoJs``, ``scrolltox``, ``scrolltoy``,
      ``checkReadyJs`` and ``waitJs`` on top of ``JOB_DEFAULTS``, with a new
      ``cronId``.  ``lastScheduledSec`` is set ``hours`` hours into the past.
    * ``action=delete`` removes the cron whose id equals ``cronId``.

    Both actions redirect back to the form with a status message.

    Raises:
        werkzeug.exceptions.BadRequest: If ``action`` is neither ``add`` nor
            ``delete`` (previously the view returned ``None``, which Flask
            reports as a server error).
    """
    if APIKEY != request.form.get('apikey', '').strip():
        return redirect('/cron/form')
    action = request.form['action']
    if action == 'add':
        cronId = next_id('cron')
        hours = float(request.form['hours'].strip())
        cron = {
            **JOB_DEFAULTS,
            **dict(
                cronId=cronId,
                url=request.form['url'].strip(),
                hours=hours,
                historySize=float(request.form['historySize'].strip()),
                lastScheduledSec=time.time() - 3600 * hours,
                preRunJs=request.form['preRunJs'].strip(),
                wait=float(request.form['wait'].strip()),
                scrolltoJs=request.form['scrolltoJs'].strip(),
                scrolltox=float(request.form['scrolltox'].strip()),
                scrolltoy=float(request.form['scrolltoy'].strip()),
                checkReadyJs=request.form['checkReadyJs'].strip(),
                waitJs=float(request.form['waitJs'].strip()),
            )
        }
        crons = json.loads(CRON_DB.read_text(encoding='utf-8'))
        crons.append(cron)
        TempFile.save_utf8(CRON_DB, json.dumps(crons, indent=2))
        return redirect('/cron/form?message=added%20successfully&apikey=' + request.form['apikey'])
    if action == 'delete':
        cronId = int(request.form['cronId'].strip())
        crons = json.loads(CRON_DB.read_text(encoding='utf-8'))
        crons = [c for c in crons if c['cronId'] != cronId]
        TempFile.save_utf8(CRON_DB, json.dumps(crons, indent=2))
        return redirect('/cron/form?message=deleted%20successfully&apikey=' + request.form['apikey'])
    raise werkzeug.exceptions.BadRequest()
|
|
|
|
|
|
@app.route('/jobs/<path:path>', methods=['HEAD', 'OPTIONS', 'GET'])
def jobs_static(path):
    """Serve the file at *path* from the ``jobs`` directory."""
    response = send_from_directory('jobs', path)
    return response
|
|
|
|
|
|
@app.route('/unzip/jobs', methods=['HEAD', 'OPTIONS', 'GET'])
def unzip_jobs():
    """Return the names of the entries in the ``jobs`` directory as JSON."""
    entry_names = [entry.name for entry in Path('jobs').iterdir()]
    return jsonify(entry_names)
|
|
|
|
|
|
@app.route('/unzip/jobs/<path:path>', methods=['HEAD', 'OPTIONS', 'GET'])
def unzip_jobs_path(path):
    """List a directory under ``jobs`` or the members of a zip stored there.

    For a directory the entry names are returned as JSON; for any other
    existing file it is opened as a zip and its member names are returned
    as JSON.

    Raises:
        werkzeug.exceptions.NotAcceptable: If *path* resolves outside the
            ``jobs`` directory.
        werkzeug.exceptions.NotFound: If the target does not exist.
    """
    jobs_root = Path('jobs').resolve()
    target_zip = Path('jobs').joinpath(path)
    # Compare whole path components: a plain string-prefix test would also
    # accept sibling directories such as "jobs2".
    if not target_zip.resolve().is_relative_to(jobs_root):
        raise werkzeug.exceptions.NotAcceptable()
    if not target_zip.exists():
        raise werkzeug.exceptions.NotFound()
    if target_zip.is_dir():
        return jsonify([entry.name for entry in target_zip.iterdir()])
    with zipfile.ZipFile(target_zip, mode='r') as zf:
        member_names = [info.filename for info in zf.infolist()]
    return send_file(
        BytesIO(json.dumps(member_names, indent=2).encode('utf-8')),
        last_modified=target_zip.stat().st_mtime,
        mimetype='application/json'
    )
|
|
|
|
|
|
@app.route('/unzip/jobs/<path:path>.zip/<path:zippath>', methods=['HEAD', 'OPTIONS', 'GET'])
def unzip_jobs_path_inner(path, zippath):
    """Serve one member (*zippath*) of the zip ``jobs/<path>.zip``.

    An empty or ``/`` member path falls back to listing the archive.  The
    response MIME type is guessed from the member name, defaulting to
    ``application/octet-stream`` when it cannot be guessed.

    Raises:
        werkzeug.exceptions.NotAcceptable: If the archive path resolves
            outside the ``jobs`` directory.
        werkzeug.exceptions.NotFound: If the archive or the member does not
            exist.
    """
    jobs_root = Path('jobs').resolve()
    target_zip = Path('jobs').joinpath(path + '.zip')
    # Compare whole path components: a plain string-prefix test would also
    # accept sibling directories such as "jobs2".
    if not target_zip.resolve().is_relative_to(jobs_root):
        raise werkzeug.exceptions.NotAcceptable()
    if not target_zip.exists():
        raise werkzeug.exceptions.NotFound()
    if zippath in ('', '/'):
        # List the archive itself; the route strips ".zip" from `path`.
        return unzip_jobs_path(path + '.zip')
    with zipfile.ZipFile(target_zip, mode='r') as zf:
        try:
            zd = zf.read(zippath)
        except KeyError:
            raise werkzeug.exceptions.NotFound()
    # send_file cannot infer a type from an in-memory stream, so an unknown
    # extension needs an explicit fallback.
    guessed_type = mimetypes.guess_type(zippath)[0]
    return send_file(
        BytesIO(zd),
        last_modified=target_zip.stat().st_mtime,
        mimetype=guessed_type or 'application/octet-stream'
    )
|