Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 7 additions & 0 deletions SchedulingPractice.sln
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,8 @@ Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubWorker.AndyDemo", "SubWo
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SchedulingPractice.SubWorkerRunner", "SchedulingPractice.SubWorkerRunner\SchedulingPractice.SubWorkerRunner.csproj", "{5D349423-56A0-4E03-A59F-7D1AF95B78E7}"
EndProject
Project("{9A19103F-16F7-4668-BE54-9A1E7A4F7556}") = "SubWorker.KevinUDemo", "SubWorker.KevinUDemo\SubWorker.KevinUDemo.csproj", "{CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C}"
EndProject
Global
GlobalSection(SolutionConfigurationPlatforms) = preSolution
Debug|Any CPU = Debug|Any CPU
Expand Down Expand Up @@ -86,6 +88,10 @@ Global
{5D349423-56A0-4E03-A59F-7D1AF95B78E7}.Debug|Any CPU.Build.0 = Debug|Any CPU
{5D349423-56A0-4E03-A59F-7D1AF95B78E7}.Release|Any CPU.ActiveCfg = Release|Any CPU
{5D349423-56A0-4E03-A59F-7D1AF95B78E7}.Release|Any CPU.Build.0 = Release|Any CPU
{CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C}.Debug|Any CPU.ActiveCfg = Debug|Any CPU
{CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C}.Debug|Any CPU.Build.0 = Debug|Any CPU
{CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C}.Release|Any CPU.ActiveCfg = Release|Any CPU
{CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C}.Release|Any CPU.Build.0 = Release|Any CPU
EndGlobalSection
GlobalSection(SolutionProperties) = preSolution
HideSolutionNode = FALSE
Expand All @@ -99,6 +105,7 @@ Global
{7D80EF21-E6C8-46EC-8179-67D328B15335} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B}
{32F4B1E1-4A5B-4695-9BCD-1A76F8566420} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B}
{29711E67-24F9-4736-AAC8-BBE6831A4DEF} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B}
{CE0F52A5-6DB3-4D5F-8DC5-8ECD8F73771C} = {5A225F28-B864-4306-B0C7-83D9CBDABF5B}
EndGlobalSection
GlobalSection(ExtensibilityGlobals) = postSolution
SolutionGuid = {43B102A2-F68E-4926-9EAA-6763FDDEF0B4}
Expand Down
160 changes: 160 additions & 0 deletions SubWorker.KevinUDemo/Program.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,160 @@
using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Linq;
using System.Linq.Expressions;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Hosting;
using SchedulingPractice.Core;

namespace SubWorker.KevinUDemo
{/* 5 instances
Jobs Scheduling Metrics:

--(action count)----------------------------------------------
- CREATE: 873
- ACQUIRE_SUCCESS: 873
- ACQUIRE_FAILURE: 739
- COMPLETE: 873
- QUERYJOB: 4375
- QUERYLIST: 90

--(state count)----------------------------------------------
- COUNT(CREATE): 0
- COUNT(LOCK): 0
- COUNT(COMPLETE): 873

--(statistics)----------------------------------------------
- DELAY(Average): 160
- DELAY(Stdev): 171.2532911466342

--(test result)----------------------------------------------
- Complete Job: True, 873 / 873
- Delay Too Long: 0
- Fail Job: True, 0

--(benchmark score)----------------------------------------------
- Exec Cost Score: 20765 (querylist x 100 + acquire-failure x 10 + queryjob x 1)
- Efficient Score: 331.25 (average + stdev)
*/
//using IHostedService instead of BackgroundService for the practice
public class ScheduleEngine : IHostedService
{
static Lazy<ScheduleQueue<JobInfo>> _que = new ();

private void ProcessingWork(int id)
{
using (JobsRepo repo = new JobsRepo())
{
var nowInfo = repo.GetJob(id);

if (nowInfo.State == 0)
{
if (repo.AcquireJobLock(id))
{
repo.ProcessLockedJob(nowInfo.Id);
Console.WriteLine(
$"[T: {Thread.CurrentThread.ManagedThreadId}] process job({nowInfo.Id}) with delay {(DateTime.Now - nowInfo.RunAt).TotalMilliseconds} msec...");

}
}
}
}

public async Task StartAsync(CancellationToken cancellationToken)
{
Console.WriteLine("Application is running....");
await Task.Delay(3000);
Random rnd = new Random();
Stopwatch timer = new Stopwatch();
using (JobsRepo repo = new JobsRepo())
{
while (true)
{
Console.WriteLine($"[T: {Thread.CurrentThread.ManagedThreadId}] fetch available jobs from repository...");

foreach (var job in repo.GetReadyJobs(JobSettings.MinPrepareTime))
{
var now = DateTime.Now;
int predict_time = rnd.Next(300, 1700);
if (job.RunAt - DateTime.Now > TimeSpan.FromMilliseconds(predict_time)) // 等到約一秒前,可以被取消。一秒內就先 LOCK
{
try
{
await Task.Delay(job.RunAt - DateTime.Now - TimeSpan.FromMilliseconds(predict_time), cancellationToken);
}
catch { }
}
if (repo.GetJob(job.Id).State != 0) continue;
// if (repo.AcquireJobLock(job.Id) == false) continue;
if (DateTime.Now < job.RunAt) await Task.Delay(job.RunAt - DateTime.Now);
_que.Value.Processing = ((info) =>
{
ProcessingWork(info.Id);
});
await _que.Value.Enqueue(job);


}
try
{
await Task.Delay(
(int)Math.Max((JobSettings.MinPrepareTime - timer.Elapsed).TotalMilliseconds, 0),
cancellationToken);
}
catch
{
await StopAsync(cancellationToken);
}
}

}

}

public Task StopAsync(CancellationToken cancellationToken)
{
_que.Value.Dispose();
return Task.CompletedTask;
}
}
class Program
{
static async Task Main(string[] args)
{
Console.WriteLine("Starting Application....");
try
{
await StartUp();
}
catch (Exception e)
{
await _host.StopAsync();
}
}
private static IHost _host;
public static async Task StartUp()
{
_host = Host.CreateDefaultBuilder()
.ConfigureServices((hostBuilderContext, serviceCollection) =>
{
//di,injection engines or services
serviceCollection.AddHostedService<ScheduleEngine>();

})
.ConfigureLogging((hostContext, loggingBuilder) =>
{


})
.Build();

await _host.RunAsync();


}
}
}

