Logstash
Logstash can be used to log into Elasticsearch, plain files, InfluxDB, etc. It can consume the OpenBMP parsed messages from Kafka and import them directly into another store, log, or file. The Postgres container does not use Logstash because it does a lot more than logging.
If the use case is to log all BGP updates into some DB store, then Logstash is a good option.
Installing
See the Logstash getting started instructions to install it directly, or use the Logstash Docker container.
Configuration
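Below is an example pipeline configuration that consumes the OpenBMP parsed topics from Kafka, parses each record into named fields, and prints the resulting events to stdout: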
openbmp.conf
input {
  # Consume the OpenBMP parsed topics from Kafka.
  kafka {
    group_id          => "openbmp-logstash"
    bootstrap_servers => "obmp-kafka:29092"

    # Keep each Kafka record as plain text; the filter stage does the
    # header/body splitting and the tab-separated parsing.
    codec             => plain

    # Needed by the filter stage, which reads [@metadata][kafka][topic].
    decorate_events   => true

    topics            => [
      "openbmp.parsed.collector",
      "openbmp.parsed.router",
      "openbmp.parsed.peer",
      "openbmp.parsed.unicast_prefix",
      "openbmp.parsed.ls_node",
      "openbmp.parsed.ls_link",
      "openbmp.parsed.ls_prefix",
      "openbmp.parsed.l3vpn",
      "openbmp.parsed.evpn"
    ]
  }
}
filter {
  # Turns one Kafka record (text header, blank line, tab-separated rows) into
  # one structured event per row.
  #
  # The "\n\n", "\n" and "\t" literals below are only interpreted as escape
  # sequences when Logstash runs with config.support_escapes: true.

  # Split the message into header ([message][0]) and body ([message][1]),
  # and record the Kafka topic the record came from.
  mutate {
    split => [ "message", "\n\n" ]
    add_field => { "TOPIC" => "%{[@metadata][kafka][topic]}"}
  }

  # Set the type of message from the "T: <type>" line in the header.
  # Records with an unrecognised type are dropped.
  if "T: unicast_prefix" in [message][0] {
    mutate { add_field => { "TYPE" => "unicast_prefix" } }
  } else if "T: l3vpn" in [message][0] {
    mutate { add_field => { "TYPE" => "l3vpn" } }
  } else if "T: evpn" in [message][0] {
    mutate { add_field => { "TYPE" => "evpn" } }
  } else if "T: ls_prefix" in [message][0] {
    mutate { add_field => { "TYPE" => "ls_prefix" } }
  } else if "T: ls_link" in [message][0] {
    mutate { add_field => { "TYPE" => "ls_link" } }
  } else if "T: ls_node" in [message][0] {
    mutate { add_field => { "TYPE" => "ls_node" } }
  } else if "T: collector" in [message][0] {
    mutate { add_field => { "TYPE" => "collector" } }
  } else if "T: peer" in [message][0] {
    mutate { add_field => { "TYPE" => "peer" } }
  } else if "T: router" in [message][0] {
    mutate { add_field => { "TYPE" => "router" } }
  } else if "T: bmp_stat" in [message][0] {
    # NOTE: no csv parser is defined for bmp_stat in the chain below, so these
    # events are still dropped by its final else branch.
    mutate { add_field => { "TYPE" => "bmp_stat" } }
  } else {
    drop {}
  }

  # Split the message body into rows: one event per line, with the line
  # stored in the "row" field.
  split {
    field => "[message][1]"
    terminator => "\n"
    target => "row"
  }

  # Parse message based on type. Every branch reads the tab-separated "row",
  # maps it onto the listed column names, converts the numeric columns, and
  # removes the raw fields once the row has been parsed.
  if [TYPE] == "unicast_prefix" {
    csv {
      columns=>["action","sequence","hash","router_hash","router_ip","base_attr_hash",
                "peer_hash","peer_ip","peer_asn","timestamp","prefix","prefix_len",
                "is_IPv4","origin","as_path","as_path_count","origin_as","next_hop","MED","local_pref",
                "aggregator","community_list","ext_community_list","cluster_list","is_atomic_agg",
                "is_next_hop_IPv4","originator_id", "path_id", "labels", "is_pre_policy", "is_adj_in",
                "large_community_list" ]
      convert => {
        "sequence" => "integer"
        "peer_asn" => "integer"
        "origin_as" => "integer"
        "prefix_len" => "integer"
        "path_id" => "integer"
        "as_path_count" => "integer"
        "MED" => "integer"
        "local_pref" => "integer"
      }
      separator => "\t"
      source => "row"
      remove_field => ["message", "original", "event", "row" ]
    }
  } else if [TYPE] == "l3vpn" {
    csv {
      columns=>["action","sequence","hash","router_hash","router_ip","base_attr_hash",
                "peer_hash","peer_ip","peer_asn","timestamp","prefix","prefix_len",
                "is_IPv4","origin","as_path","as_path_count","origin_as","next_hop","MED","local_pref",
                "aggregator","community_list","ext_community_list","cluster_list","is_atomic_agg",
                "is_next_hop_IPv4","originator_id", "path_id", "labels", "is_pre_policy", "is_adj_in",
                "rd", "rd_type",
                "large_community_list" ]
      convert => {
        "sequence" => "integer"
        "peer_asn" => "integer"
        "origin_as" => "integer"
        "path_id" => "integer"
        "prefix_len" => "integer"
        "as_path_count" => "integer"
        "MED" => "integer"
        "local_pref" => "integer"
        "rd_type" => "integer"
      }
      separator => "\t"
      source => "row"
      remove_field => ["message", "original", "event", "row" ]
    }
  } else if [TYPE] == "ls_node" {
    csv {
      columns=>["action","sequence","hash","base_attr_hash","router_hash","router_ip",
                "peer_hash","peer_ip","peer_asn","timestamp",
                "igp_router_id","router_id","routing_id","ls_id","mt_id",
                "ospf_area_id","isis_area_id","protocol","flags",
                "as_path","local_pref","MED","next_hop",
                "node_name",
                "is_pre_policy", "is_adj_in",
                "rd", "rd_type",
                "sr_capabilities" ]
      convert => {
        "sequence" => "integer"
        "MED" => "integer"
        "peer_asn" => "integer"
        "local_pref" => "integer"
        "mt_id" => "integer"
        "ls_id" => "integer"
        "routing_id" => "integer"
      }
      separator => "\t"
      source => "row"
      remove_field => ["message", "original", "event", "row" ]
    }
  } else if [TYPE] == "ls_link" {
    # NOTE(review): the column "epe_peer_ndoe_sid" looks like a misspelling of
    # "epe_peer_node_sid". It is kept as-is because it is the emitted field
    # name; renaming it would break existing consumers of this output.
    csv {
      columns=>["action","sequence","hash","base_attr_hash","router_hash","router_ip",
                "peer_hash","peer_ip","peer_asn","timestamp",
                "igp_router_id","router_id","routing_id","ls_id",
                "ospf_area_id","isis_area_id","protocol",
                "as_path","local_pref","MED","next_hop",
                "mt_id","local_link_id","remote_link_id","interface_ip",
                "neighbor_ip","igp_metric","admin_group","max_link_bw",
                "max_resv_bw","unreserved_bw","te_default_metric",
                "link_protection","mpls_proto_mask","srlg",
                "link_name","remote_node_hash","local_node_hash",
                "remote_igp_router_id","remote_router_id",
                "local_node_asn","remote_node_asn",
                "epe_peer_ndoe_sid",
                "is_pre_policy", "is_adj_in",
                "adj_segment_id"
      ]
      convert => {
        "sequence" => "integer"
        "MED" => "integer"
        "local_pref" => "integer"
        "peer_asn" => "integer"
        "routing_id" => "integer"
        "mt_id" => "integer"
        "ls_id" => "integer"
        "local_link_id" => "integer"
        "remote_link_id" => "integer"
        "igp_metric" => "integer"
        "admin_group" => "integer"
        "max_link_bw" => "integer"
        "max_resv_bw" => "integer"
        "te_default_metric" => "integer"
        "local_node_asn" => "integer"
        "remote_node_asn" => "integer"
      }
      separator => "\t"
      source => "row"
      remove_field => ["message", "original", "event", "row" ]
    }
  } else if [TYPE] == "ls_prefix" {
    # Only columns that ls_prefix actually defines are listed under convert.
    csv {
      columns=>["action","sequence","hash","base_attr_hash","router_hash","router_ip",
                "peer_hash","peer_ip","peer_asn","timestamp",
                "igp_router_id","router_id","routing_id","ls_id",
                "ospf_area_id","isis_area_id","protocol",
                "as_path","local_pref","MED","next_hop",
                "local_node_hash","mt_id","ospf_route_type",
                "igp_flags","route_tag","ext_route_tag",
                "ospf_fwd_addr","igp_metric",
                "prefix","prefix_len",
                "is_pre_policy", "is_adj_in",
                "prefix_sid"
      ]
      convert => {
        "sequence" => "integer"
        "MED" => "integer"
        "local_pref" => "integer"
        "peer_asn" => "integer"
        "prefix_len" => "integer"
        "routing_id" => "integer"
        "mt_id" => "integer"
        "ls_id" => "integer"
        "route_tag" => "integer"
        "ext_route_tag" => "integer"
        "igp_metric" => "integer"
      }
      separator => "\t"
      source => "row"
      remove_field => ["message", "original", "event", "row" ]
    }
  } else if [TYPE] == "evpn" {
    csv {
      columns=>["action","sequence","hash","router_hash","router_ip","base_attr_hash",
                "peer_hash","peer_ip","peer_asn","timestamp",
                "origin","as_path","as_path_count","origin_as","next_hop","MED","local_pref",
                "aggregator","community_list","ext_community_list","cluster_list","is_atomic_agg",
                "is_next_hop_IPv4","originator_id", "path_id","is_pre_policy", "is_adj_in",
                "rd","rd_type","origin_router_ip_len","origin_router_ip",
                "eth_tag_id","eth_segment_id","mac_len","mac",
                "ip_len", "ip", "mpls_label_1","mpls_label_2",
                "large_community_list" ]
      convert => {
        "sequence" => "integer"
        "peer_asn" => "integer"
        "origin_as" => "integer"
        "path_id" => "integer"
        "as_path_count" => "integer"
        "MED" => "integer"
        "local_pref" => "integer"
        "rd_type" => "integer"
        "mac_len" => "integer"
        "ip_len" => "integer"
        "mpls_label_1" => "integer"
        "mpls_label_2" => "integer"
        "origin_router_ip_len" => "integer"
      }
      separator => "\t"
      source => "row"
      remove_field => ["message", "original", "event", "row" ]
    }
  } else if [TYPE] == "collector" {
    csv {
      columns => ["action","sequence","admin_id","hash","routers","router_count","timestamp"]
      convert => {
        "sequence" => "integer"
        "router_count" => "integer"
      }
      separator => "\t"
      source => "row"
      remove_field => ["message", "original", "event", "row" ]
    }
  } else if [TYPE] == "router" {
    csv {
      columns => ["action","sequence","name","hash","ip","descr",
                  "term_code","term_reason","init_data",
                  "term_data","timestamp", "bgp_id"]
      convert => {
        "sequence" => "integer"
        "term_code" => "integer"
      }
      separator => "\t"
      source => "row"
      remove_field => ["message", "original", "event", "row" ]
    }
  } else if [TYPE] == "peer" {
    csv {
      columns => ["action","sequence","hash","router_hash","name","remote_bgp_id",
                  "router_ip","timestamp","remote_asn","remote_ip","peer_rd",
                  "remote_port","local_asn","local_ip","local_port","local_bgp_id",
                  "info_data","adv_cap","recv_cap","remote_holddown","adv_holddown",
                  "bmp_reason","bgp_error_code","bgp_error_subcode","error_text",
                  "is_l3vpn","is_pre_policy","is_ipv4","is_loc_rib","is_loc_rib_filtered",
                  "table_name"
      ]
      convert => {
        "sequence" => "integer"
        "remote_asn" => "integer"
        "remote_port" => "integer"
        "local_asn" => "integer"
        "local_port" => "integer"
        "remote_holddown" => "integer"
        "adv_holddown" => "integer"
        "bmp_reason" => "integer"
        "bgp_error_code" => "integer"
        "bgp_error_subcode" => "integer"
      }
      separator => "\t"
      source => "row"
      remove_field => ["message", "original", "event", "row" ]
    }
  } else {
    # Any TYPE without a parser above (currently only bmp_stat) ends up here.
    drop {}
  }

  # Use the record's own timestamp as the event time; the parsed column is
  # removed only when parsing succeeds.
  # The timestamp string carries no zone, so without an explicit timezone the
  # result depends on the default zone of the host running Logstash. Pinning
  # UTC makes the result the same on every host.
  # NOTE(review): assumes OpenBMP emits its timestamps in UTC - confirm.
  date {
    match => ["timestamp", "YYYY-MM-dd HH:mm:ss.SSSSSS"]
    timezone => "UTC"
    remove_field => ["timestamp"]
  }
}
output {
  # Print every parsed event to the console. Replace or extend this with
  # file, Elasticsearch, InfluxDB, etc. outputs as required.
  stdout {
    id => "openbmp"
  }
}
Example
Danger
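You MUST set config.support_escapes: true
in logstash.yml or via the Docker environment variable CONFIG_SUPPORT_ESCAPES=true. Without it, the "\n\n", "\n" and "\t" escape sequences in the config are taken literally and the messages are not split or parsed correctly.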
Copy/create openbmp.conf
from the above config file and modify it as needed. For example, change the output to go to file(s), Elasticsearch, InfluxDB, etc.
# Run Logstash against openbmp.conf from the current directory (mounted at /work).
docker run -it --rm --network obmp_default \
  -v "$(pwd)":/work -e CONFIG_SUPPORT_ESCAPES=true -e XPACK_MONITORING_ENABLED=false \
  logstash:8.2.2 -f /work/openbmp.conf