Skip to content

Implement UringSystem and FS2 Sockets using netty io_uring API#78

Open
antoniojimeneznieto wants to merge 209 commits into
armanbilge:feature/jvmfrom
antoniojimeneznieto:feature/jvm
Open

Implement UringSystem and FS2 Sockets using netty io_uring API#78
antoniojimeneznieto wants to merge 209 commits into
armanbilge:feature/jvmfrom
antoniojimeneznieto:feature/jvm

Conversation

@antoniojimeneznieto

@antoniojimeneznieto antoniojimeneznieto commented May 30, 2023

Copy link
Copy Markdown
Collaborator

This pull request aims to adapt reusable code from Scala Native to Scala JVM. To provide compatibility with the uring library in JVM, we want to import functions from the Netty io_uring (https://github.com/netty/netty-incubator-transport-io_uring).

Plan:

  • Copy all the reusable code from Native to JVM
  • Import all the Netty functions for compatibility with the uring library
  • Reimplement the Native code in Scala JVM using Netty
    • Implement UringSystem
    • Implement UringSocket
    • Implement UringSocketGroup
    • Implement UringNetwork

@antoniojimeneznieto

Copy link
Copy Markdown
Collaborator Author

If I'm not mistaken, we are going to be using netty's Native class. However, it is not a public class. Do we make a fork of Netty and make it public in our version so we can use it ?

@armanbilge

Copy link
Copy Markdown
Owner

For now, we can create a "backdoor" by adding some code in the io.netty.incubator.channel.uring package that publicly exposes those methods so that we can use them.

@antoniojimeneznieto

antoniojimeneznieto commented Jun 5, 2023

Copy link
Copy Markdown
Collaborator Author

I wanted to see if I am on the right track 🤔. The next goal is to implement the UringSystem Poller, for that we will need to replace:
Ptr[io_uring] -> RingBuffer
Ptr[io_uring_sqe] -> IOUringSubmissionQueue
Ptr[Ptr[io_uring_sqe] -> List[IOUringSubmissionQueue]

@armanbilge

Copy link
Copy Markdown
Owner

That sounds right.

Ptr[Ptr[io_uring_sqe] -> List[IOUringSubmissionQueue]

Not sure about this one. Which method is this?

@antoniojimeneznieto

antoniojimeneznieto commented Jun 5, 2023

Copy link
Copy Markdown
Collaborator Author

In the processCqes:
def processCqes(_cqes: Ptr[Ptr[io_uring_cqe]])

@armanbilge

Copy link
Copy Markdown
Owner

Ah, you can access that one with the IOUringCompletionQueue.

SQE = submission queue entry (where you submit I/O work)
CQE = completion queue entry (where you collect results of completed work)

@antoniojimeneznieto

Copy link
Copy Markdown
Collaborator Author

Maybe I'm forgetting something but we don't seem to have access to FileDescriptorPoller and FileDescriptorPollHandle. What should we do 🤔 ?

@armanbilge

Copy link
Copy Markdown
Owner

Maybe I'm forgetting something but we don't seem to have access to FileDescriptorPoller and FileDescriptorPollHandle. What should we do 🤔 ?

These only make sense for Scala Native, you don't need to implement them for JVM :) you can delete it.

@antoniojimeneznieto

Copy link
Copy Markdown
Collaborator Author

Alright I see thanks :)

@antoniojimeneznieto

Copy link
Copy Markdown
Collaborator Author

I am looking for the equivalent to io_uring_prep_cancel64 for implementing the cancel method in the ApiImpl class and I am not completely sure, is it addPollRemove from IOUringSubmissionQueue 🤔 ?

@armanbilge

Copy link
Copy Markdown
Owner

No, that's something different, I wonder if Netty is missing it 🤔

But it doesn't matter, you can implement it manually using this method:

https://github.com/netty/netty-incubator-transport-io_uring/blob/c17c887b63bef590b83cc2f241a7ebbd16a9fd56/transport-classes-io_uring/src/main/java/io/netty/incubator/channel/uring/IOUringSubmissionQueue.java#L113-L114

Here is how liburing does it.
https://github.com/axboe/liburing/blob/b4ee3108b93f7e4602430246236d14978abad085/src/include/liburing.h#L632-L638

@armanbilge armanbilge left a comment

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ok, so getting serious about landing this ...

If you don't mind one last request: can we make an effort to deduplicate some of code and share between JVM/Native?

For implementation stuff, only if it's easy, I don't care too much (maybe for IOExceptionHelper, UringApp, implicits).

But ... I think we should try hard to share the tests, so we are running identical test suites on both platforms. We're not doing anything JVM-specific, hopefully?


import java.net.ProtocolFamily

private final class UringDatagramSocketGroup[F[_]] extends DatagramSocketGroup[F] {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can work on this in a follow-up PR :)

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you don't mind one last request: can we make an effort to deduplicate some of code and share between JVM/Native?

Great idea! I'll handle it :)

But ... I think we should try hard to share the tests, so we are running identical test suites on both platforms. We're not doing anything JVM-specific, hopefully?

I think we aren't, so it should be easy to do.

Comment thread build.sbt Outdated
Comment thread build.sbt Outdated
Comment on lines +132 to +135
_ <- F.delay {
buffer.clear()
buffer.writeBytes(bytes.toArray)
}

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we are re-using the same buffer for reading and writing? This is a problem if there are concurrent reads and writes, so we should use two buffers.

Also, I think we need logic to increase the buffer size for larger reads/writes?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Absolutely, my bad. Right now we would have two buffers with a predetermined size, we could create the buffer in the read/write itself depending on the number of bytes we want to send. Another option could be to have a default size and if the message is bigger than that size increase the buffer ?

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Another option could be to have a default size and if the message is bigger than that size increase the buffer

Yes, this is how it's currently working in the other implementations: we keep replacing the buffer with a bigger one as-needed.

def createBuffer[F[_]: Sync](size: Int): Resource[F, ByteBuf] =
Resource.make(
Sync[F].delay(UnpooledByteBufAllocator.DEFAULT.directBuffer(size))
)(buf => Sync[F].delay(if (buf.refCnt() > 0) { val _ = buf.release() }))

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, what happens if buf.refCnt() == 0, who is responsible for cleaning it up?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I might be wrong but I think when the counter reaches 0 it is automatically deallocated by netty and if refCnt() returns 0 it has already been deallocated. However, I realized it should be:

buf.release(buf.refCnt()) to decrease the counter to 0 and be deallocated properly.

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hmm, I think that makes more sense.

Well I won't pretend to really understand this 😁 let's just make Netty go away sooner rather than later 😉

Comment thread uring/jvm/src/main/scala/fs2/io/uring/unsafe/util.scala Outdated
linuxSocket => closeSocket(ring, linuxSocket.fd()).to
)

// private[this] def uringOpenSocket(ring: Uring, ipv6: Boolean): Resource[F, Int] = {

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use this?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, the problem is that the UringSocket depends on the LinuxSocket to get the localAddress:

def localAddress: F[SocketAddress[IpAddress]] = F.delay(SocketAddress.fromInetSocketAddress(linuxSocket.getLocalAddress()))

When I implemented it I didn't find any API in netty to get the localAddress from the fd that UringOpenSocket gives us. I was planning to implement it once we remove netty but I'm going to check it, maybe this time I'll find something 🤞

Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good point! In that case it's done in a moment. Thanks Arman 😃

antoniojimeneznieto added a commit to antoniojimeneznieto/cats-effect that referenced this pull request May 5, 2026
Adapts the Native UringSystem to the multi-poller runtime that Scala
Native 0.5+ enables. Mirrors the structure of the JVM prototype from
GSoC (armanbilge/fs2-io_uring#78)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants