-
Notifications
You must be signed in to change notification settings - Fork 40
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Added support for Ampq v1.0 #38
Conversation
@AndreSteenbergen so is this going to publish as its own independent package? |
I think it should, as amqp 0.9 (RabbitMq and the likes) and amqp 1.0 are incompatible. With RabbitMQ being quite popular, I don't think it is wise to remove 0.9 support. |
BTW this pull request is just a first start to support AMQP 1.0. I hope good feedback can be given to the code, I hope this contributes something . |
This pull request was made to support this: #37 |
Ah, got it. CC @bobanco - any thoughts on this? |
public sealed class AmqpSinkStage<T> : GraphStageWithMaterializedValue<SinkShape<T>, Task> | ||
{ | ||
public readonly Inlet<T> In = new Inlet<T>("AmqpSink.in"); | ||
public override SinkShape<T> Shape => new SinkShape<T>(In); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's most likely that the getter is called multiple times during materialization, i.e. wenn calling .Run
, so this should be set once in the constructor.
try | ||
{ | ||
var elem = Grab(stage.In); | ||
sender.Send(new Message(amqpSinkStage.AmpqSourceSettings.GetBytes(elem))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is this a blocking call ?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
There is an async version. but should I await it, or just continue?
Pull(stage.In); | ||
} catch (Exception e) | ||
{ | ||
Debugger.Break(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Do you forgot to remove it or because it's still WIP ?
|
||
public override void PostStop() | ||
{ | ||
promise.TrySetException(new ApplicationException("stage stopped unexpectedly")); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If the stream fails onUpstreamFailure
is called, so I'm not sure why we should try to "throw" another exception from her.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Anyways, why ApplicationException
? I believe MS encouraged to not use it many years ago.
AmqpSourceSettings = amqpSourceSettings; | ||
} | ||
|
||
public override SourceShape<T> Shape => new SourceShape<T>(Out); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above, set once via constructor
try | ||
{ | ||
Push(outlet, ampqSourceSettings.Convert(message)); | ||
receiver.Accept(message); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Is it a good idea to immediately accept the message ?
What if I would want to accept it at the end of the stream because processing could fail later in the chain and if so I want to try to process it a second time?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How would that work? Sending a reference to the receiverlink with the message.
} | ||
catch (Exception e) | ||
{ | ||
receiver.Reject(message, new Error(new Symbol(e.Message))); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Not sure about the semantics of Reject
but would it maybe make sense to sync it with the result of the decider ?
E.g. if decider returns Resume
or Restart
one maybe doesn't want to reject the message but instead try it another time ?
{ | ||
public static class AmpqSink<T> | ||
{ | ||
public static Sink<T, Task> Create(IAmpqSinkSettings<T> sourceSettings) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would move the T
into the method definition, in theory you should be able to call AmpqSink.Create(settings)
because the T
could be inferred from the settings.
{ | ||
public static class AmpqSource<T> | ||
{ | ||
public static Source<T, NotUsed> Create(IAmqpSourceSettings<T> sourceSettings) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
As above, move the T
this.serializer = serializer; | ||
} | ||
|
||
public T Convert(Message message) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Move {
into the next line ;-)
@AndreSteenbergen left you a few remarks,. One more general question, aren't there any async methods for the queue, e.g. |
public AmqpConnectorsTest() | ||
{ | ||
materializer = ActorMaterializer.Create(Sys); | ||
var serialization = Sys.Serialization; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why this variable?
public override void PostStop() | ||
{ | ||
base.PostStop(); | ||
receiver.Close(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Are you sure we should call the base method first?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
thx, nope
public static Decider GetDeciderOrDefault(this Attributes attributes) | ||
{ | ||
var attr = attributes.GetAttribute<ActorAttributes.SupervisionStrategy>(null); | ||
return attr != null ? attr.Decider : Deciders.StoppingDecider; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return attr?.attr.Decider ?? Deciders.StoppingDecider
?
Tested the latest commit using these brokers:
All seem to work now. |
Added an event handler to the subscription to raise event when topic eof reached
…m actions" This reverts commit 44e63db.
@marcpiechura What do you think after the updates? |
@AndreSteenbergen @marcpiechura @vasily-kirichenko any word on this? I'd like to push out the Akka.Streams.Kafka package but if this is ready for merge, we can do both at the same time. |
Do the two tests cover all this PR functionality? |
Sorry was away on vacation; I think all functionality is captured. With amqp 1 the server implements logic, instead of the client. I couldn't think of more tests besides receiving what was send. |
@marcpiechura what is the status of this PR? Could we merge it? |
Im merging this :P |
@Danthar thank you https://www.youtube.com/watch?v=SxMPTEeoShU Let's prep a new release here, along with the updated Kafka stuff if that's ready to go |
No description provided.