forked from docwho2/java-postgres-lambda-trigger
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathPostgresAddressTrigger.java
More file actions
124 lines (101 loc) · 4.86 KB
/
PostgresAddressTrigger.java
File metadata and controls
124 lines (101 loc) · 4.86 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
/*
* Click nbfs://nbhost/SystemFileSystem/Templates/Licenses/license-default.txt to change this license
* Click nbfs://nbhost/SystemFileSystem/Templates/Classes/Class.java to edit this template
*/
package demo;
import com.fasterxml.jackson.databind.JsonNode;
import static demo.PostgresAbstractTrigger.TG_OP.INSERT;
import java.util.Objects;
import org.jooq.JSONB;
import org.jooq.impl.DSL;
import software.amazon.awssdk.auth.credentials.EnvironmentVariableCredentialsProvider;
import software.amazon.awssdk.core.SdkSystemSetting;
import software.amazon.awssdk.http.urlconnection.UrlConnectionHttpClient;
import software.amazon.awssdk.regions.Region;
import software.amazon.awssdk.services.location.LocationClient;
import software.amazon.awssdk.services.location.model.SearchPlaceIndexForTextResponse;
/**
* Geo Code Addresses that are added to the address table
*
* @author sjensen
*/
public class PostgresAddressTrigger extends PostgresAbstractTrigger {
static final LocationClient locationClient;
static final String PLACE_INDEX;
static {
locationClient = LocationClient.builder()
.region(Region.of(System.getenv(SdkSystemSetting.AWS_REGION.environmentVariable())))
.credentialsProvider(EnvironmentVariableCredentialsProvider.create())
.httpClient(UrlConnectionHttpClient.builder().build())
.build();
PLACE_INDEX = System.getenv("PLACE_INDEX");
}
@Override
protected void processEvent(TG_OP operation, String table_name, JsonNode old_addr, JsonNode new_addr) {
boolean needs_geocode = false;
if (operation.equals(INSERT)) {
// New Address, so always encode
needs_geocode = true;
} else {
// UPDATE, we need to determine if geo coding is required
// for now just look for address_1 changes
var old_address_1 = old_addr.findValue("address_1") != null ? old_addr.findValue("address_1").asText() : null;
var new_address_1 = new_addr.findValue("address_1") != null ? new_addr.findValue("address_1").asText() : null;
if (!Objects.equals(old_address_1, new_address_1)) {
needs_geocode = true;
}
}
if (needs_geocode) {
geoCodeAddress(new_addr);
} else {
log.debug("No Geo Coding is required on this address");
}
}
private void geoCodeAddress(JsonNode addr) {
// Build address String
final var address = new StringBuilder(addr.findValue("address_1").asText());
// Required
address.append(", ").append(addr.findValue("city").asText());
address.append(", ").append(addr.findValue("district").asText());
if (!addr.findValue("postal_code").isNull()) {
address.append(" ").append(addr.findValue("postal_code").asText());
}
var response = locationClient.searchPlaceIndexForText((t) -> {
t.indexName(PLACE_INDEX)
.maxResults(1)
.text(address.toString());
});
log.debug(response);
if (response.hasResults()) {
final var place = response.results().get(0).place();
// Take the response and create a serializable version that we can later convert to JSON
var last_coding = mapper.convertValue(response.toBuilder(), SearchPlaceIndexForTextResponse.serializableBuilderClass());
var dsl = PostgresDataSource.getDSL();
var statement = dsl.update(DSL.table("address"))
// Make sure we set this to false so we don't trigger ourselves again
.set(DSL.field("requires_geo_coding", Boolean.class), false)
.set(DSL.field("address_formatted"), place.label())
.set(DSL.field("geo_coded"), DSL.now())
.set(DSL.field("geo_longitude", Double.class), place.geometry().point().get(0))
.set(DSL.field("geo_latitude", Double.class), place.geometry().point().get(1))
.set(DSL.field("geo_last_coding", JSONB.class), JSONB.jsonb(mapper.valueToTree(last_coding).toString()))
.where(DSL.field("id", Integer.class).eq(addr.findValue("id").asInt()));
int updated = 0;
int counter = 0;
while (updated == 0) {
updated = statement.execute();
if (updated == 0) {
// Sleep a little since row did not update
try {
var sleep = ++counter * 100;
log.debug("Sleeping " + sleep + " ms due to insert not finding row yet");
Thread.sleep(sleep);
} catch (InterruptedException ie) {
}
}
}
} else {
log.info("Places query returned no results");
}
}
}