#!/usr/bin/env python3 import argparse import logging import requests import json import sys import time import urllib LOGGER = logging.getLogger(__name__) def xcode_valid(resp) -> bool: ''' Determine if a transcoder state is valid. ''' if resp.status_code != 200: LOGGER.debug('bad status %s', resp.status_code) return False ctype = resp.headers['content-type'] if ctype != 'application/json': LOGGER.debug('bad content-type %s', ctype) return False body = resp.json() if body['parsed_as'] == 'pending': LOGGER.debug('item is still pending') return False return True def main(): ''' CLI entrypoint ''' parser = argparse.ArgumentParser( description='Encode and send execution ARI to AMP Agents.', epilog='At least one --agent must be given to actually send commands.', ) parser.add_argument('--log-level', choices=('debug', 'info', 'warning', 'error'), default='info', help='The minimum log severity.') parser.add_argument('exec', nargs='+', metavar='ARI', help='Text form ARIs to send as commands') parser.add_argument('--mgrapi', help='Manager REST API base URI with or without a trailing slash.') parser.add_argument('--agent', default=[], action='append', metavar='EID', help='Agent EID to send commands. This can be given multiple times.') parser.add_argument('--wait-time', default=5, type=float, help='The duration of time to wait between polling requests. Used for transcoder and reports.') parser.add_argument('--wait-report', default=0, type=int, help='The number of reports to wait for from each Agent, which can be zero to not wait.') args = parser.parse_args() logging.basicConfig(level=args.log_level.upper(), stream=sys.stderr) LOGGER.debug('args %s', args) def resolved(rel): return urllib.parse.urljoin(args.mgrapi, rel) sess = requests.Session() resp = sess.get(resolved('nm/version')) if resp.status_code != 200: LOGGER.error('Failed to access ANMS API at %s', args.mgrapi) raise RuntimeError('Failed to access ANMS API') else: LOGGER.debug('Manager reachable at %s', args.mgrapi) failures = 0 # clear API reports before any operations for agent_eid in args.agent: eid_uri = urllib.parse.quote(agent_eid, safe='') resp = sess.put( resolved(f'nm/agents/eid/{eid_uri}/clear_reports'), ) if resp.status_code != 200: LOGGER.error('Failed to clear_reports for %s', agent_eid) failures += 1 resp = sess.put( resolved(f'nm/agents/eid/{eid_uri}/clear_tables'), ) if resp.status_code != 200: LOGGER.error('Failed to clear_tables for %s', agent_eid) failures += 1 if failures: return failures # Convert ARI forms and IDs to_send = [] for ari_text in args.exec: LOGGER.info('Converting input %s', ari_text) resp = sess.put( resolved('transcoder/ui/incoming/str'), params={ 'ari': ari_text, } ) if resp.status_code != 200: LOGGER.error('Failed to access manager API at %s', args.mgrapi) failures += 1 continue xcode_id = resp.json()['id'] LOGGER.info('Waiting on transcoder ID %s', xcode_id) xcode_uri = resolved(f'transcoder/db/id/{xcode_id}') while True: resp = sess.get( xcode_uri, headers={ 'accept': 'application/json', }, ) if xcode_valid(resp): break LOGGER.info('Waiting for another %s seconds', args.wait_time) time.sleep(args.wait_time) body = resp.json() ari_cborhex = body['cbor'] if not ari_cborhex: LOGGER.error('Failed to process input %s with error: %s', ari_text, resp.text) failures += 1 continue LOGGER.info('Got binary form: %s', ari_cborhex) to_send.append(ari_cborhex) if failures: return failures # Send the encoded ARIs for ari_cborhex in to_send: for agent_eid in args.agent: eid_uri = urllib.parse.quote(agent_eid, safe='') resp = sess.put( resolved(f'nm/agents/eid/{eid_uri}/hex'), headers={ 'accept': 'application/json', }, json={ 'data': ari_cborhex, }, ) if resp.status_code != 200: LOGGER.error('Failed to send to %s with error: %s', agent_eid, resp.text) failures += 1 continue LOGGER.info('Sent to agent %s', agent_eid) if failures: return failures # Wait for results for agent_eid in args.agent: eid_uri = urllib.parse.quote(agent_eid, safe='') LOGGER.info('Waiting for %d reports from %s', args.wait_report, agent_eid) reports = [] while len(reports) < args.wait_report: resp = sess.get( resolved(f'nm/agents/eid/{eid_uri}/reports/json'), headers={ 'accept': 'application/json', }, ) if resp.status_code != 200: LOGGER.error('Failed to get reports for %s', agent_eid) else: body = resp.json() # require non-empty list to overwrite if 'reports' in body and body['reports']: reports = body['reports'] LOGGER.info('Waiting for another %s seconds', args.wait_time) time.sleep(args.wait_time) LOGGER.info('got %s', reports) return failures if __name__ == '__main__': sys.exit(main())