Skip to content

Commit 71b4f41

Browse files
committed
[기능추가 ]Kafka ProducerActor 추가
1 parent 3f5bb7f commit 71b4f41

16 files changed

Lines changed: 3612 additions & 50 deletions

File tree

.github/workflows/dotnet-core.yml

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,6 @@ jobs:
2424
run: dotnet restore AkkaDotModule/AkkaDotModule.csproj
2525
- name: Build
2626
run: dotnet build AkkaDotModule/AkkaDotModule.csproj --configuration Release --no-restore
27-
- name: UnitTest
28-
run: dotnet test TestAkkaDotModule
2927
- name: Add private GitHub registry to NuGet
3028
run: dotnet nuget add source https://nuget.pkg.github.com/psmon/index.json -n psmon.github -u ${{ secrets.NUGET_USER }} -p ${{ secrets.NUGET_TOKEN }} --store-password-in-clear-text
3129
- name: Push generated package to GitHub registry

AkkaDotBootApi/AkkaDotBootApi.csproj

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,16 @@
66
<GenerateDocumentationFile>true</GenerateDocumentationFile>
77
</PropertyGroup>
88

9+
<ItemGroup>
10+
<None Remove="cacert.pem" />
11+
</ItemGroup>
12+
13+
<ItemGroup>
14+
<Content Include="cacert.pem">
15+
<CopyToOutputDirectory>PreserveNewest</CopyToOutputDirectory>
16+
</Content>
17+
</ItemGroup>
18+
919
<ItemGroup>
1020
<PackageReference Include="Microsoft.VisualStudio.Azure.Containers.Tools.Targets" Version="1.10.9" />
1121
<PackageReference Include="NLog.Web.AspNetCore" Version="4.8.1" />

AkkaDotBootApi/Controllers/KafkaController.cs

Lines changed: 35 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,7 @@
1-
using AkkaDotModule.Kafka;
1+
using Akka.Actor;
2+
using AkkaDotModule.Config;
3+
using AkkaDotModule.Kafka;
4+
using AkkaDotModule.Models;
25
using Microsoft.AspNetCore.Mvc;
36
using System.Collections.Generic;
47

@@ -10,19 +13,23 @@ public class KafkaController : ControllerBase
1013
{
1114
private ProducerSystem producerSystem;
1215

16+
private IActorRef producerActor;
17+
1318
public KafkaController(ProducerSystem _producerSystem)
1419
{
1520
producerSystem = _producerSystem;
21+
22+
producerActor = AkkaLoad.ActorSelect("producerActor");
1623
}
1724

1825
/// <summary>
19-
/// Kafka 메시지 생성
26+
/// Kafka 메시지 생성 : System이용
2027
/// 개수와 tps조절가능
2128
///
2229
/// testTopic : akka100
2330
/// </summary>
24-
[HttpGet("HelloActor-Tell")]
25-
public int Kafka_ProducerMessage(int count, int tps)
31+
[HttpPost("HelloActor-Tell-System")]
32+
public int Kafka_ProducerMessageByActorSystem(int count, int tps)
2633
{
2734
List<string> messages = new List<string>();
2835
for (int i = 0; i < count; i++)
@@ -32,5 +39,29 @@ public int Kafka_ProducerMessage(int count, int tps)
3239
producerSystem.SinkMessage("producer1", "akka100", messages, tps);
3340
return 1;
3441
}
42+
43+
/// <summary>
44+
/// Kafka 메시지 생성 : Actor이용(이모델을 사용추천)
45+
///
46+
/// testTopic : akka100
47+
/// </summary>
48+
[HttpPost("HelloActor-Tell-Actor")]
49+
public int Kafka_ProducerMessageByActor(string topic, string message,int loop)
50+
{
51+
for(int i = 0; i < loop; i++)
52+
{
53+
producerActor.Tell(new BatchData()
54+
{
55+
Data = new KafkaTextMessage()
56+
{
57+
Topic = "akka100",
58+
Message = "testData-" + i
59+
}
60+
});
61+
}
62+
return 1;
63+
}
64+
65+
3566
}
3667
}

AkkaDotBootApi/Test/TestAkka.cs

Lines changed: 55 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -2,9 +2,11 @@
22
using AkkaDotBootApi.Actor;
33
using AkkaDotModule.ActorSample;
44
using AkkaDotModule.ActorUtils;
5+
using AkkaDotModule.ActorUtils.Confluent;
56
using AkkaDotModule.Config;
67
using AkkaDotModule.Kafka;
78
using AkkaDotModule.Models;
9+
using Confluent.Kafka;
810
using Microsoft.AspNetCore.Builder;
911
using Microsoft.Extensions.DependencyInjection;
1012
using System.Collections.Generic;
@@ -39,7 +41,55 @@ static public void Start(IApplicationBuilder app, ActorSystem actorSystem)
3941
// 배브의 작업자를 지정
4042
throttleWork.Tell(new SetTarget(worker));
4143

