Skip to content

Commit cb145d4

Browse files
committed
[Actor유틸] Kafka ConsumerActor 추가
1 parent 71b4f41 commit cb145d4

8 files changed

Lines changed: 157 additions & 22 deletions

File tree

AkkaDotBootApi/Test/TestAkka.cs

Lines changed: 29 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,13 @@ static public void Start(IApplicationBuilder app, ActorSystem actorSystem)
4949
//##### 보안연결이 지원하기때문에 Saas형태의 Kafka에 보안연결이 가능합니다.
5050
//##### 커스텀한 액터를 생성하여,AkkaStream을 이해하고 직접 연결할수 있을때 유용합니다.
5151
//##################################################################
52+
5253
//ProducerActor
53-
5454
var producerAkkaOption = new ProducerAkkaOption()
5555
{
5656
BootstrapServers = "webnori-kafka.servicebus.windows.net:9093",
5757
ProducerName = "webnori-kafka",
58-
SecuritOption = new KafkaSecurityOption()
58+
SecurityOption = new KafkaSecurityOption()
5959
{
6060
SecurityProtocol = SecurityProtocol.SaslSsl,
6161
SaslMechanism = SaslMechanism.Plain,
@@ -66,7 +66,6 @@ static public void Start(IApplicationBuilder app, ActorSystem actorSystem)
6666
};
6767

6868
string producerActorName = "producerActor";
69-
7069
var producerActor= AkkaLoad.RegisterActor(producerActorName /*AkkaLoad가 인식하는 유니크명*/,
7170
actorSystem.ActorOf(Props.Create(() =>
7271
new ProducerActor(producerAkkaOption)),
@@ -82,6 +81,33 @@ static public void Start(IApplicationBuilder app, ActorSystem actorSystem)
8281
}
8382
});
8483

84+
//ConsumerActor
85+
var consumerAkkaOption = new ConsumerAkkaOption()
86+
{
87+
BootstrapServers = "webnori-kafka.servicebus.windows.net:9093",
88+
Topics = "akka100",
89+
AutoOffsetReset = AutoOffsetReset.Earliest,
90+
KafkaGroupId = "akakTestGroup",
91+
RelayActor = null, //작업자 액터를 연결하면, 소비메시지가 작업자에게 전달된다 ( 컨슘기능과 작업자 기능의 분리)
92+
SecurityOption = new KafkaSecurityOption()
93+
{
94+
SecurityProtocol = SecurityProtocol.SaslSsl,
95+
SaslMechanism = SaslMechanism.Plain,
96+
SaslUsername = "$ConnectionString",
97+
SaslPassword = "Endpoint=sb://webnori-kafka.servicebus.windows.net/;SharedAccessKeyName=kafka-client;SharedAccessKey=PfL0qRUm50AXZHRXLiVfnatIRI3OqAh+dT6Owsqrd2M=",
98+
SslCaLocation = "./cacert.pem"
99+
}
100+
};
101+
102+
string consumerActorName = "consumerActor";
103+
var consumerActor = AkkaLoad.RegisterActor(consumerActorName /*AkkaLoad가 인식하는 유니크명*/,
104+
actorSystem.ActorOf(Props.Create(() =>
105+
new ConsumerActor(consumerAkkaOption)),
106+
consumerActorName /*AKKA가 인식하는 Path명*/
107+
));
108+
109+
//컨슈머를 작동시킨다.
110+
consumerActor.Tell(new ConsumerStart());
85111

86112
//##################################################################
87113
//##### Akka.Streams.Kafka(의존:Confluent.Kafka) 을 사용하는 모드로, Security(SSL)이 아직 지원되지 않습니다.
Lines changed: 111 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,11 +1,117 @@
1-
using System;
2-
using System.Collections.Generic;
3-
using System.Text;
1+
using Akka.Actor;
2+
using Akka.Event;
3+
using AkkaDotModule.Kafka;
4+
using AkkaDotModule.Models;
5+
using Confluent.Kafka;
6+
using System;
7+
using System.Threading;
48

