Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions fs-constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ enum class Seek {
enum class SocketType {
Stream,
Datagram,
SeqPacket
};

const int CURRENT_DIRECTORY = -100;
Expand Down
4 changes: 4 additions & 0 deletions fs-descriptor.h
Original file line number Diff line number Diff line change
Expand Up @@ -277,6 +277,10 @@ struct SocketDescriptor : FileDescriptor {
return _socket->peer();
}

Socket &peerHandle() {
return _socket->peerHandle();
}

const Socket::Address &address() const {
return _socket->address();
}
Expand Down
247 changes: 190 additions & 57 deletions fs-file.h
Original file line number Diff line number Diff line change
Expand Up @@ -399,6 +399,7 @@ struct Socket : File {
}

virtual Socket &peer() = 0;
virtual Socket &peerHandle() = 0;

virtual bool canReceive( size_t ) const = 0;
virtual bool canConnect() const = 0;
Expand Down Expand Up @@ -434,87 +435,109 @@ inline void swap( Socket::Address &lhs, Socket::Address &rhs ) {
lhs.swap( rhs );
}

struct SocketStream : Socket {
struct ReliableSocket : Socket {

SocketStream() :
_peer( nullptr ),
_stream( 1024 ),
_passive( false ),
_ready( false ),
_limit( 0 )
ReliableSocket() :
_peer( nullptr ),
_passive( false ),
_ready( false ),
_limit( 0 )
{}

SocketStream( Node partner ) :
_peerHandle( std::move( partner ) ),
_peer( _peerHandle->data()->as< SocketStream >() ),
_stream( 1024 ),
_passive( true ),
_ready( true ),
_limit( 0 )
{}
ReliableSocket( Node partner ) :
_peerHandle( std::move( partner ) ),
_peer( _peerHandle->data()->as< ReliableSocket >() ),
_passive(false ),
_ready( true ),
_limit( 0 )
{
_peer->_peer = this;
_peer->_ready = true;
}

Socket &peer() override {
virtual Socket &peer() override {
if ( !_peer )
throw Error( ENOTCONN );
return *_peer;
}

virtual Socket &peerHandle() override {
if ( !_peer || !_ready )
throw Error( ENOTCONN );
return *_peerHandle->data()->as<Socket>();
}

void abort() override {
_peerHandle.reset();
_peer = nullptr;
}

void listen( int limit ) override {
void setPeerHandle(Node handle) {
_peerHandle = std::move(handle);
}

bool canConnect() const override {
return _passive && !closed();
}

virtual void listen( int limit ) override {
_passive = true;
_limit = limit;
}
Node accept() override {

virtual Node accept() override {
if ( !_passive )
throw Error( EINVAL );

// progress or deadlock
while ( _backlog.empty() )
FS_MAKE_INTERRUPT();

Node result( std::move( _backlog.front() ) );
Node client( std::move( _backlog.front() ) );
_backlog.pop();
return result;
return client;
}

void connected( Node self, Node model, bool allocateNew ) {
if ( _peer )
throw Error( EISCONN );
virtual void addBacklog( Node incomming ) override {
if ( _backlog.size() == _limit )
throw Error( ECONNREFUSED );
_backlog.push( std::move( incomming ) );
}

SocketStream *m = model->data()->as< SocketStream >();
protected:
Node _peerHandle;
ReliableSocket *_peer;
utils::Queue< Node > _backlog;
bool _passive;
bool _ready;
int _limit;
};

if ( allocateNew ) {
if ( !m->canConnect() )
throw Error( ECONNREFUSED );

_peerHandle = std::allocate_shared< INode >(
memory::AllocatorPure(),
Mode::GRANTS,
_peer = new( memory::nofail ) SocketStream( self )
);
struct SocketStream : ReliableSocket {

m->addBacklog( _peerHandle );
}
else {
_peerHandle = std::move( model );
_peer = m;
_peer->_peerHandle = std::move( self );
_peer->_peer = this;
}
}
SocketStream() :
_stream( 1024 )
{}

SocketStream( Node partner ) :
ReliableSocket(std::move(partner)),
_stream( 1024 )
{ }

void connected( Node self, Node model ) override {
connected( std::move( self ), std::move( model ), true );
}
if ( _peer )
throw Error( EISCONN );

void addBacklog( Node incomming ) override {
if ( _backlog.size() == _limit )
SocketStream *m = model->data()->as< SocketStream >();

if (!m)
throw Error( EBADF );
if ( !m->canConnect() )
throw Error( ECONNREFUSED );
_backlog.push( std::move( incomming ) );

m->addBacklog(std::move(self));
_peerHandle = std::move(model);
}

bool canRead() const override {
Expand All @@ -526,9 +549,6 @@ struct SocketStream : Socket {
bool canReceive( size_t amount ) const override {
return _stream.size() + amount <= _stream.capacity();
}
bool canConnect() const override {
return _passive && !closed();
}

void send( const char *buffer, size_t &length, Flags< flags::Message > fls ) override {
if ( !_peer )
Expand Down Expand Up @@ -567,10 +587,10 @@ struct SocketStream : Socket {
address = _peer->address();
}


void fillBuffer( const Address &, const char *, size_t & ) override {
throw Error( EPROTOTYPE );
}

void fillBuffer( const char *buffer, size_t &length ) override {
if ( closed() ) {
abort();
Expand All @@ -580,15 +600,123 @@ struct SocketStream : Socket {
length = _stream.push( buffer, length );
}


private:
Node _peerHandle;
SocketStream *_peer;
storage::Stream _stream;
bool _passive;
bool _ready;
utils::Queue< Node > _backlog;
int _limit;
};

struct SeqPacketSocket : ReliableSocket {

SeqPacketSocket() {}

SeqPacketSocket(Node partner) :
ReliableSocket(std::move(partner))
{}

void connected( Node self, Node model ) override {
if ( _peer )
throw Error( EISCONN );

SeqPacketSocket *m = model->data()->as< SeqPacketSocket >();

if (!m)
throw Error( EBADF );
if ( !m->canConnect() )
throw Error( ECONNREFUSED );

m->addBacklog(std::move(self));
_peerHandle = std::move(model);
}

bool canRead() const override {
return !_packets.empty();
}

bool canWrite() const override {
return _peer->canReceive(0);
}

bool canReceive( size_t ) const override {
return !closed();
}

void sendTo( const char *buffer, size_t &length, Flags< flags::Message > fls, Node ) override {
send( buffer, length, fls );
}

void send( const char *buffer, size_t &length, Flags< flags::Message > fls ) override {
if ( !_peer )
throw Error( ENOTCONN );

if ( !_peerHandle->mode().userWrite() )
throw Error( EACCES );

if ( fls.has( flags::Message::DontWait ) && !_peer->canReceive( length ) )
throw Error( EAGAIN );
_peer->fillBuffer(buffer, length);

}

void fillBuffer( const Address &, const char *, size_t & ) override {
throw Error( EPROTOTYPE );
}

void fillBuffer( const char *buffer, size_t &length ) override {
if ( closed() ) {
abort();
throw Error( ECONNRESET );
}

_packets.emplace( buffer, length );
}

void receive( char *buffer, size_t &length, Flags< flags::Message > fls, Address &address ) override {

if ( fls.has( flags::Message::DontWait ) && _packets.empty() )
throw Error( EAGAIN );

if ( !_peer && !closed() )
throw Error( ENOTCONN );

while ( _packets.empty() )
FS_MAKE_INTERRUPT();

length = _packets.front().read( buffer, length );
if ( !fls.has( flags::Message::Peek ) )
_packets.pop();

address = _peer->address();
}

private:
struct Packet {

Packet( const char *data, size_t length ) :
_data( data, data + length )
{}

Packet( const Packet & ) = delete;
Packet( Packet && ) = default;
Packet &operator=( Packet other ) {
swap( other );
return *this;
}

size_t read( char *buffer, size_t max ) const {
size_t result = std::min( max, _data.size() );
std::copy( _data.begin(), _data.begin() + result, buffer );
return result;
}

void swap( Packet &other ) {
using std::swap;
swap( _data, other._data );
}

private:
utils::Vector< char > _data;
};
utils::Queue< Packet > _packets;

};

struct SocketDatagram : Socket {
Expand All @@ -607,6 +735,11 @@ struct SocketDatagram : Socket {
throw Error( ENOTCONN );
}

Socket &peerHandle() override {
throw Error( EOPNOTSUPP );
}


bool canRead() const override {
return !_packets.empty();
}
Expand Down
Loading