Skip to content

Commit

Permalink
Merge branch 'master' into visi-aha-gather
Browse files Browse the repository at this point in the history
  • Loading branch information
OCBender authored Jan 3, 2025
2 parents ec74235 + 746605b commit 50cf5a0
Show file tree
Hide file tree
Showing 35 changed files with 662 additions and 198 deletions.
5 changes: 5 additions & 0 deletions changes/34ba15e2aef8b5c6be4d0274db884f84.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
desc: Refactored backup streaming logic and error handling.
prs: []
type: feat
...
5 changes: 5 additions & 0 deletions changes/5433695fd5f2187f2cd9f7a4ba0ae4d9.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
desc: Fixed SIGINT handling in the ``synapse.tools.storm`` CLI tool.
prs: []
type: bug
...
7 changes: 7 additions & 0 deletions changes/9d85121addff53793ad88883586796cc.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
---
desc: Updated the Storm ``parallel`` command behavior to avoid creating empty pipelines
when there are fewer inbound nodes than the number of pipelines specified by the
``--size`` argument.
prs: []
type: feat
...
5 changes: 5 additions & 0 deletions changes/bd45773cff22d0cf547370e90918ba58.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
desc: Added a patch for Python ``http.cookies`` module to address CVE-2024-7592 exposure.
prs: []
type: bug
...
6 changes: 6 additions & 0 deletions changes/e25248c44d5ba33a43ae937a0bb25fd5.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
---
desc: Fixed an issue where certain exceptions raised while calling a function in Storm
were not providing appropriate details about the origin of the exception.
prs: []
type: bug
...
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ dependencies = [
'aiohttp-socks>=0.9.0,<0.10.0',
'aioimaplib>=1.1.0,<1.2.0',
'aiosmtplib>=3.0.0,<3.1.0',
'prompt-toolkit>=3.0.4,<3.1.0',
'prompt_toolkit>=3.0.29,<3.1.0',
'lark==1.2.2',
'Pygments>=2.7.4,<2.18.0',
'packaging>=20.0,<25.0',
Expand Down
2 changes: 1 addition & 1 deletion requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ aiohttp>=3.10.0,<4.0
aiohttp-socks>=0.9.0,<0.10.0
aioimaplib>=1.1.0,<1.2.0
aiosmtplib>=3.0.0,<3.1.0
prompt-toolkit>=3.0.4,<3.1.0
prompt_toolkit>=3.0.29,<3.1.0
lark==1.2.2
Pygments>=2.7.4,<2.18.0
fastjsonschema>=2.18.0,<2.20.0
Expand Down
15 changes: 15 additions & 0 deletions synapse/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@
import contextlib
import collections

import http.cookies

import yaml
import regex

Expand All @@ -38,6 +40,8 @@
import synapse.lib.structlog as s_structlog

import synapse.vendor.cpython.lib.ipaddress as ipaddress
import synapse.vendor.cpython.lib.http.cookies as v_cookies


try:
from yaml import CSafeLoader as Loader
Expand Down Expand Up @@ -1218,6 +1222,17 @@ def trimText(text: str, n: int = 256, placeholder: str = '...') -> str:
assert n > plen
return f'{text[:mlen]}{placeholder}'

def _patch_http_cookies():
'''
Patch stdlib http.cookies._unquote from the 3.11.10 implementation if
the interpreter we are using is not patched for CVE-2024-7592.
'''
if not hasattr(http.cookies, '_QuotePatt'):
return
http.cookies._unquote = v_cookies._unquote

_patch_http_cookies()

# TODO: Switch back to using asyncio.wait_for when we are using py 3.12+
# This is a workaround for a race where asyncio.wait_for can end up
# ignoring cancellation https://github.com/python/cpython/issues/86296
Expand Down
34 changes: 16 additions & 18 deletions synapse/cortex.py
Original file line number Diff line number Diff line change
Expand Up @@ -2240,7 +2240,7 @@ async def _onSetStormCmd(self, cdef):
'''
name = cdef.get('name')
await self._setStormCmd(cdef)
self._setStormCmd(cdef)
self.cmddefs.set(name, cdef)

async def _reqStormCmd(self, cdef):
Expand Down Expand Up @@ -2483,7 +2483,7 @@ def filtercmpr(sode):
async for sodes in self._mergeSodes(layers, genrs, cmprkey_indx, filtercmpr, reverse=reverse):
yield sodes

async def _setStormCmd(self, cdef):
def _setStormCmd(self, cdef):
'''
Note:
No change control or persistence
Expand Down Expand Up @@ -2543,13 +2543,9 @@ def getStorNode(form):
name = cdef.get('name')
self.stormcmds[name] = ctor

await self.fire('core:cmd:change', cmd=name, act='add')

async def _popStormCmd(self, name):
def _popStormCmd(self, name):
self.stormcmds.pop(name, None)

await self.fire('core:cmd:change', cmd=name, act='del')

async def delStormCmd(self, name):
'''
Remove a previously set pure storm command.
Expand All @@ -2575,8 +2571,6 @@ async def _delStormCmd(self, name):
self.cmddefs.pop(name)
self.stormcmds.pop(name, None)

await self.fire('core:cmd:change', cmd=name, act='del')

async def addStormPkg(self, pkgdef, verify=False):
'''
Add the given storm package to the cortex.
Expand Down Expand Up @@ -2630,11 +2624,11 @@ async def _addStormPkg(self, pkgdef):
olddef = self.pkgdefs.get(name, None)
if olddef is not None:
if s_hashitem.hashitem(pkgdef) != s_hashitem.hashitem(olddef):
await self._dropStormPkg(olddef)
self._dropStormPkg(olddef)
else:
return

await self.loadStormPkg(pkgdef)
self.loadStormPkg(pkgdef)
self.pkgdefs.set(name, pkgdef)

self._clearPermDefs()
Expand Down Expand Up @@ -2664,7 +2658,7 @@ async def _delStormPkg(self, name):
if pkgdef is None:
return

await self._dropStormPkg(pkgdef)
self._dropStormPkg(pkgdef)

self._clearPermDefs()

Expand Down Expand Up @@ -2713,7 +2707,7 @@ def getDataModel(self):
async def _tryLoadStormPkg(self, pkgdef):
try:
await self._normStormPkg(pkgdef, validstorm=False)
await self.loadStormPkg(pkgdef)
self.loadStormPkg(pkgdef)

except asyncio.CancelledError: # pragma: no cover TODO: remove once >= py 3.8 only
raise
Expand Down Expand Up @@ -2881,7 +2875,9 @@ async def _normStormPkg(self, pkgdef, validstorm=True):
for configvar in pkgdef.get('configvars', ()):
self._reqStormPkgVarType(pkgname, configvar.get('type'))

async def loadStormPkg(self, pkgdef):
# N.B. This function is intentionally not async in order to prevent possible user race conditions for code
# executing outside of the nexus lock.
def loadStormPkg(self, pkgdef):
'''
Load a storm package into the storm library for this cortex.
Expand Down Expand Up @@ -2911,7 +2907,7 @@ async def loadStormPkg(self, pkgdef):
self.stormmods = stormmods

for cdef in cmds:
await self._setStormCmd(cdef)
self._setStormCmd(cdef)

for gdef in pkgdef.get('graphs', ()):
gdef = copy.deepcopy(gdef)
Expand All @@ -2937,7 +2933,9 @@ async def _onload():
await self.fire('core:pkg:onload:complete', pkg=name)
self.schedCoro(_onload())

async def _dropStormPkg(self, pkgdef):
# N.B. This function is intentionally not async in order to prevent possible user race conditions for code
# executing outside of the nexus lock.
def _dropStormPkg(self, pkgdef):
'''
Reverse the process of loadStormPkg()
'''
Expand All @@ -2948,7 +2946,7 @@ async def _dropStormPkg(self, pkgdef):

for cdef in pkgdef.get('commands', ()):
name = cdef.get('name')
await self._popStormCmd(name)
self._popStormCmd(name)

pkgname = pkgdef.get('name')

Expand Down Expand Up @@ -4435,7 +4433,7 @@ async def _initPureStormCmds(self):

async def _trySetStormCmd(self, name, cdef):
try:
await self._setStormCmd(cdef)
self._setStormCmd(cdef)
except (asyncio.CancelledError, Exception):
logger.exception(f'Storm command load failed: {name}')

Expand Down
7 changes: 6 additions & 1 deletion synapse/exc.py
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,14 @@ def setdefault(self, name, valu):
self.errinfo[name] = valu
self._setExcMesg()

def update(self, items: dict):
'''Update multiple items in the errinfo dict at once.'''
self.errinfo.update(**items)
self._setExcMesg()

class StormRaise(SynErr):
'''
This represents a user provided exception inside of a Storm runtime. It requires a errname key.
This represents a user provided exception raised in the Storm runtime. It requires a errname key.
'''
def __init__(self, *args, **info):
SynErr.__init__(self, *args, **info)
Expand Down
36 changes: 25 additions & 11 deletions synapse/lib/ast.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ def getPosInfo(self):

def addExcInfo(self, exc):
if 'highlight' not in exc.errinfo:
exc.errinfo['highlight'] = self.getPosInfo()
exc.set('highlight', self.getPosInfo())
return exc

def repr(self):
Expand Down Expand Up @@ -3579,10 +3579,23 @@ async def compute(self, runt, path):
kwargs = {k: v for (k, v) in await self.kids[2].compute(runt, path)}

with s_scope.enter({'runt': runt}):
retn = func(*argv, **kwargs)
if s_coro.iscoro(retn):
return await retn
return retn
try:
retn = func(*argv, **kwargs)
if s_coro.iscoro(retn):
return await retn
return retn

except TypeError as e:
mesg = str(e)
if (funcpath := getattr(func, '_storm_funcpath', None)) is not None:
mesg = f"{funcpath}(){mesg.split(')', 1)[1]}"

raise self.addExcInfo(s_exc.StormRuntimeError(mesg=mesg))

except s_exc.SynErr as e:
if getattr(func, '_storm_runtime_lib_func', None) is not None:
e.errinfo.pop('highlight', None)
raise self.addExcInfo(e)

class DollarExpr(Value):
'''
Expand Down Expand Up @@ -4891,8 +4904,9 @@ async def once():

@s_stormtypes.stormfunc(readonly=True)
async def realfunc(*args, **kwargs):
return await self.callfunc(runt, argdefs, args, kwargs)
return await self.callfunc(runt, argdefs, args, kwargs, realfunc._storm_funcpath)

realfunc._storm_funcpath = self.name
await runt.setVar(self.name, realfunc)

count = 0
Expand All @@ -4914,7 +4928,7 @@ def validate(self, runt):
# var scope validation occurs in the sub-runtime
pass

async def callfunc(self, runt, argdefs, args, kwargs):
async def callfunc(self, runt, argdefs, args, kwargs, funcpath):
'''
Execute a function call using the given runtime.
Expand All @@ -4925,7 +4939,7 @@ async def callfunc(self, runt, argdefs, args, kwargs):

argcount = len(args) + len(kwargs)
if argcount > len(argdefs):
mesg = f'{self.name}() takes {len(argdefs)} arguments but {argcount} were provided'
mesg = f'{funcpath}() takes {len(argdefs)} arguments but {argcount} were provided'
raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg))

# Fill in the positional arguments
Expand All @@ -4939,7 +4953,7 @@ async def callfunc(self, runt, argdefs, args, kwargs):
valu = kwargs.pop(name, s_common.novalu)
if valu is s_common.novalu:
if defv is s_common.novalu:
mesg = f'{self.name}() missing required argument {name}'
mesg = f'{funcpath}() missing required argument {name}'
raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg))
valu = defv

Expand All @@ -4950,11 +4964,11 @@ async def callfunc(self, runt, argdefs, args, kwargs):
# used a kwarg not defined.
kwkeys = list(kwargs.keys())
if kwkeys[0] in posnames:
mesg = f'{self.name}() got multiple values for parameter {kwkeys[0]}'
mesg = f'{funcpath}() got multiple values for parameter {kwkeys[0]}'
raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg))

plural = 's' if len(kwargs) > 1 else ''
mesg = f'{self.name}() got unexpected keyword argument{plural}: {",".join(kwkeys)}'
mesg = f'{funcpath}() got unexpected keyword argument{plural}: {",".join(kwkeys)}'
raise self.kids[1].addExcInfo(s_exc.StormRuntimeError(mesg=mesg))

assert len(mergargs) == len(argdefs)
Expand Down
Loading

0 comments on commit 50cf5a0

Please sign in to comment.