site_css_reliability/client-snpshtr.py

349 lines
10 KiB
Python
Raw Permalink Normal View History

2023-09-03 23:26:56 +00:00
#!/usr/bin/env python3
# -*- encoding: utf-8 -*-
import hashlib
import importlib
from io import BytesIO
import json
2023-09-08 17:09:33 +00:00
import os
2023-09-03 23:26:56 +00:00
from pathlib import Path
import socket
2023-09-10 15:12:01 +00:00
import subprocess
2023-09-03 23:26:56 +00:00
import sys
import traceback
from typing import Union
2023-09-08 17:09:33 +00:00
import PIL.Image
2023-09-03 23:26:56 +00:00
import requests
from selenium import webdriver
import time
import zipfile
from selenium.common.exceptions import WebDriverException
2023-09-03 23:26:56 +00:00
2023-09-08 17:09:33 +00:00
# Reference window size used once per browser to measure "chrome" overhead:
# the difference between the requested window size and the actual viewport
# (screenshot) size is later added as compensation in run_job.
TEST_W, TEST_H = 800, 600

# Union of every webdriver type this client may drive.
WDTP = Union[webdriver.Firefox, webdriver.Chrome,
             webdriver.Edge, webdriver.Safari]
# Maps a browser-spec "type" string (from browsers.<platform>.json) to the
# matching selenium Options class.
# NOTE(review): the Safari entries fall back to the Chrome classes via
# getattr when the installed selenium build lacks Safari support — confirm
# that fallback is intentional rather than silently launching Chrome.
CLS_OPTIONS = dict(
    Firefox=webdriver.FirefoxOptions,
    Chrome=webdriver.ChromeOptions,
    Edge=webdriver.EdgeOptions,
    Safari=getattr(webdriver, 'SafariOptions', webdriver.ChromeOptions),
)
# Maps the same "type" string to the webdriver class itself.
CLS_WEBDRIVER = dict(
    Firefox=webdriver.Firefox,
    Chrome=webdriver.Chrome,
    Edge=webdriver.Edge,
    Safari=getattr(webdriver, 'Safari', webdriver.Chrome),
)
# Deployment configuration, read at import time from files in the CWD:
# job API base URL, API key, and the URL this script self-updates from.
BASEAPI = Path('baseapi.txt').read_text(encoding='utf-8').strip()
APIKEY = Path('apikey.txt').read_text(encoding='utf-8').strip()
UPDURL = Path('updurl.txt').read_text(encoding='utf-8').strip()

# Worker identity reported to the server with every request.
HOSTNAME = socket.gethostname()
PLATFORM = sys.platform
# PLATFORM = 'docker'
# Optional override so multiple workers on one host (or containers) can
# report distinct names.
if Path('hostname_override.txt').is_file():
    HOSTNAME = Path('hostname_override.txt').read_text(
        encoding='utf-8').strip()
2023-09-10 15:12:01 +00:00
def get_git_asset_url(fl: str) -> str:
    """Return the URL of asset *fl* located beside UPDURL's last path segment."""
    base = UPDURL.rsplit('/', 1)[0]
    return base + '/' + fl
def get_content_checking(fl: str, timeout: float = 30.0) -> bytes:
    """GET *fl* and return the response body, raising on failure.

    Parameters:
        fl: URL to fetch.
        timeout: per-request timeout in seconds. BUGFIX: the original call
            had no timeout, so a stalled server could hang the worker
            forever; 30s default keeps existing callers working.

    Raises:
        requests.HTTPError: on a non-2xx status (via raise_for_status).
        requests.ConnectionError / requests.Timeout: on network failure.
    """
    resp = requests.get(fl, timeout=timeout)
    resp.raise_for_status()
    return resp.content
def get_git_asset(fl: str) -> bytes:
    """Download asset *fl* from the update location and return its raw bytes."""
    asset_url = get_git_asset_url(fl)
    return get_content_checking(asset_url)
2023-09-03 23:26:56 +00:00
2023-09-08 15:20:13 +00:00
def run_hide_scrollbar(browser: WDTP):
    """Hide scrollbars in *browser* by injecting a <style> element.

    Covers both WebKit/Blink (``::-webkit-scrollbar``) and Gecko
    (``scrollbar-width`` / ``scrollbar-color``) so screenshots are not
    polluted by scrollbar pixels. Must be re-run after navigation, since
    the injected element does not survive a page load.
    """
    return browser.execute_script(
        '''
(function(){
    const newStyle = document.createElement('style');
    newStyle.innerHTML = '::-webkit-scrollbar {display: none !important;} ' +
        '* {scrollbar-width: none !important; scrollbar-color: transparent !important;}';
    document.head.appendChild(newStyle);
})();
'''.strip()
    )
