Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -59,3 +59,6 @@ typings/

# Compiled artifacts
build/

# NPM package lockfile
package-lock.json
24 changes: 24 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,30 @@ iex.stockCompany('AAPL')
// }
```

### Subscribe to a topic (NodeJS Only)

The following code demostrates how to subscribe to an IEX reltime
feed via NodeJS. IEX curretnly does not support subscribing from
other platforms.

```typescript
import { IEXClientRT } from 'iex-api'

const iexRT = new IEXClientRT()
iexRT.observe('TWLO', 'MSFT').subscribe(update => console.log(update))
```

Or using promises
```typescript
import { IEXClientRT } from 'iex-api'
import { take, toArray } from 'rxjs/operator'

const iexRT = new IEXClientRT()
iexRT.observe('TWLO', 'MSFT')
.pipe(take(5), toArray())
.toPromise()
.then(values => console.log(values))
```

To Do:
------
Expand Down
6 changes: 5 additions & 1 deletion package.json
Original file line number Diff line number Diff line change
Expand Up @@ -33,10 +33,14 @@
"url": "https://github.com/bilalq/iex-api/issues"
},
"homepage": "https://github.com/bilalq/iex-api#readme",
"dependencies": {},
"dependencies": {
"rxjs": "^6.2.2",
"socket.io-client": "^2.1.0"
},
"devDependencies": {
"@types/jest": "^21.1.8",
"@types/npm": "^2.0.29",
"@types/socket.io-client": "^1.4.32",
"fetch-ponyfill": "^4.1.0",
"jest": "^21.2.1",
"ts-jest": "^21.2.4",
Expand Down
1 change: 1 addition & 0 deletions src/__tests__/attribution.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* tslint:disable:newline-per-chained-call */
import attribution from '../attribution'

describe('attribution', () => {
Expand Down
3 changes: 2 additions & 1 deletion src/__tests__/client.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* tslint:disable:newline-per-chained-call */
/* tslint:disable:no-magic-numbers */
/* tslint:disable:no-var-requires */
/* tslint:disable:no-require-imports */
Expand Down Expand Up @@ -79,7 +80,7 @@ describe('IEXClient', () => {
const symbols = await iex.symbols()
symbols.forEach(stockSym => {
expect(stockSym.date).toEqual(expect.any(String))
expect(stockSym.iexId).toEqual(expect.any(String))
expect(parseInt(stockSym.iexId, 10)).not.toBeNaN()
expect(typeof stockSym.isEnabled).toEqual('boolean')
expect(stockSym.name).toEqual(expect.any(String))
expect(stockSym.symbol).toEqual(expect.any(String))
Expand Down
79 changes: 79 additions & 0 deletions src/__tests__/iexClientRT.spec.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/* tslint:disable:no-submodule-imports */
import { first } from 'rxjs/operators'
/* tslint:enable:no-submodule-imports */

import { default as IEXClientRT, SocketIOBuilder } from '../iexClientRT'

const mockOn = (event: string, fn: () => any) => {

if (event === 'connect') {
fn.apply(null)
}
}

const mockSocketSpecBuilder = () => ({
emit: jest.fn(() => undefined),
on: jest.fn(mockOn)
})

describe('IEXClinetRT', () => {

describe('unit tests', () => {
let client: IEXClientRT
let mockSocket: SocketIOBuilder
let mockSocketSpec: {on: jest.Mock, emit: jest.Mock}

beforeEach(() => {
mockSocketSpec = mockSocketSpecBuilder()
mockSocket = jest.fn(() => mockSocketSpec)
client = new IEXClientRT(mockSocket)
})

it('Subscribes to IEX when observable is subscribed', () => {
expect(mockSocketSpec.emit)
.toHaveBeenCalledTimes(0)

client.observe('MSFT', 'TWLO')
.subscribe()

expect(mockSocketSpec.emit)
.toHaveBeenCalledWith('subscribe', 'MSFT')
expect(mockSocketSpec.emit)
.toHaveBeenCalledWith('subscribe', 'TWLO')
})

it('Unsubscribes from IEX when observables are unsubscribed', () => {
const os = client.observe('MSFT', 'TWLO')
const subs = [os.subscribe(), os.subscribe()]
subs.forEach(sub => {
sub.unsubscribe()
})

expect(mockSocketSpec.emit)
.toHaveBeenCalledWith('unsubscribe', 'MSFT')
expect(mockSocketSpec.emit)
.toHaveBeenCalledWith('unsubscribe', 'TWLO')
})
})

describe('integration tests', () => {
let client: IEXClientRT

beforeEach(() => {
client = new IEXClientRT()
})

it('Fetches realtime data', async () => {
const data = await client.observe('MSFT')
.pipe(first())
.toPromise()

expect(data.symbol)
.toEqual('MSFT')
expect(data.securityType)
.toEqual(expect.any(String))
expect(data.lastSalePrice)
.toEqual(expect.any(Number))
})
})
})
1 change: 1 addition & 0 deletions src/__tests__/index.spec.ts
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
/* tslint:disable:newline-per-chained-call */
import attribution from '../attribution'
import IEXClient from '../client'
import { Attribution as indexAttribution, IEXClient as indexClient } from '../index'
Expand Down
21 changes: 21 additions & 0 deletions src/apis/stocks.ts
Original file line number Diff line number Diff line change
Expand Up @@ -305,3 +305,24 @@ date: string | null
marketPercent: number
avgMarketPercent: number
}

/**
* Response type when fetching realtime data
* for stocks.
*/
export interface RealtimeQuoteResponse {
readonly symbol: string,
readonly sector: string,
readonly securityType: string,
readonly bidPrice: number,
readonly bidSize: number,
readonly askPrice: number,
readonly askSize: number,
readonly lastUpdated: number,
readonly lastSalePrice: number,
readonly lastSaleSize: number,
readonly lastSaleITime: number,
readonly volume: number,
readonly marketPercent: number,
readonly seq: number
}
4 changes: 2 additions & 2 deletions src/client.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@ import * as StocksAPI from './apis/stocks'
* way. It is usable in Browser, React Native, and NodeJS contexts.
*/
export default class IEXClient {
private fetchFunction: typeof fetch
private httpsEndpoint: string
private readonly fetchFunction: typeof fetch
private readonly httpsEndpoint: string

/**
* @param fetchFunction A function that is API compatible with the browser
Expand Down
192 changes: 192 additions & 0 deletions src/iexClientRT.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
import { merge, Observable, Observer } from 'rxjs'
import * as socketIO from 'socket.io-client'

import { RealtimeQuoteResponse } from './apis/stocks'

interface SubscriptionEntry {
readonly observable: Observable<RealtimeQuoteResponse>,
next(value: RealtimeQuoteResponse): any,
complete(): any,
readonly count: number
}

interface Subscriptions {
[topic: string]: SubscriptionEntry | undefined
}

interface EntryHandler {
onEmpty(): any,
onResumend(): any
}

/**
* Create an entry containing an observer that will emit values
* produced by Socket.IO
* @param handler Object specified what to do when the first observer
* subscribes to the topic or when all observers unsubscribe
*/
const createEntry = (handler: EntryHandler) => {
const observers: Array<Observer<RealtimeQuoteResponse>> = []
const observable = Observable.create((observer: Observer<RealtimeQuoteResponse>) => {
observers.push(observer)
if (observers.length === 1) {
handler.onResumend()
}

return () => {
const ix = observers.findIndex(candidate => observer === candidate)

if (ix >= 0) {
observers.splice(ix, 1)

if (observers.length === 0) {
handler.onEmpty()
}
}
}
})

return {
observable,
next(value: RealtimeQuoteResponse) {
observers.forEach(o => {
o.next(value)
})
},
complete() {
observers.forEach(o => {
o.complete()
})
observers.splice(0, observers.length)
},
get count() {
return observers.length
}
}
}

const endpoint = 'https://ws-api.iextrading.com/1.0/tops'

export type SocketIOBuilder = (endpoint: string) => SocketIOClient.Socket

const socketIOConnect = (host: string) => socketIO.connect(host)

/**
* Client for observing realtime data streams
* produced by IEX.
*/
export default class IEXClientRT {

private isReady: boolean

private readonly socket: SocketIOClient.Socket

private readonly subscriptions: Subscriptions

public constructor(socketBuilder?: SocketIOBuilder) {
/*tslint:disable:no-unsafe-any */
this.onConnect = this.onConnect.bind(this)
this.onMessage = this.onMessage.bind(this)
this.subscribePending = this.subscribePending.bind(this)
this.subscribeIfReady = this.subscribeIfReady.bind(this)
this.getOrCreateObservable = this.getOrCreateObservable.bind(this)
this.observe = this.observe.bind(this)
/*tslint:enable:no-unsafe-any */

this.isReady = false
this.subscriptions = {}
const createSocket = socketBuilder !== undefined ? socketBuilder : socketIOConnect
this.socket = createSocket(endpoint)
/*tslint:disable:no-unbound-method*/
this.socket.on('connect', this.onConnect)
this.socket.on('message', this.onMessage)
/*tslint:enable:no-unbound-method*/
}

/**
* Event handler that listens to messages emmited by IEX
* @param rawMessage The message from iex as a JSON string
*/
private onMessage(rawMessage: string) {
const message = JSON.parse(rawMessage) as RealtimeQuoteResponse
const topic = message.symbol
const entry = this.subscriptions[topic]
if (entry) {
entry.next(message)
}
}

/**
* Event handler called when the socket establishes a connection
* with IEX. It will create the data streams for all listeners
* currently subscribed to an IEX feed.
*/
private onConnect() {
this.isReady = true
this.subscribePending()
}

/**
* Subscribe all listeners awaiting data from IEX to their
* respective feed once the connection with IEX is established.
*/
private subscribePending() {
const pending = Object.keys(this.subscriptions)
.filter(key => {
const subscription = this.subscriptions[key]
return subscription && subscription.count > 0
})

if (pending.length > 0) {
this.socket.emit('subscribe', pending.join())
}
}

/**
* Subscribe to a topic from an IEX feed if a connection
* with IEX has been established
* @param topic The IEX feed topic being subscribed to
*/
private subscribeIfReady(topic: string) {
if (!this.isReady) {
return
}

this.socket.emit('subscribe', topic)
}

/**
* Get an observable for the specified topic. If a subscription to the topic
* already exists that observable is returned. Otherwise, a subscription is
* created and a new observable is created to handle that subscription.
* @param unnormalizedTopic The IEX topic that the subscriber is interested in.
*/
private getOrCreateObservable(unnormalizedTopic: string): Observable<RealtimeQuoteResponse> {
const topic = unnormalizedTopic.toUpperCase()
let observable = this.subscriptions[topic]
const onEmpty = () => {
this.socket.emit('unsubscribe', topic)
}
const onResumend = () => {
this.subscribeIfReady(topic)
}

if (!observable) {
this.subscriptions[topic] = observable = createEntry({
onEmpty,
onResumend
})
}

return observable.observable
}

/**
* Get an Observable that produces values whenever one of the
* securities passed as parameter changes
* @param topics The securities one wishes to subscribe
*/
public observe(...topics: string[]): Observable<RealtimeQuoteResponse> {
return merge(...topics.map(t => this.getOrCreateObservable(t)))
}
}
4 changes: 3 additions & 1 deletion src/index.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
import Attribution from './attribution'
import IEXClient from './client'
import IEXClientRT from './iexClientRT'

export {
Attribution,
IEXClient
IEXClient,
IEXClientRT
}