Skip to content

Commit

Permalink
More flow ops: drain, onComplete, onDone, onError (#228)
Browse files Browse the repository at this point in the history
  • Loading branch information
adamw authored Oct 8, 2024
1 parent a1c3e7c commit 4331f5e
Show file tree
Hide file tree
Showing 6 changed files with 144 additions and 18 deletions.
7 changes: 5 additions & 2 deletions core/src/main/scala/ox/flow/Flow.scala
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package ox.flow
import ox.channels.ChannelClosed
import ox.channels.Source
import ox.repeatWhile
import scala.annotation.nowarn

/** Describes an asynchronous transformation pipeline. When run, emits elements of type `T`.
*
Expand Down Expand Up @@ -40,11 +41,13 @@ trait FlowEmit[-T]:
def apply(t: T): Unit

object FlowEmit:
private[flow] inline def fromInline[T](inline f: T => Unit): FlowEmit[T] =
// suppressing the "New anonymous class definition will be duplicated at each inline site" warning: the whole point of this inline
// is to create new FlowEmit instances
@nowarn private[flow] inline def fromInline[T](inline f: T => Unit): FlowEmit[T] =
new FlowEmit[T]:
def apply(t: T): Unit = f(t)

/** Propagates all elements and closure events to the given emit. */
/** Propagates all elements to the given emit. */
def channelToEmit[T](source: Source[T], emit: FlowEmit[T]): Unit =
repeatWhile:
val t = source.receiveOrClosed()
Expand Down
10 changes: 8 additions & 2 deletions core/src/main/scala/ox/flow/FlowCompanionOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -16,17 +16,23 @@ import scala.concurrent.Await
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
import scala.annotation.nowarn

trait FlowCompanionOps:
this: Flow.type =>

private[flow] inline def usingEmitInline[T](inline withSink: FlowEmit[T] => Unit): Flow[T] = Flow(
// suppressing the "New anonymous class definition will be duplicated at each inline site" warning: the whole point of this inline
// is to create new FlowStage instances
@nowarn private[flow] inline def usingEmitInline[T](inline withEmit: FlowEmit[T] => Unit): Flow[T] = Flow(
new FlowStage:
override def run(emit: FlowEmit[T]): Unit = withSink(emit)
override def run(emit: FlowEmit[T]): Unit = withEmit(emit)
)

/** Creates a flow, which when run, provides a [[FlowEmit]] instance to the given `withEmit` function. Elements can be emitted to be
* processed by downstream stages by calling [[FlowEmit.apply]].
*
* The `FlowEmit` instance provided to the `withEmit` callback should only be used on the calling thread. That is, `FlowEmit` is
* thread-unsafe`. Moreover, the instance should not be stored or captured in closures, which outlive the invocation of `withEmit`.
*/
def usingEmit[T](withEmit: FlowEmit[T] => Unit): Flow[T] = usingEmitInline(withEmit)

Expand Down
19 changes: 19 additions & 0 deletions core/src/main/scala/ox/flow/FlowOps.scala
Original file line number Diff line number Diff line change
Expand Up @@ -698,6 +698,25 @@ class FlowOps[+T]:
}.tapException(other.errorOrClosed(_).discard)
end alsoToTap

/** Discard all elements emitted by this flow. The returned flow completes only when this flow completes (successfully or with an error).
*/
def drain(): Flow[Nothing] = Flow.usingEmitInline: emit =>
last.run(FlowEmit.fromInline(_ => ()))

/** Always runs `f` after the flow completes, whether it's because all elements are emitted, or when there's an error. */
def onComplete(f: => Unit): Flow[T] = Flow.usingEmitInline: emit =>
try last.run(emit)
finally f

/** Runs `f` after the flow completes successfully, that is when all elements are emitted. */
def onDone(f: => Unit): Flow[T] = Flow.usingEmitInline: emit =>
last.run(emit)
f

/** Runs `f` after the flow completes with an error. The error can't be recovered. */
def onError(f: Throwable => Unit): Flow[T] = Flow.usingEmitInline: emit =>
last.run(emit).tapException(f)

//

protected def runLastToChannelAsync(ch: Sink[T])(using OxUnsupervised): Unit =
Expand Down
27 changes: 27 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsDrainTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package ox.flow

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*
import java.util.concurrent.atomic.AtomicInteger

class FlowOpsDrainTest extends AnyFlatSpec with Matchers:
behavior of "drain"

it should "drain all elements" in:
val f = Flow.fromValues(1, 2, 3)
f.drain().runToList() shouldBe List.empty

it should "run any side-effects that are part of the flow" in:
val c = new AtomicInteger(0)
val f = Flow.fromValues(1, 2, 3).tap(_ => c.incrementAndGet().discard)
f.drain().runDrain()
c.get() shouldBe 3

it should "merge with another flow" in:
val c = new AtomicInteger(0)
val f1 = Flow.fromValues(1, 2, 3).tap(_ => c.incrementAndGet().discard)
val f2 = Flow.fromValues(4, 5, 6).tap(_ => c.incrementAndGet().discard)
f1.drain().merge(f2).runToList() shouldBe List(4, 5, 6)
c.get() shouldBe 6
end FlowOpsDrainTest
65 changes: 65 additions & 0 deletions core/src/test/scala/ox/flow/FlowOpsOnCompleteTest.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
package ox.flow

import org.scalatest.flatspec.AnyFlatSpec
import org.scalatest.matchers.should.Matchers
import ox.*
import java.util.concurrent.atomic.AtomicBoolean

class FlowOpsEnsureTest extends AnyFlatSpec with Matchers:
behavior of "ensure.onComplete"

it should "run in case of success" in:
val didRun = new AtomicBoolean(false)
val f = Flow.fromValues(1, 2, 3).onComplete(didRun.set(true))

didRun.get() shouldBe false
f.runDrain()
didRun.get() shouldBe true

it should "run in case of error" in:
val didRun = new AtomicBoolean(false)
val f = Flow.fromValues(1, 2, 3).concat(Flow.failed(new RuntimeException)).onComplete(didRun.set(true))

didRun.get() shouldBe false
intercept[RuntimeException]:
f.runDrain()
didRun.get() shouldBe true

behavior of "ensure.onDone"

it should "run in case of success" in:
val didRun = new AtomicBoolean(false)
val f = Flow.fromValues(1, 2, 3).onDone(didRun.set(true))

didRun.get() shouldBe false
f.runDrain()
didRun.get() shouldBe true

it should "not run in case of error" in:
val didRun = new AtomicBoolean(false)
val f = Flow.fromValues(1, 2, 3).concat(Flow.failed(new RuntimeException)).onDone(didRun.set(true))

didRun.get() shouldBe false
intercept[RuntimeException]:
f.runDrain()
didRun.get() shouldBe false

behavior of "ensure.onError"

it should "not run in case of success" in:
val didRun = new AtomicBoolean(false)
val f = Flow.fromValues(1, 2, 3).onError(_ => didRun.set(true))

didRun.get() shouldBe false
f.runDrain()
didRun.get() shouldBe false

it should "run in case of error" in:
val didRun = new AtomicBoolean(false)
val f = Flow.fromValues(1, 2, 3).concat(Flow.failed(new RuntimeException)).onError(_ => didRun.set(true))

didRun.get() shouldBe false
intercept[RuntimeException]:
f.runDrain()
didRun.get() shouldBe true
end FlowOpsEnsureTest
34 changes: 20 additions & 14 deletions doc/streaming/flows.md
Original file line number Diff line number Diff line change
Expand Up @@ -19,20 +19,7 @@ Flow.iterate(0)(_ + 1) // natural numbers

Note that creating a flow as above doesn't emit any elements, or execute any of the flow's logic. Only when run, the elements are emitted and any effects that are part of the flow's stages happen.

Flows can be also created by providing arbitrary element-emitting logic:

```scala mdoc:compile-only
import ox.flow.Flow

def isNoon(): Boolean = ???

Flow.usingEmit: emit =>
emit(1)
for i <- 4 to 50 do emit(i)
if isNoon() then emit(42)
```

Finally, flows can be created using [channel](channels.md) `Source`s:
Flows can also be created using [channel](channels.md) `Source`s:

```scala mdoc:compile-only
import ox.channels.Channel
Expand All @@ -50,6 +37,25 @@ supervised:
Flow.fromSource(ch) // TODO: transform the flow further & run
```

Finally, flows can be created by providing arbitrary element-emitting logic:

```scala mdoc:compile-only
import ox.flow.Flow

def isNoon(): Boolean = ???

Flow.usingEmit: emit =>
emit(1)
for i <- 4 to 50 do emit(i)
if isNoon() then emit(42)
```

The `emit: FlowEmit` instances is used to emit elements by the flow, that is process them further, as defined by the downstream pipeline. This method only completes once the element is fully processed, and it might throw exceptions in case there's a processing error.

As part of the callback, you can create [supervision scopes](../structured-concurrency/error-handling-scopes.md), fork background computations or run other flows asynchronously. However, take care **not** to share the `emit: FlowEmit` instance across threads. That is, instances of `FlowEmit` are thread-unsafe and should only be used on the calling thread. The lifetime of `emit` should not extend over the duration of the invocation of `withEmit`.

Any asynchronous communication should be best done with [channels](channels.md). You can then manually any elements received from a channel to `emit`, or use e.g. `FlowEmit.channelToEmit`.

## Transforming flows: basics

Multiple transformation stages can be added to a flow, each time returning a new `Flow` instance, describing the extended pipeline. As before, no elements are emitted or transformed until the flow is run, as flows are lazy. There's a number of pre-defined transformation stages, many of them similar in function to corresponding methods on Scala's collections:
Expand Down

0 comments on commit 4331f5e

Please sign in to comment.