Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
438 changes: 438 additions & 0 deletions class04/mincemeat.py

Large diffs are not rendered by default.

85 changes: 85 additions & 0 deletions class04/mr_pagerank.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
# encoding: utf-8
import mincemeat


import sys
sys.path.append("../dfs/")

import client as dfs

def mapfn(k, v):
import sys
sys.path.append("../dfs/")
import client as dfs
N = 1000

for l in dfs.get_file_content(v):
l = l.strip()
if len(l) == 0:
continue
cols = l.split(" ")
if len(cols) != 2 and len(cols) != 4:
sys.stderr.write("Malformed record len=%d: %s" %(len(cols), str(l)))
continue

docid = cols[0]
outlinks = [] if cols[1] == "==" else cols[1].split(",")
if len(cols) == 4:
rank = float(cols[2])
iter_num = int(cols[3])
else:
rank = 1.0/N
iter_num = 0

if len(outlinks) == 0:
for d in range(0, N):
yield str(d), ("rank", rank/N)
else:
for d in outlinks:
yield str(d), ("rank", rank / len(outlinks))
yield docid, ("outlinks", cols[1])
yield docid, ("iter", iter_num + 1)


def reducefn(k, vs):
import sys
sys.path.append("../dfs/")
import client as dfs
N = 1000

new_rank = 0.0
iter_num = None
for v in vs:
if v[0] == "rank":
new_rank += v[1]
elif v[0] == "outlinks":
outlinks = v[1]
elif v[0] == "iter":
iter_num = v[1]
else:
sys.stderr.write("Malformed reduce task: key=%s value=%s" % (k, str(vs)))

if iter_num is None:
sys.stderr.write("Malformed reduce task (no iter num): key=%s value=%s" % (k, str(vs)))
return
new_rank = 0.85 * new_rank + 0.15 / N
out_filename = "/class04/iter%d/%s" % (iter_num, WORKER_NAME)
print "%s %s %f %d" % (k, outlinks, new_rank, iter_num)

return out_filename

s = mincemeat.Server()

import argparse
# читаем список файлов, из которых состоят матрицы
parser = argparse.ArgumentParser()
parser.add_argument("--toc", required = True)
args = parser.parse_args()

graph_files = [l for l in dfs.get_file_content(args.toc)]
# и подаем этот список на вход мапперам
s.map_input = mincemeat.MapInputSequence(graph_files)
s.mapfn = mapfn
s.reducefn = reducefn

results = s.run_server(password="")
55 changes: 55 additions & 0 deletions dfs/README.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
### TL;DR:
```
cd bigdata_2015/dfs
rm -r files data data2
mkdir data data2
python server.py --role master --port 8000
python server.py --role chunkserver --master localhost:8000 --chunkserver localhost --port 8001 --data data
python server.py --role chunkserver --master localhost:8000 --chunkserver localhost --port 8002 --data data2
```

### Подробности
DFS состоит из нескольких серверов: мастера и как минимум одного файлового сервера. Мастер хранит метаданные, файловые серверы хранят фрагменты в каталоге на диске. Метаданные включают в себя наш знакомый файл `files` -- отоюражение имен файлов в идентификаторы фрагментов. Информацию о расположении фрагментов (известную нам как `chunk_locations`) мастер получает от файловых серверов.

В командах выше запускаются с чистого листа 1 мастер и 2 файловых сервера. Мастер запускается на порту 8000, файловые серверы запускаются на портах 8001 и 8002 и хранят данные, соответственно, в каталогах `data` и `data2`.

### Клиентский код
В скрипте `client.py` есть уже знакомые функции `files()`, `chunk_locations()`, `get_chunk_data()`, `get_file_content()` -- они делают то же самое что и делали в предыдущих заданиях. Новые функции `create_chunk()` и `write_chunk_data()` создают новый фрагмент и записывают его содержимое. Вспомогательная функция `file_appender()` возвращает объект с методом `write()` который буферизует записи и при закрытии создаёт новый фрагмент и записывает в него накопленные записи. Этим объектом удобно пользоваться таким образом:

```
with dfs.file_appender("/foo/bar") as f:
f.write("Foo")
f.write("Bar")
# Здесь мы выйдем из контекста with, автоматически вызовется метод f.close()
# и в файле /foo/bar создастся новый фрагмент.
# Если файла не существовало, он появится
```

Клиентский код по умолчанию обращается к мастеру, висящему на `localhost:8000` (константа `MASTER_URL` в `client.py`)

### Необходимые модули
Для работы DFS требуется Python 2 и модуль `poster`. Модуль `poster` отсутствует в стандартном SDK, ставится командой pip install poster (возможно, запущенной от имени суперпользователя)

