@@ -0,0 +1,56 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"import numpy as np\n",
"import torch\n",
"import torch.nn as nn\n",
"from torch import hub\n",
"from script import vggish_input, vggish_params\n",
"import os"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"def list_sort(x : str):\n",
" if '.' in x:\n",
" # 将文件名字用_进行分割\n",
" x = x.rpartition('_')\n",
" # 将x用.进行分割,最后拿到数字\n",
" x = x[0][2:4]\n",
" else:\n",
" x = 0\n",
" return int(x)"
]
}
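,
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# Usage sketch: sort wav files by the numeric key list_sort extracts\n",
"# (the 'data' directory name is an assumption).\n",
"files = sorted(os.listdir('data'), key=list_sort)\n",
"files"
]
}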
],
"metadata": {
"kernelspec": {
"display_name": "my_env",
"language": "python",
"name": "my_env"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.9.12"
}
},
"nbformat": 4,
"nbformat_minor": 4
}
189 changes: 189 additions & 0 deletions auditory-project-code/auditory/.ipynb_checkpoints/vggish-checkpoint.py
@@ -0,0 +1,189 @@
import numpy as np
import torch
import torch.nn as nn
from torch import hub

from . import vggish_input, vggish_params


class VGG(nn.Module):
def __init__(self, features):
super(VGG, self).__init__()
self.features = features
self.embeddings = nn.Sequential(
nn.Linear(512 * 4 * 6, 4096),
nn.ReLU(True),
nn.Linear(4096, 4096),
nn.ReLU(True),
nn.Linear(4096, 128),
nn.ReLU(True))

def forward(self, x):
x = self.features(x)
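# For the standard 96x64 log-mel patch input, the four 2x2 max-pools in the
# conv stack leave x with shape [batch, 512, 6, 4].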

# Transpose the output from features to
# remain compatible with vggish embeddings
x = torch.transpose(x, 1, 3)
x = torch.transpose(x, 1, 2)
x = x.contiguous()
x = x.view(x.size(0), -1)

return self.embeddings(x)


class Postprocessor(nn.Module):
"""Post-processes VGGish embeddings. Returns a torch.Tensor instead of a
numpy array in order to preserve the gradient.

"The initial release of AudioSet included 128-D VGGish embeddings for each
segment of AudioSet. These released embeddings were produced by applying
a PCA transformation (technically, a whitening transform is included as well)
and 8-bit quantization to the raw embedding output from VGGish, in order to
stay compatible with the YouTube-8M project which provides visual embeddings
in the same format for a large set of YouTube videos. This class implements
the same PCA (with whitening) and quantization transformations."
"""

def __init__(self):
"""Constructs a postprocessor."""
super(Postprocessor, self).__init__()
# Create empty matrices, to be filled in when a state_dict is loaded
self.pca_eigen_vectors = torch.empty(
(vggish_params.EMBEDDING_SIZE, vggish_params.EMBEDDING_SIZE,),
dtype=torch.float,
)
self.pca_means = torch.empty(
(vggish_params.EMBEDDING_SIZE, 1), dtype=torch.float
)

self.pca_eigen_vectors = nn.Parameter(self.pca_eigen_vectors, requires_grad=False)
self.pca_means = nn.Parameter(self.pca_means, requires_grad=False)

def postprocess(self, embeddings_batch):
"""Applies tensor postprocessing to a batch of embeddings.

Args:
embeddings_batch: A tensor of shape [batch_size, embedding_size]
containing output from the embedding layer of VGGish.

Returns:
A tensor of the same shape as the input, containing the PCA-transformed,
quantized, and clipped version of the input.
"""
assert len(embeddings_batch.shape) == 2, "Expected 2-d batch, got %r" % (
embeddings_batch.shape,
)
assert (
embeddings_batch.shape[1] == vggish_params.EMBEDDING_SIZE
), "Bad batch shape: %r" % (embeddings_batch.shape,)

# Apply PCA.
# - Embeddings come in as [batch_size, embedding_size].
# - Transpose to [embedding_size, batch_size].
# - Subtract pca_means column vector from each column.
# - Premultiply by PCA matrix of shape [output_dims, input_dims]
# where both are equal to embedding_size in our case.
# - Transpose result back to [batch_size, embedding_size].
pca_applied = torch.mm(self.pca_eigen_vectors, (embeddings_batch.t() - self.pca_means)).t()

# Quantize by:
# - clipping to [min, max] range
clipped_embeddings = torch.clamp(
pca_applied, vggish_params.QUANTIZE_MIN_VAL, vggish_params.QUANTIZE_MAX_VAL
)
# - convert to 8-bit in range [0.0, 255.0]
quantized_embeddings = torch.round(
(clipped_embeddings - vggish_params.QUANTIZE_MIN_VAL)
* (
255.0
/ (vggish_params.QUANTIZE_MAX_VAL - vggish_params.QUANTIZE_MIN_VAL)
)
)
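# e.g. with the usual VGGish range [-2.0, +2.0] (assumed here), an input
# value of 0.0 maps to round((0.0 + 2.0) * 255.0 / 4.0) = 128.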
return torch.squeeze(quantized_embeddings)

def forward(self, x):
return self.postprocess(x)


def make_layers():
layers = []
in_channels = 1
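# VGG-style configuration: each integer is a conv layer's output channel
# count and "M" marks a 2x2 max-pool.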
for v in [64, "M", 128, "M", 256, 256, "M", 512, 512, "M"]:
if v == "M":
layers += [nn.MaxPool2d(kernel_size=2, stride=2)]
else:
conv2d = nn.Conv2d(in_channels, v, kernel_size=3, padding=1)
layers += [conv2d, nn.ReLU(inplace=True)]
in_channels = v
return nn.Sequential(*layers)


def _vgg():
return VGG(make_layers())


# def _spectrogram():
# config = dict(
# sr=16000,
# n_fft=400,
# n_mels=64,
# hop_length=160,
# window="hann",
# center=False,
# pad_mode="reflect",
# htk=True,
# fmin=125,
# fmax=7500,
# output_format='Magnitude',
# # device=device,
# )
# return Spectrogram.MelSpectrogram(**config)


class VGGish(VGG):
def __init__(self, urls, device=None, pretrained=True, preprocess=True, postprocess=True, progress=True):
super().__init__(make_layers())
if pretrained:
state_dict = hub.load_state_dict_from_url(urls['vggish'], progress=progress)
super().load_state_dict(state_dict)

if device is None:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
self.device = device
self.preprocess = preprocess
self.postprocess = postprocess
if self.postprocess:
self.pproc = Postprocessor()
if pretrained:
state_dict = hub.load_state_dict_from_url(urls['pca'], progress=progress)
# Convert the PCA parameters in the state_dict from numpy to torch tensors
state_dict[vggish_params.PCA_EIGEN_VECTORS_NAME] = torch.as_tensor(
state_dict[vggish_params.PCA_EIGEN_VECTORS_NAME], dtype=torch.float
)
state_dict[vggish_params.PCA_MEANS_NAME] = torch.as_tensor(
state_dict[vggish_params.PCA_MEANS_NAME].reshape(-1, 1), dtype=torch.float
)

self.pproc.load_state_dict(state_dict)
self.to(self.device)

def forward(self, x, fs=None):
if self.preprocess:
x = self._preprocess(x, fs)
x = x.to(self.device)
x = VGG.forward(self, x)
if self.postprocess:
x = self._postprocess(x)
return x

def _preprocess(self, x, fs):
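# fs is the input sample rate; it is only needed when x is a raw waveform
# array, since wavfile_to_examples reads the rate from the file itself.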
if isinstance(x, np.ndarray):
x = vggish_input.waveform_to_examples(x, fs)
elif isinstance(x, str):
x = vggish_input.wavfile_to_examples(x)
else:
raise AttributeError(f"Expected a waveform array or a wav file path, got {type(x)}")
return x

def _postprocess(self, x):
return self.pproc(x)
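

# Minimal usage sketch, assuming 'urls' maps 'vggish' and 'pca' to the
# released VGGish checkpoint and PCA-parameter URLs; 'example.wav' is a
# placeholder path.
#
#   urls = {'vggish': '<vggish_weights_url>', 'pca': '<pca_params_url>'}
#   model = VGGish(urls)
#   model.eval()
#   with torch.no_grad():
#       embeddings = model('example.wav')  # -> [num_patches, 128]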
@@ -0,0 +1,136 @@
# Copyright 2017 The TensorFlow Authors All Rights Reserved.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
# ==============================================================================

"""Defines the 'VGGish' model used to generate AudioSet embedding features.

The public AudioSet release (https://research.google.com/audioset/download.html)
includes 128-D features extracted from the embedding layer of a VGG-like model
that was trained on a large Google-internal YouTube dataset. Here we provide
a TF-Slim definition of the same model, without any dependencies on libraries
internal to Google. We call it 'VGGish'.

Note that we only define the model up to the embedding layer, which is the
penultimate layer before the final classifier layer. We also provide various
hyperparameter values (in vggish_params.py) that were used to train this model
internally.

For comparison, here is TF-Slim's VGG definition:
https://github.com/tensorflow/models/blob/master/research/slim/nets/vgg.py
"""

import tensorflow.compat.v1 as tf
import tf_slim as slim

import vggish_params as params


def define_vggish_slim(features_tensor=None, training=False):
"""Defines the VGGish TensorFlow model.

All ops are created in the current default graph, under the scope 'vggish/'.

The input is either a tensor passed in via the optional 'features_tensor'
argument or a placeholder created below named 'vggish/input_features'. The
input is expected to have dtype float32 and shape [batch_size, num_frames,
num_bands] where batch_size is variable and num_frames and num_bands are
constants, and [num_frames, num_bands] represents a log-mel-scale spectrogram
patch covering num_bands frequency bands and num_frames time frames (where
each frame step is usually 10ms). This is produced by computing the stabilized
log(mel-spectrogram + params.LOG_OFFSET). The output is a tensor named
'vggish/embedding' which produces the pre-activation values of a 128-D
embedding layer, which is usually the penultimate layer when used as part of a
full model with a final classifier layer.

Args:
features_tensor: If not None, the tensor containing the input features.
If None, a placeholder input is created.
training: If true, all parameters are marked trainable.

Returns:
The op 'vggish/embedding'.
"""
# Defaults:
# - All weights are initialized to N(0, INIT_STDDEV).
# - All biases are initialized to 0.
# - All activations are ReLU.
# - All convolutions are 3x3 with stride 1 and SAME padding.
# - All max-pools are 2x2 with stride 2 and SAME padding.
with slim.arg_scope([slim.conv2d, slim.fully_connected],
weights_initializer=tf.truncated_normal_initializer(
stddev=params.INIT_STDDEV),
biases_initializer=tf.zeros_initializer(),
activation_fn=tf.nn.relu,
trainable=training), \
slim.arg_scope([slim.conv2d],
kernel_size=[3, 3], stride=1, padding='SAME'), \
slim.arg_scope([slim.max_pool2d],
kernel_size=[2, 2], stride=2, padding='SAME'), \
tf.variable_scope('vggish'):
# Input: a batch of 2-D log-mel-spectrogram patches.
if features_tensor is None:
features_tensor = tf.placeholder(
tf.float32, shape=(None, params.NUM_FRAMES, params.NUM_BANDS),
name='input_features')
# Reshape to 4-D so that we can convolve a batch with conv2d().
net = tf.reshape(features_tensor,
[-1, params.NUM_FRAMES, params.NUM_BANDS, 1])

# The VGG stack of alternating convolutions and max-pools.
net = slim.conv2d(net, 64, scope='conv1')
net = slim.max_pool2d(net, scope='pool1')
net = slim.conv2d(net, 128, scope='conv2')
net = slim.max_pool2d(net, scope='pool2')
net = slim.repeat(net, 2, slim.conv2d, 256, scope='conv3')
net = slim.max_pool2d(net, scope='pool3')
net = slim.repeat(net, 2, slim.conv2d, 512, scope='conv4')
net = slim.max_pool2d(net, scope='pool4')

# Flatten before entering fully-connected layers
net = slim.flatten(net)
net = slim.repeat(net, 2, slim.fully_connected, 4096, scope='fc1')
# The embedding layer.
net = slim.fully_connected(net, params.EMBEDDING_SIZE, scope='fc2',
activation_fn=None)
return tf.identity(net, name='embedding')


def load_vggish_slim_checkpoint(session, checkpoint_path):
"""Loads a pre-trained VGGish-compatible checkpoint.

This function can be used as an initialization function (referred to as
init_fn in TensorFlow documentation) which is called in a Session after
initializating all variables. When used as an init_fn, this will load
a pre-trained checkpoint that is compatible with the VGGish model
definition. Only variables defined by VGGish will be loaded.

Args:
session: an active TensorFlow session.
checkpoint_path: path to a file containing a checkpoint that is
compatible with the VGGish model definition.
"""
# Get the list of names of all VGGish variables that exist in
# the checkpoint (i.e., all inference-mode VGGish variables).
with tf.Graph().as_default():
define_vggish_slim(training=False)
vggish_var_names = [v.name for v in tf.global_variables()]

# Get the list of all currently existing variables that match
# the list of variable names we just computed.
vggish_vars = [v for v in tf.global_variables() if v.name in vggish_var_names]

# Use a Saver to restore just the variables selected above.
saver = tf.train.Saver(vggish_vars, name='vggish_load_pretrained',
write_version=1)
saver.restore(session, checkpoint_path)
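

# Minimal usage sketch, assuming the released 'vggish_model.ckpt' file is
# available locally and examples_batch is a [batch, 96, 64] array of log-mel
# patches:
#
#   with tf.Graph().as_default(), tf.Session() as sess:
#       embedding_op = define_vggish_slim(training=False)
#       load_vggish_slim_checkpoint(sess, 'vggish_model.ckpt')
#       embeddings = sess.run(embedding_op,
#                             {'vggish/input_features:0': examples_batch})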