2023-09-03 23:26:56 +00:00
def run_job(browsers: list[WDTP],
            resolutions_spec: list[tuple[str, tuple[int, int]]],
            jobId: int,
            hideScrollbar: bool,
            wait: float,
            scrolltoJs: str,
            scrolltox: int,
            scrolltoy: int,
            preRunJs: str,
            waitJs: float,
            checkReadyJs: str,
            url: str):
    """Screenshot *url* in every browser at every resolution, then upload.

    For each browser: measure window-chrome overhead, load the page, run
    the job's preparation JS, then for each named resolution take a
    full-page screenshot (when supported) and a viewport screenshot.
    All images are collected into one in-memory zip, which is POSTed to
    the job API together with its sha256.

    Parameters:
        browsers: already-initialized webdriver instances.
        resolutions_spec: (name, (width, height)) pairs to capture.
        jobId: server-side job identifier (echoed in the upload URL).
        hideScrollbar: inject scrollbar-hiding CSS before capturing.
        wait: seconds to sleep after each resize/scroll before capturing.
        scrolltoJs: custom scroll JS; when empty, window.scrollTo(x, y)
            with scrolltox/scrolltoy is used instead.
        preRunJs: JS executed once after page load.
        waitJs: seconds to sleep after preRunJs.
        checkReadyJs: optional JS polled until it returns <= 0; its return
            value is interpreted as "seconds still to wait".
        url: page to capture.

    Raises:
        requests.HTTPError: when the upload is rejected.
    """
    bio = BytesIO()
    zf = zipfile.ZipFile(
        bio, mode='w', compression=zipfile.ZIP_DEFLATED, compresslevel=9)
    for browser in browsers:
        # Measure window chrome: request TEST_WxTEST_H, then read the actual
        # screenshot size; the delta is added to every later set_window_size
        # so the *viewport* (not the outer window) matches the requested
        # resolution.
        browser.get('about:blank')
        browser.set_window_size(TEST_W, TEST_H)
        actual_w, actual_h = PIL.Image.open(
            BytesIO(browser.get_screenshot_as_png())).size
        compensation_w, compensation_h = TEST_W-actual_w, TEST_H-actual_h
        browser.get(url)
        if hideScrollbar:
            run_hide_scrollbar(browser)
        browser.execute_script(preRunJs)
        time.sleep(waitJs)
        # Page-defined readiness probe: keep sleeping for as long as the
        # probe says (its return value is the remaining wait in seconds).
        if checkReadyJs:
            while (waitReady := browser.execute_script(checkReadyJs)) > 0:
                time.sleep(waitReady)
        for resolution_name, (resw, resh) in resolutions_spec:
            browser.set_window_size(resw+compensation_w, resh+compensation_h)
            if scrolltoJs:
                browser.execute_script(scrolltoJs)
            else:
                browser.execute_script(
                    f'window.scrollTo({scrolltox}, {scrolltoy})')
            # Re-inject after resize/scroll in case styles were reset.
            if hideScrollbar:
                run_hide_scrollbar(browser)
            time.sleep(wait)
            # Full-page capture is only available on some drivers (Firefox);
            # keep it only when its width matches the requested resolution,
            # and treat any driver error as best-effort.
            if hasattr(browser, 'get_full_page_screenshot_as_png'):
                try:
                    scrsht = browser.get_full_page_screenshot_as_png()
                    im = PIL.Image.open(BytesIO(scrsht))
                    if im.size[0] == resw:
                        zf.writestr(
                            f'{PLATFORM}.{HOSTNAME}.{browser.name}.{resolution_name}.full.png',
                            scrsht
                        )
                except WebDriverException as e:
                    print(f'[WARN] Ignoring full screenshot: {e}')
            # Viewport capture; only kept when the viewport actually reached
            # the requested size (compensation may fail on some platforms).
            scrsht = browser.get_screenshot_as_png()
            im = PIL.Image.open(BytesIO(scrsht))
            if im.size == (resw, resh):
                zf.writestr(
                    f'{PLATFORM}.{HOSTNAME}.{browser.name}.{resolution_name}.partial.png',
                    scrsht
                )
            del im
            del scrsht
        browser.get('about:blank')
    zf.close()
    b = bio.getvalue()
    # sha256 of the zip is sent in the query string so the server can
    # verify the upload arrived intact.
    m = hashlib.sha256()
    m.update(b)
    h = m.hexdigest()
    resp = requests.post(
        f'{BASEAPI}/job?key={APIKEY}&worker={HOSTNAME}&jobId={jobId}&sha256={h}',
        headers={'content-type': 'application/zip',
                 'content-length': str(len(b))},
        data=b)
    if resp.status_code != 200:
        print(
            f'[FATAL] Could not upload, got {resp.status_code}:\n{resp.text}')
    resp.raise_for_status()
    # Path(f'{PLATFORM}.{jobId:012d}.zip').write_bytes(bio.getvalue())
    print(f'[INFO] Uploaded results for job {jobId} successfully')
    # Drop the (potentially large) zip buffers promptly.
    del zf
    del bio
    del b
