Skip to content

Commit

Permalink
fix(process-explorer) - Fixed tests, decreased Timeout, added excepti…
Browse files Browse the repository at this point in the history
…on handling, added ProcessExtensions class for testing
  • Loading branch information
lilla28 committed Apr 9, 2024
1 parent 0bf538e commit 91dc056
Show file tree
Hide file tree
Showing 8 changed files with 142 additions and 77 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,8 @@ public abstract class ProcessInfoMonitor : IProcessInfoMonitor
private readonly ObservableCollection<int> _processIds = new();
private readonly object _processIdsLocker = new();
private readonly Subject<KeyValuePair<int, ProcessStatus>> _processIdsSubject = new();

public IObservable<KeyValuePair<int, ProcessStatus>> ProcessIds => _processIdsSubject;


protected ProcessInfoMonitor(ILogger? logger)
{
_logger = logger ?? NullLogger.Instance;
Expand Down Expand Up @@ -151,23 +149,41 @@ public virtual void WatchProcesses(int mainProcessId)
/// </summary>
/// <param name="processId"></param>
/// <returns></returns>
private bool IsComposeProcess(int processId)
private bool IsComposeProcess(int processId, ReadOnlySpan<int> checkedIds)
{
//snapshot if the process has already exited
if (Process.GetProcessById(processId) == null) return false;
try
{
if (ContainsId(processId)) return true;

var process = Process.GetProcessById(processId);
if (process.Id == 0) return false;

var parentId = GetParentId(processId, process.ProcessName);

if (parentId == null) return false;

if (ContainsId(processId)) return true;
var parentProcessId = Convert.ToInt32(parentId);

var process = Process.GetProcessById(processId);
if (process.Id == 0) return false;
if (checkedIds.Contains(parentProcessId)) return false;

var parentProcessId = GetParentId(processId, process.ProcessName);
if (ContainsId(parentProcessId)) return true;

if (parentProcessId == null || parentProcessId == 0) return false;
var processIds = checkedIds
.ToArray()
.Append(parentProcessId)
.ToArray();

if (ContainsId((int)parentProcessId)) return true;
return IsComposeProcess(parentProcessId, processIds);
}
catch (Exception exception)
{
if (_logger.IsEnabled(LogLevel.Debug))
{
_logger.LogDebug(exception, $"Exception thrown while resolving if process: {processId} or it's parentProcessId is ComposeUI process.");
}

return IsComposeProcess(Convert.ToInt32(parentProcessId));
return false;
}
}

/// <summary>
Expand All @@ -185,7 +201,7 @@ public bool CheckIfIsComposeProcess(int processId)
{
try
{
return IsComposeProcess(processId);
return IsComposeProcess(processId, Array.Empty<int>());
}
catch (Exception exception)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ public Task AddConnections(string assemblyId, IEnumerable<IConnectionInfo> conne
Connections = { connections.Select(conn => conn.DeriveProtoConnectionType()) }
};

return UpdateInfoOnUI(handler => handler.SendMessage(message));
return UpdateInfoOnUI(async handler => await handler.SendMessage(message));
}
catch (Exception exception)
{
Expand Down Expand Up @@ -86,7 +86,7 @@ public Task AddProcess(ProcessInfoData process)
Processes = { list }
};

return UpdateInfoOnUI(handler => handler.SendMessage(message));
return UpdateInfoOnUI(async handler => await handler.SendMessage(message));
}
catch (Exception exception)
{
Expand Down Expand Up @@ -131,7 +131,7 @@ public Task AddProcesses(IEnumerable<ProcessInfoData> processes)
Processes = { processes.Select(proc => proc.DeriveProtoProcessType()) }
};