### Подводные камни
```
python server.py --role chunkserver --master localhost:8000 --chunkserver localhost --port 8001 --data data
Traceback (most recent call last):
File "server.py", line 309, in <module>
send_heartbeat()
File "server.py", line 153, in send_heartbeat
self.send_error(404, "No permission to list directory")
NameError: global name 'self' is not defined
```

Сделайте каталог `data`

```
File "print-corpus.py", line 20, in <module>
for l in metadata.get_file_content("/wikipedia/__toc__"):
File "../dfs/client.py", line 99, in get_file_content
for l in get_chunk_data(self.chunk_locations[chunk_id], chunk_id):
KeyError: u'6bf8debcb30e532944848c2329315072_0'
```

Если в `chunk_locations` не находится какой-то ключ, который есть в `files`, это означает, что файловый сервер, хранящий соответствующий фрагмент, не запущен или у него неправильно указан аргумент `--data`
###
127 changes: 127 additions & 0 deletions dfs/client.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
from collections import namedtuple
#Use this import if you're using Python3
import urllib2
from urllib2 import urlopen
#Use this import if you're using Python2
#from urllib2 import urlopen
import json
from poster.encode import multipart_encode
from poster.streaminghttp import register_openers
from contextlib import closing
import argparse

register_openers()

#MASTER_URL = "bigdata-hw01.barashev.net"
MASTER_URL = "localhost:8000"
def _json_object_hook(d): return namedtuple('X', d.keys())(*d.values())
def json2obj(data): return json.loads(data, object_hook=_json_object_hook)

def files():
resp = urlopen(url = "http://%s/files" % MASTER_URL, timeout=10)
if resp.getcode() != 200:
raise Exception("ERROR: can't get files from master")
return json2obj(resp.read().decode(encoding='UTF-8'))

def chunk_locations():
resp = urlopen(url = "http://%s/chunk_locations" % MASTER_URL, timeout=10)
if resp.getcode() != 200:
raise Exception("ERROR: can't get chunk locations from master")
return json2obj(resp.read().decode(encoding='UTF-8'))

def get_chunk_data(chunk_server_id, chunk_id):
resp = urlopen(url="http://%s/read?id=%s" % (chunk_server_id, chunk_id), timeout=10)
if resp.getcode() != 200:
raise Exception("ERROR: can't get chunk %s from chunkserver %s" % (chunk_id, chunk_server_id))
for line in resp:
yield line.decode(encoding='UTF-8')

def get_file_content(filename):
chunks = []
for f in files():
if f.name == filename:
chunks = f.chunks
if len(chunks) == 0:
return
clocs = {}
for c in chunk_locations():
clocs[c.id] = c.chunkserver

for chunk in chunks:
try:
loc = clocs[chunk]
if loc == "":
raise "ERROR: location of chunk %s is unknown" % chunk
for l in get_chunk_data(loc, chunk):
yield l.rstrip()

except StopIteration:
pass

def create_chunk(filename):
resp = urlopen(url = "http://%s/new_chunk?f=%s" % (MASTER_URL, filename))
if resp.getcode() != 200:
raise Exception("ERROR: can't create new chunk of file=%s" % filename)
return resp.read().split(" ")

def write_chunk_data(chunk_server_id, chunk_id, data):
datagen, headers = multipart_encode({"data": data, "chunk_id": chunk_id})
request = urllib2.Request("http://%s/write" % chunk_server_id, datagen, headers)
response = urllib2.urlopen(request)
if response.getcode() != 200:
raise Exception("ERROR: can't write chunk %s to chunkserver %s" % (chunk_id, chunk_server_id))

def file_appender(filename):
return closing(FileAppend(filename))

class FileAppend:
def __init__(self, filename):
self.filename = filename
self.lines = []

def write(self, line):
self.lines.append(line)

def close(self):
chunkserver, chunk_id = create_chunk(self.filename)
write_chunk_data(chunkserver, chunk_id, "\n".join(self.lines))

class CachedMetadata:
def __init__(self):
self.file_chunks = {}
for f in files():
self.file_chunks[f.name] = f.chunks
self.chunk_locations = {}
for cl in chunk_locations():
self.chunk_locations[cl.id] = cl.chunkserver

def get_file_content(self, filename):
for chunk_id in self.file_chunks[filename]:
for l in get_chunk_data(self.chunk_locations[chunk_id], chunk_id):
yield l

def put_file(from_file, to_file, master):
global MASTER_URL
MASTER_URL=master
with open(from_file) as f, file_appender(to_file) as buf:
for l in f:
buf.write(l.rstrip())

def get_file(from_file, master):
global MASTER_URL
MASTER_URL=master
for l in get_file_content(from_file):
print l

if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("--command", required = True)
parser.add_argument("--f")
parser.add_argument("--t")
parser.add_argument("--master", required=True, default="localhost:8000")
args = parser.parse_args()

if "put" == args.command:
put_file(args.f, args.t, args.master)
elif "get" == args.command:
get_file(args.f, args.master)
Loading