Skip to content

Latest commit

 

History

History
105 lines (77 loc) · 3.64 KB

File metadata and controls

105 lines (77 loc) · 3.64 KB

using

Module:

public class ServiceBusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        var serviceBusClient = builder.RegisterMyServiceBusTcpClient(Program.ReloadedSettings(e => e.SpotServiceBusHostPort), ApplicationEnvironment.HostName, Program.LogFactory);

        var queryName = "Liquidity-Reports";


        // publisher (IServiceBusPublisher<PortfolioTrade>)
        builder.RegisterMyServiceBusPublisher<PortfolioTrade>(serviceBusClient, PortfolioTrade.TopicName, false);


        // batch subscriber (ISubscriber<IReadOnlyList<PortfolioTrade>>)
        builder.RegisterMyServiceBusSubscriberBatch<PortfolioTrade>(serviceBusClient, PortfolioTrade.TopicName, queryName, TopicQueueType.PermanentWithSingleConnection);


        // single subscriber (ISubscriber<PortfolioTrade>)
        builder.RegisterMyServiceBusSubscriberSingle<PortfolioTrade>(serviceBusClient, PortfolioPosition.TopicName, queryName, TopicQueueType.PermanentWithSingleConnection);
    }
}

DeserializeExceptionHandler

In case it you want to skip message with deserialize exception then you can use global handler to log and skip message. But be careful to use it, in this case you can miss important messages from topic in case if happe breking change in the message model and you forgot to update client service.

// call in sope plase static method to activete globally DeserializeExceptionHandler

MyServiceBusGlobalEventHandler.SetLogAndSkipDeserializeExceptionHandler(Program.LogFactory);

LifeTime:

public class ApplicationLifetimeManager : ApplicationLifetimeManagerBase
{
    private readonly ServiceBusLifeTime _myServiceBusLifeTime;

    public ApplicationLifetimeManager(IHostApplicationLifetime appLifetime, ServiceBusLifeTime myServiceBusLifeTime)
        : base(appLifetime)
    {
        _myServiceBusLifeTime = myServiceBusLifeTime;
    }

    protected override void OnStarted()
    {
        _myServiceBusLifeTime.Start();
    }

    protected override void OnStopping()
    {
        _myServiceBusLifeTime.Stop();
    }
}

Model:

[DataContract]
public class PortfolioTrade
{
    public const string TopicName = "spot-liquidity-engine-trade";

    [DataMember(Order = 1)] public string TradeId { get; set; }
    [DataMember(Order = 2)] public string Source { get; set; }
    [DataMember(Order = 3)] public bool IsInternal { get; set; }
}

Deduplication:

public class ServiceBusModule : Module
{
    protected override void Load(ContainerBuilder builder)
    {
        var serviceBusClient = builder.RegisterMyServiceBusTcpClient(() => Program.Settings.SpotServiceBusHostPort, ApplicationEnvironment.HostName, Program.LogFactory);

        var queryName = "Liquidity-Reports";

        var deduplicator = builder.RegisterMyServiceBusDeduplicator<PortfolioTrade>(
                    t => t.TraceId,                                             //Func to get unique id
                    Program.ReloadedSettings(t=>t.MyNoSqlWriterUrl),            
                    queryName,                                                  //NoSql table name
                    PortfolioTrade.TopicName,                                   //NoSql partition key
                    TimeSpan.FromHours(4),                                      //Expiration time
                    Program.LogFactory);


        // single subscriber (ISubscriber<PortfolioTrade>)
        // dedupication only available for single subscriber
        builder.RegisterMyServiceBusSubscriberSingle<PortfolioTrade>(serviceBusClient, PortfolioPosition.TopicName, queryName, TopicQueueType.Permanent, deduplicator);
    }
}