Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added support for Ampq v1.0 #38

Merged
merged 8 commits into from
Jan 8, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>

<IsPackable>false</IsPackable>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.Streams.TestKit" Version="1.3.9" />
<PackageReference Include="Akka.TestKit.Xunit2" Version="1.3.9" />
<PackageReference Include="FluentAssertions" Version="5.4.2" />
<PackageReference Include="Microsoft.NET.Test.Sdk" Version="15.8.0" />
<PackageReference Include="xunit" Version="2.4.0" />
<PackageReference Include="xunit.runner.visualstudio" Version="2.4.0" />
<DotNetCliToolReference Include="dotnet-xunit" Version="2.3.1" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\Akka.Streams.Amqp.V1\Akka.Streams.Amqp.V1.csproj" />
</ItemGroup>

</Project>
54 changes: 54 additions & 0 deletions Amqp.V1/Akka.Streams.Amqp.V1.Tests/AmqpConnectorsTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
using Akka.Serialization;
using Akka.Streams.Amqp.V1.Dsl;
using Akka.Streams.Dsl;
using Amqp;
using System;
using Xunit;

namespace Akka.Streams.Amqp.V1.Tests
{
public class AmqpConnectorsTest : Akka.TestKit.Xunit2.TestKit
{
private readonly Serializer serializer;
private readonly Address address;
private readonly ActorMaterializer materializer;

public AmqpConnectorsTest()
{
materializer = ActorMaterializer.Create(Sys);
serializer = Sys.Serialization.FindSerializerForType(typeof(string));
address = new Address("amqp://guest:guest@localhost:5672");
}

[Fact]
public void Publish_and_consume_elements_through_a_simple_queue_again_in_the_same_process()
{
Connection connection = new Connection(address);
Session session = new Session(connection);

var queueName = "simple-queue-test" + Guid.NewGuid();
var senderlinkName = "amqp-conn-test-sender";
var receiverlinkName = "amqp-conn-test-receiver";

//create sink and source
var amqpSink = AmpqSink.Create(new NamedQueueSinkSettings<string>(session, senderlinkName, queueName, serializer));
var amqpSource = AmpqSource.Create(new NamedQueueSourceSettings<string>(session, receiverlinkName, queueName, 200, serializer));

//run sink
var input = new[] { "one", "two", "three", "four", "five" };
Source.From(input).RunWith(amqpSink, materializer).Wait();

//run source
var result = amqpSource
.Take(input.Length)
.RunWith(Sink.Seq<string>(), materializer);

result.Wait(TimeSpan.FromSeconds(30));
Assert.True(result.IsCompleted);
Assert.Equal(input, result.Result);

session.Close();
connection.Close();
}
}
}
47 changes: 47 additions & 0 deletions Amqp.V1/Akka.Streams.Amqp.V1.Tests/SimpleAmqpTest.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
using Amqp;
using Amqp.Framing;
using Amqp.Types;
using System.Threading.Tasks;
using Xunit;

namespace Akka.Streams.Amqp.V1.Tests
{
public class SimpleAmqpTest
{
[Fact]
public async Task TestHelloWorld()
{
//strange, works using regular activeMQ and the amqp test broker from here: http://azure.github.io/amqpnetlite/articles/hello_amqp.html
//but this does not work in ActiveMQ Artemis
Address address = new Address("amqp://guest:guest@localhost:5672");
Connection connection = await Connection.Factory.CreateAsync(address);
Session session = new Session(connection);

Message message = new Message("Hello AMQP");

Target target = new Target
{
Address = "q1",
Capabilities = new Symbol[] { new Symbol("queue") }
};

SenderLink sender = new SenderLink(session, "sender-link", target, null);
await sender.SendAsync(message);

Source source = new Source
{
Address = "q1",
Capabilities = new Symbol[] { new Symbol("queue") }
};

ReceiverLink receiver = new ReceiverLink(session, "receiver-link", source, null);
message = await receiver.ReceiveAsync();
receiver.Accept(message);

await sender.CloseAsync();
await receiver.CloseAsync();
await session.CloseAsync();
await connection.CloseAsync();
}
}
}
13 changes: 13 additions & 0 deletions Amqp.V1/Akka.Streams.Amqp.V1/Akka.Streams.Amqp.V1.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>netcoreapp2.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Akka.Streams" Version="1.3.9" />
<PackageReference Include="AMQPNetLite.Core" Version="2.1.3" />
<PackageReference Include="AMQPNetLite.Serialization" Version="2.1.3" />
</ItemGroup>

