Skip to content

Commit

Permalink
Use Base.BufferStream instead of SimpleBufferStream
Browse files Browse the repository at this point in the history
  • Loading branch information
quinnj committed Sep 24, 2024
1 parent 0af9f6e commit cdbe591
Show file tree
Hide file tree
Showing 3 changed files with 7 additions and 13 deletions.
2 changes: 0 additions & 2 deletions Project.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,8 @@ version = "1.0.0"
LibAwsCommon = "c6e421ba-b5f8-4792-a1c4-42948de3ed9d"
LibAwsIO = "a5388770-19df-4151-b103-3d71de896ddf"
Logging = "56ddb016-857b-54e1-b83d-db4d58db5568"
SimpleBufferStream = "777ac1f9-54b0-4bf8-805c-2214025038e7"

[compat]
SimpleBufferStream = "1.2.0"
julia = "1.6"

[extras]
Expand Down
2 changes: 1 addition & 1 deletion src/AwsIO.jl
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ module AwsIO

module Sockets

using Logging, LibAwsCommon, LibAwsIO, SimpleBufferStream
using Logging, LibAwsCommon, LibAwsIO

struct SocketError <: Exception
msg::String
Expand Down
16 changes: 6 additions & 10 deletions src/sockets/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ mutable struct Client <: IO
channel::Ptr{aws_channel}
slot::Ptr{aws_channel_slot}
ch::Channel{Symbol}
readbuf::BufferStream
readbuf::Base.BufferStream
accumulated_read_count::Int
writelock::ReentrantLock
writebuf::IOBuffer
Expand Down Expand Up @@ -98,7 +98,7 @@ mutable struct Client <: IO
ssl_alpn_list
)
end
x = new(host, port, debug, tls, socket_options, tls_options, buffer_capacity, C_NULL, C_NULL, Channel{Symbol}(0), BufferStream(), 0, ReentrantLock(), PipeBuffer())
x = new(host, port, debug, tls, socket_options, tls_options, buffer_capacity, C_NULL, C_NULL, Channel{Symbol}(0), Base.BufferStream(), 0, ReentrantLock(), PipeBuffer())
GC.@preserve x begin
x.bootstrap = aws_socket_channel_bootstrap_options(
client_bootstrap,
Expand Down Expand Up @@ -313,10 +313,10 @@ Base.flush(sock::Client) = flush(sock.writebuf)

function maybe_increment_read_window(sock::Client, nread, from)
acc = sock.accumulated_read_count += nread
ba = length(sock.readbuf)
ba = bytesavailable(sock.readbuf)
slotobj = unsafe_load(sock.slot)
sock.debug && @info "($(objectid(sock))), ($(from)): maybe_increment_read_window: $nread bytes just read, $ba bytes available, accumulated increment read count $acc, $(slotobj.window_size) window size"
if acc >= 4096
if acc >= (sock.buffer_capacity ÷ 256)
aws_channel_slot_increment_read_window(sock.slot, acc)
sock.accumulated_read_count = 0
end
Expand Down Expand Up @@ -352,12 +352,8 @@ function Base.skip(sock::Client, n)
return ret
end

Base.bytesavailable(sock::Client) = length(sock.readbuf)

function Base.eof(sock::Client)
maybe_increment_read_window(sock, 0, "Base.eof")
eof(sock.readbuf)
end
Base.bytesavailable(sock::Client) = bytesavailable(sock.readbuf)
Base.eof(sock::Client) = eof(sock.readbuf)

Base.isopen(sock::Client) = sock.slot == C_NULL ? false : aws_socket_is_open(aws_socket_handler_get_socket(FieldRef(sock, :handler)))

Expand Down

0 comments on commit cdbe591

Please sign in to comment.