-
Notifications
You must be signed in to change notification settings - Fork 7
Expand file tree
/
Copy pathdata_validation.py
More file actions
71 lines (55 loc) · 2.23 KB
/
data_validation.py
File metadata and controls
71 lines (55 loc) · 2.23 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
import time
from prefect import flow, get_run_logger, task
from prefect.blocks.system import Secret
from bluesky_tiled_plugins.writing.validator import validate
from tiled.client import from_uri
@task(retries=2, retry_delay_seconds=10)
def get_run(uid, api_key=None):
    """Look up a run in the ``cms/raw`` catalog of the Tiled server.

    Parameters
    ----------
    uid : str
        Key of the run inside the ``cms/raw`` catalog.
    api_key : str, optional
        Forwarded unchanged to ``from_uri`` (default is None).

    Returns
    -------
    The catalog entry stored under ``uid``.
    """
    raw_catalog = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)["cms/raw"]
    return raw_catalog[uid]
@task(retries=2, retry_delay_seconds=10)
def get_run_migration(uid, api_key=None):
    """Look up a run in the ``cms/migration`` catalog of the Tiled server.

    Parameters
    ----------
    uid : str
        Key of the run inside the ``cms/migration`` catalog.
    api_key : str, optional
        Forwarded unchanged to ``from_uri`` (default is None).

    Returns
    -------
    The catalog entry stored under ``uid``.
    """
    migration_catalog = from_uri("https://tiled.nsls2.bnl.gov", api_key=api_key)[
        "cms/migration"
    ]
    return migration_catalog[uid]
@task
def read_stream(run, stream):
    """Read a single stream of a run.

    Parameters
    ----------
    run
        Container that is indexed by stream name.
    stream
        Key of the stream to read from ``run``.

    Returns
    -------
    The result of calling ``read()`` on ``run[stream]``.
    """
    return run[stream].read()
@task(retries=2, retry_delay_seconds=10)
def read_all_streams(uid, api_key=None):
    """Read every stream of a raw run, logging per-stream and total timings.

    The run is obtained through ``get_run``; each stream is then read with
    ``read_stream`` and its elapsed time and ``nbytes`` are logged.

    Parameters
    ----------
    uid : str
        The UID of the run whose streams are read.
    api_key : str, optional
        Forwarded unchanged to ``get_run`` (default is None).
    """
    log = get_run_logger()
    run = get_run(uid, api_key=api_key)
    log.info(f"Validating uid {run.start['uid']}")

    start_time = time.monotonic()
    for name in run:
        log.info(f"{name}:")
        began = time.monotonic()
        data = read_stream(run, name)
        took = time.monotonic() - began
        log.info(f"{name} elapsed_time = {took}")
        log.info(f"{name} nbytes = {data.nbytes:_}")

    # Keep this local named ``elapsed_time``: the ``=`` f-string specifier
    # writes the variable name itself into the log message.
    elapsed_time = time.monotonic() - start_time
    log.info(f"{elapsed_time = }")
@task(retries=3, retry_delay_seconds=20)
def data_validation_task(uid, api_key=None):
    """Validate the data structure and accessibility of a run in Tiled.

    The run is fetched with ``get_run_migration`` and passed to ``validate``
    with ``fix_errors``, ``try_reading`` and ``raise_on_error`` all enabled.
    Exceptions raised by ``validate`` are not caught here.

    Parameters
    ----------
    uid : str
        The UID of the run to validate.
    api_key : str, optional
        Forwarded unchanged to ``get_run_migration`` (default is None).
    """
    logger = get_run_logger()

    logger.info("Connecting to Tiled client for beamline cms")
    run = get_run_migration(uid, api_key=api_key)

    logger.info(f"Validating uid {uid}")
    start_time = time.monotonic()
    validate(run, fix_errors=True, try_reading=True, raise_on_error=True)
    # Keep this local named ``elapsed_time``: the ``=`` f-string specifier
    # writes the variable name itself into the log message.
    elapsed_time = time.monotonic() - start_time
    logger.info(f"Finished validating data; {elapsed_time = }")
@flow(log_prints=True)
def data_validation_flow(uid, api_key=None):
    """Flow entry point that runs ``data_validation_task`` for one run.

    Parameters
    ----------
    uid : str
        The UID of the run to validate.
    api_key : str, optional
        Forwarded unchanged to ``data_validation_task`` (default is None).
    """
    data_validation_task(uid=uid, api_key=api_key)