diff --git a/CHANGELOG.rst b/CHANGELOG.rst
index a75b3387ed0..e324c23b23e 100644
--- a/CHANGELOG.rst
+++ b/CHANGELOG.rst
@@ -6,6 +6,45 @@ Synapse Changelog
 *****************
 
+v2.192.0 - 2024-12-13
+=====================
+
+Features and Enhancements
+-------------------------
+- Added the user-agent string to the structured log information captured by the
+  HTTP API handlers.
+  (`#4026 <https://github.com/vertexproject/synapse/pull/4026>`_)
+- Added support for passing ``$lib.true`` to Storm HTTP APIs that accept a
+  proxy argument to indicate that the configured proxy should be used if set.
+  (`#4030 <https://github.com/vertexproject/synapse/pull/4030>`_)
+- Added support for passing ``True`` as a proxy argument to the ``wget``,
+  ``wput``, and ``postfiles`` Axon APIs to indicate that the configured proxy
+  should be used if set.
+  (`#4030 <https://github.com/vertexproject/synapse/pull/4030>`_)
+- Added ``synapse.tools.apikey`` tool for managing user API keys via telepath.
+  (`#4032 <https://github.com/vertexproject/synapse/pull/4032>`_)
+
+Bugfixes
+--------
+- Fixed an issue where mirrors of Synapse services may fail to indicate that
+  they have entered into realtime change windows.
+  (`#4028 <https://github.com/vertexproject/synapse/pull/4028>`_)
+- Fixed a bug that skipped global and form pivots when validating graph
+  projection Storm queries.
+  (`#4031 <https://github.com/vertexproject/synapse/pull/4031>`_)
+- Fixed an issue where line number information was not added to exceptions
+  raised while dereferencing a Storm variable.
+  (`#4035 <https://github.com/vertexproject/synapse/pull/4035>`_)
+
+Deprecations
+------------
+- Deprecated passing ``None`` as a proxy argument to the ``wget``, ``wput``,
+  and ``postfiles`` Axon APIs.
+  (`#4030 <https://github.com/vertexproject/synapse/pull/4030>`_)
+- Deprecated passing ``$lib.null`` to Storm HTTP APIs that accept a proxy
+  argument.
+  (`#4030 <https://github.com/vertexproject/synapse/pull/4030>`_)
+
 v2.191.0 - 2024-12-06
 =====================
 
diff --git a/pyproject.toml b/pyproject.toml
index 916fc19d82b..af8488b3674 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -36,7 +36,7 @@ dependencies = [
     'aiohttp-socks>=0.9.0,<0.10.0',
     'aioimaplib>=1.1.0,<1.2.0',
     'aiosmtplib>=3.0.0,<3.1.0',
-    'prompt-toolkit>=3.0.4,<3.1.0',
+    'prompt_toolkit>=3.0.29,<3.1.0',
     'lark==1.2.2',
     'Pygments>=2.7.4,<2.18.0',
     'packaging>=20.0,<25.0',
diff --git a/requirements.txt b/requirements.txt
index 45b4f71eba4..bd52fb24e1f 100644
--- a/requirements.txt
+++ b/requirements.txt
@@ -9,7 +9,7 @@ aiohttp>=3.10.0,<4.0
 aiohttp-socks>=0.9.0,<0.10.0
 aioimaplib>=1.1.0,<1.2.0
 aiosmtplib>=3.0.0,<3.1.0
-prompt-toolkit>=3.0.4,<3.1.0
+prompt_toolkit>=3.0.29,<3.1.0
 lark==1.2.2
 Pygments>=2.7.4,<2.18.0
 fastjsonschema>=2.18.0,<2.20.0
diff --git a/synapse/common.py b/synapse/common.py
index 4b4dff4e3c8..488087ab346 100644
--- a/synapse/common.py
+++ b/synapse/common.py
@@ -29,6 +29,8 @@ import contextlib
 import collections
 
+import http.cookies
+
 import yaml
 import regex
 
@@ -38,6 +40,8 @@ import synapse.lib.structlog as s_structlog
 import synapse.vendor.cpython.lib.ipaddress as ipaddress
 
+import synapse.vendor.cpython.lib.http.cookies as v_cookies
+
 try:
     from yaml import CSafeLoader as Loader
 
@@ -1195,6 +1199,17 @@ def trimText(text: str, n: int = 256, placeholder: str = '...') -> str:
     assert n > plen
     return f'{text[:mlen]}{placeholder}'
 
+def _patch_http_cookies():
+    '''
+    Patch stdlib http.cookies._unquote from the 3.11.10 implementation if
+    the interpreter we are using is not patched for CVE-2024-7592.
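+
+    The guard below uses feature detection rather than a version check: the
+    patched CPython implementation no longer defines ``http.cookies._QuotePatt``,
+    so its absence means this interpreter already carries the fix and nothing
+    is done; otherwise the vendored ``_unquote`` is swapped in.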
+ ''' + if not hasattr(http.cookies, '_QuotePatt'): + return + http.cookies._unquote = v_cookies._unquote + +_patch_http_cookies() + # TODO: Switch back to using asyncio.wait_for when we are using py 3.12+ # This is a workaround for a race where asyncio.wait_for can end up # ignoring cancellation https://github.com/python/cpython/issues/86296 diff --git a/synapse/cortex.py b/synapse/cortex.py index b98b9513626..0fec12f0963 100644 --- a/synapse/cortex.py +++ b/synapse/cortex.py @@ -1496,7 +1496,7 @@ async def reqValidStormGraph(self, gdef): for filt in gdef.get('filters', ()): await self.getStormQuery(filt) - for pivo in gdef.get('filters', ()): + for pivo in gdef.get('pivots', ()): await self.getStormQuery(pivo) for form, rule in gdef.get('forms', {}).items(): @@ -1506,7 +1506,7 @@ async def reqValidStormGraph(self, gdef): for filt in rule.get('filters', ()): await self.getStormQuery(filt) - for pivo in rule.get('filters', ()): + for pivo in rule.get('pivots', ()): await self.getStormQuery(pivo) async def addStormGraph(self, gdef, user=None): @@ -2049,7 +2049,7 @@ async def _onSetStormCmd(self, cdef): ''' name = cdef.get('name') - await self._setStormCmd(cdef) + self._setStormCmd(cdef) self.cmddefs.set(name, cdef) async def _reqStormCmd(self, cdef): @@ -2060,7 +2060,7 @@ async def _reqStormCmd(self, cdef): await self.getStormQuery(cdef.get('storm')) - async def _setStormCmd(self, cdef): + def _setStormCmd(self, cdef): ''' Note: No change control or persistence @@ -2115,13 +2115,9 @@ def getRuntPode(): name = cdef.get('name') self.stormcmds[name] = ctor - await self.fire('core:cmd:change', cmd=name, act='add') - - async def _popStormCmd(self, name): + def _popStormCmd(self, name): self.stormcmds.pop(name, None) - await self.fire('core:cmd:change', cmd=name, act='del') - async def delStormCmd(self, name): ''' Remove a previously set pure storm command. @@ -2147,8 +2143,6 @@ async def _delStormCmd(self, name): self.cmddefs.pop(name) self.stormcmds.pop(name, None) - await self.fire('core:cmd:change', cmd=name, act='del') - async def addStormPkg(self, pkgdef, verify=False): ''' Add the given storm package to the cortex. @@ -2202,11 +2196,11 @@ async def _addStormPkg(self, pkgdef): olddef = self.pkgdefs.get(name, None) if olddef is not None: if s_hashitem.hashitem(pkgdef) != s_hashitem.hashitem(olddef): - await self._dropStormPkg(olddef) + self._dropStormPkg(olddef) else: return - await self.loadStormPkg(pkgdef) + self.loadStormPkg(pkgdef) self.pkgdefs.set(name, pkgdef) self._clearPermDefs() @@ -2236,7 +2230,7 @@ async def _delStormPkg(self, name): if pkgdef is None: return - await self._dropStormPkg(pkgdef) + self._dropStormPkg(pkgdef) self._clearPermDefs() @@ -2285,7 +2279,7 @@ def getDataModel(self): async def _tryLoadStormPkg(self, pkgdef): try: await self._normStormPkg(pkgdef, validstorm=False) - await self.loadStormPkg(pkgdef) + self.loadStormPkg(pkgdef) except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only raise @@ -2444,7 +2438,9 @@ async def _normStormPkg(self, pkgdef, validstorm=True): for configvar in pkgdef.get('configvars', ()): self._reqStormPkgVarType(pkgname, configvar.get('type')) - async def loadStormPkg(self, pkgdef): + # N.B. This function is intentionally not async in order to prevent possible user race conditions for code + # executing outside of the nexus lock. + def loadStormPkg(self, pkgdef): ''' Load a storm package into the storm library for this cortex. 
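
Why the sync-vs-async distinction in the N.B. comments matters: an ``async def``
body can be interleaved with other tasks at every ``await``, so a loader with
internal awaits could expose half-loaded package state to concurrently running
code, while a plain function runs to completion on the event loop before any
other coroutine is scheduled. A minimal sketch of the hazard (hypothetical
names, not Synapse APIs):

```python
import asyncio

registry = {}

async def load_async(name):
    registry[name] = 'loading'
    await asyncio.sleep(0)       # suspension point: other tasks can run here
    registry[name] = 'ready'     # ...and observe the half-loaded state above

def load_sync(name):
    # No awaits: the event loop cannot switch tasks mid-update.
    registry[name] = 'ready'

async def main():
    task = asyncio.create_task(load_async('pkg'))
    await asyncio.sleep(0)           # let the loader start
    print(registry.get('pkg'))       # may print 'loading'
    await task

    load_sync('pkg2')
    print(registry.get('pkg2'))      # always 'ready'

asyncio.run(main())
```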
@@ -2474,7 +2470,7 @@ async def loadStormPkg(self, pkgdef): self.stormmods = stormmods for cdef in cmds: - await self._setStormCmd(cdef) + self._setStormCmd(cdef) for gdef in pkgdef.get('graphs', ()): gdef = copy.deepcopy(gdef) @@ -2500,7 +2496,9 @@ async def _onload(): await self.fire('core:pkg:onload:complete', pkg=name) self.schedCoro(_onload()) - async def _dropStormPkg(self, pkgdef): + # N.B. This function is intentionally not async in order to prevent possible user race conditions for code + # executing outside of the nexus lock. + def _dropStormPkg(self, pkgdef): ''' Reverse the process of loadStormPkg() ''' @@ -2511,7 +2509,7 @@ async def _dropStormPkg(self, pkgdef): for cdef in pkgdef.get('commands', ()): name = cdef.get('name') - await self._popStormCmd(name) + self._popStormCmd(name) pkgname = pkgdef.get('name') @@ -3939,7 +3937,7 @@ async def _initPureStormCmds(self): async def _trySetStormCmd(self, name, cdef): try: - await self._setStormCmd(cdef) + self._setStormCmd(cdef) except (asyncio.CancelledError, Exception): logger.exception(f'Storm command load failed: {name}') diff --git a/synapse/exc.py b/synapse/exc.py index 07520838414..5942cadfc15 100644 --- a/synapse/exc.py +++ b/synapse/exc.py @@ -56,9 +56,14 @@ def setdefault(self, name, valu): self.errinfo[name] = valu self._setExcMesg() + def update(self, items: dict): + '''Update multiple items in the errinfo dict at once.''' + self.errinfo.update(**items) + self._setExcMesg() + class StormRaise(SynErr): ''' - This represents a user provided exception inside of a Storm runtime. It requires a errname key. + This represents a user provided exception raised in the Storm runtime. It requires a errname key. ''' def __init__(self, *args, **info): SynErr.__init__(self, *args, **info) diff --git a/synapse/lib/agenda.py b/synapse/lib/agenda.py index b2a872c3190..e27ae10fea7 100644 --- a/synapse/lib/agenda.py +++ b/synapse/lib/agenda.py @@ -5,9 +5,7 @@ import logging import calendar import datetime -import functools import itertools -import collections from datetime import timezone as tz from collections.abc import Iterable, Mapping diff --git a/synapse/lib/ast.py b/synapse/lib/ast.py index 4cf49ab7e36..eda5748249b 100644 --- a/synapse/lib/ast.py +++ b/synapse/lib/ast.py @@ -59,7 +59,8 @@ def getPosInfo(self): } def addExcInfo(self, exc): - exc.errinfo['highlight'] = self.getPosInfo() + if 'highlight' not in exc.errinfo: + exc.set('highlight', self.getPosInfo()) return exc def repr(self): @@ -3556,7 +3557,10 @@ async def compute(self, runt, path): valu = s_stormtypes.fromprim(base, path=path) with s_scope.enter({'runt': runt}): - return await valu.deref(name) + try: + return await valu.deref(name) + except s_exc.SynErr as e: + raise self.kids[1].addExcInfo(e) class FuncCall(Value): @@ -3577,10 +3581,23 @@ async def compute(self, runt, path): kwargs = {k: v for (k, v) in await self.kids[2].compute(runt, path)} with s_scope.enter({'runt': runt}): - retn = func(*argv, **kwargs) - if s_coro.iscoro(retn): - return await retn - return retn + try: + retn = func(*argv, **kwargs) + if s_coro.iscoro(retn): + return await retn + return retn + + except TypeError as e: + mesg = str(e) + if (funcpath := getattr(func, '_storm_funcpath', None)) is not None: + mesg = f"{funcpath}(){mesg.split(')', 1)[1]}" + + raise self.addExcInfo(s_exc.StormRuntimeError(mesg=mesg)) + + except s_exc.SynErr as e: + if getattr(func, '_storm_runtime_lib_func', None) is not None: + e.errinfo.pop('highlight', None) + raise self.addExcInfo(e) class DollarExpr(Value): ''' 
@@ -5035,8 +5052,9 @@ async def once(): @s_stormtypes.stormfunc(readonly=True) async def realfunc(*args, **kwargs): - return await self.callfunc(runt, argdefs, args, kwargs) + return await self.callfunc(runt, argdefs, args, kwargs, realfunc._storm_funcpath) + realfunc._storm_funcpath = self.name await runt.setVar(self.name, realfunc) count = 0 @@ -5058,7 +5076,7 @@ def validate(self, runt): # var scope validation occurs in the sub-runtime pass - async def callfunc(self, runt, argdefs, args, kwargs): + async def callfunc(self, runt, argdefs, args, kwargs, funcpath): ''' Execute a function call using the given runtime. @@ -5069,7 +5087,7 @@ async def callfunc(self, runt, argdefs, args, kwargs): argcount = len(args) + len(kwargs) if argcount > len(argdefs): - mesg = f'{self.name}() takes {len(argdefs)} arguments but {argcount} were provided' + mesg = f'{funcpath}() takes {len(argdefs)} arguments but {argcount} were provided' raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg)) # Fill in the positional arguments @@ -5083,7 +5101,7 @@ async def callfunc(self, runt, argdefs, args, kwargs): valu = kwargs.pop(name, s_common.novalu) if valu is s_common.novalu: if defv is s_common.novalu: - mesg = f'{self.name}() missing required argument {name}' + mesg = f'{funcpath}() missing required argument {name}' raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg)) valu = defv @@ -5094,11 +5112,11 @@ async def callfunc(self, runt, argdefs, args, kwargs): # used a kwarg not defined. kwkeys = list(kwargs.keys()) if kwkeys[0] in posnames: - mesg = f'{self.name}() got multiple values for parameter {kwkeys[0]}' + mesg = f'{funcpath}() got multiple values for parameter {kwkeys[0]}' raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg)) plural = 's' if len(kwargs) > 1 else '' - mesg = f'{self.name}() got unexpected keyword argument{plural}: {",".join(kwkeys)}' + mesg = f'{funcpath}() got unexpected keyword argument{plural}: {",".join(kwkeys)}' raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg)) assert len(mergargs) == len(argdefs) diff --git a/synapse/lib/cell.py b/synapse/lib/cell.py index 24bd0ec999b..7f8c0983d84 100644 --- a/synapse/lib/cell.py +++ b/synapse/lib/cell.py @@ -458,6 +458,43 @@ async def addRole(self, name, iden=None): async def delRole(self, iden): return await self.cell.delRole(iden) + async def addUserApiKey(self, name, duration=None, useriden=None): + if useriden is None: + useriden = self.user.iden + + if self.user.iden == useriden: + self.user.confirm(('auth', 'self', 'set', 'apikey'), default=True) + else: + self.user.confirm(('auth', 'user', 'set', 'apikey')) + + return await self.cell.addUserApiKey(useriden, name, duration=duration) + + async def listUserApiKeys(self, useriden=None): + if useriden is None: + useriden = self.user.iden + + if self.user.iden == useriden: + self.user.confirm(('auth', 'self', 'set', 'apikey'), default=True) + else: + self.user.confirm(('auth', 'user', 'set', 'apikey')) + + return await self.cell.listUserApiKeys(useriden) + + async def delUserApiKey(self, iden): + apikey = await self.cell.getUserApiKey(iden) + if apikey is None: + mesg = f'User API key with {iden=} does not exist.' 
+ raise s_exc.NoSuchIden(mesg=mesg, iden=iden) + + useriden = apikey.get('user') + + if self.user.iden == useriden: + self.user.confirm(('auth', 'self', 'set', 'apikey'), default=True) + else: + self.user.confirm(('auth', 'user', 'set', 'apikey')) + + return await self.cell.delUserApiKey(iden) + @adminapi() async def dyncall(self, iden, todo, gatekeys=()): return await self.cell.dyncall(iden, todo, gatekeys=gatekeys) @@ -2381,18 +2418,12 @@ def walkpath(path): walkpath(self.backdirn) return backups - async def iterBackupArchive(self, name, user): - - success = False - loglevel = logging.WARNING - - path = self._reqBackDirn(name) - cellguid = os.path.join(path, 'cell.guid') - if not os.path.isfile(cellguid): - mesg = 'Specified backup path has no cell.guid file.' - raise s_exc.BadArg(mesg=mesg, arg='path', valu=path) - + async def _streamBackupArchive(self, path, user, name): link = s_scope.get('link') + if link is None: + mesg = 'Link not found in scope. This API must be called via a CellApi.' + raise s_exc.SynErr(mesg=mesg) + linkinfo = await link.getSpawnInfo() linkinfo['logconf'] = await self._getSpawnLogConf() @@ -2400,42 +2431,42 @@ async def iterBackupArchive(self, name, user): ctx = multiprocessing.get_context('spawn') - proc = None - mesg = 'Streaming complete' - def getproc(): proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo)) proc.start() return proc + mesg = 'Streaming complete' + proc = await s_coro.executor(getproc) + cancelled = False try: - proc = await s_coro.executor(getproc) - await s_coro.executor(proc.join) + self.backlastuploaddt = datetime.datetime.now() + logger.debug(f'Backup streaming completed successfully for {name}') - except (asyncio.CancelledError, Exception) as e: + except asyncio.CancelledError: + logger.warning('Backup streaming was cancelled.') + cancelled = True + raise - # We want to log all exceptions here, an asyncio.CancelledError - # could be the result of a remote link terminating due to the - # backup stream being completed, prior to this function - # finishing. + except Exception as e: logger.exception('Error during backup streaming.') - - if proc: - proc.terminate() - mesg = repr(e) raise - else: - success = True - loglevel = logging.DEBUG - self.backlastuploaddt = datetime.datetime.now() - finally: - phrase = 'successfully' if success else 'with failure' - logger.log(loglevel, f'iterBackupArchive completed {phrase} for {name}') - raise s_exc.DmonSpawn(mesg=mesg) + proc.terminate() + + if not cancelled: + raise s_exc.DmonSpawn(mesg=mesg) + + async def iterBackupArchive(self, name, user): + path = self._reqBackDirn(name) + cellguid = os.path.join(path, 'cell.guid') + if not os.path.isfile(cellguid): + mesg = 'Specified backup path has no cell.guid file.' 
+ raise s_exc.BadArg(mesg=mesg, arg='path', valu=path) + await self._streamBackupArchive(path, user, name) async def iterNewBackupArchive(self, user, name=None, remove=False): @@ -2446,9 +2477,6 @@ async def iterNewBackupArchive(self, user, name=None, remove=False): if remove: self.backupstreaming = True - success = False - loglevel = logging.WARNING - if name is None: name = time.strftime('%Y%m%d%H%M%S', datetime.datetime.now().timetuple()) @@ -2457,10 +2485,6 @@ async def iterNewBackupArchive(self, user, name=None, remove=False): mesg = 'Backup with name already exists' raise s_exc.BadArg(mesg=mesg) - link = s_scope.get('link') - linkinfo = await link.getSpawnInfo() - linkinfo['logconf'] = await self._getSpawnLogConf() - try: await self.runBackup(name) except Exception: @@ -2470,54 +2494,13 @@ async def iterNewBackupArchive(self, user, name=None, remove=False): logger.debug(f'Removed {path}') raise - await self.boss.promote('backup:stream', user=user, info={'name': name}) - - ctx = multiprocessing.get_context('spawn') - - proc = None - mesg = 'Streaming complete' - - def getproc(): - proc = ctx.Process(target=_iterBackupProc, args=(path, linkinfo)) - proc.start() - return proc - - try: - proc = await s_coro.executor(getproc) - - await s_coro.executor(proc.join) - - except (asyncio.CancelledError, Exception) as e: - - # We want to log all exceptions here, an asyncio.CancelledError - # could be the result of a remote link terminating due to the - # backup stream being completed, prior to this function - # finishing. - logger.exception('Error during backup streaming.') - - if proc: - proc.terminate() - - mesg = repr(e) - raise - - else: - success = True - loglevel = logging.DEBUG - self.backlastuploaddt = datetime.datetime.now() - - finally: - if remove: - logger.debug(f'Removing {path}') - await s_coro.executor(shutil.rmtree, path, ignore_errors=True) - logger.debug(f'Removed {path}') - - phrase = 'successfully' if success else 'with failure' - logger.log(loglevel, f'iterNewBackupArchive completed {phrase} for {name}') - raise s_exc.DmonSpawn(mesg=mesg) + await self._streamBackupArchive(path, user, name) finally: if remove: + logger.debug(f'Removing {path}') + await s_coro.executor(shutil.rmtree, path, ignore_errors=True) + logger.debug(f'Removed {path}') self.backupstreaming = False async def isUserAllowed(self, iden, perm, gateiden=None, default=False): diff --git a/synapse/lib/cli.py b/synapse/lib/cli.py index c92f0388de0..e35c87302cb 100644 --- a/synapse/lib/cli.py +++ b/synapse/lib/cli.py @@ -281,18 +281,26 @@ async def _onItemFini(self): await self.fini() - async def addSignalHandlers(self): + async def addSignalHandlers(self): # pragma: no cover ''' Register SIGINT signal handler with the ioloop to cancel the currently running cmdloop task. + Removes the handler when the cli is fini'd. ''' - def sigint(): - self.printf('') if self.cmdtask is not None: self.cmdtask.cancel() self.loop.add_signal_handler(signal.SIGINT, sigint) + def onfini(): + # N.B. This is reaches into some loop / handle internals but + # prevents us from removing a handler that overwrote our own. 
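+            # Concretely: fetch the Handle registered for SIGINT and compare
+            # its wrapped callback to our sigint() closure by identity; only
+            # remove the handler when it is still ours.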
+ hndl = self.loop._signal_handlers.get(signal.SIGINT, None) # type: asyncio.Handle + if hndl is not None and hndl._callback is sigint: + self.loop.remove_signal_handler(signal.SIGINT) + + self.onfini(onfini) + def get(self, name, defval=None): return self.locs.get(name, defval) @@ -324,8 +332,12 @@ async def prompt(self, text=None): if text is None: text = self.cmdprompt - with patch_stdout(): - retn = await self.sess.prompt_async(text, vi_mode=self.vi_mode, enable_open_in_editor=True) + with patch_stdout(): # pragma: no cover + retn = await self.sess.prompt_async(text, + vi_mode=self.vi_mode, + enable_open_in_editor=True, + handle_sigint=False # We handle sigint in the loop + ) return retn def printf(self, mesg, addnl=True, color=None): @@ -390,7 +402,7 @@ async def runCmdLoop(self): self.cmdtask = self.schedCoro(coro) await self.cmdtask - except KeyboardInterrupt: + except (KeyboardInterrupt, asyncio.CancelledError): if self.isfini: return @@ -408,11 +420,8 @@ async def runCmdLoop(self): if self.cmdtask is not None: self.cmdtask.cancel() try: - self.cmdtask.result() - except asyncio.CancelledError: - # Wait a beat to let any remaining nodes to print out before we print the prompt - await asyncio.sleep(1) - except Exception: + await asyncio.wait_for(self.cmdtask, timeout=0.1) + except (asyncio.CancelledError, asyncio.TimeoutError): pass async def runCmdLine(self, line): diff --git a/synapse/lib/editor.py b/synapse/lib/editor.py index 272c012f13b..d06430eb579 100644 --- a/synapse/lib/editor.py +++ b/synapse/lib/editor.py @@ -421,7 +421,7 @@ async def addTag(self, tag, valu=(None, None), tagnode=None): try: valu, _ = self.model.type('ival').norm(valu) except s_exc.BadTypeValu as e: - e.errinfo['tag'] = tagnode.valu + e.set('tag', tagnode.valu) raise e tagup = tagnode.get('up') @@ -678,9 +678,9 @@ async def _set(self, prop, valu, norminfo=None, ignore_ro=False): valu, norminfo = prop.type.norm(valu) except s_exc.BadTypeValu as e: oldm = e.errinfo.get('mesg') - e.errinfo['prop'] = prop.name - e.errinfo['form'] = prop.form.name - e.errinfo['mesg'] = f'Bad prop value {prop.full}={valu!r} : {oldm}' + e.update({'prop': prop.name, + 'form': prop.form.name, + 'mesg': f'Bad prop value {prop.full}={valu!r} : {oldm}'}) raise e if isinstance(prop.type, s_types.Ndef): @@ -851,7 +851,7 @@ async def _addNode(self, form, valu, norminfo=None): try: valu, norminfo = form.type.norm(valu) except s_exc.BadTypeValu as e: - e.errinfo['form'] = form.name + e.set('form', form.name) raise e return valu, norminfo diff --git a/synapse/lib/parser.py b/synapse/lib/parser.py index 54db8b5dc0c..b61c23195be 100644 --- a/synapse/lib/parser.py +++ b/synapse/lib/parser.py @@ -503,7 +503,7 @@ def _larkToSynExc(self, e): origexc = e.orig_exc if not isinstance(origexc, s_exc.SynErr): raise e.orig_exc # pragma: no cover - origexc.errinfo['text'] = self.text + origexc.set('text', self.text) return s_exc.BadSyntax(**origexc.errinfo) elif isinstance(e, lark.exceptions.UnexpectedCharacters): # pragma: no cover diff --git a/synapse/lib/storm.py b/synapse/lib/storm.py index c1ba72feab2..ab312652606 100644 --- a/synapse/lib/storm.py +++ b/synapse/lib/storm.py @@ -5837,6 +5837,12 @@ class ParallelCmd(Cmd): inet:ipv4#foo | parallel { $place = $lib.import(foobar).lookup(:latlong) [ :place=$place ] } NOTE: Storm variables set within the parallel query pipelines do not interact. + + NOTE: If there are inbound nodes to the parallel command, parallel pipelines will be created as each node + is processed, up to the number specified by --size. 
If the number of nodes in the pipeline is less + than the value specified by --size, additional pipelines with no inbound node will not be created. + If there are no inbound nodes to the parallel command, the number of pipelines specified by --size + will always be created. ''' name = 'parallel' readonly = True @@ -5893,19 +5899,33 @@ async def execStormCmd(self, runt, genr): inq = asyncio.Queue(maxsize=size) outq = asyncio.Queue(maxsize=size) - async def pump(): - try: - async for pumpitem in genr: - await inq.put(pumpitem) - [await inq.put(None) for i in range(size)] - except asyncio.CancelledError: # pragma: no cover - raise - except Exception as e: - await outq.put(e) - - base.schedCoro(pump()) - for i in range(size): - base.schedCoro(self.pipeline(runt, query, inq, outq)) + tsks = 0 + try: + while tsks < size: + await inq.put(await genr.__anext__()) + base.schedCoro(self.pipeline(runt, query, inq, outq)) + tsks += 1 + except StopAsyncIteration: + [await inq.put(None) for i in range(tsks)] + + # If a full set of tasks were created, keep pumping nodes into the queue + if tsks == size: + async def pump(): + try: + async for pumpitem in genr: + await inq.put(pumpitem) + [await inq.put(None) for i in range(size)] + except Exception as e: + await outq.put(e) + + base.schedCoro(pump()) + + # If no tasks were created, make a full set + elif tsks == 0: + tsks = size + for i in range(size): + base.schedCoro(self.pipeline(runt, query, inq, outq)) + [await inq.put(None) for i in range(tsks)] exited = 0 while True: @@ -5916,7 +5936,7 @@ async def pump(): if item is None: exited += 1 - if exited == size: + if exited == tsks: return continue @@ -6059,9 +6079,6 @@ async def pipeline(self, runt, outq, genr=None): await outq.put(None) - except asyncio.CancelledError: # pragma: no cover - raise - except Exception as e: await outq.put(e) diff --git a/synapse/lib/stormtypes.py b/synapse/lib/stormtypes.py index 2e24d7f37b5..fd20af68f6f 100644 --- a/synapse/lib/stormtypes.py +++ b/synapse/lib/stormtypes.py @@ -189,11 +189,29 @@ def registerLib(self, ctor): raise Exception('no key!') self.addStormLib(path, ctor) + for info in ctor._storm_locals: + rtype = info.get('type') + if isinstance(rtype, dict) and rtype.get('type') == 'function': + if (fname := rtype.get('_funcname')) == '_storm_query': + continue + + if (func := getattr(ctor, fname, None)) is not None: + funcpath = '.'.join(('lib',) + ctor._storm_lib_path + (info['name'],)) + func._storm_funcpath = f"${funcpath}" + return ctor def registerType(self, ctor): '''Decorator to register a StormPrim''' self.addStormType(ctor.__name__, ctor) + + for info in ctor._storm_locals: + rtype = info.get('type') + if isinstance(rtype, dict) and rtype.get('type') == 'function': + fname = rtype.get('_funcname') + if (func := getattr(ctor, fname, None)) is not None: + func._storm_funcpath = f"{ctor._storm_typename}.{info['name']}" + return ctor def iterLibs(self): @@ -616,6 +634,7 @@ async def initLibAsync(self): if callable(v) and v.__name__ == 'realfunc': v._storm_runtime_lib = self v._storm_runtime_lib_func = k + v._storm_funcpath = f'${".".join(("lib",) + self.name + (k,))}' self.locls[k] = v diff --git a/synapse/models/inet.py b/synapse/models/inet.py index 83a5236f19b..f5fccb199ee 100644 --- a/synapse/models/inet.py +++ b/synapse/models/inet.py @@ -1329,7 +1329,11 @@ def getModelDefs(self): 'doc': 'A single result from a web search.', }), - ('inet:web:hashtag', ('str', {'lower': True, 'regex': r'^#\w[\w·]*(? 
it:prod:softver.'}), ('hardware', ('it:prod:hardware', {}), { - 'doc': 'A hardware version which implements a fix for the vulnerability.'}), + 'deprecated': True, + 'doc': 'Deprecated. Please use risk:mitigation -(uses)> it:prod:hardware.'}), ('reporter', ('ou:org', {}), { 'doc': 'The organization reporting on the mitigation.'}), @@ -1035,6 +1046,9 @@ def getModelDefs(self): ('leaker', ('ps:contact', {}), { 'doc': 'The identity which leaked the information.'}), + ('recipient', ('ps:contact', {}), { + 'doc': 'The identity which received the leaked information.'}), + ('type', ('risk:leak:type:taxonomy', {}), { 'doc': 'A type taxonomy for the leak.'}), diff --git a/synapse/tests/files/stormpkg/testpkg.yaml b/synapse/tests/files/stormpkg/testpkg.yaml index fa71167a402..3423a4aa48c 100644 --- a/synapse/tests/files/stormpkg/testpkg.yaml +++ b/synapse/tests/files/stormpkg/testpkg.yaml @@ -60,7 +60,7 @@ external_modules: graphs: - name: testgraph degrees: 2 - pivots: ["<- meta:seen <- meta:source"] + pivots: ["-> meta:seen <(*)- meta:source"] filters: ["-#nope"] forms: inet:fqdn: diff --git a/synapse/tests/test_cortex.py b/synapse/tests/test_cortex.py index 7fe70efc088..5ae5038fc9e 100644 --- a/synapse/tests/test_cortex.py +++ b/synapse/tests/test_cortex.py @@ -371,7 +371,7 @@ async def test_cortex_stormiface(self): self.len(0, mods) self.len(0, core.modsbyiface.get('lookup')) - await core.loadStormPkg(pkgdef) + core.loadStormPkg(pkgdef) mods = await core.getStormIfaces('lookup') self.len(1, mods) @@ -400,7 +400,7 @@ async def test_cortex_stormiface(self): vals = [r async for r in core.view.callStormIface('boom', todo)] self.eq((), vals) - await core._dropStormPkg(pkgdef) + core._dropStormPkg(pkgdef) self.none(core.modsbyiface.get('lookup')) mods = await core.getStormIfaces('lookup') @@ -445,7 +445,7 @@ async def test_cortex_lookup_search_dedup(self): nodes = await core.nodes('foo@bar.com foo@bar.com', opts={'mode': 'lookup'}) self.eq(['inet:email', 'inet:email'], [n.ndef[0] for n in nodes]) - await core.loadStormPkg(pkgdef) + core.loadStormPkg(pkgdef) self.len(1, await core.getStormIfaces('search')) todo = s_common.todo('search', ('foo@bar.com',)) @@ -3520,7 +3520,7 @@ def checkGraph(seeds, alldefs): 'refs': True, 'edges': False, 'forms': {}, - 'pivots': ['<- meta:seen'], + 'pivots': ['-> meta:seen'], 'degrees': 3, 'filters': ['+#nope'], 'filterinput': False, @@ -3837,6 +3837,58 @@ def checkGraph(seeds, alldefs): gdef = await core.callStorm('return($lib.graph.add(({"name": "def", "permissions": {"default": 0}})))') self.eq(0, gdef['permissions']['default']) + async def test_graph_projection_query_validation(self): + async with self.getTestCore() as core: + valid = { + 'name': 'valid', + 'forms': { + 'inet:fqdn': { + 'pivots': ['<- *'], + 'filters': [] + } + } + } + + self.nn(await core.addStormGraph(valid)) + + bad_form_pivot = { + 'name': 'bad form pivot', + 'forms': { + 'inet:fqdn': { + 'pivots': ['<- * |||'], + 'filters': [] + } + } + } + + await self.asyncraises(s_exc.BadSyntax, core.addStormGraph(bad_form_pivot)) + + bad_form_filter = { + 'name': 'bad form filter', + 'forms': { + 'inet:fqdn': { + 'pivots': [], + 'filters': ['+++:wat'] + } + } + } + + await self.asyncraises(s_exc.BadSyntax, core.addStormGraph(bad_form_filter)) + + bad_global_filter = { + 'name': 'bad global filter', + 'filters': ['+++:wat'] + } + + await self.asyncraises(s_exc.BadSyntax, core.addStormGraph(bad_global_filter)) + + bad_global_pivot = { + 'name': 'bad global pivot', + 'pivots': ['-> * |||'] + } + + await 
self.asyncraises(s_exc.BadSyntax, core.addStormGraph(bad_global_pivot)) + async def test_storm_two_level_assignment(self): async with self.getTestCore() as core: q = '$foo=baz $bar=$foo [test:str=$bar]' diff --git a/synapse/tests/test_exc.py b/synapse/tests/test_exc.py index 0428303000d..604955ba3de 100644 --- a/synapse/tests/test_exc.py +++ b/synapse/tests/test_exc.py @@ -27,6 +27,9 @@ def test_basic(self): e.setdefault('defv', 2) self.eq("SynErr: defv=1 foo='words' hehe=1234 mesg='words'", str(e)) + e.update({'foo': 'newwords', 'bar': 'baz'}) + self.eq("SynErr: bar='baz' defv=1 foo='newwords' hehe=1234 mesg='words'", str(e)) + self.eq(e.errname, 'SynErr') e2 = s_exc.BadTypeValu(mesg='haha') diff --git a/synapse/tests/test_lib_agenda.py b/synapse/tests/test_lib_agenda.py index dfc025ed997..388a4482db5 100644 --- a/synapse/tests/test_lib_agenda.py +++ b/synapse/tests/test_lib_agenda.py @@ -1,3 +1,4 @@ +import time import asyncio import hashlib import datetime @@ -361,7 +362,9 @@ def looptime(): appt = await agenda.get(guid) self.eq(appt.isrunning, False) - self.eq(appt.lastresult, "raised exception StormRaise: errname='OmgWtfBbq' mesg='boom'") + self.isin("raised exception StormRaise: errname='OmgWtfBbq'", appt.lastresult) + self.isin("highlight={'hash': '6736b8252d9413221a9b693b2b19cf53'", appt.lastresult) + self.isin("mesg='boom'", appt.lastresult) # Test setting the global enable/disable flag await agenda.delete(guid) @@ -806,6 +809,159 @@ async def test_agenda_mirror_realtime(self): data = stream.read() self.isin("_Appt.edits() Invalid attribute received: invalid = 'newp'", data) + async def test_agenda_promotions(self): + # Adjust this knob for the number of cron jobs you want to test. Below + # are some average run times from my dev box + # 100 -> ~15s + # 250 -> ~18s + # 500 -> ~22s + # 5000 -> ~88s + NUMJOBS = 100 + + async with self.getTestAha() as aha: + + conf00 = { + 'aha:provision': await aha.addAhaSvcProv('00.cortex') + } + + async with self.getTestCore(conf=conf00) as core00: + self.false(core00.conf.get('mirror')) + + msgs = await core00.stormlist('[it:dev:str=foo]') + self.stormHasNoWarnErr(msgs) + + # Forward wind agenda to two minutes past the hour so we don't hit any weird timing windows + tick = core00.agenda._getNowTick() + now = time.gmtime(int(tick)) + diff = (60 - now.tm_min) * 60 + core00.agenda._addTickOff(diff + 120) + + # Add NUMJOBS cron jobs that starts every hour + q = ''' + for $ii in $lib.range($numjobs) { + cron.add --name `CRON{$ii}` --hour +1 { $lib.time.sleep(90) } + } + ''' + opts = {'vars': {'numjobs': NUMJOBS}} + await core00.callStorm(q, opts=opts) + + prov01 = {'mirror': '00.cortex'} + conf01 = { + 'aha:provision': await aha.addAhaSvcProv('01.cortex', provinfo=prov01), + } + + async with self.getTestCore(conf=conf01) as core01: + # Advance the ticks so the cronjob starts sooner + core00.agenda._addTickOff(3600) + + # Sync agenda ticks + diff = core00.agenda._getNowTick() - core01.agenda._getNowTick() + core01.agenda._addTickOff(diff) + + mesgs = [] + async for mesg in core00.behold(): + mesgs.append(mesg) + if len(mesgs) >= NUMJOBS: + break + + for mesg in mesgs: + self.eq(mesg['event'], 'cron:start') + + # Inspect crons and tasks + crons00 = await core00.callStorm('return($lib.cron.list())') + self.len(NUMJOBS, crons00) + # isrunning is synced via nexus so it should be true for both cortexes + for cron in crons00: + self.true(cron.get('isrunning')) + + cronidens = [k['iden'] for k in crons00] + + await core01.sync() + + crons01 = await 
core01.callStorm('return($lib.cron.list())') + self.len(NUMJOBS, crons01) + # isrunning is synced via nexus so it should be true for both cortexes + for cron in crons01: + self.true(cron.get('isrunning')) + + tasks00 = await core00.callStorm('return($lib.ps.list())') + # 101 tasks: one for the main task and NUMJOBS for the cronjob instances + self.len(NUMJOBS + 1, tasks00) + self.eq(tasks00[0]['info']['query'], '[it:dev:str=foo]') + for idx, task in enumerate(tasks00): + if idx == 0: + continue + + self.isin(task['info']['iden'], cronidens) + self.eq(task['info']['query'], '$lib.time.sleep(90)') + + # No tasks running on the follower + tasks01 = await core01.callStorm('return($lib.ps.list())') + self.len(0, tasks01) + + with self.getLoggerStream('synapse.lib.agenda', mesg='name=CRON99') as stream: + # Promote and inspect cortex status + await core01.promote(graceful=True) + self.false(core00.isactive) + self.true(core01.isactive) + + stream.seek(0) + data = stream.read() + for ii in range(NUMJOBS): + self.isin(f' name=CRON{ii} with result "cancelled" took ', data) + + # Sync the (now) follower so the isrunning status gets updated to false on both cortexes + await core00.sync() + + crons00 = await core00.callStorm('return($lib.cron.list())') + self.len(NUMJOBS, crons00) + for cron in crons00: + self.false(cron.get('isrunning')) + + crons01 = await core01.callStorm('return($lib.cron.list())') + self.len(NUMJOBS, crons01) + for cron in crons01: + self.false(cron.get('isrunning')) + + # Bump the ticks on core01 so the cron jobs start + core01.agenda._addTickOff(3600) + + mesgs = [] + async for mesg in core01.behold(): + mesgs.append(mesg) + if len(mesgs) >= NUMJOBS: + break + + for mesg in mesgs: + self.eq(mesg['event'], 'cron:start') + + # Sync the follower to get the latest isrunning status + await core00.sync() + + crons00 = await core00.callStorm('return($lib.cron.list())') + self.len(NUMJOBS, crons00) + # Cronjobs are running so true on both cortexes + for cron in crons00: + self.true(cron.get('isrunning')) + + crons01 = await core01.callStorm('return($lib.cron.list())') + self.len(NUMJOBS, crons01) + # Cronjobs are running so true on both cortexes + for cron in crons01: + self.true(cron.get('isrunning')) + + tasks00 = await core00.callStorm('return($lib.ps.list())') + # This task is the main task from before promotion + self.len(1, tasks00) + self.eq(tasks00[0]['info']['query'], '[it:dev:str=foo]') + + tasks01 = await core01.callStorm('return($lib.ps.list())') + # The cronjob instances are the only tasks + self.len(NUMJOBS, tasks01) + for task in tasks01: + self.isin(task['info']['iden'], cronidens) + self.eq(task['info']['query'], '$lib.time.sleep(90)') + async def test_cron_kill(self): async with self.getTestCore() as core: diff --git a/synapse/tests/test_lib_ast.py b/synapse/tests/test_lib_ast.py index 7e2efad32d0..e18ab2d1a97 100644 --- a/synapse/tests/test_lib_ast.py +++ b/synapse/tests/test_lib_ast.py @@ -131,7 +131,7 @@ async def test_mode_search(self): self.stormIsInWarn('Storm search interface is not enabled!', msgs) async with self.getTestCore() as core: - await core.loadStormPkg({ + core.loadStormPkg({ 'name': 'testsearch', 'modules': [ {'name': 'testsearch', 'interfaces': ['search'], 'storm': ''' @@ -3142,6 +3142,54 @@ async def test_ast_highlight(self): off, end = errm[1][1]['highlight']['offsets'] self.eq('haha', text[off:end]) + text = '$lib.newp' + msgs = await core.stormlist(text) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = 
errm[1][1]['highlight']['offsets'] + self.eq('newp', text[off:end]) + + visi = (await core.addUser('visi'))['iden'] + text = '$users=$lib.auth.users.list() $lib.print($users.0.profile)' + msgs = await core.stormlist(text, opts={'user': visi}) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('lib.print($users.0.profile)', text[off:end]) + + text = '$lib.len(foo, bar)' + msgs = await core.stormlist(text) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('lib.len(foo, bar)', text[off:end]) + self.stormIsInErr('$lib.len()', msgs) + + text = '$foo=$lib.pkg.get $foo()' + msgs = await core.stormlist(text) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('foo()', text[off:end]) + self.stormIsInErr('$lib.pkg.get()', msgs) + + text = '$obj = $lib.pipe.gen(${ $obj.put() }) $obj.put(foo, bar, baz)' + msgs = await core.stormlist(text) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('obj.put(foo, bar, baz)', text[off:end]) + self.stormIsInErr('pipe.put()', msgs) + + text = '$lib.gen.campaign(foo, bar, baz)' + msgs = await core.stormlist(text) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('lib.gen.campaign(foo, bar, baz)', text[off:end]) + self.stormIsInErr('$lib.gen.campaign()', msgs) + + text = '$gen = $lib.gen.campaign $gen(foo, bar, baz)' + msgs = await core.stormlist(text) + errm = [m for m in msgs if m[0] == 'err'][0] + off, end = errm[1][1]['highlight']['offsets'] + self.eq('gen(foo, bar, baz)', text[off:end]) + self.stormIsInErr('$lib.gen.campaign()', msgs) + async def test_ast_bulkedges(self): async with self.getTestCore() as core: diff --git a/synapse/tests/test_lib_cell.py b/synapse/tests/test_lib_cell.py index e2fb8956d18..69b95ef198a 100644 --- a/synapse/tests/test_lib_cell.py +++ b/synapse/tests/test_lib_cell.py @@ -1360,6 +1360,11 @@ def diffdev(dirn): with mock.patch('os.stat', diffdev): await self.asyncraises(s_exc.LowSpace, proxy.runBackup()) + user = await core.auth.getUserByName('root') + with self.raises(s_exc.SynErr) as cm: + await core.iterNewBackupArchive(user) + self.isin('This API must be called via a CellApi', cm.exception.get('mesg')) + async def err(*args, **kwargs): raise RuntimeError('boom') @@ -2212,11 +2217,13 @@ async def test_backup_restore_double_promote_aha(self): # Backup the mirror (core01) which points to the core00 async with await axon00.upload() as upfd: async with core01.getLocalProxy() as prox: + tot_chunks = 0 async for chunk in prox.iterNewBackupArchive(): await upfd.write(chunk) + tot_chunks += len(chunk) size, sha256 = await upfd.save() - await asyncio.sleep(0) + self.eq(size, tot_chunks) furl = f'{url}{s_common.ehex(sha256)}' purl = await aha.addAhaSvcProv('00.mynewcortex') @@ -3024,3 +3031,66 @@ async def test_lib_cell_promote_schism_prevent(self): with self.raises(s_exc.BadState) as cm: await cell00.promote(graceful=True) self.isin('02.cell is not the current leader', cm.exception.get('mesg')) + + async def test_stream_backup_exception(self): + + with self.getTestDir() as dirn: + backdirn = os.path.join(dirn, 'backups') + coredirn = os.path.join(dirn, 'cortex') + + conf = {'backup:dir': backdirn} + s_common.yamlsave(conf, coredirn, 'cell.yaml') + + async with self.getTestCore(dirn=coredirn) as core: + async with core.getLocalProxy() as proxy: + + await 
proxy.runBackup(name='bkup') + + mock_proc = mock.Mock() + mock_proc.join = mock.Mock() + + async def mock_executor(func, *args, **kwargs): + if isinstance(func, mock.Mock) and func is mock_proc.join: + raise Exception('boom') + return mock_proc + + with mock.patch('synapse.lib.cell.s_coro.executor', mock_executor): + with self.getAsyncLoggerStream('synapse.lib.cell', 'Error during backup streaming') as stream: + with self.raises(Exception) as cm: + async for _ in proxy.iterBackupArchive('bkup'): + pass + self.true(await stream.wait(timeout=6)) + + async def test_iter_new_backup_archive(self): + + with self.getTestDir() as dirn: + backdirn = os.path.join(dirn, 'backups') + coredirn = os.path.join(dirn, 'cortex') + + conf = {'backup:dir': backdirn} + s_common.yamlsave(conf, coredirn, 'cell.yaml') + + async with self.getTestCore(dirn=coredirn) as core: + async with core.getLocalProxy() as proxy: + + async def mock_runBackup(*args, **kwargs): + raise Exception('backup failed') + + with mock.patch.object(s_cell.Cell, 'runBackup', mock_runBackup): + with self.getAsyncLoggerStream('synapse.lib.cell', 'Removing') as stream: + with self.raises(s_exc.SynErr) as cm: + async for _ in proxy.iterNewBackupArchive('failedbackup', remove=True): + pass + + self.isin('backup failed', str(cm.exception)) + self.true(await stream.wait(timeout=6)) + + path = os.path.join(backdirn, 'failedbackup') + self.false(os.path.exists(path)) + + self.false(core.backupstreaming) + + core.backupstreaming = True + with self.raises(s_exc.BackupAlreadyRunning): + async for _ in proxy.iterNewBackupArchive('newbackup', remove=True): + pass diff --git a/synapse/tests/test_lib_storm.py b/synapse/tests/test_lib_storm.py index e07c7d42b4a..ef40197f634 100644 --- a/synapse/tests/test_lib_storm.py +++ b/synapse/tests/test_lib_storm.py @@ -4,6 +4,7 @@ import datetime import itertools import urllib.parse as u_parse +import unittest.mock as mock import synapse.exc as s_exc import synapse.common as s_common @@ -646,32 +647,6 @@ async def test_lib_storm_basics(self): self.none(task['info'].get('opts')) self.eq(core.view.iden, task['info'].get('view')) - # test the parallel command - nodes = await core.nodes('parallel --size 4 { [ ou:org=* ] }') - self.len(4, nodes) - - # check that subquery validation happens - with self.raises(s_exc.NoSuchVar): - await core.nodes('parallel --size 4 { [ ou:org=$foo ] }') - - # check that an exception on inbound percolates correctly - with self.raises(s_exc.BadTypeValu): - await core.nodes('[ ou:org=* ou:org=foo ] | parallel { [:name=bar] }') - - # check that an exception in the parallel pipeline percolates correctly - with self.raises(s_exc.BadTypeValu): - await core.nodes('parallel { [ou:org=foo] }') - - nodes = await core.nodes('ou:org | parallel {[ :name=foo ]}') - self.true(all([n.get('name') == 'foo' for n in nodes])) - - # Runtsafety test - q = '[ inet:fqdn=www.vertex.link ] $q=:domain | parallel $q' - await self.asyncraises(s_exc.StormRuntimeError, core.nodes(q)) - - nodes = await core.nodes('ou:org | parallel ${ $foo=bar [ :name=$foo ]}') - self.true(all([n.get('name') == 'bar' for n in nodes])) - # test $lib.exit() and the StormExit handlers msgs = [m async for m in core.view.storm('$lib.exit()')] self.eq(msgs[-1][0], 'fini') @@ -789,10 +764,10 @@ async def test_lib_storm_basics(self): }, ) } - await core.loadStormPkg(emptypkg) + core.loadStormPkg(emptypkg) await core.addStormPkg(strverpkg) - await core.loadStormPkg(pkg0) + core.loadStormPkg(pkg0) await core.nodes('$lib.import(foo.baz)', 
opts=opts) await core.nodes('$lib.import(foo.baz, reqvers="==0.0.1")', opts=opts) @@ -3436,6 +3411,73 @@ async def test_storm_tee(self): q = '[ inet:fqdn=www.vertex.link ] $q=:domain | tee $q' await self.asyncraises(s_exc.StormRuntimeError, core.nodes(q)) + async def test_storm_parallel(self): + + async with self.getTestCore() as core: + + nodes = await core.nodes('parallel --size 4 { [ ou:org=* ] }') + self.len(4, nodes) + + # check that subquery validation happens + with self.raises(s_exc.NoSuchVar): + await core.nodes('parallel --size 4 { [ ou:org=$foo ] }') + + # check that an exception on inbound percolates correctly + with self.raises(s_exc.BadTypeValu): + await core.nodes('[ ou:org=(foo,) ou:org=foo ] | parallel { [:name=bar] }') + + with self.raises(s_exc.BadTypeValu): + await core.nodes('[ ou:org=(foo,) ou:org=foo ] | parallel --size 1 { [:name=bar] }') + + # check that an exception in the parallel pipeline percolates correctly + with self.raises(s_exc.BadTypeValu): + await core.nodes('parallel { [ou:org=foo] }') + + nodes = await core.nodes('ou:org | parallel {[ :name=foo ]}') + self.true(all([n.get('name') == 'foo' for n in nodes])) + + # Runtsafety test + q = '[ inet:fqdn=www.vertex.link ] $q=:domain | parallel $q' + await self.asyncraises(s_exc.StormRuntimeError, core.nodes(q)) + + nodes = await core.nodes('ou:org | parallel ${ $foo=bar [ :name=$foo ]}') + self.true(all([n.get('name') == 'bar' for n in nodes])) + + orig = s_storm.ParallelCmd.pipeline + tsks = {'cnt': 0} + + async def pipecnt(self, runt, query, inq, outq): + tsks['cnt'] += 1 + await orig(self, runt, query, inq, outq) + + with mock.patch('synapse.lib.storm.ParallelCmd.pipeline', pipecnt): + + nodes = await core.nodes('ou:org parallel --size 4 {[ :name=bar ]}') + self.len(5, nodes) + self.true(all([n.get('name') == 'bar' for n in nodes])) + self.eq(4, tsks['cnt']) + + tsks['cnt'] = 0 + + nodes = await core.nodes('ou:org parallel --size 5 {[ :name=bar ]}') + self.len(5, nodes) + self.true(all([n.get('name') == 'bar' for n in nodes])) + self.eq(5, tsks['cnt']) + + tsks['cnt'] = 0 + + # --size greater than number of nodes only creates a pipeline for each node + nodes = await core.nodes('ou:org parallel --size 10 {[ :name=foo ]}') + self.len(5, nodes) + self.true(all([n.get('name') == 'foo' for n in nodes])) + self.eq(5, tsks['cnt']) + + tsks['cnt'] = 0 + + nodes = await core.nodes('parallel --size 4 {[ ou:org=* ]}') + self.len(4, nodes) + self.eq(4, tsks['cnt']) + async def test_storm_yieldvalu(self): async with self.getTestCore() as core: @@ -3895,7 +3937,7 @@ async def test_storm_cmd_help(self): )}, ), } - await core.loadStormPkg(pdef) + core.loadStormPkg(pdef) msgs = await core.stormlist('woot --help') helptext = '\n'.join([m[1].get('mesg') for m in msgs if m[0] == 'print']) self.isin('Inputs:\n\n hehe:haha\n hoho:lol - We know whats up', helptext) @@ -4647,7 +4689,7 @@ async def test_storm_tagprune(self): async def test_storm_cmdscope(self): async with self.getTestCore() as core: - await core.loadStormPkg({ + core.loadStormPkg({ 'name': 'testpkg', 'version': '0.0.1', 'commands': ( diff --git a/synapse/tests/test_lib_stormlib_scrape.py b/synapse/tests/test_lib_stormlib_scrape.py index 7977d817b05..8d69f67d4b5 100644 --- a/synapse/tests/test_lib_stormlib_scrape.py +++ b/synapse/tests/test_lib_stormlib_scrape.py @@ -92,7 +92,7 @@ async def test_storm_lib_scrape_iface(self): self.len(0, mods) self.len(0, core.modsbyiface.get('scrape')) - await core.loadStormPkg(pkgdef) + core.loadStormPkg(pkgdef) mods = await 
core.getStormIfaces('scrape') self.len(2, mods) @@ -131,7 +131,7 @@ async def test_storm_lib_scrape_iface(self): conf = {'storm:interface:scrape': False, } async with self.getTestCore(conf=conf) as core: - await core.loadStormPkg(pkgdef) + core.loadStormPkg(pkgdef) mods = await core.getStormIfaces('scrape') self.len(2, mods) diff --git a/synapse/tests/test_model_inet.py b/synapse/tests/test_model_inet.py index e6ac1d7ef4f..5fee95c523d 100644 --- a/synapse/tests/test_model_inet.py +++ b/synapse/tests/test_model_inet.py @@ -10,17 +10,40 @@ class InetModelTest(s_t_utils.SynTest): async def test_model_inet_basics(self): async with self.getTestCore() as core: + self.len(1, await core.nodes('[ inet:web:hashtag="#🫠" ]')) + self.len(1, await core.nodes('[ inet:web:hashtag="#🫠🫠" ]')) + self.len(1, await core.nodes('[ inet:web:hashtag="#·bar"]')) + self.len(1, await core.nodes('[ inet:web:hashtag="#foo·"]')) + self.len(1, await core.nodes('[ inet:web:hashtag="#foo〜"]')) self.len(1, await core.nodes('[ inet:web:hashtag="#hehe" ]')) self.len(1, await core.nodes('[ inet:web:hashtag="#foo·bar"]')) # note the interpunct + self.len(1, await core.nodes('[ inet:web:hashtag="#foo〜bar"]')) # note the wave dash self.len(1, await core.nodes('[ inet:web:hashtag="#fo·o·······b·ar"]')) with self.raises(s_exc.BadTypeValu): await core.nodes('[ inet:web:hashtag="foo" ]') + with self.raises(s_exc.BadTypeValu): - await core.nodes('[ inet:web:hashtag="#foo bar" ]') - with self.raises(s_exc.BadTypeValu): - self.len(1, await core.nodes('[ inet:web:hashtag="#·bar"]')) - with self.raises(s_exc.BadTypeValu): - self.len(1, await core.nodes('[ inet:web:hashtag="#foo·"]')) + await core.nodes('[ inet:web:hashtag="#foo#bar" ]') + + # All unicode whitespace from: + # https://www.compart.com/en/unicode/category/Zl + # https://www.compart.com/en/unicode/category/Zp + # https://www.compart.com/en/unicode/category/Zs + whitespace = [ + '\u0020', '\u00a0', '\u1680', '\u2000', '\u2001', '\u2002', '\u2003', '\u2004', + '\u2005', '\u2006', '\u2007', '\u2008', '\u2009', '\u200a', '\u202f', '\u205f', + '\u3000', '\u2028', '\u2029', + ] + for char in whitespace: + with self.raises(s_exc.BadTypeValu): + await core.callStorm(f'[ inet:web:hashtag="#foo{char}bar" ]') + + with self.raises(s_exc.BadTypeValu): + await core.callStorm(f'[ inet:web:hashtag="#{char}bar" ]') + + # These are allowed because strip=True + await core.callStorm(f'[ inet:web:hashtag="#foo{char}" ]') + await core.callStorm(f'[ inet:web:hashtag=" #foo{char}" ]') async def test_inet_jarm(self): @@ -428,6 +451,7 @@ async def test_flow(self): :raw=((10), (20)) :src:txfiles={[ file:attachment=* :name=foo.exe ]} :dst:txfiles={[ file:attachment=* :name=bar.exe ]} + :capture:host=* )]''' nodes = await core.nodes(q, opts={'vars': {'valu': valu, 'p': props}}) self.len(1, nodes) @@ -466,11 +490,13 @@ async def test_flow(self): self.eq(node.get('src:rdp:hostname'), 'syncoder') self.eq(node.get('src:rdp:keyboard:layout'), 'azerty') self.eq(node.get('raw'), (10, 20)) + self.nn(node.get('capture:host')) self.len(2, await core.nodes('inet:flow -> crypto:x509:cert')) self.len(1, await core.nodes('inet:flow :src:ssh:key -> crypto:key')) self.len(1, await core.nodes('inet:flow :dst:ssh:key -> crypto:key')) self.len(1, await core.nodes('inet:flow :src:txfiles -> file:attachment +:name=foo.exe')) self.len(1, await core.nodes('inet:flow :dst:txfiles -> file:attachment +:name=bar.exe')) + self.len(1, await core.nodes('inet:flow :capture:host -> it:host')) async def test_fqdn(self): formname = 
'inet:fqdn' @@ -2369,6 +2395,7 @@ async def test_model_inet_email_message(self): q = ''' [ inet:email:message="*" + :id="Woot-12345 " :to=woot@woot.com :from=visi@vertex.link :replyto=root@root.com @@ -2396,6 +2423,7 @@ async def test_model_inet_email_message(self): nodes = await core.nodes(q, opts={'vars': {'flow': flow}}) self.len(1, nodes) + self.eq(nodes[0].get('id'), 'Woot-12345') self.eq(nodes[0].get('cc'), ('baz@faz.org', 'foo@bar.com')) self.eq(nodes[0].get('received:from:ip'), (4, 0x01020304)) self.eq(nodes[0].get('received:from:fqdn'), 'smtp.vertex.link') @@ -2450,14 +2478,19 @@ async def test_model_inet_egress(self): nodes = await core.nodes(''' [ inet:egress=* :host = * + :host:iface = * :client=1.2.3.4 ] ''') self.len(1, nodes) self.nn(nodes[0].get('host')) + self.nn(nodes[0].get('host:iface')) self.eq(nodes[0].get('client'), 'tcp://1.2.3.4') + self.len(1, await core.nodes('inet:egress -> it:host')) + self.len(1, await core.nodes('inet:egress -> inet:iface')) + async def test_model_inet_tls_handshake(self): async with self.getTestCore() as core: @@ -2576,6 +2609,7 @@ async def test_model_inet_service(self): (inet:service:account=(blackout, account, vertex, slack) :id=U7RN51U1J :user=blackout + :url=https://vertex.link/users/blackout :email=blackout@vertex.link :profile={ gen.ps.contact.email vertex.employee blackout@vertex.link } :tenant={[ inet:service:tenant=({"id": "VS-31337"}) ]} @@ -2603,6 +2637,7 @@ async def test_model_inet_service(self): self.eq(accounts[0].ndef, ('inet:service:account', s_common.guid(('blackout', 'account', 'vertex', 'slack')))) self.eq(accounts[0].get('id'), 'U7RN51U1J') self.eq(accounts[0].get('user'), 'blackout') + self.eq(accounts[0].get('url'), 'https://vertex.link/users/blackout') self.eq(accounts[0].get('email'), 'blackout@vertex.link') self.eq(accounts[0].get('profile'), blckprof.ndef[1]) diff --git a/synapse/tests/test_model_risk.py b/synapse/tests/test_model_risk.py index da6d120c335..b0baa6ee7b5 100644 --- a/synapse/tests/test_model_risk.py +++ b/synapse/tests/test_model_risk.py @@ -426,6 +426,7 @@ async def addNode(text): :disclosed=20231102 :owner={ gen.ou.org.hq acme } :leaker={ gen.ou.org.hq wikileaks } + :recipient={ gen.ou.org.hq everyone } :type=public :goal={[ ou:goal=* :name=publicity ]} :compromise={[ risk:compromise=* :target={ gen.ou.org.hq acme } ]} @@ -454,6 +455,7 @@ async def addNode(text): self.len(1, await core.nodes('risk:leak -> risk:leak:type:taxonomy')) self.len(1, await core.nodes('risk:leak :owner -> ps:contact +:orgname=acme')) self.len(1, await core.nodes('risk:leak :leaker -> ps:contact +:orgname=wikileaks')) + self.len(1, await core.nodes('risk:leak :recipient -> ps:contact +:orgname=everyone')) self.len(1, await core.nodes('risk:leak -> ou:goal +:name=publicity')) self.len(1, await core.nodes('risk:leak -> risk:compromise :target -> ps:contact +:orgname=acme')) self.len(1, await core.nodes('risk:leak :reporter -> ou:org +:name=vertex')) diff --git a/synapse/tests/test_tools_apikey.py b/synapse/tests/test_tools_apikey.py new file mode 100644 index 00000000000..4d6236ac0b9 --- /dev/null +++ b/synapse/tests/test_tools_apikey.py @@ -0,0 +1,227 @@ +import datetime + +import synapse.exc as s_exc +import synapse.common as s_common + +import synapse.lib.time as s_time +import synapse.lib.output as s_output + +import synapse.tests.utils as s_test +import synapse.tools.apikey as s_t_apikey + +async def getApiKeyByName(core, name): + keys = {k.get('name'): k async for k in core.getApiKeys()} + return keys.get(name) + 
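+
+# NOTE: user API keys are identified by a randomly generated iden, so the
+# tests below use this helper to resolve a key dict by its name instead.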
+class ApiKeyTest(s_test.SynTest):
+
+    async def test_tools_apikey(self):
+        async with self.getTestCore() as core:
+
+            await core.auth.addUser('blackout')
+
+            rooturl = core.getLocalUrl()
+            blckurl = core.getLocalUrl(user='blackout')
+
+            # Add API keys
+            argv = (
+                '--svcurl', rooturl,
+                'add',
+                'rootkey00',
+                '-d', '120',
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+
+            self.isin('Successfully added API key with name=rootkey00.', str(outp))
+            rootkey00 = await getApiKeyByName(core, 'rootkey00')
+
+            self.isin(f'Iden: {rootkey00.get("iden")}', str(outp))
+            self.isin(' API Key: ', str(outp))
+            self.isin(' Name: rootkey00', str(outp))
+            self.isin(f' Created: {s_time.repr(rootkey00.get("created"))}', str(outp))
+            self.isin(f' Updated: {s_time.repr(rootkey00.get("updated"))}', str(outp))
+            self.isin(f' Expires: {s_time.repr(rootkey00.get("expires"))}', str(outp))
+            self.eq(rootkey00.get('expires'), rootkey00.get('created') + 120000)
+
+            argv = (
+                '--svcurl', rooturl,
+                'add',
+                '-u', 'blackout',
+                'blckkey00',
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+
+            self.isin('Successfully added API key with name=blckkey00.', str(outp))
+            blckkey00 = await getApiKeyByName(core, 'blckkey00')
+
+            self.isin(f'Iden: {blckkey00.get("iden")}', str(outp))
+            self.isin(' API Key: ', str(outp))
+            self.isin(' Name: blckkey00', str(outp))
+            self.isin(f' Created: {s_time.repr(blckkey00.get("created"))}', str(outp))
+            self.isin(f' Updated: {s_time.repr(blckkey00.get("updated"))}', str(outp))
+            self.notin(' Expires: ', str(outp))
+
+            argv = (
+                '--svcurl', blckurl,
+                'add',
+                'blckkey01',
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+
+            self.isin('Successfully added API key with name=blckkey01.', str(outp))
+            blckkey01 = await getApiKeyByName(core, 'blckkey01')
+
+            self.isin(f'Iden: {blckkey01.get("iden")}', str(outp))
+            self.isin(' API Key: ', str(outp))
+            self.isin(' Name: blckkey01', str(outp))
+            self.isin(f' Created: {s_time.repr(blckkey01.get("created"))}', str(outp))
+            self.isin(f' Updated: {s_time.repr(blckkey01.get("updated"))}', str(outp))
+            self.notin(' Expires: ', str(outp))
+
+            # List API keys
+            argv = (
+                '--svcurl', rooturl,
+                'list',
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+
+            self.isin(f'Iden: {rootkey00.get("iden")}', str(outp))
+            self.notin(' API Key: ', str(outp))
+            self.isin(' Name: rootkey00', str(outp))
+            self.isin(f' Created: {s_time.repr(rootkey00.get("created"))}', str(outp))
+            self.isin(f' Updated: {s_time.repr(rootkey00.get("updated"))}', str(outp))
+            self.isin(f' Expires: {s_time.repr(rootkey00.get("expires"))}', str(outp))
+            self.eq(rootkey00.get('expires'), rootkey00.get('created') + 120000)
+
+            argv = (
+                '--svcurl', rooturl,
+                'list',
+                '-u', 'blackout',
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+
+            self.isin(f'Iden: {blckkey00.get("iden")}', str(outp))
+            self.notin(' API Key: ', str(outp))
+            self.isin(' Name: blckkey00', str(outp))
+            self.isin(f' Created: {s_time.repr(blckkey00.get("created"))}', str(outp))
+            self.isin(f' Updated: {s_time.repr(blckkey00.get("updated"))}', str(outp))
+            self.notin(' Expires: ', str(outp))
+
+            self.isin(f'Iden: {blckkey01.get("iden")}', str(outp))
+            self.notin(' API Key: ', str(outp))
+            self.isin(' Name: blckkey01', str(outp))
+            self.isin(f' Created: {s_time.repr(blckkey01.get("created"))}', str(outp))
+            self.isin(f' Updated: {s_time.repr(blckkey01.get("updated"))}', str(outp))
+            self.notin(' Expires: ', str(outp))
+
+            argv = (
+                '--svcurl', blckurl,
+                'list',
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+
+            self.isin(f'Iden: {blckkey00.get("iden")}', str(outp))
+            self.notin(' API Key: ', str(outp))
+            self.isin(' Name: blckkey00', str(outp))
+            self.isin(f' Created: {s_time.repr(blckkey00.get("created"))}', str(outp))
+            self.isin(f' Updated: {s_time.repr(blckkey00.get("updated"))}', str(outp))
+            self.notin(' Expires: ', str(outp))
+
+            self.isin(f'Iden: {blckkey01.get("iden")}', str(outp))
+            self.notin(' API Key: ', str(outp))
+            self.isin(' Name: blckkey01', str(outp))
+            self.isin(f' Created: {s_time.repr(blckkey01.get("created"))}', str(outp))
+            self.isin(f' Updated: {s_time.repr(blckkey01.get("updated"))}', str(outp))
+            self.notin(' Expires: ', str(outp))
+
+            # Delete API keys
+            rootiden00 = rootkey00.get('iden')
+            argv = (
+                '--svcurl', rooturl,
+                'del',
+                rootiden00,
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+            self.isin(f'Successfully deleted API key with iden={rootiden00}.', str(outp))
+
+            blckiden00 = blckkey00.get('iden')
+            argv = (
+                '--svcurl', rooturl,
+                'del',
+                blckiden00,
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+            self.isin(f'Successfully deleted API key with iden={blckiden00}.', str(outp))
+
+            blckiden01 = blckkey01.get('iden')
+            argv = (
+                '--svcurl', blckurl,
+                'del',
+                blckiden01,
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+            self.isin(f'Successfully deleted API key with iden={blckiden01}.', str(outp))
+
+            # List API keys again
+            argv = (
+                '--svcurl', rooturl,
+                'list',
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+            self.isin('No API keys found.', str(outp))
+
+            argv = (
+                '--svcurl', rooturl,
+                'list',
+                '-u', 'blackout',
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+            self.isin('No API keys found.', str(outp))
+
+            argv = (
+                '--svcurl', blckurl,
+                'list',
+            )
+            outp = s_output.OutPutStr()
+            self.eq(0, await s_t_apikey.main(argv, outp=outp))
+            self.isin('No API keys found.', str(outp))
+
+            # Check errors
+            argv = (
+                '--svcurl', rooturl,
+                'list',
+                '-u', 'newp',
+            )
+            outp = s_output.OutPutStr()
+            self.eq(1, await s_t_apikey.main(argv, outp=outp))
+            self.isin('ERROR: NoSuchUser: No user named newp.', str(outp))
+
+            argv = (
+                '--svcurl', blckurl,
+                'list',
+                '-u', 'root',
+            )
+            outp = s_output.OutPutStr()
+            self.eq(1, await s_t_apikey.main(argv, outp=outp))
+            self.isin('ERROR: AuthDeny: getUserInfo denied for non-admin and non-self', str(outp))
+
+            newpiden = s_common.guid()
+            argv = (
+                '--svcurl', rooturl,
+                'del',
+                newpiden,
+            )
+            outp = s_output.OutPutStr()
+            self.eq(1, await s_t_apikey.main(argv, outp=outp))
+            self.isin(f"ERROR: NoSuchIden: User API key with iden='{newpiden}' does not exist.", str(outp))
diff --git a/synapse/tests/test_tools_storm.py b/synapse/tests/test_tools_storm.py
index cd192319258..6f6a02c9fd2 100644
--- a/synapse/tests/test_tools_storm.py
+++ b/synapse/tests/test_tools_storm.py
@@ -1,4 +1,9 @@
 import os
+import sys
+import signal
+import asyncio
+import multiprocessing
+
 import synapse.tests.utils as s_test
 
 from prompt_toolkit.document import Document
@@ -6,10 +11,49 @@
 import synapse.exc as s_exc
 import synapse.common as s_common
+import synapse.telepath as s_telepath
+
+import synapse.lib.coro as s_coro
 import synapse.lib.output as s_output
 import synapse.lib.msgpack as s_msgpack
 
 import synapse.tools.storm as s_t_storm
 
+def run_cli_till_print(url, evt1):
+    '''
+    Run the storm CLI until we get a print mesg, then set the event.
+
+    This is a multiprocessing Process target.
+    '''
+    async def main():
+        outp = s_output.OutPutStr()  # Capture output instead of sending it to stdout
+        async with await s_telepath.openurl(url) as proxy:
+            async with await s_t_storm.StormCli.anit(proxy, outp=outp) as scli:
+                cmdqueue = asyncio.Queue()
+                await cmdqueue.put('while (true) { $lib.print(go) $lib.time.sleep(1) }')
+                await cmdqueue.put('!quit')
+
+                async def fake_prompt():
+                    return await cmdqueue.get()
+
+                scli.prompt = fake_prompt
+
+                d = {'evt1': False}
+                async def onmesg(event):
+                    # Set the event the first time a print mesg is seen.
+                    if d.get('evt1'):
+                        return
+                    mesg = event[1].get('mesg')
+                    if mesg[0] != 'print':
+                        return
+                    evt1.set()
+                    d['evt1'] = True
+
+                with scli.onWith('storm:mesg', onmesg):
+                    await scli.addSignalHandlers()
+                    await scli.runCmdLoop()
+
+    asyncio.run(main())
+    sys.exit(137)
+
 class StormCliTest(s_test.SynTest):
 
     async def test_tools_storm(self):
@@ -351,3 +395,54 @@ async def get_completions(text):
             ),
             vals
         )
+
+    async def test_storm_cmdloop_interrupt(self):
+        '''
+        Test interrupting a long-running query in the command loop.
+        '''
+        async with self.getTestCore() as core:
+
+            async with core.getLocalProxy() as proxy:
+
+                outp = s_test.TstOutPut()
+                async with await s_t_storm.StormCli.anit(proxy, outp=outp) as scli:
+
+                    cmdqueue = asyncio.Queue()
+                    await cmdqueue.put('while (true) { $lib.time.sleep(1) }')
+                    await cmdqueue.put('!quit')
+
+                    async def fake_prompt():
+                        return await cmdqueue.get()
+                    scli.prompt = fake_prompt
+
+                    cmdloop_task = asyncio.create_task(scli.runCmdLoop())
+                    await asyncio.sleep(0.1)
+
+                    # Cancel the in-flight query as though the user pressed ctrl-c.
+                    if scli.cmdtask is not None:
+                        scli.cmdtask.cancel()
+
+                    await cmdloop_task
+
+                    outp.expect('<ctrl-c>')
+                    outp.expect('o/')
+                    self.true(scli.isfini)
+
+    async def test_storm_cmdloop_sigint(self):
+        '''
+        Test interrupting a long-running query in the command loop with a process target and SIGINT.
+        '''
+
+        async with self.getTestCore() as core:
+            url = core.getLocalUrl()
+
+            ctx = multiprocessing.get_context('spawn')
+
+            evt1 = ctx.Event()
+
+            proc = ctx.Process(target=run_cli_till_print, args=(url, evt1,))
+            proc.start()
+
+            self.true(await s_coro.executor(evt1.wait, timeout=30))
+            os.kill(proc.pid, signal.SIGINT)
+            proc.join(timeout=30)
+            self.eq(proc.exitcode, 137)
diff --git a/synapse/tests/test_utils_getrefs.py b/synapse/tests/test_utils_getrefs.py
index 6bb1c0ec6bd..246467bbea9 100644
--- a/synapse/tests/test_utils_getrefs.py
+++ b/synapse/tests/test_utils_getrefs.py
@@ -24,7 +24,7 @@ def getVcr(self):
         cm = myvcr.use_cassette(fp)
         return cm
 
-    async def test_basics(self):
+    def test_basics(self):
 
         args = s_getrefs.parse_args([
             s_data.path('attack-flow', 'attack-flow-schema-2.0.0.json')
diff --git a/synapse/tools/apikey.py b/synapse/tools/apikey.py
new file mode 100644
index 00000000000..d9fa1682bbc
--- /dev/null
+++ b/synapse/tools/apikey.py
@@ -0,0 +1,93 @@
+import sys
+import asyncio
+import argparse
+
+import synapse.exc as s_exc
+import synapse.telepath as s_telepath
+
+import synapse.lib.time as s_time
+import synapse.lib.output as s_output
+
+descr = '''
+Add, list, or delete user API keys from a Synapse service.
+'''
+
+def printkey(outp, info, apikey=None):
+    iden = info.get('iden')
+    name = info.get('name')
+    created = info.get('created')
+    updated = info.get('updated')
+    expires = info.get('expires')
+
+    outp.printf(f'Iden: {iden}')
+    if apikey:
+        outp.printf(f' API Key: {apikey}')
+    outp.printf(f' Name: {name}')
+    outp.printf(f' Created: {s_time.repr(created)}')
+    outp.printf(f' Updated: {s_time.repr(updated)}')
+    if expires:
+        outp.printf(f' Expires: {s_time.repr(expires)}')
+
+    outp.printf('')
+
+async def main(argv, outp=s_output.stdout):
+
+    pars = argparse.ArgumentParser(prog='apikey', description=descr)
+    pars.add_argument('--svcurl', default='cell:///vertex/storage', help='The telepath URL of the Synapse service.')
+
+    subpars = pars.add_subparsers(dest='action', required=True)
+
+    addp = subpars.add_parser('add', help='Add a user API key.')
+    addp.add_argument('-d', '--duration', type=int, help='The duration of the API key in seconds.')
+    addp.add_argument('-u', '--username', type=str, help='The username to add an API key to (restricted to admins).')
+    addp.add_argument('name', help='The name of the API key to add.')
+
+    listp = subpars.add_parser('list', help='List user API keys.')
+    listp.add_argument('-u', '--username', type=str, help='The username to list API keys for (restricted to admins).')
+
+    delp = subpars.add_parser('del', help='Delete a user API key.')
+    delp.add_argument('iden', help='The iden of the API key to delete.')
+
+    opts = pars.parse_args(argv)
+
+    async with s_telepath.withTeleEnv():
+
+        async with await s_telepath.openurl(opts.svcurl) as cell:
+
+            try:
+                useriden = None
+                if opts.action in ('add', 'list') and opts.username:
+                    user = await cell.getUserInfo(opts.username)
+                    useriden = user.get('iden')
+
+                if opts.action == 'add':
+                    if (duration := opts.duration) is not None:
+                        # Convert from seconds to milliseconds
+                        duration *= 1000
+
+                    apikey, info = await cell.addUserApiKey(opts.name, duration=duration, useriden=useriden)
+                    outp.printf(f'Successfully added API key with name={opts.name}.')
+                    printkey(outp, info, apikey)
+
+                elif opts.action == 'del':
+                    await cell.delUserApiKey(opts.iden)
+                    outp.printf(f'Successfully deleted API key with iden={opts.iden}.')
+
+                elif opts.action == 'list':
+                    apikeys = await cell.listUserApiKeys(useriden=useriden)
+                    if not apikeys:
+                        outp.printf('No API keys found.')
+                        return 0
+
+                    for info in apikeys:
+                        printkey(outp, info)
+
+            except s_exc.SynErr as exc:
+                mesg = exc.get('mesg')
+                outp.printf(f'ERROR: {exc.__class__.__name__}: {mesg}')
+                return 1
+
+            return 0
+
+if __name__ == '__main__':  # pragma: no cover
+    sys.exit(asyncio.run(main(sys.argv[1:])))
diff --git a/synapse/utils/getrefs.py b/synapse/utils/getrefs.py
index 44619cb7457..a5ea5a5742b 100644
--- a/synapse/utils/getrefs.py
+++ b/synapse/utils/getrefs.py
@@ -1,11 +1,12 @@
 import sys
 import json
 import urllib
+import asyncio
 import logging
 import pathlib
 import argparse
 
-import requests
+import aiohttp
 
 import synapse.exc as s_exc
 import synapse.data as s_data
@@ -20,7 +21,13 @@ def download_refs_handler(uri):
     This function downloads the JSON schema at the given URI, parses the
     given URI to get the path component, and then saves the referenced
     schema to the 'jsonschemas' directory of synapse.data.
+
+    This function runs its own asyncio loop for each URI being requested.
    '''
+    ret = asyncio.run(_download_refs_handler(uri))
+    return ret
+
+async def _download_refs_handler(uri):
 
     try:
         parts = urllib.parse.urlparse(uri)
@@ -45,8 +52,12 @@
 
     # Get the data from the interwebs
     logger.info(f'Downloading schema from {uri}.')
-    resp = requests.get(uri)
-    data = resp.json()
+    async with aiohttp.ClientSession() as session:
+        async with session.get(uri) as resp:
+            resp.raise_for_status()
+            buf = await resp.read()
+
+    data = json.loads(buf.decode())
 
     # Save the json schema to disk
     with filepath.open('w') as fp:
diff --git a/synapse/vendor/cpython/lib/http/__init__.py b/synapse/vendor/cpython/lib/http/__init__.py
new file mode 100644
index 00000000000..e69de29bb2d
diff --git a/synapse/vendor/cpython/lib/http/cookies.py b/synapse/vendor/cpython/lib/http/cookies.py
new file mode 100644
index 00000000000..93243453dc4
--- /dev/null
+++ b/synapse/vendor/cpython/lib/http/cookies.py
@@ -0,0 +1,59 @@
+##############################################################################
+# Taken from the cpython 3.11 source branch after the 3.11.10 release.
+##############################################################################
+####
+# Copyright 2000 by Timothy O'Malley <timo@alum.mit.edu>
+#
+#                All Rights Reserved
+#
+# Permission to use, copy, modify, and distribute this software
+# and its documentation for any purpose and without fee is hereby
+# granted, provided that the above copyright notice appear in all
+# copies and that both that copyright notice and this permission
+# notice appear in supporting documentation, and that the name of
+# Timothy O'Malley not be used in advertising or publicity
+# pertaining to distribution of the software without specific, written
+# prior permission.
+#
+# Timothy O'Malley DISCLAIMS ALL WARRANTIES WITH REGARD TO THIS
+# SOFTWARE, INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
+# AND FITNESS, IN NO EVENT SHALL Timothy O'Malley BE LIABLE FOR
+# ANY SPECIAL, INDIRECT OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
+# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS,
+# WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS
+# ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
+# PERFORMANCE OF THIS SOFTWARE.
+#
+
+#
+# Import our required modules
+#
+import re
+
+_unquote_sub = re.compile(r'\\(?:([0-3][0-7][0-7])|(.))').sub
+
+def _unquote_replace(m):
+    if m[1]:
+        return chr(int(m[1], 8))
+    else:
+        return m[2]
+
+def _unquote(str):
+    # If there aren't any doublequotes,
+    # then there can't be any special characters.  See RFC 2109.
+    if str is None or len(str) < 2:
+        return str
+    if str[0] != '"' or str[-1] != '"':
+        return str
+
+    # We have to assume that we must decode this string.
+    # Down to work.
+
+    # Remove the "s
+    str = str[1:-1]
+
+    # Check for special sequences.  Examples:
+    #    \012 --> \n
+    #    \"   --> "
+    #
+    return _unquote_sub(_unquote_replace, str)
diff --git a/synapse/vendor/cpython/lib/test/test_http_cookies.py b/synapse/vendor/cpython/lib/test/test_http_cookies.py
new file mode 100644
index 00000000000..02d746f2c0e
--- /dev/null
+++ b/synapse/vendor/cpython/lib/test/test_http_cookies.py
@@ -0,0 +1,49 @@
+##############################################################################
+# Taken from the cpython 3.11 source branch after the 3.11.10 release.
+# It has been modified for vendored imports and vendored test harness.
+##############################################################################
+
+# Simple test suite for http/cookies.py
+
+from http import cookies
+
+# s_v_utils runs the monkeypatch
+import synapse.vendor.utils as s_v_utils
+
+class CookieTests(s_v_utils.VendorTest):
+
+    def test_unquote(self):
+        cases = [
+            (r'a="b=\""', 'b="'),
+            (r'a="b=\\"', 'b=\\'),
+            (r'a="b=\="', 'b=='),
+            (r'a="b=\n"', 'b=n'),
+            (r'a="b=\042"', 'b="'),
+            (r'a="b=\134"', 'b=\\'),
+            (r'a="b=\377"', 'b=\xff'),
+            (r'a="b=\400"', 'b=400'),
+            (r'a="b=\42"', 'b=42'),
+            (r'a="b=\\042"', 'b=\\042'),
+            (r'a="b=\\134"', 'b=\\134'),
+            (r'a="b=\\\""', 'b=\\"'),
+            (r'a="b=\\\042"', 'b=\\"'),
+            (r'a="b=\134\""', 'b=\\"'),
+            (r'a="b=\134\042"', 'b=\\"'),
+        ]
+        for encoded, decoded in cases:
+            with self.subTest(encoded):
+                C = cookies.SimpleCookie()
+                C.load(encoded)
+                self.assertEqual(C['a'].value, decoded)
+
+    def test_unquote_large(self):
+        n = 10**6
+        for encoded in r'\\', r'\134':
+            with self.subTest(encoded):
+                data = 'a="b=' + encoded * n + ';"'
+                C = cookies.SimpleCookie()
+                C.load(data)
+                value = C['a'].value
+                self.assertEqual(value[:3], 'b=\\')
+                self.assertEqual(value[-2:], '\\;')
+                self.assertEqual(len(value), n + 3)
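+
+# Editorial sketch (not part of the vendored CPython suite): a quick standalone
+# check of the patched helper, assuming the vendored import path introduced in
+# this changeset. _unquote() decodes RFC 2109 octal escapes inside doublequoted
+# values and returns unquoted values untouched; the single regex substitution
+# replaces the per-character loop that made older releases quadratic (CVE-2024-7592).
+def _unquote_sketch():
+    import synapse.vendor.cpython.lib.http.cookies as v_cookies
+
+    # '\042' is the octal escape for the doublequote character.
+    assert v_cookies._unquote('"b=\\042"') == 'b="'
+
+    # Values without surrounding doublequotes come back unchanged.
+    assert v_cookies._unquote('plain') == 'plain'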