
How to use S3 Select in AWS SDK for Go

Suppose you have some compressed JSON data stored in an S3 bucket, for example an event log: a stream of JSON objects, one per line. At some point you may need to select a subset of such records based on one of their attributes.

For example, the uncompressed content of an S3 object may look as follows:

{"user_id": 1, ...}
{"user_id": 2, ...}
{"user_id": 3, ...}
{"user_id": 1, ...}

And you'd want to get only the records where user_id equals 1.

Setting aside the more heavyweight option of Amazon Athena, you have two choices: (a) read the object, decode it, and filter the required records in your own code, or (b) use the Select Object Content feature of S3 to do the filtering for you.

The SelectObjectContent SDK method is used to issue a SELECT command against an S3 object. Given the data structure above, the input for this call may look like this:

input := &s3.SelectObjectContentInput{
	Bucket:         aws.String(bucket),
	Key:            aws.String(key),
	Expression:     aws.String(fmt.Sprintf("select * from S3Object where S3Object.user_id=%d", uid)),
	ExpressionType: aws.String("SQL"),
	InputSerialization: &s3.InputSerialization{
		CompressionType: aws.String("GZIP"),
		JSON:            &s3.JSONInput{Type: aws.String("LINES")},
	},
	OutputSerialization: &s3.OutputSerialization{
		JSON: &s3.JSONOutput{RecordDelimiter: aws.String("\n")},
	},
}

Notice how the SQL query is constructed here:

select * from S3Object where S3Object.user_id=ID

The input here is configured to treat the data as gzip-compressed, with a single JSON object per line, while the output is configured to also be JSON with records separated by newlines, effectively preserving the same structure as the input data.
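The serialization settings are also where other formats would be described. For instance, if the same data were stored as gzipped CSV with a header row, the input side could presumably be swapped out like this (a sketch; the FileHeaderInfo value "USE" makes the header names addressable as column names in the query):

```go
InputSerialization: &s3.InputSerialization{
	CompressionType: aws.String("GZIP"),
	// CSV instead of JSON: first row supplies column names for the query.
	CSV: &s3.CSVInput{FileHeaderInfo: aws.String("USE")},
},
```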

Calling the API with this input returns an s3.SelectObjectContentOutput, which allows receiving the filtered subset of data in a streaming manner:

out, err := svc.SelectObjectContentWithContext(ctx, input)
if err != nil {
	return err
}
defer out.EventStream.Close()

The returned stream (out.EventStream) may deliver messages of several types: some carry data, while others only signal progress, statistics, and the end of the stream. For the purpose of reading data, only messages of type *s3.RecordsEvent are useful:

for evt := range out.EventStream.Events() {
	switch e := evt.(type) {
	case *s3.RecordsEvent:
		// do something useful with e.Payload, which is []byte
	}
}

And that's it! The complete code for filtering an S3 json.gz object by the user_id attribute looks like this:

package s3select

import (
	"context"
	"fmt"
	"io"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/service/s3"
)

// FilterByUser writes to w the subset of JSON records whose user_id matches
// uid, read from the s3 object defined by bucket and key.
func FilterByUser(ctx context.Context, svc *s3.S3, w io.Writer, bucket, key string, uid uint64) error {
	input := &s3.SelectObjectContentInput{
		Bucket:         aws.String(bucket),
		Key:            aws.String(key),
		Expression:     aws.String(fmt.Sprintf("select * from S3Object where S3Object.user_id=%d", uid)),
		ExpressionType: aws.String("SQL"),
		InputSerialization: &s3.InputSerialization{
			CompressionType: aws.String("GZIP"),
			JSON:            &s3.JSONInput{Type: aws.String("LINES")},
		},
		OutputSerialization: &s3.OutputSerialization{
			JSON: &s3.JSONOutput{RecordDelimiter: aws.String("\n")},
		},
	}
	out, err := svc.SelectObjectContentWithContext(ctx, input)
	if err != nil {
		return err
	}
	defer out.EventStream.Close()

	for evt := range out.EventStream.Events() {
		switch e := evt.(type) {
		case *s3.RecordsEvent:
			if _, err := w.Write(e.Payload); err != nil {
				return err
			}
		}
	}
	return out.EventStream.Err()
}