</Project>
63 changes: 63 additions & 0 deletions Amqp.V1/Akka.Streams.Amqp.V1/AmqpSinkStage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
using Akka.Streams.Stage;
using Amqp;
using System.Threading.Tasks;

namespace Akka.Streams.Amqp.V1
{
public sealed class AmqpSinkStage<T> : GraphStageWithMaterializedValue<SinkShape<T>, Task>
{
public Inlet<T> In { get; }
public override SinkShape<T> Shape { get; }
public IAmpqSinkSettings<T> AmpqSourceSettings { get; }

public override ILogicAndMaterializedValue<Task> CreateLogicAndMaterializedValue(Attributes inheritedAttributes)
{
var promise = new TaskCompletionSource<Done>();
var logic = new AmqpSinkStageLogic(this, promise, Shape);
return new LogicAndMaterializedValue<Task>(logic, promise.Task);
}

public AmqpSinkStage(IAmpqSinkSettings<T> ampqSourceSettings)
{
In = new Inlet<T>("AmqpSink.in");
Shape = new SinkShape<T>(In);
AmpqSourceSettings = ampqSourceSettings;
}

private class AmqpSinkStageLogic : GraphStageLogic
{
private readonly AmqpSinkStage<T> stage;
private readonly TaskCompletionSource<Done> promise;
private readonly SenderLink sender;

public AmqpSinkStageLogic(AmqpSinkStage<T> amqpSinkStage, TaskCompletionSource<Done> promise, SinkShape<T> shape) : base(shape)
{
stage = amqpSinkStage;
this.promise = promise;
sender = amqpSinkStage.AmpqSourceSettings.GetSenderLink();

SetHandler(stage.In, () =>
{
var elem = Grab(stage.In);
sender.Send(new Message(amqpSinkStage.AmpqSourceSettings.GetBytes(elem)));
Pull(stage.In);
},
onUpstreamFinish: () => promise.SetResult(Done.Instance),
onUpstreamFailure: ex => promise.SetException(ex)
);
}

public override void PreStart()
{
base.PreStart();
Pull(stage.In);
}

public override void PostStop()
{
sender.Close();
base.PostStop();
}
}
}
}
96 changes: 96 additions & 0 deletions Amqp.V1/Akka.Streams.Amqp.V1/AmqpSourceStage.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
using Akka.Streams.Amqp.V1.Util;
using Akka.Streams.Stage;
using Akka.Streams.Supervision;
using Amqp;
using Amqp.Framing;
using Amqp.Types;
using System;
using System.Collections.Generic;

