import array
import gzip
import matplotlib.pyplot as plt
import random
import tensorflow as tf
from collections import defaultdict
from scipy.spatial import distance
from sklearn.manifold import TSNE
Data is available at http://cseweb.ucsd.edu/~jmcauley/pml/data/. Download it, save it to a directory of your own, and set dataDir below to that directory.
dataDir = "/home/jmcauley/pml_data/"
This code reads image data in a specific binary format, described here: http://jmcauley.ucsd.edu/data/amazon/links.html
def readImageFeatures(path, featureCount=4096):
    """Stream (asin, features) records from a binary image-feature file.

    Each record consists of a 10-byte item ID followed by ``featureCount``
    floats stored with array type code 'f'.

    Args:
        path: Location of the binary feature file.
        featureCount: Number of floats stored per record (4096 by default).

    Yields:
        Tuples ``(asin, features)`` where ``asin`` is a str and ``features``
        is a list of ``featureCount`` floats.

    Raises:
        EOFError: If the file ends in the middle of a feature vector.
    """
    # The context manager releases the file handle once the generator
    # finishes or is closed, instead of leaving it open.
    with open(path, 'rb') as f:
        while True:
            rawAsin = f.read(10)
            # Check the byte count before decoding so a truncated trailing ID
            # ends the stream instead of failing inside decode()
            if len(rawAsin) < 10:
                break
            features = array.array('f')
            features.fromfile(f, featureCount)
            yield rawAsin.decode('utf-8'), features.tolist()
def parse(path):
    """Yield one record per line of a gzip-compressed metadata file.

    Every line is evaluated as a Python expression and the resulting object
    is yielded, in file order.

    Args:
        path: Location of the gzip-compressed file.

    Yields:
        The object obtained by evaluating each line.
    """
    # The context manager releases the gzip handle once the generator
    # finishes or is closed, instead of leaving it open.
    with gzip.open(path, 'r') as g:
        for l in g:
            # SECURITY NOTE(review): eval() executes whatever code the file
            # contains. Only run this on trusted data; if every line is a
            # plain literal, ast.literal_eval would be a safer choice.
            yield eval(l)
# X holds one [1, d] feature tensor per item; asinPos maps an item ID to its
# index in X.
X = []
asinPos = {}
for position, (asin, f) in enumerate(readImageFeatures(dataDir + 'image_features_Baby.b')):
    asinPos[asin] = position
    X.append(tf.constant(f, shape=[1, len(f)]))
Extract metadata describing ground-truth compatibility relationships among items
# Each entry of compat is (index of item 1 in X, index of item 2 in X, label).
compat = []
asinList = list(asinPos)
for l in parse(dataDir + 'meta_Baby.json.gz'):
    a1 = l['asin']
    if 'related' not in l or 'also_bought' not in l['related']:
        continue
    for a2 in l['related']['also_bought']:
        if not (a1 in asinPos and a2 in asinPos):
            continue
        # Positive example: both items have image features
        compat.append((asinPos[a1], asinPos[a2], 1))
        # Matching negative example: two item positions drawn uniformly at random
        lastPosition = len(X) - 1
        compat.append((random.randint(0, lastPosition), random.randint(0, lastPosition), 0))
Number of labelled pairs (each compatible pair plus one randomly sampled negative pair)
len(compat)  # Bare expression: shown as cell output in a notebook
styleDim = 5  # Dimensionality of compressed (projected) representations
featDim = X[0].shape[1]  # Image feature dimensionality (second axis of each feature tensor)
featDim  # Bare expression: shown as cell output in a notebook
Define the compatibility model
# Adam optimizer with a learning rate of 1e-5, used by the training step below
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
class CompatibilityModel(tf.keras.Model):
    """Compatibility model based on squared distance between projected features.

    The two items are projected with separate matrices (E1 for the first
    item, E2 for the second) and the predicted probability is
    ``sigmoid(c - sum((x1 E1 - x2 E2)^2))``.
    """

    def __init__(self, featDim, styleDim):
        """Create the two projection matrices and the scalar offset ``c``."""
        super().__init__()
        projectionShape = [featDim, styleDim]
        # Variables are created in the order E1, E2, c
        self.E1 = tf.Variable(tf.random.normal(projectionShape, stddev=0.001))
        self.E2 = tf.Variable(tf.random.normal(projectionShape, stddev=0.001))
        self.c = tf.Variable(0.0)

    def predict(self, x1, x2):
        """Return the predicted probability that x1 and x2 are compatible."""
        s1 = tf.matmul(x1, self.E1)
        s2 = tf.matmul(x2, self.E2)
        squaredDistance = tf.reduce_sum(tf.math.squared_difference(s1, s2))
        return tf.math.sigmoid(self.c - squaredDistance)

    def call(self, x1, x2, y):
        """Return the negative log-likelihood of label y for the pair (x1, x2)."""
        p = self.predict(x1, x2)
        # p*(2y - 1) - y + 1 equals p when y == 1 and 1 - p when y == 0
        likelihood = p * (2 * y - 1) - y + 1
        return -tf.math.log(likelihood)
