Writing a Crowdsec parser

Parser dependency

The crowdsecurity/syslog-logs parser is needed by the core parsing engine. Deleting or modifying it could leave Crowdsec unable to parse logs, so this should be done very carefully.

In this example, we'll write a parser for the logs produced by iptables (netfilter) with the -j LOG target. This document details the process of writing and testing new parsers.

Base parser file

The simplest parser can be defined as:

filter: 1 == 1
debug: true
onsuccess: next_stage
name: me/myparser
description: a cool parser for my service
grok:
#our grok pattern : capture .*
  pattern: ^%{DATA:some_data}$
#the field to which we apply the grok pattern : the log message itself
  apply_on: message
statics:
  - parsed: is_my_service
    value: yes

A parser contains:

  • a filter: if the expression is true, the event enters the parser; otherwise, it doesn't
  • an onsuccess: defines what happens when the event is successfully parsed (should we continue? should we move to the next stage? etc.)
  • a name and a description
  • some statics that will modify the event
  • a debug flag that enables local debugging information

We are going to use the following sample log as an example:

May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0 
May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0 

Trying our mock parser

Warning

Your yaml file must be in the config/parsers/s01-parser/ directory.

For example it can be ~/crowdsec-v0.0.19/tests/config/parsers/s01-parser/myparser.yaml, or /etc/crowdsec/config/parsers/s01-parser/myparser.yaml.

The stage directory might not exist; don't forget to create it.

(These deployment steps assume you're using a test environment.)

Setting up our new parser:

cd crowdsec-v0.X.Y/tests
mkdir -p config/parsers/s01-parser
cp myparser.yaml config/parsers/s01-parser/
./crowdsec -c ./dev.yaml -file ./x.log -type foobar

Expected output:

INFO[0000] setting loglevel to info                     
INFO[11-05-2020 15:48:28] Crowdsec v0.0.18-6b1281ba76819fed4b89247a5a673c592a3a9f88
...
DEBU[0000] Event entering node                           id=dark-water name=me/myparser stage=s01-parser
DEBU[0000] eval(TRUE) '1 == 1'                           id=dark-water name=me/myparser stage=s01-parser
DEBU[0000] no ip in event, cidr/ip whitelists not checked  id=dark-water name=me/myparser stage=s01-parser
DEBU[0000] + Grok '' returned 1 entries to merge in Parsed  id=dark-water name=me/myparser stage=s01-parser
DEBU[0000]  .Parsed['some_data'] = 'May 11 16:23:41 sd-126005 kernel: [47615893.721616] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=54555 PROTO=TCP SPT=45225 DPT=8080 WINDOW=1024 RES=0x00 SYN URGP=0 '  id=dark-water name=me/myparser stage=s01-parser
DEBU[0000] + Processing 1 statics                        id=dark-water name=me/myparser stage=s01-parser
DEBU[0000] .Parsed[is_my_service] = 'yes'                id=dark-water name=me/myparser stage=s01-parser
DEBU[0000] Event leaving node : ok                       id=dark-water name=me/myparser stage=s01-parser
DEBU[0000] move Event from stage s01-parser to s02-enrich  id=dark-water name=me/myparser stage=s01-parser
...

We can see that our "mock" parser is working. Let's see what happened:

  • The event enters the node
  • The filter returned true (1 == 1) so the event will be processed
  • Our grok pattern (just a .* capture) "worked" and captured data (the whole line actually)
  • The grok captures (under the name "some_data") are merged into the .Parsed map of the event
  • The statics section is processed, and .Parsed[is_my_service] is set to yes
  • The event leaves the parser successfully, and because "next_stage" is set, we move the event to the next "stage"
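
The steps above can be sketched as a toy model in Python. This is only an illustration of the flow (filter, grok, statics, onsuccess), not how Crowdsec is implemented: the run_node function, the dictionary layout of the event and the STAGES list are assumptions made for the example, and the grok pattern is written by hand as a plain regular expression.

```python
import re

# Stage names taken from the debug output; the list itself is a simplification.
STAGES = ["s01-parser", "s02-enrich"]

def run_node(node, event):
    """Toy model of one parser node; returns True if the event was parsed."""
    # 1. filter: the event enters the node only if the expression is true
    if not node["filter"](event):
        return False
    # 2. grok: named captures are merged into the event's Parsed map
    match = node["pattern"].search(event[node["apply_on"]])
    if match is None:
        return False
    event["Parsed"].update(match.groupdict())
    # 3. statics: fixed values written into Parsed
    for static in node["statics"]:
        event["Parsed"][static["parsed"]] = static["value"]
    # 4. onsuccess: next_stage moves the event to the following stage
    if node["onsuccess"] == "next_stage":
        event["stage"] = STAGES[STAGES.index(event["stage"]) + 1]
    return True

mock_parser = {
    "filter": lambda evt: 1 == 1,
    # ^%{DATA:some_data}$ written as a plain regex (DATA behaves like a lazy .*)
    "pattern": re.compile(r"^(?P<some_data>.*?)$"),
    "apply_on": "message",
    "statics": [{"parsed": "is_my_service", "value": "yes"}],
    "onsuccess": "next_stage",
}

event = {
    # shortened version of the sample log line
    "message": "May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= SRC=99.99.99.99 DST=127.0.0.1",
    "Parsed": {},
    "stage": "s01-parser",
}

ok = run_node(mock_parser, event)
print(ok, event["stage"], event["Parsed"]["is_my_service"])
```

As in the debug output, the whole line ends up in Parsed under some_data, is_my_service is set to yes, and the event moves from s01-parser to s02-enrich.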

Writing the GROK pattern

We are going to write a parser for iptables logs, which look like this:

May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 WINDOW=1024 RES=0x00 SYN URGP=0 
May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0 

Using an online grok debugger or an online regex debugger, we come up with the following grok pattern:

\[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*
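
If no online debugger is at hand, the capture logic of this pattern can be approximated offline with a plain regular expression. The sketch below is a hand-written translation, not the output of a grok library: IPV4 and MAC_FIELD are simplified stand-ins for the real %{IP} and %{IP}:%{MAC} definitions, so it may accept or reject lines that the grok pattern would not.

```python
import re

# Simplified stand-ins for the grok sub-patterns (assumptions, not the real definitions).
IPV4 = r"\d{1,3}(?:\.\d{1,3}){3}"
MAC_FIELD = r"[0-9a-fA-F:]+"  # replaces %{IP}:%{MAC}

IPTABLES_RE = re.compile(
    r"\[.*?\]+.*(?P<action>\w+)? IN=(?P<int_eth>\w+) OUT= MAC=" + MAC_FIELD
    + r" SRC=(?P<src_ip>" + IPV4 + r") DST=(?P<dst_ip>" + IPV4 + r")"
    + r" LEN=(?P<length>\d+).*PROTO=(?P<proto>\w+)"
    + r" SPT=(?P<src_port>\d+) DPT=(?P<dst_port>\d+).*"
)

# First line of the sample log, split across string literals for readability.
SAMPLE = ("May 11 16:23:43 sd-126005 kernel: [47615895.771900] IN=enp1s0 OUT= "
          "MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=99.99.99.99 DST=127.0.0.1 "
          "LEN=40 TOS=0x00 PREC=0x00 TTL=245 ID=51006 PROTO=TCP SPT=45225 DPT=8888 "
          "WINDOW=1024 RES=0x00 SYN URGP=0 ")

match = IPTABLES_RE.search(SAMPLE)
captures = match.groupdict() if match else {}

# An optional group that did not match (here: action) is None in Python.
for name, value in captures.items():
    print(f"{name} = {value!r}")
```

The eight named groups correspond to the eight fields the grok pattern is expected to capture: action, int_eth, src_ip, dst_ip, length, proto, src_port and dst_port.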

Warning

Before writing a new pattern, check whether the one you need is already present in the patterns configuration.

Testing our new pattern

Now, let's integrate our GROK pattern into our YAML file:

#let's set onsuccess to "next_stage" : if the log is parsed, we can consider it has been dealt with
onsuccess: next_stage
#debug, for reasons (don't do this in production)
debug: true
#for now we keep a filter that is always true; it is narrowed to the program name when the parser is finalized
filter: "1 == 1"
#name and description:
name: crowdsecurity/iptables-logs
description: "Parse iptables drop logs"
grok:
#our grok pattern
  pattern: \[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*
#the field to which we apply the grok pattern : the log message itself
  apply_on: message
statics:
  - parsed: is_my_service
    value: yes

./crowdsec -c ./dev.yaml -file ./x.log -type foobar

Expected output:

INFO[0000] setting loglevel to info                     
INFO[11-05-2020 16:18:58] Crowdsec v0.0.18-6b1281ba76819fed4b89247a5a673c592a3a9f88 
...
DEBU[0000] Event entering node                           id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] eval(TRUE) '1 == 1'                           id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] no ip in event, cidr/ip whitelists not checked  id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] + Grok '' returned 8 entries to merge in Parsed  id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['dst_port'] = '8080'                 id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['action'] = ''                       id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['int_eth'] = 'enp1s0'                id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['src_ip'] = '99.99.99.99'         id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['dst_ip'] = '127.0.0.1'           id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['length'] = '40'                     id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['proto'] = 'TCP'                     id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['src_port'] = '45225'                id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] + Processing 1 statics                        id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] .Parsed[is_my_service] = 'yes'                id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] Event leaving node : ok                       id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] move Event from stage s01-parser to s02-enrich  id=lingering-breeze name=crowdsecurity/iptables-logs stage=s01-parser
...

What changed? We can now see that the fragments captured by the GROK pattern are merged into the Parsed map! We now have parsed data; only a few more changes and we will be done :)

Finalizing our parser

#let's set onsuccess to "next_stage" : if the log is parsed, we can consider it has been dealt with
onsuccess: next_stage
#debug, for reasons (don't do this in production)
debug: true
#as seen in our sample log, those logs are processed by the system and have a progname set to 'kernel'
filter: "evt.Parsed.program == 'kernel'"
#name and description:
name: crowdsecurity/iptables-logs
description: "Parse iptables drop logs"
grok:
#our grok pattern
  pattern: \[%{DATA}\]+.*(%{WORD:action})? IN=%{WORD:int_eth} OUT= MAC=%{IP}:%{MAC} SRC=%{IP:src_ip} DST=%{IP:dst_ip} LEN=%{INT:length}.*PROTO=%{WORD:proto} SPT=%{INT:src_port} DPT=%{INT:dst_port}.*
#the field to which we apply the grok pattern : the log message itself
  apply_on: message
statics:
    - meta: log_type
      value: iptables_drop
    - meta: service
      expression: "evt.Parsed.proto == 'TCP' ? 'tcp' : 'unknown'"
    - meta: source_ip
      expression: "evt.Parsed.src_ip"

filter

We changed the filter so that it matches on the program name. In this example, our logs are produced by the kernel (netfilter), and thus the program is kernel:

tail -f /var/log/kern.log
May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= MAC=00:08:a2:0c:1f:12:00:c8:8b:e2:d6:87:08:00 SRC=44.44.44.44 DST=127.0.0.1 LEN=60 TOS=0x00 PREC=0x00 TTL=49 ID=17451 DF PROTO=TCP SPT=53668 DPT=80 WINDOW=14600 RES=0x00 SYN URGP=0 
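
To see where the program name comes from, here is a rough Python sketch of how a syslog line splits into a header and a message. It assumes that the program field is extracted by an earlier parser such as crowdsecurity/syslog-logs; the regular expression and the timestamp, logsource and pid group names are hypothetical and do not reproduce that parser's actual patterns.

```python
import re

# Rough approximation of a syslog header (an assumption for illustration only).
SYSLOG_RE = re.compile(
    r"^(?P<timestamp>\w{3}\s+\d{1,2} \d{2}:\d{2}:\d{2}) "
    r"(?P<logsource>\S+) "
    r"(?P<program>[^\s:\[]+)(?:\[(?P<pid>\d+)\])?: "
    r"(?P<message>.*)$"
)

# Shortened version of the sample log line.
line = "May 11 16:23:50 sd-126005 kernel: [47615902.763137] IN=enp1s0 OUT= SRC=44.44.44.44 DST=127.0.0.1"
parsed = SYSLOG_RE.match(line).groupdict()

# Python equivalent of the filter expression evt.Parsed.program == 'kernel'
enters_parser = parsed["program"] == "kernel"
print(parsed["program"], enters_parser)
```

With such a filter, lines written by other programs never reach the iptables grok pattern, which keeps the parser from spending time on logs it cannot match.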

statics

We are setting various entries to static or dynamic values to give "context" to the log:

  • .Meta.log_type is set to iptables_drop (so that we can later filter events coming from this parser)
  • .Meta.source_ip is set to the source IP captured in .Parsed.src_ip
  • .Meta.service is set to the result of an expression that relies on the GROK output (proto field)
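
The difference between a static value and an expression can be illustrated with a small Python model. This is a sketch under assumptions, not Crowdsec's evaluation engine: apply_statics is a hypothetical helper, and the two expressions from the YAML are rewritten by hand as Python lambdas.

```python
def apply_statics(event, statics):
    """Toy model: write each static into event['Meta'] from a fixed value or an expression."""
    for static in statics:
        if "expression" in static:
            # expressions are modelled as Python callables receiving the event
            result = static["expression"](event)
        else:
            result = static["value"]
        event["Meta"][static["meta"]] = result
    return event

statics = [
    {"meta": "log_type", "value": "iptables_drop"},
    # evt.Parsed.proto == 'TCP' ? 'tcp' : 'unknown'
    {"meta": "service",
     "expression": lambda evt: "tcp" if evt["Parsed"]["proto"] == "TCP" else "unknown"},
    # evt.Parsed.src_ip
    {"meta": "source_ip", "expression": lambda evt: evt["Parsed"]["src_ip"]},
]

tcp_event = apply_statics(
    {"Parsed": {"proto": "TCP", "src_ip": "44.44.44.44"}, "Meta": {}}, statics
)
udp_event = apply_statics(
    {"Parsed": {"proto": "UDP", "src_ip": "44.44.44.44"}, "Meta": {}}, statics
)
print(tcp_event["Meta"])
print(udp_event["Meta"]["service"])
```

Note that with this expression any protocol other than TCP, such as UDP in the second event, ends up with .Meta.service set to unknown.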

See the dedicated statics documentation to learn more about what they can do.

Testing our finalized parser

./crowdsec -c ./dev.yaml -file ./x.log -type kernel

Expected output:

...
DEBU[0000] Event entering node                           id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] eval(TRUE) 'evt.Parsed.program == 'kernel''   id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] no ip in event, cidr/ip whitelists not checked  id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] + Grok '' returned 8 entries to merge in Parsed  id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['src_port'] = '45225'                id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['dst_port'] = '8118'                 id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['action'] = ''                       id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['int_eth'] = 'enp1s0'                id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['src_ip'] = '44.44.44.44'            id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['dst_ip'] = '127.0.0.1'              id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['length'] = '40'                     id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000]  .Parsed['proto'] = 'TCP'                     id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] + Processing 3 statics                        id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] .Meta[log_type] = 'iptables_drop'             id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] .Meta[service] = 'tcp'                        id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] .Meta[source_ip] = '44.44.44.44'              id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] Event leaving node : ok                       id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
DEBU[0000] move Event from stage s01-parser to s02-enrich  id=shy-forest name=crowdsecurity/iptables-logs stage=s01-parser
...

Closing word

We now have a fully functional parser for Crowdsec! We can either deploy it to our production systems or, even better, contribute it to the Crowdsec Hub!

If you want to know more about the available directives and what they can do, take a look at the parser reference documentation!