Skip to content

Commit ba0868f

Browse files
committed
Merge branch 'release/0.165'
2 parents f7a8207 + cfa3cf4 commit ba0868f

12 files changed

Lines changed: 319 additions & 196 deletions

File tree

pom.xml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@
66
<modelVersion>4.0.0</modelVersion>
77
<groupId>org.reaktivity</groupId>
88
<artifactId>reaktor</artifactId>
9-
<version>0.164</version>
9+
<version>0.165</version>
1010
<name>Reaktor for Java Nuklei</name>
1111
<description>Reaktor for Java Nuklei</description>
1212
<url>https://github.com/reaktivity/reaktor.java</url>

src/main/java/org/reaktivity/nukleus/Elektron.java

Lines changed: 9 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -15,26 +15,29 @@
1515
*/
1616
package org.reaktivity.nukleus;
1717

18-
import org.agrona.concurrent.Agent;
18+
import java.nio.channels.SelectableChannel;
19+
import java.util.function.Function;
20+
1921
import org.reaktivity.nukleus.route.AddressFactoryBuilder;
2022
import org.reaktivity.nukleus.route.RouteKind;
2123
import org.reaktivity.nukleus.stream.StreamFactoryBuilder;
24+
import org.reaktivity.reaktor.poller.PollerKey;
2225

