#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
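# site_css_reliability/client-snpcmp.py -- screenshot-comparison worker.
#
# What this client does (summarized from the code below; there is no
# external spec): poll the job API for a pending analysis, download the
# screenshot archives that other workers uploaded, diff every pair of
# screenshots of the same page at the same sizing, score each diff with an
# RMSE, and upload the diff images plus an analysis.json report as one zip.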
from collections import defaultdict
from concurrent.futures import Future, ProcessPoolExecutor
from dataclasses import dataclass
from io import BytesIO, StringIO
from pathlib import Path
from typing import Iterable, Literal, TypeVar
import hashlib
import importlib
import json
import socket
import subprocess
import sys
import time
import traceback
import zipfile

import numpy
import pandas
import PIL.Image
import requests
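
# Runtime configuration is read from small text files beside the script:
#   baseapi.txt  -- base URL of the job/analysis API
#   apikey.txt   -- API key appended to every request
#   updurl.txt   -- URL of this script's own published source (self-update)
#   hostname_override.txt (optional) -- overrides the reported worker name
# (These roles are inferred from how each value is used below.)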
BASEAPI = Path('baseapi.txt').read_text(encoding='utf-8').strip()
APIKEY = Path('apikey.txt').read_text(encoding='utf-8').strip()
UPDURL = Path('updurl.txt').read_text(encoding='utf-8').strip()
HOSTNAME = socket.gethostname()
if Path('hostname_override.txt').is_file():
    HOSTNAME = Path('hostname_override.txt').read_text(
        encoding='utf-8').strip()

_VT = TypeVar('_VT')


@dataclass
class BrowsingContext:
    """Where a screenshot came from: OS platform, machine, and browser."""
    platform: str
    hostname: str
    browser: str


@dataclass
class DisplayContext(BrowsingContext):
    """BrowsingContext plus the screen it was rendered at (unused here)."""
    screen: str


def flatten(lli: Iterable[Iterable[_VT]]) -> Iterable[_VT]:
    """Lazily flatten one level of nesting: [[1], [2, 3]] -> 1, 2, 3."""
    return (i for li in lli for i in li)


def get_git_asset_url(fl: str) -> str:
    """Resolve a file name against the directory part of UPDURL."""
    return UPDURL.rsplit('/', 1)[0] + '/' + fl


def get_content_checking(fl: str) -> bytes:
    """GET a URL and return its body, raising on an HTTP error status."""
    resp = requests.get(fl)
    resp.raise_for_status()
    return resp.content


def get_git_asset(fl: str) -> bytes:
    return get_content_checking(get_git_asset_url(fl))


def zip_in_memory_extract_all(zf: zipfile.ZipFile) -> dict[str, bytes]:
    """Read every member of an open zip archive into memory."""
    return {zil.filename: zf.read(zil) for zil in zf.infolist()}


def into_rgb(pi: PIL.Image.Image) -> PIL.Image.Image:
    """Paste an image onto a white RGB canvas of the same size."""
    im = PIL.Image.new('RGB', pi.size, '#FFFFFF')
    im.paste(pi)
    return im


def into_rgb_max_size(pi: PIL.Image.Image, other_size: tuple[int, int]) -> PIL.Image.Image:
    """Paste an image onto a white RGB canvas at least other_size large.

    Padding both operands of a comparison to their elementwise-max size
    lets screenshots of slightly different dimensions be diffed pixel by
    pixel.
    """
    im = PIL.Image.new('RGB', (
        max(pi.size[0], other_size[0]),
        max(pi.size[1], other_size[1])
    ), '#FFFFFF')
    im.paste(pi)
    return im


def dual_rgb_to_diff(rgb1: PIL.Image.Image, rgb2: PIL.Image.Image) -> PIL.Image.Image:
    """Return the per-channel absolute difference of two images."""
    np_1 = numpy.asarray(into_rgb_max_size(
        rgb1, rgb2.size)).astype(numpy.float64)
    np_2 = numpy.asarray(into_rgb_max_size(
        rgb2, rgb1.size)).astype(numpy.float64)
    e = np_1 - np_2
    # |a - b| of two uint8 channels fits back into uint8.
    eabs = numpy.abs(e).astype(numpy.uint8)
    return PIL.Image.fromarray(eabs)


def diff_to_rmse(i: PIL.Image.Image) -> float:
    """Root-mean-square error over a diff image; 0.0 means identical."""
    e = numpy.asarray(i).astype(numpy.float64)
    se = e * e
    mse = se.sum() / se.size
    rmse = mse ** .5
    return rmse


def img_to_png(i: PIL.Image.Image) -> bytes:
    """Encode an image as optimized PNG bytes."""
    bio = BytesIO()
    i.save(bio, format='PNG', optimize=True)
    return bio.getvalue()


def avg(li: list[int | float], on_empty: float | None = None) -> float:
    """Arithmetic mean; returns on_empty for an empty list if given."""
    if not li:
        if on_empty is None:
            raise ValueError('List is empty')
        return on_empty
    return sum(li) / len(li)
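

# run_job() is the heart of the worker. Pipeline, step by step:
#   1. download every referenced worker archive and decode its screenshots;
#   2. group the screenshots by sizing (the rest of the entry name after
#      platform.hostname.browser);
#   3. diff every unordered pair within each group in a process pool;
#   4. reduce each diff to an RMSE score and an optimized PNG;
#   5. pack the PNGs plus an analysis.json report into one zip and POST it
#      back to the API.
# (The step descriptions are read off the code; there is no external spec.)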
def run_job(
    jobId: int,
    completeness: int,
    workers: dict[str, str | None],
):
    with ProcessPoolExecutor(16) as pe:
        # Fetch each worker's screenshot archive and decode every entry
        # onto a white RGB canvas; workers mapped to None are skipped.
        name_images = {
            name: into_rgb(PIL.Image.open(BytesIO(bts)))
            for name, bts in
            [*flatten([
                zip_in_memory_extract_all(zipfile.ZipFile(
                    BytesIO(get_content_checking(f'{BASEAPI}/{worker}')))).items()
                for worker in workers.values() if worker is not None])]}
        # Entry names look like "platform.hostname.browser.<sizing>.<ext>";
        # the sizing (minus the extension) keys the comparison group.
        samples: dict[str, list[tuple[BrowsingContext,
                                      PIL.Image.Image]]] = defaultdict(list)
        for name, im in name_images.items():
            plat, host, browser, sizing = name.split('.', 3)
            sizing = sizing.rsplit('.', 1)[0]
            bc = BrowsingContext(plat, host, browser)
            samples[sizing].append((bc, im))
        del plat, host, browser, name, im, bc
        del name_images
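        # Every unordered pair within a sizing group is compared once, so
        # a group of n screenshots yields n*(n-1)/2 diff tasks.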
        sample_pairs_futdiff: dict[str,
                                   list[tuple[BrowsingContext,
                                              BrowsingContext,
                                              Future[PIL.Image.Image]]]] = defaultdict(list)
        for sizing, data_points in samples.items():
            if len(data_points) < 2:
                sample_pairs_futdiff[sizing] = list()
                continue
            for i, (bc1, im1) in enumerate(data_points[:-1]):
                for bc2, im2 in data_points[i+1:]:
                    sample_pairs_futdiff[sizing].append(
                        (bc1, bc2, pe.submit(dual_rgb_to_diff, im1, im2)))
        del samples
        sample_pairs_diff: dict[str,
                                list[tuple[BrowsingContext,
                                           BrowsingContext,
                                           PIL.Image.Image]]] = {
            k: [(bc1, bc2, fut.result()) for bc1, bc2, fut in vs]
            for k, vs in sample_pairs_futdiff.items()
        }
        del sample_pairs_futdiff
        # Score and encode all diffs in parallel, then collect the results.
        sample_pairs_futrmse: dict[str,
                                   list[tuple[BrowsingContext,
                                              BrowsingContext,
                                              Future[float]]]] = {
            k: [(bc1, bc2, pe.submit(diff_to_rmse, dif))
                for bc1, bc2, dif in vs]
            for k, vs in sample_pairs_diff.items()
        }
        sample_pairs_futbytes: dict[str,
                                    list[tuple[BrowsingContext,
                                               BrowsingContext,
                                               Future[bytes]]]] = {
            k: [(bc1, bc2, pe.submit(img_to_png, dif))
                for bc1, bc2, dif in vs]
            for k, vs in sample_pairs_diff.items()
        }
        del sample_pairs_diff
        sample_pairs_rmse: dict[str,
                                list[tuple[BrowsingContext,
                                           BrowsingContext,
                                           float]]] = {
            k: [(bc1, bc2, fut.result()) for bc1, bc2, fut in vs]
            for k, vs in sample_pairs_futrmse.items()
        }
        sample_pairs_bytes: dict[str,
                                 list[tuple[BrowsingContext,
                                            BrowsingContext,
                                            bytes]]] = {
            k: [(bc1, bc2, fut.result()) for bc1, bc2, fut in vs]
            for k, vs in sample_pairs_futbytes.items()
        }
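        # One diff PNG per pair goes into the result zip, named
        # <resolution>.<printScope>.<host1>.<host2>.<plat1>.<plat2>.<brow1>.<brow2>.png
        # (that the sizing key splits into resolution and printScope is an
        # assumption drawn from the k.split('.') usage below).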
        bzf = BytesIO()
        zf = zipfile.ZipFile(bzf, mode='w', compresslevel=9,
                             compression=zipfile.ZIP_DEFLATED)
        for fname, imcontents in flatten([
            [(
                '{}.{}.{}.{}.{}.{}.{}.{}.png'.format(
                    k.split('.')[0],
                    k.split('.')[1],
                    v1.hostname,
                    v2.hostname,
                    v1.platform,
                    v2.platform,
                    v1.browser,
                    v2.browser,
                ),
                v3,
            ) for v1, v2, v3 in vs]
            for k, vs in sample_pairs_bytes.items()
        ]):
            zf.writestr(fname, imcontents)
        del fname, imcontents
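        # Tabulate one row per compared pair, sorted so the most similar
        # pairs (lowest RMSE) come first.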
        df = pandas.DataFrame(
            [*flatten([
                [(
                    k.split('.')[0],
                    k.split('.')[1],
                    v1.hostname,
                    v2.hostname,
                    v1.platform,
                    v2.platform,
                    v1.browser,
                    v2.browser,
                    v3,
                ) for v1, v2, v3 in vs]
                for k, vs in sample_pairs_rmse.items()])],
            columns=[
                'resolution',
                'printScope',
                'hostname1',
                'hostname2',
                'platform1',
                'platform2',
                'browser1',
                'browser2',
                'rmse',
            ]).sort_values(by=[
                'rmse',
                'resolution',
                'printScope',
                'hostname1',
                'hostname2',
                'platform1',
                'platform2',
                'browser1',
                'browser2',
            ])
        recordsio = StringIO()
        df.to_json(recordsio, orient='records')
        records = json.loads(recordsio.getvalue())
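        # Aggregate RMSE lists into coarse difference indicators: per
        # resolution/printScope, per platform (cross-platform pairs only),
        # per browser (cross-browser pairs only), and per platform+browser
        # combination; each pair contributes its RMSE to both of its sides.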
        major_difference: dict[Literal['resolution', 'platform', 'browser',
                                       'platformBrowser'],
                               dict[str, list[float]]] = dict(
            resolution=defaultdict(list),
            platform=defaultdict(list),
            browser=defaultdict(list),
            platformBrowser=defaultdict(list),
        )
        for record in records:
            major_difference['resolution'][f"{record['resolution']}.{record['printScope']}"].append(
                record['rmse'])
            if record['platform1'] != record['platform2']:
                major_difference['platform'][record['platform1']].append(
                    record['rmse'])
                major_difference['platform'][record['platform2']].append(
                    record['rmse'])
            if record['browser1'] != record['browser2']:
                major_difference['browser'][record['browser1']].append(
                    record['rmse'])
                major_difference['browser'][record['browser2']].append(
                    record['rmse'])
            if record['platform1'] != record['platform2'] or record['browser1'] != record['browser2']:
                major_difference['platformBrowser'][f"{record['platform1']}.{record['browser1']}"].append(
                    record['rmse'])
                major_difference['platformBrowser'][f"{record['platform2']}.{record['browser2']}"].append(
                    record['rmse'])
        major_difference_avg = {
            k: {k2: avg(v) for k2, v in d2.items()} for k, d2 in major_difference.items()}
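        # Rough shape of the resulting analysis.json (field names from the
        # code; the values here are invented for illustration):
        #   {"indicators": {"browser": {"firefox": 3.21, ...}, ...},
        #    "records": [{"resolution": "1920x1080", "rmse": 0.0, ...}, ...]}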
        report = dict(
            indicators=major_difference_avg,
            records=records,
        )
        zf.writestr('analysis.json', json.dumps(
            report, indent=4).encode(encoding='utf-8'))
        zf.close()
        b = bzf.getvalue()
        # The archive's SHA-256 is sent along, presumably so the server
        # can verify the upload arrived intact.
        m = hashlib.sha256()
        m.update(b)
        h = m.hexdigest()
        print(
            f'[DEBUG] About to upload {len(b)/(2**20):.2f} MiB for job {jobId}')
        requests.post(
            f'{BASEAPI}/analysis?key={APIKEY}&worker={HOSTNAME}&jobId={jobId}&completeness={completeness}&sha256={h}',
            headers={'content-type': 'application/zip',
                     'content-length': str(len(b))},
            data=b).raise_for_status()
        print(f'[INFO] Uploaded analysis for job {jobId} successfully')
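

# Each job runs in a fresh subprocess: the long-lived watcher process stays
# small, and a crashing or leaking job cannot take the poll loop down with
# it. (The rationale is inferred; the code only shows the spawn-per-job
# structure.)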
def initialize_and_run_job(
    jobId: int,
    completeness: int,
    workers: dict[str, str | None],
):
    # Thin wrapper kept as the single-job entry point used by main().
    run_job(jobId, completeness, workers)


def subprocess_run_job(
    jobId: int,
    completeness: int,
    workers: dict[str, str | None],
):
    # Re-invoke this very script with the job parameters as argv;
    # main() dispatches on the argument count.
    return subprocess.run([
        sys.executable, sys.argv[0],
        str(jobId),
        str(completeness),
        json.dumps(workers),
    ],
        text=True, check=True,
        stdout=sys.stdout,
        stderr=sys.stderr,
        stdin=sys.stdin,
    )
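

# Polling protocol, as implemented against BASEAPI/analysis/next:
#   404            -> no job pending; back off for a minute.
#   200            -> JSON body with jobId, completeness, and a workers map.
#   anything else  -> log it and back off for 30 seconds.
# Connection errors back off for 10 seconds.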
def gather_next_job():
    try:
        resp = requests.get(
            f'{BASEAPI}/analysis/next?key={APIKEY}&worker={HOSTNAME}')
    except requests.exceptions.ConnectionError:
        time.sleep(10)
        return
    if resp.status_code == 404:
        print('[INFO] No new analysis')
        time.sleep(60)
    elif resp.status_code == 200:
        job = resp.json()
        print(f'[INFO] Running analysis {job["jobId"]}')
        subprocess_run_job(
            job['jobId'],
            job['completeness'],
            job['workers'],
        )
        time.sleep(2)
    else:
        try:
            resp.raise_for_status()
            raise ValueError(f'Unknown status code: {resp.status_code}')
        except Exception:
            print(traceback.format_exc())
        time.sleep(30)
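

# Self-update: before each poll, fetch the published copy of this script and
# overwrite the local file if it differs (skipped inside a git checkout so a
# development copy is never clobbered), then re-import the module and swap
# the main loop's entry points over to the fresh versions.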
def self_update():
    global self_update, gather_next_job
    pss = Path(__file__)
    try:
        ncnt = get_git_asset(pss.name)
        if ncnt != pss.read_bytes():
            if not Path('.git').exists():
                pss.write_bytes(ncnt)
    except Exception:
        print('[WARN] Could not update')
        raise
    importlib.invalidate_caches()
    selfmodule = importlib.import_module(pss.stem)
    importlib.reload(selfmodule)
    gather_next_job = selfmodule.gather_next_job
    self_update = selfmodule.self_update
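

# Entry point. Two modes, selected by argument count:
#   no arguments                          -> run forever as the polling watcher;
#   <jobId> <completeness> <workers-json> -> run one job (subprocess path).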
def main():
    if len(sys.argv) == 1:
        while True:
            try:
                self_update()
                gather_next_job()
            except Exception:
                print(traceback.format_exc())
                time.sleep(2)
    elif len(sys.argv) == 4:
        [jobId_str,
         completeness_str,
         workers_str] = sys.argv[1:]
        jobId = int(jobId_str)
        completeness = int(completeness_str)
        workers: dict[str, str | None] = json.loads(  # type: ignore
            workers_str)
        initialize_and_run_job(
            jobId,
            completeness,
            workers,
        )
    else:
        raise ValueError(f'Wrong number of arguments: {len(sys.argv)}')


if __name__ == '__main__':
    main()