59
namespace AkkaDotModule.ActorUtils.Confluent
610
{
7-
//TODO : 준비중....소비자는 생산자보다 조금더 난이도가 있습니다.
8-
class ConsumerActor
11+
public class ConsumerStart
912
{
13+
};
14+
15+
public class ConsumerPull
16+
{
17+
};
18+
19+
public class ConsumerStop
20+
{
21+
};
22+
23+
public class ConsumerActor : ReceiveActor
24+
{
25+
private readonly string topic;
26+
27+
private readonly ConsumerConfig consumerConfig;
28+
29+
private readonly CancellationTokenSource cancellationTokenSource;
30+
31+
private readonly IConsumer<Ignore, string> consumer;
32+
33+
private readonly IActorRef workActor;
34+
35+
private readonly ILoggingAdapter logger = Context.GetLogger();
36+
37+
public ConsumerActor(ConsumerAkkaOption consumerAkkaOption)
38+
{
39+
topic = consumerAkkaOption.Topics;
40+
cancellationTokenSource = new CancellationTokenSource();
41+
workActor = consumerAkkaOption.RelayActor;
42+
43+
if (consumerAkkaOption.SecurityOption != null)
44+
{
45+
consumerConfig = new ConsumerConfig()
46+
{
47+
GroupId = consumerAkkaOption.KafkaGroupId,
48+
BootstrapServers = consumerAkkaOption.BootstrapServers,
49+
AutoOffsetReset = consumerAkkaOption.AutoOffsetReset,
50+
//For Security
51+
SecurityProtocol = consumerAkkaOption.SecurityOption.SecurityProtocol,
52+
SaslMechanism = consumerAkkaOption.SecurityOption.SaslMechanism,
53+
SaslUsername = consumerAkkaOption.SecurityOption.SaslUsername,
54+
SaslPassword = consumerAkkaOption.SecurityOption.SaslPassword,
55+
SslCaLocation = consumerAkkaOption.SecurityOption.SslCaLocation,
56+
};
57+
}
58+
else
59+
{
60+
consumerConfig = new ConsumerConfig()
61+
{
62+
GroupId = consumerAkkaOption.KafkaGroupId,
63+
BootstrapServers = consumerAkkaOption.BootstrapServers,
64+
AutoOffsetReset = consumerAkkaOption.AutoOffsetReset,
65+
};
66+
}
67+
68+
consumer = new ConsumerBuilder<Ignore, string>(consumerConfig).Build();
69+
70+
ReceiveAsync<ConsumerStart>(async msg =>
71+
{
72+
IActorRef selfActor = this.Self;
73+
consumer.Subscribe(topic);
74+
75+
var cr = consumer.Consume(cancellationTokenSource.Token);
76+
selfActor.Tell(new KafkaTextMessage()
77+
{
78+
Topic = cr.Topic,
79+
Message = cr.Message.Value
80+
});
81+
});
82+
83+
ReceiveAsync<ConsumerPull>(async msg =>
84+
{
85+
IActorRef selfActor = this.Self;
86+
var cr = consumer.Consume(cancellationTokenSource.Token);
87+
selfActor.Tell(new KafkaTextMessage()
88+
{
89+
Topic = cr.Topic,
90+
Message = cr.Message.Value
91+
});
92+
});
93+
94+
ReceiveAsync<KafkaTextMessage>(async msg =>
95+
{
96+
IActorRef selfActor = this.Self;
97+
//이 액터는 카프카 메시지소비만 담당하며
98+
//소비된 메시지는 작업 액터에게 전달한다.
99+
string logText = $"Consumed message '{msg.Message}' Topic: '{msg.Topic}'.";
100+
logger.Debug(logText);
101+
102+
if (workActor != null)
103+
{
104+
workActor.Tell(msg);
105+
}
106+
selfActor.Tell(new ConsumerPull());
107+
});
108+
}
109+
110+
protected override void PostStop()
111+
{
112+
Console.WriteLine("try down KafkaConsumer.....");
113+
cancellationTokenSource.Cancel();
114+
}
115+
10116
}
11117
}

AkkaDotModule/ActorUtils/Confluent/ProducerActor.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -22,16 +22,16 @@ public ProducerActor(ProducerAkkaOption producerAkkaOption)
2222
//Actor Init
2323
sentMessageCount = 0;
2424