# Instantiate the model with the dimensions defined above
model = CompatibilityModel(featDim=featDim, styleDim=styleDim)
def trainingStep(compat):
    """Run one gradient update on a single randomly chosen pair.

    Uses the module-level ``model``, ``optimizer`` and feature list ``X``.

    Args:
        compat: Sequence of (index1, index2, label) triples; the indices
            refer to positions in ``X``.

    Returns:
        The objective for the sampled pair, converted with ``.numpy()``.
    """
    firstIndex, secondIndex, label = random.choice(compat)
    with tf.GradientTape() as tape:
        objective = model(X[firstIndex], X[secondIndex], label)
    weights = model.trainable_variables
    gradients = tape.gradient(objective, weights)
    optimizer.apply_gradients(zip(gradients, weights))
    return objective.numpy()
# 50,000 single-pair updates, reporting the latest objective every 5,000 steps
for i in range(50000):
    obj = trainingStep(compat)
    if (i + 1) % 5000 == 0:
        print(f"iteration {i + 1}, objective = {obj!s}")
For these exercises we use musical instrument data, because (a) it has fine-grained subcategories (e.g. "accessories", "guitars", etc.) that the exercises can make use of, and (b) it is small. Ideally these exercises would be run on a large category of images (e.g. clothing), but such datasets are larger and more difficult to work with.
First collect the subcategories associated with each item (for use in Exercise 9.3)
# categories: item ID -> subcategory; itemsPerCategory: subcategory -> set of item IDs
categories = {}
itemsPerCategory = defaultdict(set)
for l in parse(dataDir + 'meta_Musical_Instruments.json.gz'):
    cats = l['categories'][0]
    if len(cats) >= 2:
        # Keep the "second level" (or sub-) category; products without one are ignored
        cat = cats[1]
        categories[l['asin']] = cat
        itemsPerCategory[cat].add(l['asin'])
Read image data
# X: one [1, d] feature tensor per kept item; asinPos: item ID -> index in X;
# posPerCategory: subcategory -> set of indices in X
X = []
asinPos = {}
posPerCategory = defaultdict(set)
for asin, f in readImageFeatures(dataDir + 'image_features_Musical_Instruments.b'):
    if asin in categories:  # Items without a category are skipped
        position = len(X)
        asinPos[asin] = position
        posPerCategory[categories[asin]].add(position)
        X.append(tf.constant(f, shape=[1, len(f)]))
Extract compatibility relationships. Build our collection of "difficult" negatives consisting of items from the same category.
# Each same-category "also bought" pair becomes a positive example and is
# paired with a negative example whose second item is drawn from the same
# category. Entries are (index in X, index in X, label).
compat = []
asinList = list(asinPos.keys())
# random.sample() on a set is deprecated since Python 3.9 and raises TypeError
# from 3.11, so each category's positions are turned into a list once and
# sampled with random.choice(). Sorting makes the draw reproducible for a
# given random seed.
negativePool = {}
for l in parse(dataDir + 'meta_Musical_Instruments.json.gz'):
    a1 = l['asin']
    if a1 not in categories:
        continue
    cat = categories[a1]
    if 'related' in l and 'also_bought' in l['related']:
        for a2 in l['related']['also_bought']:
            if a2 not in categories or categories[a2] != cat:
                continue  # Only consider positive relations of the same category
            if a1 in asinPos and a2 in asinPos:
                compat.append((asinPos[a1], asinPos[a2], 1))
                if cat not in negativePool:
                    negativePool[cat] = sorted(posPerCategory[cat])
                negSameCat = random.choice(negativePool[cat])
                compat.append((asinPos[a1], negSameCat, 0))
