-
Notifications
You must be signed in to change notification settings - Fork 693
/
Copy pathtest_journalist_mail.py
245 lines (216 loc) · 8.45 KB
/
test_journalist_mail.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
import os
import time
import pytest
import testinfra
# DRY declaration of why we're skipping all these tests.
# For details, see https://github.com/freedomofpress/securedrop/issues/3689
SKIP_REASON = "unimplemented, see GH#3689"
class TestBase:
@pytest.fixture(autouse=True)
def only_mon_staging_sudo(self, host): # noqa: PT004
if host.backend.host != "mon-staging":
pytest.skip()
with host.sudo():
yield
def ansible(self, host, module, parameters):
r = host.ansible(module, parameters, check=False)
assert "exception" not in r
def run(self, host, cmd):
print(host.backend.host + " running: " + cmd)
r = host.run(cmd)
print(r.stdout)
print(r.stderr)
return r.rc == 0
def wait_for(self, fun):
success = False
for d in (1, 2, 4, 8, 16, 32, 64):
if fun():
success = True
break
time.sleep(d)
return success
def wait_for_command(self, host, cmd):
return self.wait_for(lambda: self.run(host, cmd))
#
# implementation note: we do not use host.ansible("service", ...
# because it only works for services in /etc/init and not those
# legacy only found in /etc/init.d such as postfix
#
def service_started(self, host, name):
assert self.run(host, f"service {name} start")
assert self.wait_for_command(host, f"service {name} status | grep -q 'is running'")
def service_restarted(self, host, name):
assert self.run(host, f"service {name} restart")
assert self.wait_for_command(host, f"service {name} status | grep -q 'is running'")
def service_stopped(self, host, name):
assert self.run(host, f"service {name} stop")
assert self.wait_for_command(host, f"service {name} status | grep -q 'not running'")
class TestJournalistMail(TestBase):
@pytest.mark.skip(reason=SKIP_REASON)
def test_procmail(self, host):
self.service_started(host, "postfix")
for destination, f in (
("journalist", "alert-journalist-one.txt"),
("journalist", "alert-journalist-two.txt"),
("ossec", "alert-ossec.txt"),
):
# Look up CWD, in case tests move in the future
current_dir = os.path.dirname(os.path.abspath(__file__))
self.ansible(host, "copy", "dest=/tmp/{f} src={d}/{f}".format(f=f, d=current_dir))
assert self.run(host, "/var/ossec/process_submissions_today.sh forget")
assert self.run(host, "postsuper -d ALL")
assert self.run(host, f"cat /tmp/{f} | mail -s 'abc' root@localhost")
assert self.wait_for_command(host, f"mailq | grep -q {destination}@ossec.test")
self.service_stopped(host, "postfix")
@pytest.mark.skip(reason=SKIP_REASON)
def test_process_submissions_today(self, host):
assert self.run(host, "/var/ossec/process_submissions_today.sh " "test_handle_notification")
assert self.run(
host, "/var/ossec/process_submissions_today.sh " "test_modified_in_the_past_24h"
)
@pytest.mark.skip(reason=SKIP_REASON)
def test_send_encrypted_alert(self, host):
self.service_started(host, "postfix")
src = "../../install_files/ansible-base/roles/ossec/files/" "test_admin_key.sec"
self.ansible(host, "copy", f"dest=/tmp/test_admin_key.sec src={src}")
self.run(host, "gpg --homedir /var/ossec/.gnupg" " --import /tmp/test_admin_key.sec")
def trigger(who, payload):
assert self.run(host, f"! mailq | grep -q {who}@ossec.test")
assert self.run(
host,
"""
( echo 'Subject: TEST' ; echo ; echo -e '{payload}' ) | \
/var/ossec/send_encrypted_alarm.sh {who}
""".format(
who=who, payload=payload
),
)
assert self.wait_for_command(host, f"mailq | grep -q {who}@ossec.test")
#
# encrypted mail to journalist or ossec contact
#
for who, payload, expected in (
("journalist", "JOURNALISTPAYLOAD", "JOURNALISTPAYLOAD"),
("ossec", "OSSECPAYLOAD", "OSSECPAYLOAD"),
):
assert self.run(host, "postsuper -d ALL")
trigger(who, payload)
assert self.run(
host,
"""
job=$(mailq | sed -n -e '2p' | cut -f1 -d ' ')
postcat -q $job | tee /dev/stderr | \
gpg --homedir /var/ossec/.gnupg --decrypt 2>&1 | \
grep -q {expected}
""".format(
expected=expected
),
)
#
# failure to encrypt must trigger an emergency mail to ossec contact
#
try:
assert self.run(host, "postsuper -d ALL")
assert self.run(host, "mv /usr/bin/gpg /usr/bin/gpg.save")
trigger(who, "MYGREATPAYLOAD")
assert self.run(
host,
"""
job=$(mailq | sed -n -e '2p' | cut -f1 -d ' ')
postcat -q $job | grep -q 'Failed to encrypt OSSEC alert'
""",
)
finally:
assert self.run(host, "mv /usr/bin/gpg.save /usr/bin/gpg")
self.service_stopped(host, "postfix")
@pytest.mark.skip(reason=SKIP_REASON)
def test_missing_journalist_alert(self, host):
#
# missing journalist mail does nothing
#
assert self.run(
host,
"""
JOURNALIST_EMAIL= \
bash -x /var/ossec/send_encrypted_alarm.sh journalist | \
tee /dev/stderr | \
grep -q 'no notification sent'
""",
)
# https://ossec-docs.readthedocs.io/en/latest/manual/rules-decoders/testing.html
@pytest.mark.skip(reason=SKIP_REASON)
def test_ossec_rule_journalist(self, host):
assert self.run(
host,
"""
set -ex
l="ossec: output: 'head -1 /var/lib/securedrop/submissions_today.txt"
echo "$l" | /var/ossec/bin/ossec-logtest
echo "$l" | /var/ossec/bin/ossec-logtest -U '400600:1:ossec'
""",
)
@pytest.mark.skip(reason=SKIP_REASON)
def test_journalist_mail_notification(self, host):
mon = host
app = testinfra.host.Host.get_host(
"ansible://app-staging", ansible_inventory=host.backend.ansible_inventory
)
#
# run ossec & postfix on mon
#
self.service_started(mon, "postfix")
self.service_started(mon, "ossec")
#
# ensure the submission_today.txt file exists
#
with app.sudo():
assert self.run(
app,
"""
cd /var/www/securedrop
./manage.py were-there-submissions-today
test -f /var/lib/securedrop/submissions_today.txt
""",
)
#
# empty the mailq on mon in case there were leftovers
#
assert self.run(mon, "postsuper -d ALL")
#
# forget about past notifications in case there were leftovers
#
assert self.run(mon, "/var/ossec/process_submissions_today.sh forget")
#
# the command fires every time ossec starts,
# regardless of the frequency
# https://github.com/ossec/ossec-hids/issues/1415
#
with app.sudo():
self.service_restarted(app, "ossec")
#
# wait until at exactly one notification is sent
#
assert self.wait_for_command(mon, "mailq | grep -q [email protected]")
assert self.run(mon, "test 1 = $(mailq | grep [email protected] | wc -l)")
assert self.run(
mon, "grep --count 'notification suppressed' /var/log/syslog " "> /tmp/before"
)
#
# The second notification within less than 24h is suppressed
#
with app.sudo():
self.service_restarted(app, "ossec")
assert self.wait_for_command(
mon,
"""
grep --count 'notification suppressed' /var/log/syslog > /tmp/after
test $(cat /tmp/before) -lt $(cat /tmp/after)
""",
)
#
# teardown the ossec and postfix on mon and app
#
self.service_stopped(mon, "postfix")
self.service_stopped(mon, "ossec")
with app.sudo():
self.service_stopped(app, "ossec")