2023-09-10 15:12:01 +00:00
def initialize_and_run_job(
        jobId: int,
        hideScrollbar: bool,
        wait: float,
        scrolltoJs: str,
        scrolltox: int,
        scrolltoy: int,
        preRunJs: str,
        waitJs: float,
        checkReadyJs: str,
        url: str,
):
    """Start every configured browser, run one job via run_job, then shut down.

    Configuration is read from ``browsers.<platform>.json`` and
    ``resolutions.json`` in the CWD. A browser that fails to start is
    logged and skipped; the job still runs with the ones that did start.
    Parameters mirror run_job (see there for semantics).
    """
    browsers_spec = json.loads(Path(f'browsers.{sys.platform}.json').read_text(
        encoding='utf-8'))['browsers']
    # resolutions.json entries are [name, "WxH"] pairs; parse to (name, (w, h)).
    resolutions_spec = [
        (str(n), (int(v.split('x')[0]), int(v.split('x')[1])))
        for n, v in json.loads(Path('resolutions.json').read_text(
            encoding='utf-8'))['resolutions']]
    browsers: list[WDTP] = list()
    try:
        for browser_spec in browsers_spec:
            opt = CLS_OPTIONS[browser_spec['type']]()
            if os.environ.get('SKIP_ARGS', '') == '':
                for arg in browser_spec['arguments']:
                    opt.add_argument(arg)
            else:
                # Debug escape hatch: skip configured args, show the browser.
                opt.headless = False  # type: ignore
            try:
                browser = CLS_WEBDRIVER[browser_spec['type']](opt)
                browsers.append(browser)
            except Exception:
                # Best-effort: a missing/broken driver shouldn't kill the job.
                print(
                    f'[ERROR] Could not initialize browser {browser_spec["type"]}')
                print(traceback.format_exc())
        for browser in browsers:
            browser.get('about:blank')
        run_job(
            browsers,
            resolutions_spec,
            jobId,
            hideScrollbar,
            wait,
            scrolltoJs,
            scrolltox,
            scrolltoy,
            preRunJs,
            waitJs,
            checkReadyJs,
            url,
        )
    finally:
        for browser in browsers:
            # BUGFIX: quit() instead of close() — close() only closes the
            # current window and leaks the driver process/session. Guard each
            # quit so one failure doesn't skip cleanup of the remaining
            # browsers.
            try:
                browser.quit()
            except Exception:
                print(traceback.format_exc())
def self_update():
    """Fetch the latest copy of this script and hot-reload it in place.

    Downloads the script from the update URL; when the content differs
    (and this is not a developer checkout — no ``.git`` directory), the
    file on disk is overwritten. The module is then re-imported and the
    module-level ``self_update`` / ``gather_next_job`` names are rebound
    to the reloaded versions, so the main loop picks up new code without
    restarting the process.

    Raises:
        Exception: any download failure is logged ('[WARN] Could not
            update') and re-raised to the caller.
    """
    global self_update, gather_next_job
    pss = Path(__file__)
    try:
        ncnt = get_git_asset(pss.name)
        if ncnt != pss.read_bytes():
            # Never clobber a developer checkout.
            if not Path('.git').exists():
                pss.write_bytes(ncnt)
    except Exception:
        print('[WARN] Could not update')
        raise
    # Force a fresh import of the (possibly rewritten) file and swap the
    # running entry points over to the new module's implementations.
    importlib.invalidate_caches()
    selfmodule = importlib.import_module(pss.stem)
    importlib.reload(selfmodule)
    gather_next_job = selfmodule.gather_next_job
    self_update = selfmodule.self_update