return UpdateInfoOnUI(handler => handler.SendMessage(message));
return UpdateInfoOnUI(async handler => await handler.SendMessage(message));
}
catch (Exception exception)
{
Expand All @@ -157,7 +157,7 @@ public Task AddRuntimeInfo(string assemblyId, ProcessExplorer.Abstractions.Entit
RuntimeInfo = runtimeInformation.DeriveProtoRuntimeInfoType()
};

return UpdateInfoOnUI(handler => handler.SendMessage(message));
return UpdateInfoOnUI(async handler => await handler.SendMessage(message));
}
catch (Exception exception)
{
Expand All @@ -182,7 +182,7 @@ public Task AddRuntimeInfo(IEnumerable<KeyValuePair<string, MorganStanley.Compos
MultipleRuntimeInfo = { runtimeInfo.DeriveProtoDictionaryType(ProtoConvertHelper.DeriveProtoRuntimeInfoType) }
};

return UpdateInfoOnUI(handler => handler.SendMessage(message));
return UpdateInfoOnUI(async handler => await handler.SendMessage(message));
}
catch (Exception exception)
{
Expand Down Expand Up @@ -506,7 +506,7 @@ private Task UpdateInfoOnUI(Func<IClientConnection<Message>, Task> handlerAction
{
try
{
return Task.WhenAll(CreateCopyOfClients().Select(handlerAction));
return Task.WhenAll(CreateCopyOfClients().Select(async(IClientConnection<Message> connection) => await handlerAction(connection)));
}
catch (Exception exception)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,11 +48,13 @@ public override async Task Subscribe(Empty request, IServerStreamWriter<Message>
await _processInfoAggregator.UiHandler.AddProcesses(_processInfoAggregator.GetProcesses());
await _processInfoAggregator.UiHandler.AddRuntimeInfo(_processInfoAggregator.GetRuntimeInformation());
await _processInfoAggregator.UiHandler.AddSubsystems(
_processInfoAggregator.SubsystemController.GetSubsystems());
_processInfoAggregator.SubsystemController.GetSubsystems());

//wait here until the user is connected to the service
while (!context.CancellationToken.IsCancellationRequested)
{
continue;
}
}
catch (Exception exception)
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,35 +14,43 @@
using System.Diagnostics;
using System.IO;
using System.Threading;
using System.Threading.Tasks;
using Microsoft.Extensions.Logging;
using Moq;
using MorganStanley.ComposeUI.ProcessExplorer.Abstractions.Processes;
using MorganStanley.ComposeUI.ProcessExplorer.Core.Processes;
using MorganStanley.ComposeUI.ProcessExplorer.Core.Tests.Utils;
using Xunit;

namespace MorganStanley.ComposeUI.ProcessExplorer.Core.Tests.Processes;

public class WindowsProcessInfoManagerTests
{
[Fact]
public void AddChildProcesses_will_add_child_processes_to_the_list()
public async Task AddChildProcesses_will_add_child_processes_to_the_list()
{
var loggerMock = CreateLoggerMock();
var processMonitor = CreateWindowsProcessMonitor(loggerMock.Object);
var testApplication = Process.Start(GetTestApplicationPath()); //It will start an another child process
var testApplicationProcess = Process.Start(
new ProcessStartInfo
{
FileName = GetTestApplicationPath(),
RedirectStandardOutput = true,
}); //It will start an another child process

// Waiting for the child process to start
Thread.Sleep(100);
if (testApplicationProcess == null) throw new NullReferenceException(nameof(testApplicationProcess));

processMonitor.AddChildProcesses(testApplication.Id, testApplication.ProcessName);
await testApplicationProcess.WaitForMessageOfChildProcess("Hello world from ProcessExplorerTestApp2!"); //State when the ChildProcess is started

processMonitor.AddChildProcesses(testApplicationProcess.Id, testApplicationProcess.ProcessName);

var result = processMonitor.GetProcessIds().ToArray();

Assert.NotEmpty(result);
Assert.Equal(2, result.Length);
Assert.Contains(testApplication.Id, result);
Assert.Contains(testApplicationProcess.Id, result);

testApplication.Kill();
testApplicationProcess.Kill();
processMonitor.Dispose();
}

Expand Down Expand Up @@ -176,14 +184,23 @@ public void GetParentId_will_return_null()
}

[Fact]
public void SetProcessIds_will_set_the_ids_and_its_child_process_ids()
public async Task SetProcessIds_will_set_the_ids_and_its_child_process_ids()
{
var loggerMock = CreateLoggerMock();
var processMonitor = CreateWindowsProcessMonitor(loggerMock.Object);

var testApplication = Process.Start(GetTestApplicationPath()); //It will start a process
var testApplicationProcess = Process.Start(
new ProcessStartInfo
{
FileName = GetTestApplicationPath(),
RedirectStandardOutput = true,
}); //It will start an another child process

if (testApplicationProcess == null) throw new NullReferenceException(nameof(testApplicationProcess));

await testApplicationProcess.WaitForMessageOfChildProcess("Hello world from ProcessExplorerTestApp2!");

var processes = new[] { testApplication.Id };
var processes = new[] { testApplicationProcess.Id };

// Waiting for the test process to start
Thread.Sleep(1000);
Expand All @@ -192,11 +209,13 @@ public void SetProcessIds_will_set_the_ids_and_its_child_process_ids()
var result = processMonitor.GetProcessIds().ToArray();

foreach (var process in processes)
{
Assert.Contains(process, result);
}

Assert.Equal(3, result.Length);

testApplication.Kill();
testApplicationProcess.Kill();
processMonitor.Dispose();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,24 @@
// and limitations under the License.

using System.Diagnostics;
using System.Threading.Tasks;

namespace MorganStanley.ComposeUI.TestConsoleApp;
namespace MorganStanley.ComposeUI.ProcessExplorer.Core.Tests.Utils;

internal static class Helper
internal static class ProcessExtensions
{
[Conditional("DEBUG")]
public static void IsDebug(ref bool isDebug) => isDebug = true;
public static async Task WaitForMessageOfChildProcess(this Process process, string expectedOutputLine)
{
using (var streamReader = process.StandardOutput)
{
if (streamReader == null) return;

var line = string.Empty;
while (line != expectedOutputLine)
{
if (line == null) break;
line = await streamReader.ReadLineAsync();
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

using System;
using System.Collections.Generic;
using System.Diagnostics;
using System.Threading;
using System.Threading.Tasks;
using Google.Protobuf.Collections;
Expand Down Expand Up @@ -39,11 +40,13 @@ public class ServerEndToEndTests : IAsyncLifetime
public async Task DisposeAsync()
{
if (_host != null)
{
await _host.StopAsync();
}
}

//TODO(Lilla): investigate why the integrationtests are keeping to fail on CI, but works everytime locally.
[Fact(Skip = "Local end to end test")]
//TODO
[Fact]
public async Task Client_can_connect()
{
var client = CreateGrpcClient();
Expand All @@ -52,7 +55,7 @@ public async Task Client_can_connect()

var messages = new List<Message>();

using var call = client.Subscribe(new Empty(), cancellationToken: cancellationTokenSource.Token);
using var call = client.Subscribe(new Empty(), cancellationToken: CancellationToken.None);

// We want to receive the message, that the subscription is established, and we do not want to wait.
// Due to that no processes/subsystems/runtime information have not been declared it will just receive the subscription alive notification
Expand All @@ -62,16 +65,19 @@ public async Task Client_can_connect()
await foreach (var message in call.ResponseStream.ReadAllAsync())
{
messages.Add(message);
break;
}
}
catch (RpcException) { }
catch (RpcException exception)
{
Debug.WriteLine(exception.ToString());
}

Assert.True(messages.Count >= 1);
Assert.Equal(ActionType.SubscriptionAliveAction, messages[0].Action);
}

//TODO(Lilla): investigate why the integrationtests are keeping to fail on CI, but works everytime locally.
[Fact(Skip = "Local end to end test")]
[Fact]
public async Task Client_can_subscribe_and_receive_messages()
{
// defining here some dummy subsystems to trigger the ProcessExplorer backend to send information about it to the defined ui connections. (not just the subscription alive notification)
Expand All @@ -98,25 +104,24 @@ public async Task Client_can_subscribe_and_receive_messages()
await aggregator.SubsystemController.InitializeSubsystems(subsystems);

var client = CreateGrpcClient();
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(2));
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
var messages = new List<Message>();

using var call = client.Subscribe(new Empty(), cancellationToken: cancellationTokenSource.Token);

//try catch block to avoid OperationCanceledException due to that we are just waiting for 2 seconds
try
var countOfMessages = 0;
await foreach (var message in call.ResponseStream.ReadAllAsync())
{
await foreach (var message in call.ResponseStream.ReadAllAsync())
messages.Add(message);
countOfMessages++;

if (countOfMessages == 2)
{
messages.Add(message);
break;
}
}
catch (RpcException) { }

// We just need to receive SubscriptionAlive and a subsystems collection, skipping if that some error occurred


Assert.True(messages.Count >= 2);
Assert.Equal(ActionType.SubscriptionAliveAction, messages[0].Action);
Assert.Equal(ActionType.AddSubsystemsAction, messages[1].Action);
Assert.Single(messages[1].Subsystems);
Expand All @@ -125,7 +130,6 @@ public async Task Client_can_subscribe_and_receive_messages()
//In Proto3, all fields are optional and have a default value. For example, a string field has a default value of empty string ("") and an int field has a default value of zero (0).
//If you want to create a proto message without a certain field, you have to set its value to the default value.
var result = messages[1].Subsystems[dummyId.ToString()];

Assert.NotNull(result);
Assert.Equal(dummySubsystemInfo.Name, result.Name);
Assert.Equal(dummySubsystemInfo.State, result.State);
Expand All @@ -138,25 +142,18 @@ public async Task Client_can_subscribe_and_receive_messages()
Assert.Empty(result.Description);
}

[Fact(Skip = "Local end to end test")]
[Fact]
public void Client_can_send_message()
{
var client = CreateGrpcClient();
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(2));

var message = new Message()
{
Action = ActionType.SubscriptionAliveAction,
Description = "dummy message"
};

Empty? result = null;
try
{
result = client.Send(message, cancellationToken: cancellationTokenSource.Token);
}
catch (RpcException) { }

var result = client.Send(message);
Assert.NotNull(result);
Assert.IsType<Empty>(result);
}
Expand All @@ -183,7 +180,6 @@ private ProcessExplorerMessageHandler.ProcessExplorerMessageHandlerClient Create
{
var channel = GrpcChannel.ForAddress($"http://{Host}:{Port}/");
var client = new ProcessExplorerMessageHandler.ProcessExplorerMessageHandlerClient(channel);

return client;
}

Expand Down
Loading

0 comments on commit 91dc056

Please sign in to comment.