Processing Pipelines

This documentation page describes the concepts and classes of pySigma that can be used for transformation of Sigma rules.

Sigma rules are transformed to account for differences between the Sigma rule and the target data model, such as differences in field naming schemes or value representation.

Resolvers

Pipeline resolvers resolve identifiers and file names into a consolidated processing pipeline and take care of the appropriate ordering via the priority property that should be contained in a processing pipeline.

A processing pipeline resolver is a sigma.processing.resolver.ProcessingPipelineResolver object. It is initialized with a mapping between identifiers and sigma.processing.pipeline.ProcessingPipeline objects or callables that return such objects.

The method sigma.processing.resolver.ProcessingPipelineResolver.resolve_pipeline() returns the ProcessingPipeline object that corresponds to the given identifier or is contained in the specified YAML file. sigma.processing.resolver.ProcessingPipelineResolver.resolve() returns a consolidated pipeline with the appropriate ordering as specified by the priority property of the specified pipelines.
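The lookup described above can be modeled with a few lines of plain Python. This is a minimal sketch, not pySigma's implementation: the `Pipeline` dataclass, the `registry` dict and both functions are hypothetical stand-ins, and file name resolution is left out.

```python
from dataclasses import dataclass, field
from typing import Callable, Dict, List, Union


@dataclass
class Pipeline:
    """Hypothetical stand-in for a processing pipeline (not the pySigma class)."""
    name: str
    priority: int = 0
    items: List[str] = field(default_factory=list)


PipelineOrFactory = Union[Pipeline, Callable[[], Pipeline]]


def resolve_pipeline(registry: Dict[str, PipelineOrFactory], spec: str) -> Pipeline:
    """Look up one identifier; call the entry if it is a factory."""
    entry = registry[spec]
    return entry() if callable(entry) else entry


def resolve(registry: Dict[str, PipelineOrFactory], specs: List[str]) -> Pipeline:
    """Resolve all identifiers, order by priority and concatenate the items."""
    resolved = [resolve_pipeline(registry, spec) for spec in specs]
    ordered = sorted(resolved, key=lambda p: p.priority)
    merged = [item for p in ordered for item in p.items]
    return Pipeline(name="consolidated", items=merged)


registry: Dict[str, PipelineOrFactory] = {
    "backend": Pipeline("backend", priority=50, items=["backend_step"]),
    # A callable entry is only evaluated when the identifier is resolved.
    "sysmon": lambda: Pipeline("sysmon", priority=10, items=["sysmon_step"]),
}
consolidated = resolve(registry, ["backend", "sysmon"])
print(consolidated.items)  # ['sysmon_step', 'backend_step']
```

Storing callables instead of objects lets a registry defer the construction of a pipeline until it is actually requested.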

class sigma.processing.resolver.ProcessingPipelineResolver(pipelines: typing.Dict[str, typing.Union[sigma.processing.pipeline.ProcessingPipeline, typing.Callable[[], sigma.processing.pipeline.ProcessingPipeline]]] = <factory>)

A processing pipeline resolver resolves a list of pipeline specifiers into one summarized processing pipeline. It takes care of sorting by priority and resolution of filenames as well as pipeline name identifiers.

add_pipeline_class(pipeline: sigma.processing.pipeline.ProcessingPipeline) -> None

Add named processing pipeline object to resolver. This pipeline can be resolved by the name.

classmethod from_pipeline_list(pipelines: Iterable[sigma.processing.pipeline.ProcessingPipeline]) -> sigma.processing.resolver.ProcessingPipelineResolver

Instantiate processing pipeline resolver from list of pipeline objects.

resolve(pipeline_specs: List[str]) -> sigma.processing.pipeline.ProcessingPipeline

Resolve a list of

  • processing pipeline names from pipelines added to the resolver or

  • file paths containing processing pipeline YAML definitions

into a consolidated processing pipeline.

resolve_pipeline(spec: str) -> sigma.processing.pipeline.ProcessingPipeline

Resolve single processing pipeline.

Processing Pipeline

Classes

class sigma.processing.pipeline.ProcessingPipeline(items: typing.List[sigma.processing.pipeline.ProcessingItem] = <factory>, vars: typing.Dict[str, typing.Any] = <factory>, priority: int = 0, name: typing.Optional[str] = None)

A processing pipeline is configured with the transformation steps that are applied on Sigma rules and are configured by:

  • a backend, to apply a base set of preprocessing steps to Sigma rules (e.g. renaming of fields).

  • the user in one or multiple configurations to conduct further rule transformation to adapt the rule to the environment.

A processing pipeline is instantiated once for a rule collection. Rules are processed in order of their appearance in a rule file or include order. Further, processing pipelines can be chained and contain variables that can be used from processing items.

apply(rule: sigma.rule.SigmaRule) -> sigma.rule.SigmaRule

Apply processing pipeline on Sigma rule.

classmethod from_dict(d: dict) -> sigma.processing.pipeline.ProcessingPipeline

Instantiate processing pipeline from a parsed processing item description.

classmethod from_yaml(processing_pipeline: str) -> sigma.processing.pipeline.ProcessingPipeline

Convert YAML input string into processing pipeline.

class sigma.processing.pipeline.ProcessingItem(transformation: sigma.processing.transformations.Transformation, rule_condition_linking: typing.Callable[[typing.Iterable[bool]], bool] = <built-in function all>, rule_condition_negation: bool = False, rule_conditions: typing.List[sigma.processing.conditions.RuleProcessingCondition] = <factory>, detection_item_condition_linking: typing.Callable[[typing.Iterable[bool]], bool] = <built-in function all>, detection_item_condition_negation: bool = False, detection_item_conditions: typing.List[sigma.processing.conditions.DetectionItemProcessingCondition] = <factory>, identifier: typing.Optional[str] = None)

A processing item consists of an optional condition and a transformation that is applied in the case that the condition evaluates to true against the given Sigma rule or if the condition is not present.

Processing items are instantiated by the processing pipeline for a whole collection that is about to be converted by a backend.

apply(pipeline: sigma.processing.pipeline.ProcessingPipeline, rule: sigma.rule.SigmaRule) -> Tuple[sigma.rule.SigmaRule, bool]

Matches the condition against the rule and performs the transformation if the condition is true or not present. Returns the Sigma rule and a bool that indicates whether the transformation was applied.
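How the linking function and the negation flag interact can be sketched as follows. This is a simplified model under two assumptions that the text above does not spell out: negation is applied to the linked result, and an item without conditions is always applied. The function name `should_apply` is hypothetical.

```python
from typing import Callable, Iterable, List


def should_apply(
    condition_results: List[bool],
    linking: Callable[[Iterable[bool]], bool] = all,
    negate: bool = False,
) -> bool:
    """Decide whether a transformation runs, given the per-condition results."""
    if not condition_results:
        # Assumption: no conditions present means the item always applies.
        return True
    result = linking(condition_results)
    return not result if negate else result


print(should_apply([]))                           # True
print(should_apply([True, False]))                # False, default linking is all()
print(should_apply([True, False], linking=any))   # True
print(should_apply([True, False], negate=True))   # True
```

Passing the built-in `any` instead of `all` turns the conditions from an AND-link into an OR-link, which mirrors the `Callable[[Iterable[bool]], bool]` type of the linking parameters in the signature.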

detection_item_condition_linking()

Return True if bool(x) is True for all values x in the iterable.

If the iterable is empty, return True.

classmethod from_dict(d: dict)

Instantiate processing item from parsed definition and variables.

match_detection_item(pipeline: sigma.processing.pipeline.ProcessingPipeline, detection_item: sigma.rule.SigmaDetectionItem) -> bool

Evaluates the detection item conditions of the processing item against the detection item and returns the result.

rule_condition_linking()

Return True if bool(x) is True for all values x in the iterable.

If the iterable is empty, return True.

Specifying Processing Pipelines as YAML

A processing pipeline can be specified as a YAML file that can be loaded with ProcessingPipeline.from_yaml(yaml) or by specifying a filename to ProcessingPipelineResolver.resolve() or ProcessingPipelineResolver.resolve_pipeline().

The following items are expected on the root level of the YAML file:

  • name: the name of the pipeline.

  • priority: specifies the ordering of the pipeline in case multiple pipelines are concatenated. Lower priorities are used first.

  • transformations: contains a list of transformation items.

Some conventions used for processing pipeline priorities are:

Priority  Description
10        Log source pipelines, e.g. for Sysmon.
20        Pipelines provided by backend packages that should be run before the backend pipeline.
50        Backend pipelines that are integrated in the backend and applied automatically.
60        Backend output format pipelines that are integrated in the backend and applied automatically for the associated output format.

Pipelines with the same priority are applied in the order they were provided. Pipelines without a priority are assumed to have the priority 0.
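The ordering rules above map directly onto a stable sort. The following sketch uses made-up pipeline names and a hypothetical helper `application_order`; it only illustrates the ordering, not how pySigma stores pipelines.

```python
from typing import List, Optional, Tuple

# (name, priority) pairs in the order they were provided; None means "not set".
provided: List[Tuple[str, Optional[int]]] = [
    ("user_config", 100),
    ("backend", 50),
    ("no_priority", None),
    ("sysmon", 10),
    ("backend_addon", 50),
]


def application_order(pipelines: List[Tuple[str, Optional[int]]]) -> List[str]:
    """Sort by priority; a missing priority counts as 0, ties keep input order."""
    ordered = sorted(pipelines, key=lambda p: 0 if p[1] is None else p[1])
    return [name for name, _ in ordered]


print(application_order(provided))
# ['no_priority', 'sysmon', 'backend', 'backend_addon', 'user_config']
```

Python's `sorted()` is stable, so the two pipelines with priority 50 keep the order in which they were provided.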

Transformation items are defined as a map as follows:

  • id: the identifier of the item. This is also tracked at detection item or condition level and can be used in future conditions.

  • type: the type of the transformation, as specified in the identifier-to-class mappings in the Transformations section below.

  • Arbitrary transformation parameters are specified at the same level.

  • rule_conditions or detection_item_conditions: conditions of the type corresponding to the name.

Conditions are specified as follows:

  • type: defines the condition type. It must be one of the identifiers that are defined in Conditions

  • Arbitrary condition parameters are specified on the same level.

Example:

name: Custom Sysmon field naming
priority: 100
transformations:
    - id: field_mapping
      type: field_name_mapping
      mapping:
        CommandLine: command_line
      rule_conditions:
        - type: logsource
          service: sysmon
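To make the "parameters on the same level" convention concrete, the sketch below shows the YAML example above in the dict form a YAML parser would return and separates the structural keys of a transformation item from its parameters. The `RESERVED` set and the `split_item` helper are hypothetical and only cover the keys named in this section; pySigma itself may recognize further keys.

```python
from typing import Any, Dict, Tuple

# Dict form of the YAML example above, as a YAML parser would return it.
pipeline_dict: Dict[str, Any] = {
    "name": "Custom Sysmon field naming",
    "priority": 100,
    "transformations": [
        {
            "id": "field_mapping",
            "type": "field_name_mapping",
            "mapping": {"CommandLine": "command_line"},
            "rule_conditions": [{"type": "logsource", "service": "sysmon"}],
        }
    ],
}

# Keys with a structural meaning; everything else is a transformation parameter.
RESERVED = {"id", "type", "rule_conditions", "detection_item_conditions"}


def split_item(item: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return the transformation type and its parameters (all non-reserved keys)."""
    params = {key: value for key, value in item.items() if key not in RESERVED}
    return item["type"], params


item_type, params = split_item(pipeline_dict["transformations"][0])
print(item_type)  # field_name_mapping
print(params)     # {'mapping': {'CommandLine': 'command_line'}}
```

The same split applies to conditions: in `{"type": "logsource", "service": "sysmon"}`, `type` selects the condition class and `service` is passed on as its parameter.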

Conditions

There are two types of conditions: rule conditions, which are evaluated against the whole rule, and detection item conditions, which are evaluated for each detection item.

Rule Conditions

Rule Condition Identifiers

Identifier               Class
logsource                LogsourceCondition
contains_detection_item  RuleContainsDetectionItemCondition
processing_item_applied  RuleProcessingItemAppliedCondition

class sigma.processing.conditions.LogsourceCondition(category: Optional[str] = None, product: Optional[str] = None, service: Optional[str] = None)

Matches the log source of a rule. Log source fields that are not specified are ignored.
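The "unspecified fields are ignored" rule can be sketched as a plain function. This is an illustration only: the function name `logsource_matches` and the use of a dict for the rule's log source are assumptions, not pySigma's data model.

```python
from typing import Dict, Optional


def logsource_matches(
    rule_logsource: Dict[str, str],
    category: Optional[str] = None,
    product: Optional[str] = None,
    service: Optional[str] = None,
) -> bool:
    """Compare only the fields the condition specifies; None means 'ignore'."""
    wanted = {"category": category, "product": product, "service": service}
    return all(
        rule_logsource.get(name) == value
        for name, value in wanted.items()
        if value is not None
    )


rule = {"product": "windows", "service": "sysmon"}
print(logsource_matches(rule, service="sysmon"))                   # True
print(logsource_matches(rule, product="linux", service="sysmon"))  # False
print(logsource_matches(rule))                                     # True
```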

class sigma.processing.conditions.RuleContainsDetectionItemCondition(field: Optional[str], value: Union[str, int, float, bool])

Returns True if rule contains a detection item that matches the given field name and value.

class sigma.processing.conditions.RuleProcessingItemAppliedCondition(processing_item_id: str)

Checks if processing item was applied to rule.

Detection Item Conditions

Detection Item Identifiers

Identifier               Class
include_fields           IncludeFieldCondition
exclude_fields           ExcludeFieldCondition
match_string             MatchStringCondition
processing_item_applied  DetectionItemProcessingItemAppliedCondition

class sigma.processing.conditions.IncludeFieldCondition(fields: List[str], type: Literal['plain', 're'] = 'plain')

Matches on field name if it is contained in fields list. The parameter ‘type’ determines if field names are matched as plain string (“plain”) or regular expressions (“re”).

class sigma.processing.conditions.ExcludeFieldCondition(fields: List[str], type: Literal['plain', 're'] = 'plain')

Matches on field name if it is not contained in fields list.
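The two field conditions are complements of each other. The sketch below models them with hypothetical helper functions; it assumes that regular expressions are matched from the start of the field name with `re.match`, which this page does not state explicitly.

```python
import re
from typing import List


def field_in_list(field: str, fields: List[str], match_type: str = "plain") -> bool:
    """Check a field name against a list of plain names or regular expressions."""
    if match_type == "plain":
        return field in fields
    if match_type == "re":
        # Assumption: patterns are anchored at the start of the field name only.
        return any(re.match(pattern, field) for pattern in fields)
    raise ValueError(f"unknown match type: {match_type}")


def include_matches(field: str, fields: List[str], match_type: str = "plain") -> bool:
    return field_in_list(field, fields, match_type)


def exclude_matches(field: str, fields: List[str], match_type: str = "plain") -> bool:
    return not field_in_list(field, fields, match_type)


print(include_matches("CommandLine", ["CommandLine", "Image"]))  # True
print(include_matches("ParentImage", ["Image"]))                 # False
print(include_matches("ParentImage", [".*Image"], "re"))         # True
print(exclude_matches("ParentImage", ["Image"]))                 # True
```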

class sigma.processing.conditions.MatchStringCondition(cond: Literal['any', 'all'], pattern: str, negate: bool = False)

Match string values with a regular expression ‘pattern’. The parameter ‘cond’ determines for detection items with multiple values if any or all strings must match. Generally, values which aren’t strings are skipped in any mode or result in a false result in all match mode.
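The interplay of `cond` and non-string values can be sketched as follows. The function `match_values` is hypothetical, the `negate` parameter is left out, and matching with `re.match` is an assumption.

```python
import re
from typing import Any, List


def match_values(values: List[Any], pattern: str, cond: str) -> bool:
    """Apply a regular expression to all values of one detection item."""
    regex = re.compile(pattern)

    def match_value(value: Any) -> bool:
        # Values that are not strings never match.
        return isinstance(value, str) and regex.match(value) is not None

    results = [match_value(value) for value in values]
    if cond == "any":
        return any(results)
    if cond == "all":
        return all(results)
    raise ValueError(f"unknown cond: {cond}")


values = ["cmd.exe", 4]
print(match_values(values, r".*\.exe$", "any"))  # True, the integer is skipped
print(match_values(values, r".*\.exe$", "all"))  # False, the integer cannot match
```

Because a non-string value yields False, it cannot contribute a match in "any" mode and forces a False result in "all" mode, which is the behaviour described above.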

class sigma.processing.conditions.DetectionItemProcessingItemAppliedCondition(processing_item_id: str)

Checks if processing item was applied to detection item.

Base Classes

Base classes must be overridden to implement new conditions that can be used in processing pipelines. In addition, the new class should be mapped to an identifier. This allows the condition to be used from processing pipelines defined in YAML files. The mapping is done in the dict rule_conditions or detection_item_conditions in the sigma.processing.conditions package for the respective condition types. This is not necessary for conditions that are only used privately and not distributed via the main pySigma distribution.

class sigma.processing.conditions.RuleProcessingCondition

Base for Sigma rule processing condition classes used in processing pipelines.

class sigma.processing.conditions.DetectionItemProcessingCondition

Base for Sigma detection item processing condition classes used in processing pipelines.

class sigma.processing.conditions.ValueProcessingCondition(cond: Literal['any', 'all'])

Base class for conditions on values in detection items. The ‘cond’ parameter determines if any or all values of a multivalued detection item must match to result in an overall match.

The method match_value is called for each value and must return a bool result. It should reject values which are incompatible with the condition with a False return value.

Transformations

Implemented Transformations

The following transformations with their corresponding identifiers for usage in YAML-based pipeline definitions are available:

Transformation Identifiers

Identifier                     Class
field_name_mapping             FieldMappingTransformation
field_name_suffix              AddFieldnameSuffixTransformation
field_name_prefix              AddFieldnamePrefixTransformation
drop_detection_item            DropDetectionItemTransformation
wildcard_placeholders          WildcardPlaceholderTransformation
value_placeholders             ValueListPlaceholderTransformation
query_expression_placeholders  QueryExpressionPlaceholderTransformation
add_condition                  AddConditionTransformation
change_logsource               ChangeLogsourceTransformation
replace_string                 ReplaceStringTransformation
rule_failure                   RuleFailureTransformation
detection_item_failure         DetectionItemFailureTransformation

class sigma.processing.transformations.FieldMappingTransformation(mapping: Dict[str, Union[str, List[str]]])

Map a field name to one or multiple different field names.

class sigma.processing.transformations.AddFieldnameSuffixTransformation(suffix: str)

Add field name suffix.

class sigma.processing.transformations.AddFieldnamePrefixTransformation(prefix: str)

Add field name prefix.
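The effect of the three field name transformations on a single field name can be sketched with plain functions. These helpers are hypothetical and leave out how the resulting names are written back into the rule; that unmapped fields stay unchanged is an assumption.

```python
from typing import Dict, List, Union


def map_field(mapping: Dict[str, Union[str, List[str]]], field: str) -> List[str]:
    """Return the target field name(s); unmapped fields are kept (assumption)."""
    target = mapping.get(field, field)
    return [target] if isinstance(target, str) else list(target)


def add_suffix(field: str, suffix: str) -> str:
    return field + suffix


def add_prefix(field: str, prefix: str) -> str:
    return prefix + field


mapping = {"CommandLine": "command_line", "Image": ["process.path", "process.name"]}
print(map_field(mapping, "CommandLine"))  # ['command_line']
print(map_field(mapping, "Image"))        # ['process.path', 'process.name']
print(map_field(mapping, "User"))         # ['User']
print(add_prefix("Image", "winlog."))     # winlog.Image
print(add_suffix("Image", ".keyword"))    # Image.keyword
```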

class sigma.processing.transformations.DropDetectionItemTransformation

Deletes detection items. This should only be used in combination with a detection item condition.

class sigma.processing.transformations.WildcardPlaceholderTransformation(include: Optional[List[str]] = None, exclude: Optional[List[str]] = None)

Replaces placeholders with wildcards. This transformation is useful if remaining placeholders should be replaced with something meaningful to make conversion of rules possible without defining the placeholders' content.

class sigma.processing.transformations.ValueListPlaceholderTransformation(include: Optional[List[str]] = None, exclude: Optional[List[str]] = None)

Replaces placeholders with values contained in variables defined in the configuration.

class sigma.processing.transformations.QueryExpressionPlaceholderTransformation(include: typing.Optional[typing.List[str]] = None, exclude: typing.Optional[typing.List[str]] = None, expression: str = '', mapping: typing.Dict[str, str] = <factory>)

Replaces a placeholder with a plain query containing the placeholder or an identifier mapped from the placeholder name. The main purpose is the generation of arbitrary list lookup expressions which are passed to the resulting query.

Parameters:

  • expression: string that contains the query expression with {field} and {id} placeholders, where the placeholder identifier or a mapped identifier is inserted.

  • mapping: mapping between placeholders and identifiers that should be used in the expression. If no mapping is provided, the placeholder name is used.
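The two parameters combine roughly as in the following sketch. The helper `render_placeholder`, the query syntax in the example expression and the lookup name are all made up for illustration; the sketch assumes that the expression is filled in with Python's `str.format()`.

```python
from typing import Dict, Optional


def render_placeholder(
    expression: str,
    field: str,
    placeholder: str,
    mapping: Optional[Dict[str, str]] = None,
) -> str:
    """Insert the field name and the (optionally mapped) placeholder identifier."""
    identifier = (mapping or {}).get(placeholder, placeholder)
    return expression.format(field=field, id=identifier)


expression = "{field} in list({id})"
print(render_placeholder(expression, "user", "admins", {"admins": "admin_lookup"}))
# user in list(admin_lookup)
print(render_placeholder(expression, "user", "admins"))
# user in list(admins)
```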

class sigma.processing.transformations.AddConditionTransformation(conditions: typing.Dict[str, str] = <factory>, name: typing.Optional[str] = None, template: bool = False)

Add a condition expression that is AND-linked to the rule conditions.

If template is set to True the condition values are interpreted as string templates and the following placeholders are replaced:

  • $category, $product and $service: with the corresponding values of the Sigma rule log source.
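The template mode can be illustrated with `string.Template` from the standard library, whose `$name` syntax matches the placeholders listed above. Whether pySigma uses exactly this class is an assumption, and the helper `render_conditions` and the `index` field are hypothetical.

```python
from string import Template
from typing import Dict, Optional


def render_conditions(
    conditions: Dict[str, str],
    category: Optional[str] = None,
    product: Optional[str] = None,
    service: Optional[str] = None,
) -> Dict[str, str]:
    """Replace $category, $product and $service in all condition values."""
    logsource = {"category": category, "product": product, "service": service}
    # Only substitute values that are set; unknown placeholders stay untouched.
    known = {key: value for key, value in logsource.items() if value is not None}
    return {
        field: Template(value).safe_substitute(known)
        for field, value in conditions.items()
    }


print(render_conditions({"index": "$product-$service"}, product="windows", service="sysmon"))
# {'index': 'windows-sysmon'}
print(render_conditions({"index": "$product-$service"}, product="windows"))
# {'index': 'windows-$service'}
```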

class sigma.processing.transformations.ChangeLogsourceTransformation(category: Optional[str] = None, product: Optional[str] = None, service: Optional[str] = None)

Replace log source as defined in transformation parameters.

class sigma.processing.transformations.ReplaceStringTransformation(regex: str, replacement: str)

Replace the part of a string that is matched by the regular expression with a replacement string that can reference capture groups. It operates on the plain string representation of the SigmaString value.

This is basically an interface to re.sub() and can use all features available there.
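Since the transformation is described as an interface to `re.sub()`, its effect on a single plain string can be tried out directly. The wrapper function `replace_string` and the example values are illustrative only.

```python
import re


def replace_string(value: str, regex: str, replacement: str) -> str:
    """Replace every match of regex in value; replacement may use capture groups."""
    return re.sub(regex, replacement, value)


# \1 refers to the first capture group, here the executable name without suffix.
print(replace_string("powershell.exe -enc AAAA", r"^(\w+)\.exe", r"\1"))
# powershell -enc AAAA
```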

class sigma.processing.transformations.RuleFailureTransformation(message: str)

Raise a SigmaTransformationError with the provided message. This enables transformation pipelines to signal that a certain situation can’t be handled, e.g. only a subset of values is allowed because the target data model doesn’t offer all possibilities.

class sigma.processing.transformations.DetectionItemFailureTransformation(message: str)

Raise a SigmaTransformationError with the provided message. This enables transformation pipelines to signal that a certain situation can’t be handled, e.g. only a subset of values is allowed because the target data model doesn’t offer all possibilities.

Base Classes

There are four transformation base classes that can be derived from to implement transformations on particular parts of a Sigma rule or the whole Sigma rule:

class sigma.processing.transformations.Transformation

Base class for processing steps used in pipelines. Override apply with transformation that is applied to the whole rule.

class sigma.processing.transformations.DetectionItemTransformation

Iterates over all detection items of a Sigma rule and calls the apply_detection_item method for each of them if the detection item condition associated with the processing item evaluates to true. It also takes care to recurse into detections nested into detections.

The apply_detection_item method can directly change the detection or return a replacement object, which can be a SigmaDetection or a SigmaDetectionItem.

The processing item is automatically added to the applied items of the detection items if a replacement value was returned. Otherwise, the apply_detection_item method must take care of this itself so that conditional decisions in the processing pipeline work. This can be done with the detection_item_applied() method.

A detection item transformation also marks the item as unconvertible to plain data types.

class sigma.processing.transformations.ValueTransformation

Iterates over all values in all detection items of a Sigma rule and calls the apply_value method for each of them. The apply_value method can return a single value or a list of values, which are inserted into the value list, or None if the original value should be passed through. An empty list should be returned by apply_value to drop the value from the transformed results.
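The return value contract of apply_value can be modeled on a plain list of values. This is a sketch of the described contract, not of the pySigma class: `transform_values` and `example_apply_value` are hypothetical names, and real values are Sigma type objects rather than Python strings and integers.

```python
from typing import Any, Callable, List


def transform_values(values: List[Any], apply_value: Callable[[Any], Any]) -> List[Any]:
    """Build the new value list from the results of apply_value."""
    result: List[Any] = []
    for value in values:
        replacement = apply_value(value)
        if replacement is None:
            result.append(value)        # pass the original value through
        elif isinstance(replacement, list):
            result.extend(replacement)  # an empty list drops the value
        else:
            result.append(replacement)  # a single replacement value
    return result


def example_apply_value(value: Any) -> Any:
    if value == "drop-me":
        return []
    if value == "admin":
        return ["admin", "root"]
    if isinstance(value, str) and value.isupper():
        return value.lower()
    return None


print(transform_values(["ADMIN", "admin", "drop-me", 5], example_apply_value))
# ['admin', 'admin', 'root', 5]
```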

class sigma.processing.transformations.ConditionTransformation

Iterates over all rule conditions and calls the apply_condition method for each condition. Automatically takes care of marking condition as applied by processing item.

Transformation Tracking

tbd