import json
import logging
import os
import time

from redis.exceptions import RedisError
from kombu.exceptions import KombuError
from celery import Task, states
from celery.result import AsyncResult

from collections import defaultdict

from lxml import etree
import jsonschema

from inventory_provider.tasks.app import app
from inventory_provider.tasks.common \
    import get_next_redis, get_current_redis, \
    latch_db, get_latch, set_latch, update_latch_status
from inventory_provider.tasks import data, monitor
from inventory_provider import config
from inventory_provider import environment
from inventory_provider.db import db, opsdb
from inventory_provider import snmp
from inventory_provider import juniper

FINALIZER_POLLING_FREQUENCY_S = 2.5
FINALIZER_TIMEOUT_S = 300

# TODO: error callback (cf. http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks)  # noqa: E501

environment.setup_logging()

logger = logging.getLogger(__name__)


def log_task_entry_and_exit(f):
    # cf. https://stackoverflow.com/a/47663642
    def _w(*args, **kwargs):
        logger.debug(f'>>> {f.__name__}{args}')
        try:
            return f(*args, **kwargs)
        finally:
            logger.debug(f'<<< {f.__name__}{args}')
    return _w


class InventoryTaskError(Exception):
    pass


class InventoryTask(Task):

    config = None

    def __init__(self):
        self.args = []

        if InventoryTask.config:
            return

        assert os.path.isfile(
            os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']), (
            'config file %r not found' %
            os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'])

        with open(os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME']) as f:
            logging.info(
                "Initializing worker with config from: %r" %
                os.environ['INVENTORY_PROVIDER_CONFIG_FILENAME'])
            InventoryTask.config = config.load(f)
        logging.debug("loaded config: %r" % InventoryTask.config)

    def log_info(self, message):
        logger.debug(message)
        self.send_event('task-info', message=message)

    def log_warning(self, message):
        logger.warning(message)
        self.send_event('task-warning', message=message)

    def log_error(self, message):
        logger.error(message)
        self.send_event('task-error', message=message)


@app.task(base=InventoryTask, bind=True, name='snmp_refresh_peerings')
@log_task_entry_and_exit
def snmp_refresh_peerings(self, hostname, community, logical_systems):
    try:
        peerings = list(
            snmp.get_peer_state_info(hostname, community, logical_systems))
    except ConnectionError:
        msg = f'error loading snmp peering data from {hostname}'
        logger.exception(msg)
        self.log_warning(msg)
        r = get_current_redis(InventoryTask.config)
        peerings = r.get(f'snmp-peerings:hosts:{hostname}')
        if peerings is None:
            raise InventoryTaskError(
                f'snmp error with {hostname}'
                f' and no cached peering data found')
        # unnecessary json encode/decode here ... could be optimized
        peerings = json.loads(peerings.decode('utf-8'))
        self.log_warning(f'using cached snmp peering data for {hostname}')

    r = get_next_redis(InventoryTask.config)
    r.set(f'snmp-peerings:hosts:{hostname}', json.dumps(peerings))
    self.log_info(f'snmp peering info loaded from {hostname}')
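
# The task above illustrates the fetch-with-fallback pattern that the other
# snmp/netconf refresh tasks in this module repeat: try a live query, fall
# back to the value cached in the *current* redis, and write the result to
# the *next* redis.  A minimal sketch of the shared shape -- illustrative
# only: `live_fetch` and `cache_key` are hypothetical parameters, and the
# tasks in this module inline this logic rather than calling this helper:
def _fetch_with_fallback(task, hostname, live_fetch, cache_key):
    try:
        return list(live_fetch(hostname))
    except ConnectionError:
        msg = f'error loading data from {hostname}'
        logger.exception(msg)
        task.log_warning(msg)
        cached = get_current_redis(InventoryTask.config).get(cache_key)
        if cached is None:
            raise InventoryTaskError(
                f'error with {hostname} and no cached data found')
        task.log_warning(f'using cached data for {hostname}')
        return json.loads(cached.decode('utf-8'))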
@app.task(base=InventoryTask, bind=True, name='snmp_refresh_interfaces')
@log_task_entry_and_exit
def snmp_refresh_interfaces(self, hostname, community, logical_systems):
    try:
        interfaces = list(snmp.get_router_snmp_indexes(
            hostname, community, logical_systems))
    except ConnectionError:
        msg = f'error loading snmp interface data from {hostname}'
        logger.exception(msg)
        self.log_warning(msg)
        r = get_current_redis(InventoryTask.config)
        interfaces = r.get(f'snmp-interfaces:{hostname}')
        if not interfaces:
            raise InventoryTaskError(
                f'snmp error with {hostname}'
                f' and no cached snmp interface data found')
        # unnecessary json encode/decode here ... could be optimized
        interfaces = json.loads(interfaces.decode('utf-8'))
        self.log_warning(f'using cached snmp interface data for {hostname}')

    r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    rp.set(f'snmp-interfaces:{hostname}', json.dumps(interfaces))

    # optimization for DBOARD3-372
    # interfaces is a list of dicts like: {'name': str, 'index': int}
    for ifc in interfaces:
        ifc['hostname'] = hostname
        rp.set(
            f'snmp-interfaces-single:{hostname}:{ifc["name"]}',
            json.dumps(ifc))

    rp.execute()
    self.log_info(f'snmp interface info loaded from {hostname}')


@app.task(base=InventoryTask, bind=True, name='netconf_refresh_config')
@log_task_entry_and_exit
def netconf_refresh_config(self, hostname):
    try:
        netconf_doc = juniper.load_config(
            hostname, InventoryTask.config["ssh"])
        netconf_str = etree.tostring(netconf_doc, encoding='unicode')
    except (ConnectionError, juniper.NetconfHandlingError):
        msg = f'error loading netconf data from {hostname}'
        logger.exception(msg)
        self.log_warning(msg)
        r = get_current_redis(InventoryTask.config)
        netconf_str = r.get(f'netconf:{hostname}')
        if not netconf_str:
            raise InventoryTaskError(
                f'netconf error with {hostname}'
                f' and no cached netconf data found')
        self.log_warning(f'using cached netconf data for {hostname}')

    r = get_next_redis(InventoryTask.config)
    r.set(f'netconf:{hostname}', netconf_str)
    self.log_info(f'netconf info loaded from {hostname}')


@app.task(base=InventoryTask, bind=True, name='update_interfaces_to_services')
@log_task_entry_and_exit
def update_interfaces_to_services(self):
    interface_services = defaultdict(list)
    with db.connection(InventoryTask.config["ops-db"]) as cx:
        for service in opsdb.get_circuits(cx):
            equipment_interface \
                = f'{service["equipment"]}:{service["interface_name"]}'
            interface_services[equipment_interface].append(service)

    r = get_next_redis(InventoryTask.config)
    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for key in r.scan_iter('opsdb:interface_services:*', count=1000):
        rp.delete(key)
    rp.execute()

    rp = r.pipeline()
    for equipment_interface, services in interface_services.items():
        rp.set(
            f'opsdb:interface_services:{equipment_interface}',
            json.dumps(services))
    rp.execute()


def _unmanaged_interfaces():

    def _convert(d):
        # the config file keys are more readable than
        # the keys used in redis
        return {
            'name': d['address'],
            'interface address': d['network'],
            'interface name': d['interface'].lower(),
            'router': d['router'].lower()
        }

    yield from map(
        _convert,
        InventoryTask.config.get('unmanaged-interfaces', []))
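
# _unmanaged_interfaces() assumes 'unmanaged-interfaces' entries in the
# config file are dicts with 'address', 'network', 'interface' and
# 'router' keys, e.g. (values are illustrative):
#
#     {"address": "192.0.2.1", "network": "192.0.2.0/31",
#      "interface": "xe-0/0/0.0", "router": "router1.example.org"}
#
# and yields them re-keyed the way the redis consumers
# (e.g. _build_subnet_db below) expect: 'name', 'interface address',
# 'interface name' and 'router'.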
@app.task(base=InventoryTask, bind=True, name='update_access_services')
@log_task_entry_and_exit
def update_access_services(self):
    access_services = {}
    with db.connection(InventoryTask.config["ops-db"]) as cx:
        for service in opsdb.get_access_services(cx):
            if service['name'] in access_services:
                logger.warning(
                    'got multiple access services '
                    f'with name "{service["name"]}"')
            access_services[service['name']] = service

    r = get_next_redis(InventoryTask.config)
    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for key in r.scan_iter('opsdb:access_services:*', count=1000):
        rp.delete(key)
    rp.execute()

    rp = r.pipeline()
    for name, service in access_services.items():
        rp.set(
            f'opsdb:access_services:{name}',
            json.dumps(service))
    rp.execute()


@app.task(base=InventoryTask, bind=True, name='update_lg_routers')
@log_task_entry_and_exit
def update_lg_routers(self):
    r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for k in r.scan_iter('opsdb:lg:*', count=1000):
        rp.delete(k)
    rp.execute()

    with db.connection(InventoryTask.config["ops-db"]) as cx:
        rp = r.pipeline()
        for router in opsdb.lookup_lg_routers(cx):
            rp.set(f'opsdb:lg:{router["equipment name"]}', json.dumps(router))
        rp.execute()


@app.task(base=InventoryTask, bind=True, name='update_equipment_locations')
@log_task_entry_and_exit
def update_equipment_locations(self):
    r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for k in r.scan_iter('opsdb:location:*', count=1000):
        rp.delete(k)
    rp.execute()

    with db.connection(InventoryTask.config["ops-db"]) as cx:
        rp = r.pipeline()
        for h in data.derive_router_hostnames(InventoryTask.config):
            # lookup_pop_info returns a list of locations
            # (there can sometimes be more than one match)
            locations = list(opsdb.lookup_pop_info(cx, h))
            rp.set('opsdb:location:%s' % h, json.dumps(locations))
        rp.execute()


@app.task(base=InventoryTask, bind=True, name='update_circuit_hierarchy')
@log_task_entry_and_exit
def update_circuit_hierarchy(self):
    # TODO: integers are not JSON keys
    with db.connection(InventoryTask.config["ops-db"]) as cx:
        child_to_parents = defaultdict(list)
        parent_to_children = defaultdict(list)
        for relation in opsdb.get_circuit_hierarchy(cx):
            parent_id = relation["parent_circuit_id"]
            child_id = relation["child_circuit_id"]
            parent_to_children[parent_id].append(relation)
            child_to_parents[child_id].append(relation)

        r = get_next_redis(InventoryTask.config)
        rp = r.pipeline()
        # scan with bigger batches, to mitigate network latency effects
        for key in r.scan_iter('opsdb:services:parents:*', count=1000):
            rp.delete(key)
        for key in r.scan_iter('opsdb:services:children:*', count=1000):
            rp.delete(key)
        rp.execute()

        rp = r.pipeline()
        for cid, parents in parent_to_children.items():
            rp.set('opsdb:services:parents:%d' % cid, json.dumps(parents))
        for cid, children in child_to_parents.items():
            rp.set('opsdb:services:children:%d' % cid, json.dumps(children))
        rp.execute()


@app.task(base=InventoryTask, bind=True, name='update_geant_lambdas')
@log_task_entry_and_exit
def update_geant_lambdas(self):
    r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for key in r.scan_iter('opsdb:geant_lambdas:*', count=1000):
        rp.delete(key)
    rp.execute()

    with db.connection(InventoryTask.config["ops-db"]) as cx:
        rp = r.pipeline()
        for ld in opsdb.get_geant_lambdas(cx):
            rp.set(
                'opsdb:geant_lambdas:%s' % ld['name'].lower(),
                json.dumps(ld))
        rp.execute()
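
# Every update_* task above repeats the same prologue: delete all existing
# keys under a prefix, in batches, before re-populating them.  An
# equivalent helper, shown only as an illustrative sketch (the tasks above
# inline this logic rather than calling it):
def _delete_keys_matching(r, pattern):
    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for key in r.scan_iter(pattern, count=1000):
        rp.delete(key)
    rp.execute()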
@app.task(base=InventoryTask, bind=True, name='update_fibre_spans')
@log_task_entry_and_exit
def update_fibre_spans(self):
    r = get_next_redis(InventoryTask.config)

    rp = r.pipeline()
    # scan with bigger batches, to mitigate network latency effects
    for key in r.scan_iter('opsdb:ne_fibre_spans:*', count=1000):
        rp.delete(key)
    rp.execute()

    with db.connection(InventoryTask.config["ops-db"]) as cx:
        rp = r.pipeline()
        for ne, fs in opsdb.get_fibre_spans(cx):
            rp.set(
                f'opsdb:ne_fibre_spans:{ne}',
                json.dumps(fs))
        rp.execute()


@app.task(base=InventoryTask, bind=True,
          name='update_neteng_managed_device_list')
@log_task_entry_and_exit
def update_neteng_managed_device_list(self):
    self.log_info('querying netdash for managed routers')

    routers = list(juniper.load_routers_from_netdash(
        InventoryTask.config['managed-routers']))

    self.log_info(f'found {len(routers)} routers, saving details')

    r = get_next_redis(InventoryTask.config)
    r.set('netdash', json.dumps(routers).encode('utf-8'))
    self.log_info(f'saved {len(routers)} managed routers')


def load_netconf_data(hostname):
    """
    this method should only be called from a task

    :param hostname:
    :return:
    """
    r = get_next_redis(InventoryTask.config)
    netconf = r.get('netconf:' + hostname)
    if not netconf:
        raise InventoryTaskError('no netconf data found for %r' % hostname)
    return etree.fromstring(netconf.decode('utf-8'))


def clear_cached_classifier_responses(hostname=None):
    if hostname:
        logger.debug(
            'removing cached classifier responses for %r' % hostname)
    else:
        logger.debug('removing all cached classifier responses')

    r = get_next_redis(InventoryTask.config)

    def _hostname_keys():
        for k in r.keys('classifier-cache:juniper:%s:*' % hostname):
            yield k

        # TODO: very inefficient ... but logically simplest at this point
        for k in r.keys('classifier-cache:peer:*'):
            value = r.get(k.decode('utf-8'))
            if not value:
                # deleted in another thread
                continue
            value = json.loads(value.decode('utf-8'))
            interfaces = value.get('interfaces', [])
            if hostname in [i['interface']['router'] for i in interfaces]:
                yield k

    def _all_keys():
        return r.keys('classifier-cache:*')

    keys_to_delete = _hostname_keys() if hostname else _all_keys()
    rp = r.pipeline()
    for k in keys_to_delete:
        rp.delete(k)
    rp.execute()


@log_task_entry_and_exit
def refresh_juniper_bgp_peers(hostname, netconf):
    host_peerings = list(juniper.all_bgp_peers(netconf))
    r = get_next_redis(InventoryTask.config)
    r.set(f'juniper-peerings:hosts:{hostname}', json.dumps(host_peerings))


@log_task_entry_and_exit
def refresh_juniper_interface_list(hostname, netconf):
    logger.debug(
        'removing cached netconf-interfaces for %r' % hostname)

    r = get_next_redis(InventoryTask.config)
    rp = r.pipeline()
    rp.delete(f'netconf-interfaces-hosts:{hostname}')
    # scan with bigger batches, to mitigate network latency effects
    for k in r.scan_iter(f'netconf-interfaces:{hostname}:*', count=1000):
        rp.delete(k)
    for k in r.scan_iter(
            f'netconf-interface-bundles:{hostname}:*', count=1000):
        rp.delete(k)
    rp.execute()

    all_bundles = defaultdict(list)

    rp = r.pipeline()
    rp.set(
        f'netconf-interfaces-hosts:{hostname}',
        json.dumps(list(juniper.interface_addresses(netconf))))

    for ifc in juniper.list_interfaces(netconf):
        # 'bundle' may be missing or None: default to an empty list
        # so the loop below is always safe
        bundles = ifc.get('bundle', None) or []
        for bundle in bundles:
            if bundle:
                all_bundles[bundle].append(ifc['name'])
        rp.set(
            f'netconf-interfaces:{hostname}:{ifc["name"]}',
            json.dumps(ifc))
    for k, v in all_bundles.items():
        rp.set(
            f'netconf-interface-bundles:{hostname}:{k}',
            json.dumps(v))
    rp.execute()
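
# refresh_juniper_interface_list() pivots interface->bundle membership
# into per-bundle member lists.  For example (hostname and interface
# names are illustrative), if xe-1/0/1 and xe-1/2/0 both report
# 'bundle': ['ae5'], the pipeline would store:
#
#     netconf-interface-bundles:router1.example.org:ae5
#         -> ["xe-1/0/1", "xe-1/2/0"]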
@app.task(base=InventoryTask, bind=True, name='reload_router_config')
@log_task_entry_and_exit
def reload_router_config(self, hostname):
    self.log_info(f'loading netconf data for {hostname}')

    # get the timestamp for the current netconf data
    current_netconf_timestamp = None
    try:
        netconf_doc = load_netconf_data(hostname)
        current_netconf_timestamp \
            = juniper.netconf_changed_timestamp(netconf_doc)
        logger.debug(
            'current netconf timestamp: %r' % current_netconf_timestamp)
    except InventoryTaskError:
        pass  # ok at this point if not found

    # load new netconf data, in this thread
    netconf_refresh_config.apply(args=[hostname])

    netconf_doc = load_netconf_data(hostname)

    # return if the new timestamp is the same as the original timestamp
    new_netconf_timestamp = juniper.netconf_changed_timestamp(netconf_doc)
    assert new_netconf_timestamp, \
        'no timestamp available for new netconf data'
    if new_netconf_timestamp == current_netconf_timestamp:
        msg = f'no timestamp change for {hostname} netconf data'
        logger.debug(msg)
        self.log_info(msg)
        return

    # clear cached classifier responses for this router, and
    # refresh peering data
    self.log_info(f'refreshing peers & clearing cache for {hostname}')
    refresh_juniper_bgp_peers(hostname, netconf_doc)
    refresh_juniper_interface_list(hostname, netconf_doc)
    # clear_cached_classifier_responses(hostname)

    # load snmp indexes
    community = juniper.snmp_community_string(netconf_doc)
    if not community:
        raise InventoryTaskError(
            f'error extracting community string for {hostname}')
    else:
        self.log_info(f'refreshing snmp interface indexes for {hostname}')
        logical_systems = juniper.logical_systems(netconf_doc)

        # load snmp data, in this thread
        snmp_refresh_interfaces.apply(
            args=[hostname, community, logical_systems])
        snmp_refresh_peerings.apply(
            args=[hostname, community, logical_systems])

    clear_cached_classifier_responses(None)
    self.log_info(f'updated configuration for {hostname}')


def _erase_next_db(config):
    """
    flush the next db, but first save the latch and restore it afterwards

    TODO: handle the no-latch scenario nicely

    :param config:
    :return:
    """
    r = get_next_redis(config)
    saved_latch = get_latch(r)
    r.flushdb()
    if saved_latch:
        set_latch(
            config,
            new_current=saved_latch['current'],
            new_next=saved_latch['next'])


@app.task(base=InventoryTask, bind=True, name='internal_refresh_phase_2')
@log_task_entry_and_exit
def internal_refresh_phase_2(self):
    # second batch of subtasks:
    #   alarms db status cache
    #   juniper netconf & snmp data
    try:
        subtasks = [
            update_equipment_locations.apply_async(),
            update_lg_routers.apply_async(),
            update_access_services.apply_async()
        ]
        for hostname in data.derive_router_hostnames(InventoryTask.config):
            logger.debug('queueing router refresh jobs for %r' % hostname)
            subtasks.append(reload_router_config.apply_async(args=[hostname]))

        pending_task_ids = [x.id for x in subtasks]

        refresh_finalizer.apply_async(args=[json.dumps(pending_task_ids)])
    except KombuError:
        # TODO: possible race condition here
        # e.g. if one of these tasks takes a long time and another
        # update is started, we could end up with strange data
        update_latch_status(InventoryTask.config, pending=False, failure=True)
        logger.exception('error launching refresh phase 2 subtasks')
        raise
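
# How a full refresh hangs together (summary of the code above and below):
#
#   launch_refresh_cache_all(config)        [called outside the workers]
#     1. _erase_next_db + set the latch to pending
#     2. phase-1 subtasks (opsdb caches), waited on synchronously
#     3. internal_refresh_phase_2           [parent task for the rest]
#          - phase-2 subtasks: locations, lg routers, access services,
#            and one reload_router_config per managed router
#          - refresh_finalizer(subtask ids)
#               - _wait_for_tasks, then the _build_*_db post-processing
#               - latch_db: swap the 'current' and 'next' redis dbs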
@log_task_entry_and_exit
def launch_refresh_cache_all(config):
    """
    utility function intended to be called outside of the worker process

    :param config: config structure as defined in config.py
    :return:
    """
    try:
        _erase_next_db(config)
        update_latch_status(config, pending=True)

        monitor.clear_joblog(get_current_redis(config))

        # first batch of subtasks: refresh cached opsdb data
        subtasks = [
            update_neteng_managed_device_list.apply_async(),
            update_interfaces_to_services.apply_async(),
            update_geant_lambdas.apply_async(),
            update_fibre_spans.apply_async(),
            update_circuit_hierarchy.apply_async()
        ]
        [x.get() for x in subtasks]

        # now launch the task whose only purpose is to
        # act as a convenient parent for all of the remaining tasks
        t = internal_refresh_phase_2.apply_async()
        return t.id

    except (RedisError, KombuError):
        update_latch_status(config, pending=False, failure=True)
        logger.exception('error launching refresh subtasks')
        raise


def _wait_for_tasks(task_ids, update_callback=lambda s: None):

    all_successful = True

    start_time = time.time()
    while task_ids and time.time() - start_time < FINALIZER_TIMEOUT_S:
        update_callback('waiting for tasks to complete: %r' % task_ids)
        time.sleep(FINALIZER_POLLING_FREQUENCY_S)

        def _task_and_children_result(id):
            tasks = list(check_task_status(id))
            return {
                'error': any([t['ready'] and not t['success'] for t in tasks]),
                'ready': all([t['ready'] for t in tasks])
            }

        results = dict([
            (id, _task_and_children_result(id))
            for id in task_ids])

        if any([t['error'] for t in results.values()]):
            all_successful = False

        task_ids = [
            id for id, status in results.items()
            if not status['ready']]

    if task_ids:
        raise InventoryTaskError(
            'timeout waiting for pending tasks to complete')
    if not all_successful:
        raise InventoryTaskError(
            'some tasks finished with an error')

    update_callback('pending tasks completed in {} seconds'.format(
        time.time() - start_time))


@app.task(base=InventoryTask, bind=True, name='refresh_finalizer')
@log_task_entry_and_exit
def refresh_finalizer(self, pending_task_ids_json):

    # TODO: if more types of errors appear, use a finally block

    input_schema = {
        "$schema": "http://json-schema.org/draft-07/schema#",
        "type": "array",
        "items": {"type": "string"}
    }

    try:
        task_ids = json.loads(pending_task_ids_json)
        logger.debug('task_ids: %r' % task_ids)
        jsonschema.validate(task_ids, input_schema)

        _wait_for_tasks(task_ids, update_callback=self.log_info)
        _build_subnet_db(update_callback=self.log_info)
        _build_snmp_peering_db(update_callback=self.log_info)
        _build_juniper_peering_db(update_callback=self.log_info)

    except (jsonschema.ValidationError,
            json.JSONDecodeError,
            InventoryTaskError,
            RedisError,
            KombuError) as e:
        update_latch_status(InventoryTask.config, failure=True)
        raise e

    latch_db(InventoryTask.config)
    self.log_info('latched current/next dbs')
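
# refresh_finalizer expects a JSON-encoded list of task id strings, as
# produced by internal_refresh_phase_2 above; e.g. (ids are illustrative):
#
#     refresh_finalizer.apply_async(
#         args=[json.dumps(['task-id-1', 'task-id-2'])])
#
# Anything else fails jsonschema validation and marks the latch as failed.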
def _build_subnet_db(update_callback=lambda s: None):

    r = get_next_redis(InventoryTask.config)

    update_callback('loading all network addresses')
    subnets = {}
    # scan with bigger batches, to mitigate network latency effects
    for k in r.scan_iter('netconf-interfaces-hosts:*', count=1000):
        k = k.decode('utf-8')
        hostname = k[len('netconf-interfaces-hosts:'):]
        host_interfaces = r.get(k).decode('utf-8')
        host_interfaces = json.loads(host_interfaces)
        for ifc in host_interfaces:
            ifc['router'] = hostname
            entry = subnets.setdefault(ifc['interface address'], [])
            entry.append(ifc)

    for ifc in _unmanaged_interfaces():
        entry = subnets.setdefault(ifc['interface address'], [])
        entry.append(ifc)

    update_callback('saving {} subnets'.format(len(subnets)))

    rp = r.pipeline()
    for k, v in subnets.items():
        rp.set(f'subnets:{k}', json.dumps(v))
    rp.execute()


def _build_juniper_peering_db(update_callback=lambda s: None):

    def _is_ix(peering_info):
        if peering_info.get('instance', '') != 'IAS':
            return False
        if not peering_info.get('group', '').startswith('GEANT-IX'):
            return False
        expected_keys = ('description', 'local-asn', 'remote-asn')
        if any(peering_info.get(x, None) is None for x in expected_keys):
            logger.error('internal data error, looks like ix peering but '
                         f'some expected keys are missing: {peering_info}')
            return False
        return True

    r = get_next_redis(InventoryTask.config)

    update_callback('loading all juniper network peerings')
    peerings_per_address = {}
    ix_peerings = []
    peerings_per_asn = {}
    peerings_per_logical_system = {}
    peerings_per_group = {}
    peerings_per_routing_instance = {}

    # scan with bigger batches, to mitigate network latency effects
    key_prefix = 'juniper-peerings:hosts:'
    for k in r.scan_iter(f'{key_prefix}*', count=1000):
        key_name = k.decode('utf-8')
        hostname = key_name[len(key_prefix):]
        host_peerings = r.get(key_name).decode('utf-8')
        host_peerings = json.loads(host_peerings)
        for p in host_peerings:
            p['hostname'] = hostname
            peerings_per_address.setdefault(p['address'], []).append(p)
            if _is_ix(p):
                ix_peerings.append(p)
            asn = p.get('remote-asn', None)
            if asn:
                peerings_per_asn.setdefault(asn, []).append(p)
            logical_system = p.get('logical-system', None)
            if logical_system:
                peerings_per_logical_system.setdefault(
                    logical_system, []).append(p)
            group = p.get('group', None)
            if group:
                peerings_per_group.setdefault(group, []).append(p)
            routing_instance = p.get('instance', None)
            if routing_instance:
                peerings_per_routing_instance.setdefault(
                    routing_instance, []).append(p)

    # sort ix peerings by group
    ix_groups = {}
    for p in ix_peerings:
        description = p['description']
        keyword = description.split(' ')[0]  # regex needed??? (e.g. tabs???)
        ix_groups.setdefault(keyword, set()).add(p['address'])

    rp = r.pipeline()

    # create peering entries, keyed by remote addresses
    update_callback(f'saving {len(peerings_per_address)} remote peers')
    for k, v in peerings_per_address.items():
        rp.set(f'juniper-peerings:remote:{k}', json.dumps(v))

    # create pivoted ix group name lists
    update_callback(f'saving {len(ix_groups)} remote ix peering groups')
    for k, v in ix_groups.items():
        group_addresses = list(v)
        rp.set(f'juniper-peerings:ix-groups:{k}', json.dumps(group_addresses))

    # create pivoted asn peering lists
    update_callback(f'saving {len(peerings_per_asn)} asn peering lists')
    for k, v in peerings_per_asn.items():
        rp.set(f'juniper-peerings:peer-asn:{k}', json.dumps(v))

    # create pivoted logical-systems peering lists
    update_callback(
        f'saving {len(peerings_per_logical_system)}'
        ' logical-system peering lists')
    for k, v in peerings_per_logical_system.items():
        rp.set(f'juniper-peerings:logical-system:{k}', json.dumps(v))

    # create pivoted group peering lists
    update_callback(
        f'saving {len(peerings_per_group)} group peering lists')
    for k, v in peerings_per_group.items():
        rp.set(f'juniper-peerings:group:{k}', json.dumps(v))

    # create pivoted routing instance peering lists
    update_callback(
        f'saving {len(peerings_per_routing_instance)}'
        ' routing-instance peering lists')
    for k, v in peerings_per_routing_instance.items():
        rp.set(f'juniper-peerings:routing-instance:{k}', json.dumps(v))

    rp.execute()
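
# For a single juniper peering record, _build_juniper_peering_db() may
# write entries under several of the pivoted key spaces above, e.g.
# (key names from the code, values illustrative):
#
#     juniper-peerings:remote:192.0.2.1
#     juniper-peerings:peer-asn:64496
#     juniper-peerings:group:GEANT-IX-FRANKFURT
#     juniper-peerings:routing-instance:IAS
#     juniper-peerings:ix-groups:GEANT-IX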
def _build_snmp_peering_db(update_callback=lambda s: None):

    r = get_next_redis(InventoryTask.config)

    update_callback('loading all snmp network peerings')
    peerings = {}

    # scan with bigger batches, to mitigate network latency effects
    key_prefix = 'snmp-peerings:hosts:'
    for k in r.scan_iter(f'{key_prefix}*', count=1000):
        key_name = k.decode('utf-8')
        hostname = key_name[len(key_prefix):]
        host_peerings = r.get(key_name).decode('utf-8')
        host_peerings = json.loads(host_peerings)
        for p in host_peerings:
            p['hostname'] = hostname
            peerings.setdefault(p['remote'], []).append(p)

    update_callback(f'saving {len(peerings)} remote peers')

    rp = r.pipeline()
    for k, v in peerings.items():
        rp.set(f'snmp-peerings:remote:{k}', json.dumps(v))
    rp.execute()


def check_task_status(task_id, parent=None, forget=False):
    r = AsyncResult(task_id, app=app)
    assert r.id == task_id  # sanity

    result = {
        'id': task_id,
        'status': r.status,
        'exception': r.status in states.EXCEPTION_STATES,
        'ready': r.status in states.READY_STATES,
        'success': r.status == states.SUCCESS,
        'parent': parent
    }

    # TODO: only discovered this case by testing, is this the only one?
    #       ... otherwise need to pre-test json serialization
    if isinstance(r.result, Exception):
        result['result'] = {
            'error type': type(r.result).__name__,
            'message': str(r.result)
        }
    else:
        result['result'] = r.result

    def child_taskids(children):
        # reverse-engineered, can't find documentation on this
        for child in children:
            if not child:
                continue
            if isinstance(child, list):
                logger.debug(f'list: {child}')
                yield from child_taskids(child)
                continue
            if isinstance(child, str):
                yield child
                continue
            assert isinstance(child, AsyncResult)
            yield child.id

    for child_id in child_taskids(getattr(r, 'children', []) or []):
        yield from check_task_status(child_id, parent=task_id)

    if forget and result['ready']:
        r.forget()

    yield result
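
# check_task_status() yields one status dict per task in the tree rooted
# at the given id (children are yielded before the root).  A caller might
# consume it like this (illustrative):
#
#     statuses = list(check_task_status(task_id))
#     if all(s['ready'] for s in statuses) \
#             and not any(s['exception'] for s in statuses):
#         logger.info('task tree finished cleanly')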