-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRandomSentenceSpout.java
More file actions
69 lines (60 loc) · 2.16 KB
/
RandomSentenceSpout.java
File metadata and controls
69 lines (60 loc) · 2.16 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
/*
* To change this license header, choose License Headers in Project Properties.
* To change this template file, choose Tools | Templates
* and open the template in the editor.
*/
package com.abinash.example;
/**
*
* @author Abinash
*
*/
import org.apache.storm.spout.SpoutOutputCollector;
import org.apache.storm.task.TopologyContext;
import org.apache.storm.topology.OutputFieldsDeclarer;
import org.apache.storm.topology.base.BaseRichSpout;
import org.apache.storm.tuple.Fields;
import org.apache.storm.tuple.Values;
import org.apache.storm.utils.Utils;
import java.util.Map;
import java.util.Random;
//This spout randomly emits sentences
/**
 * Spout that emits one randomly chosen sentence from a fixed list every time
 * {@link #nextTuple()} is invoked.
 *
 * <p>Each emitted tuple carries a single field named {@code "sentence"}. Tuples are
 * emitted without a message id, and {@link #ack(Object)} / {@link #fail(Object)} are
 * intentionally empty, so this spout does no replay of failed tuples.
 */
public class RandomSentenceSpout extends BaseRichSpout {

    /**
     * The sentences to pick from. Kept as a class-level constant so the array is
     * built once instead of on every {@link #nextTuple()} call; it is only read,
     * never modified.
     */
    private static final String[] SENTENCES = {
        "the cow jumped over the moon",
        "an apple a day keeps the doctor away",
        "four score and seven years ago",
        "snow white and the seven dwarfs",
        "i am at two with nature"
    };

    /** Pause, in milliseconds, taken at the start of every {@link #nextTuple()} call. */
    private static final int EMIT_PAUSE_MILLIS = 100;

    /** Collector used to emit output tuples; assigned in {@link #open}. */
    SpoutOutputCollector _collector;

    /** Random source used to choose the sentence; created in {@link #open}. */
    Random _rand;

    /**
     * Stores the collector handed in by the framework and creates the random
     * number generator. Must run before {@link #nextTuple()}, which relies on
     * both fields being set.
     *
     * @param conf      configuration passed in by the framework (unused here);
     *                  the raw {@code Map} type is kept so the override signature
     *                  is unchanged
     * @param context   topology context passed in by the framework (unused here)
     * @param collector collector that {@link #nextTuple()} will emit through
     */
    @Override
    public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
        _collector = collector;
        _rand = new Random();
    }

    /**
     * Sleeps for {@value #EMIT_PAUSE_MILLIS} ms, then emits one sentence picked
     * uniformly at random from {@link #SENTENCES} as a single-value tuple.
     */
    @Override
    public void nextTuple() {
        // Throttle emission so the example does not flood the topology.
        Utils.sleep(EMIT_PAUSE_MILLIS);
        String sentence = SENTENCES[_rand.nextInt(SENTENCES.length)];
        // No message id is supplied with the emit.
        _collector.emit(new Values(sentence));
    }

    /**
     * No-op: acknowledgements are ignored since this is a basic example.
     *
     * @param id message id of the acknowledged tuple (ignored)
     */
    @Override
    public void ack(Object id) {
    }

    /**
     * No-op: failures are ignored since this is a basic example.
     *
     * @param id message id of the failed tuple (ignored)
     */
    @Override
    public void fail(Object id) {
    }

    /**
     * Declares the output schema: a single field named {@code "sentence"}.
     *
     * @param declarer declarer on which the output fields are registered
     */
    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(new Fields("sentence"));
    }
}