Skip to content
This repository has been archived by the owner on Sep 14, 2020. It is now read-only.

Commit

Permalink
Merge pull request #27 from zalando-incubator/freezing-watching
Browse files Browse the repository at this point in the history
Replace built-in StopIteration with custom StopStreaming for API calls
  • Loading branch information
nolar authored Apr 16, 2019
2 parents af88f40 + dd92c55 commit 84c3852
Show file tree
Hide file tree
Showing 3 changed files with 132 additions and 7 deletions.
9 changes: 2 additions & 7 deletions kopf/reactor/queueing.py
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,7 @@
from kopf.reactor.peering import peers_keepalive, peers_handler, Peer, detect_own_id
from kopf.reactor.peering import PEERING_CRD_RESOURCE, PEERING_DEFAULT_NAME
from kopf.reactor.registry import get_default_registry, BaseRegistry, Resource
from kopf.reactor.watching import streaming_aiter

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -63,12 +64,6 @@ async def watcher(
The workers, on the other hand, are limited approximately to the life-time of an object's event.
"""

# If not wrapped, causes TypeError: 'async for' requires an object with __aiter__ method, got generator
loop = asyncio.get_event_loop()
async def _async_wrapper(src):
while True:
yield await loop.run_in_executor(None, next, src)

# All per-object workers are handled as fire-and-forget jobs via the scheduler,
# and communicated via the per-object event queues.
scheduler = await aiojobs.create_scheduler(limit=10)
Expand All @@ -81,7 +76,7 @@ async def _async_wrapper(src):
api = kubernetes.client.CustomObjectsApi()
api_fn = api.list_cluster_custom_object
stream = w.stream(api_fn, resource.group, resource.version, resource.plural)
async for event in _async_wrapper(stream):
async for event in streaming_aiter(stream):

# "410 Gone" is for the "resource version too old" error, we must restart watching.
# The resource versions are lost by k8s after few minutes (as per the official doc).
Expand Down
53 changes: 53 additions & 0 deletions kopf/reactor/watching.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
"""
Watching and streaming watch-events.
Kubernetes client's watching streams are synchronous. To make them asynchronous,
we put them into a `concurrent.futures.ThreadPoolExecutor`,
and yield from there asynchronously.
However, async/await coroutines misbehave with `StopIteration` exceptions
raised by the `next` method: see `PEP-479`_.
As a workaround, we replace `StopIteration` with our custom `StopStreaming`
inherited from `RuntimeError` (as suggested by `PEP-479`_),
and re-implement the generators to make them async.
All of this is a workaround for the standard Kubernetes client's limitations.
They would not be needed if the client library were natively asynchronous.
.. _PEP-479: https://www.python.org/dev/peps/pep-0479/
"""

import asyncio
import logging

logger = logging.getLogger(__name__)


class StopStreaming(RuntimeError):
"""
Raised when the watch-stream generator ends streaming.
Replaces `StopIteration`.
"""


def streaming_next(src):
"""
Same as `next`, but replaces the `StopIteration` with `StopStreaming`.
"""
try:
return next(src)
except StopIteration as e:
raise StopStreaming(str(e))


async def streaming_aiter(src, loop=None, executor=None):
"""
Same as `iter`, but asynchronous and stops on `StopStreaming`, not on `StopIteration`.
"""
loop = loop if loop is not None else asyncio.get_event_loop()
while True:
try:
yield await loop.run_in_executor(executor, streaming_next, src)
except StopStreaming:
return
77 changes: 77 additions & 0 deletions tests/test_streaming.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
import collections.abc

import pytest

from kopf.reactor.watching import StopStreaming, streaming_next, streaming_aiter


async def test_streaming_next_never_ends_with_stopiteration():
lst = []
src = iter(lst)

with pytest.raises(StopStreaming) as e:
streaming_next(src)

assert not isinstance(e, StopIteration)
assert not isinstance(e, StopAsyncIteration)


async def test_streaming_next_yields_and_ends():
lst = [1, 2, 3]
src = iter(lst)

val1 = streaming_next(src)
val2 = streaming_next(src)
val3 = streaming_next(src)
assert val1 == 1
assert val2 == 2
assert val3 == 3

with pytest.raises(StopStreaming):
streaming_next(src)


async def test_streaming_iterator_with_regular_next_yields_and_ends():
lst = [1, 2, 3]
src = iter(lst)

itr = streaming_aiter(src)
assert isinstance(itr, collections.abc.AsyncIterator)
assert isinstance(itr, collections.abc.AsyncGenerator)

val1 = next(src)
val2 = next(src)
val3 = next(src)
assert val1 == 1
assert val2 == 2
assert val3 == 3

with pytest.raises(StopIteration):
next(src)


async def test_streaming_iterator_with_asyncfor_works():
lst = [1, 2, 3]
src = iter(lst)

itr = streaming_aiter(src)
assert isinstance(itr, collections.abc.AsyncIterator)
assert isinstance(itr, collections.abc.AsyncGenerator)

vals = []
async for val in itr:
vals.append(val)
assert vals == lst


async def test_streaming_iterator_with_syncfor_fails():
lst = [1, 2, 3]
src = iter(lst)

itr = streaming_aiter(src)
assert isinstance(itr, collections.abc.AsyncIterator)
assert isinstance(itr, collections.abc.AsyncGenerator)

with pytest.raises(TypeError):
for _ in itr:
pass

0 comments on commit 84c3852

Please sign in to comment.