diff --git a/changes/34ba15e2aef8b5c6be4d0274db884f84.yaml b/changes/34ba15e2aef8b5c6be4d0274db884f84.yaml new file mode 100644 index 00000000000..87e11e76496 --- /dev/null +++ b/changes/34ba15e2aef8b5c6be4d0274db884f84.yaml @@ -0,0 +1,5 @@ +--- +desc: Refactored backup streaming logic and error handling. +prs: [] +type: feat +... diff --git a/changes/5433695fd5f2187f2cd9f7a4ba0ae4d9.yaml b/changes/5433695fd5f2187f2cd9f7a4ba0ae4d9.yaml new file mode 100644 index 00000000000..08ef0211ca3 --- /dev/null +++ b/changes/5433695fd5f2187f2cd9f7a4ba0ae4d9.yaml @@ -0,0 +1,5 @@ +--- +desc: Fixed SIGINT handling in the ``synapse.tools.storm`` CLI tool. +prs: [] +type: bug +... diff --git a/changes/9d85121addff53793ad88883586796cc.yaml b/changes/9d85121addff53793ad88883586796cc.yaml new file mode 100644 index 00000000000..2b05e194f4a --- /dev/null +++ b/changes/9d85121addff53793ad88883586796cc.yaml @@ -0,0 +1,7 @@ +--- +desc: Updated the Storm ``parallel`` command behavior to avoid creating empty pipelines + when there are fewer inbound nodes than the number of pipelines specified by the + ``--size`` argument. +prs: [] +type: feat +... diff --git a/changes/bd45773cff22d0cf547370e90918ba58.yaml b/changes/bd45773cff22d0cf547370e90918ba58.yaml new file mode 100644 index 00000000000..56ff78936af --- /dev/null +++ b/changes/bd45773cff22d0cf547370e90918ba58.yaml @@ -0,0 +1,5 @@ +--- +desc: Added a patch for Python ``http.cookies`` module to address CVE-2024-7592 exposure. +prs: [] +type: bug +... diff --git a/changes/e25248c44d5ba33a43ae937a0bb25fd5.yaml b/changes/e25248c44d5ba33a43ae937a0bb25fd5.yaml new file mode 100644 index 00000000000..1019c9ead55 --- /dev/null +++ b/changes/e25248c44d5ba33a43ae937a0bb25fd5.yaml @@ -0,0 +1,6 @@ +--- +desc: Fixed an issue where certain exceptions raised while calling a function in Storm + were not providing appropriate details about the origin of the exception. +prs: [] +type: bug +... diff --git a/pyproject.toml b/pyproject.toml index d05e5c234de..bbb5acdabc4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -35,7 +35,7 @@ dependencies = [ 'aiohttp-socks>=0.9.0,<0.10.0', 'aioimaplib>=1.1.0,<1.2.0', 'aiosmtplib>=3.0.0,<3.1.0', - 'prompt-toolkit>=3.0.4,<3.1.0', + 'prompt_toolkit>=3.0.29,<3.1.0', 'lark==1.2.2', 'Pygments>=2.7.4,<2.18.0', 'packaging>=20.0,<25.0', diff --git a/requirements.txt b/requirements.txt index 45b4f71eba4..bd52fb24e1f 100644 --- a/requirements.txt +++ b/requirements.txt @@ -9,7 +9,7 @@ aiohttp>=3.10.0,<4.0 aiohttp-socks>=0.9.0,<0.10.0 aioimaplib>=1.1.0,<1.2.0 aiosmtplib>=3.0.0,<3.1.0 -prompt-toolkit>=3.0.4,<3.1.0 +prompt_toolkit>=3.0.29,<3.1.0 lark==1.2.2 Pygments>=2.7.4,<2.18.0 fastjsonschema>=2.18.0,<2.20.0 diff --git a/synapse/common.py b/synapse/common.py index 602bed9445a..75f2c46201c 100644 --- a/synapse/common.py +++ b/synapse/common.py @@ -29,6 +29,8 @@ import contextlib import collections +import http.cookies + import yaml import regex @@ -38,6 +40,8 @@ import synapse.lib.structlog as s_structlog import synapse.vendor.cpython.lib.ipaddress as ipaddress +import synapse.vendor.cpython.lib.http.cookies as v_cookies + try: from yaml import CSafeLoader as Loader @@ -1218,6 +1222,17 @@ def trimText(text: str, n: int = 256, placeholder: str = '...') -> str: assert n > plen return f'{text[:mlen]}{placeholder}' +def _patch_http_cookies(): + ''' + Patch stdlib http.cookies._unquote from the 3.11.10 implementation if + the interpreter we are using is not patched for CVE-2024-7592. 
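+
+    Note: the upstream fix for CVE-2024-7592 removed the module-level
+    ``_QuotePatt`` attribute, so its absence indicates the interpreter
+    already ships the fixed ``_unquote`` and the monkeypatch is skipped.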
+ ''' + if not hasattr(http.cookies, '_QuotePatt'): + return + http.cookies._unquote = v_cookies._unquote + +_patch_http_cookies() + # TODO: Switch back to using asyncio.wait_for when we are using py 3.12+ # This is a workaround for a race where asyncio.wait_for can end up # ignoring cancellation https://github.com/python/cpython/issues/86296 diff --git a/synapse/cortex.py b/synapse/cortex.py index 64a2d9f87b1..603da3dee6d 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -2240,7 +2240,7 @@ async def _onSetStormCmd(self, cdef): ''' name = cdef.get('name') - await self._setStormCmd(cdef) + self._setStormCmd(cdef) self.cmddefs.set(name, cdef) async def _reqStormCmd(self, cdef): @@ -2483,7 +2483,7 @@ def filtercmpr(sode): async for sodes in self._mergeSodes(layers, genrs, cmprkey_indx, filtercmpr, reverse=reverse): yield sodes - async def _setStormCmd(self, cdef): + def _setStormCmd(self, cdef): ''' Note: No change control or persistence @@ -2543,13 +2543,9 @@ def getStorNode(form): name = cdef.get('name') self.stormcmds[name] = ctor - await self.fire('core:cmd:change', cmd=name, act='add') - - async def _popStormCmd(self, name): + def _popStormCmd(self, name): self.stormcmds.pop(name, None) - await self.fire('core:cmd:change', cmd=name, act='del') - async def delStormCmd(self, name): ''' Remove a previously set pure storm command. @@ -2575,8 +2571,6 @@ async def _delStormCmd(self, name): self.cmddefs.pop(name) self.stormcmds.pop(name, None) - await self.fire('core:cmd:change', cmd=name, act='del') - async def addStormPkg(self, pkgdef, verify=False): ''' Add the given storm package to the cortex. @@ -2630,11 +2624,11 @@ async def _addStormPkg(self, pkgdef): olddef = self.pkgdefs.get(name, None) if olddef is not None: if s_hashitem.hashitem(pkgdef) != s_hashitem.hashitem(olddef): - await self._dropStormPkg(olddef) + self._dropStormPkg(olddef) else: return - await self.loadStormPkg(pkgdef) + self.loadStormPkg(pkgdef) self.pkgdefs.set(name, pkgdef) self._clearPermDefs() @@ -2664,7 +2658,7 @@ async def _delStormPkg(self, name): if pkgdef is None: return - await self._dropStormPkg(pkgdef) + self._dropStormPkg(pkgdef) self._clearPermDefs() @@ -2713,7 +2707,7 @@ def getDataModel(self): async def _tryLoadStormPkg(self, pkgdef): try: await self._normStormPkg(pkgdef, validstorm=False) - await self.loadStormPkg(pkgdef) + self.loadStormPkg(pkgdef) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise @@ -2881,7 +2875,9 @@ async def _normStormPkg(self, pkgdef, validstorm=True): for configvar in pkgdef.get('configvars', ()): self._reqStormPkgVarType(pkgname, configvar.get('type')) - async def loadStormPkg(self, pkgdef): + # N.B. This function is intentionally not async in order to prevent possible user race conditions for code + # executing outside of the nexus lock. + def loadStormPkg(self, pkgdef): ''' Load a storm package into the storm library for this cortex. @@ -2911,7 +2907,7 @@ async def loadStormPkg(self, pkgdef): self.stormmods = stormmods for cdef in cmds: - await self._setStormCmd(cdef) + self._setStormCmd(cdef) for gdef in pkgdef.get('graphs', ()): gdef = copy.deepcopy(gdef) @@ -2937,7 +2933,9 @@ async def _onload(): await self.fire('core:pkg:onload:complete', pkg=name) self.schedCoro(_onload()) - async def _dropStormPkg(self, pkgdef): + # N.B. This function is intentionally not async in order to prevent possible user race conditions for code + # executing outside of the nexus lock. 
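+ # Because it never awaits, the event loop cannot interleave other
+ # coroutines while the storm command registry is being torn down.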
+ def _dropStormPkg(self, pkgdef): ''' Reverse the process of loadStormPkg() ''' @@ -2948,7 +2946,7 @@ async def _dropStormPkg(self, pkgdef): for cdef in pkgdef.get('commands', ()): name = cdef.get('name') - await self._popStormCmd(name) + self._popStormCmd(name) pkgname = pkgdef.get('name') @@ -4435,7 +4433,7 @@ async def _initPureStormCmds(self): async def _trySetStormCmd(self, name, cdef): try: - await self._setStormCmd(cdef) + self._setStormCmd(cdef) except (asyncio.CancelledError, Exception): logger.exception(f'Storm command load failed: {name}') diff --git a/synapse/exc.py b/synapse/exc.py index fe115b6305e..8c5b2ef752e 100644 --- a/synapse/exc.py +++ b/synapse/exc.py @@ -56,9 +56,14 @@ def setdefault(self, name, valu): self.errinfo[name] = valu self._setExcMesg() + def update(self, items: dict): + '''Update multiple items in the errinfo dict at once.''' + self.errinfo.update(**items) + self._setExcMesg() + class StormRaise(SynErr): ''' - This represents a user provided exception inside of a Storm runtime. It requires a errname key. + This represents a user provided exception raised in the Storm runtime. It requires a errname key. ''' def __init__(self, *args, **info): SynErr.__init__(self, *args, **info) diff --git a/synapse/lib/ast.py b/synapse/lib/ast.py index bf85543c01a..dce4d2eed94 100644 --- a/synapse/lib/ast.py +++ b/synapse/lib/ast.py @@ -60,7 +60,7 @@ def getPosInfo(self): def addExcInfo(self, exc): if 'highlight' not in exc.errinfo: - exc.errinfo['highlight'] = self.getPosInfo() + exc.set('highlight', self.getPosInfo()) return exc def repr(self): @@ -3579,10 +3579,23 @@ async def compute(self, runt, path): kwargs = {k: v for (k, v) in await self.kids[2].compute(runt, path)} with s_scope.enter({'runt': runt}): - retn = func(*argv, **kwargs) - if s_coro.iscoro(retn): - return await retn - return retn + try: + retn = func(*argv, **kwargs) + if s_coro.iscoro(retn): + return await retn + return retn + + except TypeError as e: + mesg = str(e) + if (funcpath := getattr(func, '_storm_funcpath', None)) is not None: + mesg = f"{funcpath}(){mesg.split(')', 1)[1]}" + + raise self.addExcInfo(s_exc.StormRuntimeError(mesg=mesg)) + + except s_exc.SynErr as e: + if getattr(func, '_storm_runtime_lib_func', None) is not None: + e.errinfo.pop('highlight', None) + raise self.addExcInfo(e) class DollarExpr(Value): ''' @@ -4891,8 +4904,9 @@ async def once(): @s_stormtypes.stormfunc(readonly=True) async def realfunc(*args, **kwargs): - return await self.callfunc(runt, argdefs, args, kwargs) + return await self.callfunc(runt, argdefs, args, kwargs, realfunc._storm_funcpath) + realfunc._storm_funcpath = self.name await runt.setVar(self.name, realfunc) count = 0 @@ -4914,7 +4928,7 @@ def validate(self, runt): # var scope validation occurs in the sub-runtime pass - async def callfunc(self, runt, argdefs, args, kwargs): + async def callfunc(self, runt, argdefs, args, kwargs, funcpath): ''' Execute a function call using the given runtime. 
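+        The funcpath argument carries the caller-visible name (for example,
+        ``$lib.gen.campaign``) so argument errors reference the path the user
+        actually invoked rather than the function's internal name.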
@@ -4925,7 +4939,7 @@ async def callfunc(self, runt, argdefs, args, kwargs): argcount = len(args) + len(kwargs) if argcount > len(argdefs): - mesg = f'{self.name}() takes {len(argdefs)} arguments but {argcount} were provided' + mesg = f'{funcpath}() takes {len(argdefs)} arguments but {argcount} were provided' raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg)) # Fill in the positional arguments @@ -4939,7 +4953,7 @@ async def callfunc(self, runt, argdefs, args, kwargs): valu = kwargs.pop(name, s_common.novalu) if valu is s_common.novalu: if defv is s_common.novalu: - mesg = f'{self.name}() missing required argument {name}' + mesg = f'{funcpath}() missing required argument {name}' raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg)) valu = defv @@ -4950,11 +4964,11 @@ async def callfunc(self, runt, argdefs, args, kwargs): # used a kwarg not defined. kwkeys = list(kwargs.keys()) if kwkeys[0] in posnames: - mesg = f'{self.name}() got multiple values for parameter {kwkeys[0]}' + mesg = f'{funcpath}() got multiple values for parameter {kwkeys[0]}' raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg)) plural = 's' if len(kwargs) > 1 else '' - mesg = f'{self.name}() got unexpected keyword argument{plural}: {",".join(kwkeys)}' + mesg = f'{funcpath}() got unexpected keyword argument{plural}: {",".join(kwkeys)}' raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg)) assert len(mergargs) == len(argdefs) diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 5fb20916b5e..78c6b0b6dca 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -2643,18 +2643,12 @@ def walkpath(path): walkpath(self.backdirn) return backups - async def iterBackupArchive(self, name, user): - - success = False - loglevel = logging.WARNING - - path = self._reqBackDirn(name) - cellguid = os.path.join(path, 'cell.guid') - if not os.path.isfile(cellguid): - mesg = 'Specified backup path has no cell.guid file.' - raise s_exc.BadArg(mesg=mesg, arg='path', valu=path) - + async def _streamBackupArchive(self, path, user, name): link = s_scope.get('link') + if link is None: + mesg = 'Link not found in scope. This API must be called via a CellApi.' + raise s_exc.SynErr(mesg=mesg) + linkinfo = await link.getSpawnInfo() linkinfo['logconf'] = await self._getSpawnLogConf() @@ -2662,42 +2656,42 @@ async def iterBackupArchive(self, name, user): ctx = multiprocessing.get_context('spawn') - proc = None - mesg = 'Streaming complete' - def getproc(): proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo)) proc.start() return proc + mesg = 'Streaming complete' + proc = await s_coro.executor(getproc) + cancelled = False try: - proc = await s_coro.executor(getproc) - await s_coro.executor(proc.join) + self.backlastuploaddt = datetime.datetime.now() + logger.debug(f'Backup streaming completed successfully for {name}') - except (asyncio.CancelledError, Exception) as e: + except asyncio.CancelledError: + logger.warning('Backup streaming was cancelled.') + cancelled = True + raise - # We want to log all exceptions here, an asyncio.CancelledError - # could be the result of a remote link terminating due to the - # backup stream being completed, prior to this function - # finishing. 
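+
+            # A CancelledError here is frequently benign: the remote link may
+            # terminate once it has received the complete backup stream, so it
+            # is re-raised without being logged as an error.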
+ except Exception as e: logger.exception('Error during backup streaming.') - - if proc: - proc.terminate() - mesg = repr(e) raise - else: - success = True - loglevel = logging.DEBUG - self.backlastuploaddt = datetime.datetime.now() - finally: - phrase = 'successfully' if success else 'with failure' - logger.log(loglevel, f'iterBackupArchive completed {phrase} for {name}') - raise s_exc.DmonSpawn(mesg=mesg) + proc.terminate() + + if not cancelled: + raise s_exc.DmonSpawn(mesg=mesg) + + async def iterBackupArchive(self, name, user): + path = self._reqBackDirn(name) + cellguid = os.path.join(path, 'cell.guid') + if not os.path.isfile(cellguid): + mesg = 'Specified backup path has no cell.guid file.' + raise s_exc.BadArg(mesg=mesg, arg='path', valu=path) + await self._streamBackupArchive(path, user, name) async def iterNewBackupArchive(self, user, name=None, remove=False): @@ -2708,9 +2702,6 @@ async def iterNewBackupArchive(self, user, name=None, remove=False): if remove: self.backupstreaming = True - success = False - loglevel = logging.WARNING - if name is None: name = time.strftime('%Y%m%d%H%M%S', datetime.datetime.now().timetuple()) @@ -2719,10 +2710,6 @@ async def iterNewBackupArchive(self, user, name=None, remove=False): mesg = 'Backup with name already exists' raise s_exc.BadArg(mesg=mesg) - link = s_scope.get('link') - linkinfo = await link.getSpawnInfo() - linkinfo['logconf'] = await self._getSpawnLogConf() - try: await self.runBackup(name) except Exception: @@ -2732,54 +2719,13 @@ async def iterNewBackupArchive(self, user, name=None, remove=False): logger.debug(f'Removed {path}') raise - await self.boss.promote('backup:stream', user=user, info={'name': name}) - - ctx = multiprocessing.get_context('spawn') - - proc = None - mesg = 'Streaming complete' - - def getproc(): - proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo)) - proc.start() - return proc - - try: - proc = await s_coro.executor(getproc) - - await s_coro.executor(proc.join) - - except (asyncio.CancelledError, Exception) as e: - - # We want to log all exceptions here, an asyncio.CancelledError - # could be the result of a remote link terminating due to the - # backup stream being completed, prior to this function - # finishing. - logger.exception('Error during backup streaming.') - - if proc: - proc.terminate() - - mesg = repr(e) - raise - - else: - success = True - loglevel = logging.DEBUG - self.backlastuploaddt = datetime.datetime.now() - - finally: - if remove: - logger.debug(f'Removing {path}') - await s_coro.executor(shutil.rmtree, path, ignore_errors=True) - logger.debug(f'Removed {path}') - - phrase = 'successfully' if success else 'with failure' - logger.log(loglevel, f'iterNewBackupArchive completed {phrase} for {name}') - raise s_exc.DmonSpawn(mesg=mesg) + await self._streamBackupArchive(path, user, name) finally: if remove: + logger.debug(f'Removing {path}') + await s_coro.executor(shutil.rmtree, path, ignore_errors=True) + logger.debug(f'Removed {path}') self.backupstreaming = False async def isUserAllowed(self, iden, perm, gateiden=None, default=False): diff --git a/synapse/lib/cli.py b/synapse/lib/cli.py index 4feb1d217ff..ed5ed77d93c 100644 --- a/synapse/lib/cli.py +++ b/synapse/lib/cli.py @@ -281,18 +281,26 @@ async def _onItemFini(self): await self.fini() - async def addSignalHandlers(self): + async def addSignalHandlers(self): # pragma: no cover ''' Register SIGINT signal handler with the ioloop to cancel the currently running cmdloop task. + Removes the handler when the cli is fini'd. 
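+
+        Note: prompt_toolkit's prompt_async() is called with
+        handle_sigint=False, so this loop-level handler is the sole owner
+        of SIGINT while the CLI is running.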
''' - def sigint(): - self.printf('') if self.cmdtask is not None: self.cmdtask.cancel() self.loop.add_signal_handler(signal.SIGINT, sigint) + def onfini(): + # N.B. This is reaches into some loop / handle internals but + # prevents us from removing a handler that overwrote our own. + hndl = self.loop._signal_handlers.get(signal.SIGINT, None) # type: asyncio.Handle + if hndl is not None and hndl._callback is sigint: + self.loop.remove_signal_handler(signal.SIGINT) + + self.onfini(onfini) + def get(self, name, defval=None): return self.locs.get(name, defval) @@ -324,8 +332,12 @@ async def prompt(self, text=None): if text is None: text = self.cmdprompt - with patch_stdout(): - retn = await self.sess.prompt_async(text, vi_mode=self.vi_mode, enable_open_in_editor=True) + with patch_stdout(): # pragma: no cover + retn = await self.sess.prompt_async(text, + vi_mode=self.vi_mode, + enable_open_in_editor=True, + handle_sigint=False # We handle sigint in the loop + ) return retn def printf(self, mesg, addnl=True, color=None): @@ -390,7 +402,7 @@ async def runCmdLoop(self): self.cmdtask = self.schedCoro(coro) await self.cmdtask - except KeyboardInterrupt: + except (KeyboardInterrupt, asyncio.CancelledError): if self.isfini: return @@ -408,11 +420,8 @@ async def runCmdLoop(self): if self.cmdtask is not None: self.cmdtask.cancel() try: - self.cmdtask.result() - except asyncio.CancelledError: - # Wait a beat to let any remaining nodes to print out before we print the prompt - await asyncio.sleep(1) - except Exception: + await asyncio.wait_for(self.cmdtask, timeout=0.1) + except (asyncio.CancelledError, asyncio.TimeoutError): pass async def runCmdLine(self, line): diff --git a/synapse/lib/parser.py b/synapse/lib/parser.py index 798058c66b9..614f0ab7c3c 100644 --- a/synapse/lib/parser.py +++ b/synapse/lib/parser.py @@ -507,7 +507,7 @@ def _larkToSynExc(self, e): origexc = e.orig_exc if not isinstance(origexc, s_exc.SynErr): raise e.orig_exc # pragma: no cover - origexc.errinfo['text'] = self.text + origexc.set('text', self.text) return s_exc.BadSyntax(**origexc.errinfo) elif isinstance(e, lark.exceptions.UnexpectedCharacters): # pragma: no cover diff --git a/synapse/lib/snap.py b/synapse/lib/snap.py index 511b3665b26..656f44a2525 100644 --- a/synapse/lib/snap.py +++ b/synapse/lib/snap.py @@ -363,9 +363,9 @@ async def _set(self, prop, valu, norminfo=None, ignore_ro=False): valu, norminfo = prop.type.norm(valu) except s_exc.BadTypeValu as e: oldm = e.errinfo.get('mesg') - e.errinfo['prop'] = prop.name - e.errinfo['form'] = prop.form.name - e.errinfo['mesg'] = f'Bad prop value {prop.full}={valu!r} : {oldm}' + e.update({'prop': prop.name, + 'form': prop.form.name, + 'mesg': f'Bad prop value {prop.full}={valu!r} : {oldm}'}) if self.ctx.snap.strict: raise e await self.ctx.snap.warn(e) @@ -493,7 +493,7 @@ async def _addNode(self, form, valu, props=None, norminfo=None): try: valu, norminfo = form.type.norm(valu) except s_exc.BadTypeValu as e: - e.errinfo['form'] = form.name + e.set('form', form.name) if self.snap.strict: raise e await self.snap.warn(f'addNode() BadTypeValu {form.name}={valu} {e}') return None diff --git a/synapse/lib/storm.py b/synapse/lib/storm.py index 0c51ab2946f..a80263db096 100644 --- a/synapse/lib/storm.py +++ b/synapse/lib/storm.py @@ -5344,6 +5344,12 @@ class ParallelCmd(Cmd): inet:ipv4#foo | parallel { $place = $lib.import(foobar).lookup(:latlong) [ :place=$place ] } NOTE: Storm variables set within the parallel query pipelines do not interact. 
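+
+    For example, a variable assigned inside one pipeline (such as $place in
+    the example above) is never visible to the other parallel pipelines.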
+ + NOTE: If there are inbound nodes to the parallel command, parallel pipelines will be created as each node + is processed, up to the number specified by --size. If the number of nodes in the pipeline is less + than the value specified by --size, additional pipelines with no inbound node will not be created. + If there are no inbound nodes to the parallel command, the number of pipelines specified by --size + will always be created. ''' name = 'parallel' readonly = True @@ -5400,19 +5406,33 @@ async def execStormCmd(self, runt, genr): inq = asyncio.Queue(maxsize=size) outq = asyncio.Queue(maxsize=size) - async def pump(): - try: - async for pumpitem in genr: - await inq.put(pumpitem) - [await inq.put(None) for i in range(size)] - except asyncio.CancelledError: # pragma: no cover - raise - except Exception as e: - await outq.put(e) - - base.schedCoro(pump()) - for i in range(size): - base.schedCoro(self.pipeline(runt, query, inq, outq)) + tsks = 0 + try: + while tsks < size: + await inq.put(await genr.__anext__()) + base.schedCoro(self.pipeline(runt, query, inq, outq)) + tsks += 1 + except StopAsyncIteration: + [await inq.put(None) for i in range(tsks)] + + # If a full set of tasks were created, keep pumping nodes into the queue + if tsks == size: + async def pump(): + try: + async for pumpitem in genr: + await inq.put(pumpitem) + [await inq.put(None) for i in range(size)] + except Exception as e: + await outq.put(e) + + base.schedCoro(pump()) + + # If no tasks were created, make a full set + elif tsks == 0: + tsks = size + for i in range(size): + base.schedCoro(self.pipeline(runt, query, inq, outq)) + [await inq.put(None) for i in range(tsks)] exited = 0 while True: @@ -5423,7 +5443,7 @@ async def pump(): if item is None: exited += 1 - if exited == size: + if exited == tsks: return continue @@ -5566,9 +5586,6 @@ async def pipeline(self, runt, outq, genr=None): await outq.put(None) - except asyncio.CancelledError: # pragma: no cover - raise - except Exception as e: await outq.put(e) diff --git a/synapse/lib/stormtypes.py b/synapse/lib/stormtypes.py index e9435f0bb99..d8987e05650 100644 --- a/synapse/lib/stormtypes.py +++ b/synapse/lib/stormtypes.py @@ -201,11 +201,29 @@ def registerLib(self, ctor): raise Exception('no key!') self.addStormLib(path, ctor) + for info in ctor._storm_locals: + rtype = info.get('type') + if isinstance(rtype, dict) and rtype.get('type') == 'function': + if (fname := rtype.get('_funcname')) == '_storm_query': + continue + + if (func := getattr(ctor, fname, None)) is not None: + funcpath = '.'.join(('lib',) + ctor._storm_lib_path + (info['name'],)) + func._storm_funcpath = f"${funcpath}" + return ctor def registerType(self, ctor): '''Decorator to register a StormPrim''' self.addStormType(ctor.__name__, ctor) + + for info in ctor._storm_locals: + rtype = info.get('type') + if isinstance(rtype, dict) and rtype.get('type') == 'function': + fname = rtype.get('_funcname') + if (func := getattr(ctor, fname, None)) is not None: + func._storm_funcpath = f"{ctor._storm_typename}.{info['name']}" + return ctor def iterLibs(self): @@ -628,6 +646,7 @@ async def initLibAsync(self): if callable(v) and v.__name__ == 'realfunc': v._storm_runtime_lib = self v._storm_runtime_lib_func = k + v._storm_funcpath = f'${".".join(("lib",) + self.name + (k,))}' self.locls[k] = v diff --git a/synapse/models/inet.py b/synapse/models/inet.py index 3641daa5ddf..7481eed8fbf 100644 --- a/synapse/models/inet.py +++ b/synapse/models/inet.py @@ -1732,6 +1732,9 @@ def getModelDefs(self): 'template': 
{'service:base': 'object'}, 'props': ( + ('url', ('inet:url', {}), { + 'doc': 'The primary URL associated with the {service:base}.'}), + ('status', ('inet:service:object:status', {}), { 'doc': 'The status of the {service:base}.'}), @@ -1815,6 +1818,9 @@ def getModelDefs(self): ('inet:email:message', {}, ( + ('id', ('str', {'strip': True}), { + 'doc': 'The ID parsed from the "message-id" header.'}), + ('to', ('inet:email', {}), { 'doc': 'The email address of the recipient.'}), @@ -2175,6 +2181,9 @@ def getModelDefs(self): ('dst:ssh:key', ('crypto:key', {}), { 'doc': 'The key sent by the server as part of an SSH session setup.'}), + ('capture:host', ('it:host', {}), { + 'doc': 'The host which captured the flow.'}), + ('raw', ('data', {}), { 'doc': 'A raw record used to create the flow which may contain additional protocol details.'}), )), @@ -2198,6 +2207,9 @@ def getModelDefs(self): ('host', ('it:host', {}), { 'doc': 'The host that used the network egress.'}), + ('host:iface', ('inet:iface', {}), { + 'doc': 'The interface which the host used to connect out via the egress.'}), + ('account', ('inet:service:account', {}), { 'doc': 'The service account which used the client address to egress.'}), diff --git a/synapse/models/infotech.py b/synapse/models/infotech.py index b1a00c02a29..e0e44c2b552 100644 --- a/synapse/models/infotech.py +++ b/synapse/models/infotech.py @@ -963,12 +963,12 @@ def getModelDefs(self): }), ('it:exec:pipe', ('guid', {}), { 'interfaces': ('it:host:activity',), - 'doc': 'A named pipe created by a process at runtime.', - }), + 'doc': 'A named pipe created by a process at runtime.'}), + ('it:exec:url', ('guid', {}), { 'interfaces': ('it:host:activity',), - 'doc': 'An instance of a host requesting a URL.', - }), + 'doc': 'An instance of a host requesting a URL using any protocol scheme.'}), + ('it:exec:bind', ('guid', {}), { 'interfaces': ('it:host:activity',), 'doc': 'An instance of a host binding a listening port.', diff --git a/synapse/models/risk.py b/synapse/models/risk.py index 6fe3992fd45..347b44420a3 100644 --- a/synapse/models/risk.py +++ b/synapse/models/risk.py @@ -242,9 +242,18 @@ def getModelDefs(self): (('risk:mitigation', 'uses', 'inet:service:rule'), { 'doc': 'The mitigation uses the service rule.'}), + (('risk:mitigation', 'uses', 'it:prod:softver'), { + 'doc': 'The mitigation uses the software version.'}), + + (('risk:mitigation', 'uses', 'it:prod:hardware'), { + 'doc': 'The mitigation uses the hardware.'}), + (('risk:leak', 'leaked', None), { 'doc': 'The leak included the disclosure of the target node.'}), + (('risk:leak', 'enabled', 'risk:leak'), { + 'doc': 'The source leak enabled the target leak to occur.'}), + (('risk:extortion', 'leveraged', None), { 'doc': 'The extortion event was based on attacker access to the target node.'}), @@ -407,10 +416,12 @@ def getModelDefs(self): 'doc': 'A description of the mitigation approach for the vulnerability.'}), ('software', ('it:prod:softver', {}), { - 'doc': 'A software version which implements a fix for the vulnerability.'}), + 'deprecated': True, + 'doc': 'Deprecated. Please use risk:mitigation -(uses)> it:prod:softver.'}), ('hardware', ('it:prod:hardware', {}), { - 'doc': 'A hardware version which implements a fix for the vulnerability.'}), + 'deprecated': True, + 'doc': 'Deprecated. 
Please use risk:mitigation -(uses)> it:prod:hardware.'}), ('reporter', ('ou:org', {}), { 'doc': 'The organization reporting on the mitigation.'}), @@ -1034,6 +1045,9 @@ def getModelDefs(self): ('leaker', ('ps:contact', {}), { 'doc': 'The identity which leaked the information.'}), + ('recipient', ('ps:contact', {}), { + 'doc': 'The identity which received the leaked information.'}), + ('type', ('risk:leak:type:taxonomy', {}), { 'doc': 'A type taxonomy for the leak.'}), diff --git a/synapse/tests/test_cortex.py b/synapse/tests/test_cortex.py index 21ec021e270..8729ee88a45 100644 --- a/synapse/tests/test_cortex.py +++ b/synapse/tests/test_cortex.py @@ -484,7 +484,7 @@ async def test_cortex_stormiface(self): self.len(0, mods) self.len(0, core.modsbyiface.get('lookup')) - await core.loadStormPkg(pkgdef) + core.loadStormPkg(pkgdef) mods = await core.getStormIfaces('lookup') self.len(1, mods) @@ -513,7 +513,7 @@ async def test_cortex_stormiface(self): vals = [r async for r in core.view.callStormIface('boom', todo)] self.eq((), vals) - await core._dropStormPkg(pkgdef) + core._dropStormPkg(pkgdef) self.none(core.modsbyiface.get('lookup')) mods = await core.getStormIfaces('lookup') @@ -558,7 +558,7 @@ async def test_cortex_lookup_search_dedup(self): nodes = await core.nodes('foo@bar.com foo@bar.com', opts={'mode': 'lookup'}) self.eq(['inet:email', 'inet:email'], [n.ndef[0] for n in nodes]) - await core.loadStormPkg(pkgdef) + core.loadStormPkg(pkgdef) self.len(1, await core.getStormIfaces('search')) todo = s_common.todo('search', ('foo@bar.com',)) diff --git a/synapse/tests/test_exc.py b/synapse/tests/test_exc.py index 0428303000d..604955ba3de 100644 --- a/synapse/tests/test_exc.py +++ b/synapse/tests/test_exc.py @@ -27,6 +27,9 @@ def test_basic(self): e.setdefault('defv', 2) self.eq("SynErr: defv=1 foo='words' hehe=1234 mesg='words'", str(e)) + e.update({'foo': 'newwords', 'bar': 'baz'}) + self.eq("SynErr: bar='baz' defv=1 foo='newwords' hehe=1234 mesg='words'", str(e)) + self.eq(e.errname, 'SynErr') e2 = s_exc.BadTypeValu(mesg='haha') diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index ed5934c77ae..285b793610e 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -362,7 +362,9 @@ def looptime(): appt = await agenda.get(guid) self.eq(appt.isrunning, False) - self.eq(appt.lastresult, "raised exception StormRaise: errname='OmgWtfBbq' mesg='boom'") + self.isin("raised exception StormRaise: errname='OmgWtfBbq'", appt.lastresult) + self.isin("highlight={'hash': '6736b8252d9413221a9b693b2b19cf53'", appt.lastresult) + self.isin("mesg='boom'", appt.lastresult) # Test setting the global enable/disable flag await agenda.delete(guid) diff --git a/synapse/tests/test_lib_ast.py b/synapse/tests/test_lib_ast.py index d7dee25874d..50673db478a 100644 --- a/synapse/tests/test_lib_ast.py +++ b/synapse/tests/test_lib_ast.py @@ -131,7 +131,7 @@ async def test_mode_search(self): self.stormIsInWarn('Storm search interface is not enabled!', msgs) async with self.getTestCore() as core: - await core.loadStormPkg({ + core.loadStormPkg({ 'name': 'testsearch', 'modules': [ {'name': 'testsearch', 'interfaces': ['search'], 'storm': ''' @@ -3119,6 +3119,48 @@ async def test_ast_highlight(self): off, end = errm[1][1]['highlight']['offsets'] self.eq('newp', text[off:end]) + visi = (await core.addUser('visi'))['iden'] + text = '$users=$lib.auth.users.list() $lib.print($users.0.profile)' + msgs = await core.stormlist(text, opts={'user': visi}) + errm = [m for m in 
msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('lib.print($users.0.profile)', text[off:end]) + + text = '$lib.len(foo, bar)' + msgs = await core.stormlist(text) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('lib.len(foo, bar)', text[off:end]) + self.stormIsInErr('$lib.len()', msgs) + + text = '$foo=$lib.pkg.get $foo()' + msgs = await core.stormlist(text) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('foo()', text[off:end]) + self.stormIsInErr('$lib.pkg.get()', msgs) + + text = '$obj = $lib.pipe.gen(${ $obj.put() }) $obj.put(foo, bar, baz)' + msgs = await core.stormlist(text) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('obj.put(foo, bar, baz)', text[off:end]) + self.stormIsInErr('pipe.put()', msgs) + + text = '$lib.gen.campaign(foo, bar, baz)' + msgs = await core.stormlist(text) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('lib.gen.campaign(foo, bar, baz)', text[off:end]) + self.stormIsInErr('$lib.gen.campaign()', msgs) + + text = '$gen = $lib.gen.campaign $gen(foo, bar, baz)' + msgs = await core.stormlist(text) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('gen(foo, bar, baz)', text[off:end]) + self.stormIsInErr('$lib.gen.campaign()', msgs) + async def test_ast_bulkedges(self): async with self.getTestCore() as core: diff --git a/synapse/tests/test_lib_cell.py b/synapse/tests/test_lib_cell.py index d088118b424..a103b592d71 100644 --- a/synapse/tests/test_lib_cell.py +++ b/synapse/tests/test_lib_cell.py @@ -1433,6 +1433,11 @@ def diffdev(dirn): with mock.patch('os.stat', diffdev): await self.asyncraises(s_exc.LowSpace, proxy.runBackup()) + user = await core.auth.getUserByName('root') + with self.raises(s_exc.SynErr) as cm: + await core.iterNewBackupArchive(user) + self.isin('This API must be called via a CellApi', cm.exception.get('mesg')) + async def err(*args, **kwargs): raise RuntimeError('boom') @@ -2298,11 +2303,13 @@ async def test_backup_restore_double_promote_aha(self): # Backup the mirror (core01) which points to the core00 async with await axon00.upload() as upfd: async with core01.getLocalProxy() as prox: + tot_chunks = 0 async for chunk in prox.iterNewBackupArchive(): await upfd.write(chunk) + tot_chunks += len(chunk) size, sha256 = await upfd.save() - await asyncio.sleep(0) + self.eq(size, tot_chunks) furl = f'{url}{s_common.ehex(sha256)}' purl = await aha.addAhaSvcProv('00.mynewcortex') @@ -3287,3 +3294,66 @@ async def test_lib_cell_sadaha(self): # coverage for failure of aha client to connect with self.raises(TimeoutError): self.none(await cell.getAhaProxy(timeout=0.1)) + + async def test_stream_backup_exception(self): + + with self.getTestDir() as dirn: + backdirn = os.path.join(dirn, 'backups') + coredirn = os.path.join(dirn, 'cortex') + + conf = {'backup:dir': backdirn} + s_common.yamlsave(conf, coredirn, 'cell.yaml') + + async with self.getTestCore(dirn=coredirn) as core: + async with core.getLocalProxy() as proxy: + + await proxy.runBackup(name='bkup') + + mock_proc = mock.Mock() + mock_proc.join = mock.Mock() + + async def mock_executor(func, *args, **kwargs): + if isinstance(func, mock.Mock) and func is mock_proc.join: + raise Exception('boom') + return mock_proc + + with mock.patch('synapse.lib.cell.s_coro.executor', mock_executor): + with 
self.getAsyncLoggerStream('synapse.lib.cell', 'Error during backup streaming') as stream: + with self.raises(Exception) as cm: + async for _ in proxy.iterBackupArchive('bkup'): + pass + self.true(await stream.wait(timeout=6)) + + async def test_iter_new_backup_archive(self): + + with self.getTestDir() as dirn: + backdirn = os.path.join(dirn, 'backups') + coredirn = os.path.join(dirn, 'cortex') + + conf = {'backup:dir': backdirn} + s_common.yamlsave(conf, coredirn, 'cell.yaml') + + async with self.getTestCore(dirn=coredirn) as core: + async with core.getLocalProxy() as proxy: + + async def mock_runBackup(*args, **kwargs): + raise Exception('backup failed') + + with mock.patch.object(s_cell.Cell, 'runBackup', mock_runBackup): + with self.getAsyncLoggerStream('synapse.lib.cell', 'Removing') as stream: + with self.raises(s_exc.SynErr) as cm: + async for _ in proxy.iterNewBackupArchive('failedbackup', remove=True): + pass + + self.isin('backup failed', str(cm.exception)) + self.true(await stream.wait(timeout=6)) + + path = os.path.join(backdirn, 'failedbackup') + self.false(os.path.exists(path)) + + self.false(core.backupstreaming) + + core.backupstreaming = True + with self.raises(s_exc.BackupAlreadyRunning): + async for _ in proxy.iterNewBackupArchive('newbackup', remove=True): + pass diff --git a/synapse/tests/test_lib_storm.py b/synapse/tests/test_lib_storm.py index 04ab8bd271a..8629226368d 100644 --- a/synapse/tests/test_lib_storm.py +++ b/synapse/tests/test_lib_storm.py @@ -4,6 +4,7 @@ import datetime import itertools import urllib.parse as u_parse +import unittest.mock as mock import synapse.exc as s_exc import synapse.common as s_common @@ -646,32 +647,6 @@ async def test_lib_storm_basics(self): self.none(task['info'].get('opts')) self.eq(core.view.iden, task['info'].get('view')) - # test the parallel command - nodes = await core.nodes('parallel --size 4 { [ ou:org=* ] }') - self.len(4, nodes) - - # check that subquery validation happens - with self.raises(s_exc.NoSuchVar): - await core.nodes('parallel --size 4 { [ ou:org=$foo ] }') - - # check that an exception on inbound percolates correctly - with self.raises(s_exc.BadTypeValu): - await core.nodes('[ ou:org=* ou:org=foo ] | parallel { [:name=bar] }') - - # check that an exception in the parallel pipeline percolates correctly - with self.raises(s_exc.BadTypeValu): - await core.nodes('parallel { [ou:org=foo] }') - - nodes = await core.nodes('ou:org | parallel {[ :name=foo ]}') - self.true(all([n.get('name') == 'foo' for n in nodes])) - - # Runtsafety test - q = '[ inet:fqdn=www.vertex.link ] $q=:domain | parallel $q' - await self.asyncraises(s_exc.StormRuntimeError, core.nodes(q)) - - nodes = await core.nodes('ou:org | parallel ${ $foo=bar [ :name=$foo ]}') - self.true(all([n.get('name') == 'bar' for n in nodes])) - # test $lib.exit() and the StormExit handlers msgs = [m async for m in core.view.storm('$lib.exit()')] self.eq(msgs[-1][0], 'fini') @@ -789,10 +764,10 @@ async def test_lib_storm_basics(self): }, ) } - await core.loadStormPkg(emptypkg) + core.loadStormPkg(emptypkg) await core.addStormPkg(strverpkg) - await core.loadStormPkg(pkg0) + core.loadStormPkg(pkg0) await core.nodes('$lib.import(foo.baz)', opts=opts) await core.nodes('$lib.import(foo.baz, reqvers="==0.0.1")', opts=opts) @@ -3437,6 +3412,73 @@ async def test_storm_tee(self): q = '[ inet:fqdn=www.vertex.link ] $q=:domain | tee $q' await self.asyncraises(s_exc.StormRuntimeError, core.nodes(q)) + async def test_storm_parallel(self): + + async with self.getTestCore() as 
core: + + nodes = await core.nodes('parallel --size 4 { [ ou:org=* ] }') + self.len(4, nodes) + + # check that subquery validation happens + with self.raises(s_exc.NoSuchVar): + await core.nodes('parallel --size 4 { [ ou:org=$foo ] }') + + # check that an exception on inbound percolates correctly + with self.raises(s_exc.BadTypeValu): + await core.nodes('[ ou:org=(foo,) ou:org=foo ] | parallel { [:name=bar] }') + + with self.raises(s_exc.BadTypeValu): + await core.nodes('[ ou:org=(foo,) ou:org=foo ] | parallel --size 1 { [:name=bar] }') + + # check that an exception in the parallel pipeline percolates correctly + with self.raises(s_exc.BadTypeValu): + await core.nodes('parallel { [ou:org=foo] }') + + nodes = await core.nodes('ou:org | parallel {[ :name=foo ]}') + self.true(all([n.get('name') == 'foo' for n in nodes])) + + # Runtsafety test + q = '[ inet:fqdn=www.vertex.link ] $q=:domain | parallel $q' + await self.asyncraises(s_exc.StormRuntimeError, core.nodes(q)) + + nodes = await core.nodes('ou:org | parallel ${ $foo=bar [ :name=$foo ]}') + self.true(all([n.get('name') == 'bar' for n in nodes])) + + orig = s_storm.ParallelCmd.pipeline + tsks = {'cnt': 0} + + async def pipecnt(self, runt, query, inq, outq): + tsks['cnt'] += 1 + await orig(self, runt, query, inq, outq) + + with mock.patch('synapse.lib.storm.ParallelCmd.pipeline', pipecnt): + + nodes = await core.nodes('ou:org parallel --size 4 {[ :name=bar ]}') + self.len(5, nodes) + self.true(all([n.get('name') == 'bar' for n in nodes])) + self.eq(4, tsks['cnt']) + + tsks['cnt'] = 0 + + nodes = await core.nodes('ou:org parallel --size 5 {[ :name=bar ]}') + self.len(5, nodes) + self.true(all([n.get('name') == 'bar' for n in nodes])) + self.eq(5, tsks['cnt']) + + tsks['cnt'] = 0 + + # --size greater than number of nodes only creates a pipeline for each node + nodes = await core.nodes('ou:org parallel --size 10 {[ :name=foo ]}') + self.len(5, nodes) + self.true(all([n.get('name') == 'foo' for n in nodes])) + self.eq(5, tsks['cnt']) + + tsks['cnt'] = 0 + + nodes = await core.nodes('parallel --size 4 {[ ou:org=* ]}') + self.len(4, nodes) + self.eq(4, tsks['cnt']) + async def test_storm_yieldvalu(self): async with self.getTestCore() as core: @@ -3882,7 +3924,7 @@ async def test_storm_cmd_help(self): )}, ), } - await core.loadStormPkg(pdef) + core.loadStormPkg(pdef) msgs = await core.stormlist('woot --help') helptext = '\n'.join([m[1].get('mesg') for m in msgs if m[0] == 'print']) self.isin('Inputs:\n\n hehe:haha\n hoho:lol - We know whats up', helptext) @@ -4656,7 +4698,7 @@ async def test_storm_tagprune(self): async def test_storm_cmdscope(self): async with self.getTestCore() as core: - await core.loadStormPkg({ + core.loadStormPkg({ 'name': 'testpkg', 'version': '0.0.1', 'commands': ( diff --git a/synapse/tests/test_lib_stormlib_scrape.py b/synapse/tests/test_lib_stormlib_scrape.py index 63ea4077b3a..b41f758dbfd 100644 --- a/synapse/tests/test_lib_stormlib_scrape.py +++ b/synapse/tests/test_lib_stormlib_scrape.py @@ -92,7 +92,7 @@ async def test_storm_lib_scrape_iface(self): self.len(0, mods) self.len(0, core.modsbyiface.get('scrape')) - await core.loadStormPkg(pkgdef) + core.loadStormPkg(pkgdef) mods = await core.getStormIfaces('scrape') self.len(2, mods) @@ -131,7 +131,7 @@ async def test_storm_lib_scrape_iface(self): conf = {'storm:interface:scrape': False, } async with self.getTestCore(conf=conf) as core: - await core.loadStormPkg(pkgdef) + core.loadStormPkg(pkgdef) mods = await core.getStormIfaces('scrape') self.len(2, mods) diff --git 
a/synapse/tests/test_model_inet.py b/synapse/tests/test_model_inet.py index 5709d2ccf58..f565635ec3d 100644 --- a/synapse/tests/test_model_inet.py +++ b/synapse/tests/test_model_inet.py @@ -480,6 +480,7 @@ async def test_flow(self): :raw=((10), (20)) :src:txfiles={[ file:attachment=* :name=foo.exe ]} :dst:txfiles={[ file:attachment=* :name=bar.exe ]} + :capture:host=* )]''' nodes = await core.nodes(q, opts={'vars': {'valu': valu, 'p': props}}) self.len(1, nodes) @@ -523,11 +524,13 @@ async def test_flow(self): self.eq(node.get('src:rdp:hostname'), 'syncoder') self.eq(node.get('src:rdp:keyboard:layout'), 'azerty') self.eq(node.get('raw'), (10, 20)) + self.nn(node.get('capture:host')) self.len(2, await core.nodes('inet:flow -> crypto:x509:cert')) self.len(1, await core.nodes('inet:flow :src:ssh:key -> crypto:key')) self.len(1, await core.nodes('inet:flow :dst:ssh:key -> crypto:key')) self.len(1, await core.nodes('inet:flow :src:txfiles -> file:attachment +:name=foo.exe')) self.len(1, await core.nodes('inet:flow :dst:txfiles -> file:attachment +:name=bar.exe')) + self.len(1, await core.nodes('inet:flow :capture:host -> it:host')) async def test_fqdn(self): formname = 'inet:fqdn' @@ -2769,6 +2772,7 @@ async def test_model_inet_email_message(self): q = ''' [ inet:email:message="*" + :id="Woot-12345 " :to=woot@woot.com :from=visi@vertex.link :replyto=root@root.com @@ -2790,6 +2794,7 @@ async def test_model_inet_email_message(self): nodes = await core.nodes(q, opts={'vars': {'flow': flow}}) self.len(1, nodes) + self.eq(nodes[0].get('id'), 'Woot-12345') self.eq(nodes[0].get('cc'), ('baz@faz.org', 'foo@bar.com')) self.eq(nodes[0].get('received:from:ipv6'), '::1') self.eq(nodes[0].get('received:from:ipv4'), 0x01020304) @@ -2870,6 +2875,7 @@ async def test_model_inet_egress(self): nodes = await core.nodes(''' [ inet:egress=* :host = * + :host:iface = * :client=1.2.3.4 :client:ipv6="::1" ] @@ -2877,10 +2883,14 @@ async def test_model_inet_egress(self): self.len(1, nodes) self.nn(nodes[0].get('host')) + self.nn(nodes[0].get('host:iface')) self.eq(nodes[0].get('client'), 'tcp://1.2.3.4') self.eq(nodes[0].get('client:ipv4'), 0x01020304) self.eq(nodes[0].get('client:ipv6'), '::1') + self.len(1, await core.nodes('inet:egress -> it:host')) + self.len(1, await core.nodes('inet:egress -> inet:iface')) + async def test_model_inet_tls_handshake(self): async with self.getTestCore() as core: @@ -2999,6 +3009,7 @@ async def test_model_inet_service(self): (inet:service:account=(blackout, account, vertex, slack) :id=U7RN51U1J :user=blackout + :url=https://vertex.link/users/blackout :email=blackout@vertex.link :profile={ gen.ps.contact.email vertex.employee blackout@vertex.link } :tenant={[ inet:service:tenant=({"id": "VS-31337"}) ]} @@ -3026,6 +3037,7 @@ async def test_model_inet_service(self): self.eq(accounts[0].ndef, ('inet:service:account', s_common.guid(('blackout', 'account', 'vertex', 'slack')))) self.eq(accounts[0].get('id'), 'U7RN51U1J') self.eq(accounts[0].get('user'), 'blackout') + self.eq(accounts[0].get('url'), 'https://vertex.link/users/blackout') self.eq(accounts[0].get('email'), 'blackout@vertex.link') self.eq(accounts[0].get('profile'), blckprof.ndef[1]) diff --git a/synapse/tests/test_model_risk.py b/synapse/tests/test_model_risk.py index 8dee9732596..3a1d0baf521 100644 --- a/synapse/tests/test_model_risk.py +++ b/synapse/tests/test_model_risk.py @@ -430,6 +430,7 @@ async def addNode(text): :disclosed=20231102 :owner={ gen.ou.org.hq acme } :leaker={ gen.ou.org.hq wikileaks } + :recipient={ 
gen.ou.org.hq everyone } :type=public :goal={[ ou:goal=* :name=publicity ]} :compromise={[ risk:compromise=* :target={ gen.ou.org.hq acme } ]} @@ -458,6 +459,7 @@ async def addNode(text): self.len(1, await core.nodes('risk:leak -> risk:leak:type:taxonomy')) self.len(1, await core.nodes('risk:leak :owner -> ps:contact +:orgname=acme')) self.len(1, await core.nodes('risk:leak :leaker -> ps:contact +:orgname=wikileaks')) + self.len(1, await core.nodes('risk:leak :recipient -> ps:contact +:orgname=everyone')) self.len(1, await core.nodes('risk:leak -> ou:goal +:name=publicity')) self.len(1, await core.nodes('risk:leak -> risk:compromise :target -> ps:contact +:orgname=acme')) self.len(1, await core.nodes('risk:leak :reporter -> ou:org +:name=vertex')) diff --git a/synapse/tests/test_tools_storm.py b/synapse/tests/test_tools_storm.py index 92464d94ae6..a93713a19be 100644 --- a/synapse/tests/test_tools_storm.py +++ b/synapse/tests/test_tools_storm.py @@ -1,4 +1,9 @@ import os +import sys +import signal +import asyncio +import multiprocessing + import synapse.tests.utils as s_test from prompt_toolkit.document import Document @@ -6,10 +11,49 @@ import synapse.exc as s_exc import synapse.common as s_common +import synapse.telepath as s_telepath + +import synapse.lib.coro as s_coro import synapse.lib.output as s_output import synapse.lib.msgpack as s_msgpack import synapse.tools.storm as s_t_storm +def run_cli_till_print(url, evt1): + ''' + Run the stormCLI until we get a print mesg then set the event. + + This is a Process target. + ''' + async def main(): + outp = s_output.OutPutStr() # Capture output instead of sending it to stdout + async with await s_telepath.openurl(url) as proxy: + async with await s_t_storm.StormCli.anit(proxy, outp=outp) as scli: + cmdqueue = asyncio.Queue() + await cmdqueue.put('while (true) { $lib.print(go) $lib.time.sleep(1) }') + await cmdqueue.put('!quit') + + async def fake_prompt(): + return await cmdqueue.get() + + scli.prompt = fake_prompt + + d = {'evt1': False} + async def onmesg(event): + if d.get('evt1'): + return + mesg = event[1].get('mesg') + if mesg[0] != 'print': + return + evt1.set() + d['evt1'] = True + + with scli.onWith('storm:mesg', onmesg): + await scli.addSignalHandlers() + await scli.runCmdLoop() + + asyncio.run(main()) + sys.exit(137) + class StormCliTest(s_test.SynTest): async def test_tools_storm(self): @@ -378,3 +422,54 @@ async def get_completions(text): ), vals ) + + async def test_storm_cmdloop_interrupt(self): + ''' + Test interrupting a long-running query in the command loop + ''' + async with self.getTestCore() as core: + + async with core.getLocalProxy() as proxy: + + outp = s_test.TstOutPut() + async with await s_t_storm.StormCli.anit(proxy, outp=outp) as scli: + + cmdqueue = asyncio.Queue() + await cmdqueue.put('while (true) { $lib.time.sleep(1) }') + await cmdqueue.put('!quit') + + async def fake_prompt(): + return await cmdqueue.get() + scli.prompt = fake_prompt + + cmdloop_task = asyncio.create_task(scli.runCmdLoop()) + await asyncio.sleep(0.1) + + if scli.cmdtask is not None: + scli.cmdtask.cancel() + + await cmdloop_task + + outp.expect('') + outp.expect('o/') + self.true(scli.isfini) + + async def test_storm_cmdloop_sigint(self): + ''' + Test interrupting a long-running query in the command loop with a process target and SIGINT. 
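+
+        The child process only reaches sys.exit(137) if the CLI survives the
+        SIGINT, cancels the running query, and then processes the queued !quit
+        command, so the exitcode assertion below proves the handler cancelled
+        the query rather than killing the process.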
+ ''' + + async with self.getTestCore() as core: + url = core.getLocalUrl() + + ctx = multiprocessing.get_context('spawn') + + evt1 = ctx.Event() + + proc = ctx.Process(target=run_cli_till_print, args=(url, evt1,)) + proc.start() + + self.true(await s_coro.executor(evt1.wait, timeout=30)) + os.kill(proc.pid, signal.SIGINT) + proc.join(timeout=30) + self.eq(proc.exitcode, 137) diff --git a/synapse/tests/test_utils_getrefs.py b/synapse/tests/test_utils_getrefs.py index 6bb1c0ec6bd..246467bbea9 100644 --- a/synapse/tests/test_utils_getrefs.py +++ b/synapse/tests/test_utils_getrefs.py @@ -24,7 +24,7 @@ def getVcr(self): cm = myvcr.use_cassette(fp) return cm - async def test_basics(self): + def test_basics(self): args = s_getrefs.parse_args([ s_data.path('attack-flow', 'attack-flow-schema-2.0.0.json') diff --git a/synapse/utils/getrefs.py b/synapse/utils/getrefs.py index 44619cb7457..a5ea5a5742b 100644 --- a/synapse/utils/getrefs.py +++ b/synapse/utils/getrefs.py @@ -1,11 +1,12 @@ import sys import json import urllib +import asyncio import logging import pathlib import argparse -import requests +import aiohttp import synapse.exc as s_exc import synapse.data as s_data @@ -20,7 +21,13 @@ def download_refs_handler(uri): This function downloads the JSON schema at the given URI, parses the given URI to get the path component, and then saves the referenced schema to the 'jsonschemas' directory of synapse.data. + + This function runs its own asyncio loop for each URI being requested. ''' + ret = asyncio.run(_download_refs_handler(uri)) + return ret + +async def _download_refs_handler(uri): try: parts = urllib.parse.urlparse(uri) @@ -45,8 +52,12 @@ def download_refs_handler(uri): # Get the data from the interwebs logger.info(f'Downloading schema from {uri}.') - resp = requests.get(uri) - data = resp.json() + async with aiohttp.ClientSession() as session: + async with session.get(uri) as resp: + resp.raise_for_status() + buf = await resp.read() + + data = json.loads(buf.decode()) # Save the json schema to disk with filepath.open('w') as fp: diff --git a/synapse/vendor/cpython/lib/http/__init__.py b/synapse/vendor/cpython/lib/http/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/synapse/vendor/cpython/lib/http/cookies.py b/synapse/vendor/cpython/lib/http/cookies.py new file mode 100644 index 00000000000..93243453dc4 --- /dev/null +++ b/synapse/vendor/cpython/lib/http/cookies.py @@ -0,0 +1,59 @@ +############################################################################## +# Taken from the cpython 3.11 source branch after the 3.11.10 release. +############################################################################## +#### +# Copyright 2000 by Timothy O'Malley +# +# All Rights Reserved +# +# Permission to use, copy, modify, and distribute this software +# and its documentation for any purpose and without fee is hereby +# granted, provided that the above copyright notice appear in all +# copies and that both that copyright notice and this permission +# notice appear in supporting documentation, and that the name of +# Timothy O'Malley not be used in advertising or publicity +# pertaining to distribution of the software without specific, written +# prior permission. 
+# +# Timothy O'Malley DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS +# SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY +# AND FITNESS, IN NO EVENT SHALL Timothy O'Malley BE LIABLE FOR +# ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES +# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, +# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS +# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR +# PERFORMANCE OF THIS SOFTWARE. +# + +# +# Import our required modules +# +import re + +_unquote_sub = re.compile(r'\\(?:([0-3][0-7][0-7])|(.))').sub + +def _unquote_replace(m): + if m[1]: + return chr(int(m[1], 8)) + else: + return m[2] + +def _unquote(str): + # If there aren't any doublequotes, + # then there can't be any special characters. See RFC 2109. + if str is None or len(str) < 2: + return str + if str[0] != '"' or str[-1] != '"': + return str + + # We have to assume that we must decode this string. + # Down to work. + + # Remove the "s + str = str[1:-1] + + # Check for special sequences. Examples: + # \012 --> \n + # \" --> " + # + return _unquote_sub(_unquote_replace, str) diff --git a/synapse/vendor/cpython/lib/test/test_http_cookies.py b/synapse/vendor/cpython/lib/test/test_http_cookies.py new file mode 100644 index 00000000000..02d746f2c0e --- /dev/null +++ b/synapse/vendor/cpython/lib/test/test_http_cookies.py @@ -0,0 +1,49 @@ +############################################################################## +# Taken from the cpython 3.11 source branch after the 3.11.10 release. +# It has been modified for vendored imports and vendored test harness. +############################################################################## + +# Simple test suite for http/cookies.py + +from http import cookies + +# s_v_utils runs the monkeypatch +import synapse.vendor.utils as s_v_utils + +class CookieTests(s_v_utils.VendorTest): + + def test_unquote(self): + cases = [ + (r'a="b=\""', 'b="'), + (r'a="b=\\"', 'b=\\'), + (r'a="b=\="', 'b=='), + (r'a="b=\n"', 'b=n'), + (r'a="b=\042"', 'b="'), + (r'a="b=\134"', 'b=\\'), + (r'a="b=\377"', 'b=\xff'), + (r'a="b=\400"', 'b=400'), + (r'a="b=\42"', 'b=42'), + (r'a="b=\\042"', 'b=\\042'), + (r'a="b=\\134"', 'b=\\134'), + (r'a="b=\\\""', 'b=\\"'), + (r'a="b=\\\042"', 'b=\\"'), + (r'a="b=\134\""', 'b=\\"'), + (r'a="b=\134\042"', 'b=\\"'), + ] + for encoded, decoded in cases: + with self.subTest(encoded): + C = cookies.SimpleCookie() + C.load(encoded) + self.assertEqual(C['a'].value, decoded) + + def test_unquote_large(self): + n = 10**6 + for encoded in r'\\', r'\134': + with self.subTest(encoded): + data = 'a="b=' + encoded * n + ';"' + C = cookies.SimpleCookie() + C.load(data) + value = C['a'].value + self.assertEqual(value[:3], 'b=\\') + self.assertEqual(value[-2:], '\\;') + self.assertEqual(len(value), n + 3)
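
A minimal standalone sketch (editor's illustration, not part of the diff) of the
escape handling provided by the vendored ``_unquote`` that ``_patch_http_cookies``
installs. The import path is the module added above; the example values mirror
the vendored test cases.

    import synapse.vendor.cpython.lib.http.cookies as v_cookies

    # Octal escapes are decoded: \042 is '"' (0o42 == 34).
    assert v_cookies._unquote(r'"b=\042"') == 'b="'

    # A backslash escape yields the escaped character itself.
    assert v_cookies._unquote(r'"b=\\"') == 'b=\\'

    # Values without surrounding doublequotes pass through untouched.
    assert v_cookies._unquote('plain') == 'plain'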