namespace Akka.Streams.Amqp.V1
{
public sealed class AmqpSourceStage<T> : GraphStage<SourceShape<T>>
{
public override SourceShape<T> Shape { get; }
public Outlet<T> Out { get; }
public IAmqpSourceSettings<T> AmqpSourceSettings { get; }
protected override GraphStageLogic CreateLogic(Attributes inheritedAttributes) => new AmqpSourceStageLogic(this, inheritedAttributes);

public AmqpSourceStage(IAmqpSourceSettings<T> amqpSourceSettings)
{
Out = new Outlet<T>("AmqpSource.Out");
Shape = new SourceShape<T>(Out);
AmqpSourceSettings = amqpSourceSettings;
}

private class AmqpSourceStageLogic : GraphStageLogic
{
private readonly Outlet<T> outlet;
private readonly IAmqpSourceSettings<T> ampqSourceSettings;
private readonly ReceiverLink receiver;
private readonly Decider decider;
private readonly Queue<Message> queue = new Queue<Message>();

public AmqpSourceStageLogic(AmqpSourceStage<T> stage, Attributes attributes) : base(stage.Shape)
{
outlet = stage.Out;
ampqSourceSettings = stage.AmqpSourceSettings;
receiver = stage.AmqpSourceSettings.GetReceiverLink();
decider = attributes.GetDeciderOrDefault();

SetHandler(outlet, () =>
{
if (queue.TryDequeue(out Message msg))
{
PushMessage(msg);
}
}, onDownstreamFinish: CompleteStage);
}

public override void PreStart()
{
base.PreStart();

var consumerCallback = GetAsyncCallback<Message>(HandleDelivery);
receiver.Start(ampqSourceSettings.Credit, (_, m) => consumerCallback.Invoke(m));
}

private void HandleDelivery(Message message)
{
queue.Enqueue(message);
//as callback could be called concurrently try to dequeue
//a pull can be waiting for the message
if (IsAvailable(outlet) && queue.TryDequeue(out Message msg))
{
PushMessage(msg);
}
}

private void PushMessage(Message message)
{
T obj = default(T);
try
{
obj = ampqSourceSettings.Convert(message);
receiver.Accept(message);
}
catch (Exception e)
{
if (decider(e) == Directive.Stop)
{
receiver.Reject(message, new Error(new Symbol(e.Message)));
FailStage(e);
}
return;
}
Push(outlet, obj);
}

public override void PostStop()
{
receiver.Close();
base.PostStop();
}
}
}
}
13 changes: 13 additions & 0 deletions Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSink.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
using Akka.Streams.Dsl;
using System.Threading.Tasks;

namespace Akka.Streams.Amqp.V1.Dsl
{
public static class AmpqSink
{
public static Sink<T, Task> Create<T>(IAmpqSinkSettings<T> sourceSettings)
{
return Sink.FromGraph(new AmqpSinkStage<T>(sourceSettings));
}
}
}
12 changes: 12 additions & 0 deletions Amqp.V1/Akka.Streams.Amqp.V1/Dsl/AmpqSource.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
using Akka.Streams.Dsl;

namespace Akka.Streams.Amqp.V1.Dsl
{
public static class AmpqSource
{
public static Source<T, NotUsed> Create<T>(IAmqpSourceSettings<T> sourceSettings)
{
return Source.FromGraph(new AmqpSourceStage<T>(sourceSettings));
}
}
}
11 changes: 11 additions & 0 deletions Amqp.V1/Akka.Streams.Amqp.V1/IAmpqSinkSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Akka.IO;
using Amqp;

namespace Akka.Streams.Amqp.V1
{
public interface IAmpqSinkSettings<T>
{
SenderLink GetSenderLink();
byte[] GetBytes(T obj);
}
}
11 changes: 11 additions & 0 deletions Amqp.V1/Akka.Streams.Amqp.V1/IAmqpSourceSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
using Amqp;

namespace Akka.Streams.Amqp.V1
{
public interface IAmqpSourceSettings<T>
{
ReceiverLink GetReceiverLink();
int Credit { get; }
T Convert(Message message);
}
}
39 changes: 39 additions & 0 deletions Amqp.V1/Akka.Streams.Amqp.V1/NamedQueueSinkSettings.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,39 @@
using Akka.IO;
using Amqp;
using Akka.Serialization;
using Amqp.Framing;
using Amqp.Types;

namespace Akka.Streams.Amqp.V1
{
public class NamedQueueSinkSettings<T> : IAmpqSinkSettings<T>
{
private readonly Session session;
private readonly string linkName;
private readonly string queueName;
private readonly Serializer serializer;

public NamedQueueSinkSettings(
Session session,
string linkName,
string queueName,
Serializer serializer)
{
this.session = session;
this.linkName = linkName;
this.queueName = queueName;
this.serializer = serializer;
}

public byte[] GetBytes(T obj)
{
return serializer.ToBinary(obj);
}

public SenderLink GetSenderLink() => new SenderLink(session, linkName, new Target
{
Address = queueName,
Capabilities = new Symbol[] { new Symbol("queue") }
}, null);
}
}
Loading