From 1641a4c63dafed2f038d73dd0937ceaf64d5b7ba Mon Sep 17 00:00:00 2001 From: "DESKTOP-O1U24RQ\\Kevin_Yu" Date: Thu, 14 Jul 2022 15:25:18 +0800 Subject: [PATCH] add test --- SchedulingPractice.sln | 7 + SubWorker.KevinUDemo/Program.cs | 160 ++++++++++++++++++ SubWorker.KevinUDemo/ScheduleQueue.cs | 122 +++++++++++++ .../SubWorker.KevinUDemo.csproj | 16 ++ 4 files changed, 305 insertions(+) create mode 100644 SubWorker.KevinUDemo/Program.cs create mode 100644 SubWorker.KevinUDemo/ScheduleQueue.cs create mode 100644 SubWorker.KevinUDemo/SubWorker.KevinUDemo.csproj diff --git a/SchedulingPractice.sln b/SchedulingPractice.sln index 10fa63e..6dbde01 100644 --- a/SchedulingPractice.sln +++ b/SchedulingPractice.sln @@ -36,6 +36,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubWorker.AndyDemo", "SubWo EndProject Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SchedulingPractice.SubWorkerRunner", "SchedulingPractice.SubWorkerRunner\SchedulingPractice.SubWorkerRunner.csproj", "{5D349423-56A0-4E03-A59F-7D1AF95B78E7}" EndProject +Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubWorker.KevinUDemo", "SubWorker.KevinUDemo\SubWorker.KevinUDemo.csproj", "{CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C}" +EndProject Global GlobalSection(SolutionConfigurationPlatforms) = preSolution Debug|Any CPU = Debug|Any CPU @@ -86,6 +88,10 @@ Global {5D349423-56A0-4E03-A59F-7D1AF95B78E7}.Debug|Any CPU.Build.0 = Debug|Any CPU {5D349423-56A0-4E03-A59F-7D1AF95B78E7}.Release|Any CPU.ActiveCfg = Release|Any CPU {5D349423-56A0-4E03-A59F-7D1AF95B78E7}.Release|Any CPU.Build.0 = Release|Any CPU + {CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU + {CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C}.Debug|Any CPU.Build.0 = Debug|Any CPU + {CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C}.Release|Any CPU.ActiveCfg = Release|Any CPU + {CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C}.Release|Any CPU.Build.0 = Release|Any CPU EndGlobalSection GlobalSection(SolutionProperties) = preSolution HideSolutionNode = FALSE @@ -99,6 +105,7 @@ Global {7D80EF21-E6C8-46EC-8179-67D328B15335} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B} {32F4B1E1-4A5B-4695-9BCD-1A76F8566420} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B} {29711E67-24F9-4736-AAC8-BBE6831A4DEF} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B} + {CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B} EndGlobalSection GlobalSection(ExtensibilityGlobals) = postSolution SolutionGuid = {43B102A2-F68E-4926-9EAA-6763FDDEF0B4} diff --git a/SubWorker.KevinUDemo/Program.cs b/SubWorker.KevinUDemo/Program.cs new file mode 100644 index 0000000..c149e93 --- /dev/null +++ b/SubWorker.KevinUDemo/Program.cs @@ -0,0 +1,160 @@ +using System; +using System.Collections.Generic; +using System.Diagnostics; +using System.Linq; +using System.Linq.Expressions; +using System.Threading; +using System.Threading.Tasks; +using Microsoft.Extensions.DependencyInjection; +using Microsoft.Extensions.Hosting; +using SchedulingPractice.Core; + +namespace SubWorker.KevinUDemo +{/* 5 instances + Jobs Scheduling Metrics: + +--(action count)---------------------------------------------- +- CREATE: 873 +- ACQUIRE_SUCCESS: 873 +- ACQUIRE_FAILURE: 739 +- COMPLETE: 873 +- QUERYJOB: 4375 +- QUERYLIST: 90 + +--(state count)---------------------------------------------- +- COUNT(CREATE): 0 +- COUNT(LOCK): 0 +- COUNT(COMPLETE): 873 + +--(statistics)---------------------------------------------- +- DELAY(Average): 160 +- DELAY(Stdev): 171.2532911466342 + +--(test result)---------------------------------------------- +- Complete Job: True, 873 / 873 +- Delay Too Long: 0 +- Fail Job: True, 0 + +--(benchmark score)---------------------------------------------- +- Exec Cost Score: 20765 (querylist x 100 + acquire-failure x 10 + queryjob x 1) +- Efficient Score: 331.25 (average + stdev) + */ + //using IHostedService instead of BackgroundService for the practice + public class ScheduleEngine : IHostedService + { + static Lazy> _que = new (); + + private void ProcessingWork(int id) + { + using (JobsRepo repo = new JobsRepo()) + { + var nowInfo = repo.GetJob(id); + + if (nowInfo.State == 0) + { + if (repo.AcquireJobLock(id)) + { + repo.ProcessLockedJob(nowInfo.Id); + Console.WriteLine( + $"[T: {Thread.CurrentThread.ManagedThreadId}] process job({nowInfo.Id}) with delay {(DateTime.Now - nowInfo.RunAt).TotalMilliseconds} msec..."); + + } + } + } + } + + public async Task StartAsync(CancellationToken cancellationToken) + { + Console.WriteLine("Application is running...."); + await Task.Delay(3000); + Random rnd = new Random(); + Stopwatch timer = new Stopwatch(); + using (JobsRepo repo = new JobsRepo()) + { + while (true) + { + Console.WriteLine($"[T: {Thread.CurrentThread.ManagedThreadId}] fetch available jobs from repository..."); + + foreach (var job in repo.GetReadyJobs(JobSettings.MinPrepareTime)) + { + var now = DateTime.Now; + int predict_time = rnd.Next(300, 1700); + if (job.RunAt - DateTime.Now > TimeSpan.FromMilliseconds(predict_time)) // 等到約一秒前,可以被取消。一秒內就先 LOCK + { + try + { + await Task.Delay(job.RunAt - DateTime.Now - TimeSpan.FromMilliseconds(predict_time), cancellationToken); + } + catch { } + } + if (repo.GetJob(job.Id).State != 0) continue; + // if (repo.AcquireJobLock(job.Id) == false) continue; + if (DateTime.Now < job.RunAt) await Task.Delay(job.RunAt - DateTime.Now); + _que.Value.Processing = ((info) => + { + ProcessingWork(info.Id); + }); + await _que.Value.Enqueue(job); + + + } + try + { + await Task.Delay( + (int)Math.Max((JobSettings.MinPrepareTime - timer.Elapsed).TotalMilliseconds, 0), + cancellationToken); + } + catch + { + await StopAsync(cancellationToken); + } + } + + } + + } + + public Task StopAsync(CancellationToken cancellationToken) + { + _que.Value.Dispose(); + return Task.CompletedTask; + } + } + class Program + { + static async Task Main(string[] args) + { + Console.WriteLine("Starting Application...."); + try + { + await StartUp(); + } + catch (Exception e) + { + await _host.StopAsync(); + } + } + private static IHost _host; + public static async Task StartUp() + { + _host = Host.CreateDefaultBuilder() + .ConfigureServices((hostBuilderContext, serviceCollection) => + { + //di,injection engines or services + serviceCollection.AddHostedService(); + + }) + .ConfigureLogging((hostContext, loggingBuilder) => + { + + + }) + .Build(); + + await _host.RunAsync(); + + + } + } +} + diff --git a/SubWorker.KevinUDemo/ScheduleQueue.cs b/SubWorker.KevinUDemo/ScheduleQueue.cs new file mode 100644 index 0000000..5426831 --- /dev/null +++ b/SubWorker.KevinUDemo/ScheduleQueue.cs @@ -0,0 +1,122 @@ +using System; +using System.Collections.Concurrent; +using System.Collections.Generic; +using System.Linq; +using System.Linq.Expressions; +using System.Text; +using System.Threading; +using System.Threading.Tasks; + +namespace SubWorker.KevinUDemo +{ + + public class ScheduleQueue : IDisposable + { + public Action Processing; + private ConcurrentDictionary _infos = new(); + private ConcurrentQueue _queue = new(); + + private int _sn = 0; + private ManualResetEvent _reset = new ManualResetEvent(false); + public bool Set() + { + try + { + _isWaiting = false; + return _reset.Set(); + } + catch { } + return false; + } + public bool WaitOne(TimeSpan timeout) + { + + try + { + _reset.Reset(); + _isWaiting = true; + return _reset.WaitOne(timeout); + } + catch { } + return false; + } + bool isdisposed = false; + object isdisposedLock = new object(); + public void End() + { + if (isdisposed) return; + lock (isdisposedLock) + { + if (isdisposed) return; + isdisposed = true; + } + + Set(); + _reset.Dispose(); + } + public bool IsWaiting => _isWaiting; + private bool _isWaiting = false; + + //將待處裡任務enqueue + //用.net 自己調度的task + public Task Enqueue(T thisJob) + { + _queue.Enqueue(thisJob); + if (IsWaiting) + { + Set(); + return Task.CompletedTask; + } + if (_infos.Count > 8) + { + return Task.CompletedTask; + } + if (_queue.Count > 0) + { + var serial = Interlocked.Increment(ref _sn); + + _infos.TryAdd(serial, thisJob); + + new Thread(delegate () + { + while (isdisposed) + { + if (_queue.TryDequeue(out var jobItem)) + { + Processing.Invoke(jobItem); + + } + if (_queue.Count == 0) + { + if (WaitOne(TimeSpan.FromSeconds(30))) continue; + if (_queue.Count == 0) break; + } + } + _infos.TryRemove(serial, out var jobItemRemoved); + End(); + + + }).Start(); + + } + return Task.CompletedTask; + + } + + + public void Dispose() + { + if (isdisposed) return; + lock (isdisposedLock) + { + if (isdisposed) return; + isdisposed = true; + } + + while (_queue.TryDequeue(out var waitToDo)) ; + + } + } + + +} diff --git a/SubWorker.KevinUDemo/SubWorker.KevinUDemo.csproj b/SubWorker.KevinUDemo/SubWorker.KevinUDemo.csproj new file mode 100644 index 0000000..3afd409 --- /dev/null +++ b/SubWorker.KevinUDemo/SubWorker.KevinUDemo.csproj @@ -0,0 +1,16 @@ + + + + Exe + net5.0 + + + + + + + + + + +