122 changes: 122 additions & 0 deletions SubWorker.KevinUDemo/ScheduleQueue.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Linq.Expressions;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

namespace SubWorker.KevinUDemo
{

public class ScheduleQueue<T> : IDisposable
{
public Action<T> Processing;
private ConcurrentDictionary<int, T> _infos = new();
private ConcurrentQueue<T> _queue = new();

private int _sn = 0;
private ManualResetEvent _reset = new ManualResetEvent(false);
public bool Set()
{
try
{
_isWaiting = false;
return _reset.Set();
}
catch { }
return false;
}
public bool WaitOne(TimeSpan timeout)
{

try
{
_reset.Reset();
_isWaiting = true;
return _reset.WaitOne(timeout);
}
catch { }
return false;
}
bool isdisposed = false;
object isdisposedLock = new object();
public void End()
{
if (isdisposed) return;
lock (isdisposedLock)
{
if (isdisposed) return;
isdisposed = true;
}

Set();
_reset.Dispose();
}
public bool IsWaiting => _isWaiting;
private bool _isWaiting = false;

//將待處裡任務enqueue
//用.net 自己調度的task
public Task Enqueue(T thisJob)
{
_queue.Enqueue(thisJob);
if (IsWaiting)
{
Set();
return Task.CompletedTask;
}
if (_infos.Count > 8)
{
return Task.CompletedTask;
}
if (_queue.Count > 0)
{
var serial = Interlocked.Increment(ref _sn);

_infos.TryAdd(serial, thisJob);

new Thread(delegate ()
{
while (isdisposed)
{
if (_queue.TryDequeue(out var jobItem))
{
Processing.Invoke(jobItem);

}
if (_queue.Count == 0)
{
if (WaitOne(TimeSpan.FromSeconds(30))) continue;
if (_queue.Count == 0) break;
}
}
_infos.TryRemove(serial, out var jobItemRemoved);
End();


}).Start();

}
return Task.CompletedTask;

}


public void Dispose()
{
if (isdisposed) return;
lock (isdisposedLock)
{
if (isdisposed) return;
isdisposed = true;
}

while (_queue.TryDequeue(out var waitToDo)) ;

}
}


}
16 changes: 16 additions & 0 deletions SubWorker.KevinUDemo/SubWorker.KevinUDemo.csproj
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Exe</OutputType>
<TargetFramework>net5.0</TargetFramework>
</PropertyGroup>

<ItemGroup>
<PackageReference Include="Microsoft.Extensions.Hosting" Version="5.0.0" />
</ItemGroup>

<ItemGroup>
<ProjectReference Include="..\SchedulingPractice.Core\SchedulingPractice.Core.csproj" />
</ItemGroup>

</Project>