diff --git a/fs-constants.h b/fs-constants.h index 004db4d..68a4e9f 100644 --- a/fs-constants.h +++ b/fs-constants.h @@ -19,6 +19,7 @@ enum class Seek { enum class SocketType { Stream, Datagram, + SeqPacket }; const int CURRENT_DIRECTORY = -100; diff --git a/fs-descriptor.h b/fs-descriptor.h index ef47230..8ecb981 100644 --- a/fs-descriptor.h +++ b/fs-descriptor.h @@ -277,6 +277,10 @@ struct SocketDescriptor : FileDescriptor { return _socket->peer(); } + Socket &peerHandle() { + return _socket->peerHandle(); + } + const Socket::Address &address() const { return _socket->address(); } diff --git a/fs-file.h b/fs-file.h index 416f56d..85d3d8a 100644 --- a/fs-file.h +++ b/fs-file.h @@ -399,6 +399,7 @@ struct Socket : File { } virtual Socket &peer() = 0; + virtual Socket &peerHandle() = 0; virtual bool canReceive( size_t ) const = 0; virtual bool canConnect() const = 0; @@ -434,41 +435,57 @@ inline void swap( Socket::Address &lhs, Socket::Address &rhs ) { lhs.swap( rhs ); } -struct SocketStream : Socket { +struct ReliableSocket : Socket { - SocketStream() : - _peer( nullptr ), - _stream( 1024 ), - _passive( false ), - _ready( false ), - _limit( 0 ) + ReliableSocket() : + _peer( nullptr ), + _passive( false ), + _ready( false ), + _limit( 0 ) {} - SocketStream( Node partner ) : - _peerHandle( std::move( partner ) ), - _peer( _peerHandle->data()->as< SocketStream >() ), - _stream( 1024 ), - _passive( true ), - _ready( true ), - _limit( 0 ) - {} + ReliableSocket( Node partner ) : + _peerHandle( std::move( partner ) ), + _peer( _peerHandle->data()->as< ReliableSocket >() ), + _passive(false ), + _ready( true ), + _limit( 0 ) + { + _peer->_peer = this; + _peer->_ready = true; + } - Socket &peer() override { + virtual Socket &peer() override { if ( !_peer ) throw Error( ENOTCONN ); return *_peer; } + virtual Socket &peerHandle() override { + if ( !_peer || !_ready ) + throw Error( ENOTCONN ); + return *_peerHandle->data()->as(); + } + void abort() override { _peerHandle.reset(); _peer = nullptr; } - void listen( int limit ) override { + void setPeerHandle(Node handle) { + _peerHandle = std::move(handle); + } + + bool canConnect() const override { + return _passive && !closed(); + } + + virtual void listen( int limit ) override { _passive = true; _limit = limit; } - Node accept() override { + + virtual Node accept() override { if ( !_passive ) throw Error( EINVAL ); @@ -476,45 +493,51 @@ struct SocketStream : Socket { while ( _backlog.empty() ) FS_MAKE_INTERRUPT(); - Node result( std::move( _backlog.front() ) ); + Node client( std::move( _backlog.front() ) ); _backlog.pop(); - return result; + return client; } - void connected( Node self, Node model, bool allocateNew ) { - if ( _peer ) - throw Error( EISCONN ); + virtual void addBacklog( Node incomming ) override { + if ( _backlog.size() == _limit ) + throw Error( ECONNREFUSED ); + _backlog.push( std::move( incomming ) ); + } - SocketStream *m = model->data()->as< SocketStream >(); +protected: + Node _peerHandle; + ReliableSocket *_peer; + utils::Queue< Node > _backlog; + bool _passive; + bool _ready; + int _limit; +}; - if ( allocateNew ) { - if ( !m->canConnect() ) - throw Error( ECONNREFUSED ); - _peerHandle = std::allocate_shared< INode >( - memory::AllocatorPure(), - Mode::GRANTS, - _peer = new( memory::nofail ) SocketStream( self ) - ); +struct SocketStream : ReliableSocket { - m->addBacklog( _peerHandle ); - } - else { - _peerHandle = std::move( model ); - _peer = m; - _peer->_peerHandle = std::move( self ); - _peer->_peer = this; - } - } + SocketStream() : + _stream( 1024 ) + {} + + SocketStream( Node partner ) : + ReliableSocket(std::move(partner)), + _stream( 1024 ) + { } void connected( Node self, Node model ) override { - connected( std::move( self ), std::move( model ), true ); - } + if ( _peer ) + throw Error( EISCONN ); - void addBacklog( Node incomming ) override { - if ( _backlog.size() == _limit ) + SocketStream *m = model->data()->as< SocketStream >(); + + if (!m) + throw Error( EBADF ); + if ( !m->canConnect() ) throw Error( ECONNREFUSED ); - _backlog.push( std::move( incomming ) ); + + m->addBacklog(std::move(self)); + _peerHandle = std::move(model); } bool canRead() const override { @@ -526,9 +549,6 @@ struct SocketStream : Socket { bool canReceive( size_t amount ) const override { return _stream.size() + amount <= _stream.capacity(); } - bool canConnect() const override { - return _passive && !closed(); - } void send( const char *buffer, size_t &length, Flags< flags::Message > fls ) override { if ( !_peer ) @@ -567,10 +587,10 @@ struct SocketStream : Socket { address = _peer->address(); } - void fillBuffer( const Address &, const char *, size_t & ) override { throw Error( EPROTOTYPE ); } + void fillBuffer( const char *buffer, size_t &length ) override { if ( closed() ) { abort(); @@ -580,15 +600,123 @@ struct SocketStream : Socket { length = _stream.push( buffer, length ); } - private: - Node _peerHandle; - SocketStream *_peer; storage::Stream _stream; - bool _passive; - bool _ready; - utils::Queue< Node > _backlog; - int _limit; +}; + +struct SeqPacketSocket : ReliableSocket { + + SeqPacketSocket() {} + + SeqPacketSocket(Node partner) : + ReliableSocket(std::move(partner)) + {} + + void connected( Node self, Node model ) override { + if ( _peer ) + throw Error( EISCONN ); + + SeqPacketSocket *m = model->data()->as< SeqPacketSocket >(); + + if (!m) + throw Error( EBADF ); + if ( !m->canConnect() ) + throw Error( ECONNREFUSED ); + + m->addBacklog(std::move(self)); + _peerHandle = std::move(model); + } + + bool canRead() const override { + return !_packets.empty(); + } + + bool canWrite() const override { + return _peer->canReceive(0); + } + + bool canReceive( size_t ) const override { + return !closed(); + } + + void sendTo( const char *buffer, size_t &length, Flags< flags::Message > fls, Node ) override { + send( buffer, length, fls ); + } + + void send( const char *buffer, size_t &length, Flags< flags::Message > fls ) override { + if ( !_peer ) + throw Error( ENOTCONN ); + + if ( !_peerHandle->mode().userWrite() ) + throw Error( EACCES ); + + if ( fls.has( flags::Message::DontWait ) && !_peer->canReceive( length ) ) + throw Error( EAGAIN ); + _peer->fillBuffer(buffer, length); + + } + + void fillBuffer( const Address &, const char *, size_t & ) override { + throw Error( EPROTOTYPE ); + } + + void fillBuffer( const char *buffer, size_t &length ) override { + if ( closed() ) { + abort(); + throw Error( ECONNRESET ); + } + + _packets.emplace( buffer, length ); + } + + void receive( char *buffer, size_t &length, Flags< flags::Message > fls, Address &address ) override { + + if ( fls.has( flags::Message::DontWait ) && _packets.empty() ) + throw Error( EAGAIN ); + + if ( !_peer && !closed() ) + throw Error( ENOTCONN ); + + while ( _packets.empty() ) + FS_MAKE_INTERRUPT(); + + length = _packets.front().read( buffer, length ); + if ( !fls.has( flags::Message::Peek ) ) + _packets.pop(); + + address = _peer->address(); + } + +private: + struct Packet { + + Packet( const char *data, size_t length ) : + _data( data, data + length ) + {} + + Packet( const Packet & ) = delete; + Packet( Packet && ) = default; + Packet &operator=( Packet other ) { + swap( other ); + return *this; + } + + size_t read( char *buffer, size_t max ) const { + size_t result = std::min( max, _data.size() ); + std::copy( _data.begin(), _data.begin() + result, buffer ); + return result; + } + + void swap( Packet &other ) { + using std::swap; + swap( _data, other._data ); + } + + private: + utils::Vector< char > _data; + }; + utils::Queue< Packet > _packets; + }; struct SocketDatagram : Socket { @@ -607,6 +735,11 @@ struct SocketDatagram : Socket { throw Error( ENOTCONN ); } + Socket &peerHandle() override { + throw Error( EOPNOTSUPP ); + } + + bool canRead() const override { return !_packets.empty(); } diff --git a/fs-manager.cpp b/fs-manager.cpp index 61f425b..7a536ab 100644 --- a/fs-manager.cpp +++ b/fs-manager.cpp @@ -460,6 +460,9 @@ int Manager::socket( SocketType type, Flags< flags::Open > fl ) { case SocketType::Datagram: s = new( memory::nofail ) SocketDatagram; break; + case SocketType::SeqPacket: + s = new( memory::nofail ) SeqPacketSocket; + break; default: throw Error( EPROTONOSUPPORT ); } @@ -474,43 +477,59 @@ int Manager::socket( SocketType type, Flags< flags::Open > fl ) { } std::pair< int, int > Manager::socketpair( SocketType type, Flags< flags::Open > fl ) { - if ( type != SocketType::Stream ) + if ( type != SocketType::Stream && type != SocketType::SeqPacket ) throw Error( EOPNOTSUPP ); + Node client, server; + if (type == SocketType::Stream) { + SocketStream *cl = new( memory::nofail ) SocketStream; - SocketStream *cl = new( memory::nofail ) SocketStream; - - Node client = std::allocate_shared< INode >( - memory::AllocatorPure(), - Mode::GRANTS | Mode::SOCKET, - cl ); - Node server = std::allocate_shared< INode >( - memory::AllocatorPure(), - Mode::GRANTS | Mode::SOCKET, - new( memory::nofail ) SocketStream ); + client = std::allocate_shared< INode >( + memory::AllocatorPure(), + Mode::GRANTS | Mode::SOCKET, + cl ); + server = std::allocate_shared< INode >( + memory::AllocatorPure(), + Mode::GRANTS | Mode::SOCKET, + new( memory::nofail ) SocketStream(client) ); - cl->connected( client, server ); + cl->setPeerHandle(server); + }else { + SeqPacketSocket *cl = new( memory::nofail ) SeqPacketSocket; - return { - _getFileDescriptor( - std::allocate_shared< SocketDescriptor >( + client = std::allocate_shared< INode >( memory::AllocatorPure(), - server, - fl - ) - ), - _getFileDescriptor( - std::allocate_shared< SocketDescriptor >( + Mode::GRANTS | Mode::SOCKET, + cl ); + server = std::allocate_shared< INode >( memory::AllocatorPure(), - client, - fl + Mode::GRANTS | Mode::SOCKET, + new( memory::nofail ) SeqPacketSocket(client) ); + + cl->setPeerHandle(server); + } + + return { + _getFileDescriptor( + std::allocate_shared< SocketDescriptor >( + memory::AllocatorPure(), + server, + fl + ) + ), + _getFileDescriptor( + std::allocate_shared< SocketDescriptor >( + memory::AllocatorPure(), + client, + fl + ) ) - ) }; } void Manager::bind( int sockfd, Socket::Address address ) { auto sd = getSocket( sockfd ); + if (!sd) throw Error ( EDESTADDRREQ ); Node current; utils::String name = address.value(); std::tie( current, name ) = _findDirectoryOfFile( name ); @@ -536,12 +555,20 @@ void Manager::connect( int sockfd, const Socket::Address &address ) { int Manager::accept( int sockfd, Socket::Address &address ) { Node partner = getSocket( sockfd )->accept(); + if(partner->data()->as() == nullptr) { + throw Error ( EOPNOTSUPP ); + } address = partner->data()->as< Socket >()->address(); - + Socket *socket; + if(partner->data()->as() != nullptr) { + socket = new( memory::nofail ) SocketStream(std::move(partner)); + }else { + socket = new( memory::nofail ) SeqPacketSocket(std::move(partner)); + } return _getFileDescriptor( std::allocate_shared< SocketDescriptor >( memory::AllocatorPure(), - std::move( partner ), + std::allocate_shared(memory::AllocatorPure(),Mode::GRANTS | Mode::SOCKET, socket), flags::Open::NoFlags ) ); diff --git a/fs.cpp b/fs.cpp index 46cbc14..27fe5ba 100644 --- a/fs.cpp +++ b/fs.cpp @@ -865,6 +865,9 @@ int socket( int domain, int t, int protocol ) { case SOCK_DGRAM: type = SocketType::Datagram; break; + case SOCK_SEQPACKET: + type = SocketType::SeqPacket; + break; default: throw Error( EPROTONOSUPPORT ); } diff --git a/sys/stat.h b/sys/stat.h index 43486f1..4bf8dcb 100644 --- a/sys/stat.h +++ b/sys/stat.h @@ -25,7 +25,9 @@ #include #include "../bits/stat.h" +#if defined( __MAC_OS_X_VERSION_MAX_ALLOWED ) typedef __darwin_socklen_t socklen_t; +#endif #ifdef __cplusplus extern "C" { diff --git a/sys/types.h b/sys/types.h index 2da6d0d..bf509a0 100644 --- a/sys/types.h +++ b/sys/types.h @@ -9,7 +9,7 @@ #ifndef __divine__ #if defined( __MAC_OS_X_VERSION_MAX_ALLOWED ) typedef __darwin_socklen_t socklen_t; -#elif +#else typedef __socklen_t socklen_t; #endif #else