2326
public interface Elektron
2427
{
25-
default StreamFactoryBuilder streamFactoryBuilder(
26-
RouteKind kind)
28+
default void setPollerKeySupplier(
29+
Function<SelectableChannel, PollerKey> supplyPollerKey)
2730
{
28-
return null;
2931
}
3032

31-
default AddressFactoryBuilder addressFactoryBuilder(
33+
default StreamFactoryBuilder streamFactoryBuilder(
3234
RouteKind kind)
3335
{
3436
return null;
3537
}
3638

37-
default Agent agent()
39+
default AddressFactoryBuilder addressFactoryBuilder(
40+
RouteKind kind)
3841
{
3942
return null;
4043
}

src/main/java/org/reaktivity/nukleus/stream/StreamFactoryBuilder.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package org.reaktivity.nukleus.stream;
1717

1818
import java.net.InetAddress;
19+
import java.nio.channels.SelectableChannel;
1920
import java.util.function.Function;
2021
import java.util.function.LongConsumer;
2122
import java.util.function.LongFunction;
@@ -32,6 +33,7 @@
3233
import org.reaktivity.nukleus.concurrent.Signaler;
3334
import org.reaktivity.nukleus.function.MessageConsumer;
3435
import org.reaktivity.nukleus.route.RouteManager;
36+
import org.reaktivity.reaktor.poller.PollerKey;
3537

3638
public interface StreamFactoryBuilder
3739
{
@@ -131,5 +133,11 @@ default StreamFactoryBuilder setHostResolver(
131133
return this;
132134
}
133135

136+
default StreamFactoryBuilder setPollerKeySupplier(
137+
Function<SelectableChannel, PollerKey> supplyPollerKey)
138+
{
139+
return this;
140+
}
141+
134142
StreamFactory build();
135143
}

src/main/java/org/reaktivity/reaktor/ReaktorBuilder.java

Lines changed: 2 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@
3535

3636
import org.agrona.ErrorHandler;
3737
import org.agrona.concurrent.Agent;
38-
import org.reaktivity.nukleus.AgentBuilder;
3938
import org.reaktivity.nukleus.Configuration;
4039
import org.reaktivity.nukleus.Controller;
4140
import org.reaktivity.nukleus.ControllerFactory;
@@ -55,7 +54,6 @@ public class ReaktorBuilder
5554
private Function<String, BitSet> affinityMaskDefault;
5655
private ErrorHandler errorHandler;
5756
private Supplier<NukleusFactory> supplyNukleusFactory;
58-
private Supplier<AgentBuilder> supplyAgentBuilder;
5957
private ThreadFactory threadFactory;
6058

6159
private int threads = 1;
@@ -134,13 +132,6 @@ public ReaktorBuilder loader(
134132
return this;
135133
}
136134

137-
public ReaktorBuilder supplyAgentBuilder(
138-
Supplier<AgentBuilder> supplyAgentBuilder)
139-
{
140-
this.supplyAgentBuilder = supplyAgentBuilder;
141-
return this;
142-
}
143-
144135
public Reaktor build()
145136
{
146137
final Set<Configuration> configs = new LinkedHashSet<>();
@@ -160,11 +151,10 @@ public Reaktor build()
160151
}
161152
}
162153

163-
// ensure control file is not created for no nuklei
164154
NukleusAgent nukleusAgent = null;
165-
if (!nuklei.isEmpty() || supplyAgentBuilder != null)
155+
if (!nuklei.isEmpty())
166156
{
167-
nukleusAgent = new NukleusAgent(config, supplyAgentBuilder);
157+
nukleusAgent = new NukleusAgent(config);
168158
nuklei.forEach(nukleusAgent::assign);
169159
}
170160

src/main/java/org/reaktivity/reaktor/internal/agent/ElektronAgent.java

Lines changed: 8 additions & 91 deletions
Original file line numberDiff line numberDiff line change
@@ -57,7 +57,6 @@
5757
import org.agrona.DeadlineTimerWheel.TimerHandler;
5858
import org.agrona.DirectBuffer;
5959
import org.agrona.MutableDirectBuffer;
60-
import org.agrona.collections.ArrayUtil;
6160
import org.agrona.collections.Int2ObjectHashMap;
6261
import org.agrona.collections.Long2ObjectHashMap;
6362
import org.agrona.concurrent.Agent;
@@ -68,7 +67,6 @@
6867
import org.agrona.concurrent.status.AtomicCounter;
6968
import org.agrona.concurrent.status.CountersManager;
7069
import org.agrona.hints.ThreadHints;
71-
import org.reaktivity.nukleus.AgentBuilder;
7270
import org.reaktivity.nukleus.Elektron;
7371
import org.reaktivity.nukleus.Nukleus;
7472
import org.reaktivity.nukleus.budget.BudgetDebitor;
@@ -94,6 +92,7 @@
9492
import org.reaktivity.reaktor.internal.layouts.BufferPoolLayout;
9593
import org.reaktivity.reaktor.internal.layouts.MetricsLayout;
9694
import org.reaktivity.reaktor.internal.layouts.StreamsLayout;
95+
import org.reaktivity.reaktor.internal.poller.Poller;
9796
import org.reaktivity.reaktor.internal.router.Resolver;
9897
import org.reaktivity.reaktor.internal.router.StreamId;
9998
import org.reaktivity.reaktor.internal.router.Target;
@@ -174,6 +173,7 @@ public class ElektronAgent implements Agent
174173
private final Long2ObjectHashMap<Address> addressesByRouteId;
175174

176175
private final RouteManager resolver;
176+
private final Poller poller;
177177

178178
private final DefaultBudgetCreditor creditor;
179179
private final Int2ObjectHashMap<DefaultBudgetDebitor> debitorsByIndex;
@@ -193,20 +193,16 @@ public class ElektronAgent implements Agent
193193
private long traceId;
194194
private long budgetId;
195195

196-
private volatile Agent[] agents;
197-
198196
private long lastReadStreamId;
199197

200-
201198
public ElektronAgent(
202199
int index,
203200
int count,
204201
ReaktorConfiguration config,
205202
LabelManager labels,
206203
ExecutorService executorService,
207204
Function<String, BitSet> affinityMask,
208-
Supplier<DirectBuffer> routesBufferRef,
209-
Supplier<AgentBuilder> supplyAgentBuilder)
205+
Supplier<DirectBuffer> routesBufferRef)
210206
{
211207
this.reaktorTypeId = labels.supplyLabelId(config.name());
212208
this.localIndex = index;
@@ -267,14 +263,14 @@ public ElektronAgent(
267263
this.affinityByRemoteId = new Long2ObjectHashMap<>();
268264
this.targetsByIndex = new Int2ObjectHashMap<>();
269265
this.writersByIndex = new Int2ObjectHashMap<>();
270-
this.agents = new Agent[0];
271266

272267
this.timerWheel = new DeadlineTimerWheel(MILLISECONDS, currentTimeMillis(), 512, 1024);
273268
this.tasksByTimerId = new Long2ObjectHashMap<>();
274269
this.futuresById = new Long2ObjectHashMap<>();
275270
this.signaler = new ElektronSignaler(executorService);
276271

277272
this.resolver = new ResolverRef(this::newResolver);
273+
this.poller = new Poller();
278274

279275
final BufferPool bufferPool = bufferPoolLayout.bufferPool();
280276

@@ -299,25 +295,6 @@ public ElektronAgent(
299295
this.creditor = new DefaultBudgetCreditor(index, budgetsLayout, this::doSystemFlush, this::supplyBudgetId,
300296
signaler::executeTaskAt, config.childCleanupLingerMillis());
301297
this.debitorsByIndex = new Int2ObjectHashMap<DefaultBudgetDebitor>();
302-
303-
if (supplyAgentBuilder != null)
304-
{
305-
final AgentBuilder agentBuilder = supplyAgentBuilder.get();
306-
final Agent agent = agentBuilder
307-
.setRouteManager(resolver)
308-
.setWriteBuffer(writeBuffer)
309-
.setAddressIdSupplier(labels::supplyLabelId)
310-
.setStreamFactorySupplier(this::supplyStreamFactory)
311-
.setThrottleSupplier(this::supplyThrottle)
312-
.setThrottleRemover(this::removeThrottle)
313-
.setInitialIdSupplier(this::supplyInitialId)
314-
.setReplyIdSupplier(this::supplyReplyId)
315-
.setTraceIdSupplier(this::supplyTraceId)
316-
.setGroupIdSupplier(this::supplyBudgetId)
317-
.setBufferPool(bufferPool)
318-
.build();
319-
this.agents = ArrayUtil.add(agents, agent);
320-
}
321298
}
322299

323300
private void onSystemMessage(
@@ -575,10 +552,7 @@ public int doWork() throws Exception
575552

576553
try
577554
{
578-
for (final Agent agent : agents)
579-
{
580-
workDone += agent.doWork();
581-
}
555+
workDone += poller.doWork();
582556

583557
if (timerWheel.timerCount() != 0L)
584558
{
@@ -621,10 +595,7 @@ public void onClose()
621595
ThreadHints.onSpinWait();
622596
}
623597

624-
for (final Agent agent : agents)
625-
{
626-
agent.onClose();
627-
}
598+
poller.onClose();
628599

629600
int acquiredBuffers = 0;
630601
int acquiredCreditors = 0;
@@ -1317,6 +1288,7 @@ private ElektronRef(
13171288
Elektron elekron)
13181289
{
13191290
this.elektron = requireNonNull(elekron);
1291+
this.elektron.setPollerKeySupplier(poller::register);
13201292

13211293
final Map<RouteKind, StreamFactory> streamFactories = new EnumMap<>(RouteKind.class);
13221294
final Map<RouteKind, AddressFactory> addressFactories = new EnumMap<>(RouteKind.class);
@@ -1357,15 +1329,6 @@ public ElektronRef assign(
13571329
{
13581330
synchronized (this)
13591331
{
1360-
if (this.count == 0)
1361-
{
1362-
final Agent agent = elektron.agent();
1363-
if (agent != null)
1364-
{
1365-
agents = ArrayUtil.add(agents, agent);
1366-
}
1367-
}
1368-
13691332
final StreamFactory streamFactory = streamFactories.get(routeKind);
13701333
if (streamFactory != null)
13711334
{
@@ -1389,31 +1352,6 @@ public ElektronRef unassign(
13891352
{
13901353
final StreamFactory streamFactory = streamFactoriesByAddressId.remove(labelId);
13911354
assert streamFactory == streamFactories.get(routeKind);
1392-
1393-
final Agent agent = elektron.agent();
1394-
if (agent != null)
1395-
{
1396-
// TODO: quiesce streams first
1397-
agents = ArrayUtil.remove(agents, agent);
1398-
final Agent closeAgent = new Agent()
1399-
{
1400-
1401-
@Override
1402-
public int doWork() throws Exception
1403-
{
1404-
quietClose(agent::onClose);
1405-
agents = ArrayUtil.remove(agents, this);
1406-
return 1;
1407-
}
1408-
1409-
@Override
1410-
public String roleName()
1411-
{
1412-
return String.format("%s (deferred close)", agent.roleName());
1413-
}
1414-
};
1415-
agents = ArrayUtil.add(agents, closeAgent);
1416-
}
14171355
}
14181356
}
14191357

@@ -1457,31 +1395,10 @@ private StreamFactory newStreamFactory(
14571395
.setDroppedFrameConsumer(this::handleDroppedReadFrame)
14581396
.setRemoteIndexSupplier(StreamId::remoteIndex)
14591397
.setHostResolver(resolveHost)
1398+
.setPollerKeySupplier(poller::register)
14601399
.build();
14611400
}
14621401

1463-
private StreamFactory supplyStreamFactory(
1464-
int addressId)
1465-
{
1466-
return streamFactoriesByAddressId.get(addressId);
1467-
}
1468-
1469-
private MessageConsumer supplyThrottle(
1470-
long streamId)
1471-
{
1472-
final int instanceId = instanceId(streamId);
1473-
final Int2ObjectHashMap<MessageConsumer> dispatcher = throttles[throttleIndex(streamId)];
1474-
return dispatcher.get(instanceId);
1475-
}
1476-
1477-
private void removeThrottle(
1478-
long streamId)
1479-
{
1480-
final int instanceId = instanceId(streamId);
1481-
final Int2ObjectHashMap<MessageConsumer> dispatcher = throttles[throttleIndex(streamId)];
1482-
dispatcher.remove(instanceId);
1483-
}
1484-
14851402
private long supplyInitialId(
14861403
long routeId)
14871404
{

src/main/java/org/reaktivity/reaktor/internal/agent/NukleusAgent.java

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -24,7 +24,6 @@
2424
import java.util.Map;
2525
import java.util.concurrent.ExecutorService;
2626
import java.util.function.Function;
27-
import java.util.function.Supplier;
2827

2928
import org.agrona.CloseHelper;
3029
import org.agrona.DirectBuffer;
@@ -36,7 +35,6 @@
3635
import org.agrona.concurrent.broadcast.BroadcastTransmitter;
3736
import org.agrona.concurrent.ringbuffer.ManyToOneRingBuffer;
3837
import org.agrona.concurrent.ringbuffer.RingBuffer;
39-
import org.reaktivity.nukleus.AgentBuilder;
4038
import org.reaktivity.nukleus.Nukleus;
4139
import org.reaktivity.nukleus.function.CommandHandler;
4240
import org.reaktivity.nukleus.function.MessagePredicate;
@@ -69,7 +67,6 @@ public class NukleusAgent implements Agent
6967
private final FrozenFW.Builder frozenRW = new FrozenFW.Builder();
7068

7169
private final ReaktorConfiguration config;
72-
private final Supplier<AgentBuilder> supplyAgentBuilder;
7370
private final LabelManager labels;
7471
private final List<ElektronAgent> elektronAgents;
7572
private final Map<String, Nukleus> nukleiByName;
@@ -82,11 +79,9 @@ public class NukleusAgent implements Agent
8279
private final MessageHandler commandHandler;
8380

8481
public NukleusAgent(
85-
ReaktorConfiguration config,
86-
Supplier<AgentBuilder> supplyAgentBuilder)
82+
ReaktorConfiguration config)
8783
{
8884
this.config = config;
89-
this.supplyAgentBuilder = supplyAgentBuilder;
9085
this.labels = new LabelManager(config.directory());
9186
this.elektronAgents = new ArrayList<>();
9287
this.nukleiByName = new HashMap<>();
@@ -152,7 +147,7 @@ public ElektronAgent supplyElektronAgent(
152147
Function<String, BitSet> affinityMask)
153148
{
154149
ElektronAgent newElektronAgent = new ElektronAgent(index, count, config, labels, executor, affinityMask,
155-
router::readonlyRoutesBuffer, supplyAgentBuilder);
150+
router::readonlyRoutesBuffer);
156151
elektronAgents.add(newElektronAgent);
157152
return newElektronAgent;
158153
}

0 commit comments

Comments
 (0)