Elasticsearch parent child join

Daehwan Bae
3 min read · Mar 1, 2021

Sometimes we need to use parent/child join indexing in Elasticsearch. This post shares the code and architecture I used; I hope it helps a little.

Requirements

There are 2 kinds of Kafka topics:
- Event topic: event_id, event_start(from), event_end(until), tags
- Item topic: event_id, item_id
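
For illustration, messages on the two topics might look like this (the values are made up; only the field names come from the requirements above):

```json
// Event topic
{ "event_id": "e1", "event_start": "2021-03-01T00:00:00Z", "event_end": "2021-03-07T00:00:00Z", "tags": ["sale"] }

// Item topic
{ "event_id": "e1", "item_id": "i42" }
```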

User Search Scenario
- Query by current timestamp from event documents
- Return item documents after sorting by custom ranking option

Indexing using Java Application

Create DTO for Event and Item

@Builder
@Data
class EventDocument implements Serializable {
    String eventId;
    String status;
    Map<String, String> joinField;
    Date from;
    Date until;
}

@Builder
@Data
class ItemDocument implements Serializable {
    String eventId;
    String itemId;
    Map<String, String> joinField;
}

Change Kafka Message to DTO Object

EventDocument createEventDocument(JsonNode node) {
    Map<String, String> joinField = new HashMap<String, String>() {{
        put("name", "event");
    }};
    String eventId = node.get("...").asText();
    Date from = ...;
    Date until = ...;
    String status = ...;
    return EventDocument.builder()
            .eventId(eventId)
            .from(from)
            .until(until)
            .status(status)
            .joinField(joinField)
            .build();
}

ItemDocument createItemDocument(JsonNode node) {
    String eventId = node.get("...").asText();
    Map<String, String> joinField = new HashMap<String, String>() {{
        put("name", "item");
        put("parent", eventId);
    }};
    String itemId = node.get("...").asText();
    return ItemDocument.builder()
            .eventId(eventId)
            .itemId(itemId)
            .joinField(joinField)
            .build();
}
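
The key detail here: a child's joinField carries both the relation name and the parent's ID, and the document ID and routing are derived from the parent ID so that parent and child land on the same shard. A minimal sketch of that ID/routing scheme (class and method names are mine, not from the original code):

```java
import java.util.HashMap;
import java.util.Map;

public class JoinFieldSketch {
    // Child join field: relation name plus the parent document's ID.
    static Map<String, String> childJoinField(String eventId) {
        Map<String, String> joinField = new HashMap<>();
        joinField.put("name", "item");
        joinField.put("parent", eventId);
        return joinField;
    }

    // Document ID combines parent and child IDs; routing by the parent ID
    // keeps parent and child on the same shard.
    static String childDocId(String eventId, String itemId) {
        return eventId + "_" + itemId;
    }

    public static void main(String[] args) {
        System.out.println(childDocId("e1", "i42"));            // e1_i42
        System.out.println(childJoinField("e1").get("parent")); // e1
    }
}
```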

Change DTO Object to Elasticsearch Document

IndexRequest createEventIndexRequest(EventDocument event) {
    return new IndexRequest()
            .id(event.getEventId())
            .routing(event.getEventId())
            .source(objectMapper.writeValueAsString(event), XContentType.JSON);
}

IndexRequest createItemIndexRequest(ItemDocument item) {
    return new IndexRequest()
            .id(item.getEventId() + "_" + item.getItemId())
            .routing(item.getEventId())
            .source(objectMapper.writeValueAsString(item), XContentType.JSON);
}

Create Elasticsearch Mapping

PUT event_item/_doc/_mapping
{
  "properties": {
    "joinField": {
      "type": "join",
      "relations": {
        "event": "item"
      }
    },
    "eventId": {
      "type": "keyword"
    },
    "status": {
      "type": "keyword"
    },
    "itemId": {
      "type": "keyword"
    },
    "from": {
      "type": "date"
    },
    "until": {
      "type": "date"
    }
  }
}
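
With this mapping, a parent and a child document would look roughly like the following (IDs and values are illustrative):

```json
// Parent: indexed with id = "e1", routing = "e1"
{
  "eventId": "e1",
  "status": "available",
  "from": "2021-03-01T00:00:00Z",
  "until": "2021-03-07T00:00:00Z",
  "joinField": { "name": "event" }
}

// Child: indexed with id = "e1_i42", routing = "e1"
{
  "eventId": "e1",
  "itemId": "i42",
  "joinField": { "name": "item", "parent": "e1" }
}
```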

Elasticsearch JSON Query using Custom Ranking

Parent/child queries are relatively expensive. In my tests, query latency was acceptable, but CPU usage was noticeably higher, so QPS can suffer. To compensate, I added a cache layer in front of Elasticsearch.

GET alias/_search
{
  "from": 0,
  "size": 30,
  "sort": [
    {
      "_score": {"order": "asc"},
      "_id": {"order": "asc"}
    }
  ],
  "query": {
    "bool": {
      "must": [
        {
          "has_parent": {
            "parent_type": "event",
            "score": true,
            "inner_hits": {},
            "query": {
              "function_score": {
                "query": {
                  "term": {
                    "status": "available"
                  }
                },
                "boost_mode": "replace",
                "script_score": {
                  "script": {
                    "source": "if (params.current < doc['until'].value) { return doc['until'].value * 100 } else { return doc['from'].value }",
                    "params": {
                      "current": "YOUR_TIMESTAMP"
                    }
                  }
                }
              }
            }
          }
        }
      ]
    }
  }
}
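
The script_score boils down to a simple rule: events that have not yet ended are scored by their end time (scaled up), everything else by its start time. The same logic in plain Java (class name and timestamps are illustrative):

```java
public class RankingSketch {
    // Mirrors the Painless script: if the event is still running
    // (current < until), score by end time scaled by 100;
    // otherwise fall back to the start time.
    static long score(long current, long from, long until) {
        if (current < until) {
            return until * 100;
        }
        return from;
    }

    public static void main(String[] args) {
        long now = 1_000L;
        System.out.println(score(now, 500L, 2_000L)); // still running: 200000
        System.out.println(score(now, 500L, 900L));   // already ended: 500
    }
}
```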

Store and Recovery using Spark and S3

ES spark driver options

  • es.mapping.date.rich: set to "false" to keep ES's original date format
  • es.read.field.as.array.include: to keep ES's original array format (e.g. tags)
  • es.mapping.routing: the ES routing key (parent and child documents have to be stored on the same shard)
  • Limit write throughput by tuning the number of Spark cores and the es.batch.* options.
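
As a rough illustration (the numbers are placeholders, not recommendations), write pressure on the cluster is bounded by the number of concurrent Spark tasks times the bulk batch size:

```shell
# Illustrative only: fewer executor cores and smaller es.batch.* values
# mean fewer concurrent bulk requests hitting Elasticsearch.
spark-submit \
  --conf spark.executor.instances=2 \
  --conf spark.executor.cores=2 \
  es-restore-job.jar
```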

Store to S3

void store(String indexName, String s3Path, String joinFieldName, String esHost) {
    Dataset<Row> df = spark.read().format("org.elasticsearch.spark.sql")
            .option("es.nodes", esHost)
            .option("es.nodes.wan.only", "true")
            .option("es.mapping.date.rich", "false")
            .option("es.read.field.as.array.include", "tags")
            .load(indexName + "/_doc")
            .filter(String.format("joinField.name = '%s'", joinFieldName));

    df.write().mode("overwrite").parquet(s3Path);
}

Recovery from S3

void restoreEvent(String newIndexName, String s3Path, String joinFieldName, String esHost) {
    Dataset<Row> df = spark.read().parquet(s3Path);

    df.write().format("org.elasticsearch.spark.sql")
            .option("es.mapping.id", "eventId")
            .option("es.mapping.routing", "eventId")
            .option("es.mapping.join", "joinField")
            .option("es.nodes.wan.only", "true")
            .option("es.nodes", esHost)
            .option("es.batch.size.bytes", "32mb")
            .option("es.batch.size.entries", "100")
            .option("es.batch.write.refresh", "false")
            .save(newIndexName + "/_doc");
}
void restoreItem(String newIndexName, String s3Path, String joinFieldName, String esHost) {
    Dataset<Row> df = spark.read().parquet(s3Path);

    // Rebuild the same document ID the indexing application used: eventId_itemId.
    df = df.withColumn("uuid", functions.concat_ws("_", df.col("eventId"), df.col("itemId")));

    df.write().format("org.elasticsearch.spark.sql")
            .option("es.mapping.id", "uuid")
            .option("es.mapping.exclude", "uuid")
            .option("es.mapping.routing", "eventId")
            .option("es.mapping.join", "joinField")
            .option("es.nodes.wan.only", "true")
            .option("es.nodes", esHost)
            .option("es.batch.size.bytes", "32mb")
            .option("es.batch.size.entries", "100")
            .option("es.batch.write.refresh", "false")
            .save(newIndexName + "/_doc");
}
