Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

A bunch of fixes from production usage #3 #31

Merged
merged 15 commits into from
Nov 21, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Codecs/Image/Codecs.Image.csproj
Original file line number Diff line number Diff line change
@@ -1,12 +1,12 @@
<Project Sdk="Microsoft.NET.Sdk">
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<OutputType>Library</OutputType>
<TargetFramework>net8.0</TargetFramework>
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<Optimize>true</Optimize>
<Optimize>False</Optimize>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

Expand Down
15 changes: 14 additions & 1 deletion Common/Classes/Disposables/BaseDisposable.cs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ The above copyright notice and this permission notice shall be included in all c
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

#endregion

Expand All @@ -60,7 +61,7 @@ namespace Media.Common
/// </remarks>
[CLSCompliant(true)]
[System.Runtime.InteropServices.ComVisible(true)]
public abstract class BaseDisposable : IDisposed
public abstract class BaseDisposable : IDisposed, IAsyncDisposable
{
#region Constants / Statics

Expand Down Expand Up @@ -357,6 +358,18 @@ void IDisposable.Dispose()
Destruct();
}

/// <summary>
/// Allows derived implemenations a chance to destory manged or unmanged resources.
/// Calls <see cref="Destruct"/> if not <see cref="IsFinalized"/>, <see cref="IsUndisposed"/>, <see cref="ShouldDispose"/>, and not <see cref="IsDisposed"/>
/// </summary>
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
public virtual ValueTask DisposeAsync()
{
Destruct();

return ValueTask.CompletedTask;
}

/// <summary>
/// Indicates if the instance is not yet disposed, only checks the virtual constraint if <see cref="State"/> indicates the instance is not already diposed or finalized.
/// </summary>
Expand Down
2 changes: 1 addition & 1 deletion Common/Common.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
</PropertyGroup>

<PropertyGroup Condition="'$(Configuration)|$(Platform)'=='Debug|AnyCPU'">
<Optimize>true</Optimize>
<Optimize>false</Optimize>
<AllowUnsafeBlocks>true</AllowUnsafeBlocks>
</PropertyGroup>

Expand Down
27 changes: 11 additions & 16 deletions Common/Extensions/ThreadExtensions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -76,37 +76,32 @@ public static void AbortAndFree(ref System.Threading.Thread thread, System.Threa
if (thread != null && (thread.IsAlive && thread.ThreadState.HasFlag(state)))
{
//Attempt to join
if (false == thread.Join(timeout))
if (thread.Join(timeout) is false)
{
try
{
//Abort
thread.Abort();
}
catch (System.Threading.ThreadAbortException) { System.Threading.Thread.ResetAbort(); }
catch { throw; } //Cancellation not supported
throw new System.PlatformNotSupportedException("Thread.Abort is not supported. Ensure your thread has stopped.");
}

//Reset the state of the thread to indicate success
thread = null;
}
}

public static void AbortAndFree(ref System.Threading.Thread thread, System.TimeSpan timeout, System.Threading.ThreadState state = System.Threading.ThreadState.Stopped)
public static void AbortAndFree(ref System.Threading.Thread thread, System.TimeSpan timeout,
System.Threading.ThreadState state = System.Threading.ThreadState.Stopped)
{
//If the worker IsAlive and has doesn't have the requested state.
if (thread is not null &&
false.Equals(thread.ThreadState.HasFlag(state)))
thread.ThreadState.HasFlag(state) is false)
{
//Attempt to join if not already, todo check flags are compatible in all implementations.
if (false.Equals(thread.ThreadState.HasFlag(System.Threading.ThreadState.AbortRequested | System.Threading.ThreadState.Aborted)) &&
if (thread
.ThreadState
.HasFlag(System.Threading.ThreadState.AbortRequested |
System.Threading.ThreadState.Aborted) is false &&
IsRunning(thread) &&
false.Equals(thread.Join(timeout)))
thread.Join(timeout) is false)
{
//Abort
try { thread.Abort(); }
catch (System.Threading.ThreadAbortException) { System.Threading.Thread.ResetAbort(); }
catch { throw; } //Cancellation not supported
throw new System.PlatformNotSupportedException("Thread.Abort is not supported. Ensure your thread has stopped.");
}
}

Expand Down
17 changes: 0 additions & 17 deletions Concepts/Classes/Threading/Threading.cs
Original file line number Diff line number Diff line change
Expand Up @@ -139,23 +139,6 @@ public void Start(System.Threading.Thread thread)
thread.Priority = (System.Threading.ThreadPriority)RunningPriority;
}