def subprocess_run_job(
        jobId: int,
        hideScrollbar: bool,
        wait: float,
        scrolltoJs: str,
        scrolltox: int,
        scrolltoy: int,
        preRunJs: str,
        waitJs: float,
        checkReadyJs: str,
        url: str
):
    """Run one job in a child interpreter, isolating browser crashes.

    Re-invokes this very script with the job parameters as positional
    argv strings; the child takes the ``len(sys.argv) == 11`` branch in
    main(). The child inherits this process's stdio. Raises
    subprocess.CalledProcessError when the child exits non-zero.
    """
    child_argv = [
        sys.executable, sys.argv[0],
        str(jobId),
        str(int(hideScrollbar)),
        str(wait),
        str(scrolltoJs),
        str(scrolltox),
        str(scrolltoy),
        str(preRunJs),
        str(waitJs),
        str(checkReadyJs),
        str(url),
    ]
    return subprocess.run(
        child_argv,
        text=True, check=True,
        stdout=sys.stdout, stderr=sys.stderr, stdin=sys.stdin,
    )
def gather_next_job():
    """Poll the job API once and run the job it returns, if any.

    Status handling: 404 -> no work, sleep 60s; 200 -> run the returned
    job in a subprocess; anything else -> log a traceback and back off
    30s. Network errors (connection failure or timeout) sleep 10s and
    return without raising.
    """
    try:
        # BUGFIX: added a timeout — without one a stalled server could hang
        # the worker forever; a timeout is treated like a connection error
        # (short backoff, then the main loop retries).
        resp = requests.get(
            f'{BASEAPI}/job/next?key={APIKEY}&worker={HOSTNAME}',
            timeout=60)
    except (requests.exceptions.ConnectionError,
            requests.exceptions.Timeout):
        time.sleep(10)
        return
    if resp.status_code == 404:
        print('[INFO] No new job')
        time.sleep(60)
    elif resp.status_code == 200:
        job = resp.json()
        print(f'[INFO] Running job {job["jobId"]}')
        subprocess_run_job(
            job['jobId'],
            bool(job['hideScrollbar']),
            job['wait'],
            job['scrolltoJs'],
            job['scrolltox'],
            job['scrolltoy'],
            job['preRunJs'],
            job['waitJs'],
            job['checkReadyJs'],
            job['url'],
        )
    else:
        # Unexpected status: surface it as a logged traceback, then back off.
        try:
            resp.raise_for_status()
            raise ValueError(f'Unknown status code: {resp.status_code}')
        except Exception:
            print(traceback.format_exc())
        time.sleep(30)
2023-09-03 23:26:56 +00:00
def main():
    """Entry point with two modes, selected by argv length.

    No arguments: run the worker loop forever — self-update, then poll
    for a job; any exception is logged and the loop continues after 2s.
    Ten arguments: parse them and execute a single job (this is the
    child-process mode spawned by subprocess_run_job).
    """
    argc = len(sys.argv)
    if argc == 1:
        while True:
            try:
                self_update()
                gather_next_job()
            except Exception:
                print(traceback.format_exc())
            time.sleep(2)
    elif argc == 11:
        (jobId_str, hideScrollbar_str, wait_str, scrolltoJs_str,
         scrolltox_str, scrolltoy_str, preRunJs_str, waitJs_str,
         checkReadyJs_str, url_str) = sys.argv[1:]
        # Convert each argv string back to the type run_job expects; the
        # scroll coordinates go through float() first so values like "0.0"
        # round-trip from the parent process.
        initialize_and_run_job(
            int(jobId_str),
            bool(int(hideScrollbar_str)),
            float(wait_str),
            str(scrolltoJs_str),
            int(float(scrolltox_str)),
            int(float(scrolltoy_str)),
            str(preRunJs_str),
            float(waitJs_str),
            str(checkReadyJs_str),
            str(url_str),
        )
    else:
        raise ValueError(f'Wrong number of arguments: {len(sys.argv)}')


if __name__ == '__main__':
    main()