25-
if (producerAkkaOption.SecuritOption != null)
25+
if (producerAkkaOption.SecurityOption != null)
2626
{
2727
producerConfig = new ProducerConfig()
2828
{
2929
BootstrapServers = producerAkkaOption.BootstrapServers,
30-
SecurityProtocol = producerAkkaOption.SecuritOption.SecurityProtocol,
31-
SaslMechanism = producerAkkaOption.SecuritOption.SaslMechanism,
32-
SaslUsername = producerAkkaOption.SecuritOption.SaslUsername,
33-
SaslPassword = producerAkkaOption.SecuritOption.SaslPassword,
34-
SslCaLocation = producerAkkaOption.SecuritOption.SslCaLocation,
30+
SecurityProtocol = producerAkkaOption.SecurityOption.SecurityProtocol,
31+
SaslMechanism = producerAkkaOption.SecurityOption.SaslMechanism,
32+
SaslUsername = producerAkkaOption.SecurityOption.SaslUsername,
33+
SaslPassword = producerAkkaOption.SecurityOption.SaslPassword,
34+
SslCaLocation = producerAkkaOption.SecurityOption.SslCaLocation,
3535
//Debug = "security,broker,protocol" //Uncomment for librdkafka debugging information
3636
};
3737

AkkaDotModule/AkkaDotModule.csproj

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,7 @@
44
<TargetFramework>netstandard2.0</TargetFramework>
55
<GeneratePackageOnBuild>true</GeneratePackageOnBuild>
66
<PackageId>AkkaDotModule.Webnori</PackageId>
7-
<Version>1.0.8</Version>
7+
<Version>1.0.9</Version>
88
<RepositoryUrl>https://github.com/psmon/AkkaDotModule</RepositoryUrl>
99
<PackageProjectUrl>https://github.com/psmon/AkkaDotModule</PackageProjectUrl>
1010
</PropertyGroup>

AkkaDotModule/Kafka/ConsumerSystem.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,9 +28,9 @@ public void Start(ConsumerAkkaOption consumerActorOption)
2828
.WithBootstrapServers(consumerActorOption.BootstrapServers)
2929
.WithGroupId(consumerActorOption.KafkaGroupId);
3030

31-
if(consumerActorOption.SucuritOption != null)
31+
if(consumerActorOption.SecurityOption != null)
3232
{
33-
KafkaSecurityOption kafkaSecurityOption = consumerActorOption.SucuritOption;
33+
KafkaSecurityOption kafkaSecurityOption = consumerActorOption.SecurityOption;
3434
}
3535

3636

AkkaDotModule/Kafka/KafkaOptions.cs

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,9 @@ public class ConsumerAkkaOption
2929

3030
public string Topics { get; set; }
3131

32-
public KafkaSecurityOption SucuritOption { get; set; }
32+
public AutoOffsetReset AutoOffsetReset { get;set;}
33+
34+
public KafkaSecurityOption SecurityOption { get; set; }
3335
}
3436

3537
public class ProducerAkkaOption
@@ -38,7 +40,7 @@ public class ProducerAkkaOption
3840

3941
public string ProducerName { get; set; }
4042

41-
public KafkaSecurityOption SecuritOption { get; set; }
43+
public KafkaSecurityOption SecurityOption { get; set; }
4244
}
4345

4446

AkkaDotModule/Kafka/ProducerSystem.cs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -36,9 +36,9 @@ public void Start(ProducerAkkaOption producerAkkaOption)
3636
var producer = ProducerSettings<Null, string>.Create(producerSystem, null, null)
3737
.WithBootstrapServers(producerAkkaOption.BootstrapServers);
3838

39-
if(producerAkkaOption.SecuritOption != null)
39+
if(producerAkkaOption.SecurityOption != null)
4040
{
41-
KafkaSecurityOption kafkaSecurityOption = producerAkkaOption.SecuritOption;
41+
KafkaSecurityOption kafkaSecurityOption = producerAkkaOption.SecurityOption;
4242
/*
4343
producer = producer
4444
.WithProperty("security.protocol", kafkaSecurityOption.SecurityProtocol)

README.md

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,8 @@ AKKA의 버전업에 항상 대응하는것이아닌, 유닛테스트를 통해
3232

3333
# 주요 릴리즈 노트
3434

35-
- 1.0.8 : Kafka ProducerActor 추가 (목적:Kafka SSL모드로 쉽게 작동하기 위한 신규 모듈)
35+
- 1.0.9 : Kafka ConsumerActor 추가 (목적:Kafka SSL모드지원) - [사용법](AkkaDotBootApi/Test/TestAkka.cs)
36+
- 1.0.8 : Kafka ProducerActor 추가 (목적:Kafka SSL모드지원) - [Link](http://wiki.webnori.com/display/webfr/Auzere+EventHub%28KAFKA%29+With+Actor)
3637
- 1.0.7 : 실시간 배치처리기([BatchActor](TestAkkaDotModule/TestActors/BatchActorTest.cs)) 추가
3738
- 1.0.6 : Kafka 도커 인프라추가및, TestAPI 샘플 추가
3839
- 1.0.5 : Kafka Stream 지원 : 액터시스템을 이용하여 Kafka를 더 심플하고 강력하게 사용가능합니다.

0 commit comments

Comments
 (0)