-
Notifications
You must be signed in to change notification settings - Fork 0
/
Copy pathProgram.fs
129 lines (107 loc) · 5.4 KB
/
Program.fs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
// Learn more about F# at http://fsharp.org
open System
open Microsoft.Azure.ServiceBus
open System.Text
open System.Threading.Tasks
open System.Threading
open FSharp.Control.Tasks.V2.ContextInsensitive
/// Sends one text message to a Service Bus queue.
///
/// A fresh QueueClient is created for the call and closed exactly once after
/// the send attempt, whether or not the attempt succeeded. A failure while
/// building or sending the message is printed (exception message only) and is
/// not re-raised; a failure while closing the client propagates to the caller.
///
/// connectionString: connection string handed to the QueueClient constructor.
/// queueName: name of the queue the client is opened on.
/// sessionId, messageId, label: copied onto the outgoing Message.
/// message: text payload, sent as UTF-8 bytes with content type "application/json".
/// Returns an Async<unit> that performs the send when run.
let sendMessage (connectionString :string) queueName sessionId messageId label (message :string) =
    async {
        let queueClient = QueueClient(connectionString, queueName)
        try
            let outgoing = Message(Encoding.UTF8.GetBytes message)
            // Every message gets its own correlation id.
            outgoing.CorrelationId <- Guid.NewGuid().ToString()
            outgoing.SessionId <- sessionId
            outgoing.MessageId <- messageId
            outgoing.ContentType <- "application/json"
            outgoing.Label <- label
            // do! rather than return!: sending is not the final step of the workflow.
            do! queueClient.SendAsync(outgoing) |> Async.AwaitTask
        with e ->
            printf "%s" e.Message
        // Single close shared by the success and failure paths, so a close
        // error can no longer be caught above and trigger a second close.
        do! queueClient.CloseAsync() |> Async.AwaitTask
    }
/// Prints a banner describing a received message and returns a task whose
/// result is a randomly chosen verdict: true when a fresh random integer in
/// [0, 100) is greater than 5, false otherwise.
///
/// name: text shown in the banner's first line.
/// queueName, sessionId, messageId, message: values echoed in the banner.
let printMessage name queueName sessionId messageId message =
    task {
        // The format literal is kept verbatim; its arguments follow the closing quotes.
        printfn """
####################### %s ######################
Queue: %s.
SessionId: %s
MessageId: %s
Message: %s
#################################################
"""         name queueName sessionId messageId message
        // A new generator is created on every call.
        let roll = Random().Next(0,100)
        return roll > 5
    }
/// Creates a session-based receiver.
///
/// name: label passed to printMessage for every message this receiver handles.
/// connectionString: connection string handed to the QueueClient constructor.
/// Returns a function that takes a queue name, opens a QueueClient on it and
/// registers a session handler (one concurrent session, auto-complete off).
/// An exception thrown while registering is printed and not re-raised.
let receiveMessage name (connectionString : string) =
    // Callback for receive errors: prints the error context followed by the exception message.
    let exceptionReceivedHandler (args : ExceptionReceivedEventArgs) =
        let context = args.ExceptionReceivedContext
        let error =
            sprintf "Error receiving a message from Azure Service Bus. Context is:\r\nAction: %s\r\nClientId: %s\r\nEndpoint: %s\r\nEntityPath: %s"
                context.Action
                context.ClientId
                context.Endpoint
                context.EntityPath
        // A regular string literal is required here: inside a triple-quoted
        // literal "\r\n" is not an escape and would be printed as-is.
        // printfn ends the report with a newline so consecutive reports do not run together.
        printfn "%s\r\n%A" error args.Exception.Message
        Task.CompletedTask
    // Per-message callback. printMessage decides the outcome: true completes
    // the message, false abandons it. The callback then waits five seconds.
    let processMessage (queueClient : IQueueClient) (session : IMessageSession) (message : Message) (_ : CancellationToken) =
        task {
            let! accepted = printMessage name queueClient.QueueName session.SessionId message.MessageId (Encoding.UTF8.GetString message.Body)
            if accepted then
                // do! rather than return!: more steps follow in this workflow.
                do! session.CompleteAsync(message.SystemProperties.LockToken)
                // The session is closed once a message whose id contains "Msg3" has been completed.
                if message.MessageId.Contains("Msg3") then
                    do! session.CloseAsync()
            else
                do! session.AbandonAsync(message.SystemProperties.LockToken)
            do! Task.Delay(5000)
        } :> Task
    let f queueName =
        let queueClient = QueueClient(connectionString, queueName)
        try
            let sessionHandlerOptions = new SessionHandlerOptions(fun x -> exceptionReceivedHandler x)
            sessionHandlerOptions.MaxConcurrentSessions <- 1
            // Messages are settled explicitly in processMessage.
            sessionHandlerOptions.AutoComplete <- false
            let processMessage = processMessage queueClient
            queueClient.RegisterSessionHandler(processMessage, sessionHandlerOptions)
        with e -> printfn "ERROR RegisterSessionHandler: %O" e
    f
/// Sends the "King's Session" messages to the queue one after another:
/// message 1 three times with the same message id, then message 2 and message 3.
let sendKingMessages connectionString queueName =
    let send = sendMessage connectionString queueName "King's Session"
    let first = ("Msg1/King", "This is message 1", "I'm taking to you from heaven (or cloud??)")
    let outbox =
        [ first
          // same message deduplicated
          first
          first
          ("Msg2/King", "This is message 2", "Still talking")
          ("Msg3/King", "This is message 3", "You don't hear me!") ]
    async {
        // Each send is awaited before the next one starts.
        for (messageId, label, body) in outbox do
            do! send messageId label body
    }
/// Sends the "Queen's Session" messages to the queue one after another:
/// message 1 twice with the same message id, then message 2 and message 3.
let sendQueenMessages connectionString queueName =
    let send = sendMessage connectionString queueName "Queen's Session"
    let first = ("Msg1/Queen", "This is message 1", "I'm taking to you from heaven (or cloud??)")
    let outbox =
        [ first
          // same message deduplicated
          first
          ("Msg2/Queen", "This is message 2", "Still talking")
          ("Msg3/Queen", "This is message 3", "I don't hear you") ]
    async {
        // Each send is awaited before the next one starts.
        for (messageId, label, body) in outbox do
            do! send messageId label body
    }
/// Waits two seconds, then sends the Queen's messages followed by the King's
/// messages to the given queue.
let sendKingdomMessages connectionString queueName=
    // Order matters: the Queen's batch goes out before the King's.
    let senders = [ sendQueenMessages; sendKingMessages ]
    async {
        do! Task.Delay(2000) |> Async.AwaitTask
        for sendBatch in senders do
            do! sendBatch connectionString queueName
    }
/// Program entry point: registers two receivers on the queue, sends the
/// kingdom messages, then waits for a line on standard input before exiting.
[<EntryPoint>]
let main _ =
    printfn "Hello Microsoft Azure Service bus from F#!"
    let queueName = "kingdom_messenger"
    // Placeholder connection string; the bracketed parts must be filled in.
    let connectionString = "Endpoint=sb://<name space>.servicebus.windows.net/;SharedAccessKeyName=<shared access key name>;SharedAccessKey=<shared access key>"
    for receiverName in [ "receiver 1"; "receiver 2" ] do
        receiveMessage receiverName connectionString queueName
    Async.RunSynchronously (sendKingdomMessages connectionString queueName)
    // Wait for Enter before exiting.
    Console.ReadLine() |> ignore
    0 // return an integer exit code