Processing Pipelines¶
This documentation page describes the concepts and classes of pySigma that can be used for transformation of Sigma rules.
Sigma rules are transformed to account for differences between the Sigma rule and the target data model. Examples are differences in field naming schemes or value representation.
A processing pipeline has three stages:
Rule pre-processing: transformations that are applied to the rule. Example: field name mapping, adding conditions.
Query post-processing: transformations that are applied to the generated query. In this stage the transformations have access to the query generated by the backend and the rule that was the source of the conversion. Example: embedding query and rule parts in a template to define custom output formats.
Output finalization: finalizers operate on all post-processed queries to generate the final output. Example: merge all queries and add a header to the output.
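The three stages can be pictured with a small stand-alone sketch. It does not use pySigma; the function names, the dict-based rule and the query syntax are assumptions made only for illustration.

```python
from typing import Dict, List


def preprocess_rule(detection: Dict[str, str], field_mapping: Dict[str, str]) -> Dict[str, str]:
    """Stage 1 (rule pre-processing): rename fields to the target data model."""
    return {field_mapping.get(field, field): value for field, value in detection.items()}


def generate_query(detection: Dict[str, str]) -> str:
    """Stand-in for a backend: render field="value" pairs joined by AND."""
    return " AND ".join(f'{field}="{value}"' for field, value in detection.items())


def postprocess_query(query: str, title: str) -> str:
    """Stage 2 (query post-processing): embed the query together with rule information."""
    return f"# {title}\n{query}"


def finalize(queries: List[str]) -> str:
    """Stage 3 (output finalization): merge all queries and add a header."""
    return "# generated queries\n" + "\n".join(queries)


rule = {"title": "Whoami execution", "detection": {"CommandLine": "whoami"}}
mapped = preprocess_rule(rule["detection"], {"CommandLine": "command_line"})
query = postprocess_query(generate_query(mapped), rule["title"])
output = finalize([query])
```

Each stage only sees the result of the previous one, which is why query post-processing can combine rule attributes with the query while finalization works on the whole list of queries.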
Further resources:
SigmaHQ Blog Post: Connecting Sigma Rule Sets to your Environment with Processing Pipelines
SigmaHQ Blog Post: Introducing Query Post-Processing and Output Finalization to Processing Pipelines
Resolvers¶
Pipeline resolvers resolve identifiers and file names into a consolidated processing pipeline and take care of the appropriate ordering via the priority property that should be contained in a processing pipeline.
A processing pipeline resolver is a sigma.processing.resolver.ProcessingPipelineResolver object. It is initialized with a mapping between identifiers and sigma.processing.pipeline.ProcessingPipeline objects or callables that return such objects.
The method sigma.processing.resolver.ProcessingPipelineResolver.resolve_pipeline() returns a ProcessingPipeline object corresponding to the given identifier or contained in the specified YAML file. sigma.processing.resolver.ProcessingPipelineResolver.resolve() returns a consolidated pipeline with the appropriate ordering as specified by the priority property of the specified pipelines.
- class sigma.processing.resolver.ProcessingPipelineResolver(pipelines: Dict[str, ProcessingPipeline | Callable[[], ProcessingPipeline]] = <factory>)¶
A processing pipeline resolver resolves a list of pipeline specifiers into one summarized processing pipeline. It takes care of sorting by priority and resolution of filenames as well as pipeline name identifiers.
- add_pipeline_class(pipeline: ProcessingPipeline) → None ¶
Add named processing pipeline object to resolver. This pipeline can be resolved by the name.
- classmethod from_pipeline_list(pipelines: Iterable[ProcessingPipeline]) → ProcessingPipelineResolver ¶
Instantiate processing pipeline resolver from list of pipeline objects.
- list_pipelines() → Iterable[Tuple[str, ProcessingPipeline]] ¶
List identifier/processing pipeline tuples.
- resolve(pipeline_specs: List[str], target: str | None = None) → ProcessingPipeline ¶
Resolve a list of
processing pipeline names from pipelines added to the resolver or
file paths containing processing pipeline YAML definitions or
directories containing processing pipelines YAML definitions
into a consolidated processing pipeline.
If target is specified this is passed in each resolve_pipeline call to perform a compatibility check for the usage of the specified backend with the pipeline.
- resolve_pipeline(spec: str, target: str | None = None) → ProcessingPipeline ¶
Resolve single processing pipeline. It first tries to find a pipeline with this identifier in the registered pipelines. If this fails, spec is treated as file name. If this fails too, a SigmaPipelineNotFoundError is raised.
If target is specified, an additional check of the compatibility of the specified backend to the resolved pipeline is conducted. A SigmaPipelineNotAllowedForBackendError is raised if this check fails.
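The lookup order described for resolve_pipeline can be sketched as follows. This is a simplified model, not the pySigma implementation: pipelines are represented as plain dicts, PipelineNotFound is a hypothetical stand-in for SigmaPipelineNotFoundError, and the file branch only records the path instead of parsing YAML.

```python
from pathlib import Path
from typing import Callable, Dict, Union


class PipelineNotFound(Exception):
    """Hypothetical stand-in for SigmaPipelineNotFoundError."""


def resolve_spec(spec: str, registered: Dict[str, Union[dict, Callable[[], dict]]]) -> dict:
    # 1. Try the identifier among the registered pipelines.
    if spec in registered:
        pipeline = registered[spec]
        # Entries may be pipeline objects or callables that return them.
        return pipeline() if callable(pipeline) else pipeline
    # 2. Otherwise treat the spec as a file name.
    path = Path(spec)
    if path.is_file():
        # A real resolver would parse the YAML content here.
        return {"name": path.stem, "source": str(path)}
    # 3. Neither lookup succeeded.
    raise PipelineNotFound(spec)


registered = {
    "sysmon": {"name": "sysmon", "priority": 10},
    "custom": lambda: {"name": "custom", "priority": 100},
}
```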
Processing Pipeline¶
Classes¶
- class sigma.processing.pipeline.ProcessingPipeline(items: List[ProcessingItem] = <factory>, postprocessing_items: List[QueryPostprocessingItem] = <factory>, finalizers: List[Finalizer] = <factory>, vars: Dict[str, Any] = <factory>, priority: int = 0, name: str | None = None, allowed_backends: FrozenSet[str] = <factory>)¶
A processing pipeline is configured with the transformation steps that are applied on Sigma rules and are configured by:
a backend to apply a set of base preprocessing of Sigma rules (e.g. renaming of fields).
the user in one or multiple configurations to conduct further rule transformation to adapt the rule to the environment.
A processing pipeline is instantiated once for a rule collection. Rules are processed in order of their appearance in a rule file or include order. Further, processing pipelines can be chained and contain variables that can be used from processing items.
- apply(rule: SigmaRule | SigmaCorrelationRule) → SigmaRule | SigmaCorrelationRule ¶
Apply processing pipeline on Sigma rule.
- field_was_processed_by(field: str | None, processing_item_id: str) → bool ¶
Check if field name was processed by a particular processing item.
- classmethod from_dict(d: dict) → ProcessingPipeline ¶
Instantiate processing pipeline from a parsed processing item description.
- classmethod from_yaml(processing_pipeline: str) → ProcessingPipeline ¶
Convert YAML input string into processing pipeline.
- postprocess_query(rule: SigmaRule | SigmaCorrelationRule, query: Any) → Any ¶
Post-process queries with postprocessing_items.
- track_field_processing_items(src_field: str, dest_field: List[str], processing_item_id: str | None) → None ¶
Track processing items that were applied to field names. This adds the processing_item_id to the set of applied processing items from src_field and assigns a copy of this set as tracking set to all fields in dest_field.
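The tracking behavior described above can be modeled with a dictionary of sets. This is a sketch under assumptions: the module-level field_tracking store is hypothetical, and skipping a processing_item_id of None is a guess at how the optional parameter is handled.

```python
from collections import defaultdict
from typing import DefaultDict, List, Optional, Set

# field name -> identifiers of the processing items that were applied to it
field_tracking: DefaultDict[str, Set[str]] = defaultdict(set)


def track_field_processing_items(
    src_field: str, dest_fields: List[str], processing_item_id: Optional[str]
) -> None:
    applied = field_tracking[src_field]
    if processing_item_id is not None:  # assumption: None adds nothing
        applied.add(processing_item_id)
    # Every destination field receives its own copy of the tracking set.
    for dest in dest_fields:
        field_tracking[dest] = applied.copy()


def field_was_processed_by(field: str, processing_item_id: str) -> bool:
    return processing_item_id in field_tracking.get(field, set())


# CommandLine is mapped to two fields, one of which is mapped again later.
track_field_processing_items("CommandLine", ["command_line", "cmdline"], "map1")
track_field_processing_items("cmdline", ["cmd"], "map2")
```

Because each destination field gets a copy, the later mapping of cmdline does not leak into the history of command_line.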
- class sigma.processing.pipeline.ProcessingItem(transformation: Transformation, rule_condition_linking: Callable[[Iterable[bool]], bool] = <built-in function all>, rule_condition_negation: bool = False, rule_conditions: List[RuleProcessingCondition] = <factory>, identifier: str | None = None, detection_item_condition_linking: Callable[[Iterable[bool]], bool] = <built-in function all>, detection_item_condition_negation: bool = False, detection_item_conditions: List[DetectionItemProcessingCondition] = <factory>, field_name_condition_linking: Callable[[Iterable[bool]], bool] = <built-in function all>, field_name_condition_negation: bool = False, field_name_conditions: List[FieldNameProcessingCondition] = <factory>)¶
A processing item consists of an optional condition and a transformation that is applied in the case that the condition evaluates to true against the given Sigma rule or if the condition is not present.
Processing items are instantiated by the processing pipeline for a whole collection that is about to be converted by a backend.
- apply(rule: SigmaRule | SigmaCorrelationRule) → bool ¶
Matches condition against rule and performs transformation if condition is true or not present. Returns a bool that indicates whether the transformation was applied.
- detection_item_condition_linking()¶
Return True if bool(x) is True for all values x in the iterable.
If the iterable is empty, return True.
- field_name_condition_linking()¶
Return True if bool(x) is True for all values x in the iterable.
If the iterable is empty, return True.
- classmethod from_dict(d: dict)¶
Instantiate processing item from parsed definition and variables.
- match_detection_item(detection_item: SigmaDetectionItem) → bool ¶
Evaluates detection item and field name conditions from processing item to detection item and returns result.
- match_field_in_value(value: SigmaType) → bool ¶
Evaluate field name conditions in field reference values and return result.
- match_field_name(field: str | None) → bool ¶
Evaluate field name conditions on field names and return result.
Specifying Processing Pipelines as YAML¶
A processing pipeline can be specified as a YAML file that can be loaded with ProcessingPipeline.from_yaml(yaml) or by specifying a filename to ProcessingPipelineResolver.resolve() or ProcessingPipelineResolver.resolve_pipeline().
The following items are expected on the root level of the YAML file:
name: the name of the pipeline.
priority: specifies the ordering of the pipeline in case multiple pipelines are concatenated. Lower priorities are used first.
transformations: contains a list of transformation items for the rule pre-processing stage.
postprocessing: contains a list of transformation items for the query post-processing stage.
finalizers: contains a list of transformation items for the output finalization stage.
Some conventions used for processing pipeline priorities are:
Priority | Description |
---|---|
10 | Log source pipelines like for Sysmon. |
20 | Pipelines provided by backend packages that should be run before the backend pipeline. |
50 | Backend pipelines that are integrated in the backend and applied automatically. |
60 | Backend output format pipelines that are integrated in the backend and applied automatically for the associated output format. |
Pipelines with the same priority are applied in the order they were provided. Pipelines without a priority are assumed to have the priority 0.
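The ordering rules can be illustrated with a stable sort. PipelineStub is a hypothetical stand-in for a pipeline object and only carries the two attributes that matter here.

```python
from dataclasses import dataclass
from typing import List


@dataclass
class PipelineStub:
    name: str
    priority: int = 0  # pipelines without a priority are assumed to have priority 0


def order_pipelines(pipelines: List[PipelineStub]) -> List[PipelineStub]:
    # sorted() is stable: pipelines with equal priority keep the order
    # in which they were provided. Lower priorities come first.
    return sorted(pipelines, key=lambda pipeline: pipeline.priority)


provided = [
    PipelineStub("backend", 50),
    PipelineStub("sysmon", 10),
    PipelineStub("custom_a"),
    PipelineStub("output_format", 60),
    PipelineStub("custom_b"),
]
ordered = [pipeline.name for pipeline in order_pipelines(provided)]
```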
Transformation items are defined as a map as follows:
id: the identifier of the item. This is also tracked at detection item or condition level and can be used in future conditions.
type: the type of the transformation as specified in the identifier to class mappings below: Transformations
Arbitrary transformation parameters are specified at the same level.
rule_conditions, detection_item_conditions, field_name_conditions: conditions of the type corresponding to the name. This can be a list of unnamed conditions that are logically linked with the same operator specified in *_cond_op or named conditions that are referenced in the *_cond_expr attribute.
Conditions are specified as follows:
type: defines the condition type. It must be one of the identifiers that are defined in Conditions
rule_cond_op, detection_item_cond_op, field_name_cond_op: boolean operator for the condition result. Must be one of or or and. Defaults to and. Alternatively,
rule_cond_expr, detection_item_cond_expr, field_name_cond_expr: specify a boolean expression that references named condition items.
rule_cond_not, detection_item_cond_not, field_name_cond_not: if set to True, the condition result is negated.
Arbitrary conditions parameters are specified on the same level.
Specification of an operator and expression is mutually exclusive.
Example:
name: Custom Sysmon field naming
priority: 100
transformations:
    - id: field_mapping
      type: field_name_mapping
      mapping:
        CommandLine: command_line
      rule_conditions:
        - type: logsource
          service: sysmon
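Once parsed, the YAML example above corresponds to a nested structure of dicts and lists like the one below. Such a parsed description is what ProcessingPipeline.from_dict() is documented to accept; the variable name pipeline_definition is only illustrative.

```python
# Parsed form of the example pipeline: root-level keys, one transformation
# item, and a single unnamed rule condition on the log source.
pipeline_definition = {
    "name": "Custom Sysmon field naming",
    "priority": 100,
    "transformations": [
        {
            "id": "field_mapping",
            "type": "field_name_mapping",
            # transformation parameters sit on the same level as id and type
            "mapping": {"CommandLine": "command_line"},
            "rule_conditions": [
                {"type": "logsource", "service": "sysmon"},
            ],
        }
    ],
}
```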
Conditions¶
Added in version 0.8.0: Field name conditions.
There are three types of conditions:
Rule conditions are evaluated against the whole rule. They are defined in the rule_conditions attribute of a ProcessingItem. These can be applied in the rule pre-processing stage and the query post-processing stage. These conditions are evaluated for all transformations.
Detection item conditions are evaluated for each detection item. They are defined in the detection_item_conditions attribute of a ProcessingItem. These can only be applied in the rule pre-processing stage. These conditions are only evaluated for transformations that operate on detection items as well as for field name transformations in the context of detection items.
Field name conditions are evaluated for field names that can be located in detection items, in the field name list of a Sigma rule and in field name references inside of values. They are defined in the field_name_conditions attribute of a ProcessingItem. These can only be applied in the rule pre-processing stage and are evaluated only for transformations that operate on field names.
Conditions can be specified either as an unnamed list, whose entries are logically linked with the operator specified in the *_condition_linking attributes, or as a dict of named conditions that are referenced in the *_condition_expression.
In addition to the *_conditions attributes of ProcessingItem objects, there are further attributes that control the condition matching behavior:
rule_condition_linking, detection_item_condition_linking and field_name_condition_linking: one of any or all functions. Controls if one or all of the conditions from the list must match to result in an overall match.
rule_condition_expression, detection_item_condition_expression and field_name_condition_expression: a boolean expression that references named condition items.
rule_condition_negation, detection_item_condition_negation and field_name_condition_negation: if set to True, the condition result is negated.
The results of the evaluation of different condition types are and-linked. E.g. if a processing item contains rule and field name conditions, both must evaluate to True to get the overall result of True.
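How linking, negation and the and-linking of condition types interact can be sketched with plain booleans. The evaluate helper is hypothetical and works on already computed condition results rather than on real condition objects.

```python
from typing import Callable, Iterable, List


def evaluate(
    results: List[bool],
    linking: Callable[[Iterable[bool]], bool] = all,
    negation: bool = False,
) -> bool:
    """Link the results of one condition type with any/all, then optionally negate."""
    matched = linking(results)
    return not matched if negation else matched


# Two rule conditions linked with any(): one match is enough.
rule_match = evaluate([True, False], linking=any)
# Two field name conditions linked with all() and negated afterwards.
field_name_match = evaluate([True, True], negation=True)
# Results of different condition types are and-linked.
overall = rule_match and field_name_match
```

With the default all linking, an empty condition list evaluates to True, which matches the behavior that a transformation is applied when no condition is present.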
Rule Conditions¶
Identifier | Class |
---|---|
logsource | LogsourceCondition |
contains_detection_item | RuleContainsDetectionItemCondition |
processing_item_applied | RuleProcessingItemAppliedCondition |
processing_state | RuleProcessingStateCondition |
is_sigma_rule | IsSigmaRuleCondition |
is_sigma_correlation_rule | IsSigmaCorrelationRuleCondition |
rule_attribute | RuleAttributeCondition |
tag | RuleTagCondition |
- class sigma.processing.conditions.LogsourceCondition(category: str | None = None, product: str | None = None, service: str | None = None)¶
Matches log source on rule. Not specified log source fields are ignored. For Correlation rules, the condition returns true if any of the associated rules have the required log source fields.
- class sigma.processing.conditions.RuleContainsDetectionItemCondition(field: str | None, value: str | int | float | bool)¶
Returns True if rule contains a detection item that matches the given field name and value.
- class sigma.processing.conditions.RuleProcessingItemAppliedCondition(processing_item_id: str)¶
Checks if processing item was applied to rule.
- class sigma.processing.conditions.RuleProcessingStateCondition(key: str, val: str | int | float | bool, op: Literal['eq', 'ne', 'gte', 'gt', 'lte', 'lt'] = 'eq')¶
Matches on processing pipeline state.
- class sigma.processing.conditions.IsSigmaRuleCondition¶
Checks if rule is a SigmaRule.
- class sigma.processing.conditions.IsSigmaCorrelationRuleCondition¶
Checks if rule is a SigmaCorrelationRule.
- class sigma.processing.conditions.RuleAttributeCondition(attribute: str, value: str | int | float, op: Literal['eq', 'ne', 'gte', 'gt', 'lte', 'lt'] = 'eq')¶
Generic match on rule attributes with supported types:
strings (exact matches)
UUIDs (exact matches)
numbers (relations: eq, ne, gte, gt, lte, lt)
dates (relations: eq, ne, gte, gt, lte, lt)
Rule severity levels (relations: eq, ne, gte, gt, lte, lt)
Rule statuses (relations: eq, ne, gte, gt, lte, lt)
Fields that contain lists of values, maps or other complex data structures are not supported and raise a SigmaConfigurationError. If the type of the value doesn’t allow a particular relation, the condition also raises a SigmaConfigurationError on match.
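The op values from the class signature map naturally onto Python's comparison operators. The following sketch shows one way to model that mapping; the compare helper is hypothetical, and it raises a ValueError where pySigma would raise a SigmaConfigurationError.

```python
import operator
from datetime import date
from typing import Any

# op identifiers as listed in the signature -> comparison functions
OPERATORS = {
    "eq": operator.eq,
    "ne": operator.ne,
    "gte": operator.ge,
    "gt": operator.gt,
    "lte": operator.le,
    "lt": operator.lt,
}


def compare(actual: Any, expected: Any, op: str = "eq") -> bool:
    if op not in OPERATORS:
        raise ValueError(f"Unsupported operator: {op}")
    return OPERATORS[op](actual, expected)


newer = compare(date(2024, 5, 1), date(2024, 1, 1), "gte")
same_title = compare("Whoami execution", "Whoami execution")
```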
- class sigma.processing.conditions.RuleTagCondition(tag: str)¶
Matches if rule is tagged with a specific tag.
Detection Item Conditions¶
Identifier | Class |
---|---|
match_string | MatchStringCondition |
is_null | IsNullCondition |
processing_item_applied | DetectionItemProcessingItemAppliedCondition |
processing_state | DetectionItemProcessingStateCondition |
- class sigma.processing.conditions.MatchStringCondition(cond: Literal['any', 'all'], pattern: str, negate: bool = False)¶
Match string values with a regular expression ‘pattern’. The parameter ‘cond’ determines for detection items with multiple values if any or all strings must match. Generally, values which aren’t strings are skipped in any mode or result in a false result in all match mode.
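The any/all semantics, including the treatment of non-string values, can be sketched as follows. This is a simplified model: values are plain Python objects, the use of re.match (anchored at the start of the string) is an assumption, and the negate parameter is left out.

```python
import re
from typing import Any, List


def match_strings(values: List[Any], pattern: str, cond: str = "any") -> bool:
    regex = re.compile(pattern)
    results = []
    for value in values:
        if isinstance(value, str):
            results.append(bool(regex.match(value)))
        elif cond == "all":
            # Non-string values yield a false result in "all" mode ...
            results.append(False)
        # ... and are skipped in "any" mode.
    return any(results) if cond == "any" else all(results)


mixed = ["cmd.exe", 4]
any_result = match_strings(mixed, r".*\.exe$", "any")
all_result = match_strings(mixed, r".*\.exe$", "all")
```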
- class sigma.processing.conditions.IsNullCondition(cond: Literal['any', 'all'])¶
Match null values. The parameter ‘cond’ determines for detection items with multiple values if any or all strings must match. Generally, values which aren’t strings are skipped in any mode or result in a false result in all match mode.
- class sigma.processing.conditions.DetectionItemProcessingItemAppliedCondition(processing_item_id: str)¶
Checks if processing item was applied to detection item.
- class sigma.processing.conditions.DetectionItemProcessingStateCondition(key: str, val: str | int | float | bool, op: Literal['eq', 'ne', 'gte', 'gt', 'lte', 'lt'] = 'eq')¶
Matches on processing pipeline state in context of a detection item condition.
Field Name Conditions¶
Identifier | Class |
---|---|
include_fields | IncludeFieldCondition |
exclude_fields | ExcludeFieldCondition |
processing_item_applied | FieldNameProcessingItemAppliedCondition |
processing_state | FieldNameProcessingStateCondition |
- class sigma.processing.conditions.IncludeFieldCondition(fields: List[str], type: Literal['plain', 're'] = 'plain')¶
Matches on field name if it is contained in fields list. The parameter ‘type’ determines if field names are matched as plain string (“plain”) or regular expressions (“re”).
- class sigma.processing.conditions.ExcludeFieldCondition(fields: List[str], type: Literal['plain', 're'] = 'plain')¶
Matches on field name if it is not contained in fields list.
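The difference between the plain and re matching types can be shown with two small helper functions. They are hypothetical and assume that regular expressions are matched from the start of the field name with re.match.

```python
import re
from typing import List


def field_included(field: str, fields: List[str], match_type: str = "plain") -> bool:
    if match_type == "plain":
        # Plain mode: the field name must appear literally in the list.
        return field in fields
    if match_type == "re":
        # Regex mode: any pattern of the list may match the field name.
        return any(re.match(pattern, field) for pattern in fields)
    raise ValueError(f"Unknown match type: {match_type}")


def field_excluded(field: str, fields: List[str], match_type: str = "plain") -> bool:
    # The exclude condition is the logical inverse of the include condition.
    return not field_included(field, fields, match_type)
```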
- class sigma.processing.conditions.FieldNameProcessingItemAppliedCondition(processing_item_id: str)¶
Checks if processing item was applied to a field name.
- class sigma.processing.conditions.FieldNameProcessingStateCondition(key: str, val: str | int | float | bool, op: Literal['eq', 'ne', 'gte', 'gt', 'lte', 'lt'] = 'eq')¶
Matches on processing pipeline state in context of a field name condition.
Base Classes¶
Base classes must be overridden to implement new conditions that can be used in processing pipelines. In addition, the new class should be mapped to an identifier so that the condition can be used from processing pipelines defined in YAML files. The mapping is done in the dict rule_conditions or detection_item_conditions in the sigma.processing.conditions package for the respective condition types. This is not necessary for conditions that are used privately and are not distributed via the main pySigma distribution.
- class sigma.processing.conditions.RuleProcessingCondition¶
Base for Sigma rule processing condition classes used in processing pipelines.
- class sigma.processing.conditions.DetectionItemProcessingCondition¶
Base for Sigma detection item processing condition classes used in processing pipelines.
- class sigma.processing.conditions.FieldNameProcessingCondition¶
Base class for conditions on field names in detection items, Sigma rule field lists and other use cases that require matching on field names without detection item context.
Transformations¶
Rule Pre-Processing Transformations¶
The following transformations with their corresponding identifiers for usage in YAML-based pipeline definitions are available:
Identifier | Class |
---|---|
field_name_mapping | FieldMappingTransformation |
field_name_prefix_mapping | FieldPrefixMappingTransformation |
field_name_transform | FieldFunctionTransformation |
drop_detection_item | DropDetectionItemTransformation |
field_name_suffix | AddFieldnameSuffixTransformation |
field_name_prefix | AddFieldnamePrefixTransformation |
wildcard_placeholders | WildcardPlaceholderTransformation |
value_placeholders | ValueListPlaceholderTransformation |
query_expression_placeholders | QueryExpressionPlaceholderTransformation |
add_condition | AddConditionTransformation |
change_logsource | ChangeLogsourceTransformation |
add_field | AddFieldTransformation |
remove_field | RemoveFieldTransformation |
set_field | SetFieldTransformation |
replace_string | ReplaceStringTransformation |
map_string | MapStringTransformation |
set_state | SetStateTransformation |
regex | RegexTransformation |
set_value | SetValueTransformation |
convert_type | ConvertTypeTransformation |
rule_failure | RuleFailureTransformation |
detection_item_failure | DetectionItemFailureTransformation |
set_custom_attribute | SetCustomAttributeTransformation |
nest | NestedProcessingTransformation |
- class sigma.processing.transformations.FieldMappingTransformation(mapping: Dict[str, str | List[str]])¶
Map a field name to one or multiple different field names.
YAML example:
transformations:
    - type: field_name_mapping
      mapping:
        EventID: EventCode
        CommandLine:
            - command_line
            - cmdline
This shows how to map the field name EventID to EventCode and CommandLine to command_line and cmdline. For the latter, OR-conditions will be generated to match the value on both fields. This is useful if different data models are used in the same system.
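The effect of a one-to-many mapping on a single detection item can be sketched without pySigma. The function name and the query syntax are assumptions; a real backend decides how the OR-condition is rendered.

```python
from typing import Dict, List, Union


def map_detection_item(field: str, value: str, mapping: Dict[str, Union[str, List[str]]]) -> str:
    # Unmapped field names are passed through unchanged.
    targets = mapping.get(field, field)
    if isinstance(targets, str):
        return f'{targets}="{value}"'
    # A list of target fields results in an OR-condition over all of them.
    return "(" + " OR ".join(f'{target}="{value}"' for target in targets) + ")"


mapping = {"EventID": "EventCode", "CommandLine": ["command_line", "cmdline"]}
single = map_detection_item("EventID", "1", mapping)
multiple = map_detection_item("CommandLine", "whoami", mapping)
```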
- class sigma.processing.transformations.FieldPrefixMappingTransformation(mapping: Dict[str, str | List[str]])¶
Map a field name prefix to one or multiple different prefixes.
- class sigma.processing.transformations.FieldFunctionTransformation(transform_func: Callable[[str], str], mapping: Dict[str, str] = <factory>)¶
Map a field name to another using the provided transformation function. You can override the transformation for a field by providing an explicit mapping for it.
- class sigma.processing.transformations.DropDetectionItemTransformation¶
Deletes detection items. This should only be used in combination with a detection item condition.
- class sigma.processing.transformations.AddFieldnameSuffixTransformation(suffix: str)¶
Add field name suffix.
- class sigma.processing.transformations.AddFieldnamePrefixTransformation(prefix: str)¶
Add field name prefix.
- class sigma.processing.transformations.WildcardPlaceholderTransformation(include: List[str] | None = None, exclude: List[str] | None = None)¶
Replaces placeholders with wildcards. This transformation is useful if remaining placeholders should be replaced with something meaningful to make conversion of rules possible without defining the placeholders content.
- class sigma.processing.transformations.ValueListPlaceholderTransformation(include: List[str] | None = None, exclude: List[str] | None = None)¶
Replaces placeholders with values contained in variables defined in the configuration.
- class sigma.processing.transformations.QueryExpressionPlaceholderTransformation(include: List[str] | None = None, exclude: List[str] | None = None, expression: str = '', mapping: Dict[str, str] = <factory>)¶
Replaces a placeholder with a plain query containing the placeholder or an identifier mapped from the placeholder name. The main purpose is the generation of arbitrary list lookup expressions which are passed to the resulting query.
Parameters:
expression: string that contains query expression with {field} and {id} placeholder where placeholder identifier or a mapped identifier is inserted.
mapping: Mapping between placeholders and identifiers that should be used in the expression. If no mapping is provided the placeholder name is used.
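The interplay of expression and mapping can be sketched with str.format. The helper name and the lookup syntax in the example expression are assumptions chosen for illustration.

```python
from typing import Dict, Optional


def placeholder_expression(
    field: str,
    placeholder: str,
    expression: str,
    mapping: Optional[Dict[str, str]] = None,
) -> str:
    # Use the mapped identifier if one exists, else the placeholder name itself.
    identifier = (mapping or {}).get(placeholder, placeholder)
    return expression.format(field=field, id=identifier)


expression = "{field} in lookup({id})"
mapped = placeholder_expression("User", "admins", expression, {"admins": "admin_list"})
unmapped = placeholder_expression("User", "admins", expression)
```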
- class sigma.processing.transformations.AddConditionTransformation(conditions: Dict[str, str | List[str]] = <factory>, name: str | None = None, template: bool = False, negated: bool = False)¶
Add a condition expression to rule conditions.
If template is set to True the condition values are interpreted as string templates and the following placeholders are replaced:
$category, $product and $service: with the corresponding values of the Sigma rule log source.
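The placeholder replacement can be pictured with string.Template from the Python standard library. Whether pySigma uses this class internally is an assumption of the sketch; the helper name and the example value are illustrative.

```python
from string import Template
from typing import Optional


def render_condition_value(
    value: str,
    category: Optional[str] = None,
    product: Optional[str] = None,
    service: Optional[str] = None,
) -> str:
    # Log source parts that are not set are replaced with an empty string here.
    return Template(value).safe_substitute(
        category=category or "",
        product=product or "",
        service=service or "",
    )


index = render_condition_value("logs-$product-$service", product="windows", service="sysmon")
```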
- class sigma.processing.transformations.ChangeLogsourceTransformation(category: str | None = None, product: str | None = None, service: str | None = None)¶
Replace log source as defined in transformation parameters.
- class sigma.processing.transformations.AddFieldTransformation(field: str | List[str])¶
Add one or multiple fields to the Sigma rule. The field is added to the fields list of the rule.
- class sigma.processing.transformations.RemoveFieldTransformation(field: str | List[str])¶
Remove one or multiple fields from the Sigma rule's field list. If a given field is not in the rule's list, it is ignored.
- class sigma.processing.transformations.SetFieldTransformation(fields: List[str])¶
Set the fields of the Sigma rule. The rule's field list is set to the fields list of the transformation.
- class sigma.processing.transformations.ReplaceStringTransformation(regex: str, replacement: str, skip_special: bool = False, interpret_special: bool = False)¶
Replace string part matched by regular expression with replacement string that can reference capture groups. Normally, the replacement operates on the plain string representation of the SigmaString. This also allows special characters and placeholders to be included in the replacement. By enabling the skip_special parameter, the replacement is only applied to the plain string parts of a SigmaString and special characters and placeholders are left untouched. When skip_special is enabled, the interpret_special option determines whether special characters and placeholders are interpreted in the replacement result or not.
The replacement is implemented with re.sub() and can use all features available there.
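Since the replacement relies on re.sub(), capture groups can be referenced in the replacement string. The following sketch operates on a plain Python string and leaves out the SigmaString handling described above; the helper name is illustrative.

```python
import re


def replace_string(value: str, regex: str, replacement: str) -> str:
    # re.sub() replaces every non-overlapping match; \1, \2, ... refer to capture groups.
    return re.sub(regex, replacement, value)


swapped = replace_string("user@example.com", r"^(\w+)@(.*)$", r"\2/\1")
```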
- class sigma.processing.transformations.MapStringTransformation(mapping: Dict[str, str | List[str]])¶
Map static string value to one or multiple other strings.
YAML example:
transformations:
    - type: map_string
      mapping:
        value1: mapped1
        value2:
            - mapped2A
            - mapped2B
- class sigma.processing.transformations.SetStateTransformation(key: str, val: Any)¶
Set pipeline state key to value.
- class sigma.processing.transformations.RegexTransformation(method: Literal['plain', 'ignore_case_flag', 'ignore_case_brackets'] = 'ignore_case_brackets')¶
Transform a string value to a case insensitive regular expression. The following methods are available and can be selected with the method parameter:
plain: Convert the string to a regular expression without any change to its case. In most cases this should result in a case-sensitive match of the string.
ignore_case_flag: Add the case insensitive flag to the regular expression.
ignore_case_brackets (default): Wrap each character in a bracket expression like [aA] to match both case variants.
This transformation is intended to be used to emulate case insensitive matching in backends that don’t support it natively.
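The bracket-based method (the default) can be sketched as a character-wise rewrite. This is an illustration of the idea, not the pySigma implementation; escaping of non-letter characters with re.escape() is an assumption.

```python
import re


def case_insensitive_brackets(value: str) -> str:
    parts = []
    for char in value:
        if char.isalpha() and char.lower() != char.upper():
            # Letters become a bracket expression with both case variants.
            parts.append(f"[{char.lower()}{char.upper()}]")
        else:
            # Everything else is matched literally.
            parts.append(re.escape(char))
    return "".join(parts)


pattern = case_insensitive_brackets("Cmd.exe")
```

The resulting pattern matches all case variants of the original string without relying on a case insensitivity flag in the target query language.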
- class sigma.processing.transformations.SetValueTransformation(value: str | int | float | bool | None, force_type: Literal['str', 'num'] | None = None)¶
Set value to a fixed value. The type of the value can be enforced to str or num with the force_type parameter.
- class sigma.processing.transformations.ConvertTypeTransformation(target_type: Literal['str', 'num'])¶
Convert type of value. The conversion into strings and numbers is currently supported.
- class sigma.processing.transformations.RuleFailureTransformation(message: str)¶
Raise a SigmaTransformationError with the provided message. This enables transformation pipelines to signal that a certain situation can’t be handled, e.g. only a subset of values is allowed because the target data model doesn’t offer all possibilities.
This is a rule transformation. Detection item and field name conditions are not evaluated if this is used.
- class sigma.processing.transformations.DetectionItemFailureTransformation(message: str)¶
Raise a SigmaTransformationError with the provided message. This enables transformation pipelines to signal that a certain situation can’t be handled, e.g. only a subset of values is allowed because the target data model doesn’t offer all possibilities.
This is a detection item transformation that should be used if detection item or field name conditions are used.
- class sigma.processing.transformations.SetCustomAttributeTransformation(attribute: str, value: Any)¶
Sets an arbitrary custom attribute on a rule that can be used by a backend during processing.
- class sigma.processing.transformations.NestedProcessingTransformation(items: List[ProcessingItem])¶
Executes a nested processing pipeline as transformation. Main purpose is to apply a whole set of transformations that match the given conditions of the enclosing processing item.
YAML example:
transformations:
    - type: nest
      items:
        - type: field_name_mapping
          mapping:
            EventID: EventCode
            CommandLine:
                - command_line
                - cmdline
        - type: set_state
          key: state
          val: processed
Query Post-Processing Transformations¶
Added in version 0.10.0.
Identifier | Class |
---|---|
embed | EmbedQueryTransformation |
simple_template | QuerySimpleTemplateTransformation |
template | QueryTemplateTransformation |
json | EmbedQueryInJSONTransformation |
replace | ReplaceQueryTransformation |
nest | NestedQueryPostprocessingTransformation |
- class sigma.processing.postprocessing.EmbedQueryTransformation(prefix: str | None = None, suffix: str | None = None)¶
Embeds a query between a given prefix and suffix. Only applicable to string queries.
- class sigma.processing.postprocessing.QuerySimpleTemplateTransformation(template: str)¶
Replace query with template that can refer to the following placeholders:
query: the postprocessed query.
rule: the Sigma rule including all its attributes like rule.title.
pipeline: the Sigma processing pipeline where this transformation is applied including all current state information in pipeline.state.
The Python format string syntax (str.format()) is used.
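Because str.format() supports attribute and item access inside replacement fields, a template can reach into the rule and the pipeline state. In this sketch, SimpleNamespace objects stand in for the Sigma rule and the pipeline; their attribute values are made up for illustration.

```python
from types import SimpleNamespace

# Stand-ins for the rule and pipeline objects available to the template.
rule = SimpleNamespace(title="Whoami execution", level="high")
pipeline = SimpleNamespace(state={"index": "windows"})

# {rule.title} uses attribute access, {pipeline.state[index]} uses item access.
template = 'title="{rule.title}" index={pipeline.state[index]} query={query}'
result = template.format(query='command_line="whoami"', rule=rule, pipeline=pipeline)
```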
- class sigma.processing.postprocessing.QueryTemplateTransformation(template: str, path: str | None = None, autoescape: bool = False)¶
Apply Jinja2 template provided as template object variable to a query. The following variables are available in the context:
query: the postprocessed query.
rule: the Sigma rule including all its attributes like rule.title.
pipeline: the Sigma processing pipeline where this transformation is applied including all current state information in pipeline.state.
If path is given, template is considered as a relative path to a template file below the specified path. If it is not provided, the template is specified as plain string. autoescape controls the Jinja2 HTML/XML auto-escaping.
- class sigma.processing.postprocessing.EmbedQueryInJSONTransformation(json_template: str)¶
Embeds a query into a JSON structure defined as string. The placeholder value %QUERY% is replaced with the query.
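One way to embed a query safely is to parse the JSON template, replace the placeholder inside the parsed structure and serialize the result again, so that quotes in the query are escaped correctly. This is a sketch under that assumption; the helper names are hypothetical and the actual replacement strategy of pySigma may differ.

```python
import json
from typing import Any


def replace_placeholder(node: Any, query: str) -> Any:
    """Recursively replace %QUERY% in all string values of a parsed JSON structure."""
    if isinstance(node, dict):
        return {key: replace_placeholder(value, query) for key, value in node.items()}
    if isinstance(node, list):
        return [replace_placeholder(value, query) for value in node]
    if isinstance(node, str):
        return node.replace("%QUERY%", query)
    return node


def embed_query_in_json(json_template: str, query: str) -> str:
    return json.dumps(replace_placeholder(json.loads(json_template), query))


json_template = '{"name": "rule", "search": {"query": "%QUERY%"}}'
embedded = embed_query_in_json(json_template, 'command_line="whoami"')
```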
- class sigma.processing.postprocessing.ReplaceQueryTransformation(pattern: str, replacement: str)¶
Replace query part specified by regular expression with a given string.
- class sigma.processing.postprocessing.NestedQueryPostprocessingTransformation(items: List[QueryPostprocessingItem])¶
Applies a list of query postprocessing transformations to the query in a nested manner.
Output Finalization Transformations¶
Added in version 0.10.0.
Identifier | Class |
---|---|
concat | ConcatenateQueriesFinalizer |
template | TemplateFinalizer |
json | JSONFinalizer |
yaml | YAMLFinalizer |
nested | NestedFinalizer |
- class sigma.processing.finalization.ConcatenateQueriesFinalizer(separator: str = '\n', prefix: str = '', suffix: str = '')¶
Concatenate queries with a given separator and embed the result between a prefix and a suffix string.
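The behavior amounts to a join with a prefix and suffix around it. The function below is a stand-alone sketch that mirrors the defaults from the class signature; it is not the finalizer class itself.

```python
from typing import List


def concatenate_queries(
    queries: List[str], separator: str = "\n", prefix: str = "", suffix: str = ""
) -> str:
    # Join all queries with the separator and wrap the result.
    return prefix + separator.join(queries) + suffix


combined = concatenate_queries(['a="1"', 'b="2"'], separator=" OR ", prefix="(", suffix=")")
```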
- class sigma.processing.finalization.TemplateFinalizer(template: str, path: str | None = None, autoescape: bool = False)¶
Apply Jinja2 template provided as template object variable to the queries. The following variables are available in the context:
queries: all post-processed queries generated by the backend.
pipeline: the Sigma processing pipeline where this transformation is applied including all current state information in pipeline.state.
If path is given, template is considered as a relative path to a template file below the specified path. If it is not provided, the template is specified as plain string. autoescape controls the Jinja2 HTML/XML auto-escaping.
- class sigma.processing.finalization.JSONFinalizer(indent: int | None = None)¶
- class sigma.processing.finalization.YAMLFinalizer(indent: int | None = None)¶
- class sigma.processing.finalization.NestedFinalizer(finalizers: List[Finalizer])¶
Apply a list of finalizers to the queries in a nested fashion.
Base Classes¶
There are four transformation base classes that can be derived to implement transformations on particular parts of a Sigma rule or the whole Sigma rule:
- class sigma.processing.transformations.Transformation¶
Base class for processing steps used in pipelines. Override apply with transformation that is applied to the whole rule.
Transformation Tracking¶
tbd