/// <summary>
/// Sets the Priority to <see cref="AbortPriority"/> and call Abort
/// </summary>
/// <param name="thread"></param>
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
public void RaiseAbort(System.Threading.Thread thread)
{
if (thread is null || thread.ThreadState.HasFlag(System.Threading.ThreadState.Aborted) || thread.ThreadState.HasFlag(System.Threading.ThreadState.AbortRequested)) return;

thread.Priority = (System.Threading.ThreadPriority)AbortPriority;

try { thread.Abort(); }
catch (System.Threading.ThreadAbortException) { System.Threading.Thread.ResetAbort(); }
catch { throw; }
}

/// <summary>
/// Sets the Priority and calls Join.
/// </summary>
Expand Down
15 changes: 3 additions & 12 deletions Concepts/Classes/Timer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,7 @@ public System.TimeSpan Frequency
void Count()
{
//Todo, make JumpExpression ... along with TypedReferenceExpression
//System.Action ShouldStop = () => if (false.Equals(m_Enabled.Equals(uint.MinValue))) goto Approximate; System.Threading.Thread.ResetAbort();
//System.Action ShouldStop = () => if (false.Equals(m_Enabled.Equals(uint.MinValue))) goto Approximate;

System.Threading.Thread Event = null;

Expand Down Expand Up @@ -227,8 +227,7 @@ void Count()
}
catch (System.SystemException se)
{
if (se is System.Threading.ThreadAbortException) System.Threading.Thread.ResetAbort();
else if (se is System.Threading.ThreadInterruptedException | false.Equals(m_Enabled.Equals(uint.MinValue))) goto Approximate;
if (se is System.Threading.ThreadInterruptedException | false.Equals(m_Enabled.Equals(uint.MinValue))) goto Approximate;
else if (se is System.OutOfMemoryException)
{
if ((ulong)Producer.Count > approximate) Producer.Clear();
Expand Down Expand Up @@ -322,15 +321,7 @@ protected override void Dispose(bool disposing)

Stop();

try { m_Counter.Abort(m_Frequency); }
catch (System.Threading.ThreadAbortException) { System.Threading.Thread.ResetAbort(); }
catch { }
finally
{
Tick = null;
}


Tick = null;
}

}
Expand Down
11 changes: 6 additions & 5 deletions Http/HttpClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@ public class HttpClient : Common.SuppressedFinalizerDisposable, Common.ISocketRe
/// <param name="socket"></param>
internal static void ConfigureHttpSocket(Socket socket)
{
if (socket == null) throw new ArgumentNullException("Socket");
ArgumentNullException.ThrowIfNull(socket);

Media.Common.Extensions.Socket.SocketExtensions.EnableAddressReuse(socket);
Media.Common.Extensions.Exception.ExceptionExtensions.ResumeOnError(() => Media.Common.Extensions.Socket.SocketExtensions.EnableAddressReuse(socket));
//socket.ExclusiveAddressUse = false;
//socket.SetSocketOption(SocketOptionLevel.Socket, SocketOptionName.ReuseAddress, true);

Expand Down Expand Up @@ -56,13 +56,14 @@ internal static void ConfigureHttpSocket(Socket socket)
//Media.Common.Extensions.Socket.SocketExtensions.EnableTcpCongestionAlgorithm(socket);

// Set option that allows socket to close gracefully without lingering.
Media.Common.Extensions.Socket.SocketExtensions.DisableLinger(socket);
Media.Common.Extensions.Exception.ExceptionExtensions.ResumeOnError(() => Media.Common.Extensions.Socket.SocketExtensions.DisableLinger(socket));

//Retransmit for 0 sec
if(Common.Extensions.OperatingSystemExtensions.IsWindows) Media.Common.Extensions.Socket.SocketExtensions.DisableTcpRetransmissions(socket);
if(Common.Extensions.OperatingSystemExtensions.IsWindows)
Media.Common.Extensions.Exception.ExceptionExtensions.ResumeOnError(() => Media.Common.Extensions.Socket.SocketExtensions.DisableTcpRetransmissions(socket));

//If both send and receieve buffer size are 0 then there is no coalescing when nagle's algorithm is disabled
Media.Common.Extensions.Socket.SocketExtensions.DisableTcpNagelAlgorithm(socket);
Media.Common.Extensions.Exception.ExceptionExtensions.ResumeOnError(() => Media.Common.Extensions.Socket.SocketExtensions.DisableTcpNagelAlgorithm(socket));
//socket.NoDelay = true;

//Allow more than one byte of urgent data
Expand Down
91 changes: 36 additions & 55 deletions Rtp/RtpClient.Methods.cs
Original file line number Diff line number Diff line change
Expand Up @@ -392,21 +392,20 @@ context.Goodbye is not null &&
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
internal protected /*virtual*/ TransportContext GetContextBySourceId(int sourceId)
{
if (Common.IDisposedExtensions.IsNullOrDisposed(this)) return null;

RtpClient.TransportContext c = null;
if (Common.IDisposedExtensions.IsNullOrDisposed(this))
return null;

for (int i = TransportContexts.Count - 1; i >= 0; --i)
{
c = TransportContexts[i];
RtpClient.TransportContext c = TransportContexts[i];

if (Common.IDisposedExtensions.IsNullOrDisposed(c) is false &&
c.SynchronizationSourceIdentifier.Equals(sourceId) || c.RemoteSynchronizationSourceIdentifier.Equals(sourceId)) break;

c = null;
if (Common.IDisposedExtensions.IsNullOrDisposed(c) is false &&
(c.SynchronizationSourceIdentifier == sourceId ||
c.RemoteSynchronizationSourceIdentifier == sourceId))
return c;
}

return c;
return null;
}

//DataChannel ControlChannel or overload?
Expand Down Expand Up @@ -1109,10 +1108,9 @@ public int SendRtpPacket(RtpPacket packet, TransportContext context)
if (m_StopRequested is false && IsActive) return;

//Create the worker thread
m_WorkerThread = new System.Threading.Thread(new System.Threading.ThreadStart(SendReceieve))
m_WorkerThread = new System.Threading.Thread(SendReceieve)
{
Name = "RtpClient-" + InternalId,

//Start highest.
Priority = System.Threading.ThreadPriority.Highest
};
Expand All @@ -1127,23 +1125,16 @@ public int SendRtpPacket(RtpPacket packet, TransportContext context)
m_WorkerThread.Start();

//Wait for thread to actually start
while (IsActive is false) m_EventReady.Wait(Common.Extensions.TimeSpan.TimeSpanExtensions.OneTick);
while (IsActive is false)
m_EventReady.Wait(Common.Extensions.TimeSpan.TimeSpanExtensions.OneTick);

//Could also use the Join but would have to add logic in the thread to handle this.
//m_WorkerThread.Join(Common.Extensions.TimeSpan.TimeSpanExtensions.OneTick);

#region Unused Feature [Early Rtcp]

//Should allow to be overridden by option or otherwise, should not be required.

//Send the initial senders report, needs to check the SessionDescription to determine if sending is supported..
//SendSendersReports();

//Send the initial receivers report, needs to check the SessionDescription to see if recieve is supported...
//SendReceiversReports();

#endregion
}
catch (System.ObjectDisposedException) { return; }
catch (System.ObjectDisposedException)
{
return;
}
catch (System.Exception ex)
{
Common.ILoggingExtensions.LogException(Logger, ex);
Expand All @@ -1158,13 +1149,14 @@ public int SendRtpPacket(RtpPacket packet, TransportContext context)
[System.Runtime.CompilerServices.MethodImpl(System.Runtime.CompilerServices.MethodImplOptions.AggressiveInlining)]
public void Deactivate()
{
if (Common.IDisposedExtensions.IsNullOrDisposed(this) || false == IsActive) return;
if (Common.IDisposedExtensions.IsNullOrDisposed(this) || IsActive is false) return;

SendGoodbyes();

m_StopRequested = true;

foreach (TransportContext tc in TransportContexts) if (tc.IsActive) tc.DisconnectSockets();
foreach (TransportContext tc in TransportContexts)
if (tc.IsActive) tc.DisconnectSockets();

Media.Common.Extensions.Thread.ThreadExtensions.TryAbortAndFree(ref m_WorkerThread);

Expand Down Expand Up @@ -1202,11 +1194,11 @@ int ReadApplicationLayerFraming(ref int received, ref int sessionRequired, ref i
context = null;

//The channel of the frame - The Framing Method
frameChannel = default(byte);
frameChannel = default;

raisedEvent = false;

buffer = buffer ?? m_Buffer.Array;
buffer ??= m_Buffer.Array;

int bufferLength = buffer.Length, bufferOffset = offset;

Expand All @@ -1215,13 +1207,13 @@ int ReadApplicationLayerFraming(ref int received, ref int sessionRequired, ref i
//Assume given enough for sessionRequired

//Todo Determine from Context to use control channel and length. (Check MediaDescription)
//NEEDS TO HANDLE CASES WHERE RFC4571 Framing are in play and no $ or Channel are used....
//NEEDS TO HANDLE CASES WHERE RFC4571 Framing are in play and no $ or Channel are used....
//int sessionRequired = InterleavedOverhead;

if (received <= 0 || sessionRequired < 0 || received < sessionRequired) return -1;

//Look for the frame control octet
int startOfFrame = System.Array.IndexOf<byte>(buffer, BigEndianFrameControl, bufferOffset, received);
int startOfFrame = System.Array.IndexOf(buffer, BigEndianFrameControl, bufferOffset, received);

//If not found everything belongs to the upper layer
if (startOfFrame == -1)
Expand All @@ -1234,7 +1226,9 @@ int ReadApplicationLayerFraming(ref int received, ref int sessionRequired, ref i
//Indicate the amount of data consumed.
return received;
}
else if (startOfFrame > bufferOffset) // If the start of the frame is not at the beginning of the buffer

// If the start of the frame is not at the beginning of the buffer
if (startOfFrame > bufferOffset)
{
//Determine the amount of data which belongs to the upper layer
int upperLayerData = startOfFrame - bufferOffset;
Expand All @@ -1245,14 +1239,20 @@ int ReadApplicationLayerFraming(ref int received, ref int sessionRequired, ref i

raisedEvent = true;

//Indicate length from offset until next possible frame. (should always be positive, if somehow -1 is returned this will signal a end of buffer to callers)
//Indicate length from offset until next possible frame.
//(should always be positive, if somehow -1 is returned this will
//signal a end of buffer to callers)

//If there is more data related to upperLayerData it will be evented in the next run. (See RtspClient ProcessInterleaveData notes)
//If there is more data related to upperLayerData it will be evented
//in the next run. (See RtspClient ProcessInterleaveData notes)
return upperLayerData;
}

//If there is not enough data for a frame header return
if (bufferOffset + sessionRequired > bufferLength) return -1;
if (bufferOffset + sessionRequired > bufferLength)
{
return -1;
}

//TODO if RFC4571 is specified do check here to avoid reading channel.

Expand Down Expand Up @@ -2196,7 +2196,6 @@ void HandleEvents()
HandleEvent();
}
}
catch (System.Threading.ThreadAbortException) { System.Threading.Thread.ResetAbort(); Media.Common.ILoggingExtensions.Log(Logger, ToString() + "@HandleEvents Aborted"); }
catch (System.Exception ex) { Media.Common.ILoggingExtensions.Log(Logger, ToString() + "@HandleEvents: " + ex.Message); goto Begin; }
}
}
Expand Down Expand Up @@ -2276,7 +2275,7 @@ void SendReceieve()
bool duplexing, rtpEnabled, rtcpEnabled;

//Until aborted
while (false.Equals(shouldStop = IsUndisposed is false || m_StopRequested))
while ((shouldStop = IsUndisposed is false || m_StopRequested) is false)
{
//Keep how much time has elapsed thus far
System.TimeSpan taken = System.DateTime.UtcNow - lastOperation;
Expand Down Expand Up @@ -2704,26 +2703,8 @@ void SendReceieve()
#endregion
}
}
catch (System.Threading.ThreadAbortException)
{
System.Threading.Thread.ResetAbort();

if (critical) System.Threading.Thread.EndCriticalRegion();

Media.Common.ILoggingExtensions.Log(Logger, ToString() + "@SendRecieve Aborted");
}
catch (System.Exception ex)
{
//if (ex is SocketException)
//{
// SocketException se = ex as SocketException;

// if (se.SocketErrorCode == SocketError.ConnectionAborted || se.SocketErrorCode == SocketError.ConnectionReset)
// {
// return;
// }
//}

Media.Common.ILoggingExtensions.Log(Logger, ToString() + "@SendRecieve: " + ex.Message);

if (critical) System.Threading.Thread.EndCriticalRegion();
Expand Down
Loading
Loading