
[Word2Vec] CBOW - Python Code


1. Word2Vec - CBOW

For the definition of CBOW, see the post below.

 

[NLP] Word Embedding - hello-heehee.tistory.com


2. CODE

1) Gensim

 

Gensim: topic modelling for humans - radimrehurek.com

The most widely used, production-ready implementation is Gensim's Word2Vec.

from gensim.models import Word2Vec

# Example sentence data (pre-tokenized)
documents = [
    ["이", "문장은", "예제입니다"],
    ["자연어", "처리는", "흥미롭습니다"],
    ["딥러닝", "기반의", "자연어", "처리"],
    ["word2vec", "모델을", "학습합니다"],
    ["CBOW", "방식을", "사용합니다"]
]

# Train a Word2Vec model (CBOW)
model = Word2Vec(sentences=documents, vector_size=100, window=5, min_count=1, sg=0)  # sg=0: CBOW, sg=1: Skip-gram

# Print a word vector
word = "자연어"
if word in model.wv:
    print(f"Vector for '{word}':")
    print(model.wv[word])

# Find the most similar words
similar_words = model.wv.most_similar(word, topn=5)
print(f"Words similar to '{word}':", similar_words)

Word2Vec with Gensim is also explained on Wikidocs, so see the link below.

 

09-03 영어/한국어 Word2Vec 실습 (English/Korean Word2Vec practice) - wikidocs.net

Since plenty of other developers have already written up Gensim in detail, I will skip further details here and write a separate post after studying it more.


2) GitHub: ksang/word2vec

 

GitHub - ksang/word2vec: A word2vec CBOW and Skip-gram implementation in PyTorch - github.com

The developer above has also implemented a version that can be used with PyTorch, so I've organized it here.
This Word2Vec does not come from any particular library; a developer named ksang wrote the code himself.

  • models.py
import torch
import torch.nn.functional as F
import numpy as np

TABLE_SIZE = 1e8

def create_sample_table(word_count):
    """ Create negative sample table for vocabulary, words with
        higher frequency will have higher occurrences in table.
    """
    table = []
    frequency = np.power(np.array(word_count), 0.75)
    sum_frequency = sum(frequency)
    ratio = frequency / sum_frequency
    count = np.round(ratio * TABLE_SIZE)
    for word_idx, c in enumerate(count):
        table += [word_idx] * int(c)
    return np.array(table)

# Word2Vec Skip-gram model
class SkipGramModel(torch.nn.Module):
    """ Center word as input, context words as target.
        Objective is to maximize the score of map from input to target.
    """
    def __init__(self, device, vocabulary_size, embedding_dim, neg_num=0, word_count=[]):
        super(SkipGramModel, self).__init__()
        self.device = device
        self.neg_num = neg_num
        self.embeddings = torch.nn.Embedding(vocabulary_size, embedding_dim)
        initrange = 0.5 / embedding_dim
        self.embeddings.weight.data.uniform_(-initrange, initrange)
        if self.neg_num > 0:
            self.table = create_sample_table(word_count)

    def forward(self, centers, contexts):
        batch_size = len(centers)
        u_embeds = self.embeddings(centers).view(batch_size,1,-1)
        v_embeds = self.embeddings(contexts).view(batch_size,1,-1)
        score  = torch.bmm(u_embeds, v_embeds.transpose(1,2)).squeeze()
        loss = F.logsigmoid(score).squeeze()
        if self.neg_num > 0:
            neg_contexts = torch.LongTensor(np.random.choice(self.table, size=(batch_size, self.neg_num))).to(self.device)
            neg_v_embeds = self.embeddings(neg_contexts)
            neg_score = torch.bmm(u_embeds, neg_v_embeds.transpose(1,2)).squeeze()
            neg_score = torch.sum(neg_score, dim=1)
            neg_score = F.logsigmoid(-1*neg_score).squeeze()
            loss += neg_score
        return -1 * loss.sum()

    def get_embeddings(self):
        return self.embeddings.weight.data

# Word2Vec CBOW model
class CBOWModel(torch.nn.Module):
    """ Context words as input, returns possiblity distribution
        prediction of center word (target).
    """
    def __init__(self, device, vocabulary_size, embedding_dim):
        super(CBOWModel, self).__init__()
        self.device = device
        self.embeddings = torch.nn.Embedding(vocabulary_size, embedding_dim)
        initrange = 0.5 / embedding_dim
        self.embeddings.weight.data.uniform_(-initrange, initrange) #Initialize
        self.linear1 = torch.nn.Linear(embedding_dim, vocabulary_size)
        
    # CBOW forward pass
    def forward(self, contexts):
        # input
        embeds = self.embeddings(contexts)
        # projection
        add_embeds = torch.sum(embeds, dim=1)
        # output
        out = self.linear1(add_embeds)
        log_probs = F.log_softmax(out, dim=1)
        return log_probs

    # embedding-layer weights
    def get_embeddings(self):
        return self.embeddings.weight.data
        
    # added: output (linear) layer weights
    def get_linears(self):
        return self.linear1.weight.data

This is the basic model code for Word2Vec's CBOW and Skip-gram.
The comments describe the CBOW path, and I also added a bit of code (get_linears) so the weight values can be inspected.
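As a quick sanity check, here is a small sketch of my own (not part of the ksang repo) that creates a CBOWModel, pushes a dummy batch of context indices through it, and reads back the two weight matrices exposed above:

import torch

from models import CBOWModel  # the class defined above

device = torch.device('cpu')
vocab_size, emb_dim = 20, 8
model = CBOWModel(device, vocab_size, emb_dim)

# Batch of 4 samples, each with 2 context word indices
contexts = torch.randint(0, vocab_size, (4, 2))
log_probs = model(contexts)
print(log_probs.shape)                 # torch.Size([4, 20]): log-softmax over the vocabulary

print(model.get_embeddings().shape)    # torch.Size([20, 8]): embedding weights
print(model.get_linears().shape)       # torch.Size([20, 8]): added output-layer weights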


  • inference.py
import pickle

def save_embeddings(filename, embeddings, linears, dictionary):
    """Embeddings and reverse dictionary serialization and dump to a file."""
    data = {
        'emb':  embeddings,
        'lin' : linears,  # added
        'dict': dictionary
    }
    print("Saving embeddings to file:", filename)
    with open(filename, 'wb') as file:
        pickle.dump(data, file)

class Word2Vec(object):
    """Inference interface of Word2Vec embeddings
        Before inference the embdedding result of a word, data need to be initialized
        by calling method from_file or from_object.
    """
    def __init__(self):
        self.embeddings = None
        self.linears = None  # added
        self.dictionary = None
        
    def from_file(self, filename):
        with open(filename, 'rb') as file:
            data = pickle.load(file)
        self.embeddings = data['emb']
        self.linears = data['lin']  # added (key must match save_embeddings)
        self.dictionary = data['dict']

    def from_object(self, embeddings, linears, dictionary):
        self.embeddings = embeddings
        self.linears = linears  # added
        self.dictionary = dictionary

    def inference(self, word):
        assert self.embeddings is not None and self.dictionary is not None,\
            'Embeddings not initialized, use from_file or from_object to load data.'
        word_idx = self.dictionary.get(word)
        # Unknown word returns UNK's word_idx
        if word_idx is None:
            word_idx = 0
        return self.embeddings[word_idx]

This is the part of the code that loads the saved data and runs inference; it has been modified slightly so that the weight function added in models.py can also be used at inference time.
The code here is straightforward, so I haven't added anything; the distinctive point compared with the usual approach is that the results are saved and loaded with pickle. Pickle serializes the Python objects (embeddings, weights, and dictionary) to a byte stream and restores them on load, which makes it easy to dump everything to a single file.
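Below is a small round-trip sketch of my own (the filename is a placeholder) showing how save_embeddings and the Word2Vec inference class fit together:

import torch

from inference import Word2Vec, save_embeddings

# Dummy stand-ins for the real training outputs
embeddings = torch.randn(3, 4)                    # one 4-dim vector per word
linears = torch.randn(3, 4)                       # the added CBOW output-layer weights
dictionary = {'UNK': 0, 'hello': 1, 'world': 2}   # word -> index

save_embeddings('embeddings.bin', embeddings, linears, dictionary)

w2v = Word2Vec()
w2v.from_file('embeddings.bin')
print(w2v.inference('hello'))         # vector for 'hello'
print(w2v.inference('no_such_word'))  # unknown words fall back to index 0 (UNK)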


  • word2vec.py
import argparse
import zipfile
import re
import collections
import numpy as np
from six.moves import xrange
import random
import torch
import timeit
from torch.autograd import Variable
from models import SkipGramModel
from models import CBOWModel
from inference import save_embeddings

model_list = ['CBOW', 'skipgram']

cmd_parser = argparse.ArgumentParser(description=None)
# Data arguments
cmd_parser.add_argument('-d', '--data', default='data/text8.zip',
                        help='Data file for word2vec training.')
cmd_parser.add_argument('-o', '--output', default='embeddings.bin',
                        help='Output embeddings filename.')
cmd_parser.add_argument('-s', '--size', default=50000, type=int,
                        help='Vocabulary size.')
# Model training arguments
cmd_parser.add_argument('-m', '--mode', default='skipgram', choices=model_list,
                        help='Training model.')
cmd_parser.add_argument('-bs', '--batch_size', default=128, type=int,
                        help='Training batch size.')
cmd_parser.add_argument('-ns', '--num_skips', default=2, type=int,
                        help='How many times to reuse an input to generate a label.')
cmd_parser.add_argument('-sw', '--skip_window', default=1, type=int,
                        help='How many words to consider left and right.')
cmd_parser.add_argument('-ed', '--embedding_dim', default=128, type=int,
                        help='Dimension of the embedding vector.')
cmd_parser.add_argument('-lr', '--learning_rate', default=0.001, type=float,
                        help='Learning rate')
cmd_parser.add_argument('-i', '--num_steps', default=10000, type=int,
                        help='Number of steps to run.')
cmd_parser.add_argument('-ne', '--negative_example', default=5, type=int,
                        help='Number of negative examples.')
cmd_parser.add_argument('-c', '--clip', default=1.0, type=float,
                        help='Clip gradient norm value.')

# Device
cmd_parser.add_argument('-dc', '--disable_cuda', default=False, action='store_true',
                        help='Explicitly disable cuda and GPU.')

def read_data(filename):
    """Extract the first file enclosed in a zip file as a list of words."""
    if filename.endswith('.zip'):
        with zipfile.ZipFile(filename) as f:
            text = f.read(f.namelist()[0]).decode('ascii')
    else:
        with open(filename, "r") as f:
            text = f.read()
    return [word.lower() for word in re.compile(r'\w+').findall(text)]

def build_dataset(words, n_words):
    """Process raw inputs into a dataset.
        Returns:
            data        list of codes (integers from 0 to vocabulary_size-1).
                        This is the original text but words are replaced by their codes
            count       list of words(strings) to count of occurrences
            dictionary  map of words(strings) to their codes(integers)
            reverse_dictionary  maps codes(integers) to words(strings)
    """
    count = [['UNK', -1]]
    count.extend(collections.Counter(words).most_common(n_words - 1))
    dictionary = dict()
    for word, _ in count:
        dictionary[word] = len(dictionary)
    data = list()
    unk_count = 0
    for word in words:
        index = dictionary.get(word, 0)
        if index == 0:  # dictionary['UNK']
            unk_count += 1
        data.append(index)
    count[0][1] = unk_count
    reversed_dictionary = dict(zip(dictionary.values(), dictionary.keys()))
    return data, count, dictionary, reversed_dictionary

def generate_batch(device, data, data_index, batch_size, num_skips, skip_window):
    """Generates a batch of training data
        returns:
            centers:      a list of center word indexes for this batch.
            contexts:     a list of contexts indexes for this batch.
            data_index: current data index for next batch.
    """
    assert batch_size % num_skips == 0
    assert num_skips <= 2 * skip_window
    centers = np.ndarray(shape=(batch_size), dtype=np.int32)
    contexts = np.ndarray(shape=(batch_size, 1), dtype=np.int32)
    span = 2 * skip_window + 1  # [ skip_window target skip_window ]
    buffer = collections.deque(maxlen=span)
    if data_index + span > len(data):
        data_index = 0
    buffer.extend(data[data_index:data_index + span])
    data_index += span
    for i in range(batch_size // num_skips):
        context_words = [w for w in range(span) if w != skip_window]
        words_to_use = random.sample(context_words, num_skips)
        for j, context_word in enumerate(words_to_use):
            centers[i * num_skips + j] = buffer[skip_window]
            contexts[i * num_skips + j, 0] = buffer[context_word]
        if data_index == len(data):
            for word in data[:span]:
                buffer.append(word)
            data_index = span
        else:
            buffer.append(data[data_index])
            data_index += 1
    # Backtrack a little bit to avoid skipping words in the end of a batch
    data_index = (data_index + len(data) - span) % len(data)
    centers = torch.LongTensor(centers).to(device)
    contexts = torch.LongTensor(contexts).to(device)
    return centers, contexts, data_index

def get_device(disable_cuda):
    """Get CPU/GPU device
    """
    if not disable_cuda and torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')
    return device

def train(device, data, word_count, mode, vocabulary_size, embedding_dim, batch_size,
          num_skips, skip_window, num_steps, learning_rate, neg_num, clip):
    """Training and backpropagation process, returns final embedding as result"""
    if mode == 'CBOW':
        model = CBOWModel(device, vocabulary_size, embedding_dim)
    elif mode == 'skipgram':
        model = SkipGramModel(device, vocabulary_size, embedding_dim, neg_num, word_count)
    else:
        raise ValueError("Model \"%s\" not supported" % model)
    model.to(device)
    print("Start training on device:", device)
    optimizer = torch.optim.Adam(
        model.parameters(), lr=learning_rate)
    loss_function = torch.nn.NLLLoss()
    data_index = 0
    loss_val = 0
    for i in xrange(num_steps):
        # prepare feed data and forward pass
        centers, contexts, data_index = generate_batch(device, data, data_index,
                                                       batch_size, num_skips, skip_window)
        if mode == 'CBOW':
            y_pred = model(contexts)
            loss = loss_function(y_pred, centers)
        elif mode == 'skipgram':
            loss = model(centers, contexts)
        else:
            raise ValueError("Model \"%s\" not supported" % model)
        # Zero gradients, perform a backward pass, and update the weights.
        optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(model.parameters(), clip)
        optimizer.step()
        # Print loss value at certain step
        loss_val += loss.item()
        if i > 0 and i % (num_steps/100) == 0:
            print('  Average loss at step', i, ':', loss_val/(num_steps/100))
            loss_val = 0

    # Also return the CBOW output-layer weights added in models.py (None for skipgram)
    linears = model.get_linears() if hasattr(model, 'get_linears') else None
    return model.get_embeddings(), linears


def tsne_plot(embeddings, num, reverse_dictionary, filename):
    """Plot tSNE result of embeddings for a subset of words"""
    try:
        from sklearn.manifold import TSNE
        import matplotlib.pyplot as plt
    except ImportError as ex:
        print('Please install sklearn, matplotlib, and scipy to plot embeddings.')
        print(ex)
        return
    tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000, method='exact')
    low_dim_embs = tsne.fit_transform(embeddings[:num, :])
    low_dim_labels = [reverse_dictionary[i] for i in xrange(num)]
    assert low_dim_embs.shape[0] >= len(low_dim_labels), 'More labels than embeddings'
    plt.figure(figsize=(18, 18))  # in inches
    for i, label in enumerate(low_dim_labels):
        x, y = low_dim_embs[i, :]
        plt.scatter(x, y)
        plt.annotate(label,
                     xy=(x, y),
                     xytext=(5, 2),
                     textcoords='offset points',
                     ha='right',
                     va='bottom')
    print("Saving plot to:", filename)
    plt.savefig(filename)

if __name__ == '__main__':
    args = cmd_parser.parse_args()
    dev = get_device(args.disable_cuda)
    # Data preprocessing
    vocabulary = read_data(args.data)
    print('Data size', len(vocabulary))
    data, count, dictionary, reverse_dictionary = build_dataset(vocabulary,
                                                                args.size)
    vocabulary_size = min(args.size, len(count))
    print('Vocabulary size', vocabulary_size)
    word_count = [ c[1] for c in count]
    # Model training
    start_time = timeit.default_timer()
    final_embeddings, final_linears = train(
        device=dev,
        data=data,
        word_count=word_count,
        mode=args.mode,
        vocabulary_size=vocabulary_size,
        embedding_dim=args.embedding_dim,
        batch_size=args.batch_size,
        num_skips=args.num_skips,
        skip_window=args.skip_window,
        num_steps=args.num_steps,
        learning_rate=args.learning_rate,
        clip=args.clip,
        neg_num=args.negative_example)
    print('Training time:', timeit.default_timer() - start_time, 'Seconds')
    # Save result and plotting
    save_embeddings(args.output, final_embeddings, final_linears, dictionary)

The parameters can be configured in the argument-parsing section, and in the train function you can see the backward (backpropagation) step inside the training loop.
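For reference, here is a rough sketch of my own showing how the functions in word2vec.py can be driven from Python with a tiny toy corpus in place of text8; it assumes the models.py and inference.py files above are importable, and that train returns the embedding and linear weights as shown above.

import torch

from word2vec import build_dataset, train
from inference import save_embeddings

# Tiny toy corpus in place of data/text8.zip, just to exercise the pipeline
words = ("the quick brown fox jumps over the lazy dog " * 100).split()
data, count, dictionary, reverse_dictionary = build_dataset(words, n_words=10)
word_count = [c[1] for c in count]

# train() as shown above returns (embedding weights, CBOW linear weights)
embeddings, linears = train(
    device=torch.device('cpu'), data=data, word_count=word_count, mode='CBOW',
    vocabulary_size=len(count), embedding_dim=16, batch_size=8,
    num_skips=2, skip_window=1, num_steps=1000,
    learning_rate=0.01, neg_num=0, clip=1.0)

save_embeddings('embeddings.bin', embeddings, linears, dictionary)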


  • plotting
import torch
from six.moves import xrange
from inference import Word2Vec

save_plot_name = 'tsne.png'  # plotting output filename
plot_num = 100               # number of words to plot

def tsne_plot(embeddings, num, reverse_dictionary, filename):
    """Plot tSNE result of embeddings for a subset of words"""
    try:
        from sklearn.manifold import TSNE
        import matplotlib.pyplot as plt
    except ImportError as ex:
        print('Please install sklearn, matplotlib, and scipy to plot embeddings.')
        print(ex)
        return
    tsne = TSNE(perplexity=30, n_components=2, init='pca', n_iter=5000, method='exact')
    low_dim_embs = tsne.fit_transform(embeddings[:num, :])
    low_dim_labels = [reverse_dictionary[i] for i in xrange(num)]
    assert low_dim_embs.shape[0] >= len(low_dim_labels), 'More labels than embeddings'
    plt.figure(figsize=(18, 18))  # in inches
    for i, label in enumerate(low_dim_labels):
        x, y = low_dim_embs[i, :]
        plt.scatter(x, y)
        plt.annotate(label,
                     xy=(x, y),
                     xytext=(5, 2),
                     textcoords='offset points',
                     ha='right',
                     va='bottom')
    print("Saving plot to:", filename)
    plt.savefig(filename)

if __name__=="__main__":
	m=Word2Vec()
    model = m.from_file("embeddings.bin") #load model
    
    norm = torch.sqrt(torch.cumsum(torch.mul(model, model), 1))
    nomalized_embeddings = (model/norm).cpu().numpy()
    tsne_plot(embeddings=nomalized_embeddings,
              num=min(vocabulary_size, plot_num),
              reverse_dictionary=reverse_dictionary,
              filename=save_plot_name)

Because the raw numbers alone don't tell you which words are far apart (i.e., which have low similarity scores), this function draws a t-SNE plot of the embeddings.
