This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Merge pull request #8 from zalando-incubator/error-events
React on the errors and unknown events from k8s API
nolar authored Mar 27, 2019
2 parents c553c99 + db28de7 commit 84a99f5
Showing 1 changed file with 39 additions and 21 deletions.
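
For background on the events this change reacts to: when a watch's resource version expires, the Kubernetes API delivers a watch event of type ERROR whose object is a v1 Status with code 410 ("Gone"). A representative payload, with illustrative values, looks roughly like this:

    {'type': 'ERROR',
     'object': {'kind': 'Status', 'apiVersion': 'v1', 'metadata': {},
                'status': 'Failure', 'reason': 'Expired',
                'message': 'too old resource version: 12345 (67890)',
                'code': 410}}

The new checks in the hunk below key off event['type'] and event['object']['code'] exactly as in this shape.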
kopf/reactor/queueing.py: 39 additions, 21 deletions

@@ -74,28 +74,46 @@ async def _async_wrapper(src):
     scheduler = await aiojobs.create_scheduler(limit=10)
     queues = {}
     try:
         while True:
 
-            # Make a Kubernetes call to watch for the events via the API.
-            w = kubernetes.watch.Watch()
-            api = kubernetes.client.CustomObjectsApi()
-            stream = w.stream(api.list_cluster_custom_object, resource.group, resource.version, resource.plural)
-            async for event in _async_wrapper(stream):
-                key = (resource, event['object']['metadata']['uid'])
-
-                # Filter out all unrelated events as soon as possible (before queues), and silently.
-                # TODO: Reimplement via api.list_namespaced_custom_object, and API-level filtering.
-                ns = event['object'].get('metadata', {}).get('namespace', None)
-                if namespace is not None and ns is not None and ns != namespace:
-                    continue
-
-                # Either use the existing object's queue, or create a new one together with the per-object job.
-                # "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done.
-                try:
-                    await queues[key].put(event)
-                except KeyError:
-                    queues[key] = asyncio.Queue()
-                    await queues[key].put(event)
-                    await scheduler.spawn(worker(handler=handler, queues=queues, key=key))
+            # Make a Kubernetes call to watch for the events via the API.
+            w = kubernetes.watch.Watch()
+            api = kubernetes.client.CustomObjectsApi()
+            api_fn = api.list_cluster_custom_object
+            stream = w.stream(api_fn, resource.group, resource.version, resource.plural)
+            async for event in _async_wrapper(stream):
+
+                # "410 Gone" is the "resource version too old" error; we must restart watching.
+                # The resource versions are lost by k8s after a few minutes (as per the official docs).
+                # The error occurs when nothing has happened for a few minutes. This is normal.
+                if event['type'] == 'ERROR' and event['object']['code'] == 410:
+                    logger.debug("Restarting the watch-stream for %r", resource)
+                    break  # out of the for-cycle, back to the while-true-cycle.
+
+                # Other watch errors should be fatal for the operator.
+                if event['type'] == 'ERROR':
+                    raise Exception(f"Error in the watch-stream: {event['object']}")
+
+                # Ensure that the event is something we understand and can handle.
+                if event['type'] not in ['ADDED', 'MODIFIED', 'DELETED']:
+                    logger.warn("Ignoring an unsupported event type: %r", event)
+                    continue
+
+                # Filter out all unrelated events as soon as possible (before queues), and silently.
+                # TODO: Reimplement via api.list_namespaced_custom_object, and API-level filtering.
+                ns = event['object'].get('metadata', {}).get('namespace', None)
+                if namespace is not None and ns is not None and ns != namespace:
+                    continue
+
+                # Either use the existing object's queue, or create a new one together with the per-object job.
+                # "Fire-and-forget": we do not wait for the result; the job destroys itself when it is fully done.
+                key = (resource, event['object']['metadata']['uid'])
+                try:
+                    await queues[key].put(event)
+                except KeyError:
+                    queues[key] = asyncio.Queue()
+                    await queues[key].put(event)
+                    await scheduler.spawn(worker(handler=handler, queues=queues, key=key))
+
     finally:
         # Forcibly terminate all the fire-and-forget per-object jobs, if they are still running.
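
The watch-restart logic above can be illustrated outside kopf. Below is a minimal synchronous sketch (kopf itself drives the stream through its async _async_wrapper); the CRD coordinates GROUP/VERSION/PLURAL and the handle callback are hypothetical placeholders, not kopf's API:

    import logging
    import kubernetes

    logger = logging.getLogger(__name__)

    # Hypothetical CRD coordinates; replace with a real custom resource.
    GROUP, VERSION, PLURAL = 'example.com', 'v1', 'widgets'

    def watch_forever(handle):
        kubernetes.config.load_kube_config()
        api = kubernetes.client.CustomObjectsApi()
        while True:
            w = kubernetes.watch.Watch()
            stream = w.stream(api.list_cluster_custom_object, GROUP, VERSION, PLURAL)
            for event in stream:
                # 410 "Gone": the resource version is too old; reopen the watch.
                if event['type'] == 'ERROR' and event['object'].get('code') == 410:
                    logger.debug("Resource version expired; restarting the watch.")
                    break  # back to the while-loop, which opens a fresh stream
                # Any other watch error is unexpected and should stop the loop.
                if event['type'] == 'ERROR':
                    raise Exception(f"Error in the watch-stream: {event['object']}")
                # Skip event types we do not know how to handle.
                if event['type'] not in ('ADDED', 'MODIFIED', 'DELETED'):
                    logger.warning("Ignoring an unsupported event type: %r", event)
                    continue
                handle(event)

    if __name__ == '__main__':
        watch_forever(lambda event: print(event['type'], event['object']['metadata']['name']))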

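The per-object queueing that the tail of the hunk keeps follows a fire-and-forget pattern: one asyncio.Queue and one aiojobs job per object, and the job ends itself once its queue stays idle. A rough standalone sketch under those assumptions, using aiojobs.create_scheduler as the file itself does (process_one_object and the idle timeout are illustrative, not kopf's actual worker):

    import asyncio
    import aiojobs

    async def process_one_object(queue):
        # Consume events for a single object until no new ones arrive for a while.
        while True:
            try:
                event = await asyncio.wait_for(queue.get(), timeout=1.0)
            except asyncio.TimeoutError:
                return  # idle long enough: the fire-and-forget job ends itself
            print('handling', event['type'])

    async def dispatch(events):
        scheduler = await aiojobs.create_scheduler(limit=10)
        queues = {}
        try:
            for event in events:
                key = event['object']['metadata']['uid']
                # Reuse the object's queue, or create it together with its worker job.
                try:
                    await queues[key].put(event)
                except KeyError:
                    queues[key] = asyncio.Queue()
                    await queues[key].put(event)
                    await scheduler.spawn(process_one_object(queues[key]))
            await asyncio.sleep(2)  # let the demo workers drain and exit on their own
        finally:
            await scheduler.close()  # force-terminates any jobs that are still running

    asyncio.run(dispatch([
        {'type': 'ADDED', 'object': {'metadata': {'uid': 'abc-123'}}},
        {'type': 'MODIFIED', 'object': {'metadata': {'uid': 'abc-123'}}},
    ]))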