-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathWordCount.java
More file actions
86 lines (77 loc) · 2.94 KB
/
WordCount.java
File metadata and controls
86 lines (77 loc) · 2.94 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.abinash.example;
/**
*
* @author Abinash
*/
import java.util.HashMap;
import java.util.Map;
import java.util.Iterator;
import org.apache.storm.Constants;
import org.apache.storm.topology.BasicOutputCollector;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseBasicBolt;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Tuple;
import org.apache.storm.tuple.Values;
import org.apache.storm.Config;
//For logging
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
//There are a variety of bolt types. In this case, use BaseBasicBolt
public class WordCount extends BaseBasicBolt {
//Create logger for this class
private static final Logger logger = LogManager.getLogger(WordCount.class);
//For holding words and counts
Map<String, Integer> counts = new HashMap<String, Integer>();
//How Often to emit a count of words
private Integer emitFrequency;
//Default constructor
public WordCount() {
emitFrequency=5; //Default to 60 seconds
}
//Construct that sets emit frequency
public WordCount(Integer frequency) {
emitFrequency = frequency;
}
//Configure frequency of tick tuples for this bolt
//This delivers a 'tick' tuple on a specific interval,
//which is used to trigger certain actions
@Override
public Map<String , Object> getComponentConfiguration() {
Config conf= new Config();
conf.put(Config.TOPOLOGY_TICK_TUPLE_FREQ_SECS,emitFrequency);
return conf;
}
//execute is called to process tuples
@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
//If it's a tick tuple,emit all words and counts
if(tuple.getSourceComponent().equals(Constants.SYSTEM_COMPONENT_ID) && tuple.getSourceStreamId().equals(Constants.SYSTEM_TICK_STREAM_ID)) {
for(String word : counts.keySet()) {
Integer count = counts.get(word);
collector.emit(new Values(word,count));
logger.info("Emiting a count of " + count + "for word" + word);
}
}else {
//Get the word contents from the tuple
String word = tuple.getString(0);
//Have we counted any already
Integer count = counts.get(word);
if (count == null)
count = 0 ;
//Increment the count and store it
count++;
counts.put(word,count);
}
}
//Declare that this emits a tuple containing twp fields;word and count
@Override
public void declareOutputFields(OutputFieldsDeclarer declarer) {
declarer.declare(new Fields("word","count"));
}
}