From 8ea7e498e01a50c623c9f5df3a1281fccee10f3c Mon Sep 17 00:00:00 2001 From: ChachaLin Date: Wed, 7 Sep 2022 20:04:09 +0800 Subject: [PATCH 1/9] init project and set project dependency --- SchedulingPractice.sln | 7 ++++++ .../ChachaSubWorkerBackgroundService.cs | 12 +++++++++ SubWorker.ChachaDemo/Program.cs | 25 +++++++++++++++++++ .../SubWorker.ChachaDemo.csproj | 19 ++++++++++++++ 4 files changed, 63 insertions(+) create mode 100644 SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs create mode 100644 SubWorker.ChachaDemo/Program.cs create mode 100644 SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj diff --git a/SchedulingPractice.sln b/SchedulingPractice.sln index 10fa63e..6ec9239 100644 --- a/SchedulingPractice.sln +++ b/SchedulingPractice.sln @@ -36,6 +36,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubWorker.AndyDemo", "SubWo EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SchedulingPractice.SubWorkerRunner", "SchedulingPractice.SubWorkerRunner\SchedulingPractice.SubWorkerRunner.csproj", "{5D349423-56A0-4E03-A59F-7D1AF95B78E7}" EndProject +Project("{FAE04EC0-301F-11D3-BF4B-00C04F79EFBC}") = "SubWorker.ChachaDemo", "SubWorker.ChachaDemo\SubWorker.ChachaDemo.csproj", "{67ECD76F-8410-436C-AF45-B783EDB7D59E}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -86,6 +88,10 @@ Global {5D349423-56A0-4E03-A59F-7D1AF95B78E7}.Debug|Any CPU.Build.0 = Debug|Any CPU {5D349423-56A0-4E03-A59F-7D1AF95B78E7}.Release|Any CPU.ActiveCfg = Release|Any CPU {5D349423-56A0-4E03-A59F-7D1AF95B78E7}.Release|Any CPU.Build.0 = Release|Any CPU + {67ECD76F-8410-436C-AF45-B783EDB7D59E}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {67ECD76F-8410-436C-AF45-B783EDB7D59E}.Debug|Any CPU.Build.0 = Debug|Any CPU + {67ECD76F-8410-436C-AF45-B783EDB7D59E}.Release|Any CPU.ActiveCfg = Release|Any CPU + {67ECD76F-8410-436C-AF45-B783EDB7D59E}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -99,6 +105,7 @@ Global {7D80EF21-E6C8-46EC-8179-67D328B15335} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B} {32F4B1E1-4A5B-4695-9BCD-1A76F8566420} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B} {29711E67-24F9-4736-AAC8-BBE6831A4DEF} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B} + {67ECD76F-8410-436C-AF45-B783EDB7D59E} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {43B102A2-F68E-4926-9EAA-6763FDDEF0B4} diff --git a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs new file mode 100644 index 0000000..4a3e4f6 --- /dev/null +++ b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs @@ -0,0 +1,12 @@ +using Microsoft.Extensions.Hosting; + +namespace SubWorker.ChachaDemo +{ + public class ChachaSubWorkerBackgroundService : BackgroundService + { + protected override Task ExecuteAsync(CancellationToken stoppingToken) + { + throw new NotImplementedException(); + } + } +} \ No newline at end of file diff --git a/SubWorker.ChachaDemo/Program.cs b/SubWorker.ChachaDemo/Program.cs new file mode 100644 index 0000000..4df1773 --- /dev/null +++ b/SubWorker.ChachaDemo/Program.cs @@ -0,0 +1,25 @@ +using Microsoft.Extensions.Hosting; +using Microsoft.Extensions.DependencyInjection; + + +namespace SubWorker.ChachaDemo +{ + class Program + { + static void Main(string[] args) + { + var host = new HostBuilder() + .ConfigureServices((context, services) => + { + //services.AddHostedService(); + services.AddHostedService(); + }) + .Build(); + using (host) + { + host.Start(); + host.WaitForShutdown(); + } + } + } +} \ No newline at end of file diff --git a/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj b/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj new file mode 100644 index 0000000..7367ed4 --- /dev/null +++ b/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj @@ -0,0 +1,19 @@ + + + + Exe + netcoreapp2.2 + enable + enable + 7.3 + + + + + + + + + + + From 231b4a2e8cb54613a37e1a90daf90d67280192c0 Mon Sep 17 00:00:00 2001 From: ChachaLin Date: Thu, 15 Sep 2022 09:00:32 +0800 Subject: [PATCH 2/9] remove net 6 feature, set target version is net 2.2 --- SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs | 3 +++ SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj | 2 -- 2 files changed, 3 insertions(+), 2 deletions(-) diff --git a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs index 4a3e4f6..008d515 100644 --- a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs +++ b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs @@ -1,3 +1,6 @@ +using System; +using System.Threading; +using System.Threading.Tasks; using Microsoft.Extensions.Hosting; namespace SubWorker.ChachaDemo diff --git a/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj b/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj index 7367ed4..af10c6e 100644 --- a/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj +++ b/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj @@ -3,8 +3,6 @@ Exe netcoreapp2.2 - enable - enable 7.3 From e5612f0b904481dae5476b8017f50c2eb61e8239 Mon Sep 17 00:00:00 2001 From: ChachaLin Date: Fri, 16 Sep 2022 13:11:31 +0800 Subject: [PATCH 3/9] first version --- .../ChachaSubWorkerBackgroundService.cs | 44 ++++++++++++++++++- .../SubWorker.ChachaDemo.csproj | 5 ++- 2 files changed, 45 insertions(+), 4 deletions(-) diff --git a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs index 008d515..a981486 100644 --- a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs +++ b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs @@ -1,15 +1,55 @@ using System; using System.Threading; +using System.Threading.Channels; using System.Threading.Tasks; using Microsoft.Extensions.Hosting; +using SchedulingPractice.Core; namespace SubWorker.ChachaDemo { public class ChachaSubWorkerBackgroundService : BackgroundService { - protected override Task ExecuteAsync(CancellationToken stoppingToken) + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - throw new NotImplementedException(); + var channels = Channel.CreateUnbounded(); + try + { + var getJob = Task.Run(() => { GetJob(channels, stoppingToken); }, stoppingToken).ConfigureAwait(false); + await foreach (var item in channels.Reader.ReadAllAsync(stoppingToken)) + { + await ProcessJob(item); + } + await getJob; + } + + catch + { + // Task.WaitAny(GetJob(channels, stoppingToken), Task.Delay(JobSettings.MinPrepareTime)); + Console.WriteLine("Shut down..."); + } + } + + private void GetJob(Channel channel, CancellationToken cts) + { + using var jobsRepo = new JobsRepo(); + while (!cts.IsCancellationRequested) + { + var jobs = jobsRepo.GetReadyJobs(TimeSpan.FromSeconds(10)); + foreach (var job in jobs) + { + channel.Writer.TryWrite(job); + } + } + } + + private async Task ProcessJob(JobInfo jobInfo) + { + using var jobsRepo = new JobsRepo(); + var now = DateTime.Now; + if (jobInfo.RunAt > now) + await Task.Delay(jobInfo.RunAt.Subtract(now)); + if (jobsRepo.AcquireJobLock(jobInfo.Id)) + jobsRepo.ProcessLockedJob(jobInfo.Id); } } } \ No newline at end of file diff --git a/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj b/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj index af10c6e..7115a9e 100644 --- a/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj +++ b/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj @@ -2,12 +2,13 @@ Exe - netcoreapp2.2 - 7.3 + net6.0 + latest + From bbbdf299caae3274c10c2de1a0818e5f597e17bf Mon Sep 17 00:00:00 2001 From: ChachaLin Date: Fri, 16 Sep 2022 21:36:03 +0800 Subject: [PATCH 4/9] modify channel count, add more channel for doing job --- .../ChachaSubWorkerBackgroundService.cs | 58 ++++++++++++++----- 1 file changed, 43 insertions(+), 15 deletions(-) diff --git a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs index a981486..7c9dbf8 100644 --- a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs +++ b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs @@ -1,4 +1,5 @@ using System; +using System.Linq; using System.Threading; using System.Threading.Channels; using System.Threading.Tasks; @@ -9,45 +10,72 @@ namespace SubWorker.ChachaDemo { public class ChachaSubWorkerBackgroundService : BackgroundService { + private const int ChannelCount = 5; + private Channel[] channels; + + public ChachaSubWorkerBackgroundService() + { + channels = new Channel[ChannelCount]; + for (var i = 0; i < ChannelCount; i++) + { + channels[i] = Channel.CreateUnbounded(); + } + } + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - var channels = Channel.CreateUnbounded(); try { - var getJob = Task.Run(() => { GetJob(channels, stoppingToken); }, stoppingToken).ConfigureAwait(false); - await foreach (var item in channels.Reader.ReadAllAsync(stoppingToken)) + var getJob = Task.Run(async () => { await GetJob(stoppingToken); }, stoppingToken); + Parallel.For(0, channels.Length, new ParallelOptions() { - await ProcessJob(item); - } - await getJob; + MaxDegreeOfParallelism = Environment.ProcessorCount, + CancellationToken = stoppingToken + }, async i => + { + try + { + await foreach (var item in channels[i].Reader.ReadAllAsync(stoppingToken)) + { + await ProcessJob(item, stoppingToken); + } + } + catch + { + Console.WriteLine("shut down"); + } + }); + + await Task.WhenAny(getJob); } catch { - // Task.WaitAny(GetJob(channels, stoppingToken), Task.Delay(JobSettings.MinPrepareTime)); - Console.WriteLine("Shut down..."); + Console.WriteLine("shut down"); } } - private void GetJob(Channel channel, CancellationToken cts) + private async Task GetJob(CancellationToken cts) { - using var jobsRepo = new JobsRepo(); + using var jobRepo = new JobsRepo(); while (!cts.IsCancellationRequested) { - var jobs = jobsRepo.GetReadyJobs(TimeSpan.FromSeconds(10)); - foreach (var job in jobs) + await Task.Delay(JobSettings.MinPrepareTime, cts); + var jobs = jobRepo.GetReadyJobs().ToList(); + for (var i = 0; i < jobs.Count; i++) { - channel.Writer.TryWrite(job); + var index = i % ChannelCount; + await channels[index].Writer.WriteAsync(jobs[i], cts); } } } - private async Task ProcessJob(JobInfo jobInfo) + private async Task ProcessJob(JobInfo jobInfo, CancellationToken cts) { using var jobsRepo = new JobsRepo(); var now = DateTime.Now; if (jobInfo.RunAt > now) - await Task.Delay(jobInfo.RunAt.Subtract(now)); + await Task.Delay(jobInfo.RunAt.Subtract(now), cts); if (jobsRepo.AcquireJobLock(jobInfo.Id)) jobsRepo.ProcessLockedJob(jobInfo.Id); } From 79c3cacaf42bb88b2283cf32ba64d9b52730a7d5 Mon Sep 17 00:00:00 2001 From: ChachaLin Date: Sat, 17 Sep 2022 04:51:18 +0800 Subject: [PATCH 5/9] bug fix, move getjobs delay. --- .../ChachaSubWorkerBackgroundService.cs | 48 +++++++------------ 1 file changed, 18 insertions(+), 30 deletions(-) diff --git a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs index 7c9dbf8..7fc2720 100644 --- a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs +++ b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs @@ -13,35 +13,26 @@ public class ChachaSubWorkerBackgroundService : BackgroundService private const int ChannelCount = 5; private Channel[] channels; - public ChachaSubWorkerBackgroundService() - { + public ChachaSubWorkerBackgroundService() { channels = new Channel[ChannelCount]; - for (var i = 0; i < ChannelCount; i++) - { + for (var i = 0; i < ChannelCount; i++) { channels[i] = Channel.CreateUnbounded(); } } - protected override async Task ExecuteAsync(CancellationToken stoppingToken) - { - try - { + protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + try { var getJob = Task.Run(async () => { await GetJob(stoppingToken); }, stoppingToken); - Parallel.For(0, channels.Length, new ParallelOptions() - { + Parallel.For(0, channels.Length, new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = stoppingToken - }, async i => - { - try - { - await foreach (var item in channels[i].Reader.ReadAllAsync(stoppingToken)) - { + }, async i => { + try { + await foreach (var item in channels[i].Reader.ReadAllAsync(stoppingToken)) { await ProcessJob(item, stoppingToken); } } - catch - { + catch { Console.WriteLine("shut down"); } }); @@ -49,33 +40,30 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) await Task.WhenAny(getJob); } - catch - { + catch { Console.WriteLine("shut down"); } } - private async Task GetJob(CancellationToken cts) - { + private async Task GetJob(CancellationToken cts) { using var jobRepo = new JobsRepo(); - while (!cts.IsCancellationRequested) - { - await Task.Delay(JobSettings.MinPrepareTime, cts); - var jobs = jobRepo.GetReadyJobs().ToList(); - for (var i = 0; i < jobs.Count; i++) - { + while (!cts.IsCancellationRequested) { + var jobs = jobRepo.GetReadyJobs(JobSettings.MinPrepareTime).ToList(); + for (var i = 0; i < jobs.Count; i++) { var index = i % ChannelCount; await channels[index].Writer.WriteAsync(jobs[i], cts); } + + await Task.Delay(JobSettings.MinPrepareTime, cts); } } - private async Task ProcessJob(JobInfo jobInfo, CancellationToken cts) - { + private async Task ProcessJob(JobInfo jobInfo, CancellationToken cts) { using var jobsRepo = new JobsRepo(); var now = DateTime.Now; if (jobInfo.RunAt > now) await Task.Delay(jobInfo.RunAt.Subtract(now), cts); + if (jobsRepo.GetJob(jobInfo.Id).State != 0) return; if (jobsRepo.AcquireJobLock(jobInfo.Id)) jobsRepo.ProcessLockedJob(jobInfo.Id); } From 4803e92d970c3c9f0817abd8857db33ade0c7ebd Mon Sep 17 00:00:00 2001 From: ChachaLin Date: Sat, 17 Sep 2022 08:27:02 +0800 Subject: [PATCH 6/9] move lock job --- .../ChachaSubWorkerBackgroundService.cs | 52 ++++++++++++------- 1 file changed, 33 insertions(+), 19 deletions(-) diff --git a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs index 7fc2720..edf2e91 100644 --- a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs +++ b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs @@ -10,34 +10,33 @@ namespace SubWorker.ChachaDemo { public class ChachaSubWorkerBackgroundService : BackgroundService { - private const int ChannelCount = 5; - private Channel[] channels; + private readonly int _channelCount = Environment.ProcessorCount; + private readonly Channel[] _channels; public ChachaSubWorkerBackgroundService() { - channels = new Channel[ChannelCount]; - for (var i = 0; i < ChannelCount; i++) { - channels[i] = Channel.CreateUnbounded(); + _channels = new Channel[_channelCount]; + for (var i = 0; i < _channelCount; i++) { + _channels[i] = Channel.CreateUnbounded(); } } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { + await Task.Delay(1,stoppingToken); try { - var getJob = Task.Run(async () => { await GetJob(stoppingToken); }, stoppingToken); - Parallel.For(0, channels.Length, new ParallelOptions() { + Task.Run(async () => { await GetJobs(stoppingToken); }, stoppingToken); + Parallel.For(0, _channels.Length, new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = stoppingToken }, async i => { try { - await foreach (var item in channels[i].Reader.ReadAllAsync(stoppingToken)) { - await ProcessJob(item, stoppingToken); + await foreach (var item in _channels[i].Reader.ReadAllAsync(stoppingToken)) { + await ProcessJob(i, item, stoppingToken); } } catch { Console.WriteLine("shut down"); } }); - - await Task.WhenAny(getJob); } catch { @@ -45,27 +44,42 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { } } - private async Task GetJob(CancellationToken cts) { + private async Task GetJobs(CancellationToken cts) { using var jobRepo = new JobsRepo(); while (!cts.IsCancellationRequested) { var jobs = jobRepo.GetReadyJobs(JobSettings.MinPrepareTime).ToList(); for (var i = 0; i < jobs.Count; i++) { - var index = i % ChannelCount; - await channels[index].Writer.WriteAsync(jobs[i], cts); + var index = i % _channelCount; + await _channels[index].Writer.WriteAsync(jobs[i], cts); } await Task.Delay(JobSettings.MinPrepareTime, cts); } } - private async Task ProcessJob(JobInfo jobInfo, CancellationToken cts) { + private async Task ProcessJob(int channelIndex, JobInfo jobInfo, CancellationToken cts) { using var jobsRepo = new JobsRepo(); - var now = DateTime.Now; - if (jobInfo.RunAt > now) - await Task.Delay(jobInfo.RunAt.Subtract(now), cts); if (jobsRepo.GetJob(jobInfo.Id).State != 0) return; - if (jobsRepo.AcquireJobLock(jobInfo.Id)) + if (jobsRepo.AcquireJobLock(jobInfo.Id) == false) return; + var now = DateTime.Now; + if (jobInfo.RunAt > now) { + try { + Console.WriteLine($"Chanel index is {channelIndex}, job id is {jobInfo.Id}"); + await Task.Delay(jobInfo.RunAt.Subtract(now), cts); + } + catch (Exception e) { + Console.WriteLine( + $"Application shout down, early process job, job id is {jobInfo.Id} {e.ToString()}"); + jobsRepo.ProcessLockedJob(jobInfo.Id); + // goto shoutDown(e); + } + } + else { + Console.WriteLine($"Delay {jobInfo.Id}"); jobsRepo.ProcessLockedJob(jobInfo.Id); + } + + jobsRepo.ProcessLockedJob(jobInfo.Id); } } } \ No newline at end of file From 0e8059d39a7c7376db76f06ff98a019ac907c429 Mon Sep 17 00:00:00 2001 From: ChachaLin Date: Sat, 17 Sep 2022 17:11:10 +0800 Subject: [PATCH 7/9] version 1 done --- .../ChachaSubWorkerBackgroundService.cs | 38 +++------- SubWorker.ChachaDemo/README.md | 76 +++++++++++++++++++ 2 files changed, 88 insertions(+), 26 deletions(-) create mode 100644 SubWorker.ChachaDemo/README.md diff --git a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs index edf2e91..a95643d 100644 --- a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs +++ b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs @@ -21,26 +21,21 @@ public ChachaSubWorkerBackgroundService() { } protected override async Task ExecuteAsync(CancellationToken stoppingToken) { - await Task.Delay(1,stoppingToken); + await Task.Delay(1, stoppingToken); try { - Task.Run(async () => { await GetJobs(stoppingToken); }, stoppingToken); + var getJobs = Task.Run(async () => { await GetJobs(stoppingToken); }, stoppingToken); Parallel.For(0, _channels.Length, new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = stoppingToken }, async i => { - try { - await foreach (var item in _channels[i].Reader.ReadAllAsync(stoppingToken)) { - await ProcessJob(i, item, stoppingToken); - } - } - catch { - Console.WriteLine("shut down"); + await foreach (var item in _channels[i].Reader.ReadAllAsync()) { + await ProcessJob(i, item, stoppingToken); } }); + await Task.WhenAny(getJobs, Task.Delay(Timeout.Infinite, stoppingToken)); } - catch { - Console.WriteLine("shut down"); + Console.WriteLine("Application shut down."); } } @@ -59,27 +54,18 @@ private async Task GetJobs(CancellationToken cts) { private async Task ProcessJob(int channelIndex, JobInfo jobInfo, CancellationToken cts) { using var jobsRepo = new JobsRepo(); - if (jobsRepo.GetJob(jobInfo.Id).State != 0) return; - if (jobsRepo.AcquireJobLock(jobInfo.Id) == false) return; var now = DateTime.Now; - if (jobInfo.RunAt > now) { + if (jobInfo.RunAt > now) try { - Console.WriteLine($"Chanel index is {channelIndex}, job id is {jobInfo.Id}"); await Task.Delay(jobInfo.RunAt.Subtract(now), cts); } - catch (Exception e) { - Console.WriteLine( - $"Application shout down, early process job, job id is {jobInfo.Id} {e.ToString()}"); - jobsRepo.ProcessLockedJob(jobInfo.Id); - // goto shoutDown(e); + catch { + Console.WriteLine($"Leave lock job. job id {jobInfo.Id}"); } - } - else { - Console.WriteLine($"Delay {jobInfo.Id}"); - jobsRepo.ProcessLockedJob(jobInfo.Id); - } - jobsRepo.ProcessLockedJob(jobInfo.Id); + if (jobsRepo.GetJob(jobInfo.Id).State != 0) return; + if (jobsRepo.AcquireJobLock(jobInfo.Id)) + jobsRepo.ProcessLockedJob(jobInfo.Id); } } } \ No newline at end of file diff --git a/SubWorker.ChachaDemo/README.md b/SubWorker.ChachaDemo/README.md new file mode 100644 index 0000000..aa98941 --- /dev/null +++ b/SubWorker.ChachaDemo/README.md @@ -0,0 +1,76 @@ +## Version 1.0 +Without early lock. + +Environment : + +| Item | Configuration | +|----------|----------------------| +| OS | Windows 11 Pro | + | CPU | AMD Ryzen 3700X | +| RAM | ADATA DDR4 3200 32GB | +| Storage | ADATA 1TB M.2 | +| Database | LocalDB | + +### One Instance + +``` +Jobs Scheduling Metrics: + +--(action count)---------------------------------------------- +- CREATE: 1736 +- ACQUIRE_SUCCESS: 1736 +- ACQUIRE_FAILURE: 0 +- COMPLETE: 1736 +- QUERYJOB: 1737 +- QUERYLIST: 65 + +--(state count)---------------------------------------------- +- COUNT(CREATE): 0 +- COUNT(LOCK): 0 +- COUNT(COMPLETE): 1736 + +--(statistics)---------------------------------------------- +- DELAY(Average): 113 +- DELAY(Stdev): 31.2237044096519 + +--(test result)---------------------------------------------- +- Complete Job: True, 1736 / 1736 +- Delay Too Long: 0 +- Fail Job: True, 0 + +--(benchmark score)---------------------------------------------- +- Exec Cost Score: 8237 (querylist x 100 + acquire-failure x 10 + queryjob x 1) +- Efficient Score: 144.22 (average + stdev) + +``` +### Three Instance +```Jobs Scheduling Metrics: + +--(action count)---------------------------------------------- +- CREATE: 1758 +- ACQUIRE_SUCCESS: 1758 +- ACQUIRE_FAILURE: 2445 +- COMPLETE: 1758 +- QUERYJOB: 5275 +- QUERYLIST: 195 + +--(state count)---------------------------------------------- +- COUNT(CREATE): 0 +- COUNT(LOCK): 0 +- COUNT(COMPLETE): 1758 + +--(statistics)---------------------------------------------- +- DELAY(Average): 101 +- DELAY(Stdev): 6.79669291539491 + +--(test result)---------------------------------------------- +- Complete Job: True, 1758 / 1758 +- Delay Too Long: 0 +- Fail Job: True, 0 + +--(benchmark score)---------------------------------------------- +- Exec Cost Score: 49225 (querylist x 100 + acquire-failure x 10 + queryjob x 1) +- Efficient Score: 107.8 (average + stdev) + +Process finished with exit code 0. +``` From a7009c8a3462abe1c92f8e8de73441c4670bcd4b Mon Sep 17 00:00:00 2001 From: ChachaLin Date: Sat, 17 Sep 2022 19:38:43 +0800 Subject: [PATCH 8/9] modify project setting from net core 6 -> net core 2.2, and remove net 6 feature. --- SchedulingPractice.SubWorkerRunner/Program.cs | 2 + .../SchedulingPractice.SubWorkerRunner.csproj | 1 + .../ChachaSubWorkerBackgroundService.cs | 16 ++- SubWorker.ChachaDemo/README.md | 118 +++++++++++++++++- .../SubWorker.ChachaDemo.csproj | 2 +- 5 files changed, 132 insertions(+), 7 deletions(-) diff --git a/SchedulingPractice.SubWorkerRunner/Program.cs b/SchedulingPractice.SubWorkerRunner/Program.cs index ea5924b..0207645 100644 --- a/SchedulingPractice.SubWorkerRunner/Program.cs +++ b/SchedulingPractice.SubWorkerRunner/Program.cs @@ -50,7 +50,9 @@ static void Main(string[] args) }); } else if (mode == "acetaxxxx") services.AddHostedService(); + else if (mode == "jackie82422") services.AddHostedService(); else { throw new ArgumentOutOfRangeException($"Mode: {mode} not is not valid."); } + }) .Build(); diff --git a/SchedulingPractice.SubWorkerRunner/SchedulingPractice.SubWorkerRunner.csproj b/SchedulingPractice.SubWorkerRunner/SchedulingPractice.SubWorkerRunner.csproj index dfad027..cd1c302 100644 --- a/SchedulingPractice.SubWorkerRunner/SchedulingPractice.SubWorkerRunner.csproj +++ b/SchedulingPractice.SubWorkerRunner/SchedulingPractice.SubWorkerRunner.csproj @@ -14,6 +14,7 @@ + diff --git a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs index a95643d..a57ef1b 100644 --- a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs +++ b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs @@ -24,12 +24,17 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { await Task.Delay(1, stoppingToken); try { var getJobs = Task.Run(async () => { await GetJobs(stoppingToken); }, stoppingToken); - Parallel.For(0, _channels.Length, new ParallelOptions() { + Parallel.For((long)0, _channels.Length, new ParallelOptions() { MaxDegreeOfParallelism = Environment.ProcessorCount, CancellationToken = stoppingToken }, async i => { - await foreach (var item in _channels[i].Reader.ReadAllAsync()) { - await ProcessJob(i, item, stoppingToken); + while (!stoppingToken.IsCancellationRequested) { + try { + var item = await _channels[i].Reader.ReadAsync(stoppingToken); + await ProcessJob(item, stoppingToken); + } + catch { Console.WriteLine($"channel #{i} exit."); + } } }); await Task.WhenAny(getJobs, Task.Delay(Timeout.Infinite, stoppingToken)); @@ -52,7 +57,7 @@ private async Task GetJobs(CancellationToken cts) { } } - private async Task ProcessJob(int channelIndex, JobInfo jobInfo, CancellationToken cts) { + private async Task ProcessJob(JobInfo jobInfo, CancellationToken cts) { using var jobsRepo = new JobsRepo(); var now = DateTime.Now; if (jobInfo.RunAt > now) @@ -60,12 +65,13 @@ private async Task ProcessJob(int channelIndex, JobInfo jobInfo, CancellationTok await Task.Delay(jobInfo.RunAt.Subtract(now), cts); } catch { - Console.WriteLine($"Leave lock job. job id {jobInfo.Id}"); + Console.WriteLine($"X - Leave unlock job. job id {jobInfo.Id}"); } if (jobsRepo.GetJob(jobInfo.Id).State != 0) return; if (jobsRepo.AcquireJobLock(jobInfo.Id)) jobsRepo.ProcessLockedJob(jobInfo.Id); + Console.WriteLine("O"); } } } \ No newline at end of file diff --git a/SubWorker.ChachaDemo/README.md b/SubWorker.ChachaDemo/README.md index aa98941..c1425d8 100644 --- a/SubWorker.ChachaDemo/README.md +++ b/SubWorker.ChachaDemo/README.md @@ -1,4 +1,5 @@ ## Version 1.0 + Without early lock. Environment : @@ -6,10 +7,11 @@ Environment : | Item | Configuration | |----------|----------------------| | OS | Windows 11 Pro | - | CPU | AMD Ryzen 3700X | +| CPU | AMD Ryzen 3700X | | RAM | ADATA DDR4 3200 32GB | | Storage | ADATA 1TB M.2 | | Database | LocalDB | +| Runtime | Net 6 | ### One Instance @@ -43,7 +45,9 @@ Jobs Scheduling Metrics: - Efficient Score: 144.22 (average + stdev) ``` + ### Three Instance + ```Jobs Scheduling Metrics: --(action count)---------------------------------------------- @@ -74,3 +78,115 @@ Jobs Scheduling Metrics: Process finished with exit code 0. ``` +Environment : + +| Item | Configuration | +|----------|----------------------| +| OS | Windows 11 Pro | +| CPU | AMD Ryzen 3700X | +| RAM | ADATA DDR4 3200 32GB | +| Storage | ADATA 1TB M.2 | +| Database | LocalDB | +| Runtime | Net Core 2.2 | + +### One Instance + +``` +Jobs Scheduling Metrics: + +--(action count)---------------------------------------------- +- CREATE: 1749 +- ACQUIRE_SUCCESS: 1749 +- ACQUIRE_FAILURE: 1 +- COMPLETE: 1749 +- QUERYJOB: 1754 +- QUERYLIST: 65 + +--(state count)---------------------------------------------- +- COUNT(CREATE): 0 +- COUNT(LOCK): 0 +- COUNT(COMPLETE): 1749 + +--(statistics)---------------------------------------------- +- DELAY(Average): 113 +- DELAY(Stdev): 30.7915205815142 + +--(test result)---------------------------------------------- +- Complete Job: True, 1749 / 1749 +- Delay Too Long: 0 +- Fail Job: True, 0 + +--(benchmark score)---------------------------------------------- +- Exec Cost Score: 8264 (querylist x 100 + acquire-failure x 10 + queryjob x 1) +- Efficient Score: 143.79 (average + stdev) + +Process finished with exit code 0. +``` + +### Three Instance + +``` +Jobs Scheduling Metrics: + +--(action count)---------------------------------------------- +- CREATE: 1736 +- ACQUIRE_SUCCESS: 1736 +- ACQUIRE_FAILURE: 2767 +- COMPLETE: 1736 +- QUERYJOB: 5209 +- QUERYLIST: 195 + +--(state count)---------------------------------------------- +- COUNT(CREATE): 0 +- COUNT(LOCK): 0 +- COUNT(COMPLETE): 1736 + +--(statistics)---------------------------------------------- +- DELAY(Average): 101 +- DELAY(Stdev): 8.04448627812101 + +--(test result)---------------------------------------------- +- Complete Job: True, 1736 / 1736 +- Delay Too Long: 0 +- Fail Job: True, 0 + +--(benchmark score)---------------------------------------------- +- Exec Cost Score: 52379 (querylist x 100 + acquire-failure x 10 + queryjob x 1) +- Efficient Score: 109.04 (average + stdev) + +Process finished with exit code 0. +``` + +### Five Instances +``` +Jobs Scheduling Metrics: + +--(action count)---------------------------------------------- +- CREATE: 1740 +- ACQUIRE_SUCCESS: 1740 +- ACQUIRE_FAILURE: 5264 +- COMPLETE: 1740 +- QUERYJOB: 8699 +- QUERYLIST: 323 + +--(state count)---------------------------------------------- +- COUNT(CREATE): 0 +- COUNT(LOCK): 0 +- COUNT(COMPLETE): 1740 + +--(statistics)---------------------------------------------- +- DELAY(Average): 102 +- DELAY(Stdev): 8.63974181166169 + +--(test result)---------------------------------------------- +- Complete Job: True, 1740 / 1740 +- Delay Too Long: 0 +- Fail Job: True, 0 + +--(benchmark score)---------------------------------------------- +- Exec Cost Score: 93639 (querylist x 100 + acquire-failure x 10 + queryjob x 1) +- Efficient Score: 110.64 (average + stdev) + +Process finished with exit code 0. + +``` diff --git a/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj b/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj index 7115a9e..47f19cb 100644 --- a/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj +++ b/SubWorker.ChachaDemo/SubWorker.ChachaDemo.csproj @@ -2,7 +2,7 @@ Exe - net6.0 + netcoreapp2.2 latest From 18161fe3dceca4767970bee2dde2246fa6a35cd4 Mon Sep 17 00:00:00 2001 From: ChachaLin Date: Sun, 18 Sep 2022 00:48:55 +0800 Subject: [PATCH 9/9] format --- SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs index a57ef1b..5164bf7 100644 --- a/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs +++ b/SubWorker.ChachaDemo/ChachaSubWorkerBackgroundService.cs @@ -33,7 +33,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken) { var item = await _channels[i].Reader.ReadAsync(stoppingToken); await ProcessJob(item, stoppingToken); } - catch { Console.WriteLine($"channel #{i} exit."); + catch { + Console.WriteLine($"channel #{i} exit."); } } });