Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,7 @@ hs_err_pid*

.apt_generated
**/.sts4-cache/*

# Media files
*.mp4
*.jpg
76 changes: 49 additions & 27 deletions build.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -15,18 +15,18 @@
*/
buildscript {
ext {
dataflowBeamVersion = '2.24.0'
visionApiVersion = '1.99.3'
beamVersion = '2.37.0'
visionApiVersion = '2.0.21'
videoApiVersion = '2.0.18'
}
repositories {
mavenCentral()
jcenter()
maven {
url "https://plugins.gradle.org/m2/"
}
dependencies {
classpath "gradle.plugin.com.google.cloud.tools:jib-gradle-plugin:2.5.0"
classpath "com.diffplug.spotless:spotless-plugin-gradle:3.24.2"
classpath "gradle.plugin.com.google.cloud.tools:jib-gradle-plugin:3.2.0"
classpath "com.diffplug.spotless:spotless-plugin-gradle:6.3.0"
}
}
}
Expand All @@ -43,16 +43,16 @@ apply plugin: 'application'
apply plugin: 'eclipse'
apply plugin: 'idea'
apply plugin: 'com.google.cloud.tools.jib'
apply plugin: "com.diffplug.gradle.spotless"
apply plugin: 'com.diffplug.spotless'
// Licence header enforced by spotless
def javaLicenseHeader = """/*
* Copyright 2020 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
Expand All @@ -62,15 +62,15 @@ def javaLicenseHeader = """/*
*/
"""
java {
sourceCompatibility = JavaVersion.VERSION_1_8
targetCompatibility = JavaVersion.VERSION_1_8
sourceCompatibility = JavaVersion.VERSION_11
targetCompatibility = JavaVersion.VERSION_11
}

mainClassName = System.getProperty('mainClass', 'com.google.solutions.ml.api.vision.VisionAnalyticsPipeline')
mainClassName = System.getProperty('mainClass', 'com.google.solutions.annotation.AnnotationPipeline')

jib {
from {
image = 'gcr.io/dataflow-templates-base/java8-template-launcher-base:latest'
image = 'gcr.io/dataflow-templates-base/java11-template-launcher-base:20220124_RC00'
}
to {
credHelper = 'gcloud'
Expand All @@ -92,19 +92,24 @@ repositories {
mavenCentral()
}
dependencies {
implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: dataflowBeamVersion
implementation group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: dataflowBeamVersion
implementation group: 'org.apache.beam', name: 'beam-runners-direct-java', version: dataflowBeamVersion
implementation group: 'org.apache.beam', name: 'beam-sdks-java-extensions-ml', version: dataflowBeamVersion
implementation group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.5'
implementation "com.google.auto.value:auto-value-annotations:1.6.2"
annotationProcessor "com.google.auto.value:auto-value:1.6.2"
implementation group: 'org.apache.beam', name: 'beam-sdks-java-core', version: beamVersion
implementation(group: 'org.apache.beam', name: 'beam-runners-google-cloud-dataflow-java', version: beamVersion) {
exclude group: 'io.confluent', module: 'kafka-schema-registry-client'
exclude group: 'io.confluent', module: 'kafka-avro-serializer'
}
implementation group: 'org.apache.beam', name: 'beam-runners-direct-java', version: beamVersion
implementation group: 'org.apache.beam', name: 'beam-sdks-java-extensions-ml', version: beamVersion
implementation group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.36'
implementation "com.google.auto.value:auto-value-annotations:1.9"
annotationProcessor "com.google.auto.value:auto-value:1.9"
implementation 'com.google.http-client:google-http-client:1.41.4'
implementation group: 'com.google.cloud', name: 'google-cloud-vision', version: visionApiVersion
testImplementation group: 'org.apache.beam', name: 'beam-runners-direct-java', version: dataflowBeamVersion
testImplementation group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.5'
testImplementation group: 'org.hamcrest', name: 'hamcrest-core', version: '1.3'
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '1.3'
testImplementation 'junit:junit:4.12'
implementation group: 'com.google.cloud', name: 'google-cloud-video-intelligence', version: videoApiVersion
testImplementation group: 'org.apache.beam', name: 'beam-runners-direct-java', version: beamVersion
testImplementation group: 'org.slf4j', name: 'slf4j-jdk14', version: '1.7.36'
testImplementation group: 'org.hamcrest', name: 'hamcrest-core', version: '2.2'
testImplementation group: 'org.hamcrest', name: 'hamcrest-library', version: '2.2'
testImplementation 'junit:junit:4.13.2'
}

jar {
Expand Down Expand Up @@ -134,7 +139,7 @@ run {

task execPipeline(type: JavaExec) {
dependsOn(assemble)
main = System.getProperty("mainClass")
mainClass = System.getProperty("mainClass")
classpath = sourceSets.main.runtimeClasspath
systemProperties System.getProperties()
def execArgs = System.getProperty("exec.args")
Expand All @@ -143,13 +148,30 @@ task execPipeline(type: JavaExec) {

// Spotless configuration

Boolean enableSpotlessCheck = project.hasProperty('enableSpotlessCheck') && project.enableSpotlessCheck == 'true'
def enableSpotlessCheck = project.hasProperty('enableSpotlessCheck') && project.enableSpotlessCheck == 'true'
spotless {
enforceCheck enableSpotlessCheck
java {
licenseHeader javaLicenseHeader
googleJavaFormat()
}
}

run.mustRunAfter 'resources'

// Tests

test {
useJUnit()
testLogging {
events "passed", "skipped", "failed"
showStandardStreams = true
exceptionFormat "full"
}
}

sourceSets {
test {
java.srcDir file('src/test')
}
}
2 changes: 1 addition & 1 deletion gradle.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2020 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down
4 changes: 2 additions & 2 deletions gradle/wrapper/gradle-wrapper.properties
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
#
# Copyright 2020 Google LLC
# Copyright 2022 Google LLC
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,6 @@

distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-6.5.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-7.4-bin.zip
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
2 changes: 1 addition & 1 deletion settings.gradle
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/*
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* https://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.solutions.annotation;

import com.google.cloud.videointelligence.v1p3beta1.StreamingFeature;
import com.google.cloud.vision.v1.Feature;
import com.google.protobuf.GeneratedMessageV3;
import com.google.solutions.annotation.gcs.GCSFileInfo;
import com.google.solutions.annotation.ml.videointelligence.VideoAnnotator;
import com.google.solutions.annotation.ml.vision.ImageAnnotator;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/**
 * Annotates batches of GCS files by routing each file to the image or the video annotator.
 *
 * <p>Each input element is a batch of {@link GCSFileInfo}. Files are split by content type
 * (compared case-insensitively against {@code AnnotationPipeline.SUPPORTED_IMAGE_CONTENT_TYPES}
 * and {@code AnnotationPipeline.SUPPORTED_VIDEO_CONTENT_TYPES}), each non-empty group is passed to
 * the matching annotator, and every returned response is emitted as a {@code KV} keyed by the
 * file it belongs to. Image responses are emitted before video responses.
 */
public class AnnotateFilesDoFn
    extends DoFn<Iterable<GCSFileInfo>, KV<GCSFileInfo, GeneratedMessageV3>> {

  private static final long serialVersionUID = 1L;
  private final List<StreamingFeature> videoFeatures;
  private final List<Feature.Type> imageFeatures;

  // The annotators are only ever assigned in setup(), i.e. after the DoFn has been deserialized,
  // so they are excluded from serialization instead of relying on them being null at that time.
  private transient VideoAnnotator videoAnnotator;
  private transient ImageAnnotator imageAnnotator;

  /**
   * @param imageFeatures features passed to the {@link ImageAnnotator} created in {@link #setup()}
   * @param videoFeatures features passed to the {@link VideoAnnotator} created in {@link #setup()}
   */
  public AnnotateFilesDoFn(List<Feature.Type> imageFeatures, List<StreamingFeature> videoFeatures) {
    this.imageFeatures = imageFeatures;
    this.videoFeatures = videoFeatures;
  }

  /**
   * Creates the image and video annotators for this DoFn instance.
   *
   * @throws IOException if either annotator cannot be created
   */
  @Setup
  public void setup() throws IOException {
    imageAnnotator = new ImageAnnotator(imageFeatures);
    videoAnnotator = new VideoAnnotator(videoFeatures);
  }

  /**
   * Tears down whichever annotators were created.
   *
   * <p>Either annotator may be missing if {@link #setup()} failed part-way or never ran, and the
   * video annotator is still torn down when tearing down the image annotator throws.
   */
  @Teardown
  public void teardown() {
    try {
      if (imageAnnotator != null) {
        imageAnnotator.teardown();
      }
    } finally {
      if (videoAnnotator != null) {
        videoAnnotator.teardown();
      }
    }
  }

  /**
   * Annotates one batch of files and emits one output per annotation response.
   *
   * @param fileInfos the batch of files to annotate
   * @param out receiver for the (file, annotation response) pairs
   * @throws RuntimeException if a file's content type is neither a supported image type nor a
   *     supported video type (including a missing content type)
   */
  @ProcessElement
  public void processElement(
      @Element Iterable<GCSFileInfo> fileInfos,
      OutputReceiver<KV<GCSFileInfo, GeneratedMessageV3>> out) {
    List<GCSFileInfo> videoFiles = new ArrayList<>();
    List<GCSFileInfo> imageFiles = new ArrayList<>();
    for (GCSFileInfo fileInfo : fileInfos) {
      // Compare from the supported type's side so that a null content type simply fails to match
      // and is reported through the "Unsupported content type" error below rather than an NPE.
      String contentType = fileInfo.getContentType();
      if (AnnotationPipeline.SUPPORTED_IMAGE_CONTENT_TYPES.stream()
          .anyMatch(supported -> supported.equalsIgnoreCase(contentType))) {
        imageFiles.add(fileInfo);
      } else if (AnnotationPipeline.SUPPORTED_VIDEO_CONTENT_TYPES.stream()
          .anyMatch(supported -> supported.equalsIgnoreCase(contentType))) {
        videoFiles.add(fileInfo);
      } else {
        throw new RuntimeException("Unsupported content type: " + contentType);
      }
    }

    // Collect every response before emitting anything, so a failure in either annotator
    // produces no partial output for the batch.
    List<KV<GCSFileInfo, GeneratedMessageV3>> responses = new ArrayList<>();
    if (!imageFiles.isEmpty()) {
      responses.addAll(imageAnnotator.processFiles(imageFiles));
    }
    if (!videoFiles.isEmpty()) {
      responses.addAll(videoAnnotator.processFiles(videoFiles));
    }

    for (KV<GCSFileInfo, GeneratedMessageV3> response : responses) {
      out.output(response);
    }
  }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright 2020 Google LLC
* Copyright 2022 Google LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -13,55 +13,64 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.google.solutions.ml.api.vision;
package com.google.solutions.annotation;

import com.google.cloud.videointelligence.v1p3beta1.StreamingFeature;
import com.google.cloud.vision.v1.AnnotateImageResponse;
import com.google.cloud.vision.v1.AnnotateImageResponse.Builder;
import com.google.cloud.vision.v1.Feature;
import com.google.protobuf.GeneratedMessageV3;
import com.google.solutions.annotation.gcs.GCSFileInfo;
import java.util.List;
import java.util.Random;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.values.KV;

/**
* Image annotation simulation class to test batching logic without incurring Vision API costs.
* Annotation simulation class to test batching logic without incurring API costs.
*
* It simulates the delay of calling the API and produces a single annotation.
* <p>It simulates the delay of calling the API and produces a single annotation.
*/
public class AnnotateImagesSimulatorDoFn extends
DoFn<Iterable<String>, KV<String, AnnotateImageResponse>> {
public class AnnotateFilesSimulatorDoFn
extends DoFn<Iterable<GCSFileInfo>, KV<GCSFileInfo, GeneratedMessageV3>> {

private static final long serialVersionUID = 1L;

public AnnotateImagesSimulatorDoFn(List<Feature.Type> featureTypes) {
public AnnotateFilesSimulatorDoFn(
List<Feature.Type> imageFeatures, List<StreamingFeature> videoFeatures) {
/*
* Feature types are ignored at the moment. But the simulation logic can be enhanced if needed to produce annotations
* based on the requested features.
*/
}

@ProcessElement
public void processElement(@Element Iterable<String> imageUris,
OutputReceiver<KV<String, AnnotateImageResponse>> out) {
VisionAnalyticsPipeline.numberOfRequests.inc();
public void processElement(
@Element Iterable<GCSFileInfo> fileInfos,
OutputReceiver<KV<GCSFileInfo, GeneratedMessageV3>> out) {
AnnotationPipeline.numberOfImageApiRequests.inc();
try {
/**
* It creates a pattern similar to using the actual APIs with 16 requests per batch and two features requested.
* If more sophisticated simulation is needed - externalize the values or make these parameters
* dependent on batch size and the number of features requested.
* It creates a pattern similar to using the actual APIs with 16 requests per batch and two
* features requested. If more sophisticated simulation is needed - externalize the values or
* make these parameters dependent on batch size and the number of features requested.
*/
Thread.sleep(500 + (new Random().nextInt(1000)));
} catch (InterruptedException e) {
// Do nothing
}

imageUris.forEach(
imageUri -> {
fileInfos.forEach(
fileInfo -> {
Builder responseBuilder = AnnotateImageResponse.newBuilder();
responseBuilder.addLabelAnnotationsBuilder(0).setDescription("Test").setScore(.5F)
.setTopicality(.6F).setMid("/m/test");
responseBuilder
.addLabelAnnotationsBuilder(0)
.setDescription("Test")
.setScore(.5F)
.setTopicality(.6F)
.setMid("/m/test");
AnnotateImageResponse response = responseBuilder.build();
out.output(KV.of(imageUri, response));
out.output(KV.of(fileInfo, response));
});
}
}
Loading