#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
"""Analysis worker: polls the job API for screenshot-comparison jobs,
downloads each worker's zipped screenshots, computes pairwise image
diffs and their RMSE, and uploads the results as a zip archive."""
from concurrent.futures import Future, ProcessPoolExecutor
import hashlib
import importlib
import json
import socket
import subprocess
import sys
import time
import traceback
import zipfile
from collections import defaultdict
from dataclasses import dataclass
from io import BytesIO, StringIO
from pathlib import Path
from typing import Iterable, Literal, TypeVar

import numpy
import pandas
import PIL.Image
import requests

# Deployment configuration is read from plain-text files next to the script.
BASEAPI = Path('baseapi.txt').read_text(encoding='utf-8').strip()
APIKEY = Path('apikey.txt').read_text(encoding='utf-8').strip()
UPDURL = Path('updurl.txt').read_text(encoding='utf-8').strip()
HOSTNAME = socket.gethostname()
if Path('hostname_override.txt').is_file():
    HOSTNAME = Path('hostname_override.txt').read_text(
        encoding='utf-8').strip()

_VT = TypeVar('_VT')


@dataclass
class BrowsingContext:
    platform: str
    hostname: str
    browser: str


@dataclass
class DisplayContext(BrowsingContext):
    screen: str


def flatten(lli: Iterable[Iterable[_VT]]) -> Iterable[_VT]:
    return (i for li in lli for i in li)


def get_git_asset_url(fl: str) -> str:
    return UPDURL.rsplit('/', 1)[0] + '/' + fl


def get_content_checking(url: str) -> bytes:
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.content


def get_git_asset(fl: str) -> bytes:
    return get_content_checking(get_git_asset_url(fl))


def zip_in_memory_extract_all(zf: zipfile.ZipFile) -> dict[str, bytes]:
    return {zil.filename: zf.read(zil) for zil in zf.infolist()}


def into_rgb(pi: PIL.Image.Image) -> PIL.Image.Image:
    # Flatten any alpha channel onto a white background.
    im = PIL.Image.new('RGB', pi.size, '#FFFFFF')
    im.paste(pi)
    return im


def into_rgb_max_size(pi: PIL.Image.Image,
                      other_size: tuple[int, int]) -> PIL.Image.Image:
    # Pad with white so both images of a pair end up on one canvas size.
    im = PIL.Image.new('RGB', (
        max(pi.size[0], other_size[0]),
        max(pi.size[1], other_size[1]),
    ), '#FFFFFF')
    im.paste(pi)
    return im


def dual_rgb_to_diff(rgb1: PIL.Image.Image,
                     rgb2: PIL.Image.Image) -> PIL.Image.Image:
    np_1 = numpy.asarray(into_rgb_max_size(
        rgb1, rgb2.size)).astype(numpy.float64)
    np_2 = numpy.asarray(into_rgb_max_size(
        rgb2, rgb1.size)).astype(numpy.float64)
    e = np_1 - np_2
    eabs = numpy.abs(e).astype(numpy.uint8)
    return PIL.Image.fromarray(eabs)


def diff_to_rmse(i: PIL.Image.Image) -> float:
    e = numpy.asarray(i).astype(numpy.float64)
    se = e * e
    mse = se.sum() / se.size
    rmse = mse ** .5
    return rmse


def img_to_png(i: PIL.Image.Image) -> bytes:
    bio = BytesIO()
    i.save(bio, format='PNG', optimize=True)
    return bio.getvalue()


def avg(li: list[int | float], on_empty: float | None = None) -> float:
    if len(li) <= 0:
        if on_empty is None:
            raise ValueError('List is empty')
        return on_empty
    return sum(li) / len(li)


def run_job(
    jobId: int,
    completeness: int,
    workers: dict[str, str | None],
):
    with ProcessPoolExecutor(16) as pe:
        # Download every worker's zip and decode each entry into an RGB image.
        name_images = {
            name: into_rgb(PIL.Image.open(BytesIO(bts)))
            for name, bts in [*flatten([
                zip_in_memory_extract_all(zipfile.ZipFile(
                    BytesIO(get_content_checking(
                        f'{BASEAPI}/{worker}')))).items()
                for worker in workers.values() if worker is not None])]}
        samples: dict[str, list[tuple[BrowsingContext,
                                      PIL.Image.Image]]] = defaultdict(list)
        for name, im in name_images.items():
            plat, host, browser, sizing = name.split('.', 3)
            sizing = sizing.rsplit('.', 1)[0]
            bc = BrowsingContext(plat, host, browser)
            samples[sizing].append((bc, im))
        del plat, host, browser, name, im, bc
        del name_images
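        # Naming convention assumed by the parsing above: each zip entry is
        # '<platform>.<hostname>.<browser>.<resolution>.<printScope>.<ext>'.
        # A hypothetical 'linux.host1.firefox.1920x1080.page.png' would group
        # under sizing key '1920x1080.page' with
        # BrowsingContext('linux', 'host1', 'firefox').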
        # Schedule one diff per unordered pair of screenshots sharing a
        # sizing key: n samples yield n*(n-1)/2 comparisons. (The original
        # submitted each diff twice and discarded one future; the stray
        # duplicate submit is removed here.)
        sample_pairs_futdiff: dict[str, list[tuple[
            BrowsingContext, BrowsingContext,
            Future[PIL.Image.Image]]]] = defaultdict(list)
        for sizing, data_points in samples.items():
            if len(data_points) < 2:
                # Keep the sizing present in the output, with no pairs.
                sample_pairs_futdiff[sizing] = list()
                continue
            for i, (bc1, im1) in enumerate(data_points[:-1]):
                for bc2, im2 in data_points[i + 1:]:
                    sample_pairs_futdiff[sizing].append(
                        (bc1, bc2, pe.submit(dual_rgb_to_diff, im1, im2)))
        del samples
        sample_pairs_diff: dict[str, list[tuple[
            BrowsingContext, BrowsingContext, PIL.Image.Image]]] = {
            k: [(bc1, bc2, fut.result()) for bc1, bc2, fut in vs]
            for k, vs in sample_pairs_futdiff.items()
        }
        del sample_pairs_futdiff
        sample_pairs_futrmse: dict[str, list[tuple[
            BrowsingContext, BrowsingContext, Future[float]]]] = {
            k: [(bc1, bc2, pe.submit(diff_to_rmse, dif))
                for bc1, bc2, dif in vs]
            for k, vs in sample_pairs_diff.items()
        }
        sample_pairs_futbytes: dict[str, list[tuple[
            BrowsingContext, BrowsingContext, Future[bytes]]]] = {
            k: [(bc1, bc2, pe.submit(img_to_png, dif))
                for bc1, bc2, dif in vs]
            for k, vs in sample_pairs_diff.items()
        }
        del sample_pairs_diff
        sample_pairs_rmse: dict[str, list[tuple[
            BrowsingContext, BrowsingContext, float]]] = {
            k: [(bc1, bc2, fut.result()) for bc1, bc2, fut in vs]
            for k, vs in sample_pairs_futrmse.items()
        }
        sample_pairs_bytes: dict[str, list[tuple[
            BrowsingContext, BrowsingContext, bytes]]] = {
            k: [(bc1, bc2, fut.result()) for bc1, bc2, fut in vs]
            for k, vs in sample_pairs_futbytes.items()
        }
        # Write every diff image into an in-memory zip archive.
        bzf = BytesIO()
        zf = zipfile.ZipFile(bzf, mode='w', compresslevel=9,
                             compression=zipfile.ZIP_DEFLATED)
        for fname, imcontents in flatten([
            [(
                '{}.{}.{}.{}.{}.{}.{}.{}.png'.format(
                    k.split('.')[0],
                    k.split('.')[1],
                    v1.hostname,
                    v2.hostname,
                    v1.platform,
                    v2.platform,
                    v1.browser,
                    v2.browser,
                ),
                v3,
            ) for v1, v2, v3 in vs]
            for k, vs in sample_pairs_bytes.items()
        ]):
            zf.writestr(fname, imcontents)
        del fname, imcontents
        df = pandas.DataFrame([
            *flatten([
                [(
                    k.split('.')[0],
                    k.split('.')[1],
                    v1.hostname,
                    v2.hostname,
                    v1.platform,
                    v2.platform,
                    v1.browser,
                    v2.browser,
                    v3,
                ) for v1, v2, v3 in vs]
                for k, vs in sample_pairs_rmse.items()])], columns=[
            'resolution',
            'printScope',
            'hostname1',
            'hostname2',
            'platform1',
            'platform2',
            'browser1',
            'browser2',
            'rmse',
        ]).sort_values(by=[
            'rmse',
            'resolution',
            'printScope',
            'hostname1',
            'hostname2',
            'platform1',
            'platform2',
            'browser1',
            'browser2',
        ])
        recordsio = StringIO()
        df.to_json(recordsio, orient='records')
        records = json.loads(recordsio.getvalue())
        # Bucket each pair's RMSE by the dimension(s) on which the two
        # browsing contexts differ.
        major_difference: dict[
            Literal['resolution', 'platform', 'browser', 'platformBrowser'],
            dict[str, list[float]]] = dict(
            resolution=defaultdict(list),
            platform=defaultdict(list),
            browser=defaultdict(list),
            platformBrowser=defaultdict(list),
        )
        for record in records:
            major_difference['resolution'][
                f"{record['resolution']}.{record['printScope']}"].append(
                record['rmse'])
            if record['platform1'] != record['platform2']:
                major_difference['platform'][record['platform1']].append(
                    record['rmse'])
                major_difference['platform'][record['platform2']].append(
                    record['rmse'])
            if record['browser1'] != record['browser2']:
                major_difference['browser'][record['browser1']].append(
                    record['rmse'])
                major_difference['browser'][record['browser2']].append(
                    record['rmse'])
            if (record['platform1'] != record['platform2']
                    or record['browser1'] != record['browser2']):
                major_difference['platformBrowser'][
                    f"{record['platform1']}.{record['browser1']}"].append(
                    record['rmse'])
                major_difference['platformBrowser'][
                    f"{record['platform2']}.{record['browser2']}"].append(
                    record['rmse'])
        major_difference_avg = {
            k: {k2: avg(v) for k2, v in d2.items()}
            for k, d2 in major_difference.items()}
        report = dict(
            indicators=major_difference_avg,
            records=records,
        )
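        # The report bundles the per-dimension RMSE averages ('indicators')
        # together with the raw per-pair rows ('records').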
        zf.writestr('analysis.json', json.dumps(
            report, indent=4).encode(encoding='utf-8'))
        zf.close()
        b = bzf.getvalue()
        m = hashlib.sha256()
        m.update(b)
        h = m.hexdigest()
        print(
            f'[DEBUG] About to upload {len(b)/(2**20):.2f} MB '
            f'for job {jobId}')
        requests.post(
            f'{BASEAPI}/analysis?key={APIKEY}&worker={HOSTNAME}'
            f'&jobId={jobId}&completeness={completeness}&sha256={h}',
            headers={'content-type': 'application/zip',
                     'content-length': str(len(b))},
            data=b).raise_for_status()
        print(f'[INFO] Uploaded analysis for job {jobId} successfully')


def initialize_and_run_job(
    jobId: int,
    completeness: int,
    workers: dict[str, str | None],
):
    run_job(jobId, completeness, workers)


def subprocess_run_job(
    jobId: int,
    completeness: int,
    workers: dict[str, str | None],
):
    # Run the job in a fresh interpreter so a crash cannot take down the
    # polling loop.
    return subprocess.run([
        sys.executable,
        sys.argv[0],
        str(jobId),
        str(completeness),
        json.dumps(workers),
    ],
        text=True,
        check=True,
        stdout=sys.stdout,
        stderr=sys.stderr,
        stdin=sys.stdin,
    )


def gather_next_job():
    try:
        resp = requests.get(
            f'{BASEAPI}/analysis/next?key={APIKEY}&worker={HOSTNAME}')
    except requests.exceptions.ConnectionError:
        time.sleep(10)
        return
    if resp.status_code == 404:
        print('[INFO] No new analysis')
        time.sleep(60)
    elif resp.status_code == 200:
        job = resp.json()
        print(f'[INFO] Running analysis {job["jobId"]}')
        subprocess_run_job(
            job['jobId'],
            job['completeness'],
            job['workers'],
        )
        time.sleep(2)
    else:
        try:
            resp.raise_for_status()
            raise ValueError(f'Unknown status code: {resp.status_code}')
        except Exception:
            print(traceback.format_exc())
        time.sleep(30)


def self_update():
    global self_update, gather_next_job
    pss = Path(__file__)
    try:
        ncnt = get_git_asset(pss.name)
        if ncnt != pss.read_bytes():
            # Do not overwrite a development checkout.
            if not Path('.git').exists():
                pss.write_bytes(ncnt)
    except Exception:
        print('[WARN] Could not update')
        raise
    # Re-import the (possibly rewritten) module and swap in its entry points
    # so the running loop picks up new code without restarting.
    importlib.invalidate_caches()
    selfmodule = importlib.import_module(pss.stem)
    importlib.reload(selfmodule)
    gather_next_job = selfmodule.gather_next_job
    self_update = selfmodule.self_update


def main():
    if len(sys.argv) == 1:
        # Daemon mode: self-update, then poll for work, forever.
        while True:
            try:
                self_update()
                gather_next_job()
            except Exception:
                print(traceback.format_exc())
                time.sleep(2)
    elif len(sys.argv) == 4:
        # Worker mode: run a single job passed on the command line.
        [jobId_str, completeness_str, workers_str] = sys.argv[1:]
        jobId = int(jobId_str)
        completeness = int(completeness_str)
        workers: dict[str, str | None] = json.loads(  # type: ignore
            workers_str)
        initialize_and_run_job(
            jobId,
            completeness,
            workers,
        )
    else:
        raise ValueError(f'Wrong number of arguments: {len(sys.argv)}')


if __name__ == '__main__':
    main()
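
# Usage sketch (the job id, completeness value, and zip paths below are
# made up for illustration):
#   python3 <this script>
#       daemon mode: self-update, then poll BASEAPI for analysis jobs
#   python3 <this script> 7 3 '{"workerA": "shots/a.zip", "workerB": null}'
#       worker mode: runs job 7 at completeness 3; every non-null value is
#       fetched from f'{BASEAPI}/{worker}' and must be a zip of screenshots.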