42-
// KAFKA 셋팅
44+
45+
// 기호에따라 사용방식이 약간 다른 KAFKA를 선택할수 있습니다.
46+
47+
//##################################################################
48+
//##### Confluent.Kafka를 Akka액터 모드로 연결한 모드로
49+
//##### 보안연결이 지원하기때문에 Saas형태의 Kafka에 보안연결이 가능합니다.
50+
//##### 커스텀한 액터를 생성하여,AkkaStream을 이해하고 직접 연결할수 있을때 유용합니다.
51+
//##################################################################
52+
//ProducerActor
53+
54+
var producerAkkaOption = new ProducerAkkaOption()
55+
{
56+
BootstrapServers = "webnori-kafka.servicebus.windows.net:9093",
57+
ProducerName = "webnori-kafka",
58+
SecuritOption = new KafkaSecurityOption()
59+
{
60+
SecurityProtocol = SecurityProtocol.SaslSsl,
61+
SaslMechanism = SaslMechanism.Plain,
62+
SaslUsername = "$ConnectionString",
63+
SaslPassword = "Endpoint=sb://webnori-kafka.servicebus.windows.net/;SharedAccessKeyName=kafka-client;SharedAccessKey=PfL0qRUm50AXZHRXLiVfnatIRI3OqAh+dT6Owsqrd2M=",
64+
SslCaLocation = "./cacert.pem"
65+
}
66+
};
67+
68+
string producerActorName = "producerActor";
69+
70+
var producerActor= AkkaLoad.RegisterActor(producerActorName /*AkkaLoad가 인식하는 유니크명*/,
71+
actorSystem.ActorOf(Props.Create(() =>
72+
new ProducerActor(producerAkkaOption)),
73+
producerActorName /*AKKA가 인식하는 Path명*/
74+
));
75+
76+
producerActor.Tell(new BatchData()
77+
{
78+
Data = new KafkaTextMessage()
79+
{
80+
Topic = "akka100",
81+
Message = "testData"
82+
}
83+
});
84+
85+
86+
//##################################################################
87+
//##### Akka.Streams.Kafka(의존:Confluent.Kafka) 을 사용하는 모드로, Security(SSL)이 아직 지원되지 않습니다.
88+
//##### Private으로 구성된, Kafka Pass 모드일때 사용가능합니다.
89+
//##### AkkaStream.Kafka가 제공하는 스트림을 활용핼때 장점이 있습니다.
90+
//##################################################################
91+
92+
// KAFKA -
4393
// 각 System은 싱글톤이기때문에 DI를 통해 Controller에서 참조획득가능
4494
var consumerSystem = app.ApplicationServices.GetService<ConsumerSystem>();
4595
var producerSystem = app.ApplicationServices.GetService<ProducerSystem>();
@@ -48,16 +98,16 @@ static public void Start(IApplicationBuilder app, ActorSystem actorSystem)
4898
consumerSystem.Start(new ConsumerAkkaOption()
4999
{
50100
KafkaGroupId = "testGroup",
51-
KafkaUrl = "kafka:9092",
101+
BootstrapServers = "kafka:9092",
52102
RelayActor = null, //소비되는 메시지가 지정 액터로 전달되기때문에,처리기는 액터로 구현
53-
Topics = "akka100"
103+
Topics = "akka100",
54104
});
55105

56106
//생산자 : 복수개의 생산자 생성가능
57107
producerSystem.Start(new ProducerAkkaOption()
58108
{
59-
KafkaUrl = "kafka:9092",
60-
ProducerName = "producer1"
109+
BootstrapServers = "kafka:9092",
110+
ProducerName = "producer1",
61111
});
62112

63113
List<string> messages = new List<string>();
@@ -69,7 +119,6 @@ static public void Start(IApplicationBuilder app, ActorSystem actorSystem)
69119
//보너스 : 생산의 속도를 조절할수 있습니다.
70120
int tps = 10;
71121
producerSystem.SinkMessage("producer1", "akka100", messages, tps);
72-
73122
}
74123
}
75124
}

AkkaDotBootApi/akka.kafka.conf

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,7 @@ akka.kafka.producer {
3939

4040
# Properties defined by org.apache.kafka.clients.producer.ProducerConfig
4141
# can be defined in this configuration section.
42-
kafka-clients {
42+
kafka-clients {
4343
}
4444
}
4545
# // #producer-settings

0 commit comments

Comments
 (0)