From 6207c58fd3ba3bb16729c5d6dc281934c2bcc521 Mon Sep 17 00:00:00 2001 From: SweeXordious Date: Thu, 8 Aug 2019 22:42:10 -0700 Subject: [PATCH 1/2] DHT_Example_For_Pushing_Values --- dht-for-values-storage/chat.pb.go | 976 ++++++++++++++++++++++++++++ dht-for-values-storage/chat.proto | 23 + dht-for-values-storage/flags.go | 24 + dht-for-values-storage/main.go | 156 +++++ dht-for-values-storage/package.json | 24 + dht-for-values-storage/protocol.go | 394 +++++++++++ dht-for-values-storage/pubsub.go | 55 ++ 7 files changed, 1652 insertions(+) create mode 100644 dht-for-values-storage/chat.pb.go create mode 100644 dht-for-values-storage/chat.proto create mode 100644 dht-for-values-storage/flags.go create mode 100644 dht-for-values-storage/main.go create mode 100644 dht-for-values-storage/package.json create mode 100644 dht-for-values-storage/protocol.go create mode 100644 dht-for-values-storage/pubsub.go diff --git a/dht-for-values-storage/chat.pb.go b/dht-for-values-storage/chat.pb.go new file mode 100644 index 0000000..6b357d3 --- /dev/null +++ b/dht-for-values-storage/chat.pb.go @@ -0,0 +1,976 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: chat.proto + +package main + +import ( + fmt "fmt" + io "io" + math "math" + + github_com_golang_protobuf_proto "github.com/golang/protobuf/proto" + proto "github.com/golang/protobuf/proto" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.ProtoPackageIsVersion2 // please upgrade the proto package + +type Request_Type int32 + +const ( + Request_SEND_MESSAGE Request_Type = 0 + Request_UPDATE_PEER Request_Type = 1 +) + +var Request_Type_name = map[int32]string{ + 0: "SEND_MESSAGE", + 1: "UPDATE_PEER", +} + +var Request_Type_value = map[string]int32{ + "SEND_MESSAGE": 0, + "UPDATE_PEER": 1, +} + +func (x Request_Type) Enum() *Request_Type { + p := new(Request_Type) + *p = x + return p +} + +func (x Request_Type) String() string { + return proto.EnumName(Request_Type_name, int32(x)) +} + +func (x *Request_Type) UnmarshalJSON(data []byte) error { + value, err := proto.UnmarshalJSONEnum(Request_Type_value, data, "Request_Type") + if err != nil { + return err + } + *x = Request_Type(value) + return nil +} + +func (Request_Type) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_8c585a45e2093e54, []int{0, 0} +} + +type Request struct { + Type *Request_Type `protobuf:"varint,1,req,name=type,enum=main.Request_Type" json:"type,omitempty"` + SendMessage *SendMessage `protobuf:"bytes,2,opt,name=sendMessage" json:"sendMessage,omitempty"` + UpdatePeer *UpdatePeer `protobuf:"bytes,3,opt,name=updatePeer" json:"updatePeer,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *Request) Reset() { *m = Request{} } +func (m *Request) String() string { return proto.CompactTextString(m) } +func (*Request) ProtoMessage() {} +func (*Request) Descriptor() ([]byte, []int) { + return fileDescriptor_8c585a45e2093e54, []int{0} +} +func (m *Request) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *Request) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_Request.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *Request) XXX_Merge(src proto.Message) { + xxx_messageInfo_Request.Merge(m, src) +} +func (m *Request) XXX_Size() int { + return m.Size() +} +func (m *Request) XXX_DiscardUnknown() { + xxx_messageInfo_Request.DiscardUnknown(m) +} + +var xxx_messageInfo_Request proto.InternalMessageInfo + +func (m *Request) GetType() Request_Type { + if m != nil && m.Type != nil { + return *m.Type + } + return Request_SEND_MESSAGE +} + +func (m *Request) GetSendMessage() *SendMessage { + if m != nil { + return m.SendMessage + } + return nil +} + +func (m *Request) GetUpdatePeer() *UpdatePeer { + if m != nil { + return m.UpdatePeer + } + return nil +} + +type SendMessage struct { + Data []byte `protobuf:"bytes,1,req,name=data" json:"data,omitempty"` + Created *int64 `protobuf:"varint,2,req,name=created" json:"created,omitempty"` + Id []byte `protobuf:"bytes,3,req,name=id" json:"id,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *SendMessage) Reset() { *m = SendMessage{} } +func (m *SendMessage) String() string { return proto.CompactTextString(m) } +func (*SendMessage) ProtoMessage() {} +func (*SendMessage) Descriptor() ([]byte, []int) { + return fileDescriptor_8c585a45e2093e54, []int{1} +} +func (m *SendMessage) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *SendMessage) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_SendMessage.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *SendMessage) XXX_Merge(src proto.Message) { + xxx_messageInfo_SendMessage.Merge(m, src) +} +func (m *SendMessage) XXX_Size() int { + return m.Size() +} +func (m *SendMessage) XXX_DiscardUnknown() { + xxx_messageInfo_SendMessage.DiscardUnknown(m) +} + +var xxx_messageInfo_SendMessage proto.InternalMessageInfo + +func (m *SendMessage) GetData() []byte { + if m != nil { + return m.Data + } + return nil +} + +func (m *SendMessage) GetCreated() int64 { + if m != nil && m.Created != nil { + return *m.Created + } + return 0 +} + +func (m *SendMessage) GetId() []byte { + if m != nil { + return m.Id + } + return nil +} + +type UpdatePeer struct { + UserHandle []byte `protobuf:"bytes,1,opt,name=userHandle" json:"userHandle,omitempty"` + XXX_NoUnkeyedLiteral struct{} `json:"-"` + XXX_unrecognized []byte `json:"-"` + XXX_sizecache int32 `json:"-"` +} + +func (m *UpdatePeer) Reset() { *m = UpdatePeer{} } +func (m *UpdatePeer) String() string { return proto.CompactTextString(m) } +func (*UpdatePeer) ProtoMessage() {} +func (*UpdatePeer) Descriptor() ([]byte, []int) { + return fileDescriptor_8c585a45e2093e54, []int{2} +} +func (m *UpdatePeer) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *UpdatePeer) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_UpdatePeer.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalTo(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *UpdatePeer) XXX_Merge(src proto.Message) { + xxx_messageInfo_UpdatePeer.Merge(m, src) +} +func (m *UpdatePeer) XXX_Size() int { + return m.Size() +} +func (m *UpdatePeer) XXX_DiscardUnknown() { + xxx_messageInfo_UpdatePeer.DiscardUnknown(m) +} + +var xxx_messageInfo_UpdatePeer proto.InternalMessageInfo + +func (m *UpdatePeer) GetUserHandle() []byte { + if m != nil { + return m.UserHandle + } + return nil +} + +func init() { + proto.RegisterEnum("main.Request_Type", Request_Type_name, Request_Type_value) + proto.RegisterType((*Request)(nil), "main.Request") + proto.RegisterType((*SendMessage)(nil), "main.SendMessage") + proto.RegisterType((*UpdatePeer)(nil), "main.UpdatePeer") +} + +func init() { proto.RegisterFile("chat.proto", fileDescriptor_8c585a45e2093e54) } + +var fileDescriptor_8c585a45e2093e54 = []byte{ + // 276 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x4c, 0x90, 0xc1, 0x4a, 0xf3, 0x40, + 0x14, 0x85, 0x3b, 0x93, 0x40, 0xe1, 0x26, 0xf4, 0xcf, 0x7f, 0x57, 0xb3, 0x0a, 0x21, 0x0b, 0x89, + 0x20, 0x41, 0xea, 0x13, 0x54, 0x3a, 0x28, 0x48, 0x25, 0x4c, 0xda, 0x75, 0x19, 0x3a, 0x17, 0x0d, + 0x68, 0x1a, 0x93, 0xc9, 0xa2, 0x6f, 0xe2, 0xfb, 0xb8, 0x71, 0xe9, 0x23, 0x48, 0x7c, 0x11, 0x49, + 0xaa, 0x36, 0xbb, 0x99, 0x73, 0xbe, 0xc3, 0xb9, 0xf7, 0x02, 0xec, 0x1e, 0xb5, 0x4d, 0xab, 0x7a, + 0x6f, 0xf7, 0xe8, 0x3e, 0xeb, 0xa2, 0x8c, 0xdf, 0x18, 0x4c, 0x15, 0xbd, 0xb4, 0xd4, 0x58, 0x3c, + 0x03, 0xd7, 0x1e, 0x2a, 0x12, 0x2c, 0xe2, 0xc9, 0x6c, 0x8e, 0x69, 0x0f, 0xa4, 0x3f, 0x66, 0xba, + 0x3e, 0x54, 0xa4, 0x06, 0x1f, 0xaf, 0xc0, 0x6b, 0xa8, 0x34, 0x2b, 0x6a, 0x1a, 0xfd, 0x40, 0x82, + 0x47, 0x2c, 0xf1, 0xe6, 0xff, 0x8f, 0x78, 0x7e, 0x32, 0xd4, 0x98, 0xc2, 0x4b, 0x80, 0xb6, 0x32, + 0xda, 0x52, 0x46, 0x54, 0x0b, 0x67, 0xc8, 0x04, 0xc7, 0xcc, 0xe6, 0x4f, 0x57, 0x23, 0x26, 0x3e, + 0x07, 0xb7, 0x2f, 0xc5, 0x00, 0xfc, 0x5c, 0xde, 0x2f, 0xb7, 0x2b, 0x99, 0xe7, 0x8b, 0x1b, 0x19, + 0x4c, 0xf0, 0x1f, 0x78, 0x9b, 0x6c, 0xb9, 0x58, 0xcb, 0x6d, 0x26, 0xa5, 0x0a, 0x58, 0x7c, 0x07, + 0xde, 0xa8, 0x18, 0x11, 0x5c, 0xa3, 0xad, 0x1e, 0x16, 0xf1, 0xd5, 0xf0, 0x46, 0x01, 0xd3, 0x5d, + 0x4d, 0xda, 0x92, 0x11, 0x3c, 0xe2, 0x89, 0xa3, 0x7e, 0xbf, 0x38, 0x03, 0x5e, 0x18, 0xe1, 0x0c, + 0x2c, 0x2f, 0x4c, 0x7c, 0x01, 0x70, 0x9a, 0x08, 0x43, 0x80, 0xb6, 0xa1, 0xfa, 0x56, 0x97, 0xe6, + 0xa9, 0x3f, 0x0d, 0x4b, 0x7c, 0x35, 0x52, 0xae, 0x83, 0xf7, 0x2e, 0x64, 0x1f, 0x5d, 0xc8, 0x3e, + 0xbb, 0x90, 0xbd, 0x7e, 0x85, 0x93, 0xef, 0x00, 0x00, 0x00, 0xff, 0xff, 0x34, 0xa3, 0x90, 0xc9, + 0x65, 0x01, 0x00, 0x00, +} + +func (m *Request) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *Request) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Type == nil { + return 0, new(github_com_golang_protobuf_proto.RequiredNotSetError) + } else { + dAtA[i] = 0x8 + i++ + i = encodeVarintChat(dAtA, i, uint64(*m.Type)) + } + if m.SendMessage != nil { + dAtA[i] = 0x12 + i++ + i = encodeVarintChat(dAtA, i, uint64(m.SendMessage.Size())) + n1, err := m.SendMessage.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n1 + } + if m.UpdatePeer != nil { + dAtA[i] = 0x1a + i++ + i = encodeVarintChat(dAtA, i, uint64(m.UpdatePeer.Size())) + n2, err := m.UpdatePeer.MarshalTo(dAtA[i:]) + if err != nil { + return 0, err + } + i += n2 + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *SendMessage) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *SendMessage) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.Data == nil { + return 0, new(github_com_golang_protobuf_proto.RequiredNotSetError) + } else { + dAtA[i] = 0xa + i++ + i = encodeVarintChat(dAtA, i, uint64(len(m.Data))) + i += copy(dAtA[i:], m.Data) + } + if m.Created == nil { + return 0, new(github_com_golang_protobuf_proto.RequiredNotSetError) + } else { + dAtA[i] = 0x10 + i++ + i = encodeVarintChat(dAtA, i, uint64(*m.Created)) + } + if m.Id == nil { + return 0, new(github_com_golang_protobuf_proto.RequiredNotSetError) + } else { + dAtA[i] = 0x1a + i++ + i = encodeVarintChat(dAtA, i, uint64(len(m.Id))) + i += copy(dAtA[i:], m.Id) + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func (m *UpdatePeer) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalTo(dAtA) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *UpdatePeer) MarshalTo(dAtA []byte) (int, error) { + var i int + _ = i + var l int + _ = l + if m.UserHandle != nil { + dAtA[i] = 0xa + i++ + i = encodeVarintChat(dAtA, i, uint64(len(m.UserHandle))) + i += copy(dAtA[i:], m.UserHandle) + } + if m.XXX_unrecognized != nil { + i += copy(dAtA[i:], m.XXX_unrecognized) + } + return i, nil +} + +func encodeVarintChat(dAtA []byte, offset int, v uint64) int { + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return offset + 1 +} +func (m *Request) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Type != nil { + n += 1 + sovChat(uint64(*m.Type)) + } + if m.SendMessage != nil { + l = m.SendMessage.Size() + n += 1 + l + sovChat(uint64(l)) + } + if m.UpdatePeer != nil { + l = m.UpdatePeer.Size() + n += 1 + l + sovChat(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *SendMessage) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Data != nil { + l = len(m.Data) + n += 1 + l + sovChat(uint64(l)) + } + if m.Created != nil { + n += 1 + sovChat(uint64(*m.Created)) + } + if m.Id != nil { + l = len(m.Id) + n += 1 + l + sovChat(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func (m *UpdatePeer) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.UserHandle != nil { + l = len(m.UserHandle) + n += 1 + l + sovChat(uint64(l)) + } + if m.XXX_unrecognized != nil { + n += len(m.XXX_unrecognized) + } + return n +} + +func sovChat(x uint64) (n int) { + for { + n++ + x >>= 7 + if x == 0 { + break + } + } + return n +} +func sozChat(x uint64) (n int) { + return sovChat(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (m *Request) Unmarshal(dAtA []byte) error { + var hasFields [1]uint64 + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowChat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: Request: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: Request: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Type", wireType) + } + var v Request_Type + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowChat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= Request_Type(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Type = &v + hasFields[0] |= uint64(0x00000001) + case 2: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field SendMessage", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowChat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthChat + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthChat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.SendMessage == nil { + m.SendMessage = &SendMessage{} + } + if err := m.SendMessage.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field UpdatePeer", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowChat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthChat + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthChat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if m.UpdatePeer == nil { + m.UpdatePeer = &UpdatePeer{} + } + if err := m.UpdatePeer.Unmarshal(dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipChat(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthChat + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthChat + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + if hasFields[0]&uint64(0x00000001) == 0 { + return new(github_com_golang_protobuf_proto.RequiredNotSetError) + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *SendMessage) Unmarshal(dAtA []byte) error { + var hasFields [1]uint64 + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowChat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: SendMessage: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: SendMessage: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowChat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthChat + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthChat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Data = append(m.Data[:0], dAtA[iNdEx:postIndex]...) + if m.Data == nil { + m.Data = []byte{} + } + iNdEx = postIndex + hasFields[0] |= uint64(0x00000001) + case 2: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Created", wireType) + } + var v int64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowChat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int64(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Created = &v + hasFields[0] |= uint64(0x00000002) + case 3: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Id", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowChat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthChat + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthChat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Id = append(m.Id[:0], dAtA[iNdEx:postIndex]...) + if m.Id == nil { + m.Id = []byte{} + } + iNdEx = postIndex + hasFields[0] |= uint64(0x00000004) + default: + iNdEx = preIndex + skippy, err := skipChat(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthChat + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthChat + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + if hasFields[0]&uint64(0x00000001) == 0 { + return new(github_com_golang_protobuf_proto.RequiredNotSetError) + } + if hasFields[0]&uint64(0x00000002) == 0 { + return new(github_com_golang_protobuf_proto.RequiredNotSetError) + } + if hasFields[0]&uint64(0x00000004) == 0 { + return new(github_com_golang_protobuf_proto.RequiredNotSetError) + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func (m *UpdatePeer) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowChat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: UpdatePeer: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: UpdatePeer: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field UserHandle", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowChat + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthChat + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthChat + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.UserHandle = append(m.UserHandle[:0], dAtA[iNdEx:postIndex]...) + if m.UserHandle == nil { + m.UserHandle = []byte{} + } + iNdEx = postIndex + default: + iNdEx = preIndex + skippy, err := skipChat(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthChat + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthChat + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + m.XXX_unrecognized = append(m.XXX_unrecognized, dAtA[iNdEx:iNdEx+skippy]...) + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipChat(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowChat + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowChat + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowChat + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthChat + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthChat + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowChat + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipChat(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthChat + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthChat = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowChat = fmt.Errorf("proto: integer overflow") +) diff --git a/dht-for-values-storage/chat.proto b/dht-for-values-storage/chat.proto new file mode 100644 index 0000000..bdf1429 --- /dev/null +++ b/dht-for-values-storage/chat.proto @@ -0,0 +1,23 @@ +syntax = "proto2"; +package main; + +message Request { + enum Type { + SEND_MESSAGE = 0; + UPDATE_PEER = 1; + } + + required Type type = 1; + optional SendMessage sendMessage = 2; + optional UpdatePeer updatePeer = 3; +} + +message SendMessage { + required bytes data = 1; + required int64 created = 2; + required bytes id = 3; +} + +message UpdatePeer { + optional bytes userHandle = 1; +} \ No newline at end of file diff --git a/dht-for-values-storage/flags.go b/dht-for-values-storage/flags.go new file mode 100644 index 0000000..43aa2a4 --- /dev/null +++ b/dht-for-values-storage/flags.go @@ -0,0 +1,24 @@ +package main + +import ( + "flag" +) + +type config struct { + topic string + ProtocolID string + listenHost string + listenPort int +} + +func parseFlags() *config { + c := &config{} + + flag.StringVar(&c.topic, "topic", "/libp2p/example/chat/1.0.0", "Unique string to identify group of nodes") + flag.StringVar(&c.listenHost, "host", "0.0.0.0", "The bootstrap node host listen address\n") + flag.StringVar(&c.ProtocolID, "pid", "/chat/1.1.0", "Sets a protocol id for stream headers") + flag.IntVar(&c.listenPort, "port", 4001, "node listen port") + + flag.Parse() + return c +} diff --git a/dht-for-values-storage/main.go b/dht-for-values-storage/main.go new file mode 100644 index 0000000..3540713 --- /dev/null +++ b/dht-for-values-storage/main.go @@ -0,0 +1,156 @@ +package main + +import ( + "context" + "flag" + "fmt" + "os" + "os/signal" + "strconv" + "syscall" + "time" + + levelds "github.com/ipfs/go-ds-leveldb" + "github.com/libp2p/go-libp2p" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/protocol" + "github.com/libp2p/go-libp2p-core/routing" + kaddht "github.com/libp2p/go-libp2p-kad-dht" + mplex "github.com/libp2p/go-libp2p-mplex" + pubsub "github.com/libp2p/go-libp2p-pubsub" + yamux "github.com/libp2p/go-libp2p-yamux" + "github.com/libp2p/go-libp2p/p2p/discovery" + tcp "github.com/libp2p/go-tcp-transport" + ws "github.com/libp2p/go-ws-transport" +) + +type mdnsNotifee struct { + h host.Host + ctx context.Context +} + +func (m *mdnsNotifee) HandlePeerFound(pi peer.AddrInfo) { + m.h.Connect(m.ctx, pi) +} + +func main() { + + // Exection flags + help := flag.Bool("h", false, "Display Help") + config := parseFlags() + + if *help { + fmt.Println("This programs demonstrates the use of a pubsub pattern to broadcast values in the network. Get them stored into some peer (not always the one adding them) and being able to retrieve them from any node by have the CID of the value wanted.") + fmt.Println() + fmt.Println("Usage: Run './start") + flag.PrintDefaults() + return + } + + // Starting a new context + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // ListenAddrStrings configures libp2p to listen on the given (unparsed) addresses. + //listenAddrs := libp2p.ListenAddrStrings("/ip4/" + config.listenHost + "/tcp/" + strconv.Itoa(config.listenPort)) + listenAddrs := libp2p.ListenAddrStrings( + "/ip4/0.0.0.0/tcp/0", + "/ip4/0.0.0.0/tcp/0/ws", + ) + + // Package dht implements a distributed hash table that satisfies the ipfs routing interface. This DHT is modeled after kademlia with S/Kademlia modifications. + var dht *kaddht.IpfsDHT + newDHT := func(h host.Host) (routing.PeerRouting, error) { + var err error + ds, err := levelds.NewDatastore("", nil) + if err != nil { + fmt.Println(err) + } + + dht = kaddht.NewDHT(ctx, h, ds) + return dht, err + } + + routing := libp2p.Routing(newDHT) + + // Choosing transport protocol + transports := libp2p.ChainOptions( + libp2p.Transport(tcp.NewTCPTransport), + libp2p.Transport(ws.New), + ) + + // Choosing muxers for multiple streams + muxers := libp2p.ChainOptions( + libp2p.Muxer("/yamux/1.0.0", yamux.DefaultTransport), + libp2p.Muxer("/mplex/6.7.0", mplex.DefaultTransport), + ) + + // Creating the host with the previous parameters + host, err := libp2p.New( + ctx, + transports, + listenAddrs, + muxers, + routing, + ) + if err != nil { + panic(err) + } + + // Setting the function 'handleStream' to handle incoming connections + host.SetStreamHandler(protocol.ID("tcp"), handleStream) + + fmt.Printf("\n[*] Topic: %s\n\n", config.topic) + multiAddress := "/ip4/" + config.listenHost + "/tcp/" + strconv.Itoa(config.listenPort) + "/p2p/" + host.ID().Pretty() + fmt.Printf("[*] Your MultiAddrress Is: %s\n\n", multiAddress) + fmt.Printf("[*] Your DHT ID is : %s\n\n", dht.PeerID()) + + // New gossip pubsub for the host + ps, err := pubsub.NewGossipSub(ctx, host) + if err != nil { + panic(err) + } + + // subscribing to the topic + sub, err := ps.Subscribe(pubsubTopic) + if err != nil { + panic(err) + } + go pubsubHandler(ctx, sub) + + // Printing the listening interfaces + for _, addr := range host.Addrs() { + fmt.Println("-> Listening on", addr) + } + + fmt.Println("\n[*] How to:\nUse \x1b[32m/peers\x1b[0m to see the peers list\nUse \x1b[32m/name\x1b[0m to set your peer name for message delivery\nUse \x1b[32m/put\x1b[0m to put a value in the DHT\nUse \x1b[32m/get\x1b[0m to get Value from the DHT\n\n") + + // Use mdns for discovery + mdns, err := discovery.NewMdnsService(ctx, host, time.Second*10, pubsubTopic) + if err != nil { + panic(err) + } + mdns.RegisterNotifee(&mdnsNotifee{h: host, ctx: ctx}) + + err = dht.Bootstrap(ctx) + if err != nil { + panic(err) + } + + // Setting some global variables used in the code + setter(ctx, dht, ps, multiAddress) + donec := make(chan struct{}, 1) + go chatInputLoop(ctx, host, ps, donec, dht) + + stop := make(chan os.Signal, 1) + signal.Notify(stop, syscall.SIGINT) + + select { + case <-stop: + host.Close() + os.Exit(0) + case <-donec: + host.Close() + } +} diff --git a/dht-for-values-storage/package.json b/dht-for-values-storage/package.json new file mode 100644 index 0000000..4ef94de --- /dev/null +++ b/dht-for-values-storage/package.json @@ -0,0 +1,24 @@ +{ + "author": "root", + "bugs": { + "url": "idk" + }, + "gx": { + "dvcsimport": "Third_thing/1/08-End" + }, + "gxDependencies": [ + { + "author": "whyrusleeping", + "hash": "QmNohiVssaPw3KVLZik59DBVGTSm2dGvYT9eoXt5DQ36Yz", + "name": "go-ipfs-util", + "version": "1.2.9" + } + ], + "gxVersion": "0.14.2", + "language": "go", + "license": "", + "name": "08-End", + "releaseCmd": "git commit -a -m \"gx publish $VERSION\"", + "version": "0.0.0" +} + diff --git a/dht-for-values-storage/protocol.go b/dht-for-values-storage/protocol.go new file mode 100644 index 0000000..42191af --- /dev/null +++ b/dht-for-values-storage/protocol.go @@ -0,0 +1,394 @@ +package main + +import ( + "bufio" + "context" + "crypto/rand" + "fmt" + "os" + "sync" + "time" + + sha "crypto/sha256" + c "github.com/ipfs/go-cid" + "github.com/libp2p/go-libp2p-core/host" + "github.com/libp2p/go-libp2p-core/network" + p "github.com/libp2p/go-libp2p-core/peer" + "github.com/libp2p/go-libp2p-core/peerstore" + "github.com/libp2p/go-libp2p-core/protocol" + kaddht "github.com/libp2p/go-libp2p-kad-dht" + peer "github.com/libp2p/go-libp2p-peer" + pubsub "github.com/libp2p/go-libp2p-pubsub" + "github.com/multiformats/go-multiaddr" + mh "github.com/multiformats/go-multihash" + "strings" +) + +var ctx context.Context +var dht *kaddht.IpfsDHT +var valueStore map[string]string +var ps *pubsub.PubSub +var wg sync.WaitGroup +var multiAddr string + +// Initializing global Variables used +func setter(ct context.Context, d *kaddht.IpfsDHT, p *pubsub.PubSub, ma string) { + ctx = ct + dht = d + valueStore = make(map[string]string) + ps = p + multiAddr = ma +} + +// Broadcasts a normal message into the network +func sendMessage(ps *pubsub.PubSub, msg string) { + + msgId := make([]byte, 10) + _, err := rand.Read(msgId) + defer func() { + if err != nil { + fmt.Fprintln(os.Stderr, err) + } + }() + if err != nil { + return + } + now := time.Now().Unix() + req := &Request{ + Type: Request_SEND_MESSAGE.Enum(), + SendMessage: &SendMessage{ + Id: msgId, + Data: []byte(msg), + Created: &now, + }, + } + msgBytes, err := req.Marshal() + if err != nil { + return + } + err = ps.Publish(pubsubTopic, msgBytes) + +} + +// Changes peer name from short String ID to a name specified via '/name newName' +func updatePeer(ps *pubsub.PubSub, id peer.ID, handle string) { + oldHandle, ok := handles[id.String()] + if !ok { + oldHandle = id.ShortString() + } + handles[id.String()] = handle + + req := &Request{ + Type: Request_UPDATE_PEER.Enum(), + UpdatePeer: &UpdatePeer{ + UserHandle: []byte(handle), + }, + } + reqBytes, err := req.Marshal() + if err != nil { + fmt.Fprintln(os.Stderr, err) + return + } + err = ps.Publish(pubsubTopic, reqBytes) + if err != nil { + fmt.Fprintln(os.Stderr, err) + return + } + + fmt.Printf("%s -> %s\n", oldHandle, handle) +} + +// Input loop +func chatInputLoop(ctx context.Context, h host.Host, ps *pubsub.PubSub, donec chan struct{}, dht *kaddht.IpfsDHT) { + + scanner := bufio.NewScanner(os.Stdin) + + for scanner.Scan() { + msg := scanner.Text() + + if strings.HasPrefix(msg, "/peers") { // Asking for list of peers '/peers' + fmt.Println("[*] Peers:") + fmt.Println(dht.RoutingTable().ListPeers()) + fmt.Println("\n\n") + + } else if strings.HasPrefix(msg, "/name ") { // Asking to change the name '/name newName' + newHandle := strings.TrimPrefix(msg, "/name ") + newHandle = strings.TrimSpace(newHandle) + updatePeer(ps, h.ID(), newHandle) + + } else if strings.HasPrefix(msg, "/get ") { // Asking to get a value from the network '/get CID' + newHandle := strings.TrimPrefix(msg, "/get ") + newHandle = strings.TrimSpace(newHandle) + getValue(dht, ctx, newHandle) + + } else if strings.HasPrefix(msg, "/put ") { // Put value in the network '/put value' + newHandle := strings.TrimPrefix(msg, "/put ") + newHandle = strings.TrimSpace(newHandle) + addValue(dht, ctx, newHandle) + + } else { // Send a normal message + sendMessage(ps, msg) + + } + } + donec <- struct{}{} +} + +// Gets info of the node providing the value for the contentID +func getProviderInfo(dht *kaddht.IpfsDHT, ctx context.Context, contentID string) (_ p.AddrInfo) { + + cid, err := c.Decode(contentID) + if err != nil { + fmt.Println(err) + } + providers, err := dht.FindProviders(ctx, cid) + + if len(providers) == 0 { + fmt.Println("No Providers Found...") + return + } + + closest_info, err3 := dht.FindPeer(ctx, providers[0].ID) + if err3 != nil { + return p.AddrInfo{ID: "me"} + } + return closest_info +} + +// Find Peer that has the Closest ID to the CID of the value to add +func findClosestPeerInfo(dht *kaddht.IpfsDHT, ctx context.Context, cid string) p.AddrInfo { + + closest_peer_channel, err := dht.GetClosestPeers(ctx, cid) + if err != nil { // Case that there is only one peer in the network + fmt.Println("Couldnt find closest peer ... ", err) + fmt.Println("Pushing in local ...") + + return p.AddrInfo{ + ID: dht.Host().ID(), + Addrs: dht.Host().Addrs(), + } + } + + closest_peer := <-closest_peer_channel + + closest_info, err3 := dht.FindPeer(ctx, closest_peer) + if err3 != nil { + fmt.Println("Couldnt find closest peer found ...", err3) + return p.AddrInfo{} + } + + return closest_info + +} + +// Function that gets executed when running '/get CID' to send a request to the peer providing the value. This peer will open a connection stream with the requester and sends it back the value +func getValue(dht *kaddht.IpfsDHT, ctx context.Context, cid_with_addr string) { + + // cid_with_addr is in the format "cid:addr" + data := strings.Split(cid_with_addr, ":") + contentID := data[0] + + // Getting informations of the node providing the value + closest_info := getProviderInfo(dht, ctx, contentID) + + // If the node providing the value is itself the one requesting it.. It returns a 'peer.AddrInfo' with 'me' as ID + if closest_info.ID == "me" { + fmt.Printf("\x1b[32m%s\x1b[0m>\n ", getValueFromStore(contentID)) + return + } + + // Opening a direct connection with the provider + err := dht.Host().Connect(ctx, closest_info) + if err != nil { + fmt.Println(err) + return + } + + stream, err := dht.Host().NewStream(ctx, closest_info.ID, protocol.ID("tcp")) + if err != nil { + fmt.Println("Couldnt open stream") + } + + getWriter(stream, cid_with_addr) +} + +// Writes in the stream a valueget request in the format "get:cid:req_multiAdd" +func getWriter(stream network.Stream, contentID string) { + rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) + writeData(rw, "get:"+contentID+":"+multiAddr) +} + +// When a peer asks for a value, the provider runs this function to send the value back to him +func getReader(req_multAddr string, contentID string) { + value := getValueFromStore(contentID) + + fmt.Println("\n!! Sending this CID this CID: \x1b[32m", contentID, " \x1b[0m to somebody!") + + // Triming off the \n at the end + req_multAddr = strings.TrimSuffix(req_multAddr, "\n") + + // Turn the destination into a multiaddr + req_maddr, err := multiaddr.NewMultiaddr(req_multAddr) + if err != nil { + fmt.Println(err) + } + + // Extract the peer ID from the multiaddr + req_info, err := p.AddrInfoFromP2pAddr(req_maddr) + if err != nil { + fmt.Println(err) + } + + // Add the address into the peer store to connect to it + dht.Host().Peerstore().AddAddr(req_info.ID, req_info.Addrs[0], peerstore.PermanentAddrTTL) + + stream, err := dht.Host().NewStream(ctx, req_info.ID, protocol.ID("tcp")) + if err != nil { + fmt.Println("Couldnt open stream: ", err) + return + } + rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) + + writeData(rw, "=> "+string(value)) +} + +// Functions get called when we execute '/put value' +func addValue(dht *kaddht.IpfsDHT, ctx context.Context, value string) { + // Creating the sha2-256 of the value provided + buf := []byte(value) + buff := sha.Sum256(buf) + mhbuf, _ := mh.EncodeName(buff[:], "sha2-256") + + // Generating a CID from that hash + cid, err := c.Cast(mhbuf) + fmt.Println("=> Get it by \x1b[32m/get", cid.String(), "\x1b[0m") + if err != nil { + fmt.Println(err) + } + + pushValue(dht, ctx, cid.String(), value) +} + +// function that searches for the closest peer to the CID of the value to be pushed and establishes a direct connection with it to send the value +func pushValue(dht *kaddht.IpfsDHT, ctx context.Context, cid string, value string) { + + // Find closest peer to the CID + closest_info := findClosestPeerInfo(dht, ctx, cid) + + // If the closest peer to the value is the one pushing it to the network, it gets stored locally + if closest_info.ID == dht.Host().ID() { + fmt.Println("Pushing : \x1b[32m", cid, ":", value, "\x1b[0m") + pushToStore(cid, value) + + c, err := c.Decode(cid) + if err != nil { + fmt.Println(err) + return + } + + // Broadcasting that we are holding that value + dht.Provide(ctx, c, true) + return + } + + err4 := dht.Host().Connect(ctx, closest_info) + if err4 != nil { + fmt.Println(err4) + return + } + + stream, err5 := dht.Host().NewStream(ctx, closest_info.ID, protocol.ID("tcp")) + if err5 != nil { + fmt.Println("Stream open failed", err5) + } else { + rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) + putWriter(rw, cid+":"+value) + } + +} + +// Writes the pushValue data in the stream +func putWriter(rw *bufio.ReadWriter, data string) { + writeData(rw, data) +} + +// Gets executed when there is an incoming connection +func handleStream(stream network.Stream) { + + rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) + go readData(rw) + + wg.Add(1) + wg.Wait() + if stream.Close() != nil { + fmt.Println("Couldnt close stream") + } +} + +// Reads the data from the stream opened from a requesting node +func readData(rw *bufio.ReadWriter) { + defer wg.Done() + + str, err := rw.ReadString('\n') + if err != nil { + fmt.Println("Error reading from buffer") + panic(err) + } + + if str == "" { + return + } + if str != "\n" { + // Green console colour: \x1b[32m + // Reset console colour: \x1b[0m + data := strings.Split(str, ":") + + if len(data) == 1 { // "value" + + fmt.Printf("\x1b[32m%s\x1b[0m>\n ", str) + + } else if len(data) == 3 { // "get:cid:multiAddr" + + getReader(data[2], data[1]) + + } else if len(data) == 2 { // "cid:value" + fmt.Printf("Someone just asked me to keep this with me: \x1b[32m%s\x1b[0m> ", data[0]+":"+data[1]+"\n") + + pushToStore(data[0], data[1]) + + cid, _ := c.Decode(data[0]) + + err := dht.Provide(ctx, cid, true) + if err != nil { + fmt.Println("Provider Problem: ", err) + } + } + } + +} + +// Write data to the buffer to be send to the other person +func writeData(rw *bufio.ReadWriter, data string) { + + _, err := rw.WriteString(fmt.Sprintf("%s\n", data)) + if err != nil { + fmt.Println("Error writing to buffer") + panic(err) + } + err = rw.Flush() + if err != nil { + fmt.Println("Error flushing buffer") + panic(err) + } + +} + +// Access the map to get the value or to push them +func pushToStore(cid string, value string) { + valueStore[cid] = value + +} + +func getValueFromStore(cid string) string { + return valueStore[cid] +} diff --git a/dht-for-values-storage/pubsub.go b/dht-for-values-storage/pubsub.go new file mode 100644 index 0000000..c764a60 --- /dev/null +++ b/dht-for-values-storage/pubsub.go @@ -0,0 +1,55 @@ +package main + +import ( + "context" + "fmt" + "os" + + peer "github.com/libp2p/go-libp2p-peer" + pubsub "github.com/libp2p/go-libp2p-pubsub" +) + +var handles = map[string]string{} + +const pubsubTopic = "/libp2p/example/chat/1.0.0" + +func pubsubMessageHandler(id peer.ID, msg *SendMessage) { + handle, ok := handles[id.String()] + if !ok { + handle = id.ShortString() + } + fmt.Printf("%s: %s\n", handle, msg.Data) +} + +func pubsubUpdateHandler(id peer.ID, msg *UpdatePeer) { + oldHandle, ok := handles[id.String()] + if !ok { + oldHandle = id.ShortString() + } + handles[id.String()] = string(msg.UserHandle) + fmt.Printf("%s -> %s\n", oldHandle, msg.UserHandle) +} + +func pubsubHandler(ctx context.Context, sub *pubsub.Subscription) { + for { + msg, err := sub.Next(ctx) + if err != nil { + fmt.Fprintln(os.Stderr, err) + continue + } + + req := &Request{} + err = req.Unmarshal(msg.Data) + if err != nil { + fmt.Fprintln(os.Stderr, err) + continue + } + + switch *req.Type { + case Request_SEND_MESSAGE: + pubsubMessageHandler(msg.GetFrom(), req.SendMessage) + case Request_UPDATE_PEER: + pubsubUpdateHandler(msg.GetFrom(), req.UpdatePeer) + } + } +} From bc5d2aa8a132bd0028a192e8d6f7e9e945e5fd43 Mon Sep 17 00:00:00 2001 From: CHAMI Rachid Date: Thu, 8 Aug 2019 23:32:07 -0700 Subject: [PATCH 2/2] Adding the README --- dht-for-values-storage/README.md | 72 ++++++++++++++++++++++++++++++++ 1 file changed, 72 insertions(+) create mode 100644 dht-for-values-storage/README.md diff --git a/dht-for-values-storage/README.md b/dht-for-values-storage/README.md new file mode 100644 index 0000000..f125362 --- /dev/null +++ b/dht-for-values-storage/README.md @@ -0,0 +1,72 @@ +# p2p Chat and storing values [using PubSub and DHT] +This programs demonstrates the use of a pubsub pattern to broadcast values in the network. Get them stored into some peer (not always the one adding them) and being able to retrieve them from any node by having the CID of the value wanted. + +This example is based on the **ipfs-camp-2019 pubsub code**. + +## How to build + > go get -v -d ./... + + > go build . + + # Code explanation + The functions added to the **ipfs-camp-2019** last code behave in the following way: + + +- /get cid: + - getProviderrInfo (dht, ctx, cid) peer.AddrInfo + - getValue (dht, ctx, "cid:addr") : + - getProviderInfo + - Connect To it + - GetWriter + +- /put value: + - addValue(dht, ctx, value) + - FindClosestPeerInfo(dht, ctx, cid) peer.AddrInfo + - pushValue (dht, ctx, cid, value) : + - FindClosestPeer + - Connect to it + - PutWriter + +- Handle Stream: + - Reading/Writing into the stream + - ReadData(stream) : + - If it is “cid:value”: + - pushToStore(cid:value) + - provide it to the network + - If it is “get:cid:addr”: + - getReader(stream,cid+addr) + - If it is “value” + - PrintItToScreen + - WriteData(stream, valueToBeWritten) + +- Put Writer: + - WriteData(stream, cid:value) +- Get Writer: + - Writedata(stream, get:cid:addr) +- Get Reader: + - getValueFromStore(cid) string + - WriteData(stream, value) + +# How it works + > ./dht-for-values-storage -h + +``` + This programs demonstrates the use of a pubsub pattern to broadcast values in the network. + Get them stored into some peer (not always the one adding them) and being able to retrieve them from any + node that have the CID of the value wanted. + +Usage: Run './start + -h Display Help + -host string + The bootstrap node host listen address + (default "0.0.0.0") + -pid string + Sets a protocol id for stream headers (default "/chat/1.1.0") + -port int + node listen port (default 4001) + -topic string + Unique string to identify group of nodes (default "/libp2p/example/chat/1.0.0") +``` + +#### Author +CHAMI Rachid