len(compat)  # Bare expression: shown as cell output in a notebook
styleDim = 5  # Dimensionality of compressed (projected) representations
featDim = X[0].shape[1]  # Image feature dimensionality (second axis of each feature tensor)
# Adam optimizer with a learning rate of 1e-5
optimizer = tf.keras.optimizers.Adam(learning_rate=1e-5)
class CompatibilityModel(tf.keras.Model):
    """Compatibility model scoring a pair by squared distance after projection.

    Item 1 is projected with E1 and item 2 with E2; the predicted
    probability is ``sigmoid(c - sum((x1 E1 - x2 E2)^2))``.
    """

    def __init__(self, featDim, styleDim):
        """Create the projection matrices E1, E2 and the scalar offset c."""
        super().__init__()
        shape = [featDim, styleDim]
        # Variables are created in the order E1, E2, c
        self.E1 = tf.Variable(tf.random.normal(shape, stddev=0.001))
        self.E2 = tf.Variable(tf.random.normal(shape, stddev=0.001))
        self.c = tf.Variable(0.0)

    def predict(self, x1, x2):
        """Return the predicted compatibility probability of x1 and x2."""
        projected1 = tf.matmul(x1, self.E1)
        projected2 = tf.matmul(x2, self.E2)
        distance = tf.reduce_sum(tf.math.squared_difference(projected1, projected2))
        return tf.math.sigmoid(self.c - distance)

    def call(self, x1, x2, y):
        """Return the negative log-likelihood of label y for the pair (x1, x2)."""
        probability = self.predict(x1, x2)
        # Evaluates to probability when y == 1 and to 1 - probability when y == 0
        labelLikelihood = probability * (2 * y - 1) - y + 1
        return -tf.math.log(labelLikelihood)
Modify the model to compute similarity based on the inner product rather than Euclidean distance
class CompatibilityModelInner(tf.keras.Model):
    """Compatibility model scoring a pair by the inner product of projections.

    Item 1 is projected with E1 and item 2 with E2; the predicted
    probability is ``sigmoid(c + <x1 E1, x2 E2>)``.
    """

    def __init__(self, featDim, styleDim):
        """Create the projection matrices E1, E2 and the scalar offset c."""
        # The original called super(CompatibilityModel, self).__init__(), which
        # raises TypeError because this class does not derive from
        # CompatibilityModel.
        super().__init__()
        self.E1 = tf.Variable(tf.random.normal([featDim, styleDim], stddev=0.001))
        self.E2 = tf.Variable(tf.random.normal([featDim, styleDim], stddev=0.001))
        self.c = tf.Variable(0.0)

    def predict(self, x1, x2):
        """Return the predicted compatibility probability of x1 and x2."""
        s1 = tf.matmul(x1, self.E1)
        s2 = tf.matmul(x2, self.E2)
        # s1 and s2 are both [1, styleDim] for the [1, featDim] inputs used in
        # this file. tf.tensordot(s1, s2, 1) is a matrix product and fails on
        # those shapes, so take the element-wise product and sum it instead.
        innerProduct = tf.reduce_sum(tf.multiply(s1, s2))
        return tf.math.sigmoid(self.c + innerProduct)

    def call(self, x1, x2, y):
        """Return the negative log-likelihood of label y for the pair (x1, x2)."""
        return -tf.math.log(self.predict(x1, x2)*(2*y - 1) - y + 1)


# Compare models based on the inner product and Euclidean distance. Both make use of "difficult" negatives (Exercise 9.3)
model1 = CompatibilityModel(featDim, styleDim)
# model2 must be the inner-product variant; building it from CompatibilityModel
# would compare the Euclidean model against itself.
model2 = CompatibilityModelInner(featDim, styleDim)
def trainingStep(model, compat):
    """Apply one gradient update to ``model`` using a single random pair.

    Uses the module-level ``optimizer`` and feature list ``X``.

    Args:
        model: Model called as ``model(x1, x2, y)`` to obtain the objective.
        compat: Sequence of (index1, index2, label) triples; the indices
            refer to positions in ``X``.

    Returns:
        The objective for the sampled pair, converted with ``.numpy()``.
    """
    leftIndex, rightIndex, label = random.choice(compat)
    with tf.GradientTape() as tape:
        objective = model(X[leftIndex], X[rightIndex], label)
    weights = model.trainable_variables
    optimizer.apply_gradients(zip(tape.gradient(objective, weights), weights))
    return objective.numpy()
