Skip to content

Commit

Permalink
Wrap c_scheduled_write in a try-catch to not throw from C side
Browse files Browse the repository at this point in the history
  • Loading branch information
quinnj committed Sep 19, 2024
1 parent a1ddcb7 commit 6aec436
Showing 1 changed file with 38 additions and 36 deletions.
74 changes: 38 additions & 36 deletions src/sockets/client.jl
Original file line number Diff line number Diff line change
Expand Up @@ -224,45 +224,47 @@ function schedule_channel_task(channel, task_fn, arg, type_tag)
end

function c_scheduled_write(channel_task, arg, status)
socket = arg.socket
if status == Int(AWS_TASK_STATUS_RUN_READY)
n = arg.n
socket.debug && @info "c_scheduled_write: writing $n bytes"
buf = aws_byte_buf(0, pointer(socket.writebuf.data), n, C_NULL)
bytes_written = 0
while bytes_written < n
msgptr = aws_channel_acquire_message_from_pool(socket.channel, AWS_IO_MESSAGE_APPLICATION_DATA, n - bytes_written)
if msgptr == C_NULL
socket.debug && @error "c_scheduled_write: failed to acquire message from pool"
close(socket.ch, sockerr("failed to acquire message from pool"))
@goto done
end
msg = StructRef(msgptr)
data = Ref(msg.message_data)
cap = data[].capacity
cursor = Ref(aws_byte_cursor(cap, buf.buffer + bytes_written))
GC.@preserve data cursor begin
aws_byte_buf_append(data, cursor)
msg.message_data = data[]
end
socket.debug && @info "c_scheduled_write: sending $(data[].len) bytes in message"
if aws_channel_slot_send_message(socket.slot, msgptr, AWS_CHANNEL_DIR_WRITE) != 0
aws_mem_release(msg.allocator, msgptr)
socket.debug && @error "c_scheduled_write: failed to send message"
close(socket.ch, sockerr("failed to send message"))
@goto done
try
socket = arg.socket
if status == Int(AWS_TASK_STATUS_RUN_READY)
n = arg.n
socket.debug && @info "c_scheduled_write: writing $n bytes"
buf = aws_byte_buf(0, pointer(socket.writebuf.data), n, C_NULL)
bytes_written = 0
while bytes_written < n
msgptr = aws_channel_acquire_message_from_pool(socket.channel, AWS_IO_MESSAGE_APPLICATION_DATA, n - bytes_written)
if msgptr == C_NULL
socket.debug && @error "c_scheduled_write: failed to acquire message from pool"
close(socket.ch, sockerr("failed to acquire message from pool"))
@goto done
end
msg = StructRef(msgptr)
data = Ref(msg.message_data)
cap = data[].capacity
cursor = Ref(aws_byte_cursor(cap, buf.buffer + bytes_written))
GC.@preserve data cursor begin
aws_byte_buf_append(data, cursor)
msg.message_data = data[]
end
socket.debug && @info "c_scheduled_write: sending $(data[].len) bytes in message"
if aws_channel_slot_send_message(socket.slot, msgptr, AWS_CHANNEL_DIR_WRITE) != 0
aws_mem_release(msg.allocator, msgptr)
socket.debug && @error "c_scheduled_write: failed to send message"
close(socket.ch, sockerr("failed to send message"))
@goto done
end
bytes_written += cap
end
bytes_written += cap
put!(socket.ch, :write_completed)
else
socket.debug && @warn "c_scheduled_write: task cancelled"
close(socket.ch, sockerr("task cancelled"))
end
else
socket.debug && @warn "c_scheduled_write: task cancelled"
close(socket.ch, sockerr("task cancelled"))
@goto done
end
put!(socket.ch, :write_completed)
@label done
aws_mem_release(default_aws_allocator(), channel_task)
socket.debug && @info "c_scheduled_write: write completed"
finally
aws_mem_release(default_aws_allocator(), channel_task)
socket.debug && @info "c_scheduled_write: write completed"
end
return
end

Expand Down

0 comments on commit 6aec436

Please sign in to comment.