Oracle alertlog Monitoring with Elasticsearch and Kibana

Right now I’m working on an Elasticsearch-based monitor for Oracle databases. There are many guides on how to set up ELK (Elasticsearch, Logstash and Kibana), so I’ll skip that part.

In my opinion, log analytics is key to providing a solid IT infrastructure, especially in terms of security and availability! Oracle has its own log analytics products in the cloud, but most customers I support don’t want/need such a big environment. “Want” in terms of pricing, “big” in terms of data volume. The smallest package you can get is 200GB upload per hour, which is simply too big for most (or even all) midsize companies in Germany (@ORACLE please provide smaller packages!).

OK, back to ELK & Oracle.

Requirements:

  • Elasticsearch running on machine (let’s say) A
  • Logstash running on any machine (in my case machine A)
  • Filebeat running on the Oracle Database machine

Here is the configuration you need to properly send alert log data to Elasticsearch. There are a few guides out there, but, sorry to say, they do not work properly: they rely on the Logstash multiline codec, which mixes up incoming messages from multiple sources if they arrive at the same time (see https://www.elastic.co/guide/en/logstash/current/plugins-codecs-multiline.html). This is key for me because I want to use one Logstash instance for many databases, so the multiline handling has to happen in Filebeat, directly at the source.

Here is my configuration of Filebeat. Filebeat simply reads the alert log files and sends all new entries to Logstash. When you run it for the first time, all data in the logfile is considered “new”. Maybe you want to clean up the log first if you have a >10GB logfile.

filebeat.prospectors:
## 12cR2 and newer
- input_type: log
  paths:
    - /path/to/alert.log
  multiline.pattern: '^[[:digit:]]{4}-[[:digit:]]{2}-[[:digit:]]{2}T[[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2}'
  multiline.negate: true
  multiline.match: after
  # the tag is evaluated by the conditionals in the Logstash filter below
  tags: ["oracle_alertlog"]

## 12cR1 and older
- input_type: log
  paths:
    - /path/to/alert.log
  multiline.pattern: '^[[:alpha:]]{3} [[:alpha:]]{3} [[:digit:]]{2} [[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2} [[:digit:]]{4}'
  multiline.negate: true
  multiline.match: after
  # the tag is evaluated by the conditionals in the Logstash filter below
  tags: ["oracle_alertlog"]

output.logstash:
  hosts: ["elk01:5045"]

This configuration simply sends all alert log records to Logstash. The “multiline.pattern” describes the date pattern in the alert log (one example for the old format and one for the new format that Oracle introduced with 12.2.0.1). The pattern acts as the separator between log records, so everything between two date lines is shipped as a single multi-line event.
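
For illustration, here is roughly what a single record looks like in both formats (the path and error text are just examples). The line starting with the date opens a new event; everything up to the next date line belongs to the same record:

    ## 12cR1 and older
    Mon Jul 30 18:30:00 2018
    Errors in file /u01/app/oracle/diag/rdbms/ghdb_ist/GHDB/trace/GHDB_j000_10660.trc:
    ORA-12012: error on auto execute of job "DB_MAIN"."TBCARCH_SIZES"

    ## 12cR2 and newer
    2018-07-30T18:30:00.123456+02:00
    Errors in file /u01/app/oracle/diag/rdbms/ghdb_ist/GHDB/trace/GHDB_j000_10660.trc:
    ORA-12012: error on auto execute of job "DB_MAIN"."TBCARCH_SIZES"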

The Logstash configuration is where all the magic happens; it tells Elasticsearch how to store the information in the index.

## Manage log stream from Oracle Server
input {
#  stdin { }
  beats {
    port => "5045"
  }
}
filter {
  #==================== oracle_listenerlog ====================
  if "oracle_listenerlog" in [tags] {
    ## Extract listener_name from logfile path
    grok {
      match => { "[log][file][path]" => "%{GREEDYDATA}/trace/%{GREEDYDATA:listener_name}.log" }
    }

    ## Oracle 12cR1(--) date format and replace @timestamp
    if [message] =~ /^[[:digit:]]{2}-[[:alpha:]]{3}-[[:digit:]]{4}/ {
      grok {
        match => { "message" => "^%{MONTHDAY:monthday}-%{WORD:month}-%{YEAR:year}[[:space:]]%{TIME:time}[[:space:]]\*[[:space:]]%{GREEDYDATA:message}$" }
        overwrite => ["message"]
      }
      mutate {
        add_field => {
          "TIMESTAMP" => "%{year} %{month} %{monthday} %{time}"
        }
      }
    }
    ## Oracle 12cR2(++) date format
    else if [message] =~ /^[[:digit:]]{4}-[[:digit:]]{2}-[[:digit:]]{2}T[[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2}/ {
      grok {
        patterns_dir => ["/etc/logstash/logstash-patterns-core/patterns"]
        match => { "message" => "^%{TIMESTAMP_ISO8601:TIMESTAMP}\n((?<cdb>[[:alnum:]]*\([[:digit:]]*\)):)?%{GREEDYDATA:message}$" }
        overwrite => ["message"]
      }
    }

    ## Replace @timestamp with timestamp and remove timestamp
    date {
      #timezone => "MET"
      match => [ "TIMESTAMP", "ISO8601", "yyyy MMM dd HH:mm:ss" ]
      target => "@timestamp"
      remove_field => [ "TIMESTAMP", "monthday", "month", "year", "time" ]
    }

    # Extract COMMAND (like status,reload,stop) and add a field
    if [message] =~ /(?i)COMMAND=/ {
      grok {
        match => { "message" => "^.*(?i)COMMAND=(?<command>.*?)\).*$" }
      }
    } else if [message] =~ /(?i)service_update/ {
      # service update
      grok {
        match => { "message" => "^service_update[[:space:]]\*[[:space:]](?<sid>(\+|\-|\w)*?)[[:space:]]\*[[:space:]]" }
      }
      mutate { add_field => { "command" => "service_update" } }
    } else {
      # Extract useful Info (USER,PROGRAM,IPCLIENT) and add fields
      grok {
        break_on_match => false
        match => { "message" => [
          #"PROGRAM=(?<program_name>[^)]*).*USER=%{USER:[connection_details][user]}.*(SERVICE_NAME|SID)=%{WORD:[connection_details][service]}.*HOST=%{IP:[connection_details][ip_client]}.*PORT=%{NUMBER:[connection_details][port]}.*%{WORD:[connection_details][status]}[[:space:]]\*[[:space:]][[:word:]]+[[:space:]]\*[[:space:]]",
          "PROGRAM=(?<program_name>[^)]*)",
          "USER=%{USER:[connection_details][user]}",
          "(SERVICE_NAME|SID)=%{WORD:[connection_details][service]}",
          "HOST=%{IP:[connection_details][ip_client]:IP}",
          "PORT=%{NUMBER:[connection_details][port]}",
          "%{WORD:[connection_details][status]}[[:space:]]\*[[:space:]][[:word:]]+[[:space:]]\*[[:space:]]"
        ]}
      }
      mutate { rename => { "program_name" => "[connection_details][program]" } }
    }

    # Search for TNS- error message
    if [message] =~ /TNS-/ {
      grok { match => { "message" => "(?<tns->TNS-[0-9]*)" } }
    }
    # SID and SERVICE_NAME
    if [message] =~ /(?i)SID=/ {
      mutate { add_field => { "[connection_details][service_type]" => "sid" } }
    }
    if [message] =~ /(?i)SERVICE_NAME=/ {
      mutate { add_field => { "[connection_details][service_type]" => "service_name" } }
    }
  }

  #==================== oracle_alertlog ====================
  else if "oracle_alertlog" in [tags] {
    ## Extract ORACLE_SID from logfile path
    grok {
      match => { "[log][file][path]" => "%{GREEDYDATA}/rdbms/%{GREEDYDATA:dbname}/%{GREEDYDATA:instname}/trace/alert_%{GREEDYDATA:oracle_sid}.log" }
    }

    ## Oracle 12cR1(--) date format (Multitenant support)
    if [message] =~ /^[[:alpha:]]{3} [[:alpha:]]{3} [[:digit:]]{2}/ {
      grok {
        match => { "message" => "^%{DAY} %{MONTH:month} %{MONTHDAY:monthday} %{TIME:time} %{YEAR:year}\n(%{WORD:cdb}\(%{NUMBER}\):)?%{GREEDYDATA:message}$" }
        overwrite => ["message"]
      }
      mutate {
        add_field => {
          "TIMESTAMP" => "%{year} %{month} %{monthday} %{time}"
        }
      }
    }
    ## Oracle 12cR2(++) date format (Multitenant support)
    else if [message] =~ /^[[:digit:]]{4}-[[:digit:]]{2}-[[:digit:]]{2}T/ {
      grok {
        patterns_dir => ["/etc/logstash/logstash-patterns-core/patterns"]
        match => { "message" => "^%{TIMESTAMP_ISO8601:TIMESTAMP}\n(%{WORD:cdb}\(%{NUMBER}\):)?%{GREEDYDATA:message}$" }
        overwrite => ["message"]
      }
    }

    ## Replace @timestamp with timestamp and remove timestamp
    date {
      #timezone => "MET"
      match => [ "TIMESTAMP", "ISO8601", "yyyy MMM dd HH:mm:ss" ]
      target => "@timestamp"
      remove_field => [ "TIMESTAMP", "monthday", "month", "year", "time" ]
    }

    # ORA- error
    if [message] =~ /ORA-/ {
      grok { match => { "message" => "(?<ora->ORA-[0-9]*)" } }
    }
    # logswitch
    if [message] =~ /^Thread ([[:word:]]|[[:blank:]])+ sequence [[:digit:]]+/ {
      # std. logswitch
      grok { match => { "message" => "^Thread %{NUMBER:[redo][thread_id]}%{GREEDYDATA}sequence %{NUMBER:[redo][sequence]}( \(%{GREEDYDATA:[redo][action]}\))?$" } }
      # checkpoint not complete | Private strand flush not complete
      if [message] =~ /^Thread [[:digit:]]+ cannot/ {
        grok { match => { "message" => "sequence %{NUMBER}\n%{GREEDYDATA:[redo][lgwr_message]}$" } }
      }
    }
    # archive log added
    else if [message] =~ /^([[:alnum:]]+ \(PID:[[:digit:]]+\): )?Archived Log entry/ {
      grok { match => { "message" => "^(%{WORD}%{SPACE}*\(PID\:%{NUMBER}\)\: )?Archived Log entry %{NUMBER:[redo][entry]} %{GREEDYDATA} (thread |T-)%{GREEDYDATA:[redo][thread_id]}( sequence |\.S-)%{GREEDYDATA:[redo][sequence]} ID %{GREEDYDATA:[redo][id]} (dest |LAD\:)%{NUMBER:[redo][archivelog_dest]}(\:$|$)" } }
    }
    # alter database | system
    else if [message] =~ /ALTER (SYSTEM|DATABASE) SET/ {
      grok {
        break_on_match => false
        match => { "message" => [
          "%{GREEDYDATA:[alter][command]} SET %{WORD:[alter][parameter]}\=%{GREEDYDATA:[alter][value]} SCOPE=%{WORD:[alter][scope]}",
          "SID='%{GREEDYDATA:[alter][sid]}'( |;)",
          "PDB='%{GREEDYDATA:[alter][pdb]}'"
        ]}
        tag_on_failure => [ ]
      }
    }
  }
}
output {
  stdout { codec => rubydebug }
  #file {
  #  path => "/var/log/logstash/debug_stream.log"
  #  flush_interval => 500
  #  codec => "json_lines"
  #}
  elasticsearch {
    hosts => [ "elasticsearch:9200" ]
    #index => "%{[@metadata][beat]}-%{[@metadata][version]}-%{+YYYY.MM.dd}"
    index => "m2-oracle-logs-%{+YYYY.MM}"
  }
}

OK, this is a lot of stuff! Let’s split up the filter section a bit.

Since Oracle uses the standardized “diag” directory structure, we can extract the ORACLE_SID (and the database and instance names) from the logfile path:

    ## Extract ORACLE_SID from logfile path
    grok {
      match => { "[log][file][path]" => "%{GREEDYDATA}/rdbms/%{GREEDYDATA:dbname}/%{GREEDYDATA:instname}/trace/alert_%{GREEDYDATA:oracle_sid}.log" }
    }
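
For example, assuming a default diag layout (the path below is just an illustration), this grok produces the following fields:

    [log][file][path] => /u01/app/oracle/diag/rdbms/ghdb_ist/GHDB/trace/alert_GHDB.log
    dbname            => ghdb_ist
    instname          => GHDB
    oracle_sid        => GHDB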

Afterwards we search for the pattern “ORA-” to extract possible ORA error codes. The resulting field in Elasticsearch is called “ora-”:

    # ORA- error
    if [message] =~ /ORA-/ {
      grok { match => { "message" => "(?<ora->ORA-[0-9]*)" } }
    }
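
For example, given a message line like the one below (a typical failed-job entry), the extra field contains only the error code itself, which makes it easy to aggregate on in Kibana. Note that only the first ORA- code of a record is extracted:

    message => ORA-12012: error on auto execute of job "DB_MAIN"."TBCARCH_SIZES"
    ora-    => ORA-12012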

The next step is to extract the date/time from the message and overwrite the @timestamp key. We could simply skip this step, but I want the @timestamp key (the default time field in a Filebeat index) to represent the time when Oracle created the log message rather than the time when Filebeat picked it up.

This block does the magic for the old date format:

    ## Oracle 12cR1(--) date format (Multitenant support)
    if [message] =~ /^[[:alpha:]]{3} [[:alpha:]]{3} [[:digit:]]{2}/ {
      grok {
        match => { "message" => "^%{DAY} %{MONTH:month} %{MONTHDAY:monthday} %{TIME:time} %{YEAR:year}\n(%{WORD:cdb}\(%{NUMBER}\):)?%{GREEDYDATA:message}$" }
        overwrite => ["message"]
      }
      mutate {
        add_field => {
          "TIMESTAMP" => "%{year} %{month} %{monthday} %{time}"
        }
      }
    }
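
As a sketch of what happens here, take a 12cR1-style record with a PDB prefix (the names are illustrative; in a non-CDB alert log the "DEVA(3):" part is simply missing, which is why that group is optional):

    Incoming record:
      Mon Jul 30 18:30:00 2018
      DEVA(3):Errors in file /opt/oracle/diag/rdbms/orcl/orcl/trace/orcl_j001_8577.trc:

    Extracted fields:
      month => Jul, monthday => 30, time => 18:30:00, year => 2018
      cdb       => DEVA
      message   => Errors in file /opt/oracle/diag/rdbms/orcl/orcl/trace/orcl_j001_8577.trc:
      TIMESTAMP => 2018 Jul 30 18:30:00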

This block does the magic for the new date format:

    ## Oracle 12cR2(++) date format (Multitenant support)
    else if [message] =~ /^[[:digit:]]{4}-[[:digit:]]{2}-[[:digit:]]{2}T/ {
      grok {
        patterns_dir => ["/etc/logstash/logstash-patterns-core/patterns"]
        match => { "message" => "^%{TIMESTAMP_ISO8601:TIMESTAMP}\n(%{WORD:cdb}\(%{NUMBER}\):)?%{GREEDYDATA:message}$" }
        overwrite => ["message"]
      }
    }
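
The same record in the new (12.2+) format would be parsed like this (again, the names are only illustrative):

    Incoming record:
      2018-07-30T18:30:00.123456+02:00
      DEVA(3):Errors in file /opt/oracle/diag/rdbms/orcl/orcl/trace/orcl_j001_8577.trc:

    Extracted fields:
      TIMESTAMP => 2018-07-30T18:30:00.123456+02:00
      cdb       => DEVA
      message   => Errors in file /opt/oracle/diag/rdbms/orcl/orcl/trace/orcl_j001_8577.trc: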

… and the following block overwrites the @timestamp key:

    ## Replace @timestamp with timestamp and remove timestamp
    date {
      #timezone => "MET"
      match => [ "TIMESTAMP", "ISO8601", "yyyy MMM dd HH:mm:ss" ]
      target => "@timestamp"
      remove_field => [ "TIMESTAMP", "monthday", "month", "year", "time" ]
    }
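
With the example from above, the date filter turns the TIMESTAMP field (either the "yyyy MMM dd HH:mm:ss" string or the ISO8601 variant) into the event’s @timestamp and then drops the temporary fields. Note that the old-format string carries no timezone, so Logstash interprets it in its own local timezone unless you uncomment and set the timezone option:

    TIMESTAMP  => 2018 Jul 30 18:30:00
    @timestamp => 2018-07-30T16:30:00.000Z   # assuming Logstash runs in Europe/Berlin (CEST, UTC+2)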

The new format is much easier to handle! The reason is that there are ready-made patterns (such as TIMESTAMP_ISO8601) which describe the new date format. You can download these patterns here: https://github.com/logstash-plugins/logstash-patterns-core/tree/master/patterns
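
One simple way to get these patterns onto the Logstash host is to clone the repository (assuming git is available; the target directory is just an example and has to match the patterns_dir setting used above):

    git clone https://github.com/logstash-plugins/logstash-patterns-core.git /etc/logstash/logstash-patterns-core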

There are a lot more grok blocks which extract additional data (logswitches, archived logs, ALTER SYSTEM commands, and so on). These blocks are not strictly necessary; I added them to my configuration to make it easier (and faster) to search for certain things such as “checkpoint not complete”.
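
For example, the logswitch block turns a typical “Thread … advanced to log sequence …” message into structured redo fields you can chart in Kibana (the message below is a hand-made example of the usual wording):

    message           => Thread 1 advanced to log sequence 1234 (LGWR switch)
    [redo][thread_id] => 1
    [redo][sequence]  => 1234
    [redo][action]    => LGWR switch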

I will update this post from time to time with my current configuration.

Now you can open Kibana and do all the fancy stuff with the data! You can find any problem, even those which do not exist at all 🙂

Cheers!

8 thoughts on “Oracle alertlog Monitoring with Elasticsearch and Kibana”

  1. Hi there,
    I’m testing and trying to understand this filter/grok structure. I have an 11g Oracle DB and I’m testing it with your code. Thank you for this opportunity 🙂 But I have a question:

    In here:

    if [message] =~ /^[[:alpha:]]{3} [[:alpha:]]{3} [[:digit:]]{2} [[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2} [[:digit:]]{4}/ {
    grok {
    match => ["message","^%{DAY:day} %{MONTH:month} %{MONTHDAY:monthday} %{TIME:time} %{YEAR:year}\n((?[[:alnum:]]*\([[:digit:]]*\)):)?%{GREEDYDATA:message}$"]
    overwrite => ["message"]

    I’m getting a COMPILER ERROR when I try to test it with https://grokdebug.herokuapp.com/. Here is an example paragraph from my log file which I’m trying to parse:

    Mon Jul 30 18:30:00 2018
    Errors in file /u01/app/oracle/diag/rdbms/ghdb_ist/GHDB/trace/GHDB_j000_10660.trc:
    ORA-12012: error on auto execute of job "DB_MAIN"."TBCARCH_SIZES"
    ORA-02019: connection description for remote database not found
    ORA-06512: at "DB_MAIN.PROC_EAI_LOG_SIZES", line 3
    ORA-06512: at line 1
    Mon Jul 30 18:31:22 2018

    Can you please help me? I’m trying to understand 🙂
    Best wishes,
    Bora.

    1. Hello Bora,

      thank you for your comment! Yes, there are still many improvements to be made… I already had plans to update this article because I found some more bugs with 12cR2 logs.

      The if block should determine if this is the date format of 12.1 and lower or 12.2 and higher. Maybe I can create a tag in filebeat to simplify this if block.

      The grok block should extract the date from the message and then remove it from the message. Additionally, it will remove the pluggable database information (“DEVA(3)”) in 12cR1, e.g.:
      Mon Oct 10 02:15:00 2019
      DEVA(3):Errors in file /opt/oracle/diag/rdbms/orcl/orcl/trace/orcl_j001_8577.trc:
      ORA-12012: error on auto execute of job "TM"."MVIEW_REFRESH_JOB" […]

      But there is a bug in this part: "((?[[:alnum:]]*\([[:digit:]]*\)):)?".

      In my current version I extract the PDB name as well, and it’s also 11g compatible. The "?" makes the part "(%{WORD:cdb}\(%{NUMBER}\):)?" optional:
      if [message] =~ /^[[:alpha:]]{3} [[:alpha:]]{3} [[:digit:]]{2} [[:digit:]]{2}:[[:digit:]]{2}:[[:digit:]]{2} [[:digit:]]{4}/ {
      grok {
      match => ["message","^%{DAY} %{MONTH:month} %{MONTHDAY:monthday} %{TIME:time} %{YEAR:year}\n(%{WORD:cdb}\(%{NUMBER}\):)?%{GREEDYDATA:message}$"]
      overwrite => ["message"]
      }

      I hope this helps!

      Greetings
      Daniel

  2. Hello there, I really liked your post and am currently facing a challenge. I have set up the grok structure as described but I’m unable to figure out how to represent it in Kibana, as I’m getting the complete Oracle log file in Kibana. I am new to this technology and any help will be appreciated.

    1. I’m not so good at building dashboards in Kibana… but there are many tutorials out there which can help you with that. In this config I tried to extract some information which I want to be able to filter on in Kibana later. This improves the performance of my Elasticsearch search/filter requests but is not mandatory.
      I would build a dashboard which simply shows all log records. Additionally, you can show the number of “ORA-” messages. Another interesting metric is the number of “checkpoint not complete” and “private strand flush not complete” messages, which enables you to do redo log tuning.
      From here on you can add additional information as you need it.
