Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,11 @@ It’s a toolkit you can adopt incrementally.
## Current Focus

- Messaging abstractions
- Network abstractions (HTTP & gRPC)
- RabbitMQ adapter (v0.1)
- Kafka adapter (v0.1)
- HTTP adapter (v0.1)
- gRPC adapter (v0.1)

---

Expand All @@ -51,3 +54,29 @@ producer := kafka.NewProducer(conn, "orders")

producer.Publish(ctx, []byte("key"), []byte(`{"id":"123"}`))
```

### HTTP Client with Retry

```go
client := http.NewClient(10 * time.Second)
defer client.Close()

resp, _ := client.Get(ctx, "https://api.example.com/users",
network.WithHeader("Authorization", "Bearer token"),
network.WithRetry(3, 100*time.Millisecond, 2*time.Second, 2.0),
)
```

### gRPC Client with Retry

```go
client, _ := grpc.NewClient("localhost:50051", 10*time.Second)
defer client.Close()

req := &HelloRequest{Name: "microkit"}
resp := &HelloResponse{}

client.Call(ctx, "/hello.HelloService/SayHello", req, resp,
network.WithRetry(3, 100*time.Millisecond, 2*time.Second, 2.0),
)
```
74 changes: 74 additions & 0 deletions adapters/grpc/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,74 @@
package grpc

import (
"context"
"time"

"github.com/festech-cloud/microkit/network"
"github.com/festech-cloud/microkit/internal/retry"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

type Client struct {
conn *grpc.ClientConn
}

func NewClient(target string, timeout time.Duration) (*Client, error) {
_, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

conn, err := grpc.NewClient(target,
grpc.WithTransportCredentials(insecure.NewCredentials()),
)
if err != nil {
return nil, err
}

return &Client{conn: conn}, nil
}

func (c *Client) Get(ctx context.Context, url string, opts ...network.Option) (*network.Response, error) {
panic("HTTP methods not supported by gRPC client")
}

func (c *Client) Post(ctx context.Context, url string, body []byte, opts ...network.Option) (*network.Response, error) {
panic("HTTP methods not supported by gRPC client")
}

func (c *Client) Put(ctx context.Context, url string, body []byte, opts ...network.Option) (*network.Response, error) {
panic("HTTP methods not supported by gRPC client")
}

func (c *Client) Delete(ctx context.Context, url string, opts ...network.Option) (*network.Response, error) {
panic("HTTP methods not supported by gRPC client")
}

func (c *Client) Patch(ctx context.Context, url string, body []byte, opts ...network.Option) (*network.Response, error) {
panic("HTTP methods not supported by gRPC client")
}

func (c *Client) Call(ctx context.Context, method string, req any, resp any, opts ...network.Option) error {
config := &network.Config{}
for _, opt := range opts {
opt(config)
}

if config.Timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, config.Timeout)
defer cancel()
}

if config.RetryConfig != nil {
return retry.Execute(ctx, *config.RetryConfig, func() error {
return c.conn.Invoke(ctx, method, req, resp)
})
}

return c.conn.Invoke(ctx, method, req, resp)
}

func (c *Client) Close() error {
return c.conn.Close()
}
112 changes: 112 additions & 0 deletions adapters/http/client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
package http

import (
"bytes"
"context"
"io"
"net/http"
"time"

"github.com/festech-cloud/microkit/network"
"github.com/festech-cloud/microkit/internal/retry"
)

type Client struct {
client *http.Client
}

func NewClient(timeout time.Duration) *Client {
return &Client{
client: &http.Client{
Timeout: timeout,
},
}
}

func (c *Client) Get(ctx context.Context, url string, opts ...network.Option) (*network.Response, error) {
return c.do(ctx, "GET", url, nil, opts...)
}

func (c *Client) Post(ctx context.Context, url string, body []byte, opts ...network.Option) (*network.Response, error) {
return c.do(ctx, "POST", url, body, opts...)
}

func (c *Client) Put(ctx context.Context, url string, body []byte, opts ...network.Option) (*network.Response, error) {
return c.do(ctx, "PUT", url, body, opts...)
}

func (c *Client) Patch(ctx context.Context, url string, body []byte, opts ...network.Option) (*network.Response, error) {
return c.do(ctx, "PATCH", url, body, opts...)
}

func (c *Client) Delete(ctx context.Context, url string, opts ...network.Option) (*network.Response, error) {
return c.do(ctx, "DELETE", url, nil, opts...)
}

func (c *Client) Call(ctx context.Context, method string, req interface{}, resp interface{}, opts ...network.Option) error {
panic("gRPC not supported by HTTP client")
}

func (c *Client) do(ctx context.Context, method, url string, body []byte, opts ...network.Option) (*network.Response, error) {
config := &network.Config{}
for _, opt := range opts {
opt(config)
}

if config.RetryConfig != nil {
var resp *network.Response
err := retry.Execute(ctx, *config.RetryConfig, func() error {
var err error
resp, err = c.doRequest(ctx, method, url, body, config)
return err
})
return resp, err
}

return c.doRequest(ctx, method, url, body, config)
}

func (c *Client) doRequest(ctx context.Context, method, url string, body []byte, config *network.Config) (*network.Response, error) {
var bodyReader io.Reader
if body != nil {
bodyReader = bytes.NewReader(body)
}

req, err := http.NewRequestWithContext(ctx, method, url, bodyReader)
if err != nil {
return nil, err
}

for k, v := range config.Headers {
req.Header.Set(k, v)
}

resp, err := c.client.Do(req)
if err != nil {
return nil, err
}
defer resp.Body.Close()

respBody, err := io.ReadAll(resp.Body)
if err != nil {
return nil, err
}

headers := make(map[string]string)
for k, v := range resp.Header {
if len(v) > 0 {
headers[k] = v[0]
}
}

return &network.Response{
StatusCode: resp.StatusCode,
Headers: headers,
Body: respBody,
}, nil
}

func (c *Client) Close() error {
c.client.CloseIdleConnections()
return nil
}
39 changes: 37 additions & 2 deletions adapters/kafka/consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,21 @@ import (
"log"

kafka "github.com/segmentio/kafka-go"
"github.com/festech-cloud/microkit/internal/retry"
)

type ConsumerConfig struct {
RetryConfig retry.Config
EnableDLQ bool
DLQTopic string
}

type Consumer struct {
conn *Connection
topic string
groupID string
r *kafka.Reader
config ConsumerConfig
}

func NewConsumer(conn *Connection, topic, groupID string) *Consumer {
Expand All @@ -24,6 +32,16 @@ func NewConsumer(conn *Connection, topic, groupID string) *Consumer {
}
}

func NewConsumerWithConfig(conn *Connection, topic, groupID string, config ConsumerConfig) *Consumer {
return &Consumer{
conn: conn,
topic: topic,
groupID: groupID,
r: conn.Reader(topic, groupID),
config: config,
}
}

func (c *Consumer) Subscribe(ctx context.Context, handler func([]byte) error) {
go func() {
for {
Expand All @@ -33,13 +51,30 @@ func (c *Consumer) Subscribe(ctx context.Context, handler func([]byte) error) {
continue
}

if err := handler(m.Value); err != nil {
if c.config.RetryConfig.MaxAttempts > 0 {
err = retry.Execute(ctx, c.config.RetryConfig, func() error {
return handler(m.Value)
})
} else {
err = handler(m.Value)
}

if err != nil {
log.Printf("Handler error, message: %s, err: %v\n", string(m.Value), err)
if c.config.EnableDLQ {
c.sendToDLQ(ctx, m.Value)
}
}
}
}()
}

func (c *Consumer) sendToDLQ(ctx context.Context, message []byte) {
producer := NewProducer(c.conn, c.config.DLQTopic)
defer producer.Close()
producer.Publish(ctx, nil, message)
}

func (c *Consumer) Close() error {
return c.r.Close()
}
}
27 changes: 27 additions & 0 deletions examples/network/grpc/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
package main

import (
"fmt"
"log"
"time"

"github.com/festech-cloud/microkit/adapters/grpc"
)

func main() {
client, err := grpc.NewClient("localhost:50051", 10*time.Second)
if err != nil {
log.Fatal(err)
}
defer client.Close()

// Example with proper protobuf messages (requires .proto files)
// req := &pb.HelloRequest{Name: "microkit"}
// resp := &pb.HelloResponse{}
//
// err = client.Call(ctx, "/hello.HelloService/SayHello", req, resp,
// network.WithRetry(3, 100*time.Millisecond, 2*time.Second, 2.0),
// )

fmt.Println("gRPC client ready - add your protobuf messages")
}
29 changes: 29 additions & 0 deletions examples/network/http/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package main

import (
"context"
"fmt"
"log"
"time"

"github.com/festech-cloud/microkit/adapters/http"
"github.com/festech-cloud/microkit/network"
)

func main() {
client := http.NewClient(10 * time.Second)
defer client.Close()

ctx := context.Background()

// HTTP GET with retry
resp, err := client.Get(ctx, "https://httpbin.org/get",
network.WithHeader("User-Agent", "microkit/1.0"),
network.WithRetry(3, 100*time.Millisecond, 2*time.Second, 2.0),
)
if err != nil {
log.Fatal(err)
}

fmt.Printf("Response: %d\n", resp.StatusCode)
}
Loading