# Shuffle, then hold out everything after the first 700,000 pairs for testing
random.shuffle(compat)
trainSize = 700000
if len(compat) <= trainSize:
    # With 700,000 pairs or fewer the fixed cut would leave the test set
    # empty; fall back to an 80/20 split in that case.
    trainSize = (4 * len(compat)) // 5
compatTrain = compat[:trainSize]
compatTest = compat[trainSize:]

for trainedModel in (model1, model2):
    # trainingStep() reads the module-level optimizer, so rebind it here to
    # give each model its own Adam state. Sharing one optimizer mixes the
    # step counter between models, and recent Keras versions reject variables
    # an optimizer was not built with.
    optimizer = tf.keras.optimizers.Adam(0.00001)
    for i in range(50000):
        # Train on the training split only, so compatTest stays unseen
        obj = trainingStep(trainedModel, compatTrain)
        if (i % 5000 == 4999): print("iteration " + str(i+1) + ", objective = " + str(obj))
Compute accuracy on the held-out pairs (the fraction of test pairs, positive or negative, whose predicted label matches the true label)
# Accuracy of model1 on the held-out pairs
acc = 0
for (i1, i2, y) in compatTest:
    x1, x2 = X[i1], X[i2]
    # Threshold the predicted probability. model1(x1, x2, y) returns the
    # negative log-likelihood of the label, which is not a probability.
    p = model1.predict(x1, x2)
    if (p.numpy() > 0.5) == (y == 1):
        acc += 1
# Bare expression (notebook display); NaN instead of ZeroDivisionError when
# there are no test pairs
acc / len(compatTest) if compatTest else float('nan')
# Accuracy of model2 on the held-out pairs
acc = 0
for (i1, i2, y) in compatTest:
    x1, x2 = X[i1], X[i2]
    # Threshold the predicted probability. model2(x1, x2, y) returns the
    # negative log-likelihood of the label, which is not a probability.
    p = model2.predict(x1, x2)
    if (p.numpy() > 0.5) == (y == 1):
        acc += 1
# Bare expression (notebook display); NaN instead of ZeroDivisionError when
# there are no test pairs
acc / len(compatTest) if compatTest else float('nan')
t-SNE embedding
# Project every item's features with model1's first projection matrix, then
# reduce the projected vectors to two dimensions with t-SNE.
projection = model1.E1
Xembed = [
    list(tf.matmul(X[asinPos[asin]], projection).numpy()[0])
    for asin in asinList
]
Xembed2 = TSNE(n_components=2).fit_transform(Xembed)
# Group the 2-d embedding coordinates by subcategory for plotting
scatterPlotsX = defaultdict(list)
scatterPlotsY = defaultdict(list)
for xy, asin in zip(Xembed2, asinList):
    if asin not in categories:
        continue
    cat = categories[asin]
    # No try/except here: silently swallowing an error could append an X
    # coordinate without its Y coordinate and hide the underlying problem.
    scatterPlotsX[cat].append(xy[0])
    scatterPlotsY[cat].append(xy[1])
Scatterplots by subcategory aren't particularly interesting in this case. Try e.g. price or brand for more compelling examples.
# Give each subcategory its own colour and marker; drawing every series in
# the same colour makes the legend entries indistinguishable.
plotStyles = [
    ('Instrument Accessories', 'lightgrey', 'o'),
    ('Stringed Instruments', 'grey', 's'),
    ('Guitars', 'black', '^'),
]
for cat, color, marker in plotStyles:
    plt.scatter(scatterPlotsX[cat],
                scatterPlotsY[cat], color=color, marker=marker, lw=0, label=cat)
plt.legend(loc='lower left')
plt.xticks([])
plt.yticks([])
plt.xlabel("first embedded dimension ")
plt.ylabel("second embedded dimension")
# Raw string: "\e" is an invalid escape sequence in a normal string literal
# (a warning today, slated to become an error). The title text is unchanged.
plt.title(r"\emph{TSNE}-based item embeddings")
plt.show()