diff --git a/internal/producer/producer.go b/internal/producer/producer.go index f4fec6a7..07ca10e6 100644 --- a/internal/producer/producer.go +++ b/internal/producer/producer.go @@ -2,298 +2,14 @@ package producer import ( - "bytes" - "database/sql" - "encoding/hex" "fmt" - "io/ioutil" - "log" - "net" - "os" - "os/signal" "runtime" - "syscall" "time" - "github.com/SIGBlockchain/project_aurum/internal/accountstable" "github.com/SIGBlockchain/project_aurum/internal/block" - "github.com/SIGBlockchain/project_aurum/internal/blockchain" - "github.com/SIGBlockchain/project_aurum/internal/constants" - "github.com/SIGBlockchain/project_aurum/internal/contracts" - "github.com/SIGBlockchain/project_aurum/internal/hashing" - "github.com/SIGBlockchain/project_aurum/internal/validation" ) -type Flags struct { - Help *bool - Debug *bool - Version *bool - Height *bool - Genesis *bool - Test *bool - Globalhost *bool - MemoryStats *bool - Logs *string - Port *string - Interval *string - InitSupply *uint64 - NumBlocks *uint64 -} - -var version = uint16(1) -var ledger = "blockchain.dat" -var metadataTable = constants.MetadataTable - -// TODO: Will need to change the name to support get functions -var accountsTable = constants.AccountsTable - -var SecretBytes = hashing.New([]byte("aurum"))[8:16] - -// This stores connection information for the producer -type BlockProducer struct { - Server net.Listener - NewConnection chan net.Conn - Logger *log.Logger - // Add ledger name, metadata name, and contract table name - // Slice of Contracts representing contract pool -} - -// Should contain version, payload type, payload size -type Header struct{} - -// Messages have headers and payloads -// Payloads should correspond to message type -type Message struct{} - -func RunServer(ln net.Listener, bChan chan []byte, debug bool) { - // Set logger - var lgr = log.New(ioutil.Discard, "SRVR_LOG: ", log.Ldate|log.Lmicroseconds|log.Lshortfile) - if debug { - lgr.SetOutput(os.Stdout) - } - for { - lgr.Println("Waiting for connection...") - - // Block for connection - conn, err := ln.Accept() - var nRcvd int - buf := make([]byte, 1024) - if err != nil { - lgr.Println("connection failed") - goto End - } - lgr.Printf("%s connected\n", conn.RemoteAddr()) - defer conn.Close() - - // Block for receiving message - nRcvd, err = conn.Read(buf) - if err != nil { - goto End - } - - lgr.Println("Received message from", conn.RemoteAddr()) - - // Determine the type of message - if nRcvd < 8 || (!bytes.Equal(buf[:8], SecretBytes)) { - conn.Write([]byte("No thanks.\n")) - goto End - } else { - conn.Write([]byte("Thank you.\n")) - if buf[8] == 2 { - lgr.Println("Received account info request") - // TODO: Will require a sync.Mutex lock here eventually - // Open connection to account table - dbConnection, err := sql.Open("sqlite3", constants.AccountsTable) - if err != nil { - lgr.Fatalf("Failed to open account table: %s\n", err) - } - accInfo, err := accountstable.GetAccountInfo(dbConnection, buf[9:nRcvd]) - if err := dbConnection.Close(); err != nil { - lgr.Fatalf("Failed to close account table: %s\n", err) - } - var responseMessage []byte - responseMessage = append(responseMessage, SecretBytes...) - if err != nil { - lgr.Printf("Failed to get account info for %s: %s", hex.EncodeToString(buf[9:nRcvd]), err.Error()) - responseMessage = append(responseMessage, 1) - } else { - responseMessage = append(responseMessage, 0) - if serializedAccInfo, err := accInfo.Serialize(); err == nil { - responseMessage = append(responseMessage, serializedAccInfo...) - } - } - - time.Sleep(3 * time.Second) - conn.Write(responseMessage) - goto End - } - } - - lgr.Println("Sending to channel") - // Send to channel if aurum-related message - bChan <- buf[:nRcvd] - - lgr.Println("Message successfully sent to main") - goto End - End: - lgr.Println("Closing connection.") - conn.Close() - } -} - -func ProduceBlocks(byteChan chan []byte, fl Flags, limit bool) { - // Set logger - var lgr = log.New(ioutil.Discard, "PROD_LOG: ", log.Ldate|log.Lmicroseconds|log.Lshortfile) - if *fl.Debug { - lgr.SetOutput(os.Stdout) - } - - // Open connection to metadata database - metadataConn, err := sql.Open("sqlite3", metadataTable) - if err != nil { - lgr.Fatalf("failed to open metadata table: %s\n", err) - } - defer metadataConn.Close() - - // Open connection to account database - dbConnection, err := sql.Open("sqlite3", constants.AccountsTable) - if err != nil { - lgr.Fatalf("Failed to open account table: %s\n", err) - } - defer func() { - if err := dbConnection.Close(); err != nil { - lgr.Fatalf("Failed to close account table: %v", err) - } - }() - - // Retrieve youngest block header - ledgerFile, err := os.OpenFile(ledger, os.O_RDONLY, 0644) - if err != nil { - lgr.Fatalf("failed to open ledger file: %s\n", err) - } - youngestBlockHeader, err := blockchain.GetYoungestBlockHeader(ledgerFile, metadataConn) - if err != nil { - lgr.Fatalf("failed to retrieve youngest block: %s\n", err) - } - if err := ledgerFile.Close(); err != nil { - lgr.Fatalf("Failed to close blockchain file: %v", err) - } - - // Set up SIGINT channel - signalCh := make(chan os.Signal, 1) - signal.Notify(signalCh, syscall.SIGINT, syscall.SIGTERM) - - // Initialize other variables - var numBlocksGenerated uint64 - var dataPool []contracts.Contract - var ms runtime.MemStats - - // Determine production interval and start trigger goroutine - productionInterval, err := time.ParseDuration(*fl.Interval) - if err != nil { - lgr.Fatalln("failed to parse block production interval") - } else { - lgr.Println("block production interval: " + *fl.Interval) - } - var intervalChannel = make(chan bool) - go triggerInterval(intervalChannel, productionInterval) - - // Main loop - for { - var chainHeight = youngestBlockHeader.Height - select { - case message := <-byteChan: - - // If it's a contract, add it to the contract pool - switch message[8] { - case 1: - lgr.Println("Received contract") - var newContract contracts.Contract - if err := newContract.Deserialize(message[9:]); err == nil { - // TODO: Validate the contract prior to adding - if err := validation.ValidateContract(dbConnection, &newContract); err != nil { - lgr.Println("Invalid contract because: " + err.Error()) - } else { - dataPool = append(dataPool, newContract) - lgr.Println("Valid contract") - } - } - break - } - case <-intervalChannel: - // Triggered if it's time to produce a block - lgr.Printf("block ready for production: #%d\n", chainHeight+1) - // lgr.Printf("Production block dataPool: %v", dataPool) - if newBlock, err := block.New(version, chainHeight+1, block.HashBlockHeader(youngestBlockHeader), dataPool); err != nil { - lgr.Fatalf("failed to add block %s", err.Error()) - os.Exit(1) - } else { - - // Add the block - ledgerFile, err := os.OpenFile(constants.BlockchainFile, os.O_APPEND|os.O_WRONLY, 0644) - if err != nil { - lgr.Fatalf("failed to open ledger file: %s\n", err) - } - err = blockchain.AddBlock(newBlock, ledgerFile, metadataConn) - if err != nil { - lgr.Fatalf("failed to add block: %s", err.Error()) - os.Exit(1) - } else { - if err := ledgerFile.Close(); err != nil { - log.Fatalf("Failed to close blockchain file: %v", err) - } - lgr.Printf("block produced: #%d\n", chainHeight+1) - numBlocksGenerated++ - youngestBlockHeader = newBlock.GetHeader() - go triggerInterval(intervalChannel, productionInterval) - - // TODO: for each contract in the dataPool, update the accounts table - // TODO: will require a sync.Mutex for the accounts table - dbConn, err := sql.Open("sqlite3", constants.AccountsTable) - if err != nil { - lgr.Fatalf("Failed to connect to accounts database: %v", err) - } - for _, contract := range dataPool { - err := accountstable.ExchangeAndUpdateAccounts(dbConn, &contract) - if err != nil { - lgr.Printf("Failed to add contract to accounts database: %v", err) - } - } - dbConn.Close() - dataPool = nil - - // Memory stats - if *fl.MemoryStats { - runtime.ReadMemStats(&ms) - printMemstats(ms) - } - // If in test mode, break the loop - if *fl.Test { - lgr.Printf("Test mode: breaking loop") - return - } - - // If reached limit of blocks desired to be generated, break the loop - if limit && (numBlocksGenerated >= *fl.NumBlocks) { - lgr.Printf("Limit reached: # blocks generated: %d, blocks desired: %d\n", numBlocksGenerated, *fl.NumBlocks) - return - } - } - - } - - case <-signalCh: - - // If you receive a SIGINT, exit the loop - fmt.Print("\r") - lgr.Println("Interrupt signal encountered, program terminating.") - return - } - - } -} - -func printMemstats(ms runtime.MemStats) { +func PrintMemstats(ms runtime.MemStats) { // useful commands: go run -gcflags='-m -m' main.go
fmt.Printf("Bytes of allocated heap objects: %d", ms.Alloc) fmt.Printf("Cumulative bytes allocated for heap objects: %d", ms.TotalAlloc) @@ -301,19 +17,19 @@ func printMemstats(ms runtime.MemStats) { fmt.Printf("Count of heap objects freed: %d", ms.Frees) } -func triggerInterval(intervalChannel chan bool, productionInterval time.Duration) { +func TriggerInterval(intervalChannel chan bool, productionInterval time.Duration) { // Triggers block production case time.Sleep(productionInterval) intervalChannel <- true } -func calculateInterval(youngestBlockHeader block.BlockHeader, productionInterval time.Duration, intervalChannel chan bool) { +func CalculateInterval(youngestBlockHeader block.BlockHeader, productionInterval time.Duration, intervalChannel chan bool) { var lastTimestamp = time.Unix(0, youngestBlockHeader.Timestamp) timeSince := time.Since(lastTimestamp) if timeSince.Nanoseconds() >= productionInterval.Nanoseconds() { - go triggerInterval(intervalChannel, time.Duration(0)) + go TriggerInterval(intervalChannel, time.Duration(0)) } else { diff := productionInterval.Nanoseconds() - timeSince.Nanoseconds() - go triggerInterval(intervalChannel, time.Duration(diff)) + go TriggerInterval(intervalChannel, time.Duration(diff)) } } diff --git a/internal/producer/producer_test.go b/internal/producer/producer_test.go index 377ea8f7..c1beb1a3 100644 --- a/internal/producer/producer_test.go +++ b/internal/producer/producer_test.go @@ -26,372 +26,3 @@ import ( ) var removeFiles = true - -func TestRunServer(t *testing.T) { - senderPrivateKey, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) - recipientPrivateKey, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) - encodedRecipientPublicKey, _ := publickey.Encode(&recipientPrivateKey.PublicKey) - recipientPublicKeyHash := hashing.New(encodedRecipientPublicKey) - contract, _ := contracts.New(1, senderPrivateKey, recipientPublicKeyHash, 1000, 1) - contract.Sign(senderPrivateKey) - serializedContract, err := contract.Serialize() - - var contractMessage []byte - contractMessage = append(contractMessage, SecretBytes...) - contractMessage = append(contractMessage, 1) - contractMessage = append(contractMessage, serializedContract...) - type testArg struct { - name string - messageToBeSent []byte - messageToBeRcvd []byte - } - testArgs := []testArg{ - { - name: "Regular message", - messageToBeSent: []byte("hello\n"), - messageToBeRcvd: []byte("No thanks.\n"), - }, - { - name: "Aurum message", - messageToBeSent: SecretBytes, - messageToBeRcvd: []byte("Thank you.\n"), - }, - { - name: "Contract message", - messageToBeSent: contractMessage, - messageToBeRcvd: []byte("Thank you.\n"), - }, - } - ln, err := net.Listen("tcp", "localhost:13131") - if err != nil { - t.Errorf("failed to startup listener") - } - byteChan := make(chan []byte) - buf := make([]byte, 1024) - go RunServer(ln, byteChan, false) - for _, arg := range testArgs { - conn, err := net.Dial("tcp", "localhost:13131") - if err != nil { - t.Errorf("failed to connect to server") - } - _, err = conn.Write(arg.messageToBeSent) - if err != nil { - t.Errorf("failed to send message") - } - nRead, err := conn.Read(buf) - if err != nil { - t.Errorf("failed to read from connections:\n%s", err.Error()) - } - if !bytes.Equal(buf[:nRead], arg.messageToBeRcvd) { - t.Errorf("did not received desired message:\n%s != %s", string(buf[:nRead]), string(arg.messageToBeRcvd)) - } - if arg.name != "Regular message" { - res := <-byteChan - if !bytes.Equal(res, arg.messageToBeSent) { - t.Errorf("result does not match:\n%s != %s", string(res), string(arg.messageToBeSent)) - } - if arg.name == "Contract message" { - var contract contracts.Contract - if err := contract.Deserialize(res[9:]); err != nil { - t.Errorf("failed to deserialize contract:\n%s", err.Error()) - } - if !bytes.Equal(res[9:], serializedContract) { - t.Errorf("serialized contracts do not match:\n%v != %v", res[9:], serializedContract) - } - } - } - } -} - -func TestByteChannel(t *testing.T) { - t.SkipNow() - genesisHashes, err := genesis.ReadGenesisHashes() - if err != nil { - t.Errorf("failed to read genesis hashes:\n%s", err.Error()) - } - genesisBlock, err := genesis.BringOnTheGenesis(genesisHashes, 1000) - if err != nil { - t.Errorf("failed to create genesis block:\n%s", err.Error()) - } - if err := blockchain.Airdrop(ledger, metadataTable, constants.AccountsTable, genesisBlock); err != nil { - t.Errorf("failed to perform air drop:\n%s", err.Error()) - } - ledgerFile, err := os.OpenFile("blockchain.dat", os.O_RDONLY, 0644) - metadataConn, _ := sql.Open("sqlite3", constants.MetadataTable) - defer func() { - if removeFiles { - ledgerFile.Close() - metadataConn.Close() - if err := os.Remove("blockchain.dat"); err != nil { - t.Errorf("failed to remove blockchain.dat:\n%s", err.Error()) - } - if err := os.Remove(constants.MetadataTable); err != nil { - t.Errorf("failed to remove metadatata.tab:\n%s", err.Error()) - } - if err := os.Remove(constants.AccountsTable); err != nil { - t.Errorf("failed to remove accounts.db:\n%s", err.Error()) - } - } - }() - ln, err := net.Listen("tcp", "localhost:9001") - if err != nil { - t.Errorf("failed to start server:\n%s", err.Error()) - } - byteChan := make(chan []byte) - debug := false - - go RunServer(ln, byteChan, debug) - testMode := true - prodInterval := "2000ms" - memStats := false - fl := Flags{ - Debug: &debug, - Interval: &prodInterval, - Test: &testMode, - MemoryStats: &memStats, - } - senderPrivateKey, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) - recipientPrivateKey, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) - encodedRecipientPublicKey, _ := publickey.Encode(&recipientPrivateKey.PublicKey) - recipientPublicKeyHash := hashing.New(encodedRecipientPublicKey) - contract, _ := contracts.New(1, senderPrivateKey, recipientPublicKeyHash, 1000, 1) - contract.Sign(senderPrivateKey) - serializedContract, _ := contract.Serialize() - - var contractMessage []byte - contractMessage = append(contractMessage, SecretBytes...) - contractMessage = append(contractMessage, 1) - contractMessage = append(contractMessage, serializedContract...) - - conn, err := net.Dial("tcp", "localhost:9001") - if err != nil { - t.Errorf("failed to connect to server:\n%s", err.Error()) - } - _, err = conn.Write(contractMessage) - if err != nil { - t.Errorf("failed to send message") - } - ProduceBlocks(byteChan, fl, true) - - youngestBlock, err := blockchain.GetYoungestBlock(ledgerFile, metadataConn) - if err != nil { - t.Errorf("failed to get youngest block:\n%s", err.Error()) - } - data := youngestBlock.Data[0] - var compContract contracts.Contract - if err := compContract.Deserialize(data); err != nil { - t.Errorf("failed to deserialize data:\n%s", err.Error()) - } - if !bytes.Equal(serializedContract, data) { - t.Errorf("data does not match:\n%s != %s", string(serializedContract), string(data)) - } -} - -func TestResponseToAccountInfoRequest(t *testing.T) { - if err := wallet.SetupWallet(); err != nil { - t.Errorf("failed to setup wallet:\n%s", err.Error()) - } - defer func() { - if err := os.Remove("aurum_wallet.json"); err != nil { - t.Errorf("failed to remove aurum_wallet.json:\n%s", err.Error()) - } - }() - dbName := constants.AccountsTable - dbc, _ := sql.Open("sqlite3", dbName) - defer func() { - err := dbc.Close() - if err != nil { - t.Errorf("Failed to remove database: %s", err) - } - err = os.Remove(dbName) - if err != nil { - t.Errorf("Failed to remove database: %s", err) - } - }() - statement, _ := dbc.Prepare(sqlstatements.CREATE_ACCOUNT_BALANCES_TABLE) - statement.Exec() - walletAddress, err := wallet.GetWalletAddress() - // t.Logf("Wallet address: %v", walletAddress) - if err != nil { - t.Errorf("failed to retrieve wallet address:\n%s", err.Error()) - } - ln, err := net.Listen("tcp", "localhost:10500") - if err != nil { - t.Errorf("failed to start server:\n%s", err.Error()) - } - byteChan := make(chan []byte) - debug := false - - go RunServer(ln, byteChan, debug) - - // Request - var requestInfoMessage []byte - requestInfoMessage = append(requestInfoMessage, SecretBytes...) - requestInfoMessage = append(requestInfoMessage, 2) - requestInfoMessage = append(requestInfoMessage, walletAddress...) - conn, err := net.Dial("tcp", "localhost:10500") - if err != nil { - t.Errorf("failed to connect to server:\n%s", err.Error()) - } - // t.Logf("Sending message: %v", requestInfoMessage) - if _, err := conn.Write(requestInfoMessage); err != nil { - t.Errorf("failed to send request info message:\n%s", err.Error()) - } - buf := make([]byte, 1024) - nRead, err := conn.Read(buf) - if err != nil { - t.Errorf("failed to read from socket:\n%s", err.Error()) - } - if !bytes.Equal(buf[:nRead], []byte("Thank you.\n")) { - t.Errorf("expected different response: %v != %v", string(buf[:nRead]), string([]byte("Thank you\n"))) - } - buf = make([]byte, 1024) - nRead, err = conn.Read(buf) - if err != nil { - t.Errorf("failed to read from socket:\n%s", err.Error()) - } - if buf[8] != 1 { - t.Errorf("failed to get errored response from producer") - } - conn.Close() - - // Check for successful insertion - if err := accountstable.InsertAccountIntoAccountBalanceTable(dbc, walletAddress, 1000); err != nil { - t.Errorf("failed to insert sender account") - } - _, err = accountstable.GetAccountInfo(dbc, walletAddress) - if err != nil { - t.Errorf("failed to retrieve account info:\n%s", err.Error()) - } - // t.Logf("account info: %v", accInfo) - dbc.Close() - - // New request - conn, err = net.Dial("tcp", "localhost:10500") - if err != nil { - t.Errorf("failed to connect to server:\n%s", err.Error()) - } - // t.Logf("Sending message: %v", requestInfoMessage) - if _, err := conn.Write(requestInfoMessage); err != nil { - t.Errorf("failed to send request info message:\n%s", err.Error()) - } - buf = make([]byte, 1024) - nRead, err = conn.Read(buf) - if err != nil { - t.Errorf("failed to read from socket:\n%s", err.Error()) - } - if !bytes.Equal(buf[:nRead], []byte("Thank you.\n")) { - t.Errorf("expected different response: %v != %v", string(buf[:nRead]), string([]byte("Thank you\n"))) - } - buf = make([]byte, 1024) - nRead, err = conn.Read(buf) - if err != nil { - t.Errorf("failed to read from socket:\n%s", err.Error()) - } - - if buf[8] != 0 { - t.Errorf("failed to get success response from producer: %d != %d", buf[8], 0) - } - var accInfo accountinfo.AccountInfo - if err := accInfo.Deserialize(buf[9:nRead]); err != nil { - t.Errorf("failed to deserialize account info:\n%s", err.Error()) - } -} - -func TestData_Serialize(t *testing.T) { - senderPrivateKey, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) - encodedSenderPublicKey, _ := publickey.Encode(&senderPrivateKey.PublicKey) - spkh := hashing.New(encodedSenderPublicKey) - initialContract, _ := contracts.New(1, nil, spkh, 1000, 0) - tests := []struct { - name string - // d *Data - d *contracts.Contract - }{ - { - // d: &Data{ - // Hdr: DataHeader{ - // Version: 1, - // Type: 0, - // }, - // Bdy: initialContract, - // }, - d: initialContract, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - // got, err := tt.d.Serialize() - got, err := initialContract.Serialize() - if err != nil { - t.Errorf(err.Error()) - } - defer func() { - if r := recover(); r != nil { - t.Errorf("panicked, check indexing") - } - }() - // serializedInitialContract, err := tt.d.Serialize() - serializedInitialContract, err := initialContract.Serialize() - if err != nil { - t.Errorf(err.Error()) - } - serializedVersion := make([]byte, 2) - binary.LittleEndian.PutUint16(serializedVersion, 1) - serializedType := make([]byte, 2) - binary.LittleEndian.PutUint16(serializedType, 0) - if !bytes.Equal(got[:2], serializedVersion) { - t.Errorf(fmt.Sprintf("Data header version serialization does not match. Wanted: %v, got: %v", serializedVersion, got[:2])) - } - if !bytes.Equal(got[2:4], serializedType) { - t.Errorf(fmt.Sprintf("Data header type serialization does not match. Wanted: %v, got: %v", serializedVersion, got[2:4])) - } - if !bytes.Equal(got[4:], serializedInitialContract[4:]) { //had to change serializedInitialContract to serializedInitialContract[4:] - t.Errorf(fmt.Sprintf("Data header body serialization does not match. Wanted: %v, got: %v", serializedVersion, got[4:])) - } - }) - } -} - -func TestData_Deserialize(t *testing.T) { - senderPrivateKey, _ := ecdsa.GenerateKey(elliptic.P256(), rand.Reader) - encodedSenderPublicKey, _ := publickey.Encode(&senderPrivateKey.PublicKey) - spkh := hashing.New(encodedSenderPublicKey) - initialContract, _ := contracts.New(1, nil, spkh, 1000, 0) - // someData := &Data{ - // Hdr: DataHeader{ - // Version: 1, - // Type: 0, - // }, - // Bdy: initialContract, - // } - someData := initialContract - serializedsomeData, _ := someData.Serialize() - type args struct { - serializedData []byte - } - tests := []struct { - name string - d *contracts.Contract - args args - wantErr bool - }{ - { - // d: &Data{}, - d: &contracts.Contract{}, - args: args{ - serializedData: serializedsomeData, - }, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if err := tt.d.Deserialize(tt.args.serializedData); (err != nil) != tt.wantErr { - t.Errorf("Data.Deserialize() error = %v, wantErr %v", err, tt.wantErr) - } - if !reflect.DeepEqual(tt.d, someData) { - t.Errorf("Deserialized Data struct failed to match") - } - }) - } -}