Accelerated queries

On the details page for your data source, choose the Accelerate Performance option. To ensure a fast experience with your data in Amazon S3, you can set up three types of accelerations that index data into OpenSearch Service: skipping indexes, materialized views, and covering indexes.

Skipping indexes

With a skipping index, you can index only the metadata of the data stored in Amazon S3. When you query a table with a skipping index, the query planner references the index and rewrites the query to efficiently locate the data, instead of scanning all partitions and files. This allows the skipping index to quickly narrow down the specific location of the stored data.
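For example, with the skipping index created later in this section (PARTITION on `day`, BLOOM_FILTER on `srcaddr`), a query like the following sketch reads only the files whose metadata could contain matching rows; the filter values here are placeholders:

-- Relies on the skipping index defined below; the planner prunes
-- partitions via `day` and files via the `srcaddr` bloom filter.
SELECT srcaddr, dstaddr, action
FROM datasourcename.gluedatabasename.vpclogstable
WHERE day = '2023-11-01'
  AND srcaddr = '10.0.0.5'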

From the data source details page, select Accelerate Performance and choose the database and table that you want to accelerate. Alternatively, you can opt to auto-generate a skipping index. If you prefer to manually add the fields to accelerate, select the Add fields button. When adding fields, you'll be asked what type of skipping index you'd like to add. Choose one of the following types (a sketch that combines all four follows the list):

  • Partition: Uses data partition details to locate data (best for columns that the data is partitioned on, such as year, month, day, and hour)

  • MinMax: Uses the lower and upper bounds of the indexed column to locate data (best for numeric columns)

  • ValueSet: Uses a unique value set to locate data (best for columns with low to moderate cardinality that require exact matching)

  • BloomFilter: Uses a bloom filter to locate data (best for columns with high cardinality that don't require exact matching)
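To see how these types map onto columns, here is a minimal sketch of a single skipping index that combines all four. The table and column names are hypothetical; the Flint SQL keywords for the four types are PARTITION, MIN_MAX, VALUE_SET, and BLOOM_FILTER:

CREATE SKIPPING INDEX ON datasourcename.gluedatabasename.exampletable (
    `year` PARTITION,          -- data is partitioned by year
    `bytes` MIN_MAX,           -- numeric column, pruned by range
    `status` VALUE_SET,        -- low cardinality, exact matching
    `request_id` BLOOM_FILTER  -- high cardinality, probabilistic matching
)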

You can also manually create a skipping index on your table using Query Workbench. Simply select the S3 data source from the data source dropdown and add the following query:

CREATE SKIPPING INDEX ON datasourcename.gluedatabasename.vpclogstable (
    `srcaddr` BLOOM_FILTER,
    `dstaddr` BLOOM_FILTER,
    `day` PARTITION,
    `account_id` BLOOM_FILTER
) WITH (
    index_settings = '{"number_of_shards":5,"number_of_replicas":1}',
    auto_refresh = true,
    checkpoint_location = 's3://accountnum-vpcflow/AWSLogs/checkpoint'
)
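After you create the index, the companion skipping index statements let you inspect, manually refresh, or remove it. A brief sketch against the same table, assuming Flint's skipping index maintenance syntax; note that a manual REFRESH applies only when auto_refresh is false:

-- List the skipping strategies defined on the table
DESCRIBE SKIPPING INDEX ON datasourcename.gluedatabasename.vpclogstable

-- Trigger a one-time refresh (only valid when auto_refresh = false)
REFRESH SKIPPING INDEX ON datasourcename.gluedatabasename.vpclogstable

-- Remove the index when it's no longer needed
DROP SKIPPING INDEX ON datasourcename.gluedatabasename.vpclogstable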

Materialized views

With materialized views, you can use complex queries, such as aggregations, to power Dashboard visualizations. Materialized views ingest a small amount of your data, depending on the query, into OpenSearch Service storage. OpenSearch Service then forms an index from the ingested data that you can use for visualizations. You can manage the materialized view index with Index State Management in Amazon OpenSearch Service, just as you can with any other OpenSearch index.

Because you specify a target index, you'll be asked to name the index and set the Watermark Delay, which defines how late data can arrive and still be processed.

Use the following query to create a new materialized view for the VPC flow logs table that you created in Create Spark Tables using Query Workbench:

CREATE MATERIALIZED VIEW {table_name}__week_live_mview AS
SELECT
    cloud.account_uid AS `aws.vpc.cloud_account_uid`,
    cloud.region AS `aws.vpc.cloud_region`,
    cloud.zone AS `aws.vpc.cloud_zone`,
    cloud.provider AS `aws.vpc.cloud_provider`,
    CAST(IFNULL(src_endpoint.port, 0) AS LONG) AS `aws.vpc.srcport`,
    CAST(IFNULL(src_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-src-aws-service`,
    CAST(IFNULL(src_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.srcaddr`,
    CAST(IFNULL(src_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.src-interface_uid`,
    CAST(IFNULL(src_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.src-vpc_uid`,
    CAST(IFNULL(src_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.src-instance_uid`,
    CAST(IFNULL(src_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.src-subnet_uid`,
    CAST(IFNULL(dst_endpoint.port, 0) AS LONG) AS `aws.vpc.dstport`,
    CAST(IFNULL(dst_endpoint.svc_name, 'Unknown') AS STRING) AS `aws.vpc.pkt-dst-aws-service`,
    CAST(IFNULL(dst_endpoint.ip, '0.0.0.0') AS STRING) AS `aws.vpc.dstaddr`,
    CAST(IFNULL(dst_endpoint.interface_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-interface_uid`,
    CAST(IFNULL(dst_endpoint.vpc_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-vpc_uid`,
    CAST(IFNULL(dst_endpoint.instance_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-instance_uid`,
    CAST(IFNULL(dst_endpoint.subnet_uid, 'Unknown') AS STRING) AS `aws.vpc.dst-subnet_uid`,
    CASE
        WHEN regexp(dst_endpoint.ip, '(10\\..*)|(192\\.168\\..*)|(172\\.1[6-9]\\..*)|(172\\.2[0-9]\\..*)|(172\\.3[0-1]\\..*)')
        THEN 'ingress'
        ELSE 'egress'
    END AS `aws.vpc.flow-direction`,
    CAST(IFNULL(connection_info['protocol_num'], 0) AS INT) AS `aws.vpc.connection.protocol_num`,
    CAST(IFNULL(connection_info['tcp_flags'], '0') AS STRING) AS `aws.vpc.connection.tcp_flags`,
    CAST(IFNULL(connection_info['protocol_ver'], '0') AS STRING) AS `aws.vpc.connection.protocol_ver`,
    CAST(IFNULL(connection_info['boundary'], 'Unknown') AS STRING) AS `aws.vpc.connection.boundary`,
    CAST(IFNULL(connection_info['direction'], 'Unknown') AS STRING) AS `aws.vpc.connection.direction`,
    CAST(IFNULL(traffic.packets, 0) AS LONG) AS `aws.vpc.packets`,
    CAST(IFNULL(traffic.bytes, 0) AS LONG) AS `aws.vpc.bytes`,
    CAST(FROM_UNIXTIME(time / 1000) AS TIMESTAMP) AS `@timestamp`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `start_time`,
    CAST(FROM_UNIXTIME(start_time / 1000) AS TIMESTAMP) AS `interval_start_time`,
    CAST(FROM_UNIXTIME(end_time / 1000) AS TIMESTAMP) AS `end_time`,
    status_code AS `aws.vpc.status_code`,
    severity AS `aws.vpc.severity`,
    class_name AS `aws.vpc.class_name`,
    category_name AS `aws.vpc.category_name`,
    activity_name AS `aws.vpc.activity_name`,
    disposition AS `aws.vpc.disposition`,
    type_name AS `aws.vpc.type_name`,
    region AS `aws.vpc.region`,
    accountid AS `aws.vpc.account-id`
FROM datasourcename.gluedatabasename.vpclogstable
WITH (
    auto_refresh = true,
    refresh_interval = '15 Minute',
    checkpoint_location = 's3://accountnum-vpcflow/AWSLogs/checkpoint',
    watermark_delay = '1 Minute'
)
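After the materialized view is created, companion statements let you list, inspect, and drop it. A sketch, assuming Flint's materialized view maintenance syntax and the same {table_name} placeholder as the query above:

-- List materialized views in the Glue database
SHOW MATERIALIZED VIEW IN datasourcename.gluedatabasename

-- Show the view's output columns
DESCRIBE MATERIALIZED VIEW {table_name}__week_live_mview

-- Remove the view and its auto-refresh job
DROP MATERIALIZED VIEW {table_name}__week_live_mview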

Covering indexes

With a covering index, you can ingest data from specified columns in a table. This is the most performant of the three indexing types. Because OpenSearch Service ingests all data from the columns you specify, you get better performance and can perform advanced analytics.

Just as with materialized views, OpenSearch Service creates a new index from the covering index data. You can use this new index for Dashboard visualizations and other OpenSearch Service functionality, such as anomaly detection or geospatial capabilities. You can manage the covering index with Index State Management in Amazon OpenSearch Service, just as you can with any other OpenSearch index.

Use the following query to create a new covering index for the VPC flow logs table that you created in Create Spark Tables using Query Workbench:

CREATE INDEX vpc_covering_index ON datasourcename.gluedatabasename.vpclogstable (
    version,
    account_id,
    interface_id,
    srcaddr,
    dstaddr,
    srcport,
    dstport,
    protocol,
    packets,
    bytes,
    start,
    action,
    log_status STRING,
    `aws-account-id`,
    `aws-service`,
    `aws-region`,
    year,
    month,
    day,
    hour
) WITH (
    auto_refresh = true,
    refresh_interval = '15 minute',
    checkpoint_location = 's3://accountnum-vpcflow/AWSLogs/checkpoint'
)
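The covering index has similar companion statements for listing, refreshing, and dropping. A sketch, assuming Flint's covering index maintenance syntax:

-- List covering indexes on the table
SHOW INDEX ON datasourcename.gluedatabasename.vpclogstable

-- Trigger a one-time refresh (only valid when auto_refresh = false)
REFRESH INDEX vpc_covering_index ON datasourcename.gluedatabasename.vpclogstable

-- Remove the covering index
DROP INDEX vpc_covering_index ON datasourcename.gluedatabasename.vpclogstable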