Skip to content

Commit

Permalink
Merge pull request #50 from the9ball/feat/commandmode
Browse files Browse the repository at this point in the history
feat: Add CommandMode to Context
  • Loading branch information
neuecc authored Apr 12, 2024
2 parents 34dbd43 + f2163e6 commit bc68d6f
Show file tree
Hide file tree
Showing 9 changed files with 33 additions and 28 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -380,6 +380,7 @@ It can check on `...` drawer on each worker result.
```csharp
public class WorkloadContext
{
public CommandMode CommandMode { get; }
public ExecutionId ExecutionId { get; }
public WorkloadId WorkloadId { get; }
public int WorkloadCount { get; }
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ public DFrameControllerExecutionEngine(ILoggerFactory loggerFactory, IExecutionR
this.eventMessagePublisher = eventMessagePublisher;
}

public bool StartWorkerFlow(string workloadName, int concurrency, long totalRequestCount, int workerLimit, KeyValuePair<string, string?>[] parameters)
public bool StartWorkerFlow(CommandMode commandMode, string workloadName, int concurrency, long totalRequestCount, int workerLimit, KeyValuePair<string, string?>[] parameters)
{
lock (EngineSync)
{
Expand Down Expand Up @@ -136,10 +136,10 @@ public bool StartWorkerFlow(string workloadName, int concurrency, long totalRequ
broadcaster = globalGroup.CreateBroadcasterTo<IWorkerReceiver>(connectionIds);
}

broadcaster.CreateWorkloadAndSetup(executionId, createWorkloadCount, concurrency, totalRequestCount, workloadName, parameters!);
broadcaster.CreateWorkloadAndSetup(commandMode, executionId, createWorkloadCount, concurrency, totalRequestCount, workloadName, parameters!);
StateChanged?.Invoke();
}

}

eventMessagePublisher.Publish(new(EventMessage.ControllerEventMessageType.WorkflowStarted, LatestExecutionSummary));

return true;
Expand Down
10 changes: 9 additions & 1 deletion src/DFrame.Controller/HubDefinitions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ public interface IControllerHub : IStreamingHub<IControllerHub, IWorkerReceiver>

public interface IWorkerReceiver
{
void CreateWorkloadAndSetup(ExecutionId executionId, int createCount, int concurrency, long totalRequestCount, string workloadName, KeyValuePair<string, string>[] parameters);
void CreateWorkloadAndSetup(CommandMode commandMode, ExecutionId executionId, int createCount, int concurrency, long totalRequestCount, string workloadName, KeyValuePair<string, string>[] parameters);
void Execute(long[] executeCount); // exec count per workload
void Stop();
void Teardown();
Expand Down Expand Up @@ -268,4 +268,12 @@ public enum AllowParameterType
DateTime,
String,
}

public enum CommandMode
{
Request,
Repeat,
Duration,
InfiniteLoop
}
}
12 changes: 2 additions & 10 deletions src/DFrame.Controller/Pages/Index.razor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ async Task HandleExecute()

var totalRequest = (vm.CommandMode == CommandMode.InfiniteLoop || vm.CommandMode == CommandMode.Duration) ? long.MaxValue : vm.TotalRequest;

var okToStart = engine.StartWorkerFlow(vm.SelectedWorkload, vm.Concurrency, totalRequest, vm.RequestWorkerLimit, parameters!);
var okToStart = engine.StartWorkerFlow(vm.CommandMode, vm.SelectedWorkload, vm.Concurrency, totalRequest, vm.RequestWorkerLimit, parameters!);
if (!okToStart)
{
logger.LogInformation("Invalid parameters, does not run workflow.");
Expand Down Expand Up @@ -123,7 +123,7 @@ void WatchStateChangedForRepeat()
{
if (state.TryMoveNextRepeat())
{
var okToStart = engine.StartWorkerFlow(state.Workload, state.Concurrency, state.TotalRequest, state.WorkerLimit, state.Parameters!);
var okToStart = engine.StartWorkerFlow(CommandMode.Repeat, state.Workload, state.Concurrency, state.TotalRequest, state.WorkerLimit, state.Parameters!);
if (okToStart)
{
return;
Expand Down Expand Up @@ -207,14 +207,6 @@ public bool TryMoveNextRepeat()
}
}

public enum CommandMode
{
Request,
Repeat,
Duration,
InfiniteLoop
}

public class IndexViewModel : IDisposable
{
readonly IExecutionResultHistoryProvider historyProvider;
Expand Down
14 changes: 7 additions & 7 deletions src/DFrame.Controller/RestApi.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,14 +16,14 @@ public static void RegisterRestApi(WebApplication app)
// mode
app.MapPost("api/request", (DFrameControllerExecutionEngine engine, [FromBody] RequestBody request) =>
{
StartRequest(engine, request.Workload, request.Concurrency, request.TotalRequest, request.Workerlimit, request.Parameters, out var result);
StartRequest(engine, CommandMode.Request, request.Workload, request.Concurrency, request.TotalRequest, request.Workerlimit, request.Parameters, out var result);
return result;
});

app.MapPost("api/repeat", (DFrameControllerExecutionEngine engine, [FromBody] RepeatBody request) =>
{
var workerLimit = request.Workerlimit ?? engine.CurrentConnectingCount;
var ok = StartRequest(engine, request.Workload, request.Concurrency, request.TotalRequest, workerLimit, request.Parameters, out var result);
var ok = StartRequest(engine, CommandMode.Repeat, request.Workload, request.Concurrency, request.TotalRequest, workerLimit, request.Parameters, out var result);
if (!ok) return result;

repeatModeState = new Pages.RepeatModeState(request.Workload, request.Concurrency, request.TotalRequest,
Expand All @@ -41,7 +41,7 @@ public static void RegisterRestApi(WebApplication app)
{
if (!repeatCancellation.IsCancellationRequested && state.TryMoveNextRepeat())
{
var okToStart = engine.StartWorkerFlow(state.Workload, state.Concurrency, state.TotalRequest, state.WorkerLimit, state.Parameters!);
var okToStart = engine.StartWorkerFlow(CommandMode.Repeat, state.Workload, state.Concurrency, state.TotalRequest, state.WorkerLimit, state.Parameters!);
if (okToStart)
{
return;
Expand All @@ -61,7 +61,7 @@ public static void RegisterRestApi(WebApplication app)
app.MapPost("api/duration", (DFrameControllerExecutionEngine engine, [FromBody] DurationBody request) =>
{
var totalRequest = long.MaxValue;
var ok = StartRequest(engine, request.Workload, request.Concurrency, totalRequest, request.Workerlimit, request.Parameters, out var result);
var ok = StartRequest(engine, CommandMode.Duration, request.Workload, request.Concurrency, totalRequest, request.Workerlimit, request.Parameters, out var result);
if (!ok) return result;

durationCancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(request.ExecuteTimeSeconds));
Expand All @@ -76,7 +76,7 @@ public static void RegisterRestApi(WebApplication app)
app.MapPost("api/infinite", (DFrameControllerExecutionEngine engine, [FromBody] InfiniteBody request) =>
{
var totalRequest = long.MaxValue;
StartRequest(engine, request.Workload, request.Concurrency, totalRequest, request.Workerlimit, request.Parameters, out var result);
StartRequest(engine, CommandMode.InfiniteLoop, request.Workload, request.Concurrency, totalRequest, request.Workerlimit, request.Parameters, out var result);
return result;
});

Expand Down Expand Up @@ -124,7 +124,7 @@ public static void RegisterRestApi(WebApplication app)
return new { summary = r.Value.Summary, results = r.Value.Results };
});

static bool StartRequest(DFrameControllerExecutionEngine engine, string workload, int concurrency, long totalRequest, int? workerlimit, Dictionary<string, string?>? parameters, out IResult result)
static bool StartRequest(DFrameControllerExecutionEngine engine, CommandMode commandMode, string workload, int concurrency, long totalRequest, int? workerlimit, Dictionary<string, string?>? parameters, out IResult result)
{
if (engine.IsRunning)
{
Expand All @@ -134,7 +134,7 @@ static bool StartRequest(DFrameControllerExecutionEngine engine, string workload

try
{
var ok = engine.StartWorkerFlow(workload, concurrency, totalRequest, workerlimit ?? engine.CurrentConnectingCount, parameters?.ToArray() ?? Array.Empty<KeyValuePair<string, string?>>());
var ok = engine.StartWorkerFlow(commandMode, workload, concurrency, totalRequest, workerlimit ?? engine.CurrentConnectingCount, parameters?.ToArray() ?? Array.Empty<KeyValuePair<string, string?>>());
if (!ok)
{
result = Results.BadRequest("can not start.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ async Task ConnectAsync(CancellationToken applicationLifeTime)
logger.LogInformation($"Connect completed.");
}

async void IWorkerReceiver.CreateWorkloadAndSetup(ExecutionId executionId, int createCount, int concurrency, long totalRequestCount, string workloadName, KeyValuePair<string, string>[] parameters)
async void IWorkerReceiver.CreateWorkloadAndSetup(CommandMode commandMode, ExecutionId executionId, int createCount, int concurrency, long totalRequestCount, string workloadName, KeyValuePair<string, string>[] parameters)
{
ThreadPoolUtility.SetMinThread(createCount * options.VirtualProcess * 2);

Expand All @@ -277,7 +277,7 @@ async void IWorkerReceiver.CreateWorkloadAndSetup(ExecutionId executionId, int c
for (int i = 0; i < createCount; i++)
{
var workload = description.Activator.Value.Invoke(serviceProvider, description.CrateArgument(parameters));
var t = (new WorkloadContext(executionId, createCount, i, concurrency, totalRequestCount, workloadLifeTime!.Token), (Workload)workload);
var t = (new WorkloadContext(commandMode, executionId, createCount, i, concurrency, totalRequestCount, workloadLifeTime!.Token), (Workload)workload);
workloads.Add(t);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace DFrame
{
public class WorkloadContext
{
public CommandMode CommandMode { get; }
public ExecutionId ExecutionId { get; }
public WorkloadId WorkloadId { get; }
public int WorkloadCount { get; }
Expand All @@ -13,8 +14,9 @@ public class WorkloadContext
public long TotalRequestCount { get; }
public CancellationToken CancellationToken { get; }

public WorkloadContext(ExecutionId executionId, int count, int index, int concurrency, long totalRequestCount, CancellationToken cancellationToken)
public WorkloadContext(CommandMode commandMode, ExecutionId executionId, int count, int index, int concurrency, long totalRequestCount, CancellationToken cancellationToken)
{
this.CommandMode = commandMode;
this.ExecutionId = executionId;
this.WorkloadId = WorkloadId.New();
this.WorkloadCount = count;
Expand Down
4 changes: 2 additions & 2 deletions src/DFrame.Worker/DFrameWorkerApp.cs
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,7 @@ async Task ConnectAsync(CancellationToken applicationLifeTime)
logger.LogInformation($"Connect completed.");
}

async void IWorkerReceiver.CreateWorkloadAndSetup(ExecutionId executionId, int createCount, int concurrency, long totalRequestCount, string workloadName, KeyValuePair<string, string>[] parameters)
async void IWorkerReceiver.CreateWorkloadAndSetup(CommandMode commandMode, ExecutionId executionId, int createCount, int concurrency, long totalRequestCount, string workloadName, KeyValuePair<string, string>[] parameters)
{
ThreadPoolUtility.SetMinThread(createCount * options.VirtualProcess * 2);

Expand All @@ -277,7 +277,7 @@ async void IWorkerReceiver.CreateWorkloadAndSetup(ExecutionId executionId, int c
for (int i = 0; i < createCount; i++)
{
var workload = description.Activator.Value.Invoke(serviceProvider, description.CrateArgument(parameters));
var t = (new WorkloadContext(executionId, createCount, i, concurrency, totalRequestCount, workloadLifeTime!.Token), (Workload)workload);
var t = (new WorkloadContext(commandMode, executionId, createCount, i, concurrency, totalRequestCount, workloadLifeTime!.Token), (Workload)workload);
workloads.Add(t);
}

Expand Down
4 changes: 3 additions & 1 deletion src/DFrame.Worker/WorkloadContext.cs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ namespace DFrame
{
public class WorkloadContext
{
public CommandMode CommandMode { get; }
public ExecutionId ExecutionId { get; }
public WorkloadId WorkloadId { get; }
public int WorkloadCount { get; }
Expand All @@ -13,8 +14,9 @@ public class WorkloadContext
public long TotalRequestCount { get; }
public CancellationToken CancellationToken { get; }

public WorkloadContext(ExecutionId executionId, int count, int index, int concurrency, long totalRequestCount, CancellationToken cancellationToken)
public WorkloadContext(CommandMode commandMode, ExecutionId executionId, int count, int index, int concurrency, long totalRequestCount, CancellationToken cancellationToken)
{
this.CommandMode = commandMode;
this.ExecutionId = executionId;
this.WorkloadId = WorkloadId.New();
this.WorkloadCount = count;
Expand Down

0 comments on commit bc68d6f

Please sign in to comment.