
NewsGroups Dataset Vignette

In this vignette, I will show you how to create a database for storing and manipulating newsgroup documents.

Introduction to the dataset

We will be using the 20 Newsgroups dataset for this vignette. This is the sklearn website description:

The 20 newsgroups dataset comprises around 18000 newsgroups posts on 20 topics split in two subsets: one for training (or development) and the other one for testing (or for performance evaluation). The split between the train and test set is based upon messages posted before and after a specific date.

We use sklearn's fetch_20newsgroups method to download and access articles from the politics newsgroups.

import sklearn.datasets
newsgroups = sklearn.datasets.fetch_20newsgroups(categories=['talk.politics.guns', 'talk.politics.mideast', 'talk.politics.misc'])
newsgroups.keys(), len(newsgroups['data'])
(dict_keys(['data', 'filenames', 'target_names', 'target', 'DESCR']), 1575)

This is an example of a newsgroup post.

print(newsgroups['data'][0])
From: golchowy@alchemy.chem.utoronto.ca (Gerald Olchowy)
Subject: Re: Help fight the Clinton Administration's invasion of your privacy
Organization: University of Toronto Chemistry Department
Lines: 16

In article <9308@blue.cis.pitt.edu> cjp+@pitt.edu (Casimir J Palowitch) writes:
>The Clinton Administration wants to "manage" your use of digital
>encryption. This includes a proposal which would limit your use of
>encryption to a standard developed by the NSA, the technical details of 
>which would remain classified with the government.
>
>This cannot be allowed to happen.
>

It is a bit unfair to call blame the Clinton Administration alone...this
initiative was underway under the Bush Administration...it is basically
a bipartisan effort of the establishment Demopublicans and
Republicrats...the same bipartisan effort that brought the S&L scandal,
and BCCI, etc.

Gerald

It looks very similar to an email, so we will use Python's email package to parse the text and return a dictionary containing the relevant fields. Our parse_newsgroup function shows how we can extract metadata fields like author, subject, and organization from the message, as well as the main text body.

import email

def parse_newsgroup(email_text):
    '''Parse a raw newsgroup post into a dict of the relevant fields.'''
    message = email.message_from_string(email_text)
    return {
        'author': message['from'],
        'subject': message['Subject'],
        'organization': message['Organization'],
        'lines': int(message['Lines']),
        'text': message.get_payload(),
    }

parse_newsgroup(newsgroups['data'][0])
{'author': 'golchowy@alchemy.chem.utoronto.ca (Gerald Olchowy)',
 'subject': "Re: Help fight the Clinton Administration's invasion of your privacy",
 'organization': 'University of Toronto Chemistry Department',
 'lines': 16,
 'text': 'In article <9308@blue.cis.pitt.edu> cjp+@pitt.edu (Casimir J Palowitch) writes:\n>The Clinton Administration wants to "manage" your use of digital\n>encryption. This includes a proposal which would limit your use of\n>encryption to a standard developed by the NSA, the technical details of \n>which would remain classified with the government.\n>\n>This cannot be allowed to happen.\n>\n\nIt is a bit unfair to call blame the Clinton Administration alone...this\ninitiative was underway under the Bush Administration...it is basically\na bipartisan effort of the establishment Demopublicans and\nRepublicrats...the same bipartisan effort that brought the S&L scandal,\nand BCCI, etc.\n\nGerald\n'}

Creating a database schema

The first step will be to create a database schema that is appropriate for the newsgroup dataset by defining a container dataclass using the @schema decorator. The schema decorator will convert the class into a dataclass with slots enabled (provided __slots__ = [] is given in the definition) and make it inherit from DocTableRow to add some additional functionality. The type hints associated with each variable will be used in the schema definition for the new tables, and arguments to Col(), IDCol(), AddedCol(), and UpdatedCol() will mostly be passed to dataclasses.field (see the docs for more detail), so all dataclass functionality is maintained. The doctable schema guide explains more about schema and schema object definitions.

Here I define a NewsgroupDoc class to represent a single document and define __slots__ so the decorator can automatically create a slot class. Each member variable will act as a column in our database schema, and the first variable we define is an id column with the default value IDCol(). This is a special function that will translate to a schema that uses the id column as the primary key and enables auto-incrementing. Because id is defaulted, we must give defaults to our other variables as well.

I also define a couple of methods as part of our schema class; they are ignored in the schema creation process, but allow us to manipulate the object within Python. The author_email property will extract just the email address from the author field. Note that even though it is a property, it is defined as a method and therefore will not be considered when creating the class schema. I also define a classmethod that can be used to create a new NewsgroupDoc from the raw newsgroup text; this replaces the functionality of the parse_newsgroup function we created above. This way, the class knows how to create itself from the raw text.

import sys
sys.path.append('..') # so this vignette uses the local copy of doctable
import doctable

import re
import email
import dataclasses

def try_int(text):
    '''Safely parse an integer header field (e.g. 'Lines'), returning None on failure.'''
    try:
        return int(text.split()[0])
    except (AttributeError, ValueError, IndexError):


@doctable.schema
class NewsgroupDoc:
    __slots__ = []

    # schema columns
    id: int = doctable.IDCol()
    author: str = None
    subject: str = None
    organization: str = None
    length: int = None
    text: str = None

    @property
    def author_email(self, pattern=re.compile(r'\b[A-Za-z0-9._%+-]+@[A-Za-z0-9.-]+\.[A-Za-z]{2,}\b')):
        '''Get the author's email address from the author field text.
        '''
        return re.search(pattern, self.author)[0]


    @classmethod
    def from_string(cls, newsgroup_text):
        '''Create a NewsgroupDoc object from the original newsgroup string.
        '''
        message = email.message_from_string(newsgroup_text)
        return cls(
            author = message['from'],
            subject = message['Subject'],
            organization = message['Organization'],
            length = len(message.get_payload()),
            text = message.get_payload(),
        )


# for example, we create a new NewsgroupDoc from the first newsgroup article
ngdoc = NewsgroupDoc.from_string(newsgroups['data'][0])
print(ngdoc.author)
ngdoc.author_email
golchowy@alchemy.chem.utoronto.ca (Gerald Olchowy)

'golchowy@alchemy.chem.utoronto.ca'

To make sure the NewsgroupDoc will translate to the database schema we expect, we can create a new DocTable object that uses it as a schema. We use the schema argument of the DocTable constructor to specify the schema, and display the resulting schema table below. Note that most fields were translated to VARCHAR columns, but id and length were translated to INTEGER columns based on their type hints.

ng_table = doctable.DocTable(target=':memory:', tabname='documents', schema=NewsgroupDoc)
ng_table.schema_table()
name type nullable default autoincrement primary_key
0 id INTEGER False None auto 1
1 author VARCHAR True None auto 0
2 subject VARCHAR True None auto 0
3 organization VARCHAR True None auto 0
4 length INTEGER True None auto 0
5 text VARCHAR True None auto 0
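
As a quick sanity check, we can insert the example document we parsed above and read it back. This is a sketch that relies only on the insert and head methods used elsewhere in this vignette:

ng_table.insert(ngdoc) # insert the parsed example document
ng_table.head(1)       # view the first row of the table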

To better describe the data we are interested in, we now create a class that inherits from DocTable. This class will act as the main interface for working with our dataset. We use the _tabname_ and _schema_ class attributes to define the table name and schema so we don't need to include them in the constructor. We also define a method count_author_emails, which selects the author column and tallies how many posts are associated with each author; a usage sketch appears below.

import collections

class NewsgroupTable(doctable.DocTable):
    _tabname_ = 'documents'
    _schema_ = NewsgroupDoc

    def count_author_emails(self, *args, **kwargs):
        '''Tally the number of posts associated with each author.'''
        author_emails = self.select('author', *args, **kwargs)
        return collections.Counter(author_emails)
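
For example, after inserting some parsed documents we could tally the most prolific authors. This is a hypothetical usage sketch using a throwaway in-memory table; the exact counts depend on the data:

# hypothetical usage: count posts per author over the first 100 articles
tmp_table = NewsgroupTable(target=':memory:')
tmp_table.insert([NewsgroupDoc.from_string(t) for t in newsgroups['data'][:100]])
tmp_table.count_author_emails().most_common(3)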

Instead of using target=':memory:', we want to create a database on our filesystem so we can store data. By default, DocTable uses sqlite as the database engine, so with target we need only specify a filename. Because this is just a demonstration, we will create the database in a temporary folder using the tempfile package. This database does not exist yet, so we use the new_db flag to indicate that a new one should be created.

import tempfile

tempfolder = tempfile.TemporaryDirectory()
table_fname = f'{tempfolder.name}/tmp1.db'
ng_table = NewsgroupTable(target=table_fname, new_db=True)
ng_table.schema_table()
name type nullable default autoincrement primary_key
0 id INTEGER False None auto 1
1 author VARCHAR True None auto 0
2 subject VARCHAR True None auto 0
3 organization VARCHAR True None auto 0
4 length INTEGER True None auto 0
5 text VARCHAR True None auto 0

Parsing and storing documents

Now we would like to parse our documents for storage in the database. It is relatively straightforward to create a list of parsed records using the from_string method. After doing this, we could potentially just insert them directly into the database.

%timeit [NewsgroupDoc.from_string(text) for text in newsgroups['data']]
191 ms ± 527 µs per loop (mean ± std. dev. of 7 runs, 1 loop each)
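
If we did insert them directly, it would be a single bulk call, sketched here using the table and insert method from above:

# parse every post and insert all records in one bulk insert
records = [NewsgroupDoc.from_string(text) for text in newsgroups['data']]
ng_table.insert(records)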

This is a relatively straightforward task with a dataset of this size, but with a larger dataset or more costly parsing algorithms we would want to distribute parsing across multiple processes, so we will take that approach for demonstration. First, to verify that worker processes can all write to the same database, we use doctable.Distribute to insert some dummy rows from several processes. Note the call to reopen_engine, which creates all-new connections inside each worker; database connections cannot be shared across process boundaries.

def thread_func(numbers, db):
    print('starting process')
    db.reopen_engine() # create all-new connections for this process
    db.insert([{'subject': i} for i in numbers])

numbers = list(range(100)) # these numbers are to be inserted into the database

ng_table.delete() # clear out any existing rows first
with doctable.Distribute(5) as d:
    d.map_chunk(thread_func, numbers, ng_table)
ng_table.head(10)
starting process
starting process
starting process
starting process
starting process
id author subject organization length text
0 1 None 0 None None None
1 2 None 1 None None None
2 3 None 2 None None None
3 4 None 3 None None None
4 5 None 4 None None None
5 6 None 5 None None None
6 7 None 6 None None None
7 8 None 7 None None None
8 9 None 8 None None None
9 10 None 9 None None None
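
Each worker inserted its chunk of the numbers into the subject column. As a check, the table should now hold one row per number; this assumes DocTable's count method, which appears in other doctable examples:

ng_table.count() # expected: 100
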
Having confirmed that worker processes can write to the same table, we now define the process_and_store class to be used in each worker process. It stores the arguments needed to construct a table and connects lazily in connect_db, so that each worker opens its own database connection on first use.
class process_and_store:
    table: doctable.DocTable = None

    def __init__(self, table_cls, *table_args, **table_kwargs):
        '''Store info to construct the table.
        '''
        self.table_cls = table_cls
        self.table_args = table_args
        self.table_kwargs = table_kwargs

    def connect_db(self):
        '''Make a new connection to the database and return the associated table.
        '''
        if self.table is None:
            self.table = self.table_cls(*self.table_args, **self.table_kwargs)
        return self.table

    def __call__(self, text):
        '''Execute function in worker process.
        '''
        table = self.connect_db()

        record = NewsgroupDoc.from_string(text)
        table.insert(record)

import multiprocessing
with multiprocessing.Pool(4) as p:
    %time p.map(process_and_store(NewsgroupTable, target=table_fname), newsgroups['data'])

Notice that this takes very little CPU time, but a long "wall time" (the overall time it takes to run the program). This is because the workers are IO-starved: they spend most of their time waiting on each other to commit database transactions. This might be a good opportunity to experiment with different threading models, but a simpler fix is to insert records in chunks, so that each worker performs fewer, larger transactions.

class process_and_store_chunk(process_and_store):
    def __call__(self, texts):
        '''Execute function in worker process.
        '''
        table = self.connect_db()

        records = [NewsgroupDoc.from_string(text) for text in texts]
        table.insert(records)

chunked_newsgroups = doctable.chunk(newsgroups['data'], chunk_size=500)
with multiprocessing.Pool(4) as p:
    %time p.map(process_and_store_chunk(NewsgroupTable, target=table_fname), chunked_newsgroups)
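
Finally, since the database lives in a temporary directory, we can clean it up when finished. This is plain tempfile usage rather than anything doctable-specific:

tempfolder.cleanup() # removes the temporary folder and the database file inside it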

